Skip to content

v4: implement onCancel callbacks #2022

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/real-rats-drop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

Add onCancel lifecycle hook
1 change: 1 addition & 0 deletions apps/webapp/app/components/runs/v3/RunIcon.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ export function RunIcon({ name, className, spanName }: TaskIconProps) {
case "task-hook-onResume":
case "task-hook-onComplete":
case "task-hook-cleanup":
case "task-hook-onCancel":
return <FunctionIcon className={cn(className, "text-text-dimmed")} />;
case "task-hook-onFailure":
case "task-hook-catchError":
Expand Down
23 changes: 0 additions & 23 deletions apps/webapp/app/v3/services/cancelTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,29 +47,6 @@ export class CancelTaskRunService extends BaseService {
tx: this._prisma,
});

const inProgressEvents = await eventRepository.queryIncompleteEvents(
getTaskEventStoreTableForRun(taskRun),
{
runId: taskRun.friendlyId,
},
taskRun.createdAt,
taskRun.completedAt ?? undefined
);

logger.debug("Cancelling in-progress events", {
inProgressEvents: inProgressEvents.map((event) => event.id),
});

await Promise.all(
inProgressEvents.map((event) => {
return eventRepository.cancelEvent(
event,
options?.cancelledAt ?? new Date(),
options?.reason ?? "Run cancelled"
);
})
);

return {
id: result.run.id,
};
Expand Down
40 changes: 32 additions & 8 deletions packages/cli-v3/src/entryPoints/dev-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
TaskRunExecution,
timeout,
TriggerConfig,
UsageMeasurement,
waitUntil,
WorkerManifest,
WorkerToExecutorMessageCatalog,
Expand Down Expand Up @@ -232,7 +233,10 @@ async function bootstrap() {

let _execution: TaskRunExecution | undefined;
let _isRunning = false;
let _isCancelled = false;
let _tracingSDK: TracingSDK | undefined;
let _executionMeasurement: UsageMeasurement | undefined;
const cancelController = new AbortController();

const zodIpc = new ZodIpcConnection({
listenSchema: WorkerToExecutorMessageCatalog,
Expand Down Expand Up @@ -403,18 +407,17 @@ const zodIpc = new ZodIpcConnection({
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
);

const measurement = usage.start();
_executionMeasurement = usage.start();

// This lives outside of the executor because this will eventually be moved to the controller level
const signal = execution.run.maxDuration
? timeout.abortAfterTimeout(execution.run.maxDuration)
: undefined;
const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration);

const signal = AbortSignal.any([cancelController.signal, timeoutController.signal]);

const { result } = await executor.execute(execution, metadata, traceContext, signal);

const usageSample = usage.stop(measurement);
if (_isRunning && !_isCancelled) {
const usageSample = usage.stop(_executionMeasurement);

if (_isRunning) {
return sender.send("TASK_RUN_COMPLETED", {
execution,
result: {
Expand Down Expand Up @@ -458,7 +461,16 @@ const zodIpc = new ZodIpcConnection({
WAIT_COMPLETED_NOTIFICATION: async () => {
await managedWorkerRuntime.completeWaitpoints([]);
},
FLUSH: async ({ timeoutInMs }, sender) => {
CANCEL: async ({ timeoutInMs }) => {
_isCancelled = true;
cancelController.abort("run cancelled");
await callCancelHooks(timeoutInMs);
if (_executionMeasurement) {
usage.stop(_executionMeasurement);
}
await flushAll(timeoutInMs);
},
FLUSH: async ({ timeoutInMs }) => {
await flushAll(timeoutInMs);
},
WAITPOINT_CREATED: async ({ wait, waitpoint }) => {
Expand All @@ -470,6 +482,18 @@ const zodIpc = new ZodIpcConnection({
},
});

async function callCancelHooks(timeoutInMs: number = 10_000) {
const now = performance.now();

try {
await Promise.race([lifecycleHooks.callOnCancelHookListeners(), setTimeout(timeoutInMs)]);
} finally {
const duration = performance.now() - now;

log(`Called cancel hooks in ${duration}ms`);
}
}

async function flushAll(timeoutInMs: number = 10_000) {
const now = performance.now();

Expand Down
38 changes: 31 additions & 7 deletions packages/cli-v3/src/entryPoints/managed-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
TaskRunExecution,
timeout,
TriggerConfig,
UsageMeasurement,
waitUntil,
WorkerManifest,
WorkerToExecutorMessageCatalog,
Expand Down Expand Up @@ -229,7 +230,10 @@ async function bootstrap() {

let _execution: TaskRunExecution | undefined;
let _isRunning = false;
let _isCancelled = false;
let _tracingSDK: TracingSDK | undefined;
let _executionMeasurement: UsageMeasurement | undefined;
const cancelController = new AbortController();

const zodIpc = new ZodIpcConnection({
listenSchema: WorkerToExecutorMessageCatalog,
Expand Down Expand Up @@ -398,18 +402,17 @@ const zodIpc = new ZodIpcConnection({
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
);

const measurement = usage.start();
_executionMeasurement = usage.start();

// This lives outside of the executor because this will eventually be moved to the controller level
const signal = execution.run.maxDuration
? timeout.abortAfterTimeout(execution.run.maxDuration)
: undefined;
const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration);

const signal = AbortSignal.any([cancelController.signal, timeoutController.signal]);

const { result } = await executor.execute(execution, metadata, traceContext, signal);

const usageSample = usage.stop(measurement);
if (_isRunning && !_isCancelled) {
const usageSample = usage.stop(_executionMeasurement);

if (_isRunning) {
return sender.send("TASK_RUN_COMPLETED", {
execution,
result: {
Expand Down Expand Up @@ -454,6 +457,15 @@ const zodIpc = new ZodIpcConnection({
FLUSH: async ({ timeoutInMs }, sender) => {
await flushAll(timeoutInMs);
},
CANCEL: async ({ timeoutInMs }, sender) => {
_isCancelled = true;
cancelController.abort("run cancelled");
await callCancelHooks(timeoutInMs);
if (_executionMeasurement) {
usage.stop(_executionMeasurement);
}
await flushAll(timeoutInMs);
},
WAITPOINT_CREATED: async ({ wait, waitpoint }) => {
managedWorkerRuntime.associateWaitWithWaitpoint(wait.id, waitpoint.id);
},
Expand All @@ -463,6 +475,18 @@ const zodIpc = new ZodIpcConnection({
},
});

async function callCancelHooks(timeoutInMs: number = 10_000) {
const now = performance.now();

try {
await Promise.race([lifecycleHooks.callOnCancelHookListeners(), setTimeout(timeoutInMs)]);
} finally {
const duration = performance.now() - now;

console.log(`Called cancel hooks in ${duration}ms`);
}
}

async function flushAll(timeoutInMs: number = 10_000) {
const now = performance.now();

Expand Down
15 changes: 13 additions & 2 deletions packages/cli-v3/src/executions/taskRunProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ export class TaskRunProcess {
this._isBeingCancelled = true;

try {
await this.#flush();
await this.#cancel();
} catch (err) {
console.error("Error flushing task run process", { err });
console.error("Error cancelling task run process", { err });
}

await this.kill();
Expand All @@ -120,6 +120,10 @@ export class TaskRunProcess {
async cleanup(kill = true) {
this._isPreparedForNextRun = false;

if (this._isBeingCancelled) {
return;
}

try {
await this.#flush();
} catch (err) {
Expand Down Expand Up @@ -224,10 +228,17 @@ export class TaskRunProcess {
await this._ipc?.sendWithAck("FLUSH", { timeoutInMs }, timeoutInMs + 1_000);
}

async #cancel(timeoutInMs: number = 30_000) {
logger.debug("sending cancel message to task run process", { pid: this.pid, timeoutInMs });

await this._ipc?.sendWithAck("CANCEL", { timeoutInMs }, timeoutInMs + 1_000);
}

async execute(
params: TaskRunProcessExecuteParams,
isWarmStart?: boolean
): Promise<TaskRunExecutionResult> {
this._isBeingCancelled = false;
this._isPreparedForNextRun = false;
this._isPreparedForNextAttempt = false;

Expand Down
22 changes: 22 additions & 0 deletions packages/core/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,25 @@ export async function tryCatch<T, E = Error>(
return [error as E, null];
}
}

export type Deferred<T> = {
promise: Promise<T>;
resolve: (value: T) => void;
reject: (reason?: any) => void;
};

export function promiseWithResolvers<T>(): Deferred<T> {
let resolve!: (value: T) => void;
let reject!: (reason?: any) => void;

const promise = new Promise<T>((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});

return {
promise,
resolve,
reject,
};
}
3 changes: 3 additions & 0 deletions packages/core/src/v3/lifecycle-hooks-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ export type {
AnyOnCleanupHookFunction,
TaskCleanupHookParams,
TaskWait,
TaskCancelHookParams,
OnCancelHookFunction,
AnyOnCancelHookFunction,
} from "./lifecycleHooks/types.js";
28 changes: 28 additions & 0 deletions packages/core/src/v3/lifecycleHooks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
AnyOnStartHookFunction,
AnyOnSuccessHookFunction,
AnyOnWaitHookFunction,
AnyOnCancelHookFunction,
RegisteredHookFunction,
RegisterHookFunctionParams,
TaskWait,
Expand Down Expand Up @@ -260,6 +261,33 @@ export class LifecycleHooksAPI {
this.#getManager().registerOnResumeHookListener(listener);
}

public registerGlobalCancelHook(hook: RegisterHookFunctionParams<AnyOnCancelHookFunction>): void {
this.#getManager().registerGlobalCancelHook(hook);
}

public registerTaskCancelHook(
taskId: string,
hook: RegisterHookFunctionParams<AnyOnCancelHookFunction>
): void {
this.#getManager().registerTaskCancelHook(taskId, hook);
}

public getTaskCancelHook(taskId: string): AnyOnCancelHookFunction | undefined {
return this.#getManager().getTaskCancelHook(taskId);
}

public getGlobalCancelHooks(): RegisteredHookFunction<AnyOnCancelHookFunction>[] {
return this.#getManager().getGlobalCancelHooks();
}

public callOnCancelHookListeners(): Promise<void> {
return this.#getManager().callOnCancelHookListeners();
}

public registerOnCancelHookListener(listener: () => Promise<void>): void {
this.#getManager().registerOnCancelHookListener(listener);
}

#getManager(): LifecycleHooksManager {
return getGlobal(API_NAME) ?? NOOP_LIFECYCLE_HOOKS_MANAGER;
}
Expand Down
Loading