v / vlib / db / pg / pool.v
375 lines · 359 sloc · 10.38 KB · 2b04a6ecbf16697b4a6ae2e1e02d3a381967e8f0
Raw
1module pg
2
3import sync
4import time
5
// PoolStats is a point-in-time snapshot of the counters of a `DB`'s
// connection pool.
pub struct PoolStats {
pub:
	max_open_connections int // cap on open conns; 0 means there is no cap
	open_connections     int // conns the pool counts as open (in-use + idle)
	in_use               int // conns handed out and not yet returned
	idle                 int // conns sitting in the idle list
	wait_count           int // callers queued in acquire, waiting for a conn
}
15
// PoolConfig carries the pool settings that are supplied when the pool is
// constructed.
@[params]
pub struct PoolConfig {
pub:
	max_open_conns    int // upper bound on open conns; 0 disables the bound
	max_idle_conns    int = 2 // how many idle conns may be kept; 0 keeps none
	conn_max_lifetime time.Duration // maximum age of a conn; 0 means no limit
}
24
// IdleSlot is how the pool stores a pooled libpq handle internally: the raw
// PGconn* plus its metadata. Slots are what travel through the idle list and
// the waiter channels. `acquire` wraps a slot in a newly allocated `&Conn`,
// and `release` copies the metadata back out into a slot.
// Because the wrapper and the slot are separate objects, a `&Conn` that user
// code keeps around after `close()` stays dead even if the pool later gives
// the same physical PGconn* to a different caller.
struct IdleSlot {
	handle     voidptr   // raw connection handle; nil is used as a "retry" sentinel on waiter channels
	created_at time.Time // creation timestamp carried along with the handle
mut:
	bad bool // marks the connection as broken so the pool discards it
}
37
// Pool is the shared connection-pool state. The fields from `conninfo`
// through `closed` are accessed with `mu` held; `last_ids` is accessed with
// `last_ids_mu` held.
@[heap]
struct Pool {
mut:
	mu           &sync.Mutex = unsafe { nil }
	conninfo     string // connection string used when dialing new conns
	max_open     int    // cap on open conns (0 = unlimited)
	max_idle     int    // cap on parked idle conns (0 = keep none)
	max_lifetime time.Duration // maximum conn age (0 = unlimited)
	idle         []IdleSlot    // parked slots; the newest is at the end
	open_count   int           // conns counted as open, including ones being dialed
	waiters      []chan IdleSlot // callers blocked in acquire, oldest first
	closed       bool
	// last_ids remembers, per thread id, the id produced by that thread's
	// latest INSERT. DB.last_id() can then return the value captured on the
	// pooled conn that actually ran the INSERT. Calling LASTVAL() on
	// whichever conn the pool hands out next would not work: LASTVAL() is
	// session-scoped, so it would yield 0 or an unrelated value.
	last_ids_mu &sync.Mutex = unsafe { nil }
	last_ids    map[u64]i64
}
57
// new_pool allocates a Pool that dials `conninfo` and is limited by `cfg`.
// Negative `max_open_conns` / `max_idle_conns` values are treated as 0.
fn new_pool(conninfo string, cfg PoolConfig) &Pool {
	// Clamp up front so the struct is built in its final state.
	open_limit := if cfg.max_open_conns < 0 { 0 } else { cfg.max_open_conns }
	idle_limit := if cfg.max_idle_conns < 0 { 0 } else { cfg.max_idle_conns }
	return &Pool{
		mu:           sync.new_mutex()
		last_ids_mu:  sync.new_mutex()
		conninfo:     conninfo
		max_open:     open_limit
		max_idle:     idle_limit
		max_lifetime: cfg.conn_max_lifetime
	}
}
75
// wrap_slot allocates the `&Conn` that a caller receives for `slot`. A new
// wrapper is created on every call, so an older `&Conn` that was already
// released cannot be used to get back to the underlying PGconn*.
fn (p &Pool) wrap_slot(slot IdleSlot) &Conn {
	wrapped := &Conn{
		pool:       unsafe { p }
		conn:       slot.handle
		bad:        slot.bad
		created_at: slot.created_at
	}
	return wrapped
}
88
// acquire checks a connection out of the pool and returns it wrapped in a
// fresh `&Conn`. In order of preference it:
//   1. reuses the newest idle slot, discarding bad or expired ones;
//   2. dials a new connection when `max_open` allows it (0 = unlimited);
//   3. otherwise queues itself as a waiter and blocks until a release hands
//      it a slot, a retry sentinel arrives, or the pool is closed.
// It returns an error when the pool is (or becomes) closed, or when dialing
// a new connection fails.
fn (mut p Pool) acquire() !&Conn {
	for {
		p.mu.lock()
		if p.closed {
			p.mu.unlock()
			return error('pg: pool is closed')
		}
		// Pop newest idle slot (LIFO keeps the freshest connection warm)
		for p.idle.len > 0 {
			slot := p.idle.last()
			p.idle.delete_last()
			if slot_expired(slot, p.max_lifetime) || slot_bad(slot) {
				physical_close_handle(slot.handle)
				p.open_count--
				continue
			}
			p.mu.unlock()
			return p.wrap_slot(slot)
		}
		// Capacity available: open a new conn outside the lock. The slot is
		// reserved in open_count first so concurrent callers see it as taken.
		if p.max_open == 0 || p.open_count < p.max_open {
			p.open_count++
			conninfo := p.conninfo
			p.mu.unlock()
			slot := connect_slot(conninfo) or {
				p.mu.lock()
				p.open_count--
				// The reservation we held may have forced other callers to
				// park as waiters. Now that it is given back, nobody may be
				// left to release a conn to them (e.g. max_open=1), so wake
				// one waiter with the retry sentinel; it will re-run the
				// loop and dial for itself.
				if p.waiters.len > 0 && (p.max_open == 0 || p.open_count < p.max_open) {
					parked := p.waiters[0]
					p.waiters.delete(0)
					// cap:1 channel with a single sender: this send cannot block.
					parked <- IdleSlot{
						handle: unsafe { nil }
					}
				}
				p.mu.unlock()
				return err
			}
			// close() may have run while we were dialing outside the lock.
			// Honor the close contract by tearing the fresh handle down
			// instead of returning a live Conn after shutdown.
			p.mu.lock()
			if p.closed {
				p.open_count--
				p.mu.unlock()
				physical_close_handle(slot.handle)
				return error('pg: pool is closed')
			}
			p.mu.unlock()
			return p.wrap_slot(slot)
		}
		// At capacity: wait for a release/close/cap-raise to signal us.
		// Senders transfer slot ownership via this channel; a sentinel slot
		// with a nil handle means "capacity changed, retry the acquire
		// loop" (used by set_max_open when raising the limit, and by a
		// failed dial giving its reservation back).
		waiter := chan IdleSlot{cap: 1}
		p.waiters << waiter
		p.mu.unlock()
		// A closed channel (no slot delivered) takes the `or` branch.
		slot := <-waiter or { return error('pg: pool was closed while waiting for connection') }
		if isnil(slot.handle) {
			continue
		}
		return p.wrap_slot(slot)
	}
	return error('pg: unreachable')
}
147
// release takes the physical handle back from `conn` and detaches the
// wrapper from it. A nil `conn`, or a wrapper whose handle is already nil,
// is a no-op. Otherwise the handle ends up in exactly one of these states:
//   - closed, because the pool is closed, the slot is bad or expired, the
//     pool is over a shrunken `max_open`, or the idle list is full;
//   - handed directly to the oldest waiter;
//   - parked in the idle list.
// When a bad/expired slot is dropped and a waiter is queued, release dials
// a replacement connection for that waiter itself.
fn (mut p Pool) release(conn &Conn) {
	if isnil(conn) {
		return
	}
	mut c := unsafe { conn }
	// Detach the wrapper from the physical handle before doing anything else.
	// This is what makes a stale `&Conn` reference held by user code inert:
	// any subsequent method call on it will see `c.conn == nil` and error.
	// It is also the idempotency guard against double-close — a second
	// release() finds nothing to return to the pool and is a no-op.
	if isnil(c.conn) {
		return
	}
	// Copy the metadata out of the wrapper before clearing it.
	slot := IdleSlot{
		handle: c.conn
		created_at: c.created_at
		bad: c.bad
	}
	c.conn = unsafe { nil }
	c.pool = unsafe { nil }
	p.mu.lock()
	if p.closed {
		// close() only decremented open_count for slots it found parked;
		// in-use conns are accounted for here as they trickle back.
		p.open_count--
		p.mu.unlock()
		physical_close_handle(slot.handle)
		return
	}
	// Discard broken or expired conns
	if slot_bad(slot) || slot_expired(slot, p.max_lifetime) {
		physical_close_handle(slot.handle)
		p.open_count--
		// A capacity slot just opened; hand it to a waiter as a fresh conn
		if p.waiters.len > 0 && (p.max_open == 0 || p.open_count < p.max_open) {
			waiter := p.waiters[0]
			p.waiters.delete(0)
			// Reserve the capacity before unlocking so nobody else takes it
			// while we dial outside the lock.
			p.open_count++
			conninfo := p.conninfo
			p.mu.unlock()
			new_slot := connect_slot(conninfo) or {
				p.mu.lock()
				p.open_count--
				// Capacity is now open but the dial just failed. Wake every
				// other parked waiter too, otherwise they hang forever waiting
				// for a release that will never come (e.g. max_open=1).
				extras := p.waiters.clone()
				p.waiters = []chan IdleSlot{}
				p.mu.unlock()
				// Closing the channels makes the blocked acquire() calls
				// return their "pool was closed while waiting" error.
				waiter.close()
				for w in extras {
					w.close()
				}
				return
			}
			p.mu.lock()
			if p.closed {
				// Pool closed during the dial: drop the new conn and signal the waiter.
				p.open_count--
				p.mu.unlock()
				physical_close_handle(new_slot.handle)
				waiter.close()
				return
			}
			// Sent while holding the lock; the waiter channel has cap 1.
			waiter <- new_slot
			p.mu.unlock()
			return
		}
		p.mu.unlock()
		return
	}
	// If a recent set_max_open() shrank the cap below open_count, this
	// returning slot has to be retired even when a waiter is queued —
	// otherwise the fast hand-off keeps open_count pinned above the new
	// limit forever under steady traffic. Wake the waiter with a retry
	// sentinel so it re-evaluates capacity under the new cap.
	if p.max_open > 0 && p.open_count > p.max_open {
		physical_close_handle(slot.handle)
		p.open_count--
		if p.waiters.len > 0 {
			waiter := p.waiters[0]
			p.waiters.delete(0)
			// A nil handle is the "retry the acquire loop" sentinel.
			waiter <- IdleSlot{
				handle: unsafe { nil }
			}
		}
		p.mu.unlock()
		return
	}
	// Healthy conn: prefer handing it directly to a waiter. Send under the
	// lock (cap:1 makes it non-blocking) so close() can't slip in and orphan
	// the popped waiter with a live conn.
	if p.waiters.len > 0 {
		waiter := p.waiters[0]
		p.waiters.delete(0)
		waiter <- slot
		p.mu.unlock()
		return
	}
	// Park as idle, unless we'd exceed max_idle (0 = keep no idle conns)
	// or we are over a recently-shrunk max_open and need to converge.
	// NOTE(review): the over-max_open clause looks unreachable here — the
	// same condition already returned above and nothing has changed
	// open_count since, with the lock held throughout.
	if p.idle.len >= p.max_idle || (p.max_open > 0 && p.open_count > p.max_open) {
		physical_close_handle(slot.handle)
		p.open_count--
		p.mu.unlock()
		return
	}
	p.idle << slot
	p.mu.unlock()
}
258
// close shuts the pool down. Every parked idle handle is closed and removed
// from `open_count`, and every waiter channel is closed so blocked acquire
// calls return. Conns that are still checked out are not touched here.
// Calling close on an already-closed pool does nothing.
fn (mut p Pool) close() {
	p.mu.lock()
	defer {
		p.mu.unlock()
	}
	if p.closed {
		return
	}
	p.closed = true
	// Tear down everything that is parked and drop it from the accounting.
	parked := p.idle.len
	for idle_slot in p.idle {
		physical_close_handle(idle_slot.handle)
	}
	p.idle = []IdleSlot{}
	p.open_count -= parked
	// Unblock every caller that is waiting in acquire.
	for queued in p.waiters {
		queued.close()
	}
	p.waiters = []chan IdleSlot{}
}
278
// stats returns a snapshot of the pool counters, read with the pool lock
// held. `in_use` is derived as `open_count` minus the number of idle slots.
fn (mut p Pool) stats() PoolStats {
	p.mu.lock()
	defer {
		p.mu.unlock()
	}
	total := p.open_count
	parked := p.idle.len
	return PoolStats{
		max_open_connections: p.max_open
		open_connections:     total
		in_use:               total - parked
		idle:                 parked
		wait_count:           p.waiters.len
	}
}
291
// set_max_open changes the cap on open connections; negative values are
// treated as 0 (unlimited).
//
// Lowering the cap closes idle slots until `open_count` fits under it again.
// This matters because acquire() serves idle slots before it looks at
// `max_open`, so leftover warm conns would let callers exceed the new cap.
// Conns that are checked out cannot be reclaimed here; they are retired by
// release() while `open_count` is above the cap.
//
// Raising the cap sends the retry sentinel to as many waiters as there is
// new headroom, so they re-run the acquire loop and dial. Otherwise they
// would stay blocked until some unrelated release happens.
fn (mut p Pool) set_max_open(n int) {
	limit := if n < 0 { 0 } else { n }
	p.mu.lock()
	defer {
		p.mu.unlock()
	}
	p.max_open = limit
	if limit > 0 {
		for p.idle.len > 0 && p.open_count > limit {
			victim := p.idle.pop()
			physical_close_handle(victim.handle)
			p.open_count--
		}
	}
	// Nobody to wake, or no headroom under the new cap.
	if p.waiters.len == 0 || (limit != 0 && p.open_count >= limit) {
		return
	}
	headroom := if limit == 0 { p.waiters.len } else { limit - p.open_count }
	wake_total := if headroom < p.waiters.len { headroom } else { p.waiters.len }
	for _ in 0 .. wake_total {
		sleeper := p.waiters[0]
		p.waiters.delete(0)
		// nil handle = "capacity changed, retry"
		sleeper <- IdleSlot{
			handle: unsafe { nil }
		}
	}
}
330
// set_max_idle changes how many idle conns may stay parked; negative values
// are treated as 0. Idle slots above the new limit are closed immediately,
// newest first, and removed from `open_count`.
fn (mut p Pool) set_max_idle(n int) {
	limit := if n < 0 { 0 } else { n }
	p.mu.lock()
	defer {
		p.mu.unlock()
	}
	p.max_idle = limit
	for p.idle.len > limit {
		surplus := p.idle.pop()
		physical_close_handle(surplus.handle)
		p.open_count--
	}
}
346
// set_conn_max_lifetime stores `d` as the pool's maximum connection
// lifetime, with the pool lock held. Existing conns are not touched here.
fn (mut p Pool) set_conn_max_lifetime(d time.Duration) {
	p.mu.lock()
	defer {
		p.mu.unlock()
	}
	p.max_lifetime = d
}
352
// stash_last_id remembers `id` as the calling thread's last-inserted id.
// `DB.insert` calls it immediately after running INSERT on the pinned conn.
// A following `DB.last_id()` on the same thread then gets exactly this
// value, rather than the result of LASTVAL() on some other pooled session.
fn (mut p Pool) stash_last_id(id int) {
	key := sync.thread_id()
	p.last_ids_mu.lock()
	defer {
		p.last_ids_mu.unlock()
	}
	p.last_ids[key] = i64(id)
}
363
// take_last_id hands back the id that the calling thread's latest
// `DB.insert` stashed, or 0 when nothing is stashed. The entry is removed
// as it is read. That keeps the map from growing as threads come and go,
// and stops a later thread that reuses the same tid from seeing a stale id
// before its own insert.
fn (mut p Pool) take_last_id() int {
	key := sync.thread_id()
	p.last_ids_mu.lock()
	defer {
		p.last_ids_mu.unlock()
	}
	stored := p.last_ids[key] or { i64(0) }
	p.last_ids.delete(key)
	return int(stored)
}
376