v / vlib / x / async / context.v
180 lines · 166 sloc · 4.28 KB · 4389a3bf8b71672e61650d457e5b58dacf9e0ef7
Raw
1module async
2
3import context
4import sync
5import time
6
// CancelSource records which path canceled an AsyncContext.
enum CancelSource {
	// unknown is the zero value, i.e. no cancellation source has been recorded.
	unknown
	// parent: the cancellation was forwarded from the parent context.
	parent
	// timeout: the context's own timeout expired.
	timeout
	// cancel: the returned cancel function was called.
	cancel
}
13
// AsyncContext is x.async's own small implementation of `context.Context`.
//
// The public API keeps accepting and returning the standard
// `context.Context` interface. This type only exists so that Group and timeout
// lifecycles get a derived context whose cancellation is simple, idempotent,
// and non-blocking.
//
// It is neither a scheduler nor a runtime: it is just the cancellation signal
// shared by the jobs launched through this module.
@[heap]
struct AsyncContext {
mut:
	// parent is the context this one was derived from.
	parent context.Context
	// done is the channel returned by `done()`; it is closed on cancellation.
	done chan int
	// mutex is held while `done`, `err` and `cancel_src` are read or updated.
	mutex &sync.Mutex = sync.new_mutex()
	// err is the recorded cancellation reason; it stays `none` until canceled.
	err IError = none
	// cancel_src is the source passed to the cancellation that took effect.
	cancel_src CancelSource
	// has_deadline is true when `deadline_at` holds this context's deadline.
	has_deadline bool
	// deadline_at is the deadline reported by `deadline()` when `has_deadline`.
	deadline_at time.Time
	// deadline_src tells whether `deadline_at` came from the requested timeout
	// or from an earlier parent deadline.
	deadline_src CancelSource
}
34
// new_cancel_context derives a cancellable AsyncContext from `parent`.
//
// When `parent` already reports an error, the derived context is canceled
// immediately with that error. Otherwise a watcher is spawned to forward a
// later parent cancellation. The returned `context.CancelFn` cancels the
// derived context with `context_canceled`.
fn new_cancel_context(parent context.Context) (&AsyncContext, context.CancelFn) {
	mut derived := &AsyncContext{
		parent: parent
		done:   chan int{cap: 1}
		mutex:  sync.new_mutex()
	}
	parent_already_done := derived.propagate_existing_parent_error()
	if !parent_already_done {
		spawn watch_parent_context(mut derived)
	}
	cancel := fn [mut derived] () {
		derived.cancel_with(error(context_canceled), .cancel)
	}
	return derived, context.CancelFn(cancel)
}
49
// new_timeout_context derives an AsyncContext from `parent` that is canceled
// with `context_deadline_exceeded` once `timeout` has elapsed.
//
// The effective deadline is `now + timeout`, unless `parent` reports an earlier
// deadline; in that case the parent's deadline is recorded and no local timer
// is started. A `timeout` that has already elapsed cancels the context before
// this function returns. The returned `context.CancelFn` cancels the derived
// context with `context_canceled`.
fn new_timeout_context(parent context.Context, timeout time.Duration) (&AsyncContext, context.CancelFn) {
	// Start from the requested timeout, then let an earlier parent deadline win.
	mut effective_deadline := time.now().add(timeout)
	mut source := CancelSource.timeout
	if inherited := parent.deadline() {
		if inherited < effective_deadline {
			effective_deadline = inherited
			source = .parent
		}
	}
	mut derived := &AsyncContext{
		parent:       parent
		done:         chan int{cap: 1}
		mutex:        sync.new_mutex()
		has_deadline: true
		deadline_at:  effective_deadline
		deadline_src: source
	}
	parent_already_done := derived.propagate_existing_parent_error()
	if !parent_already_done {
		spawn watch_parent_context(mut derived)
	}
	// A local timer is only armed when this context owns the deadline.
	if source == .timeout {
		remaining := effective_deadline - time.now()
		if remaining.nanoseconds() > 0 {
			spawn watch_timeout_context(mut derived, remaining)
		} else {
			derived.cancel_with(error(context_deadline_exceeded), .timeout)
		}
	}
	cancel := fn [mut derived] () {
		derived.cancel_with(error(context_canceled), .cancel)
	}
	return derived, context.CancelFn(cancel)
}
83
// deadline reports this context's own deadline when it has one and falls back
// to the parent's deadline otherwise, so callers that inspect deadline
// metadata keep seeing it.
pub fn (ctx &AsyncContext) deadline() ?time.Time {
	if !ctx.has_deadline {
		return ctx.parent.deadline()
	}
	return ctx.deadline_at
}
92
// value looks `key` up in the parent context; x.async stores no request values
// of its own.
pub fn (ctx &AsyncContext) value(key context.Key) ?context.Any {
	parent := ctx.parent
	return parent.value(key)
}
97
// done returns the cancellation channel shared by every job that uses this
// context. The field is read while holding the context mutex.
pub fn (mut ctx AsyncContext) done() chan int {
	ctx.mutex.lock()
	defer {
		ctx.mutex.unlock()
	}
	return ctx.done
}
105
// err returns the cancellation reason recorded on this context. When none has
// been recorded yet it returns the parent's error instead, which covers a
// parent that is already canceled before propagation reached this context.
pub fn (mut ctx AsyncContext) err() IError {
	ctx.mutex.lock()
	local_err := ctx.err
	ctx.mutex.unlock()
	// The mutex is released before the parent is consulted.
	if local_err is none {
		return ctx.parent.err()
	}
	return local_err
}
117
// was_canceled_by_timeout reports whether the recorded cancellation source is
// `.timeout`.
fn (ctx &AsyncContext) was_canceled_by_timeout() bool {
	ctx.mutex.lock()
	timed_out := ctx.cancel_src == .timeout
	ctx.mutex.unlock()
	return timed_out
}
124
// cancel_with cancels the context with `err`, remembering `cancel_src` as the
// origin of the cancellation.
//
// A `none` error is ignored. Only the first call has any effect: once an error
// is recorded, later calls return without touching the context.
fn (mut ctx AsyncContext) cancel_with(err IError, cancel_src CancelSource) {
	if err is none {
		return
	}
	ctx.mutex.lock()
	defer {
		ctx.mutex.unlock()
	}
	if ctx.err is none {
		ctx.err = err
		ctx.cancel_src = cancel_src
		if !ctx.done.closed {
			// One value is buffered before the channel is closed.
			// NOTE(review): confirm that every concurrent receiver of `done`
			// is released once another receiver has consumed that value.
			ctx.done <- 0
			ctx.done.close()
		}
	}
}
142
// propagate_existing_parent_error cancels this context with the parent's error
// when the parent already reports one. It returns true when it did so and
// false when the parent has no error yet.
fn (mut ctx AsyncContext) propagate_existing_parent_error() bool {
	mut parent := ctx.parent
	parent_err := parent.err()
	if parent_err is none {
		return false
	}
	ctx.cancel_with(parent_err, .parent)
	return true
}
152
// watch_parent_context waits on both the parent's and this context's done
// channels. A value from the context's own channel ends the watcher; a value
// from the parent's channel forwards the parent's error, if it reports one, as
// a `.parent` cancellation.
fn watch_parent_context(mut ctx AsyncContext) {
	mut parent := ctx.parent
	parent_signal := parent.done()
	own_signal := ctx.done()
	select {
		_ := <-own_signal {
			return
		}
		_ := <-parent_signal {
			reason := parent.err()
			if reason is none {
				return
			}
			ctx.cancel_with(reason, .parent)
		}
	}
}
169
// watch_timeout_context waits on this context's done channel for at most
// `timeout`. A value from the channel ends the watcher; if the select times
// out first, the context is canceled with `context_deadline_exceeded` as a
// `.timeout` cancellation.
fn watch_timeout_context(mut ctx AsyncContext, timeout time.Duration) {
	own_signal := ctx.done()
	select {
		_ := <-own_signal {
			return
		}
		timeout {
			deadline_err := error(context_deadline_exceeded)
			ctx.cancel_with(deadline_err, .timeout)
		}
	}
}
181