v2 / vlib / veb / veb_fasthttp.v
308 lines · 289 sloc · 10.22 KB · 45545c2fda3dfafa31fb7341b31b786ad143e67d
Raw
1// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved.
2// Use of this source code is governed by an MIT license
3// that can be found in the LICENSE file.
4module veb
5
6import fasthttp
7import net.http
8import strconv
9import strings
10import time
11import net.urllib
12
// RequestParams bundles the per-server state that `run_new` hands to
// `parallel_request_handler` through fasthttp's opaque `user_data` pointer.
struct RequestParams {
	// Type-erased pointer to the user's app instance; the request handler
	// casts it back to `&A`.
	global_app voidptr
	// Controller paths returned by `check_duplicate_routes_in_controllers`.
	controllers_sorted []&ControllerPath
	// Route table returned by `generate_routes`.
	routes &map[string]Route
	// When true, the request handler records `time.ticks()` as the
	// context's `page_gen_start`.
	benchmark_page_generation bool
}
19
// http_ok_response is a pre-serialized, body-less `200 OK` response
// (JSON content type, keep-alive connection) stored as raw bytes.
const http_ok_response = 'HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 0\r\nConnection: keep-alive\r\n\r\n'.bytes()
21
// run_at starts a veb server for `global_app` using the settings in `params`.
// It simply forwards to `run_new`; any error `run_new` returns is handed
// back to the caller unchanged.
pub fn run_at[A, X](mut global_app A, params RunParams) ! {
	run_new[A, X](mut global_app, params) or { return err }
}
25
// run_new - start a new veb server using the parallel fasthttp backend.
//
// Steps:
//   1. validate `params.port` (must be in 1..65535),
//   2. if `ssl_enabled(params)`, delegate entirely to `run_at_with_ssl`,
//   3. otherwise generate the routes/controllers, create a fasthttp server
//      whose handler is `parallel_request_handler[A, X]`, and run it.
//
// Returns an error when the port is out of range, when route generation or
// the duplicate-route check fails, or when the fasthttp server cannot be
// created. Panics if `server.run()` itself fails.
pub fn run_new[A, X](mut global_app A, params RunParams) ! {
	if params.port <= 0 || params.port > 65535 {
		return error('invalid port number `${params.port}`, it should be between 1 and 65535')
	}
	if ssl_enabled(params) {
		maybe_init_server[A](mut global_app, new_server_without_lifecycle())
		run_at_with_ssl[A, X](mut global_app, params)!
		return
	}

	// Generate routes and controllers just like the original run() function.
	routes := generate_routes[A, X](global_app)!
	controllers_sorted := check_duplicate_routes_in_controllers[A](global_app, routes)!

	// Allocate params on the heap to keep it valid for the server lifetime
	request_params := &RequestParams{
		global_app:                unsafe { voidptr(&global_app) }
		controllers_sorted:        controllers_sorted
		routes:                    &routes
		benchmark_page_generation: params.benchmark_page_generation
	}

	// Configure and run the fasthttp server
	mut server := fasthttp.new_server(fasthttp.ServerConfig{
		family:                  params.family
		port:                    params.port
		handler:                 parallel_request_handler[A, X]
		max_request_buffer_size: params.max_request_buffer_size
		timeout_in_seconds:      params.timeout_in_seconds
		user_data:               voidptr(request_params)
	}) or {
		// Propagate the failure instead of printing it and returning as if
		// the server had started: this function's contract is `!`, and a
		// silent `return` made "could not start" indistinguishable from a
		// clean shutdown for the caller.
		return error('Failed to create server: ${err}')
	}
	handle := server.handle()
	maybe_init_server[A](mut global_app, new_server_with_lifecycle(handle))
	println('[veb] Running multi-threaded app on ${server_protocol(params)}://${startup_host(params)}:${params.port}/')
	flush_stdout()
	$if A is BeforeAcceptApp {
		mut server_thread := spawn_fasthttp_server_run(mut server)
		// Wait until the listener is bound before invoking before_accept_loop,
		// so callers using `<-app.started` actually see the server ready.
		// A failed wait is deliberately ignored (best effort); a real startup
		// failure still surfaces through `server_thread.wait()` below.
		handle.wait_till_running() or {}
		global_app.before_accept_loop()
		server_thread.wait() or { panic(err) }
	} $else {
		server.run() or { panic(err) }
	}
}
76
// spawn_fasthttp_server_run launches `server.run()` on a new thread and
// returns that thread's handle, so the caller can `wait()` for the result.
fn spawn_fasthttp_server_run(mut server fasthttp.Server) thread ! {
	runner := spawn server.run()
	return runner
}
80
// parallel_request_handler is the callback fasthttp invokes for a request.
// It turns the raw request buffer into an `http.Request`, lets
// `handle_request_and_route` produce a completed context, and converts that
// context into the `fasthttp.HttpResponse` returned to the server.
//
// Early exits:
//   - unparsable request head      -> 500, connection closed
//   - undecodable chunked body     -> 400, connection closed
//   - Content-Length inconsistency -> whatever `content_length_validation_response` returns
fn parallel_request_handler[A, X](req fasthttp.HttpRequest) !fasthttp.HttpResponse {
	// Get parameters from user_data - copy to avoid use-after-free
	shared_params := unsafe { *(&RequestParams(req.user_data)) }
	mut app := unsafe { &A(shared_params.global_app) }

	// The head is everything before the body offset; when no body offset is
	// set, the whole buffer is treated as the head.
	mut head_len := req.buffer.len
	if req.body.start > 0 {
		head_len = req.body.start
	}
	head_text := req.buffer[..head_len].bytestr()
	// Parse the request head into a standard `http.Request`, then copy just the body.
	mut parsed := http.parse_request_head_str(head_text) or {
		return fasthttp.HttpResponse{
			content:       'HTTP/1.1 500 Internal Server Error\r\nContent-Length: 0\r\nConnection: close\r\n\r\n'.bytes()
			content_owned: true
		}
	}
	$if trace_prealloc ? {
		unsafe { prealloc_scope_checkpoint(c'veb parsed http head') }
	}
	if req.body.len > 0 {
		body_end := req.body.start + req.body.len
		parsed.data = req.buffer[req.body.start..body_end].bytestr()
	}
	// If the request uses chunked transfer encoding, decode the chunked body
	if transfer_encoding_is_chunked(parsed.header) {
		parsed.data = decode_chunked_body(parsed.data) or {
			return fasthttp.HttpResponse{
				content:       'HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\nConnection: close\r\n\r\n'.bytes()
				content_owned: true
			}
		}
	}
	if rejection := content_length_validation_response(req, parsed) {
		return rejection
	}
	// Create and populate the `veb.Context`.
	mut done_ctx := handle_request_and_route[A, X](mut app, parsed, req.client_conn_fd,
		shared_params)
	$if trace_prealloc ? {
		unsafe { prealloc_scope_checkpoint(c'veb handled route') }
	}

	match done_ctx.takeover_mode {
		.manual {
			// The handler has taken over the connection (e.g. for SSE or WebSocket).
			// The response was already sent directly over ctx.conn.
			// Tell fasthttp to hand off the fd without closing it.
			return fasthttp.HttpResponse{
				takeover_mode: .manual
			}
		}
		.reusable {
			close_reusable := should_close_connection(done_ctx.req, done_ctx.res, done_ctx.client_wants_to_close)
			return fasthttp.HttpResponse{
				takeover_mode: .reusable
				should_close:  close_reusable
			}
		}
		.none {}
	}

	close_after := should_close_connection(done_ctx.req, done_ctx.res, done_ctx.client_wants_to_close)
	payload := done_ctx.res.bytes()
	$if trace_prealloc ? {
		unsafe { prealloc_scope_checkpoint(c'veb serialized response') }
	}
	// The response is serialized into `payload`; release the body string, read
	// the remaining fields that are still needed, then free the context itself.
	unsafe { done_ctx.res.body.free() }
	done_ctx.res.body = ''
	result_kind := done_ctx.return_type
	result_file := done_ctx.return_file
	unsafe { free(done_ctx) }

	if result_kind != .file {
		// The fasthttp server expects a complete response buffer to be returned.
		return fasthttp.HttpResponse{
			content:       payload
			content_owned: true
			should_close:  close_after
		}
	}
	// File responses additionally carry the path of the file to send.
	return fasthttp.HttpResponse{
		content:       payload
		content_owned: true
		file_path:     result_file
		should_close:  close_after
	}
}
169
// content_length_validation_response compares the `Content-Length` header of
// `parsed` with the number of body bytes actually present in `req`.
//
// Returns `none` when the request is acceptable: chunked transfer encoding,
// no `Content-Length` header, or a header value equal to `req.body.len`.
// Otherwise returns the response to send:
//   - `http_408` when fewer body bytes arrived than announced,
//   - `400 Bad Request` when more bytes arrived than announced, or when the
//     header is not a plain non-negative decimal number.
fn content_length_validation_response(req fasthttp.HttpRequest, parsed http.Request) ?fasthttp.HttpResponse {
	if transfer_encoding_is_chunked(parsed.header) {
		return none
	}
	content_length := parsed.header.get(.content_length) or { return none }
	mut reason := 'Mismatch of body length and Content-Length header'
	// Parse strictly. `string.int()` maps malformed values such as `abc` to 0
	// and wraps oversized ones, which lets an invalid header pass validation.
	// A bit size of 63 guarantees the parsed value fits into an i64.
	if expected_length := strconv.parse_uint(content_length.trim_space(), 10, 63) {
		actual_length := i64(req.body.len)
		if actual_length == i64(expected_length) {
			return none
		}
		if actual_length < i64(expected_length) {
			return fasthttp.HttpResponse{
				content:       http_408.bytes()
				content_owned: true
			}
		}
	} else {
		reason = 'Invalid Content-Length header'
	}
	return fasthttp.HttpResponse{
		content:       http.new_response(
			status: .bad_request
			body:   reason
			header: http.new_header(
				key:   .content_type
				value: 'text/plain'
			).join(headers_close)
		).bytes()
		content_owned: true
	}
}
198
// handle_request_and_route is a unified function that creates the context
// for `req` and finds the correct handler for it. Dispatch order:
//   1. static files, when `A` is a `StaticApp` and `serve_if_static` handles it,
//   2. controllers, when `A` is a `ControllerInterface` and one of them matches,
//   3. the app's own routes via `handle_route`.
// An unparsable URL yields a context on which `not_found()` was called; a form
// parsing failure yields one on which `request_error()` was called.
// The returned context is heap allocated and holds the completed response.
fn handle_request_and_route[A, X](mut app A, req http.Request, _client_fd int, params RequestParams) &Context {
	// Create and populate the `veb.Context` from the request.
	mut parsed_url := urllib.parse_request_uri(req.url) or {
		// This should be rare if http.parse_request succeeded.
		mut failed_ctx := &Context{
			req: req
		}
		failed_ctx.not_found()
		return failed_ctx
	}
	url_query := parse_query_from_url(parsed_url)
	form_fields, uploaded_files := parse_form_from_request(req) or {
		mut failed_ctx := &Context{
			req: req
		}
		failed_ctx.request_error('Failed to parse form data: ${err.msg()}')
		return failed_ctx
	}
	$if trace_prealloc ? {
		unsafe { prealloc_scope_checkpoint(c'veb parsed url/form') }
	}
	host_header := req.header.get(.host) or { '' }
	host := request_host_name(host_header)
	// Only read the clock when page generation benchmarking was requested.
	gen_start := if params.benchmark_page_generation { time.ticks() } else { 0 }
	$if trace_prealloc ? {
		unsafe { prealloc_scope_checkpoint(c'veb parsed host') }
	}
	mut heap_ctx := &Context{
		req:                   req
		page_gen_start:        gen_start
		client_fd:             _client_fd
		client_wants_to_close: request_has_connection_close(req)
		query:                 url_query
		form:                  form_fields
		files:                 uploaded_files
	}
	mut user_ctx := X{
		Context: heap_ctx
	}
	$if trace_prealloc ? {
		unsafe { prealloc_scope_checkpoint(c'veb context initialized') }
	}
	$if A is StaticApp {
		heap_ctx.custom_mime_types_ref = unsafe { &app.static_mime_types }
		static_cfg := static_handler_config(app.static_files, app.static_mime_types, app.static_hosts,
			app.static_prefixes, app.enable_static_gzip, app.enable_static_zstd, app.enable_static_compression,
			app.static_compression_max_size, app.static_compression_mime_types, app.enable_markdown_negotiation)
		if serve_if_static[X](static_cfg, mut user_ctx, parsed_url, host) {
			// Preserve the handled context on the heap before the stack-local user context goes away.
			unsafe {
				*heap_ctx = user_ctx.Context
			}
			return heap_ctx
		}
	}
	$if trace_prealloc ? {
		unsafe { prealloc_scope_checkpoint(c'veb pre-route static checked') }
	}
	// Match controller paths first
	$if A is ControllerInterface {
		if controller_ctx := handle_controllers[X](params.controllers_sorted, heap_ctx, mut
			parsed_url, host)
		{
			return controller_ctx
		}
	}
	// Route the request through the app itself, using the user context that
	// wraps veb's context.
	handle_route[A, X](mut app, mut user_ctx, parsed_url, host, params.routes)
	$if trace_prealloc ? {
		unsafe { prealloc_scope_checkpoint(c'veb route returned') }
	}
	// Preserve the handled context on the heap before the stack-local user context goes away.
	unsafe {
		*heap_ctx = user_ctx.Context
	}
	return heap_ctx
}
277
// decode_chunked_body decodes a chunked transfer-encoded body string
// into the raw body content.
//
// Each chunk is `<hex size>[;extensions]\r\n<data>\r\n`; a chunk of size 0
// terminates the body and anything after it (trailers) is ignored. Input that
// ends without a terminal chunk is accepted as-is.
//
// Returns an error when a chunk size line has no `\r\n`, when the size is
// empty or not valid hexadecimal, or when a chunk announces more bytes than
// remain in `data`.
fn decode_chunked_body(data string) !string {
	mut sb := strings.new_builder(data.len)
	mut pos := 0
	for pos < data.len {
		// Find the end of the chunk size line
		line_end := data.index_after_('\r\n', pos)
		if line_end == -1 {
			return error('invalid chunked body: missing chunk size line ending')
		}
		// Chunk extensions (everything after `;`) are ignored.
		chunk_size_str := data[pos..line_end].all_before(';').trim_space()
		if chunk_size_str.len == 0 {
			return error('invalid chunked body: empty chunk size')
		}
		declared_size := strconv.parse_uint(chunk_size_str, 16, 64) or {
			return error('invalid chunked body: bad chunk size')
		}
		pos = line_end + 2 // skip past \r\n
		if declared_size == 0 {
			// Terminal chunk - skip trailers
			break
		}
		// Bounds-check in u64 *before* narrowing to int. Narrowing first let a
		// size such as `FFFFFFFF` become negative (slipping past the check and
		// panicking in the slice below) and let `100000003` wrap to 3.
		// `pos <= data.len` holds here because `\r\n` was found at `line_end`.
		if declared_size > u64(data.len - pos) {
			return error('invalid chunked body: chunk data truncated')
		}
		chunk_size := int(declared_size)
		sb.write_string(data[pos..pos + chunk_size])
		pos += chunk_size + 2 // skip chunk data + \r\n delimiter
	}
	return sb.str()
}
309