
x.async

x.async is a small structured-concurrency layer for V programs. It composes the primitives that already exist in V: spawn, sync.WaitGroup, channels, context, and time.

The module does not add a scheduler, does not change the language, and does not implement async/await. It must not become a hidden runtime, event loop, external dependency, or competing concurrency model. Its purpose is narrower: make common concurrent control flow easier to write, stop, test, and review.

Safety is the primary design constraint. Public APIs should be defensive, shared state must be synchronized explicitly, errors must not be lost silently, channels must not leave workers blocked without a consumer, and behavior must remain robust in -prod.

The current API intentionally centers on five building blocks (Group, Task[T], Pool, every(), and with_timeout()), plus small context and function-type helpers.

Why

Raw spawn plus sync.WaitGroup is explicit and fast, but real applications quickly need the same extra wiring around it:
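For comparison, here is a minimal sketch of that hand-written wiring, using only spawn, sync.WaitGroup, and a buffered channel. The names job, worker, and run_all are hypothetical and exist only for this illustration. The sketch collects error messages but has no cancellation or timeout handling, which would need still more plumbing.

```v
import sync

// Hypothetical job used only for this illustration: job 2 fails.
fn job(id int) ! {
    if id == 2 {
        return error('job ${id} failed')
    }
}

// Runs one job, reports a failure on the channel, and always signals the WaitGroup.
fn worker(id int, errs chan string, mut wg sync.WaitGroup) {
    defer {
        wg.done()
    }
    job(id) or { errs <- err.msg() }
}

// Starts n jobs, waits for all of them, and returns the collected error messages.
fn run_all(n int) []string {
    mut wg := sync.new_waitgroup()
    // Buffered to n so a failing worker never blocks on send.
    errs := chan string{cap: n}
    wg.add(n)
    for id in 0 .. n {
        spawn worker(id, errs, mut wg)
    }
    wg.wait()
    errs.close()
    mut failures := []string{}
    for {
        // Receiving from a closed, drained channel takes the or-branch.
        msg := <-errs or { break }
        failures << msg
    }
    return failures
}

fn main() {
    println(run_all(4))
}
```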

Quick Start

import context
import time
import x.async

fn main() {
    parent := context.background()
    mut group := async.new_group(parent)

    group.go(fn (mut ctx context.Context) ! {
        // A long-running job should observe ctx.done().
        done := ctx.done()
        select {
            _ := <-done {
                return ctx.err()
            }
            50 * time.millisecond {
                return
            }
        }
    })!

    group.go(fn (mut ctx context.Context) ! {
        _ = ctx
        return error('stop the group')
    })!

    group.wait() or { eprintln('group failed: ${err.msg()}') }
}

API

Context helpers

async.background() returns context.background().

async.with_cancel() returns a cancellable context derived from the background context, together with its cancel function:

ctx, cancel := async.with_cancel()
defer {
    cancel()
}

These helpers are convenience wrappers. Code that already has a context.Context should pass it directly to new_group() or with_timeout_context().

JobFn

pub type JobFn = fn (mut context.Context) !
pub type TaskFn[T] = fn (mut context.Context) !T

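JobFn is the signature for jobs that can only fail; TaskFn[T] is the variant for tasks that also return a value of type T. Every x.async job receives a context. This is a deliberate choice: cancellation is part of the function signature instead of a hidden global side channel.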

Cooperative Cancellation

Cancellation in x.async is cooperative. The module closes the shared context's done() channel, but it does not interrupt, kill, or preempt a running thread.

A job that can run for a long time should check the context:

fn worker(mut ctx context.Context) ! {
    done := ctx.done()
    for {
        select {
            _ := <-done {
                return ctx.err()
            }
            10 * time.millisecond {
                // Do a small unit of work.
            }
        }
    }
}

If a job ignores ctx.done(), Group.wait() will still wait for it to return. Pool.close() and every() also wait for running non-cooperative jobs to return. with_timeout() returns when the timeout expires, but a job that ignores cancellation may keep running until it finishes naturally.
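The with_timeout() case can be sketched as follows. This is an illustration only: run_stubborn_job is a hypothetical helper, and the sketch assumes that an expired timeout is reported to the caller as an error, without relying on its exact message.

```v
import context
import time
import x.async

// Hypothetical helper: returns true if with_timeout() reported an error.
fn run_stubborn_job() bool {
    mut timed_out := false
    async.with_timeout(50 * time.millisecond, fn (mut ctx context.Context) ! {
        // Deliberately ignores ctx, so it cannot notice the timeout.
        _ = ctx
        time.sleep(300 * time.millisecond)
    }) or {
        // Assumed path: control returns to the caller once the timeout expires.
        timed_out = true
    }
    // The sleeping job may still be running in the background at this point.
    return timed_out
}

fn main() {
    println('timed out: ${run_stubborn_job()}')
}
```

A cooperative version of the same job would select on ctx.done(), as in the worker example above, and return promptly instead of sleeping through the timeout.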

Group

Group coordinates related jobs.

parent := context.background()
mut group := async.new_group(parent)

group.go(fn (mut ctx context.Context) ! {
    serve_http(mut ctx)!
})!

group.go(fn (mut ctx context.Context) ! {
    cleanup_loop(mut ctx)!
})!

group.wait()!

Guarantees:

Task

Task[T] represents one concurrent computation that returns either a value or an error.

mut task := async.run[string](fn (mut ctx context.Context) !string {
    _ = ctx
    return load_config()!
})!

config := task.wait()!
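Because run() starts the computation before wait() is called, several tasks can be started first and awaited afterwards. A small sketch using only the calls shown above (sum_two is a hypothetical helper for illustration):

```v
import context
import x.async

// Hypothetical helper: starts two tasks, then waits for both results.
fn sum_two() !int {
    mut a := async.run[int](fn (mut ctx context.Context) !int {
        _ = ctx
        return 20
    })!
    mut b := async.run[int](fn (mut ctx context.Context) !int {
        _ = ctx
        return 22
    })!
    // Both tasks were started before either wait, so their work can overlap.
    return a.wait()! + b.wait()!
}

fn main() {
    println(sum_two()!)
}
```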

Use run_with_context() when the task should follow an existing parent context:

parent := context.background()
mut task := async.run_with_context[int](parent, fn (mut ctx context.Context) !int {
    done := ctx.done()
    select {
        _ := <-done {
            return ctx.err()
        }
        50 * time.millisecond {
            return 42
        }
    }
})!

value := task.wait()!

Guarantees:

Pool

Pool limits how many jobs run concurrently and how many pending jobs can wait in memory.

mut pool := async.new_pool(workers: 4, queue_size: 128)!
defer {
    pool.close() or {}
}

pool.try_submit(fn (mut ctx context.Context) ! {
    process_message(mut ctx)!
})!

Use new_pool_with_context() when the pool should follow an existing parent context:

parent := context.background()
mut pool := async.new_pool_with_context(parent, workers: 2, queue_size: 32)!

Backpressure is explicit. try_submit() never waits for backlog space:
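One way a caller might react to rejected submissions is sketched below. It assumes that try_submit() reports a full queue as an error that can be handled with an or block; submit_burst is a hypothetical helper, and the pool is kept deliberately tiny so that the queue fills quickly.

```v
import context
import time
import x.async

// Hypothetical helper: submits n slow jobs and counts accepted vs. rejected ones.
fn submit_burst(n int) !(int, int) {
    mut pool := async.new_pool(workers: 1, queue_size: 1)!
    defer {
        pool.close() or {}
    }
    mut accepted := 0
    mut rejected := 0
    for _ in 0 .. n {
        pool.try_submit(fn (mut ctx context.Context) ! {
            _ = ctx
            time.sleep(20 * time.millisecond)
        }) or {
            // No blocking: the caller decides whether to drop, retry later, or report.
            rejected++
            continue
        }
        accepted++
    }
    return accepted, rejected
}

fn main() {
    accepted, rejected := submit_burst(20)!
    println('accepted=${accepted} rejected=${rejected}')
}
```

The exact split between accepted and rejected jobs depends on timing, so the sketch only counts them.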

Periodic Jobs

every() runs a job repeatedly until its context is canceled or the job returns an error.

ctx, cancel := async.with_cancel()
defer {
    cancel()
}

async.every(ctx, 5 * time.second, fn (mut ctx context.Context) ! {
    cleanup_stale_clients(mut ctx)!
})!
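The other way to stop every() is for the job itself to return an error. The sketch below assumes that every() blocks the caller until it stops and then surfaces the job's error; run_until_deadline is a hypothetical helper, and the deadline is captured by the closure purely for illustration.

```v
import context
import time
import x.async

// Hypothetical helper: runs a periodic job until it fails, then returns the error text.
fn run_until_deadline() string {
    ctx, cancel := async.with_cancel()
    defer {
        cancel()
    }
    deadline := time.now().add(100 * time.millisecond)
    mut reason := ''
    async.every(ctx, 20 * time.millisecond, fn [deadline] (mut ctx context.Context) ! {
        _ = ctx
        if time.now() > deadline {
            // Returning an error ends the periodic loop.
            return error('deadline reached')
        }
    }) or {
        reason = err.msg()
    }
    return reason
}

fn main() {
    println('every() stopped: ${run_until_deadline()}')
}
```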

Guarantees:

Timeout

with_timeout() runs one job with a background context and a timeout:

async.with_timeout(2 * time.second, fn (mut ctx context.Context) ! {
    done := ctx.done()
    select {
        _ := <-done {
            return ctx.err()
        }
        100 * time.millisecond {
            return
        }
    }
})!

with_timeout_context() derives the timeout from an existing parent context:

parent := context.background()
async.with_timeout_context(parent, 250 * time.millisecond, fn (mut ctx context.Context) ! {
    fetch_or_compute(mut ctx)!
})!

Error behavior:

Safety And Security Notes

x.async is about control-flow safety, not sandboxing.

Limits

This milestone does not include:

Examples

Small runnable examples live in vlib/x/async/examples/:

Tests

The targeted test suite lives next to the module:

v test vlib/x/async
v -prod test vlib/x/async

For automated validation, prefer the guarded script:

sh vlib/x/async/tools/validate.sh

It runs formatting verification, dev tests, and -prod tests serially with a fresh VTMP and VCACHE for that run. Do not run two V validation runners against the same checkout/cache unless each runner has isolated VTMP, VCACHE, and output paths. That isolation protects the validation harness from V build artefact collisions; it is separate from the runtime guarantees of x.async.

The tests cover successful groups, first-error propagation, empty waits, rejected submissions after wait(), one-shot task waits, task values and errors, pool worker limits, pool queue backpressure, pool close/wait behavior, periodic execution, periodic cancellation, non-overlapping periodic iterations, cooperative cancellation, timeout errors, parent cancellation, nil job rejection, parent deadline preservation, invalid intervals, zero/already-expired timeouts, stress cases, and jobs that ignore cancellation. Internal tests also verify that the derived AsyncContext closes done() and propagates parent cancellation.

Module-oriented integration tests live in vlib/x/async/tests/. They are synthetic and local: no external service, fixed port, path dependency, or fragile server shutdown is required. They currently cover net.http, net.websocket, mcp, and veb boundaries.

Benchmarks

Small local benchmarks live in vlib/x/async/benchmarks/:

sh vlib/x/async/benchmarks/run_async_benchmark.sh

The script uses the local ./v, runs serially, and isolates VTMP, VCACHE, and the benchmark executable output. It measures short default runs for Group, Task[T], Pool, with_timeout(), and every().

The default sizes are intentionally modest. They can be changed with XASYNC_BENCH_GROUP_ROUNDS, XASYNC_BENCH_GROUP_JOBS, XASYNC_BENCH_TASK_ROUNDS, XASYNC_BENCH_POOL_JOBS, XASYNC_BENCH_POOL_WORKERS, XASYNC_BENCH_TIMEOUT_ROUNDS, XASYNC_BENCH_EVERY_ITERATIONS, and XASYNC_BENCH_EVERY_INTERVAL_MS. Benchmark output is local diagnostic data, not a portable performance claim.