From d0716cd3aaa3243d107e704182b79fe746b3876e Mon Sep 17 00:00:00 2001 From: Dotan Simha Date: Thu, 25 Sep 2025 13:56:02 +0300 Subject: [PATCH 1/2] feat(router): added a simple wrapper around background tasks --- Cargo.lock | 1 + bin/router/Cargo.toml | 1 + bin/router/src/background_tasks/mod.rs | 52 ++++++++++++++++++++++++++ bin/router/src/lib.rs | 13 ++++++- 4 files changed, 65 insertions(+), 2 deletions(-) create mode 100644 bin/router/src/background_tasks/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 7c7c8638..fc10e90f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1264,6 +1264,7 @@ dependencies = [ "sonic-rs", "thiserror 2.0.16", "tokio", + "tokio-util", "tracing", "tracing-subscriber", "tracing-tree", diff --git a/bin/router/Cargo.toml b/bin/router/Cargo.toml index f9e8c5b0..9c90eefb 100644 --- a/bin/router/Cargo.toml +++ b/bin/router/Cargo.toml @@ -42,3 +42,4 @@ mimalloc = { version = "0.1.47", features = ["override"] } moka = { version = "0.12.10", features = ["future"] } ulid = "1.2.1" ntex = { version = "2", features = ["tokio"] } +tokio-util = "0.7.16" diff --git a/bin/router/src/background_tasks/mod.rs b/bin/router/src/background_tasks/mod.rs new file mode 100644 index 00000000..bd80de84 --- /dev/null +++ b/bin/router/src/background_tasks/mod.rs @@ -0,0 +1,52 @@ +use async_trait::async_trait; +use tokio_util::sync::CancellationToken; + +#[async_trait] +pub trait BackgroundTask: Send + Sync { + fn id(&self) -> &str; + async fn run(&self, token: CancellationToken); +} + +use futures::future::join_all; +use std::sync::Arc; +use tokio::task::JoinHandle; +use tracing::info; + +pub struct BackgroundTasksManager { + cancellation_token: CancellationToken, + task_handles: Vec>, +} + +impl BackgroundTasksManager { + pub fn new() -> Self { + Self { + cancellation_token: CancellationToken::new(), + task_handles: Vec::new(), + } + } + + pub fn register_task(&mut self, task: T) + where + T: BackgroundTask + 'static, + { + info!("Registering background task: {}", task.id()); + let child_token = self.cancellation_token.clone(); + let task_arc = Arc::new(task); + + let handle = tokio::spawn(async move { + task_arc.run(child_token).await; + }); + + self.task_handles.push(handle); + } + + pub async fn shutdown(self) { + info!("shutdown triggered, stopping all background tasks..."); + self.cancellation_token.cancel(); + + info!("waiting for background tasks to finish..."); + join_all(self.task_handles).await; + + println!("all background tasks have been shut down gracefully."); + } +} diff --git a/bin/router/src/lib.rs b/bin/router/src/lib.rs index fc484629..1af20c50 100644 --- a/bin/router/src/lib.rs +++ b/bin/router/src/lib.rs @@ -1,3 +1,4 @@ +mod background_tasks; mod http_utils; mod logger; mod pipeline; @@ -6,6 +7,7 @@ mod shared_state; use std::sync::Arc; use crate::{ + background_tasks::BackgroundTasksManager, http_utils::{health::health_check_handler, landing_page::landing_page_handler}, logger::configure_logging, pipeline::graphql_request_handler, @@ -19,6 +21,7 @@ use ntex::{ }; use hive_router_query_planner::utils::parsing::parse_schema; +use tracing::info; async fn graphql_endpoint_handler( mut request: HttpRequest, @@ -37,8 +40,9 @@ pub async fn router_entrypoint() -> Result<(), Box> { let parsed_schema = parse_schema(&supergraph_sdl); let addr = router_config.http.address(); let shared_state = RouterSharedState::new(parsed_schema, router_config); + let mut bg_tasks_manager = BackgroundTasksManager::new(); - web::HttpServer::new(move || { + let maybe_error = web::HttpServer::new(move || { web::App::new() .state(shared_state.clone()) .route("/graphql", web::to(graphql_endpoint_handler)) @@ -48,5 +52,10 @@ pub async fn router_entrypoint() -> Result<(), Box> { .bind(addr)? .run() .await - .map_err(|err| err.into()) + .map_err(|err| err.into()); + + info!("server stopped, clearning background tasks"); + bg_tasks_manager.shutdown().await; + + maybe_error } From fc8bd9dbd86748f26dcecfcf3d7f056886e2fa1c Mon Sep 17 00:00:00 2001 From: Dotan Simha Date: Thu, 25 Sep 2025 13:57:50 +0300 Subject: [PATCH 2/2] clean it up a bit --- bin/router/src/background_tasks/mod.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/bin/router/src/background_tasks/mod.rs b/bin/router/src/background_tasks/mod.rs index bd80de84..98e5f237 100644 --- a/bin/router/src/background_tasks/mod.rs +++ b/bin/router/src/background_tasks/mod.rs @@ -1,5 +1,9 @@ use async_trait::async_trait; +use futures::future::join_all; +use std::sync::Arc; +use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; +use tracing::{debug, info}; #[async_trait] pub trait BackgroundTask: Send + Sync { @@ -7,11 +11,6 @@ pub trait BackgroundTask: Send + Sync { async fn run(&self, token: CancellationToken); } -use futures::future::join_all; -use std::sync::Arc; -use tokio::task::JoinHandle; -use tracing::info; - pub struct BackgroundTasksManager { cancellation_token: CancellationToken, task_handles: Vec>, @@ -29,7 +28,7 @@ impl BackgroundTasksManager { where T: BackgroundTask + 'static, { - info!("Registering background task: {}", task.id()); + info!("registering background task: {}", task.id()); let child_token = self.cancellation_token.clone(); let task_arc = Arc::new(task); @@ -44,7 +43,7 @@ impl BackgroundTasksManager { info!("shutdown triggered, stopping all background tasks..."); self.cancellation_token.cancel(); - info!("waiting for background tasks to finish..."); + debug!("waiting for background tasks to finish..."); join_all(self.task_handles).await; println!("all background tasks have been shut down gracefully.");