Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/quiet-falcons-approve.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

gracefully recover from ECONNRESET errors when sending stream data from tasks to the server
4 changes: 4 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ const EnvironmentSchema = z
.string()
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
REALTIME_STREAMS_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
REALTIME_STREAMS_INACTIVITY_TIMEOUT_MS: z.coerce
.number()
.int()
.default(60000 * 5), // 5 minutes

REALTIME_MAXIMUM_CREATED_AT_FILTER_AGE_IN_MS: z.coerce
.number()
Expand Down
41 changes: 36 additions & 5 deletions apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ActionFunctionArgs } from "@remix-run/server-runtime";
import { z } from "zod";
import { $replica } from "~/db.server";
import { relayRealtimeStreams } from "~/services/realtime/relayRealtimeStreams.server";
import { v1RealtimeStreams } from "~/services/realtime/v1StreamsGlobal.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
Expand All @@ -12,11 +12,39 @@ const ParamsSchema = z.object({
export async function action({ request, params }: ActionFunctionArgs) {
const $params = ParamsSchema.parse(params);

// Extract client ID from header, default to "default" if not provided
const clientId = request.headers.get("X-Client-Id") || "default";

// Handle HEAD request to get last chunk index for this client
if (request.method === "HEAD") {
const lastChunkIndex = await v1RealtimeStreams.getLastChunkIndex(
$params.runId,
$params.streamId,
clientId
);

return new Response(null, {
status: 200,
headers: {
"X-Last-Chunk-Index": lastChunkIndex.toString(),
},
});
}

if (!request.body) {
return new Response("No body provided", { status: 400 });
}

return relayRealtimeStreams.ingestData(request.body, $params.runId, $params.streamId);
const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;

return v1RealtimeStreams.ingestData(
request.body,
$params.runId,
$params.streamId,
clientId,
resumeFromChunkNumber
);
}

export const loader = createLoaderApiRoute(
Expand Down Expand Up @@ -51,12 +79,15 @@ export const loader = createLoaderApiRoute(
},
},
async ({ params, request, resource: run, authentication }) => {
return relayRealtimeStreams.streamResponse(
// Get Last-Event-ID header for resuming from a specific position
const lastEventId = request.headers.get("Last-Event-ID") || undefined;

return v1RealtimeStreams.streamResponse(
request,
run.friendlyId,
params.streamId,
authentication.environment,
request.signal
request.signal,
lastEventId
);
}
);
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { z } from "zod";
import { $replica } from "~/db.server";
import { relayRealtimeStreams } from "~/services/realtime/relayRealtimeStreams.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { v1RealtimeStreams } from "~/services/realtime/v1StreamsGlobal.server";
import {
createActionApiRoute,
createLoaderApiRoute,
} from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
runId: z.string(),
Expand All @@ -14,10 +17,6 @@ const { action } = createActionApiRoute(
params: ParamsSchema,
},
async ({ request, params, authentication }) => {
if (!request.body) {
return new Response("No body provided", { status: 400 });
}

const run = await $replica.taskRun.findFirst({
where: {
friendlyId: params.runId,
Expand Down Expand Up @@ -54,8 +53,91 @@ const { action } = createActionApiRoute(
return new Response("Target not found", { status: 404 });
}

return relayRealtimeStreams.ingestData(request.body, targetId, params.streamId);
// Extract client ID from header, default to "default" if not provided
const clientId = request.headers.get("X-Client-Id") || "default";

if (!request.body) {
return new Response("No body provided", { status: 400 });
}

const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;

return v1RealtimeStreams.ingestData(
request.body,
targetId,
params.streamId,
clientId,
resumeFromChunkNumber
);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Guard against NaN/invalid X-Resume-From-Chunk.

parseInt may yield NaN; passing it downstream will store "NaN" indices.

-    const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
-    const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
+    const resumeFromChunkHeader = request.headers.get("X-Resume-From-Chunk");
+    const n =
+      resumeFromChunkHeader !== null ? Number(resumeFromChunkHeader) : undefined;
+    if (n !== undefined && (!Number.isInteger(n) || n < 0)) {
+      return new Response("Invalid X-Resume-From-Chunk", { status: 400 });
+    }
+    const resumeFromChunkNumber = n;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Extract client ID from header, default to "default" if not provided
const clientId = request.headers.get("X-Client-Id") || "default";
if (!request.body) {
return new Response("No body provided", { status: 400 });
}
const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
return v1RealtimeStreams.ingestData(
request.body,
targetId,
params.streamId,
clientId,
resumeFromChunkNumber
);
}
// Extract client ID from header, default to "default" if not provided
const clientId = request.headers.get("X-Client-Id") || "default";
if (!request.body) {
return new Response("No body provided", { status: 400 });
}
const resumeFromChunkHeader = request.headers.get("X-Resume-From-Chunk");
const n =
resumeFromChunkHeader !== null ? Number(resumeFromChunkHeader) : undefined;
if (n !== undefined && (!Number.isInteger(n) || n < 0)) {
return new Response("Invalid X-Resume-From-Chunk", { status: 400 });
}
const resumeFromChunkNumber = n;
return v1RealtimeStreams.ingestData(
request.body,
targetId,
params.streamId,
clientId,
resumeFromChunkNumber
);
}
🤖 Prompt for AI Agents
In apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts around
lines 56 to 73, the X-Resume-From-Chunk header is parsed with parseInt which can
produce NaN and propagate invalid indices; validate the parsed value by checking
Number.isInteger and !Number.isNaN and ensure it’s non-negative—if invalid,
treat it as undefined (or return a 400 error if you prefer strict validation);
convert the header to a safe number only when valid and otherwise pass undefined
to v1RealtimeStreams.ingestData so downstream code never receives NaN.

);

const loader = createLoaderApiRoute(
{
params: ParamsSchema,
allowJWT: false,
corsStrategy: "none",
findResource: async (params, authentication) => {
return $replica.taskRun.findFirst({
where: {
friendlyId: params.runId,
runtimeEnvironmentId: authentication.environment.id,
},
select: {
id: true,
friendlyId: true,
parentTaskRun: {
select: {
friendlyId: true,
},
},
rootTaskRun: {
select: {
friendlyId: true,
},
},
},
});
},
},
async ({ request, params, resource: run }) => {
if (!run) {
return new Response("Run not found", { status: 404 });
}

const targetId =
params.target === "self"
? run.friendlyId
: params.target === "parent"
? run.parentTaskRun?.friendlyId
: run.rootTaskRun?.friendlyId;

if (!targetId) {
return new Response("Target not found", { status: 404 });
}

// Handle HEAD request to get last chunk index
if (request.method !== "HEAD") {
return new Response("Only HEAD requests are allowed for this endpoint", { status: 405 });
}

// Extract client ID from header, default to "default" if not provided
const clientId = request.headers.get("X-Client-Id") || "default";

const lastChunkIndex = await v1RealtimeStreams.getLastChunkIndex(
targetId,
params.streamId,
clientId
);

return new Response(null, {
status: 200,
headers: {
"X-Last-Chunk-Index": lastChunkIndex.toString(),
},
});
}
);

export { action };
export { action, loader };
Loading