v / vlib / net / http / transport.v
403 lines · 378 sloc · 14.61 KB · 065a450b86f6459b1e4398fe7b0594bbfcc2d691
Raw
1// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved.
2// Use of this source code is governed by an MIT license
3// that can be found in the LICENSE file.
4@[has_globals]
5module http
6
7import net
8import net.ssl
9import sync
10import time
11
12// This file implements connection reuse for the HTTP client: a Transport owns
13// a pool of keep-alive connections, keyed by origin and TLS configuration, and
14// fetch()/Request.do() route through a process-global default Transport unless
15// the request opts out (`disable_connection_reuse`) or uses a proxy. The first
16// cut pools HTTP/1.1 connections (plain TCP and TLS); HTTP/2 requests run on
17// the existing one-shot driver until the multiplexed H2 connection lands.
18
// transport_err_unsafe_retry is the error code attached to a failed exchange
// on a transport connection when the request method is not idempotent. Part of
// the request may already have reached the server (the write helpers cannot
// prove otherwise), so the request must not be replayed: neither by
// round_trip's inner loop nor by the caller's outer retry loop.
// is_no_need_retry_error recognizes this code so the outer loop honors it.
const transport_err_unsafe_retry = -20013
25
// H1PooledConn is a single keep-alive HTTP/1.1 connection held in a Transport
// pool. It wraps either a plain TCP connection or a TLS connection; exactly
// one of `tcp` and `ssl` is set.
@[heap]
struct H1PooledConn {
mut:
	// key is the pool key the connection was dialled for.
	key string
	// tcp is the plain-TCP connection (nil for a TLS connection).
	tcp &net.TcpConn = unsafe { nil }
	// ssl is the TLS connection (nil for a plain-TCP connection).
	ssl &ssl.SSLConn = unsafe { nil }
	// idle_since is stamped when the connection is checked into the pool.
	idle_since time.Time
}
36
// close_conn tears down whichever underlying connection is set (TLS shutdown
// and/or TCP close, ignoring their errors) and resets the handle to nil, so
// calling it again is a no-op.
fn (mut c H1PooledConn) close_conn() {
	if !isnil(c.ssl) {
		c.ssl.shutdown() or {}
		c.ssl = unsafe { nil }
	}
	if !isnil(c.tcp) {
		c.tcp.close() or {}
		c.tcp = unsafe { nil }
	}
}
48
// refresh_timeouts re-applies the current request's timeouts to a pooled
// connection, since it may have been dialled by a request that used different
// ones. A plain-TCP connection gets both the read and the write timeout; a TLS
// connection only gets the read timeout, and only when it is positive.
fn (mut c H1PooledConn) refresh_timeouts(req &Request) {
	if !isnil(c.tcp) {
		c.tcp.set_read_timeout(req.read_timeout)
		c.tcp.set_write_timeout(req.write_timeout)
		return
	}
	if isnil(c.ssl) || req.read_timeout <= 0 {
		return
	}
	c.ssl.set_read_timeout(req.read_timeout)
}
61
// exchange sends `raw` and reads one response over the pooled connection,
// leaving the connection open. The returned bool reports whether the response
// was precisely framed, i.e. whether the connection can safely carry another
// request.
fn (mut c H1PooledConn) exchange(req &Request, raw string) !(Response, bool) {
	if isnil(c.ssl) {
		return req.h1_exchange_tcp(mut c.tcp, raw)
	}
	return req.h1_exchange_ssl(mut c.ssl, raw)
}
71
// Transport owns the connection pools and the reuse policy for HTTP requests.
// It is safe for concurrent use; fetch()/Request.do() share one process-global
// instance (see default_transport).
//
// Note: idle_timeout is checked lazily, when a connection is checked out for
// the same pool key. There is no background reaper, so a connection to an
// origin that is never revisited is only reclaimed by the global
// max_idle_conns cap or by close_idle_connections(), not on a timer. A
// time-based reaper is a planned follow-up.
@[heap]
pub struct Transport {
pub mut:
	// max_idle_conns_per_host is the maximum number of idle keep-alive
	// connections retained per pool key (origin + TLS configuration).
	max_idle_conns_per_host int = 4
	// max_idle_conns is the maximum number of idle keep-alive connections kept
	// across all pool keys, which bounds file-descriptor use when many distinct
	// origins are fetched. On overflow the least-recently-used idle connection
	// is evicted. 0 disables the global cap.
	max_idle_conns int = 100
	// idle_timeout is how long a connection may sit idle in the pool before it
	// is discarded instead of being reused.
	idle_timeout time.Duration = 90 * time.second
mut:
	// mu guards h1_idle.
	mu &sync.Mutex = sync.new_mutex()
	// h1_idle maps a pool key to its idle HTTP/1.1 connections; the most
	// recently checked-in connection is last.
	h1_idle map[string][]&H1PooledConn
}
98
// new_transport returns a new Transport with an empty pool and the default
// limits declared on the struct.
pub fn new_transport() &Transport {
	transport := &Transport{}
	return transport
}
103
// http_transport_default is the shared default Transport. It is initialized
// eagerly in _vinit(), i.e. before main() and before any thread is spawned, so
// it is fully constructed without a concurrent-startup race. Lazy
// initialization through sync.Once is unsafe here: Once sets its done flag
// before running the callback, so a second caller can observe "done" and read
// this global while it is still nil (vlang/v#27456).
__global http_transport_default = new_transport()
110
// default_transport returns the process-global Transport shared by fetch()
// and Request.do().
pub fn default_transport() &Transport {
	shared_transport := http_transport_default
	return shared_transport
}
116
// close_idle_connections closes every idle pooled connection of the default
// Transport. Useful before fork()-style process handoffs, or in tests.
pub fn close_idle_connections() {
	mut shared_transport := default_transport()
	shared_transport.close_idle()
}
123
// close_idle empties this Transport's idle pool and closes every connection
// that was in it. The connections are collected under the lock but closed
// after it is released. In-flight requests are unaffected.
pub fn (mut t Transport) close_idle() {
	mut drained := []&H1PooledConn{}
	t.mu.lock()
	for _, conns in t.h1_idle {
		drained << conns
	}
	t.h1_idle.clear()
	t.mu.unlock()
	for mut pooled in drained {
		pooled.close_conn()
	}
}
140
// transport_pool_key builds the pool key for a request. Two requests share a
// pooled connection only when their origin and all TLS-relevant settings
// match.
//
// enable_http2 is part of the key because it decides what the connection
// advertised via ALPN at dial time: a forced-HTTP/1.1 connection (no ALPN)
// must not serve an HTTP/2-enabled request, which would otherwise never
// negotiate h2 with that origin while the pooled connection lives. Plain http
// ignores enable_http2, so the flag is only honored for https and the
// plain-http pool stays unsplit.
//
// The free-form string fields (host and the TLS paths/PEM blobs) are
// length-prefixed with pk_part so a value containing the '|' separator cannot
// collide with a different field split (e.g. cert='a|b', cert_key='c' versus
// cert='a', cert_key='b|c'), which would let a request reuse a connection
// dialled with the wrong CA or client certificate. Bools, the int port and the
// fixed scheme literal cannot contain '|' and need no prefix.
fn transport_pool_key(req &Request, scheme string, host string, port int) string {
	negotiates_h2 := scheme == 'https' && req.enable_http2
	origin := '${scheme}|${pk_part(host)}|${port}'
	tls := '${req.validate}|${pk_part(req.verify)}|${pk_part(req.cert)}|${pk_part(req.cert_key)}|${req.in_memory_verification}'
	return '${origin}|${negotiates_h2}|${tls}'
}
159
// pk_part returns `s` prefixed with its byte length and a colon (`len:value`),
// so pool-key components stay unambiguous when concatenated, whatever bytes
// they contain.
fn pk_part(s string) string {
	return s.len.str() + ':' + s
}
165
// transport_is_idempotent reports whether a request using `method` may be
// replayed safely after a failure on a reused connection (RFC 7231 4.2.2):
// GET, HEAD, OPTIONS, TRACE, PUT and DELETE qualify, everything else does not.
fn transport_is_idempotent(method Method) bool {
	return match method {
		.get, .head, .options, .trace, .put, .delete { true }
		else { false }
	}
}
171
// response_allows_reuse reports whether the server's response permits keeping
// the connection open for another request.
//
// A 1xx response never allows reuse: handling stops at its headers and the
// connection state past them is not tracked. Any `close` in the Connection
// header forbids reuse. Otherwise HTTP/1.1 allows it by default, HTTP/1.0 only
// with an explicit `keep-alive`, and any other version does not.
fn response_allows_reuse(resp &Response) bool {
	if resp.status_code < 200 {
		return false
	}
	// A server may split Connection across repeated header lines and get()
	// returns only the first, so all values are joined before scanning.
	connection := resp.header.values(.connection).join(',').to_lower()
	if connection.contains('close') {
		return false
	}
	return match resp.version() {
		.v1_1 { true }
		.v1_0 { connection.contains('keep-alive') }
		else { false }
	}
}
193
// checkout pops the most recently used idle connection for `key`, discarding
// any that sat idle longer than idle_timeout. It returns nil when the pool
// holds no usable connection for that key.
//
// A key whose list becomes empty is removed from the map, matching
// evict_oldest_locked, so h1_idle does not accumulate empty entries for
// origins that are no longer pooled. Expired connections are closed only after
// the lock is released.
fn (mut t Transport) checkout(key string) &H1PooledConn {
	mut expired := []&H1PooledConn{}
	mut found := &H1PooledConn(unsafe { nil })
	t.mu.lock()
	// One clock sample is enough for the whole scan.
	now := time.now()
	for {
		mut list := t.h1_idle[key] or { break }
		if list.len == 0 {
			// Never keep an empty list under a key.
			t.h1_idle.delete(key)
			break
		}
		mut c := list.pop()
		if list.len == 0 {
			t.h1_idle.delete(key)
		} else {
			t.h1_idle[key] = list
		}
		if now - c.idle_since > t.idle_timeout {
			expired << c
			continue
		}
		found = c
		break
	}
	t.mu.unlock()
	for mut c in expired {
		c.close_conn()
	}
	return found
}
220
// checkin stamps the connection's idle time and returns it to the idle pool
// for its key. When that key already holds max_idle_conns_per_host idle
// connections, the connection is closed instead. When pooling it pushes the
// total idle count above max_idle_conns (and that cap is enabled), the
// least-recently-used idle connection across all keys is evicted and closed.
// All closing happens after the lock is released.
fn (mut t Transport) checkin(mut conn H1PooledConn) {
	conn.idle_since = time.now()
	mut victim := &H1PooledConn(unsafe { nil })
	mut pooled := false
	t.mu.lock()
	mut bucket := t.h1_idle[conn.key] or { []&H1PooledConn{} }
	if bucket.len < t.max_idle_conns_per_host {
		bucket << conn
		t.h1_idle[conn.key] = bucket
		pooled = true
		if t.max_idle_conns > 0 && t.total_idle_locked() > t.max_idle_conns {
			victim = t.evict_oldest_locked()
		}
	}
	t.mu.unlock()
	if !pooled {
		conn.close_conn()
		return
	}
	if !isnil(victim) {
		victim.close_conn()
	}
}
245
// total_idle_locked returns the number of idle connections summed over every
// pool key. The caller must hold t.mu.
fn (t &Transport) total_idle_locked() int {
	mut total := 0
	for _, conns in t.h1_idle {
		total += conns.len
	}
	return total
}
255
// evict_oldest_locked removes the idle connection with the earliest
// idle_since across all pool keys and returns it, or returns nil when the pool
// is empty. A key left without connections is deleted from the map. The caller
// must hold t.mu and is responsible for closing the returned connection.
fn (mut t Transport) evict_oldest_locked() &H1PooledConn {
	mut lru_key := ''
	mut lru_idx := -1
	mut lru_since := time.now()
	for key, conns in t.h1_idle {
		for idx, pooled in conns {
			// The first candidate always wins; later ones must be strictly older.
			if lru_idx != -1 && !(pooled.idle_since < lru_since) {
				continue
			}
			lru_key = key
			lru_idx = idx
			lru_since = pooled.idle_since
		}
	}
	if lru_idx == -1 {
		return &H1PooledConn(unsafe { nil })
	}
	mut bucket := t.h1_idle[lru_key] or { return &H1PooledConn(unsafe { nil }) }
	victim := bucket[lru_idx]
	bucket.delete(lru_idx)
	if bucket.len > 0 {
		t.h1_idle[lru_key] = bucket
	} else {
		t.h1_idle.delete(lru_key)
	}
	return victim
}
285
// maybe_checkin returns the connection to the pool only when all of these
// hold: the response was precisely framed (`reusable`), the request headers
// carry no Connection header, and the response permits reuse. Otherwise the
// connection is closed.
fn (mut t Transport) maybe_checkin(mut conn H1PooledConn, header Header, reusable bool, resp &Response) {
	if !reusable || header.contains(.connection) || !response_allows_reuse(resp) {
		conn.close_conn()
		return
	}
	t.checkin(mut conn)
}
295
// round_trip performs one HTTP request through the connection pool. It reuses
// an idle connection for the request's pool key when one exists, transparently
// retrying when a pooled connection turns out to be stale, and dials a fresh
// connection otherwise. Healthy connections go back to the pool afterwards.
//
// The number of pooled connections tried is bounded by
// max_idle_conns_per_host (at least one), and the final attempt always dials
// fresh. That way a request is never failed merely because the pool kept
// yielding stale connections, nor because the limit was set to zero or a
// negative value.
//
// Errors: a dial failure propagates unchanged and stays retryable. A failed
// exchange of a non-idempotent method is returned with the code
// transport_err_unsafe_retry so it is not replayed.
fn (mut t Transport) round_trip(req &Request, method Method, scheme string, host string, port int, path string, data string, header Header) !Response {
	$if windows && !no_vschannel ? {
		if scheme == 'https' {
			// The SChannel backend keeps its proven one-shot path until SChannel
			// pooling lands; plain-http pooling below already works on Windows.
			return req.ssl_do(port, method, host, path, data, header)
		}
	}
	raw := req.build_request_headers_opts(method, host, port, path, data, header, false)
	$if trace_http_request ? {
		eprint('> ')
		eprint(raw)
		eprintln('')
	}
	key := transport_pool_key(req, scheme, host, port)
	// max_idle_conns_per_host is a public mutable field, so clamp it: always
	// allow one pooled attempt, and never let a zero or negative limit skip
	// the loop altogether.
	reuse_budget := if t.max_idle_conns_per_host > 1 { t.max_idle_conns_per_host } else { 1 }
	// A stale pooled connection (closed by the server while idle) fails the
	// exchange; drain through the pool, then dial fresh.
	for attempt := 0; attempt <= reuse_budget; attempt++ {
		mut conn := &H1PooledConn(unsafe { nil })
		if attempt < reuse_budget {
			conn = t.checkout(key)
		}
		// On the last attempt the pool is bypassed, so `reused` is false and
		// every path below returns.
		reused := conn != unsafe { nil }
		if reused {
			conn.refresh_timeouts(req)
		} else {
			if scheme == 'https' {
				return t.tls_fresh_round_trip(req, key, raw, method, host, port, path,
					data, header)
			}
			conn = t.dial_h1_tcp(req, key, host, port)!
		}
		resp, reusable := conn.exchange(req, raw) or {
			conn.close_conn()
			// The write helpers do partial writes and cannot report how many
			// bytes reached the server, so a failed exchange may have delivered
			// part of the request. Only idempotent methods are safe to replay;
			// a non-idempotent one is tagged so neither this loop nor the
			// caller's outer retry loop re-sends it (avoiding duplicate side
			// effects). A pre-write dial failure, by contrast, propagates before
			// this point and stays freely retryable.
			if !transport_is_idempotent(method) {
				return error_with_code(err.msg(), transport_err_unsafe_retry)
			}
			if reused {
				// Drain stale pooled connections, then fall through to a fresh dial.
				continue
			}
			return err
		}
		t.maybe_checkin(mut conn, header, reusable, resp)
		return resp
	}
	// Not expected to be reached, because the last attempt always returns;
	// kept so the function has a terminating return.
	return error('http.transport: request failed after retrying on a fresh connection')
}
352
// dial_h1_tcp opens a new plain-TCP connection to `host:port`, applies the
// request's read and write timeouts, and wraps it in an H1PooledConn tagged
// with `key`. A dial failure is propagated to the caller.
fn (mut t Transport) dial_h1_tcp(req &Request, key string, host string, port int) !&H1PooledConn {
	address := '${host}:${port}'
	mut sock := net.dial_tcp(address)!
	sock.set_read_timeout(req.read_timeout)
	sock.set_write_timeout(req.write_timeout)
	pooled := &H1PooledConn{
		key: key
		tcp: sock
	}
	return pooled
}
363
// tls_fresh_round_trip dials a new TLS connection, advertising ALPN
// `h2`/`http/1.1` when HTTP/2 is enabled, and completes the request on it.
// When the server selects h2, the request runs on the existing one-shot HTTP/2
// driver and the connection is not pooled yet. Otherwise the exchange runs as
// HTTP/1.1 and the connection is offered to the pool like any other.
//
// Errors: a dial or handshake failure propagates unchanged and stays
// retryable, since no request byte was sent. A failed exchange of a
// non-idempotent method is returned with the code transport_err_unsafe_retry.
fn (mut t Transport) tls_fresh_round_trip(req &Request, key string, raw string, method Method, host string, port int, path string, data string, header Header) !Response {
	mut alpn := []string{}
	if req.enable_http2 {
		alpn = ['h2', 'http/1.1']
	}
	mut tls := ssl.new_ssl_conn(
		verify:                 req.verify
		cert:                   req.cert
		cert_key:               req.cert_key
		validate:               req.validate
		in_memory_verification: req.in_memory_verification
		alpn_protocols:         alpn
	)!
	tls.dial(host, port)!
	if req.read_timeout > 0 {
		tls.set_read_timeout(req.read_timeout)
	}
	negotiated_h2 := req.enable_http2 && tls.negotiated_alpn() == 'h2'
	if negotiated_h2 {
		return req.h2_do(mut tls, method, host, port, path, data, header)
	}
	mut conn := &H1PooledConn{
		key: key
		ssl: tls
	}
	resp, reusable := conn.exchange(req, raw) or {
		conn.close_conn()
		// Past the TLS handshake the request bytes may have been (partially)
		// written, so only an idempotent method may be retried by the outer
		// loop; anything else is tagged as unsafe to replay.
		if transport_is_idempotent(method) {
			return err
		}
		return error_with_code(err.msg(), transport_err_unsafe_retry)
	}
	t.maybe_checkin(mut conn, header, reusable, resp)
	return resp
}
404