v2 / vlib / sync / waitgroup.c.v
107 lines · 96 sloc · 2.98 KB · 5be2b1d9cd1f93efc8a776455991080648a51fb9
Raw
// Copyright (c) 2019 Alexander Medvednikov. All rights reserved.
// Use of this source code is governed by an MIT license
// that can be found in the LICENSE file.
4module sync
5
// Declarations of the C helpers that operate on 64-bit values; WaitGroup uses
// them on its packed `state` field. In every call in this file the first
// argument is the address of the u64 being operated on.

// atomic_load_u64 returns the u64 read from the given address.
// NOTE(review): presumably a sequentially consistent atomic load - confirm in the C helper.
@[trusted]
fn C.atomic_load_u64(voidptr) u64

// atomic_store_u64 writes the given u64 to the given address.
@[trusted]
fn C.atomic_store_u64(voidptr, u64)

// atomic_compare_exchange_weak_u64 is called by wait() as
// (address, pointer to the expected value, new value); wait() treats a `true`
// result as "the new value was stored" and retries in a loop otherwise.
// NOTE(review): "weak" presumably means it may fail spuriously - confirm in the C helper.
@[trusted]
fn C.atomic_compare_exchange_weak_u64(voidptr, voidptr, u64) bool

// atomic_fetch_add_u64 adds the given u64 to the value at the address.
// add() treats the return value as the value held before the addition.
@[trusted]
fn C.atomic_fetch_add_u64(voidptr, u64) u64
17
// WaitGroup lets callers block in wait() until the task count, adjusted through
// add()/done(), has dropped back to zero.
// Do not copy an instance of WaitGroup, use a ref instead.
//
// usage: in main thread:
// `wg := sync.new_waitgroup()`
// `wg.add(nr_jobs)` before starting jobs with `spawn ...`
// `wg.wait()` to wait for all jobs to have finished
//
// in each parallel job:
// `wg.done()` when finished
//
// Alternatively, `wg.go(f)` does the add(1), spawn and done() steps for one job.
//
// [init_with=new_waitgroup] // TODO: implement support for init_with struct attribute, and disallow WaitGroup{} from outside the sync.new_waitgroup() function.
@[heap]
pub struct WaitGroup {
mut:
	// Both counters are packed into one word so that add() and wait() can
	// read and update them together with a single atomic operation.
	state u64       // high 32 bits: task count, low 32 bits: wait count
	sem   Semaphore // Blocks wait() until add() drops the task count to zero
}
36
// new_waitgroup allocates a WaitGroup, runs init() on it and returns a
// reference to the ready-to-use instance.
pub fn new_waitgroup() &WaitGroup {
	mut group := &WaitGroup{}
	group.init()
	return group
}
43
// init puts a WaitGroup into its starting condition: the packed state word
// (task count and wait count) is stored as 0 and the semaphore is
// initialised with 0.
pub fn (mut wg WaitGroup) init() {
	state_ptr := voidptr(&wg.state)
	C.atomic_store_u64(state_ptr, 0)
	wg.sem.init(0)
}
49
// add changes the task count by `delta` (positive to register tasks, negative
// to retire them). When the count reaches zero while callers are blocked in
// wait(), every one of them is released.
// add panics if the task count would become negative, or if the state changes
// underneath it while it is about to release the waiters.
pub fn (mut wg WaitGroup) add(delta int) {
	state_ptr := voidptr(&wg.state)
	// The task count occupies the high 32 bits. The low 32 bits of the shifted
	// delta are zero, so the wait count is untouched; a negative delta wraps
	// and thereby subtracts from the task count.
	shifted := u64(u32(delta)) << 32
	prev := C.atomic_fetch_add_u64(state_ptr, shifted)
	cur := prev + shifted
	tasks_left := int(i32(cur >> 32))
	if tasks_left < 0 {
		panic('Negative number of jobs in waitgroup')
	}
	mut waiters := u32(cur)
	if tasks_left > 0 {
		// Tasks are still outstanding, nobody may be woken yet.
		return
	}
	if waiters == 0 {
		// The count hit zero, but no wait() call is blocked.
		return
	}
	// From here on the task count is zero and there are waiters. The state must
	// not have moved since the fetch-add above, otherwise the group is being misused.
	if C.atomic_load_u64(state_ptr) != cur {
		panic('WaitGroup misuse: add() called concurrently with wait()')
	}
	// Reset both counters, then post the semaphore once per registered waiter.
	C.atomic_store_u64(state_ptr, 0)
	for waiters > 0 {
		wg.sem.post()
		waiters--
	}
}
74
// done marks one task as finished; it is a convenience fn for add(-1).
// As with add(), it panics if this makes the task count negative, and it
// releases blocked wait() calls when the count reaches zero.
pub fn (mut wg WaitGroup) done() {
	wg.add(-1)
}
79
// wait blocks the caller until the task count is zero. It returns immediately
// when no tasks are outstanding.
// wait panics if the state is non-zero right after it has been woken, i.e. the
// WaitGroup was reused before this wait() call returned.
pub fn (mut wg WaitGroup) wait() {
	state_ptr := voidptr(&wg.state)
	for {
		mut observed := C.atomic_load_u64(state_ptr)
		if u32(observed >> 32) == 0 {
			// The task count (high 32 bits) is zero: nothing to wait for.
			return
		}
		// Register as a waiter by bumping the wait count in the low 32 bits.
		// If the state changed in the meantime, start over with a fresh load.
		registered := C.atomic_compare_exchange_weak_u64(state_ptr, voidptr(&observed),
			observed + 1)
		if !registered {
			continue
		}
		wg.sem.wait() // blocks until task_count becomes 0
		if C.atomic_load_u64(state_ptr) != 0 {
			panic('WaitGroup misuse: reused before previous wait() returned')
		}
		return
	}
}
97
// go registers one task with wg.add(1) and then runs `f` in a newly spawned
// thread, which calls wg.done() once `f` has returned.
// The function `f` should not panic.
// Calls to wg.go() should happen before the call to wg.wait().
pub fn (mut wg WaitGroup) go(f fn ()) {
	// Count the task before the thread exists, so the count is already
	// non-zero by the time the thread could call done().
	wg.add(1)
	spawn fn (mut group WaitGroup, job fn ()) {
		job()
		group.done()
	}(mut wg, f)
}
108