From 94a763e85cd34a51ee9c9609c445d6f9d5490ad1 Mon Sep 17 00:00:00 2001
From: Richard Wheeler
Date: Sun, 7 Jun 2026 09:23:25 -0400
Subject: [PATCH] net.http: stream response callbacks and stop limits over HTTP/2 (#27369)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* net.http: stream response callbacks and stop limits over HTTP/2 (closes #27368)

The HTTP/2 fetch path (#27362) buffered the entire response body, so
requests using on_progress / on_progress_body / stop_copying_limit /
stop_receiving_limit were forced onto HTTP/1.1. This adds real streaming
support so they work on the HTTP/2 path too.

- New H2ClientRequest fields: on_data (per-DATA-frame callback) and
  stop_copying_limit / stop_receiving_limit, mirroring the HTTP/1.1
  semantics.
- H2Conn.read_response now tracks cumulative body bytes, reads
  Content-Length if present, fires on_data per DATA frame (including
  chunk, cumulative body_so_far, content-length, and status), respects
  stop_copying_limit (caps the response body while still firing callbacks
  and draining the stream), and respects stop_receiving_limit (breaks the
  read loop early).
- The h2_do shim in backend.c.v adapts the Request's on_progress and
  on_progress_body into a single H2DataFn closure and threads the two
  stop limits through.

The previous gate (uses_response_streaming) is removed, and the
enable_http2 docs note that on_progress fires per DATA payload on h2
rather than per raw network read.

Tests over the in-memory transport assert: on_data fires per DATA frame
with cumulative body_so_far, Content-Length (when present), and status;
stop_copying_limit caps the response body while callbacks keep firing
across all chunks; stop_receiving_limit breaks the loop early.

Verified end-to-end against https://www.google.com/ —
http.fetch(enable_http2: true, on_progress_body: f) reports HTTP/2.0,
status 200, and the on_progress_body callback fires once per 16 KiB DATA
frame with cumulative bytes matching the final body length.

Passes under -W -cstrict -cc clang.

Co-Authored-By: Claude Opus 4.7

* net.http: RST_STREAM(CANCEL) and mark H2Conn unusable on early termination

When stop_receiving_limit triggered, the response stream was left open
without sending RST_STREAM. On a reused H2Conn the peer's in-flight DATA
frames for the abandoned stream would still arrive, consuming the
connection-level receive window and risking starvation of subsequent
requests.

Fix: when bailing early, send RST_STREAM with error code CANCEL on the
request stream (RFC 7540 Section 8.1.4 / 5.4.2) so the peer stops sending
more DATA, and set a new H2Conn.aborted flag so subsequent H2Conn.do()
calls return a clear error rather than proceeding on a half-drained
connection.

Strengthens the stop_receiving_limit test to assert the client emitted
RST_STREAM(CANCEL) on the request stream and that a second do() on the
same connection errors out.

Co-Authored-By: Claude Opus 4.7 --------- Co-authored-by: Richard Wheeler Co-authored-by: Claude Opus 4.7 --- vlib/net/http/backend.c.v | 45 +++++++--- vlib/net/http/h2_client.v | 9 -- vlib/net/http/h2_client_test.v | 23 ----- vlib/net/http/h2_conn.v | 74 +++++++++++++++- vlib/net/http/h2_conn_test.v | 152 +++++++++++++++++++++++++++++++++ vlib/net/http/http.v | 2 +- vlib/net/http/request.v | 2 +- 7 files changed, 259 insertions(+), 48 deletions(-) diff --git a/vlib/net/http/backend.c.v b/vlib/net/http/backend.c.v index 83ee1ba1b..132eb01b2 100644 --- a/vlib/net/http/backend.c.v +++ b/vlib/net/http/backend.c.v @@ -22,13 +22,10 @@ fn net_ssl_do(req &Request, port int, method Method, host_name string, path stri eprintln('') } // Advertise ALPN `h2` (with an `http/1.1` fallback) only when HTTP/2 is - // requested, so existing callers see no change on the wire. Requests that - // rely on streaming response callbacks or stop limits stay on HTTP/1.1, - // since the HTTP/2 path buffers the full response; the ALPN offer is gated - // here (before negotiation) because once a server selects `h2` we cannot - // fall back to HTTP/1.1 framing on the same connection. - use_h2 := req.enable_http2 && !req.uses_response_streaming() - alpn := if use_h2 { ['h2', 'http/1.1'] } else { []string{} } + // requested, so existing callers see no change on the wire. The HTTP/2 read + // path now feeds the same streaming callbacks and honors the stop limits, + // so they no longer force HTTP/1.1. + alpn := if req.enable_http2 { ['h2', 'http/1.1'] } else { []string{} } for { mut ssl_conn := ssl.new_ssl_conn( verify: req.verify @@ -53,7 +50,7 @@ fn net_ssl_do(req &Request, port int, method Method, host_name string, path stri } // If the server negotiated HTTP/2 via ALPN, speak it; otherwise fall // back to the existing HTTP/1.1 path unchanged. 
- if use_h2 && ssl_conn.negotiated_alpn() == 'h2' { + if req.enable_http2 && ssl_conn.negotiated_alpn() == 'h2' { return req.h2_do(mut ssl_conn, method, host_name, port, path, data, header)! } return req.do_request(req_headers, mut ssl_conn)! @@ -63,12 +60,40 @@ fn net_ssl_do(req &Request, port int, method Method, host_name string, path stri // h2_do runs a single request over an HTTP/2 connection on an already-dialled, // ALPN-negotiated `h2` TLS socket, and returns the response as a net.http -// Response. +// Response. The request's streaming callbacks (on_progress / on_progress_body) +// and stop limits are adapted onto the H2 chunk hook so they fire per DATA +// frame, matching the HTTP/1.1 streaming semantics as closely as is possible +// on the framed wire (on_progress receives DATA payloads rather than raw +// network reads). fn (req &Request) h2_do(mut ssl_conn ssl.SSLConn, method Method, host_name string, port int, path string, data string, header Header) !Response { defer { ssl_conn.shutdown() or {} } - h2req := req.to_h2_request(method, h2_authority(host_name, port), path, data, header) + base := req.to_h2_request(method, h2_authority(host_name, port), path, data, header) + on_progress := req.on_progress + on_progress_body := req.on_progress_body + mut on_data := H2DataFn(unsafe { nil }) + if on_progress != unsafe { nil } || on_progress_body != unsafe { nil } { + on_data = fn [req, on_progress, on_progress_body] (chunk []u8, body_so_far u64, body_expected u64, status int) ! { + if on_progress != unsafe { nil } { + on_progress(req, chunk, body_so_far)! + } + if on_progress_body != unsafe { nil } { + on_progress_body(req, chunk, body_so_far, body_expected, status)! 
+ } + } + } + h2req := H2ClientRequest{ + method: base.method + scheme: base.scheme + authority: base.authority + path: base.path + headers: base.headers + body: base.body + on_data: on_data + stop_copying_limit: req.stop_copying_limit + stop_receiving_limit: req.stop_receiving_limit + } mut conn := new_h2_conn(ssl_conn) h2resp := conn.do(h2req)! if req.on_finish != unsafe { nil } { diff --git a/vlib/net/http/h2_client.v b/vlib/net/http/h2_client.v index 6acfb266a..f11fc9b45 100644 --- a/vlib/net/http/h2_client.v +++ b/vlib/net/http/h2_client.v @@ -73,15 +73,6 @@ fn (req &Request) to_h2_request(method Method, authority string, path string, da } } -// uses_response_streaming reports whether the request relies on streaming -// response callbacks or stop limits. The HTTP/2 path buffers the full response, -// so such requests must not negotiate HTTP/2 and instead use the HTTP/1.1 path, -// which honors these. (Streaming over HTTP/2 is a planned follow-up.) -fn (req &Request) uses_response_streaming() bool { - return req.on_progress != unsafe { nil } || req.on_progress_body != unsafe { nil } - || req.stop_copying_limit >= 0 || req.stop_receiving_limit >= 0 -} - // h2_response_to_http converts an HTTP/2 response into a net.http Response, // decoding any Content-Encoding the same way the HTTP/1.1 path does. 
fn h2_response_to_http(h2resp H2ClientResponse) Response { diff --git a/vlib/net/http/h2_client_test.v b/vlib/net/http/h2_client_test.v index f770b89fe..57585e0e4 100644 --- a/vlib/net/http/h2_client_test.v +++ b/vlib/net/http/h2_client_test.v @@ -103,26 +103,3 @@ fn test_to_h2_request_authority_from_host_header() { assert h2req.authority == 'override.example:8443' assert !h2req.headers.any(it.name == 'host') } - -fn test_uses_response_streaming() { - assert !(Request{}).uses_response_streaming() - assert (Request{ - stop_copying_limit: 0 - }).uses_response_streaming() - assert (Request{ - stop_receiving_limit: 100 - }).uses_response_streaming() - progress := fn (request &Request, chunk []u8, read_so_far u64) ! {} - assert (Request{ - on_progress: progress - }).uses_response_streaming() - body_progress := fn (request &Request, chunk []u8, body_read_so_far u64, body_expected_size u64, status_code int) ! {} - assert (Request{ - on_progress_body: body_progress - }).uses_response_streaming() - // A non-streaming callback (on_finish) does not force HTTP/1.1. - finish := fn (request &Request, final_size u64) ! {} - assert !(Request{ - on_finish: finish - }).uses_response_streaming() -} diff --git a/vlib/net/http/h2_conn.v b/vlib/net/http/h2_conn.v index 654aafd47..d4b58084c 100644 --- a/vlib/net/http/h2_conn.v +++ b/vlib/net/http/h2_conn.v @@ -42,6 +42,12 @@ mut: max_header_list_size u32 = max_u32 } +// H2DataFn is called for each DATA frame received on the response stream, with +// the chunk's bytes, the cumulative body bytes received (including this chunk), +// the body length from Content-Length if known (else 0), and the response +// status code. +pub type H2DataFn = fn (chunk []u8, body_so_far u64, body_expected u64, status int) ! + // H2ClientRequest describes a single HTTP/2 request. Header names in `headers` // must be lowercase (RFC 7540 Section 8.1.2); the pseudo-headers are filled in // from the other fields. 
@@ -53,6 +59,20 @@ pub: path string = '/' headers []H2HeaderField body []u8 + // Optional response chunk callback, called after each DATA frame's payload + // is received. The arguments are the chunk bytes (not yet copied into the + // response body), the cumulative body bytes received so far (including this + // chunk), the body length from Content-Length (0 when not present), and the + // response status code. + on_data H2DataFn = unsafe { nil } + // stop_copying_limit, when >= 0, caps the cumulative body bytes copied into + // the response body; further DATA chunks are dropped but the callback keeps + // firing and the stream is drained to completion. + stop_copying_limit i64 = -1 + // stop_receiving_limit, when >= 0, causes the response read loop to break + // once that many body bytes have been received. The callback fires for the + // final chunk; no further callbacks fire after that. + stop_receiving_limit i64 = -1 } // H2ClientResponse is the result of an HTTP/2 request. @@ -78,6 +98,12 @@ mut: stream_send_window i64 // send window for cur_stream_id handshaked bool goaway bool + // aborted is set when this connection terminated a stream early + // (RST_STREAM sent without draining the remaining DATA). Subsequent + // requests on the same connection must fail rather than risk being starved + // by leftover DATA frames the peer had already sent for the cancelled + // stream. + aborted bool } // new_h2_conn creates a client connection over `transport`. The HTTP/2 @@ -94,6 +120,9 @@ pub fn (mut c H2Conn) do(req H2ClientRequest) !H2ClientResponse { if c.goaway { return error('h2: connection is shutting down (GOAWAY)') } + if c.aborted { + return error('h2: connection is no longer usable after an early stream termination') + } stream_id := c.next_stream_id c.next_stream_id += 2 c.cur_stream_id = stream_id @@ -114,7 +143,7 @@ pub fn (mut c H2Conn) do(req H2ClientRequest) !H2ClientResponse { if has_body { c.send_body(stream_id, req.body)! 
} - return c.read_response(stream_id)! + return c.read_response(stream_id, req)! } fn (mut c H2Conn) handshake() ! { @@ -135,10 +164,14 @@ fn (mut c H2Conn) handshake() ! { } // read_response reads frames until `stream_id` is closed, returning its -// response and servicing connection-level frames along the way. -fn (mut c H2Conn) read_response(stream_id u32) !H2ClientResponse { +// response and servicing connection-level frames along the way. The streaming +// options on `req` (on_data callback and the two stop limits) are honored +// while reading DATA frames, matching the HTTP/1.1 streaming semantics. +fn (mut c H2Conn) read_response(stream_id u32, req H2ClientRequest) !H2ClientResponse { mut resp := H2ClientResponse{} mut got_headers := false + mut body_so_far := u64(0) + mut body_expected := u64(0) for { frame := c.next_frame()! if c.handle_conn_frame(frame)! { @@ -155,6 +188,9 @@ fn (mut c H2Conn) read_response(stream_id u32) !H2ClientResponse { resp.status = f.value.int() } else if !f.name.starts_with(':') { resp.headers << f + if f.name == 'content-length' { + body_expected = f.value.u64() + } } } got_headers = true @@ -166,8 +202,24 @@ fn (mut c H2Conn) read_response(stream_id u32) !H2ClientResponse { if frame.stream_id != stream_id { continue } - resp.body << frame.data if frame.data.len > 0 { + body_so_far += u64(frame.data.len) + // Append the chunk to the response body unless the copy + // limit has been reached; the callback still fires. + if req.stop_copying_limit < 0 + || i64(body_so_far) - i64(frame.data.len) < req.stop_copying_limit { + if req.stop_copying_limit >= 0 && i64(body_so_far) > req.stop_copying_limit { + remaining := req.stop_copying_limit - (i64(body_so_far) - i64(frame.data.len)) + if remaining > 0 { + resp.body << frame.data[..int(remaining)] + } + } else { + resp.body << frame.data + } + } + if req.on_data != unsafe { nil } { + req.on_data(frame.data, body_so_far, body_expected, resp.status)! 
+ } // Replenish flow control so the peer keeps sending. c.send_window_update(0, u32(frame.data.len))! c.send_window_update(stream_id, u32(frame.data.len))! @@ -175,6 +227,20 @@ fn (mut c H2Conn) read_response(stream_id u32) !H2ClientResponse { if frame.end_stream { break } + if req.stop_receiving_limit >= 0 && i64(body_so_far) >= req.stop_receiving_limit { + // Cancel the stream (RFC 7540 Section 8.1.4 / 5.4.2) so the + // peer stops sending more DATA, and mark the connection + // unusable: in-flight DATA frames that the peer has already + // sent for this stream would otherwise consume the + // connection-level receive window and block subsequent + // requests on the same H2Conn. + c.send_frame(H2RstStreamFrame{ + stream_id: stream_id + error_code: u32(H2ErrorCode.cancel) + })! + c.aborted = true + break + } } H2RstStreamFrame { if frame.stream_id == stream_id { diff --git a/vlib/net/http/h2_conn_test.v b/vlib/net/http/h2_conn_test.v index ee1eaa0a3..bad54f7be 100644 --- a/vlib/net/http/h2_conn_test.v +++ b/vlib/net/http/h2_conn_test.v @@ -400,3 +400,155 @@ fn test_h2_fetch_glue_roundtrip() { assert resp.body == ' world' assert (resp.header.get_custom('content-type') or { '' }) == 'text/plain' } + +// build_streamed_response builds a server stream that delivers the response +// body in fixed-size chunks, useful for streaming tests. 
+fn build_streamed_response(status string, content_length string, chunks [][]u8) []u8 { + mut senc := H2HpackEncoder{} + mut fields := [H2HeaderField{':status', status}] + if content_length != '' { + fields << H2HeaderField{'content-length', content_length} + } + hdr := senc.encode(fields) + mut out := []u8{} + out << H2Frame(H2SettingsFrame{}).encode() + out << H2Frame(H2SettingsFrame{ + ack: true + }).encode() + last_is_headers := chunks.len == 0 + out << H2Frame(H2HeadersFrame{ + stream_id: 1 + fragment: hdr + end_headers: true + end_stream: last_is_headers + }).encode() + for i, chunk in chunks { + out << H2Frame(H2DataFrame{ + stream_id: 1 + data: chunk + end_stream: i == chunks.len - 1 + }).encode() + } + return out +} + +// ChunkCapture records on_data invocations via a reference captured by the +// returned closure. +struct ChunkCapture { +mut: + chunks [][]u8 + running []u64 + expected []u64 + status []int +} + +fn make_capture_fn(cap &ChunkCapture) H2DataFn { + return fn [cap] (chunk []u8, body_so_far u64, body_expected u64, status int) ! { + unsafe { + cap.chunks << chunk.clone() + cap.running << body_so_far + cap.expected << body_expected + cap.status << status + } + } +} + +fn test_h2_on_data_fires_per_chunk() { + mut cap := &ChunkCapture{} + inbound := build_streamed_response('200', '12', [ + 'foo'.bytes(), + 'bar'.bytes(), + 'baz quux'.bytes(), + ]) + mut t := &MockTransport{ + inbound: inbound + } + mut c := new_h2_conn(t) + resp := c.do(H2ClientRequest{ + authority: 'h.example' + on_data: make_capture_fn(cap) + })! + assert resp.status == 200 + assert resp.body.bytestr() == 'foobarbaz quux' + // Three DATA frames -> three callback invocations. + assert cap.chunks.len == 3 + assert cap.chunks[0].bytestr() == 'foo' + assert cap.chunks[1].bytestr() == 'bar' + assert cap.chunks[2].bytestr() == 'baz quux' + // body_so_far is cumulative including the current chunk. + assert cap.running == [u64(3), u64(6), u64(14)] + // content-length is reported. 
+ assert cap.expected == [u64(12), u64(12), u64(12)] + // Status was known by the first callback (headers arrived first). + assert cap.status == [200, 200, 200] +} + +fn test_h2_stop_copying_limit_caps_body_but_keeps_callback() { + mut cap := &ChunkCapture{} + inbound := build_streamed_response('200', '', [ + 'AAAA'.bytes(), + 'BBBB'.bytes(), + 'CCCC'.bytes(), + ]) + mut t := &MockTransport{ + inbound: inbound + } + mut c := new_h2_conn(t) + resp := c.do(H2ClientRequest{ + authority: 'h.example' + on_data: make_capture_fn(cap) + stop_copying_limit: 6 + })! + // Body is capped at 6 bytes (4 from first chunk + 2 from second). + assert resp.body.bytestr() == 'AAAABB' + // All three chunks still produced callbacks. + assert cap.chunks.len == 3 + assert cap.chunks[0].bytestr() == 'AAAA' + assert cap.chunks[1].bytestr() == 'BBBB' + assert cap.chunks[2].bytestr() == 'CCCC' + // body_so_far still reports the true cumulative size. + assert cap.running == [u64(4), u64(8), u64(12)] +} + +fn test_h2_stop_receiving_limit_breaks_early() { + mut cap := &ChunkCapture{} + inbound := build_streamed_response('200', '', [ + 'XXXX'.bytes(), + 'YYYY'.bytes(), + 'ZZZZ'.bytes(), // should never be delivered + ]) + mut t := &MockTransport{ + inbound: inbound + } + mut c := new_h2_conn(t) + resp := c.do(H2ClientRequest{ + authority: 'h.example' + on_data: make_capture_fn(cap) + stop_receiving_limit: 6 + })! + // Loop breaks after the second DATA frame (8 bytes >= limit 6); body + // contains both chunks delivered so far. + assert resp.body.bytestr() == 'XXXXYYYY' + assert cap.chunks.len == 2 + assert cap.chunks[1].bytestr() == 'YYYY' + + // On early termination the client must send RST_STREAM(CANCEL) on the + // stream, and the connection must refuse further requests. + mut saw_cancel := false + mut pos := h2_client_preface.len + for pos < t.outbound.len { + frame, n := h2_read_frame(t.outbound[pos..])! 
+ pos += n + if frame is H2RstStreamFrame { + if frame.stream_id == 1 && frame.error_code == u32(H2ErrorCode.cancel) { + saw_cancel = true + } + } + } + assert saw_cancel, 'expected RST_STREAM(CANCEL) on early termination' + c.do(H2ClientRequest{ authority: 'h.example' }) or { + assert err.msg().contains('no longer usable') + return + } + assert false, 'expected error on reuse after early termination' +} diff --git a/vlib/net/http/http.v b/vlib/net/http/http.v index c77644c59..e304d2b68 100644 --- a/vlib/net/http/http.v +++ b/vlib/net/http/http.v @@ -36,7 +36,7 @@ pub mut: in_memory_verification bool // if true, verify, cert, and cert_key are read from memory, not from a file allow_redirect bool = true // whether to allow redirect max_retries int = 5 // maximum number of retries required when an underlying socket error occurs - enable_http2 bool // opt in to HTTP/2 over TLS: advertise ALPN `h2` and, if the server selects it, speak HTTP/2 instead of HTTP/1.1. Requests using streaming response callbacks (on_progress/on_progress_body) or stop limits stay on HTTP/1.1, since the HTTP/2 path buffers the full response. + enable_http2 bool // opt in to HTTP/2 over TLS: advertise ALPN `h2` and, if the server selects it, speak HTTP/2 instead of HTTP/1.1. on_progress / on_progress_body / stop_copying_limit / stop_receiving_limit are honored on the HTTP/2 path; on_progress fires per DATA frame payload rather than per raw network read. 
// callbacks to allow custom reporting code to run, while the request is running, and to implement streaming on_redirect RequestRedirectFn = unsafe { nil } on_progress RequestProgressFn = unsafe { nil } diff --git a/vlib/net/http/request.v b/vlib/net/http/request.v index 0623ca07f..07ea982e7 100644 --- a/vlib/net/http/request.v +++ b/vlib/net/http/request.v @@ -46,7 +46,7 @@ pub mut: in_memory_verification bool // if true, verify, cert, and cert_key are read from memory, not from a file allow_redirect bool = true // whether to allow redirect max_retries int = 5 // maximum number of retries required when an underlying socket error occurs - enable_http2 bool // opt in to HTTP/2 over TLS: advertise ALPN `h2` and, if the server selects it, speak HTTP/2 instead of HTTP/1.1. Requests using streaming response callbacks (on_progress/on_progress_body) or stop limits stay on HTTP/1.1, since the HTTP/2 path buffers the full response. + enable_http2 bool // opt in to HTTP/2 over TLS: advertise ALPN `h2` and, if the server selects it, speak HTTP/2 instead of HTTP/1.1. on_progress / on_progress_body / stop_copying_limit / stop_receiving_limit are honored on the HTTP/2 path; on_progress fires per DATA frame payload rather than per raw network read. // callbacks to allow custom reporting code to run, while the request is running, and to implement streaming on_redirect RequestRedirectFn = unsafe { nil } on_progress RequestProgressFn = unsafe { nil } -- 2.39.5