v2 / vlib / x / async / timeout.v
79 lines · 74 sloc · 2.2 KB · 15fb60b77ea6073658aa8355b247f2e1ae03b714
Raw
1module async
2
3import context
4import time
5
6struct TimeoutResult {
7 err IError = none
8}
9
10// with_timeout runs f with a background context and returns an error if timeout expires first.
11pub fn with_timeout(timeout time.Duration, f JobFn) ! {
12 with_timeout_context(context.background(), timeout, f)!
13}
14
15// with_timeout_context runs f with a context derived from parent and bounded by timeout.
16//
17// If timeout expires before f returns, this returns `async: timeout` and
18// cancels the derived context. The job must observe the context to stop early;
19// x.async does not kill spawned work.
20pub fn with_timeout_context(parent context.Context, timeout time.Duration, f JobFn) ! {
21 if f == unsafe { nil } {
22 return error(err_nil_job)
23 }
24 async_ctx, cancel := new_timeout_context(parent, timeout)
25 mut ctx := context.Context(async_ctx)
26 defer {
27 cancel()
28 }
29 initial_err := ctx.err()
30 if initial_err !is none {
31 if initial_err.msg() == context_deadline_exceeded && async_ctx.was_canceled_by_timeout() {
32 return error(err_timeout)
33 }
34 return initial_err
35 }
36 // The channel is buffered so a non-cooperative job can still publish its
37 // result later without blocking after the caller has returned on timeout.
38 result_ch := chan TimeoutResult{cap: 1}
39 spawn run_timeout_job(ctx, f, result_ch)
40 done_ch := ctx.done()
41 select {
42 result := <-result_ch {
43 if result.err !is none {
44 ctx_err := ctx.err()
45 if ctx_err !is none && ctx_err.msg() == context_deadline_exceeded
46 && result.err.msg() == context_deadline_exceeded {
47 if async_ctx.was_canceled_by_timeout() {
48 return error(err_timeout)
49 }
50 }
51 return result.err
52 }
53 return
54 }
55 _ := <-done_ch {
56 err := ctx.err()
57 if err !is none {
58 if err.msg() == context_deadline_exceeded && async_ctx.was_canceled_by_timeout() {
59 return error(err_timeout)
60 }
61 return err
62 }
63 return error(err_timeout)
64 }
65 }
66}
67
68fn run_timeout_job(ctx context.Context, f JobFn, result_ch chan TimeoutResult) {
69 // Spawned functions cannot receive mutable non-reference arguments, so the
70 // worker creates the mutable context interface value locally.
71 mut job_ctx := ctx
72 f(mut job_ctx) or {
73 result_ch <- TimeoutResult{
74 err: err
75 }
76 return
77 }
78 result_ch <- TimeoutResult{}
79}
80