asyncio: poc

This commit is contained in:
Li Jie
2024-07-25 08:50:49 +08:00
parent 89bdb315d5
commit f2e15a6846
3 changed files with 499 additions and 155 deletions

View File

@@ -17,6 +17,8 @@
package io
import (
"log"
"sync"
_ "unsafe"
"time"
@@ -26,87 +28,62 @@ const (
LLGoPackage = "decl"
)
var debugAsync = false
type Void = [0]byte
// -----------------------------------------------------------------------------
type asyncCall interface {
Resume()
Call()
Done() bool
}
type AsyncCall[OutT any] interface {
Call()
Await(timeout ...time.Duration) (ret OutT, err error)
Chan() <-chan OutT
EnsureDone()
Done() bool
}
// llgo:link AsyncCall.Await llgo.await
func Await[OutT any](call AsyncCall[OutT], timeout ...time.Duration) (ret OutT, err error) {
return
type executor struct {
ac asyncCall
mu sync.Mutex
cond *sync.Cond
susp bool
}
//go:linkname Timeout llgo.timeout
func Timeout(time.Duration) (ret AsyncCall[Void])
func newExecutor() *executor {
e := &executor{}
e.cond = sync.NewCond(&e.mu)
return e
}
func TimeoutCompiled(d time.Duration) *PromiseImpl[Void] {
P := &PromiseImpl[Void]{}
P.Func = func(resolve func(Void, error)) {
go func() {
time.Sleep(d)
resolve(Void{}, nil)
}()
func (e *executor) Resume() {
e.mu.Lock()
defer e.mu.Unlock()
e.susp = false
e.cond.Signal()
}
func Run[OutT any](ac AsyncCall[OutT]) (OutT, error) {
e := newExecutor()
p := ac.(*PromiseImpl[OutT])
p.Exec = e
for {
e.mu.Lock()
for e.susp {
e.cond.Wait()
}
e.mu.Unlock()
e.susp = true
if ac.Done() {
return p.Value, p.Err
}
ac.Call()
}
return P
}
// llgo:link Race llgo.race
func Race[OutT any](acs ...AsyncCall[OutT]) (ret *PromiseImpl[OutT]) {
return
}
func All[OutT any](acs []AsyncCall[OutT]) (ret *PromiseImpl[[]OutT]) {
return nil
}
// llgo:link Await2 llgo.await
func Await2[OutT1, OutT2 any](
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2],
timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, err error) {
return
}
type Await2Result[T1 any, T2 any] struct {
V1 T1
V2 T2
Err error
}
func Await2Compiled[OutT1, OutT2 any](
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2],
timeout ...time.Duration) (ret *PromiseImpl[Await2Result[OutT1, OutT2]]) {
return
}
// llgo:link Await3 llgo.await
func Await3[OutT1, OutT2, OutT3 any](
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3],
timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, ret3 OutT3, err error) {
return
}
type Await3Result[T1 any, T2 any, T3 any] struct {
V1 T1
V2 T2
V3 T3
Err error
}
func Await3Compiled[OutT1, OutT2, OutT3 any](
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3],
timeout ...time.Duration) (ret *PromiseImpl[Await3Result[OutT1, OutT2, OutT3]]) {
return
}
func Run(ac AsyncCall[Void]) {
p := ac.(*PromiseImpl[Void])
p.Resume()
<-ac.Chan()
}
// -----------------------------------------------------------------------------
@@ -118,37 +95,49 @@ func (p Promise[OutT]) Await(timeout ...time.Duration) (ret OutT, err error) {
return
}
func (p Promise[OutT]) Call() {
}
func (p Promise[OutT]) Chan() <-chan OutT {
return nil
}
func (p Promise[OutT]) EnsureDone() {
func (p Promise[OutT]) Done() bool {
return false
}
// -----------------------------------------------------------------------------
type PromiseImpl[TOut any] struct {
Func func(resolve func(TOut, error))
Value TOut
Err error
Prev int
Next int
Exec *executor
Debug string
c chan TOut
Func func(resolve func(TOut, error))
Err error
Value TOut
c chan TOut
}
func (p *PromiseImpl[TOut]) Resume() {
p.Func(func(v TOut, err error) {
p.Value = v
p.Err = err
})
p.Exec.Resume()
}
func (p *PromiseImpl[TOut]) EnsureDone() {
if p.Next == -1 {
panic("Promise already done")
}
func (p *PromiseImpl[TOut]) Done() bool {
return p.Next == -1
}
func (p *PromiseImpl[TOut]) Call() {
p.Func(func(v TOut, err error) {
if debugAsync {
log.Printf("Resolve task: %+v, %+v, %+v\n", p, v, err)
}
p.Value = v
p.Err = err
p.Resume()
})
}
func (p *PromiseImpl[TOut]) Chan() <-chan TOut {