Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 140 additions & 23 deletions src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@
//!
//! ```rust
//! use serde::{Deserialize, Serialize};
//! use underway::{task::RetryPolicy, Job, To};
//! use underway::{job::StepOptions, task::RetryPolicy, Job, To};
//!
//! #[derive(Serialize, Deserialize)]
//! struct Step1 {
Expand All @@ -310,6 +310,9 @@
//! new: usize,
//! };
//!
//! let single_attempt_policy = RetryPolicy::builder().max_attempts(1).build();
//! let max_15_seconds_policy = RetryPolicy::builder().max_interval_ms(15_000).build();
//!
//! let job_builder = Job::<_, ()>::builder()
//! .step(|_cx, Step1 { n }| async move {
//! println!("Got {n}");
Expand All @@ -318,12 +321,20 @@
//! new: n + 2,
//! })
//! })
//! .retry_policy(RetryPolicy::builder().max_attempts(1).build())
//! .options(
//! StepOptions::builder()
//! .retry_policy(single_attempt_policy)
//! .build(),
//! )
//! .step(|_cx, Step2 { original, new }| async move {
//! println!("Was {original} now is {new}");
//! To::done()
//! })
//! .retry_policy(RetryPolicy::builder().max_interval_ms(15_000).build());
//! .options(
//! StepOptions::builder()
//! .retry_policy(max_15_seconds_policy)
//! .build(),
//! );
//! ```
//!
//! # Enqueuing jobs
Expand Down Expand Up @@ -649,6 +660,7 @@ use tracing::instrument;
use ulid::Ulid;
use uuid::Uuid;

pub use self::step_options::StepOptions;
use crate::{
queue::{Error as QueueError, InProgressTask, Queue},
scheduler::{Error as SchedulerError, Scheduler, ZonedSchedule},
Expand All @@ -657,6 +669,7 @@ use crate::{
},
worker::{Error as WorkerError, Worker},
};
mod step_options;

type Result<T = ()> = std::result::Result<T, Error>;

Expand Down Expand Up @@ -734,7 +747,7 @@ pub struct Context<S> {
pub queue_name: String,
}

type StepConfig<S> = (Box<dyn StepExecutor<S>>, RetryPolicy);
type StepConfig<S> = (Box<dyn StepExecutor<S>>, StepOptions);

mod sealed {
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -1713,9 +1726,49 @@ where

fn retry_policy(&self) -> RetryPolicy {
let current_index = self.current_index.load(Ordering::SeqCst);
let (_, retry_policy) = self.steps[current_index];
let (_, StepOptions { retry_policy, .. }) = self.steps[current_index];
retry_policy
}

fn timeout(&self) -> Span {
let current_index = self.current_index.load(Ordering::SeqCst);
let (_, StepOptions { timeout, .. }) = self.steps[current_index];
timeout
}

fn ttl(&self) -> Span {
let current_index = self.current_index.load(Ordering::SeqCst);
let (_, StepOptions { ttl, .. }) = self.steps[current_index];
ttl
}

fn heartbeat(&self) -> Span {
let current_index = self.current_index.load(Ordering::SeqCst);
let (_, StepOptions { heartbeat, .. }) = self.steps[current_index];
heartbeat
}

fn delay(&self) -> Span {
let current_index = self.current_index.load(Ordering::SeqCst);
let (_, StepOptions { delay, .. }) = self.steps[current_index];
delay
}

#[inline]
fn concurrency_key(&self) -> Option<String> {
let current_index = self.current_index.load(Ordering::SeqCst);
let StepOptions {
concurrency_key, ..
} = self.steps[current_index].1.clone();

concurrency_key
}

fn priority(&self) -> i32 {
let current_index = self.current_index.load(Ordering::SeqCst);
let (_, StepOptions { priority, .. }) = self.steps[current_index];
priority
}
}

impl<I, S> Clone for Job<I, S>
Expand Down Expand Up @@ -1904,7 +1957,7 @@ mod builder_states {
/// Builder for constructing a `Job` with a sequence of steps.
pub struct Builder<I, O, S, B> {
builder_state: B,
steps: Vec<(Box<dyn StepExecutor<S>>, RetryPolicy)>,
steps: Vec<(Box<dyn StepExecutor<S>>, StepOptions)>,
_marker: PhantomData<(I, O, S)>,
}

Expand Down Expand Up @@ -1995,7 +2048,7 @@ impl<I, S> Builder<I, I, S, Initial> {
Fut: Future<Output = TaskResult<To<O>>> + Send + 'static,
{
let step_fn = StepFn::new(move |cx, input| Box::pin(func(cx, input)));
self.steps.push((Box::new(step_fn), RetryPolicy::default()));
self.steps.push((Box::new(step_fn), StepOptions::default()));

Builder {
builder_state: StepSet {
Expand Down Expand Up @@ -2050,7 +2103,7 @@ impl<I, S> Builder<I, I, S, StateSet<S>> {
Fut: Future<Output = TaskResult<To<O>>> + Send + 'static,
{
let step_fn = StepFn::new(move |cx, input| Box::pin(func(cx, input)));
self.steps.push((Box::new(step_fn), RetryPolicy::default()));
self.steps.push((Box::new(step_fn), StepOptions::default()));

Builder {
builder_state: StepSet {
Expand Down Expand Up @@ -2095,7 +2148,7 @@ impl<I, Current, S> Builder<I, Current, S, StepSet<Current, S>> {
Fut: Future<Output = TaskResult<To<New>>> + Send + 'static,
{
let step_fn = StepFn::new(move |cx, input| Box::pin(func(cx, input)));
self.steps.push((Box::new(step_fn), RetryPolicy::default()));
self.steps.push((Box::new(step_fn), StepOptions::default()));

Builder {
builder_state: StepSet {
Expand All @@ -2107,29 +2160,38 @@ impl<I, Current, S> Builder<I, Current, S, StepSet<Current, S>> {
}
}

/// Sets the retry policy of the previous step.
/// Sets options to override the `Task`s defaults.
///
/// Each step in a `Job` can have different options for the following:
///
/// This policy applies to the step immediately before the method. That
/// means that a retry policy may be defined for each step and each
/// step's policy may differ from the others.
/// - `retry_policy`: defines max attempts, backoff jitter and so on
/// - `heartbeat`: period to check for task liveness
/// - `timeout`: max amount of time for task to run
/// - `ttl` (time to live): how long will this task be kept in database
/// - `delay`: time amount to wait before task starts
/// - `concurrency_key`: control the level of concurrency for this task
/// - `priority`: as the name says, the priority amongst the tasks for
/// execution
///
/// # Example
///
/// ```rust
/// use underway::{task::RetryPolicy, Job, To};
/// use underway::{job::StepOptions, task::RetryPolicy, Job, To};
///
/// // Set a retry policy for the step.
/// let retry_policy = RetryPolicy::builder().max_attempts(15).build();
/// let job_builder = Job::<(), ()>::builder()
/// .step(|_cx, _| async move { To::done() })
/// .retry_policy(retry_policy);
/// .options(
/// StepOptions::builder()
/// .priority(2)
/// .retry_policy(retry_policy)
/// .build(),
/// );
/// ```
pub fn retry_policy(
mut self,
retry_policy: RetryPolicy,
) -> Builder<I, Current, S, StepSet<Current, S>> {
let (_, default_policy) = self.steps.last_mut().expect("Steps should not be empty");
*default_policy = retry_policy;
pub fn options(mut self, options: StepOptions) -> Builder<I, Current, S, StepSet<Current, S>> {
let (_, default_options) = self.steps.last_mut().expect("Steps should not be empty");
*default_options = options;

Builder {
builder_state: StepSet {
Expand Down Expand Up @@ -2372,6 +2434,7 @@ where
mod tests {
use std::sync::Mutex;

use jiff::{ToSpan, Unit};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;

Expand Down Expand Up @@ -2605,6 +2668,54 @@ mod tests {
Ok(())
}

#[sqlx::test]
async fn one_step_with_options(pool: PgPool) -> sqlx::Result<(), Error> {
let queue = Queue::builder()
.name("one_step_with_options")
.pool(pool.clone())
.build()
.await?;

let job = Job::builder()
.step(|_cx, ()| async move {
println!("Executing job with different timeout");
To::done()
})
.options(
StepOptions::builder()
.timeout(1.hour())
.heartbeat(15.seconds())
.concurrency_key("some_concurrency".to_string())
.build(),
)
.queue(queue.clone())
.build();

job.enqueue(&()).await?;

let pending_task = queue
.dequeue()
.await?
.expect("There should be an enqueued task");

assert_eq!(
pending_task.timeout.microseconds as f64,
1.hour().total(Unit::Microsecond).unwrap()
);

assert_eq!(
pending_task.heartbeat.microseconds as f64,
15.seconds().total(Unit::Microsecond).unwrap()
);

assert_eq!(
pending_task.concurrency_key,
Some("some_concurrency".to_string())
);

Ok(())
}

#[sqlx::test]
async fn one_step_enqueue(pool: PgPool) -> sqlx::Result<(), Error> {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
Expand Down Expand Up @@ -2789,12 +2900,18 @@ mod tests {
data: message.as_bytes().into(),
})
})
.retry_policy(step1_policy)
.options(StepOptions {
retry_policy: step1_policy,
..Default::default()
})
.step(|_cx, Step2 { data }| async move {
println!("Executing job with data: {data:?}");
To::done()
})
.retry_policy(step2_policy)
.options(StepOptions {
retry_policy: step2_policy,
..Default::default()
})
.queue(queue.clone())
.build();

Expand Down
Loading