Question: Pattern for cross-job fan-in coordination with machine-specific workers? #637
Hi! I'm building a distributed job processing pipeline with Apalis and have a question about the best pattern for cross-job coordination.

**Use Case**

I have a pipeline where:

**Requirements**

**What I've Tried**

Option 1: `apalis-workflow` with
Replies: 5 comments 2 replies
As indicated in the docs of

You don't need to use
Thanks for the quick response! Scenario:

```rust
// Worker Type A (GPU machines)
let storage_a: PostgresStorage<JobTypeA> = PostgresStorage::new(&pool);

async fn handler_a(
    job: JobTypeA,
    storage_b: Data<PostgresStorage<JobTypeB>>,
) -> Result<(), BoxDynError> {
    let result = process(job).await?;
    // Push 4 separate JobTypeB instances
    for variant in [V1, V2, V3, V4] {
        storage_b.push(JobTypeB { result: result.clone(), variant }).await?;
    }
    Ok(())
}

WorkerBuilder::new("worker-a")
    .backend(storage_a) // Polls JobTypeA
    .data(storage_b)
    .build(handler_a)

// Worker Type B (CPU machines, different code!)
let storage_b: PostgresStorage<JobTypeB> = PostgresStorage::new(&pool);

async fn handler_b(job: JobTypeB) -> Result<(), BoxDynError> {
    // Process ONE JobTypeB instance
    process(job).await?;
    // How to know when ALL 4 JobTypeB instances (from the same batch) are done?
    Ok(())
}

WorkerBuilder::new("worker-b")
    .backend(storage_b) // Polls JobTypeB (different from Worker A!)
    .build(handler_b)
```

The 4 `JobTypeB` instances are picked up by different workers (possibly on different machines).

Question: Can I use `.filter_map()` across different worker types? Or is manual coordination (a PostgreSQL table with a COUNT) the recommended pattern?

(Why separate worker types: Worker A needs GPUs, Worker B runs on CPU-only machines.)

By the way, I really appreciate your work on this library for the year-plus I've been following it.
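For what it's worth, the manual COUNT coordination mentioned above boils down to a decrement-and-check on a shared counter keyed by batch id. Below is a minimal in-process sketch of that logic (assumptions: `BatchTracker`, `register`, and `complete_one` are illustrative names, not apalis API; since the workers run on different machines, the real map would live in a PostgreSQL table and the decrement would be a single atomic `UPDATE ... RETURNING`):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Tracks how many jobs remain in each batch. `complete_one` returns true
/// exactly once per batch: when the final job in that batch finishes.
struct BatchTracker {
    remaining: Mutex<HashMap<u64, usize>>,
}

impl BatchTracker {
    fn new() -> Self {
        Self { remaining: Mutex::new(HashMap::new()) }
    }

    /// Called on the fan-out side (handler_a): register a batch of `n` jobs.
    fn register(&self, batch_id: u64, n: usize) {
        self.remaining.lock().unwrap().insert(batch_id, n);
    }

    /// Called by each fan-in worker (handler_b) when its job finishes.
    /// Returns true only for the last job of the batch.
    fn complete_one(&self, batch_id: u64) -> bool {
        let mut map = self.remaining.lock().unwrap();
        let done = match map.get_mut(&batch_id) {
            Some(n) => {
                *n -= 1;
                *n == 0
            }
            None => false, // unknown or already-completed batch
        };
        if done {
            map.remove(&batch_id);
        }
        done
    }
}
```

Whichever worker observes `complete_one(..) == true` would then enqueue the follow-up "all done" job; the database version makes that observation atomic across machines.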
You cannot pass this to a workflow: workflows accept a backend where

This is because jobs have to be stored in a generalized manner but converted to the right type at the execution point. You should be able to do this:

```rust
async fn handler_a(job: JobTypeA) -> Result<Vec<JobTypeB>, BoxDynError> {
    let result = process(job).await?;
    let mut next = vec![];
    // Push 4 separate JobTypeB instances
    for variant in [V1, V2, V3, V4] {
        next.push(JobTypeB { result: result.clone(), variant });
    }
    Ok(next)
}

async fn handler_b(job: JobTypeB) -> Result<Option<String>, BoxDynError> {
    // Process ONE JobTypeB instance
    let variant = job.variant.clone();
    process(job).await?;
    Ok(Some(variant))
}

async fn collect(res: Vec<String>) {
    // get your results here
}

#[tokio::main]
async fn main() {
    let workflow = Workflow::new("odd-numbers-workflow")
        .and_then(handler_a)
        .filter_map(handler_b)
        .and_then(collect);
}
```
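For intuition about the fan-in step: `filter_map` behaves like Option-flattening over the batch results. Each `JobTypeB` handler yields an `Option<String>`, the `Some(_)` values are kept, the `None` values are dropped, and the next step receives the collected `Vec<String>`. A plain-Rust analogy (an assumption for illustration, not the apalis-workflow API itself):

```rust
// Keep the Some(_) outputs of a batch of handlers, drop the None outputs,
// and hand the survivors to the next stage as one Vec.
fn fan_in(outputs: Vec<Option<String>>) -> Vec<String> {
    outputs.into_iter().flatten().collect()
}
```

So a handler that returns `Ok(None)` simply contributes nothing to the collected batch, while `Ok(Some(value))` passes `value` through to `collect`.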
PS, you don't need to do coordination manually; use workflows, because they are checked at compile time and were made for this exact use case.
Just realized you wanted to do some execution on CPU and some on GPU. Currently you might have to do some extra work, e.g. something like:

```rust
async fn handler_b(job: JobTypeB) -> Result<Option<String>, BoxDynError> {
    // Run the blocking GPU call off the async executor threads
    let result = tokio::task::spawn_blocking(|| run_in_cuda_kernel()).await.unwrap();
    // Assuming run_in_cuda_kernel() produces the String to pass downstream
    Ok(Some(result))
}
```
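For readers without a tokio runtime at hand, here is the same idea in std-only form: move a blocking, compute-bound call onto a dedicated OS thread so it cannot stall the async executor's worker threads. This is a sketch; `tokio::task::spawn_blocking` does this with a managed blocking thread pool rather than a fresh thread per call, and `run_in_cuda_kernel` is mocked here:

```rust
// Mock stand-in for the blocking GPU call from the reply above.
fn run_in_cuda_kernel() -> String {
    "kernel finished".to_string()
}

// Run the blocking call on its own OS thread and wait for its result,
// keeping the (hypothetical) async executor threads free in the meantime.
fn run_blocking_off_executor() -> String {
    std::thread::spawn(run_in_cuda_kernel)
        .join()
        .expect("blocking worker thread panicked")
}
```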
Currently the feature to choose which worker consumes a task is not implemented; it will be implemented as a Pro feature. Option 2 is possibly the way to go for you, then use `wait_for`.