v2 / vlib / x / async / pool_test.v
368 lines · 343 sloc · 8.64 KB · 15fb60b77ea6073658aa8355b247f2e1ae03b714
Raw
1import context
2import time
3import x.async as xasync
4
// test_pool_rejects_invalid_config verifies that `new_pool` fails with the
// worker-count error message when asked for zero workers.
fn test_pool_rejects_invalid_config() {
	expected_msg := 'async: pool worker count must be positive'
	xasync.new_pool(workers: 0, queue_size: 1) or {
		assert err.msg() == expected_msg
		return
	}
	// Reaching this point means the invalid configuration was accepted.
	assert false
}
12
// test_pool_rejects_invalid_queue_size verifies that `new_pool` fails with the
// queue-size error message when asked for a zero-length queue.
fn test_pool_rejects_invalid_queue_size() {
	expected_msg := 'async: pool queue size must be positive'
	xasync.new_pool(workers: 1, queue_size: 0) or {
		assert err.msg() == expected_msg
		return
	}
	// Reaching this point means the invalid configuration was accepted.
	assert false
}
20
// test_pool_respects_worker_count submits four blocking jobs to a pool with two
// workers and asserts that exactly two of them signal `started` before any job
// is released.
fn test_pool_respects_worker_count() {
	worker_count := 2
	job_count := 4
	mut pool := xasync.new_pool(workers: worker_count, queue_size: job_count)!
	started := chan bool{cap: job_count}
	release := chan bool{cap: job_count}
	// Every job announces itself and then parks until the test releases it.
	parked_job := fn [started, release] (mut ctx context.Context) ! {
		_ = ctx
		started <- true
		_ := <-release
	}
	for _ in 0 .. job_count {
		pool.try_submit(parked_job)!
	}

	wait_for_bool_signal(started, 'first pool job did not start')
	wait_for_bool_signal(started, 'second pool job did not start')
	// A third start signal within the grace period would mean more jobs are
	// running than there are workers.
	select {
		_ := <-started {
			assert false, 'pool started more jobs than worker count before release'
		}
		100 * time.millisecond {}
	}

	// Unblock every job so that close can return.
	for _ in 0 .. job_count {
		release <- true
	}
	pool.close()!
}
47
// test_pool_queue_full_returns_backpressure_error keeps the single worker busy,
// fills the single queue slot, and expects the third submission to be refused
// with the queue-full error message.
fn test_pool_queue_full_returns_backpressure_error() {
	mut pool := xasync.new_pool(workers: 1, queue_size: 1)!
	release := chan bool{cap: 2}
	started := chan bool{cap: 1}
	hold_until_released := fn [started, release] (mut ctx context.Context) ! {
		_ = ctx
		started <- true
		_ := <-release
	}

	// First job: wait until it is actually running before submitting more.
	pool.try_submit(hold_until_released)!
	wait_for_bool_signal(started, 'blocking pool job did not start')
	// Second job: accepted while the first is still blocked.
	pool.try_submit(hold_until_released)!
	// Third job: expected to be rejected.
	pool.try_submit(hold_until_released) or {
		assert err.msg() == 'async: pool queue is full'
		// Release both accepted jobs so that close can return.
		for _ in 0 .. 2 {
			release <- true
		}
		pool.close()!
		return
	}
	assert false
}
70
// test_pool_submit_after_close_is_refused closes a fresh pool and expects a
// later `try_submit` to fail with the pool-closed error message.
fn test_pool_submit_after_close_is_refused() {
	mut pool := xasync.new_pool(workers: 1, queue_size: 1)!
	pool.close()!
	noop_job := fn (mut ctx context.Context) ! {
		_ = ctx
	}
	pool.try_submit(noop_job) or {
		assert err.msg() == 'async: pool is closed'
		return
	}
	// The closed pool accepted a job.
	assert false
}
82
// test_pool_wait_is_one_shot calls `wait` twice on the same pool and expects the
// second call to fail with the already-called error message.
fn test_pool_wait_is_one_shot() {
	mut pool := xasync.new_pool(workers: 1, queue_size: 1)!
	// The first wait must succeed.
	pool.wait()!
	pool.wait() or {
		second_wait_msg := err.msg()
		assert second_wait_msg == 'async: pool wait was already called'
		return
	}
	assert false
}
92
// test_pool_refuses_nil_job submits a nil `JobFn` and expects `try_submit` to
// fail with the nil-job error message; the pool is closed afterwards.
fn test_pool_refuses_nil_job() {
	mut pool := xasync.new_pool(workers: 1, queue_size: 1)!
	// The nil function value has to be built inside an unsafe expression.
	pool.try_submit(unsafe { xasync.JobFn(nil) }) or {
		rejection := err.msg()
		assert rejection == 'async: job function is nil'
		pool.close()!
		return
	}
	assert false
}
103
// test_pool_close_waits_for_accepted_jobs runs `close` on a separate thread while
// one accepted job is blocked, asserts that close has not reported within 100ms,
// then releases the job and expects close to report success within one second.
fn test_pool_close_waits_for_accepted_jobs() {
	mut pool := xasync.new_pool(workers: 1, queue_size: 1)!
	release := chan bool{cap: 1}
	close_result := chan bool{cap: 1}
	pool.try_submit(fn [release] (mut ctx context.Context) ! {
		_ = ctx
		_ := <-release
	})!

	// The closer reports `true` when close succeeded and `false` when it failed.
	closer := spawn fn [mut pool, close_result] () {
		pool.close() or {
			close_result <- false
			return
		}
		close_result <- true
	}()

	// While the job is still blocked, close must not have reported anything.
	select {
		_ := <-close_result {
			assert false, 'pool close returned before accepted job completed'
		}
		100 * time.millisecond {}
	}

	release <- true
	// Same receive-or-time-out check as before, expressed through the shared helper.
	wait_for_bool_signal(close_result, 'pool close did not return after accepted job completed')
	closer.wait()
}
137
// test_pool_first_error_is_propagated submits two failing jobs to a one-worker
// pool and expects `close` to return the error of the job that was submitted
// first.
fn test_pool_first_error_is_propagated() {
	mut pool := xasync.new_pool(workers: 1, queue_size: 2)!
	release := chan bool{cap: 1}
	started := chan bool{cap: 1}
	// Blocks until released, so the second job is submitted while this one runs.
	fails_first := fn [started, release] (mut ctx context.Context) ! {
		_ = ctx
		started <- true
		_ := <-release
		return error('first pool failure')
	}
	fails_second := fn (mut ctx context.Context) ! {
		_ = ctx
		return error('second pool failure')
	}

	pool.try_submit(fails_first)!
	wait_for_bool_signal(started, 'first pool failure job did not start')
	pool.try_submit(fails_second)!
	release <- true

	pool.close() or {
		assert err.msg() == 'first pool failure'
		return
	}
	assert false
}
160
// test_pool_error_does_not_drop_accepted_jobs submits a failing job followed by
// a job that signals a channel, then expects `close` to return the failure and
// the second job to have signalled within one second.
fn test_pool_error_does_not_drop_accepted_jobs() {
	mut pool := xasync.new_pool(workers: 1, queue_size: 1)!
	accepted_job_ran := chan bool{cap: 1}
	failing_job := fn (mut ctx context.Context) ! {
		_ = ctx
		return error('first pool failure')
	}
	signalling_job := fn [accepted_job_ran] (mut ctx context.Context) ! {
		_ = ctx
		accepted_job_ran <- true
	}
	pool.try_submit(failing_job)!
	pool.try_submit(signalling_job)!

	pool.close() or {
		assert err.msg() == 'first pool failure'
		// Receive-or-time-out check for the job accepted after the failing one.
		wait_for_bool_signal(accepted_job_ran, 'accepted pool job did not run after earlier error')
		return
	}
	assert false
}
186
// test_pool_concurrent_errors_return_one_error_and_drain_accepted_jobs starts four
// failing jobs, one per worker, queues eight succeeding jobs behind them, and
// releases the failures together. `close` must return one of the failure errors
// and every succeeding job must still signal completion.
fn test_pool_concurrent_errors_return_one_error_and_drain_accepted_jobs() {
	error_jobs := 4
	ok_jobs := 8
	mut pool := xasync.new_pool(workers: error_jobs, queue_size: ok_jobs)!
	started := chan bool{cap: error_jobs}
	release := chan bool{cap: error_jobs}
	completed_ok := chan bool{cap: ok_jobs}

	// Each failing job captures its own index so the error messages differ.
	for idx in 0 .. error_jobs {
		pool.try_submit(fn [started, release, idx] (mut ctx context.Context) ! {
			_ = ctx
			started <- true
			_ := <-release
			return error('pool concurrent failure ${idx}')
		})!
	}
	succeeding_job := fn [completed_ok] (mut ctx context.Context) ! {
		_ = ctx
		completed_ok <- true
	}
	for _ in 0 .. ok_jobs {
		pool.try_submit(succeeding_job)!
	}

	// Wait until every failing job is running, then let them all fail.
	for _ in 0 .. error_jobs {
		wait_for_bool_signal(started, 'pool error job did not start')
	}
	for _ in 0 .. error_jobs {
		release <- true
	}

	pool.close() or {
		// Any of the concurrent failures is acceptable; only the prefix is checked.
		assert err.msg().starts_with('pool concurrent failure ')
		for _ in 0 .. ok_jobs {
			wait_for_bool_signal(completed_ok, 'accepted ok pool job did not complete')
		}
		return
	}
	assert false
}
223
// test_pool_close_drains_many_accepted_jobs_while_finishing accepts twelve blocking
// jobs on three workers, starts `close` on another thread, and checks that close
// does not report until the very last accepted job has finished.
fn test_pool_close_drains_many_accepted_jobs_while_finishing() {
	jobs := 12
	workers := 3
	all_but_last := jobs - 1
	mut pool := xasync.new_pool(workers: workers, queue_size: jobs - workers)!
	started := chan bool{cap: jobs}
	release := chan bool{cap: jobs}
	finished := chan bool{cap: jobs}
	close_result := chan bool{cap: 1}
	gated_job := fn [started, release, finished] (mut ctx context.Context) ! {
		_ = ctx
		started <- true
		_ := <-release
		finished <- true
	}
	for _ in 0 .. jobs {
		pool.try_submit(gated_job)!
	}
	for _ in 0 .. workers {
		wait_for_bool_signal(started, 'initial pool job did not start')
	}

	// The closer reports `true` when close succeeded and `false` when it failed.
	closer := spawn fn [mut pool, close_result] () {
		pool.close() or {
			close_result <- false
			return
		}
		close_result <- true
	}()
	wait_until_pool_rejects_as_closed(mut pool)
	assert_no_bool_signal(close_result, 'pool close returned while accepted jobs were still blocked')

	// Let every job but one run to completion; close must still be pending.
	for _ in 0 .. all_but_last {
		release <- true
	}
	for _ in 0 .. all_but_last {
		wait_for_bool_signal(finished, 'accepted pool job did not finish')
	}
	assert_no_bool_signal(close_result, 'pool close returned before the last accepted job finished')

	// Releasing the final job allows close to complete.
	release <- true
	wait_for_bool_signal(finished, 'last accepted pool job did not finish')
	wait_for_bool_signal(close_result, 'pool close did not drain accepted jobs')
	closer.wait()
}
266
// test_pool_parent_cancellation_is_observed_by_cooperative_job runs a job that
// waits on its context's done channel, cancels the parent context, and expects
// `close` to return the 'context canceled' error produced by the job.
fn test_pool_parent_cancellation_is_observed_by_cooperative_job() {
	parent_ctx, cancel := xasync.with_cancel()
	mut pool := xasync.new_pool_with_context(parent_ctx, workers: 1, queue_size: 1)!
	started := chan bool{cap: 1}
	cooperative_job := fn [started] (mut ctx context.Context) ! {
		started <- true
		cancelled := ctx.done()
		select {
			_ := <-cancelled {
				return ctx.err()
			}
			1 * time.second {
				return error('pool job did not observe parent cancellation')
			}
		}
		return error('unreachable')
	}
	pool.try_submit(cooperative_job)!

	// Cancel only after the job is known to be running.
	wait_for_bool_signal(started, 'pool job did not start before parent cancellation')
	cancel()

	pool.close() or {
		assert err.msg() == 'context canceled'
		return
	}
	assert false
}
292
// test_pool_non_cooperative_job_finishes_naturally_after_parent_cancel submits a
// job that ignores its context, cancels the parent context, and expects `close`
// to succeed and the job to have signalled `finished`.
fn test_pool_non_cooperative_job_finishes_naturally_after_parent_cancel() {
	parent_ctx, cancel := xasync.with_cancel()
	mut pool := xasync.new_pool_with_context(parent_ctx, workers: 1, queue_size: 1)!
	finished := chan bool{cap: 1}
	// This job never looks at ctx; it sleeps briefly and then reports.
	sleepy_job := fn [finished] (mut ctx context.Context) ! {
		_ = ctx
		time.sleep(20 * time.millisecond)
		finished <- true
	}
	pool.try_submit(sleepy_job)!
	cancel()
	pool.close()!
	did_finish := <-finished
	assert did_finish
}
306
// test_pool_short_stress_many_jobs submits 100 jobs that each send their index on
// a channel, closes the pool, and checks that every index arrived exactly once.
fn test_pool_short_stress_many_jobs() {
	jobs := 100
	mut pool := xasync.new_pool(workers: 4, queue_size: jobs)!
	results := chan int{cap: jobs}
	for job_id in 0 .. jobs {
		pool.try_submit(fn [results, job_id] (mut ctx context.Context) ! {
			_ = ctx
			results <- job_id
		})!
	}
	pool.close()!

	// Track which indices have been received to detect duplicates and gaps.
	mut seen := []bool{len: jobs}
	for _ in 0 .. jobs {
		received := <-results
		assert received >= 0 && received < jobs
		assert !seen[received]
		seen[received] = true
	}
	for idx in 0 .. jobs {
		assert seen[idx]
	}
}
331
// wait_for_bool_signal receives one value from `signal` and asserts that it is
// true. If nothing arrives within one second it fails with `message`.
fn wait_for_bool_signal(signal chan bool, message string) {
	select {
		received := <-signal {
			assert received
		}
		1 * time.second {
			// Timed out without receiving anything.
			assert false, message
		}
	}
}
342
// assert_no_bool_signal fails with `message` if a value can be received from
// `signal` right now. The `else` branch makes the check non-blocking.
fn assert_no_bool_signal(signal chan bool, message string) {
	select {
		_ := <-signal {
			assert false, message
		}
		else {
			// Nothing pending on the channel.
		}
	}
}
351
// wait_until_pool_rejects_as_closed probes `pool` with a no-op job up to 100
// times, sleeping 1ms between attempts, and returns once the rejection message
// is the pool-closed one. The only other rejection it tolerates is queue-full.
// It fails if a probe is accepted or if the attempts run out.
fn wait_until_pool_rejects_as_closed(mut pool xasync.Pool) {
	noop_probe := fn (mut ctx context.Context) ! {
		_ = ctx
	}
	max_attempts := 100
	for _ in 0 .. max_attempts {
		pool.try_submit(noop_probe) or {
			reason := err.msg()
			if reason == 'async: pool is closed' {
				return
			}
			assert reason == 'async: pool queue is full'
			time.sleep(1 * time.millisecond)
			continue
		}
		assert false, 'pool accepted probe while backlog should be full'
	}
	assert false, 'pool close did not start'
}
369