Idiomatic way of chaining workers #572
Unanswered
ChristianJacobsen asked this question in Q&A
Replies: 1 comment
-
I think what you are looking for is piping. The concept is still in development, but here is how we do it in apalis-cron:
```rust
pub fn pipe_to_storage<S, Ctx>(self, storage: S) -> StorageTwoPiped<S>
where
    S: Storage<Job = Req, Context = Ctx> + Clone + Send + Sync + 'static,
    S::Error: std::error::Error + Send + Sync + 'static,
{
    // Every item the source stream yields is pushed into the storage, so the
    // storage becomes the queue the downstream worker consumes from.
    let stream = self
        .into_stream()
        .then({
            let storage = storage.clone();
            move |res| {
                let mut storage = storage.clone();
                async move {
                    match res {
                        Ok(Some(req)) => storage
                            .push(req.args)
                            .await
                            .map(|_| ())
                            .map_err(|e| Box::new(e) as BoxDynError),
                        _ => Ok(()),
                    }
                }
            }
        })
        .boxed();
    StorageTwoPiped {
        stream,
        inner: storage,
    }
}

impl<T, Ctx, Inner> Backend<Request<T, Ctx>> for StorageTwoPiped<Inner>
where
    Inner: Backend<Request<T, Ctx>>,
{
    type Stream = Inner::Stream;
    type Layer = Inner::Layer;
    type Codec = Inner::Codec;

    fn poll(self, worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
        // Move the piping stream out of `self` so it can be driven as a
        // background heartbeat while the inner backend is polled as usual.
        let mut stream = self.stream;
        let pipe_heartbeat = async move { while stream.next().await.is_some() {} };
        let inner = self.inner.poll(worker);
        let heartbeat = inner.heartbeat;
        Poller::new_with_layer(
            inner.stream,
            async {
                futures::join!(heartbeat, pipe_heartbeat);
            },
            inner.layer,
        )
    }
}
```
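To give a feel for how this could be used, below is a rough sketch of a cron stream piped into a storage, with a worker consuming from that storage. The `Reminder` job, the SQLite storage, the cron expression and the exact `WorkerBuilder`/`Monitor` calls are my own illustration; names and signatures may differ depending on the apalis version.

```rust
use std::str::FromStr;

use apalis::prelude::*;
use apalis_cron::{CronStream, Schedule};
use apalis_sql::sqlite::SqliteStorage;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
struct Reminder;

// The cron stream builds a job from each tick's timestamp.
impl From<DateTime<Utc>> for Reminder {
    fn from(_: DateTime<Utc>) -> Self {
        Reminder
    }
}

async fn send_reminder(job: Reminder) {
    println!("processing {job:?}");
}

async fn run(pool: sqlx::SqlitePool) -> Result<(), Box<dyn std::error::Error>> {
    // (Run the storage's migrations/setup here as needed.)
    let storage: SqliteStorage<Reminder> = SqliteStorage::new(pool);

    // Every tick is pushed into the storage; the worker consumes from that
    // same storage, so pending jobs survive crashes and restarts.
    let schedule = Schedule::from_str("0 * * * * *")?;
    let backend = CronStream::new(schedule).pipe_to_storage(storage);

    Monitor::new()
        .register(
            WorkerBuilder::new("reminder-worker")
                .backend(backend)
                .build_fn(send_reminder),
        )
        .run()
        .await?;
    Ok(())
}
```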
-
Hi,
I'm building a system where I plan on having multiple workers (with different concurrency settings) trigger each other in a chain, with each worker having multiple steps. I couldn't find an example for this, so I thought I'd ask if anyone has an idea of how to solve the queueing of tasks between workers in the most "idiomatic"/"true to the spirit of the library" way?
I know this can be solved with channels and such, but I wondered whether any thought during the design of the library had gone into how these would best be shared. Would you just re-use the (Redis) connection with multiple storages and then pass each "next storage" as a handle to the worker before it, so it can queue up tasks? I.e. something like this:
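Here is a rough sketch of what I mean; the job types, the `logger1`/`logger2` handlers, the Redis URL and the connection helper are purely illustrative:

```rust
use apalis::prelude::*;
use apalis_redis::RedisStorage;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
struct FirstJob {
    id: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
struct SecondJob {
    id: u64,
}

// Worker 1: does its step, then enqueues the follow-up job on worker 2's storage.
async fn logger1(job: FirstJob, next: Data<RedisStorage<SecondJob>>) {
    println!("logger1 handling {}", job.id);
    let mut next = (*next).clone();
    if let Err(e) = next.push(SecondJob { id: job.id }).await {
        eprintln!("failed to enqueue SecondJob: {e}");
    }
}

// Worker 2: the end of the chain in this example.
async fn logger2(job: SecondJob) {
    println!("logger2 handling {}", job.id);
}

async fn run() -> Result<(), Box<dyn std::error::Error>> {
    // Re-use one Redis connection for both storages (connect() stands in for
    // however the shared connection is actually created).
    let conn = apalis_redis::connect("redis://127.0.0.1/").await?;
    let storage1: RedisStorage<FirstJob> = RedisStorage::new(conn.clone());
    let storage2: RedisStorage<SecondJob> = RedisStorage::new(conn);

    // Concurrency settings per worker omitted for brevity.
    Monitor::new()
        .register(
            WorkerBuilder::new("worker-1")
                // Hand worker 1 a handle to the next queue.
                .data(storage2.clone())
                .backend(storage1)
                .build_fn(logger1),
        )
        .register(
            WorkerBuilder::new("worker-2")
                .backend(storage2)
                .build_fn(logger2),
        )
        .run()
        .await?;
    Ok(())
}
```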
When trying something like this I just end up with lots and lots of duplicated task invocations.
The `logger1` task here is just a trivial handler like the one in the sketch above. For some reason this approach just doesn't sit right with me; I hope there's something I'm missing.
Any help or thoughts would be greatly appreciated. This library looks like it can solve the scheduling issues I'm facing, as I really need the persisted state in case of crashes.