| 1 | module async |
| 2 | |
| 3 | import context |
| 4 | import sync |
| 5 | |
// PoolConfig holds the two sizing knobs of a Pool.
//
// `workers` bounds how many jobs may run at the same time, and `queue_size`
// bounds how many additional accepted jobs may be pending behind them. The
// constructors reject any value that is zero or negative.
@[params]
pub struct PoolConfig {
pub:
	// Maximum number of jobs executing concurrently.
	workers int
	// Maximum number of accepted jobs waiting for a free worker slot.
	queue_size int
}
| 17 | |
// Pool runs JobFn values with a fixed concurrency limit and a bounded backlog.
//
// Every job receives a copy of the same derived context. wait() (or close())
// stops further submissions, blocks until all accepted jobs have returned,
// and reports the first error recorded by any job.
@[heap]
pub struct Pool {
mut:
	// Context handed to every job, and the function that cancels it.
	ctx    context.Context
	cancel context.CancelFn = unsafe { nil }
	// Semaphore tokens: a job must take one before running user code.
	tokens chan bool
	// Tracks accepted jobs so wait() can block until they have all finished.
	wg &sync.WaitGroup = sync.new_waitgroup()
	// Upper bound on `accepted` (running plus pending jobs).
	max_jobs int
	// Guards closed, waited, accepted, first_err and the WaitGroup.add() call.
	// It is released before any user JobFn is invoked.
	mutex &sync.Mutex = sync.new_mutex()
	// First error returned by a job; stays `none` until one fails.
	first_err IError = none
	// Set once wait() has started; later submissions are rejected.
	closed bool
	// Set by the first wait() call so later calls can be rejected.
	waited bool
	// Number of accepted jobs that have not finished yet.
	accepted int
}
| 39 | |
// new_pool builds a Pool whose context derives from context.background().
//
// Validation and construction are delegated to new_pool_with_context, whose
// error is propagated unchanged.
pub fn new_pool(config PoolConfig) !&Pool {
	root := context.background()
	pool := new_pool_with_context(root, config)!
	return pool
}
| 44 | |
// new_pool_with_context builds a Pool whose job context derives from `parent`.
//
// Returns an error when `config.workers` or `config.queue_size` is not
// positive; workers is checked first. Both limits are fixed at construction.
// Cancellation is cooperative: jobs are expected to watch their context and
// return on their own.
pub fn new_pool_with_context(parent context.Context, config PoolConfig) !&Pool {
	slots := config.workers
	backlog := config.queue_size
	if slots <= 0 {
		return error(err_pool_workers_invalid)
	}
	if backlog <= 0 {
		return error(err_pool_queue_size_invalid)
	}
	ctx, cancel := new_cancel_context(parent)
	mut pool := &Pool{
		ctx:      context.Context(ctx)
		cancel:   cancel
		tokens:   chan bool{cap: slots}
		wg:       sync.new_waitgroup()
		max_jobs: slots + backlog
		mutex:    sync.new_mutex()
	}
	// Pre-fill the semaphore with one token per worker slot. The channel
	// capacity equals `slots`, so none of these sends can block.
	mut remaining := slots
	for remaining > 0 {
		pool.tokens <- true
		remaining--
	}
	return pool
}
| 70 | |
// try_submit hands `f` to the pool without ever blocking for space.
//
// Errors:
// - err_nil_job when `f` is nil,
// - err_pool_closed once wait()/close() has been called,
// - err_pool_queue_full when running plus pending jobs already equal the
//   pool's fixed limit, so callers see backpressure explicitly.
pub fn (mut p Pool) try_submit(f JobFn) ! {
	if f == unsafe { nil } {
		return error(err_nil_job)
	}
	// Decide and book the job in one critical section. WaitGroup.add() is
	// done under the same mutex that wait() uses to mark the pool closed, so
	// an accepted job can never be added after shutdown has begun.
	p.mutex.lock()
	is_closed := p.closed
	is_full := !is_closed && p.accepted >= p.max_jobs
	if !is_closed && !is_full {
		p.accepted++
		p.wg.add(1)
	}
	p.mutex.unlock()

	if is_closed {
		return error(err_pool_closed)
	}
	if is_full {
		return error(err_pool_queue_full)
	}
	// The job function goes straight to the spawned wrapper rather than
	// through a channel; the wrapper takes a token before calling it, which
	// is what enforces the worker limit.
	spawn run_pool_job(mut p, f)
}
| 98 | |
// wait closes the pool to new submissions and blocks until every accepted job
// has finished.
//
// wait is one-shot: a second call returns err_pool_wait_called. After all
// jobs are done the pool context is cancelled, and the first error recorded
// by any job (if there was one) is returned.
pub fn (mut p Pool) wait() ! {
	p.mutex.lock()
	if p.waited {
		p.mutex.unlock()
		return error(err_pool_wait_called)
	}
	p.waited = true
	p.closed = true
	p.mutex.unlock()

	// The mutex is released before blocking so finishing jobs can still
	// update `accepted` and `first_err`.
	p.wg.wait()
	// `cancel` defaults to nil in the struct definition, so a Pool that was
	// not built by new_pool()/new_pool_with_context() has nothing to cancel.
	// Guard the call instead of invoking a nil function.
	if p.cancel != unsafe { nil } {
		p.cancel()
	}
	job_err := p.get_first_error()
	if job_err !is none {
		return job_err
	}
}
| 119 | |
// close is a lifecycle-flavoured name for wait().
//
// It performs exactly the same shutdown and returns whatever error wait()
// returns.
pub fn (mut p Pool) close() ! {
	p.wait() or { return err }
}
| 126 | |
// run_pool_job is the body of the thread spawned for each accepted job.
//
// It blocks until a worker token is available, runs `f` with a copy of the
// pool context, records a returned error through set_first_error, and then
// releases its resources in a fixed order: token, accepted count, WaitGroup.
fn run_pool_job(mut p Pool, f JobFn) {
	// Taking a token is what limits concurrency: only as many tokens exist as
	// were placed in the channel when the pool was built.
	_ := <-p.tokens
	mut job_ctx := p.ctx
	f(mut job_ctx) or { p.set_first_error(err) }
	// Return the token first so a pending job can start, then give back the
	// backlog slot, and only then signal the WaitGroup.
	p.tokens <- true
	p.finish_accepted_job()
	p.wg.done()
}
| 141 | |
// finish_accepted_job frees one slot in the accepted-job count under the pool
// mutex, so try_submit can accept another job.
fn (mut p Pool) finish_accepted_job() {
	p.mutex.lock()
	defer {
		p.mutex.unlock()
	}
	p.accepted -= 1
}
| 147 | |
// set_first_error stores `err` only if no error has been recorded yet; later
// errors are dropped. The check and the store happen under the pool mutex.
fn (mut p Pool) set_first_error(err IError) {
	p.mutex.lock()
	defer {
		p.mutex.unlock()
	}
	if p.first_err !is none {
		// An earlier job already failed; keep its error.
		return
	}
	p.first_err = err
}
| 155 | |
// get_first_error returns the recorded job error, or `none` when no job has
// failed. The field is read under the pool mutex.
fn (mut p Pool) get_first_error() IError {
	p.mutex.lock()
	defer {
		p.mutex.unlock()
	}
	recorded := p.first_err
	return recorded
}
| 162 | |