v / vlib / sync / pool / pool.c.v
191 lines · 171 sloc · 5.74 KB · 7e3657247b128052b36dab7b098fdbd13e50169b
Raw
1module pool
2
3import sync
4import runtime
5
6@[trusted]
7fn C.atomic_fetch_add_u32(voidptr, u32) u32
8
9pub const no_result = unsafe { nil }
10
11pub struct PoolProcessor {
12 thread_cb voidptr
13mut:
14 njobs int
15 items []voidptr
16 results shared []voidptr
17 ntask u32 // reading/writing to this should be atomic
18 waitgroup sync.WaitGroup
19 shared_context voidptr
20 thread_contexts []voidptr
21}
22
23pub type ThreadCB = fn (mut p PoolProcessor, idx int, task_id int) voidptr
24
25fn empty_cb(mut _p PoolProcessor, _idx int, _task_id int) voidptr {
26 unsafe {
27 return nil
28 }
29}
30
31pub struct PoolProcessorConfig {
32pub:
33 maxjobs int
34 callback ThreadCB = empty_cb
35}
36
37// new_pool_processor returns a new PoolProcessor instance.
38// The parameters of new_pool_processor are:
39// context.maxjobs: when 0 (the default), the PoolProcessor will use a
40// number of threads, that is optimal for your system to process your items.
41// context.callback: this should be a callback function, that each worker
42// thread in the pool will run for each item.
43// The callback function will receive as parameters:
44// 1) the PoolProcessor instance, so it can call
45// p.get_item[int](idx) to get the actual item at index idx
46// 2) idx - the index of the currently processed item
47// 3) task_id - the index of the worker thread in which the callback
48// function is running.
49pub fn new_pool_processor(context PoolProcessorConfig) &PoolProcessor {
50 if context.callback == unsafe { nil } {
51 panic('You need to pass a valid callback to new_pool_processor.')
52 }
53 mut pool := PoolProcessor{
54 items: []
55 results: []
56 shared_context: unsafe { nil }
57 thread_contexts: []
58 njobs: context.maxjobs
59 ntask: 0
60 thread_cb: voidptr(context.callback)
61 }
62 pool.waitgroup.init()
63 return &pool
64}
65
66// set_max_jobs gives you the ability to override the number
67// of jobs *after* the PoolProcessor had been created already.
68pub fn (mut pool PoolProcessor) set_max_jobs(njobs int) {
69 pool.njobs = njobs
70}
71
72// work_on_items receives a list of items of type T,
73// then starts a work pool of pool.njobs threads, each running
74// pool.thread_cb in a loop, until all items in the list,
75// are processed.
76// When pool.njobs is 0, the number of jobs is determined
77// by the number of available cores on the system.
78// work_on_items returns *after* all threads finish.
79// You can optionally call get_results after that.
80pub fn (mut pool PoolProcessor) work_on_items[T](items []T) {
81 pool.work_on_pointers(unsafe { items.pointers() })
82}
83
84pub fn (mut pool PoolProcessor) work_on_pointers(items []voidptr) {
85 mut njobs := runtime.nr_jobs()
86 if pool.njobs > 0 {
87 njobs = pool.njobs
88 }
89 unsafe {
90 pool.ntask = 0
91 pool.thread_contexts = []voidptr{len: items.len}
92 lock pool.results {
93 pool.results = []voidptr{len: items.len}
94 }
95 pool.items = []voidptr{cap: items.len}
96 pool.items << items
97 pool.waitgroup.add(njobs)
98 for i := 0; i < njobs; i++ {
99 if njobs > 1 {
100 spawn process_in_thread(mut pool, i)
101 } else {
102 // do not run concurrently, just use the same thread:
103 process_in_thread(mut pool, i)
104 }
105 }
106 }
107 pool.waitgroup.wait()
108}
109
110// process_in_thread does the actual work of worker thread.
111// It is a workaround for the current inability to pass a
112// method in a callback.
113fn process_in_thread(mut pool PoolProcessor, task_id int) {
114 cb := ThreadCB(pool.thread_cb)
115 ilen := pool.items.len
116 for {
117 idx := int(C.atomic_fetch_add_u32(voidptr(&pool.ntask), 1))
118 if idx >= ilen {
119 break
120 }
121 res := cb(mut pool, idx, task_id)
122 lock pool.results {
123 pool.results[idx] = res
124 }
125 }
126 pool.waitgroup.done()
127}
128
129// get_item - called by the worker callback.
130// Retrieves a type safe instance of the currently processed item
131pub fn (pool &PoolProcessor) get_item[T](idx int) T {
132 return unsafe { *(&T(pool.items[idx])) }
133}
134
135// get_result - called by the main thread to get a specific result.
136// Retrieves a type safe instance of the produced result.
137pub fn (pool &PoolProcessor) get_result[T](idx int) T {
138 rlock pool.results {
139 return unsafe { *(&T(pool.results[idx])) }
140 }
141}
142
143// get_results - get a list of type safe results in the main thread.
144pub fn (pool &PoolProcessor) get_results[T]() []T {
145 mut res := []T{cap: pool.results.len}
146 for i in 0 .. pool.results.len {
147 rlock pool.results {
148 res << unsafe { *(&T(pool.results[i])) }
149 }
150 }
151 return res
152}
153
154// get_results_ref - get a list of type safe results in the main thread.
155pub fn (pool &PoolProcessor) get_results_ref[T]() []&T {
156 mut res := []&T{cap: pool.results.len}
157 for i in 0 .. pool.results.len {
158 rlock pool.results {
159 res << unsafe { &T(pool.results[i]) }
160 }
161 }
162 return res
163}
164
165// set_shared_context - can be called during the setup so that you can
166// provide a context that is shared between all worker threads, like
167// common options/settings.
168pub fn (mut pool PoolProcessor) set_shared_context(context voidptr) {
169 pool.shared_context = context
170}
171
172// get_shared_context - can be called in each worker callback, to get
173// the context set by pool.set_shared_context
174pub fn (pool &PoolProcessor) get_shared_context() voidptr {
175 return pool.shared_context
176}
177
178// set_thread_context - can be called during the setup at the start of
179// each worker callback, so that the worker callback can have some thread
180// local storage area where it can write/read information that is private
181// to the given thread, without worrying that it will get overwritten by
182// another thread
183pub fn (mut pool PoolProcessor) set_thread_context(idx int, context voidptr) {
184 pool.thread_contexts[idx] = context
185}
186
187// get_thread_context - returns a pointer, that was set with
188// pool.set_thread_context . This pointer is private to each thread.
189pub fn (pool &PoolProcessor) get_thread_context(idx int) voidptr {
190 return pool.thread_contexts[idx]
191}
192