v2 / vlib / net / unix / stream.c.v
512 lines · 450 sloc · 14.42 KB · 9e1d3ca05db333f60c4201e1d35db705c319d4dd
Raw
1module unix
2
3import time
4import os
5import net
6
// Default read/write timeouts assigned to every new StreamConn
// (see `connect_stream` and `StreamListener.accept`). The read timeout is also
// passed to the photon accept/connect calls in coroutine builds.
const unix_default_read_timeout = 30 * time.second
const unix_default_write_timeout = 30 * time.second
// Timeout passed to `select` while waiting for a non-blocking connect to complete.
const connect_timeout = 5 * time.second
// NOTE(review): presumably the MSG_NOSIGNAL send flag; the send calls visible in this
// file pass `net.msg_nosignal` instead - confirm whether this constant is still used.
const msg_nosignal = 0x4000
11
// UnixDialer is a concrete instance of the Dialer interface,
// for creating unix socket connections. It carries no state.
pub struct UnixDialer {}
15
// dial opens a unix stream connection to `address` through `connect_stream`
// and hands it back as a `net.Connection`.
// Any error reported while connecting is propagated to the caller.
pub fn (u UnixDialer) dial(address string) !net.Connection {
	conn := connect_stream(address)!
	return conn
}
21
// StreamConn is a connection over a unix domain stream socket.
// Instances are created by `connect_stream` and by `StreamListener.accept`.
@[heap]
pub struct StreamConn {
pub mut:
	sock StreamSocket // underlying socket; the I/O methods operate on `sock.handle`
mut:
	handle         int           // set by `StreamListener.accept` to the accepted descriptor
	write_deadline time.Time     // forwarded to `wait_for_write`
	read_deadline  time.Time     // forwarded to `wait_for_read`
	read_timeout   time.Duration // forwarded to `wait_for_read` (and to photon_recv in coroutine builds)
	write_timeout  time.Duration // forwarded to `wait_for_write` (and to photon_send in coroutine builds)
	is_blocking    bool          // NOTE(review): not referenced in the visible code - confirm usage
}
34
// connect_stream returns a SOCK_STREAM connection for an unix domain socket on `socket_path`.
// It returns an error when `socket_path` is `max_sun_path` characters or longer,
// when the socket can not be created, or when connecting to the path fails.
// The returned connection starts with the default read and write timeouts.
pub fn connect_stream(socket_path string) !&StreamConn {
	if socket_path.len >= max_sun_path {
		return error('Socket path too long! Max length: ${max_sun_path - 1} chars.')
	}
	mut s := new_stream_socket(socket_path) or {
		return error('${err.msg()}; could not create new unix socket')
	}

	s.connect(socket_path) or {
		// The socket was already created above; release its descriptor instead of
		// leaking it. A failure to close is ignored on purpose, because the connect
		// error is the one the caller needs to see.
		s.close() or {}
		return err
	}

	return &StreamConn{
		sock:          s
		read_timeout:  unix_default_read_timeout
		write_timeout: unix_default_write_timeout
	}
}
52
// addr is meant to return the local address of the stream.
// It is not implemented for unix connections and always returns an error.
pub fn (c StreamConn) addr() !net.Addr {
	return error('not implemented for unix connections')
}
57
// peer_addr is meant to return the address of the remote peer of the stream.
// It is not implemented for unix connections and always returns an error.
pub fn (c StreamConn) peer_addr() !net.Addr {
	return error('not implemented for unix connections')
}
62
// close closes the connection by closing its underlying socket.
// An error from closing the socket is propagated to the caller.
pub fn (mut c StreamConn) close() ! {
	// optional tracing, only compiled in when the `trace_unix` flag is defined
	$if trace_unix ? {
		eprintln(' StreamConn.close | c.sock.handle: ${c.sock.handle:6}')
	}
	c.sock.close()!
}
70
// write_ptr blocks and attempts to write all `len` bytes starting at `b`.
// It keeps sending the remaining bytes until `len` bytes were sent, and then
// returns the total number of bytes sent. When a send reports "would block",
// it waits via `wait_for_write` and retries; other send errors are
// passed to `net.wrap_error`.
pub fn (mut c StreamConn) write_ptr(b &u8, len int) !int {
	// the three trace blocks below are only compiled in with their respective flags
	$if trace_unix_sock_handle ? {
		eprintln('>>> StreamConn.write_ptr | c: ${ptr_str(c)} | c.sock.handle: ${c.sock.handle} | b: ${ptr_str(b)} | len: ${len}')
	}
	$if trace_unix ? {
		eprintln(
			'>>> StreamConn.write_ptr | c.sock.handle: ${c.sock.handle} | b: ${ptr_str(b)} len: ${len} |\n' +
			unsafe { b.vstring_with_len(len) })
	}
	$if trace_unix_data_write ? {
		eprintln(
			'>>> StreamConn.write_ptr | data.len: ${len:6} | hex: ${unsafe { b.vbytes(len) }.hex()} | data: ' +
			unsafe { b.vstring_with_len(len) })
	}
	unsafe {
		mut ptr_base := &u8(b)
		mut total_sent := 0
		for total_sent < len {
			// send whatever has not been sent yet, starting right after the sent part
			ptr := ptr_base + total_sent
			remaining := len - total_sent
			// coroutine builds use the photon variant, which also takes the write timeout
			mut sent := $if is_coroutine ? {
				C.photon_send(c.sock.handle, ptr, remaining, net.msg_nosignal, c.write_timeout)
			} $else {
				C.send(c.sock.handle, ptr, remaining, net.msg_nosignal)
			}
			$if trace_unix_data_write ? {
				eprintln('>>> UnixConn.write_ptr | data chunk, total_sent: ${total_sent:6}, remaining: ${remaining:6}, ptr: ${voidptr(ptr):x} => sent: ${sent:6}')
			}
			if sent < 0 {
				code := net.error_code()
				if code == int(net.error_ewouldblock) {
					// the socket can not take more data right now: wait, then retry the same chunk
					c.wait_for_write()!
					continue
				} else {
					net.wrap_error(code)!
				}
			}
			total_sent += sent
		}
		return total_sent
	}
}
114
// write blocks and attempts to write all of `bytes`, by delegating to `write_ptr`.
// It returns the number of bytes reported as written by `write_ptr`.
pub fn (mut c StreamConn) write(bytes []u8) !int {
	written := c.write_ptr(bytes.data, bytes.len)!
	return written
}
119
// write_string blocks and attempts to write all bytes of the string `s`,
// by delegating to `write_ptr`. It returns the number of bytes written.
pub fn (mut c StreamConn) write_string(s string) !int {
	written := c.write_ptr(s.str, s.len)!
	return written
}
124
// read_ptr attempts to read up to `len` bytes from the socket into `buf_ptr`.
// It returns the number of bytes received by a single receive call.
// When the first receive yields no data and the socket reports "would block",
// it waits once via `wait_for_read` and retries the receive a single time.
pub fn (mut c StreamConn) read_ptr(buf_ptr &u8, len int) !int {
	// first attempt; coroutine builds use the photon variant, which also takes the read timeout
	mut res := $if is_coroutine ? {
		wrap_read_result(C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout))!
	} $else {
		wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))!
	}
	$if trace_unix ? {
		eprintln('<<< StreamConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}')
	}
	if res > 0 {
		// data arrived on the first attempt
		$if trace_unix_data_read ? {
			eprintln(
				'<<< StreamConn.read_ptr | 1 data.len: ${res:6} | hex: ${unsafe { buf_ptr.vbytes(res) }.hex()} | data: ' +
				unsafe { buf_ptr.vstring_with_len(res) })
		}
		return res
	}
	// no data: decide between "try again later" and a real error
	code := net.error_code()
	if code == int(net.error_ewouldblock) {
		// wait until the socket is readable (or the deadline/timeout hits), then retry once
		c.wait_for_read()!
		res = $if is_coroutine ? {
			wrap_read_result(C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout))!
		} $else {
			wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))!
		}
		$if trace_unix ? {
			eprintln('<<< StreamConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}')
		}
		$if trace_unix_data_read ? {
			if res > 0 {
				eprintln(
					'<<< StreamConn.read_ptr | 2 data.len: ${res:6} | hex: ${unsafe { buf_ptr.vbytes(res) }.hex()} | data: ' +
					unsafe { buf_ptr.vstring_with_len(res) })
			}
		}
		// the result of the retry is returned through net.socket_error
		return net.socket_error(res)
	} else {
		net.wrap_error(code)!
	}
	// reached only when net.wrap_error did not return an error for `code`
	return error('none')
}
167
// read reads data into `buf`, by delegating to `read_ptr` with the buffer's
// data pointer and length. It returns the number of bytes read.
pub fn (mut c StreamConn) read(mut buf []u8) !int {
	received := c.read_ptr(buf.data, buf.len)!
	return received
}
172
// read_deadline returns the read deadline of the connection.
// It returns an error when no deadline is set, i.e. when the stored deadline
// has a unix timestamp of 0 (the same convention that
// `StreamListener.accept_deadline` uses).
pub fn (mut c StreamConn) read_deadline() !time.Time {
	// a non-zero unix timestamp means that a deadline was set
	if c.read_deadline.unix() != 0 {
		return c.read_deadline
	}
	return error('none')
}
180
// set_read_deadline sets the read deadline
pub fn (mut c StreamConn) set_read_deadline(deadline time.Time) {
	c.read_deadline = deadline
}
185
// write_deadline returns the write deadline of the connection.
// It returns an error when no deadline is set, i.e. when the stored deadline
// has a unix timestamp of 0 (the same convention that
// `StreamListener.accept_deadline` uses).
pub fn (mut c StreamConn) write_deadline() !time.Time {
	// a non-zero unix timestamp means that a deadline was set
	if c.write_deadline.unix() != 0 {
		return c.write_deadline
	}
	return error('none')
}
193
// set_write_deadline sets the write deadline, which `wait_for_write` passes on
// when waiting for the socket to become writable.
pub fn (mut c StreamConn) set_write_deadline(deadline time.Time) {
	c.write_deadline = deadline
}
198
// read_timeout returns the read timeout currently stored on the connection
pub fn (c &StreamConn) read_timeout() time.Duration {
	return c.read_timeout
}
203
// set_read_timeout sets the read timeout, which `wait_for_read` passes on
// when waiting for the socket to become readable.
pub fn (mut c StreamConn) set_read_timeout(t time.Duration) {
	c.read_timeout = t
}
208
// write_timeout returns the write timeout currently stored on the connection
pub fn (c &StreamConn) write_timeout() time.Duration {
	return c.write_timeout
}
213
// set_write_timeout sets the write timeout, which `wait_for_write` passes on
// when waiting for the socket to become writable.
pub fn (mut c StreamConn) set_write_timeout(t time.Duration) {
	c.write_timeout = t
}
218
// wait_for_read blocks until the socket is ready to read.
// The connection's read deadline and read timeout are handed to the
// module level `wait_for_read`; its error, if any, is propagated.
@[inline]
pub fn (mut c StreamConn) wait_for_read() ! {
	wait_for_read(c.sock.handle, c.read_deadline, c.read_timeout)!
}
224
// wait_for_write blocks until the socket is ready to write.
// The connection's write deadline and write timeout are handed to the
// module level `wait_for_write`; its error, if any, is propagated.
@[inline]
pub fn (mut c StreamConn) wait_for_write() ! {
	wait_for_write(c.sock.handle, c.write_deadline, c.write_timeout)!
}
230
// str returns a single line string representation of connection `c`.
// It contains the deadlines, the timeouts and the flattened string form of the socket.
pub fn (c StreamConn) str() string {
	// flatten the socket's multi line representation: turn newlines into spaces,
	// then collapse the double spaces into single ones
	s := c.sock.str().replace('\n', ' ').replace('  ', ' ')
	return 'StreamConn{ write_deadline: ${c.write_deadline}, read_deadline: ${c.read_deadline}, read_timeout: ${c.read_timeout}, write_timeout: ${c.write_timeout}, sock: ${s} }'
}
236
// StreamListener is a listening unix domain stream socket, created by `listen_stream`.
pub struct StreamListener {
pub mut:
	sock StreamSocket // the bound, listening socket
mut:
	accept_timeout  time.Duration // forwarded to `wait_for_read` by `wait_for_accept`
	accept_deadline time.Time     // forwarded to `wait_for_read` by `wait_for_accept`
}
244
// ListenOptions holds the optional settings of `listen_stream`.
@[params]
pub struct ListenOptions {
pub:
	backlog int = 128 // passed as the backlog argument of the `listen` call
}
250
// listen_stream creates an unix domain socket at `socket_path`, binds it to that
// path and starts listening on it, with a backlog of `options.backlog`.
// An existing filesystem object at `socket_path` is unlinked before binding.
// It returns an error when the path is `max_sun_path` characters or longer, or when
// creating the socket, resolving the path, binding or listening fails. In the
// latter three cases, the already created socket is closed before returning.
pub fn listen_stream(socket_path string, options ListenOptions) !&StreamListener {
	if socket_path.len >= max_sun_path {
		return error('Socket path too long! Max length: ${max_sun_path - 1} chars.')
	}
	mut s := new_stream_socket(socket_path) or {
		return error('${err.msg()}; could not create new unix stream socket')
	}

	addrs := net.resolve_addrs(socket_path, .unix, .tcp) or {
		msg := '${err.msg()}; could not resolve path ${socket_path}'
		// do not leak the descriptor; the resolve error is the one worth reporting
		s.close() or {}
		return error(msg)
	}
	addr := addrs[0]
	alen := addr.len()

	// try to unlink/remove an existing filesystem object at `socket_path`. Ignore errors,
	// because it's ok if the path doesn't exists and if it exists, but can't be unlinked
	// then `bind` will generate an error
	$if windows {
		os.rm(socket_path) or {}
	} $else {
		C.unlink(&char(socket_path.str))
	}

	net.socket_error_message(C.bind(s.handle, voidptr(&addr), alen), 'binding to ${socket_path} failed') or {
		// do not leak the descriptor, when the bind fails
		s.close() or {}
		return err
	}
	net.socket_error_message(C.listen(s.handle, options.backlog), 'listening on ${socket_path} with maximum backlog pending queue of ${options.backlog}, failed') or {
		// do not leak the descriptor, when the listen fails
		s.close() or {}
		return err
	}
	return &StreamListener{
		sock:            s
		accept_deadline: no_deadline
		accept_timeout:  infinite_timeout
	}
}
287
// accept blocks until a new connection occurs, and returns it as a new StreamConn
// with the default read and write timeouts.
// It tries to accept once; when that yields no valid handle, it waits via
// `wait_for_accept` and tries exactly one more time, returning an
// 'accept failed' error if the second attempt does not yield a handle either.
pub fn (mut l StreamListener) accept() !&StreamConn {
	$if trace_unix ? {
		eprintln(' StreamListener.accept | l.sock.handle: ${l.sock.handle:6}')
	}

	// first attempt; coroutine builds use the photon variant, which also takes a timeout
	mut new_handle := $if is_coroutine ? {
		C.photon_accept(l.sock.handle, 0, 0, unix_default_read_timeout)
	} $else {
		C.accept(l.sock.handle, 0, 0)
	}
	if new_handle <= 0 {
		// nothing to accept yet: wait for the listening socket to become readable, then retry once
		l.wait_for_accept()!
		new_handle = $if is_coroutine ? {
			C.photon_accept(l.sock.handle, 0, 0, unix_default_read_timeout)
		} $else {
			C.accept(l.sock.handle, 0, 0)
		}
		if new_handle == -1 || new_handle == 0 {
			return error('accept failed')
		}
	}

	mut c := &StreamConn{
		handle:        new_handle
		read_timeout:  unix_default_read_timeout
		write_timeout: unix_default_write_timeout
	}
	// wrap the accepted descriptor in a StreamSocket
	c.sock = stream_socket_from_handle(c.handle)!
	return c
}
319
// accept_deadline returns the deadline until a new client is accepted.
// It returns an error, when the stored deadline has a unix timestamp of 0,
// i.e. when no deadline is set.
pub fn (l &StreamListener) accept_deadline() !time.Time {
	if l.accept_deadline.unix() == 0 {
		return error('no deadline')
	}
	return l.accept_deadline
}
327
// set_accept_deadline sets the deadline until a new client is accepted
pub fn (mut l StreamListener) set_accept_deadline(deadline time.Time) {
	l.accept_deadline = deadline
}
332
// accept_timeout returns the timeout until a new client is accepted,
// as currently stored on the listener
pub fn (l &StreamListener) accept_timeout() time.Duration {
	return l.accept_timeout
}
337
// set_accept_timeout sets the timeout until a new client is accepted;
// `wait_for_accept` passes it on when waiting for a client.
pub fn (mut l StreamListener) set_accept_timeout(t time.Duration) {
	l.accept_timeout = t
}
342
// wait_for_accept blocks until a client can be accepted.
// It waits for the listening socket to become readable, using the
// listener's accept deadline and accept timeout.
pub fn (mut l StreamListener) wait_for_accept() ! {
	wait_for_read(l.sock.handle, l.accept_deadline, l.accept_timeout)!
}
347
// close closes the listening socket and unlinks/removes the socket file.
// The socket is closed first; when that fails, the error is returned
// and the socket file is not unlinked.
pub fn (mut l StreamListener) close() ! {
	l.sock.close()!
	l.unlink()!
}
353
// unlink removes the unix socket from the file system.
// On windows the socket path is removed with `os.rm`, elsewhere with the C `unlink` call.
// A failure to remove the path is returned as an error.
pub fn (mut l StreamListener) unlink() ! {
	path := l.sock.socket_path
	$if windows {
		os.rm(path)!
	} $else {
		net.socket_error_message(C.unlink(&char(path.str)), 'could not unlink ${path}')!
	}
}
363
// unlink_on_signal removes the socket from the filesystem when signal `signum` occurs.
// It installs a handler for `signum`, that unlinks the socket path (ignoring
// unlink errors) and then terminates the process with exit code 1.
// An error from installing the handler is propagated.
pub fn (mut l StreamListener) unlink_on_signal(signum os.Signal) ! {
	// register for the signal that the caller asked for, not for a fixed one
	os.signal_opt(signum, fn [mut l] (sign os.Signal) {
		$if trace_unix ? {
			eprintln(' StreamListener.unlink_on_signal received signal ${sign}; unlinking unix socket ${l.sock.socket_path}')
		}
		l.unlink() or {}
		exit(1)
	})!
}
374
// addr returns the `net.Addr` version of the listening socket's path,
// as reported by the `address` method of the underlying socket.
pub fn (mut l StreamListener) addr() !net.Addr {
	listen_addr := l.sock.address()!
	return listen_addr
}
379
// StreamSocket is a unix domain stream socket. It embeds `net.Socket`
// and additionally remembers the filesystem path of the socket.
pub struct StreamSocket {
	net.Socket
mut:
	socket_path string // set by `new_stream_socket`; used by `StreamListener.unlink`
}
385
// new_stream_socket creates a new unix domain stream socket, and returns a StreamSocket
// that holds its handle together with `socket_path`. The path is only stored here.
// When built with the `net_nonblocking_sockets` flag, the socket is switched to non
// blocking mode. It returns an error when the socket can not be created, or when
// switching the blocking mode fails (the new socket is closed in that case).
fn new_stream_socket(socket_path string) !StreamSocket {
	// coroutine builds create the socket through photon
	handle := $if is_coroutine ? {
		net.socket_error(C.photon_socket(i32(net.AddrFamily.unix), i32(net.SocketType.tcp),
			0))!
	} $else {
		net.socket_error(C.socket(i32(net.AddrFamily.unix), i32(net.SocketType.tcp), 0))!
	}
	mut s := StreamSocket{
		handle:      handle
		socket_path: socket_path
	}

	$if trace_unix ? {
		eprintln(' new_unix_socket | s.handle: ${s.handle:6}')
	}

	$if net_nonblocking_sockets ? {
		net.set_blocking(handle, false) or {
			// the descriptor is already open; release it instead of leaking it
			s.close() or {}
			return err
		}
	}
	return s
}
406
// close shuts the socket down and then closes its handle.
// Only the error of the final close is reported to the caller.
fn (mut s StreamSocket) close() ! {
	// shutdown might be redundant for unix domain sockets, but it doesn't hurt to call it
	shutdown(s.handle)
	close(s.handle)!
}
412
// select runs the module level `select` on the handle of the socket, for the
// condition `test` and with the given `timeout`, and returns its result.
fn (mut s StreamSocket) select(test Select, timeout time.Duration) !bool {
	is_ready := select(s.handle, test, timeout)!
	return is_ready
}
416
// set_option sets the integer option `opt` at `level` to `value` on the socket,
// via `setsockopt`. A failure is returned as an error.
fn (mut s StreamSocket) set_option(level int, opt int, value int) ! {
	rc := C.setsockopt(s.handle, level, opt, &value, sizeof(int))
	net.socket_error(rc)!
}
421
// set_option_bool sets a boolean option on the socket, at the SOL_SOCKET level.
// It returns `net.err_option_not_settable`, when `opt` is not in `net.opts_can_set`,
// and `net.err_option_wrong_type`, when `opt` is not in `net.opts_bool`.
pub fn (mut s StreamSocket) set_option_bool(opt net.SocketOption, value bool) ! {
	is_settable := opt in net.opts_can_set
	if !is_settable {
		return net.err_option_not_settable
	}
	is_bool_option := opt in net.opts_bool
	if !is_bool_option {
		return net.err_option_wrong_type
	}
	// the bool is passed on as an int (see `set_option`)
	s.set_option(C.SOL_SOCKET, int(opt), int(value))!
}
433
// set_option_int sets an int option on the socket, at the SOL_SOCKET level.
// Unlike `set_option_bool`, it does not check `opt` before passing it on.
pub fn (mut s StreamSocket) set_option_int(opt net.SocketOption, value int) ! {
	s.set_option(C.SOL_SOCKET, int(opt), value)!
}
438
// connect connects the socket to the unix domain socket at `socket_path`.
// It returns an error when the path is `max_sun_path` characters or longer, when the
// path can not be resolved to an address, or when the connect itself fails.
// When built with the `net_nonblocking_sockets` flag, a connect that can not finish
// immediately is handled on windows by waiting (up to `connect_timeout`) for the socket
// to become writable, and then checking SO_ERROR.
fn (mut s StreamSocket) connect(socket_path string) ! {
	if socket_path.len >= max_sun_path {
		return error('Socket path too long! Max length: ${max_sun_path - 1} chars.')
	}

	addrs := net.resolve_addrs(socket_path, .unix, .tcp) or {
		return error('${err.msg()}; could not resolve path ${socket_path}')
	}
	// only the first resolved address is used
	addr := addrs[0]
	// length of the address, as needed by the connect calls below
	alen := addr.len()

	$if net_nonblocking_sockets ? {
		res := $if is_coroutine ? {
			C.photon_connect(s.handle, voidptr(&addr), alen, unix_default_read_timeout)
		} $else {
			C.connect(s.handle, voidptr(&addr), alen)
		}
		if res == 0 {
			// connected immediately
			return
		}
		ecode := net.error_code()

		// no need to check for einprogress on nix
		// On windows we expect res == -1 && net.error_code() == ewouldblock
		$if windows {
			if ecode == int(net.error_ewouldblock) {
				// The socket is nonblocking and the connection cannot be completed
				// immediately. Wait till the socket is ready to write
				write_result := s.select(.write, connect_timeout)!
				err := 0
				len := sizeof(err)
				// determine whether connect() completed successfully (SO_ERROR is zero)
				xyz := C.getsockopt(s.handle, C.SOL_SOCKET, C.SO_ERROR, &err, &len)
				if xyz == 0 && err == 0 {
					return
				}
				if write_result {
					if xyz == 0 {
						// getsockopt succeeded, so `err` holds the pending socket error
						net.wrap_error(err)!
						return
					}
					return
				}
				// the socket did not become writable within `connect_timeout`
				return net.err_timed_out
			}
		}
		net.wrap_error(ecode)!
		return
	} $else {
		// blocking sockets: a single connect call, whose result decides success or failure
		x := $if is_coroutine ? {
			C.photon_connect(s.handle, voidptr(&addr), alen, unix_default_read_timeout)
		} $else {
			C.connect(s.handle, voidptr(&addr), alen)
		}
		net.socket_error(x)!
	}
}
497
// stream_socket_from_handle returns a `StreamSocket` instance from the raw file descriptor `sockfd`.
// The returned socket has no `socket_path` set. When built with the
// `net_nonblocking_sockets` flag, the descriptor is switched to non blocking mode,
// and a failure to do so is returned as an error.
pub fn stream_socket_from_handle(sockfd int) !&StreamSocket {
	mut s := &StreamSocket{
		handle: sockfd
	}

	$if trace_unix ? {
		eprintln(' stream_socket_from_handle | s.handle: ${s.handle:6}')
	}

	$if net_nonblocking_sockets ? {
		net.set_blocking(sockfd, false)!
	}
	return s
}
513