v2 / vlib / net / websocket / websocket_proxy_test.v
181 lines · 167 sloc · 4.97 KB · 46480db4530e0b7dc953c4c797aabfc61d1c073f
Raw
1// vtest build: !windows
2import encoding.base64
3import os
4import net
5import net.websocket
6import time
7
8const proxy_github_job = os.getenv('GITHUB_JOB')
9
10const proxy_should_skip = proxy_github_job != '' && proxy_github_job != 'websocket_tests'
11
12const proxy_ws_target_port = 30105
13const proxy_ws_port = 30106
14
15@[heap]
16struct ProxyWebsocketTestResults {
17pub mut:
18 nr_messages int
19 nr_pong_received int
20 error_message string
21}
22
23fn start_proxy_ws_server(listen_port int) ! {
24 mut s := websocket.new_server(.ip, listen_port, '')
25 s.set_ping_interval(1)
26 s.on_connect(fn (mut s websocket.ServerClient) !bool {
27 return s.resource_name == '/'
28 })!
29 s.on_message(fn (mut ws websocket.Client, msg &websocket.Message) ! {
30 match msg.opcode {
31 .pong { ws.write_string('pong')! }
32 else { ws.write(msg.payload, msg.opcode)! }
33 }
34 })
35 start_proxy_ws_server_in_thread(mut s)
36}
37
38fn start_proxy_ws_server_in_thread(mut ws websocket.Server) {
39 spawn fn [mut ws] () {
40 ws.listen() or { panic('websocket server could not listen, err: ${err}') }
41 }()
42 for ws.get_state() != .open {
43 time.sleep(10 * time.millisecond)
44 }
45}
46
47fn start_proxy_server(listen_port int, target string, requests chan string) {
48 ready := chan bool{cap: 1}
49 spawn fn [listen_port, target, requests, ready] () {
50 mut listener := net.listen_tcp(.ip, ':${listen_port}') or { panic(err) }
51 ready <- true
52 mut client := listener.accept() or {
53 listener.close() or {}
54 panic(err)
55 }
56 listener.close() or {}
57 request := read_proxy_request(mut client) or {
58 requests <- 'ERROR: ${err}'
59 client.close() or {}
60 return
61 }
62 requests <- request
63 mut upstream := net.dial_tcp(target) or {
64 client.write_string('HTTP/1.1 502 Bad Gateway\r\n\r\n') or {}
65 client.close() or {}
66 return
67 }
68 client.write_string('HTTP/1.1 200 Connection Established\r\n\r\n') or {
69 client.close() or {}
70 upstream.close() or {}
71 return
72 }
73 tunnel_proxy_connection(mut client, mut upstream)
74 }()
75 _ := <-ready
76}
77
78fn read_proxy_request(mut conn net.TcpConn) !string {
79 mut total_bytes_read := 0
80 mut msg := [4096]u8{}
81 mut buffer := [1]u8{}
82 for total_bytes_read < msg.len {
83 bytes_read := conn.read_ptr(&buffer[0], 1)!
84 if bytes_read == 0 {
85 return error('unexpected EOF while reading proxy request')
86 }
87 msg[total_bytes_read] = buffer[0]
88 total_bytes_read++
89 if total_bytes_read > 3 && msg[total_bytes_read - 1] == `\n`
90 && msg[total_bytes_read - 2] == `\r` && msg[total_bytes_read - 3] == `\n`
91 && msg[total_bytes_read - 4] == `\r` {
92 return msg[..total_bytes_read].bytestr()
93 }
94 }
95 return error('proxy request headers exceeded 4096 bytes')
96}
97
98fn tunnel_proxy_connection(mut client net.TcpConn, mut upstream net.TcpConn) {
99 defer {
100 client.close() or {}
101 upstream.close() or {}
102 }
103 client.set_read_timeout(50 * time.millisecond)
104 upstream.set_read_timeout(50 * time.millisecond)
105 deadline := time.now().add(4 * time.second)
106 for time.now() < deadline {
107 relay_proxy_connection(mut client, mut upstream)
108 relay_proxy_connection(mut upstream, mut client)
109 }
110}
111
112fn relay_proxy_connection(mut src net.TcpConn, mut dst net.TcpConn) {
113 mut buf := []u8{len: 1024}
114 bytes_read := src.read(mut buf) or {
115 if err.code() == net.err_timed_out_code {
116 return
117 }
118 return
119 }
120 if bytes_read > 0 {
121 dst.write(buf[..bytes_read]) or {}
122 }
123}
124
125fn proxy_open_cb(mut client websocket.Client) ! {
126 client.pong()!
127}
128
129fn proxy_error_cb(mut client websocket.Client, err string, mut res ProxyWebsocketTestResults) ! {
130 res.error_message = err
131}
132
133fn proxy_message_cb(mut client websocket.Client, msg &websocket.Message, mut res ProxyWebsocketTestResults) ! {
134 if msg.opcode != .text_frame {
135 return
136 }
137 match msg.payload.bytestr() {
138 'pong' {
139 res.nr_pong_received++
140 }
141 'a' {
142 res.nr_messages++
143 }
144 else {
145 res.error_message = 'unexpected payload: ${msg.payload.bytestr()}'
146 }
147 }
148}
149
150fn test_ws_connection_through_proxy() ! {
151 if proxy_should_skip {
152 return
153 }
154 start_proxy_ws_server(proxy_ws_target_port)!
155 requests := chan string{cap: 1}
156 start_proxy_server(proxy_ws_port, '127.0.0.1:${proxy_ws_target_port}', requests)
157 mut results := ProxyWebsocketTestResults{}
158 mut client := websocket.new_client('ws://127.0.0.1:${proxy_ws_target_port}', websocket.ClientOpt{
159 proxy_url: 'http://user:[email protected]:${proxy_ws_port}'
160 })!
161 client.on_open(proxy_open_cb)
162 client.on_error_ref(proxy_error_cb, results)
163 client.on_message_ref(proxy_message_cb, results)
164 client.connect()!
165 spawn client.listen()
166 for msg in ['a', 'a'] {
167 client.write(msg.bytes(), .text_frame)!
168 time.sleep(100 * time.millisecond)
169 }
170 time.sleep(1500 * time.millisecond)
171 request := <-requests
172 auth_token := base64.encode('user:pass'.bytes())
173 assert request.starts_with('CONNECT 127.0.0.1:${proxy_ws_target_port} HTTP/1.1\r\n')
174 assert request.contains('Proxy-Connection: Keep-Alive\r\n')
175 assert request.contains('Proxy-Authorization: Basic ${auth_token}\r\n')
176 client.close(1000, 'done') or {}
177 time.sleep(100 * time.millisecond)
178 assert results.error_message == ''
179 assert results.nr_pong_received >= 2
180 assert results.nr_messages == 2
181}
182