From 6d0c50fc1162b7787c2ed860cf7b31c8f3b6ae76 Mon Sep 17 00:00:00 2001 From: Genie Automagik Date: Fri, 29 May 2026 15:48:24 +0000 Subject: [PATCH] fix: emit Langfuse generations for root RLM calls --- dist/src/langfuse.d.ts | 15 ++++++ dist/src/langfuse.js | 58 ++++++++++++++++++++++++ dist/src/rlm.js | 29 +++++++++++- dist/tests/recursive-trace.test.js | 44 ++++++++++++++++++ src/langfuse.ts | 73 ++++++++++++++++++++++++++++++ src/rlm.ts | 31 ++++++++++++- tests/recursive-trace.test.ts | 47 +++++++++++++++++++ 7 files changed, 293 insertions(+), 4 deletions(-) diff --git a/dist/src/langfuse.d.ts b/dist/src/langfuse.d.ts index 4ea8e6f..c9f4e53 100644 --- a/dist/src/langfuse.d.ts +++ b/dist/src/langfuse.d.ts @@ -12,6 +12,19 @@ export interface LangfuseConfig { fetchImpl?: typeof fetch; flushTimeoutMs?: number; } +export interface RootGenerationStartData { + name: string; + input: unknown; + model: string; + iteration: number; +} +export interface RootGenerationEndData { + output: unknown; + durationMs: number; + usage?: UsageStats; + isError?: boolean; + errorMessage?: string; +} export declare class LangfuseTraceRecorder { private host; private publicKey; @@ -29,6 +42,8 @@ export declare class LangfuseTraceRecorder { userId?: string; metadata?: Record; }): void; + rootGenerationStart(data: RootGenerationStartData): string; + rootGenerationEnd(generationId: string, data: RootGenerationEndData): void; childStart(data: { parentRunId: string; childRunId?: string; diff --git a/dist/src/langfuse.js b/dist/src/langfuse.js index d4d3a63..7362d53 100644 --- a/dist/src/langfuse.js +++ b/dist/src/langfuse.js @@ -41,6 +41,64 @@ export class LangfuseTraceRecorder { tags: ["rlmx", "recursive-tree"], }); } + rootGenerationStart(data) { + const generationId = randomUUID(); + if (!this.enabled) + return generationId; + this.enqueue("generation-create", { + id: generationId, + traceId: this.traceId, + name: data.name, + model: data.model, + input: data.input, + startTime: new Date().toISOString(), + metadata: { + event: "root_generation_start", + iteration: data.iteration, + }, + }); + return generationId; + } + rootGenerationEnd(generationId, data) { + if (!this.enabled) + return; + const input = data.usage?.inputTokens ?? 0; + const output = data.usage?.outputTokens ?? 0; + const cacheRead = data.usage?.cacheReadTokens ?? 0; + const cacheWrite = data.usage?.cacheWriteTokens ?? 0; + this.enqueue("generation-update", { + id: generationId, + output: data.output, + endTime: new Date().toISOString(), + level: data.isError ? "ERROR" : "DEFAULT", + statusMessage: data.errorMessage, + usage: { + input, + output, + total: input + output + cacheRead + cacheWrite, + }, + usageDetails: { + input, + output, + cache_read: cacheRead, + cache_write: cacheWrite, + total: input + output + cacheRead + cacheWrite, + }, + costDetails: { + total: data.usage?.totalCost ?? 0, + }, + metadata: { + event: "root_generation_end", + duration_ms: data.durationMs, + input_tokens: input, + output_tokens: output, + cache_read_tokens: cacheRead, + cache_write_tokens: cacheWrite, + total_cost: data.usage?.totalCost ?? 0, + llm_calls: data.usage?.llmCalls ?? 0, + }, + }); + } childStart(data) { const spanId = randomUUID(); if (!this.enabled) diff --git a/dist/src/rlm.js b/dist/src/rlm.js index 651dca6..804c087 100644 --- a/dist/src/rlm.js +++ b/dist/src/rlm.js @@ -318,6 +318,12 @@ export async function rlmLoop(query, context, config, options = {}) { } // Call LLM const llmStartMs = Date.now(); + const generationId = langfuse.rootGenerationStart({ + name: `Model call — root iteration ${iteration + 1}`, + input: messages, + model: `${config.model.provider}/${config.model.model}`, + iteration, + }); const response = await llmComplete(messages, config.model, { signal: abortController.signal, cacheConfig, @@ -326,6 +332,11 @@ export async function rlmLoop(query, context, config, options = {}) { geminiConfig: config.gemini, }); const llmDurationMs = Date.now() - llmStartMs; + langfuse.rootGenerationEnd(generationId, { + output: response.text, + durationMs: llmDurationMs, + usage: response.usage, + }); mergeUsage(usage, response.usage); budget.record(response.usage.inputTokens, response.usage.outputTokens, response.usage.totalCost); // Record LLM call to observability @@ -504,7 +515,7 @@ export async function rlmLoop(query, context, config, options = {}) { const reason = budget.isExceeded() ? "budget exceeded" : abortController.signal.aborted ? "timeout" : "max iterations reached"; logVerbose(actualIterations, `${reason}, forcing final answer`); } - const forcedResult = await forceFinalAnswer(messages, config, usage, abortController.signal, cacheConfig); + const forcedResult = await forceFinalAnswer(messages, config, usage, abortController.signal, cacheConfig, langfuse, actualIterations); return finalize(forcedResult, actualIterations); } catch (err) { @@ -523,7 +534,7 @@ export async function rlmLoop(query, context, config, options = {}) { /** * Force the LLM to produce a final answer when max iterations are reached. */ -async function forceFinalAnswer(messages, config, usage, signal, cacheConfig) { +async function forceFinalAnswer(messages, config, usage, signal, cacheConfig, langfuse, iteration = 0) { const forceMessages = [ ...messages, { @@ -531,6 +542,13 @@ async function forceFinalAnswer(messages, config, usage, signal, cacheConfig) { content: "You have reached the maximum number of iterations. Please provide your best final answer NOW based on what you've learned so far. Respond with just the answer, no FINAL() wrapper needed.", }, ]; + const generationId = langfuse?.rootGenerationStart({ + name: "Model call — forced final answer", + input: forceMessages, + model: `${config.model.provider}/${config.model.model}`, + iteration, + }); + const llmStartMs = Date.now(); const response = await llmComplete(forceMessages, config.model, { signal, cacheConfig, @@ -538,6 +556,13 @@ async function forceFinalAnswer(messages, config, usage, signal, cacheConfig) { outputSchema: config.output.schema, geminiConfig: config.gemini, }); + if (generationId) { + langfuse?.rootGenerationEnd(generationId, { + output: response.text, + durationMs: Date.now() - llmStartMs, + usage: response.usage, + }); + } mergeUsage(usage, response.usage); return response.text; } diff --git a/dist/tests/recursive-trace.test.js b/dist/tests/recursive-trace.test.js index ae73de9..30e4e05 100644 --- a/dist/tests/recursive-trace.test.js +++ b/dist/tests/recursive-trace.test.js @@ -97,5 +97,49 @@ describe("recursive RLM tracing helpers", () => { assert.equal(batch.filter((e) => e.type === "span-update").length, 1); assert.equal(batch.find((e) => e.type === "span-update").body.metadata.child_run_id, "child-run"); }); + it("builds root Langfuse generation create/update events with model, IO, usage, and latency", async () => { + const payloads = []; + const recorder = new LangfuseTraceRecorder({ + host: "https://langfuse.example", + publicKey: "pk", + secretKey: "sk", + fetchImpl: async (_url, init) => { + payloads.push(JSON.parse(String(init?.body))); + return new Response("{}", { status: 200 }); + }, + }); + recorder.startTrace({ runId: "root-run", query: "root query", model: "anthropic/claude-opus-4-8" }); + const generationId = recorder.rootGenerationStart({ + name: "Model call — root iteration 1", + input: [{ role: "user", content: "hello" }], + model: "anthropic/claude-opus-4-8", + iteration: 0, + }); + recorder.rootGenerationEnd(generationId, { + output: "world", + durationMs: 123, + usage: { inputTokens: 5, outputTokens: 7, cacheReadTokens: 0, cacheWriteTokens: 2, totalCost: 0.42, llmCalls: 1 }, + }); + await recorder.flush(); + const batch = payloads.flatMap((p) => p.batch); + const generationCreate = batch.find((e) => e.type === "generation-create"); + const generationUpdate = batch.find((e) => e.type === "generation-update"); + assert.ok(generationCreate, "generation-create event missing"); + assert.ok(generationUpdate, "generation-update event missing"); + assert.equal(generationCreate.body.traceId, "root-run"); + assert.equal(generationCreate.body.model, "anthropic/claude-opus-4-8"); + assert.deepEqual(generationCreate.body.input, [{ role: "user", content: "hello" }]); + assert.equal(generationUpdate.body.output, "world"); + assert.equal(generationUpdate.body.usage.input, 5); + assert.equal(generationUpdate.body.usage.output, 7); + assert.equal(generationUpdate.body.usage.total, 14); + assert.equal(generationUpdate.body.usageDetails.input, 5); + assert.equal(generationUpdate.body.usageDetails.output, 7); + assert.equal(generationUpdate.body.usageDetails.cache_read, 0); + assert.equal(generationUpdate.body.usageDetails.cache_write, 2); + assert.equal(generationUpdate.body.costDetails.total, 0.42); + assert.equal(generationUpdate.body.metadata.duration_ms, 123); + assert.equal(generationUpdate.body.metadata.total_cost, 0.42); + }); }); //# sourceMappingURL=recursive-trace.test.js.map \ No newline at end of file diff --git a/src/langfuse.ts b/src/langfuse.ts index 5e5ceb0..6760a06 100644 --- a/src/langfuse.ts +++ b/src/langfuse.ts @@ -23,6 +23,21 @@ interface LangfuseEvent { body: Record; } +export interface RootGenerationStartData { + name: string; + input: unknown; + model: string; + iteration: number; +} + +export interface RootGenerationEndData { + output: unknown; + durationMs: number; + usage?: UsageStats; + isError?: boolean; + errorMessage?: string; +} + export class LangfuseTraceRecorder { private host: string | null; private publicKey: string | null; @@ -62,6 +77,64 @@ export class LangfuseTraceRecorder { }); } + rootGenerationStart(data: RootGenerationStartData): string { + const generationId = randomUUID(); + if (!this.enabled) return generationId; + this.enqueue("generation-create", { + id: generationId, + traceId: this.traceId, + name: data.name, + model: data.model, + input: data.input, + startTime: new Date().toISOString(), + metadata: { + event: "root_generation_start", + iteration: data.iteration, + }, + }); + return generationId; + } + + rootGenerationEnd(generationId: string, data: RootGenerationEndData): void { + if (!this.enabled) return; + const input = data.usage?.inputTokens ?? 0; + const output = data.usage?.outputTokens ?? 0; + const cacheRead = data.usage?.cacheReadTokens ?? 0; + const cacheWrite = data.usage?.cacheWriteTokens ?? 0; + this.enqueue("generation-update", { + id: generationId, + output: data.output, + endTime: new Date().toISOString(), + level: data.isError ? "ERROR" : "DEFAULT", + statusMessage: data.errorMessage, + usage: { + input, + output, + total: input + output + cacheRead + cacheWrite, + }, + usageDetails: { + input, + output, + cache_read: cacheRead, + cache_write: cacheWrite, + total: input + output + cacheRead + cacheWrite, + }, + costDetails: { + total: data.usage?.totalCost ?? 0, + }, + metadata: { + event: "root_generation_end", + duration_ms: data.durationMs, + input_tokens: input, + output_tokens: output, + cache_read_tokens: cacheRead, + cache_write_tokens: cacheWrite, + total_cost: data.usage?.totalCost ?? 0, + llm_calls: data.usage?.llmCalls ?? 0, + }, + }); + } + childStart(data: { parentRunId: string; childRunId?: string; diff --git a/src/rlm.ts b/src/rlm.ts index b00ad90..47fa939 100644 --- a/src/rlm.ts +++ b/src/rlm.ts @@ -426,6 +426,12 @@ export async function rlmLoop( // Call LLM const llmStartMs = Date.now(); + const generationId = langfuse.rootGenerationStart({ + name: `Model call — root iteration ${iteration + 1}`, + input: messages, + model: `${config.model.provider}/${config.model.model}`, + iteration, + }); const response = await llmComplete(messages, config.model, { signal: abortController.signal, cacheConfig, @@ -434,6 +440,11 @@ export async function rlmLoop( geminiConfig: config.gemini, }); const llmDurationMs = Date.now() - llmStartMs; + langfuse.rootGenerationEnd(generationId, { + output: response.text, + durationMs: llmDurationMs, + usage: response.usage, + }); mergeUsage(usage, response.usage); budget.record(response.usage.inputTokens, response.usage.outputTokens, response.usage.totalCost); @@ -664,7 +675,7 @@ export async function rlmLoop( logVerbose(actualIterations, `${reason}, forcing final answer`); } - const forcedResult = await forceFinalAnswer(messages, config, usage, abortController.signal, cacheConfig); + const forcedResult = await forceFinalAnswer(messages, config, usage, abortController.signal, cacheConfig, langfuse, actualIterations); return finalize(forcedResult, actualIterations); } catch (err: unknown) { clearTimeout(timeoutHandle); @@ -697,7 +708,9 @@ async function forceFinalAnswer( config: RlmxConfig, usage: UsageStats, signal?: AbortSignal, - cacheConfig?: CacheLLMConfig + cacheConfig?: CacheLLMConfig, + langfuse?: LangfuseTraceRecorder, + iteration = 0 ): Promise { const forceMessages: ChatMessage[] = [ ...messages, @@ -708,6 +721,13 @@ async function forceFinalAnswer( }, ]; + const generationId = langfuse?.rootGenerationStart({ + name: "Model call — forced final answer", + input: forceMessages, + model: `${config.model.provider}/${config.model.model}`, + iteration, + }); + const llmStartMs = Date.now(); const response = await llmComplete(forceMessages, config.model, { signal, cacheConfig, @@ -715,6 +735,13 @@ async function forceFinalAnswer( outputSchema: config.output.schema, geminiConfig: config.gemini, }); + if (generationId) { + langfuse?.rootGenerationEnd(generationId, { + output: response.text, + durationMs: Date.now() - llmStartMs, + usage: response.usage, + }); + } mergeUsage(usage, response.usage); return response.text; } diff --git a/tests/recursive-trace.test.ts b/tests/recursive-trace.test.ts index d0642dd..92414c9 100644 --- a/tests/recursive-trace.test.ts +++ b/tests/recursive-trace.test.ts @@ -109,4 +109,51 @@ describe("recursive RLM tracing helpers", () => { assert.equal(batch.filter((e) => (e as { type: string }).type === "span-update").length, 1); assert.equal((batch.find((e) => (e as { type: string }).type === "span-update") as { body: { metadata: { child_run_id: string } } }).body.metadata.child_run_id, "child-run"); }); + + it("builds root Langfuse generation create/update events with model, IO, usage, and latency", async () => { + const payloads: unknown[] = []; + const recorder = new LangfuseTraceRecorder({ + host: "https://langfuse.example", + publicKey: "pk", + secretKey: "sk", + fetchImpl: async (_url: string | URL | Request, init?: RequestInit) => { + payloads.push(JSON.parse(String(init?.body))); + return new Response("{}", { status: 200 }); + }, + }); + + recorder.startTrace({ runId: "root-run", query: "root query", model: "anthropic/claude-opus-4-8" }); + const generationId = recorder.rootGenerationStart({ + name: "Model call — root iteration 1", + input: [{ role: "user", content: "hello" }], + model: "anthropic/claude-opus-4-8", + iteration: 0, + }); + recorder.rootGenerationEnd(generationId, { + output: "world", + durationMs: 123, + usage: { inputTokens: 5, outputTokens: 7, cacheReadTokens: 0, cacheWriteTokens: 2, totalCost: 0.42, llmCalls: 1 }, + }); + await recorder.flush(); + + const batch = payloads.flatMap((p) => (p as { batch: unknown[] }).batch) as Array<{ type: string; body: Record }>; + const generationCreate = batch.find((e) => e.type === "generation-create"); + const generationUpdate = batch.find((e) => e.type === "generation-update"); + assert.ok(generationCreate, "generation-create event missing"); + assert.ok(generationUpdate, "generation-update event missing"); + assert.equal(generationCreate.body.traceId, "root-run"); + assert.equal(generationCreate.body.model, "anthropic/claude-opus-4-8"); + assert.deepEqual(generationCreate.body.input, [{ role: "user", content: "hello" }]); + assert.equal(generationUpdate.body.output, "world"); + assert.equal(generationUpdate.body.usage.input, 5); + assert.equal(generationUpdate.body.usage.output, 7); + assert.equal(generationUpdate.body.usage.total, 14); + assert.equal(generationUpdate.body.usageDetails.input, 5); + assert.equal(generationUpdate.body.usageDetails.output, 7); + assert.equal(generationUpdate.body.usageDetails.cache_read, 0); + assert.equal(generationUpdate.body.usageDetails.cache_write, 2); + assert.equal(generationUpdate.body.costDetails.total, 0.42); + assert.equal(generationUpdate.body.metadata.duration_ms, 123); + assert.equal(generationUpdate.body.metadata.total_cost, 0.42); + }); });