From 10f50c54f5ade19764fdaa57c7cec092eb0f856e Mon Sep 17 00:00:00 2001 From: Andras Gyori Date: Thu, 7 Aug 2025 14:05:11 +0200 Subject: [PATCH] Tie connection_in_pool metric to the in pool queue operations --- src/conn/pool/mod.rs | 56 +++++++++++++++++++++++++----- src/conn/pool/ttl_check_inerval.rs | 12 +++---- 2 files changed, 53 insertions(+), 15 deletions(-) diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index 53688d9..b2bb5ac 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -77,7 +77,7 @@ impl From for IdlingConn { #[derive(Debug)] struct Exchange { waiting: Waitlist, - available: VecDeque, + available: InPoolConnections, exist: usize, // only used to spawn the recycler the first time we're in async context recycler: Option<(mpsc::UnboundedReceiver>, PoolOpts)>, @@ -104,6 +104,47 @@ impl Exchange { } } +#[derive(Default, Debug)] +struct InPoolConnections { + connections: VecDeque, + metrics: Arc, +} + +impl InPoolConnections { + fn push_back(&mut self, conn: IdlingConn) { + self.metrics + .connections_in_pool + .fetch_add(1, atomic::Ordering::Relaxed); + self.connections.push_back(conn); + } + + fn pop_back(&mut self) -> Option { + let res = self.connections.pop_back(); + if res.is_some() { + self.metrics + .connections_in_pool + .fetch_sub(1, atomic::Ordering::Relaxed); + } + + res + } + + fn pop_front(&mut self) -> Option { + let res = self.connections.pop_front(); + if res.is_some() { + self.metrics + .connections_in_pool + .fetch_sub(1, atomic::Ordering::Relaxed); + } + + res + } + + fn len(&self) -> usize { + self.connections.len() + } +} + #[derive(Default, Debug)] struct Waitlist { queue: KeyedPriorityQueue, @@ -222,14 +263,18 @@ impl Pool { let opts = Opts::try_from(opts).unwrap(); let pool_opts = opts.pool_opts().clone(); let (tx, rx) = mpsc::unbounded_channel(); + let metrics = Arc::new(Metrics::default()); Pool { opts, inner: Arc::new(Inner { close: false.into(), closed: false.into(), - metrics: Arc::new(Metrics::default()), + metrics: metrics.clone(), exchange: Mutex::new(Exchange { - available: VecDeque::with_capacity(pool_opts.constraints().max()), + available: InPoolConnections { + connections: VecDeque::with_capacity(pool_opts.constraints().max()), + metrics, + }, waiting: Waitlist::default(), exist: 0, recycler: Some((rx, pool_opts)), @@ -383,11 +428,6 @@ impl Pool { } } - self.inner - .metrics - .connections_in_pool - .store(exchange.available.len(), atomic::Ordering::Relaxed); - // we didn't _immediately_ get one -- try to make one // we first try to just do a load so we don't do an unnecessary add then sub if exchange.exist < self.opts.pool_opts().constraints().max() { diff --git a/src/conn/pool/ttl_check_inerval.rs b/src/conn/pool/ttl_check_inerval.rs index 2686e61..7bcdd99 100644 --- a/src/conn/pool/ttl_check_inerval.rs +++ b/src/conn/pool/ttl_check_inerval.rs @@ -15,7 +15,7 @@ use std::{ sync::{atomic::Ordering, Arc}, }; -use super::Inner; +use super::{InPoolConnections, Inner}; use crate::PoolOpts; use futures_core::task::{Context, Poll}; use std::pin::Pin; @@ -53,8 +53,10 @@ impl TtlCheckInterval { .saturating_sub(self.pool_opts.constraints().min()); let mut to_be_dropped = Vec::<_>::with_capacity(exchange.available.len()); - let mut kept_available = - VecDeque::<_>::with_capacity(self.pool_opts.constraints().max()); + let mut kept_available = InPoolConnections { + connections: VecDeque::<_>::with_capacity(self.pool_opts.constraints().max()), + metrics: self.inner.metrics.clone(), + }; while let Some(conn) = exchange.available.pop_front() { if conn.expired() @@ -67,10 +69,6 @@ impl TtlCheckInterval { } } exchange.available = kept_available; - self.inner - .metrics - .connections_in_pool - .store(exchange.available.len(), Ordering::Relaxed); to_be_dropped };