Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions dist/src/langfuse.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,19 @@ export interface LangfuseConfig {
fetchImpl?: typeof fetch;
flushTimeoutMs?: number;
}
export interface RootGenerationStartData {
name: string;
input: unknown;
model: string;
iteration: number;
}
export interface RootGenerationEndData {
output: unknown;
durationMs: number;
usage?: UsageStats;
isError?: boolean;
errorMessage?: string;
}
export declare class LangfuseTraceRecorder {
private host;
private publicKey;
Expand All @@ -29,6 +42,8 @@ export declare class LangfuseTraceRecorder {
userId?: string;
metadata?: Record<string, unknown>;
}): void;
rootGenerationStart(data: RootGenerationStartData): string;
rootGenerationEnd(generationId: string, data: RootGenerationEndData): void;
childStart(data: {
parentRunId: string;
childRunId?: string;
Expand Down
58 changes: 58 additions & 0 deletions dist/src/langfuse.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 27 additions & 2 deletions dist/src/rlm.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions dist/tests/recursive-trace.test.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 73 additions & 0 deletions src/langfuse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,21 @@ interface LangfuseEvent {
body: Record<string, unknown>;
}

export interface RootGenerationStartData {
name: string;
input: unknown;
model: string;
iteration: number;
}

export interface RootGenerationEndData {
output: unknown;
durationMs: number;
usage?: UsageStats;
isError?: boolean;
errorMessage?: string;
}

export class LangfuseTraceRecorder {
private host: string | null;
private publicKey: string | null;
Expand Down Expand Up @@ -62,6 +77,64 @@ export class LangfuseTraceRecorder {
});
}

rootGenerationStart(data: RootGenerationStartData): string {
const generationId = randomUUID();
if (!this.enabled) return generationId;
this.enqueue("generation-create", {
id: generationId,
traceId: this.traceId,
name: data.name,
model: data.model,
input: data.input,
startTime: new Date().toISOString(),
metadata: {
event: "root_generation_start",
iteration: data.iteration,
},
});
return generationId;
}

rootGenerationEnd(generationId: string, data: RootGenerationEndData): void {
if (!this.enabled) return;
const input = data.usage?.inputTokens ?? 0;
const output = data.usage?.outputTokens ?? 0;
const cacheRead = data.usage?.cacheReadTokens ?? 0;
const cacheWrite = data.usage?.cacheWriteTokens ?? 0;
this.enqueue("generation-update", {
id: generationId,
output: data.output,
endTime: new Date().toISOString(),
level: data.isError ? "ERROR" : "DEFAULT",
statusMessage: data.errorMessage,
usage: {
input,
output,
total: input + output + cacheRead + cacheWrite,
},
usageDetails: {
input,
output,
cache_read: cacheRead,
cache_write: cacheWrite,
total: input + output + cacheRead + cacheWrite,
},
costDetails: {
total: data.usage?.totalCost ?? 0,
},
Comment on lines +122 to +124
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Langfuse uses the standard top-level cost field (a number) to track and display generation costs in the UI. Passing only costDetails might result in the cost not being displayed correctly in standard Langfuse cost columns. Adding cost at the top level of the payload ensures standard cost tracking works out of the box.

      cost: data.usage?.totalCost ?? 0,
      costDetails: {
        total: data.usage?.totalCost ?? 0,
      },

metadata: {
event: "root_generation_end",
duration_ms: data.durationMs,
input_tokens: input,
output_tokens: output,
cache_read_tokens: cacheRead,
cache_write_tokens: cacheWrite,
total_cost: data.usage?.totalCost ?? 0,
llm_calls: data.usage?.llmCalls ?? 0,
},
});
}

childStart(data: {
parentRunId: string;
childRunId?: string;
Expand Down
31 changes: 29 additions & 2 deletions src/rlm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,12 @@ export async function rlmLoop(

// Call LLM
const llmStartMs = Date.now();
const generationId = langfuse.rootGenerationStart({
name: `Model call — root iteration ${iteration + 1}`,
input: messages,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Snapshot generation input before queueing

Passing the live messages array here means the queued generation-create event does not preserve the prompt that was actually sent for this LLM call. rootGenerationStart stores the object reference and the batch is JSON-serialized only later during langfuse.flush(), while the loop appends assistant/user messages and mutates the last user message for soft-limit nudges before that flush. In multi-iteration runs, earlier generation inputs will therefore show the final mutated conversation (often including that generation's own output) instead of the per-call input, corrupting the Langfuse trace.

Useful? React with 👍 / 👎.

model: `${config.model.provider}/${config.model.model}`,
iteration,
});
Comment on lines +429 to +434
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Snapshot messages when starting the root generation.

input: messages stores a reference to the live array. Events are only serialized later in flush() (JSON.stringify(batch)), but messages keeps getting mutated each iteration (messages.push(...) and the nudge lastMsg.content += nudge). As a result every generation-create will serialize the final full conversation as its input, not the input at that iteration — defeating the per-iteration capture.

🐛 Capture a snapshot at start time
       const generationId = langfuse.rootGenerationStart({
         name: `Model call — root iteration ${iteration + 1}`,
-        input: messages,
+        input: [...messages],
         model: `${config.model.provider}/${config.model.model}`,
         iteration,
       });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const generationId = langfuse.rootGenerationStart({
name: `Model call — root iteration ${iteration + 1}`,
input: messages,
model: `${config.model.provider}/${config.model.model}`,
iteration,
});
const generationId = langfuse.rootGenerationStart({
name: `Model call — root iteration ${iteration + 1}`,
input: [...messages],
model: `${config.model.provider}/${config.model.model}`,
iteration,
});
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/rlm.ts` around lines 429 - 434, The generation-create currently passes a
live reference (messages) to langfuse.rootGenerationStart so later serialization
in flush() captures the mutated final conversation instead of the per-iteration
input; fix by snapshotting messages when calling langfuse.rootGenerationStart
(create a deep copy of the messages array/objects — e.g., clone each message
object or use a safe deep-clone) and pass that snapshot as input; ensure this
change is applied where generationId is created (langfuse.rootGenerationStart)
so subsequent flush()/JSON.stringify(batch) serializes the immutable
per-iteration input rather than the live messages array.

const response = await llmComplete(messages, config.model, {
signal: abortController.signal,
cacheConfig,
Expand All @@ -434,6 +440,11 @@ export async function rlmLoop(
geminiConfig: config.gemini,
});
const llmDurationMs = Date.now() - llmStartMs;
langfuse.rootGenerationEnd(generationId, {
output: response.text,
durationMs: llmDurationMs,
usage: response.usage,
});
mergeUsage(usage, response.usage);
budget.record(response.usage.inputTokens, response.usage.outputTokens, response.usage.totalCost);

Expand Down Expand Up @@ -664,7 +675,7 @@ export async function rlmLoop(
logVerbose(actualIterations, `${reason}, forcing final answer`);
}

const forcedResult = await forceFinalAnswer(messages, config, usage, abortController.signal, cacheConfig);
const forcedResult = await forceFinalAnswer(messages, config, usage, abortController.signal, cacheConfig, langfuse, actualIterations);
return finalize(forcedResult, actualIterations);
} catch (err: unknown) {
clearTimeout(timeoutHandle);
Expand Down Expand Up @@ -697,7 +708,9 @@ async function forceFinalAnswer(
config: RlmxConfig,
usage: UsageStats,
signal?: AbortSignal,
cacheConfig?: CacheLLMConfig
cacheConfig?: CacheLLMConfig,
langfuse?: LangfuseTraceRecorder,
iteration = 0
): Promise<string> {
const forceMessages: ChatMessage[] = [
...messages,
Expand All @@ -708,13 +721,27 @@ async function forceFinalAnswer(
},
];

const generationId = langfuse?.rootGenerationStart({
name: "Model call — forced final answer",
input: forceMessages,
model: `${config.model.provider}/${config.model.model}`,
iteration,
});
const llmStartMs = Date.now();
const response = await llmComplete(forceMessages, config.model, {
signal,
cacheConfig,
thinkingLevel: config.gemini.thinkingLevel,
outputSchema: config.output.schema,
geminiConfig: config.gemini,
});
if (generationId) {
langfuse?.rootGenerationEnd(generationId, {
output: response.text,
durationMs: Date.now() - llmStartMs,
usage: response.usage,
});
}
Comment on lines +724 to +744
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to the main loop iteration, if llmComplete throws an error during the forced final answer call, the generation is left dangling in Langfuse. Wrapping this call in a try...catch block ensures the generation is correctly closed with an error status.

  const generationId = langfuse?.rootGenerationStart({
    name: "Model call — forced final answer",
    input: forceMessages,
    model: `${config.model.provider}/${config.model.model}`,
    iteration,
  });
  const llmStartMs = Date.now();
  let response: any;
  try {
    response = await llmComplete(forceMessages, config.model, {
      signal,
      cacheConfig,
      thinkingLevel: config.gemini.thinkingLevel,
      outputSchema: config.output.schema,
      geminiConfig: config.gemini,
    });
    if (generationId) {
      langfuse?.rootGenerationEnd(generationId, {
        output: response.text,
        durationMs: Date.now() - llmStartMs,
        usage: response.usage,
      });
    }
  } catch (err) {
    if (generationId) {
      langfuse?.rootGenerationEnd(generationId, {
        output: null,
        durationMs: Date.now() - llmStartMs,
        isError: true,
        errorMessage: err instanceof Error ? err.message : String(err),
      });
    }
    throw err;
  }

mergeUsage(usage, response.usage);
return response.text;
}
Expand Down
Loading
Loading