
Conversation

ericallam
Member

No description provided.


changeset-bot bot commented Sep 27, 2025

⚠️ No Changeset found

Latest commit: 5cfe68e

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

Contributor

coderabbitai bot commented Sep 27, 2025

Walkthrough

The changes introduce a ClickHouse-backed event repository alongside existing storage, add environment/config wiring, and implement per-store resolution for recording and querying events and traces. Multiple presenters, routes, services, and run engine components are updated to resolve repositories based on taskEventStore and feature flags. A TracePubSub replaces prior direct pub/sub usage in streaming. OTLP exporter and enrich utilities are refactored to CreateEventInput types and store-aware batching. Legacy taskEvent utilities are removed. New ClickHouse client APIs, schema, and query builders are added. Feature flags, organization featureFlags field, and OTEL resource attributes propagation are included.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~180 minutes

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)

Description Check: ⚠️ Warning. There is no pull request description, so none of the required template sections (issue link, checklist, testing steps, changelog, screenshots) are present, making it impossible to verify how the change was tested or to document its scope. Resolution: add a description using the repository template, including the issue number, checklist, testing steps, a brief changelog summary, and any relevant screenshots.

Docstring Coverage: ⚠️ Warning. Docstring coverage is 9.46%, below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve coverage.

✅ Passed checks (1 passed)

Title Check: ✅ Passed. The title succinctly describes the primary feature added (ClickHouse support for storing spans and logs in the OTLP pipeline), matching the large-scale integration of ClickHouse event storage and schema changes in the PR. It is clear, specific, and relates directly to the main objective without unnecessary detail.
✨ Finishing touches
  • 📝 Generate Docstrings

🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/clickhouse-otel

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 22

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (8)
apps/webapp/app/routes/otel.v1.logs.ts (1)

7-16: Don’t compare content-type exactly; handle charset parameters

Exact equality will reject common headers like application/json; charset=utf-8. Use startsWith/normalization.

Apply:

-    const contentType = request.headers.get("content-type");
+    const contentType = request.headers.get("content-type")?.toLowerCase() ?? "";
 
-    if (contentType === "application/json") {
+    if (contentType.startsWith("application/json")) {
       const body = await request.json();
 
-      const exportResponse = await otlpExporter.exportLogs(body as ExportLogsServiceRequest);
+      const exportResponse = await otlpExporter.exportLogs(body as ExportLogsServiceRequest);
 
       return json(exportResponse, { status: 200 });
-    } else if (contentType === "application/x-protobuf") {
+    } else if (contentType.startsWith("application/x-protobuf")) {
       const buffer = await request.arrayBuffer();

Also applies to: 25-30

apps/webapp/app/routes/otel.v1.traces.ts (1)

7-16: Handle charset in content-type

Accept application/json; charset=utf-8 and similar for protobuf.

-    const contentType = request.headers.get("content-type");
+    const contentType = request.headers.get("content-type")?.toLowerCase() ?? "";
 
-    if (contentType === "application/json") {
+    if (contentType.startsWith("application/json")) {
       const body = await request.json();
 
       const exportResponse = await otlpExporter.exportTraces(body as ExportTraceServiceRequest);
 
       return json(exportResponse, { status: 200 });
-    } else if (contentType === "application/x-protobuf") {
+    } else if (contentType.startsWith("application/x-protobuf")) {
       const buffer = await request.arrayBuffer();

Also applies to: 25-30

apps/webapp/app/routes/api.v1.runs.$runId.trace.ts (1)

41-47: Align getTraceDetailedSummary call with its signature
eventRepository.types.ts defines getTraceDetailedSummary(storeTable, environmentId, traceId, options?), but here you’re passing run.createdAt and run.completedAt as extra positional args. Remove those parameters or update the repository interface/implementation to accept from/to.
Location: apps/webapp/app/routes/api.v1.runs.$runId.trace.ts:41-47
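For illustration, a minimal sketch of the aligned call, assuming the time window moves into the options bag (the from/to option names are hypothetical and would need adding to the interface):

    const traceSummary = await eventRepository.getTraceDetailedSummary(
      storeTable,
      environmentId,
      traceId,
      {
        from: run.createdAt, // hypothetical option
        to: run.completedAt ?? undefined, // hypothetical option
      }
    );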

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (1)

2188-2206: Don't clobber existing OTEL resource attributes

Setting OTEL_RESOURCE_ATTRIBUTES to a JSON string overwrites any value defined in the environment and drops all previously configured resource attributes. For example, a user-provided service.name=foo would disappear once this branch runs, which will break downstream telemetry. Please append the $trigger.taskEventStore attribute to the existing value (or initialise it if absent) using the standard key=value comma-separated format instead of replacing it.

-    if (taskEventStore) {
-      const resourceAttributes = JSON.stringify({
-        [SemanticInternalAttributes.TASK_EVENT_STORE]: taskEventStore,
-      });
-
-      variables.push(...[{ key: "OTEL_RESOURCE_ATTRIBUTES", value: resourceAttributes }]);
-    }
+    if (taskEventStore) {
+      const entry = `${SemanticInternalAttributes.TASK_EVENT_STORE}=${taskEventStore}`;
+      const existingIndex = variables.findIndex(
+        (variable) => variable.key === "OTEL_RESOURCE_ATTRIBUTES"
+      );
+
+      if (existingIndex >= 0) {
+        const existingValue = variables[existingIndex].value ?? "";
+        const separator = existingValue.length > 0 ? "," : "";
+        variables[existingIndex] = {
+          key: "OTEL_RESOURCE_ATTRIBUTES",
+          value: `${existingValue}${separator}${entry}`,
+        };
+      } else {
+        variables.push({ key: "OTEL_RESOURCE_ATTRIBUTES", value: entry });
+      }
+    }
apps/webapp/app/runEngine/concerns/traceEvents.server.ts (1)

109-121: Record idempotency message via the resolved repository, not the default

This writes the debug message to the default Postgres repository even when the trace is routed to ClickHouse, fragmenting traces across stores.

Apply this diff:

-        await this.eventRepository.recordEvent(
+        await repository.recordEvent(
           `There's an existing run for idempotencyKey: ${idempotencyKey}`,
           {
             taskSlug: request.taskId,
             environment: request.environment,
             attributes: {
               runId: existingRun.friendlyId,
             },
             context: request.options?.traceContext,
             parentId: event.spanId,
           }
         );
apps/webapp/app/v3/otlpExporter.server.ts (3)

126-133: Fix trace count logging (counts scopes, not spans).

The current totalSpans sums the lengths of the scopeSpans arrays; it should sum the spans nested inside each scope.

-    logger.debug("Exporting traces", {
-      resourceSpans: request.resourceSpans.length,
-      totalSpans: request.resourceSpans.reduce(
-        (acc, resourceSpan) => acc + resourceSpan.scopeSpans.length,
-        0
-      ),
-    });
+    logger.debug("Exporting traces", {
+      resourceSpans: request.resourceSpans.length,
+      totalSpans: request.resourceSpans.reduce(
+        (acc, resourceSpan) =>
+          acc +
+          resourceSpan.scopeSpans.reduce((sum, scope) => sum + scope.spans.length, 0),
+        0
+      ),
+    });

792-799: Whitelist OTLP exporter env vars in env.server.ts

  • Define OTLP_EXPORTER_VERBOSE and SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT in the env export (apps/webapp/app/env.server.ts)
  • Update the OTLPExporter instantiation in apps/webapp/app/v3/otlpExporter.server.ts to use env.* instead of process.env.*
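A sketch of both changes, using the names from this comment (the default values and the instantiation shape are illustrative):

    // env.server.ts
    OTLP_EXPORTER_VERBOSE: z
      .string()
      .default("0")
      .transform((v) => v === "1" || v === "true"),
    SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT: z.coerce.number().int().default(1024),

    // otlpExporter.server.ts
    export const otlpExporter = new OTLPExporter(
      eventRepository,
      clickhouseEventRepository,
      env.OTLP_EXPORTER_VERBOSE,
      env.SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT
    );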

248-249: Emit logs with LOG kind
Downstream isLogEvent only recognizes TaskEventKind.LOG, so emitting logs as "INTERNAL" prevents them from being classified as log events. In apps/webapp/app/v3/otlpExporter.server.ts, replace

kind: "INTERNAL" as const

with

kind: TaskEventKind.LOG
🧹 Nitpick comments (34)
packages/core/src/v3/utils/flattenAttributes.ts (1)

262-268: Precompute filtered key lookups to avoid repeated scans

filteredKeys?.includes(key) walks the array on every iteration. When unflattening large attribute maps this balloons to O(n·m). Converting the list to a Set once keeps the loop O(n) without changing behavior.

   const result: Record<string, unknown> = {};
 
-  for (const [key, value] of Object.entries(obj)) {
-    if (filteredKeys?.includes(key)) {
+  const filteredKeySet = filteredKeys ? new Set(filteredKeys) : undefined;
+
+  for (const [key, value] of Object.entries(obj)) {
+    if (filteredKeySet?.has(key)) {
       continue;
     }
TaskEvent_Property_Analysis.md (1)

9-31: Fix table section rows to satisfy MD056

markdownlint (MD056) is firing because these section rows only declare a single cell even though the header defines four columns. GitHub will render them, but our lint pipeline keeps failing. Pad the rows out to four cells (and repeat for the other section dividers in the doc).

-| **Core Identity & Structure**  |
+| **Core Identity & Structure** |  |  |  |

Based on markdownlint-cli2

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx (1)

305-307: Avoid unsafe type assertions for spanOverrides; narrow the type once.

Replace the local cast at usage with a typed alias near destructuring to keep types sound and remove the as at callsite.

Apply:

-  const { events, duration, rootSpanStatus, rootStartedAt, queuedDuration, overridesBySpanId } =
-    trace;
+  const {
+    events,
+    duration,
+    rootSpanStatus,
+    rootStartedAt,
+    queuedDuration,
+    overridesBySpanId: rawOverridesBySpanId,
+  } = trace;
+  const overridesBySpanId = rawOverridesBySpanId as Record<string, SpanOverride> | undefined;
@@
-  const spanOverrides = selectedSpanId ? overridesBySpanId?.[selectedSpanId] : undefined;
+  const spanOverrides = selectedSpanId ? overridesBySpanId?.[selectedSpanId] : undefined;
@@
-              spanOverrides={spanOverrides as SpanOverride | undefined}
+              spanOverrides={spanOverrides}

Also applies to: 328-329, 378-379

apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts (1)

44-45: Confirm eventEmitter lifecycle is per-subscription to avoid cross-subscriber listener removal.

eventEmitter.removeAllListeners() assumes an instance scoped to this subscription. If shared, scope removals to the "message" event or return an isolated emitter from subscribeToTrace.
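A minimal sketch of per-subscription cleanup, assuming the emitter may be shared (the send callback stands in for the route's stream writer):

    const onMessage = (event: unknown) => send(JSON.stringify(event));
    eventEmitter.on("message", onMessage);

    return async () => {
      // Detach only this subscription's listener rather than removeAllListeners(),
      // so other subscribers on a shared emitter keep receiving messages.
      eventEmitter.off("message", onMessage);
    };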

apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts (2)

70-77: Prefer Web Streams and CompressionStream over Node streams for isomorphism.

Remix routes should avoid Node-specific streams/zlib. Use ReadableStream and pipeThrough(new CompressionStream("gzip")) for portability.

As per coding guidelines

-import { createGzip } from "zlib";
-import { Readable } from "stream";
+// no Node-specific stream imports

@@
-  // Create a Readable stream from the runEvents array
-  const readable = new Readable({
-    read() {
-      runEvents.forEach((event) => {
-        try {
-          if (!user.admin && event.kind === TaskEventKind.LOG) {
-            // Only return debug logs for admins
-            return;
-          }
-          this.push(formatRunEvent(event) + "\n");
-        } catch {}
-      });
-      this.push(null); // End of stream
-    },
-  });
-
-  // Create a gzip transform stream
-  const gzip = createGzip();
-
-  // Pipe the readable stream into the gzip stream
-  const compressedStream = readable.pipe(gzip);
-
-  // Return the response with the compressed stream
-  return new Response(compressedStream as any, {
+  const stream = new ReadableStream({
+    start(controller) {
+      for (const event of runEvents) {
+        try {
+          if (!user.admin && event.kind === TaskEventKind.LOG) continue;
+          controller.enqueue(new TextEncoder().encode(formatRunEvent(event) + "\n"));
+        } catch {}
+      }
+      controller.close();
+    },
+  });
+
+  const body =
+    typeof CompressionStream !== "undefined"
+      ? stream.pipeThrough(new CompressionStream("gzip"))
+      : stream; // fallback: uncompressed if CompressionStream not available
+
+  return new Response(body, {
     status: 200,
     headers: {
       "Content-Type": "application/octet-stream",
       "Content-Disposition": `attachment; filename="${parsedParams.runParam}.log"`,
-      "Content-Encoding": "gzip",
+      ...(typeof CompressionStream !== "undefined" ? { "Content-Encoding": "gzip" } : {}),
     },
   });

125-127: De-duplicate util by reusing shared helper.

Import getDateFromNanoseconds from ~/v3/eventRepository/common.server instead of redefining it here.

Apply in this file:

+import { getDateFromNanoseconds } from "~/v3/eventRepository/common.server";
@@
-function getDateFromNanoseconds(nanoseconds: bigint) {
-  return new Date(Number(nanoseconds) / 1_000_000);
-}

Note: If you adopt the precision fix, update the shared helper accordingly in a follow-up.

apps/webapp/app/routes/otel.v1.logs.ts (1)

22-24: Set protobuf response Content-Type

Return header helps OTLP clients decode properly.

-      return new Response(ExportLogsServiceResponse.encode(exportResponse).finish(), {
-        status: 200,
-      });
+      return new Response(ExportLogsServiceResponse.encode(exportResponse).finish(), {
+        status: 200,
+        headers: { "Content-Type": "application/x-protobuf" },
+      });
apps/webapp/app/v3/services/cancelTaskRunV1.server.ts (1)

115-119: Include store in error log context

Adding the store eases debugging across mixed repositories.

   if (cancelRunEventError) {
     logger.error("[CancelTaskRunServiceV1] Failed to cancel run event", {
       error: cancelRunEventError,
       runId: cancelledTaskRun.id,
+      taskEventStore: cancelledTaskRun.taskEventStore,
     });
   }
apps/webapp/app/routes/otel.v1.traces.ts (1)

22-24: Set protobuf response Content-Type

Mirror OTLP expectations for binary responses.

-      return new Response(ExportTraceServiceResponse.encode(exportResponse).finish(), {
-        status: 200,
-      });
+      return new Response(ExportTraceServiceResponse.encode(exportResponse).finish(), {
+        status: 200,
+        headers: { "Content-Type": "application/x-protobuf" },
+      });
internal-packages/clickhouse/src/client/types.ts (2)

93-98: Document and narrow the unsafe insert API

insertUnsafe bypasses schema validation; add JSDoc warning and prefer Record<string, unknown> over any.

-  insertUnsafe<TRecord extends Record<string, any>>(req: {
+  insertUnsafe<TRecord extends Record<string, unknown>>(req: {
     name: string;
     table: string;
     settings?: ClickHouseSettings;
   }): ClickhouseInsertFunction<TRecord>;

4-4: Use type-only import for ClickHouseSettings

Avoids unnecessary runtime import.

-import { ClickHouseSettings } from "@clickhouse/client";
+import type { ClickHouseSettings } from "@clickhouse/client";
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (1)

780-805: Propagating taskEventStore in emitted events is consistent; add it to runAttemptStarted too

For symmetry and consumer simplicity, include taskEventStore on runAttemptStarted as well.

   this.$.eventBus.emit("runAttemptStarted", {
     time: new Date(),
     run: {
       id: updatedRun.id,
       status: updatedRun.status,
       createdAt: updatedRun.createdAt,
       updatedAt: updatedRun.updatedAt,
       attemptNumber: nextAttemptNumber,
       baseCostInCents: updatedRun.baseCostInCents,
       executedAt: updatedRun.executedAt ?? undefined,
+      taskEventStore: updatedRun.taskEventStore,
     },
     organization: {
       id: updatedRun.runtimeEnvironment.organizationId,
     },
     project: {
       id: updatedRun.projectId,
     },
     environment: {
       id: updatedRun.runtimeEnvironment.id,
     },
   });

Also applies to: 900-913, 978-1001, 1418-1442, 1560-1583

apps/webapp/app/v3/eventRepository/clickhouseEventRepositoryInstance.server.ts (1)

22-45: Prefer logger over bare console output

We generally steer clear of console.log in the webapp and lean on our logger utilities so messages flow through the configured transports. Could we swap the raw console call for logger.info (or similar) on the ClickHouse instance? Same refactor would help if we ever need to adjust log levels across environments.
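A minimal sketch of the swap, assuming the webapp's usual logger module path:

    import { logger } from "~/services/logger.server";

    // Flows through the configured transports instead of writing to stdout directly.
    logger.info("ClickhouseEventRepository initialized", {
      batchSize: config.batchSize, // config shape assumed for illustration
      flushIntervalMs: config.flushIntervalMs,
    });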

internal-packages/clickhouse/schema/007_add_task_events_v1.sql (2)

47-47: Consider reordering the primary key for typical trace queries

If most queries filter by environment_id and trace_id (optionally with a time window), consider ORDER BY (environment_id, trace_id, start_time). Using toUnixTimestamp(start_time) in ORDER BY can make data-skipping less effective for time-range scans and loses sub-second ordering. Evaluate query patterns before changing.


48-49: TTL setting may retain expired rows longer than expected

SETTINGS ttl_only_drop_parts = 1 drops whole parts only. If rows within a part expire at different times, expired rows may persist until the entire part qualifies, impacting retention and storage. If you need row‑level TTL guarantees, remove this setting (default) to allow merges to purge expired rows.

Would you like me to propose a version with row‑level TTL and merge tuning?

apps/webapp/app/v3/eventRepository/index.server.ts (1)

20-30: Store naming consistency

getEventRepository returns store = "clickhouse" or the actual table name ("taskEvent" | "taskEventPartitioned"). Ensure downstream consumers (callbacks, presenters) expect this mixed namespace ("backend" vs "table"). If they need a backend discriminator, return both backend ("postgres" | "clickhouse") and table ("taskEvent" | "taskEventPartitioned").

I can draft a minimal change returning { repository, backend: "postgres" | "clickhouse", storeTable?: "taskEvent" | "taskEventPartitioned" }.
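For illustration, a sketch of that discriminated shape (names are suggestions, not the current API):

    type ResolvedEventRepository =
      | { backend: "clickhouse"; repository: IEventRepository }
      | {
          backend: "postgres";
          repository: IEventRepository;
          storeTable: "taskEvent" | "taskEventPartitioned";
        };

Consumers can then switch on backend explicitly instead of inferring it from a mixed store string.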

apps/webapp/app/runEngine/concerns/traceEvents.server.ts (1)

14-22: Remove unused ClickHouse DI to simplify concerns

clickhouseEventRepository is no longer used after introducing getEventRepository. Drop the field and constructor arg to avoid confusion and keep DI minimal.

Apply this diff:

-  private readonly clickhouseEventRepository: ClickhouseEventRepository;
+  // no-op

-  constructor(
-    eventRepository: EventRepository,
-    clickhouseEventRepository: ClickhouseEventRepository
-  ) {
+  constructor(eventRepository: EventRepository) {
     this.eventRepository = eventRepository;
-    this.clickhouseEventRepository = clickhouseEventRepository;
   }

You can also remove the unused imports of ClickhouseEventRepository, flags, FEATURE_FLAG, env, and getTaskEventStore if not needed elsewhere.

apps/webapp/app/env.server.ts (3)

1123-1125: Constrain rollout percent to a sane 0–100 range (int).

Prevents invalid percentages and accidental over-rollout.

-    EVENT_REPOSITORY_CLICKHOUSE_ROLLOUT_PERCENT: z.coerce.number().optional(),
+    EVENT_REPOSITORY_CLICKHOUSE_ROLLOUT_PERCENT: z.coerce.number().int().min(0).max(100).optional(),

1121-1122: Add lower bounds to batch size and flush interval.

Avoids zero/negative values that would stall batching.

-    EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(1000),
-    EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
+    EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().min(1).default(1000),
+    EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().min(1).default(1000),

1112-1116: Ensure EVENTS_CLICKHOUSE_URL is required when ClickHouse is used.

If EVENT_REPOSITORY_DEFAULT_STORE is "clickhouse" or rollout > 0, this must be present; today it’s optional. Either enforce in schema (discriminated union/refine) or validate at repository initialization.

Would you like me to add a schema refine that asserts EVENTS_CLICKHOUSE_URL is defined when ClickHouse is active?
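A sketch of such a refine, with an illustrative schema shape rather than the real env.server.ts export:

    import { z } from "zod";

    const EventStoreEnv = z
      .object({
        EVENT_REPOSITORY_DEFAULT_STORE: z.enum(["postgres", "clickhouse"]).default("postgres"),
        EVENT_REPOSITORY_CLICKHOUSE_ROLLOUT_PERCENT: z.coerce.number().int().min(0).max(100).optional(),
        EVENTS_CLICKHOUSE_URL: z.string().url().optional(),
      })
      .refine(
        (env) => {
          const clickhouseActive =
            env.EVENT_REPOSITORY_DEFAULT_STORE === "clickhouse" ||
            (env.EVENT_REPOSITORY_CLICKHOUSE_ROLLOUT_PERCENT ?? 0) > 0;
          return !clickhouseActive || env.EVENTS_CLICKHOUSE_URL !== undefined;
        },
        { message: "EVENTS_CLICKHOUSE_URL is required when the ClickHouse event store is active" }
      );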

apps/webapp/app/v3/services/tracePubSub.server.ts (2)

48-53: Await Redis quit to ensure clean shutdown.

Avoids dangling sockets/listeners on unsubscribe.

    const unsubscribe = async () => {
      await redis.unsubscribe(channel);
-      redis.quit();
+      await redis.quit();
      this._subscriberCount--;
    };

10-16: Consider removing or exposing _subscriberCount.

Currently unused; either remove to reduce noise or expose a getter for metrics.
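If the counter stays, a read-only accessor keeps the field private while exposing it for metrics (a sketch, inside the TracePubSub class):

    get subscriberCount(): number {
      return this._subscriberCount;
    }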

apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts (4)

136-139: insertManyImmediate should bypass batching.

Immediate variant currently just queues normally. If callers rely on sync-ish persistence, write directly and publish.

-  async insertManyImmediate(events: CreateEventInput[]): Promise<void> {
-    this.insertMany(events);
-  }
+  async insertManyImmediate(events: CreateEventInput[]): Promise<void> {
+    const records = events.flatMap((e) => this.createEventToTaskEventV1Input(e));
+    const [err] = await this._clickhouse.taskEvents.insert(records);
+    if (err) throw err;
+    await this.#publishToRedis(records);
+  }

118-121: Avoid logging full event payloads at debug level.

Attributes/metadata may contain PII; logging entire batches is risky and noisy.

-      logger.debug("ClickhouseEventRepository.flushBatch", {
-        events,
-      });
+      logger.debug("ClickhouseEventRepository.flushBatch", {
+        eventCount: events.length,
+        sampleTraceId: events[0]?.trace_id,
+      });

158-161: TTL is hard-coded to 30 days in many places.

Make retention configurable (env or config param) and pass it via ClickhouseEventRepositoryConfig; a sketch follows the affected-lines list below.

Based on learnings

Also applies to: 228-231, 256-259, 288-291, 314-317, 448-450, 586-589, 610-612, 658-661, 699-701, 745-747, 791-793, 835-837
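A sketch of threading retention through the config (the field and env var names are suggestions, not existing code):

    interface ClickhouseEventRepositoryConfig {
      batchSize: number;
      flushIntervalMs: number;
      retentionDays: number; // hypothetical field, e.g. defaulted from an EVENTS_CLICKHOUSE_RETENTION_DAYS env var
    }

    // Derives each record's expiration from the configured retention instead of a literal 30 days.
    function expiresAt(recordedAt: Date, config: ClickhouseEventRepositoryConfig): Date {
      return new Date(recordedAt.getTime() + config.retentionDays * 24 * 60 * 60 * 1000);
    }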


1312-1324: Harden metadata parsing.

A single malformed JSON string would throw and break the whole request. Catch and return undefined.

-  #parseMetadata(metadata: string): Record<string, unknown> | undefined {
+  #parseMetadata(metadata: string): Record<string, unknown> | undefined {
     if (!metadata) {
       return undefined;
     }

-    const parsed = JSON.parse(metadata);
+    let parsed: unknown;
+    try {
+      parsed = JSON.parse(metadata);
+    } catch {
+      return undefined;
+    }
apps/webapp/app/v3/eventRepository/common.server.ts (3)

37-41: Simplify JS Date duration calc.

Unreachable typeof check; keep it simple.

-export function calculateDurationFromStartJsDate(startTime: Date, endTime: Date = new Date()) {
-  const $endtime = typeof endTime === "string" ? new Date(endTime) : endTime;
-
-  return ($endtime.getTime() - startTime.getTime()) * 1_000_000;
-}
+export function calculateDurationFromStartJsDate(startTime: Date, endTime: Date = new Date()) {
+  return (endTime.getTime() - startTime.getTime()) * 1_000_000;
+}

102-131: Ensure error “message” is a string for CUSTOM/STRING errors.

error.raw may not be a string. Coerce safely to avoid downstream type issues.

-    case "CUSTOM_ERROR": {
-      return {
-        type: "Error",
-        message: error.raw,
-      };
-    }
+    case "CUSTOM_ERROR": {
+      return {
+        type: "Error",
+        message: typeof error.raw === "string" ? error.raw : JSON.stringify(error.raw),
+      };
+    }
@@
-    case "STRING_ERROR": {
-      return {
-        type: "Error",
-        message: error.raw,
-      };
-    }
+    case "STRING_ERROR": {
+      return {
+        type: "Error",
+        message: typeof error.raw === "string" ? error.raw : String(error.raw),
+      };
+    }

Please confirm TaskRunError["raw"] is string in all cases; if not, the above prevents non-string assignment.


158-166: Simplify empty-object check.

Small nit for readability.

-export function isEmptyObject(obj: object) {
-  for (var prop in obj) {
-    if (Object.prototype.hasOwnProperty.call(obj, prop)) {
-      return false;
-    }
-  }
-
-  return true;
-}
+export function isEmptyObject(obj: object) {
+  return Object.keys(obj).length === 0;
+}
apps/webapp/app/v3/otlpExporter.server.ts (4)

161-168: Clarify log message (it says “Skipping” but allows through).

Message is misleading.

-        logger.debug("Skipping resource span without trigger attribute", {
+        logger.debug("Allowing resource span without trigger attribute", {
           attributes: resourceSpan.resource?.attributes,
           spans: resourceSpan.scopeSpans.flatMap((scopeSpan) => scopeSpan.spans),
         });

36-41: Depend on interfaces, not concrete repos.

Constructor types should be IEventRepository to decouple from implementations.

-  constructor(
-    private readonly _eventRepository: EventRepository,
-    private readonly _clickhouseEventRepository: ClickhouseEventRepository,
+  constructor(
+    private readonly _eventRepository: IEventRepository,
+    private readonly _clickhouseEventRepository: IEventRepository,
     private readonly _verbose: boolean,
     private readonly _spanAttributeValueLengthLimit: number
   ) {

764-768: Support Uint8Array for bytes values.

OTLP bytes may arrive as Uint8Array, not Buffer.

-function isBytesValue(value: AnyValue | undefined): value is { bytesValue: Buffer } {
+function isBytesValue(value: AnyValue | undefined): value is { bytesValue: Buffer | Uint8Array } {
   if (!value) return false;
 
-  return Buffer.isBuffer(value.bytesValue);
+  return Buffer.isBuffer(value.bytesValue) || value.bytesValue instanceof Uint8Array;
}

770-777: Avoid unnecessary Buffer copy in binaryToHex.

Use Buffer’s toString directly when possible.

-function binaryToHex(buffer: Buffer | string | undefined): string | undefined {
+function binaryToHex(buffer: Buffer | Uint8Array | string | undefined): string | undefined {
   if (!buffer) return undefined;
   if (typeof buffer === "string") return buffer;
 
-  return Buffer.from(Array.from(buffer)).toString("hex");
+  return Buffer.isBuffer(buffer)
+    ? buffer.toString("hex")
+    : Buffer.from(buffer).toString("hex");
}
apps/webapp/app/v3/eventRepository/eventRepository.types.ts (1)

222-223: Prefer unknown over any for metadata.

Improves type safety at call sites.

-  metadata: any; // Used by SpanPresenter for entity processing
+  metadata: unknown; // Used by SpanPresenter for entity processing

Comment on lines +139 to +146
    const originalRunId = await eventRepository.getSpanOriginalRunId(
      eventStore,
      environmentId,
      spanId,
      environmentId,
      createdAt,
      completedAt ?? undefined
    );
Contributor

⚠️ Potential issue | 🔴 Critical

Pass the traceId into getSpanOriginalRunId

IEventRepository.getSpanOriginalRunId expects (storeTable, environmentId, spanId, traceId, …), but this call passes environmentId for the traceId slot. That breaks ClickHouse-backed lookups (and pollutes the cache key) so the original run is never found, causing cached runs to fall back to the plain span path. Please wire the actual traceId through.

     const originalRunId = await eventRepository.getSpanOriginalRunId(
       eventStore,
       environmentId,
       spanId,
-      environmentId,
+      traceId,
       createdAt,
       completedAt ?? undefined
     );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    const originalRunId = await eventRepository.getSpanOriginalRunId(
-      eventStore,
-      environmentId,
-      spanId,
-      environmentId,
-      createdAt,
-      completedAt ?? undefined
-    );
+    const originalRunId = await eventRepository.getSpanOriginalRunId(
+      eventStore,
+      environmentId,
+      spanId,
+      traceId,
+      createdAt,
+      completedAt ?? undefined
+    );
🤖 Prompt for AI Agents
In apps/webapp/app/presenters/v3/SpanPresenter.server.ts around lines 139 to
146, the call to eventRepository.getSpanOriginalRunId is passing environmentId
into the traceId parameter slot; change the fourth argument to the actual
traceId variable (ensure traceId is available in scope or retrieve it from the
event/span context) so the function is invoked as (eventStore, environmentId,
spanId, traceId, createdAt, completedAt ?? undefined), which fixes ClickHouse
lookups and prevents cache key pollution.

Comment on lines +99 to +108
  if (taskEventStore) {
    const resourceAttributes = JSON.stringify({
      [SemanticInternalAttributes.TASK_EVENT_STORE]: taskEventStore,
    });

    variables.push(
      ...[
        { key: "OTEL_RESOURCE_ATTRIBUTES", value: resourceAttributes },
        { key: "TRIGGER_OTEL_RESOURCE_ATTRIBUTES", value: resourceAttributes },
      ]
Contributor

⚠️ Potential issue | 🟠 Major

Use OTEL_RESOURCE_ATTRIBUTES key=value format

OTEL_RESOURCE_ATTRIBUTES must be a comma-separated list of key=value pairs (per the OTel spec). Supplying a JSON string means the SDK drops it, so the new TASK_EVENT_STORE attribute never reaches the worker. Emit the spec-compliant string for OTEL_RESOURCE_ATTRIBUTES, keep the JSON payload for the TRIGGER_ helper var, and merge with any existing resource attributes before pushing.

-  if (taskEventStore) {
-    const resourceAttributes = JSON.stringify({
-      [SemanticInternalAttributes.TASK_EVENT_STORE]: taskEventStore,
-    });
-
-    variables.push(
-      ...[
-        { key: "OTEL_RESOURCE_ATTRIBUTES", value: resourceAttributes },
-        { key: "TRIGGER_OTEL_RESOURCE_ATTRIBUTES", value: resourceAttributes },
-      ]
-    );
-  }
+  if (taskEventStore) {
+    const otelAttribute = `${SemanticInternalAttributes.TASK_EVENT_STORE}=${taskEventStore}`;
+    const existingOtelResourceAttributes = variables.find(
+      (variable) => variable.key === "OTEL_RESOURCE_ATTRIBUTES"
+    )?.value;
+
+    const mergedOtelResourceAttributes = existingOtelResourceAttributes
+      ? `${existingOtelResourceAttributes},${otelAttribute}`
+      : otelAttribute;
+
+    const triggerResourceAttributes = JSON.stringify({
+      [SemanticInternalAttributes.TASK_EVENT_STORE]: taskEventStore,
+    });
+
+    variables.push(
+      ...[
+        { key: "OTEL_RESOURCE_ATTRIBUTES", value: mergedOtelResourceAttributes },
+        { key: "TRIGGER_OTEL_RESOURCE_ATTRIBUTES", value: triggerResourceAttributes },
+      ]
+    );
+  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-  if (taskEventStore) {
-    const resourceAttributes = JSON.stringify({
-      [SemanticInternalAttributes.TASK_EVENT_STORE]: taskEventStore,
-    });
-
-    variables.push(
-      ...[
-        { key: "OTEL_RESOURCE_ATTRIBUTES", value: resourceAttributes },
-        { key: "TRIGGER_OTEL_RESOURCE_ATTRIBUTES", value: resourceAttributes },
-      ]
+  if (taskEventStore) {
+    const otelAttribute = `${SemanticInternalAttributes.TASK_EVENT_STORE}=${taskEventStore}`;
+    const existingOtelResourceAttributes = variables.find(
+      (variable) => variable.key === "OTEL_RESOURCE_ATTRIBUTES"
+    )?.value;
+
+    const mergedOtelResourceAttributes = existingOtelResourceAttributes
+      ? `${existingOtelResourceAttributes},${otelAttribute}`
+      : otelAttribute;
+
+    const triggerResourceAttributes = JSON.stringify({
+      [SemanticInternalAttributes.TASK_EVENT_STORE]: taskEventStore,
+    });
+
+    variables.push(
+      ...[
+        { key: "OTEL_RESOURCE_ATTRIBUTES", value: mergedOtelResourceAttributes },
+        { key: "TRIGGER_OTEL_RESOURCE_ATTRIBUTES", value: triggerResourceAttributes },
+      ]
+    );
+  }
🤖 Prompt for AI Agents
In
apps/webapp/app/routes/engine.v1.dev.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.start.ts
around lines 99-108, the code currently builds resourceAttributes as a JSON
string and sets both OTEL_RESOURCE_ATTRIBUTES and
TRIGGER_OTEL_RESOURCE_ATTRIBUTES to it; OTEL_RESOURCE_ATTRIBUTES must be a
comma-separated list of key=value pairs per the OTel spec so the SDK will accept
it. Change the logic to: build a spec-compliant kvString like
"task_event_store=..."" (escape/quote values as needed), merge that kvString
with any existing OTEL_RESOURCE_ATTRIBUTES (comma-append, avoid duplicate keys),
set OTEL_RESOURCE_ATTRIBUTES to the resulting comma-separated string, keep the
original JSON string for TRIGGER_OTEL_RESOURCE_ATTRIBUTES, and then push both
variables as before.

Comment on lines +275 to +278
  if (spanOverrides.duration) {
    newSpan.duration = spanOverrides.duration;
  }

Contributor

⚠️ Potential issue | 🟠 Major

Handle zero-duration overrides.

This guard skips applying overrides when duration is 0, which is a legitimate value (e.g., for instantaneous spans). As a result, an override coming from the repository to clamp a span to zero length will be ignored and the UI will continue to show the stale duration. Flip the check to compare against undefined.

-  if (spanOverrides.duration) {
+  if (spanOverrides.duration !== undefined) {
     newSpan.duration = spanOverrides.duration;
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-  if (spanOverrides.duration) {
-    newSpan.duration = spanOverrides.duration;
-  }
+  if (spanOverrides.duration !== undefined) {
+    newSpan.duration = spanOverrides.duration;
+  }
🤖 Prompt for AI Agents
In
apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx
around lines 275 to 278, the current guard `if (spanOverrides.duration)` skips
legitimate zero values; change the condition to explicitly detect undefined
(e.g., `if (spanOverrides.duration !== undefined)`) so a duration of 0 is
applied; ensure the check covers the property possibly being null if needed by
using a strict undefined check.

Comment on lines +125 to +127
function getDateFromNanoseconds(nanoseconds: bigint) {
  return new Date(Number(nanoseconds) / 1_000_000);
}
Contributor

⚠️ Potential issue | 🟡 Minor

BigInt → Date conversion loses precision; divide before Number().

Do integer division in BigInt space to milliseconds, then convert. This avoids precision loss/overflow.

Apply:

-function getDateFromNanoseconds(nanoseconds: bigint) {
-  return new Date(Number(nanoseconds) / 1_000_000);
-}
+function getDateFromNanoseconds(nanoseconds: bigint) {
+  const ms = nanoseconds / 1_000_000n;
+  return new Date(Number(ms));
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-function getDateFromNanoseconds(nanoseconds: bigint) {
-  return new Date(Number(nanoseconds) / 1_000_000);
-}
+function getDateFromNanoseconds(nanoseconds: bigint) {
+  const ms = nanoseconds / 1_000_000n;
+  return new Date(Number(ms));
+}
🤖 Prompt for AI Agents
In apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts around lines
125 to 127, the getDateFromNanoseconds function converts bigint nanoseconds to
Date by first casting to Number which can lose precision; change the approach to
perform integer division in BigInt to convert nanoseconds to milliseconds (e.g.,
const ms = nanoseconds / 1_000_000n) and then pass Number(ms) into new
Date(Number(ms)); this preserves precision and avoids overflow before the
Number() conversion.

Comment on lines +246 to 248
      if (this.batchQueue.length > 0 && !this.isShuttingDown && this.consecutiveFlushFailures < 3) {
        this.flushBatches();
      }
Contributor

⚠️ Potential issue | 🔴 Critical

Shutdown now deadlocks with pending batches

Once shutdown() sets isShuttingDown = true, the flush timer is cleared and we rely on the recursive call at the end of flushBatches() to drain whatever remains beyond the first this.limiter.concurrency batches. The new !this.isShuttingDown guard blocks that recursion, so any extra batches sit in batchQueue forever and the while (this.batchQueue.length > 0 || this.limiter.activeCount > 0) loop in shutdown() never exits. This is a new correctness regression that prevents graceful shutdown whenever more than concurrency batches are queued.

We still need the consecutive failure protection, but we should allow recursion during shutdown so the queue can drain.

-      if (this.batchQueue.length > 0 && !this.isShuttingDown && this.consecutiveFlushFailures < 3) {
-        this.flushBatches();
-      }
+      const shouldContinueFlushing =
+        this.batchQueue.length > 0 &&
+        (this.consecutiveFlushFailures < 3 || this.isShuttingDown);
+
+      if (shouldContinueFlushing) {
+        this.flushBatches();
+      }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-      if (this.batchQueue.length > 0 && !this.isShuttingDown && this.consecutiveFlushFailures < 3) {
-        this.flushBatches();
-      }
+      const shouldContinueFlushing =
+        this.batchQueue.length > 0 &&
+        (this.consecutiveFlushFailures < 3 || this.isShuttingDown);
+
+      if (shouldContinueFlushing) {
+        this.flushBatches();
+      }
🤖 Prompt for AI Agents
In apps/webapp/app/v3/dynamicFlushScheduler.server.ts around lines 246-248, the
recursive call in flushBatches is blocked by the new !this.isShuttingDown guard
which prevents draining remaining batches during shutdown; keep the
consecutiveFlushFailures < 3 protection but remove the isShuttingDown check for
the recursive flush invocation so flushBatches can continue draining the queue
when shutdown has started (still avoid scheduling new timer-based flushes when
isShuttingDown is true).

Comment on lines +15 to +16
  parent_span_id: z.string(),
  message: z.string(),
Contributor

@coderabbitai coderabbitai bot Sep 27, 2025

⚠️ Potential issue | 🔴 Critical

Allow null parent span IDs in task event schemas.

Root spans land in ClickHouse with parent_span_id = NULL. The current z.string() validation will reject those rows, causing trace reads to blow up as soon as a root span is encountered. Relax the field to .nullable() everywhere we deserialize/serialize it (input + all query result schemas) so we can round-trip root spans correctly.

-  parent_span_id: z.string(),
+  parent_span_id: z.string().nullable(),

Replicate the same change in TaskEventSummaryV1Result, TaskEventDetailedSummaryV1Result, and TaskEventDetailsV1Result.

Also applies to: 40-41, 64-65, 92-93

Member Author

This is because we are storing the parent_span_id as an empty string in clickhouse, so this critical issue is not an issue at all.

Contributor

@ericallam Thank you for the clarification! You're absolutely right - if you're storing parent_span_id as an empty string rather than NULL for root spans, then z.string() validation is perfectly appropriate and my concern was unfounded. I appreciate you explaining the implementation detail about how you handle root spans in your ClickHouse storage layer.

   ∩---∩
  (  . .)  Thanks for the
   o_(")(") clarification!

✏️ Learnings added
Learnt from: ericallam
PR: triggerdotdev/trigger.dev#2567
File: internal-packages/clickhouse/src/taskEvents.ts:15-16
Timestamp: 2025-09-27T21:10:34.192Z
Learning: In the ClickHouse implementation, parent_span_id is stored as an empty string ("") for root spans, not NULL. This means z.string() validation is correct and doesn't need to be nullable.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
