From 2b04a6ecbf16697b4a6ae2e1e02d3a381967e8f0 Mon Sep 17 00:00:00 2001 From: Alexander Medvednikov Date: Tue, 26 May 2026 01:50:05 +0300 Subject: [PATCH] db.pg: thread-safe pool-backed DB, Tx, and Conn (Go-style API) (#27265) --- vlib/db/pg/README.md | 70 +++++-- vlib/db/pg/db.v | 207 +++++++++++++++++++ vlib/db/pg/orm.v | 236 +++++++++++++++++----- vlib/db/pg/pg.c.v | 384 ++++++++++++++++++++++++------------ vlib/db/pg/pg_double_test.v | 2 +- vlib/db/pg/pg_result_test.v | 2 +- vlib/db/pg/pg_test.v | 74 ++++--- vlib/db/pg/pool.v | 383 ++++++++++++++++++++++++++++++++--- vlib/db/pg/tx.v | 167 ++++++++++++++++ 9 files changed, 1289 insertions(+), 236 deletions(-) create mode 100644 vlib/db/pg/db.v create mode 100644 vlib/db/pg/tx.v diff --git a/vlib/db/pg/README.md b/vlib/db/pg/README.md index fcf7541fd..469980611 100644 --- a/vlib/db/pg/README.md +++ b/vlib/db/pg/README.md @@ -124,6 +124,42 @@ When you use `pg.connect(pg.Config{ ... })`, empty `Config` fields are omitted f generated libpq connection string. That lets libpq defaults, `PGPASSWORD`, and `.pgpass` apply when you do not set those fields in code. +## Thread Safety & Connection Pool + +`pg.connect()` returns a `&DB` that is safe to share across V threads. Internally +`DB` holds a pool of `Conn` objects (one libpq `PGconn*` each); every method on +`DB` transparently checks a `Conn` out of the pool for the duration of the call +and returns it when done. This matches Go's `database/sql.DB` model. + +```v ignore +mut db := pg.connect(pg.Config{ ... })! +defer { db.close() or {} } + +// Pool defaults: unlimited open conns, 2 idle conns kept warm, no lifetime cap. +// Tune them like Go: +db.set_max_open_conns(50) +db.set_max_idle_conns(10) +db.set_conn_max_lifetime(30 * time.minute) +``` + +For operations that must run on the **same physical connection** — LISTEN/NOTIFY, +session-scoped prepared statements, manual transactions — pin a conn with +`db.conn()` or open a transaction with `db.begin()`: + +```v ignore +// Pinned connection: returned to the pool when conn.close() is called. +mut c := db.conn()! +defer { c.close() or {} } +c.listen('my_channel')! + +// Transaction: the conn is pinned for the lifetime of the Tx and released on +// commit() or rollback(). +mut tx := db.begin()! +tx.exec('UPDATE accounts SET balance = balance - 100 WHERE id = 1')! +tx.exec('UPDATE accounts SET balance = balance + 100 WHERE id = 2')! +tx.commit()! +``` + ## Using Parameterized Queries Parameterized queries (exec_param, etc.) in V require the use of the following syntax: ($n). @@ -143,33 +179,40 @@ channel will receive them. ### Basic Usage +LISTEN/NOTIFY is session-scoped, so you must pin a `Conn` from the pool — +calling `db.listen()` would only listen on whichever pooled conn happens to +serve that one call. + ```v ignore import db.pg fn main() { - db := pg.connect(pg.Config{ user: 'postgres', password: 'password', dbname: 'mydb' })! + mut db := pg.connect(pg.Config{ user: 'postgres', password: 'password', dbname: 'mydb' })! defer { db.close() or {} } + mut c := db.conn()! + defer { c.close() or {} } + // Start listening on a channel - db.listen('my_channel')! + c.listen('my_channel')! // From another connection or session, send a notification - db.notify('my_channel', 'Hello, World!')! + c.notify('my_channel', 'Hello, World!')! // Process incoming data from the server - db.consume_input()! + c.consume_input()! // Check for notifications - if notification := db.get_notification() { + if notification := c.get_notification() { println('Received notification on channel: ${notification.channel}') println('Payload: ${notification.payload}') println('From server process: ${notification.pid}') } // Stop listening - db.unlisten('my_channel')! + c.unlisten('my_channel')! // Or unlisten from all channels - db.unlisten_all()! + c.unlisten_all()! } ``` @@ -182,20 +225,23 @@ import db.pg import time fn main() { - db := pg.connect(pg.Config{ user: 'postgres', password: 'password', dbname: 'mydb' })! + mut db := pg.connect(pg.Config{ user: 'postgres', password: 'password', dbname: 'mydb' })! defer { db.close() or {} } - db.listen('events')! + mut c := db.conn()! + defer { c.close() or {} } + + c.listen('events')! // Get socket fd for polling (useful with select/epoll) - socket_fd := db.socket() + socket_fd := c.socket() println('Socket FD: ${socket_fd}') // Simple polling loop for { - db.consume_input()! + c.consume_input()! for { - notification := db.get_notification() or { break } + notification := c.get_notification() or { break } println('Event: ${notification.channel} - ${notification.payload}') } time.sleep(100 * time.millisecond) diff --git a/vlib/db/pg/db.v b/vlib/db/pg/db.v new file mode 100644 index 000000000..8c2180f97 --- /dev/null +++ b/vlib/db/pg/db.v @@ -0,0 +1,207 @@ +module pg + +import io +import time + +// DB is a thread-safe handle to a PostgreSQL database, backed by a pool of +// `Conn` objects. It mirrors Go's `database/sql.DB` design: methods on `DB` +// transparently acquire a conn for the call, then release it back to the +// pool. For operations that must run on the same physical connection +// (LISTEN/NOTIFY, session-scoped prepared statements, manual transactions), +// use `db.conn()` to pin a conn or `db.begin()` to start a transaction. +pub struct DB { +mut: + pool &Pool = unsafe { nil } +} + +// connect creates a new pool and opens an initial connection to verify the +// config works. The returned `&DB` is safe to share between threads. +pub fn connect(config Config, pcfg PoolConfig) !&DB { + return connect_with_conninfo(config.conninfo()!, pcfg)! +} + +// connect_with_conninfo is the conninfo-string variant of `connect`. +pub fn connect_with_conninfo(conninfo string, pcfg PoolConfig) !&DB { + mut db := &DB{ + pool: new_pool(conninfo, pcfg) + } + // Fail fast if the conninfo is wrong, rather than at first query. + probe := db.pool.acquire()! + db.pool.release(probe) + return db +} + +// close shuts down the pool and tears down all idle connections. +// In-flight conns will be closed when released. +pub fn (mut db DB) close() ! { + if isnil(db.pool) { + return + } + db.pool.close() +} + +// stats returns a snapshot of the pool state. +pub fn (mut db DB) stats() PoolStats { + return db.pool.stats() +} + +// set_max_open_conns caps the total number of open connections. +// A value of 0 means unlimited (the default, like Go). +pub fn (mut db DB) set_max_open_conns(n int) { + db.pool.set_max_open(n) +} + +// set_max_idle_conns caps the number of idle connections kept warm. +pub fn (mut db DB) set_max_idle_conns(n int) { + db.pool.set_max_idle(n) +} + +// set_conn_max_lifetime sets the maximum amount of time a conn may be reused. +// A value of zero means conns are reused indefinitely. +pub fn (mut db DB) set_conn_max_lifetime(d time.Duration) { + db.pool.set_conn_max_lifetime(d) +} + +// conn checks a conn out of the pool. The caller is responsible for calling +// `conn.close()` when done; failing to do so leaks the conn. Use this when +// you need session-bound operations like LISTEN/NOTIFY. +pub fn (mut db DB) conn() !&Conn { + return db.pool.acquire() +} + +// validate borrows a conn from the pool and checks it is alive. +pub fn (mut db DB) validate() !bool { + mut c := db.pool.acquire()! + defer { c.close() or {} } + return c.validate() +} + +// reset is a no-op kept for ORM compatibility. +pub fn (mut db DB) reset() ! { +} + +// ---- exec/query helpers (acquire-use-release) ---- + +// exec runs `query` on a pooled conn and returns the rows. +pub fn (mut db DB) exec(query string) ![]Row { + mut c := db.pool.acquire()! + defer { c.close() or {} } + return c.exec(query) +} + +// exec_no_null runs `query` and returns rows with no nullable fields. +pub fn (mut db DB) exec_no_null(query string) ![]RowNoNull { + mut c := db.pool.acquire()! + defer { c.close() or {} } + return c.exec_no_null(query) +} + +// exec_result runs `query` and returns a `Result` (rows + column index). +pub fn (mut db DB) exec_result(query string) !Result { + mut c := db.pool.acquire()! + defer { c.close() or {} } + return c.exec_result(query) +} + +// exec_one runs `query` and returns its first row. +pub fn (mut db DB) exec_one(query string) !Row { + mut c := db.pool.acquire()! + defer { c.close() or {} } + return c.exec_one(query) +} + +// exec_param_many runs `query` with the given parameters. +pub fn (mut db DB) exec_param_many(query string, params []string) ![]Row { + mut c := db.pool.acquire()! + defer { c.close() or {} } + return c.exec_param_many(query, params) +} + +// exec_param_many_result runs `query` with parameters and returns a `Result`. +pub fn (mut db DB) exec_param_many_result(query string, params []string) !Result { + mut c := db.pool.acquire()! + defer { c.close() or {} } + return c.exec_param_many_result(query, params) +} + +// exec_param runs `query` with a single `$1` parameter. +pub fn (mut db DB) exec_param(query string, param string) ![]Row { + return db.exec_param_many(query, [param]) +} + +// exec_param2 runs `query` with two parameters (`$1`, `$2`). +pub fn (mut db DB) exec_param2(query string, param string, param2 string) ![]Row { + return db.exec_param_many(query, [param, param2]) +} + +// q_int runs `query` and returns the first column of the first row as int. +pub fn (mut db DB) q_int(query string) !int { + mut c := db.pool.acquire()! + defer { c.close() or {} } + return c.q_int(query) +} + +// q_string runs `query` and returns the first column of the first row as string. +pub fn (mut db DB) q_string(query string) !string { + mut c := db.pool.acquire()! + defer { c.close() or {} } + return c.q_string(query) +} + +// q_strings runs `query` and returns the full row set (alias of `exec`). +pub fn (mut db DB) q_strings(query string) ![]Row { + return db.exec(query) +} + +// copy_expert runs a COPY command on a pooled conn. +pub fn (mut db DB) copy_expert(query string, mut file io.ReaderWriter) !int { + mut c := db.pool.acquire()! + defer { c.close() or {} } + return c.copy_expert(query, mut file) +} + +// ---- prepared statements ---- +// +// NOTE: prepared statements are session-scoped. Calling `prepare` on `DB` +// only registers the statement on the conn that happened to serve the call. +// Use `db.conn()` to pin a conn for prepare+exec_prepared cycles. + +// prepare registers a prepared statement on a transient conn. +// For repeated use, pin a conn via `db.conn()`. +pub fn (mut db DB) prepare(name string, query string, num_params int) ! { + mut c := db.pool.acquire()! + defer { c.close() or {} } + return c.prepare(name, query, num_params) +} + +// exec_prepared runs a previously-prepared statement. +pub fn (mut db DB) exec_prepared(name string, params []string) ![]Row { + mut c := db.pool.acquire()! + defer { c.close() or {} } + return c.exec_prepared(name, params) +} + +// exec_prepared_result runs a previously-prepared statement and returns a `Result`. +pub fn (mut db DB) exec_prepared_result(name string, params []string) !Result { + mut c := db.pool.acquire()! + defer { c.close() or {} } + return c.exec_prepared_result(name, params) +} + +// ---- transactions ---- + +// begin starts a new transaction and returns a `Tx` that pins a conn from +// the pool. The conn is released when `Tx.commit()` or `Tx.rollback()` is +// called. The default isolation level is REPEATABLE READ (matching the old +// single-conn API); pass `PQTransactionParam{ transaction_level: ... }` to +// override. +pub fn (mut db DB) begin(param PQTransactionParam) !&Tx { + mut c := db.pool.acquire()! + c.begin_on_conn(param) or { + c.close() or {} + return err + } + return &Tx{ + conn: c + } +} diff --git a/vlib/db/pg/orm.v b/vlib/db/pg/orm.v index 61f127b7b..a93ecfa12 100644 --- a/vlib/db/pg/orm.v +++ b/vlib/db/pg/orm.v @@ -4,14 +4,15 @@ import orm import time import net.conv -// sql expr +// ---- ORM on Conn (single pinned connection) ---- -// select is used internally by V's ORM for processing `SELECT ` queries -pub fn (db DB) select(config orm.SelectConfig, data orm.QueryData, where orm.QueryData) ![][]orm.Primitive { +// select is used internally by V's ORM for processing `SELECT ` queries. +pub fn (c &Conn) select(config orm.SelectConfig, data orm.QueryData, where orm.QueryData) ![][]orm.Primitive { + c.ensure_active()! where_with_tenant := orm.apply_tenant_filter(config.table, where) query := orm.orm_select_gen(config, '"', true, '$', 1, where_with_tenant) - rows := pg_stmt_worker(db, query, where_with_tenant, data)! + rows := pg_stmt_worker(c, query, where_with_tenant, data)! mut ret := [][]orm.Primitive{} @@ -26,90 +27,235 @@ pub fn (db DB) select(config orm.SelectConfig, data orm.QueryData, where orm.Que return ret } -// sql stmt - -// insert is used internally by V's ORM for processing `INSERT ` queries -pub fn (db DB) insert(table orm.Table, data orm.QueryData) ! { +// insert is used internally by V's ORM for processing `INSERT ` queries. +pub fn (c &Conn) insert(table orm.Table, data orm.QueryData) ! { + c.ensure_active()! query, converted_data := orm.orm_stmt_gen(.pg, table, '"', .insert, true, '$', 1, data, orm.QueryData{}) - pg_stmt_worker(db, query, converted_data, orm.QueryData{})! + pg_stmt_worker(c, query, converted_data, orm.QueryData{})! } -// update is used internally by V's ORM for processing `UPDATE ` queries -pub fn (db DB) update(table orm.Table, data orm.QueryData, where orm.QueryData) ! { +// update is used internally by V's ORM for processing `UPDATE ` queries. +pub fn (c &Conn) update(table orm.Table, data orm.QueryData, where orm.QueryData) ! { + c.ensure_active()! where_with_tenant := orm.apply_tenant_filter(table, where) query, _ := orm.orm_stmt_gen(.default, table, '"', .update, true, '$', 1, data, where_with_tenant) - pg_stmt_worker(db, query, data, where_with_tenant)! + pg_stmt_worker(c, query, data, where_with_tenant)! } -// delete is used internally by V's ORM for processing `DELETE ` queries -pub fn (db DB) delete(table orm.Table, where orm.QueryData) ! { +// delete is used internally by V's ORM for processing `DELETE ` queries. +pub fn (c &Conn) delete(table orm.Table, where orm.QueryData) ! { + c.ensure_active()! where_with_tenant := orm.apply_tenant_filter(table, where) query, _ := orm.orm_stmt_gen(.default, table, '"', .delete, true, '$', 1, orm.QueryData{}, where_with_tenant) - pg_stmt_worker(db, query, orm.QueryData{}, where_with_tenant)! + pg_stmt_worker(c, query, orm.QueryData{}, where_with_tenant)! } -// last_id is used internally by V's ORM for post-processing `INSERT ` queries -pub fn (db DB) last_id() int { +// last_id is used internally by V's ORM for post-processing `INSERT ` queries. +pub fn (c &Conn) last_id() int { query := 'SELECT LASTVAL();' - return db.q_int(query) or { 0 } + return c.q_int(query) or { 0 } } -// DDL (table creation/destroying etc) - -// create is used internally by V's ORM for processing table creation queries (DDL) -pub fn (db DB) create(table orm.Table, fields []orm.TableField) ! { +// create is used internally by V's ORM for processing table creation queries (DDL). +pub fn (c &Conn) create(table orm.Table, fields []orm.TableField) ! { query := orm.orm_table_gen(.pg, table, '"', true, 0, fields, pg_type_from_v, false) or { return err } stmts := query.split(';') for stmt in stmts { if stmt != '' { - pg_stmt_worker(db, stmt + ';', orm.QueryData{}, orm.QueryData{})! + c.exec(stmt + ';')! } } } -// drop is used internally by V's ORM for processing table destroying queries (DDL) -pub fn (db DB) drop(table orm.Table) ! { +// drop is used internally by V's ORM for processing table destroying queries (DDL). +pub fn (c &Conn) drop(table orm.Table) ! { query := 'DROP TABLE "${table.name}";' - pg_stmt_worker(db, query, orm.QueryData{}, orm.QueryData{})! + c.exec(query)! +} + +// orm_begin starts a transaction on this conn. +pub fn (c &Conn) orm_begin() ! { + c.begin_on_conn()! +} + +// orm_commit commits the transaction on this conn. +pub fn (c &Conn) orm_commit() ! { + c.commit()! +} + +// orm_rollback rolls back the transaction on this conn. +pub fn (c &Conn) orm_rollback() ! { + c.rollback()! +} + +// orm_savepoint creates a savepoint on this conn. +pub fn (c &Conn) orm_savepoint(name string) ! { + c.savepoint(name)! +} + +// orm_rollback_to rolls back to a savepoint on this conn. +pub fn (c &Conn) orm_rollback_to(name string) ! { + c.rollback_to(name)! +} + +// orm_release_savepoint releases a savepoint on this conn. +pub fn (c &Conn) orm_release_savepoint(name string) ! { + c.release_savepoint(name)! +} + +// ---- ORM on DB (acquire-use-release per call) ---- + +// select acquires a conn from the pool and runs the ORM SELECT on it. +pub fn (mut db DB) select(config orm.SelectConfig, data orm.QueryData, where orm.QueryData) ![][]orm.Primitive { + mut c := db.pool.acquire()! + defer { + c.close() or {} + } + return c.select(config, data, where) +} + +// insert acquires a conn from the pool, runs the ORM INSERT on it, and +// stashes LASTVAL() captured on the same session for the calling thread. +// V's `sql db { insert ... }` macro emits a follow-up `db.last_id()` call; +// stashing here lets that read return the correct id even though the pool +// may hand out a different conn for the second call. +pub fn (mut db DB) insert(table orm.Table, data orm.QueryData) ! { + mut c := db.pool.acquire()! + defer { + c.close() or {} + } + c.insert(table, data)! + db.pool.stash_last_id(c.last_id()) +} + +// update acquires a conn from the pool and runs the ORM UPDATE on it. +pub fn (mut db DB) update(table orm.Table, data orm.QueryData, where orm.QueryData) ! { + mut c := db.pool.acquire()! + defer { + c.close() or {} + } + c.update(table, data, where)! +} + +// delete acquires a conn from the pool and runs the ORM DELETE on it. +pub fn (mut db DB) delete(table orm.Table, where orm.QueryData) ! { + mut c := db.pool.acquire()! + defer { + c.close() or {} + } + c.delete(table, where)! +} + +// create acquires a conn from the pool and runs CREATE TABLE on it. +pub fn (mut db DB) create(table orm.Table, fields []orm.TableField) ! { + mut c := db.pool.acquire()! + defer { + c.close() or {} + } + c.create(table, fields)! +} + +// drop acquires a conn from the pool and runs DROP TABLE on it. +pub fn (mut db DB) drop(table orm.Table) ! { + mut c := db.pool.acquire()! + defer { + c.close() or {} + } + c.drop(table)! +} + +// last_id returns the id stashed by this thread's most recent `DB.insert` +// (or 0 if there is none). LASTVAL() itself is session-scoped, so calling +// it on a freshly-checked-out pool conn would return the wrong value or 0; +// `DB.insert` captures it on the same conn that ran the INSERT and stashes +// it per-thread, which is what V's ORM macro expects. +pub fn (mut db DB) last_id() int { + return db.pool.take_last_id() +} + +// ---- ORM on Tx (use the pinned conn) ---- + +// select runs the ORM SELECT on the pinned transaction conn. +pub fn (mut tx Tx) select(config orm.SelectConfig, data orm.QueryData, where orm.QueryData) ![][]orm.Primitive { + tx.ensure_active()! + return tx.conn.select(config, data, where) +} + +// insert runs the ORM INSERT on the pinned transaction conn. +pub fn (mut tx Tx) insert(table orm.Table, data orm.QueryData) ! { + tx.ensure_active()! + tx.conn.insert(table, data)! +} + +// update runs the ORM UPDATE on the pinned transaction conn. +pub fn (mut tx Tx) update(table orm.Table, data orm.QueryData, where orm.QueryData) ! { + tx.ensure_active()! + tx.conn.update(table, data, where)! +} + +// delete runs the ORM DELETE on the pinned transaction conn. +pub fn (mut tx Tx) delete(table orm.Table, where orm.QueryData) ! { + tx.ensure_active()! + tx.conn.delete(table, where)! +} + +// create runs CREATE TABLE on the pinned transaction conn. +pub fn (mut tx Tx) create(table orm.Table, fields []orm.TableField) ! { + tx.ensure_active()! + tx.conn.create(table, fields)! +} + +// drop runs DROP TABLE on the pinned transaction conn. +pub fn (mut tx Tx) drop(table orm.Table) ! { + tx.ensure_active()! + tx.conn.drop(table)! +} + +// last_id returns the last inserted id on the pinned conn. +pub fn (mut tx Tx) last_id() int { + if tx.done || isnil(tx.conn) { + return 0 + } + return tx.conn.last_id() } -// orm_begin starts a transaction for ORM helpers. -pub fn (db DB) orm_begin() ! { - db.begin()! +// orm_begin is a no-op on Tx (begin already ran when the Tx was created). +// It exists so Tx satisfies `orm.TransactionalConnection` for nested savepoints. +pub fn (mut tx Tx) orm_begin() ! { } -// orm_commit commits a transaction for ORM helpers. -pub fn (db DB) orm_commit() ! { - db.commit()! +// orm_commit commits the Tx. +pub fn (mut tx Tx) orm_commit() ! { + tx.commit()! } -// orm_rollback rolls back a transaction for ORM helpers. -pub fn (db DB) orm_rollback() ! { - db.rollback()! +// orm_rollback rolls back the Tx. +pub fn (mut tx Tx) orm_rollback() ! { + tx.rollback()! } -// orm_savepoint creates a savepoint for ORM helpers. -pub fn (db DB) orm_savepoint(name string) ! { - db.savepoint(name)! +// orm_savepoint creates a savepoint inside the Tx. +pub fn (mut tx Tx) orm_savepoint(name string) ! { + tx.savepoint(name)! } -// orm_rollback_to rolls back to a savepoint for ORM helpers. -pub fn (db DB) orm_rollback_to(name string) ! { - db.rollback_to(name)! +// orm_rollback_to rolls back to a savepoint inside the Tx. +pub fn (mut tx Tx) orm_rollback_to(name string) ! { + tx.rollback_to(name)! } -// orm_release_savepoint releases a savepoint for ORM helpers. -pub fn (db DB) orm_release_savepoint(name string) ! { - db.release_savepoint(name)! +// orm_release_savepoint releases a savepoint inside the Tx. +pub fn (mut tx Tx) orm_release_savepoint(name string) ! { + tx.release_savepoint(name)! } -// utils +// ---- utils ---- fn pg_stmt_binder(mut types []u32, mut vals []&char, mut lens []int, mut formats []int, d orm.QueryData) { for data in d.data { diff --git a/vlib/db/pg/pg.c.v b/vlib/db/pg/pg.c.v index 1939f453c..06fb59220 100644 --- a/vlib/db/pg/pg.c.v +++ b/vlib/db/pg/pg.c.v @@ -2,6 +2,7 @@ module pg import io import orm +import time $if $pkgconfig('libpq') { #pkgconfig --cflags --libs libpq @@ -63,9 +64,18 @@ $if windows { #include "@VMODROOT/vlib/db/pg/compatibility.h" -pub struct DB { +// Conn is a single libpq connection. It is NOT safe for concurrent use by +// multiple V threads (libpq enforces serial use of `PGconn*`). Use it pinned +// for operations that require a specific connection (LISTEN/NOTIFY, prepared +// statements scoped to the session, manual transactions). For pooled, +// thread-safe access prefer `DB`, which checks out a Conn per call. +@[heap] +pub struct Conn { mut: - conn voidptr = unsafe { nil } + conn voidptr = unsafe { nil } + pool &Pool = unsafe { nil } + created_at time.Time + bad bool } pub struct Row { @@ -294,18 +304,11 @@ fn (config Config) conninfo() !string { return parts.join(' ') } -// connect makes a new connection to the database server using -// the parameters from the `Config` structure, returning -// a connection error when something goes wrong. -// Empty fields are omitted so libpq defaults can still apply. -pub fn connect(config Config) !DB { - return connect_with_conninfo(config.conninfo()!)! -} - -// connect_with_conninfo makes a new connection to the database server using -// the `conninfo` connection string, returning -// a connection error when something goes wrong -pub fn connect_with_conninfo(conninfo string) !DB { +// connect_slot opens a single libpq connection and returns it as an +// `IdleSlot` (raw handle + creation timestamp). The pool wraps the slot in a +// fresh `&Conn` per checkout so the wrapper cannot be revived by a stale +// reference after release. +fn connect_slot(conninfo string) !IdleSlot { conn := C.PQconnectdb(&char(conninfo.str)) if conn == 0 { return error('libpq memory allocation error') @@ -313,15 +316,86 @@ pub fn connect_with_conninfo(conninfo string) !DB { status := unsafe { ConnStatusType(C.PQstatus(conn)) } if status != .ok { // We force the construction of a new string as the - // error message will be freed by the next `PQfinish` - // call + // error message will be freed by the next `PQfinish` call c_error_msg := unsafe { C.PQerrorMessage(conn).vstring() } error_msg := '${c_error_msg}' C.PQfinish(conn) return error('Connection to a PG database failed: ${error_msg}') } - return DB{ - conn: conn + return IdleSlot{ + handle: conn + created_at: time.now() + } +} + +// physical_close_handle tears down a raw libpq handle. Used by the pool when +// it discards an expired/broken slot or unwinds during shutdown. +fn physical_close_handle(handle voidptr) { + if handle != unsafe { nil } { + C.PQfinish(handle) + } +} + +// slot_expired reports whether `slot` has lived longer than `max_lifetime`. +// A `max_lifetime` of zero means "no limit". +fn slot_expired(slot IdleSlot, max_lifetime time.Duration) bool { + if max_lifetime <= 0 { + return false + } + return time.now() - slot.created_at > max_lifetime +} + +// slot_bad reports whether the libpq handle in `slot` is no longer usable. +fn slot_bad(slot IdleSlot) bool { + if slot.bad { + return true + } + if slot.handle == unsafe { nil } { + return true + } + status := unsafe { ConnStatusType(C.PQstatus(slot.handle)) } + return status == .bad +} + +// ensure_active errors if this wrapper has been detached from its handle. +// The pool nils `c.conn` during release() so any stale `&Conn` kept by user +// code becomes inert here — it can never reach the underlying PGconn*, even +// if the pool has since handed that same physical handle to another caller. +fn (c &Conn) ensure_active() ! { + if isnil(c.conn) { + return error('pg: operation on released Conn (was close() called?)') + } +} + +// is_bad reports whether the underlying libpq connection has gone bad +// (e.g. dropped TCP, server idle-timeout). +pub fn (c &Conn) is_bad() bool { + if c.bad { + return true + } + if c.conn == unsafe { nil } { + return true + } + status := unsafe { ConnStatusType(C.PQstatus(c.conn)) } + return status == .bad +} + +// is_expired reports whether the conn has lived longer than `max_lifetime`. +// A `max_lifetime` of zero means "no limit". +pub fn (c &Conn) is_expired(max_lifetime time.Duration) bool { + if max_lifetime <= 0 { + return false + } + return time.now() - c.created_at > max_lifetime +} + +// physical_close unconditionally tears down the libpq connection. +// It is unsafe because the caller must not use the conn afterwards. +@[unsafe] +fn (mut c Conn) physical_close() { + if c.conn != unsafe { nil } { + C.PQfinish(c.conn) + c.conn = unsafe { nil } } } @@ -391,17 +465,25 @@ fn res_to_result(res voidptr) Result { return Result{cols, rows} } -// close frees the underlying resource allocated by the database connection -pub fn (db &DB) close() ! { - C.PQfinish(db.conn) +// close releases this conn back to its pool. Safe to call more than once: +// `release` detaches the wrapper from the underlying handle on the first +// call (nils `c.conn`), so any subsequent close or method invocation on the +// same `&Conn` is a no-op or a benign error rather than a use-after-free. +pub fn (mut c Conn) close() ! { + if isnil(c.pool) { + unsafe { c.physical_close() } + return + } + mut pool := unsafe { c.pool } + pool.release(c) } // q_int submit a command to the database server and // returns an the first field in the first tuple // converted to an int. If no row is found or on // command failure, an error is returned -pub fn (db &DB) q_int(query string) !int { - rows := db.exec(query)! +pub fn (c &Conn) q_int(query string) !int { + rows := c.exec(query)! if rows.len == 0 { return error('q_int "${query}" not found') } @@ -417,8 +499,8 @@ pub fn (db &DB) q_int(query string) !int { // returns an the first field in the first tuple // as a string. If no row is found or on // command failure, an error is returned -pub fn (db &DB) q_string(query string) !string { - rows := db.exec(query)! +pub fn (c &Conn) q_string(query string) !string { + rows := c.exec(query)! if rows.len == 0 { return error('q_string "${query}" not found') } @@ -432,26 +514,29 @@ pub fn (db &DB) q_string(query string) !string { // q_strings submit a command to the database server and // returns the resulting row set. Alias of `exec` -pub fn (db &DB) q_strings(query string) ![]db.pg.Row { - return db.exec(query) +pub fn (c &Conn) q_strings(query string) ![]db.pg.Row { + return c.exec(query) } // exec submits a command to the database server and wait for the result, returning an error on failure and a row set on success -pub fn (db &DB) exec(query string) ![]db.pg.Row { - res := C.PQexec(db.conn, &char(query.str)) - return db.handle_error_or_rows(res, 'exec') +pub fn (c &Conn) exec(query string) ![]db.pg.Row { + c.ensure_active()! + res := C.PQexec(c.conn, &char(query.str)) + return c.handle_error_or_rows(res, 'exec') } // exec_no_null works like exec, but the fields can't be NULL, no optionals -pub fn (db &DB) exec_no_null(query string) ![]RowNoNull { - res := C.PQexec(db.conn, &char(query.str)) - return db.handle_error_or_rows_no_null(res, 'exec') +pub fn (c &Conn) exec_no_null(query string) ![]RowNoNull { + c.ensure_active()! + res := C.PQexec(c.conn, &char(query.str)) + return c.handle_error_or_rows_no_null(res, 'exec') } // exec_result submits a command to the database server and wait for the result, returning an error on failure and a `Result` set on success -pub fn (db &DB) exec_result(query string) !Result { - res := C.PQexec(db.conn, &char(query.str)) - return db.handle_error_or_result(res, 'exec_result') +pub fn (c &Conn) exec_result(query string) !Result { + c.ensure_active()! + res := C.PQexec(c.conn, &char(query.str)) + return c.handle_error_or_result(res, 'exec_result') } fn rows_first_or_empty(rows []db.pg.Row) !Row { @@ -462,10 +547,12 @@ fn rows_first_or_empty(rows []db.pg.Row) !Row { } // exec_one executes a query and returns its first row as a result, or an error on failure -pub fn (db &DB) exec_one(query string) !Row { - res := C.PQexec(db.conn, &char(query.str)) - e := unsafe { C.PQerrorMessage(db.conn).vstring() } +pub fn (c &Conn) exec_one(query string) !Row { + c.ensure_active()! + res := C.PQexec(c.conn, &char(query.str)) + e := unsafe { C.PQerrorMessage(c.conn).vstring() } if e != '' { + c.mark_bad_if_disconnected() return error('pg exec error: "${e}"') } row := rows_first_or_empty(res_to_rows(res))! @@ -473,81 +560,96 @@ pub fn (db &DB) exec_one(query string) !Row { } // exec_param_many executes a query with the parameters provided as ($1), ($2), ($n) -pub fn (db &DB) exec_param_many(query string, params []string) ![]db.pg.Row { +pub fn (c &Conn) exec_param_many(query string, params []string) ![]db.pg.Row { + c.ensure_active()! unsafe { mut param_vals := []&char{len: params.len} for i in 0 .. params.len { param_vals[i] = &char(params[i].str) } - res := C.PQexecParams(db.conn, &char(query.str), params.len, 0, param_vals.data, 0, 0, 0) - return db.handle_error_or_rows(res, 'exec_param_many') + res := C.PQexecParams(c.conn, &char(query.str), params.len, 0, param_vals.data, 0, 0, 0) + return c.handle_error_or_rows(res, 'exec_param_many') } } // exec_param_many executes a query with the parameters provided as ($1), ($2), ($n) and returns a `Result` -pub fn (db &DB) exec_param_many_result(query string, params []string) !Result { +pub fn (c &Conn) exec_param_many_result(query string, params []string) !Result { + c.ensure_active()! unsafe { mut param_vals := []&char{len: params.len} for i in 0 .. params.len { param_vals[i] = &char(params[i].str) } - res := C.PQexecParams(db.conn, &char(query.str), params.len, 0, param_vals.data, 0, 0, 0) - return db.handle_error_or_result(res, 'exec_param_many_result') + res := C.PQexecParams(c.conn, &char(query.str), params.len, 0, param_vals.data, 0, 0, 0) + return c.handle_error_or_result(res, 'exec_param_many_result') } } // exec_param executes a query with 1 parameter ($1), and returns either an error on failure, or the full result set on success -pub fn (db &DB) exec_param(query string, param string) ![]db.pg.Row { - return db.exec_param_many(query, [param]) +pub fn (c &Conn) exec_param(query string, param string) ![]db.pg.Row { + return c.exec_param_many(query, [param]) } // exec_param2 executes a query with 2 parameters ($1) and ($2), and returns either an error on failure, or the full result set on success -pub fn (db &DB) exec_param2(query string, param string, param2 string) ![]db.pg.Row { - return db.exec_param_many(query, [param, param2]) +pub fn (c &Conn) exec_param2(query string, param string, param2 string) ![]db.pg.Row { + return c.exec_param_many(query, [param, param2]) } // prepare submits a request to create a prepared statement with the given parameters, and waits for completion. You must provide the number of parameters (`$1, $2, $3 ...`) used in the statement -pub fn (db &DB) prepare(name string, query string, num_params int) ! { +pub fn (c &Conn) prepare(name string, query string, num_params int) ! { + c.ensure_active()! res := - C.PQprepare(db.conn, &char(name.str), &char(query.str), num_params, 0) // defining param types is optional + C.PQprepare(c.conn, &char(name.str), &char(query.str), num_params, 0) // defining param types is optional - return db.handle_error(res, 'prepare') + return c.handle_error(res, 'prepare') } // exec_prepared sends a request to execute a prepared statement with given parameters, and waits for the result. The number of parameters must match with the parameters declared in the prepared statement. -pub fn (db &DB) exec_prepared(name string, params []string) ![]db.pg.Row { +pub fn (c &Conn) exec_prepared(name string, params []string) ![]db.pg.Row { + c.ensure_active()! unsafe { mut param_vals := []&char{len: params.len} for i in 0 .. params.len { param_vals[i] = &char(params[i].str) } - res := C.PQexecPrepared(db.conn, &char(name.str), params.len, param_vals.data, 0, 0, 0) - return db.handle_error_or_rows(res, 'exec_prepared') + res := C.PQexecPrepared(c.conn, &char(name.str), params.len, param_vals.data, 0, 0, 0) + return c.handle_error_or_rows(res, 'exec_prepared') } } // exec_prepared sends a request to execute a prepared statement with given parameters, and waits for the result. The number of parameters must match with the parameters declared in the prepared statement. // returns `Result` -pub fn (db &DB) exec_prepared_result(name string, params []string) !Result { +pub fn (c &Conn) exec_prepared_result(name string, params []string) !Result { + c.ensure_active()! unsafe { mut param_vals := []&char{len: params.len} for i in 0 .. params.len { param_vals[i] = &char(params[i].str) } - res := C.PQexecPrepared(db.conn, &char(name.str), params.len, param_vals.data, 0, 0, 0) - return db.handle_error_or_result(res, 'exec_prepared_result') + res := C.PQexecPrepared(c.conn, &char(name.str), params.len, param_vals.data, 0, 0, 0) + return c.handle_error_or_result(res, 'exec_prepared_result') } } -fn (db &DB) handle_error_or_rows(res voidptr, elabel string) ![]db.pg.Row { - e := unsafe { C.PQerrorMessage(db.conn).vstring() } +fn (c &Conn) mark_bad_if_disconnected() { + status := unsafe { ConnStatusType(C.PQstatus(c.conn)) } + if status == .bad { + unsafe { + mut mc := c + mc.bad = true + } + } +} + +fn (c &Conn) handle_error_or_rows(res voidptr, elabel string) ![]db.pg.Row { + e := unsafe { C.PQerrorMessage(c.conn).vstring() } if e != '' { C.PQclear(res) - // TODO make it default + c.mark_bad_if_disconnected() $if trace_pg_error ? { eprintln('pg error: ${e}') } @@ -556,11 +658,11 @@ fn (db &DB) handle_error_or_rows(res voidptr, elabel string) ![]db.pg.Row { return res_to_rows(res) } -fn (db &DB) handle_error_or_rows_no_null(res voidptr, elabel string) ![]RowNoNull { - // TODO copypasta - e := unsafe { C.PQerrorMessage(db.conn).vstring() } +fn (c &Conn) handle_error_or_rows_no_null(res voidptr, elabel string) ![]RowNoNull { + e := unsafe { C.PQerrorMessage(c.conn).vstring() } if e != '' { C.PQclear(res) + c.mark_bad_if_disconnected() $if trace_pg_error ? { eprintln('pg error: ${e}') } @@ -570,10 +672,11 @@ fn (db &DB) handle_error_or_rows_no_null(res voidptr, elabel string) ![]RowNoNul } // hande_error_or_result is an internal function similar to handle_error_or_rows that returns `Result` instead of `[]Row` -fn (db &DB) handle_error_or_result(res voidptr, elabel string) !Result { - e := unsafe { C.PQerrorMessage(db.conn).vstring() } +fn (c &Conn) handle_error_or_result(res voidptr, elabel string) !Result { + e := unsafe { C.PQerrorMessage(c.conn).vstring() } if e != '' { C.PQclear(res) + c.mark_bad_if_disconnected() $if trace_pg_error ? { eprintln('pg error: ${e}') } @@ -582,10 +685,11 @@ fn (db &DB) handle_error_or_result(res voidptr, elabel string) !Result { return res_to_result(res) } -fn (db &DB) handle_error(res voidptr, elabel string) ! { - e := unsafe { C.PQerrorMessage(db.conn).vstring() } +fn (c &Conn) handle_error(res voidptr, elabel string) ! { + e := unsafe { C.PQerrorMessage(c.conn).vstring() } if e != '' { C.PQclear(res) + c.mark_bad_if_disconnected() $if trace_pg_error ? { eprintln('pg error: ${e}') } @@ -595,14 +699,15 @@ fn (db &DB) handle_error(res voidptr, elabel string) ! { // copy_expert executes COPY command // https://www.postgresql.org/docs/9.5/libpq-copy.html -pub fn (db &DB) copy_expert(query string, mut file io.ReaderWriter) !int { - mut res := C.PQexec(db.conn, &char(query.str)) +pub fn (c &Conn) copy_expert(query string, mut file io.ReaderWriter) !int { + c.ensure_active()! + mut res := C.PQexec(c.conn, &char(query.str)) status := unsafe { ExecStatusType(C.PQresultStatus(res)) } defer { C.PQclear(res) } - e := unsafe { C.PQerrorMessage(db.conn).vstring() } + e := unsafe { C.PQerrorMessage(c.conn).vstring() } if e != '' { return error('pg copy error:\n${e}') } @@ -612,20 +717,20 @@ pub fn (db &DB) copy_expert(query string, mut file io.ReaderWriter) !int { for { n := file.read(mut buf) or { msg := 'pg copy error: Failed to read from input' - C.PQputCopyEnd(db.conn, &char(msg.str)) + C.PQputCopyEnd(c.conn, &char(msg.str)) return err } if n <= 0 { break } - code := C.PQputCopyData(db.conn, buf.data, n) + code := C.PQputCopyData(c.conn, buf.data, n) if code == -1 { return error('pg copy error: Failed to send data, code=${code}') } } - code := C.PQputCopyEnd(db.conn, &char(unsafe { nil })) + code := C.PQputCopyEnd(c.conn, &char(unsafe { nil })) if code != 1 { return error('pg copy error: Failed to finish copy command, code: ${code}') @@ -633,7 +738,7 @@ pub fn (db &DB) copy_expert(query string, mut file io.ReaderWriter) !int { } else if status == .copy_out { for { address := &char(unsafe { nil }) - n_bytes := C.PQgetCopyData(db.conn, &address, 0) + n_bytes := C.PQgetCopyData(c.conn, &address, 0) if n_bytes > 0 { mut local_buf := []u8{len: n_bytes} unsafe { C.memcpy(&u8(local_buf.data), address, n_bytes) } @@ -656,7 +761,7 @@ pub fn (db &DB) copy_expert(query string, mut file io.ReaderWriter) !int { return 0 } -fn pg_stmt_worker(db &DB, query string, data orm.QueryData, where orm.QueryData) ![]db.pg.Row { +fn pg_stmt_worker(c &Conn, query string, data orm.QueryData, where orm.QueryData) ![]db.pg.Row { mut param_types := []u32{} mut param_vals := []&char{} mut param_lens := []int{} @@ -665,9 +770,9 @@ fn pg_stmt_worker(db &DB, query string, data orm.QueryData, where orm.QueryData) pg_stmt_binder(mut param_types, mut param_vals, mut param_lens, mut param_formats, data) pg_stmt_binder(mut param_types, mut param_vals, mut param_lens, mut param_formats, where) - res := C.PQexecParams(db.conn, &char(query.str), param_vals.len, param_types.data, + res := C.PQexecParams(c.conn, &char(query.str), param_vals.len, param_types.data, param_vals.data, param_lens.data, param_formats.data, 0) // here, the last 0 means require text results, 1 - binary results - return db.handle_error_or_rows(res, 'orm_stmt_worker') + return c.handle_error_or_rows(res, 'orm_stmt_worker') } pub enum PQTransactionLevel { @@ -679,11 +784,15 @@ pub enum PQTransactionLevel { @[params] pub struct PQTransactionParam { +pub: transaction_level PQTransactionLevel = .repeatable_read } -// begin begins a new transaction. -pub fn (db &DB) begin(param PQTransactionParam) ! { +// begin_on_conn begins a transaction on this single connection. Most callers +// should use `DB.begin()` instead, which returns a `Tx` that owns the +// underlying conn for the lifetime of the transaction. +pub fn (c &Conn) begin_on_conn(param PQTransactionParam) ! { + c.ensure_active()! mut sql_stmt := 'BEGIN TRANSACTION ISOLATION LEVEL ' match param.transaction_level { .read_uncommitted { sql_stmt += 'READ UNCOMMITTED' } @@ -692,78 +801,89 @@ pub fn (db &DB) begin(param PQTransactionParam) ! { .serializable { sql_stmt += 'SERIALIZABLE' } } - _ := C.PQexec(db.conn, &char(sql_stmt.str)) - e := unsafe { C.PQerrorMessage(db.conn).vstring() } + _ := C.PQexec(c.conn, &char(sql_stmt.str)) + e := unsafe { C.PQerrorMessage(c.conn).vstring() } if e != '' { + c.mark_bad_if_disconnected() return error('pg exec error: "${e}"') } } -// commit commits the current transaction. -pub fn (db &DB) commit() ! { - _ := C.PQexec(db.conn, c'COMMIT;') - e := unsafe { C.PQerrorMessage(db.conn).vstring() } +// commit commits the current transaction on this connection. +pub fn (c &Conn) commit() ! { + c.ensure_active()! + _ := C.PQexec(c.conn, c'COMMIT;') + e := unsafe { C.PQerrorMessage(c.conn).vstring() } if e != '' { + c.mark_bad_if_disconnected() return error('pg exec error: "${e}"') } } -// rollback rollbacks the current transaction. -pub fn (db &DB) rollback() ! { - _ := C.PQexec(db.conn, c'ROLLBACK;') - e := unsafe { C.PQerrorMessage(db.conn).vstring() } +// rollback rolls back the current transaction on this connection. +pub fn (c &Conn) rollback() ! { + c.ensure_active()! + _ := C.PQexec(c.conn, c'ROLLBACK;') + e := unsafe { C.PQerrorMessage(c.conn).vstring() } if e != '' { + c.mark_bad_if_disconnected() return error('pg exec error: "${e}"') } } -// rollback_to rollbacks to a specified savepoint. -pub fn (db &DB) rollback_to(savepoint string) ! { +// rollback_to rolls back to a specified savepoint on this connection. +pub fn (c &Conn) rollback_to(savepoint string) ! { + c.ensure_active()! if !savepoint.is_identifier() { return error('savepoint should be a identifier string') } sql_stmt := 'ROLLBACK TO SAVEPOINT ${savepoint};' - _ := C.PQexec(db.conn, &char(sql_stmt.str)) - e := unsafe { C.PQerrorMessage(db.conn).vstring() } + _ := C.PQexec(c.conn, &char(sql_stmt.str)) + e := unsafe { C.PQerrorMessage(c.conn).vstring() } if e != '' { + c.mark_bad_if_disconnected() return error('pg exec error: "${e}"') } } -// savepoint create a new savepoint. -pub fn (db &DB) savepoint(savepoint string) ! { +// savepoint creates a new savepoint on this connection. +pub fn (c &Conn) savepoint(savepoint string) ! { + c.ensure_active()! if !savepoint.is_identifier() { return error('savepoint should be a identifier string') } sql_stmt := 'SAVEPOINT ${savepoint};' - _ := C.PQexec(db.conn, &char(sql_stmt.str)) - e := unsafe { C.PQerrorMessage(db.conn).vstring() } + _ := C.PQexec(c.conn, &char(sql_stmt.str)) + e := unsafe { C.PQerrorMessage(c.conn).vstring() } if e != '' { + c.mark_bad_if_disconnected() return error('pg exec error: "${e}"') } } -// release_savepoint releases a specified savepoint. -pub fn (db &DB) release_savepoint(savepoint string) ! { +// release_savepoint releases a specified savepoint on this connection. +pub fn (c &Conn) release_savepoint(savepoint string) ! { + c.ensure_active()! if !savepoint.is_identifier() { return error('savepoint should be a identifier string') } sql_stmt := 'RELEASE SAVEPOINT ${savepoint};' - _ := C.PQexec(db.conn, &char(sql_stmt.str)) - e := unsafe { C.PQerrorMessage(db.conn).vstring() } + _ := C.PQexec(c.conn, &char(sql_stmt.str)) + e := unsafe { C.PQerrorMessage(c.conn).vstring() } if e != '' { + c.mark_bad_if_disconnected() return error('pg exec error: "${e}"') } } // validate checks if the connection is still usable -pub fn (db &DB) validate() !bool { - db.exec_one('SELECT 1')! +pub fn (c &Conn) validate() !bool { + c.exec_one('SELECT 1')! return true } // reset returns the connection to initial state for reuse -pub fn (db &DB) reset() ! { +pub fn (c &Conn) reset() ! { } // as_structs is a `Result` method that maps the results' rows based on the provided mapping function @@ -778,53 +898,60 @@ pub fn (res Result) as_structs[T](mapper fn (Result, Row) !T) ![]T { // listen registers the connection to receive notifications on the specified channel. // After calling this, use consume_input() and get_notification() to receive notifications. -pub fn (db &DB) listen(channel string) ! { +pub fn (c &Conn) listen(channel string) ! { + c.ensure_active()! if !channel.is_identifier() { return error('channel name should be a valid identifier') } sql_stmt := 'LISTEN ${channel};' - _ := C.PQexec(db.conn, &char(sql_stmt.str)) - e := unsafe { C.PQerrorMessage(db.conn).vstring() } + _ := C.PQexec(c.conn, &char(sql_stmt.str)) + e := unsafe { C.PQerrorMessage(c.conn).vstring() } if e != '' { + c.mark_bad_if_disconnected() return error('pg listen error: "${e}"') } } // unlisten unregisters the connection from receiving notifications on the specified channel. // Use unlisten_all() to unregister from all channels. -pub fn (db &DB) unlisten(channel string) ! { +pub fn (c &Conn) unlisten(channel string) ! { + c.ensure_active()! if !channel.is_identifier() { return error('channel name should be a valid identifier') } sql_stmt := 'UNLISTEN ${channel};' - _ := C.PQexec(db.conn, &char(sql_stmt.str)) - e := unsafe { C.PQerrorMessage(db.conn).vstring() } + _ := C.PQexec(c.conn, &char(sql_stmt.str)) + e := unsafe { C.PQerrorMessage(c.conn).vstring() } if e != '' { + c.mark_bad_if_disconnected() return error('pg unlisten error: "${e}"') } } // unlisten_all unregisters the connection from all notification channels. -pub fn (db &DB) unlisten_all() ! { - _ := C.PQexec(db.conn, c'UNLISTEN *;') - e := unsafe { C.PQerrorMessage(db.conn).vstring() } +pub fn (c &Conn) unlisten_all() ! { + c.ensure_active()! + _ := C.PQexec(c.conn, c'UNLISTEN *;') + e := unsafe { C.PQerrorMessage(c.conn).vstring() } if e != '' { + c.mark_bad_if_disconnected() return error('pg unlisten error: "${e}"') } } // notify sends a notification on the specified channel with an optional payload. // All connections currently listening on that channel will receive the notification. -pub fn (db &DB) notify(channel string, payload string) ! { +pub fn (c &Conn) notify(channel string, payload string) ! { + c.ensure_active()! if !channel.is_identifier() { return error('channel name should be a valid identifier') } mut sql_stmt := '' if payload.len > 0 { // Use PQescapeLiteral to safely escape the payload - escaped := C.PQescapeLiteral(db.conn, &char(payload.str), usize(payload.len)) + escaped := C.PQescapeLiteral(c.conn, &char(payload.str), usize(payload.len)) if escaped == unsafe { nil } { - e := unsafe { C.PQerrorMessage(db.conn).vstring() } + e := unsafe { C.PQerrorMessage(c.conn).vstring() } return error('pg notify error: failed to escape payload: "${e}"') } sql_stmt = unsafe { 'NOTIFY ${channel}, ' + escaped.vstring() + ';' } @@ -832,9 +959,10 @@ pub fn (db &DB) notify(channel string, payload string) ! { } else { sql_stmt = 'NOTIFY ${channel};' } - _ := C.PQexec(db.conn, &char(sql_stmt.str)) - e := unsafe { C.PQerrorMessage(db.conn).vstring() } + _ := C.PQexec(c.conn, &char(sql_stmt.str)) + e := unsafe { C.PQerrorMessage(c.conn).vstring() } if e != '' { + c.mark_bad_if_disconnected() return error('pg notify error: "${e}"') } } @@ -842,10 +970,12 @@ pub fn (db &DB) notify(channel string, payload string) ! { // consume_input reads any available input from the server. // This must be called before get_notification() to ensure pending notifications are processed. // Returns true on success, false if there was an error reading from the connection. -pub fn (db &DB) consume_input() !bool { - result := C.PQconsumeInput(db.conn) +pub fn (c &Conn) consume_input() !bool { + c.ensure_active()! + result := C.PQconsumeInput(c.conn) if result == 0 { - e := unsafe { C.PQerrorMessage(db.conn).vstring() } + e := unsafe { C.PQerrorMessage(c.conn).vstring() } + c.mark_bad_if_disconnected() return error('pg consume_input error: "${e}"') } return true @@ -854,8 +984,9 @@ pub fn (db &DB) consume_input() !bool { // get_notification returns the next pending notification from the server, if any. // Returns none if there are no pending notifications. // You should call consume_input() before this to ensure all pending notifications are available. -pub fn (db &DB) get_notification() ?Notification { - notify := C.PQnotifies(db.conn) +pub fn (c &Conn) get_notification() ?Notification { + c.ensure_active() or { return none } + notify := C.PQnotifies(c.conn) if notify == unsafe { nil } { return none } @@ -872,6 +1003,7 @@ pub fn (db &DB) get_notification() ?Notification { // socket returns the file descriptor of the connection socket to the server. // This is useful for applications that want to use select() or poll() to wait // for notifications without blocking. Returns -1 if no valid socket. -pub fn (db &DB) socket() int { - return C.PQsocket(db.conn) +pub fn (c &Conn) socket() int { + c.ensure_active() or { return -1 } + return C.PQsocket(c.conn) } diff --git a/vlib/db/pg/pg_double_test.v b/vlib/db/pg/pg_double_test.v index 4ab7ef8e7..410ffbc42 100644 --- a/vlib/db/pg/pg_double_test.v +++ b/vlib/db/pg/pg_double_test.v @@ -17,7 +17,7 @@ fn test_float_field() { return } conn := 'host=localhost user=postgres password=12345678' // insert own connection string - db := pg.connect_with_conninfo(conn)! + mut db := pg.connect_with_conninfo(conn)! defer { db.close() or {} } diff --git a/vlib/db/pg/pg_result_test.v b/vlib/db/pg/pg_result_test.v index 8df65fda4..0c2687e1d 100644 --- a/vlib/db/pg/pg_result_test.v +++ b/vlib/db/pg/pg_result_test.v @@ -34,7 +34,7 @@ fn test_large_exec() { return } - db := pg.connect(pg.Config{ user: 'postgres', password: '12345678', dbname: 'postgres' })! + mut db := pg.connect(pg.Config{ user: 'postgres', password: '12345678', dbname: 'postgres' })! defer { db.close() or {} } diff --git a/vlib/db/pg/pg_test.v b/vlib/db/pg/pg_test.v index ce445b00d..7d5af8025 100644 --- a/vlib/db/pg/pg_test.v +++ b/vlib/db/pg/pg_test.v @@ -11,7 +11,7 @@ fn test_large_exec() { return } - db := pg.connect(pg.Config{ user: 'postgres', password: '12345678', dbname: 'postgres' })! + mut db := pg.connect(pg.Config{ user: 'postgres', password: '12345678', dbname: 'postgres' })! defer { db.close() or {} } @@ -39,14 +39,19 @@ fn test_prepared() { eprintln('> This test requires a working postgres server running on localhost.') return } - db := pg.connect(pg.Config{ user: 'postgres', password: '12345678', dbname: 'postgres' })! + mut db := pg.connect(pg.Config{ user: 'postgres', password: '12345678', dbname: 'postgres' })! defer { db.close() or {} } - db.prepare('test_prepared', 'SELECT NOW(), $1 AS NAME', 1) or { panic(err) } + // Prepared statements are session-scoped, so pin a single conn. + mut c := db.conn()! + defer { + c.close() or {} + } + c.prepare('test_prepared', 'SELECT NOW(), $1 AS NAME', 1) or { panic(err) } - result := db.exec_prepared('test_prepared', ['hello world']) or { panic(err) } + result := c.exec_prepared('test_prepared', ['hello world']) or { panic(err) } assert result.len == 1 } @@ -68,19 +73,26 @@ fn test_transaction() { username TEXT, last_name TEXT NULL DEFAULT NULL )')! - mut conn := orm.TransactionalConnection(db) - mut tx := orm.begin(mut conn)! - tx.transaction[int](fn (mut tx orm.Tx) !int { + + // orm.TransactionalConnection requires a pinned conn (DB is pool-backed + // and cannot guarantee BEGIN/COMMIT land on the same physical conn). + mut c := db.conn()! + mut tc := orm.TransactionalConnection(c) + mut otx := orm.begin(mut tc)! + otx.transaction[int](fn (mut tx orm.Tx) !int { return 1 })! + otx.commit()! + c.close()! + + mut tx := db.begin()! + tx.exec("insert into users (username) values ('jackson')")! + tx.savepoint('savepoint1')! + tx.exec("insert into users (username) values ('kitty')")! + tx.rollback_to('savepoint1')! + tx.exec("insert into users (username) values ('mars')")! tx.commit()! - db.begin()! - db.exec("insert into users (username) values ('jackson')")! - db.savepoint('savepoint1')! - db.exec("insert into users (username) values ('kitty')")! - db.rollback_to('savepoint1')! - db.exec("insert into users (username) values ('mars')")! - db.commit()! + rows := db.exec('select * from users')! for row in rows { // We just need to access the memory to ensure it's properly allocated @@ -95,22 +107,28 @@ fn test_listen_notify() { return } - db := pg.connect(pg.Config{ user: 'postgres', password: '12345678', dbname: 'postgres' })! + mut db := pg.connect(pg.Config{ user: 'postgres', password: '12345678', dbname: 'postgres' })! defer { db.close() or {} } + // LISTEN/NOTIFY is session-scoped; pin a single conn for the test. + mut c := db.conn()! + defer { + c.close() or {} + } + // Test listen - db.listen('test_channel')! + c.listen('test_channel')! // Test notify with payload - db.notify('test_channel', 'hello world')! + c.notify('test_channel', 'hello world')! // Consume input to process the notification - db.consume_input()! + c.consume_input()! // Get the notification - if notification := db.get_notification() { + if notification := c.get_notification() { assert notification.channel == 'test_channel' assert notification.payload == 'hello world' assert notification.pid > 0 @@ -119,10 +137,10 @@ fn test_listen_notify() { } // Test notify without payload - db.notify('test_channel', '')! - db.consume_input()! + c.notify('test_channel', '')! + c.consume_input()! - if notification := db.get_notification() { + if notification := c.get_notification() { assert notification.channel == 'test_channel' assert notification.payload == '' } else { @@ -130,17 +148,17 @@ fn test_listen_notify() { } // Test that no more notifications are pending - assert db.get_notification() == none + assert c.get_notification() == none // Test unlisten - db.unlisten('test_channel')! + c.unlisten('test_channel')! // Test unlisten_all - db.listen('channel1')! - db.listen('channel2')! - db.unlisten_all()! + c.listen('channel1')! + c.listen('channel2')! + c.unlisten_all()! // Test socket (should return valid fd) - socket_fd := db.socket() + socket_fd := c.socket() assert socket_fd >= 0 } diff --git a/vlib/db/pg/pool.v b/vlib/db/pg/pool.v index 73873cc8e..ae5fc535b 100644 --- a/vlib/db/pg/pool.v +++ b/vlib/db/pg/pool.v @@ -1,38 +1,375 @@ module pg -pub struct ConnectionPool { +import sync +import time + +// PoolStats reports the current state of a `DB`'s connection pool. +pub struct PoolStats { +pub: + max_open_connections int // configured limit (0 = unlimited) + open_connections int // total in-use + idle conns + in_use int // conns currently checked out + idle int // conns parked as idle + wait_count int // number of callers currently blocked on acquire +} + +// PoolConfig configures pool behavior at construction time. +@[params] +pub struct PoolConfig { +pub: + max_open_conns int // 0 = unlimited + max_idle_conns int = 2 // 0 = keep no idle conns + conn_max_lifetime time.Duration // 0 = unlimited +} + +// IdleSlot is the pool's internal representation of a pooled libpq handle. +// The pool tracks raw PGconn* metadata in idle/waiter channels; `acquire` +// allocates a fresh `&Conn` wrapping the slot and `release` extracts the +// metadata back. Keeping the wrapper separate from the slot means a stale +// `&Conn` kept by user code after `close()` cannot be revived even when +// the pool re-hands the same physical PGconn* to another caller. +struct IdleSlot { + handle voidptr + created_at time.Time mut: - connections chan DB - config Config + bad bool } -// new_connection_pool creates a new connection pool with the given size and configuration. -pub fn new_connection_pool(config Config, size int) !ConnectionPool { - mut connections := chan DB{cap: size} - for _ in 0 .. size { - conn := connect(config)! - connections <- conn +@[heap] +struct Pool { +mut: + mu &sync.Mutex = unsafe { nil } + conninfo string + max_open int + max_idle int + max_lifetime time.Duration + idle []IdleSlot + open_count int + waiters []chan IdleSlot + closed bool + // last_ids stores the per-thread last-inserted id so DB.last_id() returns + // the value captured on the same pooled conn that ran the INSERT, instead + // of calling LASTVAL() on whatever conn the pool happens to hand out next + // (which is session-scoped and would return 0 or a wrong value). + last_ids_mu &sync.Mutex = unsafe { nil } + last_ids map[u64]i64 +} + +fn new_pool(conninfo string, cfg PoolConfig) &Pool { + mut p := &Pool{ + mu: sync.new_mutex() + last_ids_mu: sync.new_mutex() + conninfo: conninfo + max_open: cfg.max_open_conns + max_idle: cfg.max_idle_conns + max_lifetime: cfg.conn_max_lifetime + } + if p.max_open < 0 { + p.max_open = 0 } - return ConnectionPool{ - connections: connections - config: config + if p.max_idle < 0 { + p.max_idle = 0 } + return p } -// acquire gets a connection from the pool -pub fn (mut pool ConnectionPool) acquire() !DB { - return <-pool.connections or { return error('Failed to acquire a connection from the pool') } +// wrap_slot builds the fresh `&Conn` that gets handed to a caller. Each +// acquire allocates a new wrapper so a previously-checked-out (and since +// released) `&Conn` reference can never be reused to reach the underlying +// PGconn*. +fn (p &Pool) wrap_slot(slot IdleSlot) &Conn { + return &Conn{ + conn: slot.handle + pool: unsafe { p } + created_at: slot.created_at + bad: slot.bad + } } -// release returns a connection back to the pool. -pub fn (mut pool ConnectionPool) release(conn DB) { - pool.connections <- conn +fn (mut p Pool) acquire() !&Conn { + for { + p.mu.lock() + if p.closed { + p.mu.unlock() + return error('pg: pool is closed') + } + // Pop newest idle slot (LIFO keeps the freshest connection warm) + for p.idle.len > 0 { + slot := p.idle.last() + p.idle.delete_last() + if slot_expired(slot, p.max_lifetime) || slot_bad(slot) { + physical_close_handle(slot.handle) + p.open_count-- + continue + } + p.mu.unlock() + return p.wrap_slot(slot) + } + // Capacity available: open a new conn outside the lock + if p.max_open == 0 || p.open_count < p.max_open { + p.open_count++ + conninfo := p.conninfo + p.mu.unlock() + slot := connect_slot(conninfo) or { + p.mu.lock() + p.open_count-- + p.mu.unlock() + return err + } + // close() may have run while we were dialing outside the lock. + // Honor the close contract by tearing the fresh handle down + // instead of returning a live Conn after shutdown. + p.mu.lock() + if p.closed { + p.open_count-- + p.mu.unlock() + physical_close_handle(slot.handle) + return error('pg: pool is closed') + } + p.mu.unlock() + return p.wrap_slot(slot) + } + // At capacity: wait for a release/close/cap-raise to signal us. + // Senders transfer slot ownership via this channel; a sentinel slot + // with a nil handle means "capacity changed, retry the acquire + // loop" (used by set_max_open when raising the limit). + waiter := chan IdleSlot{cap: 1} + p.waiters << waiter + p.mu.unlock() + slot := <-waiter or { return error('pg: pool was closed while waiting for connection') } + if isnil(slot.handle) { + continue + } + return p.wrap_slot(slot) + } + return error('pg: unreachable') +} + +fn (mut p Pool) release(conn &Conn) { + if isnil(conn) { + return + } + mut c := unsafe { conn } + // Detach the wrapper from the physical handle before doing anything else. + // This is what makes a stale `&Conn` reference held by user code inert: + // any subsequent method call on it will see `c.conn == nil` and error. + // It is also the idempotency guard against double-close — a second + // release() finds nothing to return to the pool and is a no-op. + if isnil(c.conn) { + return + } + slot := IdleSlot{ + handle: c.conn + created_at: c.created_at + bad: c.bad + } + c.conn = unsafe { nil } + c.pool = unsafe { nil } + p.mu.lock() + if p.closed { + // close() only decremented open_count for slots it found parked; + // in-use conns are accounted for here as they trickle back. + p.open_count-- + p.mu.unlock() + physical_close_handle(slot.handle) + return + } + // Discard broken or expired conns + if slot_bad(slot) || slot_expired(slot, p.max_lifetime) { + physical_close_handle(slot.handle) + p.open_count-- + // A capacity slot just opened; hand it to a waiter as a fresh conn + if p.waiters.len > 0 && (p.max_open == 0 || p.open_count < p.max_open) { + waiter := p.waiters[0] + p.waiters.delete(0) + p.open_count++ + conninfo := p.conninfo + p.mu.unlock() + new_slot := connect_slot(conninfo) or { + p.mu.lock() + p.open_count-- + // Capacity is now open but the dial just failed. Wake every + // other parked waiter too, otherwise they hang forever waiting + // for a release that will never come (e.g. max_open=1). + extras := p.waiters.clone() + p.waiters = []chan IdleSlot{} + p.mu.unlock() + waiter.close() + for w in extras { + w.close() + } + return + } + p.mu.lock() + if p.closed { + // Pool closed during the dial: drop the new conn and signal the waiter. + p.open_count-- + p.mu.unlock() + physical_close_handle(new_slot.handle) + waiter.close() + return + } + waiter <- new_slot + p.mu.unlock() + return + } + p.mu.unlock() + return + } + // If a recent set_max_open() shrank the cap below open_count, this + // returning slot has to be retired even when a waiter is queued — + // otherwise the fast hand-off keeps open_count pinned above the new + // limit forever under steady traffic. Wake the waiter with a retry + // sentinel so it re-evaluates capacity under the new cap. + if p.max_open > 0 && p.open_count > p.max_open { + physical_close_handle(slot.handle) + p.open_count-- + if p.waiters.len > 0 { + waiter := p.waiters[0] + p.waiters.delete(0) + waiter <- IdleSlot{ + handle: unsafe { nil } + } + } + p.mu.unlock() + return + } + // Healthy conn: prefer handing it directly to a waiter. Send under the + // lock (cap:1 makes it non-blocking) so close() can't slip in and orphan + // the popped waiter with a live conn. + if p.waiters.len > 0 { + waiter := p.waiters[0] + p.waiters.delete(0) + waiter <- slot + p.mu.unlock() + return + } + // Park as idle, unless we'd exceed max_idle (0 = keep no idle conns) + // or we are over a recently-shrunk max_open and need to converge. + if p.idle.len >= p.max_idle || (p.max_open > 0 && p.open_count > p.max_open) { + physical_close_handle(slot.handle) + p.open_count-- + p.mu.unlock() + return + } + p.idle << slot + p.mu.unlock() } -// close closes all connections in the pool. -pub fn (mut pool ConnectionPool) close() { - for _ in 0 .. pool.connections.len { - conn := <-pool.connections or { break } - conn.close() or { break } +fn (mut p Pool) close() { + p.mu.lock() + if p.closed { + p.mu.unlock() + return } + p.closed = true + for slot in p.idle { + physical_close_handle(slot.handle) + } + idle_len := p.idle.len + p.idle = []IdleSlot{} + p.open_count -= idle_len + for waiter in p.waiters { + waiter.close() + } + p.waiters = []chan IdleSlot{} + p.mu.unlock() +} + +fn (mut p Pool) stats() PoolStats { + p.mu.lock() + stats := PoolStats{ + max_open_connections: p.max_open + open_connections: p.open_count + idle: p.idle.len + in_use: p.open_count - p.idle.len + wait_count: p.waiters.len + } + p.mu.unlock() + return stats +} + +fn (mut p Pool) set_max_open(n int) { + mut nn := n + if nn < 0 { + nn = 0 + } + p.mu.lock() + p.max_open = nn + // Shrink: drop idle slots until we are back under the new cap. Without + // this, acquire() pops idle slots before checking max_open, so two + // callers could still both succeed after set_max_open_conns(1) when two + // warm conns are parked. In-use conns can't be reclaimed here — they get + // discarded on release once open_count is over the cap. + if nn > 0 { + for p.idle.len > 0 && p.open_count > nn { + slot := p.idle.last() + p.idle.delete_last() + physical_close_handle(slot.handle) + p.open_count-- + } + } + // Raise: wake parked waiters so they can retry the acquire loop and + // dial against the new headroom. Without this nudge they stay blocked + // until some other release happens, which may never come if all current + // conns are long-lived. + if p.waiters.len > 0 && (nn == 0 || p.open_count < nn) { + spare := if nn == 0 { p.waiters.len } else { nn - p.open_count } + mut n_wake := if spare < p.waiters.len { spare } else { p.waiters.len } + for n_wake > 0 { + waiter := p.waiters[0] + p.waiters.delete(0) + waiter <- IdleSlot{ + handle: unsafe { nil } + } + n_wake-- + } + } + p.mu.unlock() +} + +fn (mut p Pool) set_max_idle(n int) { + mut nn := n + if nn < 0 { + nn = 0 + } + p.mu.lock() + p.max_idle = nn + for p.idle.len > nn { + slot := p.idle.last() + p.idle.delete_last() + physical_close_handle(slot.handle) + p.open_count-- + } + p.mu.unlock() +} + +fn (mut p Pool) set_conn_max_lifetime(d time.Duration) { + p.mu.lock() + p.max_lifetime = d + p.mu.unlock() +} + +// stash_last_id records `id` as the last-inserted id for the current thread. +// Called by `DB.insert` right after running INSERT on the pinned conn, so the +// next `DB.last_id()` call on the same thread returns this exact value rather +// than running LASTVAL() on a different pooled session. +fn (mut p Pool) stash_last_id(id int) { + tid := sync.thread_id() + p.last_ids_mu.lock() + p.last_ids[tid] = i64(id) + p.last_ids_mu.unlock() +} + +// take_last_id returns the last-inserted id stashed by this thread's most +// recent `DB.insert`, or 0 if there is none. Consuming on read keeps the +// map bounded as threads come and go, and prevents a future thread that +// reuses this tid from observing a stale id before doing its own insert. +fn (mut p Pool) take_last_id() int { + tid := sync.thread_id() + p.last_ids_mu.lock() + id := p.last_ids[tid] or { i64(0) } + p.last_ids.delete(tid) + p.last_ids_mu.unlock() + return int(id) } diff --git a/vlib/db/pg/tx.v b/vlib/db/pg/tx.v new file mode 100644 index 000000000..b9c84ba95 --- /dev/null +++ b/vlib/db/pg/tx.v @@ -0,0 +1,167 @@ +module pg + +import io + +// Tx is a database transaction. It pins a single `Conn` from the pool for +// the lifetime of the transaction, so all queries run on the same physical +// connection. The pinned conn is returned to the pool when `commit()` or +// `rollback()` is called; failing to call either leaks the conn. +@[heap] +pub struct Tx { +mut: + conn &Conn = unsafe { nil } + done bool +} + +fn (tx &Tx) ensure_active() ! { + if tx.done { + return error('pg: transaction is already finished') + } + if isnil(tx.conn) { + return error('pg: transaction has no connection') + } +} + +fn (mut tx Tx) finish() { + if tx.done { + return + } + tx.done = true + if !isnil(tx.conn) { + mut c := tx.conn + c.close() or {} + tx.conn = unsafe { nil } + } +} + +// commit commits the transaction and returns the pinned conn to the pool. +pub fn (mut tx Tx) commit() ! { + tx.ensure_active()! + defer { + tx.finish() + } + tx.conn.commit()! +} + +// rollback rolls back the transaction and returns the pinned conn to the pool. +pub fn (mut tx Tx) rollback() ! { + tx.ensure_active()! + defer { + tx.finish() + } + tx.conn.rollback()! +} + +// savepoint creates a savepoint named `name`. +pub fn (mut tx Tx) savepoint(name string) ! { + tx.ensure_active()! + tx.conn.savepoint(name)! +} + +// rollback_to rolls the transaction back to the savepoint named `name`. +pub fn (mut tx Tx) rollback_to(name string) ! { + tx.ensure_active()! + tx.conn.rollback_to(name)! +} + +// release_savepoint releases the savepoint named `name`. +pub fn (mut tx Tx) release_savepoint(name string) ! { + tx.ensure_active()! + tx.conn.release_savepoint(name)! +} + +// raw returns the pinned conn for advanced use cases. The caller MUST NOT +// call `close()` on it; the conn is owned by the transaction. +pub fn (mut tx Tx) raw() !&Conn { + tx.ensure_active()! + return tx.conn +} + +// ---- exec/query helpers ---- + +// exec runs `query` on the pinned conn. +pub fn (mut tx Tx) exec(query string) ![]Row { + tx.ensure_active()! + return tx.conn.exec(query) +} + +// exec_no_null runs `query` and returns rows with no nullable fields. +pub fn (mut tx Tx) exec_no_null(query string) ![]RowNoNull { + tx.ensure_active()! + return tx.conn.exec_no_null(query) +} + +// exec_result runs `query` and returns a `Result`. +pub fn (mut tx Tx) exec_result(query string) !Result { + tx.ensure_active()! + return tx.conn.exec_result(query) +} + +// exec_one runs `query` and returns its first row. +pub fn (mut tx Tx) exec_one(query string) !Row { + tx.ensure_active()! + return tx.conn.exec_one(query) +} + +// exec_param_many runs `query` with the given parameters. +pub fn (mut tx Tx) exec_param_many(query string, params []string) ![]Row { + tx.ensure_active()! + return tx.conn.exec_param_many(query, params) +} + +// exec_param_many_result runs `query` with parameters and returns a `Result`. +pub fn (mut tx Tx) exec_param_many_result(query string, params []string) !Result { + tx.ensure_active()! + return tx.conn.exec_param_many_result(query, params) +} + +// exec_param runs `query` with one `$1` parameter. +pub fn (mut tx Tx) exec_param(query string, param string) ![]Row { + return tx.exec_param_many(query, [param]) +} + +// exec_param2 runs `query` with two parameters. +pub fn (mut tx Tx) exec_param2(query string, param string, param2 string) ![]Row { + return tx.exec_param_many(query, [param, param2]) +} + +// q_int runs `query` and returns the first column of the first row as int. +pub fn (mut tx Tx) q_int(query string) !int { + tx.ensure_active()! + return tx.conn.q_int(query) +} + +// q_string runs `query` and returns the first column of the first row as string. +pub fn (mut tx Tx) q_string(query string) !string { + tx.ensure_active()! + return tx.conn.q_string(query) +} + +// q_strings runs `query` and returns the full row set (alias of `exec`). +pub fn (mut tx Tx) q_strings(query string) ![]Row { + return tx.exec(query) +} + +// prepare registers a prepared statement on the pinned conn. +pub fn (mut tx Tx) prepare(name string, query string, num_params int) ! { + tx.ensure_active()! + return tx.conn.prepare(name, query, num_params) +} + +// exec_prepared runs a previously-prepared statement on the pinned conn. +pub fn (mut tx Tx) exec_prepared(name string, params []string) ![]Row { + tx.ensure_active()! + return tx.conn.exec_prepared(name, params) +} + +// exec_prepared_result runs a previously-prepared statement on the pinned conn. +pub fn (mut tx Tx) exec_prepared_result(name string, params []string) !Result { + tx.ensure_active()! + return tx.conn.exec_prepared_result(name, params) +} + +// copy_expert runs a COPY command on the pinned conn. +pub fn (mut tx Tx) copy_expert(query string, mut file io.ReaderWriter) !int { + tx.ensure_active()! + return tx.conn.copy_expert(query, mut file) +} -- 2.39.5