v2 / vlib / x / async / group.v
114 lines · 105 sloc · 3.19 KB · 15fb60b77ea6073658aa8355b247f2e1ae03b714
Raw
1module async
2
3import context
4import sync
5
// Group runs a set of related jobs concurrently and collects their outcome.
//
// Every job is handed the same derived context. The first error recorded by
// any job is what wait() reports, and recording it cancels the shared context
// so the remaining jobs can notice and stop on their own.
@[heap]
pub struct Group {
mut:
	// Context given to each job started through go().
	ctx context.Context
	// Cancels ctx. Defaults to nil; new_group() installs the real function.
	cancel context.CancelFn = unsafe { nil }
	// Tracks accepted jobs that have not called done() yet.
	wg &sync.WaitGroup = sync.new_waitgroup()
	// Guards `waiting` and `first_err`. go() performs wg.add() while holding
	// it and wait() flips `waiting` while holding it, so an add() can never
	// slip in after wait() has committed to calling wg.wait().
	mutex &sync.Mutex = sync.new_mutex()
	// Written at most once: only the first failure is kept, later job errors
	// are dropped even if several jobs fail at the same time.
	first_err IError = none
	// Becomes true as soon as wait() starts; from then on go() rejects jobs.
	waiting bool
}
25
// new_group builds a Group whose jobs share a cancellable context derived
// from parent.
//
// parent is taken by value so callers can pass any context directly. The
// derived context belongs to the group: it is canceled when the first job
// error is recorded, and again unconditionally once wait() has finished.
pub fn new_group(parent context.Context) &Group {
	derived, cancel_fn := new_cancel_context(parent)
	// Store the derived context behind the context.Context interface.
	shared_ctx := context.Context(derived)
	return &Group{
		ctx:    shared_ctx
		cancel: cancel_fn
		wg:     sync.new_waitgroup()
		mutex:  sync.new_mutex()
	}
}
40
// go runs f concurrently as a member of the group.
//
// It returns an error when f is nil or when wait() has already been called.
// Jobs are expected not to panic: x.async does not recover panics raised in
// spawned work.
pub fn (mut g Group) go(f JobFn) ! {
	if f == unsafe { nil } {
		return error(err_nil_job)
	}
	// Decide and register under the lock that wait() also takes. This keeps
	// wg.add() from ever running after wait() has set `waiting`, which would
	// otherwise trip sync.WaitGroup's add-while-waiting misuse panic.
	g.mutex.lock()
	accepted := !g.waiting
	if accepted {
		g.wg.add(1)
	}
	g.mutex.unlock()
	if !accepted {
		return error(err_group_go_after_wait)
	}
	spawn run_group_job(mut g, f)
}
60
// wait blocks until every job accepted by go() has finished, then returns the
// first job error if one was recorded.
//
// Only the first call is honoured; any later call returns an error. Once wait
// has started, go() refuses new jobs.
pub fn (mut g Group) wait() ! {
	// Capture the previous state and mark the group as waiting in one
	// critical section. Re-setting the flag on a repeated call changes nothing.
	g.mutex.lock()
	was_waiting := g.waiting
	g.waiting = true
	g.mutex.unlock()
	if was_waiting {
		return error(err_group_wait_called)
	}

	g.wg.wait()
	// Cancel unconditionally once all jobs are done so the derived context is
	// released, mirroring the usual context.with_cancel() lifecycle.
	g.cancel()
	first_failure := g.get_first_error()
	if first_failure is none {
		return
	}
	return first_failure
}
83
// run_group_job executes one job on behalf of g: it invokes f with the group
// context, records a failure through set_first_error(), and always marks the
// job as done on the group's WaitGroup when it returns.
fn run_group_job(mut g Group, f JobFn) {
	// f takes a mutable context, so give it a local mutable copy of the
	// interface value. The context behind it is the one shared by all jobs.
	mut shared := g.ctx
	defer {
		g.wg.done()
	}
	f(mut shared) or {
		g.set_first_error(err)
	}
}
93
// set_first_error records err as the group's failure if none has been stored
// yet, and in that case cancels the shared context. Later errors are ignored.
fn (mut g Group) set_first_error(err IError) {
	g.mutex.lock()
	if g.first_err !is none {
		// Another job already failed first; keep its error.
		g.mutex.unlock()
		return
	}
	g.first_err = err
	g.mutex.unlock()
	// Cancel only after releasing the group mutex: cancellation may notify
	// child contexts, and running it unlocked avoids nesting our lock around
	// that path.
	g.cancel()
}
108
// get_first_error returns the stored first job error (`none` if no job has
// failed), reading it under the group mutex.
fn (mut g Group) get_first_error() IError {
	g.mutex.lock()
	// The deferred unlock runs after the return expression is evaluated, so
	// first_err is still read while the lock is held.
	defer {
		g.mutex.unlock()
	}
	return g.first_err
}
115