Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 81 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 16 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,24 @@ categories = ["asynchronous", "database", "network-programming"]
publish = true

[features]
default = ["migrate", "tokio-comp"]
default = ["migrate", "tokio-comp", "chrono"]
migrate = ["sqlx/migrate", "sqlx/macros"]
async-std-comp = ["async-std", "sqlx/runtime-async-std-rustls"]
async-std-comp-native-tls = ["async-std", "sqlx/runtime-async-std-native-tls"]
tokio-comp = ["tokio", "sqlx/runtime-tokio-rustls"]
tokio-comp-native-tls = ["tokio", "sqlx/runtime-tokio-native-tls"]
chrono = ["dep:chrono", "sqlx/chrono"]
time = ["dep:time", "sqlx/time"]

[dependencies]
apalis-codec = "0.1.0-rc.1"
apalis-core = { version = "1.0.0-rc.1", default-features = false, features = [
"sleep",
] }
apalis-sql = { version = "1.0.0-rc.1", default-features = false }
# For development: use local path to apalis with SqlTimestamp trait
# For release: switch to version or git reference once apalis-sql is updated
apalis-core = { path = "../apalis/apalis-core", default-features = false, features = ["sleep"] }
apalis-sql = { path = "../apalis/apalis-sql", default-features = false }
serde = { version = "1", features = ["derive"] }
chrono = { version = "0.4", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"], optional = true }
time = { version = "0.3", features = ["serde"], optional = true }
pin-project = "1.1.10"
serde_json = "1"
futures = "0.3.30"
Expand All @@ -42,7 +45,8 @@ log = "0.4"
[dependencies.sqlx]
version = "0.8.1"
default-features = false
features = ["chrono", "mysql", "json"]
# Note: chrono/time features are controlled by our feature flags
features = ["mysql", "json"]


[dev-dependencies]
Expand Down Expand Up @@ -76,3 +80,8 @@ needless_pass_by_ref_mut = "warn"
needless_pass_by_value = "warn"
option_option = "warn"
redundant_clone = "warn"

[package.metadata.cargo-udeps.ignore]
# When both chrono and time features are enabled, time takes precedence
# so chrono appears unused. This is expected behavior for --all-features.
normal = ["chrono"]
6 changes: 3 additions & 3 deletions src/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use pin_project::pin_project;
use sqlx::{MySql, MySqlPool, Pool};
use ulid::Ulid;

use crate::{CompactType, MySqlContext, MySqlTask, from_row::MySqlTaskRow};
use crate::{CompactType, MysqlDateTime, MySqlContext, MySqlTask, from_row::MySqlTaskRow, timestamp};

/// Fetch the next batch of tasks from the mysql backend
pub async fn fetch_next(
Expand All @@ -26,7 +26,7 @@ pub async fn fetch_next(
worker: WorkerContext,
) -> Result<Vec<Task<CompactType, MySqlContext, Ulid>>, sqlx::Error> {
let mut tx = pool.begin().await?;
let lock_at = chrono::Utc::now().naive_utc();
let lock_at = timestamp::now();
let job_type = config.queue().to_string();
let buffer_size = config.buffer_size() as i32;
let worker = worker.name().clone();
Expand Down Expand Up @@ -66,7 +66,7 @@ pub async fn fetch_next(
let res = rows
.into_iter()
.map(|r| {
let mut row: TaskRow = r.try_into()?;
let mut row: TaskRow<MysqlDateTime> = r.try_into()?;
row.lock_by = Some(worker.clone());
row.try_into_task_compact()
.map_err(|e| sqlx::Error::Protocol(e.to_string()))
Expand Down
19 changes: 10 additions & 9 deletions src/from_row.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chrono::NaiveDateTime;
use crate::timestamp::RawDateTime;
use crate::MysqlDateTime;
use serde_json::Value;

#[derive(Debug)]
Expand All @@ -9,19 +10,19 @@ pub(crate) struct MySqlTaskRow {
pub(crate) status: Option<String>,
pub(crate) attempts: Option<i32>,
pub(crate) max_attempts: Option<i32>,
pub(crate) run_at: Option<NaiveDateTime>,
pub(crate) run_at: Option<RawDateTime>,
pub(crate) last_result: Option<String>,
pub(crate) lock_at: Option<NaiveDateTime>,
pub(crate) lock_at: Option<RawDateTime>,
pub(crate) lock_by: Option<String>,
pub(crate) done_at: Option<NaiveDateTime>,
pub(crate) done_at: Option<RawDateTime>,
pub(crate) priority: Option<i32>,
pub(crate) metadata: Option<Value>,
}

impl TryInto<apalis_sql::from_row::TaskRow> for MySqlTaskRow {
impl TryInto<apalis_sql::from_row::TaskRow<MysqlDateTime>> for MySqlTaskRow {
type Error = sqlx::Error;

fn try_into(self) -> Result<apalis_sql::from_row::TaskRow, Self::Error> {
fn try_into(self) -> Result<apalis_sql::from_row::TaskRow<MysqlDateTime>, Self::Error> {
Ok(apalis_sql::from_row::TaskRow {
job: self.job,
id: self
Expand All @@ -38,13 +39,13 @@ impl TryInto<apalis_sql::from_row::TaskRow> for MySqlTaskRow {
.ok_or_else(|| sqlx::Error::Protocol("Missing attempts".into()))?
as usize,
max_attempts: self.max_attempts.map(|v| v as usize),
run_at: self.run_at.map(|ts| ts.and_utc()),
run_at: self.run_at.map(MysqlDateTime::from),
last_result: self
.last_result
.map(|res| serde_json::from_str(&res).unwrap_or(serde_json::Value::Null)),
lock_at: self.lock_at.map(|ts| ts.and_utc()),
lock_at: self.lock_at.map(MysqlDateTime::from),
lock_by: self.lock_by,
done_at: self.done_at.map(|ts| ts.and_utc()),
done_at: self.done_at.map(MysqlDateTime::from),
priority: self.priority.map(|v| v as usize),
metadata: self.metadata,
})
Expand Down
16 changes: 6 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
use apalis_codec::json::JsonCodec;
use apalis_core::{
backend::{Backend, BackendExt, TaskStream, codec::Codec, queue::Queue},
backend::{Backend, BackendExt, TaskStream, codec::Codec},
features_table,
layers::Stack,
task::Task,
Expand Down Expand Up @@ -41,16 +41,17 @@ pub mod queries;
mod shared;
/// Sink module for pushing tasks to mysql backend
pub mod sink;
mod timestamp;

pub use timestamp::MysqlDateTime;

/// Type alias for a task stored in mysql backend
pub type MySqlTask<Args> = Task<Args, MySqlContext, Ulid>;
pub use apalis_sql::config::Config;
pub use shared::{SharedMySqlError, SharedMySqlStorage};

pub type MySqlTaskId = apalis_core::task::task_id::TaskId<Ulid>;
pub type MySqlContext = SqlContext<MySqlPool>;

pub use apalis_sql::ext::TaskBuilderExt;
pub type MySqlContext = SqlContext;

/// CompactType is the type used for compact serialization in mysql backend
pub type CompactType = Vec<u8>;
Expand Down Expand Up @@ -275,10 +276,6 @@ where
type Compact = CompactType;
type CompactStream = TaskStream<MySqlTask<Self::Compact>, sqlx::Error>;

fn get_queue(&self) -> Queue {
self.config.queue().to_owned()
}

fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream {
self.poll_default(worker).boxed()
}
Expand Down Expand Up @@ -311,7 +308,6 @@ mod tests {

use apalis::prelude::*;
use apalis_workflow::*;
use chrono::Local;
use serde::{Deserialize, Serialize};
use sqlx::MySqlPool;

Expand All @@ -336,7 +332,7 @@ mod tests {
.take(ITEMS);
backend.push_stream(&mut items).await.unwrap();

println!("Starting worker at {}", Local::now());
println!("Starting worker at {}", timestamp::now());

async fn send_reminder(item: usize, wrk: WorkerContext) -> Result<(), BoxDynError> {
if ITEMS == item {
Expand Down
4 changes: 2 additions & 2 deletions src/queries/fetch_by_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use apalis_core::{
use apalis_sql::from_row::{FromRowError, TaskRow};
use ulid::Ulid;

use crate::{CompactType, MySqlContext, MySqlStorage, MySqlTask, from_row::MySqlTaskRow};
use crate::{CompactType, MysqlDateTime, MySqlContext, MySqlStorage, MySqlTask, from_row::MySqlTaskRow};

impl<Args, D, F> FetchById<Args> for MySqlStorage<Args, D, F>
where
Expand All @@ -30,7 +30,7 @@ where
.fetch_optional(&pool)
.await?
.map(|r| {
let row: TaskRow = r
let row: TaskRow<MysqlDateTime> = r
.try_into()
.map_err(|e: sqlx::Error| FromRowError::DecodeError(e.into()))?;
row.try_into_task_compact().and_then(|t| {
Expand Down
Loading