v2 / vlib / x / async / context.v
178 lines · 164 sloc · 4.23 KB · 15fb60b77ea6073658aa8355b247f2e1ae03b714
Raw
1module async
2
3import context
4import sync
5import time
6
// CancelSource records which event caused an AsyncContext to be canceled.
enum CancelSource {
	// unknown is the first variant, used while no cancellation has been recorded.
	unknown
	// parent marks a cancellation forwarded from the parent context.
	parent
	// timeout marks a cancellation raised by this context's own timeout watcher.
	timeout
	// cancel marks an explicit call to the returned CancelFn.
	cancel
}
13
// AsyncContext is x.async's own minimal implementation of `context.Context`.
//
// Public functions of the module keep accepting and returning the standard
// `context.Context` interface. Internally the module needs a derived context
// whose cancellation is simple, idempotent, and non-blocking, so that Group
// and timeout lifecycles can rely on it.
//
// It is neither a scheduler nor a runtime: it only carries the cancellation
// signal shared by the jobs launched through this module.
@[heap]
struct AsyncContext {
mut:
	// parent is the context this one was derived from; deadline and value
	// lookups fall back to it.
	parent context.Context
	// done is the channel handed out by `done()` and closed on cancellation.
	done chan int
	// mutex guards reads and writes of `done`, `err` and `cancel_src`.
	mutex &sync.Mutex = sync.new_mutex()
	// err holds the cancellation reason; `none` until the context is canceled.
	err IError = none
	// cancel_src records which event performed the cancellation.
	cancel_src CancelSource
	// has_deadline is true when this context carries its own deadline.
	has_deadline bool
	// deadline_at is the deadline reported when `has_deadline` is set.
	deadline_at time.Time
}
33
// new_cancel_context derives a cancelable AsyncContext from `parent`.
//
// When the parent already reports an error, the new context is canceled with
// that error right away and no watcher is started. Otherwise a watcher is
// spawned that forwards a later parent cancellation. The returned CancelFn
// cancels the context with `context_canceled`.
fn new_cancel_context(parent context.Context) (&AsyncContext, context.CancelFn) {
	mut child := &AsyncContext{
		parent: parent
		done:   chan int{cap: 1}
		mutex:  sync.new_mutex()
	}
	parent_already_done := child.propagate_existing_parent_error()
	if !parent_already_done {
		spawn watch_parent_context(mut child)
	}
	canceler := fn [mut child] () {
		child.cancel_with(error(context_canceled), .cancel)
	}
	return child, context.CancelFn(canceler)
}
48
// new_timeout_context derives an AsyncContext from `parent` that carries a
// deadline.
//
// The deadline is `now + timeout`, unless the parent exposes an earlier
// deadline, in which case the parent's deadline is adopted and no local
// timeout watcher is started (cancellation is then left to the parent
// watcher). When the local deadline applies and has already passed, the
// context is canceled with `context_deadline_exceeded` before returning.
// The returned CancelFn cancels the context with `context_canceled`.
fn new_timeout_context(parent context.Context, timeout time.Duration) (&AsyncContext, context.CancelFn) {
	own_deadline := time.now().add(timeout)
	mut effective_deadline := own_deadline
	mut parent_expires_first := false
	if inherited_deadline := parent.deadline() {
		if inherited_deadline < own_deadline {
			effective_deadline = inherited_deadline
			parent_expires_first = true
		}
	}
	mut child := &AsyncContext{
		parent:       parent
		done:         chan int{cap: 1}
		mutex:        sync.new_mutex()
		has_deadline: true
		deadline_at:  effective_deadline
	}
	parent_already_done := child.propagate_existing_parent_error()
	if !parent_already_done {
		spawn watch_parent_context(mut child)
	}
	if !parent_expires_first {
		// Re-measure the remaining time: some of the timeout has elapsed
		// since the deadline was computed.
		remaining := effective_deadline - time.now()
		if remaining.nanoseconds() > 0 {
			spawn watch_timeout_context(mut child, remaining)
		} else {
			child.cancel_with(error(context_deadline_exceeded), .timeout)
		}
	}
	canceler := fn [mut child] () {
		child.cancel_with(error(context_canceled), .cancel)
	}
	return child, context.CancelFn(canceler)
}
81
// deadline reports the deadline stored on this context when it has one and
// otherwise forwards the question to the parent, so callers inspecting the
// deadline still see inherited metadata.
pub fn (ctx &AsyncContext) deadline() ?time.Time {
	if !ctx.has_deadline {
		return ctx.parent.deadline()
	}
	return ctx.deadline_at
}
90
// value looks `key` up on the parent context; this context stores no values
// of its own.
pub fn (ctx &AsyncContext) value(key context.Key) ?context.Any {
	return ctx.parent.value(key)
}
95
// done hands out the cancellation channel of this context. The channel field
// is read while holding the context mutex.
pub fn (mut ctx AsyncContext) done() chan int {
	ctx.mutex.lock()
	defer {
		ctx.mutex.unlock()
	}
	return ctx.done
}
103
// err reports why this context was canceled. When no local reason has been
// recorded yet, the parent's error is returned instead, which covers the
// window where the parent is canceled but the cancellation has not been
// propagated to this context.
pub fn (mut ctx AsyncContext) err() IError {
	ctx.mutex.lock()
	local_reason := ctx.err
	ctx.mutex.unlock()
	if local_reason is none {
		return ctx.parent.err()
	}
	return local_reason
}
115
// was_canceled_by_timeout reports whether the recorded cancellation source is
// `.timeout`. The source is read while holding the context mutex.
fn (ctx &AsyncContext) was_canceled_by_timeout() bool {
	ctx.mutex.lock()
	defer {
		ctx.mutex.unlock()
	}
	return ctx.cancel_src == .timeout
}
122
// cancel_with cancels the context with reason `err` and records `cancel_src`
// as the origin.
//
// A `none` error is ignored. Only the first cancellation takes effect: once an
// error is stored, later calls return without changing anything. On the first
// cancellation a value is sent on the `done` channel and the channel is
// closed, unless it is already closed.
fn (mut ctx AsyncContext) cancel_with(err IError, cancel_src CancelSource) {
	if err is none {
		return
	}
	ctx.mutex.lock()
	defer {
		ctx.mutex.unlock()
	}
	if ctx.err !is none {
		// Already canceled: keep the first reason and source.
		return
	}
	ctx.err = err
	ctx.cancel_src = cancel_src
	if ctx.done.closed {
		return
	}
	ctx.done <- 0
	ctx.done.close()
}
140
// propagate_existing_parent_error checks whether the parent already reports an
// error. If so, this context is canceled with that error (source `.parent`)
// and true is returned; otherwise nothing changes and false is returned.
fn (mut ctx AsyncContext) propagate_existing_parent_error() bool {
	mut parent := ctx.parent
	parent_err := parent.err()
	if parent_err is none {
		return false
	}
	ctx.cancel_with(parent_err, .parent)
	return true
}
150
// watch_parent_context waits until either this context or its parent signals
// on its done channel. If this context signals first the watcher just exits.
// If the parent signals first and reports an error, that error is forwarded
// to this context with source `.parent`.
fn watch_parent_context(mut ctx AsyncContext) {
	mut upstream := ctx.parent
	upstream_done := upstream.done()
	own_done := ctx.done()
	select {
		_ := <-own_done {
			return
		}
		_ := <-upstream_done {
			reason := upstream.err()
			if reason is none {
				return
			}
			ctx.cancel_with(reason, .parent)
		}
	}
}
167
// watch_timeout_context waits for this context's done channel for at most
// `timeout`. If the channel signals first the watcher exits; if the timeout
// branch is taken, the context is canceled with `context_deadline_exceeded`
// and source `.timeout`.
fn watch_timeout_context(mut ctx AsyncContext, timeout time.Duration) {
	own_done := ctx.done()
	select {
		_ := <-own_done {
			return
		}
		timeout {
			ctx.cancel_with(error(context_deadline_exceeded), .timeout)
		}
	}
}
179