Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 73 additions & 82 deletions src/conn/pool/recycler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,69 @@ impl Recycler {
eof: false,
}
}

fn conn_return(&mut self, conn: Conn, pool_is_closed: bool) {
let mut exchange = self.inner.exchange.lock().unwrap();
if pool_is_closed || exchange.available.len() >= self.pool_opts.active_bound() {
drop(exchange);
self.inner
.metrics
.discarded_superfluous_connection
.fetch_add(1, Ordering::Relaxed);
self.discard.push(conn.close_conn().boxed());
} else {
self.inner
.metrics
.connection_returned_to_pool
.fetch_add(1, Ordering::Relaxed);
#[cfg(feature = "hdrhistogram")]
self.inner
.metrics
.connection_active_duration
.lock()
.unwrap()
.saturating_record(conn.inner.active_since.elapsed().as_micros() as u64);
exchange.available.push_back(conn.into());
self.inner
.metrics
.connections_in_pool
.store(exchange.available.len(), Ordering::Relaxed);
if let Some(w) = exchange.waiting.pop() {
w.wake();
}
}
}

fn conn_decision(&mut self, conn: Conn, close: bool) {
if conn.inner.stream.is_none() || conn.inner.disconnected {
// drop unestablished connection
self.inner
.metrics
.discarded_unestablished_connection
.fetch_add(1, Ordering::Relaxed);
self.discard.push(futures_util::future::ok(()).boxed());
} else if conn.inner.tx_status != TxStatus::None || conn.has_pending_result() {
self.inner
.metrics
.dirty_connection_return
.fetch_add(1, Ordering::Relaxed);
self.cleaning.push(conn.cleanup_for_pool().boxed());
} else if conn.expired() || close {
self.inner
.metrics
.discarded_expired_connection
.fetch_add(1, Ordering::Relaxed);
self.discard.push(conn.close_conn().boxed());
} else if conn.inner.reset_upon_returning_to_a_pool {
self.inner
.metrics
.resetting_connection
.fetch_add(1, Ordering::Relaxed);
self.reset.push(conn.reset_for_pool().boxed());
} else {
self.conn_return(conn, false);
}
}
}

impl Future for Recycler {
Expand All @@ -62,87 +125,12 @@ impl Future for Recycler {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut close = self.inner.close.load(Ordering::Acquire);

macro_rules! conn_return {
($self:ident, $conn:ident, $pool_is_closed: expr) => {{
let mut exchange = $self.inner.exchange.lock().unwrap();
if $pool_is_closed || exchange.available.len() >= $self.pool_opts.active_bound() {
drop(exchange);
$self
.inner
.metrics
.discarded_superfluous_connection
.fetch_add(1, Ordering::Relaxed);
$self.discard.push($conn.close_conn().boxed());
} else {
$self
.inner
.metrics
.connection_returned_to_pool
.fetch_add(1, Ordering::Relaxed);
#[cfg(feature = "hdrhistogram")]
$self
.inner
.metrics
.connection_active_duration
.lock()
.unwrap()
.saturating_record($conn.inner.active_since.elapsed().as_micros() as u64);
exchange.available.push_back($conn.into());
$self
.inner
.metrics
.connections_in_pool
.store(exchange.available.len(), Ordering::Relaxed);
if let Some(w) = exchange.waiting.pop() {
w.wake();
}
}
}};
}

macro_rules! conn_decision {
($self:ident, $conn:ident) => {
if $conn.inner.stream.is_none() || $conn.inner.disconnected {
// drop unestablished connection
$self
.inner
.metrics
.discarded_unestablished_connection
.fetch_add(1, Ordering::Relaxed);
$self.discard.push(futures_util::future::ok(()).boxed());
} else if $conn.inner.tx_status != TxStatus::None || $conn.has_pending_result() {
$self
.inner
.metrics
.dirty_connection_return
.fetch_add(1, Ordering::Relaxed);
$self.cleaning.push($conn.cleanup_for_pool().boxed());
} else if $conn.expired() || close {
$self
.inner
.metrics
.discarded_expired_connection
.fetch_add(1, Ordering::Relaxed);
$self.discard.push($conn.close_conn().boxed());
} else if $conn.inner.reset_upon_returning_to_a_pool {
$self
.inner
.metrics
.resetting_connection
.fetch_add(1, Ordering::Relaxed);
$self.reset.push($conn.reset_for_pool().boxed());
} else {
conn_return!($self, $conn, false);
}
};
}

while !self.eof {
// see if there are more connections for us to recycle
match Pin::new(&mut self.dropped).poll_recv(cx) {
Poll::Ready(Some(Some(conn))) => {
assert!(conn.inner.pool.is_none());
conn_decision!(self, conn);
self.conn_decision(conn, close);
}
Poll::Ready(Some(None)) => {
// someone signaled us that it's exit time
Expand All @@ -165,19 +153,22 @@ impl Future for Recycler {

// if we've been asked to close, reclaim any idle connections
if close || self.eof {
while let Some(IdlingConn { conn, .. }) =
self.inner.exchange.lock().unwrap().available.pop_front()
{
loop {
let Some(IdlingConn { conn, .. }) =
self.inner.exchange.lock().unwrap().available.pop_front()
else {
break;
};
assert!(conn.inner.pool.is_none());
conn_decision!(self, conn);
self.conn_decision(conn, close);
}
}

// are any dirty connections ready for us to reclaim?
loop {
match Pin::new(&mut self.cleaning).poll_next(cx) {
Poll::Pending | Poll::Ready(None) => break,
Poll::Ready(Some(Ok(conn))) => conn_decision!(self, conn),
Poll::Ready(Some(Ok(conn))) => self.conn_decision(conn, close),
Poll::Ready(Some(Err(e))) => {
// an error occurred while cleaning a connection.
// what do we do? replace it with a new connection?
Expand All @@ -199,7 +190,7 @@ impl Future for Recycler {
loop {
match Pin::new(&mut self.reset).poll_next(cx) {
Poll::Pending | Poll::Ready(None) => break,
Poll::Ready(Some(Ok(conn))) => conn_return!(self, conn, close),
Poll::Ready(Some(Ok(conn))) => self.conn_return(conn, close),
Poll::Ready(Some(Err(e))) => {
// an error during reset.
// replace with a new connection
Expand Down
Loading