v2 / vlib / x / async / group_test.v
211 lines · 200 sloc · 4.48 KB · 15fb60b77ea6073658aa8355b247f2e1ae03b714
Raw
1import context
2import time
3import x.async as xasync
4
5fn test_with_cancel_sets_error_and_closes_done() {
6 mut ctx, cancel := xasync.with_cancel()
7 cancel()
8 done := ctx.done()
9 select {
10 _ := <-done {
11 assert ctx.err().msg() == 'context canceled'
12 }
13 1 * time.second {
14 assert false, 'cancel did not close the context done channel'
15 }
16 }
17}
18
19fn test_group_success() {
20 parent := context.background()
21 mut group := xasync.new_group(parent)
22 done := chan int{cap: 2}
23 group.go(fn [done] (mut ctx context.Context) ! {
24 _ = ctx
25 done <- 1
26 })!
27 group.go(fn [done] (mut ctx context.Context) ! {
28 _ = ctx
29 done <- 2
30 })!
31 group.wait()!
32 assert (<-done) + (<-done) == 3
33}
34
35fn test_group_error_returns_first_error() {
36 parent := context.background()
37 mut group := xasync.new_group(parent)
38 group.go(fn (mut ctx context.Context) ! {
39 _ = ctx
40 return error('group failed')
41 })!
42 group.wait() or {
43 assert err.msg() == 'group failed'
44 return
45 }
46 assert false
47}
48
49fn test_group_wait_without_tasks() {
50 parent := context.background()
51 mut group := xasync.new_group(parent)
52 group.wait()!
53}
54
55fn test_group_refuses_go_after_wait() {
56 parent := context.background()
57 mut group := xasync.new_group(parent)
58 group.wait()!
59 group.go(fn (mut ctx context.Context) ! {
60 _ = ctx
61 }) or {
62 assert err.msg() == 'async: group does not accept new tasks after wait starts'
63 return
64 }
65 assert false
66}
67
68fn test_group_refuses_second_wait() {
69 parent := context.background()
70 mut group := xasync.new_group(parent)
71 group.wait()!
72 group.wait() or {
73 assert err.msg() == 'async: group wait was already called'
74 return
75 }
76 assert false
77}
78
79fn test_group_refuses_nil_job() {
80 parent := context.background()
81 mut group := xasync.new_group(parent)
82 nil_job := unsafe { xasync.JobFn(nil) }
83 group.go(nil_job) or {
84 assert err.msg() == 'async: job function is nil'
85 return
86 }
87 assert false
88}
89
90fn test_group_cancels_siblings_cooperatively() {
91 parent := context.background()
92 mut group := xasync.new_group(parent)
93 cancelled := chan bool{cap: 1}
94 group.go(fn (mut ctx context.Context) ! {
95 _ = ctx
96 return error('stop siblings')
97 })!
98 group.go(fn [cancelled] (mut ctx context.Context) ! {
99 done := ctx.done()
100 select {
101 _ := <-done {
102 cancelled <- true
103 }
104 1 * time.second {
105 cancelled <- false
106 }
107 }
108 })!
109 group.wait() or { assert err.msg() == 'stop siblings' }
110 assert <-cancelled
111}
112
113fn test_group_first_error_remains_stable_with_concurrent_secondary_errors() {
114 parent := context.background()
115 mut group := xasync.new_group(parent)
116 secondary_ready := chan bool{cap: 8}
117 for _ in 0 .. 8 {
118 group.go(fn [secondary_ready] (mut ctx context.Context) ! {
119 secondary_ready <- true
120 done := ctx.done()
121 select {
122 _ := <-done {
123 return error('secondary error after cancellation')
124 }
125 1 * time.second {
126 return error('secondary error timeout')
127 }
128 }
129 })!
130 }
131 for _ in 0 .. 8 {
132 assert <-secondary_ready
133 }
134 group.go(fn (mut ctx context.Context) ! {
135 _ = ctx
136 return error('primary error')
137 })!
138 group.wait() or {
139 assert err.msg() == 'primary error'
140 return
141 }
142 assert false
143}
144
145fn test_group_many_short_jobs_return_first_error() {
146 jobs := 64
147 mut group := xasync.new_group(context.background())
148 done := chan int{cap: jobs}
149 for i in 0 .. jobs {
150 group.go(fn [done, i] (mut ctx context.Context) ! {
151 _ = ctx
152 done <- i
153 })!
154 }
155 group.go(fn (mut ctx context.Context) ! {
156 _ = ctx
157 return error('group stress failure')
158 })!
159 group.wait() or {
160 assert err.msg() == 'group stress failure'
161 mut seen := []bool{len: jobs}
162 for _ in 0 .. jobs {
163 select {
164 i := <-done {
165 assert i >= 0
166 assert i < jobs
167 assert !seen[i]
168 seen[i] = true
169 }
170 1 * time.second {
171 assert false, 'group short job did not finish'
172 }
173 }
174 }
175 for was_seen in seen {
176 assert was_seen
177 }
178 return
179 }
180 assert false
181}
182
183fn test_group_parent_cancellation_is_observed_by_cooperative_job() {
184 parent_ctx, cancel := xasync.with_cancel()
185 mut group := xasync.new_group(parent_ctx)
186 observed := chan string{cap: 1}
187 group.go(fn [observed] (mut ctx context.Context) ! {
188 done := ctx.done()
189 select {
190 _ := <-done {
191 err := ctx.err()
192 observed <- err.msg()
193 return err
194 }
195 1 * time.second {
196 observed <- 'not canceled'
197 return error('parent cancellation was not observed')
198 }
199 }
200 })!
201 cancel()
202 group.wait() or { assert err.msg() == 'context canceled' }
203 select {
204 msg := <-observed {
205 assert msg == 'context canceled'
206 }
207 2 * time.second {
208 assert false, 'cooperative group job did not observe parent cancellation'
209 }
210 }
211}
212