diff --git a/x/io/README.md b/x/io/README.md index f5585c58..9681b3ab 100644 --- a/x/io/README.md +++ b/x/io/README.md @@ -36,9 +36,8 @@ async function asyncCall(): Promise { return `AsyncCall: ${result}`; } -async function asyncCall2(): Promise { - const result = await resolveAfter1Second(); - return `AsyncCall2: ${result}`; +function asyncCall2(): Promise { + return resolveAfter1Second(); } function asyncCall3(): void { @@ -92,9 +91,8 @@ async def async_call() -> str: result = await resolve_after_1_second() return f"AsyncCall: {result}" -async def async_call2() -> str: - result = await resolve_after_1_second() - return f"AsyncCall2: {result}" +def async_call2() -> asyncio.Task: + return resolve_after_1_second() def async_call3() -> None: asyncio.create_task(print_after_1_second()) @@ -139,7 +137,9 @@ async fn name(param0: Type) -> ReturnType { Example: ```rust -use tokio::time::{sleep, Duration}; +use std::time::Duration; +use tokio::time::sleep; +use std::future::Future; async fn resolve_after_1_second() -> String { sleep(Duration::from_secs(1)).await; @@ -151,9 +151,8 @@ async fn async_call() -> String { format!("AsyncCall: {}", result) } -async fn async_call2() -> String { - let result = resolve_after_1_second().await; - format!("AsyncCall2: {}", result) +fn async_call2() -> impl Future { + resolve_after_1_second() } fn async_call3() { @@ -177,7 +176,7 @@ async fn main() { async_call3(); // Wait for AsyncCall3 to complete - sleep(Duration::from_secs(1)).await; + sleep(Duration::from_secs(2)).await; println!("Main function completed"); } @@ -216,10 +215,9 @@ class Program return $"AsyncCall: {result}"; } - static async Task AsyncCall2() + static Task AsyncCall2() { - string result = await ResolveAfter1Second(); - return $"AsyncCall2: {result}"; + return ResolveAfter1Second(); } static void AsyncCall3() @@ -286,8 +284,7 @@ cppcoro::task asyncCall() { } cppcoro::task asyncCall2() { - auto result = co_await resolveAfter1Second(); - co_return "AsyncCall2: " + result; + return resolveAfter1Second(); } cppcoro::task asyncCall3() { @@ -323,3 +320,145 @@ int main() { return 0; } ``` + +## Common concepts + +### Promise, Future, Task, and Coroutine + +- **Promise**: An object that represents the eventual completion (or failure) of an asynchronous operation and its resulting value. It is used to produce a value that will be consumed by a `Future`. + +- **Future**: An object that represents the result of an asynchronous operation. It is used to obtain the value produced by a `Promise`. + +- **Task**: A unit of work that can be scheduled and executed asynchronously. It is a higher-level abstraction that combines a `Promise` and a `Future`. + +- **Coroutine**: A special type of function that can suspend its execution and return control to the caller without losing its state. It can be resumed later, allowing for asynchronous programming. + +### `async`, `await` and similar keywords + +- **`async`**: A keyword used to define a function that returns a `Promise` or `Task`. It allows the function to pause its execution and resume later. + +- **`await`**: A keyword used to pause the execution of an `async` function until a `Promise` or `Task` is resolved. It unwraps the value of the `Promise` or `Task` and allows the function to continue. + +- **`co_return`**: A keyword used in C++ coroutines to return a value from a coroutine. It is similar to `return` but is used in coroutines to indicate that the coroutine has completed. It's similar to `return` in `async` functions in other languages that boxes the value into a `Promise` or `Task`. + +`async/await` and similar constructs provide a more readable and synchronous-like way of writing asynchronous code, it hides the type of `Promise`/`Future`/`Task` from the user and allows them to focus on the logic of the code. + +### Executing Multiple Async Operations Concurrently + +To run multiple promises concurrently, JavaScript provides `Promise.all`, `Promise.allSettled` and `Promise.any`, Python provides `asyncio.gather`, Rust provides `tokio::try_join`, C# provides `Task.WhenAll`, and C++ provides `cppcoro::when_all`. + +In some situations, you may want to get the first result of multiple async operations. JavaScript provides `Promise.race` to get the first result of multiple promises. Python provides `asyncio.wait` to get the first result of multiple coroutines. Rust provides `tokio::select!` to get the first result of multiple futures. C# provides `Task.WhenAny` to get the first result of multiple tasks. C++ provides `cppcoro::when_any` to get the first result of multiple tasks. Those functions are very simular to `select` in Go. + +### Error Handling + +`await` commonly unwraps the value of a `Promise` or `Task`, but it also propagates errors. If the `Promise` or `Task` is rejected or throws an error, the error will be thrown in the `async` function by the `await` keyword. You can use `try/catch` blocks to handle errors in `async` functions. + +## Common patterns + +- `async` keyword hides the types of `Promise`/`Future`/`Task` in the function signature in Python and Rust, but not in JavaScript, C#, and C++. +- `await` keyword unwraps the value of a `Promise`/`Future`/`Task`. +- `return` keyword boxes the value into a `Promise`/`Future`/`Task` if it's not already. + +## Design considerations in LLGo + +- Don't introduce `async`/`await` keywords to compatible with Go compiler (just compiling) +- For performance reason don't implement async functions with goroutines +- Avoid implementing `Promise` by using `chan` to avoid blocking the thread, but it can be wrapped as a `chan` to make it compatible `select` statement + +## Design + +Introduce `Promise` type to represent an asynchronous operation and its resulting value. `Promise` can be resolved with a value with an error. `Promise` can be awaited to get the value and error. + +`Promise` just a type indicating the asynchronous operation, it can't be created and assigned directly. It be replaced to `PromiseImpl` by the LLGo compiler. + +```go +// Some native async functions +func timeoutAsync(d time.Duration, cb func()) { + go func() { + time.Sleep(d) + cb() + }() +} + +// Wrap callback-based async function into Promise +func resolveAfter1Second() (resolve Promise[string]) { + timeoutAsync(1 * time.Second, func() { + resolve("Resolved after 1 second", nil) + }) +} + +// Compiled to: +func resolveAfter1Second() (resolve PromiseImpl[string]) { + promise := io.NewPromiseImpl[string](resolve func(value string, err error) { + resolve: func(value string, err error) { + for true { + switch (promise.prev = promise.next) { + case 0: + timeoutAsync(1 * time.Second, func() { + resolve("Resolved after 1 second", nil) + }) + } + } + }, + } + return promise +} + +func asyncCall() (resolve Promise[string]) { + str, err := resolveAfter1Second().Await() + resolve("AsyncCall: " + str, err) +} + +// Compiled to: +func asyncCall() (resolve PromiseImpl[string]) { + promise := io.NewPromiseImpl[string](resolve func(value string, err error) { + for true { + switch (promise.prev = promise.next) { + case 0: + resolveAfter1Second() + return + case 1: + str, err := promise.value, promise.err + resolve("AsyncCall: " + str, err) + return + } + } + }) + return promise +} + +// Directly return Promise +func asyncCall2() Promise[string] { + return resolveAfter1Second() +} + +// Compiled to: +func asyncCall2() PromiseImpl[string] { + return resolveAfter1Second() +} + +// Don't wait for Promise to complete +func asyncCall3() { + resolveAfter1Second().Then(func(result string) { + fmt.Println("AsyncCall3: " + result) + }) +} + +func asyncMain() { + fmt.Println("Starting AsyncCall") + result1 := asyncCall().Await() + fmt.Println(result1) + + fmt.Println("Starting AsyncCall2") + result2 := asyncCall2().Await() + fmt.Println(result2) + + fmt.Println("Starting AsyncCall3") + asyncCall3() + + // Wait for AsyncCall3 to complete + time.Sleep(2 * time.Second) + + fmt.Println("Main function completed") +} +``` diff --git a/x/io/_demo/asyncdemo/async.go b/x/io/_demo/asyncdemo/async.go index 6d5362ae..02b4475a 100644 --- a/x/io/_demo/asyncdemo/async.go +++ b/x/io/_demo/asyncdemo/async.go @@ -1,40 +1,290 @@ package main import ( + "encoding/json" "fmt" + "log" "time" "github.com/goplus/llgo/x/io" ) -var GetUser = io.Async[any](func(string) any { - panic("todo: GetUser") -}) +// ----------------------------------------------------------------------------- -var GetScore = io.Async[float64](func() float64 { +type Response struct { + StatusCode int + + mockBody string +} + +func (r *Response) mock(body string) { + r.mockBody = body +} + +func (r *Response) Text() (resolve io.Promise[string]) { + resolve(r.mockBody, nil) + return +} + +func (r *Response) TextCompiled() *io.PromiseImpl[string] { + P := &io.PromiseImpl[string]{} + P.Func = func(resolve func(string, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + resolve(r.mockBody, nil) + P.Next = -1 + return + default: + panic("Promise already done") + } + } + } + return P +} + +func HttpGet(url string, callback func(resp *Response, err error)) { + resp := &Response{StatusCode: 200} + callback(resp, nil) +} + +func AsyncHttpGet(url string) (resolve io.Promise[*Response]) { + HttpGet(url, resolve) + return +} + +func AsyncHttpGetCompiled(url string) *io.PromiseImpl[*Response] { + P := &io.PromiseImpl[*Response]{} + P.Func = func(resolve func(*Response, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + HttpGet(url, resolve) + P.Next = -1 + return + default: + panic("Promise already done") + } + } + } + return P +} + +// ----------------------------------------------------------------------------- + +type User struct { + Name string +} + +func GetUser(uid string) (resolve io.Promise[User]) { + resp, err := AsyncHttpGet("http://example.com/user/" + uid).Await() + if err != nil { + resolve(User{}, err) + return + } + + if resp.StatusCode != 200 { + resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode)) + return + } + + resp.mock(`{"name":"Alice"}`) + + body, err := resp.Text().Await() + if err != nil { + resolve(User{}, err) + return + } + user := User{} + if err := json.Unmarshal([]byte(body), &user); err != nil { + resolve(User{}, err) + return + } + + resolve(user, nil) + return +} + +func GetUserCompiled(uid string) *io.PromiseImpl[User] { + var state1 *io.PromiseImpl[*Response] + var state2 *io.PromiseImpl[string] + + P := &io.PromiseImpl[User]{} + P.Func = func(resolve func(User, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + state1 = AsyncHttpGetCompiled("http://example.com/user/" + uid) + P.Next = 1 + return + case 1: + state1.EnsureDone() + resp, err := state1.Value, state1.Err + if err != nil { + resolve(User{}, err) + return + } + + if resp.StatusCode != 200 { + resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode)) + return + } + + resp.mock(`{"name":"Alice"}`) + + state2 = resp.TextCompiled() + P.Next = 2 + return + case 2: + state2.EnsureDone() + body, err := state2.Value, state2.Err + if err != nil { + resolve(User{}, err) + return + } + user := User{} + if err := json.Unmarshal([]byte(body), &user); err != nil { + resolve(User{}, err) + return + } + + resolve(user, nil) + P.Next = -1 + return + default: + panic("Promise already done") + } + } + } + return P +} + +func GetScore() *io.Promise[float64] { panic("todo: GetScore") -}) +} -var DoUpdate = io.Async[io.Void](func(op string) io.Void { +func GetScoreCompiled() *io.PromiseImpl[float64] { + P := &io.PromiseImpl[float64]{} + P.Func = func(resolve func(float64, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + panic("todo: GetScore") + default: + panic("Promise already done") + } + } + } + return P +} + +func DoUpdate(op string) *io.Promise[io.Void] { panic("todo: DoUpdate") -}) +} + +func DoUpdateCompiled(op string) *io.PromiseImpl[io.Void] { + P := &io.PromiseImpl[io.Void]{} + P.Func = func(resolve func(io.Void, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + panic("todo: DoUpdate") + default: + panic("Promise already done") + } + } + } + return P +} + +func Demo() (resolve io.Promise[io.Void]) { + user, err := GetUser("123").Await() + log.Println(user, err) + + user, err = io.Race[User](GetUser("123"), GetUser("456"), GetUser("789")).Await() + log.Println(user, err) + + users, err := io.All[User]([]io.AsyncCall[User]{GetUser("123"), GetUser("456"), GetUser("789")}).Await() + log.Println(users, err) + + user, score, _, err := io.Await3[User, float64, io.Void](GetUser("123"), GetScore(), DoUpdate("update sth.")) + log.Println(user, score, err) + + // TODO(lijie): select from multiple promises without channel + select { + case user := <-GetUser("123").Chan(): + log.Println("user:", user) + case score := <-GetScore().Chan(): + log.Println("score:", score) + case <-io.Timeout(5 * time.Second).Chan(): + log.Println("timeout") + } + return +} + +func DemoCompiled() *io.PromiseImpl[io.Void] { + var state1 *io.PromiseImpl[User] + var state2 *io.PromiseImpl[User] + var state3 *io.PromiseImpl[[]User] + var state4 *io.PromiseImpl[io.Await3Result[User, float64, io.Void]] + + P := &io.PromiseImpl[io.Void]{} + P.Func = func(resolve func(io.Void, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + state1 = GetUserCompiled("123") + P.Next = 1 + return + case 1: + state1.EnsureDone() + user, err := state1.Value, state1.Err + log.Printf("user: %v, err: %v\n", user, err) + + state2 = io.Race[User](GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789")) + P.Next = 2 + return + case 2: + state2.EnsureDone() + user, err := state2.Value, state2.Err + log.Println(user, err) + + state3 = io.All[User]([]io.AsyncCall[User]{GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789")}) + P.Next = 3 + return + case 3: + state3.EnsureDone() + users, err := state3.Value, state3.Err + log.Println(users, err) + + state4 = io.Await3Compiled[User, float64, io.Void](GetUserCompiled("123"), GetScoreCompiled(), DoUpdateCompiled("update sth.")) + P.Next = 4 + return + case 4: + state4.EnsureDone() + user, score, _, err := state4.Value.V1, state4.Value.V2, state4.Value.V3, state4.Value.Err + log.Println(user, score, err) + + select { + case user := <-GetUserCompiled("123").Chan(): + log.Println("user:", user) + case score := <-GetScoreCompiled().Chan(): + log.Println("score:", score) + case <-io.TimeoutCompiled(5 * time.Second).Chan(): + log.Println("timeout") + } + P.Next = -1 + return + default: + panic("Promise already done") + } + } + } + return P +} func main() { - user, err := GetUser("123").Await() - fmt.Println(user, err) - - user, err = io.Race(GetUser("123"), GetUser("456"), GetUser("789")).Await() - fmt.Println(user, err) - - user, score, _, err := io.Await3[any, float64, io.Void](GetUser("123"), GetScore(), DoUpdate("update sth.")) - fmt.Println(user, score, err) - - select { - case user := <-GetUser("123"): - fmt.Println("user:", user) - case score := <-GetScore(): - fmt.Println("score:", score) - case <-io.Timeout(5 * time.Second): - fmt.Println("timeout") - } + log.SetFlags(log.Lshortfile | log.LstdFlags) + // io.Run(Demo()) + io.Run(DemoCompiled()) } diff --git a/x/io/io.go b/x/io/io.go index 8748673d..f4b4c18b 100644 --- a/x/io/io.go +++ b/x/io/io.go @@ -30,21 +30,40 @@ type Void = [0]byte // ----------------------------------------------------------------------------- -type AsyncCall[OutT any] chan OutT +type AsyncCall[OutT any] interface { + Await(timeout ...time.Duration) (ret OutT, err error) + Chan() <-chan OutT + EnsureDone() +} // llgo:link AsyncCall.Await llgo.await -func (AsyncCall[OutT]) Await(timeout ...time.Duration) (ret OutT, err error) { +func Await[OutT any](call AsyncCall[OutT], timeout ...time.Duration) (ret OutT, err error) { return } //go:linkname Timeout llgo.timeout func Timeout(time.Duration) (ret AsyncCall[Void]) +func TimeoutCompiled(d time.Duration) *PromiseImpl[Void] { + P := &PromiseImpl[Void]{} + P.Func = func(resolve func(Void, error)) { + go func() { + time.Sleep(d) + resolve(Void{}, nil) + }() + } + return P +} + // llgo:link Race llgo.race -func Race[OutT any](acs ...AsyncCall[OutT]) (ret AsyncCall[OutT]) { +func Race[OutT any](acs ...AsyncCall[OutT]) (ret *PromiseImpl[OutT]) { return } +func All[OutT any](acs []AsyncCall[OutT]) (ret *PromiseImpl[[]OutT]) { + return nil +} + // llgo:link Await2 llgo.await func Await2[OutT1, OutT2 any]( ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], @@ -52,6 +71,18 @@ func Await2[OutT1, OutT2 any]( return } +type Await2Result[T1 any, T2 any] struct { + V1 T1 + V2 T2 + Err error +} + +func Await2Compiled[OutT1, OutT2 any]( + ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], + timeout ...time.Duration) (ret *PromiseImpl[Await2Result[OutT1, OutT2]]) { + return +} + // llgo:link Await3 llgo.await func Await3[OutT1, OutT2, OutT3 any]( ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3], @@ -59,13 +90,81 @@ func Await3[OutT1, OutT2, OutT3 any]( return } -// ----------------------------------------------------------------------------- +type Await3Result[T1 any, T2 any, T3 any] struct { + V1 T1 + V2 T2 + V3 T3 + Err error +} -type Promise[OutT any] func(...any) AsyncCall[OutT] - -// llgo:link Async llgo.async -func Async[OutT any](fn any) (ret Promise[OutT]) { +func Await3Compiled[OutT1, OutT2, OutT3 any]( + ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3], + timeout ...time.Duration) (ret *PromiseImpl[Await3Result[OutT1, OutT2, OutT3]]) { return } +func Run(ac AsyncCall[Void]) { + p := ac.(*PromiseImpl[Void]) + p.Resume() + <-ac.Chan() +} + +// ----------------------------------------------------------------------------- + +type Promise[OutT any] func(OutT, error) + +// llgo:link Promise.Await llgo.await +func (p Promise[OutT]) Await(timeout ...time.Duration) (ret OutT, err error) { + return +} + +func (p Promise[OutT]) Chan() <-chan OutT { + return nil +} + +func (p Promise[OutT]) EnsureDone() { + +} + +// ----------------------------------------------------------------------------- + +type PromiseImpl[TOut any] struct { + Func func(resolve func(TOut, error)) + Value TOut + Err error + Prev int + Next int + + c chan TOut +} + +func (p *PromiseImpl[TOut]) Resume() { + p.Func(func(v TOut, err error) { + p.Value = v + p.Err = err + }) +} + +func (p *PromiseImpl[TOut]) EnsureDone() { + if p.Next == -1 { + panic("Promise already done") + } +} + +func (p *PromiseImpl[TOut]) Chan() <-chan TOut { + if p.c == nil { + p.c = make(chan TOut, 1) + p.Func(func(v TOut, err error) { + p.Value = v + p.Err = err + p.c <- v + }) + } + return p.c +} + +func (p *PromiseImpl[TOut]) Await(timeout ...time.Duration) (ret TOut, err error) { + panic("should not called") +} + // -----------------------------------------------------------------------------