From dc34f1b1aaf34c142ee99fa306610d6792e6df06 Mon Sep 17 00:00:00 2001
From: Marvel-Gu
Date: Fri, 12 Dec 2025 15:39:08 +0800
Subject: [PATCH 1/8] feat: proportional jitter

---
 Cargo.toml                      |  1 +
 migrations/20241105164503_2.sql |  5 +++--
 src/task/retry_policy.rs        | 26 +++++++++++++++++++++++++-
 3 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 6f0dee8..80bf270 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,6 +29,7 @@ ulid = { version = "1.1.3", features = ["uuid"] }
 uuid = { version = "1.10.0", features = ["v4", "serde"] }
 num_cpus = "1.16.0"
 tokio-util = "0.7.12"
+rand = "0.9.2"
 
 [dev-dependencies]
 futures = "0.3.30"
diff --git a/migrations/20241105164503_2.sql b/migrations/20241105164503_2.sql
index 750b178..f942fa8 100644
--- a/migrations/20241105164503_2.sql
+++ b/migrations/20241105164503_2.sql
@@ -18,12 +18,13 @@ create type underway.task_retry_policy as (
     max_attempts int,
     initial_interval_ms int,
     max_interval_ms int,
-    backoff_coefficient float
+    backoff_coefficient float,
+    jitter_factor float
 );
 
 alter table underway.task
 add column if not exists retry_policy underway.task_retry_policy not null
-default row(5, 1000, 60000, 2.0)::underway.task_retry_policy;
+default row(5, 1000, 60000, 2.0, 0.5)::underway.task_retry_policy;
 
 alter table underway.task
 add column if not exists completed_at timestamp with time zone;
diff --git a/src/task/retry_policy.rs b/src/task/retry_policy.rs
index cf1d342..ce6d679 100644
--- a/src/task/retry_policy.rs
+++ b/src/task/retry_policy.rs
@@ -1,4 +1,5 @@
 use jiff::{Span, ToSpan};
+use rand::Rng;
 
 /// Configuration of a policy for retries in case of task failure.
 ///
@@ -19,6 +20,7 @@ pub struct RetryPolicy {
     pub(crate) initial_interval_ms: i32,
     pub(crate) max_interval_ms: i32,
     pub(crate) backoff_coefficient: f64,
+    pub(crate) jitter_factor: f64,
 }
 
 pub(crate) type RetryCount = i32;
@@ -40,7 +42,8 @@ impl RetryPolicy {
     pub(crate) fn calculate_delay(&self, retry_count: RetryCount) -> Span {
         let base_delay = self.initial_interval_ms as f64;
         let backoff_delay = base_delay * self.backoff_coefficient.powi(retry_count - 1);
-        let delay = backoff_delay.min(self.max_interval_ms as f64) as i64;
+        let target_delay = backoff_delay.min(self.max_interval_ms as f64);
+        let delay=(target_delay * (1.0 - self.jitter_factor)+rand::thread_rng().gen_range(0.0..=(target_delay * self.jitter_factor))) as i64;
         delay.milliseconds()
     }
 }
@@ -50,6 +53,7 @@ const DEFAULT_RETRY_POLICY: RetryPolicy = RetryPolicy {
     initial_interval_ms: 1_000,
     max_interval_ms: 60_000,
     backoff_coefficient: 2.0,
+    jitter_factor: 0.5,
 };
 
 impl Default for RetryPolicy {
@@ -146,6 +150,26 @@ impl Builder {
         self
     }
 
+    /// The jitter_factor is a coefficient (typically between 0.0 and 1.0)
+    /// that determines what fraction of the backoff duration should
+    /// be randomized. It controls the balance between predictability
+    /// and collision avoidance.
+    ///
+    /// Default value is `0.5`.
+    ///
+    /// # Example
+    ///
+    /// ```rust
+    /// use underway::task::RetryPolicy;
+    ///
+    /// // Set a jitter factor of one half.
+    /// let retry_policy_builder = RetryPolicy::builder().jitter_factor(0.5);
+    /// ```
+    pub const fn jitter_factor(mut self, jitter_factor: f64) -> Self {
+        self.inner.jitter_factor=jitter_factor;
+        self
+    }
+
     /// Builds the `RetryPolicy` with the configured parameters.
/// /// # Example From cb19f9deb7f74e902fcc4ea6c2a8c42158050a21 Mon Sep 17 00:00:00 2001 From: Marvel-Gu Date: Fri, 12 Dec 2025 16:03:59 +0800 Subject: [PATCH 2/8] feat: reconnection use exponential backoff with jitter --- src/worker.rs | 192 ++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 154 insertions(+), 38 deletions(-) diff --git a/src/worker.rs b/src/worker.rs index 51201e0..e56e6d1 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -133,6 +133,7 @@ use sqlx::{ Acquire, PgConnection, }; use tokio::{sync::Semaphore, task::JoinSet}; +use tokio::time::sleep; use tokio_util::sync::CancellationToken; use tracing::instrument; @@ -176,6 +177,9 @@ pub struct Worker { // Limits the number of concurrent `Task::execute` invocations this worker will be allowed. concurrency_limit: usize, + // Policy for reconnection backoff when PostgreSQL connection is lost. + reconnection_policy: RetryPolicy, + // When this token is cancelled the queue has been shutdown. shutdown_token: CancellationToken, } @@ -186,6 +190,7 @@ impl Clone for Worker { queue: Arc::clone(&self.queue), task: Arc::clone(&self.task), concurrency_limit: self.concurrency_limit, + reconnection_policy: self.reconnection_policy, shutdown_token: self.shutdown_token.clone(), } } @@ -243,6 +248,7 @@ impl Worker { queue, task, concurrency_limit: num_cpus::get(), + reconnection_policy: RetryPolicy::default(), shutdown_token: CancellationToken::new(), } } @@ -344,6 +350,68 @@ impl Worker { self.shutdown_token = shutdown_token; } + /// Sets the reconnection policy for PostgreSQL connection failures. + /// + /// This policy controls how the worker retries connecting when the PostgreSQL + /// connection is lost. Uses exponential backoff to avoid overwhelming the database. + /// + /// Defaults to 1 second initial interval, 60 second max interval, 2.0 coefficient and 0.5 jitter_factor. + /// + /// **Note**: The `max_attempts` field is ignored for reconnection - the worker will + /// keep retrying until successful or until shutdown. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult, Queue, Worker}; + /// use underway::task::RetryPolicy; + /// + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # let task = ExampleTask; + /// # let mut worker = Worker::new(queue.into(), task); + /// # /* + /// let mut worker = { /* A `Worker`. */ }; + /// # */ + /// # + /// + /// // Set a custom reconnection policy using RetryPolicy. 
+ /// let policy = RetryPolicy::builder() + /// .initial_interval_ms(2_000) // 2 seconds + /// .max_interval_ms(60_000) // 1 minute + /// .backoff_coefficient(2.5) + /// .jitter_factor(0.5) + /// .build(); + /// worker.set_reconnection_policy(policy); + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` + pub fn set_reconnection_policy(&mut self, reconnection_policy: RetryPolicy) { + self.reconnection_policy = reconnection_policy; + } + /// Cancels the shutdown token and begins a graceful shutdown of in-progress /// tasks. /// @@ -505,58 +573,106 @@ impl Worker { #[instrument(skip(self), fields(queue.name = self.queue.name), err)] pub async fn run_every(&self, period: Span) -> Result { let mut polling_interval = tokio::time::interval(period.try_into()?); - - // Set up a listener for shutdown notifications - let mut shutdown_listener = PgListener::connect_with(&self.queue.pool).await?; let chan = shutdown_channel(); - shutdown_listener.listen(chan).await?; - - // Set up a listener for task change notifications - let mut task_change_listener = PgListener::connect_with(&self.queue.pool).await?; - task_change_listener.listen("task_change").await?; - let concurrency_limit = Arc::new(Semaphore::new(self.concurrency_limit)); let mut processing_tasks = JoinSet::new(); - loop { - tokio::select! { - notify_shutdown = shutdown_listener.recv() => { - match notify_shutdown { - Ok(_) => { - self.shutdown_token.cancel(); - }, - - Err(err) => { - tracing::error!(%err, "Postgres shutdown notification error"); - } - } + let mut retry_count: RetryCount = 1; + + // Outer loop: handle reconnection logic + 'reconnect: loop { + // Compute current reconnect backoff + let reconnect_backoff_span = self.reconnection_policy.calculate_delay(retry_count); + let reconnect_backoff: Duration = reconnect_backoff_span.try_into()?; + + // Try to establish shutdown listener connection + let mut shutdown_listener = match PgListener::connect_with(&self.queue.pool).await { + Ok(listener) => listener, + Err(err) => { + tracing::error!(%err, backoff_secs = reconnect_backoff.as_secs(), + attempt = retry_count, + "Failed to connect shutdown listener, retrying after backoff"); + sleep(reconnect_backoff).await; + retry_count = retry_count.saturating_add(1); + continue 'reconnect; } + }; + + if let Err(err) = shutdown_listener.listen(chan).await { + tracing::error!(%err, backoff_secs = reconnect_backoff.as_secs(), + attempt = retry_count, + "Failed to listen on shutdown channel, retrying after backoff"); + sleep(reconnect_backoff).await; + retry_count = retry_count.saturating_add(1); + continue 'reconnect; + } - _ = self.shutdown_token.cancelled() => { - self.handle_shutdown(&mut processing_tasks).await?; - break + // Try to establish task change listener connection + let mut task_change_listener = match PgListener::connect_with(&self.queue.pool).await { + Ok(listener) => listener, + Err(err) => { + tracing::error!(%err, backoff_secs = reconnect_backoff.as_secs(), + attempt = retry_count, + "Failed to connect task change listener, retrying after backoff"); + sleep(reconnect_backoff).await; + retry_count = retry_count.saturating_add(1); + continue 'reconnect; } + }; + + if let Err(err) = task_change_listener.listen("task_change").await { + tracing::error!(%err, backoff_secs = reconnect_backoff.as_secs(), + attempt = retry_count, + "Failed to listen on task_change channel, retrying after backoff"); + sleep(reconnect_backoff).await; + retry_count = retry_count.saturating_add(1); + continue 'reconnect; + } - // Listen for new pending 
tasks. - notify_task_change = task_change_listener.recv() => { - match notify_task_change { - Ok(task_change) => self.handle_task_change(task_change, concurrency_limit.clone(), &mut processing_tasks).await?, - - Err(err) => { - tracing::error!(%err, "Postgres task change notification error"); + // Connection succeeded, reset retry counter + tracing::info!("PostgreSQL listeners connected successfully"); + retry_count = 1; + + // Inner loop: handle normal events + loop { + tokio::select! { + notify_shutdown = shutdown_listener.recv() => { + match notify_shutdown { + Ok(_) => { + self.shutdown_token.cancel(); + }, + Err(err) => { + tracing::warn!(%err, "Shutdown listener connection lost, reconnecting"); + continue 'reconnect; + } } - }; + } - } + _ = self.shutdown_token.cancelled() => { + self.handle_shutdown(&mut processing_tasks).await?; + return Ok(()); + } + + // Listen for new pending tasks + notify_task_change = task_change_listener.recv() => { + match notify_task_change { + Ok(task_change) => { + self.handle_task_change(task_change, concurrency_limit.clone(), &mut processing_tasks).await?; + }, + Err(err) => { + tracing::warn!(%err, "Task change listener connection lost, reconnecting"); + continue 'reconnect; + } + } + } - // Pending task polling fallback. - _ = polling_interval.tick() => { - self.trigger_task_processing(concurrency_limit.clone(), &mut processing_tasks).await; + // Pending task polling fallback + _ = polling_interval.tick() => { + self.trigger_task_processing(concurrency_limit.clone(), &mut processing_tasks).await; + } } } } - - Ok(()) } async fn handle_shutdown(&self, processing_tasks: &mut JoinSet<()>) -> Result { From 598562b178bd42abed9bd3e057494a00900d69c1 Mon Sep 17 00:00:00 2001 From: Marvel-Gu Date: Tue, 16 Dec 2025 21:01:44 +0800 Subject: [PATCH 3/8] feat: reconnection use exponential backoff with jitter --- src/scheduler.rs | 199 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 161 insertions(+), 38 deletions(-) diff --git a/src/scheduler.rs b/src/scheduler.rs index fcaa7d4..e11d73f 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -3,12 +3,13 @@ use std::{result::Result as StdResult, str::FromStr, sync::Arc, time::Duration a use jiff::{tz::TimeZone, Zoned}; use jiff_cron::{Schedule, ScheduleIterator}; use sqlx::postgres::{PgAdvisoryLock, PgListener}; -use tokio::time::Instant; +use tokio::time::{sleep, Instant}; use tokio_util::sync::CancellationToken; use tracing::instrument; use crate::{ queue::{shutdown_channel, try_acquire_advisory_lock, Error as QueueError}, + task::{RetryCount, RetryPolicy}, Queue, Task, }; @@ -51,6 +52,9 @@ pub struct Scheduler { // When this token is cancelled the queue has been shutdown. shutdown_token: CancellationToken, + + // Policy for reconnection backoff when PostgreSQL connection is lost. + reconnection_policy: RetryPolicy, } impl Scheduler { @@ -107,6 +111,7 @@ impl Scheduler { queue_lock, task, shutdown_token: CancellationToken::new(), + reconnection_policy: RetryPolicy::default(), } } @@ -159,6 +164,68 @@ impl Scheduler { self.shutdown_token = shutdown_token; } + /// Sets the reconnection policy for PostgreSQL connection failures. + /// + /// This policy controls how the scheduler retries connecting when the PostgreSQL + /// connection is lost. Uses exponential backoff to avoid overwhelming the database. + /// + /// Defaults to 1 second initial interval, 60 second max interval, 2.0 coefficient and 0.5 jitter_factor. 
+ /// + /// **Note**: The `max_attempts` field is ignored for reconnection - the scheduler will + /// keep retrying until successful or until shutdown. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult, Queue, Scheduler}; + /// use underway::task::RetryPolicy; + /// + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # let task = ExampleTask; + /// # let mut scheduler = Scheduler::new(queue.into(), task); + /// # /* + /// let mut scheduler = { /* A `Scheduler`. */ }; + /// # */ + /// # + /// + /// // Set a custom reconnection policy using RetryPolicy. + /// let policy = RetryPolicy::builder() + /// .initial_interval_ms(2_000) // 2 seconds + /// .max_interval_ms(60_000) // 1 minute + /// .backoff_coefficient(2.5) + /// .jitter_factor(0.5) + /// .build(); + /// scheduler.set_reconnection_policy(policy); + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` + pub fn set_reconnection_policy(&mut self, reconnection_policy: RetryPolicy) { + self.reconnection_policy = reconnection_policy; + } + /// Cancels the shutdown token causing the scheduler to exit. /// /// ```rust,no_run @@ -258,49 +325,105 @@ impl Scheduler { /// ``` #[instrument(skip(self), fields(queue.name = self.queue.name), err)] pub async fn run(&self) -> Result { - let conn = self.queue.pool.acquire().await?; - let Some(guard) = try_acquire_advisory_lock(conn, &self.queue_lock).await? else { - tracing::trace!("Scheduler could not acquire lock, exiting"); - return Ok(()); - }; - - let Some((zoned_schedule, input)) = self.queue.task_schedule(&self.queue.pool).await? - else { - // No schedule configured, so we'll exit. - return Ok(()); - }; - - // Set up a listener for shutdown notifications - let mut shutdown_listener = PgListener::connect_with(&self.queue.pool).await?; - let chan = shutdown_channel(); - shutdown_listener.listen(chan).await?; - - // TODO: Handle updates to schedules? - - for next in zoned_schedule.iter() { - tracing::debug!(?next, "Waiting until next scheduled task enqueue"); - - tokio::select! { - notify_shutdown = shutdown_listener.recv() => { - match notify_shutdown { - Ok(_) => { - self.shutdown_token.cancel(); - }, - Err(err) => { - tracing::error!(%err, "Postgres shutdown notification error"); + let mut retry_count: RetryCount = 1; + + // Outer loop: handle reconnection logic for the scheduler's Postgres listener. 
+ 'reconnect: loop { + // Compute current reconnect backoff + let reconnect_backoff_span = self.reconnection_policy.calculate_delay(retry_count); + let reconnect_backoff: StdDuration = reconnect_backoff_span.try_into()?; + + let conn = match self.queue.pool.acquire().await { + Ok(conn) => conn, + Err(err) => { + tracing::error!( + %err, + backoff_secs = reconnect_backoff.as_secs(), + attempt = retry_count, + "Failed to acquire database connection for scheduler, retrying after backoff" + ); + sleep(reconnect_backoff).await; + retry_count = retry_count.saturating_add(1); + continue 'reconnect; + } + }; + + let Some(guard) = try_acquire_advisory_lock(conn, &self.queue_lock).await? else { + tracing::trace!("Scheduler could not acquire lock, exiting"); + return Ok(()); + }; + + let Some((zoned_schedule, input)) = self.queue.task_schedule(&self.queue.pool).await? + else { + // No schedule configured, so we'll exit. + return Ok(()); + }; + + // Set up a listener for shutdown notifications + let mut shutdown_listener = match PgListener::connect_with(&self.queue.pool).await { + Ok(listener) => listener, + Err(err) => { + tracing::error!( + %err, + backoff_secs = reconnect_backoff.as_secs(), + attempt = retry_count, + "Failed to connect scheduler shutdown listener, retrying after backoff" + ); + sleep(reconnect_backoff).await; + retry_count = retry_count.saturating_add(1); + continue 'reconnect; + } + }; + + let chan = shutdown_channel(); + if let Err(err) = shutdown_listener.listen(chan).await { + tracing::error!( + %err, + backoff_secs = reconnect_backoff.as_secs(), + attempt = retry_count, + "Failed to listen on scheduler shutdown channel, retrying after backoff" + ); + sleep(reconnect_backoff).await; + retry_count = retry_count.saturating_add(1); + continue 'reconnect; + } + + // Connection succeeded, reset retry counter + tracing::info!("Scheduler PostgreSQL listener connected successfully"); + retry_count = 1; + + // TODO: Handle updates to schedules? + + for next in zoned_schedule.iter() { + tracing::debug!(?next, "Waiting until next scheduled task enqueue"); + + tokio::select! { + notify_shutdown = shutdown_listener.recv() => { + match notify_shutdown { + Ok(_) => { + self.shutdown_token.cancel(); + }, + Err(err) => { + tracing::warn!(%err, "Scheduler shutdown listener connection lost, reconnecting"); + continue 'reconnect; + } } } - } - _ = self.shutdown_token.cancelled() => { - guard.release_now().await?; - break - } + _ = self.shutdown_token.cancelled() => { + break; + } - _ = wait_until(&next) => { - self.process_next_schedule(&input).await? + _ = wait_until(&next) => { + self.process_next_schedule(&input).await? + } } } + + guard.release_now().await?; + + // Exit the reconnect loop once we have completed the schedule iteration. 
+ break; } Ok(()) From 76175731170634ac9b21bba9a8ee15f8bd60ab50 Mon Sep 17 00:00:00 2001 From: Marvel-Gu Date: Tue, 16 Dec 2025 22:30:58 +0800 Subject: [PATCH 4/8] fix --- src/task/retry_policy.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/task/retry_policy.rs b/src/task/retry_policy.rs index ce6d679..eacd296 100644 --- a/src/task/retry_policy.rs +++ b/src/task/retry_policy.rs @@ -43,7 +43,7 @@ impl RetryPolicy { let base_delay = self.initial_interval_ms as f64; let backoff_delay = base_delay * self.backoff_coefficient.powi(retry_count - 1); let target_delay = backoff_delay.min(self.max_interval_ms as f64); - let delay=(target_delay * (1.0 - self.jitter_factor)+rand::thread_rng().gen_range(0.0..=(target_delay * self.jitter_factor))) as i64; + let delay=(target_delay * (1.0 - self.jitter_factor)+rand::rng().random_range(0.0..=(target_delay * self.jitter_factor))) as i64; delay.milliseconds() } } From 6b9d97699164db975ae0924c6a22c7fbcde0b3cc Mon Sep 17 00:00:00 2001 From: Marvel-Gu Date: Wed, 17 Dec 2025 21:19:34 +0800 Subject: [PATCH 5/8] fix --- src/scheduler.rs | 16 +++++++++------- src/task/retry_policy.rs | 6 ++++-- src/worker.rs | 27 ++++++++++++++------------- 3 files changed, 27 insertions(+), 22 deletions(-) diff --git a/src/scheduler.rs b/src/scheduler.rs index e11d73f..f014562 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -166,13 +166,15 @@ impl Scheduler { /// Sets the reconnection policy for PostgreSQL connection failures. /// - /// This policy controls how the scheduler retries connecting when the PostgreSQL - /// connection is lost. Uses exponential backoff to avoid overwhelming the database. + /// This policy controls how the scheduler retries connecting when the + /// PostgreSQL connection is lost. Uses exponential backoff to avoid + /// overwhelming the database. /// - /// Defaults to 1 second initial interval, 60 second max interval, 2.0 coefficient and 0.5 jitter_factor. + /// Defaults to 1 second initial interval, 60 second max interval, 2.0 + /// coefficient and 0.5 jitter_factor. /// - /// **Note**: The `max_attempts` field is ignored for reconnection - the scheduler will - /// keep retrying until successful or until shutdown. + /// **Note**: The `max_attempts` field is ignored for reconnection - the + /// scheduler will keep retrying until successful or until shutdown. /// /// # Example /// @@ -212,8 +214,8 @@ impl Scheduler { /// /// // Set a custom reconnection policy using RetryPolicy. 
/// let policy = RetryPolicy::builder() - /// .initial_interval_ms(2_000) // 2 seconds - /// .max_interval_ms(60_000) // 1 minute + /// .initial_interval_ms(2_000) // 2 seconds + /// .max_interval_ms(60_000) // 1 minute /// .backoff_coefficient(2.5) /// .jitter_factor(0.5) /// .build(); diff --git a/src/task/retry_policy.rs b/src/task/retry_policy.rs index eacd296..efe03b0 100644 --- a/src/task/retry_policy.rs +++ b/src/task/retry_policy.rs @@ -43,7 +43,9 @@ impl RetryPolicy { let base_delay = self.initial_interval_ms as f64; let backoff_delay = base_delay * self.backoff_coefficient.powi(retry_count - 1); let target_delay = backoff_delay.min(self.max_interval_ms as f64); - let delay=(target_delay * (1.0 - self.jitter_factor)+rand::rng().random_range(0.0..=(target_delay * self.jitter_factor))) as i64; + let delay = (target_delay * (1.0 - self.jitter_factor) + + rand::rng().random_range(0.0..=(target_delay * self.jitter_factor))) + as i64; delay.milliseconds() } } @@ -166,7 +168,7 @@ impl Builder { /// let retry_policy_builder = RetryPolicy::builder().jitter_factor(0.5); /// ``` pub const fn jitter_factor(mut self, jitter_factor: f64) -> Self { - self.inner.jitter_factor=jitter_factor; + self.inner.jitter_factor = jitter_factor; self } diff --git a/src/worker.rs b/src/worker.rs index e56e6d1..41c815b 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -132,8 +132,7 @@ use sqlx::{ postgres::{types::PgInterval, PgListener, PgNotification}, Acquire, PgConnection, }; -use tokio::{sync::Semaphore, task::JoinSet}; -use tokio::time::sleep; +use tokio::{sync::Semaphore, task::JoinSet, time::sleep}; use tokio_util::sync::CancellationToken; use tracing::instrument; @@ -352,13 +351,15 @@ impl Worker { /// Sets the reconnection policy for PostgreSQL connection failures. /// - /// This policy controls how the worker retries connecting when the PostgreSQL - /// connection is lost. Uses exponential backoff to avoid overwhelming the database. + /// This policy controls how the worker retries connecting when the + /// PostgreSQL connection is lost. Uses exponential backoff to avoid + /// overwhelming the database. /// - /// Defaults to 1 second initial interval, 60 second max interval, 2.0 coefficient and 0.5 jitter_factor. + /// Defaults to 1 second initial interval, 60 second max interval, 2.0 + /// coefficient and 0.5 jitter_factor. /// - /// **Note**: The `max_attempts` field is ignored for reconnection - the worker will - /// keep retrying until successful or until shutdown. + /// **Note**: The `max_attempts` field is ignored for reconnection - the + /// worker will keep retrying until successful or until shutdown. /// /// # Example /// @@ -398,8 +399,8 @@ impl Worker { /// /// // Set a custom reconnection policy using RetryPolicy. 
    /// let policy = RetryPolicy::builder()
-    ///     .initial_interval_ms(2_000) // 2 seconds
-    ///     .max_interval_ms(60_000) // 1 minute
+    ///     .initial_interval_ms(2_000) // 2 seconds
+    ///     .max_interval_ms(60_000)    // 1 minute
     ///     .backoff_coefficient(2.5)
     ///     .jitter_factor(0.5)
     ///     .build();
@@ -589,7 +590,7 @@ impl Worker {
             let mut shutdown_listener = match PgListener::connect_with(&self.queue.pool).await {
                 Ok(listener) => listener,
                 Err(err) => {
-                    tracing::error!(%err, backoff_secs = reconnect_backoff.as_secs(), 
+                    tracing::error!(%err, backoff_secs = reconnect_backoff.as_secs(),
                         attempt = retry_count,
                         "Failed to connect shutdown listener, retrying after backoff");
                     sleep(reconnect_backoff).await;
@@ -599,7 +600,7 @@ impl Worker {
             };
 
             if let Err(err) = shutdown_listener.listen(chan).await {
-                tracing::error!(%err, backoff_secs = reconnect_backoff.as_secs(), 
+                tracing::error!(%err, backoff_secs = reconnect_backoff.as_secs(),
                     attempt = retry_count,
                     "Failed to listen on shutdown channel, retrying after backoff");
                 sleep(reconnect_backoff).await;
@@ -611,7 +612,7 @@ impl Worker {
             let mut task_change_listener = match PgListener::connect_with(&self.queue.pool).await {
                 Ok(listener) => listener,
                 Err(err) => {
-                    tracing::error!(%err, backoff_secs = reconnect_backoff.as_secs(), 
+                    tracing::error!(%err, backoff_secs = reconnect_backoff.as_secs(),
                         attempt = retry_count,
                         "Failed to connect task change listener, retrying after backoff");
                     sleep(reconnect_backoff).await;
@@ -621,7 +622,7 @@ impl Worker {
             };
 
             if let Err(err) = task_change_listener.listen("task_change").await {
-                tracing::error!(%err, backoff_secs = reconnect_backoff.as_secs(), 
+                tracing::error!(%err, backoff_secs = reconnect_backoff.as_secs(),
                     attempt = retry_count,
                     "Failed to listen on task_change channel, retrying after backoff");
                 sleep(reconnect_backoff).await;

From d4329350c41d3f8765d408d5ef94626394bbd4f3 Mon Sep 17 00:00:00 2001
From: Marvel-Gu
Date: Fri, 19 Dec 2025 22:30:05 +0800
Subject: [PATCH 6/8] refactor: reduce duplicate code

---
 src/queue.rs     | 96 ++++++++++++++++++++++++++++++++++++++++++++++++
 src/scheduler.rs | 66 ++++++++++++---------------------
 src/worker.rs    | 94 ++++++++++++++--------------------------------
 3 files changed, 148 insertions(+), 108 deletions(-)

diff --git a/src/queue.rs b/src/queue.rs
index ff58e45..c3e9834 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -1834,6 +1834,102 @@ pub(crate) fn shutdown_channel() -> &'static str {
     SHUTDOWN_CHANNEL.get_or_init(|| format!("underway_shutdown_{}", Uuid::new_v4()))
 }
 
+/// Helper that connects a PostgreSQL listener and subscribes to a channel.
+///
+/// This function attempts to establish a dedicated listener connection and
+/// subscribe to the specified channel. If either the connection or the
+/// subscription fails, the error is returned to the caller, which is
+/// responsible for any retry and backoff handling.
+///
+/// Returns the connected listener on success, or an `Error` if the
+/// connection or subscription fails.
+pub(crate) async fn try_connect_listener(
+    pool: &PgPool,
+    channel: &str,
+) -> std::result::Result<sqlx::postgres::PgListener, Error> {
+    let mut listener = sqlx::postgres::PgListener::connect_with(pool).await?;
+    listener.listen(channel).await?;
+    Ok(listener)
+}
+
+/// Retry an operation with exponential backoff.
+///
+/// This function encapsulates the retry logic with exponential backoff,
+/// including error logging and delay calculation. It will keep retrying
+/// until the operation succeeds.
+///
+/// # Arguments
+///
+/// * `backoff_policy` - The retry policy defining backoff behavior
+/// * `operation` - The async operation to retry, receives the current retry
+///   count
+/// * `operation_name` - A descriptive name for logging purposes
+pub(crate) async fn retry_with_backoff<T, F, Fut>(
+    backoff_policy: &RetryPolicy,
+    operation: F,
+    operation_name: &str,
+) -> std::result::Result<T, Error>
+where
+    F: Fn(i32) -> Fut,
+    Fut: std::future::Future<Output = std::result::Result<T, Error>>,
+{
+    let mut retry_count: i32 = 1;
+
+    loop {
+        let backoff_span = backoff_policy.calculate_delay(retry_count);
+        let backoff: StdDuration = backoff_span.try_into()?;
+
+        match operation(retry_count).await {
+            Ok(result) => return Ok(result),
+            Err(err) => {
+                tracing::error!(
+                    %err,
+                    backoff_secs = backoff.as_secs(),
+                    attempt = retry_count,
+                    "Failed to {}, retrying after backoff",
+                    operation_name
+                );
+                tokio::time::sleep(backoff).await;
+                retry_count = retry_count.saturating_add(1);
+            }
+        }
+    }
+}
+
+/// Connect multiple PostgreSQL listeners with retry logic.
+///
+/// This function attempts to establish connections to multiple PostgreSQL
+/// listeners with exponential backoff retry logic. All listeners must connect
+/// successfully or the entire operation will be retried.
+///
+/// # Arguments
+///
+/// * `pool` - The PostgreSQL connection pool
+/// * `channels` - Slice of channel names to listen on
+/// * `backoff_policy` - The retry policy defining backoff behavior
+///
+/// # Returns
+///
+/// A vector of connected listeners in the same order as the input channels.
+pub(crate) async fn connect_listeners_with_retry(
+    pool: &PgPool,
+    channels: &[&str],
+    backoff_policy: &RetryPolicy,
+) -> std::result::Result<Vec<sqlx::postgres::PgListener>, Error> {
+    retry_with_backoff(
+        backoff_policy,
+        |_| async {
+            let mut listeners = Vec::new();
+            for channel in channels {
+                listeners.push(try_connect_listener(pool, channel).await?);
+            }
+            Ok(listeners)
+        },
+        "connect PostgreSQL listeners",
+    )
+    .await
+}
+
 /// Initiates a graceful shutdown by sending a `NOTIFY` to the
 /// `underway_shutdown` channel via the `pg_notify` function.
 ///
diff --git a/src/scheduler.rs b/src/scheduler.rs
index f014562..fa20398 100644
--- a/src/scheduler.rs
+++ b/src/scheduler.rs
@@ -2,13 +2,16 @@ use std::{result::Result as StdResult, str::FromStr, sync::Arc, time::Duration a
 
 use jiff::{tz::TimeZone, Zoned};
 use jiff_cron::{Schedule, ScheduleIterator};
-use sqlx::postgres::{PgAdvisoryLock, PgListener};
+use sqlx::postgres::PgAdvisoryLock;
 use tokio::time::{sleep, Instant};
 use tokio_util::sync::CancellationToken;
 use tracing::instrument;
 
 use crate::{
-    queue::{shutdown_channel, try_acquire_advisory_lock, Error as QueueError},
+    queue::{
+        connect_listeners_with_retry, shutdown_channel, try_acquire_advisory_lock,
+        Error as QueueError,
+    },
     task::{RetryCount, RetryPolicy},
     Queue, Task,
 };
@@ -53,8 +56,8 @@ pub struct Scheduler {
     // When this token is cancelled the queue has been shutdown.
     shutdown_token: CancellationToken,
 
-    // Policy for reconnection backoff when PostgreSQL connection is lost.
-    reconnection_policy: RetryPolicy,
+    // Backoff policy for reconnecting when PostgreSQL connection is lost.
+    reconnect_backoff: RetryPolicy,
 }
 
 impl Scheduler {
@@ -111,7 +114,7 @@ impl Scheduler {
             queue_lock,
             task,
             shutdown_token: CancellationToken::new(),
-            reconnection_policy: RetryPolicy::default(),
+            reconnect_backoff: RetryPolicy::default(),
         }
     }
 
@@ -164,11 +167,12 @@ impl Scheduler {
         self.shutdown_token = shutdown_token;
     }
 
-    /// Sets the reconnection policy for PostgreSQL connection failures.
+ /// Sets the backoff policy for PostgreSQL reconnection attempts. /// - /// This policy controls how the scheduler retries connecting when the - /// PostgreSQL connection is lost. Uses exponential backoff to avoid - /// overwhelming the database. + /// This policy controls the exponential backoff behavior when the + /// scheduler's PostgreSQL listener connection is lost and needs to + /// reconnect. This helps avoid overwhelming the database during + /// connection issues. /// /// Defaults to 1 second initial interval, 60 second max interval, 2.0 /// coefficient and 0.5 jitter_factor. @@ -212,20 +216,20 @@ impl Scheduler { /// # */ /// # /// - /// // Set a custom reconnection policy using RetryPolicy. - /// let policy = RetryPolicy::builder() + /// // Set a custom backoff policy for reconnection. + /// let backoff = RetryPolicy::builder() /// .initial_interval_ms(2_000) // 2 seconds /// .max_interval_ms(60_000) // 1 minute /// .backoff_coefficient(2.5) /// .jitter_factor(0.5) /// .build(); - /// scheduler.set_reconnection_policy(policy); + /// scheduler.set_reconnect_backoff(backoff); /// # Ok::<(), Box>(()) /// # }); /// # } /// ``` - pub fn set_reconnection_policy(&mut self, reconnection_policy: RetryPolicy) { - self.reconnection_policy = reconnection_policy; + pub fn set_reconnect_backoff(&mut self, backoff: RetryPolicy) { + self.reconnect_backoff = backoff; } /// Cancels the shutdown token causing the scheduler to exit. @@ -332,7 +336,7 @@ impl Scheduler { // Outer loop: handle reconnection logic for the scheduler's Postgres listener. 'reconnect: loop { // Compute current reconnect backoff - let reconnect_backoff_span = self.reconnection_policy.calculate_delay(retry_count); + let reconnect_backoff_span = self.reconnect_backoff.calculate_delay(retry_count); let reconnect_backoff: StdDuration = reconnect_backoff_span.try_into()?; let conn = match self.queue.pool.acquire().await { @@ -362,35 +366,13 @@ impl Scheduler { }; // Set up a listener for shutdown notifications - let mut shutdown_listener = match PgListener::connect_with(&self.queue.pool).await { - Ok(listener) => listener, - Err(err) => { - tracing::error!( - %err, - backoff_secs = reconnect_backoff.as_secs(), - attempt = retry_count, - "Failed to connect scheduler shutdown listener, retrying after backoff" - ); - sleep(reconnect_backoff).await; - retry_count = retry_count.saturating_add(1); - continue 'reconnect; - } - }; - let chan = shutdown_channel(); - if let Err(err) = shutdown_listener.listen(chan).await { - tracing::error!( - %err, - backoff_secs = reconnect_backoff.as_secs(), - attempt = retry_count, - "Failed to listen on scheduler shutdown channel, retrying after backoff" - ); - sleep(reconnect_backoff).await; - retry_count = retry_count.saturating_add(1); - continue 'reconnect; - } + let mut listeners = + connect_listeners_with_retry(&self.queue.pool, &[chan], &self.reconnect_backoff) + .await?; + + let mut shutdown_listener = listeners.remove(0); - // Connection succeeded, reset retry counter tracing::info!("Scheduler PostgreSQL listener connected successfully"); retry_count = 1; diff --git a/src/worker.rs b/src/worker.rs index 41c815b..ec33836 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -129,15 +129,17 @@ use std::{sync::Arc, time::Duration}; use jiff::{Span, ToSpan}; use serde::Deserialize; use sqlx::{ - postgres::{types::PgInterval, PgListener, PgNotification}, + postgres::{types::PgInterval, PgNotification}, Acquire, PgConnection, }; -use tokio::{sync::Semaphore, task::JoinSet, time::sleep}; +use 
tokio::{sync::Semaphore, task::JoinSet}; use tokio_util::sync::CancellationToken; use tracing::instrument; use crate::{ - queue::{shutdown_channel, Error as QueueError, InProgressTask, Queue}, + queue::{ + connect_listeners_with_retry, shutdown_channel, Error as QueueError, InProgressTask, Queue, + }, task::{Error as TaskError, RetryCount, RetryPolicy, Task, TaskId}, }; pub(crate) type Result = std::result::Result; @@ -176,8 +178,8 @@ pub struct Worker { // Limits the number of concurrent `Task::execute` invocations this worker will be allowed. concurrency_limit: usize, - // Policy for reconnection backoff when PostgreSQL connection is lost. - reconnection_policy: RetryPolicy, + // Backoff policy for reconnecting when PostgreSQL connection is lost. + reconnect_backoff: RetryPolicy, // When this token is cancelled the queue has been shutdown. shutdown_token: CancellationToken, @@ -189,7 +191,7 @@ impl Clone for Worker { queue: Arc::clone(&self.queue), task: Arc::clone(&self.task), concurrency_limit: self.concurrency_limit, - reconnection_policy: self.reconnection_policy, + reconnect_backoff: self.reconnect_backoff, shutdown_token: self.shutdown_token.clone(), } } @@ -247,7 +249,7 @@ impl Worker { queue, task, concurrency_limit: num_cpus::get(), - reconnection_policy: RetryPolicy::default(), + reconnect_backoff: RetryPolicy::default(), shutdown_token: CancellationToken::new(), } } @@ -349,11 +351,12 @@ impl Worker { self.shutdown_token = shutdown_token; } - /// Sets the reconnection policy for PostgreSQL connection failures. + /// Sets the backoff policy for PostgreSQL reconnection attempts. /// - /// This policy controls how the worker retries connecting when the - /// PostgreSQL connection is lost. Uses exponential backoff to avoid - /// overwhelming the database. + /// This policy controls the exponential backoff behavior when the + /// worker's PostgreSQL listener connection is lost and needs to + /// reconnect. This helps avoid overwhelming the database during + /// connection issues. /// /// Defaults to 1 second initial interval, 60 second max interval, 2.0 /// coefficient and 0.5 jitter_factor. @@ -397,20 +400,20 @@ impl Worker { /// # */ /// # /// - /// // Set a custom reconnection policy using RetryPolicy. - /// let policy = RetryPolicy::builder() + /// // Set a custom backoff policy for reconnection. 
+ /// let backoff = RetryPolicy::builder() /// .initial_interval_ms(2_000) // 2 seconds /// .max_interval_ms(60_000) // 1 minute /// .backoff_coefficient(2.5) /// .jitter_factor(0.5) /// .build(); - /// worker.set_reconnection_policy(policy); + /// worker.set_reconnect_backoff(backoff); /// # Ok::<(), Box>(()) /// # }); /// # } /// ``` - pub fn set_reconnection_policy(&mut self, reconnection_policy: RetryPolicy) { - self.reconnection_policy = reconnection_policy; + pub fn set_reconnect_backoff(&mut self, backoff: RetryPolicy) { + self.reconnect_backoff = backoff; } /// Cancels the shutdown token and begins a graceful shutdown of in-progress @@ -578,61 +581,20 @@ impl Worker { let concurrency_limit = Arc::new(Semaphore::new(self.concurrency_limit)); let mut processing_tasks = JoinSet::new(); - let mut retry_count: RetryCount = 1; - // Outer loop: handle reconnection logic 'reconnect: loop { - // Compute current reconnect backoff - let reconnect_backoff_span = self.reconnection_policy.calculate_delay(retry_count); - let reconnect_backoff: Duration = reconnect_backoff_span.try_into()?; - - // Try to establish shutdown listener connection - let mut shutdown_listener = match PgListener::connect_with(&self.queue.pool).await { - Ok(listener) => listener, - Err(err) => { - tracing::error!(%err, backoff_secs = reconnect_backoff.as_secs(), - attempt = retry_count, - "Failed to connect shutdown listener, retrying after backoff"); - sleep(reconnect_backoff).await; - retry_count = retry_count.saturating_add(1); - continue 'reconnect; - } - }; - - if let Err(err) = shutdown_listener.listen(chan).await { - tracing::error!(%err, backoff_secs = reconnect_backoff.as_secs(), - attempt = retry_count, - "Failed to listen on shutdown channel, retrying after backoff"); - sleep(reconnect_backoff).await; - retry_count = retry_count.saturating_add(1); - continue 'reconnect; - } + // Connect to PostgreSQL listeners with retry logic + let mut listeners = connect_listeners_with_retry( + &self.queue.pool, + &[chan, "task_change"], + &self.reconnect_backoff, + ) + .await?; - // Try to establish task change listener connection - let mut task_change_listener = match PgListener::connect_with(&self.queue.pool).await { - Ok(listener) => listener, - Err(err) => { - tracing::error!(%err, backoff_secs = reconnect_backoff.as_secs(), - attempt = retry_count, - "Failed to connect task change listener, retrying after backoff"); - sleep(reconnect_backoff).await; - retry_count = retry_count.saturating_add(1); - continue 'reconnect; - } - }; - - if let Err(err) = task_change_listener.listen("task_change").await { - tracing::error!(%err, backoff_secs = reconnect_backoff.as_secs(), - attempt = retry_count, - "Failed to listen on task_change channel, retrying after backoff"); - sleep(reconnect_backoff).await; - retry_count = retry_count.saturating_add(1); - continue 'reconnect; - } + let mut shutdown_listener = listeners.remove(0); + let mut task_change_listener = listeners.remove(0); - // Connection succeeded, reset retry counter tracing::info!("PostgreSQL listeners connected successfully"); - retry_count = 1; // Inner loop: handle normal events loop { From da58355a19f62d121ef587126c8fd8194b021f4d Mon Sep 17 00:00:00 2001 From: Marvel-Gu Date: Mon, 29 Dec 2025 01:20:18 +0800 Subject: [PATCH 7/8] feat: separate migration --- migrations/20241105164503_2.sql | 5 ++--- migrations/20251228120000_0.sql | 36 +++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 3 deletions(-) create mode 100644 migrations/20251228120000_0.sql 
diff --git a/migrations/20241105164503_2.sql b/migrations/20241105164503_2.sql index f942fa8..750b178 100644 --- a/migrations/20241105164503_2.sql +++ b/migrations/20241105164503_2.sql @@ -18,13 +18,12 @@ create type underway.task_retry_policy as ( max_attempts int, initial_interval_ms int, max_interval_ms int, - backoff_coefficient float, - jitter_factor float + backoff_coefficient float ); alter table underway.task add column if not exists retry_policy underway.task_retry_policy not null - default row(5, 1000, 60000, 2.0, 0.5)::underway.task_retry_policy; + default row(5, 1000, 60000, 2.0)::underway.task_retry_policy; alter table underway.task add column if not exists completed_at timestamp with time zone; diff --git a/migrations/20251228120000_0.sql b/migrations/20251228120000_0.sql new file mode 100644 index 0000000..6f2f73b --- /dev/null +++ b/migrations/20251228120000_0.sql @@ -0,0 +1,36 @@ +-- Force anything running this migration to use the right search path. +set local search_path to underway; + +-- Save existing retry_policy data +alter table underway.task +rename column retry_policy to retry_policy_old; + +-- Drop and recreate the type with the new field +drop type if exists underway.task_retry_policy cascade; + +create type underway.task_retry_policy as ( + max_attempts int, + initial_interval_ms int, + max_interval_ms int, + backoff_coefficient float, + jitter_factor float +); + +-- Add the new column with updated type +alter table underway.task +add column retry_policy underway.task_retry_policy not null +default row(5, 1000, 60000, 2.0, 0.5)::underway.task_retry_policy; + +-- Migrate data from old column to new column +update underway.task +set retry_policy = row( + (retry_policy_old).max_attempts, + (retry_policy_old).initial_interval_ms, + (retry_policy_old).max_interval_ms, + (retry_policy_old).backoff_coefficient, + 0.5 -- default jitter_factor for existing records +)::underway.task_retry_policy; + +-- Drop the old column +alter table underway.task +drop column retry_policy_old; From 94f7fcbfaf29034dcaf5726725e760b72b6eb6ca Mon Sep 17 00:00:00 2001 From: Marvel-Gu Date: Tue, 6 Jan 2026 21:53:18 +0800 Subject: [PATCH 8/8] fix: migration --- migrations/20251228120000_0.sql | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/migrations/20251228120000_0.sql b/migrations/20251228120000_0.sql index 6f2f73b..360d80e 100644 --- a/migrations/20251228120000_0.sql +++ b/migrations/20251228120000_0.sql @@ -1,9 +1,18 @@ -- Force anything running this migration to use the right search path. 
set local search_path to underway; --- Save existing retry_policy data +-- Create a temporary column to store serialized retry_policy data alter table underway.task -rename column retry_policy to retry_policy_old; +add column retry_policy_backup jsonb; + +-- Save existing retry_policy data to backup column +update underway.task +set retry_policy_backup = jsonb_build_object( + 'max_attempts', (retry_policy).max_attempts, + 'initial_interval_ms', (retry_policy).initial_interval_ms, + 'max_interval_ms', (retry_policy).max_interval_ms, + 'backoff_coefficient', (retry_policy).backoff_coefficient +); -- Drop and recreate the type with the new field drop type if exists underway.task_retry_policy cascade; @@ -21,16 +30,17 @@ alter table underway.task add column retry_policy underway.task_retry_policy not null default row(5, 1000, 60000, 2.0, 0.5)::underway.task_retry_policy; --- Migrate data from old column to new column +-- Migrate data from backup to new column update underway.task set retry_policy = row( - (retry_policy_old).max_attempts, - (retry_policy_old).initial_interval_ms, - (retry_policy_old).max_interval_ms, - (retry_policy_old).backoff_coefficient, + (retry_policy_backup->>'max_attempts')::int, + (retry_policy_backup->>'initial_interval_ms')::int, + (retry_policy_backup->>'max_interval_ms')::int, + (retry_policy_backup->>'backoff_coefficient')::float, 0.5 -- default jitter_factor for existing records -)::underway.task_retry_policy; +)::underway.task_retry_policy +where retry_policy_backup is not null; --- Drop the old column +-- Drop the backup column alter table underway.task -drop column retry_policy_old; +drop column retry_policy_backup;
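
Note on the jitter formula (illustrative only): the delay produced by the new `calculate_delay` is `target * (1 - jitter_factor)` plus a uniform draw from `[0, target * jitter_factor]`, so with the default `jitter_factor` of 0.5 each delay lands between half the capped backoff target and the full target. Below is a minimal standalone sketch of that behaviour, assuming only the rand 0.9 API this series itself adds; `proportional_jitter_delay_ms` is a hypothetical helper written for this note, not code from the crate.

use rand::Rng;

/// With jitter factor `j`, the delay is drawn uniformly from
/// `[target * (1.0 - j), target]`: `j = 0.0` keeps plain exponential
/// backoff, while `j = 1.0` is full jitter.
fn proportional_jitter_delay_ms(target_ms: f64, jitter_factor: f64) -> i64 {
    let fixed = target_ms * (1.0 - jitter_factor);
    let random = rand::rng().random_range(0.0..=(target_ms * jitter_factor));
    (fixed + random) as i64
}

fn main() {
    // With a 4_000 ms capped target and the default jitter_factor of 0.5,
    // delays land in the 2_000..=4_000 ms range.
    for _ in 0..3 {
        println!("{} ms", proportional_jitter_delay_ms(4_000.0, 0.5));
    }
}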