From 184883b418c7ec2febd5d4def70259ad81f7bb4b Mon Sep 17 00:00:00 2001 From: Alexander Medvednikov Date: Sat, 18 Apr 2026 01:15:05 +0300 Subject: [PATCH] veb: use fasthttp by default --- vlib/fasthttp/request_parser.v | 4 +- vlib/veb/chunked_request_body_test.v | 33 - vlib/veb/context.v | 2 +- vlib/veb/controller_static_regression_test.v | 28 +- vlib/veb/server_d_new_veb.v | 44 - vlib/veb/server_notd_new_veb.v | 20 - vlib/veb/sse/sse.v | 2 +- vlib/veb/tests/graceful_shutdown_test.v | 191 ++--- vlib/veb/tests/large_payload_test.v | 2 +- vlib/veb/tests/parallel_event_loop_test.v | 138 --- vlib/veb/tests/veb_test.v | 12 - vlib/veb/veb.v | 97 +-- vlib/veb/veb_d_new_veb.v | 163 ---- vlib/veb/veb_picoev.v | 838 ------------------- 14 files changed, 106 insertions(+), 1468 deletions(-) delete mode 100644 vlib/veb/chunked_request_body_test.v delete mode 100644 vlib/veb/server_d_new_veb.v delete mode 100644 vlib/veb/server_notd_new_veb.v delete mode 100644 vlib/veb/tests/parallel_event_loop_test.v delete mode 100644 vlib/veb/veb_d_new_veb.v delete mode 100644 vlib/veb/veb_picoev.v diff --git a/vlib/fasthttp/request_parser.v b/vlib/fasthttp/request_parser.v index a0a32be36..abbf50f0d 100644 --- a/vlib/fasthttp/request_parser.v +++ b/vlib/fasthttp/request_parser.v @@ -1,8 +1,8 @@ module fasthttp const empty_space = u8(` `) -const cr_char = u8(`\r`) -const lf_char = u8(`\n`) +const cr_char = u8(0x0d) +const lf_char = u8(0x0a) // libc memchr is AVX2-accelerated via glibc IFUNC @[inline] diff --git a/vlib/veb/chunked_request_body_test.v b/vlib/veb/chunked_request_body_test.v deleted file mode 100644 index fae745fac..000000000 --- a/vlib/veb/chunked_request_body_test.v +++ /dev/null @@ -1,33 +0,0 @@ -module veb - -$if !new_veb ? { - fn test_decode_chunked_request_body() { - encoded := '4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n' - decoded := decode_chunked_request_body(encoded) or { panic(err) } - assert decoded == 'Wikipedia' - } - - fn test_decode_chunked_request_body_with_extensions_and_trailers() { - encoded := '4;foo=bar\r\nWiki\r\n0\r\nX-Trace: true\r\n\r\n' - decoded := decode_chunked_request_body(encoded) or { panic(err) } - assert decoded == 'Wiki' - } - - fn test_decode_chunked_request_body_with_incomplete_data() { - encoded := '4\r\nWiki\r\n5\r\nped' - decode_chunked_request_body(encoded) or { - assert err is IncompleteChunkedRequestBodyError - return - } - assert false - } - - fn test_decode_chunked_request_body_with_invalid_data() { - encoded := 'z\r\nWiki\r\n0\r\n\r\n' - decode_chunked_request_body(encoded) or { - assert err.msg() == 'invalid chunk size line' - return - } - assert false - } -} diff --git a/vlib/veb/context.v b/vlib/veb/context.v index 9b31ba547..45e5a29ed 100644 --- a/vlib/veb/context.v +++ b/vlib/veb/context.v @@ -514,7 +514,7 @@ pub fn (mut ctx Context) takeover_conn() { write_timeout: 30 * time.second } } else if ctx.conn != unsafe { nil } { - // For the picoev backend: the connection exists but uses non-blocking I/O. + // The connection exists but uses non-blocking I/O. // Switch to blocking mode for reliable SSE writes. fd := ctx.conn.handle $if !windows { diff --git a/vlib/veb/controller_static_regression_test.v b/vlib/veb/controller_static_regression_test.v index b3e57c5c0..11cb9025c 100644 --- a/vlib/veb/controller_static_regression_test.v +++ b/vlib/veb/controller_static_regression_test.v @@ -1,6 +1,5 @@ module veb -import net import net.http import net.urllib import os @@ -104,25 +103,12 @@ fn regression_handle_app_request(mut app AppStaticRegressionApp, req http.Reques panic(err) } - $if new_veb ? { - params := RequestParams{ - global_app: unsafe { voidptr(&app) } - controllers_sorted: controllers_sorted - routes: &routes - benchmark_page_generation: false - } - return handle_request_and_route[AppStaticRegressionApp, RegressionContext](mut app, req, 0, - params) - } $else { - params := &RequestParams{ - global_app: unsafe { voidptr(&app) } - controllers: controllers_sorted - routes: &routes - timeout_in_seconds: 2 - } - mut conn := net.TcpConn{} - return handle_request[AppStaticRegressionApp, RegressionContext](mut conn, req, params) or { - panic(err) - } + params := RequestParams{ + global_app: unsafe { voidptr(&app) } + controllers_sorted: controllers_sorted + routes: &routes + benchmark_page_generation: false } + return handle_request_and_route[AppStaticRegressionApp, RegressionContext](mut app, req, 0, + params) } diff --git a/vlib/veb/server_d_new_veb.v b/vlib/veb/server_d_new_veb.v deleted file mode 100644 index 2267b920a..000000000 --- a/vlib/veb/server_d_new_veb.v +++ /dev/null @@ -1,44 +0,0 @@ -module veb - -import fasthttp - -@[heap] -pub struct Server { - handle fasthttp.ServerHandle - lifecycle_control bool -} - -fn new_server_with_lifecycle(handle fasthttp.ServerHandle) &Server { - return &Server{ - handle: handle - lifecycle_control: true - } -} - -fn new_server_without_lifecycle() &Server { - return &Server{} -} - -fn (s &Server) ensure_lifecycle_control() ! { - if !s.lifecycle_control { - return error('veb server lifecycle control requires `-d new_veb` without SSL') - } -} - -// wait_till_running waits until the server starts accepting requests. -pub fn (s &Server) wait_till_running(params WaitTillRunningParams) !int { - s.ensure_lifecycle_control()! - return s.handle.wait_till_running(fasthttp.WaitTillRunningParams{ - max_retries: params.max_retries - retry_period_ms: params.retry_period_ms - })! -} - -// shutdown gracefully stops accepting new requests and waits for in-flight requests to finish. -pub fn (s &Server) shutdown(params ShutdownParams) ! { - s.ensure_lifecycle_control()! - s.handle.shutdown(fasthttp.ShutdownParams{ - timeout: params.timeout - retry_period_ms: params.retry_period_ms - })! -} diff --git a/vlib/veb/server_notd_new_veb.v b/vlib/veb/server_notd_new_veb.v deleted file mode 100644 index 0ef9837c9..000000000 --- a/vlib/veb/server_notd_new_veb.v +++ /dev/null @@ -1,20 +0,0 @@ -module veb - -@[heap] -pub struct Server {} - -fn new_server_without_lifecycle() &Server { - return &Server{} -} - -// wait_till_running waits until the server starts accepting requests. -pub fn (s &Server) wait_till_running(params WaitTillRunningParams) !int { - _ = params - return error('veb server lifecycle control requires `-d new_veb` without SSL') -} - -// shutdown gracefully stops accepting new requests and waits for in-flight requests to finish. -pub fn (s &Server) shutdown(params ShutdownParams) ! { - _ = params - return error('veb server lifecycle control requires `-d new_veb` without SSL') -} diff --git a/vlib/veb/sse/sse.v b/vlib/veb/sse/sse.v index d8da713ee..b1aac920e 100644 --- a/vlib/veb/sse/sse.v +++ b/vlib/veb/sse/sse.v @@ -38,7 +38,7 @@ pub mut: // start an SSE connection pub fn start_connection(mut ctx veb.Context) &SSEConnection { if ctx.conn == unsafe { nil } { - eprintln('[veb.sse] WARNING: SSE requires a direct TCP connection (ctx.conn) which is not available with this server backend. Use the default (picoev) backend instead of `-d new_veb` for SSE support.') + eprintln('[veb.sse] WARNING: SSE requires a direct TCP connection (ctx.conn) which is not available. Use `ctx.takeover_conn()` before starting an SSE connection.') return &SSEConnection{ conn: unsafe { nil } } diff --git a/vlib/veb/tests/graceful_shutdown_test.v b/vlib/veb/tests/graceful_shutdown_test.v index 90bb4579f..87f611f61 100644 --- a/vlib/veb/tests/graceful_shutdown_test.v +++ b/vlib/veb/tests/graceful_shutdown_test.v @@ -1,125 +1,116 @@ -// vfmt off -fn test_stub() { - $if !new_veb ? { - return - } -} +import net +import net.http +import time +import veb -$if new_veb ? { - import net - import net.http - import time - import veb +const graceful_shutdown_host = '127.0.0.1' - const graceful_shutdown_host = '127.0.0.1' - - struct GracefulShutdownContext { - veb.Context - } +struct GracefulShutdownContext { + veb.Context +} - struct GracefulShutdownApp { - mut: - server &veb.Server = unsafe { nil } - slow_started chan bool = chan bool{cap: 1} - } +struct GracefulShutdownApp { +mut: + server &veb.Server = unsafe { nil } + slow_started chan bool = chan bool{cap: 1} +} - pub fn (mut app GracefulShutdownApp) init_server(server &veb.Server) { - app.server = server - } +pub fn (mut app GracefulShutdownApp) init_server(server &veb.Server) { + app.server = server +} - pub fn (mut app GracefulShutdownApp) index(mut ctx GracefulShutdownContext) veb.Result { - return ctx.text('ready') - } +pub fn (mut app GracefulShutdownApp) index(mut ctx GracefulShutdownContext) veb.Result { + return ctx.text('ready') +} - @['/slow'] - pub fn (mut app GracefulShutdownApp) slow(mut ctx GracefulShutdownContext) veb.Result { - app.slow_started <- true - time.sleep(300 * time.millisecond) - return ctx.text('slow done') - } +@['/slow'] +pub fn (mut app GracefulShutdownApp) slow(mut ctx GracefulShutdownContext) veb.Result { + app.slow_started <- true + time.sleep(300 * time.millisecond) + return ctx.text('slow done') +} - pub fn (mut app GracefulShutdownApp) shutdown(mut ctx GracefulShutdownContext) veb.Result { - spawn app.shutdown_now() - return ctx.text('shutting down') - } +pub fn (mut app GracefulShutdownApp) shutdown(mut ctx GracefulShutdownContext) veb.Result { + spawn app.shutdown_now() + return ctx.text('shutting down') +} - fn (app &GracefulShutdownApp) shutdown_now() { - mut server := app.server - if server == unsafe { nil } { - panic('veb server was not initialized') - } - server.shutdown() or { panic(err) } +fn (app &GracefulShutdownApp) shutdown_now() { + mut server := app.server + if server == unsafe { nil } { + panic('veb server was not initialized') } + server.shutdown() or { panic(err) } +} - fn run_graceful_shutdown_app(mut app GracefulShutdownApp, port int) { - veb.run_at[GracefulShutdownApp, GracefulShutdownContext](mut app, - host: graceful_shutdown_host - port: port - family: .ip - show_startup_message: false - ) or { panic(err) } - } +fn run_graceful_shutdown_app(mut app GracefulShutdownApp, port int) { + veb.run_at[GracefulShutdownApp, GracefulShutdownContext](mut app, + host: graceful_shutdown_host + port: port + family: .ip + show_startup_message: false + ) or { panic(err) } +} - fn send_slow_request(port int, responses chan http.Response) { - response := http.get('http://${graceful_shutdown_host}:${port}/slow') or { panic(err) } - responses <- response - } +fn send_slow_request(port int, responses chan http.Response) { + response := http.get('http://${graceful_shutdown_host}:${port}/slow') or { panic(err) } + responses <- response +} - fn wait_for_server(port int) ! { - url := 'http://${graceful_shutdown_host}:${port}/' - for _ in 0 .. 100 { - response := http.get(url) or { - time.sleep(20 * time.millisecond) - continue - } - if response.status() == .ok && response.body == 'ready' { - return - } +fn wait_for_server(port int) ! { + url := 'http://${graceful_shutdown_host}:${port}/' + for _ in 0 .. 100 { + response := http.get(url) or { time.sleep(20 * time.millisecond) + continue } - return error('server did not start listening in time') + if response.status() == .ok && response.body == 'ready' { + return + } + time.sleep(20 * time.millisecond) } + return error('server did not start listening in time') +} - fn test_veb_graceful_shutdown_waits_for_in_flight_requests() { - mut listener := net.listen_tcp(.ip, '${graceful_shutdown_host}:0') or { panic(err) } - port := listener.addr()!.port()! - listener.close() or {} +fn test_veb_graceful_shutdown_waits_for_in_flight_requests() { + mut listener := net.listen_tcp(.ip, '${graceful_shutdown_host}:0') or { panic(err) } + port := listener.addr()!.port()! + listener.close() or {} - mut app := &GracefulShutdownApp{} - server_thread := spawn run_graceful_shutdown_app(mut app, int(port)) + mut app := &GracefulShutdownApp{} + server_thread := spawn run_graceful_shutdown_app(mut app, int(port)) - wait_for_server(int(port)) or { - assert false, err.msg() - return - } + wait_for_server(int(port)) or { + assert false, err.msg() + return + } - slow_responses := chan http.Response{cap: 1} - spawn send_slow_request(int(port), slow_responses) - _ := <-app.slow_started or { - assert false, 'slow request did not start' - return - } + slow_responses := chan http.Response{cap: 1} + spawn send_slow_request(int(port), slow_responses) + _ := <-app.slow_started or { + assert false, 'slow request did not start' + return + } - shutdown_response := http.get('http://${graceful_shutdown_host}:${port}/shutdown') or { - assert false, err.msg() - return - } - assert shutdown_response.status() == .ok - assert shutdown_response.body == 'shutting down' + shutdown_response := http.get('http://${graceful_shutdown_host}:${port}/shutdown') or { + assert false, err.msg() + return + } + assert shutdown_response.status() == .ok + assert shutdown_response.body == 'shutting down' - slow_response := <-slow_responses or { - assert false, err.msg() - return - } - assert slow_response.status() == .ok - assert slow_response.body == 'slow done' + slow_response := <-slow_responses or { + assert false, err.msg() + return + } + assert slow_response.status() == .ok + assert slow_response.body == 'slow done' - server_thread.wait() + server_thread.wait() - http.get('http://${graceful_shutdown_host}:${port}/') or { - assert true - return - } - assert false, 'server still accepts new requests after shutdown' + http.get('http://${graceful_shutdown_host}:${port}/') or { + assert true + return } + assert false, 'server still accepts new requests after shutdown' } diff --git a/vlib/veb/tests/large_payload_test.v b/vlib/veb/tests/large_payload_test.v index 3e912296e..3f69b321f 100644 --- a/vlib/veb/tests/large_payload_test.v +++ b/vlib/veb/tests/large_payload_test.v @@ -53,7 +53,7 @@ fn testsuite_begin() { fn test_large_request_body() { // string of a's of 8.96mb send over the connection - // veb reads a maximum of 4096KB per picoev loop cycle + // veb reads a maximum of 4096KB per read cycle // this test tests if veb is able to do multiple of these // cycles and updates the response body each cycle mut buf := []u8{len: veb.max_read * 10, init: `a`} diff --git a/vlib/veb/tests/parallel_event_loop_test.v b/vlib/veb/tests/parallel_event_loop_test.v deleted file mode 100644 index ba3a1e3f2..000000000 --- a/vlib/veb/tests/parallel_event_loop_test.v +++ /dev/null @@ -1,138 +0,0 @@ -import net -import net.http -import time -import veb - -fn test_stub() { - $if new_veb ? { - return - } -} - -$if !new_veb ? { - const parallel_event_loop_host = '127.0.0.1' - - struct ParallelEventLoopContext { - veb.Context - } - - struct ParallelEventLoopApp {} - - fn (app &ParallelEventLoopApp) index(mut ctx ParallelEventLoopContext) veb.Result { - return ctx.text('ready') - } - - @['/slow'] - fn (app &ParallelEventLoopApp) slow(mut ctx ParallelEventLoopContext) veb.Result { - time.sleep(500 * time.millisecond) - return ctx.text('slow done') - } - - fn run_event_loop_app(mut app ParallelEventLoopApp, port int, nr_workers int) { - veb.run_at[ParallelEventLoopApp, ParallelEventLoopContext](mut app, - host: parallel_event_loop_host - port: port - family: .ip - nr_workers: nr_workers - show_startup_message: false - timeout_in_seconds: 3 - ) or { panic(err) } - } - - fn wait_for_parallel_event_loop_server(port int) ! { - url := 'http://${parallel_event_loop_host}:${port}/' - for _ in 0 .. 100 { - response := http.get(url) or { - time.sleep(20 * time.millisecond) - continue - } - if response.status() == .ok && response.body == 'ready' { - return - } - time.sleep(20 * time.millisecond) - } - return error('server did not start listening in time') - } - - fn fetch_parallel_event_loop_response(port int, responses chan http.Response) { - response := http.get('http://${parallel_event_loop_host}:${port}/slow') or { panic(err) } - responses <- response - } - - fn next_parallel_event_loop_port() int { - mut listener := net.listen_tcp(.ip, '${parallel_event_loop_host}:0') or { panic(err) } - addr := listener.addr() or { panic(err) } - port := addr.port() or { panic(err) } - listener.close() or {} - return int(port) - } - - fn test_single_picoev_worker_still_serves_requests() { - $if !(linux || termux) { - return - } - port := next_parallel_event_loop_port() - mut app := &ParallelEventLoopApp{} - spawn run_event_loop_app(mut app, port, 1) - - wait_for_parallel_event_loop_server(port) or { - assert false, err.msg() - return - } - - response := http.get('http://${parallel_event_loop_host}:${port}/slow') or { - assert false, err.msg() - return - } - assert response.status() == .ok - assert response.body == 'slow done' - } - - fn test_parallel_picoev_workers_handle_requests_concurrently() { - $if !(linux || termux) { - return - } - port := next_parallel_event_loop_port() - mut app := &ParallelEventLoopApp{} - spawn run_event_loop_app(mut app, port, 2) - - wait_for_parallel_event_loop_server(port) or { - assert false, err.msg() - return - } - - responses := chan http.Response{cap: 2} - watch := time.new_stopwatch(auto_start: true) - spawn fetch_parallel_event_loop_response(port, responses) - spawn fetch_parallel_event_loop_response(port, responses) - - first := <-responses or { - assert false, err.msg() - return - } - second := <-responses or { - assert false, err.msg() - return - } - assert first.status() == .ok - assert second.status() == .ok - assert first.body == 'slow done' - assert second.body == 'slow done' - assert watch.elapsed() < 900 * time.millisecond - } - - fn test_parallel_picoev_workers_reject_invalid_nr_workers() { - mut app := &ParallelEventLoopApp{} - veb.run_at[ParallelEventLoopApp, ParallelEventLoopContext](mut app, - host: parallel_event_loop_host - port: 1 - family: .ip - nr_workers: 0 - show_startup_message: false - ) or { - assert err.msg().contains('invalid nr_workers') - return - } - assert false, 'run_at should reject nr_workers <= 0' - } -} diff --git a/vlib/veb/tests/veb_test.v b/vlib/veb/tests/veb_test.v index 919966ac6..fa35e3500 100644 --- a/vlib/veb/tests/veb_test.v +++ b/vlib/veb/tests/veb_test.v @@ -15,7 +15,6 @@ const vexe = os.getenv('VEXE') const veb_logfile = os.getenv('VEB_LOGFILE') const vroot = os.dir(vexe) const serverexe = os.join_path(os.cache_dir(), 'veb_test_server.exe') -const serverexe_new = os.join_path(os.cache_dir(), 'veb_test_server_new_veb.exe') const tcp_r_timeout = 10 * time.second const tcp_w_timeout = 10 * time.second @@ -25,9 +24,6 @@ fn testsuite_begin() { if os.exists(serverexe) { os.rm(serverexe) or {} } - if os.exists(serverexe_new) { - os.rm(serverexe_new) or {} - } } fn test_simple_veb_app_can_be_compiled() { @@ -38,14 +34,6 @@ fn test_simple_veb_app_can_be_compiled() { assert os.exists(serverexe) } -fn test_new_veb_app_can_be_compiled() { - // Ensure the new fasthttp backend builds successfully. - did_server_compile := - os.system('${os.quoted_path(vexe)} -d new_veb -o ${os.quoted_path(serverexe_new)} vlib/veb/tests/veb_test_server.v') - assert did_server_compile == 0 - assert os.exists(serverexe_new) -} - fn test_a_simple_veb_app_runs_in_the_background() { mut suffix := '' $if !windows { diff --git a/vlib/veb/veb.v b/vlib/veb/veb.v index 7d0959ca0..7643ccd84 100644 --- a/vlib/veb/veb.v +++ b/vlib/veb/veb.v @@ -76,11 +76,9 @@ pub fn run[A, X](mut global_app A, port int) { pub struct RunParams { pub: // use `family: .ip, host: 'localhost'` when you want it to bind only to 127.0.0.1 - family net.AddrFamily = .ip6 - host string - port int = default_port - // number of picoev event loops for the default non-SSL backend. - // keep `1` to preserve the historical single-loop behavior. + family net.AddrFamily = .ip6 + host string + port int = default_port nr_workers int = 1 show_startup_message bool = true timeout_in_seconds int = 30 @@ -376,84 +374,6 @@ fn should_close_ssl_connection(req http.Request, resp http.Response, client_want return req.version != .v1_1 } -struct FileResponse { -pub mut: - open bool - file os.File - total i64 - pos i64 - should_close_conn bool -} - -// close the open file and reset the struct to its default values -pub fn (mut fr FileResponse) done() { - fr.open = false - fr.file.close() - fr.total = 0 - fr.pos = 0 - fr.should_close_conn = false -} - -struct StringResponse { -pub mut: - open bool - str string - pos i64 - should_close_conn bool -} - -// free the current string and reset the struct to its default values -@[manualfree] -pub fn (mut sr StringResponse) done() { - sr.open = false - sr.pos = 0 - sr.should_close_conn = false - unsafe { sr.str.free() } -} - -$if !new_veb ? { - // EV context - struct RequestParams { - global_app voidptr - controllers []&ControllerPath - routes &map[string]Route - timeout_in_seconds int - mut: - // request body buffer - buf &u8 = unsafe { nil } - // request bodies are assembled in byte buffers to avoid repeated string reallocations - body_buffers [][]u8 - // chunked request bodies track framing separately so they can be decoded once when complete - chunked_body_trackers []ChunkedBodyTracker - // idx keeps track of how much of the request body has been read - // for each incomplete request, see `handle_conn` - idx []int - incomplete_requests []http.Request - file_responses []FileResponse - string_responses []StringResponse - } - - // reset request parameters for `fd`: - // reset content-length index and the http request - @[manualfree] - pub fn (mut params RequestParams) request_done(fd int) { - mut request := ¶ms.incomplete_requests[fd] - request.reset() - if params.body_buffers[fd].cap > 0 { - unsafe { params.body_buffers[fd].free() } - params.body_buffers[fd] = []u8{} - } - if params.chunked_body_trackers[fd].line_buf.cap > 0 { - unsafe { params.chunked_body_trackers[fd].line_buf.free() } - } - params.chunked_body_trackers[fd] = ChunkedBodyTracker{} - params.idx[fd] = 0 - $if trace_handle_read ? { - eprintln('>>>>> fd: ${fd} | request_done.') - } - } -} - interface BeforeAcceptApp { mut: before_accept_loop() @@ -794,17 +714,6 @@ fn send_string(mut conn net.TcpConn, s string) ! { conn.write_string(s)! } -// send a string ptr over `conn` -fn send_string_ptr(mut conn net.TcpConn, ptr &u8, len int) !int { - $if trace_send_string_conn ? { - eprintln('> send_string: conn: ${ptr_str(conn)}') - } - if voidptr(conn) == unsafe { nil } { - return error('connection was closed before send_string') - } - return conn.write_ptr(ptr, len) -} - // Set s to the form error pub fn (mut ctx Context) error(s string) { eprintln('[veb] Context.error: ${s}') diff --git a/vlib/veb/veb_d_new_veb.v b/vlib/veb/veb_d_new_veb.v deleted file mode 100644 index 795c5401b..000000000 --- a/vlib/veb/veb_d_new_veb.v +++ /dev/null @@ -1,163 +0,0 @@ -// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved. -// Use of this source code is governed by an MIT license -// that can be found in the LICENSE file. -module veb - -import fasthttp -import net.http -import time -import net.urllib - -struct RequestParams { - global_app voidptr - controllers_sorted []&ControllerPath - routes &map[string]Route - benchmark_page_generation bool -} - -const http_ok_response = 'HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 0\r\nConnection: keep-alive\r\n\r\n'.bytes() - -pub fn run_at[A, X](mut global_app A, params RunParams) ! { - run_new[A, X](mut global_app, params)! -} - -// run_new - start a new veb server using the parallel fasthttp backend. -pub fn run_new[A, X](mut global_app A, params RunParams) ! { - if params.port <= 0 || params.port > 65535 { - return error('invalid port number `${params.port}`, it should be between 1 and 65535') - } - if ssl_enabled(params) { - maybe_init_server[A](mut global_app, new_server_without_lifecycle()) - run_at_with_ssl[A, X](mut global_app, params)! - return - } - - // Generate routes and controllers just like the original run() function. - routes := generate_routes[A, X](global_app)! - controllers_sorted := check_duplicate_routes_in_controllers[A](global_app, routes)! - - // Allocate params on the heap to keep it valid for the server lifetime - request_params := &RequestParams{ - global_app: unsafe { voidptr(&global_app) } - controllers_sorted: controllers_sorted - routes: &routes - benchmark_page_generation: params.benchmark_page_generation - } - - // Configure and run the fasthttp server - mut server := fasthttp.new_server(fasthttp.ServerConfig{ - family: params.family - port: params.port - handler: parallel_request_handler[A, X] - max_request_buffer_size: params.max_request_buffer_size - user_data: voidptr(request_params) - }) or { - eprintln('Failed to create server: ${err}') - return - } - maybe_init_server[A](mut global_app, new_server_with_lifecycle(server.handle())) - println('[veb] Running multi-threaded app on ${server_protocol(params)}://${startup_host(params)}:${params.port}/') - flush_stdout() - $if A is BeforeAcceptApp { - global_app.before_accept_loop() - } - server.run() or { panic(err) } -} - -fn parallel_request_handler[A, X](req fasthttp.HttpRequest) !fasthttp.HttpResponse { - // Get parameters from user_data - copy to avoid use-after-free - params := unsafe { *(&RequestParams(req.user_data)) } - mut global_app := unsafe { &A(params.global_app) } - - client_fd := req.client_conn_fd - - s := req.buffer.bytestr() - // Parse the raw request bytes into a standard `http.Request`. - req2 := http.parse_request_str(s.clone()) or { - return fasthttp.HttpResponse{ - content: 'HTTP/1.1 500 Internal Server Error\r\nContent-Length: 0\r\nConnection: close\r\n\r\n'.bytes() - } - } - // Create and populate the `veb.Context`. - completed_context := handle_request_and_route[A, X](mut global_app, req2, client_fd, params) - - if completed_context.takeover { - // The handler has taken over the connection (e.g. for SSE or WebSocket). - // The response was already sent directly over ctx.conn. - // Tell fasthttp to hand off the fd without closing it. - return fasthttp.HttpResponse{ - takeover: true - } - } - - if completed_context.return_type == .file { - return fasthttp.HttpResponse{ - content: completed_context.res.bytes() - file_path: completed_context.return_file - should_close: completed_context.client_wants_to_close - } - } - - // The fasthttp server expects a complete response buffer to be returned. - return fasthttp.HttpResponse{ - content: completed_context.res.bytes() - should_close: completed_context.client_wants_to_close - } -} // handle_request_and_route is a unified function that creates the context, - -// runs middleware, and finds the correct route for a request. -fn handle_request_and_route[A, X](mut app A, req http.Request, _client_fd int, params RequestParams) &Context { - // Create and populate the `veb.Context` from the request. - mut url := urllib.parse_request_uri(req.url) or { - // This should be rare if http.parse_request succeeded. - mut bad_ctx := &Context{ - req: req - } - bad_ctx.not_found() - return bad_ctx - } - query := parse_query_from_url(url) - form, files := parse_form_from_request(req) or { - mut bad_ctx := &Context{ - req: req - } - bad_ctx.request_error('Failed to parse form data: ${err.msg()}') - return bad_ctx - } - host_with_port := req.header.get(.host) or { '' } - host, _ := urllib.split_host_port(host_with_port) - page_gen_start := if params.benchmark_page_generation { time.ticks() } else { 0 } - mut ctx := &Context{ - req: req - page_gen_start: page_gen_start - client_fd: _client_fd - client_wants_to_close: true // fasthttp always closes connections after response - query: query - form: form - files: files - } - $if A is StaticApp { - ctx.custom_mime_types = app.static_mime_types.clone() - mut user_context := X{} - user_context.Context = ctx - if serve_if_static[X](static_handler_config(app.static_files, app.static_mime_types, - app.static_hosts, app.enable_static_gzip, app.enable_static_zstd, - app.enable_static_compression, app.static_compression_max_size, - app.static_compression_mime_types, app.enable_markdown_negotiation), mut user_context, - url, host) - { - return &user_context.Context - } - } - // Match controller paths first - $if A is ControllerInterface { - if completed_context := handle_controllers[X](params.controllers_sorted, ctx, mut url, host) { - return completed_context - } - } - // Create a new user context and pass veb's context - mut user_context := X{} - user_context.Context = ctx - handle_route[A, X](mut app, mut user_context, url, host, params.routes) - return &user_context.Context -} diff --git a/vlib/veb/veb_picoev.v b/vlib/veb/veb_picoev.v deleted file mode 100644 index 24482a1a5..000000000 --- a/vlib/veb/veb_picoev.v +++ /dev/null @@ -1,838 +0,0 @@ -module veb - -$if !new_veb ? { - import os - import picoev - import time - import net - import net.http - import io - import net.urllib - import strconv - import strings -} - -$if !new_veb ? { - // run_at - start a new veb server, listening only on a specific address `host`, at the specified `port` - // Usage example: veb.run_at(new_app(), host: 'localhost' port: 8099 family: .ip)! - @[direct_array_access; manualfree] - pub fn run_at[A, X](mut global_app A, params RunParams) ! { - if params.port <= 0 || params.port > 65535 { - return error('invalid port number `${params.port}`, it should be between 1 and 65535') - } - if params.nr_workers < 1 { - return error('invalid nr_workers `${params.nr_workers}`, it should be above 0') - } - if params.nr_workers > 1 && !picoev_parallel_workers_supported() && !ssl_enabled(params) { - return error('parallel picoev workers require linux or termux; use `-d new_veb` on other platforms') - } - maybe_init_server[A](mut global_app, new_server_without_lifecycle()) - if ssl_enabled(params) { - run_at_with_ssl[A, X](mut global_app, params)! - return - } - routes := generate_routes[A, X](global_app)! - controllers_sorted := check_duplicate_routes_in_controllers[A](global_app, routes)! - mut workers := []&picoev.Picoev{cap: params.nr_workers} - global_app_ptr := unsafe { voidptr(&global_app) } - for _ in 0 .. params.nr_workers { - workers << new_picoev_worker[A, X](global_app_ptr, controllers_sorted, &routes, params)! - } - if params.show_startup_message { - println('[veb] Running app on ${server_protocol(params)}://${startup_host(params)}:${params.port}/') - if params.nr_workers > 1 { - println('[veb] Using ${params.nr_workers} picoev workers') - } - } - flush_stdout() - $if A is BeforeAcceptApp { - global_app.before_accept_loop() - } - start_picoev_workers(mut workers) - } - - fn picoev_parallel_workers_supported() bool { - $if linux || termux { - return true - } $else { - return false - } - } - - @[manualfree] - fn new_picoev_worker[A, X](global_app_ptr voidptr, controllers_sorted []&ControllerPath, routes &map[string]Route, params RunParams) !&picoev.Picoev { - mut pico_context := &RequestParams{ - global_app: global_app_ptr - controllers: controllers_sorted - routes: unsafe { routes } - timeout_in_seconds: params.timeout_in_seconds - } - pico_context.idx = []int{len: picoev.max_fds} - // reserve space for read and write buffers - pico_context.buf = unsafe { malloc_noscan(picoev.max_fds * max_read + 1) } - pico_context.body_buffers = [][]u8{len: picoev.max_fds} - pico_context.chunked_body_trackers = []ChunkedBodyTracker{len: picoev.max_fds} - pico_context.incomplete_requests = []http.Request{len: picoev.max_fds} - pico_context.file_responses = []FileResponse{len: picoev.max_fds} - pico_context.string_responses = []StringResponse{len: picoev.max_fds} - return picoev.new( - port: params.port - raw_cb: ev_callback[A, X] - user_data: pico_context - timeout_secs: params.timeout_in_seconds - family: params.family - host: params.host - )! - } - - fn start_picoev_workers(mut workers []&picoev.Picoev) { - for i in 1 .. workers.len { - mut worker := workers[i] - spawn serve_picoev_worker(mut worker) - } - mut worker := workers[0] - serve_picoev_worker(mut worker) - } - - fn serve_picoev_worker(mut worker picoev.Picoev) { - worker.serve() - } - - @[direct_array_access] - fn ev_callback[A, X](mut pv picoev.Picoev, fd int, events int) { - mut params := unsafe { &RequestParams(pv.user_data) } - if events == picoev.picoev_timeout { - $if trace_picoev_callback ? { - eprintln('> request timeout on file descriptor ${fd}') - } - handle_timeout(mut pv, mut params, fd) - } else if events == picoev.picoev_write { - $if trace_picoev_callback ? { - eprintln('> write event on file descriptor ${fd}') - } - if params.file_responses[fd].open { - handle_write_file(mut pv, mut params, fd) - } else if params.string_responses[fd].open { - handle_write_string(mut pv, mut params, fd) - } else { - // This should never happen, but it does on pages, that refer to static resources, - // in folders, added with `mount_static_folder_at`. See also - // https://github.com/vlang/edu-platform/blob/0c203f0384cf24f917f9a7c9bb150f8d64aca00f/main.v#L92 - $if debug_ev_callback ? { - eprintln('[veb] error: write event on connection should be closed') - } - pv.close_conn(fd) - } - } else if events == picoev.picoev_read { - $if trace_picoev_callback ? { - eprintln('> read event on file descriptor ${fd}') - } - handle_read[A, X](mut pv, mut params, fd) - } else { - // should never happen - eprintln('[veb] error: invalid picoev event ${events}') - } - } - - fn handle_timeout(mut pv picoev.Picoev, mut params RequestParams, fd int) { - mut conn := &net.TcpConn{ - sock: net.tcp_socket_from_handle_raw(fd) - handle: fd - is_blocking: false - } - fast_send_resp(mut conn, http_408) or {} - if params.file_responses[fd].open { - params.file_responses[fd].done() - } - if params.string_responses[fd].open { - params.string_responses[fd].done() - } - pv.close_conn(fd) - params.request_done(fd) - } - - struct IncompleteChunkedRequestBodyError { - Error - } - - enum ChunkedBodyTrackerState { - chunk_size - chunk_data - chunk_data_crlf_start - chunk_data_crlf_end - trailer_line - } - - struct ChunkedBodyTracker { - mut: - state ChunkedBodyTrackerState = .chunk_size - line_buf []u8 - chunk_left u64 - complete bool - invalid bool - } - - fn (mut tracker ChunkedBodyTracker) advance(data []u8) bool { - if tracker.complete || tracker.invalid || data.len == 0 { - return tracker.complete - } - mut i := 0 - for i < data.len { - match tracker.state { - .chunk_size { - ch := data[i] - i++ - if ch == `\r` { - continue - } - if ch != `\n` { - tracker.line_buf << ch - continue - } - chunk_size := parse_chunked_size_line_fast(tracker.line_buf) or { - tracker.invalid = true - return false - } - tracker.line_buf.clear() - if chunk_size == 0 { - tracker.state = .trailer_line - continue - } - tracker.chunk_left = chunk_size - tracker.state = .chunk_data - } - .chunk_data { - available := data.len - i - if tracker.chunk_left < u64(available) { - i += int(tracker.chunk_left) - tracker.chunk_left = 0 - } else { - tracker.chunk_left -= u64(available) - i = data.len - } - if tracker.chunk_left == 0 { - tracker.state = .chunk_data_crlf_start - } - } - .chunk_data_crlf_start { - if data[i] != `\r` { - tracker.invalid = true - return false - } - i++ - tracker.state = .chunk_data_crlf_end - } - .chunk_data_crlf_end { - if data[i] != `\n` { - tracker.invalid = true - return false - } - i++ - tracker.state = .chunk_size - } - .trailer_line { - ch := data[i] - i++ - if ch == `\r` { - continue - } - if ch != `\n` { - tracker.line_buf << ch - continue - } - if tracker.line_buf.len == 0 { - tracker.complete = true - return true - } - tracker.line_buf.clear() - } - } - } - return tracker.complete - } - - fn parse_chunked_size_line_fast(line []u8) !u64 { - mut size := u64(0) - mut has_digit := false - for ch in line { - if ch == `;` { - break - } - if !ch.is_hex_digit() { - return error('invalid chunk size') - } - has_digit = true - size = (size << 4) | u64(chunked_hex_value(ch)) - } - if !has_digit { - return error('invalid chunk size') - } - return size - } - - fn chunked_hex_value(ch u8) u8 { - if `0` <= ch && ch <= `9` { - return ch - `0` - } - if `a` <= ch && ch <= `f` { - return ch - `a` + 10 - } - if `A` <= ch && ch <= `F` { - return ch - `A` + 10 - } - return 0 - } - - fn (err IncompleteChunkedRequestBodyError) msg() string { - return 'incomplete chunked request body' - } - - fn decode_chunked_request_body(encoded_body string) !string { - mut sb := strings.new_builder(encoded_body.len) - mut idx := 0 - for { - chunk_line_end := encoded_body.index_after('\r\n', idx) or { - return IncompleteChunkedRequestBodyError{} - } - mut chunk_size_line := encoded_body[idx..chunk_line_end].trim_space() - if chunk_size_line.len == 0 { - return error('invalid chunk size line') - } - if semicolon_idx := chunk_size_line.index(';') { - chunk_size_line = chunk_size_line[..semicolon_idx].trim_space() - } - if chunk_size_line.len == 0 { - return error('invalid chunk size line') - } - chunk_size_u64 := strconv.parse_uint(chunk_size_line, 16, 64) or { - return error('invalid chunk size line') - } - if chunk_size_u64 > u64(max_int) { - return error('chunk size too large') - } - chunk_size := int(chunk_size_u64) - idx = chunk_line_end + 2 - if chunk_size == 0 { - if idx + 2 > encoded_body.len { - return IncompleteChunkedRequestBodyError{} - } - if encoded_body[idx] == `\r` && encoded_body[idx + 1] == `\n` { - idx += 2 - if idx != encoded_body.len { - return error('invalid chunk trailer') - } - return sb.str() - } - trailer_end := encoded_body.index_after('\r\n\r\n', idx) or { - return IncompleteChunkedRequestBodyError{} - } - if trailer_end + 4 != encoded_body.len { - return error('invalid chunk trailer') - } - return sb.str() - } - if idx + chunk_size + 2 > encoded_body.len { - return IncompleteChunkedRequestBodyError{} - } - sb.write_string(encoded_body[idx..idx + chunk_size]) - idx += chunk_size - if encoded_body[idx] != `\r` || encoded_body[idx + 1] != `\n` { - return error('invalid chunk delimiter') - } - idx += 2 - } - return error('invalid chunked body') - } - - @[inline] - fn append_request_body(mut params RequestParams, fd int, chunk []u8, expected_len int) { - if chunk.len == 0 { - return - } - if params.body_buffers[fd].cap == 0 { - initial_cap := if expected_len > 0 { expected_len } else { chunk.len } - params.body_buffers[fd] = []u8{cap: initial_cap} - } - params.body_buffers[fd] << chunk - } - - @[inline; manualfree] - fn finalize_request_body(mut params RequestParams, fd int) string { - body := params.body_buffers[fd].bytestr() - unsafe { params.body_buffers[fd].free() } - params.body_buffers[fd] = []u8{} - return body - } - - // handle_write_file reads data from a file and sends that data over the socket. - @[direct_array_access; manualfree] - fn handle_write_file(mut pv picoev.Picoev, mut params RequestParams, fd int) { - mut bytes_to_write := int(params.file_responses[fd].total - params.file_responses[fd].pos) - - $if linux || freebsd { - bytes_written := sendfile(fd, params.file_responses[fd].file.fd, bytes_to_write) - if bytes_written < 0 { - params.file_responses[fd].pos += bytes_to_write - } else { - params.file_responses[fd].pos += bytes_written - } - } $else { - if bytes_to_write > max_write { - bytes_to_write = max_write - } - data := unsafe { malloc(bytes_to_write) } - defer { - unsafe { free(data) } - } - mut conn := &net.TcpConn{ - sock: net.tcp_socket_from_handle_raw(fd) - handle: fd - is_blocking: false - write_timeout: params.timeout_in_seconds * time.second - } - params.file_responses[fd].file.read_into_ptr(data, bytes_to_write) or { - params.file_responses[fd].done() - pv.close_conn(fd) - return - } - actual_written := send_string_ptr(mut conn, data, bytes_to_write) or { - params.file_responses[fd].done() - pv.close_conn(fd) - return - } - params.file_responses[fd].pos += actual_written - } - if params.file_responses[fd].pos == params.file_responses[fd].total { - // file is done writing - params.file_responses[fd].done() - handle_complete_request(params.file_responses[fd].should_close_conn, mut pv, fd) - return - } - } - - @[manualfree] - fn send_file_inline(mut conn net.TcpConn, mut file os.File, total i64, timeout_in_seconds int) ! { - conn.write_timeout = timeout_in_seconds * time.second - data := unsafe { malloc(max_write) } - defer { - unsafe { free(data) } - } - mut remaining := total - for remaining > 0 { - bytes_to_write := if remaining > max_write { max_write } else { int(remaining) } - bytes_read := file.read_into_ptr(data, bytes_to_write)! - if bytes_read <= 0 { - return error('unexpected EOF while sending file response') - } - send_string_ptr(mut conn, data, bytes_read)! - remaining -= bytes_read - } - } - - // handle_write_string reads data from a string and sends that data over the socket - @[direct_array_access] - fn handle_write_string(mut pv picoev.Picoev, mut params RequestParams, fd int) { - mut bytes_to_write := int(params.string_responses[fd].str.len - params.string_responses[fd].pos) - if bytes_to_write > max_write { - bytes_to_write = max_write - } - mut conn := &net.TcpConn{ - sock: net.tcp_socket_from_handle_raw(fd) - handle: fd - is_blocking: false - } - - // pointer magic to start at the correct position in the buffer - data := unsafe { params.string_responses[fd].str.str + params.string_responses[fd].pos } - actual_written := send_string_ptr(mut conn, data, bytes_to_write) or { - params.string_responses[fd].done() - pv.close_conn(fd) - return - } - params.string_responses[fd].pos += actual_written - if params.string_responses[fd].pos == params.string_responses[fd].str.len { - // done writing - params.string_responses[fd].done() - pv.close_conn(fd) - handle_complete_request(params.string_responses[fd].should_close_conn, mut pv, fd) - return - } - } - - // handle_read reads data from the connection and if the request is complete - // it calls `handle_route` and closes the connection. - // If the request is not complete, it stores the incomplete request in `params` - // and the connection stays open until it is ready to read again - @[direct_array_access; manualfree] - fn handle_read[A, X](mut pv picoev.Picoev, mut params RequestParams, fd int) { - mut conn := &net.TcpConn{ - sock: net.tcp_socket_from_handle_raw(fd) - handle: fd - is_blocking: false - } - // cap the max_read to 8KB - mut reader := io.new_buffered_reader(reader: conn, cap: max_read) - defer { unsafe { reader.free() } } - // take the previous incomplete request - mut req := params.incomplete_requests[fd] - // check if there is an incomplete request for this file descriptor - if params.idx[fd] == 0 { - $if trace_handle_read ? { - eprintln('>>>>> fd: ${fd} | start of request parsing') - } - // this is the start of a new request, setup the connection, and read the headers: - // set the read and write timeout according to picoev settings when the - // connection is first encountered - conn.set_read_timeout(params.timeout_in_seconds) - conn.set_write_timeout(params.timeout_in_seconds) - // first time that this connection is being read from, so we parse the - // request header first - req = http.parse_request_head(mut reader) or { - // Prevents errors from being thrown when BufferedReader is empty - if err !is io.Eof { - eprintln('[veb] error parsing request: ${err}') - } - // the buffered reader was empty meaning that the client probably - // closed the connection. - pv.close_conn(fd) - params.request_done(fd) - return - } - if reader.total_read >= max_read { - // throw an error when the request header is larger than 8KB - // same limit that apache handles - eprintln('[veb] error parsing request: too large') - fast_send_resp(mut conn, http_413) or {} - pv.close_conn(fd) - params.request_done(fd) - return - } - } - if params.idx[fd] == -1 { - // this is for sure a continuation of a previous request, where the first part contained only headers; - // make sure that we are ready to accept the body and account for every byte in it, by setting the counter to 0: - params.idx[fd] = 0 - $if trace_handle_read ? { - eprintln('>>>>> fd: ${fd} | continuation of request, where the first part contained headers') - } - } - if transfer_encoding_is_chunked(req.header) { - mut max_bytes_to_read := max_read - reader.total_read - if max_bytes_to_read > 0 { - mut buf_ptr := params.buf - unsafe { - buf_ptr += fd * max_read // pointer magic - } - // convert to []u8 for BufferedReader - mut buf := unsafe { buf_ptr.vbytes(max_bytes_to_read) } - n := reader.read(mut buf) or { - if reader.total_read > 0 { - // The headers were parsed in this cycle, but the body has not been sent yet. No need to error. - params.idx[fd] = -1 // avoid reparsing the headers on the next call. - params.incomplete_requests[fd] = req - $if trace_handle_read ? { - eprintln('>>>>> fd: ${fd} | request headers were parsed, but the chunked body has not been parsed yet') - } - return - } - eprintln('[veb] error reading chunked request body: ${err}') - pv.close_conn(fd) - params.request_done(fd) - return - } - if n > 0 { - append_request_body(mut params, fd, buf[0..n], 0) - params.idx[fd] += n - params.chunked_body_trackers[fd].advance(buf[0..n]) - if params.chunked_body_trackers[fd].invalid { - eprintln('[veb] error decoding chunked request body: invalid chunked body') - fast_send_resp(mut conn, http.new_response( - status: .bad_request - body: 'Invalid chunked request body' - header: http.new_header( - key: .content_type - value: 'text/plain' - ).join(headers_close) - )) or {} - pv.close_conn(fd) - params.request_done(fd) - return - } - } - } - if !params.chunked_body_trackers[fd].complete { - if params.idx[fd] == 0 { - params.idx[fd] = -1 - } - params.incomplete_requests[fd] = req - $if trace_handle_read ? { - eprintln('>>>>> request is NOT complete (chunked), fd: ${fd} | raw_body_buffer.len: ${params.body_buffers[fd].len} | params.idx[fd]: ${params.idx[fd]}') - } - return - } - raw_body := finalize_request_body(mut params, fd) - req.data = decode_chunked_request_body(raw_body) or { - eprintln('[veb] error decoding chunked request body: ${err}') - fast_send_resp(mut conn, http.new_response( - status: .bad_request - body: 'Invalid chunked request body' - header: http.new_header( - key: .content_type - value: 'text/plain' - ).join(headers_close) - )) or {} - pv.close_conn(fd) - params.request_done(fd) - return - } - } else { - // check if the request has a body - content_length := req.header.get(.content_length) or { '0' } - content_length_i := content_length.int() - if content_length_i > 0 { - mut max_bytes_to_read := max_read - reader.total_read - mut bytes_to_read := content_length_i - params.idx[fd] - // cap the bytes to read to 8KB for the body, including the request headers if any - if bytes_to_read > max_read - reader.total_read { - bytes_to_read = max_read - reader.total_read - } - mut buf_ptr := params.buf - unsafe { - buf_ptr += fd * max_read // pointer magic - } - // convert to []u8 for BufferedReader - mut buf := unsafe { buf_ptr.vbytes(max_bytes_to_read) } - n := reader.read(mut buf) or { - if reader.total_read > 0 { - // The headers were parsed in this cycle, but the body has not been sent yet. No need to error. - params.idx[fd] = -1 // avoid reparsing the headers on the next call. - params.incomplete_requests[fd] = req - $if trace_handle_read ? { - eprintln('>>>>> fd: ${fd} | request headers were parsed, but the body has not been parsed yet | params.idx[fd]: ${params.idx[fd]} | content_length_i: ${content_length_i}') - } - return - } - eprintln('[veb] error reading request body: ${err}') - if err is io.Eof { - // we expect more data to be send, but an Eof error occurred, meaning - // that there is no more data to be read from the socket. - // And at this point we expect that there is data to be read for the body. - fast_send_resp(mut conn, http.new_response( - status: .bad_request - body: 'Mismatch of body length and Content-Length header' - header: http.new_header( - key: .content_type - value: 'text/plain' - ).join(headers_close) - )) or {} - } - pv.close_conn(fd) - params.request_done(fd) - return - } - // there is no more data to be sent, but it is less than the Content-Length header - // so it is a mismatch of body length and content length. - // Or if there is more data received then the Content-Length header specified - if (n == 0 && params.idx[fd] != 0) || params.idx[fd] + n > content_length_i { - fast_send_resp(mut conn, http.new_response( - status: .bad_request - body: 'Mismatch of body length and Content-Length header' - header: http.new_header( - key: .content_type - value: 'text/plain' - ).join(headers_close) - )) or {} - pv.close_conn(fd) - params.request_done(fd) - return - } else if n < bytes_to_read || params.idx[fd] + n < content_length_i { - // request is incomplete wait until the socket becomes ready to read again - append_request_body(mut params, fd, buf[0..n], content_length_i) - params.incomplete_requests[fd] = req - params.idx[fd] += n - $if trace_handle_read ? { - eprintln('>>>>> request is NOT complete, fd: ${fd} | n: ${n} | body_buffer.len: ${params.body_buffers[fd].len} | params.idx[fd]: ${params.idx[fd]}') - } - return - } else { - // request is complete: n = bytes_to_read - append_request_body(mut params, fd, buf[0..n], content_length_i) - req.data = finalize_request_body(mut params, fd) - params.idx[fd] += n - $if trace_handle_read ? { - eprintln('>>>>> request is NOW COMPLETE, fd: ${fd} | n: ${n} | req.data.len: ${req.data.len}') - } - } - } - } - defer { params.request_done(fd) } - if completed_context := handle_request[A, X](mut conn, req, params) { - if completed_context.takeover { - // the connection should be kept open, but removed from the picoev loop. - // This way veb can continue handling other connections and the user can - // keep the connection open indefinitely - pv.delete(fd) - return - } - match completed_context.return_type { - .normal { - // small optimization: if the response is small write it immediately - // the socket is most likely able to write all the data without blocking. - // See Context.send_file for why we use max_read instead of max_write. - if completed_context.res.body.len < max_read { - fast_send_resp(mut conn, completed_context.res) or {} - handle_complete_request(completed_context.client_wants_to_close, mut pv, fd) - } else { - params.string_responses[fd].open = true - params.string_responses[fd].str = completed_context.res.body - res := pv.add(fd, picoev.picoev_write, params.timeout_in_seconds, - picoev.raw_callback) - // picoev error - if res == -1 { - // should not happen - params.string_responses[fd].done() - fast_send_resp(mut conn, http_500) or {} - handle_complete_request(completed_context.client_wants_to_close, mut - pv, fd) - return - } - // no errors we can send the HTTP headers - fast_send_resp_header(mut conn, completed_context.res) or {} - } - } - .file { - // save file information - length := completed_context.res.header.get(.content_length) or { - fast_send_resp(mut conn, http_500) or {} - return - } - $if termux { - mut file := os.open(completed_context.return_file) or { - // Context checks if the file is valid, so this should never happen - fast_send_resp(mut conn, http_500) or {} - pv.close_conn(fd) - return - } - defer { file.close() } - fast_send_resp_header(mut conn, completed_context.res) or { - pv.close_conn(fd) - return - } - // Termux file responses are sent inline because deferred write events can - // leave static responses stalled. - send_file_inline(mut conn, mut file, length.i64(), - params.timeout_in_seconds) or { - pv.close_conn(fd) - return - } - handle_complete_request(completed_context.client_wants_to_close, mut pv, fd) - } $else { - params.file_responses[fd].total = length.i64() - params.file_responses[fd].file = os.open(completed_context.return_file) or { - // Context checks if the file is valid, so this should never happen - fast_send_resp(mut conn, http_500) or {} - params.file_responses[fd].done() - pv.close_conn(fd) - return - } - params.file_responses[fd].open = true - - res := pv.add(fd, picoev.picoev_write, params.timeout_in_seconds, - picoev.raw_callback) - // picoev error - if res == -1 { - // should not happen - fast_send_resp(mut conn, http_500) or {} - params.file_responses[fd].done() - pv.close_conn(fd) - return - } - // no errors we can send the HTTP headers - fast_send_resp_header(mut conn, completed_context.res) or {} - } - } - } - } else { - // invalid request headers/data - pv.close_conn(fd) - } - } - - // close the connection when `should_close` is true. - @[inline] - fn handle_complete_request(should_close bool, mut pv picoev.Picoev, fd int) { - if should_close { - pv.close_conn(fd) - } - } - - fn handle_request[A, X](mut conn net.TcpConn, req http.Request, params &RequestParams) ?&Context { - mut global_app := unsafe { &A(params.global_app) } - // TODO: change this variable to include the total wait time over each network cycle - // maybe store it in Request.user_ptr ? - page_gen_start := time.ticks() - $if trace_request ? { - dump(req) - } - $if trace_request_url ? { - dump(req.url) - } - // parse the URL, query and form data - mut url := urllib.parse_request_uri(req.url) or { - eprintln('[veb] error parsing path "${req.url}": ${err}') - return none - } - query := parse_query_from_url(url) - form, files := parse_form_from_request(req) or { - // Bad request - eprintln('[veb] error parsing form: ${err.msg()}') - conn.write(http_400.bytes()) or {} - return none - } - // remove the port from the HTTP Host header - host_with_port := req.header.get(.host) or { '' } - host, _ := urllib.split_host_port(host_with_port) - // Create Context with request data - mut ctx := &Context{ - req: req - page_gen_start: page_gen_start - conn: conn - query: query - form: form - files: files - } - if connection_header := req.header.get(.connection) { - // A client that does not support persistent connections MUST send the - // "close" connection option in every request message. - if connection_header.to_lower() == 'close' { - ctx.client_wants_to_close = true - } - } - $if A is StaticApp { - ctx.custom_mime_types = global_app.static_mime_types.clone() - mut user_context := X{} - user_context.Context = ctx - if serve_if_static[X](static_handler_config(global_app.static_files, - global_app.static_mime_types, global_app.static_hosts, - global_app.enable_static_gzip, global_app.enable_static_zstd, - global_app.enable_static_compression, global_app.static_compression_max_size, - global_app.static_compression_mime_types, global_app.enable_markdown_negotiation), mut - user_context, url, host) - { - return &user_context.Context - } - } - // match controller paths - $if A is ControllerInterface { - if completed_context := handle_controllers[X](params.controllers, ctx, mut url, host) { - return completed_context - } - } - // create a new user context and pass the veb's context - mut user_context := X{} - user_context.Context = ctx - handle_route[A, X](mut global_app, mut user_context, url, host, params.routes) - // we need to explicitly tell the V compiler to return a reference - return &user_context.Context - } -} -- 2.39.5