From 87592fe4b8c638a957746150d340468df37555c8 Mon Sep 17 00:00:00 2001 From: Andras Gyori Date: Thu, 7 Aug 2025 13:39:51 +0200 Subject: [PATCH] Add connections in use metrics --- src/conn/pool/futures/get_conn.rs | 32 +++++++----- src/conn/pool/metrics.rs | 8 +-- src/conn/pool/mod.rs | 87 ++++++++++++++++++++++++++++--- 3 files changed, 103 insertions(+), 24 deletions(-) diff --git a/src/conn/pool/futures/get_conn.rs b/src/conn/pool/futures/get_conn.rs index b89f9bc6..3ef8fc18 100644 --- a/src/conn/pool/futures/get_conn.rs +++ b/src/conn/pool/futures/get_conn.rs @@ -6,14 +6,14 @@ // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. +use futures_core::ready; +use std::sync::atomic::Ordering; use std::{ fmt, future::Future, pin::Pin, task::{Context, Poll}, }; - -use futures_core::ready; #[cfg(feature = "tracing")] use { std::sync::Arc, @@ -90,6 +90,19 @@ impl GetConn { .take() .expect("GetConn::poll polled after returning Async::Ready") } + + fn ready_connection(&mut self, mut c: Conn) -> Poll<::Output> { + let pool = self.pool_take(); + pool.inner + .metrics + .connections_in_use + .fetch_add(1, Ordering::Relaxed); + + c.inner.pool = Some(pool); + c.inner.reset_upon_returning_to_a_pool = self.reset_upon_returning_to_a_pool; + + Poll::Ready(Ok(c)) + } } // this manual implementation of Future may seem stupid, but we sort @@ -127,18 +140,13 @@ impl Future for GetConn { } GetConnInner::Connecting(ref mut f) => { let result = ready!(Pin::new(f).poll(cx)); - let pool = self.pool_take(); self.inner = GetConnInner::Done; return match result { - Ok(mut c) => { - c.inner.pool = Some(pool); - c.inner.reset_upon_returning_to_a_pool = - self.reset_upon_returning_to_a_pool; - Poll::Ready(Ok(c)) - } + Ok(c) => self.ready_connection(c), Err(e) => { + let pool = self.pool_take(); pool.cancel_connection(); Poll::Ready(Err(e)) } @@ -150,11 +158,7 @@ impl Future for GetConn { Ok(mut c) => { self.inner = GetConnInner::Done; - let pool = self.pool_take(); - c.inner.pool = Some(pool); - c.inner.reset_upon_returning_to_a_pool = - self.reset_upon_returning_to_a_pool; - return Poll::Ready(Ok(c)); + return self.ready_connection(c); } Err(_) => { // Idling connection is broken. We'll drop it and try again. diff --git a/src/conn/pool/metrics.rs b/src/conn/pool/metrics.rs index c5c6a08a..84326e9e 100644 --- a/src/conn/pool/metrics.rs +++ b/src/conn/pool/metrics.rs @@ -5,12 +5,14 @@ use serde::Serialize; #[derive(Default, Debug, Serialize)] #[non_exhaustive] pub struct Metrics { - /// Guage of active connections to the database server, this includes both connections that have belong + /// Gauge of active connections to the database server, this includes both connections that have belong /// to the pool, and connections currently owned by the application. pub connection_count: AtomicUsize, - /// Guage of active connections that currently belong to the pool. + /// Gauge of active connections that currently belong to the pool. pub connections_in_pool: AtomicUsize, - /// Guage of GetConn requests that are currently active. + /// Gauge of active connections that are currently in use by the application (and not in the pool). + pub connections_in_use: AtomicUsize, + /// Gauge of GetConn requests that are currently active. pub active_wait_requests: AtomicUsize, /// Counter of connections that failed to be created. pub create_failed: AtomicUsize, diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index 53688d9b..c6320502 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -10,6 +10,13 @@ use futures_util::FutureExt; use keyed_priority_queue::KeyedPriorityQueue; use tokio::sync::mpsc; +use crate::{ + conn::{pool::futures::*, Conn}, + error::*, + opts::{Opts, PoolOpts}, + queryable::transaction::{Transaction, TxOpts}, +}; +use std::sync::atomic::Ordering; use std::{ borrow::Borrow, cmp::Reverse, @@ -21,13 +28,6 @@ use std::{ time::{Duration, Instant}, }; -use crate::{ - conn::{pool::futures::*, Conn}, - error::*, - opts::{Opts, PoolOpts}, - queryable::transaction::{Transaction, TxOpts}, -}; - pub use metrics::Metrics; mod recycler; @@ -274,6 +274,10 @@ impl Pool { fn return_conn(&mut self, conn: Conn) { // NOTE: we're not in async context here, so we can't block or return NotReady // any and all cleanup work _has_ to be done in the spawned recycler + self.inner + .metrics + .connections_in_use + .fetch_sub(1, Ordering::Relaxed); self.send_to_recycler(conn); } @@ -450,6 +454,10 @@ impl Drop for Conn { if std::thread::panicking() { // Try to decrease the number of existing connections. if let Some(pool) = self.inner.pool.take() { + pool.inner + .metrics + .connections_in_use + .fetch_sub(1, Ordering::Relaxed); pool.cancel_connection(); } @@ -1244,6 +1252,71 @@ mod test { Ok(()) } + #[tokio::test] + async fn connection_in_use_metric() { + let pool = pool_with_one_connection(); + let metrics = pool.metrics(); + + let conn = pool.get_conn().await.unwrap(); + assert_eq!( + metrics + .connections_in_use + .load(std::sync::atomic::Ordering::Relaxed), + 1 + ); + + drop(conn); + assert_eq!( + metrics + .connections_in_use + .load(std::sync::atomic::Ordering::Relaxed), + 0 + ); + loop { + if metrics + .connections_in_pool + .load(std::sync::atomic::Ordering::Relaxed) + == 1 + { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + + // Ensure that we increase the connections in use metric when we get a connection from the pool + let conn = pool.get_conn().await.unwrap(); + assert_eq!( + metrics + .connections_in_use + .load(std::sync::atomic::Ordering::Relaxed), + 1 + ); + assert_eq!( + metrics + .connections_in_pool + .load(std::sync::atomic::Ordering::Relaxed), + 0 + ); + + drop(conn); + assert_eq!( + metrics + .connections_in_use + .load(std::sync::atomic::Ordering::Relaxed), + 0 + ); + loop { + if metrics + .connections_in_pool + .load(std::sync::atomic::Ordering::Relaxed) + == 1 + { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + #[cfg(feature = "nightly")] mod bench { use futures_util::future::{FutureExt, TryFutureExt};