From 9edfec415743e272f44bb076a6513a82735ff776 Mon Sep 17 00:00:00 2001 From: Alexander Medvednikov Date: Wed, 15 Apr 2026 16:00:02 +0300 Subject: [PATCH] sync: fix read from closed channel behavior as in Go (fixes #26796) --- vlib/sync/channel_select_3_test.v | 17 +++++++++++ vlib/sync/channels.c.v | 50 ++++++++++++++++++++++++------- vlib/v/gen/c/cgen.v | 2 +- vlib/v/markused/walker.v | 1 + 4 files changed, 59 insertions(+), 11 deletions(-) diff --git a/vlib/sync/channel_select_3_test.v b/vlib/sync/channel_select_3_test.v index ef2b8eeaa..6b7609c3a 100644 --- a/vlib/sync/channel_select_3_test.v +++ b/vlib/sync/channel_select_3_test.v @@ -121,6 +121,23 @@ fn test_select_blocks() { assert is_open == false } +fn test_select_closed_receive_beats_timeout() { + ch := chan bool{} + ch.close() + mut got := true + mut timed_out := false + select { + got = <-ch { + assert got == false + } + 10 * time.millisecond { + timed_out = true + } + } + assert got == false + assert timed_out == false +} + fn test_select_else_skips_closed_buffered_receive() { ch := chan int{cap: 1} ch <- 1 diff --git a/vlib/sync/channels.c.v b/vlib/sync/channels.c.v index f944f4f5e..dbf7f59c2 100644 --- a/vlib/sync/channels.c.v +++ b/vlib/sync/channels.c.v @@ -547,20 +547,44 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState { return .success } -// Wait `timeout` on any of `channels[i]` until one of them can push (`is_push[i] = true`) or pop (`is_push[i] = false`) -// object referenced by `objrefs[i]`. `timeout = time.infinite` means wait unlimited time. `timeout <= 0` means return -// immediately if no transaction can be performed without waiting. -// return value: the index of the channel on which a transaction has taken place -// -1 if waiting for a transaction has exceeded timeout -// -2 if all channels are closed - +// channel_select waits `timeout` on any of `channels[i]` until one of them can +// push (`dir[i] == .push`) or pop (`dir[i] == .pop`) the object referenced by +// `objrefs[i]`. `timeout = time.infinite` means wait unlimited time. +// `timeout <= 0` means return immediately if no transaction can be performed +// without waiting. It returns the selected channel index, `-1` on timeout, and +// `-2` when all channels are closed. pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration) int { skip_closed_pop := timeout < 0 actual_timeout := if skip_closed_pop { time.Duration(0) } else { timeout } - return channel_select_priv(mut channels, dir, mut objrefs, actual_timeout, skip_closed_pop) + closed_pop_mode := if skip_closed_pop { + SelectClosedPopMode.skip + } else { + SelectClosedPopMode.closed + } + return channel_select_priv(mut channels, dir, mut objrefs, actual_timeout, closed_pop_mode) +} + +enum SelectClosedPopMode { + closed + ready + skip } -fn channel_select_priv(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration, skip_closed_pop bool) int { +// channel_select_lang is used by the language `select` implementation. +// Closed receive cases stay selectable for blocking/timed selects to match +// plain `<-ch` semantics, while `select ... else` still skips them. +fn channel_select_lang(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration) int { + skip_closed_pop := timeout < 0 + actual_timeout := if skip_closed_pop { time.Duration(0) } else { timeout } + closed_pop_mode := if skip_closed_pop { + SelectClosedPopMode.skip + } else { + SelectClosedPopMode.ready + } + return channel_select_priv(mut channels, dir, mut objrefs, actual_timeout, closed_pop_mode) +} + +fn channel_select_priv(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration, closed_pop_mode SelectClosedPopMode) int { $if debug_channels ? { assert channels.len == dir.len assert dir.len == objrefs.len @@ -602,7 +626,7 @@ fn channel_select_priv(mut channels []&Channel, dir []Direction, mut objrefs []v } stat := if dir[i] == .push { channels[i].try_push_priv(objrefs[i], true) - } else if skip_closed_pop { + } else if closed_pop_mode == .skip { channels[i].try_pop_select_priv(objrefs[i]) } else { channels[i].try_pop_priv(objrefs[i], true) @@ -610,6 +634,12 @@ fn channel_select_priv(mut channels []&Channel, dir []Direction, mut objrefs []v if stat == .success { event_idx = i break outer + } else if stat == .closed && dir[i] == .pop && closed_pop_mode == .ready { + unsafe { + C.memset(objrefs[i], 0, channels[i].objsize) + } + event_idx = i + break outer } else if stat == .closed { num_closed++ } diff --git a/vlib/v/gen/c/cgen.v b/vlib/v/gen/c/cgen.v index 68cf40e11..717eeec5f 100644 --- a/vlib/v/gen/c/cgen.v +++ b/vlib/v/gen/c/cgen.v @@ -7306,7 +7306,7 @@ fn (mut g Gen) select_expr(node ast.SelectExpr) { g.writeln('}));\n') } select_result := g.new_tmp_var() - g.write('int ${select_result} = sync__channel_select(&${chan_array}, ${directions_array}, &${objs_array}, ') + g.write('int ${select_result} = sync__channel_select_lang(&${chan_array}, ${directions_array}, &${objs_array}, ') if has_timeout { g.expr(timeout_expr) } else if has_else { diff --git a/vlib/v/markused/walker.v b/vlib/v/markused/walker.v index ad7ba5dfe..7fbddfd9b 100644 --- a/vlib/v/markused/walker.v +++ b/vlib/v/markused/walker.v @@ -2403,6 +2403,7 @@ fn (mut w Walker) mark_resource_dependencies() { if w.uses_channel { w.fn_by_name('sync.new_channel_st') w.fn_by_name('sync.channel_select') + w.fn_by_name('sync.channel_select_lang') } if w.uses_lock { w.mark_by_sym_name('sync.RwMutex') -- 2.39.5