v / vlib / x / async / timeout.v
103 lines · 95 sloc · 2.93 KB · 4389a3bf8b71672e61650d457e5b58dacf9e0ef7
Raw
1module async
2
3import context
4import time
5
// TimeoutResult is the message a timeout worker sends back once its job has
// returned, whether it succeeded or failed.
struct TimeoutResult {
	// err holds the error returned by the job; it keeps its `none` default
	// when the job completed without error.
	err IError = none
	// finished_at is the time.now() reading taken when the job returned.
	finished_at time.Time
}
10
// with_timeout executes f under a context derived from context.background()
// and fails when timeout elapses before f returns. Every error produced by
// with_timeout_context is propagated to the caller unchanged.
pub fn with_timeout(timeout time.Duration, f JobFn) ! {
	root := context.background()
	with_timeout_context(root, timeout, f)!
}
15
// with_timeout_context executes f with a context that is derived from parent
// and limited by timeout.
//
// When timeout elapses before f returns, the call fails with `async: timeout`
// and the derived context is canceled. Stopping early is the job's own
// responsibility: it has to watch the context, because x.async never kills
// spawned work.
pub fn with_timeout_context(parent context.Context, timeout time.Duration, f JobFn) ! {
	if f == unsafe { nil } {
		return error(err_nil_job)
	}
	timed_ctx, cancel := new_timeout_context(parent, timeout)
	mut ctx := context.Context(timed_ctx)
	// Always release the derived context, whichever path returns.
	defer {
		cancel()
	}
	// The derived context may already be finished before any work starts.
	early_err := ctx.err()
	if early_err !is none {
		owned_timeout := early_err.msg() == context_deadline_exceeded
			&& timed_ctx.was_canceled_by_timeout()
		if owned_timeout {
			return error(err_timeout)
		}
		return early_err
	}
	// Capacity 1 lets a job that ignores cancellation deliver its result
	// later without blocking, even though nobody is receiving anymore.
	outcome_ch := chan TimeoutResult{cap: 1}
	spawn run_timeout_job(ctx, f, outcome_ch)
	cancel_signal := ctx.done()
	// Whichever event arrives first decides the result; the function ends
	// right after the select, so no explicit return is needed in the branches.
	select {
		outcome := <-outcome_ch {
			handle_timeout_result(mut ctx, timed_ctx, outcome)!
		}
		_ := <-cancel_signal {
			handle_timeout_done(mut ctx, timed_ctx)!
		}
	}
}
53
// handle_timeout_result maps a result published by the worker onto the error
// that the caller of with_timeout_context should see.
//
// A result stamped at or after the timeout owned by async_ctx counts as a
// timeout, unless the context carries an error that was not caused by that
// timeout, in which case the context error is returned. Otherwise the job's
// own error (if any) is returned, with a deadline error that stems from the
// owned timeout rewritten to `err_timeout`. Success returns nothing.
fn handle_timeout_result(mut ctx context.Context, async_ctx &AsyncContext, result TimeoutResult) ! {
	ctx_err := ctx.err()
	if result.finished_after_owned_timeout(async_ctx) {
		if ctx_err !is none {
			if !async_ctx.was_canceled_by_timeout() {
				// Something other than our timeout ended the context.
				return ctx_err
			}
		}
		return error(err_timeout)
	}
	if result.err !is none {
		// Both the context and the job report a deadline error and the
		// deadline is ours: surface the package's own timeout error.
		if ctx_err !is none && ctx_err.msg() == context_deadline_exceeded
			&& result.err.msg() == context_deadline_exceeded
			&& async_ctx.was_canceled_by_timeout() {
			return error(err_timeout)
		}
		return result.err
	}
}
72
// handle_timeout_done picks the error to report when the context's done
// channel fired before the worker delivered a result.
//
// The context error is returned as-is unless it is a deadline error caused by
// the timeout owned by async_ctx; that case, and a context without an error,
// both yield `err_timeout`.
fn handle_timeout_done(mut ctx context.Context, async_ctx &AsyncContext) ! {
	cause := ctx.err()
	if cause !is none {
		if cause.msg() != context_deadline_exceeded || !async_ctx.was_canceled_by_timeout() {
			return cause
		}
	}
	return error(err_timeout)
}
83
// finished_after_owned_timeout reports whether the job finished at or after
// the deadline of async_ctx, provided that deadline comes from the timeout
// itself (`deadline_src == .timeout`). It is false for any other source.
fn (result TimeoutResult) finished_after_owned_timeout(async_ctx &AsyncContext) bool {
	if async_ctx.deadline_src != .timeout {
		return false
	}
	return async_ctx.deadline_at < result.finished_at
		|| async_ctx.deadline_at == result.finished_at
}
88
// run_timeout_job is the worker body: it calls f with ctx and sends exactly
// one TimeoutResult on result_ch, stamped with the time f returned and
// carrying f's error when it failed.
fn run_timeout_job(ctx context.Context, f JobFn, result_ch chan TimeoutResult) {
	// A spawned function cannot take a mutable non-reference argument, so the
	// mutable context interface value is made here, inside the worker.
	mut job_ctx := ctx
	f(mut job_ctx) or {
		failure := TimeoutResult{
			err:         err
			finished_at: time.now()
		}
		result_ch <- failure
		return
	}
	success := TimeoutResult{
		finished_at: time.now()
	}
	result_ch <- success
}
104