From 453faa6a761991737cef56b8dfd4d26912c9c9bb Mon Sep 17 00:00:00 2001 From: xushiwei Date: Sat, 6 Jul 2024 21:59:28 +0800 Subject: [PATCH] runtime: Select/TrySelect --- README.md | 1 + _demo/cchan/cchan.go | 1 + _demo/cchansel/cchansel.go | 41 +++++++++ _demo/chansel/chansel.go | 26 ++++++ c/c.go | 4 + c/pthread/sync/_sema.go | 70 ++++++++++++++ c/sync/atomic/atomic.go | 1 - internal/runtime/z_chan.go | 181 +++++++++++++++++++++++++++++++------ 8 files changed, 297 insertions(+), 28 deletions(-) create mode 100644 _demo/cchansel/cchansel.go create mode 100644 _demo/chansel/chansel.go create mode 100644 c/pthread/sync/_sema.go diff --git a/README.md b/README.md index 9c0d6afb..2f9c518b 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,7 @@ You can import a Python library in LLGo! And you can import any Python library into `llgo` through a program called `llpyg` (see [Development tools](#development-tools)). The following libraries have been included in `llgo`: +* [py](https://pkg.go.dev/github.com/goplus/llgo/py) (abi) * [py/std](https://pkg.go.dev/github.com/goplus/llgo/py/std) (builtins) * [py/sys](https://pkg.go.dev/github.com/goplus/llgo/py/sys) * [py/os](https://pkg.go.dev/github.com/goplus/llgo/py/os) diff --git a/_demo/cchan/cchan.go b/_demo/cchan/cchan.go index 3d2e10a6..11795299 100644 --- a/_demo/cchan/cchan.go +++ b/_demo/cchan/cchan.go @@ -36,6 +36,7 @@ func main() { runtime.ChanClose(c) v = 10 + if runtime.ChanTrySend(c, unsafe.Pointer(&v), eltSize) { println("error: chan send to closed chan") } diff --git a/_demo/cchansel/cchansel.go b/_demo/cchansel/cchansel.go new file mode 100644 index 00000000..4d0fca18 --- /dev/null +++ b/_demo/cchansel/cchansel.go @@ -0,0 +1,41 @@ +package main + +import ( + "unsafe" + + "github.com/goplus/llgo/internal/runtime" +) + +const ( + eltSize = int(unsafe.Sizeof(0)) +) + +func fibonacci(c, quit *runtime.Chan) { + x, y := 0, 1 + for { + isel, _ := runtime.Select( + runtime.ChanOp{C: c, Send: true, Val: unsafe.Pointer(&x), Size: int32(eltSize)}, + runtime.ChanOp{C: quit}, + ) + if isel == 0 { + x, y = y, x+y + } else { + println("quit") + return + } + } +} + +func main() { + c := runtime.NewChan(eltSize, 0) + quit := runtime.NewChan(eltSize, 0) + go func() { + for i := 0; i < 10; i++ { + val := 0 + runtime.ChanRecv(c, unsafe.Pointer(&val), eltSize) + println(val) + } + runtime.ChanClose(quit) + }() + fibonacci(c, quit) +} diff --git a/_demo/chansel/chansel.go b/_demo/chansel/chansel.go new file mode 100644 index 00000000..18ffc61c --- /dev/null +++ b/_demo/chansel/chansel.go @@ -0,0 +1,26 @@ +package main + +func fibonacci(c, quit chan int) { + x, y := 0, 1 + for { + select { + case c <- x: + x, y = y, x+y + case <-quit: + println("quit") + return + } + } +} + +func main() { + c := make(chan int) + quit := make(chan int) + go func() { + for i := 0; i < 10; i++ { + println(<-c) + } + close(quit) + }() + fibonacci(c, quit) +} diff --git a/c/c.go b/c/c.go index 9b7e9854..105335bc 100644 --- a/c/c.go +++ b/c/c.go @@ -68,6 +68,10 @@ func Alloca(size uintptr) Pointer //go:linkname AllocaCStr llgo.allocaCStr func AllocaCStr(s string) *Char +// TODO(xsw): +// llgo:link AllocaNew llgo.allocaNew +func AllocaNew[T any]() *T { return nil } + //go:linkname Malloc C.malloc func Malloc(size uintptr) Pointer diff --git a/c/pthread/sync/_sema.go b/c/pthread/sync/_sema.go new file mode 100644 index 00000000..c46105c7 --- /dev/null +++ b/c/pthread/sync/_sema.go @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sync + +// #include +import "C" + +import ( + _ "unsafe" + + "github.com/goplus/llgo/c" +) + +// Sem represents a semaphore. +type Sem C.sem_t + +// initializes the unnamed semaphore at the address +// pointed to by sem. The value argument specifies the initial +// value for the semaphore. +// +// The pshared argument indicates whether this semaphore is to be +// shared between the threads of a process, or between processes. +// +// If pshared has the value 0, then the semaphore is shared between +// the threads of a process, and should be located at some address +// that is visible to all threads (e.g., a global variable, or a +// variable allocated dynamically on the heap). +// +// If pshared is nonzero, then the semaphore is shared between +// processes, and should be located in a region of shared memory +// (see shm_open(3), mmap(2), and shmget(2)). (Since a child +// created by fork(2) inherits its parent's memory mappings, it can +// also access the semaphore.) Any process that can access the +// shared memory region can operate on the semaphore using +// sem_post(3), sem_wait(3), and so on. +// +// Initializing a semaphore that has already been initialized +// results in undefined behavior. +// +// llgo:link (*Sem).Init C.sem_init +func (*Sem) Init(pshared c.Int, value c.Uint) c.Int { return 0 } + +// llgo:link (*Sem).Destroy C.sem_destroy +func (*Sem) Destroy() c.Int { return 0 } + +// llgo:link (*Sem).Post C.sem_post +func (*Sem) Post() c.Int { return 0 } + +// llgo:link (*Sem).Wait C.sem_wait +func (*Sem) Wait() c.Int { return 0 } + +// llgo:link (*Sem).TryWait C.sem_trywait +func (*Sem) TryWait() c.Int { return 0 } + +// llgo:link (*Sem).GetValue C.sem_getvalue +func (*Sem) GetValue(sval *c.Int) c.Int { return 0 } diff --git a/c/sync/atomic/atomic.go b/c/sync/atomic/atomic.go index fb96052e..395de99c 100644 --- a/c/sync/atomic/atomic.go +++ b/c/sync/atomic/atomic.go @@ -18,7 +18,6 @@ package atomic import ( "unsafe" - _ "unsafe" ) const ( diff --git a/internal/runtime/z_chan.go b/internal/runtime/z_chan.go index 77fd957b..2438b8b0 100644 --- a/internal/runtime/z_chan.go +++ b/internal/runtime/z_chan.go @@ -26,7 +26,8 @@ import ( // ----------------------------------------------------------------------------- const ( - chanFull = 1 + chanNoSendRecv = 0 + chanHasRecv = 1 ) type Chan struct { @@ -36,6 +37,7 @@ type Chan struct { getp int len int cap int + sops *selectOp close bool } @@ -60,9 +62,16 @@ func ChanCap(p *Chan) int { return p.cap } +func notifyOps(p *Chan) { + for sop := p.sops; sop != nil; sop = sop.next { + sop.notify() + } +} + func ChanClose(p *Chan) { p.mutex.Lock() p.close = true + notifyOps(p) p.mutex.Unlock() p.cond.Broadcast() } @@ -71,12 +80,14 @@ func ChanTrySend(p *Chan, v unsafe.Pointer, eltSize int) bool { n := p.cap p.mutex.Lock() if n == 0 { - if p.getp == chanFull || p.close { + if p.getp != chanHasRecv || p.close { p.mutex.Unlock() return false } - p.data = v - p.getp = chanFull + if p.data != nil { + c.Memcpy(p.data, v, uintptr(eltSize)) + } + p.getp = chanNoSendRecv } else { if p.len == n || p.close { p.mutex.Unlock() @@ -86,6 +97,7 @@ func ChanTrySend(p *Chan, v unsafe.Pointer, eltSize int) bool { c.Memcpy(c.Advance(p.data, off*eltSize), v, uintptr(eltSize)) p.len++ } + notifyOps(p) p.mutex.Unlock() p.cond.Broadcast() return true @@ -95,15 +107,17 @@ func ChanSend(p *Chan, v unsafe.Pointer, eltSize int) bool { n := p.cap p.mutex.Lock() if n == 0 { - for p.getp == chanFull { + for p.getp != chanHasRecv && !p.close { p.cond.Wait(&p.mutex) } if p.close { p.mutex.Unlock() return false } - p.data = v - p.getp = chanFull + if p.data != nil { + c.Memcpy(p.data, v, uintptr(eltSize)) + } + p.getp = chanNoSendRecv } else { for p.len == n { p.cond.Wait(&p.mutex) @@ -116,48 +130,50 @@ func ChanSend(p *Chan, v unsafe.Pointer, eltSize int) bool { c.Memcpy(c.Advance(p.data, off*eltSize), v, uintptr(eltSize)) p.len++ } + notifyOps(p) p.mutex.Unlock() p.cond.Broadcast() return true } -func ChanTryRecv(p *Chan, v unsafe.Pointer, eltSize int) bool { +func ChanTryRecv(p *Chan, v unsafe.Pointer, eltSize int) (recvOK bool, tryOK bool) { n := p.cap p.mutex.Lock() if n == 0 { - if p.getp == 0 { - p.mutex.Unlock() - return false - } - c.Memcpy(v, p.data, uintptr(eltSize)) - p.getp = 0 + tryOK = p.close + p.mutex.Unlock() + return } else { if p.len == 0 { + tryOK = p.close p.mutex.Unlock() - return false + return + } + if v != nil { + c.Memcpy(v, c.Advance(p.data, p.getp*eltSize), uintptr(eltSize)) } - c.Memcpy(v, c.Advance(p.data, p.getp*eltSize), uintptr(eltSize)) p.getp = (p.getp + 1) % n p.len-- } + notifyOps(p) p.mutex.Unlock() p.cond.Broadcast() - return true + return true, true } -func ChanRecv(p *Chan, v unsafe.Pointer, eltSize int) bool { +func ChanRecv(p *Chan, v unsafe.Pointer, eltSize int) (recvOK bool) { n := p.cap p.mutex.Lock() if n == 0 { - for p.getp == 0 { - if p.close { - p.mutex.Unlock() - return false - } + for p.getp == chanHasRecv && !p.close { p.cond.Wait(&p.mutex) } - c.Memcpy(v, p.data, uintptr(eltSize)) - p.getp = 0 + if p.close { + p.mutex.Unlock() + return false + } + p.getp = chanHasRecv + p.data = v } else { for p.len == 0 { if p.close { @@ -166,13 +182,124 @@ func ChanRecv(p *Chan, v unsafe.Pointer, eltSize int) bool { } p.cond.Wait(&p.mutex) } - c.Memcpy(v, c.Advance(p.data, p.getp*eltSize), uintptr(eltSize)) + if v != nil { + c.Memcpy(v, c.Advance(p.data, p.getp*eltSize), uintptr(eltSize)) + } p.getp = (p.getp + 1) % n p.len-- } + notifyOps(p) p.mutex.Unlock() p.cond.Broadcast() - return true + if n == 0 { + p.mutex.Lock() + for p.getp == chanHasRecv && !p.close { + p.cond.Wait(&p.mutex) + } + recvOK = !p.close + p.mutex.Unlock() + } else { + recvOK = true + } + return +} + +// ----------------------------------------------------------------------------- + +type selectOp struct { + mutex sync.Mutex + cond sync.Cond + next *selectOp + + sem bool +} + +func (p *selectOp) init() { + p.mutex.Init(nil) + p.cond.Init(nil) + p.next = nil + p.sem = false +} + +func (p *selectOp) notify() { + p.mutex.Lock() + p.sem = true + p.mutex.Unlock() + p.cond.Signal() +} + +func (p *selectOp) wait() { + p.mutex.Lock() + if !p.sem { + p.cond.Wait(&p.mutex) + } + p.sem = false + p.mutex.Unlock() +} + +// ChanOp represents a channel operation. +type ChanOp struct { + C *Chan + + Val unsafe.Pointer + Size int32 + + Send bool +} + +// TrySelect executes a non-blocking select operation. +func TrySelect(ops ...ChanOp) (isel int, recvOK, tryOK bool) { + for isel = range ops { + op := ops[isel] + if op.Send { + if tryOK = ChanTrySend(op.C, op.Val, int(op.Size)); tryOK { + return + } + } else { + if recvOK, tryOK = ChanTryRecv(op.C, op.Val, int(op.Size)); tryOK { + return + } + } + } + return +} + +// Select executes a blocking select operation. +func Select(ops ...ChanOp) (isel int, recvOK bool) { + selOp := new(selectOp) // TODO(xsw): use c.AllocaNew[selectOp]() + selOp.init() + for _, op := range ops { + prepareSelect(op.C, selOp) + } + var tryOK bool + for { + if isel, recvOK, tryOK = TrySelect(ops...); tryOK { + break + } + selOp.wait() + } + for _, op := range ops { + endSelect(op.C, selOp) + } + return +} + +func prepareSelect(c *Chan, selOp *selectOp) { + c.mutex.Lock() + selOp.next = c.sops + c.sops = selOp + c.mutex.Unlock() +} + +func endSelect(c *Chan, selOp *selectOp) { + c.mutex.Lock() + pp := &c.sops + for *pp != selOp { + pp = &(*pp).next + } + *pp = selOp.next + c.mutex.Unlock() + selOp.next = nil } // -----------------------------------------------------------------------------