1 change: 1 addition & 0 deletions Cargo.toml
@@ -29,6 +29,7 @@ ulid = { version = "1.1.3", features = ["uuid"] }
uuid = { version = "1.10.0", features = ["v4", "serde"] }
num_cpus = "1.16.0"
tokio-util = "0.7.12"
rand = "0.9.2"

[dev-dependencies]
futures = "0.3.30"
36 changes: 36 additions & 0 deletions migrations/20251228120000_0.sql
@@ -0,0 +1,36 @@
-- Force anything running this migration to use the right search path.
set local search_path to underway;

-- Save existing retry_policy data
alter table underway.task
rename column retry_policy to retry_policy_old;

-- Convert the saved data to jsonb so it survives dropping the type below.
alter table underway.task
alter column retry_policy_old type jsonb using to_jsonb(retry_policy_old);

-- Drop and recreate the type with the new field
drop type if exists underway.task_retry_policy cascade;

create type underway.task_retry_policy as (
max_attempts int,
initial_interval_ms int,
max_interval_ms int,
backoff_coefficient float,
jitter_factor float
);

-- Add the new column with updated type
alter table underway.task
add column retry_policy underway.task_retry_policy not null
default row(5, 1000, 60000, 2.0, 0.5)::underway.task_retry_policy;

-- Migrate data from old column to new column
update underway.task
set retry_policy = row(
(retry_policy_old->>'max_attempts')::int,
(retry_policy_old->>'initial_interval_ms')::int,
(retry_policy_old->>'max_interval_ms')::int,
(retry_policy_old->>'backoff_coefficient')::float,
0.5 -- default jitter_factor for existing records
)::underway.task_retry_policy;

-- Drop the old column
alter table underway.task
drop column retry_policy_old;
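
Note on the new field: `jitter_factor` presumably feeds `RetryPolicy::calculate_delay`, which the Rust changes below call to compute backoff delays, and the `rand` dependency added in Cargo.toml above suggests randomized jitter. A minimal sketch of a jittered exponential backoff under that assumption (field names mirror the composite type above; the crate's actual implementation may differ):

```rust
use std::time::Duration;

use rand::Rng;

// Hypothetical delay calculation; not the crate's actual `calculate_delay`.
fn jittered_delay(
    initial_interval_ms: u64,
    max_interval_ms: u64,
    backoff_coefficient: f64,
    jitter_factor: f64,
    attempt: u32,
) -> Duration {
    // Exponential growth from the initial interval, capped at the maximum.
    let exp = backoff_coefficient.powi(attempt.saturating_sub(1) as i32);
    let capped = (initial_interval_ms as f64 * exp).min(max_interval_ms as f64);

    // Multiplicative jitter in [1 - jitter_factor, 1 + jitter_factor],
    // using the rand 0.9 API pulled in via Cargo.toml above.
    let jitter = rand::rng().random_range(1.0 - jitter_factor..=1.0 + jitter_factor);
    Duration::from_millis((capped * jitter) as u64)
}
```

Multiplicative jitter keeps the average delay close to the exponential schedule while spreading retries out, so many workers losing the same connection do not all reconnect in lockstep.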
96 changes: 96 additions & 0 deletions src/queue.rs
@@ -1834,6 +1834,102 @@ pub(crate) fn shutdown_channel() -> &'static str {
SHUTDOWN_CHANNEL.get_or_init(|| format!("underway_shutdown_{}", Uuid::new_v4()))
}

/// Helper function to connect a PostgreSQL listener and subscribe to a channel.
///
/// This function attempts to establish a connection to a PostgreSQL listener
/// and subscribe to the specified channel. If the connection or subscription
/// fails, the error is returned to the caller, which is responsible for any
/// retry logic (including backoff delays).
///
/// Returns the connected listener on success, or the underlying `sqlx::Error`
/// if the connection or subscription fails.
pub(crate) async fn try_connect_listener(
pool: &PgPool,
channel: &str,
) -> std::result::Result<sqlx::postgres::PgListener, sqlx::Error> {
let mut listener = sqlx::postgres::PgListener::connect_with(pool).await?;
listener.listen(channel).await?;
Ok(listener)
}

/// Retry an operation with exponential backoff.
///
/// This function encapsulates retry logic with exponential backoff,
/// including error logging and delay calculation. It retries indefinitely
/// until the operation succeeds; the policy's `max_attempts` is not consulted.
///
/// # Arguments
///
/// * `backoff_policy` - The retry policy defining backoff behavior
/// * `operation` - The async operation to retry, receives the current retry
/// count
/// * `operation_name` - A descriptive name for logging purposes
pub(crate) async fn retry_with_backoff<F, Fut, T>(
backoff_policy: &RetryPolicy,
operation: F,
operation_name: &str,
) -> std::result::Result<T, Error>
where
F: Fn(i32) -> Fut,
Fut: std::future::Future<Output = std::result::Result<T, sqlx::Error>>,
{
let mut retry_count: i32 = 1;

loop {
let backoff_span = backoff_policy.calculate_delay(retry_count);
let backoff: StdDuration = backoff_span.try_into()?;

match operation(retry_count).await {
Ok(result) => return Ok(result),
Err(err) => {
tracing::error!(
%err,
backoff_secs = backoff.as_secs(),
attempt = retry_count,
"Failed to {}, retrying after backoff",
operation_name
);
tokio::time::sleep(backoff).await;
retry_count = retry_count.saturating_add(1);
}
}
}
}

/// Connect multiple PostgreSQL listeners with retry logic.
///
/// This function attempts to establish connections to multiple PostgreSQL
/// listeners with exponential backoff retry logic. All listeners must connect
/// successfully or the entire operation will be retried.
///
/// # Arguments
///
/// * `pool` - The PostgreSQL connection pool
/// * `channels` - Slice of channel names to listen on
/// * `backoff_policy` - The retry policy defining backoff behavior
///
/// # Returns
///
/// A vector of connected listeners in the same order as the input channels.
pub(crate) async fn connect_listeners_with_retry(
pool: &PgPool,
channels: &[&str],
backoff_policy: &RetryPolicy,
) -> std::result::Result<Vec<sqlx::postgres::PgListener>, Error> {
retry_with_backoff(
backoff_policy,
|_| async {
let mut listeners = Vec::new();
for channel in channels {
listeners.push(try_connect_listener(pool, channel).await?);
}
Ok(listeners)
},
"connect PostgreSQL listeners",
)
.await
}

/// Initiates a graceful shutdown by sending a `NOTIFY` to the
/// `underway_shutdown` channel via the `pg_notify` function.
///
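For illustration, a hypothetical call site for `retry_with_backoff` (`connect_listeners_with_retry` above is the real in-tree usage; the helper below, its name, and its log label are made up for the example):

```rust
use sqlx::{pool::PoolConnection, PgPool, Postgres};

use crate::{
    queue::{retry_with_backoff, Error},
    task::RetryPolicy,
};

// Hypothetical crate-internal helper (not part of this change): acquire a
// pool connection, retrying with the given backoff policy until it succeeds.
async fn acquire_with_retry(
    pool: &PgPool,
    policy: &RetryPolicy,
) -> Result<PoolConnection<Postgres>, Error> {
    retry_with_backoff(
        policy,
        // The closure receives the current attempt number; it is unused here.
        |_attempt| async { pool.acquire().await },
        "acquire database connection",
    )
    .await
}
```

Each failed attempt is logged with the supplied label and followed by a policy-derived sleep, so call sites only provide the fallible operation and a name for the logs.
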
187 changes: 147 additions & 40 deletions src/scheduler.rs
@@ -2,13 +2,17 @@ use std::{result::Result as StdResult, str::FromStr, sync::Arc, time::Duration a

use jiff::{tz::TimeZone, Zoned};
use jiff_cron::{Schedule, ScheduleIterator};
use sqlx::postgres::{PgAdvisoryLock, PgListener};
use tokio::time::Instant;
use sqlx::postgres::PgAdvisoryLock;
use tokio::time::{sleep, Instant};
use tokio_util::sync::CancellationToken;
use tracing::instrument;

use crate::{
queue::{shutdown_channel, try_acquire_advisory_lock, Error as QueueError},
queue::{
connect_listeners_with_retry, shutdown_channel, try_acquire_advisory_lock,
Error as QueueError,
},
task::{RetryCount, RetryPolicy},
Queue, Task,
};

@@ -51,6 +55,9 @@ pub struct Scheduler<T: Task> {

// When this token is cancelled the queue has been shutdown.
shutdown_token: CancellationToken,

// Backoff policy for reconnecting when PostgreSQL connection is lost.
reconnect_backoff: RetryPolicy,
}

impl<T: Task> Scheduler<T> {
@@ -107,6 +114,7 @@ impl<T: Task> Scheduler<T> {
queue_lock,
task,
shutdown_token: CancellationToken::new(),
reconnect_backoff: RetryPolicy::default(),
}
}

@@ -159,6 +167,71 @@ impl<T: Task> Scheduler<T> {
self.shutdown_token = shutdown_token;
}

/// Sets the backoff policy for PostgreSQL reconnection attempts.
///
/// This policy controls the exponential backoff behavior when the
/// scheduler's PostgreSQL listener connection is lost and needs to
/// reconnect. This helps avoid overwhelming the database during
/// connection issues.
///
/// Defaults to a 1-second initial interval, a 60-second max interval, a
/// backoff coefficient of 2.0, and a jitter factor of 0.5.
///
/// **Note**: The `max_attempts` field is ignored for reconnection - the
/// scheduler will keep retrying until it reconnects successfully.
///
/// # Example
///
/// ```rust,no_run
/// # use sqlx::{PgPool, Transaction, Postgres};
/// # use underway::{Task, task::Result as TaskResult, Queue, Scheduler};
/// use underway::task::RetryPolicy;
///
/// # struct ExampleTask;
/// # impl Task for ExampleTask {
/// # type Input = ();
/// # type Output = ();
/// # async fn execute(
/// # &self,
/// # _: Transaction<'_, Postgres>,
/// # _: Self::Input,
/// # ) -> TaskResult<Self::Output> {
/// # Ok(())
/// # }
/// # }
/// # use tokio::runtime::Runtime;
/// # fn main() {
/// # let rt = Runtime::new().unwrap();
/// # rt.block_on(async {
/// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?;
/// # let queue = Queue::builder()
/// # .name("example")
/// # .pool(pool.clone())
/// # .build()
/// # .await?;
/// # let task = ExampleTask;
/// # let mut scheduler = Scheduler::new(queue.into(), task);
/// # /*
/// let mut scheduler = { /* A `Scheduler`. */ };
/// # */
/// #
///
/// // Set a custom backoff policy for reconnection.
/// let backoff = RetryPolicy::builder()
/// .initial_interval_ms(2_000) // 2 seconds
/// .max_interval_ms(60_000) // 1 minute
/// .backoff_coefficient(2.5)
/// .jitter_factor(0.5)
/// .build();
/// scheduler.set_reconnect_backoff(backoff);
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// # }
/// ```
pub fn set_reconnect_backoff(&mut self, backoff: RetryPolicy) {
self.reconnect_backoff = backoff;
}

/// Cancels the shutdown token causing the scheduler to exit.
///
/// ```rust,no_run
@@ -258,49 +331,83 @@ impl<T: Task> Scheduler<T> {
/// ```
#[instrument(skip(self), fields(queue.name = self.queue.name), err)]
pub async fn run(&self) -> Result {
let conn = self.queue.pool.acquire().await?;
let Some(guard) = try_acquire_advisory_lock(conn, &self.queue_lock).await? else {
tracing::trace!("Scheduler could not acquire lock, exiting");
return Ok(());
};

let Some((zoned_schedule, input)) = self.queue.task_schedule(&self.queue.pool).await?
else {
// No schedule configured, so we'll exit.
return Ok(());
};

// Set up a listener for shutdown notifications
let mut shutdown_listener = PgListener::connect_with(&self.queue.pool).await?;
let chan = shutdown_channel();
shutdown_listener.listen(chan).await?;

// TODO: Handle updates to schedules?

for next in zoned_schedule.iter() {
tracing::debug!(?next, "Waiting until next scheduled task enqueue");

tokio::select! {
notify_shutdown = shutdown_listener.recv() => {
match notify_shutdown {
Ok(_) => {
self.shutdown_token.cancel();
},
Err(err) => {
tracing::error!(%err, "Postgres shutdown notification error");
let mut retry_count: RetryCount = 1;

// Outer loop: handle reconnection logic for the scheduler's Postgres listener.
'reconnect: loop {
// Compute current reconnect backoff
let reconnect_backoff_span = self.reconnect_backoff.calculate_delay(retry_count);
let reconnect_backoff: StdDuration = reconnect_backoff_span.try_into()?;

let conn = match self.queue.pool.acquire().await {
Ok(conn) => conn,
Err(err) => {
tracing::error!(
%err,
backoff_secs = reconnect_backoff.as_secs(),
attempt = retry_count,
"Failed to acquire database connection for scheduler, retrying after backoff"
);
sleep(reconnect_backoff).await;
retry_count = retry_count.saturating_add(1);
continue 'reconnect;
}
};

let Some(guard) = try_acquire_advisory_lock(conn, &self.queue_lock).await? else {
tracing::trace!("Scheduler could not acquire lock, exiting");
return Ok(());
};

let Some((zoned_schedule, input)) = self.queue.task_schedule(&self.queue.pool).await?
else {
// No schedule configured, so we'll exit.
return Ok(());
};

// Set up a listener for shutdown notifications
let chan = shutdown_channel();
let mut listeners =
connect_listeners_with_retry(&self.queue.pool, &[chan], &self.reconnect_backoff)
.await?;

let mut shutdown_listener = listeners.remove(0);

tracing::info!("Scheduler PostgreSQL listener connected successfully");
retry_count = 1;

// TODO: Handle updates to schedules?

for next in zoned_schedule.iter() {
tracing::debug!(?next, "Waiting until next scheduled task enqueue");

tokio::select! {
notify_shutdown = shutdown_listener.recv() => {
match notify_shutdown {
Ok(_) => {
self.shutdown_token.cancel();
},
Err(err) => {
tracing::warn!(%err, "Scheduler shutdown listener connection lost, reconnecting");
continue 'reconnect;
}
}
}
}

_ = self.shutdown_token.cancelled() => {
guard.release_now().await?;
break
}
_ = self.shutdown_token.cancelled() => {
break;
}

_ = wait_until(&next) => {
self.process_next_schedule(&input).await?
_ = wait_until(&next) => {
self.process_next_schedule(&input).await?
}
}
}

guard.release_now().await?;

// Exit the reconnect loop once we have completed the schedule iteration.
break;
}

Ok(())