v / vlib / sync / channel_select_fifo_test.v
65 lines · 60 sloc · 1.49 KB · a79e6920cc5df5843453f3c9d258d8224acb4f26
Raw
1module sync
2
3import time
4
5fn wait_select_once(ch &Channel, done chan string, label string) {
6 mut channels := [ch]
7 directions := [Direction.pop]
8 mut value := 0
9 mut objs := [voidptr(&value)]
10 idx := channel_select(mut channels, directions, mut objs, time.infinite)
11 match idx {
12 0 {
13 done <- '${label}:${value}'
14 }
15 -2 {
16 done <- '${label}:closed'
17 }
18 else {
19 done <- '${label}:idx=${idx}'
20 }
21 }
22}
23
24fn pop_subscriber_count(ch &Channel) int {
25 ch.read_sub_mtx.lock()
26 defer {
27 ch.read_sub_mtx.unlock()
28 }
29 mut count := 0
30 mut node := ch.read_subscriber
31 for unsafe { node != 0 } {
32 count++
33 node = node.nxt
34 }
35 return count
36}
37
38fn wait_for_pop_subscribers(ch &Channel, want int) {
39 for _ in 0 .. 200 {
40 if pop_subscriber_count(ch) == want {
41 return
42 }
43 time.sleep(5 * time.millisecond)
44 }
45 assert false, 'timed out waiting for ${want} read select subscriber(s)'
46}
47
48fn test_select_waiters_are_fifo() {
49 mut ch := new_channel[int](0)
50 done := chan string{cap: 2}
51 spawn wait_select_once(ch, done, 'first')
52 wait_for_pop_subscribers(ch, 1)
53 spawn wait_select_once(ch, done, 'second')
54 wait_for_pop_subscribers(ch, 2)
55 // Let both goroutines settle into sem.wait() inside channel_select.
56 // Subscriber registration happens before the first non-blocking try,
57 // so there is a small window where the second goroutine is still in
58 // its initial try_pop before blocking.
59 time.sleep(50 * time.millisecond)
60 value := 999
61 ch.push(&value)
62 assert <-done == 'first:999'
63 ch.close()
64 assert <-done == 'second:closed'
65}
66