Skip to content

Improve metadata flushing efficiency by collapsing operations (fix #2104) #2106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/small-dancers-smell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

Improve metadata flushing efficiency by collapsing operations
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const { action } = createActionApiRoute(
{
params: ParamsSchema,
body: UpdateMetadataRequestBody,
maxContentLength: 1024 * 1024, // 1MB
maxContentLength: 1024 * 1024 * 2, // 2MB
method: "PUT",
},
async ({ authentication, body, params }) => {
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/services/metadata/updateMetadata.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ export class UpdateMetadataService extends BaseService {
`[UpdateMetadataService][updateRunMetadataWithOperations] Updated metadata for run ${runId}`,
{
metadata: applyResults.newMetadata,
operations: operations,
}
);
}
Expand Down
30 changes: 21 additions & 9 deletions packages/core/src/v3/runMetadata/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ApiClient } from "../apiClient/index.js";
import { FlushedRunMetadata, RunMetadataChangeOperation } from "../schemas/common.js";
import { ApiRequestOptions } from "../zodfetch.js";
import { MetadataStream } from "./metadataStream.js";
import { applyMetadataOperations } from "./operations.js";
import { applyMetadataOperations, collapseOperations } from "./operations.js";
import { RunMetadataManager, RunMetadataUpdater } from "./types.js";
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";

Expand Down Expand Up @@ -33,7 +33,7 @@ export class StandardMetadataManager implements RunMetadataManager {
get parent(): RunMetadataUpdater {
// Store a reference to 'this' to ensure proper context
const self = this;

// Create the updater object and store it in a local variable
const parentUpdater: RunMetadataUpdater = {
set: (key, value) => {
Expand Down Expand Up @@ -66,14 +66,14 @@ export class StandardMetadataManager implements RunMetadataManager {
},
stream: (key, value, signal) => self.doStream(key, value, "parent", parentUpdater, signal),
};

return parentUpdater;
}

get root(): RunMetadataUpdater {
// Store a reference to 'this' to ensure proper context
const self = this;

// Create the updater object and store it in a local variable
const rootUpdater: RunMetadataUpdater = {
set: (key, value) => {
Expand Down Expand Up @@ -106,7 +106,7 @@ export class StandardMetadataManager implements RunMetadataManager {
},
stream: (key, value, signal) => self.doStream(key, value, "root", rootUpdater, signal),
};

return rootUpdater;
}

Expand Down Expand Up @@ -353,9 +353,17 @@ export class StandardMetadataManager implements RunMetadataManager {
this.queuedRootOperations.clear();

try {
const collapsedOperations = collapseOperations(operations);
const collapsedParentOperations = collapseOperations(parentOperations);
const collapsedRootOperations = collapseOperations(rootOperations);

const response = await this.apiClient.updateRunMetadata(
this.runId,
{ operations, parentOperations, rootOperations },
{
operations: collapsedOperations,
parentOperations: collapsedParentOperations,
rootOperations: collapsedRootOperations,
},
requestOptions
);

Expand Down Expand Up @@ -406,10 +414,14 @@ export class StandardMetadataManager implements RunMetadataManager {
return;
}

const operations = Array.from(this.queuedOperations);
const parentOperations = Array.from(this.queuedParentOperations);
const rootOperations = Array.from(this.queuedRootOperations);

return {
operations: Array.from(this.queuedOperations),
parentOperations: Array.from(this.queuedParentOperations),
rootOperations: Array.from(this.queuedRootOperations),
operations: collapseOperations(operations),
parentOperations: collapseOperations(parentOperations),
rootOperations: collapseOperations(rootOperations),
};
}

Expand Down
107 changes: 107 additions & 0 deletions packages/core/src/v3/runMetadata/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,110 @@ export function applyMetadataOperations(

return { newMetadata, unappliedOperations };
}

/**
* Collapses metadata operations to reduce payload size and avoid 413 "Request Entity Too Large" errors.
*
* When there are many operations queued up (e.g., 10k increment operations), sending them all
* individually can result in request payloads exceeding the server's 1MB limit. This function
* intelligently combines operations where possible to reduce the payload size:
*
* - **Increment operations**: Multiple increments on the same key are summed into a single increment
* - Example: increment("counter", 1) + increment("counter", 2) → increment("counter", 3)
*
* - **Set operations**: Multiple sets on the same key keep only the last one (since later sets override earlier ones)
* - Example: set("status", "processing") + set("status", "done") → set("status", "done")
*
* - **Delete operations**: Multiple deletes on the same key keep only one (duplicates are redundant)
* - Example: del("temp") + del("temp") → del("temp")
*
* - **Append, remove, and update operations**: Preserved as-is to maintain correctness since order matters
*
* @param operations Array of metadata change operations to collapse
* @returns Collapsed array with fewer operations that produce the same final result
*
* @example
* ```typescript
* const operations = [
* { type: "increment", key: "counter", value: 1 },
* { type: "increment", key: "counter", value: 2 },
* { type: "set", key: "status", value: "processing" },
* { type: "set", key: "status", value: "done" }
* ];
*
* const collapsed = collapseOperations(operations);
* // Result: [
* // { type: "increment", key: "counter", value: 3 },
* // { type: "set", key: "status", value: "done" }
* // ]
* ```
*/
export function collapseOperations(
operations: RunMetadataChangeOperation[]
): RunMetadataChangeOperation[] {
if (operations.length === 0) {
return operations;
}

// Maps to track collapsible operations
const incrementsByKey = new Map<string, number>();
const setsByKey = new Map<string, RunMetadataChangeOperation>();
const deletesByKey = new Set<string>();
const preservedOperations: RunMetadataChangeOperation[] = [];

// Process operations in order
for (const operation of operations) {
switch (operation.type) {
case "increment": {
const currentIncrement = incrementsByKey.get(operation.key) || 0;
incrementsByKey.set(operation.key, currentIncrement + operation.value);
break;
}
case "set": {
// Keep only the last set operation for each key
setsByKey.set(operation.key, operation);
break;
}
case "delete": {
// Keep only one delete operation per key
deletesByKey.add(operation.key);
break;
}
case "append":
case "remove":
case "update": {
// Preserve these operations as-is to maintain correctness
preservedOperations.push(operation);
break;
}
default: {
// Handle any future operation types by preserving them
preservedOperations.push(operation);
break;
}
}
}

// Build the collapsed operations array
const collapsedOperations: RunMetadataChangeOperation[] = [];

// Add collapsed increment operations
for (const [key, value] of incrementsByKey) {
collapsedOperations.push({ type: "increment", key, value });
}

// Add collapsed set operations
for (const operation of setsByKey.values()) {
collapsedOperations.push(operation);
}

// Add collapsed delete operations
for (const key of deletesByKey) {
collapsedOperations.push({ type: "delete", key });
}

// Add preserved operations
collapsedOperations.push(...preservedOperations);

return collapsedOperations;
}
Loading