v2 / vlib / net / websocket / websocket_server.v
271 lines · 250 sloc · 7.55 KB · db0afe477c97d1c90973ae80f3a4324852cc8fda
Raw
1module websocket
2
3import net
4import net.ssl
5import log
6import time
7import rand
8
9pub struct ServerState {
10mut:
11 ping_interval int = 30 // interval for sending ping to clients (seconds)
12 state State = .closed // current state of connection
13pub mut:
14 clients map[string]&ServerClient // clients connected to this server
15}
16
17// Server represents a websocket server connection
18pub struct Server {
19mut:
20 logger &log.Logger = default_logger
21 ls &net.TcpListener = unsafe { nil } // listener used to get incoming connection to socket
22 accept_client_callbacks []AcceptClientFn // accept client callback functions
23 attached_callbacks []AttachedEventHandler
24 message_callbacks []MessageEventHandler // new message callback functions
25 close_callbacks []CloseEventHandler // close message callback functions
26pub:
27 family net.AddrFamily = .ip
28 port int // port used as listen to incoming connections
29 is_ssl bool // true if secure connection (not supported yet on server)
30pub mut:
31 server_state shared ServerState
32}
33
34// ServerClient represents a connected client
35pub struct ServerClient {
36pub:
37 resource_name string // resource that the client access
38 client_key string // unique key of client
39pub mut:
40 server &Server = unsafe { nil }
41 client &Client = unsafe { nil }
42}
43
44@[params]
45pub struct ServerOpt {
46pub:
47 logger &log.Logger = default_logger
48}
49
50// new_server instance a new websocket server on provided port and route
51pub fn new_server(family net.AddrFamily, port int, route string, opt ServerOpt) &Server {
52 return &Server{
53 ls: unsafe { nil }
54 family: family
55 port: port
56 logger: opt.logger
57 }
58}
59
60// set_ping_interval sets the interval that the server will send ping messages to clients
61pub fn (mut s Server) set_ping_interval(seconds int) {
62 lock s.server_state {
63 s.server_state.ping_interval = seconds
64 }
65}
66
67// get_ping_interval return the interval that the server will send ping messages to clients
68pub fn (mut s Server) get_ping_interval() int {
69 return rlock s.server_state {
70 s.server_state.ping_interval
71 }
72}
73
74// listen start listen and process to incoming connections from websocket clients
75pub fn (mut s Server) listen() ! {
76 s.logger.info('websocket server: start listen on port ${s.port}')
77 s.ls = net.listen_tcp(s.family, ':${s.port}')!
78 s.set_state(.open)
79 spawn s.handle_ping()
80 for {
81 mut c := s.accept_new_client() or { continue }
82 spawn s.serve_client(mut c)
83 }
84 s.logger.info('websocket server: end listen on port ${s.port}')
85}
86
87// Close closes server (not implemented yet)
88fn (mut s Server) close() {
89 // TODO: implement close when moving to net from x.net
90}
91
92// handle_ping sends ping to all clients every set interval
93fn (mut s Server) handle_ping() {
94 mut clients_to_remove := []string{}
95 for s.get_state() == .open {
96 time.sleep(s.get_ping_interval() * time.second)
97 for i, _ in rlock s.server_state {
98 s.server_state.clients
99 } {
100 mut c := rlock s.server_state {
101 s.server_state.clients[i] or { continue }
102 }
103 if c.client.get_state() == .open {
104 c.client.ping() or {
105 s.logger.debug('server-> error sending ping to client')
106 c.client.close(1002, 'Closing connection: ping send error') or {
107 // we want to continue even if error
108 continue
109 }
110 clients_to_remove << c.client.id
111 }
112 if (time.now().unix() - c.client.last_pong_ut) > s.get_ping_interval() * 2 {
113 clients_to_remove << c.client.id
114 c.client.close(1000, 'no pong received') or { continue }
115 }
116 }
117 }
118 // TODO: replace for with s.clients.delete_all(clients_to_remove) if (https://github.com/vlang/v/pull/6020) merges
119 for client in clients_to_remove {
120 lock s.server_state {
121 s.server_state.clients.delete(client)
122 }
123 }
124 clients_to_remove.clear()
125 }
126}
127
128// serve_client accepts incoming connection and sets up the callbacks
129fn (mut s Server) serve_client(mut c Client) ! {
130 c.logger.debug('server-> Start serve client (${c.id})')
131 defer {
132 c.logger.debug('server-> End serve client (${c.id})')
133 }
134 mut handshake_response, mut server_client := s.handle_server_handshake(mut c)!
135 s.attach_client(mut server_client, handshake_response)!
136 c.listen() or {
137 s.logger.error(err.msg())
138 return err
139 }
140}
141
142// handle_handshake use an existing connection to respond to the handshake for a given key
143pub fn (mut s Server) handle_handshake(mut conn net.TcpConn, key string) !&ServerClient {
144 mut logger := &log.Log{}
145 logger.set_level(.debug)
146 mut c := &Client{
147 is_server: true
148 conn: conn
149 is_ssl: false
150 logger: logger
151 client_state: ClientState{
152 state: .open
153 }
154 last_pong_ut: time.now().unix()
155 id: rand.uuid_v4()
156 }
157 mut server_client := &ServerClient{
158 resource_name: 'GET'
159 client_key: key
160 client: unsafe { c }
161 server: unsafe { &s }
162 }
163 digest := create_key_challenge_response(key)!
164 handshake_response := 'HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: ${digest}\r\n\r\n'
165 s.attach_client(mut server_client, handshake_response)!
166 spawn s.handle_ping()
167 c.listen() or {
168 s.logger.error(err.msg())
169 return err
170 }
171 return server_client
172}
173
174fn (mut s Server) attach_client(mut server_client ServerClient, handshake_response string) ! {
175 accept := s.send_connect_event(mut server_client)!
176 if !accept {
177 s.logger.debug('server-> client not accepted')
178 server_client.client.shutdown_socket()!
179 return
180 }
181 // the client is accepted
182 server_client.client.socket_write(handshake_response.bytes())!
183 lock s.server_state {
184 s.server_state.clients[server_client.client.id] = unsafe { server_client }
185 }
186 s.setup_callbacks(mut server_client)
187 s.send_attached_event(mut server_client) or {
188 lock s.server_state {
189 s.server_state.clients.delete(server_client.client.id)
190 }
191 server_client.client.shutdown_socket() or {}
192 return err
193 }
194}
195
196// setup_callbacks initialize all callback functions
197fn (mut s Server) setup_callbacks(mut sc ServerClient) {
198 if s.message_callbacks.len > 0 {
199 for cb in s.message_callbacks {
200 if cb.is_ref {
201 sc.client.on_message_ref(cb.handler2, cb.ref)
202 } else {
203 sc.client.on_message(cb.handler)
204 }
205 }
206 }
207 if s.close_callbacks.len > 0 {
208 for cb in s.close_callbacks {
209 if cb.is_ref {
210 sc.client.on_close_ref(cb.handler2, cb.ref)
211 } else {
212 sc.client.on_close(cb.handler)
213 }
214 }
215 }
216 // set standard close so we can remove client if closed
217 sc.client.on_close_ref(delete_client_cb, sc)
218}
219
220fn delete_client_cb(mut c Client, _code int, _reason string, mut sc ServerClient) ! {
221 c.logger.debug('server-> Delete client')
222 lock sc.server.server_state {
223 sc.server.server_state.clients.delete(sc.client.id)
224 }
225}
226
227// accept_new_client creates a new client instance for client that connects to the socket
228fn (mut s Server) accept_new_client() !&Client {
229 mut new_conn := s.ls.accept()!
230 c := &Client{
231 is_server: true
232 conn: new_conn
233 ssl_conn: ssl.new_ssl_conn()!
234 logger: s.logger
235 client_state: ClientState{
236 state: .open
237 }
238 last_pong_ut: time.now().unix()
239 id: rand.uuid_v4()
240 }
241 return c
242}
243
244// set_state sets current state in a thread safe way
245pub fn (mut s Server) set_state(state State) {
246 lock s.server_state {
247 s.server_state.state = state
248 }
249}
250
251// get_state return current state in a thread safe way
252pub fn (s &Server) get_state() State {
253 return rlock s.server_state {
254 s.server_state.state
255 }
256}
257
258// free manages manual free of memory for Server instance
259pub fn (mut s Server) free() {
260 lock s.server_state {
261 unsafe {
262 s.server_state.clients.free()
263 }
264 }
265
266 unsafe {
267 s.accept_client_callbacks.free()
268 s.message_callbacks.free()
269 s.close_callbacks.free()
270 }
271}
272