diff --git a/.sqlx/query-3b0fccfab61f95863ef5a2e5c4ca4a888ce3c14e404ace182b985b8a16999f71.json b/.sqlx/query-5fd3913bd5d12b5571141da1d5fc6ddf0bd6d1af82bccf97b2b39827daf0e4ca.json similarity index 85% rename from .sqlx/query-3b0fccfab61f95863ef5a2e5c4ca4a888ce3c14e404ace182b985b8a16999f71.json rename to .sqlx/query-5fd3913bd5d12b5571141da1d5fc6ddf0bd6d1af82bccf97b2b39827daf0e4ca.json index 3196f3e..41c8653 100644 --- a/.sqlx/query-3b0fccfab61f95863ef5a2e5c4ca4a888ce3c14e404ace182b985b8a16999f71.json +++ b/.sqlx/query-5fd3913bd5d12b5571141da1d5fc6ddf0bd6d1af82bccf97b2b39827daf0e4ca.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE apalis.jobs\nSET \n status = 'Running',\n lock_at = now(),\n lock_by = $2\nWHERE \n status = 'Queued'\n AND run_at < now()\n AND id = ANY($1)\nRETURNING *;\n", + "query": "UPDATE apalis.jobs\nSET \n status = 'Running',\n lock_at = now(),\n lock_by = $2\nWHERE \n (status = 'Pending' OR status = 'Queued' OR (status = 'Failed' AND attempts < max_attempts))\n AND run_at < now()\n AND id = ANY($1)\nRETURNING *;\n", "describe": { "columns": [ { @@ -91,5 +91,5 @@ true ] }, - "hash": "3b0fccfab61f95863ef5a2e5c4ca4a888ce3c14e404ace182b985b8a16999f71" + "hash": "5fd3913bd5d12b5571141da1d5fc6ddf0bd6d1af82bccf97b2b39827daf0e4ca" } diff --git a/examples/basic.rs b/examples/basic.rs index 7cb8de8..24021f3 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use apalis::prelude::*; +use apalis::{layers::retry::RetryPolicy, prelude::*}; use apalis_postgres::*; use futures::stream::{self, StreamExt}; @@ -24,12 +24,18 @@ async fn main() { .take(10); backend.push_all(&mut items).await.unwrap(); - async fn send_reminder(_item: usize, _wrk: WorkerContext) -> Result<(), BoxDynError> { + async fn send_reminder(item: usize, wrk: WorkerContext) -> Result<(), BoxDynError> { + if item % 3 == 0 { + println!("Reminding about item: {} but failing", item); + return Err("Failed to send reminder".into()); + } + println!("Reminding about item: {}", item); Ok(()) } let worker = WorkerBuilder::new("worker-1") .backend(backend) + .retry(RetryPolicy::retries(1)) .build(send_reminder); worker.run().await.unwrap(); } diff --git a/queries/task/lock_by_id.sql b/queries/task/lock_by_id.sql index a62350f..0315032 100644 --- a/queries/task/lock_by_id.sql +++ b/queries/task/lock_by_id.sql @@ -1,10 +1,17 @@ -UPDATE apalis.jobs -SET +UPDATE + apalis.jobs +SET status = 'Running', lock_at = now(), lock_by = $2 -WHERE - status = 'Queued' +WHERE + ( + status = 'Pending' + OR status = 'Queued' + OR ( + status = 'Failed' + AND attempts < max_attempts + ) + ) AND run_at < now() - AND id = ANY($1) -RETURNING *; + AND id = ANY($1) RETURNING *; diff --git a/src/ack.rs b/src/ack.rs index 300e82e..1d5bcec 100644 --- a/src/ack.rs +++ b/src/ack.rs @@ -1,4 +1,5 @@ use apalis_core::{ + error::AbortError, error::BoxDynError, layers::{Layer, Service}, task::{Parts, status::Status}, @@ -148,7 +149,9 @@ where }; let fut = self.inner.call(req); async move { - lock_task(&pool, &task_id, &worker_id).await.unwrap(); + lock_task(&pool, &task_id, &worker_id) + .await + .map_err(AbortError::new)?; fut.await.map_err(|e| e.into()) } .boxed() diff --git a/src/queries/task/ack.sql b/src/queries/task/ack.sql deleted file mode 100644 index f4a3dbd..0000000 --- a/src/queries/task/ack.sql +++ /dev/null @@ -1,10 +0,0 @@ -UPDATE - apalis.jobs -SET - status = $4, - attempts = $2, - last_error = $3, - done_at = NOW() -WHERE - id = $1 - AND lock_by = $5 diff --git a/src/queries/task/fetch_next.sql b/src/queries/task/fetch_next.sql deleted file mode 100644 index 84754e9..0000000 --- a/src/queries/task/fetch_next.sql +++ /dev/null @@ -1,4 +0,0 @@ -SELECT - * -FROM - apalis.get_jobs($1, $2, $3) diff --git a/src/queries/task/lock_by_id.sql b/src/queries/task/lock_by_id.sql deleted file mode 100644 index 1f2358c..0000000 --- a/src/queries/task/lock_by_id.sql +++ /dev/null @@ -1,10 +0,0 @@ -UPDATE apalis.jobs -SET - status = 'Running', - lock_at = now(), - lock_by = $2 -WHERE - (status = 'Pending' OR (status = 'Failed' AND attempts < max_attempts)) - AND run_at < now() - AND id = ANY($1) -RETURNING *; diff --git a/src/queries/task/sink.sql b/src/queries/task/sink.sql deleted file mode 100644 index 8722dd5..0000000 --- a/src/queries/task/sink.sql +++ /dev/null @@ -1,20 +0,0 @@ -INSERT INTO - apalis.jobs ( - id, - job_type, - job, - status, - attempts, - max_attempts, - run_at, - priority - ) -SELECT - unnest($1::text[]) as id, - $2::text as job_type, - unnest($3::jsonb[]) as job, - 'Pending' as status, - 0 as attempts, - unnest($4::integer []) as max_attempts, - unnest($5::timestamptz []) as run_at, - unnest($6::integer []) as priority diff --git a/src/queries/worker/register.sql b/src/queries/worker/register.sql deleted file mode 100644 index 22ab4e7..0000000 --- a/src/queries/worker/register.sql +++ /dev/null @@ -1,12 +0,0 @@ -INSERT INTO - apalis.workers (id, worker_type, storage_name, layers, last_seen) -VALUES - ($1, $2, $3, $4, $5) ON CONFLICT (id) DO -UPDATE -SET - worker_type = EXCLUDED.worker_type, - storage_name = EXCLUDED.storage_name, - layers = EXCLUDED.layers, - last_seen = NOW() -WHERE - pg_try_advisory_lock(hashtext(workers.id));