v / vlib / v2 / util / worker_pool_d_parallel.v
86 lines · 73 sloc · 1.67 KB · c4a42aa8b6a4fdfda86f1093df14da0aced148ea
Raw
1module util
2
3// TODO: remove workaround once fixed in compiler
4struct SharedIntWorkaround {
5pub mut:
6 value int
7}
8
9pub struct WorkerPool[T, Y] {
10mut:
11 workers []thread
12 queue_len shared SharedIntWorkaround // jobs queued but not yet completed
13 total_expected shared SharedIntWorkaround // total results expected
14 ch_in chan T
15 ch_out chan Y
16}
17
18pub fn WorkerPool.new[T, Y]() &WorkerPool[T, Y] {
19 return &WorkerPool[T, Y]{
20 ch_in: chan T{cap: 1000}
21 ch_out: chan Y{cap: 1000}
22 }
23}
24
25pub fn (mut wp WorkerPool[T, Y]) add_worker(t thread) {
26 wp.workers << t
27}
28
29pub fn (mut wp WorkerPool[T, Y]) active_jobs() int {
30 return lock wp.queue_len {
31 wp.queue_len.value
32 }
33}
34
35pub fn (mut wp WorkerPool[T, Y]) get_job() !T {
36 return <-wp.ch_in or { none }
37}
38
39pub fn (mut wp WorkerPool[T, Y]) push_result(result Y) {
40 wp.ch_out <- result
41 wp.job_done()
42}
43
44pub fn (mut wp WorkerPool[T, Y]) job_done() {
45 lock wp.queue_len {
46 wp.queue_len.value--
47 }
48}
49
50pub fn (mut wp WorkerPool[T, Y]) queue_job(job T) {
51 lock wp.queue_len {
52 wp.queue_len.value++
53 }
54 lock wp.total_expected {
55 wp.total_expected.value++
56 }
57 wp.ch_in <- job
58}
59
60pub fn (mut wp WorkerPool[T, Y]) queue_jobs(jobs []T) {
61 for job in jobs {
62 wp.queue_job(job)
63 }
64}
65
66pub fn (mut wp WorkerPool[T, Y]) wait_for_results() []Y {
67 mut results := []Y{}
68 // Wait for total_expected results
69 mut expected := lock wp.total_expected {
70 wp.total_expected.value
71 }
72 for results.len < expected {
73 result := <-wp.ch_out
74 results << result
75 // Re-check expected in case more jobs were queued
76 expected = lock wp.total_expected {
77 wp.total_expected.value
78 }
79 }
80
81 wp.ch_in.close()
82 wp.ch_out.close()
83 wp.workers.wait()
84
85 return results
86}
87