Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tasks): scheduling tasks for a one-time run in future #2640

Draft
wants to merge 8 commits into
base: v2
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions docs/1.guide/10.tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,45 @@ export default eventHandler(async (event) => {
});
```

### Running in future

You can also run tasks at a later time using the `runAt` and `runAfter` options.

**Example:**

::code-group
```ts [runAt]
// api/remind.ts
export default eventHandler(async (event) => {
// IMPORTANT: Authenticate user and validate payload!
const payload = { ...getQuery(event) };

// Run the task at 3:00 PM
const { result } = await runTask("send:reminder", {
payload,
runAt: new Date().setHours(15, 0, 0, 0),
});

return { result };
});
```
```ts [runAfter]
// api/remind.ts
export default eventHandler(async (event) => {
// IMPORTANT: Authenticate user and validate payload!
const payload = { ...getQuery(event) };

// Run the task after 24 hours has passed
const { result } = await runTask("send:reminder", {
payload,
runAfter: 1000 * 60 * 60 * 24, // 24 hours
});

return { result };
});
```
::

## Run tasks with dev server

Nitro's built-in dev server exposes tasks to be easily executed without programmatic usage.
Expand Down
5 changes: 2 additions & 3 deletions src/core/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ export function addNitroTasksVirtualFile(nitro: Nitro) {
return true;
}
);
return { cron, tasks };
return { cron, tasks, once: false };
})
.filter((e) => e.tasks.length > 0);
const scheduledTasks: false | { cron: string; tasks: string[] }[] =
_scheduledTasks.length > 0 ? _scheduledTasks : false;
const scheduledTasks: { cron: string; tasks: string[]; once: boolean; }[] = _scheduledTasks;

return /* js */ `
export const scheduledTasks = ${JSON.stringify(scheduledTasks)};
Expand Down
8 changes: 4 additions & 4 deletions src/presets/_all.gen.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
// Auto-generated using gen-presets script

import _nitro from "./_nitro/preset";
import _static from "./_static/preset";
import _alwaysdata from "./alwaysdata/preset";
import _awsAmplify from "./aws-amplify/preset";
import _awsLambda from "./aws-lambda/preset";
Expand All @@ -25,10 +23,10 @@ import _stormkit from "./stormkit/preset";
import _vercel from "./vercel/preset";
import _winterjs from "./winterjs/preset";
import _zeabur from "./zeabur/preset";
import _nitro from "./_nitro/preset";
import _static from "./_static/preset";

export default [
..._nitro,
..._static,
..._alwaysdata,
..._awsAmplify,
..._awsLambda,
Expand All @@ -52,4 +50,6 @@ export default [
..._vercel,
..._winterjs,
..._zeabur,
..._nitro,
..._static,
] as const;
71 changes: 59 additions & 12 deletions src/runtime/internal/task.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
import { Cron } from "croner";
import { createError } from "h3";
import type {
Task,
TaskContext,
TaskEvent,
TaskPayload,
TaskResult,
} from "nitropack/types";
import type { Task, TaskContext, TaskEvent, TaskPayload, TaskResult, TaskOptions } from "nitropack/types";
import { isTest } from "std-env";
import { scheduledTasks, tasks } from "#nitro-internal-virtual/tasks";

Expand All @@ -28,8 +22,31 @@ export async function runTask<RT = unknown>(
{
payload = {},
context = {},
}: { payload?: TaskPayload; context?: TaskContext } = {}
}: { payload?: TaskPayload; context?: TaskContext } = {},
opts: TaskOptions = {}
): Promise<TaskResult<RT>> {
if (opts.runAt || opts.runAfter) {
if (opts.runAt && opts.runAfter) {
throw createError({
message: "Cannot use both `runAt` and `runAfter` options!",
statusCode: 400,
});
}

let date: Date;
if (opts.runAt) {
date = typeof opts.runAt === "string" ? new Date(opts.runAt) : opts.runAt;
} else if (opts.runAfter) {
date = new Date(Date.now() + opts.runAfter * 1000);
} else {
throw new Error("Invalid options!");
}

const cron = `${date.getSeconds()} ${date.getMinutes()} ${date.getHours()} ${date.getDate()} ${date.getMonth() + 1} *`;
scheduleTask(name, cron);
return {};
}

if (__runningTasks__[name]) {
return __runningTasks__[name];
}
Expand All @@ -53,8 +70,7 @@ export async function runTask<RT = unknown>(
__runningTasks__[name] = handler.run(taskEvent);

try {
const res = await __runningTasks__[name];
return res;
return await __runningTasks__[name];
} finally {
delete __runningTasks__[name];
}
Expand All @@ -71,7 +87,7 @@ export function startScheduleRunner() {
};

for (const schedule of scheduledTasks) {
const cron = new Cron(schedule.cron, async () => {
new Cron(schedule.cron, async () => {
await Promise.all(
schedule.tasks.map((name) =>
runTask(name, {
Expand All @@ -84,11 +100,42 @@ export function startScheduleRunner() {
);
})
)
);
)
});
}
}

/** @experimental */
export function scheduleTask(name: string, cron: string) {
if (!scheduledTasks) {
throw new Error("Scheduled tasks are not available!");
}

scheduledTasks.push({ cron, tasks: [name], once: true });

const payload: TaskPayload = {
scheduledTime: Date.now(),
};

new Cron(cron, {
maxRuns: 1,
}, async () => {
await runTask(name, {
payload,
context: {},
}).catch((error) => {
console.error(`[nitro] Error while running scheduled task "${name}"`, error);
});

if (scheduledTasks) {
const index = scheduledTasks.findIndex((task) => task.cron === cron);
if (index >= 0) {
scheduledTasks.splice(index, 1);
}
}
});
}

/** @experimental */
export function getCronTasks(cron: string): string[] {
return (scheduledTasks || []).find((task) => task.cron === cron)?.tasks || [];
Expand Down
6 changes: 6 additions & 0 deletions src/types/runtime/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ export interface Task<RT = unknown> {
run(event: TaskEvent): MaybePromise<{ result?: RT }>;
}

/** @experimental */
export interface TaskOptions {
runAt?: Date | string;
runAfter?: number;
}

/** @experimental */
export interface TaskRunnerOptions {
cwd?: string;
Expand Down
2 changes: 1 addition & 1 deletion src/types/virtual/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ export const tasks: Record<
{ resolve?: () => Promise<Task>; meta: TaskMeta }
> = {};

export const scheduledTasks: false | { cron: string; tasks: string[] }[] = [];
export const scheduledTasks: { cron: string; tasks: string[]; once: boolean; }[] = [];