v / vlib / db / pg / pg.c.v
1009 lines · 876 sloc · 29.61 KB · 2b04a6ecbf16697b4a6ae2e1e02d3a381967e8f0
Raw
1module pg
2
3import io
4import orm
5import time
6
7$if $pkgconfig('libpq') {
8 #pkgconfig --cflags --libs libpq
9} $else {
10 $if msvc {
11 #flag -llibpq
12 } $else {
13 #flag -lpq
14 }
15 #flag linux -I/usr/include/postgresql
16 //#flag linux -Ipostgresql // cross compiling to linux
17
18 #flag darwin -I/opt/local/include/postgresql11
19 #flag darwin -L/opt/local/lib/postgresql11
20
21 #flag darwin -I/usr/local/opt/libpq/include
22 #flag darwin -L/usr/local/opt/libpq/lib
23
24 #flag darwin -I/opt/homebrew/include
25 #flag darwin -L/opt/homebrew/lib
26
27 #flag darwin -I/opt/homebrew/opt/libpq/include
28 #flag darwin -L/opt/homebrew/opt/libpq/lib
29
30 #flag windows -I @VEXEROOT/thirdparty/pg/libpq
31 #flag windows -L @VEXEROOT/thirdparty/pg/win64
32
33 #flag freebsd -I/usr/local/include
34 #flag freebsd -L/usr/local/lib
35
36 #flag openbsd -I/usr/local/include/postgresql
37 #flag openbsd -L/usr/local/lib
38}
39
40$if cross_compile ? && linux {
41 #include <libpq/libpq-fe.h>
42 #include <libpq/pg_config.h>
43
44 //#flag -lpq // libpq.a is located in LINUXROOT/lib/x86_64-linux-gnu/libpq.a
45 // The bundled linuxroot ships libpq.a but no libpgcommon.a / libpgport.a,
46 // so libpq's references to pg_snprintf, strlcpy, pg_freeaddrinfo_all etc.
47 // are unresolved at link time. Provide minimal stubs that delegate to libc.
48 #flag @VEXEROOT/thirdparty/pg/pgport_stubs.c
49} $else {
50 // PostgreSQL Source Code
51 // https://doxygen.postgresql.org/libpq-fe_8h.html
52 #include <libpq-fe.h>
53
54 // for PG_VERSION_NUM, which is defined everywhere at least since PG 9.5
55 #include <pg_config.h>
56}
57
58// for orm
59$if windows {
60 #include <winsock2.h>
61} $else {
62 #include <arpa/inet.h>
63}
64
65#include "@VMODROOT/vlib/db/pg/compatibility.h"
66
// Conn is a single libpq connection. It is NOT safe for concurrent use by
// multiple V threads (libpq enforces serial use of `PGconn*`). Use it pinned
// for operations that require a specific connection (LISTEN/NOTIFY, prepared
// statements scoped to the session, manual transactions). For pooled,
// thread-safe access prefer `DB`, which checks out a Conn per call.
@[heap]
pub struct Conn {
mut:
	// raw libpq handle handed to the `C.PQ*` calls; nil once the wrapper has
	// been closed or detached (see ensure_active())
	conn voidptr = unsafe { nil }
	// owning pool; when nil, close() tears the connection down directly
	pool &Pool = unsafe { nil }
	// timestamp compared against `max_lifetime` in is_expired()
	created_at time.Time
	// sticky flag set by mark_bad_if_disconnected() when libpq reports CONNECTION_BAD
	bad bool
}
80
// Row is a single result row. `vals` holds one entry per column, in column
// order; `none` stands for SQL NULL.
pub struct Row {
pub mut:
	vals []?string
}
85
// val gives the column value stored at `index`. A SQL NULL (`none`) is
// reported as the empty string.
pub fn (row Row) val(index int) string {
	cell := row.vals[index]
	return cell or { '' }
}
93
// values copies every column of the row into a plain `[]string`; SQL NULL
// entries become empty strings.
pub fn (row Row) values() []string {
	mut flat := []string{cap: row.vals.len}
	for cell in row.vals {
		text := cell or { '' }
		flat << text
	}
	return flat
}
102
// val_opt hands back the column at `index` untouched, so the caller can tell
// SQL NULL (`none`) apart from an empty string.
pub fn (row Row) val_opt(index int) ?string {
	cell := row.vals[index]
	return cell
}
107
// RowNoNull is a result row whose column values are plain strings (no
// optionals). It is what exec_no_null() returns.
pub struct RowNoNull {
pub mut:
	vals []string
}
112
// Result bundles the rows of a query with a column-name lookup table.
pub struct Result {
pub:
	cols map[string]int // column name -> index into `Row.vals`
	rows []Row
}
118
// Notification represents a notification received from the server via LISTEN/NOTIFY.
// Values of this type are produced by `Conn.get_notification()`.
pub struct Notification {
pub:
	channel string // notification channel name
	pid     int    // process ID of notifying server process
	payload string // notification payload string (may be empty)
}
126
// Config holds the settings from which the libpq connection string is built.
// Empty string fields and a non-positive `port` are left out of that string.
pub struct Config {
pub:
	host string = 'localhost'
	port int    = 5432
	// login name; preferred over `username` by connection_user()
	user string
	// alternative spelling of `user`; must equal `user` when both are set
	username string
	password string
	dbname   string
}
136
137//
138
// libpq structs declared with empty bodies: V code only passes pointers to
// them around and never touches their fields.
pub struct C.pg_result {}

pub struct C.pg_conn {}

// PGresult is the result handle type used by the `C.PQ*` declarations in this file.
@[typedef]
pub struct C.PGresult {}

// PGconn is the connection handle type used by the `C.PQ*` declarations in this file.
@[typedef]
pub struct C.PGconn {}
148
// PGnotify represents a notification received from the server via LISTEN/NOTIFY.
// It is the C-side record returned by `C.PQnotifies`; get_notification()
// converts it into a `Notification` and releases it with `C.PQfreemem`.
@[typedef]
pub struct C.PGnotify {
	relname &char // notification channel name
	be_pid  int   // process ID of notifying server process
	extra   &char // notification payload string
}
156
// ConnStatusType mirrors libpq's `CONNECTION_*` status constants. The value
// returned by `C.PQstatus` is cast to this enum; the code in this file only
// compares against `.ok` and `.bad`.
pub enum ConnStatusType {
	ok  = C.CONNECTION_OK
	bad = C.CONNECTION_BAD
	// Non-blocking mode only below here
	// The existence of these should never be relied upon - they should only be used for user feedback or similar purposes.
	started           = C.CONNECTION_STARTED           // Waiting for connection to be made.
	made              = C.CONNECTION_MADE              // Connection OK; waiting to send.
	awaiting_response = C.CONNECTION_AWAITING_RESPONSE // Waiting for a response from the postmaster.
	auth_ok           = C.CONNECTION_AUTH_OK           // Received authentication; waiting for backend startup.
	setenv            = C.CONNECTION_SETENV            // Negotiating environment.
	ssl_startup       = C.CONNECTION_SSL_STARTUP       // Negotiating SSL.
	needed            = C.CONNECTION_NEEDED            // Internal state: connect() needed. Available in PG 8
	check_writable    = C.CONNECTION_CHECK_WRITABLE    // Check if we could make a writable connection. Available since PG 10
	consume           = C.CONNECTION_CONSUME           // Wait for any pending message and consume them. Available since PG 10
	gss_startup       = C.CONNECTION_GSS_STARTUP       // Negotiating GSSAPI; available since PG 12
}
173
// ExecStatusType mirrors libpq's `PGRES_*` result status constants. The value
// returned by `C.PQresultStatus` is cast to this enum (see copy_expert()).
@[typedef]
pub enum ExecStatusType {
	empty_query    = C.PGRES_EMPTY_QUERY    // empty query string was executed
	command_ok     = C.PGRES_COMMAND_OK     // a query command that doesn't return anything was executed properly by the backend
	tuples_ok      = C.PGRES_TUPLES_OK      // a query command that returns tuples was executed properly by the backend, PGresult contains the result tuples
	copy_out       = C.PGRES_COPY_OUT       // Copy Out data transfer in progress
	copy_in        = C.PGRES_COPY_IN        // Copy In data transfer in progress
	bad_response   = C.PGRES_BAD_RESPONSE   // an unexpected response was received from the backend
	nonfatal_error = C.PGRES_NONFATAL_ERROR // notice or warning message
	fatal_error    = C.PGRES_FATAL_ERROR    // query failed
	copy_both      = C.PGRES_COPY_BOTH      // Copy In/Out data transfer in progress
	single_tuple   = C.PGRES_SINGLE_TUPLE   // single tuple from larger resultset
}
187
188//
189
// Connection setup, connection state and plain query execution.

fn C.PQconnectdb(const_conninfo &char) &C.PGconn

fn C.PQstatus(const_conn &C.PGconn) i32

fn C.PQtransactionStatus(const_conn &C.PGconn) i32

fn C.PQerrorMessage(const_conn &C.PGconn) &char

fn C.PQexec(res &C.PGconn, const_query &char) &C.PGresult

// Result inspection: the two i32 arguments of PQgetisnull/PQgetvalue are
// passed as (row, column) by the callers in this file.

fn C.PQgetisnull(const_res &C.PGresult, i32, i32) i32

fn C.PQgetvalue(const_res &C.PGresult, i32, i32) &char

fn C.PQresultStatus(const_res &C.PGresult) i32

fn C.PQntuples(const_res &C.PGresult) i32

fn C.PQnfields(const_res &C.PGresult) i32

fn C.PQfname(const_res &C.PGresult, i32) &char

// Params:
// const Oid *paramTypes
// const char *const *paramValues
// const int *paramLengths
// const int *paramFormats
fn C.PQexecParams(conn &C.PGconn, const_command &char, nParams i32, const_paramTypes &int, const_paramValues &char,
	const_paramLengths &int, const_paramFormats &int, resultFormat i32) &C.PGresult

// COPY data transfer (used by copy_expert()).

fn C.PQputCopyData(conn &C.PGconn, const_buffer &char, nbytes i32) i32

fn C.PQputCopyEnd(conn &C.PGconn, const_errmsg &char) i32

fn C.PQgetCopyData(conn &C.PGconn, buffer &&char, async i32) i32

// Prepared statements.

fn C.PQprepare(conn &C.PGconn, const_stmtName &char, const_query &char, nParams i32, const_param_types &&char) &C.PGresult

fn C.PQexecPrepared(conn &C.PGconn, const_stmtName &char, nParams i32, const_paramValues &char,
	const_paramLengths &int, const_paramFormats &int, resultFormat i32) &C.PGresult

// cleanup

fn C.PQclear(res &C.PGresult)

fn C.PQfreemem(ptr voidptr)

fn C.PQfinish(conn &C.PGconn)

// LISTEN/NOTIFY support
fn C.PQnotifies(conn &C.PGconn) &C.PGnotify

fn C.PQconsumeInput(conn &C.PGconn) i32

fn C.PQsocket(conn &C.PGconn) i32

// Literal escaping (used by notify() for the payload).
fn C.PQescapeLiteral(conn &C.PGconn, str &char, length usize) &char
249
// conninfo_needs_quotes reports whether `value` contains a byte that forces
// quoting inside a connection string: whitespace, a single quote or a
// backslash.
fn conninfo_needs_quotes(value string) bool {
	for i := 0; i < value.len; i++ {
		b := value[i]
		if b == `'` || b == `\\` || b.is_space() {
			return true
		}
	}
	return false
}
258
// escape_conninfo_value renders `value` so that it can follow `keyword=` in a
// libpq keyword/value connection string.
// - An empty value is written as `''`; libpq needs the quotes to see an
//   explicit empty value instead of a missing one.
// - A value with whitespace, single quotes or backslashes is wrapped in single
//   quotes, and every `'` and `\` inside it is prefixed with a backslash.
// - Any other value is returned unchanged.
fn escape_conninfo_value(value string) string {
	if value == '' {
		return "''"
	}
	if !conninfo_needs_quotes(value) {
		return value
	}
	mut escaped := []u8{cap: value.len + 2}
	escaped << `'`
	for ch in value {
		if ch == `\\` || ch == `'` {
			escaped << `\\`
		}
		escaped << ch
	}
	escaped << `'`
	return escaped.bytestr()
}
274
// connection_user resolves the login name from `user` / `username`.
// `user` wins when it is set; otherwise `username` (possibly empty) is
// returned. Setting both to different values is an error.
pub fn (config Config) connection_user() !string {
	both_set := config.user != '' && config.username != ''
	if both_set && config.user != config.username {
		return error('db.pg: Config.user and Config.username must match when both are set')
	}
	return if config.user != '' { config.user } else { config.username }
}
285
// conninfo builds the space separated `keyword=value` connection string from
// the config. Unset fields (empty strings, non-positive port) are skipped.
// It fails when `user` and `username` disagree.
fn (config Config) conninfo() !string {
	// Resolving the user up front is equivalent to doing it mid-way: nothing
	// observable happens before a possible failure.
	login := config.connection_user()!
	mut pieces := []string{cap: 5}
	if config.host != '' {
		pieces << 'host=' + escape_conninfo_value(config.host)
	}
	if config.port > 0 {
		pieces << 'port=${config.port}'
	}
	if login != '' {
		pieces << 'user=' + escape_conninfo_value(login)
	}
	if config.dbname != '' {
		pieces << 'dbname=' + escape_conninfo_value(config.dbname)
	}
	if config.password != '' {
		pieces << 'password=' + escape_conninfo_value(config.password)
	}
	return pieces.join(' ')
}
306
// connect_slot opens one libpq connection for `conninfo` and wraps the raw
// handle, together with the time it was created, in an `IdleSlot`. The pool
// wraps the slot in a fresh `&Conn` per checkout so the wrapper cannot be
// revived by a stale reference after release.
// Errors: libpq could not allocate the connection object, or the connection
// did not reach CONNECTION_OK (the handle is finished before returning).
fn connect_slot(conninfo string) !IdleSlot {
	handle := C.PQconnectdb(&char(conninfo.str))
	if handle == 0 {
		return error('libpq memory allocation error')
	}
	status := unsafe { ConnStatusType(C.PQstatus(handle)) }
	if status == .ok {
		return IdleSlot{
			handle:     handle
			created_at: time.now()
		}
	}
	// The message buffer belongs to the connection and goes away with
	// PQfinish, so take a private copy first.
	borrowed := unsafe { C.PQerrorMessage(handle).vstring() }
	reason := borrowed.clone()
	C.PQfinish(handle)
	return error('Connection to a PG database failed: ${reason}')
}
330
// physical_close_handle finishes a raw libpq handle; a nil handle is ignored.
// Used by the pool when it discards an expired/broken slot or unwinds during
// shutdown.
fn physical_close_handle(handle voidptr) {
	if handle == unsafe { nil } {
		return
	}
	C.PQfinish(handle)
}
338
// slot_expired tells whether `slot` is older than `max_lifetime`.
// A zero or negative `max_lifetime` disables the limit.
fn slot_expired(slot IdleSlot, max_lifetime time.Duration) bool {
	return max_lifetime > 0 && (time.now() - slot.created_at) > max_lifetime
}
347
// slot_bad tells whether the handle in `slot` must not be reused: the slot is
// flagged bad, carries no handle, or libpq reports CONNECTION_BAD for it.
fn slot_bad(slot IdleSlot) bool {
	if slot.bad || slot.handle == unsafe { nil } {
		return true
	}
	status := unsafe { ConnStatusType(C.PQstatus(slot.handle)) }
	return status == .bad
}
359
// ensure_active fails when this wrapper no longer owns a handle.
// The pool nils `c.conn` during release(), so a stale `&Conn` kept by user
// code is stopped here and can never reach the underlying PGconn*, even if
// the pool has since handed that same physical handle to another caller.
fn (c &Conn) ensure_active() ! {
	if !isnil(c.conn) {
		return
	}
	return error('pg: operation on released Conn (was close() called?)')
}
369
// is_bad tells whether this connection should be considered unusable
// (e.g. dropped TCP, server idle-timeout): it was flagged bad earlier, it has
// no handle, or libpq currently reports CONNECTION_BAD.
pub fn (c &Conn) is_bad() bool {
	if c.bad || c.conn == unsafe { nil } {
		return true
	}
	status := unsafe { ConnStatusType(C.PQstatus(c.conn)) }
	return status == .bad
}
382
// is_expired tells whether the conn is older than `max_lifetime`.
// A zero or negative `max_lifetime` disables the limit.
pub fn (c &Conn) is_expired(max_lifetime time.Duration) bool {
	return max_lifetime > 0 && (time.now() - c.created_at) > max_lifetime
}
391
// physical_close finishes the libpq connection, bypassing any pool, and
// nils the handle. Calling it on an already closed conn does nothing.
// It is unsafe because the caller must not use the conn afterwards.
@[unsafe]
fn (mut c Conn) physical_close() {
	if c.conn == unsafe { nil } {
		return
	}
	C.PQfinish(c.conn)
	c.conn = unsafe { nil }
}
401
// res_to_rows copies every tuple of the `PGresult` behind `res` into V rows
// (SQL NULL becomes `none`) and then frees the result with PQclear.
// `res` must not be used after this call.
fn res_to_rows(res voidptr) []db.pg.Row {
	tuple_count := C.PQntuples(res)
	field_count := C.PQnfields(res)

	mut out := []Row{}
	for r in 0 .. tuple_count {
		mut current := Row{}
		for f in 0 .. field_count {
			if C.PQgetisnull(res, r, f) == 0 {
				// cstring_to_vstring copies, so the value outlives PQclear
				current.vals << unsafe { cstring_to_vstring(C.PQgetvalue(res, r, f)) }
			} else {
				current.vals << none
			}
		}
		out << current
	}

	C.PQclear(res)
	return out
}
423
// res_to_rows_no_null copies every tuple of the `PGresult` behind `res` into
// rows of plain strings, without asking libpq whether a field is NULL, and
// then frees the result with PQclear. `res` must not be used after this call.
fn res_to_rows_no_null(res voidptr) []RowNoNull {
	tuple_count := C.PQntuples(res)
	field_count := C.PQnfields(res)
	mut out := []RowNoNull{}
	for r in 0 .. tuple_count {
		mut current := RowNoNull{}
		for f in 0 .. field_count {
			raw := C.PQgetvalue(res, r, f)
			current.vals << unsafe { cstring_to_vstring(raw) }
		}
		out << current
	}
	C.PQclear(res)
	return out
}
439
// res_to_result creates a `Result` struct out of a `C.PGresult` pointer and
// frees the PGresult afterwards; `res` must not be used after this call.
// `Result.cols` maps each column name to its index. It is filled from the
// result's field metadata, so it is populated even when the query returned
// no rows. If two columns share a name, the later index wins.
fn res_to_result(res voidptr) Result {
	nr_rows := C.PQntuples(res)
	nr_cols := C.PQnfields(res)

	// Column names do not depend on the rows, so read them once up front.
	mut cols := map[string]int{}
	for j in 0 .. nr_cols {
		field_name := unsafe { cstring_to_vstring(C.PQfname(res, j)) }
		cols[field_name] = j
	}

	mut rows := []Row{}
	for i in 0 .. nr_rows {
		mut row := Row{}
		for j in 0 .. nr_cols {
			if C.PQgetisnull(res, i, j) != 0 {
				row.vals << none
			} else {
				val := C.PQgetvalue(res, i, j)
				// cstring_to_vstring copies, so the value outlives PQclear
				row.vals << unsafe { cstring_to_vstring(val) }
			}
		}
		rows << row
	}

	C.PQclear(res)
	return Result{cols, rows}
}
467
// close gives this conn back to its pool, or finishes the libpq connection
// directly when the conn has no pool. Safe to call more than once:
// `release` detaches the wrapper from the underlying handle on the first
// call (nils `c.conn`), so any subsequent close or method invocation on the
// same `&Conn` is a no-op or a benign error rather than a use-after-free.
pub fn (mut c Conn) close() ! {
	if !isnil(c.pool) {
		mut owner := unsafe { c.pool }
		owner.release(c)
		return
	}
	unsafe { c.physical_close() }
}
480
// q_int submits a command to the database server and returns the first
// field of the first tuple converted to an int. A first row without fields,
// or a NULL first field, yields 0. If no row is found or the command fails,
// an error is returned.
pub fn (c &Conn) q_int(query string) !int {
	rows := c.exec(query)!
	if rows.len == 0 {
		return error('q_int "${query}" not found')
	}
	first := rows[0]
	if first.vals.len == 0 {
		return 0
	}
	cell := first.vals[0]
	text := cell or { '0' }
	return text.int()
}
497
// q_string submits a command to the database server and returns the first
// field of the first tuple as a string. A first row without fields, or a
// NULL first field, yields ''. If no row is found or the command fails, an
// error is returned.
pub fn (c &Conn) q_string(query string) !string {
	rows := c.exec(query)!
	if rows.len == 0 {
		return error('q_string "${query}" not found')
	}
	first := rows[0]
	if first.vals.len == 0 {
		return ''
	}
	cell := first.vals[0]
	text := cell or { '' }
	return text
}
514
// q_strings submits a command to the database server and returns the
// resulting row set. It simply forwards to `exec`.
pub fn (c &Conn) q_strings(query string) ![]db.pg.Row {
	rows := c.exec(query)!
	return rows
}
520
// exec sends `query` to the server and waits for the result. It returns the
// row set on success, and an error when the conn was released or libpq
// reports an error message.
pub fn (c &Conn) exec(query string) ![]db.pg.Row {
	c.ensure_active()!
	c_query := &char(query.str)
	res := C.PQexec(c.conn, c_query)
	return c.handle_error_or_rows(res, 'exec')
}
527
// exec_no_null works like exec, but the fields can't be NULL, no optionals:
// every value is returned as a plain string.
// Errors are labelled with this method's own name ('pg exec_no_null error'),
// in line with the other exec_* methods, so a failure can be traced back to
// the call that produced it.
pub fn (c &Conn) exec_no_null(query string) ![]RowNoNull {
	c.ensure_active()!
	res := C.PQexec(c.conn, &char(query.str))
	return c.handle_error_or_rows_no_null(res, 'exec_no_null')
}
534
// exec_result sends `query` to the server and waits for the result. It
// returns a `Result` (rows plus column-name lookup) on success, and an error
// when the conn was released or libpq reports an error message.
pub fn (c &Conn) exec_result(query string) !Result {
	c.ensure_active()!
	c_query := &char(query.str)
	res := C.PQexec(c.conn, c_query)
	return c.handle_error_or_result(res, 'exec_result')
}
541
// rows_first_or_empty returns the first row of `rows`. Despite the name, an
// empty row set is reported as the error 'no row', not as an empty Row.
fn rows_first_or_empty(rows []db.pg.Row) !Row {
	if rows.len > 0 {
		return rows[0]
	}
	return error('no row')
}
548
// exec_one executes a query and returns its first row.
// Errors: the conn was released, libpq reports an error message for the
// command, or the query produced no rows ('no row').
pub fn (c &Conn) exec_one(query string) !Row {
	c.ensure_active()!
	res := C.PQexec(c.conn, &char(query.str))
	e := unsafe { C.PQerrorMessage(c.conn).vstring() }
	if e != '' {
		// On success res_to_rows() frees the result; on this path nobody else
		// would, so release it here (PQclear accepts a NULL result).
		C.PQclear(res)
		c.mark_bad_if_disconnected()
		return error('pg exec error: "${e}"')
	}
	row := rows_first_or_empty(res_to_rows(res))!
	return row
}
561
// exec_param_many runs `query` with `params` bound, in order, to the
// placeholders ($1), ($2), ($n), and returns the resulting rows.
pub fn (c &Conn) exec_param_many(query string, params []string) ![]db.pg.Row {
	c.ensure_active()!
	unsafe {
		// Array of C string pointers that borrow the bytes of `params`.
		mut c_params := []&char{len: params.len}
		for idx, p in params {
			c_params[idx] = &char(p.str)
		}

		res := C.PQexecParams(c.conn, &char(query.str), params.len, 0, c_params.data, 0, 0, 0)
		return c.handle_error_or_rows(res, 'exec_param_many')
	}
}
575
// exec_param_many_result runs `query` with `params` bound, in order, to the
// placeholders ($1), ($2), ($n), and returns a `Result`.
pub fn (c &Conn) exec_param_many_result(query string, params []string) !Result {
	c.ensure_active()!
	unsafe {
		// Array of C string pointers that borrow the bytes of `params`.
		mut c_params := []&char{len: params.len}
		for idx, p in params {
			c_params[idx] = &char(p.str)
		}

		res := C.PQexecParams(c.conn, &char(query.str), params.len, 0, c_params.data, 0, 0, 0)
		return c.handle_error_or_result(res, 'exec_param_many_result')
	}
}
589
// exec_param runs a query that has exactly one placeholder ($1). It returns
// the full result set on success and an error on failure.
pub fn (c &Conn) exec_param(query string, param string) ![]db.pg.Row {
	params := [param]
	return c.exec_param_many(query, params)
}
594
// exec_param2 runs a query that has two placeholders, ($1) and ($2). It
// returns the full result set on success and an error on failure.
pub fn (c &Conn) exec_param2(query string, param string, param2 string) ![]db.pg.Row {
	params := [param, param2]
	return c.exec_param_many(query, params)
}
599
// prepare submits a request to create a prepared statement named `name` for
// `query`, and waits for completion. `num_params` must be the number of
// parameters (`$1, $2, $3 ...`) used in the statement; parameter types are
// not specified.
// Errors: the conn was released, or libpq reports an error message.
pub fn (c &Conn) prepare(name string, query string, num_params int) ! {
	c.ensure_active()!
	res :=
		C.PQprepare(c.conn, &char(name.str), &char(query.str), num_params, 0) // defining param types is optional

	// handle_error() frees `res` itself before returning an error, so the `!`
	// propagation never leaks. On success it leaves `res` alone, hence the
	// explicit PQclear below.
	c.handle_error(res, 'prepare')!
	C.PQclear(res)
}
608
// exec_prepared runs the prepared statement `name` with `params` and waits
// for the result. The number of parameters must match the parameters
// declared in the prepared statement.
pub fn (c &Conn) exec_prepared(name string, params []string) ![]db.pg.Row {
	c.ensure_active()!
	unsafe {
		// Array of C string pointers that borrow the bytes of `params`.
		mut c_params := []&char{len: params.len}
		for idx, p in params {
			c_params[idx] = &char(p.str)
		}

		res := C.PQexecPrepared(c.conn, &char(name.str), params.len, c_params.data, 0, 0, 0)
		return c.handle_error_or_rows(res, 'exec_prepared')
	}
}
622
// exec_prepared_result runs the prepared statement `name` with `params` and
// waits for the result, which is returned as a `Result`. The number of
// parameters must match the parameters declared in the prepared statement.
pub fn (c &Conn) exec_prepared_result(name string, params []string) !Result {
	c.ensure_active()!
	unsafe {
		// Array of C string pointers that borrow the bytes of `params`.
		mut c_params := []&char{len: params.len}
		for idx, p in params {
			c_params[idx] = &char(p.str)
		}

		res := C.PQexecPrepared(c.conn, &char(name.str), params.len, c_params.data, 0, 0, 0)
		return c.handle_error_or_result(res, 'exec_prepared_result')
	}
}
637
// mark_bad_if_disconnected sets the sticky `bad` flag when libpq reports
// CONNECTION_BAD for this conn. The receiver is immutable, so the write goes
// through a mutable alias inside `unsafe`.
fn (c &Conn) mark_bad_if_disconnected() {
	status := unsafe { ConnStatusType(C.PQstatus(c.conn)) }
	if status != .bad {
		return
	}
	unsafe {
		mut writable := c
		writable.bad = true
	}
}
647
// handle_error_or_rows turns the outcome of a libpq call into rows or an
// error. When the connection carries an error message, `res` is freed, the
// conn is flagged bad if it got disconnected, and an error labelled with
// `elabel` is returned. Otherwise `res` is converted (and freed) by
// res_to_rows().
fn (c &Conn) handle_error_or_rows(res voidptr, elabel string) ![]db.pg.Row {
	msg := unsafe { C.PQerrorMessage(c.conn).vstring() }
	if msg == '' {
		return res_to_rows(res)
	}
	C.PQclear(res)
	c.mark_bad_if_disconnected()
	$if trace_pg_error ? {
		eprintln('pg error: ${msg}')
	}
	return error('pg ${elabel} error:\n${msg}')
}
660
// handle_error_or_rows_no_null is the `RowNoNull` flavour of
// handle_error_or_rows: an error message on the connection frees `res` and
// yields an error labelled with `elabel`; otherwise `res` is converted (and
// freed) by res_to_rows_no_null().
fn (c &Conn) handle_error_or_rows_no_null(res voidptr, elabel string) ![]RowNoNull {
	msg := unsafe { C.PQerrorMessage(c.conn).vstring() }
	if msg == '' {
		return res_to_rows_no_null(res)
	}
	C.PQclear(res)
	c.mark_bad_if_disconnected()
	$if trace_pg_error ? {
		eprintln('pg error: ${msg}')
	}
	return error('pg ${elabel} error:\n${msg}')
}
673
// handle_error_or_result is an internal function similar to
// handle_error_or_rows that returns `Result` instead of `[]Row`: an error
// message on the connection frees `res` and yields an error labelled with
// `elabel`; otherwise `res` is converted (and freed) by res_to_result().
fn (c &Conn) handle_error_or_result(res voidptr, elabel string) !Result {
	msg := unsafe { C.PQerrorMessage(c.conn).vstring() }
	if msg == '' {
		return res_to_result(res)
	}
	C.PQclear(res)
	c.mark_bad_if_disconnected()
	$if trace_pg_error ? {
		eprintln('pg error: ${msg}')
	}
	return error('pg ${elabel} error:\n${msg}')
}
687
// handle_error checks the connection for an error message after a libpq
// call. With a message present, `res` is freed, the conn is flagged bad if it
// got disconnected, and an error labelled with `elabel` is returned.
// Without one, nothing happens: `res` is NOT freed and stays with the caller.
fn (c &Conn) handle_error(res voidptr, elabel string) ! {
	msg := unsafe { C.PQerrorMessage(c.conn).vstring() }
	if msg == '' {
		return
	}
	C.PQclear(res)
	c.mark_bad_if_disconnected()
	$if trace_pg_error ? {
		eprintln('pg error: ${msg}')
	}
	return error('pg ${elabel} error:\n${msg}')
}
699
// copy_expert executes a COPY command.
// https://www.postgresql.org/docs/9.5/libpq-copy.html
// - When the server answers with COPY IN, `file` is read in 4 KiB chunks and
//   each chunk is sent to the server until a read returns 0 bytes.
// - When the server answers with COPY OUT, every data row received is
//   written to `file`.
// - Any other status transfers nothing.
// Returns 0 on success. Errors: the conn was released, libpq reports an
// error for the command or during the transfer (the libpq message is
// included), or `file` fails to read/write (that error is passed through).
// A conn that libpq reports as disconnected is flagged bad on libpq errors.
pub fn (c &Conn) copy_expert(query string, mut file io.ReaderWriter) !int {
	c.ensure_active()!
	mut res := C.PQexec(c.conn, &char(query.str))
	status := unsafe { ExecStatusType(C.PQresultStatus(res)) }
	defer {
		C.PQclear(res)
	}

	e := unsafe { C.PQerrorMessage(c.conn).vstring() }
	if e != '' {
		c.mark_bad_if_disconnected()
		return error('pg copy error:\n${e}')
	}

	if status == .copy_in {
		mut buf := []u8{len: 4 * 1024}
		for {
			n := file.read(mut buf) or {
				// Tell the server to abort the COPY before giving up.
				msg := 'pg copy error: Failed to read from input'
				C.PQputCopyEnd(c.conn, &char(msg.str))
				return err
			}
			if n <= 0 {
				break
			}

			code := C.PQputCopyData(c.conn, buf.data, n)
			if code == -1 {
				reason := unsafe { C.PQerrorMessage(c.conn).vstring() }
				c.mark_bad_if_disconnected()
				return error('pg copy error: Failed to send data, code=${code}: ${reason}')
			}
		}

		// A nil error message ends the COPY normally.
		code := C.PQputCopyEnd(c.conn, &char(unsafe { nil }))

		if code != 1 {
			reason := unsafe { C.PQerrorMessage(c.conn).vstring() }
			c.mark_bad_if_disconnected()
			return error('pg copy error: Failed to finish copy command, code: ${code}: ${reason}')
		}
	} else if status == .copy_out {
		for {
			address := &char(unsafe { nil })
			n_bytes := C.PQgetCopyData(c.conn, &address, 0)
			if n_bytes > 0 {
				// Copy the row out of the libpq buffer before handing it to `file`.
				mut local_buf := []u8{len: n_bytes}
				unsafe { C.memcpy(&u8(local_buf.data), address, n_bytes) }
				file.write(local_buf) or {
					C.PQfreemem(address)
					return err
				}
			} else if n_bytes == -1 {
				// COPY is done
				break
			} else if n_bytes == -2 {
				// PQerrorMessage holds the reason
				reason := unsafe { C.PQerrorMessage(c.conn).vstring() }
				c.mark_bad_if_disconnected()
				return error('pg copy error: read error: ${reason}')
			}
			if address != 0 {
				C.PQfreemem(address)
			}
		}
	}

	return 0
}
763
// pg_stmt_worker executes `query` on `c` with the parameters collected from
// `data` followed by those from `where`, and returns the resulting rows.
// Results are requested in text format.
fn pg_stmt_worker(c &Conn, query string, data orm.QueryData, where orm.QueryData) ![]db.pg.Row {
	mut types := []u32{}
	mut vals := []&char{}
	mut lens := []int{}
	mut formats := []int{}

	// `data` is bound first, then `where`; both append to the same arrays.
	pg_stmt_binder(mut types, mut vals, mut lens, mut formats, data)
	pg_stmt_binder(mut types, mut vals, mut lens, mut formats, where)

	// the trailing 0 asks for text results (1 would mean binary results)
	res := C.PQexecParams(c.conn, &char(query.str), vals.len, types.data, vals.data, lens.data,
		formats.data, 0)
	return c.handle_error_or_rows(res, 'orm_stmt_worker')
}
777
// PQTransactionLevel selects the isolation level of a transaction.
// begin_on_conn() maps each value to the matching SQL keywords.
pub enum PQTransactionLevel {
	read_uncommitted // READ UNCOMMITTED
	read_committed   // READ COMMITTED
	repeatable_read  // REPEATABLE READ
	serializable     // SERIALIZABLE
}
784
// PQTransactionParam carries the optional settings of begin_on_conn().
@[params]
pub struct PQTransactionParam {
pub:
	// isolation level used for the transaction; `.repeatable_read` when not given
	transaction_level PQTransactionLevel = .repeatable_read
}
790
// begin_on_conn begins a transaction on this single connection, using the
// isolation level from `param`. Most callers should use `DB.begin()` instead,
// which returns a `Tx` that owns the underlying conn for the lifetime of the
// transaction.
// Errors: the conn was released, or libpq reports an error message.
pub fn (c &Conn) begin_on_conn(param PQTransactionParam) ! {
	c.ensure_active()!
	mut sql_stmt := 'BEGIN TRANSACTION ISOLATION LEVEL '
	match param.transaction_level {
		.read_uncommitted { sql_stmt += 'READ UNCOMMITTED' }
		.read_committed { sql_stmt += 'READ COMMITTED' }
		.repeatable_read { sql_stmt += 'REPEATABLE READ' }
		.serializable { sql_stmt += 'SERIALIZABLE' }
	}

	res := C.PQexec(c.conn, &char(sql_stmt.str))
	// Only the connection's error message is inspected, so the result can be
	// released right away (PQclear accepts a NULL result).
	C.PQclear(res)
	e := unsafe { C.PQerrorMessage(c.conn).vstring() }
	if e != '' {
		c.mark_bad_if_disconnected()
		return error('pg exec error: "${e}"')
	}
}
811
// commit commits the current transaction on this connection.
// Errors: the conn was released, or libpq reports an error message.
pub fn (c &Conn) commit() ! {
	c.ensure_active()!
	res := C.PQexec(c.conn, c'COMMIT;')
	// Only the connection's error message is inspected, so the result can be
	// released right away (PQclear accepts a NULL result).
	C.PQclear(res)
	e := unsafe { C.PQerrorMessage(c.conn).vstring() }
	if e != '' {
		c.mark_bad_if_disconnected()
		return error('pg exec error: "${e}"')
	}
}
822
// rollback rolls back the current transaction on this connection.
// Errors: the conn was released, or libpq reports an error message.
pub fn (c &Conn) rollback() ! {
	c.ensure_active()!
	res := C.PQexec(c.conn, c'ROLLBACK;')
	// Only the connection's error message is inspected, so the result can be
	// released right away (PQclear accepts a NULL result).
	C.PQclear(res)
	e := unsafe { C.PQerrorMessage(c.conn).vstring() }
	if e != '' {
		c.mark_bad_if_disconnected()
		return error('pg exec error: "${e}"')
	}
}
833
// rollback_to rolls back to a specified savepoint on this connection.
// `savepoint` must be an identifier, because it is placed into the SQL text
// as-is.
// Errors: the conn was released, `savepoint` is not an identifier, or libpq
// reports an error message.
pub fn (c &Conn) rollback_to(savepoint string) ! {
	c.ensure_active()!
	if !savepoint.is_identifier() {
		return error('savepoint should be a identifier string')
	}
	sql_stmt := 'ROLLBACK TO SAVEPOINT ${savepoint};'
	res := C.PQexec(c.conn, &char(sql_stmt.str))
	// Only the connection's error message is inspected, so the result can be
	// released right away (PQclear accepts a NULL result).
	C.PQclear(res)
	e := unsafe { C.PQerrorMessage(c.conn).vstring() }
	if e != '' {
		c.mark_bad_if_disconnected()
		return error('pg exec error: "${e}"')
	}
}
848
// savepoint creates a new savepoint on this connection.
// `savepoint` must be an identifier, because it is placed into the SQL text
// as-is.
// Errors: the conn was released, `savepoint` is not an identifier, or libpq
// reports an error message.
pub fn (c &Conn) savepoint(savepoint string) ! {
	c.ensure_active()!
	if !savepoint.is_identifier() {
		return error('savepoint should be a identifier string')
	}
	sql_stmt := 'SAVEPOINT ${savepoint};'
	res := C.PQexec(c.conn, &char(sql_stmt.str))
	// Only the connection's error message is inspected, so the result can be
	// released right away (PQclear accepts a NULL result).
	C.PQclear(res)
	e := unsafe { C.PQerrorMessage(c.conn).vstring() }
	if e != '' {
		c.mark_bad_if_disconnected()
		return error('pg exec error: "${e}"')
	}
}
863
// release_savepoint releases a specified savepoint on this connection.
// `savepoint` must be an identifier, because it is placed into the SQL text
// as-is.
// Errors: the conn was released, `savepoint` is not an identifier, or libpq
// reports an error message.
pub fn (c &Conn) release_savepoint(savepoint string) ! {
	c.ensure_active()!
	if !savepoint.is_identifier() {
		return error('savepoint should be a identifier string')
	}
	sql_stmt := 'RELEASE SAVEPOINT ${savepoint};'
	res := C.PQexec(c.conn, &char(sql_stmt.str))
	// Only the connection's error message is inspected, so the result can be
	// released right away (PQclear accepts a NULL result).
	C.PQclear(res)
	e := unsafe { C.PQerrorMessage(c.conn).vstring() }
	if e != '' {
		c.mark_bad_if_disconnected()
		return error('pg exec error: "${e}"')
	}
}
878
// validate probes the connection by running `SELECT 1`. It returns true when
// the probe succeeds and passes the probe's error on otherwise; it never
// returns false.
pub fn (c &Conn) validate() !bool {
	_ := c.exec_one('SELECT 1')!
	return true
}
884
// reset is meant to return the connection to its initial state for reuse.
// The body is currently empty: calling it changes nothing on the connection
// and it never returns an error.
pub fn (c &Conn) reset() ! {
}
888
// as_structs converts every row of the `Result` with `mapper` and returns
// the converted values in row order. The mapper receives the whole `Result`
// (e.g. for column lookups) and the current row; the first mapper error
// aborts the conversion and is returned.
pub fn (res Result) as_structs[T](mapper fn (Result, Row) !T) ![]T {
	mut out := []T{cap: res.rows.len}
	for row in res.rows {
		item := mapper(res, row)!
		out << item
	}
	return out
}
898
// listen registers the connection to receive notifications on the specified channel.
// After calling this, use consume_input() and get_notification() to receive notifications.
// `channel` must be an identifier, because it is placed into the SQL text as-is.
// Errors: the conn was released, `channel` is not an identifier, or libpq
// reports an error message.
pub fn (c &Conn) listen(channel string) ! {
	c.ensure_active()!
	if !channel.is_identifier() {
		return error('channel name should be a valid identifier')
	}
	sql_stmt := 'LISTEN ${channel};'
	res := C.PQexec(c.conn, &char(sql_stmt.str))
	// Only the connection's error message is inspected, so the result can be
	// released right away (PQclear accepts a NULL result).
	C.PQclear(res)
	e := unsafe { C.PQerrorMessage(c.conn).vstring() }
	if e != '' {
		c.mark_bad_if_disconnected()
		return error('pg listen error: "${e}"')
	}
}
914
// unlisten unregisters the connection from receiving notifications on the specified channel.
// Use unlisten_all() to unregister from all channels.
// `channel` must be an identifier, because it is placed into the SQL text as-is.
// Errors: the conn was released, `channel` is not an identifier, or libpq
// reports an error message.
pub fn (c &Conn) unlisten(channel string) ! {
	c.ensure_active()!
	if !channel.is_identifier() {
		return error('channel name should be a valid identifier')
	}
	sql_stmt := 'UNLISTEN ${channel};'
	res := C.PQexec(c.conn, &char(sql_stmt.str))
	// Only the connection's error message is inspected, so the result can be
	// released right away (PQclear accepts a NULL result).
	C.PQclear(res)
	e := unsafe { C.PQerrorMessage(c.conn).vstring() }
	if e != '' {
		c.mark_bad_if_disconnected()
		return error('pg unlisten error: "${e}"')
	}
}
930
// unlisten_all unregisters the connection from all notification channels.
// Errors: the conn was released, or libpq reports an error message.
pub fn (c &Conn) unlisten_all() ! {
	c.ensure_active()!
	res := C.PQexec(c.conn, c'UNLISTEN *;')
	// Only the connection's error message is inspected, so the result can be
	// released right away (PQclear accepts a NULL result).
	C.PQclear(res)
	e := unsafe { C.PQerrorMessage(c.conn).vstring() }
	if e != '' {
		c.mark_bad_if_disconnected()
		return error('pg unlisten error: "${e}"')
	}
}
941
// notify sends a notification on the specified channel with an optional payload.
// All connections currently listening on that channel will receive the notification.
// `channel` must be an identifier, because it is placed into the SQL text
// as-is; a non-empty `payload` is escaped with PQescapeLiteral.
// Errors: the conn was released, `channel` is not an identifier, the payload
// could not be escaped, or libpq reports an error message.
pub fn (c &Conn) notify(channel string, payload string) ! {
	c.ensure_active()!
	if !channel.is_identifier() {
		return error('channel name should be a valid identifier')
	}
	mut sql_stmt := ''
	if payload.len > 0 {
		// Use PQescapeLiteral to safely escape the payload
		escaped := C.PQescapeLiteral(c.conn, &char(payload.str), usize(payload.len))
		if escaped == unsafe { nil } {
			e := unsafe { C.PQerrorMessage(c.conn).vstring() }
			return error('pg notify error: failed to escape payload: "${e}"')
		}
		// The concatenation copies the escaped text, so it may be freed right after.
		sql_stmt = unsafe { 'NOTIFY ${channel}, ' + escaped.vstring() + ';' }
		C.PQfreemem(escaped)
	} else {
		sql_stmt = 'NOTIFY ${channel};'
	}
	res := C.PQexec(c.conn, &char(sql_stmt.str))
	// Only the connection's error message is inspected, so the result can be
	// released right away (PQclear accepts a NULL result).
	C.PQclear(res)
	e := unsafe { C.PQerrorMessage(c.conn).vstring() }
	if e != '' {
		c.mark_bad_if_disconnected()
		return error('pg notify error: "${e}"')
	}
}
969
// consume_input reads any available input from the server.
// This must be called before get_notification() to ensure pending notifications are processed.
// Returns true on success. A failure to read from the connection, or a
// released conn, is reported as an error; false is never returned.
pub fn (c &Conn) consume_input() !bool {
	c.ensure_active()!
	if C.PQconsumeInput(c.conn) != 0 {
		return true
	}
	e := unsafe { C.PQerrorMessage(c.conn).vstring() }
	c.mark_bad_if_disconnected()
	return error('pg consume_input error: "${e}"')
}
983
// get_notification returns the next pending notification from the server, if any.
// Returns none if there are no pending notifications, or if the conn was released.
// You should call consume_input() before this to ensure all pending notifications are available.
pub fn (c &Conn) get_notification() ?Notification {
	c.ensure_active() or { return none }
	notify := C.PQnotifies(c.conn)
	if notify == unsafe { nil } {
		return none
	}
	// cstring_to_vstring makes V-owned copies. `vstring()` would only borrow
	// the C memory, which is released by PQfreemem below, leaving `channel`
	// and `payload` dangling.
	notification := Notification{
		channel: unsafe { cstring_to_vstring(notify.relname) }
		pid:     notify.be_pid
		payload: unsafe { cstring_to_vstring(notify.extra) }
	}
	C.PQfreemem(notify)
	return notification
}
1002
// socket returns the file descriptor of the connection socket to the server.
// This is useful for applications that want to use select() or poll() to wait
// for notifications without blocking. A released conn yields -1; otherwise
// the value reported by PQsocket is returned as-is.
pub fn (c &Conn) socket() int {
	c.ensure_active() or { return -1 }
	fd := C.PQsocket(c.conn)
	return fd
}
1010