diff --git a/src/job.rs b/src/job.rs index 25863a3..49c5b70 100644 --- a/src/job.rs +++ b/src/job.rs @@ -933,7 +933,7 @@ where I: Sync + Send + 'static, S: Clone + Sync + Send + 'static, { - queue: JobQueue, + pub(crate) queue: JobQueue, steps: Arc>>, state: S, current_index: Arc, @@ -3118,4 +3118,24 @@ mod tests { Ok(()) } + + #[sqlx::test] + async fn into_worker_and_into_scheduler(pool: PgPool) -> sqlx::Result<(), Error> { + let queue = Queue::builder() + .name("into_worker_queue") + .pool(pool.clone()) + .build() + .await?; + + let job = Job::builder() + .step(|_cx, _: ()| async move { To::done() }) + .queue(queue.clone()) + .build(); + + // Ensure it compiles + let _: Worker> = job.clone().into(); + let _: Scheduler> = job.into(); + + Ok(()) + } } diff --git a/src/scheduler.rs b/src/scheduler.rs index 583a533..b6e5003 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -9,7 +9,7 @@ use tracing::instrument; use crate::{ queue::{shutdown_channel, try_acquire_advisory_lock, Error as QueueError}, - Queue, Task, + Job, Queue, Task, }; pub(crate) type Result = std::result::Result; @@ -52,6 +52,17 @@ pub struct Scheduler { shutdown_token: CancellationToken, } +impl From> for Scheduler> +where + I: Sync + Send + 'static, + S: Clone + Send + Sync + 'static, +{ + fn from(job: Job) -> Self { + let q = job.queue.clone(); + Self::new(q, job.clone()) + } +} + impl Scheduler { /// Creates a new scheduler with the given queue and task. /// @@ -429,8 +440,10 @@ mod tests { use std::time::SystemTime; use jiff::ToSpan; + use sqlx::PgPool; use super::*; + use crate::To; #[test] fn zoned_schedule_creation_valid() { @@ -527,4 +540,25 @@ mod tests { "Expected immediate return" ); } + + #[sqlx::test] + async fn from_job(pool: PgPool) -> sqlx::Result<(), Error> { + let queue = Queue::builder() + .name("into_worker_queue") + .pool(pool.clone()) + .build() + .await?; + + let job = Job::builder() + .step(|_cx, _: ()| async move { To::done() }) + .queue(queue.clone()) + .build(); + + // Ensure it compiles + let s: Scheduler> = Scheduler::from(job); + + assert_eq!(s.queue.name, queue.name); + + Ok(()) + } } diff --git a/src/worker.rs b/src/worker.rs index 5a58819..821d637 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -139,6 +139,7 @@ use tracing::instrument; use crate::{ queue::{shutdown_channel, Error as QueueError, InProgressTask, Queue}, task::{Error as TaskError, RetryCount, RetryPolicy, Task, TaskId}, + Job, }; pub(crate) type Result = std::result::Result; @@ -191,6 +192,17 @@ impl Clone for Worker { } } +impl From> for Worker> +where + I: Sync + Send + 'static, + S: Clone + Send + Sync + 'static, +{ + fn from(job: Job) -> Self { + let q = job.queue.clone(); + Self::new(q, job.clone()) + } +} + impl Worker { /// Creates a new worker with the given queue and task. /// @@ -921,6 +933,7 @@ mod tests { use crate::{ queue::graceful_shutdown, task::{Result as TaskResult, State as TaskState}, + To, }; struct TestTask; @@ -1251,4 +1264,25 @@ mod tests { Ok(()) } + + #[sqlx::test] + async fn from_job(pool: PgPool) -> sqlx::Result<(), Error> { + let queue = Queue::builder() + .name("into_worker_queue") + .pool(pool.clone()) + .build() + .await?; + + let job = Job::builder() + .step(|_cx, _: ()| async move { To::done() }) + .queue(queue.clone()) + .build(); + + // Ensure it compiles + let w: Worker> = Worker::from(job); + + assert_eq!(w.queue.name, queue.name); + + Ok(()) + } }