v2 / vlib / fasthttp / fasthttp_linux.v
643 lines · 598 sloc · 20.08 KB · 45545c2fda3dfafa31fb7341b31b786ad143e67d
Raw
1module fasthttp
2
3import net
4import sync.stdatomic
5import time
6
7#include <sys/epoll.h>
8#include <sys/sendfile.h>
9#include <sys/stat.h>
10#include <netinet/tcp.h>
11
12const epoll_wait_timeout_ms = 100
13const status_408_response = 'HTTP/1.1 408 Request Timeout\r\nContent-Type: text/plain\r\nContent-Length: 19\r\nConnection: close\r\n\r\n408 Request Timeout'.bytes()
14
15fn C.accept4(sockfd i32, addr &net.Addr, addrlen &u32, flags i32) i32
16
17fn C.epoll_create1(__flags i32) i32
18
19fn C.epoll_ctl(__epfd i32, __op i32, __fd i32, __event &C.epoll_event) i32
20
21fn C.epoll_wait(__epfd i32, __events &C.epoll_event, __maxevents i32, __timeout i32) i32
22
23fn C.sendfile(out_fd i32, in_fd i32, offset &i64, count usize) i32
24
25fn C.fstat(fd i32, buf &C.stat) i32
26
27@[typedef]
28union C.epoll_data_t {
29mut:
30 ptr voidptr
31 fd int
32 u32 u32
33 u64 u64
34}
35
36struct C.epoll_event {
37mut:
38 events u32
39 data C.epoll_data_t
40}
41
42pub struct Server {
43pub:
44 family net.AddrFamily = .ip6
45 port int = 3000
46 max_request_buffer_size int = 8192
47 timeout_in_seconds int = 30
48 user_data voidptr
49mut:
50 listen_fds []int = []int{len: max_thread_pool_size, cap: max_thread_pool_size, init: -1}
51 epoll_fds []int = []int{len: max_thread_pool_size, cap: max_thread_pool_size, init: -1}
52 threads []thread = []thread{len: max_thread_pool_size, cap: max_thread_pool_size}
53 request_handler fn (HttpRequest) !HttpResponse @[required]
54 running &stdatomic.AtomicVal[bool] = stdatomic.new_atomic(false)
55 shutting_down &stdatomic.AtomicVal[bool] = stdatomic.new_atomic(false)
56 stopped &stdatomic.AtomicVal[bool] = stdatomic.new_atomic(true)
57 active_requests &stdatomic.AtomicVal[int] = stdatomic.new_atomic(0)
58}
59
60// new_server creates and initializes a new Server instance.
61pub fn new_server(config ServerConfig) !&Server {
62 if config.max_request_buffer_size <= 0 {
63 return error('max_request_buffer_size must be greater than 0')
64 }
65 mut server := &Server{
66 family: config.family
67 port: config.port
68 max_request_buffer_size: config.max_request_buffer_size
69 timeout_in_seconds: config.timeout_in_seconds
70 user_data: config.user_data
71 request_handler: config.handler
72 running: stdatomic.new_atomic(false)
73 shutting_down: stdatomic.new_atomic(false)
74 stopped: stdatomic.new_atomic(true)
75 active_requests: stdatomic.new_atomic(0)
76 }
77 unsafe {
78 server.listen_fds.flags.set(.noslices | .noshrink | .nogrow)
79 server.epoll_fds.flags.set(.noslices | .noshrink | .nogrow)
80 server.threads.flags.set(.noslices | .noshrink | .nogrow)
81 }
82 return server
83}
84
85fn set_blocking(fd int, blocking bool) {
86 flags := C.fcntl(fd, C.F_GETFL, 0)
87 if flags == -1 {
88 // TODO: better error handling
89 eprintln(@LOCATION)
90 return
91 }
92 if blocking {
93 // This removes the O_NONBLOCK flag from flags and set it.
94 C.fcntl(fd, C.F_SETFL, flags & ~C.O_NONBLOCK)
95 } else {
96 // This adds the O_NONBLOCK flag from flags and set it.
97 C.fcntl(fd, C.F_SETFL, flags | C.O_NONBLOCK)
98 }
99}
100
101fn close_socket(fd int) bool {
102 ret := C.close(fd)
103 if ret == -1 {
104 if C.errno == C.EINTR {
105 // Interrupted by signal, retry is safe
106 return close_socket(fd)
107 }
108 eprintln('ERROR: close(fd=${fd}) failed with errno=${C.errno}')
109 return false
110 }
111 return true
112}
113
114fn create_server_socket(server Server) int {
115 // Create a socket with non-blocking mode
116 server_fd := C.socket(i32(server.family), i32(net.SocketType.tcp), 0)
117 if server_fd < 0 {
118 eprintln(@LOCATION)
119 C.perror(c'Socket creation failed')
120 return -1
121 }
122
123 set_blocking(server_fd, false)
124
125 // Enable SO_REUSEADDR and SO_REUSEPORT
126 opt := 1
127 if C.setsockopt(server_fd, C.SOL_SOCKET, C.SO_REUSEADDR, &opt, sizeof(opt)) < 0 {
128 eprintln(@LOCATION)
129 C.perror(c'setsockopt SO_REUSEADDR failed')
130 close_socket(server_fd)
131 return -1
132 }
133 if C.setsockopt(server_fd, C.SOL_SOCKET, C.SO_REUSEPORT, &opt, sizeof(opt)) < 0 {
134 eprintln(@LOCATION)
135 C.perror(c'setsockopt SO_REUSEPORT failed')
136 close_socket(server_fd)
137 return -1
138 }
139
140 addr := if server.family == .ip6 {
141 net.new_ip6(u16(server.port), [u8(0), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]!)
142 } else {
143 net.new_ip(u16(server.port), [u8(0), 0, 0, 0]!)
144 }
145 alen := addr.len()
146 if C.bind(server_fd, voidptr(&addr), alen) < 0 {
147 eprintln(@LOCATION)
148 C.perror(c'Bind failed')
149 close_socket(server_fd)
150 return -1
151 }
152 if C.listen(server_fd, max_connection_size) < 0 {
153 eprintln(@LOCATION)
154 C.perror(c'Listen failed')
155 close_socket(server_fd)
156 return -1
157 }
158 return server_fd
159}
160
161// add_fd_to_epoll adds a file descriptor to the epoll instance
162fn add_fd_to_epoll(epoll_fd int, fd int, events u32) int {
163 mut ev := C.epoll_event{
164 events: events
165 }
166 ev.data.fd = fd
167 if C.epoll_ctl(epoll_fd, C.EPOLL_CTL_ADD, fd, &ev) == -1 {
168 eprintln(@LOCATION)
169 C.perror(c'epoll_ctl')
170 return -1
171 }
172 return 0
173}
174
175// remove_fd_from_epoll removes a file descriptor from the epoll instance
176fn remove_fd_from_epoll(epoll_fd int, fd int) bool {
177 ret := C.epoll_ctl(epoll_fd, C.EPOLL_CTL_DEL, fd, C.NULL)
178 if ret == -1 {
179 eprintln('ERROR: epoll_ctl(DEL, fd=${fd}) failed with errno=${C.errno}')
180 return false
181 }
182 return true
183}
184
185fn handle_accept_loop(epoll_fd int, listen_fd int, mut client_fds map[int]bool) {
186 for {
187 client_fd := C.accept4(listen_fd, C.NULL, C.NULL, C.SOCK_NONBLOCK)
188 if client_fd < 0 {
189 if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK {
190 break // No more incoming connections; exit loop.
191 }
192 eprintln(@LOCATION)
193 C.perror(c'Accept failed')
194 break
195 }
196 // Enable TCP_NODELAY for lower latency
197 opt := 1
198 C.setsockopt(client_fd, C.IPPROTO_TCP, C.TCP_NODELAY, &opt, sizeof(opt))
199 // Register client socket with epoll
200 if add_fd_to_epoll(epoll_fd, client_fd, u32(C.EPOLLIN | C.EPOLLET)) == -1 {
201 close_socket(client_fd)
202 continue
203 }
204 client_fds[client_fd] = true
205 }
206}
207
208fn handle_client_closure(epoll_fd int, client_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool) {
209 // Never close the listening socket here
210 if client_fd == 0 {
211 return
212 }
213 if client_fd <= 0 {
214 eprintln('ERROR: Invalid FD=${client_fd} for closure')
215 return
216 }
217 client_fds.delete(client_fd)
218 client_buffers.delete(client_fd)
219 client_read_starts.delete(client_fd)
220 closing_client_fds.delete(client_fd)
221 remove_fd_from_epoll(epoll_fd, client_fd)
222 close_socket(client_fd)
223}
224
225fn close_worker_clients(epoll_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool) {
226 for client_fd in client_fds.keys() {
227 handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
228 client_read_starts, mut closing_client_fds)
229 }
230}
231
232fn drain_closing_client(epoll_fd int, client_fd int, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool) {
233 mut drain_buf := []u8{len: 4096}
234 for {
235 bytes_read := C.recv(client_fd, unsafe { &drain_buf[0] }, drain_buf.len, 0)
236 if bytes_read < 0 {
237 if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK {
238 return
239 }
240 handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
241 client_read_starts, mut closing_client_fds)
242 return
243 }
244 if bytes_read == 0 {
245 handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
246 client_read_starts, mut closing_client_fds)
247 return
248 }
249 }
250}
251
252fn send_terminal_response_and_drain(client_fd int, response []u8, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool) {
253 C.send(client_fd, response.data, response.len, C.MSG_NOSIGNAL)
254 net.shutdown(client_fd, how: .write)
255 client_buffers.delete(client_fd)
256 client_read_starts.delete(client_fd)
257 closing_client_fds[client_fd] = true
258}
259
260fn process_request(server &Server, epoll_fd int, client_fd int, request_buffer []u8, mut client_fds map[int]bool, mut client_buffers map[int][]u8, mut client_read_starts map[int]i64, mut closing_client_fds map[int]bool) {
261 mut request_arena := voidptr(unsafe { nil })
262 $if prealloc {
263 request_arena = unsafe { prealloc_scope_begin() }
264 }
265 client_read_starts.delete(client_fd)
266 server.begin_request()
267 defer {
268 server.end_request()
269 }
270 mut decoded_http_request := decode_http_request(request_buffer) or {
271 eprintln('Error decoding request ${err}')
272 C.send(client_fd, tiny_bad_request_response.data, tiny_bad_request_response.len,
273 C.MSG_NOSIGNAL)
274 end_request_arena_current_thread(request_arena)
275 handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
276 client_read_starts, mut closing_client_fds)
277 return
278 }
279 $if trace_prealloc ? {
280 unsafe { prealloc_scope_checkpoint(c'fasthttp decoded request') }
281 }
282 decoded_http_request.client_conn_fd = client_fd
283 decoded_http_request.user_data = server.user_data
284 mut response := server.request_handler(decoded_http_request) or {
285 eprintln('Error handling request ${err}')
286 C.send(client_fd, tiny_bad_request_response.data, tiny_bad_request_response.len,
287 C.MSG_NOSIGNAL)
288 end_request_arena_current_thread(request_arena)
289 handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
290 client_read_starts, mut closing_client_fds)
291 return
292 }
293 $if trace_prealloc ? {
294 unsafe { prealloc_scope_checkpoint(c'fasthttp handler returned') }
295 }
296 response.attach_request_arena_if_empty(request_arena)
297 defer {
298 response.free_owned_content()
299 response.end_request_arena_current_thread()
300 }
301
302 match response.takeover_mode {
303 .manual {
304 // The handler has taken ownership of the connection.
305 // Remove from epoll and tracking, but do NOT close the fd.
306 client_fds.delete(client_fd)
307 client_buffers.delete(client_fd)
308 client_read_starts.delete(client_fd)
309 closing_client_fds.delete(client_fd)
310 remove_fd_from_epoll(epoll_fd, client_fd)
311 response.abandon_request_arena_current_thread()
312 return
313 }
314 .reusable {
315 set_blocking(client_fd, false)
316 client_buffers.delete(client_fd)
317 if server.is_shutting_down() || response.should_close {
318 handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
319 client_read_starts, mut closing_client_fds)
320 }
321 return
322 }
323 .none {}
324 }
325
326 if response.content.len > 0 {
327 mut send_error := false
328 mut pos := 0
329 for pos < response.content.len {
330 sent := C.send(client_fd, unsafe { &response.content[pos] },
331 response.content.len - pos, C.MSG_NOSIGNAL)
332 if sent <= 0 {
333 eprintln('ERROR: send() failed with errno=${C.errno}')
334 send_error = true
335 break
336 }
337 pos += sent
338 }
339 if send_error {
340 handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
341 client_read_starts, mut closing_client_fds)
342 return
343 }
344 }
345
346 if response.file_path != '' {
347 mut fd := C.open(response.file_path.str, C.O_RDONLY, 0)
348 if fd == -1 {
349 eprintln('ERROR: open file failed')
350 handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
351 client_read_starts, mut closing_client_fds)
352 return
353 }
354 defer {
355 if fd != -1 {
356 C.close(fd)
357 }
358 }
359 mut st := C.stat{}
360 if C.fstat(fd, &st) != 0 {
361 eprintln('ERROR: fstat failed')
362 handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
363 client_read_starts, mut closing_client_fds)
364 return
365 }
366 mut offset := i64(0)
367 mut remaining := i64(st.st_size)
368 mut sf_retries := 0
369 for remaining > 0 {
370 ssize := C.sendfile(client_fd, fd, &offset, usize(remaining))
371 if ssize > 0 {
372 remaining -= i64(ssize)
373 sf_retries = 0
374 continue
375 }
376 errno_val := C.errno
377 match errno_val {
378 C.EAGAIN, C.EWOULDBLOCK, C.EINTR {
379 if sf_retries < 3 {
380 sf_retries++
381 continue
382 }
383 eprintln('ERROR: sendfile() transient failure after ${sf_retries} retries (errno=${errno_val})')
384 }
385 C.EBADF {
386 eprintln('ERROR: sendfile() EBADF: input fd or socket not open for required access (errno=${errno_val})')
387 }
388 C.EFAULT {
389 eprintln('ERROR: sendfile() EFAULT: bad address for offset (errno=${errno_val})')
390 }
391 C.EINVAL {
392 eprintln('ERROR: sendfile() EINVAL: invalid descriptor state or non-seekable input (errno=${errno_val})')
393 }
394 C.EIO {
395 eprintln('ERROR: sendfile() EIO: I/O error while reading input file (errno=${errno_val})')
396 }
397 C.ENOMEM {
398 eprintln('ERROR: sendfile() ENOMEM: insufficient kernel memory (errno=${errno_val})')
399 }
400 C.EOVERFLOW {
401 eprintln('ERROR: sendfile() EOVERFLOW: count exceeds file/socket limits (errno=${errno_val})')
402 }
403 C.ESPIPE {
404 eprintln('ERROR: sendfile() ESPIPE: input file not seekable with offset (errno=${errno_val})')
405 }
406 else {
407 eprintln('ERROR: sendfile() failed with errno=${errno_val}')
408 }
409 }
410
411 handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
412 client_read_starts, mut closing_client_fds)
413 return
414 }
415 }
416
417 client_buffers.delete(client_fd)
418 if server.is_shutting_down() || response.should_close {
419 handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
420 client_read_starts, mut closing_client_fds)
421 }
422}
423
424fn process_events(server &Server, epoll_fd int, listen_fd int) {
425 mut events := [max_connection_size]C.epoll_event{}
426 mut request_buffer := []u8{len: server.max_request_buffer_size, cap: server.max_request_buffer_size}
427 mut client_fds := map[int]bool{}
428 mut client_buffers := map[int][]u8{}
429 mut client_read_starts := map[int]i64{}
430 mut closing_client_fds := map[int]bool{}
431 unsafe {
432 request_buffer.flags.set(.noslices | .nogrow | .noshrink)
433 }
434 for {
435 if server.is_shutting_down() && server.active_request_count() == 0 {
436 close_worker_clients(epoll_fd, mut client_fds, mut client_buffers, mut
437 client_read_starts, mut closing_client_fds)
438 return
439 }
440 num_events := C.epoll_wait(epoll_fd, &events[0], max_connection_size, epoll_wait_timeout_ms)
441 if num_events < 0 {
442 if C.errno == C.EINTR {
443 continue
444 }
445 if server.is_shutting_down() {
446 continue
447 }
448 eprintln('ERROR: epoll_wait() failed with errno=${C.errno}')
449 continue
450 }
451 for i := 0; i < num_events; i++ {
452 client_fd := unsafe { events[i].data.fd }
453 // Accept new connections when the listening socket is readable
454 if client_fd == listen_fd {
455 if server.is_shutting_down() {
456 continue
457 }
458 handle_accept_loop(epoll_fd, listen_fd, mut client_fds)
459 continue
460 }
461
462 if events[i].events & u32((C.EPOLLHUP | C.EPOLLERR)) != 0 {
463 if client_fd == listen_fd {
464 eprintln('ERROR: listen fd had HUP/ERR')
465 continue
466 }
467 if client_fd > 0 {
468 // Try to send 444 No Response before closing abnormal connection
469 C.send(client_fd, status_444_response.data, status_444_response.len,
470 C.MSG_NOSIGNAL)
471 handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
472 client_read_starts, mut closing_client_fds)
473 } else {
474 eprintln('ERROR: Invalid FD from epoll: ${client_fd}')
475 }
476 continue
477 }
478 if events[i].events & u32(C.EPOLLIN) != 0 {
479 if server.is_shutting_down() {
480 handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
481 client_read_starts, mut closing_client_fds)
482 continue
483 }
484 if closing_client_fds[client_fd] or { false } {
485 drain_closing_client(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
486 client_read_starts, mut closing_client_fds)
487 continue
488 }
489 // Read all available data from the socket
490 mut total_bytes_read := 0
491 mut readed_request_buffer := client_buffers[client_fd] or {
492 []u8{cap: server.max_request_buffer_size}
493 }
494 mut headers_complete := false
495 mut header_too_large := false
496 mut header_end_pos := -1
497 mut request_complete := false
498 mut peer_closed := false
499 mut recv_error := false
500
501 for {
502 bytes_read := C.recv(client_fd, unsafe { &request_buffer[0] },
503 server.max_request_buffer_size - 1, 0)
504 if bytes_read < 0 {
505 if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK {
506 // No more data available right now
507 break
508 }
509 // Error occurred
510 eprintln('ERROR: recv() failed with errno=${C.errno}')
511 recv_error = true
512 break
513 } else if bytes_read == 0 {
514 // Connection closed by client
515 peer_closed = true
516 break
517 }
518
519 unsafe {
520 readed_request_buffer.push_many(&request_buffer[0], bytes_read)
521 }
522 total_bytes_read += bytes_read
523 if client_fd !in client_read_starts {
524 client_read_starts[client_fd] = time.sys_mono_now()
525 }
526
527 // Enforce the configured limit on request headers, not on the whole body.
528 buffer_len := readed_request_buffer.len
529 if !headers_complete && buffer_len >= 4 {
530 header_end_pos = find_header_end_in_buf(readed_request_buffer.data,
531 buffer_len)
532 if header_end_pos == -1 {
533 if buffer_len >= server.max_request_buffer_size {
534 header_too_large = true
535 break
536 }
537 } else {
538 headers_complete = true
539 if header_end_pos > server.max_request_buffer_size {
540 header_too_large = true
541 break
542 }
543 }
544 }
545
546 if headers_complete && has_complete_body(readed_request_buffer.data, buffer_len) {
547 request_complete = true
548 break
549 }
550 }
551
552 if header_too_large {
553 send_terminal_response_and_drain(client_fd, status_413_response, mut
554 client_buffers, mut client_read_starts, mut closing_client_fds)
555 continue
556 }
557 if request_complete {
558 process_request(server, epoll_fd, client_fd, readed_request_buffer, mut
559 client_fds, mut client_buffers, mut client_read_starts, mut
560 closing_client_fds)
561 } else if recv_error {
562 // Unexpected recv error - send 444 No Response
563 C.send(client_fd, status_444_response.data, status_444_response.len,
564 C.MSG_NOSIGNAL)
565 handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
566 client_read_starts, mut closing_client_fds)
567 } else if peer_closed || (total_bytes_read == 0 && readed_request_buffer.len == 0) {
568 // Normal client closure (FIN received)
569 handle_client_closure(epoll_fd, client_fd, mut client_fds, mut client_buffers, mut
570 client_read_starts, mut closing_client_fds)
571 } else if readed_request_buffer.len > 0 {
572 client_buffers[client_fd] = readed_request_buffer
573 }
574 }
575 }
576 if server.timeout_in_seconds > 0 {
577 now := time.sys_mono_now()
578 timeout_ns := i64(server.timeout_in_seconds) * 1_000_000_000
579 for client_fd in client_read_starts.keys() {
580 started := client_read_starts[client_fd] or { continue }
581 if now - started >= timeout_ns {
582 send_terminal_response_and_drain(client_fd, status_408_response, mut
583 client_buffers, mut client_read_starts, mut closing_client_fds)
584 }
585 }
586 }
587 }
588}
589
590fn (mut server Server) stop_accepting() {
591 for i := 0; i < max_thread_pool_size; i++ {
592 if server.listen_fds[i] < 0 {
593 continue
594 }
595 if server.epoll_fds[i] >= 0 {
596 remove_fd_from_epoll(server.epoll_fds[i], server.listen_fds[i])
597 }
598 close_socket(server.listen_fds[i])
599 server.listen_fds[i] = -1
600 }
601}
602
603// run starts the server and begins listening for incoming connections.
604pub fn (mut server Server) run() ! {
605 $if windows {
606 eprintln('Windows is not supported yet')
607 return
608 }
609 for i := 0; i < max_thread_pool_size; i++ {
610 server.listen_fds[i] = create_server_socket(server)
611 if server.listen_fds[i] < 0 {
612 return
613 }
614
615 server.epoll_fds[i] = C.epoll_create1(0)
616 if server.epoll_fds[i] < 0 {
617 C.perror(c'epoll_create1 failed')
618 close_socket(server.listen_fds[i])
619 return
620 }
621
622 // Register the listening socket with each worker epoll for distributed accepts (edge-triggered)
623 if add_fd_to_epoll(server.epoll_fds[i], server.listen_fds[i], u32(C.EPOLLIN | C.EPOLLET)) == -1 {
624 close_socket(server.listen_fds[i])
625 close_socket(server.epoll_fds[i])
626 return
627 }
628
629 server.threads[i] = spawn process_events(server, server.epoll_fds[i], server.listen_fds[i])
630 }
631
632 server.mark_running()
633 println('listening on http://0.0.0.0:${server.port}/')
634 // Main thread waits for workers; accepts are handled in worker epoll loops
635 for i in 0 .. max_thread_pool_size {
636 server.threads[i].wait()
637 if server.epoll_fds[i] >= 0 {
638 close_socket(server.epoll_fds[i])
639 server.epoll_fds[i] = -1
640 }
641 }
642 server.mark_stopped()
643}
644