v / vlib / fasthttp / fasthttp_linux.v
885 lines · 829 sloc · 30.45 KB · b6358f093b6e7fea3f6021f9b3035b9e879b9504
Raw
1module fasthttp
2
3import net
4import sync.stdatomic
5import time
6
7#include <sys/epoll.h>
8#include <sys/sendfile.h>
9#include <sys/stat.h>
10#include <netinet/tcp.h>
11
// epoll_wait_timeout_ms is the timeout, in milliseconds, handed to every
// epoll_wait() call made by a worker. When it elapses without events the
// worker loop still gets to re-check the shutdown flag and run its timeout
// sweeps.
const epoll_wait_timeout_ms = 100

// status_408_response is the complete pre-encoded HTTP response (status line,
// headers and a 19 byte body) written to a client whose request has not been
// fully received within `timeout_in_seconds`.
const status_408_response = 'HTTP/1.1 408 Request Timeout\r\nContent-Type: text/plain\r\nContent-Length: 19\r\nConnection: close\r\n\r\n408 Request Timeout'.bytes()
14
// accept4 accepts a pending connection on `sockfd`; `flags` lets the caller
// request properties such as SOCK_NONBLOCK for the new descriptor.
fn C.accept4(sockfd i32, addr &net.Addr, addrlen &u32, flags i32) i32

// epoll_create1 creates a new epoll instance and returns its descriptor.
fn C.epoll_create1(__flags i32) i32

// epoll_ctl adds, modifies or removes the registration of `__fd` on `__epfd`.
fn C.epoll_ctl(__epfd i32, __op i32, __fd i32, __event &C.epoll_event) i32

// epoll_wait waits up to `__timeout` milliseconds and stores at most
// `__maxevents` ready events in `__events`.
fn C.epoll_wait(__epfd i32, __events &C.epoll_event, __maxevents i32, __timeout i32) i32

// sendfile copies up to `count` bytes from `in_fd` to `out_fd`, reading from
// `*offset` and advancing it in place.
fn C.sendfile(out_fd i32, in_fd i32, offset &i64, count usize) i32

// fstat fills `buf` with the metadata of the open descriptor `fd`.
fn C.fstat(fd i32, buf &C.stat) i32
26
// C.epoll_data_t mirrors the user-data union carried by every epoll event.
// This file only ever stores and reads the `fd` member.
@[typedef]
union C.epoll_data_t {
mut:
	ptr voidptr
	fd  int
	u32 u32
	u64 u64
}

// C.epoll_event mirrors `struct epoll_event`: `events` is the bit mask of
// requested/reported conditions, `data` is returned unchanged by epoll_wait().
struct C.epoll_event {
mut:
	events u32
	data   C.epoll_data_t
}
41
// Server is the epoll based HTTP server. `run` starts one worker per slot of
// the thread pool; each worker owns its own listening socket and epoll
// instance, stored at the same index in `listen_fds` and `epoll_fds`.
pub struct Server {
pub:
	// family selects the address family of the listening sockets.
	family net.AddrFamily = .ip6
	// port is the TCP port every listening socket binds to.
	port int = 3000
	// max_request_buffer_size sizes the per-worker receive buffer and caps the
	// size of the request headers; `new_server` rejects values <= 0.
	max_request_buffer_size int = 8192
	// timeout_in_seconds bounds how long a request may take to arrive and how
	// long a parked response may stay pending; the sweeps only run when > 0.
	timeout_in_seconds int = 30
	// user_data is copied into every decoded request before the handler runs.
	user_data voidptr
mut:
	// Per-worker descriptors and thread handles; -1 marks an unused fd slot.
	listen_fds []int    = []int{len: max_thread_pool_size, cap: max_thread_pool_size, init: -1}
	epoll_fds  []int    = []int{len: max_thread_pool_size, cap: max_thread_pool_size, init: -1}
	threads    []thread = []thread{len: max_thread_pool_size, cap: max_thread_pool_size}
	// request_handler turns a decoded request into a response.
	request_handler fn (HttpRequest) !HttpResponse @[required]
	// Lifecycle flags and the in-flight request counter. They are read and
	// updated through Server methods that are defined outside this file.
	running         &stdatomic.AtomicVal[bool] = stdatomic.new_atomic(false)
	shutting_down   &stdatomic.AtomicVal[bool] = stdatomic.new_atomic(false)
	stopped         &stdatomic.AtomicVal[bool] = stdatomic.new_atomic(true)
	active_requests &stdatomic.AtomicVal[int]  = stdatomic.new_atomic(0)
}
59
// new_server builds a heap allocated Server from `config`.
// It returns an error when `config.max_request_buffer_size` is not positive.
// The per-worker arrays are pinned (no slicing, growing or shrinking) so that
// they keep exactly one slot per worker for the lifetime of the server.
pub fn new_server(config ServerConfig) !&Server {
	if config.max_request_buffer_size <= 0 {
		return error('max_request_buffer_size must be greater than 0')
	}
	mut srv := &Server{
		family:                  config.family
		port:                    config.port
		max_request_buffer_size: config.max_request_buffer_size
		timeout_in_seconds:      config.timeout_in_seconds
		user_data:               config.user_data
		request_handler:         config.handler
		running:                 stdatomic.new_atomic(false)
		shutting_down:           stdatomic.new_atomic(false)
		stopped:                 stdatomic.new_atomic(true)
		active_requests:         stdatomic.new_atomic(0)
	}
	unsafe {
		srv.listen_fds.flags.set(.noslices | .noshrink | .nogrow)
		srv.epoll_fds.flags.set(.noslices | .noshrink | .nogrow)
		srv.threads.flags.set(.noslices | .noshrink | .nogrow)
	}
	return srv
}
84
// set_blocking switches `fd` between blocking (`blocking == true`) and
// non-blocking mode by clearing or setting O_NONBLOCK in its file status
// flags. Failures of either fcntl() call are reported on stderr; the function
// has no return value, so the descriptor is simply left in its previous mode.
fn set_blocking(fd int, blocking bool) {
	flags := C.fcntl(fd, C.F_GETFL, 0)
	if flags == -1 {
		eprintln(@LOCATION)
		C.perror(c'fcntl F_GETFL failed')
		return
	}
	// Clear O_NONBLOCK for blocking mode, set it for non-blocking mode.
	new_flags := if blocking { flags & ~C.O_NONBLOCK } else { flags | C.O_NONBLOCK }
	if C.fcntl(fd, C.F_SETFL, new_flags) == -1 {
		// Do not swallow the failure: a socket that silently stays in the wrong
		// mode is very hard to diagnose later.
		eprintln(@LOCATION)
		C.perror(c'fcntl F_SETFL failed')
	}
}
100
// ClientWriteState holds everything needed to finish sending one response:
// the in-memory bytes, an optional file to stream with sendfile(), and the
// bookkeeping that must be released once the transfer ends. An entry stays in
// `client_write_states` until it is released through `free_write_state`.
// When the first non-blocking attempt cannot send everything, the connection
// is additionally registered for EPOLLOUT so the worker can resume the
// transfer later instead of blocking.
struct ClientWriteState {
mut:
	// content is the in-memory part of the response; content_pos counts the
	// bytes of it that were already sent.
	content     []u8
	content_pos int
	// content_owned tells free_write_state whether `content` must be freed.
	content_owned bool
	// file_fd is the file streamed after `content` (-1 when there is none);
	// file_len is its size and file_pos the current sendfile() offset.
	file_fd  int = -1
	file_len i64
	file_pos i64
	// should_close requests closing the connection once the response is sent.
	should_close bool
	// request_arena is released by free_write_state in `prealloc` builds.
	request_arena voidptr
	// request_active records that server.end_request() is still owed.
	request_active bool
	// start_ns is the monotonic timestamp taken when the state was created;
	// the write timeout sweep compares against it.
	start_ns i64
	// epollout_armed is set once EPOLLOUT had to be registered for this
	// connection. complete_write then removes the fd from epoll and adds it
	// back, so that request bytes which arrived during the write are reported
	// as a fresh EPOLLIN edge (the registration is edge-triggered).
	epollout_armed bool
}
125
// DrainStatus is the result of a single try_drain_write() pass.
enum DrainStatus {
	done    // everything (content and file) was handed to the kernel
	blocked // the socket cannot take more right now; resume on EPOLLOUT
	failed  // unrecoverable error, or the transfer was cut short
}
132
// close_socket closes `fd` and reports whether the descriptor is gone.
// It returns false (after logging) only for failures other than EINTR.
//
// close() is deliberately NOT retried on EINTR: on Linux the descriptor is
// released even when the call is interrupted, so a second close() could hit a
// descriptor number that another worker thread has just been handed by
// accept4()/open() and close that unrelated connection or file.
fn close_socket(fd int) bool {
	if C.close(fd) != -1 {
		return true
	}
	errno_val := C.errno
	if errno_val == C.EINTR {
		// The kernel already dropped the descriptor; treat it as closed.
		return true
	}
	eprintln('ERROR: close(fd=${fd}) failed with errno=${errno_val}')
	return false
}
145
// create_server_socket creates a non-blocking TCP listening socket for
// `server.family`, enables SO_REUSEADDR and SO_REUSEPORT, binds it to the
// wildcard address on `server.port` and starts listening.
// It returns the descriptor, or -1 after logging the failing step; on failure
// the partially configured socket is closed.
fn create_server_socket(server Server) int {
	sock := C.socket(i32(server.family), i32(net.SocketType.tcp), 0)
	if sock < 0 {
		eprintln(@LOCATION)
		C.perror(c'Socket creation failed')
		return -1
	}

	// The accept loop relies on the listening socket never blocking.
	set_blocking(sock, false)

	// SO_REUSEADDR + SO_REUSEPORT: every worker binds its own socket to the
	// same address and port.
	opt := 1
	if C.setsockopt(sock, C.SOL_SOCKET, C.SO_REUSEADDR, &opt, sizeof(opt)) < 0 {
		eprintln(@LOCATION)
		C.perror(c'setsockopt SO_REUSEADDR failed')
		close_socket(sock)
		return -1
	}
	if C.setsockopt(sock, C.SOL_SOCKET, C.SO_REUSEPORT, &opt, sizeof(opt)) < 0 {
		eprintln(@LOCATION)
		C.perror(c'setsockopt SO_REUSEPORT failed')
		close_socket(sock)
		return -1
	}

	// All-zero address bytes select the wildcard address of the family.
	addr := if server.family == .ip6 {
		net.new_ip6(u16(server.port), [u8(0), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]!)
	} else {
		net.new_ip(u16(server.port), [u8(0), 0, 0, 0]!)
	}
	alen := addr.len()
	if C.bind(sock, voidptr(&addr), alen) < 0 {
		eprintln(@LOCATION)
		C.perror(c'Bind failed')
		close_socket(sock)
		return -1
	}
	if C.listen(sock, max_connection_size) < 0 {
		eprintln(@LOCATION)
		C.perror(c'Listen failed')
		close_socket(sock)
		return -1
	}
	return sock
}
192
// add_fd_to_epoll registers `fd` on `epoll_fd` for the conditions in `events`.
// The descriptor itself is stored as the event payload, which is how the
// worker loop maps a reported event back to its socket.
// Returns 0 on success and -1 (after logging) when epoll_ctl() fails.
fn add_fd_to_epoll(epoll_fd int, fd int, events u32) int {
	mut registration := C.epoll_event{
		events: events
	}
	registration.data.fd = fd
	if C.epoll_ctl(epoll_fd, C.EPOLL_CTL_ADD, fd, &registration) != -1 {
		return 0
	}
	eprintln(@LOCATION)
	C.perror(c'epoll_ctl')
	return -1
}
206
// remove_fd_from_epoll unregisters `fd` from `epoll_fd`.
// Returns true on success; on failure the errno is logged and false returned.
fn remove_fd_from_epoll(epoll_fd int, fd int) bool {
	if C.epoll_ctl(epoll_fd, C.EPOLL_CTL_DEL, fd, C.NULL) != -1 {
		return true
	}
	eprintln('ERROR: epoll_ctl(DEL, fd=${fd}) failed with errno=${C.errno}')
	return false
}
216
// handle_accept_loop accepts every connection currently queued on
// `listen_fd`, registers each new non-blocking client socket on `epoll_fd`
// (edge-triggered, read side) and records it in `client_fds`.
//
// The listening socket is registered edge-triggered, so the loop has to run
// until accept4() reports EAGAIN/EWOULDBLOCK. Failures that only concern a
// single attempt (EINTR) or a single queued connection (ECONNABORTED) are
// therefore skipped instead of ending the loop; stopping there would leave the
// remaining queued connections unaccepted until another connection arrives
// and produces a new edge. Any other error is logged and ends the loop.
fn handle_accept_loop(epoll_fd int, listen_fd int, mut client_fds map[int]bool) {
	for {
		client_fd := C.accept4(listen_fd, C.NULL, C.NULL, C.SOCK_NONBLOCK)
		if client_fd < 0 {
			errno_val := C.errno
			if errno_val == C.EAGAIN || errno_val == C.EWOULDBLOCK {
				break // Accept queue drained.
			}
			if errno_val == C.EINTR || errno_val == C.ECONNABORTED {
				// Transient: try the next queued connection.
				continue
			}
			eprintln(@LOCATION)
			C.perror(c'Accept failed')
			break
		}
		// Enable TCP_NODELAY for lower latency (best effort, result ignored).
		opt := 1
		C.setsockopt(client_fd, C.IPPROTO_TCP, C.TCP_NODELAY, &opt, sizeof(opt))
		// A client that cannot be registered would never be serviced: drop it.
		if add_fd_to_epoll(epoll_fd, client_fd, u32(C.EPOLLIN | C.EPOLLET)) == -1 {
			close_socket(client_fd)
			continue
		}
		client_fds[client_fd] = true
	}
}
239
// free_write_state releases the pending write state of `client_fd`, if any:
// it frees owned content, closes a still open file descriptor, releases the
// request arena (in `prealloc` builds), reports the end of the request to the
// server when that is still owed, removes the map entry and frees the state.
// It is a no-op when no state is registered for `client_fd`.
fn free_write_state(server &Server, client_fd int, mut client_write_states map[int]&ClientWriteState) {
	mut pending := client_write_states[client_fd] or { return }

	// Response bytes.
	if pending.content_owned && pending.content.cap > 0 {
		unsafe { pending.content.free() }
	}
	pending.content = []u8{}

	// File body.
	if pending.file_fd != -1 {
		C.close(pending.file_fd)
		pending.file_fd = -1
	}

	// Request arena.
	$if prealloc {
		if pending.request_arena != unsafe { nil } {
			unsafe { prealloc_scope_free_after(pending.request_arena) }
		}
	}
	pending.request_arena = unsafe { nil }

	// Active request accounting.
	if pending.request_active {
		server.end_request()
		pending.request_active = false
	}

	client_write_states.delete(client_fd)
	unsafe { free(pending) }
}
263
// handle_client_closure forgets everything the worker tracks for `client_fd`
// (read buffer, read start time, closing flag, pending write state), removes
// the descriptor from `epoll_fd` and closes it.
// Descriptor 0 is ignored silently; negative descriptors are logged and
// ignored.
fn handle_client_closure(server &Server, epoll_fd int, client_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool, mut client_write_states map[int]&ClientWriteState) {
	if client_fd <= 0 {
		if client_fd < 0 {
			eprintln('ERROR: Invalid FD=${client_fd} for closure')
		}
		return
	}
	// Drop the per-connection bookkeeping first ...
	client_fds.delete(client_fd)
	client_buffers.delete(client_fd)
	client_read_starts.delete(client_fd)
	closing_client_fds.delete(client_fd)
	free_write_state(server, client_fd, mut client_write_states)
	// ... then the kernel side of the connection.
	remove_fd_from_epoll(epoll_fd, client_fd)
	close_socket(client_fd)
}
281
// close_worker_clients closes every client connection tracked in
// `client_fds`. The keys are copied up front because closing a connection
// removes its entry from the map.
fn close_worker_clients(server &Server, epoll_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool, mut client_write_states map[int]&ClientWriteState) {
	tracked_fds := client_fds.keys()
	for fd in tracked_fds {
		handle_client_closure(server, epoll_fd, fd, mut client_fds, mut client_buffers, mut
			client_read_starts, mut closing_client_fds, mut client_write_states)
	}
}
288
// drain_closing_client reads and discards whatever a client still sends after
// it was given a terminal response. It returns as soon as the socket has no
// more data for now (EAGAIN/EWOULDBLOCK), and closes the connection when the
// peer has closed its side or recv() fails with any other error.
fn drain_closing_client(server &Server, epoll_fd int, client_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool, mut client_write_states map[int]&ClientWriteState) {
	mut scratch := []u8{len: 4096}
	for {
		received := C.recv(client_fd, unsafe { &scratch[0] }, scratch.len, 0)
		if received > 0 {
			// Discard and keep reading.
			continue
		}
		if received < 0 && (C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK) {
			// Nothing left for now; wait for the next readiness event.
			return
		}
		// End of stream (0) or a hard error (< 0): tear the connection down.
		handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
			client_read_starts, mut closing_client_fds, mut client_write_states)
		return
	}
}
308
// send_terminal_response_and_drain writes a final `response` to `client_fd`
// (single best-effort send, result not checked), shuts down the write side of
// the socket, forgets the partially received request and marks the connection
// as closing. The descriptor itself stays open: the worker loop keeps
// discarding input for it through drain_closing_client.
fn send_terminal_response_and_drain(client_fd int, response []u8, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool) {
	// Answer, then signal that nothing more will be written.
	C.send(client_fd, response.data, response.len, C.MSG_NOSIGNAL)
	net.shutdown(client_fd, how: .write)

	// The request in progress is abandoned.
	client_buffers.delete(client_fd)
	client_read_starts.delete(client_fd)

	closing_client_fds[client_fd] = true
}
316
// try_drain_write sends as much of the pending response as the non-blocking
// socket accepts: first the remaining bytes of `state.content`, then (when
// `state.file_fd` is set) the remaining file bytes through sendfile(2).
//
// Result:
//   .done    - everything was sent; the file descriptor, if any, was closed.
//   .blocked - the socket reported EAGAIN/EWOULDBLOCK; progress is kept in
//              `state` and the caller has to retry once EPOLLOUT fires.
//   .failed  - unrecoverable error or truncated transfer.
// The state is not released here; the caller cleans up through
// `free_write_state` after `.done` or `.failed`.
fn try_drain_write(client_fd int, mut state ClientWriteState) DrainStatus {
	// Phase 1: in-memory content.
	for state.content_pos < state.content.len {
		pending := state.content.len - state.content_pos
		written := C.send(client_fd, unsafe { &state.content[state.content_pos] }, pending,
			C.MSG_NOSIGNAL)
		if written > 0 {
			state.content_pos += written
			continue
		}
		if written == 0 {
			// Nothing accepted and no error reported: give up instead of
			// spinning on the socket.
			return .failed
		}
		send_errno := C.errno
		if send_errno == C.EINTR {
			continue
		}
		if send_errno == C.EAGAIN || send_errno == C.EWOULDBLOCK {
			return .blocked
		}
		eprintln('ERROR: send() failed with errno=${send_errno}')
		return .failed
	}

	// Phase 2: optional file body.
	if state.file_fd == -1 {
		return .done
	}
	for state.file_pos < state.file_len {
		remaining := state.file_len - state.file_pos
		// sendfile() advances `state.file_pos` itself through the offset pointer.
		ssize := C.sendfile(client_fd, state.file_fd, &state.file_pos, usize(remaining))
		if ssize > 0 {
			continue
		}
		if ssize == 0 {
			// No progress although bytes are still owed (for example the file
			// shrank after fstat()). The announced length can no longer be
			// honoured, so the connection must be dropped to avoid leaving a
			// keep-alive client out of sync.
			eprintln('ERROR: sendfile() returned 0 with ${remaining} bytes still pending; closing connection to avoid desync')
			return .failed
		}
		errno_val := C.errno
		if errno_val == C.EINTR {
			continue
		}
		if errno_val == C.EAGAIN || errno_val == C.EWOULDBLOCK {
			return .blocked
		}
		// Hard failure: log a message specific to the errno.
		match errno_val {
			C.EBADF {
				eprintln('ERROR: sendfile() EBADF: input fd or socket not open for required access (errno=${errno_val})')
			}
			C.EFAULT {
				eprintln('ERROR: sendfile() EFAULT: bad address for offset (errno=${errno_val})')
			}
			C.EINVAL {
				eprintln('ERROR: sendfile() EINVAL: invalid descriptor state or non-seekable input (errno=${errno_val})')
			}
			C.EIO {
				eprintln('ERROR: sendfile() EIO: I/O error while reading input file (errno=${errno_val})')
			}
			C.ENOMEM {
				eprintln('ERROR: sendfile() ENOMEM: insufficient kernel memory (errno=${errno_val})')
			}
			C.EOVERFLOW {
				eprintln('ERROR: sendfile() EOVERFLOW: count exceeds file/socket limits (errno=${errno_val})')
			}
			C.ESPIPE {
				eprintln('ERROR: sendfile() ESPIPE: input file not seekable with offset (errno=${errno_val})')
			}
			else {
				eprintln('ERROR: sendfile() failed with errno=${errno_val}')
			}
		}
		return .failed
	}

	// File fully sent: release the descriptor right away so that a keep-alive
	// connection does not keep it open.
	C.close(state.file_fd)
	state.file_fd = -1
	return .done
}
410
// arm_epollout changes the registration of `client_fd` on `epoll_fd` to
// EPOLLIN|EPOLLOUT|EPOLLET, so that the worker is notified when the socket
// becomes writable again. It returns the result of epoll_ctl() unchanged
// (-1 on failure).
fn arm_epollout(epoll_fd int, client_fd int) int {
	mut registration := C.epoll_event{
		events: u32(C.EPOLLIN | C.EPOLLOUT | C.EPOLLET)
	}
	registration.data.fd = client_fd
	return C.epoll_ctl(epoll_fd, C.EPOLL_CTL_MOD, client_fd, &registration)
}
421
// complete_write finishes a fully sent response: it releases the write state
// and the read buffer of `client_fd`, then either closes the connection (the
// response asked for it, or the server is shutting down) or keeps it open for
// the next request. It does nothing when no write state is registered.
fn complete_write(server &Server, epoll_fd int, client_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool, mut client_write_states map[int]&ClientWriteState) {
	finished := client_write_states[client_fd] or { return }
	// Copy what is needed before the state is freed.
	close_requested := finished.should_close
	needs_rearm := finished.epollout_armed
	free_write_state(server, client_fd, mut client_write_states)
	client_buffers.delete(client_fd)

	if server.is_shutting_down() || close_requested {
		handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
			client_read_starts, mut closing_client_fds, mut client_write_states)
		return
	}
	if !needs_rearm {
		// Keep-alive, and the registration was never changed: nothing to do.
		return
	}
	// Keep-alive after a parked write: take the fd out of epoll and register
	// it again with the plain EPOLLIN|EPOLLET mask. The fresh registration is
	// meant to report request bytes that queued up while the write was
	// pending as a new readable edge.
	remove_fd_from_epoll(epoll_fd, client_fd)
	if add_fd_to_epoll(epoll_fd, client_fd, u32(C.EPOLLIN | C.EPOLLET)) == -1 {
		handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
			client_read_starts, mut closing_client_fds, mut client_write_states)
	}
}
448
// handle_write continues a parked response after EPOLLOUT was reported for
// `client_fd`. It is a no-op when no write state exists for the descriptor.
fn handle_write(server &Server, epoll_fd int, client_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool, mut client_write_states map[int]&ClientWriteState) {
	mut parked := client_write_states[client_fd] or { return }
	match try_drain_write(client_fd, mut parked) {
		.blocked {
			// Socket is full again; the EPOLLOUT registration stays in place.
		}
		.done {
			complete_write(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
				client_read_starts, mut closing_client_fds, mut client_write_states)
		}
		.failed {
			handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
				client_read_starts, mut closing_client_fds, mut client_write_states)
		}
	}
}
467
// process_request handles one completely received request on `client_fd`:
// it decodes `request_buffer`, runs the server's request handler and sends
// the response.
//
// Outline:
//   1. Open a request arena (`prealloc` builds) and mark the request active.
//   2. Decode the request and call the handler. If either fails, a tiny
//      "bad request" response is sent and the connection is closed.
//   3. If the handler took over the connection (`takeover_mode`), only the
//      bookkeeping is updated and nothing is written here.
//   4. Otherwise the response content, the optional file and the request
//      bookkeeping are moved into a ClientWriteState and a first
//      non-blocking write is attempted. If the socket cannot take
//      everything, the state is parked and EPOLLOUT is armed.
fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer []u8, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool, mut client_write_states map[int]&ClientWriteState) {
	mut request_arena := voidptr(unsafe { nil })
	$if prealloc {
		request_arena = unsafe { prealloc_scope_begin() }
	}
	// The request has fully arrived, so the read timeout no longer applies.
	client_read_starts.delete(client_fd)
	server.begin_request()
	mut request_active := true

	// --- decode -----------------------------------------------------------
	mut decoded_http_request := decode_http_request(request_buffer) or {
		eprintln('Error decoding request ${err}')
		C.send(client_fd, tiny_bad_request_response.data, tiny_bad_request_response.len,
			C.MSG_NOSIGNAL)
		end_request_arena_current_thread(request_arena)
		if request_active {
			server.end_request()
		}
		handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
			client_read_starts, mut closing_client_fds, mut client_write_states)
		return
	}
	$if trace_prealloc ? {
		unsafe { prealloc_scope_checkpoint(c'fasthttp decoded request') }
	}
	decoded_http_request.client_conn_fd = client_fd
	decoded_http_request.user_data = server.user_data

	// --- handler ----------------------------------------------------------
	mut response := server.request_handler(decoded_http_request) or {
		eprintln('Error handling request ${err}')
		C.send(client_fd, tiny_bad_request_response.data, tiny_bad_request_response.len,
			C.MSG_NOSIGNAL)
		end_request_arena_current_thread(request_arena)
		if request_active {
			server.end_request()
		}
		handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
			client_read_starts, mut closing_client_fds, mut client_write_states)
		return
	}
	$if trace_prealloc ? {
		unsafe { prealloc_scope_checkpoint(c'fasthttp handler returned') }
	}
	response.attach_request_arena_if_empty(request_arena)

	// --- connection takeover ----------------------------------------------
	match response.takeover_mode {
		.manual {
			// The handler now owns the connection: stop tracking it and take
			// it out of epoll, but leave the descriptor open.
			client_fds.delete(client_fd)
			client_buffers.delete(client_fd)
			client_read_starts.delete(client_fd)
			closing_client_fds.delete(client_fd)
			remove_fd_from_epoll(epoll_fd, client_fd)
			response.abandon_request_arena_current_thread()
			response.free_owned_content()
			if request_active {
				server.end_request()
			}
			return
		}
		.reusable {
			// The connection stays with this worker: make sure the socket is
			// non-blocking again and release the per-request resources.
			set_blocking(client_fd, false)
			client_buffers.delete(client_fd)
			response.free_owned_content()
			response.end_request_arena_current_thread()
			if request_active {
				server.end_request()
			}
			if server.is_shutting_down() || response.should_close {
				handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut
					client_buffers, mut client_read_starts, mut closing_client_fds, mut
					client_write_states)
			}
			return
		}
		.none {}
	}

	// --- optional file body -----------------------------------------------
	// The file is opened before the write state exists, so a failure can
	// still be cleaned up through `response`.
	mut file_fd := -1
	mut file_len := i64(0)
	if response.file_path != '' {
		opened_fd := C.open(response.file_path.str, C.O_RDONLY, 0)
		if opened_fd == -1 {
			eprintln('ERROR: open file failed')
			response.free_owned_content()
			response.end_request_arena_current_thread()
			if request_active {
				server.end_request()
			}
			handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
				client_read_starts, mut closing_client_fds, mut client_write_states)
			return
		}
		mut st := C.stat{}
		if C.fstat(opened_fd, &st) != 0 {
			eprintln('ERROR: fstat failed')
			C.close(opened_fd)
			response.free_owned_content()
			response.end_request_arena_current_thread()
			if request_active {
				server.end_request()
			}
			handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
				client_read_starts, mut closing_client_fds, mut client_write_states)
			return
		}
		file_fd = opened_fd
		file_len = i64(st.st_size)
	}

	// --- hand everything over to the write state ---------------------------
	// From here on the content and the arena belong to the ClientWriteState;
	// they are released by free_write_state and must not be ended or freed
	// through `response` any more.
	content_bytes := response.take_or_clone_content()
	arena_ptr := response.take_request_arena()
	leave_request_arena_current_thread(arena_ptr)

	mut state := &ClientWriteState{
		content:        content_bytes
		content_pos:    0
		content_owned:  true
		file_fd:        file_fd
		file_len:       file_len
		file_pos:       0
		should_close:   response.should_close
		request_arena:  arena_ptr
		request_active: request_active
		start_ns:       time.sys_mono_now()
	}
	// The state is now responsible for calling server.end_request().
	request_active = false
	client_write_states[client_fd] = state

	// --- first write attempt ------------------------------------------------
	match try_drain_write(client_fd, mut state) {
		.done {
			complete_write(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
				client_read_starts, mut closing_client_fds, mut client_write_states)
		}
		.blocked {
			// The socket cannot take more right now: keep the state and wait
			// for EPOLLOUT.
			client_buffers.delete(client_fd)
			client_read_starts.delete(client_fd)
			if arm_epollout(epoll_fd, client_fd) == -1 {
				eprintln('ERROR: epoll_ctl(MOD, EPOLLOUT) failed errno=${C.errno}')
				handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut
					client_buffers, mut client_read_starts, mut closing_client_fds, mut
					client_write_states)
			} else {
				state.epollout_armed = true
			}
		}
		.failed {
			handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
				client_read_starts, mut closing_client_fds, mut client_write_states)
		}
	}
}
627
// process_events is the body of one worker thread. It waits for events on
// `epoll_fd` and, for each of them, accepts new connections from `listen_fd`,
// resumes parked writes, reads request data, and dispatches complete
// requests to process_request. After every batch of events it enforces the
// read and write timeouts (when `server.timeout_in_seconds` > 0).
//
// All per-connection bookkeeping lives in maps local to this function, keyed
// by the client descriptor:
//   client_fds          - every connection accepted by this worker
//   client_buffers      - bytes of a request that is not complete yet
//   client_read_starts  - monotonic time at which the current request started
//   closing_client_fds  - connections that received a terminal response and
//                         are only drained until the peer closes
//   client_write_states - responses that are still being sent
//
// The function returns once the server is shutting down and no request is
// active any more; all remaining connections are closed first.
fn process_events(server &Server, epoll_fd int, listen_fd int) {
	mut events := [max_connection_size]C.epoll_event{}
	// Scratch buffer for recv(); pinned so it is never resliced or resized.
	mut request_buffer := []u8{len: server.max_request_buffer_size, cap: server.max_request_buffer_size}
	mut client_fds := map[int]bool{}
	mut client_buffers := map[int][]u8{}
	mut client_read_starts := map[int]i64{}
	mut closing_client_fds := map[int]bool{}
	mut client_write_states := map[int]&ClientWriteState{}
	unsafe {
		request_buffer.flags.set(.noslices | .nogrow | .noshrink)
	}
	for {
		// Shutdown: leave once the last active request has finished.
		if server.is_shutting_down() && server.active_request_count() == 0 {
			close_worker_clients(server, epoll_fd, mut client_fds, mut client_buffers, mut
				client_read_starts, mut closing_client_fds, mut client_write_states)
			return
		}
		num_events := C.epoll_wait(epoll_fd, &events[0], max_connection_size, epoll_wait_timeout_ms)
		if num_events < 0 {
			// Interruptions and failures during shutdown are not worth a log line.
			if C.errno != C.EINTR && !server.is_shutting_down() {
				eprintln('ERROR: epoll_wait() failed with errno=${C.errno}')
			}
			continue
		}
		for i := 0; i < num_events; i++ {
			client_fd := unsafe { events[i].data.fd }

			// --- listening socket: accept new connections ---------------------
			if client_fd == listen_fd {
				if server.is_shutting_down() {
					continue
				}
				handle_accept_loop(epoll_fd, listen_fd, mut client_fds)
				continue
			}

			// --- hang-up / error ------------------------------------------------
			if events[i].events & u32((C.EPOLLHUP | C.EPOLLERR)) != 0 {
				if client_fd == listen_fd {
					eprintln('ERROR: listen fd had HUP/ERR')
					continue
				}
				if client_fd > 0 {
					// While a response is still being written the 444 is skipped:
					// the peer is gone and the connection is simply torn down.
					if client_fd !in client_write_states {
						C.send(client_fd, status_444_response.data, status_444_response.len,
							C.MSG_NOSIGNAL)
					}
					handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut
						client_buffers, mut client_read_starts, mut closing_client_fds, mut
						client_write_states)
				} else {
					eprintln('ERROR: Invalid FD from epoll: ${client_fd}')
				}
				continue
			}

			// --- writable: resume a parked response ----------------------------
			if events[i].events & u32(C.EPOLLOUT) != 0 {
				handle_write(server, epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
					client_read_starts, mut closing_client_fds, mut client_write_states)
				// The write may have closed the connection. If it is still
				// tracked, continue with the readable part of the same event.
				if client_fd !in client_fds {
					continue
				}
			}

			// --- readable --------------------------------------------------------
			if events[i].events & u32(C.EPOLLIN) != 0 {
				if server.is_shutting_down() {
					handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut
						client_buffers, mut client_read_starts, mut closing_client_fds, mut
						client_write_states)
					continue
				}
				if closing_client_fds[client_fd] or { false } {
					drain_closing_client(server, epoll_fd, client_fd, mut client_fds, mut
						client_buffers, mut client_read_starts, mut closing_client_fds, mut
						client_write_states)
					continue
				}
				if client_fd in client_write_states {
					// A response for this connection is still in flight. Starting
					// the next request now would replace its ClientWriteState and
					// lose the rest of the response, so reading is postponed;
					// complete_write re-registers the fd on keep-alive completion
					// so that the waiting bytes are reported again.
					continue
				}

				// Read everything that is available, appending to what was
				// already received for this request.
				mut total_bytes_read := 0
				mut accumulated := client_buffers[client_fd] or {
					[]u8{cap: server.max_request_buffer_size}
				}
				mut headers_complete := false
				mut header_too_large := false
				mut header_end_pos := -1
				mut request_complete := false
				mut peer_closed := false
				mut recv_error := false

				for {
					bytes_read := C.recv(client_fd, unsafe { &request_buffer[0] },
						server.max_request_buffer_size - 1, 0)
					if bytes_read < 0 {
						if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK {
							// Socket drained for now.
							break
						}
						eprintln('ERROR: recv() failed with errno=${C.errno}')
						recv_error = true
						break
					} else if bytes_read == 0 {
						// The client closed its side of the connection.
						peer_closed = true
						break
					}

					unsafe {
						accumulated.push_many(&request_buffer[0], bytes_read)
					}
					total_bytes_read += bytes_read
					// The read timeout runs from the first byte of the request.
					if client_fd !in client_read_starts {
						client_read_starts[client_fd] = time.sys_mono_now()
					}

					// The configured size limit applies to the headers only, not
					// to the request body.
					buffer_len := accumulated.len
					if !headers_complete && buffer_len >= 4 {
						header_end_pos = find_header_end_in_buf(accumulated.data, buffer_len)
						if header_end_pos == -1 {
							if buffer_len >= server.max_request_buffer_size {
								header_too_large = true
								break
							}
						} else {
							headers_complete = true
							if header_end_pos > server.max_request_buffer_size {
								header_too_large = true
								break
							}
						}
					}

					if headers_complete && has_complete_body(accumulated.data, buffer_len) {
						request_complete = true
						break
					}
				}

				// Decide what to do with the outcome of the read.
				if header_too_large {
					send_terminal_response_and_drain(client_fd, status_413_response, mut
						client_buffers, mut client_read_starts, mut closing_client_fds)
					continue
				}
				if request_complete {
					process_request(server, epoll_fd, client_fd, accumulated, mut client_fds, mut
						client_buffers, mut client_read_starts, mut closing_client_fds, mut
						client_write_states)
				} else if recv_error {
					// Unexpected recv() failure: answer with 444 and close.
					C.send(client_fd, status_444_response.data, status_444_response.len,
						C.MSG_NOSIGNAL)
					handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut
						client_buffers, mut client_read_starts, mut closing_client_fds, mut
						client_write_states)
				} else if peer_closed || (total_bytes_read == 0 && accumulated.len == 0) {
					// Regular end of the connection.
					handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut
						client_buffers, mut client_read_starts, mut closing_client_fds, mut
						client_write_states)
				} else if accumulated.len > 0 {
					// Incomplete request: keep the bytes for the next event.
					client_buffers[client_fd] = accumulated
				}
			}
		}

		// --- timeout sweeps ------------------------------------------------------
		if server.timeout_in_seconds > 0 {
			now := time.sys_mono_now()
			timeout_ns := i64(server.timeout_in_seconds) * 1_000_000_000
			// Requests that took too long to arrive get a 408.
			for client_fd in client_read_starts.keys() {
				started := client_read_starts[client_fd] or { continue }
				if now - started >= timeout_ns {
					send_terminal_response_and_drain(client_fd, status_408_response, mut
						client_buffers, mut client_read_starts, mut closing_client_fds)
				}
			}
			// Responses that are still pending `timeout_in_seconds` after their
			// write state was created are dropped, so a peer that stopped
			// reading cannot keep the connection open indefinitely.
			for client_fd in client_write_states.keys() {
				state := client_write_states[client_fd] or { continue }
				if state.start_ns > 0 && now - state.start_ns >= timeout_ns {
					handle_client_closure(server, epoll_fd, client_fd, mut client_fds, mut
						client_buffers, mut client_read_starts, mut closing_client_fds, mut
						client_write_states)
				}
			}
		}
	}
}
831
// stop_accepting closes every open listening socket. Each one is first
// removed from its worker's epoll instance (when that instance exists) and
// its slot is reset to -1 afterwards, so calling this again is harmless.
fn (mut server Server) stop_accepting() {
	for slot in 0 .. max_thread_pool_size {
		listen_fd := server.listen_fds[slot]
		if listen_fd < 0 {
			continue
		}
		worker_epoll_fd := server.epoll_fds[slot]
		if worker_epoll_fd >= 0 {
			remove_fd_from_epoll(worker_epoll_fd, listen_fd)
		}
		close_socket(listen_fd)
		server.listen_fds[slot] = -1
	}
}
844
// run starts the server: for every worker slot it creates a listening socket
// and an epoll instance, registers the socket (edge-triggered) and spawns a
// worker thread running process_events. It then blocks until all workers
// have returned, closes their epoll instances and marks the server stopped.
//
// An error is returned when the setup of a worker slot fails, so that a
// failed start can be told apart from a normal stop. The descriptors of the
// failing slot are closed and their slots reset to -1, so a later
// `stop_accepting` does not close a stale descriptor number a second time.
// Workers of earlier slots that were already spawned are not stopped by this
// function.
pub fn (mut server Server) run() ! {
	$if windows {
		eprintln('Windows is not supported yet')
		return
	}
	for i := 0; i < max_thread_pool_size; i++ {
		server.listen_fds[i] = create_server_socket(server)
		if server.listen_fds[i] < 0 {
			return error('fasthttp: could not create the listening socket for worker ${i}')
		}

		server.epoll_fds[i] = C.epoll_create1(0)
		if server.epoll_fds[i] < 0 {
			C.perror(c'epoll_create1 failed')
			close_socket(server.listen_fds[i])
			server.listen_fds[i] = -1
			return error('fasthttp: epoll_create1 failed for worker ${i}')
		}

		// Register the listening socket with each worker epoll for distributed accepts (edge-triggered)
		if add_fd_to_epoll(server.epoll_fds[i], server.listen_fds[i], u32(C.EPOLLIN | C.EPOLLET)) == -1 {
			close_socket(server.listen_fds[i])
			close_socket(server.epoll_fds[i])
			server.listen_fds[i] = -1
			server.epoll_fds[i] = -1
			return error('fasthttp: could not register the listening socket for worker ${i}')
		}

		server.threads[i] = spawn process_events(server, server.epoll_fds[i], server.listen_fds[i])
	}

	server.mark_running()
	println('listening on http://0.0.0.0:${server.port}/')
	// Main thread waits for workers; accepts are handled in worker epoll loops
	for i in 0 .. max_thread_pool_size {
		server.threads[i].wait()
		if server.epoll_fds[i] >= 0 {
			close_socket(server.epoll_fds[i])
			server.epoll_fds[i] = -1
		}
	}
	server.mark_stopped()
}
886