diff --git a/.github/workflows/cd.yaml b/.github/workflows/cd.yaml index f9b3747c..28221eb1 100644 --- a/.github/workflows/cd.yaml +++ b/.github/workflows/cd.yaml @@ -41,7 +41,7 @@ jobs: run: | ERROR='' echo VERSION: ${VERSION}, VERSION_NUMBER: ${VERSION_NUMBER} - for dir in "." packages/apalis-{core,cron,redis,sql}; do + for dir in "." packages/apalis-{core,sql}; do PACKAGE=$(cargo get package.name --entry $dir) ACTUAL=$(cargo get package.version --entry $dir) if [[ $VERSION != $ACTUAL ]]; then @@ -66,7 +66,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: publish - args: ${{ env.PUBLISH_OPTS }} -p apalis-sql --features=tokio-comp + args: ${{ env.PUBLISH_OPTS }} -p apalis-sql - name: publish apalis uses: actions-rs/cargo@v1 with: diff --git a/CHANGELOG.md b/CHANGELOG.md index d562150f..7423edbb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ All notable changes to this project are documented in this file. - **chore**: introduce `cargo audit` and `cargo vet` ([#640](https://github.com/geofmureithi/apalis/pull/640)) - **chore**: add cargo udeps ([#641](https://github.com/geofmureithi/apalis/pull/641)) - **fix**: TaskId must be explicit to prevent defaulting to RandomId ([#643](https://github.com/geofmureithi/apalis/pull/643)) +- **chore**: bump to v1.0.0 beta.2 ([#624](https://github.com/geofmureithi/apalis/pull/644)) ### Breaking Changes diff --git a/apalis-core/Cargo.toml b/apalis-core/Cargo.toml index 3ff807ec..81d6d7f2 100644 --- a/apalis-core/Cargo.toml +++ b/apalis-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "apalis-core" -version = "1.0.0-beta.1" +version = "1.0.0-beta.2" authors = ["Njuguna Mureithi "] edition.workspace = true repository.workspace = true @@ -32,7 +32,7 @@ tower-layer = "0.3.3" pin-project = "1" thiserror = "2.0.0" futures-timer = { version = "3.0.3", optional = true } -tracing = { version = "0.1.42", default-features = false, optional = true } +tracing = { version = "0.1.41", default-features = false, optional = true } # Needed for the codec serde_json = { version = "1", optional = true } diff --git a/apalis-core/README.md b/apalis-core/README.md new file mode 100644 index 00000000..80a4d559 --- /dev/null +++ b/apalis-core/README.md @@ -0,0 +1,255 @@ +# apalis-core + +A high-performance, type-safe task processing framework for rust. + +`apalis-core` provides the fundamental abstractions and runtime components for building +scalable background task systems with middleware support, graceful shutdown, and monitoring capabilities. + +This is advanced documentation, for guide level documentation is found on the [website](https://apalis.dev). + +## Core Concepts + +`apalis-core` is built around four primary abstractions that provide a flexible and extensible task processing system: + +- **[`Tasks`](#tasks)**: Type-safe task data structures with processing metadata +- **[`Backends`](#backends)**: Pluggable task storage and streaming implementations +- **[`Workers`](#workers)**: Task processing engines with lifecycle management +- **[`Monitor`](#monitor)**: Multi-worker coordination and observability + +The framework leverages the `tower` service abstraction to provide a rich middleware +ecosystem like error handling, timeouts, rate limiting, +and observability. + + +### Tasks + +The task struct provides type-safe components for task data and metadata: +- [`Args`](crate::task_fn::guide) - The primary structure for the task +- [`Parts`](crate::task::Parts) - Wrapper type for information for task execution includes context, status, attempts, task_id and metadata +- [`Context`](crate::backend::Backend#required-associated-types) - contextual information with the task provided by the backend +- [`Status`](crate::task::status::Status) - Represents the current state of a task +- [`TaskId`](crate::task::task_id::TaskId) - Unique identifier for task tracking +- [`Attempt`](crate::task::attempt::Attempt) - Retry tracking and attempt information +- [`Extensions`](crate::task::data) - Type-safe storage for additional task data +- [`Metadata`](crate::task::metadata) - metadata associated with the task + +#### Example: Using `TaskBuilder` + +```rust +let task: Task = TaskBuilder::new("my-task".to_string()) + .id("task-123".into()) + .attempts(3) + .timeout(Duration::from_secs(30)) + .run_in_minutes(10) + .build(); +``` +Specific documentation for tasks can be found in the [`task`] and [`task::builder`] modules. + +##### Relevant Guides: +- [**Defining Task arguments**](https://docs.rs/apalis-core/1.0.0-beta.2/apalis_core/task_fn/guide/index.html) - Creating effective task arguments that are scalable and type-safe + +### Backends + +The [`Backend`](https://docs.rs/apalis-core/1.0.0-beta.2/apalis_core/backend/trait.Backend.html) trait serves as the core abstraction for all task sources. +It defines task polling mechanisms, streaming interfaces, and middleware integration points. + +
+Associated Types: + +- `Stream` - Defines the task stream type for polling operations +- `Layer` - Specifies the middleware layer stack for the backend +- `Codec` - Determines serialization format for task data persistence +- `Beat` - Heartbeat stream for worker liveness checks +- `IdType` - Type used for unique task identifiers +- `Ctx` - Context associated with tasks +- `Error` - Error type for backend operations + +
+ +#### Inbuilt Implementations +- [`MemoryStorage`](https://docs.rs/apalis-core/1.0.0-beta.1/apalis_core/backend/memory/struct.MemoryStorage.html) : In-memory storage based on channels +- [`Pipe`](https://docs.rs/apalis-core/1.0.0-beta.1/apalis_core/backend/pipe/index.html) : Pipe-based backend for a stream-to-backend pipeline +- [`CustomBackend`](https://docs.rs/apalis-core/1.0.0-beta.1/apalis_core/backend/custom/index.html) : Flexible backend composition allowing custom functions for task management + +Backends handle task persistence, distribution, and reliability concerns while providing +a uniform interface for worker consumption. + +### Workers + +The [`Worker`](https://docs.rs/apalis-core/1.0.0-beta.1/apalis_core/worker/index.html) is the core runtime component responsible for task polling, execution, and lifecycle management: + +#### Worker Lifecycle + +- Workers are responsible for task polling, processing, and lifecycle management. +- Workers can be run as a future or as a stream of events. +- Workers readiness is conditioned on the backend and service (and middleware) being ready. +- This means any blocking middleware eg (concurrency) will block the worker from polling tasks. + +#### Worker Components + +The following are the main components the worker module: + +- [`WorkerBuilder`] - Fluent builder for configuring and constructing workers +- [`Worker`] - Actual worker implementation that processes tasks +- [`WorkerContext`] - Runtime state including task counts and execution status +- [`Event`] - Worker event enumeration (`Start`, `Engage`, `Idle`, `Error`, `Stop`) +- [`Ext`](https://docs.rs/apalis-core/1.0.0-beta.1/apalis_core/worker/ext/index.html) - Extension traits and middleware for adding functionality to workers + +#### Example: Building and Running a Worker +```rust +#[tokio::main] +async fn main() { + let mut in_memory = MemoryStorage::new(); + in_memory.push(1u32).await.unwrap(); + + async fn task( + task: u32, + worker: WorkerContext, + ) -> Result<(), BoxDynError> { + /// Do some work + tokio::time::sleep(Duration::from_secs(1)).await; + worker.stop().unwrap(); + Ok(()) + } + + let worker = WorkerBuilder::new("rango-tango") + .backend(in_memory) + .on_event(|ctx, ev| { + println!("On Event = {:?}, {:?}", ev, ctx.name()); + }) + .build(task); + worker.run().await.unwrap(); +} +``` + +Learn more about workers in the [`worker`](crate::worker) and [`worker::builder`](crate::worker::builder) modules. + +##### Relevant Tutorials: +- [**Creating task handlers**](crate::task_fn::guide) - Defining task processing functions using the [`TaskFn`] trait +- [**Testing task handlers with `TestWorker`**](https://docs.rs/apalis-core/1.0.0-beta.1/apalis_core/worker/test_worker/index.html) - Specialized worker implementation for unit and integration testing + +### Monitor + +The [`Monitor`](https://docs.rs/apalis-core/1.0.0-beta.1/apalis_core/monitor/struct.Monitor.html) helps manage and coordinate multiple workers: + +**Main Features:** +- **Worker Registry** - Keeps track of active workers +- **Event Handling** - Handles and processes worker events +- **Graceful Shutdown** - Stops all workers together safely +- **Health Monitoring** - Restarts and manages worker health +#### Example: Using `Monitor` with a Worker + +```rust +#[tokio::main] +async fn main() { + let mut storage = JsonStorage::new_temp().unwrap(); + storage.push(1u32).await.unwrap(); + + let monitor = Monitor::new() + .on_event(|ctx, event| println!("{}: {:?}", ctx.name(), event)) + .register(move |_| { + WorkerBuilder::new("demo-worker") + .backend(storage.clone()) + .build(|req: u32, ctx: WorkerContext| async move { + println!("Processing task: {:?}", req); + Ok::<_, std::io::Error>(req) + }) + }); + + // Start monitor and run all registered workers + monitor.run().await.unwrap(); +} +``` + +Learn more about the monitor in the [`monitor` module](https://docs.rs/apalis-core/1.0.0-beta.1/apalis_core/monitor/index.html. + +### Middleware + +Built on the `tower` ecosystem, `apalis-core` provides extensive middleware support like error handling, timeouts, rate limiting, and observability. + +#### Core Middleware + +The following middleware layers are included with their worker extensions: +- [`AcknowledgmentLayer`] - Task acknowledgment after processing +- [`EventListenerLayer`] - Worker event emission and handling +- [`CircuitBreakerLayer`] - Circuit breaker pattern for failure handling +- [`LongRunningLayer`] - Support for tracking long-running tasks + +#### Extending with middleware + +You can write your own middleware to run code before or after a task is processed. + +
+Creating Custom Middleware + +Here's a simple example of a logging middleware layer: + +```rust +use apalis_core::task::Task; +use tower::{Layer, Service}; +use std::task::{Context, Poll}; + +// Define a logging service that wraps another service +pub struct LoggingService { + inner: S, +} + +impl Service> for LoggingService +where + S: Service, Response = Res, Error = Err>, + Req: std::fmt::Debug, +{ + type Response = Res; + type Error = Err; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Task) -> Self::Future { + println!("Processing task: {:?}", req.args); + self.inner.call(req) + } +} + +// Define a layer that wraps services with LoggingService +pub struct LoggingLayer; + +impl Layer for LoggingLayer { + type Service = LoggingService; + + fn layer(&self, service: S) -> Self::Service { + LoggingService { inner: service } + } +} +``` +
+ +If you want your middleware to do more than just intercept requests and responses, you can use extension traits. See the [`worker::ext`](crate::worker::ext) module for examples. + +### Error Handling + +`apalis-core` defines a comprehensive error taxonomy for robust error handling: + +- [`AbortError`] - Non-retryable fatal errors requiring immediate termination +- [`RetryAfterError`] - Retryable execution errors triggering retry mechanisms after a delay +- [`DeferredError`] - Retryable execution errors triggering immediate retry + +This error classification enables precise error handling strategies and +appropriate retry behavior for different failure scenarios. + +### Graceful Shutdown + +`apalis-core` has a reliable graceful shutdown system that makes sure +workers stop safely and all tasks finish before shutting down: + +**Key Features:** +- Task tracking: Workers keep track of how many tasks are running. +- Shutdown control: The system waits until all tasks are finished before shutting down. +- Monitor coordination: A shared [`Shutdown`] token helps all workers stop together. +- Timeout: You can set a time limit for shutdown using [`with_terminator`](crate::monitor::Monitor::with_terminator). + +Learn more about the graceful shutdown process in the [`monitor`](crate::monitor#graceful-shutdown-with-timeout) module. + +License: MIT diff --git a/apalis-sql/Cargo.toml b/apalis-sql/Cargo.toml index 0ba84cf4..2b29ce00 100644 --- a/apalis-sql/Cargo.toml +++ b/apalis-sql/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "apalis-sql" -version = "1.0.0-beta.1" +version = "1.0.0-beta.2" authors = ["Njuguna Mureithi "] edition.workspace = true repository.workspace = true @@ -13,7 +13,7 @@ keywords = ["job", "task", "scheduler", "worker", "sql"] categories = ["database", "asynchronous"] [dependencies] -apalis-core = { path = "../apalis-core", version = "1.0.0-beta.1", default-features = false, features = [ +apalis-core = { path = "../apalis-core", version = "1.0.0-beta.2", default-features = false, features = [ "sleep", "json", ] } diff --git a/apalis-workflow/Cargo.toml b/apalis-workflow/Cargo.toml index f60a509a..00cc6c01 100644 --- a/apalis-workflow/Cargo.toml +++ b/apalis-workflow/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "apalis-workflow" -version = "0.1.0-beta.1" +version = "0.1.0-beta.2" edition.workspace = true repository.workspace = true authors = ["Njuguna Mureithi "] @@ -21,14 +21,14 @@ uuid = ["dep:uuid"] petgraph = { version = "0.8.2", features = ["serde-1"] } serde = { version = "1.0", features = ["derive"] } tower = { version = "0.5", features = ["util"], default-features = false } -apalis-core = { path = "../apalis-core", version = "1.0.0-beta.1", default-features = false, features = [ +apalis-core = { path = "../apalis-core", version = "1.0.0-beta.2", default-features = false, features = [ "sleep", "json", ] } futures = "0.3.30" thiserror = "2.0.0" # TODO: move to workspace dependencies -tracing = { version = "0.1.42", default-features = false, optional = true } +tracing = { version = "0.1.41", default-features = false, optional = true } ulid = { version = "1", optional = true } uuid = { version = "1", features = ["v4"], optional = true } diff --git a/apalis-workflow/README.md b/apalis-workflow/README.md index c31842b8..d7984cbb 100644 --- a/apalis-workflow/README.md +++ b/apalis-workflow/README.md @@ -55,7 +55,7 @@ You can track your workflows using [apalis-board](https://github.com/apalis-dev/ ## Backend Support -- [x] [JSONStorage](https://docs.rs/apalis-core/1.0.0-beta.1/apalis_core/backend/json/struct.JsonStorage.html) +- [x] [JSONStorage](https://docs.rs/apalis-core/1.0.0-beta.2/apalis_core/backend/json/struct.JsonStorage.html) - [x] [SqliteStorage](https://docs.rs/apalis-sqlite#workflow-example) - [x] [RedisStorage](https://docs.rs/apalis-redis#workflow-example) - [x] [PostgresStorage](https://docs.rs/apalis-postgres#workflow-example) diff --git a/apalis/Cargo.toml b/apalis/Cargo.toml index 873a4218..7e30f496 100644 --- a/apalis/Cargo.toml +++ b/apalis/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "apalis" -version = "1.0.0-beta.1" +version = "1.0.0-beta.2" authors = ["Geoffrey Mureithi "] description = "Simple, extensible and multithreaded background task processing for Rust" edition.workspace = true @@ -59,7 +59,7 @@ full = [ docsrs = ["document-features"] [dependencies.apalis-core] -version = "1.0.0-beta.1" +version = "1.0.0-beta.2" default-features = false path = "../apalis-core" @@ -86,7 +86,7 @@ version = "0.1.40" optional = true [dev-dependencies] -apalis-core = { path = "../apalis-core", version = "1.0.0-beta.1" } +apalis-core = { path = "../apalis-core", version = "1.0.0-beta.2" } serde = "1" tokio = { version = "1", features = ["full"] } apalis-workflow = { path = "../apalis-workflow", version = "0.1.0-alpha.6" } # For README diff --git a/apalis/README.md b/apalis/README.md index 6d53048f..b2635c42 100644 --- a/apalis/README.md +++ b/apalis/README.md @@ -32,7 +32,7 @@ ## Features -- **Simple and predictable task handling** - [Task handlers](https://docs.rs/apalis-core/1.0.0-beta.1/apalis_core/task_fn/guide/index.html) are just async functions with a macro-free API +- **Simple and predictable task handling** - [Task handlers](https://docs.rs/apalis-core/1.0.0-beta.2/apalis_core/task_fn/guide/index.html) are just async functions with a macro-free API - **Robust task execution** - Built-in support for retries, timeouts, and error handling - **Multiple storage backends** - Support for Redis, PostgreSQL, SQLite, and in-memory storage - **Advanced task management** - Task prioritization, scheduling, metadata, and result tracking @@ -64,7 +64,7 @@ To get started, just add to Cargo.toml ```toml [dependencies] -apalis = { version = "1.0.0-beta.1" } +apalis = { version = "1.0.0-beta.2" } # apalis-redis = { version = "1.0.0-alpha.1" } # Use redis/sqlite/postgres etc ``` diff --git a/examples/basics/Cargo.toml b/examples/basics/Cargo.toml index 9fa07be1..7f816a3c 100644 --- a/examples/basics/Cargo.toml +++ b/examples/basics/Cargo.toml @@ -12,7 +12,7 @@ apalis = { path = "../../apalis", features = ["limit", "catch-panic", "tracing"] apalis-core = { path = "../../apalis-core" , features = ["json"]} serde = "1" serde_json = "1" -tracing-subscriber = "0.3.21" +tracing-subscriber = "0.3.20" email-service = { path = "../email-service" } futures = "0.3" tower = "0.4" diff --git a/examples/catch-panic/Cargo.toml b/examples/catch-panic/Cargo.toml index 86644a81..430fa39c 100644 --- a/examples/catch-panic/Cargo.toml +++ b/examples/catch-panic/Cargo.toml @@ -9,7 +9,7 @@ anyhow = "1" tokio = { version = "1", features = ["full"] } apalis = { path = "../../apalis", features = ["limit", "catch-panic", "tracing"] } serde = { version = "1", features = ["derive"] } -tracing-subscriber = "0.3.21" +tracing-subscriber = "0.3.20" email-service = { path = "../email-service" } diff --git a/examples/fn-args/Cargo.toml b/examples/fn-args/Cargo.toml index a54810ae..dfc49b66 100644 --- a/examples/fn-args/Cargo.toml +++ b/examples/fn-args/Cargo.toml @@ -8,7 +8,7 @@ repository.workspace = true tokio = { version = "1", features = ["full"] } apalis = { path = "../../apalis", features = ["tracing"] } serde = { version = "1", features = ["derive"] } -tracing-subscriber = "0.3.21" +tracing-subscriber = "0.3.20" futures = "0.3" tower = "0.4" diff --git a/examples/graceful-shutdown/Cargo.toml b/examples/graceful-shutdown/Cargo.toml index 89d93076..8d6c3a82 100644 --- a/examples/graceful-shutdown/Cargo.toml +++ b/examples/graceful-shutdown/Cargo.toml @@ -10,7 +10,7 @@ tokio = { version = "1", features = ["full"] } apalis = { path = "../../apalis", features = ["limit", "catch-panic"] } apalis-core = { path = "../../apalis-core", features = ["json"] } serde = "1" -tracing-subscriber = "0.3.21" +tracing-subscriber = "0.3.20" futures = "0.3" tower = "0.4" diff --git a/examples/prometheus/Cargo.toml b/examples/prometheus/Cargo.toml index c4ce0135..a4a7db1b 100644 --- a/examples/prometheus/Cargo.toml +++ b/examples/prometheus/Cargo.toml @@ -8,8 +8,8 @@ publish = false anyhow = "1" axum = "0.5.6" tokio = { version = "1.0", features = ["full"] } -tracing = "0.1.42" -tracing-subscriber = { version = "0.3.21", features = ["env-filter"] } +tracing = "0.1.41" +tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } serde = { version = "1.0", features = ["derive"] } apalis = { path = "../../apalis", features = ["prometheus"] } apalis-core = { path = "../../apalis-core", features = ["json"] } diff --git a/examples/retries/Cargo.toml b/examples/retries/Cargo.toml index 3330de63..f7bbed64 100644 --- a/examples/retries/Cargo.toml +++ b/examples/retries/Cargo.toml @@ -8,7 +8,7 @@ repository.workspace = true anyhow = "1" apalis = { path = "../../apalis", features = ["retry", "limit"] } serde = "1" -tracing-subscriber = "0.3.21" +tracing-subscriber = "0.3.20" chrono = { version = "0.4", default-features = false, features = ["clock"] } tokio = { version = "1", features = ["full"] } email-service = { path = "../email-service" } diff --git a/examples/sentry/Cargo.toml b/examples/sentry/Cargo.toml index b66974ba..0078887f 100644 --- a/examples/sentry/Cargo.toml +++ b/examples/sentry/Cargo.toml @@ -10,7 +10,7 @@ anyhow = "1" apalis = { path = "../../apalis", features = ["sentry"] } serde = "1" env_logger = "0.10" -tracing-subscriber = { version = "0.3.21", features = ["env-filter"] } +tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } sentry = "0.37.0" sentry-tower = "0.37.0" sentry-tracing = "0.37.0" diff --git a/examples/tracing/Cargo.toml b/examples/tracing/Cargo.toml index ecea47cd..f6bcaa23 100644 --- a/examples/tracing/Cargo.toml +++ b/examples/tracing/Cargo.toml @@ -11,7 +11,7 @@ apalis = { path = "../../apalis" } serde = "1" tokio = { version = "1", features = ["full"] } env_logger = "0.10" -tracing-subscriber = { version = "0.3.21", features = ["env-filter", "json"] } +tracing-subscriber = { version = "0.3.20", features = ["env-filter", "json"] } chrono = { version = "0.4", default-features = false, features = ["clock"] } email-service = { path = "../email-service" } futures = "0.3" diff --git a/examples/unmonitored-worker/Cargo.toml b/examples/unmonitored-worker/Cargo.toml index 82c1de48..3f069073 100644 --- a/examples/unmonitored-worker/Cargo.toml +++ b/examples/unmonitored-worker/Cargo.toml @@ -8,7 +8,7 @@ repository.workspace = true tokio = { version = "1", features = ["full"] } apalis = { path = "../../apalis", features = ["limit", "catch-panic"] } serde = { version = "1", features = ["derive"] } -tracing-subscriber = "0.3.21" +tracing-subscriber = "0.3.20" futures = "0.3" tower = "0.4" diff --git a/examples/workflow/Cargo.toml b/examples/workflow/Cargo.toml index 207f94f5..46287e47 100644 --- a/examples/workflow/Cargo.toml +++ b/examples/workflow/Cargo.toml @@ -12,7 +12,7 @@ apalis-workflow = { path = "../../apalis-workflow" } apalis-core = { path = "../../apalis-core", features = ["json"] } serde = { version = "1", features = ["derive"] } serde_json = "1" -tracing-subscriber = "0.3.21" +tracing-subscriber = "0.3.20" futures = "0.3" [dependencies.tracing] diff --git a/supply-chain/config.toml b/supply-chain/config.toml index d76ab7b4..783d28f8 100644 --- a/supply-chain/config.toml +++ b/supply-chain/config.toml @@ -47,11 +47,11 @@ version = "1.0.100" criteria = "safe-to-deploy" [[exemptions.apalis-core]] -version = "1.0.0-beta.1" +version = "1.0.0-beta.2" criteria = "safe-to-deploy" [[exemptions.apalis-workflow]] -version = "0.1.0-beta.1" +version = "0.1.0-beta.2" criteria = "safe-to-deploy" [[exemptions.arrayvec]] @@ -1159,7 +1159,7 @@ version = "0.3.3" criteria = "safe-to-deploy" [[exemptions.tracing]] -version = "0.1.42" +version = "0.1.41" criteria = "safe-to-deploy" [[exemptions.tracing-attributes]] @@ -1191,7 +1191,7 @@ version = "0.2.0" criteria = "safe-to-deploy" [[exemptions.tracing-subscriber]] -version = "0.3.21" +version = "0.3.20" criteria = "safe-to-deploy" [[exemptions.try-lock]]