From ee5b1c39a16220d254754a0b89f6b1334e87c8f1 Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Fri, 28 Nov 2025 10:24:27 +0300 Subject: [PATCH 01/10] bump: to v1.0.0-beta.2 --- Cargo.toml | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 58b6c52..6f9906c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "apalis-postgres" -version = "1.0.0-beta.1" +version = "1.0.0-beta.2" authors = ["Njuguna Mureithi "] edition = "2024" repository = "https://github.com/apalis-dev/apalis-postgres" @@ -22,19 +22,19 @@ tokio-comp = ["tokio", "sqlx/runtime-tokio-rustls"] tokio-comp-native-tls = ["tokio", "sqlx/runtime-tokio-native-tls"] [dependencies] -apalis-core = { version = "1.0.0-beta.1", default-features = false, features = [ +apalis-core = { version = "1.0.0-beta.2", default-features = false, features = [ "sleep", "json", ] } -apalis-sql = { version = "1.0.0-beta.1", default-features = false } -serde = { version = "1", features = ["derive"] } -chrono = { version = "0.4", features = ["serde"] } +apalis-sql = { version = "1.0.0-beta.2", default-features = false } +serde = { version = "1", features = ["derive"], default-features = false } +chrono = { version = "0.4", features = ["serde"], default-features = false } pin-project = "1.1.10" serde_json = "1" futures = "0.3.30" thiserror = "2" -tokio = { version = "1", features = ["rt", "net"], optional = true } -async-std = { version = "1.13.0", optional = true } +tokio = { version = "1", features = ["rt", "net"], optional = true, default-features = false} +async-std = { version = "1.13.0", optional = true, default-features = false} ulid = { version = "1", features = ["serde"] } @@ -46,6 +46,6 @@ features = ["chrono", "postgres", "json"] [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } once_cell = "1.19.0" -apalis = { version = "1.0.0-beta.1" } -apalis-workflow = { version = "0.1.0-beta.1" } +apalis = { version = "1.0.0-beta.2" } +apalis-workflow = { version = "0.1.0-beta.2" } futures-util = "0.3.30" From 1eccdaa6676ae9c53ef21b1b1ac846de7e69be67 Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Fri, 28 Nov 2025 10:31:40 +0300 Subject: [PATCH 02/10] chore: fix ci --- .github/workflows/ci.yml | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b92dfdc..25443c6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,8 +12,9 @@ env: DATABASE_URL: postgres://postgres:postgres@localhost/postgres jobs: - defaults: - runs-on: ubuntu-latest + test: + name: Test Suite + runs-on: ${{ matrix.os }} services: postgres: image: postgres:17 @@ -26,9 +27,6 @@ jobs: --health-interval=10s --health-timeout=5s --health-retries=5 - test: - name: Test Suite - runs-on: ${{ matrix.os }} strategy: matrix: os: [ubuntu-latest, windows-latest, macos-latest] From bd57abec85ea64f7c38830a29aa47c25f13200b9 Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Fri, 28 Nov 2025 10:36:45 +0300 Subject: [PATCH 03/10] fix: doc checks --- .github/workflows/docs.yml | 2 +- src/lib.rs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index b7a5e8e..0b36c28 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -18,7 +18,7 @@ jobs: uses: actions/checkout@v4 - name: Install Rust toolchain - uses: dtolnay/rust-toolchain@stable + uses: dtolnay/rust-toolchain@nightly - name: Cache cargo registry uses: actions/cache@v4 diff --git a/src/lib.rs b/src/lib.rs index f536e62..ef96822 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -80,6 +80,8 @@ pub struct PgNotify { ListWorkers => supported("List all workers registered with the backend", false), ListTasks => supported("List all tasks in the backend", false), }] +/// +/// [`SharedPostgresStorage`]: crate::shared::SharedPostgresStorage #[pin_project::pin_project] pub struct PostgresStorage< Args, From f4356d6ca3c697a7d12b413fb912ccda018f6bc5 Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Fri, 28 Nov 2025 10:50:13 +0300 Subject: [PATCH 04/10] chore: cargo deny --- deny.toml | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/deny.toml b/deny.toml index c5b4063..f555596 100644 --- a/deny.toml +++ b/deny.toml @@ -7,13 +7,36 @@ ignore = [] [licenses] confidence-threshold = 0.8 -allow = ["Apache-2.0", "BSD-3-Clause", "MIT", "Unicode-3.0"] +allow = [ + "Apache-2.0", + "BSD-3-Clause", + "MIT", + "Unicode-3.0", + "Zlib", + "ISC", + "CDLA-Permissive-2.0", +] [bans] multiple-versions = "deny" highlight = "all" skip-tree = [ { name = "windows_x86_64_msvc" }, + { name = "windows_x86_64_gnullvm" }, + { name = "windows_x86_64_gnu" }, + { name = "windows_i686_msvc" }, + { name = "windows_i686_gnullvm" }, + { name = "windows_i686_gnu" }, + { name = "windows_aarch64_msvc" }, + { name = "windows_aarch64_gnullvm" }, + { name = "windows-targets" }, + { name = "windows-sys" }, + # TODO: @geofmureithi Investigate why these are causing issues possibly due to sqlx dependencies + { name = "getrandom" }, + { name = "hashbrown" }, + { name = "rand" }, + { name = "rand_chacha" }, + { name = "rand_core" }, ] [sources] From ae205e1fcb7e12e576a6e9dce01f89c01f983193 Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Fri, 28 Nov 2025 11:00:29 +0300 Subject: [PATCH 05/10] chore: cargo audit --- .cargo/audit.toml | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 .cargo/audit.toml diff --git a/.cargo/audit.toml b/.cargo/audit.toml new file mode 100644 index 0000000..77b2027 --- /dev/null +++ b/.cargo/audit.toml @@ -0,0 +1,11 @@ +[advisories] +ignore = [ + # TODO(geofmureithi): track https://github.com/launchbadge/sqlx/issues/4082 + "RUSTSEC-2023-0071", + "RUSTSEC-2025-0052", + "RUSTSEC-2024-0384", +] + +[output] +quiet = false +deny = ["warnings"] From 3ae12d16150b4d58caaaf7c230780eaa4b03381e Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Fri, 28 Nov 2025 11:05:47 +0300 Subject: [PATCH 06/10] add: spec doc --- examples/basic.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/basic.rs b/examples/basic.rs index 24021f3..840c0d8 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -6,12 +6,14 @@ use futures::stream::{self, StreamExt}; #[tokio::main] async fn main() { - let pool = PgPool::connect(&std::env::var("DATABASE_URL").unwrap()) + let db = std::env::var("DATABASE_URL").unwrap(); + let pool = PgPool::connect(&db) .await .unwrap(); PostgresStorage::setup(&pool).await.unwrap(); let mut backend = PostgresStorage::new(&pool); + // Push some tasks as a stream let mut start = 0usize; let mut items = stream::repeat_with(move || { start += 1; From b9c8bc51b4e3ce003a3e5fa4572b6a11bfeb4551 Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Fri, 28 Nov 2025 11:15:21 +0300 Subject: [PATCH 07/10] [PATCH] feat(tx): allow push tasks inside a transaction Co-authored-by: Victor Oliveira Nascimento --- src/sink.rs | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/sink.rs b/src/sink.rs index 6bb6035..fcfd15f 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -5,12 +5,12 @@ use std::{ }; use apalis_core::backend::codec::json::JsonCodec; -use chrono::DateTime; +use chrono::{DateTime, Utc}; use futures::{ FutureExt, Sink, TryFutureExt, future::{BoxFuture, Shared}, }; -use sqlx::PgPool; +use sqlx::{PgConnection, PgPool}; use ulid::Ulid; use crate::{CompactType, PgTask, PostgresStorage, config::Config}; @@ -39,11 +39,11 @@ impl Clone for PgSink { } } -pub async fn push_tasks( - pool: PgPool, +pub fn push_tasks( + conn: &mut PgConnection, cfg: Config, buffer: Vec>, -) -> Result<(), sqlx::Error> { +) -> impl futures::Future> + Send { let job_type = cfg.queue().to_string(); // Build the multi-row INSERT with UNNEST let mut ids = Vec::new(); @@ -61,10 +61,7 @@ pub async fn push_tasks( .unwrap_or(Ulid::new().to_string()), ); job_data.push(task.args); - run_ats.push( - DateTime::from_timestamp(task.parts.run_at as i64, 0) - .ok_or(sqlx::Error::ColumnNotFound("run_at".to_owned()))?, - ); + run_ats.push(DateTime::from_timestamp(task.parts.run_at as i64, 0).unwrap_or(Utc::now())); priorities.push(task.parts.ctx.priority()); max_attempts_vec.push(task.parts.ctx.max_attempts()); metadata.push(serde_json::Value::Object(task.parts.ctx.meta().clone())); @@ -80,9 +77,9 @@ pub async fn push_tasks( &priorities, &metadata ) - .execute(&pool) - .await?; - Ok(()) + .execute(&mut *conn) + .map_ok(|_| ()) + .boxed() } impl PgSink { @@ -125,11 +122,17 @@ where // Create the future only if we don't have one and there's work to do if this.sink.flush_future.is_none() && !this.sink.buffer.is_empty() { - let pool = this.pool.clone(); let config = this.config.clone(); let buffer = std::mem::take(&mut this.sink.buffer); - let sink_fut = push_tasks(pool, config, buffer).map_err(Arc::new); - this.sink.flush_future = Some(sink_fut.boxed().shared()); + let pool = this.sink.pool.clone(); + let fut = async move { + let mut conn = pool.acquire().map_err(Arc::new).await?; + push_tasks(&mut conn, config, buffer) + .map_err(Arc::new) + .await?; + Ok(()) + }; + this.sink.flush_future = Some(fut.boxed().shared()); } // Poll the existing future From d7984bac2d92479513cbe230e9f9a4435decf83f Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Fri, 28 Nov 2025 11:25:21 +0300 Subject: [PATCH 08/10] chore: use txn in sink --- examples/basic.rs | 2 +- examples/pubsub.rs | 2 +- src/sink.rs | 18 +++++++++++------- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/examples/basic.rs b/examples/basic.rs index 840c0d8..da9a031 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -26,7 +26,7 @@ async fn main() { .take(10); backend.push_all(&mut items).await.unwrap(); - async fn send_reminder(item: usize, wrk: WorkerContext) -> Result<(), BoxDynError> { + async fn send_reminder(item: usize, _wrk: WorkerContext) -> Result<(), BoxDynError> { if item % 3 == 0 { println!("Reminding about item: {} but failing", item); return Err("Failed to send reminder".into()); diff --git a/examples/pubsub.rs b/examples/pubsub.rs index d85d22f..d0f6aa8 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -35,7 +35,7 @@ async fn main() { .take(20) .collect::>() .await; - apalis_postgres::sink::push_tasks(pool, config, items) + apalis_postgres::sink::push_tasks(&pool, config, items) .await .unwrap(); } diff --git a/src/sink.rs b/src/sink.rs index fcfd15f..2409fb5 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -10,7 +10,7 @@ use futures::{ FutureExt, Sink, TryFutureExt, future::{BoxFuture, Shared}, }; -use sqlx::{PgConnection, PgPool}; +use sqlx::{Executor, PgPool}; use ulid::Ulid; use crate::{CompactType, PgTask, PostgresStorage, config::Config}; @@ -39,11 +39,14 @@ impl Clone for PgSink { } } -pub fn push_tasks( - conn: &mut PgConnection, +pub fn push_tasks<'a, E>( + conn: E, cfg: Config, buffer: Vec>, -) -> impl futures::Future> + Send { +) -> impl futures::Future> + Send + 'a +where + E: Executor<'a, Database = sqlx::Postgres> + Send + 'a, +{ let job_type = cfg.queue().to_string(); // Build the multi-row INSERT with UNNEST let mut ids = Vec::new(); @@ -77,7 +80,7 @@ pub fn push_tasks( &priorities, &metadata ) - .execute(&mut *conn) + .execute(conn) .map_ok(|_| ()) .boxed() } @@ -126,10 +129,11 @@ where let buffer = std::mem::take(&mut this.sink.buffer); let pool = this.sink.pool.clone(); let fut = async move { - let mut conn = pool.acquire().map_err(Arc::new).await?; - push_tasks(&mut conn, config, buffer) + let mut conn = pool.begin().map_err(Arc::new).await?; + push_tasks(&mut *conn, config, buffer) .map_err(Arc::new) .await?; + conn.commit().map_err(Arc::new).await?; Ok(()) }; this.sink.flush_future = Some(fut.boxed().shared()); From 58faa3681fa2d4b7036b987efbdecc3263da21fb Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Fri, 28 Nov 2025 11:26:49 +0300 Subject: [PATCH 09/10] chore: fmt --- examples/basic.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/basic.rs b/examples/basic.rs index da9a031..9ed5403 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -7,9 +7,7 @@ use futures::stream::{self, StreamExt}; #[tokio::main] async fn main() { let db = std::env::var("DATABASE_URL").unwrap(); - let pool = PgPool::connect(&db) - .await - .unwrap(); + let pool = PgPool::connect(&db).await.unwrap(); PostgresStorage::setup(&pool).await.unwrap(); let mut backend = PostgresStorage::new(&pool); From 0d66566d02c80511391f2d032e325b948782e45b Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Fri, 28 Nov 2025 11:38:26 +0300 Subject: [PATCH 10/10] improve CI --- .github/workflows/ci.yml | 38 +++++++++++--------------------- .github/workflows/docs.yml | 15 +++++++++++++ .github/workflows/pr-preview.yml | 28 ++++++++++++++++++++++- .github/workflows/release.yml | 15 ++++++++++++- 4 files changed, 69 insertions(+), 27 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 25443c6..b3e4808 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,7 +29,7 @@ jobs: --health-retries=5 strategy: matrix: - os: [ubuntu-latest, windows-latest, macos-latest] + os: [ubuntu-latest] rust: [stable, beta] include: - os: ubuntu-latest @@ -77,6 +77,18 @@ jobs: clippy: name: Clippy runs-on: ubuntu-latest + services: + postgres: + image: postgres:17 + env: + POSTGRES_PASSWORD: postgres + ports: + - 5432:5432 + options: >- + --health-cmd="pg_isready -U postgres" + --health-interval=10s + --health-timeout=5s + --health-retries=5 steps: - name: Checkout sources uses: actions/checkout@v4 @@ -98,30 +110,6 @@ jobs: - name: Run cargo clippy run: cargo clippy --all-targets --all-features - docs: - name: Documentation - runs-on: ubuntu-latest - steps: - - name: Checkout sources - uses: actions/checkout@v4 - - - name: Install Rust toolchain - uses: dtolnay/rust-toolchain@stable - - - name: Cache cargo registry - uses: actions/cache@v4 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: ${{ runner.os }}-cargo-docs-${{ hashFiles('**/Cargo.lock') }} - - - name: Build documentation - run: cargo doc --all-features --no-deps --document-private-items - env: - RUSTDOCFLAGS: "-Dwarnings" - security: name: Security Audit runs-on: ubuntu-latest diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 0b36c28..d259a9d 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -8,11 +8,26 @@ on: env: CARGO_TERM_COLOR: always + RUST_BACKTRACE: 1 + DATABASE_URL: postgres://postgres:postgres@localhost/postgres jobs: + docs: name: Build and Test Documentation runs-on: ubuntu-latest + services: + postgres: + image: postgres:17 + env: + POSTGRES_PASSWORD: postgres + ports: + - 5432:5432 + options: >- + --health-cmd="pg_isready -U postgres" + --health-interval=10s + --health-timeout=5s + --health-retries=5 steps: - name: Checkout sources uses: actions/checkout@v4 diff --git a/.github/workflows/pr-preview.yml b/.github/workflows/pr-preview.yml index fde1d61..a55abc4 100644 --- a/.github/workflows/pr-preview.yml +++ b/.github/workflows/pr-preview.yml @@ -6,11 +6,25 @@ on: env: CARGO_TERM_COLOR: always + DATABASE_URL: postgres://postgres:postgres@localhost/postgres + RUST_BACKTRACE: 1 jobs: quick-check: name: Quick Check runs-on: ubuntu-latest + services: + postgres: + image: postgres:17 + env: + POSTGRES_PASSWORD: postgres + ports: + - 5432:5432 + options: >- + --health-cmd="pg_isready -U postgres" + --health-interval=10s + --health-timeout=5s + --health-retries=5 steps: - name: Checkout sources uses: actions/checkout@v4 @@ -67,6 +81,18 @@ jobs: test-if-needed: name: Test if Source Changed runs-on: ubuntu-latest + services: + postgres: + image: postgres:17 + env: + POSTGRES_PASSWORD: postgres + ports: + - 5432:5432 + options: >- + --health-cmd="pg_isready -U postgres" + --health-interval=10s + --health-timeout=5s + --health-retries=5 needs: changes if: needs.changes.outputs.src == 'true' steps: @@ -114,4 +140,4 @@ jobs: fi if [[ "${{ needs.changes.outputs.workflows }}" == "true" ]]; then echo "- ⚙️ GitHub Actions workflows" >> $GITHUB_STEP_SUMMARY - fi \ No newline at end of file + fi diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ea14cd6..4140df3 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -13,6 +13,7 @@ on: env: CARGO_TERM_COLOR: always + DATABASE_URL: postgres://postgres:postgres@localhost/postgres jobs: validate: @@ -20,6 +21,18 @@ jobs: runs-on: ubuntu-latest outputs: version: ${{ steps.version.outputs.version }} + services: + postgres: + image: postgres:17 + env: + POSTGRES_PASSWORD: postgres + ports: + - 5432:5432 + options: >- + --health-cmd="pg_isready -U postgres" + --health-interval=10s + --health-timeout=5s + --health-retries=5 steps: - name: Checkout sources uses: actions/checkout@v4 @@ -129,4 +142,4 @@ jobs: draft: false prerelease: ${{ contains(needs.validate.outputs.version, 'alpha') || contains(needs.validate.outputs.version, 'beta') || contains(needs.validate.outputs.version, 'rc') }} env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}