v2 / vlib / sync / pool / pool_test.v
100 lines · 89 sloc · 2.32 KB · 7e3657247b128052b36dab7b098fdbd13e50169b
Raw
1import time
2import sync
3import sync.pool
4
// SResult wraps the string value that `worker_s` returns for one processed item.
pub struct SResult {
	s string
}
8
// IResult wraps the integer value that `worker_i` returns for one processed item.
pub struct IResult {
	i int
}
12
// SeenContext is the state handed to the pool via `set_shared_context` in
// `test_pool_can_be_reused`; `worker_reuse` appends every item it handles to it.
struct SeenContext {
mut:
	// seen collects the items appended by the workers.
	seen []int
	// mutex is locked around every read and write of `seen` in this file.
	mutex &sync.Mutex = sync.new_mutex()
}
18
// worker_s is the pool callback used by `test_work_on_strings`.
// It fetches the string item at `idx`, logs which worker handles it,
// sleeps for 3 milliseconds, and returns an `SResult` holding the item
// written twice with a single space in between.
fn worker_s(mut p pool.PoolProcessor, idx int, worker_id int) &SResult {
	item := p.get_item[string](idx)
	println('worker_s worker_id: ${worker_id} | idx: ${idx} | item: ${item}')
	pause := 3 * time.millisecond
	time.sleep(pause)
	doubled := '${item} ${item}'
	return &SResult{
		s: doubled
	}
}
25
// worker_i is the pool callback used by `test_work_on_ints`.
// It fetches the int item at `idx`, logs which worker handles it,
// sleeps for 5 milliseconds, and returns an `IResult` holding the item
// multiplied by 1000.
fn worker_i(mut p pool.PoolProcessor, idx int, worker_id int) &IResult {
	item := p.get_item[int](idx)
	println('worker_i worker_id: ${worker_id} | idx: ${idx} | item: ${item}')
	pause := 5 * time.millisecond
	time.sleep(pause)
	scaled := item * 1000
	return &IResult{
		i: scaled
	}
}
32
// worker_reuse is the pool callback used by `test_pool_can_be_reused`.
// It appends the int item at `idx` to the `seen` list of the shared
// `SeenContext`, holding the context's mutex while doing so, and
// returns `pool.no_result`. The worker id argument is ignored.
fn worker_reuse(mut p pool.PoolProcessor, idx int, _ int) voidptr {
	// The shared context is stored untyped, so it has to be cast back to its real type.
	mut seen_ctx := unsafe { &SeenContext(p.get_shared_context()) }
	value := p.get_item[int](idx)
	seen_ctx.mutex.lock()
	seen_ctx.seen << value
	seen_ctx.mutex.unlock()
	return pool.no_result
}
41
// test_work_on_strings runs `worker_s` over ten one-letter strings on a pool
// created with `maxjobs: 8`, then checks the produced results twice: once
// through `get_results` and once through `get_results_ref`.
fn test_work_on_strings() {
	mut pool_s := pool.new_pool_processor(
		callback: worker_s
		maxjobs: 8
	)

	items := ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
	pool_s.work_on_items(items)

	results := pool_s.get_results[SResult]()
	// Guard against a vacuous pass: the loop below asserts nothing when
	// the result list is empty.
	assert results.len == items.len
	for x in results {
		println(x.s)
		assert x.s.len > 1
	}
	println('---------- pool_s.get_results_ref: --------------')
	results_ref := pool_s.get_results_ref[SResult]()
	// Same guard for the by-reference accessor.
	assert results_ref.len == items.len
	for x in results_ref {
		println(x.s)
		assert x.s.len > 1
	}
}
59
// test_work_on_ints runs `worker_i` over the ints 1..8 and checks the
// produced results twice: once through `get_results` and once through
// `get_results_ref`.
fn test_work_on_ints() {
	// Note: since maxjobs is left empty here,
	// the pool processor will use njobs = runtime.nr_jobs so that
	// it will work optimally without overloading the system
	mut pool_i := pool.new_pool_processor(
		callback: worker_i
	)

	items := [1, 2, 3, 4, 5, 6, 7, 8]
	pool_i.work_on_items(items)

	results := pool_i.get_results[IResult]()
	// Guard against a vacuous pass: the loop below asserts nothing when
	// the result list is empty.
	assert results.len == items.len
	for x in results {
		println(x.i)
		assert x.i > 100
	}
	println('---------- pool_i.get_results_ref: --------------')
	results_ref := pool_i.get_results_ref[IResult]()
	// Same guard for the by-reference accessor.
	assert results_ref.len == items.len
	for x in results_ref {
		println(x.i)
		assert x.i > 100
	}
}
79
// test_pool_can_be_reused calls `work_on_items` twice on the same pool
// processor and checks, via the shared `SeenContext`, that each call
// handled exactly the items of its own batch.
fn test_pool_can_be_reused() {
	mut ctx := &SeenContext{}
	mut reusable_pool := pool.new_pool_processor(
		callback: worker_reuse
		maxjobs: 2
	)
	reusable_pool.set_shared_context(ctx)

	// First batch: snapshot what the workers recorded, then empty the
	// list so the second batch starts from a clean slate.
	reusable_pool.work_on_items([1, 2, 3])
	ctx.mutex.lock()
	mut batch_one := ctx.seen.clone()
	ctx.seen = []int{}
	ctx.mutex.unlock()
	// Sorted before comparing, so the check does not depend on the order
	// in which the workers appended.
	batch_one.sort()
	assert batch_one == [1, 2, 3]

	// Second batch on the very same pool processor.
	reusable_pool.work_on_items([4, 5])
	ctx.mutex.lock()
	mut batch_two := ctx.seen.clone()
	ctx.mutex.unlock()
	batch_two.sort()
	assert batch_two == [4, 5]
}
101