From 15fb60b77ea6073658aa8355b247f2e1ae03b714 Mon Sep 17 00:00:00 2001 From: GGRei Date: Fri, 1 May 2026 05:32:43 +0200 Subject: [PATCH] x.async: add structured concurrency helpers (#27029) --- vlib/x/async/README.md | 482 ++++++++++++++++++ vlib/x/async/async.v | 35 ++ vlib/x/async/benchmarks/README.md | 40 ++ vlib/x/async/benchmarks/async_benchmark.v | 173 +++++++ .../x/async/benchmarks/run_async_benchmark.sh | 34 ++ vlib/x/async/context.v | 178 +++++++ vlib/x/async/context_internal_test.v | 54 ++ vlib/x/async/errors.v | 22 + vlib/x/async/examples/README.md | 51 ++ vlib/x/async/examples/basic_group.v | 30 ++ vlib/x/async/examples/basic_task.v | 12 + vlib/x/async/examples/mcp/README.md | 18 + vlib/x/async/examples/mcp/tool_dispatch.v | 37 ++ vlib/x/async/examples/net_http/README.md | 21 + .../x/async/examples/net_http/request_batch.v | 47 ++ vlib/x/async/examples/net_websocket/README.md | 19 + .../examples/net_websocket/message_pipeline.v | 49 ++ vlib/x/async/examples/periodic.v | 49 ++ vlib/x/async/examples/timeout.v | 23 + vlib/x/async/examples/veb/README.md | 14 + vlib/x/async/examples/veb/app_lifecycle.v | 27 + vlib/x/async/examples/worker_pool.v | 45 ++ vlib/x/async/group.v | 114 +++++ vlib/x/async/group_test.v | 211 ++++++++ vlib/x/async/periodic.v | 68 +++ vlib/x/async/periodic_test.v | 214 ++++++++ vlib/x/async/pool.v | 161 ++++++ vlib/x/async/pool_test.v | 368 +++++++++++++ vlib/x/async/task.v | 86 ++++ vlib/x/async/task_test.v | 184 +++++++ vlib/x/async/tests/README.md | 28 + vlib/x/async/tests/mcp/README.md | 11 + vlib/x/async/tests/mcp/mcp_integration_test.v | 28 + vlib/x/async/tests/net_http/README.md | 11 + .../net_http/net_http_integration_test.v | 73 +++ vlib/x/async/tests/net_websocket/README.md | 11 + .../net_websocket_integration_test.v | 44 ++ vlib/x/async/tests/veb/README.md | 8 + vlib/x/async/tests/veb/veb_integration_test.v | 18 + vlib/x/async/timeout.v | 79 +++ vlib/x/async/timeout_test.v | 295 +++++++++++ vlib/x/async/tools/README.md | 29 ++ vlib/x/async/tools/validate.sh | 46 ++ 43 files changed, 3547 insertions(+) create mode 100644 vlib/x/async/README.md create mode 100644 vlib/x/async/async.v create mode 100644 vlib/x/async/benchmarks/README.md create mode 100644 vlib/x/async/benchmarks/async_benchmark.v create mode 100755 vlib/x/async/benchmarks/run_async_benchmark.sh create mode 100644 vlib/x/async/context.v create mode 100644 vlib/x/async/context_internal_test.v create mode 100644 vlib/x/async/errors.v create mode 100644 vlib/x/async/examples/README.md create mode 100644 vlib/x/async/examples/basic_group.v create mode 100644 vlib/x/async/examples/basic_task.v create mode 100644 vlib/x/async/examples/mcp/README.md create mode 100644 vlib/x/async/examples/mcp/tool_dispatch.v create mode 100644 vlib/x/async/examples/net_http/README.md create mode 100644 vlib/x/async/examples/net_http/request_batch.v create mode 100644 vlib/x/async/examples/net_websocket/README.md create mode 100644 vlib/x/async/examples/net_websocket/message_pipeline.v create mode 100644 vlib/x/async/examples/periodic.v create mode 100644 vlib/x/async/examples/timeout.v create mode 100644 vlib/x/async/examples/veb/README.md create mode 100644 vlib/x/async/examples/veb/app_lifecycle.v create mode 100644 vlib/x/async/examples/worker_pool.v create mode 100644 vlib/x/async/group.v create mode 100644 vlib/x/async/group_test.v create mode 100644 vlib/x/async/periodic.v create mode 100644 vlib/x/async/periodic_test.v create mode 100644 vlib/x/async/pool.v create mode 100644 vlib/x/async/pool_test.v create mode 100644 vlib/x/async/task.v create mode 100644 vlib/x/async/task_test.v create mode 100644 vlib/x/async/tests/README.md create mode 100644 vlib/x/async/tests/mcp/README.md create mode 100644 vlib/x/async/tests/mcp/mcp_integration_test.v create mode 100644 vlib/x/async/tests/net_http/README.md create mode 100644 vlib/x/async/tests/net_http/net_http_integration_test.v create mode 100644 vlib/x/async/tests/net_websocket/README.md create mode 100644 vlib/x/async/tests/net_websocket/net_websocket_integration_test.v create mode 100644 vlib/x/async/tests/veb/README.md create mode 100644 vlib/x/async/tests/veb/veb_integration_test.v create mode 100644 vlib/x/async/timeout.v create mode 100644 vlib/x/async/timeout_test.v create mode 100644 vlib/x/async/tools/README.md create mode 100755 vlib/x/async/tools/validate.sh diff --git a/vlib/x/async/README.md b/vlib/x/async/README.md new file mode 100644 index 000000000..6366688a6 --- /dev/null +++ b/vlib/x/async/README.md @@ -0,0 +1,482 @@ +# x.async + +`x.async` is a small structured-concurrency layer for V programs. +It composes the primitives that already exist in V: `spawn`, +`sync.WaitGroup`, channels, `context`, and `time`. + +The module does not add a scheduler, does not change the language, and does not +implement `async/await`. It must not become a hidden runtime, event loop, +external dependency, or competing concurrency model. Its purpose is narrower: +make common concurrent control-flow easier to write, stop, test, and review. + +Safety is the primary design constraint. Public APIs should be defensive, shared +state must be synchronized explicitly, errors must not be lost silently, +channels must not leave workers blocked without a consumer, and behavior must +remain robust in `-prod`. + +The current API intentionally centers on five building blocks, plus small +context and function-type helpers: + +- `Group`: run related jobs, wait for them, return the first error, and cancel + sibling jobs cooperatively. +- `Task[T]`: run one value-returning job and wait for its result once. +- `Pool`: run accepted jobs with a fixed concurrency limit and bounded backlog. +- `every`: run a periodic job without overlapping iterations. +- `with_timeout` / `with_timeout_context`: run one job with a bounded deadline. + +Ticker objects and server-specific helpers are not part of this API. + +## Why + +Raw `spawn` plus `sync.WaitGroup` is explicit and fast, but real applications +quickly need the same extra wiring around it: + +- one parent cancellation signal shared by all jobs; +- one place to wait for all accepted jobs; +- predictable first-error propagation; +- value-returning tasks without hand-written result channels; +- bounded worker pools for request bursts or CPU-heavy server work; +- periodic cleanup loops that stop through context cancellation; +- fail-fast sibling cancellation; +- timeouts that do not require hand-written result channels in every caller. + +`x.async` keeps those mechanics small and visible. A job is still a normal V +function. Cancellation is still exposed as a normal `context.Context`. Waiting +for groups is still backed by `sync.WaitGroup`; value and timeout handoff uses +small bounded channels. Pools use fixed worker slots plus a bounded accepted-job +backlog. + +Internally, `x.async` owns the derived cancellation context used by its jobs. +That internal context does not schedule work; it only keeps cancellation and +timeout propagation deterministic for this module. + +## Quick Start + +```v +import context +import time +import x.async + +fn main() { + parent := context.background() + mut group := async.new_group(parent) + + group.go(fn (mut ctx context.Context) ! { + // A long-running job should observe ctx.done(). + done := ctx.done() + select { + _ := <-done { + return ctx.err() + } + 50 * time.millisecond { + return + } + } + })! + + group.go(fn (mut ctx context.Context) ! { + _ = ctx + return error('stop the group') + })! + + group.wait() or { + eprintln('group failed: ${err.msg()}') + } +} +``` + +## API + +### Context helpers + +`async.background()` returns `context.background()`. + +`async.with_cancel()` returns a cancellable context derived from background: + +```v ignore +ctx, cancel := async.with_cancel() +defer { + cancel() +} +``` + +These helpers are convenience wrappers. Code that already has a +`context.Context` should pass it directly to `new_group()` or +`with_timeout_context()`. + +### JobFn + +```v ignore +pub type JobFn = fn (mut context.Context) ! +pub type TaskFn[T] = fn (mut context.Context) !T +``` + +Every `x.async` job receives a context. This is a deliberate choice: +cancellation is part of the function signature instead of a hidden global side +channel. + +## Cooperative Cancellation + +Cancellation in `x.async` is cooperative. The module closes the shared context's +`done()` channel, but it does not interrupt, kill, or preempt a running thread. + +A job that can run for a long time should check the context: + +```v ignore +fn worker(mut ctx context.Context) ! { + done := ctx.done() + for { + select { + _ := <-done { + return ctx.err() + } + 10 * time.millisecond { + // Do a small unit of work. + } + } + } +} +``` + +If a job ignores `ctx.done()`, `Group.wait()` will still wait for it to return. +`Pool.close()` and `every()` also wait for running non-cooperative jobs to return. +`with_timeout()` returns when the timeout expires, but the ignored job may keep +running until it finishes naturally. + +## Group + +`Group` coordinates related jobs. + +```v ignore +parent := context.background() +mut group := async.new_group(parent) + +group.go(fn (mut ctx context.Context) ! { + serve_http(mut ctx)! +})! + +group.go(fn (mut ctx context.Context) ! { + cleanup_loop(mut ctx)! +})! + +group.wait()! +``` + +Guarantees: + +- `go()` accepts a job only before `wait()` starts. +- `wait()` blocks until every accepted job has finished. +- `wait()` is safe when no job was accepted. +- the first job error is stored once and returned by `wait()`; +- the first job error cancels the shared context so cooperative sibling jobs can + stop early; +- calling `go()` after `wait()` starts returns an error instead of racing + `sync.WaitGroup.add()` against `sync.WaitGroup.wait()`. + +`wait()` is a one-shot operation. Calling it a second time returns an error. This +keeps the lifecycle simple: create a group, submit jobs, wait once, then discard +the group. + +## Task + +`Task[T]` represents one concurrent computation that returns either a value or +an error. + +```v ignore +mut task := async.run[string](fn (mut ctx context.Context) !string { + _ = ctx + return load_config()! +})! + +config := task.wait()! +``` + +Use `run_with_context()` when the task should follow an existing parent context: + +```v ignore +parent := context.background() +mut task := async.run_with_context[int](parent, fn (mut ctx context.Context) !int { + done := ctx.done() + select { + _ := <-done { + return ctx.err() + } + 50 * time.millisecond { + return 42 + } + } +})! + +value := task.wait()! +``` + +Guarantees: + +- `run()` and `run_with_context()` reject a nil task function. +- `wait()` blocks until the task publishes its single result. +- `wait()` is one-shot; calling it a second time returns an error. +- the result channel is bounded with capacity 1, so a finished task can publish + its value or error before the owner calls `wait()`. +- parent cancellation is cooperative; the task function must observe + `ctx.done()` and return. + +`Task[T]` does not expose a kill operation. A task that ignores cancellation may +continue until its function returns naturally. + +## Pool + +`Pool` limits how many jobs run concurrently and how many pending jobs can wait +in memory. + +```v ignore +mut pool := async.new_pool(workers: 4, queue_size: 128)! +defer { + pool.close() or {} +} + +pool.try_submit(fn (mut ctx context.Context) ! { + process_message(mut ctx)! +})! +``` + +Use `new_pool_with_context()` when the pool should follow an existing parent +context: + +```v ignore +parent := context.background() +mut pool := async.new_pool_with_context(parent, workers: 2, queue_size: 32)! +``` + +Backpressure is explicit. `try_submit()` never waits for backlog space: + +- if the pool is open and the backlog has capacity, the job is accepted; +- if the backlog is full, it returns `async: pool queue is full`; +- if the pool is closed or already waiting, it returns `async: pool is closed`; +- if the job function is nil, it returns `async: job function is nil`. + +Lifecycle: + +- `workers` and `queue_size` must be positive and are fixed at creation. +- at most `workers` accepted jobs execute user code at the same time. +- at most `workers + queue_size` jobs can be accepted and unfinished at once. +- `close()` stops accepting new jobs, waits for accepted jobs, and returns the + first job error if any. +- `wait()` has the same behavior as `close()`; both are one-shot. +- a job error is stored once and returned by `close()` / `wait()`. +- `Pool` is not fail-fast: a job error does not kill running jobs, does not + cancel already accepted sibling jobs, and does not discard accepted backlog. +- parent cancellation is cooperative; jobs must observe `ctx.done()` and return. + +`Pool` does not kill running jobs. A non-cooperative job can delay `close()` until +it returns naturally. `close()` and `wait()` drain accepted jobs before returning +the first stored error. User code never runs while the pool lifecycle mutex is +held. + +## Periodic Jobs + +`every()` runs a job repeatedly until its context is canceled or the job returns +an error. + +```v ignore +ctx, cancel := async.with_cancel() +defer { + cancel() +} + +async.every(ctx, 5 * time.second, fn (mut ctx context.Context) ! { + cleanup_stale_clients(mut ctx)! +})! +``` + +Guarantees: + +- `interval` must be positive; zero or negative intervals return + `async: interval must be positive`. +- `every()` is blocking and does not start a hidden background loop. +- the first iteration runs after one interval. +- iterations never overlap; a slow iteration delays the next one. +- a job error stops the loop and is returned unchanged. +- context cancellation stops the loop and returns the context error. + +`every()` is not a scheduler and does not expose a ticker handle. If a running +job ignores cancellation, `every()` cannot return until that job returns +naturally. + +## Timeout + +`with_timeout()` runs one job with a background context and a timeout: + +```v ignore +async.with_timeout(2 * time.second, fn (mut ctx context.Context) ! { + done := ctx.done() + select { + _ := <-done { + return ctx.err() + } + 100 * time.millisecond { + return + } + } +})! +``` + +`with_timeout_context()` derives the timeout from an existing parent context: + +```v ignore +parent := context.background() +async.with_timeout_context(parent, 250 * time.millisecond, fn (mut ctx context.Context) ! { + fetch_or_compute(mut ctx)! +})! +``` + +Error behavior: + +- if the job returns first, the job error is returned unchanged; +- if the timeout expires first, the public error is `async: timeout`; +- if the parent context is canceled first, including when the parent's own + deadline expires first, the parent context error is returned; +- a job that observes the local `x.async` timeout by returning + `context deadline exceeded` is normalized to `async: timeout`. + +The result channel used internally is buffered so the spawned job can finish and +publish its result even if the caller has already returned on timeout. + +## Safety And Security Notes + +`x.async` is about control-flow safety, not sandboxing. + +- It does not recover panics from spawned jobs. +- It does not kill work that ignores cancellation. +- It does not validate user input, paths, network data, or files. +- `Task[T].wait()` is one-shot so result ownership is unambiguous. +- `Pool.try_submit()` is non-blocking and returns a stable error when the + accepted-job backlog is full instead of hiding backpressure. +- `Pool.close()` drains accepted jobs before returning and reports the first + job error. +- `every()` is blocking and serial, so periodic iterations cannot overlap. +- It uses `sync.Mutex` to protect mutable lifecycle/result state in `Group`, + `Task[T]`, and `Pool`. +- It keeps `sync.WaitGroup.add()` and `sync.WaitGroup.wait()` separated by a + lifecycle mutex for `Group` and `Pool` where accepted work can race with + shutdown. + +When jobs process untrusted input, the application must still apply the normal +validation and resource limits for that domain. + +## Limits + +This milestone does not include: + +- blocking pool submission; +- ticker objects or detached periodic handles; +- multi-consumer futures or promise chaining; +- panic recovery; +- scheduler changes; +- `goroutines`, `coroutines`, `x.atomics`, or `sync.stdatomic`. + +The current module deliberately stays small. Its job is to structure V's +existing `spawn`, `sync`, channels, `context`, and `time` primitives, not to +replace them with a new runtime. + +Possible future additions, if accepted by the project maintainers and backed by +tests, could still fit the current philosophy: + +- blocking or timeout-based pool submission, built on existing channels; +- detached periodic handles with explicit `stop()` / `wait()` lifecycle; +- helpers that combine `Group`, `Pool`, `Task[T]`, and timeout for server-style + code; +- more examples and integration tests for other V modules that already use + concurrency; +- careful rewrites of existing V modules that need concurrency, if maintainers + decide `x.async` makes their lifecycle, cancellation, or backpressure simpler + and safer without breaking compatibility; +- optional error collection helpers, as long as first-error behavior stays + simple and documented; +- more benchmarks and stress tests that remain bounded and reproducible. + +Other ideas would require a separate design because they move beyond this +module's current scope: + +- a scheduler or event loop; +- async/await syntax or compiler changes; +- green threads or coroutine/goroutine integration; +- preemptive cancellation or killing running jobs; +- panic recovery across spawned work; +- low-level atomic or lock-free rewrites of the public API. + +Those larger features may be useful someday, but they should not be hidden +inside `x.async`. They need separate review so this module can remain minimal, +safe, and easy to reason about. + +## Examples + +Small runnable examples live in `vlib/x/async/examples/`: + +- `basic_group.v`: first error propagation and cooperative sibling cancellation. +- `basic_task.v`: run one value-returning task and consume it with `wait()`. +- `worker_pool.v`: fixed concurrency with explicit `try_submit()` backpressure. +- `periodic.v`: a blocking `every()` loop stopped by context cancellation. +- `timeout.v`: run one cooperative job with a bounded timeout. +- `net_http/`: synthetic `net.http` request/response work through `Pool`. +- `net_websocket/`: in-memory `websocket.Message` processing through `Group`; + this is not an end-to-end websocket server test. +- `mcp/`: in-memory MCP request/response dispatch through `Task[T]`. +- `veb/`: in-memory `veb.Context` response lifecycle through `Task[T]`. + +Each example focuses on one API and uses only local, in-memory work. + +## Tests + +The targeted test suite lives next to the module: + +```sh +v test vlib/x/async +v -prod test vlib/x/async +``` + +For automated validation, prefer the guarded script: + +```sh +sh vlib/x/async/tools/validate.sh +``` + +It runs formatting verification, dev tests, and `-prod` tests serially with a +fresh `VTMP` and `VCACHE` for that run. Do not run two V validation runners +against the same checkout/cache unless each runner has isolated `VTMP`, +`VCACHE`, and output paths. That isolation protects the validation harness from +V build artefact collisions; it is separate from the runtime guarantees of +`x.async`. + +The tests cover successful groups, first-error propagation, empty waits, +rejected submissions after `wait()`, one-shot task waits, task values and +errors, pool worker limits, pool queue backpressure, pool close/wait behavior, +periodic execution, periodic cancellation, non-overlapping periodic iterations, +cooperative cancellation, timeout errors, parent cancellation, nil job rejection, +parent deadline preservation, invalid intervals, zero/already-expired timeouts, +stress cases, and jobs that ignore cancellation. Internal tests also verify +that the derived `AsyncContext` closes `done()` and propagates parent +cancellation. + +Module-oriented integration tests live in `vlib/x/async/tests/`. They are +synthetic and local: no external service, fixed port, path dependency, or fragile +server shutdown is required. They currently cover `net.http`, `net.websocket`, +`mcp`, and `veb` boundaries. + +## Benchmarks + +Small local benchmarks live in `vlib/x/async/benchmarks/`: + +```sh +sh vlib/x/async/benchmarks/run_async_benchmark.sh +``` + +The script uses the local `./v`, runs serially, and isolates `VTMP`, `VCACHE`, +and the benchmark executable output. It measures short default runs for +`Group`, `Task[T]`, `Pool`, `with_timeout()`, and `every()`. + +The default sizes are intentionally modest. They can be changed with +`XASYNC_BENCH_GROUP_ROUNDS`, `XASYNC_BENCH_GROUP_JOBS`, +`XASYNC_BENCH_TASK_ROUNDS`, `XASYNC_BENCH_POOL_JOBS`, +`XASYNC_BENCH_POOL_WORKERS`, `XASYNC_BENCH_TIMEOUT_ROUNDS`, +`XASYNC_BENCH_EVERY_ITERATIONS`, and `XASYNC_BENCH_EVERY_INTERVAL_MS`. +Benchmark output is local diagnostic data, not a portable performance claim. diff --git a/vlib/x/async/async.v b/vlib/x/async/async.v new file mode 100644 index 000000000..904d30e1d --- /dev/null +++ b/vlib/x/async/async.v @@ -0,0 +1,35 @@ +module async + +import context + +// JobFn is the function signature run by Group and timeout helpers. +// +// The passed context is canceled when the parent context is canceled, when a +// group task fails, or when a timeout expires. Cancellation is cooperative: +// jobs should observe `ctx.done()` and return. +pub type JobFn = fn (mut context.Context) ! + +// TaskFn is the function signature run by Task. +// +// It mirrors JobFn but returns a value. Like all x.async work, the function +// receives a context and must observe `ctx.done()` for cooperative cancellation. +pub type TaskFn[T] = fn (mut context.Context) !T + +// background returns a root context for async helpers. +// +// It is intentionally just a thin wrapper around `context.background()` so code +// can start with `async.background()` and later pass the same context to the +// standard `context` APIs without conversion. +pub fn background() context.Context { + return context.background() +} + +// with_cancel returns a cancellable context derived from background. +// +// Call the returned cancel function when the surrounding operation is done, even +// when all jobs completed successfully. That mirrors `context.with_cancel()` and +// releases parent/child cancellation references promptly. +pub fn with_cancel() (context.Context, context.CancelFn) { + ctx, cancel := new_cancel_context(context.background()) + return context.Context(ctx), cancel +} diff --git a/vlib/x/async/benchmarks/README.md b/vlib/x/async/benchmarks/README.md new file mode 100644 index 000000000..8a8507688 --- /dev/null +++ b/vlib/x/async/benchmarks/README.md @@ -0,0 +1,40 @@ +# x.async benchmarks + +This folder contains cautious local benchmarks for the public `x.async` API. +They are observation tools, not correctness tests and not portable performance +claims. + +## Available benchmark + +- `async_benchmark.v`: measures short default runs for `Group`, `Task[T]`, + `Pool`, `with_timeout()`, and `every()`. +- `run_async_benchmark.sh`: builds and runs the benchmark with the local `./v`, + isolated `VTMP`/`VCACHE`, and a temporary output binary. + +## Run + +From the repository root: + +```sh +sh vlib/x/async/benchmarks/run_async_benchmark.sh +``` + +The script is serialized by design. Do not run multiple V benchmark or test +runners against the same checkout/cache without isolating `VTMP`, `VCACHE`, and +output paths. + +## Parameters + +The defaults are intentionally small. Tune them locally with: + +- `XASYNC_BENCH_GROUP_ROUNDS` +- `XASYNC_BENCH_GROUP_JOBS` +- `XASYNC_BENCH_TASK_ROUNDS` +- `XASYNC_BENCH_POOL_JOBS` +- `XASYNC_BENCH_POOL_WORKERS` +- `XASYNC_BENCH_TIMEOUT_ROUNDS` +- `XASYNC_BENCH_EVERY_ITERATIONS` +- `XASYNC_BENCH_EVERY_INTERVAL_MS` + +Do not commit machine-specific benchmark results as permanent truth. Tests +remain the authority for functional and concurrency-safety validation. diff --git a/vlib/x/async/benchmarks/async_benchmark.v b/vlib/x/async/benchmarks/async_benchmark.v new file mode 100644 index 000000000..6d1ba991d --- /dev/null +++ b/vlib/x/async/benchmarks/async_benchmark.v @@ -0,0 +1,173 @@ +import benchmark +import context +import os +import time +import x.async as xasync + +const default_group_rounds = 8 +const default_group_jobs = 16 +const default_task_rounds = 32 +const default_pool_jobs = 64 +const default_pool_workers = 4 +const default_timeout_rounds = 32 +const default_every_iterations = 5 +const default_every_interval_ms = 1 + +fn main() { + run_benchmarks() or { + eprintln('x.async benchmark failed: ${err.msg()}') + exit(1) + } +} + +fn run_benchmarks() ! { + group_rounds := env_int('XASYNC_BENCH_GROUP_ROUNDS', default_group_rounds, 1, 200) + group_jobs := env_int('XASYNC_BENCH_GROUP_JOBS', default_group_jobs, 1, 512) + task_rounds := env_int('XASYNC_BENCH_TASK_ROUNDS', default_task_rounds, 1, 500) + pool_jobs := env_int('XASYNC_BENCH_POOL_JOBS', default_pool_jobs, 1, 1000) + pool_workers := env_int('XASYNC_BENCH_POOL_WORKERS', default_pool_workers, 1, 64) + timeout_rounds := env_int('XASYNC_BENCH_TIMEOUT_ROUNDS', default_timeout_rounds, 1, 500) + every_iterations := env_int('XASYNC_BENCH_EVERY_ITERATIONS', default_every_iterations, 1, 100) + every_interval_ms := + env_int('XASYNC_BENCH_EVERY_INTERVAL_MS', default_every_interval_ms, 1, 100) + + println('x.async cautious benchmark') + println('Override sizes with XASYNC_BENCH_* environment variables.') + + mut checksum := 0 + mut b := benchmark.start() + checksum += bench_group(group_rounds, group_jobs)! + b.measure('Group: rounds=${group_rounds}, jobs_per_round=${group_jobs}, checksum=${checksum}') + + checksum += bench_task(task_rounds)! + b.measure('Task: rounds=${task_rounds}, checksum=${checksum}') + + checksum += bench_pool(pool_jobs, pool_workers)! + b.measure('Pool: jobs=${pool_jobs}, workers=${pool_workers}, checksum=${checksum}') + + checksum += bench_timeout(timeout_rounds)! + b.measure('with_timeout: rounds=${timeout_rounds}, checksum=${checksum}') + + checksum += bench_every(every_iterations, every_interval_ms)! + b.measure('every: iterations=${every_iterations}, interval_ms=${every_interval_ms}, checksum=${checksum}') +} + +fn env_int(name string, default_value int, min_value int, max_value int) int { + raw := os.getenv(name) + if raw == '' { + return default_value + } + mut value := raw.int() + if value < min_value { + value = min_value + } + if value > max_value { + value = max_value + } + return value +} + +fn bench_group(rounds int, jobs_per_round int) !int { + mut total := 0 + for _ in 0 .. rounds { + done := chan int{cap: jobs_per_round} + mut group := xasync.new_group(context.background()) + for _ in 0 .. jobs_per_round { + group.go(fn [done] (mut ctx context.Context) ! { + _ = ctx + done <- 1 + })! + } + group.wait()! + for _ in 0 .. jobs_per_round { + total += <-done + } + } + return total +} + +fn bench_task(rounds int) !int { + mut total := 0 + for i in 0 .. rounds { + mut task := xasync.run[int](fn [i] (mut ctx context.Context) !int { + _ = ctx + return i + 1 + })! + total += task.wait()! + } + return total +} + +fn bench_pool(jobs int, workers int) !int { + done := chan int{cap: jobs} + mut pool := xasync.new_pool(workers: workers, queue_size: jobs)! + for _ in 0 .. jobs { + pool.try_submit(fn [done] (mut ctx context.Context) ! { + _ = ctx + done <- 1 + })! + } + pool.close()! + mut total := 0 + for _ in 0 .. jobs { + total += <-done + } + return total +} + +fn bench_timeout(rounds int) !int { + mut total := 0 + for _ in 0 .. rounds { + xasync.with_timeout(1 * time.second, fn (mut ctx context.Context) ! { + _ = ctx + })! + total++ + } + return total +} + +fn bench_every(iterations int, interval_ms int) !int { + ctx, cancel := xasync.with_cancel() + ticks := chan int{cap: iterations + 4} + result := chan string{cap: 1} + interval := time.Duration(interval_ms) * time.millisecond + worker := spawn fn [ctx, ticks, result, interval] () { + xasync.every(ctx, interval, fn [ticks] (mut ctx context.Context) ! { + _ = ctx + ticks <- 1 + }) or { + result <- err.msg() + return + } + result <- 'ok' + }() + + mut total := 0 + for _ in 0 .. iterations { + select { + value := <-ticks { + total += value + } + 1 * time.second { + cancel() + worker.wait() + return error('every benchmark did not receive a tick') + } + } + } + cancel() + select { + msg := <-result { + if msg != 'context canceled' { + worker.wait() + return error('unexpected every benchmark result: ${msg}') + } + } + 1 * time.second { + worker.wait() + return error('every benchmark did not stop after cancellation') + } + } + worker.wait() + return total +} diff --git a/vlib/x/async/benchmarks/run_async_benchmark.sh b/vlib/x/async/benchmarks/run_async_benchmark.sh new file mode 100755 index 000000000..6803c5b7b --- /dev/null +++ b/vlib/x/async/benchmarks/run_async_benchmark.sh @@ -0,0 +1,34 @@ +#!/usr/bin/env sh +set -eu + +script_dir=$(CDPATH= cd "$(dirname "$0")" && pwd) +repo_root=$(CDPATH= cd "$script_dir/../../../.." && pwd) +cd "$repo_root" + +if [ ! -x ./v ]; then + echo "x.async benchmarks must be run from a V checkout with local ./v" >&2 + exit 1 +fi + +tmp_root=$(mktemp -d "${TMPDIR:-/tmp}/xasync-benchmark.XXXXXX") +cleanup() { + rm -rf "$tmp_root" +} +trap cleanup EXIT INT TERM + +vtmp="$tmp_root/vtmp" +vcache="$tmp_root/vcache" +out="$tmp_root/out/async_benchmark" +mkdir -p "$vtmp" "$vcache" "$(dirname "$out")" + +echo "Running x.async benchmark with isolated VTMP and VCACHE." +echo "Tune sizes with XASYNC_BENCH_GROUP_ROUNDS, XASYNC_BENCH_GROUP_JOBS," +echo "XASYNC_BENCH_TASK_ROUNDS, XASYNC_BENCH_POOL_JOBS," +echo "XASYNC_BENCH_POOL_WORKERS, XASYNC_BENCH_TIMEOUT_ROUNDS," +echo "XASYNC_BENCH_EVERY_ITERATIONS, and XASYNC_BENCH_EVERY_INTERVAL_MS." + +if command -v timeout >/dev/null 2>&1; then + timeout 180s env VTMP="$vtmp" VCACHE="$vcache" ./v -prod -o "$out" run vlib/x/async/benchmarks/async_benchmark.v +else + env VTMP="$vtmp" VCACHE="$vcache" ./v -prod -o "$out" run vlib/x/async/benchmarks/async_benchmark.v +fi diff --git a/vlib/x/async/context.v b/vlib/x/async/context.v new file mode 100644 index 000000000..472545691 --- /dev/null +++ b/vlib/x/async/context.v @@ -0,0 +1,178 @@ +module async + +import context +import sync +import time + +enum CancelSource { + unknown + parent + timeout + cancel +} + +// AsyncContext is the small `context.Context` implementation owned by x.async. +// +// The public API still accepts and returns the standard `context.Context` +// interface. Internally, x.async needs a derived context whose cancellation is +// simple, idempotent, and non-blocking for Group and timeout lifecycles. +// +// This is not a scheduler or runtime. It is only the cancellation signal shared +// by jobs launched through this module. +@[heap] +struct AsyncContext { +mut: + parent context.Context + done chan int + mutex &sync.Mutex = sync.new_mutex() + err IError = none + cancel_src CancelSource + has_deadline bool + deadline_at time.Time +} + +fn new_cancel_context(parent context.Context) (&AsyncContext, context.CancelFn) { + mut ctx := &AsyncContext{ + parent: parent + done: chan int{cap: 1} + mutex: sync.new_mutex() + } + if !ctx.propagate_existing_parent_error() { + spawn watch_parent_context(mut ctx) + } + cancel_fn := fn [mut ctx] () { + ctx.cancel_with(error(context_canceled), .cancel) + } + return ctx, context.CancelFn(cancel_fn) +} + +fn new_timeout_context(parent context.Context, timeout time.Duration) (&AsyncContext, context.CancelFn) { + mut deadline_at := time.now().add(timeout) + mut deadline_src := CancelSource.timeout + if parent_deadline := parent.deadline() { + if parent_deadline < deadline_at { + deadline_at = parent_deadline + deadline_src = .parent + } + } + mut ctx := &AsyncContext{ + parent: parent + done: chan int{cap: 1} + mutex: sync.new_mutex() + has_deadline: true + deadline_at: deadline_at + } + if !ctx.propagate_existing_parent_error() { + spawn watch_parent_context(mut ctx) + } + if deadline_src == .timeout { + effective_timeout := deadline_at - time.now() + if effective_timeout.nanoseconds() <= 0 { + ctx.cancel_with(error(context_deadline_exceeded), .timeout) + } else { + spawn watch_timeout_context(mut ctx, effective_timeout) + } + } + cancel_fn := fn [mut ctx] () { + ctx.cancel_with(error(context_canceled), .cancel) + } + return ctx, context.CancelFn(cancel_fn) +} + +// deadline returns x.async's own deadline when present, otherwise the parent's +// deadline. This preserves deadline metadata for callers that inspect it. +pub fn (ctx &AsyncContext) deadline() ?time.Time { + if ctx.has_deadline { + return ctx.deadline_at + } + return ctx.parent.deadline() +} + +// value delegates to the parent context. x.async does not add request values. +pub fn (ctx &AsyncContext) value(key context.Key) ?context.Any { + return ctx.parent.value(key) +} + +// done returns the cancellation channel shared by jobs using this context. +pub fn (mut ctx AsyncContext) done() chan int { + ctx.mutex.lock() + done := ctx.done + ctx.mutex.unlock() + return done +} + +// err returns the local cancellation reason, or the parent's reason if the +// parent is already canceled before propagation reaches this context. +pub fn (mut ctx AsyncContext) err() IError { + ctx.mutex.lock() + err := ctx.err + ctx.mutex.unlock() + if err !is none { + return err + } + return ctx.parent.err() +} + +fn (ctx &AsyncContext) was_canceled_by_timeout() bool { + ctx.mutex.lock() + cancel_src := ctx.cancel_src + ctx.mutex.unlock() + return cancel_src == .timeout +} + +fn (mut ctx AsyncContext) cancel_with(err IError, cancel_src CancelSource) { + if err is none { + return + } + ctx.mutex.lock() + if ctx.err !is none { + ctx.mutex.unlock() + return + } + ctx.err = err + ctx.cancel_src = cancel_src + if !ctx.done.closed { + ctx.done <- 0 + ctx.done.close() + } + ctx.mutex.unlock() +} + +fn (mut ctx AsyncContext) propagate_existing_parent_error() bool { + mut parent := ctx.parent + err := parent.err() + if err !is none { + ctx.cancel_with(err, .parent) + return true + } + return false +} + +fn watch_parent_context(mut ctx AsyncContext) { + mut parent := ctx.parent + parent_done := parent.done() + local_done := ctx.done() + select { + _ := <-local_done { + return + } + _ := <-parent_done { + err := parent.err() + if err !is none { + ctx.cancel_with(err, .parent) + } + } + } +} + +fn watch_timeout_context(mut ctx AsyncContext, timeout time.Duration) { + local_done := ctx.done() + select { + _ := <-local_done { + return + } + timeout { + ctx.cancel_with(error(context_deadline_exceeded), .timeout) + } + } +} diff --git a/vlib/x/async/context_internal_test.v b/vlib/x/async/context_internal_test.v new file mode 100644 index 000000000..2fb626d7a --- /dev/null +++ b/vlib/x/async/context_internal_test.v @@ -0,0 +1,54 @@ +module async + +import context +import time + +fn test_async_context_cancel_closes_done_and_sets_error() { + mut ctx, cancel := new_cancel_context(context.background()) + cancel() + done := ctx.done() + select { + _ := <-done { + assert ctx.err().msg() == context_canceled + } + 1 * time.second { + assert false, 'cancel did not close AsyncContext.done()' + } + } + cancel() + assert ctx.err().msg() == context_canceled +} + +fn test_async_context_timeout_closes_done_and_sets_deadline_error() { + mut ctx, cancel := new_timeout_context(context.background(), 20 * time.millisecond) + defer { + cancel() + } + done := ctx.done() + select { + _ := <-done { + assert ctx.err().msg() == context_deadline_exceeded + } + 1 * time.second { + assert false, 'timeout did not close AsyncContext.done()' + } + } +} + +fn test_async_context_parent_cancel_propagates_to_child() { + parent, parent_cancel := new_cancel_context(context.background()) + mut child, child_cancel := new_cancel_context(context.Context(parent)) + defer { + child_cancel() + } + parent_cancel() + done := child.done() + select { + _ := <-done { + assert child.err().msg() == context_canceled + } + 1 * time.second { + assert false, 'parent cancellation did not reach child AsyncContext' + } + } +} diff --git a/vlib/x/async/errors.v b/vlib/x/async/errors.v new file mode 100644 index 000000000..e0428c206 --- /dev/null +++ b/vlib/x/async/errors.v @@ -0,0 +1,22 @@ +module async + +// Error strings are centralized because they form the observable contract for +// callers and tests. Keep them short, stable, and explicit about whether the +// failure came from x.async or from the parent context. +const err_group_go_after_wait = 'async: group does not accept new tasks after wait starts' +const err_group_wait_called = 'async: group wait was already called' +const err_interval_invalid = 'async: interval must be positive' +const err_nil_job = 'async: job function is nil' +const err_pool_closed = 'async: pool is closed' +const err_pool_queue_full = 'async: pool queue is full' +const err_pool_queue_size_invalid = 'async: pool queue size must be positive' +const err_pool_wait_called = 'async: pool wait was already called' +const err_pool_workers_invalid = 'async: pool worker count must be positive' +const err_task_wait_called = 'async: task wait was already called' +const err_timeout = 'async: timeout' + +const context_canceled = 'context canceled' + +// The current context module exposes deadline expiry as an IError with this +// message. x.async normalizes that specific deadline error to `async: timeout`. +const context_deadline_exceeded = 'context deadline exceeded' diff --git a/vlib/x/async/examples/README.md b/vlib/x/async/examples/README.md new file mode 100644 index 000000000..254a80ec1 --- /dev/null +++ b/vlib/x/async/examples/README.md @@ -0,0 +1,51 @@ +# x.async examples + +This folder contains short public examples for the current `x.async` API. +Each file focuses on one idea and uses only local, in-memory work. + +Examples are documentation-first programs. They show how the public API is +expected to be used in small readable scenarios. They are executed by the +validation script to avoid bit rot, but the regression guarantees live in the +`tests/` folder. + +## Rules + +- No external service is required. +- No fixed local path is used. +- No network listener is opened. +- Examples avoid `panic()` as a control-flow pattern. +- Cancellation remains cooperative; examples show jobs checking `ctx.done()` + when they may run for more than a tiny unit of work. + +## Examples + +- `basic_group.v`: first error propagation and cooperative sibling + cancellation. +- `basic_task.v`: one value-returning task consumed with `wait()`. +- `worker_pool.v`: fixed concurrency and explicit `try_submit()` backpressure. +- `periodic.v`: a blocking `every()` loop stopped by context cancellation. +- `timeout.v`: one cooperative job bounded by `with_timeout()`. +- `net_http/`: synthetic `net.http` request/response processing with `Pool`. +- `net_websocket/`: in-memory `websocket.Message` processing. It is not an + end-to-end websocket server example. +- `mcp/`: in-memory MCP request/response dispatch with `Task[T]`. +- `veb/`: synthetic `veb.Context` response lifecycle with `Task[T]`. + +## Run + +From the repository root: + +```sh +./v run vlib/x/async/examples/basic_group.v +./v run vlib/x/async/examples/basic_task.v +./v run vlib/x/async/examples/worker_pool.v +./v run vlib/x/async/examples/periodic.v +./v run vlib/x/async/examples/timeout.v +./v run vlib/x/async/examples/net_http/request_batch.v +./v run vlib/x/async/examples/net_websocket/message_pipeline.v +./v run vlib/x/async/examples/mcp/tool_dispatch.v +./v run vlib/x/async/examples/veb/app_lifecycle.v +``` + +Use `tools/validate.sh` for the guarded module validation path. It isolates V +temporary/cache directories and keeps validation runners serialized. diff --git a/vlib/x/async/examples/basic_group.v b/vlib/x/async/examples/basic_group.v new file mode 100644 index 000000000..c548e2c37 --- /dev/null +++ b/vlib/x/async/examples/basic_group.v @@ -0,0 +1,30 @@ +import context +import time +import x.async as xasync + +fn main() { + mut group := xasync.new_group(context.background()) + + group.go(fn (mut ctx context.Context) ! { + _ = ctx + time.sleep(20 * time.millisecond) + return error('stop group') + })! + + group.go(fn (mut ctx context.Context) ! { + done := ctx.done() + select { + _ := <-done { + return ctx.err() + } + 1 * time.second { + return error('worker did not observe cancellation') + } + } + })! + + group.wait() or { + println('group failed: ${err.msg()}') + return + } +} diff --git a/vlib/x/async/examples/basic_task.v b/vlib/x/async/examples/basic_task.v new file mode 100644 index 000000000..f350d5adc --- /dev/null +++ b/vlib/x/async/examples/basic_task.v @@ -0,0 +1,12 @@ +import context +import x.async as xasync + +fn main() { + mut task := xasync.run[int](fn (mut ctx context.Context) !int { + _ = ctx + return 42 + })! + + value := task.wait()! + println('task result: ${value}') +} diff --git a/vlib/x/async/examples/mcp/README.md b/vlib/x/async/examples/mcp/README.md new file mode 100644 index 000000000..a26504d45 --- /dev/null +++ b/vlib/x/async/examples/mcp/README.md @@ -0,0 +1,18 @@ +# mcp integration example + +This example uses MCP JSON-RPC values in memory and runs the dispatch step in an +`x.async.Task`. It does not start stdio, HTTP, a process transport, or any +external service. + +The goal is to show the integration boundary: + +- build an MCP request value; +- decode and validate it inside a task; +- return an MCP response value; +- consume the result with `Task.wait()`. + +Run from the repository root: + +```sh +./v run vlib/x/async/examples/mcp/tool_dispatch.v +``` diff --git a/vlib/x/async/examples/mcp/tool_dispatch.v b/vlib/x/async/examples/mcp/tool_dispatch.v new file mode 100644 index 000000000..bdd66ae09 --- /dev/null +++ b/vlib/x/async/examples/mcp/tool_dispatch.v @@ -0,0 +1,37 @@ +import context +import mcp +import x.async as xasync + +struct EchoArgs { +pub: + text string +} + +fn dispatch_tool(raw string) !mcp.Response { + req := mcp.decode_request(raw)! + if req.method != 'tools/call' { + return error('unsupported MCP method') + } + args := req.decode_params[EchoArgs]()! + result := mcp.tool_text_result('echo: ${args.text}') + return mcp.new_response(1, result, mcp.ResponseError{}) +} + +fn main() { + request := mcp.new_request(1, 'tools/call', EchoArgs{ + text: 'hello' + }) + mut task := xasync.run[mcp.Response](fn [request] (mut ctx context.Context) !mcp.Response { + done := ctx.done() + select { + _ := <-done { + return ctx.err() + } + else {} + } + return dispatch_tool(request.encode())! + })! + + response := task.wait()! + println(response.encode()) +} diff --git a/vlib/x/async/examples/net_http/README.md b/vlib/x/async/examples/net_http/README.md new file mode 100644 index 000000000..f99be8fae --- /dev/null +++ b/vlib/x/async/examples/net_http/README.md @@ -0,0 +1,21 @@ +# net.http integration example + +This example shows how `x.async` can coordinate local `net.http` request and +response handling without opening a network listener. + +It is intentionally synthetic: requests are constructed in memory, processed by +a small handler, and drained through a `Pool`. This keeps the example stable +while still showing the lifecycle pattern used by HTTP-facing code: + +- construct or receive a `http.Request`; +- submit bounded work to an `x.async.Pool`; +- publish a `http.Response` result; +- close the pool and report the first error if one occurred. + +Run from the repository root: + +```sh +./v run vlib/x/async/examples/net_http/request_batch.v +``` + +No external service, fixed port, file path, or real HTTP server is used. diff --git a/vlib/x/async/examples/net_http/request_batch.v b/vlib/x/async/examples/net_http/request_batch.v new file mode 100644 index 000000000..813cf4ec9 --- /dev/null +++ b/vlib/x/async/examples/net_http/request_batch.v @@ -0,0 +1,47 @@ +import context +import net.http +import x.async as xasync + +fn synthetic_http_handler(req http.Request) !http.Response { + path := if req.url.contains('?') { req.url.all_before('?') } else { req.url } + match path { + '/health' { + return http.new_response(status: .ok, body: 'healthy') + } + '/ready' { + return http.new_response(status: .ok, body: 'ready') + } + else { + return http.new_response(status: .not_found, body: 'missing') + } + } +} + +fn main() { + requests := [ + http.new_request(.get, '/health', ''), + http.new_request(.get, '/ready', ''), + http.new_request(.get, '/missing', ''), + ] + responses := chan string{cap: requests.len} + mut pool := xasync.new_pool(workers: 2, queue_size: requests.len)! + + for req in requests { + pool.try_submit(fn [req, responses] (mut ctx context.Context) ! { + done := ctx.done() + select { + _ := <-done { + return ctx.err() + } + else {} + } + resp := synthetic_http_handler(req)! + responses <- '${req.url} -> ${resp.status_code} ${resp.body}' + })! + } + + pool.close()! + for _ in 0 .. requests.len { + println(<-responses) + } +} diff --git a/vlib/x/async/examples/net_websocket/README.md b/vlib/x/async/examples/net_websocket/README.md new file mode 100644 index 000000000..edff403a3 --- /dev/null +++ b/vlib/x/async/examples/net_websocket/README.md @@ -0,0 +1,19 @@ +# net.websocket integration example + +This example stays in memory around `websocket.Message` processing. It does not +open a websocket server, does not connect a client, and must not be read as an +end-to-end websocket server validation. + +That boundary is deliberate. In this V snapshot, `websocket.Server.close()` is +not a complete, stable server shutdown mechanism suitable for fragile +validation. The public example therefore demonstrates the safe part that +`x.async` can own here: coordinating message/callback-style work and propagating +errors without inventing a websocket runtime. + +Run from the repository root: + +```sh +./v run vlib/x/async/examples/net_websocket/message_pipeline.v +``` + +No external service, fixed port, or websocket server lifecycle is used. diff --git a/vlib/x/async/examples/net_websocket/message_pipeline.v b/vlib/x/async/examples/net_websocket/message_pipeline.v new file mode 100644 index 000000000..3fc6ba601 --- /dev/null +++ b/vlib/x/async/examples/net_websocket/message_pipeline.v @@ -0,0 +1,49 @@ +import context +import net.websocket +import x.async as xasync + +fn describe_message(msg websocket.Message) !string { + return match msg.opcode { + .text_frame { + 'text:${msg.payload.bytestr()}' + } + .ping { + 'ping' + } + else { + error('unsupported websocket message opcode') + } + } +} + +fn main() { + messages := [ + websocket.Message{ + opcode: .text_frame + payload: 'hello'.bytes() + }, + websocket.Message{ + opcode: .ping + }, + ] + processed := chan string{cap: messages.len} + mut group := xasync.new_group(context.background()) + + for msg in messages { + group.go(fn [msg, processed] (mut ctx context.Context) ! { + done := ctx.done() + select { + _ := <-done { + return ctx.err() + } + else {} + } + processed <- describe_message(msg)! + })! + } + + group.wait()! + for _ in 0 .. messages.len { + println(<-processed) + } +} diff --git a/vlib/x/async/examples/periodic.v b/vlib/x/async/examples/periodic.v new file mode 100644 index 000000000..04a8c3a7d --- /dev/null +++ b/vlib/x/async/examples/periodic.v @@ -0,0 +1,49 @@ +import context +import time +import x.async as xasync + +fn main() { + ctx, cancel := xasync.with_cancel() + ticks := chan int{cap: 4} + done := chan string{cap: 1} + + worker := spawn fn [ctx, ticks, done] () { + xasync.every(ctx, 10 * time.millisecond, fn [ticks] (mut ctx context.Context) ! { + _ = ctx + ticks <- 1 + }) or { + done <- err.msg() + return + } + done <- 'completed' + }() + + if !wait_tick(ticks) || !wait_tick(ticks) { + eprintln('periodic job did not tick') + exit(1) + } + cancel() + + select { + msg := <-done { + println('periodic stopped: ${msg}') + } + 1 * time.second { + eprintln('periodic job did not stop') + exit(1) + } + } + worker.wait() +} + +fn wait_tick(ticks chan int) bool { + select { + _ := <-ticks { + return true + } + 1 * time.second { + return false + } + } + return false +} diff --git a/vlib/x/async/examples/timeout.v b/vlib/x/async/examples/timeout.v new file mode 100644 index 000000000..c706af1b5 --- /dev/null +++ b/vlib/x/async/examples/timeout.v @@ -0,0 +1,23 @@ +import context +import time +import x.async as xasync + +fn main() { + xasync.with_timeout(20 * time.millisecond, fn (mut ctx context.Context) ! { + done := ctx.done() + select { + _ := <-done { + return ctx.err() + } + 1 * time.second { + return error('work unexpectedly finished') + } + } + }) or { + println('timeout result: ${err.msg()}') + return + } + + eprintln('expected timeout error') + exit(1) +} diff --git a/vlib/x/async/examples/veb/README.md b/vlib/x/async/examples/veb/README.md new file mode 100644 index 000000000..e446a2f38 --- /dev/null +++ b/vlib/x/async/examples/veb/README.md @@ -0,0 +1,14 @@ +# veb integration example + +This example is a compile-time, in-memory `veb.Context` lifecycle sample. It +does not start a `veb` server and does not bind a port. + +The example is intentionally small because robust server lifecycle validation +belongs to `veb` itself. Here, `x.async` only coordinates a synthetic handler +step and returns the response body through `Task.wait()`. + +Run from the repository root: + +```sh +./v run vlib/x/async/examples/veb/app_lifecycle.v +``` diff --git a/vlib/x/async/examples/veb/app_lifecycle.v b/vlib/x/async/examples/veb/app_lifecycle.v new file mode 100644 index 000000000..195eb9bac --- /dev/null +++ b/vlib/x/async/examples/veb/app_lifecycle.v @@ -0,0 +1,27 @@ +import context +import veb +import x.async as xasync + +fn render_health_response() !string { + mut web_ctx := veb.Context{} + _ := web_ctx.text('veb ok') + if web_ctx.res.status_code != 200 { + return error('unexpected veb status') + } + return web_ctx.res.body +} + +fn main() { + mut task := xasync.run[string](fn (mut ctx context.Context) !string { + done := ctx.done() + select { + _ := <-done { + return ctx.err() + } + else {} + } + return render_health_response()! + })! + + println(task.wait()!) +} diff --git a/vlib/x/async/examples/worker_pool.v b/vlib/x/async/examples/worker_pool.v new file mode 100644 index 000000000..8f9522b99 --- /dev/null +++ b/vlib/x/async/examples/worker_pool.v @@ -0,0 +1,45 @@ +import context +import time +import x.async as xasync + +fn main() { + mut pool := xasync.new_pool(workers: 1, queue_size: 1)! + started := chan int{cap: 2} + release := chan bool{cap: 2} + job := fn [started, release] (mut ctx context.Context) ! { + _ = ctx + started <- 1 + _ := <-release + } + + pool.try_submit(job)! + if !wait_started(started) { + eprintln('pool job did not start') + exit(1) + } + + pool.try_submit(job)! + pool.try_submit(job) or { println('backpressure: ${err.msg()}') } + + release <- true + if !wait_started(started) { + eprintln('pool job did not start') + exit(1) + } + release <- true + + pool.close()! + println('pool drained') +} + +fn wait_started(started chan int) bool { + select { + _ := <-started { + return true + } + 1 * time.second { + return false + } + } + return false +} diff --git a/vlib/x/async/group.v b/vlib/x/async/group.v new file mode 100644 index 000000000..3cba8ab68 --- /dev/null +++ b/vlib/x/async/group.v @@ -0,0 +1,114 @@ +module async + +import context +import sync + +// Group coordinates a set of related concurrent jobs. +// +// Jobs share a derived context. The first job error is returned by wait() and +// cancels the shared context so sibling jobs can stop cooperatively. +@[heap] +pub struct Group { +mut: + ctx context.Context + cancel context.CancelFn = unsafe { nil } + wg &sync.WaitGroup = sync.new_waitgroup() + // Protects both WaitGroup lifecycle state and first_err. WaitGroup.add() + // must not race with WaitGroup.wait(), so go() and wait() share this lock. + mutex &sync.Mutex = sync.new_mutex() + // Stored once. Later job errors never replace the first failure observed by + // the group, even when several jobs fail concurrently after cancellation. + first_err IError = none + // Set before wait() calls wg.wait(). Once true, no further jobs are accepted. + waiting bool +} + +// new_group creates a Group with a shared cancellable context derived from parent. +// +// The parent is accepted by value to keep the public call site simple. The +// derived context is owned by the group and canceled on first job error or when +// wait() completes. +pub fn new_group(parent context.Context) &Group { + ctx, cancel := new_cancel_context(parent) + return &Group{ + ctx: context.Context(ctx) + cancel: cancel + wg: sync.new_waitgroup() + mutex: sync.new_mutex() + } +} + +// go starts f in a new concurrent task. +// +// Calling go after wait has started returns an error. The task should not +// panic; panics in spawned work are not recovered by x.async. +pub fn (mut g Group) go(f JobFn) ! { + if f == unsafe { nil } { + return error(err_nil_job) + } + g.mutex.lock() + if g.waiting { + g.mutex.unlock() + return error(err_group_go_after_wait) + } + // add() happens while holding the same mutex that guards wait(), preventing + // callers from triggering sync.WaitGroup's add-while-waiting misuse panic. + g.wg.add(1) + g.mutex.unlock() + spawn run_group_job(mut g, f) +} + +// wait blocks until all accepted group jobs finish. +// +// It returns the first job error, if any. wait may be called once; after it +// starts, the group no longer accepts new jobs. +pub fn (mut g Group) wait() ! { + g.mutex.lock() + if g.waiting { + g.mutex.unlock() + return error(err_group_wait_called) + } + g.waiting = true + g.mutex.unlock() + + g.wg.wait() + // Always cancel after all jobs finish to release the derived context and to + // make the lifecycle symmetric with context.with_cancel(). + g.cancel() + err := g.get_first_error() + if err !is none { + return err + } +} + +fn run_group_job(mut g Group, f JobFn) { + defer { + g.wg.done() + } + // Each job gets its own local mutable interface value. The underlying + // context is shared and synchronized by the context module. + mut job_ctx := g.ctx + f(mut job_ctx) or { g.set_first_error(err) } +} + +fn (mut g Group) set_first_error(err IError) { + mut should_cancel := false + g.mutex.lock() + if g.first_err is none { + g.first_err = err + should_cancel = true + } + g.mutex.unlock() + if should_cancel { + // Cancel outside the group mutex. context cancellation can notify child + // contexts, so keeping our own lock out of that path avoids lock nesting. + g.cancel() + } +} + +fn (mut g Group) get_first_error() IError { + g.mutex.lock() + err := g.first_err + g.mutex.unlock() + return err +} diff --git a/vlib/x/async/group_test.v b/vlib/x/async/group_test.v new file mode 100644 index 000000000..daf6297bb --- /dev/null +++ b/vlib/x/async/group_test.v @@ -0,0 +1,211 @@ +import context +import time +import x.async as xasync + +fn test_with_cancel_sets_error_and_closes_done() { + mut ctx, cancel := xasync.with_cancel() + cancel() + done := ctx.done() + select { + _ := <-done { + assert ctx.err().msg() == 'context canceled' + } + 1 * time.second { + assert false, 'cancel did not close the context done channel' + } + } +} + +fn test_group_success() { + parent := context.background() + mut group := xasync.new_group(parent) + done := chan int{cap: 2} + group.go(fn [done] (mut ctx context.Context) ! { + _ = ctx + done <- 1 + })! + group.go(fn [done] (mut ctx context.Context) ! { + _ = ctx + done <- 2 + })! + group.wait()! + assert (<-done) + (<-done) == 3 +} + +fn test_group_error_returns_first_error() { + parent := context.background() + mut group := xasync.new_group(parent) + group.go(fn (mut ctx context.Context) ! { + _ = ctx + return error('group failed') + })! + group.wait() or { + assert err.msg() == 'group failed' + return + } + assert false +} + +fn test_group_wait_without_tasks() { + parent := context.background() + mut group := xasync.new_group(parent) + group.wait()! +} + +fn test_group_refuses_go_after_wait() { + parent := context.background() + mut group := xasync.new_group(parent) + group.wait()! + group.go(fn (mut ctx context.Context) ! { + _ = ctx + }) or { + assert err.msg() == 'async: group does not accept new tasks after wait starts' + return + } + assert false +} + +fn test_group_refuses_second_wait() { + parent := context.background() + mut group := xasync.new_group(parent) + group.wait()! + group.wait() or { + assert err.msg() == 'async: group wait was already called' + return + } + assert false +} + +fn test_group_refuses_nil_job() { + parent := context.background() + mut group := xasync.new_group(parent) + nil_job := unsafe { xasync.JobFn(nil) } + group.go(nil_job) or { + assert err.msg() == 'async: job function is nil' + return + } + assert false +} + +fn test_group_cancels_siblings_cooperatively() { + parent := context.background() + mut group := xasync.new_group(parent) + cancelled := chan bool{cap: 1} + group.go(fn (mut ctx context.Context) ! { + _ = ctx + return error('stop siblings') + })! + group.go(fn [cancelled] (mut ctx context.Context) ! { + done := ctx.done() + select { + _ := <-done { + cancelled <- true + } + 1 * time.second { + cancelled <- false + } + } + })! + group.wait() or { assert err.msg() == 'stop siblings' } + assert <-cancelled +} + +fn test_group_first_error_remains_stable_with_concurrent_secondary_errors() { + parent := context.background() + mut group := xasync.new_group(parent) + secondary_ready := chan bool{cap: 8} + for _ in 0 .. 8 { + group.go(fn [secondary_ready] (mut ctx context.Context) ! { + secondary_ready <- true + done := ctx.done() + select { + _ := <-done { + return error('secondary error after cancellation') + } + 1 * time.second { + return error('secondary error timeout') + } + } + })! + } + for _ in 0 .. 8 { + assert <-secondary_ready + } + group.go(fn (mut ctx context.Context) ! { + _ = ctx + return error('primary error') + })! + group.wait() or { + assert err.msg() == 'primary error' + return + } + assert false +} + +fn test_group_many_short_jobs_return_first_error() { + jobs := 64 + mut group := xasync.new_group(context.background()) + done := chan int{cap: jobs} + for i in 0 .. jobs { + group.go(fn [done, i] (mut ctx context.Context) ! { + _ = ctx + done <- i + })! + } + group.go(fn (mut ctx context.Context) ! { + _ = ctx + return error('group stress failure') + })! + group.wait() or { + assert err.msg() == 'group stress failure' + mut seen := []bool{len: jobs} + for _ in 0 .. jobs { + select { + i := <-done { + assert i >= 0 + assert i < jobs + assert !seen[i] + seen[i] = true + } + 1 * time.second { + assert false, 'group short job did not finish' + } + } + } + for was_seen in seen { + assert was_seen + } + return + } + assert false +} + +fn test_group_parent_cancellation_is_observed_by_cooperative_job() { + parent_ctx, cancel := xasync.with_cancel() + mut group := xasync.new_group(parent_ctx) + observed := chan string{cap: 1} + group.go(fn [observed] (mut ctx context.Context) ! { + done := ctx.done() + select { + _ := <-done { + err := ctx.err() + observed <- err.msg() + return err + } + 1 * time.second { + observed <- 'not canceled' + return error('parent cancellation was not observed') + } + } + })! + cancel() + group.wait() or { assert err.msg() == 'context canceled' } + select { + msg := <-observed { + assert msg == 'context canceled' + } + 2 * time.second { + assert false, 'cooperative group job did not observe parent cancellation' + } + } +} diff --git a/vlib/x/async/periodic.v b/vlib/x/async/periodic.v new file mode 100644 index 000000000..560cfce29 --- /dev/null +++ b/vlib/x/async/periodic.v @@ -0,0 +1,68 @@ +module async + +import context +import time + +// every runs f repeatedly with interval spacing until ctx is canceled or f fails. +// +// This helper is intentionally blocking: it does not start a hidden scheduler or +// background loop. The first iteration runs after one interval. Iterations never +// overlap; a slow job delays the next interval. Cancellation is cooperative, so +// a running job must observe `ctx.done()` if it needs to stop before returning. +pub fn every(parent context.Context, interval time.Duration, f JobFn) ! { + if interval.nanoseconds() <= 0 { + return error(err_interval_invalid) + } + if f == unsafe { nil } { + return error(err_nil_job) + } + + mut ctx := parent + initial_err := ctx.err() + if initial_err !is none { + return initial_err + } + + done_ch := ctx.done() + mut watch_done := true + // context.background().done() is closed in V but err() remains none. Treat + // that as a non-cancelable context instead of returning immediately. + select { + _ := <-done_ch { + err := ctx.err() + if err !is none { + return err + } + watch_done = false + } + else {} + } + + for { + if watch_done { + select { + _ := <-done_ch { + err := ctx.err() + if err !is none { + return err + } + watch_done = false + } + interval { + run_periodic_iteration(mut ctx, f)! + } + } + } else { + time.sleep(interval) + run_periodic_iteration(mut ctx, f)! + } + } +} + +fn run_periodic_iteration(mut ctx context.Context, f JobFn) ! { + f(mut ctx)! + err := ctx.err() + if err !is none { + return err + } +} diff --git a/vlib/x/async/periodic_test.v b/vlib/x/async/periodic_test.v new file mode 100644 index 000000000..eaeef786d --- /dev/null +++ b/vlib/x/async/periodic_test.v @@ -0,0 +1,214 @@ +import context +import time +import x.async as xasync + +fn test_every_runs_at_least_one_iteration() { + parent_ctx, cancel := xasync.with_cancel() + ran := chan bool{cap: 1} + result := chan string{cap: 1} + worker := spawn fn [parent_ctx, ran, result] () { + xasync.every(parent_ctx, 5 * time.millisecond, fn [ran] (mut ctx context.Context) ! { + _ = ctx + ran <- true + }) or { + result <- err.msg() + return + } + result <- 'ok' + }() + + select { + did_run := <-ran { + assert did_run + } + 1 * time.second { + assert false, 'periodic job did not run' + } + } + cancel() + select { + msg := <-result { + assert msg == 'context canceled' + } + 1 * time.second { + assert false, 'every did not stop after cancellation' + } + } + worker.wait() +} + +fn test_every_stops_on_cancellation() { + parent_ctx, cancel := xasync.with_cancel() + entered := chan bool{cap: 1} + result := chan string{cap: 1} + worker := spawn fn [parent_ctx, entered, result] () { + entered <- true + xasync.every(parent_ctx, 1 * time.second, fn (mut ctx context.Context) ! { + _ = ctx + }) or { + result <- err.msg() + return + } + result <- 'ok' + }() + + select { + did_enter := <-entered { + assert did_enter + } + 1 * time.second { + assert false, 'every worker did not start' + } + } + cancel() + select { + msg := <-result { + assert msg == 'context canceled' + } + 1 * time.second { + assert false, 'every did not stop on context cancellation' + } + } + worker.wait() +} + +fn test_every_returns_immediately_when_parent_is_already_canceled() { + parent_ctx, cancel := xasync.with_cancel() + cancel() + xasync.every(parent_ctx, 1 * time.second, fn (mut ctx context.Context) ! { + _ = ctx + }) or { + assert err.msg() == 'context canceled' + return + } + assert false +} + +fn test_every_returns_iteration_error() { + parent_ctx, cancel := xasync.with_cancel() + result := chan string{cap: 1} + worker := spawn fn [parent_ctx, result] () { + xasync.every(parent_ctx, 5 * time.millisecond, fn (mut ctx context.Context) ! { + _ = ctx + return error('periodic failed') + }) or { + result <- err.msg() + return + } + result <- 'ok' + }() + + select { + msg := <-result { + assert msg == 'periodic failed' + } + 1 * time.second { + assert false, 'every did not return the periodic job error' + } + } + cancel() + worker.wait() +} + +fn test_every_rejects_zero_interval() { + xasync.every(context.background(), 0 * time.millisecond, fn (mut ctx context.Context) ! { + _ = ctx + }) or { + assert err.msg() == 'async: interval must be positive' + return + } + assert false +} + +fn test_every_rejects_negative_interval() { + xasync.every(context.background(), -1 * time.millisecond, fn (mut ctx context.Context) ! { + _ = ctx + }) or { + assert err.msg() == 'async: interval must be positive' + return + } + assert false +} + +fn test_every_rejects_nil_job() { + nil_job := unsafe { xasync.JobFn(nil) } + xasync.every(context.background(), 1 * time.second, nil_job) or { + assert err.msg() == 'async: job function is nil' + return + } + assert false +} + +fn test_every_does_not_overlap_iterations() { + parent_ctx, cancel := xasync.with_cancel() + active := chan bool{cap: 1} + active <- true + entered := chan bool{cap: 2} + release := chan bool{cap: 2} + overlap := chan bool{cap: 1} + result := chan string{cap: 1} + worker := spawn fn [parent_ctx, active, entered, release, overlap, result] () { + xasync.every(parent_ctx, 5 * time.millisecond, fn [active, entered, release, overlap] (mut ctx context.Context) ! { + select { + _ := <-active {} + else { + overlap <- true + return error('periodic overlap') + } + } + entered <- true + done := ctx.done() + select { + _ := <-release {} + _ := <-done { + active <- true + return ctx.err() + } + } + active <- true + }) or { + result <- err.msg() + return + } + result <- 'ok' + }() + + wait_for_periodic_entry(entered) + select { + _ := <-entered { + assert false, 'periodic iterations overlapped while first job was still running' + } + 50 * time.millisecond {} + } + select { + did_overlap := <-overlap { + assert !did_overlap + } + else {} + } + + release <- true + wait_for_periodic_entry(entered) + cancel() + release <- true + select { + msg := <-result { + assert msg == 'context canceled' + } + 1 * time.second { + assert false, 'every did not stop after non-overlap test cancellation' + } + } + worker.wait() +} + +fn wait_for_periodic_entry(entered chan bool) { + select { + did_enter := <-entered { + assert did_enter + } + 1 * time.second { + assert false, 'periodic job did not enter' + } + } +} diff --git a/vlib/x/async/pool.v b/vlib/x/async/pool.v new file mode 100644 index 000000000..9bed5f09d --- /dev/null +++ b/vlib/x/async/pool.v @@ -0,0 +1,161 @@ +module async + +import context +import sync + +// PoolConfig configures a fixed-size concurrency pool. +// +// Both values must be positive. `workers` is the maximum number of jobs that +// can execute concurrently; `queue_size` is the bounded backlog accepted by +// try_submit() while all worker slots are busy. +@[params] +pub struct PoolConfig { +pub: + workers int + queue_size int +} + +// Pool limits concurrent JobFn execution with fixed worker slots and bounded backlog. +// +// The pool owns one derived context shared by all jobs. Closing the pool stops +// new submissions, waits for every accepted job, and returns the first job error +// if any. +@[heap] +pub struct Pool { +mut: + ctx context.Context + cancel context.CancelFn = unsafe { nil } + tokens chan bool + wg &sync.WaitGroup = sync.new_waitgroup() + max_jobs int + // Protects lifecycle flags, accepted job count, WaitGroup.add(), and + // first_err. This lock is never held while a user JobFn runs. + mutex &sync.Mutex = sync.new_mutex() + first_err IError = none + closed bool + waited bool + accepted int +} + +// new_pool creates a Pool with a background parent context. +pub fn new_pool(config PoolConfig) !&Pool { + return new_pool_with_context(context.background(), config) +} + +// new_pool_with_context creates a Pool with a context derived from parent. +// +// The worker limit and queue size are fixed for the pool lifetime. Parent +// cancellation is cooperative: jobs must observe `ctx.done()` and return. +pub fn new_pool_with_context(parent context.Context, config PoolConfig) !&Pool { + if config.workers <= 0 { + return error(err_pool_workers_invalid) + } + if config.queue_size <= 0 { + return error(err_pool_queue_size_invalid) + } + ctx, cancel := new_cancel_context(parent) + mut pool := &Pool{ + ctx: context.Context(ctx) + cancel: cancel + tokens: chan bool{cap: config.workers} + wg: sync.new_waitgroup() + max_jobs: config.workers + config.queue_size + mutex: sync.new_mutex() + } + for _ in 0 .. config.workers { + pool.tokens <- true + } + return pool +} + +// try_submit accepts f if the pool is open and its bounded backlog has capacity. +// +// It never blocks for queue space. A full backlog returns `async: pool queue is +// full`, making backpressure explicit for callers. +pub fn (mut p Pool) try_submit(f JobFn) ! { + if f == unsafe { nil } { + return error(err_nil_job) + } + p.mutex.lock() + if p.closed { + p.mutex.unlock() + return error(err_pool_closed) + } + if p.accepted >= p.max_jobs { + p.mutex.unlock() + return error(err_pool_queue_full) + } + p.accepted++ + // add() is protected by the same mutex as wait(), so callers cannot race an + // accepted job against pool shutdown. + p.wg.add(1) + p.mutex.unlock() + // The JobFn is passed directly to the spawned wrapper instead of being + // stored in a channel. That keeps closure ownership with V's normal spawn + // path while the token channel below still enforces the fixed worker limit. + spawn run_pool_job(mut p, f) +} + +// wait closes the pool to new submissions, drains accepted jobs, and waits for completion. +// +// wait is one-shot. It returns the first job error observed by any accepted job. +pub fn (mut p Pool) wait() ! { + p.mutex.lock() + if p.waited { + p.mutex.unlock() + return error(err_pool_wait_called) + } + p.waited = true + p.closed = true + p.mutex.unlock() + + p.wg.wait() + p.cancel() + err := p.get_first_error() + if err !is none { + return err + } +} + +// close is an explicit lifecycle alias for wait(). +// +// It rejects later submissions and waits for all accepted jobs before returning. +pub fn (mut p Pool) close() ! { + p.wait()! +} + +fn run_pool_job(mut p Pool, f JobFn) { + defer { + p.finish_accepted_job() + p.wg.done() + } + // The token channel is a bounded semaphore. At most `workers` accepted jobs + // can pass this point and run user code concurrently. + _ := <-p.tokens + defer { + p.tokens <- true + } + mut job_ctx := p.ctx + f(mut job_ctx) or { p.set_first_error(err) } +} + +fn (mut p Pool) finish_accepted_job() { + p.mutex.lock() + p.accepted-- + p.mutex.unlock() +} + +fn (mut p Pool) set_first_error(err IError) { + p.mutex.lock() + if p.first_err is none { + p.first_err = err + } + p.mutex.unlock() +} + +fn (mut p Pool) get_first_error() IError { + p.mutex.lock() + err := p.first_err + p.mutex.unlock() + return err +} diff --git a/vlib/x/async/pool_test.v b/vlib/x/async/pool_test.v new file mode 100644 index 000000000..a9a62cf62 --- /dev/null +++ b/vlib/x/async/pool_test.v @@ -0,0 +1,368 @@ +import context +import time +import x.async as xasync + +fn test_pool_rejects_invalid_config() { + xasync.new_pool(workers: 0, queue_size: 1) or { + assert err.msg() == 'async: pool worker count must be positive' + return + } + assert false +} + +fn test_pool_rejects_invalid_queue_size() { + xasync.new_pool(workers: 1, queue_size: 0) or { + assert err.msg() == 'async: pool queue size must be positive' + return + } + assert false +} + +fn test_pool_respects_worker_count() { + mut pool := xasync.new_pool(workers: 2, queue_size: 4)! + started := chan bool{cap: 4} + release := chan bool{cap: 4} + for _ in 0 .. 4 { + pool.try_submit(fn [started, release] (mut ctx context.Context) ! { + _ = ctx + started <- true + _ := <-release + })! + } + + wait_for_bool_signal(started, 'first pool job did not start') + wait_for_bool_signal(started, 'second pool job did not start') + select { + _ := <-started { + assert false, 'pool started more jobs than worker count before release' + } + 100 * time.millisecond {} + } + + for _ in 0 .. 4 { + release <- true + } + pool.close()! +} + +fn test_pool_queue_full_returns_backpressure_error() { + mut pool := xasync.new_pool(workers: 1, queue_size: 1)! + started := chan bool{cap: 1} + release := chan bool{cap: 2} + blocking_job := fn [started, release] (mut ctx context.Context) ! { + _ = ctx + started <- true + _ := <-release + } + + pool.try_submit(blocking_job)! + wait_for_bool_signal(started, 'blocking pool job did not start') + pool.try_submit(blocking_job)! + pool.try_submit(blocking_job) or { + assert err.msg() == 'async: pool queue is full' + release <- true + release <- true + pool.close()! + return + } + assert false +} + +fn test_pool_submit_after_close_is_refused() { + mut pool := xasync.new_pool(workers: 1, queue_size: 1)! + pool.close()! + pool.try_submit(fn (mut ctx context.Context) ! { + _ = ctx + }) or { + assert err.msg() == 'async: pool is closed' + return + } + assert false +} + +fn test_pool_wait_is_one_shot() { + mut pool := xasync.new_pool(workers: 1, queue_size: 1)! + pool.wait()! + pool.wait() or { + assert err.msg() == 'async: pool wait was already called' + return + } + assert false +} + +fn test_pool_refuses_nil_job() { + mut pool := xasync.new_pool(workers: 1, queue_size: 1)! + nil_job := unsafe { xasync.JobFn(nil) } + pool.try_submit(nil_job) or { + assert err.msg() == 'async: job function is nil' + pool.close()! + return + } + assert false +} + +fn test_pool_close_waits_for_accepted_jobs() { + mut pool := xasync.new_pool(workers: 1, queue_size: 1)! + release := chan bool{cap: 1} + closed := chan bool{cap: 1} + pool.try_submit(fn [release] (mut ctx context.Context) ! { + _ = ctx + _ := <-release + })! + + close_thread := spawn fn [mut pool, closed] () { + pool.close() or { + closed <- false + return + } + closed <- true + }() + select { + _ := <-closed { + assert false, 'pool close returned before accepted job completed' + } + 100 * time.millisecond {} + } + release <- true + select { + ok := <-closed { + assert ok + } + 1 * time.second { + assert false, 'pool close did not return after accepted job completed' + } + } + close_thread.wait() +} + +fn test_pool_first_error_is_propagated() { + mut pool := xasync.new_pool(workers: 1, queue_size: 2)! + started := chan bool{cap: 1} + release := chan bool{cap: 1} + pool.try_submit(fn [started, release] (mut ctx context.Context) ! { + _ = ctx + started <- true + _ := <-release + return error('first pool failure') + })! + wait_for_bool_signal(started, 'first pool failure job did not start') + pool.try_submit(fn (mut ctx context.Context) ! { + _ = ctx + return error('second pool failure') + })! + release <- true + pool.close() or { + assert err.msg() == 'first pool failure' + return + } + assert false +} + +fn test_pool_error_does_not_drop_accepted_jobs() { + mut pool := xasync.new_pool(workers: 1, queue_size: 1)! + accepted_job_ran := chan bool{cap: 1} + pool.try_submit(fn (mut ctx context.Context) ! { + _ = ctx + return error('first pool failure') + })! + pool.try_submit(fn [accepted_job_ran] (mut ctx context.Context) ! { + _ = ctx + accepted_job_ran <- true + })! + pool.close() or { + assert err.msg() == 'first pool failure' + select { + did_run := <-accepted_job_ran { + assert did_run + } + 1 * time.second { + assert false, 'accepted pool job did not run after earlier error' + } + } + return + } + assert false +} + +fn test_pool_concurrent_errors_return_one_error_and_drain_accepted_jobs() { + error_jobs := 4 + ok_jobs := 8 + mut pool := xasync.new_pool(workers: error_jobs, queue_size: ok_jobs)! + started := chan bool{cap: error_jobs} + release := chan bool{cap: error_jobs} + completed_ok := chan bool{cap: ok_jobs} + for i in 0 .. error_jobs { + pool.try_submit(fn [started, release, i] (mut ctx context.Context) ! { + _ = ctx + started <- true + _ := <-release + return error('pool concurrent failure ${i}') + })! + } + for _ in 0 .. ok_jobs { + pool.try_submit(fn [completed_ok] (mut ctx context.Context) ! { + _ = ctx + completed_ok <- true + })! + } + for _ in 0 .. error_jobs { + wait_for_bool_signal(started, 'pool error job did not start') + } + for _ in 0 .. error_jobs { + release <- true + } + pool.close() or { + assert err.msg().starts_with('pool concurrent failure ') + for _ in 0 .. ok_jobs { + wait_for_bool_signal(completed_ok, 'accepted ok pool job did not complete') + } + return + } + assert false +} + +fn test_pool_close_drains_many_accepted_jobs_while_finishing() { + jobs := 12 + workers := 3 + mut pool := xasync.new_pool(workers: workers, queue_size: jobs - workers)! + started := chan bool{cap: jobs} + release := chan bool{cap: jobs} + finished := chan bool{cap: jobs} + closed := chan bool{cap: 1} + for _ in 0 .. jobs { + pool.try_submit(fn [started, release, finished] (mut ctx context.Context) ! { + _ = ctx + started <- true + _ := <-release + finished <- true + })! + } + for _ in 0 .. workers { + wait_for_bool_signal(started, 'initial pool job did not start') + } + close_thread := spawn fn [mut pool, closed] () { + pool.close() or { + closed <- false + return + } + closed <- true + }() + wait_until_pool_rejects_as_closed(mut pool) + assert_no_bool_signal(closed, 'pool close returned while accepted jobs were still blocked') + + for _ in 0 .. jobs - 1 { + release <- true + } + for _ in 0 .. jobs - 1 { + wait_for_bool_signal(finished, 'accepted pool job did not finish') + } + assert_no_bool_signal(closed, 'pool close returned before the last accepted job finished') + + release <- true + wait_for_bool_signal(finished, 'last accepted pool job did not finish') + wait_for_bool_signal(closed, 'pool close did not drain accepted jobs') + close_thread.wait() +} + +fn test_pool_parent_cancellation_is_observed_by_cooperative_job() { + parent_ctx, cancel := xasync.with_cancel() + mut pool := xasync.new_pool_with_context(parent_ctx, workers: 1, queue_size: 1)! + started := chan bool{cap: 1} + pool.try_submit(fn [started] (mut ctx context.Context) ! { + started <- true + done := ctx.done() + select { + _ := <-done { + return ctx.err() + } + 1 * time.second { + return error('pool job did not observe parent cancellation') + } + } + return error('unreachable') + })! + wait_for_bool_signal(started, 'pool job did not start before parent cancellation') + cancel() + pool.close() or { + assert err.msg() == 'context canceled' + return + } + assert false +} + +fn test_pool_non_cooperative_job_finishes_naturally_after_parent_cancel() { + parent_ctx, cancel := xasync.with_cancel() + mut pool := xasync.new_pool_with_context(parent_ctx, workers: 1, queue_size: 1)! + finished := chan bool{cap: 1} + pool.try_submit(fn [finished] (mut ctx context.Context) ! { + _ = ctx + time.sleep(20 * time.millisecond) + finished <- true + })! + cancel() + pool.close()! + assert <-finished +} + +fn test_pool_short_stress_many_jobs() { + jobs := 100 + mut pool := xasync.new_pool(workers: 4, queue_size: jobs)! + done := chan int{cap: jobs} + for i in 0 .. jobs { + pool.try_submit(fn [done, i] (mut ctx context.Context) ! { + _ = ctx + done <- i + })! + } + pool.close()! + + mut seen := []bool{len: jobs} + for _ in 0 .. jobs { + i := <-done + assert i >= 0 + assert i < jobs + assert !seen[i] + seen[i] = true + } + for was_seen in seen { + assert was_seen + } +} + +fn wait_for_bool_signal(signal chan bool, message string) { + select { + ok := <-signal { + assert ok + } + 1 * time.second { + assert false, message + } + } +} + +fn assert_no_bool_signal(signal chan bool, message string) { + select { + _ := <-signal { + assert false, message + } + else {} + } +} + +fn wait_until_pool_rejects_as_closed(mut pool xasync.Pool) { + probe := fn (mut ctx context.Context) ! { + _ = ctx + } + for _ in 0 .. 100 { + pool.try_submit(probe) or { + if err.msg() == 'async: pool is closed' { + return + } + assert err.msg() == 'async: pool queue is full' + time.sleep(1 * time.millisecond) + continue + } + assert false, 'pool accepted probe while backlog should be full' + } + assert false, 'pool close did not start' +} diff --git a/vlib/x/async/task.v b/vlib/x/async/task.v new file mode 100644 index 000000000..55b36b4bf --- /dev/null +++ b/vlib/x/async/task.v @@ -0,0 +1,86 @@ +module async + +import context +import sync + +struct TaskResult[T] { + value T + err IError = none +} + +// Task represents one concurrent computation that produces either a value or an error. +// +// A Task is intentionally small: it starts one `spawn`, stores one result in a +// bounded channel, and lets the owner consume that result with wait(). It does +// not recover panics and it does not kill work that ignores cancellation. +@[heap] +pub struct Task[T] { +mut: + ctx context.Context + cancel context.CancelFn = unsafe { nil } + result_ch chan TaskResult[T] + // Guards the one-shot wait contract. The result channel carries the actual + // value/error, so no additional shared result state is needed. + mutex &sync.Mutex = sync.new_mutex() + waited bool +} + +// run starts f with a background context and returns a Task for its result. +// +// The returned Task owns a derived context that is canceled when f finishes. +pub fn run[T](f TaskFn[T]) !&Task[T] { + return run_with_context[T](context.background(), f) +} + +// run_with_context starts f with a context derived from parent. +// +// Cancellation is cooperative. If parent is canceled, f must observe ctx.done() +// and return. The result channel is buffered so f can publish its single result +// even if the owner has not called wait() yet. +pub fn run_with_context[T](parent context.Context, f TaskFn[T]) !&Task[T] { + if f == unsafe { nil } { + return error(err_nil_job) + } + ctx, cancel := new_cancel_context(parent) + mut task := &Task[T]{ + ctx: context.Context(ctx) + cancel: cancel + result_ch: chan TaskResult[T]{cap: 1} + mutex: sync.new_mutex() + } + spawn fn [T](task &Task[T], f TaskFn[T]) { + mut job_ctx := task.ctx + value := f(mut job_ctx) or { + task.result_ch <- TaskResult[T]{ + err: err + } + task.cancel() + return + } + task.result_ch <- TaskResult[T]{ + value: value + } + task.cancel() + }(task, f) + return task +} + +// wait blocks until the task publishes its result, then returns the value or error. +// +// wait is one-shot. A second call returns a stable error instead of blocking on +// an already-consumed result channel. +pub fn (mut task Task[T]) wait() !T { + task.mutex.lock() + if task.waited { + task.mutex.unlock() + return error(err_task_wait_called) + } + task.waited = true + task.mutex.unlock() + + result := <-task.result_ch + if result.err !is none { + return result.err + } + return result.value +} diff --git a/vlib/x/async/task_test.v b/vlib/x/async/task_test.v new file mode 100644 index 000000000..0425ab829 --- /dev/null +++ b/vlib/x/async/task_test.v @@ -0,0 +1,184 @@ +import context +import time +import x.async as xasync + +fn test_task_returns_value() { + mut task := xasync.run[int](fn (mut ctx context.Context) !int { + _ = ctx + return 42 + })! + assert task.wait()! == 42 +} + +fn test_task_returns_error() { + mut task := xasync.run[int](fn (mut ctx context.Context) !int { + _ = ctx + return error('task failed') + })! + task.wait() or { + assert err.msg() == 'task failed' + return + } + assert false +} + +fn test_task_wait_is_one_shot() { + mut task := xasync.run[string](fn (mut ctx context.Context) !string { + _ = ctx + return 'ok' + })! + assert task.wait()! == 'ok' + task.wait() or { + assert err.msg() == 'async: task wait was already called' + return + } + assert false +} + +fn test_task_concurrent_wait_is_one_shot_without_deadlock() { + release := chan bool{cap: 1} + results := chan string{cap: 2} + mut task := xasync.run[int](fn [release] (mut ctx context.Context) !int { + _ = ctx + _ := <-release + return 7 + })! + + first_waiter := spawn fn [mut task, results] () { + value := task.wait() or { + results <- 'error:${err.msg()}' + return + } + results <- 'value:${value}' + }() + second_waiter := spawn fn [mut task, results] () { + value := task.wait() or { + results <- 'error:${err.msg()}' + return + } + results <- 'value:${value}' + }() + release <- true + + mut saw_value := false + mut saw_second_wait_error := false + for _ in 0 .. 2 { + select { + result := <-results { + match result { + 'value:7' { + saw_value = true + } + 'error:async: task wait was already called' { + saw_second_wait_error = true + } + else { + assert false, 'unexpected concurrent wait result: ${result}' + } + } + } + 1 * time.second { + assert false, 'concurrent task waiters did not both finish' + } + } + } + first_waiter.wait() + second_waiter.wait() + + assert saw_value + assert saw_second_wait_error +} + +fn test_task_parent_cancellation_is_observed() { + parent_ctx, cancel := xasync.with_cancel() + mut task := xasync.run_with_context[int](parent_ctx, fn (mut ctx context.Context) !int { + done := ctx.done() + select { + _ := <-done { + return ctx.err() + } + 1 * time.second { + return error('task did not observe parent cancellation') + } + } + return error('unreachable') + })! + cancel() + task.wait() or { + assert err.msg() == 'context canceled' + return + } + assert false +} + +fn test_task_parent_already_canceled_is_observed() { + parent_ctx, cancel := xasync.with_cancel() + cancel() + mut task := xasync.run_with_context[int](parent_ctx, fn (mut ctx context.Context) !int { + done := ctx.done() + select { + _ := <-done { + return ctx.err() + } + 1 * time.second { + return error('task did not observe already canceled parent') + } + } + return error('unreachable') + })! + task.wait() or { + assert err.msg() == 'context canceled' + return + } + assert false +} + +fn test_task_observes_context_done() { + parent_ctx, cancel := xasync.with_cancel() + observed := chan bool{cap: 1} + mut task := xasync.run_with_context[int](parent_ctx, fn [observed] (mut ctx context.Context) !int { + done := ctx.done() + select { + _ := <-done { + observed <- true + return ctx.err() + } + 1 * time.second { + observed <- false + return error('task did not observe ctx.done()') + } + } + return error('unreachable') + })! + cancel() + task.wait() or { assert err.msg() == 'context canceled' } + select { + did_observe := <-observed { + assert did_observe + } + 1 * time.second { + assert false, 'task did not report cancellation observation' + } + } +} + +fn test_task_ignores_cancellation_but_result_publication_is_buffered() { + parent_ctx, cancel := xasync.with_cancel() + mut task := xasync.run_with_context[int](parent_ctx, fn (mut ctx context.Context) !int { + _ = ctx + time.sleep(20 * time.millisecond) + return 9 + })! + cancel() + time.sleep(60 * time.millisecond) + assert task.wait()! == 9 +} + +fn test_task_refuses_nil_function() { + nil_task_fn := unsafe { xasync.TaskFn[int](nil) } + xasync.run[int](nil_task_fn) or { + assert err.msg() == 'async: job function is nil' + return + } + assert false +} diff --git a/vlib/x/async/tests/README.md b/vlib/x/async/tests/README.md new file mode 100644 index 000000000..e0c66466d --- /dev/null +++ b/vlib/x/async/tests/README.md @@ -0,0 +1,28 @@ +# x.async integration tests + +This folder contains module-oriented integration tests for `x.async`. + +Tests are validation-first programs. Unlike the examples, they are written to +assert edge cases, error propagation, cancellation behavior, backpressure, and +module integration boundaries. They should fail when a regression is introduced. + +The tests are deliberately local and synthetic. They verify that `x.async` +composes cleanly with selected V modules without modifying those modules and +without depending on public network services, fixed ports, filesystem paths, or +fragile server shutdown behavior. + +Current subfolders: + +- `net_http/`: in-memory `net.http` request/response work through `Pool`. +- `net_websocket/`: in-memory `websocket.Message` processing and callback-style + error propagation through `Task` and `Group`. +- `mcp/`: in-memory MCP JSON-RPC request/response dispatch through `Task`. +- `veb/`: in-memory `veb.Context` response lifecycle through `with_timeout()`. + +Run through the guarded validation path: + +```sh +sh vlib/x/async/tools/validate.sh +``` + +The validation script uses isolated `VTMP` and `VCACHE` and runs serially. diff --git a/vlib/x/async/tests/mcp/README.md b/vlib/x/async/tests/mcp/README.md new file mode 100644 index 000000000..c8ed25bb2 --- /dev/null +++ b/vlib/x/async/tests/mcp/README.md @@ -0,0 +1,11 @@ +# mcp integration tests + +These tests keep MCP traffic in memory. They do not start stdio, HTTP, a child +process, or any external MCP service. + +The coverage target is the `x.async` boundary around MCP-shaped work: + +- build an MCP request; +- decode and validate it inside a `Task`; +- produce an MCP response; +- decode the result after `Task.wait()`. diff --git a/vlib/x/async/tests/mcp/mcp_integration_test.v b/vlib/x/async/tests/mcp/mcp_integration_test.v new file mode 100644 index 000000000..e54b53068 --- /dev/null +++ b/vlib/x/async/tests/mcp/mcp_integration_test.v @@ -0,0 +1,28 @@ +import context +import mcp +import x.async as xasync + +struct TestEchoArgs { +pub: + text string +} + +fn test_mcp_task_dispatches_in_memory_request() { + request := mcp.new_request(1, 'tools/call', TestEchoArgs{ + text: 'hello' + }) + mut task := xasync.run[mcp.Response](fn [request] (mut ctx context.Context) !mcp.Response { + _ = ctx + req := mcp.decode_request(request.encode())! + if req.method != 'tools/call' { + return error('unexpected MCP method') + } + args := req.decode_params[TestEchoArgs]()! + result := mcp.tool_text_result(args.text) + return mcp.new_response(1, result, mcp.ResponseError{}) + })! + + response := task.wait()! + result := response.decode_result[mcp.ToolResult]()! + assert result.content.contains('hello') +} diff --git a/vlib/x/async/tests/net_http/README.md b/vlib/x/async/tests/net_http/README.md new file mode 100644 index 000000000..e6443df34 --- /dev/null +++ b/vlib/x/async/tests/net_http/README.md @@ -0,0 +1,11 @@ +# net.http integration tests + +These tests use `net.http` request and response values in memory. They do not +open a listener, bind a port, or call an external service. + +The coverage target is the `x.async` boundary around HTTP-shaped work: + +- bounded pool submission; +- explicit queue-full backpressure; +- draining accepted request handlers through `Pool.close()`; +- preserving response data produced by accepted jobs. diff --git a/vlib/x/async/tests/net_http/net_http_integration_test.v b/vlib/x/async/tests/net_http/net_http_integration_test.v new file mode 100644 index 000000000..a3b339d6b --- /dev/null +++ b/vlib/x/async/tests/net_http/net_http_integration_test.v @@ -0,0 +1,73 @@ +import context +import net.http +import time +import x.async as xasync + +fn test_net_http_pool_handles_synthetic_requests_with_backpressure() { + mut pool := xasync.new_pool(workers: 1, queue_size: 1)! + started := chan bool{cap: 1} + release := chan bool{cap: 2} + responses := chan int{cap: 2} + first_req := http.new_request(.get, '/first', '') + second_req := http.new_request(.get, '/second', '') + third_req := http.new_request(.get, '/third', '') + + pool.try_submit(fn [first_req, started, release, responses] (mut ctx context.Context) ! { + _ = ctx + started <- true + _ := <-release + resp := synthetic_http_response(first_req)! + responses <- resp.status_code + })! + wait_for_http_signal(started, 'first synthetic HTTP job did not start') + + pool.try_submit(fn [second_req, release, responses] (mut ctx context.Context) ! { + _ = ctx + _ := <-release + resp := synthetic_http_response(second_req)! + responses <- resp.status_code + })! + pool.try_submit(fn [third_req] (mut ctx context.Context) ! { + _ = ctx + _ = third_req + }) or { + assert err.msg() == 'async: pool queue is full' + release <- true + release <- true + pool.close()! + assert read_http_status(responses) == 200 + assert read_http_status(responses) == 200 + return + } + assert false +} + +fn synthetic_http_response(req http.Request) !http.Response { + if req.url == '' { + return error('empty synthetic HTTP URL') + } + return http.new_response(status: .ok, body: req.url) +} + +fn wait_for_http_signal(signal chan bool, message string) { + select { + ok := <-signal { + assert ok + } + 1 * time.second { + assert false, message + } + } +} + +fn read_http_status(responses chan int) int { + select { + status := <-responses { + return status + } + 1 * time.second { + assert false, 'synthetic HTTP response was not produced' + } + } + return 0 +} diff --git a/vlib/x/async/tests/net_websocket/README.md b/vlib/x/async/tests/net_websocket/README.md new file mode 100644 index 000000000..12970ed4a --- /dev/null +++ b/vlib/x/async/tests/net_websocket/README.md @@ -0,0 +1,11 @@ +# net.websocket integration tests + +These tests are intentionally in-memory and synthetic. They exercise +`websocket.Message` processing and callback-shaped control flow with `x.async`, +but they do not start a websocket server or claim end-to-end websocket server +coverage. + +That limitation is important: in this V snapshot, `websocket.Server.close()` is +not a complete, stable shutdown primitive for a fragile validation test. The +tests therefore avoid binding ports and focus on the part `x.async` can safely +compose: message work, cancellation, and error propagation around callbacks. diff --git a/vlib/x/async/tests/net_websocket/net_websocket_integration_test.v b/vlib/x/async/tests/net_websocket/net_websocket_integration_test.v new file mode 100644 index 000000000..bc0414827 --- /dev/null +++ b/vlib/x/async/tests/net_websocket/net_websocket_integration_test.v @@ -0,0 +1,44 @@ +import context +import net.websocket +import x.async as xasync + +fn test_net_websocket_task_processes_message_in_memory() { + msg := websocket.Message{ + opcode: .text_frame + payload: 'hello'.bytes() + } + mut task := xasync.run[string](fn [msg] (mut ctx context.Context) !string { + _ = ctx + return websocket_message_text(msg)! + })! + assert task.wait()! == 'hello' +} + +fn test_net_websocket_callback_error_propagates_through_group() { + mut group := xasync.new_group(context.background()) + observed := chan string{cap: 1} + group.go(fn [observed] (mut ctx context.Context) ! { + _ = ctx + msg := websocket.Message{ + opcode: .close + } + text := websocket_message_text(msg) or { + observed <- err.msg() + return err + } + observed <- text + })! + group.wait() or { + assert err.msg() == 'unsupported websocket opcode' + assert <-observed == 'unsupported websocket opcode' + return + } + assert false +} + +fn websocket_message_text(msg websocket.Message) !string { + if msg.opcode != .text_frame { + return error('unsupported websocket opcode') + } + return msg.payload.bytestr() +} diff --git a/vlib/x/async/tests/veb/README.md b/vlib/x/async/tests/veb/README.md new file mode 100644 index 000000000..eb1ed8117 --- /dev/null +++ b/vlib/x/async/tests/veb/README.md @@ -0,0 +1,8 @@ +# veb integration tests + +These tests use `veb.Context` in memory. They do not start a `veb` server, bind +a port, or depend on HTTP client/server timing. + +The coverage target is narrow: `x.async` can bound and propagate errors for a +synthetic veb handler step without becoming part of `veb`'s runtime or server +lifecycle. diff --git a/vlib/x/async/tests/veb/veb_integration_test.v b/vlib/x/async/tests/veb/veb_integration_test.v new file mode 100644 index 000000000..d00e62403 --- /dev/null +++ b/vlib/x/async/tests/veb/veb_integration_test.v @@ -0,0 +1,18 @@ +import context +import time +import veb +import x.async as xasync + +fn test_veb_context_response_inside_timeout() { + xasync.with_timeout(1 * time.second, fn (mut ctx context.Context) ! { + _ = ctx + mut web_ctx := veb.Context{} + _ := web_ctx.text('hello from veb') + if web_ctx.res.status_code != 200 { + return error('unexpected veb status') + } + if web_ctx.res.body != 'hello from veb' { + return error('unexpected veb body') + } + })! +} diff --git a/vlib/x/async/timeout.v b/vlib/x/async/timeout.v new file mode 100644 index 000000000..31ca8d11e --- /dev/null +++ b/vlib/x/async/timeout.v @@ -0,0 +1,79 @@ +module async + +import context +import time + +struct TimeoutResult { + err IError = none +} + +// with_timeout runs f with a background context and returns an error if timeout expires first. +pub fn with_timeout(timeout time.Duration, f JobFn) ! { + with_timeout_context(context.background(), timeout, f)! +} + +// with_timeout_context runs f with a context derived from parent and bounded by timeout. +// +// If timeout expires before f returns, this returns `async: timeout` and +// cancels the derived context. The job must observe the context to stop early; +// x.async does not kill spawned work. +pub fn with_timeout_context(parent context.Context, timeout time.Duration, f JobFn) ! { + if f == unsafe { nil } { + return error(err_nil_job) + } + async_ctx, cancel := new_timeout_context(parent, timeout) + mut ctx := context.Context(async_ctx) + defer { + cancel() + } + initial_err := ctx.err() + if initial_err !is none { + if initial_err.msg() == context_deadline_exceeded && async_ctx.was_canceled_by_timeout() { + return error(err_timeout) + } + return initial_err + } + // The channel is buffered so a non-cooperative job can still publish its + // result later without blocking after the caller has returned on timeout. + result_ch := chan TimeoutResult{cap: 1} + spawn run_timeout_job(ctx, f, result_ch) + done_ch := ctx.done() + select { + result := <-result_ch { + if result.err !is none { + ctx_err := ctx.err() + if ctx_err !is none && ctx_err.msg() == context_deadline_exceeded + && result.err.msg() == context_deadline_exceeded { + if async_ctx.was_canceled_by_timeout() { + return error(err_timeout) + } + } + return result.err + } + return + } + _ := <-done_ch { + err := ctx.err() + if err !is none { + if err.msg() == context_deadline_exceeded && async_ctx.was_canceled_by_timeout() { + return error(err_timeout) + } + return err + } + return error(err_timeout) + } + } +} + +fn run_timeout_job(ctx context.Context, f JobFn, result_ch chan TimeoutResult) { + // Spawned functions cannot receive mutable non-reference arguments, so the + // worker creates the mutable context interface value locally. + mut job_ctx := ctx + f(mut job_ctx) or { + result_ch <- TimeoutResult{ + err: err + } + return + } + result_ch <- TimeoutResult{} +} diff --git a/vlib/x/async/timeout_test.v b/vlib/x/async/timeout_test.v new file mode 100644 index 000000000..0e2d3b8d0 --- /dev/null +++ b/vlib/x/async/timeout_test.v @@ -0,0 +1,295 @@ +import context +import sync +import time +import x.async as xasync + +@[heap] +struct ControlledDeadlineParent { + deadline_at time.Time +mut: + done_ch chan int + mutex &sync.Mutex = sync.new_mutex() + err_value IError = none +} + +fn new_controlled_deadline_parent(deadline_at time.Time) &ControlledDeadlineParent { + return &ControlledDeadlineParent{ + deadline_at: deadline_at + done_ch: chan int{} + mutex: sync.new_mutex() + } +} + +fn (ctx &ControlledDeadlineParent) deadline() ?time.Time { + return ctx.deadline_at +} + +fn (ctx &ControlledDeadlineParent) value(key context.Key) ?context.Any { + _ = key + return none +} + +fn (mut ctx ControlledDeadlineParent) done() chan int { + ctx.mutex.lock() + done := ctx.done_ch + ctx.mutex.unlock() + return done +} + +fn (mut ctx ControlledDeadlineParent) err() IError { + ctx.mutex.lock() + err := ctx.err_value + ctx.mutex.unlock() + return err +} + +fn (mut ctx ControlledDeadlineParent) close_with_error(err IError) { + ctx.mutex.lock() + ctx.err_value = err + if !ctx.done_ch.closed { + ctx.done_ch.close() + } + ctx.mutex.unlock() +} + +fn context_error_or_closed_without_error(mut ctx context.Context) ! { + err := ctx.err() + if err !is none { + return err + } + return error('context closed without error') +} + +fn test_with_timeout_returns_timeout_error() { + xasync.with_timeout(5 * time.millisecond, fn (mut ctx context.Context) ! { + _ = ctx + time.sleep(50 * time.millisecond) + }) or { + assert err.msg() == 'async: timeout' + return + } + assert false +} + +fn test_with_timeout_returns_without_waiting_for_ignored_cancellation() { + stopwatch := time.new_stopwatch() + xasync.with_timeout(10 * time.millisecond, fn (mut ctx context.Context) ! { + _ = ctx + time.sleep(250 * time.millisecond) + }) or { + assert err.msg() == 'async: timeout' + assert stopwatch.elapsed() < 150 * time.millisecond + return + } + assert false +} + +fn test_with_timeout_zero_duration_expires_immediately() { + xasync.with_timeout(0 * time.millisecond, fn (mut ctx context.Context) ! { + done := ctx.done() + select { + _ := <-done { + return context_error_or_closed_without_error(mut ctx) + } + 1 * time.second { + return + } + } + }) or { + assert err.msg() == 'async: timeout' + return + } + assert false +} + +fn test_with_timeout_negative_duration_is_already_expired() { + xasync.with_timeout(-1 * time.millisecond, fn (mut ctx context.Context) ! { + done := ctx.done() + select { + _ := <-done { + return context_error_or_closed_without_error(mut ctx) + } + 1 * time.second { + return + } + } + }) or { + assert err.msg() == 'async: timeout' + return + } + assert false +} + +fn test_with_timeout_success_before_timeout() { + seen := chan int{cap: 1} + xasync.with_timeout(1 * time.second, fn [seen] (mut ctx context.Context) ! { + _ = ctx + seen <- 7 + })! + assert <-seen == 7 +} + +fn test_with_timeout_returns_job_error() { + xasync.with_timeout(1 * time.second, fn (mut ctx context.Context) ! { + _ = ctx + return error('job failed') + }) or { + assert err.msg() == 'job failed' + return + } + assert false +} + +fn test_with_timeout_preserves_job_error_matching_context_message_before_timeout() { + xasync.with_timeout(1 * time.second, fn (mut ctx context.Context) ! { + _ = ctx + return error('context deadline exceeded') + }) or { + assert err.msg() == 'context deadline exceeded' + return + } + assert false +} + +fn test_with_timeout_refuses_nil_job() { + nil_job := unsafe { xasync.JobFn(nil) } + xasync.with_timeout(1 * time.second, nil_job) or { + assert err.msg() == 'async: job function is nil' + return + } + assert false +} + +fn test_with_timeout_returns_timeout_for_cooperative_job() { + mut timed_out := false + xasync.with_timeout(50 * time.millisecond, fn (mut ctx context.Context) ! { + done := ctx.done() + select { + _ := <-done { + return context_error_or_closed_without_error(mut ctx) + } + 1 * time.second { + return error('cooperative timeout job was not canceled') + } + } + }) or { + assert err.msg() == 'async: timeout' + timed_out = true + } + assert timed_out +} + +fn test_with_timeout_context_uses_parent_context() { + mut parent_ctx, cancel := xasync.with_cancel() + cancel() + xasync.with_timeout_context(parent_ctx, 1 * time.second, fn (mut ctx context.Context) ! { + done := ctx.done() + select { + _ := <-done { + return context_error_or_closed_without_error(mut ctx) + } + 1 * time.second {} + } + }) or { + assert err.msg() == 'context canceled' + return + } + assert false +} + +fn test_with_timeout_context_preserves_already_expired_parent_deadline() { + mut background := context.background() + parent_ctx, parent_cancel := context.with_timeout(mut background, -1 * time.millisecond) + defer { + parent_cancel() + } + + xasync.with_timeout_context(parent_ctx, 1 * time.second, fn (mut ctx context.Context) ! { + _ = ctx + return error('job should not run with an already expired parent') + }) or { + assert err.msg() == 'context deadline exceeded' + return + } + assert false +} + +fn test_with_timeout_context_preserves_parent_deadline_that_expires_first() { + mut background := context.background() + parent_ctx, parent_cancel := context.with_timeout(mut background, 10 * time.millisecond) + defer { + parent_cancel() + } + + xasync.with_timeout_context(parent_ctx, 1 * time.second, fn (mut ctx context.Context) ! { + _ = ctx + time.sleep(100 * time.millisecond) + }) or { + assert err.msg() == 'context deadline exceeded' + return + } + assert false +} + +fn test_with_timeout_context_waits_for_controlled_parent_done_and_returns_exact_parent_error() { + mut parent_ctx := new_controlled_deadline_parent(time.now().add(-1 * time.second)) + started := chan bool{cap: 1} + release := chan bool{cap: 1} + result := chan string{cap: 1} + + caller := spawn fn [mut parent_ctx, started, release, result] () { + xasync.with_timeout_context(context.Context(parent_ctx), 1 * time.second, fn [started, release] (mut ctx context.Context) ! { + _ = ctx + started <- true + _ := <-release + }) or { + result <- err.msg() + return + } + result <- 'ok' + }() + + select { + did_start := <-started { + assert did_start + } + 1 * time.second { + assert false, 'with_timeout_context returned before starting the job for a pending parent' + } + } + select { + msg := <-result { + assert false, 'with_timeout_context returned before parent done closed: ${msg}' + } + else {} + } + + parent_ctx.close_with_error(error('controlled parent deadline')) + select { + msg := <-result { + assert msg == 'controlled parent deadline' + } + 1 * time.second { + assert false, 'with_timeout_context did not return after controlled parent closed' + } + } + release <- true + caller.wait() +} + +fn test_with_timeout_context_uses_own_timeout_before_parent_deadline() { + mut background := context.background() + parent_ctx, parent_cancel := context.with_timeout(mut background, 1 * time.second) + defer { + parent_cancel() + } + + xasync.with_timeout_context(parent_ctx, 10 * time.millisecond, fn (mut ctx context.Context) ! { + _ = ctx + time.sleep(100 * time.millisecond) + }) or { + assert err.msg() == 'async: timeout' + return + } + assert false +} diff --git a/vlib/x/async/tools/README.md b/vlib/x/async/tools/README.md new file mode 100644 index 000000000..a9c2c611c --- /dev/null +++ b/vlib/x/async/tools/README.md @@ -0,0 +1,29 @@ +# x.async tools + +This folder contains local helper scripts for safe validation of `x.async`. +Scripts must use repository-relative paths and must not depend on local machine +paths, secrets, or external services. + +## `validate.sh` + +Runs the guarded validation path: + +- `./v fmt -verify` for module, tests, examples, and benchmarks V files. +- each public example under `vlib/x/async/examples/`, run serially. +- `./v test vlib/x/async`. +- `./v -prod test vlib/x/async`. + +The script creates a fresh temporary root, sets isolated `VTMP` and `VCACHE`, +uses the repository-local `./v`, and runs commands serially. This avoids the +known class of V runner artefact/cache collisions that can happen when multiple +external runners share the same checkout/cache. + +From the repository root: + +```sh +sh vlib/x/async/tools/validate.sh +``` + +If a crash appears through this serialized and isolated path, treat it as a +blocking runtime/test signal. Do not classify it as tooling noise without a new +investigation. diff --git a/vlib/x/async/tools/validate.sh b/vlib/x/async/tools/validate.sh new file mode 100755 index 000000000..4f3d86399 --- /dev/null +++ b/vlib/x/async/tools/validate.sh @@ -0,0 +1,46 @@ +#!/usr/bin/env sh +set -eu + +script_dir=$(CDPATH= cd "$(dirname "$0")" && pwd) +repo_root=$(CDPATH= cd "$script_dir/../../../.." && pwd) +cd "$repo_root" + +if [ ! -x ./v ]; then + echo "x.async validation must be run from a V checkout with local ./v" >&2 + exit 1 +fi + +tmp_root=$(mktemp -d "${TMPDIR:-/tmp}/xasync-validate.XXXXXX") +cleanup() { + rm -rf "$tmp_root" +} +trap cleanup EXIT INT TERM + +vtmp="$tmp_root/vtmp" +vcache="$tmp_root/vcache" +mkdir -p "$vtmp" "$vcache" + +run_v() { + seconds=$1 + shift + echo "+ ./v $*" + if command -v timeout >/dev/null 2>&1; then + timeout "$seconds" env VTMP="$vtmp" VCACHE="$vcache" ./v "$@" + else + env VTMP="$vtmp" VCACHE="$vcache" ./v "$@" + fi +} + +# Keep the official x.async validation serial. Running two V runners against the +# same checkout/cache can make build artefacts collide before x.async code runs. +v_files=$(find vlib/x/async -type f -name '*.v' | sort) +example_files=$(find vlib/x/async/examples -type f -name '*.v' | sort) + +run_v 60 fmt -verify $v_files + +for example in $example_files; do + run_v 60 run "$example" +done + +run_v 120 test vlib/x/async +run_v 180 -prod test vlib/x/async -- 2.39.5