v2 / vlib / context / onecontext / onecontext.v
164 lines · 144 sloc · 4.11 KB · 56e02481a4f0e421009413f58493620cc3a51f87
Raw
1module onecontext
2
3import context
4import sync
5import time
6
// canceled is the sentinel error stored on a merged context when it is
// canceled through its own cancel function rather than by one of the
// contexts it was merged from.
pub const canceled = error('canceled context')
9
// OneContext is the context implementation returned by `merge`.
// It combines one primary context with any number of additional contexts.
@[heap]
struct OneContext {
mut:
	// ctx is the primary context passed as first argument to `merge`.
	ctx context.Context
	// ctxs holds the additional contexts passed to `merge`.
	ctxs []context.Context
	// done is handed out by `done()`; `cancel` pushes to it and closes it.
	done chan int
	// err is the value reported by `err()`; it stays `none` until `cancel` stores an error.
	err IError = none
	// err_mutex is locked around every access to `err` in `err()` and `cancel`.
	err_mutex &sync.Mutex = sync.new_mutex()
	// cancel_fn is invoked at the start of `cancel`.
	cancel_fn context.CancelFn = unsafe { nil }
	// cancel_ctx is the internal context whose done channel the listeners watch.
	cancel_ctx context.Context
}
21
// merge combines `ctx` and every context in `ctxs` into a single context.
// It returns the merged context together with a cancel function; the
// listeners started by `run` cancel the merged context when that function
// is called or when any of the given contexts is done.
pub fn merge(ctx context.Context, ctxs ...context.Context) (context.Context, context.CancelFn) {
	// The internal cancelable context hangs off a fresh background context.
	mut root := context.background()
	inner_ctx, inner_cancel := context.with_cancel(mut &root)
	mut merged := &OneContext{
		ctx: ctx
		ctxs: ctxs
		done: chan int{cap: 3}
		cancel_ctx: inner_ctx
		cancel_fn: inner_cancel
	}
	// Start the listeners without blocking the caller.
	spawn merged.run()
	return context.Context(merged), context.CancelFn(inner_cancel)
}
37
// deadline reports the soonest deadline set on any of the merged contexts.
// It returns `none` when neither the primary context nor any of the
// additional contexts has a deadline.
pub fn (octx OneContext) deadline() ?time.Time {
	mut earliest := time.Time{}
	mut has_deadline := false

	// Seed the result with the primary context's deadline, if it has one.
	if primary := octx.ctx.deadline() {
		earliest = primary
		has_deadline = true
	}

	for other in octx.ctxs {
		candidate := other.deadline() or { continue }
		// Skip candidates that do not beat the deadline found so far.
		if has_deadline && !(candidate < earliest) {
			continue
		}
		earliest = candidate
		has_deadline = true
	}

	if has_deadline {
		return earliest
	}
	return none
}
64
// done returns the merged context's done channel.
// `cancel` pushes a value to this channel and then closes it.
pub fn (octx OneContext) done() chan int {
	return octx.done
}
69
// err returns the error stored by `cancel`, or `none` when the merged
// context has not been canceled yet. The read is guarded by `err_mutex`.
pub fn (mut octx OneContext) err() IError {
	octx.err_mutex.lock()
	current := octx.err
	octx.err_mutex.unlock()
	return current
}
78
// value searches the merged contexts for `key`. The primary context is
// consulted first, then the additional contexts in order; the first value
// found is returned. It returns `none` when no context holds the key.
pub fn (octx OneContext) value(key context.Key) ?context.Any {
	if primary := octx.ctx.value(key) {
		return primary
	}

	for other in octx.ctxs {
		found := other.value(key) or { continue }
		return found
	}

	return none
}
94
// run starts the listeners that watch the merged contexts for cancellation.
// With exactly one additional context, a single listener watches both it and
// the primary context; otherwise one listener is started per context.
pub fn (mut octx OneContext) run() {
	mut primary := &octx.ctx
	if octx.ctxs.len != 1 {
		// Zero or several additional contexts: one listener each.
		octx.run_multiple_contexts(mut primary)
		for mut other in octx.ctxs {
			octx.run_multiple_contexts(mut other)
		}
		return
	}

	// Exactly one additional context: share a single listener.
	mut only := &octx.ctxs[0]
	octx.run_two_contexts(mut primary, mut only)
}
109
// str is the string conversion of a OneContext.
// It always returns an empty string.
pub fn (octx OneContext) str() string {
	return ''
}
114
// cancel cancels the merged context with the given error.
// It invokes the internal cancel function, records `err` as the reason and
// signals the done channel by pushing a value and closing it.
//
// cancel may be called several times, possibly concurrently: every listener
// started by `run_multiple_contexts` calls it once the internal cancel
// context is done. Only the first call that stores an error takes effect, so
// the original cause is not overwritten by the `canceled` sentinel reported
// by the remaining listeners.
pub fn (mut octx OneContext) cancel(err IError) {
	// Called outside the lock; it wakes up the other listeners.
	octx.cancel_fn()

	octx.err_mutex.lock()
	defer {
		octx.err_mutex.unlock()
	}
	if octx.err !is none {
		// Already canceled: keep the first recorded cause.
		return
	}
	octx.err = err
	// Checking and closing under the mutex prevents two callers from both
	// seeing an open channel, which would make one of them push on a closed
	// channel.
	if !octx.done.closed {
		octx.done <- 0
		octx.done.close()
	}
}
127
// run_two_contexts spawns a single listener that waits on three done
// channels: the internal cancel context's, `ctx1`'s and `ctx2`'s.
// When the internal one fires, the merged context is canceled with
// `canceled`; when `ctx1` or `ctx2` fires, it is canceled with that
// context's error.
pub fn (mut octx OneContext) run_two_contexts(mut ctx1 context.Context, mut ctx2 context.Context) {
	// Fetch the channels here so the listener receives plain channel values.
	internal_done := octx.cancel_ctx.done()
	first_done := ctx1.done()
	second_done := ctx2.done()
	spawn fn (mut merged OneContext, internal_done chan int, first_done chan int, second_done chan int, mut first context.Context, mut second context.Context) {
		select {
			_ := <-internal_done {
				merged.cancel(canceled)
			}
			_ := <-first_done {
				merged.cancel(first.err())
			}
			_ := <-second_done {
				merged.cancel(second.err())
			}
		}
	}(mut &octx, internal_done, first_done, second_done, mut ctx1, mut ctx2)
}
148
// run_multiple_contexts spawns a listener for one context. The listener
// waits on the internal cancel context's done channel and on `ctx`'s done
// channel. When the internal one fires, the merged context is canceled with
// `canceled`; when `ctx` fires, it is canceled with `ctx`'s error.
pub fn (mut octx OneContext) run_multiple_contexts(mut ctx context.Context) {
	// Fetch the channels here so the listener receives plain channel values.
	internal_done := octx.cancel_ctx.done()
	watched_done := ctx.done()
	spawn fn (mut merged OneContext, internal_done chan int, watched_done chan int, mut watched context.Context) {
		select {
			_ := <-internal_done {
				merged.cancel(canceled)
			}
			_ := <-watched_done {
				merged.cancel(watched.err())
			}
		}
	}(mut &octx, internal_done, watched_done, mut ctx)
}
165