From eb7a94bb552f7ec890c47d522531d87c58f017c1 Mon Sep 17 00:00:00 2001 From: Li Jie Date: Tue, 23 Jul 2024 15:58:37 +0800 Subject: [PATCH 1/5] *io.Promise --- x/io/README.md | 116 +++++++++++++++++++++++++++++----- x/io/_demo/asyncdemo/async.go | 107 +++++++++++++++++++++++++++---- x/io/io.go | 29 ++++++++- 3 files changed, 222 insertions(+), 30 deletions(-) diff --git a/x/io/README.md b/x/io/README.md index f5585c58..79d76666 100644 --- a/x/io/README.md +++ b/x/io/README.md @@ -36,9 +36,8 @@ async function asyncCall(): Promise { return `AsyncCall: ${result}`; } -async function asyncCall2(): Promise { - const result = await resolveAfter1Second(); - return `AsyncCall2: ${result}`; +function asyncCall2(): Promise { + return resolveAfter1Second(); } function asyncCall3(): void { @@ -92,9 +91,8 @@ async def async_call() -> str: result = await resolve_after_1_second() return f"AsyncCall: {result}" -async def async_call2() -> str: - result = await resolve_after_1_second() - return f"AsyncCall2: {result}" +def async_call2() -> asyncio.Task: + return resolve_after_1_second() def async_call3() -> None: asyncio.create_task(print_after_1_second()) @@ -139,7 +137,9 @@ async fn name(param0: Type) -> ReturnType { Example: ```rust -use tokio::time::{sleep, Duration}; +use std::time::Duration; +use tokio::time::sleep; +use std::future::Future; async fn resolve_after_1_second() -> String { sleep(Duration::from_secs(1)).await; @@ -151,9 +151,8 @@ async fn async_call() -> String { format!("AsyncCall: {}", result) } -async fn async_call2() -> String { - let result = resolve_after_1_second().await; - format!("AsyncCall2: {}", result) +fn async_call2() -> impl Future { + resolve_after_1_second() } fn async_call3() { @@ -177,7 +176,7 @@ async fn main() { async_call3(); // Wait for AsyncCall3 to complete - sleep(Duration::from_secs(1)).await; + sleep(Duration::from_secs(2)).await; println!("Main function completed"); } @@ -216,10 +215,9 @@ class Program return $"AsyncCall: {result}"; } - static async Task AsyncCall2() + static Task AsyncCall2() { - string result = await ResolveAfter1Second(); - return $"AsyncCall2: {result}"; + return ResolveAfter1Second(); } static void AsyncCall3() @@ -286,8 +284,7 @@ cppcoro::task asyncCall() { } cppcoro::task asyncCall2() { - auto result = co_await resolveAfter1Second(); - co_return "AsyncCall2: " + result; + return resolveAfter1Second(); } cppcoro::task asyncCall3() { @@ -323,3 +320,90 @@ int main() { return 0; } ``` + +## Common concepts + +### Promise, Future, Task, and Coroutine + +- **Promise**: An object that represents the eventual completion (or failure) of an asynchronous operation and its resulting value. It is used to produce a value that will be consumed by a `Future`. + +- **Future**: An object that represents the result of an asynchronous operation. It is used to obtain the value produced by a `Promise`. + +- **Task**: A unit of work that can be scheduled and executed asynchronously. It is a higher-level abstraction that combines a `Promise` and a `Future`. + +- **Coroutine**: A special type of function that can suspend its execution and return control to the caller without losing its state. It can be resumed later, allowing for asynchronous programming. + +### `async`, `await` and similar keywords + +- **`async`**: A keyword used to define a function that returns a `Promise` or `Task`. It allows the function to pause its execution and resume later. + +- **`await`**: A keyword used to pause the execution of an `async` function until a `Promise` or `Task` is resolved. It unwraps the value of the `Promise` or `Task` and allows the function to continue. + +- **`co_return`**: A keyword used in C++ coroutines to return a value from a coroutine. It is similar to `return` but is used in coroutines to indicate that the coroutine has completed. It's similar to `return` in `async` functions in other languages that boxes the value into a `Promise` or `Task`. + +`async/await` and similar constructs provide a more readable and synchronous-like way of writing asynchronous code, it hides the type of `Promise`/`Future`/`Task` from the user and allows them to focus on the logic of the code. + +### Executing Multiple Async Operations Concurrently + +To run multiple promises concurrently, JavaScript provides `Promise.all`, `Promise.allSettled` and `Promise.any`, Python provides `asyncio.gather`, Rust provides `tokio::try_join`, C# provides `Task.WhenAll`, and C++ provides `cppcoro::when_all`. + +In some situations, you may want to get the first result of multiple async operations. JavaScript provides `Promise.race` to get the first result of multiple promises. Python provides `asyncio.wait` to get the first result of multiple coroutines. Rust provides `tokio::select!` to get the first result of multiple futures. C# provides `Task.WhenAny` to get the first result of multiple tasks. C++ provides `cppcoro::when_any` to get the first result of multiple tasks. Those functions are very simular to `select` in Go. + +### Error Handling + +`await` commonly unwraps the value of a `Promise` or `Task`, but it also propagates errors. If the `Promise` or `Task` is rejected or throws an error, the error will be thrown in the `async` function by the `await` keyword. You can use `try/catch` blocks to handle errors in `async` functions. + +## Common patterns + +- `async` keyword hides the types of `Promise`/`Future`/`Task` in the function signature in Python and Rust, but not in JavaScript, C#, and C++. +- `await` keyword unwraps the value of a `Promise`/`Future`/`Task`. +- `return` keyword boxes the value into a `Promise`/`Future`/`Task` if it's not already. + +## Design considerations in LLGo + +- Don't introduce `async`/`await` keywords to compatible with Go compiler. +- For performance reason don't implement async functions with goroutines + +## Design + +```go +func resolveAfter1Second() Promise[string] { + return Async(func (context) { + context.ScheduleAfter(1 * time.Second, func() { + context.Resolve("Resolved after 1 second") + }) + }) +} + +func asyncCall() Promise[string] { + return Async(resolveAfter1Second().Await()) +} + +func asyncCall2() Promise[string] { + return resolveAfter1Second() +} + +func asyncCall3() { + resolveAfter1Second().Then(func(result string) { + fmt.Println("AsyncCall3: " + result) + }) +} + +func asyncMain() { + fmt.Println("Starting AsyncCall") + result1 := asyncCall().Await() + fmt.Println(result1) + + fmt.Println("Starting AsyncCall2") + result2 := asyncCall2().Await() + fmt.Println(result2) + + fmt.Println("Starting AsyncCall3") + asyncCall3() + + // Wait for AsyncCall3 to complete + time.Sleep(2 * time.Second) + + fmt.Println("Main function completed") +} +``` diff --git a/x/io/_demo/asyncdemo/async.go b/x/io/_demo/asyncdemo/async.go index 6d5362ae..ce972fe7 100644 --- a/x/io/_demo/asyncdemo/async.go +++ b/x/io/_demo/asyncdemo/async.go @@ -1,23 +1,108 @@ package main import ( + "encoding/json" "fmt" "time" "github.com/goplus/llgo/x/io" ) -var GetUser = io.Async[any](func(string) any { - panic("todo: GetUser") -}) +type Response struct { + StatusCode int -var GetScore = io.Async[float64](func() float64 { + mockBody string +} + +func (r *Response) mock(body string) { + r.mockBody = body +} + +func (r *Response) Text() *io.Promise[string] { + return io.NewPromise[string](func(resolve func(string, error)) { + resolve(r.mockBody, nil) + }) +} + +func HttpGet(url string, callback func(resp *Response, err error)) { + panic("todo: Get") +} + +func AsyncHttpGet(url string) io.AsyncCall[*Response] { + return io.NewPromise[*Response](func(resolve func(*Response, error)) { + HttpGet(url, resolve) + }) +} + +type User struct { + Name string +} + +func GetUser(uid string) io.AsyncCall[User] { + return io.NewPromise[User](func(resolve func(User, error)) { + resp, err := io.Await(AsyncHttpGet("http://example.com/user/" + uid)) + if err != nil { + resolve(User{}, err) + return + } + + if resp.StatusCode != 200 { + resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode)) + return + } + + resp.mock(`{"name":"Alice"}`) + + body, err := io.Await[string](resp.Text()) + if err != nil { + resolve(User{}, err) + return + } + user := User{} + if err := json.Unmarshal([]byte(body), &user); err != nil { + resolve(User{}, err) + return + } + + resolve(user, nil) + }) +} + +// func GetUser1(uid string) (resolve io.AsyncCall[User]) { +// resp, err := io.Await(AsyncHttpGet("http://example.com/user/" + uid)) +// if err != nil { +// resolve(User{}, err) +// return +// } + +// if resp.StatusCode != 200 { +// resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode)) +// return +// } + +// resp.mock(`{"name":"Alice"}`) + +// body, err := io.Await[string](resp.Text()) +// if err != nil { +// resolve(User{}, err) +// return +// } +// user := User{} +// if err := json.Unmarshal([]byte(body), &user); err != nil { +// resolve(User{}, err) +// return +// } + +// resolve(user, nil) +// } + +func GetScore() *io.Promise[float64] { panic("todo: GetScore") -}) +} -var DoUpdate = io.Async[io.Void](func(op string) io.Void { +func DoUpdate(op string) *io.Promise[io.Void] { panic("todo: DoUpdate") -}) +} func main() { user, err := GetUser("123").Await() @@ -26,15 +111,15 @@ func main() { user, err = io.Race(GetUser("123"), GetUser("456"), GetUser("789")).Await() fmt.Println(user, err) - user, score, _, err := io.Await3[any, float64, io.Void](GetUser("123"), GetScore(), DoUpdate("update sth.")) + user, score, _, err := io.Await3[User, float64, io.Void](GetUser("123"), GetScore(), DoUpdate("update sth.")) fmt.Println(user, score, err) select { - case user := <-GetUser("123"): + case user := <-GetUser("123").Chan(): fmt.Println("user:", user) - case score := <-GetScore(): + case score := <-GetScore().Chan(): fmt.Println("score:", score) - case <-io.Timeout(5 * time.Second): + case <-io.Timeout(5 * time.Second).Chan(): fmt.Println("timeout") } } diff --git a/x/io/io.go b/x/io/io.go index 8748673d..7ceb650b 100644 --- a/x/io/io.go +++ b/x/io/io.go @@ -30,10 +30,13 @@ type Void = [0]byte // ----------------------------------------------------------------------------- -type AsyncCall[OutT any] chan OutT +type AsyncCall[OutT any] interface { + Await(timeout ...time.Duration) (ret OutT, err error) + Chan() <-chan OutT +} // llgo:link AsyncCall.Await llgo.await -func (AsyncCall[OutT]) Await(timeout ...time.Duration) (ret OutT, err error) { +func Await[OutT any](call AsyncCall[OutT], timeout ...time.Duration) (ret OutT, err error) { return } @@ -61,7 +64,27 @@ func Await3[OutT1, OutT2, OutT3 any]( // ----------------------------------------------------------------------------- -type Promise[OutT any] func(...any) AsyncCall[OutT] +type Promise[OutT any] struct { +} + +func NewPromise[OutT any](fn func(resolve func(OutT, error))) (ret *Promise[OutT]) { + ret = &Promise[OutT]{} + return +} + +func NewPromiseFromValue[OutT any](value OutT) (ret *Promise[OutT]) { + return NewPromise[OutT](func(resolve func(OutT, error)) { + resolve(value, nil) + }) +} + +func (p *Promise[OutT]) Await(timeout ...time.Duration) (ret OutT, err error) { + return +} + +func (p *Promise[OutT]) Chan() <-chan OutT { + return nil +} // llgo:link Async llgo.async func Async[OutT any](fn any) (ret Promise[OutT]) { From 7230e191666990dce1f35181ca6be4ce56d8311b Mon Sep 17 00:00:00 2001 From: Li Jie Date: Tue, 23 Jul 2024 17:02:40 +0800 Subject: [PATCH 2/5] asyncio: redesign --- x/io/README.md | 68 ++++++++++++++++++++++--- x/io/_demo/asyncdemo/async.go | 94 ++++++++++++----------------------- x/io/io.go | 28 +++-------- 3 files changed, 102 insertions(+), 88 deletions(-) diff --git a/x/io/README.md b/x/io/README.md index 79d76666..1ad9dad7 100644 --- a/x/io/README.md +++ b/x/io/README.md @@ -366,23 +366,77 @@ In some situations, you may want to get the first result of multiple async opera ## Design +Introduce `Promise` type to represent the eventual completion of an asynchronous operation and its resulting value. `Promise` can be resolved with a value or rejected with an error. `Promise` can be awaited to get the value or error. + +`Promise` just a type indicating the result of an asynchronous operation, it injected by the LLGo compiler, and the user can't create a `Promise` directly. + ```go -func resolveAfter1Second() Promise[string] { - return Async(func (context) { - context.ScheduleAfter(1 * time.Second, func() { - context.Resolve("Resolved after 1 second") - }) +// Some native async functions +func timeoutAsync(d time.Duration, cb func()) { + go func() { + time.Sleep(d) + cb() + }() +} + +// Wrap callback-based async function into Promise +func resolveAfter1Second() (resolve Promise[string]) { + timeoutAsync(1 * time.Second, func() { + resolve("Resolved after 1 second", nil) }) } -func asyncCall() Promise[string] { - return Async(resolveAfter1Second().Await()) +// Compiled to: +func resolveAfter1Second() (resolve PromiseImpl[string]) { + promise := io.NewPromiseImpl[string](resolve func(value string, err error) { + resolve: func(value string, err error) { + for true { + switch (promise.prev = promise.next) { + case 0: + timeoutAsync(1 * time.Second, func() { + resolve("Resolved after 1 second", nil) + }) + } + } + }, + } + return promise } +func asyncCall() (resolve Promise[string]) { + str, err := resolveAfter1Second().Await() + resolve("AsyncCall: " + str, err) +} + +// Compiled to: +func asyncCall() (resolve PromiseImpl[string]) { + promise := io.NewPromiseImpl[string](resolve func(value string, err error) { + for true { + switch (promise.prev = promise.next) { + case 0: + resolveAfter1Second() + return + case 1: + str, err := promise.value, promise.err + resolve("AsyncCall: " + str, err) + return + } + } + }) + return promise +} + +// Directly return Promise func asyncCall2() Promise[string] { return resolveAfter1Second() } +// Compiled to: +func asyncCall2() PromiseImpl[string] { + return resolveAfter1Second() +} + +// Don't wait for Promise to complete func asyncCall3() { resolveAfter1Second().Then(func(result string) { fmt.Println("AsyncCall3: " + result) diff --git a/x/io/_demo/asyncdemo/async.go b/x/io/_demo/asyncdemo/async.go index ce972fe7..b29751f8 100644 --- a/x/io/_demo/asyncdemo/async.go +++ b/x/io/_demo/asyncdemo/async.go @@ -18,84 +18,53 @@ func (r *Response) mock(body string) { r.mockBody = body } -func (r *Response) Text() *io.Promise[string] { - return io.NewPromise[string](func(resolve func(string, error)) { - resolve(r.mockBody, nil) - }) +func (r *Response) Text() (resolve io.Promise[string]) { + resolve(r.mockBody, nil) + return } func HttpGet(url string, callback func(resp *Response, err error)) { panic("todo: Get") } -func AsyncHttpGet(url string) io.AsyncCall[*Response] { - return io.NewPromise[*Response](func(resolve func(*Response, error)) { - HttpGet(url, resolve) - }) +func AsyncHttpGet(url string) (resolve io.Promise[*Response]) { + HttpGet(url, resolve) + return } type User struct { Name string } -func GetUser(uid string) io.AsyncCall[User] { - return io.NewPromise[User](func(resolve func(User, error)) { - resp, err := io.Await(AsyncHttpGet("http://example.com/user/" + uid)) - if err != nil { - resolve(User{}, err) - return - } +func GetUser(uid string) (resolve io.Promise[User]) { + resp, err := io.Await[*Response](AsyncHttpGet("http://example.com/user/" + uid)) + if err != nil { + resolve(User{}, err) + return + } - if resp.StatusCode != 200 { - resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode)) - return - } + if resp.StatusCode != 200 { + resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode)) + return + } - resp.mock(`{"name":"Alice"}`) + resp.mock(`{"name":"Alice"}`) - body, err := io.Await[string](resp.Text()) - if err != nil { - resolve(User{}, err) - return - } - user := User{} - if err := json.Unmarshal([]byte(body), &user); err != nil { - resolve(User{}, err) - return - } + body, err := io.Await[string](resp.Text()) + if err != nil { + resolve(User{}, err) + return + } + user := User{} + if err := json.Unmarshal([]byte(body), &user); err != nil { + resolve(User{}, err) + return + } - resolve(user, nil) - }) + resolve(user, nil) + return } -// func GetUser1(uid string) (resolve io.AsyncCall[User]) { -// resp, err := io.Await(AsyncHttpGet("http://example.com/user/" + uid)) -// if err != nil { -// resolve(User{}, err) -// return -// } - -// if resp.StatusCode != 200 { -// resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode)) -// return -// } - -// resp.mock(`{"name":"Alice"}`) - -// body, err := io.Await[string](resp.Text()) -// if err != nil { -// resolve(User{}, err) -// return -// } -// user := User{} -// if err := json.Unmarshal([]byte(body), &user); err != nil { -// resolve(User{}, err) -// return -// } - -// resolve(user, nil) -// } - func GetScore() *io.Promise[float64] { panic("todo: GetScore") } @@ -108,9 +77,12 @@ func main() { user, err := GetUser("123").Await() fmt.Println(user, err) - user, err = io.Race(GetUser("123"), GetUser("456"), GetUser("789")).Await() + user, err = io.Race[User](GetUser("123"), GetUser("456"), GetUser("789")).Await() fmt.Println(user, err) + users, err := io.All[User]([]io.AsyncCall[User]{GetUser("123"), GetUser("456"), GetUser("789")}).Await() + fmt.Println(users, err) + user, score, _, err := io.Await3[User, float64, io.Void](GetUser("123"), GetScore(), DoUpdate("update sth.")) fmt.Println(user, score, err) diff --git a/x/io/io.go b/x/io/io.go index 7ceb650b..fde7428f 100644 --- a/x/io/io.go +++ b/x/io/io.go @@ -48,6 +48,10 @@ func Race[OutT any](acs ...AsyncCall[OutT]) (ret AsyncCall[OutT]) { return } +func All[OutT any](acs []AsyncCall[OutT]) (ret AsyncCall[[]OutT]) { + return nil +} + // llgo:link Await2 llgo.await func Await2[OutT1, OutT2 any]( ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], @@ -64,31 +68,15 @@ func Await3[OutT1, OutT2, OutT3 any]( // ----------------------------------------------------------------------------- -type Promise[OutT any] struct { -} +type Promise[OutT any] func(OutT, error) -func NewPromise[OutT any](fn func(resolve func(OutT, error))) (ret *Promise[OutT]) { - ret = &Promise[OutT]{} +// llgo:link Promise.Await llgo.await +func (p Promise[OutT]) Await(timeout ...time.Duration) (ret OutT, err error) { return } -func NewPromiseFromValue[OutT any](value OutT) (ret *Promise[OutT]) { - return NewPromise[OutT](func(resolve func(OutT, error)) { - resolve(value, nil) - }) -} - -func (p *Promise[OutT]) Await(timeout ...time.Duration) (ret OutT, err error) { - return -} - -func (p *Promise[OutT]) Chan() <-chan OutT { +func (p Promise[OutT]) Chan() <-chan OutT { return nil } -// llgo:link Async llgo.async -func Async[OutT any](fn any) (ret Promise[OutT]) { - return -} - // ----------------------------------------------------------------------------- From e1109e9e5175e740946ec7a78c9af941a99ab46e Mon Sep 17 00:00:00 2001 From: Li Jie Date: Tue, 23 Jul 2024 17:25:42 +0800 Subject: [PATCH 3/5] asyncio: doc update --- x/io/README.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/x/io/README.md b/x/io/README.md index 1ad9dad7..9681b3ab 100644 --- a/x/io/README.md +++ b/x/io/README.md @@ -361,14 +361,15 @@ In some situations, you may want to get the first result of multiple async opera ## Design considerations in LLGo -- Don't introduce `async`/`await` keywords to compatible with Go compiler. +- Don't introduce `async`/`await` keywords to compatible with Go compiler (just compiling) - For performance reason don't implement async functions with goroutines +- Avoid implementing `Promise` by using `chan` to avoid blocking the thread, but it can be wrapped as a `chan` to make it compatible `select` statement ## Design -Introduce `Promise` type to represent the eventual completion of an asynchronous operation and its resulting value. `Promise` can be resolved with a value or rejected with an error. `Promise` can be awaited to get the value or error. +Introduce `Promise` type to represent an asynchronous operation and its resulting value. `Promise` can be resolved with a value with an error. `Promise` can be awaited to get the value and error. -`Promise` just a type indicating the result of an asynchronous operation, it injected by the LLGo compiler, and the user can't create a `Promise` directly. +`Promise` just a type indicating the asynchronous operation, it can't be created and assigned directly. It be replaced to `PromiseImpl` by the LLGo compiler. ```go // Some native async functions From ab1afd68b7419ea60cdaedee46323d7a69ac3b32 Mon Sep 17 00:00:00 2001 From: Li Jie Date: Tue, 23 Jul 2024 17:33:01 +0800 Subject: [PATCH 4/5] asyncio: instead io.Await(call) with call.Await() in demo --- x/io/_demo/asyncdemo/async.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x/io/_demo/asyncdemo/async.go b/x/io/_demo/asyncdemo/async.go index b29751f8..171301a9 100644 --- a/x/io/_demo/asyncdemo/async.go +++ b/x/io/_demo/asyncdemo/async.go @@ -37,7 +37,7 @@ type User struct { } func GetUser(uid string) (resolve io.Promise[User]) { - resp, err := io.Await[*Response](AsyncHttpGet("http://example.com/user/" + uid)) + resp, err := AsyncHttpGet("http://example.com/user/" + uid).Await() if err != nil { resolve(User{}, err) return @@ -50,7 +50,7 @@ func GetUser(uid string) (resolve io.Promise[User]) { resp.mock(`{"name":"Alice"}`) - body, err := io.Await[string](resp.Text()) + body, err := resp.Text().Await() if err != nil { resolve(User{}, err) return From 89bdb315d5a972a2a09556cf1afff926bfa16cfe Mon Sep 17 00:00:00 2001 From: Li Jie Date: Wed, 24 Jul 2024 01:01:09 +0800 Subject: [PATCH 5/5] WIP --- x/io/_demo/asyncdemo/async.go | 211 ++++++++++++++++++++++++++++++++-- x/io/io.go | 92 ++++++++++++++- 2 files changed, 292 insertions(+), 11 deletions(-) diff --git a/x/io/_demo/asyncdemo/async.go b/x/io/_demo/asyncdemo/async.go index 171301a9..02b4475a 100644 --- a/x/io/_demo/asyncdemo/async.go +++ b/x/io/_demo/asyncdemo/async.go @@ -3,11 +3,14 @@ package main import ( "encoding/json" "fmt" + "log" "time" "github.com/goplus/llgo/x/io" ) +// ----------------------------------------------------------------------------- + type Response struct { StatusCode int @@ -23,8 +26,26 @@ func (r *Response) Text() (resolve io.Promise[string]) { return } +func (r *Response) TextCompiled() *io.PromiseImpl[string] { + P := &io.PromiseImpl[string]{} + P.Func = func(resolve func(string, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + resolve(r.mockBody, nil) + P.Next = -1 + return + default: + panic("Promise already done") + } + } + } + return P +} + func HttpGet(url string, callback func(resp *Response, err error)) { - panic("todo: Get") + resp := &Response{StatusCode: 200} + callback(resp, nil) } func AsyncHttpGet(url string) (resolve io.Promise[*Response]) { @@ -32,6 +53,25 @@ func AsyncHttpGet(url string) (resolve io.Promise[*Response]) { return } +func AsyncHttpGetCompiled(url string) *io.PromiseImpl[*Response] { + P := &io.PromiseImpl[*Response]{} + P.Func = func(resolve func(*Response, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + HttpGet(url, resolve) + P.Next = -1 + return + default: + panic("Promise already done") + } + } + } + return P +} + +// ----------------------------------------------------------------------------- + type User struct { Name string } @@ -65,33 +105,186 @@ func GetUser(uid string) (resolve io.Promise[User]) { return } +func GetUserCompiled(uid string) *io.PromiseImpl[User] { + var state1 *io.PromiseImpl[*Response] + var state2 *io.PromiseImpl[string] + + P := &io.PromiseImpl[User]{} + P.Func = func(resolve func(User, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + state1 = AsyncHttpGetCompiled("http://example.com/user/" + uid) + P.Next = 1 + return + case 1: + state1.EnsureDone() + resp, err := state1.Value, state1.Err + if err != nil { + resolve(User{}, err) + return + } + + if resp.StatusCode != 200 { + resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode)) + return + } + + resp.mock(`{"name":"Alice"}`) + + state2 = resp.TextCompiled() + P.Next = 2 + return + case 2: + state2.EnsureDone() + body, err := state2.Value, state2.Err + if err != nil { + resolve(User{}, err) + return + } + user := User{} + if err := json.Unmarshal([]byte(body), &user); err != nil { + resolve(User{}, err) + return + } + + resolve(user, nil) + P.Next = -1 + return + default: + panic("Promise already done") + } + } + } + return P +} + func GetScore() *io.Promise[float64] { panic("todo: GetScore") } +func GetScoreCompiled() *io.PromiseImpl[float64] { + P := &io.PromiseImpl[float64]{} + P.Func = func(resolve func(float64, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + panic("todo: GetScore") + default: + panic("Promise already done") + } + } + } + return P +} + func DoUpdate(op string) *io.Promise[io.Void] { panic("todo: DoUpdate") } -func main() { +func DoUpdateCompiled(op string) *io.PromiseImpl[io.Void] { + P := &io.PromiseImpl[io.Void]{} + P.Func = func(resolve func(io.Void, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + panic("todo: DoUpdate") + default: + panic("Promise already done") + } + } + } + return P +} + +func Demo() (resolve io.Promise[io.Void]) { user, err := GetUser("123").Await() - fmt.Println(user, err) + log.Println(user, err) user, err = io.Race[User](GetUser("123"), GetUser("456"), GetUser("789")).Await() - fmt.Println(user, err) + log.Println(user, err) users, err := io.All[User]([]io.AsyncCall[User]{GetUser("123"), GetUser("456"), GetUser("789")}).Await() - fmt.Println(users, err) + log.Println(users, err) user, score, _, err := io.Await3[User, float64, io.Void](GetUser("123"), GetScore(), DoUpdate("update sth.")) - fmt.Println(user, score, err) + log.Println(user, score, err) + // TODO(lijie): select from multiple promises without channel select { case user := <-GetUser("123").Chan(): - fmt.Println("user:", user) + log.Println("user:", user) case score := <-GetScore().Chan(): - fmt.Println("score:", score) + log.Println("score:", score) case <-io.Timeout(5 * time.Second).Chan(): - fmt.Println("timeout") + log.Println("timeout") } + return +} + +func DemoCompiled() *io.PromiseImpl[io.Void] { + var state1 *io.PromiseImpl[User] + var state2 *io.PromiseImpl[User] + var state3 *io.PromiseImpl[[]User] + var state4 *io.PromiseImpl[io.Await3Result[User, float64, io.Void]] + + P := &io.PromiseImpl[io.Void]{} + P.Func = func(resolve func(io.Void, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + state1 = GetUserCompiled("123") + P.Next = 1 + return + case 1: + state1.EnsureDone() + user, err := state1.Value, state1.Err + log.Printf("user: %v, err: %v\n", user, err) + + state2 = io.Race[User](GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789")) + P.Next = 2 + return + case 2: + state2.EnsureDone() + user, err := state2.Value, state2.Err + log.Println(user, err) + + state3 = io.All[User]([]io.AsyncCall[User]{GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789")}) + P.Next = 3 + return + case 3: + state3.EnsureDone() + users, err := state3.Value, state3.Err + log.Println(users, err) + + state4 = io.Await3Compiled[User, float64, io.Void](GetUserCompiled("123"), GetScoreCompiled(), DoUpdateCompiled("update sth.")) + P.Next = 4 + return + case 4: + state4.EnsureDone() + user, score, _, err := state4.Value.V1, state4.Value.V2, state4.Value.V3, state4.Value.Err + log.Println(user, score, err) + + select { + case user := <-GetUserCompiled("123").Chan(): + log.Println("user:", user) + case score := <-GetScoreCompiled().Chan(): + log.Println("score:", score) + case <-io.TimeoutCompiled(5 * time.Second).Chan(): + log.Println("timeout") + } + P.Next = -1 + return + default: + panic("Promise already done") + } + } + } + return P +} + +func main() { + log.SetFlags(log.Lshortfile | log.LstdFlags) + // io.Run(Demo()) + io.Run(DemoCompiled()) } diff --git a/x/io/io.go b/x/io/io.go index fde7428f..f4b4c18b 100644 --- a/x/io/io.go +++ b/x/io/io.go @@ -33,6 +33,7 @@ type Void = [0]byte type AsyncCall[OutT any] interface { Await(timeout ...time.Duration) (ret OutT, err error) Chan() <-chan OutT + EnsureDone() } // llgo:link AsyncCall.Await llgo.await @@ -43,12 +44,23 @@ func Await[OutT any](call AsyncCall[OutT], timeout ...time.Duration) (ret OutT, //go:linkname Timeout llgo.timeout func Timeout(time.Duration) (ret AsyncCall[Void]) +func TimeoutCompiled(d time.Duration) *PromiseImpl[Void] { + P := &PromiseImpl[Void]{} + P.Func = func(resolve func(Void, error)) { + go func() { + time.Sleep(d) + resolve(Void{}, nil) + }() + } + return P +} + // llgo:link Race llgo.race -func Race[OutT any](acs ...AsyncCall[OutT]) (ret AsyncCall[OutT]) { +func Race[OutT any](acs ...AsyncCall[OutT]) (ret *PromiseImpl[OutT]) { return } -func All[OutT any](acs []AsyncCall[OutT]) (ret AsyncCall[[]OutT]) { +func All[OutT any](acs []AsyncCall[OutT]) (ret *PromiseImpl[[]OutT]) { return nil } @@ -59,6 +71,18 @@ func Await2[OutT1, OutT2 any]( return } +type Await2Result[T1 any, T2 any] struct { + V1 T1 + V2 T2 + Err error +} + +func Await2Compiled[OutT1, OutT2 any]( + ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], + timeout ...time.Duration) (ret *PromiseImpl[Await2Result[OutT1, OutT2]]) { + return +} + // llgo:link Await3 llgo.await func Await3[OutT1, OutT2, OutT3 any]( ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3], @@ -66,6 +90,25 @@ func Await3[OutT1, OutT2, OutT3 any]( return } +type Await3Result[T1 any, T2 any, T3 any] struct { + V1 T1 + V2 T2 + V3 T3 + Err error +} + +func Await3Compiled[OutT1, OutT2, OutT3 any]( + ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3], + timeout ...time.Duration) (ret *PromiseImpl[Await3Result[OutT1, OutT2, OutT3]]) { + return +} + +func Run(ac AsyncCall[Void]) { + p := ac.(*PromiseImpl[Void]) + p.Resume() + <-ac.Chan() +} + // ----------------------------------------------------------------------------- type Promise[OutT any] func(OutT, error) @@ -79,4 +122,49 @@ func (p Promise[OutT]) Chan() <-chan OutT { return nil } +func (p Promise[OutT]) EnsureDone() { + +} + +// ----------------------------------------------------------------------------- + +type PromiseImpl[TOut any] struct { + Func func(resolve func(TOut, error)) + Value TOut + Err error + Prev int + Next int + + c chan TOut +} + +func (p *PromiseImpl[TOut]) Resume() { + p.Func(func(v TOut, err error) { + p.Value = v + p.Err = err + }) +} + +func (p *PromiseImpl[TOut]) EnsureDone() { + if p.Next == -1 { + panic("Promise already done") + } +} + +func (p *PromiseImpl[TOut]) Chan() <-chan TOut { + if p.c == nil { + p.c = make(chan TOut, 1) + p.Func(func(v TOut, err error) { + p.Value = v + p.Err = err + p.c <- v + }) + } + return p.c +} + +func (p *PromiseImpl[TOut]) Await(timeout ...time.Duration) (ret TOut, err error) { + panic("should not called") +} + // -----------------------------------------------------------------------------