describe('human lifecycle observability helpers', () => {
  // Privacy contract for lifecycle span attributes: routing/correlation fields
  // are copied through, message text is exposed only as length + digest +
  // redacted preview, and sensitive `extra` keys never reach the attribute map.
  it('builds redacted-readable lifecycle attributes without raw PII or secrets', () => {
    const spanAttrs = __test__.buildLifecycleSpanAttributes({
      stage: 'provider_inbound',
      eventType: 'user_message_turn',
      channel: 'whatsapp-baileys',
      provider: 'gupshup',
      instanceId: 'inst-1',
      chatId: '5511999887766@s.whatsapp.net',
      sessionId: 'p0r-hml-20260531T204449Z',
      traceId: 'trc-test-123',
      messageId: 'wamid.123',
      agentId: 'eugenia-seller',
      inputText: 'Olá, meu telefone é 5511999887766 e email felipe@example.com. Quero cotar plano.',
      outputText: 'Claro, posso ajudar com a cotação.',
      extra: {
        authorization: 'Bearer super-secret-token',
        token: 'abc123',
      },
    });

    // Fields that must be passed through verbatim.
    const passthrough: Array<[string, string]> = [
      ['khal.lifecycle.stage', 'provider_inbound'],
      ['khal.event_type', 'user_message_turn'],
      ['khal.channel', 'whatsapp-baileys'],
      ['khal.provider', 'gupshup'],
      ['langfuse.session.id', 'p0r-hml-20260531T204449Z'],
      ['session.id', 'p0r-hml-20260531T204449Z'],
    ];
    for (const [attributeKey, expectedValue] of passthrough) {
      expect(spanAttrs[attributeKey]).toBe(expectedValue);
    }

    // Text is summarised: a length, a prefixed digest, and a redacted preview.
    expect(spanAttrs['khal.input_chars']).toBeGreaterThan(0);
    for (const digestKey of ['khal.input_sha256', 'khal.output_sha256']) {
      expect(String(spanAttrs[digestKey])).toStartWith('sha256:');
    }
    const inputPreview = String(spanAttrs['khal.input_preview_redacted']);
    for (const placeholder of ['[PHONE]', '[EMAIL]']) {
      expect(inputPreview).toContain(placeholder);
    }

    // None of the raw identifiers or secrets may appear anywhere in the map.
    const serialized = JSON.stringify(spanAttrs);
    for (const rawValue of ['felipe@example.com', '5511999887766', 'super-secret-token', 'abc123']) {
      expect(serialized).not.toContain(rawValue);
    }
  });
});
// ============================================================================
// Human lifecycle observability helpers
// ============================================================================
//
// These helpers rely on file-level bindings: `createHash` (node:crypto),
// `SpanStatusCode` and `trace` (@opentelemetry/api), and the module logger `log`.

/** Upper bound, in UTF-16 code units and including the ellipsis, of a redacted preview. */
const LIFECYCLE_PREVIEW_MAX_CHARS = 160;

/** Fragments that mark an `extra` key as sensitive; matching entries are dropped entirely. */
const LIFECYCLE_SENSITIVE_KEY_PARTS = ['authorization', 'bearer', 'password', 'secret', 'token', 'api_key', 'apikey'];

/** Value types this module writes into span attribute maps. */
type LifecycleAttributeValue = string | number | boolean;

/** Input accepted by {@link buildLifecycleSpanAttributes}. */
interface LifecycleSpanAttributeInput {
  stage: 'provider_inbound' | 'dispatch_to_agno' | 'provider_outbound' | 'turn';
  eventType: string;
  channel: string;
  provider?: string;
  instanceId?: string;
  chatId?: string;
  sessionId?: string;
  traceId?: string;
  messageId?: string;
  agentId?: string;
  inputText?: string;
  outputText?: string;
  extra?: Record<string, unknown>;
}

/**
 * Returns the hex SHA-256 digest of `value`, prefixed with `sha256:`.
 *
 * NOTE(review): the digest is unsalted, so low-entropy inputs (for example a
 * phone-number based chat id) could be recovered by enumeration — confirm this
 * is acceptable for wherever these attributes are exported.
 */
function sha256Digest(value: string): string {
  return `sha256:${createHash('sha256').update(value).digest('hex')}`;
}

/**
 * Replaces identifiers in free text with placeholders.
 *
 * Rule order matters:
 * 1. `[JID]` runs first. `user@s.whatsapp.net` / `group@g.us` also satisfy the
 *    e-mail pattern, and the digits of `123…@lid` satisfy the phone pattern, so
 *    running this rule later would either never fire or produce `[[JID]`.
 *    The trailing lookahead keeps real addresses such as `bob@lid.example.com`
 *    out of this rule so they fall through to `[EMAIL]`.
 * 2. `[EMAIL]`.
 * 3. `[PHONE]` — `(?:\+|\b)` lets a leading `+` be consumed; `\b\+?` cannot,
 *    because there is no word boundary between whitespace and `+`.
 * 4. `[NUMBER]` for remaining runs of six or more digits.
 */
function redactLifecycleText(value: string): string {
  return value
    .replace(/\b[^\s@]+@(?:s\.whatsapp\.net|g\.us|lid|newsletter)(?![\w-]|\.\w)/gi, '[JID]')
    .replace(/[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}/gi, '[EMAIL]')
    .replace(/(?:\+|\b)\d[\d\s().-]{5,}\d\b/g, '[PHONE]')
    .replace(/\b\d{6,}\b/g, '[NUMBER]');
}

/**
 * Builds a single-line, redacted preview of `value` no longer than
 * {@link LIFECYCLE_PREVIEW_MAX_CHARS}; longer text is cut and suffixed with `…`.
 */
function previewLifecycleText(value: string): string {
  const redacted = redactLifecycleText(value).replace(/\s+/g, ' ').trim();
  if (redacted.length <= LIFECYCLE_PREVIEW_MAX_CHARS) return redacted;

  let head = redacted.slice(0, LIFECYCLE_PREVIEW_MAX_CHARS - 1);
  // Never end on a dangling high surrogate: cutting an astral character (e.g.
  // an emoji) in half would leave an ill-formed string in the attribute.
  const lastUnit = head.charCodeAt(head.length - 1);
  if (lastUnit >= 0xd800 && lastUnit <= 0xdbff) head = head.slice(0, -1);
  return `${head}…`;
}

/**
 * Returns `false` when `key` contains any sensitive fragment.
 *
 * Both sides are compared case-insensitively with separators removed, so
 * `x-api-key`, `API_KEY` and `apiKey` are all treated the same way.
 */
function isLifecycleSafeExtraKey(key: string): boolean {
  const squash = (text: string): string => text.toLowerCase().replace(/[^a-z0-9]/g, '');
  const normalizedKey = squash(key);
  return !LIFECYCLE_SENSITIVE_KEY_PARTS.some((part) => normalizedKey.includes(squash(part)));
}

/**
 * Records length, digest and redacted preview for a message text under
 * `khal.<prefix>_chars|_sha256|_preview_redacted`. `undefined` writes nothing;
 * the raw text itself is never stored.
 */
function setTextLifecycleAttributes(
  attributes: Record<string, LifecycleAttributeValue>,
  prefix: 'input' | 'output',
  value: string | undefined,
): void {
  if (value === undefined) return;
  attributes[`khal.${prefix}_chars`] = value.length;
  attributes[`khal.${prefix}_sha256`] = sha256Digest(value);
  attributes[`khal.${prefix}_preview_redacted`] = previewLifecycleText(value);
}

/** Copies each `[key, value]` pair whose value is a non-empty string. */
function setOptionalLifecycleAttributes(
  attributes: Record<string, LifecycleAttributeValue>,
  pairs: Array<[string, string | undefined]>,
): void {
  for (const [key, value] of pairs) {
    if (value) attributes[key] = value;
  }
}

/** Records the chat id only as a digest plus redacted preview. */
function setChatLifecycleAttributes(
  attributes: Record<string, LifecycleAttributeValue>,
  chatId: string | undefined,
): void {
  if (!chatId) return;
  attributes['omni.chat_id_sha256'] = sha256Digest(chatId);
  attributes['omni.chat_id_preview_redacted'] = previewLifecycleText(chatId);
}

/** Writes the session id under the `khal.`, `langfuse.` and plain `session.id` keys. */
function setSessionLifecycleAttributes(
  attributes: Record<string, LifecycleAttributeValue>,
  sessionId: string | undefined,
): void {
  if (!sessionId) return;
  attributes['khal.session_id'] = sessionId;
  attributes['langfuse.session.id'] = sessionId;
  attributes['session.id'] = sessionId;
}

/**
 * Copies caller-supplied extras as `khal.<key>`.
 *
 * Skipped: sensitive keys, values that are not string/number/boolean, and keys
 * whose `khal.<key>` attribute already exists — extras must not be able to
 * overwrite core attributes such as `khal.channel`. String values go through
 * {@link previewLifecycleText}.
 */
function setExtraLifecycleAttributes(
  attributes: Record<string, LifecycleAttributeValue>,
  extra: Record<string, unknown> | undefined,
): void {
  if (!extra) return;
  for (const [key, value] of Object.entries(extra)) {
    if (!isLifecycleSafeExtraKey(key)) continue;
    const attributeKey = `khal.${key}`;
    if (Object.prototype.hasOwnProperty.call(attributes, attributeKey)) continue;
    if (typeof value === 'string') {
      attributes[attributeKey] = previewLifecycleText(value);
    } else if (typeof value === 'number' || typeof value === 'boolean') {
      attributes[attributeKey] = value;
    }
  }
}

/**
 * Builds the attribute map attached to lifecycle spans.
 *
 * @param input - Stage, routing identifiers, optional message texts and extras.
 * @returns A flat map of string/number/boolean attributes. Message texts and
 *   the chat id appear only as digests and redacted previews.
 */
function buildLifecycleSpanAttributes(
  input: LifecycleSpanAttributeInput,
): Record<string, LifecycleAttributeValue> {
  const attributes: Record<string, LifecycleAttributeValue> = {
    'khal.lifecycle.stage': input.stage,
    'khal.event_type': input.eventType,
    'khal.channel': input.channel,
  };

  setOptionalLifecycleAttributes(attributes, [
    ['khal.provider', input.provider],
    ['omni.instance_id', input.instanceId],
    ['khal.trace_id', input.traceId],
    ['khal.turn.message_id', input.messageId],
    ['khal.agent_id', input.agentId],
  ]);
  setChatLifecycleAttributes(attributes, input.chatId);
  setSessionLifecycleAttributes(attributes, input.sessionId);
  setTextLifecycleAttributes(attributes, 'input', input.inputText);
  setTextLifecycleAttributes(attributes, 'output', input.outputText);
  // Extras go last so the collision guard sees every core attribute.
  setExtraLifecycleAttributes(attributes, input.extra);

  return attributes;
}

/**
 * Runs `fn` inside an active span named `name`.
 *
 * The span is marked OK on success, or records the exception and is marked
 * ERROR on failure; errors thrown by `fn` are always rethrown. If the tracing
 * call itself fails before `fn` has been started, a warning is logged and `fn`
 * is run once without a span.
 *
 * @returns Whatever `fn` resolves to.
 */
async function withLifecycleSpan<T>(
  name: string,
  attributes: Record<string, LifecycleAttributeValue>,
  fn: () => Promise<T>,
): Promise<T> {
  // Distinguishes "tracing failed before fn ran" (safe to retry without a span)
  // from "fn ran and threw" (must not be invoked a second time).
  let callbackStarted = false;
  try {
    const tracer = trace.getTracer('omni.agent-dispatcher');
    return await tracer.startActiveSpan(name, { attributes }, async (span) => {
      callbackStarted = true;
      try {
        const result = await fn();
        span.setStatus({ code: SpanStatusCode.OK });
        return result;
      } catch (error) {
        span.recordException(error instanceof Error ? error : new Error(String(error)));
        span.setStatus({
          code: SpanStatusCode.ERROR,
          message: error instanceof Error ? error.message : String(error),
        });
        throw error;
      } finally {
        span.end();
      }
    });
  } catch (error) {
    if (callbackStarted) throw error;
    log.warn('Lifecycle span wrapper failed before dispatch; continuing without span', {
      spanName: name,
      error: error instanceof Error ? error.message : String(error),
    });
    return fn();
  }
}

/** Emits a span that wraps no work; failures are deliberately ignored. */
function emitLifecycleSpan(name: string, attributes: Record<string, LifecycleAttributeValue>): void {
  withLifecycleSpan(name, attributes, async () => undefined).catch(() => {
    // best-effort — never throw from instrumentation path
  });
}
undefined, + inputText: messageTexts.join('\n'), + }; + + emitLifecycleSpan( + 'omni.provider_inbound', + buildLifecycleSpanAttributes({ + ...lifecycleBase, + stage: 'provider_inbound', + extra: { trigger_type: triggerType, message_count: messages.length }, + }), + ); + // ── Turn-based mode: delegate to extracted helper ── if (provider.mode === 'turn-based') { return dispatchViaTurnBasedProvider(services, instance, provider, trigger, messages, chatId, traceId, db); @@ -2075,7 +2270,15 @@ async function dispatchViaProvider( // ── Standard (round-trip / fire-and-forget) dispatch ── const correlationId = messages[0]?.metadata.correlationId; const dispatchStart = Date.now(); - const result = await provider.trigger(trigger); + const result = await withLifecycleSpan( + 'omni.dispatch_to_agno', + buildLifecycleSpanAttributes({ + ...lifecycleBase, + stage: 'dispatch_to_agno', + extra: { trigger_type: triggerType, provider_id: provider.id, provider_schema: provider.schema }, + }), + () => provider.trigger(trigger), + ); const dispatchDurationMs = Date.now() - dispatchStart; // Sentry metrics: agent dispatch count and latency @@ -2103,16 +2306,26 @@ async function dispatchViaProvider( // T8: Processing send request recordJourneyCheckpoint(correlationId, 'T8', JOURNEY_STAGES.T8); - await sendResponseParts( - channel, - instance.id, - chatId, - parts, - getSplitDelayConfig(instance), - _fmtMode, - replyTo, - correlationId, - senderAgentId, + await withLifecycleSpan( + 'omni.provider_outbound', + buildLifecycleSpanAttributes({ + ...lifecycleBase, + stage: 'provider_outbound', + outputText: parts.join('\n'), + extra: { parts_count: parts.length, provider_id: provider.id, provider_schema: provider.schema }, + }), + () => + sendResponseParts( + channel, + instance.id, + chatId, + parts, + getSplitDelayConfig(instance), + _fmtMode, + replyTo, + correlationId, + senderAgentId, + ), ); // T9: Outbound sent via plugin @@ -2200,20 +2413,57 @@ async function dispatchViaLegacy( 
mediaFiles.length > 0 ? (mediaFiles as unknown as ProviderFile[]) : undefined, ); - const result = await services.agentRunner.run({ - instance, - chatId, - personId, + const lifecycleSessionId = computeSessionId( + instance.agentSessionStrategy ?? 'per_chat', senderId, - senderName, - senderAvatarUrl, - senderPlatformUsername, - chatType, - chatName, - participantCount, - messages: messageTexts, - files: mediaFiles.length > 0 ? mediaFiles : undefined, - }); + chatId, + (rawPl as Record).threadId as string | undefined, + ); + const lifecycleBase = { + eventType: 'user_message_turn', + channel, + provider: 'legacy-agent-runner', + instanceId: instance.id, + chatId, + sessionId: lifecycleSessionId, + traceId, + messageId: messages[0]?.payload.externalId, + agentId: instance.agentInternalId ?? instance.agentId ?? undefined, + inputText: messageTexts.join('\n'), + }; + + emitLifecycleSpan( + 'omni.provider_inbound', + buildLifecycleSpanAttributes({ + ...lifecycleBase, + stage: 'provider_inbound', + extra: { trigger_type: triggerType, message_count: messages.length }, + }), + ); + + const result = await withLifecycleSpan( + 'omni.dispatch_to_agno', + buildLifecycleSpanAttributes({ + ...lifecycleBase, + stage: 'dispatch_to_agno', + extra: { trigger_type: triggerType, provider_schema: 'legacy-agent-runner' }, + }), + () => + services.agentRunner.run({ + instance, + chatId, + personId, + senderId, + senderName, + senderAvatarUrl, + senderPlatformUsername, + chatType, + chatName, + participantCount, + messages: messageTexts, + files: mediaFiles.length > 0 ? 
mediaFiles : undefined, + }), + ); const correlationId = messages[0]?.metadata.correlationId; const selfChat = isSelfChat(chatId, instance.ownerIdentifier); @@ -2228,16 +2478,26 @@ async function dispatchViaLegacy( // T8: Processing send request recordJourneyCheckpoint(correlationId, 'T8', JOURNEY_STAGES.T8); - await sendResponseParts( - channel, - instance.id, - chatId, - parts, - getSplitDelayConfig(instance), - _fmtMode, - replyTo, - correlationId, - senderAgentId, + await withLifecycleSpan( + 'omni.provider_outbound', + buildLifecycleSpanAttributes({ + ...lifecycleBase, + stage: 'provider_outbound', + outputText: parts.join('\n'), + extra: { parts_count: parts.length, provider_schema: 'legacy-agent-runner' }, + }), + () => + sendResponseParts( + channel, + instance.id, + chatId, + parts, + getSplitDelayConfig(instance), + _fmtMode, + replyTo, + correlationId, + senderAgentId, + ), ); // T9: Outbound sent via plugin @@ -4874,6 +5134,7 @@ export const __test__ = { getDebounceConfig, extractKhalSessionId, buildTriggerHeaders, + buildLifecycleSpanAttributes, createNatsGenieProviderInstance, /** Override the NatsGenieProvider constructor for tests (avoids barrel mock contamination). */ set NatsGenieProviderClass(cls: typeof NatsGenieProvider) { diff --git a/packages/core/src/providers/__tests__/agno-client.test.ts b/packages/core/src/providers/__tests__/agno-client.test.ts index 08f72737..899ed4f1 100644 --- a/packages/core/src/providers/__tests__/agno-client.test.ts +++ b/packages/core/src/providers/__tests__/agno-client.test.ts @@ -364,6 +364,53 @@ describe('AgnoClient', () => { expect(chunks[3]).toMatchObject({ event: 'RunCompleted', isComplete: true, fullContent: 'Hello World!' 
it('propagates W3C trace context and Khal session headers for streaming runs', async () => {
  // Minimal SSE body: a single RunCompleted event, then end of stream.
  const ssePayload = 'event: RunCompleted\ndata: {"run_id": "run-1", "content": "done"}\n\n';
  const textEncoder = new TextEncoder();
  const responseBody = new ReadableStream({
    start: (controller) => {
      controller.enqueue(textEncoder.encode(ssePayload));
      controller.close();
    },
  });
  const sseResponse = new Response(responseBody, {
    status: 200,
    headers: { 'Content-Type': 'text/event-stream' },
  });
  mockImpl.mockResolvedValueOnce(sseResponse);

  const agno = new AgnoClient(config);
  const events = agno.stream({
    message: 'Hi!',
    agentId: 'agent-1',
    userId: 'user-456',
    khalSessionId: 'khal-session-stream',
    traceContext: {
      traceId: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
      spanId: 'bbbbbbbbbbbbbbbb',
      traceFlags: 1,
    },
    omni: {
      instanceId: 'inst-1',
      chatId: 'chat-123',
      messageId: 'msg-stream-1',
      channel: 'gupshup',
    },
  });
  for await (const _event of events) {
    // exhaust stream so the request is issued
  }

  // Second argument of the first recorded call is the request init.
  const requestInit = mockImpl.mock.calls[0]?.[1] as RequestInit;
  const sentHeaders = requestInit.headers as Record<string, string>;
  const sentForm = requestInit.body as FormData;

  const expectedHeaders: Array<[string, string]> = [
    ['traceparent', '00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-bbbbbbbbbbbbbbbb-01'],
    ['x-khal-session-id', 'khal-session-stream'],
    ['x-khal-message-id', 'msg-stream-1'],
    ['x-omni-instance-id', 'inst-1'],
    ['x-omni-chat-id', 'chat-123'],
    ['x-omni-channel', 'gupshup'],
  ];
  for (const [headerName, headerValue] of expectedHeaders) {
    expect(sentHeaders[headerName]).toBe(headerValue);
  }
  // The session id is also sent in the multipart form body.
  expect(sentForm.get('session_id')).toBe('khal-session-stream');
});