Skip to content

Conversation

@nutcas3
Copy link

@nutcas3 nutcas3 commented Nov 30, 2025

cc @geofmureithi how is this provide feedback on this then

Overview

This PR introduces a new PostgreSQL-based message queue backend for the Apalis job processing library using the pgmq crate. It provides a robust, persistent queue implementation for background job processing.

Features

  • PgMq Backend: Full implementation of apalis_core::backend::Backend trait
  • Task Management: Support for enqueuing, polling, and acknowledging tasks
  • Message Handling: Automatic message archival after max retries
  • Configurable Parameters:
    • Visibility timeout for message locks
    • Custom poll intervals
    • Configurable max retry attempts
  • Context Tracking: Includes message metadata (ID, read count, timestamps) in task context

Implementation Details

  • Uses pgmq for PostgreSQL-based message queue operations
  • Implements proper error handling with custom error types
  • Follows Rust best practices with proper module organization
  • Includes comprehensive documentation

Usage Example

let pgmq = PgMq::new("postgres://user:pass@localhost:5432/db", "my_queue").await?;
pgmq.enqueue(MyJob { /* ... */ }).await?;

- Add PgMq struct implementing apalis_core::backend::Backend
- Support task enqueueing, polling, and acking via pgmq
- Add configurable visibility timeout, poll interval, and retry logic
- Include PgMqContext for message metadata tracking
- Implement automatic message archival after max retries
- Add PgMqError enum with thiserror derive
- Include variants for PGMQ errors, serialization errors, and queue operations
- Implement From traits for PgmqCrateError and serde_json::Error
- Create lib.rs with module declarations for backend and errors
- Export PgMq, PgMqContext from backend module
- Export PgMqError from errors module
- Remove async-stream dependency from Cargo.toml
- Replace stream! macro with futures::stream::unfold in stream and heartbeat methods
- Remove unused tokio dev-dependency
- Change "pgmq backend" to "PGMQ backend" for consistency
- Update "message queue implementation" to "message queue backend"
- Replace "Immutable messages" feature with "Auto-retry" and add "Configurable polling"
- Update dependency versions in installation section (apalis-core 1.0.0-beta.2)
- Remove tokio from dependencies example
- Consolidate PostgreSQL setup with PGMQ extension installation in single code block
- Restructure usage section
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant