Skip to content
Merged
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
251128.0
251201.0
34 changes: 4 additions & 30 deletions crates/db/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use {
anyhow::Context as _,
futures::{future::FusedFuture as _, FutureExt as _},
std::pin,
tokio::signal::unix::SignalKind,
futures::FutureExt,
wc::metrics::exporter_prometheus::PrometheusBuilder,
wcn_db::{config, Error},
wcn_rpc::server::run_with_signal_handling,
};

#[global_allocator]
Expand Down Expand Up @@ -38,32 +37,7 @@ fn main() -> anyhow::Result<()> {
.unwrap()
.block_on(async move {
let shutdown_signal = cfg.shutdown_signal.clone();
let mut shutdown_fut = pin::pin!(tokio::signal::ctrl_c().fuse());

let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())?;

let db_srv_fut = wcn_db::run(cfg)?;
let mut db_srv_fut = pin::pin!(db_srv_fut.fuse());

loop {
tokio::select! {
biased;

_ = &mut shutdown_fut, if !shutdown_fut.is_terminated() => {
shutdown_signal.emit();
}

_ = sigterm.recv() => {
shutdown_signal.emit();
}

_ = &mut db_srv_fut => {
tracing::info!("database server stopped");
break;
}
}
}

Ok(())
let db_src_fut = wcn_db::run(cfg)?.map(|_| tracing::info!("database server stopped"));
run_with_signal_handling(shutdown_signal, db_src_fut).await
})
}
35 changes: 6 additions & 29 deletions crates/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@ use {
anyhow::Context,
base64::Engine as _,
futures::FutureExt,
futures_concurrency::future::Race as _,
libp2p_identity::Keypair,
serde::Deserialize,
std::{
net::{Ipv4Addr, SocketAddrV4, TcpListener},
pin::pin,
time::Duration,
},
tokio::signal::unix::SignalKind,
wc::metrics::exporter_prometheus::{PrometheusBuilder, PrometheusHandle},
wcn_cluster::smart_contract,
wcn_node::Config,
wcn_rpc::server::ShutdownSignal,
wcn_rpc::server::{run_with_signal_handling, ShutdownSignal},
};

#[global_allocator]
Expand Down Expand Up @@ -74,32 +71,12 @@ fn main() -> anyhow::Result<()> {
.build()
.unwrap()
.block_on(async move {
enum FutureOutput {
CtrlC,
SigTerm,
Node,
}

let shutdown_signal = cfg.shutdown_signal.clone();

let mut shutdown_fut =
pin!(tokio::signal::ctrl_c().map(|_| FutureOutput::CtrlC).fuse());

let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())?;
let mut sigterm_fut = pin!(sigterm.recv().map(|_| FutureOutput::SigTerm).fuse());

let mut node_fut = pin!(wcn_node::run(cfg).await?.map(|_| FutureOutput::Node));

loop {
match (&mut shutdown_fut, &mut sigterm_fut, &mut node_fut)
.race()
.await
{
FutureOutput::CtrlC => shutdown_signal.emit(),
FutureOutput::SigTerm => shutdown_signal.emit(),
FutureOutput::Node => return Ok(()),
}
}
let run_fut = wcn_node::run(cfg)
.await?
.map(|_| tracing::info!("node stopped"));
run_with_signal_handling::<_, anyhow::Error>(shutdown_signal, run_fut).await?;
Ok(())
})
}

Expand Down
38 changes: 38 additions & 0 deletions crates/rpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use {
tap::{Pipe as _, TapFallible as _},
tokio::{
io::{AsyncReadExt, AsyncWriteExt},
signal::unix::SignalKind,
sync::{OwnedSemaphorePermit, Semaphore},
},
tokio_serde::Deserializer as _,
Expand Down Expand Up @@ -252,6 +253,43 @@ impl ShutdownSignal {
}
}

/// Runs `f` while listening for termination signals. If any terminal signal
/// is sent then emits `snutdown_signal`.
///
/// Note: this function does not care whether `f` and `shutdown_signal` are
/// connected or not. The caller must ensure they are.
pub async fn run_with_signal_handling<F, E: From<io::Error>>(
shutdown_signal: ShutdownSignal,
f: F,
) -> Result<(), E>
where
F: futures::Future<Output = ()>,
{
enum FutureOutput {
CtrlC,
SigTerm,
Node,
}

let mut shutdown_fut = pin!(tokio::signal::ctrl_c().map(|_| FutureOutput::CtrlC).fuse());

let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())?;
let mut sigterm_fut = pin!(sigterm.recv().map(|_| FutureOutput::SigTerm).fuse());

let mut f_mapped = pin!(f.map(|_| FutureOutput::Node));

loop {
match (&mut shutdown_fut, &mut sigterm_fut, &mut f_mapped)
.race()
.await
{
FutureOutput::CtrlC => shutdown_signal.emit(),
FutureOutput::SigTerm => shutdown_signal.emit(),
FutureOutput::Node => return Ok(()),
}
}
}

mod sealed {
use super::*;

Expand Down