v / vlib / net / http / h2_server.v
560 lines · 528 sloc · 15.57 KB · 143e89700bb63ea5073f06793bf2e95d36d81492
Raw
1// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved.
2// Use of this source code is governed by an MIT license
3// that can be found in the LICENSE file.
4module http
5
6// This file implements a minimal server-side HTTP/2 connection driver on top
7// of the existing framing (h2_frame.v) and HPACK (h2_hpack.v) layers. It is
8// intentionally serial: SETTINGS_MAX_CONCURRENT_STREAMS is advertised as 1,
9// so the peer sends one request at a time. Concurrent stream handling and
10// background writers are a planned follow-up.
11
12// h2_server_max_concurrent_streams is the value advertised in our SETTINGS;
13// the peer may not open more streams than this at once.
14const h2_server_max_concurrent_streams = u32(1)
15
16// h2_server_default_window is the default initial flow-control window
17// (RFC 7540 Section 6.9.2).
18const h2_server_default_window = u32(65535)
19
20// h2_server_max_request_body caps the in-memory request body the server will
21// accept before responding with a stream error. Bodies larger than this are
22// rejected with RST_STREAM(REFUSED_STREAM).
23const h2_server_max_request_body = 8 * 1024 * 1024
24
25// H2ServerStream holds the state of one in-flight server-side stream.
26struct H2ServerStream {
27mut:
28 id u32
29 hpack_block []u8 // assembled HEADERS + CONTINUATION fragments
30 headers []H2HeaderField
31 body []u8
32 end_headers bool
33 end_stream bool
34 headers_done bool // set once we've decoded the HPACK block
35 send_window i64
36}
37
38// H2ServerConn drives one server-side HTTP/2 connection over a transport.
39struct H2ServerConn {
40mut:
41 transport H2Transport
42 encoder H2HpackEncoder
43 decoder H2HpackDecoder
44 peer H2PeerSettings
45 rbuf []u8
46 send_window i64 = i64(h2_server_default_window)
47 streams map[u32]&H2ServerStream
48 last_stream_id u32
49 awaiting_cont u32 // non-zero when mid-CONTINUATION on this stream
50 closing bool
51 idle_conns &TlsIdleConnTracker = unsafe { nil }
52 idle_handle int
53}
54
55// serve_h2_conn drives a single HTTP/2 server-side connection until the
56// transport closes or a protocol error forces a GOAWAY. `handler` is invoked
57// once per fully-received request stream.
58fn serve_h2_conn(mut transport H2Transport, mut handler Handler) ! {
59 serve_h2_conn_with_idle_tracker(mut transport, mut handler, unsafe { nil }, 0)!
60}
61
62fn serve_h2_conn_with_idle_tracker(mut transport H2Transport, mut handler Handler, idle_conns &TlsIdleConnTracker, idle_handle int) ! {
63 mut c := &H2ServerConn{
64 transport: transport
65 idle_conns: idle_conns
66 idle_handle: idle_handle
67 }
68 c.serve(mut handler) or {
69 // Best-effort GOAWAY before bailing.
70 c.send_goaway(.protocol_error, err.msg()) or {}
71 return err
72 }
73}
74
75fn (mut c H2ServerConn) serve(mut handler Handler) ! {
76 // Register the connection with the idle tracker once for its whole
77 // lifetime, instead of around every frame read. The reader thread spends
78 // nearly all of its time blocked in a frame read, so per-frame mark/unmark
79 // only added shared-lock contention (and an O(n) list scan) on the hot
80 // path. On shutdown, close_idle still interrupts the reader by shutting the
81 // fd down; an h2 request in flight when the server stops is interrupted,
82 // which is acceptable at shutdown and is not relied on by any caller (the
83 // graceful "wait for active request" guarantee is HTTP/1.1-only).
84 tracked := c.should_track_idle_read()
85 if tracked && !c.idle_conns.mark_idle(c.idle_handle) {
86 // The server is already shutting down; do not start serving.
87 return
88 }
89 defer {
90 if tracked {
91 c.idle_conns.unmark_idle(c.idle_handle)
92 }
93 }
94 c.read_client_preface()!
95 c.send_initial_settings()!
96 for !c.closing {
97 frame := c.read_frame() or {
98 // Treat a clean transport close as end of session.
99 return
100 }
101 c.dispatch_frame(frame, mut handler)!
102 }
103}
104
105fn (mut c H2ServerConn) should_track_idle_read() bool {
106 return c.idle_handle > 0 && c.idle_conns != unsafe { nil }
107}
108
109fn (mut c H2ServerConn) read_client_preface() ! {
110 c.fill_at_least(h2_client_preface.len)!
111 got := c.rbuf[..h2_client_preface.len].bytestr()
112 if got != h2_client_preface {
113 return error('h2 server: invalid connection preface')
114 }
115 c.rbuf = c.rbuf[h2_client_preface.len..].clone()
116}
117
118fn (mut c H2ServerConn) send_initial_settings() ! {
119 c.send_frame(H2SettingsFrame{
120 settings: [
121 H2Setting{h2_settings_enable_push, 0},
122 H2Setting{h2_settings_max_concurrent_streams, h2_server_max_concurrent_streams},
123 H2Setting{h2_settings_initial_window_size, h2_server_default_window},
124 H2Setting{h2_settings_max_frame_size, h2_default_max_frame_size},
125 ]
126 })!
127}
128
129fn (mut c H2ServerConn) dispatch_frame(frame H2Frame, mut handler Handler) ! {
130 // RFC 7540 §6.10: once a HEADERS or PUSH_PROMISE without END_HEADERS is
131 // seen, the next frame must be a CONTINUATION on the same stream.
132 if c.awaiting_cont != 0 {
133 if frame is H2ContinuationFrame {
134 if frame.stream_id != c.awaiting_cont {
135 return error('h2 server: CONTINUATION on the wrong stream')
136 }
137 } else {
138 return error('h2 server: expected CONTINUATION after HEADERS without END_HEADERS')
139 }
140 }
141 match frame {
142 H2SettingsFrame {
143 if !frame.ack {
144 c.apply_settings(frame.settings)
145 c.send_frame(H2SettingsFrame{
146 ack: true
147 })!
148 }
149 }
150 H2PingFrame {
151 if !frame.ack {
152 c.send_frame(H2PingFrame{
153 ack: true
154 data: frame.data
155 })!
156 }
157 }
158 H2WindowUpdateFrame {
159 if frame.stream_id == 0 {
160 c.send_window += i64(frame.window_size_increment)
161 } else if mut s := c.streams[frame.stream_id] {
162 s.send_window += i64(frame.window_size_increment)
163 }
164 }
165 H2GoawayFrame {
166 c.closing = true
167 }
168 H2RstStreamFrame {
169 c.streams.delete(frame.stream_id)
170 }
171 H2PriorityFrame {
172 // Priority is advisory; ignore.
173 }
174 H2HeadersFrame {
175 c.on_headers(frame, mut handler)!
176 }
177 H2ContinuationFrame {
178 c.on_continuation(frame, mut handler)!
179 }
180 H2DataFrame {
181 c.on_data(frame, mut handler)!
182 }
183 H2PushPromiseFrame {
184 // Clients should not push. Treat as a protocol error.
185 return error('h2 server: PUSH_PROMISE from client')
186 }
187 H2UnknownFrame {
188 // RFC 7540 §4.1: ignore unknown frame types.
189 }
190 }
191}
192
193fn (mut c H2ServerConn) apply_settings(settings []H2Setting) {
194 for s in settings {
195 match s.id {
196 h2_settings_header_table_size {
197 c.peer.header_table_size = s.value
198 }
199 h2_settings_enable_push {
200 c.peer.enable_push = s.value != 0
201 }
202 h2_settings_max_concurrent_streams {
203 c.peer.max_concurrent_streams = s.value
204 }
205 h2_settings_initial_window_size {
206 // RFC 7540 Section 6.9.2: a change to the initial window size
207 // adjusts the send window of every active stream by the delta.
208 delta := i64(s.value) - i64(c.peer.initial_window_size)
209 c.peer.initial_window_size = s.value
210 for _, mut st in c.streams {
211 st.send_window += delta
212 }
213 }
214 h2_settings_max_frame_size {
215 c.peer.max_frame_size = s.value
216 }
217 h2_settings_max_header_list_size {
218 c.peer.max_header_list_size = s.value
219 }
220 else {}
221 }
222 }
223}
224
225fn (mut c H2ServerConn) on_headers(frame H2HeadersFrame, mut handler Handler) ! {
226 // Stream ids from the client must be odd and strictly increasing.
227 if frame.stream_id & 1 == 0 || frame.stream_id <= c.last_stream_id {
228 return error('h2 server: invalid client stream id ${frame.stream_id}')
229 }
230 c.last_stream_id = frame.stream_id
231 mut s := &H2ServerStream{
232 id: frame.stream_id
233 hpack_block: frame.fragment.clone()
234 end_headers: frame.end_headers
235 end_stream: frame.end_stream
236 send_window: i64(c.peer.initial_window_size)
237 }
238 c.streams[frame.stream_id] = s
239 if !frame.end_headers {
240 c.awaiting_cont = frame.stream_id
241 return
242 }
243 c.finalize_headers(mut s, mut handler)!
244}
245
246fn (mut c H2ServerConn) on_continuation(frame H2ContinuationFrame, mut handler Handler) ! {
247 mut s := c.streams[frame.stream_id] or {
248 return error('h2 server: CONTINUATION for unknown stream ${frame.stream_id}')
249 }
250 s.hpack_block << frame.fragment
251 if frame.end_headers {
252 s.end_headers = true
253 c.awaiting_cont = 0
254 c.finalize_headers(mut s, mut handler)!
255 }
256}
257
258fn (mut c H2ServerConn) finalize_headers(mut s H2ServerStream, mut handler Handler) ! {
259 s.headers = c.decoder.decode(s.hpack_block) or {
260 c.send_rst_stream(s.id, .compression_error)!
261 c.streams.delete(s.id)
262 return
263 }
264 s.headers_done = true
265 if s.end_stream {
266 c.run_request(mut s, mut handler)!
267 }
268}
269
270fn (mut c H2ServerConn) on_data(frame H2DataFrame, mut handler Handler) ! {
271 mut s := c.streams[frame.stream_id] or {
272 // DATA for an unknown stream (likely already RST'd); just drop and
273 // keep flow control consistent.
274 if frame.data.len > 0 {
275 c.send_window_update(0, u32(frame.data.len))!
276 }
277 return
278 }
279 if !s.headers_done {
280 return error('h2 server: DATA before END_HEADERS')
281 }
282 if frame.data.len > 0 {
283 if s.body.len + frame.data.len > h2_server_max_request_body {
284 c.send_rst_stream(s.id, .refused_stream)!
285 c.streams.delete(s.id)
286 return
287 }
288 s.body << frame.data
289 // Replenish the connection window; per-stream we replenish on
290 // completion since we hold the body in memory.
291 c.send_window_update(0, u32(frame.data.len))!
292 c.send_window_update(s.id, u32(frame.data.len))!
293 }
294 if frame.end_stream {
295 s.end_stream = true
296 c.run_request(mut s, mut handler)!
297 }
298}
299
300fn (mut c H2ServerConn) run_request(mut s H2ServerStream, mut handler Handler) ! {
301 req := c.build_request(s) or {
302 c.send_rst_stream(s.id, .protocol_error)!
303 c.streams.delete(s.id)
304 return
305 }
306 resp := handler.handle(req)
307 c.send_response(s.id, resp)!
308 c.streams.delete(s.id)
309}
310
311fn (mut c H2ServerConn) build_request(s &H2ServerStream) !Request {
312 mut req := Request{
313 version: .v2_0
314 header: new_header()
315 }
316 mut method := ''
317 mut path := ''
318 mut authority := ''
319 mut scheme := 'https'
320 for f in s.headers {
321 match f.name {
322 ':method' {
323 method = f.value
324 }
325 ':path' {
326 path = f.value
327 }
328 ':authority' {
329 authority = f.value
330 }
331 ':scheme' {
332 scheme = f.value
333 }
334 else {
335 if f.name.starts_with(':') {
336 return error('h2 server: unknown pseudo-header ${f.name}')
337 }
338 req.header.add_custom(f.name, f.value) or {}
339 }
340 }
341 }
342 if method == '' || path == '' {
343 return error('h2 server: missing :method or :path')
344 }
345 req.method = method_from_str(method)
346 if authority != '' && !req.header.contains(.host) {
347 req.header.add(.host, authority)
348 }
349 // Match the HTTP/1.1 path: req.url is the request-target (the :path
350 // pseudo-header), so handlers see the same shape on both transports.
351 _ = scheme // :scheme is parsed and discarded; handlers infer it from Host
352 req.url = path
353 req.data = s.body.bytestr()
354 req.host = authority
355 return req
356}
357
358fn (mut c H2ServerConn) send_response(stream_id u32, resp Response) ! {
359 status := if resp.status_code == 0 { 200 } else { resp.status_code }
360 mut fields := [H2HeaderField{':status', status.str()}]
361 for key in resp.header.keys() {
362 lkey := key.to_lower()
363 // Drop hop-by-hop headers; HTTP/2 forbids them (RFC 7540 §8.1.2.2).
364 if lkey in ['connection', 'keep-alive', 'transfer-encoding', 'upgrade', 'proxy-connection'] {
365 continue
366 }
367 for val in resp.header.custom_values(key) {
368 fields << H2HeaderField{lkey, val}
369 }
370 }
371 body := resp.body.bytes()
372 has_body := body.len > 0
373 block := c.encoder.encode(fields)
374 c.send_header_block(stream_id, block, !has_body)!
375 if has_body {
376 c.send_body(stream_id, body)!
377 }
378}
379
380fn (mut c H2ServerConn) send_header_block(stream_id u32, block []u8, end_stream bool) ! {
381 max := int(c.peer.max_frame_size)
382 if block.len <= max {
383 c.send_frame(H2HeadersFrame{
384 stream_id: stream_id
385 fragment: block
386 end_headers: true
387 end_stream: end_stream
388 })!
389 return
390 }
391 c.send_frame(H2HeadersFrame{
392 stream_id: stream_id
393 fragment: block[..max]
394 end_headers: false
395 end_stream: end_stream
396 })!
397 mut off := max
398 for off < block.len {
399 mut next := off + max
400 if next > block.len {
401 next = block.len
402 }
403 c.send_frame(H2ContinuationFrame{
404 stream_id: stream_id
405 fragment: block[off..next]
406 end_headers: next == block.len
407 })!
408 off = next
409 }
410}
411
412fn (mut c H2ServerConn) send_body(stream_id u32, body []u8) ! {
413 max := int(c.peer.max_frame_size)
414 mut off := 0
415 for off < body.len {
416 // Respect both the connection and per-stream send windows
417 // (RFC 7540 Section 6.9). When either is exhausted, read frames until
418 // the peer grows a window with WINDOW_UPDATE.
419 for c.send_window <= 0 || c.stream_send_window(stream_id) <= 0 {
420 c.pump_for_window(stream_id)!
421 }
422 avail := if c.send_window < c.stream_send_window(stream_id) {
423 c.send_window
424 } else {
425 c.stream_send_window(stream_id)
426 }
427 mut chunk := body.len - off
428 if chunk > max {
429 chunk = max
430 }
431 if i64(chunk) > avail {
432 chunk = int(avail)
433 }
434 next := off + chunk
435 c.send_frame(H2DataFrame{
436 stream_id: stream_id
437 data: body[off..next]
438 end_stream: next == body.len
439 })!
440 c.send_window -= i64(chunk)
441 if mut s := c.streams[stream_id] {
442 s.send_window -= i64(chunk)
443 }
444 off = next
445 }
446}
447
448// stream_send_window returns the current per-stream send window, or 0 if the
449// stream is gone.
450fn (c &H2ServerConn) stream_send_window(stream_id u32) i64 {
451 if s := c.streams[stream_id] {
452 return s.send_window
453 }
454 return 0
455}
456
457// pump_for_window reads one frame while a response is blocked on flow control,
458// servicing connection-level frames (SETTINGS / PING / WINDOW_UPDATE) and a
459// RST_STREAM for the stream being written.
460fn (mut c H2ServerConn) pump_for_window(stream_id u32) ! {
461 frame := c.read_frame()!
462 match frame {
463 H2SettingsFrame {
464 if !frame.ack {
465 c.apply_settings(frame.settings)
466 c.send_frame(H2SettingsFrame{
467 ack: true
468 })!
469 }
470 }
471 H2PingFrame {
472 if !frame.ack {
473 c.send_frame(H2PingFrame{
474 ack: true
475 data: frame.data
476 })!
477 }
478 }
479 H2WindowUpdateFrame {
480 if frame.stream_id == 0 {
481 c.send_window += i64(frame.window_size_increment)
482 } else if mut s := c.streams[frame.stream_id] {
483 s.send_window += i64(frame.window_size_increment)
484 }
485 }
486 H2RstStreamFrame {
487 if frame.stream_id == stream_id {
488 return error('h2 server: stream reset by peer while writing response')
489 }
490 }
491 else {
492 // With SETTINGS_MAX_CONCURRENT_STREAMS=1 no other stream frames are
493 // expected mid-response; ignore anything else defensively.
494 }
495 }
496}
497
498fn (mut c H2ServerConn) send_window_update(stream_id u32, inc u32) ! {
499 if inc == 0 {
500 return
501 }
502 c.send_frame(H2WindowUpdateFrame{
503 stream_id: stream_id
504 window_size_increment: inc
505 })!
506}
507
508fn (mut c H2ServerConn) send_rst_stream(stream_id u32, code H2ErrorCode) ! {
509 c.send_frame(H2RstStreamFrame{
510 stream_id: stream_id
511 error_code: u32(code)
512 })!
513}
514
515fn (mut c H2ServerConn) send_goaway(code H2ErrorCode, msg string) ! {
516 c.send_frame(H2GoawayFrame{
517 last_stream_id: c.last_stream_id
518 error_code: u32(code)
519 debug_data: msg.bytes()
520 })!
521}
522
523fn (mut c H2ServerConn) read_frame() !H2Frame {
524 c.fill_at_least(h2_frame_header_len)!
525 header := h2_parse_frame_header(c.rbuf)!
526 if header.length > h2_default_max_frame_size {
527 return error('h2 server: frame larger than SETTINGS_MAX_FRAME_SIZE (${header.length})')
528 }
529 total := h2_frame_header_len + int(header.length)
530 c.fill_at_least(total)!
531 frame := h2_parse_frame(header, c.rbuf[h2_frame_header_len..total])!
532 c.rbuf = c.rbuf[total..].clone()
533 return frame
534}
535
536fn (mut c H2ServerConn) fill_at_least(n int) ! {
537 for c.rbuf.len < n {
538 mut tmp := []u8{len: h2_conn_read_chunk}
539 got := c.transport.read(mut tmp)!
540 if got <= 0 {
541 return error('h2 server: connection closed by peer')
542 }
543 c.rbuf << tmp[..got]
544 }
545}
546
547fn (mut c H2ServerConn) send_frame(f H2Frame) ! {
548 c.write_all(f.encode())!
549}
550
551fn (mut c H2ServerConn) write_all(data []u8) ! {
552 mut sent := 0
553 for sent < data.len {
554 n := c.transport.write(data[sent..])!
555 if n <= 0 {
556 return error('h2 server: transport write returned ${n}')
557 }
558 sent += n
559 }
560}
561