Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Documentation

on:
push:
branches: [ main ]
branches: [main]
pull_request:
branches: [ main ]
branches: [main]

env:
CARGO_TERM_COLOR: always
Expand All @@ -31,6 +31,6 @@ jobs:

- name: Build documentation
run: |
cargo doc --all-features --no-deps --document-private-items
cargo doc --features=migrate,tokio-comp,json --no-deps --document-private-items
env:
RUSTDOCFLAGS: "--cfg docsrs -Dwarnings"

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]


### Added

- Workflow support

### Changed

- Moved from monorepo
12 changes: 8 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "apalis-sqlite"
version = "1.0.0-alpha.1"
version = "1.0.0-alpha.2"
authors = ["Njuguna Mureithi <[email protected]>"]
readme = "README.md"
edition = "2024"
Expand Down Expand Up @@ -29,7 +29,8 @@ serde_json = { version = "1" }
apalis-core = { version = "1.0.0-alpha.4", default-features = false, features = [
"sleep",
"json",
] }
], git = "https://github.com/geofmureithi/apalis.git", branch = "chore/traits-expansion" }
apalis-workflow = { version = "0.1.0-alpha.3", git = "https://github.com/geofmureithi/apalis.git", branch = "chore/traits-expansion" }
log = "0.4.21"
futures = "0.3.30"
tokio = { version = "1", features = ["rt", "net"], optional = true }
Expand All @@ -45,10 +46,13 @@ bytes = "1.1.0"

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
apalis-core = { version = "1.0.0-alpha.4", features = ["test-utils"] }
apalis-core = { version = "1.0.0-alpha.4", features = [
"test-utils",
], git = "https://github.com/geofmureithi/apalis.git", branch = "chore/traits-expansion" }
apalis-workflow = { version = "0.1.0-alpha.2", git = "https://github.com/geofmureithi/apalis.git", branch = "chore/traits-expansion" }
apalis-sqlite = { path = ".", features = ["migrate", "tokio-comp"] }

[package.metadata.docs.rs]
# defines the configuration attribute `docsrs`
rustdoc-args = ["--cfg", "docsrs"]
all-features = true
features = ["migrate", "tokio-comp", "json"]
48 changes: 46 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn main() {
Ok(())
}

let worker = apalis_core::worker::builder::WorkerBuilder::new("worker-1")
let worker = WorkerBuilder::new("worker-1")
.backend(backend)
.build(send_reminder);
worker.run().await.unwrap();
Expand Down Expand Up @@ -93,13 +93,57 @@ async fn main() {
Ok(())
}

let worker = apalis_core::worker::builder::WorkerBuilder::new("worker-2")
let worker = WorkerBuilder::new("worker-2")
.backend(backend)
.build(send_reminder);
worker.run().await.unwrap();
}
```

### Workflow Example

```rust,no_run
#[tokio::main]
async fn main() {
let workflow = WorkFlow::new("odd-numbers-workflow")
.then(|a: usize| async move {
Ok::<_, WorkflowError>((0..=a).collect::<Vec<_>>())
})
.filter_map(|x| async move {
if x % 2 != 0 { Some(x) } else { None }
})
.filter_map(|x| async move {
if x % 3 != 0 { Some(x) } else { None }
})
.filter_map(|x| async move {
if x % 5 != 0 { Some(x) } else { None }
})
.delay_for(Duration::from_millis(1000))
.then(|a: Vec<usize>| async move {
println!("Sum: {}", a.iter().sum::<usize>());
Ok::<(), WorkflowError>(())
});

let pool = SqlitePool::connect(":memory:").await.unwrap();
SqliteStorage::setup(&pool).await.unwrap();
let mut sqlite = SqliteStorage::new_in_queue(&pool, "test-workflow");

sqlite.push(100usize).await.unwrap();

let worker = WorkerBuilder::new("rango-tango")
.backend(sqlite)
.on_event(|ctx, ev| {
println!("On Event = {:?}", ev);
if matches!(ev, Event::Error(_)) {
ctx.stop().unwrap();
}
})
.build(workflow);

worker.run().await.unwrap();
}
```

## Migrations

If the `migrate` feature is enabled, you can run built-in migrations with:
Expand Down
4 changes: 4 additions & 0 deletions migrations/json/20251018162501_metadata.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE
Jobs
ADD
COLUMN metadata TEXT;
21 changes: 21 additions & 0 deletions queries/backend/fetch_completed_tasks.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
SELECT
id,
status,
last_result as result
FROM
Jobs
WHERE
id IN (
SELECT
value
FROM
json_each(?)
)
AND (
status = 'Done'
OR (
status = 'Failed'
AND attempts >= max_attempts
)
OR status = 'Killed'
)
3 changes: 2 additions & 1 deletion queries/task/sink.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ VALUES
NULL,
NULL,
NULL,
?6
?6,
?7
)
25 changes: 21 additions & 4 deletions src/ack.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
use std::any::Any;

use apalis_core::{
error::{AbortError, BoxDynError},
task::{Parts, status::Status},
worker::{context::WorkerContext, ext::ack::Acknowledge},
};
use apalis_workflow::StepResult;
use futures::{FutureExt, future::BoxFuture};
use serde::Serialize;
use serde_json::Value;
use sqlx::SqlitePool;
use tower_layer::Layer;
use tower_service::Service;
use ulid::Ulid;

use crate::{SqliteTask, context::SqliteContext};
use crate::{CompactType, SqliteTask, context::SqliteContext};

#[derive(Clone)]
pub struct SqliteAck {
Expand All @@ -22,7 +26,7 @@ impl SqliteAck {
}
}

impl<Res: Serialize> Acknowledge<Res, SqliteContext, Ulid> for SqliteAck {
impl<Res: Serialize + 'static> Acknowledge<Res, SqliteContext, Ulid> for SqliteAck {
type Error = sqlx::Error;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
fn ack(
Expand All @@ -33,7 +37,20 @@ impl<Res: Serialize> Acknowledge<Res, SqliteContext, Ulid> for SqliteAck {
let task_id = parts.task_id;
let worker_id = parts.ctx.lock_by().clone();

let response = serde_json::to_string(&res.as_ref().map_err(|e| e.to_string()));
// Workflows need special handling to serialize the response correctly
let response = match res {
Ok(r) => {
if let Some(res_ref) = (r as &dyn Any).downcast_ref::<StepResult<CompactType>>() {
let res_deserialized: Result<Value, serde_json::Error> =
serde_json::from_str(&res_ref.0);
serde_json::to_string(&res_deserialized.map_err(|e| e.to_string()))
} else {
serde_json::to_string(&res.as_ref().map_err(|e| e.to_string()))
}
}
_ => serde_json::to_string(&res.as_ref().map_err(|e| e.to_string())),
};

let status = calculate_status(parts, res);
parts.status.store(status.clone());
let attempt = parts.attempt.current() as i32;
Expand Down Expand Up @@ -151,7 +168,7 @@ where
.unwrap();
let parts = &req.parts;
let task_id = match &parts.task_id {
Some(id) => id.inner().clone(),
Some(id) => *id.inner(),
None => {
return async {
Err(sqlx::Error::ColumnNotFound("TASK_ID_FOR_LOCK".to_owned()).into())
Expand Down
Loading
Loading