v2 / vlib / datatypes / lockfree / ringbuffer.v
593 lines · 526 sloc · 16.81 KB · c216e59bfc85a863749963938d7131121114afd4
Raw
1module lockfree
2
3// This design is ported from the DPDK rte_ring library.
4// Source: https://doc.dpdk.org/guides/prog_guide/ring_lib.html
5
// RingBufferMode selects the producer/consumer concurrency model of a RingBuffer.
// The value is a two-bit flag set: bit 0 (0b01) means "multiple consumers" and
// bit 1 (0b10) means "multiple producers". is_multiple_producer and
// is_multiple_consumer test exactly these bits, so the values must not change.
pub enum RingBufferMode {
	spsc = 0b00 // one producer thread, one consumer thread (no CAS on either side)
	spmc = 0b01 // one producer, several concurrent consumers
	mpsc = 0b10 // several concurrent producers, one consumer
	mpmc = 0b11 // several producers and several consumers (the RingBufferParam default)
}
13
// RingBufferStat is a point-in-time copy of a RingBuffer's diagnostic counters,
// as returned by `RingBuffer.stat()`. Outside `-d debug_ringbuffer` builds,
// stat() returns this struct with every field left at zero.
pub struct RingBufferStat {
pub mut:
	// Producer side
	push_full_count      u32 // push attempts that found too little free space
	push_fail_count      u32 // push attempts that could not reserve space within the retry limit
	push_wait_prev_count u32 // pushes that waited a long time for earlier producers to publish
	push_waiting_count   u32 // producers currently waiting on earlier producers (live gauge)
	// Consumer side
	pop_empty_count      u32 // pop attempts that found too few published items
	pop_fail_count       u32 // pop attempts that could not reserve items within the retry limit
	pop_wait_prev_count  u32 // pops that waited a long time for earlier consumers to finish
	pop_waiting_count    u32 // consumers currently waiting on earlier consumers (live gauge)
}
26
// RingBufferParam holds the optional settings accepted by `new_ringbuffer`.
//
// `max_waiting_prod_cons`: in multi-producer (resp. multi-consumer) modes a new
// try_push (resp. try_pop) is rejected up front while more than this many
// producers (resp. consumers) are already waiting on their predecessors.
// A larger value may improve throughput, but with many producers/consumers it
// can cause severe contention.
@[params]
pub struct RingBufferParam {
pub:
	mode                  RingBufferMode = .mpmc // concurrency model; fully concurrent by default
	max_waiting_prod_cons int            = 1     // waiting producers/consumers tolerated before rejecting new operations
}
36
// RingBuffer is a bounded, lock-free FIFO queue ported from DPDK's rte_ring.
// The producer/consumer concurrency model is chosen at construction time via
// RingBufferMode (stored as a u32 in `mode`).
// @[noinit]: instances must be created with `new_ringbuffer`.
//
// The head/tail fields are free-running u32 counters that are never wrapped
// explicitly: a slot is addressed as `counter & mask`, and distances between
// counters are computed with wrapping unsigned subtraction.
@[noinit]
pub struct RingBuffer[T] {
mut:
	mode                  u32 // Current operation mode (from RingBufferMode)
	capacity              u32 // Total capacity (always power of two)
	mask                  u32 // Bitmask for index calculation (capacity - 1)
	clear_flag            u32 // Non-zero while clear() runs; try_push_many/try_pop_many return 0 while set
	max_waiting_prod_cons u32 // Max allowed waiting producers/consumers
	// The five u32 fields above occupy 20 bytes; pad0 fills the rest of the cache line.
	// NOTE(review): this only isolates cache lines if the struct itself starts on a
	// cache-line boundary, which nothing here enforces.
	pad0 [cache_line_size - 20]u8

	// Producer state (each index on its own cache line to prevent false sharing)
	prod_head u32 // Next index producers will reserve (CAS-updated in multi-producer mode)
	pad1      [cache_line_size - 4]u8 // Cache line padding
	prod_tail u32 // Items below this index are fully written and visible to consumers
	pad2      [cache_line_size - 4]u8 // Cache line padding

	// Consumer state (each index on its own cache line to prevent false sharing)
	cons_head u32 // Next index consumers will reserve (CAS-updated in multi-consumer mode)
	pad3      [cache_line_size - 4]u8 // Cache line padding
	cons_tail u32 // Slots below this index have been read and released back to producers
	pad4      [cache_line_size - 4]u8 // Cache line padding

	// Data storage area: `capacity` elements, addressed as `index & mask`
	slots []T

	// Diagnostic counters, reported by stat(). Most are only updated in
	// `-d debug_ringbuffer` builds; the two *_waiting_count fields are live gauges
	// that are also used for admission control in multi-producer/consumer modes.
	push_full_count      u32 // Count of full buffer encounters
	push_fail_count      u32 // Count of failed push attempts
	push_wait_prev_count u32 // Count of waits for previous producers
	push_waiting_count   u32 // Current number of waiting producers
	pop_empty_count      u32 // Count of empty buffer encounters
	pop_fail_count       u32 // Count of failed pop attempts
	pop_wait_prev_count  u32 // Count of waits for previous consumers
	pop_waiting_count    u32 // Current number of waiting consumers
}
74
// new_ringbuffer creates a new lock-free ring buffer.
// Note: The buffer capacity will be expanded to the next power of two
// for efficient modulo operations using bitwise AND.
// The actual capacity may be larger than the requested `size`.
// A `size` of 0 is treated as 1, so the buffer can always hold at least one item.
// Panics if the rounded-up capacity cannot be represented (it comes back as 0,
// or is too large to be used as an array length).
pub fn new_ringbuffer[T](size u32, param RingBufferParam) &RingBuffer[T] {
	// A zero capacity would make `mask` wrap to max_u32 and every push fail
	// forever, so always request at least one slot.
	requested := if size == 0 { u32(1) } else { size }
	// Ensure capacity is power of two for efficient modulo operations
	capacity := next_power_of_two(requested)
	if capacity == 0 || capacity > u32(max_i32) {
		panic('new_ringbuffer: requested size ${size} is too large')
	}
	mask := capacity - 1

	// Initialize data storage array
	slots := []T{len: int(capacity)}

	rb := &RingBuffer[T]{
		mode:                  u32(param.mode)
		max_waiting_prod_cons: u32(param.max_waiting_prod_cons)
		capacity:              capacity
		mask:                  mask
		slots:                 slots
	}

	// Disable Valgrind checking for performance
	$if valgrind ? {
		C.VALGRIND_HG_DISABLE_CHECKING(rb, sizeof(RingBuffer[T]))
	}
	return rb
}
101
// is_multiple_producer reports whether `mode` allows several concurrent producers,
// i.e. whether bit 1 (set for .mpsc and .mpmc) is on.
@[inline]
fn is_multiple_producer(mode u32) bool {
	multi_prod_bit := u32(0x02)
	return (mode & multi_prod_bit) == multi_prod_bit
}
107
// is_multiple_consumer reports whether `mode` allows several concurrent consumers,
// i.e. whether bit 0 (set for .spmc and .mpmc) is on.
@[inline]
fn is_multiple_consumer(mode u32) bool {
	multi_cons_bit := u32(0x01)
	return (mode & multi_cons_bit) == multi_cons_bit
}
113
// try_push attempts to enqueue a single `item` without blocking.
// Returns true if it was stored; false if try_push_many reserved nothing
// (buffer full, clear in progress, too many waiting producers, or CAS retries exhausted).
@[inline]
pub fn (mut rb RingBuffer[T]) try_push(item T) bool {
	single := [item]
	pushed := rb.try_push_many(single)
	return pushed == 1
}
119
// try_push_many tries to push all of `items` without blocking.
//
// All-or-nothing: either every item is written to consecutive slots and
// `items.len` is returned, or nothing is written and 0 is returned. It returns 0
// when `items` is empty, a clear() is in progress, too many producers are already
// waiting (multi-producer mode only), there is not enough free space (a batch
// larger than `capacity` is therefore always rejected), or prod_head could not be
// reserved within 10 CAS rounds.
//
// Protocol (DPDK style): reserve [old_head, new_head) by advancing prod_head,
// copy the items into the reserved slots, wait until every earlier producer has
// published (prod_tail == old_head), then publish by storing prod_tail = new_head.
@[direct_array_access]
pub fn (mut rb RingBuffer[T]) try_push_many(items []T) u32 {
	n := u32(items.len)
	if n == 0 {
		return 0
	}

	// Check if clear operation is in progress or too many producers are waiting
	if C.atomic_load_u32(&rb.clear_flag) != 0 || (is_multiple_producer(rb.mode)
		&& C.atomic_load_u32(&rb.push_waiting_count) > rb.max_waiting_prod_cons) {
		return 0
	}

	capacity := rb.capacity
	mut success := false
	mut attempts := 0
	mut old_head := u32(0)
	mut new_head := u32(0)

	// Attempt to reserve space in the buffer (bounded to 10 rounds so the call stays non-blocking)
	for !success && attempts < 10 {
		old_head = C.atomic_load_u32(&rb.prod_head)

		// Memory barrier for weak memory models
		$if !x64 && !x32 {
			C.atomic_thread_fence(C.memory_order_acquire)
		}

		// Calculate available space using unsigned arithmetic: capacity minus
		// (old_head - cons_tail); wrapping u32 math keeps this correct after the
		// free-running counters overflow.
		free_entries := capacity + C.atomic_load_u32(&rb.cons_tail) - old_head

		// Check if there's enough space
		if n > free_entries {
			$if debug_ringbuffer ? {
				C.atomic_fetch_add_u32(&rb.push_full_count, 1)
			}
			return 0
		}

		// Calculate new head position after adding items
		new_head = old_head + n
		if is_multiple_producer(rb.mode) {
			// Atomic compare-and-swap for multiple producers. The weak CAS may fail
			// (another producer won, or spuriously); the next round reloads prod_head.
			$if valgrind ? {
				C.ANNOTATE_HAPPENS_BEFORE(&rb.prod_head)
			}
			success = C.atomic_compare_exchange_weak_u32(&rb.prod_head, &old_head, new_head)
			$if valgrind ? {
				C.ANNOTATE_HAPPENS_AFTER(&rb.prod_head)
			}
		} else {
			// Direct update for single producer (no other writer of prod_head)
			rb.prod_head = new_head
			success = true
		}
		attempts++
	}

	// Exit if space reservation failed
	if !success {
		$if debug_ringbuffer ? {
			C.atomic_fetch_add_u32(&rb.push_fail_count, 1)
		}
		return 0
	}

	// Write data to the reserved slots; consumers cannot see them until prod_tail moves
	for i in 0 .. n {
		index := (old_head + i) & rb.mask
		$if valgrind ? {
			C.VALGRIND_HG_DISABLE_CHECKING(&rb.slots[index], sizeof(T))
			C.ANNOTATE_HAPPENS_BEFORE(&rb.slots[index])
		}
		rb.slots[index] = items[i]
		$if valgrind ? {
			C.ANNOTATE_HAPPENS_AFTER(&rb.slots[index])
		}
	}

	mut add_once := true
	mut backoff := 1
	if is_multiple_producer(rb.mode) {
		// Increment waiting producer count (checked by new callers for admission control)
		C.atomic_fetch_add_u32(&rb.push_waiting_count, 1)

		mut attempts_wait := 1
		// Wait for previous producers to complete their writes, so prod_tail only
		// ever advances over fully written slots, in reservation order
		for C.atomic_load_u32(&rb.prod_tail) != old_head {
			// Exponential backoff to reduce CPU contention
			for _ in 0 .. backoff {
				C.cpu_relax() // Low-latency pause instruction
			}
			backoff = int_min(backoff * 2, 1024)
			attempts_wait++
			$if debug_ringbuffer ? {
				// Record at most one "long wait" per push
				if attempts_wait > 100 && add_once {
					C.atomic_fetch_add_u32(&rb.push_wait_prev_count, 1)
					add_once = false
				}
			}
		}

		// Decrement waiting producer count
		C.atomic_fetch_sub_u32(&rb.push_waiting_count, 1)
	}

	// Make data visible to consumers
	$if valgrind ? {
		C.ANNOTATE_HAPPENS_BEFORE(&rb.prod_tail)
	}
	C.atomic_store_u32(&rb.prod_tail, new_head)
	$if valgrind ? {
		C.ANNOTATE_HAPPENS_AFTER(&rb.prod_tail)
	}
	return n
}
237
// try_pop attempts to dequeue one item without blocking.
// Returns the item, or `none` when try_pop_many reserved nothing (buffer empty,
// clear in progress, too many waiting consumers, or CAS retries exhausted).
@[inline]
pub fn (mut rb RingBuffer[T]) try_pop() ?T {
	mut slot := []T{len: 1}
	got := rb.try_pop_many(mut slot)
	if got != 1 {
		return none // Buffer empty
	}
	return slot[0]
}
247
// try_pop_many tries to pop exactly `items.len` items without blocking.
//
// All-or-nothing: either `items.len` consecutive items are copied into `items`
// (oldest first) and that count is returned, or nothing is consumed and 0 is
// returned. It returns 0 when `items` is empty, a clear() is in progress, too many
// consumers are already waiting (multi-consumer mode only), fewer than
// `items.len` items are published, or cons_head could not be reserved within
// 10 CAS rounds.
//
// Statistics counters are only updated in `-d debug_ringbuffer` builds,
// matching try_push_many.
@[direct_array_access]
pub fn (mut rb RingBuffer[T]) try_pop_many(mut items []T) u32 {
	n := u32(items.len)
	if n == 0 {
		return 0
	}

	// Check if clear operation is in progress or too many consumers are waiting
	if C.atomic_load_u32(&rb.clear_flag) != 0 || (is_multiple_consumer(rb.mode)
		&& C.atomic_load_u32(&rb.pop_waiting_count) > rb.max_waiting_prod_cons) {
		return 0
	}

	mut success := false
	mut attempts := 0
	mut old_head := u32(0)
	mut new_head := u32(0)

	// Attempt to reserve data for reading (bounded so the call stays non-blocking)
	for !success && attempts < 10 {
		old_head = C.atomic_load_u32(&rb.cons_head)
		// Memory barrier for weak memory models
		$if !x64 && !x32 {
			C.atomic_thread_fence(C.memory_order_acquire)
		}

		// Calculate available items to read: published but not yet reserved.
		// Wrapping u32 subtraction stays correct after the counters overflow.
		entries := C.atomic_load_u32(&rb.prod_tail) - old_head

		// Check if enough data is available
		if n > entries {
			$if debug_ringbuffer ? {
				C.atomic_fetch_add_u32(&rb.pop_empty_count, 1)
			}
			return 0
		}

		// Calculate new head position after reading
		new_head = old_head + n
		if is_multiple_consumer(rb.mode) {
			// Atomic compare-and-swap for multiple consumers; a failed (or spuriously
			// failing) weak CAS simply retries with a fresh cons_head
			$if valgrind ? {
				C.ANNOTATE_HAPPENS_BEFORE(&rb.cons_head)
			}
			success = C.atomic_compare_exchange_weak_u32(&rb.cons_head, &old_head, new_head)
			$if valgrind ? {
				C.ANNOTATE_HAPPENS_AFTER(&rb.cons_head)
			}
		} else {
			// Direct update for single consumer
			rb.cons_head = new_head
			success = true
		}
		attempts++
	}

	// Exit if data reservation failed
	if !success {
		// Counted only in debug builds, like push_fail_count in try_push_many; this
		// keeps an extra atomic RMW off the contended path in release builds.
		$if debug_ringbuffer ? {
			C.atomic_fetch_add_u32(&rb.pop_fail_count, 1)
		}
		return 0
	}

	// Read data from reserved slots
	for i in 0 .. n {
		index := (old_head + i) & rb.mask
		$if valgrind ? {
			C.ANNOTATE_HAPPENS_BEFORE(&rb.slots[index])
		}
		items[i] = rb.slots[index]
		$if valgrind ? {
			C.ANNOTATE_HAPPENS_AFTER(&rb.slots[index])
		}
	}

	mut add_once := true
	mut backoff := 1
	// For multiple consumers: wait for previous consumers to complete
	if is_multiple_consumer(rb.mode) {
		// Increment waiting consumer count
		C.atomic_fetch_add_u32(&rb.pop_waiting_count, 1)

		mut attempts_wait := 1
		// Wait for previous consumers to complete their reads, so cons_tail only
		// releases slots that have actually been copied out, in reservation order
		for C.atomic_load_u32(&rb.cons_tail) != old_head {
			// Exponential backoff to reduce CPU contention
			for _ in 0 .. backoff {
				C.cpu_relax() // Low-latency pause instruction
			}
			backoff = int_min(backoff * 2, 1024)
			attempts_wait++
			$if debug_ringbuffer ? {
				if attempts_wait > 100 && add_once {
					C.atomic_fetch_add_u32(&rb.pop_wait_prev_count, 1)
					add_once = false
				}
			}
		}

		// Decrement waiting consumer count
		C.atomic_fetch_sub_u32(&rb.pop_waiting_count, 1)
	}

	// Free up buffer space
	$if valgrind ? {
		C.ANNOTATE_HAPPENS_BEFORE(&rb.cons_tail)
	}
	C.atomic_store_u32(&rb.cons_tail, new_head)
	$if valgrind ? {
		C.ANNOTATE_HAPPENS_AFTER(&rb.cons_tail)
	}
	return n
}
361
// push enqueues `item`, blocking until it fits.
// Between failed attempts it spins with exponential backoff: 1, 2, 4, ...
// cpu_relax() calls, capped at 1024 per retry.
@[inline]
pub fn (mut rb RingBuffer[T]) push(item T) {
	mut spins := 1
	for !rb.try_push(item) {
		for _ in 0 .. spins {
			C.cpu_relax() // Pause before retry
		}
		spins = int_min(spins * 2, 1024)
	}
}
378
// pop dequeues one item, blocking until one is available.
// While the buffer is empty it spins with exponential backoff (capped at 1024
// cpu_relax() calls per retry).
@[inline]
pub fn (mut rb RingBuffer[T]) pop() T {
	// Delegate to pop_many with a one-element buffer. pop_many already implements
	// the same retry/backoff loop, the buffer is allocated once rather than on every
	// retry, and there is no unreachable `T(0)` fallback, which does not type-check
	// for non-numeric element types such as structs or strings.
	mut items := []T{len: 1}
	rb.pop_many(mut items)
	return items[0]
}
396
// push_many pushes every item of `items`, blocking until all have been enqueued.
//
// A batch that fits in the buffer (len <= capacity) is reserved and published as
// one contiguous block, exactly as try_push_many does. A larger batch can never
// be reserved in one go (try_push_many would reject it forever), so it is pushed,
// in order, as consecutive capacity-sized chunks instead.
// While the buffer is full it spins with exponential backoff (capped at 1024
// cpu_relax() calls per retry).
@[inline]
pub fn (mut rb RingBuffer[T]) push_many(items []T) {
	total := items.len
	if total == 0 {
		return
	}
	// Keep every chunk non-empty; a zero-length chunk would never advance `start`.
	limit := if rb.capacity == 0 { u32(1) } else { rb.capacity }
	mut start := 0
	for start < total {
		left := u32(total - start)
		take := if left > limit { limit } else { left }
		end := start + int(take)
		chunk := items[start..end]
		mut backoff := 1
		for rb.try_push_many(chunk) != take {
			// Exponential backoff when buffer is full
			for _ in 0 .. backoff {
				C.cpu_relax() // Pause when buffer is full
			}
			backoff = int_min(backoff * 2, 1024)
		}
		start = end
	}
}
414
// pop_many fills `result` with `result.len` items, blocking until all are popped.
//
// A request that fits in the buffer (len <= capacity) is satisfied by one
// contiguous reservation, exactly as try_pop_many does. A larger request can never
// be reserved in one go (try_pop_many would reject it forever), so it is filled,
// in order, one capacity-sized chunk at a time.
// While the buffer is empty it spins with exponential backoff (capped at 1024
// cpu_relax() calls per retry).
@[inline]
pub fn (mut rb RingBuffer[T]) pop_many(mut result []T) {
	total := result.len
	if total == 0 {
		return
	}
	// Keep every chunk non-empty even if capacity were ever 0.
	limit := if rb.capacity == 0 { u32(1) } else { rb.capacity }
	if u32(total) <= limit {
		// Common case: one reservation covers the whole request; pop straight into `result`.
		want := u32(total)
		mut backoff := 1
		for rb.try_pop_many(mut result) != want {
			// Exponential backoff when buffer is empty
			for _ in 0 .. backoff {
				C.cpu_relax() // Pause when buffer is empty
			}
			backoff = int_min(backoff * 2, 1024)
		}
		return
	}
	// Oversized request: try_pop_many always fills its whole argument, so pop each
	// chunk into a scratch buffer and copy it into place.
	mut scratch := []T{len: int(limit)}
	mut start := 0
	for start < total {
		left := u32(total - start)
		take := if left > limit { limit } else { left }
		if scratch.len != int(take) {
			// Only the final chunk can be shorter than `limit`.
			scratch = []T{len: int(take)}
		}
		mut backoff := 1
		for rb.try_pop_many(mut scratch) != take {
			for _ in 0 .. backoff {
				C.cpu_relax() // Pause when buffer is empty
			}
			backoff = int_min(backoff * 2, 1024)
		}
		for i in 0 .. scratch.len {
			result[start + i] = scratch[i]
		}
		start += scratch.len
	}
}
436
// is_empty reports whether the buffer currently holds no items.
// Under concurrent use this is only a snapshot and may be stale immediately.
@[inline]
pub fn (rb RingBuffer[T]) is_empty() bool {
	used := rb.occupied()
	return used == 0
}
442
// is_full reports whether every slot is occupied (snapshot under concurrency).
// Any count at or above capacity is treated as full.
@[inline]
pub fn (rb RingBuffer[T]) is_full() bool {
	return !(rb.occupied() < rb.capacity)
}
448
// capacity reports how many items the buffer can hold. This is the size passed
// to new_ringbuffer rounded up to a power of two, so it may exceed the request.
@[inline]
pub fn (rb RingBuffer[T]) capacity() u32 {
	total_slots := rb.capacity
	return total_slots
}
454
// occupied returns the number of occupied slots.
// Under concurrent use the value is a snapshot that may already be stale. It is
// clamped to `capacity`, so callers never observe an impossible count.
@[inline]
pub fn (rb RingBuffer[T]) occupied() u32 {
	// Memory barrier for weak memory models
	$if !x64 && !x32 {
		C.atomic_thread_fence(C.memory_order_acquire)
	}

	// Load cons_tail BEFORE prod_tail. Both counters only move forward (apart from
	// clear(), which resets them) and cons_tail never passes prod_tail, so a
	// prod_tail read taken afterwards is never behind this cons_tail snapshot.
	// Reading them in the opposite order lets consumers overtake a stale prod_tail,
	// and the difference then wraps to a value near max_u32.
	cons_tail := C.atomic_load_u32(&rb.cons_tail)
	prod_tail := C.atomic_load_u32(&rb.prod_tail)

	// Wrapping u32 subtraction yields the distance even after the counters overflow.
	used := prod_tail - cons_tail

	// Producers may have published more while we were reading, and a concurrent
	// clear() can make the pair inconsistent; never report more than capacity.
	if used > rb.capacity {
		return rb.capacity
	}
	return used
}
475
// remaining returns the number of free slots (snapshot under concurrency).
// Saturates at 0 so a racy occupancy reading can never underflow into a huge value.
@[inline]
pub fn (rb RingBuffer[T]) remaining() u32 {
	used := rb.occupied()
	if used >= rb.capacity {
		return 0
	}
	return rb.capacity - used
}
481
// clear resets the ring buffer to the empty state and zeroes its statistics counters.
//
// Steps:
//  1. Take `clear_flag` (up to 1000 CAS attempts). While it is set, new
//     try_push_many/try_pop_many calls return 0 immediately.
//  2. Wait, with exponential backoff, until in-flight producers have published
//     (prod_head == prod_tail). After 1000 rounds, force prod_tail up to prod_head.
//  3. Do the same for consumers (cons_head == cons_tail).
//  4. Reset all indices and statistics counters, then release the flag.
//
// Returns false if the flag could not be acquired (for example, another clear is
// running); otherwise true. Buffered items are discarded; slot contents are not zeroed.
//
// NOTE(review): an operation that passed its clear_flag check just before the flag
// was set may still be reserving/publishing while the indices are reset; callers
// should quiesce producers and consumers before clearing.
pub fn (mut rb RingBuffer[T]) clear() bool {
	mut clear_flag := u32(0)
	mut attempts := 0
	max_attempts := 1000

	// Acquire clear flag using atomic CAS
	for {
		if C.atomic_compare_exchange_weak_u32(&rb.clear_flag, &clear_flag, 1) {
			break
		}
		// A failed CAS stores the observed value into `clear_flag`; restore the expected 0.
		clear_flag = u32(0)
		C.cpu_relax()
		attempts++
		if attempts > max_attempts {
			return false // Failed to acquire clear flag
		}
	}

	// Wait for producers to finish with exponential backoff
	mut backoff := 1
	mut prod_wait := 0
	for {
		prod_head := C.atomic_load_u32(&rb.prod_head)
		prod_tail := C.atomic_load_u32(&rb.prod_tail)
		if prod_head == prod_tail {
			break
		}
		// Exponential backoff wait
		for _ in 0 .. backoff {
			C.cpu_relax()
		}
		backoff = int_min(backoff * 2, 1024)

		prod_wait++
		if prod_wait > max_attempts {
			// Force advance producer tail
			C.atomic_store_u32(&rb.prod_tail, prod_head)
			break
		}
	}

	// Wait for consumers to finish with exponential backoff
	backoff = 1
	mut cons_wait := 0
	for {
		cons_head := C.atomic_load_u32(&rb.cons_head)
		cons_tail := C.atomic_load_u32(&rb.cons_tail)

		if cons_head == cons_tail {
			break
		}

		// Exponential backoff wait
		for _ in 0 .. backoff {
			C.cpu_relax()
		}
		backoff = int_min(backoff * 2, 1024)

		cons_wait++
		if cons_wait > max_attempts {
			// Force advance consumer tail
			C.atomic_store_u32(&rb.cons_tail, cons_head)
			break
		}
	}

	// Reset all pointers to zero
	C.atomic_store_u32(&rb.prod_head, 0)
	C.atomic_store_u32(&rb.prod_tail, 0)
	C.atomic_store_u32(&rb.cons_head, 0)
	C.atomic_store_u32(&rb.cons_tail, 0)

	// Reset the statistics counters.
	// push_waiting_count / pop_waiting_count are live gauges, not statistics, and are
	// deliberately left alone. They are already 0 once the waits above settle
	// normally. If a wait was forced (or an operation raced the flag), a producer or
	// consumer still holds an increment and will atomic_fetch_sub it later. Zeroing
	// the gauge here would underflow it to ~max_u32, and every later multi-producer
	// or multi-consumer operation would then be rejected by the max_waiting check.
	C.atomic_store_u32(&rb.push_full_count, 0)
	C.atomic_store_u32(&rb.push_fail_count, 0)
	C.atomic_store_u32(&rb.push_wait_prev_count, 0)
	C.atomic_store_u32(&rb.pop_empty_count, 0)
	C.atomic_store_u32(&rb.pop_fail_count, 0)
	C.atomic_store_u32(&rb.pop_wait_prev_count, 0)
	// Release clear flag
	C.atomic_store_u32(&rb.clear_flag, 0)
	return true // Clear operation successful
}
567
// stat returns a snapshot of the ring buffer's diagnostic counters
// (see RingBufferStat for what each field counts).
//
// The counters are only reported when compiled with `-d debug_ringbuffer`; in
// other builds an all-zero RingBufferStat is returned. Each counter is loaded
// atomically on its own, so the snapshot as a whole is not atomic.
pub fn (rb RingBuffer[T]) stat() RingBufferStat {
	$if debug_ringbuffer ? {
		push_full := C.atomic_load_u32(&rb.push_full_count)
		push_fail := C.atomic_load_u32(&rb.push_fail_count)
		push_wait_prev := C.atomic_load_u32(&rb.push_wait_prev_count)
		push_waiting := C.atomic_load_u32(&rb.push_waiting_count)
		pop_empty := C.atomic_load_u32(&rb.pop_empty_count)
		pop_fail := C.atomic_load_u32(&rb.pop_fail_count)
		pop_wait_prev := C.atomic_load_u32(&rb.pop_wait_prev_count)
		pop_waiting := C.atomic_load_u32(&rb.pop_waiting_count)
		return RingBufferStat{
			push_full_count:      push_full
			push_fail_count:      push_fail
			push_wait_prev_count: push_wait_prev
			push_waiting_count:   push_waiting
			pop_empty_count:      pop_empty
			pop_fail_count:       pop_fail
			pop_wait_prev_count:  pop_wait_prev
			pop_waiting_count:    pop_waiting
		}
	}
	// Counters are not reported outside debug_ringbuffer builds.
	return RingBufferStat{}
}
594