diff --git a/Cargo.lock b/Cargo.lock index 064dc41..efe2fad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,7 +23,7 @@ version = "1.0.0-rc.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f93be0eb33b912f5e66004d0b756423c285273259068b1c80a71d7842658189b" dependencies = [ - "apalis-core", + "apalis-core 1.0.0-rc.1", "futures-util", "pin-project", "thiserror", @@ -37,11 +37,28 @@ version = "0.1.0-rc.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a5ed6bb8e64c360ed4ad666a6cbc42e9e6df73087461dc4071f510a3af284637" dependencies = [ - "apalis-core", + "apalis-core 1.0.0-rc.1", "serde", "serde_json", ] +[[package]] +name = "apalis-core" +version = "1.0.0-beta.2" +dependencies = [ + "futures-channel", + "futures-core", + "futures-sink", + "futures-timer", + "futures-util", + "pin-project", + "serde", + "serde_json", + "thiserror", + "tower-layer", + "tower-service", +] + [[package]] name = "apalis-core" version = "1.0.0-rc.1" @@ -67,7 +84,7 @@ version = "1.0.0-rc.1" dependencies = [ "apalis", "apalis-codec", - "apalis-core", + "apalis-core 1.0.0-beta.2", "apalis-sql", "apalis-workflow", "async-std", @@ -80,18 +97,16 @@ dependencies = [ "serde_json", "sqlx", "thiserror", + "time", "tokio", "ulid", ] [[package]] name = "apalis-sql" -version = "1.0.0-rc.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ade5d8faa60e9975b01d3bb1ebc5028589aa4986365eaa4d080d30ed3b5141f" +version = "1.0.0-beta.2" dependencies = [ - "apalis-core", - "chrono", + "apalis-core 1.0.0-beta.2", "serde", "serde_json", "thiserror", @@ -103,7 +118,7 @@ version = "0.1.0-rc.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc024da2d5d3ab59cc9fea099a2e2b20de5ff608f2e287abcb73aa45e4966a89" dependencies = [ - "apalis-core", + "apalis-core 1.0.0-rc.1", "futures", "petgraph", "serde", @@ -366,10 +381,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" dependencies = [ "iana-time-zone", - "js-sys", "num-traits", "serde", - "wasm-bindgen", "windows-link", ] @@ -464,6 +477,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deranged" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587" +dependencies = [ + "powerfmt", + "serde_core", +] + [[package]] name = "digest" version = "0.10.7" @@ -1199,6 +1222,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.46" @@ -1445,6 +1474,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -1893,6 +1928,7 @@ dependencies = [ "sha2", "smallvec", "thiserror", + "time", "tokio", "tokio-stream", "tracing", @@ -1978,6 +2014,7 @@ dependencies = [ "sqlx-core", "stringprep", "thiserror", + "time", "tracing", "whoami", ] @@ -2016,6 +2053,7 @@ dependencies = [ "sqlx-core", "stringprep", "thiserror", + "time", "tracing", "whoami", ] @@ -2041,6 +2079,7 @@ dependencies = [ "serde_urlencoded", "sqlx-core", "thiserror", + "time", "tracing", "url", ] @@ -2129,6 +2168,37 @@ dependencies = [ "syn", ] +[[package]] +name = "time" +version = "0.3.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" + +[[package]] +name = "time-macros" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinystr" version = "0.8.2" diff --git a/Cargo.toml b/Cargo.toml index 07a652e..bc6a660 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,21 +14,24 @@ categories = ["asynchronous", "database", "network-programming"] publish = true [features] -default = ["migrate", "tokio-comp"] +default = ["migrate", "tokio-comp", "chrono"] migrate = ["sqlx/migrate", "sqlx/macros"] async-std-comp = ["async-std", "sqlx/runtime-async-std-rustls"] async-std-comp-native-tls = ["async-std", "sqlx/runtime-async-std-native-tls"] tokio-comp = ["tokio", "sqlx/runtime-tokio-rustls"] tokio-comp-native-tls = ["tokio", "sqlx/runtime-tokio-native-tls"] +chrono = ["dep:chrono", "sqlx/chrono"] +time = ["dep:time", "sqlx/time"] [dependencies] -apalis-core = { version = "1.0.0-rc.1", default-features = false, features = [ +apalis-core = { path = "../apalis/apalis-core", default-features = false, features = [ "sleep", ] } -apalis-sql = { version = "1.0.0-rc.1", default-features = false } +apalis-sql = { path = "../apalis/apalis-sql", default-features = false } apalis-codec = { version = "0.1.0-rc.1", features = ["json"] } serde = { version = "1", features = ["derive"], default-features = false } -chrono = { version = "0.4", features = ["serde"], default-features = false } +chrono = { version = "0.4", features = ["serde"], default-features = false, optional = true } +time = { version = "0.3", features = ["serde"], optional = true } pin-project = "1.1.10" serde_json = "1" futures = "0.3.30" @@ -44,7 +47,12 @@ ulid = { version = "1", features = ["serde"] } [dependencies.sqlx] version = "0.8.1" default-features = false -features = ["chrono", "postgres", "json"] +features = ["postgres", "json"] + +[package.metadata.cargo-udeps.ignore] +# When both chrono and time features are enabled, time takes precedence +# so chrono appears unused. This is expected behavior for --all-features. +normal = ["chrono"] [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } @@ -52,3 +60,4 @@ once_cell = "1.19.0" apalis = { version = "1.0.0-rc.1" } apalis-workflow = { version = "0.1.0-rc.1" } futures-util = "0.3.30" + diff --git a/src/fetcher.rs b/src/fetcher.rs index d43fcf9..971ff62 100644 --- a/src/fetcher.rs +++ b/src/fetcher.rs @@ -8,6 +8,8 @@ use std::{ use apalis_core::{task::Task, timer::Delay, worker::context::WorkerContext}; use apalis_sql::from_row::TaskRow; + +use crate::timestamp::PgDateTime; use futures::{Future, FutureExt, future::BoxFuture, stream::Stream}; use pin_project::pin_project; @@ -35,7 +37,7 @@ async fn fetch_next( .await? .into_iter() .map(|r| { - let row: TaskRow = r.try_into()?; + let row: TaskRow = r.try_into()?; row.try_into_task_compact() .map_err(|e| sqlx::Error::Protocol(e.to_string())) }) diff --git a/src/from_row.rs b/src/from_row.rs index d4a1dc3..a8139d9 100644 --- a/src/from_row.rs +++ b/src/from_row.rs @@ -1,4 +1,5 @@ -use chrono::{DateTime, Utc}; +use crate::{PgDateTime, RawDateTime}; + #[derive(Debug)] pub struct PgTaskRow { pub job: Option>, @@ -7,18 +8,19 @@ pub struct PgTaskRow { pub status: Option, pub attempts: Option, pub max_attempts: Option, - pub run_at: Option>, + pub run_at: Option, pub last_result: Option, - pub lock_at: Option>, + pub lock_at: Option, pub lock_by: Option, - pub done_at: Option>, + pub done_at: Option, pub priority: Option, pub metadata: Option, } -impl TryInto for PgTaskRow { + +impl TryInto> for PgTaskRow { type Error = sqlx::Error; - fn try_into(self) -> Result { + fn try_into(self) -> Result, Self::Error> { Ok(apalis_sql::from_row::TaskRow { job: self.job.unwrap_or_default(), id: self @@ -35,11 +37,11 @@ impl TryInto for PgTaskRow { .ok_or_else(|| sqlx::Error::Protocol("Missing attempts".into()))? as usize, max_attempts: self.max_attempts.map(|v| v as usize), - run_at: self.run_at, + run_at: self.run_at.map(PgDateTime::from), last_result: self.last_result, - lock_at: self.lock_at, + lock_at: self.lock_at.map(PgDateTime::from), lock_by: self.lock_by, - done_at: self.done_at, + done_at: self.done_at.map(PgDateTime::from), priority: self.priority.map(|v| v as usize), metadata: self.metadata, }) diff --git a/src/lib.rs b/src/lib.rs index 34f49af..c65f71a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,7 @@ use std::{fmt::Debug, marker::PhantomData}; use apalis_codec::json::JsonCodec; use apalis_core::{ - backend::{Backend, BackendExt, TaskStream, codec::Codec, queue::Queue}, + backend::{Backend, BackendExt, TaskStream, codec::Codec}, features_table, layers::Stack, task::{Task, task_id::TaskId}, @@ -35,8 +35,12 @@ pub use crate::{ mod ack; mod fetcher; mod from_row; +mod timestamp; -pub type PgContext = apalis_sql::context::SqlContext; +pub use timestamp::PgDateTime; +pub(crate) use timestamp::{RawDateTime, now_raw, timestamp_from_unix}; + +pub type PgContext = apalis_sql::context::SqlContext; mod queries; pub mod shared; pub mod sink; @@ -251,9 +255,6 @@ where type Codec = Decode; type CompactStream = TaskStream, Self::Error>; - fn get_queue(&self) -> Queue { - self.config.queue().clone() - } fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { self.poll_basic(worker).boxed() } @@ -348,10 +349,6 @@ where type Codec = Decode; type CompactStream = TaskStream, Self::Error>; - fn get_queue(&self) -> Queue { - self.config.queue().clone() - } - fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { self.poll_with_notify(worker).boxed() } @@ -413,7 +410,7 @@ impl PostgresStorage { ) .fetch(&mut *tx) .map(|r| { - let row: TaskRow = r?.try_into()?; + let row: TaskRow = r?.try_into()?; Ok(Some( row.try_into_task_compact() .map_err(|e| sqlx::Error::Protocol(e.to_string()))?, diff --git a/src/queries/fetch_by_id.rs b/src/queries/fetch_by_id.rs index d9e5afe..dc74300 100644 --- a/src/queries/fetch_by_id.rs +++ b/src/queries/fetch_by_id.rs @@ -24,7 +24,7 @@ where .fetch_optional(&pool) .await? .map(|r: PgTaskRow| { - let row: TaskRow = r.try_into()?; + let row: TaskRow = r.try_into()?; row.try_into_task_compact() .and_then(|a| { a.try_map(|t| D::decode(&t)) diff --git a/src/queries/keep_alive.rs b/src/queries/keep_alive.rs index c0a5a4a..cfd25e5 100644 --- a/src/queries/keep_alive.rs +++ b/src/queries/keep_alive.rs @@ -1,10 +1,9 @@ use apalis_core::worker::context::WorkerContext; -use chrono::Utc; use futures::{FutureExt, Stream, stream}; use sqlx::PgPool; use crate::{ - Config, + Config, now_raw, queries::{ reenqueue_orphaned::reenqueue_orphaned, register_worker::register as register_worker, }, @@ -36,7 +35,7 @@ pub async fn initial_heartbeat( storage_type: &str, ) -> Result<(), sqlx::Error> { reenqueue_orphaned(pool.clone(), config.clone()).await?; - let last_seen = Utc::now(); + let last_seen = now_raw(); register_worker( pool, config.queue().to_string(), diff --git a/src/queries/list_tasks.rs b/src/queries/list_tasks.rs index 47594ec..59fe034 100644 --- a/src/queries/list_tasks.rs +++ b/src/queries/list_tasks.rs @@ -42,7 +42,7 @@ where .await? .into_iter() .map(|r| { - let row: TaskRow = r.try_into()?; + let row: TaskRow = r.try_into()?; row.try_into_task_compact() .and_then(|a| { a.try_map(|t| D::decode(&t)) @@ -87,7 +87,7 @@ where .await? .into_iter() .map(|r| { - let row: TaskRow = r.try_into()?; + let row: TaskRow = r.try_into()?; row.try_into_task_compact() .map_err(|e| sqlx::Error::Protocol(e.to_string())) }) diff --git a/src/queries/list_workers.rs b/src/queries/list_workers.rs index adff2a5..d266927 100644 --- a/src/queries/list_workers.rs +++ b/src/queries/list_workers.rs @@ -1,16 +1,18 @@ use apalis_core::backend::{BackendExt, ListWorkers, RunningWorker}; -use chrono::{DateTime, Utc}; +use apalis_sql::SqlTimestamp; use futures::TryFutureExt; use ulid::Ulid; +use crate::{PgDateTime, RawDateTime}; + #[derive(Debug)] pub struct WorkerRow { pub id: String, pub worker_type: String, pub storage_name: String, pub layers: Option, - pub last_seen: DateTime, - pub started_at: Option>, + pub last_seen: RawDateTime, + pub started_at: Option, } use crate::{CompactType, PgContext, PostgresStorage}; @@ -42,8 +44,8 @@ where .map(|w| RunningWorker { id: w.id, backend: w.storage_name, - started_at: w.started_at.map(|t| t.timestamp()).unwrap_or_default() as u64, - last_heartbeat: w.last_seen.timestamp() as u64, + started_at: w.started_at.map(|t| PgDateTime::from(t).to_unix_timestamp()).unwrap_or_default() as u64, + last_heartbeat: PgDateTime::from(w.last_seen).to_unix_timestamp() as u64, layers: w.layers.unwrap_or_default(), queue: w.worker_type, }) @@ -73,8 +75,8 @@ where .map(|w| RunningWorker { id: w.id, backend: w.storage_name, - started_at: w.started_at.map(|t| t.timestamp()).unwrap_or_default() as u64, - last_heartbeat: w.last_seen.timestamp() as u64, + started_at: w.started_at.map(|t| PgDateTime::from(t).to_unix_timestamp()).unwrap_or_default() as u64, + last_heartbeat: PgDateTime::from(w.last_seen).to_unix_timestamp() as u64, layers: w.layers.unwrap_or_default(), queue: w.worker_type, }) diff --git a/src/queries/register_worker.rs b/src/queries/register_worker.rs index 641c4f5..d4836df 100644 --- a/src/queries/register_worker.rs +++ b/src/queries/register_worker.rs @@ -1,12 +1,13 @@ use apalis_core::worker::context::WorkerContext; -use chrono::{DateTime, Utc}; use sqlx::PgPool; +use crate::RawDateTime; + pub async fn register( pool: PgPool, worker_type: String, worker: WorkerContext, - last_seen: DateTime, + last_seen: RawDateTime, backend_type: &str, ) -> Result<(), sqlx::Error> { let res = sqlx::query_file!( diff --git a/src/queries/wait_for.rs b/src/queries/wait_for.rs index 4c9b707..4171007 100644 --- a/src/queries/wait_for.rs +++ b/src/queries/wait_for.rs @@ -24,7 +24,7 @@ where BackendExt, Result: DeserializeOwned, { - type ResultStream = BoxStream<'static, Result, Self::Error>>; + type ResultStream = BoxStream<'static, Result, Self::Error>>; fn wait_for( &self, task_ids: impl IntoIterator>, @@ -80,7 +80,7 @@ where fn check_status( &self, task_ids: impl IntoIterator> + Send, - ) -> impl Future>, Self::Error>> + Send { + ) -> impl Future>, Self::Error>> + Send { let pool = self.pool.clone(); let ids: Vec = task_ids.into_iter().map(|id| id.to_string()).collect(); diff --git a/src/shared.rs b/src/shared.rs index 46a08a5..95c7700 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -19,7 +19,7 @@ use crate::{ use crate::{from_row::PgTaskRow, sink::PgSink}; use apalis_codec::json::JsonCodec; use apalis_core::{ - backend::{Backend, BackendExt, TaskStream, codec::Codec, queue::Queue, shared::MakeShared}, + backend::{Backend, BackendExt, TaskStream, codec::Codec, shared::MakeShared}, layers::Stack, worker::{context::WorkerContext, ext::ack::AcknowledgeLayer}, }; @@ -222,10 +222,6 @@ where type Codec = Decode; type CompactStream = TaskStream, Self::Error>; - fn get_queue(&self) -> Queue { - self.config.queue().clone() - } - fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { self.poll_shared(worker).boxed() } @@ -263,7 +259,7 @@ impl PostgresStorage { ) .fetch(&mut *tx) .map(|r| { - let row: TaskRow = r?.try_into()?; + let row: TaskRow = r?.try_into()?; Ok(Some( row.try_into_task_compact() .map_err(|e| sqlx::Error::Protocol(e.to_string()))?, diff --git a/src/sink.rs b/src/sink.rs index 2c0565f..04367de 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -1,6 +1,5 @@ use apalis_codec::json::JsonCodec; use apalis_sql::config::Config; -use chrono::{DateTime, Utc}; use futures::{ FutureExt, Sink, TryFutureExt, future::{BoxFuture, Shared}, @@ -13,7 +12,7 @@ use std::{ }; use ulid::Ulid; -use crate::{CompactType, PgTask, PostgresStorage}; +use crate::{CompactType, PgTask, PostgresStorage, timestamp_from_unix}; type FlushFuture = BoxFuture<'static, Result<(), Arc>>; @@ -64,7 +63,7 @@ where .unwrap_or(Ulid::new().to_string()), ); job_data.push(task.args); - run_ats.push(DateTime::from_timestamp(task.parts.run_at as i64, 0).unwrap_or(Utc::now())); + run_ats.push(timestamp_from_unix(task.parts.run_at as i64)); priorities.push(task.parts.ctx.priority()); max_attempts_vec.push(task.parts.ctx.max_attempts()); metadata.push(serde_json::Value::Object(task.parts.ctx.meta().clone())); diff --git a/src/timestamp.rs b/src/timestamp.rs new file mode 100644 index 0000000..5a75e52 --- /dev/null +++ b/src/timestamp.rs @@ -0,0 +1,202 @@ +use apalis_sql::SqlTimestamp; +use sqlx::encode::IsNull; +use sqlx::error::BoxDynError; +use sqlx::{Database, Postgres}; +use std::fmt; + +// ============================================================================ +// Chrono implementation +// ============================================================================ + +#[cfg(all(feature = "chrono", not(feature = "time")))] +pub use chrono_impl::*; + +#[cfg(all(feature = "chrono", not(feature = "time")))] +mod chrono_impl { + use super::*; + use chrono::{DateTime, Utc}; + + /// The raw datetime type used by sqlx (for use in query structs) + pub type RawDateTime = DateTime; + + /// Newtype wrapper around `chrono::DateTime` for use with apalis-sql. + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] + pub struct PgDateTime(pub DateTime); + + impl PgDateTime { + /// Get the current time + pub fn now() -> Self { + Self(Utc::now()) + } + + /// Create from a unix timestamp (seconds since epoch) + pub fn from_timestamp(secs: i64, nanos: u32) -> Option { + DateTime::from_timestamp(secs, nanos).map(Self) + } + } + + /// Convert unix timestamp to raw datetime for sqlx queries + pub fn timestamp_from_unix(secs: i64) -> RawDateTime { + DateTime::from_timestamp(secs, 0).unwrap_or_else(Utc::now) + } + + /// Get current time as raw datetime for sqlx queries + pub fn now_raw() -> RawDateTime { + Utc::now() + } + + impl SqlTimestamp for PgDateTime { + fn to_unix_timestamp(&self) -> i64 { + self.0.timestamp() + } + } + + impl From> for PgDateTime { + fn from(dt: DateTime) -> Self { + Self(dt) + } + } + + impl std::ops::Deref for PgDateTime { + type Target = DateTime; + fn deref(&self) -> &Self::Target { + &self.0 + } + } + + impl fmt::Display for PgDateTime { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } + } + + // sqlx trait implementations - delegate to inner type + impl sqlx::Type for PgDateTime { + fn type_info() -> ::TypeInfo { + as sqlx::Type>::type_info() + } + + fn compatible(ty: &::TypeInfo) -> bool { + as sqlx::Type>::compatible(ty) + } + } + + impl<'r> sqlx::Decode<'r, Postgres> for PgDateTime { + fn decode( + value: sqlx::postgres::PgValueRef<'r>, + ) -> Result { + let dt = as sqlx::Decode<'r, Postgres>>::decode(value)?; + Ok(Self(dt)) + } + } + + impl sqlx::Encode<'_, Postgres> for PgDateTime { + fn encode_by_ref( + &self, + buf: &mut sqlx::postgres::PgArgumentBuffer, + ) -> Result { + as sqlx::Encode<'_, Postgres>>::encode_by_ref(&self.0, buf) + } + } +} + +// ============================================================================ +// Time implementation +// ============================================================================ + +#[cfg(feature = "time")] +pub use time_impl::*; + +#[cfg(feature = "time")] +mod time_impl { + use super::*; + use time::OffsetDateTime; + + /// The raw datetime type used by sqlx (for use in query structs) + pub type RawDateTime = OffsetDateTime; + + /// Newtype wrapper around `time::OffsetDateTime` for use with apalis-sql. + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] + pub struct PgDateTime(pub OffsetDateTime); + + impl PgDateTime { + /// Get the current time + pub fn now() -> Self { + Self(OffsetDateTime::now_utc()) + } + + /// Create from a unix timestamp (seconds since epoch) + pub fn from_timestamp(secs: i64, nanos: u32) -> Option { + OffsetDateTime::from_unix_timestamp_nanos(secs as i128 * 1_000_000_000 + nanos as i128) + .ok() + .map(Self) + } + } + + /// Convert unix timestamp to raw datetime for sqlx queries + pub fn timestamp_from_unix(secs: i64) -> RawDateTime { + OffsetDateTime::from_unix_timestamp(secs).unwrap_or_else(|_| OffsetDateTime::now_utc()) + } + + /// Get current time as raw datetime for sqlx queries + pub fn now_raw() -> RawDateTime { + OffsetDateTime::now_utc() + } + + impl SqlTimestamp for PgDateTime { + fn to_unix_timestamp(&self) -> i64 { + self.0.unix_timestamp() + } + } + + impl From for PgDateTime { + fn from(dt: OffsetDateTime) -> Self { + Self(dt) + } + } + + impl std::ops::Deref for PgDateTime { + type Target = OffsetDateTime; + fn deref(&self) -> &Self::Target { + &self.0 + } + } + + impl fmt::Display for PgDateTime { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } + } + + // sqlx trait implementations - delegate to inner type + impl sqlx::Type for PgDateTime { + fn type_info() -> ::TypeInfo { + >::type_info() + } + + fn compatible(ty: &::TypeInfo) -> bool { + >::compatible(ty) + } + } + + impl<'r> sqlx::Decode<'r, Postgres> for PgDateTime { + fn decode( + value: sqlx::postgres::PgValueRef<'r>, + ) -> Result { + let dt = >::decode(value)?; + Ok(Self(dt)) + } + } + + impl sqlx::Encode<'_, Postgres> for PgDateTime { + fn encode_by_ref( + &self, + buf: &mut sqlx::postgres::PgArgumentBuffer, + ) -> Result { + >::encode_by_ref(&self.0, buf) + } + } +} + +#[cfg(not(any(feature = "chrono", feature = "time")))] +compile_error!("Either 'chrono' or 'time' feature must be enabled");