v / vlib / sync / channels.c.v
699 lines · 671 sloc · 18.25 KB · 254e6ceaaeb06919ac3ff3cdb8d2406adccf20d2
Raw
1module sync
2
3import time
4import rand
5
6// how often to try to get data without blocking before to wait for semaphore
7const spinloops = u32(750)
8const spinloops_sem = u32(4000)
9
10enum BufferElemStat {
11 unused = 0
12 writing
13 written
14 reading
15}
16
17struct Subscription {
18mut:
19 sem &Semaphore = unsafe { nil }
20 prev &&Subscription = unsafe { nil }
21 nxt &Subscription = unsafe { nil }
22}
23
24// append_subscription keeps select waiters in FIFO order, so the oldest waiter
25// is woken first when a channel becomes ready.
26fn append_subscription(head &&Subscription, sub &Subscription) {
27 unsafe {
28 mut link := head
29 for *link != 0 {
30 link = &(*link).nxt
31 }
32 sub.prev = link
33 sub.nxt = nil
34 *link = sub
35 }
36}
37
38pub enum Direction {
39 pop
40 push
41}
42
43@[typedef]
44pub struct C.atomic_uintptr_t {}
45
46pub struct Channel {
47 ringbuf &u8 = unsafe { nil } // queue for buffered channels
48 statusbuf &u8 = unsafe { nil } // flags to synchronize write/read in ringbuf
49 objsize u32
50mut:
51 // atomic
52 writesem Semaphore // to wake thread that wanted to write, but buffer was full
53 readsem Semaphore // to wake thread that wanted to read, but buffer was empty
54 writesem_im Semaphore
55 readsem_im Semaphore
56 write_adr C.atomic_uintptr_t // if != NULL the next obj can be written here without wait
57 read_adr C.atomic_uintptr_t // if != NULL an obj can be read from here without wait
58 adr_read C.atomic_uintptr_t // used to identify origin of writesem
59 adr_written C.atomic_uintptr_t // used to identify origin of readsem
60 write_free u32 // for queue state
61 read_avail u32
62 buf_elem_write_idx u32
63 buf_elem_read_idx u32
64 // for select
65 write_subscriber &Subscription = unsafe { nil }
66 read_subscriber &Subscription = unsafe { nil }
67 write_sub_mtx &SpinLock
68 read_sub_mtx &SpinLock
69 closed u16
70 close_err IError = none
71pub:
72 cap u32 // queue length in #objects
73}
74
75pub fn new_channel[T](n u32) &Channel {
76 st := if sizeof(T) > 0 { sizeof(T) } else { 1 }
77 if isreftype(T) {
78 return new_channel_st(n, st)
79 } else {
80 return new_channel_st_noscan(n, st)
81 }
82}
83
84fn new_channel_st(n u32, st u32) &Channel {
85 wsem := if n > 0 { n } else { 1 }
86 rsem := if n > 0 { u32(0) } else { 1 }
87 rbuf := if n > 0 { unsafe { malloc(int(n * st)) } } else { &u8(unsafe { nil }) }
88 sbuf := if n > 0 { vcalloc_noscan(int(n * 2)) } else { &u8(unsafe { nil }) }
89 mut ch := Channel{
90 objsize: st
91 cap: n
92 write_free: n
93 read_avail: 0
94 ringbuf: rbuf
95 statusbuf: sbuf
96 write_subscriber: unsafe { nil }
97 read_subscriber: unsafe { nil }
98 write_sub_mtx: new_spin_lock()
99 read_sub_mtx: new_spin_lock()
100 }
101 ch.writesem.init(wsem)
102 ch.readsem.init(rsem)
103 ch.writesem_im.init(0)
104 ch.readsem_im.init(0)
105 return &ch
106}
107
108fn new_channel_st_noscan(n u32, st u32) &Channel {
109 $if gcboehm_opt ? {
110 wsem := if n > 0 { n } else { 1 }
111 rsem := if n > 0 { u32(0) } else { 1 }
112 rbuf := if n > 0 { unsafe { malloc_noscan(int(n * st)) } } else { &u8(unsafe { nil }) }
113 sbuf := if n > 0 { vcalloc_noscan(int(n * 2)) } else { &u8(unsafe { nil }) }
114 mut ch := Channel{
115 objsize: st
116 cap: n
117 write_free: n
118 read_avail: 0
119 ringbuf: rbuf
120 statusbuf: sbuf
121 write_subscriber: unsafe { nil }
122 read_subscriber: unsafe { nil }
123 write_sub_mtx: new_spin_lock()
124 read_sub_mtx: new_spin_lock()
125 }
126 ch.writesem.init(wsem)
127 ch.readsem.init(rsem)
128 ch.writesem_im.init(0)
129 ch.readsem_im.init(0)
130 return &ch
131 } $else {
132 return new_channel_st(n, st)
133 }
134}
135
136// close closes the channel and optionally stores an error that will be
137// returned by receive operations that use `or {}` or `?` after the
138// buffered values have been drained.
139pub fn (mut ch Channel) close(errs ...IError) {
140 open_val := u16(0)
141 if !C.atomic_compare_exchange_strong_u16(&ch.closed, &open_val, 1) {
142 return
143 }
144 if errs.len > 0 {
145 ch.close_err = errs[0]
146 }
147 mut nulladr := unsafe { nil }
148 for !C.atomic_compare_exchange_weak_ptr(voidptr(&ch.adr_written), voidptr(&nulladr), isize(-1)) {
149 nulladr = unsafe { nil }
150 }
151 ch.readsem_im.post()
152 ch.readsem.post()
153 ch.read_sub_mtx.lock()
154 if ch.read_subscriber != unsafe { nil } {
155 ch.read_subscriber.sem.post()
156 }
157 ch.read_sub_mtx.unlock()
158 ch.write_sub_mtx.lock()
159 if ch.write_subscriber != unsafe { nil } {
160 ch.write_subscriber.sem.post()
161 }
162 ch.write_sub_mtx.unlock()
163 ch.writesem.post()
164 if ch.cap == 0 {
165 C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, unsafe { nil })
166 }
167 ch.writesem_im.post()
168
169 // Do not destroy `read_sub_mtx` and `write_sub_mtx` here,
170 // because we can read from a closed channel later.
171}
172
173@[inline]
174fn (ch &Channel) closed_error() IError {
175 if ch.close_err !is None__ {
176 return ch.close_err
177 }
178 return error('channel closed')
179}
180
181@[inline]
182pub fn (mut ch Channel) len() int {
183 return int(C.atomic_load_u32(&ch.read_avail))
184}
185
186@[inline]
187pub fn (mut ch Channel) closed() bool {
188 return C.atomic_load_u16(&ch.closed) != 0
189}
190
191@[inline]
192pub fn (mut ch Channel) push(src voidptr) {
193 if ch.try_push_priv(src, false) == .closed {
194 panic('push on closed channel')
195 }
196}
197
198@[inline]
199pub fn (mut ch Channel) try_push(src voidptr) ChanState {
200 return ch.try_push_priv(src, true)
201}
202
203fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
204 if C.atomic_load_u16(&ch.closed) != 0 {
205 return .closed
206 }
207 spinloops_sem_, spinloops_ := if no_block { u32(1), u32(1) } else { spinloops, spinloops_sem }
208 mut have_swapped := false
209 for {
210 mut got_sem := false
211 mut wradr := C.atomic_load_ptr(unsafe { &voidptr(&ch.write_adr) })
212 for wradr != C.NULL {
213 if C.atomic_compare_exchange_strong_ptr(voidptr(&ch.write_adr), voidptr(&wradr),
214 isize(0))
215 {
216 // there is a reader waiting for us
217 unsafe { C.memcpy(wradr, src, ch.objsize) }
218 mut nulladr := unsafe { nil }
219 for !C.atomic_compare_exchange_weak_ptr(voidptr(&ch.adr_written),
220 voidptr(&nulladr), isize(wradr)) {
221 nulladr = unsafe { nil }
222 }
223 ch.readsem_im.post()
224 return .success
225 }
226 }
227 if no_block && ch.cap == 0 {
228 return .not_ready
229 }
230 // get token to read
231 for _ in 0 .. spinloops_sem_ {
232 if got_sem {
233 break
234 }
235 got_sem = ch.writesem.try_wait()
236 }
237 if !got_sem {
238 if no_block {
239 return .not_ready
240 }
241 ch.writesem.wait()
242 }
243 if C.atomic_load_u16(&ch.closed) != 0 {
244 ch.writesem.post()
245 return .closed
246 }
247 if ch.cap == 0 {
248 // try to advertise current object as readable
249 mut read_in_progress := false
250 C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, src)
251 wradr = C.atomic_load_ptr(unsafe { &voidptr(&ch.write_adr) })
252 if wradr != C.NULL {
253 mut src2 := src
254 if C.atomic_compare_exchange_strong_ptr(voidptr(&ch.read_adr), voidptr(&src2),
255 isize(0))
256 {
257 ch.writesem.post()
258 continue
259 } else {
260 read_in_progress = true
261 }
262 }
263 if !read_in_progress {
264 ch.read_sub_mtx.lock()
265 if ch.read_subscriber != unsafe { nil } {
266 ch.read_subscriber.sem.post()
267 }
268 ch.read_sub_mtx.unlock()
269 }
270 mut src2 := src
271 for sp := u32(0); sp < spinloops_ || read_in_progress; sp++ {
272 if C.atomic_compare_exchange_strong_ptr(voidptr(&ch.adr_read), voidptr(&src2),
273 isize(0))
274 {
275 have_swapped = true
276 read_in_progress = true
277 break
278 }
279 src2 = src
280 }
281 mut got_im_sem := false
282 for sp := u32(0); sp < spinloops_sem_ || read_in_progress; sp++ {
283 got_im_sem = ch.writesem_im.try_wait()
284 if got_im_sem {
285 break
286 }
287 }
288 for {
289 if got_im_sem {
290 got_im_sem = false
291 } else {
292 ch.writesem_im.wait()
293 }
294 if C.atomic_load_u16(&ch.closed) != 0 {
295 if have_swapped
296 || C.atomic_compare_exchange_strong_ptr(voidptr(&ch.adr_read), voidptr(&src2), isize(0)) {
297 ch.writesem.post()
298 return .success
299 } else {
300 return .closed
301 }
302 }
303 if have_swapped
304 || C.atomic_compare_exchange_strong_ptr(voidptr(&ch.adr_read), voidptr(&src2), isize(0)) {
305 ch.writesem.post()
306 break
307 } else {
308 // this semaphore was not for us - repost in
309 ch.writesem_im.post()
310 if src2 == voidptr(-1) {
311 ch.readsem.post()
312 return .closed
313 }
314 src2 = src
315 }
316 }
317 return .success
318 } else {
319 // buffered channel
320 mut space_in_queue := false
321 mut wr_free := C.atomic_load_u32(&ch.write_free)
322 for wr_free > 0 {
323 space_in_queue = C.atomic_compare_exchange_weak_u32(&ch.write_free, &wr_free,
324 wr_free - 1)
325 if space_in_queue {
326 break
327 }
328 }
329 if space_in_queue {
330 mut wr_idx := C.atomic_load_u32(&ch.buf_elem_write_idx)
331 for {
332 mut new_wr_idx := wr_idx + 1
333 for new_wr_idx >= ch.cap {
334 new_wr_idx -= ch.cap
335 }
336 if C.atomic_compare_exchange_strong_u32(&ch.buf_elem_write_idx, &wr_idx,
337 new_wr_idx)
338 {
339 break
340 }
341 }
342 mut wr_ptr := ch.ringbuf
343 mut status_adr := ch.statusbuf
344 unsafe {
345 wr_ptr += (wr_idx * ch.objsize)
346 status_adr += wr_idx * sizeof(u16)
347 }
348 mut expected_status := u16(BufferElemStat.unused)
349 for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status,
350 u16(BufferElemStat.writing)) {
351 expected_status = u16(BufferElemStat.unused)
352 }
353 unsafe {
354 C.memcpy(wr_ptr, src, ch.objsize)
355 }
356 C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.written))
357 C.atomic_fetch_add_u32(voidptr(&ch.read_avail), 1)
358 ch.readsem.post()
359 ch.read_sub_mtx.lock()
360 if ch.read_subscriber != unsafe { nil } {
361 ch.read_subscriber.sem.post()
362 }
363 ch.read_sub_mtx.unlock()
364 return .success
365 } else {
366 if no_block {
367 return .not_ready
368 }
369 ch.writesem.post()
370 }
371 }
372 }
373 // we should not get here but the V compiler want's to see a return statement
374 panic('unknown `try_push_priv` state')
375}
376
377@[inline]
378pub fn (mut ch Channel) pop(dest voidptr) bool {
379 return ch.try_pop_priv(dest, false) == .success
380}
381
382// try_pop returns `.success` if an object is popped without blocking.
383// Pass the destination as `mut`: `ch.try_pop(mut value)`, not `ch.try_pop(&value)`.
384@[inline]
385pub fn (mut ch Channel) try_pop(dest voidptr) ChanState {
386 return ch.try_pop_priv(dest, true)
387}
388
389// try_pop_select_priv treats already closed channels as unavailable for non-blocking `select ... else`.
390@[inline]
391fn (mut ch Channel) try_pop_select_priv(dest voidptr) ChanState {
392 if C.atomic_load_u16(&ch.closed) != 0 {
393 return .closed
394 }
395 return ch.try_pop_priv(dest, true)
396}
397
398fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
399 spinloops_sem_, spinloops_ := if no_block { u32(1), u32(1) } else { spinloops, spinloops_sem }
400 mut have_swapped := false
401 mut write_in_progress := false
402 for {
403 mut got_sem := false
404 if ch.cap == 0 {
405 // unbuffered channel - first see if a `push()` has adversized
406 mut rdadr := C.atomic_load_ptr(unsafe { &voidptr(&ch.read_adr) })
407 for rdadr != C.NULL {
408 if C.atomic_compare_exchange_strong_ptr(voidptr(&ch.read_adr), voidptr(&rdadr),
409 isize(0))
410 {
411 // there is a writer waiting for us
412 unsafe { C.memcpy(dest, rdadr, ch.objsize) }
413 mut nulladr := unsafe { nil }
414 for !C.atomic_compare_exchange_weak_ptr(voidptr(&ch.adr_read),
415 voidptr(&nulladr), isize(rdadr)) {
416 nulladr = unsafe { nil }
417 }
418 ch.writesem_im.post()
419 return .success
420 }
421 }
422 if no_block {
423 if C.atomic_load_u16(&ch.closed) == 0 {
424 return .not_ready
425 } else {
426 return .closed
427 }
428 }
429 }
430 // get token to read
431 for _ in 0 .. spinloops_sem_ {
432 if got_sem {
433 break
434 }
435 got_sem = ch.readsem.try_wait()
436 }
437 if !got_sem {
438 if no_block {
439 if C.atomic_load_u16(&ch.closed) == 0 {
440 return .not_ready
441 } else {
442 return .closed
443 }
444 }
445 ch.readsem.wait()
446 }
447 if ch.cap > 0 {
448 // try to get buffer token
449 mut obj_in_queue := false
450 mut rd_avail := C.atomic_load_u32(&ch.read_avail)
451 for rd_avail > 0 {
452 obj_in_queue = C.atomic_compare_exchange_weak_u32(&ch.read_avail, &rd_avail,
453 rd_avail - 1)
454 if obj_in_queue {
455 break
456 }
457 }
458 if obj_in_queue {
459 mut rd_idx := C.atomic_load_u32(&ch.buf_elem_read_idx)
460 for {
461 mut new_rd_idx := rd_idx + 1
462 for new_rd_idx >= ch.cap {
463 new_rd_idx -= ch.cap
464 }
465 if C.atomic_compare_exchange_weak_u32(&ch.buf_elem_read_idx, &rd_idx,
466 new_rd_idx)
467 {
468 break
469 }
470 }
471 mut rd_ptr := ch.ringbuf
472 mut status_adr := ch.statusbuf
473 unsafe {
474 rd_ptr += rd_idx * ch.objsize
475 status_adr += rd_idx * sizeof(u16)
476 }
477 mut expected_status := u16(BufferElemStat.written)
478 for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status,
479 u16(BufferElemStat.reading)) {
480 expected_status = u16(BufferElemStat.written)
481 }
482 unsafe {
483 C.memcpy(dest, rd_ptr, ch.objsize)
484 }
485 C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.unused))
486 C.atomic_fetch_add_u32(voidptr(&ch.write_free), 1)
487 ch.writesem.post()
488 ch.write_sub_mtx.lock()
489 if ch.write_subscriber != unsafe { nil } {
490 ch.write_subscriber.sem.post()
491 }
492 ch.write_sub_mtx.unlock()
493 return .success
494 }
495 }
496 // try to advertise `dest` as writable
497 C.atomic_store_ptr(unsafe { &voidptr(&ch.write_adr) }, dest)
498 if ch.cap == 0 {
499 mut rdadr := C.atomic_load_ptr(unsafe { &voidptr(&ch.read_adr) })
500 if rdadr != C.NULL {
501 mut dest2 := dest
502 if C.atomic_compare_exchange_strong_ptr(voidptr(&ch.write_adr), voidptr(&dest2),
503 isize(0))
504 {
505 ch.readsem.post()
506 continue
507 } else {
508 write_in_progress = true
509 }
510 }
511 }
512 if ch.cap == 0 && !write_in_progress {
513 ch.write_sub_mtx.lock()
514 if ch.write_subscriber != unsafe { nil } {
515 ch.write_subscriber.sem.post()
516 }
517 ch.write_sub_mtx.unlock()
518 }
519 mut dest2 := dest
520 for sp := u32(0); sp < spinloops_ || write_in_progress; sp++ {
521 if C.atomic_compare_exchange_strong_ptr(voidptr(&ch.adr_written), voidptr(&dest2),
522 isize(0))
523 {
524 have_swapped = true
525 break
526 } else if dest2 == voidptr(-1) {
527 ch.readsem.post()
528 return .closed
529 }
530 dest2 = dest
531 }
532 mut got_im_sem := false
533 for sp := u32(0); sp < spinloops_sem_ || write_in_progress; sp++ {
534 got_im_sem = ch.readsem_im.try_wait()
535 if got_im_sem {
536 break
537 }
538 }
539 for {
540 if got_im_sem {
541 got_im_sem = false
542 } else {
543 ch.readsem_im.wait()
544 }
545 if have_swapped
546 || C.atomic_compare_exchange_strong_ptr(voidptr(&ch.adr_written), voidptr(&dest2), isize(0)) {
547 ch.readsem.post()
548 break
549 } else {
550 // this semaphore was not for us - repost in
551 ch.readsem_im.post()
552 if dest2 == voidptr(-1) {
553 ch.readsem.post()
554 return .closed
555 }
556 dest2 = dest
557 }
558 }
559 break
560 }
561 return .success
562}
563
564// channel_select waits `timeout` on any of `channels[i]` until one of them can
565// push (`dir[i] == .push`) or pop (`dir[i] == .pop`) the object referenced by
566// `objrefs[i]`. `timeout = time.infinite` means wait unlimited time.
567// `timeout <= 0` means return immediately if no transaction can be performed
568// without waiting. It returns the selected channel index, `-1` on timeout, and
569// `-2` when all channels are closed.
570pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration) int {
571 skip_closed_pop := timeout < 0
572 actual_timeout := if skip_closed_pop { time.Duration(0) } else { timeout }
573 closed_pop_mode := if skip_closed_pop {
574 SelectClosedPopMode.skip
575 } else {
576 SelectClosedPopMode.closed
577 }
578 return channel_select_priv(mut channels, dir, mut objrefs, actual_timeout, closed_pop_mode)
579}
580
581enum SelectClosedPopMode {
582 closed
583 ready
584 skip
585}
586
587// channel_select_lang is used by the language `select` implementation.
588// Closed receive cases stay selectable for blocking/timed selects to match
589// plain `<-ch` semantics, while `select ... else` still skips them.
590fn channel_select_lang(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration) int {
591 skip_closed_pop := timeout < 0
592 actual_timeout := if skip_closed_pop { time.Duration(0) } else { timeout }
593 closed_pop_mode := if skip_closed_pop {
594 SelectClosedPopMode.skip
595 } else {
596 SelectClosedPopMode.ready
597 }
598 return channel_select_priv(mut channels, dir, mut objrefs, actual_timeout, closed_pop_mode)
599}
600
601fn channel_select_priv(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration, closed_pop_mode SelectClosedPopMode) int {
602 $if debug_channels ? {
603 assert channels.len == dir.len
604 assert dir.len == objrefs.len
605 }
606 mut subscr := []Subscription{len: channels.len}
607 mut sem := unsafe { Semaphore{} }
608 sem.init(0)
609 for i, ch in channels {
610 subscr[i].sem = unsafe { &sem }
611 sub_mtx, subscriber := if dir[i] == .push {
612 ch.write_sub_mtx, &ch.write_subscriber
613 } else {
614 ch.read_sub_mtx, &ch.read_subscriber
615 }
616 sub_mtx.lock()
617 unsafe {
618 append_subscription(subscriber, &subscr[i])
619 }
620 sub_mtx.unlock()
621 }
622 stopwatch := if timeout == time.infinite || timeout <= 0 {
623 time.StopWatch{}
624 } else {
625 time.new_stopwatch()
626 }
627 mut event_idx := -1 // negative index means `timed out`
628
629 outer: for {
630 rnd := rand.intn(channels.len) or { 0 }
631 mut num_closed := 0
632 mut ready_closed_idx := -1
633 for j, _ in channels {
634 mut i := j + rnd
635 if i >= channels.len {
636 i -= channels.len
637 }
638 stat := if dir[i] == .push {
639 channels[i].try_push_priv(objrefs[i], true)
640 } else if closed_pop_mode == .skip {
641 channels[i].try_pop_select_priv(objrefs[i])
642 } else {
643 channels[i].try_pop_priv(objrefs[i], true)
644 }
645 if stat == .success {
646 event_idx = i
647 break outer
648 } else if stat == .closed && dir[i] == .pop && closed_pop_mode == .ready {
649 unsafe {
650 C.memset(objrefs[i], 0, channels[i].objsize)
651 }
652 if ready_closed_idx == -1 {
653 ready_closed_idx = i
654 }
655 num_closed++
656 } else if stat == .closed {
657 num_closed++
658 }
659 }
660 if ready_closed_idx >= 0 {
661 event_idx = ready_closed_idx
662 break outer
663 }
664 if num_closed == channels.len {
665 event_idx = -2
666 break outer
667 }
668 if timeout <= 0 {
669 break outer
670 }
671 if timeout != time.infinite {
672 remaining := timeout - stopwatch.elapsed()
673 if !sem.timed_wait(remaining) {
674 break outer
675 }
676 } else {
677 sem.wait()
678 }
679 }
680 // reset subscribers
681 for i, ch in channels {
682 sub_mtx := if dir[i] == .push {
683 ch.write_sub_mtx
684 } else {
685 ch.read_sub_mtx
686 }
687 sub_mtx.lock()
688 unsafe {
689 *subscr[i].prev = subscr[i].nxt
690 }
691 if unsafe { subscr[i].nxt != 0 } {
692 subscr[i].nxt.prev = subscr[i].prev
693 subscr[i].nxt.sem.post()
694 }
695 sub_mtx.unlock()
696 }
697 sem.destroy()
698 return event_idx
699}
700