Description
Hi! I'm building a distributed job processing pipeline with Apalis and have a question about the best pattern for cross-job coordination.
Use Case
I have a pipeline where (rough type sketches below):
- `JobTypeA` is processed
- The worker pushes 4 separate `JobTypeB` instances (different variants/parameters)
- These 4 jobs are processed in parallel by different workers (potentially on different machines)
- All 4 results need to be collected before proceeding to the next stage (`JobTypeC`)
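For concreteness, here is roughly what the three job types look like (a trimmed sketch; the field names are illustrative, and I'm assuming serde-serializable structs, since the jobs go through the SQL backend):

```rust
use serde::{Deserialize, Serialize};
use uuid::Uuid; // assumes the `uuid` crate with its serde feature

// Stage 1: a single job whose result fans out into 4 JobTypeB jobs.
#[derive(Debug, Serialize, Deserialize)]
struct JobTypeA {
    input: String,
}

// Stage 2: one of 4 parallel jobs; batch_id ties the siblings together.
#[derive(Debug, Serialize, Deserialize)]
struct JobTypeB {
    batch_id: Uuid,
    variant: u8,
    result: String, // stage-1 output, carried along
}

// Stage 3: runs once all 4 JobTypeB results have been collected.
#[derive(Debug, Serialize, Deserialize)]
struct JobTypeC {
    batch_id: Uuid,
    results: Vec<String>,
}
```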
Requirements
- Different job types for machine targeting (some jobs need GPU, others CPU only)
- Multi-machine deployment (workers on different servers)
- Fan-in coordination (collect N parallel results into 1)
What I've Tried
Option 1: `apalis-workflow` with `.filter_map()`
I initially thought workflows could handle this:
Workflow::new("pipeline")
.and_then(process_step_1)
.and_then(|result| Ok(vec![variant1, variant2, variant3, variant4]))
.filter_map(|item| async { process(item) }) // Distributed?
.and_then(|collected: Vec<_>| async { ... })Question: Does .filter_map() distribute items across workers on different machines?
Looking at the implementation, it uses `backend.wait_for(task_ids)`, which seems perfect, but I'm not sure if this works when I want:
- Separate worker types (`PostgresStorage<JobTypeB>` on specific machines only)
- Items pushed as separate top-level jobs (not sub-tasks of a workflow)
Option 2: Manual coordination with PostgreSQL table
Currently using:
```rust
// Worker Type 1: process JobTypeA and fan out the batch
async fn process_a(job: JobTypeA, storage_b: Data<PostgresStorage<JobTypeB>>) {
    let result = process(job).await?;
    // Push N separate jobs
    for variant in variants {
        storage_b.push(JobTypeB { result, variant }).await?;
    }
}

// Worker Type 2: process JobTypeB (different worker type, different machines!)
async fn process_b(job: JobTypeB, pool: Data<PgPool>, storage_c: Data<PostgresStorage<JobTypeC>>) {
    let result = process(job).await?;
    // Manual coordination via a PostgreSQL table
    sqlx::query("INSERT INTO coordination_table ...").execute(&*pool).await?;
    let count: i64 =
        sqlx::query_scalar("SELECT COUNT(*) FROM coordination_table WHERE batch_id = $1")
            .bind(&job.batch_id)
            .fetch_one(&*pool)
            .await?;
    if count == N {
        // Last worker triggers the next stage
        storage_c.push(JobTypeC { ... }).await?;
    }
}
```

This works, but it feels like I'm reimplementing what workflows already do.
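For what it's worth, I'm aware the `COUNT(*)` check above can fire twice under concurrent completions (two workers can both observe `count == N`), so the variant I've been leaning towards is an atomic decrement. This is plain Postgres rather than an Apalis API; `batch_progress` is a hypothetical table I'd create and seed with the fan-out size (4) when `process_a` pushes the batch:

```rust
use sqlx::PgPool;
use uuid::Uuid; // assumes sqlx's `uuid` feature for binding

// Hypothetical helper, assuming a table like:
//   CREATE TABLE batch_progress (
//       batch_id  UUID   PRIMARY KEY,
//       remaining BIGINT NOT NULL
//   );
// Returns true for exactly one caller per batch: the one whose
// decrement brings the counter to zero.
async fn finish_one(pool: &PgPool, batch_id: Uuid) -> Result<bool, sqlx::Error> {
    let remaining: i64 = sqlx::query_scalar(
        "UPDATE batch_progress
            SET remaining = remaining - 1
          WHERE batch_id = $1
          RETURNING remaining",
    )
    .bind(batch_id)
    .fetch_one(pool)
    .await?;

    Ok(remaining == 0)
}
```

That closes the double-trigger window, but it's still hand-rolled coordination per pipeline stage, which is why I'm asking whether the workflow primitives already cover this.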
Questions
- Can workflow `.filter_map()` work across separate worker types?
  - Can Worker Type A's workflow use `.filter_map()` to create items that are processed by Worker Type B (a different `PostgresStorage<T>`)?
- Is there a built-in pattern for cross-job coordination?
  - The workflow collector uses `backend.wait_for(task_ids)`; can I use these primitives directly for cross-job coordination?
- What's the recommended pattern for fan-in when:
  - Jobs are separate instances (not items within a single workflow execution)
  - Workers are machine-specific (different worker types for different hardware)
  - Multiple processes/machines are involved
Context
- Using the `apalis-postgres` backend
- Multi-machine deployment (5-10 servers)
- Different worker types need different hardware (GPU vs CPU)
- Each worker type has different heavy dependencies
Thanks for any guidance! Apalis has been excellent so far; I just want to make sure I'm using the right pattern for this scenario.
Apalis version: 1.0.0-beta.1