Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ where
I: Sync + Send + 'static,
S: Clone + Sync + Send + 'static,
{
queue: JobQueue<I, S>,
pub(crate) queue: JobQueue<I, S>,
steps: Arc<Vec<StepConfig<S>>>,
state: S,
current_index: Arc<AtomicUsize>,
Expand Down Expand Up @@ -3118,4 +3118,24 @@ mod tests {

Ok(())
}

#[sqlx::test]
async fn into_worker_and_into_scheduler(pool: PgPool) -> sqlx::Result<(), Error> {
let queue = Queue::builder()
.name("into_worker_queue")
.pool(pool.clone())
.build()
.await?;

let job = Job::builder()
.step(|_cx, _: ()| async move { To::done() })
.queue(queue.clone())
.build();

// Ensure it compiles
let _: Worker<Job<(), ()>> = job.clone().into();
let _: Scheduler<Job<(), ()>> = job.into();

Ok(())
}
}
36 changes: 35 additions & 1 deletion src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing::instrument;

use crate::{
queue::{shutdown_channel, try_acquire_advisory_lock, Error as QueueError},
Queue, Task,
Job, Queue, Task,
};

pub(crate) type Result<T = ()> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -52,6 +52,17 @@ pub struct Scheduler<T: Task> {
shutdown_token: CancellationToken,
}

impl<I, S> From<Job<I, S>> for Scheduler<Job<I, S>>
where
I: Sync + Send + 'static,
S: Clone + Send + Sync + 'static,
{
fn from(job: Job<I, S>) -> Self {
let q = job.queue.clone();
Self::new(q, job.clone())
}
}

impl<T: Task> Scheduler<T> {
/// Creates a new scheduler with the given queue and task.
///
Expand Down Expand Up @@ -429,8 +440,10 @@ mod tests {
use std::time::SystemTime;

use jiff::ToSpan;
use sqlx::PgPool;

use super::*;
use crate::To;

#[test]
fn zoned_schedule_creation_valid() {
Expand Down Expand Up @@ -527,4 +540,25 @@ mod tests {
"Expected immediate return"
);
}

#[sqlx::test]
async fn from_job(pool: PgPool) -> sqlx::Result<(), Error> {
let queue = Queue::builder()
.name("into_worker_queue")
.pool(pool.clone())
.build()
.await?;

let job = Job::builder()
.step(|_cx, _: ()| async move { To::done() })
.queue(queue.clone())
.build();

// Ensure it compiles
let s: Scheduler<Job<(), ()>> = Scheduler::from(job);

assert_eq!(s.queue.name, queue.name);

Ok(())
}
}
34 changes: 34 additions & 0 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ use tracing::instrument;
use crate::{
queue::{shutdown_channel, Error as QueueError, InProgressTask, Queue},
task::{Error as TaskError, RetryCount, RetryPolicy, Task, TaskId},
Job,
};
pub(crate) type Result<T = ()> = std::result::Result<T, Error>;

Expand Down Expand Up @@ -191,6 +192,17 @@ impl<T: Task> Clone for Worker<T> {
}
}

impl<I, S> From<Job<I, S>> for Worker<Job<I, S>>
where
I: Sync + Send + 'static,
S: Clone + Send + Sync + 'static,
{
fn from(job: Job<I, S>) -> Self {
let q = job.queue.clone();
Self::new(q, job.clone())
}
}

impl<T: Task + Sync> Worker<T> {
/// Creates a new worker with the given queue and task.
///
Expand Down Expand Up @@ -921,6 +933,7 @@ mod tests {
use crate::{
queue::graceful_shutdown,
task::{Result as TaskResult, State as TaskState},
To,
};

struct TestTask;
Expand Down Expand Up @@ -1251,4 +1264,25 @@ mod tests {

Ok(())
}

#[sqlx::test]
async fn from_job(pool: PgPool) -> sqlx::Result<(), Error> {
let queue = Queue::builder()
.name("into_worker_queue")
.pool(pool.clone())
.build()
.await?;

let job = Job::builder()
.step(|_cx, _: ()| async move { To::done() })
.queue(queue.clone())
.build();

// Ensure it compiles
let w: Worker<Job<(), ()>> = Worker::from(job);

assert_eq!(w.queue.name, queue.name);

Ok(())
}
}