v / vlib / picoev / picoev.v
366 lines · 339 sloc · 10.06 KB · 491b27ebc36a48e39005748c30a0dcda61e3f904
Raw
1module picoev
2
3import net
4import pico_http_parser
5import time
6
// max_queue is the maximum size of the event queue.
pub const max_queue = 4096

// picoev_read is the event flag for incoming data that is ready to be read on a socket.
pub const picoev_read = 1

// picoev_write is the event flag for a socket that is ready for writing.
pub const picoev_write = 2

// picoev_timeout is the event flag passed to a callback when a timeout has occurred.
pub const picoev_timeout = 4

// picoev_add is the flag used when adding a file descriptor to the event loop.
pub const picoev_add = 0x40000000

// picoev_del is the flag used when removing a file descriptor from the event loop.
pub const picoev_del = 0x20000000

// picoev_readwrite is the combination of the read and the write event flags.
pub const picoev_readwrite = picoev_read | picoev_write
27
// Target holds everything that is associated with a single file descriptor (connection)
// while it is tracked by the event loop.
pub struct Target {
pub mut:
	// the file descriptor this entry belongs to; reset to 0 when the entry is deleted
	fd int
	// id of the loop the descriptor is registered with; -1 while it is not registered
	loop_id int = -1
	// event mask of the descriptor; cleared to 0 by `add` before the events are registered
	events u32
	// callback invoked as cb(fd, events, context) when an event or a timeout occurs
	cb fn (int, int, voidptr) = unsafe { nil }
	// used internally by the kqueue implementation
	backend int
}
38
// Config carries the server settings and the callbacks that `new` copies into a Picoev instance.
pub struct Config {
pub:
	// NOTE(review): port, family and host are presumably consumed by `listen` - confirm there
	port int = 8080
	// request handler, called once a request has been parsed successfully
	cb fn (voidptr, pico_http_parser.Request, mut pico_http_parser.Response) = unsafe { nil }
	// error handler, called with the error that interrupted the handling of a request
	err_cb fn (voidptr, pico_http_parser.Request, mut pico_http_parser.Response, IError) = default_error_callback
	// when set, it receives every event of a connection instead of the built-in HTTP handling
	raw_cb fn (mut Picoev, int, int) = unsafe { nil }
	// opaque pointer that is handed to `cb` and `err_cb` as their first argument
	user_data voidptr = unsafe { nil }
	// timeout in seconds that is (re)armed for a connection on accept, read and write events
	timeout_secs int = 8
	max_headers  int = 100
	// size in bytes of the request buffer slice reserved for each file descriptor
	max_read int = 4096
	// size in bytes of the response buffer slice reserved for each file descriptor
	max_write int = 8192
	family    net.AddrFamily = .ip6
	host      string
}
54
// Picoev is the central structure that manages the event loop and its connections.
// It bundles the loop, the file descriptor table, the timeouts, the I/O buffers and
// the settings copied from `Config`.
@[heap]
pub struct Picoev {
	// request handler, called once a request has been parsed successfully
	cb fn (voidptr, pico_http_parser.Request, mut pico_http_parser.Response) = unsafe { nil }
	// error handler, called when accepting, reading or parsing a request fails
	error_callback fn (voidptr, pico_http_parser.Request, mut pico_http_parser.Response, IError) = default_error_callback
	// when set, it receives every event of a connection instead of the built-in HTTP handling
	raw_callback fn (mut Picoev, int, int) = unsafe { nil }

	// timeout in seconds that is (re)armed for a connection on accept, read and write events
	timeout_secs int
	max_headers  int = 100
	// size in bytes of the request buffer slice of each file descriptor inside `buf`
	max_read int = 4096
	// size in bytes of the response buffer slice of each file descriptor inside `out`
	max_write int = 8192
mut:
	loop             &LoopType = unsafe { nil }
	file_descriptors [4096]&Target // TODO: use max_fds here, instead of the hardcoded size, when the compiler allows it
	// maps a file descriptor to the loop time at which it times out
	timeouts  map[int]i64
	num_loops int

	// request buffers of all file descriptors; only allocated when no raw callback is set
	buf &u8 = unsafe { nil }
	// per file descriptor: number of request bytes that were already read into `buf`
	idx [max_fds]int
	// response buffers of all file descriptors; only allocated when no raw callback is set
	out &u8 = unsafe { nil }

	// preformatted HTTP date string, refreshed once per second by `update_date_string`
	date string
pub:
	// opaque pointer that is handed to `cb` and `error_callback` as their first argument
	user_data voidptr = unsafe { nil }
}
81
// init resets the loop counter and puts a fresh, unregistered `Target`
// into each of the first `max_fds` slots of the `file_descriptors` table.
pub fn (mut pv Picoev) init() {
	// assert max_fds > 0
	for slot in 0 .. max_fds {
		pv.file_descriptors[slot] = &Target{}
	}
	pv.num_loops = 0
}
90
// add registers the file descriptor `fd` with the event loop for the given `events`,
// using `callback` as its event handler and `timeout` (in seconds, 0 for none) as its timeout.
// It returns 0 on success and -1 when the arguments are invalid or the registration failed.
@[direct_array_access]
pub fn (mut pv Picoev) add(fd int, events int, timeout int, callback voidptr) int {
	if pv == unsafe { nil } {
		return -1 // Invalid receiver
	}
	if fd < 0 || fd >= max_fds {
		return -1 // Invalid fd
	}
	mut entry := pv.file_descriptors[fd]
	entry.fd = fd
	entry.cb = callback
	entry.loop_id = pv.loop.id
	entry.events = 0
	if pv.update_events(fd, events | picoev_add) == 0 {
		pv.set_timeout(fd, timeout)
		return 0
	}
	// the registration failed, so undo the bookkeeping that was done above
	if pv.delete(fd) != 0 {
		elog('Error during del')
	}
	return -1
}
111
// delete unregisters the file descriptor `fd` from the event loop, drops its timeout
// and clears its `Target` entry. It returns 0 on success and -1 on failure.
@[direct_array_access]
pub fn (mut pv Picoev) delete(fd int) int {
	if !(fd >= 0 && fd < max_fds) {
		return -1 // Invalid fd
	}
	trace_fd('remove ${fd}')
	if pv.update_events(fd, picoev_del) != 0 {
		elog('Error during update_events. event: `picoev.picoev_del`')
		return -1
	}
	pv.set_timeout(fd, 0)
	mut entry := pv.file_descriptors[fd]
	entry.loop_id = -1
	entry.fd = 0
	// Clear callback to prevent accidental invocations
	entry.cb = unsafe { nil }
	return 0
}
130
// loop_once runs a single iteration of the event loop: it polls for events, waiting at most
// `max_wait_in_sec` seconds, and afterwards processes the expired timeouts.
// It returns 0 on success (including an interrupted poll) and -1 when polling failed.
fn (mut pv Picoev) loop_once(max_wait_in_sec int) int {
	pv.loop.now = get_time()
	poll_failed := pv.poll_once(max_wait_in_sec) != 0
	if poll_failed {
		$if !windows {
			// Signal-driven wakeups are transient. The caller should keep serving,
			// instead of spinning on a logged "error" until the signal source stops.
			if C.errno == net.error_eintr {
				return 0
			}
		}
		elog('Error during poll_once')
		return -1
	}
	// Without waiting, the timeout handling is skipped as a potential performance optimization.
	if max_wait_in_sec != 0 {
		// time may have passed while waiting, so refresh the loop time first
		pv.loop.now = get_time()
		pv.handle_timeout()
	}
	return 0
}
153
// set_timeout arms a timeout of `secs` seconds, counted from the current loop time, for the
// file descriptor `fd`. Passing 0 removes the timeout instead. When a timeout expires, the
// callback of the file descriptor's target is called with a timeout event.
@[direct_array_access; inline]
fn (mut pv Picoev) set_timeout(fd int, secs int) {
	assert fd < max_fds
	if secs != 0 {
		pv.timeouts[fd] = pv.loop.now + secs
		return
	}
	pv.timeouts.delete(fd)
}
165
// handle_timeout collects every file descriptor whose timeout has expired, removes its
// timeout entry and calls the callback of its target with a timeout event.
@[direct_array_access; inline]
fn (mut pv Picoev) handle_timeout() {
	// collect first, since the timeouts map is modified while the callbacks run
	mut expired := []int{}
	for fd, deadline in pv.timeouts {
		if deadline > pv.loop.now {
			continue
		}
		expired << fd
	}
	for fd in expired {
		entry := pv.file_descriptors[fd]
		assert entry.loop_id == pv.loop.id
		pv.timeouts.delete(fd)
		unsafe { entry.cb(fd, picoev_timeout, &pv) }
	}
}
184
// accept_callback accepts a new connection from `listen_fd` and adds it to the event loop.
// `cb_arg` must point to the `Picoev` instance that owns the listening socket.
// The accepted socket is closed again when it can not be served: when its descriptor does
// not fit into the descriptor table, when `setup_sock` fails, or when it can not be
// registered with the event loop.
fn accept_callback(listen_fd int, _events int, cb_arg voidptr) {
	mut pv := unsafe { &Picoev(cb_arg) }
	accepted_fd := accept(listen_fd)
	if accepted_fd == -1 {
		if fatal_socket_error(accepted_fd) == false {
			return
		}
		elog('Error during accept')
		return
	}
	if accepted_fd >= max_fds {
		// should never happen
		elog('Error during accept, accepted_fd >= max_fd')
		close_socket(accepted_fd)
		return
	}
	trace_fd('accept ${accepted_fd}')
	setup_sock(accepted_fd) or {
		elog('setup_sock failed, fd: ${accepted_fd}, listen_fd: ${listen_fd}, err: ${err.code()}')
		pv.error_callback(pv.user_data, pico_http_parser.Request{}, mut
			&pico_http_parser.Response{}, err)
		close_socket(accepted_fd) // Close fd on failure
		return
	}
	if pv.add(accepted_fd, picoev_read, pv.timeout_secs, raw_callback) != 0 {
		// `add` already rolled back its own bookkeeping; the socket itself is still open
		// and would leak, since nothing else references it.
		elog('Error during add, fd: ${accepted_fd}')
		close_socket(accepted_fd)
	}
}
212
// close_conn removes the socket `fd` from the event loop and closes it.
// The socket is closed even when removing it from the loop fails.
@[inline]
pub fn (mut pv Picoev) close_conn(fd int) {
	removed := pv.delete(fd)
	if removed != 0 {
		elog('Error during del')
	}
	close_socket(fd)
}
221
// raw_callback handles the raw events (read, write, timeout) of the file descriptor `fd`.
// `context` must point to the owning `Picoev` instance.
// When the instance has a user supplied raw callback, every event is forwarded to it.
// Otherwise a timeout closes the connection, and a read event reads and parses an HTTP
// request from the socket and passes it to the request callback.
// The read offset of `fd` is reset to 0 on every exit path.
@[direct_array_access]
fn raw_callback(fd int, events int, context voidptr) {
	mut pv := unsafe { &Picoev(context) }
	defer {
		pv.idx[fd] = 0
	}
	if events & picoev_timeout != 0 {
		trace_fd('timeout ${fd}')
		if !isnil(pv.raw_callback) {
			pv.raw_callback(mut pv, fd, events)
			return
		}
		pv.close_conn(fd)
		return
	} else if events & picoev_read != 0 {
		pv.set_timeout(fd, pv.timeout_secs)
		if !isnil(pv.raw_callback) {
			pv.raw_callback(mut pv, fd, events)
			return
		}
		// every file descriptor owns a slice of `max_read` bytes inside the shared request buffer
		mut request_buffer := pv.buf
		unsafe {
			request_buffer += fd * pv.max_read // pointer magic
		}
		mut req := pico_http_parser.Request{}
		// Response init: every file descriptor owns a slice of `max_write` bytes inside
		// the shared response buffer
		mut response_buffer := pv.out
		unsafe {
			response_buffer += fd * pv.max_write // pointer magic
		}
		mut res := pico_http_parser.Response{
			fd:        fd
			buf_start: response_buffer
			buf:       response_buffer
			date:      pv.date.str
		}
		for {
			// Request parsing loop
			r := req_read(fd, request_buffer, pv.max_read, pv.idx[fd]) // Get data from socket
			if r == 0 {
				// connection closed by peer
				pv.close_conn(fd)
				return
			} else if r == -1 {
				if fatal_socket_error(fd) == false {
					return
				}
				elog('Error during req_read')
				// fatal error
				pv.close_conn(fd)
				return
			}
			pv.idx[fd] += r
			mut s := unsafe { tos(request_buffer, pv.idx[fd]) }
			pret := req.parse_request(s) or {
				// Parse error
				pv.error_callback(pv.user_data, req, mut &res, err)
				return
			}
			if pret > 0 { // Success
				break
			}
			assert pret == -2
			// The request is incomplete, continue the loop, unless the buffer slice of
			// this connection is already full. The limit is the slice size `max_read`;
			// `sizeof(request_buffer)` would only be the size of the pointer itself.
			if pv.idx[fd] >= pv.max_read {
				pv.error_callback(pv.user_data, req, mut &res, error('RequestIsTooLongError'))
				return
			}
		}
		// Callback (should call .end() itself)
		pv.cb(pv.user_data, req, mut &res)
	} else if events & picoev_write != 0 {
		pv.set_timeout(fd, pv.timeout_secs)
		if !isnil(pv.raw_callback) {
			pv.raw_callback(mut pv, fd, events)
			return
		}
	}
}
302
// default_error_callback is the error handler that is used when `Config.err_cb` is not set.
// It logs the error and then ends the response.
fn default_error_callback(_data voidptr, _req pico_http_parser.Request, mut res pico_http_parser.Response, error IError) {
	message := 'picoev: ${error}'
	elog(message)
	res.end()
}
307
// new creates a `Picoev` struct, initializes the main loop and registers the listening
// socket with it.
// It returns an error when listening fails, when the event loop can not be created, or
// when the listening socket can not be registered with the loop. In the last two cases
// the listening socket is closed before returning.
pub fn new(config Config) !&Picoev {
	listening_socket_fd := listen(config) or {
		elog('Error during listen: ${err}')
		return err
	}
	mut pv := &Picoev{
		num_loops:      1
		cb:             config.cb
		error_callback: config.err_cb
		raw_callback:   config.raw_cb
		user_data:      config.user_data
		timeout_secs:   config.timeout_secs
		max_headers:    config.max_headers
		max_read:       config.max_read
		max_write:      config.max_write
	}
	// the shared request/response buffers are only used by the built-in HTTP handling
	if isnil(pv.raw_callback) {
		pv.buf = unsafe { malloc_noscan(max_fds * config.max_read + 1) }
		pv.out = unsafe { malloc_noscan(max_fds * config.max_write + 1) }
	}
	// epoll on linux
	// kqueue on macos and bsd
	// select on windows and others
	$if linux || termux {
		pv.loop = create_epoll_loop(0) or {
			elog('Failed to create loop')
			close_socket(listening_socket_fd)
			return err
		}
	} $else $if freebsd || macos || openbsd {
		pv.loop = create_kqueue_loop(0) or {
			elog('Failed to create loop')
			close_socket(listening_socket_fd)
			return err
		}
	} $else {
		pv.loop = create_select_loop(0) or {
			elog('Failed to create loop')
			close_socket(listening_socket_fd)
			return err
		}
	}
	if pv.loop == unsafe { nil } {
		elog('Failed to create loop')
		close_socket(listening_socket_fd)
		// a nil instance must never be handed out as a successful result
		return error('picoev: failed to create the event loop')
	}
	pv.init()
	if pv.add(listening_socket_fd, picoev_read, 0, accept_callback) != 0 {
		// without the listening socket in the loop, the server could never accept a connection
		elog('Error during add of the listening socket')
		close_socket(listening_socket_fd)
		return error('picoev: failed to add the listening socket ${listening_socket_fd} to the event loop')
	}
	return pv
}
348
// serve spawns the updater of the HTTP date string and then runs the event loop forever,
// accepting and serving connections. It does not return.
// See also picoev.new().
pub fn (mut pv Picoev) serve() {
	spawn update_date_string(mut pv)
	// wait at most one second per iteration, so that timeouts are checked regularly
	max_wait_in_sec := 1
	for {
		pv.loop_once(max_wait_in_sec)
	}
}
357
// update_date_string refreshes the `date` field of the Picoev instance once per second.
// The field holds the current GMT (UTC) time, formatted for the HTTP Date header.
// It loops forever and is meant to run in its own thread.
fn update_date_string(mut pv Picoev) {
	for {
		pv.date = time.utc().http_header_string()
		time.sleep(time.second)
	}
}
367