Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions src/conn/pool/futures/get_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.

use futures_core::ready;
use std::sync::atomic::Ordering;
use std::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
};

use futures_core::ready;
#[cfg(feature = "tracing")]
use {
std::sync::Arc,
Expand Down Expand Up @@ -90,6 +90,19 @@ impl GetConn {
.take()
.expect("GetConn::poll polled after returning Async::Ready")
}

fn ready_connection(&mut self, mut c: Conn) -> Poll<<GetConn as Future>::Output> {
let pool = self.pool_take();
pool.inner
.metrics
.connections_in_use
.fetch_add(1, Ordering::Relaxed);

c.inner.pool = Some(pool);
c.inner.reset_upon_returning_to_a_pool = self.reset_upon_returning_to_a_pool;

Poll::Ready(Ok(c))
}
}

// this manual implementation of Future may seem stupid, but we sort
Expand Down Expand Up @@ -127,18 +140,13 @@ impl Future for GetConn {
}
GetConnInner::Connecting(ref mut f) => {
let result = ready!(Pin::new(f).poll(cx));
let pool = self.pool_take();

self.inner = GetConnInner::Done;

return match result {
Ok(mut c) => {
c.inner.pool = Some(pool);
c.inner.reset_upon_returning_to_a_pool =
self.reset_upon_returning_to_a_pool;
Poll::Ready(Ok(c))
}
Ok(c) => self.ready_connection(c),
Err(e) => {
let pool = self.pool_take();
pool.cancel_connection();
Poll::Ready(Err(e))
}
Expand All @@ -150,11 +158,7 @@ impl Future for GetConn {
Ok(mut c) => {
self.inner = GetConnInner::Done;

let pool = self.pool_take();
c.inner.pool = Some(pool);
c.inner.reset_upon_returning_to_a_pool =
self.reset_upon_returning_to_a_pool;
return Poll::Ready(Ok(c));
return self.ready_connection(c);
}
Err(_) => {
// Idling connection is broken. We'll drop it and try again.
Expand Down
8 changes: 5 additions & 3 deletions src/conn/pool/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ use serde::Serialize;
#[derive(Default, Debug, Serialize)]
#[non_exhaustive]
pub struct Metrics {
/// Guage of active connections to the database server, this includes both connections that have belong
/// Gauge of active connections to the database server, this includes both connections that have belong
/// to the pool, and connections currently owned by the application.
pub connection_count: AtomicUsize,
/// Guage of active connections that currently belong to the pool.
/// Gauge of active connections that currently belong to the pool.
pub connections_in_pool: AtomicUsize,
/// Guage of GetConn requests that are currently active.
/// Gauge of active connections that are currently in use by the application (and not in the pool).
pub connections_in_use: AtomicUsize,
/// Gauge of GetConn requests that are currently active.
pub active_wait_requests: AtomicUsize,
/// Counter of connections that failed to be created.
pub create_failed: AtomicUsize,
Expand Down
87 changes: 80 additions & 7 deletions src/conn/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ use futures_util::FutureExt;
use keyed_priority_queue::KeyedPriorityQueue;
use tokio::sync::mpsc;

use crate::{
conn::{pool::futures::*, Conn},
error::*,
opts::{Opts, PoolOpts},
queryable::transaction::{Transaction, TxOpts},
};
use std::sync::atomic::Ordering;
use std::{
borrow::Borrow,
cmp::Reverse,
Expand All @@ -21,13 +28,6 @@ use std::{
time::{Duration, Instant},
};

use crate::{
conn::{pool::futures::*, Conn},
error::*,
opts::{Opts, PoolOpts},
queryable::transaction::{Transaction, TxOpts},
};

pub use metrics::Metrics;

mod recycler;
Expand Down Expand Up @@ -274,6 +274,10 @@ impl Pool {
fn return_conn(&mut self, conn: Conn) {
// NOTE: we're not in async context here, so we can't block or return NotReady
// any and all cleanup work _has_ to be done in the spawned recycler
self.inner
.metrics
.connections_in_use
.fetch_sub(1, Ordering::Relaxed);
self.send_to_recycler(conn);
}

Expand Down Expand Up @@ -450,6 +454,10 @@ impl Drop for Conn {
if std::thread::panicking() {
// Try to decrease the number of existing connections.
if let Some(pool) = self.inner.pool.take() {
pool.inner
.metrics
.connections_in_use
.fetch_sub(1, Ordering::Relaxed);
pool.cancel_connection();
}

Expand Down Expand Up @@ -1244,6 +1252,71 @@ mod test {
Ok(())
}

#[tokio::test]
async fn connection_in_use_metric() {
let pool = pool_with_one_connection();
let metrics = pool.metrics();

let conn = pool.get_conn().await.unwrap();
assert_eq!(
metrics
.connections_in_use
.load(std::sync::atomic::Ordering::Relaxed),
1
);

drop(conn);
assert_eq!(
metrics
.connections_in_use
.load(std::sync::atomic::Ordering::Relaxed),
0
);
loop {
if metrics
.connections_in_pool
.load(std::sync::atomic::Ordering::Relaxed)
== 1
{
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}

// Ensure that we increase the connections in use metric when we get a connection from the pool
let conn = pool.get_conn().await.unwrap();
assert_eq!(
metrics
.connections_in_use
.load(std::sync::atomic::Ordering::Relaxed),
1
);
assert_eq!(
metrics
.connections_in_pool
.load(std::sync::atomic::Ordering::Relaxed),
0
);

drop(conn);
assert_eq!(
metrics
.connections_in_use
.load(std::sync::atomic::Ordering::Relaxed),
0
);
loop {
if metrics
.connections_in_pool
.load(std::sync::atomic::Ordering::Relaxed)
== 1
{
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}

#[cfg(feature = "nightly")]
mod bench {
use futures_util::future::{FutureExt, TryFutureExt};
Expand Down
Loading