From 298a573a3e49302a8d24faec28e187d24acc85df Mon Sep 17 00:00:00 2001 From: Alexander Medvednikov Date: Thu, 23 Apr 2026 21:57:54 +0300 Subject: [PATCH] db.mysql: fix Seg fault on Mysql package if app.db.connect() is not at the right place (fixes #14287) --- vlib/db/mysql/README.md | 7 + vlib/db/mysql/_cdefs.c.v | 6 + vlib/db/mysql/mysql.c.v | 352 +++++++++++++++++++++++++++++++------ vlib/db/mysql/mysql_test.v | 38 ++++ vlib/db/mysql/result.c.v | 30 +++- 5 files changed, 373 insertions(+), 60 deletions(-) diff --git a/vlib/db/mysql/README.md b/vlib/db/mysql/README.md index 2a0f209fa..739a2dfdf 100644 --- a/vlib/db/mysql/README.md +++ b/vlib/db/mysql/README.md @@ -68,6 +68,13 @@ for row in rows { db.close() ``` +## Concurrent Usage + +Sharing one `mysql.DB` across threads now serializes connection-level queries safely. + +For concurrent servers, prefer `mysql.new_connection_pool(...)` so requests do not share the same +session and transaction state on one connection. + ## Transaction ```v oksyntax diff --git a/vlib/db/mysql/_cdefs.c.v b/vlib/db/mysql/_cdefs.c.v index f08cc3b55..104d7f6f8 100644 --- a/vlib/db/mysql/_cdefs.c.v +++ b/vlib/db/mysql/_cdefs.c.v @@ -35,6 +35,12 @@ pub struct C.MYSQL_FIELD { // C.mysql_init allocates or initializes a MYSQL object suitable for `mysql_real_connect()`. fn C.mysql_init(mysql &C.MYSQL) &C.MYSQL +// C.mysql_thread_init initializes thread-local client state for threads using the MySQL C API. +fn C.mysql_thread_init() bool + +// C.mysql_thread_end finalizes thread-local client state for threads using the MySQL C API. +fn C.mysql_thread_end() + // C.mysql_real_connect attempts to establish a connection to a MySQL server running on `host`. fn C.mysql_real_connect(mysql &C.MYSQL, host &char, user &char, passwd &char, db &char, port u32, unix_socket &char, client_flag ConnectionFlag) &C.MYSQL diff --git a/vlib/db/mysql/mysql.c.v b/vlib/db/mysql/mysql.c.v index f0a8405d3..bdd693129 100644 --- a/vlib/db/mysql/mysql.c.v +++ b/vlib/db/mysql/mysql.c.v @@ -1,5 +1,7 @@ module mysql +import sync + // Values for the capabilities flag bitmask used by the MySQL protocol. // See more on https://dev.mysql.com/doc/dev/mysql-server/latest/group__group__cs__capabilities__flags.html#details @[flag] @@ -49,9 +51,31 @@ struct SQLError { MessageError } +const mysql_no_connection_error_message = 'No connection to a MySQL server, use `connect()` to connect to a database for working with it' +const mysql_thread_init_error_message = 'db.mysql: failed to initialize MySQL thread state' + +struct ConnectionState { +mut: + conn &C.MYSQL = unsafe { nil } + mutex &sync.Mutex = sync.new_mutex() +} + +struct MySQLThreadGuard { +mut: + active bool +} + +struct MySQLConnectionGuard { +mut: + conn &C.MYSQL = unsafe { nil } + state &ConnectionState = unsafe { nil } + thread MySQLThreadGuard +} + pub struct DB { mut: - conn &C.MYSQL = unsafe { nil } + conn &C.MYSQL = unsafe { nil } + state &ConnectionState = unsafe { nil } } @[params] @@ -130,6 +154,9 @@ pub fn connect(config Config) !DB { // because `throw_mysql_error` can't extract an error from a `null` connection, // and `panic` will be with an empty message. db.conn = connection + db.state = &ConnectionState{ + conn: connection + } return db } @@ -138,11 +165,15 @@ pub fn connect(config Config) !DB { // It cannot be used for statements that contain binary data; // Use `real_query()` instead. pub fn (db &DB) query(q string) !Result { - if C.mysql_query(db.conn, charptr(q.str)) != 0 { - db.throw_mysql_error()! + mut guard := db.acquire_connection_guard()! + defer { + guard.release() + } + if C.mysql_query(guard.conn, charptr(q.str)) != 0 { + throw_mysql_error_for_conn(guard.conn)! } - result := C.mysql_store_result(db.conn) + result := C.mysql_store_result(guard.conn) return Result{result} } @@ -155,7 +186,11 @@ pub fn (db &DB) query(q string) !Result { // mysql_use_result is faster and uses much less memory than C.mysql_store_result(). // You must mysql_free_result() after you are done with the result set. pub fn (db &DB) use_result() { - C.mysql_use_result(db.conn) + mut guard := db.acquire_connection_guard() or { return } + defer { + guard.release() + } + C.mysql_use_result(guard.conn) } // real_query makes an SQL query and receive the results. @@ -164,19 +199,27 @@ pub fn (db &DB) use_result() { // interprets as the end of the statement string). In addition, // `real_query()` is faster than `query()`. pub fn (mut db DB) real_query(q string) !Result { - if C.mysql_real_query(db.conn, q.str, q.len) != 0 { - db.throw_mysql_error()! + mut guard := db.acquire_connection_guard()! + defer { + guard.release() + } + if C.mysql_real_query(guard.conn, q.str, q.len) != 0 { + throw_mysql_error_for_conn(guard.conn)! } - result := C.mysql_store_result(db.conn) + result := C.mysql_store_result(guard.conn) return Result{result} } // select_db causes the database specified by `db` to become // the default (current) database on the connection specified by mysql. pub fn (mut db DB) select_db(dbname string) !bool { - if C.mysql_select_db(db.conn, dbname.str) != 0 { - db.throw_mysql_error()! + mut guard := db.acquire_connection_guard()! + defer { + guard.release() + } + if C.mysql_select_db(guard.conn, dbname.str) != 0 { + throw_mysql_error_for_conn(guard.conn)! } return true @@ -186,15 +229,19 @@ pub fn (mut db DB) select_db(dbname string) !bool { // Passing an empty string for the `dbname` parameter, resultsg in only changing // the user and not changing the default database for the connection. pub fn (mut db DB) change_user(username string, password string, dbname string) !bool { + mut guard := db.acquire_connection_guard()! + defer { + guard.release() + } mut result := true if dbname != '' { - result = C.mysql_change_user(db.conn, username.str, password.str, dbname.str) + result = C.mysql_change_user(guard.conn, username.str, password.str, dbname.str) } else { - result = C.mysql_change_user(db.conn, username.str, password.str, 0) + result = C.mysql_change_user(guard.conn, username.str, password.str, 0) } if !result { - db.throw_mysql_error()! + throw_mysql_error_for_conn(guard.conn)! } return result @@ -203,27 +250,37 @@ pub fn (mut db DB) change_user(username string, password string, dbname string) // affected_rows returns the number of rows changed, deleted, // or inserted by the last statement if it was an `UPDATE`, `DELETE`, or `INSERT`. pub fn (db &DB) affected_rows() u64 { - return C.mysql_affected_rows(db.conn) + mut guard := db.acquire_connection_guard() or { return 0 } + defer { + guard.release() + } + return C.mysql_affected_rows(guard.conn) } // autocommit turns on/off the auto-committing mode for the connection. // When it is on, then each query is committed right away. pub fn (mut db DB) autocommit(mode bool) ! { - db.check_connection_is_established()! - result := C.mysql_autocommit(db.conn, mode) + mut guard := db.acquire_connection_guard()! + defer { + guard.release() + } + result := C.mysql_autocommit(guard.conn, mode) if result != 0 { - db.throw_mysql_error()! + throw_mysql_error_for_conn(guard.conn)! } } // commit commits the current transaction. pub fn (db &DB) commit() ! { - db.check_connection_is_established()! - result := C.mysql_commit(db.conn) + mut guard := db.acquire_connection_guard()! + defer { + guard.release() + } + result := C.mysql_commit(guard.conn) if result != 0 { - db.throw_mysql_error()! + throw_mysql_error_for_conn(guard.conn)! } } @@ -261,11 +318,14 @@ pub fn (db &DB) set_transaction_level(level MySQLTransactionLevel) ! { // rollback rollbacks the current transaction. pub fn (db &DB) rollback() ! { - db.check_connection_is_established()! - result := C.mysql_rollback(db.conn) + mut guard := db.acquire_connection_guard()! + defer { + guard.release() + } + result := C.mysql_rollback(guard.conn) if result != 0 { - db.throw_mysql_error()! + throw_mysql_error_for_conn(guard.conn)! } } @@ -311,9 +371,13 @@ pub fn (db &DB) release_savepoint(savepoint string) ! { // If an empty string is passed, it will return all tables. // Calling `tables()` is similar to executing query `SHOW TABLES [LIKE wildcard]`. pub fn (db &DB) tables(wildcard string) ![]string { - c_mysql_result := C.mysql_list_tables(db.conn, wildcard.str) + mut guard := db.acquire_connection_guard()! + defer { + guard.release() + } + c_mysql_result := C.mysql_list_tables(guard.conn, wildcard.str) if isnil(c_mysql_result) { - db.throw_mysql_error()! + throw_mysql_error_for_conn(guard.conn)! } result := Result{c_mysql_result} @@ -330,9 +394,17 @@ pub fn (db &DB) tables(wildcard string) ![]string { // The `s` argument is encoded to produce an escaped SQL string, // taking into account the current character set of the connection. pub fn (db &DB) escape_string(s string) string { + conn := db.current_conn() + if isnil(conn) { + return '' + } + mut thread_guard := mysql_thread_guard() or { return '' } + defer { + thread_guard.release() + } unsafe { to := malloc_noscan(2 * s.len + 1) - C.mysql_real_escape_string(db.conn, to, s.str, s.len) + C.mysql_real_escape_string(conn, to, s.str, s.len) return to.vstring() } } @@ -341,15 +413,27 @@ pub fn (db &DB) escape_string(s string) string { // a connection. This function may be called multiple times to set several // options. To retrieve the current values for an option, use `get_option()`. pub fn (mut db DB) set_option(option_type int, val voidptr) { - C.mysql_options(db.conn, option_type, val) + conn := db.current_conn() + if isnil(conn) { + return + } + mut thread_guard := mysql_thread_guard() or { return } + defer { + thread_guard.release() + } + C.mysql_options(conn, option_type, val) } // get_option returns the value of an option, settable by `set_option`. // https://dev.mysql.com/doc/c-api/5.7/en/mysql-get-option.html pub fn (db &DB) get_option(option_type int) !voidptr { + mut guard := db.acquire_connection_guard()! + defer { + guard.release() + } mysql_option := unsafe { nil } - if C.mysql_get_option(db.conn, option_type, &mysql_option) != 0 { - db.throw_mysql_error()! + if C.mysql_get_option(guard.conn, option_type, &mysql_option) != 0 { + throw_mysql_error_for_conn(guard.conn)! } return mysql_option @@ -358,8 +442,12 @@ pub fn (db &DB) get_option(option_type int) !voidptr { // refresh flush the tables or caches, or resets replication server // information. The connected user must have the `RELOAD` privilege. pub fn (mut db DB) refresh(options u32) !bool { - if C.mysql_refresh(db.conn, options) != 0 { - db.throw_mysql_error()! + mut guard := db.acquire_connection_guard()! + defer { + guard.release() + } + if C.mysql_refresh(guard.conn, options) != 0 { + throw_mysql_error_for_conn(guard.conn)! } return true @@ -367,16 +455,24 @@ pub fn (mut db DB) refresh(options u32) !bool { // reset resets the connection, and clear the session state. pub fn (mut db DB) reset() ! { - if C.mysql_reset_connection(db.conn) != 0 { - db.throw_mysql_error()! + mut guard := db.acquire_connection_guard()! + defer { + guard.release() + } + if C.mysql_reset_connection(guard.conn) != 0 { + throw_mysql_error_for_conn(guard.conn)! } } // ping pings a server connection, or tries to reconnect if the connection // has gone down. pub fn (mut db DB) ping() !bool { - if C.mysql_ping(db.conn) != 0 { - db.throw_mysql_error()! + mut guard := db.acquire_connection_guard()! + defer { + guard.release() + } + if C.mysql_ping(guard.conn) != 0 { + throw_mysql_error_for_conn(guard.conn)! } return true @@ -390,25 +486,63 @@ pub fn (mut db DB) validate() !bool { // close closes the connection. pub fn (mut db DB) close() ! { - C.mysql_close(db.conn) + if isnil(db.state) { + if isnil(db.conn) { + return + } + mut thread_guard := mysql_thread_guard()! + defer { + thread_guard.release() + } + C.mysql_close(db.conn) + db.conn = unsafe { nil } + return + } + db.state.mutex.@lock() + defer { + db.state.mutex.unlock() + } + if isnil(db.state.conn) { + db.conn = unsafe { nil } + return + } + mut thread_guard := mysql_thread_guard()! + defer { + thread_guard.release() + } + C.mysql_close(db.state.conn) + db.state.conn = unsafe { nil } + db.conn = unsafe { nil } } // info returns information about the most recently executed query. // See more on https://dev.mysql.com/doc/c-api/8.0/en/mysql-info.html pub fn (db &DB) info() string { - return resolve_nil_str(C.mysql_info(db.conn)) + mut guard := db.acquire_connection_guard() or { return '' } + defer { + guard.release() + } + return resolve_nil_str(C.mysql_info(guard.conn)) } // get_host_info returns a string describing the type of connection in use, // including the server host name. pub fn (db &DB) get_host_info() string { - return unsafe { C.mysql_get_host_info(db.conn).vstring() } + mut guard := db.acquire_connection_guard() or { return '' } + defer { + guard.release() + } + return unsafe { C.mysql_get_host_info(guard.conn).vstring() } } // get_server_info returns a string representing the MySQL server version. // For example, `8.0.24`. pub fn (db &DB) get_server_info() string { - return unsafe { C.mysql_get_server_info(db.conn).vstring() } + mut guard := db.acquire_connection_guard() or { return '' } + defer { + guard.release() + } + return unsafe { C.mysql_get_server_info(guard.conn).vstring() } } // get_server_version returns an integer, representing the MySQL server @@ -416,14 +550,22 @@ pub fn (db &DB) get_server_info() string { // `YY` is the release level (or minor version), and `ZZ` is the sub-version // within the release level. For example, `8.0.24` is returned as `80024`. pub fn (db &DB) get_server_version() u64 { - return C.mysql_get_server_version(db.conn) + mut guard := db.acquire_connection_guard() or { return 0 } + defer { + guard.release() + } + return C.mysql_get_server_version(guard.conn) } // dump_debug_info instructs the server to write debugging information // to the error log. The connected user must have the `SUPER` privilege. pub fn (mut db DB) dump_debug_info() !bool { - if C.mysql_dump_debug_info(db.conn) != 0 { - db.throw_mysql_error()! + mut guard := db.acquire_connection_guard()! + defer { + guard.release() + } + if C.mysql_dump_debug_info(guard.conn) != 0 { + throw_mysql_error_for_conn(guard.conn)! } return true @@ -449,11 +591,15 @@ pub fn debug(debug string) { // exec executes the `query` on the given `db`, and returns an array of all the results, or an error on failure pub fn (db &DB) exec(query string) ![]Row { - if C.mysql_query(db.conn, query.str) != 0 { - db.throw_mysql_error()! + mut guard := db.acquire_connection_guard()! + defer { + guard.release() + } + if C.mysql_query(guard.conn, query.str) != 0 { + throw_mysql_error_for_conn(guard.conn)! } - result := C.mysql_store_result(db.conn) + result := C.mysql_store_result(guard.conn) if result == unsafe { nil } { return []Row{} } else { @@ -463,14 +609,18 @@ pub fn (db &DB) exec(query string) ![]Row { // exec_one executes the `query` on the given `db`, and returns either the first row from the result, if the query was successful, or an error pub fn (db &DB) exec_one(query string) !Row { - if C.mysql_query(db.conn, query.str) != 0 { - db.throw_mysql_error()! + mut guard := db.acquire_connection_guard()! + defer { + guard.release() + } + if C.mysql_query(guard.conn, query.str) != 0 { + throw_mysql_error_for_conn(guard.conn)! } - result := C.mysql_store_result(db.conn) + result := C.mysql_store_result(guard.conn) if result == unsafe { nil } { - db.throw_mysql_error()! + throw_mysql_error_for_conn(guard.conn)! } row_vals := C.mysql_fetch_row(result) num_cols := C.mysql_num_fields(result) @@ -495,9 +645,13 @@ pub fn (db &DB) exec_one(query string) !Row { // Use it, in case you don't expect any row results, but still want a result code. // e.g. for queries like these: INSERT INTO ... VALUES (...) pub fn (db &DB) exec_none(query string) int { - C.mysql_query(db.conn, query.str) + mut guard := db.acquire_connection_guard() or { return 1 } + defer { + guard.release() + } + C.mysql_query(guard.conn, query.str) - return get_errno(db.conn) + return get_errno(guard.conn) } // exec_param_many executes the `query` with parameters provided as `?`'s in the query @@ -535,9 +689,13 @@ pub struct StmtHandle { // as needed, which must be closed manually by the user // Placeholders are represented by `?` pub fn (db &DB) prepare(query string) !StmtHandle { - stmt := C.mysql_stmt_init(db.conn) + mut guard := db.acquire_connection_guard()! + defer { + guard.release() + } + stmt := C.mysql_stmt_init(guard.conn) if stmt == unsafe { nil } { - db.throw_mysql_error()! + throw_mysql_error_for_conn(guard.conn)! } mut code := C.mysql_stmt_prepare(stmt, query.str, query.len) @@ -548,7 +706,8 @@ pub fn (db &DB) prepare(query string) !StmtHandle { return StmtHandle{ stmt: stmt db: DB{ - conn: db.conn + conn: db.current_conn() + state: db.state } } } @@ -558,6 +717,10 @@ pub fn (db &DB) prepare(query string) !StmtHandle { // Returns an array of Rows, which will be empty if nothing is returned // from the query, or possibly an error value pub fn (stmt &StmtHandle) execute(params []string) ![]Row { + mut guard := stmt.db.acquire_connection_guard()! + defer { + guard.release() + } mut bind_params := []C.MYSQL_BIND{} for param in params { bind := C.MYSQL_BIND{ @@ -631,12 +794,20 @@ pub fn (stmt &StmtHandle) execute(params []string) ![]Row { // close acts on a StmtHandle to close the mysql Stmt // meaning it is no longer available for use pub fn (stmt &StmtHandle) close() { + mut thread_guard := mysql_thread_guard() or { return } + defer { + thread_guard.release() + } C.mysql_stmt_close(stmt.stmt) } @[inline] fn (db &DB) throw_mysql_error() ! { - return error_with_code(get_error_msg(db.conn), get_errno(db.conn)) + conn := db.current_conn() + if isnil(conn) { + return error(mysql_no_connection_error_message) + } + return error_with_code(get_error_msg(conn), get_errno(conn)) } @[inline] @@ -645,8 +816,75 @@ fn throw_mysql_stmt_error(stmt &C.MYSQL_STMT) ! { } @[inline] -fn (db &DB) check_connection_is_established() ! { +fn throw_mysql_error_for_conn(conn &C.MYSQL) ! { + return error_with_code(get_error_msg(conn), get_errno(conn)) +} + +@[inline] +fn mysql_thread_guard() !MySQLThreadGuard { + if C.mysql_thread_init() { + return error(mysql_thread_init_error_message) + } + return MySQLThreadGuard{ + active: true + } +} + +@[inline] +fn (mut guard MySQLThreadGuard) release() { + if guard.active { + C.mysql_thread_end() + guard.active = false + } +} + +@[inline] +fn (db &DB) current_conn() &C.MYSQL { + if !isnil(db.state) { + return db.state.conn + } + return db.conn +} + +fn (db &DB) acquire_connection_guard() !MySQLConnectionGuard { + if !isnil(db.state) { + db.state.mutex.@lock() + conn := db.state.conn + if isnil(conn) { + db.state.mutex.unlock() + return error(mysql_no_connection_error_message) + } + mut thread_guard := mysql_thread_guard() or { + db.state.mutex.unlock() + return err + } + return MySQLConnectionGuard{ + conn: conn + state: db.state + thread: thread_guard + } + } if isnil(db.conn) { - return error('No connection to a MySQL server, use `connect()` to connect to a database for working with it') + return error(mysql_no_connection_error_message) + } + mut thread_guard := mysql_thread_guard()! + return MySQLConnectionGuard{ + conn: db.conn + thread: thread_guard + } +} + +@[inline] +fn (mut guard MySQLConnectionGuard) release() { + guard.thread.release() + if !isnil(guard.state) { + guard.state.mutex.unlock() + } +} + +@[inline] +fn (db &DB) check_connection_is_established() ! { + if isnil(db.current_conn()) { + return error(mysql_no_connection_error_message) } } diff --git a/vlib/db/mysql/mysql_test.v b/vlib/db/mysql/mysql_test.v index d64a9bffd..7071c8391 100644 --- a/vlib/db/mysql/mysql_test.v +++ b/vlib/db/mysql/mysql_test.v @@ -144,3 +144,41 @@ fn test_mysql() { }, ] } + +fn mysql_query_count_from_shared_connection(db mysql.DB) !int { + result := db.query('SELECT COUNT(*) as table_count FROM information_schema.tables')! + rows := result.maps() + return rows[0]['table_count'].int() +} + +fn test_query_is_serialized_for_shared_connections() { + $if !network ? { + eprintln('> Skipping test ${@FN}, since `-d network` is not passed.') + eprintln('> This test requires a working mysql server running on localhost.') + return + } + config := mysql.Config{ + host: '127.0.0.1' + port: 3306 + username: 'root' + password: '12345678' + dbname: 'mysql' + } + + mut db := mysql.connect(config)! + defer { + db.close() or {} + } + + threads := [ + spawn mysql_query_count_from_shared_connection(db), + spawn mysql_query_count_from_shared_connection(db), + spawn mysql_query_count_from_shared_connection(db), + spawn mysql_query_count_from_shared_connection(db), + ] + results := threads.wait()! + assert results.len == 4 + for count in results { + assert count > 0 + } +} diff --git a/vlib/db/mysql/result.c.v b/vlib/db/mysql/result.c.v index 34c98e822..c662e160a 100644 --- a/vlib/db/mysql/result.c.v +++ b/vlib/db/mysql/result.c.v @@ -35,25 +35,41 @@ pub struct Field { // fetch_row fetches the next row from a result. pub fn (r Result) fetch_row() &charptr { + mut thread_guard := mysql_thread_guard() or { return unsafe { nil } } + defer { + thread_guard.release() + } return C.mysql_fetch_row(r.result) } // n_rows returns the number of rows from a result. pub fn (r Result) n_rows() u64 { + mut thread_guard := mysql_thread_guard() or { return 0 } + defer { + thread_guard.release() + } return C.mysql_num_rows(r.result) } // n_fields returns the number of columns from a result. pub fn (r Result) n_fields() int { + mut thread_guard := mysql_thread_guard() or { return 0 } + defer { + thread_guard.release() + } return C.mysql_num_fields(r.result) } // rows returns array of rows, each containing an array of values, // one for each column. pub fn (r Result) rows() []Row { + mut thread_guard := mysql_thread_guard() or { return []Row{} } + defer { + thread_guard.release() + } mut rows := []Row{} - nr_cols := r.n_fields() - for rr := r.fetch_row(); rr; rr = r.fetch_row() { + nr_cols := C.mysql_num_fields(r.result) + for rr := C.mysql_fetch_row(r.result); rr; rr = C.mysql_fetch_row(r.result) { mut row := Row{} for i in 0 .. nr_cols { if unsafe { rr[i] == 0 } { @@ -87,8 +103,12 @@ pub fn (r Result) maps() []map[string]string { // The definitions apply primarily for columns of results, // such as those produced by `SELECT` statements. pub fn (r Result) fields() []Field { + mut thread_guard := mysql_thread_guard() or { return []Field{} } + defer { + thread_guard.release() + } mut fields := []Field{} - nr_cols := r.n_fields() + nr_cols := C.mysql_num_fields(r.result) orig_fields := C.mysql_fetch_fields(r.result) for i in 0 .. nr_cols { unsafe { @@ -150,5 +170,9 @@ pub fn (f Field) str() string { // free frees the memory used by a result. @[unsafe] pub fn (r &Result) free() { + mut thread_guard := mysql_thread_guard() or { return } + defer { + thread_guard.release() + } C.mysql_free_result(r.result) } -- 2.39.5