From 984337de5dd73f6f90b3efd718f82f3a722dceee Mon Sep 17 00:00:00 2001 From: Alexander Medvednikov Date: Sun, 24 May 2026 17:04:56 +0300 Subject: [PATCH] fasthttp,veb: fix sendfile() failing on large static files (fix #27080) (#27223) --- vlib/fasthttp/fasthttp_linux.v | 477 +++++++++++++++++++++------- vlib/veb/tests/large_payload_test.v | 17 + 2 files changed, 372 insertions(+), 122 deletions(-) diff --git a/vlib/fasthttp/fasthttp_linux.v b/vlib/fasthttp/fasthttp_linux.v index 9655204e4..e5448eff8 100644 --- a/vlib/fasthttp/fasthttp_linux.v +++ b/vlib/fasthttp/fasthttp_linux.v @@ -98,6 +98,38 @@ fn set_blocking(fd int, blocking bool) { } } +// ClientWriteState carries the pending response bytes plus optional file data +// for a connection that could not be drained in a single non-blocking write. +// It lives in `client_write_states` until both the in-memory content and the +// file body have been fully transferred. While a state exists, the connection +// is also armed for EPOLLOUT so process_events can resume the write without +// blocking the worker. +struct ClientWriteState { +mut: + content []u8 + content_pos int + content_owned bool + file_fd int = -1 + file_len i64 + file_pos i64 + should_close bool + request_arena voidptr + request_active bool + start_ns i64 + // epollout_armed records whether we actually had to register EPOLLOUT for + // this connection. When set, complete_write performs a DEL+ADD on the fd to + // re-deliver any pipelined request bytes that arrived during the write as a + // fresh EPOLLIN edge (level-triggered semantics are not used here). + epollout_armed bool +} + +// drain_status describes the outcome of one try_drain_write() pass. +enum DrainStatus { + done // all bytes (content + file) have been sent + blocked // the kernel send buffer is full; come back on EPOLLOUT + failed // unrecoverable error or peer closed mid-transfer +} + fn close_socket(fd int) bool { ret := C.close(fd) if ret == -1 { @@ -205,7 +237,31 @@ fn handle_accept_loop(epoll_fd int, listen_fd int, mut client_fds map[int]bool) } } -fn handle_client_closure(epoll_fd int, client_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool) { +fn free_write_state(server &Server, client_fd int, mut client_write_states map[int]&ClientWriteState) { + mut state := client_write_states[client_fd] or { return } + if state.content_owned && state.content.cap > 0 { + unsafe { state.content.free() } + } + state.content = []u8{} + if state.file_fd != -1 { + C.close(state.file_fd) + state.file_fd = -1 + } + $if prealloc { + if state.request_arena != unsafe { nil } { + unsafe { prealloc_scope_free_after(state.request_arena) } + } + } + state.request_arena = unsafe { nil } + if state.request_active { + server.end_request() + state.request_active = false + } + client_write_states.delete(client_fd) + unsafe { free(state) } +} + +fn handle_client_closure(server &Server, epoll_fd int, client_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool, mut client_write_states map[int]&ClientWriteState) { // Never close the listening socket here if client_fd == 0 { return @@ -218,18 +274,19 @@ fn handle_client_closure(epoll_fd int, client_fd int, mut client_fds map[int]boo client_buffers.delete(client_fd) client_read_starts.delete(client_fd) closing_client_fds.delete(client_fd) + free_write_state(server, client_fd, mut client_write_states) remove_fd_from_epoll(epoll_fd, client_fd) close_socket(client_fd) } -fn close_worker_clients(epoll_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool) { +fn close_worker_clients(server &Server, epoll_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool, mut client_write_states map[int]&ClientWriteState) { for client_fd in client_fds.keys() { - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut - client_read_starts, mut closing_client_fds) + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) } } -fn drain_closing_client(epoll_fd int, client_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool) { +fn drain_closing_client(server &Server, epoll_fd int, client_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool, mut client_write_states map[int]&ClientWriteState) { mut drain_buf := []u8{len: 4096} for { bytes_read := C.recv(client_fd, unsafe { &drain_buf[0] }, drain_buf.len, 0) @@ -237,13 +294,13 @@ fn drain_closing_client(epoll_fd int, client_fd int, mut client_fds map[int]bool if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK { return } - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut - client_read_starts, mut closing_client_fds) + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) return } if bytes_read == 0 { - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut - client_read_starts, mut closing_client_fds) + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) return } } @@ -257,23 +314,175 @@ fn send_terminal_response_and_drain(client_fd int, response []u8, mut client_buf closing_client_fds[client_fd] = true } -fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer []u8, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool) { +// try_drain_write attempts to push the remaining bytes of `state.content` and +// (if present) `state.file_fd` to the socket in non-blocking mode. It returns +// `.done` once all data has been transferred, `.blocked` when the kernel send +// buffer is full (EAGAIN/EWOULDBLOCK), or `.failed` on any unrecoverable error. +// The caller is responsible for parking the state until EPOLLOUT fires (when +// `.blocked` is returned) and for cleaning up via `free_write_state` once +// `.done` or `.failed` is reached. +fn try_drain_write(client_fd int, mut state ClientWriteState) DrainStatus { + // 1. Drain in-memory content (response headers + possibly a body). + for state.content_pos < state.content.len { + remaining := state.content.len - state.content_pos + sent := C.send(client_fd, unsafe { &state.content[state.content_pos] }, remaining, + C.MSG_NOSIGNAL) + if sent > 0 { + state.content_pos += sent + continue + } + if sent < 0 { + errno_val := C.errno + if errno_val == C.EAGAIN || errno_val == C.EWOULDBLOCK { + return .blocked + } + if errno_val == C.EINTR { + continue + } + eprintln('ERROR: send() failed with errno=${errno_val}') + return .failed + } + // send() returning 0 on a non-blocking TCP socket is unusual; treat it as + // a failure rather than spin. + return .failed + } + + // 2. Drain the optional file body via sendfile(2). sendfile updates the + // offset pointer in place, so `state.file_pos` advances automatically. + if state.file_fd != -1 { + for state.file_pos < state.file_len { + remaining := state.file_len - state.file_pos + ssize := C.sendfile(client_fd, state.file_fd, &state.file_pos, usize(remaining)) + if ssize > 0 { + continue + } + if ssize == 0 { + // Short transfer: the file shrank between fstat() and now, or the + // peer closed. We have already promised the full Content-Length, + // so the response is now truncated -- closing the connection is + // the only way to keep keep-alive clients in sync. + eprintln('ERROR: sendfile() returned 0 with ${remaining} bytes still pending; closing connection to avoid desync') + return .failed + } + errno_val := C.errno + if errno_val == C.EAGAIN || errno_val == C.EWOULDBLOCK { + return .blocked + } + if errno_val == C.EINTR { + continue + } + match errno_val { + C.EBADF { + eprintln('ERROR: sendfile() EBADF: input fd or socket not open for required access (errno=${errno_val})') + } + C.EFAULT { + eprintln('ERROR: sendfile() EFAULT: bad address for offset (errno=${errno_val})') + } + C.EINVAL { + eprintln('ERROR: sendfile() EINVAL: invalid descriptor state or non-seekable input (errno=${errno_val})') + } + C.EIO { + eprintln('ERROR: sendfile() EIO: I/O error while reading input file (errno=${errno_val})') + } + C.ENOMEM { + eprintln('ERROR: sendfile() ENOMEM: insufficient kernel memory (errno=${errno_val})') + } + C.EOVERFLOW { + eprintln('ERROR: sendfile() EOVERFLOW: count exceeds file/socket limits (errno=${errno_val})') + } + C.ESPIPE { + eprintln('ERROR: sendfile() ESPIPE: input file not seekable with offset (errno=${errno_val})') + } + else { + eprintln('ERROR: sendfile() failed with errno=${errno_val}') + } + } + return .failed + } + // Done with the file -- close the fd eagerly so keep-alive doesn't hold it open. + C.close(state.file_fd) + state.file_fd = -1 + } + + return .done +} + +// arm_epollout switches the connection's epoll mask from EPOLLIN|EPOLLET to +// EPOLLIN|EPOLLOUT|EPOLLET so the next round of pending bytes is delivered as +// an EPOLLOUT event instead of blocking the worker. +fn arm_epollout(epoll_fd int, client_fd int) int { + mut ev := C.epoll_event{ + events: u32(C.EPOLLIN | C.EPOLLOUT | C.EPOLLET) + } + ev.data.fd = client_fd + return C.epoll_ctl(epoll_fd, C.EPOLL_CTL_MOD, client_fd, &ev) +} + +// complete_write tears down the per-fd write state after a successful drain, +// then either closes the connection or leaves it in keep-alive mode. +fn complete_write(server &Server, epoll_fd int, client_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool, mut client_write_states map[int]&ClientWriteState) { + state := client_write_states[client_fd] or { return } + should_close := state.should_close + epollout_was_armed := state.epollout_armed + free_write_state(server, client_fd, mut client_write_states) + client_buffers.delete(client_fd) + if server.is_shutting_down() || should_close { + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) + return + } + // Keep-alive: if EPOLLOUT was registered for the write, drop the fd from + // epoll and re-add it with the original EPOLLIN|EPOLLET mask. The re-add + // causes the kernel to fire a fresh edge for any pipelined request bytes + // that piled up in the recv buffer while we were write-blocked; a plain + // EPOLL_CTL_MOD would not generate that edge. + if epollout_was_armed { + remove_fd_from_epoll(epoll_fd, client_fd) + if add_fd_to_epoll(epoll_fd, client_fd, u32(C.EPOLLIN | C.EPOLLET)) == -1 { + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) + } + } +} + +// handle_write resumes a blocked write when EPOLLOUT fires for `client_fd`. +fn handle_write(server &Server, epoll_fd int, client_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool, mut client_write_states map[int]&ClientWriteState) { + mut state := client_write_states[client_fd] or { return } + status := try_drain_write(client_fd, mut state) + match status { + .done { + complete_write(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) + } + .blocked { + // Still blocked -- stay armed for the next EPOLLOUT. + } + .failed { + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) + } + } +} + +fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer []u8, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool, mut client_write_states map[int]&ClientWriteState) { mut request_arena := voidptr(unsafe { nil }) $if prealloc { request_arena = unsafe { prealloc_scope_begin() } } client_read_starts.delete(client_fd) server.begin_request() - defer { - server.end_request() - } + mut request_active := true + mut decoded_http_request := decode_http_request(request_buffer) or { eprintln('Error decoding request ${err}') C.send(client_fd, tiny_bad_request_response.data, tiny_bad_request_response.len, C.MSG_NOSIGNAL) end_request_arena_current_thread(request_arena) - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut - client_read_starts, mut closing_client_fds) + if request_active { + server.end_request() + } + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) return } $if trace_prealloc ? { @@ -286,18 +495,17 @@ fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer [ C.send(client_fd, tiny_bad_request_response.data, tiny_bad_request_response.len, C.MSG_NOSIGNAL) end_request_arena_current_thread(request_arena) - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut - client_read_starts, mut closing_client_fds) + if request_active { + server.end_request() + } + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) return } $if trace_prealloc ? { unsafe { prealloc_scope_checkpoint(c'fasthttp handler returned') } } response.attach_request_arena_if_empty(request_arena) - defer { - response.free_owned_content() - response.end_request_arena_current_thread() - } match response.takeover_mode { .manual { @@ -309,115 +517,108 @@ fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer [ closing_client_fds.delete(client_fd) remove_fd_from_epoll(epoll_fd, client_fd) response.abandon_request_arena_current_thread() + response.free_owned_content() + if request_active { + server.end_request() + } return } .reusable { set_blocking(client_fd, false) client_buffers.delete(client_fd) + response.free_owned_content() + response.end_request_arena_current_thread() + if request_active { + server.end_request() + } if server.is_shutting_down() || response.should_close { - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut - client_read_starts, mut closing_client_fds) + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) } return } .none {} } - if response.content.len > 0 { - mut send_error := false - mut pos := 0 - for pos < response.content.len { - sent := C.send(client_fd, unsafe { &response.content[pos] }, - response.content.len - pos, C.MSG_NOSIGNAL) - if sent <= 0 { - eprintln('ERROR: send() failed with errno=${C.errno}') - send_error = true - break - } - pos += sent - } - if send_error { - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut - client_read_starts, mut closing_client_fds) - return - } - } - + // Open the file (if any) before constructing the state so we can fail fast + // while we still own the request arena via response. + mut file_fd := -1 + mut file_len := i64(0) if response.file_path != '' { - mut fd := C.open(response.file_path.str, C.O_RDONLY, 0) + fd := C.open(response.file_path.str, C.O_RDONLY, 0) if fd == -1 { eprintln('ERROR: open file failed') - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut - client_read_starts, mut closing_client_fds) - return - } - defer { - if fd != -1 { - C.close(fd) + response.free_owned_content() + response.end_request_arena_current_thread() + if request_active { + server.end_request() } + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) + return } mut st := C.stat{} if C.fstat(fd, &st) != 0 { eprintln('ERROR: fstat failed') - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut - client_read_starts, mut closing_client_fds) + C.close(fd) + response.free_owned_content() + response.end_request_arena_current_thread() + if request_active { + server.end_request() + } + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) return } - mut offset := i64(0) - mut remaining := i64(st.st_size) - mut sf_retries := 0 - for remaining > 0 { - ssize := C.sendfile(client_fd, fd, &offset, usize(remaining)) - if ssize > 0 { - remaining -= i64(ssize) - sf_retries = 0 - continue - } - errno_val := C.errno - match errno_val { - C.EAGAIN, C.EWOULDBLOCK, C.EINTR { - if sf_retries < 3 { - sf_retries++ - continue - } - eprintln('ERROR: sendfile() transient failure after ${sf_retries} retries (errno=${errno_val})') - } - C.EBADF { - eprintln('ERROR: sendfile() EBADF: input fd or socket not open for required access (errno=${errno_val})') - } - C.EFAULT { - eprintln('ERROR: sendfile() EFAULT: bad address for offset (errno=${errno_val})') - } - C.EINVAL { - eprintln('ERROR: sendfile() EINVAL: invalid descriptor state or non-seekable input (errno=${errno_val})') - } - C.EIO { - eprintln('ERROR: sendfile() EIO: I/O error while reading input file (errno=${errno_val})') - } - C.ENOMEM { - eprintln('ERROR: sendfile() ENOMEM: insufficient kernel memory (errno=${errno_val})') - } - C.EOVERFLOW { - eprintln('ERROR: sendfile() EOVERFLOW: count exceeds file/socket limits (errno=${errno_val})') - } - C.ESPIPE { - eprintln('ERROR: sendfile() ESPIPE: input file not seekable with offset (errno=${errno_val})') - } - else { - eprintln('ERROR: sendfile() failed with errno=${errno_val}') - } + file_fd = fd + file_len = i64(st.st_size) + } + + // Move content + arena ownership into the per-fd state. After this point we + // must not touch the response's arena via end/free (it will be released by + // free_write_state once the drain completes). + content_bytes := response.take_or_clone_content() + arena_ptr := response.take_request_arena() + leave_request_arena_current_thread(arena_ptr) + + mut state := &ClientWriteState{ + content: content_bytes + content_pos: 0 + content_owned: true + file_fd: file_fd + file_len: file_len + file_pos: 0 + should_close: response.should_close + request_arena: arena_ptr + request_active: request_active + start_ns: time.sys_mono_now() + } + // From here on, the state owns end_request bookkeeping. + request_active = false + client_write_states[client_fd] = state + + status := try_drain_write(client_fd, mut state) + match status { + .done { + complete_write(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) + } + .blocked { + // Kernel send buffer is full. Park the state and resume on EPOLLOUT. + client_buffers.delete(client_fd) + client_read_starts.delete(client_fd) + if arm_epollout(epoll_fd, client_fd) == -1 { + eprintln('ERROR: epoll_ctl(MOD, EPOLLOUT) failed errno=${C.errno}') + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) + } else { + state.epollout_armed = true } - - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut - client_read_starts, mut closing_client_fds) - return } - } - - client_buffers.delete(client_fd) - if server.is_shutting_down() || response.should_close { - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut - client_read_starts, mut closing_client_fds) + .failed { + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) + } } } @@ -428,13 +629,14 @@ fn process_events(server &Server, epoll_fd int, listen_fd int) { mut client_buffers := map[int][]u8{} mut client_read_starts := map[int]i64{} mut closing_client_fds := map[int]bool{} + mut client_write_states := map[int]&ClientWriteState{} unsafe { request_buffer.flags.set(.noslices | .nogrow | .noshrink) } for { if server.is_shutting_down() && server.active_request_count() == 0 { - close_worker_clients(epoll_fd, mut client_fds, mut client_buffers, mut - client_read_starts, mut closing_client_fds) + close_worker_clients(server, epoll_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) return } num_events := C.epoll_wait(epoll_fd, &events[0], max_connection_size, epoll_wait_timeout_ms) @@ -465,25 +667,46 @@ fn process_events(server &Server, epoll_fd int, listen_fd int) { continue } if client_fd > 0 { - // Try to send 444 No Response before closing abnormal connection - C.send(client_fd, status_444_response.data, status_444_response.len, - C.MSG_NOSIGNAL) - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut - client_read_starts, mut closing_client_fds) + // Don't bother sending 444 if there is an in-flight write -- the + // peer is already disconnected and we want to tear down promptly. + if client_fd !in client_write_states { + C.send(client_fd, status_444_response.data, status_444_response.len, + C.MSG_NOSIGNAL) + } + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) } else { eprintln('ERROR: Invalid FD from epoll: ${client_fd}') } continue } + if events[i].events & u32(C.EPOLLOUT) != 0 { + handle_write(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) + // If both EPOLLIN and EPOLLOUT fired, we still want to drain the + // socket -- fall through to the EPOLLIN branch below. + if client_fd !in client_fds { + continue + } + } if events[i].events & u32(C.EPOLLIN) != 0 { if server.is_shutting_down() { - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut - client_read_starts, mut closing_client_fds) + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) continue } if closing_client_fds[client_fd] or { false } { - drain_closing_client(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut - client_read_starts, mut closing_client_fds) + drain_closing_client(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) + continue + } + if client_fd in client_write_states { + // A response is still being drained for this fd. Reading a new + // request now would overwrite the in-flight ClientWriteState and + // silently drop the response/file transfer. Defer until the write + // completes -- on keep-alive completion we re-arm EPOLLIN below so + // any bytes that arrived during the write get delivered as a fresh + // edge. continue } // Read all available data from the socket @@ -557,17 +780,17 @@ fn process_events(server &Server, epoll_fd int, listen_fd int) { if request_complete { process_request(server, epoll_fd, client_fd, readed_request_buffer, mut client_fds, mut client_buffers, mut client_read_starts, mut - closing_client_fds) + closing_client_fds, mut client_write_states) } else if recv_error { // Unexpected recv error - send 444 No Response C.send(client_fd, status_444_response.data, status_444_response.len, C.MSG_NOSIGNAL) - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut - client_read_starts, mut closing_client_fds) + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) } else if peer_closed || (total_bytes_read == 0 && readed_request_buffer.len == 0) { // Normal client closure (FIN received) - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut - client_read_starts, mut closing_client_fds) + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) } else if readed_request_buffer.len > 0 { client_buffers[client_fd] = readed_request_buffer } @@ -583,6 +806,16 @@ fn process_events(server &Server, epoll_fd int, listen_fd int) { client_buffers, mut client_read_starts, mut closing_client_fds) } } + // Sweep write-side stalls: a client that armed EPOLLOUT but never + // drains within `timeout_in_seconds` gets the connection torn down so + // the worker isn't pinned waiting for a dead peer. + for client_fd in client_write_states.keys() { + state := client_write_states[client_fd] or { continue } + if state.start_ns > 0 && now - state.start_ns >= timeout_ns { + handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds, mut client_write_states) + } + } } } } diff --git a/vlib/veb/tests/large_payload_test.v b/vlib/veb/tests/large_payload_test.v index 031c509ec..fd2fd71e5 100644 --- a/vlib/veb/tests/large_payload_test.v +++ b/vlib/veb/tests/large_payload_test.v @@ -121,6 +121,23 @@ fn test_sendfile() { assert x.body.len == veb.max_write * 10 } +// Regression test for https://github.com/vlang/v/issues/27080: +// sendfile() returned EAGAIN once the kernel send buffer filled and +// the server gave up after 3 tight-loop retries. With proper poll() +// based waiting, files larger than the socket send buffer must still +// transfer in full. +fn test_sendfile_large_payload() { + // 8 MiB is well above the typical SO_SNDBUF (~200 KiB on Linux), + // so a single sendfile() call cannot drain the whole file. + size := 8 * 1024 * 1024 + mut buf := []u8{len: size, init: `b`} + os.write_file(tmp_file, buf.bytestr())! + + x := http.get('${localserver}/file')! + + assert x.body.len == size +} + fn testsuite_end() { os.rm(tmp_file)! } -- 2.39.5