Merge pull request #452 from xushiwei/q

runtime: Select/TrySelect
This commit is contained in:
xushiwei
2024-07-06 22:05:57 +08:00
committed by GitHub
8 changed files with 297 additions and 28 deletions

View File

@@ -59,6 +59,7 @@ You can import a Python library in LLGo!
And you can import any Python library into `llgo` through a program called `llpyg` (see [Development tools](#development-tools)). The following libraries have been included in `llgo`: And you can import any Python library into `llgo` through a program called `llpyg` (see [Development tools](#development-tools)). The following libraries have been included in `llgo`:
* [py](https://pkg.go.dev/github.com/goplus/llgo/py) (abi)
* [py/std](https://pkg.go.dev/github.com/goplus/llgo/py/std) (builtins) * [py/std](https://pkg.go.dev/github.com/goplus/llgo/py/std) (builtins)
* [py/sys](https://pkg.go.dev/github.com/goplus/llgo/py/sys) * [py/sys](https://pkg.go.dev/github.com/goplus/llgo/py/sys)
* [py/os](https://pkg.go.dev/github.com/goplus/llgo/py/os) * [py/os](https://pkg.go.dev/github.com/goplus/llgo/py/os)

View File

@@ -36,6 +36,7 @@ func main() {
runtime.ChanClose(c) runtime.ChanClose(c)
v = 10 v = 10
if runtime.ChanTrySend(c, unsafe.Pointer(&v), eltSize) { if runtime.ChanTrySend(c, unsafe.Pointer(&v), eltSize) {
println("error: chan send to closed chan") println("error: chan send to closed chan")
} }

View File

@@ -0,0 +1,41 @@
package main
import (
"unsafe"
"github.com/goplus/llgo/internal/runtime"
)
const (
eltSize = int(unsafe.Sizeof(0))
)
func fibonacci(c, quit *runtime.Chan) {
x, y := 0, 1
for {
isel, _ := runtime.Select(
runtime.ChanOp{C: c, Send: true, Val: unsafe.Pointer(&x), Size: int32(eltSize)},
runtime.ChanOp{C: quit},
)
if isel == 0 {
x, y = y, x+y
} else {
println("quit")
return
}
}
}
func main() {
c := runtime.NewChan(eltSize, 0)
quit := runtime.NewChan(eltSize, 0)
go func() {
for i := 0; i < 10; i++ {
val := 0
runtime.ChanRecv(c, unsafe.Pointer(&val), eltSize)
println(val)
}
runtime.ChanClose(quit)
}()
fibonacci(c, quit)
}

26
_demo/chansel/chansel.go Normal file
View File

@@ -0,0 +1,26 @@
package main
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
println(<-c)
}
close(quit)
}()
fibonacci(c, quit)
}

4
c/c.go
View File

@@ -68,6 +68,10 @@ func Alloca(size uintptr) Pointer
//go:linkname AllocaCStr llgo.allocaCStr //go:linkname AllocaCStr llgo.allocaCStr
func AllocaCStr(s string) *Char func AllocaCStr(s string) *Char
// TODO(xsw):
// llgo:link AllocaNew llgo.allocaNew
func AllocaNew[T any]() *T { return nil }
//go:linkname Malloc C.malloc //go:linkname Malloc C.malloc
func Malloc(size uintptr) Pointer func Malloc(size uintptr) Pointer

70
c/pthread/sync/_sema.go Normal file
View File

@@ -0,0 +1,70 @@
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sync
// #include <semaphore.h>
import "C"
import (
_ "unsafe"
"github.com/goplus/llgo/c"
)
// Sem represents a semaphore.
type Sem C.sem_t
// initializes the unnamed semaphore at the address
// pointed to by sem. The value argument specifies the initial
// value for the semaphore.
//
// The pshared argument indicates whether this semaphore is to be
// shared between the threads of a process, or between processes.
//
// If pshared has the value 0, then the semaphore is shared between
// the threads of a process, and should be located at some address
// that is visible to all threads (e.g., a global variable, or a
// variable allocated dynamically on the heap).
//
// If pshared is nonzero, then the semaphore is shared between
// processes, and should be located in a region of shared memory
// (see shm_open(3), mmap(2), and shmget(2)). (Since a child
// created by fork(2) inherits its parent's memory mappings, it can
// also access the semaphore.) Any process that can access the
// shared memory region can operate on the semaphore using
// sem_post(3), sem_wait(3), and so on.
//
// Initializing a semaphore that has already been initialized
// results in undefined behavior.
//
// llgo:link (*Sem).Init C.sem_init
func (*Sem) Init(pshared c.Int, value c.Uint) c.Int { return 0 }
// llgo:link (*Sem).Destroy C.sem_destroy
func (*Sem) Destroy() c.Int { return 0 }
// llgo:link (*Sem).Post C.sem_post
func (*Sem) Post() c.Int { return 0 }
// llgo:link (*Sem).Wait C.sem_wait
func (*Sem) Wait() c.Int { return 0 }
// llgo:link (*Sem).TryWait C.sem_trywait
func (*Sem) TryWait() c.Int { return 0 }
// llgo:link (*Sem).GetValue C.sem_getvalue
func (*Sem) GetValue(sval *c.Int) c.Int { return 0 }

View File

@@ -18,7 +18,6 @@ package atomic
import ( import (
"unsafe" "unsafe"
_ "unsafe"
) )
const ( const (

View File

@@ -26,7 +26,8 @@ import (
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
const ( const (
chanFull = 1 chanNoSendRecv = 0
chanHasRecv = 1
) )
type Chan struct { type Chan struct {
@@ -36,6 +37,7 @@ type Chan struct {
getp int getp int
len int len int
cap int cap int
sops *selectOp
close bool close bool
} }
@@ -60,9 +62,16 @@ func ChanCap(p *Chan) int {
return p.cap return p.cap
} }
func notifyOps(p *Chan) {
for sop := p.sops; sop != nil; sop = sop.next {
sop.notify()
}
}
func ChanClose(p *Chan) { func ChanClose(p *Chan) {
p.mutex.Lock() p.mutex.Lock()
p.close = true p.close = true
notifyOps(p)
p.mutex.Unlock() p.mutex.Unlock()
p.cond.Broadcast() p.cond.Broadcast()
} }
@@ -71,12 +80,14 @@ func ChanTrySend(p *Chan, v unsafe.Pointer, eltSize int) bool {
n := p.cap n := p.cap
p.mutex.Lock() p.mutex.Lock()
if n == 0 { if n == 0 {
if p.getp == chanFull || p.close { if p.getp != chanHasRecv || p.close {
p.mutex.Unlock() p.mutex.Unlock()
return false return false
} }
p.data = v if p.data != nil {
p.getp = chanFull c.Memcpy(p.data, v, uintptr(eltSize))
}
p.getp = chanNoSendRecv
} else { } else {
if p.len == n || p.close { if p.len == n || p.close {
p.mutex.Unlock() p.mutex.Unlock()
@@ -86,6 +97,7 @@ func ChanTrySend(p *Chan, v unsafe.Pointer, eltSize int) bool {
c.Memcpy(c.Advance(p.data, off*eltSize), v, uintptr(eltSize)) c.Memcpy(c.Advance(p.data, off*eltSize), v, uintptr(eltSize))
p.len++ p.len++
} }
notifyOps(p)
p.mutex.Unlock() p.mutex.Unlock()
p.cond.Broadcast() p.cond.Broadcast()
return true return true
@@ -95,15 +107,17 @@ func ChanSend(p *Chan, v unsafe.Pointer, eltSize int) bool {
n := p.cap n := p.cap
p.mutex.Lock() p.mutex.Lock()
if n == 0 { if n == 0 {
for p.getp == chanFull { for p.getp != chanHasRecv && !p.close {
p.cond.Wait(&p.mutex) p.cond.Wait(&p.mutex)
} }
if p.close { if p.close {
p.mutex.Unlock() p.mutex.Unlock()
return false return false
} }
p.data = v if p.data != nil {
p.getp = chanFull c.Memcpy(p.data, v, uintptr(eltSize))
}
p.getp = chanNoSendRecv
} else { } else {
for p.len == n { for p.len == n {
p.cond.Wait(&p.mutex) p.cond.Wait(&p.mutex)
@@ -116,48 +130,50 @@ func ChanSend(p *Chan, v unsafe.Pointer, eltSize int) bool {
c.Memcpy(c.Advance(p.data, off*eltSize), v, uintptr(eltSize)) c.Memcpy(c.Advance(p.data, off*eltSize), v, uintptr(eltSize))
p.len++ p.len++
} }
notifyOps(p)
p.mutex.Unlock() p.mutex.Unlock()
p.cond.Broadcast() p.cond.Broadcast()
return true return true
} }
func ChanTryRecv(p *Chan, v unsafe.Pointer, eltSize int) bool { func ChanTryRecv(p *Chan, v unsafe.Pointer, eltSize int) (recvOK bool, tryOK bool) {
n := p.cap n := p.cap
p.mutex.Lock() p.mutex.Lock()
if n == 0 { if n == 0 {
if p.getp == 0 { tryOK = p.close
p.mutex.Unlock() p.mutex.Unlock()
return false return
}
c.Memcpy(v, p.data, uintptr(eltSize))
p.getp = 0
} else { } else {
if p.len == 0 { if p.len == 0 {
tryOK = p.close
p.mutex.Unlock() p.mutex.Unlock()
return false return
} }
if v != nil {
c.Memcpy(v, c.Advance(p.data, p.getp*eltSize), uintptr(eltSize)) c.Memcpy(v, c.Advance(p.data, p.getp*eltSize), uintptr(eltSize))
}
p.getp = (p.getp + 1) % n p.getp = (p.getp + 1) % n
p.len-- p.len--
} }
notifyOps(p)
p.mutex.Unlock() p.mutex.Unlock()
p.cond.Broadcast() p.cond.Broadcast()
return true return true, true
} }
func ChanRecv(p *Chan, v unsafe.Pointer, eltSize int) bool { func ChanRecv(p *Chan, v unsafe.Pointer, eltSize int) (recvOK bool) {
n := p.cap n := p.cap
p.mutex.Lock() p.mutex.Lock()
if n == 0 { if n == 0 {
for p.getp == 0 { for p.getp == chanHasRecv && !p.close {
p.cond.Wait(&p.mutex)
}
if p.close { if p.close {
p.mutex.Unlock() p.mutex.Unlock()
return false return false
} }
p.cond.Wait(&p.mutex) p.getp = chanHasRecv
} p.data = v
c.Memcpy(v, p.data, uintptr(eltSize))
p.getp = 0
} else { } else {
for p.len == 0 { for p.len == 0 {
if p.close { if p.close {
@@ -166,13 +182,124 @@ func ChanRecv(p *Chan, v unsafe.Pointer, eltSize int) bool {
} }
p.cond.Wait(&p.mutex) p.cond.Wait(&p.mutex)
} }
if v != nil {
c.Memcpy(v, c.Advance(p.data, p.getp*eltSize), uintptr(eltSize)) c.Memcpy(v, c.Advance(p.data, p.getp*eltSize), uintptr(eltSize))
}
p.getp = (p.getp + 1) % n p.getp = (p.getp + 1) % n
p.len-- p.len--
} }
notifyOps(p)
p.mutex.Unlock() p.mutex.Unlock()
p.cond.Broadcast() p.cond.Broadcast()
return true if n == 0 {
p.mutex.Lock()
for p.getp == chanHasRecv && !p.close {
p.cond.Wait(&p.mutex)
}
recvOK = !p.close
p.mutex.Unlock()
} else {
recvOK = true
}
return
}
// -----------------------------------------------------------------------------
type selectOp struct {
mutex sync.Mutex
cond sync.Cond
next *selectOp
sem bool
}
func (p *selectOp) init() {
p.mutex.Init(nil)
p.cond.Init(nil)
p.next = nil
p.sem = false
}
func (p *selectOp) notify() {
p.mutex.Lock()
p.sem = true
p.mutex.Unlock()
p.cond.Signal()
}
func (p *selectOp) wait() {
p.mutex.Lock()
if !p.sem {
p.cond.Wait(&p.mutex)
}
p.sem = false
p.mutex.Unlock()
}
// ChanOp represents a channel operation.
type ChanOp struct {
C *Chan
Val unsafe.Pointer
Size int32
Send bool
}
// TrySelect executes a non-blocking select operation.
func TrySelect(ops ...ChanOp) (isel int, recvOK, tryOK bool) {
for isel = range ops {
op := ops[isel]
if op.Send {
if tryOK = ChanTrySend(op.C, op.Val, int(op.Size)); tryOK {
return
}
} else {
if recvOK, tryOK = ChanTryRecv(op.C, op.Val, int(op.Size)); tryOK {
return
}
}
}
return
}
// Select executes a blocking select operation.
func Select(ops ...ChanOp) (isel int, recvOK bool) {
selOp := new(selectOp) // TODO(xsw): use c.AllocaNew[selectOp]()
selOp.init()
for _, op := range ops {
prepareSelect(op.C, selOp)
}
var tryOK bool
for {
if isel, recvOK, tryOK = TrySelect(ops...); tryOK {
break
}
selOp.wait()
}
for _, op := range ops {
endSelect(op.C, selOp)
}
return
}
func prepareSelect(c *Chan, selOp *selectOp) {
c.mutex.Lock()
selOp.next = c.sops
c.sops = selOp
c.mutex.Unlock()
}
func endSelect(c *Chan, selOp *selectOp) {
c.mutex.Lock()
pp := &c.sops
for *pp != selOp {
pp = &(*pp).next
}
*pp = selOp.next
c.mutex.Unlock()
selOp.next = nil
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------