diff --git a/.gitignore b/.gitignore index ad67955..2b97d10 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,9 @@ target # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + + +# Added by cargo + +/target +cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..5146711 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "apalis-pgmq" +version = "0.1.0" +edition = "2024" +authors = ["Nutcas3 "] +description = "PGMQ storage backend for Apalis" +license = "MIT" +repository = "https://github.com/apalis-dev/apalis-pgmq" +keywords = ["apalis", "pgmq", "postgres", "message-queue", "background-jobs"] +categories = ["asynchronous", "database"] + +[dependencies] +apalis-core = "1.0.0-beta.2" +pgmq = "0.31" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +thiserror = "2.0.17" +chrono = { version = "0.4", features = ["serde"] } +futures = "0.3" +tower = "0.5" + +[dev-dependencies] +apalis = "1.0.0-beta.2" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } + +[features] +default = [] diff --git a/README.md b/README.md index e694c3e..d78fea1 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,197 @@ # apalis-pgmq -Background task processing in rust using apalis and pgmq + +[![Crates.io](https://img.shields.io/crates/v/apalis-pgmq.svg)](https://crates.io/crates/apalis-pgmq) +[![Documentation](https://docs.rs/apalis-pgmq/badge.svg)](https://docs.rs/apalis-pgmq) +[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](LICENSE) + +PGMQ backend for [Apalis](https://github.com/apalis-dev/apalis), providing a PostgreSQL-based message queue for distributed job processing. + +## Overview + +This crate provides a message queue backend for Apalis using [PGMQ](https://github.com/tembo-io/pgmq) (PostgreSQL Message Queue). PGMQ is a lightweight, Postgres-based message queue with guaranteed delivery and visibility timeouts, similar to AWS SQS. + +## Features + +- **Persistent storage** – Messages stored in PostgreSQL survive restarts +- **Visibility timeout** – Messages become invisible during processing and reappear if not acknowledged +- **Guaranteed delivery** – Messages are delivered to exactly one consumer within the visibility timeout +- **Distributed processing** – Multiple workers can process messages concurrently +- **Auto-retry** – Failed messages automatically become visible again after timeout +- **Archive support** – Messages can be archived for long-term retention instead of deletion +- **Configurable polling** – Adjust poll intervals and visibility timeouts per queue + +## Installation + +Add this to your `Cargo.toml`: + +```toml +[dependencies] +apalis-core = "1.0.0-beta.2" +apalis-pgmq = "0.1" +serde = { version = "1", features = ["derive"] } +``` + +## Prerequisites + +You need PostgreSQL 13+ with the PGMQ extension installed. Follow the [PGMQ installation guide](https://github.com/tembo-io/pgmq#installation). + +For development, you can use Docker: + +```bash +docker run -d --name postgres \ + -e POSTGRES_PASSWORD=postgres \ + -p 5432:5432 \ + postgres:15 + +# Install PGMQ extension +docker exec -it postgres psql -U postgres -c "CREATE EXTENSION IF NOT EXISTS pgmq CASCADE;" +``` + +## Usage + +### Creating a Backend + +```rust +use apalis_pgmq::PgMq; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct Job { + id: u64, + data: String, +} + +// Create a PGMQ backend +let backend = PgMq::::new( + "postgres://postgres:postgres@localhost:5432/postgres", + "jobs" +) +.await? +.with_visibility_timeout(std::time::Duration::from_secs(60)) +.with_poll_interval(std::time::Duration::from_millis(500)) +.with_max_retries(5); +``` + +### Enqueuing Messages + +```rust +// Enqueue a message +let msg_id = backend.enqueue(Job { + id: 1, + data: "process this".to_string(), +}).await?; + +println!("Enqueued message with ID: {}", msg_id); +``` + +### Using with Apalis + +Use the backend with Apalis `WorkerBuilder`: + +```rust +use apalis_core::backend::Backend; + +// The backend implements the Backend trait +// Use it with WorkerBuilder to create workers +let worker = WorkerBuilder::new("my-worker") + .backend(backend) + .build(my_handler_function); +``` + +## Configuration + +### Visibility Timeout + +Set how long messages stay invisible after being read (default: 30 seconds): + +```rust +let backend = PgMq::::new(url, "queue") + .await? + .with_visibility_timeout(std::time::Duration::from_secs(120)); // 2 minutes +``` + +### Poll Interval + +Set how frequently to poll for new messages (default: 100ms): + +```rust +let backend = PgMq::::new(url, "queue") + .await? + .with_poll_interval(std::time::Duration::from_millis(500)); // 500ms +``` + +### Max Retries + +Set maximum retry attempts before archiving (default: 5): + +```rust +let backend = PgMq::::new(url, "queue") + .await? + .with_max_retries(3); // Archive after 3 failed attempts +``` + +## API Reference + +### PgMq Methods + +- `new(connection_url, queue_name)` - Create a new PGMQ backend +- `with_visibility_timeout(duration)` - Set visibility timeout +- `with_poll_interval(duration)` - Set poll interval +- `with_max_retries(count)` - Set max retry attempts +- `enqueue(job)` - Enqueue a message, returns message ID +- `ack(msg_id)` - Acknowledge (delete) a message +- `archive(msg_id)` - Archive a message +- `retry(msg_id, read_count)` - Retry logic (archives if max retries exceeded) + +### PgMqContext + +The context provides message metadata: + +```rust +pub struct PgMqContext { + pub msg_id: Option, + pub read_ct: Option, + pub enqueued_at: Option>, + pub vt: Option>, +} +``` + +## Message Acknowledgment + +Messages are automatically acknowledged by Apalis when your job handler completes successfully. If the handler returns an error or panics, the message will become visible again after the visibility timeout expires. + +## Key Differences from Storage Backends + +Unlike storage backends (like `apalis-sql`), message queues: + +1. **Are immutable** – Messages cannot be updated after being enqueued +2. **Have visibility timeouts** – Messages become temporarily invisible when read +3. **Auto-requeue on failure** – Unacknowledged messages automatically return to the queue +4. **No job state tracking** – Messages are either in the queue or processed (no "running" state) +5. **Simpler semantics** – Focus on message delivery rather than job lifecycle management + +## Comparison with apalis-sql + +| Feature | apalis-pgmq | apalis-sql | +|---------|-------------|------------| +| Backend | PGMQ | Direct SQL | +| Message mutability | Immutable | Mutable | +| Visibility timeout | Yes | No | +| Auto-requeue | Yes | Manual | +| Job state tracking | No | Yes (pending/running/done) | +| Archive support | Yes | No | +| Use case | Message queues | Job queues with state | +| Complexity | Lower | Higher | + +## Contributing + +Contributions are welcome! Please feel free to submit a Pull Request. + +## License + +This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. + +## Acknowledgments + +- [Apalis](https://github.com/apalis-dev/apalis) - Simple, extensible multithreaded background job processing library for Rust +- [PGMQ](https://github.com/tembo-io/pgmq) - PostgreSQL Message Queue diff --git a/src/backend.rs b/src/backend.rs new file mode 100644 index 0000000..d8e03d2 --- /dev/null +++ b/src/backend.rs @@ -0,0 +1,164 @@ +use apalis_core::backend::Backend; +use apalis_core::task::Task; +use apalis_core::timer::sleep; +use apalis_core::worker::context::WorkerContext; +use futures::stream::{Stream, unfold}; +use pgmq::{Message as PgmqMessage, PGMQueue}; +use serde::{Deserialize, Serialize}; +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::errors::PgMqError; + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct PgMqContext { + pub msg_id: Option, + pub read_ct: Option, + pub enqueued_at: Option>, + pub vt: Option>, +} + +#[derive(Clone)] +pub struct PgMq { + queue: PGMQueue, + queue_name: String, + visibility_timeout: std::time::Duration, + poll_interval: std::time::Duration, + max_retries: i32, + _phantom: PhantomData, +} + +impl PgMq +where + T: Serialize + for<'de> Deserialize<'de>, +{ + pub async fn new(connection_url: &str, queue_name: &str) -> Result { + let queue = PGMQueue::new(connection_url.to_owned()) + .await + .map_err(PgMqError::Pgmq)?; + + queue.create(queue_name) + .await + .map_err(PgMqError::Pgmq)?; + + Ok(Self { + queue, + queue_name: queue_name.to_owned(), + visibility_timeout: std::time::Duration::from_secs(30), + poll_interval: std::time::Duration::from_millis(100), + max_retries: 5, + _phantom: PhantomData, + }) + } + + pub fn with_visibility_timeout(mut self, timeout: std::time::Duration) -> Self { + self.visibility_timeout = timeout; + self + } + + pub fn with_poll_interval(mut self, interval: std::time::Duration) -> Self { + self.poll_interval = interval; + self + } + + pub fn with_max_retries(mut self, max_retries: i32) -> Self { + self.max_retries = max_retries; + self + } + + pub async fn ack(&self, msg_id: i64) -> Result<(), PgMqError> { + self.queue + .delete(&self.queue_name, msg_id) + .await + .map_err(PgMqError::Pgmq)?; + Ok(()) + } + + pub async fn archive(&self, msg_id: i64) -> Result<(), PgMqError> { + self.queue + .archive(&self.queue_name, msg_id) + .await + .map_err(PgMqError::Pgmq)?; + Ok(()) + } + + pub async fn enqueue(&self, job: T) -> Result { + let msg_id = self.queue + .send(&self.queue_name, &job) + .await + .map_err(PgMqError::Pgmq)?; + Ok(msg_id) + } + + pub async fn retry(&self, msg_id: i64, read_count: i32) -> Result<(), PgMqError> { + if read_count >= self.max_retries { + self.archive(msg_id).await?; + } + Ok(()) + } +} + +impl Backend for PgMq +where + T: Serialize + for<'de> Deserialize<'de> + Send + Sync + Unpin + 'static, +{ + type Args = T; + type IdType = i64; + type Context = PgMqContext; + type Error = PgMqError; + type Stream = Pin>, Self::Error>> + Send>>; + type Beat = Pin> + Send>>; + type Layer = tower::layer::util::Identity; + + fn poll(self, _worker: &WorkerContext) -> Self::Stream { + let queue = self.queue.clone(); + let queue_name = self.queue_name.clone(); + let visibility_timeout_secs = self.visibility_timeout.as_secs() as i32; + let poll_interval = self.poll_interval; + + let state = (queue, queue_name, visibility_timeout_secs, poll_interval); + + Box::pin(unfold(state, |(queue, queue_name, visibility_timeout_secs, poll_interval)| async move { + let result: Result>, _> = queue + .read(&queue_name, Some(visibility_timeout_secs)) + .await; + + let next_state = (queue, queue_name, visibility_timeout_secs, poll_interval); + + match result { + Ok(Some(msg)) => { + let context = PgMqContext { + msg_id: Some(msg.msg_id), + read_ct: Some(msg.read_ct), + enqueued_at: Some(msg.enqueued_at), + vt: Some(msg.vt), + }; + + let mut task = Task::new(msg.message); + task.parts.ctx = context; + task.parts.task_id = Some(apalis_core::task::task_id::TaskId::new(msg.msg_id)); + + Some((Ok(Some(task)), next_state)) + } + Ok(None) => { + sleep(poll_interval).await; + Some((Ok(None), next_state)) + } + Err(e) => { + Some((Err(PgMqError::Pgmq(e)), next_state)) + } + } + })) + } + + fn heartbeat(&self, _worker: &WorkerContext) -> Self::Beat { + Box::pin(unfold((), |_| async { + sleep(std::time::Duration::from_secs(30)).await; + Some((Ok(()), ())) + })) + } + + fn middleware(&self) -> Self::Layer { + tower::layer::util::Identity::new() + } +} \ No newline at end of file diff --git a/src/errors.rs b/src/errors.rs new file mode 100644 index 0000000..79880d4 --- /dev/null +++ b/src/errors.rs @@ -0,0 +1,14 @@ +use pgmq::PgmqError as PgmqCrateError; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum PgMqError { + #[error("PGMQ error: {0}")] + Pgmq(#[from] PgmqCrateError), + + #[error("Serialization error: {0}")] + Serialization(#[from] serde_json::Error), + + #[error("Queue operation error: {0}")] + QueueOperation(String), +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..341194d --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,5 @@ +mod backend; +mod errors; + +pub use backend::{PgMq, PgMqContext}; +pub use errors::PgMqError;