v2 / vlib / arrays / parallel / parallel.v
109 lines · 98 sloc · 2.9 KB · 9771491c9463ed43400dd18dcf21ee0bbd97290a
Raw
1module parallel
2
3import sync
4import runtime
5
// Params holds the optional settings accepted by `run` and `amap`.
@[params]
pub struct Params {
pub mut:
	// workers is the requested number of worker threads.
	// It is 0 by default, so that VJOBS will be used, through runtime.nr_jobs().
	workers int
}
12
// limited_workers returns the number of worker threads to start for `ilen` input elements.
// A positive `max_workers` is used as the requested amount; any other value (0 or negative)
// falls back to runtime.nr_jobs(). The result never exceeds `ilen`.
fn limited_workers(max_workers int, ilen int) int {
	// A non-positive request means "use the default". Passing a negative value through
	// would make the callers start no workers at all, and then block forever while
	// feeding their task channel.
	workers := if max_workers > 0 { max_workers } else { runtime.nr_jobs() }
	// there is no point in starting more workers than there are inputs to process
	if ilen < workers {
		return ilen
	}
	return workers
}
21
// run calls the user provided `worker` function once for every element of `input`,
// spreading the calls over several threads.
// The number of worker threads is picked by limited_workers(opt.workers, input.len).
// It returns once `worker` has been called for every element of `input`.
// Example: parallel.run([1, 2, 3, 4, 5], |i| println(i))
pub fn run[T](input []T, worker fn (T), opt Params) {
	if input.len == 0 {
		return
	}
	nworkers := limited_workers(opt.workers, input.len)

	// one `done()` call is expected per input element, not per worker thread
	mut wg := sync.new_waitgroup()
	wg.add(input.len)

	// buffered channel, used for handing over the input elements to the workers
	ch := chan T{cap: nworkers * 2}
	for _ in 0 .. nworkers {
		spawn fn [ch, worker, mut wg] [T]() {
			for {
				// a closed channel ends the worker loop
				item := <-ch or { break }
				worker(item)
				wg.done()
			}
		}()
	}

	// feed every input element to the workers
	for item in input {
		ch <- item
	}

	// block until each element has been processed
	wg.wait()
	// closing the channel signals all the workers to exit; there is no need to wait for them to finish
	ch.close()
}
53
// Task is the unit of work that `amap` sends to its worker threads, and also the slot
// in which a worker stores the outcome.
struct Task[T, R] {
	idx    int // index of the slot in the tasks array, where the finished task is stored
	input  T   // the value that is passed to the worker function
	result R   // the value returned by the worker function for `input`
}
59
// amap runs the user provided `worker` function over every element of `input` in parallel,
// and returns the collected results.
// The number of worker threads is picked by limited_workers(opt.workers, input.len).
// The worker function can return a value. The returned array maintains the input order.
// Any error handling should have happened within the worker function.
// Example: squares := parallel.amap([1, 2, 3, 4, 5], |i| i * i); assert squares == [1, 4, 9, 16, 25]
pub fn amap[T, R](input []T, worker fn (T) R, opt Params) []R {
	if input.len == 0 {
		return []
	}
	mut tasks := []Task[T, R]{len: input.len}
	// the tasks array will be passed to the closure of each worker by reference, so that it could
	// then modify the same tasks:
	mut tasks_ref := &tasks

	workers := limited_workers(opt.workers, input.len)
	// use a buffered channel for transferring the tasks, that has enough space to keep all the workers busy,
	// without blocking the main thread needlessly
	ch := chan Task[T, R]{cap: workers * 2}
	mut wg := sync.new_waitgroup()
	// one `done()` call is expected per input element, not per worker thread
	wg.add(input.len)
	for _ in 0 .. workers {
		spawn fn [ch, worker, mut wg, mut tasks_ref] [T, R]() {
			for {
				// a closed channel ends the worker loop
				task := <-ch or { break }
				// every task carries a distinct idx, so each write goes to its own slot
				unsafe {
					tasks_ref[task.idx] = Task[T, R]{
						idx:    task.idx
						input:  task.input
						result: worker(task.input)
					}
				}
				wg.done()
			}
		}()
	}

	// put the input into the channel
	for idx, inp in input {
		ch <- Task[T, R]{
			idx:   idx
			input: inp
		}
	}

	// wait for all tasks to complete
	wg.wait()
	ch.close()
	// Each finished task was stored at tasks[task.idx], and the indexes were taken from
	// the positions in `input`, so the array is already in input order; no sorting is needed.
	return tasks.map(it.result)
}
110