diff --git a/genkit-tools/common/src/eval/evaluate.ts b/genkit-tools/common/src/eval/evaluate.ts index d7b391f91d..596f33cfc9 100644 --- a/genkit-tools/common/src/eval/evaluate.ts +++ b/genkit-tools/common/src/eval/evaluate.ts @@ -491,6 +491,8 @@ async function gatherEvalInput(params: { // Only the last collected trace to be used for evaluation. const traceId = traceIds.at(-1)!; + // Sleep to let traces persist. + await new Promise((resolve) => setTimeout(resolve, 2000)); const trace = await manager.getTrace({ traceId, }); diff --git a/genkit-tools/telemetry-server/src/index.ts b/genkit-tools/telemetry-server/src/index.ts index 26fba05238..7f3fe61cae 100644 --- a/genkit-tools/telemetry-server/src/index.ts +++ b/genkit-tools/telemetry-server/src/index.ts @@ -91,6 +91,41 @@ export async function startTelemetryServer(params: { } }); + api.post( + '/api/otlp/:parentTraceId/:parentSpanId', + async (request, response) => { + try { + const { parentTraceId, parentSpanId } = request.params; + + if (!request.body.resourceSpans?.length) { + // Acknowledge and ignore empty payloads. + response.status(200).json({}); + return; + } + const traces = traceDataFromOtlp(request.body); + for (const traceData of traces) { + traceData.traceId = parentTraceId; + for (const span of Object.values(traceData.spans)) { + span.attributes['genkit:otlp-traceId'] = span.traceId; + span.traceId = parentTraceId; + if (!span.parentSpanId) { + span.parentSpanId = parentSpanId; + } + } + await params.traceStore.save(parentTraceId, traceData); + } + response.status(200).json({}); + } catch (err) { + logger.error(`Error processing OTLP payload: ${err}`); + response.status(500).json({ + code: 13, // INTERNAL + message: + 'An internal error occurred while processing the OTLP payload.', + }); + } + } + ); + api.post('/api/otlp', async (request, response) => { try { if (!request.body.resourceSpans?.length) { @@ -99,8 +134,7 @@ export async function startTelemetryServer(params: { return; } const traces = traceDataFromOtlp(request.body); - for (const trace of traces) { - const traceData = TraceDataSchema.parse(trace); + for (const traceData of traces) { await params.traceStore.save(traceData.traceId, traceData); } response.status(200).json({}); diff --git a/genkit-tools/telemetry-server/tests/file_store_test.ts b/genkit-tools/telemetry-server/tests/file_store_test.ts index d6e47f5997..588be2b6c0 100644 --- a/genkit-tools/telemetry-server/tests/file_store_test.ts +++ b/genkit-tools/telemetry-server/tests/file_store_test.ts @@ -537,3 +537,397 @@ describe('index', () => { assert.strictEqual(result4.pageLastIndex, undefined); }); }); + +describe('otlp-endpoint', () => { + let port: number; + let storeRoot: string; + let indexRoot: string; + let url: string; + + beforeEach(async () => { + port = await getPort(); + url = `http://localhost:${port}`; + storeRoot = path.resolve( + os.tmpdir(), + `./telemetry-server-api-test-${Date.now()}/traces` + ); + indexRoot = path.resolve( + os.tmpdir(), + `./telemetry-server-api-test-${Date.now()}/traces_idx` + ); + + await startTelemetryServer({ + port, + traceStore: new LocalFileTraceStore({ + storeRoot, + indexRoot, + }), + }); + }); + + afterEach(async () => { + await stopTelemetryApi(); + }); + + it('saves a single trace', async () => { + const traceId = 'childTraceId'; + const otlpPayload = { + resourceSpans: [ + { + resource: { + attributes: [ + { key: 'service.name', value: { stringValue: 'test' } }, + ], + }, + scopeSpans: [ + { + scope: { name: 'test-scope' }, + spans: [ + { + traceId, + spanId: 'childSpanId1', + name: 'span1', + startTimeUnixNano: '1000000', + endTimeUnixNano: '2000000', + kind: 1, + attributes: [], + }, + ], + }, + ], + }, + ], + }; + + const res = await fetch(`${url}/api/otlp`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(otlpPayload), + }); + assert.strictEqual(res.status, 200); + + const getResp = await fetch(`${url}/api/traces/${traceId}`); + assert.strictEqual(getResp.status, 200); + const trace = await getResp.json(); + assert.strictEqual(trace.traceId, traceId); + assert.strictEqual(Object.keys(trace.spans).length, 1); + const span = Object.values(trace.spans)[0] as any; + assert.strictEqual(span.traceId, traceId); + assert.strictEqual(span.spanId, 'childSpanId1'); + }); + + it('saves a trace with multiple spans', async () => { + const traceId = 'childTraceId'; + const otlpPayload = { + resourceSpans: [ + { + resource: { + attributes: [ + { key: 'service.name', value: { stringValue: 'test' } }, + ], + }, + scopeSpans: [ + { + scope: { name: 'test-scope' }, + spans: [ + { + traceId, + spanId: 'childSpanId1', + name: 'span1', + startTimeUnixNano: '1000000', + endTimeUnixNano: '2000000', + kind: 1, + attributes: [], + }, + { + traceId, + spanId: 'childSpanId2', + name: 'span2', + startTimeUnixNano: '3000000', + endTimeUnixNano: '4000000', + kind: 1, + attributes: [], + }, + ], + }, + ], + }, + ], + }; + + const res = await fetch(`${url}/api/otlp`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(otlpPayload), + }); + assert.strictEqual(res.status, 200); + + const getResp = await fetch(`${url}/api/traces/${traceId}`); + assert.strictEqual(getResp.status, 200); + const trace = await getResp.json(); + assert.strictEqual(trace.traceId, traceId); + assert.strictEqual(Object.keys(trace.spans).length, 2); + const span1 = trace.spans['childSpanId1']; + assert.strictEqual(span1.traceId, traceId); + const span2 = trace.spans['childSpanId2']; + assert.strictEqual(span2.traceId, traceId); + }); + + it('handles errors', async () => { + const res = await fetch(`${url}/api/otlp`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: 'invalid json', + }); + assert.strictEqual(res.status, 500); + }); +}); + +describe('otlp-endpoint (with parent)', () => { + let port: number; + let storeRoot: string; + let indexRoot: string; + let url: string; + + beforeEach(async () => { + port = await getPort(); + url = `http://localhost:${port}`; + storeRoot = path.resolve( + os.tmpdir(), + `./telemetry-server-api-test-${Date.now()}/traces` + ); + indexRoot = path.resolve( + os.tmpdir(), + `./telemetry-server-api-test-${Date.now()}/traces_idx` + ); + + await startTelemetryServer({ + port, + traceStore: new LocalFileTraceStore({ + storeRoot, + indexRoot, + }), + }); + }); + + afterEach(async () => { + await stopTelemetryApi(); + }); + + it('saves a single trace', async () => { + const parentTraceId = 'parentTraceId'; + const parentSpanId = 'parentSpanId'; + const otlpPayload = { + resourceSpans: [ + { + resource: { + attributes: [ + { key: 'service.name', value: { stringValue: 'test' } }, + ], + }, + scopeSpans: [ + { + scope: { name: 'test-scope' }, + spans: [ + { + traceId: 'childTraceId', + spanId: 'childSpanId1', + name: 'span1', + startTimeUnixNano: '1000000', + endTimeUnixNano: '2000000', + kind: 1, + attributes: [], + }, + ], + }, + ], + }, + ], + }; + + const res = await fetch( + `${url}/api/otlp/${parentTraceId}/${parentSpanId}`, + { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(otlpPayload), + } + ); + assert.strictEqual(res.status, 200); + + const getResp = await fetch(`${url}/api/traces/${parentTraceId}`); + assert.strictEqual(getResp.status, 200); + const trace = await getResp.json(); + assert.strictEqual(trace.traceId, parentTraceId); + assert.strictEqual(Object.keys(trace.spans).length, 1); + const span = Object.values(trace.spans)[0] as any; + assert.strictEqual(span.traceId, parentTraceId); + assert.strictEqual(span.parentSpanId, parentSpanId); + assert.strictEqual(span.spanId, 'childSpanId1'); + assert.strictEqual(span.attributes['genkit:otlp-traceId'], 'childTraceId'); + }); + + it('saves a trace with multiple spans', async () => { + const parentTraceId = 'parentTraceId'; + const parentSpanId = 'parentSpanId'; + const otlpPayload = { + resourceSpans: [ + { + resource: { + attributes: [ + { key: 'service.name', value: { stringValue: 'test' } }, + ], + }, + scopeSpans: [ + { + scope: { name: 'test-scope' }, + spans: [ + { + traceId: 'childTraceId', // this will be overwritten + spanId: 'childSpanId1', + name: 'span1', + startTimeUnixNano: '1000000', + endTimeUnixNano: '2000000', + kind: 1, + attributes: [], + }, + { + traceId: 'childTraceId', // this will be overwritten + spanId: 'childSpanId2', + name: 'span2', + startTimeUnixNano: '3000000', + endTimeUnixNano: '4000000', + kind: 1, + attributes: [], + }, + ], + }, + ], + }, + ], + }; + + const res = await fetch( + `${url}/api/otlp/${parentTraceId}/${parentSpanId}`, + { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(otlpPayload), + } + ); + assert.strictEqual(res.status, 200); + + const getResp = await fetch(`${url}/api/traces/${parentTraceId}`); + assert.strictEqual(getResp.status, 200); + const trace = await getResp.json(); + assert.strictEqual(trace.traceId, parentTraceId); + assert.strictEqual(Object.keys(trace.spans).length, 2); + const span1 = trace.spans['childSpanId1']; + assert.strictEqual(span1.traceId, parentTraceId); + assert.strictEqual(span1.parentSpanId, parentSpanId); + const span2 = trace.spans['childSpanId2']; + assert.strictEqual(span2.traceId, parentTraceId); + assert.strictEqual(span2.parentSpanId, parentSpanId); + }); + + it('saves multiple batches of traces', async () => { + const parentTraceId = 'parentTraceId'; + const parentSpanId = 'parentSpanId'; + const otlpPayload1 = { + resourceSpans: [ + { + resource: { + attributes: [ + { key: 'service.name', value: { stringValue: 'test' } }, + ], + }, + scopeSpans: [ + { + scope: { name: 'test-scope' }, + spans: [ + { + traceId: 'childTraceId', + spanId: 'childSpanId1', + name: 'span1', + startTimeUnixNano: '1000000', + endTimeUnixNano: '2000000', + kind: 1, + attributes: [], + }, + ], + }, + ], + }, + ], + }; + const otlpPayload2 = { + resourceSpans: [ + { + resource: { + attributes: [ + { key: 'service.name', value: { stringValue: 'test' } }, + ], + }, + scopeSpans: [ + { + scope: { name: 'test-scope' }, + spans: [ + { + traceId: 'childTraceId', + spanId: 'childSpanId2', + name: 'span2', + startTimeUnixNano: '3000000', + endTimeUnixNano: '4000000', + kind: 1, + attributes: [], + }, + ], + }, + ], + }, + ], + }; + + const res1 = await fetch( + `${url}/api/otlp/${parentTraceId}/${parentSpanId}`, + { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(otlpPayload1), + } + ); + assert.strictEqual(res1.status, 200); + + const res2 = await fetch( + `${url}/api/otlp/${parentTraceId}/${parentSpanId}`, + { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(otlpPayload2), + } + ); + assert.strictEqual(res2.status, 200); + + const getResp = await fetch(`${url}/api/traces/${parentTraceId}`); + assert.strictEqual(getResp.status, 200); + const trace = await getResp.json(); + assert.strictEqual(trace.traceId, parentTraceId); + assert.strictEqual(Object.keys(trace.spans).length, 2); + assert.ok(trace.spans['childSpanId1']); + assert.ok(trace.spans['childSpanId2']); + }); + + it('handles errors', async () => { + const parentTraceId = 'parentTraceId'; + const parentSpanId = 'parentSpanId'; + const res = await fetch( + `${url}/api/otlp/${parentTraceId}/${parentSpanId}`, + { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: 'invalid json', + } + ); + assert.strictEqual(res.status, 500); + }); +});