v / vlib / v2 / util / worker_pool_notd_parallel.v
74 lines · 61 sloc · 1.26 KB · f3c5760b8838272e4789c38a1be9e03f9ceac351
Raw
1module util
2
3// TODO: remove workaround once fixed in compiler
4struct SharedIntWorkaround {
5pub mut:
6 value int
7}
8
9pub struct WorkerPool[T, Y] {
10mut:
11 workers []thread
12 queue_len shared SharedIntWorkaround
13 ch_in chan T
14 ch_out chan Y
15}
16
17pub fn WorkerPool.new[T, Y]() &WorkerPool[T, Y] {
18 return &WorkerPool[T, Y]{
19 ch_in: chan T{cap: 1000}
20 ch_out: chan Y{cap: 1000}
21 }
22}
23
24pub fn (mut wp WorkerPool[T, Y]) add_worker(t thread) {
25 wp.workers << t
26}
27
28pub fn (mut wp WorkerPool[T, Y]) active_jobs() int {
29 return lock wp.queue_len {
30 wp.queue_len.value
31 }
32}
33
34pub fn (mut wp WorkerPool[T, Y]) get_job() !T {
35 return <-wp.ch_in or { none }
36}
37
38pub fn (mut wp WorkerPool[T, Y]) push_result(result Y) {
39 wp.ch_out <- result
40 wp.job_done()
41}
42
43pub fn (mut wp WorkerPool[T, Y]) job_done() {
44 lock wp.queue_len {
45 wp.queue_len.value--
46 }
47}
48
49pub fn (mut wp WorkerPool[T, Y]) queue_job(job T) {
50 wp.ch_in <- job
51 lock wp.queue_len {
52 wp.queue_len.value++
53 }
54}
55
56pub fn (mut wp WorkerPool[T, Y]) queue_jobs(jobs []T) {
57 for job in jobs {
58 wp.queue_job(job)
59 }
60}
61
62pub fn (mut wp WorkerPool[T, Y]) wait_for_results() []Y {
63 mut results := []Y{}
64 for wp.active_jobs() > 0 {
65 result := <-wp.ch_out
66 results << result
67 }
68
69 wp.ch_in.close()
70 wp.ch_out.close()
71 wp.workers.wait()
72
73 return results
74}
75