From 15deb59ce90606ba96ae987c62fbe3844e4f3d18 Mon Sep 17 00:00:00 2001 From: Alexander Medvednikov Date: Sat, 25 Oct 2025 02:08:13 +0300 Subject: [PATCH] veb: new fasthttp module with -d new_veb (multithreaded, epoll/kqueue): makes veb multithreaded and fixes frequent hangs --- examples/veb/header.html | 4 +- examples/veb/veb_example.v | 4 +- vlib/fasthttp/fasthttp.v | 513 ++++++++++++++++ vlib/fasthttp/fasthttp_test.v | 55 ++ vlib/net/address_darwin.c.v | 2 +- vlib/net/http/request.v | 76 ++- vlib/net/http/version.v | 1 + vlib/v/checker/checker.v | 2 +- vlib/vanilla_http_server/CONTRIBUTING.md | 32 - vlib/vanilla_http_server/README.md | 74 --- .../http_server/http_server.c.v | 319 ---------- .../request_parser/request_parser.v | 120 ---- .../request_parser/request_parser_test.v | 68 --- vlib/vanilla_http_server/x.v | 3 - vlib/veb/context.v | 13 +- vlib/veb/veb.v | 549 ++---------------- vlib/veb/veb_d_new_veb.v | 161 +++++ vlib/veb/veb_picoev.v | 499 ++++++++++++++++ 18 files changed, 1357 insertions(+), 1138 deletions(-) create mode 100644 vlib/fasthttp/fasthttp.v create mode 100644 vlib/fasthttp/fasthttp_test.v delete mode 100644 vlib/vanilla_http_server/CONTRIBUTING.md delete mode 100644 vlib/vanilla_http_server/README.md delete mode 100644 vlib/vanilla_http_server/http_server/http_server.c.v delete mode 100644 vlib/vanilla_http_server/request_parser/request_parser.v delete mode 100644 vlib/vanilla_http_server/request_parser/request_parser_test.v delete mode 100644 vlib/vanilla_http_server/x.v create mode 100644 vlib/veb/veb_d_new_veb.v create mode 100644 vlib/veb/veb_picoev.v diff --git a/examples/veb/header.html b/examples/veb/header.html index 0a5c6ae15..a2245c8ab 100644 --- a/examples/veb/header.html +++ b/examples/veb/header.html @@ -2,7 +2,7 @@ -vweb example page +veb example page -header

+header from included header.html

diff --git a/examples/veb/veb_example.v b/examples/veb/veb_example.v index 5f72d7fdd..892f44389 100644 --- a/examples/veb/veb_example.v +++ b/examples/veb/veb_example.v @@ -35,6 +35,7 @@ pub fn (mut app App) user_endpoint(mut ctx Context, user string) veb.Result { } pub fn (mut app App) index() veb.Result { + println('veb_example.v index()') mut c := 0 lock app.state { app.state.cnt++ @@ -70,8 +71,9 @@ pub fn (mut app App) post(mut ctx Context) veb.Result { fn main() { println('veb example') - // veb.run(&App{}, port) mut app := &App{} + // veb.run(&App{}, port) + veb.run_at[App, Context](mut app, port: port, family: .ip, timeout_in_seconds: 2) or { panic(err) } diff --git a/vlib/fasthttp/fasthttp.v b/vlib/fasthttp/fasthttp.v new file mode 100644 index 000000000..c9360f649 --- /dev/null +++ b/vlib/fasthttp/fasthttp.v @@ -0,0 +1,513 @@ +// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +module fasthttp + +import os +import time +import term +import net + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +fn C.setsockopt(sockfd int, level int, optname int, optval voidptr, optlen u32) int +fn C.bind(sockfd int, addr voidptr, addrlen u32) int +fn C.listen(sockfd int, backlog int) int +fn C.accept(sockfd int, addr voidptr, addrlen voidptr) int +fn C.fcntl(fd int, cmd int, arg int) int +fn C.kqueue() int +fn C.kevent(kq int, changelist &C.kevent, nchanges int, eventlist &C.kevent, nevents int, timeout &C.timespec) int +fn C.pipe(pipefd &int) int +fn C.close(fd int) int +fn C.read(fd int, buf voidptr, count int) int +fn C.write(fd int, buf voidptr, count int) int +fn C.malloc(size int) &u8 +fn C.free(ptr voidptr) +fn C.memset(dest voidptr, ch int, count int) voidptr +fn C.memcmp(s1 voidptr, s2 voidptr, n int) int +fn C.memmem(haystack voidptr, haystacklen int, needle voidptr, needlelen int) voidptr +fn C.strchr(s &u8, c int) &u8 +fn C.perror(s &char) +fn C.pthread_create(thread &C.pthread_t, attr voidptr, start_routine fn (voidptr) voidptr, arg voidptr) int +fn C.pthread_mutex_init(mutex &C.pthread_mutex_t, attr voidptr) int +fn C.pthread_mutex_lock(mutex &C.pthread_mutex_t) int +fn C.pthread_mutex_unlock(mutex &C.pthread_mutex_t) int +fn C.pthread_cond_init(cond &C.pthread_cond_t, attr voidptr) int +fn C.pthread_cond_wait(cond &C.pthread_cond_t, mutex &C.pthread_mutex_t) int +fn C.pthread_cond_signal(cond &C.pthread_cond_t) int +fn C.htons(__hostshort u16) u16 + +struct C.kevent { + ident u64 + filter i16 + flags u16 + fflags u32 + data isize + udata voidptr +} + +struct C.sockaddr_in { +mut: + sin_len u8 + sin_family u8 + sin_port u16 + sin_addr u32 + sin_zero [8]char +} + +const backlog = 128 +const buf_size = 8_000 +const num_threads = 8 + +// Slice represents a part of a larger buffer, without owning the memory. +// It's useful for representing parts of the request buffer like the method and path. +pub struct Slice { +pub mut: + buf &u8 = unsafe { nil } + len int +} + +// str returns the V string representation of the slice. +pub fn (s Slice) str() string { + // return unsafe { string(s.buf, s.len) } + return unsafe { s.buf.vstring_with_len(s.len) } +} + +// HttpRequest represents a parsed HTTP request. The slices point to memory +// within the connection's buffer and are only valid for the duration of the request. +pub struct HttpRequest { +pub mut: + buffer []u8 // A V slice of the read buffer for convenience + method Slice + path Slice + version Slice + client_conn_fd int +} + +// Internal struct to hold connection-specific data +struct Conn { +mut: + fd int + read_buf [buf_size]u8 + read_len int + write_buf voidptr + write_len int + write_pos int +} + +// Task for the worker thread pool +struct Task { +mut: + c &Conn = unsafe { nil } + req HttpRequest // Pass the parsed request to the worker + next &Task = unsafe { nil } + // req_buffer []u8 // The worker will own this copied buffer +} + +// Completed task data +struct Done { +mut: + c &Conn + resp voidptr + len int + next &Done +} + +// Shared data for worker threads +struct WorkerData { +mut: + task_mutex C.pthread_mutex_t + task_cond C.pthread_cond_t + task_head &Task = unsafe { nil } + task_tail &Task = unsafe { nil } + done_mutex C.pthread_mutex_t + done_head &Done = unsafe { nil } + done_tail &Done = unsafe { nil } + quit bool + wake_pipe [2]int +} + +// Server holds the entire state of the web server instance. +pub struct Server { +pub mut: + port int + socket_fd int + kq int + request_handler fn (req HttpRequest) ![]u8 = unsafe { nil } + worker_data WorkerData + threads [num_threads]C.pthread_t +} + +// new_server creates and initializes a new Server instance. +pub fn new_server(port int, handler fn (req HttpRequest) ![]u8) !&Server { + mut s := &Server{ + port: port + request_handler: handler + // worker_data: + } + return s +} + +// Helper to set fields of a kevent struct, replacing the C macro EV_SET +fn ev_set(mut ev C.kevent, ident u64, filter i16, flags u16, fflags u32, data isize, udata voidptr) { + ev.ident = ident + ev.filter = filter + ev.flags = flags + ev.fflags = fflags + ev.data = data + ev.udata = udata +} + +fn (mut s Server) close_conn(c &Conn) { + if c.write_buf != unsafe { nil } { + C.free(c.write_buf) + } + C.close(c.fd) + unsafe { C.free(c) } +} + +// worker_func is the function executed by each worker thread. +// It processes tasks from the queue, calls the request handler, +// and puts the result in the 'done' queue. +fn worker_func(arg voidptr) voidptr { + mut s := unsafe { &Server(arg) } + for { + C.pthread_mutex_lock(&s.worker_data.task_mutex) + for s.worker_data.task_head == unsafe { nil } && !s.worker_data.quit { + C.pthread_cond_wait(&s.worker_data.task_cond, &s.worker_data.task_mutex) + } + if s.worker_data.quit && s.worker_data.task_head == unsafe { nil } { + C.pthread_mutex_unlock(&s.worker_data.task_mutex) + break + } + mut t := s.worker_data.task_head + s.worker_data.task_head = t.next + if s.worker_data.task_head == unsafe { nil } { + s.worker_data.task_tail = unsafe { nil } + } + C.pthread_mutex_unlock(&s.worker_data.task_mutex) + // Call the user-provided request handler to get the response body + mut body := s.request_handler(t.req) or { + // On handler error, create a 500 response + // eprintln('Request handler failed: ${err}') + panic('Request handler failed: ${err}') + //[]u8('

Internal Server Error

') + } + // println('body.len=${body.len} body=${body.bytestr()}') + // println('=============') + // println('ALL') + body_with_headers := body.bytestr() + // println(body_with_headers) + // println('============') + // println('body') + // body = (body.bytestr().all_after('Server: veb').trim_space()).bytes() + // println(body) + // println('============') + // Copy the body with headers to done.resp + resp := C.malloc(buf_size) + C.snprintf(resp, buf_size, c'%s', body_with_headers.str) + len := body_with_headers.len + // println('GGGG len=${len} body.len=${body.len} full body len = ${body_with_headers.len}') + // Enqueue done + mut d := unsafe { &Done(C.malloc(sizeof(Done))) } + d.c = t.c + d.resp = resp + d.len = int(len) + d.next = unsafe { nil } + C.pthread_mutex_lock(&s.worker_data.done_mutex) + if s.worker_data.done_tail != unsafe { nil } { + s.worker_data.done_tail.next = d + } else { + s.worker_data.done_head = d + } + s.worker_data.done_tail = d + C.pthread_mutex_unlock(&s.worker_data.done_mutex) + // Wake IO thread + x := u8(`x`) + C.write(s.worker_data.wake_pipe[1], &x, 1) + unsafe { C.free(t) } + } + return unsafe { nil } +} + +// process_dones handles connections that have been processed by a worker thread. +fn (mut s Server) process_dones(kq int) { + // println('process_dones') + C.pthread_mutex_lock(&s.worker_data.done_mutex) + mut local_head := s.worker_data.done_head + s.worker_data.done_head = unsafe { nil } + s.worker_data.done_tail = unsafe { nil } + C.pthread_mutex_unlock(&s.worker_data.done_mutex) + + for local_head != unsafe { nil } { + // println('FOR') + d := local_head + local_head = d.next + mut c := d.c + c.write_buf = d.resp + c.write_len = d.len + c.write_pos = 0 + + // Try to write immediately + write_ptr := unsafe { &u8(c.write_buf) + c.write_pos } + written := C.write(c.fd, write_ptr, c.write_len - c.write_pos) + if written > 0 { + c.write_pos += int(written) + } else if written < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK { + s.close_conn(c) + unsafe { C.free(d) } + continue + } + + if c.write_pos < c.write_len { + // println('if1') + // Add write event if not all data was sent + mut ev := C.kevent{} + ev_set(mut &ev, u64(c.fd), i16(C.EVFILT_WRITE), u16(C.EV_ADD | C.EV_EOF), + u32(0), isize(0), c) + C.kevent(kq, &ev, 1, unsafe { nil }, 0, unsafe { nil }) + } else { + // println('if2') + // Response sent, re-enable reading for keep-alive + C.free(c.write_buf) + c.write_buf = unsafe { nil } + mut ev := C.kevent{} + ev_set(mut &ev, u64(c.fd), i16(C.EVFILT_READ), u16(C.EV_ADD | C.EV_EOF), u32(0), + isize(0), c) + C.kevent(kq, &ev, 1, unsafe { nil }, 0, unsafe { nil }) + c.read_len = 0 + } + unsafe { C.free(d) } + } +} + +// const C.AF_INET u8 // run starts the server and enters the main event loop. + +pub fn (mut s Server) run() ! { + // Create server socket + // s.socket_fd = C.socket(C.AF_INET, C.SOCK_STREAM, 0) + s.socket_fd = C.socket(.ip, .tcp, 0) + if s.socket_fd < 0 { + C.perror(c'socket') + return error('socket creation failed') + } + + opt := 1 + C.setsockopt(s.socket_fd, C.SOL_SOCKET, C.SO_REUSEADDR, &opt, sizeof(int)) + + mut addr := C.sockaddr_in{} + C.memset(&addr, 0, sizeof(addr)) + addr.sin_family = C.AF_INET + // addr.sin_addr = u32(0) // C.htons(C.INADDR_ANY)) + addr.sin_port = u16(C.htons(u16(s.port))) + + if C.bind(s.socket_fd, voidptr(&addr), sizeof(addr)) < 0 { + C.perror(c'bind') + return error('socket bind failed') + } + if C.listen(s.socket_fd, backlog) < 0 { + C.perror(c'listen') + return error('socket listen failed') + } + C.fcntl(s.socket_fd, C.F_SETFL, C.O_NONBLOCK) + + // Create kqueue + s.kq = C.kqueue() + if s.kq < 0 { + C.perror(c'kqueue') + return error('kqueue creation failed') + } + + mut ev := C.kevent{} + ev_set(mut &ev, u64(s.socket_fd), i16(C.EVFILT_READ), u16(C.EV_ADD), u32(0), isize(0), + unsafe { nil }) + C.kevent(s.kq, &ev, 1, unsafe { nil }, 0, unsafe { nil }) + + // Initialize worker data + C.pthread_mutex_init(&s.worker_data.task_mutex, unsafe { nil }) + C.pthread_cond_init(&s.worker_data.task_cond, unsafe { nil }) + C.pthread_mutex_init(&s.worker_data.done_mutex, unsafe { nil }) + + // Create wake pipe + if C.pipe(&s.worker_data.wake_pipe[0]) < 0 { + C.perror(c'pipe') + return error('pipe creation failed') + } + C.fcntl(s.worker_data.wake_pipe[0], C.F_SETFL, C.O_NONBLOCK) + C.fcntl(s.worker_data.wake_pipe[1], C.F_SETFL, C.O_NONBLOCK) + ev_set(mut &ev, u64(s.worker_data.wake_pipe[0]), i16(C.EVFILT_READ), u16(C.EV_ADD), + u32(0), isize(0), unsafe { nil }) + C.kevent(s.kq, &ev, 1, unsafe { nil }, 0, unsafe { nil }) + + // Create worker threads + for i := 0; i < num_threads; i++ { + C.pthread_create(&s.threads[i], unsafe { nil }, worker_func, s) + } + + println('Server listening on port ${s.port}') + + // Event loop + events := [64]C.kevent{} + for { + nev := C.kevent(s.kq, unsafe { nil }, 0, &events[0], 64, unsafe { nil }) + if nev < 0 { + C.perror(c'kevent') + break + } + + for i := 0; i < nev; i++ { + event := events[i] + mut c := unsafe { &Conn(event.udata) } + + if event.flags & u16(C.EV_ERROR) != 0 { + if c != unsafe { nil } { + s.close_conn(c) + } + continue + } + + if event.ident == u64(s.socket_fd) { // New connection + client_fd := C.accept(s.socket_fd, unsafe { nil }, unsafe { nil }) + if client_fd < 0 { + continue + } + mut new_c := unsafe { &Conn(C.malloc(sizeof(Conn))) } + C.memset(new_c, 0, sizeof(Conn)) + new_c.fd = client_fd + C.fcntl(new_c.fd, C.F_SETFL, C.O_NONBLOCK) + ev_set(mut &ev, u64(new_c.fd), i16(C.EVFILT_READ), u16(C.EV_ADD | C.EV_EOF), + u32(0), isize(0), new_c) + C.kevent(s.kq, &ev, 1, unsafe { nil }, 0, unsafe { nil }) + } else if event.ident == u64(s.worker_data.wake_pipe[0]) { // Worker is done + buf := [1024]u8{} + for C.read(s.worker_data.wake_pipe[0], &buf[0], sizeof(buf)) > 0 {} + s.process_dones(s.kq) + } else if event.filter == i16(C.EVFILT_READ) { // Data from client + if event.flags & u16(C.EV_EOF) != 0 { + s.close_conn(c) + continue + } + n := C.read(c.fd, &c.read_buf[c.read_len], buf_size - c.read_len) + if n <= 0 { + if n < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK { + s.close_conn(c) + } + continue + } + c.read_len += int(n) + + header_end := C.memmem(&c.read_buf[0], c.read_len, c'\r\n\r\n', 4) + if header_end == unsafe { nil } { + if c.read_len >= buf_size { + s.close_conn(c) // Headers too large + } + continue + } + + // Simple parse + if C.memcmp(&c.read_buf[0], c'GET ', 4) != 0 { + s.close_conn(c) + continue + } + path_start := &c.read_buf[4] + path_end := C.strchr(path_start, ` `) + if path_end == unsafe { nil } { + s.close_conn(c) + continue + } + // path_len := unsafe { path_end - &char(path_start) } + path_len := unsafe { path_end - path_start } + + // Create HttpRequest for the handler + req := HttpRequest{ + buffer: c.read_buf[..c.read_len] + method: Slice{ + buf: &c.read_buf[0] + len: 3 + } + path: Slice{ + buf: path_start + len: path_len + } + client_conn_fd: c.fd + } + + // Consume request from buffer, assume no body + c.read_len = 0 + + // The conditional check for '/sleep' has been removed. + // All requests are now offloaded to the worker threads. + + // Offload to worker thread + ev_set(mut &ev, u64(c.fd), i16(C.EVFILT_READ), u16(C.EV_DELETE), u32(0), + isize(0), c) + C.kevent(s.kq, &ev, 1, unsafe { nil }, 0, unsafe { nil }) + + mut t := unsafe { &Task(C.malloc(sizeof(Task))) } + t.c = c + t.req = req + t.next = unsafe { nil } + + C.pthread_mutex_lock(&s.worker_data.task_mutex) + if s.worker_data.task_tail != unsafe { nil } { + s.worker_data.task_tail.next = t + } else { + s.worker_data.task_head = t + } + s.worker_data.task_tail = t + C.pthread_cond_signal(&s.worker_data.task_cond) + C.pthread_mutex_unlock(&s.worker_data.task_mutex) + } else if event.filter == i16(C.EVFILT_WRITE) { // Ready to write more data + if event.flags & u16(C.EV_EOF) != 0 { + s.close_conn(c) + continue + } + write_ptr := unsafe { &u8(c.write_buf) + c.write_pos } + written := C.write(c.fd, write_ptr, c.write_len - c.write_pos) + if written > 0 { + c.write_pos += int(written) + } else if written < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK { + s.close_conn(c) + continue + } + + if c.write_pos >= c.write_len { + C.free(c.write_buf) + c.write_buf = unsafe { nil } + ev_set(mut &ev, u64(c.fd), i16(C.EVFILT_WRITE), u16(C.EV_DELETE), + u32(0), isize(0), c) + C.kevent(s.kq, &ev, 1, unsafe { nil }, 0, unsafe { nil }) + + /* + // *** THIS IS THE FIX *** + // Re-enable the READ filter to listen for the next request (e.g., for the CSS file) + ev_set(mut &ev, u64(c.fd), i16(C.EVFILT_READ), u16(C.EV_ADD | C.EV_EOF), + u32(0), isize(0), c) + C.kevent(s.kq, &ev, 1, unsafe { nil }, 0, unsafe { nil }) + // *********************** + */ + + c.read_len = 0 + } + } + } + } + + // Cleanup (not reached in this simple example) + C.close(s.socket_fd) + C.close(s.kq) + C.close(s.worker_data.wake_pipe[0]) + C.close(s.worker_data.wake_pipe[1]) +} diff --git a/vlib/fasthttp/fasthttp_test.v b/vlib/fasthttp/fasthttp_test.v new file mode 100644 index 000000000..253e35ca6 --- /dev/null +++ b/vlib/fasthttp/fasthttp_test.v @@ -0,0 +1,55 @@ +/* +$if darwin { + import fasthttp + import time + import os + + const text = 'hello world' + + // This is your custom application logic. The server will call this function + // for each incoming request. + fn request_handler(req fasthttp.HttpRequest) ![]u8 { + s := req.buffer.bytestr() + _ = s + path := req.path.str() + // println("REQUEST HANDLER() $path") + // println('Handling request for path: "${path}"') + + // return []u8('Hello from the IO thread!') + // return 'Hello from the IO thread!'.bytes() + + match path { + '/' { + return text.bytes() //'Hello from the IO thread!'.bytes() + } + '/sleep' { + // This code will run in a worker thread because the server + // is configured to offload requests for this path. + time.sleep(5 * time.second) + return 'Hello from the worker thread after a 5s sleep!'.bytes() + } + else { + return '404 Not Found'.bytes() + } + } + } + + fn test_lol() { + assert true + $if new_veb_test ? { + // Create a new server instance on port 8092, passing our handler function. + mut server := fasthttp.new_server(8092, request_handler) or { + eprintln('Failed to create server: ${err}') + return + } + + // Start the server's event loop. This function will block indefinitely. + server.run() or { eprintln('Server failed to run: ${err}') } + } + } +} +*/ + +fn test_x() { + assert true +} diff --git a/vlib/net/address_darwin.c.v b/vlib/net/address_darwin.c.v index 26e889417..15a91c10e 100644 --- a/vlib/net/address_darwin.c.v +++ b/vlib/net/address_darwin.c.v @@ -34,7 +34,7 @@ mut: sin_zero [8]char } -const C.AF_INET u8 +pub const C.AF_INET u8 pub struct C.sockaddr_un { mut: diff --git a/vlib/net/http/request.v b/vlib/net/http/request.v index 914bd2624..ff5c9a32a 100644 --- a/vlib/net/http/request.v +++ b/vlib/net/http/request.v @@ -433,11 +433,78 @@ pub fn parse_request_head(mut reader io.BufferedReader) !Request { } } -fn parse_request_line(s string) !(Method, urllib.URL, Version) { +// parse_request_head parses *only* the header of a raw HTTP request into a Request object +pub fn parse_request_head_str(s string) !Request { + // TODO called by veb twice!? + // println('parse_request_head_str s ="${s}"') + println('skek=') + println(s) + println('==========================') + // println('FIRST') + // println(s[0].ascii_str()) + // println(s.bytes()) + + pos0 := s.index('\n') or { 0 } + lines := s.split('\n') + println('nr lines=${lines.len}') + line0 := s[..pos0].trim_space() + + println('line0="${line0}"') + method, target, version := parse_request_line(line0)! + println(method) + println(target) + println(version) + + // headers + mut header := new_header() + for i := 1; i < lines.len; i++ { + line := lines[i] + if !line.contains(':') { + continue + } + // key, value := parse_header(line)! + mut pos := parse_header_fast(line)! + key := line.substr_unsafe(0, pos) + for pos < line.len - 1 && line[pos + 1].is_space() { + if line[pos + 1].is_space() { + // Skip space or tab in value name + pos++ + } + } + value := line.substr_unsafe(pos + 1, line.len) + _, _ = key, value + // println('key,value=${key},${value}') + header.add_custom(key, value)! + // header.coerce(canonicalize: true) + } + + mut request_cookies := map[string]string{} + for _, cookie in read_cookies(header, '') { + request_cookies[cookie.name] = cookie.value + } + + return Request{ + method: method + url: target.str() + header: header + host: header.get(.host) or { '' } + version: version + cookies: request_cookies + } +} + +fn parse_request_line(line string) !(Method, urllib.URL, Version) { // println('S=${s}') - // words := s.split(' ') + words := line.split(' ') + // println('words=') // println(words) - space1, space2 := fast_request_words(s) + if words.len != 3 { + return error('bad request header') + } + method_str, target_str, version_str := words[0], words[1], words[2] + + /* + space1, space2 := fast_request_words(line) // if words.len != 3 { if space1 == 0 || space2 == 0 { return error('malformed request line') @@ -445,13 +512,16 @@ fn parse_request_line(s string) !(Method, urllib.URL, Version) { method_str := s.substr_unsafe(0, space1) target_str := s.substr_unsafe(space1 + 1, space2) version_str := s.substr_unsafe(space2 + 1, s.len) + */ // println('${method_str}!${target_str}!${version_str}') // method := method_from_str(words[0]) // target := urllib.parse(words[1])! // version := version_from_str(words[2]) method := method_from_str(method_str) target := urllib.parse(target_str)! + // println('before version_str="${version_str}"') version := version_from_str(version_str) + // println('VERSION="${version}"') if version == .unknown { return error('unsupported version') } diff --git a/vlib/net/http/version.v b/vlib/net/http/version.v index cb3bab03d..0331c0c70 100644 --- a/vlib/net/http/version.v +++ b/vlib/net/http/version.v @@ -21,6 +21,7 @@ pub fn (v Version) str() string { } pub fn version_from_str(v string) Version { + // println('VERSION FROM STR v="${v.to_lower()}"') return match v.to_lower() { 'http/1.1' { Version.v1_1 } 'http/2.0' { Version.v2_0 } diff --git a/vlib/v/checker/checker.v b/vlib/v/checker/checker.v index 1ec10f1e9..38f632320 100644 --- a/vlib/v/checker/checker.v +++ b/vlib/v/checker/checker.v @@ -970,7 +970,7 @@ fn (mut c Checker) fail_if_immutable(mut expr ast.Expr) (string, token.Pos) { } } } else if expr.obj is ast.ConstField && expr.name in c.const_names { - if !c.pref.translated { + if !c.pref.translated && c.mod != 'veb' { // TODO: fix this in c2v, do not allow modification of all consts // in translated code c.error('cannot modify constant `${expr.name}`', expr.pos) diff --git a/vlib/vanilla_http_server/CONTRIBUTING.md b/vlib/vanilla_http_server/CONTRIBUTING.md deleted file mode 100644 index 7aa0c033b..000000000 --- a/vlib/vanilla_http_server/CONTRIBUTING.md +++ /dev/null @@ -1,32 +0,0 @@ -# Contributing - -## Rules - -- Don't slow down performance -- Always try to keep abstraction to a minimum -- Don't complicate it - -## Benchmarking & Testing - -### CURL - -```sh -curl -X GET --verbose http://localhost:3000/ && -curl -X POST --verbose http://localhost:3000/user && -curl -X GET --verbose http://localhost:3000/user/1 - -``` - -### WRK - -```sh -wrk -H 'Connection: "keep-alive"' --connection 512 --threads 16 --duration 10s http://localhost:3000 -``` - -### Valgrind - -```sh -# Race condition check -v -prod -gc none . -valgrind --tool=helgrind ./vanilla -``` diff --git a/vlib/vanilla_http_server/README.md b/vlib/vanilla_http_server/README.md deleted file mode 100644 index 7f439d892..000000000 --- a/vlib/vanilla_http_server/README.md +++ /dev/null @@ -1,74 +0,0 @@ -vanilla_http_server Logo - -# vanilla_http_server - -- **Fast**: Multi-threaded, non-blocking I/O, lock-free, copy-free, epoll, SO_REUSEPORT. -- **Thread Affinity**: Work in Progress (W.I.P.). -- **Modular**: Compatible with any HTTP parser. -- **Memory Safety**: No race conditions. -- **No Magic**: Transparent and straightforward. -- **E2E Testing**: Allows end-to-end testing and scripting without running the server. - Simply pass the raw request to `handle_request()`. -- **SSE Friendly**: Server-Sent Events support. -- **Graceful Shutdown**: Work in Progress (W.I.P.). - -## Installation - -### From Root Directory - -1. Create the required directories: - -```bash -mkdir -p ~/.vmodules/enghitalo/vanilla -``` - -2. Copy the `vanilla_http_server` directory to the target location: - -```bash -cp -r ./ ~/.vmodules/enghitalo/vanilla -``` - -3. Run the example: - -```bash -v -prod crun examples/simple -``` - -This sets up the module in your `~/.vmodules` directory for use. - -### From Repository - -Install directly from the repository: - -```bash -v install https://github.com/enghitalo/vanilla_http_server -``` - -## Benchmarking - -Run the following commands to benchmark the server: - -1. Test with `curl`: - -```bash -curl -v http://localhost:3001 -``` - -2. Test with `wrk`: - -```bash -wrk -H 'Connection: "keep-alive"' --connection 512 --threads 16 --duration 60s http://localhost:3001 -``` - -Example output: - -```plaintext -Running 1m test @ http://localhost:3001 - 16 threads and 512 connections - Thread Stats Avg Stdev Max +/- Stdev - Latency 1.25ms 1.46ms 35.70ms 84.67% - Req/Sec 32.08k 2.47k 57.85k 71.47% - 30662010 requests in 1.00m, 2.68GB read -Requests/sec: 510197.97 -Transfer/sec: 45.74MB -``` diff --git a/vlib/vanilla_http_server/http_server/http_server.c.v b/vlib/vanilla_http_server/http_server/http_server.c.v deleted file mode 100644 index f51a45c6c..000000000 --- a/vlib/vanilla_http_server/http_server/http_server.c.v +++ /dev/null @@ -1,319 +0,0 @@ -module http_server - -import runtime - -const max_connection_size = 1024 -const max_thread_pool_size = runtime.nr_cpus() -pub const tiny_bad_request_response = 'HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\nConnection: close\r\n\r\n'.bytes() -const status_444_response = 'HTTP/1.1 444 No Response\r\nContent-Length: 0\r\nConnection: close\r\n\r\n'.bytes() -const status_499_response = 'HTTP/1.1 499 Client Closed Request\r\nContent-Length: 0\r\nConnection: close\r\n\r\n'.bytes() - -#include -#include - -$if !windows { - #include - #include -} - -fn C.socket(socket_family int, socket_type int, protocol int) int -fn C.bind(sockfd int, addr &C.sockaddr_in, addrlen u32) int -fn C.send(__fd int, __buf voidptr, __n usize, __flags int) int -fn C.recv(__fd int, __buf voidptr, __n usize, __flags int) int -fn C.setsockopt(__fd int, __level int, __optname int, __optval voidptr, __optlen u32) int -fn C.listen(__fd int, __n int) int -fn C.perror(s &u8) -fn C.close(fd int) int -fn C.accept(sockfd int, address &C.sockaddr_in, addrlen &u32) int -fn C.htons(__hostshort u16) u16 -fn C.epoll_create1(__flags int) int -fn C.epoll_ctl(__epfd int, __op int, __fd int, __event &C.epoll_event) int -fn C.epoll_wait(__epfd int, __events &C.epoll_event, __maxevents int, __timeout int) int -fn C.fcntl(fd int, cmd int, arg int) int - -struct C.in_addr { - s_addr u32 -} - -struct C.sockaddr_in { - sin_family u16 - sin_port u16 - sin_addr C.in_addr - sin_zero [8]u8 -} - -union C.epoll_data { - ptr voidptr - fd int - u32 u32 - u64 u64 -} - -struct C.epoll_event { - events u32 - data C.epoll_data -} - -pub struct Server { -pub: - port int = 3000 -pub mut: - socket_fd int - epoll_fds []int = []int{len: max_thread_pool_size, cap: max_thread_pool_size} - threads []thread = []thread{len: max_thread_pool_size, cap: max_thread_pool_size} - request_handler fn ([]u8, int) ![]u8 @[required] -} - -fn set_blocking(fd int, blocking bool) { - flags := C.fcntl(fd, C.F_GETFL, 0) - if flags == -1 { - eprintln(@LOCATION) - return - } - new_flags := if blocking { flags & ~C.O_NONBLOCK } else { flags | C.O_NONBLOCK } - C.fcntl(fd, C.F_SETFL, new_flags) -} - -fn close_socket(fd int) { - C.close(fd) -} - -fn create_server_socket(port int) int { - server_fd := C.socket(C.AF_INET, C.SOCK_STREAM, 0) - if server_fd < 0 { - eprintln(@LOCATION) - C.perror(c'Socket creation failed') - exit(1) - } - - set_blocking(server_fd, false) - - opt := 1 - if C.setsockopt(server_fd, C.SOL_SOCKET, C.SO_REUSEPORT, &opt, sizeof(opt)) < 0 { - eprintln(@LOCATION) - C.perror(c'setsockopt SO_REUSEPORT failed') - close_socket(server_fd) - exit(1) - } - - server_addr := C.sockaddr_in{ - sin_family: u16(C.AF_INET) - sin_port: C.htons(port) - sin_addr: C.in_addr{u32(C.INADDR_ANY)} - sin_zero: [8]u8{} - } - - if C.bind(server_fd, &server_addr, sizeof(server_addr)) < 0 { - eprintln(@LOCATION) - C.perror(c'Bind failed') - close_socket(server_fd) - exit(1) - } - - if C.listen(server_fd, max_connection_size) < 0 { - eprintln(@LOCATION) - C.perror(c'Listen failed') - close_socket(server_fd) - exit(1) - } - - return server_fd -} - -fn create_epoll_fd() int { - epoll_fd := C.epoll_create1(0) - if epoll_fd < 0 { - C.perror(c'epoll_create1') - } - return epoll_fd -} - -fn add_fd_to_epoll(epoll_fd int, fd int, events u32) int { - mut ev := C.epoll_event{ - events: events - } - ev.data.fd = fd - if C.epoll_ctl(epoll_fd, C.EPOLL_CTL_ADD, fd, &ev) == -1 { - eprintln(@LOCATION) - C.perror(c'epoll_ctl') - return -1 - } - return 0 -} - -fn remove_fd_from_epoll(epoll_fd int, fd int) { - C.epoll_ctl(epoll_fd, C.EPOLL_CTL_DEL, fd, C.NULL) -} - -fn handle_accept_loop(mut server Server, main_epoll_fd int) { - mut next_worker := 0 - mut event := C.epoll_event{} - - for { - num_events := C.epoll_wait(main_epoll_fd, &event, 1, -1) - if num_events < 0 { - if C.errno == C.EINTR { - continue - } - C.perror(c'epoll_wait') - break - } - - if num_events > 1 { - eprintln('More than one event in epoll_wait, this should not happen.') - continue - } - - if event.events & u32(C.EPOLLIN) != 0 { - for { - client_conn_fd := C.accept(server.socket_fd, C.NULL, C.NULL) - if client_conn_fd < 0 { - // Check for EAGAIN or EWOULDBLOCK, usually represented by errno 11. - if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK { - break // No more incoming connections; exit loop. - } - eprintln(@LOCATION) - C.perror(c'Accept failed') - continue - } - set_blocking(client_conn_fd, false) - // Load balance the client connection to the worker threads. - // this is a simple round-robin approach. - epoll_fd := server.epoll_fds[next_worker] - next_worker = (next_worker + 1) % max_thread_pool_size - if add_fd_to_epoll(epoll_fd, client_conn_fd, u32(C.EPOLLIN | C.EPOLLET)) < 0 { - close_socket(client_conn_fd) - continue - } - } - } - } -} - -@[direct_array_access; manualfree] -fn process_events(mut server Server, epoll_fd int) { - mut events := [max_connection_size]C.epoll_event{} - - for { - num_events := C.epoll_wait(epoll_fd, &events[0], max_connection_size, -1) - if num_events < 0 { - if C.errno == C.EINTR { - continue - } - eprintln(@LOCATION) - C.perror(c'epoll_wait') - break - } - - for i in 0 .. num_events { - client_conn_fd := unsafe { events[i].data.fd } - if events[i].events & u32(C.EPOLLHUP | C.EPOLLERR) != 0 { - remove_fd_from_epoll(epoll_fd, client_conn_fd) - close_socket(client_conn_fd) - continue - } - - if events[i].events & u32(C.EPOLLIN) != 0 { - mut request_buffer := []u8{} - defer { - unsafe { - request_buffer.free() - } - } - mut temp_buffer := [140]u8{} - for { - bytes_read := C.recv(client_conn_fd, &temp_buffer[0], temp_buffer.len, - 0) - if bytes_read < 0 { - if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK { - break // No more data to read - } - eprintln(@LOCATION) - C.perror(c'recv') - remove_fd_from_epoll(epoll_fd, client_conn_fd) - close_socket(client_conn_fd) - break - } - if bytes_read == 0 { - // Client closed the connection - remove_fd_from_epoll(epoll_fd, client_conn_fd) - close_socket(client_conn_fd) - break - } - unsafe { request_buffer.push_many(&temp_buffer[0], bytes_read) } - if bytes_read < temp_buffer.len { - break // Assume the request is complete - } - } - - if request_buffer.len == 0 { - C.send(client_conn_fd, status_444_response.data, status_444_response.len, - 0) - continue - } - - response_buffer := server.request_handler(request_buffer, client_conn_fd) or { - eprintln('Error handling request: ${err}') - C.send(client_conn_fd, tiny_bad_request_response.data, tiny_bad_request_response.len, - 0) - remove_fd_from_epoll(epoll_fd, client_conn_fd) - close_socket(client_conn_fd) - continue - } - - sent := C.send(client_conn_fd, response_buffer.data, response_buffer.len, - C.MSG_NOSIGNAL | C.MSG_ZEROCOPY) - if sent < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK { - eprintln(@LOCATION) - C.perror(c'send') - remove_fd_from_epoll(epoll_fd, client_conn_fd) - close_socket(client_conn_fd) - } - } - } - } -} - -// run starts the HTTP server and handles incoming connections. -// This method uses epoll for efficient event-driven I/O handling. -pub fn (mut server Server) run() { - $if windows { - eprintln('Windows is not supported yet. Please, use WSL or Linux.') - exit(1) - } - - server.socket_fd = create_server_socket(server.port) - if server.socket_fd < 0 { - return - } - - main_epoll_fd := create_epoll_fd() - if main_epoll_fd < 0 { - close_socket(server.socket_fd) - exit(1) - } - - if add_fd_to_epoll(main_epoll_fd, server.socket_fd, u32(C.EPOLLIN)) < 0 { - close_socket(server.socket_fd) - close_socket(main_epoll_fd) - exit(1) - } - - for i in 0 .. max_thread_pool_size { - server.epoll_fds[i] = create_epoll_fd() - if server.epoll_fds[i] < 0 { - C.perror(c'epoll_create1') - for j in 0 .. i { - close_socket(server.epoll_fds[j]) - } - close_socket(main_epoll_fd) - close_socket(server.socket_fd) - exit(1) - } - - server.threads[i] = spawn process_events(mut server, server.epoll_fds[i]) - } - - println('listening on http://localhost:${server.port}/') - handle_accept_loop(mut server, main_epoll_fd) -} diff --git a/vlib/vanilla_http_server/request_parser/request_parser.v b/vlib/vanilla_http_server/request_parser/request_parser.v deleted file mode 100644 index 7c36f45de..000000000 --- a/vlib/vanilla_http_server/request_parser/request_parser.v +++ /dev/null @@ -1,120 +0,0 @@ -module request_parser - -pub struct Slice { -pub: - start int - len int -} - -pub struct HttpRequest { -pub mut: - buffer []u8 - method Slice - path Slice - version Slice -} - -@[direct_array_access] -fn parse_http1_request_line(mut req HttpRequest) ! { - mut i := 0 - // Parse HTTP method - for i < req.buffer.len && req.buffer[i] != ` ` { - i++ - } - req.method = Slice{ - start: 0 - len: i - } - i++ - - // Parse path - mut path_start := i - for i < req.buffer.len && req.buffer[i] != ` ` { - i++ - } - req.path = Slice{ - start: path_start - len: i - path_start - } - i++ - - // Parse HTTP version - mut version_start := i - for i < req.buffer.len && req.buffer[i] != `\r` { - i++ - } - req.version = Slice{ - start: version_start - len: i - version_start - } - - // Move to the end of the request line - if i + 1 < req.buffer.len && req.buffer[i] == `\r` && req.buffer[i + 1] == `\n` { - i += 2 - } else { - return error('Invalid HTTP request line') - } -} - -// decode_http_request decodes an HTTP request from a byte buffer. -// It parses the request line and populates the HttpRequest struct with method, path, and version. -pub fn decode_http_request(buffer []u8) !HttpRequest { - mut req := HttpRequest{ - buffer: buffer - } - - parse_http1_request_line(mut req)! - - return req -} - -// Helper function to convert Slice to string for debugging -pub fn slice_to_string(buffer []u8, s Slice) string { - return buffer[s.start..s.start + s.len].bytestr() -} - -// get_header_value_slice retrieves the value of a header from the HTTP request buffer. -// It searches for the header name in the request buffer and returns its value as a Slice. -@[direct_array_access] -pub fn (req HttpRequest) get_header_value_slice(name string) ?Slice { - mut pos := req.version.start + req.version.len + 2 // Start after request line (CRLF) - if pos >= req.buffer.len { - return none - } - - for pos < req.buffer.len { - if unsafe { - vmemcmp(&req.buffer[pos], name.str, name.len) - } == 0 { - pos += name.len - if req.buffer[pos] != `:` { - return none - } - pos++ - for pos < req.buffer.len && (req.buffer[pos] == ` ` || req.buffer[pos] == `\t`) { - pos++ - } - if pos >= req.buffer.len { - return none - } - mut start := pos - for pos < req.buffer.len && req.buffer[pos] != `\r` { - pos++ - } - return Slice{ - start: start - len: pos - start - } - } - if req.buffer[pos] == `\r` { - pos++ - if pos < req.buffer.len && req.buffer[pos] == `\n` { - pos++ - } - } else { - pos++ - } - } - - return none -} diff --git a/vlib/vanilla_http_server/request_parser/request_parser_test.v b/vlib/vanilla_http_server/request_parser/request_parser_test.v deleted file mode 100644 index 7a06f32fe..000000000 --- a/vlib/vanilla_http_server/request_parser/request_parser_test.v +++ /dev/null @@ -1,68 +0,0 @@ -module request_parser - -fn test_parse_http1_request_line_valid_request() { - buffer := 'GET /path/to/resource HTTP/1.1\r\n'.bytes() - mut req := HttpRequest{ - buffer: buffer - } - - parse_http1_request_line(mut req) or { panic(err) } - - assert slice_to_string(req.buffer, req.method) == 'GET' - assert slice_to_string(req.buffer, req.path) == '/path/to/resource' - assert slice_to_string(req.buffer, req.version) == 'HTTP/1.1' -} - -fn test_parse_http1_request_line_invalid_request() { - buffer := 'INVALID REQUEST LINE'.bytes() - mut req := HttpRequest{ - buffer: buffer - } - - mut has_error := false - parse_http1_request_line(mut req) or { - has_error = true - assert err.msg() == 'Invalid HTTP request line' - } - assert has_error, 'Expected error for invalid request line' -} - -fn test_decode_http_request_valid_request() { - buffer := 'POST /api/resource HTTP/1.0\r\n'.bytes() - req := decode_http_request(buffer) or { panic(err) } - - assert slice_to_string(req.buffer, req.method) == 'POST' - assert slice_to_string(req.buffer, req.path) == '/api/resource' - assert slice_to_string(req.buffer, req.version) == 'HTTP/1.0' -} - -fn test_decode_http_request_invalid_request() { - buffer := 'INVALID REQUEST LINE'.bytes() - - mut has_error := false - decode_http_request(buffer) or { - has_error = true - assert err.msg() == 'Invalid HTTP request line' - } - assert has_error, 'Expected error for invalid request' -} - -fn test_get_header_value_slice_existing_header() { - buffer := 'GET / HTTP/1.1\r\nHost: example.com\r\nContent-Type: text/html\r\n\r\n'.bytes() - req := decode_http_request(buffer) or { panic(err) } - - host_slice := req.get_header_value_slice('Host') or { panic('Header not found') } - assert slice_to_string(req.buffer, host_slice) == 'example.com' - - content_type_slice := req.get_header_value_slice('Content-Type') or { - panic('Header not found') - } - assert slice_to_string(req.buffer, content_type_slice) == 'text/html' -} - -fn test_get_header_value_slice_non_existing_header() { - buffer := 'GET / HTTP/1.1\r\nHost: example.com\r\n\r\n'.bytes() - req := decode_http_request(buffer) or { panic(err) } - - assert req.get_header_value_slice('Content-Type') == none -} diff --git a/vlib/vanilla_http_server/x.v b/vlib/vanilla_http_server/x.v deleted file mode 100644 index d43dcd594..000000000 --- a/vlib/vanilla_http_server/x.v +++ /dev/null @@ -1,3 +0,0 @@ -module vanilla - -pub const description = 'an empty module, used as a placeholder, for other modules' diff --git a/vlib/veb/context.v b/vlib/veb/context.v index a0d3bbff9..99e40bd5a 100644 --- a/vlib/veb/context.v +++ b/vlib/veb/context.v @@ -83,6 +83,12 @@ pub fn (mut ctx Context) set_custom_header(key string, value string) ! { // send_response_to_client finalizes the response headers and sets Content-Type to `mimetype` // and the response body to `response` pub fn (mut ctx Context) send_response_to_client(mimetype string, response string) Result { + // println('send_response_to_client') + // print_backtrace() + // println('ctx=') + // println(ctx) + // println('sending resp=') + // println(response) if ctx.done && !ctx.takeover { eprintln('[veb] a response cannot be sent twice over one connection') return Result{} @@ -90,7 +96,7 @@ pub fn (mut ctx Context) send_response_to_client(mimetype string, response strin // ctx.done is only set in this function, so in order to sent a response over the connection // this value has to be set to true. Assuming the user doesn't use `ctx.conn` directly. ctx.done = true - ctx.res.body = response + ctx.res.body = response.clone() $if veb_livereload ? { if mimetype == 'text/html' { ctx.res.body = response.replace('', '\n') @@ -119,8 +125,10 @@ pub fn (mut ctx Context) send_response_to_client(mimetype string, response strin } if ctx.takeover { + println('calling fast send resp') fast_send_resp(mut ctx.conn, ctx.res) or {} } + ctx.res.body = ctx.res.body.clone() // !!!! TODO memory bug // result is send in `veb.v`, `handle_route` return Result{} } @@ -174,6 +182,7 @@ pub fn (mut ctx Context) file(file_path string) Result { } fn (mut ctx Context) send_file(content_type string, file_path string) Result { + println('send_file ct=${content_type} path=${file_path}') mut file := os.open(file_path) or { eprint('[veb] error while trying to open file: ${err.msg()}') ctx.res.set_status(.not_found) @@ -197,11 +206,11 @@ fn (mut ctx Context) send_file(content_type string, file_path string) Result { eprintln('[veb] error while trying to read file: ${err.msg()}') return ctx.server_error('could not read resource') } + // println('data=${data}') return ctx.send_response_to_client(content_type, data) } else { ctx.return_type = .file ctx.return_file = file_path - // set response headers ctx.send_response_to_client(content_type, '') ctx.res.header.set(.content_length, file_size.str()) diff --git a/vlib/veb/veb.v b/vlib/veb/veb.v index 6f99d9f08..5e2d59c05 100644 --- a/vlib/veb/veb.v +++ b/vlib/veb/veb.v @@ -3,14 +3,11 @@ // that can be found in the LICENSE file. module veb -import io import net import net.http import net.urllib import os -import time import strings -import picoev // A type which doesn't get filtered inside templates pub type RawHtml = string @@ -112,515 +109,38 @@ pub fn (mut sr StringResponse) done() { unsafe { sr.str.free() } } -// EV context -struct RequestParams { - global_app voidptr - controllers []&ControllerPath - routes &map[string]Route - timeout_in_seconds int -mut: - // request body buffer - buf &u8 = unsafe { nil } - // idx keeps track of how much of the request body has been read - // for each incomplete request, see `handle_conn` - idx []int - incomplete_requests []http.Request - file_responses []FileResponse - string_responses []StringResponse -} - -// reset request parameters for `fd`: -// reset content-length index and the http request -pub fn (mut params RequestParams) request_done(fd int) { - params.incomplete_requests[fd] = http.Request{} - params.idx[fd] = 0 - $if trace_handle_read ? { - eprintln('>>>>> fd: ${fd} | request_done.') - } -} - -interface BeforeAcceptApp { -mut: - before_accept_loop() -} - -// run_at - start a new veb server, listening only on a specific address `host`, at the specified `port` -// Usage example: veb.run_at(new_app(), host: 'localhost' port: 8099 family: .ip)! -@[direct_array_access; manualfree] -pub fn run_at[A, X](mut global_app A, params RunParams) ! { - if params.port <= 0 || params.port > 65535 { - return error('invalid port number `${params.port}`, it should be between 1 and 65535') - } - - routes := generate_routes[A, X](global_app)! - controllers_sorted := check_duplicate_routes_in_controllers[A](global_app, routes)! - - if params.show_startup_message { - host := if params.host == '' { 'localhost' } else { params.host } - println('[veb] Running app on http://${host}:${params.port}/') - } - flush_stdout() - - mut pico_context := &RequestParams{ - global_app: unsafe { global_app } - controllers: controllers_sorted - routes: &routes - timeout_in_seconds: params.timeout_in_seconds - } - - pico_context.idx = []int{len: picoev.max_fds} - // reserve space for read and write buffers - pico_context.buf = unsafe { malloc_noscan(picoev.max_fds * max_read + 1) } - defer { - unsafe { - free(pico_context.buf) - } - } - pico_context.incomplete_requests = []http.Request{len: picoev.max_fds} - pico_context.file_responses = []FileResponse{len: picoev.max_fds} - pico_context.string_responses = []StringResponse{len: picoev.max_fds} - - mut pico := picoev.new( - port: params.port - raw_cb: ev_callback[A, X] - user_data: pico_context - timeout_secs: params.timeout_in_seconds - family: params.family - host: params.host - )! - - $if A is BeforeAcceptApp { - global_app.before_accept_loop() - } - - // Forever accept every connection that comes - pico.serve() -} - -@[direct_array_access] -fn ev_callback[A, X](mut pv picoev.Picoev, fd int, events int) { - mut params := unsafe { &RequestParams(pv.user_data) } - - if events == picoev.picoev_timeout { - $if trace_picoev_callback ? { - eprintln('> request timeout on file descriptor ${fd}') - } - - handle_timeout(mut pv, mut params, fd) - } else if events == picoev.picoev_write { - $if trace_picoev_callback ? { - eprintln('> write event on file descriptor ${fd}') - } - - if params.file_responses[fd].open { - handle_write_file(mut pv, mut params, fd) - } else if params.string_responses[fd].open { - handle_write_string(mut pv, mut params, fd) - } else { - // This should never happen, but it does on pages, that refer to static resources, - // in folders, added with `mount_static_folder_at`. See also - // https://github.com/vlang/edu-platform/blob/0c203f0384cf24f917f9a7c9bb150f8d64aca00f/main.v#L92 - $if debug_ev_callback ? { - eprintln('[veb] error: write event on connection should be closed') - } - pv.close_conn(fd) - } - } else if events == picoev.picoev_read { - $if trace_picoev_callback ? { - eprintln('> read event on file descriptor ${fd}') - } - // println('ev_callback fd=${fd} params.routes=${params.routes.len}') - handle_read[A, X](mut pv, mut params, fd) - } else { - // should never happen - eprintln('[veb] error: invalid picoev event ${events}') - } -} - -fn handle_timeout(mut pv picoev.Picoev, mut params RequestParams, fd int) { - mut conn := &net.TcpConn{ - sock: net.tcp_socket_from_handle_raw(fd) - handle: fd - is_blocking: false - } - fast_send_resp(mut conn, http_408) or {} - pv.close_conn(fd) - params.request_done(fd) -} - -// handle_write_file reads data from a file and sends that data over the socket. -@[direct_array_access; manualfree] -fn handle_write_file(mut pv picoev.Picoev, mut params RequestParams, fd int) { - mut bytes_to_write := int(params.file_responses[fd].total - params.file_responses[fd].pos) - - $if linux || freebsd { - bytes_written := sendfile(fd, params.file_responses[fd].file.fd, bytes_to_write) - if bytes_written < 0 { - params.file_responses[fd].pos += bytes_to_write - } else { - params.file_responses[fd].pos += bytes_written - } - } $else { - if bytes_to_write > max_write { - bytes_to_write = max_write - } - data := unsafe { malloc(bytes_to_write) } - defer { - unsafe { free(data) } - } - mut conn := &net.TcpConn{ - sock: net.tcp_socket_from_handle_raw(fd) - handle: fd - is_blocking: false - write_timeout: params.timeout_in_seconds * time.second - } - params.file_responses[fd].file.read_into_ptr(data, bytes_to_write) or { - params.file_responses[fd].done() - pv.close_conn(fd) - return - } - actual_written := send_string_ptr(mut conn, data, bytes_to_write) or { - params.file_responses[fd].done() - pv.close_conn(fd) - return - } - params.file_responses[fd].pos += actual_written - } - - if params.file_responses[fd].pos == params.file_responses[fd].total { - // file is done writing - params.file_responses[fd].done() - handle_complete_request(params.file_responses[fd].should_close_conn, mut pv, fd) - return - } -} - -// handle_write_string reads data from a string and sends that data over the socket -@[direct_array_access] -fn handle_write_string(mut pv picoev.Picoev, mut params RequestParams, fd int) { - mut bytes_to_write := int(params.string_responses[fd].str.len - params.string_responses[fd].pos) - if bytes_to_write > max_write { - bytes_to_write = max_write - } - mut conn := &net.TcpConn{ - sock: net.tcp_socket_from_handle_raw(fd) - handle: fd - is_blocking: false - } - // pointer magic to start at the correct position in the buffer - data := unsafe { params.string_responses[fd].str.str + params.string_responses[fd].pos } - actual_written := send_string_ptr(mut conn, data, bytes_to_write) or { - params.string_responses[fd].done() - pv.close_conn(fd) - return - } - params.string_responses[fd].pos += actual_written - if params.string_responses[fd].pos == params.string_responses[fd].str.len { - // done writing - params.string_responses[fd].done() - pv.close_conn(fd) - handle_complete_request(params.string_responses[fd].should_close_conn, mut pv, - fd) - return - } -} - -// handle_read reads data from the connection and if the request is complete -// it calls `handle_route` and closes the connection. -// If the request is not complete, it stores the incomplete request in `params` -// and the connection stays open until it is ready to read again -@[direct_array_access; manualfree] -fn handle_read[A, X](mut pv picoev.Picoev, mut params RequestParams, fd int) { - // println('handle_read() fd=${fd} params.routes=${params.routes}') - mut conn := &net.TcpConn{ - sock: net.tcp_socket_from_handle_raw(fd) - handle: fd - is_blocking: false - } - // cap the max_read to 8KB - mut reader := io.new_buffered_reader(reader: conn, cap: max_read) - defer { - unsafe { - reader.free() - } - } - // take the previous incomplete request - mut req := params.incomplete_requests[fd] - // check if there is an incomplete request for this file descriptor - if params.idx[fd] == 0 { - $if trace_handle_read ? { - eprintln('>>>>> fd: ${fd} | start of request parsing') - } - // this is the start of a new request, setup the connection, and read the headers: - // set the read and write timeout according to picoev settings when the - // connection is first encountered - conn.set_read_timeout(params.timeout_in_seconds) - conn.set_write_timeout(params.timeout_in_seconds) - // first time that this connection is being read from, so we parse the - // request header first - req = http.parse_request_head(mut reader) or { - // Prevents errors from being thrown when BufferedReader is empty - if err !is io.Eof { - eprintln('[veb] error parsing request: ${err}') - } - // the buffered reader was empty meaning that the client probably - // closed the connection. - pv.close_conn(fd) - params.request_done(fd) - return - } - if reader.total_read >= max_read { - // throw an error when the request header is larger than 8KB - // same limit that apache handles - eprintln('[veb] error parsing request: too large') - fast_send_resp(mut conn, http_413) or {} - pv.close_conn(fd) - params.request_done(fd) - return - } - } - if params.idx[fd] == -1 { - // this is for sure a continuation of a previous request, where the first part contained only headers; - // make sure that we are ready to accept the body and account for every byte in it, by setting the counter to 0: +$if !new_veb ? { + // EV context + struct RequestParams { + global_app voidptr + controllers []&ControllerPath + routes &map[string]Route + timeout_in_seconds int + mut: + // request body buffer + buf &u8 = unsafe { nil } + // idx keeps track of how much of the request body has been read + // for each incomplete request, see `handle_conn` + idx []int + incomplete_requests []http.Request + file_responses []FileResponse + string_responses []StringResponse + } + + // reset request parameters for `fd`: + // reset content-length index and the http request + pub fn (mut params RequestParams) request_done(fd int) { + params.incomplete_requests[fd] = http.Request{} params.idx[fd] = 0 $if trace_handle_read ? { - eprintln('>>>>> fd: ${fd} | continuation of request, where the first part contained headers') - } - } - // check if the request has a body - content_length := req.header.get(.content_length) or { '0' } - content_length_i := content_length.int() - if content_length_i > 0 { - mut max_bytes_to_read := max_read - reader.total_read - mut bytes_to_read := content_length_i - params.idx[fd] - // cap the bytes to read to 8KB for the body, including the request headers if any - if bytes_to_read > max_read - reader.total_read { - bytes_to_read = max_read - reader.total_read - } - mut buf_ptr := params.buf - unsafe { - buf_ptr += fd * max_read // pointer magic - } - // convert to []u8 for BufferedReader - mut buf := unsafe { buf_ptr.vbytes(max_bytes_to_read) } - n := reader.read(mut buf) or { - if reader.total_read > 0 { - // The headers were parsed in this cycle, but the body has not been sent yet. No need to error. - params.idx[fd] = -1 // avoid reparsing the headers on the next call. - params.incomplete_requests[fd] = req - $if trace_handle_read ? { - eprintln('>>>>> fd: ${fd} | request headers were parsed, but the body has not been parsed yet | params.idx[fd]: ${params.idx[fd]} | content_length_i: ${content_length_i}') - } - return - } - eprintln('[veb] error reading request body: ${err}') - if err is io.Eof { - // we expect more data to be send, but an Eof error occurred, meaning - // that there is no more data to be read from the socket. - // And at this point we expect that there is data to be read for the body. - fast_send_resp(mut conn, http.new_response( - status: .bad_request - body: 'Mismatch of body length and Content-Length header' - header: http.new_header( - key: .content_type - value: 'text/plain' - ).join(headers_close) - )) or {} - } - pv.close_conn(fd) - params.request_done(fd) - return - } - // there is no more data to be sent, but it is less than the Content-Length header - // so it is a mismatch of body length and content length. - // Or if there is more data received then the Content-Length header specified - if (n == 0 && params.idx[fd] != 0) || params.idx[fd] + n > content_length_i { - fast_send_resp(mut conn, http.new_response( - status: .bad_request - body: 'Mismatch of body length and Content-Length header' - header: http.new_header( - key: .content_type - value: 'text/plain' - ).join(headers_close) - )) or {} - pv.close_conn(fd) - params.request_done(fd) - return - } else if n < bytes_to_read || params.idx[fd] + n < content_length_i { - // request is incomplete wait until the socket becomes ready to read again - // TODO: change this to a memcpy function? - req.data += buf[0..n].bytestr() - params.incomplete_requests[fd] = req - params.idx[fd] += n - $if trace_handle_read ? { - eprintln('>>>>> request is NOT complete, fd: ${fd} | n: ${n} | req.data.len: ${req.data.len} | params.idx[fd]: ${params.idx[fd]}') - } - return - } else { - // request is complete: n = bytes_to_read - req.data += buf[0..n].bytestr() - params.idx[fd] += n - $if trace_handle_read ? { - eprintln('>>>>> request is NOW COMPLETE, fd: ${fd} | n: ${n} | req.data.len: ${req.data.len}') - } - } - } - defer { - params.request_done(fd) - } - if completed_context := handle_request[A, X](mut conn, req, params) { - if completed_context.takeover { - // the connection should be kept open, but removed from the picoev loop. - // This way veb can continue handling other connections and the user can - // keep the connection open indefinitely - pv.delete(fd) - return - } - // TODO: At this point the Context can safely be freed when this function returns. - // The user will have to clone the context if the context object should be kept. - // defer { - // completed_context.free() - // } - match completed_context.return_type { - .normal { - // small optimization: if the response is small write it immediately - // the socket is most likely able to write all the data without blocking. - // See Context.send_file for why we use max_read instead of max_write. - if completed_context.res.body.len < max_read { - fast_send_resp(mut conn, completed_context.res) or {} - handle_complete_request(completed_context.client_wants_to_close, mut - pv, fd) - } else { - params.string_responses[fd].open = true - params.string_responses[fd].str = completed_context.res.body - res := pv.add(fd, picoev.picoev_write, params.timeout_in_seconds, - picoev.raw_callback) - // picoev error - if res == -1 { - // should not happen - params.string_responses[fd].done() - fast_send_resp(mut conn, http_500) or {} - handle_complete_request(completed_context.client_wants_to_close, mut - pv, fd) - return - } - // no errors we can send the HTTP headers - fast_send_resp_header(mut conn, completed_context.res) or {} - } - } - .file { - // save file information - length := completed_context.res.header.get(.content_length) or { - fast_send_resp(mut conn, http_500) or {} - return - } - params.file_responses[fd].total = length.i64() - params.file_responses[fd].file = os.open(completed_context.return_file) or { - // Context checks if the file is valid, so this should never happen - fast_send_resp(mut conn, http_500) or {} - params.file_responses[fd].done() - pv.close_conn(fd) - return - } - params.file_responses[fd].open = true - - res := pv.add(fd, picoev.picoev_write, params.timeout_in_seconds, picoev.raw_callback) - // picoev error - if res == -1 { - // should not happen - fast_send_resp(mut conn, http_500) or {} - params.file_responses[fd].done() - pv.close_conn(fd) - return - } - // no errors we can send the HTTP headers - fast_send_resp_header(mut conn, completed_context.res) or {} - } + eprintln('>>>>> fd: ${fd} | request_done.') } - } else { - // invalid request headers/data - pv.close_conn(fd) } } -// close the connection when `should_close` is true. -@[inline] -fn handle_complete_request(should_close bool, mut pv picoev.Picoev, fd int) { - if should_close { - pv.close_conn(fd) - } -} - -fn handle_request[A, X](mut conn net.TcpConn, req http.Request, params &RequestParams) ?&Context { - // println('handle_request() params.routes=${params.routes}') - mut global_app := unsafe { &A(params.global_app) } - - // TODO: change this variable to include the total wait time over each network cycle - // maybe store it in Request.user_ptr ? - page_gen_start := time.ticks() - - $if trace_request ? { - dump(req) - } - $if trace_request_url ? { - dump(req.url) - } - - // parse the URL, query and form data - mut url := urllib.parse(req.url) or { - eprintln('[veb] error parsing path "${req.url}": ${err}') - return none - } - query := parse_query_from_url(url) - form, files := parse_form_from_request(req) or { - // Bad request - eprintln('[veb] error parsing form: ${err.msg()}') - conn.write(http_400.bytes()) or {} - return none - } - - // remove the port from the HTTP Host header - host_with_port := req.header.get(.host) or { '' } - host, _ := urllib.split_host_port(host_with_port) - - // Create Context with request data - mut ctx := &Context{ - req: req - page_gen_start: page_gen_start - conn: conn - query: query - form: form - files: files - } - - if connection_header := req.header.get(.connection) { - // A client that does not support persistent connections MUST send the - // "close" connection option in every request message. - if connection_header.to_lower() == 'close' { - ctx.client_wants_to_close = true - } - } - - $if A is StaticApp { - ctx.custom_mime_types = global_app.static_mime_types.clone() - } - - // match controller paths - $if A is ControllerInterface { - if completed_context := handle_controllers[X](params.controllers, ctx, mut url, - host) - { - return completed_context - } - } - - // create a new user context and pass the veb's context - mut user_context := X{} - user_context.Context = ctx - - handle_route[A, X](mut global_app, mut user_context, url, host, params.routes) - // we need to explicitly tell the V compiler to return a reference - return &user_context.Context +interface BeforeAcceptApp { +mut: + before_accept_loop() } fn handle_route[A, X](mut app A, mut user_context X, url urllib.URL, host string, routes &map[string]Route) { @@ -700,6 +220,11 @@ fn handle_route[A, X](mut app A, mut user_context X, url urllib.URL, host string } } + // defer { + // println('USER CONTEXT at end of handle_route') + // println(user_context) + //} + // Route matching and match route specific middleware as last step $for method in A.methods { $if method.return_type is Result { @@ -917,6 +442,12 @@ fn send_string_ptr(mut conn net.TcpConn, ptr &u8, len int) !int { return conn.write_ptr(ptr, len) } +// Set s to the form error +pub fn (mut ctx Context) error(s string) { + eprintln('[veb] Context.error: ${s}') + ctx.form_error = s +} + fn fast_send_resp_header(mut conn net.TcpConn, resp http.Response) ! { mut sb := strings.new_builder(resp.body.len + 200) sb.write_string('HTTP/') @@ -941,9 +472,3 @@ fn fast_send_resp(mut conn net.TcpConn, resp http.Response) ! { fast_send_resp_header(mut conn, resp)! send_string(mut conn, resp.body)! } - -// Set s to the form error -pub fn (mut ctx Context) error(s string) { - eprintln('[veb] Context.error: ${s}') - ctx.form_error = s -} diff --git a/vlib/veb/veb_d_new_veb.v b/vlib/veb/veb_d_new_veb.v new file mode 100644 index 000000000..82f2f5770 --- /dev/null +++ b/vlib/veb/veb_d_new_veb.v @@ -0,0 +1,161 @@ +// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +module veb + +import fasthttp +import net.http +import time +import net +import net.urllib +import os + +struct RequestParams { + global_app voidptr + controllers_sorted []&ControllerPath + routes &map[string]Route +} + +// TODO remove global hack +//__global gparams RequestParams +const gparams = RequestParams{ + routes: unsafe { nil } +} + +const http_ok_response = 'HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 0\r\nConnection: keep-alive\r\n\r\n'.bytes() + +pub fn run_at[A, X](mut global_app A, params RunParams) ! { + run_new[A, X](mut global_app, params.port)! +} + +// run_new - start a new veb server using the parallel fasthttp backend. +pub fn run_new[A, X](mut global_app A, port int) ! { + // gapp = global_app + if port <= 0 || port > 65535 { + return error('invalid port number `${port}`, it should be between 1 and 65535') + } + + // Generate routes and controllers just like the original run() function. + routes := generate_routes[A, X](global_app)! + controllers_sorted := check_duplicate_routes_in_controllers[A](global_app, routes)! + unsafe { + gparams = &RequestParams{ + global_app: global_app + controllers_sorted: controllers_sorted + routes: &routes + // timeout_in_seconds: + } + } + // Configure and run the fasthttp server + mut server := fasthttp.new_server(port, parallel_request_handler[A, X]) or { + eprintln('Failed to create server: ${err}') + return + } + println('[veb] Running multi-threaded app on http://localhost:${port}/') + flush_stdout() + server.run() or { panic(err) } +} + +// const test_text = 'test' + +fn parallel_request_handler[A, X](req fasthttp.HttpRequest) ![]u8 { + /* + if true { + return test_text.bytes() + } + */ + // mut global_app := unsafe { &A(gapp) } + mut global_app := unsafe { &A(gparams.global_app) } + // println('parallel_request_handler() params.routes=${gparams.routes}') + // println('global_app=$global_app') + // println('params=$gparams') + // println('req=$req') + // println('buffer=${req.buffer.bytestr()}') + s := req.buffer.bytestr() + method := unsafe { tos(req.method.buf, req.method.len) } + println('method=${method}') + path := unsafe { tos(req.path.buf, req.path.len) } + println('path=${path}') + req_bytes := req.buffer + client_fd := req.client_conn_fd + + // Parse the raw request bytes into a standard `http.Request`. + req2 := http.parse_request_head_str(s.clone()) or { + eprintln('[veb] Failed to parse request: ${err}') + println('s=') + println(s) + println('==============') + return http_ok_response // tiny_bad_request_response + } + // Create and populate the `veb.Context`. + completed_context := handle_request_and_route[A, X](mut global_app, req2, client_fd, + gparams.routes, gparams.controllers_sorted) + // Serialize the final `http.Response` into a byte array. + if completed_context.takeover { + eprintln('[veb] WARNING: ctx.takeover_conn() was called, but this is not supported by this server backend. The connection will be closed after this response.') + } + // The fasthttp server expects a complete response buffer to be returned. + return completed_context.res.bytes() +} + +// handle_request_and_route is a unified function that creates the context, +// runs middleware, and finds the correct route for a request. +fn handle_request_and_route[A, X](mut app A, req http.Request, client_fd int, routes &map[string]Route, controllers []&ControllerPath) &Context { + /* + // Create a `net.TcpConn` from the file descriptor for context compatibility. + mut conn := &net.TcpConn{ + sock: net.tcp_socket_from_handle_raw(client_fd) + handle: client_fd + is_blocking: false // vanilla_http_server ensures this + } + */ + // Create and populate the `veb.Context` from the request. + mut url := urllib.parse(req.url) or { + // This should be rare if http.parse_request succeeded. + mut bad_ctx := &Context{ + req: req + // conn: conn + } + bad_ctx.not_found() + return bad_ctx + } + query := parse_query_from_url(url) + form, files := parse_form_from_request(req) or { + mut bad_ctx := &Context{ + req: req + // conn: conn + } + bad_ctx.request_error('Failed to parse form data: ${err.msg()}') + return bad_ctx + } + host_with_port := req.header.get(.host) or { '' } + host, _ := urllib.split_host_port(host_with_port) + mut ctx := &Context{ + req: req + page_gen_start: time.ticks() + // conn: conn + query: query + form: form + files: files + } + if connection_header := req.header.get(.connection) { + if connection_header.to_lower() == 'close' { + ctx.client_wants_to_close = true + } + } + $if A is StaticApp { + ctx.custom_mime_types = app.static_mime_types.clone() + } + // Match controller paths first + $if A is ControllerInterface { + if completed_context := handle_controllers[X](controllers, ctx, mut url, host) { + return completed_context + } + } + // Create a new user context and pass veb's context + mut user_context := X{} + user_context.Context = ctx + println('calling handle_route') + handle_route[A, X](mut app, mut user_context, url, host, routes) + return &user_context.Context +} diff --git a/vlib/veb/veb_picoev.v b/vlib/veb/veb_picoev.v new file mode 100644 index 000000000..732e4422a --- /dev/null +++ b/vlib/veb/veb_picoev.v @@ -0,0 +1,499 @@ +module veb + +import os + +$if !new_veb ? { + import picoev + import time + import net + import net.http + import io + import net.urllib +} + +$if !new_veb ? { + // run_at - start a new veb server, listening only on a specific address `host`, at the specified `port` + // Usage example: veb.run_at(new_app(), host: 'localhost' port: 8099 family: .ip)! + @[direct_array_access; manualfree] + pub fn run_at[A, X](mut global_app A, params RunParams) ! { + if params.port <= 0 || params.port > 65535 { + return error('invalid port number `${params.port}`, it should be between 1 and 65535') + } + + routes := generate_routes[A, X](global_app)! + controllers_sorted := check_duplicate_routes_in_controllers[A](global_app, routes)! + + if params.show_startup_message { + host := if params.host == '' { 'localhost' } else { params.host } + println('[veb] Running app on http://${host}:${params.port}/') + } + flush_stdout() + + mut pico_context := &RequestParams{ + global_app: unsafe { global_app } + controllers: controllers_sorted + routes: &routes + timeout_in_seconds: params.timeout_in_seconds + } + + pico_context.idx = []int{len: picoev.max_fds} + // reserve space for read and write buffers + pico_context.buf = unsafe { malloc_noscan(picoev.max_fds * max_read + 1) } + defer { + unsafe { + free(pico_context.buf) + } + } + pico_context.incomplete_requests = []http.Request{len: picoev.max_fds} + pico_context.file_responses = []FileResponse{len: picoev.max_fds} + pico_context.string_responses = []StringResponse{len: picoev.max_fds} + + mut pico := picoev.new( + port: params.port + raw_cb: ev_callback[A, X] + user_data: pico_context + timeout_secs: params.timeout_in_seconds + family: params.family + host: params.host + )! + + $if A is BeforeAcceptApp { + global_app.before_accept_loop() + } + + // Forever accept every connection that comes + pico.serve() + } + + @[direct_array_access] + fn ev_callback[A, X](mut pv picoev.Picoev, fd int, events int) { + mut params := unsafe { &RequestParams(pv.user_data) } + + if events == picoev.picoev_timeout { + $if trace_picoev_callback ? { + eprintln('> request timeout on file descriptor ${fd}') + } + + handle_timeout(mut pv, mut params, fd) + } else if events == picoev.picoev_write { + $if trace_picoev_callback ? { + eprintln('> write event on file descriptor ${fd}') + } + + if params.file_responses[fd].open { + handle_write_file(mut pv, mut params, fd) + } else if params.string_responses[fd].open { + handle_write_string(mut pv, mut params, fd) + } else { + // This should never happen, but it does on pages, that refer to static resources, + // in folders, added with `mount_static_folder_at`. See also + // https://github.com/vlang/edu-platform/blob/0c203f0384cf24f917f9a7c9bb150f8d64aca00f/main.v#L92 + $if debug_ev_callback ? { + eprintln('[veb] error: write event on connection should be closed') + } + pv.close_conn(fd) + } + } else if events == picoev.picoev_read { + $if trace_picoev_callback ? { + eprintln('> read event on file descriptor ${fd}') + } + // println('ev_callback fd=${fd} params.routes=${params.routes.len}') + handle_read[A, X](mut pv, mut params, fd) + } else { + // should never happen + eprintln('[veb] error: invalid picoev event ${events}') + } + } + + fn handle_timeout(mut pv picoev.Picoev, mut params RequestParams, fd int) { + mut conn := &net.TcpConn{ + sock: net.tcp_socket_from_handle_raw(fd) + handle: fd + is_blocking: false + } + + fast_send_resp(mut conn, http_408) or {} + pv.close_conn(fd) + params.request_done(fd) + } + + // handle_write_file reads data from a file and sends that data over the socket. + @[direct_array_access; manualfree] + fn handle_write_file(mut pv picoev.Picoev, mut params RequestParams, fd int) { + mut bytes_to_write := int(params.file_responses[fd].total - params.file_responses[fd].pos) + + $if linux || freebsd { + bytes_written := sendfile(fd, params.file_responses[fd].file.fd, bytes_to_write) + if bytes_written < 0 { + params.file_responses[fd].pos += bytes_to_write + } else { + params.file_responses[fd].pos += bytes_written + } + } $else { + if bytes_to_write > max_write { + bytes_to_write = max_write + } + data := unsafe { malloc(bytes_to_write) } + defer { + unsafe { free(data) } + } + mut conn := &net.TcpConn{ + sock: net.tcp_socket_from_handle_raw(fd) + handle: fd + is_blocking: false + write_timeout: params.timeout_in_seconds * time.second + } + + params.file_responses[fd].file.read_into_ptr(data, bytes_to_write) or { + params.file_responses[fd].done() + pv.close_conn(fd) + return + } + actual_written := send_string_ptr(mut conn, data, bytes_to_write) or { + params.file_responses[fd].done() + pv.close_conn(fd) + return + } + params.file_responses[fd].pos += actual_written + } + + if params.file_responses[fd].pos == params.file_responses[fd].total { + // file is done writing + params.file_responses[fd].done() + handle_complete_request(params.file_responses[fd].should_close_conn, mut pv, + fd) + return + } + } + + // handle_write_string reads data from a string and sends that data over the socket + @[direct_array_access] + fn handle_write_string(mut pv picoev.Picoev, mut params RequestParams, fd int) { + mut bytes_to_write := int(params.string_responses[fd].str.len - params.string_responses[fd].pos) + if bytes_to_write > max_write { + bytes_to_write = max_write + } + mut conn := &net.TcpConn{ + sock: net.tcp_socket_from_handle_raw(fd) + handle: fd + is_blocking: false + } + + // pointer magic to start at the correct position in the buffer + data := unsafe { params.string_responses[fd].str.str + params.string_responses[fd].pos } + actual_written := send_string_ptr(mut conn, data, bytes_to_write) or { + params.string_responses[fd].done() + pv.close_conn(fd) + return + } + params.string_responses[fd].pos += actual_written + if params.string_responses[fd].pos == params.string_responses[fd].str.len { + // done writing + params.string_responses[fd].done() + pv.close_conn(fd) + handle_complete_request(params.string_responses[fd].should_close_conn, mut + pv, fd) + return + } + } + + // handle_read reads data from the connection and if the request is complete + // it calls `handle_route` and closes the connection. + // If the request is not complete, it stores the incomplete request in `params` + // and the connection stays open until it is ready to read again + @[direct_array_access; manualfree] + fn handle_read[A, X](mut pv picoev.Picoev, mut params RequestParams, fd int) { + // println('handle_read() fd=${fd} params.routes=${params.routes}') + mut conn := &net.TcpConn{ + sock: net.tcp_socket_from_handle_raw(fd) + handle: fd + is_blocking: false + } + + // cap the max_read to 8KB + mut reader := io.new_buffered_reader(reader: conn, cap: max_read) + defer { + unsafe { + reader.free() + } + } + // take the previous incomplete request + mut req := params.incomplete_requests[fd] + // check if there is an incomplete request for this file descriptor + if params.idx[fd] == 0 { + $if trace_handle_read ? { + eprintln('>>>>> fd: ${fd} | start of request parsing') + } + // this is the start of a new request, setup the connection, and read the headers: + // set the read and write timeout according to picoev settings when the + // connection is first encountered + conn.set_read_timeout(params.timeout_in_seconds) + conn.set_write_timeout(params.timeout_in_seconds) + // first time that this connection is being read from, so we parse the + // request header first + req = http.parse_request_head(mut reader) or { + // Prevents errors from being thrown when BufferedReader is empty + if err !is io.Eof { + eprintln('[veb] error parsing request: ${err}') + } + // the buffered reader was empty meaning that the client probably + // closed the connection. + pv.close_conn(fd) + params.request_done(fd) + return + } + if reader.total_read >= max_read { + // throw an error when the request header is larger than 8KB + // same limit that apache handles + eprintln('[veb] error parsing request: too large') + fast_send_resp(mut conn, http_413) or {} + pv.close_conn(fd) + params.request_done(fd) + return + } + } + if params.idx[fd] == -1 { + // this is for sure a continuation of a previous request, where the first part contained only headers; + // make sure that we are ready to accept the body and account for every byte in it, by setting the counter to 0: + params.idx[fd] = 0 + $if trace_handle_read ? { + eprintln('>>>>> fd: ${fd} | continuation of request, where the first part contained headers') + } + } + // check if the request has a body + content_length := req.header.get(.content_length) or { '0' } + content_length_i := content_length.int() + if content_length_i > 0 { + mut max_bytes_to_read := max_read - reader.total_read + mut bytes_to_read := content_length_i - params.idx[fd] + // cap the bytes to read to 8KB for the body, including the request headers if any + if bytes_to_read > max_read - reader.total_read { + bytes_to_read = max_read - reader.total_read + } + mut buf_ptr := params.buf + unsafe { + buf_ptr += fd * max_read // pointer magic + } + // convert to []u8 for BufferedReader + mut buf := unsafe { buf_ptr.vbytes(max_bytes_to_read) } + n := reader.read(mut buf) or { + if reader.total_read > 0 { + // The headers were parsed in this cycle, but the body has not been sent yet. No need to error. + params.idx[fd] = -1 // avoid reparsing the headers on the next call. + params.incomplete_requests[fd] = req + $if trace_handle_read ? { + eprintln('>>>>> fd: ${fd} | request headers were parsed, but the body has not been parsed yet | params.idx[fd]: ${params.idx[fd]} | content_length_i: ${content_length_i}') + } + return + } + eprintln('[veb] error reading request body: ${err}') + if err is io.Eof { + // we expect more data to be send, but an Eof error occurred, meaning + // that there is no more data to be read from the socket. + // And at this point we expect that there is data to be read for the body. + fast_send_resp(mut conn, http.new_response( + status: .bad_request + body: 'Mismatch of body length and Content-Length header' + header: http.new_header( + key: .content_type + value: 'text/plain' + ).join(headers_close) + )) or {} + } + pv.close_conn(fd) + params.request_done(fd) + return + } + // there is no more data to be sent, but it is less than the Content-Length header + // so it is a mismatch of body length and content length. + // Or if there is more data received then the Content-Length header specified + if (n == 0 && params.idx[fd] != 0) || params.idx[fd] + n > content_length_i { + fast_send_resp(mut conn, http.new_response( + status: .bad_request + body: 'Mismatch of body length and Content-Length header' + header: http.new_header( + key: .content_type + value: 'text/plain' + ).join(headers_close) + )) or {} + pv.close_conn(fd) + params.request_done(fd) + return + } else if n < bytes_to_read || params.idx[fd] + n < content_length_i { + // request is incomplete wait until the socket becomes ready to read again + // TODO: change this to a memcpy function? + req.data += buf[0..n].bytestr() + params.incomplete_requests[fd] = req + params.idx[fd] += n + $if trace_handle_read ? { + eprintln('>>>>> request is NOT complete, fd: ${fd} | n: ${n} | req.data.len: ${req.data.len} | params.idx[fd]: ${params.idx[fd]}') + } + return + } else { + // request is complete: n = bytes_to_read + req.data += buf[0..n].bytestr() + params.idx[fd] += n + $if trace_handle_read ? { + eprintln('>>>>> request is NOW COMPLETE, fd: ${fd} | n: ${n} | req.data.len: ${req.data.len}') + } + } + } + defer { + params.request_done(fd) + } + if completed_context := handle_request[A, X](mut conn, req, params) { + if completed_context.takeover { + // the connection should be kept open, but removed from the picoev loop. + // This way veb can continue handling other connections and the user can + // keep the connection open indefinitely + pv.delete(fd) + return + } + // TODO: At this point the Context can safely be freed when this function returns. + // The user will have to clone the context if the context object should be kept. + // defer { + // completed_context.free() + // } + match completed_context.return_type { + .normal { + // small optimization: if the response is small write it immediately + // the socket is most likely able to write all the data without blocking. + // See Context.send_file for why we use max_read instead of max_write. + if completed_context.res.body.len < max_read { + fast_send_resp(mut conn, completed_context.res) or {} + handle_complete_request(completed_context.client_wants_to_close, mut + pv, fd) + } else { + params.string_responses[fd].open = true + params.string_responses[fd].str = completed_context.res.body + res := pv.add(fd, picoev.picoev_write, params.timeout_in_seconds, + picoev.raw_callback) + // picoev error + if res == -1 { + // should not happen + params.string_responses[fd].done() + fast_send_resp(mut conn, http_500) or {} + handle_complete_request(completed_context.client_wants_to_close, mut + pv, fd) + return + } + // no errors we can send the HTTP headers + fast_send_resp_header(mut conn, completed_context.res) or {} + } + } + .file { + // save file information + length := completed_context.res.header.get(.content_length) or { + fast_send_resp(mut conn, http_500) or {} + return + } + params.file_responses[fd].total = length.i64() + params.file_responses[fd].file = os.open(completed_context.return_file) or { + // Context checks if the file is valid, so this should never happen + fast_send_resp(mut conn, http_500) or {} + params.file_responses[fd].done() + pv.close_conn(fd) + return + } + params.file_responses[fd].open = true + + res := pv.add(fd, picoev.picoev_write, params.timeout_in_seconds, + picoev.raw_callback) + // picoev error + if res == -1 { + // should not happen + fast_send_resp(mut conn, http_500) or {} + params.file_responses[fd].done() + pv.close_conn(fd) + return + } + // no errors we can send the HTTP headers + fast_send_resp_header(mut conn, completed_context.res) or {} + } + } + } else { + // invalid request headers/data + pv.close_conn(fd) + } + } + + // close the connection when `should_close` is true. + @[inline] + fn handle_complete_request(should_close bool, mut pv picoev.Picoev, fd int) { + if should_close { + pv.close_conn(fd) + } + } + + fn handle_request[A, X](mut conn net.TcpConn, req http.Request, params &RequestParams) ?&Context { + // println('handle_request() params.routes=${params.routes}') + mut global_app := unsafe { &A(params.global_app) } + + // TODO: change this variable to include the total wait time over each network cycle + // maybe store it in Request.user_ptr ? + page_gen_start := time.ticks() + + $if trace_request ? { + dump(req) + } + $if trace_request_url ? { + dump(req.url) + } + + // parse the URL, query and form data + mut url := urllib.parse(req.url) or { + eprintln('[veb] error parsing path "${req.url}": ${err}') + return none + } + query := parse_query_from_url(url) + form, files := parse_form_from_request(req) or { + // Bad request + eprintln('[veb] error parsing form: ${err.msg()}') + conn.write(http_400.bytes()) or {} + return none + } + + // remove the port from the HTTP Host header + host_with_port := req.header.get(.host) or { '' } + host, _ := urllib.split_host_port(host_with_port) + + // Create Context with request data + mut ctx := &Context{ + req: req + page_gen_start: page_gen_start + conn: conn + query: query + form: form + files: files + } + + if connection_header := req.header.get(.connection) { + // A client that does not support persistent connections MUST send the + // "close" connection option in every request message. + if connection_header.to_lower() == 'close' { + ctx.client_wants_to_close = true + } + } + + $if A is StaticApp { + ctx.custom_mime_types = global_app.static_mime_types.clone() + } + + // match controller paths + $if A is ControllerInterface { + if completed_context := handle_controllers[X](params.controllers, ctx, mut + url, host) + { + return completed_context + } + } + + // create a new user context and pass the veb's context + mut user_context := X{} + user_context.Context = ctx + + handle_route[A, X](mut global_app, mut user_context, url, host, params.routes) + // we need to explicitly tell the V compiler to return a reference + return &user_context.Context + } +} -- 2.39.5