v / examples / thread_safety / queue.v
94 lines · 79 sloc · 2.54 KB · 7a0febb12dad9fa4711bfcff68b71dfc0003dca1
Raw
1/*
2This example demonstrates thread safety using a queue of callbacks.
3
4### Functions:
5- `producer`: creates a callback and adds it to the queue.
6- `consumer`: consumes a callback from the queue and runs it.
7- `heavy_processing`: a heavy processing function that is added to the queue.
8
9### Thread Safety:
10- The `fn producer` function is protected by a mutex. It locks the mutex before adding a callback
11to the queue and unlocks it after adding the callback.
12- The `fn consumer` function is also protected by the same mutex. It locks the mutex before
13consuming a callback from the queue and unlocks it after consuming the callback.
14- The `heavy_processing` function is added to the queue by the main thread before the producer
15threads start producing callbacks. The main thread is the only thread that adds this function to
16the queue, so it doesn't need to be protected by a mutex.
17*/
18import time
19import sync
20
21type Callback = fn (id string)
22
23fn producer(producer_name string, mut arr []Callback, mut mtx sync.Mutex) {
24 for i in 1 .. 5 {
25 mtx.lock()
26 arr << fn [producer_name, i] (consumer_name string) {
27 println('task ${i} created by producer ${producer_name}: consumed by ${consumer_name}')
28 time.sleep(500 * time.millisecond)
29 }
30 println('Produced: ${i}')
31 time.sleep(50 * time.millisecond)
32 mtx.unlock()
33 }
34}
35
36fn consumer(consumer_name string, mut arr []Callback, mut mtx sync.Mutex) {
37 for {
38 mtx.lock()
39 if arr.len > 0 {
40 callback := arr[0]
41 arr.delete(0)
42
43 mtx.unlock()
44 callback(consumer_name) // run after unlocking to allow other threads to consume
45 continue
46 } else {
47 println('- No items to consume')
48 mtx.unlock()
49
50 // time.sleep(500 * time.millisecond)
51 // continue // uncomment to run forever
52
53 break // uncomment to stop after consuming all items
54 }
55 }
56}
57
58fn heavy_processing(queue_id string) {
59 println('One more: ${queue_id}')
60 time.sleep(500 * time.millisecond)
61}
62
63fn main() {
64 mut mtx := sync.new_mutex()
65 mut arr := []Callback{}
66
67 producer_threads := [
68 spawn producer('Paula', mut &arr, mut mtx),
69 spawn producer('Adriano', mut &arr, mut mtx),
70 spawn producer('Kaka', mut &arr, mut mtx),
71 spawn producer('Hitalo', mut &arr, mut mtx),
72 spawn producer('Jonh', mut &arr, mut mtx),
73 ]
74
75 mut consumer_threads := [
76 spawn consumer('consumer number 0', mut &arr, mut mtx),
77 ]
78
79 // spawn 16 consumers
80 for i in 1 .. 16 {
81 consumer_threads << spawn consumer('consumer number ${i}', mut &arr, mut mtx)
82 }
83
84 mtx.lock()
85 arr << heavy_processing
86 mtx.unlock()
87
88 for t in producer_threads {
89 t.wait()
90 }
91 for t in consumer_threads {
92 t.wait()
93 }
94}
95