From c96d98b9f4e8456e9a2c007ea74b95378ac56f40 Mon Sep 17 00:00:00 2001 From: Sholom Ber <46351743+Himmelschmidt@users.noreply.github.com> Date: Fri, 26 Dec 2025 12:05:44 -0500 Subject: [PATCH 1/2] feat: use SqlDateTime abstraction from apalis-sql - Add chrono/time feature flags for datetime library selection - Replace direct chrono usage with SqlDateTime and SqlDateTimeExt - Remove hardcoded chrono dependency in favor of feature-gated support - Update to apalis-sql path dependency for datetime abstraction Related to apalis-dev/apalis#655 --- Cargo.lock | 90 ++++++++++++++++++++++++++++++---- Cargo.toml | 11 +++-- src/from_row.rs | 15 +++--- src/lib.rs | 11 +---- src/queries/keep_alive.rs | 4 +- src/queries/list_workers.rs | 14 +++--- src/queries/register_worker.rs | 4 +- src/queries/wait_for.rs | 4 +- src/shared.rs | 6 +-- src/sink.rs | 5 +- 10 files changed, 113 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 064dc41..9e5f88e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,7 +23,7 @@ version = "1.0.0-rc.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f93be0eb33b912f5e66004d0b756423c285273259068b1c80a71d7842658189b" dependencies = [ - "apalis-core", + "apalis-core 1.0.0-rc.1", "futures-util", "pin-project", "thiserror", @@ -37,11 +37,28 @@ version = "0.1.0-rc.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a5ed6bb8e64c360ed4ad666a6cbc42e9e6df73087461dc4071f510a3af284637" dependencies = [ - "apalis-core", + "apalis-core 1.0.0-rc.1", "serde", "serde_json", ] +[[package]] +name = "apalis-core" +version = "1.0.0-beta.2" +dependencies = [ + "futures-channel", + "futures-core", + "futures-sink", + "futures-timer", + "futures-util", + "pin-project", + "serde", + "serde_json", + "thiserror", + "tower-layer", + "tower-service", +] + [[package]] name = "apalis-core" version = "1.0.0-rc.1" @@ -67,11 +84,10 @@ version = "1.0.0-rc.1" dependencies = [ "apalis", "apalis-codec", - "apalis-core", + "apalis-core 1.0.0-beta.2", "apalis-sql", "apalis-workflow", "async-std", - "chrono", "futures", "futures-util", "once_cell", @@ -86,15 +102,14 @@ dependencies = [ [[package]] name = "apalis-sql" -version = "1.0.0-rc.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ade5d8faa60e9975b01d3bb1ebc5028589aa4986365eaa4d080d30ed3b5141f" +version = "1.0.0-beta.2" dependencies = [ - "apalis-core", + "apalis-core 1.0.0-beta.2", "chrono", "serde", "serde_json", "thiserror", + "time", ] [[package]] @@ -103,7 +118,7 @@ version = "0.1.0-rc.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc024da2d5d3ab59cc9fea099a2e2b20de5ff608f2e287abcb73aa45e4966a89" dependencies = [ - "apalis-core", + "apalis-core 1.0.0-rc.1", "futures", "petgraph", "serde", @@ -464,6 +479,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deranged" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587" +dependencies = [ + "powerfmt", + "serde_core", +] + [[package]] name = "digest" version = "0.10.7" @@ -1199,6 +1224,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.46" @@ -1445,6 +1476,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -1893,6 +1930,7 @@ dependencies = [ "sha2", "smallvec", "thiserror", + "time", "tokio", "tokio-stream", "tracing", @@ -1978,6 +2016,7 @@ dependencies = [ "sqlx-core", "stringprep", "thiserror", + "time", "tracing", "whoami", ] @@ -2016,6 +2055,7 @@ dependencies = [ "sqlx-core", "stringprep", "thiserror", + "time", "tracing", "whoami", ] @@ -2041,6 +2081,7 @@ dependencies = [ "serde_urlencoded", "sqlx-core", "thiserror", + "time", "tracing", "url", ] @@ -2129,6 +2170,37 @@ dependencies = [ "syn", ] +[[package]] +name = "time" +version = "0.3.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" + +[[package]] +name = "time-macros" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinystr" version = "0.8.2" diff --git a/Cargo.toml b/Cargo.toml index 07a652e..cd5830b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,21 +14,22 @@ categories = ["asynchronous", "database", "network-programming"] publish = true [features] -default = ["migrate", "tokio-comp"] +default = ["migrate", "tokio-comp", "chrono"] migrate = ["sqlx/migrate", "sqlx/macros"] async-std-comp = ["async-std", "sqlx/runtime-async-std-rustls"] async-std-comp-native-tls = ["async-std", "sqlx/runtime-async-std-native-tls"] tokio-comp = ["tokio", "sqlx/runtime-tokio-rustls"] tokio-comp-native-tls = ["tokio", "sqlx/runtime-tokio-native-tls"] +chrono = ["apalis-sql/chrono", "sqlx/chrono"] +time = ["apalis-sql/time", "sqlx/time"] [dependencies] -apalis-core = { version = "1.0.0-rc.1", default-features = false, features = [ +apalis-core = { path = "../apalis/apalis-core", version = "1.0.0-beta.2", default-features = false, features = [ "sleep", ] } -apalis-sql = { version = "1.0.0-rc.1", default-features = false } +apalis-sql = { path = "../apalis/apalis-sql", version = "1.0.0-beta.2", default-features = false } apalis-codec = { version = "0.1.0-rc.1", features = ["json"] } serde = { version = "1", features = ["derive"], default-features = false } -chrono = { version = "0.4", features = ["serde"], default-features = false } pin-project = "1.1.10" serde_json = "1" futures = "0.3.30" @@ -44,7 +45,7 @@ ulid = { version = "1", features = ["serde"] } [dependencies.sqlx] version = "0.8.1" default-features = false -features = ["chrono", "postgres", "json"] +features = ["postgres", "json"] [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } diff --git a/src/from_row.rs b/src/from_row.rs index d4a1dc3..f988429 100644 --- a/src/from_row.rs +++ b/src/from_row.rs @@ -1,4 +1,5 @@ -use chrono::{DateTime, Utc}; +use apalis_sql::{SqlDateTime, TaskRow}; + #[derive(Debug)] pub struct PgTaskRow { pub job: Option>, @@ -7,19 +8,19 @@ pub struct PgTaskRow { pub status: Option, pub attempts: Option, pub max_attempts: Option, - pub run_at: Option>, + pub run_at: Option, pub last_result: Option, - pub lock_at: Option>, + pub lock_at: Option, pub lock_by: Option, - pub done_at: Option>, + pub done_at: Option, pub priority: Option, pub metadata: Option, } -impl TryInto for PgTaskRow { +impl TryInto for PgTaskRow { type Error = sqlx::Error; - fn try_into(self) -> Result { - Ok(apalis_sql::from_row::TaskRow { + fn try_into(self) -> Result { + Ok(TaskRow { job: self.job.unwrap_or_default(), id: self .id diff --git a/src/lib.rs b/src/lib.rs index 34f49af..3b9ca05 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,7 @@ use std::{fmt::Debug, marker::PhantomData}; use apalis_codec::json::JsonCodec; use apalis_core::{ - backend::{Backend, BackendExt, TaskStream, codec::Codec, queue::Queue}, + backend::{Backend, BackendExt, TaskStream, codec::Codec}, features_table, layers::Stack, task::{Task, task_id::TaskId}, @@ -36,7 +36,7 @@ mod ack; mod fetcher; mod from_row; -pub type PgContext = apalis_sql::context::SqlContext; +pub type PgContext = apalis_sql::context::SqlContext; mod queries; pub mod shared; pub mod sink; @@ -251,9 +251,6 @@ where type Codec = Decode; type CompactStream = TaskStream, Self::Error>; - fn get_queue(&self) -> Queue { - self.config.queue().clone() - } fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { self.poll_basic(worker).boxed() } @@ -348,10 +345,6 @@ where type Codec = Decode; type CompactStream = TaskStream, Self::Error>; - fn get_queue(&self) -> Queue { - self.config.queue().clone() - } - fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { self.poll_with_notify(worker).boxed() } diff --git a/src/queries/keep_alive.rs b/src/queries/keep_alive.rs index c0a5a4a..3332093 100644 --- a/src/queries/keep_alive.rs +++ b/src/queries/keep_alive.rs @@ -1,5 +1,5 @@ use apalis_core::worker::context::WorkerContext; -use chrono::Utc; +use apalis_sql::{SqlDateTime, SqlDateTimeExt}; use futures::{FutureExt, Stream, stream}; use sqlx::PgPool; @@ -36,7 +36,7 @@ pub async fn initial_heartbeat( storage_type: &str, ) -> Result<(), sqlx::Error> { reenqueue_orphaned(pool.clone(), config.clone()).await?; - let last_seen = Utc::now(); + let last_seen = SqlDateTime::now(); register_worker( pool, config.queue().to_string(), diff --git a/src/queries/list_workers.rs b/src/queries/list_workers.rs index adff2a5..71b921c 100644 --- a/src/queries/list_workers.rs +++ b/src/queries/list_workers.rs @@ -1,5 +1,5 @@ use apalis_core::backend::{BackendExt, ListWorkers, RunningWorker}; -use chrono::{DateTime, Utc}; +use apalis_sql::{SqlDateTime, SqlDateTimeExt}; use futures::TryFutureExt; use ulid::Ulid; @@ -9,8 +9,8 @@ pub struct WorkerRow { pub worker_type: String, pub storage_name: String, pub layers: Option, - pub last_seen: DateTime, - pub started_at: Option>, + pub last_seen: SqlDateTime, + pub started_at: Option, } use crate::{CompactType, PgContext, PostgresStorage}; @@ -42,8 +42,8 @@ where .map(|w| RunningWorker { id: w.id, backend: w.storage_name, - started_at: w.started_at.map(|t| t.timestamp()).unwrap_or_default() as u64, - last_heartbeat: w.last_seen.timestamp() as u64, + started_at: w.started_at.map(|t| t.to_unix_timestamp()).unwrap_or_default() as u64, + last_heartbeat: w.last_seen.to_unix_timestamp() as u64, layers: w.layers.unwrap_or_default(), queue: w.worker_type, }) @@ -73,8 +73,8 @@ where .map(|w| RunningWorker { id: w.id, backend: w.storage_name, - started_at: w.started_at.map(|t| t.timestamp()).unwrap_or_default() as u64, - last_heartbeat: w.last_seen.timestamp() as u64, + started_at: w.started_at.map(|t| t.to_unix_timestamp()).unwrap_or_default() as u64, + last_heartbeat: w.last_seen.to_unix_timestamp() as u64, layers: w.layers.unwrap_or_default(), queue: w.worker_type, }) diff --git a/src/queries/register_worker.rs b/src/queries/register_worker.rs index 641c4f5..7ed0f40 100644 --- a/src/queries/register_worker.rs +++ b/src/queries/register_worker.rs @@ -1,12 +1,12 @@ use apalis_core::worker::context::WorkerContext; -use chrono::{DateTime, Utc}; +use apalis_sql::SqlDateTime; use sqlx::PgPool; pub async fn register( pool: PgPool, worker_type: String, worker: WorkerContext, - last_seen: DateTime, + last_seen: SqlDateTime, backend_type: &str, ) -> Result<(), sqlx::Error> { let res = sqlx::query_file!( diff --git a/src/queries/wait_for.rs b/src/queries/wait_for.rs index 4c9b707..4171007 100644 --- a/src/queries/wait_for.rs +++ b/src/queries/wait_for.rs @@ -24,7 +24,7 @@ where BackendExt, Result: DeserializeOwned, { - type ResultStream = BoxStream<'static, Result, Self::Error>>; + type ResultStream = BoxStream<'static, Result, Self::Error>>; fn wait_for( &self, task_ids: impl IntoIterator>, @@ -80,7 +80,7 @@ where fn check_status( &self, task_ids: impl IntoIterator> + Send, - ) -> impl Future>, Self::Error>> + Send { + ) -> impl Future>, Self::Error>> + Send { let pool = self.pool.clone(); let ids: Vec = task_ids.into_iter().map(|id| id.to_string()).collect(); diff --git a/src/shared.rs b/src/shared.rs index 46a08a5..87ff19e 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -19,7 +19,7 @@ use crate::{ use crate::{from_row::PgTaskRow, sink::PgSink}; use apalis_codec::json::JsonCodec; use apalis_core::{ - backend::{Backend, BackendExt, TaskStream, codec::Codec, queue::Queue, shared::MakeShared}, + backend::{Backend, BackendExt, TaskStream, codec::Codec, shared::MakeShared}, layers::Stack, worker::{context::WorkerContext, ext::ack::AcknowledgeLayer}, }; @@ -222,10 +222,6 @@ where type Codec = Decode; type CompactStream = TaskStream, Self::Error>; - fn get_queue(&self) -> Queue { - self.config.queue().clone() - } - fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { self.poll_shared(worker).boxed() } diff --git a/src/sink.rs b/src/sink.rs index 2c0565f..3bbba02 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -1,6 +1,5 @@ use apalis_codec::json::JsonCodec; -use apalis_sql::config::Config; -use chrono::{DateTime, Utc}; +use apalis_sql::{SqlDateTime, SqlDateTimeExt, config::Config}; use futures::{ FutureExt, Sink, TryFutureExt, future::{BoxFuture, Shared}, @@ -64,7 +63,7 @@ where .unwrap_or(Ulid::new().to_string()), ); job_data.push(task.args); - run_ats.push(DateTime::from_timestamp(task.parts.run_at as i64, 0).unwrap_or(Utc::now())); + run_ats.push(::from_unix_timestamp(task.parts.run_at as i64)); priorities.push(task.parts.ctx.priority()); max_attempts_vec.push(task.parts.ctx.max_attempts()); metadata.push(serde_json::Value::Object(task.parts.ctx.meta().clone())); From 3a26ef9c4655870df1f7988c67341ed4d874bcfb Mon Sep 17 00:00:00 2001 From: Sholom Ber <46351743+Himmelschmidt@users.noreply.github.com> Date: Tue, 30 Dec 2025 10:18:53 -0500 Subject: [PATCH 2/2] chore: update apalis deps to git and add get_queue impl --- Cargo.lock | 80 ++++++++++++++----------------------- Cargo.toml | 10 ++--- src/lib.rs | 12 +++++- src/queries/list_workers.rs | 10 ++++- src/queries/wait_for.rs | 4 +- src/shared.rs | 6 ++- src/sink.rs | 4 +- 7 files changed, 63 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9e5f88e..7b7733a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,10 +20,9 @@ dependencies = [ [[package]] name = "apalis" version = "1.0.0-rc.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f93be0eb33b912f5e66004d0b756423c285273259068b1c80a71d7842658189b" +source = "git+https://github.com/apalis-dev/apalis#82670cfd52c6054d69620e2fbe8c140d573fa7b5" dependencies = [ - "apalis-core 1.0.0-rc.1", + "apalis-core", "futures-util", "pin-project", "thiserror", @@ -34,36 +33,17 @@ dependencies = [ [[package]] name = "apalis-codec" version = "0.1.0-rc.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5ed6bb8e64c360ed4ad666a6cbc42e9e6df73087461dc4071f510a3af284637" +source = "git+https://github.com/apalis-dev/apalis#82670cfd52c6054d69620e2fbe8c140d573fa7b5" dependencies = [ - "apalis-core 1.0.0-rc.1", + "apalis-core", "serde", "serde_json", ] -[[package]] -name = "apalis-core" -version = "1.0.0-beta.2" -dependencies = [ - "futures-channel", - "futures-core", - "futures-sink", - "futures-timer", - "futures-util", - "pin-project", - "serde", - "serde_json", - "thiserror", - "tower-layer", - "tower-service", -] - [[package]] name = "apalis-core" version = "1.0.0-rc.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b1557d680ee4a9b42a76ab3a9572cee1a00d45e7eb455427d906c42774766e7" +source = "git+https://github.com/apalis-dev/apalis#82670cfd52c6054d69620e2fbe8c140d573fa7b5" dependencies = [ "futures-channel", "futures-core", @@ -84,7 +64,7 @@ version = "1.0.0-rc.1" dependencies = [ "apalis", "apalis-codec", - "apalis-core 1.0.0-beta.2", + "apalis-core", "apalis-sql", "apalis-workflow", "async-std", @@ -102,9 +82,10 @@ dependencies = [ [[package]] name = "apalis-sql" -version = "1.0.0-beta.2" +version = "1.0.0-rc.1" +source = "git+https://github.com/apalis-dev/apalis#82670cfd52c6054d69620e2fbe8c140d573fa7b5" dependencies = [ - "apalis-core 1.0.0-beta.2", + "apalis-core", "chrono", "serde", "serde_json", @@ -115,10 +96,9 @@ dependencies = [ [[package]] name = "apalis-workflow" version = "0.1.0-rc.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc024da2d5d3ab59cc9fea099a2e2b20de5ff608f2e287abcb73aa45e4966a89" +source = "git+https://github.com/apalis-dev/apalis#82670cfd52c6054d69620e2fbe8c140d573fa7b5" dependencies = [ - "apalis-core 1.0.0-rc.1", + "apalis-core", "futures", "petgraph", "serde", @@ -360,9 +340,9 @@ checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" [[package]] name = "cc" -version = "1.2.50" +version = "1.2.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f50d563227a1c37cc0a263f64eca3334388c01c5e4c4861a9def205c614383c" +checksum = "7a0aeaff4ff1a90589618835a598e545176939b97874f7abc7851caa0618f203" dependencies = [ "find-msvc-tools", "shlex", @@ -598,9 +578,9 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "find-msvc-tools" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844" +checksum = "645cbb3a84e60b7531617d5ae4e57f7e27308f6445f5abf653209ea76dec8dff" [[package]] name = "fixedbitset" @@ -1063,9 +1043,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.16" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ee5b5339afb4c41626dde77b7a611bd4f2c202b897852b4bcf5d03eddc61010" +checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" [[package]] name = "js-sys" @@ -1109,13 +1089,13 @@ checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" [[package]] name = "libredox" -version = "0.1.11" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df15f6eac291ed1cf25865b1ee60399f57e7c227e7f51bdbd4c5270396a9ed50" +checksum = "3d0b95e02c851351f877147b7deea7b1afb1df71b63aa5f8270716e0c5720616" dependencies = [ "bitflags 2.10.0", "libc", - "redox_syscall 0.6.0", + "redox_syscall 0.7.0", ] [[package]] @@ -1493,9 +1473,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.103" +version = "1.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" +checksum = "9695f8df41bb4f3d222c95a67532365f569318332d03d5f3f67f37b20e6ebdf0" dependencies = [ "unicode-ident", ] @@ -1585,9 +1565,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec96166dafa0886eb81fe1c0a388bece180fbef2135f97c1e2cf8302e74b43b5" +checksum = "49f3fe0889e69e2ae9e41f4d6c4c0181701d00e4697b356fb1f74173a5e0ee27" dependencies = [ "bitflags 2.10.0", ] @@ -1695,9 +1675,9 @@ checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" [[package]] name = "ryu" -version = "1.0.21" +version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62049b2877bf12821e8f9ad256ee38fdc31db7387ec2d3b3f403024de2034aea" +checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984" [[package]] name = "schannel" @@ -1769,9 +1749,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.147" +version = "1.0.148" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6af14725505314343e673e9ecb7cd7e8a36aa9791eb936235a3567cc31447ae4" +checksum = "3084b546a1dd6289475996f182a22aba973866ea8e8b02c51d9f46b1336a22da" dependencies = [ "itoa", "memchr", @@ -2976,6 +2956,6 @@ dependencies = [ [[package]] name = "zmij" -version = "0.1.8" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1dccf46b25b205e4bebe1d5258a991df1cc17801017a845cb5b3fe0269781aa" +checksum = "e9747e91771f56fd7893e1164abd78febd14a670ceec257caad15e051de35f06" diff --git a/Cargo.toml b/Cargo.toml index cd5830b..e40e367 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,11 +24,11 @@ chrono = ["apalis-sql/chrono", "sqlx/chrono"] time = ["apalis-sql/time", "sqlx/time"] [dependencies] -apalis-core = { path = "../apalis/apalis-core", version = "1.0.0-beta.2", default-features = false, features = [ +apalis-core = { git = "https://github.com/apalis-dev/apalis", version = "1.0.0-rc.1", default-features = false, features = [ "sleep", ] } -apalis-sql = { path = "../apalis/apalis-sql", version = "1.0.0-beta.2", default-features = false } -apalis-codec = { version = "0.1.0-rc.1", features = ["json"] } +apalis-sql = { git = "https://github.com/apalis-dev/apalis", version = "1.0.0-rc.1", default-features = false } +apalis-codec = { git = "https://github.com/apalis-dev/apalis", version = "0.1.0-rc.1", features = ["json"] } serde = { version = "1", features = ["derive"], default-features = false } pin-project = "1.1.10" serde_json = "1" @@ -50,6 +50,6 @@ features = ["postgres", "json"] [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } once_cell = "1.19.0" -apalis = { version = "1.0.0-rc.1" } -apalis-workflow = { version = "0.1.0-rc.1" } +apalis = { git = "https://github.com/apalis-dev/apalis", version = "1.0.0-rc.1" } +apalis-workflow = { git = "https://github.com/apalis-dev/apalis", version = "0.1.0-rc.1" } futures-util = "0.3.30" diff --git a/src/lib.rs b/src/lib.rs index 3b9ca05..452e665 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,7 @@ use std::{fmt::Debug, marker::PhantomData}; use apalis_codec::json::JsonCodec; use apalis_core::{ - backend::{Backend, BackendExt, TaskStream, codec::Codec}, + backend::{Backend, BackendExt, TaskStream, codec::Codec, queue::Queue}, features_table, layers::Stack, task::{Task, task_id::TaskId}, @@ -36,7 +36,7 @@ mod ack; mod fetcher; mod from_row; -pub type PgContext = apalis_sql::context::SqlContext; +pub type PgContext = apalis_sql::context::SqlContext; mod queries; pub mod shared; pub mod sink; @@ -251,6 +251,10 @@ where type Codec = Decode; type CompactStream = TaskStream, Self::Error>; + fn get_queue(&self) -> Queue { + Queue::from(self.config.queue().to_string()) + } + fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { self.poll_basic(worker).boxed() } @@ -345,6 +349,10 @@ where type Codec = Decode; type CompactStream = TaskStream, Self::Error>; + fn get_queue(&self) -> Queue { + Queue::from(self.config.queue().to_string()) + } + fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { self.poll_with_notify(worker).boxed() } diff --git a/src/queries/list_workers.rs b/src/queries/list_workers.rs index 71b921c..20f42fc 100644 --- a/src/queries/list_workers.rs +++ b/src/queries/list_workers.rs @@ -42,7 +42,10 @@ where .map(|w| RunningWorker { id: w.id, backend: w.storage_name, - started_at: w.started_at.map(|t| t.to_unix_timestamp()).unwrap_or_default() as u64, + started_at: w + .started_at + .map(|t| t.to_unix_timestamp()) + .unwrap_or_default() as u64, last_heartbeat: w.last_seen.to_unix_timestamp() as u64, layers: w.layers.unwrap_or_default(), queue: w.worker_type, @@ -73,7 +76,10 @@ where .map(|w| RunningWorker { id: w.id, backend: w.storage_name, - started_at: w.started_at.map(|t| t.to_unix_timestamp()).unwrap_or_default() as u64, + started_at: w + .started_at + .map(|t| t.to_unix_timestamp()) + .unwrap_or_default() as u64, last_heartbeat: w.last_seen.to_unix_timestamp() as u64, layers: w.layers.unwrap_or_default(), queue: w.worker_type, diff --git a/src/queries/wait_for.rs b/src/queries/wait_for.rs index 4171007..4c9b707 100644 --- a/src/queries/wait_for.rs +++ b/src/queries/wait_for.rs @@ -24,7 +24,7 @@ where BackendExt, Result: DeserializeOwned, { - type ResultStream = BoxStream<'static, Result, Self::Error>>; + type ResultStream = BoxStream<'static, Result, Self::Error>>; fn wait_for( &self, task_ids: impl IntoIterator>, @@ -80,7 +80,7 @@ where fn check_status( &self, task_ids: impl IntoIterator> + Send, - ) -> impl Future>, Self::Error>> + Send { + ) -> impl Future>, Self::Error>> + Send { let pool = self.pool.clone(); let ids: Vec = task_ids.into_iter().map(|id| id.to_string()).collect(); diff --git a/src/shared.rs b/src/shared.rs index 87ff19e..b21fe83 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -19,7 +19,7 @@ use crate::{ use crate::{from_row::PgTaskRow, sink::PgSink}; use apalis_codec::json::JsonCodec; use apalis_core::{ - backend::{Backend, BackendExt, TaskStream, codec::Codec, shared::MakeShared}, + backend::{Backend, BackendExt, TaskStream, codec::Codec, queue::Queue, shared::MakeShared}, layers::Stack, worker::{context::WorkerContext, ext::ack::AcknowledgeLayer}, }; @@ -222,6 +222,10 @@ where type Codec = Decode; type CompactStream = TaskStream, Self::Error>; + fn get_queue(&self) -> Queue { + Queue::from(self.config.queue().to_string()) + } + fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { self.poll_shared(worker).boxed() } diff --git a/src/sink.rs b/src/sink.rs index 3bbba02..6dcb89f 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -63,7 +63,9 @@ where .unwrap_or(Ulid::new().to_string()), ); job_data.push(task.args); - run_ats.push(::from_unix_timestamp(task.parts.run_at as i64)); + run_ats.push(::from_unix_timestamp( + task.parts.run_at as i64, + )); priorities.push(task.parts.ctx.priority()); max_attempts_vec.push(task.parts.ctx.max_attempts()); metadata.push(serde_json::Value::Object(task.parts.ctx.meta().clone()));