diff --git a/src/conn/pool/recycler.rs b/src/conn/pool/recycler.rs index 2809dc0..0b2884e 100644 --- a/src/conn/pool/recycler.rs +++ b/src/conn/pool/recycler.rs @@ -54,6 +54,69 @@ impl Recycler { eof: false, } } + + fn conn_return(&mut self, conn: Conn, pool_is_closed: bool) { + let mut exchange = self.inner.exchange.lock().unwrap(); + if pool_is_closed || exchange.available.len() >= self.pool_opts.active_bound() { + drop(exchange); + self.inner + .metrics + .discarded_superfluous_connection + .fetch_add(1, Ordering::Relaxed); + self.discard.push(conn.close_conn().boxed()); + } else { + self.inner + .metrics + .connection_returned_to_pool + .fetch_add(1, Ordering::Relaxed); + #[cfg(feature = "hdrhistogram")] + self.inner + .metrics + .connection_active_duration + .lock() + .unwrap() + .saturating_record(conn.inner.active_since.elapsed().as_micros() as u64); + exchange.available.push_back(conn.into()); + self.inner + .metrics + .connections_in_pool + .store(exchange.available.len(), Ordering::Relaxed); + if let Some(w) = exchange.waiting.pop() { + w.wake(); + } + } + } + + fn conn_decision(&mut self, conn: Conn, close: bool) { + if conn.inner.stream.is_none() || conn.inner.disconnected { + // drop unestablished connection + self.inner + .metrics + .discarded_unestablished_connection + .fetch_add(1, Ordering::Relaxed); + self.discard.push(futures_util::future::ok(()).boxed()); + } else if conn.inner.tx_status != TxStatus::None || conn.has_pending_result() { + self.inner + .metrics + .dirty_connection_return + .fetch_add(1, Ordering::Relaxed); + self.cleaning.push(conn.cleanup_for_pool().boxed()); + } else if conn.expired() || close { + self.inner + .metrics + .discarded_expired_connection + .fetch_add(1, Ordering::Relaxed); + self.discard.push(conn.close_conn().boxed()); + } else if conn.inner.reset_upon_returning_to_a_pool { + self.inner + .metrics + .resetting_connection + .fetch_add(1, Ordering::Relaxed); + self.reset.push(conn.reset_for_pool().boxed()); + } else { + self.conn_return(conn, false); + } + } } impl Future for Recycler { @@ -62,87 +125,12 @@ impl Future for Recycler { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut close = self.inner.close.load(Ordering::Acquire); - macro_rules! conn_return { - ($self:ident, $conn:ident, $pool_is_closed: expr) => {{ - let mut exchange = $self.inner.exchange.lock().unwrap(); - if $pool_is_closed || exchange.available.len() >= $self.pool_opts.active_bound() { - drop(exchange); - $self - .inner - .metrics - .discarded_superfluous_connection - .fetch_add(1, Ordering::Relaxed); - $self.discard.push($conn.close_conn().boxed()); - } else { - $self - .inner - .metrics - .connection_returned_to_pool - .fetch_add(1, Ordering::Relaxed); - #[cfg(feature = "hdrhistogram")] - $self - .inner - .metrics - .connection_active_duration - .lock() - .unwrap() - .saturating_record($conn.inner.active_since.elapsed().as_micros() as u64); - exchange.available.push_back($conn.into()); - $self - .inner - .metrics - .connections_in_pool - .store(exchange.available.len(), Ordering::Relaxed); - if let Some(w) = exchange.waiting.pop() { - w.wake(); - } - } - }}; - } - - macro_rules! conn_decision { - ($self:ident, $conn:ident) => { - if $conn.inner.stream.is_none() || $conn.inner.disconnected { - // drop unestablished connection - $self - .inner - .metrics - .discarded_unestablished_connection - .fetch_add(1, Ordering::Relaxed); - $self.discard.push(futures_util::future::ok(()).boxed()); - } else if $conn.inner.tx_status != TxStatus::None || $conn.has_pending_result() { - $self - .inner - .metrics - .dirty_connection_return - .fetch_add(1, Ordering::Relaxed); - $self.cleaning.push($conn.cleanup_for_pool().boxed()); - } else if $conn.expired() || close { - $self - .inner - .metrics - .discarded_expired_connection - .fetch_add(1, Ordering::Relaxed); - $self.discard.push($conn.close_conn().boxed()); - } else if $conn.inner.reset_upon_returning_to_a_pool { - $self - .inner - .metrics - .resetting_connection - .fetch_add(1, Ordering::Relaxed); - $self.reset.push($conn.reset_for_pool().boxed()); - } else { - conn_return!($self, $conn, false); - } - }; - } - while !self.eof { // see if there are more connections for us to recycle match Pin::new(&mut self.dropped).poll_recv(cx) { Poll::Ready(Some(Some(conn))) => { assert!(conn.inner.pool.is_none()); - conn_decision!(self, conn); + self.conn_decision(conn, close); } Poll::Ready(Some(None)) => { // someone signaled us that it's exit time @@ -165,11 +153,14 @@ impl Future for Recycler { // if we've been asked to close, reclaim any idle connections if close || self.eof { - while let Some(IdlingConn { conn, .. }) = - self.inner.exchange.lock().unwrap().available.pop_front() - { + loop { + let Some(IdlingConn { conn, .. }) = + self.inner.exchange.lock().unwrap().available.pop_front() + else { + break; + }; assert!(conn.inner.pool.is_none()); - conn_decision!(self, conn); + self.conn_decision(conn, close); } } @@ -177,7 +168,7 @@ impl Future for Recycler { loop { match Pin::new(&mut self.cleaning).poll_next(cx) { Poll::Pending | Poll::Ready(None) => break, - Poll::Ready(Some(Ok(conn))) => conn_decision!(self, conn), + Poll::Ready(Some(Ok(conn))) => self.conn_decision(conn, close), Poll::Ready(Some(Err(e))) => { // an error occurred while cleaning a connection. // what do we do? replace it with a new connection? @@ -199,7 +190,7 @@ impl Future for Recycler { loop { match Pin::new(&mut self.reset).poll_next(cx) { Poll::Pending | Poll::Ready(None) => break, - Poll::Ready(Some(Ok(conn))) => conn_return!(self, conn, close), + Poll::Ready(Some(Ok(conn))) => self.conn_return(conn, close), Poll::Ready(Some(Err(e))) => { // an error during reset. // replace with a new connection