v2 / vlib / x / async / pool.v
161 lines · 148 sloc · 4.17 KB · 15fb60b77ea6073658aa8355b247f2e1ae03b714
Raw
1module async
2
3import context
4import sync
5
6// PoolConfig configures a fixed-size concurrency pool.
7//
8// Both values must be positive. `workers` is the maximum number of jobs that
9// can execute concurrently; `queue_size` is the bounded backlog accepted by
10// try_submit() while all worker slots are busy.
11@[params]
12pub struct PoolConfig {
13pub:
14 workers int
15 queue_size int
16}
17
18// Pool limits concurrent JobFn execution with fixed worker slots and bounded backlog.
19//
20// The pool owns one derived context shared by all jobs. Closing the pool stops
21// new submissions, waits for every accepted job, and returns the first job error
22// if any.
23@[heap]
24pub struct Pool {
25mut:
26 ctx context.Context
27 cancel context.CancelFn = unsafe { nil }
28 tokens chan bool
29 wg &sync.WaitGroup = sync.new_waitgroup()
30 max_jobs int
31 // Protects lifecycle flags, accepted job count, WaitGroup.add(), and
32 // first_err. This lock is never held while a user JobFn runs.
33 mutex &sync.Mutex = sync.new_mutex()
34 first_err IError = none
35 closed bool
36 waited bool
37 accepted int
38}
39
40// new_pool creates a Pool with a background parent context.
41pub fn new_pool(config PoolConfig) !&Pool {
42 return new_pool_with_context(context.background(), config)
43}
44
45// new_pool_with_context creates a Pool with a context derived from parent.
46//
47// The worker limit and queue size are fixed for the pool lifetime. Parent
48// cancellation is cooperative: jobs must observe `ctx.done()` and return.
49pub fn new_pool_with_context(parent context.Context, config PoolConfig) !&Pool {
50 if config.workers <= 0 {
51 return error(err_pool_workers_invalid)
52 }
53 if config.queue_size <= 0 {
54 return error(err_pool_queue_size_invalid)
55 }
56 ctx, cancel := new_cancel_context(parent)
57 mut pool := &Pool{
58 ctx: context.Context(ctx)
59 cancel: cancel
60 tokens: chan bool{cap: config.workers}
61 wg: sync.new_waitgroup()
62 max_jobs: config.workers + config.queue_size
63 mutex: sync.new_mutex()
64 }
65 for _ in 0 .. config.workers {
66 pool.tokens <- true
67 }
68 return pool
69}
70
71// try_submit accepts f if the pool is open and its bounded backlog has capacity.
72//
73// It never blocks for queue space. A full backlog returns `async: pool queue is
74// full`, making backpressure explicit for callers.
75pub fn (mut p Pool) try_submit(f JobFn) ! {
76 if f == unsafe { nil } {
77 return error(err_nil_job)
78 }
79 p.mutex.lock()
80 if p.closed {
81 p.mutex.unlock()
82 return error(err_pool_closed)
83 }
84 if p.accepted >= p.max_jobs {
85 p.mutex.unlock()
86 return error(err_pool_queue_full)
87 }
88 p.accepted++
89 // add() is protected by the same mutex as wait(), so callers cannot race an
90 // accepted job against pool shutdown.
91 p.wg.add(1)
92 p.mutex.unlock()
93 // The JobFn is passed directly to the spawned wrapper instead of being
94 // stored in a channel. That keeps closure ownership with V's normal spawn
95 // path while the token channel below still enforces the fixed worker limit.
96 spawn run_pool_job(mut p, f)
97}
98
99// wait closes the pool to new submissions, drains accepted jobs, and waits for completion.
100//
101// wait is one-shot. It returns the first job error observed by any accepted job.
102pub fn (mut p Pool) wait() ! {
103 p.mutex.lock()
104 if p.waited {
105 p.mutex.unlock()
106 return error(err_pool_wait_called)
107 }
108 p.waited = true
109 p.closed = true
110 p.mutex.unlock()
111
112 p.wg.wait()
113 p.cancel()
114 err := p.get_first_error()
115 if err !is none {
116 return err
117 }
118}
119
120// close is an explicit lifecycle alias for wait().
121//
122// It rejects later submissions and waits for all accepted jobs before returning.
123pub fn (mut p Pool) close() ! {
124 p.wait()!
125}
126
127fn run_pool_job(mut p Pool, f JobFn) {
128 defer {
129 p.finish_accepted_job()
130 p.wg.done()
131 }
132 // The token channel is a bounded semaphore. At most `workers` accepted jobs
133 // can pass this point and run user code concurrently.
134 _ := <-p.tokens
135 defer {
136 p.tokens <- true
137 }
138 mut job_ctx := p.ctx
139 f(mut job_ctx) or { p.set_first_error(err) }
140}
141
142fn (mut p Pool) finish_accepted_job() {
143 p.mutex.lock()
144 p.accepted--
145 p.mutex.unlock()
146}
147
148fn (mut p Pool) set_first_error(err IError) {
149 p.mutex.lock()
150 if p.first_err is none {
151 p.first_err = err
152 }
153 p.mutex.unlock()
154}
155
156fn (mut p Pool) get_first_error() IError {
157 p.mutex.lock()
158 err := p.first_err
159 p.mutex.unlock()
160 return err
161}
162