| 1 | module pg |
| 2 | |
| 3 | import sync |
| 4 | import time |
| 5 | |
| 6 | // PoolStats reports the current state of a `DB`'s connection pool. |
| 7 | pub struct PoolStats { |
| 8 | pub: |
| 9 | max_open_connections int // configured limit (0 = unlimited) |
| 10 | open_connections int // total in-use + idle conns |
| 11 | in_use int // conns currently checked out |
| 12 | idle int // conns parked as idle |
| 13 | wait_count int // number of callers currently blocked on acquire |
| 14 | } |
| 15 | |
| 16 | // PoolConfig configures pool behavior at construction time. |
| 17 | @[params] |
| 18 | pub struct PoolConfig { |
| 19 | pub: |
| 20 | max_open_conns int // 0 = unlimited |
| 21 | max_idle_conns int = 2 // 0 = keep no idle conns |
| 22 | conn_max_lifetime time.Duration // 0 = unlimited |
| 23 | } |
| 24 | |
| 25 | // IdleSlot is the pool's internal representation of a pooled libpq handle. |
| 26 | // The pool tracks raw PGconn* metadata in idle/waiter channels; `acquire` |
| 27 | // allocates a fresh `&Conn` wrapping the slot and `release` extracts the |
| 28 | // metadata back. Keeping the wrapper separate from the slot means a stale |
| 29 | // `&Conn` kept by user code after `close()` cannot be revived even when |
| 30 | // the pool re-hands the same physical PGconn* to another caller. |
| 31 | struct IdleSlot { |
| 32 | handle voidptr |
| 33 | created_at time.Time |
| 34 | mut: |
| 35 | bad bool |
| 36 | } |
| 37 | |
| 38 | @[heap] |
| 39 | struct Pool { |
| 40 | mut: |
| 41 | mu &sync.Mutex = unsafe { nil } |
| 42 | conninfo string |
| 43 | max_open int |
| 44 | max_idle int |
| 45 | max_lifetime time.Duration |
| 46 | idle []IdleSlot |
| 47 | open_count int |
| 48 | waiters []chan IdleSlot |
| 49 | closed bool |
| 50 | // last_ids stores the per-thread last-inserted id so DB.last_id() returns |
| 51 | // the value captured on the same pooled conn that ran the INSERT, instead |
| 52 | // of calling LASTVAL() on whatever conn the pool happens to hand out next |
| 53 | // (which is session-scoped and would return 0 or a wrong value). |
| 54 | last_ids_mu &sync.Mutex = unsafe { nil } |
| 55 | last_ids map[u64]i64 |
| 56 | } |
| 57 | |
| 58 | fn new_pool(conninfo string, cfg PoolConfig) &Pool { |
| 59 | mut p := &Pool{ |
| 60 | mu: sync.new_mutex() |
| 61 | last_ids_mu: sync.new_mutex() |
| 62 | conninfo: conninfo |
| 63 | max_open: cfg.max_open_conns |
| 64 | max_idle: cfg.max_idle_conns |
| 65 | max_lifetime: cfg.conn_max_lifetime |
| 66 | } |
| 67 | if p.max_open < 0 { |
| 68 | p.max_open = 0 |
| 69 | } |
| 70 | if p.max_idle < 0 { |
| 71 | p.max_idle = 0 |
| 72 | } |
| 73 | return p |
| 74 | } |
| 75 | |
| 76 | // wrap_slot builds the fresh `&Conn` that gets handed to a caller. Each |
| 77 | // acquire allocates a new wrapper so a previously-checked-out (and since |
| 78 | // released) `&Conn` reference can never be reused to reach the underlying |
| 79 | // PGconn*. |
| 80 | fn (p &Pool) wrap_slot(slot IdleSlot) &Conn { |
| 81 | return &Conn{ |
| 82 | conn: slot.handle |
| 83 | pool: unsafe { p } |
| 84 | created_at: slot.created_at |
| 85 | bad: slot.bad |
| 86 | } |
| 87 | } |
| 88 | |
| 89 | fn (mut p Pool) acquire() !&Conn { |
| 90 | for { |
| 91 | p.mu.lock() |
| 92 | if p.closed { |
| 93 | p.mu.unlock() |
| 94 | return error('pg: pool is closed') |
| 95 | } |
| 96 | // Pop newest idle slot (LIFO keeps the freshest connection warm) |
| 97 | for p.idle.len > 0 { |
| 98 | slot := p.idle.last() |
| 99 | p.idle.delete_last() |
| 100 | if slot_expired(slot, p.max_lifetime) || slot_bad(slot) { |
| 101 | physical_close_handle(slot.handle) |
| 102 | p.open_count-- |
| 103 | continue |
| 104 | } |
| 105 | p.mu.unlock() |
| 106 | return p.wrap_slot(slot) |
| 107 | } |
| 108 | // Capacity available: open a new conn outside the lock |
| 109 | if p.max_open == 0 || p.open_count < p.max_open { |
| 110 | p.open_count++ |
| 111 | conninfo := p.conninfo |
| 112 | p.mu.unlock() |
| 113 | slot := connect_slot(conninfo) or { |
| 114 | p.mu.lock() |
| 115 | p.open_count-- |
| 116 | p.mu.unlock() |
| 117 | return err |
| 118 | } |
| 119 | // close() may have run while we were dialing outside the lock. |
| 120 | // Honor the close contract by tearing the fresh handle down |
| 121 | // instead of returning a live Conn after shutdown. |
| 122 | p.mu.lock() |
| 123 | if p.closed { |
| 124 | p.open_count-- |
| 125 | p.mu.unlock() |
| 126 | physical_close_handle(slot.handle) |
| 127 | return error('pg: pool is closed') |
| 128 | } |
| 129 | p.mu.unlock() |
| 130 | return p.wrap_slot(slot) |
| 131 | } |
| 132 | // At capacity: wait for a release/close/cap-raise to signal us. |
| 133 | // Senders transfer slot ownership via this channel; a sentinel slot |
| 134 | // with a nil handle means "capacity changed, retry the acquire |
| 135 | // loop" (used by set_max_open when raising the limit). |
| 136 | waiter := chan IdleSlot{cap: 1} |
| 137 | p.waiters << waiter |
| 138 | p.mu.unlock() |
| 139 | slot := <-waiter or { return error('pg: pool was closed while waiting for connection') } |
| 140 | if isnil(slot.handle) { |
| 141 | continue |
| 142 | } |
| 143 | return p.wrap_slot(slot) |
| 144 | } |
| 145 | return error('pg: unreachable') |
| 146 | } |
| 147 | |
| 148 | fn (mut p Pool) release(conn &Conn) { |
| 149 | if isnil(conn) { |
| 150 | return |
| 151 | } |
| 152 | mut c := unsafe { conn } |
| 153 | // Detach the wrapper from the physical handle before doing anything else. |
| 154 | // This is what makes a stale `&Conn` reference held by user code inert: |
| 155 | // any subsequent method call on it will see `c.conn == nil` and error. |
| 156 | // It is also the idempotency guard against double-close — a second |
| 157 | // release() finds nothing to return to the pool and is a no-op. |
| 158 | if isnil(c.conn) { |
| 159 | return |
| 160 | } |
| 161 | slot := IdleSlot{ |
| 162 | handle: c.conn |
| 163 | created_at: c.created_at |
| 164 | bad: c.bad |
| 165 | } |
| 166 | c.conn = unsafe { nil } |
| 167 | c.pool = unsafe { nil } |
| 168 | p.mu.lock() |
| 169 | if p.closed { |
| 170 | // close() only decremented open_count for slots it found parked; |
| 171 | // in-use conns are accounted for here as they trickle back. |
| 172 | p.open_count-- |
| 173 | p.mu.unlock() |
| 174 | physical_close_handle(slot.handle) |
| 175 | return |
| 176 | } |
| 177 | // Discard broken or expired conns |
| 178 | if slot_bad(slot) || slot_expired(slot, p.max_lifetime) { |
| 179 | physical_close_handle(slot.handle) |
| 180 | p.open_count-- |
| 181 | // A capacity slot just opened; hand it to a waiter as a fresh conn |
| 182 | if p.waiters.len > 0 && (p.max_open == 0 || p.open_count < p.max_open) { |
| 183 | waiter := p.waiters[0] |
| 184 | p.waiters.delete(0) |
| 185 | p.open_count++ |
| 186 | conninfo := p.conninfo |
| 187 | p.mu.unlock() |
| 188 | new_slot := connect_slot(conninfo) or { |
| 189 | p.mu.lock() |
| 190 | p.open_count-- |
| 191 | // Capacity is now open but the dial just failed. Wake every |
| 192 | // other parked waiter too, otherwise they hang forever waiting |
| 193 | // for a release that will never come (e.g. max_open=1). |
| 194 | extras := p.waiters.clone() |
| 195 | p.waiters = []chan IdleSlot{} |
| 196 | p.mu.unlock() |
| 197 | waiter.close() |
| 198 | for w in extras { |
| 199 | w.close() |
| 200 | } |
| 201 | return |
| 202 | } |
| 203 | p.mu.lock() |
| 204 | if p.closed { |
| 205 | // Pool closed during the dial: drop the new conn and signal the waiter. |
| 206 | p.open_count-- |
| 207 | p.mu.unlock() |
| 208 | physical_close_handle(new_slot.handle) |
| 209 | waiter.close() |
| 210 | return |
| 211 | } |
| 212 | waiter <- new_slot |
| 213 | p.mu.unlock() |
| 214 | return |
| 215 | } |
| 216 | p.mu.unlock() |
| 217 | return |
| 218 | } |
| 219 | // If a recent set_max_open() shrank the cap below open_count, this |
| 220 | // returning slot has to be retired even when a waiter is queued — |
| 221 | // otherwise the fast hand-off keeps open_count pinned above the new |
| 222 | // limit forever under steady traffic. Wake the waiter with a retry |
| 223 | // sentinel so it re-evaluates capacity under the new cap. |
| 224 | if p.max_open > 0 && p.open_count > p.max_open { |
| 225 | physical_close_handle(slot.handle) |
| 226 | p.open_count-- |
| 227 | if p.waiters.len > 0 { |
| 228 | waiter := p.waiters[0] |
| 229 | p.waiters.delete(0) |
| 230 | waiter <- IdleSlot{ |
| 231 | handle: unsafe { nil } |
| 232 | } |
| 233 | } |
| 234 | p.mu.unlock() |
| 235 | return |
| 236 | } |
| 237 | // Healthy conn: prefer handing it directly to a waiter. Send under the |
| 238 | // lock (cap:1 makes it non-blocking) so close() can't slip in and orphan |
| 239 | // the popped waiter with a live conn. |
| 240 | if p.waiters.len > 0 { |
| 241 | waiter := p.waiters[0] |
| 242 | p.waiters.delete(0) |
| 243 | waiter <- slot |
| 244 | p.mu.unlock() |
| 245 | return |
| 246 | } |
| 247 | // Park as idle, unless we'd exceed max_idle (0 = keep no idle conns) |
| 248 | // or we are over a recently-shrunk max_open and need to converge. |
| 249 | if p.idle.len >= p.max_idle || (p.max_open > 0 && p.open_count > p.max_open) { |
| 250 | physical_close_handle(slot.handle) |
| 251 | p.open_count-- |
| 252 | p.mu.unlock() |
| 253 | return |
| 254 | } |
| 255 | p.idle << slot |
| 256 | p.mu.unlock() |
| 257 | } |
| 258 | |
| 259 | fn (mut p Pool) close() { |
| 260 | p.mu.lock() |
| 261 | if p.closed { |
| 262 | p.mu.unlock() |
| 263 | return |
| 264 | } |
| 265 | p.closed = true |
| 266 | for slot in p.idle { |
| 267 | physical_close_handle(slot.handle) |
| 268 | } |
| 269 | idle_len := p.idle.len |
| 270 | p.idle = []IdleSlot{} |
| 271 | p.open_count -= idle_len |
| 272 | for waiter in p.waiters { |
| 273 | waiter.close() |
| 274 | } |
| 275 | p.waiters = []chan IdleSlot{} |
| 276 | p.mu.unlock() |
| 277 | } |
| 278 | |
| 279 | fn (mut p Pool) stats() PoolStats { |
| 280 | p.mu.lock() |
| 281 | stats := PoolStats{ |
| 282 | max_open_connections: p.max_open |
| 283 | open_connections: p.open_count |
| 284 | idle: p.idle.len |
| 285 | in_use: p.open_count - p.idle.len |
| 286 | wait_count: p.waiters.len |
| 287 | } |
| 288 | p.mu.unlock() |
| 289 | return stats |
| 290 | } |
| 291 | |
| 292 | fn (mut p Pool) set_max_open(n int) { |
| 293 | mut nn := n |
| 294 | if nn < 0 { |
| 295 | nn = 0 |
| 296 | } |
| 297 | p.mu.lock() |
| 298 | p.max_open = nn |
| 299 | // Shrink: drop idle slots until we are back under the new cap. Without |
| 300 | // this, acquire() pops idle slots before checking max_open, so two |
| 301 | // callers could still both succeed after set_max_open_conns(1) when two |
| 302 | // warm conns are parked. In-use conns can't be reclaimed here — they get |
| 303 | // discarded on release once open_count is over the cap. |
| 304 | if nn > 0 { |
| 305 | for p.idle.len > 0 && p.open_count > nn { |
| 306 | slot := p.idle.last() |
| 307 | p.idle.delete_last() |
| 308 | physical_close_handle(slot.handle) |
| 309 | p.open_count-- |
| 310 | } |
| 311 | } |
| 312 | // Raise: wake parked waiters so they can retry the acquire loop and |
| 313 | // dial against the new headroom. Without this nudge they stay blocked |
| 314 | // until some other release happens, which may never come if all current |
| 315 | // conns are long-lived. |
| 316 | if p.waiters.len > 0 && (nn == 0 || p.open_count < nn) { |
| 317 | spare := if nn == 0 { p.waiters.len } else { nn - p.open_count } |
| 318 | mut n_wake := if spare < p.waiters.len { spare } else { p.waiters.len } |
| 319 | for n_wake > 0 { |
| 320 | waiter := p.waiters[0] |
| 321 | p.waiters.delete(0) |
| 322 | waiter <- IdleSlot{ |
| 323 | handle: unsafe { nil } |
| 324 | } |
| 325 | n_wake-- |
| 326 | } |
| 327 | } |
| 328 | p.mu.unlock() |
| 329 | } |
| 330 | |
| 331 | fn (mut p Pool) set_max_idle(n int) { |
| 332 | mut nn := n |
| 333 | if nn < 0 { |
| 334 | nn = 0 |
| 335 | } |
| 336 | p.mu.lock() |
| 337 | p.max_idle = nn |
| 338 | for p.idle.len > nn { |
| 339 | slot := p.idle.last() |
| 340 | p.idle.delete_last() |
| 341 | physical_close_handle(slot.handle) |
| 342 | p.open_count-- |
| 343 | } |
| 344 | p.mu.unlock() |
| 345 | } |
| 346 | |
| 347 | fn (mut p Pool) set_conn_max_lifetime(d time.Duration) { |
| 348 | p.mu.lock() |
| 349 | p.max_lifetime = d |
| 350 | p.mu.unlock() |
| 351 | } |
| 352 | |
| 353 | // stash_last_id records `id` as the last-inserted id for the current thread. |
| 354 | // Called by `DB.insert` right after running INSERT on the pinned conn, so the |
| 355 | // next `DB.last_id()` call on the same thread returns this exact value rather |
| 356 | // than running LASTVAL() on a different pooled session. |
| 357 | fn (mut p Pool) stash_last_id(id int) { |
| 358 | tid := sync.thread_id() |
| 359 | p.last_ids_mu.lock() |
| 360 | p.last_ids[tid] = i64(id) |
| 361 | p.last_ids_mu.unlock() |
| 362 | } |
| 363 | |
| 364 | // take_last_id returns the last-inserted id stashed by this thread's most |
| 365 | // recent `DB.insert`, or 0 if there is none. Consuming on read keeps the |
| 366 | // map bounded as threads come and go, and prevents a future thread that |
| 367 | // reuses this tid from observing a stale id before doing its own insert. |
| 368 | fn (mut p Pool) take_last_id() int { |
| 369 | tid := sync.thread_id() |
| 370 | p.last_ids_mu.lock() |
| 371 | id := p.last_ids[tid] or { i64(0) } |
| 372 | p.last_ids.delete(tid) |
| 373 | p.last_ids_mu.unlock() |
| 374 | return int(id) |
| 375 | } |
| 376 | |