| 1 | module websocket |
| 2 | |
| 3 | import net |
| 4 | import net.ssl |
| 5 | import log |
| 6 | import time |
| 7 | import rand |
| 8 | |
// ServerState groups the mutable server data that `Server` keeps in its
// `shared` server_state field and accesses through lock/rlock blocks.
pub struct ServerState {
mut:
	// interval for sending ping to clients (seconds)
	ping_interval int = 30
	// current state of connection
	state State = .closed
pub mut:
	// clients connected to this server, stored under the client's id
	clients map[string]&ServerClient
}
| 16 | |
// Server is a websocket server: it holds the TCP listener, the registered
// event callbacks and the shared state (clients, ping interval, status).
pub struct Server {
mut:
	logger &log.Logger = default_logger
	// listener used to get incoming connection to socket
	ls &net.TcpListener = unsafe { nil }
	// accept client callback functions
	accept_client_callbacks []AcceptClientFn
	attached_callbacks []AttachedEventHandler
	// new message callback functions
	message_callbacks []MessageEventHandler
	// close message callback functions
	close_callbacks []CloseEventHandler
pub:
	family net.AddrFamily = .ip
	// port used as listen to incoming connections
	port int
	// true if secure connection (not supported yet on server)
	is_ssl bool
pub mut:
	// state guarded by lock/rlock: clients, ping interval and status
	server_state shared ServerState
}
| 33 | |
// ServerClient ties one connected websocket client to the server it is attached to
pub struct ServerClient {
pub:
	// resource that the client access
	resource_name string
	// unique key of client
	client_key string
pub mut:
	// owning server; nil until assigned
	server &Server = unsafe { nil }
	// underlying websocket client; nil until assigned
	client &Client = unsafe { nil }
}
| 43 | |
// ServerOpt carries the optional settings accepted by `new_server`
@[params]
pub struct ServerOpt {
pub:
	// logger handed to the created server; falls back to default_logger
	logger &log.Logger = default_logger
}
| 49 | |
// new_server creates a websocket server for the given address family and port.
// The server uses `opt.logger` for logging. The `route` argument is accepted
// but is not used by this constructor.
pub fn new_server(family net.AddrFamily, port int, route string, opt ServerOpt) &Server {
	server := &Server{
		ls: unsafe { nil }
		family: family
		port: port
		logger: opt.logger
	}
	return server
}
| 59 | |
// set_ping_interval sets the interval, in seconds, at which the server pings its clients.
// Non-positive values are rejected (a warning is logged and the current interval is kept):
// handle_ping sleeps for this interval and uses twice the interval as the pong timeout,
// so a value <= 0 would make it loop without sleeping and close clients right away.
pub fn (mut s Server) set_ping_interval(seconds int) {
	if seconds <= 0 {
		s.logger.warn('websocket server: ignoring invalid ping interval ${seconds}')
		return
	}
	lock s.server_state {
		s.server_state.ping_interval = seconds
	}
}
| 66 | |
// get_ping_interval returns the number of seconds between two pings sent to the clients.
// The value is read from the shared server state under a read lock.
pub fn (mut s Server) get_ping_interval() int {
	interval := rlock s.server_state {
		s.server_state.ping_interval
	}
	return interval
}
| 73 | |
// listen opens the TCP listener on the configured family/port, marks the server as
// `.open`, starts the ping thread and then accepts websocket clients, serving each
// one in its own thread.
// It returns an error when the listener cannot be created.
// The accept loop runs while the server state is `.open`; the state is checked between
// two accepts, so after `set_state` moves the server out of `.open` the loop ends once
// the pending accept returns. Failed accepts are skipped.
pub fn (mut s Server) listen() ! {
	s.logger.info('websocket server: start listen on port ${s.port}')
	s.ls = net.listen_tcp(s.family, ':${s.port}')!
	s.set_state(.open)
	spawn s.handle_ping()
	// same loop condition as handle_ping, so both stop together and the
	// "end listen" message below can actually be reached
	for s.get_state() == .open {
		mut c := s.accept_new_client() or { continue }
		spawn s.serve_client(mut c)
	}
	s.logger.info('websocket server: end listen on port ${s.port}')
}
| 86 | |
// close is meant to shut the server down; it is a stub that currently does nothing
fn (mut s Server) close() {
	// TODO: implement close when moving to net from x.net
}
| 91 | |
// handle_ping pings every open client once per ping interval for as long as the
// server state is `.open`.
// A client is closed and removed from the server's client map when sending the ping
// fails, or when its last pong is older than twice the ping interval.
fn (mut s Server) handle_ping() {
	mut clients_to_remove := []string{}
	for s.get_state() == .open {
		time.sleep(s.get_ping_interval() * time.second)
		for i, _ in rlock s.server_state {
			s.server_state.clients
		} {
			// look the entry up again under the read lock; skip it if it is gone
			mut c := rlock s.server_state {
				s.server_state.clients[i] or { continue }
			}
			if c.client.get_state() != .open {
				continue
			}
			c.client.ping() or {
				s.logger.debug('server-> error sending ping to client')
				// queue the removal first, so the client is dropped even when close fails
				clients_to_remove << c.client.id
				// best effort: we want to continue even if error
				c.client.close(1002, 'Closing connection: ping send error') or {}
				// already handled; skip the pong check so it is not closed/queued twice
				continue
			}
			if (time.now().unix() - c.client.last_pong_ut) > s.get_ping_interval() * 2 {
				clients_to_remove << c.client.id
				c.client.close(1000, 'no pong received') or { continue }
			}
		}
		// TODO: replace for with s.clients.delete_all(clients_to_remove) if (https://github.com/vlang/v/pull/6020) merges
		if clients_to_remove.len > 0 {
			// take the write lock once for the whole batch
			lock s.server_state {
				for client in clients_to_remove {
					s.server_state.clients.delete(client)
				}
			}
			clients_to_remove.clear()
		}
	}
}
| 127 | |
// serve_client performs the server side handshake for `c`, attaches the resulting
// server client to this server and then runs the client's listen loop.
// Errors from the handshake or from attaching are propagated; an error from the
// listen loop is logged on the server logger before it is returned.
fn (mut s Server) serve_client(mut c Client) ! {
	c.logger.debug('server-> Start serve client (${c.id})')
	defer {
		c.logger.debug('server-> End serve client (${c.id})')
	}
	mut response, mut sc := s.handle_server_handshake(mut c)!
	s.attach_client(mut sc, response)!
	c.listen() or {
		s.logger.error(err.msg())
		return err
	}
}
| 141 | |
// handle_handshake answers the websocket handshake for `key` on an already
// established connection and attaches the resulting client to this server.
// The client gets a fresh uuid as id and its own logger at debug level.
// After the client is attached a ping thread is spawned and the client's listen
// loop is run; the created ServerClient is returned once that loop returns without
// error. Errors from building the key challenge response, from attaching the client
// or from the listen loop are returned (the latter is logged first).
pub fn (mut s Server) handle_handshake(mut conn net.TcpConn, key string) !&ServerClient {
	mut conn_logger := &log.Log{}
	conn_logger.set_level(.debug)
	mut ws_client := &Client{
		is_server: true
		conn: conn
		is_ssl: false
		logger: conn_logger
		client_state: ClientState{
			state: .open
		}
		last_pong_ut: time.now().unix()
		id: rand.uuid_v4()
	}
	mut sc := &ServerClient{
		resource_name: 'GET'
		client_key: key
		client: unsafe { ws_client }
		server: unsafe { &s }
	}
	digest := create_key_challenge_response(key)!
	// HTTP 101 response that completes the upgrade to the websocket protocol
	response := 'HTTP/1.1 101 Switching Protocols\r\n' + 'Upgrade: websocket\r\n' +
		'Connection: Upgrade\r\n' + 'Sec-WebSocket-Accept: ${digest}\r\n\r\n'
	s.attach_client(mut sc, response)!
	spawn s.handle_ping()
	ws_client.listen() or {
		s.logger.error(err.msg())
		return err
	}
	return sc
}
| 173 | |
// attach_client asks the connect event whether the client is accepted and, when it is,
// sends the handshake response, registers the client in the server state, wires up
// the callbacks and sends the attached event.
// A rejected client has its socket shut down and the function returns without error.
// When the attached event fails the client is unregistered again, its socket is shut
// down (best effort) and the error is returned.
fn (mut s Server) attach_client(mut server_client ServerClient, handshake_response string) ! {
	if !s.send_connect_event(mut server_client)! {
		s.logger.debug('server-> client not accepted')
		server_client.client.shutdown_socket()!
		return
	}
	// accepted: complete the handshake before the client becomes visible to the server
	server_client.client.socket_write(handshake_response.bytes())!
	lock s.server_state {
		s.server_state.clients[server_client.client.id] = unsafe { server_client }
	}
	s.setup_callbacks(mut server_client)
	s.send_attached_event(mut server_client) or {
		// roll the registration back
		lock s.server_state {
			s.server_state.clients.delete(server_client.client.id)
		}
		server_client.client.shutdown_socket() or {}
		return err
	}
}
| 195 | |
// setup_callbacks registers the server's message and close callbacks on the client
// of `sc`, followed by the standard close callback that removes the client from the server
fn (mut s Server) setup_callbacks(mut sc ServerClient) {
	for on_msg in s.message_callbacks {
		if on_msg.is_ref {
			sc.client.on_message_ref(on_msg.handler2, on_msg.ref)
		} else {
			sc.client.on_message(on_msg.handler)
		}
	}
	for on_close in s.close_callbacks {
		if on_close.is_ref {
			sc.client.on_close_ref(on_close.handler2, on_close.ref)
		} else {
			sc.client.on_close(on_close.handler)
		}
	}
	// registered after the user callbacks: drops the client from the server once it closes
	sc.client.on_close_ref(delete_client_cb, sc)
}
| 219 | |
// delete_client_cb is the close callback registered by setup_callbacks: it removes the
// client of `sc` from the client map of its server. The close code and reason are ignored.
fn delete_client_cb(mut c Client, _code int, _reason string, mut sc ServerClient) ! {
	c.logger.debug('server-> Delete client')
	// the map is shared state of the server, so it is modified under the write lock
	lock sc.server.server_state {
		sc.server.server_state.clients.delete(sc.client.id)
	}
}
| 226 | |
// accept_new_client accepts the next connection on the server's listener and wraps it
// in a new server side Client. The client starts in state `.open`, gets a fresh uuid
// as id, uses the server's logger and has its last pong time set to now.
// Errors from accepting the connection or creating the ssl connection are returned.
fn (mut s Server) accept_new_client() !&Client {
	mut accepted := s.ls.accept()!
	return &Client{
		is_server: true
		conn: accepted
		ssl_conn: ssl.new_ssl_conn()!
		logger: s.logger
		client_state: ClientState{
			state: .open
		}
		last_pong_ut: time.now().unix()
		id: rand.uuid_v4()
	}
}
| 243 | |
// set_state stores `state` as the server's current state.
// The write happens under the lock of the shared server state.
pub fn (mut s Server) set_state(state State) {
	lock s.server_state {
		s.server_state.state = state
	}
}
| 250 | |
// get_state returns the server's current state.
// The value is read under the read lock of the shared server state.
pub fn (s &Server) get_state() State {
	current := rlock s.server_state {
		s.server_state.state
	}
	return current
}
| 257 | |
// free manually releases the memory held by the Server's containers: the client map
// (under the state lock) and all four callback arrays.
// Only the containers' own `free()` methods are called here; nothing in this function
// closes connections or frees the ServerClient values referenced by the map.
pub fn (mut s Server) free() {
	lock s.server_state {
		unsafe {
			s.server_state.clients.free()
		}
	}

	unsafe {
		s.accept_client_callbacks.free()
		// previously skipped, which left this array allocated
		s.attached_callbacks.free()
		s.message_callbacks.free()
		s.close_callbacks.free()
	}
}
| 272 | |