From 46480db4530e0b7dc953c4c797aabfc61d1c073f Mon Sep 17 00:00:00 2001 From: Alexander Medvednikov Date: Wed, 25 Mar 2026 16:42:16 +0300 Subject: [PATCH] net.websocket: add proxy support to websocket client (fixes #21273) --- vlib/net/http/http_proxy.v | 70 ++++++--- vlib/net/websocket/io.v | 7 +- vlib/net/websocket/websocket_client.v | 3 + vlib/net/websocket/websocket_proxy_test.v | 181 ++++++++++++++++++++++ 4 files changed, 240 insertions(+), 21 deletions(-) create mode 100644 vlib/net/websocket/websocket_proxy_test.v diff --git a/vlib/net/http/http_proxy.v b/vlib/net/http/http_proxy.v index 5530eda61..de02f39ba 100644 --- a/vlib/net/http/http_proxy.v +++ b/vlib/net/http/http_proxy.v @@ -18,6 +18,13 @@ mut: url string } +// dial_tcp_via_proxy connects to `host` through the proxy specified by `proxy_url`. +// `host` should be in `host:port` form. +pub fn dial_tcp_via_proxy(proxy_url string, host string) !&net.TcpConn { + proxy := new_http_proxy(proxy_url)! + return proxy.dial(host)! +} + // new_http_proxy creates a new HttpProxy instance, from the given http proxy url in `raw_url` pub fn new_http_proxy(raw_url string) !&HttpProxy { mut url := urllib.parse(raw_url) or { return error('malformed proxy url') } @@ -91,6 +98,47 @@ fn (pr &HttpProxy) build_proxy_headers(host string) string { return 'CONNECT ${host} ${version}\r\nHost: ${address}\r\n' + uheaders.join('') + '\r\n' } +fn read_proxy_connect_response(mut tcp net.TcpConn) !string { + mut total_bytes_read := 0 + mut msg := [4096]u8{} + mut buffer := [1]u8{} + for total_bytes_read < msg.len { + bytes_read := tcp.read_ptr(&buffer[0], 1)! + if bytes_read == 0 { + return error('proxy closed the connection while establishing a tunnel') + } + msg[total_bytes_read] = buffer[0] + total_bytes_read++ + if total_bytes_read > 3 && msg[total_bytes_read - 1] == `\n` + && msg[total_bytes_read - 2] == `\r` && msg[total_bytes_read - 3] == `\n` + && msg[total_bytes_read - 4] == `\r` { + return msg[..total_bytes_read].bytestr() + } + } + return error('proxy response headers exceeded 4096 bytes') +} + +fn validate_proxy_connect_response(response string) ! { + status_line := response.all_before('\r\n') + if !status_line.starts_with('HTTP/1.1 200') && !status_line.starts_with('HTTP/1.0 200') { + return error('proxy tunnel error: ${status_line}') + } +} + +fn (pr &HttpProxy) connect_tcp(host string) !&net.TcpConn { + if pr.scheme in ['http', 'https'] { + mut tcp := net.dial_tcp(pr.host)! + tcp.write(pr.build_proxy_headers(host).bytes())! + response := read_proxy_connect_response(mut tcp)! + validate_proxy_connect_response(response)! + return tcp + } else if pr.scheme == 'socks5' { + return socks.socks5_dial(pr.host, host, pr.username, pr.password)! + } else { + return error('http_proxy connect_tcp: invalid proxy scheme') + } +} + fn (pr &HttpProxy) http_do(host urllib.URL, _method Method, path string, req &Request) !Response { host_name, port := net.split_address(host.hostname())! @@ -135,30 +183,12 @@ fn (pr &HttpProxy) http_do(host urllib.URL, _method Method, path string, req &Re } fn (pr &HttpProxy) dial(host string) !&net.TcpConn { - if pr.scheme in ['http', 'https'] { - mut tcp := net.dial_tcp(pr.host)! - tcp.write(pr.build_proxy_headers(host).bytes())! - mut bf := []u8{len: 4096} - - tcp.read(mut bf)! - return tcp - } else if pr.scheme == 'socks5' { - return socks.socks5_dial(pr.host, host, pr.username, pr.password)! - } else { - return error('http_proxy dial: invalid proxy scheme') - } + return pr.connect_tcp(host)! } fn (pr &HttpProxy) ssl_dial(host string) !&ssl.SSLConn { if pr.scheme in ['http', 'https'] { - mut tcp := net.dial_tcp(pr.host)! - tcp.write(pr.build_proxy_headers(host).bytes())! - mut bf := []u8{len: 4096} - tcp.read(mut bf)! - if !bf.bytestr().contains('HTTP/1.1 200') { - return error('ssl dial error: ${bf.bytestr()}') - } - + mut tcp := pr.connect_tcp(host)! mut ssl_conn := ssl.new_ssl_conn( verify: '' cert: '' diff --git a/vlib/net/websocket/io.v b/vlib/net/websocket/io.v index a9e35b37d..21ecd0f0a 100644 --- a/vlib/net/websocket/io.v +++ b/vlib/net/websocket/io.v @@ -1,6 +1,7 @@ module websocket import net +import net.http // socket_read reads from socket into the provided buffer fn (mut ws Client) socket_read(mut buffer []u8) !int { @@ -73,7 +74,11 @@ fn (mut ws Client) shutdown_socket() ! { // dial_socket connects tcp socket and initializes default configurations fn (mut ws Client) dial_socket() !&net.TcpConn { tcp_address := '${ws.uri.hostname}:${ws.uri.port}' - mut t := net.dial_tcp(tcp_address)! + mut t := if ws.proxy_url == '' { + net.dial_tcp(tcp_address)! + } else { + http.dial_tcp_via_proxy(ws.proxy_url, tcp_address)! + } optval := int(1) t.sock.set_option_int(.keep_alive, optval)! t.set_read_timeout(ws.read_timeout) diff --git a/vlib/net/websocket/websocket_client.v b/vlib/net/websocket/websocket_client.v index ac4d5652a..11fe6f3a6 100644 --- a/vlib/net/websocket/websocket_client.v +++ b/vlib/net/websocket/websocket_client.v @@ -24,6 +24,7 @@ pub struct Client { is_server bool mut: ssl_conn &ssl.SSLConn = unsafe { nil } // secure connection used when wss is used + proxy_url string flags []Flag // flags used in handshake fragments []Fragment // current fragments message_callbacks []MessageEventHandler // all callbacks on_message @@ -86,6 +87,7 @@ pub: read_timeout i64 = 30 * time.second write_timeout i64 = 30 * time.second logger &log.Logger = default_logger + proxy_url string // optional proxy URL used to open the websocket TCP tunnel } // new_client instance a new websocket client @@ -98,6 +100,7 @@ pub fn new_client(address string, opt ClientOpt) !&Client { is_ssl: address.starts_with('wss') logger: opt.logger uri: uri + proxy_url: opt.proxy_url client_state: ClientState{ state: .closed } diff --git a/vlib/net/websocket/websocket_proxy_test.v b/vlib/net/websocket/websocket_proxy_test.v new file mode 100644 index 000000000..cb0df165c --- /dev/null +++ b/vlib/net/websocket/websocket_proxy_test.v @@ -0,0 +1,181 @@ +// vtest build: !windows +import encoding.base64 +import os +import net +import net.websocket +import time + +const proxy_github_job = os.getenv('GITHUB_JOB') + +const proxy_should_skip = proxy_github_job != '' && proxy_github_job != 'websocket_tests' + +const proxy_ws_target_port = 30105 +const proxy_ws_port = 30106 + +@[heap] +struct ProxyWebsocketTestResults { +pub mut: + nr_messages int + nr_pong_received int + error_message string +} + +fn start_proxy_ws_server(listen_port int) ! { + mut s := websocket.new_server(.ip, listen_port, '') + s.set_ping_interval(1) + s.on_connect(fn (mut s websocket.ServerClient) !bool { + return s.resource_name == '/' + })! + s.on_message(fn (mut ws websocket.Client, msg &websocket.Message) ! { + match msg.opcode { + .pong { ws.write_string('pong')! } + else { ws.write(msg.payload, msg.opcode)! } + } + }) + start_proxy_ws_server_in_thread(mut s) +} + +fn start_proxy_ws_server_in_thread(mut ws websocket.Server) { + spawn fn [mut ws] () { + ws.listen() or { panic('websocket server could not listen, err: ${err}') } + }() + for ws.get_state() != .open { + time.sleep(10 * time.millisecond) + } +} + +fn start_proxy_server(listen_port int, target string, requests chan string) { + ready := chan bool{cap: 1} + spawn fn [listen_port, target, requests, ready] () { + mut listener := net.listen_tcp(.ip, ':${listen_port}') or { panic(err) } + ready <- true + mut client := listener.accept() or { + listener.close() or {} + panic(err) + } + listener.close() or {} + request := read_proxy_request(mut client) or { + requests <- 'ERROR: ${err}' + client.close() or {} + return + } + requests <- request + mut upstream := net.dial_tcp(target) or { + client.write_string('HTTP/1.1 502 Bad Gateway\r\n\r\n') or {} + client.close() or {} + return + } + client.write_string('HTTP/1.1 200 Connection Established\r\n\r\n') or { + client.close() or {} + upstream.close() or {} + return + } + tunnel_proxy_connection(mut client, mut upstream) + }() + _ := <-ready +} + +fn read_proxy_request(mut conn net.TcpConn) !string { + mut total_bytes_read := 0 + mut msg := [4096]u8{} + mut buffer := [1]u8{} + for total_bytes_read < msg.len { + bytes_read := conn.read_ptr(&buffer[0], 1)! + if bytes_read == 0 { + return error('unexpected EOF while reading proxy request') + } + msg[total_bytes_read] = buffer[0] + total_bytes_read++ + if total_bytes_read > 3 && msg[total_bytes_read - 1] == `\n` + && msg[total_bytes_read - 2] == `\r` && msg[total_bytes_read - 3] == `\n` + && msg[total_bytes_read - 4] == `\r` { + return msg[..total_bytes_read].bytestr() + } + } + return error('proxy request headers exceeded 4096 bytes') +} + +fn tunnel_proxy_connection(mut client net.TcpConn, mut upstream net.TcpConn) { + defer { + client.close() or {} + upstream.close() or {} + } + client.set_read_timeout(50 * time.millisecond) + upstream.set_read_timeout(50 * time.millisecond) + deadline := time.now().add(4 * time.second) + for time.now() < deadline { + relay_proxy_connection(mut client, mut upstream) + relay_proxy_connection(mut upstream, mut client) + } +} + +fn relay_proxy_connection(mut src net.TcpConn, mut dst net.TcpConn) { + mut buf := []u8{len: 1024} + bytes_read := src.read(mut buf) or { + if err.code() == net.err_timed_out_code { + return + } + return + } + if bytes_read > 0 { + dst.write(buf[..bytes_read]) or {} + } +} + +fn proxy_open_cb(mut client websocket.Client) ! { + client.pong()! +} + +fn proxy_error_cb(mut client websocket.Client, err string, mut res ProxyWebsocketTestResults) ! { + res.error_message = err +} + +fn proxy_message_cb(mut client websocket.Client, msg &websocket.Message, mut res ProxyWebsocketTestResults) ! { + if msg.opcode != .text_frame { + return + } + match msg.payload.bytestr() { + 'pong' { + res.nr_pong_received++ + } + 'a' { + res.nr_messages++ + } + else { + res.error_message = 'unexpected payload: ${msg.payload.bytestr()}' + } + } +} + +fn test_ws_connection_through_proxy() ! { + if proxy_should_skip { + return + } + start_proxy_ws_server(proxy_ws_target_port)! + requests := chan string{cap: 1} + start_proxy_server(proxy_ws_port, '127.0.0.1:${proxy_ws_target_port}', requests) + mut results := ProxyWebsocketTestResults{} + mut client := websocket.new_client('ws://127.0.0.1:${proxy_ws_target_port}', websocket.ClientOpt{ + proxy_url: 'http://user:pass@127.0.0.1:${proxy_ws_port}' + })! + client.on_open(proxy_open_cb) + client.on_error_ref(proxy_error_cb, results) + client.on_message_ref(proxy_message_cb, results) + client.connect()! + spawn client.listen() + for msg in ['a', 'a'] { + client.write(msg.bytes(), .text_frame)! + time.sleep(100 * time.millisecond) + } + time.sleep(1500 * time.millisecond) + request := <-requests + auth_token := base64.encode('user:pass'.bytes()) + assert request.starts_with('CONNECT 127.0.0.1:${proxy_ws_target_port} HTTP/1.1\r\n') + assert request.contains('Proxy-Connection: Keep-Alive\r\n') + assert request.contains('Proxy-Authorization: Basic ${auth_token}\r\n') + client.close(1000, 'done') or {} + time.sleep(100 * time.millisecond) + assert results.error_message == '' + assert results.nr_pong_received >= 2 + assert results.nr_messages == 2 +} -- 2.39.5