v / vlib / pool / connection.v
742 lines · 657 sloc · 19.95 KB · cab97894aee31afb83a4a4f5a07a14206d7223cd
Raw
1module pool
2
3import sync
4import sync.stdatomic { new_atomic }
5import time
6
// eviction_ch_cap is the buffer capacity given to the pool's eviction event
// channel (`eviction_ch`) when a pool is constructed.
const eviction_ch_cap = 1000
9
// ConnectionPoolable defines the interface for connection objects managed by the pool.
// All methods are declared under `mut:`, so they may modify the receiver.
pub interface ConnectionPoolable {
mut:
	// validate checks if the connection is still usable.
	// The pool treats both a `false` result and an error as "not usable".
	validate() !bool
	// close terminates the physical connection
	close() !
	// reset returns the connection to initial state for reuse;
	// the pool calls it when a connection is handed back via `put`.
	reset() !
}
20
// ConnectionPoolConfig holds configuration settings for the connection pool.
// It is tagged `@[params]`; every field has a default value.
// `check_config` rejects non-positive `max_conns`, negative values for the other
// fields, `min_idle_conns > max_conns` and `idle_timeout > max_lifetime`.
@[params]
pub struct ConnectionPoolConfig {
pub mut:
	max_conns          int           = 20 // Maximum allowed connections
	min_idle_conns     int           = 5 // Minimum idle connections to maintain
	max_lifetime       time.Duration = time.hour // Max lifetime of a connection (measured from its creation time)
	idle_timeout       time.Duration = 30 * time.minute // Time before idle connections are cleaned up
	get_timeout        time.Duration = 5 * time.second // Max time to wait for a connection in `get`
	retry_base_delay   time.Duration = 1 * time.second // Base delay for retry backoff when creation fails
	max_retry_delay    time.Duration = 30 * time.second // Maximum delay for retry backoff
	max_retry_attempts int           = 5 // Maximum retry attempts after a failed creation
}
34
// ConnectionWrapper contains metadata about a pooled connection.
// The pool keeps one wrapper per tracked connection, both in the `all_conns`
// map and (while the connection is idle) in `idle_pool`.
struct ConnectionWrapper {
mut:
	conn          &ConnectionPoolable // The actual connection object
	created_at    time.Time // When connection was created; compared against `max_lifetime`
	last_used_at  time.Time // Last time connection was used; compared against `idle_timeout`
	last_valid_at time.Time // Last time connection was validated
	usage_count   int       // How many times this connection has been handed out from the idle pool
}
44
// EvictionPriority indicates urgency of connection cleanup.
// The maintenance thread weighs each received event (low = 1, medium = 10,
// high = 50, urgent = 1000) and prunes once the accumulated weight reaches
// 1000, or earlier depending on how long ago the first pending event arrived;
// an `urgent` event always triggers an immediate prune.
pub enum EvictionPriority {
	low    // Routine cleanup (connection return)
	medium // Connection acquisition activity (sent by `get` / `try_get`)
	high   // Configuration change (sent by `update_config`)
	urgent // Database/Server recovery, or idle pool at/below the configured minimum
}
52
// ConnectionPool manages a pool of reusable connections.
// Instances are built by `new_connection_pool`, which also starts the
// background maintenance thread; `close` stops that thread and releases
// every tracked connection.
pub struct ConnectionPool {
mut:
	config ConnectionPoolConfig
	// Lock order:
	// config_mutex > create_mutex > idle_pool_mutex > all_conns_mutex > wait_queue_mutex
	config_mutex     &sync.RwMutex @[required] // Guards configuration changes
	create_mutex     &sync.Mutex // Serializes connection creation
	idle_pool_mutex  &sync.RwMutex                  @[required] // Protects idle connections
	all_conns_mutex  &sync.RwMutex                  @[required] // Protects all connections map
	wait_queue_mutex &sync.RwMutex                  @[required] // Protects wait queue
	is_closed        &stdatomic.AtomicVal[bool]     @[required] // Pool shutdown flag
	stop_ch          chan bool // Signals maintenance thread to stop
	eviction_ch      chan EvictionPriority // Eviction event channel (capacity `eviction_ch_cap`)
	cleanup_thread   thread    // Background maintenance thread
	wait_queue       []chan bool // Client wait queue for connection acquisition (one channel per waiter)
	conn_factory     fn () !&ConnectionPoolable     @[required] // Creates new connections
	active_count     &stdatomic.AtomicVal[int]      @[required] // Currently checked-out connections
	created_at       time.Time // Pool creation timestamp
	all_conns        map[voidptr]&ConnectionWrapper // All tracked connections, keyed by connection pointer
	idle_pool        []&ConnectionWrapper // Currently idle connections
	creation_errors  &stdatomic.AtomicVal[int]      @[required] // Failed creation attempts
	evicted_count    &stdatomic.AtomicVal[int]      @[required] // Connections forcibly removed
	creating_count   &stdatomic.AtomicVal[int]      @[required] // Connections being created
}
78
// new_connection_pool builds a pool around `conn_factory` using `config`.
// The configuration is validated first; then `config.min_idle_conns`
// connections are created up front and stored as idle. If any of those
// creations fails, every connection opened so far is closed and the error
// is returned. On success the background maintenance thread is started and
// one pruning pass is run before the pool is handed back.
pub fn new_connection_pool(conn_factory fn () !&ConnectionPoolable, config ConnectionPoolConfig) !&ConnectionPool {
	// Refuse invalid settings before allocating anything
	check_config(config)!

	mut pool := &ConnectionPool{
		conn_factory:     conn_factory
		config:           config
		config_mutex:     sync.new_rwmutex()
		create_mutex:     sync.new_mutex()
		idle_pool_mutex:  sync.new_rwmutex()
		all_conns_mutex:  sync.new_rwmutex()
		wait_queue_mutex: sync.new_rwmutex()
		is_closed:        new_atomic(false)
		stop_ch:          chan bool{cap: 1}
		eviction_ch:      chan EvictionPriority{cap: eviction_ch_cap}
		active_count:     new_atomic(0)
		creation_errors:  new_atomic(0)
		evicted_count:    new_atomic(0)
		creating_count:   new_atomic(0)
		all_conns:        map[voidptr]&ConnectionWrapper{}
	}

	started_at := time.utc()
	pool.created_at = started_at

	// Pre-fill the idle pool; all initial wrappers share the same timestamp
	mut filled := 0
	for filled < config.min_idle_conns {
		fresh := pool.create_conn_with_retry() or {
			// Roll back: close whatever was opened so far, then report the failure
			pool.all_conns_mutex.lock()
			for _, mut tracked in pool.all_conns {
				tracked.conn.close() or {}
			}
			pool.all_conns.clear()
			pool.all_conns_mutex.unlock()
			return err
		}
		entry := &ConnectionWrapper{
			conn:          fresh
			created_at:    started_at
			last_used_at:  started_at
			last_valid_at: started_at
		}
		pool.idle_pool << entry
		pool.all_conns_mutex.lock()
		pool.all_conns[fresh] = entry
		pool.all_conns_mutex.unlock()
		filled++
	}

	// Launch the maintenance thread, then do a first pruning pass ourselves
	pool.cleanup_thread = spawn pool.background_maintenance()
	pool.prune_connections()
	return pool
}
135
// create_conn_with_retry creates a connection with exponential backoff retries.
//
// The retry settings are snapshotted from the configuration, then creation is
// serialized through `create_mutex` (which stays held while sleeping between
// retries). `creating_count` is incremented for the duration of the call.
//
// Returns the new connection once `conn_factory` succeeds and the connection
// validates. Returns an error when the factory has failed and `attempt` has
// reached `max_retry_attempts`, or when the freshly created connection fails
// validation (in which case it is closed and no retry happens).
// Each failed factory call that is retried increments `creation_errors`.
fn (mut p ConnectionPool) create_conn_with_retry() !&ConnectionPoolable {
	// Get current configuration.
	// A read lock must be released with runlock(); unlock() is the write-lock
	// release and is not interchangeable on every platform.
	p.config_mutex.rlock()
	max_attempts := p.config.max_retry_attempts
	base_delay := p.config.retry_base_delay
	max_delay := p.config.max_retry_delay
	p.config_mutex.runlock()

	// Serialize connection creation
	p.create_mutex.lock()
	defer {
		p.create_mutex.unlock()
	}

	mut attempt := 0
	p.creating_count.add(1)
	defer {
		p.creating_count.sub(1)
	}

	for {
		mut conn := p.conn_factory() or {
			// Handle creation error with exponential backoff
			if attempt >= max_attempts {
				return error('Connection creation failed after ${attempt} attempts: ${err}')
			}

			// Compute base_delay * 2^attempt capped at max_delay by repeated
			// doubling. Doubling stops as soon as the cap is reached, so the
			// value can never overflow (a plain `1 << attempt` multiplication
			// wraps around for large attempt counts and yields a negative delay).
			mut delay := base_delay
			for _ in 0 .. attempt {
				if delay <= 0 {
					// A zero base delay stays zero no matter how often it is doubled
					break
				}
				if delay >= max_delay - delay {
					// Doubling would reach or exceed the cap
					delay = max_delay
					break
				}
				delay = delay + delay
			}
			if delay > max_delay {
				delay = max_delay
			}

			time.sleep(delay)
			attempt++
			p.creation_errors.add(1)
			continue
		}

		// Validate new connection; an error from validate() counts as invalid
		if !conn.validate() or { false } {
			conn.close() or {}
			return error('New connection validation failed')
		}
		return conn
	}
	return error('Unreachable code')
}
185
// try_wakeup_waiters wakes the longest-waiting client, if any, when there is
// either an idle connection or room to create a new one. At most one waiter
// is removed from the queue and notified per call.
fn (mut p ConnectionPool) try_wakeup_waiters() {
	// Evaluated before taking the wait-queue lock
	creatable := p.can_create()

	p.wait_queue_mutex.lock()
	defer {
		p.wait_queue_mutex.unlock()
	}

	// Nobody is waiting
	if p.wait_queue.len == 0 {
		return
	}
	// Nothing to hand out and nothing can be created
	if p.idle_pool.len == 0 && !creatable {
		return
	}

	// Pop the head of the queue and signal it
	head := p.wait_queue[0]
	p.wait_queue.delete(0)
	head <- true
}
203
// can_create checks if new connections can be created.
// Returns true only when the pool is not closed, the number of checked-out
// plus in-creation connections is below `max_conns`, and the number of
// tracked connections is below `max_conns` as well.
// Note: `all_conns.len` is read here without taking `all_conns_mutex`.
@[inline]
fn (mut p ConnectionPool) can_create() bool {
	// Read lock is paired with runlock(); unlock() releases a write lock
	p.config_mutex.rlock()
	max_conns := p.config.max_conns
	p.config_mutex.runlock()
	return p.active_count.load() + p.creating_count.load() < max_conns && !p.is_closed.load()
		&& p.all_conns.len < max_conns
}
213
// get acquires a connection from the pool with timeout.
//
// Each round of the loop:
//   1. fails with 'Connection pool closed' if the pool has been closed;
//   2. tries to take an idle connection via `try_get`;
//   3. if `can_create` allows it, creates a new connection and registers it
//      (the new connection is closed again if the pool was closed meanwhile
//      or the connection limit was reached in the meantime);
//   4. tries the idle pool once more;
//   5. otherwise enqueues a notification channel and waits for either a
//      wake-up or the remaining part of `config.get_timeout`.
//
// Errors: 'Connection pool closed', 'Connection acquisition timeout', plus any
// error propagated from `create_conn_with_retry` or from closing a surplus
// connection.
pub fn (mut p ConnectionPool) get() !&ConnectionPoolable {
	start_time := time.utc()
	for {
		// Check if pool is closed
		if p.is_closed.load() {
			return error('Connection pool closed')
		}

		// Try immediate acquisition
		if conn := p.try_get() {
			p.eviction_ch <- .medium
			return conn
		}

		// Check if new connection can be created
		can_create := p.can_create()
		if can_create {
			mut new_conn := p.create_conn_with_retry()!

			// Final check before adding to pool
			if p.is_closed.load() {
				new_conn.close()!
				return error('Connection pool closed')
			}

			// Read lock is paired with runlock(); unlock() releases a write lock
			p.config_mutex.rlock()
			max_conns := p.config.max_conns
			p.config_mutex.runlock()

			p.all_conns_mutex.lock()
			if p.all_conns.len < max_conns {
				// Successfully create and add new connection
				now := time.utc()
				wrapper := &ConnectionWrapper{
					conn:          new_conn
					created_at:    now
					last_used_at:  now
					last_valid_at: now
				}
				p.all_conns[new_conn] = wrapper
				p.all_conns_mutex.unlock()
				// The connection goes straight to the caller, so it is active, not idle
				p.active_count.add(1)
				return new_conn
			} else {
				// Connection limit reached - close new connection
				p.all_conns_mutex.unlock()
				new_conn.close()!
			}
		}

		// Second attempt to get connection
		if conn := p.try_get() {
			return conn
		}

		// Calculate remaining time for connection acquisition
		p.config_mutex.rlock()
		timeout := p.config.get_timeout
		p.config_mutex.runlock()
		elapsed := time.utc() - start_time
		if elapsed > timeout {
			return error('Connection acquisition timeout')
		}
		remaining := timeout - elapsed

		// Set up notification channel (buffered, so a waker never blocks on it)
		notify_chan := chan bool{cap: 1}
		defer {
			notify_chan.close()
		}

		// Add to wait queue
		p.wait_queue_mutex.lock()
		p.wait_queue << notify_chan
		p.wait_queue_mutex.unlock()

		select {
			_ := <-notify_chan {
				// Notification received - retry acquisition
			}
			i64(remaining) {
				// Timeout cleanup: remove our own channel from the queue if it is still there
				p.wait_queue_mutex.lock()
				for i := 0; i < p.wait_queue.len; i++ {
					if p.wait_queue[i] == notify_chan {
						p.wait_queue.delete(i)
						break
					}
				}
				p.wait_queue_mutex.unlock()
				// One last chance before giving up
				if conn := p.try_get() {
					return conn
				}
				return error('Connection acquisition timeout')
			}
		}
	}

	return error('Unreachable code')
}
315
// try_get attempts non-blocking connection acquisition from the idle pool.
//
// It first posts an eviction event whose priority depends on how the current
// idle count compares to `min_idle_conns` (urgent when at or below the
// minimum, low when above twice the minimum, medium otherwise). It then pops
// idle connections until it finds one that is neither older than
// `max_lifetime` nor failing validation; rejected connections are removed
// from `all_conns` and closed. A returned connection has been counted in
// `active_count` and had its usage metadata updated.
// Returns `none` when no usable idle connection exists.
fn (mut p ConnectionPool) try_get() ?&ConnectionPoolable {
	// Get relevant configuration parameters.
	// Read locks are paired with runlock(); unlock() releases a write lock.
	p.config_mutex.rlock()
	min_idle := p.config.min_idle_conns
	max_lifetime := p.config.max_lifetime
	p.config_mutex.runlock()

	// Sample the idle count and release the lock again before notifying the
	// maintenance thread: the send blocks when the channel is full, and the
	// maintenance thread needs idle_pool_mutex to make progress, so sending
	// while holding that mutex could deadlock.
	p.idle_pool_mutex.rlock()
	idle_count := p.idle_pool.len
	p.idle_pool_mutex.runlock()

	// Determine eviction priority based on idle count
	priority := if idle_count <= min_idle {
		EvictionPriority.urgent
	} else if idle_count > min_idle * 2 {
		EvictionPriority.low
	} else {
		EvictionPriority.medium
	}
	p.eviction_ch <- priority

	p.idle_pool_mutex.lock()
	defer {
		p.idle_pool_mutex.unlock()
	}

	// Process idle connections, most recently returned first
	for p.idle_pool.len > 0 {
		mut wrapper := p.idle_pool.pop()

		// Check connection lifetime
		age := time.utc() - wrapper.created_at
		if age > max_lifetime {
			// Close expired connection
			p.all_conns_mutex.lock()
			p.all_conns.delete(wrapper.conn)
			p.all_conns_mutex.unlock()
			wrapper.conn.close() or {}
			continue
		}

		// Validate connection; an error from validate() counts as invalid
		if !wrapper.conn.validate() or { false } {
			// Handle invalid connection
			p.all_conns_mutex.lock()
			p.all_conns.delete(wrapper.conn)
			p.all_conns_mutex.unlock()
			wrapper.conn.close() or {}
			continue
		}

		wrapper.last_valid_at = time.utc()

		// Mark connection as active
		p.active_count.add(1)
		wrapper.last_used_at = time.utc()
		wrapper.usage_count++
		return wrapper.conn
	}
	return none
}
374
// put returns a connection to the pool.
//
// `active_count` is decremented (never below zero) as soon as the call starts.
// - If the pool is closed, the connection is closed and nothing else happens.
// - If `reset` fails, the connection is closed, removed from the tracked set
//   and the reset error is returned.
// - If the connection is tracked by this pool, it is appended to the idle
//   pool, one waiting client (if any) is woken, and an eviction event is
//   posted: low when the idle count now exceeds `min_idle_conns`, urgent
//   otherwise.
// - If the connection is not tracked, it is closed and the error
//   'Unmanaged connection' is returned.
pub fn (mut p ConnectionPool) put(conn &ConnectionPoolable) ! {
	if p.active_count.load() > 0 {
		// TODO: may need a atomic check here, compare_exchange?
		p.active_count.sub(1)
	}

	mut conn_ptr := unsafe { conn }
	// Handle closed pool case
	if p.is_closed.load() {
		conn_ptr.close()!
		return
	}

	// Reset connection to initial state
	conn_ptr.reset() or {
		conn_ptr.close() or {}
		p.all_conns_mutex.lock()
		p.all_conns.delete(conn)
		p.all_conns_mutex.unlock()
		return err
	}

	mut managed := false
	mut priority := EvictionPriority.urgent

	p.idle_pool_mutex.lock()
	p.all_conns_mutex.lock()
	// Return connection to idle pool
	if mut wrapper := p.all_conns[conn] {
		managed = true
		wrapper.last_used_at = time.utc()
		p.idle_pool << wrapper

		// Determine if eviction is needed.
		// Read lock is paired with runlock(); unlock() releases a write lock.
		p.config_mutex.rlock()
		low_eviction := p.idle_pool.len > p.config.min_idle_conns
		p.config_mutex.runlock()

		// Wake any waiting clients
		p.try_wakeup_waiters()

		if low_eviction {
			priority = .low
		}
	}
	p.all_conns_mutex.unlock()
	p.idle_pool_mutex.unlock()

	if !managed {
		// Handle unmanaged connection
		conn_ptr.close()!
		return error('Unmanaged connection')
	}

	// Trigger eviction only after both pool locks are released: the send
	// blocks when the channel is full, and the maintenance thread needs
	// idle_pool_mutex to drain it, so sending under the locks could deadlock.
	p.eviction_ch <- priority
}
427
// close shuts down the connection pool and cleans up resources.
//
// Steps, in order: mark the pool closed, stop and join the maintenance
// thread, close every tracked connection and empty both the tracked map and
// the idle pool, notify every client still in the wait queue, close the
// eviction channel, and reset all counters to zero.
// Calling close on an already closed pool is a no-op.
pub fn (mut p ConnectionPool) close() {
	if p.is_closed.load() {
		return
	}
	p.is_closed.store(true)

	// Signal background thread to stop
	p.stop_ch <- true
	p.cleanup_thread.wait()
	p.stop_ch.close()

	// Close all tracked connections
	p.idle_pool_mutex.lock()
	p.all_conns_mutex.lock()
	for _, mut wrapper in p.all_conns {
		wrapper.conn.close() or {}
	}
	p.all_conns.clear()
	p.idle_pool.clear()
	p.all_conns_mutex.unlock()
	p.idle_pool_mutex.unlock()

	// Notify all waiting clients while still holding the wait-queue lock.
	// A waiter that times out removes itself under this same lock and only
	// closes its channel afterwards, so notifying under the lock guarantees
	// we never push to an already closed channel. Each queued channel is
	// buffered and has not been signalled yet, so these sends do not block.
	p.wait_queue_mutex.lock()
	for ch in p.wait_queue {
		ch <- true
	}
	p.wait_queue.clear()
	p.wait_queue_mutex.unlock()

	p.eviction_ch.close()

	// Reset all counters
	p.active_count.store(0)
	p.creation_errors.store(0)
	p.evicted_count.store(0)
	p.creating_count.store(0)
}
469
// background_maintenance handles periodic connection cleanup.
//
// It runs until a value arrives on `stop_ch`. Each loop round waits for one of:
//   - a stop signal (returns),
//   - an eviction event, which adds a priority-dependent weight to
//     `event_count`, shortens the wake-up interval, and prunes immediately
//     when the event is urgent, when enough time has passed since the first
//     pending event for its priority, or when the accumulated weight reaches 1000,
//   - a timeout of `interval`, which prunes if events are pending or if the
//     timeout was the event-driven `min_interval`.
// After every prune the event bookkeeping is reset.
fn (mut p ConnectionPool) background_maintenance() {
	mut first_trigger_time := u64(0)
	mut event_count := 0
	mut min_interval := time.infinite // Dynamic processing interval

	for {
		// Calculate adaptive processing interval: a tenth of idle_timeout, at most one second.
		// Read lock is paired with runlock(); unlock() releases a write lock.
		p.config_mutex.rlock()
		dynamic_interval := if p.config.idle_timeout / 10 > time.second {
			time.second
		} else {
			p.config.idle_timeout / 10
		}
		p.config_mutex.runlock()

		// Use whichever is shorter: the event-driven or the periodic interval
		interval := if min_interval < dynamic_interval {
			min_interval
		} else {
			dynamic_interval
		}

		select {
			_ := <-p.stop_ch {
				// Termination signal received
				return
			}
			priority := <-p.eviction_ch {
				// Process event based on priority
				match priority {
					.low {
						event_count++
						min_interval = 100 * time.millisecond
					}
					.medium {
						event_count += 10
						min_interval = 10 * time.millisecond
					}
					.high {
						event_count += 50
						min_interval = 1 * time.millisecond
					}
					.urgent {
						event_count += 1000
						min_interval = 100 * time.microsecond
					}
				}

				// Track first event time
				if first_trigger_time == 0 {
					first_trigger_time = time.sys_mono_now()
				}

				elapsed := time.sys_mono_now() - first_trigger_time

				// Determine if immediate processing is needed
				if priority == .urgent
					|| (priority == .high && elapsed > 100 * time.microsecond)
					|| (priority == .medium && elapsed > 1 * time.millisecond)
					|| (priority == .low && elapsed > 10 * time.millisecond)
					|| event_count >= 1000 {
					p.prune_connections()
					event_count = 0
					first_trigger_time = 0
					min_interval = time.infinite
				}
			}
			i64(interval) {
				// Periodic maintenance
				if event_count > 0 || interval == min_interval {
					p.prune_connections()
					event_count = 0
					first_trigger_time = 0
					min_interval = time.infinite
				}
			}
		}
	}
}
549
// prune_connections removes invalid connections and maintains min idle count.
//
// Phase 1 (under `idle_pool_mutex`): every idle connection that exceeded
// `max_lifetime`, sat idle longer than `idle_timeout`, or fails validation is
// removed from the pool, closed, and counted in `evicted_count`.
// Phase 2 (no pool lock held): enough connections are created to bring the
// idle count back up to `min_idle_conns`; creation failures are skipped.
// Phase 3 (under both pool locks): as many of the new connections as are
// still needed and fit under `max_conns` are added as idle; the rest are
// closed. One waiting client is woken if anything was added.
// If the pool was closed during phase 2, all new connections are closed.
fn (mut p ConnectionPool) prune_connections() {
	// Get current configuration parameters.
	// Read locks are paired with runlock(); unlock() releases a write lock.
	p.config_mutex.rlock()
	max_lifetime := p.config.max_lifetime
	idle_timeout := p.config.idle_timeout
	min_idle := p.config.min_idle_conns
	p.config_mutex.runlock()

	p.idle_pool_mutex.lock()
	// Remove stale connections; iterate backwards so deletions do not shift unvisited entries
	for i := p.idle_pool.len - 1; i >= 0; i-- {
		mut wrapper := p.idle_pool[i]
		age := time.utc() - wrapper.created_at
		idle_time := time.utc() - wrapper.last_used_at

		if age > max_lifetime || idle_time > idle_timeout || !wrapper.conn.validate() or { false } {
			p.all_conns_mutex.lock()
			p.all_conns.delete(wrapper.conn)
			p.all_conns_mutex.unlock()
			wrapper.conn.close() or {}
			p.idle_pool.delete(i)
			p.evicted_count.add(1)
		} else {
			wrapper.last_valid_at = time.utc()
		}
	}
	current_idle := p.idle_pool.len
	p.idle_pool_mutex.unlock()

	// Calculate connections to create
	to_create := if min_idle > current_idle { min_idle - current_idle } else { 0 }

	// Create needed connections
	mut new_conns := []&ConnectionPoolable{}
	if to_create > 0 {
		for _ in 0 .. to_create {
			if new_conn := p.create_conn_with_retry() {
				new_conns << new_conn
			}
		}
	}

	// Check if pool was closed during creation
	if p.is_closed.load() {
		for mut new_conn in new_conns {
			new_conn.close() or {}
		}
		return
	}

	// Re-read the configuration: it may have changed while connections were being created
	p.config_mutex.rlock()
	current_min_idle := p.config.min_idle_conns
	max_conns := p.config.max_conns
	p.config_mutex.runlock()

	// Add new connections to the pool
	p.idle_pool_mutex.lock()
	p.all_conns_mutex.lock()
	defer {
		p.all_conns_mutex.unlock()
		p.idle_pool_mutex.unlock()
	}

	actual_needed := if current_min_idle > p.idle_pool.len {
		current_min_idle - p.idle_pool.len
	} else {
		0
	}
	// The pool can hold more connections than max_conns after the limit was
	// lowered; clamp to zero so the counts below never go negative (a negative
	// count would make the "close extras" loop index new_conns out of range).
	mut available_slots := max_conns - p.all_conns.len
	if available_slots < 0 {
		available_slots = 0
	}
	mut actual_to_add := if actual_needed > new_conns.len { new_conns.len } else { actual_needed }
	actual_to_add = if actual_to_add > available_slots { available_slots } else { actual_to_add }

	// Create wrapper for each new connection
	for i in 0 .. actual_to_add {
		now := time.utc()
		wrapper := &ConnectionWrapper{
			conn:          new_conns[i]
			created_at:    now
			last_used_at:  now
			last_valid_at: now
		}
		p.idle_pool << wrapper
		p.all_conns[new_conns[i]] = wrapper
	}

	// Close any extra connections
	for i in actual_to_add .. new_conns.len {
		new_conns[i].close() or {}
	}

	// Wake clients if connections were added
	if actual_to_add > 0 {
		p.try_wakeup_waiters()
	}
}
646
// check_config validates a pool configuration.
// The rules are evaluated in a fixed order and the first violated rule
// determines the returned error; a valid configuration returns nothing.
fn check_config(config ConnectionPoolConfig) ! {
	// Pick the message of the first rule that is broken ('' means all rules hold)
	problem := if config.max_conns <= 0 {
		'max_conns must be positive'
	} else if config.min_idle_conns < 0 {
		'min_idle_conns cannot be negative'
	} else if config.min_idle_conns > config.max_conns {
		'min_idle_conns cannot exceed max_conns'
	} else if config.max_lifetime < 0 {
		'max_lifetime cannot be negative'
	} else if config.idle_timeout < 0 {
		'idle_timeout cannot be negative'
	} else if config.idle_timeout > config.max_lifetime {
		'idle_timeout cannot exceed max_lifetime'
	} else if config.get_timeout < 0 {
		'get_timeout cannot be negative'
	} else if config.retry_base_delay < 0 {
		'retry_base_delay cannot be negative'
	} else if config.max_retry_delay < 0 {
		'max_retry_delay cannot be negative'
	} else if config.max_retry_attempts < 0 {
		'max_retry_attempts cannot be negative'
	} else {
		''
	}

	if problem != '' {
		return error(problem)
	}
}
679
// update_config replaces the pool configuration with `config`.
// The new settings are validated first, then the pool must still be open;
// either failure is returned as an error and leaves the pool untouched.
// After the swap a high-priority eviction event asks the maintenance thread
// to re-evaluate the pool.
pub fn (mut p ConnectionPool) update_config(config ConnectionPoolConfig) ! {
	// Reject invalid settings
	check_config(config) or { return err }

	// A closed pool cannot be reconfigured
	if p.is_closed.load() {
		return error('Connection pool is closed')
	}

	// Swap in the new settings under the write lock
	p.config_mutex.lock()
	p.config = config
	p.config_mutex.unlock()

	// Ask for maintenance
	p.send_eviction(.high)
}
697
// signal_recovery_event tells the pool that a recovery event occurred by
// posting an urgent eviction event for the maintenance thread.
pub fn (mut p ConnectionPool) signal_recovery_event() {
	p.send_eviction(.urgent)
}
702
// send_eviction triggers a cleanup event by pushing `priority` onto the
// pool's eviction channel, which the background maintenance thread reads.
pub fn (mut p ConnectionPool) send_eviction(priority EvictionPriority) {
	p.eviction_ch <- priority
}
707
// ConnectionPoolStats holds statistics about the pool.
// It is a read-only snapshot produced by `ConnectionPool.stats`.
pub struct ConnectionPoolStats {
pub:
	total_conns     int       // All managed connections (size of the tracked-connections map)
	active_conns    int       // Currently checked-out connections
	idle_conns      int       // Available connections (size of the idle pool)
	waiting_clients int       // Clients waiting for a connection (length of the wait queue)
	evicted_count   int       // Connections forcibly removed
	creation_errors int       // Failed creation attempts
	created_at      time.Time // When pool was created
	creating_count  int       // Connections being created
}
720
// stats retrieves current connection pool statistics.
// The idle pool, tracked-connections map and wait queue are read under
// their read locks; the counters are read from their atomics.
pub fn (mut p ConnectionPool) stats() ConnectionPoolStats {
	p.idle_pool_mutex.rlock()
	p.all_conns_mutex.rlock()
	p.wait_queue_mutex.rlock()
	defer {
		// Read locks are released with runlock() (unlock() releases a write
		// lock), in reverse acquisition order
		p.wait_queue_mutex.runlock()
		p.all_conns_mutex.runlock()
		p.idle_pool_mutex.runlock()
	}

	return ConnectionPoolStats{
		total_conns:     p.all_conns.len
		active_conns:    p.active_count.load()
		idle_conns:      p.idle_pool.len
		waiting_clients: p.wait_queue.len
		evicted_count:   p.evicted_count.load()
		creation_errors: p.creation_errors.load()
		created_at:      p.created_at
		creating_count:  p.creating_count.load()
	}
}
743