diff --git a/src/job.rs b/src/job.rs index 3287b58..730ea37 100644 --- a/src/job.rs +++ b/src/job.rs @@ -297,7 +297,7 @@ //! //! ```rust //! use serde::{Deserialize, Serialize}; -//! use underway::{task::RetryPolicy, Job, To}; +//! use underway::{job::StepOptions, task::RetryPolicy, Job, To}; //! //! #[derive(Serialize, Deserialize)] //! struct Step1 { @@ -310,6 +310,9 @@ //! new: usize, //! }; //! +//! let single_attempt_policy = RetryPolicy::builder().max_attempts(1).build(); +//! let max_15_seconds_policy = RetryPolicy::builder().max_interval_ms(15_000).build(); +//! //! let job_builder = Job::<_, ()>::builder() //! .step(|_cx, Step1 { n }| async move { //! println!("Got {n}"); @@ -318,12 +321,20 @@ //! new: n + 2, //! }) //! }) -//! .retry_policy(RetryPolicy::builder().max_attempts(1).build()) +//! .options( +//! StepOptions::builder() +//! .retry_policy(single_attempt_policy) +//! .build(), +//! ) //! .step(|_cx, Step2 { original, new }| async move { //! println!("Was {original} now is {new}"); //! To::done() //! }) -//! .retry_policy(RetryPolicy::builder().max_interval_ms(15_000).build()); +//! .options( +//! StepOptions::builder() +//! .retry_policy(max_15_seconds_policy) +//! .build(), +//! ); //! ``` //! //! # Enqueuing jobs @@ -649,6 +660,7 @@ use tracing::instrument; use ulid::Ulid; use uuid::Uuid; +pub use self::step_options::StepOptions; use crate::{ queue::{Error as QueueError, InProgressTask, Queue}, scheduler::{Error as SchedulerError, Scheduler, ZonedSchedule}, @@ -657,6 +669,7 @@ use crate::{ }, worker::{Error as WorkerError, Worker}, }; +mod step_options; type Result = std::result::Result; @@ -734,7 +747,7 @@ pub struct Context { pub queue_name: String, } -type StepConfig = (Box>, RetryPolicy); +type StepConfig = (Box>, StepOptions); mod sealed { use serde::{Deserialize, Serialize}; @@ -1713,9 +1726,49 @@ where fn retry_policy(&self) -> RetryPolicy { let current_index = self.current_index.load(Ordering::SeqCst); - let (_, retry_policy) = self.steps[current_index]; + let (_, StepOptions { retry_policy, .. }) = self.steps[current_index]; retry_policy } + + fn timeout(&self) -> Span { + let current_index = self.current_index.load(Ordering::SeqCst); + let (_, StepOptions { timeout, .. }) = self.steps[current_index]; + timeout + } + + fn ttl(&self) -> Span { + let current_index = self.current_index.load(Ordering::SeqCst); + let (_, StepOptions { ttl, .. }) = self.steps[current_index]; + ttl + } + + fn heartbeat(&self) -> Span { + let current_index = self.current_index.load(Ordering::SeqCst); + let (_, StepOptions { heartbeat, .. }) = self.steps[current_index]; + heartbeat + } + + fn delay(&self) -> Span { + let current_index = self.current_index.load(Ordering::SeqCst); + let (_, StepOptions { delay, .. }) = self.steps[current_index]; + delay + } + + #[inline] + fn concurrency_key(&self) -> Option { + let current_index = self.current_index.load(Ordering::SeqCst); + let StepOptions { + concurrency_key, .. + } = self.steps[current_index].1.clone(); + + concurrency_key + } + + fn priority(&self) -> i32 { + let current_index = self.current_index.load(Ordering::SeqCst); + let (_, StepOptions { priority, .. }) = self.steps[current_index]; + priority + } } impl Clone for Job @@ -1904,7 +1957,7 @@ mod builder_states { /// Builder for constructing a `Job` with a sequence of steps. pub struct Builder { builder_state: B, - steps: Vec<(Box>, RetryPolicy)>, + steps: Vec<(Box>, StepOptions)>, _marker: PhantomData<(I, O, S)>, } @@ -1995,7 +2048,7 @@ impl Builder { Fut: Future>> + Send + 'static, { let step_fn = StepFn::new(move |cx, input| Box::pin(func(cx, input))); - self.steps.push((Box::new(step_fn), RetryPolicy::default())); + self.steps.push((Box::new(step_fn), StepOptions::default())); Builder { builder_state: StepSet { @@ -2050,7 +2103,7 @@ impl Builder> { Fut: Future>> + Send + 'static, { let step_fn = StepFn::new(move |cx, input| Box::pin(func(cx, input))); - self.steps.push((Box::new(step_fn), RetryPolicy::default())); + self.steps.push((Box::new(step_fn), StepOptions::default())); Builder { builder_state: StepSet { @@ -2095,7 +2148,7 @@ impl Builder> { Fut: Future>> + Send + 'static, { let step_fn = StepFn::new(move |cx, input| Box::pin(func(cx, input))); - self.steps.push((Box::new(step_fn), RetryPolicy::default())); + self.steps.push((Box::new(step_fn), StepOptions::default())); Builder { builder_state: StepSet { @@ -2107,29 +2160,38 @@ impl Builder> { } } - /// Sets the retry policy of the previous step. + /// Sets options to override the `Task`s defaults. + /// + /// Each step in a `Job` can have different options for the following: /// - /// This policy applies to the step immediately before the method. That - /// means that a retry policy may be defined for each step and each - /// step's policy may differ from the others. + /// - `retry_policy`: defines max attempts, backoff jitter and so on + /// - `heartbeat`: period to check for task liveness + /// - `timeout`: max amount of time for task to run + /// - `ttl` (time to live): how long will this task be kept in database + /// - `delay`: time amount to wait before task starts + /// - `concurrency_key`: control the level of concurrency for this task + /// - `priority`: as the name says, the priority amongst the tasks for + /// execution /// /// # Example /// /// ```rust - /// use underway::{task::RetryPolicy, Job, To}; + /// use underway::{job::StepOptions, task::RetryPolicy, Job, To}; /// /// // Set a retry policy for the step. /// let retry_policy = RetryPolicy::builder().max_attempts(15).build(); /// let job_builder = Job::<(), ()>::builder() /// .step(|_cx, _| async move { To::done() }) - /// .retry_policy(retry_policy); + /// .options( + /// StepOptions::builder() + /// .priority(2) + /// .retry_policy(retry_policy) + /// .build(), + /// ); /// ``` - pub fn retry_policy( - mut self, - retry_policy: RetryPolicy, - ) -> Builder> { - let (_, default_policy) = self.steps.last_mut().expect("Steps should not be empty"); - *default_policy = retry_policy; + pub fn options(mut self, options: StepOptions) -> Builder> { + let (_, default_options) = self.steps.last_mut().expect("Steps should not be empty"); + *default_options = options; Builder { builder_state: StepSet { @@ -2372,6 +2434,7 @@ where mod tests { use std::sync::Mutex; + use jiff::{ToSpan, Unit}; use serde::{Deserialize, Serialize}; use sqlx::PgPool; @@ -2605,6 +2668,54 @@ mod tests { Ok(()) } + #[sqlx::test] + async fn one_step_with_options(pool: PgPool) -> sqlx::Result<(), Error> { + let queue = Queue::builder() + .name("one_step_with_options") + .pool(pool.clone()) + .build() + .await?; + + let job = Job::builder() + .step(|_cx, ()| async move { + println!("Executing job with different timeout"); + To::done() + }) + .options( + StepOptions::builder() + .timeout(1.hour()) + .heartbeat(15.seconds()) + .concurrency_key("some_concurrency".to_string()) + .build(), + ) + .queue(queue.clone()) + .build(); + + job.enqueue(&()).await?; + + let pending_task = queue + .dequeue() + .await? + .expect("There should be an enqueued task"); + + assert_eq!( + pending_task.timeout.microseconds as f64, + 1.hour().total(Unit::Microsecond).unwrap() + ); + + assert_eq!( + pending_task.heartbeat.microseconds as f64, + 15.seconds().total(Unit::Microsecond).unwrap() + ); + + assert_eq!( + pending_task.concurrency_key, + Some("some_concurrency".to_string()) + ); + + Ok(()) + } + #[sqlx::test] async fn one_step_enqueue(pool: PgPool) -> sqlx::Result<(), Error> { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] @@ -2789,12 +2900,18 @@ mod tests { data: message.as_bytes().into(), }) }) - .retry_policy(step1_policy) + .options(StepOptions { + retry_policy: step1_policy, + ..Default::default() + }) .step(|_cx, Step2 { data }| async move { println!("Executing job with data: {data:?}"); To::done() }) - .retry_policy(step2_policy) + .options(StepOptions { + retry_policy: step2_policy, + ..Default::default() + }) .queue(queue.clone()) .build(); diff --git a/src/job/step_options.rs b/src/job/step_options.rs new file mode 100644 index 0000000..83ed718 --- /dev/null +++ b/src/job/step_options.rs @@ -0,0 +1,223 @@ +use jiff::{Span, ToSpan}; + +use crate::task::RetryPolicy; + +/// Options for changing the defaults of each step in a `Job`. +/// +/// These options are used as default implementations of the `Task` trait. +/// +/// # Example +/// +/// ```rust +/// use jiff::ToSpan; +/// use underway::job::StepOptions; +/// +/// let options = StepOptions::builder() +/// .priority(1) +/// .timeout(30.minutes()) +/// .build(); +/// ``` +#[derive(Debug, Clone, PartialEq)] +pub struct StepOptions { + pub(crate) retry_policy: RetryPolicy, + pub(crate) timeout: Span, + pub(crate) ttl: Span, + pub(crate) heartbeat: Span, + pub(crate) delay: Span, + pub(crate) concurrency_key: Option, + pub(crate) priority: i32, +} + +impl Default for StepOptions { + fn default() -> Self { + Self { + retry_policy: Default::default(), + timeout: 15.minutes(), + ttl: 14.days(), + heartbeat: 30.seconds(), + delay: Span::new(), + concurrency_key: None, + priority: 0, + } + } +} + +impl StepOptions { + /// Create a new builder. + /// + /// # Example + /// + /// ```rust + /// use jiff::ToSpan; + /// use underway::{job::StepOptions, task::RetryPolicy}; + /// + /// let retry_policy = StepOptions::builder().timeout(1.hour()).build(); + /// ``` + pub fn builder() -> Builder { + Builder { + inner: StepOptions::default(), + } + } +} + +#[derive(Debug, Default)] +pub struct Builder { + inner: StepOptions, +} + +impl Builder { + /// Creates a new `Builder` with the default task options. + pub fn new() -> Self { + Self { + inner: StepOptions::default(), + } + } + + /// Sets the retry policy. + /// + /// # Example + /// + /// ```rust + /// use underway::{job::StepOptions, task::RetryPolicy}; + /// + /// // Set max attempts to two. + /// + /// let policy = RetryPolicy::builder().max_attempts(2).build(); + /// let options = StepOptions::builder().retry_policy(policy).build(); + /// ``` + pub const fn retry_policy(mut self, retry_policy: RetryPolicy) -> Self { + self.inner.retry_policy = retry_policy; + self + } + + /// Sets the timeout period for the task execution. + /// + /// Default value is 15 minutes. + /// + /// # Example + /// + /// ```rust + /// use jiff::ToSpan; + /// use underway::job::StepOptions; + /// + /// // Set the task timeout to 1 hour + /// let options = StepOptions::builder().timeout(1.hour()); + /// ``` + pub const fn timeout(mut self, timeout: Span) -> Self { + self.inner.timeout = timeout; + self + } + + /// Sets the TTL (time to live) period for the task to be kept in the + /// database. + /// + /// Default value is 14 days. + /// + /// # Example + /// + /// ```rust + /// use jiff::ToSpan; + /// use underway::job::StepOptions; + /// + /// // Set the task TTL to 4 days + /// let options = StepOptions::builder().ttl(4.days()); + /// ``` + pub const fn ttl(mut self, ttl: Span) -> Self { + self.inner.ttl = ttl; + self + } + + /// Sets the task heartbeat. + /// + /// Heartbeat is used to check the tasks liveness. + /// + /// Default value is 30 seconds. + /// + /// # Example + /// + /// ```rust + /// use jiff::ToSpan; + /// use underway::job::StepOptions; + /// + /// // Set the task heartbeat to 15 seconds + /// let options = StepOptions::builder().heartbeat(15.seconds()); + /// ``` + pub const fn heartbeat(mut self, heartbeat: Span) -> Self { + self.inner.heartbeat = heartbeat; + self + } + + /// Sets a delay for the task to wait before executing. + /// + /// Default value is 0. + /// + /// # Example + /// + /// ```rust + /// use jiff::ToSpan; + /// use underway::job::StepOptions; + /// + /// // Set a delay of 15 minutes for the task to begin + /// let options = StepOptions::builder().delay(15.minutes()); + /// ``` + pub const fn delay(mut self, delay: Span) -> Self { + self.inner.delay = delay; + self + } + + /// Sets a concurrency key that controls how many tasks run concurrently. + /// + /// Default value is None + /// + /// # Example + /// + /// ```rust + /// use jiff::ToSpan; + /// use underway::job::StepOptions; + /// + /// // Set the task TTL to 4 days + /// let options = StepOptions::builder().concurrency_key("key".to_string()); + /// ``` + pub fn concurrency_key(mut self, concurrenty_key: String) -> Self { + self.inner.concurrency_key = Some(concurrenty_key); + self + } + + /// Sets the task priority. + /// + /// Task priority makes polling for new tasks configurable. + /// + /// Default value is priority 0 . + /// + /// # Example + /// + /// ```rust + /// use jiff::ToSpan; + /// use underway::job::StepOptions; + /// + /// // Set the task priority to 1 + /// let options = StepOptions::builder().priority(1); + /// ``` + pub const fn priority(mut self, priority: i32) -> Self { + self.inner.priority = priority; + self + } + + /// Builds the `StepOptions` with the configured parameters. + /// + /// # Example + /// + /// ```rust + /// use jiff::ToSpan; + /// use underway::job::StepOptions; + /// + /// // Build a custom step options + /// let options = StepOptions::builder() + /// .priority(1) + /// .timeout(45.minutes()) + /// .build(); + /// ``` + pub fn build(self) -> StepOptions { + self.inner + } +}