From 3f3a7dc57cc717393daf40dc7d75cff0cf189b0f Mon Sep 17 00:00:00 2001 From: Alexander Medvednikov Date: Tue, 14 Apr 2026 02:05:28 +0300 Subject: [PATCH] veb: fasthttp + sse fixes --- examples/veb/server_sent_events/server.v | 16 ++--- vlib/fasthttp/fasthttp.v | 6 +- vlib/fasthttp/fasthttp_darwin.v | 80 +++++++++++++++++------- vlib/fasthttp/fasthttp_linux.v | 10 ++- vlib/net/net_nix.c.v | 2 +- vlib/v/checker/checker.v | 22 +++++++ vlib/veb/context.v | 27 +++++++- vlib/veb/middleware.v | 14 ++--- vlib/veb/sse/sse.v | 12 ++++ vlib/veb/veb_d_new_veb.v | 39 +++++++----- 10 files changed, 167 insertions(+), 61 deletions(-) diff --git a/examples/veb/server_sent_events/server.v b/examples/veb/server_sent_events/server.v index 40988e696..531033ee1 100644 --- a/examples/veb/server_sent_events/server.v +++ b/examples/veb/server_sent_events/server.v @@ -28,17 +28,19 @@ pub fn (mut app App) index() veb.Result { } fn (mut app App) sse() veb.Result { + ctx.takeover_conn() + spawn handle_sse_conn(mut ctx) + return veb.no_result() +} + +fn handle_sse_conn(mut ctx Context) { mut session := sse.start_connection(mut ctx.Context) - // Note: you can setup session.write_timeout and session.headers here - // session.start() or { return app.server_error(501) } - session.send_message(data: 'ok') or { return ctx.server_error_with_status(.not_implemented) } + session.send_message(data: 'ok') or { return } for { data := '{"time": "${time.now().str()}", "random_id": "${rand.ulid()}"}' - session.send_message(event: 'ping', data: data) or { - return ctx.server_error_with_status(.not_implemented) - } + session.send_message(event: 'ping', data: data) or { break } println('> sent event: ${data}') time.sleep(1 * time.second) } - return ctx.server_error_with_status(.not_implemented) + session.close() } diff --git a/vlib/fasthttp/fasthttp.v b/vlib/fasthttp/fasthttp.v index 26aa875a5..d5a720daa 100644 --- a/vlib/fasthttp/fasthttp.v +++ b/vlib/fasthttp/fasthttp.v @@ -65,8 +65,10 @@ pub mut: pub struct HttpResponse { pub: - content []u8 - file_path string + content []u8 + file_path string + takeover bool // if true, the connection fd is handed off to the caller and must not be closed by fasthttp + should_close bool // if true, close the connection after sending (Connection: close) } // ServerConfig bundles the parameters needed to start a fasthttp server. diff --git a/vlib/fasthttp/fasthttp_darwin.v b/vlib/fasthttp/fasthttp_darwin.v index d8a351d95..0b5010439 100644 --- a/vlib/fasthttp/fasthttp_darwin.v +++ b/vlib/fasthttp/fasthttp_darwin.v @@ -2,13 +2,15 @@ module fasthttp import net import sync.stdatomic -import time #include #include #include #include #include +#include + +fn C.signal(sig int, handler voidptr) voidptr const buf_size = max_connection_size const kqueue_max_events = 128 @@ -186,29 +188,10 @@ fn handle_write(server Server, kq int, c_ptr voidptr, mut clients map[int]voidpt close_conn(server, kq, c_ptr, mut clients) } -fn handle_read(server Server, kq int, c_ptr voidptr, mut clients map[int]voidptr) { +// process_request handles a complete HTTP request: decodes, calls the handler, +// sends the response (or handles takeover/sendfile). Runs in a spawned thread. +fn process_request(server Server, kq int, c_ptr voidptr, mut clients map[int]voidptr) { mut c := unsafe { &Conn(c_ptr) } - n := C.recv(c.fd, &c.read_buf[c.read_len], buf_size - c.read_len, 0) - if n <= 0 { - if n < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK { - C.send(c.fd, status_444_response.data, status_444_response.len, 0) - close_conn(server, kq, c_ptr, mut clients) - return - } - close_conn(server, kq, c_ptr, mut clients) - return - } - - c.read_len += int(n) - if c.read_len == 0 { - return - } - - if c.read_len >= buf_size { - C.send(c.fd, status_413_response.data, status_413_response.len, 0) - close_conn(server, kq, c_ptr, mut clients) - return - } mut req_buf := []u8{cap: c.read_len} unsafe { @@ -231,6 +214,20 @@ fn handle_read(server Server, kq int, c_ptr voidptr, mut clients map[int]voidptr return } + if resp.takeover { + // The handler has taken ownership of the connection. + // Remove from kqueue and tracking, but do NOT close the fd. + clients.delete(c.fd) + delete_event(kq, u64(c.fd), i16(C.EVFILT_READ), c) + delete_event(kq, u64(c.fd), i16(C.EVFILT_WRITE), c) + if c.request_active { + server.end_request() + c.request_active = false + } + unsafe { free(c_ptr) } + return + } + c.write_buf = resp.content.clone() if resp.file_path != '' { fd := C.open(resp.file_path.str, C.O_RDONLY) @@ -257,6 +254,33 @@ fn handle_read(server Server, kq int, c_ptr voidptr, mut clients map[int]voidptr close_conn(server, kq, c_ptr, mut clients) } +fn handle_read(server Server, kq int, c_ptr voidptr, mut clients map[int]voidptr) { + mut c := unsafe { &Conn(c_ptr) } + n := C.recv(c.fd, &c.read_buf[c.read_len], buf_size - c.read_len, 0) + if n <= 0 { + if n < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK { + C.send(c.fd, status_444_response.data, status_444_response.len, 0) + close_conn(server, kq, c_ptr, mut clients) + return + } + close_conn(server, kq, c_ptr, mut clients) + return + } + + c.read_len += int(n) + if c.read_len == 0 { + return + } + + if c.read_len >= buf_size { + C.send(c.fd, status_413_response.data, status_413_response.len, 0) + close_conn(server, kq, c_ptr, mut clients) + return + } + + process_request(server, kq, c_ptr, mut clients) +} + fn accept_clients(kq int, listen_fd int, mut clients map[int]voidptr) { for { client_fd := C.accept(listen_fd, unsafe { nil }, unsafe { nil }) @@ -268,6 +292,10 @@ fn accept_clients(kq int, listen_fd int, mut clients map[int]voidptr) { break } set_nonblocking(client_fd) + // Prevent SIGPIPE on writes to disconnected clients (macOS-specific). + // Without this, writing to a closed connection kills the entire server process. + nosigpipe_opt := 1 + C.setsockopt(client_fd, C.SOL_SOCKET, C.SO_NOSIGPIPE, &nosigpipe_opt, sizeof(int)) mut c := &Conn{ fd: client_fd user_data: unsafe { nil } @@ -298,6 +326,12 @@ fn (mut s Server) stop_accepting() { // run starts the server and enters the main event loop (Kqueue version). pub fn (mut s Server) run() ! { + // Ignore SIGPIPE process-wide. On macOS, writing to a disconnected socket + // sends SIGPIPE which terminates the process by default. SO_NOSIGPIPE is set + // per-socket on accept, but this is a safety net for any code path that might + // miss it (e.g. spawned SSE/WebSocket threads using TcpConn.write). + C.signal(C.SIGPIPE, C.SIG_IGN) + s.socket_fd = C.socket(s.family, net.SocketType.tcp, 0) if s.socket_fd < 0 { C.perror(c'socket') diff --git a/vlib/fasthttp/fasthttp_linux.v b/vlib/fasthttp/fasthttp_linux.v index 4cb7f4c3e..3c5a31ab6 100644 --- a/vlib/fasthttp/fasthttp_linux.v +++ b/vlib/fasthttp/fasthttp_linux.v @@ -241,6 +241,14 @@ fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer [ return } + if response.takeover { + // The handler has taken ownership of the connection. + // Remove from epoll and tracking, but do NOT close the fd. + client_fds.delete(client_fd) + remove_fd_from_epoll(epoll_fd, client_fd) + return + } + if response.content.len > 0 { mut send_error := false mut pos := 0 @@ -327,7 +335,7 @@ fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer [ } } - if server.is_shutting_down() { + if server.is_shutting_down() || response.should_close { handle_client_closure(epoll_fd, client_fd, mut client_fds) } } diff --git a/vlib/net/net_nix.c.v b/vlib/net/net_nix.c.v index 3bb2d0d59..3426ac443 100644 --- a/vlib/net/net_nix.c.v +++ b/vlib/net/net_nix.c.v @@ -20,7 +20,7 @@ pub fn error_code() int { fn init() { } -pub const msg_nosignal = 0x4000 +pub const msg_nosignal = C.MSG_NOSIGNAL pub const msg_dontwait = C.MSG_DONTWAIT pub const error_ewouldblock = int(C.EWOULDBLOCK) diff --git a/vlib/v/checker/checker.v b/vlib/v/checker/checker.v index b0f6b1fee..605346529 100644 --- a/vlib/v/checker/checker.v +++ b/vlib/v/checker/checker.v @@ -1992,6 +1992,28 @@ fn (mut c Checker) type_implements(typ ast.Type, interface_type ast.Type, pos to } } } + ast.GenericInst { + parent_sym := c.table.sym(ast.new_type(typ_sym.info.parent_idx)) + match parent_sym.info { + ast.Struct, ast.Interface, ast.SumType { + generic_names := parent_sym.info.generic_types.map(c.table.sym(it).name) + if rt := c.table.convert_generic_type(method.return_type, + generic_names, typ_sym.info.concrete_types) + { + method.return_type = rt + } + method.params = method.params.clone() + for mut param in method.params { + if pt := c.table.convert_generic_type(param.typ, generic_names, + typ_sym.info.concrete_types) + { + param.typ = pt + } + } + } + else {} + } + } else {} } msg := c.table.is_same_method(imethod, method) diff --git a/vlib/veb/context.v b/vlib/veb/context.v index d62053af3..a628aa1a0 100644 --- a/vlib/veb/context.v +++ b/vlib/veb/context.v @@ -45,6 +45,9 @@ mut: // manually. takeover bool return_file string + // raw client file descriptor, used by the fasthttp backend to create a TcpConn + // on demand when takeover_conn() is called + client_fd int = -1 // already_compressed indicates that the response body is already compressed (zstd/gzip) // and the compression middlewares should skip it already_compressed bool @@ -136,7 +139,7 @@ pub fn (mut ctx Context) send_response_to_client(mimetype string, response strin if ctx.res.status_code == 0 { ctx.res.set_status(.ok) } - if ctx.takeover { + if ctx.takeover && ctx.conn != unsafe { nil } { fast_send_resp(mut ctx.conn, ctx.res) or {} } // result is send in `veb.v`, `handle_route` @@ -488,6 +491,26 @@ pub fn (mut ctx Context) set_content_type(mime string) { // send multiple responses. Like with the SSE. pub fn (mut ctx Context) takeover_conn() { ctx.takeover = true + // For the fasthttp backend: create a TcpConn from the raw fd on demand + if ctx.conn == unsafe { nil } && ctx.client_fd >= 0 { + // Set the fd to blocking mode — fasthttp uses non-blocking sockets, + // but TcpConn.write() expects blocking behavior for reliable writes. + flags := C.fcntl(ctx.client_fd, C.F_GETFL, 0) + if flags != -1 { + C.fcntl(ctx.client_fd, C.F_SETFL, flags & ~C.O_NONBLOCK) + } + ctx.conn = &net.TcpConn{ + sock: net.TcpSocket{ + Socket: net.Socket{ + handle: ctx.client_fd + } + } + handle: ctx.client_fd + is_blocking: true + read_timeout: 30 * time.second + write_timeout: 30 * time.second + } + } } // user_agent returns the user-agent header for the current client @@ -510,7 +533,7 @@ pub fn (ctx &Context) ip() string { if ip.contains(',') { ip = ip.all_before(',') } - if ip == '' { + if ip == '' && ctx.conn != unsafe { nil } { ip = ctx.conn.peer_ip() or { '' } } return ip diff --git a/vlib/veb/middleware.v b/vlib/veb/middleware.v index e28ebe67d..ee0f2d228 100644 --- a/vlib/veb/middleware.v +++ b/vlib/veb/middleware.v @@ -141,7 +141,7 @@ enum ContentEncoding { zstd } -// send_compressed_response compresses the response body and sends it to the client. +// send_compressed_response compresses the response body and updates the response. // Returns true if compression should be skipped, false if compression was applied. fn send_compressed_response(mut ctx Context, encoding ContentEncoding) bool { compressed, encoding_name := match encoding { @@ -161,17 +161,15 @@ fn send_compressed_response(mut ctx Context, encoding ContentEncoding) bool { } } - // Take over the connection to have full control over the response - ctx.takeover_conn() - // Set HTTP headers for compressed content ctx.res.header.add(.content_encoding, encoding_name) ctx.res.header.set(.vary, 'Accept-Encoding') - ctx.res.header.set(.content_length, compressed.len.str()) - fast_send_resp_header(mut ctx.conn, ctx.res) or {} - ctx.conn.write_ptr(&u8(compressed.data), compressed.len) or {} - ctx.conn.close() or {} + // Replace the response body with the compressed data and update Content-Length. + // The normal response path will handle sending it. + ctx.res.body = compressed.bytestr() + ctx.res.header.set(.content_length, compressed.len.str()) + ctx.already_compressed = true return false } diff --git a/vlib/veb/sse/sse.v b/vlib/veb/sse/sse.v index 151857d95..d8da713ee 100644 --- a/vlib/veb/sse/sse.v +++ b/vlib/veb/sse/sse.v @@ -37,6 +37,12 @@ pub mut: // start an SSE connection pub fn start_connection(mut ctx veb.Context) &SSEConnection { + if ctx.conn == unsafe { nil } { + eprintln('[veb.sse] WARNING: SSE requires a direct TCP connection (ctx.conn) which is not available with this server backend. Use the default (picoev) backend instead of `-d new_veb` for SSE support.') + return &SSEConnection{ + conn: unsafe { nil } + } + } // Build and send HTTP response headers directly. // SSE responses must NOT include Content-Length since data is streamed. mut sb := strings.new_builder(256) @@ -56,6 +62,9 @@ pub fn start_connection(mut ctx veb.Context) &SSEConnection { // send_message sends a single message to the http client that listens for SSE. // It does not close the connection, so you can use it many times in a loop. pub fn (mut sse SSEConnection) send_message(message SSEMessage) ! { + if sse.conn == unsafe { nil } { + return error('SSE connection is not available (no TCP connection)') + } mut sb := strings.new_builder(512) if message.id != '' { sb.write_string('id: ${message.id}\n') @@ -75,6 +84,9 @@ pub fn (mut sse SSEConnection) send_message(message SSEMessage) ! { // send a 'close' event and close the tcp connection. pub fn (mut sse SSEConnection) close() { + if sse.conn == unsafe { nil } { + return + } sse.send_message(event: 'close', data: 'Closing the connection', retry: -1) or {} sse.conn.close() or {} } diff --git a/vlib/veb/veb_d_new_veb.v b/vlib/veb/veb_d_new_veb.v index 5144ad450..ae65de13f 100644 --- a/vlib/veb/veb_d_new_veb.v +++ b/vlib/veb/veb_d_new_veb.v @@ -56,6 +56,9 @@ pub fn run_new[A, X](mut global_app A, params RunParams) ! { } println('[veb] Running multi-threaded app on ${server_protocol(params)}://${startup_host(params)}:${params.port}/') flush_stdout() + $if A is BeforeAcceptApp { + global_app.before_accept_loop() + } server.run() or { panic(err) } } @@ -75,22 +78,28 @@ fn parallel_request_handler[A, X](req fasthttp.HttpRequest) !fasthttp.HttpRespon } // Create and populate the `veb.Context`. completed_context := handle_request_and_route[A, X](mut global_app, req2, client_fd, params) - // params.routes, params.controllers_sorted) - // Serialize the final `http.Response` into a byte array. + if completed_context.takeover { - eprintln('[veb] WARNING: ctx.takeover_conn() was called, but this is not supported by this server backend. The connection will be closed after this response.') + // The handler has taken over the connection (e.g. for SSE or WebSocket). + // The response was already sent directly over ctx.conn. + // Tell fasthttp to hand off the fd without closing it. + return fasthttp.HttpResponse{ + takeover: true + } } if completed_context.return_type == .file { return fasthttp.HttpResponse{ - content: completed_context.res.bytes() - file_path: completed_context.return_file + content: completed_context.res.bytes() + file_path: completed_context.return_file + should_close: completed_context.client_wants_to_close } } // The fasthttp server expects a complete response buffer to be returned. return fasthttp.HttpResponse{ - content: completed_context.res.bytes() + content: completed_context.res.bytes() + should_close: completed_context.client_wants_to_close } } // handle_request_and_route is a unified function that creates the context, @@ -117,17 +126,13 @@ fn handle_request_and_route[A, X](mut app A, req http.Request, _client_fd int, p host, _ := urllib.split_host_port(host_with_port) page_gen_start := if params.benchmark_page_generation { time.ticks() } else { 0 } mut ctx := &Context{ - req: req - page_gen_start: page_gen_start - // page_gen_start: time.ticks() - query: query - form: form - files: files - } - if connection_header := req.header.get(.connection) { - if connection_header.to_lower() == 'close' { - ctx.client_wants_to_close = true - } + req: req + page_gen_start: page_gen_start + client_fd: _client_fd + client_wants_to_close: true // fasthttp always closes connections after response + query: query + form: form + files: files } $if A is StaticApp { ctx.custom_mime_types = app.static_mime_types.clone() -- 2.39.5