diff --git a/CHANGELOG.md b/CHANGELOG.md index ec45948..584e72c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## Unreleased +- bump: introducing rc.1 (#45) +- fix: Add primary keys for database tables (#45) + ## [1.0.0-beta.3] - 2025-12-06 - fix: correct allowSelfAssign param as bool (#25) diff --git a/Cargo.lock b/Cargo.lock index f985688..064dc41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,9 +19,9 @@ dependencies = [ [[package]] name = "apalis" -version = "1.0.0-beta.2" +version = "1.0.0-rc.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6556b89bb5cb40dab6e4d7ee1f2abfef6d372bd3aef300a3faa992bcb13bda8" +checksum = "f93be0eb33b912f5e66004d0b756423c285273259068b1c80a71d7842658189b" dependencies = [ "apalis-core", "futures-util", @@ -31,11 +31,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "apalis-codec" +version = "0.1.0-rc.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5ed6bb8e64c360ed4ad666a6cbc42e9e6df73087461dc4071f510a3af284637" +dependencies = [ + "apalis-core", + "serde", + "serde_json", +] + [[package]] name = "apalis-core" -version = "1.0.0-beta.2" +version = "1.0.0-rc.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49039d4eb476cc05e196210153dbb34be180de9a0ef8b7a666fde0f80c5c357d" +checksum = "6b1557d680ee4a9b42a76ab3a9572cee1a00d45e7eb455427d906c42774766e7" dependencies = [ "futures-channel", "futures-core", @@ -44,7 +55,6 @@ dependencies = [ "futures-util", "pin-project", "serde", - "serde_json", "thiserror", "tower-layer", "tower-service", @@ -53,9 +63,10 @@ dependencies = [ [[package]] name = "apalis-postgres" -version = "1.0.0-beta.3" +version = "1.0.0-rc.1" dependencies = [ "apalis", + "apalis-codec", "apalis-core", "apalis-sql", "apalis-workflow", @@ -75,9 +86,9 @@ dependencies = [ [[package]] name = "apalis-sql" -version = "1.0.0-beta.2" +version = "1.0.0-rc.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "604ed96ae8ff20d4c6f8533ffc9f8c91c5f99da6bc564de6a37b94829522f9ec" +checksum = "5ade5d8faa60e9975b01d3bb1ebc5028589aa4986365eaa4d080d30ed3b5141f" dependencies = [ "apalis-core", "chrono", @@ -88,9 +99,9 @@ dependencies = [ [[package]] name = "apalis-workflow" -version = "0.1.0-beta.2" +version = "0.1.0-rc.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afa1869ce395c0b727315a2be39508eb77ef94ed75492e7b43334b13241b1afa" +checksum = "bc024da2d5d3ab59cc9fea099a2e2b20de5ff608f2e287abcb73aa45e4966a89" dependencies = [ "apalis-core", "futures", @@ -148,7 +159,7 @@ dependencies = [ "async-channel 2.5.0", "async-executor", "async-io 2.6.0", - "async-lock 3.4.1", + "async-lock 3.4.2", "blocking", "futures-lite 2.6.1", "once_cell", @@ -187,7 +198,7 @@ dependencies = [ "futures-lite 2.6.1", "parking", "polling 3.11.0", - "rustix 1.1.2", + "rustix 1.1.3", "slab", "windows-sys 0.61.2", ] @@ -203,9 +214,9 @@ dependencies = [ [[package]] name = "async-lock" -version = "3.4.1" +version = "3.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd03604047cee9b6ce9de9f70c6cd540a0520c813cbd49bae61f33ab80ed1dc" +checksum = "290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311" dependencies = [ "event-listener 5.4.1", "event-listener-strategy", @@ -221,7 +232,7 @@ dependencies = [ "async-channel 1.9.0", "async-global-executor", "async-io 2.6.0", - "async-lock 3.4.1", + "async-lock 3.4.2", "crossbeam-utils", "futures-channel", "futures-core", @@ -273,9 +284,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "base64ct" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" +checksum = "0e050f626429857a27ddccb31e0aca21356bfa709c04041aefddac081a8f068a" [[package]] name = "bitflags" @@ -316,9 +327,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.19.0" +version = "3.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" +checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" [[package]] name = "byteorder" @@ -334,9 +345,9 @@ checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" [[package]] name = "cc" -version = "1.2.48" +version = "1.2.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c481bdbf0ed3b892f6f806287d72acd515b352a4ec27a208489b8c1bc839633a" +checksum = "9f50d563227a1c37cc0a263f64eca3334388c01c5e4c4861a9def205c614383c" dependencies = [ "find-msvc-tools", "shlex", @@ -941,9 +952,9 @@ checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" [[package]] name = "icu_properties" -version = "2.1.1" +version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e93fcd3157766c0c8da2f8cff6ce651a31f0810eaa1c51ec363ef790bbb5fb99" +checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" dependencies = [ "icu_collections", "icu_locale_core", @@ -955,9 +966,9 @@ dependencies = [ [[package]] name = "icu_properties_data" -version = "2.1.1" +version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02845b3647bb045f1100ecd6480ff52f34c35f82d9880e029d329c21d1054899" +checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" [[package]] name = "icu_provider" @@ -1027,9 +1038,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.15" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +checksum = "7ee5b5339afb4c41626dde77b7a611bd4f2c202b897852b4bcf5d03eddc61010" [[package]] name = "js-sys" @@ -1073,13 +1084,13 @@ checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" [[package]] name = "libredox" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb" +checksum = "df15f6eac291ed1cf25865b1ee60399f57e7c227e7f51bdbd4c5270396a9ed50" dependencies = [ "bitflags 2.10.0", "libc", - "redox_syscall", + "redox_syscall 0.6.0", ] [[package]] @@ -1292,7 +1303,7 @@ checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.18", "smallvec", "windows-link", ] @@ -1421,7 +1432,7 @@ dependencies = [ "concurrent-queue", "hermit-abi 0.5.2", "pin-project-lite", - "rustix 1.1.2", + "rustix 1.1.3", "windows-sys 0.61.2", ] @@ -1535,6 +1546,15 @@ dependencies = [ "bitflags 2.10.0", ] +[[package]] +name = "redox_syscall" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec96166dafa0886eb81fe1c0a388bece180fbef2135f97c1e2cf8302e74b43b5" +dependencies = [ + "bitflags 2.10.0", +] + [[package]] name = "ring" version = "0.17.14" @@ -1585,9 +1605,9 @@ dependencies = [ [[package]] name = "rustix" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" +checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" dependencies = [ "bitflags 2.10.0", "errno", @@ -1612,9 +1632,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.13.1" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "708c0f9d5f54ba0272468c1d306a52c495b31fa155e91bc25371e6df7996908c" +checksum = "21e6f2ab2928ca4291b86736a8bd920a277a399bba1589409d72154ff87c1282" dependencies = [ "zeroize", ] @@ -1638,9 +1658,9 @@ checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" [[package]] name = "ryu" -version = "1.0.20" +version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +checksum = "62049b2877bf12821e8f9ad256ee38fdc31db7387ec2d3b3f403024de2034aea" [[package]] name = "schannel" @@ -1712,15 +1732,15 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.145" +version = "1.0.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" +checksum = "6af14725505314343e673e9ecb7cd7e8a36aa9791eb936235a3567cc31447ae4" dependencies = [ "itoa", "memchr", - "ryu", "serde", "serde_core", + "zmij", ] [[package]] @@ -2078,14 +2098,14 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.23.0" +version = "3.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16" +checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c" dependencies = [ "fastrand 2.3.0", "getrandom 0.3.4", "once_cell", - "rustix 1.1.2", + "rustix 1.1.3", "windows-sys 0.61.2", ] @@ -2215,9 +2235,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.43" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ "log", "pin-project-lite", @@ -2238,9 +2258,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.35" +version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", ] @@ -2881,3 +2901,9 @@ dependencies = [ "quote", "syn", ] + +[[package]] +name = "zmij" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1dccf46b25b205e4bebe1d5258a991df1cc17801017a845cb5b3fe0269781aa" diff --git a/Cargo.toml b/Cargo.toml index 40a0600..07a652e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "apalis-postgres" -version = "1.0.0-beta.3" +version = "1.0.0-rc.1" authors = ["Njuguna Mureithi "] edition = "2024" repository = "https://github.com/apalis-dev/apalis-postgres" @@ -22,19 +22,22 @@ tokio-comp = ["tokio", "sqlx/runtime-tokio-rustls"] tokio-comp-native-tls = ["tokio", "sqlx/runtime-tokio-native-tls"] [dependencies] -apalis-core = { version = "1.0.0-beta.2", default-features = false, features = [ +apalis-core = { version = "1.0.0-rc.1", default-features = false, features = [ "sleep", - "json", ] } -apalis-sql = { version = "1.0.0-beta.2", default-features = false } +apalis-sql = { version = "1.0.0-rc.1", default-features = false } +apalis-codec = { version = "0.1.0-rc.1", features = ["json"] } serde = { version = "1", features = ["derive"], default-features = false } chrono = { version = "0.4", features = ["serde"], default-features = false } pin-project = "1.1.10" serde_json = "1" futures = "0.3.30" thiserror = "2" -tokio = { version = "1", features = ["rt", "net"], optional = true, default-features = false} -async-std = { version = "1.13.0", optional = true, default-features = false} +tokio = { version = "1", features = [ + "rt", + "net", +], optional = true, default-features = false } +async-std = { version = "1.13.0", optional = true, default-features = false } ulid = { version = "1", features = ["serde"] } @@ -46,6 +49,6 @@ features = ["chrono", "postgres", "json"] [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } once_cell = "1.19.0" -apalis = { version = "1.0.0-beta.2" } -apalis-workflow = { version = "0.1.0-beta.2" } +apalis = { version = "1.0.0-rc.1" } +apalis-workflow = { version = "0.1.0-rc.1" } futures-util = "0.3.30" diff --git a/README.md b/README.md index 71a84f6..3916f93 100644 --- a/README.md +++ b/README.md @@ -182,6 +182,7 @@ async fn main() { let pool = PgPool::connect(&std::env::var("DATABASE_URL").unwrap()) .await .unwrap(); + PostgresStorage::setup(&pool).await.unwrap(); let mut store = SharedPostgresStorage::new(pool); let mut map_store = store.make_shared().unwrap(); @@ -217,7 +218,7 @@ async fn main() { ## Observability Track your jobs using [apalis-board](https://github.com/apalis-dev/apalis-board). -![Task](https://github.com/apalis-dev/apalis-board/raw/master/screenshots/task.png) +![Task](https://github.com/apalis-dev/apalis-board/raw/main/screenshots/task.png) ## License diff --git a/examples/basic.rs b/examples/basic.rs index 9ed5403..b9fb8a7 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -2,6 +2,7 @@ use std::time::Duration; use apalis::{layers::retry::RetryPolicy, prelude::*}; use apalis_postgres::*; +use apalis_sql::ext::TaskBuilderExt; use futures::stream::{self, StreamExt}; #[tokio::main] @@ -17,7 +18,8 @@ async fn main() { start += 1; let task = Task::builder(start) .run_after(Duration::from_secs(1)) - .with_ctx(PgContext::new().with_priority(1)) + .priority(1) + .max_attempts(5) .build(); task }) @@ -25,7 +27,7 @@ async fn main() { backend.push_all(&mut items).await.unwrap(); async fn send_reminder(item: usize, _wrk: WorkerContext) -> Result<(), BoxDynError> { - if item % 3 == 0 { + if item.is_multiple_of(3) { println!("Reminding about item: {} but failing", item); return Err("Failed to send reminder".into()); } diff --git a/examples/dag.rs b/examples/dag.rs new file mode 100644 index 0000000..434cf30 --- /dev/null +++ b/examples/dag.rs @@ -0,0 +1,59 @@ +use apalis::prelude::*; +use apalis_postgres::*; +use apalis_workflow::*; + +async fn get_name(user_id: u32) -> Result { + Ok(user_id.to_string()) +} + +async fn get_age(user_id: u32) -> Result { + Ok(user_id as usize + 20) +} + +async fn get_address(user_id: u32) -> Result { + Ok(user_id as usize + 100) +} + +async fn collector( + (name, age, address): (String, usize, usize), + wrk: WorkerContext, +) -> Result { + let result = name.parse::()? + age + address; + wrk.stop().unwrap(); + Ok(result) +} + +#[tokio::main] +async fn main() { + let dag_flow = DagFlow::new("user-etl-workflow"); + let get_name = dag_flow.node(get_name); + let get_age = dag_flow.node(get_age); + let get_address = dag_flow.node(get_address); + dag_flow + .node(collector) + .depends_on((&get_name, &get_age, &get_address)); // Order and types matters here + + dag_flow.validate().unwrap(); + + println!("Executing workflow:\n{}", dag_flow); + + let pool = PgPool::connect(&std::env::var("DATABASE_URL").unwrap()) + .await + .unwrap(); + PostgresStorage::setup(&pool).await.unwrap(); + let mut backend = PostgresStorage::new_with_config(&pool, &Config::new("test-workflow")); + + backend.push_start(vec![42u32, 43, 44]).await.unwrap(); + + let worker = WorkerBuilder::new("rango-tango") + .backend(backend) + .on_event(|ctx, ev| { + println!("On Event = {:?}", ev); + if matches!(ev, Event::Error(_)) { + ctx.stop().unwrap(); + } + }) + .build(dag_flow); + + worker.run().await.unwrap(); +} diff --git a/examples/workflow.rs b/examples/stepped.rs similarity index 78% rename from examples/workflow.rs rename to examples/stepped.rs index ab3879e..8ca8ccb 100644 --- a/examples/workflow.rs +++ b/examples/stepped.rs @@ -12,10 +12,14 @@ async fn main() { .filter_map(|x| async move { if x % 3 != 0 { Some(x) } else { None } }) .filter_map(|x| async move { if x % 5 != 0 { Some(x) } else { None } }) .delay_for(Duration::from_millis(1000)) - .and_then(|a: Vec| async move { - println!("Sum: {}", a.iter().sum::()); - Ok::<(), BoxDynError>(()) - }); + .and_then( + |a: Vec, ctx: WorkerContext, task_id: PgTaskId| async move { + println!("Sum: {}", a.iter().sum::()); + ctx.stop().unwrap(); + println!("Completed Task ID: {}", task_id); + Ok::<(), BoxDynError>(()) + }, + ); let pool = PgPool::connect(&std::env::var("DATABASE_URL").unwrap()) .await diff --git a/migrations/20251225090252_include_primary_keys.sql b/migrations/20251225090252_include_primary_keys.sql new file mode 100644 index 0000000..4959835 --- /dev/null +++ b/migrations/20251225090252_include_primary_keys.sql @@ -0,0 +1,2 @@ +ALTER TABLE apalis.jobs ADD PRIMARY KEY (id); +ALTER TABLE apalis.workers ADD PRIMARY KEY (id); diff --git a/src/config.rs b/src/config.rs deleted file mode 100644 index d2f7565..0000000 --- a/src/config.rs +++ /dev/null @@ -1,17 +0,0 @@ -use apalis_core::backend::{BackendExt, ConfigExt, queue::Queue}; -use apalis_sql::context::SqlContext; -use ulid::Ulid; - -pub use apalis_sql::config::*; - -use crate::{CompactType, PostgresStorage}; - -impl ConfigExt for PostgresStorage -where - PostgresStorage: - BackendExt, -{ - fn get_queue(&self) -> Queue { - self.config.queue().clone() - } -} diff --git a/src/fetcher.rs b/src/fetcher.rs index 70e798e..d43fcf9 100644 --- a/src/fetcher.rs +++ b/src/fetcher.rs @@ -14,7 +14,7 @@ use pin_project::pin_project; use sqlx::{PgPool, Pool, Postgres}; use ulid::Ulid; -use crate::{CompactType, PgContext, PgTask, config::Config, from_row::PgTaskRow}; +use crate::{CompactType, Config, PgContext, PgTask, from_row::PgTaskRow}; async fn fetch_next( pool: PgPool, diff --git a/src/lib.rs b/src/lib.rs index 0523c46..34f49af 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,17 +4,15 @@ //! [`SharedPostgresStorage`]: crate::shared::SharedPostgresStorage use std::{fmt::Debug, marker::PhantomData}; +use apalis_codec::json::JsonCodec; use apalis_core::{ - backend::{ - Backend, BackendExt, TaskStream, - codec::{Codec, json::JsonCodec}, - }, + backend::{Backend, BackendExt, TaskStream, codec::Codec, queue::Queue}, features_table, layers::Stack, task::{Task, task_id::TaskId}, worker::{context::WorkerContext, ext::ack::AcknowledgeLayer}, }; -use apalis_sql::from_row::TaskRow; +pub use apalis_sql::{config::Config, from_row::TaskRow}; use futures::{ StreamExt, TryFutureExt, TryStreamExt, future::ready, @@ -35,25 +33,19 @@ pub use crate::{ }; mod ack; -mod config; mod fetcher; mod from_row; -pub type PgContext = apalis_sql::context::SqlContext; -pub use config::Config; - +pub type PgContext = apalis_sql::context::SqlContext; mod queries; pub mod shared; pub mod sink; pub type PgTask = Task; -pub type CompactType = Vec; +pub type PgTaskId = TaskId; -#[derive(Debug, Clone, Default)] -pub struct PgNotify { - _private: PhantomData<()>, -} +pub type CompactType = Vec; #[doc = features_table! { setup = r#" @@ -98,6 +90,12 @@ pub struct PostgresStorage< sink: PgSink, } +/// A fetcher that does nothing, used for notify-based storage +#[derive(Debug, Clone, Default)] +pub struct PgNotify { + _private: PhantomData<()>, +} + impl Clone for PostgresStorage { @@ -252,6 +250,10 @@ where type Codec = Decode; type CompactStream = TaskStream, Self::Error>; + + fn get_queue(&self) -> Queue { + self.config.queue().clone() + } fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { self.poll_basic(worker).boxed() } @@ -345,6 +347,11 @@ where type Codec = Decode; type CompactStream = TaskStream, Self::Error>; + + fn get_queue(&self) -> Queue { + self.config.queue().clone() + } + fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { self.poll_with_notify(worker).boxed() } @@ -440,7 +447,7 @@ impl PostgresStorage { #[derive(Debug, Deserialize)] pub struct InsertEvent { job_type: String, - id: TaskId, + id: PgTaskId, } #[cfg(test)] diff --git a/src/queries/fetch_by_id.rs b/src/queries/fetch_by_id.rs index fd7e808..d9e5afe 100644 --- a/src/queries/fetch_by_id.rs +++ b/src/queries/fetch_by_id.rs @@ -1,12 +1,9 @@ -use apalis_core::{ - backend::{BackendExt, FetchById, codec::Codec}, - task::task_id::TaskId, -}; +use apalis_core::backend::{BackendExt, FetchById, codec::Codec}; use apalis_sql::from_row::{FromRowError, TaskRow}; use ulid::Ulid; -use crate::{CompactType, PgContext, PgTask, PostgresStorage, from_row::PgTaskRow}; +use crate::{CompactType, PgContext, PgTask, PgTaskId, PostgresStorage, from_row::PgTaskRow}; impl FetchById for PostgresStorage where @@ -18,7 +15,7 @@ where { fn fetch_by_id( &mut self, - id: &TaskId, + id: &PgTaskId, ) -> impl Future>, Self::Error>> + Send { let pool = self.pool.clone(); let id = id.to_string(); diff --git a/src/queries/list_queues.rs b/src/queries/list_queues.rs index 4420672..cd65018 100644 --- a/src/queries/list_queues.rs +++ b/src/queries/list_queues.rs @@ -1,14 +1,13 @@ use apalis_core::backend::{BackendExt, ListQueues, QueueInfo}; -use apalis_sql::context::SqlContext; use serde_json::Value; use ulid::Ulid; -use crate::{CompactType, PostgresStorage}; +use crate::{CompactType, PgContext, PostgresStorage}; impl ListQueues for PostgresStorage where PostgresStorage: - BackendExt, + BackendExt, { fn list_queues(&self) -> impl Future, Self::Error>> + Send { let pool = self.pool.clone(); diff --git a/src/queries/list_tasks.rs b/src/queries/list_tasks.rs index 67013d1..47594ec 100644 --- a/src/queries/list_tasks.rs +++ b/src/queries/list_tasks.rs @@ -2,18 +2,15 @@ use apalis_core::{ backend::{BackendExt, Filter, ListAllTasks, ListTasks, codec::Codec}, task::{Task, status::Status}, }; -use apalis_sql::{ - context::SqlContext, - from_row::{FromRowError, TaskRow}, -}; +use apalis_sql::from_row::{FromRowError, TaskRow}; use ulid::Ulid; -use crate::{CompactType, PgTask, PostgresStorage, from_row::PgTaskRow}; +use crate::{CompactType, PgContext, PgTask, PostgresStorage, from_row::PgTaskRow}; impl ListTasks for PostgresStorage where PostgresStorage: - BackendExt, + BackendExt, D: Codec, D::Error: std::error::Error + Send + Sync + 'static, Args: 'static, @@ -62,7 +59,7 @@ where impl ListAllTasks for PostgresStorage where PostgresStorage: - BackendExt, + BackendExt, { fn list_all_tasks( &self, diff --git a/src/queries/list_workers.rs b/src/queries/list_workers.rs index b20e543..adff2a5 100644 --- a/src/queries/list_workers.rs +++ b/src/queries/list_workers.rs @@ -1,5 +1,4 @@ use apalis_core::backend::{BackendExt, ListWorkers, RunningWorker}; -use apalis_sql::context::SqlContext; use chrono::{DateTime, Utc}; use futures::TryFutureExt; use ulid::Ulid; @@ -14,12 +13,12 @@ pub struct WorkerRow { pub started_at: Option>, } -use crate::{CompactType, PostgresStorage}; +use crate::{CompactType, PgContext, PostgresStorage}; impl ListWorkers for PostgresStorage where PostgresStorage: - BackendExt, + BackendExt, { fn list_workers( &self, diff --git a/src/queries/metrics.rs b/src/queries/metrics.rs index ba98504..5785a33 100644 --- a/src/queries/metrics.rs +++ b/src/queries/metrics.rs @@ -1,8 +1,7 @@ use apalis_core::backend::{BackendExt, Metrics, Statistic}; -use apalis_sql::context::SqlContext; use ulid::Ulid; -use crate::{CompactType, PostgresStorage}; +use crate::{CompactType, PgContext, PostgresStorage}; struct StatisticRow { priority: Option, @@ -14,7 +13,7 @@ struct StatisticRow { impl Metrics for PostgresStorage where PostgresStorage: - BackendExt, + BackendExt, { fn global(&self) -> impl Future, Self::Error>> + Send { let pool = self.pool.clone(); diff --git a/src/queries/wait_for.rs b/src/queries/wait_for.rs index ba611e1..4c9b707 100644 --- a/src/queries/wait_for.rs +++ b/src/queries/wait_for.rs @@ -4,12 +4,11 @@ use apalis_core::{ backend::{BackendExt, TaskResult, WaitForCompletion}, task::{status::Status, task_id::TaskId}, }; -use apalis_sql::context::SqlContext; use futures::{StreamExt, stream::BoxStream}; use serde::de::DeserializeOwned; use ulid::Ulid; -use crate::{CompactType, PostgresStorage}; +use crate::{CompactType, PgContext, PostgresStorage}; #[derive(Debug)] pub struct TaskResultRow { @@ -22,10 +21,10 @@ impl WaitForCompletion for PostgresStorage where PostgresStorage: - BackendExt, + BackendExt, Result: DeserializeOwned, { - type ResultStream = BoxStream<'static, Result, Self::Error>>; + type ResultStream = BoxStream<'static, Result, Self::Error>>; fn wait_for( &self, task_ids: impl IntoIterator>, @@ -81,7 +80,7 @@ where fn check_status( &self, task_ids: impl IntoIterator> + Send, - ) -> impl Future>, Self::Error>> + Send { + ) -> impl Future>, Self::Error>> + Send { let pool = self.pool.clone(); let ids: Vec = task_ids.into_iter().map(|id| id.to_string()).collect(); diff --git a/src/shared.rs b/src/shared.rs index 801912b..46a08a5 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -8,7 +8,7 @@ use std::{ }; use crate::{ - CompactType, Config, InsertEvent, PgContext, PgTask, PostgresStorage, + CompactType, Config, InsertEvent, PgContext, PgTask, PgTaskId, PostgresStorage, ack::{LockTaskLayer, PgAck}, fetcher::PgPollFetcher, queries::{ @@ -17,14 +17,10 @@ use crate::{ }, }; use crate::{from_row::PgTaskRow, sink::PgSink}; +use apalis_codec::json::JsonCodec; use apalis_core::{ - backend::{ - Backend, BackendExt, TaskStream, - codec::{Codec, json::JsonCodec}, - shared::MakeShared, - }, + backend::{Backend, BackendExt, TaskStream, codec::Codec, queue::Queue, shared::MakeShared}, layers::Stack, - task::task_id::TaskId, worker::{context::WorkerContext, ext::ack::AcknowledgeLayer}, }; use apalis_sql::from_row::TaskRow; @@ -40,14 +36,14 @@ use ulid::Ulid; pub struct SharedPostgresStorage> { pool: PgPool, - registry: Arc>>>, + registry: Arc>>>, drive: Shared>, _marker: PhantomData<(Compact, Codec)>, } impl SharedPostgresStorage { pub fn new(pool: PgPool) -> Self { - let registry: Arc>>> = + let registry: Arc>>> = Arc::new(Mutex::new(HashMap::default())); let p = pool.clone(); let instances = registry.clone(); @@ -140,12 +136,11 @@ impl MakeShared for SharedPostgresStorage>, - receiver: Arc>>, + receiver: Arc>>, } impl Stream for SharedFetcher { - type Item = TaskId; - + type Item = PgTaskId; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); // Keep the poller alive by polling it, but ignoring the output @@ -226,6 +221,11 @@ where type Codec = Decode; type CompactStream = TaskStream, Self::Error>; + + fn get_queue(&self) -> Queue { + self.config.queue().clone() + } + fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { self.poll_shared(worker).boxed() } @@ -318,9 +318,9 @@ mod tests { .unwrap(); int_store.push(99).await.unwrap(); - async fn send_reminder( + async fn send_reminder( _: T, - _task_id: TaskId, + _task_id: PgTaskId, wrk: WorkerContext, ) -> Result<(), BoxDynError> { tokio::time::sleep(Duration::from_secs(2)).await; diff --git a/src/sink.rs b/src/sink.rs index 2409fb5..2c0565f 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -1,19 +1,19 @@ -use std::{ - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; - -use apalis_core::backend::codec::json::JsonCodec; +use apalis_codec::json::JsonCodec; +use apalis_sql::config::Config; use chrono::{DateTime, Utc}; use futures::{ FutureExt, Sink, TryFutureExt, future::{BoxFuture, Shared}, }; use sqlx::{Executor, PgPool}; +use std::{ + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; use ulid::Ulid; -use crate::{CompactType, PgTask, PostgresStorage, config::Config}; +use crate::{CompactType, PgTask, PostgresStorage}; type FlushFuture = BoxFuture<'static, Result<(), Arc>>; diff --git a/supply-chain/config.toml b/supply-chain/config.toml index fca1310..0878c0f 100644 --- a/supply-chain/config.toml +++ b/supply-chain/config.toml @@ -16,11 +16,15 @@ version = "0.1.5" criteria = "safe-to-deploy" [[exemptions.apalis]] -version = "1.0.0-beta.2" +version = "1.0.0-rc.1" criteria = "safe-to-run" +[[exemptions.apalis-codec]] +version = "0.1.0-rc.1" +criteria = "safe-to-deploy" + [[exemptions.apalis-core]] -version = "1.0.0-beta.2" +version = "1.0.0-rc.1" criteria = "safe-to-deploy" [[exemptions.apalis-postgres]] @@ -28,11 +32,11 @@ version = "1.0.0-beta.3" criteria = "safe-to-deploy" [[exemptions.apalis-sql]] -version = "1.0.0-beta.2" +version = "1.0.0-rc.1" criteria = "safe-to-deploy" [[exemptions.apalis-workflow]] -version = "0.1.0-beta.2" +version = "0.1.0-rc.1" criteria = "safe-to-run" [[exemptions.async-channel]] @@ -64,7 +68,7 @@ version = "2.8.0" criteria = "safe-to-deploy" [[exemptions.async-lock]] -version = "3.4.1" +version = "3.4.2" criteria = "safe-to-deploy" [[exemptions.async-std]] @@ -92,7 +96,7 @@ version = "0.22.1" criteria = "safe-to-deploy" [[exemptions.base64ct]] -version = "1.8.0" +version = "1.8.1" criteria = "safe-to-deploy" [[exemptions.bitflags]] @@ -112,7 +116,7 @@ version = "1.6.2" criteria = "safe-to-deploy" [[exemptions.bumpalo]] -version = "3.19.0" +version = "3.19.1" criteria = "safe-to-deploy" [[exemptions.byteorder]] @@ -124,7 +128,7 @@ version = "1.11.0" criteria = "safe-to-deploy" [[exemptions.cc]] -version = "1.2.48" +version = "1.2.50" criteria = "safe-to-deploy" [[exemptions.cfg-if]] @@ -388,11 +392,11 @@ version = "2.1.1" criteria = "safe-to-deploy" [[exemptions.icu_properties]] -version = "2.1.1" +version = "2.1.2" criteria = "safe-to-deploy" [[exemptions.icu_properties_data]] -version = "2.1.1" +version = "2.1.2" criteria = "safe-to-deploy" [[exemptions.icu_provider]] @@ -420,7 +424,7 @@ version = "1.0.11" criteria = "safe-to-deploy" [[exemptions.itoa]] -version = "1.0.15" +version = "1.0.16" criteria = "safe-to-deploy" [[exemptions.js-sys]] @@ -444,7 +448,7 @@ version = "0.2.15" criteria = "safe-to-deploy" [[exemptions.libredox]] -version = "0.1.10" +version = "0.1.11" criteria = "safe-to-deploy" [[exemptions.libsqlite3-sys]] @@ -635,6 +639,10 @@ criteria = "safe-to-deploy" version = "0.5.18" criteria = "safe-to-deploy" +[[exemptions.redox_syscall]] +version = "0.6.0" +criteria = "safe-to-deploy" + [[exemptions.ring]] version = "0.17.14" criteria = "safe-to-deploy" @@ -648,7 +656,7 @@ version = "0.37.28" criteria = "safe-to-deploy" [[exemptions.rustix]] -version = "1.1.2" +version = "1.1.3" criteria = "safe-to-deploy" [[exemptions.rustls]] @@ -656,7 +664,7 @@ version = "0.23.35" criteria = "safe-to-deploy" [[exemptions.rustls-pki-types]] -version = "1.13.1" +version = "1.13.2" criteria = "safe-to-deploy" [[exemptions.rustls-webpki]] @@ -668,7 +676,7 @@ version = "1.0.22" criteria = "safe-to-deploy" [[exemptions.ryu]] -version = "1.0.20" +version = "1.0.21" criteria = "safe-to-deploy" [[exemptions.schannel]] @@ -700,7 +708,7 @@ version = "1.0.228" criteria = "safe-to-deploy" [[exemptions.serde_json]] -version = "1.0.145" +version = "1.0.147" criteria = "safe-to-deploy" [[exemptions.serde_urlencoded]] @@ -800,7 +808,7 @@ version = "0.13.2" criteria = "safe-to-deploy" [[exemptions.tempfile]] -version = "3.23.0" +version = "3.24.0" criteria = "safe-to-deploy" [[exemptions.thiserror]] @@ -852,7 +860,7 @@ version = "0.3.3" criteria = "safe-to-deploy" [[exemptions.tracing]] -version = "0.1.43" +version = "0.1.44" criteria = "safe-to-deploy" [[exemptions.tracing-attributes]] @@ -860,7 +868,7 @@ version = "0.1.31" criteria = "safe-to-deploy" [[exemptions.tracing-core]] -version = "0.1.35" +version = "0.1.36" criteria = "safe-to-deploy" [[exemptions.typenum]] @@ -1170,3 +1178,7 @@ criteria = "safe-to-deploy" [[exemptions.zerovec-derive]] version = "0.11.2" criteria = "safe-to-deploy" + +[[exemptions.zmij]] +version = "0.1.8" +criteria = "safe-to-deploy" diff --git a/supply-chain/imports.lock b/supply-chain/imports.lock index 0c397a4..eadcd36 100644 --- a/supply-chain/imports.lock +++ b/supply-chain/imports.lock @@ -1,2 +1,6 @@ # cargo-vet imports lock + +[[unpublished.apalis-postgres]] +version = "1.0.0-rc.1" +audited_as = "1.0.0-beta.3"