diff --git a/.changeset/giant-plums-smash.md b/.changeset/giant-plums-smash.md new file mode 100644 index 0000000000..7b23fb870c --- /dev/null +++ b/.changeset/giant-plums-smash.md @@ -0,0 +1,18 @@ +--- +"@trigger.dev/sdk": patch +--- + +Added and cleaned up the run ctx param: + +- New optional properties `ctx.run.parentTaskRunId` and `ctx.run.rootTaskRunId` reference the current run's root/parent ID. +- Removed deprecated properties from `ctx` +- Added a new `ctx.deployment` object that contains information about the deployment associated with the run. + +We also update `metadata.root` and `metadata.parent` to work even when the run is a "root" run (meaning it doesn't have a parent or a root associated run). This now works: + +```ts +metadata.root.set("foo", "bar"); +metadata.parent.set("baz", 1); +metadata.current().foo // "bar" +metadata.current().baz // 1 +``` diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index 7d3c219bee..0d9065e28a 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -1,22 +1,29 @@ import { - type MachinePresetName, + MachinePreset, prettyPrintPacket, SemanticInternalAttributes, + TaskRunContext, TaskRunError, + V3TaskRunContext, } from "@trigger.dev/core/v3"; -import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic"; +import { AttemptId, getMaxDuration } from "@trigger.dev/core/v3/isomorphic"; import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus"; import { logger } from "~/services/logger.server"; import { eventRepository, rehydrateAttribute } from "~/v3/eventRepository.server"; -import { machinePresetFromName, machinePresetFromRun } from "~/v3/machinePresets.server"; +import { machinePresetFromRun } from "~/v3/machinePresets.server"; import { getTaskEventStoreTableForRun, type TaskEventStoreTable } from "~/v3/taskEventStore.server"; import { isFailedRunStatus, 
isFinalRunStatus } from "~/v3/taskStatus"; import { BasePresenter } from "./basePresenter.server"; import { WaitpointPresenter } from "./WaitpointPresenter.server"; +import { engine } from "~/v3/runEngine.server"; type Result = Awaited>; export type Span = NonNullable["span"]>; export type SpanRun = NonNullable["run"]>; +type FindRunResult = NonNullable< + Awaited["findRun"]>> +>; +type GetSpanResult = NonNullable>>; export class SpanPresenter extends BasePresenter { public async call({ @@ -60,7 +67,7 @@ export class SpanPresenter extends BasePresenter { const eventStore = getTaskEventStoreTableForRun(parentRun); - const run = await this.#getRun({ + const run = await this.getRun({ eventStore, traceId, spanId, @@ -95,7 +102,7 @@ export class SpanPresenter extends BasePresenter { }; } - async #getRun({ + async getRun({ eventStore, traceId, spanId, @@ -120,114 +127,7 @@ export class SpanPresenter extends BasePresenter { return; } - const run = await this._replica.taskRun.findFirst({ - select: { - id: true, - spanId: true, - traceId: true, - //metadata - number: true, - taskIdentifier: true, - friendlyId: true, - isTest: true, - maxDurationInSeconds: true, - taskEventStore: true, - tags: { - select: { - name: true, - }, - }, - machinePreset: true, - lockedToVersion: { - select: { - version: true, - sdkVersion: true, - runtime: true, - runtimeVersion: true, - }, - }, - engine: true, - workerQueue: true, - error: true, - output: true, - outputType: true, - //status + duration - status: true, - statusReason: true, - startedAt: true, - executedAt: true, - createdAt: true, - updatedAt: true, - queuedAt: true, - completedAt: true, - logsDeletedAt: true, - //idempotency - idempotencyKey: true, - idempotencyKeyExpiresAt: true, - //delayed - delayUntil: true, - //ttl - ttl: true, - expiredAt: true, - //queue - queue: true, - concurrencyKey: true, - //schedule - scheduleId: true, - //usage - baseCostInCents: true, - costInCents: true, - usageDurationMs: true, - //env - 
runtimeEnvironment: { - select: { id: true, slug: true, type: true }, - }, - payload: true, - payloadType: true, - metadata: true, - metadataType: true, - maxAttempts: true, - project: { - include: { - organization: true, - }, - }, - lockedBy: { - select: { - filePath: true, - }, - }, - //relationships - rootTaskRun: { - select: { - taskIdentifier: true, - friendlyId: true, - spanId: true, - createdAt: true, - }, - }, - parentTaskRun: { - select: { - taskIdentifier: true, - friendlyId: true, - spanId: true, - }, - }, - batch: { - select: { - friendlyId: true, - }, - }, - replayedFromTaskRunFriendlyId: true, - }, - where: span.originalRun - ? { - friendlyId: span.originalRun, - } - : { - spanId, - }, - }); + const run = await this.findRun({ span, spanId }); if (!run) { return; @@ -271,46 +171,7 @@ export class SpanPresenter extends BasePresenter { const machine = run.machinePreset ? machinePresetFromRun(run) : undefined; - const context = { - task: { - id: run.taskIdentifier, - filePath: run.lockedBy?.filePath, - }, - run: { - id: run.friendlyId, - createdAt: run.createdAt, - tags: run.tags.map((tag) => tag.name), - isTest: run.isTest, - idempotencyKey: run.idempotencyKey ?? undefined, - startedAt: run.startedAt ?? run.createdAt, - durationMs: run.usageDurationMs, - costInCents: run.costInCents, - baseCostInCents: run.baseCostInCents, - maxAttempts: run.maxAttempts ?? undefined, - version: run.lockedToVersion?.version, - maxDuration: run.maxDurationInSeconds ?? 
undefined, - }, - queue: { - name: run.queue, - }, - environment: { - id: run.runtimeEnvironment.id, - slug: run.runtimeEnvironment.slug, - type: run.runtimeEnvironment.type, - }, - organization: { - id: run.project.organization.id, - slug: run.project.organization.slug, - name: run.project.organization.title, - }, - project: { - id: run.project.id, - ref: run.project.externalRef, - slug: run.project.slug, - name: run.project.name, - }, - machine, - }; + const context = await this.#getTaskRunContext({ run, machine: machine ?? undefined }); return { id: run.id, @@ -342,7 +203,7 @@ export class SpanPresenter extends BasePresenter { isCustomQueue: !run.queue.startsWith("task/"), concurrencyKey: run.concurrencyKey, }, - tags: run.tags.map((tag) => tag.name), + tags: run.runTags, baseCostInCents: run.baseCostInCents, costInCents: run.costInCents, totalCostInCents: run.costInCents + run.baseCostInCents, @@ -405,6 +266,127 @@ export class SpanPresenter extends BasePresenter { }; } + async findRun({ span, spanId }: { span: GetSpanResult; spanId: string }) { + const run = await this._replica.taskRun.findFirst({ + select: { + id: true, + spanId: true, + traceId: true, + //metadata + number: true, + taskIdentifier: true, + friendlyId: true, + isTest: true, + maxDurationInSeconds: true, + taskEventStore: true, + runTags: true, + machinePreset: true, + lockedToVersion: { + select: { + version: true, + sdkVersion: true, + runtime: true, + runtimeVersion: true, + }, + }, + engine: true, + workerQueue: true, + error: true, + output: true, + outputType: true, + //status + duration + status: true, + statusReason: true, + startedAt: true, + executedAt: true, + createdAt: true, + updatedAt: true, + queuedAt: true, + completedAt: true, + logsDeletedAt: true, + //idempotency + idempotencyKey: true, + idempotencyKeyExpiresAt: true, + //delayed + delayUntil: true, + //ttl + ttl: true, + expiredAt: true, + //queue + queue: true, + concurrencyKey: true, + //schedule + scheduleId: true, + 
//usage + baseCostInCents: true, + costInCents: true, + usageDurationMs: true, + //env + runtimeEnvironment: { + select: { id: true, slug: true, type: true }, + }, + payload: true, + payloadType: true, + metadata: true, + metadataType: true, + maxAttempts: true, + project: { + include: { + organization: true, + }, + }, + lockedBy: { + select: { + filePath: true, + }, + }, + //relationships + rootTaskRun: { + select: { + taskIdentifier: true, + friendlyId: true, + spanId: true, + createdAt: true, + }, + }, + parentTaskRun: { + select: { + taskIdentifier: true, + friendlyId: true, + spanId: true, + }, + }, + batch: { + select: { + friendlyId: true, + }, + }, + replayedFromTaskRunFriendlyId: true, + attempts: { + take: 1, + orderBy: { + createdAt: "desc", + }, + select: { + number: true, + status: true, + createdAt: true, + friendlyId: true, + }, + }, + }, + where: span.originalRun + ? { + friendlyId: span.originalRun, + } + : { + spanId, + }, + }); + + return run; + } + async #getSpan({ eventStore, traceId, @@ -513,4 +495,83 @@ export class SpanPresenter extends BasePresenter { return { ...data, entity: null }; } } + + async #getTaskRunContext({ run, machine }: { run: FindRunResult; machine?: MachinePreset }) { + if (run.engine === "V1") { + return this.#getV3TaskRunContext({ run, machine }); + } else { + return this.#getV4TaskRunContext({ run }); + } + } + + async #getV3TaskRunContext({ + run, + machine, + }: { + run: FindRunResult; + machine?: MachinePreset; + }): Promise { + const attempt = run.attempts[0]; + + const context = { + attempt: attempt + ? { + id: attempt.friendlyId, + number: attempt.number, + status: attempt.status, + startedAt: attempt.createdAt, + } + : { + id: AttemptId.generate().friendlyId, + number: 1, + status: "PENDING" as const, + startedAt: run.updatedAt, + }, + task: { + id: run.taskIdentifier, + filePath: run.lockedBy?.filePath ?? 
"", + }, + run: { + id: run.friendlyId, + createdAt: run.createdAt, + tags: run.runTags, + isTest: run.isTest, + idempotencyKey: run.idempotencyKey ?? undefined, + startedAt: run.startedAt ?? run.createdAt, + durationMs: run.usageDurationMs, + costInCents: run.costInCents, + baseCostInCents: run.baseCostInCents, + maxAttempts: run.maxAttempts ?? undefined, + version: run.lockedToVersion?.version, + maxDuration: run.maxDurationInSeconds ?? undefined, + }, + queue: { + name: run.queue, + id: run.queue, + }, + environment: { + id: run.runtimeEnvironment.id, + slug: run.runtimeEnvironment.slug, + type: run.runtimeEnvironment.type, + }, + organization: { + id: run.project.organization.id, + slug: run.project.organization.slug, + name: run.project.organization.title, + }, + project: { + id: run.project.id, + ref: run.project.externalRef, + slug: run.project.slug, + name: run.project.name, + }, + machine, + } satisfies V3TaskRunContext; + + return context; + } + + async #getV4TaskRunContext({ run }: { run: FindRunResult }): Promise { + return engine.resolveTaskRunContext(run.id); + } } diff --git a/apps/webapp/app/routes/resources.runs.$runParam.ts b/apps/webapp/app/routes/resources.runs.$runParam.ts index 899dbb63f0..7b116b31c3 100644 --- a/apps/webapp/app/routes/resources.runs.$runParam.ts +++ b/apps/webapp/app/routes/resources.runs.$runParam.ts @@ -76,6 +76,30 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { lockedBy: { select: { filePath: true, + worker: { + select: { + deployment: { + select: { + friendlyId: true, + shortCode: true, + version: true, + runtime: true, + runtimeVersion: true, + git: true, + }, + }, + }, + }, + }, + }, + parentTaskRun: { + select: { + friendlyId: true, + }, + }, + rootTaskRun: { + select: { + friendlyId: true, }, }, }, @@ -163,6 +187,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { baseCostInCents: run.baseCostInCents, maxAttempts: run.maxAttempts ?? 
undefined, version: run.lockedToVersion?.version, + parentTaskRunId: run.parentTaskRun?.friendlyId ?? undefined, + rootTaskRunId: run.rootTaskRun?.friendlyId ?? undefined, }, queue: { name: run.queue, @@ -184,6 +210,16 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { name: run.project.name, }, machine: run.machinePreset ? machinePresetFromRun(run) : undefined, + deployment: run.lockedBy?.worker.deployment + ? { + id: run.lockedBy.worker.deployment.friendlyId, + shortCode: run.lockedBy.worker.deployment.shortCode, + version: run.lockedBy.worker.deployment.version, + runtime: run.lockedBy.worker.deployment.runtime, + runtimeVersion: run.lockedBy.worker.deployment.runtimeVersion, + git: run.lockedBy.worker.deployment.git, + } + : undefined, }; return typedjson({ diff --git a/apps/webapp/app/v3/failedTaskRun.server.ts b/apps/webapp/app/v3/failedTaskRun.server.ts index 3935c21ddd..f4b3c92ea6 100644 --- a/apps/webapp/app/v3/failedTaskRun.server.ts +++ b/apps/webapp/app/v3/failedTaskRun.server.ts @@ -4,6 +4,7 @@ import { TaskRunExecution, TaskRunExecutionRetry, TaskRunFailedExecutionResult, + V3TaskRunExecution, } from "@trigger.dev/core/v3"; import type { Prisma, TaskRun } from "@trigger.dev/database"; import * as semver from "semver"; @@ -129,7 +130,7 @@ export class FailedTaskRunRetryHelper extends BaseService { async #getRetriableAttemptExecution( run: TaskRunWithAttempts, completion: TaskRunFailedExecutionResult - ): Promise { + ): Promise { let attempt = run.attempts[0]; // We need to create an attempt if: diff --git a/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts index 307a6be5d4..2bd80d465b 100644 --- a/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts @@ -1,6 +1,6 @@ import { Context, ROOT_CONTEXT, Span, SpanKind, context, trace } from "@opentelemetry/api"; import { - TaskRunExecution, + V3TaskRunExecution, 
TaskRunExecutionLazyAttemptPayload, TaskRunExecutionResult, TaskRunFailedExecutionResult, @@ -138,7 +138,7 @@ export class DevQueueConsumer { public async taskAttemptCompleted( workerId: string, completion: TaskRunExecutionResult, - execution: TaskRunExecution + execution: V3TaskRunExecution ) { if (completion.ok) { this._taskSuccesses++; diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index 21544cc756..075732544c 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -11,8 +11,8 @@ import { import { AckCallbackResult, MachinePreset, - ProdTaskRunExecution, - ProdTaskRunExecutionPayload, + V3ProdTaskRunExecution, + V3ProdTaskRunExecutionPayload, TaskRunError, TaskRunErrorCodes, TaskRunExecution, @@ -1679,7 +1679,7 @@ class SharedQueueTasks { private async _executionFromAttempt( attempt: AttemptForExecution, machinePreset?: MachinePreset - ): Promise { + ): Promise { const { backgroundWorkerTask, taskRun, queue } = attempt; if (!machinePreset) { @@ -1693,7 +1693,7 @@ class SharedQueueTasks { dataType: taskRun.metadataType, }); - const execution: ProdTaskRunExecution = { + const execution: V3ProdTaskRunExecution = { task: { id: backgroundWorkerTask.slug, filePath: backgroundWorkerTask.filePath, @@ -1784,7 +1784,7 @@ class SharedQueueTasks { setToExecuting?: boolean; isRetrying?: boolean; skipStatusChecks?: boolean; - }): Promise { + }): Promise { const attempt = await prisma.taskRunAttempt.findFirst({ where: { id, @@ -1874,7 +1874,7 @@ class SharedQueueTasks { machinePreset ); - const payload: ProdTaskRunExecutionPayload = { + const payload: V3ProdTaskRunExecutionPayload = { execution, traceContext: taskRun.traceContext as Record, environment: variables.reduce((acc: Record, curr) => { @@ -1888,7 +1888,7 @@ class SharedQueueTasks { async getResumePayload(attemptId: string): Promise< | { - execution: 
ProdTaskRunExecution; + execution: V3ProdTaskRunExecution; completion: TaskRunExecutionResult; } | undefined @@ -1927,7 +1927,7 @@ class SharedQueueTasks { async getResumePayloads(attemptIds: string[]): Promise< Array<{ - execution: ProdTaskRunExecution; + execution: V3ProdTaskRunExecution; completion: TaskRunExecutionResult; }> > { @@ -1985,7 +1985,7 @@ class SharedQueueTasks { id: string, setToExecuting?: boolean, isRetrying?: boolean - ): Promise { + ): Promise { const run = await prisma.taskRun.findFirst({ where: { id, diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index d0b9d911b6..8fe57f040d 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -2,13 +2,13 @@ import { Attributes } from "@opentelemetry/api"; import { MachinePresetName, TaskRunContext, - TaskRunError, TaskRunErrorCodes, TaskRunExecution, TaskRunExecutionResult, TaskRunExecutionRetry, TaskRunFailedExecutionResult, TaskRunSuccessfulExecutionResult, + V3TaskRunExecution, flattenAttributes, isOOMRunError, sanitizeError, @@ -60,7 +60,7 @@ export class CompleteAttemptService extends BaseService { checkpoint, }: { completion: TaskRunExecutionResult; - execution: TaskRunExecution; + execution: V3TaskRunExecution; env?: AuthenticatedEnvironment; checkpoint?: CheckpointData; }): Promise<"COMPLETED" | "RETRIED"> { @@ -196,7 +196,7 @@ export class CompleteAttemptService extends BaseService { checkpoint, }: { completion: TaskRunFailedExecutionResult; - execution: TaskRunExecution; + execution: V3TaskRunExecution; taskRunAttempt: NonNullable; env?: AuthenticatedEnvironment; checkpoint?: CheckpointData; @@ -559,7 +559,7 @@ export class CompleteAttemptService extends BaseService { forceRequeue = false, oomMachine, }: { - execution: TaskRunExecution; + execution: V3TaskRunExecution; executionRetry: TaskRunExecutionRetry; executionRetryInferred: boolean; 
taskRunAttempt: NonNullable; @@ -648,7 +648,7 @@ export class CompleteAttemptService extends BaseService { executionRetryInferred, checkpoint, }: { - execution: TaskRunExecution; + execution: V3TaskRunExecution; taskRunAttempt: NonNullable; executionRetry: TaskRunExecutionRetry; executionRetryInferred: boolean; diff --git a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts index 60d7448b2e..df5e4e2b74 100644 --- a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts +++ b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts @@ -1,4 +1,4 @@ -import { parsePacket, TaskRunExecution } from "@trigger.dev/core/v3"; +import { parsePacket, V3TaskRunExecution } from "@trigger.dev/core/v3"; import { TaskRun, TaskRunAttempt } from "@trigger.dev/database"; import { MAX_TASK_RUN_ATTEMPTS } from "~/consts"; import { $transaction, prisma, PrismaClientOrTransaction } from "~/db.server"; @@ -25,7 +25,7 @@ export class CreateTaskRunAttemptService extends BaseService { setToExecuting?: boolean; startAtZero?: boolean; }): Promise<{ - execution: TaskRunExecution; + execution: V3TaskRunExecution; run: TaskRun; attempt: TaskRunAttempt; }> { @@ -189,7 +189,7 @@ export class CreateTaskRunAttemptService extends BaseService { dataType: taskRun.metadataType, }); - const execution: TaskRunExecution = { + const execution: V3TaskRunExecution = { task: { id: lockedBy.slug, filePath: lockedBy.filePath, diff --git a/internal-packages/cache/README.md b/internal-packages/cache/README.md new file mode 100644 index 0000000000..e0f344f788 --- /dev/null +++ b/internal-packages/cache/README.md @@ -0,0 +1,3 @@ +# Cache + +This is a simple package that provides the shared caching primitives: it re-exports `createCache`, `Namespace`, `DefaultStatefulContext` and `MemoryStore` from `@unkey/cache`, and adds `RedisCacheStore`, a Redis-backed cache store that returns errors as `CacheError` results instead of throwing. 
diff --git a/internal-packages/cache/package.json b/internal-packages/cache/package.json new file mode 100644 index 0000000000..63dd03a4b4 --- /dev/null +++ b/internal-packages/cache/package.json @@ -0,0 +1,18 @@ +{ + "name": "@internal/cache", + "private": true, + "version": "0.0.1", + "main": "./src/index.ts", + "types": "./src/index.ts", + "type": "module", + "dependencies": { + "@unkey/cache": "^1.5.0", + "@unkey/error": "^0.2.0", + "@trigger.dev/core": "workspace:*", + "@internal/redis": "workspace:*", + "superjson": "^2.2.1" + }, + "scripts": { + "typecheck": "tsc --noEmit" + } +} \ No newline at end of file diff --git a/internal-packages/cache/src/index.ts b/internal-packages/cache/src/index.ts new file mode 100644 index 0000000000..e5844d910b --- /dev/null +++ b/internal-packages/cache/src/index.ts @@ -0,0 +1,8 @@ +export { + createCache, + DefaultStatefulContext, + Namespace, + type Cache as UnkeyCache, +} from "@unkey/cache"; +export { MemoryStore } from "@unkey/cache/stores"; +export { RedisCacheStore } from "./stores/redis.js"; diff --git a/internal-packages/cache/src/stores/redis.ts b/internal-packages/cache/src/stores/redis.ts new file mode 100644 index 0000000000..4f40133a4f --- /dev/null +++ b/internal-packages/cache/src/stores/redis.ts @@ -0,0 +1,105 @@ +import { CacheError } from "@unkey/cache"; +import type { Entry, Store } from "@unkey/cache/stores"; +import { Err, Ok, type Result } from "@unkey/error"; +import { createRedisClient, Redis, RedisOptions } from "@internal/redis"; + +export type RedisCacheStoreConfig = { + connection: RedisOptions; + name?: string; + useModernCacheKeyBuilder?: boolean; +}; + +export class RedisCacheStore + implements Store +{ + public readonly name = "redis"; + private readonly redis: Redis; + + constructor(private readonly config: RedisCacheStoreConfig) { + this.redis = createRedisClient({ + ...config.connection, + name: config.name ?? 
"trigger:cacheStore", + }); + } + + private buildCacheKey(namespace: TNamespace, key: string): string { + if (this.config.useModernCacheKeyBuilder) { + return [namespace, key].join(":"); + } + + return [namespace, key].join("::"); + } + + public async get( + namespace: TNamespace, + key: string + ): Promise | undefined, CacheError>> { + let raw: string | null; + try { + raw = await this.redis.get(this.buildCacheKey(namespace, key)); + } catch (err) { + return Err( + new CacheError({ + tier: this.name, + key, + message: (err as Error).message, + }) + ); + } + + if (!raw) { + return Promise.resolve(Ok(undefined)); + } + + try { + const superjson = await import("superjson"); + const entry = superjson.parse(raw) as Entry; + return Ok(entry); + } catch (err) { + return Err( + new CacheError({ + tier: this.name, + key, + message: (err as Error).message, + }) + ); + } + } + + public async set( + namespace: TNamespace, + key: string, + entry: Entry + ): Promise> { + const cacheKey = this.buildCacheKey(namespace, key); + try { + const superjson = await import("superjson"); + await this.redis.set(cacheKey, superjson.stringify(entry), "PXAT", entry.staleUntil); + return Ok(); + } catch (err) { + return Err( + new CacheError({ + tier: this.name, + key, + message: (err as Error).message, + }) + ); + } + } + + public async remove(namespace: TNamespace, key: string): Promise> { + try { + const cacheKey = this.buildCacheKey(namespace, key); + await this.redis.del(cacheKey); + return Promise.resolve(Ok()); + } catch (err) { + return Err( + new CacheError({ + tier: this.name, + key, + message: (err as Error).message, + }) + ); + } + } +} diff --git a/internal-packages/cache/tsconfig.json b/internal-packages/cache/tsconfig.json new file mode 100644 index 0000000000..0104339620 --- /dev/null +++ b/internal-packages/cache/tsconfig.json @@ -0,0 +1,23 @@ +{ + "compilerOptions": { + "target": "ES2019", + "lib": ["ES2019", "DOM", "DOM.Iterable", "DOM.AsyncIterable"], + "module": "Node16", 
+ "moduleResolution": "Node16", + "moduleDetection": "force", + "verbatimModuleSyntax": false, + "types": ["vitest/globals"], + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "isolatedModules": true, + "preserveWatchOutput": true, + "skipLibCheck": true, + "noEmit": true, + "strict": true, + "paths": { + "@trigger.dev/core": ["../../packages/core/src/index"], + "@trigger.dev/core/*": ["../../packages/core/src/*"] + } + }, + "exclude": ["node_modules"] +} diff --git a/internal-packages/run-engine/package.json b/internal-packages/run-engine/package.json index e509d2dede..62f0837897 100644 --- a/internal-packages/run-engine/package.json +++ b/internal-packages/run-engine/package.json @@ -25,7 +25,7 @@ "@internal/tracing": "workspace:*", "@trigger.dev/core": "workspace:*", "@trigger.dev/database": "workspace:*", - "@unkey/cache": "^1.5.0", + "@internal/cache": "workspace:*", "assert-never": "^1.2.1", "nanoid": "3.3.8", "redlock": "5.0.0-beta.2", diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 8fa677a1c1..7657431140 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -9,6 +9,7 @@ import { ExecutionResult, RunExecutionData, StartRunAttemptResult, + TaskRunContext, TaskRunExecutionResult, } from "@trigger.dev/core/v3"; import { RunId, WaitpointId } from "@trigger.dev/core/v3/isomorphic"; @@ -288,6 +289,7 @@ export class RunEngine { delayedRunSystem: this.delayedRunSystem, machines: this.options.machines, retryWarmStartThresholdMs: this.options.retryWarmStartThresholdMs, + redisOptions: this.options.cache?.redis ?? 
this.options.runLock.redis, }); this.dequeueSystem = new DequeueSystem({ @@ -1054,6 +1056,10 @@ export class RunEngine { } } + async resolveTaskRunContext(runId: string): Promise { + return this.runAttemptSystem.resolveTaskRunContext(runId); + } + async getSnapshotsSince({ runId, snapshotId, diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index f56c9a6ebb..ce0f8abe4d 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -1,3 +1,12 @@ +import { + createCache, + DefaultStatefulContext, + MemoryStore, + Namespace, + RedisCacheStore, + UnkeyCache, +} from "@internal/cache"; +import { RedisOptions } from "@internal/redis"; import { startSpan } from "@internal/tracing"; import { tryCatch } from "@trigger.dev/core/utils"; import { @@ -5,9 +14,16 @@ import { ExecutionResult, FlushedRunMetadata, GitMeta, + MachinePreset, + MachinePresetName, StartRunAttemptResult, + TaskRunContext, TaskRunError, TaskRunExecution, + TaskRunExecutionDeployment, + TaskRunExecutionOrganization, + TaskRunExecutionProject, + TaskRunExecutionQueue, TaskRunExecutionResult, TaskRunFailedExecutionResult, TaskRunInternalError, @@ -23,7 +39,7 @@ import { import { MAX_TASK_RUN_ATTEMPTS } from "../consts.js"; import { runStatusFromError, ServiceValidationError } from "../errors.js"; import { sendNotificationToWorker } from "../eventBus.js"; -import { getMachinePreset } from "../machinePresets.js"; +import { getMachinePreset, machinePresetFromName } from "../machinePresets.js"; import { retryOutcomeFromCompletion } from "../retrying.js"; import { isExecuting, isInitialState } from "../statuses.js"; import { RunEngineOptions } from "../types.js"; @@ -36,6 +52,7 @@ import { } from "./executionSnapshotSystem.js"; import { SystemResources } from "./systems.js"; import { WaitpointSystem } from 
"./waitpointSystem.js"; +import { BatchId, RunId } from "@trigger.dev/core/v3/isomorphic"; export type RunAttemptSystemOptions = { resources: SystemResources; @@ -45,14 +62,54 @@ export type RunAttemptSystemOptions = { delayedRunSystem: DelayedRunSystem; retryWarmStartThresholdMs?: number; machines: RunEngineOptions["machines"]; + redisOptions: RedisOptions; +}; + +type BackwardsCompatibleTaskRunExecution = Omit & { + task: TaskRunExecution["task"] & { + exportName: string | undefined; + }; + attempt: TaskRunExecution["attempt"] & { + id: string; + backgroundWorkerId: string; + backgroundWorkerTaskId: string; + status: string; + }; + run: TaskRunExecution["run"] & { + context: undefined; + durationMs: number; + costInCents: number; + baseCostInCents: number; + }; }; +const ORG_FRESH_TTL = 60000 * 60 * 24; // 1 day +const ORG_STALE_TTL = 60000 * 60 * 24 * 2; // 2 days +const PROJECT_FRESH_TTL = 60000 * 60 * 24; // 1 day +const PROJECT_STALE_TTL = 60000 * 60 * 24 * 2; // 2 days +const TASK_FRESH_TTL = 60000 * 60 * 24; // 1 day +const TASK_STALE_TTL = 60000 * 60 * 24 * 2; // 2 days +const MACHINE_PRESET_FRESH_TTL = 60000 * 60 * 24; // 1 day +const MACHINE_PRESET_STALE_TTL = 60000 * 60 * 24 * 2; // 2 days +const DEPLOYMENT_FRESH_TTL = 60000 * 60 * 24; // 1 day +const DEPLOYMENT_STALE_TTL = 60000 * 60 * 24 * 2; // 2 days +const QUEUE_FRESH_TTL = 60000 * 60; // 1 hour +const QUEUE_STALE_TTL = 60000 * 60 * 2; // 2 hours + export class RunAttemptSystem { private readonly $: SystemResources; private readonly executionSnapshotSystem: ExecutionSnapshotSystem; private readonly batchSystem: BatchSystem; private readonly waitpointSystem: WaitpointSystem; private readonly delayedRunSystem: DelayedRunSystem; + private readonly cache: UnkeyCache<{ + tasks: BackwardsCompatibleTaskRunExecution["task"]; + machinePresets: MachinePreset; + deployments: TaskRunExecutionDeployment; + queues: TaskRunExecutionQueue; + projects: TaskRunExecutionProject; + orgs: TaskRunExecutionOrganization; 
+ }>; constructor(private readonly options: RunAttemptSystemOptions) { this.$ = options.resources; @@ -60,6 +117,170 @@ export class RunAttemptSystem { this.batchSystem = options.batchSystem; this.waitpointSystem = options.waitpointSystem; this.delayedRunSystem = options.delayedRunSystem; + + const ctx = new DefaultStatefulContext(); + // TODO: use an LRU cache for memory store + const memory = new MemoryStore({ persistentMap: new Map() }); + const redisCacheStore = new RedisCacheStore({ + name: "run-attempt-system", + connection: { + ...options.redisOptions, + keyPrefix: "engine:run-attempt-system:cache:", + }, + useModernCacheKeyBuilder: true, + }); + + this.cache = createCache({ + orgs: new Namespace(ctx, { + stores: [memory, redisCacheStore], + fresh: ORG_FRESH_TTL, + stale: ORG_STALE_TTL, + }), + projects: new Namespace(ctx, { + stores: [memory, redisCacheStore], + fresh: PROJECT_FRESH_TTL, + stale: PROJECT_STALE_TTL, + }), + tasks: new Namespace(ctx, { + stores: [memory, redisCacheStore], + fresh: TASK_FRESH_TTL, + stale: TASK_STALE_TTL, + }), + machinePresets: new Namespace(ctx, { + stores: [memory, redisCacheStore], + fresh: MACHINE_PRESET_FRESH_TTL, + stale: MACHINE_PRESET_STALE_TTL, + }), + deployments: new Namespace(ctx, { + stores: [memory, redisCacheStore], + fresh: DEPLOYMENT_FRESH_TTL, + stale: DEPLOYMENT_STALE_TTL, + }), + queues: new Namespace(ctx, { + stores: [memory, redisCacheStore], + fresh: QUEUE_FRESH_TTL, + stale: QUEUE_STALE_TTL, + }), + }); + } + + public async resolveTaskRunContext(runId: string): Promise { + const run = await this.$.prisma.taskRun.findFirst({ + where: { + id: runId, + }, + select: { + id: true, + createdAt: true, + updatedAt: true, + executedAt: true, + baseCostInCents: true, + projectId: true, + organizationId: true, + friendlyId: true, + lockedById: true, + lockedQueueId: true, + queue: true, + attemptNumber: true, + status: true, + ttl: true, + machinePreset: true, + runTags: true, + isTest: true, + idempotencyKey: 
true, + startedAt: true, + maxAttempts: true, + taskVersion: true, + maxDurationInSeconds: true, + usageDurationMs: true, + costInCents: true, + traceContext: true, + priorityMs: true, + taskIdentifier: true, + runtimeEnvironment: { + select: { + id: true, + slug: true, + type: true, + branchName: true, + git: true, + organizationId: true, + }, + }, + parentTaskRunId: true, + rootTaskRunId: true, + batchId: true, + }, + }); + + if (!run) { + throw new ServiceValidationError("Task run not found", 404); + } + + const [task, queue, organization, project, machinePreset, deployment] = await Promise.all([ + run.lockedById + ? this.#resolveTaskRunExecutionTask(run.lockedById) + : Promise.resolve({ + id: run.taskIdentifier, + filePath: "unknown", + }), + this.#resolveTaskRunExecutionQueue({ + runId, + lockedQueueId: run.lockedQueueId ?? undefined, + queueName: run.queue, + runtimeEnvironmentId: run.runtimeEnvironment.id, + }), + this.#resolveTaskRunExecutionOrganization(run.runtimeEnvironment.organizationId), + this.#resolveTaskRunExecutionProjectByRuntimeEnvironmentId(run.runtimeEnvironment.id), + run.lockedById + ? this.#resolveTaskRunExecutionMachinePreset(run.lockedById, run.machinePreset) + : Promise.resolve( + getMachinePreset({ + defaultMachine: this.options.machines.defaultMachine, + machines: this.options.machines.machines, + config: undefined, + run, + }) + ), + run.lockedById + ? this.#resolveTaskRunExecutionDeployment(run.lockedById) + : Promise.resolve(undefined), + ]); + + return { + run: { + id: run.friendlyId, + tags: run.runTags, + isTest: run.isTest, + createdAt: run.createdAt, + startedAt: run.startedAt ?? run.createdAt, + idempotencyKey: run.idempotencyKey ?? undefined, + maxAttempts: run.maxAttempts ?? undefined, + version: run.taskVersion ?? "unknown", + maxDuration: run.maxDurationInSeconds ?? undefined, + priority: run.priorityMs === 0 ? undefined : run.priorityMs / 1_000, + parentTaskRunId: run.parentTaskRunId ? 
RunId.toFriendlyId(run.parentTaskRunId) : undefined, + rootTaskRunId: run.rootTaskRunId ? RunId.toFriendlyId(run.rootTaskRunId) : undefined, + }, + attempt: { + number: run.attemptNumber ?? 1, + startedAt: run.startedAt ?? new Date(), + }, + task, + queue, + organization, + project, + machine: machinePreset, + deployment, + environment: { + id: run.runtimeEnvironment.id, + slug: run.runtimeEnvironment.slug, + type: run.runtimeEnvironment.type, + branchName: run.runtimeEnvironment.branchName ?? undefined, + git: safeParseGitMeta(run.runtimeEnvironment.git), + }, + batch: run.batchId ? { id: BatchId.toFriendlyId(run.batchId) } : undefined, + }; } public async startRunAttempt({ @@ -95,35 +316,19 @@ export class RunAttemptSystem { throw new ServiceValidationError("Snapshot changed", 409); } - const environment = await this.#getAuthenticatedEnvironmentFromRun(runId, prisma); - if (!environment) { - throw new ServiceValidationError("Environment not found", 404); - } - const taskRun = await prisma.taskRun.findFirst({ where: { id: runId, }, - include: { - tags: true, - lockedBy: { - include: { - worker: { - select: { - id: true, - version: true, - sdkVersion: true, - cliVersion: true, - supportsLazyAttempts: true, - }, - }, - }, - }, - batchItems: { - include: { - batchTaskRun: true, - }, - }, + select: { + id: true, + friendlyId: true, + attemptNumber: true, + projectId: true, + runtimeEnvironmentId: true, + status: true, + lockedById: true, + ttl: true, }, }); @@ -142,21 +347,10 @@ export class RunAttemptSystem { throw new ServiceValidationError("Task run is cancelled", 400); } - if (!taskRun.lockedBy) { + if (!taskRun.lockedById) { throw new ServiceValidationError("Task run is not locked", 400); } - const queue = await prisma.taskQueue.findFirst({ - where: { - runtimeEnvironmentId: environment.id, - name: taskRun.queue, - }, - }); - - if (!queue) { - throw new ServiceValidationError("Queue not found", 404); - } - //increment the attempt number (start at 1) const 
nextAttemptNumber = (taskRun.attemptNumber ?? 0) + 1; @@ -190,11 +384,50 @@ export class RunAttemptSystem { attemptNumber: nextAttemptNumber, executedAt: taskRun.attemptNumber === null ? new Date() : undefined, }, - include: { - tags: true, - lockedBy: { - include: { worker: true }, + select: { + id: true, + createdAt: true, + updatedAt: true, + executedAt: true, + baseCostInCents: true, + projectId: true, + organizationId: true, + friendlyId: true, + lockedById: true, + lockedQueueId: true, + queue: true, + attemptNumber: true, + status: true, + ttl: true, + metadata: true, + metadataType: true, + machinePreset: true, + payload: true, + payloadType: true, + runTags: true, + isTest: true, + idempotencyKey: true, + startedAt: true, + maxAttempts: true, + taskVersion: true, + maxDurationInSeconds: true, + usageDurationMs: true, + costInCents: true, + traceContext: true, + priorityMs: true, + batchId: true, + runtimeEnvironment: { + select: { + id: true, + slug: true, + type: true, + branchName: true, + git: true, + organizationId: true, + }, }, + parentTaskRunId: true, + rootTaskRunId: true, }, }); @@ -222,7 +455,7 @@ export class RunAttemptSystem { await this.$.worker.ack(`expireRun:${taskRun.id}`); } - return { run, snapshot: newSnapshot }; + return { updatedRun: run, snapshot: newSnapshot }; }, (error) => { this.$.logger.error("RunEngine.createRunAttempt(): prisma.$transaction error", { @@ -247,56 +480,59 @@ export class RunAttemptSystem { throw new ServiceValidationError("Failed to create task run attempt", 500); } - const { run, snapshot } = result; + const { updatedRun, snapshot } = result; this.$.eventBus.emit("runAttemptStarted", { time: new Date(), run: { - id: run.id, - status: run.status, - createdAt: run.createdAt, - updatedAt: run.updatedAt, + id: updatedRun.id, + status: updatedRun.status, + createdAt: updatedRun.createdAt, + updatedAt: updatedRun.updatedAt, attemptNumber: nextAttemptNumber, - baseCostInCents: run.baseCostInCents, - executedAt: 
run.executedAt ?? undefined, + baseCostInCents: updatedRun.baseCostInCents, + executedAt: updatedRun.executedAt ?? undefined, }, organization: { - id: environment.organization.id, + id: updatedRun.runtimeEnvironment.organizationId, }, project: { - id: environment.project.id, + id: updatedRun.projectId, }, environment: { - id: environment.id, + id: updatedRun.runtimeEnvironment.id, }, }); - const machinePreset = getMachinePreset({ - machines: this.options.machines.machines, - defaultMachine: this.options.machines.defaultMachine, - config: taskRun.lockedBy.machineConfig ?? {}, - run: taskRun, - }); - - const metadata = await parsePacket({ - data: taskRun.metadata ?? undefined, - dataType: taskRun.metadataType, - }); - - let git: GitMeta | undefined = undefined; - if (environment.git) { - const parsed = GitMeta.safeParse(environment.git); - if (parsed.success) { - git = parsed.data; - } - } + const environmentGit = safeParseGitMeta(updatedRun.runtimeEnvironment.git); - const execution: TaskRunExecution = { - task: { - id: run.lockedBy!.slug, - filePath: run.lockedBy!.filePath, - exportName: run.lockedBy!.exportName ?? undefined, - }, + const [metadata, task, queue, organization, project, machinePreset, deployment] = + await Promise.all([ + parsePacket({ + data: updatedRun.metadata ?? undefined, + dataType: updatedRun.metadataType, + }), + this.#resolveTaskRunExecutionTask(taskRun.lockedById), + this.#resolveTaskRunExecutionQueue({ + runId, + lockedQueueId: updatedRun.lockedQueueId ?? 
undefined, + queueName: updatedRun.queue, + runtimeEnvironmentId: updatedRun.runtimeEnvironment.id, + }), + this.#resolveTaskRunExecutionOrganization( + updatedRun.runtimeEnvironment.organizationId + ), + this.#resolveTaskRunExecutionProjectByRuntimeEnvironmentId( + updatedRun.runtimeEnvironment.id + ), + this.#resolveTaskRunExecutionMachinePreset( + taskRun.lockedById, + updatedRun.machinePreset + ), + this.#resolveTaskRunExecutionDeployment(taskRun.lockedById), + ]); + + const execution: BackwardsCompatibleTaskRunExecution = { attempt: { number: nextAttemptNumber, startedAt: latestSnapshot.updatedAt, @@ -310,59 +546,56 @@ export class RunAttemptSystem { status: "deprecated", }, run: { - id: run.friendlyId, - payload: run.payload, - payloadType: run.payloadType, - createdAt: run.createdAt, - tags: run.tags.map((tag) => tag.name), - isTest: run.isTest, - idempotencyKey: run.idempotencyKey ?? undefined, - startedAt: run.startedAt ?? run.createdAt, - maxAttempts: run.maxAttempts ?? undefined, - version: run.lockedBy!.worker.version, + id: updatedRun.friendlyId, + payload: updatedRun.payload, + payloadType: updatedRun.payloadType, + createdAt: updatedRun.createdAt, + tags: updatedRun.runTags, + isTest: updatedRun.isTest, + idempotencyKey: updatedRun.idempotencyKey ?? undefined, + startedAt: updatedRun.startedAt ?? updatedRun.createdAt, + maxAttempts: updatedRun.maxAttempts ?? undefined, + version: updatedRun.taskVersion ?? "unknown", metadata, - maxDuration: run.maxDurationInSeconds ?? undefined, + maxDuration: updatedRun.maxDurationInSeconds ?? undefined, /** @deprecated */ context: undefined, /** @deprecated */ - durationMs: run.usageDurationMs, + durationMs: updatedRun.usageDurationMs, /** @deprecated */ - costInCents: run.costInCents, + costInCents: updatedRun.costInCents, /** @deprecated */ - baseCostInCents: run.baseCostInCents, - traceContext: run.traceContext as Record, - priority: run.priorityMs === 0 ? 
undefined : run.priorityMs / 1_000, - }, - queue: { - id: queue.friendlyId, - name: queue.name, + baseCostInCents: updatedRun.baseCostInCents, + traceContext: updatedRun.traceContext as Record, + priority: updatedRun.priorityMs === 0 ? undefined : updatedRun.priorityMs / 1_000, + parentTaskRunId: updatedRun.parentTaskRunId + ? RunId.toFriendlyId(updatedRun.parentTaskRunId) + : undefined, + rootTaskRunId: updatedRun.rootTaskRunId + ? RunId.toFriendlyId(updatedRun.rootTaskRunId) + : undefined, }, + task, + queue, environment: { - id: environment.id, - slug: environment.slug, - type: environment.type, - branchName: environment.branchName ?? undefined, - git, - }, - organization: { - id: environment.organization.id, - slug: environment.organization.slug, - name: environment.organization.title, + id: updatedRun.runtimeEnvironment.id, + slug: updatedRun.runtimeEnvironment.slug, + type: updatedRun.runtimeEnvironment.type, + branchName: updatedRun.runtimeEnvironment.branchName ?? undefined, + git: environmentGit, }, - project: { - id: environment.project.id, - ref: environment.project.externalRef, - slug: environment.project.slug, - name: environment.project.name, - }, - batch: - taskRun.batchItems[0] && taskRun.batchItems[0].batchTaskRun - ? { id: taskRun.batchItems[0].batchTaskRun.friendlyId } - : undefined, + organization, + project, machine: machinePreset, + deployment, + batch: updatedRun.batchId + ? { + id: BatchId.toFriendlyId(updatedRun.batchId), + } + : undefined, }; - return { run, snapshot, execution }; + return { run: updatedRun, snapshot, execution }; }); }, { @@ -1311,27 +1544,248 @@ export class RunAttemptSystem { await this.$.worker.ack(`heartbeatSnapshot.${id}`); } - async #getAuthenticatedEnvironmentFromRun(runId: string, tx?: PrismaClientOrTransaction) { - const prisma = tx ?? 
this.$.prisma; - const taskRun = await prisma.taskRun.findFirst({ - where: { - id: runId, - }, - include: { - runtimeEnvironment: { - include: { - organization: true, - project: true, + async #resolveTaskRunExecutionTask( + backgroundWorkerTaskId: string + ): Promise { + const result = await this.cache.tasks.swr(backgroundWorkerTaskId, async () => { + const task = await this.$.prisma.backgroundWorkerTask.findFirstOrThrow({ + where: { + id: backgroundWorkerTaskId, + }, + select: { + id: true, + slug: true, + filePath: true, + exportName: true, + }, + }); + + return { + id: task.slug, + filePath: task.filePath, + exportName: task.exportName ?? undefined, + }; + }); + + if (result.err) { + throw result.err; + } + + if (!result.val) { + throw new ServiceValidationError( + `Could not resolve task execution data for task ${backgroundWorkerTaskId}` + ); + } + + return result.val; + } + + async #resolveTaskRunExecutionOrganization( + organizationId: string + ): Promise { + const result = await this.cache.orgs.swr(organizationId, async () => { + const organization = await this.$.prisma.organization.findFirstOrThrow({ + where: { id: organizationId }, + select: { + id: true, + title: true, + slug: true, + }, + }); + + return { + id: organization.id, + name: organization.title, + slug: organization.slug, + }; + }); + + if (result.err) { + throw result.err; + } + + if (!result.val) { + throw new ServiceValidationError( + `Could not resolve organization data for organization ${organizationId}` + ); + } + + return result.val; + } + + async #resolveTaskRunExecutionProjectByRuntimeEnvironmentId( + runtimeEnvironmentId: string + ): Promise { + const result = await this.cache.projects.swr(runtimeEnvironmentId, async () => { + const { project } = await this.$.prisma.runtimeEnvironment.findFirstOrThrow({ + where: { id: runtimeEnvironmentId }, + select: { + id: true, + project: { + select: { + id: true, + name: true, + slug: true, + externalRef: true, + }, }, }, - }, + }); + + return { 
+ id: project.id, + name: project.name, + slug: project.slug, + ref: project.externalRef, + }; }); - if (!taskRun) { - return; + if (result.err) { + throw result.err; + } + + if (!result.val) { + throw new ServiceValidationError( + `Could not resolve project data for project ${runtimeEnvironmentId}` + ); + } + + return result.val; + } + + async #resolveTaskRunExecutionMachinePreset( + backgroundWorkerTaskId: string, + runMachinePreset: string | null + ): Promise { + if (runMachinePreset) { + return machinePresetFromName( + this.options.machines.machines, + runMachinePreset as MachinePresetName + ); + } + + const result = await this.cache.machinePresets.swr(backgroundWorkerTaskId, async () => { + const { machineConfig } = await this.$.prisma.backgroundWorkerTask.findFirstOrThrow({ + where: { + id: backgroundWorkerTaskId, + }, + select: { + machineConfig: true, + }, + }); + + return getMachinePreset({ + machines: this.options.machines.machines, + defaultMachine: this.options.machines.defaultMachine, + config: machineConfig, + run: { machinePreset: null }, + }); + }); + + if (result.err) { + throw result.err; + } + + if (!result.val) { + throw new ServiceValidationError( + `Could not resolve machine preset for task ${backgroundWorkerTaskId}` + ); + } + + return result.val; + } + + async #resolveTaskRunExecutionQueue(params: { + runId: string; + lockedQueueId?: string; + queueName: string; + runtimeEnvironmentId: string; + }): Promise { + const result = await this.cache.queues.swr(params.runId, async () => { + const queue = params.lockedQueueId + ? 
await this.$.prisma.taskQueue.findFirst({ + where: { + id: params.lockedQueueId, + }, + select: { + id: true, + friendlyId: true, + name: true, + }, + }) + : await this.$.prisma.taskQueue.findFirst({ + where: { + runtimeEnvironmentId: params.runtimeEnvironmentId, + name: params.queueName, + }, + select: { + id: true, + friendlyId: true, + name: true, + }, + }); + + if (!queue) { + throw new ServiceValidationError( + `Could not resolve queue data for queue ${params.queueName}`, + 404 + ); + } + + return { + id: queue.friendlyId, + name: queue.name, + }; + }); + + if (result.err) { + throw result.err; } - return taskRun?.runtimeEnvironment; + if (!result.val) { + throw new ServiceValidationError( + `Could not resolve queue data for queue ${params.queueName}`, + 404 + ); + } + + return result.val; + } + + async #resolveTaskRunExecutionDeployment( + backgroundWorkerTaskId: string + ): Promise { + const result = await this.cache.deployments.swr(backgroundWorkerTaskId, async () => { + const { worker } = await this.$.prisma.backgroundWorkerTask.findFirstOrThrow({ + where: { id: backgroundWorkerTaskId }, + select: { + worker: { + select: { + deployment: true, + }, + }, + }, + }); + + if (!worker.deployment) { + return undefined; + } + + return { + id: worker.deployment.friendlyId, + shortCode: worker.deployment.shortCode, + version: worker.deployment.version, + runtime: worker.deployment.runtime ?? "unknown", + runtimeVersion: worker.deployment.runtimeVersion ?? 
"unknown", + git: safeParseGitMeta(worker.deployment.git), + }; + }); + + if (result.err) { + throw result.err; + } + + return result.val; } async #notifyMetadataUpdated(runId: string, completion: TaskRunExecutionResult) { @@ -1386,3 +1840,11 @@ export class RunAttemptSystem { } } } + +export function safeParseGitMeta(git: unknown): GitMeta | undefined { + const parsed = GitMeta.safeParse(git); + if (parsed.success) { + return parsed.data; + } + return undefined; +} diff --git a/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts b/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts index 9983415f51..c9654c612d 100644 --- a/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts +++ b/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts @@ -639,7 +639,7 @@ describe("RunEngine heartbeats", () => { containerTest("Heartbeat keeps run alive", async ({ prisma, redisOptions }) => { const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - const executingTimeout = 100; + const executingTimeout = 500; const engine = new RunEngine({ prisma, @@ -726,24 +726,23 @@ describe("RunEngine heartbeats", () => { expect(executionData.snapshot.executionStatus).toBe("EXECUTING"); expect(executionData.run.status).toBe("EXECUTING"); - // Send heartbeats every 50ms (half the timeout) - for (let i = 0; i < 6; i++) { - await setTimeout(50); + // Send heartbeats every 100ms (to make sure we're not timing out) + for (let i = 0; i < 5; i++) { + await setTimeout(100); await engine.heartbeatRun({ runId: run.id, snapshotId: attempt.snapshot.id, }); } - // After 300ms (3x the timeout) the run should still be executing - // because we've been sending heartbeats + // Should still be executing because we're sending heartbeats const executionData2 = await engine.getRunExecutionData({ runId: run.id }); assertNonNullable(executionData2); expect(executionData2.snapshot.executionStatus).toBe("EXECUTING"); 
expect(executionData2.run.status).toBe("EXECUTING"); // Stop sending heartbeats and wait for timeout - await setTimeout(executingTimeout * 3); + await setTimeout(executingTimeout * 2); // Now it should have timed out and be queued const executionData3 = await engine.getRunExecutionData({ runId: run.id }); diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 5884e0ab9b..e15f90b1c5 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -58,6 +58,9 @@ export type RunEngineOptions = { automaticExtensionThreshold?: number; retryConfig?: LockRetryConfig; }; + cache?: { + redis: RedisOptions; + }; /** If not set then checkpoints won't ever be used */ retryWarmStartThresholdMs?: number; heartbeatTimeoutsMs?: Partial; diff --git a/internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts b/internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts index 5f2f031bdb..b67e77d151 100644 --- a/internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts +++ b/internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts @@ -1,7 +1,12 @@ import { createRedisClient, Redis, type RedisOptions } from "@internal/redis"; import { startSpan, type Tracer } from "@internal/tracing"; -import { createCache, DefaultStatefulContext, Namespace, Cache as UnkeyCache } from "@unkey/cache"; -import { MemoryStore } from "@unkey/cache/stores"; +import { + createCache, + DefaultStatefulContext, + Namespace, + type UnkeyCache, + MemoryStore, +} from "@internal/cache"; import { randomUUID } from "crypto"; import seedrandom from "seedrandom"; import { diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index 4e220c9a19..8a148eaa71 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -484,6 
+484,8 @@ const zodIpc = new ZodIpcConnection({ } runMetadataManager.runId = execution.run.id; + runMetadataManager.runIdIsRoot = typeof execution.run.rootTaskRunId === "undefined"; + _executionCount++; const executor = new TaskExecutor(task, { @@ -503,6 +505,11 @@ const zodIpc = new ZodIpcConnection({ getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000) ); + devUsageManager.setInitialState({ + cpuTime: execution.run.durationMs ?? 0, + costInCents: execution.run.costInCents ?? 0, + }); + _executionMeasurement = usage.start(); const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration); diff --git a/packages/cli-v3/src/entryPoints/managed-run-worker.ts b/packages/cli-v3/src/entryPoints/managed-run-worker.ts index 294e2c741a..21ed6a265f 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-worker.ts @@ -328,12 +328,17 @@ const zodIpc = new ZodIpcConnection({ resetExecutionEnvironment(); - initializeUsageManager({ + const prodManager = initializeUsageManager({ usageIntervalMs: getEnvVar("USAGE_HEARTBEAT_INTERVAL_MS"), usageEventUrl: getEnvVar("USAGE_EVENT_URL"), triggerJWT: getEnvVar("TRIGGER_JWT"), }); + prodManager.setInitialState({ + cpuTime: execution.run.durationMs ?? 0, + costInCents: execution.run.costInCents ?? 
0, + }); + standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics, isWarmStart); console.log(`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN`, execution); @@ -483,6 +488,7 @@ const zodIpc = new ZodIpcConnection({ } runMetadataManager.runId = execution.run.id; + runMetadataManager.runIdIsRoot = typeof execution.run.rootTaskRunId === "undefined"; _executionCount++; const executor = new TaskExecutor(task, { @@ -689,6 +695,8 @@ function initializeUsageManager({ usage.setGlobalUsageManager(prodUsageManager); timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager)); + + return prodUsageManager; } _sharedWorkerRuntime = new SharedRuntimeManager(zodIpc, true); diff --git a/packages/core/src/v3/isomorphic/friendlyId.ts b/packages/core/src/v3/isomorphic/friendlyId.ts index 7b4f8a7f3d..90fa31bd57 100644 --- a/packages/core/src/v3/isomorphic/friendlyId.ts +++ b/packages/core/src/v3/isomorphic/friendlyId.ts @@ -95,6 +95,7 @@ export const SnapshotId = new IdUtil("snapshot"); export const WaitpointId = new IdUtil("waitpoint"); export const BatchId = new IdUtil("batch"); export const BulkActionId = new IdUtil("bulk"); +export const AttemptId = new IdUtil("attempt"); export class IdGenerator { private alphabet: string; diff --git a/packages/core/src/v3/runMetadata/manager.ts b/packages/core/src/v3/runMetadata/manager.ts index 75cdd23500..03f2d6f244 100644 --- a/packages/core/src/v3/runMetadata/manager.ts +++ b/packages/core/src/v3/runMetadata/manager.ts @@ -24,6 +24,7 @@ export class StandardMetadataManager implements RunMetadataManager { private queuedRootOperations: Set = new Set(); public runId: string | undefined; + public runIdIsRoot: boolean = false; constructor( private apiClient: ApiClient, @@ -38,6 +39,7 @@ export class StandardMetadataManager implements RunMetadataManager { this.activeStreams.clear(); this.store = undefined; this.runId = undefined; + this.runIdIsRoot = false; if (this.flushTimeoutId) { clearTimeout(this.flushTimeoutId); 
@@ -54,34 +56,76 @@ export class StandardMetadataManager implements RunMetadataManager { // Create the updater object and store it in a local variable const parentUpdater: RunMetadataUpdater = { set: (key, value) => { + // We have to check runIdIsRoot here because parent/root are executed before runIdIsRoot is set + if (self.runIdIsRoot) { + return self.set(key, value); + } + self.queuedParentOperations.add({ type: "set", key, value }); return parentUpdater; }, del: (key) => { + // We have to check runIdIsRoot here because parent/root are executed before runIdIsRoot is set + if (self.runIdIsRoot) { + return self.del(key); + } + self.queuedParentOperations.add({ type: "delete", key }); return parentUpdater; }, append: (key, value) => { + // We have to check runIdIsRoot here because parent/root are executed before runIdIsRoot is set + if (self.runIdIsRoot) { + return self.append(key, value); + } + self.queuedParentOperations.add({ type: "append", key, value }); return parentUpdater; }, remove: (key, value) => { + // We have to check runIdIsRoot here because parent/root are executed before runIdIsRoot is set + if (self.runIdIsRoot) { + return self.remove(key, value); + } + self.queuedParentOperations.add({ type: "remove", key, value }); return parentUpdater; }, increment: (key, value) => { + // We have to check runIdIsRoot here because parent/root are executed before runIdIsRoot is set + if (self.runIdIsRoot) { + return self.increment(key, value); + } + self.queuedParentOperations.add({ type: "increment", key, value }); return parentUpdater; }, decrement: (key, value) => { + // We have to check runIdIsRoot here because parent/root are executed before runIdIsRoot is set + if (self.runIdIsRoot) { + return self.decrement(key, value); + } + self.queuedParentOperations.add({ type: "increment", key, value: -Math.abs(value) }); return parentUpdater; }, update: (value) => { + // We have to check runIdIsRoot here because parent/root are executed before runIdIsRoot is set + if 
(self.runIdIsRoot) { + return self.update(value); + } + self.queuedParentOperations.add({ type: "update", value }); return parentUpdater; }, - stream: (key, value, signal) => self.doStream(key, value, "parent", parentUpdater, signal), + stream: (key, value, signal) => { + // We have to check runIdIsRoot here because parent/root are executed before runIdIsRoot is set + if (self.runIdIsRoot) { + return self.doStream(key, value, "self", parentUpdater, signal); + } + + return self.doStream(key, value, "parent", parentUpdater, signal); + }, }; return parentUpdater; @@ -94,34 +138,76 @@ export class StandardMetadataManager implements RunMetadataManager { // Create the updater object and store it in a local variable const rootUpdater: RunMetadataUpdater = { set: (key, value) => { + // We have to check runIdIsRoot here because parent/root are executed before runIdIsRoot is set + if (self.runIdIsRoot) { + return self.set(key, value); + } + self.queuedRootOperations.add({ type: "set", key, value }); return rootUpdater; }, del: (key) => { + // We have to check runIdIsRoot here because parent/root are executed before runIdIsRoot is set + if (self.runIdIsRoot) { + return self.del(key); + } + self.queuedRootOperations.add({ type: "delete", key }); return rootUpdater; }, append: (key, value) => { + // We have to check runIdIsRoot here because parent/root are executed before runIdIsRoot is set + if (self.runIdIsRoot) { + return self.append(key, value); + } + self.queuedRootOperations.add({ type: "append", key, value }); return rootUpdater; }, remove: (key, value) => { + // We have to check runIdIsRoot here because parent/root are executed before runIdIsRoot is set + if (self.runIdIsRoot) { + return self.remove(key, value); + } + self.queuedRootOperations.add({ type: "remove", key, value }); return rootUpdater; }, increment: (key, value) => { + // We have to check runIdIsRoot here because parent/root are executed before runIdIsRoot is set + if (self.runIdIsRoot) { + return 
self.increment(key, value); + } + self.queuedRootOperations.add({ type: "increment", key, value }); return rootUpdater; }, decrement: (key, value) => { + // We have to check runIdIsRoot here because parent/root are executed before runIdIsRoot is set + if (self.runIdIsRoot) { + return self.decrement(key, value); + } + self.queuedRootOperations.add({ type: "increment", key, value: -Math.abs(value) }); return rootUpdater; }, update: (value) => { + // We have to check runIdIsRoot here because parent/root are executed before runIdIsRoot is set + if (self.runIdIsRoot) { + return self.update(value); + } + self.queuedRootOperations.add({ type: "update", value }); return rootUpdater; }, - stream: (key, value, signal) => self.doStream(key, value, "root", rootUpdater, signal), + stream: (key, value, signal) => { + // We have to check runIdIsRoot here because parent/root are executed before runIdIsRoot is set + if (self.runIdIsRoot) { + return self.doStream(key, value, "self", rootUpdater, signal); + } + + return self.doStream(key, value, "root", rootUpdater, signal); + }, }; return rootUpdater; diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index e56ec8da6d..c80d6a8ce7 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -219,49 +219,19 @@ export const TaskRun = z.object({ version: z.string().optional(), metadata: z.record(DeserializedJsonSchema).optional(), maxDuration: z.number().optional(), - /** @deprecated */ - context: z.any(), - /** - * @deprecated For live values use the `usage` SDK functions - * @link https://trigger.dev/docs/run-usage - */ - durationMs: z.number().default(0), - /** - * @deprecated For live values use the `usage` SDK functions - * @link https://trigger.dev/docs/run-usage - */ - costInCents: z.number().default(0), - /** - * @deprecated For live values use the `usage` SDK functions - * @link https://trigger.dev/docs/run-usage - */ - baseCostInCents: 
z.number().default(0), /** The priority of the run. Wih a value of 10 it will be dequeued before runs that were triggered 9 seconds before it (assuming they had no priority set). */ priority: z.number().optional(), -}); + baseCostInCents: z.number().optional(), -export type TaskRun = z.infer; + parentTaskRunId: z.string().optional(), + rootTaskRunId: z.string().optional(), -export const TaskRunExecutionTask = z.object({ - id: z.string(), - filePath: z.string(), - exportName: z.string().optional(), + // These are only used during execution, not in run.ctx + durationMs: z.number().optional(), + costInCents: z.number().optional(), }); -export type TaskRunExecutionTask = z.infer; - -export const TaskRunExecutionAttempt = z.object({ - number: z.number(), - startedAt: z.coerce.date(), - /** @deprecated */ - id: z.string(), - /** @deprecated */ - backgroundWorkerId: z.string(), - /** @deprecated */ - backgroundWorkerTaskId: z.string(), - /** @deprecated */ - status: z.string(), -}); +export type TaskRun = z.infer; export const GitMeta = z.object({ commitAuthorName: z.string().optional(), @@ -277,6 +247,18 @@ export const GitMeta = z.object({ export type GitMeta = z.infer; +export const TaskRunExecutionTask = z.object({ + id: z.string(), + filePath: z.string(), +}); + +export type TaskRunExecutionTask = z.infer; + +export const TaskRunExecutionAttempt = z.object({ + number: z.number(), + startedAt: z.coerce.date(), +}); + export type TaskRunExecutionAttempt = z.infer; export const TaskRunExecutionEnvironment = z.object({ @@ -317,40 +299,144 @@ export const TaskRunExecutionBatch = z.object({ id: z.string(), }); -export const TaskRunExecution = z.object({ +export const TaskRunExecutionDeployment = z.object({ + id: z.string(), + shortCode: z.string(), + version: z.string(), + runtime: z.string(), + runtimeVersion: z.string(), + git: GitMeta.optional(), +}); + +export type TaskRunExecutionDeployment = z.infer; + +const StaticTaskRunExecutionShape = { task: 
TaskRunExecutionTask, + queue: TaskRunExecutionQueue, + environment: TaskRunExecutionEnvironment, + organization: TaskRunExecutionOrganization, + project: TaskRunExecutionProject, + machine: MachinePreset, + batch: TaskRunExecutionBatch.optional(), + deployment: TaskRunExecutionDeployment.optional(), +}; + +export const StaticTaskRunExecution = z.object(StaticTaskRunExecutionShape); + +export type StaticTaskRunExecution = z.infer; + +export const TaskRunExecution = z.object({ attempt: TaskRunExecutionAttempt, run: TaskRun.and( z.object({ traceContext: z.record(z.unknown()).optional(), }) ), + ...StaticTaskRunExecutionShape, +}); + +export type TaskRunExecution = z.infer; + +export const V3TaskRunExecutionTask = z.object({ + id: z.string(), + filePath: z.string(), + exportName: z.string().optional(), +}); + +export type V3TaskRunExecutionTask = z.infer; + +export const V3TaskRunExecutionAttempt = z.object({ + number: z.number(), + startedAt: z.coerce.date(), + id: z.string(), + backgroundWorkerId: z.string(), + backgroundWorkerTaskId: z.string(), + status: z.string(), +}); + +export type V3TaskRunExecutionAttempt = z.infer; + +export const V3TaskRun = z.object({ + id: z.string(), + payload: z.string(), + payloadType: z.string(), + tags: z.array(z.string()), + isTest: z.boolean().default(false), + createdAt: z.coerce.date(), + startedAt: z.coerce.date().default(() => new Date()), + idempotencyKey: z.string().optional(), + maxAttempts: z.number().optional(), + version: z.string().optional(), + metadata: z.record(DeserializedJsonSchema).optional(), + maxDuration: z.number().optional(), + context: z.unknown(), + durationMs: z.number(), + costInCents: z.number(), + baseCostInCents: z.number(), +}); + +export type V3TaskRun = z.infer; + +export const V3TaskRunExecution = z.object({ + task: V3TaskRunExecutionTask, + attempt: V3TaskRunExecutionAttempt, + run: V3TaskRun.and( + z.object({ + traceContext: z.record(z.unknown()).optional(), + }) + ), queue: 
TaskRunExecutionQueue, environment: TaskRunExecutionEnvironment, organization: TaskRunExecutionOrganization, project: TaskRunExecutionProject, - batch: TaskRunExecutionBatch.optional(), machine: MachinePreset, + batch: TaskRunExecutionBatch.optional(), }); -export type TaskRunExecution = z.infer; +export type V3TaskRunExecution = z.infer; export const TaskRunContext = z.object({ - task: TaskRunExecutionTask, - attempt: TaskRunExecutionAttempt.omit({ + attempt: TaskRunExecutionAttempt, + run: TaskRun.omit({ + payload: true, + payloadType: true, + metadata: true, + durationMs: true, + costInCents: true, + }), + ...StaticTaskRunExecutionShape, +}); + +export type TaskRunContext = z.infer; + +export const V3TaskRunExecutionEnvironment = z.object({ + id: z.string(), + slug: z.string(), + type: z.enum(["PRODUCTION", "STAGING", "DEVELOPMENT", "PREVIEW"]), +}); + +export type V3TaskRunExecutionEnvironment = z.infer; + +export const V3TaskRunContext = z.object({ + attempt: V3TaskRunExecutionAttempt.omit({ backgroundWorkerId: true, backgroundWorkerTaskId: true, }), - run: TaskRun.omit({ payload: true, payloadType: true, metadata: true }), + run: V3TaskRun.omit({ + payload: true, + payloadType: true, + metadata: true, + }), + task: V3TaskRunExecutionTask, queue: TaskRunExecutionQueue, - environment: TaskRunExecutionEnvironment, + environment: V3TaskRunExecutionEnvironment, organization: TaskRunExecutionOrganization, project: TaskRunExecutionProject, batch: TaskRunExecutionBatch.optional(), machine: MachinePreset.optional(), }); -export type TaskRunContext = z.infer; +export type V3TaskRunContext = z.infer; export const TaskRunExecutionRetry = z.object({ timestamp: z.number(), diff --git a/packages/core/src/v3/schemas/messages.ts b/packages/core/src/v3/schemas/messages.ts index 6c813b357e..72a78deb3c 100644 --- a/packages/core/src/v3/schemas/messages.ts +++ b/packages/core/src/v3/schemas/messages.ts @@ -6,12 +6,13 @@ import { TaskRunExecutionResult, 
TaskRunFailedExecutionResult, TaskRunInternalError, + V3TaskRunExecution, } from "./common.js"; import { TaskResource } from "./resources.js"; import { EnvironmentType, - ProdTaskRunExecution, - ProdTaskRunExecutionPayload, + V3ProdTaskRunExecution, + V3ProdTaskRunExecutionPayload, RunEngineVersionSchema, TaskRunExecutionLazyAttemptPayload, TaskRunExecutionMetrics, @@ -83,7 +84,7 @@ export const BackgroundWorkerClientMessages = z.discriminatedUnion("type", [ version: z.literal("v1").default("v1"), type: z.literal("TASK_RUN_COMPLETED"), completion: TaskRunExecutionResult, - execution: TaskRunExecution, + execution: V3TaskRunExecution, }), z.object({ version: z.literal("v1").default("v1"), @@ -368,7 +369,7 @@ export const CoordinatorToPlatformMessages = { }), z.object({ success: z.literal(true), - executionPayload: ProdTaskRunExecutionPayload, + executionPayload: V3ProdTaskRunExecutionPayload, }), ]), }, @@ -385,7 +386,7 @@ export const CoordinatorToPlatformMessages = { }), z.object({ success: z.literal(true), - payload: ProdTaskRunExecutionPayload, + payload: V3ProdTaskRunExecutionPayload, }), ]), }, @@ -417,7 +418,7 @@ export const CoordinatorToPlatformMessages = { TASK_RUN_COMPLETED: { message: z.object({ version: z.enum(["v1", "v2"]).default("v1"), - execution: ProdTaskRunExecution, + execution: V3ProdTaskRunExecution, completion: TaskRunExecutionResult, checkpoint: z .object({ @@ -430,7 +431,7 @@ export const CoordinatorToPlatformMessages = { TASK_RUN_COMPLETED_WITH_ACK: { message: z.object({ version: z.enum(["v1", "v2"]).default("v2"), - execution: ProdTaskRunExecution, + execution: V3ProdTaskRunExecution, completion: TaskRunExecutionResult, checkpoint: z .object({ @@ -720,7 +721,7 @@ export const ProdWorkerToCoordinatorMessages = { TASK_RUN_COMPLETED: { message: z.object({ version: z.enum(["v1", "v2"]).default("v1"), - execution: ProdTaskRunExecution, + execution: V3ProdTaskRunExecution, completion: TaskRunExecutionResult, }), callback: z.object({ @@ -792,7 
+793,7 @@ export const ProdWorkerToCoordinatorMessages = { }), z.object({ success: z.literal(true), - executionPayload: ProdTaskRunExecutionPayload, + executionPayload: V3ProdTaskRunExecutionPayload, }), ]), }, @@ -835,7 +836,7 @@ export const CoordinatorToProdWorkerMessages = { EXECUTE_TASK_RUN: { message: z.object({ version: z.literal("v1").default("v1"), - executionPayload: ProdTaskRunExecutionPayload, + executionPayload: V3ProdTaskRunExecutionPayload, }), }, EXECUTE_TASK_RUN_LAZY_ATTEMPT: { diff --git a/packages/core/src/v3/schemas/schemas.ts b/packages/core/src/v3/schemas/schemas.ts index 660f9dea38..bd32d848ff 100644 --- a/packages/core/src/v3/schemas/schemas.ts +++ b/packages/core/src/v3/schemas/schemas.ts @@ -1,6 +1,12 @@ import { z } from "zod"; import { RequireKeys } from "../types/index.js"; -import { MachineConfig, MachinePreset, MachinePresetName, TaskRunExecution } from "./common.js"; +import { + MachineConfig, + MachinePreset, + MachinePresetName, + TaskRunExecution, + V3TaskRunExecution, +} from "./common.js"; /* WARNING: Never import anything from ./messages here. If it's needed in both, put it here instead. @@ -36,7 +42,7 @@ export type TaskRunExecutionPayload = z.infer; // Strategies for not breaking backwards compatibility: // 1. Add new fields as optional // 2. 
If a field is required, add a default value -export const ProdTaskRunExecution = TaskRunExecution.extend({ +export const V3ProdTaskRunExecution = V3TaskRunExecution.extend({ worker: z.object({ id: z.string(), contentHash: z.string(), @@ -46,16 +52,16 @@ export const ProdTaskRunExecution = TaskRunExecution.extend({ machine: MachinePreset.default({ name: "small-1x", cpu: 1, memory: 1, centsPerMs: 0 }), }); -export type ProdTaskRunExecution = z.infer; +export type V3ProdTaskRunExecution = z.infer; -export const ProdTaskRunExecutionPayload = z.object({ - execution: ProdTaskRunExecution, +export const V3ProdTaskRunExecutionPayload = z.object({ + execution: V3ProdTaskRunExecution, traceContext: z.record(z.unknown()), environment: z.record(z.string()).optional(), metrics: TaskRunExecutionMetrics.optional(), }); -export type ProdTaskRunExecutionPayload = z.infer; +export type V3ProdTaskRunExecutionPayload = z.infer; export const FixedWindowRateLimit = z.object({ type: z.literal("fixed-window"), diff --git a/packages/core/src/v3/taskContext/index.ts b/packages/core/src/v3/taskContext/index.ts index a9ef58cf6e..b03b7ff4b6 100644 --- a/packages/core/src/v3/taskContext/index.ts +++ b/packages/core/src/v3/taskContext/index.ts @@ -82,11 +82,9 @@ export class TaskContextAPI { get contextAttributes(): Attributes { if (this.ctx) { return { - [SemanticInternalAttributes.ATTEMPT_ID]: this.ctx.attempt.id, [SemanticInternalAttributes.ATTEMPT_NUMBER]: this.ctx.attempt.number, [SemanticInternalAttributes.TASK_SLUG]: this.ctx.task.id, [SemanticInternalAttributes.TASK_PATH]: this.ctx.task.filePath, - [SemanticInternalAttributes.TASK_EXPORT_NAME]: this.ctx.task.exportName, [SemanticInternalAttributes.QUEUE_NAME]: this.ctx.queue.name, [SemanticInternalAttributes.QUEUE_ID]: this.ctx.queue.id, [SemanticInternalAttributes.RUN_ID]: this.ctx.run.id, diff --git a/packages/core/src/v3/usage/api.ts b/packages/core/src/v3/usage/api.ts index e08c76929d..c3a05fc9f3 100644 --- 
a/packages/core/src/v3/usage/api.ts +++ b/packages/core/src/v3/usage/api.ts @@ -1,7 +1,7 @@ const API_NAME = "usage"; import { getGlobal, registerGlobal, unregisterGlobal } from "../utils/globals.js"; -import type { UsageManager, UsageMeasurement, UsageSample } from "./types.js"; +import type { InitialUsageState, UsageManager, UsageMeasurement, UsageSample } from "./types.js"; import { NoopUsageManager } from "./noopUsageManager.js"; const NOOP_USAGE_MANAGER = new NoopUsageManager(); @@ -53,6 +53,10 @@ export class UsageAPI implements UsageManager { this.disable(); } + public getInitialState(): InitialUsageState { + return this.#getUsageManager().getInitialState(); + } + #getUsageManager(): UsageManager { return getGlobal(API_NAME) ?? NOOP_USAGE_MANAGER; } diff --git a/packages/core/src/v3/usage/devUsageManager.ts b/packages/core/src/v3/usage/devUsageManager.ts index 80e1fdde21..510213b260 100644 --- a/packages/core/src/v3/usage/devUsageManager.ts +++ b/packages/core/src/v3/usage/devUsageManager.ts @@ -1,4 +1,4 @@ -import { UsageManager, UsageMeasurement, UsageSample } from "./types.js"; +import { InitialUsageState, UsageManager, UsageMeasurement, UsageSample } from "./types.js"; import { clock } from "../clock-api.js"; import { ClockTime, calculateDurationInMs } from "../clock/clock.js"; @@ -45,6 +45,10 @@ export class DevUsageManager implements UsageManager { private _firstMeasurement?: DevUsageMeasurement; private _currentMeasurements: Map = new Map(); private _pauses: Map = new Map(); + private _initialState: InitialUsageState = { + cpuTime: 0, + costInCents: 0, + }; disable(): void {} @@ -54,6 +58,18 @@ export class DevUsageManager implements UsageManager { this._firstMeasurement = undefined; this._currentMeasurements.clear(); this._pauses.clear(); + this._initialState = { + cpuTime: 0, + costInCents: 0, + }; + } + + setInitialState(state: InitialUsageState) { + this._initialState = state; + } + + getInitialState(): InitialUsageState { + return 
this._initialState; } sample(): UsageSample | undefined { diff --git a/packages/core/src/v3/usage/noopUsageManager.ts b/packages/core/src/v3/usage/noopUsageManager.ts index 9e52144466..d044f1738d 100644 --- a/packages/core/src/v3/usage/noopUsageManager.ts +++ b/packages/core/src/v3/usage/noopUsageManager.ts @@ -1,4 +1,4 @@ -import { UsageManager, UsageMeasurement, UsageSample } from "./types.js"; +import { InitialUsageState, UsageManager, UsageMeasurement, UsageSample } from "./types.js"; export class NoopUsageManager implements UsageManager { disable(): void { @@ -30,4 +30,11 @@ export class NoopUsageManager implements UsageManager { reset(): void { // Noop } + + getInitialState(): InitialUsageState { + return { + cpuTime: 0, + costInCents: 0, + }; + } } diff --git a/packages/core/src/v3/usage/prodUsageManager.ts b/packages/core/src/v3/usage/prodUsageManager.ts index 5d3d49c3d0..7cfd038d01 100644 --- a/packages/core/src/v3/usage/prodUsageManager.ts +++ b/packages/core/src/v3/usage/prodUsageManager.ts @@ -1,5 +1,5 @@ import { setInterval } from "node:timers/promises"; -import { UsageManager, UsageMeasurement, UsageSample } from "./types.js"; +import { InitialUsageState, UsageManager, UsageMeasurement, UsageSample } from "./types.js"; import { UsageClient } from "./usageClient.js"; export type ProdUsageManagerOptions = { @@ -13,6 +13,10 @@ export class ProdUsageManager implements UsageManager { private _abortController: AbortController | undefined; private _lastSample: UsageSample | undefined; private _usageClient: UsageClient | undefined; + private _initialState: InitialUsageState = { + cpuTime: 0, + costInCents: 0, + }; constructor( private readonly delegageUsageManager: UsageManager, @@ -27,6 +31,14 @@ export class ProdUsageManager implements UsageManager { return typeof this._usageClient !== "undefined"; } + setInitialState(state: InitialUsageState) { + this._initialState = state; + } + + getInitialState(): InitialUsageState { + return this._initialState; + } + 
reset(): void { this.delegageUsageManager.reset(); this._abortController?.abort(); @@ -34,6 +46,10 @@ export class ProdUsageManager implements UsageManager { this._usageClient = undefined; this._measurement = undefined; this._lastSample = undefined; + this._initialState = { + cpuTime: 0, + costInCents: 0, + }; } disable(): void { diff --git a/packages/core/src/v3/usage/types.ts b/packages/core/src/v3/usage/types.ts index 8655950df3..45fd32e63b 100644 --- a/packages/core/src/v3/usage/types.ts +++ b/packages/core/src/v3/usage/types.ts @@ -7,8 +7,14 @@ export interface UsageMeasurement { sample(): UsageSample; } +export type InitialUsageState = { + cpuTime: number; + costInCents: number; +}; + export interface UsageManager { disable(): void; + getInitialState(): InitialUsageState; start(): UsageMeasurement; stop(measurement: UsageMeasurement): UsageSample; sample(): UsageSample | undefined; diff --git a/packages/trigger-sdk/src/v3/shared.ts b/packages/trigger-sdk/src/v3/shared.ts index 5d5d896621..487c16308e 100644 --- a/packages/trigger-sdk/src/v3/shared.ts +++ b/packages/trigger-sdk/src/v3/shared.ts @@ -1313,7 +1313,6 @@ async function triggerAndWait_internal { const sample = usageApi.sample(); + const initialState = usageApi.getInitialState(); const machine = taskContext.ctx?.machine; const run = taskContext.ctx?.run; @@ -65,12 +66,12 @@ export const usage = { durationMs: 0, }, total: { - costInCents: run?.costInCents ?? 0, - durationMs: run?.durationMs ?? 0, + costInCents: initialState.costInCents, + durationMs: initialState.cpuTime, }, }, baseCostInCents: run?.baseCostInCents ?? 0, - totalCostInCents: (run?.costInCents ?? 0) + (run?.baseCostInCents ?? 0), + totalCostInCents: initialState.costInCents + (run?.baseCostInCents ?? 0), }; } @@ -83,12 +84,12 @@ export const usage = { durationMs: sample.cpuTime, }, total: { - costInCents: (run?.costInCents ?? 0) + currentCostInCents, - durationMs: (run?.durationMs ?? 
0) + sample.cpuTime, + costInCents: currentCostInCents + initialState.costInCents, + durationMs: sample.cpuTime + initialState.cpuTime, }, }, baseCostInCents: run?.baseCostInCents ?? 0, - totalCostInCents: (run?.costInCents ?? 0) + currentCostInCents + (run?.baseCostInCents ?? 0), + totalCostInCents: currentCostInCents + (run?.baseCostInCents ?? 0) + initialState.costInCents, }; }, /** diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 37ec88c236..5ccc631488 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -921,6 +921,24 @@ importers: docs: {} + internal-packages/cache: + dependencies: + '@internal/redis': + specifier: workspace:* + version: link:../redis + '@trigger.dev/core': + specifier: workspace:* + version: link:../../packages/core + '@unkey/cache': + specifier: ^1.5.0 + version: 1.5.0 + '@unkey/error': + specifier: ^0.2.0 + version: 0.2.0 + superjson: + specifier: ^2.2.1 + version: 2.2.1 + internal-packages/clickhouse: dependencies: '@clickhouse/client': @@ -1054,6 +1072,9 @@ importers: internal-packages/run-engine: dependencies: + '@internal/cache': + specifier: workspace:* + version: link:../cache '@internal/redis': specifier: workspace:* version: link:../redis @@ -1069,9 +1090,6 @@ importers: '@trigger.dev/redis-worker': specifier: workspace:* version: link:../../packages/redis-worker - '@unkey/cache': - specifier: ^1.5.0 - version: 1.5.0 assert-never: specifier: ^1.2.1 version: 1.2.1 diff --git a/references/hello-world/src/trigger/example.ts b/references/hello-world/src/trigger/example.ts index 8759953d28..1eb7f18916 100644 --- a/references/hello-world/src/trigger/example.ts +++ b/references/hello-world/src/trigger/example.ts @@ -57,7 +57,7 @@ export const parentTask = task({ id: "parent", machine: "medium-1x", run: async (payload: any, { ctx }) => { - logger.log("Hello, world from the parent", { payload }); + logger.log("Hello, world from the parent", { payload, ctx }); await childTask.triggerAndWait({ message: "Hello, world!", aReallyBigInt: 
BigInt(10000) }); }, }); @@ -107,7 +107,7 @@ export const childTask = task({ }: { message?: string; failureChance?: number; duration?: number; aReallyBigInt?: bigint }, { ctx } ) => { - logger.info("Hello, world from the child", { message, failureChance, aReallyBigInt }); + logger.info("Hello, world from the child", { ctx, failureChance, aReallyBigInt }); if (Math.random() < failureChance) { throw new Error("Random error at start"); diff --git a/references/hello-world/src/trigger/metadata.ts b/references/hello-world/src/trigger/metadata.ts index d9618f4207..b0d493b8df 100644 --- a/references/hello-world/src/trigger/metadata.ts +++ b/references/hello-world/src/trigger/metadata.ts @@ -44,6 +44,8 @@ export const parentTask = task({ metadata.parent.set("test.parent.set", true); metadata.set("test.set", "test"); + logger.info("logging metadata.current()", { current: metadata.current() }); + await childTask.triggerAndWait({}); return { diff --git a/references/hello-world/src/trigger/usage.ts b/references/hello-world/src/trigger/usage.ts new file mode 100644 index 0000000000..caacbde2ac --- /dev/null +++ b/references/hello-world/src/trigger/usage.ts @@ -0,0 +1,35 @@ +import { logger, task, wait, usage } from "@trigger.dev/sdk"; +import { setTimeout } from "timers/promises"; + +export const usageExampleTask = task({ + id: "usage-example", + retry: { + maxAttempts: 3, + minTimeoutInMs: 500, + maxTimeoutInMs: 1000, + factor: 1.5, + }, + run: async (payload: { throwError: boolean }, { ctx }) => { + logger.info("run.ctx", { ctx }); + + await setTimeout(1000); + + const currentUsage = usage.getCurrent(); + + logger.info("currentUsage", { currentUsage }); + + if (payload.throwError && ctx.attempt.number === 1) { + throw new Error("Forced error to cause a retry"); + } + + await setTimeout(5000); + + const currentUsage2 = usage.getCurrent(); + + logger.info("currentUsage2", { currentUsage2 }); + + return { + message: "Hello, world!", + }; + }, +});