diff --git a/Cargo.toml b/Cargo.toml
index 6f0dee8..80bf270 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,6 +29,7 @@ ulid = { version = "1.1.3", features = ["uuid"] }
 uuid = { version = "1.10.0", features = ["v4", "serde"] }
 num_cpus = "1.16.0"
 tokio-util = "0.7.12"
+rand = "0.9.2"
 
 [dev-dependencies]
 futures = "0.3.30"
diff --git a/migrations/20251228120000_0.sql b/migrations/20251228120000_0.sql
new file mode 100644
index 0000000..360d80e
--- /dev/null
+++ b/migrations/20251228120000_0.sql
@@ -0,0 +1,46 @@
+-- Force anything running this migration to use the right search path.
+set local search_path to underway;
+
+-- Create a temporary column to store serialized retry_policy data
+alter table underway.task
+add column retry_policy_backup jsonb;
+
+-- Save existing retry_policy data to backup column
+update underway.task
+set retry_policy_backup = jsonb_build_object(
+    'max_attempts', (retry_policy).max_attempts,
+    'initial_interval_ms', (retry_policy).initial_interval_ms,
+    'max_interval_ms', (retry_policy).max_interval_ms,
+    'backoff_coefficient', (retry_policy).backoff_coefficient
+);
+
+-- Drop and recreate the type with the new field
+drop type if exists underway.task_retry_policy cascade;
+
+create type underway.task_retry_policy as (
+    max_attempts int,
+    initial_interval_ms int,
+    max_interval_ms int,
+    backoff_coefficient float,
+    jitter_factor float
+);
+
+-- Add the new column with updated type
+alter table underway.task
+add column retry_policy underway.task_retry_policy not null
+default row(5, 1000, 60000, 2.0, 0.5)::underway.task_retry_policy;
+
+-- Migrate data from backup to new column
+update underway.task
+set retry_policy = row(
+    (retry_policy_backup->>'max_attempts')::int,
+    (retry_policy_backup->>'initial_interval_ms')::int,
+    (retry_policy_backup->>'max_interval_ms')::int,
+    (retry_policy_backup->>'backoff_coefficient')::float,
+    0.5 -- default jitter_factor for existing records
+)::underway.task_retry_policy
+where retry_policy_backup is not null;
+
+-- Drop the backup column
+alter table underway.task
+drop column retry_policy_backup;
diff --git a/src/queue.rs b/src/queue.rs
index ff58e45..c3e9834 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -1834,6 +1834,102 @@ pub(crate) fn shutdown_channel() -> &'static str {
     SHUTDOWN_CHANNEL.get_or_init(|| format!("underway_shutdown_{}", Uuid::new_v4()))
 }
 
+/// Helper function to connect a PostgreSQL listener and subscribe to a channel.
+///
+/// This function attempts to establish a connection to a PostgreSQL listener
+/// and subscribe to the specified channel. If the connection or subscription
+/// fails, the error is returned to the caller, which is responsible for any
+/// retry logic (including backoff delays).
+///
+/// Returns the connected listener on success, or the underlying `sqlx::Error`
+/// if connecting or subscribing fails.
+pub(crate) async fn try_connect_listener(
+    pool: &PgPool,
+    channel: &str,
+) -> std::result::Result<sqlx::postgres::PgListener, sqlx::Error> {
+    let mut listener = sqlx::postgres::PgListener::connect_with(pool).await?;
+    listener.listen(channel).await?;
+    Ok(listener)
+}
+
+/// Retry an operation with exponential backoff.
+///
+/// This function encapsulates the retry logic with exponential backoff,
+/// including error logging and delay calculation. It will keep retrying
+/// until the operation succeeds.
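+///
+/// Illustrative sketch of a call site (not a doc test; assumes a `PgPool`
+/// named `pool` is in scope and relies on this module's `Error` implementing
+/// `From<sqlx::Error>`, as the helpers below do):
+///
+/// ```rust,ignore
+/// let listener = retry_with_backoff(
+///     &RetryPolicy::default(),
+///     |_attempt| async { Ok(try_connect_listener(&pool, shutdown_channel()).await?) },
+///     "connect PostgreSQL listener",
+/// )
+/// .await?;
+/// ```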
+///
+/// # Arguments
+///
+/// * `backoff_policy` - The retry policy defining backoff behavior
+/// * `operation` - The async operation to retry, receives the current retry
+///   count
+/// * `operation_name` - A descriptive name for logging purposes
+pub(crate) async fn retry_with_backoff<T, F, Fut>(
+    backoff_policy: &RetryPolicy,
+    operation: F,
+    operation_name: &str,
+) -> std::result::Result<T, Error>
+where
+    F: Fn(i32) -> Fut,
+    Fut: std::future::Future<Output = std::result::Result<T, Error>>,
+{
+    let mut retry_count: i32 = 1;
+
+    loop {
+        let backoff_span = backoff_policy.calculate_delay(retry_count);
+        let backoff: StdDuration = backoff_span.try_into()?;
+
+        match operation(retry_count).await {
+            Ok(result) => return Ok(result),
+            Err(err) => {
+                tracing::error!(
+                    %err,
+                    backoff_secs = backoff.as_secs(),
+                    attempt = retry_count,
+                    "Failed to {}, retrying after backoff",
+                    operation_name
+                );
+                tokio::time::sleep(backoff).await;
+                retry_count = retry_count.saturating_add(1);
+            }
+        }
+    }
+}
+
+/// Connect multiple PostgreSQL listeners with retry logic.
+///
+/// This function attempts to establish connections to multiple PostgreSQL
+/// listeners with exponential backoff retry logic. All listeners must connect
+/// successfully or the entire operation will be retried.
+///
+/// # Arguments
+///
+/// * `pool` - The PostgreSQL connection pool
+/// * `channels` - Slice of channel names to listen on
+/// * `backoff_policy` - The retry policy defining backoff behavior
+///
+/// # Returns
+///
+/// A vector of connected listeners in the same order as the input channels.
+pub(crate) async fn connect_listeners_with_retry(
+    pool: &PgPool,
+    channels: &[&str],
+    backoff_policy: &RetryPolicy,
+) -> std::result::Result<Vec<sqlx::postgres::PgListener>, Error> {
+    retry_with_backoff(
+        backoff_policy,
+        |_| async {
+            let mut listeners = Vec::new();
+            for channel in channels {
+                listeners.push(try_connect_listener(pool, channel).await?);
+            }
+            Ok(listeners)
+        },
+        "connect PostgreSQL listeners",
+    )
+    .await
+}
+
 /// Initiates a graceful shutdown by sending a `NOTIFY` to the
 /// `underway_shutdown` channel via the `pg_notify` function.
 ///
diff --git a/src/scheduler.rs b/src/scheduler.rs
index fcaa7d4..fa20398 100644
--- a/src/scheduler.rs
+++ b/src/scheduler.rs
@@ -2,13 +2,17 @@ use std::{result::Result as StdResult, str::FromStr, sync::Arc, time::Duration a
 
 use jiff::{tz::TimeZone, Zoned};
 use jiff_cron::{Schedule, ScheduleIterator};
-use sqlx::postgres::{PgAdvisoryLock, PgListener};
-use tokio::time::Instant;
+use sqlx::postgres::PgAdvisoryLock;
+use tokio::time::{sleep, Instant};
 use tokio_util::sync::CancellationToken;
 use tracing::instrument;
 
 use crate::{
-    queue::{shutdown_channel, try_acquire_advisory_lock, Error as QueueError},
+    queue::{
+        connect_listeners_with_retry, shutdown_channel, try_acquire_advisory_lock,
+        Error as QueueError,
+    },
+    task::{RetryCount, RetryPolicy},
     Queue, Task,
 };
 
@@ -51,6 +55,9 @@ pub struct Scheduler<T: Task> {
     // When this token is cancelled the queue has been shutdown.
     shutdown_token: CancellationToken,
+
+    // Backoff policy for reconnecting when PostgreSQL connection is lost.
+    reconnect_backoff: RetryPolicy,
 }
 
 impl<T: Task> Scheduler<T> {
@@ -107,6 +114,7 @@ impl<T: Task> Scheduler<T> {
             queue_lock,
             task,
             shutdown_token: CancellationToken::new(),
+            reconnect_backoff: RetryPolicy::default(),
         }
     }
 
@@ -159,6 +167,71 @@ impl<T: Task> Scheduler<T> {
         self.shutdown_token = shutdown_token;
     }
 
+    /// Sets the backoff policy for PostgreSQL reconnection attempts.
+    ///
+    /// This policy controls the exponential backoff behavior when the
+    /// scheduler's PostgreSQL listener connection is lost and needs to
+    /// reconnect. This helps avoid overwhelming the database during
+    /// connection issues.
+    ///
+    /// Defaults to a 1 second initial interval, 60 second max interval, 2.0
+    /// backoff coefficient, and 0.5 jitter factor.
+    ///
+    /// **Note**: The `max_attempts` field is ignored for reconnection - the
+    /// scheduler will keep retrying until successful or until shutdown.
+    ///
+    /// # Example
+    ///
+    /// ```rust,no_run
+    /// # use sqlx::{PgPool, Transaction, Postgres};
+    /// # use underway::{Task, task::Result as TaskResult, Queue, Scheduler};
+    /// use underway::task::RetryPolicy;
+    ///
+    /// # struct ExampleTask;
+    /// # impl Task for ExampleTask {
+    /// #     type Input = ();
+    /// #     type Output = ();
+    /// #     async fn execute(
+    /// #         &self,
+    /// #         _: Transaction<'_, Postgres>,
+    /// #         _: Self::Input,
+    /// #     ) -> TaskResult<Self::Output> {
+    /// #         Ok(())
+    /// #     }
+    /// # }
+    /// # use tokio::runtime::Runtime;
+    /// # fn main() {
+    /// # let rt = Runtime::new().unwrap();
+    /// # rt.block_on(async {
+    /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?;
+    /// # let queue = Queue::builder()
+    /// #     .name("example")
+    /// #     .pool(pool.clone())
+    /// #     .build()
+    /// #     .await?;
+    /// # let task = ExampleTask;
+    /// # let mut scheduler = Scheduler::new(queue.into(), task);
+    /// # /*
+    /// let mut scheduler = { /* A `Scheduler`. */ };
+    /// # */
+    /// #
+    ///
+    /// // Set a custom backoff policy for reconnection.
+    /// let backoff = RetryPolicy::builder()
+    ///     .initial_interval_ms(2_000) // 2 seconds
+    ///     .max_interval_ms(60_000)    // 1 minute
+    ///     .backoff_coefficient(2.5)
+    ///     .jitter_factor(0.5)
+    ///     .build();
+    /// scheduler.set_reconnect_backoff(backoff);
+    /// # Ok::<(), Box<dyn std::error::Error>>(())
+    /// # });
+    /// # }
+    /// ```
+    pub fn set_reconnect_backoff(&mut self, backoff: RetryPolicy) {
+        self.reconnect_backoff = backoff;
+    }
+
     /// Cancels the shutdown token causing the scheduler to exit.
     ///
     /// ```rust,no_run
@@ -258,49 +331,83 @@
     /// ```
     #[instrument(skip(self), fields(queue.name = self.queue.name), err)]
     pub async fn run(&self) -> Result {
-        let conn = self.queue.pool.acquire().await?;
-        let Some(guard) = try_acquire_advisory_lock(conn, &self.queue_lock).await? else {
-            tracing::trace!("Scheduler could not acquire lock, exiting");
-            return Ok(());
-        };
-
-        let Some((zoned_schedule, input)) = self.queue.task_schedule(&self.queue.pool).await?
-        else {
-            // No schedule configured, so we'll exit.
-            return Ok(());
-        };
-
-        // Set up a listener for shutdown notifications
-        let mut shutdown_listener = PgListener::connect_with(&self.queue.pool).await?;
-        let chan = shutdown_channel();
-        shutdown_listener.listen(chan).await?;
-
-        // TODO: Handle updates to schedules?
-
-        for next in zoned_schedule.iter() {
-            tracing::debug!(?next, "Waiting until next scheduled task enqueue");
-
-            tokio::select! {
-                notify_shutdown = shutdown_listener.recv() => {
-                    match notify_shutdown {
-                        Ok(_) => {
-                            self.shutdown_token.cancel();
-                        },
-                        Err(err) => {
-                            tracing::error!(%err, "Postgres shutdown notification error");
+        let mut retry_count: RetryCount = 1;
+
+        // Outer loop: handle reconnection logic for the scheduler's Postgres listener.
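+        // Each pass re-acquires a database connection and the scheduler
+        // advisory lock, reconnects the shutdown listener, then resumes
+        // iterating the schedule. `retry_count` feeds the backoff policy and
+        // is reset to 1 once the listener connects; a lost listener jumps
+        // back to the top of this loop.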
+        'reconnect: loop {
+            // Compute current reconnect backoff
+            let reconnect_backoff_span = self.reconnect_backoff.calculate_delay(retry_count);
+            let reconnect_backoff: StdDuration = reconnect_backoff_span.try_into()?;
+
+            let conn = match self.queue.pool.acquire().await {
+                Ok(conn) => conn,
+                Err(err) => {
+                    tracing::error!(
+                        %err,
+                        backoff_secs = reconnect_backoff.as_secs(),
+                        attempt = retry_count,
+                        "Failed to acquire database connection for scheduler, retrying after backoff"
+                    );
+                    sleep(reconnect_backoff).await;
+                    retry_count = retry_count.saturating_add(1);
+                    continue 'reconnect;
+                }
+            };
+
+            let Some(guard) = try_acquire_advisory_lock(conn, &self.queue_lock).await? else {
+                tracing::trace!("Scheduler could not acquire lock, exiting");
+                return Ok(());
+            };
+
+            let Some((zoned_schedule, input)) = self.queue.task_schedule(&self.queue.pool).await?
+            else {
+                // No schedule configured, so we'll exit.
+                return Ok(());
+            };
+
+            // Set up a listener for shutdown notifications
+            let chan = shutdown_channel();
+            let mut listeners =
+                connect_listeners_with_retry(&self.queue.pool, &[chan], &self.reconnect_backoff)
+                    .await?;
+
+            let mut shutdown_listener = listeners.remove(0);
+
+            tracing::info!("Scheduler PostgreSQL listener connected successfully");
+            retry_count = 1;
+
+            // TODO: Handle updates to schedules?
+
+            for next in zoned_schedule.iter() {
+                tracing::debug!(?next, "Waiting until next scheduled task enqueue");
+
+                tokio::select! {
+                    notify_shutdown = shutdown_listener.recv() => {
+                        match notify_shutdown {
+                            Ok(_) => {
+                                self.shutdown_token.cancel();
+                            },
+                            Err(err) => {
+                                tracing::warn!(%err, "Scheduler shutdown listener connection lost, reconnecting");
+                                continue 'reconnect;
+                            }
+                        }
+                    }
-                        }
-                    }
-                }
-
-                _ = self.shutdown_token.cancelled() => {
-                    guard.release_now().await?;
-                    break
-                }
+
+                    _ = self.shutdown_token.cancelled() => {
+                        break;
+                    }
 
-                _ = wait_until(&next) => {
-                    self.process_next_schedule(&input).await?
+                    _ = wait_until(&next) => {
+                        self.process_next_schedule(&input).await?
+                    }
+                }
+            }
+
+            guard.release_now().await?;
+
+            // Exit the reconnect loop once we have completed the schedule iteration.
+            break;
+        }
 
         Ok(())
diff --git a/src/task/retry_policy.rs b/src/task/retry_policy.rs
index cf1d342..efe03b0 100644
--- a/src/task/retry_policy.rs
+++ b/src/task/retry_policy.rs
@@ -1,4 +1,5 @@
 use jiff::{Span, ToSpan};
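+// `Rng` brings `random_range` into scope; `calculate_delay` below uses it to
+// add jitter to the computed retry delay.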
+use rand::Rng;
 
 /// Configuration of a policy for retries in case of task failure.
 ///
@@ -19,6 +20,7 @@ pub struct RetryPolicy {
     pub(crate) initial_interval_ms: i32,
     pub(crate) max_interval_ms: i32,
     pub(crate) backoff_coefficient: f64,
+    pub(crate) jitter_factor: f64,
 }
 
 pub(crate) type RetryCount = i32;
@@ -40,7 +42,10 @@ impl RetryPolicy {
     pub(crate) fn calculate_delay(&self, retry_count: RetryCount) -> Span {
         let base_delay = self.initial_interval_ms as f64;
         let backoff_delay = base_delay * self.backoff_coefficient.powi(retry_count - 1);
-        let delay = backoff_delay.min(self.max_interval_ms as f64) as i64;
+        let target_delay = backoff_delay.min(self.max_interval_ms as f64);
+        let delay = (target_delay * (1.0 - self.jitter_factor)
+            + rand::rng().random_range(0.0..=(target_delay * self.jitter_factor)))
+            as i64;
         delay.milliseconds()
     }
 }
@@ -50,6 +55,7 @@ const DEFAULT_RETRY_POLICY: RetryPolicy = RetryPolicy {
     initial_interval_ms: 1_000,
     max_interval_ms: 60_000,
     backoff_coefficient: 2.0,
+    jitter_factor: 0.5,
 };
 
 impl Default for RetryPolicy {
@@ -146,6 +152,26 @@ impl Builder {
         self
     }
 
+    /// The jitter factor is a coefficient (typically between `0.0` and `1.0`)
+    /// that determines what percentage of the backoff duration is randomized.
+    /// It controls the balance between predictability and collision
+    /// avoidance. For example, with a 4-second backoff and a jitter factor of
+    /// `0.5`, the actual delay is drawn uniformly from 2 to 4 seconds.
+    ///
+    /// Default value is `0.5`.
+    ///
+    /// # Example
+    ///
+    /// ```rust
+    /// use underway::task::RetryPolicy;
+    ///
+    /// // Set a jitter factor of one half.
+    /// let retry_policy_builder = RetryPolicy::builder().jitter_factor(0.5);
+    /// ```
+    pub const fn jitter_factor(mut self, jitter_factor: f64) -> Self {
+        self.inner.jitter_factor = jitter_factor;
+        self
+    }
+
     /// Builds the `RetryPolicy` with the configured parameters.
     ///
     /// # Example
diff --git a/src/worker.rs b/src/worker.rs
index 51201e0..ec33836 100644
--- a/src/worker.rs
+++ b/src/worker.rs
@@ -129,7 +129,7 @@ use std::{sync::Arc, time::Duration};
 use jiff::{Span, ToSpan};
 use serde::Deserialize;
 use sqlx::{
-    postgres::{types::PgInterval, PgListener, PgNotification},
+    postgres::{types::PgInterval, PgNotification},
     Acquire, PgConnection,
 };
 use tokio::{sync::Semaphore, task::JoinSet};
@@ -137,7 +139,9 @@ use tokio_util::sync::CancellationToken;
 use tracing::instrument;
 
 use crate::{
-    queue::{shutdown_channel, Error as QueueError, InProgressTask, Queue},
+    queue::{
+        connect_listeners_with_retry, shutdown_channel, Error as QueueError, InProgressTask, Queue,
+    },
     task::{Error as TaskError, RetryCount, RetryPolicy, Task, TaskId},
 };
 
 pub(crate) type Result<T = ()> = std::result::Result<T, Error>;
@@ -176,6 +178,9 @@ pub struct Worker<T: Task> {
     // Limits the number of concurrent `Task::execute` invocations this worker will be allowed.
     concurrency_limit: usize,
 
+    // Backoff policy for reconnecting when PostgreSQL connection is lost.
+    reconnect_backoff: RetryPolicy,
+
     // When this token is cancelled the queue has been shutdown.
     shutdown_token: CancellationToken,
 }
@@ -186,6 +191,7 @@ impl<T: Task> Clone for Worker<T> {
     fn clone(&self) -> Self {
         Self {
             queue: Arc::clone(&self.queue),
             task: Arc::clone(&self.task),
             concurrency_limit: self.concurrency_limit,
+            reconnect_backoff: self.reconnect_backoff,
             shutdown_token: self.shutdown_token.clone(),
         }
     }
 }
@@ -243,6 +249,7 @@ impl<T: Task> Worker<T> {
             queue,
             task,
             concurrency_limit: num_cpus::get(),
+            reconnect_backoff: RetryPolicy::default(),
             shutdown_token: CancellationToken::new(),
         }
     }
@@ -344,6 +351,71 @@ impl<T: Task> Worker<T> {
         self.shutdown_token = shutdown_token;
     }
 
+    /// Sets the backoff policy for PostgreSQL reconnection attempts.
+    ///
+    /// This policy controls the exponential backoff behavior when the
+    /// worker's PostgreSQL listener connection is lost and needs to
+    /// reconnect. This helps avoid overwhelming the database during
+    /// connection issues.
+    ///
+    /// Defaults to a 1 second initial interval, 60 second max interval, 2.0
+    /// backoff coefficient, and 0.5 jitter factor.
+    ///
+    /// **Note**: The `max_attempts` field is ignored for reconnection - the
+    /// worker will keep retrying until successful or until shutdown.
+    ///
+    /// # Example
+    ///
+    /// ```rust,no_run
+    /// # use sqlx::{PgPool, Transaction, Postgres};
+    /// # use underway::{Task, task::Result as TaskResult, Queue, Worker};
+    /// use underway::task::RetryPolicy;
+    ///
+    /// # struct ExampleTask;
+    /// # impl Task for ExampleTask {
+    /// #     type Input = ();
+    /// #     type Output = ();
+    /// #     async fn execute(
+    /// #         &self,
+    /// #         _: Transaction<'_, Postgres>,
+    /// #         _: Self::Input,
+    /// #     ) -> TaskResult<Self::Output> {
+    /// #         Ok(())
+    /// #     }
+    /// # }
+    /// # use tokio::runtime::Runtime;
+    /// # fn main() {
+    /// # let rt = Runtime::new().unwrap();
+    /// # rt.block_on(async {
+    /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?;
+    /// # let queue = Queue::builder()
+    /// #     .name("example")
+    /// #     .pool(pool.clone())
+    /// #     .build()
+    /// #     .await?;
+    /// # let task = ExampleTask;
+    /// # let mut worker = Worker::new(queue.into(), task);
+    /// # /*
+    /// let mut worker = { /* A `Worker`. */ };
+    /// # */
+    /// #
+    ///
+    /// // Set a custom backoff policy for reconnection.
+    /// let backoff = RetryPolicy::builder()
+    ///     .initial_interval_ms(2_000) // 2 seconds
+    ///     .max_interval_ms(60_000)    // 1 minute
+    ///     .backoff_coefficient(2.5)
+    ///     .jitter_factor(0.5)
+    ///     .build();
+    /// worker.set_reconnect_backoff(backoff);
+    /// # Ok::<(), Box<dyn std::error::Error>>(())
+    /// # });
+    /// # }
+    /// ```
+    pub fn set_reconnect_backoff(&mut self, backoff: RetryPolicy) {
+        self.reconnect_backoff = backoff;
+    }
+
     /// Cancels the shutdown token and begins a graceful shutdown of in-progress
     /// tasks.
     ///
@@ -505,58 +577,65 @@
     #[instrument(skip(self), fields(queue.name = self.queue.name), err)]
     pub async fn run_every(&self, period: Span) -> Result {
         let mut polling_interval = tokio::time::interval(period.try_into()?);
-
-        // Set up a listener for shutdown notifications
-        let mut shutdown_listener = PgListener::connect_with(&self.queue.pool).await?;
         let chan = shutdown_channel();
-        shutdown_listener.listen(chan).await?;
-
-        // Set up a listener for task change notifications
-        let mut task_change_listener = PgListener::connect_with(&self.queue.pool).await?;
-        task_change_listener.listen("task_change").await?;
-
         let concurrency_limit = Arc::new(Semaphore::new(self.concurrency_limit));
         let mut processing_tasks = JoinSet::new();
 
-        loop {
-            tokio::select! {
-                notify_shutdown = shutdown_listener.recv() => {
-                    match notify_shutdown {
-                        Ok(_) => {
-                            self.shutdown_token.cancel();
-                        },
+        // Outer loop: handle reconnection logic
+        'reconnect: loop {
+            // Connect to PostgreSQL listeners with retry logic
+            let mut listeners = connect_listeners_with_retry(
+                &self.queue.pool,
+                &[chan, "task_change"],
+                &self.reconnect_backoff,
+            )
+            .await?;
 
-                        Err(err) => {
-                            tracing::error!(%err, "Postgres shutdown notification error");
+            let mut shutdown_listener = listeners.remove(0);
+            let mut task_change_listener = listeners.remove(0);
+
+            tracing::info!("PostgreSQL listeners connected successfully");
+
+            // Inner loop: handle normal events
+            loop {
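+                // Wait on whichever fires first: a shutdown notification, the
+                // shutdown token, a task-change notification, or the polling
+                // fallback tick; a failed listener drops back to 'reconnect.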
+                tokio::select! {
+                    notify_shutdown = shutdown_listener.recv() => {
+                        match notify_shutdown {
+                            Ok(_) => {
+                                self.shutdown_token.cancel();
+                            },
+                            Err(err) => {
+                                tracing::warn!(%err, "Shutdown listener connection lost, reconnecting");
+                                continue 'reconnect;
+                            }
+                        }
+                    }
-                        }
-                    }
-                }
-
-                _ = self.shutdown_token.cancelled() => {
-                    self.handle_shutdown(&mut processing_tasks).await?;
-                    break
-                }
+
+                    _ = self.shutdown_token.cancelled() => {
+                        self.handle_shutdown(&mut processing_tasks).await?;
+                        return Ok(());
+                    }
 
-                // Listen for new pending tasks.
-                notify_task_change = task_change_listener.recv() => {
-                    match notify_task_change {
-                        Ok(task_change) => self.handle_task_change(task_change, concurrency_limit.clone(), &mut processing_tasks).await?,
+                    // Listen for new pending tasks
+                    notify_task_change = task_change_listener.recv() => {
+                        match notify_task_change {
+                            Ok(task_change) => {
+                                self.handle_task_change(task_change, concurrency_limit.clone(), &mut processing_tasks).await?;
+                            },
-                        Err(err) => {
-                            tracing::error!(%err, "Postgres task change notification error");
+                            Err(err) => {
+                                tracing::warn!(%err, "Task change listener connection lost, reconnecting");
+                                continue 'reconnect;
+                            }
+                        }
-                        }
-                    };
-
-                }
+                    }
 
-                // Pending task polling fallback.
-                _ = polling_interval.tick() => {
-                    self.trigger_task_processing(concurrency_limit.clone(), &mut processing_tasks).await;
+                    // Pending task polling fallback
+                    _ = polling_interval.tick() => {
+                        self.trigger_task_processing(concurrency_limit.clone(), &mut processing_tasks).await;
+                    }
+                }
+            }
+        }
-
-        Ok(())
     }
 
     async fn handle_shutdown(&self, processing_tasks: &mut JoinSet<()>) -> Result {