From 8a488593d5d4a101e77952bb6fbf2941293e24f7 Mon Sep 17 00:00:00 2001 From: omerrob Date: Tue, 18 Nov 2025 23:36:06 -0500 Subject: [PATCH] db.pg: support returning `Result` type with column names (#25737) --- vlib/db/pg/pg.c.v | 104 ++++++++++++++++++++++++++++++++++-- vlib/db/pg/pg_result_test.v | 57 ++++++++++++++++++++ 2 files changed, 156 insertions(+), 5 deletions(-) create mode 100644 vlib/db/pg/pg_result_test.v diff --git a/vlib/db/pg/pg.c.v b/vlib/db/pg/pg.c.v index 002725c55..522c4a229 100644 --- a/vlib/db/pg/pg.c.v +++ b/vlib/db/pg/pg.c.v @@ -66,6 +66,12 @@ pub mut: vals []?string } +pub struct Result { +pub: + cols map[string]int + rows []Row +} + pub struct Config { pub: host string = 'localhost' @@ -142,6 +148,8 @@ fn C.PQntuples(const_res &C.PGresult) int fn C.PQnfields(const_res &C.PGresult) int +fn C.PQfname(const_res &C.PGresult, int) &char + // Params: // const Oid *paramTypes // const char *const *paramValues @@ -223,6 +231,34 @@ fn res_to_rows(res voidptr) []Row { return rows } +// res_to_result creates a `Result` struct out of a `C.PGresult` pointer +fn res_to_result(res voidptr) Result { + nr_rows := C.PQntuples(res) + nr_cols := C.PQnfields(res) + + mut cols := map[string]int{} + mut rows := []Row{} + for i in 0 .. nr_rows { + mut row := Row{} + for j in 0 .. nr_cols { + if i == 0 { + field_name := unsafe { cstring_to_vstring(C.PQfname(res, j)) } + cols[field_name] = j + } + if C.PQgetisnull(res, i, j) != 0 { + row.vals << none + } else { + val := C.PQgetvalue(res, i, j) + row.vals << unsafe { cstring_to_vstring(val) } + } + } + rows << row + } + + C.PQclear(res) + return Result{cols, rows} +} + // close frees the underlying resource allocated by the database connection pub fn (db &DB) close() ! { C.PQfinish(db.conn) @@ -271,7 +307,13 @@ pub fn (db &DB) q_strings(query string) ![]Row { // exec submits a command to the database server and wait for the result, returning an error on failure and a row set on success pub fn (db &DB) exec(query string) ![]Row { res := C.PQexec(db.conn, &char(query.str)) - return db.handle_error_or_result(res, 'exec') + return db.handle_error_or_rows(res, 'exec') +} + +// exec_result submits a command to the database server and wait for the result, returning an error on failure and a `Result` set on success +pub fn (db &DB) exec_result(query string) !Result { + res := C.PQexec(db.conn, &char(query.str)) + return db.handle_error_or_result(res, 'exec_result') } fn rows_first_or_empty(rows []Row) !Row { @@ -302,7 +344,21 @@ pub fn (db &DB) exec_param_many(query string, params []string) ![]Row { res := C.PQexecParams(db.conn, &char(query.str), params.len, 0, param_vals.data, 0, 0, 0) - return db.handle_error_or_result(res, 'exec_param_many') + return db.handle_error_or_rows(res, 'exec_param_many') + } +} + +// exec_param_many executes a query with the parameters provided as ($1), ($2), ($n) and returns a `Result` +pub fn (db &DB) exec_param_many_result(query string, params []string) !Result { + unsafe { + mut param_vals := []&char{len: params.len} + for i in 0 .. params.len { + param_vals[i] = &char(params[i].str) + } + + res := C.PQexecParams(db.conn, &char(query.str), params.len, 0, param_vals.data, + 0, 0, 0) + return db.handle_error_or_result(res, 'exec_param_many_result') } } @@ -333,11 +389,26 @@ pub fn (db &DB) exec_prepared(name string, params []string) ![]Row { res := C.PQexecPrepared(db.conn, &char(name.str), params.len, param_vals.data, 0, 0, 0) - return db.handle_error_or_result(res, 'exec_prepared') + return db.handle_error_or_rows(res, 'exec_prepared') + } +} + +// exec_prepared sends a request to execute a prepared statement with given parameters, and waits for the result. The number of parameters must match with the parameters declared in the prepared statement. +// returns `Result` +pub fn (db &DB) exec_prepared_result(name string, params []string) !Result { + unsafe { + mut param_vals := []&char{len: params.len} + for i in 0 .. params.len { + param_vals[i] = &char(params[i].str) + } + + res := C.PQexecPrepared(db.conn, &char(name.str), params.len, param_vals.data, + 0, 0, 0) + return db.handle_error_or_result(res, 'exec_prepared_result') } } -fn (db &DB) handle_error_or_result(res voidptr, elabel string) ![]Row { +fn (db &DB) handle_error_or_rows(res voidptr, elabel string) ![]Row { e := unsafe { C.PQerrorMessage(db.conn).vstring() } if e != '' { C.PQclear(res) @@ -349,6 +420,19 @@ fn (db &DB) handle_error_or_result(res voidptr, elabel string) ![]Row { return res_to_rows(res) } +// hande_error_or_result is an internal function similar to handle_error_or_rows that returns `Result` instead of `[]Row` +fn (db &DB) handle_error_or_result(res voidptr, elabel string) !Result { + e := unsafe { C.PQerrorMessage(db.conn).vstring() } + if e != '' { + C.PQclear(res) + $if trace_pg_error ? { + eprintln('pg error: ${e}') + } + return error('pg ${elabel} error:\n${e}') + } + return res_to_result(res) +} + fn (db &DB) handle_error(res voidptr, elabel string) ! { e := unsafe { C.PQerrorMessage(db.conn).vstring() } if e != '' { @@ -436,7 +520,7 @@ fn pg_stmt_worker(db &DB, query string, data orm.QueryData, where orm.QueryData) res := C.PQexecParams(db.conn, &char(query.str), param_vals.len, param_types.data, param_vals.data, param_lens.data, param_formats.data, 0) // here, the last 0 means require text results, 1 - binary results - return db.handle_error_or_result(res, 'orm_stmt_worker') + return db.handle_error_or_rows(res, 'orm_stmt_worker') } pub enum PQTransactionLevel { @@ -520,3 +604,13 @@ pub fn (db &DB) validate() !bool { // reset returns the connection to initial state for reuse pub fn (db &DB) reset() ! { } + +// as_structs is a `Result` method that maps the results' rows based on the provided mapping function +pub fn (res Result) as_structs[T](mapper fn (Result, Row) !T) ![]T { + mut typed := []T{} + for r in res.rows { + typed << mapper(res, r)! + } + + return typed +} diff --git a/vlib/db/pg/pg_result_test.v b/vlib/db/pg/pg_result_test.v new file mode 100644 index 000000000..8df65fda4 --- /dev/null +++ b/vlib/db/pg/pg_result_test.v @@ -0,0 +1,57 @@ +// vtest build: started_postgres? +module main + +import db.pg + +struct Info { + table_schema string + relname string + attname string + typename string + typealign string + typlen int +} + +fn deref(val ?string) string { + return val or { panic('no value') } +} + +fn row_mapper(res pg.Result, row pg.Row) !Info { + return Info{ + table_schema: deref(row.vals[res.cols['table_schema']]) + relname: deref(row.vals[res.cols['relname']]) + attname: deref(row.vals[res.cols['attname']]) + typename: deref(row.vals[res.cols['typename']]) + typealign: deref(row.vals[res.cols['typealign']]) + typlen: deref(row.vals[res.cols['typlen']]).int() + } +} + +fn test_large_exec() { + $if !network ? { + eprintln('> Skipping test ${@FN}, since `-d network` is not passed.') + eprintln('> This test requires a working postgres server running on localhost.') + return + } + + db := pg.connect(pg.Config{ user: 'postgres', password: '12345678', dbname: 'postgres' })! + defer { + db.close() or {} + } + + result := db.exec_result(' +SELECT ischema.table_schema, c.relname, a.attname, t.typname, t.typalign, t.typlen + FROM pg_class c + JOIN information_schema.tables ischema on ischema.table_name = c.relname + JOIN pg_attribute a ON (a.attrelid = c.oid) + JOIN pg_type t ON (t.oid = a.atttypid) +WHERE + a.attnum >= 0 + ')! + + infos := result.as_structs(row_mapper)! + + assert result.rows.len > 0 && infos.len == result.rows.len + + // println(infos) +} -- 2.39.5