apalis-cron: Prevent outdated tasks after downtime #599
Hey @geofmureithi,

We're using apalis-cron to run an expensive update task every 5 minutes. We also pipe the schedule to SQLite storage so that we can trigger additional updates manually. High availability is also desired.

However, when we start our server after a long period of downtime, apalis-cron runs the update repeatedly, once for every missed execution. We're looking for a way to cancel the old schedule and start again. My attempt (`setup_updates_backend`) is below, but I still haven't cracked it. I had a look at the code and couldn't find anything that would cause the new schedule to resume from an older time.

Your help would be greatly appreciated.

```rust
async fn setup_updates_backend(mut updates_store: SqliteStorage<tasks::ScheduleUpdateTask>) -> CronPipe<SqliteStorage<tasks::ScheduleUpdateTask>> {
    use apalis::prelude::State;

    let five_minutely_schedule = CronSchedule::from_str(
        // sec min hour day of month month day of week year
        " 0 */5 * * * * *"
    ).unwrap();
    let cron_stream = CronStream::new(five_minutely_schedule);

    // Every state that may still hold a task left over from before the downtime.
    let states_to_cancel = [
        State::Pending,
        State::Running,
        State::Scheduled,
        State::Failed,
    ];

    // Page through the jobs in each state and mark every one as killed.
    for state in states_to_cancel {
        let mut page = 0;
        loop {
            let jobs = match updates_store.list_jobs(&state, page).await {
                Err(e) => {
                    error!("Failed to list jobs: {e}");
                    break;
                },
                Ok(jobs) => jobs
            };
            if jobs.len() == 0 {
                break;
            }
            for job in jobs {
                debug!("Cancelling: {:?} ({:?})", job.parts.context.task_id, job.args);
                if let Ok(Some(mut job)) = updates_store.fetch_by_id(&job.parts.context.task_id).await {
                    job.parts.context.set_status(apalis::prelude::State::Killed);
                    if let Err(e) = updates_store.update(job).await {
                        error!("Failed to save intermediate update task: {e}");
                    }
                } else {
                    error!("Task {} not found", job.parts.task_id);
                }
            }
            page += 1;
        }
    }

    // Clean up, then report anything that is still left in the store.
    let _ = updates_store.vacuum().await;
    if let Ok(count) = updates_store.len().await {
        if count > 0 {
            dbg!(updates_store.stats().await);
            error!("Updates store still has {count} tasks")
        }
    }

    // Queue one update immediately, then hand the schedule over to the storage.
    if let Err(e) = updates_store.push_request(Request::new(tasks::ScheduleUpdateTask {})).await {
        error!("Failed to schedule initial update task: {e}");
    }
    cron_stream.pipe_to_storage(updates_store)
}
```

Thanks for your amazing work.
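To make the desired behaviour concrete, here is a minimal, standard-library-only sketch of a staleness check: a tick is dropped when its scheduled time is at least one interval in the past. It is not part of apalis or apalis-cron; the names `is_stale` and `drop_stale_ticks` are hypothetical, and it assumes the scheduled time of each tick is available to the caller.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical helper (not an apalis API): returns true when a tick
/// scheduled at `scheduled_at` is too old to be worth running at `now`.
fn is_stale(scheduled_at: SystemTime, now: SystemTime, max_age: Duration) -> bool {
    match now.duration_since(scheduled_at) {
        // Tick is in the past: stale once it is at least `max_age` old.
        Ok(age) => age >= max_age,
        // Tick lies in the future relative to `now`: never stale.
        Err(_) => false,
    }
}

/// Keeps only the ticks that are still fresh, preserving their order.
fn drop_stale_ticks(ticks: &[SystemTime], now: SystemTime, max_age: Duration) -> Vec<SystemTime> {
    ticks
        .iter()
        .copied()
        .filter(|tick| !is_stale(*tick, now, max_age))
        .collect()
}

fn main() {
    let interval = Duration::from_secs(5 * 60);
    let now = SystemTime::UNIX_EPOCH + Duration::from_secs(1_000_000);

    // Simulate one hour of downtime: twelve missed ticks plus the current one.
    let ticks: Vec<SystemTime> = (0..=12).rev().map(|i| now - interval * i).collect();

    let fresh = drop_stale_ticks(&ticks, now, interval);
    println!("{} of {} ticks kept", fresh.len(), ticks.len());
}
```

The same comparison could run at the top of the task handler, so that a backlog of old ticks is acknowledged without repeating the expensive update.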
Replies: 1 comment 3 replies
Hmmm, have you considered replacing this with a SQL query?

```rust
for state in states_to_cancel {
    let mut page = 0;
    loop {
        let jobs = match updates_store.list_jobs(&state, page).await {
            Err(e) => {
                error!("Failed to list jobs: {e}");
                break;
            },
            Ok(jobs) => jobs
        };
        if jobs.len() == 0 {
            break;
        }
        for job in jobs {
            debug!("Cancelling: {:?} ({:?})", job.parts.context.task_id, job.args);
            if let Ok(Some(mut job)) = updates_store.fetch_by_id(&job.parts.context.task_id).await {
                job.parts.context.set_status(apalis::prelude::State::Killed);
                if let Err(e) = updates_store.update(job).await {
                    error!("Failed to save intermediate update task: {e}");
                }
            } else {
                error!("Task {} not found", job.parts.task_id);
            }
        }
        page += 1;
    }
}
```

This would be something like: UPDATE Jobs ....
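The statement itself is left elided in the reply. As a rough sketch of the idea only, the helper below builds one parameterised `UPDATE` that would replace the paging loop. The table name `Jobs` comes from the reply; the column names `status` and `job_type`, the literal `'Killed'`, and the function name `build_kill_query` are assumptions that should be checked against the actual storage schema before use.

```rust
/// Hypothetical helper: builds a single UPDATE that marks every job of one
/// job type as killed when its status is one of the listed states.
/// Assumed schema: table `Jobs` with text columns `status` and `job_type`.
/// Returns `None` for an empty state list, since there is nothing to cancel.
fn build_kill_query(states: &[&str]) -> Option<String> {
    if states.is_empty() {
        return None;
    }
    // ?1 is reserved for the job type; the states take ?2, ?3, and so on.
    let placeholders: Vec<String> = (0..states.len())
        .map(|i| format!("?{}", i + 2))
        .collect();
    Some(format!(
        "UPDATE Jobs SET status = 'Killed' WHERE job_type = ?1 AND status IN ({})",
        placeholders.join(", ")
    ))
}

fn main() {
    let states = ["Pending", "Running", "Scheduled", "Failed"];
    if let Some(sql) = build_kill_query(&states) {
        println!("{sql}");
    }
}
```

The resulting string would then be bound (job type first, then each state name) and executed once against the same SQLite database the storage uses, so a backlog of any size is cleared in a single round trip instead of one fetch and update per job.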