From f1c56378132af477040f27dc2442a2d25e0eeace Mon Sep 17 00:00:00 2001 From: Max Countryman Date: Tue, 15 Jul 2025 15:35:35 -0700 Subject: [PATCH 1/2] refactor: share `Queue` via `Arc` This refactor improves ergonomics and internal consistency by wrapping all Queue instances in Arc. It simplifies the public API, avoids unnecessary clones, and enables more flexible, re-entrant usage patterns. Key changes: - Queue ownership: - `Job`, `Worker`, `Scheduler`, and `EnqueuedJob` now store `Arc>`. - `Worker::new` and `Scheduler::new` now take `Arc>` instead of owning the `Queue` directly. - Job-scoped helpers: - `Job::queue()` -> `Arc>` - `Job::worker()` -> `Worker` - `Job::scheduler()` -> `Scheduler` - These eliminate repetitive clone calls and centralize access to the underlying queue. - Borrow-based execution: - `Job::run(&self)` and `Job::start(&self)` now take `&self` instead of `self`. - Enables spawning multiple schedulers/workers for the same `Job` without moving ownership. - Removed wrappers: - `Job::run_worker` and `Job::run_scheduler` removed. - Use `job.worker().run().await` and `job.scheduler().run().await` instead. Rationale: - Simpler public API: Consumers think in terms of jobs, not queues. The new helpers reduce boilerplate and expose a safer, more intuitive entry point. - Consistent shared state: Advisory locks and database handles are now consistently shared across job runners via `Arc`, reducing the risk of misuse. - Avoid unnecessary clones: Fewer `queue.clone()` calls, more efficient task spawning. - Supports concurrent orchestration: Borrow-based run/start methods allow `tokio::spawn(job.worker().run())` without moving the job. Migration: - Before: ```rust let queue = Queue::builder().build().await?; let job = Job::builder().queue(queue.clone()).build(); Worker::new(queue.clone(), job.clone()).run().await; ``` - After: ```rust let job = Job::builder().pool(pool).build().await?; job.worker().run().await; ``` Update any code accessing `queue` directly to instead call `job.queue()`. --- src/job.rs | 120 +++++++++++++++++++++++------------------------ src/queue.rs | 7 ++- src/scheduler.rs | 16 ++++--- src/worker.rs | 33 +++++++------ 4 files changed, 91 insertions(+), 85 deletions(-) diff --git a/src/job.rs b/src/job.rs index 4f35bfd..84cb238 100644 --- a/src/job.rs +++ b/src/job.rs @@ -568,19 +568,15 @@ //! # */ //! # //! -//! let queue = Queue::builder() +//! let job = Job::builder() +//! .step(|_cx, _: ()| async move { To::done() }) //! .name("example-job") //! .pool(pool) //! .build() //! .await?; //! -//! let job = Job::builder() -//! .step(|_cx, _: ()| async move { To::done() }) -//! .queue(queue.clone()) -//! .build(); -//! -//! let worker = Worker::new(queue.clone(), job.clone()); -//! let scheduler = Scheduler::new(queue, job); +//! let worker = job.worker(); +//! let scheduler = job.scheduler(); //! # Ok::<(), Box>(()) //! # }); //! # } @@ -655,11 +651,11 @@ use uuid::Uuid; use crate::{ queue::{Error as QueueError, InProgressTask, Queue}, - scheduler::{Error as SchedulerError, Result as SchedulerResult, Scheduler, ZonedSchedule}, + scheduler::{Error as SchedulerError, Scheduler, ZonedSchedule}, task::{ Error as TaskError, Result as TaskResult, RetryPolicy, State as TaskState, Task, TaskId, }, - worker::{Error as WorkerError, Result as WorkerResult, Worker}, + worker::{Error as WorkerError, Worker}, }; type Result = std::result::Result; @@ -862,7 +858,7 @@ impl Deref for JobId { /// This handle allows for manipulating the state of the job in the queue. pub struct EnqueuedJob { id: JobId, - queue: Queue, + queue: Arc>, } impl EnqueuedJob { @@ -952,7 +948,7 @@ where I: Sync + Send + 'static, S: Clone + Sync + Send + 'static, { - queue: JobQueue, + queue: Arc>, steps: Arc>>, state: S, current_index: Arc, @@ -1396,11 +1392,7 @@ where Ok(()) } - /// Constructs a worker which then immediately runs task processing. - /// - /// # Errors - /// - /// This has the same error conditions as [`Worker::run`]. + /// Returns this job's `Queue`. /// /// # Example /// @@ -1423,23 +1415,47 @@ where /// # */ /// # /// - /// job.run_worker().await?; + /// let job_queue = job.queue(); /// # Ok::<(), Box>(()) /// # }); /// # } - /// ``` - pub async fn run_worker(self) -> WorkerResult { - let queue = self.queue.clone(); - let job = self.clone(); - let worker = Worker::new(queue, job); - worker.run().await + pub fn queue(&self) -> Arc> { + Arc::clone(&self.queue) } - /// Constructs a worker which then immediately runs schedule processing. + /// Creates a `Worker` for this job. /// - /// # Errors + /// # Example /// - /// This has the same error conditions as [`Scheduler::run`]. + /// ```rust,no_run + /// # use sqlx::PgPool; + /// # use underway::{Job, To}; + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let job = Job::<(), _>::builder() + /// # .step(|_cx, _| async move { To::done() }) + /// # .name("example") + /// # .pool(pool) + /// # .build() + /// # .await?; + /// # /* + /// let job = { /* A `Job`. */ }; + /// # */ + /// # + /// + /// let job_worker = job.worker(); + /// job_worker.run().await?; // Run the worker directly. + /// # Ok::<(), Box>(()) + /// # }); + /// # } + pub fn worker(&self) -> Worker { + Worker::new(self.queue(), self.clone()) + } + + /// Creates a `Scheduler` for this job. /// /// # Example /// @@ -1462,16 +1478,13 @@ where /// # */ /// # /// - /// job.run_scheduler().await?; + /// let job_scheduler = job.scheduler(); + /// job_scheduler.run().await?; // Run the scheduler directly. /// # Ok::<(), Box>(()) /// # }); /// # } - /// ``` - pub async fn run_scheduler(self) -> SchedulerResult { - let queue = self.queue.clone(); - let job = self.clone(); - let scheduler = Scheduler::new(queue, job); - scheduler.run().await + pub fn scheduler(&self) -> Scheduler { + Scheduler::new(self.queue(), self.clone()) } /// Runs both a worker and scheduler for the job. @@ -1508,12 +1521,9 @@ where /// # }); /// # } /// ``` - pub async fn run(self) -> Result { - let queue = self.queue.clone(); - let job = self.clone(); - - let worker = Worker::new(queue.clone(), job.clone()); - let scheduler = Scheduler::new(queue, job); + pub async fn run(&self) -> Result { + let worker = self.worker(); + let scheduler = self.scheduler(); let mut workers = JoinSet::new(); workers.spawn(async move { worker.run().await.map_err(Error::from) }); @@ -1567,17 +1577,13 @@ where /// # }); /// # } /// ``` - pub fn start(self) -> JobHandle { + pub fn start(&self) -> JobHandle { let shutdown_token = CancellationToken::new(); let mut workers = JoinSet::new(); - let queue = self.queue.clone(); - let job = self.clone(); - - let mut worker = Worker::new(queue.clone(), job.clone()); + let mut worker = self.worker(); worker.set_shutdown_token(shutdown_token.clone()); - - let mut scheduler = Scheduler::new(queue, job); + let mut scheduler = self.scheduler(); scheduler.set_shutdown_token(shutdown_token.clone()); // Spawn the tasks using `tokio::spawn` to decouple them from polling the @@ -2261,7 +2267,7 @@ where } = self.builder_state; let queue = Queue::builder().name(queue_name).pool(pool).build().await?; Ok(Job { - queue, + queue: Arc::new(queue), steps: Arc::new(self.steps), state, current_index: Arc::new(AtomicUsize::new(0)), @@ -2353,7 +2359,7 @@ where pub fn build(self) -> Job { let QueueSet { state, queue } = self.builder_state; Job { - queue, + queue: Arc::new(queue), steps: Arc::new(self.steps), state, current_index: Arc::new(AtomicUsize::new(0)), @@ -2685,7 +2691,7 @@ mod tests { .await?; job.enqueue(&()).await?; - let worker = Worker::new(job.queue.clone(), job.clone()); + let worker = Worker::new(job.queue(), job); // Process the first task. let task_id = worker.process_next_task().await?; @@ -2733,7 +2739,7 @@ mod tests { message: "Hello, world!".to_string(), }; job.enqueue(&input).await?; - let worker = Worker::new(queue.clone(), job.clone()); + let worker = Worker::new(job.queue(), job); // Process the first task. worker.process_next_task().await?; @@ -2829,10 +2835,7 @@ mod tests { .await?; // Process the task to ensure the next task is enqueued. - let worker = { - let worker_job = job.clone(); - Worker::new(queue.clone(), worker_job) - }; + let worker = { Worker::new(job.queue(), job) }; worker.process_next_task().await?; // Dequeue the second task. @@ -2897,7 +2900,7 @@ mod tests { message: "Hello, world!".to_string(), }; job.enqueue(&input).await?; - let worker = Worker::new(queue.clone(), job.clone()); + let worker = Worker::new(job.queue(), job); // Process the first task. worker.process_next_task().await?; @@ -2994,10 +2997,7 @@ mod tests { .await?; // Process the task to ensure the next task is enqueued. - let worker = { - let worker_job = job.clone(); - Worker::new(queue.clone(), worker_job) - }; + let worker = { Worker::new(job.queue(), job) }; worker.process_next_task().await?; // Dequeue the second task. diff --git a/src/queue.rs b/src/queue.rs index 368fd37..6afe6be 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -134,9 +134,8 @@ //! the scheduler can be run. //! //! As with task processing, jobs provide an interface for scheduling. For -//! example, the [`schedule`](crate::Job::schedule) method along with -//! [`run_scheduler`](crate::Job::run_scheduler) are often what you want to use. -//! Use these to set the schedule and run the scheduler, respectively. +//! example, the [`schedule`](crate::Job::schedule) method schedules the job. +//! Once scheduled, a scheduler must be run to ensure exeuction. //! //! Of course it's also possible to interface directly with the queue to achieve //! the same if desired. Schedules can be set with the @@ -180,7 +179,7 @@ //! let task = { /* A type that implements `Task`. */ }; //! # */ //! # let task = MyTask; -//! let scheduler = Scheduler::new(queue, task); +//! let scheduler = Scheduler::new(queue.into(), task); //! //! // Run a scheduler based on our configured schedule. //! scheduler.run().await?; diff --git a/src/scheduler.rs b/src/scheduler.rs index 583a533..4b3de87 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -44,8 +44,9 @@ pub enum Error { /// /// [advisory-lock]: https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS pub struct Scheduler { - queue: Queue, + queue: Arc>, queue_lock: PgAdvisoryLock, + task: Arc, // When this token is cancelled the queue has been shutdown. @@ -93,17 +94,18 @@ impl Scheduler { /// # */ /// # /// - /// Scheduler::new(queue, task); + /// Scheduler::new(queue.into(), task); /// # Ok::<(), Box>(()) /// # }); /// # } /// ``` - pub fn new(queue: Queue, task: T) -> Self { + pub fn new(queue: Arc>, task: T) -> Self { + let task = Arc::new(task); let queue_lock = queue_scheduler_lock(&queue.name); Self { queue, queue_lock, - task: Arc::new(task), + task, shutdown_token: CancellationToken::new(), } } @@ -140,7 +142,7 @@ impl Scheduler { /// # .build() /// # .await?; /// # let task = ExampleTask; - /// # let mut scheduler = Scheduler::new(queue, task); + /// # let mut scheduler = Scheduler::new(queue.into(), task); /// # /* /// let mut scheduler = { /* A `Scheduler`. */ }; /// # */ @@ -187,7 +189,7 @@ impl Scheduler { /// # .build() /// # .await?; /// # let task = ExampleTask; - /// # let scheduler = Scheduler::new(queue, task); + /// # let scheduler = Scheduler::new(queue.into(), task); /// # /* /// let scheduler = { /* A `Scheduler`. */ }; /// # */ @@ -242,7 +244,7 @@ impl Scheduler { /// # .build() /// # .await?; /// # let task = ExampleTask; - /// # let scheduler = Scheduler::new(queue, task); + /// # let scheduler = Scheduler::new(queue.into(), task); /// # /* /// let scheduler = { /* A `Scheduler`. */ }; /// # */ diff --git a/src/worker.rs b/src/worker.rs index 5a58819..51201e0 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -58,7 +58,7 @@ //! # let task = MyTask; //! //! // Create a new worker from the queue and task. -//! let worker = Worker::new(queue, task); +//! let worker = Worker::new(queue.into(), task); //! //! // Run the worker. //! worker.run().await?; @@ -98,7 +98,7 @@ //! # .build() //! # .await?; //! # let task = MyTask; -//! # let worker = Worker::new(queue, task); +//! # let worker = Worker::new(queue.into(), task); //! // Spin up a number of workers to process tasks concurrently. //! for _ in 0..4 { //! let task_worker = worker.clone(); @@ -170,7 +170,7 @@ pub enum Error { /// A worker that's generic over the task it processes. #[derive(Debug)] pub struct Worker { - queue: Queue, + queue: Arc>, task: Arc, // Limits the number of concurrent `Task::execute` invocations this worker will be allowed. @@ -183,8 +183,8 @@ pub struct Worker { impl Clone for Worker { fn clone(&self) -> Self { Self { - queue: self.queue.clone(), - task: self.task.clone(), + queue: Arc::clone(&self.queue), + task: Arc::clone(&self.task), concurrency_limit: self.concurrency_limit, shutdown_token: self.shutdown_token.clone(), } @@ -232,15 +232,16 @@ impl Worker { /// # */ /// # /// - /// Worker::new(queue, task); + /// Worker::new(queue.into(), task); /// # Ok::<(), Box>(()) /// # }); /// # } /// ``` - pub fn new(queue: Queue, task: T) -> Self { + pub fn new(queue: Arc>, task: T) -> Self { + let task = Arc::new(task); Self { queue, - task: Arc::new(task), + task, concurrency_limit: num_cpus::get(), shutdown_token: CancellationToken::new(), } @@ -278,7 +279,7 @@ impl Worker { /// # .build() /// # .await?; /// # let task = ExampleTask; - /// # let mut worker = Worker::new(queue, task); + /// # let mut worker = Worker::new(queue.into(), task); /// # /* /// let mut worker = { /* A `Worker`. */ }; /// # */ @@ -326,7 +327,7 @@ impl Worker { /// # .build() /// # .await?; /// # let task = ExampleTask; - /// # let mut worker = Worker::new(queue, task); + /// # let mut worker = Worker::new(queue.into(), task); /// # /* /// let mut worker = { /* A `Worker`. */ }; /// # */ @@ -377,7 +378,7 @@ impl Worker { /// # .build() /// # .await?; /// # let task = ExampleTask; - /// # let worker = Worker::new(queue, task); + /// # let worker = Worker::new(queue.into(), task); /// # /* /// let worker = { /* A `Worker`. */ }; /// # */ @@ -436,7 +437,7 @@ impl Worker { /// # .build() /// # .await?; /// # let task = ExampleTask; - /// # let worker = Worker::new(queue, task); + /// # let worker = Worker::new(queue.into(), task); /// # /* /// let worker = { /* A `Worker`. */ }; /// # */ @@ -489,7 +490,7 @@ impl Worker { /// # .build() /// # .await?; /// # let task = ExampleTask; - /// # let worker = Worker::new(queue, task); + /// # let worker = Worker::new(queue.into(), task); /// # /* /// let worker = { /* A `Worker`. */ }; /// # */ @@ -704,7 +705,7 @@ impl Worker { /// # .build() /// # .await?; /// # let task = ExampleTask; - /// # let worker = Worker::new(queue, task); + /// # let worker = Worker::new(queue.into(), task); /// # /* /// let worker = { /* A `Worker`. */ }; /// # */ @@ -975,6 +976,7 @@ mod tests { let task_id = queue.enqueue(&pool, &task, &()).await?; // Process the task. + let queue = Arc::new(queue); let worker = Worker::new(queue.clone(), task); let processed_task_id = worker .process_next_task() @@ -1009,6 +1011,7 @@ mod tests { let task = FailingTask { fail_times: fail_times.clone(), }; + let queue = Arc::new(queue); let worker = Worker::new(queue.clone(), task.clone()); // Enqueue the task @@ -1071,6 +1074,7 @@ mod tests { .await?; // Start workers before queuing tasks + let queue = Arc::new(queue); let worker = Worker::new(queue.clone(), LongRunningTask); for _ in 0..2 { let worker = worker.clone(); @@ -1160,6 +1164,7 @@ mod tests { let task_id = queue.enqueue(&pool, &SleepTask, &()).await?; // Start the worker + let queue = Arc::new(queue); let worker = Worker::new(queue.clone(), SleepTask); let worker_handle = tokio::spawn(async move { worker.run_every(1.second()).await }); From bcf274a639ffc986cf103565b390a2a49d870a13 Mon Sep 17 00:00:00 2001 From: Max Countryman Date: Wed, 16 Jul 2025 07:49:07 -0700 Subject: [PATCH 2/2] clean up tests a bit --- src/job.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/job.rs b/src/job.rs index 84cb238..3287b58 100644 --- a/src/job.rs +++ b/src/job.rs @@ -2691,10 +2691,9 @@ mod tests { .await?; job.enqueue(&()).await?; - let worker = Worker::new(job.queue(), job); // Process the first task. - let task_id = worker.process_next_task().await?; + let task_id = job.worker().process_next_task().await?; assert!(task_id.is_some()); @@ -2739,10 +2738,9 @@ mod tests { message: "Hello, world!".to_string(), }; job.enqueue(&input).await?; - let worker = Worker::new(job.queue(), job); // Process the first task. - worker.process_next_task().await?; + job.worker().process_next_task().await?; // Inspect the second task. let pending_task = queue @@ -2835,8 +2833,7 @@ mod tests { .await?; // Process the task to ensure the next task is enqueued. - let worker = { Worker::new(job.queue(), job) }; - worker.process_next_task().await?; + job.worker().process_next_task().await?; // Dequeue the second task. let Some(dequeued_task) = queue.dequeue().await? else { @@ -2900,10 +2897,9 @@ mod tests { message: "Hello, world!".to_string(), }; job.enqueue(&input).await?; - let worker = Worker::new(job.queue(), job); // Process the first task. - worker.process_next_task().await?; + job.worker().process_next_task().await?; // Inspect the second task. let pending_task = queue @@ -2997,8 +2993,7 @@ mod tests { .await?; // Process the task to ensure the next task is enqueued. - let worker = { Worker::new(job.queue(), job) }; - worker.process_next_task().await?; + job.worker().process_next_task().await?; // Dequeue the second task. let Some(dequeued_task) = queue.dequeue().await? else {