From c216e59bfc85a863749963938d7131121114afd4 Mon Sep 17 00:00:00 2001 From: kbkpbot Date: Fri, 4 Jul 2025 23:45:14 +0800 Subject: [PATCH] datatypes: add lockfree version of counter and ringbuffer (#24839) --- vlib/datatypes/lockfree/README.md | 67 ++ vlib/datatypes/lockfree/bench/bench_channel.v | 218 +++++++ .../lockfree/bench/bench_ringbuffer.v | 292 +++++++++ vlib/datatypes/lockfree/counter.v | 63 ++ vlib/datatypes/lockfree/counter_test.v | 42 ++ vlib/datatypes/lockfree/lockfree.v | 23 + vlib/datatypes/lockfree/ringbuffer.v | 593 ++++++++++++++++++ vlib/datatypes/lockfree/ringbuffer_test.v | 373 +++++++++++ 8 files changed, 1671 insertions(+) create mode 100644 vlib/datatypes/lockfree/README.md create mode 100644 vlib/datatypes/lockfree/bench/bench_channel.v create mode 100644 vlib/datatypes/lockfree/bench/bench_ringbuffer.v create mode 100644 vlib/datatypes/lockfree/counter.v create mode 100644 vlib/datatypes/lockfree/counter_test.v create mode 100644 vlib/datatypes/lockfree/lockfree.v create mode 100644 vlib/datatypes/lockfree/ringbuffer.v create mode 100644 vlib/datatypes/lockfree/ringbuffer_test.v diff --git a/vlib/datatypes/lockfree/README.md b/vlib/datatypes/lockfree/README.md new file mode 100644 index 000000000..546fe4765 --- /dev/null +++ b/vlib/datatypes/lockfree/README.md @@ -0,0 +1,67 @@ +# Lockfree Library for V + +A high-performance, thread-safe collection of lock-free data structures for +the V programming language. Designed for concurrent applications requiring +low-latency and high-throughput data processing. + +## Features + +- **Truly Lock-Free**: No mutexes or spinlocks +- **Cross-Platform**: Works on Windows, Linux, macOS +- **Configurable**: Tune parameters for specific workloads +- **High Performance**: Optimized for modern multi-core processors + +## Data Structures + +### 1. Atomic Counter + +A thread-safe counter with atomic operations. + +```v +import datatypes.lockfree + +mut counter := lockfree.new_counter[int](0) +counter.increment() +counter.increment_by(5) +value := counter.get() // 6 +counter.decrement() +counter.clear() +``` + +**Features**: +- Atomic increment/decrement +- Batch operations +- Get current value +- Reset functionality + +### 2. Ring Buffer + +A circular buffer for producer-consumer scenarios. + +```v +import datatypes.lockfree + +mut rb := lockfree.new_ringbuffer[int](1024) +rb.push(10) +rb.push(20) +item := rb.pop() // 10 +free := rb.remaining() +``` + +**Features**: +- Single/Multi producer/consumer modes +- Blocking/non-blocking operations +- Batch operations +- Configurable size +- Memory efficient + +## Acknowledgements + +This library incorporates research and design principles from: +- Intel Threading Building Blocks (TBB) +- Facebook Folly +- Java Concurrent Package +- Dmitry Vyukov's lock-free algorithms +- DPDK rte ring + +--- diff --git a/vlib/datatypes/lockfree/bench/bench_channel.v b/vlib/datatypes/lockfree/bench/bench_channel.v new file mode 100644 index 000000000..f86c857a9 --- /dev/null +++ b/vlib/datatypes/lockfree/bench/bench_channel.v @@ -0,0 +1,218 @@ +module main + +import time +import sync +import os +import flag +import runtime + +// Test configuration parameters +const buffer_size = 1024 // Size of the channel + +const items_per_thread = 1_000_000 // Items to produce per thread + +const warmup_runs = 3 // Number of warmup runs + +const test_runs = 5 // Number of test runs + +const max_threads = runtime.nr_jobs() // Maximum number of threads + +// Performance test results +struct ChannelPerfResult { + scenario string // Test scenario identifier + throughput f64 // Throughput in million operations per second + latency f64 // Average latency in nanoseconds + cpu_usage f64 // CPU usage percentage +} + +fn main() { + println('Channel Performance Test') + println('======================================') + println('Maximum number of threads set to nr_jobs = ${max_threads}') + mut fp := flag.new_flag_parser(os.args.clone()) + fp.skip_executable() + show_help := fp.bool('help', 0, false, 'Show this help screen\n') + debug := fp.bool('debug', 0, false, 'Show debug message, stat of channel') + if show_help { + println(fp.usage()) + exit(0) + } + + // Test different scenarios + mut results := []ChannelPerfResult{} + + // Single Producer Single Consumer + results << test_scenario('SPSC', 1, 1, debug) + + // Multiple Producers Single Consumer + for i in [2, 4, 8, 16] { + if i <= max_threads { + results << test_scenario('MPSC (${i}P1C)', i, 1, debug) + } + } + + // Single Producer Multiple Consumers + for i in [2, 4, 8, 16] { + if i <= max_threads { + results << test_scenario('SPMC (1P${i}C)', 1, i, debug) + } + } + + // Multiple Producers Multiple Consumers + for i in [2, 4, 8, 16] { + if i * 2 <= max_threads { + results << test_scenario('MPMC (${i}P${i}C)', i, i, debug) + } + } + + // Print final results + print_results(results) +} + +// Test specific scenario with given producers/consumers +fn test_scenario(scenario string, producers int, consumers int, debug bool) ChannelPerfResult { + println('\nTesting scenario: ${scenario}') + + // Create channel + ch := chan int{cap: buffer_size} + defer { + ch.close() + } + + // Warmup runs + for _ in 0 .. warmup_runs { + run_test(ch, producers, consumers, false, debug) + } + + // Actual test runs + mut total_time := time.Duration(0) + mut total_ops := 0 + + for _ in 0 .. test_runs { + duration, ops := run_test(ch, producers, consumers, true, debug) + total_time += duration + total_ops += ops + } + + // Calculate performance metrics + avg_time := total_time / test_runs + throughput := f64(total_ops) / avg_time.seconds() / 1_000_000 // MOPS + latency := avg_time.nanoseconds() / f64(total_ops / test_runs) // ns/op + + return ChannelPerfResult{ + scenario: scenario + throughput: throughput + latency: latency + cpu_usage: 0.0 // Actual value should be obtained from system + } +} + +// Execute single test run +fn run_test(ch chan int, producers int, consumers int, measure bool, debug bool) (time.Duration, int) { + mut wg := sync.new_waitgroup() + total_items := producers * items_per_thread + + // Key fix: Consumers should consume exact producer total + items_per_consumer := total_items / consumers + mut remaining := total_items % consumers + + // Start producers + start_time := time.now() + for i in 0 .. producers { + wg.add(1) + spawn producer_thread(ch, i, mut wg, debug) + } + + // Start consumers + mut consumed_counts := []int{len: consumers, init: 0} + for i in 0 .. consumers { + wg.add(1) + // Distribute remaining items to first consumers + mut count := items_per_consumer + if remaining > 0 { + count += 1 + remaining -= 1 + } + spawn consumer_thread(ch, i, count, mut consumed_counts, mut wg, debug) + } + + // Wait for all threads to complete + wg.wait() + end_time := time.now() + + // Validate results + mut total_consumed := 0 + for count in consumed_counts { + total_consumed += count + } + + if total_consumed != total_items { + eprintln('Error: Produced ${total_items} items but consumed ${total_consumed}') + } + + duration := end_time - start_time + if measure { + println('Completed ${total_items} items in ${duration.milliseconds()}ms') + } + + return duration, total_items +} + +// Producer thread implementation +fn producer_thread(ch chan int, id int, mut wg sync.WaitGroup, debug bool) { + defer { + wg.done() + } + + // Generate items in producer-specific range + start := id * items_per_thread + end := start + items_per_thread + + for i in start .. end { + ch <- i + } +} + +// Consumer thread with fixed consumption target +fn consumer_thread(ch chan int, id int, items_to_consume int, mut consumed_counts []int, mut wg sync.WaitGroup, debug bool) { + defer { + wg.done() + } + + for i in 0 .. items_to_consume { + _ := <-ch + + // Debug output + if debug && i % 1000000 == 0 { + println('consume item count = ${i}') + } + } + + consumed_counts[id] = items_to_consume +} + +// Print formatted performance results +fn print_results(results []ChannelPerfResult) { + println('\nPerformance Results') + println('====================================================================') + println('Scenario\t\tThroughput (M ops/s)\tAvg Latency (ns)\tCPU Usage (%)') + println('--------------------------------------------------------------------') + + for res in results { + println('${res.scenario:20}\t${res.throughput:8.2f}\t\t\t${res.latency:8.2f}\t\t\t${res.cpu_usage:5.1f}') + } + println('====================================================================') + + // Find best performing scenario + mut best_throughput := 0.0 + mut best_scenario := '' + + for res in results { + if res.throughput > best_throughput { + best_throughput = res.throughput + best_scenario = res.scenario + } + } + + println('\nBest performance: ${best_scenario} with ${best_throughput:.2f} M ops/s') +} diff --git a/vlib/datatypes/lockfree/bench/bench_ringbuffer.v b/vlib/datatypes/lockfree/bench/bench_ringbuffer.v new file mode 100644 index 000000000..c9a1e6eb4 --- /dev/null +++ b/vlib/datatypes/lockfree/bench/bench_ringbuffer.v @@ -0,0 +1,292 @@ +module main + +import datatypes.lockfree +import time +import sync +import os +import flag +import runtime + +// Test configuration parameters +const buffer_size = 1024 // Size of the ring buffer + +const items_per_thread = 1_000_000 // Items to produce per thread + +const warmup_runs = 3 // Number of warmup runs + +const test_runs = 5 // Number of test runs + +const max_threads = runtime.nr_jobs() // Maximum number of threads + +// Performance test results +struct RingBufferPerfResult { + scenario string // Test scenario identifier + throughput f64 // Throughput in million operations per second + latency f64 // Average latency in nanoseconds + cpu_usage f64 // CPU usage percentage +} + +fn main() { + println('Lock-Free Ring Buffer Performance Test') + println('======================================') + println('Maximum number of threads set to nr_jobs = ${max_threads}') + mut fp := flag.new_flag_parser(os.args.clone()) + fp.skip_executable() + show_help := fp.bool('help', 0, false, 'Show this help screen\n') + debug := fp.bool('debug', 0, false, 'Show debug message, stat of ringbuffer') + batch := fp.bool('batch', 0, true, 'Batch mode, batch size = 32') + if show_help { + println(fp.usage()) + exit(0) + } + + // Test different scenarios + mut results := []RingBufferPerfResult{} + + // Single Producer Single Consumer + results << test_scenario('SPSC', 1, 1, batch, debug) + + // Multiple Producers Single Consumer + for i in [2, 4, 8, 16] { + if i <= max_threads { + results << test_scenario('MPSC (${i}P1C)', i, 1, batch, debug) + } + } + + // Single Producer Multiple Consumers + for i in [2, 4, 8, 16] { + if i <= max_threads { + results << test_scenario('SPMC (1P${i}C)', 1, i, batch, debug) + } + } + + // Multiple Producers Multiple Consumers + for i in [2, 4, 8, 16] { + if i * 2 <= max_threads { + results << test_scenario('MPMC (${i}P${i}C)', i, i, batch, debug) + } + } + + // Print final results + print_results(results) +} + +// Test specific scenario with given producers/consumers +fn test_scenario(scenario string, producers int, consumers int, batch bool, debug bool) RingBufferPerfResult { + println('\nTesting scenario: ${scenario}') + + // Create ring buffer + mut rb := lockfree.new_ringbuffer[int](buffer_size) + + // Warmup runs + for _ in 0 .. warmup_runs { + run_test(mut rb, producers, consumers, false, batch, debug) + rb.clear() + } + + // Actual test runs + mut total_time := time.Duration(0) + mut total_ops := 0 + + for _ in 0 .. test_runs { + duration, ops := run_test(mut rb, producers, consumers, true, batch, debug) + total_time += duration + total_ops += ops + + // Reset buffer after each run + rb.clear() + } + + // Calculate performance metrics + avg_time := total_time / test_runs + throughput := f64(total_ops) / avg_time.seconds() / 1_000_000 // MOPS + latency := avg_time.nanoseconds() / f64(total_ops / test_runs) // ns/op + + return RingBufferPerfResult{ + scenario: scenario + throughput: throughput + latency: latency + cpu_usage: 0.0 // Actual value should be obtained from system + } +} + +// Execute single test run +fn run_test(mut rb lockfree.RingBuffer[int], producers int, consumers int, measure bool, batch bool, debug bool) (time.Duration, int) { + mut wg := sync.new_waitgroup() + total_items := producers * items_per_thread + + // Key fix: Consumers should consume exact producer total + items_per_consumer := total_items / consumers + mut remaining := total_items % consumers + + // Start producers + start_time := time.now() + for i in 0 .. producers { + wg.add(1) + spawn producer_thread(mut rb, i, mut wg, batch, debug) + } + + // Start consumers + mut consumed_counts := []int{len: consumers, init: 0} + for i in 0 .. consumers { + wg.add(1) + // Distribute remaining items to first consumers + mut count := items_per_consumer + if remaining > 0 { + count += 1 + remaining -= 1 + } + spawn consumer_thread(mut rb, i, count, mut consumed_counts, mut wg, batch, debug) + } + + // Wait for all threads to complete + wg.wait() + end_time := time.now() + if debug { + println(rb.stat()) + } + + // Validate results + mut total_consumed := 0 + for count in consumed_counts { + total_consumed += count + } + + if total_consumed != total_items { + eprintln('Error: Produced ${total_items} items but consumed ${total_consumed}') + } + + duration := end_time - start_time + if measure { + println('Completed ${total_items} items in ${duration.milliseconds()}ms') + } + + return duration, total_items +} + +// Producer thread implementation +fn producer_thread(mut rb lockfree.RingBuffer[int], id int, mut wg sync.WaitGroup, batch bool, debug bool) { + defer { + wg.done() + } + + // Generate items in producer-specific range + start := id * items_per_thread + end := start + items_per_thread + + if batch { + // Use batch pushing for better performance + batch_size := 32 + mut batch_buffer := []int{cap: batch_size} + + for i in start .. end { + batch_buffer << i + if batch_buffer.len == batch_size { + rb.push_many(batch_buffer) + batch_buffer.clear() + } + } + + // Push remaining items in final batch + if batch_buffer.len > 0 { + rb.push_many(batch_buffer) + } + } else { + for i in start .. end { + rb.push(i) + } + } +} + +// Consumer thread with fixed consumption target +fn consumer_thread(mut rb lockfree.RingBuffer[int], id int, items_to_consume int, mut consumed_counts []int, mut wg sync.WaitGroup, batch bool, debug bool) { + defer { + wg.done() + } + + if batch { + // Use batch consumption for better performance + batch_size := 32 + mut count := 0 + mut last_value := -1 + mut batch_buffer := []int{len: batch_size} // Reusable buffer + + for count < items_to_consume - batch_size { + // Consume batch using pop_many + rb.pop_many(mut batch_buffer) + count += batch_size + + // Debug output + if debug && count % 1000000 == 0 { + println('consume item count = ${count}') + } + + // Check sequence continuity + validate_batch_detailed(id, batch_buffer, last_value) + last_value = batch_buffer[batch_buffer.len - 1] + } + + remaining := items_to_consume - count + if remaining > 0 { + mut remaining_buffer := []int{len: remaining} + rb.pop_many(mut remaining_buffer) + count += remaining + + validate_batch_detailed(id, remaining_buffer, last_value) + } + } else { + for i in 0 .. items_to_consume { + _ := rb.pop() + // Debug output + if debug && i % 1000000 == 0 { + println('consume item count = ${i}') + } + } + } + consumed_counts[id] = items_to_consume +} + +fn validate_batch_detailed(id int, batch []int, prev_last int) bool { + if batch.len == 0 { + return true + } + + mut valid := true + + mut expected := batch[0] + 1 + for i in 1 .. batch.len { + if batch[i] != expected { + eprintln('[Thread ${id}] Sequence error: Position ${i} expected ${expected}, got ${batch[i]}') + valid = false + } + expected += 1 + } + + return valid +} + +// Print formatted performance results +fn print_results(results []RingBufferPerfResult) { + println('\nPerformance Results') + println('====================================================================') + println('Scenario\t\tThroughput (M ops/s)\tAvg Latency (ns)\tCPU Usage (%)') + println('--------------------------------------------------------------------') + + for res in results { + println('${res.scenario:20}\t${res.throughput:8.2f}\t\t\t${res.latency:8.2f}\t\t\t${res.cpu_usage:5.1f}') + } + println('====================================================================') + + // Find best performing scenario + mut best_throughput := 0.0 + mut best_scenario := '' + + for res in results { + if res.throughput > best_throughput { + best_throughput = res.throughput + best_scenario = res.scenario + } + } + + println('\nBest performance: ${best_scenario} with ${best_throughput:.2f} M ops/s') +} diff --git a/vlib/datatypes/lockfree/counter.v b/vlib/datatypes/lockfree/counter.v new file mode 100644 index 000000000..525e6f6c5 --- /dev/null +++ b/vlib/datatypes/lockfree/counter.v @@ -0,0 +1,63 @@ +module lockfree + +import sync.stdatomic + +// Counter is a thread-safe atomic counter supporting multiple integer types. +// It provides atomic increment, decrement, and value retrieval operations. +@[noinit] +struct Counter[T] { +mut: + value &stdatomic.AtomicVal[T] +} + +// new_counter creates a new atomic counter with the specified initial value. +// It only supports integer types (8-bit to 64-bit integers). +pub fn new_counter[T](init T) &Counter[T] { + // Compile-time type check: only integers are supported + $if T !is $int { + $compile_error('new_counter(): only integers are supported.') + } + return &Counter[T]{ + value: stdatomic.new_atomic[T](init) + } +} + +// increment_by atomically adds a delta value to the counter and returns +// the previous value before the operation (fetch-and-add). +@[inline] +pub fn (mut c Counter[T]) increment_by(delta T) T { + return c.value.add(delta) +} + +// increment atomically increments the counter by 1 and returns +// the previous value before the operation. +@[inline] +pub fn (mut c Counter[T]) increment() T { + return c.increment_by(T(1)) +} + +// decrement_by atomically subtracts a delta value from the counter and returns +// the previous value before the operation (fetch-and-sub). +@[inline] +pub fn (mut c Counter[T]) decrement_by(delta T) T { + return c.value.sub(delta) +} + +// decrement atomically decrements the counter by 1 and returns +// the previous value before the operation. +@[inline] +pub fn (mut c Counter[T]) decrement() T { + return c.decrement_by(T(1)) +} + +// get atomically retrieves the current value of the counter. +@[inline] +pub fn (mut c Counter[T]) get() T { + return c.value.load() +} + +// clear atomically resets the counter to zero. +@[inline] +pub fn (mut c Counter[T]) clear() { + c.value.store(0) +} diff --git a/vlib/datatypes/lockfree/counter_test.v b/vlib/datatypes/lockfree/counter_test.v new file mode 100644 index 000000000..7104ba78e --- /dev/null +++ b/vlib/datatypes/lockfree/counter_test.v @@ -0,0 +1,42 @@ +import sync +import datatypes.lockfree + +fn test_counter() { + number_threads := 10 + mut counter := lockfree.new_counter(u64(0)) + mut wg := sync.new_waitgroup() + for i in 0 .. number_threads { + wg.add(1) + spawn fn (mut c lockfree.Counter[u64], id int, mut wg sync.WaitGroup) { + for j in 0 .. 1000 { + c.increment() + } + wg.done() + }(mut counter, i, mut wg) + } + + for i in 0 .. number_threads { + wg.add(1) + spawn fn (mut c lockfree.Counter[u64], id int, mut wg sync.WaitGroup) { + for j in 0 .. 1000 { + c.decrement() + } + wg.done() + }(mut counter, i, mut wg) + } + + wg.wait() + assert counter.get() == u64(0) + + counter.increment_by(100) + assert counter.get() == u64(100) + counter.decrement_by(100) + assert counter.get() == u64(0) + + counter.increment_by(1024) + counter.clear() + assert counter.get() == u64(0) + + mut counter_init := lockfree.new_counter(u64(100)) + assert counter_init.get() == u64(100) +} diff --git a/vlib/datatypes/lockfree/lockfree.v b/vlib/datatypes/lockfree/lockfree.v new file mode 100644 index 000000000..4b0c31e08 --- /dev/null +++ b/vlib/datatypes/lockfree/lockfree.v @@ -0,0 +1,23 @@ +module lockfree + +import sync.stdatomic as _ + +// Define cache line size to prevent false sharing between CPU cores +const cache_line_size = 64 + +// next_power_of_two calculates the smallest power of two >= n +@[inline] +fn next_power_of_two(n u32) u32 { + if n == 0 { + return 1 + } + mut x := n - 1 + + // Efficient bit manipulation to find next power of two + x |= x >> 1 + x |= x >> 2 + x |= x >> 4 + x |= x >> 8 + x |= x >> 16 + return x + 1 +} diff --git a/vlib/datatypes/lockfree/ringbuffer.v b/vlib/datatypes/lockfree/ringbuffer.v new file mode 100644 index 000000000..114cd3dfa --- /dev/null +++ b/vlib/datatypes/lockfree/ringbuffer.v @@ -0,0 +1,593 @@ +module lockfree + +// This design is ported from the DPDK rte_ring library. +// Source: https://doc.dpdk.org/guides/prog_guide/ring_lib.html + +// RingBufferMode Operation modes for the ring buffer. +pub enum RingBufferMode { + spsc = 0 // Single Producer, Single Consumer (optimized for single-threaded access) + spmc = 1 // Single Producer, Multiple Consumers (one writer, multiple readers) + mpsc = 2 // Multiple Producers, Single Consumer (multiple writers, one reader) + mpmc = 3 // Multiple Producers, Multiple Consumers (default, fully concurrent) +} + +// RingBufferStat holds performance counters for ring buffer operations. +pub struct RingBufferStat { +pub mut: + push_full_count u32 // Times producers encountered full buffer + push_fail_count u32 // Times producers failed to reserve space + push_wait_prev_count u32 // Times producers waited for predecessors + push_waiting_count u32 // Current number of producers in waiting state + pop_empty_count u32 // Times consumers found empty buffer + pop_fail_count u32 // Times consumers failed to reserve items + pop_wait_prev_count u32 // Times consumers waited for predecessors + pop_waiting_count u32 // Current number of consumers in waiting state +} + +// RingBufferParam Configuration parameters for ring buffer creation. +// - max_waiting_prod_cons: Setting this to a larger value may improve performance, +// but in scenarios with many producers/consumers, it could lead to severe contention issues. +@[params] +pub struct RingBufferParam { +pub: + mode RingBufferMode = .mpmc // Default to most concurrent mode + max_waiting_prod_cons int = 1 // Max allowed waiting producers/consumers before rejecting operations +} + +// RingBuffer Lock-free multiple producer/multiple consumer ring buffer. +// Requires explicit initialization +@[noinit] +pub struct RingBuffer[T] { +mut: + mode u32 // Current operation mode (from RingBufferMode) + capacity u32 // Total capacity (always power of two) + mask u32 // Bitmask for index calculation (capacity - 1) + clear_flag u32 // Flag indicating clear operation in progress + max_waiting_prod_cons u32 // Max allowed waiting producers/consumers + pad0 [cache_line_size - 20]u8 // Padding to align to cache line boundary + + // Producer state (isolated to prevent false sharing) + prod_head u32 // Producer head (next write position) + pad1 [cache_line_size - 4]u8 // Cache line padding + prod_tail u32 // Producer tail (last committed position) + pad2 [cache_line_size - 4]u8 // Cache line padding + + // Consumer state (isolated to prevent false sharing) + cons_head u32 // Consumer head (next read position) + pad3 [cache_line_size - 4]u8 // Cache line padding + cons_tail u32 // Consumer tail (last committed position) + pad4 [cache_line_size - 4]u8 // Cache line padding + + // Data storage area + slots []T // Array holding actual data elements + + // Performance counters + push_full_count u32 // Count of full buffer encounters + push_fail_count u32 // Count of failed push attempts + push_wait_prev_count u32 // Count of waits for previous producers + push_waiting_count u32 // Current number of waiting producers + pop_empty_count u32 // Count of empty buffer encounters + pop_fail_count u32 // Count of failed pop attempts + pop_wait_prev_count u32 // Count of waits for previous consumers + pop_waiting_count u32 // Current number of waiting consumers +} + +// new_ringbuffer creates a new lock-free ring buffer. +// Note: The buffer capacity will be expanded to the next power of two +// for efficient modulo operations using bitwise AND. +// The actual capacity may be larger than the requested `size`. +pub fn new_ringbuffer[T](size u32, param RingBufferParam) &RingBuffer[T] { + // Ensure capacity is power of two for efficient modulo operations + capacity := next_power_of_two(size) + mask := capacity - 1 + + // Initialize data storage array + mut slots := []T{len: int(capacity)} + + rb := &RingBuffer[T]{ + mode: u32(param.mode) + max_waiting_prod_cons: u32(param.max_waiting_prod_cons) + capacity: capacity + mask: mask + slots: slots + } + + // Disable Valgrind checking for performance + $if valgrind ? { + C.VALGRIND_HG_DISABLE_CHECKING(rb, sizeof(RingBuffer[T])) + } + return rb +} + +// is_multiple_producer checks if current mode is multiple producer. +@[inline] +fn is_multiple_producer(mode u32) bool { + return mode & 0x02 != 0 +} + +// is_multiple_consumer checks if current mode is multiple consumer. +@[inline] +fn is_multiple_consumer(mode u32) bool { + return mode & 0x01 != 0 +} + +// try_push tries to push a single item non-blocking. +@[inline] +pub fn (mut rb RingBuffer[T]) try_push(item T) bool { + return rb.try_push_many([item]) == 1 +} + +// try_push_many tries to push multiple items non-blocking. +@[direct_array_access] +pub fn (mut rb RingBuffer[T]) try_push_many(items []T) u32 { + n := u32(items.len) + if n == 0 { + return 0 + } + + // Check if clear operation is in progress or too many producers are waiting + if C.atomic_load_u32(&rb.clear_flag) != 0 || (is_multiple_producer(rb.mode) + && C.atomic_load_u32(&rb.push_waiting_count) > rb.max_waiting_prod_cons) { + return 0 + } + + capacity := rb.capacity + mut success := false + mut attempts := 0 + mut old_head := u32(0) + mut new_head := u32(0) + + // Attempt to reserve space in the buffer + for !success && attempts < 10 { + old_head = C.atomic_load_u32(&rb.prod_head) + + // Memory barrier for weak memory models + $if !x64 && !x32 { + C.atomic_thread_fence(C.memory_order_acquire) + } + + // Calculate available space using unsigned arithmetic + free_entries := capacity + C.atomic_load_u32(&rb.cons_tail) - old_head + + // Check if there's enough space + if n > free_entries { + $if debug_ringbuffer ? { + C.atomic_fetch_add_u32(&rb.push_full_count, 1) + } + return 0 + } + + // Calculate new head position after adding items + new_head = old_head + n + if is_multiple_producer(rb.mode) { + // Atomic compare-and-swap for multiple producers + $if valgrind ? { + C.ANNOTATE_HAPPENS_BEFORE(&rb.prod_head) + } + success = C.atomic_compare_exchange_weak_u32(&rb.prod_head, &old_head, new_head) + $if valgrind ? { + C.ANNOTATE_HAPPENS_AFTER(&rb.prod_head) + } + } else { + // Direct update for single producer + rb.prod_head = new_head + success = true + } + attempts++ + } + + // Exit if space reservation failed + if !success { + $if debug_ringbuffer ? { + C.atomic_fetch_add_u32(&rb.push_fail_count, 1) + } + return 0 + } + + // Write data to the reserved slots + for i in 0 .. n { + index := (old_head + i) & rb.mask + $if valgrind ? { + C.VALGRIND_HG_DISABLE_CHECKING(&rb.slots[index], sizeof(T)) + C.ANNOTATE_HAPPENS_BEFORE(&rb.slots[index]) + } + rb.slots[index] = items[i] + $if valgrind ? { + C.ANNOTATE_HAPPENS_AFTER(&rb.slots[index]) + } + } + + mut add_once := true + mut backoff := 1 + if is_multiple_producer(rb.mode) { + // Increment waiting producer count + C.atomic_fetch_add_u32(&rb.push_waiting_count, 1) + + mut attempts_wait := 1 + // Wait for previous producers to complete their writes + for C.atomic_load_u32(&rb.prod_tail) != old_head { + // Exponential backoff to reduce CPU contention + for _ in 0 .. backoff { + C.cpu_relax() // Low-latency pause instruction + } + backoff = int_min(backoff * 2, 1024) + attempts_wait++ + $if debug_ringbuffer ? { + if attempts_wait > 100 && add_once { + C.atomic_fetch_add_u32(&rb.push_wait_prev_count, 1) + add_once = false + } + } + } + + // Decrement waiting producer count + C.atomic_fetch_sub_u32(&rb.push_waiting_count, 1) + } + + // Make data visible to consumers + $if valgrind ? { + C.ANNOTATE_HAPPENS_BEFORE(&rb.prod_tail) + } + C.atomic_store_u32(&rb.prod_tail, new_head) + $if valgrind ? { + C.ANNOTATE_HAPPENS_AFTER(&rb.prod_tail) + } + return n +} + +// try_pop tries to pop a single item non-blocking. +@[inline] +pub fn (mut rb RingBuffer[T]) try_pop() ?T { + mut items := []T{len: 1} + if rb.try_pop_many(mut items) == 1 { + return items[0] + } + return none // Buffer empty +} + +// try_pop_many tries to pop multiple items non-blocking. +@[direct_array_access] +pub fn (mut rb RingBuffer[T]) try_pop_many(mut items []T) u32 { + n := u32(items.len) + if n == 0 { + return 0 + } + + // Check if clear operation is in progress or too many consumers are waiting + if C.atomic_load_u32(&rb.clear_flag) != 0 || (is_multiple_consumer(rb.mode) + && C.atomic_load_u32(&rb.pop_waiting_count) > rb.max_waiting_prod_cons) { + return 0 + } + + mut success := false + mut attempts := 0 + mut old_head := u32(0) + mut new_head := u32(0) + + // Attempt to reserve data for reading + for !success && attempts < 10 { + old_head = C.atomic_load_u32(&rb.cons_head) + // Memory barrier for weak memory models + $if !x64 && !x32 { + C.atomic_thread_fence(C.memory_order_acquire) + } + + // Calculate available items to read + entries := C.atomic_load_u32(&rb.prod_tail) - old_head + + // Check if enough data is available + if n > entries { + $if debug_ringbuffer ? { + C.atomic_fetch_add_u32(&rb.pop_empty_count, 1) + } + return 0 + } + + // Calculate new head position after reading + new_head = old_head + n + if is_multiple_consumer(rb.mode) { + // Atomic compare-and-swap for multiple consumers + $if valgrind ? { + C.ANNOTATE_HAPPENS_BEFORE(&rb.cons_head) + } + success = C.atomic_compare_exchange_weak_u32(&rb.cons_head, &old_head, new_head) + $if valgrind ? { + C.ANNOTATE_HAPPENS_AFTER(&rb.cons_head) + } + } else { + // Direct update for single consumer + rb.cons_head = new_head + success = true + } + attempts++ + } + + // Exit if data reservation failed + if !success { + C.atomic_fetch_add_u32(&rb.pop_fail_count, 1) + return 0 + } + + // Read data from reserved slots + for i in 0 .. n { + index := (old_head + i) & rb.mask + $if valgrind ? { + C.ANNOTATE_HAPPENS_BEFORE(&rb.slots[index]) + } + items[i] = rb.slots[index] + $if valgrind ? { + C.ANNOTATE_HAPPENS_AFTER(&rb.slots[index]) + } + } + + mut add_once := true + mut backoff := 1 + // For multiple consumers: wait for previous consumers to complete + if is_multiple_consumer(rb.mode) { + // Increment waiting consumer count + C.atomic_fetch_add_u32(&rb.pop_waiting_count, 1) + + mut attempts_wait := 1 + // Wait for previous consumers to complete their reads + for C.atomic_load_u32(&rb.cons_tail) != old_head { + // Exponential backoff to reduce CPU contention + for _ in 0 .. backoff { + C.cpu_relax() // Low-latency pause instruction + } + backoff = int_min(backoff * 2, 1024) + attempts_wait++ + $if debug_ringbuffer ? { + if attempts_wait > 100 && add_once { + C.atomic_fetch_add_u32(&rb.pop_wait_prev_count, 1) + add_once = false + } + } + } + + // Decrement waiting consumer count + C.atomic_fetch_sub_u32(&rb.pop_waiting_count, 1) + } + + // Free up buffer space + $if valgrind ? { + C.ANNOTATE_HAPPENS_BEFORE(&rb.cons_tail) + } + C.atomic_store_u32(&rb.cons_tail, new_head) + $if valgrind ? { + C.ANNOTATE_HAPPENS_AFTER(&rb.cons_tail) + } + return n +} + +// push blocking push of a single item. +@[inline] +pub fn (mut rb RingBuffer[T]) push(item T) { + mut backoff := 1 + // Retry until successful + for { + if rb.try_push(item) { + return + } + // Exponential backoff to reduce contention + for _ in 0 .. backoff { + C.cpu_relax() // Pause before retry + } + backoff = int_min(backoff * 2, 1024) + } +} + +// pop blocking pop of a single item. +@[inline] +pub fn (mut rb RingBuffer[T]) pop() T { + mut backoff := 1 + // Retry until successful + for { + if item := rb.try_pop() { + return item + } + // Exponential backoff to reduce contention + for _ in 0 .. backoff { + C.cpu_relax() // Pause before retry + } + backoff = int_min(backoff * 2, 1024) + } + return T(0) // Default value (should never be reached) +} + +// push_many blocking push of multiple items. +@[inline] +pub fn (mut rb RingBuffer[T]) push_many(items []T) { + mut backoff := 1 + for { + n := rb.try_push_many(items) + if n == items.len { + break + } else { + // Exponential backoff when buffer is full + for _ in 0 .. backoff { + C.cpu_relax() // Pause when buffer is full + } + backoff = int_min(backoff * 2, 1024) + } + } +} + +// pop_many blocking pop of multiple items. +@[inline] +pub fn (mut rb RingBuffer[T]) pop_many(mut result []T) { + n := result.len + if n == 0 { + return + } + mut backoff := 1 + for { + ret := rb.try_pop_many(mut result) + if ret == n { + break + } else { + // Exponential backoff when buffer is empty + for _ in 0 .. backoff { + C.cpu_relax() // Pause when buffer is empty + } + backoff = int_min(backoff * 2, 1024) + } + } +} + +// is_empty checks if the buffer is empty. +@[inline] +pub fn (rb RingBuffer[T]) is_empty() bool { + return rb.occupied() == 0 +} + +// is_full checks if the buffer is full. +@[inline] +pub fn (rb RingBuffer[T]) is_full() bool { + return rb.occupied() >= rb.capacity +} + +// capacity returns the total capacity of the buffer. +@[inline] +pub fn (rb RingBuffer[T]) capacity() u32 { + return rb.capacity +} + +// occupied returns the number of occupied slots. +@[inline] +pub fn (rb RingBuffer[T]) occupied() u32 { + // Memory barrier for weak memory models + $if !x64 && !x32 { + C.atomic_thread_fence(C.memory_order_acquire) + } + + prod_tail := C.atomic_load_u32(&rb.prod_tail) + cons_tail := C.atomic_load_u32(&rb.cons_tail) + + // Handle potential overflow + used := if prod_tail >= cons_tail { + prod_tail - cons_tail + } else { + (max_u32 - cons_tail) + prod_tail + 1 + } + + return used +} + +// remaining returns the number of free slots. +@[inline] +pub fn (rb RingBuffer[T]) remaining() u32 { + return rb.capacity - rb.occupied() +} + +// clear clears the ring buffer and resets all pointers. +pub fn (mut rb RingBuffer[T]) clear() bool { + mut clear_flag := u32(0) + mut attempts := 0 + max_attempts := 1000 + + // Acquire clear flag using atomic CAS + for { + if C.atomic_compare_exchange_weak_u32(&rb.clear_flag, &clear_flag, 1) { + break + } + clear_flag = u32(0) + C.cpu_relax() + attempts++ + if attempts > max_attempts { + return false // Failed to acquire clear flag + } + } + + // Wait for producers to finish with exponential backoff + mut backoff := 1 + mut prod_wait := 0 + for { + prod_head := C.atomic_load_u32(&rb.prod_head) + prod_tail := C.atomic_load_u32(&rb.prod_tail) + if prod_head == prod_tail { + break + } + // Exponential backoff wait + for _ in 0 .. backoff { + C.cpu_relax() + } + backoff = int_min(backoff * 2, 1024) + + prod_wait++ + if prod_wait > max_attempts { + // Force advance producer tail + C.atomic_store_u32(&rb.prod_tail, prod_head) + break + } + } + + // Wait for consumers to finish with exponential backoff + backoff = 1 + mut cons_wait := 0 + for { + cons_head := C.atomic_load_u32(&rb.cons_head) + cons_tail := C.atomic_load_u32(&rb.cons_tail) + + if cons_head == cons_tail { + break + } + + // Exponential backoff wait + for _ in 0 .. backoff { + C.cpu_relax() + } + backoff = int_min(backoff * 2, 1024) + + cons_wait++ + if cons_wait > max_attempts { + // Force advance consumer tail + C.atomic_store_u32(&rb.cons_tail, cons_head) + break + } + } + + // Reset all pointers to zero + C.atomic_store_u32(&rb.prod_head, 0) + C.atomic_store_u32(&rb.prod_tail, 0) + C.atomic_store_u32(&rb.cons_head, 0) + C.atomic_store_u32(&rb.cons_tail, 0) + + C.atomic_store_u32(&rb.push_full_count, 0) + C.atomic_store_u32(&rb.push_fail_count, 0) + C.atomic_store_u32(&rb.push_wait_prev_count, 0) + C.atomic_store_u32(&rb.push_waiting_count, 0) + C.atomic_store_u32(&rb.pop_empty_count, 0) + C.atomic_store_u32(&rb.pop_fail_count, 0) + C.atomic_store_u32(&rb.pop_wait_prev_count, 0) + C.atomic_store_u32(&rb.pop_waiting_count, 0) + // Release clear flag + C.atomic_store_u32(&rb.clear_flag, 0) + return true // Clear operation successful +} + +// stat retrieves current performance statistics of the ring buffer. +// +// This method fetches all recorded operation counters: +// - push_full_count: Times producers encountered full buffer +// - push_fail_count: Times producers failed to reserve space +// - push_wait_prev_count: Times producers waited for predecessors +// - push_waiting_count: Current number of producers in waiting state +// - pop_empty_count: Times consumers found empty buffer +// - pop_fail_count: Times consumers failed to reserve items +// - pop_wait_prev_count: Times consumers waited for predecessors +// - pop_waiting_count: Current number of consumers in waiting state +pub fn (rb RingBuffer[T]) stat() RingBufferStat { + $if debug_ringbuffer ? { + return RingBufferStat{ + push_full_count: C.atomic_load_u32(&rb.push_full_count) + push_fail_count: C.atomic_load_u32(&rb.push_fail_count) + push_wait_prev_count: C.atomic_load_u32(&rb.push_wait_prev_count) + push_waiting_count: C.atomic_load_u32(&rb.push_waiting_count) + pop_empty_count: C.atomic_load_u32(&rb.pop_empty_count) + pop_fail_count: C.atomic_load_u32(&rb.pop_fail_count) + pop_wait_prev_count: C.atomic_load_u32(&rb.pop_wait_prev_count) + pop_waiting_count: C.atomic_load_u32(&rb.pop_waiting_count) + } + } + return RingBufferStat{} +} diff --git a/vlib/datatypes/lockfree/ringbuffer_test.v b/vlib/datatypes/lockfree/ringbuffer_test.v new file mode 100644 index 000000000..bffca6512 --- /dev/null +++ b/vlib/datatypes/lockfree/ringbuffer_test.v @@ -0,0 +1,373 @@ +import datatypes.lockfree +import time +import sync + +// Test basic push and pop operations +fn test_push_and_pop() { + mut r := lockfree.new_ringbuffer[int](2) + + r.push(3) + r.push(4) + + mut oldest_value := r.pop() + + assert oldest_value == 3 + + r.push(5) + + oldest_value = r.pop() + + assert oldest_value == 4 +} + +// Test clear functionality and empty state +fn test_clear_and_empty() { + mut r := lockfree.new_ringbuffer[int](4) + r.push(3) + r.push(4) + + oldest_value := r.pop() + assert oldest_value == 3 + + r.clear() + + assert r.is_empty() == true +} + +// Test capacity tracking and full state detection +fn test_capacity_and_is_full() { + mut r := lockfree.new_ringbuffer[int](4) + + assert r.capacity() == 4 + + r.push(3) + r.push(4) + r.push(5) + r.push(6) + + assert r.is_full() == true +} + +// Test occupied slots vs remaining capacity +fn test_occupied_and_remaining() { + mut r := lockfree.new_ringbuffer[int](4) + + r.push(3) + r.push(4) + + assert r.occupied() == r.remaining() +} + +// Test batch push/pop operations +fn test_push_and_pop_many() { + mut r := lockfree.new_ringbuffer[int](4) + a := [1, 2, 3, 4] + r.push_many(a) + + assert r.is_full() == true + + mut b := []int{len: 4} + r.pop_many(mut b) + + assert a == b +} + +// Test single producer single consumer mode +fn test_spsc_mode() { + println('===== Testing SPSC Mode =====') + mut rb := lockfree.new_ringbuffer[int](1024, mode: .spsc) + + // Basic push/pop functionality + assert rb.try_push(42) == true + assert rb.try_push(100) == true + assert rb.occupied() == 2 + + item1 := rb.try_pop() or { panic('Expected value') } + assert item1 == 42 + item2 := rb.try_pop() or { panic('Expected value') } + assert item2 == 100 + assert rb.is_empty() == true + + // Boundary capacity testing + for i in 0 .. 1024 { + assert rb.try_push(i) == true + } + assert rb.is_full() == true + assert rb.try_push(1024) == false + + for i in 0 .. 1024 { + item := rb.try_pop() or { panic('Expected value') } + assert item == i + } + assert rb.is_empty() == true + + println('SPSC basic tests passed') + + // Performance measurement + start := time.now() + mut producer := spawn fn (mut rb lockfree.RingBuffer[int]) { + for i in 0 .. 100000 { + rb.push(i) + } + }(mut rb) + + mut consumer := spawn fn (mut rb lockfree.RingBuffer[int]) { + for i in 0 .. 100000 { + item := rb.pop() + assert item == i + } + }(mut rb) + + producer.wait() + consumer.wait() + duration := time.since(start) + println('SPSC performance: ${duration} for 100k items') +} + +// Test single producer multiple consumers mode +fn test_spmc_mode() { + println('===== Testing SPMC Mode =====') + mut rb := lockfree.new_ringbuffer[int](1024, mode: .spmc) + mut wg := sync.new_waitgroup() + consumers := 4 + items_per_consumer := 25000 + total_items := consumers * items_per_consumer + + // Producer thread + spawn fn (mut rb lockfree.RingBuffer[int], total int) { + for i in 0 .. total { + rb.push(i) + } + }(mut rb, total_items) + + // Consumer threads + shared results := []int{cap: total_items} + + for i in 0 .. consumers { + wg.add(1) + spawn fn (id int, mut rb lockfree.RingBuffer[int], shared results []int, count int, mut wg sync.WaitGroup) { + for _ in 0 .. count { + item := rb.pop() + lock results { + results << item + } + } + wg.done() + }(i, mut rb, shared results, items_per_consumer, mut wg) + } + + wg.wait() + + // Result validation + lock results { + assert results.len == total_items + results.sort() + for i in 0 .. total_items { + assert results[i] == i + } + } + println('SPMC test passed with ${consumers} consumers') +} + +// Test multiple producers single consumer mode +fn test_mpsc_mode() { + println('===== Testing MPSC Mode =====') + mut rb := lockfree.new_ringbuffer[int](1024, mode: .mpsc) + mut wg := sync.new_waitgroup() + producers := 4 + items_per_producer := 25000 + total_items := producers * items_per_producer + + // Consumer thread + wg.add(1) + shared results := []int{cap: total_items} + spawn fn (mut rb lockfree.RingBuffer[int], shared results []int, total int, mut wg sync.WaitGroup) { + for _ in 0 .. total { + item := rb.pop() + lock results { + results << item + } + } + wg.done() + }(mut rb, shared results, total_items, mut wg) + + // Producer threads + for i in 0 .. producers { + wg.add(1) + spawn fn (mut rb lockfree.RingBuffer[int], start int, count int, mut wg sync.WaitGroup) { + for j in 0 .. count { + rb.push(start + j) + } + wg.done() + }(mut rb, i * items_per_producer, items_per_producer, mut wg) + } + + wg.wait() + + // Result validation + lock results { + assert results.len == total_items + results.sort() + for i in 0 .. total_items { + assert results[i] == i + } + } + println('MPSC test passed with ${producers} producers') +} + +// Test multiple producers multiple consumers mode +fn test_mpmc_mode() { + println('===== Testing MPMC Mode =====') + mut rb := lockfree.new_ringbuffer[int](1024, mode: .mpmc) + mut wg := sync.new_waitgroup() + producers := 4 + consumers := 4 + items_per_producer := 10000 + total_items := producers * items_per_producer + + // Result collection + shared results := []int{cap: total_items} + + // Producer threads + for i in 0 .. producers { + wg.add(1) + spawn fn (mut rb lockfree.RingBuffer[int], start int, count int, mut wg sync.WaitGroup) { + for j in 0 .. count { + rb.push(start + j) + } + wg.done() + }(mut rb, i * items_per_producer, items_per_producer, mut wg) + } + + // Consumer threads + for i in 0 .. consumers { + wg.add(1) + spawn fn (mut rb lockfree.RingBuffer[int], shared results []int, count int, mut wg sync.WaitGroup) { + for _ in 0 .. count { + item := rb.pop() + lock results { + results << item + } + } + wg.done() + }(mut rb, shared results, items_per_producer, mut wg) + } + + wg.wait() + + // Result validation + lock results { + assert results.len == total_items + results.sort() + for i in 0 .. total_items { + assert results[i] == i + } + } + println('MPMC test passed with ${producers} producers and ${consumers} consumers') +} + +// Test buffer clear functionality +fn test_clear_function() { + println('===== Testing Clear Function =====') + mut rb := lockfree.new_ringbuffer[int](1024, mode: .mpmc) + + // Fill buffer partially + for i in 0 .. 512 { + rb.push(i) + } + assert rb.occupied() == 512 + + // Clear buffer verification + assert rb.clear() == true + assert rb.is_empty() == true + assert rb.try_pop() == none + + // Concurrent clear test + mut wg := sync.new_waitgroup() + producers := 4 + items_per_producer := 1000 + + // Producer threads + for i in 0 .. producers { + wg.add(1) + spawn fn (mut rb lockfree.RingBuffer[int], id int, count int, mut wg sync.WaitGroup) { + for j in 0 .. count { + rb.push(id * 1000 + j) + } + wg.done() + }(mut rb, i, items_per_producer, mut wg) + } + + // Clear thread + spawn fn (mut rb lockfree.RingBuffer[int]) { + time.sleep(1 * time.millisecond) // Allow producers to start + for i in 0 .. 5 { + if rb.clear() { + println('Clear successful ${i}') + time.sleep(2 * time.millisecond) + } else { + println('Clear failed ${i}') + } + } + }(mut rb) + + wg.wait() + println('Clear function test passed') +} + +// Test edge case scenarios +fn test_edge_cases() { + println('===== Testing Edge Cases =====') + mut rb := lockfree.new_ringbuffer[int](4, mode: .spsc) + + // Empty buffer tests + assert rb.is_empty() == true + assert rb.try_pop() == none + assert rb.remaining() == 4 + + // Full buffer tests + assert rb.try_push(1) == true + assert rb.try_push(2) == true + assert rb.try_push(3) == true + assert rb.try_push(4) == true + assert rb.is_full() == true + assert rb.try_push(5) == false + assert rb.remaining() == 0 + + // Pop then push again + item := rb.try_pop() or { panic('Expected value') } + assert item == 1 + assert rb.try_push(5) == true + assert rb.is_full() == true + + // Clear and reuse + assert rb.clear() == true + assert rb.is_empty() == true + assert rb.try_push(10) == true + assert rb.try_pop() or { panic('Expected value') } == 10 + + println('Edge cases test passed') +} + +// Test batch operations functionality +fn test_batch_operations() { + println('===== Testing Batch Operations =====') + mut rb := lockfree.new_ringbuffer[int](1024, mode: .mpmc) + + // Batch push operation + items := []int{len: 100, init: index} + pushed := rb.try_push_many(items) + assert pushed == 100 + assert rb.occupied() == 100 + + // Batch pop operation + mut result := []int{len: 100} + popped := rb.try_pop_many(mut result) + assert popped == 100 + for i in 0 .. 100 { + assert result[i] == i + } + assert rb.is_empty() == true + + println('Batch operations test passed') +} -- 2.39.5