Skip to content

Commit

Permalink
ready_queue: improve shutdown latency wrt. slow connections
Browse files Browse the repository at this point in the history
This commit allows a shutdown signal to interrupt a connection attempt.
Previously we'd be at the mercy of the internal connection timeout for
this; the shutdown could interrupt a connected-but-idle session, but not
a connecting-in-progress one.
  • Loading branch information
wez committed Aug 27, 2024
1 parent 9a56413 commit 8972ada
Showing 1 changed file with 21 additions and 12 deletions.
33 changes: 21 additions & 12 deletions crates/kumod/src/ready_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,6 @@ pub struct Dispatcher {
pub path_config: ConfigHandle<EgressPathConfig>,
pub mx: Option<Arc<MailExchanger>>,
pub metrics: DeliveryMetrics,
pub shutting_down: ShutdownSubcription,
pub activity: Activity,
pub egress_source: EgressSource,
pub egress_pool: String,
Expand Down Expand Up @@ -861,7 +860,6 @@ impl Dispatcher {
msg: None,
path_config,
metrics,
shutting_down: ShutdownSubcription::get(),
activity,
egress_source,
egress_pool,
Expand Down Expand Up @@ -907,24 +905,34 @@ impl Dispatcher {

let mut connection_failures = vec![];
let mut num_opportunistic_tls_failures = 0;
let mut shutting_down = ShutdownSubcription::get();

loop {
if !dispatcher.wait_for_message(&mut *queue_dispatcher).await? {
if !dispatcher
.wait_for_message(&mut *queue_dispatcher, &mut shutting_down)
.await?
{
// No more messages within our idle time; we can close
// the connection
tracing::debug!("{} Idling out connection", dispatcher.name);
dispatcher.lease.release().await;
queue_dispatcher.close_connection(&mut dispatcher).await?;
return Ok(());
}
if dispatcher.activity.is_shutting_down() {
tracing::debug!("{} shutting down", dispatcher.name);
dispatcher.lease.release().await;
queue_dispatcher.close_connection(&mut dispatcher).await?;
return Ok(());
}

if let Err(err) = queue_dispatcher.attempt_connection(&mut dispatcher).await {
let result = tokio::select! {
_ = shutting_down.shutting_down() => {
tracing::debug!("{} shutting down", dispatcher.name);
dispatcher.lease.release().await;
queue_dispatcher.close_connection(&mut dispatcher).await?;
return Ok(());
}
result = queue_dispatcher.attempt_connection(&mut dispatcher) => {
result
}
};

if let Err(err) = result {
if OpportunisticInsecureTlsHandshakeError::is_match_anyhow(&err) {
num_opportunistic_tls_failures += 1;
}
Expand Down Expand Up @@ -1344,10 +1352,11 @@ impl Dispatcher {
self.suspended.as_ref().cloned()
}

#[instrument(skip(self))]
#[instrument(skip(self, shutting_down))]
async fn wait_for_message(
&mut self,
queue_dispatcher: &mut dyn QueueDispatcher,
shutting_down: &mut ShutdownSubcription,
) -> anyhow::Result<bool> {
if self.activity.is_shutting_down() {
if let Some(msg) = self.msg.take() {
Expand Down Expand Up @@ -1442,7 +1451,7 @@ impl Dispatcher {
}
}
}
_ = self.shutting_down.shutting_down() => {
_ = shutting_down.shutting_down() => {
return Ok(false);
}
};
Expand Down

0 comments on commit 8972ada

Please sign in to comment.