v2 / vlib / x / async / task.v
86 lines · 79 sloc · 2.33 KB · 15fb60b77ea6073658aa8355b247f2e1ae03b714
Raw
1module async
2
3import context
4import sync
5
6struct TaskResult[T] {
7 value T
8 err IError = none
9}
10
11// Task represents one concurrent computation that produces either a value or an error.
12//
13// A Task is intentionally small: it starts one `spawn`, stores one result in a
14// bounded channel, and lets the owner consume that result with wait(). It does
15// not recover panics and it does not kill work that ignores cancellation.
16@[heap]
17pub struct Task[T] {
18mut:
19 ctx context.Context
20 cancel context.CancelFn = unsafe { nil }
21 result_ch chan TaskResult[T]
22 // Guards the one-shot wait contract. The result channel carries the actual
23 // value/error, so no additional shared result state is needed.
24 mutex &sync.Mutex = sync.new_mutex()
25 waited bool
26}
27
28// run starts f with a background context and returns a Task for its result.
29//
30// The returned Task owns a derived context that is canceled when f finishes.
31pub fn run[T](f TaskFn[T]) !&Task[T] {
32 return run_with_context[T](context.background(), f)
33}
34
35// run_with_context starts f with a context derived from parent.
36//
37// Cancellation is cooperative. If parent is canceled, f must observe ctx.done()
38// and return. The result channel is buffered so f can publish its single result
39// even if the owner has not called wait() yet.
40pub fn run_with_context[T](parent context.Context, f TaskFn[T]) !&Task[T] {
41 if f == unsafe { nil } {
42 return error(err_nil_job)
43 }
44 ctx, cancel := new_cancel_context(parent)
45 mut task := &Task[T]{
46 ctx: context.Context(ctx)
47 cancel: cancel
48 result_ch: chan TaskResult[T]{cap: 1}
49 mutex: sync.new_mutex()
50 }
51 spawn fn [T](task &Task[T], f TaskFn[T]) {
52 mut job_ctx := task.ctx
53 value := f(mut job_ctx) or {
54 task.result_ch <- TaskResult[T]{
55 err: err
56 }
57 task.cancel()
58 return
59 }
60 task.result_ch <- TaskResult[T]{
61 value: value
62 }
63 task.cancel()
64 }(task, f)
65 return task
66}
67
68// wait blocks until the task publishes its result, then returns the value or error.
69//
70// wait is one-shot. A second call returns a stable error instead of blocking on
71// an already-consumed result channel.
72pub fn (mut task Task[T]) wait() !T {
73 task.mutex.lock()
74 if task.waited {
75 task.mutex.unlock()
76 return error(err_task_wait_called)
77 }
78 task.waited = true
79 task.mutex.unlock()
80
81 result := <-task.result_ch
82 if result.err !is none {
83 return result.err
84 }
85 return result.value
86}
87