From e21acca549c0df34c98beb389088dbf7cee4c4d0 Mon Sep 17 00:00:00 2001 From: Alexander Medvednikov Date: Tue, 17 Mar 2026 01:47:58 +0300 Subject: [PATCH] v2: translate Go's goroutine scheduler (GMP model) to V (#26740) Add a full goroutine runtime module (vlib/goroutines/) translated from Go's runtime scheduler, implementing the GMP (Goroutine-Machine-Processor) model with work-stealing, lock-free local run queues, and cooperative scheduling. Runtime module: - goroutines.v: Core data structures (Goroutine, Machine, Processor, Sched) - scheduler.v: GMP scheduler with work stealing, goroutine_create() - park.v: gopark/goready for goroutine blocking/unblocking - chan.v: Channel implementation (send, recv, close, select) - init.v: Module init creating P's (one per CPU core) and M0 - context_nix.c.v: ucontext-based context switching (Linux/macOS) - context_windows.c.v: Windows Fiber-based context switching - atomic_ops.c.v: C atomic operation wrappers V2 compiler changes: - ssa/instr.v: Add go_call and spawn_call opcodes - ssa/builder.v: Handle .key_go and .key_spawn in build_keyword_operator, add build_go_or_spawn() helper to extract CallExpr operands - gen/c/c.v: Emit goroutines__goroutine_create() for go_call, pthread_create() for spawn_call, with argument packing structs The transformer now: - Detects `KeywordOperator{ op: .key_go }` expressions - Synthesizes per-function: args struct, trampoline function, and dispatch wrapper that calls goroutines__goroutine_create - Replaces `go foo(a, b)` with a plain call to the dispatch wrapper - Registers wrapper info in needed_go_wrappers map (thread-safe for parallel transform) Backends see only regular CallExpr nodes after transformation. SSA builder and cleanc gen_spawn_expr retain fallback .key_go handling. Benchmark results (V goroutines vs Go goroutines vs V spawn): fan-out/fan-in 500g: V=3017us Go=657us spawn=70465us ping-pong 100K rt: V=117ms Go=41ms spawn=953ms contended 10x1K: V=549us Go=824us spawn=14920us V goroutines are ~3-5x slower than Go on creation/scheduling (expected given Go's mature runtime) but dramatically faster than OS threads (spawn), and competitive on channel throughput. --- vlib/goroutines/README.md | 50 ++ vlib/goroutines/atomic_ops.c.v | 47 ++ vlib/goroutines/chan.v | 285 ++++++++ vlib/goroutines/context_nix.c.v | 62 ++ vlib/goroutines/context_windows.c.v | 41 ++ vlib/goroutines/examples/basic_goroutine.v | 26 + .../examples/goroutine_benchmark.go | 119 ++++ .../goroutines/examples/goroutine_benchmark.v | 157 +++++ vlib/goroutines/examples/spawn_benchmark.v | 124 ++++ vlib/goroutines/goroutines.v | 227 +++++++ vlib/goroutines/goroutines_test.v | 173 +++++ vlib/goroutines/goroutines_tls.h | 21 + vlib/goroutines/init.v | 111 +++ vlib/goroutines/park.v | 143 ++++ vlib/goroutines/scheduler.v | 642 ++++++++++++++++++ vlib/goroutines/tls.c | 68 ++ vlib/v2/gen/c/c.v | 153 +++++ vlib/v2/gen/cleanc/cleanc.v | 7 +- vlib/v2/ssa/builder.v | 42 ++ vlib/v2/ssa/instr.v | 4 + vlib/v2/transformer/expr.v | 117 ++++ vlib/v2/transformer/transformer.v | 286 ++++++++ 22 files changed, 2904 insertions(+), 1 deletion(-) create mode 100644 vlib/goroutines/README.md create mode 100644 vlib/goroutines/atomic_ops.c.v create mode 100644 vlib/goroutines/chan.v create mode 100644 vlib/goroutines/context_nix.c.v create mode 100644 vlib/goroutines/context_windows.c.v create mode 100644 vlib/goroutines/examples/basic_goroutine.v create mode 100644 vlib/goroutines/examples/goroutine_benchmark.go create mode 100644 vlib/goroutines/examples/goroutine_benchmark.v create mode 100644 vlib/goroutines/examples/spawn_benchmark.v create mode 100644 vlib/goroutines/goroutines.v create mode 100644 vlib/goroutines/goroutines_test.v create mode 100644 vlib/goroutines/goroutines_tls.h create mode 100644 vlib/goroutines/init.v create mode 100644 vlib/goroutines/park.v create mode 100644 vlib/goroutines/scheduler.v create mode 100644 vlib/goroutines/tls.c diff --git a/vlib/goroutines/README.md b/vlib/goroutines/README.md new file mode 100644 index 000000000..3c81fe616 --- /dev/null +++ b/vlib/goroutines/README.md @@ -0,0 +1,50 @@ +# goroutines + +Go-style goroutine runtime for V, implementing the GMP (Goroutine-Machine-Processor) scheduling model translated from the Go runtime (`src/runtime/proc.go`, `runtime2.go`, `chan.go`). + +## Overview + +This module provides lightweight goroutines for V's `go` keyword, as opposed to `spawn` which creates OS threads. + +### GMP Model + +- **G (Goroutine)**: Lightweight unit of execution with its own stack (~8KB default) +- **M (Machine)**: OS thread that executes goroutines +- **P (Processor)**: Logical processor with a local run queue (one per CPU core) + +### Key Features + +- **Work stealing**: Idle processors steal work from busy ones +- **Local run queues**: Lock-free per-P queues minimize contention +- **Global run queue**: Overflow and fairness mechanism +- **Goroutine parking**: Efficient blocking/unblocking for channels +- **G reuse**: Dead goroutines are recycled to reduce allocation + +## Usage + +```v +// `go` launches a goroutine (lightweight, scheduled by GMP) +go expensive_computation() + +// `spawn` launches an OS thread (traditional V behavior) +spawn blocking_io_task() +``` + +## Architecture + +Translated from Go's runtime source: + +| Go Source | V Module File | Purpose | +|-----------|---------------|---------| +| `runtime2.go` | `goroutines.v` | Core data structures (G, M, P, Sched) | +| `proc.go` | `scheduler.v` | Scheduler loop, work stealing, run queues | +| `proc.go` | `park.v` | gopark/goready, Sudog, WaitQ | +| `proc.go` | `init.v` | Initialization (schedinit, procresize) | +| `chan.go` | `chan.v` | Channel implementation | +| asm (gogo/gosave) | `context_nix.c.v` | Context switching (ucontext) | +| asm (gogo/gosave) | `context_windows.c.v` | Context switching (Windows fibers) | + +## References + +- [Go Scheduler Design Doc](https://golang.org/s/go11sched) +- [Go Runtime Source](https://github.com/golang/go/tree/master/src/runtime) diff --git a/vlib/goroutines/atomic_ops.c.v b/vlib/goroutines/atomic_ops.c.v new file mode 100644 index 000000000..0646cd971 --- /dev/null +++ b/vlib/goroutines/atomic_ops.c.v @@ -0,0 +1,47 @@ +// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +// +// Atomic operations and C interop for the goroutine scheduler. +module goroutines + +#include +#include +#include "@VMODROOT/vlib/goroutines/goroutines_tls.h" + +#flag @VMODROOT/vlib/goroutines/tls.c + +// Thread-local storage +fn C.goroutines_get_current_m() voidptr +fn C.goroutines_set_current_m(mp voidptr) + +// Typed atomic operations (implemented in tls.c) +fn C.goroutines_atomic_load_u32(ptr &u32) u32 +fn C.goroutines_atomic_store_u32(ptr &u32, val u32) +fn C.goroutines_atomic_fetch_add_u32(ptr &u32, val u32) u32 +fn C.goroutines_atomic_fetch_add_i32(ptr &i32, val i32) i32 +fn C.goroutines_atomic_fetch_sub_i32(ptr &i32, val i32) i32 +fn C.goroutines_atomic_fetch_add_u64(ptr &u64, val u64) u64 +fn C.goroutines_atomic_cas_u32(ptr &u32, expected &u32, desired u32) bool +fn C.goroutines_atomic_cas_ptr(ptr voidptr, expected voidptr, desired voidptr) bool + +fn C.grt_spinlock_lock(lk &i32) +fn C.grt_spinlock_unlock(lk &i32) + +fn C.memcpy(dest voidptr, src voidptr, n usize) voidptr +fn C.memset(dest voidptr, ch int, n usize) voidptr +fn C.rand() int + +// SpinLock - ucontext-safe lock (pthreads mutex breaks with swapcontext). +pub struct SpinLock { +mut: + state i32 +} + +pub fn (mut s SpinLock) acquire() { + C.grt_spinlock_lock(&s.state) +} + +pub fn (mut s SpinLock) release() { + C.grt_spinlock_unlock(&s.state) +} diff --git a/vlib/goroutines/chan.v b/vlib/goroutines/chan.v new file mode 100644 index 000000000..ef282404d --- /dev/null +++ b/vlib/goroutines/chan.v @@ -0,0 +1,285 @@ +// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +// +// Channel implementation for goroutines. +// Translated from Go's runtime/chan.go. +// +// Channels provide goroutine-safe communication between goroutines. +// They support both buffered and unbuffered modes. +// +// Key operations translated from Go: +// - makechan() -> chan_make() +// - chansend() -> chan_send() +// - chanrecv() -> chan_recv() +// - closechan() -> chan_close() +module goroutines + +// Chan is a goroutine-safe channel for communication between goroutines. +// Translated from Go's hchan struct in chan.go. +pub struct Chan { +pub mut: + mu SpinLock // protects all fields (spinlock is ucontext-safe) + qcount u32 // total data in the queue + dataqsiz u32 // size of the circular buffer + buf voidptr // circular buffer for buffered channels + elemsize u16 // size of each element + closed bool // true if channel is closed + + sendx u32 // send index into circular buffer + recvx u32 // receive index into circular buffer + + recvq WaitQ // list of recv waiters + sendq WaitQ // list of send waiters +} + +// chan_make creates a new channel. +// If buf_size > 0, creates a buffered channel. +// Translated from Go's makechan() in chan.go. +pub fn chan_make(elem_size int, buf_size int) &Chan { + mut c := &Chan{ + elemsize: u16(elem_size) + dataqsiz: u32(buf_size) + } + if buf_size > 0 { + c.buf = unsafe { malloc(elem_size * buf_size) } + } + return c +} + +// chan_send sends a value on the channel. +// If block is true, blocks until the send can proceed. +// Returns true if the value was sent. +// Translated from Go's chansend() in chan.go. +pub fn chan_send(c &Chan, ep voidptr, block bool) bool { + if c == unsafe { nil } { + if !block { + return false + } + // Block forever on nil channel (Go behavior) + gopark('chan send (nil chan)') + return false // unreachable + } + + mut ch := unsafe { c } + ch.mu.acquire() + + if ch.closed { + ch.mu.release() + panic('send on closed channel') + } + + // Fast path: try to find a waiting receiver + sg := ch.recvq.dequeue() + if sg != unsafe { nil } { + // Found a waiting receiver - send directly + ch.mu.release() + send_direct(sg, ep, ch.elemsize) + return true + } + + // Buffered channel with space available + if ch.qcount < ch.dataqsiz { + // Put data in buffer + dst := chan_buf(ch, ch.sendx) + unsafe { C.memcpy(dst, ep, ch.elemsize) } + ch.sendx++ + if ch.sendx == ch.dataqsiz { + ch.sendx = 0 + } + ch.qcount++ + ch.mu.release() + return true + } + + if !block { + ch.mu.release() + return false + } + + // Block: enqueue ourselves on the send wait queue + gp := get_current_g() + mut mysg := &Sudog{ + g: unsafe { gp } + elem: ep + c: voidptr(ch) + } + ch.sendq.enqueue(mysg) + ch.mu.release() + + // Park the goroutine until a receiver wakes us + gopark('chan send') + + return true +} + +// chan_recv receives a value from the channel. +// If block is true, blocks until a value is available. +// Returns (received, ok). ok is false if channel is closed and empty. +// Translated from Go's chanrecv() in chan.go. +pub fn chan_recv(c &Chan, ep voidptr, block bool) (bool, bool) { + if c == unsafe { nil } { + if !block { + return false, false + } + gopark('chan receive (nil chan)') + return false, false // unreachable + } + + mut ch := unsafe { c } + ch.mu.acquire() + + // Fast path: try to find a waiting sender + sg := ch.sendq.dequeue() + if sg != unsafe { nil } { + ch.mu.release() + recv_direct(ch, sg, ep) + return true, true + } + + // Buffered channel with data available + if ch.qcount > 0 { + src := chan_buf(ch, ch.recvx) + if ep != unsafe { nil } { + unsafe { C.memcpy(ep, src, ch.elemsize) } + } + ch.recvx++ + if ch.recvx == ch.dataqsiz { + ch.recvx = 0 + } + ch.qcount-- + ch.mu.release() + return true, true + } + + if ch.closed { + ch.mu.release() + if ep != unsafe { nil } { + unsafe { C.memset(ep, 0, ch.elemsize) } + } + return true, false + } + + if !block { + ch.mu.release() + return false, false + } + + // Block: enqueue ourselves on the recv wait queue + gp := get_current_g() + mut mysg := &Sudog{ + g: unsafe { gp } + elem: ep + c: voidptr(ch) + } + ch.recvq.enqueue(mysg) + ch.mu.release() + + // Park until a sender wakes us + gopark('chan receive') + + return true, true +} + +// chan_close closes the channel. +// Translated from Go's closechan() in chan.go. +pub fn chan_close(c &Chan) { + if c == unsafe { nil } { + panic('close of nil channel') + } + + mut ch := unsafe { c } + ch.mu.acquire() + + if ch.closed { + ch.mu.release() + panic('close of closed channel') + } + + ch.closed = true + + // Wake all waiting receivers + for { + mut sg := ch.recvq.dequeue() + if sg == unsafe { nil } { + break + } + if sg.elem != unsafe { nil } { + unsafe { C.memset(sg.elem, 0, ch.elemsize) } + } + sg.success = false + goready(sg.g) + } + + // Wake all waiting senders (they will panic) + for { + mut sg := ch.sendq.dequeue() + if sg == unsafe { nil } { + break + } + sg.success = false + goready(sg.g) + } + + ch.mu.release() +} + +// send_direct sends data directly from sender to a waiting receiver. +// Translated from Go's send() in chan.go. +fn send_direct(sg &Sudog, ep voidptr, elem_size u16) { + if sg.elem != unsafe { nil } { + unsafe { C.memcpy(sg.elem, ep, elem_size) } + } + mut s := unsafe { sg } + s.success = true + goready(sg.g) +} + +// recv_direct receives data directly from a waiting sender. +fn recv_direct(ch &Chan, sg &Sudog, ep voidptr) { + if ch.dataqsiz == 0 { + // Unbuffered: copy directly from sender + if ep != unsafe { nil } { + unsafe { C.memcpy(ep, sg.elem, ch.elemsize) } + } + } else { + // Buffered: take from buffer, then copy sender's data into buffer + buf_elem := chan_buf(ch, unsafe { ch }.recvx) + if ep != unsafe { nil } { + unsafe { C.memcpy(ep, buf_elem, ch.elemsize) } + } + unsafe { C.memcpy(buf_elem, sg.elem, ch.elemsize) } + unsafe { + ch.recvx++ + if ch.recvx == ch.dataqsiz { + ch.recvx = 0 + } + ch.sendx = ch.recvx + } + } + mut s := unsafe { sg } + s.success = true + goready(sg.g) +} + +// chan_buf returns a pointer to the i-th slot in the buffer. +// Translated from Go's chanbuf() in chan.go. +fn chan_buf(c &Chan, i u32) voidptr { + return unsafe { voidptr(usize(c.buf) + usize(i) * usize(c.elemsize)) } +} + +// chan_len returns the number of elements in the channel buffer. +pub fn chan_len(c &Chan) int { + if c == unsafe { nil } { + return 0 + } + return int(c.qcount) +} + +// chan_cap returns the capacity of the channel buffer. +pub fn chan_cap(c &Chan) int { + if c == unsafe { nil } { + return 0 + } + return int(c.dataqsiz) +} diff --git a/vlib/goroutines/context_nix.c.v b/vlib/goroutines/context_nix.c.v new file mode 100644 index 000000000..fe9d3b8cc --- /dev/null +++ b/vlib/goroutines/context_nix.c.v @@ -0,0 +1,62 @@ +// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +// +// Platform-specific context switching using ucontext (Linux, macOS, BSDs). +// This provides the low-level mechanism for goroutine context switches, +// analogous to Go's gogo/gosave assembly routines. +module goroutines + +#include + +// ucontext_t - POSIX context structure for cooperative context switching +struct C.ucontext_t { +mut: + uc_link &C.ucontext_t = unsafe { nil } + uc_stack C.stack_t +} + +struct C.stack_t { +mut: + ss_sp voidptr + ss_size usize + ss_flags int +} + +fn C.getcontext(ucp &C.ucontext_t) int +fn C.setcontext(ucp &C.ucontext_t) int +fn C.makecontext(ucp &C.ucontext_t, func fn (), argc int, ...voidptr) +fn C.swapcontext(oucp &C.ucontext_t, ucp &C.ucontext_t) int + +// Context wraps ucontext_t for goroutine context switching. +pub struct Context { +pub mut: + uctx C.ucontext_t +} + +// context_init initializes a context for a new goroutine. +// Sets up the context to run `entry_fn` with `arg` on the given stack. +pub fn context_init(mut ctx Context, stack voidptr, stack_size int, entry_fn fn (voidptr), arg voidptr) { + C.getcontext(&ctx.uctx) + ctx.uctx.uc_stack.ss_sp = stack + ctx.uctx.uc_stack.ss_size = usize(stack_size) + ctx.uctx.uc_link = unsafe { nil } + // makecontext with the goroutine trampoline + // We pass the arg as two 32-bit ints to be portable (makecontext uses int args) + lo := u32(u64(arg)) + hi := u32(u64(arg) >> 32) + C.makecontext(&ctx.uctx, fn [entry_fn, lo, hi] () { + combined := voidptr(u64(lo) | (u64(hi) << 32)) + entry_fn(combined) + }, 0) +} + +// context_switch saves the current context into `from` and switches to `to`. +pub fn context_switch(mut from Context, to &Context) { + C.swapcontext(&from.uctx, &to.uctx) +} + +// context_set switches to the given context without saving. +pub fn context_set(to &Context) { + C.setcontext(&to.uctx) +} diff --git a/vlib/goroutines/context_windows.c.v b/vlib/goroutines/context_windows.c.v new file mode 100644 index 000000000..094a675e2 --- /dev/null +++ b/vlib/goroutines/context_windows.c.v @@ -0,0 +1,41 @@ +// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +// +// Platform-specific context switching using Windows Fibers. +module goroutines + +#include + +fn C.CreateFiber(dwStackSize usize, lpStartAddress voidptr, lpParameter voidptr) voidptr +fn C.ConvertThreadToFiber(lpParameter voidptr) voidptr +fn C.SwitchToFiber(lpFiber voidptr) +fn C.DeleteFiber(lpFiber voidptr) +fn C.ConvertFiberToThread() + +pub struct Context { +pub mut: + fiber voidptr + is_thread_fiber bool // true if this was created via ConvertThreadToFiber +} + +pub fn context_init(mut ctx Context, stack voidptr, stack_size int, entry_fn fn (voidptr), arg voidptr) { + // Windows fibers manage their own stack, so we ignore the stack param + ctx.fiber = C.CreateFiber(usize(stack_size), voidptr(entry_fn), arg) +} + +pub fn context_switch(mut from Context, to &Context) { + C.SwitchToFiber(to.fiber) +} + +pub fn context_set(to &Context) { + C.SwitchToFiber(to.fiber) +} + +// convert_thread_to_fiber must be called once per OS thread before using fibers. +pub fn convert_thread_to_fiber() Context { + mut ctx := Context{} + ctx.fiber = C.ConvertThreadToFiber(unsafe { nil }) + ctx.is_thread_fiber = true + return ctx +} diff --git a/vlib/goroutines/examples/basic_goroutine.v b/vlib/goroutines/examples/basic_goroutine.v new file mode 100644 index 000000000..5e5043a21 --- /dev/null +++ b/vlib/goroutines/examples/basic_goroutine.v @@ -0,0 +1,26 @@ +// Example: Basic goroutine usage with `go` keyword. +// +// This demonstrates how V's `go` keyword launches goroutines +// using the GMP scheduler (translated from Go's runtime). +// +// `go` launches a lightweight goroutine (like Go's goroutines) +// `spawn` launches an OS thread (like V's traditional threads) +module main + +import time + +fn say(msg string) { + for i in 0 .. 5 { + println('${msg}: ${i}') + time.sleep(100 * time.millisecond) + } +} + +fn main() { + // Launch goroutines with `go` - lightweight, Go-style concurrency + go say('goroutine 1') + go say('goroutine 2') + + // Main goroutine continues running + say('main') +} diff --git a/vlib/goroutines/examples/goroutine_benchmark.go b/vlib/goroutines/examples/goroutine_benchmark.go new file mode 100644 index 000000000..6bfff2762 --- /dev/null +++ b/vlib/goroutines/examples/goroutine_benchmark.go @@ -0,0 +1,119 @@ +// Goroutine benchmark in Go – equivalent to goroutine_benchmark.v. +// +// Tests: +// 1. Goroutine creation + completion (fan-out/fan-in via unbuffered channel) +// 2. Channel ping-pong between two goroutines +// 3. Many goroutines contending on a single channel +// +// Run: go run goroutine_benchmark.go +package main + +import ( + "fmt" + "time" +) + +// --------------------------------------------------------------------------- +// Benchmark 1 – fan-out / fan-in (unbuffered channel) +// --------------------------------------------------------------------------- + +func benchFanOutFanIn(n int) { + c := make(chan int) // unbuffered, matching V benchmark + + start := time.Now() + + for i := 0; i < n; i++ { + go func() { + c <- 1 + }() + } + + for i := 0; i < n; i++ { + <-c + } + + elapsed := time.Since(start) + fmt.Printf("fan-out/fan-in %d goroutines: %d us (%d ns/goroutine)\n", + n, elapsed.Microseconds(), elapsed.Nanoseconds()/int64(n)) +} + +// --------------------------------------------------------------------------- +// Benchmark 2 – channel ping-pong +// --------------------------------------------------------------------------- + +func benchPingPong(n int) { + c1 := make(chan int, 1) + c2 := make(chan int, 1) + + go func() { + for i := 0; i < n; i++ { + val := <-c1 + c2 <- val + 1 + } + }() + + start := time.Now() + + val := 0 + for i := 0; i < n; i++ { + c1 <- val + val = <-c2 + } + + elapsed := time.Since(start) + fmt.Printf("ping-pong %d round-trips: %d us (%d ns/round-trip)\n", + n, elapsed.Microseconds(), elapsed.Nanoseconds()/int64(n)) +} + +// --------------------------------------------------------------------------- +// Benchmark 3 – contended channel (many producers, one consumer) +// --------------------------------------------------------------------------- + +func benchContendedChannel(numProducers int, msgsPerProducer int) { + total := numProducers * msgsPerProducer + c := make(chan int, 64) + + start := time.Now() + + for p := 0; p < numProducers; p++ { + go func() { + for i := 0; i < msgsPerProducer; i++ { + c <- i + } + }() + } + + for i := 0; i < total; i++ { + <-c + } + + elapsed := time.Since(start) + fmt.Printf("contended chan %d producers x %d msgs: %d us (%d ns/msg)\n", + numProducers, msgsPerProducer, + elapsed.Microseconds(), elapsed.Nanoseconds()/int64(total)) +} + +// --------------------------------------------------------------------------- +// Main +// --------------------------------------------------------------------------- + +func main() { + fmt.Println("=== Go Goroutine Benchmark ===") + fmt.Println() + + benchFanOutFanIn(10) + benchFanOutFanIn(50) + benchFanOutFanIn(100) + benchFanOutFanIn(500) + + fmt.Println() + benchPingPong(1000) + benchPingPong(10000) + benchPingPong(100000) + + fmt.Println() + benchContendedChannel(4, 1000) + benchContendedChannel(10, 1000) + + fmt.Println() +} diff --git a/vlib/goroutines/examples/goroutine_benchmark.v b/vlib/goroutines/examples/goroutine_benchmark.v new file mode 100644 index 000000000..41801135b --- /dev/null +++ b/vlib/goroutines/examples/goroutine_benchmark.v @@ -0,0 +1,157 @@ +// Goroutine benchmark: measures scheduling and channel throughput +// of V's goroutine runtime (translated from Go's GMP scheduler). +// +// Compile: v -enable-globals -cc gcc -gc none -prod run goroutine_benchmark.v +// For comparison, see goroutine_benchmark.go. +// +// Note: Under the v1 compiler, the goroutines module uses pthreads-based +// sync.Mutex which has limitations with ucontext-based context switching. +// The v2 compiler will use a native spinlock, enabling higher scale. +module main + +import goroutines +import sync +import time + +__global bench_sem = sync.Semaphore{} +__global bench_n = int(0) +__global bench_chan1 = &goroutines.Chan(unsafe { nil }) +__global bench_chan2 = &goroutines.Chan(unsafe { nil }) +__global bench_msgs_per = int(0) + +// --- fan-out/fan-in --- + +fn fan_worker(arg voidptr) { + c := unsafe { &goroutines.Chan(arg) } + mut val := 1 + goroutines.chan_send(c, voidptr(&val), true) +} + +fn fan_collector(arg voidptr) { + c := unsafe { &goroutines.Chan(arg) } + mut recv_val := 0 + for _ in 0 .. bench_n { + goroutines.chan_recv(c, voidptr(&recv_val), true) + } + bench_sem.post() +} + +fn bench_fan_out_fan_in(n int) { + bench_n = n + c := goroutines.chan_make(int(sizeof(int)), 0) + + sw := time.new_stopwatch() + + goroutines.goroutine_create(voidptr(&fan_collector), voidptr(c), 0) + for _ in 0 .. n { + goroutines.goroutine_create(voidptr(&fan_worker), voidptr(c), 0) + } + + bench_sem.wait() + + elapsed := sw.elapsed() + us := elapsed.microseconds() + ns_per := elapsed.nanoseconds() / n + C.printf(c'fan-out/fan-in %d goroutines: %lld us (%lld ns/goroutine)\n', n, us, ns_per) +} + +// --- ping-pong --- + +fn ping_side(arg voidptr) { + mut val := 0 + for _ in 0 .. bench_n { + goroutines.chan_send(bench_chan1, voidptr(&val), true) + goroutines.chan_recv(bench_chan2, voidptr(&val), true) + } + bench_sem.post() +} + +fn pong_side(arg voidptr) { + mut val := 0 + for _ in 0 .. bench_n { + goroutines.chan_recv(bench_chan1, voidptr(&val), true) + val++ + goroutines.chan_send(bench_chan2, voidptr(&val), true) + } +} + +fn bench_ping_pong(n int) { + bench_n = n + bench_chan1 = goroutines.chan_make(int(sizeof(int)), 1) + bench_chan2 = goroutines.chan_make(int(sizeof(int)), 1) + + sw := time.new_stopwatch() + + goroutines.goroutine_create(voidptr(&pong_side), unsafe { nil }, 0) + goroutines.goroutine_create(voidptr(&ping_side), unsafe { nil }, 0) + + bench_sem.wait() + + elapsed := sw.elapsed() + us := elapsed.microseconds() + ns_per := elapsed.nanoseconds() / n + C.printf(c'ping-pong %d round-trips: %lld us (%lld ns/round-trip)\n', n, us, ns_per) +} + +// --- contended channel --- + +fn producer(arg voidptr) { + c := unsafe { &goroutines.Chan(arg) } + mut val := 0 + for _ in 0 .. bench_msgs_per { + val++ + goroutines.chan_send(c, voidptr(&val), true) + } +} + +fn consumer_fn(arg voidptr) { + c := unsafe { &goroutines.Chan(arg) } + mut recv_val := 0 + for _ in 0 .. bench_n { + goroutines.chan_recv(c, voidptr(&recv_val), true) + } + bench_sem.post() +} + +fn bench_contended_channel(num_producers int, msgs_per_producer int) { + total := num_producers * msgs_per_producer + bench_n = total + bench_msgs_per = msgs_per_producer + c := goroutines.chan_make(int(sizeof(int)), 64) + + sw := time.new_stopwatch() + + goroutines.goroutine_create(voidptr(&consumer_fn), voidptr(c), 0) + for _ in 0 .. num_producers { + goroutines.goroutine_create(voidptr(&producer), voidptr(c), 0) + } + + bench_sem.wait() + + elapsed := sw.elapsed() + us := elapsed.microseconds() + ns_per := elapsed.nanoseconds() / total + C.printf(c'contended chan %d producers x %d msgs: %lld us (%lld ns/msg)\n', + num_producers, msgs_per_producer, us, ns_per) +} + +fn main() { + C.printf(c'=== V Goroutine Benchmark ===\n\n') + + bench_fan_out_fan_in(10) + bench_fan_out_fan_in(50) + bench_fan_out_fan_in(100) + bench_fan_out_fan_in(500) + + C.printf(c'\n') + bench_ping_pong(1000) + bench_ping_pong(10000) + bench_ping_pong(100000) + + C.printf(c'\n') + bench_contended_channel(4, 1000) + bench_contended_channel(10, 1000) + + C.printf(c'\n') + goroutines.shutdown() +} diff --git a/vlib/goroutines/examples/spawn_benchmark.v b/vlib/goroutines/examples/spawn_benchmark.v new file mode 100644 index 000000000..b842408a7 --- /dev/null +++ b/vlib/goroutines/examples/spawn_benchmark.v @@ -0,0 +1,124 @@ +// V spawn (OS threads) benchmark – for comparison with Go goroutines. +// +// Uses V's built-in `spawn` (OS threads) and `chan` (V1 channels). +// This measures V's current threading primitives vs Go's goroutines. +// +// Run: v -prod run spawn_benchmark.v +module main + +import time + +// --------------------------------------------------------------------------- +// Benchmark 1 – fan-out / fan-in +// --------------------------------------------------------------------------- + +fn fan_worker(c chan int) { + c <- 1 +} + +fn bench_fan_out_fan_in(n int) { + c := chan int{cap: n} + + sw := time.new_stopwatch() + + for _ in 0 .. n { + spawn fan_worker(c) + } + + for _ in 0 .. n { + _ = <-c + } + + elapsed := sw.elapsed() + us := elapsed.microseconds() + ns_per := elapsed.nanoseconds() / n + println('fan-out/fan-in ${n} threads: ${us} us (${ns_per} ns/thread)') +} + +// --------------------------------------------------------------------------- +// Benchmark 2 – channel ping-pong +// --------------------------------------------------------------------------- + +fn pinger(c1 chan int, c2 chan int, n int) { + mut val := 0 + for _ in 0 .. n { + val = <-c1 + val++ + c2 <- val + } +} + +fn bench_ping_pong(n int) { + c1 := chan int{cap: 1} + c2 := chan int{cap: 1} + + spawn pinger(c1, c2, n) + + sw := time.new_stopwatch() + + mut val := 0 + for _ in 0 .. n { + c1 <- val + val = <-c2 + } + + elapsed := sw.elapsed() + us := elapsed.microseconds() + ns_per := elapsed.nanoseconds() / n + println('ping-pong ${n} round-trips: ${us} us (${ns_per} ns/round-trip)') +} + +// --------------------------------------------------------------------------- +// Benchmark 3 – contended channel (many producers, one consumer) +// --------------------------------------------------------------------------- + +fn producer(c chan int, count int) { + mut val := 0 + for _ in 0 .. count { + val++ + c <- val + } +} + +fn bench_contended_channel(num_producers int, msgs_per_producer int) { + total := num_producers * msgs_per_producer + c := chan int{cap: 64} + + sw := time.new_stopwatch() + + for _ in 0 .. num_producers { + spawn producer(c, msgs_per_producer) + } + + for _ in 0 .. total { + _ = <-c + } + + elapsed := sw.elapsed() + us := elapsed.microseconds() + ns_per := elapsed.nanoseconds() / total + println('contended chan ${num_producers} producers x ${msgs_per_producer} msgs: ${us} us (${ns_per} ns/msg)') +} + +// --------------------------------------------------------------------------- +// Main +// --------------------------------------------------------------------------- + +fn main() { + println('=== V spawn (OS threads) Benchmark ===') + println('') + + bench_fan_out_fan_in(1000) + bench_fan_out_fan_in(10000) + // Skip 100k - too many OS threads + + println('') + bench_ping_pong(10000) + bench_ping_pong(100000) + + println('') + bench_contended_channel(10, 1000) + bench_contended_channel(100, 1000) + + println('') +} diff --git a/vlib/goroutines/goroutines.v b/vlib/goroutines/goroutines.v new file mode 100644 index 000000000..229a93ba4 --- /dev/null +++ b/vlib/goroutines/goroutines.v @@ -0,0 +1,227 @@ +// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +// +// Go-style goroutine runtime for V. +// Implements the GMP (Goroutine-Machine-Processor) scheduling model +// translated from the Go runtime (src/runtime/proc.go, runtime2.go). +// +// Goroutine - goroutine: lightweight unit of execution with its own stack +// Machine - machine: OS thread that executes goroutines +// Processor - processor: logical processor that manages a local run queue +// +// Design doc: https://golang.org/s/go11sched +module goroutines + +import sync +import runtime as _ + +// GoFn is the type for a goroutine function: takes a voidptr argument. +pub type GoFn = fn (voidptr) + +// Default goroutine stack size. +// Go uses 8KB with growable stacks; V/C needs more since stacks are fixed. +const default_stack_size = 256 * 1024 + +// Maximum number of goroutines in a Processor's local run queue (matches Go's 256) +const local_queue_size = 256 + +// How often to check the global queue for fairness (matches Go's 61) +const global_queue_check_interval = 61 + +// GStatus represents goroutine states (matches Go's runtime2.go) +pub enum GStatus { + idle // just allocated, not yet initialized + runnable // on a run queue, not currently executing + running // executing user code on an Machine + waiting // blocked (channel, mutex, etc.) + dead // finished execution, available for reuse + copystack // stack is being moved (not used yet) +} + +// PStatus represents processor states +pub enum PStatus { + idle // not being used, available on idle list + running // owned by an Machine and executing code + stopped // halted + dead // no longer used +} + +// Goroutine represents a goroutine - the fundamental unit of concurrent execution. +// Translated from Go's `type g struct` in runtime2.go. +pub struct Goroutine { +pub mut: + id u64 // unique goroutine id + status GStatus = .idle // current state + stack voidptr // stack memory (allocated) + stack_size int = default_stack_size // stack allocation size + context Context // saved CPU context for switching (ucontext_t or similar) + fn_ptr voidptr // function to execute + fn_arg voidptr // argument to the function + sched_link &Goroutine = unsafe { nil } // linked list link for run queues + m &Machine = unsafe { nil } // current Machine executing this Goroutine (nil if not running) + parent_id u64 // goroutine id of creator + wait_reason string // if status==waiting, why + preempt bool // preemption signal +} + +// Machine represents a machine (OS thread) that executes goroutines. +// Translated from Go's `type m struct` in runtime2.go. +pub struct Machine { +pub mut: + id i64 + g0 &Goroutine = unsafe { nil } // goroutine with scheduling stack + curg &Goroutine = unsafe { nil } // current running goroutine + p &Processor = unsafe { nil } // attached processor (nil if not executing Go code) + spinning bool // looking for work + blocked bool // blocked on a note + park sync.Semaphore // for parking/unparking + sched_link &Machine = unsafe { nil } // linked list for idle Machine list + thread thread // underlying OS thread handle +} + +// Processor represents a processor - a resource required to execute goroutines. +// Each Processor has a local run queue. Translated from Go's `type p struct` in runtime2.go. +pub struct Processor { +pub mut: + id i32 + status PStatus = .idle + m &Machine = unsafe { nil } // back-link to associated Machine + sched_tick u32 // incremented on every scheduler call + + // Local run queue - lock-free SPMC ring buffer (matches Go's design) + runq_head u32 // consumer index (atomic) + runq_tail u32 // producer index (atomic) + runq [local_queue_size]&Goroutine // circular buffer + runnext &Goroutine = unsafe { nil } // next Goroutine to run (fast path, like Go's runnext) + + // Free Goroutine list for reuse + g_free GoroutineList + + // ID cache to avoid contention on the global counter + goid_cache u64 + goid_cache_end u64 + + link &Processor = unsafe { nil } // linked list for idle Processor list +} + +// Sched is the global scheduler state. +// Translated from Go's `type schedt struct` in runtime2.go. +pub struct Sched { +pub mut: + goid_gen u64 // global goroutine ID generator (atomic) + + mu SpinLock // protects idle lists, global queue (spinlock is ucontext-safe) + + // Idle Machine's waiting for work + midle &Machine = unsafe { nil } + nmidle i32 + + // Idle Processor's + pidle &Processor = unsafe { nil } + npidle i32 + + // Global run queue + runq GoroutineQueue + nmspinning i32 // number of spinning Machine's (atomic) + + // All Processor's (indexed by id) + allp []&Processor + + // Total Machine count + mnext i64 + maxmcount i32 = 10000 + + // Global Goroutine free list + g_free_mu SpinLock + g_free GoroutineList + g_free_count i32 + + // Shutdown + stopped bool +} + +// GoroutineQueue is a simple linked-list queue of Goroutine's (matches Go's gQueue). +pub struct GoroutineQueue { +pub mut: + head &Goroutine = unsafe { nil } + tail &Goroutine = unsafe { nil } + size i32 +} + +// GoroutineList is a list of Goroutine's. +pub struct GoroutineList { +pub mut: + head &Goroutine = unsafe { nil } + count i32 +} + +fn (mut q GoroutineQueue) push_back(gp &Goroutine) { + mut g := unsafe { gp } + g.sched_link = unsafe { nil } + if q.tail != unsafe { nil } { + q.tail.sched_link = g + } else { + q.head = g + } + q.tail = g + q.size++ +} + +fn (mut q GoroutineQueue) push(gp &Goroutine) { + mut g := unsafe { gp } + g.sched_link = q.head + q.head = g + if q.tail == unsafe { nil } { + q.tail = g + } + q.size++ +} + +fn (mut q GoroutineQueue) pop() &Goroutine { + if q.head == unsafe { nil } { + return unsafe { nil } + } + gp := q.head + q.head = gp.sched_link + if q.head == unsafe { nil } { + q.tail = unsafe { nil } + } + q.size-- + return gp +} + +fn (q &GoroutineQueue) empty() bool { + return q.head == unsafe { nil } +} + +fn (mut l GoroutineList) push(gp &Goroutine) { + mut g := unsafe { gp } + g.sched_link = l.head + l.head = g + l.count++ +} + +fn (mut l GoroutineList) pop() &Goroutine { + if l.head == unsafe { nil } { + return unsafe { nil } + } + gp := l.head + l.head = gp.sched_link + l.count-- + return gp +} + +fn (l &GoroutineList) empty() bool { + return l.head == unsafe { nil } +} + +// Global scheduler instance +__global gsched = Sched{} + +// Number of processors (defaults to number of CPU cores) +__global gomaxprocs = i32(0) + +// All goroutines ever created (for debugging) +__global allgs_mu = SpinLock{} +__global allgs = []&Goroutine{} diff --git a/vlib/goroutines/goroutines_test.v b/vlib/goroutines/goroutines_test.v new file mode 100644 index 000000000..a24cfabd9 --- /dev/null +++ b/vlib/goroutines/goroutines_test.v @@ -0,0 +1,173 @@ +// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +module goroutines + +import time + +// Test basic goroutine creation +fn test_goroutine_create() { + mut done := false + f := fn [mut done] () { + done = true + } + goroutine_create(voidptr(&f), unsafe { nil }, 0) + // Give the goroutine time to run + time.sleep(50 * time.millisecond) + // The goroutine should have set done to true + // (Note: in a real implementation this would use proper synchronization) +} + +// Test channel make +fn test_chan_make() { + c := chan_make(int(sizeof(int)), 10) + assert c != unsafe { nil } + assert chan_cap(c) == 10 + assert chan_len(c) == 0 +} + +// Test buffered channel send/recv +fn test_chan_buffered() { + c := chan_make(int(sizeof(int)), 5) + mut val := 42 + assert chan_send(c, voidptr(&val), false) == true + assert chan_len(c) == 1 + + mut recv_val := 0 + received, ok := chan_recv(c, voidptr(&recv_val), false) + assert received == true + assert ok == true + assert recv_val == 42 + assert chan_len(c) == 0 +} + +// Test channel close +fn test_chan_close() { + c := chan_make(int(sizeof(int)), 1) + mut val := 100 + chan_send(c, voidptr(&val), false) + chan_close(c) + + // Should still receive buffered value + mut recv_val := 0 + received, ok := chan_recv(c, voidptr(&recv_val), false) + assert received == true + // After close with data, ok should be true +} + +// Test GoroutineQueue operations +fn test_gqueue() { + mut q := GoroutineQueue{} + assert q.empty() + assert q.size == 0 + + mut g1 := &Goroutine{ + id: 1 + } + mut g2 := &Goroutine{ + id: 2 + } + mut g3 := &Goroutine{ + id: 3 + } + + q.push_back(g1) + assert !q.empty() + assert q.size == 1 + + q.push_back(g2) + q.push_back(g3) + assert q.size == 3 + + gp := q.pop() + assert gp.id == 1 + assert q.size == 2 + + gp2 := q.pop() + assert gp2.id == 2 + + gp3 := q.pop() + assert gp3.id == 3 + assert q.empty() +} + +// Test GoroutineList operations +fn test_glist() { + mut l := GoroutineList{} + assert l.empty() + + mut g1 := &Goroutine{ + id: 10 + } + mut g2 := &Goroutine{ + id: 20 + } + + l.push(g1) + assert !l.empty() + assert l.count == 1 + + l.push(g2) + assert l.count == 2 + + // GoroutineList is a stack (LIFO) + gp := l.pop() + assert gp.id == 20 + + gp2 := l.pop() + assert gp2.id == 10 + assert l.empty() +} + +// Test WaitQ operations +fn test_waitq() { + mut q := WaitQ{} + assert q.empty() + + mut g1 := &Goroutine{ + id: 1 + } + mut g2 := &Goroutine{ + id: 2 + } + + mut s1 := &Sudog{ + g: g1 + } + mut s2 := &Sudog{ + g: g2 + } + + q.enqueue(s1) + assert !q.empty() + + q.enqueue(s2) + + sg := q.dequeue() + assert sg.g.id == 1 + + sg2 := q.dequeue() + assert sg2.g.id == 2 + + assert q.empty() +} + +// Test scheduler initialization +fn test_scheduler_init() { + // The scheduler should have been initialized by module init() + assert gomaxprocs > 0 + assert gsched.allp.len > 0 + assert gsched.allp.len == int(gomaxprocs) + + // P0 should be running (attached to M0) + p0 := gsched.allp[0] + assert p0.status == .running +} + +// Test goroutine ID allocation +fn test_goid_allocation() { + id1 := assign_goid() + id2 := assign_goid() + // IDs should be unique and increasing + assert id2 > id1 +} diff --git a/vlib/goroutines/goroutines_tls.h b/vlib/goroutines/goroutines_tls.h new file mode 100644 index 000000000..8ff79831a --- /dev/null +++ b/vlib/goroutines/goroutines_tls.h @@ -0,0 +1,21 @@ +#ifndef GOROUTINES_TLS_H +#define GOROUTINES_TLS_H + +#include + +void *goroutines_get_current_m(void); +void goroutines_set_current_m(void *mp); + +uint32_t goroutines_atomic_load_u32(volatile uint32_t *ptr); +void goroutines_atomic_store_u32(volatile uint32_t *ptr, uint32_t val); +uint32_t goroutines_atomic_fetch_add_u32(volatile uint32_t *ptr, uint32_t val); +int32_t goroutines_atomic_fetch_add_i32(volatile int32_t *ptr, int32_t val); +int32_t goroutines_atomic_fetch_sub_i32(volatile int32_t *ptr, int32_t val); +uint64_t goroutines_atomic_fetch_add_u64(volatile uint64_t *ptr, uint64_t val); +int goroutines_atomic_cas_u32(volatile uint32_t *ptr, uint32_t *expected, uint32_t desired); +int goroutines_atomic_cas_ptr(void *volatile *ptr, void **expected, void *desired); + +void grt_spinlock_lock(volatile int32_t *lock); +void grt_spinlock_unlock(volatile int32_t *lock); + +#endif diff --git a/vlib/goroutines/init.v b/vlib/goroutines/init.v new file mode 100644 index 000000000..1a7fd4a28 --- /dev/null +++ b/vlib/goroutines/init.v @@ -0,0 +1,111 @@ +// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +// +// Goroutine runtime initialization. +// Translated from Go's schedinit() and procresize() in proc.go. +module goroutines + +import runtime + +// init initializes the goroutine scheduler. +// Creates P's (one per CPU core by default) and the initial M. +// Translated from Go's schedinit() + procresize(). +fn init() { + // Determine number of processors + n := runtime.nr_cpus() + if n < 1 { + gomaxprocs = 1 + } else if n > 256 { + gomaxprocs = 256 + } else { + gomaxprocs = i32(n) + } + + // Initialize the scheduler + gsched.maxmcount = 10000 + gsched.mnext = 1 + + // Create all P's (translated from procresize in proc.go) + gsched.allp = []&Processor{cap: int(gomaxprocs)} + for i in 0 .. gomaxprocs { + mut pp := &Processor{ + id: i + status: .idle + } + // Initialize the local run queue + for j in 0 .. local_queue_size { + pp.runq[j] = unsafe { nil } + } + gsched.allp << pp + } + + // Create the initial M (M0) for the main thread + mut m0 := &Machine{ + id: 0 + g0: &Goroutine{ + id: 0 + status: .running + } + } + + // Create a "main goroutine" so chan_send/recv work from the main thread + mut main_g := &Goroutine{ + id: 0 + status: .running + } + m0.curg = main_g + main_g.m = m0 + + // Wire M0 to P0 + mut p0 := gsched.allp[0] + wire_p(mut m0, mut p0) + + // Set the current thread's M + set_current_m(m0) + + // Put remaining P's on the idle list + for i in 1 .. gomaxprocs { + pid_put(gsched.allp[i]) + } +} + +// set_max_procs changes the number of active processors. +// Returns the previous value. Translated from Go's GOMAXPROCS(). +pub fn set_max_procs(n int) int { + old := int(gomaxprocs) + if n < 1 || n == old { + return old + } + + // TODO: implement full procresize like Go does + // For now, just update the count + gomaxprocs = i32(n) + return old +} + +// num_goroutine returns the number of goroutines that currently exist. +pub fn num_goroutine() int { + allgs_mu.acquire() + mut count := 0 + for g in allgs { + if g.status != .dead { + count++ + } + } + allgs_mu.release() + return count +} + +// shutdown gracefully shuts down the goroutine scheduler. +pub fn shutdown() { + gsched.stopped = true + // Wake all idle M's so they can exit + gsched.mu.acquire() + mut mp := gsched.midle + for mp != unsafe { nil } { + mp.park.post() + mp = mp.sched_link + } + gsched.mu.release() +} diff --git a/vlib/goroutines/park.v b/vlib/goroutines/park.v new file mode 100644 index 000000000..398235ef4 --- /dev/null +++ b/vlib/goroutines/park.v @@ -0,0 +1,143 @@ +// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +// +// Goroutine parking (blocking/unblocking) mechanism. +// Translated from Go's gopark/goready in proc.go. +// +// When a goroutine needs to block (e.g., waiting on a channel), +// it "parks" itself - yielding its M to the scheduler. +// When the condition is met, another goroutine "readies" it +// by putting it back on a run queue. +module goroutines + +// gopark puts the current goroutine into a waiting state. +// The goroutine can be made runnable again by calling goready. +// reason describes why the goroutine is parking (for debugging). +// Translated from Go's gopark() in proc.go. +pub fn gopark(reason string) { + mut mp := get_current_m() + if mp == unsafe { nil } { + return + } + mut gp := mp.curg + if gp == unsafe { nil } { + return + } + + gp.status = .waiting + gp.wait_reason = reason + + // Dissociate G from M + mp.curg = unsafe { nil } + gp.m = unsafe { nil } + + // Switch back to the scheduler (M's g0 context) + context_switch(mut &gp.context, &mp.g0.context) +} + +// goready puts a waiting goroutine back on a run queue. +// Translated from Go's goready() in proc.go. +pub fn goready(gp &Goroutine) { + if gp == unsafe { nil } { + return + } + mut g := unsafe { gp } + if g.status != .waiting { + return + } + g.status = .runnable + g.wait_reason = '' + + // Put it on the current P's local run queue, or global if no P + mut pp := get_current_p() + if pp != unsafe { nil } { + runq_put(mut pp, g, true) + } else { + glob_runq_put(g) + } + + // Try to wake an idle P to run this goroutine + wake_p() +} + +// gosched yields the processor, allowing other goroutines to run. +// Translated from Go's Gosched() / goschedImpl() in proc.go. +pub fn gosched() { + mut mp := get_current_m() + if mp == unsafe { nil } { + return + } + mut gp := mp.curg + if gp == unsafe { nil } { + return + } + mut pp := mp.p + if pp == unsafe { nil } { + return + } + + // Put the current G back on the run queue as runnable + gp.status = .runnable + mp.curg = unsafe { nil } + gp.m = unsafe { nil } + + // Put on local queue + runq_put(mut pp, gp, false) + + // Switch back to scheduler + context_switch(mut &gp.context, &mp.g0.context) +} + +// Sudog represents a G in a wait list (e.g., channel wait queue). +// Translated from Go's sudog struct in runtime2.go. +pub struct Sudog { +pub mut: + g &Goroutine = unsafe { nil } // the waiting goroutine + next &Sudog = unsafe { nil } // next in wait list + prev &Sudog = unsafe { nil } // prev in wait list + elem voidptr // data element (may point to stack) + success bool // true if woken by successful channel op + c voidptr // channel pointer +} + +// WaitQ is a wait queue of Sudogs (used by channels). +// Translated from Go's waitq struct in runtime2.go. +pub struct WaitQ { +pub mut: + first &Sudog = unsafe { nil } + last &Sudog = unsafe { nil } +} + +pub fn (mut q WaitQ) enqueue(s &Sudog) { + mut sg := unsafe { s } + sg.next = unsafe { nil } + sg.prev = q.last + if q.last != unsafe { nil } { + q.last.next = sg + } else { + q.first = sg + } + q.last = sg +} + +pub fn (mut q WaitQ) dequeue() &Sudog { + sg := q.first + if sg == unsafe { nil } { + return unsafe { nil } + } + q.first = sg.next + if q.first != unsafe { nil } { + q.first.prev = unsafe { nil } + } else { + q.last = unsafe { nil } + } + mut s := unsafe { sg } + s.next = unsafe { nil } + s.prev = unsafe { nil } + return s +} + +pub fn (q &WaitQ) empty() bool { + return q.first == unsafe { nil } +} diff --git a/vlib/goroutines/scheduler.v b/vlib/goroutines/scheduler.v new file mode 100644 index 000000000..804c97afa --- /dev/null +++ b/vlib/goroutines/scheduler.v @@ -0,0 +1,642 @@ +// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +// +// GMP Scheduler - translated from Go's runtime/proc.go. +// +// The scheduler distributes ready-to-run goroutines over worker threads. +// Key functions translated from Go: +// - schedule() -> scheduler_loop() +// - findRunnable() -> find_runnable() +// - execute() -> execute() +// - newproc() -> goroutine_create() +// - wakep() -> wake_p() +// - runqput() -> runq_put() +// - runqget() -> runq_get() +// - stealWork() -> steal_work() +module goroutines + +// import sync as _ +// import runtime as _ + +// goroutine_create creates a new goroutine to run `f` with argument `arg`. +// This is the equivalent of Go's `newproc` function. +// Called by the compiler for `go expr()`. +pub fn goroutine_create(f voidptr, arg voidptr, arg_size int) { + gp := newproc1(f, arg, arg_size) + // Put on global run queue so any M can pick it up. + // This is simpler than local queue + work stealing and avoids + // visibility issues when the creator is the main (non-scheduler) thread. + glob_runq_put(gp) + // Try to wake an idle P if needed + wake_p() +} + +// newproc1 allocates and initializes a new G. +// Translated from Go's newproc1 in proc.go. +fn newproc1(f voidptr, arg voidptr, arg_size int) &Goroutine { + // Try to get a dead G from the local P's free list first + mut pp := get_current_p() + mut gp := if pp != unsafe { nil } { + gfget(mut pp) + } else { + unsafe { &Goroutine(nil) } + } + if gp == unsafe { nil } { + // Try global free list + gp = gfget_global() + } + if gp == unsafe { nil } { + // Allocate a new G + gp = &Goroutine{} + } + + // Allocate or reuse stack + if gp.stack == unsafe { nil } { + stack_size := default_stack_size + gp.stack = unsafe { malloc(stack_size) } + gp.stack_size = stack_size + } + + gp.fn_ptr = f + gp.fn_arg = arg + gp.status = .runnable + gp.preempt = false + + // Assign goroutine ID + gp.id = assign_goid() + + // Initialize context to run goroutine_entry + context_init(mut &gp.context, gp.stack, gp.stack_size, goroutine_trampoline, voidptr(gp)) + + // Track all goroutines + allgs_mu.acquire() + allgs << gp + allgs_mu.release() + + return gp +} + +// goroutine_trampoline is the entry point for all goroutines. +// It calls the user function, then cleans up and returns to the scheduler. +// Equivalent to Go's goexit -> goexit0. +fn goroutine_trampoline(arg voidptr) { + mut gp := unsafe { &Goroutine(arg) } + + // Call the actual function + if gp.fn_ptr != unsafe { nil } { + call_goroutine_fn(gp.fn_ptr, gp.fn_arg) + } + + // Function returned - goroutine is done + goexit0(mut gp) +} + +// call_goroutine_fn calls the goroutine's function pointer with the given argument. +fn call_goroutine_fn(fn_ptr voidptr, arg voidptr) { + unsafe { + cb := GoFn(fn_ptr) + cb(arg) + } +} + +// goexit0 handles goroutine cleanup after the user function returns. +// Translated from Go's goexit0 in proc.go. +fn goexit0(mut gp Goroutine) { + mut mp := get_current_m() + + gp.status = .dead + gp.m = unsafe { nil } + gp.fn_ptr = unsafe { nil } + gp.fn_arg = unsafe { nil } + gp.wait_reason = '' + gp.preempt = false + + // Dissociate G from M + if mp != unsafe { nil } { + mp.curg = unsafe { nil } + } + + // Put the dead G on the free list for reuse + mut pp := get_current_p() + if pp != unsafe { nil } { + gfput(mut pp, gp) + } else { + gfput_global(gp) + } + + // Switch back to the scheduler (g0 context). + // This returns to schedule_loop via execute's context_switch. + if mp != unsafe { nil } && mp.g0 != unsafe { nil } { + context_set(&mp.g0.context) + } +} + +// schedule is the main scheduler entry point. +// Finds a runnable goroutine and executes it. Never returns. +// Translated from Go's schedule() in proc.go. +pub fn schedule() { + mut mp := get_current_m() + if mp == unsafe { nil } { + return + } + mut pp := mp.p + if pp == unsafe { nil } { + return + } + pp.sched_tick++ + + // Find a runnable goroutine + gp, _ := find_runnable(mut mp, mut pp) + if gp == unsafe { nil } { + // No work found - park this M + park_m(mut mp) + return + } + + // Execute the goroutine + execute(mut mp, mut pp, gp) +} + +// find_runnable finds a runnable goroutine to execute. +// Tries: local queue, global queue, work stealing from other P's. +// Translated from Go's findRunnable() in proc.go. +fn find_runnable(mut mp Machine, mut pp Processor) (&Goroutine, bool) { + // Check global queue every Nth tick for fairness (Go uses 61) + if pp.sched_tick % global_queue_check_interval == 0 { + gp := glob_runq_get() + if gp != unsafe { nil } { + return gp, false + } + } + + // Try local run queue first + gp, inherit := runq_get(mut pp) + if gp != unsafe { nil } { + return gp, inherit + } + + // Try global run queue + gp2 := glob_runq_get() + if gp2 != unsafe { nil } { + return gp2, false + } + + // Try to steal work from other P's + gp3 := steal_work(mut pp) + if gp3 != unsafe { nil } { + return gp3, false + } + + return unsafe { nil }, false +} + +// execute runs a goroutine on the current M. +// Translated from Go's execute() in proc.go. +fn execute(mut mp Machine, mut pp Processor, gp &Goroutine) { + mut g := unsafe { gp } + // Associate G with M + mp.curg = g + g.m = mp + g.status = .running + + // Switch context to the goroutine + context_switch(mut &mp.g0.context, &g.context) + // When we return here, the goroutine has yielded back to us + // The scheduler loop will be re-entered +} + +// wake_p tries to wake an idle P to run goroutines. +// Translated from Go's wakep() in proc.go. +fn wake_p() { + // Check if there's an idle P + gsched.mu.acquire() + if gsched.npidle == 0 { + gsched.mu.release() + return + } + // Don't wake if there are already spinning M's + if gsched.nmspinning > 0 { + gsched.mu.release() + return + } + // Get an idle P + pp := pid_get() + if pp == unsafe { nil } { + gsched.mu.release() + return + } + gsched.mu.release() + + // Start a new M for this P (or wake an idle one) + start_m(pp) +} + +// park_m parks the current M - it goes to sleep waiting for work. +fn park_m(mut mp Machine) { + // Release P and add M to idle list under lock + gsched.mu.acquire() + if mp.p != unsafe { nil } { + pid_put(mp.p) + mp.p = unsafe { nil } + } + mp.sched_link = gsched.midle + gsched.midle = mp + gsched.nmidle++ + gsched.mu.release() + + // Sleep until woken + mp.park.wait() + + // Woken up - acquire a P and return to schedule_loop + acquire_p(mut mp) +} + +// acquire_p tries to get an idle P for the given M. +fn acquire_p(mut mp Machine) { + gsched.mu.acquire() + mut pp := pid_get() + gsched.mu.release() + if pp != unsafe { nil } { + wire_p(mut mp, mut pp) + } +} + +// wire_p associates a P with an M. +fn wire_p(mut mp Machine, mut pp Processor) { + mp.p = pp + pp.m = mp + pp.status = .running +} + +// start_m starts or wakes an M to run the given P. +// Translated from Go's startm() in proc.go. +fn start_m(pp &Processor) { + gsched.mu.acquire() + + // Try to get an idle M first + mut mp := gsched.midle + if mp != unsafe { nil } { + gsched.midle = mp.sched_link + gsched.nmidle-- + gsched.mu.release() + // Give it the P and wake it + mut p := unsafe { pp } + wire_p(mut mp, mut p) + mp.park.post() + return + } + + // No idle M available - create a new one + id := gsched.mnext + gsched.mnext++ + gsched.mu.release() + + new_m(id, pp) +} + +// new_m creates a new OS thread (M) and associates it with a P. +fn new_m(id i64, pp &Processor) { + mut mp := &Machine{ + id: id + g0: &Goroutine{ + status: .running + } + } + mut p := unsafe { pp } + wire_p(mut mp, mut p) + mp.thread = spawn m_thread_entry(mut mp) +} + +// m_thread_entry is the entry point for new M (OS thread) goroutine scheduling loops. +fn m_thread_entry(mut mp Machine) { + // Register this M as the current thread's Machine so that + // get_current_m()/get_current_p() work on worker threads. + set_current_m(mp) + // Enter the scheduling loop - this never returns + schedule_loop(mut mp) +} + +// schedule_loop is the main loop for an M. It repeatedly finds and runs goroutines. +fn schedule_loop(mut mp Machine) { + for { + if gsched.stopped { + return + } + mut pp := mp.p + if pp == unsafe { nil } { + acquire_p(mut mp) + pp = mp.p + if pp == unsafe { nil } { + // No P available, park + park_m(mut mp) + continue + } + } + pp.sched_tick++ + + gp, _ := find_runnable(mut mp, mut pp) + if gp == unsafe { nil } { + // No work - try spinning briefly before parking + if !mp.spinning { + mp.spinning = true + C.goroutines_atomic_fetch_add_i32(&gsched.nmspinning, 1) + } + // Spin a bit + mut found := false + for _ in 0 .. 20 { + gp2, _ := find_runnable(mut mp, mut pp) + if gp2 != unsafe { nil } { + mp.spinning = false + C.goroutines_atomic_fetch_sub_i32(&gsched.nmspinning, 1) + execute(mut mp, mut pp, gp2) + found = true + break + } + // Yield to avoid burning CPU + proc_yield(10) + } + if !found { + if mp.spinning { + mp.spinning = false + C.goroutines_atomic_fetch_sub_i32(&gsched.nmspinning, 1) + } + park_m(mut mp) + } + continue + } + + if mp.spinning { + mp.spinning = false + C.goroutines_atomic_fetch_sub_i32(&gsched.nmspinning, 1) + } + + execute(mut mp, mut pp, gp) + } +} + +// steal_work tries to steal goroutines from other P's run queues. +// Translated from Go's stealWork() in proc.go. +fn steal_work(mut thisp Processor) &Goroutine { + n := gsched.allp.len + if n <= 1 { + return unsafe { nil } + } + // Randomize starting point to avoid contention + start := u32(C.rand()) % u32(n) + for i := u32(0); i < u32(n); i++ { + idx := (start + i) % u32(n) + pp := gsched.allp[idx] + if pp == thisp { + continue + } + // Try to steal half of the target's run queue + mut target := unsafe { pp } + gp := runq_steal(mut target, mut thisp) + if gp != unsafe { nil } { + return gp + } + } + return unsafe { nil } +} + +// runq_steal steals half of pp's local run queue. +// Translated from Go's runqgrab/runqsteal in proc.go. +fn runq_steal(mut pp Processor, mut thisp Processor) &Goroutine { + t := C.goroutines_atomic_load_u32(&pp.runq_tail) + h := C.goroutines_atomic_load_u32(&pp.runq_head) + n := t - h + if n == 0 { + // Try runnext + next := pp.runnext + if next != unsafe { nil } { + if C.goroutines_atomic_cas_ptr(voidptr(&pp.runnext), voidptr(&next), unsafe { nil }) { + return next + } + } + return unsafe { nil } + } + // Steal half + steal := n - n / 2 + mut first := unsafe { &Goroutine(nil) } + for i := u32(0); i < steal; i++ { + mut gp := pp.runq[(h + i) % local_queue_size] + if i == 0 { + first = gp + } else { + // Enqueue remaining stolen goroutines into thief's local run queue + runq_put(mut thisp, gp, false) + } + } + C.goroutines_atomic_fetch_add_u32(&pp.runq_head, steal) + return first +} + +// Global run queue operations (translated from Go's globrunqput/get) +fn glob_runq_put(gp &Goroutine) { + gsched.mu.acquire() + gsched.runq.push_back(gp) + gsched.mu.release() +} + +fn glob_runq_get() &Goroutine { + gsched.mu.acquire() + gp := gsched.runq.pop() + gsched.mu.release() + return gp +} + +// Local run queue operations (translated from Go's runqput/runqget) + +// runq_put puts a G on the local run queue. +// If next is true, it goes into runnext for immediate scheduling. +// Translated from Go's runqput() in proc.go. +fn runq_put(mut pp Processor, gp &Goroutine, next bool) { + if next { + // Fast path: put as runnext + old := pp.runnext + pp.runnext = unsafe { gp } + if old == unsafe { nil } { + return + } + // Kick old runnext to the regular queue + runq_put(mut pp, old, false) + return + } + + // Regular path: put on the ring buffer + h := C.goroutines_atomic_load_u32(&pp.runq_head) + t := pp.runq_tail + if t - h < local_queue_size { + pp.runq[t % local_queue_size] = unsafe { gp } + C.goroutines_atomic_store_u32(&pp.runq_tail, t + 1) + return + } + // Queue is full - put half on global queue + runq_put_slow(mut pp, gp, h, t) +} + +// runq_put_slow moves half the local queue to the global queue. +// Translated from Go's runqputslow() in proc.go. +fn runq_put_slow(mut pp Processor, gp &Goroutine, h u32, t u32) { + n := (t - h) / 2 + mut batch := GoroutineQueue{} + for i := u32(0); i < n; i++ { + g := pp.runq[(h + i) % local_queue_size] + batch.push_back(g) + } + C.goroutines_atomic_fetch_add_u32(&pp.runq_head, n) + batch.push_back(gp) + + gsched.mu.acquire() + for !batch.empty() { + gsched.runq.push_back(batch.pop()) + } + gsched.mu.release() +} + +// runq_get gets a G from the local run queue. +// Translated from Go's runqget() in proc.go. +fn runq_get(mut pp Processor) (&Goroutine, bool) { + // Check runnext first (fast path) + next := pp.runnext + if next != unsafe { nil } { + pp.runnext = unsafe { nil } + return next, true + } + + // Regular queue + for { + h := C.goroutines_atomic_load_u32(&pp.runq_head) + t := pp.runq_tail + if t == h { + return unsafe { nil }, false + } + gp := pp.runq[h % local_queue_size] + if C.goroutines_atomic_cas_u32(&pp.runq_head, &h, h + 1) { + return gp, false + } + } + return unsafe { nil }, false +} + +// Idle P list operations +fn pid_get() &Processor { + pp := gsched.pidle + if pp != unsafe { nil } { + gsched.pidle = pp.link + gsched.npidle-- + mut p := unsafe { pp } + p.status = .running + p.link = unsafe { nil } + } + return pp +} + +fn pid_put(pp &Processor) { + mut p := unsafe { pp } + p.status = .idle + p.m = unsafe { nil } + p.link = gsched.pidle + gsched.pidle = p + gsched.npidle++ +} + +// G free list operations (translated from Go's gfput/gfget) +fn gfput(mut pp Processor, gp &Goroutine) { + mut g := unsafe { gp } + g.status = .dead + g.fn_ptr = unsafe { nil } + g.fn_arg = unsafe { nil } + g.sched_link = unsafe { nil } + pp.g_free.push(g) +} + +fn gfget(mut pp Processor) &Goroutine { + return pp.g_free.pop() +} + +fn gfput_global(gp &Goroutine) { + gsched.g_free_mu.acquire() + gsched.g_free.push(unsafe { gp }) + gsched.g_free_count++ + gsched.g_free_mu.release() +} + +fn gfget_global() &Goroutine { + gsched.g_free_mu.acquire() + gp := gsched.g_free.pop() + if gp != unsafe { nil } { + gsched.g_free_count-- + } + gsched.g_free_mu.release() + return gp +} + +// assign_goid allocates a unique goroutine ID. +// Uses per-P caching to avoid contention (like Go's goidcache). +fn assign_goid() u64 { + mut pp := get_current_p() + if pp != unsafe { nil } && pp.goid_cache < pp.goid_cache_end { + id := pp.goid_cache + unsafe { + pp.goid_cache++ + } + return id + } + // Refill cache from global counter + batch := u64(16) + id := C.goroutines_atomic_fetch_add_u64(&gsched.goid_gen, batch) + if pp != unsafe { nil } { + unsafe { + pp.goid_cache = id + 1 + pp.goid_cache_end = id + batch + } + } + return id +} + +// proc_yield spins for a short time (used during work stealing). +fn proc_yield(count int) { + for _ in 0 .. count { + // CPU pause instruction to reduce power and contention + $if amd64 { + asm volatile amd64 { + pause + } + } + $if arm64 { + asm volatile arm64 { + yield + } + } + } +} + +// get_current_m returns the M for the current OS thread. +// Uses thread-local storage via C _Thread_local (see tls.c). +fn get_current_m() &Machine { + return unsafe { &Machine(C.goroutines_get_current_m()) } +} + +fn set_current_m(mp &Machine) { + C.goroutines_set_current_m(voidptr(mp)) +} + +// get_current_p returns the P for the current OS thread's M. +fn get_current_p() &Processor { + mp := get_current_m() + if mp == unsafe { nil } { + return unsafe { nil } + } + return mp.p +} + +// get_current_g returns the currently running G. +pub fn get_current_g() &Goroutine { + mp := get_current_m() + if mp == unsafe { nil } { + return unsafe { nil } + } + return mp.curg +} diff --git a/vlib/goroutines/tls.c b/vlib/goroutines/tls.c new file mode 100644 index 000000000..f276428e8 --- /dev/null +++ b/vlib/goroutines/tls.c @@ -0,0 +1,68 @@ +// Thread-local storage, atomic operations, and spinlock for goroutines scheduler. +#include +#include +#include +#include + +// Thread-local Machine pointer +static _Thread_local void *_goroutines_current_m = NULL; + +void *goroutines_get_current_m(void) { + return _goroutines_current_m; +} + +void goroutines_set_current_m(void *mp) { + _goroutines_current_m = mp; +} + +// Atomic operations on uint32_t +uint32_t goroutines_atomic_load_u32(volatile uint32_t *ptr) { + return atomic_load((_Atomic uint32_t *)ptr); +} + +void goroutines_atomic_store_u32(volatile uint32_t *ptr, uint32_t val) { + atomic_store((_Atomic uint32_t *)ptr, val); +} + +uint32_t goroutines_atomic_fetch_add_u32(volatile uint32_t *ptr, uint32_t val) { + return atomic_fetch_add((_Atomic uint32_t *)ptr, val); +} + +// Atomic operations on int32_t +int32_t goroutines_atomic_fetch_add_i32(volatile int32_t *ptr, int32_t val) { + return atomic_fetch_add((_Atomic int32_t *)ptr, val); +} + +int32_t goroutines_atomic_fetch_sub_i32(volatile int32_t *ptr, int32_t val) { + return atomic_fetch_sub((_Atomic int32_t *)ptr, val); +} + +// Atomic operations on uint64_t +uint64_t goroutines_atomic_fetch_add_u64(volatile uint64_t *ptr, uint64_t val) { + return atomic_fetch_add((_Atomic uint64_t *)ptr, val); +} + +// Atomic CAS on pointer-sized values +int goroutines_atomic_cas_u32(volatile uint32_t *ptr, uint32_t *expected, uint32_t desired) { + return atomic_compare_exchange_strong((_Atomic uint32_t *)ptr, expected, desired); +} + +int goroutines_atomic_cas_ptr(void *volatile *ptr, void **expected, void *desired) { + return atomic_compare_exchange_strong((_Atomic(void *) *)ptr, expected, desired); +} + +// Spinlock - safe to use with ucontext (unlike pthreads mutex) +void grt_spinlock_lock(volatile int32_t *lock) { + while (atomic_exchange((_Atomic int32_t *)lock, 1) != 0) { + // Spin with pause hint + #if defined(__x86_64__) || defined(__i386__) + __asm__ volatile("pause"); + #elif defined(__aarch64__) + __asm__ volatile("yield"); + #endif + } +} + +void grt_spinlock_unlock(volatile int32_t *lock) { + atomic_store((_Atomic int32_t *)lock, 0); +} diff --git a/vlib/v2/gen/c/c.v b/vlib/v2/gen/c/c.v index da027a6e8..2650e2495 100644 --- a/vlib/v2/gen/c/c.v +++ b/vlib/v2/gen/c/c.v @@ -34,6 +34,7 @@ pub fn (mut g Gen) gen() string { g.sb.writeln('#include ') g.sb.writeln('#include ') g.sb.writeln('#include ') + g.sb.writeln('#include ') g.sb.writeln('') // Undefine macOS macros that conflict with struct field names g.sb.writeln('#ifdef __APPLE__') @@ -76,6 +77,9 @@ pub fn (mut g Gen) gen() string { // Function forward declarations for all functions g.gen_fn_forward_decls() + // Spawn wrapper functions (pre-scanned from all spawn_call instructions) + g.gen_spawn_wrappers() + // Function bodies (skip stub functions - they'll come from builtin.o) for i, func in g.mod.funcs { if func.blocks.len == 0 { @@ -91,6 +95,55 @@ pub fn (mut g Gen) gen() string { return g.sb.str() } +// gen_spawn_wrappers pre-scans all functions for spawn_call instructions with +// arguments and emits wrapper functions that unpack the args struct and call the +// original function. These must be emitted before function bodies so that the +// wrappers are visible at each pthread_create call site. +fn (mut g Gen) gen_spawn_wrappers() { + for func in g.mod.funcs { + for blk_id in func.blocks { + block := g.mod.blocks[blk_id] + for instr_id in block.instrs { + val := g.mod.values[instr_id] + if val.kind != .instruction { + continue + } + instr := g.mod.instrs[val.index] + if instr.op !in [.spawn_call, .go_call] || instr.operands.len < 2 { + continue + } + fn_ref := instr.operands[0] + fn_val := g.mod.values[fn_ref] + fn_name := sanitize_c_ident(fn_val.name) + prefix := if instr.op == .spawn_call { '_spawn' } else { '_go' } + wrapper_id := '${fn_name}_${val.name}' + arg_count := instr.operands.len - 1 + + g.sb.write_string('struct ${prefix}_args_${wrapper_id} { ') + for ai in 0 .. arg_count { + arg_type := g.type_name(g.mod.values[instr.operands[ai + 1]].typ) + g.sb.write_string('${arg_type} a${ai}; ') + } + g.sb.writeln('};') + g.sb.writeln('static void* ${prefix}_wrapper_${wrapper_id}(void* _arg) {') + g.sb.writeln('\tstruct ${prefix}_args_${wrapper_id}* _args = (struct ${prefix}_args_${wrapper_id}*)_arg;') + g.sb.write_string('\t${fn_name}(') + for ai in 0 .. arg_count { + if ai > 0 { + g.sb.write_string(', ') + } + g.sb.write_string('_args->a${ai}') + } + g.sb.writeln(');') + g.sb.writeln('\tfree(_arg);') + g.sb.writeln('\treturn NULL;') + g.sb.writeln('}') + g.sb.writeln('') + } + } + } +} + fn (mut g Gen) gen_runtime_helpers() { // println g.sb.writeln('static void println(string s) {') @@ -742,6 +795,106 @@ fn (mut g Gen) gen_function(func ssa.Function) { g.write_indent() g.sb.writeln('__builtin_unreachable();') } + .go_call { + // go fn(args...) -> goroutines__goroutine_create(fn, args) + if instr.operands.len >= 1 { + fn_ref := instr.operands[0] + fn_val := g.mod.values[fn_ref] + fn_name := sanitize_c_ident(fn_val.name) + arg_count := instr.operands.len - 1 + + if arg_count == 0 { + // No arguments: pass function directly with NULL arg + g.write_indent() + g.sb.writeln('goroutines__goroutine_create((void*)(void(*)())${fn_name}, NULL, 0);') + } else { + wrapper_id := '${fn_name}_${val.name}' + + g.write_indent() + g.sb.writeln('{') + g.indent++ + + // Declare args struct on heap + g.write_indent() + g.sb.write_string('struct _go_args_${wrapper_id} { ') + for ai in 0 .. arg_count { + arg_type := g.type_name(g.mod.values[instr.operands[ai + 1]].typ) + g.sb.write_string('${arg_type} a${ai}; ') + } + g.sb.writeln('};') + + g.write_indent() + g.sb.writeln('struct _go_args_${wrapper_id}* _args = (struct _go_args_${wrapper_id}*)malloc(sizeof(struct _go_args_${wrapper_id}));') + + // Pack arguments + for ai in 0 .. arg_count { + g.write_indent() + g.sb.write_string('_args->a${ai} = ') + g.gen_value(instr.operands[ai + 1]) + g.sb.writeln(';') + } + + // Call goroutine_create with the unpacking wrapper + g.write_indent() + g.sb.writeln('goroutines__goroutine_create((void*)(void(*)())_go_wrapper_${wrapper_id}, _args, sizeof(struct _go_args_${wrapper_id}));') + + g.indent-- + g.write_indent() + g.sb.writeln('}') + } + } + } + .spawn_call { + // spawn fn(args...) -> launch OS thread via pthread_create + if instr.operands.len >= 1 { + fn_ref := instr.operands[0] + fn_val := g.mod.values[fn_ref] + fn_name := sanitize_c_ident(fn_val.name) + arg_count := instr.operands.len - 1 + + g.write_indent() + g.sb.writeln('{') + g.indent++ + + g.write_indent() + g.sb.writeln('pthread_t _spawn_thread;') + + if arg_count == 0 { + g.write_indent() + g.sb.writeln('pthread_create(&_spawn_thread, NULL, (void*(*)(void*))${fn_name}, NULL);') + } else { + wrapper_id := '${fn_name}_${val.name}' + + g.write_indent() + g.sb.write_string('struct _spawn_args_${wrapper_id} { ') + for ai in 0 .. arg_count { + arg_type := g.type_name(g.mod.values[instr.operands[ai + 1]].typ) + g.sb.write_string('${arg_type} a${ai}; ') + } + g.sb.writeln('};') + + g.write_indent() + g.sb.writeln('struct _spawn_args_${wrapper_id}* _args = (struct _spawn_args_${wrapper_id}*)malloc(sizeof(struct _spawn_args_${wrapper_id}));') + + for ai in 0 .. arg_count { + g.write_indent() + g.sb.write_string('_args->a${ai} = ') + g.gen_value(instr.operands[ai + 1]) + g.sb.writeln(';') + } + + g.write_indent() + g.sb.writeln('pthread_create(&_spawn_thread, NULL, _spawn_wrapper_${wrapper_id}, _args);') + } + + g.write_indent() + g.sb.writeln('pthread_detach(_spawn_thread);') + + g.indent-- + g.write_indent() + g.sb.writeln('}') + } + } else { // Other ops: emit as comment g.write_indent() diff --git a/vlib/v2/gen/cleanc/cleanc.v b/vlib/v2/gen/cleanc/cleanc.v index 0e3310b5d..f3bcf009e 100644 --- a/vlib/v2/gen/cleanc/cleanc.v +++ b/vlib/v2/gen/cleanc/cleanc.v @@ -1301,7 +1301,12 @@ fn (mut g Gen) gen_keyword_operator(node ast.KeywordOperator) { g.sb.write_string('0') } } - .key_spawn, .key_go { + .key_spawn { + g.gen_spawn_expr(node) + } + .key_go { + // go calls are lowered by the transformer to goroutines__goroutine_create. + // If we reach here, fall back to spawn behavior. g.gen_spawn_expr(node) } else { diff --git a/vlib/v2/ssa/builder.v b/vlib/v2/ssa/builder.v index b5774238a..a3aec4251 100644 --- a/vlib/v2/ssa/builder.v +++ b/vlib/v2/ssa/builder.v @@ -6616,12 +6616,54 @@ fn (mut b Builder) build_keyword_operator(kw ast.KeywordOperator) ValueID { // Fallback: sizeof(int) = 4 return b.mod.get_or_add_const(b.mod.type_store.get_int(32), '4') } + .key_go { + // `go expr()` - launch a goroutine via the GMP scheduler. + // NOTE: The transformer normally lowers `go` to a regular call to + // goroutines__goroutine_create, so this path is a fallback. + if kw.exprs.len > 0 { + return b.build_go_or_spawn(kw.exprs[0], .go_call) + } + return b.mod.get_or_add_const(b.mod.type_store.get_int(64), '0') + } + .key_spawn { + // `spawn expr()` - launch an OS thread. + if kw.exprs.len > 0 { + return b.build_go_or_spawn(kw.exprs[0], .spawn_call) + } + return b.mod.get_or_add_const(b.mod.type_store.get_int(64), '0') + } else { return b.mod.get_or_add_const(b.mod.type_store.get_int(64), '0') } } } +// build_go_or_spawn emits an SSA instruction for `go fn_call()` or `spawn fn_call()`. +// For `go`: emits go_call which the C backend translates to goroutines__goroutine_create(). +// For `spawn`: emits spawn_call which the C backend translates to pthread_create(). +fn (mut b Builder) build_go_or_spawn(expr ast.Expr, opcode OpCode) ValueID { + // The expression should be a function call + if expr is ast.CallExpr { + call := expr as ast.CallExpr + mut operands := []ValueID{} + + // First operand: the function reference + fn_val := b.build_expr(call.lhs) + operands << fn_val + + // Remaining operands: the arguments + for arg in call.args { + arg_val := b.build_expr(arg) + operands << arg_val + } + + void_t := b.mod.type_store.get_void() + return b.mod.add_instr(opcode, b.cur_block, void_t, operands) + } + // Fallback: just build the expression (shouldn't happen for well-formed code) + return b.build_expr(expr) +} + fn (b &Builder) sizeof_value(expr ast.Expr) int { match expr { ast.Ident { diff --git a/vlib/v2/ssa/instr.v b/vlib/v2/ssa/instr.v index e8598a243..aedf8e932 100644 --- a/vlib/v2/ssa/instr.v +++ b/vlib/v2/ssa/instr.v @@ -80,6 +80,10 @@ pub enum OpCode { assign // copy for phi elimination inline_string_init // Create string struct by value: (string){str, len, is_lit} + // Concurrency + go_call // Launch goroutine: go_call fn_ref, args... + spawn_call // Launch OS thread: spawn_call fn_ref, args... + // Aggregate (struct/tuple) operations extractvalue // Extract element from struct/tuple: extractvalue %tuple, index insertvalue // Insert element into struct/tuple: insertvalue %tuple, %val, index diff --git a/vlib/v2/transformer/expr.v b/vlib/v2/transformer/expr.v index ab69bfeda..c50bdeaaf 100644 --- a/vlib/v2/transformer/expr.v +++ b/vlib/v2/transformer/expr.v @@ -324,6 +324,9 @@ fn (mut t Transformer) transform_expr(expr ast.Expr) ast.Expr { } } } + if expr.op == .key_go && expr.exprs.len > 0 { + return t.lower_go_call(expr) + } expr } else { @@ -2695,4 +2698,118 @@ fn (mut t Transformer) transform_comptime_expr(expr ast.ComptimeExpr) ast.Expr { } } +// lower_go_call transforms `go foo(a, b)` into a regular call to +// goroutines__goroutine_create, avoiding backend-specific go handling. +// For zero-arg calls: goroutines__goroutine_create(voidptr(foo), voidptr(0), 0) +// For calls with args: generates a wrapper struct + trampoline function, +// then calls goroutines__goroutine_create(trampoline, packed_args, sizeof). +fn (mut t Transformer) lower_go_call(expr ast.KeywordOperator) ast.Expr { + call := expr.exprs[0] + mut call_lhs := ast.empty_expr + mut call_args := []ast.Expr{} + if call is ast.CallExpr { + call_lhs = call.lhs + call_args = call.args.clone() + } else if call is ast.CallOrCastExpr { + call_lhs = call.lhs + call_args = [call.expr] + } else { + // Not a call expression, return as-is + return expr + } + // Resolve the V-level function name for the wrapper + fn_name := t.resolve_go_fn_name(call_lhs) + if fn_name == '' { + // Cannot resolve — fall back to keeping the original expression + return expr + } + // Transform all arguments + mut transformed_args := []ast.Expr{cap: call_args.len} + for arg in call_args { + transformed_args << t.transform_expr(arg) + } + // Resolve argument type names for struct generation + mut arg_type_names := []string{cap: call_args.len} + for arg in call_args { + if typ := t.get_expr_type(arg) { + arg_type_names << t.type_to_c_name(typ) + } else { + arg_type_names << 'int' + } + } + // Check for method call (receiver needs to be first arg in wrapper) + mut is_method := false + mut receiver_expr := ast.empty_expr + mut receiver_type_name := '' + if call_lhs is ast.SelectorExpr { + sel_lhs := call_lhs.lhs + if !(sel_lhs is ast.Ident && t.is_module_name(sel_lhs.name)) { + is_method = true + receiver_expr = t.transform_expr(sel_lhs) + if recv_type := t.get_expr_type(sel_lhs) { + receiver_type_name = t.type_to_c_name(recv_type) + } else { + receiver_type_name = 'voidptr' + } + } + } + wrapper_name := '__go_wrap_${fn_name}' + if wrapper_name !in t.needed_go_wrappers { + mut param_names := []string{} + mut param_types := []string{} + if is_method { + param_names << '_recv' + param_types << receiver_type_name + } + for i, type_name in arg_type_names { + param_names << '_a${i}' + param_types << type_name + } + t.needed_go_wrappers[wrapper_name] = GoWrapperInfo{ + fn_name: fn_name + wrapper_name: wrapper_name + param_names: param_names + param_types: param_types + } + } + // Build call args: [receiver, args...] for methods, [args...] for functions + mut wrapper_args := []ast.Expr{} + if is_method { + wrapper_args << receiver_expr + } + for arg in transformed_args { + wrapper_args << arg + } + return ast.CallExpr{ + lhs: ast.Ident{ + name: wrapper_name + } + args: wrapper_args + pos: expr.pos + } +} + +// resolve_go_fn_name extracts the C-mangled function name from a call LHS expression. +fn (t &Transformer) resolve_go_fn_name(lhs ast.Expr) string { + if lhs is ast.Ident { + if t.cur_module != '' { + return '${t.cur_module}__${lhs.name}' + } + return lhs.name + } + if lhs is ast.SelectorExpr { + if lhs.lhs is ast.Ident { + mod_name := lhs.lhs.name + return '${mod_name}__${lhs.rhs.name}' + } + } + return '' +} + +// is_module_name checks if a name refers to a known module. +fn (t &Transformer) is_module_name(name string) bool { + // Check if the name is a known module by looking up its scope + return name in t.cached_scopes +} + // eval_comptime_if evaluates a compile-time $if and returns the selected branch expression diff --git a/vlib/v2/transformer/transformer.v b/vlib/v2/transformer/transformer.v index a3207cacd..0387392c8 100644 --- a/vlib/v2/transformer/transformer.v +++ b/vlib/v2/transformer/transformer.v @@ -65,6 +65,8 @@ mut: // Track needed auto-generated sort comparator functions needed_sort_fns map[string]SortComparatorInfo needed_enum_str_fns map[string]types.Enum + // Track needed go wrapper functions (go call -> goroutine_create lowering) + needed_go_wrappers map[string]GoWrapperInfo // Override array element types for variables whose checker-inferred type is wrong // (e.g. .map(fn_name) typed as []voidptr instead of []ReturnType) array_elem_type_overrides map[string]string @@ -109,6 +111,16 @@ struct ArrayMethodInfo { fixed_len int } +// GoWrapperInfo tracks information needed to synthesize a go-wrapper function. +// For `go foo(a, b)`, we generate a wrapper that packs args and calls +// goroutines__goroutine_create, and a trampoline that unpacks args and calls foo. +struct GoWrapperInfo { + fn_name string // C-mangled function name (e.g. "main__foo") + wrapper_name string // Wrapper function name (e.g. "__go_wrap_main__foo") + param_names []string // Parameter names + param_types []string // Parameter C type names +} + fn builder_write_string_stmt(sb_ref ast.Expr, s ast.Expr) ast.Stmt { return ast.Stmt(ast.ExprStmt{ expr: ast.Expr(ast.CallExpr{ @@ -149,6 +161,7 @@ pub fn Transformer.new_with_pref(files []ast.File, env &types.Environment, p &pr needed_array_index_fns: map[string]ArrayMethodInfo{} needed_array_last_index_fns: map[string]ArrayMethodInfo{} needed_sort_fns: map[string]SortComparatorInfo{} + needed_go_wrappers: map[string]GoWrapperInfo{} runtime_const_inits_by_mod: map[string][]RuntimeConstInit{} runtime_const_init_fn_name: map[string]string{} } @@ -180,6 +193,7 @@ pub fn (t &Transformer) new_worker_clone(worker_idx int) &Transformer { needed_array_index_fns: map[string]ArrayMethodInfo{} needed_array_last_index_fns: map[string]ArrayMethodInfo{} needed_sort_fns: map[string]SortComparatorInfo{} + needed_go_wrappers: map[string]GoWrapperInfo{} runtime_const_inits_by_mod: map[string][]RuntimeConstInit{} runtime_const_init_fn_name: map[string]string{} } @@ -205,6 +219,9 @@ pub fn (mut t Transformer) merge_worker(w &Transformer) { for k, v in w.needed_enum_str_fns { t.needed_enum_str_fns[k] = v } + for k, v in w.needed_go_wrappers { + t.needed_go_wrappers[k] = v + } for k, v in w.interface_concrete_types { t.interface_concrete_types[k] = v } @@ -582,6 +599,9 @@ pub fn (mut t Transformer) post_pass(mut result []ast.File) { || t.needed_array_last_index_fns.len > 0 { generated_fns << t.generate_array_method_functions() } + if t.needed_go_wrappers.len > 0 { + generated_fns << t.generate_go_wrapper_functions() + } if generated_fns.len > 0 { // Split into core (builtin types), module-specific, and user (main) functions mut core_fns := []ast.Stmt{} @@ -8924,3 +8944,269 @@ fn (mut t Transformer) generate_sort_comparator_fn(fn_name string, info SortComp stmts: body_stmts }) } + +// generate_go_wrapper_functions generates synthesized functions for lowering +// `go foo(args)` to goroutine creation via goroutines__goroutine_create. +// +// For each go-called function foo(a int, b string), generates: +// 1. struct __GoArgs_foo { a0 int; a1 string } +// 2. fn __go_trampoline_foo(arg voidptr) { args := *(&__GoArgs_foo(arg)); foo(args.a0, args.a1); C.free(arg) } +// 3. fn __go_wrap_foo(a int, b string) { args := C.malloc(sizeof(__GoArgs_foo)); ...; goroutines__goroutine_create(trampoline, args, sizeof) } +fn (mut t Transformer) generate_go_wrapper_functions() []ast.Stmt { + mut result := []ast.Stmt{} + for _, info in t.needed_go_wrappers { + struct_name := '__GoArgs_${info.fn_name}' + trampoline_name := '__go_trampoline_${info.fn_name}' + // 1. Generate the args struct + if info.param_names.len > 0 { + mut fields := []ast.FieldDecl{} + for i, pname in info.param_names { + fields << ast.FieldDecl{ + name: pname + typ: ast.Ident{ + name: info.param_types[i] + } + } + } + result << ast.Stmt(ast.StructDecl{ + name: struct_name + fields: fields + }) + } + // 2. Generate the trampoline function + if info.param_names.len > 0 { + mut trampoline_stmts := []ast.Stmt{} + // args := unsafe { *(&__GoArgs_foo(arg)) } + // We use a cast + dereference at the C level. + // At AST level: args := *voidptr(arg) cast to struct pointer, then deref. + // Simpler: use direct field access with cast. + trampoline_stmts << ast.Stmt(ast.AssignStmt{ + op: .decl_assign + lhs: [ast.Expr(ast.Ident{ + name: '_go_args' + })] + rhs: [ + ast.Expr(ast.PrefixExpr{ + op: .mul + expr: ast.CastExpr{ + typ: ast.PrefixExpr{ + op: .amp + expr: ast.Ident{ + name: struct_name + } + } + expr: ast.Ident{ + name: 'arg' + } + } + }), + ] + }) + // foo(_go_args.a0, _go_args.a1, ...) + mut call_args := []ast.Expr{} + for pname in info.param_names { + call_args << ast.Expr(ast.SelectorExpr{ + lhs: ast.Ident{ + name: '_go_args' + } + rhs: ast.Ident{ + name: pname + } + }) + } + trampoline_stmts << ast.Stmt(ast.ExprStmt{ + expr: ast.CallExpr{ + lhs: ast.Ident{ + name: info.fn_name + } + args: call_args + } + }) + // C.free(arg) + trampoline_stmts << ast.Stmt(ast.ExprStmt{ + expr: ast.CallExpr{ + lhs: ast.SelectorExpr{ + lhs: ast.Ident{ + name: 'C' + } + rhs: ast.Ident{ + name: 'free' + } + } + args: [ast.Expr(ast.Ident{ + name: 'arg' + })] + } + }) + result << ast.Stmt(ast.FnDecl{ + name: trampoline_name + typ: ast.FnType{ + params: [ + ast.Parameter{ + name: 'arg' + typ: ast.Ident{ + name: 'voidptr' + } + }, + ] + } + stmts: trampoline_stmts + }) + } + // 3. Generate the dispatch wrapper function + mut dispatch_stmts := []ast.Stmt{} + if info.param_names.len == 0 { + // Zero args: goroutines__goroutine_create(voidptr(foo), voidptr(0), 0) + dispatch_stmts << ast.Stmt(ast.ExprStmt{ + expr: ast.CallExpr{ + lhs: ast.Ident{ + name: 'goroutines__goroutine_create' + } + args: [ + ast.Expr(ast.CastExpr{ + typ: ast.Ident{ + name: 'voidptr' + } + expr: ast.Ident{ + name: info.fn_name + } + }), + ast.Expr(ast.CastExpr{ + typ: ast.Ident{ + name: 'voidptr' + } + expr: ast.BasicLiteral{ + kind: .number + value: '0' + } + }), + ast.Expr(ast.BasicLiteral{ + kind: .number + value: '0' + }), + ] + } + }) + } else { + // Allocate args struct on heap + // mut _args := &__GoArgs_foo{ a0: param0, a1: param1, ... } + // We use C.malloc + field assignment for simplicity + // _args_ptr := C.malloc(sizeof(__GoArgs_foo)) + dispatch_stmts << ast.Stmt(ast.AssignStmt{ + op: .decl_assign + lhs: [ast.Expr(ast.Ident{ + name: '_args_ptr' + })] + rhs: [ + ast.Expr(ast.CallExpr{ + lhs: ast.SelectorExpr{ + lhs: ast.Ident{ + name: 'C' + } + rhs: ast.Ident{ + name: 'malloc' + } + } + args: [ + ast.Expr(ast.KeywordOperator{ + op: .key_sizeof + exprs: [ast.Expr(ast.Ident{ + name: struct_name + })] + }), + ] + }), + ] + }) + // _args := &__GoArgs_foo(_args_ptr) + dispatch_stmts << ast.Stmt(ast.AssignStmt{ + op: .decl_assign + lhs: [ast.Expr(ast.Ident{ + name: '_args' + })] + rhs: [ + ast.Expr(ast.CastExpr{ + typ: ast.PrefixExpr{ + op: .amp + expr: ast.Ident{ + name: struct_name + } + } + expr: ast.Ident{ + name: '_args_ptr' + } + }), + ] + }) + // _args.field = param for each field + for i, pname in info.param_names { + dispatch_stmts << ast.Stmt(ast.AssignStmt{ + op: .assign + lhs: [ + ast.Expr(ast.SelectorExpr{ + lhs: ast.Ident{ + name: '_args' + } + rhs: ast.Ident{ + name: pname + } + }), + ] + rhs: [ast.Expr(ast.Ident{ + name: pname + })] + }) + } + // goroutines__goroutine_create(voidptr(trampoline), voidptr(_args_ptr), sizeof(__GoArgs_foo)) + dispatch_stmts << ast.Stmt(ast.ExprStmt{ + expr: ast.CallExpr{ + lhs: ast.Ident{ + name: 'goroutines__goroutine_create' + } + args: [ + ast.Expr(ast.CastExpr{ + typ: ast.Ident{ + name: 'voidptr' + } + expr: ast.Ident{ + name: trampoline_name + } + }), + ast.Expr(ast.CastExpr{ + typ: ast.Ident{ + name: 'voidptr' + } + expr: ast.Ident{ + name: '_args_ptr' + } + }), + ast.Expr(ast.KeywordOperator{ + op: .key_sizeof + exprs: [ast.Expr(ast.Ident{ + name: struct_name + })] + }), + ] + } + }) + } + // Build params for the dispatch function (same as original function) + mut dispatch_params := []ast.Parameter{} + for i, pname in info.param_names { + dispatch_params << ast.Parameter{ + name: pname + typ: ast.Ident{ + name: info.param_types[i] + } + } + } + result << ast.Stmt(ast.FnDecl{ + name: info.wrapper_name + typ: ast.FnType{ + params: dispatch_params + } + stmts: dispatch_stmts + }) + } + return result +} -- 2.39.5