v / vlib / goroutines / chan.v
285 lines · 251 sloc · 6.43 KB · dd253e564832e467f50455df879b16ce32298df6
Raw
1// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved.
2// Use of this source code is governed by an MIT license
3// that can be found in the LICENSE file.
4//
5// Channel implementation for goroutines.
6// Translated from Go's runtime/chan.go.
7//
8// Channels provide goroutine-safe communication between goroutines.
9// They support both buffered and unbuffered modes.
10//
11// Key operations translated from Go:
12// - makechan() -> chan_make()
13// - chansend() -> chan_send()
14// - chanrecv() -> chan_recv()
15// - closechan() -> chan_close()
16module goroutines
17
18// Chan is a goroutine-safe channel for communication between goroutines.
19// Translated from Go's hchan struct in chan.go.
20pub struct Chan {
21pub mut:
22 mu SpinLock // protects all fields (spinlock is ucontext-safe)
23 qcount u32 // total data in the queue
24 dataqsiz u32 // size of the circular buffer
25 buf voidptr // circular buffer for buffered channels
26 elemsize u16 // size of each element
27 closed bool // true if channel is closed
28
29 sendx u32 // send index into circular buffer
30 recvx u32 // receive index into circular buffer
31
32 recvq WaitQ // list of recv waiters
33 sendq WaitQ // list of send waiters
34}
35
36// chan_make creates a new channel.
37// If buf_size > 0, creates a buffered channel.
38// Translated from Go's makechan() in chan.go.
39pub fn chan_make(elem_size int, buf_size int) &Chan {
40 mut c := &Chan{
41 elemsize: u16(elem_size)
42 dataqsiz: u32(buf_size)
43 }
44 if buf_size > 0 {
45 c.buf = unsafe { malloc(elem_size * buf_size) }
46 }
47 return c
48}
49
50// chan_send sends a value on the channel.
51// If block is true, blocks until the send can proceed.
52// Returns true if the value was sent.
53// Translated from Go's chansend() in chan.go.
54pub fn chan_send(c &Chan, ep voidptr, block bool) bool {
55 if c == unsafe { nil } {
56 if !block {
57 return false
58 }
59 // Block forever on nil channel (Go behavior)
60 gopark('chan send (nil chan)')
61 return false // unreachable
62 }
63
64 mut ch := unsafe { c }
65 ch.mu.acquire()
66
67 if ch.closed {
68 ch.mu.release()
69 panic('send on closed channel')
70 }
71
72 // Fast path: try to find a waiting receiver
73 sg := ch.recvq.dequeue()
74 if sg != unsafe { nil } {
75 // Found a waiting receiver - send directly
76 ch.mu.release()
77 send_direct(sg, ep, ch.elemsize)
78 return true
79 }
80
81 // Buffered channel with space available
82 if ch.qcount < ch.dataqsiz {
83 // Put data in buffer
84 dst := chan_buf(ch, ch.sendx)
85 unsafe { C.memcpy(dst, ep, ch.elemsize) }
86 ch.sendx++
87 if ch.sendx == ch.dataqsiz {
88 ch.sendx = 0
89 }
90 ch.qcount++
91 ch.mu.release()
92 return true
93 }
94
95 if !block {
96 ch.mu.release()
97 return false
98 }
99
100 // Block: enqueue ourselves on the send wait queue
101 gp := get_current_g()
102 mut mysg := &Sudog{
103 g: unsafe { gp }
104 elem: ep
105 c: voidptr(ch)
106 }
107 ch.sendq.enqueue(mysg)
108 ch.mu.release()
109
110 // Park the goroutine until a receiver wakes us
111 gopark('chan send')
112
113 return true
114}
115
116// chan_recv receives a value from the channel.
117// If block is true, blocks until a value is available.
118// Returns (received, ok). ok is false if channel is closed and empty.
119// Translated from Go's chanrecv() in chan.go.
120pub fn chan_recv(c &Chan, ep voidptr, block bool) (bool, bool) {
121 if c == unsafe { nil } {
122 if !block {
123 return false, false
124 }
125 gopark('chan receive (nil chan)')
126 return false, false // unreachable
127 }
128
129 mut ch := unsafe { c }
130 ch.mu.acquire()
131
132 // Fast path: try to find a waiting sender
133 sg := ch.sendq.dequeue()
134 if sg != unsafe { nil } {
135 ch.mu.release()
136 recv_direct(ch, sg, ep)
137 return true, true
138 }
139
140 // Buffered channel with data available
141 if ch.qcount > 0 {
142 src := chan_buf(ch, ch.recvx)
143 if ep != unsafe { nil } {
144 unsafe { C.memcpy(ep, src, ch.elemsize) }
145 }
146 ch.recvx++
147 if ch.recvx == ch.dataqsiz {
148 ch.recvx = 0
149 }
150 ch.qcount--
151 ch.mu.release()
152 return true, true
153 }
154
155 if ch.closed {
156 ch.mu.release()
157 if ep != unsafe { nil } {
158 unsafe { C.memset(ep, 0, ch.elemsize) }
159 }
160 return true, false
161 }
162
163 if !block {
164 ch.mu.release()
165 return false, false
166 }
167
168 // Block: enqueue ourselves on the recv wait queue
169 gp := get_current_g()
170 mut mysg := &Sudog{
171 g: unsafe { gp }
172 elem: ep
173 c: voidptr(ch)
174 }
175 ch.recvq.enqueue(mysg)
176 ch.mu.release()
177
178 // Park until a sender wakes us
179 gopark('chan receive')
180
181 return true, true
182}
183
184// chan_close closes the channel.
185// Translated from Go's closechan() in chan.go.
186pub fn chan_close(c &Chan) {
187 if c == unsafe { nil } {
188 panic('close of nil channel')
189 }
190
191 mut ch := unsafe { c }
192 ch.mu.acquire()
193
194 if ch.closed {
195 ch.mu.release()
196 panic('close of closed channel')
197 }
198
199 ch.closed = true
200
201 // Wake all waiting receivers
202 for {
203 mut sg := ch.recvq.dequeue()
204 if sg == unsafe { nil } {
205 break
206 }
207 if sg.elem != unsafe { nil } {
208 unsafe { C.memset(sg.elem, 0, ch.elemsize) }
209 }
210 sg.success = false
211 goready(sg.g)
212 }
213
214 // Wake all waiting senders (they will panic)
215 for {
216 mut sg := ch.sendq.dequeue()
217 if sg == unsafe { nil } {
218 break
219 }
220 sg.success = false
221 goready(sg.g)
222 }
223
224 ch.mu.release()
225}
226
227// send_direct sends data directly from sender to a waiting receiver.
228// Translated from Go's send() in chan.go.
229fn send_direct(sg &Sudog, ep voidptr, elem_size u16) {
230 if sg.elem != unsafe { nil } {
231 unsafe { C.memcpy(sg.elem, ep, elem_size) }
232 }
233 mut s := unsafe { sg }
234 s.success = true
235 goready(sg.g)
236}
237
238// recv_direct receives data directly from a waiting sender.
239fn recv_direct(ch &Chan, sg &Sudog, ep voidptr) {
240 if ch.dataqsiz == 0 {
241 // Unbuffered: copy directly from sender
242 if ep != unsafe { nil } {
243 unsafe { C.memcpy(ep, sg.elem, ch.elemsize) }
244 }
245 } else {
246 // Buffered: take from buffer, then copy sender's data into buffer
247 buf_elem := chan_buf(ch, unsafe { ch }.recvx)
248 if ep != unsafe { nil } {
249 unsafe { C.memcpy(ep, buf_elem, ch.elemsize) }
250 }
251 unsafe { C.memcpy(buf_elem, sg.elem, ch.elemsize) }
252 unsafe {
253 ch.recvx++
254 if ch.recvx == ch.dataqsiz {
255 ch.recvx = 0
256 }
257 ch.sendx = ch.recvx
258 }
259 }
260 mut s := unsafe { sg }
261 s.success = true
262 goready(sg.g)
263}
264
265// chan_buf returns a pointer to the i-th slot in the buffer.
266// Translated from Go's chanbuf() in chan.go.
267fn chan_buf(c &Chan, i u32) voidptr {
268 return unsafe { voidptr(usize(c.buf) + usize(i) * usize(c.elemsize)) }
269}
270
271// chan_len returns the number of elements in the channel buffer.
272pub fn chan_len(c &Chan) int {
273 if c == unsafe { nil } {
274 return 0
275 }
276 return int(c.qcount)
277}
278
279// chan_cap returns the capacity of the channel buffer.
280pub fn chan_cap(c &Chan) int {
281 if c == unsafe { nil } {
282 return 0
283 }
284 return int(c.dataqsiz)
285}
286