v2 / vlib / db / redis / redis.v
1262 lines · 1151 sloc · 33.94 KB · 8e35f4d9848f7ad35d857a187dddbfd2eca5e19d
Raw
1// https://redis.io/docs/latest/develop/reference/protocol-spec/
2
3module redis
4
5import math.big
6import net
7import net.ssl
8import strings
9
10// RESP3 wrapper types
11pub struct RedisBlobError {
12pub:
13 data []u8
14}
15
16pub struct RedisVerbatim {
17pub:
18 format string
19 data []u8
20}
21
22pub struct RedisMap {
23pub:
24 // interleaved key/value pairs: [k1, v1, k2, v2, ...]
25 pairs []RedisValue
26}
27
28pub struct RedisSet {
29pub:
30 elements []RedisValue
31}
32
33pub struct RedisPush {
34pub:
35 elements []RedisValue
36}
37
38// RedisValue represents all possible RESP (Redis Serialization Protocol) data types
39pub type RedisValue = bool
40 | big.Integer
41 | f32
42 | f64
43 | i64
44 | []u8
45 | RedisBlobError
46 | RedisMap
47 | RedisNull
48 | RedisPush
49 | RedisSet
50 | map[string]RedisValue
51 | []RedisValue
52 | RedisVerbatim
53 | string
54
55// RedisNull represents the Redis NULL type
56pub struct RedisNull {}
57
58const cmd_buf_pre_allocate_len = 4096 // Initial buffer size for command building
59const resp_buf_pre_allocate_len = 8192 // Initial buffer size for response reading
60const max_skip = 64 // Max non-prefix bytes to skip when resynchronizing
61
62// DB represents a Redis database connection
63pub struct DB {
64pub mut:
65 version int // RESP protocol version
66 conn &net.TcpConn = unsafe { nil } // TCP connection to Redis
67 ssl_conn &ssl.SSLConn = unsafe { nil } // SSL connection to Redis
68 tls bool
69
70 // Pre-allocated buffers to reduce memory allocations
71 cmd_buf []u8 // Buffer for building commands
72 resp_buf []u8 // Buffer for reading responses
73 pipeline_mode bool
74 pipeline_buffer []u8
75 pipeline_cmd_count int
76}
77
78// Configuration options for Redis connection
79@[params]
80pub struct Config {
81pub mut:
82 host string = '127.0.0.1' // Redis server host
83 port u16 = 6379 // Redis server port
84 password string // Redis server password (optional)
85 tls bool // Enable TLS/SSL connection
86 version int @[deprecated] // ignored - RESP version auto handled in connect
87}
88
89fn resp2_auth(password string, mut db DB) ! {
90 db.version = 2
91 if password.len > 0 {
92 db.auth(password)!
93 }
94}
95
96// connect establishes a connection to a Redis server
97pub fn connect(config Config) !DB {
98 mut db := DB{
99 tls: config.tls
100 cmd_buf: []u8{cap: cmd_buf_pre_allocate_len}
101 resp_buf: []u8{cap: resp_buf_pre_allocate_len}
102 }
103
104 if config.tls {
105 mut ssl_conn := ssl.new_ssl_conn(ssl.SSLConnectConfig{ validate: false })!
106 ssl_conn.dial(config.host, int(config.port))!
107 db.ssl_conn = ssl_conn
108 } else {
109 db.conn = net.dial_tcp('${config.host}:${config.port}')!
110 }
111
112 // Always attempt HELLO during connect() to negotiate RESP3 (and include AUTH
113 // subcommand when a password is provided). If HELLO fails or the server
114 // doesn't support it, fall back to RESP2 and perform AUTH via the AUTH
115 // command (if a password was given).
116
117 // default version 3
118 db.version = 3
119
120 // build HELLO 3 command; include AUTH subcommand when password present
121 db.cmd_buf.clear()
122 if config.password.len > 0 {
123 // *4\r\n$5\r\nHELLO\r\n$1\r\n3\r\n$4\r\nAUTH\r\n$<pwlen>\r\n<pw>\r\n
124 db.cmd_buf << '*4\r\n$5\r\nHELLO\r\n$1\r\n3\r\n$4\r\nAUTH\r\n$${config.password.len}\r\n${config.password}\r\n'.bytes()
125 } else {
126 // *2\r\n$5\r\nHELLO\r\n$1\r\n3\r\n
127 db.cmd_buf << '*2\r\n$5\r\nHELLO\r\n$1\r\n3\r\n'.bytes()
128 }
129
130 // send HELLO and attempt to read response. If any step fails, fallback to RESP2 + AUTH.
131 db.write_data(db.cmd_buf) or {
132 // write failed (connection error?) — fallback to RESP2 and perform AUTH if needed
133 resp2_auth(config.password, mut db)!
134 return db
135 }
136
137 // Try to read and consume HELLO response. If HELLO fails (unknown command / error),
138 // treat as not-supported and fall back to RESP2 + AUTH.
139 db.read_response() or {
140 resp2_auth(config.password, mut db)!
141 return db
142 }
143
144 return db
145}
146
147// close terminates the connection to Redis server
148pub fn (mut db DB) close() ! {
149 if db.tls {
150 db.ssl_conn.close()!
151 } else {
152 db.conn.close()!
153 }
154}
155
156// Helper methods for TLS abstraction
157fn (mut db DB) write_data(data []u8) ! {
158 if db.tls {
159 db.ssl_conn.write(data)!
160 } else {
161 db.conn.write(data)!
162 }
163}
164
165fn (mut db DB) read_data(mut buf []u8) !int {
166 if db.tls {
167 return db.ssl_conn.read(mut buf)
168 } else {
169 return db.conn.read(mut buf)
170 }
171}
172
173fn (mut db DB) read_ptr_data(ptr &u8, len int) !int {
174 if db.tls {
175 return db.ssl_conn.socket_read_into_ptr(ptr, len)!
176 } else {
177 return db.conn.read_ptr(ptr, len)!
178 }
179}
180
181// auth sends an AUTH command to the server with the given password.
182pub fn (mut db DB) auth(password string) ! {
183 resp := db.cmd('AUTH', password)!
184 match resp {
185 string {
186 if resp != 'OK' {
187 return error('Authentication failed: ${resp}')
188 }
189 }
190 else {
191 return error('Authentication failed: unexpected response type')
192 }
193 }
194}
195
196// ping sends a PING command to verify server responsiveness
197pub fn (mut db DB) ping() !string {
198 return db.cmd('PING')! as string
199}
200
201// validate checks whether the Redis connection is still responsive.
202pub fn (mut db DB) validate() !bool {
203 return db.ping()! == 'PONG'
204}
205
206// reset clears any queued pipeline state before the connection is reused.
207pub fn (mut db DB) reset() ! {
208 db.pipeline_mode = false
209 db.pipeline_cmd_count = 0
210 db.cmd_buf.clear()
211 db.resp_buf.clear()
212 db.pipeline_buffer.clear()
213}
214
215// del deletes a `key`
216pub fn (mut db DB) del(key string) !i64 {
217 // *2\r\n$3\r\nDEL\r\n$6\r\ncounter\r\n
218 // send cmd
219 db.cmd_buf.clear()
220 db.cmd_buf << '*2\r\n$3\r\nDEL\r\n$${key.len}\r\n${key}\r\n'.bytes()
221 if db.pipeline_mode {
222 db.pipeline_buffer << db.cmd_buf
223 db.pipeline_cmd_count++
224 } else {
225 db.write_data(db.cmd_buf)!
226
227 // read resp
228 return db.read_response()! as i64
229 }
230 return 0
231}
232
233// set stores a `key`-value` pair in Redis. Supported value types: number, string, []u8
234pub fn (mut db DB) set[T](key string, value T) !string {
235 // *3\r\n$3\r\nSET\r\n$4\r\nname\r\n$5\r\nVlang\r\n
236 db.cmd_buf.clear()
237 db.cmd_buf << '*3\r\n$3\r\nSET\r\n$${key.len}\r\n${key}\r\n'.bytes()
238 $if T is $int {
239 val_str := value.str()
240 db.cmd_buf << '$${val_str.len}\r\n${val_str}'.bytes()
241 } $else $if T is string {
242 db.cmd_buf << '$${value.len}\r\n${value}'.bytes()
243 } $else $if T is []u8 {
244 db.cmd_buf << '$${value.len}\r\n'.bytes()
245 db.cmd_buf << value
246 } $else {
247 return error('`set()`: unsupported value type. Allowed: number, string, []u8')
248 }
249 db.cmd_buf << '\r\n'.bytes()
250 if db.pipeline_mode {
251 db.pipeline_buffer << db.cmd_buf
252 db.pipeline_cmd_count++
253 } else {
254 db.write_data(db.cmd_buf)!
255 return db.read_response()! as string
256 }
257 return ''
258}
259
260// get retrieves the value of a `key`. Supported return types: string, int, []u8
261pub fn (mut db DB) get[T](key string) !T {
262 // *2\r\n$3\r\nGET\r\n$4\r\nname\r\n
263 // send cmd
264 db.cmd_buf.clear()
265 db.cmd_buf << '*2\r\n$3\r\nGET\r\n$${key.len}\r\n${key}\r\n'.bytes()
266 if db.pipeline_mode {
267 db.pipeline_buffer << db.cmd_buf
268 db.pipeline_cmd_count++
269 } else {
270 db.write_data(db.cmd_buf)!
271 resp := db.read_response()!
272 match resp {
273 []u8 {
274 $if T is string {
275 return resp.bytestr()
276 } $else $if T is $int {
277 return T(resp.bytestr().i64())
278 } $else $if T is []u8 {
279 return resp
280 }
281 }
282 RedisNull {
283 return error('`get()`: key ${key} not found')
284 }
285 else {
286 return error('`get()`: unexpected response type')
287 }
288 }
289
290 return error('`get()`: unsupported data type')
291 }
292 return T{}
293}
294
295// incr increments the integer value of a `key` by 1
296pub fn (mut db DB) incr(key string) !i64 {
297 // *2\r\n$4\r\nINCR\r\n$6\r\ncounter\r\n
298 // send cmd
299 db.cmd_buf.clear()
300 db.cmd_buf << '*2\r\n$4\r\nINCR\r\n$${key.len}\r\n${key}\r\n'.bytes()
301 if db.pipeline_mode {
302 db.pipeline_buffer << db.cmd_buf
303 db.pipeline_cmd_count++
304 } else {
305 db.write_data(db.cmd_buf)!
306
307 // read resp
308 return db.read_response()! as i64
309 }
310 return 0
311}
312
313// decr decrements the integer value of a `key` by 1
314pub fn (mut db DB) decr(key string) !i64 {
315 // *2\r\n$4\r\nDECR\r\n$6\r\ncounter\r\n
316 // send cmd
317 db.cmd_buf.clear()
318 db.cmd_buf << '*2\r\n$4\r\nDECR\r\n$${key.len}\r\n${key}\r\n'.bytes()
319 if db.pipeline_mode {
320 db.pipeline_buffer << db.cmd_buf
321 db.pipeline_cmd_count++
322 } else {
323 db.write_data(db.cmd_buf)!
324
325 // read resp
326 return db.read_response()! as i64
327 }
328 return 0
329}
330
331// hset sets multiple `key`-`value` pairs in a hash. Supported value types: string, int, []u8
332pub fn (mut db DB) hset[T](key string, m map[string]T) !int {
333 // HSET user:1 name "John" age 30
334 // *6\r\n$4\r\nHSET\r\n$6\r\nuser:1\r\n$4\r\nname\r\n$4\r\nJohn\r\n$3\r\nage\r\n$2\r\n30\r\n
335 db.cmd_buf.clear()
336 db.cmd_buf << '*${2 + m.len * 2}\r\n$4\r\nHSET\r\n$${key.len}\r\n${key}\r\n'.bytes()
337 for k, v in m {
338 db.cmd_buf << '$${k.len}\r\n${k}\r\n'.bytes()
339 $if T is string {
340 db.cmd_buf << '$${v.len}\r\n${v}\r\n'.bytes()
341 } $else $if T is $int {
342 v_str := v.str()
343 db.cmd_buf << '$${v_str.len}\r\n${v_str}\r\n'.bytes()
344 } $else $if T is []u8 {
345 // Write bulk string header correctly (no stray '$' after the length)
346 db.cmd_buf << '$${v.len}\r\n'.bytes()
347 db.cmd_buf << v
348 db.cmd_buf << '\r\n'.bytes()
349 } $else {
350 return error('`hset()`: unsupported value type. Allowed: number, string, []u8')
351 }
352 }
353 if db.pipeline_mode {
354 db.pipeline_buffer << db.cmd_buf
355 db.pipeline_cmd_count++
356 } else {
357 db.write_data(db.cmd_buf)!
358 return int(db.read_response()! as i64)
359 }
360 return 0
361}
362
363// hget retrieves the value of a hash field. Supported return types: string, int, []u8
364pub fn (mut db DB) hget[T](key string, m_key string) !T {
365 // HGET user:1 name
366 // *3\r\n$4\r\nHGET\r\n$6\r\nuser:1\r\n$4\r\nname\r\n
367 db.cmd_buf.clear()
368 db.cmd_buf << '*3\r\n$4\r\nHGET\r\n$${key.len}\r\n${key}\r\n'.bytes()
369 db.cmd_buf << '$${m_key.len}\r\n${m_key}\r\n'.bytes()
370 if db.pipeline_mode {
371 db.pipeline_buffer << db.cmd_buf
372 db.pipeline_cmd_count++
373 } else {
374 db.write_data(db.cmd_buf)!
375 resp := db.read_response()! as []u8
376 $if T is string {
377 return resp.bytestr()
378 } $else $if T is $int {
379 return resp.bytestr().i64()
380 } $else $if T is []u8 {
381 return resp
382 }
383 return error('`hget()`: unsupported return type. Allowed: number, string, []u8')
384 }
385 return T{}
386}
387
388// hgetall retrieves all fields and values of a hash. Supported value types: string, int, []u8
389pub fn (mut db DB) hgetall[T](key string) !map[string]T {
390 // HGETALL user:1
391 // *2\r\n$7\r\nHGETALL\r\n$6\r\nuser:1\r\n
392 $if T !is string && T !is $int && T !is []u8 {
393 return error('`hgetall()`: unsupported value type. Allowed: number, string, []u8')
394 }
395 db.cmd_buf.clear()
396 db.cmd_buf << '*2\r\n$7\r\nHGETALL\r\n$${key.len}\r\n${key}\r\n'.bytes()
397 if db.pipeline_mode {
398 db.pipeline_buffer << db.cmd_buf
399 db.pipeline_cmd_count++
400 } else {
401 db.write_data(db.cmd_buf)!
402 resp := db.read_response()!
403
404 // normalize result into map[string]T regardless of RESP2 array, RESP3 map,
405 // or RedisMap interleaved pairs.
406 $if T is string {
407 mut result := map[string]T{}
408 match resp {
409 []RedisValue {
410 if resp.len % 2 != 0 {
411 return error('`hgetall()`: invalid HGETALL response format')
412 }
413 for i in 0 .. resp.len / 2 {
414 // keys and values expected as bulk strings for RESP2
415 key_val := resp[2 * i]
416 val_val := resp[2 * i + 1]
417 // key
418 k := match key_val {
419 []u8 { key_val.bytestr() }
420 string { key_val }
421 else { return error('`hgetall()`: unexpected key type: ${key_val.type_name()}') }
422 }
423
424 // value
425 v := match val_val {
426 []u8 { val_val.bytestr() }
427 string { val_val }
428 i64 { val_val.str() }
429 else { return error('`hgetall()`: unexpected value type: ${val_val.type_name()}') }
430 }
431
432 result[k] = v
433 }
434 return result
435 }
436 map[string]RedisValue {
437 for k, v in resp {
438 val_str := match v {
439 []u8 { v.bytestr() }
440 string { v }
441 i64 { v.str() }
442 else { return error('`hgetall()`: unexpected value type in map: ${v.type_name()}') }
443 }
444
445 result[k] = val_str
446 }
447 return result
448 }
449 RedisMap {
450 pairs := resp.pairs
451 if pairs.len % 2 != 0 {
452 return error('`hgetall()`: invalid RedisMap response format')
453 }
454 for i := 0; i < pairs.len; i += 2 {
455 key_val := pairs[i]
456 val_val := pairs[i + 1]
457 k := match key_val {
458 []u8 { key_val.bytestr() }
459 string { key_val }
460 else { return error('`hgetall()`: unexpected key type in RedisMap: ${key_val.type_name()}') }
461 }
462
463 v := match val_val {
464 []u8 { val_val.bytestr() }
465 string { val_val }
466 i64 { val_val.str() }
467 else { return error('`hgetall()`: unexpected value type in RedisMap: ${val_val.type_name()}') }
468 }
469
470 result[k] = v
471 }
472 return result
473 }
474 else {
475 return error('`hgetall()`: unsupported response type: ${resp.type_name()}')
476 }
477 }
478 } $else $if T is $int {
479 mut result := map[string]T{}
480 match resp {
481 []RedisValue {
482 if resp.len % 2 != 0 {
483 return error('`hgetall()`: invalid HGETALL response format')
484 }
485 for i in 0 .. resp.len / 2 {
486 key_val := resp[2 * i]
487 val_val := resp[2 * i + 1]
488 k := match key_val {
489 []u8 { key_val.bytestr() }
490 string { key_val }
491 else { return error('`hgetall()`: unexpected key type: ${key_val.type_name()}') }
492 }
493
494 v := match val_val {
495 []u8 { val_val.bytestr().i64() }
496 string { val_val.i64() }
497 i64 { val_val }
498 else { return error('`hgetall()`: unexpected value type: ${val_val.type_name()}') }
499 }
500
501 result[k] = T(v)
502 }
503 return result
504 }
505 map[string]RedisValue {
506 for k, v in resp {
507 n := match v {
508 []u8 { v.bytestr().i64() }
509 string { v.i64() }
510 i64 { v }
511 else { return error('`hgetall()`: unexpected value type in map: ${v.type_name()}') }
512 }
513
514 result[k] = T(n)
515 }
516 return result
517 }
518 RedisMap {
519 pairs := resp.pairs
520 if pairs.len % 2 != 0 {
521 return error('`hgetall()`: invalid RedisMap response format')
522 }
523 for i := 0; i < pairs.len; i += 2 {
524 key_val := pairs[i]
525 val_val := pairs[i + 1]
526 k := match key_val {
527 []u8 { key_val.bytestr() }
528 string { key_val }
529 else { return error('`hgetall()`: unexpected key type in RedisMap: ${key_val.type_name()}') }
530 }
531
532 n := match val_val {
533 []u8 { val_val.bytestr().i64() }
534 string { val_val.i64() }
535 i64 { val_val }
536 else { return error('`hgetall()`: unexpected value type in RedisMap: ${val_val.type_name()}') }
537 }
538
539 result[k] = T(n)
540 }
541 return result
542 }
543 else {
544 return error('`hgetall()`: unsupported response type: ${resp.type_name()}')
545 }
546 }
547 } $else $if T is []u8 {
548 mut result := map[string]T{}
549 match resp {
550 []RedisValue {
551 if resp.len % 2 != 0 {
552 return error('`hgetall()`: invalid HGETALL response format')
553 }
554 for i in 0 .. resp.len / 2 {
555 key_val := resp[2 * i]
556 val_val := resp[2 * i + 1]
557 k := match key_val {
558 []u8 { key_val.bytestr() }
559 string { key_val }
560 else { return error('`hgetall()`: unexpected key type: ${key_val.type_name()}') }
561 }
562
563 v := match val_val {
564 []u8 { val_val }
565 string { val_val.bytes() }
566 i64 { val_val.str().bytes() }
567 else { return error('`hgetall()`: unexpected value type: ${val_val.type_name()}') }
568 }
569
570 result[k] = v
571 }
572 return result
573 }
574 map[string]RedisValue {
575 for k, v in resp {
576 b := match v {
577 []u8 { v }
578 string { v.bytes() }
579 i64 { v.str().bytes() }
580 else { return error('`hgetall()`: unexpected value type in map: ${v.type_name()}') }
581 }
582
583 result[k] = b
584 }
585 return result
586 }
587 RedisMap {
588 pairs := resp.pairs
589 if pairs.len % 2 != 0 {
590 return error('`hgetall()`: invalid RedisMap response format')
591 }
592 for i := 0; i < pairs.len; i += 2 {
593 key_val := pairs[i]
594 val_val := pairs[i + 1]
595 k := match key_val {
596 []u8 { key_val.bytestr() }
597 string { key_val }
598 else { return error('`hgetall()`: unexpected key type in RedisMap: ${key_val.type_name()}') }
599 }
600
601 b := match val_val {
602 []u8 { val_val }
603 string { val_val.bytes() }
604 i64 { val_val.str().bytes() }
605 else { return error('`hgetall()`: unexpected value type in RedisMap: ${val_val.type_name()}') }
606 }
607
608 result[k] = b
609 }
610 return result
611 }
612 else {
613 return error('`hgetall()`: unsupported response type: ${resp.type_name()}')
614 }
615 }
616 } $else {
617 // should not happen due to compile-time check above
618 return error('`hgetall()`: unsupported value type ${T.type_name()}')
619 }
620 }
621 return map[string]T{}
622}
623
624// expire sets a `key`'s time to live in `seconds`
625pub fn (mut db DB) expire(key string, seconds int) !bool {
626 // *3\r\n$6\r\nEXPIRE\r\n$6\r\ncounter\r\n$3\r\n600\r\n
627 // send cmd
628 seconds_str := seconds.str()
629 db.cmd_buf.clear()
630 db.cmd_buf << '*3\r\n$6\r\nEXPIRE\r\n$${key.len}\r\n${key}\r\n'.bytes()
631 db.cmd_buf << '$${seconds_str.len}\r\n${seconds_str}\r\n'.bytes()
632 if db.pipeline_mode {
633 db.pipeline_buffer << db.cmd_buf
634 db.pipeline_cmd_count++
635 } else {
636 db.write_data(db.cmd_buf)!
637
638 // read resp
639 rv := db.read_response()!
640
641 // normalize to boolean result as before
642 match rv {
643 i64 { return rv != 0 }
644 []u8 { return rv.bytestr().i64() != 0 }
645 string { return rv.i64() != 0 }
646 else { return error('`expire()`: unexpected response type: ${rv.type_name()}') }
647 }
648 }
649 return false
650}
651
652// read_response_bulk_string handles Redis bulk string responses (format: $<length>\r\n<data>\r\n)
653fn (mut db DB) read_response_bulk_string() !RedisValue {
654 mut data_length := i64(-1)
655 mut chunk := []u8{len: 1}
656
657 db.resp_buf.clear()
658 for {
659 bytes_read := db.read_data(mut chunk) or {
660 return error('`read_response_bulk_string()`: connection error ${err}')
661 }
662 if bytes_read == 0 {
663 return error('`read_response_bulk_string()`: connection closed prematurely')
664 }
665 db.resp_buf << chunk[0]
666
667 if chunk[0] == `\n` {
668 break
669 }
670 if (chunk[0] < `0` || chunk[0] > `9`) && chunk[0] != `\r` && chunk[0] != `-` {
671 return error('`read_response_bulk_string()`: invalid bulk string header')
672 }
673 }
674
675 if db.resp_buf.len < 2 {
676 return error('`read_response_bulk_string()`: bulk string header too short')
677 }
678
679 data_length = db.resp_buf[0..db.resp_buf.len - 2].bytestr().i64()
680
681 // -1 -> NULL bulk string
682 if data_length == -1 {
683 return RedisNull{}
684 }
685
686 // If zero-length payload, read exactly the 2-byte terminator CRLF reliably
687 if data_length == 0 {
688 mut term := [2]u8{}
689 n := db.read_ptr_data(&term[0], 2)! // read remaining terminator bytes
690 match n {
691 0, 1 {
692 return error('`read_response_bulk_string()`: incomplete terminator for empty string')
693 }
694 else {
695 if n > 2 {
696 return error('`read_response_bulk_string()`: should never get here - more than 2 bytes read?!?')
697 }
698 }
699 }
700
701 if term[0] != `\r` || term[1] != `\n` {
702 return error('invalid terminator for empty string')
703 }
704 return []u8{}
705 }
706
707 // Read payload of exactly data_length bytes
708 mut data_buf := []u8{len: int(data_length)}
709 mut total_read := 0
710 for total_read < data_buf.len {
711 mut ptr := unsafe { &data_buf[total_read] }
712 n := db.read_ptr_data(ptr, data_buf.len - total_read)!
713 if n == 0 && total_read < data_buf.len {
714 return error('`read_response_bulk_string()`: incomplete data: read ${total_read} / ${data_buf.len} bytes')
715 }
716 total_read += n
717 }
718
719 // Now read the trailing CRLF terminator (2 bytes) reliably
720 mut term := []u8{len: 2}
721 mut term_read := 0
722 for term_read < 2 {
723 mut ptr := unsafe { &term[term_read] }
724 n := db.read_ptr_data(ptr, 2 - term_read)!
725 if n == 0 && term_read < 2 {
726 return error('`read_response_bulk_string()`: incomplete terminator after payload')
727 }
728 term_read += n
729 }
730 if term[0] != `\r` || term[1] != `\n` {
731 return error('`read_response_bulk_string()`: invalid data terminator')
732 }
733
734 return data_buf
735}
736
737// read_header reads a CRLF-terminated header (returns content without trailing CRLF)
738fn (mut db DB) read_header() !string {
739 mut chunk := []u8{len: 1}
740 db.resp_buf.clear()
741 for {
742 bytes_read := db.read_data(mut chunk) or {
743 return error('`read_header()`: connection error ${err}')
744 }
745 if bytes_read == 0 {
746 return error('`read_header()`: connection closed prematurely')
747 }
748 db.resp_buf << chunk[0]
749 if chunk[0] == `\n` {
750 break
751 }
752 }
753 if db.resp_buf.len < 2 {
754 return error('`read_header()`: header too short')
755 }
756 return db.resp_buf[0..db.resp_buf.len - 2].bytestr()
757}
758
759// read_exact_payload reads exactly n bytes + trailing CRLF and return the data bytes (without CRLF)
760fn (mut db DB) read_exact_payload(n int) ![]u8 {
761 if n < 0 {
762 return error('invalid payload length ${n}')
763 }
764 mut data_buf := []u8{len: n + 2}
765 mut total_read := 0
766 for total_read < data_buf.len {
767 remaining := data_buf.len - total_read
768 chunk_size := if remaining > 1 { 1 } else { remaining }
769 mut chunk_ptr := unsafe { &data_buf[total_read] }
770
771 bytes_read := db.read_ptr_data(chunk_ptr, chunk_size)!
772 total_read += bytes_read
773
774 if bytes_read == 0 && total_read < data_buf.len {
775 return error('`read_exact_payload()`: incomplete data: read ${total_read} / ${data_buf.len} bytes')
776 }
777 }
778 // must ending with CRLF
779 if data_buf[n] != `\r` || data_buf[n + 1] != `\n` {
780 return error('`read_exact_payload()`: invalid data terminator')
781 }
782 return data_buf[0..n].clone()
783}
784
785// read_resp3_boolean_payload handles RESP3 boolean (#t or #f)
786fn (mut db DB) read_resp3_boolean_payload() !bool {
787 s := db.read_header()!
788 if s == 't' {
789 return true
790 }
791 if s == 'f' {
792 return false
793 }
794 return error('`read_resp3_boolean_payload()`: invalid boolean: ${s}')
795}
796
797// read_resp3_double_payload handles RESP3 double (,<double>)
798fn (mut db DB) read_resp3_double_payload() !f64 {
799 s := db.read_header()!
800 return s.f64()
801}
802
803// read_resp3_bignum_payload handles RESP3 big number ((<number>) -> big.Integer)
804fn (mut db DB) read_resp3_bignum_payload() !big.Integer {
805 mut s := db.read_header()!
806 // RESP3 bignum frames may be wrapped in parentheses, e.g. "(12345)".
807 // Trim leading '(' and trailing ')' if present to make the numeric string safe for the parser.
808 if s.len > 0 && s[0] == `(` {
809 s = s[1..]
810 }
811 if s.len > 0 && s[s.len - 1] == `)` {
812 s = s[0..s.len - 1]
813 }
814 return big.integer_from_string(s)!
815}
816
817// read_resp3_blob_error_payload handles RESP3 blob error (!<len>\r\n<data>\r\n)
818fn (mut db DB) read_resp3_blob_error_payload() !RedisBlobError {
819 header := db.read_header()!
820 length := header.i64()
821 if length == -1 {
822 return RedisBlobError{
823 data: []u8{}
824 }
825 }
826 payload := db.read_exact_payload(int(length))!
827 return RedisBlobError{
828 data: payload
829 }
830}
831
832// read_resp3_verbatim_payload handles RESP3 verbatim (=<len>\r\n<fmt>:<data>\r\n)
833fn (mut db DB) read_resp3_verbatim_payload() !RedisVerbatim {
834 header := db.read_header()!
835 length := header.i64()
836 if length == -1 {
837 return RedisVerbatim{
838 format: ''
839 data: []u8{}
840 }
841 }
842 payload := db.read_exact_payload(int(length))!
843 // split at first ':'
844 idx := payload.bytestr().index(':') or { -1 }
845 if idx == -1 {
846 return RedisVerbatim{
847 format: ''
848 data: payload
849 }
850 }
851 fmt := payload[0..idx].bytestr()
852 data := payload[idx + 1..].clone()
853 return RedisVerbatim{
854 format: fmt
855 data: data
856 }
857}
858
859// read_resp3_map_payload handles RESP3 map (%) where header is number of key/value pairs
860// Try to return map[string]RedisValue when keys are string-like, otherwise return RedisMap
861fn (mut db DB) read_resp3_map_payload() !RedisValue {
862 header := db.read_header()!
863 count := header.i64()
864 if count == -1 {
865 return RedisNull{}
866 }
867 if count == 0 {
868 return map[string]RedisValue{}
869 }
870 mut pairs := []RedisValue{cap: int(count) * 2}
871 for _ in 0 .. count {
872 key := db.read_response()!
873 val := db.read_response()!
874 pairs << key
875 pairs << val
876 }
877 // attempt to convert to map[string]RedisValue
878 mut kv := map[string]RedisValue{}
879 for i := 0; i < pairs.len; i += 2 {
880 k := pairs[i]
881 v := pairs[i + 1]
882 match k {
883 []u8 {
884 kv[k.bytestr()] = v
885 }
886 string {
887 kv[k] = v
888 }
889 else {
890 // fallback: return interleaved pairs preserved as RedisMap
891 return RedisMap{
892 pairs: pairs
893 }
894 }
895 }
896 }
897 return kv
898}
899
900// read_resp3_attr_payload handles RESP3 attributes/attrs (|) and returns a map[string]RedisValue
901// Attributes are map-like and we return a map when keys are string-like. If a non-string
902// key is encountered, this treats it as an error (attributes are expected to be string-keyed).
903fn (mut db DB) read_resp3_attr_payload() !RedisValue {
904 header := db.read_header()!
905 count := header.i64()
906 if count == -1 {
907 return RedisNull{}
908 }
909 if count == 0 {
910 return map[string]RedisValue{}
911 }
912 mut kv := map[string]RedisValue{}
913 for _ in 0 .. count {
914 k := db.read_response()!
915 v := db.read_response()!
916 match k {
917 []u8 {
918 kv[k.bytestr()] = v
919 }
920 string {
921 kv[k] = v
922 }
923 else {
924 return error('`read_resp3_attr_payload()`: attribute key is not a string-like type')
925 }
926 }
927 }
928 return kv
929}
930
931// read_resp3_set_payload handles RESP3 set (~)
932fn (mut db DB) read_resp3_set_payload() !RedisSet {
933 header := db.read_header()!
934 count := header.i64()
935 if count == -1 {
936 return RedisSet{
937 elements: []RedisValue{}
938 }
939 }
940 mut elems := []RedisValue{cap: int(count)}
941 for _ in 0 .. count {
942 elems << db.read_response()!
943 }
944 return RedisSet{
945 elements: elems
946 }
947}
948
949// read_resp3_push_payload handles RESP3 push (>) - array-like
950fn (mut db DB) read_resp3_push_payload() !RedisPush {
951 header := db.read_header()!
952 count := header.i64()
953 if count == -1 {
954 return RedisPush{
955 elements: []RedisValue{}
956 }
957 }
958 mut elems := []RedisValue{cap: int(count)}
959 for _ in 0 .. count {
960 elems << db.read_response()!
961 }
962 return RedisPush{
963 elements: elems
964 }
965}
966
967// read_response_i64 handles Redis integer responses (format: :<number>\r\n)
968fn (mut db DB) read_response_i64() !i64 {
969 db.resp_buf.clear()
970 unsafe { db.resp_buf.grow_len(resp_buf_pre_allocate_len) }
971 mut total_read := 0
972
973 for total_read < db.resp_buf.len {
974 remaining := db.resp_buf.len - total_read
975 chunk_size := if remaining > 1 { 1 } else { remaining }
976 mut chunk_ptr := unsafe { &db.resp_buf[total_read] }
977
978 bytes_read := db.read_ptr_data(chunk_ptr, chunk_size)!
979 total_read += bytes_read
980
981 if total_read > 2 {
982 if db.resp_buf[total_read - 2] == `\r` && db.resp_buf[total_read - 1] == `\n` {
983 break
984 }
985 }
986 if bytes_read == 0 {
987 return error('`read_response_i64()`: incomplete data: read ${total_read} bytes')
988 }
989 }
990 ret_val := db.resp_buf[0..total_read - 2].bytestr().i64()
991 return ret_val
992}
993
994// read_response_simple_string handles Redis simple string responses (format: +<string>\r\n)
995fn (mut db DB) read_response_simple_string() !string {
996 db.resp_buf.clear()
997 unsafe { db.resp_buf.grow_len(resp_buf_pre_allocate_len) }
998 mut total_read := 0
999
1000 for total_read < db.resp_buf.len {
1001 remaining := db.resp_buf.len - total_read
1002 chunk_size := if remaining > 1 { 1 } else { remaining }
1003 mut chunk_ptr := unsafe { &db.resp_buf[total_read] }
1004
1005 bytes_read := db.read_ptr_data(chunk_ptr, chunk_size)!
1006 total_read += bytes_read
1007
1008 if total_read > 2 {
1009 if db.resp_buf[total_read - 2] == `\r` && db.resp_buf[total_read - 1] == `\n` {
1010 break
1011 }
1012 }
1013 if bytes_read == 0 {
1014 return error('`read_response_simple_string()`: incomplete data: read ${total_read} bytes')
1015 }
1016 }
1017 return db.resp_buf[0..total_read - 2].bytestr()
1018}
1019
1020// read_response_array handles Redis array responses (format: *<length>\r\n<elements>)
1021fn (mut db DB) read_response_array() !RedisValue {
1022 mut array_len := i64(-1)
1023 mut chunk := []u8{len: 1}
1024
1025 db.resp_buf.clear()
1026 for {
1027 bytes_read := db.read_data(mut chunk) or {
1028 return error('`read_response_array()`: connection error: ${err}')
1029 }
1030 if bytes_read == 0 {
1031 return error('`read_response_array()`: connection closed prematurely')
1032 }
1033 db.resp_buf << chunk[0]
1034
1035 if chunk[0] == `\n` {
1036 break
1037 }
1038 if (chunk[0] < `0` || chunk[0] > `9`) && chunk[0] != `\r` && chunk[0] != `-` {
1039 return error('`read_response_array()`: invalid array header')
1040 }
1041 }
1042
1043 if db.resp_buf.len < 2 {
1044 return error('`read_response_array()`: array header too short')
1045 }
1046
1047 array_len = db.resp_buf[0..db.resp_buf.len - 2].bytestr().i64() // 排除\r\n
1048
1049 if array_len == -1 {
1050 return RedisNull{}
1051 }
1052 if array_len == 0 {
1053 return []RedisValue{}
1054 }
1055
1056 mut elements := []RedisValue{cap: int(array_len)}
1057 for _ in 0 .. array_len {
1058 element := db.read_response() or {
1059 return error('`read_response_array()`: failed to read array element: ${err}')
1060 }
1061 elements << element
1062 }
1063 return elements
1064}
1065
1066// read_response handles all types of Redis responses (RESP2 + RESP3 when enabled)
1067fn (mut db DB) read_response() !RedisValue {
1068 db.resp_buf.clear()
1069 unsafe { db.resp_buf.grow_len(1) }
1070 // Read the first non-empty, non-CR/LF prefix byte. Some transports or
1071 // intermediate proxies may emit stray CR/LF bytes; skip them so we parse
1072 // the actual RESP prefix correctly.
1073 for {
1074 read_len := db.read_data(mut db.resp_buf)!
1075 if read_len != 1 {
1076 return error('`read_response()`: empty response from server')
1077 }
1078 // Skip stray CR and LF bytes that may precede the real response prefix.
1079 if db.resp_buf[0] == `\r` || db.resp_buf[0] == `\n` {
1080 continue
1081 }
1082 break
1083 }
1084
1085 // If the first non-CRLF byte is not a valid RESP prefix, attempt a bounded
1086 // resynchronization: read and discard up to `max_skip` bytes looking for a
1087 // valid prefix. This helps tolerate transient stray bytes while avoiding
1088 // silently swallowing large amounts of data.
1089 mut attempts := 0
1090
1091 for {
1092 // If this byte is a known RESP prefix, proceed to parse normally.
1093 ch := db.resp_buf[0]
1094 if ch == `+` || ch == `-` || ch == `:` || ch == `$` || ch == `*` || ch == `#` || ch == `,`
1095 || ch == `(` || ch == `!` || ch == `=` || ch == `%` || ch == `~` || ch == `>`
1096 || ch == `|` {
1097 break
1098 }
1099 // Give up after bounded attempts and return diagnostics.
1100 if attempts >= max_skip {
1101 mut prefix_val := -1
1102 if db.resp_buf.len > 0 {
1103 prefix_val = int(db.resp_buf[0])
1104 }
1105 mut hex := ''
1106 for i in 0 .. db.resp_buf.len {
1107 hex += '${int(db.resp_buf[i]):02x} '
1108 }
1109 return error("`read_response()`: unknown response prefix byte=${prefix_val} data_hex=\"${hex}\" data_str=\"${db.resp_buf.bytestr()}\"")
1110 }
1111 // Read and discard one more byte from the socket and treat it as the new candidate.
1112 mut tmp := []u8{len: 1}
1113 n := db.read_data(mut tmp) or { 0 }
1114 if n == 0 {
1115 return error('`read_response()`: incomplete data during resynchronization')
1116 }
1117 db.resp_buf[0] = tmp[0]
1118 // skip CRLF if encountered but still count the attempt (we consumed a byte)
1119 attempts++
1120 continue
1121 }
1122
1123 match db.resp_buf[0] {
1124 `+` { // Simple string
1125 return db.read_response_simple_string()!
1126 }
1127 `-` { // Error message
1128 msg := db.read_response_simple_string()!
1129 return error(msg)
1130 }
1131 `:` { // Integer
1132 return db.read_response_i64()!
1133 }
1134 `$` { // Bulk string
1135 return db.read_response_bulk_string()!
1136 }
1137 `*` { // Array
1138 return db.read_response_array()!
1139 }
1140 // RESP3-only frames (enabled when db.version >= 3)
1141 `#` { // Boolean
1142 if db.version < 3 {
1143 return error('`read_response()`: unknown response prefix: ${db.resp_buf.bytestr()}')
1144 }
1145 return RedisValue(db.read_resp3_boolean_payload()!)
1146 }
1147 `,` { // Double
1148 if db.version < 3 {
1149 return error('`read_response()`: unknown response prefix: ${db.resp_buf.bytestr()}')
1150 }
1151 return RedisValue(db.read_resp3_double_payload()!)
1152 }
1153 `(` { // Big number
1154 if db.version < 3 {
1155 return error('`read_response()`: unknown response prefix: ${db.resp_buf.bytestr()}')
1156 }
1157 return RedisValue(db.read_resp3_bignum_payload()!)
1158 }
1159 `!` { // Blob error
1160 if db.version < 3 {
1161 return error('`read_response()`: unknown response prefix: ${db.resp_buf.bytestr()}')
1162 }
1163 return db.read_resp3_blob_error_payload()!
1164 }
1165 `=` { // Verbatim string
1166 if db.version < 3 {
1167 return error('`read_response()`: unknown response prefix: ${db.resp_buf.bytestr()}')
1168 }
1169 return db.read_resp3_verbatim_payload()!
1170 }
1171 `%` { // Map
1172 if db.version < 3 {
1173 return error('`read_response()`: unknown response prefix: ${db.resp_buf.bytestr()}')
1174 }
1175 return db.read_resp3_map_payload()!
1176 }
1177 `~` { // Set
1178 if db.version < 3 {
1179 return error('`read_response()`: unknown response prefix: ${db.resp_buf.bytestr()}')
1180 }
1181 return RedisValue(db.read_resp3_set_payload()!)
1182 }
1183 `>` { // Push
1184 if db.version < 3 {
1185 return error('`read_response()`: unknown response prefix: ${db.resp_buf.bytestr()}')
1186 }
1187 return RedisValue(db.read_resp3_push_payload()!)
1188 }
1189 `|` { // Attr (map-like)
1190 if db.version < 3 {
1191 return error('`read_response()`: unknown response prefix: ${db.resp_buf.bytestr()}')
1192 }
1193 // Attributes are parsed like maps; reuse map parsing (attrs preserved as map[string]RedisValue)
1194 return db.read_resp3_map_payload()!
1195 }
1196 else {
1197 // Fallback: this should be unreachable because we validated prefixes above,
1198 // but return a helpful diagnostic if it happens.
1199 mut prefix_val := -1
1200 if db.resp_buf.len > 0 {
1201 prefix_val = int(db.resp_buf[0])
1202 }
1203 mut hex := ''
1204 for i in 0 .. db.resp_buf.len {
1205 hex += '${int(db.resp_buf[i]):02x} '
1206 }
1207 return error("`read_response()`: unknown response prefix byte=${prefix_val} data_hex=\"${hex}\" data_str=\"${db.resp_buf.bytestr()}\"")
1208 }
1209 }
1210
1211 return error('`read_response()`: unreachable code')
1212}
1213
1214// cmd sends a custom command to Redis server
1215// for example: db.cmd('SET', 'key', 'value')!
1216pub fn (mut db DB) cmd(cmd ...string) !RedisValue {
1217 mut sb := strings.new_builder(cmd.len * 20)
1218 sb.write_string('*${cmd.len}\r\n') // Command array header
1219 for arg in cmd {
1220 sb.write_string('$${arg.len}\r\n${arg}\r\n')
1221 }
1222 if db.pipeline_mode {
1223 db.pipeline_buffer << unsafe { sb.reuse_as_plain_u8_array() }
1224 db.pipeline_cmd_count++
1225 } else {
1226 db.write_data(unsafe { sb.reuse_as_plain_u8_array() })!
1227 return db.read_response()!
1228 }
1229 return RedisNull{}
1230}
1231
1232// pipeline_start start a new pipeline
1233pub fn (mut db DB) pipeline_start() {
1234 db.pipeline_mode = true
1235 db.pipeline_cmd_count = 0
1236 db.pipeline_buffer.clear()
1237}
1238
1239// pipeline_execute executes the cmds in pipeline at once and retrieves all responses
1240pub fn (mut db DB) pipeline_execute() ![]RedisValue {
1241 if !db.pipeline_mode {
1242 return error('`pipeline_execute()`: pipeline not started')
1243 }
1244 defer {
1245 db.pipeline_mode = false
1246 }
1247 if db.pipeline_buffer.len == 0 {
1248 return []RedisValue{}
1249 }
1250
1251 db.write_data(db.pipeline_buffer)!
1252
1253 mut results := []RedisValue{cap: db.pipeline_cmd_count}
1254 for _ in 0 .. db.pipeline_cmd_count {
1255 results << db.read_response()!
1256 }
1257
1258 // reset to non-pipeline mode
1259 db.pipeline_mode = false
1260 db.pipeline_cmd_count = 0
1261 return results
1262}
1263