From e632a84cd573bb05f3f72a0ae0cb9bbcaae404da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=84=9A=E8=80=85?= Date: Sat, 2 May 2026 11:35:36 +0800 Subject: [PATCH] veb: support reusable takeover connections (#27050) --- vlib/fasthttp/fasthttp.v | 14 +++- vlib/fasthttp/fasthttp_bsd.c.v | 39 ++++++--- vlib/fasthttp/fasthttp_linux.v | 30 ++++--- vlib/fasthttp/fasthttp_test.v | 90 +++++++++++++++++++++ vlib/veb/context.v | 40 ++++++--- vlib/veb/middleware.v | 4 +- vlib/veb/tests/persistent_connection_test.v | 62 ++++++++++++++ vlib/veb/veb.v | 12 +-- vlib/veb/veb_fasthttp.v | 22 +++-- 9 files changed, 265 insertions(+), 48 deletions(-) diff --git a/vlib/fasthttp/fasthttp.v b/vlib/fasthttp/fasthttp.v index 09cfea99b..adcd91e01 100644 --- a/vlib/fasthttp/fasthttp.v +++ b/vlib/fasthttp/fasthttp.v @@ -63,12 +63,18 @@ pub mut: user_data voidptr // User-defined context data } +pub enum ResponseTakeoverMode { + none + manual + reusable +} + pub struct HttpResponse { pub: - content []u8 - file_path string - takeover bool // if true, the connection fd is handed off to the caller and must not be closed by fasthttp - should_close bool // if true, close the connection after sending (Connection: close) + content []u8 + file_path string + takeover_mode ResponseTakeoverMode + should_close bool // if true, close the connection after sending (Connection: close) } // ServerConfig bundles the parameters needed to start a fasthttp server. diff --git a/vlib/fasthttp/fasthttp_bsd.c.v b/vlib/fasthttp/fasthttp_bsd.c.v index d7b807bfb..96ae1765a 100644 --- a/vlib/fasthttp/fasthttp_bsd.c.v +++ b/vlib/fasthttp/fasthttp_bsd.c.v @@ -319,18 +319,35 @@ fn process_request(server Server, kq int, c_ptr voidptr, mut clients map[int]voi return } - if resp.takeover { - // The handler has taken ownership of the connection. - // Remove from kqueue and tracking, but do NOT close the fd. - clients.delete(c.fd) - delete_event(kq, u64(c.fd), i16(C.EVFILT_READ), c) - delete_event(kq, u64(c.fd), i16(C.EVFILT_WRITE), c) - if c.request_active { - server.end_request() - c.request_active = false + match resp.takeover_mode { + .manual { + // The handler has taken ownership of the connection. + // Remove from kqueue and tracking, but do NOT close the fd. + clients.delete(c.fd) + delete_event(kq, u64(c.fd), i16(C.EVFILT_READ), c) + delete_event(kq, u64(c.fd), i16(C.EVFILT_WRITE), c) + if c.request_active { + server.end_request() + c.request_active = false + } + unsafe { free(c_ptr) } + return } - unsafe { free(c_ptr) } - return + .reusable { + set_nonblocking(c.fd) + c.read_len = 0 + c.read_extra.clear() + c.read_start = 0 + if c.request_active { + server.end_request() + c.request_active = false + } + if server.is_shutting_down() || resp.should_close { + close_conn(server, kq, c_ptr, mut clients) + } + return + } + .none {} } c.should_close = resp.should_close diff --git a/vlib/fasthttp/fasthttp_linux.v b/vlib/fasthttp/fasthttp_linux.v index 2ba824a40..ca75a4be5 100644 --- a/vlib/fasthttp/fasthttp_linux.v +++ b/vlib/fasthttp/fasthttp_linux.v @@ -282,15 +282,27 @@ fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer [ return } - if response.takeover { - // The handler has taken ownership of the connection. - // Remove from epoll and tracking, but do NOT close the fd. - client_fds.delete(client_fd) - client_buffers.delete(client_fd) - client_read_starts.delete(client_fd) - closing_client_fds.delete(client_fd) - remove_fd_from_epoll(epoll_fd, client_fd) - return + match response.takeover_mode { + .manual { + // The handler has taken ownership of the connection. + // Remove from epoll and tracking, but do NOT close the fd. + client_fds.delete(client_fd) + client_buffers.delete(client_fd) + client_read_starts.delete(client_fd) + closing_client_fds.delete(client_fd) + remove_fd_from_epoll(epoll_fd, client_fd) + return + } + .reusable { + set_blocking(client_fd, false) + client_buffers.delete(client_fd) + if server.is_shutting_down() || response.should_close { + handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut + client_read_starts, mut closing_client_fds) + } + return + } + .none {} } if response.content.len > 0 { diff --git a/vlib/fasthttp/fasthttp_test.v b/vlib/fasthttp/fasthttp_test.v index bb4cae2d0..b2fc94901 100644 --- a/vlib/fasthttp/fasthttp_test.v +++ b/vlib/fasthttp/fasthttp_test.v @@ -1,8 +1,12 @@ module fasthttp +import net import os +import time const fasthttp_example_exe = os.join_path(os.cache_dir(), 'fasthttp_example_test.exe') +const reusable_takeover_port = 13019 +const reusable_takeover_addr = '127.0.0.1:${reusable_takeover_port}' fn testsuite_begin() { // Clean up old example binary if it exists @@ -145,3 +149,89 @@ fn test_server_ipv4_ipv6_binding() { assert server_ipv4.port == 8081 assert server_ipv6.port == 8082 } + +fn test_response_takeover_mode_reusable_keeps_connection() { + $if linux || bsd { + mut server := new_server(ServerConfig{ + family: .ip + port: reusable_takeover_port + timeout_in_seconds: 2 + max_request_buffer_size: 8192 + handler: reusable_takeover_handler + }) or { + assert false, 'Failed to create server: ${err}' + return + } + handle := server.handle() + spawn server.run() + handle.wait_till_running(max_retries: 1000, retry_period_ms: 10) or { + assert false, 'server did not start: ${err}' + return + } + defer { + handle.shutdown(timeout: 5 * time.second) or {} + } + + mut conn := net.dial_tcp(reusable_takeover_addr)! + conn.set_read_timeout(2 * time.second) + conn.set_write_timeout(2 * time.second) + defer { + conn.close() or {} + } + + conn.write_string('GET /reus')! + time.sleep(50 * time.millisecond) + conn.write_string('able HTTP/1.1\r\nHost: ${reusable_takeover_addr}\r\n\r\n')! + reusable_response := read_until_contains(mut conn, '\r\n0\r\n\r\n')! + assert reusable_response.contains('manual') == true, reusable_response + assert reusable_response.contains('\r\n0\r\n\r\n') == true, reusable_response + + conn.write_string('GET /normal HTTP/1.1\r\nHost: ${reusable_takeover_addr}\r\n\r\n')! + normal_response := read_until_contains(mut conn, 'normal')! + assert normal_response.contains('normal') == true, normal_response + assert normal_response.contains('Connection: close') == false, normal_response + } $else { + return + } +} + +fn reusable_takeover_handler(req HttpRequest) !HttpResponse { + path := req.buffer[req.path.start..req.path.start + req.path.len].bytestr() + if path == '/reusable' { + body := 'manual' + send_raw_response(req.client_conn_fd, + 'HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n${body.len:x}\r\n${body}\r\n0\r\n\r\n') + return HttpResponse{ + takeover_mode: .reusable + } + } + return HttpResponse{ + content: 'HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nnormal'.bytes() + } +} + +fn send_raw_response(fd int, response string) { + $if linux { + C.send(fd, response.str, response.len, C.MSG_NOSIGNAL) + } $else $if bsd { + C.send(fd, response.str, response.len, send_flags) + } $else { + C.send(fd, response.str, response.len, 0) + } +} + +fn read_until_contains(mut conn net.TcpConn, marker string) !string { + mut raw := '' + mut buf := []u8{len: 1024} + for _ in 0 .. 16 { + n := conn.read(mut buf)! + if n <= 0 { + break + } + raw += buf[..n].bytestr() + if raw.contains(marker) { + break + } + } + return raw +} diff --git a/vlib/veb/context.v b/vlib/veb/context.v index 45e5a29ed..d098dcb70 100644 --- a/vlib/veb/context.v +++ b/vlib/veb/context.v @@ -14,6 +14,12 @@ enum ContextReturnType { file } +enum ContextTakeoverMode { + none + manual + reusable +} + pub enum RedirectType { found = int(http.Status.found) moved_permanently = int(http.Status.moved_permanently) @@ -41,10 +47,10 @@ mut: enable_static_compression bool static_compression_max_size int static_compression_mime_types []string - // if true the response should not be sent and the connection should be closed - // manually. - takeover bool - return_file string + // controls whether veb should automatically send the response or whether the handler + // takes over response writing. + takeover_mode ContextTakeoverMode + return_file string // raw client file descriptor, used by the fasthttp backend to create a TcpConn // on demand when takeover_conn() is called client_fd int = -1 @@ -99,7 +105,7 @@ pub fn (mut ctx Context) set_custom_header(key string, value string) ! { // send_response_to_client finalizes the response headers and sets Content-Type to `mimetype` // and the response body to `response` pub fn (mut ctx Context) send_response_to_client(mimetype string, response string) Result { - if ctx.done && !ctx.takeover { + if ctx.done && ctx.takeover_mode == .none { eprintln('[veb] a response cannot be sent twice over one connection') return Result{} } @@ -129,7 +135,7 @@ pub fn (mut ctx Context) send_response_to_client(mimetype string, response strin } // send veb's closing headers ctx.res.header.set(.server, 'veb') - if !ctx.takeover && ctx.client_wants_to_close { + if ctx.takeover_mode == .none && ctx.client_wants_to_close { // Only sent the `Connection: close` header when the client wants to close // the connection. This typically happens when the client only supports HTTP 1.0 ctx.res.header.set(.connection, 'close') @@ -139,7 +145,7 @@ pub fn (mut ctx Context) send_response_to_client(mimetype string, response strin if ctx.res.status_code == 0 { ctx.res.set_status(.ok) } - if ctx.takeover && ctx.conn != unsafe { nil } { + if ctx.takeover_mode != .none && ctx.conn != unsafe { nil } { fast_send_resp(mut ctx.conn, ctx.res) or {} } // result is send in `veb.v`, `handle_route` @@ -265,7 +271,7 @@ fn (mut ctx Context) send_file(content_type string, file_path string) Result { } } // Takeover mode: load file in memory (backward compatibility) - if ctx.takeover { + if ctx.takeover_mode != .none { data := os.read_file(file_path) or { eprintln('[veb] error while trying to read file: ${err.msg()}') return ctx.server_error('could not read resource') @@ -491,10 +497,14 @@ pub fn (mut ctx Context) set_content_type(mime string) { // This function is useful when you want to keep the connection alive and/or // send multiple responses. Like with the SSE. pub fn (mut ctx Context) takeover_conn() { - ctx.takeover = true + ctx.takeover_mode = .manual + ctx.prepare_takeover_conn() +} + +fn (mut ctx Context) prepare_takeover_conn() { if ctx.conn == unsafe { nil } && ctx.client_fd >= 0 { // For the fasthttp backend: create a TcpConn from the raw fd on demand. - // Set the fd to blocking mode — fasthttp uses non-blocking sockets, + // Set the fd to blocking mode. fasthttp uses non-blocking sockets, // but TcpConn.write() expects blocking behavior for reliable writes. $if !windows { flags := C.fcntl(ctx.client_fd, C.F_GETFL, 0) @@ -529,6 +539,16 @@ pub fn (mut ctx Context) takeover_conn() { } } +// takeover_conn_reusable prevents veb from automatically sending a response, +// but lets veb keep the connection in the read loop after the handler returns. +// The handler must write exactly one complete HTTP response with a clear body +// boundary, such as Content-Length or a final chunk for Transfer-Encoding: +// chunked. If the client asked to close the connection, veb will still close it. +pub fn (mut ctx Context) takeover_conn_reusable() { + ctx.takeover_mode = .reusable + ctx.prepare_takeover_conn() +} + // user_agent returns the user-agent header for the current client pub fn (ctx &Context) user_agent() string { return ctx.req.header.get(.user_agent) or { '' } diff --git a/vlib/veb/middleware.v b/vlib/veb/middleware.v index 6d1f18bf1..2a52fccaf 100644 --- a/vlib/veb/middleware.v +++ b/vlib/veb/middleware.v @@ -249,9 +249,9 @@ fn should_skip_compression(ctx Context) bool { if ctx.already_compressed { return true } - // Skip compression for files in streaming mode (takeover == false) + // Skip compression for files in streaming mode (no takeover) // Files in takeover mode (small files loaded in memory) are compressed - if ctx.return_type == .file && !ctx.takeover { + if ctx.return_type == .file && ctx.takeover_mode == .none { return true } return false diff --git a/vlib/veb/tests/persistent_connection_test.v b/vlib/veb/tests/persistent_connection_test.v index 4210c1e03..f64c4bc90 100644 --- a/vlib/veb/tests/persistent_connection_test.v +++ b/vlib/veb/tests/persistent_connection_test.v @@ -43,6 +43,13 @@ pub fn (mut app App) reset(mut ctx Context) veb.Result { return ctx.ok('') } +pub fn (mut app App) reusable(mut ctx Context) veb.Result { + ctx.takeover_conn_reusable() + body := 'manual:${app.counter}' + ctx.conn.write_string('HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n${body.len:x}\r\n${body}\r\n0\r\n\r\n') or {} + return veb.no_result() +} + fn testsuite_begin() { os.chdir(os.dir(@FILE))! mut app := &App{} @@ -79,6 +86,45 @@ fn test_conn_remains_intact() { conn.close() or {} } +fn test_conn_remains_intact_after_reusable_takeover() { + http.get('http://${localserver}/reset')! + + mut conn := simple_tcp_client()! + conn.write_string('GET /reusable HTTP/1.1\r\nUser-Agent: VTESTS\r\nAccept: */*\r\n\r\n')! + + mut response := read_until_contains(mut conn, '\r\n0\r\n\r\n')! + assert response.contains('Connection: close') == false, '`Connection` header should NOT be present!' + assert response.contains('manual:0') == true, 'read response: ${response}' + assert response.contains('\r\n0\r\n\r\n') == true, 'read response: ${response}' + + // send request again over the same connection + conn.write_string(default_request)! + + read := io.read_all(reader: conn)! + response = read.bytestr() + assert response.contains('Connection: close') == false, '`Connection` header should NOT be present!' + assert response.ends_with('${response_body}:1') == true, 'read response: ${response}' + + conn.close() or {} +} + +fn test_reusable_takeover_honors_connection_close() { + http.get('http://${localserver}/reset')! + + mut conn := simple_tcp_client()! + conn.write_string('GET /reusable HTTP/1.1\r\nUser-Agent: VTESTS\r\nAccept: */*\r\nConnection: close\r\n\r\n')! + + response := read_until_contains(mut conn, '\r\n0\r\n\r\n')! + assert response.contains('manual:0') == true, 'read response: ${response}' + assert response.contains('\r\n0\r\n\r\n') == true, 'read response: ${response}' + + conn.write_string(default_request) or {} + next := io.read_all(reader: conn) or { []u8{} } + assert next.len == 0 + + conn.close() or {} +} + fn test_support_http_1() { http.get('http://${localserver}/reset')! // HTTP 1.0 always closes the connection after each request, so the client must @@ -124,3 +170,19 @@ fn simple_tcp_client() !&net.TcpConn { client.set_write_timeout(tcp_w_timeout) return client } + +fn read_until_contains(mut conn net.TcpConn, marker string) !string { + mut raw := '' + mut buf := []u8{len: 1024} + for _ in 0 .. 16 { + n := conn.read(mut buf)! + if n <= 0 { + break + } + raw += buf[..n].bytestr() + if raw.contains(marker) { + break + } + } + return raw +} diff --git a/vlib/veb/veb.v b/vlib/veb/veb.v index 6a2d2298c..7148c7679 100644 --- a/vlib/veb/veb.v +++ b/vlib/veb/veb.v @@ -179,14 +179,14 @@ fn handle_ssl_connection[A, X](mut ssl_conn mbedtls.SSLConn, params &SslRequestP write_ssl_response(mut ssl_conn, http_400) or {} return } - if completed_context.takeover { - eprintln('[veb] HTTPS connections do not support `ctx.takeover_conn()` yet; closing the connection after this response.') + if completed_context.takeover_mode != .none { + eprintln('[veb] HTTPS connections do not support takeover connections yet; closing the connection after this response.') } write_ssl_context_response(mut ssl_conn, completed_context) or { eprintln('[veb] error sending HTTPS response: ${err}') return } - if completed_context.takeover + if completed_context.takeover_mode != .none || should_close_connection(completed_context.req, completed_context.res, completed_context.client_wants_to_close) { return } @@ -427,14 +427,14 @@ fn handle_route[A, X](mut app A, mut user_context X, url urllib.URL, host string // send only the headers, because if the response body is too big, TcpConn code will // actually block, because it has to wait for the socket to become ready to write. veb // will handle this case. - if !was_done && !user_context.Context.done && !user_context.Context.takeover { + if !was_done && !user_context.Context.done && user_context.Context.takeover_mode == .none { eprintln('[veb] handler for route "${url.path}" does not send any data!') // send response anyway so the connection won't block // fast_send_resp_header(mut user_context.conn, user_context.res) or {} - } else if !user_context.Context.takeover { + } else if user_context.Context.takeover_mode == .none { // fast_send_resp_header(mut user_context.conn, user_context.res) or {} } - // Context.takeover is set to true, so the user must close the connection and sent a response. + // Context.takeover_mode is set, so the user must send a response. } url_words := url.path.split('/').filter(it != '') diff --git a/vlib/veb/veb_fasthttp.v b/vlib/veb/veb_fasthttp.v index 8249fa7f9..c5d345db2 100644 --- a/vlib/veb/veb_fasthttp.v +++ b/vlib/veb/veb_fasthttp.v @@ -110,13 +110,23 @@ fn parallel_request_handler[A, X](req fasthttp.HttpRequest) !fasthttp.HttpRespon // Create and populate the `veb.Context`. completed_context := handle_request_and_route[A, X](mut global_app, req2, client_fd, params) - if completed_context.takeover { - // The handler has taken over the connection (e.g. for SSE or WebSocket). - // The response was already sent directly over ctx.conn. - // Tell fasthttp to hand off the fd without closing it. - return fasthttp.HttpResponse{ - takeover: true + match completed_context.takeover_mode { + .manual { + // The handler has taken over the connection (e.g. for SSE or WebSocket). + // The response was already sent directly over ctx.conn. + // Tell fasthttp to hand off the fd without closing it. + return fasthttp.HttpResponse{ + takeover_mode: .manual + } + } + .reusable { + return fasthttp.HttpResponse{ + takeover_mode: .reusable + should_close: should_close_connection(completed_context.req, + completed_context.res, completed_context.client_wants_to_close) + } } + .none {} } if completed_context.return_type == .file { -- 2.39.5