v / vlib / fasthttp / fasthttp_bsd.c.v
730 lines · 662 sloc · 20.84 KB · 45545c2fda3dfafa31fb7341b31b786ad143e67d
Raw
1module fasthttp
2
3import net
4import sync.stdatomic
5import time
6
7#include <sys/event.h>
8#include <sys/stat.h>
9#include <sys/types.h>
10#include <sys/socket.h>
11#include <sys/uio.h>
12#include <signal.h>
13
14fn C.signal(sig int, handler voidptr) voidptr
15
16const buf_size = max_connection_size
17const kqueue_max_events = 128
18const backlog = max_connection_size
19const kqueue_wait_timeout_ms = 100
20
21// send_flags is OR'd into every C.send() call in this file. On OpenBSD,
22// which lacks the per-socket SO_NOSIGPIPE option, we pass MSG_NOSIGNAL
23// instead so writes to a disconnected peer return EPIPE rather than
24// killing the process with SIGPIPE. Other BSDs use SO_NOSIGPIPE set on
25// the socket at accept time (see accept_clients).
26const send_flags = $if openbsd { int(C.MSG_NOSIGNAL) } $else { 0 }
27
28fn C.kevent(kq i32, changelist &C.kevent, nchanges i32, eventlist &C.kevent, nevents i32, timeout &C.timespec) i32
29fn C.kqueue() i32
30fn C.fstat(fd i32, buf &C.stat) i32
31
32// send_file_bytes has three implementations across BSD-family OSes:
33// macOS: int sendfile(int fd, int s, off_t offset, off_t *len, sf_hdtr *hdtr, int flags);
34// (len is in/out: caller sets bytes-to-send, kernel writes bytes-actually-sent)
35// FreeBSD/NetBSD/DragonFly:
36// int sendfile(int fd, int s, off_t offset, size_t nbytes, sf_hdtr *hdtr, off_t *sbytes, int flags);
37// (nbytes is input, sbytes is the separate out-param for bytes actually sent)
38// OpenBSD: no sendfile(2) syscall at all. We fall back to a single
39// pread(2) + send(2) pair per call, using a bounded stack
40// buffer. The outer send_pending() loop will call us again
41// until the file is drained or the socket blocks.
42
43const sendfile_fallback_buf_size = 16384
44
45// send_file_bytes asks the kernel to send up to `nbytes` bytes from `file_fd` at
46// `offset` into socket `sock_fd`, in a single non-blocking operation.
47// Returns (ret, sent) where `ret` is 0 on success or -1 on error (errno set),
48// and `sent` is the number of bytes transferred this call (may be >0 even when ret==-1).
49fn send_file_bytes(file_fd i32, sock_fd i32, offset i64, nbytes i64) (int, i64) {
50 $if macos {
51 mut len := nbytes
52 ret := C.sendfile(file_fd, sock_fd, offset, &len, unsafe { nil }, 0)
53 return int(ret), len
54 } $else $if openbsd {
55 // No sendfile(2) on OpenBSD; pread into a stack buffer, then send.
56 // Cap one call at sendfile_fallback_buf_size so we don't starve
57 // other connections in the kqueue loop.
58 mut buf := [sendfile_fallback_buf_size]u8{}
59 mut want := nbytes
60 if want > sendfile_fallback_buf_size {
61 want = sendfile_fallback_buf_size
62 }
63 nread := C.pread(file_fd, &buf[0], usize(want), offset)
64 if nread <= 0 {
65 // nread == 0 is EOF (shouldn't happen given write_pos < file_len
66 // guards in send_pending, but treat it as an error to close the
67 // connection); nread < 0 propagates errno for EAGAIN handling.
68 return -1, i64(0)
69 }
70 nsent := C.send(sock_fd, &buf[0], usize(nread), send_flags)
71 if nsent < 0 {
72 return -1, i64(0)
73 }
74 return 0, i64(nsent)
75 } $else {
76 mut sbytes := i64(0)
77 ret := C.sendfile(file_fd, sock_fd, offset, usize(nbytes), unsafe { nil }, &sbytes, 0)
78 return int(ret), sbytes
79 }
80}
81
82$if macos {
83 // int sendfile(int fd, int s, off_t offset, off_t *len, struct sf_hdtr *hdtr, int flags);
84 fn C.sendfile(fd i32, s i32, offset i64, len &i64, hdtr voidptr, flags i32) i32
85} $else $if openbsd {
86 // ssize_t pread(int fd, void *buf, size_t nbyte, off_t offset);
87 fn C.pread(fd i32, buf voidptr, nbyte usize, offset i64) isize
88} $else {
89 // int sendfile(int fd, int s, off_t offset, size_t nbytes, struct sf_hdtr *hdtr, off_t *sbytes, int flags);
90 fn C.sendfile(fd i32, s i32, offset i64, nbytes usize, hdtr voidptr, sbytes &i64, flags i32) i32
91}
92
93struct C.kevent {
94 ident u64
95 filter i16
96 flags u16
97 fflags u32
98 data isize
99 udata voidptr
100}
101
102// Helper to set fields of a kevent struct.
103fn ev_set(mut ev C.kevent, ident u64, filter i16, flags u16, fflags u32, data isize, udata voidptr) {
104 ev.ident = ident
105 ev.filter = filter
106 ev.flags = flags
107 ev.fflags = fflags
108 ev.data = data
109 ev.udata = udata
110}
111
112struct Conn {
113 fd int
114 user_data voidptr
115mut:
116 read_buf [buf_size]u8
117 read_len int
118 read_extra []u8 // dynamic overflow buffer for large requests (e.g. chunked uploads)
119 write_buf []u8
120 write_pos int
121 request_active bool
122 read_start i64 // monotonic timestamp (in microseconds) when first data was received
123
124 // Sendfile state
125 file_fd int = -1
126 file_len i64
127 file_pos i64
128 should_close bool
129 request_arena voidptr
130}
131
132fn (mut c Conn) free_write_buf() {
133 if c.write_buf.cap > 0 {
134 unsafe { c.write_buf.free() }
135 c.write_buf = []u8{}
136 }
137}
138
139fn (mut c Conn) free_request_arena() {
140 $if prealloc {
141 if c.request_arena != unsafe { nil } {
142 unsafe { prealloc_scope_free_after(c.request_arena) }
143 c.request_arena = unsafe { nil }
144 }
145 }
146}
147
148pub struct Server {
149pub mut:
150 family net.AddrFamily = .ip6
151 port int
152 max_request_buffer_size int = 8192
153 timeout_in_seconds int = 30
154 socket_fd int = -1
155 poll_fd int = -1 // kqueue fd
156 user_data voidptr
157 request_handler fn (HttpRequest) !HttpResponse @[required]
158 running &stdatomic.AtomicVal[bool] = stdatomic.new_atomic(false)
159 shutting_down &stdatomic.AtomicVal[bool] = stdatomic.new_atomic(false)
160 stopped &stdatomic.AtomicVal[bool] = stdatomic.new_atomic(true)
161 active_requests &stdatomic.AtomicVal[int] = stdatomic.new_atomic(0)
162}
163
164// new_server creates and initializes a new Server instance.
165pub fn new_server(config ServerConfig) !&Server {
166 mut server := &Server{
167 family: config.family
168 port: config.port
169 max_request_buffer_size: config.max_request_buffer_size
170 timeout_in_seconds: config.timeout_in_seconds
171 user_data: config.user_data
172 request_handler: config.handler
173 running: stdatomic.new_atomic(false)
174 shutting_down: stdatomic.new_atomic(false)
175 stopped: stdatomic.new_atomic(true)
176 active_requests: stdatomic.new_atomic(0)
177 }
178 return server
179}
180
181fn set_nonblocking(fd int) {
182 flags := C.fcntl(fd, C.F_GETFL, 0)
183 if flags == -1 {
184 return
185 }
186 C.fcntl(fd, C.F_SETFL, flags | C.O_NONBLOCK)
187}
188
189fn add_event(kq int, ident u64, filter i16, flags u16, udata voidptr) int {
190 mut ev := C.kevent{}
191 ev_set(mut &ev, ident, filter, flags, u32(0), isize(0), udata)
192 return C.kevent(kq, &ev, 1, unsafe { nil }, 0, unsafe { nil })
193}
194
195fn delete_event(kq int, ident u64, filter i16, udata voidptr) {
196 mut ev := C.kevent{}
197 ev_set(mut &ev, ident, filter, u16(C.EV_DELETE), u32(0), isize(0), udata)
198 C.kevent(kq, &ev, 1, unsafe { nil }, 0, unsafe { nil })
199}
200
201fn close_conn(server &Server, kq int, c_ptr voidptr, mut clients map[int]voidptr) {
202 mut c := unsafe { &Conn(c_ptr) }
203 clients.delete(c.fd)
204 delete_event(kq, u64(c.fd), i16(C.EVFILT_READ), c)
205 delete_event(kq, u64(c.fd), i16(C.EVFILT_WRITE), c)
206 C.close(c.fd)
207 if c.request_active {
208 server.end_request()
209 c.request_active = false
210 }
211 c.free_write_buf()
212 c.free_request_arena()
213 if c.read_extra.cap > 0 {
214 unsafe { c.read_extra.free() }
215 }
216 if c.file_fd != -1 {
217 C.close(c.file_fd)
218 c.file_fd = -1
219 }
220 unsafe { free(c_ptr) }
221}
222
223fn send_pending(c_ptr voidptr) bool {
224 mut c := unsafe { &Conn(c_ptr) }
225
226 // 1. Send memory buffer (headers or small response)
227 if c.write_pos < c.write_buf.len {
228 remaining := c.write_buf.len - c.write_pos
229 write_ptr := unsafe { &c.write_buf[0] + c.write_pos }
230 sent := C.send(c.fd, write_ptr, remaining, send_flags)
231 if sent > 0 {
232 c.write_pos += int(sent)
233 }
234 if sent < 0 {
235 if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK {
236 return true
237 }
238 c.should_close = true
239 return false
240 }
241 }
242
243 // 2. Send file if buffer is fully sent
244 if c.write_pos >= c.write_buf.len && c.file_fd != -1 {
245 remaining := c.file_len - c.file_pos
246 ret, sent := send_file_bytes(c.file_fd, c.fd, c.file_pos, remaining)
247 if sent > 0 {
248 c.file_pos += sent
249 }
250 if ret == -1 {
251 if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK {
252 return true
253 }
254 C.close(c.file_fd)
255 c.file_fd = -1
256 c.should_close = true
257 return false
258 }
259 if c.file_pos >= c.file_len {
260 C.close(c.file_fd)
261 c.file_fd = -1
262 } else {
263 return true
264 }
265 }
266
267 return !(c.write_pos >= c.write_buf.len && c.file_fd == -1)
268}
269
270const status_408_response = 'HTTP/1.1 408 Request Timeout\r\nContent-Type: text/plain\r\nContent-Length: 19\r\nConnection: close\r\n\r\n408 Request Timeout'.bytes()
271
272fn send_bad_request(fd int) {
273 C.send(fd, tiny_bad_request_response.data, tiny_bad_request_response.len, send_flags)
274}
275
276fn send_request_timeout(fd int) {
277 C.send(fd, status_408_response.data, status_408_response.len, send_flags)
278}
279
280fn handle_write(server &Server, kq int, c_ptr voidptr, mut clients map[int]voidptr) {
281 if send_pending(c_ptr) {
282 return
283 }
284 complete_response(server, kq, c_ptr, mut clients, true)
285}
286
287fn complete_response(server &Server, kq int, c_ptr voidptr, mut clients map[int]voidptr, remove_write_event bool) {
288 mut c := unsafe { &Conn(c_ptr) }
289 if remove_write_event {
290 delete_event(kq, u64(c.fd), i16(C.EVFILT_WRITE), c)
291 }
292 if server.is_shutting_down() || c.should_close {
293 close_conn(server, kq, c_ptr, mut clients)
294 return
295 }
296 if c.request_active {
297 server.end_request()
298 c.request_active = false
299 }
300 c.free_write_buf()
301 c.free_request_arena()
302 c.write_pos = 0
303 c.read_len = 0
304 if c.read_extra.cap > 0 {
305 unsafe { c.read_extra.free() }
306 c.read_extra = []u8{}
307 }
308 c.read_start = 0
309 c.should_close = false
310}
311
312// process_request handles a complete HTTP request: decodes, calls the handler,
313// sends the response (or handles takeover/sendfile).
314fn process_request(server &Server, kq int, c_ptr voidptr, mut clients map[int]voidptr) {
315 mut c := unsafe { &Conn(c_ptr) }
316 mut request_arena := voidptr(unsafe { nil })
317 $if prealloc {
318 request_arena = unsafe { prealloc_scope_begin() }
319 }
320
321 mut req_buf := c.get_full_request_data()
322 if c.read_extra.cap > 0 {
323 unsafe { c.read_extra.free() }
324 c.read_extra = []u8{}
325 }
326
327 mut decoded := decode_http_request(req_buf) or {
328 send_bad_request(c.fd)
329 end_request_arena_current_thread(request_arena)
330 close_conn(server, kq, c_ptr, mut clients)
331 return
332 }
333 $if trace_prealloc ? {
334 unsafe { prealloc_scope_checkpoint(c'fasthttp decoded request') }
335 }
336 server.begin_request()
337 c.request_active = true
338 decoded.client_conn_fd = c.fd
339 decoded.user_data = server.user_data
340
341 mut resp := server.request_handler(decoded) or {
342 send_bad_request(c.fd)
343 end_request_arena_current_thread(request_arena)
344 close_conn(server, kq, c_ptr, mut clients)
345 return
346 }
347 $if trace_prealloc ? {
348 unsafe { prealloc_scope_checkpoint(c'fasthttp handler returned') }
349 }
350 resp.attach_request_arena_if_empty(request_arena)
351
352 match resp.takeover_mode {
353 .manual {
354 // The handler has taken ownership of the connection.
355 // Remove from kqueue and tracking, but do NOT close the fd.
356 clients.delete(c.fd)
357 delete_event(kq, u64(c.fd), i16(C.EVFILT_READ), c)
358 delete_event(kq, u64(c.fd), i16(C.EVFILT_WRITE), c)
359 if c.request_active {
360 server.end_request()
361 c.request_active = false
362 }
363 resp.free_owned_content()
364 resp.abandon_request_arena_current_thread()
365 unsafe { free(c_ptr) }
366 return
367 }
368 .reusable {
369 set_nonblocking(c.fd)
370 c.read_len = 0
371 c.read_extra.clear()
372 c.read_start = 0
373 if c.request_active {
374 server.end_request()
375 c.request_active = false
376 }
377 resp.free_owned_content()
378 resp.end_request_arena_current_thread()
379 if server.is_shutting_down() || resp.should_close {
380 close_conn(server, kq, c_ptr, mut clients)
381 }
382 return
383 }
384 .none {}
385 }
386
387 c.should_close = resp.should_close
388 c.free_write_buf()
389 c.free_request_arena()
390 c.request_arena = resp.take_request_arena()
391 c.write_buf = resp.take_or_clone_content()
392 $if trace_prealloc ? {
393 unsafe { prealloc_scope_checkpoint(c'fasthttp response retained') }
394 }
395 leave_request_arena_current_thread(c.request_arena)
396 if resp.file_path != '' {
397 fd := C.open(resp.file_path.str, C.O_RDONLY, 0)
398 if fd != -1 {
399 mut st := C.stat{}
400 if C.fstat(fd, &st) == 0 {
401 c.file_fd = fd
402 c.file_len = st.st_size
403 c.file_pos = 0
404 } else {
405 C.close(fd)
406 }
407 }
408 }
409
410 c.write_pos = 0
411 c.read_len = 0
412 c.read_extra.clear()
413 c.read_start = 0
414
415 if send_pending(c_ptr) {
416 add_event(kq, u64(c.fd), i16(C.EVFILT_WRITE), u16(C.EV_ADD | C.EV_ENABLE | C.EV_CLEAR), c)
417 return
418 }
419
420 complete_response(server, kq, c_ptr, mut clients, false)
421}
422
423// total_read_len returns the total number of request bytes received so far,
424// including both the fixed read_buf and the dynamic read_extra overflow.
425fn (c &Conn) total_read_len() int {
426 return c.read_len + c.read_extra.len
427}
428
429// get_full_request_data copies the complete received data into a single []u8.
430fn (c &Conn) get_full_request_data() []u8 {
431 total := c.total_read_len()
432 mut req_buf := []u8{cap: total}
433 unsafe {
434 req_buf.push_many(&c.read_buf[0], c.read_len)
435 }
436 if c.read_extra.len > 0 {
437 req_buf << c.read_extra
438 }
439 return req_buf
440}
441
442fn handle_read(server &Server, kq int, c_ptr voidptr, mut clients map[int]voidptr) {
443 mut c := unsafe { &Conn(c_ptr) }
444
445 // Drain the socket for this kqueue notification. EV_CLEAR only rearms once
446 // all readable data has been consumed.
447 for {
448 if c.read_len < buf_size {
449 n := C.recv(c.fd, &c.read_buf[c.read_len], buf_size - c.read_len, 0)
450 if n < 0 {
451 if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK {
452 break
453 }
454 C.send(c.fd, status_444_response.data, status_444_response.len, send_flags)
455 close_conn(server, kq, c_ptr, mut clients)
456 return
457 }
458 if n == 0 {
459 if c.total_read_len() == 0 {
460 close_conn(server, kq, c_ptr, mut clients)
461 return
462 }
463 break
464 }
465 c.read_len += int(n)
466 } else {
467 // Fixed buffer is full, read the rest into dynamic overflow.
468 mut tmp := []u8{len: 65536}
469 n := C.recv(c.fd, tmp.data, tmp.len, 0)
470 if n < 0 {
471 if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK {
472 break
473 }
474 C.send(c.fd, status_444_response.data, status_444_response.len, send_flags)
475 close_conn(server, kq, c_ptr, mut clients)
476 return
477 }
478 if n == 0 {
479 if c.total_read_len() == 0 {
480 close_conn(server, kq, c_ptr, mut clients)
481 return
482 }
483 break
484 }
485 c.read_extra << tmp[..int(n)]
486 }
487 }
488
489 total := c.total_read_len()
490 if total == 0 {
491 return
492 }
493
494 // Enforce the configured header limit without capping large request bodies.
495 mut header_end := -1
496 mut full_data := []u8{}
497 if c.read_extra.len > 0 {
498 full_data = c.get_full_request_data()
499 header_end = find_header_end_in_buf(full_data.data, full_data.len)
500 } else {
501 header_end = find_header_end_in_buf(&c.read_buf[0], c.read_len)
502 }
503 if (header_end == -1 && total >= server.max_request_buffer_size)
504 || header_end > server.max_request_buffer_size {
505 C.send(c.fd, status_413_response.data, status_413_response.len, send_flags)
506 close_conn(server, kq, c_ptr, mut clients)
507 return
508 }
509
510 // Record when we first started receiving data for this request
511 if c.read_start == 0 {
512 c.read_start = time.sys_mono_now()
513 }
514
515 // Check if the full body has been received.
516 if c.read_extra.len > 0 {
517 if !has_complete_body(full_data.data, full_data.len) {
518 elapsed_ns := time.sys_mono_now() - c.read_start
519 timeout_ns := i64(server.timeout_in_seconds) * 1_000_000_000
520 if elapsed_ns >= timeout_ns {
521 send_request_timeout(c.fd)
522 close_conn(server, kq, c_ptr, mut clients)
523 }
524 return
525 }
526 } else if !has_complete_body(&c.read_buf[0], c.read_len) {
527 // Body not complete yet - check for timeout
528 elapsed_ns := time.sys_mono_now() - c.read_start
529 timeout_ns := i64(server.timeout_in_seconds) * 1_000_000_000
530 if elapsed_ns >= timeout_ns {
531 send_request_timeout(c.fd)
532 close_conn(server, kq, c_ptr, mut clients)
533 }
534 // Otherwise wait for more data on the next kqueue event
535 return
536 }
537
538 process_request(server, kq, c_ptr, mut clients)
539}
540
541fn accept_clients(kq int, listen_fd int, mut clients map[int]voidptr) {
542 for {
543 client_fd := C.accept(listen_fd, unsafe { nil }, unsafe { nil })
544 if client_fd < 0 {
545 if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK {
546 break
547 }
548 C.perror(c'accept')
549 break
550 }
551 set_nonblocking(client_fd)
552 // Prevent SIGPIPE on writes to disconnected clients. macOS and
553 // FreeBSD/NetBSD/DragonFly expose the per-socket SO_NOSIGPIPE
554 // option; OpenBSD does not, and instead expects MSG_NOSIGNAL on
555 // each send(2) call (handled via the `send_flags` const above).
556 $if !openbsd {
557 nosigpipe_opt := 1
558 C.setsockopt(client_fd, C.SOL_SOCKET, C.SO_NOSIGPIPE, &nosigpipe_opt, sizeof(int))
559 }
560 mut c := &Conn{
561 fd: client_fd
562 user_data: unsafe { nil }
563 file_fd: -1
564 }
565 add_event(kq, u64(client_fd), i16(C.EVFILT_READ), u16(C.EV_ADD | C.EV_ENABLE | C.EV_CLEAR),
566 c)
567 clients[client_fd] = c
568 }
569}
570
571fn close_all_conns(server &Server, kq int, mut clients map[int]voidptr) {
572 for client_fd in clients.keys() {
573 c_ptr := clients[client_fd] or { continue }
574 close_conn(server, kq, c_ptr, mut clients)
575 }
576}
577
578fn (mut s Server) stop_accepting() {
579 if s.poll_fd >= 0 && s.socket_fd >= 0 {
580 delete_event(s.poll_fd, u64(s.socket_fd), i16(C.EVFILT_READ), unsafe { nil })
581 }
582 if s.socket_fd >= 0 {
583 C.close(s.socket_fd)
584 s.socket_fd = -1
585 }
586}
587
588// run starts the server and enters the main event loop (Kqueue version).
589pub fn (mut s Server) run() ! {
590 // Ignore SIGPIPE process-wide. Writing to a disconnected socket raises
591 // SIGPIPE by default, which kills the process. We suppress it per-send
592 // (SO_NOSIGPIPE on macos/freebsd/netbsd/dragonfly, MSG_NOSIGNAL on
593 // openbsd), but this signal handler is a safety net for any code path
594 // that might miss it (e.g. spawned SSE/WebSocket threads using
595 // TcpConn.write).
596 C.signal(C.SIGPIPE, C.SIG_IGN)
597
598 s.socket_fd = C.socket(i32(s.family), i32(net.SocketType.tcp), 0)
599 if s.socket_fd < 0 {
600 C.perror(c'socket')
601 return error('socket creation failed')
602 }
603
604 opt := 1
605 C.setsockopt(s.socket_fd, C.SOL_SOCKET, C.SO_REUSEADDR, &opt, sizeof(int))
606
607 addr := if s.family == .ip6 {
608 net.new_ip6(u16(s.port), [u8(0), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]!)
609 } else {
610 net.new_ip(u16(s.port), [u8(0), 0, 0, 0]!)
611 }
612 alen := addr.len()
613
614 if C.bind(s.socket_fd, voidptr(&addr), alen) < 0 {
615 C.perror(c'bind')
616 return error('socket bind failed')
617 }
618 if C.listen(s.socket_fd, backlog) < 0 {
619 C.perror(c'listen')
620 return error('socket listen failed')
621 }
622
623 set_nonblocking(s.socket_fd)
624
625 s.poll_fd = C.kqueue()
626 if s.poll_fd < 0 {
627 C.perror(c'kqueue')
628 return error('kqueue creation failed')
629 }
630
631 add_event(s.poll_fd, u64(s.socket_fd), i16(C.EVFILT_READ),
632 u16(C.EV_ADD | C.EV_ENABLE | C.EV_CLEAR), unsafe { nil })
633
634 listen_fd := s.socket_fd
635 s.mark_running()
636 println('listening on http://0.0.0.0:${s.port}/')
637
638 mut events := [kqueue_max_events]C.kevent{}
639 mut clients := map[int]voidptr{}
640 for {
641 if s.is_shutting_down() && s.active_request_count() == 0 {
642 close_all_conns(s, s.poll_fd, mut clients)
643 break
644 }
645 timeout := C.timespec{
646 tv_sec: 0
647 tv_nsec: kqueue_wait_timeout_ms * 1_000_000
648 }
649 nev := C.kevent(s.poll_fd, unsafe { nil }, 0, &events[0], kqueue_max_events, &timeout)
650 if nev < 0 {
651 if C.errno == C.EINTR {
652 // kevent may return EINTR when the process receives a signal
653 // (e.g. SIGCHLD from an exec'd subprocess). Treat like a timeout.
654 continue
655 }
656 if s.is_shutting_down() {
657 continue
658 }
659 C.perror(c'kevent')
660 break
661 }
662
663 for i := 0; i < nev; i++ {
664 event := events[i]
665 if event.flags & u16(C.EV_ERROR) != 0 {
666 if event.ident == u64(listen_fd) {
667 C.perror(c'listener error')
668 continue
669 }
670 if event.udata != unsafe { nil } {
671 close_conn(s, s.poll_fd, event.udata, mut clients)
672 }
673 continue
674 }
675
676 if event.ident == u64(listen_fd) {
677 if s.is_shutting_down() {
678 continue
679 }
680 accept_clients(s.poll_fd, listen_fd, mut clients)
681 continue
682 }
683
684 if event.udata == unsafe { nil } {
685 continue
686 }
687
688 if event.flags & u16(C.EV_EOF) != 0 {
689 close_conn(s, s.poll_fd, event.udata, mut clients)
690 continue
691 }
692
693 if event.filter == i16(C.EVFILT_READ) {
694 if s.is_shutting_down() {
695 close_conn(s, s.poll_fd, event.udata, mut clients)
696 continue
697 }
698 handle_read(s, s.poll_fd, event.udata, mut clients)
699 } else if event.filter == i16(C.EVFILT_WRITE) {
700 handle_write(s, s.poll_fd, event.udata, mut clients)
701 }
702 }
703 // Sweep for connections waiting for body data that have timed out
704 if s.timeout_in_seconds > 0 {
705 now := time.sys_mono_now()
706 timeout_ns := i64(s.timeout_in_seconds) * 1_000_000_000
707 for client_fd in clients.keys() {
708 c_ptr := clients[client_fd] or { continue }
709 c := unsafe { &Conn(c_ptr) }
710 if c.read_start > 0 && c.read_len > 0 && !c.request_active {
711 elapsed := now - c.read_start
712 if elapsed >= timeout_ns {
713 send_request_timeout(c.fd)
714 close_conn(s, s.poll_fd, c_ptr, mut clients)
715 }
716 }
717 }
718 }
719 }
720
721 if s.socket_fd >= 0 {
722 C.close(s.socket_fd)
723 s.socket_fd = -1
724 }
725 if s.poll_fd >= 0 {
726 C.close(s.poll_fd)
727 s.poll_fd = -1
728 }
729 s.mark_stopped()
730}
731