v2 / vlib / db / pg / pg.c.v
877 lines · 751 sloc · 25.65 KB · 3ac2c4636dd1054c3faf99c5ac5638be42597156
Raw
1module pg
2
3import io
4import orm
5
6$if $pkgconfig('libpq') {
7 #pkgconfig --cflags --libs libpq
8} $else {
9 $if msvc {
10 #flag -llibpq
11 } $else {
12 #flag -lpq
13 }
14 #flag linux -I/usr/include/postgresql
15 //#flag linux -Ipostgresql // cross compiling to linux
16
17 #flag darwin -I/opt/local/include/postgresql11
18 #flag darwin -L/opt/local/lib/postgresql11
19
20 #flag darwin -I/usr/local/opt/libpq/include
21 #flag darwin -L/usr/local/opt/libpq/lib
22
23 #flag darwin -I/opt/homebrew/include
24 #flag darwin -L/opt/homebrew/lib
25
26 #flag darwin -I/opt/homebrew/opt/libpq/include
27 #flag darwin -L/opt/homebrew/opt/libpq/lib
28
29 #flag windows -I @VEXEROOT/thirdparty/pg/libpq
30 #flag windows -L @VEXEROOT/thirdparty/pg/win64
31
32 #flag freebsd -I/usr/local/include
33 #flag freebsd -L/usr/local/lib
34
35 #flag openbsd -I/usr/local/include/postgresql
36 #flag openbsd -L/usr/local/lib
37}
38
39$if cross_compile ? && linux {
40 #include <libpq/libpq-fe.h>
41 #include <libpq/pg_config.h>
42
43 //#flag -lpq // libpq.a is located in LINUXROOT/lib/x86_64-linux-gnu/libpq.a
44 // The bundled linuxroot ships libpq.a but no libpgcommon.a / libpgport.a,
45 // so libpq's references to pg_snprintf, strlcpy, pg_freeaddrinfo_all etc.
46 // are unresolved at link time. Provide minimal stubs that delegate to libc.
47 #flag @VEXEROOT/thirdparty/pg/pgport_stubs.c
48} $else {
49 // PostgreSQL Source Code
50 // https://doxygen.postgresql.org/libpq-fe_8h.html
51 #include <libpq-fe.h>
52
53 // for PG_VERSION_NUM, which is defined everywhere at least since PG 9.5
54 #include <pg_config.h>
55}
56
57// for orm
58$if windows {
59 #include <winsock2.h>
60} $else {
61 #include <arpa/inet.h>
62}
63
64#include "@VMODROOT/vlib/db/pg/compatibility.h"
65
// DB is a handle to one PostgreSQL connection.
// `conn` stores the pointer returned by `C.PQconnectdb` and is handed to
// every libpq call made through the methods of this struct.
pub struct DB {
mut:
	conn voidptr = unsafe { nil } // nil until a connection has been stored here
}
70
// Row is one row of a query result.
// Every column is kept as an optional string so that SQL NULL (`none`)
// stays distinguishable from an empty string.
pub struct Row {
pub mut:
	vals []?string // column values in result order, `none` for SQL NULL
}
75
// val gives back the column value stored at `index`.
// A SQL NULL is reported as an empty string; use `val_opt` to tell them apart.
pub fn (row Row) val(index int) string {
	cell := row.vals[index]
	return cell or { '' }
}
83
// values copies every column of the row into a plain `[]string`,
// replacing each SQL NULL with an empty string.
pub fn (row Row) values() []string {
	mut flat := []string{cap: row.vals.len}
	for cell in row.vals {
		if text := cell {
			flat << text
		} else {
			flat << ''
		}
	}
	return flat
}
92
// val_opt gives back the column at `index` exactly as stored:
// `none` for SQL NULL, the text otherwise.
pub fn (row Row) val_opt(index int) ?string {
	cell := row.vals[index]
	return cell
}
97
// RowNoNull is a result row whose columns are plain strings, without the
// optional wrapper used by `Row`. It is produced by `exec_no_null`.
pub struct RowNoNull {
pub mut:
	vals []string // column values in result order
}
102
// Result bundles the rows of a query together with a lookup table
// from column name to column position.
pub struct Result {
pub:
	cols map[string]int // column name -> index into `Row.vals`
	rows []Row // the result rows, in the order libpq returned them
}
108
// Notification is a message delivered by the server through LISTEN/NOTIFY.
pub struct Notification {
pub:
	channel string // name of the channel the notification was sent on
	pid     int    // process ID of the server process that sent it
	payload string // payload text (may be empty)
}
116
// Config holds the parameters used by `connect` to build a conninfo string.
// Fields left empty (or a port that is not positive) are omitted from it.
pub struct Config {
pub:
	host     string = 'localhost'
	port     int    = 5432
	user     string // login name; see `connection_user`
	username string // alternative spelling of `user`; both must match when both are set
	password string
	dbname   string
}
126
127//
128
// Opaque libpq handle types. Their layout is never inspected from V;
// they only give the C function declarations something to point at.
pub struct C.pg_result {}

pub struct C.pg_conn {}

// C.PGresult is the typedef'd name of a query result handle.
@[typedef]
pub struct C.PGresult {}

// C.PGconn is the typedef'd name of a connection handle.
@[typedef]
pub struct C.PGconn {}
138
// C.PGnotify is libpq's record for one notification received via LISTEN/NOTIFY.
// It is returned by `C.PQnotifies` and released with `C.PQfreemem`.
@[typedef]
pub struct C.PGnotify {
	relname &char // name of the notification channel
	be_pid  int   // process ID of the notifying server process
	extra   &char // notification payload text
}
146
// ConnStatusType mirrors the connection states reported by `C.PQstatus`.
pub enum ConnStatusType {
	ok  = C.CONNECTION_OK
	bad = C.CONNECTION_BAD
	// Everything below only occurs in non-blocking mode.
	// Never rely on these states existing - use them for user feedback or similar purposes only.
	started           = C.CONNECTION_STARTED           // waiting for the connection to be made
	made              = C.CONNECTION_MADE              // connection OK; waiting to send
	awaiting_response = C.CONNECTION_AWAITING_RESPONSE // waiting for a response from the postmaster
	auth_ok           = C.CONNECTION_AUTH_OK           // authentication received; waiting for backend startup
	setenv            = C.CONNECTION_SETENV            // negotiating environment
	ssl_startup       = C.CONNECTION_SSL_STARTUP       // negotiating SSL
	needed            = C.CONNECTION_NEEDED            // internal state: connect() needed; available in PG 8
	check_writable    = C.CONNECTION_CHECK_WRITABLE    // checking whether a writable connection can be made; since PG 10
	consume           = C.CONNECTION_CONSUME           // waiting for pending messages and consuming them; since PG 10
	gss_startup       = C.CONNECTION_GSS_STARTUP       // negotiating GSSAPI; since PG 12
}
163
// ExecStatusType mirrors the result states reported by `C.PQresultStatus`.
@[typedef]
pub enum ExecStatusType {
	empty_query    = C.PGRES_EMPTY_QUERY    // an empty query string was executed
	command_ok     = C.PGRES_COMMAND_OK     // a command returning no data was executed properly by the backend
	tuples_ok      = C.PGRES_TUPLES_OK      // a command returning tuples was executed properly; the PGresult holds them
	copy_out       = C.PGRES_COPY_OUT       // Copy Out data transfer in progress
	copy_in        = C.PGRES_COPY_IN        // Copy In data transfer in progress
	bad_response   = C.PGRES_BAD_RESPONSE   // an unexpected response was received from the backend
	nonfatal_error = C.PGRES_NONFATAL_ERROR // notice or warning message
	fatal_error    = C.PGRES_FATAL_ERROR    // the query failed
	copy_both      = C.PGRES_COPY_BOTH      // Copy In/Out data transfer in progress
	single_tuple   = C.PGRES_SINGLE_TUPLE   // a single tuple from a larger result set
}
177
178//
179
// --- connection setup and state ---

fn C.PQconnectdb(const_conninfo &char) &C.PGconn

fn C.PQstatus(const_conn &C.PGconn) i32

fn C.PQtransactionStatus(const_conn &C.PGconn) i32

fn C.PQerrorMessage(const_conn &C.PGconn) &char

fn C.PQexec(res &C.PGconn, const_query &char) &C.PGresult

// --- reading a result ---

fn C.PQgetisnull(const_res &C.PGresult, i32, i32) i32

fn C.PQgetvalue(const_res &C.PGresult, i32, i32) &char

fn C.PQresultStatus(const_res &C.PGresult) i32

fn C.PQntuples(const_res &C.PGresult) i32

fn C.PQnfields(const_res &C.PGresult) i32

fn C.PQfname(const_res &C.PGresult, i32) &char

// --- parameterised and prepared execution ---
// C parameter types of PQexecParams:
// const Oid *paramTypes
// const char *const *paramValues
// const int *paramLengths
// const int *paramFormats
fn C.PQexecParams(conn &C.PGconn, const_command &char, nParams i32, const_paramTypes &int, const_paramValues &char,
	const_paramLengths &int, const_paramFormats &int, resultFormat i32) &C.PGresult

fn C.PQputCopyData(conn &C.PGconn, const_buffer &char, nbytes i32) i32

fn C.PQputCopyEnd(conn &C.PGconn, const_errmsg &char) i32

fn C.PQgetCopyData(conn &C.PGconn, buffer &&char, async i32) i32

fn C.PQprepare(conn &C.PGconn, const_stmtName &char, const_query &char, nParams i32, const_param_types &&char) &C.PGresult

fn C.PQexecPrepared(conn &C.PGconn, const_stmtName &char, nParams i32, const_paramValues &char,
	const_paramLengths &int, const_paramFormats &int, resultFormat i32) &C.PGresult

// --- cleanup ---

fn C.PQclear(res &C.PGresult)

fn C.PQfreemem(ptr voidptr)

fn C.PQfinish(conn &C.PGconn)

// --- LISTEN/NOTIFY support ---

fn C.PQnotifies(conn &C.PGconn) &C.PGnotify

fn C.PQconsumeInput(conn &C.PGconn) i32

fn C.PQsocket(conn &C.PGconn) i32

fn C.PQescapeLiteral(conn &C.PGconn, str &char, length usize) &char
239
// conninfo_needs_quotes reports whether `value` has to be wrapped in single
// quotes when it is written into a libpq conninfo string.
// That is the case for an empty value and for any value that contains
// whitespace, a single quote or a backslash.
fn conninfo_needs_quotes(value string) bool {
	if value.len == 0 {
		// libpq only accepts an empty value when it is spelled `''`
		return true
	}
	for ch in value {
		if ch.is_space() || ch == `'` || ch == `\\` {
			return true
		}
	}
	return false
}
248
// escape_conninfo_value renders `value` for the right hand side of a
// `key=value` pair in a conninfo string.
// Values that need no quoting are returned untouched; all others are wrapped
// in single quotes with every `'` and `\` preceded by a backslash.
fn escape_conninfo_value(value string) string {
	if conninfo_needs_quotes(value) {
		mut quoted := []u8{cap: value.len + 2}
		quoted << `'`
		for b in value {
			match b {
				`\\`, `'` { quoted << `\\` }
				else {}
			}
			quoted << b
		}
		quoted << `'`
		return quoted.bytestr()
	}
	return value
}
264
// connection_user resolves the login name from `user` and `username`.
// Either field may be used; when both are filled in they have to be equal,
// otherwise an error is returned. The result is '' when neither is set.
pub fn (config Config) connection_user() !string {
	both_set := config.user != '' && config.username != ''
	if both_set && config.user != config.username {
		return error('db.pg: Config.user and Config.username must match when both are set')
	}
	return if config.user == '' { config.username } else { config.user }
}
275
// conninfo builds the space separated `key=value` connection string for
// `config`, in the order host, port, user, dbname, password.
// Empty fields and a non-positive port are left out.
// Fails when `connection_user` rejects the user/username combination.
fn (config Config) conninfo() !string {
	user := config.connection_user()!
	mut pairs := []string{cap: 5}
	if config.host.len > 0 {
		pairs << 'host=' + escape_conninfo_value(config.host)
	}
	if config.port > 0 {
		pairs << 'port=' + config.port.str()
	}
	if user.len > 0 {
		pairs << 'user=' + escape_conninfo_value(user)
	}
	if config.dbname.len > 0 {
		pairs << 'dbname=' + escape_conninfo_value(config.dbname)
	}
	if config.password.len > 0 {
		pairs << 'password=' + escape_conninfo_value(config.password)
	}
	return pairs.join(' ')
}
296
// connect opens a new connection to the database server described by
// `config` and returns a connection error when that fails.
// Empty `Config` fields are omitted from the connection string, so the
// libpq defaults can still apply to them.
pub fn connect(config Config) !DB {
	conninfo := config.conninfo()!
	return connect_with_conninfo(conninfo)!
}
304
// connect_with_conninfo opens a new connection to the database server using
// the raw `conninfo` connection string.
// It returns an error when libpq cannot allocate the connection object or
// when the connection does not reach the `.ok` state.
pub fn connect_with_conninfo(conninfo string) !DB {
	conn := C.PQconnectdb(&char(conninfo.str))
	if conn == 0 {
		return error('libpq memory allocation error')
	}
	if unsafe { ConnStatusType(C.PQstatus(conn)) } == .ok {
		return DB{
			conn: conn
		}
	}
	// The message belongs to the connection and goes away with `PQfinish`,
	// so take a private copy before closing it.
	error_msg := unsafe { cstring_to_vstring(C.PQerrorMessage(conn)) }
	C.PQfinish(conn)
	return error('Connection to a PG database failed: ${error_msg}')
}
327
// res_to_rows copies every tuple of the libpq result `res` into a `Row`
// (SQL NULL becomes `none`) and then releases `res` with `PQclear`.
fn res_to_rows(res voidptr) []db.pg.Row {
	row_count := C.PQntuples(res)
	col_count := C.PQnfields(res)

	mut rows := []Row{}
	for r in 0 .. row_count {
		mut current := Row{}
		for c in 0 .. col_count {
			if C.PQgetisnull(res, r, c) == 0 {
				// copy the text, it is owned by `res`
				current.vals << unsafe { cstring_to_vstring(C.PQgetvalue(res, r, c)) }
			} else {
				current.vals << none
			}
		}
		rows << current
	}

	C.PQclear(res)
	return rows
}
349
// res_to_rows_no_null copies every tuple of the libpq result `res` into a
// `RowNoNull` without checking for SQL NULL, then releases `res` with `PQclear`.
fn res_to_rows_no_null(res voidptr) []RowNoNull {
	row_count := C.PQntuples(res)
	col_count := C.PQnfields(res)
	mut rows := []RowNoNull{}
	for r in 0 .. row_count {
		mut current := RowNoNull{}
		for c in 0 .. col_count {
			// copy the text, it is owned by `res`
			current.vals << unsafe { cstring_to_vstring(C.PQgetvalue(res, r, c)) }
		}
		rows << current
	}
	C.PQclear(res)
	return rows
}
365
// res_to_result creates a `Result` struct out of a `C.PGresult` pointer.
// `cols` maps each column name to its index and `rows` holds the tuples
// (SQL NULL becomes `none`). `res` is released with `PQclear` before returning.
fn res_to_result(res voidptr) Result {
	nr_rows := C.PQntuples(res)
	nr_cols := C.PQnfields(res)

	// Column names belong to the result itself, so collect them up front.
	// This keeps `cols` populated even when the query matched no rows.
	mut cols := map[string]int{}
	for j in 0 .. nr_cols {
		field_name := unsafe { cstring_to_vstring(C.PQfname(res, j)) }
		cols[field_name] = j
	}

	mut rows := []Row{}
	for i in 0 .. nr_rows {
		mut row := Row{}
		for j in 0 .. nr_cols {
			if C.PQgetisnull(res, i, j) != 0 {
				row.vals << none
			} else {
				val := C.PQgetvalue(res, i, j)
				// copy the text, it is owned by `res`
				row.vals << unsafe { cstring_to_vstring(val) }
			}
		}
		rows << row
	}

	C.PQclear(res)
	return Result{
		cols: cols
		rows: rows
	}
}
393
// close hands the connection to `PQfinish`, which releases the resources
// libpq allocated for it. The `DB` value must not be used afterwards.
pub fn (db &DB) close() ! {
	C.PQfinish(db.conn)
}
398
// q_int submits a command to the database server and returns the first
// field of the first tuple converted to an int.
// A row without columns or a NULL first field yields 0.
// An error is returned when no row is found or the command fails.
pub fn (db &DB) q_int(query string) !int {
	rows := db.exec(query)!
	if rows.len == 0 {
		return error('q_int "${query}" not found')
	}
	first := rows[0]
	if first.vals.len == 0 {
		return 0
	}
	cell := first.vals[0]
	text := cell or { '0' }
	return text.int()
}
415
// q_string submits a command to the database server and returns the first
// field of the first tuple as a string.
// A row without columns or a NULL first field yields ''.
// An error is returned when no row is found or the command fails.
pub fn (db &DB) q_string(query string) !string {
	rows := db.exec(query)!
	if rows.len == 0 {
		return error('q_string "${query}" not found')
	}
	first := rows[0]
	if first.vals.len == 0 {
		return ''
	}
	cell := first.vals[0]
	return cell or { '' }
}
432
// q_strings submits a command to the database server and returns the
// resulting row set. It is an alias of `exec`.
pub fn (db &DB) q_strings(query string) ![]db.pg.Row {
	return db.exec(query)
}
438
// exec submits a command to the database server and waits for the result.
// It returns the row set on success and an error on failure.
pub fn (db &DB) exec(query string) ![]db.pg.Row {
	c_query := &char(query.str)
	return db.handle_error_or_rows(C.PQexec(db.conn, c_query), 'exec')
}
444
// exec_no_null works like `exec`, but returns rows of plain strings:
// the fields are not expected to be NULL, so no optionals are used.
pub fn (db &DB) exec_no_null(query string) ![]RowNoNull {
	c_query := &char(query.str)
	return db.handle_error_or_rows_no_null(C.PQexec(db.conn, c_query), 'exec')
}
450
// exec_result submits a command to the database server and waits for the result.
// It returns a `Result` (rows plus column name lookup) on success and an error on failure.
pub fn (db &DB) exec_result(query string) !Result {
	c_query := &char(query.str)
	return db.handle_error_or_result(C.PQexec(db.conn, c_query), 'exec_result')
}
456
// rows_first_or_empty returns the first element of `rows`,
// or the error 'no row' when `rows` is empty.
fn rows_first_or_empty(rows []db.pg.Row) !Row {
	if rows.len > 0 {
		return rows[0]
	}
	return error('no row')
}
463
// exec_one executes a query and returns its first row.
// An error is returned when the command fails or produces no rows.
pub fn (db &DB) exec_one(query string) !Row {
	res := C.PQexec(db.conn, &char(query.str))
	e := unsafe { C.PQerrorMessage(db.conn).vstring() }
	if e != '' {
		msg := 'pg exec error: "${e}"'
		// a failed command still yields a PGresult that has to be released;
		// on the success path `res_to_rows` takes care of that
		C.PQclear(res)
		return error(msg)
	}
	return rows_first_or_empty(res_to_rows(res))!
}
474
// exec_param_many executes a query whose placeholders ($1), ($2), ($n) are
// filled from `params`, and returns the resulting rows or an error.
pub fn (db &DB) exec_param_many(query string, params []string) ![]db.pg.Row {
	unsafe {
		// libpq wants an array of C string pointers, one per parameter
		mut c_params := []&char{len: params.len}
		for i, p in params {
			c_params[i] = &char(p.str)
		}

		// types, lengths and formats are passed as 0 (NULL); the final 0 asks for text results
		res := C.PQexecParams(db.conn, &char(query.str), params.len, 0, c_params.data, 0, 0, 0)
		return db.handle_error_or_rows(res, 'exec_param_many')
	}
}
487
// exec_param_many_result executes a query whose placeholders ($1), ($2), ($n)
// are filled from `params`, and returns a `Result` or an error.
pub fn (db &DB) exec_param_many_result(query string, params []string) !Result {
	unsafe {
		// libpq wants an array of C string pointers, one per parameter
		mut c_params := []&char{len: params.len}
		for i, p in params {
			c_params[i] = &char(p.str)
		}

		// types, lengths and formats are passed as 0 (NULL); the final 0 asks for text results
		res := C.PQexecParams(db.conn, &char(query.str), params.len, 0, c_params.data, 0, 0, 0)
		return db.handle_error_or_result(res, 'exec_param_many_result')
	}
}
500
// exec_param executes a query with a single parameter ($1).
// It returns the full result set on success and an error on failure.
pub fn (db &DB) exec_param(query string, param string) ![]db.pg.Row {
	params := [param]
	return db.exec_param_many(query, params)
}
505
// exec_param2 executes a query with two parameters ($1) and ($2).
// It returns the full result set on success and an error on failure.
pub fn (db &DB) exec_param2(query string, param string, param2 string) ![]db.pg.Row {
	params := [param, param2]
	return db.exec_param_many(query, params)
}
510
// prepare submits a request to create the prepared statement `name` from
// `query` and waits for completion.
// `num_params` is the number of parameters (`$1, $2, $3 ...`) used in the statement.
pub fn (db &DB) prepare(name string, query string, num_params int) ! {
	// defining param types is optional, so 0 (NULL) is passed for them
	res := C.PQprepare(db.conn, &char(name.str), &char(query.str), num_params, 0)
	return db.handle_error(res, 'prepare')
}
518
// exec_prepared sends a request to execute the prepared statement `name`
// with the given parameters and waits for the result.
// The number of parameters must match the ones declared in the prepared statement.
pub fn (db &DB) exec_prepared(name string, params []string) ![]db.pg.Row {
	unsafe {
		// libpq wants an array of C string pointers, one per parameter
		mut c_params := []&char{len: params.len}
		for i, p in params {
			c_params[i] = &char(p.str)
		}

		// lengths and formats are passed as 0 (NULL); the final 0 asks for text results
		res := C.PQexecPrepared(db.conn, &char(name.str), params.len, c_params.data, 0, 0, 0)
		return db.handle_error_or_rows(res, 'exec_prepared')
	}
}
531
// exec_prepared_result sends a request to execute the prepared statement
// `name` with the given parameters and waits for the result.
// The number of parameters must match the ones declared in the prepared statement.
// Unlike `exec_prepared` it returns a `Result`.
pub fn (db &DB) exec_prepared_result(name string, params []string) !Result {
	unsafe {
		// libpq wants an array of C string pointers, one per parameter
		mut c_params := []&char{len: params.len}
		for i, p in params {
			c_params[i] = &char(p.str)
		}

		// lengths and formats are passed as 0 (NULL); the final 0 asks for text results
		res := C.PQexecPrepared(db.conn, &char(name.str), params.len, c_params.data, 0, 0, 0)
		return db.handle_error_or_result(res, 'exec_prepared_result')
	}
}
545
// handle_error_or_rows turns the outcome of a libpq command into V values.
// When the connection reports an error message, `res` is released and an
// error labelled with `elabel` is returned; otherwise `res` is converted to
// rows (which also releases it).
fn (db &DB) handle_error_or_rows(res voidptr, elabel string) ![]db.pg.Row {
	pg_msg := unsafe { C.PQerrorMessage(db.conn).vstring() }
	if pg_msg == '' {
		return res_to_rows(res)
	}
	C.PQclear(res)
	// TODO make it default
	$if trace_pg_error ? {
		eprintln('pg error: ${pg_msg}')
	}
	return error('pg ${elabel} error:\n${pg_msg}')
}
558
// handle_error_or_rows_no_null is the `RowNoNull` flavour of `handle_error_or_rows`.
// When the connection reports an error message, `res` is released and an
// error labelled with `elabel` is returned; otherwise `res` is converted to
// rows without NULL handling (which also releases it).
fn (db &DB) handle_error_or_rows_no_null(res voidptr, elabel string) ![]RowNoNull {
	// TODO copypasta
	pg_msg := unsafe { C.PQerrorMessage(db.conn).vstring() }
	if pg_msg == '' {
		return res_to_rows_no_null(res)
	}
	C.PQclear(res)
	$if trace_pg_error ? {
		eprintln('pg error: ${pg_msg}')
	}
	return error('pg ${elabel} error:\n${pg_msg}')
}
571
// handle_error_or_result is an internal function similar to
// `handle_error_or_rows` that returns a `Result` instead of `[]Row`.
// When the connection reports an error message, `res` is released and an
// error labelled with `elabel` is returned.
fn (db &DB) handle_error_or_result(res voidptr, elabel string) !Result {
	pg_msg := unsafe { C.PQerrorMessage(db.conn).vstring() }
	if pg_msg == '' {
		return res_to_result(res)
	}
	C.PQclear(res)
	$if trace_pg_error ? {
		eprintln('pg error: ${pg_msg}')
	}
	return error('pg ${elabel} error:\n${pg_msg}')
}
584
// handle_error checks the outcome of a libpq command that returns no rows.
// It takes ownership of `res` and releases it on every path, then returns
// an error labelled with `elabel` when the connection reports an error message.
// NOTE(review): callers must not touch `res` after this call; the only
// caller visible in this file (`prepare`) does not.
fn (db &DB) handle_error(res voidptr, elabel string) ! {
	// Every PGresult has to be freed, also the one of a successful command.
	defer {
		C.PQclear(res)
	}
	e := unsafe { C.PQerrorMessage(db.conn).vstring() }
	if e != '' {
		$if trace_pg_error ? {
			eprintln('pg error: ${e}')
		}
		return error('pg ${elabel} error:\n${e}')
	}
}
595
// copy_expert executes a COPY command.
// For `COPY ... FROM STDIN` the data is read from `file` and sent to the
// server; for `COPY ... TO STDOUT` the data received from the server is
// written to `file`. Returns 0 on success.
// https://www.postgresql.org/docs/9.5/libpq-copy.html
pub fn (db &DB) copy_expert(query string, mut file io.ReaderWriter) !int {
	res := C.PQexec(db.conn, &char(query.str))
	status := unsafe { ExecStatusType(C.PQresultStatus(res)) }
	defer {
		C.PQclear(res)
	}

	e := unsafe { C.PQerrorMessage(db.conn).vstring() }
	if e != '' {
		return error('pg copy error:\n${e}')
	}

	if status == .copy_in {
		mut buf := []u8{len: 4 * 1024}
		for {
			n := file.read(mut buf) or {
				if err is io.Eof {
					// `io.Reader` implementations report the end of the stream
					// with `io.Eof`; that is the normal end of the input, not a failure
					break
				}
				// abort the COPY on the server side before reporting the read error
				msg := 'pg copy error: Failed to read from input'
				C.PQputCopyEnd(db.conn, &char(msg.str))
				return err
			}
			if n <= 0 {
				// readers that signal the end of input with 0 bytes are accepted too
				break
			}

			code := C.PQputCopyData(db.conn, buf.data, n)
			if code == -1 {
				return error('pg copy error: Failed to send data, code=${code}')
			}
		}

		// a nil error message ends the COPY successfully
		code := C.PQputCopyEnd(db.conn, &char(unsafe { nil }))

		if code != 1 {
			return error('pg copy error: Failed to finish copy command, code: ${code}')
		}
	} else if status == .copy_out {
		for {
			// libpq allocates one buffer per data row and stores its address here
			address := &char(unsafe { nil })
			n_bytes := C.PQgetCopyData(db.conn, &address, 0)
			if n_bytes > 0 {
				mut local_buf := []u8{len: n_bytes}
				unsafe { C.memcpy(&u8(local_buf.data), address, n_bytes) }
				file.write(local_buf) or {
					C.PQfreemem(address)
					return err
				}
			} else if n_bytes == -1 {
				// the COPY is done
				break
			} else if n_bytes == -2 {
				// consult PQerrorMessage for the reason
				return error('pg copy error: read error')
			}
			if address != 0 {
				C.PQfreemem(address)
			}
		}
	}

	return 0
}
658
// pg_stmt_worker runs `query` with the parameters that `pg_stmt_binder`
// collects from `data` and then from `where`, and returns the resulting rows.
fn pg_stmt_worker(db &DB, query string, data orm.QueryData, where orm.QueryData) ![]db.pg.Row {
	mut types := []u32{}
	mut vals := []&char{}
	mut lens := []int{}
	mut formats := []int{}

	// `data` is bound first, `where` second, so the placeholders follow that order
	pg_stmt_binder(mut types, mut vals, mut lens, mut formats, data)
	pg_stmt_binder(mut types, mut vals, mut lens, mut formats, where)

	// here, the last 0 means require text results, 1 - binary results
	res := C.PQexecParams(db.conn, &char(query.str), vals.len, types.data, vals.data,
		lens.data, formats.data, 0)
	return db.handle_error_or_rows(res, 'orm_stmt_worker')
}
672
// PQTransactionLevel selects the isolation level that `begin` requests.
pub enum PQTransactionLevel {
	read_uncommitted // BEGIN ... ISOLATION LEVEL READ UNCOMMITTED
	read_committed   // BEGIN ... ISOLATION LEVEL READ COMMITTED
	repeatable_read  // BEGIN ... ISOLATION LEVEL REPEATABLE READ
	serializable     // BEGIN ... ISOLATION LEVEL SERIALIZABLE
}
679
// PQTransactionParam carries the optional arguments of `begin`.
@[params]
pub struct PQTransactionParam {
	transaction_level PQTransactionLevel = .repeatable_read // isolation level used when none is given
}
684
// begin begins a new transaction with the isolation level given in
// `param.transaction_level` (`.repeatable_read` by default).
// Returns an error when the server rejects the command.
pub fn (db &DB) begin(param PQTransactionParam) ! {
	level := match param.transaction_level {
		.read_uncommitted { 'READ UNCOMMITTED' }
		.read_committed { 'READ COMMITTED' }
		.repeatable_read { 'REPEATABLE READ' }
		.serializable { 'SERIALIZABLE' }
	}
	sql_stmt := 'BEGIN TRANSACTION ISOLATION LEVEL ' + level

	res := C.PQexec(db.conn, &char(sql_stmt.str))
	// every PGresult has to be released, even when its content is not needed
	defer {
		C.PQclear(res)
	}
	e := unsafe { C.PQerrorMessage(db.conn).vstring() }
	if e != '' {
		return error('pg exec error: "${e}"')
	}
}
701
// commit commits the current transaction.
// Returns an error when the server rejects the command.
pub fn (db &DB) commit() ! {
	res := C.PQexec(db.conn, c'COMMIT;')
	// every PGresult has to be released, even when its content is not needed
	defer {
		C.PQclear(res)
	}
	e := unsafe { C.PQerrorMessage(db.conn).vstring() }
	if e != '' {
		return error('pg exec error: "${e}"')
	}
}
710
// rollback rolls back the current transaction.
// Returns an error when the server rejects the command.
pub fn (db &DB) rollback() ! {
	res := C.PQexec(db.conn, c'ROLLBACK;')
	// every PGresult has to be released, even when its content is not needed
	defer {
		C.PQclear(res)
	}
	e := unsafe { C.PQerrorMessage(db.conn).vstring() }
	if e != '' {
		return error('pg exec error: "${e}"')
	}
}
719
// rollback_to rolls back to the savepoint named `savepoint`.
// The name must be a plain identifier because it is placed into the SQL
// text verbatim. Returns an error for an invalid name or a failed command.
pub fn (db &DB) rollback_to(savepoint string) ! {
	if !savepoint.is_identifier() {
		return error('savepoint should be a identifier string')
	}
	sql_stmt := 'ROLLBACK TO SAVEPOINT ${savepoint};'
	res := C.PQexec(db.conn, &char(sql_stmt.str))
	// every PGresult has to be released, even when its content is not needed
	defer {
		C.PQclear(res)
	}
	e := unsafe { C.PQerrorMessage(db.conn).vstring() }
	if e != '' {
		return error('pg exec error: "${e}"')
	}
}
732
// savepoint creates a new savepoint named `savepoint`.
// The name must be a plain identifier because it is placed into the SQL
// text verbatim. Returns an error for an invalid name or a failed command.
pub fn (db &DB) savepoint(savepoint string) ! {
	if !savepoint.is_identifier() {
		return error('savepoint should be a identifier string')
	}
	sql_stmt := 'SAVEPOINT ${savepoint};'
	res := C.PQexec(db.conn, &char(sql_stmt.str))
	// every PGresult has to be released, even when its content is not needed
	defer {
		C.PQclear(res)
	}
	e := unsafe { C.PQerrorMessage(db.conn).vstring() }
	if e != '' {
		return error('pg exec error: "${e}"')
	}
}
745
// release_savepoint releases the savepoint named `savepoint`.
// The name must be a plain identifier because it is placed into the SQL
// text verbatim. Returns an error for an invalid name or a failed command.
pub fn (db &DB) release_savepoint(savepoint string) ! {
	if !savepoint.is_identifier() {
		return error('savepoint should be a identifier string')
	}
	sql_stmt := 'RELEASE SAVEPOINT ${savepoint};'
	res := C.PQexec(db.conn, &char(sql_stmt.str))
	// every PGresult has to be released, even when its content is not needed
	defer {
		C.PQclear(res)
	}
	e := unsafe { C.PQerrorMessage(db.conn).vstring() }
	if e != '' {
		return error('pg exec error: "${e}"')
	}
}
758
// validate checks whether the connection is still usable by running
// `SELECT 1` on it. It returns true on success and the query error otherwise.
pub fn (db &DB) validate() !bool {
	// only the success of the round trip matters, the row itself is not used
	db.exec_one('SELECT 1')!
	return true
}
764
// reset is meant to return the connection to its initial state for reuse.
// The body is currently empty, so calling it has no effect.
pub fn (db &DB) reset() ! {
}
768
// as_structs is a `Result` method that converts every row into a `T` by
// calling `mapper(res, row)`. The first error returned by `mapper` stops
// the conversion and is passed on to the caller.
pub fn (res Result) as_structs[T](mapper fn (Result, Row) !T) ![]T {
	mut mapped := []T{cap: res.rows.len}
	for row in res.rows {
		item := mapper(res, row)!
		mapped << item
	}
	return mapped
}
778
// listen registers the connection to receive notifications on the specified channel.
// After calling this, use consume_input() and get_notification() to receive notifications.
// The channel name must be a plain identifier because it is placed into the
// SQL text verbatim.
pub fn (db &DB) listen(channel string) ! {
	if !channel.is_identifier() {
		return error('channel name should be a valid identifier')
	}
	sql_stmt := 'LISTEN ${channel};'
	res := C.PQexec(db.conn, &char(sql_stmt.str))
	// every PGresult has to be released, even when its content is not needed
	defer {
		C.PQclear(res)
	}
	e := unsafe { C.PQerrorMessage(db.conn).vstring() }
	if e != '' {
		return error('pg listen error: "${e}"')
	}
}
792
// unlisten unregisters the connection from receiving notifications on the specified channel.
// Use unlisten_all() to unregister from all channels.
// The channel name must be a plain identifier because it is placed into the
// SQL text verbatim.
pub fn (db &DB) unlisten(channel string) ! {
	if !channel.is_identifier() {
		return error('channel name should be a valid identifier')
	}
	sql_stmt := 'UNLISTEN ${channel};'
	res := C.PQexec(db.conn, &char(sql_stmt.str))
	// every PGresult has to be released, even when its content is not needed
	defer {
		C.PQclear(res)
	}
	e := unsafe { C.PQerrorMessage(db.conn).vstring() }
	if e != '' {
		return error('pg unlisten error: "${e}"')
	}
}
806
// unlisten_all unregisters the connection from all notification channels.
// Returns an error when the server rejects the command.
pub fn (db &DB) unlisten_all() ! {
	res := C.PQexec(db.conn, c'UNLISTEN *;')
	// every PGresult has to be released, even when its content is not needed
	defer {
		C.PQclear(res)
	}
	e := unsafe { C.PQerrorMessage(db.conn).vstring() }
	if e != '' {
		return error('pg unlisten error: "${e}"')
	}
}
815
// notify sends a notification on the specified channel with an optional payload.
// All connections currently listening on that channel will receive the notification.
// The channel name must be a plain identifier; the payload is escaped with
// `PQescapeLiteral` before it is placed into the SQL text.
pub fn (db &DB) notify(channel string, payload string) ! {
	if !channel.is_identifier() {
		return error('channel name should be a valid identifier')
	}
	mut sql_stmt := ''
	if payload.len > 0 {
		// Use PQescapeLiteral to safely escape the payload
		escaped := C.PQescapeLiteral(db.conn, &char(payload.str), usize(payload.len))
		if escaped == unsafe { nil } {
			e := unsafe { C.PQerrorMessage(db.conn).vstring() }
			return error('pg notify error: failed to escape payload: "${e}"')
		}
		// the concatenation builds a new V string, so the libpq buffer can be freed right after
		sql_stmt = unsafe { 'NOTIFY ${channel}, ' + escaped.vstring() + ';' }
		C.PQfreemem(escaped)
	} else {
		sql_stmt = 'NOTIFY ${channel};'
	}
	res := C.PQexec(db.conn, &char(sql_stmt.str))
	// every PGresult has to be released, even when its content is not needed
	defer {
		C.PQclear(res)
	}
	e := unsafe { C.PQerrorMessage(db.conn).vstring() }
	if e != '' {
		return error('pg notify error: "${e}"')
	}
}
841
// consume_input reads any input that is available from the server.
// Call it before get_notification() so that pending notifications get processed.
// Returns true on success and an error carrying the libpq message when
// reading from the connection failed.
pub fn (db &DB) consume_input() !bool {
	if C.PQconsumeInput(db.conn) != 0 {
		return true
	}
	e := unsafe { C.PQerrorMessage(db.conn).vstring() }
	return error('pg consume_input error: "${e}"')
}
853
// get_notification returns the next pending notification from the server, if any.
// Returns none if there are no pending notifications.
// You should call consume_input() before this to ensure all pending notifications are available.
pub fn (db &DB) get_notification() ?Notification {
	notify := C.PQnotifies(db.conn)
	if notify == unsafe { nil } {
		return none
	}
	// `relname` and `extra` live inside the PGnotify allocation, so they must
	// be copied into V strings before that allocation is handed to PQfreemem.
	notification := Notification{
		channel: unsafe { cstring_to_vstring(notify.relname) }
		pid:     notify.be_pid
		payload: unsafe { cstring_to_vstring(notify.extra) }
	}
	C.PQfreemem(notify)
	return notification
}
871
// socket returns the file descriptor of the connection socket to the server.
// Applications can pass it to select() or poll() to wait for notifications
// without blocking. Returns -1 if there is no valid socket.
pub fn (db &DB) socket() int {
	fd := C.PQsocket(db.conn)
	return fd
}
878