diff --git a/crates/kumod/src/ready_queue.rs b/crates/kumod/src/ready_queue.rs index b2063ce0a..8f3bd0ace 100644 --- a/crates/kumod/src/ready_queue.rs +++ b/crates/kumod/src/ready_queue.rs @@ -785,7 +785,6 @@ pub struct Dispatcher { pub path_config: ConfigHandle, pub mx: Option>, pub metrics: DeliveryMetrics, - pub shutting_down: ShutdownSubcription, pub activity: Activity, pub egress_source: EgressSource, pub egress_pool: String, @@ -861,7 +860,6 @@ impl Dispatcher { msg: None, path_config, metrics, - shutting_down: ShutdownSubcription::get(), activity, egress_source, egress_pool, @@ -907,9 +905,13 @@ impl Dispatcher { let mut connection_failures = vec![]; let mut num_opportunistic_tls_failures = 0; + let mut shutting_down = ShutdownSubcription::get(); loop { - if !dispatcher.wait_for_message(&mut *queue_dispatcher).await? { + if !dispatcher + .wait_for_message(&mut *queue_dispatcher, &mut shutting_down) + .await? + { // No more messages within our idle time; we can close // the connection tracing::debug!("{} Idling out connection", dispatcher.name); @@ -917,14 +919,20 @@ impl Dispatcher { queue_dispatcher.close_connection(&mut dispatcher).await?; return Ok(()); } - if dispatcher.activity.is_shutting_down() { - tracing::debug!("{} shutting down", dispatcher.name); - dispatcher.lease.release().await; - queue_dispatcher.close_connection(&mut dispatcher).await?; - return Ok(()); - } - if let Err(err) = queue_dispatcher.attempt_connection(&mut dispatcher).await { + let result = tokio::select! { + _ = shutting_down.shutting_down() => { + tracing::debug!("{} shutting down", dispatcher.name); + dispatcher.lease.release().await; + queue_dispatcher.close_connection(&mut dispatcher).await?; + return Ok(()); + } + result = queue_dispatcher.attempt_connection(&mut dispatcher) => { + result + } + }; + + if let Err(err) = result { if OpportunisticInsecureTlsHandshakeError::is_match_anyhow(&err) { num_opportunistic_tls_failures += 1; } @@ -1344,10 +1352,11 @@ impl Dispatcher { self.suspended.as_ref().cloned() } - #[instrument(skip(self))] + #[instrument(skip(self, shutting_down))] async fn wait_for_message( &mut self, queue_dispatcher: &mut dyn QueueDispatcher, + shutting_down: &mut ShutdownSubcription, ) -> anyhow::Result { if self.activity.is_shutting_down() { if let Some(msg) = self.msg.take() { @@ -1442,7 +1451,7 @@ impl Dispatcher { } } } - _ = self.shutting_down.shutting_down() => { + _ = shutting_down.shutting_down() => { return Ok(false); } };