| 1 | import context |
| 2 | import time |
| 3 | import x.async as xasync |
| 4 | |
| 5 | fn test_pool_rejects_invalid_config() { |
| 6 | xasync.new_pool(workers: 0, queue_size: 1) or { |
| 7 | assert err.msg() == 'async: pool worker count must be positive' |
| 8 | return |
| 9 | } |
| 10 | assert false |
| 11 | } |
| 12 | |
| 13 | fn test_pool_rejects_invalid_queue_size() { |
| 14 | xasync.new_pool(workers: 1, queue_size: 0) or { |
| 15 | assert err.msg() == 'async: pool queue size must be positive' |
| 16 | return |
| 17 | } |
| 18 | assert false |
| 19 | } |
| 20 | |
| 21 | fn test_pool_respects_worker_count() { |
| 22 | mut pool := xasync.new_pool(workers: 2, queue_size: 4)! |
| 23 | started := chan bool{cap: 4} |
| 24 | release := chan bool{cap: 4} |
| 25 | for _ in 0 .. 4 { |
| 26 | pool.try_submit(fn [started, release] (mut ctx context.Context) ! { |
| 27 | _ = ctx |
| 28 | started <- true |
| 29 | _ := <-release |
| 30 | })! |
| 31 | } |
| 32 | |
| 33 | wait_for_bool_signal(started, 'first pool job did not start') |
| 34 | wait_for_bool_signal(started, 'second pool job did not start') |
| 35 | select { |
| 36 | _ := <-started { |
| 37 | assert false, 'pool started more jobs than worker count before release' |
| 38 | } |
| 39 | 100 * time.millisecond {} |
| 40 | } |
| 41 | |
| 42 | for _ in 0 .. 4 { |
| 43 | release <- true |
| 44 | } |
| 45 | pool.close()! |
| 46 | } |
| 47 | |
| 48 | fn test_pool_queue_full_returns_backpressure_error() { |
| 49 | mut pool := xasync.new_pool(workers: 1, queue_size: 1)! |
| 50 | started := chan bool{cap: 1} |
| 51 | release := chan bool{cap: 2} |
| 52 | blocking_job := fn [started, release] (mut ctx context.Context) ! { |
| 53 | _ = ctx |
| 54 | started <- true |
| 55 | _ := <-release |
| 56 | } |
| 57 | |
| 58 | pool.try_submit(blocking_job)! |
| 59 | wait_for_bool_signal(started, 'blocking pool job did not start') |
| 60 | pool.try_submit(blocking_job)! |
| 61 | pool.try_submit(blocking_job) or { |
| 62 | assert err.msg() == 'async: pool queue is full' |
| 63 | release <- true |
| 64 | release <- true |
| 65 | pool.close()! |
| 66 | return |
| 67 | } |
| 68 | assert false |
| 69 | } |
| 70 | |
| 71 | fn test_pool_submit_after_close_is_refused() { |
| 72 | mut pool := xasync.new_pool(workers: 1, queue_size: 1)! |
| 73 | pool.close()! |
| 74 | pool.try_submit(fn (mut ctx context.Context) ! { |
| 75 | _ = ctx |
| 76 | }) or { |
| 77 | assert err.msg() == 'async: pool is closed' |
| 78 | return |
| 79 | } |
| 80 | assert false |
| 81 | } |
| 82 | |
| 83 | fn test_pool_wait_is_one_shot() { |
| 84 | mut pool := xasync.new_pool(workers: 1, queue_size: 1)! |
| 85 | pool.wait()! |
| 86 | pool.wait() or { |
| 87 | assert err.msg() == 'async: pool wait was already called' |
| 88 | return |
| 89 | } |
| 90 | assert false |
| 91 | } |
| 92 | |
| 93 | fn test_pool_refuses_nil_job() { |
| 94 | mut pool := xasync.new_pool(workers: 1, queue_size: 1)! |
| 95 | nil_job := unsafe { xasync.JobFn(nil) } |
| 96 | pool.try_submit(nil_job) or { |
| 97 | assert err.msg() == 'async: job function is nil' |
| 98 | pool.close()! |
| 99 | return |
| 100 | } |
| 101 | assert false |
| 102 | } |
| 103 | |
| 104 | fn test_pool_close_waits_for_accepted_jobs() { |
| 105 | mut pool := xasync.new_pool(workers: 1, queue_size: 1)! |
| 106 | release := chan bool{cap: 1} |
| 107 | closed := chan bool{cap: 1} |
| 108 | pool.try_submit(fn [release] (mut ctx context.Context) ! { |
| 109 | _ = ctx |
| 110 | _ := <-release |
| 111 | })! |
| 112 | |
| 113 | close_thread := spawn fn [mut pool, closed] () { |
| 114 | pool.close() or { |
| 115 | closed <- false |
| 116 | return |
| 117 | } |
| 118 | closed <- true |
| 119 | }() |
| 120 | select { |
| 121 | _ := <-closed { |
| 122 | assert false, 'pool close returned before accepted job completed' |
| 123 | } |
| 124 | 100 * time.millisecond {} |
| 125 | } |
| 126 | release <- true |
| 127 | select { |
| 128 | ok := <-closed { |
| 129 | assert ok |
| 130 | } |
| 131 | 1 * time.second { |
| 132 | assert false, 'pool close did not return after accepted job completed' |
| 133 | } |
| 134 | } |
| 135 | close_thread.wait() |
| 136 | } |
| 137 | |
| 138 | fn test_pool_first_error_is_propagated() { |
| 139 | mut pool := xasync.new_pool(workers: 1, queue_size: 2)! |
| 140 | started := chan bool{cap: 1} |
| 141 | release := chan bool{cap: 1} |
| 142 | pool.try_submit(fn [started, release] (mut ctx context.Context) ! { |
| 143 | _ = ctx |
| 144 | started <- true |
| 145 | _ := <-release |
| 146 | return error('first pool failure') |
| 147 | })! |
| 148 | wait_for_bool_signal(started, 'first pool failure job did not start') |
| 149 | pool.try_submit(fn (mut ctx context.Context) ! { |
| 150 | _ = ctx |
| 151 | return error('second pool failure') |
| 152 | })! |
| 153 | release <- true |
| 154 | pool.close() or { |
| 155 | assert err.msg() == 'first pool failure' |
| 156 | return |
| 157 | } |
| 158 | assert false |
| 159 | } |
| 160 | |
| 161 | fn test_pool_error_does_not_drop_accepted_jobs() { |
| 162 | mut pool := xasync.new_pool(workers: 1, queue_size: 1)! |
| 163 | accepted_job_ran := chan bool{cap: 1} |
| 164 | pool.try_submit(fn (mut ctx context.Context) ! { |
| 165 | _ = ctx |
| 166 | return error('first pool failure') |
| 167 | })! |
| 168 | pool.try_submit(fn [accepted_job_ran] (mut ctx context.Context) ! { |
| 169 | _ = ctx |
| 170 | accepted_job_ran <- true |
| 171 | })! |
| 172 | pool.close() or { |
| 173 | assert err.msg() == 'first pool failure' |
| 174 | select { |
| 175 | did_run := <-accepted_job_ran { |
| 176 | assert did_run |
| 177 | } |
| 178 | 1 * time.second { |
| 179 | assert false, 'accepted pool job did not run after earlier error' |
| 180 | } |
| 181 | } |
| 182 | return |
| 183 | } |
| 184 | assert false |
| 185 | } |
| 186 | |
| 187 | fn test_pool_concurrent_errors_return_one_error_and_drain_accepted_jobs() { |
| 188 | error_jobs := 4 |
| 189 | ok_jobs := 8 |
| 190 | mut pool := xasync.new_pool(workers: error_jobs, queue_size: ok_jobs)! |
| 191 | started := chan bool{cap: error_jobs} |
| 192 | release := chan bool{cap: error_jobs} |
| 193 | completed_ok := chan bool{cap: ok_jobs} |
| 194 | for i in 0 .. error_jobs { |
| 195 | pool.try_submit(fn [started, release, i] (mut ctx context.Context) ! { |
| 196 | _ = ctx |
| 197 | started <- true |
| 198 | _ := <-release |
| 199 | return error('pool concurrent failure ${i}') |
| 200 | })! |
| 201 | } |
| 202 | for _ in 0 .. ok_jobs { |
| 203 | pool.try_submit(fn [completed_ok] (mut ctx context.Context) ! { |
| 204 | _ = ctx |
| 205 | completed_ok <- true |
| 206 | })! |
| 207 | } |
| 208 | for _ in 0 .. error_jobs { |
| 209 | wait_for_bool_signal(started, 'pool error job did not start') |
| 210 | } |
| 211 | for _ in 0 .. error_jobs { |
| 212 | release <- true |
| 213 | } |
| 214 | pool.close() or { |
| 215 | assert err.msg().starts_with('pool concurrent failure ') |
| 216 | for _ in 0 .. ok_jobs { |
| 217 | wait_for_bool_signal(completed_ok, 'accepted ok pool job did not complete') |
| 218 | } |
| 219 | return |
| 220 | } |
| 221 | assert false |
| 222 | } |
| 223 | |
| 224 | fn test_pool_close_drains_many_accepted_jobs_while_finishing() { |
| 225 | jobs := 12 |
| 226 | workers := 3 |
| 227 | mut pool := xasync.new_pool(workers: workers, queue_size: jobs - workers)! |
| 228 | started := chan bool{cap: jobs} |
| 229 | release := chan bool{cap: jobs} |
| 230 | finished := chan bool{cap: jobs} |
| 231 | closed := chan bool{cap: 1} |
| 232 | for _ in 0 .. jobs { |
| 233 | pool.try_submit(fn [started, release, finished] (mut ctx context.Context) ! { |
| 234 | _ = ctx |
| 235 | started <- true |
| 236 | _ := <-release |
| 237 | finished <- true |
| 238 | })! |
| 239 | } |
| 240 | for _ in 0 .. workers { |
| 241 | wait_for_bool_signal(started, 'initial pool job did not start') |
| 242 | } |
| 243 | close_thread := spawn fn [mut pool, closed] () { |
| 244 | pool.close() or { |
| 245 | closed <- false |
| 246 | return |
| 247 | } |
| 248 | closed <- true |
| 249 | }() |
| 250 | wait_until_pool_rejects_as_closed(mut pool) |
| 251 | assert_no_bool_signal(closed, 'pool close returned while accepted jobs were still blocked') |
| 252 | |
| 253 | for _ in 0 .. jobs - 1 { |
| 254 | release <- true |
| 255 | } |
| 256 | for _ in 0 .. jobs - 1 { |
| 257 | wait_for_bool_signal(finished, 'accepted pool job did not finish') |
| 258 | } |
| 259 | assert_no_bool_signal(closed, 'pool close returned before the last accepted job finished') |
| 260 | |
| 261 | release <- true |
| 262 | wait_for_bool_signal(finished, 'last accepted pool job did not finish') |
| 263 | wait_for_bool_signal(closed, 'pool close did not drain accepted jobs') |
| 264 | close_thread.wait() |
| 265 | } |
| 266 | |
| 267 | fn test_pool_parent_cancellation_is_observed_by_cooperative_job() { |
| 268 | parent_ctx, cancel := xasync.with_cancel() |
| 269 | mut pool := xasync.new_pool_with_context(parent_ctx, workers: 1, queue_size: 1)! |
| 270 | started := chan bool{cap: 1} |
| 271 | pool.try_submit(fn [started] (mut ctx context.Context) ! { |
| 272 | started <- true |
| 273 | done := ctx.done() |
| 274 | select { |
| 275 | _ := <-done { |
| 276 | return ctx.err() |
| 277 | } |
| 278 | 1 * time.second { |
| 279 | return error('pool job did not observe parent cancellation') |
| 280 | } |
| 281 | } |
| 282 | return error('unreachable') |
| 283 | })! |
| 284 | wait_for_bool_signal(started, 'pool job did not start before parent cancellation') |
| 285 | cancel() |
| 286 | pool.close() or { |
| 287 | assert err.msg() == 'context canceled' |
| 288 | return |
| 289 | } |
| 290 | assert false |
| 291 | } |
| 292 | |
| 293 | fn test_pool_non_cooperative_job_finishes_naturally_after_parent_cancel() { |
| 294 | parent_ctx, cancel := xasync.with_cancel() |
| 295 | mut pool := xasync.new_pool_with_context(parent_ctx, workers: 1, queue_size: 1)! |
| 296 | finished := chan bool{cap: 1} |
| 297 | pool.try_submit(fn [finished] (mut ctx context.Context) ! { |
| 298 | _ = ctx |
| 299 | time.sleep(20 * time.millisecond) |
| 300 | finished <- true |
| 301 | })! |
| 302 | cancel() |
| 303 | pool.close()! |
| 304 | assert <-finished |
| 305 | } |
| 306 | |
| 307 | fn test_pool_short_stress_many_jobs() { |
| 308 | jobs := 100 |
| 309 | mut pool := xasync.new_pool(workers: 4, queue_size: jobs)! |
| 310 | done := chan int{cap: jobs} |
| 311 | for i in 0 .. jobs { |
| 312 | pool.try_submit(fn [done, i] (mut ctx context.Context) ! { |
| 313 | _ = ctx |
| 314 | done <- i |
| 315 | })! |
| 316 | } |
| 317 | pool.close()! |
| 318 | |
| 319 | mut seen := []bool{len: jobs} |
| 320 | for _ in 0 .. jobs { |
| 321 | i := <-done |
| 322 | assert i >= 0 |
| 323 | assert i < jobs |
| 324 | assert !seen[i] |
| 325 | seen[i] = true |
| 326 | } |
| 327 | for was_seen in seen { |
| 328 | assert was_seen |
| 329 | } |
| 330 | } |
| 331 | |
| 332 | fn wait_for_bool_signal(signal chan bool, message string) { |
| 333 | select { |
| 334 | ok := <-signal { |
| 335 | assert ok |
| 336 | } |
| 337 | 1 * time.second { |
| 338 | assert false, message |
| 339 | } |
| 340 | } |
| 341 | } |
| 342 | |
| 343 | fn assert_no_bool_signal(signal chan bool, message string) { |
| 344 | select { |
| 345 | _ := <-signal { |
| 346 | assert false, message |
| 347 | } |
| 348 | else {} |
| 349 | } |
| 350 | } |
| 351 | |
| 352 | fn wait_until_pool_rejects_as_closed(mut pool xasync.Pool) { |
| 353 | probe := fn (mut ctx context.Context) ! { |
| 354 | _ = ctx |
| 355 | } |
| 356 | for _ in 0 .. 100 { |
| 357 | pool.try_submit(probe) or { |
| 358 | if err.msg() == 'async: pool is closed' { |
| 359 | return |
| 360 | } |
| 361 | assert err.msg() == 'async: pool queue is full' |
| 362 | time.sleep(1 * time.millisecond) |
| 363 | continue |
| 364 | } |
| 365 | assert false, 'pool accepted probe while backlog should be full' |
| 366 | } |
| 367 | assert false, 'pool close did not start' |
| 368 | } |
| 369 | |