diff --git a/CHANGELOG.md b/CHANGELOG.md index c69cf85..4c692f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- _bump_: introducing mysql v1.0.0-rc.1 (#12) + ## [1.0.0-beta.1] - 2025-12-06 - _chore_: introducing mysql v1 (#6) @@ -10,7 +12,7 @@ ### 🐛 Bug Fixes -- Reenqueue oprphaned before starting streaming (#507) +- Reenqueue orphaned before starting streaming (#507) ### 💼 Other diff --git a/Cargo.lock b/Cargo.lock index 6ff465e..12290f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,9 +78,9 @@ dependencies = [ [[package]] name = "apalis" -version = "1.0.0-beta.2" +version = "1.0.0-rc.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6556b89bb5cb40dab6e4d7ee1f2abfef6d372bd3aef300a3faa992bcb13bda8" +checksum = "f93be0eb33b912f5e66004d0b756423c285273259068b1c80a71d7842658189b" dependencies = [ "apalis-core", "futures-util", @@ -90,11 +90,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "apalis-codec" +version = "0.1.0-rc.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5ed6bb8e64c360ed4ad666a6cbc42e9e6df73087461dc4071f510a3af284637" +dependencies = [ + "apalis-core", + "serde", + "serde_json", +] + [[package]] name = "apalis-core" -version = "1.0.0-beta.2" +version = "1.0.0-rc.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49039d4eb476cc05e196210153dbb34be180de9a0ef8b7a666fde0f80c5c357d" +checksum = "6b1557d680ee4a9b42a76ab3a9572cee1a00d45e7eb455427d906c42774766e7" dependencies = [ "futures-channel", "futures-core", @@ -103,7 +114,6 @@ dependencies = [ "futures-util", "pin-project", "serde", - "serde_json", "thiserror", "tower-layer", "tower-service", @@ -112,9 +122,10 @@ dependencies = [ [[package]] name = "apalis-mysql" -version = "1.0.0-beta.1" +version = "1.0.0-rc.1" dependencies = [ "apalis", + "apalis-codec", "apalis-core", "apalis-sql", "apalis-workflow", @@ -135,9 +146,9 @@ dependencies = [ [[package]] name = "apalis-sql" -version = "1.0.0-beta.2" +version = "1.0.0-rc.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "604ed96ae8ff20d4c6f8533ffc9f8c91c5f99da6bc564de6a37b94829522f9ec" +checksum = "5ade5d8faa60e9975b01d3bb1ebc5028589aa4986365eaa4d080d30ed3b5141f" dependencies = [ "apalis-core", "chrono", @@ -148,9 +159,9 @@ dependencies = [ [[package]] name = "apalis-workflow" -version = "0.1.0-beta.2" +version = "0.1.0-rc.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afa1869ce395c0b727315a2be39508eb77ef94ed75492e7b43334b13241b1afa" +checksum = "bc024da2d5d3ab59cc9fea099a2e2b20de5ff608f2e287abcb73aa45e4966a89" dependencies = [ "apalis-core", "futures", @@ -208,7 +219,7 @@ dependencies = [ "async-channel 2.5.0", "async-executor", "async-io 2.6.0", - "async-lock 3.4.1", + "async-lock 3.4.2", "blocking", "futures-lite 2.6.1", "once_cell", @@ -247,7 +258,7 @@ dependencies = [ "futures-lite 2.6.1", "parking", "polling 3.11.0", - "rustix 1.1.2", + "rustix 1.1.3", "slab", "windows-sys 0.61.2", ] @@ -263,9 +274,9 @@ dependencies = [ [[package]] name = "async-lock" -version = "3.4.1" +version = "3.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd03604047cee9b6ce9de9f70c6cd540a0520c813cbd49bae61f33ab80ed1dc" +checksum = "290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311" dependencies = [ "event-listener 5.4.1", "event-listener-strategy", @@ -281,7 +292,7 @@ dependencies = [ "async-channel 1.9.0", "async-global-executor", "async-io 2.6.0", - "async-lock 3.4.1", + "async-lock 3.4.2", "crossbeam-utils", "futures-channel", "futures-core", @@ -333,9 +344,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "base64ct" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" +checksum = "0e050f626429857a27ddccb31e0aca21356bfa709c04041aefddac081a8f068a" [[package]] name = "bitflags" @@ -376,9 +387,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.19.0" +version = "3.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" +checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" [[package]] name = "byteorder" @@ -394,9 +405,9 @@ checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" [[package]] name = "cc" -version = "1.2.49" +version = "1.2.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90583009037521a116abf44494efecd645ba48b6622457080f080b85544e2215" +checksum = "9f50d563227a1c37cc0a263f64eca3334388c01c5e4c4861a9def205c614383c" dependencies = [ "find-msvc-tools", "shlex", @@ -1030,9 +1041,9 @@ checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" [[package]] name = "icu_properties" -version = "2.1.1" +version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e93fcd3157766c0c8da2f8cff6ce651a31f0810eaa1c51ec363ef790bbb5fb99" +checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" dependencies = [ "icu_collections", "icu_locale_core", @@ -1044,9 +1055,9 @@ dependencies = [ [[package]] name = "icu_properties_data" -version = "2.1.1" +version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02845b3647bb045f1100ecd6480ff52f34c35f82d9880e029d329c21d1054899" +checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" [[package]] name = "icu_provider" @@ -1122,15 +1133,15 @@ checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" [[package]] name = "itoa" -version = "1.0.15" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +checksum = "7ee5b5339afb4c41626dde77b7a611bd4f2c202b897852b4bcf5d03eddc61010" [[package]] name = "jiff" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49cce2b81f2098e7e3efc35bc2e0a6b7abec9d34128283d7a26fa8f32a6dbb35" +checksum = "a87d9b8105c23642f50cbbae03d1f75d8422c5cb98ce7ee9271f7ff7505be6b8" dependencies = [ "jiff-static", "log", @@ -1141,9 +1152,9 @@ dependencies = [ [[package]] name = "jiff-static" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "980af8b43c3ad5d8d349ace167ec8170839f753a42d233ba19e08afe1850fa69" +checksum = "b787bebb543f8969132630c51fd0afab173a86c6abae56ff3b9e5e3e3f9f6e58" dependencies = [ "proc-macro2", "quote", @@ -1192,13 +1203,13 @@ checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" [[package]] name = "libredox" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb" +checksum = "df15f6eac291ed1cf25865b1ee60399f57e7c227e7f51bdbd4c5270396a9ed50" dependencies = [ "bitflags 2.10.0", "libc", - "redox_syscall", + "redox_syscall 0.6.0", ] [[package]] @@ -1417,7 +1428,7 @@ checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.18", "smallvec", "windows-link", ] @@ -1546,15 +1557,15 @@ dependencies = [ "concurrent-queue", "hermit-abi 0.5.2", "pin-project-lite", - "rustix 1.1.2", + "rustix 1.1.3", "windows-sys 0.61.2", ] [[package]] name = "portable-atomic" -version = "1.11.1" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" +checksum = "f59e70c4aef1e55797c2e8fd94a4f2a973fc972cfde0e0b05f683667b0cd39dd" [[package]] name = "portable-atomic-util" @@ -1675,6 +1686,15 @@ dependencies = [ "bitflags 2.10.0", ] +[[package]] +name = "redox_syscall" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec96166dafa0886eb81fe1c0a388bece180fbef2135f97c1e2cf8302e74b43b5" +dependencies = [ + "bitflags 2.10.0", +] + [[package]] name = "regex" version = "1.12.2" @@ -1754,9 +1774,9 @@ dependencies = [ [[package]] name = "rustix" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" +checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" dependencies = [ "bitflags 2.10.0", "errno", @@ -1781,9 +1801,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.13.1" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "708c0f9d5f54ba0272468c1d306a52c495b31fa155e91bc25371e6df7996908c" +checksum = "21e6f2ab2928ca4291b86736a8bd920a277a399bba1589409d72154ff87c1282" dependencies = [ "zeroize", ] @@ -1807,9 +1827,9 @@ checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" [[package]] name = "ryu" -version = "1.0.20" +version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +checksum = "62049b2877bf12821e8f9ad256ee38fdc31db7387ec2d3b3f403024de2034aea" [[package]] name = "schannel" @@ -1881,15 +1901,15 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.145" +version = "1.0.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" +checksum = "6af14725505314343e673e9ecb7cd7e8a36aa9791eb936235a3567cc31447ae4" dependencies = [ "itoa", "memchr", - "ryu", "serde", "serde_core", + "zmij", ] [[package]] @@ -2247,14 +2267,14 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.23.0" +version = "3.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16" +checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c" dependencies = [ "fastrand 2.3.0", "getrandom 0.3.4", "once_cell", - "rustix 1.1.2", + "rustix 1.1.3", "windows-sys 0.61.2", ] @@ -2384,9 +2404,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.43" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ "log", "pin-project-lite", @@ -2407,9 +2427,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.35" +version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", ] @@ -3056,3 +3076,9 @@ dependencies = [ "quote", "syn", ] + +[[package]] +name = "zmij" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1dccf46b25b205e4bebe1d5258a991df1cc17801017a845cb5b3fe0269781aa" diff --git a/Cargo.toml b/Cargo.toml index 18dd11c..a4ab096 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "apalis-mysql" -version = "1.0.0-beta.1" +version = "1.0.0-rc.1" authors = ["Njuguna Mureithi "] edition = "2024" repository = "https://github.com/apalis-dev/apalis-mysql" @@ -22,11 +22,11 @@ tokio-comp = ["tokio", "sqlx/runtime-tokio-rustls"] tokio-comp-native-tls = ["tokio", "sqlx/runtime-tokio-native-tls"] [dependencies] -apalis-core = { version = "1.0.0-beta.2", default-features = false, features = [ +apalis-codec = "0.1.0-rc.1" +apalis-core = { version = "1.0.0-rc.1", default-features = false, features = [ "sleep", - "json", ] } -apalis-sql = { version = "1.0.0-beta.2", default-features = false } +apalis-sql = { version = "1.0.0-rc.1", default-features = false } serde = { version = "1", features = ["derive"] } chrono = { version = "0.4", features = ["serde"] } pin-project = "1.1.10" @@ -47,8 +47,8 @@ features = ["chrono", "mysql", "json"] [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } -apalis = { version = "1.0.0-beta.2" } -apalis-workflow = { version = "0.1.0-beta.2" } +apalis = { version = "1.0.0-rc.1" } +apalis-workflow = { version = "0.1.0-rc.1" } futures-util = "0.3.31" env_logger = "0.11" diff --git a/README.md b/README.md index d5739ad..3a1d09f 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ async fn main() { start += 1; let task = Task::builder(start) .run_after(Duration::from_secs(1)) - .with_ctx(SqlContext::new().with_priority(1)) + .priority(1) .build(); task }) diff --git a/examples/dag.rs b/examples/dag.rs new file mode 100644 index 0000000..2d76553 --- /dev/null +++ b/examples/dag.rs @@ -0,0 +1,53 @@ +use apalis::prelude::*; +use apalis_mysql::MySqlStorage; +use apalis_workflow::*; +use sqlx::MySqlPool; + +async fn get_name(user_id: u32) -> Result { + Ok(user_id.to_string()) +} + +async fn get_age(user_id: u32) -> Result { + Ok(user_id as usize + 20) +} + +async fn get_address(user_id: u32) -> Result { + Ok(user_id as usize + 100) +} + +async fn collector( + (name, age, address): (String, usize, usize), + wrk: WorkerContext, +) -> Result { + let result = name.parse::()? + age + address; + wrk.stop().unwrap(); + Ok(result) +} + +#[tokio::main] +async fn main() { + let pool = MySqlPool::connect(&std::env::var("DATABASE_URL").unwrap()) + .await + .unwrap(); + MySqlStorage::setup(&pool).await.unwrap(); + let mut backend = MySqlStorage::new(&pool); + backend.push_start(vec![42, 43, 44]).await.unwrap(); + + let dag_flow = DagFlow::new("user-etl-workflow"); + let get_name = dag_flow.node(get_name); + let get_age = dag_flow.node(get_age); + let get_address = dag_flow.node(get_address); + dag_flow + .node(collector) + .depends_on((&get_name, &get_age, &get_address)); // Order and types matters here + + dag_flow.validate().unwrap(); + + let worker = WorkerBuilder::new("rango-tango") + .backend(backend) + .on_event(|_c, e| { + println!("{e:?},"); + }) + .build(dag_flow); + worker.run().await.unwrap(); +} diff --git a/examples/workflow.rs b/examples/stepped.rs similarity index 100% rename from examples/workflow.rs rename to examples/stepped.rs diff --git a/src/ack.rs b/src/ack.rs index b473576..12b1800 100644 --- a/src/ack.rs +++ b/src/ack.rs @@ -4,13 +4,12 @@ use apalis_core::{ task::{Parts, status::Status}, worker::{context::WorkerContext, ext::ack::Acknowledge}, }; -use apalis_sql::context::SqlContext; use futures::{FutureExt, future::BoxFuture}; use serde::Serialize; use sqlx::MySqlPool; use ulid::Ulid; -use crate::MySqlTask; +use crate::{MySqlContext, MySqlTask}; #[derive(Clone, Debug)] pub struct MySqlAck { @@ -22,13 +21,13 @@ impl MySqlAck { } } -impl Acknowledge for MySqlAck { +impl Acknowledge for MySqlAck { type Error = sqlx::Error; type Future = BoxFuture<'static, Result<(), Self::Error>>; fn ack( &mut self, res: &Result, - parts: &Parts, + parts: &Parts, ) -> Self::Future { let task_id = parts.task_id; let worker_id = parts.ctx.lock_by().clone(); @@ -70,7 +69,7 @@ impl Acknowledge for MySqlAck { } pub(crate) fn calculate_status( - parts: &Parts, + parts: &Parts, res: &Result, ) -> Status { match &res { diff --git a/src/config.rs b/src/config.rs deleted file mode 100644 index a92342d..0000000 --- a/src/config.rs +++ /dev/null @@ -1,16 +0,0 @@ -use apalis_core::backend::{BackendExt, ConfigExt, queue::Queue}; -use ulid::Ulid; - -use crate::{CompactType, MySqlStorage, SqlContext}; - -pub use apalis_sql::config::*; - -impl ConfigExt for MySqlStorage -where - Self: - BackendExt, -{ - fn get_queue(&self) -> Queue { - self.config().queue().clone() - } -} diff --git a/src/fetcher.rs b/src/fetcher.rs index 231d9df..eb7b397 100644 --- a/src/fetcher.rs +++ b/src/fetcher.rs @@ -11,20 +11,20 @@ use apalis_core::{ task::Task, worker::context::WorkerContext, }; -use apalis_sql::{context::SqlContext, from_row::TaskRow}; +use apalis_sql::{config::Config, from_row::TaskRow}; use futures::{FutureExt, future::BoxFuture, stream::Stream}; use pin_project::pin_project; use sqlx::{MySql, MySqlPool, Pool}; use ulid::Ulid; -use crate::{CompactType, MySqlTask, config::Config, from_row::MySqlTaskRow}; +use crate::{CompactType, MySqlContext, MySqlTask, from_row::MySqlTaskRow}; /// Fetch the next batch of tasks from the mysql backend pub async fn fetch_next( pool: MySqlPool, config: Config, worker: WorkerContext, -) -> Result>, sqlx::Error> { +) -> Result>, sqlx::Error> { let mut tx = pool.begin().await?; let lock_at = chrono::Utc::now().naive_utc(); let job_type = config.queue().to_string(); @@ -68,7 +68,7 @@ pub async fn fetch_next( .map(|r| { let mut row: TaskRow = r.try_into()?; row.lock_by = Some(worker.clone()); - row.try_into_task_compact::() + row.try_into_task_compact() .map_err(|e| sqlx::Error::Protocol(e.to_string())) }) .collect(); diff --git a/src/lib.rs b/src/lib.rs index 735f27f..c938d05 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,18 +1,14 @@ #![doc = include_str!("../README.md")] //! -use std::{fmt, marker::PhantomData}; - +use apalis_codec::json::JsonCodec; use apalis_core::{ - backend::{ - Backend, BackendExt, TaskStream, - codec::{Codec, json::JsonCodec}, - }, + backend::{Backend, BackendExt, TaskStream, codec::Codec, queue::Queue}, features_table, layers::Stack, task::Task, worker::{context::WorkerContext, ext::ack::AcknowledgeLayer}, }; -pub use apalis_sql::context::SqlContext; +use apalis_sql::context::SqlContext; use futures::{ FutureExt, Stream, StreamExt, TryStreamExt, stream::{self, BoxStream}, @@ -23,6 +19,7 @@ pub use sqlx::{ mysql::MySqlConnectOptions, pool::{PoolConnection, PoolOptions}, }; +use std::{fmt, marker::PhantomData}; use ulid::Ulid; use crate::{ @@ -36,7 +33,6 @@ use crate::{ }; mod ack; -mod config; /// Fetcher module for retrieving tasks from mysql backend pub mod fetcher; mod from_row; @@ -47,10 +43,15 @@ mod shared; pub mod sink; /// Type alias for a task stored in mysql backend -pub type MySqlTask = Task; -pub use config::Config; +pub type MySqlTask = Task; +pub use apalis_sql::config::Config; pub use shared::{SharedMySqlError, SharedMySqlStorage}; +pub type MySqlTaskId = apalis_core::task::task_id::TaskId; +pub type MySqlContext = SqlContext; + +pub use apalis_sql::ext::TaskBuilderExt; + /// CompactType is the type used for compact serialization in mysql backend pub type CompactType = Vec; @@ -219,7 +220,7 @@ where type Args = Args; type IdType = Ulid; - type Context = SqlContext; + type Context = MySqlContext; type Error = sqlx::Error; @@ -265,7 +266,7 @@ where impl BackendExt for MySqlStorage where - Self: Backend, + Self: Backend, Decode: Codec + Send + 'static, Decode::Error: std::error::Error + Send + Sync + 'static, Args: Send + 'static + Unpin, @@ -274,6 +275,10 @@ where type Compact = CompactType; type CompactStream = TaskStream, sqlx::Error>; + fn get_queue(&self) -> Queue { + self.config.queue().to_owned() + } + fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { self.poll_default(worker).boxed() } diff --git a/src/queries/fetch_by_id.rs b/src/queries/fetch_by_id.rs index e8b05e0..607c456 100644 --- a/src/queries/fetch_by_id.rs +++ b/src/queries/fetch_by_id.rs @@ -5,12 +5,16 @@ use apalis_core::{ use apalis_sql::from_row::{FromRowError, TaskRow}; use ulid::Ulid; -use crate::{CompactType, MySqlStorage, MySqlTask, SqlContext, from_row::MySqlTaskRow}; +use crate::{CompactType, MySqlContext, MySqlStorage, MySqlTask, from_row::MySqlTaskRow}; impl FetchById for MySqlStorage where - Self: - BackendExt, + Self: BackendExt< + Context = MySqlContext, + Compact = CompactType, + IdType = Ulid, + Error = sqlx::Error, + >, D: Codec, D::Error: std::error::Error + Send + Sync + 'static, Args: 'static, diff --git a/src/queries/list_queues.rs b/src/queries/list_queues.rs index a124c5c..9e83c85 100644 --- a/src/queries/list_queues.rs +++ b/src/queries/list_queues.rs @@ -1,7 +1,7 @@ use apalis_core::backend::{BackendExt, ListQueues, QueueInfo}; use ulid::Ulid; -use crate::{CompactType, MySqlStorage, SqlContext}; +use crate::{CompactType, MySqlContext, MySqlStorage}; struct QueueInfoRow { name: Option, @@ -23,8 +23,12 @@ impl From for QueueInfo { impl ListQueues for MySqlStorage where - Self: - BackendExt, + Self: BackendExt< + Context = MySqlContext, + Compact = CompactType, + IdType = Ulid, + Error = sqlx::Error, + >, { fn list_queues(&self) -> impl Future, Self::Error>> + Send { let pool = self.pool.clone(); diff --git a/src/queries/list_tasks.rs b/src/queries/list_tasks.rs index f02123f..c4669bd 100644 --- a/src/queries/list_tasks.rs +++ b/src/queries/list_tasks.rs @@ -5,12 +5,16 @@ use apalis_core::{ use apalis_sql::from_row::{FromRowError, TaskRow}; use ulid::Ulid; -use crate::{CompactType, MySqlStorage, MySqlTask, SqlContext, from_row::MySqlTaskRow}; +use crate::{CompactType, MySqlContext, MySqlStorage, MySqlTask, from_row::MySqlTaskRow}; impl ListTasks for MySqlStorage where - Self: - BackendExt, + Self: BackendExt< + Context = MySqlContext, + Compact = CompactType, + IdType = Ulid, + Error = sqlx::Error, + >, D: Codec, D::Error: std::error::Error + Send + Sync + 'static, Args: 'static, @@ -58,8 +62,12 @@ where impl ListAllTasks for MySqlStorage where - Self: - BackendExt, + Self: BackendExt< + Context = MySqlContext, + Compact = CompactType, + IdType = Ulid, + Error = sqlx::Error, + >, { fn list_all_tasks( &self, diff --git a/src/queries/list_workers.rs b/src/queries/list_workers.rs index 71fb136..4fac80e 100644 --- a/src/queries/list_workers.rs +++ b/src/queries/list_workers.rs @@ -3,7 +3,7 @@ use chrono::NaiveDateTime; use futures::TryFutureExt; use ulid::Ulid; -use crate::{CompactType, MySqlStorage, SqlContext}; +use crate::{CompactType, MySqlContext, MySqlStorage}; struct Worker { id: String, @@ -16,8 +16,12 @@ struct Worker { impl ListWorkers for MySqlStorage where - Self: - BackendExt, + Self: BackendExt< + Context = MySqlContext, + Compact = CompactType, + IdType = Ulid, + Error = sqlx::Error, + >, { fn list_workers( &self, diff --git a/src/queries/metrics.rs b/src/queries/metrics.rs index bfabeeb..4f8af9e 100644 --- a/src/queries/metrics.rs +++ b/src/queries/metrics.rs @@ -1,7 +1,7 @@ use apalis_core::backend::{BackendExt, Metrics, Statistic}; use ulid::Ulid; -use crate::{CompactType, MySqlStorage, SqlContext}; +use crate::{CompactType, MySqlContext, MySqlStorage}; struct StatisticRow { /// The priority of the statistic (lower number means higher priority) @@ -16,8 +16,12 @@ struct StatisticRow { impl Metrics for MySqlStorage where - Self: - BackendExt, + Self: BackendExt< + Context = MySqlContext, + Compact = CompactType, + IdType = Ulid, + Error = sqlx::Error, + >, { fn global(&self) -> impl Future, Self::Error>> + Send { let pool = self.pool.clone(); diff --git a/src/queries/wait_for.rs b/src/queries/wait_for.rs index 91c3f8f..cf2e4e0 100644 --- a/src/queries/wait_for.rs +++ b/src/queries/wait_for.rs @@ -22,7 +22,7 @@ where Self: BackendExt, Result: DeserializeOwned, { - type ResultStream = BoxStream<'static, Result, Self::Error>>; + type ResultStream = BoxStream<'static, Result, Self::Error>>; fn wait_for( &self, task_ids: impl IntoIterator>, @@ -78,12 +78,12 @@ where fn check_status( &self, task_ids: impl IntoIterator> + Send, - ) -> impl Future>, Self::Error>> + Send { + ) -> impl Future>, Self::Error>> + Send { let pool = self.pool.clone(); let ids: Vec = task_ids.into_iter().map(|id| id.to_string()).collect(); async move { - let ids = serde_json::to_string(&ids).unwrap(); + let ids = serde_json::to_string(&ids).map_err(|e| sqlx::Error::Decode(e.into()))?; let rows = sqlx::query_file_as!(ResultRow, "queries/backend/fetch_completed_tasks.sql", ids) .fetch_all(&pool) @@ -92,17 +92,17 @@ where let mut results = Vec::new(); for row in rows { let task_id = TaskId::from_str(&row.id.unwrap()) - .map_err(|_| sqlx::Error::Protocol("Invalid task ID".into()))?; + .map_err(|_| sqlx::Error::Decode("Invalid task ID".into()))?; let result: Result = serde_json::from_str(&row.result.unwrap()) - .map_err(|_| sqlx::Error::Protocol("Failed to decode result".into()))?; + .map_err(|_| sqlx::Error::Decode("Failed to decode result".into()))?; results.push(TaskResult::new( task_id, row.status .unwrap() .parse() - .map_err(|_| sqlx::Error::Protocol("Invalid status value".into()))?, + .map_err(|_| sqlx::Error::Decode("Invalid status value".into()))?, result, )); } diff --git a/src/shared.rs b/src/shared.rs index 8eb2835..67ff1cc 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -8,23 +8,20 @@ use std::{ }; use crate::{ - CompactType, Config, MySqlStorage, MySqlTask, + CompactType, Config, MySqlContext, MySqlStorage, MySqlTask, ack::{LockTaskLayer, MySqlAck}, fetcher::MySqlPollFetcher, initial_heartbeat, keep_alive, }; use crate::{from_row::MySqlTaskRow, sink::MySqlSink}; +use apalis_codec::json::JsonCodec; use apalis_core::{ - backend::{ - Backend, BackendExt, TaskStream, - codec::{Codec, json::JsonCodec}, - shared::MakeShared, - }, + backend::{Backend, BackendExt, TaskStream, codec::Codec, queue::Queue, shared::MakeShared}, layers::Stack, worker::{context::WorkerContext, ext::ack::AcknowledgeLayer}, }; -use apalis_sql::{context::SqlContext, from_row::TaskRow}; +use apalis_sql::from_row::TaskRow; use futures::{ FutureExt, Stream, StreamExt, TryStreamExt, channel::mpsc::{self, Receiver, Sender}, @@ -117,7 +114,7 @@ impl SharedMySqlStorage> { .into_iter() .map(|r| { let row: TaskRow = r.try_into()?; - row.try_into_task_compact::() + row.try_into_task_compact() .map_err(|e| sqlx::Error::Protocol(e.to_string())) }) .collect::, _>>() @@ -231,7 +228,7 @@ where type Beat = BoxStream<'static, Result<(), sqlx::Error>>; - type Context = SqlContext; + type Context = MySqlContext; type Layer = Stack, LockTaskLayer>; @@ -272,7 +269,7 @@ where impl BackendExt for MySqlStorage> where - Self: Backend, + Self: Backend, Decode: Codec + Send + 'static, Decode::Error: std::error::Error + Send + Sync + 'static, Args: Send + 'static + Unpin, @@ -281,6 +278,10 @@ where type Compact = CompactType; type CompactStream = TaskStream, sqlx::Error>; + fn get_queue(&self) -> Queue { + self.config.queue().to_owned() + } + fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { self.poll_shared(worker).boxed() } diff --git a/src/sink.rs b/src/sink.rs index 24bc45e..5a65c0f 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -12,7 +12,7 @@ use futures::{ use sqlx::MySqlPool; use ulid::Ulid; -use crate::{CompactType, MySqlStorage, MySqlTask, config::Config}; +use crate::{CompactType, Config, MySqlStorage, MySqlTask}; type FlushFuture = BoxFuture<'static, Result<(), Arc>>; diff --git a/supply-chain/config.toml b/supply-chain/config.toml index 97afa97..7ae8c94 100644 --- a/supply-chain/config.toml +++ b/supply-chain/config.toml @@ -16,20 +16,44 @@ criteria = "safe-to-deploy" version = "0.1.5" criteria = "safe-to-deploy" +[[exemptions.anstream]] +version = "0.6.21" +criteria = "safe-to-run" + +[[exemptions.anstyle]] +version = "1.0.13" +criteria = "safe-to-run" + +[[exemptions.anstyle-parse]] +version = "0.2.7" +criteria = "safe-to-run" + +[[exemptions.anstyle-query]] +version = "1.1.5" +criteria = "safe-to-run" + +[[exemptions.anstyle-wincon]] +version = "3.0.11" +criteria = "safe-to-run" + [[exemptions.apalis]] -version = "1.0.0-beta.2" +version = "1.0.0-rc.1" criteria = "safe-to-run" +[[exemptions.apalis-codec]] +version = "0.1.0-rc.1" +criteria = "safe-to-deploy" + [[exemptions.apalis-core]] -version = "1.0.0-beta.2" +version = "1.0.0-rc.1" criteria = "safe-to-deploy" [[exemptions.apalis-sql]] -version = "1.0.0-beta.2" +version = "1.0.0-rc.1" criteria = "safe-to-deploy" [[exemptions.apalis-workflow]] -version = "0.1.0-beta.2" +version = "0.1.0-rc.1" criteria = "safe-to-run" [[exemptions.async-channel]] @@ -61,7 +85,7 @@ version = "2.8.0" criteria = "safe-to-deploy" [[exemptions.async-lock]] -version = "3.4.1" +version = "3.4.2" criteria = "safe-to-deploy" [[exemptions.async-std]] @@ -89,7 +113,7 @@ version = "0.22.1" criteria = "safe-to-deploy" [[exemptions.base64ct]] -version = "1.8.0" +version = "1.8.1" criteria = "safe-to-deploy" [[exemptions.bitflags]] @@ -109,7 +133,7 @@ version = "1.6.2" criteria = "safe-to-deploy" [[exemptions.bumpalo]] -version = "3.19.0" +version = "3.19.1" criteria = "safe-to-deploy" [[exemptions.byteorder]] @@ -121,7 +145,7 @@ version = "1.11.0" criteria = "safe-to-deploy" [[exemptions.cc]] -version = "1.2.49" +version = "1.2.50" criteria = "safe-to-deploy" [[exemptions.cfg-if]] @@ -132,6 +156,10 @@ criteria = "safe-to-deploy" version = "0.4.42" criteria = "safe-to-deploy" +[[exemptions.colorchoice]] +version = "1.0.4" +criteria = "safe-to-run" + [[exemptions.concurrent-queue]] version = "2.5.0" criteria = "safe-to-deploy" @@ -192,8 +220,12 @@ criteria = "safe-to-deploy" version = "1.15.0" criteria = "safe-to-deploy" +[[exemptions.env_filter]] +version = "0.1.4" +criteria = "safe-to-run" + [[exemptions.env_logger]] -version = "0.10.2" +version = "0.11.8" criteria = "safe-to-run" [[exemptions.equivalent]] @@ -364,10 +396,6 @@ criteria = "safe-to-deploy" version = "0.5.12" criteria = "safe-to-deploy" -[[exemptions.humantime]] -version = "2.3.0" -criteria = "safe-to-run" - [[exemptions.iana-time-zone]] version = "0.1.64" criteria = "safe-to-deploy" @@ -393,11 +421,11 @@ version = "2.1.1" criteria = "safe-to-deploy" [[exemptions.icu_properties]] -version = "2.1.1" +version = "2.1.2" criteria = "safe-to-deploy" [[exemptions.icu_properties_data]] -version = "2.1.1" +version = "2.1.2" criteria = "safe-to-deploy" [[exemptions.icu_provider]] @@ -424,14 +452,22 @@ criteria = "safe-to-deploy" version = "1.0.11" criteria = "safe-to-deploy" -[[exemptions.is-terminal]] -version = "0.4.17" +[[exemptions.is_terminal_polyfill]] +version = "1.70.2" criteria = "safe-to-run" [[exemptions.itoa]] -version = "1.0.15" +version = "1.0.16" criteria = "safe-to-deploy" +[[exemptions.jiff]] +version = "0.2.17" +criteria = "safe-to-run" + +[[exemptions.jiff-static]] +version = "0.2.17" +criteria = "safe-to-run" + [[exemptions.js-sys]] version = "0.3.83" criteria = "safe-to-deploy" @@ -453,7 +489,7 @@ version = "0.2.15" criteria = "safe-to-deploy" [[exemptions.libredox]] -version = "0.1.10" +version = "0.1.11" criteria = "safe-to-deploy" [[exemptions.libsqlite3-sys]] @@ -516,6 +552,10 @@ criteria = "safe-to-deploy" version = "1.21.3" criteria = "safe-to-deploy" +[[exemptions.once_cell_polyfill]] +version = "1.70.2" +criteria = "safe-to-run" + [[exemptions.openssl]] version = "0.10.75" criteria = "safe-to-deploy" @@ -596,6 +636,14 @@ criteria = "safe-to-deploy" version = "3.11.0" criteria = "safe-to-deploy" +[[exemptions.portable-atomic]] +version = "1.12.0" +criteria = "safe-to-run" + +[[exemptions.portable-atomic-util]] +version = "0.2.4" +criteria = "safe-to-run" + [[exemptions.potential_utf]] version = "0.1.4" criteria = "safe-to-deploy" @@ -644,6 +692,10 @@ criteria = "safe-to-deploy" version = "0.5.18" criteria = "safe-to-deploy" +[[exemptions.redox_syscall]] +version = "0.6.0" +criteria = "safe-to-deploy" + [[exemptions.regex]] version = "1.12.2" criteria = "safe-to-run" @@ -669,7 +721,7 @@ version = "0.37.28" criteria = "safe-to-deploy" [[exemptions.rustix]] -version = "1.1.2" +version = "1.1.3" criteria = "safe-to-deploy" [[exemptions.rustls]] @@ -677,7 +729,7 @@ version = "0.23.35" criteria = "safe-to-deploy" [[exemptions.rustls-pki-types]] -version = "1.13.1" +version = "1.13.2" criteria = "safe-to-deploy" [[exemptions.rustls-webpki]] @@ -689,7 +741,7 @@ version = "1.0.22" criteria = "safe-to-deploy" [[exemptions.ryu]] -version = "1.0.20" +version = "1.0.21" criteria = "safe-to-deploy" [[exemptions.schannel]] @@ -721,7 +773,7 @@ version = "1.0.228" criteria = "safe-to-deploy" [[exemptions.serde_json]] -version = "1.0.145" +version = "1.0.147" criteria = "safe-to-deploy" [[exemptions.serde_urlencoded]] @@ -821,13 +873,9 @@ version = "0.13.2" criteria = "safe-to-deploy" [[exemptions.tempfile]] -version = "3.23.0" +version = "3.24.0" criteria = "safe-to-deploy" -[[exemptions.termcolor]] -version = "1.4.1" -criteria = "safe-to-run" - [[exemptions.thiserror]] version = "2.0.17" criteria = "safe-to-deploy" @@ -877,7 +925,7 @@ version = "0.3.3" criteria = "safe-to-deploy" [[exemptions.tracing]] -version = "0.1.43" +version = "0.1.44" criteria = "safe-to-deploy" [[exemptions.tracing-attributes]] @@ -885,7 +933,7 @@ version = "0.1.31" criteria = "safe-to-deploy" [[exemptions.tracing-core]] -version = "0.1.35" +version = "0.1.36" criteria = "safe-to-deploy" [[exemptions.typenum]] @@ -924,6 +972,10 @@ criteria = "safe-to-deploy" version = "1.0.4" criteria = "safe-to-deploy" +[[exemptions.utf8parse]] +version = "0.2.2" +criteria = "safe-to-run" + [[exemptions.value-bag]] version = "1.12.0" criteria = "safe-to-deploy" @@ -1000,10 +1052,6 @@ criteria = "safe-to-deploy" version = "0.4.0" criteria = "safe-to-deploy" -[[exemptions.winapi-util]] -version = "0.1.11" -criteria = "safe-to-run" - [[exemptions.winapi-x86_64-pc-windows-gnu]] version = "0.4.0" criteria = "safe-to-deploy" @@ -1199,3 +1247,7 @@ criteria = "safe-to-deploy" [[exemptions.zerovec-derive]] version = "0.11.2" criteria = "safe-to-deploy" + +[[exemptions.zmij]] +version = "0.1.8" +criteria = "safe-to-deploy"