diff --git a/.clippy.toml b/.clippy.toml new file mode 100644 index 0000000..84d510c --- /dev/null +++ b/.clippy.toml @@ -0,0 +1,20 @@ +# Clippy configuration file +msrv = "1.75.0" + +# Allow certain lints that might be too pedantic for this project +allow-expect-in-tests = true +allow-unwrap-in-tests = true + +# Disallow certain patterns +disallowed-methods = [ + "std::collections::HashMap::new", # prefer FxHashMap for performance + "std::collections::HashSet::new", # prefer FxHashSet for performance +] + +# Set limits +too-many-arguments-threshold = 7 +type-complexity-threshold = 250 +trivial-copy-size-limit = 64 + +# Documentation +missing-docs-in-crate-items = true \ No newline at end of file diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml new file mode 100644 index 0000000..0057f20 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.yml @@ -0,0 +1,89 @@ +name: Bug Report +description: Report a bug or unexpected behavior +title: "[Bug]: " +labels: ["bug"] + +body: + - type: markdown + attributes: + value: | + Thank you for reporting a bug! Please fill out this template to help us resolve the issue. + + - type: textarea + id: description + attributes: + label: Description + description: A clear and concise description of what the bug is. + placeholder: Describe the bug... + validations: + required: true + + - type: textarea + id: reproduction + attributes: + label: Steps to Reproduce + description: Steps to reproduce the behavior + placeholder: | + 1. Create a RedisMq instance with... + 2. Send a message with... + 3. Observe error... + validations: + required: true + + - type: textarea + id: expected + attributes: + label: Expected Behavior + description: What you expected to happen + placeholder: I expected... + validations: + required: true + + - type: textarea + id: actual + attributes: + label: Actual Behavior + description: What actually happened + placeholder: Instead, what happened was... + validations: + required: true + + - type: textarea + id: code + attributes: + label: Code Sample + description: If applicable, add a minimal code sample that reproduces the issue + render: rust + placeholder: | + use apalis_rsmq::RedisMq; + + #[tokio::main] + async fn main() { + // Your code here + } + + - type: textarea + id: environment + attributes: + label: Environment + description: Please provide information about your environment + placeholder: | + - OS: [e.g. Ubuntu 22.04] + - Rust version: [e.g. 1.75.0] + - apalis-rsmq version: [e.g. 0.1.0-alpha.1] + - Redis version: [e.g. 7.0] + validations: + required: true + + - type: textarea + id: logs + attributes: + label: Logs/Error Messages + description: If applicable, add relevant log output or error messages + render: text + + - type: textarea + id: additional + attributes: + label: Additional Context + description: Add any other context about the problem here \ No newline at end of file diff --git a/.github/ISSUE_TEMPLATE/feature_request.yml b/.github/ISSUE_TEMPLATE/feature_request.yml new file mode 100644 index 0000000..1421c20 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.yml @@ -0,0 +1,64 @@ +name: Feature Request +description: Suggest a new feature or enhancement +title: "[Feature]: " +labels: ["enhancement"] + +body: + - type: markdown + attributes: + value: | + Thank you for suggesting a new feature! Please fill out this template to help us understand your request. 
+ + - type: textarea + id: problem + attributes: + label: Problem Statement + description: Is your feature request related to a problem? Please describe. + placeholder: I'm always frustrated when... + validations: + required: true + + - type: textarea + id: solution + attributes: + label: Proposed Solution + description: Describe the solution you'd like to see + placeholder: I would like to see... + validations: + required: true + + - type: textarea + id: alternatives + attributes: + label: Alternatives Considered + description: Describe any alternative solutions or features you've considered + placeholder: I've also considered... + + - type: textarea + id: use_case + attributes: + label: Use Case + description: Describe your use case and how this feature would help + placeholder: This feature would help me... + validations: + required: true + + - type: textarea + id: implementation + attributes: + label: Implementation Ideas + description: If you have ideas about how this could be implemented, please share + placeholder: This could be implemented by... + + - type: checkboxes + id: contribution + attributes: + label: Contribution + options: + - label: I would be willing to implement this feature + + - type: textarea + id: additional + attributes: + label: Additional Context + description: Add any other context, screenshots, or examples about the feature request \ No newline at end of file diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 0000000..929f5b3 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,57 @@ +## Description + +Brief description of the changes in this PR. + +## Type of Change + +Please check the relevant option: + +- [ ] Bug fix (non-breaking change which fixes an issue) +- [ ] New feature (non-breaking change which adds functionality) +- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) +- [ ] Documentation update +- [ ] Performance improvement +- [ ] Code refactoring +- [ ] Other (please describe): + +## Changes Made + +- List the specific changes made +- Include any new files added +- Mention any files deleted or moved + +## Testing + +- [ ] I have added tests that prove my fix is effective or that my feature works +- [ ] New and existing unit tests pass locally with my changes +- [ ] I have added integration tests where appropriate + +## Documentation + +- [ ] I have updated the documentation accordingly +- [ ] I have updated the CHANGELOG.md file +- [ ] I have added docstring comments to new public functions/methods + +## Checklist + +- [ ] My code follows the project's style guidelines +- [ ] I have performed a self-review of my own code +- [ ] I have commented my code, particularly in hard-to-understand areas +- [ ] My changes generate no new warnings +- [ ] I have run `cargo fmt` to format my code +- [ ] I have run `cargo clippy` and resolved any issues +- [ ] Any dependent changes have been merged and published + +## Breaking Changes + +If this PR contains breaking changes, please describe them here and provide migration instructions. + +## Additional Notes + +Any additional information that reviewers should know about this PR. 
+ +## Related Issues + +Fixes #(issue number) +Closes #(issue number) +Related to #(issue number) \ No newline at end of file diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml new file mode 100644 index 0000000..bddc165 --- /dev/null +++ b/.github/workflows/benchmark.yml @@ -0,0 +1,63 @@ +name: Benchmarks + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + workflow_dispatch: + +env: + CARGO_TERM_COLOR: always + +jobs: + benchmark: + name: Run Benchmarks + runs-on: ubuntu-latest + + services: + redis: + image: redis:7-alpine + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Cache dependencies + uses: Swatinem/rust-cache@v2 + + - name: Install cargo-criterion + run: cargo install cargo-criterion + + - name: Run benchmarks + run: cargo criterion --message-format=json > benchmark-results.json + env: + REDIS_URL: redis://localhost:6379 + + - name: Store benchmark results + uses: benchmark-action/github-action-benchmark@v1 + if: github.ref == 'refs/heads/main' + with: + tool: 'cargo' + output-file-path: benchmark-results.json + github-token: ${{ secrets.GITHUB_TOKEN }} + auto-push: true + comment-on-alert: true + alert-threshold: '150%' + fail-on-alert: false + + - name: Upload benchmark results + uses: actions/upload-artifact@v4 + with: + name: benchmark-results + path: benchmark-results.json \ No newline at end of file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..5574a90 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,168 @@ +name: CI + +on: + push: + branches: [ main, develop ] + pull_request: + branches: [ main, develop ] + +env: + CARGO_TERM_COLOR: always + RUST_BACKTRACE: 1 + +jobs: + test: + name: Test Suite + runs-on: ubuntu-latest + strategy: + matrix: + rust: [stable, beta, nightly] + include: + - rust: stable + coverage: true + + services: + redis: + image: redis:7-alpine + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@master + with: + toolchain: ${{ matrix.rust }} + + - name: Cache dependencies + uses: Swatinem/rust-cache@v2 + with: + key: ${{ runner.os }}-cargo-${{ matrix.rust }}-${{ hashFiles('**/Cargo.lock') }} + + - name: Check formatting + run: cargo fmt --all -- --check + if: matrix.rust == 'stable' + + - name: Run Clippy + run: cargo clippy --all-targets --all-features -- -D warnings + if: matrix.rust == 'stable' + + - name: Build + run: cargo build --verbose + + - name: Run tests + run: cargo test --verbose --all-features + env: + REDIS_URL: redis://localhost:6379 + + - name: Run doc tests + run: cargo test --doc --verbose + env: + REDIS_URL: redis://localhost:6379 + + - name: Install cargo-tarpaulin + run: cargo install cargo-tarpaulin + if: matrix.coverage == true + + - name: Generate code coverage + run: cargo tarpaulin --verbose --all-features --workspace --timeout 120 --out xml + env: + REDIS_URL: redis://localhost:6379 + if: matrix.coverage == true + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v4 + with: + file: cobertura.xml + fail_ci_if_error: false + if: matrix.coverage == true + + security: + name: 
Security Audit + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Cache dependencies + uses: Swatinem/rust-cache@v2 + + - name: Install cargo-audit + run: cargo install cargo-audit + + - name: Run security audit + run: cargo audit + + - name: Install cargo-deny + run: cargo install cargo-deny + + - name: Run cargo-deny + run: cargo deny check + + msrv: + name: Minimum Supported Rust Version + runs-on: ubuntu-latest + + services: + redis: + image: redis:7-alpine + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust MSRV + uses: dtolnay/rust-toolchain@1.75.0 + + - name: Cache dependencies + uses: Swatinem/rust-cache@v2 + + - name: Check MSRV + run: cargo check --all-features + env: + REDIS_URL: redis://localhost:6379 + + docs: + name: Documentation + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Cache dependencies + uses: Swatinem/rust-cache@v2 + + - name: Build documentation + run: cargo doc --no-deps --all-features + env: + RUSTDOCFLAGS: "-D warnings" + + - name: Deploy documentation + uses: peaceiris/actions-gh-pages@v4 + if: github.ref == 'refs/heads/main' + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + publish_dir: ./target/doc + destination_dir: docs \ No newline at end of file diff --git a/.github/workflows/code-quality.yml b/.github/workflows/code-quality.yml new file mode 100644 index 0000000..426d704 --- /dev/null +++ b/.github/workflows/code-quality.yml @@ -0,0 +1,95 @@ +name: Code Quality + +on: + push: + branches: [ main, develop ] + pull_request: + branches: [ main, develop ] + schedule: + # Run weekly on Sundays at 05:00 UTC + - cron: '0 5 * * 0' + +env: + CARGO_TERM_COLOR: always + +jobs: + unused-dependencies: + name: Check Unused Dependencies + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@nightly + + - name: Cache dependencies + uses: Swatinem/rust-cache@v2 + + - name: Install cargo-machete + run: cargo install cargo-machete + + - name: Check for unused dependencies + run: cargo machete + + semver-checks: + name: Semantic Versioning Checks + runs-on: ubuntu-latest + if: github.event_name == 'pull_request' + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Cache dependencies + uses: Swatinem/rust-cache@v2 + + - name: Install cargo-semver-checks + run: cargo install cargo-semver-checks + + - name: Check semver compatibility + run: cargo semver-checks check-release + + bloat-check: + name: Binary Size Analysis + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Cache dependencies + uses: Swatinem/rust-cache@v2 + + - name: Install cargo-bloat + run: cargo install cargo-bloat + + - name: Analyze binary bloat + run: cargo bloat --release --crates + + duplicate-check: + name: Check Duplicate Dependencies + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Cache dependencies + uses: 
Swatinem/rust-cache@v2 + + - name: Install cargo-duplicate + run: cargo install cargo-duplicate + + - name: Check for duplicate dependencies + run: cargo duplicate \ No newline at end of file diff --git a/.github/workflows/dependencies.yml b/.github/workflows/dependencies.yml new file mode 100644 index 0000000..571b6af --- /dev/null +++ b/.github/workflows/dependencies.yml @@ -0,0 +1,105 @@ +name: Dependencies + +on: + schedule: + # Run weekly on Mondays at 08:00 UTC + - cron: '0 8 * * 1' + workflow_dispatch: + +jobs: + update-dependencies: + name: Update Dependencies + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Install cargo-edit + run: cargo install cargo-edit + + - name: Update dependencies + run: | + cargo update + cargo upgrade --workspace + + - name: Check if there are changes + id: changes + run: | + if git diff --quiet Cargo.toml Cargo.lock; then + echo "changed=false" >> $GITHUB_OUTPUT + else + echo "changed=true" >> $GITHUB_OUTPUT + fi + + - name: Create Pull Request + if: steps.changes.outputs.changed == 'true' + uses: peter-evans/create-pull-request@v6 + with: + token: ${{ secrets.GITHUB_TOKEN }} + commit-message: 'chore: update dependencies' + title: 'chore: update dependencies' + body: | + This PR updates the project dependencies to their latest versions. + + ## Changes + - Updated Cargo.toml dependencies + - Updated Cargo.lock + + Please review the changes and ensure all tests pass before merging. + branch: dependencies/auto-update + delete-branch: true + + audit-dependencies: + name: Audit Dependencies + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Install cargo-audit + run: cargo install cargo-audit + + - name: Audit dependencies + run: cargo audit --deny warnings + + - name: Create issue for vulnerabilities + if: failure() + uses: actions/github-script@v7 + with: + script: | + const title = 'Security vulnerability detected in dependencies'; + const body = `A security vulnerability has been detected in the project dependencies. + + Please run \`cargo audit\` locally to see the details and update the affected dependencies. 
+ + This issue was automatically created by the dependency audit workflow.`; + + // Check if issue already exists + const issues = await github.rest.issues.listForRepo({ + owner: context.repo.owner, + repo: context.repo.repo, + state: 'open', + labels: 'security' + }); + + const existingIssue = issues.data.find(issue => issue.title === title); + + if (!existingIssue) { + await github.rest.issues.create({ + owner: context.repo.owner, + repo: context.repo.repo, + title: title, + body: body, + labels: ['security', 'dependencies'] + }); + } \ No newline at end of file diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..3f2eed3 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,90 @@ +name: Release + +on: + push: + tags: + - 'v*' + +env: + CARGO_TERM_COLOR: always + +jobs: + create-release: + name: Create Release + runs-on: ubuntu-latest + outputs: + upload_url: ${{ steps.create_release.outputs.upload_url }} + version: ${{ steps.get_version.outputs.version }} + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Get version from tag + id: get_version + run: echo "version=${GITHUB_REF#refs/tags/v}" >> $GITHUB_OUTPUT + + - name: Create GitHub Release + id: create_release + uses: actions/create-release@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + tag_name: ${{ github.ref }} + release_name: Release ${{ steps.get_version.outputs.version }} + draft: false + prerelease: ${{ contains(steps.get_version.outputs.version, 'alpha') || contains(steps.get_version.outputs.version, 'beta') || contains(steps.get_version.outputs.version, 'rc') }} + + test-release: + name: Test Release Build + runs-on: ubuntu-latest + + services: + redis: + image: redis:7-alpine + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Cache dependencies + uses: Swatinem/rust-cache@v2 + + - name: Run tests + run: cargo test --all-features --release + env: + REDIS_URL: redis://localhost:6379 + + - name: Check package + run: cargo package --allow-dirty + + publish-crate: + name: Publish to crates.io + runs-on: ubuntu-latest + needs: [create-release, test-release] + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Cache dependencies + uses: Swatinem/rust-cache@v2 + + - name: Login to crates.io + run: cargo login ${{ secrets.CRATES_IO_TOKEN }} + + - name: Publish to crates.io + run: cargo publish \ No newline at end of file diff --git a/.github/workflows/semantic-release.yml b/.github/workflows/semantic-release.yml new file mode 100644 index 0000000..f667c39 --- /dev/null +++ b/.github/workflows/semantic-release.yml @@ -0,0 +1,103 @@ +name: Semantic Release + +on: + push: + branches: + - main + workflow_dispatch: + +jobs: + release: + name: Semantic Release + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Cache dependencies + uses: Swatinem/rust-cache@v2 + + - name: Install cargo-release + run: cargo install cargo-release + + - name: Configure Git + run: | + git config --global user.name "github-actions[bot]" + git config --global user.email 
"github-actions[bot]@users.noreply.github.com" + + - name: Determine version bump + id: version + run: | + # Get the latest tag + LATEST_TAG=$(git describe --tags --abbrev=0 2>/dev/null || echo "v0.0.0") + echo "Latest tag: $LATEST_TAG" + + # Get commits since last tag + COMMITS=$(git log ${LATEST_TAG}..HEAD --oneline) + echo "Commits since last tag:" + echo "$COMMITS" + + # Determine version bump based on conventional commits + if echo "$COMMITS" | grep -q "^[a-f0-9]\+ \(BREAKING CHANGE\|feat!\|fix!\|chore!\)"; then + echo "bump=major" >> $GITHUB_OUTPUT + echo "Detected breaking changes - major version bump" + elif echo "$COMMITS" | grep -q "^[a-f0-9]\+ feat"; then + echo "bump=minor" >> $GITHUB_OUTPUT + echo "Detected new features - minor version bump" + elif echo "$COMMITS" | grep -q "^[a-f0-9]\+ \(fix\|docs\|style\|refactor\|perf\|test\|chore\)"; then + echo "bump=patch" >> $GITHUB_OUTPUT + echo "Detected fixes/improvements - patch version bump" + else + echo "bump=none" >> $GITHUB_OUTPUT + echo "No significant changes detected - no version bump" + fi + + - name: Release + if: steps.version.outputs.bump != 'none' + run: | + case "${{ steps.version.outputs.bump }}" in + major) + cargo release major --execute --no-confirm + ;; + minor) + cargo release minor --execute --no-confirm + ;; + patch) + cargo release patch --execute --no-confirm + ;; + esac + env: + CARGO_REGISTRY_TOKEN: ${{ secrets.CRATES_IO_TOKEN }} + + - name: Create GitHub Release + if: steps.version.outputs.bump != 'none' + uses: actions/github-script@v7 + with: + script: | + const { execSync } = require('child_process'); + + // Get the new version + const newTag = execSync('git describe --tags --abbrev=0').toString().trim(); + const version = newTag.replace('v', ''); + + // Get changelog since last release + const previousTag = execSync('git describe --tags --abbrev=0 HEAD^').toString().trim(); + const commits = execSync(`git log ${previousTag}..${newTag} --oneline`).toString().trim(); + + // Create release + await github.rest.repos.createRelease({ + owner: context.repo.owner, + repo: context.repo.repo, + tag_name: newTag, + name: `Release ${version}`, + body: `## Changes\n\n${commits}`, + draft: false, + prerelease: version.includes('alpha') || version.includes('beta') || version.includes('rc') + }); \ No newline at end of file diff --git a/.gitignore b/.gitignore index d01bd1a..cef76b7 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,32 @@ Cargo.lock # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ \ No newline at end of file +#.idea/ + +# Added by cargo +/target + +# Benchmark results +criterion_outputs/ + +# Coverage reports +cobertura.xml +tarpaulin-report.html + +# Dependency updates +Cargo.lock.bak + +# IDE files +.vscode/ +*.swp +*.swo +*~ + +# OS generated files +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..9baac78 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,33 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+ +## Unreleased + +### Added +- Comprehensive CI/CD pipelines with GitHub Actions +- Automated testing, linting, and security audits +- Automated dependency updates and vulnerability scanning +- Code coverage reporting with codecov +- Automated releases with semantic versioning +- Performance benchmarking +- Documentation generation and deployment +- Contribution guidelines and development workflow + +### Changed +- Enhanced README with CI badges and comprehensive documentation +- Added development tooling configuration (rustfmt, clippy, deny) + +## [0.1.0-alpha.1] - Initial Release + +### Added +- Initial implementation of Redis-backed message queue +- Integration with apalis framework +- Message enqueue and dequeue functionality +- Acknowledgment system for processed messages +- Configurable polling intervals +- Type-safe message handling with Serde +- Basic examples and documentation \ No newline at end of file diff --git a/CODEOWNERS b/CODEOWNERS new file mode 100644 index 0000000..176161e --- /dev/null +++ b/CODEOWNERS @@ -0,0 +1,22 @@ +# Global owners +* @njuguna-mureithi + +# CI/CD configuration +/.github/ @njuguna-mureithi +/deny.toml @njuguna-mureithi +/release.toml @njuguna-mureithi +/.clippy.toml @njuguna-mureithi +/rustfmt.toml @njuguna-mureithi + +# Documentation +/README.md @njuguna-mureithi +/CONTRIBUTING.md @njuguna-mureithi +/CHANGELOG.md @njuguna-mureithi +/SECURITY.md @njuguna-mureithi + +# Core library code +/src/ @njuguna-mureithi + +# Examples and benchmarks +/examples/ @njuguna-mureithi +/benches/ @njuguna-mureithi \ No newline at end of file diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..bc0d28e --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,190 @@ +# Contributing to apalis-rsmq + +Thank you for your interest in contributing to apalis-rsmq! This document provides guidelines and information about contributing to this project. + +## Code of Conduct + +This project adheres to the Rust Community [Code of Conduct](https://www.rust-lang.org/policies/code-of-conduct). By participating, you are expected to uphold this code. + +## Getting Started + +### Prerequisites + +- Rust 1.75.0 or later +- Redis server for testing +- Git + +### Setting Up Development Environment + +1. Fork the repository on GitHub +2. Clone your fork locally: + ```bash + git clone https://github.com/YOUR_USERNAME/apalis-rsmq.git + cd apalis-rsmq + ``` + +3. Set up the upstream remote: + ```bash + git remote add upstream https://github.com/apalis-dev/apalis-rsmq.git + ``` + +4. Install development dependencies: + ```bash + cargo build + ``` + +5. Run the tests to ensure everything is working: + ```bash + # Start Redis server first + export REDIS_URL=redis://localhost:6379 + cargo test + ``` + +## Development Workflow + +### Before Making Changes + +1. Create a new branch for your feature or bugfix: + ```bash + git checkout -b feature/your-feature-name + # or + git checkout -b fix/your-bugfix-name + ``` + +2. Make sure your branch is up to date: + ```bash + git fetch upstream + git rebase upstream/main + ``` + +### Making Changes + +1. Write your code following the existing code style +2. Add tests for any new functionality +3. Update documentation if necessary +4. Ensure all tests pass: + ```bash + cargo test + ``` + +5. Format your code: + ```bash + cargo fmt + ``` + +6. 
Run Clippy to catch common mistakes: + ```bash + cargo clippy -- -D warnings + ``` + +### Commit Guidelines + +We follow [Conventional Commits](https://www.conventionalcommits.org/) for commit messages: + +- `feat: add new feature` +- `fix: bug fix` +- `docs: documentation changes` +- `style: formatting, missing semicolons, etc.` +- `refactor: code refactoring` +- `test: adding tests` +- `chore: maintenance tasks` + +Examples: +``` +feat: add message deduplication support +fix: handle connection timeouts gracefully +docs: update README with new examples +test: add integration tests for error handling +``` + +### Submitting Changes + +1. Push your branch to your fork: + ```bash + git push origin your-branch-name + ``` + +2. Create a Pull Request on GitHub with: + - Clear title describing the change + - Detailed description of what was changed and why + - Link to any related issues + - Screenshots if applicable + +## Testing + +### Running Tests + +```bash +# Unit tests +cargo test --lib + +# Integration tests +cargo test --test integration + +# All tests +cargo test + +# With coverage +cargo tarpaulin --verbose --all-features --workspace --timeout 120 +``` + +### Writing Tests + +- Write tests for any new functionality +- Update existing tests when changing behavior +- Use descriptive test names +- Test both success and error cases +- Mock external dependencies when appropriate + +## Documentation + +### Code Documentation + +- Use `///` for public API documentation +- Use `//!` for module-level documentation +- Include examples in documentation when helpful +- Keep documentation up to date with code changes + +### README and Guides + +- Update README.md if adding new features +- Add examples for new functionality +- Keep installation and usage instructions current + +## Performance + +- Be mindful of performance implications +- Add benchmarks for performance-critical code +- Profile changes that might affect performance +- Consider memory usage and allocations + +## Security + +- Be security-conscious in your code +- Don't introduce dependencies with known vulnerabilities +- Follow secure coding practices +- Report security issues privately to the maintainers + +## Release Process + +Releases are automated through GitHub Actions: + +1. Commits to `main` trigger automatic version bumping based on conventional commits +2. Releases are automatically published to crates.io +3. GitHub releases are created with changelog + +## Getting Help + +- Open an issue for bugs or feature requests +- Join discussions in existing issues +- Ask questions in the discussions section + +## License + +By contributing to apalis-rsmq, you agree that your contributions will be licensed under the same license as the project (MIT or Apache-2.0). + +## Recognition + +Contributors will be recognized in the project's README and releases. + +Thank you for contributing to apalis-rsmq! 
🎉
\ No newline at end of file
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..d7a5507
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,33 @@
+[package]
+name = "apalis-rsmq"
+version = "0.1.0-alpha.1"
+edition = "2021"
+rust-version = "1.75.0"
+repository = "https://github.com/apalis-dev/apalis-rsmq"
+authors = ["Njuguna Mureithi "]
+description = "A Redis-backed message queue built with Rust, apalis and rsmq-async"
+readme = "README.md"
+license = "MIT OR Apache-2.0"
+keywords = ["redis", "task", "message-queue", "worker"]
+categories = ["database"]
+
+[dependencies]
+apalis-core = { version = "1.0.0-alpha.2", features = ["json", "sleep"] }
+rsmq_async = "13.0.0"
+futures = "0.3"
+serde = "1"
+tracing = { version = "0.1.41" }
+chrono = { version = "0.4", features = ["serde"] }
+serde_json = "1.0"
+pin-project-lite = "0.2"
+
+[dev-dependencies]
+apalis = { version = "1.0.0-alpha.2" }
+tokio = { version = "1", features = ["full"] }
+tracing-subscriber = "0.3"
+apalis-workflow = { version = "0.1.0-alpha.2" }
+criterion = { version = "0.5", features = ["html_reports"] }
+
+[[bench]]
+name = "queue_performance"
+harness = false
diff --git a/README.md b/README.md
index 8c84d62..0ddcaaa 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,136 @@
 # apalis-rsmq
-Message queueing with apalis and rsmq
+
+[![CI](https://github.com/apalis-dev/apalis-rsmq/workflows/CI/badge.svg)](https://github.com/apalis-dev/apalis-rsmq/actions/workflows/ci.yml)
+[![codecov](https://codecov.io/gh/apalis-dev/apalis-rsmq/branch/main/graph/badge.svg)](https://codecov.io/gh/apalis-dev/apalis-rsmq)
+[![Crates.io](https://img.shields.io/crates/v/apalis-rsmq.svg)](https://crates.io/crates/apalis-rsmq)
+[![Documentation](https://docs.rs/apalis-rsmq/badge.svg)](https://docs.rs/apalis-rsmq)
+[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
+[![MSRV](https://img.shields.io/badge/MSRV-1.75.0-blue.svg)](https://blog.rust-lang.org/2023/12/28/Rust-1.75.0.html)
+
+A Redis-backed message queue built with Rust, apalis and rsmq-async
+
+**apalis-rsmq** is a message queue implementation that integrates with [`apalis`] to provide a
+Redis-based backend for message processing. It uses [`rsmq_async`] for Redis Simple Message Queue (RSMQ) interactions.
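
For orientation, the following is a compact end-to-end sketch assembled from the usage snippets further down in this README (creating the queue, enqueuing a message, and running a worker). It is illustrative only: the queue name, worker name, and `greet` handler are placeholders, and error handling is elided with `unwrap`.

```rust
use apalis::prelude::*;
use apalis_core::backend::TaskSink;
use apalis_rsmq::{Config, RedisMq};
use rsmq_async::{Rsmq, RsmqConnection};

// Task handler: called for every message pulled from the queue.
async fn greet(message: String) {
    println!("received: {message}");
}

#[tokio::main]
async fn main() {
    // Connect to Redis and make sure the queue exists.
    let mut conn = Rsmq::new(Default::default()).await.unwrap();
    let _ = conn.create_queue("greetings", None, None, None).await;

    // Point the backend at that queue via its namespace.
    let mut config = Config::default();
    config.set_namespace("greetings".to_owned());
    let mut mq = RedisMq::new(conn, config);

    // Enqueue a message, then run a worker that processes it.
    mq.push("Hello, Redis!".to_string()).await.unwrap();
    let worker = WorkerBuilder::new("greeter").backend(mq).build(greet);
    worker.run().await.unwrap();
}
```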
+ +## 🚀 Features + +- **High Performance**: Built with Rust for maximum performance and safety +- **Message Enqueue & Dequeue**: Supports adding and retrieving messages from Redis queues +- **Acknowledgments**: Messages can be acknowledged and removed from the queue once processed successfully +- **Configurable Polling**: Adjustable polling intervals +- **Automatic Message Processing**: Works with [`Backend`] to process messages asynchronously +- **Type Safety**: Fully typed message handling with Serde +- **Observability**: Built-in tracing and monitoring capabilities + +## Installation + +Add this to your `Cargo.toml`: + +```toml +[dependencies] +apalis-rsmq = "0.1.0-alpha.1" +apalis = "1.0.0-alpha.2" +serde = { version = "1.0", features = ["derive"] } +futures = "0.3" +tracing = "0.1" +``` + +## Usage + +### **Creating a Message Queue** + +```rust +use apalis_rsmq::{RedisMq, Config}; +use rsmq_async::Rsmq; +use std::time::Duration; +use rsmq_async::RsmqConnection; + +#[tokio::main] +async fn main() { + let mut conn = Rsmq::new(Default::default()).await.unwrap(); + let _ = conn.create_queue("email", None, None, None).await; + let mut config = Config::default(); + config.set_namespace("email".to_owned()); + let mq: RedisMq = RedisMq::new(conn, config); +} +``` + +### **Enqueuing Messages** + +```rust +use apalis_rsmq::RedisMq; +use apalis_core::backend::TaskSink; + +async fn enqueue_message(mq: &mut RedisMq) { + mq.push("Hello, Redis!".to_string()).await.unwrap(); +} +``` + +### **Processing Messages Automatically** + +```rust +use apalis::prelude::*; +use apalis_rsmq::RedisMq; +async fn task(message: String) { + // Do something with message +} + +async fn start_worker(mq: RedisMq) { + let worker = WorkerBuilder::new("string-worker").backend(mq).build(task); + worker.run().await.unwrap(); +} +``` + +## Development + +### Prerequisites + +- Rust 1.75.0 or later +- Redis server for testing + +### Building + +```bash +cargo build +``` + +### Testing + +Start a Redis server, then run: + +```bash +export REDIS_URL=redis://localhost:6379 +cargo test +``` + +### Formatting and Linting + +```bash +cargo fmt +cargo clippy +``` + +## CI/CD + +This project uses comprehensive GitHub Actions workflows: + +- **CI Pipeline**: Automated testing, linting, security audits, and code coverage +- **Release Automation**: Semantic versioning and automated publishing to crates.io +- **Dependency Management**: Automated dependency updates and security audits +- **Documentation**: Automated documentation building and deployment +- **Benchmarking**: Performance regression detection + +## Contributing + +We welcome contributions! Please follow these steps: + +1. Fork the repository +2. Create a feature branch +3. Make your changes +4. Add tests for your changes +5. Ensure all tests pass and code is formatted +6. Submit a pull request + +## License + +Licensed under **MIT** or **Apache-2.0**. diff --git a/SECURITY.md b/SECURITY.md new file mode 100644 index 0000000..ff556cb --- /dev/null +++ b/SECURITY.md @@ -0,0 +1,68 @@ +# Security Policy + +## Supported Versions + +We actively support the following versions of apalis-rsmq: + +| Version | Supported | +| ------- | ------------------ | +| 0.1.x | :white_check_mark: | + +## Reporting a Vulnerability + +We take security vulnerabilities seriously. If you discover a security vulnerability in apalis-rsmq, please report it privately. 
+ +### How to Report + +**Please do NOT create a public GitHub issue for security vulnerabilities.** + +Instead, please send an email to: [security@apalis.dev](mailto:security@apalis.dev) + +Include the following information: + +- A clear description of the vulnerability +- Steps to reproduce the issue +- Potential impact of the vulnerability +- Any suggested fixes or mitigations + +### Response Timeline + +- **Acknowledgment**: We will acknowledge receipt of your report within 2 business days +- **Assessment**: We will assess the vulnerability and provide an initial response within 5 business days +- **Resolution**: We aim to resolve critical vulnerabilities within 30 days + +### Security Measures + +Our project includes several automated security measures: + +- **Dependency Auditing**: Automated scanning for known vulnerabilities in dependencies +- **Static Analysis**: Code analysis for potential security issues +- **Regular Updates**: Automated dependency updates to address security patches + +### Responsible Disclosure + +We believe in responsible disclosure. We ask that you: + +- Give us reasonable time to investigate and fix the issue +- Do not publicly disclose the vulnerability until we have released a fix +- Do not exploit the vulnerability for malicious purposes + +### Recognition + +We appreciate security researchers who help make apalis-rsmq safer. With your permission, we will acknowledge your contribution in: + +- Security advisories +- Release notes +- Project contributors list + +## Security Best Practices + +When using apalis-rsmq: + +- Keep your dependencies up to date +- Use secure Redis configurations (authentication, TLS, firewall rules) +- Validate and sanitize message content +- Monitor for unusual activity +- Follow the principle of least privilege for Redis access + +Thank you for helping keep apalis-rsmq secure! 
\ No newline at end of file diff --git a/benches/queue_performance.rs b/benches/queue_performance.rs new file mode 100644 index 0000000..9ccd8aa --- /dev/null +++ b/benches/queue_performance.rs @@ -0,0 +1,66 @@ +use apalis_core::backend::TaskSink; +use apalis_rsmq::{Config, RedisMq}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use rsmq_async::{Rsmq, RsmqConnection}; +use tokio::runtime::Runtime; + +fn benchmark_queue_operations(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + // Skip benchmarks if Redis is not available + let _redis_url = + std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); + + // Test if Redis is available + if rt + .block_on(async { + let conn = Rsmq::new(Default::default()).await?; + Ok::<_, Box>(conn) + }) + .is_err() + { + eprintln!("Skipping benchmarks: Redis not available"); + return; + } + + c.bench_function("push_message", |b| { + b.iter(|| { + rt.block_on(async { + let mut conn = Rsmq::new(Default::default()).await.unwrap(); + let _ = conn.create_queue("bench_queue", None, None, None).await; + + let mut config = Config::default(); + config.set_namespace("bench_queue".to_owned()); + let mut mq: RedisMq = RedisMq::new(conn, config); + + mq.push(black_box("benchmark message".to_string())) + .await + .unwrap(); + }) + }); + }); + + c.bench_function("push_multiple_messages", |b| { + b.iter(|| { + rt.block_on(async { + let mut conn = Rsmq::new(Default::default()).await.unwrap(); + let _ = conn + .create_queue("bench_queue_multi", None, None, None) + .await; + + let mut config = Config::default(); + config.set_namespace("bench_queue_multi".to_owned()); + let mut mq: RedisMq = RedisMq::new(conn, config); + + for i in 0..10 { + mq.push(black_box(format!("benchmark message {}", i))) + .await + .unwrap(); + } + }) + }); + }); +} + +criterion_group!(benches, benchmark_queue_operations); +criterion_main!(benches); diff --git a/deny.toml b/deny.toml new file mode 100644 index 0000000..8713200 --- /dev/null +++ b/deny.toml @@ -0,0 +1,123 @@ +# cargo-deny configuration file +# https://embarkstudios.github.io/cargo-deny/ + +[graph] +# If true, metadata will be collected with `--all-features`. Note that this can't +# be toggled off if true, if you want to conditionally enable it, use `--all-features` +all-features = true + +[output] +# When outputting inclusion graphs in diagnostics that include features, this +# option can be used to specify the depth at which feature edges will be added. +# This option is included since the graphs can be quite large and the addition +# of features from the crate(s) being built can be far too verbose. This option +# can be overridden via `--feature-depth` on the cmd line +feature-depth = 1 + +[advisories] +# The path where the advisory database is cloned/fetched into +db-path = "~/.cargo/advisory-db" +# The url(s) of the advisory databases to use +db-urls = ["https://github.com/rustsec/advisory-db"] +# The lint level for security vulnerabilities +vulnerability = "deny" +# The lint level for unmaintained crates +unmaintained = "warn" +# The lint level for crates that have been yanked from their source registry +yanked = "warn" +# The lint level for crates with security notices. Note that as of +# 2019-12-17 there are no security notice advisories in +# https://github.com/rustsec/advisory-db +notice = "warn" +# A list of advisory IDs to ignore. Note that ignored advisories will still +# output a note when they are encountered. 
+ignore = [ + #"RUSTSEC-0000-0000", +] + +[licenses] +# The confidence threshold for detecting a license from a license text. +# confidence-threshold = 0.8 +# Allow 1 or more licenses on a per-crate basis, so that particular licenses +# aren't accepted for every possible crate as with the normal allow list +exceptions = [ + # Each entry is the crate and version constraint, and its the license + #{ allow = ["Zlib"], name = "adler32", version = "*" }, +] +# Lint level for when multiple versions of the same license are detected +multiple-versions = "warn" +# List of explicitly allowed licenses +# See https://spdx.org/licenses/ for list of possible licenses +allow = [ + "MIT", + "Apache-2.0", + "Apache-2.0 WITH LLVM-exception", + "BSD-2-Clause", + "BSD-3-Clause", + "ISC", + "Unicode-DFS-2016", + "CC0-1.0", +] +# List of explicitly disallowed licenses +# See https://spdx.org/licenses/ for list of possible licenses +deny = [ + "GPL-2.0", + "GPL-3.0", + "AGPL-1.0", + "AGPL-3.0", +] +# Lint level for licenses not in the allow or deny lists +copyleft = "warn" +# Blanket approval for OSI-approved licenses +# https://opensource.org/licenses/alphabetical +allow-osi-fsf-free = "neither" +# Lint level for licenses considered copyleft +default = "deny" + +[bans] +# Lint level for when multiple versions of the same crate are detected +multiple-versions = "warn" +# Lint level for when a crate version requirement is `*` +wildcards = "allow" +# The graph highlighting used when creating dotgraphs for crates +# with multiple versions +# * lowest-version - The path to the lowest versioned duplicate is highlighted +# * simplest-path - The path to the version with the fewest edges is highlighted +# * all - Both lowest-version and simplest-path are used +highlight = "all" +# List of crates that are allowed. Use with care! +allow = [ + #{ name = "ansi_term", version = "=0.11.0" }, +] +# List of crates to deny +deny = [ + # Each entry the name of a crate and a version range. If version is + # not specified, all versions will be matched. + #{ name = "ansi_term", version = "=0.11.0" }, + + # Wrapper crates can optionally be specified to allow the crate when it + # is a direct dependency of the otherwise banned crate + #{ name = "ansi_term", version = "=0.11.0", wrappers = [] }, +] +# Certain crates/versions that will be skipped when doing duplicate detection. +skip = [ + #{ name = "ansi_term", version = "=0.11.0" }, +] +# Similarly to `skip` allows you to skip certain crates from being picked up by +# the duplicate detection, but unlike skip, it also includes the entire tree of +# transitive dependencies starting at the specified crate, up to a certain depth. 
+skip-tree = [ + #{ name = "ansi_term", version = "=0.11.0", depth = 20 }, +] + +[sources] +# Lint level for what to happen when a crate from a crate registry that is +# not in the allow list is encountered +unknown-registry = "warn" +# Lint level for what to happen when a crate from a git repository that is not +# in the allow list is encountered +unknown-git = "warn" +# List of allowed registries +allow-registry = ["https://github.com/rust-lang/crates.io-index"] +# List of allowed Git repositories +allow-git = [] \ No newline at end of file diff --git a/examples/basic.rs b/examples/basic.rs new file mode 100644 index 0000000..abf51cf --- /dev/null +++ b/examples/basic.rs @@ -0,0 +1,56 @@ +use apalis_core::{ + backend::TaskSink, + error::BoxDynError, + worker::{builder::WorkerBuilder, context::WorkerContext}, +}; +use apalis_rsmq::{Config, RedisMq}; +use rsmq_async::RsmqConnection; + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +struct Email { + to: String, + text: String, + subject: String, +} + +async fn produce_jobs(mq: &mut RedisMq) -> Result<(), rsmq_async::RsmqError> { + for index in 0..100 { + mq.push(Email { + to: index.to_string(), + text: "Test background job from apalis".to_owned(), + subject: "Background email job".to_owned(), + }) + .await?; + println!("Produced job: {}", index); + } + Ok(()) +} + +async fn send_email(job: Email, wrk: WorkerContext) -> Result<(), BoxDynError> { + tracing::info!("Sending email to: {}", job.to); + if job.to == "99" { + tokio::time::sleep(std::time::Duration::from_secs(6)).await; + wrk.stop().unwrap(); + } + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), rsmq_async::RsmqError> { + tracing_subscriber::fmt::init(); + + let mut conn = rsmq_async::Rsmq::new(Default::default()).await?; + let _ = conn.create_queue("email", None, None, None).await; + let mut config = Config::default(); + config.set_namespace("email".to_owned()); + let mut mq = RedisMq::new(conn, config); + produce_jobs(&mut mq).await?; + + let worker = WorkerBuilder::new("rango-tango") + .backend(mq) + .build(send_email); + + worker.run().await.unwrap(); + Ok(()) +} diff --git a/examples/workflow.rs b/examples/workflow.rs new file mode 100644 index 0000000..bab3670 --- /dev/null +++ b/examples/workflow.rs @@ -0,0 +1,57 @@ +use std::time::Duration; + +use apalis_core::{ + error::BoxDynError, + worker::{builder::WorkerBuilder, context::WorkerContext}, +}; +use apalis_rsmq::{Config, RedisMq}; +use apalis_workflow::{TaskFlowSink, WorkFlow}; +use rsmq_async::RsmqConnection; + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +struct Email { + to: String, + text: String, + subject: String, +} + +async fn send_email(job: Email, wrk: WorkerContext) -> Result<(), BoxDynError> { + tracing::info!("Sending email to: {}", job.to); + + tokio::time::sleep(std::time::Duration::from_secs(6)).await; + wrk.stop().unwrap(); + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), rsmq_async::RsmqError> { + tracing_subscriber::fmt::init(); + + let mut conn = rsmq_async::Rsmq::new(Default::default()).await?; + let _ = conn.create_queue("email", None, None, None).await; + let mut config = Config::default(); + config.set_namespace("email".to_owned()); + let mut mq = RedisMq::new(conn, config); + mq.push_start(Email { + to: "0".to_string(), + text: "Test background job from apalis".to_owned(), + subject: "Background email job".to_owned(), + }) + .await + .unwrap(); + + let workflow = WorkFlow::new("email-workflow") + .then(send_email) + .delay_for(Duration::from_secs(2)) + 
.then(|_| async { + println!("Done processing workflow"); + }); + + let worker = WorkerBuilder::new("rango-tango") + .backend(mq) + .build(workflow); + + worker.run().await.unwrap(); + Ok(()) +} diff --git a/release.toml b/release.toml new file mode 100644 index 0000000..186000e --- /dev/null +++ b/release.toml @@ -0,0 +1,31 @@ +# cargo-release configuration + +# Use semantic versioning +sign-commit = false +sign-tag = false +push-remote = "origin" +shared-version = true + +# Pre-release checks +pre-release-replacements = [ + {file="README.md", search="apalis-rsmq = \"[^\"]+\"", replace="apalis-rsmq = \"{{version}}\""}, + {file="src/lib.rs", search="html_root_url = \"[^\"]+\"", replace="html_root_url = \"https://docs.rs/apalis-rsmq/{{version}}\"", exactly=1}, +] + +# Post-release actions +post-release-replacements = [] + +# What to do after tagging and before pushing +pre-release-hook = ["cargo", "test", "--all-features"] + +# Customization of the commit messages +tag-message = "chore: release v{{version}}" +tag-name = "v{{version}}" + +# Don't publish pre-releases to crates.io automatically +publish = false + +[[pre-release-replacements]] +file = "CHANGELOG.md" +search = "## Unreleased" +replace = "## Unreleased\n\n## [{{version}}] - {{date}}" \ No newline at end of file diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..51d03ce --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,8 @@ +# rustfmt configuration +edition = "2021" +max_width = 100 +hard_tabs = false +tab_spaces = 4 +newline_style = "Unix" +reorder_imports = true +reorder_modules = true \ No newline at end of file diff --git a/src/ack.rs b/src/ack.rs new file mode 100644 index 0000000..730dab7 --- /dev/null +++ b/src/ack.rs @@ -0,0 +1,47 @@ +use std::fmt::Debug; + +use crate::{context::RedisMqContext, RedisMq}; +use apalis_core::{error::BoxDynError, task::Parts, worker::ext::ack::Acknowledge}; +use futures::{ + future::{self, BoxFuture}, + FutureExt, +}; +use rsmq_async::{RsmqConnection, RsmqError}; + +impl Acknowledge for RedisMq +where + T: Send, + Res: Debug + Send + Sync, + C: Send, +{ + type Error = RsmqError; + + type Future = BoxFuture<'static, Result<(), Self::Error>>; + + fn ack( + &mut self, + res: &Result, + parts: &Parts, + ) -> Self::Future { + if res.is_ok() || parts.attempt.current() >= parts.ctx.max_attempts() { + let task_id = parts.task_id.as_ref().unwrap().inner().to_owned(); + let namespace = self.config.namespace().to_owned(); + let mut conn = self.conn.clone(); + + let fut = async move { + conn.delete_message(&namespace, &task_id) + .map(move |r| match r { + Err(e) => Err(e), + Ok(true) => Ok(()), + Ok(false) => { + Err(RsmqError::MissingParameter("TaskId not found".to_owned())) + } + }) + .await?; + Ok(()) + }; + return fut.boxed(); + } + future::ready(Ok(())).boxed() + } +} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..69aa11a --- /dev/null +++ b/src/config.rs @@ -0,0 +1,68 @@ +//! 
Configuration for RedisMq +use std::time::Duration; + +use apalis_core::backend::poll_strategy::{ + BackoffConfig, IntervalStrategy, MultiStrategy, StrategyBuilder, +}; + +/// Configuration for RedisMq +#[derive(Clone, Debug)] +pub struct Config { + poll_strategy: MultiStrategy, + buffer_size: usize, + namespace: String, +} + +impl Config { + /// Creates a new configuration + pub fn new(poll_strategy: MultiStrategy, buffer_size: usize, namespace: String) -> Self { + Self { + poll_strategy, + buffer_size, + namespace, + } + } + + /// Gets the namespace + pub fn namespace(&self) -> &str { + &self.namespace + } + + /// Sets the namespace + pub fn set_namespace(&mut self, namespace: String) { + self.namespace = namespace; + } + + /// Gets the polling strategy + pub fn poll_strategy(&self) -> &MultiStrategy { + &self.poll_strategy + } + + /// Sets the polling strategy + pub fn set_poll_strategy(&mut self, strategy: MultiStrategy) { + self.poll_strategy = strategy; + } + + /// Gets the buffer size + pub fn buffer_size(&self) -> usize { + self.buffer_size + } + + /// Sets the buffer size + pub fn set_buffer_size(&mut self, size: usize) { + self.buffer_size = size; + } +} + +impl Default for Config { + fn default() -> Self { + let config = BackoffConfig::default().with_jitter(0.9); + let interval = IntervalStrategy::new(Duration::from_millis(50)).with_backoff(config); + let poll_strategy = StrategyBuilder::new().apply(interval).build(); + Self { + poll_strategy, + buffer_size: 100, + namespace: "default_queue".to_string(), + } + } +} diff --git a/src/context.rs b/src/context.rs new file mode 100644 index 0000000..4e8effb --- /dev/null +++ b/src/context.rs @@ -0,0 +1,76 @@ +use std::{any::type_name, convert::Infallible}; + +use apalis_core::{task::metadata::MetadataExt, task_fn::FromRequest}; +use serde::{ + de::{DeserializeOwned, Error}, + Deserialize, Serialize, +}; +use serde_json::Map; + +use crate::RsMqTask; + +/// Context for RedisMq messages +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +pub struct RedisMqContext { + max_attempts: usize, + meta: Map, +} + +impl RedisMqContext { + /// Creates a new RedisMqContext + pub fn new(max_attempts: usize) -> Self { + Self { + max_attempts, + meta: Map::new(), + } + } + + /// Sets the max attempts + pub fn with_max_attempts(mut self, max_attempts: usize) -> Self { + self.max_attempts = max_attempts; + self + } + + /// Sets the metadata + pub fn with_meta(mut self, meta: Map) -> Self { + self.meta = meta; + self + } + + /// Gets the max attempts + pub fn max_attempts(&self) -> usize { + self.max_attempts + } + + /// Gets the metadata + pub fn meta(&self) -> &Map { + &self.meta + } +} + +impl FromRequest> for RedisMqContext { + type Error = Infallible; + async fn from_request(req: &RsMqTask) -> Result { + Ok(req.parts.ctx.clone()) + } +} + +impl MetadataExt for RedisMqContext { + type Error = serde_json::Error; + fn inject(&mut self, value: T) -> Result<(), Self::Error> { + let json_value = serde_json::to_value(value)?; + self.meta.insert(type_name::().to_owned(), json_value); + Ok(()) + } + fn extract(&self) -> Result { + if let Some(value) = self.meta.get(type_name::()) { + let deserialized: T = T::deserialize(value)?; + Ok(deserialized) + } else { + Err(serde_json::Error::custom(format!( + "No metadata found for type: {}", + type_name::() + ))) + } + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..45f6e89 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,209 @@ +#![doc = include_str!("../README.md")] +#![warn( + 
missing_debug_implementations, + missing_docs, + rust_2018_idioms, + unreachable_pub, + bad_style, + dead_code, + improper_ctypes, + non_shorthand_field_patterns, + no_mangle_generic_items, + overflowing_literals, + path_statements, + patterns_in_fns_without_body, + unconditional_recursion, + unused, + unused_allocation, + unused_comparisons, + unused_parens, + while_true +)] + +use std::{ + fmt::Debug, + marker::PhantomData, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Instant, +}; + +use apalis_core::{ + backend::{ + codec::{json::JsonCodec, Codec}, + poll_strategy::{PollContext, PollStrategyExt}, + Backend, TaskStream, + }, + features_table, + task::{attempt::Attempt, builder::TaskBuilder, task_id::TaskId, Task}, + worker::{context::WorkerContext, ext::ack::AcknowledgeLayer}, +}; +use futures::{ + future, + stream::{self, BoxStream}, + StreamExt, +}; +use rsmq_async::{Rsmq, RsmqConnection, RsmqError}; +use serde::de::DeserializeOwned; +use tracing::{error, trace, warn}; + +use crate::sink::RsMqSink; + +mod ack; +mod config; +mod context; +mod sink; + +pub use crate::config::Config; +pub use crate::context::RedisMqContext; + +type RsMqTask = Task; + +pin_project_lite::pin_project! { + /// Redis-backed message queue + /// + #[doc = features_table! { + setup = { + use apalis_rsmq::Config; + use apalis_rsmq::RedisMq; + use rsmq_async::RsmqConnection; + + let mut conn = rsmq_async::Rsmq::new(Default::default()).await.unwrap(); + let _ = conn.create_queue("test", None, None, None).await; + let mut config = Config::default(); + config.set_namespace("test".to_owned()); + let mut mq = RedisMq::new(conn, config); + mq + };, + TaskSink => supported("Ability to push new tasks"), + Serialization => supported("Serialization support for arguments. 
Accepts any bytes codec", false), + FetchById => not_implemented("Allow fetching a task by its ID"), + RegisterWorker => not_supported("Allow registering a worker with the backend"), + PipeExt => supported("Allow other backends to pipe to this backend", false), + MakeShared => supported("Share the same JSON storage across multiple workers", false), + Workflow => supported("Flexible enough to support workflows", false), + WaitForCompletion => supported("Wait for tasks to complete without blocking", false), + ResumeById => not_implemented("Resume a task by its ID"), + ResumeAbandoned => not_implemented("Resume abandoned tasks"), + ListWorkers => not_supported("List all workers registered with the backend"), + ListTasks => not_implemented("List all tasks in the backend"), + }] + #[derive(Debug)] + pub struct RedisMq>> { + conn: Rsmq, + config: Config, + msg_type: PhantomData, + codec: PhantomData, + #[pin] + sink: RsMqSink, + } +} + +impl RedisMq>> { + /// Creates a new RedisMq instance + pub fn new(conn: Rsmq, config: Config) -> RedisMq>> { + RedisMq { + sink: RsMqSink::new(conn.clone(), config.clone()), + conn, + config, + msg_type: PhantomData, + codec: PhantomData, + } + } +} + +impl RedisMq { + /// Gets the configuration + pub fn config(&self) -> &Config { + &self.config + } +} + +// Implement Clone manually +impl Clone for RedisMq { + fn clone(&self) -> Self { + RedisMq { + conn: self.conn.clone(), + msg_type: PhantomData, + config: self.config.clone(), + codec: PhantomData, + sink: RsMqSink::new(self.conn.clone(), self.config.clone()), + } + } +} + +impl Backend for RedisMq +where + Args: Send + DeserializeOwned + 'static, + C: Codec, Compact = Vec>, + C::Error: std::error::Error + Send, +{ + type Stream = TaskStream, RsmqError>; + type Layer = AcknowledgeLayer; + type Codec = C; + type Context = RedisMqContext; + type Error = RsmqError; + type Beat = BoxStream<'static, Result<(), RsmqError>>; + type IdType = String; + + fn heartbeat(&self, _: &WorkerContext) -> Self::Beat { + stream::once(future::ready(Ok(()))).boxed() + } + + fn middleware(&self) -> Self::Layer { + AcknowledgeLayer::new(self.clone()) + } + + fn poll(self, worker: &WorkerContext) -> Self::Stream { + let poll_strategy = self.config.poll_strategy().clone(); + let prev_count = Arc::new(AtomicUsize::new(0)); + let ctx = PollContext::new(worker.clone(), prev_count.clone()); + let throttle = poll_strategy.build_stream(&ctx); + + let stream = futures::stream::unfold(throttle, move |mut throttle| { + let mut conn = self.conn.clone(); + let namespace = self.config.namespace().to_string(); + let prev_count = prev_count.clone(); + async move { + let instant = Instant::now(); + throttle.next().await; + trace!("Polling new messages after {:?}", instant.elapsed()); + match conn.receive_message(&namespace, None).await { + Ok(Some(r)) => match C::decode(&r.message) { + Ok(msg) => { + let task = TaskBuilder::new(msg.task) + .with_ctx(msg.context) + .with_task_id(TaskId::new(r.id)) + .with_attempt(Attempt::new_with_value(r.rc as usize)) + .build(); + prev_count.store(1, Ordering::SeqCst); + Some((Ok(Some(task)), throttle)) + } + Err(e) => { + error!("Failed to decode message: {:?}", e); + Some((Err(RsmqError::InvalidFormat(e.to_string())), throttle)) + } + }, + Ok(None) => { + prev_count.store(0, Ordering::SeqCst); + Some((Ok(None), throttle)) + } + Err(e) => { + error!("Error receiving message: {:?}", e); + Some((Err(e), throttle)) + } + } + } + }); + + stream.boxed() + } +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] 
+struct PrimitiveMessage { + task: T, + context: RedisMqContext, +} diff --git a/src/sink.rs b/src/sink.rs new file mode 100644 index 0000000..50a5026 --- /dev/null +++ b/src/sink.rs @@ -0,0 +1,158 @@ +use std::{ + collections::VecDeque, + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use apalis_core::backend::codec::Codec; +use chrono::{TimeZone, Utc}; +use futures::{FutureExt, Sink}; +use rsmq_async::{Rsmq, RsmqConnection, RsmqError}; + +use crate::{config::Config, PrimitiveMessage, RedisMq, RsMqTask}; + +pin_project_lite::pin_project! { + pub(super) struct RsMqSink { + conn: Rsmq, + config: Config, + items: VecDeque>, + pending_sends: VecDeque, + _codec: std::marker::PhantomData, + } +} +impl Clone for RsMqSink { + fn clone(&self) -> Self { + Self { + conn: self.conn.clone(), + config: self.config.clone(), + items: VecDeque::new(), + pending_sends: VecDeque::new(), + _codec: std::marker::PhantomData, + } + } +} + +impl std::fmt::Debug for RsMqSink { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RsMqSink") + .field("config", &self.config) + .field("items_len", &self.items.len()) + .field("pending_sends_len", &self.pending_sends.len()) + .finish() + } +} + +impl RsMqSink { + pub(crate) fn new(conn: Rsmq, config: Config) -> Self { + Self { + conn, + config, + items: VecDeque::new(), + pending_sends: VecDeque::new(), + _codec: std::marker::PhantomData, + } + } +} + +struct PendingSend { + future: Pin> + Send + 'static>>, +} +// SAFETY: PendingSend contains a Pin>, which is Send but not Sync. +unsafe impl Sync for PendingSend {} + +impl Sink> for RedisMq +where + C::Error: std::error::Error + Send, + C: Codec, Compact = Vec>, +{ + type Error = RsmqError; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // First, try to flush any pending sends + let this = self.get_mut(); + + // Poll pending sends + while let Some(pending) = this.sink.pending_sends.front_mut() { + match pending.future.as_mut().poll(cx) { + Poll::Ready(Ok(_)) => { + this.sink.pending_sends.pop_front(); + } + Poll::Ready(Err(e)) => { + this.sink.pending_sends.pop_front(); + return Poll::Ready(Err(e)); + } + Poll::Pending => { + return Poll::Pending; + } + } + } + + Poll::Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, item: RsMqTask) -> Result<(), Self::Error> { + let this = self.project().sink; + let items = this.get_mut(); + items.items.push_back(item); + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let namespace = this.config.namespace(); + + // First, convert any queued items to pending sends + while let Some(item) = this.sink.items.pop_front() { + let delay = delay_until(item.parts.run_at); + let bytes = match C::encode(&PrimitiveMessage { + task: item.args, + context: item.parts.ctx, + }) { + Ok(bytes) => bytes, + Err(e) => return Poll::Ready(Err(RsmqError::InvalidFormat(e.to_string()))), + }; + let mut conn = this.conn.clone(); + let namespace = namespace.to_string(); + // Create the future but don't poll it yet + let future = + async move { conn.send_message(&namespace, bytes, Some(delay)).await }.boxed(); + this.sink.pending_sends.push_back(PendingSend { future }); + } + + // Now poll all pending sends + while let Some(pending) = this.sink.pending_sends.front_mut() { + match pending.future.as_mut().poll(cx) { + Poll::Ready(Ok(_)) => { + this.sink.pending_sends.pop_front(); + } + Poll::Ready(Err(e)) => { + this.sink.pending_sends.pop_front(); 
+                    return Poll::Ready(Err(e));
+                }
+                Poll::Pending => {
+                    return Poll::Pending;
+                }
+            }
+        }
+
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.poll_flush(cx)
+    }
+}
+
+fn delay_until(timestamp: u64) -> Duration {
+    let target_time = Utc.timestamp_opt(timestamp as i64, 0).unwrap();
+    let now = Utc::now();
+
+    if target_time <= now {
+        Duration::from_secs(0)
+    } else {
+        let diff = target_time - now;
+        Duration::from_secs(diff.num_seconds() as u64)
+    }
+}