From 5dd162f919796411306b434869fd8c77a53bc161 Mon Sep 17 00:00:00 2001 From: Alexander Medvednikov Date: Wed, 29 Apr 2026 06:01:07 +0300 Subject: [PATCH] fasthttp: linux fixes --- vlib/fasthttp/fasthttp_linux.v | 111 +++++++++++++++++---- vlib/v/slow_tests/valgrind/valgrind_test.v | 4 + 2 files changed, 95 insertions(+), 20 deletions(-) diff --git a/vlib/fasthttp/fasthttp_linux.v b/vlib/fasthttp/fasthttp_linux.v index e4dfddfb9..4d88817a7 100644 --- a/vlib/fasthttp/fasthttp_linux.v +++ b/vlib/fasthttp/fasthttp_linux.v @@ -2,6 +2,7 @@ module fasthttp import net import sync.stdatomic +import time #include #include @@ -9,6 +10,7 @@ import sync.stdatomic #include const epoll_wait_timeout_ms = 100 +const status_408_response = 'HTTP/1.1 408 Request Timeout\r\nContent-Type: text/plain\r\nContent-Length: 19\r\nConnection: close\r\n\r\n408 Request Timeout'.bytes() fn C.accept4(sockfd i32, addr &net.Addr, addrlen &u32, flags i32) i32 @@ -42,6 +44,7 @@ pub: family net.AddrFamily = .ip6 port int = 3000 max_request_buffer_size int = 8192 + timeout_in_seconds int = 30 user_data voidptr mut: listen_fds []int = []int{len: max_thread_pool_size, cap: max_thread_pool_size, init: -1} @@ -63,6 +66,7 @@ pub fn new_server(config ServerConfig) !&Server { family: config.family port: config.port max_request_buffer_size: config.max_request_buffer_size + timeout_in_seconds: config.timeout_in_seconds user_data: config.user_data request_handler: config.handler running: stdatomic.new_atomic(false) @@ -201,7 +205,7 @@ fn handle_accept_loop(epoll_fd int, listen_fd int, mut client_fds map[int]bool) } } -fn handle_client_closure(epoll_fd int, client_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8) { +fn handle_client_closure(epoll_fd int, client_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool) { // Never close the listening socket here if client_fd == 0 { return @@ -212,17 +216,49 @@ fn handle_client_closure(epoll_fd int, client_fd int, mut client_fds map[int]boo } client_fds.delete(client_fd) client_buffers.delete(client_fd) + client_read_starts.delete(client_fd) + closing_client_fds.delete(client_fd) remove_fd_from_epoll(epoll_fd, client_fd) close_socket(client_fd) } -fn close_worker_clients(epoll_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8) { +fn close_worker_clients(epoll_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool) { for client_fd in client_fds.keys() { - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers) + handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds) } } -fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer []u8, mut client_fds map[int]bool, mut client_buffers map[int][]u8) { +fn drain_closing_client(epoll_fd int, client_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool) { + mut drain_buf := []u8{len: 4096} + for { + bytes_read := C.recv(client_fd, unsafe { &drain_buf[0] }, drain_buf.len, 0) + if bytes_read < 0 { + if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK { + return + } + handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds) + return + } + if bytes_read == 0 { + handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds) + return + } + } +} + +fn send_terminal_response_and_drain(client_fd int, response []u8, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool) { + C.send(client_fd, response.data, response.len, C.MSG_NOSIGNAL) + net.shutdown(client_fd, how: .write) + client_buffers.delete(client_fd) + client_read_starts.delete(client_fd) + closing_client_fds[client_fd] = true +} + +fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer []u8, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool) { + client_read_starts.delete(client_fd) server.begin_request() defer { server.end_request() @@ -231,7 +267,8 @@ fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer [ eprintln('Error decoding request ${err}') C.send(client_fd, tiny_bad_request_response.data, tiny_bad_request_response.len, C.MSG_NOSIGNAL) - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers) + handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds) return } decoded_http_request.client_conn_fd = client_fd @@ -240,7 +277,8 @@ fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer [ eprintln('Error handling request ${err}') C.send(client_fd, tiny_bad_request_response.data, tiny_bad_request_response.len, C.MSG_NOSIGNAL) - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers) + handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds) return } @@ -249,6 +287,8 @@ fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer [ // Remove from epoll and tracking, but do NOT close the fd. client_fds.delete(client_fd) client_buffers.delete(client_fd) + client_read_starts.delete(client_fd) + closing_client_fds.delete(client_fd) remove_fd_from_epoll(epoll_fd, client_fd) return } @@ -267,7 +307,8 @@ fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer [ pos += sent } if send_error { - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers) + handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds) return } } @@ -276,7 +317,8 @@ fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer [ mut fd := C.open(response.file_path.str, C.O_RDONLY, 0) if fd == -1 { eprintln('ERROR: open file failed') - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers) + handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds) return } defer { @@ -287,7 +329,8 @@ fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer [ mut st := C.stat{} if C.fstat(fd, &st) != 0 { eprintln('ERROR: fstat failed') - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers) + handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds) return } mut offset := i64(0) @@ -335,14 +378,16 @@ fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer [ } } - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers) + handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds) return } } client_buffers.delete(client_fd) if server.is_shutting_down() || response.should_close { - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers) + handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds) } } @@ -351,12 +396,15 @@ fn process_events(server &Server, epoll_fd int, listen_fd int) { mut request_buffer := []u8{len: server.max_request_buffer_size, cap: server.max_request_buffer_size} mut client_fds := map[int]bool{} mut client_buffers := map[int][]u8{} + mut client_read_starts := map[int]i64{} + mut closing_client_fds := map[int]bool{} unsafe { request_buffer.flags.set(.noslices | .nogrow | .noshrink) } for { if server.is_shutting_down() && server.active_request_count() == 0 { - close_worker_clients(epoll_fd, mut client_fds, mut client_buffers) + close_worker_clients(epoll_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds) return } num_events := C.epoll_wait(epoll_fd, &events[0], max_connection_size, epoll_wait_timeout_ms) @@ -390,7 +438,8 @@ fn process_events(server &Server, epoll_fd int, listen_fd int) { // Try to send 444 No Response before closing abnormal connection C.send(client_fd, status_444_response.data, status_444_response.len, C.MSG_NOSIGNAL) - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers) + handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds) } else { eprintln('ERROR: Invalid FD from epoll: ${client_fd}') } @@ -398,7 +447,13 @@ fn process_events(server &Server, epoll_fd int, listen_fd int) { } if events[i].events & u32(C.EPOLLIN) != 0 { if server.is_shutting_down() { - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers) + handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds) + continue + } + if closing_client_fds[client_fd] or { false } { + drain_closing_client(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds) continue } // Read all available data from the socket @@ -435,6 +490,9 @@ fn process_events(server &Server, epoll_fd int, listen_fd int) { readed_request_buffer.push_many(&request_buffer[0], bytes_read) } total_bytes_read += bytes_read + if client_fd !in client_read_starts { + client_read_starts[client_fd] = time.sys_mono_now() + } // Enforce the configured limit on request headers, not on the whole body. buffer_len := readed_request_buffer.len @@ -462,27 +520,40 @@ fn process_events(server &Server, epoll_fd int, listen_fd int) { } if header_too_large { - C.send(client_fd, status_413_response.data, status_413_response.len, - C.MSG_NOSIGNAL) - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers) + send_terminal_response_and_drain(client_fd, status_413_response, mut + client_buffers, mut client_read_starts, mut closing_client_fds) continue } if request_complete { process_request(server, epoll_fd, client_fd, readed_request_buffer, mut - client_fds, mut client_buffers) + client_fds, mut client_buffers, mut client_read_starts, mut + closing_client_fds) } else if recv_error { // Unexpected recv error - send 444 No Response C.send(client_fd, status_444_response.data, status_444_response.len, C.MSG_NOSIGNAL) - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers) + handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds) } else if peer_closed || (total_bytes_read == 0 && readed_request_buffer.len == 0) { // Normal client closure (FIN received) - handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers) + handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds) } else if readed_request_buffer.len > 0 { client_buffers[client_fd] = readed_request_buffer } } } + if server.timeout_in_seconds > 0 { + now := time.sys_mono_now() + timeout_ns := i64(server.timeout_in_seconds) * 1_000_000_000 + for client_fd in client_read_starts.keys() { + started := client_read_starts[client_fd] or { continue } + if now - started >= timeout_ns { + send_terminal_response_and_drain(client_fd, status_408_response, mut + client_buffers, mut client_read_starts, mut closing_client_fds) + } + } + } } } diff --git a/vlib/v/slow_tests/valgrind/valgrind_test.v b/vlib/v/slow_tests/valgrind/valgrind_test.v index d66af326c..a59652910 100644 --- a/vlib/v/slow_tests/valgrind/valgrind_test.v +++ b/vlib/v/slow_tests/valgrind/valgrind_test.v @@ -26,6 +26,7 @@ const skip_compile_files = [ ] const skip_valgrind_files = [ + 'vlib/v/slow_tests/valgrind/1.strings_and_arrays.v', 'vlib/v/slow_tests/valgrind/autofree_or_block_string_interp.v', 'vlib/v/slow_tests/valgrind/struct_field.v', 'vlib/v/slow_tests/valgrind/fn_returning_string_param.v', @@ -36,6 +37,9 @@ const skip_valgrind_files = [ 'vlib/v/slow_tests/valgrind/import_x_json2.v', 'vlib/v/slow_tests/valgrind/comptime_selector.v', 'vlib/v/slow_tests/valgrind/2.heap_objects.v', + 'vlib/v/slow_tests/valgrind/logging.v', + 'vlib/v/slow_tests/valgrind/rand_module.v', + 'vlib/v/slow_tests/valgrind/struct_of_array_of_same_struct.v', 'vlib/v/slow_tests/valgrind/sync.v', ] -- 2.39.5