From 05f020a11b0a190fb5ea9208099eb97adbdbed34 Mon Sep 17 00:00:00 2001 From: Max Countryman Date: Fri, 26 Dec 2025 10:03:01 -0800 Subject: [PATCH 1/2] Introduce lease tokens for task fencing --- migrations/20250212120000_4.sql | 11 + src/job.rs | 21 +- src/queue.rs | 463 +++++++++++++++++++++++--------- 3 files changed, 359 insertions(+), 136 deletions(-) create mode 100644 migrations/20250212120000_4.sql diff --git a/migrations/20250212120000_4.sql b/migrations/20250212120000_4.sql new file mode 100644 index 0000000..d2a052b --- /dev/null +++ b/migrations/20250212120000_4.sql @@ -0,0 +1,11 @@ +-- Force anything running this migration to use the right search path. +set local search_path to underway; + +alter table underway.task +add column if not exists lease_token uuid; + +alter table underway.task +add column if not exists lease_expires_at timestamp with time zone; + +alter table underway.task_attempt +add column if not exists lease_token uuid; diff --git a/src/job.rs b/src/job.rs index 3287b58..8fc3c7d 100644 --- a/src/job.rs +++ b/src/job.rs @@ -652,9 +652,7 @@ use uuid::Uuid; use crate::{ queue::{Error as QueueError, InProgressTask, Queue}, scheduler::{Error as SchedulerError, Scheduler, ZonedSchedule}, - task::{ - Error as TaskError, Result as TaskResult, RetryPolicy, State as TaskState, Task, TaskId, - }, + task::{Error as TaskError, Result as TaskResult, RetryPolicy, State as TaskState, Task}, worker::{Error as WorkerError, Worker}, }; @@ -907,25 +905,26 @@ impl EnqueuedJob { /// ``` pub async fn cancel(&self) -> Result { let mut tx = self.queue.pool.begin().await?; - let in_progress_tasks = sqlx::query_as!( - InProgressTask, + let in_progress_tasks = sqlx::query_as::<_, InProgressTask>( r#" select - id as "id: TaskId", - task_queue_name as "queue_name", + id as id, + task_queue_name as queue_name, input, - retry_policy as "retry_policy: RetryPolicy", + retry_policy as retry_policy, timeout, heartbeat, - concurrency_key + concurrency_key, + 0::int as attempt_number, + lease_token from underway.task where input->>'job_id' = $1 and state = $2 for update skip locked "#, - self.id.to_string(), - TaskState::Pending as TaskState ) + .bind(self.id.to_string()) + .bind(TaskState::Pending as TaskState) .fetch_all(&mut *tx) .await?; diff --git a/src/queue.rs b/src/queue.rs index ff58e45..6548733 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -806,8 +806,8 @@ impl Queue { // "in-progress". let mut tx = self.pool.begin().await?; - let in_progress_task = sqlx::query_as!( - InProgressTask, + let lease_token = Uuid::new_v4(); + let mut in_progress_task = sqlx::query_as::<_, InProgressTask>( r#" with available_task as ( select id @@ -820,7 +820,7 @@ impl Queue { or ( state = $3 -- Has heartbeat stalled? - and last_heartbeat_at < now() - heartbeat + and (lease_expires_at < now() or lease_expires_at is null) -- Are there remaining retries? and (retry_policy).max_attempts > ( select count(*) @@ -841,27 +841,32 @@ impl Queue { update underway.task t set state = $3, last_attempt_at = now(), - last_heartbeat_at = now() + last_heartbeat_at = now(), + lease_token = $4, + lease_expires_at = now() + heartbeat from available_task where t.task_queue_name = $1 and t.id = available_task.id returning - t.id as "id: TaskId", - t.task_queue_name as "queue_name", + t.id as id, + t.task_queue_name as queue_name, t.input, t.timeout, t.heartbeat, - t.retry_policy as "retry_policy: RetryPolicy", - t.concurrency_key + t.retry_policy as retry_policy, + t.concurrency_key, + 0::int as attempt_number, + t.lease_token "#, - self.name, - TaskState::Pending as TaskState, - TaskState::InProgress as TaskState, ) + .bind(&self.name) + .bind(TaskState::Pending as TaskState) + .bind(TaskState::InProgress as TaskState) + .bind(lease_token) .fetch_optional(&mut *tx) .await?; - if let Some(in_progress_task) = &in_progress_task { + if let Some(in_progress_task) = &mut in_progress_task { let task_id = in_progress_task.id; tracing::Span::current().record("task.id", task_id.as_hyphenated().to_string()); @@ -869,7 +874,7 @@ impl Queue { // // This ensures that if a stuck task was selected, we also update its attempt // row. - sqlx::query!( + sqlx::query( r#" update underway.task_attempt set state = $3 @@ -877,16 +882,16 @@ impl Queue { and task_queue_name = $2 and state = $4 "#, - task_id as TaskId, - self.name, - TaskState::Failed as TaskState, - TaskState::InProgress as TaskState ) + .bind(task_id) + .bind(&self.name) + .bind(TaskState::Failed as TaskState) + .bind(TaskState::InProgress as TaskState) .execute(&mut *tx) .await?; // Insert a new task attempt row - sqlx::query!( + let attempt_number = sqlx::query_scalar::<_, i32>( r#" with next_attempt as ( select coalesce(max(attempt_number) + 1, 1) as attempt_number @@ -898,21 +903,27 @@ impl Queue { task_id, task_queue_name, state, - attempt_number + attempt_number, + lease_token ) values ( $1, $2, $3, - (select attempt_number from next_attempt) + (select attempt_number from next_attempt), + $4 ) + returning attempt_number "#, - task_id as TaskId, - self.name, - TaskState::InProgress as TaskState ) - .execute(&mut *tx) + .bind(task_id) + .bind(&self.name) + .bind(TaskState::InProgress as TaskState) + .bind(in_progress_task.lease_token) + .fetch_one(&mut *tx) .await?; + + in_progress_task.attempt_number = attempt_number; } tx.commit().await?; @@ -1188,12 +1199,23 @@ pub struct InProgressTask { pub(crate) heartbeat: sqlx::postgres::types::PgInterval, pub(crate) retry_policy: RetryPolicy, pub(crate) concurrency_key: Option, + pub(crate) attempt_number: i32, + pub(crate) lease_token: Option, } impl InProgressTask { pub(crate) async fn mark_succeeded(&self, conn: &mut PgConnection) -> Result { + let Some(lease_token) = self.lease_token else { + tracing::warn!( + task.id = %self.id.as_hyphenated(), + attempt_number = self.attempt_number, + "Skipping success update without lease token" + ); + return Ok(()); + }; + // Update the task attempt row. - let result = sqlx::query!( + let result = sqlx::query( r#" update underway.task_attempt set state = $3, @@ -1201,53 +1223,71 @@ impl InProgressTask { completed_at = now() where task_id = $1 and task_queue_name = $2 - and attempt_number = ( - select attempt_number - from underway.task_attempt - where task_id = $1 - and task_queue_name = $2 - order by attempt_number desc - limit 1 - ) + and attempt_number = $4 + and state = $5 + and lease_token = $6 "#, - self.id as TaskId, - self.queue_name, - TaskState::Succeeded as TaskState ) + .bind(self.id) + .bind(&self.queue_name) + .bind(TaskState::Succeeded as TaskState) + .bind(self.attempt_number) + .bind(TaskState::InProgress as TaskState) + .bind(lease_token) .execute(&mut *conn) .await?; if result.rows_affected() == 0 { - return Err(Error::TaskNotFound(self.id)); + tracing::warn!( + task.id = %self.id.as_hyphenated(), + attempt_number = self.attempt_number, + "Skipping success update for non-current task attempt" + ); + return Ok(()); } // Update the task row. - let result = sqlx::query!( + let result = sqlx::query( r#" update underway.task set state = $2, updated_at = now(), - completed_at = now() + completed_at = now(), + lease_token = null, + lease_expires_at = null where id = $1 + and task_queue_name = $3 + and state = $4 + and lease_token = $5 "#, - self.id as TaskId, - TaskState::Succeeded as TaskState ) + .bind(self.id) + .bind(TaskState::Succeeded as TaskState) + .bind(&self.queue_name) + .bind(TaskState::InProgress as TaskState) + .bind(lease_token) .execute(&mut *conn) .await?; if result.rows_affected() == 0 { - return Err(Error::TaskNotFound(self.id)); + tracing::warn!( + task.id = %self.id.as_hyphenated(), + attempt_number = self.attempt_number, + "Skipping success update for non-current task attempt" + ); + return Ok(()); } Ok(()) } pub(crate) async fn mark_cancelled(&self, conn: &mut PgConnection) -> Result { + let lease_token = self.lease_token; + // Update task attempt row if one exists. // // N.B.: A task may be cancelled before an attempt row has been created. - sqlx::query!( + sqlx::query( r#" update underway.task_attempt set state = $3, @@ -1255,42 +1295,56 @@ impl InProgressTask { completed_at = now() where task_id = $1 and task_queue_name = $2 - and attempt_number = ( - select attempt_number - from underway.task_attempt - where task_id = $1 - and task_queue_name = $2 - and state < $4 - order by attempt_number desc - limit 1 + and attempt_number = $4 + and state < $5 + and ( + (lease_token = $6) + or (lease_token is null and $6 is null) ) "#, - self.id as TaskId, - self.queue_name, - TaskState::Cancelled as TaskState, - TaskState::Succeeded as TaskState ) + .bind(self.id) + .bind(&self.queue_name) + .bind(TaskState::Cancelled as TaskState) + .bind(self.attempt_number) + .bind(TaskState::Succeeded as TaskState) + .bind(lease_token) .execute(&mut *conn) .await?; // Update task row. - let result = sqlx::query!( + let result = sqlx::query( r#" update underway.task set state = $2, updated_at = now(), - completed_at = now() - where id = $1 and state < $3 + completed_at = now(), + lease_token = null, + lease_expires_at = null + where id = $1 + and task_queue_name = $3 + and state < $4 + and ( + (lease_token = $5) + or (lease_token is null and $5 is null) + ) "#, - self.id as TaskId, - TaskState::Cancelled as TaskState, - TaskState::Succeeded as TaskState ) + .bind(self.id) + .bind(TaskState::Cancelled as TaskState) + .bind(&self.queue_name) + .bind(TaskState::Succeeded as TaskState) + .bind(lease_token) .execute(&mut *conn) .await?; if result.rows_affected() == 0 { - return Err(Error::TaskNotFound(self.id)); + tracing::warn!( + task.id = %self.id.as_hyphenated(), + attempt_number = self.attempt_number, + "Skipping cancel update for non-current task attempt" + ); + return Ok(false); } Ok(result.rows_affected() > 0) @@ -1302,7 +1356,16 @@ impl InProgressTask { err )] pub(crate) async fn retry_after(&self, conn: &mut PgConnection, delay: Span) -> Result { - let result = sqlx::query!( + let Some(lease_token) = self.lease_token else { + tracing::warn!( + task.id = %self.id.as_hyphenated(), + attempt_number = self.attempt_number, + "Skipping retry update without lease token" + ); + return Ok(()); + }; + + let result = sqlx::query( r#" update underway.task_attempt set state = $3, @@ -1310,43 +1373,58 @@ impl InProgressTask { completed_at = now() where task_id = $1 and task_queue_name = $2 - and attempt_number = ( - select attempt_number - from underway.task_attempt - where task_id = $1 - and task_queue_name = $2 - order by attempt_number desc - limit 1 - ) + and attempt_number = $4 + and lease_token = $5 "#, - self.id as TaskId, - self.queue_name, - TaskState::Failed as TaskState ) + .bind(self.id) + .bind(&self.queue_name) + .bind(TaskState::Failed as TaskState) + .bind(self.attempt_number) + .bind(lease_token) .execute(&mut *conn) .await?; if result.rows_affected() == 0 { - return Err(Error::TaskNotFound(self.id)); + tracing::warn!( + task.id = %self.id.as_hyphenated(), + attempt_number = self.attempt_number, + "Skipping retry update for non-current task attempt" + ); + return Ok(()); } - let result = sqlx::query!( + let delay_duration = StdDuration::try_from(delay)?; + let delay_interval = sqlx::postgres::types::PgInterval::try_from(delay_duration) + .map_err(sqlx::Error::Decode)?; + let result = sqlx::query( r#" update underway.task set state = $3, delay = $2, - updated_at = now() + updated_at = now(), + lease_token = null, + lease_expires_at = null where id = $1 + and task_queue_name = $4 + and lease_token = $5 "#, - self.id as TaskId, - StdDuration::try_from(delay)? as _, - TaskState::Pending as TaskState ) + .bind(self.id) + .bind(delay_interval) + .bind(TaskState::Pending as TaskState) + .bind(&self.queue_name) + .bind(lease_token) .execute(&mut *conn) .await?; if result.rows_affected() == 0 { - return Err(Error::TaskNotFound(self.id)); + tracing::warn!( + task.id = %self.id.as_hyphenated(), + attempt_number = self.attempt_number, + "Skipping retry update for non-current task attempt" + ); + return Ok(()); } Ok(()) @@ -1357,7 +1435,16 @@ impl InProgressTask { conn: &mut PgConnection, error: &TaskError, ) -> Result { - let result = sqlx::query!( + let Some(lease_token) = self.lease_token else { + tracing::warn!( + task.id = %self.id.as_hyphenated(), + attempt_number = self.attempt_number, + "Skipping failure update without lease token" + ); + return Ok(()); + }; + + let result = sqlx::query( r#" update underway.task_attempt set state = $3, @@ -1365,50 +1452,71 @@ impl InProgressTask { error_message = $4 where task_id = $1 and task_queue_name = $2 - and attempt_number = ( - select attempt_number - from underway.task_attempt - where task_id = $1 - and task_queue_name = $2 - order by attempt_number desc - limit 1 - ) + and attempt_number = $5 + and state = $6 + and lease_token = $7 "#, - self.id as TaskId, - self.queue_name, - TaskState::Failed as TaskState, - error.to_string(), ) + .bind(self.id) + .bind(&self.queue_name) + .bind(TaskState::Failed as TaskState) + .bind(error.to_string()) + .bind(self.attempt_number) + .bind(TaskState::InProgress as TaskState) + .bind(lease_token) .execute(&mut *conn) .await?; if result.rows_affected() == 0 { - return Err(Error::TaskNotFound(self.id)); + tracing::warn!( + task.id = %self.id.as_hyphenated(), + attempt_number = self.attempt_number, + "Skipping failure update for non-current task attempt" + ); + return Ok(()); } - let result = sqlx::query!( + let result = sqlx::query( r#" update underway.task set updated_at = now() where id = $1 and task_queue_name = $2 + and state = $3 + and lease_token = $4 "#, - self.id as TaskId, - self.queue_name, ) + .bind(self.id) + .bind(&self.queue_name) + .bind(TaskState::InProgress as TaskState) + .bind(lease_token) .execute(&mut *conn) .await?; if result.rows_affected() == 0 { - return Err(Error::TaskNotFound(self.id)); + tracing::warn!( + task.id = %self.id.as_hyphenated(), + attempt_number = self.attempt_number, + "Skipping failure update for non-current task attempt" + ); + return Ok(()); } Ok(()) } pub(crate) async fn mark_failed(&self, conn: &mut PgConnection) -> Result { + let Some(lease_token) = self.lease_token else { + tracing::warn!( + task.id = %self.id.as_hyphenated(), + attempt_number = self.attempt_number, + "Skipping failed update without lease token" + ); + return Ok(()); + }; + // Update the task attempt row. - let result = sqlx::query!( + let result = sqlx::query( r#" update underway.task_attempt set state = $3, @@ -1416,43 +1524,57 @@ impl InProgressTask { completed_at = now() where task_id = $1 and task_queue_name = $2 - and attempt_number = ( - select attempt_number - from underway.task_attempt - where task_id = $1 - and task_queue_name = $2 - order by attempt_number desc - limit 1 - ) + and attempt_number = $4 + and lease_token = $5 "#, - self.id as TaskId, - self.queue_name, - TaskState::Failed as TaskState ) + .bind(self.id) + .bind(&self.queue_name) + .bind(TaskState::Failed as TaskState) + .bind(self.attempt_number) + .bind(lease_token) .execute(&mut *conn) .await?; if result.rows_affected() == 0 { - return Err(Error::TaskNotFound(self.id)); + tracing::warn!( + task.id = %self.id.as_hyphenated(), + attempt_number = self.attempt_number, + "Skipping failed update for non-current task attempt" + ); + return Ok(()); } // Update the task row. - let result = sqlx::query!( + let result = sqlx::query( r#" update underway.task set state = $2, updated_at = now(), - completed_at = now() + completed_at = now(), + lease_token = null, + lease_expires_at = null where id = $1 + and task_queue_name = $3 + and state = $4 + and lease_token = $5 "#, - self.id as TaskId, - TaskState::Failed as TaskState ) + .bind(self.id) + .bind(TaskState::Failed as TaskState) + .bind(&self.queue_name) + .bind(TaskState::InProgress as TaskState) + .bind(lease_token) .execute(&mut *conn) .await?; if result.rows_affected() == 0 { - return Err(Error::TaskNotFound(self.id)); + tracing::warn!( + task.id = %self.id.as_hyphenated(), + attempt_number = self.attempt_number, + "Skipping failed update for non-current task attempt" + ); + return Ok(()); } Ok(()) @@ -1482,20 +1604,44 @@ impl InProgressTask { where E: PgExecutor<'a>, { - sqlx::query!( + let Some(lease_token) = self.lease_token else { + tracing::warn!( + task.id = %self.id.as_hyphenated(), + attempt_number = self.attempt_number, + "Skipping heartbeat without lease token" + ); + return Ok(()); + }; + + let result = sqlx::query( r#" update underway.task set updated_at = now(), - last_heartbeat_at = now() + last_heartbeat_at = now(), + lease_expires_at = now() + heartbeat where id = $1 and task_queue_name = $2 + and state = $3 + and ( + lease_token = $4 + ) "#, - self.id as TaskId, - self.queue_name ) + .bind(self.id) + .bind(&self.queue_name) + .bind(TaskState::InProgress as TaskState) + .bind(lease_token) .execute(executor) .await?; + if result.rows_affected() == 0 { + tracing::warn!( + task.id = %self.id.as_hyphenated(), + attempt_number = self.attempt_number, + "Skipping heartbeat for non-current task attempt" + ); + } + Ok(()) } @@ -1860,7 +2006,7 @@ mod tests { use std::{collections::HashSet, path::PathBuf}; use serde_json::json; - use sqlx::{Postgres, Transaction}; + use sqlx::{Postgres, Row, Transaction}; use super::*; use crate::{task::Result as TaskResult, worker::pg_interval_to_span}; @@ -3306,14 +3452,14 @@ mod tests { assert_eq!(attempt_rows.len(), 1); // Set a stale heartbeat. - sqlx::query!( + sqlx::query( r#" update underway.task - set last_heartbeat_at = now() - interval '30 seconds' + set lease_expires_at = now() - interval '30 seconds' where id = $1 "#, - task_id as TaskId ) + .bind(task_id) .execute(&pool) .await?; @@ -3339,4 +3485,71 @@ mod tests { Ok(()) } + + #[sqlx::test] + async fn stale_attempt_cannot_finalize_task(pool: PgPool) -> sqlx::Result<(), Error> { + let queue = Queue::builder() + .name("stale_attempt_cannot_finalize_task") + .pool(pool.clone()) + .build() + .await?; + + let task_id = queue.enqueue(&pool, &TestTask, &json!("{}")).await?; + + let mut conn = pool.acquire().await?; + let in_progress_task = queue.dequeue().await?.expect("A task should be dequeued"); + + sqlx::query!( + r#" + update underway.task + set last_heartbeat_at = now() - interval '30 seconds' + where id = $1 + "#, + task_id as TaskId + ) + .execute(&pool) + .await?; + + let _new_in_progress_task = queue + .dequeue() + .await? + .expect("A stale task should be dequeued"); + + // Attempt to finalize the stale attempt; this should have no effect. + in_progress_task.mark_succeeded(&mut conn).await?; + + let task_row = sqlx::query( + r#" + select state + from underway.task + where id = $1 + "#, + ) + .bind(task_id) + .fetch_one(&pool) + .await?; + + let task_state: TaskState = task_row.try_get("state")?; + assert_eq!(task_state, TaskState::InProgress); + + let attempt_rows = sqlx::query( + r#" + select attempt_number, state + from underway.task_attempt + where task_id = $1 + order by attempt_number + "#, + ) + .bind(task_id) + .fetch_all(&pool) + .await?; + + assert_eq!(attempt_rows.len(), 2); + let first_state: TaskState = attempt_rows[0].try_get("state")?; + let second_state: TaskState = attempt_rows[1].try_get("state")?; + assert_eq!(first_state, TaskState::Failed); + assert_eq!(second_state, TaskState::InProgress); + + Ok(()) + } } From 97d89e647df83cb6e43e54f15d25eacd8ff7b2bb Mon Sep 17 00:00:00 2001 From: Max Countryman Date: Fri, 26 Dec 2025 10:29:01 -0800 Subject: [PATCH 2/2] Restore sqlx macros for fenced queries --- src/job.rs | 17 +++-- src/queue.rs | 204 +++++++++++++++++++++++++-------------------------- 2 files changed, 110 insertions(+), 111 deletions(-) diff --git a/src/job.rs b/src/job.rs index 8fc3c7d..57b9896 100644 --- a/src/job.rs +++ b/src/job.rs @@ -905,26 +905,27 @@ impl EnqueuedJob { /// ``` pub async fn cancel(&self) -> Result { let mut tx = self.queue.pool.begin().await?; - let in_progress_tasks = sqlx::query_as::<_, InProgressTask>( + let in_progress_tasks = sqlx::query_as!( + InProgressTask, r#" select - id as id, - task_queue_name as queue_name, + id as "id: TaskId", + task_queue_name as "queue_name", input, - retry_policy as retry_policy, + retry_policy as "retry_policy: RetryPolicy", timeout, heartbeat, concurrency_key, - 0::int as attempt_number, - lease_token + 0::int as "attempt_number!", + lease_token as "lease_token?" from underway.task where input->>'job_id' = $1 and state = $2 for update skip locked "#, + self.id.to_string(), + TaskState::Pending as TaskState ) - .bind(self.id.to_string()) - .bind(TaskState::Pending as TaskState) .fetch_all(&mut *tx) .await?; diff --git a/src/queue.rs b/src/queue.rs index 6548733..67a9ff4 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -807,7 +807,8 @@ impl Queue { let mut tx = self.pool.begin().await?; let lease_token = Uuid::new_v4(); - let mut in_progress_task = sqlx::query_as::<_, InProgressTask>( + let mut in_progress_task = sqlx::query_as!( + InProgressTask, r#" with available_task as ( select id @@ -848,21 +849,21 @@ impl Queue { where t.task_queue_name = $1 and t.id = available_task.id returning - t.id as id, - t.task_queue_name as queue_name, + t.id as "id: TaskId", + t.task_queue_name as "queue_name", t.input, t.timeout, t.heartbeat, - t.retry_policy as retry_policy, + t.retry_policy as "retry_policy: RetryPolicy", t.concurrency_key, - 0::int as attempt_number, - t.lease_token + 0::int as "attempt_number!", + t.lease_token as "lease_token?" "#, + self.name, + TaskState::Pending as TaskState, + TaskState::InProgress as TaskState, + lease_token, ) - .bind(&self.name) - .bind(TaskState::Pending as TaskState) - .bind(TaskState::InProgress as TaskState) - .bind(lease_token) .fetch_optional(&mut *tx) .await?; @@ -874,7 +875,7 @@ impl Queue { // // This ensures that if a stuck task was selected, we also update its attempt // row. - sqlx::query( + sqlx::query!( r#" update underway.task_attempt set state = $3 @@ -882,16 +883,16 @@ impl Queue { and task_queue_name = $2 and state = $4 "#, + task_id as TaskId, + self.name, + TaskState::Failed as TaskState, + TaskState::InProgress as TaskState ) - .bind(task_id) - .bind(&self.name) - .bind(TaskState::Failed as TaskState) - .bind(TaskState::InProgress as TaskState) .execute(&mut *tx) .await?; // Insert a new task attempt row - let attempt_number = sqlx::query_scalar::<_, i32>( + let attempt_number = sqlx::query_scalar!( r#" with next_attempt as ( select coalesce(max(attempt_number) + 1, 1) as attempt_number @@ -915,11 +916,11 @@ impl Queue { ) returning attempt_number "#, + task_id as TaskId, + self.name, + TaskState::InProgress as TaskState, + in_progress_task.lease_token ) - .bind(task_id) - .bind(&self.name) - .bind(TaskState::InProgress as TaskState) - .bind(in_progress_task.lease_token) .fetch_one(&mut *tx) .await?; @@ -1215,7 +1216,7 @@ impl InProgressTask { }; // Update the task attempt row. - let result = sqlx::query( + let result = sqlx::query!( r#" update underway.task_attempt set state = $3, @@ -1227,13 +1228,13 @@ impl InProgressTask { and state = $5 and lease_token = $6 "#, + self.id as TaskId, + self.queue_name, + TaskState::Succeeded as TaskState, + self.attempt_number, + TaskState::InProgress as TaskState, + lease_token ) - .bind(self.id) - .bind(&self.queue_name) - .bind(TaskState::Succeeded as TaskState) - .bind(self.attempt_number) - .bind(TaskState::InProgress as TaskState) - .bind(lease_token) .execute(&mut *conn) .await?; @@ -1247,7 +1248,7 @@ impl InProgressTask { } // Update the task row. - let result = sqlx::query( + let result = sqlx::query!( r#" update underway.task set state = $2, @@ -1260,12 +1261,12 @@ impl InProgressTask { and state = $4 and lease_token = $5 "#, + self.id as TaskId, + TaskState::Succeeded as TaskState, + self.queue_name, + TaskState::InProgress as TaskState, + lease_token ) - .bind(self.id) - .bind(TaskState::Succeeded as TaskState) - .bind(&self.queue_name) - .bind(TaskState::InProgress as TaskState) - .bind(lease_token) .execute(&mut *conn) .await?; @@ -1287,7 +1288,7 @@ impl InProgressTask { // Update task attempt row if one exists. // // N.B.: A task may be cancelled before an attempt row has been created. - sqlx::query( + sqlx::query!( r#" update underway.task_attempt set state = $3, @@ -1302,18 +1303,18 @@ impl InProgressTask { or (lease_token is null and $6 is null) ) "#, + self.id as TaskId, + self.queue_name, + TaskState::Cancelled as TaskState, + self.attempt_number, + TaskState::Succeeded as TaskState, + lease_token ) - .bind(self.id) - .bind(&self.queue_name) - .bind(TaskState::Cancelled as TaskState) - .bind(self.attempt_number) - .bind(TaskState::Succeeded as TaskState) - .bind(lease_token) .execute(&mut *conn) .await?; // Update task row. - let result = sqlx::query( + let result = sqlx::query!( r#" update underway.task set state = $2, @@ -1329,12 +1330,12 @@ impl InProgressTask { or (lease_token is null and $5 is null) ) "#, + self.id as TaskId, + TaskState::Cancelled as TaskState, + self.queue_name, + TaskState::Succeeded as TaskState, + lease_token ) - .bind(self.id) - .bind(TaskState::Cancelled as TaskState) - .bind(&self.queue_name) - .bind(TaskState::Succeeded as TaskState) - .bind(lease_token) .execute(&mut *conn) .await?; @@ -1365,7 +1366,7 @@ impl InProgressTask { return Ok(()); }; - let result = sqlx::query( + let result = sqlx::query!( r#" update underway.task_attempt set state = $3, @@ -1376,12 +1377,12 @@ impl InProgressTask { and attempt_number = $4 and lease_token = $5 "#, + self.id as TaskId, + self.queue_name, + TaskState::Failed as TaskState, + self.attempt_number, + lease_token ) - .bind(self.id) - .bind(&self.queue_name) - .bind(TaskState::Failed as TaskState) - .bind(self.attempt_number) - .bind(lease_token) .execute(&mut *conn) .await?; @@ -1397,7 +1398,7 @@ impl InProgressTask { let delay_duration = StdDuration::try_from(delay)?; let delay_interval = sqlx::postgres::types::PgInterval::try_from(delay_duration) .map_err(sqlx::Error::Decode)?; - let result = sqlx::query( + let result = sqlx::query!( r#" update underway.task set state = $3, @@ -1409,12 +1410,12 @@ impl InProgressTask { and task_queue_name = $4 and lease_token = $5 "#, + self.id as TaskId, + delay_interval, + TaskState::Pending as TaskState, + self.queue_name, + lease_token ) - .bind(self.id) - .bind(delay_interval) - .bind(TaskState::Pending as TaskState) - .bind(&self.queue_name) - .bind(lease_token) .execute(&mut *conn) .await?; @@ -1444,7 +1445,7 @@ impl InProgressTask { return Ok(()); }; - let result = sqlx::query( + let result = sqlx::query!( r#" update underway.task_attempt set state = $3, @@ -1456,14 +1457,14 @@ impl InProgressTask { and state = $6 and lease_token = $7 "#, + self.id as TaskId, + self.queue_name, + TaskState::Failed as TaskState, + error.to_string(), + self.attempt_number, + TaskState::InProgress as TaskState, + lease_token ) - .bind(self.id) - .bind(&self.queue_name) - .bind(TaskState::Failed as TaskState) - .bind(error.to_string()) - .bind(self.attempt_number) - .bind(TaskState::InProgress as TaskState) - .bind(lease_token) .execute(&mut *conn) .await?; @@ -1476,7 +1477,7 @@ impl InProgressTask { return Ok(()); } - let result = sqlx::query( + let result = sqlx::query!( r#" update underway.task set updated_at = now() @@ -1485,11 +1486,11 @@ impl InProgressTask { and state = $3 and lease_token = $4 "#, + self.id as TaskId, + self.queue_name, + TaskState::InProgress as TaskState, + lease_token ) - .bind(self.id) - .bind(&self.queue_name) - .bind(TaskState::InProgress as TaskState) - .bind(lease_token) .execute(&mut *conn) .await?; @@ -1516,7 +1517,7 @@ impl InProgressTask { }; // Update the task attempt row. - let result = sqlx::query( + let result = sqlx::query!( r#" update underway.task_attempt set state = $3, @@ -1527,12 +1528,12 @@ impl InProgressTask { and attempt_number = $4 and lease_token = $5 "#, + self.id as TaskId, + self.queue_name, + TaskState::Failed as TaskState, + self.attempt_number, + lease_token ) - .bind(self.id) - .bind(&self.queue_name) - .bind(TaskState::Failed as TaskState) - .bind(self.attempt_number) - .bind(lease_token) .execute(&mut *conn) .await?; @@ -1546,7 +1547,7 @@ impl InProgressTask { } // Update the task row. - let result = sqlx::query( + let result = sqlx::query!( r#" update underway.task set state = $2, @@ -1559,12 +1560,12 @@ impl InProgressTask { and state = $4 and lease_token = $5 "#, + self.id as TaskId, + TaskState::Failed as TaskState, + self.queue_name, + TaskState::InProgress as TaskState, + lease_token ) - .bind(self.id) - .bind(TaskState::Failed as TaskState) - .bind(&self.queue_name) - .bind(TaskState::InProgress as TaskState) - .bind(lease_token) .execute(&mut *conn) .await?; @@ -1613,7 +1614,7 @@ impl InProgressTask { return Ok(()); }; - let result = sqlx::query( + let result = sqlx::query!( r#" update underway.task set updated_at = now(), @@ -1626,11 +1627,11 @@ impl InProgressTask { lease_token = $4 ) "#, + self.id as TaskId, + self.queue_name, + TaskState::InProgress as TaskState, + lease_token ) - .bind(self.id) - .bind(&self.queue_name) - .bind(TaskState::InProgress as TaskState) - .bind(lease_token) .execute(executor) .await?; @@ -2006,7 +2007,7 @@ mod tests { use std::{collections::HashSet, path::PathBuf}; use serde_json::json; - use sqlx::{Postgres, Row, Transaction}; + use sqlx::{Postgres, Transaction}; use super::*; use crate::{task::Result as TaskResult, worker::pg_interval_to_span}; @@ -3452,14 +3453,14 @@ mod tests { assert_eq!(attempt_rows.len(), 1); // Set a stale heartbeat. - sqlx::query( + sqlx::query!( r#" update underway.task set lease_expires_at = now() - interval '30 seconds' where id = $1 "#, + task_id as TaskId ) - .bind(task_id) .execute(&pool) .await?; @@ -3518,37 +3519,34 @@ mod tests { // Attempt to finalize the stale attempt; this should have no effect. in_progress_task.mark_succeeded(&mut conn).await?; - let task_row = sqlx::query( + let task_row = sqlx::query!( r#" - select state + select state as "state: TaskState" from underway.task where id = $1 "#, + task_id as TaskId ) - .bind(task_id) .fetch_one(&pool) .await?; - let task_state: TaskState = task_row.try_get("state")?; - assert_eq!(task_state, TaskState::InProgress); + assert_eq!(task_row.state, TaskState::InProgress); - let attempt_rows = sqlx::query( + let attempt_rows = sqlx::query!( r#" - select attempt_number, state + select attempt_number, state as "state: TaskState" from underway.task_attempt where task_id = $1 order by attempt_number "#, + task_id as TaskId ) - .bind(task_id) .fetch_all(&pool) .await?; assert_eq!(attempt_rows.len(), 2); - let first_state: TaskState = attempt_rows[0].try_get("state")?; - let second_state: TaskState = attempt_rows[1].try_get("state")?; - assert_eq!(first_state, TaskState::Failed); - assert_eq!(second_state, TaskState::InProgress); + assert_eq!(attempt_rows[0].state, TaskState::Failed); + assert_eq!(attempt_rows[1].state, TaskState::InProgress); Ok(()) }