
apalis-cron: Prevent outdated tasks after downtime #598

@liamdiprose


Hey @geofmureithi,

We're using apalis-cron to run an expensive update task every 5 minutes. We also pipe the schedule into SQLite storage so we can trigger additional updates manually. We'd also like the setup to support high availability.

However, when we start our server after a long period of downtime, apalis-cron runs the update once for every execution it missed.

We're looking for a way to cancel the old schedule and start again.
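For reference, one generic way to keep a backlog from replaying (not an apalis API; `is_stale` and `coalesce` are hypothetical names) is to check each task's scheduled time when it runs and skip anything older than one period, or to collapse a backlog of missed ticks into only the newest one. The std-only sketch below shows both ideas, assuming the task type keeps the tick timestamp it was created for (the empty `ScheduleUpdateTask {}` below currently stores nothing):

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical guard (not part of apalis): a task whose scheduled time is
/// more than `max_age` before `now` is considered stale and can be skipped.
fn is_stale(scheduled_at: SystemTime, now: SystemTime, max_age: Duration) -> bool {
    match now.duration_since(scheduled_at) {
        Ok(age) => age > max_age,
        // `scheduled_at` is later than `now`, so the task is not stale.
        Err(_) => false,
    }
}

/// Hypothetical helper: collapse a backlog of missed ticks into the newest
/// one, so a long outage yields a single run instead of one per tick.
fn coalesce(mut backlog: Vec<SystemTime>) -> Option<SystemTime> {
    backlog.sort();
    backlog.pop()
}

fn main() {
    let now = SystemTime::now();
    let period = Duration::from_secs(5 * 60);
    // Simulate the 12 five-minute ticks missed during one hour of downtime.
    let backlog: Vec<SystemTime> = (1..=12u32).map(|i| now - period * i).collect();
    let stale = backlog.iter().filter(|t| is_stale(**t, now, period)).count();
    println!("{stale} of {} missed ticks are older than one period", backlog.len());
    if let Some(newest) = coalesce(backlog) {
        println!("run once for the tick {:?} ago", now.duration_since(newest).unwrap());
    }
}
```

The staleness check works per task at execution time, so it also covers tasks that were already persisted in storage before the restart; coalescing only helps where the whole backlog is visible at once.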

My attempt is below, but I still haven't cracked this. I looked through the code and couldn't find anything that would cause the new schedule to resume from an older time.

Your help would be greatly appreciated.

fn setup_updates_backend()
async fn setup_updates_backend(mut updates_store: SqliteStorage<tasks::ScheduleUpdateTask>) -> CronPipe<SqliteStorage<tasks::ScheduleUpdateTask>> {
    use apalis::prelude::State;
    use std::str::FromStr; // brings `CronSchedule::from_str` into scope

    let five_minutely_schedule = CronSchedule::from_str(
        // sec  min   hour   day of month   month   day of week   year
        "    0  */5      *              *       *             *      *"
    ).unwrap();
    let cron_stream = CronStream::new(five_minutely_schedule);

    let states_to_cancel = [
        State::Pending,
        State::Running,
        State::Scheduled,
        State::Failed,
    ];

    for state in states_to_cancel {
        let mut page = 0;
        loop {
            let jobs = match updates_store.list_jobs(&state, page).await {
                Err(e) => {
                    error!("Failed to list jobs: {e}");
                    break;
                },
                Ok(jobs) => jobs
            };

            if jobs.is_empty() {
                break;
            }

            for job in jobs {
                debug!("Cancelling: {:?} ({:?})", job.parts.task_id, job.args);
                // Re-fetch the full job, mark it as killed and persist the new status.
                if let Ok(Some(mut job)) = updates_store.fetch_by_id(&job.parts.task_id).await {
                    job.parts.context.set_status(State::Killed);
                    if let Err(e) = updates_store.update(job).await {
                        error!("Failed to mark update task as killed: {e}");
                    }
                } else {
                    error!("Task {} not found", job.parts.task_id);
                }
            }

            page += 1;
        }
    }

    let _ = updates_store.vacuum().await;

    if let Ok(count) = updates_store.len().await {
        if count > 0 {
            dbg!(updates_store.stats().await);
            error!("Updates store still has {count} tasks");
        }
    }


    if let Err(e) = updates_store.push_request(Request::new(tasks::ScheduleUpdateTask {})).await {
        error!("Failed to schedule initial update task: {e}");
    }

    cron_stream.pipe_to_storage(updates_store)
}
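One thing worth checking in the loop above: each killed job drops out of the status filter passed to `list_jobs`. If `list_jobs` pages with an offset over that filtered set (an assumption; worth confirming in the apalis-sql source), advancing `page` after cancelling a batch shifts the remaining jobs under offsets that were already visited, so some are never cancelled. The std-only simulation below uses a hypothetical `MockStore` (not apalis-sql's implementation) to compare advancing the page against re-reading the first page until it comes back empty:

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum Status {
    Pending,
    Killed,
}

/// Hypothetical in-memory stand-in for a job table, paged with an offset
/// over the rows that match a status.
struct MockStore {
    jobs: Vec<Status>,
    page_size: usize,
}

impl MockStore {
    /// Returns the ids (indices) of jobs with `status` on 0-based `page`.
    fn list(&self, status: Status, page: usize) -> Vec<usize> {
        self.jobs
            .iter()
            .enumerate()
            .filter(|(_, s)| **s == status)
            .map(|(i, _)| i)
            .skip(page * self.page_size)
            .take(self.page_size)
            .collect()
    }

    fn kill(&mut self, id: usize) {
        self.jobs[id] = Status::Killed;
    }

    fn count(&self, status: Status) -> usize {
        self.jobs.iter().filter(|s| **s == status).count()
    }
}

/// Mirrors the loop above: advancing the page skips jobs, because killed
/// rows leave the filter and the remaining rows shift to lower offsets.
fn cancel_advancing_page(store: &mut MockStore) {
    let mut page = 0;
    loop {
        let ids = store.list(Status::Pending, page);
        if ids.is_empty() {
            break;
        }
        for id in ids {
            store.kill(id);
        }
        page += 1;
    }
}

/// Re-reads the first page until it is empty, stopping early if a pass
/// cancels nothing (e.g. every update failed) to avoid looping forever.
fn cancel_first_page(store: &mut MockStore) {
    loop {
        let ids = store.list(Status::Pending, 0);
        if ids.is_empty() {
            break;
        }
        let before = store.count(Status::Pending);
        for id in ids {
            store.kill(id);
        }
        if store.count(Status::Pending) == before {
            break;
        }
    }
}

fn main() {
    let mut a = MockStore { jobs: vec![Status::Pending; 25], page_size: 10 };
    cancel_advancing_page(&mut a);
    let mut b = MockStore { jobs: vec![Status::Pending; 25], page_size: 10 };
    cancel_first_page(&mut b);
    println!("advancing the page leaves {} pending", a.count(Status::Pending));
    println!("re-reading page 0 leaves {} pending", b.count(Status::Pending));
}
```

With 25 pending jobs and a page size of 10, advancing the page leaves 10 jobs uncancelled in this simulation, which would be consistent with leftover tasks showing up in the final `len()` check.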

Thanks for your amazing work.
