make future IO working both on go and llgo
This commit is contained in:
@@ -5,18 +5,16 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/goplus/llgo/c"
|
|
||||||
"github.com/goplus/llgo/c/net"
|
|
||||||
"github.com/goplus/llgo/x/async"
|
"github.com/goplus/llgo/x/async"
|
||||||
"github.com/goplus/llgo/x/async/timeout"
|
"github.com/goplus/llgo/x/async/timeout"
|
||||||
"github.com/goplus/llgo/x/io"
|
"github.com/goplus/llgo/x/socketio"
|
||||||
"github.com/goplus/llgo/x/tuple"
|
"github.com/goplus/llgo/x/tuple"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ReadFile(fileName string) async.Future[tuple.Tuple2[[]byte, error]] {
|
func ReadFile(fileName string) async.Future[tuple.Tuple2[[]byte, error]] {
|
||||||
return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) {
|
return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) {
|
||||||
go func() {
|
go func() {
|
||||||
println(async.Gettid(), "read file", fileName)
|
println("read file", fileName)
|
||||||
bytes, err := os.ReadFile(fileName)
|
bytes, err := os.ReadFile(fileName)
|
||||||
resolve(tuple.T2(bytes, err))
|
resolve(tuple.T2(bytes, err))
|
||||||
}()
|
}()
|
||||||
@@ -50,16 +48,17 @@ func main() {
|
|||||||
func RunIO() {
|
func RunIO() {
|
||||||
println("RunIO with Await")
|
println("RunIO with Await")
|
||||||
|
|
||||||
|
// Hide `resolve` in Go+
|
||||||
async.Run(async.Async(func(resolve func(async.Void)) {
|
async.Run(async.Async(func(resolve func(async.Void)) {
|
||||||
println("read file")
|
println("read file")
|
||||||
defer resolve(async.Void{})
|
defer resolve(async.Void{})
|
||||||
content, err := async.Await(ReadFile("1.txt")).Get()
|
content, err := async.Await(ReadFile("all.go")).Get()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("read err: %v\n", err)
|
fmt.Printf("read err: %v\n", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fmt.Printf("read content: %s\n", content)
|
fmt.Printf("read content: %s\n", content)
|
||||||
err = async.Await(WriteFile("2.txt", content))
|
err = async.Await(WriteFile("2.out", content))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("write err: %v\n", err)
|
fmt.Printf("write err: %v\n", err)
|
||||||
return
|
return
|
||||||
@@ -71,7 +70,7 @@ func RunIO() {
|
|||||||
println("RunIO with BindIO")
|
println("RunIO with BindIO")
|
||||||
|
|
||||||
async.Run(async.Async(func(resolve func(async.Void)) {
|
async.Run(async.Async(func(resolve func(async.Void)) {
|
||||||
ReadFile("1.txt")(func(v tuple.Tuple2[[]byte, error]) {
|
ReadFile("all.go")(func(v tuple.Tuple2[[]byte, error]) {
|
||||||
content, err := v.Get()
|
content, err := v.Get()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("read err: %v\n", err)
|
fmt.Printf("read err: %v\n", err)
|
||||||
@@ -79,7 +78,7 @@ func RunIO() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
fmt.Printf("read content: %s\n", content)
|
fmt.Printf("read content: %s\n", content)
|
||||||
WriteFile("2.txt", content)(func(v error) {
|
WriteFile("2.out", content)(func(v error) {
|
||||||
err = v
|
err = v
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("write err: %v\n", err)
|
fmt.Printf("write err: %v\n", err)
|
||||||
@@ -173,129 +172,66 @@ func RunSocket() {
|
|||||||
|
|
||||||
println("RunClient")
|
println("RunClient")
|
||||||
|
|
||||||
RunClient()(func(async.Void) {
|
timeout.Timeout(100 * time.Millisecond)(func(async.Void) {
|
||||||
println("RunClient done")
|
RunClient()(func(async.Void) {
|
||||||
resolve(async.Void{})
|
println("RunClient done")
|
||||||
|
resolve(async.Void{})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
func RunClient() async.Future[async.Void] {
|
func RunClient() async.Future[async.Void] {
|
||||||
return async.Async(func(resolve func(async.Void)) {
|
return async.Async(func(resolve func(async.Void)) {
|
||||||
bindAddr := "127.0.0.1:3927"
|
addr := "127.0.0.1:3927"
|
||||||
io.ParseAddr(bindAddr)(func(v tuple.Tuple2[*net.SockAddr, error]) {
|
socketio.Connect("tcp", addr)(func(v tuple.Tuple2[*socketio.Conn, error]) {
|
||||||
addr, err := v.Get()
|
client, err := v.Get()
|
||||||
println("Connect to", addr, err)
|
println("Connected", client, err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
io.Connect(addr)(func(v tuple.Tuple2[*io.Tcp, error]) {
|
counter := 0
|
||||||
client, err := v.Get()
|
var loop func(client *socketio.Conn)
|
||||||
println("Connected", client, err)
|
loop = func(client *socketio.Conn) {
|
||||||
if err != nil {
|
counter++
|
||||||
panic(err)
|
data := fmt.Sprintf("Hello %d", counter)
|
||||||
}
|
client.Write([]byte(data))(func(err error) {
|
||||||
var loop func(client *io.Tcp)
|
if err != nil {
|
||||||
loop = func(client *io.Tcp) {
|
panic(err)
|
||||||
client.Write([]byte("Hello"))(func(err error) {
|
}
|
||||||
|
client.Read()(func(v tuple.Tuple2[[]byte, error]) {
|
||||||
|
data, err := v.Get()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
client.Read()(func(v tuple.Tuple2[[]byte, error]) {
|
println("Read from server:", string(data))
|
||||||
data, err := v.Get()
|
timeout.Timeout(1 * time.Second)(func(async.Void) {
|
||||||
if err != nil {
|
loop(client)
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
println("Read:", string(data))
|
|
||||||
timeout.Timeout(1 * time.Second)(func(async.Void) {
|
|
||||||
loop(client)
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
})
|
||||||
loop(client)
|
}
|
||||||
})
|
loop(client)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func RunServer() async.Future[async.Void] {
|
func RunServer() async.Future[async.Void] {
|
||||||
return async.Async(func(resolve func(async.Void)) {
|
return async.Async(func(resolve func(async.Void)) {
|
||||||
server, err := io.NewTcp()
|
socketio.Listen("tcp", "0.0.0.0:3927", func(client *socketio.Conn, err error) {
|
||||||
if err != nil {
|
println("Client connected", client, err)
|
||||||
panic(err)
|
var loop func(client *socketio.Conn)
|
||||||
}
|
loop = func(client *socketio.Conn) {
|
||||||
|
|
||||||
bindAddr := "0.0.0.0:3927"
|
|
||||||
io.ParseAddr(bindAddr)(func(v tuple.Tuple2[*net.SockAddr, error]) {
|
|
||||||
addr, err := v.Get()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = server.Bind(addr, 0); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
c.Printf(c.Str("Listening on %s\n"), c.AllocaCStr(bindAddr))
|
|
||||||
|
|
||||||
err = server.Listen(128, func(server *io.Tcp, err error) {
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
client, err := server.Accept()
|
|
||||||
println("Accept", client, err)
|
|
||||||
|
|
||||||
var loop func(client *io.Tcp)
|
|
||||||
loop = func(client *io.Tcp) {
|
|
||||||
client.Read()(func(v tuple.Tuple2[[]byte, error]) {
|
|
||||||
data, err := v.Get()
|
|
||||||
if err != nil {
|
|
||||||
println("Read error", err)
|
|
||||||
} else {
|
|
||||||
println("Read:", string(data))
|
|
||||||
client.Write(data)(func(err error) {
|
|
||||||
if err != nil {
|
|
||||||
println("Write error", err)
|
|
||||||
} else {
|
|
||||||
println("Write done")
|
|
||||||
loop(client)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
loop(client)
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func RunServer1() async.Future[async.Void] {
|
|
||||||
return async.Async(func(resolve func(async.Void)) {
|
|
||||||
io.Listen("tcp", "0.0.0.0:3927")(func(v tuple.Tuple2[*io.Tcp, error]) {
|
|
||||||
server, err := v.Get()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
client, err := server.Accept()
|
|
||||||
println("Accept", client, err)
|
|
||||||
|
|
||||||
var loop func(client *io.Tcp)
|
|
||||||
loop = func(client *io.Tcp) {
|
|
||||||
client.Read()(func(v tuple.Tuple2[[]byte, error]) {
|
client.Read()(func(v tuple.Tuple2[[]byte, error]) {
|
||||||
data, err := v.Get()
|
data, err := v.Get()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
println("Read error", err)
|
println("Read error", err)
|
||||||
} else {
|
} else {
|
||||||
println("Read:", string(data))
|
println("Read from client:", string(data))
|
||||||
client.Write(data)(func(err error) {
|
client.Write(data)(func(err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
println("Write error", err)
|
println("Write error", err)
|
||||||
} else {
|
} else {
|
||||||
println("Write done")
|
|
||||||
loop(client)
|
loop(client)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -17,45 +17,14 @@
|
|||||||
package async
|
package async
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"unsafe"
|
|
||||||
_ "unsafe"
|
_ "unsafe"
|
||||||
|
|
||||||
"github.com/goplus/llgo/c/libuv"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Void = [0]byte
|
type Void = [0]byte
|
||||||
|
|
||||||
type Future[T any] func(func(T))
|
type Future[T any] func(func(T))
|
||||||
|
|
||||||
type asyncBind[T any] struct {
|
// Just for pure LLGo/Go, transpile to callback in Go+
|
||||||
libuv.Async
|
func Await[T1 any](call Future[T1]) (ret T1) {
|
||||||
result T
|
return Run(call)
|
||||||
chain func(T)
|
|
||||||
}
|
|
||||||
|
|
||||||
func asyncCb[T any](a *libuv.Async) {
|
|
||||||
a.Close(nil)
|
|
||||||
aa := (*asyncBind[T])(unsafe.Pointer(a))
|
|
||||||
aa.chain(aa.result)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Async[T any](fn func(func(T))) Future[T] {
|
|
||||||
return func(chain func(T)) {
|
|
||||||
loop := Exec().L
|
|
||||||
// var result T
|
|
||||||
// var a *libuv.Async
|
|
||||||
// var cb libuv.AsyncCb
|
|
||||||
// a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func() {
|
|
||||||
// a.Close(nil)
|
|
||||||
// chain(result)
|
|
||||||
// })
|
|
||||||
// loop.Async(a, cb)
|
|
||||||
|
|
||||||
aa := &asyncBind[T]{chain: chain}
|
|
||||||
loop.Async(&aa.Async, asyncCb[T])
|
|
||||||
fn(func(v T) {
|
|
||||||
aa.result = v
|
|
||||||
aa.Send()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
//go:build llgo11
|
//go:build !llgo
|
||||||
// +build llgo11
|
// +build !llgo
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
|
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
|
||||||
@@ -21,71 +21,47 @@ package async
|
|||||||
|
|
||||||
import "sync"
|
import "sync"
|
||||||
|
|
||||||
func BindIO[T any](call IO[T], callback func(T)) {
|
func Async[T any](fn func(func(T))) Future[T] {
|
||||||
callback(Await(call))
|
return func(chain func(T)) {
|
||||||
}
|
go fn(chain)
|
||||||
|
}
|
||||||
func Await[T1 any](call IO[T1]) (ret T1) {
|
|
||||||
ch := make(chan struct{})
|
|
||||||
f := call(&AsyncContext{
|
|
||||||
Executor: Exec(),
|
|
||||||
complete: func() {
|
|
||||||
close(ch)
|
|
||||||
},
|
|
||||||
})
|
|
||||||
<-ch
|
|
||||||
return f()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// -----------------------------------------------------------------------------
|
// -----------------------------------------------------------------------------
|
||||||
|
|
||||||
func Race[T1 any](calls ...IO[T1]) IO[T1] {
|
func Race[T1 any](futures ...Future[T1]) Future[T1] {
|
||||||
return Async(func(resolve func(T1)) {
|
return Async(func(resolve func(T1)) {
|
||||||
ch := make(chan int, len(calls))
|
ch := make(chan T1)
|
||||||
futures := make([]Future[T1], len(calls))
|
for _, future := range futures {
|
||||||
for i, call := range calls {
|
future := future
|
||||||
i := i
|
future(func(v T1) {
|
||||||
call := call
|
defer func() {
|
||||||
go func() {
|
// Avoid panic when the channel is closed.
|
||||||
f := call(&AsyncContext{
|
_ = recover()
|
||||||
Executor: Exec(),
|
}()
|
||||||
complete: func() {
|
ch <- v
|
||||||
defer func() {
|
})
|
||||||
_ = recover()
|
|
||||||
}()
|
|
||||||
ch <- i
|
|
||||||
},
|
|
||||||
})
|
|
||||||
futures[i] = f
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
i := <-ch
|
v := <-ch
|
||||||
close(ch)
|
close(ch)
|
||||||
resolve(futures[i]())
|
resolve(v)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func All[T1 any](calls ...IO[T1]) IO[[]T1] {
|
func All[T1 any](futures ...Future[T1]) Future[[]T1] {
|
||||||
return Async(func(resolve func([]T1)) {
|
return Async(func(resolve func([]T1)) {
|
||||||
n := len(calls)
|
n := len(futures)
|
||||||
results := make([]T1, n)
|
results := make([]T1, n)
|
||||||
futures := make([]Future[T1], n)
|
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
wg.Add(n)
|
wg.Add(n)
|
||||||
for i, call := range calls {
|
for i, future := range futures {
|
||||||
i := i
|
i := i
|
||||||
f := call(&AsyncContext{
|
future(func(v T1) {
|
||||||
Executor: Exec(),
|
results[i] = v
|
||||||
complete: func() {
|
wg.Done()
|
||||||
wg.Done()
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
futures[i] = f
|
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
for i, f := range futures {
|
|
||||||
results[i] = f()
|
|
||||||
}
|
|
||||||
resolve(results)
|
resolve(results)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,6 @@
|
|||||||
|
//go:build llgo
|
||||||
|
// +build llgo
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
|
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
|
||||||
*
|
*
|
||||||
@@ -18,10 +21,30 @@ package async
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/goplus/llgo/c/libuv"
|
||||||
|
"github.com/goplus/llgo/x/cbind"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Await[T1 any](call Future[T1]) (ret T1) {
|
// Currently Async run chain a future that call chain in the goroutine running `async.Run`.
|
||||||
return Run(call)
|
// TODO(lijie): It would better to switch when needed.
|
||||||
|
func Async[T any](fn func(func(T))) Future[T] {
|
||||||
|
return func(chain func(T)) {
|
||||||
|
loop := Exec().L
|
||||||
|
|
||||||
|
var result T
|
||||||
|
var a *libuv.Async
|
||||||
|
var cb libuv.AsyncCb
|
||||||
|
a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) {
|
||||||
|
a.Close(nil)
|
||||||
|
chain(result)
|
||||||
|
})
|
||||||
|
loop.Async(a, cb)
|
||||||
|
fn(func(v T) {
|
||||||
|
result = v
|
||||||
|
a.Send()
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// -----------------------------------------------------------------------------
|
// -----------------------------------------------------------------------------
|
||||||
@@ -32,6 +55,7 @@ func Race[T1 any](futures ...Future[T1]) Future[T1] {
|
|||||||
for _, future := range futures {
|
for _, future := range futures {
|
||||||
future(func(v T1) {
|
future(func(v T1) {
|
||||||
if !done.Swap(true) {
|
if !done.Swap(true) {
|
||||||
|
// Just resolve the first one.
|
||||||
resolve(v)
|
resolve(v)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@@ -49,6 +73,7 @@ func All[T1 any](futures ...Future[T1]) Future[[]T1] {
|
|||||||
future(func(v T1) {
|
future(func(v T1) {
|
||||||
results[i] = v
|
results[i] = v
|
||||||
if atomic.AddUint32(&done, 1) == uint32(n) {
|
if atomic.AddUint32(&done, 1) == uint32(n) {
|
||||||
|
// All done.
|
||||||
resolve(results)
|
resolve(results)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
//go:build llgo11
|
//go:build !llgo
|
||||||
// +build llgo11
|
// +build !llgo
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
|
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
|
||||||
@@ -28,6 +28,12 @@ func Exec() *Executor {
|
|||||||
return exec
|
return exec
|
||||||
}
|
}
|
||||||
|
|
||||||
func Run(fn func()) {
|
func Run[T any](future Future[T]) (ret T) {
|
||||||
fn()
|
ch := make(chan T)
|
||||||
|
go func() {
|
||||||
|
future(func(v T) {
|
||||||
|
ch <- v
|
||||||
|
})
|
||||||
|
}()
|
||||||
|
return <-ch
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,6 @@
|
|||||||
|
//go:build llgo
|
||||||
|
// +build llgo
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
|
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
|
||||||
*
|
*
|
||||||
@@ -19,14 +22,10 @@ package async
|
|||||||
import (
|
import (
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
"github.com/goplus/llgo/c"
|
|
||||||
"github.com/goplus/llgo/c/libuv"
|
"github.com/goplus/llgo/c/libuv"
|
||||||
"github.com/goplus/llgo/c/pthread"
|
"github.com/goplus/llgo/c/pthread"
|
||||||
)
|
)
|
||||||
|
|
||||||
//go:linkname Gettid C.pthread_self
|
|
||||||
func Gettid() c.Pointer
|
|
||||||
|
|
||||||
var execKey pthread.Key
|
var execKey pthread.Key
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
//go:build llgo11
|
//go:build !llgo
|
||||||
// +build llgo11
|
// +build !llgo
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
|
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
|
||||||
@@ -25,7 +25,7 @@ import (
|
|||||||
"github.com/goplus/llgo/x/async"
|
"github.com/goplus/llgo/x/async"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Timeout(d time.Duration) async.IO[async.Void] {
|
func Timeout(d time.Duration) async.Future[async.Void] {
|
||||||
return async.Async(func(resolve func(async.Void)) {
|
return async.Async(func(resolve func(async.Void)) {
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(d)
|
time.Sleep(d)
|
||||||
|
|||||||
@@ -1,3 +1,6 @@
|
|||||||
|
//go:build llgo
|
||||||
|
// +build llgo
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
|
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -83,17 +83,6 @@ func Callback3[Base any, A any, B any, C any](base *Base, a A, b B, c C) {
|
|||||||
* libuv.InitTimer(async.Exec().L, timer)
|
* libuv.InitTimer(async.Exec().L, timer)
|
||||||
* timer.Start(cb, 1000, 0)
|
* timer.Start(cb, 1000, 0)
|
||||||
*
|
*
|
||||||
* TODO(lijie): fn isn't a C func-ptr, it's closure, should fix the LLGo compiler.
|
|
||||||
* See: https://github.com/goplus/llgo/issues/766
|
|
||||||
*
|
|
||||||
* Workaround:
|
|
||||||
*
|
|
||||||
* timer, _ := cbind.Bind[libuv.Timer](func() {
|
|
||||||
* println("hello")
|
|
||||||
* })
|
|
||||||
* libuv.InitTimer(async.Exec().L, timer)
|
|
||||||
* timer.Start(cbind.Callback[libuv.Timer], 1000, 0)
|
|
||||||
*
|
|
||||||
* @param call The Go function to bind.
|
* @param call The Go function to bind.
|
||||||
* @return The data pointer and the C callback function.
|
* @return The data pointer and the C callback function.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -1,290 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/goplus/llgo/x/io"
|
|
||||||
)
|
|
||||||
|
|
||||||
// -----------------------------------------------------------------------------
|
|
||||||
|
|
||||||
type Response struct {
|
|
||||||
StatusCode int
|
|
||||||
|
|
||||||
mockBody string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Response) mock(body string) {
|
|
||||||
r.mockBody = body
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Response) Text() (resolve io.Promise[string]) {
|
|
||||||
resolve(r.mockBody, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Response) TextCompiled() *io.PromiseImpl[string] {
|
|
||||||
P := &io.PromiseImpl[string]{}
|
|
||||||
P.Func = func(resolve func(string, error)) {
|
|
||||||
for {
|
|
||||||
switch P.Prev = P.Next; P.Prev {
|
|
||||||
case 0:
|
|
||||||
resolve(r.mockBody, nil)
|
|
||||||
P.Next = -1
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
panic("Promise already done")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return P
|
|
||||||
}
|
|
||||||
|
|
||||||
func HttpGet(url string, callback func(resp *Response, err error)) {
|
|
||||||
resp := &Response{StatusCode: 200}
|
|
||||||
callback(resp, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func AsyncHttpGet(url string) (resolve io.Promise[*Response]) {
|
|
||||||
HttpGet(url, resolve)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func AsyncHttpGetCompiled(url string) *io.PromiseImpl[*Response] {
|
|
||||||
P := &io.PromiseImpl[*Response]{}
|
|
||||||
P.Func = func(resolve func(*Response, error)) {
|
|
||||||
for {
|
|
||||||
switch P.Prev = P.Next; P.Prev {
|
|
||||||
case 0:
|
|
||||||
HttpGet(url, resolve)
|
|
||||||
P.Next = -1
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
panic("Promise already done")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return P
|
|
||||||
}
|
|
||||||
|
|
||||||
// -----------------------------------------------------------------------------
|
|
||||||
|
|
||||||
type User struct {
|
|
||||||
Name string
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetUser(uid string) (resolve io.Promise[User]) {
|
|
||||||
resp, err := AsyncHttpGet("http://example.com/user/" + uid).Await()
|
|
||||||
if err != nil {
|
|
||||||
resolve(User{}, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.StatusCode != 200 {
|
|
||||||
resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
resp.mock(`{"name":"Alice"}`)
|
|
||||||
|
|
||||||
body, err := resp.Text().Await()
|
|
||||||
if err != nil {
|
|
||||||
resolve(User{}, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
user := User{}
|
|
||||||
if err := json.Unmarshal([]byte(body), &user); err != nil {
|
|
||||||
resolve(User{}, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
resolve(user, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetUserCompiled(uid string) *io.PromiseImpl[User] {
|
|
||||||
var state1 *io.PromiseImpl[*Response]
|
|
||||||
var state2 *io.PromiseImpl[string]
|
|
||||||
|
|
||||||
P := &io.PromiseImpl[User]{}
|
|
||||||
P.Func = func(resolve func(User, error)) {
|
|
||||||
for {
|
|
||||||
switch P.Prev = P.Next; P.Prev {
|
|
||||||
case 0:
|
|
||||||
state1 = AsyncHttpGetCompiled("http://example.com/user/" + uid)
|
|
||||||
P.Next = 1
|
|
||||||
return
|
|
||||||
case 1:
|
|
||||||
state1.EnsureDone()
|
|
||||||
resp, err := state1.Value, state1.Err
|
|
||||||
if err != nil {
|
|
||||||
resolve(User{}, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.StatusCode != 200 {
|
|
||||||
resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
resp.mock(`{"name":"Alice"}`)
|
|
||||||
|
|
||||||
state2 = resp.TextCompiled()
|
|
||||||
P.Next = 2
|
|
||||||
return
|
|
||||||
case 2:
|
|
||||||
state2.EnsureDone()
|
|
||||||
body, err := state2.Value, state2.Err
|
|
||||||
if err != nil {
|
|
||||||
resolve(User{}, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
user := User{}
|
|
||||||
if err := json.Unmarshal([]byte(body), &user); err != nil {
|
|
||||||
resolve(User{}, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
resolve(user, nil)
|
|
||||||
P.Next = -1
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
panic("Promise already done")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return P
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetScore() *io.Promise[float64] {
|
|
||||||
panic("todo: GetScore")
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetScoreCompiled() *io.PromiseImpl[float64] {
|
|
||||||
P := &io.PromiseImpl[float64]{}
|
|
||||||
P.Func = func(resolve func(float64, error)) {
|
|
||||||
for {
|
|
||||||
switch P.Prev = P.Next; P.Prev {
|
|
||||||
case 0:
|
|
||||||
panic("todo: GetScore")
|
|
||||||
default:
|
|
||||||
panic("Promise already done")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return P
|
|
||||||
}
|
|
||||||
|
|
||||||
func DoUpdate(op string) *io.Promise[io.Void] {
|
|
||||||
panic("todo: DoUpdate")
|
|
||||||
}
|
|
||||||
|
|
||||||
func DoUpdateCompiled(op string) *io.PromiseImpl[io.Void] {
|
|
||||||
P := &io.PromiseImpl[io.Void]{}
|
|
||||||
P.Func = func(resolve func(io.Void, error)) {
|
|
||||||
for {
|
|
||||||
switch P.Prev = P.Next; P.Prev {
|
|
||||||
case 0:
|
|
||||||
panic("todo: DoUpdate")
|
|
||||||
default:
|
|
||||||
panic("Promise already done")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return P
|
|
||||||
}
|
|
||||||
|
|
||||||
func Demo() (resolve io.Promise[io.Void]) {
|
|
||||||
user, err := GetUser("123").Await()
|
|
||||||
log.Println(user, err)
|
|
||||||
|
|
||||||
user, err = io.Race[User](GetUser("123"), GetUser("456"), GetUser("789")).Await()
|
|
||||||
log.Println(user, err)
|
|
||||||
|
|
||||||
users, err := io.All[User]([]io.AsyncCall[User]{GetUser("123"), GetUser("456"), GetUser("789")}).Await()
|
|
||||||
log.Println(users, err)
|
|
||||||
|
|
||||||
user, score, _, err := io.Await3[User, float64, io.Void](GetUser("123"), GetScore(), DoUpdate("update sth."))
|
|
||||||
log.Println(user, score, err)
|
|
||||||
|
|
||||||
// TODO(lijie): select from multiple promises without channel
|
|
||||||
select {
|
|
||||||
case user := <-GetUser("123").Chan():
|
|
||||||
log.Println("user:", user)
|
|
||||||
case score := <-GetScore().Chan():
|
|
||||||
log.Println("score:", score)
|
|
||||||
case <-io.Timeout(5 * time.Second).Chan():
|
|
||||||
log.Println("timeout")
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func DemoCompiled() *io.PromiseImpl[io.Void] {
|
|
||||||
var state1 *io.PromiseImpl[User]
|
|
||||||
var state2 *io.PromiseImpl[User]
|
|
||||||
var state3 *io.PromiseImpl[[]User]
|
|
||||||
var state4 *io.PromiseImpl[io.Await3Result[User, float64, io.Void]]
|
|
||||||
|
|
||||||
P := &io.PromiseImpl[io.Void]{}
|
|
||||||
P.Func = func(resolve func(io.Void, error)) {
|
|
||||||
for {
|
|
||||||
switch P.Prev = P.Next; P.Prev {
|
|
||||||
case 0:
|
|
||||||
state1 = GetUserCompiled("123")
|
|
||||||
P.Next = 1
|
|
||||||
return
|
|
||||||
case 1:
|
|
||||||
state1.EnsureDone()
|
|
||||||
user, err := state1.Value, state1.Err
|
|
||||||
log.Printf("user: %v, err: %v\n", user, err)
|
|
||||||
|
|
||||||
state2 = io.Race[User](GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789"))
|
|
||||||
P.Next = 2
|
|
||||||
return
|
|
||||||
case 2:
|
|
||||||
state2.EnsureDone()
|
|
||||||
user, err := state2.Value, state2.Err
|
|
||||||
log.Println(user, err)
|
|
||||||
|
|
||||||
state3 = io.All[User]([]io.AsyncCall[User]{GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789")})
|
|
||||||
P.Next = 3
|
|
||||||
return
|
|
||||||
case 3:
|
|
||||||
state3.EnsureDone()
|
|
||||||
users, err := state3.Value, state3.Err
|
|
||||||
log.Println(users, err)
|
|
||||||
|
|
||||||
state4 = io.Await3Compiled[User, float64, io.Void](GetUserCompiled("123"), GetScoreCompiled(), DoUpdateCompiled("update sth."))
|
|
||||||
P.Next = 4
|
|
||||||
return
|
|
||||||
case 4:
|
|
||||||
state4.EnsureDone()
|
|
||||||
user, score, _, err := state4.Value.V1, state4.Value.V2, state4.Value.V3, state4.Value.Err
|
|
||||||
log.Println(user, score, err)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case user := <-GetUserCompiled("123").Chan():
|
|
||||||
log.Println("user:", user)
|
|
||||||
case score := <-GetScoreCompiled().Chan():
|
|
||||||
log.Println("score:", score)
|
|
||||||
case <-io.TimeoutCompiled(5 * time.Second).Chan():
|
|
||||||
log.Println("timeout")
|
|
||||||
}
|
|
||||||
P.Next = -1
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
panic("Promise already done")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return P
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
log.SetFlags(log.Lshortfile | log.LstdFlags)
|
|
||||||
// io.Run(Demo())
|
|
||||||
io.Run(DemoCompiled())
|
|
||||||
}
|
|
||||||
92
x/socketio/socketio_go.go
Normal file
92
x/socketio/socketio_go.go
Normal file
@@ -0,0 +1,92 @@
|
|||||||
|
//go:build !llgo
|
||||||
|
// +build !llgo
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package socketio
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
|
||||||
|
"github.com/goplus/llgo/x/async"
|
||||||
|
"github.com/goplus/llgo/x/tuple"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Conn struct {
|
||||||
|
conn net.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func Listen(protocol, bindAddr string, listenCb func(client *Conn, err error)) {
|
||||||
|
go func() {
|
||||||
|
listener, err := net.Listen(protocol, bindAddr)
|
||||||
|
if err != nil {
|
||||||
|
listenCb(nil, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
conn, err := listener.Accept()
|
||||||
|
if err != nil {
|
||||||
|
listenCb(nil, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
listenCb(&Conn{conn: conn}, nil)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func Connect(network, addr string) async.Future[tuple.Tuple2[*Conn, error]] {
|
||||||
|
return async.Async(func(resolve func(tuple.Tuple2[*Conn, error])) {
|
||||||
|
go func() {
|
||||||
|
conn, err := net.Dial(network, addr)
|
||||||
|
if err != nil {
|
||||||
|
resolve(tuple.T2[*Conn, error](nil, err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resolve(tuple.T2[*Conn, error](&Conn{conn: conn}, nil))
|
||||||
|
}()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read once from the TCP connection.
|
||||||
|
func (t *Conn) Read() async.Future[tuple.Tuple2[[]byte, error]] {
|
||||||
|
return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) {
|
||||||
|
go func() {
|
||||||
|
buf := make([]byte, 1024)
|
||||||
|
n, err := t.conn.Read(buf)
|
||||||
|
if err != nil {
|
||||||
|
resolve(tuple.T2[[]byte, error](nil, err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resolve(tuple.T2[[]byte, error](buf[:n], nil))
|
||||||
|
}()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Conn) Write(data []byte) async.Future[error] {
|
||||||
|
return async.Async(func(resolve func(error)) {
|
||||||
|
go func() {
|
||||||
|
_, err := t.conn.Write(data)
|
||||||
|
resolve(err)
|
||||||
|
}()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Conn) Close() {
|
||||||
|
if t.conn != nil {
|
||||||
|
t.conn.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,3 +1,6 @@
|
|||||||
|
//go:build llgo
|
||||||
|
// +build llgo
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
|
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
|
||||||
*
|
*
|
||||||
@@ -14,7 +17,7 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package io
|
package socketio
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"strings"
|
"strings"
|
||||||
@@ -30,11 +33,14 @@ import (
|
|||||||
"github.com/goplus/llgo/x/tuple"
|
"github.com/goplus/llgo/x/tuple"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Tcp struct {
|
type Listener struct {
|
||||||
tcp libuv.Tcp
|
tcp libuv.Tcp
|
||||||
listenCb func(server *Tcp, err error)
|
listenCb func(server *Listener, err error)
|
||||||
readCb func([]byte, error)
|
}
|
||||||
writeCb func(int, error)
|
|
||||||
|
type Conn struct {
|
||||||
|
tcp libuv.Tcp
|
||||||
|
readCb func([]byte, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type libuvError libuv.Errno
|
type libuvError libuv.Errno
|
||||||
@@ -58,7 +64,7 @@ func getAddrInfoCb(p *libuv.GetAddrInfo, status c.Int, addr *net.AddrInfo) {
|
|||||||
bind.resolve(tuple.T2[*net.SockAddr, error](addr.Addr, nil))
|
bind.resolve(tuple.T2[*net.SockAddr, error](addr.Addr, nil))
|
||||||
}
|
}
|
||||||
|
|
||||||
func ParseAddr(addr string) async.Future[tuple.Tuple2[*net.SockAddr, error]] {
|
func parseAddr(addr string) async.Future[tuple.Tuple2[*net.SockAddr, error]] {
|
||||||
return async.Async(func(resolve func(tuple.Tuple2[*net.SockAddr, error])) {
|
return async.Async(func(resolve func(tuple.Tuple2[*net.SockAddr, error])) {
|
||||||
host := "127.0.0.1"
|
host := "127.0.0.1"
|
||||||
var port string
|
var port string
|
||||||
@@ -78,73 +84,64 @@ func ParseAddr(addr string) async.Future[tuple.Tuple2[*net.SockAddr, error]] {
|
|||||||
Flags: 0,
|
Flags: 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(lijie): closure problem, instead with a struct to hold the resolve function.
|
req, cb := cbind.Bind2F[libuv.GetAddrInfo, libuv.GetaddrinfoCb](func(i *libuv.GetAddrInfo, status c.Int, addr *net.AddrInfo) {
|
||||||
// req, cb := cbind.Bind2F[libuv.GetAddrInfo, libuv.GetaddrinfoCb](func(status c.Int, addr *net.AddrInfo) {
|
if status != 0 {
|
||||||
// if status != 0 {
|
resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(status)))
|
||||||
// resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(status)))
|
return
|
||||||
// return
|
}
|
||||||
// }
|
resolve(tuple.T2[*net.SockAddr, error](addr.Addr, nil))
|
||||||
// resolve(tuple.T2[*net.SockAddr, error](addr.Addr, nil))
|
})
|
||||||
// })
|
if res := libuv.Getaddrinfo(async.Exec().L, req, cb, c.AllocaCStr(host), c.AllocaCStr(port), hints); res != 0 {
|
||||||
// if res := libuv.Getaddrinfo(async.Exec().L, req, cb, c.AllocaCStr(host), c.AllocaCStr(port), hints); res != 0 {
|
|
||||||
// resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(res)))
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
bind := &getAddrInfoBind{
|
|
||||||
resolve: resolve,
|
|
||||||
}
|
|
||||||
if res := libuv.Getaddrinfo(async.Exec().L, &bind.GetAddrInfo, getAddrInfoCb, c.AllocaCStr(host), c.AllocaCStr(port), hints); res != 0 {
|
|
||||||
resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(res)))
|
resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(res)))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func Listen(protocol, bindAddr string) async.Future[tuple.Tuple2[*Tcp, error]] {
|
func Listen(protocol, bindAddr string, listenCb func(client *Conn, err error)) {
|
||||||
return async.Async(func(resolve func(tuple.Tuple2[*Tcp, error])) {
|
tcp, err := newListener()
|
||||||
tcp, err := NewTcp()
|
if err != nil {
|
||||||
|
listenCb(nil, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
parseAddr(bindAddr)(func(v tuple.Tuple2[*net.SockAddr, error]) {
|
||||||
|
addr, err := v.Get()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
resolve(tuple.T2[*Tcp, error](nil, err))
|
listenCb(nil, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ParseAddr(bindAddr)(func(v tuple.Tuple2[*net.SockAddr, error]) {
|
if err := tcp.bind(addr, 0); err != nil {
|
||||||
addr, err := v.Get()
|
listenCb(nil, err)
|
||||||
if err != nil {
|
return
|
||||||
resolve(tuple.T2[*Tcp, error](nil, err))
|
}
|
||||||
return
|
if err := tcp.listen(128, func(server *Listener, err error) {
|
||||||
}
|
client, err := server.accept()
|
||||||
if err := tcp.Bind(addr, 0); err != nil {
|
listenCb(client, err)
|
||||||
resolve(tuple.T2[*Tcp, error](nil, err))
|
}); err != nil {
|
||||||
return
|
listenCb(nil, err)
|
||||||
}
|
}
|
||||||
if err := tcp.Listen(128, func(server *Tcp, err error) {
|
|
||||||
resolve(tuple.T2[*Tcp, error](server, err))
|
|
||||||
}); err != nil {
|
|
||||||
resolve(tuple.T2[*Tcp, error](nil, err))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTcp() (*Tcp, error) {
|
func newListener() (*Listener, error) {
|
||||||
t := &Tcp{}
|
t := &Listener{}
|
||||||
if res := libuv.InitTcp(async.Exec().L, &t.tcp); res != 0 {
|
if res := libuv.InitTcp(async.Exec().L, &t.tcp); res != 0 {
|
||||||
return nil, libuvError(res)
|
return nil, libuvError(res)
|
||||||
}
|
}
|
||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Tcp) Bind(addr *net.SockAddr, flags uint) error {
|
func (t *Listener) bind(addr *net.SockAddr, flags uint) error {
|
||||||
if res := t.tcp.Bind(addr, c.Uint(flags)); res != 0 {
|
if res := t.tcp.Bind(addr, c.Uint(flags)); res != 0 {
|
||||||
return libuvError(res)
|
return libuvError(res)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Tcp) Listen(backlog int, cb func(server *Tcp, err error)) error {
|
func (l *Listener) listen(backlog int, cb func(server *Listener, err error)) error {
|
||||||
t.listenCb = cb
|
l.listenCb = cb
|
||||||
res := (*libuv.Stream)(&t.tcp).Listen(c.Int(backlog), func(s *libuv.Stream, status c.Int) {
|
res := (*libuv.Stream)(&l.tcp).Listen(c.Int(backlog), func(s *libuv.Stream, status c.Int) {
|
||||||
server := (*Tcp)(unsafe.Pointer(s))
|
server := (*Listener)(unsafe.Pointer(s))
|
||||||
if status != 0 {
|
if status != 0 {
|
||||||
server.listenCb(server, libuvError(libuv.Errno(status)))
|
server.listenCb(server, libuvError(libuv.Errno(status)))
|
||||||
} else {
|
} else {
|
||||||
@@ -157,54 +154,43 @@ func (t *Tcp) Listen(backlog int, cb func(server *Tcp, err error)) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Tcp) Accept() (client *Tcp, err error) {
|
func (l *Listener) accept() (client *Conn, err error) {
|
||||||
tcp := &Tcp{}
|
tcp := &Conn{}
|
||||||
if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 {
|
if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 {
|
||||||
return nil, libuvError(res)
|
return nil, libuvError(res)
|
||||||
}
|
}
|
||||||
if res := (*libuv.Stream)(&t.tcp).Accept((*libuv.Stream)(&tcp.tcp)); res != 0 {
|
if res := (*libuv.Stream)(&l.tcp).Accept((*libuv.Stream)(&tcp.tcp)); res != 0 {
|
||||||
return nil, libuvError(res)
|
return nil, libuvError(res)
|
||||||
}
|
}
|
||||||
return tcp, nil
|
return tcp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type connectBind struct {
|
func Connect(network, addr string) async.Future[tuple.Tuple2[*Conn, error]] {
|
||||||
libuv.Connect
|
return async.Async(func(resolve func(tuple.Tuple2[*Conn, error])) {
|
||||||
tcp *Tcp
|
parseAddr(addr)(func(v tuple.Tuple2[*net.SockAddr, error]) {
|
||||||
resolve func(tuple.Tuple2[*Tcp, error])
|
addr, err := v.Get()
|
||||||
}
|
if err != nil {
|
||||||
|
resolve(tuple.T2[*Conn, error]((*Conn)(nil), err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func connectCb(p *libuv.Connect, status c.Int) {
|
tcp := &Conn{}
|
||||||
bind := (*connectBind)(unsafe.Pointer(p))
|
if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 {
|
||||||
if status != 0 {
|
resolve(tuple.T2[*Conn, error]((*Conn)(nil), libuvError(res)))
|
||||||
bind.resolve(tuple.T2[*Tcp, error](nil, libuvError(libuv.Errno(status))))
|
return
|
||||||
} else {
|
}
|
||||||
bind.resolve(tuple.T2[*Tcp, error](bind.tcp, nil))
|
req, cb := cbind.Bind1F[libuv.Connect, libuv.ConnectCb](func(c *libuv.Connect, status c.Int) {
|
||||||
}
|
if status != 0 {
|
||||||
}
|
resolve(tuple.T2[*Conn, error]((*Conn)(nil), libuvError(libuv.Errno(status))))
|
||||||
|
} else {
|
||||||
func Connect(addr *net.SockAddr) async.Future[tuple.Tuple2[*Tcp, error]] {
|
resolve(tuple.T2[*Conn, error](tcp, nil))
|
||||||
return async.Async(func(resolve func(tuple.Tuple2[*Tcp, error])) {
|
}
|
||||||
tcp := &Tcp{}
|
})
|
||||||
if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 {
|
if res := libuv.TcpConnect(req, &tcp.tcp, addr, cb); res != 0 {
|
||||||
resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(res)))
|
resolve(tuple.T2[*Conn, error]((*Conn)(nil), libuvError(res)))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// req, _ := cbind.Bind1[libuv.Connect](func(status c.Int) {
|
})
|
||||||
// if status != 0 {
|
|
||||||
// resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(libuv.Errno(status))))
|
|
||||||
// } else {
|
|
||||||
// resolve(tuple.T2[*Tcp, error](tcp, nil))
|
|
||||||
// }
|
|
||||||
// })
|
|
||||||
req := &connectBind{
|
|
||||||
tcp: tcp,
|
|
||||||
resolve: resolve,
|
|
||||||
}
|
|
||||||
if res := libuv.TcpConnect(&req.Connect, &req.tcp.tcp, addr, connectCb); res != 0 {
|
|
||||||
resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(res)))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -213,13 +199,13 @@ func allocBuffer(handle *libuv.Handle, suggestedSize uintptr, buf *libuv.Buf) {
|
|||||||
buf.Len = suggestedSize
|
buf.Len = suggestedSize
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Tcp) StartRead(fn func(data []byte, err error)) {
|
func (t *Conn) StartRead(fn func(data []byte, err error)) {
|
||||||
t.readCb = func(data []byte, err error) {
|
t.readCb = func(data []byte, err error) {
|
||||||
fn(data, err)
|
fn(data, err)
|
||||||
}
|
}
|
||||||
tcp := (*libuv.Stream)(&t.tcp)
|
tcp := (*libuv.Stream)(&t.tcp)
|
||||||
res := tcp.StartRead(allocBuffer, func(client *libuv.Stream, nread c.Long, buf *libuv.Buf) {
|
res := tcp.StartRead(allocBuffer, func(client *libuv.Stream, nread c.Long, buf *libuv.Buf) {
|
||||||
tcp := (*Tcp)(unsafe.Pointer(client))
|
tcp := (*Conn)(unsafe.Pointer(client))
|
||||||
if nread > 0 {
|
if nread > 0 {
|
||||||
tcp.readCb(cbind.GoBytes(buf.Base, int(nread)), nil)
|
tcp.readCb(cbind.GoBytes(buf.Base, int(nread)), nil)
|
||||||
} else if nread < 0 {
|
} else if nread < 0 {
|
||||||
@@ -233,7 +219,7 @@ func (t *Tcp) StartRead(fn func(data []byte, err error)) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Tcp) StopRead() error {
|
func (t *Conn) StopRead() error {
|
||||||
tcp := (*libuv.Stream)(&t.tcp)
|
tcp := (*libuv.Stream)(&t.tcp)
|
||||||
if res := tcp.StopRead(); res != 0 {
|
if res := tcp.StopRead(); res != 0 {
|
||||||
return libuvError(libuv.Errno(res))
|
return libuvError(libuv.Errno(res))
|
||||||
@@ -242,7 +228,7 @@ func (t *Tcp) StopRead() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Read once from the TCP connection.
|
// Read once from the TCP connection.
|
||||||
func (t *Tcp) Read() async.Future[tuple.Tuple2[[]byte, error]] {
|
func (t *Conn) Read() async.Future[tuple.Tuple2[[]byte, error]] {
|
||||||
return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) {
|
return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) {
|
||||||
t.StartRead(func(data []byte, err error) {
|
t.StartRead(func(data []byte, err error) {
|
||||||
if err := t.StopRead(); err != nil {
|
if err := t.StopRead(); err != nil {
|
||||||
@@ -253,7 +239,7 @@ func (t *Tcp) Read() async.Future[tuple.Tuple2[[]byte, error]] {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Tcp) Write(data []byte) async.Future[error] {
|
func (t *Conn) Write(data []byte) async.Future[error] {
|
||||||
return async.Async(func(resolve func(error)) {
|
return async.Async(func(resolve func(error)) {
|
||||||
writer, _ := cbind.Bind1[libuv.Write](func(req *libuv.Write, status c.Int) {
|
writer, _ := cbind.Bind1[libuv.Write](func(req *libuv.Write, status c.Int) {
|
||||||
var result error
|
var result error
|
||||||
@@ -269,23 +255,6 @@ func (t *Tcp) Write(data []byte) async.Future[error] {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Don't use this funciton, just for deubg closure problem.
|
func (t *Conn) Close() {
|
||||||
func (t *Tcp) Write1(data []byte) async.Future[error] {
|
|
||||||
return async.Async(func(resolve func(e error)) {
|
|
||||||
writer, cb := cbind.Bind1F[libuv.Write, libuv.WriteCb](func(req *libuv.Write, status c.Int) {
|
|
||||||
if status != 0 {
|
|
||||||
resolve(libuvError(libuv.Errno(status)))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
resolve(nil)
|
|
||||||
})
|
|
||||||
tcp := (*libuv.Stream)(&t.tcp)
|
|
||||||
buf, len := cbind.CBuffer(data)
|
|
||||||
bufs := &libuv.Buf{Base: buf, Len: uintptr(len)}
|
|
||||||
writer.Write(tcp, bufs, 1, cb)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tcp) Close() {
|
|
||||||
(*libuv.Handle)(unsafe.Pointer(&t.tcp)).Close(nil)
|
(*libuv.Handle)(unsafe.Pointer(&t.tcp)).Close(nil)
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user