v / vlib / net / http / h2_conn.v
499 lines · 472 sloc · 15.21 KB · 94a763e85cd34a51ee9c9609c445d6f9d5490ad1
Raw
1// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved.
2// Use of this source code is governed by an MIT license
3// that can be found in the LICENSE file.
4module http
5
6// This file implements a minimal HTTP/2 client connection (RFC 7540) on top of
7// the framing and HPACK layers. It is intentionally synchronous and handles a
8// single request at a time: it sends one request, then reads frames until that
9// stream completes, servicing connection-level frames (SETTINGS, PING,
10// WINDOW_UPDATE, GOAWAY) inline. Stream multiplexing and a background reader
11// are a follow-up; this is the smallest useful, testable client.
12
// h2_client_preface is the fixed sequence a client sends to start an HTTP/2
// connection, immediately followed by a SETTINGS frame (RFC 7540 Section 3.5).
pub const h2_client_preface = 'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'

// h2_default_initial_window is the initial flow-control window for both the
// connection and new streams (RFC 7540 Section 6.9.2). It is also the value
// this client advertises as its initial window size during the handshake.
pub const h2_default_initial_window = u32(65535)

// h2_conn_read_chunk is the size, in bytes, of the scratch buffer handed to
// the transport on each read.
const h2_conn_read_chunk = 16 * 1024
23
// H2Transport is the byte transport an H2Conn runs over: typically an
// ALPN-negotiated `h2` TLS connection, but any reader/writer works (which makes
// the connection testable without a socket). Its method signatures match
// net.ssl.SSLConn, so an SSLConn satisfies it directly.
pub interface H2Transport {
mut:
	// read fills `buf` and returns the number of bytes read. The connection
	// treats a result <= 0 as the peer having closed the transport.
	read(mut buf []u8) !int
	// write sends bytes from `buf` and returns how many were accepted. Short
	// writes are retried by the connection; a result <= 0 is reported as an
	// error.
	write(buf []u8) !int
}
33
// H2PeerSettings holds the peer's SETTINGS, with HTTP/2 defaults. The values
// are overwritten as non-ACK SETTINGS frames from the peer are applied.
struct H2PeerSettings {
mut:
	header_table_size      u32  = 4096 // recorded from the peer's header-table-size setting
	enable_push            bool = true // false when the peer sent a zero enable-push value
	max_concurrent_streams u32  = max_u32
	initial_window_size    u32  = 65535 // seeds the send window of each new stream
	max_frame_size         u32  = 16384 // upper bound on the payload of frames we send
	max_header_list_size   u32  = max_u32
}
44
// H2DataFn is called for each non-empty DATA frame received on the response
// stream, with the chunk's bytes, the cumulative body bytes received (including
// this chunk), the body length from Content-Length if known (else 0), and the
// response status code. An error returned by the callback is propagated to the
// caller of the request.
pub type H2DataFn = fn (chunk []u8, body_so_far u64, body_expected u64, status int) !
50
// H2ClientRequest describes a single HTTP/2 request. Header names in `headers`
// must be lowercase (RFC 7540 Section 8.1.2); the pseudo-headers are filled in
// from the other fields.
pub struct H2ClientRequest {
pub:
	method    string = 'GET' // sent as the `:method` pseudo-header
	scheme    string = 'https' // sent as the `:scheme` pseudo-header
	authority string // sent as the `:authority` pseudo-header
	path      string = '/' // sent as the `:path` pseudo-header
	headers   []H2HeaderField // appended after the four pseudo-headers, in order
	// body is sent as DATA frames; when it is empty the HEADERS frame carries
	// END_STREAM and no DATA frame is sent.
	body []u8
	// Optional response chunk callback, called after each DATA frame's payload
	// is received. The arguments are the chunk bytes (not yet copied into the
	// response body), the cumulative body bytes received so far (including this
	// chunk), the body length from Content-Length (0 when not present), and the
	// response status code.
	on_data H2DataFn = unsafe { nil }
	// stop_copying_limit, when >= 0, caps the cumulative body bytes copied into
	// the response body; further DATA chunks are dropped but the callback keeps
	// firing and the stream is drained to completion.
	stop_copying_limit i64 = -1
	// stop_receiving_limit, when >= 0, causes the response read loop to break
	// once that many body bytes have been received. The callback fires for the
	// final chunk; no further callbacks fire after that. Unless the frame that
	// reached the limit also ended the stream, the stream is cancelled with
	// RST_STREAM and the connection refuses further requests.
	stop_receiving_limit i64 = -1
}
77
// H2ClientResponse is the result of an HTTP/2 request.
pub struct H2ClientResponse {
pub mut:
	status  int // parsed from the `:status` pseudo-header
	headers []H2HeaderField // every received header field that is not a pseudo-header
	body    []u8 // DATA payload bytes copied, subject to the request's copy limit
}
85
// H2Conn is a client-side HTTP/2 connection. It drives one request stream at a
// time over `transport`.
pub struct H2Conn {
mut:
	transport          H2Transport
	encoder            H2HpackEncoder // encodes request header blocks
	decoder            H2HpackDecoder // decodes response header blocks
	peer               H2PeerSettings
	rbuf               []u8      // buffered bytes read from the transport, not yet consumed
	pending            []H2Frame // stream frames read early (while sending), to replay
	next_stream_id     u32 = 1 // clients use odd stream ids
	cur_stream_id      u32 // the stream currently being driven by do()
	send_window        i64 = 65535 // connection-level send flow-control window
	stream_send_window i64 // send window for cur_stream_id
	handshaked         bool // true once the preface and our SETTINGS were written
	goaway             bool // true once a GOAWAY frame was received
	// aborted is set when this connection terminated a stream early
	// (RST_STREAM sent without draining the remaining DATA). Subsequent
	// requests on the same connection must fail rather than risk being starved
	// by leftover DATA frames the peer had already sent for the cancelled
	// stream.
	aborted bool
}
108
// new_h2_conn wraps `transport` in a fresh client connection. Nothing is
// written to the transport here: the HTTP/2 connection preface is sent lazily
// on the first request.
pub fn new_h2_conn(transport H2Transport) &H2Conn {
	conn := &H2Conn{
		transport: transport
	}
	return conn
}
116
// do sends `req` and returns the response, after the request's stream closes.
// It fails up front when the connection has seen a GOAWAY or has previously
// cancelled a stream early.
pub fn (mut c H2Conn) do(req H2ClientRequest) !H2ClientResponse {
	c.handshake()!
	if c.goaway {
		return error('h2: connection is shutting down (GOAWAY)')
	}
	if c.aborted {
		return error('h2: connection is no longer usable after an early stream termination')
	}

	// Claim the next odd stream id and start it with the peer's initial window.
	id := c.next_stream_id
	c.next_stream_id = id + 2
	c.cur_stream_id = id
	c.stream_send_window = i64(c.peer.initial_window_size)

	// Pseudo-headers first, then the caller's headers in their given order.
	mut header_list := [
		H2HeaderField{':method', req.method},
		H2HeaderField{':scheme', req.scheme},
		H2HeaderField{':authority', req.authority},
		H2HeaderField{':path', req.path},
	]
	header_list << req.headers
	encoded := c.encoder.encode(header_list)

	if req.body.len > 0 {
		c.send_header_block(id, encoded, false)!
		c.send_body(id, req.body)!
	} else {
		// No body: END_STREAM rides on the HEADERS frame.
		c.send_header_block(id, encoded, true)!
	}
	return c.read_response(id, req)!
}
148
// handshake writes the client connection preface followed by our SETTINGS
// frame, once per connection. Later calls are no-ops.
fn (mut c H2Conn) handshake() ! {
	if !c.handshaked {
		c.write_all(h2_client_preface.bytes())!
		// Our SETTINGS: server push is refused; everything else stays at the
		// protocol defaults. (Our decoder uses the default 4096-byte HPACK
		// table.)
		our_settings := [
			H2Setting{h2_settings_enable_push, 0},
			H2Setting{h2_settings_initial_window_size, h2_default_initial_window},
			H2Setting{h2_settings_max_frame_size, h2_default_max_frame_size},
		]
		c.send_frame(H2SettingsFrame{
			settings: our_settings
		})!
		c.handshaked = true
	}
}
165
// read_response reads frames until `stream_id` is closed, returning its
// response and servicing connection-level frames along the way. The streaming
// options on `req` (on_data callback and the two stop limits) are honored
// while reading DATA frames, matching the HTTP/1.1 streaming semantics.
// Frames for other streams are skipped. An error is returned when the peer
// resets the stream or the stream ends before any HEADERS frame was seen.
fn (mut c H2Conn) read_response(stream_id u32, req H2ClientRequest) !H2ClientResponse {
	mut resp := H2ClientResponse{}
	mut saw_headers := false
	mut received := u64(0)
	mut expected := u64(0)
	copy_limit := req.stop_copying_limit
	recv_limit := req.stop_receiving_limit
	for {
		frame := c.next_frame()!
		if c.handle_conn_frame(frame)! {
			continue
		}
		match frame {
			H2HeadersFrame {
				if frame.stream_id != stream_id {
					continue
				}
				block := c.collect_header_block(frame.fragment, frame.end_headers, stream_id)!
				fields := c.decoder.decode(block)!
				for field in fields {
					if field.name == ':status' {
						resp.status = field.value.int()
						continue
					}
					if field.name.starts_with(':') {
						// Other pseudo-headers are not exposed.
						continue
					}
					resp.headers << field
					if field.name == 'content-length' {
						expected = field.value.u64()
					}
				}
				saw_headers = true
				if frame.end_stream {
					break
				}
			}
			H2DataFrame {
				if frame.stream_id != stream_id {
					continue
				}
				chunk_len := frame.data.len
				if chunk_len > 0 {
					before := i64(received)
					received += u64(chunk_len)
					// Copy the whole chunk while under the copy limit, only
					// the part that still fits when this chunk crosses it,
					// and nothing once the limit was already reached.
					if copy_limit < 0 || i64(received) <= copy_limit {
						resp.body << frame.data
					} else if before < copy_limit {
						resp.body << frame.data[..int(copy_limit - before)]
					}
					// The callback sees every chunk, copied or not.
					if req.on_data != unsafe { nil } {
						req.on_data(frame.data, received, expected, resp.status)!
					}
					// Hand the consumed bytes back to both receive windows so
					// the peer keeps sending.
					c.send_window_update(0, u32(chunk_len))!
					c.send_window_update(stream_id, u32(chunk_len))!
				}
				if frame.end_stream {
					break
				}
				if recv_limit >= 0 && i64(received) >= recv_limit {
					// Cancel the stream (RFC 7540 Section 8.1.4 / 5.4.2) so
					// the peer stops sending DATA. The connection is then
					// marked unusable, because DATA already in flight for
					// this stream would eat the connection-level receive
					// window and block later requests on the same H2Conn.
					c.send_frame(H2RstStreamFrame{
						stream_id:  stream_id
						error_code: u32(H2ErrorCode.cancel)
					})!
					c.aborted = true
					break
				}
			}
			H2RstStreamFrame {
				if frame.stream_id == stream_id {
					return error('h2: stream reset by peer (${h2_error_code_name(frame.error_code)})')
				}
			}
			else {
				// PRIORITY / PUSH_PROMISE / stray CONTINUATION / unknown: ignore.
			}
		}
	}
	if !saw_headers {
		return error('h2: stream closed without a response')
	}
	return resp
}
260
// collect_header_block returns the full HPACK block for a HEADERS frame,
// reading and concatenating CONTINUATION frames until END_HEADERS.
//
// `first` is the fragment carried by the HEADERS frame and `end_headers` its
// END_HEADERS flag; when the flag is set, `first` is returned as is. Any frame
// other than a CONTINUATION on `stream_id` is an error, because nothing may be
// interleaved inside a header block (RFC 7540 Section 6.10).
//
// Frames are taken with next_frame(), not read_frame(): when the HEADERS frame
// was replayed from the stash filled while the request body was being sent,
// its CONTINUATION frames sit in that stash too and must be consumed from
// there, ahead of anything still unread on the transport.
fn (mut c H2Conn) collect_header_block(first []u8, end_headers bool, stream_id u32) ![]u8 {
	if end_headers {
		return first
	}
	mut fragment := first.clone()
	for {
		frame := c.next_frame()!
		if frame is H2ContinuationFrame {
			if frame.stream_id != stream_id {
				return error('h2: CONTINUATION on the wrong stream')
			}
			fragment << frame.fragment
			if frame.end_headers {
				break
			}
		} else {
			return error('h2: expected a CONTINUATION frame')
		}
	}
	return fragment
}
284
// handle_conn_frame services a connection-level frame, returning true if the
// frame was fully handled here (so the caller should skip it). SETTINGS and
// PING are acknowledged, WINDOW_UPDATE grows the matching send window, and
// GOAWAY marks the connection and is reported as an error. Every other frame
// type yields false.
fn (mut c H2Conn) handle_conn_frame(frame H2Frame) !bool {
	match frame {
		H2SettingsFrame {
			if frame.ack {
				// The peer acknowledging our SETTINGS needs no reply.
				return true
			}
			c.apply_settings(frame.settings)
			c.send_frame(H2SettingsFrame{
				ack: true
			})!
			return true
		}
		H2PingFrame {
			if frame.ack {
				return true
			}
			// Echo the payload back with ACK set.
			c.send_frame(H2PingFrame{
				ack:  true
				data: frame.data
			})!
			return true
		}
		H2WindowUpdateFrame {
			grow := i64(frame.window_size_increment)
			if frame.stream_id == 0 {
				c.send_window += grow
			} else if frame.stream_id == c.cur_stream_id {
				c.stream_send_window += grow
			}
			// Updates for any other stream are consumed and dropped.
			return true
		}
		H2GoawayFrame {
			c.goaway = true
			return error('h2: GOAWAY received (${h2_error_code_name(frame.error_code)})')
		}
		else {
			return false
		}
	}
}
324
// apply_settings records the values from a peer SETTINGS frame in `c.peer`.
//
// SETTINGS_MAX_FRAME_SIZE is only accepted inside the range the protocol
// allows (16384 to 16777215, RFC 7540 Section 6.5.2). The senders in this file
// chunk header blocks and bodies by that value, so an unchecked 0 from a
// broken or hostile peer would make them loop forever emitting empty frames.
// This function cannot fail, so an out-of-range value is ignored and the
// previous limit stays in force.
fn (mut c H2Conn) apply_settings(settings []H2Setting) {
	for s in settings {
		match s.id {
			h2_settings_header_table_size {
				c.peer.header_table_size = s.value
			}
			h2_settings_enable_push {
				c.peer.enable_push = s.value != 0
			}
			h2_settings_max_concurrent_streams {
				c.peer.max_concurrent_streams = s.value
			}
			h2_settings_initial_window_size {
				// RFC 7540 Section 6.9.2: a change to the initial window size
				// retroactively adjusts the active stream's send window by the
				// delta.
				delta := i64(s.value) - i64(c.peer.initial_window_size)
				c.peer.initial_window_size = s.value
				c.stream_send_window += delta
			}
			h2_settings_max_frame_size {
				// Keep the previous value when the peer's is outside the
				// legal range.
				if s.value >= u32(16384) && s.value <= u32(16777215) {
					c.peer.max_frame_size = s.value
				}
			}
			h2_settings_max_header_list_size {
				c.peer.max_header_list_size = s.value
			}
			else {} // unknown settings are ignored (RFC 7540 Section 6.5.2)
		}
	}
}
355
// send_header_block sends an HPACK header block as a HEADERS frame, splitting
// it across CONTINUATION frames when it exceeds the peer's max frame size
// (RFC 7540 Section 4.3). END_STREAM, when set, goes on the HEADERS frame.
// END_HEADERS is set on whichever frame carries the last piece of the block.
fn (mut c H2Conn) send_header_block(stream_id u32, block []u8, end_stream bool) ! {
	limit := int(c.peer.max_frame_size)
	fits := block.len <= limit
	head := if fits { block } else { block[..limit] }
	c.send_frame(H2HeadersFrame{
		stream_id:   stream_id
		fragment:    head
		end_headers: fits
		end_stream:  end_stream
	})!
	// Whatever did not fit follows in CONTINUATION frames of at most `limit`
	// bytes each.
	mut pos := head.len
	for pos < block.len {
		stop := if pos + limit > block.len { block.len } else { pos + limit }
		c.send_frame(H2ContinuationFrame{
			stream_id:   stream_id
			fragment:    block[pos..stop]
			end_headers: stop == block.len
		})!
		pos = stop
	}
}
390
// send_body writes the request body as DATA frames, chunked to the peer's max
// frame size and bounded by both the connection and the per-stream send
// flow-control windows (RFC 7540 Section 6.9). END_STREAM is set on the frame
// that carries the last byte of `body`.
//
// While either window is exhausted it reads frames from the transport until
// the peer grows it. Connection-level frames are handled in place; stream
// frames are stashed in `c.pending` for read_response to replay. If the peer
// resets `stream_id` while we wait, sending stops and the function returns;
// the stashed RST_STREAM is then reported by read_response.
fn (mut c H2Conn) send_body(stream_id u32, body []u8) ! {
	max := int(c.peer.max_frame_size)
	mut off := 0
	for off < body.len {
		for c.send_window <= 0 || c.stream_send_window <= 0 {
			// Read from the transport directly. next_frame() would hand back
			// the frames stashed below, so the loop would pop and re-stash
			// them forever and never see the WINDOW_UPDATE it is waiting for.
			frame := c.read_frame()!
			if c.handle_conn_frame(frame)! {
				continue
			}
			c.pending << frame
			if frame is H2RstStreamFrame {
				if frame.stream_id == stream_id {
					// The peer abandoned the stream: no window will ever
					// open again and further DATA would be pointless.
					return
				}
			}
		}
		avail := if c.send_window < c.stream_send_window {
			c.send_window
		} else {
			c.stream_send_window
		}
		mut chunk := body.len - off
		if chunk > max {
			chunk = max
		}
		if i64(chunk) > avail {
			chunk = int(avail)
		}
		next := off + chunk
		c.send_frame(H2DataFrame{
			stream_id:  stream_id
			data:       body[off..next]
			end_stream: next == body.len
		})!
		// Sent payload bytes count against both windows.
		c.send_window -= i64(chunk)
		c.stream_send_window -= i64(chunk)
		off = next
	}
}
429
// send_window_update sends a WINDOW_UPDATE of `inc` bytes for `stream_id`
// (callers pass 0 for the connection-level window). A zero increment sends
// nothing.
fn (mut c H2Conn) send_window_update(stream_id u32, inc u32) ! {
	if inc != 0 {
		update := H2WindowUpdateFrame{
			stream_id:             stream_id
			window_size_increment: inc
		}
		c.send_frame(update)!
	}
}
439
// next_frame returns the next frame to process: the oldest stashed stream
// frame when there is one, otherwise a frame freshly read from the transport.
fn (mut c H2Conn) next_frame() !H2Frame {
	if c.pending.len == 0 {
		return c.read_frame()!
	}
	stashed := c.pending[0]
	c.pending.delete(0)
	return stashed
}
449
// read_frame reads and decodes one frame from the transport, enforcing our
// advertised max frame size. The decoded frame's bytes are removed from the
// read buffer; any bytes read beyond it stay buffered for the next call.
fn (mut c H2Conn) read_frame() !H2Frame {
	// The fixed-size header comes first; it tells us how much payload follows.
	c.fill_at_least(h2_frame_header_len)!
	header := h2_parse_frame_header(c.rbuf)!
	if header.length > h2_default_max_frame_size {
		return error('h2: frame larger than SETTINGS_MAX_FRAME_SIZE (${header.length})')
	}
	frame_end := h2_frame_header_len + int(header.length)
	c.fill_at_least(frame_end)!
	payload := c.rbuf[h2_frame_header_len..frame_end]
	parsed := h2_parse_frame(header, payload)!
	c.rbuf = c.rbuf[frame_end..].clone()
	return parsed
}
464
// fill_at_least reads from the transport until rbuf holds at least n bytes.
// A read that yields no bytes is reported as the peer closing the connection.
fn (mut c H2Conn) fill_at_least(n int) ! {
	for {
		if c.rbuf.len >= n {
			return
		}
		mut scratch := []u8{len: h2_conn_read_chunk}
		count := c.transport.read(mut scratch)!
		if count <= 0 {
			return error('h2: connection closed by peer')
		}
		// Keep only the bytes the transport actually produced.
		c.rbuf << scratch[..count]
	}
}
476
// send_frame encodes `f` and writes all of its bytes to the transport.
fn (mut c H2Conn) send_frame(f H2Frame) ! {
	wire := f.encode()
	c.write_all(wire)!
}
480
// write_all writes every byte of `data` to the transport, retrying after
// short writes. A write that reports no progress is returned as an error.
fn (mut c H2Conn) write_all(data []u8) ! {
	mut offset := 0
	for {
		if offset >= data.len {
			break
		}
		n := c.transport.write(data[offset..])!
		if n <= 0 {
			return error('h2: transport write returned ${n}')
		}
		offset += n
	}
}
491
// h2_error_code_name renders an HTTP/2 error code: the enum member's name for
// codes up to `http_1_1_required`, and a hex fallback for anything above it.
fn h2_error_code_name(code u32) string {
	if code > u32(H2ErrorCode.http_1_1_required) {
		return 'unknown(0x${code.hex()})'
	}
	return unsafe { H2ErrorCode(code) }.str()
}
500