diff --git a/x/io/_demo/asyncdemo/async.go b/x/io/_demo/asyncdemo/async.go index 4cce61c4..3cccb9ed 100644 --- a/x/io/_demo/asyncdemo/async.go +++ b/x/io/_demo/asyncdemo/async.go @@ -8,6 +8,8 @@ import ( "time" "github.com/goplus/llgo/x/io" + "github.com/goplus/llgo/x/io/naive" + "github.com/goplus/llgo/x/tuple" ) // ----------------------------------------------------------------------------- @@ -35,22 +37,19 @@ type Response struct { Body string } -func (r *Response) Text() *io.Promise[io.R2[string, error]] { - co := &io.Promise[io.R2[string, error]]{} - co.Func = func() { - co.Return(io.R2[string, error]{V1: r.Body, V2: nil}) - } - return co +func (r *Response) Text() (co *io.Promise[tuple.Tuple2[string, error]]) { + co.Return(tuple.Tuple2[string, error]{V1: r.Body, V2: nil}) + return } -func (r *Response) TextCompiled() *io.Promise[io.R2[string, error]] { - co := &io.Promise[io.R2[string, error]]{} +func (r *Response) TextCompiled() *naive.PromiseImpl[tuple.Tuple2[string, error]] { + co := &naive.PromiseImpl[tuple.Tuple2[string, error]]{} co.Debug = "Text" co.Func = func() { switch co.Next { case 0: co.Next = -1 - co.Return(io.R2[string, error]{V1: r.Body, V2: nil}) + co.Return(tuple.Tuple2[string, error]{V1: r.Body, V2: nil}) return default: panic("Promise already done") @@ -59,26 +58,31 @@ func (r *Response) TextCompiled() *io.Promise[io.R2[string, error]] { return co } -func AsyncHttpGet(url string) *io.Promise[io.R2[*Response, error]] { - co := &io.Promise[io.R2[*Response, error]]{} - co.Func = func() { - http("GET", url, func(resp *Response, err error) { - co.Return(io.R2[*Response, error]{V1: resp, V2: nil}) - }) - } +// async AsyncHttpGet(url string) (resp *Response, err error) { +// http("GET", url, func(resp *Response, err error) { +// return resp, err +// }) +// } +func AsyncHttpGet(url string) *io.Promise[tuple.Tuple2[*Response, error]] { + co := &io.Promise[tuple.Tuple2[*Response, error]]{} + http("GET", url, func(resp *Response, err error) { + co.Return(tuple.Tuple2[*Response, error]{V1: resp, V2: nil}) + }) + co.Suspend() return co } -func AsyncHttpGetCompiled(url string) *io.Promise[io.R2[*Response, error]] { - co := &io.Promise[io.R2[*Response, error]]{} +func AsyncHttpGetCompiled(url string) *naive.PromiseImpl[tuple.Tuple2[*Response, error]] { + co := &naive.PromiseImpl[tuple.Tuple2[*Response, error]]{} co.Debug = "HttpGet" co.Func = func() { switch co.Next { case 0: co.Next = -1 http("GET", url, func(resp *Response, err error) { - co.Return(io.R2[*Response, error]{V1: resp, V2: nil}) + co.Return(tuple.Tuple2[*Response, error]{V1: resp, V2: nil}) }) + co.Suspend() return default: panic("Promise already done") @@ -87,25 +91,24 @@ func AsyncHttpGetCompiled(url string) *io.Promise[io.R2[*Response, error]] { return co } -func AsyncHttpPost(url string) *io.Promise[io.R2[*Response, error]] { - co := &io.Promise[io.R2[*Response, error]]{} - co.Func = func() { - http("POST", url, func(resp *Response, err error) { - co.Return(io.R2[*Response, error]{V1: resp, V2: nil}) - }) - } +func AsyncHttpPost(url string) *io.Promise[tuple.Tuple2[*Response, error]] { + co := &io.Promise[tuple.Tuple2[*Response, error]]{} + http("POST", url, func(resp *Response, err error) { + co.Return(tuple.Tuple2[*Response, error]{V1: resp, V2: nil}) + }) + co.Suspend() return co } -func AsyncHttpPostCompiled(url string) *io.Promise[io.R2[*Response, error]] { - P := &io.Promise[io.R2[*Response, error]]{} +func AsyncHttpPostCompiled(url string) *naive.PromiseImpl[tuple.Tuple2[*Response, error]] { + P := &naive.PromiseImpl[tuple.Tuple2[*Response, error]]{} P.Debug = "HttpPost" P.Func = func() { switch P.Next { case 0: P.Next = -1 http("POST", url, func(resp *Response, err error) { - P.Return(io.R2[*Response, error]{V1: resp, V2: nil}) + P.Return(tuple.Tuple2[*Response, error]{V1: resp, V2: nil}) }) return default: @@ -121,43 +124,43 @@ type User struct { Name string } -func GetUser(name string) (co *io.Promise[io.R2[User, error]]) { +func GetUser(name string) (co *naive.PromiseImpl[tuple.Tuple2[User, error]]) { resp, err := AsyncHttpGet("http://example.com/user/" + name).Await().Values() if err != nil { // return User{}, err - co.Return(io.R2[User, error]{V1: User{}, V2: err}) + co.Return(tuple.Tuple2[User, error]{V1: User{}, V2: err}) return } if resp.StatusCode != 200 { // return User{}, fmt.Errorf("http status code: %d", resp.StatusCode) - co.Return(io.R2[User, error]{V1: User{}, V2: fmt.Errorf("http status code: %d", resp.StatusCode)}) + co.Return(tuple.Tuple2[User, error]{V1: User{}, V2: fmt.Errorf("http status code: %d", resp.StatusCode)}) return } body, err := resp.Text().Await().Values() if err != nil { // return User{}, err - co.Return(io.R2[User, error]{V1: User{}, V2: err}) + co.Return(tuple.Tuple2[User, error]{V1: User{}, V2: err}) return } user := User{} if err := json.Unmarshal([]byte(body), &user); err != nil { // return User{}, err - co.Return(io.R2[User, error]{V1: User{}, V2: err}) + co.Return(tuple.Tuple2[User, error]{V1: User{}, V2: err}) return } // return user, nil - co.Return(io.R2[User, error]{V1: user, V2: nil}) + co.Return(tuple.Tuple2[User, error]{V1: user, V2: nil}) return } -func GetUserCompiled(name string) (co *io.Promise[io.R2[User, error]]) { - var state1 *io.Promise[io.R2[*Response, error]] - var state2 *io.Promise[io.R2[string, error]] +func GetUserCompiled(name string) (co *naive.PromiseImpl[tuple.Tuple2[User, error]]) { + var state1 *naive.PromiseImpl[tuple.Tuple2[*Response, error]] + var state2 *naive.PromiseImpl[tuple.Tuple2[string, error]] - co = &io.Promise[io.R2[User, error]]{} + co = &naive.PromiseImpl[tuple.Tuple2[User, error]]{} co.Debug = "GetUser" co.Func = func() { switch co.Next { @@ -173,12 +176,12 @@ func GetUserCompiled(name string) (co *io.Promise[io.R2[User, error]]) { resp, err := state1.Value().Values() log.Printf("resp: %v, err: %v\n", resp, err) if err != nil { - co.Return(io.R2[User, error]{V1: User{}, V2: err}) + co.Return(tuple.Tuple2[User, error]{V1: User{}, V2: err}) return } if resp.StatusCode != 200 { - co.Return(io.R2[User, error]{V1: User{}, V2: fmt.Errorf("http status code: %d", resp.StatusCode)}) + co.Return(tuple.Tuple2[User, error]{V1: User{}, V2: fmt.Errorf("http status code: %d", resp.StatusCode)}) return } @@ -191,18 +194,18 @@ func GetUserCompiled(name string) (co *io.Promise[io.R2[User, error]]) { co.Next = -1 body, err := state2.Value().Values() if err != nil { - co.Return(io.R2[User, error]{V1: User{}, V2: err}) + co.Return(tuple.Tuple2[User, error]{V1: User{}, V2: err}) return } user := User{} log.Printf("body: %v\n", body) if err := json.Unmarshal([]byte(body), &user); err != nil { - co.Return(io.R2[User, error]{V1: User{}, V2: err}) + co.Return(tuple.Tuple2[User, error]{V1: User{}, V2: err}) return } log.Printf("resolve user: %+v\n", user) - co.Return(io.R2[User, error]{V1: user, V2: nil}) + co.Return(tuple.Tuple2[User, error]{V1: user, V2: nil}) return default: panic(fmt.Errorf("Promise already done, %+v", co)) @@ -211,43 +214,43 @@ func GetUserCompiled(name string) (co *io.Promise[io.R2[User, error]]) { return } -func GetScore() (co *io.Promise[io.R2[float64, error]]) { +func GetScore() (co *naive.PromiseImpl[tuple.Tuple2[float64, error]]) { resp, err := AsyncHttpGet("http://example.com/score/").Await().Values() if err != nil { - co.Return(io.R2[float64, error]{V1: 0, V2: err}) + co.Return(tuple.Tuple2[float64, error]{V1: 0, V2: err}) return } if resp.StatusCode != 200 { // return 0, fmt.Errorf("http status code: %d", resp.StatusCode) - co.Return(io.R2[float64, error]{V1: 0, V2: fmt.Errorf("http status code: %d", resp.StatusCode)}) + co.Return(tuple.Tuple2[float64, error]{V1: 0, V2: fmt.Errorf("http status code: %d", resp.StatusCode)}) return } body, err := resp.Text().Await().Values() if err != nil { // return 0, err - co.Return(io.R2[float64, error]{V1: 0, V2: err}) + co.Return(tuple.Tuple2[float64, error]{V1: 0, V2: err}) return } score := 0.0 if _, err := fmt.Sscanf(body, "%f", &score); err != nil { // return 0, err - co.Return(io.R2[float64, error]{V1: 0, V2: err}) + co.Return(tuple.Tuple2[float64, error]{V1: 0, V2: err}) return } // return score, nil - co.Return(io.R2[float64, error]{V1: score, V2: nil}) + co.Return(tuple.Tuple2[float64, error]{V1: score, V2: nil}) return } -func GetScoreCompiled() *io.Promise[io.R2[float64, error]] { - var state1 *io.Promise[io.R2[*Response, error]] - var state2 *io.Promise[io.R2[string, error]] +func GetScoreCompiled() *naive.PromiseImpl[tuple.Tuple2[float64, error]] { + var state1 *naive.PromiseImpl[tuple.Tuple2[*Response, error]] + var state2 *naive.PromiseImpl[tuple.Tuple2[string, error]] - co := &io.Promise[io.R2[float64, error]]{} + co := &naive.PromiseImpl[tuple.Tuple2[float64, error]]{} co.Debug = "GetScore" co.Func = func() { switch co.Next { @@ -263,12 +266,12 @@ func GetScoreCompiled() *io.Promise[io.R2[float64, error]] { resp, err := state1.Value().Values() if err != nil { - co.Return(io.R2[float64, error]{V1: 0, V2: err}) + co.Return(tuple.Tuple2[float64, error]{V1: 0, V2: err}) return } if resp.StatusCode != 200 { - co.Return(io.R2[float64, error]{V1: 0, V2: fmt.Errorf("http status code: %d", resp.StatusCode)}) + co.Return(tuple.Tuple2[float64, error]{V1: 0, V2: fmt.Errorf("http status code: %d", resp.StatusCode)}) return } @@ -282,16 +285,16 @@ func GetScoreCompiled() *io.Promise[io.R2[float64, error]] { co.Next = -1 body, err := state2.Value().Values() if err != nil { - co.Return(io.R2[float64, error]{V1: 0, V2: err}) + co.Return(tuple.Tuple2[float64, error]{V1: 0, V2: err}) return } score := 0.0 if _, err := fmt.Sscanf(body, "%f", &score); err != nil { - co.Return(io.R2[float64, error]{V1: 0, V2: err}) + co.Return(tuple.Tuple2[float64, error]{V1: 0, V2: err}) return } - co.Return(io.R2[float64, error]{V1: score, V2: nil}) + co.Return(tuple.Tuple2[float64, error]{V1: score, V2: nil}) return default: panic("Promise already done") @@ -300,7 +303,7 @@ func GetScoreCompiled() *io.Promise[io.R2[float64, error]] { return co } -func DoUpdate(op string) (co *io.Promise[error]) { +func DoUpdate(op string) (co *naive.PromiseImpl[error]) { resp, err := AsyncHttpPost("http://example.com/update/" + op).Await().Values() if err != nil { co.Return(err) @@ -315,10 +318,10 @@ func DoUpdate(op string) (co *io.Promise[error]) { return } -func DoUpdateCompiled(op string) *io.Promise[error] { - var state1 *io.Promise[io.R2[*Response, error]] +func DoUpdateCompiled(op string) *naive.PromiseImpl[error] { + var state1 *naive.PromiseImpl[tuple.Tuple2[*Response, error]] - co := &io.Promise[error]{} + co := &naive.PromiseImpl[error]{} co.Debug = "DoUpdate" co.Func = func() { switch co.Next { @@ -351,15 +354,15 @@ func DoUpdateCompiled(op string) *io.Promise[error] { return co } -func GenInts() (co *io.Promise[int]) { +func GenInts() (co *naive.PromiseImpl[int]) { co.Yield(3) co.Yield(2) co.Yield(5) return } -func GenIntsCompiled() *io.Promise[int] { - co := &io.Promise[int]{} +func GenIntsCompiled() *naive.PromiseImpl[int] { + co := &naive.PromiseImpl[int]{} co.Debug = "GenInts" co.Func = func() { switch co.Next { @@ -385,7 +388,7 @@ func GenIntsCompiled() *io.Promise[int] { } // Generator with async calls and panic -func GenUsers() (co *io.Promise[User]) { +func GenUsers() (co *naive.PromiseImpl[User]) { u, err := GetUser("Alice").Await().Values() if err != nil { panic(err) @@ -405,10 +408,10 @@ func GenUsers() (co *io.Promise[User]) { return } -func GenUsersCompiled() (resolve *io.Promise[User]) { - var state1, state2, state3 *io.Promise[io.R2[User, error]] +func GenUsersCompiled() (resolve *naive.PromiseImpl[User]) { + var state1, state2, state3 *naive.PromiseImpl[tuple.Tuple2[User, error]] - co := &io.Promise[User]{} + co := &naive.PromiseImpl[User]{} co.Debug = "GenUsers" co.Func = func() { switch co.Next { @@ -469,17 +472,17 @@ func GenUsersCompiled() (resolve *io.Promise[User]) { return co } -func Demo() { +func Demo() (co *io.Promise[io.Void]) { user, err := GetUser("1").Await().Values() log.Println(user, err) - user, err = io.Race[io.R2[User, error]](GetUser("2"), GetUser("3"), GetUser("4")).Value().Values() + user, err = naive.Race[tuple.Tuple2[User, error]](GetUser("2"), GetUser("3"), GetUser("4")).Value().Values() log.Println(user, err) - users := io.All[io.R2[User, error]]([]io.AsyncCall[io.R2[User, error]]{GetUser("5"), GetUser("6"), GetUser("7")}).Value() + users := naive.All[tuple.Tuple2[User, error]]([]naive.AsyncCall[tuple.Tuple2[User, error]]{GetUser("5"), GetUser("6"), GetUser("7")}).Value() log.Println(users, err) - user, score, _ := io.Await3Compiled[User, float64, io.Void](GetUser("8"), GetScore(), DoUpdate("update sth.")).Value().Values() + user, score, _ := naive.Await3Compiled[User, float64, io.Void](GetUser("8"), GetScore(), DoUpdate("update sth.")).Value().Values() log.Println(user, score, err) // for loop with generator @@ -496,8 +499,8 @@ func Demo() { // for u, err := range GenUsers() {...} g1 := GenUsers() for { - g.Call() - u, err := io.Await[int](g) + g1.Call() + u := g1.Await() if g1.Done() { break } @@ -513,17 +516,21 @@ func Demo() { // case <-io.Timeout(5 * time.Second).Chan(): // log.Println("timeout") // } + + log.Println("Demo done") + co.Return(io.Void{}) + return } -func DemoCompiled() *io.Promise[io.Void] { - var state1 *io.Promise[io.R2[User, error]] - var state2 *io.Promise[io.R2[User, error]] - var state3 *io.Promise[[]io.R2[User, error]] - var state4 *io.Promise[io.R3[io.R2[User, error], io.R2[float64, error], error]] - var g1 *io.Promise[int] - var g2 *io.Promise[User] +func DemoCompiled() *naive.PromiseImpl[io.Void] { + var state1 *naive.PromiseImpl[tuple.Tuple2[User, error]] + var state2 *naive.PromiseImpl[tuple.Tuple2[User, error]] + var state3 *naive.PromiseImpl[[]tuple.Tuple2[User, error]] + var state4 *naive.PromiseImpl[tuple.Tuple3[tuple.Tuple2[User, error], tuple.Tuple2[float64, error], error]] + var g1 *naive.PromiseImpl[int] + var g2 *naive.PromiseImpl[User] - P := &io.Promise[io.Void]{} + P := &naive.PromiseImpl[io.Void]{} P.Debug = "Demo" P.Func = func() { switch P.Next { @@ -539,7 +546,7 @@ func DemoCompiled() *io.Promise[io.Void] { user, err := state1.Value().Values() log.Printf("user: %+v, err: %v\n", user, err) - state2 = io.Race[io.R2[User, error]](GetUserCompiled("2"), GetUserCompiled("3"), GetUserCompiled("4")) + state2 = naive.Race[tuple.Tuple2[User, error]](GetUserCompiled("2"), GetUserCompiled("3"), GetUserCompiled("4")) state2.Exec = P.Exec state2.Parent = P state2.Call() @@ -549,7 +556,7 @@ func DemoCompiled() *io.Promise[io.Void] { user, err := state2.Value().Values() log.Printf("race user: %+v, err: %v\n", user, err) - state3 = io.All[io.R2[User, error]]([]io.AsyncCall[io.R2[User, error]]{GetUserCompiled("5"), GetUserCompiled("6"), GetUserCompiled("7")}) + state3 = naive.All[tuple.Tuple2[User, error]]([]naive.AsyncCall[tuple.Tuple2[User, error]]{GetUserCompiled("5"), GetUserCompiled("6"), GetUserCompiled("7")}) state3.Exec = P.Exec state3.Parent = P state3.Call() @@ -560,7 +567,7 @@ func DemoCompiled() *io.Promise[io.Void] { users := state3.Value() log.Println(users) - state4 = io.Await3Compiled[io.R2[User, error], io.R2[float64, error], error](GetUserCompiled("8"), GetScoreCompiled(), DoUpdateCompiled("update sth.")) + state4 = naive.Await3Compiled[tuple.Tuple2[User, error], tuple.Tuple2[float64, error], error](GetUserCompiled("8"), GetScoreCompiled(), DoUpdateCompiled("update sth.")) state4.Exec = P.Exec state4.Parent = P state4.Call() @@ -604,7 +611,13 @@ func DemoCompiled() *io.Promise[io.Void] { func main() { log.SetFlags(log.Lshortfile | log.LstdFlags) - // io.Run(Demo()) - v := io.Run[io.Void](DemoCompiled()) + log.Printf("=========== Run Naive Demo ===========\n") + v := naive.RunImpl[io.Void](DemoCompiled()) log.Println(v) + log.Printf("=========== Run Naive Demo finished ===========\n") + + log.Printf("=========== Run Demo ===========\n") + v1 := Demo() + log.Println(v1) + log.Printf("=========== Run Demo finished ===========\n") } diff --git a/x/io/io.go b/x/io/io.go index 16ae959f..b6afc62e 100644 --- a/x/io/io.go +++ b/x/io/io.go @@ -17,8 +17,7 @@ package io import ( - "log" - "sync" + "unsafe" _ "unsafe" ) @@ -26,174 +25,49 @@ const ( LLGoPackage = "decl" ) -var debugAsync = false +const debugAsync = false type Void = [0]byte -// ----------------------------------------------------------------------------- - -type asyncCall interface { - parent() asyncCall - Resume() - Call() - Done() bool -} - -type AsyncCall[OutT any] interface { - Resume() -} - -type executor struct { - acs []asyncCall - mu sync.Mutex - cond *sync.Cond -} - -func newExecutor() *executor { - e := &executor{} - e.cond = sync.NewCond(&e.mu) - return e -} - -func (e *executor) schedule(ac asyncCall) { - e.mu.Lock() - e.acs = append(e.acs, ac) - e.mu.Unlock() - e.cond.Signal() -} - -func Run[OutT any](ac AsyncCall[OutT]) OutT { - e := newExecutor() - p := ac.(*Promise[OutT]) - p.Exec = e - var rootAc asyncCall = p - e.schedule(rootAc) - - for { - e.mu.Lock() - for len(e.acs) == 0 { - e.cond.Wait() - } - e.mu.Unlock() - ac := e.acs[0] - e.acs = e.acs[1:] - ac.Call() - if ac.Done() && ac == rootAc { - return p.value - } - } -} +type AsyncCall[TOut any] interface{} // ----------------------------------------------------------------------------- -type R1[T any] struct { - V1 T -} - -func (r R1[T]) Values() T { - return r.V1 -} - -type R2[T1 any, T2 any] struct { - V1 T1 - V2 T2 -} - -func (r R2[T1, T2]) Values() (T1, T2) { - return r.V1, r.V2 -} - -type R3[T1 any, T2 any, T3 any] struct { - V1 T1 - V2 T2 - V3 T3 -} - -func (r R3[T1, T2, T3]) Values() (T1, T2, T3) { - return r.V1, r.V2, r.V3 -} - -type R4[T1 any, T2 any, T3 any, T4 any] struct { - V1 T1 - V2 T2 - V3 T3 - V4 T4 -} - -func (r R4[T1, T2, T3, T4]) Values() (T1, T2, T3, T4) { - return r.V1, r.V2, r.V3, r.V4 -} - type Promise[TOut any] struct { - Debug string - Next int - Exec *executor - Parent asyncCall - - Func func() + hdl unsafe.Pointer value TOut - c chan TOut } -func NewPromise[TOut any](fn func()) *Promise[TOut] { - return &Promise[TOut]{Func: fn} -} - -func (p *Promise[TOut]) parent() asyncCall { - return p.Parent -} - -func (p *Promise[TOut]) Resume() { - if debugAsync { - log.Printf("Resume task: %+v\n", p) - } - p.Exec.schedule(p) -} - -func (p *Promise[TOut]) Done() bool { - return p.Next == -1 -} - -func (p *Promise[TOut]) Call() { - p.Func() +// llgo:link PromiseImpl llgo.coAwait +func (p *Promise[TOut]) Await() TOut { + panic("should not executed") } +// llgo:link Return llgo.coReturn func (p *Promise[TOut]) Return(v TOut) { - // TODO(lijie): panic if already resolved - p.value = v - if p.c != nil { - p.c <- v - } - if debugAsync { - log.Printf("Return task: %+v\n", p) - } - if p.Parent != nil { - p.Parent.Resume() - } + panic("should not executed") } +// llgo:link Yield llgo.coYield func (p *Promise[TOut]) Yield(v TOut) { - p.value = v - if debugAsync { - log.Printf("Yield task: %+v\n", p) - } - if p.Parent != nil { - p.Parent.Resume() - } + panic("should not executed") +} + +// llgo:link Suspend llgo.coSuspend +func (p *Promise[TOut]) Suspend() { + panic("should not executed") +} + +// llgo:link Resume llgo.coResume +func (p *Promise[TOut]) Resume() { + panic("should not executed") } func (p *Promise[TOut]) Value() TOut { return p.value } -func (p *Promise[TOut]) Chan() <-chan TOut { - if p.c == nil { - p.c = make(chan TOut, 1) - p.Func() - } - return p.c -} - -func (p *Promise[TOut]) Await() (ret TOut) { - panic("should not called") +// llgo:link Run llgo.coRun +func Run[TOut any](f func() TOut) TOut { + panic("should not executed") } diff --git a/x/io/extra.go b/x/io/naive/extra.go similarity index 74% rename from x/io/extra.go rename to x/io/naive/extra.go index 802a53b1..e92bf37d 100644 --- a/x/io/extra.go +++ b/x/io/naive/extra.go @@ -14,32 +14,27 @@ * limitations under the License. */ -package io +package naive import ( "log" "sync" "time" _ "unsafe" + + "github.com/goplus/llgo/x/io" + "github.com/goplus/llgo/x/tuple" ) // ----------------------------------------------------------------------------- -// llgo:link AsyncCall.Await llgo.await -func Await[OutT any](call AsyncCall[OutT], timeout ...time.Duration) (ret OutT, err error) { - return -} - -//go:linkname Timeout llgo.timeout -func Timeout(time.Duration) (ret AsyncCall[Void]) - -func TimeoutCompiled(d time.Duration) *Promise[Void] { - P := &Promise[Void]{} +func TimeoutCompiled(d time.Duration) *PromiseImpl[io.Void] { + P := &PromiseImpl[io.Void]{} P.Debug = "Timeout" P.Func = func() { go func() { time.Sleep(d) - P.Return(Void{}) + P.Return(io.Void{}) }() } return P @@ -50,17 +45,17 @@ type Result[T any] struct { Err error } -func Race[OutT any](acs ...AsyncCall[OutT]) *Promise[OutT] { +func Race[OutT any](acs ...AsyncCall[OutT]) *PromiseImpl[OutT] { if len(acs) == 0 { panic("race: no promise") } - ps := make([]*Promise[OutT], len(acs)) + ps := make([]*PromiseImpl[OutT], len(acs)) for idx, ac := range acs { - ps[idx] = ac.(*Promise[OutT]) + ps[idx] = ac.(*PromiseImpl[OutT]) } remaining := len(acs) returned := false - P := &Promise[OutT]{} + P := &PromiseImpl[OutT]{} P.Debug = "Race" P.Func = func() { switch P.Next { @@ -100,13 +95,13 @@ func Race[OutT any](acs ...AsyncCall[OutT]) *Promise[OutT] { return P } -func All[OutT any](acs []AsyncCall[OutT]) *Promise[[]OutT] { - ps := make([]*Promise[OutT], len(acs)) +func All[OutT any](acs []AsyncCall[OutT]) *PromiseImpl[[]OutT] { + ps := make([]*PromiseImpl[OutT], len(acs)) for idx, ac := range acs { - ps[idx] = ac.(*Promise[OutT]) + ps[idx] = ac.(*PromiseImpl[OutT]) } done := 0 - P := &Promise[[]OutT]{} + P := &PromiseImpl[[]OutT]{} P.Debug = "All" P.Func = func() { switch P.Next { @@ -150,11 +145,11 @@ func All[OutT any](acs []AsyncCall[OutT]) *Promise[[]OutT] { // llgo:link Await2 llgo.await func Await2Compiled[OutT1, OutT2 any]( ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], - timeout ...time.Duration) (ret *Promise[R3[OutT1, OutT2, error]]) { - p1 := ac1.(*Promise[OutT1]) - p2 := ac2.(*Promise[OutT2]) + timeout ...time.Duration) (ret *PromiseImpl[tuple.Tuple3[OutT1, OutT2, error]]) { + p1 := ac1.(*PromiseImpl[OutT1]) + p2 := ac2.(*PromiseImpl[OutT2]) remaining := 2 - P := &Promise[R3[OutT1, OutT2, error]]{} + P := &PromiseImpl[tuple.Tuple3[OutT1, OutT2, error]]{} P.Debug = "Await2" P.Func = func() { switch P.Next { @@ -178,7 +173,7 @@ func Await2Compiled[OutT1, OutT2 any]( log.Fatalf("io.Await2: not done: %+v, %+v\n", p1, p2) } - P.Return(R3[OutT1, OutT2, error]{ + P.Return(tuple.Tuple3[OutT1, OutT2, error]{ V1: p1.value, V2: p2.value, V3: nil, @@ -194,12 +189,12 @@ func Await2Compiled[OutT1, OutT2 any]( // llgo:link Await2 llgo.await func Await3Compiled[OutT1, OutT2, OutT3 any]( ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3], - timeout ...time.Duration) *Promise[R3[OutT1, OutT2, OutT3]] { - p1 := ac1.(*Promise[OutT1]) - p2 := ac2.(*Promise[OutT2]) - p3 := ac3.(*Promise[OutT3]) + timeout ...time.Duration) *PromiseImpl[tuple.Tuple3[OutT1, OutT2, OutT3]] { + p1 := ac1.(*PromiseImpl[OutT1]) + p2 := ac2.(*PromiseImpl[OutT2]) + p3 := ac3.(*PromiseImpl[OutT3]) remaining := 3 - P := &Promise[R3[OutT1, OutT2, OutT3]]{} + P := &PromiseImpl[tuple.Tuple3[OutT1, OutT2, OutT3]]{} P.Debug = "Await3" P.Func = func() { switch P.Next { @@ -228,7 +223,7 @@ func Await3Compiled[OutT1, OutT2, OutT3 any]( log.Fatalf("io.Await3: not done: %+v, %+v, %+v\n", p1, p2, p3) } - P.Return(R3[OutT1, OutT2, OutT3]{ + P.Return(tuple.Tuple3[OutT1, OutT2, OutT3]{ V1: p1.value, V2: p2.value, V3: p3.value, @@ -241,8 +236,8 @@ func Await3Compiled[OutT1, OutT2, OutT3 any]( return P } -func PAllCompiled[OutT any](acs ...AsyncCall[OutT]) *Promise[[]OutT] { - P := &Promise[[]OutT]{} +func PAllCompiled[OutT any](acs ...AsyncCall[OutT]) *PromiseImpl[[]OutT] { + P := &PromiseImpl[[]OutT]{} P.Debug = "Parallel" P.Func = func() { ret := make([]OutT, len(acs)) @@ -252,7 +247,7 @@ func PAllCompiled[OutT any](acs ...AsyncCall[OutT]) *Promise[[]OutT] { ac := ac wg.Add(1) go func(ac AsyncCall[OutT]) { - v := Run[OutT](ac) + v := RunImpl[OutT](ac) ret[idx] = v wg.Done() }(ac) @@ -264,23 +259,23 @@ func PAllCompiled[OutT any](acs ...AsyncCall[OutT]) *Promise[[]OutT] { } func PAwait3Compiled[OutT1, OutT2, OutT3 any]( - ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3]) *Promise[R4[OutT1, OutT2, OutT3, error]] { - P := &Promise[R4[OutT1, OutT2, OutT3, error]]{} + ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3]) *PromiseImpl[tuple.Tuple4[OutT1, OutT2, OutT3, error]] { + P := &PromiseImpl[tuple.Tuple4[OutT1, OutT2, OutT3, error]]{} P.Debug = "PAwait3" P.Func = func() { - ret := R4[OutT1, OutT2, OutT3, error]{} + ret := tuple.Tuple4[OutT1, OutT2, OutT3, error]{} wg := sync.WaitGroup{} wg.Add(3) go func() { - ret.V1 = Run[OutT1](ac1) + ret.V1 = RunImpl[OutT1](ac1) wg.Done() }() go func() { - ret.V2 = Run[OutT2](ac2) + ret.V2 = RunImpl[OutT2](ac2) wg.Done() }() go func() { - ret.V3 = Run[OutT3](ac3) + ret.V3 = RunImpl[OutT3](ac3) wg.Done() }() wg.Wait() diff --git a/x/io/naive/naive.go b/x/io/naive/naive.go new file mode 100644 index 00000000..a8de2bd9 --- /dev/null +++ b/x/io/naive/naive.go @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package naive + +import ( + "log" + "sync" +) + +const debugAsync = false + +// ----------------------------------------------------------------------------- + +type asyncCall interface { + parent() asyncCall + Resume() + Call() + Done() bool +} + +type AsyncCall[OutT any] interface { + Resume() +} + +type executor struct { + acs []asyncCall + mu sync.Mutex + cond *sync.Cond +} + +func newExecutor() *executor { + e := &executor{} + e.cond = sync.NewCond(&e.mu) + return e +} + +func (e *executor) schedule(ac asyncCall) { + e.mu.Lock() + e.acs = append(e.acs, ac) + e.mu.Unlock() + e.cond.Signal() +} + +func RunImpl[OutT any](ac AsyncCall[OutT]) OutT { + e := newExecutor() + p := ac.(*PromiseImpl[OutT]) + p.Exec = e + var rootAc asyncCall = p + e.schedule(rootAc) + + for { + e.mu.Lock() + for len(e.acs) == 0 { + e.cond.Wait() + } + e.mu.Unlock() + ac := e.acs[0] + e.acs = e.acs[1:] + ac.Call() + if ac.Done() && ac == rootAc { + return p.value + } + } +} + +// ----------------------------------------------------------------------------- + +type PromiseImpl[TOut any] struct { + Debug string + Next int + Exec *executor + Parent asyncCall + + Func func() + value TOut + c chan TOut +} + +func (p *PromiseImpl[TOut]) parent() asyncCall { + return p.Parent +} + +func (p *PromiseImpl[TOut]) Resume() { + if debugAsync { + log.Printf("Resume task: %+v\n", p) + } + p.Exec.schedule(p) +} + +func (p *PromiseImpl[TOut]) Done() bool { + return p.Next == -1 +} + +func (p *PromiseImpl[TOut]) Call() { + p.Func() +} + +func (p *PromiseImpl[TOut]) Suspend() { + +} + +func (p *PromiseImpl[TOut]) Return(v TOut) { + // TODO(lijie): panic if already resolved + p.value = v + if p.c != nil { + p.c <- v + } + if debugAsync { + log.Printf("Return task: %+v\n", p) + } + if p.Parent != nil { + p.Parent.Resume() + } +} + +func (p *PromiseImpl[TOut]) Yield(v TOut) { + p.value = v + if debugAsync { + log.Printf("Yield task: %+v\n", p) + } + if p.Parent != nil { + p.Parent.Resume() + } +} + +func (p *PromiseImpl[TOut]) Value() TOut { + return p.value +} + +func (p *PromiseImpl[TOut]) Chan() <-chan TOut { + if p.c == nil { + p.c = make(chan TOut, 1) + p.Func() + } + return p.c +} + +func (p *PromiseImpl[TOut]) Await() (ret TOut) { + panic("should not called") +} diff --git a/x/tuple/tuple.go b/x/tuple/tuple.go new file mode 100644 index 00000000..4ebd3071 --- /dev/null +++ b/x/tuple/tuple.go @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tuple + +// ----------------------------------------------------------------------------- + +type Tuple1[T any] struct { + V1 T +} + +func (r Tuple1[T]) Values() T { + return r.V1 +} + +type Tuple2[T1 any, T2 any] struct { + V1 T1 + V2 T2 +} + +func (r Tuple2[T1, T2]) Values() (T1, T2) { + return r.V1, r.V2 +} + +type Tuple3[T1 any, T2 any, T3 any] struct { + V1 T1 + V2 T2 + V3 T3 +} + +func (r Tuple3[T1, T2, T3]) Values() (T1, T2, T3) { + return r.V1, r.V2, r.V3 +} + +type Tuple4[T1 any, T2 any, T3 any, T4 any] struct { + V1 T1 + V2 T2 + V3 T3 + V4 T4 +} + +func (r Tuple4[T1, T2, T3, T4]) Values() (T1, T2, T3, T4) { + return r.V1, r.V2, r.V3, r.V4 +}