| 1 | // vtest build: !windows |
| 2 | import encoding.base64 |
| 3 | import os |
| 4 | import net |
| 5 | import net.websocket |
| 6 | import time |
| 7 | |
// proxy_github_job holds the value of the GITHUB_JOB environment variable.
const proxy_github_job = os.getenv('GITHUB_JOB')

// proxy_should_skip is true when GITHUB_JOB is non-empty and names a job other
// than `websocket_tests`; the test below returns early in that case.
const proxy_should_skip = !(proxy_github_job == '' || proxy_github_job == 'websocket_tests')

// proxy_ws_target_port is the port of the websocket server behind the proxy.
const proxy_ws_target_port = 30105

// proxy_ws_port is the port on which the test HTTP CONNECT proxy listens.
const proxy_ws_port = 30106
| 14 | |
// ProxyWebsocketTestResults accumulates what the client callbacks observe
// while the proxy test runs. It is passed by reference to the callbacks.
@[heap]
struct ProxyWebsocketTestResults {
pub mut:
	// nr_messages counts received text frames whose payload is 'a'.
	nr_messages int
	// nr_pong_received counts received text frames whose payload is 'pong'.
	nr_pong_received int
	// error_message holds the last reported client error or unexpected payload.
	error_message string
}
| 22 | |
// start_proxy_ws_server builds the websocket server that sits behind the proxy
// and hands it to start_proxy_ws_server_in_thread, which runs it in the background.
// The server only accepts clients requesting the resource '/'. It answers a
// pong frame with the text 'pong' and echoes every other frame back unchanged.
fn start_proxy_ws_server(listen_port int) ! {
	mut server := websocket.new_server(.ip, listen_port, '')
	// A 1-unit ping interval, as in the original test setup.
	server.set_ping_interval(1)
	server.on_connect(fn (mut sc websocket.ServerClient) !bool {
		if sc.resource_name != '/' {
			return false
		}
		return true
	})!
	server.on_message(fn (mut ws websocket.Client, msg &websocket.Message) ! {
		if msg.opcode == .pong {
			ws.write_string('pong')!
			return
		}
		ws.write(msg.payload, msg.opcode)!
	})
	start_proxy_ws_server_in_thread(mut server)
}
| 37 | |
// start_proxy_ws_server_in_thread runs `ws.listen()` in a spawned thread and
// returns only once the server reports the `.open` state.
// A listen failure panics inside the spawned thread.
fn start_proxy_ws_server_in_thread(mut ws websocket.Server) {
	spawn fn [mut ws] () {
		ws.listen() or { panic('websocket server could not listen, err: ${err}') }
	}()
	// Poll the server state every 10ms until it is open.
	for {
		if ws.get_state() == .open {
			break
		}
		time.sleep(10 * time.millisecond)
	}
}
| 46 | |
// start_proxy_server spawns a one-shot HTTP CONNECT proxy on `listen_port` that
// forwards to `target`, and returns once the proxy's listener has been created.
// The raw request head read from the client (or 'ERROR: ...' when reading it
// fails) is sent on `requests`.
fn start_proxy_server(listen_port int, target string, requests chan string) {
	listening := chan bool{cap: 1}
	spawn serve_single_proxy_client(listen_port, target, requests, listening)
	// Block until the spawned thread signals that the listener exists.
	_ := <-listening
}

// serve_single_proxy_client accepts exactly one client, records its request head
// on `requests`, connects to `target` and then tunnels bytes between the two.
fn serve_single_proxy_client(listen_port int, target string, requests chan string, listening chan bool) {
	mut tcp_listener := net.listen_tcp(.ip, ':${listen_port}') or { panic(err) }
	listening <- true
	mut downstream := tcp_listener.accept() or {
		tcp_listener.close() or {}
		panic(err)
	}
	// Only a single connection is served, so stop listening right away.
	tcp_listener.close() or {}
	head := read_proxy_request(mut downstream) or {
		requests <- 'ERROR: ${err}'
		downstream.close() or {}
		return
	}
	requests <- head
	mut upstream := net.dial_tcp(target) or {
		downstream.write_string('HTTP/1.1 502 Bad Gateway\r\n\r\n') or {}
		downstream.close() or {}
		return
	}
	downstream.write_string('HTTP/1.1 200 Connection Established\r\n\r\n') or {
		downstream.close() or {}
		upstream.close() or {}
		return
	}
	tunnel_proxy_connection(mut downstream, mut upstream)
}
| 77 | |
// read_proxy_request reads from `conn` one byte at a time until the blank line
// (`\r\n\r\n`) that ends an HTTP request head, and returns everything read so
// far, terminator included.
// It fails when the connection yields 0 bytes before the terminator, when a
// read fails, or when 4096 bytes were read without finding the terminator.
fn read_proxy_request(mut conn net.TcpConn) !string {
	mut head := [4096]u8{}
	mut one := [1]u8{}
	mut n := 0
	for n < head.len {
		got := conn.read_ptr(&one[0], 1)!
		if got == 0 {
			return error('unexpected EOF while reading proxy request')
		}
		head[n] = one[0]
		n++
		// The terminator is four bytes long, so shorter input cannot contain it.
		if n < 4 {
			continue
		}
		if head[n - 4] == `\r` && head[n - 3] == `\n` && head[n - 2] == `\r`
			&& head[n - 1] == `\n` {
			return head[..n].bytestr()
		}
	}
	return error('proxy request headers exceeded 4096 bytes')
}
| 97 | |
// tunnel_proxy_connection shuttles data between `client` and `upstream` in both
// directions for up to 4 seconds, then closes both connections.
fn tunnel_proxy_connection(mut client net.TcpConn, mut upstream net.TcpConn) {
	defer {
		client.close() or {}
		upstream.close() or {}
	}
	// Short read timeouts let the loop alternate between the two directions.
	poll_timeout := 50 * time.millisecond
	client.set_read_timeout(poll_timeout)
	upstream.set_read_timeout(poll_timeout)
	stop_at := time.now().add(4 * time.second)
	for time.now() < stop_at {
		relay_proxy_connection(mut client, mut upstream)
		relay_proxy_connection(mut upstream, mut client)
	}
}
| 111 | |
// relay_proxy_connection performs one best-effort copy step from `src` to `dst`:
// it reads up to 1024 bytes from `src` and, if any arrived, writes them to `dst`.
// Read errors (including read timeouts) and write errors are ignored; the
// caller simply tries again on its next pass.
fn relay_proxy_connection(mut src net.TcpConn, mut dst net.TcpConn) {
	mut buf := []u8{len: 1024}
	// The original distinguished timeouts from other errors but returned in both
	// cases, so the check was dead code; every read error ends this pass.
	bytes_read := src.read(mut buf) or { return }
	if bytes_read > 0 {
		dst.write(buf[..bytes_read]) or {}
	}
}
| 124 | |
// proxy_open_cb is the client's on-open callback: it sends a pong frame
// and propagates any error from doing so.
fn proxy_open_cb(mut client websocket.Client) ! {
	client.pong() or { return err }
}
| 128 | |
// proxy_error_cb is the client's on-error callback: it stores the reported
// error text in `res.error_message`, overwriting any previous value.
fn proxy_error_cb(mut client websocket.Client, err string, mut res ProxyWebsocketTestResults) ! {
	res.error_message = err
}
| 132 | |
// proxy_message_cb is the client's on-message callback. Non-text frames are
// ignored. For text frames, 'pong' bumps `res.nr_pong_received`, 'a' bumps
// `res.nr_messages`, and any other payload is recorded in `res.error_message`.
fn proxy_message_cb(mut client websocket.Client, msg &websocket.Message, mut res ProxyWebsocketTestResults) ! {
	if msg.opcode == .text_frame {
		text := msg.payload.bytestr()
		if text == 'pong' {
			res.nr_pong_received++
		} else if text == 'a' {
			res.nr_messages++
		} else {
			res.error_message = 'unexpected payload: ${text}'
		}
	}
}
| 149 | |
// test_ws_connection_through_proxy checks that a websocket client configured
// with an authenticated HTTP proxy URL connects through that proxy:
// - the proxy must receive a CONNECT request for the target, carrying
//   `Proxy-Connection: Keep-Alive` and Basic credentials for `user:pass`;
// - messages sent through the tunnel must be echoed back by the target server.
fn test_ws_connection_through_proxy() ! {
	if proxy_should_skip {
		return
	}
	start_proxy_ws_server(proxy_ws_target_port)!
	requests := chan string{cap: 1}
	start_proxy_server(proxy_ws_port, '127.0.0.1:${proxy_ws_target_port}', requests)
	mut results := ProxyWebsocketTestResults{}
	// The credentials in the proxy URL must match the `user:pass` token asserted
	// below. The original line held email-obfuscation residue instead of
	// `user:pass@127.0.0.1`.
	mut client := websocket.new_client('ws://127.0.0.1:${proxy_ws_target_port}', websocket.ClientOpt{
		proxy_url: 'http://user:pass@127.0.0.1:${proxy_ws_port}'
	})!
	client.on_open(proxy_open_cb)
	client.on_error_ref(proxy_error_cb, results)
	client.on_message_ref(proxy_message_cb, results)
	client.connect()!
	spawn client.listen()
	for msg in ['a', 'a'] {
		client.write(msg.bytes(), .text_frame)!
		time.sleep(100 * time.millisecond)
	}
	// Leave time for echoes and 'pong' replies to arrive before checking the counters.
	time.sleep(1500 * time.millisecond)
	request := <-requests
	auth_token := base64.encode('user:pass'.bytes())
	assert request.starts_with('CONNECT 127.0.0.1:${proxy_ws_target_port} HTTP/1.1\r\n')
	assert request.contains('Proxy-Connection: Keep-Alive\r\n')
	assert request.contains('Proxy-Authorization: Basic ${auth_token}\r\n')
	client.close(1000, 'done') or {}
	time.sleep(100 * time.millisecond)
	assert results.error_message == ''
	assert results.nr_pong_received >= 2
	assert results.nr_messages == 2
}
| 182 | |