From 065a450b86f6459b1e4398fe7b0594bbfcc2d691 Mon Sep 17 00:00:00 2001 From: Richard Wheeler Date: Mon, 15 Jun 2026 09:02:23 -0400 Subject: [PATCH] net.http: add connection-pooling Transport with HTTP/1.1 keep-alive (#27412) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * net.http: add connection-pooling Transport with HTTP/1.1 keep-alive (phase 1) First phase of Go-style connection reuse for the HTTP client (community direction from the pooling proposal: general Transport, on by default, process-global pool). A new Transport (vlib/net/http/transport.v) owns idle keep-alive HTTP/1.1 connections (plain TCP and TLS), keyed by origin + TLS configuration. fetch()/Request.do() route through a process-global default Transport unless the request uses a proxy or opts out via the new disable_connection_reuse flag (which preserves the old behavior exactly, including Connection: close). Mechanics: the request loop's precise response framing now records whether a response ended via Content-Length/chunked/no-body framing (reusable) vs EOF or truncation; only precisely-framed connections return to the pool, and only when neither side sent Connection: close. A Content-Length: 0 response now terminates the read immediately instead of waiting for EOF (required for keep-alive, strictly faster otherwise). Stale pooled connections (closed by the server while idle) are detected on use and transparently retried once on a fresh connection, for idempotent methods or when the request bytes were never written. Idle connections are evicted lazily (90s default, capped at 4 per host); http.close_idle_connections() flushes the default pool. HTTP/2 requests still run on the existing one-shot driver (the multiplexed pooled H2 connection is the next phase), and Windows SChannel https keeps its one-shot path until SChannel pooling lands; plain-http pooling works on Windows already. Verified: new transport_test.v (reuse proven by server accept-counters; both plain and TLS), full vlib/net/http suite, and -d network h2+h1 real-server checks, on Windows (tcc) and with -d no_vschannel (mbedtls client). Co-Authored-By: Claude Fable 5 * net.http: include the ALPN preference in the Transport pool key Review finding: a TLS connection dialled with enable_http2: false (no ALPN) was pooled under the same key as default requests, so a later HTTP/2-enabled request to the same origin could reuse it and silently never negotiate h2 while the pooled connection lived. The h2 preference is now part of the pool key for https origins (plain http ignores it, so that pool stays unsplit). Adds a regression test: forced-h1 then default request to one origin must use two connections, while forced-h1 requests still share among themselves. The TLS test server now serves each accepted connection on its own thread — a pooled idle connection keeps its serve loop parked in read, which must not block accepting the next connection. Co-Authored-By: Claude Fable 5 * net.http: fix downstream tests asserting the pre-pooling Connection: close behavior Two vlib tests relied on the old client always sending Connection: close: - veb_test.v asserted the response carries 'Connection: close'. The default client now keeps connections alive, so the test asserts the keep-alive default and verifies the historical close behavior end-to-end through the disable_connection_reuse opt-out instead. - server_test.v's MyHttpHandler echoed the request headers into the response, so the request's 'Content-Length: 0' (always sent on GETs) overrode the server's correct length: the /big response went out declaring 0 bytes while carrying 40016. The old client masked this malformed framing twice over — it ignored 'Content-Length: 0' and relied on close-framing — and the truncation only surfaced on OpenBSD, whose smaller socket buffers split the body across reads. The handler no longer echoes request headers. Also hardens the client: a response carrying MORE body bytes than its declared Content-Length is malformed and its stream position untrustworthy, so such a connection is now never returned to the pool. Co-Authored-By: Claude Fable 5 * net.http: do not retry non-idempotent pooled requests that may have been sent When a reused keep-alive connection failed after the request bytes were written, round_trip already refused to internally replay a non-idempotent method -- but it returned the raw read error, whose code the caller's outer max_retries loop did not recognize, so the loop re-sent the POST/PATCH on a fresh connection and could duplicate side effects. Tag that case with transport_err_unsafe_retry and teach is_no_need_retry_error about it, so the outer loop honors the idempotency guard. Fresh-connection failures are unaffected and still retry as before. Adds a regression test: the server reads a pooled POST then drops the connection; the request must fail without being replayed (post_count == 1, accept_count == 1). Reported by Codex review on #27412. Co-Authored-By: Claude Opus 4.8 * net.http: honor split Connection: close and cap total idle pooled conns Addresses two Codex review findings on the pooling Transport: - response_allows_reuse() joins all Connection header values before scanning for the close token; a close carried in a repeated header line (after a keep-alive one) is no longer missed, so a socket the server asked us not to reuse is not pooled. - Transport gains max_idle_conns (default 100), a global cap on idle pooled connections across all pool keys, evicting the least-recently -used connection on overflow. This bounds file-descriptor use when many distinct origins are fetched once each. The time-based reaper remains a documented follow-up. Tests: split-Connection close is not pooled (2 accepts); global idle cap retains exactly max_idle_conns, evicting the oldest keys. Co-Authored-By: WOZCODE * net.http: initialize default Transport eagerly to avoid a startup nil race default_transport() created the process-global Transport lazily via sync.Once. But sync.Once sets its done flag before running the callback (vlang/v#27456), so on concurrent first use a second caller could observe the Once as done and read http_transport_default while it was still nil — a nil dereference on startup. Initialize the global eagerly instead (__global http_transport_default = new_transport()), which runs in _vinit() before main() and before any thread is spawned, so it is single-threaded by construction. This drops the sync.Once dependency entirely and makes default_transport() a trivial getter. The startup cost matches the previous code, which already allocated a sync.Once (RwMutex) global. Co-Authored-By: WOZCODE * net.http: collision-proof pool key and stop replaying possibly-sent requests Two correctness fixes from review (Codex P2s on transport.v): - transport_pool_key length-prefixes its free-form string fields (host and the TLS verify/cert/cert_key paths or in-memory PEM blobs) so a value containing the '|' separator can no longer collide with a different field split. Previously cert='a|b',cert_key='c' and cert='a',cert_key='b|c' produced the same key, letting a request reuse a connection dialed with the wrong CA / client certificate. - A failed exchange is no longer assumed to have sent zero bytes. The write helpers do partial writes and cannot report how many bytes reached the server, so transport_err_stale_write (which claimed "always safe to retry") is removed: a non-idempotent method whose exchange fails — on a reused OR a fresh connection — is now tagged transport_err_unsafe_retry so neither round_trip's inner loop nor the caller's outer retry loop replays it and duplicates side effects. Idempotent methods still retry, and pre-write dial/handshake failures stay freely retryable. Test: pool keys do not collide across '|'-containing TLS fields (verified to fail with raw concatenation). Existing reuse/idempotent-retry and non-idempotent-not-retried tests still pass. Co-Authored-By: WOZCODE --------- Co-authored-by: Alexander Medvednikov Co-authored-by: Claude Fable 5 Co-authored-by: WOZCODE --- vlib/net/http/backend.c.v | 25 ++ vlib/net/http/http.v | 68 +++-- vlib/net/http/request.v | 92 +++++- vlib/net/http/server_test.v | 7 +- vlib/net/http/transport.v | 403 ++++++++++++++++++++++++ vlib/net/http/transport_test.v | 543 +++++++++++++++++++++++++++++++++ vlib/veb/tests/veb_test.v | 10 +- 7 files changed, 1099 insertions(+), 49 deletions(-) create mode 100644 vlib/net/http/transport.v create mode 100644 vlib/net/http/transport_test.v diff --git a/vlib/net/http/backend.c.v b/vlib/net/http/backend.c.v index 4332bdefa..da78366c2 100644 --- a/vlib/net/http/backend.c.v +++ b/vlib/net/http/backend.c.v @@ -138,3 +138,28 @@ fn (req &Request) do_request(req_headers string, mut ssl_conn ssl.SSLConn) !Resp } return parse_received_response(response_text, response_info) } + +// h1_exchange_ssl sends an already-built HTTP/1.x request over an open TLS +// connection and reads one response, leaving the connection open (unlike +// do_request, which shuts the connection down). The bool result reports +// whether the response was precisely framed, so the connection can safely +// carry another request (see ReceivedResponseInfo.reusable). +fn (req &Request) h1_exchange_ssl(mut ssl_conn ssl.SSLConn, raw string) !(Response, bool) { + ssl_conn.write_string(raw) or { + return error('http.transport: TLS connection write failed: ${err.msg()}') + } + mut content := strings.new_builder(4096) + response_info := req.receive_all_data_from_cb_in_builder(mut content, voidptr(ssl_conn), + read_from_ssl_connection_cb)! + response_text := content.str() + $if trace_http_response ? { + eprint('< ') + eprint(response_text) + eprintln('') + } + if req.on_finish != unsafe { nil } { + req.on_finish(req, u64(response_text.len))! + } + resp := parse_received_response(response_text, response_info)! + return resp, response_info.reusable +} diff --git a/vlib/net/http/http.v b/vlib/net/http/http.v index dfd59b8f2..0c4304c85 100644 --- a/vlib/net/http/http.v +++ b/vlib/net/http/http.v @@ -29,14 +29,15 @@ pub mut: read_timeout i64 = 30 * time.second // timeout for reading the response; applies to plain http and to direct https requests write_timeout i64 = 30 * time.second // timeout for writing the request; applies to plain http (write timeouts are not enforced on the SSL write path yet) - validate bool // set this to true, if you want to stop requests, when their certificates are found to be invalid - verify string // the path to a rootca.pem file, containing trusted CA certificate(s) - cert string // the path to a cert.pem file, containing client certificate(s) for the request - cert_key string // the path to a key.pem file, containing private keys for the client certificate(s) - in_memory_verification bool // if true, verify, cert, and cert_key are read from memory, not from a file - allow_redirect bool = true // whether to allow redirect - max_retries int = 5 // maximum number of retries required when an underlying socket error occurs - enable_http2 bool = true // when true (the default) and the URL is https, advertise ALPN `h2, http/1.1` and use HTTP/2 if the server selects it; set to false to force HTTP/1.1. Ignored for plain http://, and for the Windows SChannel backend which has no ALPN yet (see vlang/v#27383). on_progress / on_progress_body / stop_copying_limit / stop_receiving_limit are honored on the HTTP/2 path; on_progress fires per DATA frame payload rather than per raw network read. + validate bool // set this to true, if you want to stop requests, when their certificates are found to be invalid + verify string // the path to a rootca.pem file, containing trusted CA certificate(s) + cert string // the path to a cert.pem file, containing client certificate(s) for the request + cert_key string // the path to a key.pem file, containing private keys for the client certificate(s) + in_memory_verification bool // if true, verify, cert, and cert_key are read from memory, not from a file + allow_redirect bool = true // whether to allow redirect + max_retries int = 5 // maximum number of retries required when an underlying socket error occurs + enable_http2 bool = true // when true (the default) and the URL is https, advertise ALPN `h2, http/1.1` and use HTTP/2 if the server selects it; set to false to force HTTP/1.1. Ignored for plain http://, and for the Windows SChannel backend which has no ALPN yet (see vlang/v#27383). on_progress / on_progress_body / stop_copying_limit / stop_receiving_limit are honored on the HTTP/2 path; on_progress fires per DATA frame payload rather than per raw network read. + disable_connection_reuse bool // opt out of the shared connection pool: open a fresh connection for this request, send `Connection: close`, and close the connection after the response (the pre-pooling behavior) // callbacks to allow custom reporting code to run, while the request is running, and to implement streaming on_redirect RequestRedirectFn = unsafe { nil } on_progress RequestProgressFn = unsafe { nil } @@ -171,31 +172,32 @@ pub fn prepare(config FetchConfig) !Request { } url := build_url_from_fetch(config) or { return error('http.fetch: invalid url ${config.url}') } req := Request{ - method: config.method - url: url - data: config.data - header: config.header - cookies: config.cookies - user_agent: config.user_agent - user_ptr: config.user_ptr - verbose: config.verbose - validate: config.validate - read_timeout: config.read_timeout - write_timeout: config.write_timeout - verify: config.verify - cert: config.cert - proxy: config.proxy - cert_key: config.cert_key - in_memory_verification: config.in_memory_verification - allow_redirect: config.allow_redirect - max_retries: config.max_retries - enable_http2: config.enable_http2 - on_progress: config.on_progress - on_progress_body: config.on_progress_body - on_redirect: config.on_redirect - on_finish: config.on_finish - stop_copying_limit: config.stop_copying_limit - stop_receiving_limit: config.stop_receiving_limit + method: config.method + url: url + data: config.data + header: config.header + cookies: config.cookies + user_agent: config.user_agent + user_ptr: config.user_ptr + verbose: config.verbose + validate: config.validate + read_timeout: config.read_timeout + write_timeout: config.write_timeout + verify: config.verify + cert: config.cert + proxy: config.proxy + cert_key: config.cert_key + in_memory_verification: config.in_memory_verification + allow_redirect: config.allow_redirect + max_retries: config.max_retries + enable_http2: config.enable_http2 + disable_connection_reuse: config.disable_connection_reuse + on_progress: config.on_progress + on_progress_body: config.on_progress_body + on_redirect: config.on_redirect + on_finish: config.on_finish + stop_copying_limit: config.stop_copying_limit + stop_receiving_limit: config.stop_receiving_limit } return req } diff --git a/vlib/net/http/request.v b/vlib/net/http/request.v index 1e31add02..1fb208ab7 100644 --- a/vlib/net/http/request.v +++ b/vlib/net/http/request.v @@ -39,14 +39,15 @@ pub mut: read_timeout i64 = 30 * time.second write_timeout i64 = 30 * time.second - validate bool // when true, certificate failures will stop further processing - verify string - cert string - cert_key string - in_memory_verification bool // if true, verify, cert, and cert_key are read from memory, not from a file - allow_redirect bool = true // whether to allow redirect - max_retries int = 5 // maximum number of retries required when an underlying socket error occurs - enable_http2 bool = true // when true (the default) and the URL is https, advertise ALPN `h2, http/1.1` and use HTTP/2 if the server selects it; set to false to force HTTP/1.1. Ignored for plain http://, and for the Windows SChannel backend which has no ALPN yet (see vlang/v#27383). on_progress / on_progress_body / stop_copying_limit / stop_receiving_limit are honored on the HTTP/2 path; on_progress fires per DATA frame payload rather than per raw network read. + validate bool // when true, certificate failures will stop further processing + verify string + cert string + cert_key string + in_memory_verification bool // if true, verify, cert, and cert_key are read from memory, not from a file + allow_redirect bool = true // whether to allow redirect + max_retries int = 5 // maximum number of retries required when an underlying socket error occurs + enable_http2 bool = true // when true (the default) and the URL is https, advertise ALPN `h2, http/1.1` and use HTTP/2 if the server selects it; set to false to force HTTP/1.1. Ignored for plain http://, and for the Windows SChannel backend which has no ALPN yet (see vlang/v#27383). on_progress / on_progress_body / stop_copying_limit / stop_receiving_limit are honored on the HTTP/2 path; on_progress fires per DATA frame payload rather than per raw network read. + disable_connection_reuse bool // opt out of the shared connection pool: open a fresh connection for this request, send `Connection: close`, and close the connection after the response (the pre-pooling behavior) // callbacks to allow custom reporting code to run, while the request is running, and to implement streaming on_redirect RequestRedirectFn = unsafe { nil } on_progress RequestProgressFn = unsafe { nil } @@ -254,6 +255,22 @@ fn (req &Request) method_and_url_to_response(method Method, url urllib.URL, data } } // println('fetch ${method}, ${scheme}, ${host_name}, ${nport}, ${path} ') + if req.proxy == unsafe { nil } && !req.disable_connection_reuse + && (scheme == 'http' || scheme == 'https') { + // Default path: route through the shared connection-pooling Transport, + // which reuses keep-alive connections across requests to the same origin. + mut transport := default_transport() + for i in 0 .. req.max_retries { + res := transport.round_trip(req, method, scheme, host_name, nport, path, data, header) or { + if i == req.max_retries - 1 || is_no_need_retry_error(err.code()) { + return err + } + continue + } + return res + } + return error('http.request.method_and_url_to_response: exhausted retries') + } if scheme == 'https' && req.proxy == unsafe { nil } { // println('ssl_do( ${nport}, ${method}, ${host_name}, ${path} )') for i in 0 .. req.max_retries { @@ -295,6 +312,15 @@ fn (req &Request) build_request_headers(method Method, host_name string, port in } fn (req &Request) build_request_headers_with(method Method, host_name string, port int, path string, data string, header Header) string { + return req.build_request_headers_opts(method, host_name, port, path, data, header, true) +} + +// build_request_headers_opts builds the raw HTTP/1.x request. With +// `connection_close` true it appends `Connection: close` (the historical +// one-shot behavior); the pooled keep-alive path passes false and emits no +// Connection header, leaving the HTTP/1.1 default (keep-alive) in effect and +// respecting any Connection header the caller set themselves. +fn (req &Request) build_request_headers_opts(method Method, host_name string, port int, path string, data string, header Header, connection_close bool) string { mut sb := strings.new_builder(4096) version := if req.version == .unknown { Version.v1_1 } else { req.version } sb.write_string(method.str()) @@ -337,7 +363,9 @@ fn (req &Request) build_request_headers_with(method Method, host_name string, po sb.write_string('\r\n') } sb.write_string(req.build_request_cookies_header_with_header(header)) - sb.write_string('Connection: close\r\n') + if connection_close { + sb.write_string('Connection: close\r\n') + } sb.write_string('\r\n') sb.write_string(data) return sb.str() @@ -403,6 +431,28 @@ fn (req &Request) http_do(host string, method Method, path string, data string, return parse_received_response(response_text, response_data.info) } +// h1_exchange_tcp sends an already-built HTTP/1.x request over an open TCP +// connection and reads one response, leaving the connection open. The bool +// result reports whether the response was precisely framed, so the connection +// can safely carry another request (see ReceivedResponseInfo.reusable). +fn (req &Request) h1_exchange_tcp(mut client net.TcpConn, raw string) !(Response, bool) { + client.write(raw.bytes()) or { + return error('http.transport: connection write failed: ${err.msg()}') + } + response_data := req.read_all_from_client_connection(client)! + response_text := response_data.data.bytestr() + $if trace_http_response ? { + eprint('< ') + eprint(response_text) + eprintln('') + } + if req.on_finish != unsafe { nil } { + req.on_finish(req, u64(response_text.len))! + } + resp := parse_received_response(response_text, response_data.info)! + return resp, response_data.info.reusable +} + // abstract over reading the whole content from TCP or SSL connections: type FnReceiveChunk = fn (con voidptr, buf &u8, bufsize int) !int @@ -543,6 +593,12 @@ struct ReceivedResponseInfo { headers_end int = -1 is_chunked_transfer bool has_truncated_body bool + // reusable is true when the read loop terminated via precise response + // framing (Content-Length satisfied, chunked transfer complete, or a + // no-body status), meaning the connection holds no response leftovers and + // can safely carry another request. EOF-terminated or truncated reads + // leave it false. + reusable bool } fn parse_received_response(response_text string, info ReceivedResponseInfo) !Response { @@ -589,6 +645,7 @@ fn (req &Request) receive_all_data_from_cb_in_builder(mut content strings.Builde mut new_len := u64(0) mut status_code := -1 mut has_truncated_body := false + mut framed_complete := false for { readcounter++ len := receive_chunk_cb(con, bp, bufsize) or { @@ -703,18 +760,25 @@ fn (req &Request) receive_all_data_from_cb_in_builder(mut content strings.Builde has_truncated_body = true } if is_chunked_transfer && chunked_complete { + framed_complete = true break } if headers_end >= 0 && response_has_no_body(req.method, status_code) { // HEAD / 1xx / 204 / 304: response body is forbidden by the spec, so // stop as soon as the headers terminator is in. Any `Content-Length` // describes a body that will never be sent. + framed_complete = true break } - if has_content_length { - if expected_size > 0 && body_so_far >= expected_size { - break - } + if has_content_length && body_so_far >= expected_size { + // The framing is satisfied even when expected_size == 0: on a + // keep-alive connection the server sends nothing further, so waiting + // for EOF here would stall until the read timeout. + // A response carrying MORE body bytes than its declared + // Content-Length is malformed: the stream position is no longer + // trustworthy, so such a connection must never be reused. + framed_complete = body_so_far == expected_size + break } if req.stop_receiving_limit > 0 && new_len > req.stop_receiving_limit { break @@ -725,6 +789,7 @@ fn (req &Request) receive_all_data_from_cb_in_builder(mut content strings.Builde headers_end: headers_end is_chunked_transfer: is_chunked_transfer has_truncated_body: has_truncated_body + reusable: framed_complete } } @@ -1165,5 +1230,6 @@ fn is_no_need_retry_error(err_code int) bool { net.err_no_udp_remote.code(), net.err_connect_timed_out.code(), net.err_timed_out_code, + transport_err_unsafe_retry, ] } diff --git a/vlib/net/http/server_test.v b/vlib/net/http/server_test.v index aec1bf819..731859193 100644 --- a/vlib/net/http/server_test.v +++ b/vlib/net/http/server_test.v @@ -84,9 +84,12 @@ mut: fn (mut handler MyHttpHandler) handle(req http.Request) http.Response { handler.counter++ // eprintln('${time.now()} | counter: ${handler.counter} | ${req.method} ${req.url}\n${req.header}\n${req.data} - 200 OK\n') + // Note: the response must not echo `req.header` wholesale — the request's + // `Content-Length` (0 for GETs) would override the server's correct one, + // yielding a malformed response whose declared length disagrees with its + // body. mut r := http.Response{ - body: req.data + ', ${req.url}' - header: req.header + body: req.data + ', ${req.url}' } match req.url.all_before('?') { '/endpoint', '/another/endpoint' { diff --git a/vlib/net/http/transport.v b/vlib/net/http/transport.v new file mode 100644 index 000000000..4e8a0f8cc --- /dev/null +++ b/vlib/net/http/transport.v @@ -0,0 +1,403 @@ +// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +@[has_globals] +module http + +import net +import net.ssl +import sync +import time + +// This file implements connection reuse for the HTTP client: a Transport owns +// a pool of keep-alive connections, keyed by origin and TLS configuration, and +// fetch()/Request.do() route through a process-global default Transport unless +// the request opts out (`disable_connection_reuse`) or uses a proxy. The first +// cut pools HTTP/1.1 connections (plain TCP and TLS); HTTP/2 requests run on +// the existing one-shot driver until the multiplexed H2 connection lands. + +// transport_err_unsafe_retry tags a failed exchange on a transport connection +// for a non-idempotent method: the request bytes may have (partially) reached +// the server — the write helpers cannot prove otherwise — so it must not be +// replayed, neither by round_trip's inner loop nor the caller's outer retry +// loop. is_no_need_retry_error recognizes this code so the outer loop honors it. +const transport_err_unsafe_retry = -20013 + +// H1PooledConn is one keep-alive HTTP/1.1 connection in a Transport pool: +// either a plain TCP connection or a TLS one (exactly one of the two is set). +@[heap] +struct H1PooledConn { +mut: + key string + tcp &net.TcpConn = unsafe { nil } + ssl &ssl.SSLConn = unsafe { nil } + idle_since time.Time +} + +// close_conn shuts the underlying connection down and clears it. +fn (mut c H1PooledConn) close_conn() { + if c.ssl != unsafe { nil } { + c.ssl.shutdown() or {} + c.ssl = unsafe { nil } + } + if c.tcp != unsafe { nil } { + c.tcp.close() or {} + c.tcp = unsafe { nil } + } +} + +// refresh_timeouts applies the current request's timeouts to a pooled +// connection, which may have been dialled by a request with different ones. +fn (mut c H1PooledConn) refresh_timeouts(req &Request) { + if c.tcp != unsafe { nil } { + c.tcp.set_read_timeout(req.read_timeout) + c.tcp.set_write_timeout(req.write_timeout) + } else if c.ssl != unsafe { nil } { + if req.read_timeout > 0 { + c.ssl.set_read_timeout(req.read_timeout) + } + } +} + +// exchange runs one request/response over the pooled connection, leaving it +// open. The bool result reports whether the response was precisely framed, so +// the connection can safely carry another request. +fn (mut c H1PooledConn) exchange(req &Request, raw string) !(Response, bool) { + if c.ssl != unsafe { nil } { + return req.h1_exchange_ssl(mut c.ssl, raw) + } + return req.h1_exchange_tcp(mut c.tcp, raw) +} + +// Transport holds the connection pools and reuse policy for HTTP requests. It +// is safe for concurrent use; fetch()/Request.do() share one process-global +// instance (see default_transport). +// +// Note: idle_timeout is enforced lazily on checkout of the same pool key; there +// is no background reaper, so a connection to an origin that is never revisited +// is reclaimed by the global max_idle_conns cap (or close_idle_connections()) +// rather than on a timer. A time-based reaper is a planned follow-up. +@[heap] +pub struct Transport { +pub mut: + // max_idle_conns_per_host caps the idle keep-alive connections retained per + // pool key (origin + TLS configuration). + max_idle_conns_per_host int = 4 + // max_idle_conns caps the total idle keep-alive connections kept across all + // pool keys, bounding file-descriptor use when many distinct origins are + // fetched. The least-recently-used idle connection is evicted on overflow. + // 0 disables the global cap. + max_idle_conns int = 100 + // idle_timeout is how long an idle connection may sit in the pool before it + // is discarded instead of reused. + idle_timeout time.Duration = 90 * time.second +mut: + mu &sync.Mutex = sync.new_mutex() + h1_idle map[string][]&H1PooledConn +} + +// new_transport creates an empty Transport with default limits. +pub fn new_transport() &Transport { + return &Transport{} +} + +// Initialized eagerly in _vinit() (before main() and before any thread is +// spawned), so the shared default Transport is fully constructed without a +// concurrent-startup race. Lazy init via sync.Once is unsafe here: Once sets its +// done flag before running the callback, so a second caller can observe "done" +// and read this global while it is still nil (vlang/v#27456). +__global http_transport_default = new_transport() + +// default_transport returns the process-global Transport that fetch() and +// Request.do() route through. +pub fn default_transport() &Transport { + return http_transport_default +} + +// close_idle_connections closes every idle pooled connection held by the +// default Transport. Useful before fork()-style process handoffs, or in tests. +pub fn close_idle_connections() { + mut t := default_transport() + t.close_idle() +} + +// close_idle closes every idle pooled connection held by this Transport. +// In-flight requests are unaffected. +pub fn (mut t Transport) close_idle() { + mut all := []&H1PooledConn{} + t.mu.lock() + for _, list in t.h1_idle { + for c in list { + all << c + } + } + t.h1_idle.clear() + t.mu.unlock() + for mut c in all { + c.close_conn() + } +} + +// transport_pool_key builds the pool key for a request: connections are only +// shared between requests whose origin and TLS-relevant settings all match. +// enable_http2 is part of the key because it changes what the connection +// advertised via ALPN at dial time: a forced-HTTP/1.1 connection (no ALPN) +// must not satisfy an HTTP/2-enabled request, which would otherwise silently +// never negotiate h2 against that origin while the pooled connection lives. +fn transport_pool_key(req &Request, scheme string, host string, port int) string { + // enable_http2 only affects https dials (plain http ignores it), so keep + // the plain-http pool unsplit. + h2 := scheme == 'https' && req.enable_http2 + // Length-prefix the free-form string fields (host and the TLS paths/PEM + // blobs) so a value containing the '|' separator cannot collide with a + // different field split — e.g. cert='a|b',cert_key='c' vs cert='a', + // cert_key='b|c' — which would let a request reuse a connection dialed with + // the wrong CA or client certificate. Bools and the int port cannot contain + // '|', and scheme is a fixed literal, so they need no prefixing. + return '${scheme}|${pk_part(host)}|${port}|${h2}|${req.validate}|${pk_part(req.verify)}|${pk_part(req.cert)}|${pk_part(req.cert_key)}|${req.in_memory_verification}' +} + +// pk_part length-prefixes a string (`len:value`) so concatenated pool-key +// components stay unambiguous regardless of the bytes they contain. +fn pk_part(s string) string { + return '${s.len}:${s}' +} + +// transport_is_idempotent reports whether a request with this method can be +// replayed safely after a failure on a reused connection (RFC 7231 4.2.2). +fn transport_is_idempotent(method Method) bool { + return method in [.get, .head, .options, .trace, .put, .delete] +} + +// response_allows_reuse reports whether the server's response permits keeping +// the connection open for another request. +fn response_allows_reuse(resp &Response) bool { + if resp.status_code < 200 { + // 1xx handling stops at the headers; the connection state past them is + // not tracked, so never reuse. + return false + } + // A server may split Connection across repeated header lines; get() returns + // only the first, so join all values before scanning for the close token. + conn_tokens := resp.header.values(.connection).join(',').to_lower() + if conn_tokens.contains('close') { + return false + } + ver := resp.version() + if ver == .v1_0 { + // HTTP/1.0 defaults to close; reuse only with an explicit keep-alive. + return conn_tokens.contains('keep-alive') + } + return ver == .v1_1 +} + +// checkout pops the most recently used idle connection for `key`, discarding +// any that sat idle past the timeout. Returns nil when the pool has none. +fn (mut t Transport) checkout(key string) &H1PooledConn { + mut expired := []&H1PooledConn{} + mut found := &H1PooledConn(unsafe { nil }) + t.mu.lock() + for { + mut list := t.h1_idle[key] or { break } + if list.len == 0 { + break + } + mut c := list.pop() + t.h1_idle[key] = list + if time.now() - c.idle_since > t.idle_timeout { + expired << c + continue + } + found = c + break + } + t.mu.unlock() + for mut c in expired { + c.close_conn() + } + return found +} + +// checkin returns a healthy connection to the idle pool, or closes it when the +// per-host pool is already at capacity. Adding it may push the total idle count +// over max_idle_conns, in which case the least-recently-used idle connection +// across all pools is evicted and closed. +fn (mut t Transport) checkin(mut conn H1PooledConn) { + conn.idle_since = time.now() + mut evicted := &H1PooledConn(unsafe { nil }) + t.mu.lock() + mut list := t.h1_idle[conn.key] or { []&H1PooledConn{} } + if list.len >= t.max_idle_conns_per_host { + t.mu.unlock() + conn.close_conn() + return + } + list << conn + t.h1_idle[conn.key] = list + if t.max_idle_conns > 0 && t.total_idle_locked() > t.max_idle_conns { + evicted = t.evict_oldest_locked() + } + t.mu.unlock() + if evicted != unsafe { nil } { + evicted.close_conn() + } +} + +// total_idle_locked sums the idle connections across all pool keys. The caller +// must hold t.mu. +fn (t &Transport) total_idle_locked() int { + mut n := 0 + for _, list in t.h1_idle { + n += list.len + } + return n +} + +// evict_oldest_locked removes and returns the least-recently-used idle +// connection across all pool keys (nil if the pool is empty), pruning an +// emptied key. The caller must hold t.mu and close the returned connection. +fn (mut t Transport) evict_oldest_locked() &H1PooledConn { + mut oldest_key := '' + mut oldest_idx := -1 + mut oldest_since := time.now() + for k, list in t.h1_idle { + for i, c in list { + if oldest_idx == -1 || c.idle_since < oldest_since { + oldest_key = k + oldest_idx = i + oldest_since = c.idle_since + } + } + } + if oldest_idx == -1 { + return &H1PooledConn(unsafe { nil }) + } + mut list := t.h1_idle[oldest_key] or { return &H1PooledConn(unsafe { nil }) } + victim := list[oldest_idx] + list.delete(oldest_idx) + if list.len == 0 { + t.h1_idle.delete(oldest_key) + } else { + t.h1_idle[oldest_key] = list + } + return victim +} + +// maybe_checkin pools the connection when both the read framing and the +// response (and request) headers allow reuse; otherwise it closes it. +fn (mut t Transport) maybe_checkin(mut conn H1PooledConn, header Header, reusable bool, resp &Response) { + if reusable && !header.contains(.connection) && response_allows_reuse(resp) { + t.checkin(mut conn) + return + } + conn.close_conn() +} + +// round_trip performs one HTTP request through the connection pool: it reuses +// an idle connection for the request's pool key when one exists (transparently +// retrying once on a connection that turned out to be stale), dials otherwise, +// and returns healthy connections to the pool afterwards. +fn (mut t Transport) round_trip(req &Request, method Method, scheme string, host string, port int, path string, data string, header Header) !Response { + $if windows && !no_vschannel ? { + if scheme == 'https' { + // The SChannel backend keeps its proven one-shot path until SChannel + // pooling lands; plain-http pooling below already works on Windows. + return req.ssl_do(port, method, host, path, data, header) + } + } + raw := req.build_request_headers_opts(method, host, port, path, data, header, false) + $if trace_http_request ? { + eprint('> ') + eprint(raw) + eprintln('') + } + key := transport_pool_key(req, scheme, host, port) + // A stale pooled connection (closed by the server while idle) fails the + // exchange; drain through the pool, then dial fresh. + for _ in 0 .. t.max_idle_conns_per_host + 1 { + mut conn := t.checkout(key) + mut reused := true + if conn == unsafe { nil } { + reused = false + if scheme == 'https' { + return t.tls_fresh_round_trip(req, key, raw, method, host, port, path, data, header) + } + conn = t.dial_h1_tcp(req, key, host, port)! + } else { + conn.refresh_timeouts(req) + } + resp, reusable := conn.exchange(req, raw) or { + conn.close_conn() + // The write helpers do partial writes and cannot report how many + // bytes reached the server, so a failed exchange may have delivered + // part of the request. Only idempotent methods are safe to replay; + // a non-idempotent one is tagged so neither this loop nor the + // caller's outer retry loop re-sends it (avoiding duplicate side + // effects). A pre-write dial failure, by contrast, propagates before + // this point and stays freely retryable. + if !transport_is_idempotent(method) { + return error_with_code(err.msg(), transport_err_unsafe_retry) + } + if reused { + // Drain stale pooled connections, then fall through to a fresh dial. + continue + } + return err + } + t.maybe_checkin(mut conn, header, reusable, resp) + return resp + } + return error('http.transport: request failed after retrying on a fresh connection') +} + +// dial_h1_tcp opens a fresh plain-TCP connection for `key`. +fn (mut t Transport) dial_h1_tcp(req &Request, key string, host string, port int) !&H1PooledConn { + mut client := net.dial_tcp('${host}:${port}')! + client.set_read_timeout(req.read_timeout) + client.set_write_timeout(req.write_timeout) + return &H1PooledConn{ + key: key + tcp: client + } +} + +// tls_fresh_round_trip dials a fresh TLS connection (advertising ALPN h2 when +// HTTP/2 is enabled) and completes the request on it. When the server selects +// h2, the request runs on the existing one-shot HTTP/2 driver and the +// connection is not pooled yet; an http/1.1 connection is pooled afterwards +// like any other. +fn (mut t Transport) tls_fresh_round_trip(req &Request, key string, raw string, method Method, host string, port int, path string, data string, header Header) !Response { + alpn := if req.enable_http2 { ['h2', 'http/1.1'] } else { []string{} } + mut ssl_conn := ssl.new_ssl_conn( + verify: req.verify + cert: req.cert + cert_key: req.cert_key + validate: req.validate + in_memory_verification: req.in_memory_verification + alpn_protocols: alpn + )! + ssl_conn.dial(host, port)! + if req.read_timeout > 0 { + ssl_conn.set_read_timeout(req.read_timeout) + } + if req.enable_http2 && ssl_conn.negotiated_alpn() == 'h2' { + return req.h2_do(mut ssl_conn, method, host, port, path, data, header) + } + mut conn := &H1PooledConn{ + key: key + ssl: ssl_conn + } + resp, reusable := conn.exchange(req, raw) or { + conn.close_conn() + // Past the TLS handshake the request bytes may have been (partially) + // written; a non-idempotent method must not be replayed by the outer + // retry loop. (A dial/handshake failure above propagates before this and + // stays retryable, since no request byte was sent.) + if !transport_is_idempotent(method) { + return error_with_code(err.msg(), transport_err_unsafe_retry) + } + return err + } + t.maybe_checkin(mut conn, header, reusable, resp) + return resp +} diff --git a/vlib/net/http/transport_test.v b/vlib/net/http/transport_test.v new file mode 100644 index 000000000..c7ae6ba60 --- /dev/null +++ b/vlib/net/http/transport_test.v @@ -0,0 +1,543 @@ +module http + +// Tests for the connection-pooling Transport (transport.v): keep-alive reuse +// over plain TCP and TLS, no-reuse on `Connection: close` / truncated reads, +// transparent retry on stale pooled connections, idle eviction, and the +// `disable_connection_reuse` opt-out. Each test runs its own loopback server +// with an accept counter: the accept count is the proof of (non-)reuse. +import net +import net.mbedtls +import sync +import time + +const tls_test_cert_path = @VEXEROOT + + '/vlib/net/websocket/tests/autobahn/fuzzing_server_wss/config/server.crt' +const tls_test_key_path = @VEXEROOT + + '/vlib/net/websocket/tests/autobahn/fuzzing_server_wss/config/server.key' + +@[heap] +struct KaSrv { +mut: + mu &sync.Mutex = sync.new_mutex() + accepts int + // connection_close: respond with `Connection: close` and close afterwards. + connection_close bool + // close_after_each: close after each response WITHOUT advertising it — the + // classic stale-pooled-connection scenario. + close_after_each bool + // split_connection_close: emit the close token in a SECOND, repeated + // `Connection` header (`keep-alive` first, then `close`) while keeping the + // socket open — a server that says "do not reuse" via a split field. The + // client must honor it and not pool, even though the connection stays usable. + split_connection_close bool + // drop_on_post: after reading a POST request head, close the connection + // without responding — a reused-connection failure after the bytes were + // written, for a non-idempotent method. + drop_on_post bool + posts int // POST request heads actually read by the server + body string = 'hello' +} + +fn (mut s KaSrv) bump_accepts() { + s.mu.lock() + s.accepts++ + s.mu.unlock() +} + +fn (mut s KaSrv) accept_count() int { + s.mu.lock() + defer { + s.mu.unlock() + } + return s.accepts +} + +fn (mut s KaSrv) post_count() int { + s.mu.lock() + defer { + s.mu.unlock() + } + return s.posts +} + +// ka_read_request_head reads from `conn` until a full request head (terminated +// by a blank line) has arrived, returning it. The test requests carry no bodies. +fn ka_read_request_head(mut conn net.TcpConn) !string { + mut buf := []u8{len: 4096} + mut sofar := []u8{} + for { + n := conn.read(mut buf)! + if n <= 0 { + return error('closed') + } + sofar << buf[..n] + if sofar.bytestr().contains('\r\n\r\n') { + return sofar.bytestr() + } + } + return error('closed') +} + +fn ka_srv_serve_conn(mut s KaSrv, mut conn net.TcpConn) { + defer { + conn.close() or {} + } + conn.set_read_timeout(10 * time.second) + for { + head := ka_read_request_head(mut conn) or { return } + if s.drop_on_post && head.starts_with('POST ') { + s.mu.lock() + s.posts++ + s.mu.unlock() + // Drop the connection without responding: the request bytes were + // written, so this is not a stale-write. + return + } + mut resp := 'HTTP/1.1 200 OK\r\nContent-Length: ${s.body.len}\r\n' + if s.connection_close { + resp += 'Connection: close\r\n' + } + if s.split_connection_close { + resp += 'Connection: keep-alive\r\nConnection: close\r\n' + } + resp += '\r\n' + s.body + conn.write(resp.bytes()) or { return } + if s.connection_close || s.close_after_each { + return + } + } +} + +fn ka_srv_loop(mut s KaSrv, mut listener net.TcpListener) { + for { + mut conn := listener.accept() or { return } + s.bump_accepts() + ka_srv_serve_conn(mut s, mut conn) + } +} + +// start_ka_srv starts a keep-alive capable loopback HTTP server, returning its +// port, listener and server thread. +fn start_ka_srv(mut s KaSrv) !(int, &net.TcpListener, thread) { + mut listener := net.listen_tcp(.ip, '127.0.0.1:0')! + port := listener.addr()!.port()! + th := spawn ka_srv_loop(mut s, mut listener) + return port, listener, th +} + +// stop_ka_srv tears a test server down fully before the test returns: closing +// the idle pool unblocks a server thread reading a kept-alive connection, +// closing the listener aborts its accept, and the join guarantees the thread +// is gone before the next test creates sockets (otherwise the OS can recycle +// this listener's handle for the next test's listener while the old thread is +// still calling accept on it, stealing its connections). +fn stop_ka_srv(mut listener net.TcpListener, th thread) { + close_idle_connections() + listener.close() or {} + th.wait() +} + +// The pool key must isolate distinct TLS configurations even when a field value +// contains the '|' separator, or a request could reuse a connection dialed with +// the wrong cert/CA. cert='a|b',cert_key='c' and cert='a',cert_key='b|c' must +// not collide. +fn test_transport_pool_key_no_delimiter_collision() { + a := Request{ + cert: 'a|b' + cert_key: 'c' + } + b := Request{ + cert: 'a' + cert_key: 'b|c' + } + assert transport_pool_key(a, 'https', 'h', 443) != transport_pool_key(b, 'https', 'h', 443) + // Identical configs still share a key. + a2 := Request{ + cert: 'a|b' + cert_key: 'c' + } + assert transport_pool_key(a, 'https', 'h', 443) == transport_pool_key(a2, 'https', 'h', 443) + // A host containing '|' must not collide with a different host/port split. + assert transport_pool_key(Request{}, 'https', 'h|x', 443) != transport_pool_key(Request{}, + 'https', 'h', 443) +} + +fn test_h1_plain_reuse() { + mut srv := &KaSrv{} + port, mut listener, th := start_ka_srv(mut srv) or { + assert false, 'server: ${err}' + return + } + for i in 0 .. 3 { + resp := fetch(url: 'http://127.0.0.1:${port}/r${i}') or { + assert false, 'fetch ${i}: ${err}' + return + } + assert resp.status_code == 200 + assert resp.body == 'hello' + } + // All three requests must have shared one connection. + assert srv.accept_count() == 1 + stop_ka_srv(mut listener, th) +} + +fn test_h1_no_reuse_on_connection_close_response() { + mut srv := &KaSrv{ + connection_close: true + } + port, mut listener, th := start_ka_srv(mut srv) or { + assert false, 'server: ${err}' + return + } + for i in 0 .. 2 { + resp := fetch(url: 'http://127.0.0.1:${port}/r${i}') or { + assert false, 'fetch ${i}: ${err}' + return + } + assert resp.status_code == 200 + } + // `Connection: close` responses must not be pooled. + assert srv.accept_count() == 2 + stop_ka_srv(mut listener, th) +} + +// A close token carried in a repeated `Connection` header (after a keep-alive +// one) must still be honored: parse_headers stores repeats separately and get() +// returns only the first, so response_allows_reuse joins all values. The server +// keeps the socket open, so a wrongly-pooled connection would be reused (1 +// accept); honoring the close dials fresh each time (2 accepts). +fn test_h1_no_reuse_on_split_connection_close() { + mut srv := &KaSrv{ + split_connection_close: true + } + port, mut listener, th := start_ka_srv(mut srv) or { + assert false, 'server: ${err}' + return + } + for i in 0 .. 2 { + resp := fetch(url: 'http://127.0.0.1:${port}/r${i}') or { + assert false, 'fetch ${i}: ${err}' + return + } + assert resp.status_code == 200 + } + assert srv.accept_count() == 2 + stop_ka_srv(mut listener, th) +} + +// The total idle pool is bounded by max_idle_conns across all pool keys, not +// just per host: checking in more distinct-keyed connections than the cap +// evicts the least-recently-used ones (here k0, k1) and keeps the newest. +fn test_transport_global_idle_cap() { + mut t := new_transport() + t.max_idle_conns = 3 + t.max_idle_conns_per_host = 10 // keep the per-host cap out of the way + for i in 0 .. 5 { + mut c := &H1PooledConn{ + key: 'k${i}' + } + t.checkin(mut c) + } + mut total := 0 + for _, list in t.h1_idle { + total += list.len + } + assert total == 3, 'global idle cap not enforced: ${total}' + assert 'k0' !in t.h1_idle + assert 'k1' !in t.h1_idle + assert 'k2' in t.h1_idle + assert 'k3' in t.h1_idle + assert 'k4' in t.h1_idle +} + +fn test_h1_stale_pooled_connection_is_retried() { + mut srv := &KaSrv{ + close_after_each: true + } + port, mut listener, th := start_ka_srv(mut srv) or { + assert false, 'server: ${err}' + return + } + // First request succeeds and the connection is pooled (the response did not + // advertise the close). The server then closes it. The second request picks + // up the stale connection, fails, and must transparently retry on a fresh + // one. + for i in 0 .. 2 { + resp := fetch(url: 'http://127.0.0.1:${port}/r${i}') or { + assert false, 'fetch ${i}: ${err}' + return + } + assert resp.status_code == 200 + assert resp.body == 'hello' + } + assert srv.accept_count() == 2 + stop_ka_srv(mut listener, th) +} + +// A non-idempotent request that fails on a reused keep-alive connection after +// its bytes were written must NOT be replayed: doing so could duplicate side +// effects. The server reads the POST then drops the connection; the client must +// surface the error without retrying on a fresh connection. +fn test_h1_unsafe_pooled_post_is_not_retried() { + mut srv := &KaSrv{ + drop_on_post: true + } + port, mut listener, th := start_ka_srv(mut srv) or { + assert false, 'server: ${err}' + return + } + // First, a GET to establish and pool a keep-alive connection. + resp := fetch(url: 'http://127.0.0.1:${port}/warmup') or { + assert false, 'warmup fetch: ${err}' + return + } + assert resp.status_code == 200 + // The POST reuses that connection; the server reads it and drops the + // connection. The request must fail rather than be re-sent. + fetch(method: .post, url: 'http://127.0.0.1:${port}/submit', data: 'payload') or { + // expected: the POST failed and was not retried. + assert srv.post_count() == 1, 'POST was replayed ${srv.post_count()} times' + assert srv.accept_count() == 1, 'a fresh connection was opened to retry the POST' + stop_ka_srv(mut listener, th) + return + } + assert false, 'the POST unexpectedly succeeded' + stop_ka_srv(mut listener, th) +} + +fn test_h1_opt_out_disables_reuse() { + mut srv := &KaSrv{} + port, mut listener, th := start_ka_srv(mut srv) or { + assert false, 'server: ${err}' + return + } + for i in 0 .. 2 { + resp := fetch( + url: 'http://127.0.0.1:${port}/r${i}' + disable_connection_reuse: true + ) or { + assert false, 'fetch ${i}: ${err}' + return + } + assert resp.status_code == 200 + } + // Opt-out requests open one connection each. + assert srv.accept_count() == 2 + stop_ka_srv(mut listener, th) +} + +fn test_h1_truncated_read_poisons_connection() { + mut srv := &KaSrv{ + // Larger than the 64KB read buffer, so the body needs several reads and + // the stop limit actually interrupts the transfer mid-stream. + body: 'x'.repeat(200 * 1024) + } + port, mut listener, th := start_ka_srv(mut srv) or { + assert false, 'server: ${err}' + return + } + // A stop_receiving_limit read leaves unread response bytes on the wire, so + // that connection must not be reused. + resp1 := fetch(url: 'http://127.0.0.1:${port}/big', stop_receiving_limit: 1000) or { + assert false, 'fetch 1: ${err}' + return + } + assert resp1.status_code == 200 + resp2 := fetch(url: 'http://127.0.0.1:${port}/after') or { + assert false, 'fetch 2: ${err}' + return + } + assert resp2.status_code == 200 + assert srv.accept_count() == 2 + stop_ka_srv(mut listener, th) +} + +fn test_h1_idle_eviction() { + mut srv := &KaSrv{} + port, mut listener, th := start_ka_srv(mut srv) or { + assert false, 'server: ${err}' + return + } + mut t := new_transport() + t.idle_timeout = 50 * time.millisecond + req := prepare(url: 'http://127.0.0.1:${port}/') or { + assert false, 'prepare: ${err}' + return + } + r1 := t.round_trip(req, .get, 'http', '127.0.0.1', port, '/', '', req.header) or { + assert false, 'round_trip 1: ${err}' + return + } + assert r1.status_code == 200 + time.sleep(150 * time.millisecond) + // The pooled connection has sat idle past the timeout: it must be evicted + // and a fresh one dialled. + r2 := t.round_trip(req, .get, 'http', '127.0.0.1', port, '/', '', req.header) or { + assert false, 'round_trip 2: ${err}' + return + } + assert r2.status_code == 200 + assert srv.accept_count() == 2 + // This test pools in its own private Transport, so flush that one before + // the joint teardown. + t.close_idle() + stop_ka_srv(mut listener, th) +} + +// --- TLS (h1 over mbedtls) reuse --- + +@[heap] +struct TlsKaSrv { +mut: + mu &sync.Mutex = sync.new_mutex() + accepts int +} + +fn (mut s TlsKaSrv) bump_accepts() { + s.mu.lock() + s.accepts++ + s.mu.unlock() +} + +fn (mut s TlsKaSrv) accept_count() int { + s.mu.lock() + defer { + s.mu.unlock() + } + return s.accepts +} + +fn tls_ka_srv_serve_conn(mut conn mbedtls.SSLConn) { + defer { + conn.shutdown() or {} + } + body := 'tls hello' + for { + mut buf := []u8{len: 4096} + mut sofar := []u8{} + for !sofar.bytestr().contains('\r\n\r\n') { + n := conn.read(mut buf) or { return } + if n <= 0 { + return + } + sofar << buf[..n] + } + conn.write_string('HTTP/1.1 200 OK\r\nContent-Length: ${body.len}\r\n\r\n${body}') or { + return + } + } +} + +fn tls_ka_srv_loop(mut s TlsKaSrv, mut listener mbedtls.SSLListener) { + for { + mut conn := listener.accept() or { return } + s.bump_accepts() + // Serve each connection on its own thread: a pooled idle connection + // keeps its serve loop parked in read, which must not block accepting + // the next connection. (The serve threads exit when the test closes + // the pooled connections via close_idle_connections.) + spawn tls_ka_srv_serve_conn(mut conn) + } +} + +fn test_h1_tls_reuse() { + $if windows && !no_vschannel ? { + // The default Windows TLS backend (SChannel) keeps its one-shot path + // until SChannel pooling lands. + eprintln('skipping: SChannel connection pooling is not implemented yet') + return + } + mut port_listener := net.listen_tcp(.ip, '127.0.0.1:0') or { + assert false, 'port: ${err}' + return + } + port := port_listener.addr() or { + assert false, 'addr: ${err}' + return + }.port() or { + assert false, 'port: ${err}' + return + } + port_listener.close() or {} + mut listener := mbedtls.new_ssl_listener('127.0.0.1:${port}', mbedtls.SSLConnectConfig{ + cert: tls_test_cert_path + cert_key: tls_test_key_path + validate: false + }) or { + assert false, 'listener: ${err}' + return + } + mut srv := &TlsKaSrv{} + th := spawn tls_ka_srv_loop(mut srv, mut listener) + for i in 0 .. 2 { + resp := fetch(url: 'https://127.0.0.1:${port}/r${i}', validate: false) or { + assert false, 'fetch ${i}: ${err}' + return + } + assert resp.status_code == 200 + assert resp.body == 'tls hello' + } + // Both https requests must have shared one TLS connection. + assert srv.accept_count() == 1 + // Free the pooled TLS connection (unblocking the server read), then abort + // the accept and join the server thread before the test returns. + close_idle_connections() + listener.shutdown() or {} + th.wait() +} + +// A TLS connection dialled with HTTP/2 disabled (no ALPN) must not satisfy an +// HTTP/2-enabled request to the same origin — the ALPN preference is part of +// the pool key. Forced-h1 requests still share among themselves. +fn test_h1_tls_no_reuse_across_alpn_preference() { + $if windows && !no_vschannel ? { + eprintln('skipping: SChannel connection pooling is not implemented yet') + return + } + mut port_listener := net.listen_tcp(.ip, '127.0.0.1:0') or { + assert false, 'port: ${err}' + return + } + port := port_listener.addr() or { + assert false, 'addr: ${err}' + return + }.port() or { + assert false, 'port: ${err}' + return + } + port_listener.close() or {} + mut listener := mbedtls.new_ssl_listener('127.0.0.1:${port}', mbedtls.SSLConnectConfig{ + cert: tls_test_cert_path + cert_key: tls_test_key_path + validate: false + }) or { + assert false, 'listener: ${err}' + return + } + mut srv := &TlsKaSrv{} + th := spawn tls_ka_srv_loop(mut srv, mut listener) + // 1. Forced-HTTP/1.1 request: dialled without ALPN, pooled under its key. + r1 := fetch(url: 'https://127.0.0.1:${port}/h1', validate: false, enable_http2: false) or { + assert false, 'fetch 1: ${err}' + return + } + assert r1.status_code == 200 + // 2. Default (HTTP/2-enabled) request: must NOT reuse the no-ALPN + // connection; it dials fresh and advertises h2. + r2 := fetch(url: 'https://127.0.0.1:${port}/h2pref', validate: false) or { + assert false, 'fetch 2: ${err}' + return + } + assert r2.status_code == 200 + assert srv.accept_count() == 2 + // 3. Another forced-HTTP/1.1 request: reuses connection 1. + r3 := fetch(url: 'https://127.0.0.1:${port}/h1again', validate: false, enable_http2: false) or { + assert false, 'fetch 3: ${err}' + return + } + assert r3.status_code == 200 + assert srv.accept_count() == 2 + close_idle_connections() + listener.shutdown() or {} + th.wait() +} diff --git a/vlib/veb/tests/veb_test.v b/vlib/veb/tests/veb_test.v index 3600266c6..2b5a52d69 100644 --- a/vlib/veb/tests/veb_test.v +++ b/vlib/veb/tests/veb_test.v @@ -137,7 +137,15 @@ fn test_http_client_index() { assert_common_http_headers(x)! assert x.header.get(.content_type)? == 'text/plain' assert x.body == 'Welcome to veb' - assert x.header.get(.connection)? == 'close' + // The default http client keeps connections alive (it no longer sends + // `Connection: close`), so veb must not answer with a close either. + if conn_header := x.header.get(.connection) { + assert conn_header != 'close' + } + // Opting out of connection reuse restores the historical behavior + // end-to-end: the client sends `Connection: close` and veb honors it. + y := http.fetch(url: 'http://${localserver}/', disable_connection_reuse: true) or { panic(err) } + assert y.header.get(.connection)? == 'close' } fn test_http_client_404() { -- 2.39.5