diff --git a/.claude-plugin/marketplace.json b/.claude-plugin/marketplace.json index 088d97df..27dca0cb 100644 --- a/.claude-plugin/marketplace.json +++ b/.claude-plugin/marketplace.json @@ -9,7 +9,7 @@ { "name": "omni", "description": "Full Omni platform control — multichannel messaging, automations, events, batch ops via the omni CLI", - "version": "2.260531.4", + "version": "2.260602.2", "author": { "name": "Automagik" }, diff --git a/apps/ui/package.json b/apps/ui/package.json index cc582d30..9da5c7f4 100644 --- a/apps/ui/package.json +++ b/apps/ui/package.json @@ -1,6 +1,6 @@ { "name": "@omni/ui", - "version": "2.260531.4", + "version": "2.260602.2", "private": true, "type": "module", "scripts": { diff --git a/bun.lock b/bun.lock index a7b1c70f..9f725d51 100644 --- a/bun.lock +++ b/bun.lock @@ -18,7 +18,7 @@ }, "apps/ui": { "name": "@omni/ui", - "version": "2.260531.4", + "version": "2.260602.2", "dependencies": { "@omni/sdk": "workspace:*", "@radix-ui/react-dialog": "^1.1.15", @@ -51,7 +51,7 @@ }, "packages/api": { "name": "@omni/api", - "version": "2.260531.4", + "version": "2.260602.2", "dependencies": { "@google/genai": "^1.0.0", "@hono/swagger-ui": "^0.4.1", @@ -91,7 +91,7 @@ }, "packages/channel-a2a": { "name": "@omni/channel-a2a", - "version": "2.260531.4", + "version": "2.260602.2", "dependencies": { "@omni/channel-sdk": "workspace:*", "@omni/core": "workspace:*", @@ -107,7 +107,7 @@ }, "packages/channel-discord": { "name": "@omni/channel-discord", - "version": "2.260531.4", + "version": "2.260602.2", "dependencies": { "@omni/channel-sdk": "workspace:*", "@omni/core": "workspace:*", @@ -124,7 +124,7 @@ }, "packages/channel-gupshup": { "name": "@omni/channel-gupshup", - "version": "2.260531.4", + "version": "2.260602.2", "dependencies": { "@omni/channel-sdk": "workspace:*", "@omni/core": "workspace:*", @@ -140,7 +140,7 @@ }, "packages/channel-internal": { "name": "@omni/channel-internal", - "version": "2.260531.4", + "version": "2.260602.2", "dependencies": { "@omni/channel-sdk": "workspace:*", "@omni/core": "workspace:*", @@ -155,7 +155,7 @@ }, "packages/channel-sdk": { "name": "@omni/channel-sdk", - "version": "2.260531.4", + "version": "2.260602.2", "dependencies": { "@omni/core": "workspace:*", }, @@ -166,7 +166,7 @@ }, "packages/channel-slack": { "name": "@omni/channel-slack", - "version": "2.260531.4", + "version": "2.260602.2", "dependencies": { "@omni/channel-sdk": "workspace:*", "@omni/core": "workspace:*", @@ -184,7 +184,7 @@ }, "packages/channel-telegram": { "name": "@omni/channel-telegram", - "version": "2.260531.4", + "version": "2.260602.2", "dependencies": { "@omni/channel-sdk": "workspace:*", "@omni/core": "workspace:*", @@ -200,7 +200,7 @@ }, "packages/channel-twilio-whatsapp": { "name": "@omni/channel-twilio-whatsapp", - "version": "2.260531.4", + "version": "2.260602.2", "dependencies": { "@omni/channel-sdk": "workspace:*", "@omni/core": "workspace:*", @@ -215,7 +215,7 @@ }, "packages/channel-whatsapp": { "name": "@omni/channel-whatsapp", - "version": "2.260531.4", + "version": "2.260602.2", "dependencies": { "@hapi/boom": "^10.0.1", "@omni/channel-sdk": "workspace:*", @@ -236,7 +236,7 @@ }, "packages/cli": { "name": "@automagik/omni", - "version": "2.260531.4", + "version": "2.260602.2", "bin": { "omni": "./bin/omni", }, @@ -272,7 +272,7 @@ }, "packages/core": { "name": "@omni/core", - "version": "2.260531.4", + "version": "2.260602.2", "dependencies": { "@anthropic-ai/claude-agent-sdk": "^0.2.62", "@opentelemetry/api": "^1.9.1", @@ -288,7 +288,7 @@ }, "packages/db": { "name": "@omni/db", - "version": "2.260531.4", + "version": "2.260602.2", "dependencies": { "@omni/core": "workspace:*", "drizzle-orm": "^0.38.4", @@ -302,7 +302,7 @@ }, "packages/media-processing": { "name": "@omni/media-processing", - "version": "2.260531.4", + "version": "2.260602.2", "dependencies": { "@google/generative-ai": "^0.21.0", "@omni/core": "workspace:*", @@ -321,7 +321,7 @@ }, "packages/plugin-openclaw": { "name": "@omni/plugin-openclaw", - "version": "2.260531.4", + "version": "2.260602.2", "devDependencies": { "@types/bun": "latest", "typescript": "^5.7.3", @@ -329,7 +329,7 @@ }, "packages/sdk": { "name": "@omni/sdk", - "version": "2.260531.4", + "version": "2.260602.2", "dependencies": { "openapi-fetch": "^0.13.6", }, @@ -343,7 +343,7 @@ }, "packages/voice-client": { "name": "@omni/voice-client", - "version": "2.260531.4", + "version": "2.260602.2", "dependencies": { "@snazzah/davey": "^0.1.11", "libsodium-wrappers": "^0.7.15", diff --git a/package.json b/package.json index 337be87a..7910dd5f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "omni-v2", - "version": "2.260531.4", + "version": "2.260602.2", "private": true, "type": "module", "workspaces": ["packages/*", "apps/*"], diff --git a/packages/api/package.json b/packages/api/package.json index 882a2f6f..c75e4df7 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -1,6 +1,6 @@ { "name": "@omni/api", - "version": "2.260531.4", + "version": "2.260602.2", "type": "module", "exports": { ".": { diff --git a/packages/api/src/plugins/__tests__/agent-dispatcher.test.ts b/packages/api/src/plugins/__tests__/agent-dispatcher.test.ts index 191f463c..614e0013 100644 --- a/packages/api/src/plugins/__tests__/agent-dispatcher.test.ts +++ b/packages/api/src/plugins/__tests__/agent-dispatcher.test.ts @@ -324,6 +324,45 @@ function createReactionEvent(overrides: Record = {}) { // ============================================================================ describe('agent-dispatcher', () => { + describe('human lifecycle observability helpers', () => { + it('builds redacted-readable lifecycle attributes without raw PII or secrets', () => { + const attributes = __test__.buildLifecycleSpanAttributes({ + stage: 'provider_inbound', + eventType: 'user_message_turn', + channel: 'whatsapp-baileys', + provider: 'gupshup', + instanceId: 'inst-1', + chatId: '5511999887766@s.whatsapp.net', + sessionId: 'p0r-hml-20260531T204449Z', + traceId: 'trc-test-123', + messageId: 'wamid.123', + agentId: 'eugenia-seller', + inputText: 'Olá, meu telefone é 5511999887766 e email felipe@example.com. Quero cotar plano.', + outputText: 'Claro, posso ajudar com a cotação.', + extra: { + authorization: 'Bearer super-secret-token', + token: 'abc123', + }, + }); + + expect(attributes['khal.lifecycle.stage']).toBe('provider_inbound'); + expect(attributes['khal.event_type']).toBe('user_message_turn'); + expect(attributes['khal.channel']).toBe('whatsapp-baileys'); + expect(attributes['khal.provider']).toBe('gupshup'); + expect(attributes['langfuse.session.id']).toBe('p0r-hml-20260531T204449Z'); + expect(attributes['session.id']).toBe('p0r-hml-20260531T204449Z'); + expect(attributes['khal.input_chars']).toBeGreaterThan(0); + expect(String(attributes['khal.input_sha256'])).toStartWith('sha256:'); + expect(String(attributes['khal.output_sha256'])).toStartWith('sha256:'); + expect(String(attributes['khal.input_preview_redacted'])).toContain('[PHONE]'); + expect(String(attributes['khal.input_preview_redacted'])).toContain('[EMAIL]'); + expect(JSON.stringify(attributes)).not.toContain('felipe@example.com'); + expect(JSON.stringify(attributes)).not.toContain('5511999887766'); + expect(JSON.stringify(attributes)).not.toContain('super-secret-token'); + expect(JSON.stringify(attributes)).not.toContain('abc123'); + }); + }); + // ====================================================================== // setupAgentDispatcher — subscribes to correct NATS subjects // ====================================================================== diff --git a/packages/api/src/plugins/agent-dispatcher.ts b/packages/api/src/plugins/agent-dispatcher.ts index 16340bba..ba305e94 100644 --- a/packages/api/src/plugins/agent-dispatcher.ts +++ b/packages/api/src/plugins/agent-dispatcher.ts @@ -17,6 +17,7 @@ * - Preserves existing debouncing for message events */ +import { createHash } from 'node:crypto'; import { unlink, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join, resolve } from 'node:path'; @@ -67,6 +68,7 @@ import type { AgentProvider, Database } from '@omni/db'; import { agentSessions, agents, instances } from '@omni/db'; import type { ChannelType, Instance } from '@omni/db'; import { createMediaProcessingService } from '@omni/media-processing'; +import { SpanStatusCode, trace } from '@opentelemetry/api'; import * as Sentry from '@sentry/bun'; import { and, eq } from 'drizzle-orm'; import { withIdempotency } from '../lib/idempotency'; @@ -109,6 +111,173 @@ const QUOTED_MESSAGE_MAX_CHARS = 4000; /** Hard cap on history messages fetched for DM conversations. */ const DM_HISTORY_LIMIT = 20; +const LIFECYCLE_PREVIEW_MAX_CHARS = 160; +const LIFECYCLE_SENSITIVE_KEY_PARTS = ['authorization', 'bearer', 'password', 'secret', 'token', 'api_key', 'apikey']; + +type LifecycleAttributeValue = string | number | boolean; + +interface LifecycleSpanAttributeInput { + stage: 'provider_inbound' | 'dispatch_to_agno' | 'provider_outbound' | 'turn'; + eventType: string; + channel: string; + provider?: string; + instanceId?: string; + chatId?: string; + sessionId?: string; + traceId?: string; + messageId?: string; + agentId?: string; + inputText?: string; + outputText?: string; + extra?: Record; +} + +function sha256Digest(value: string): string { + return `sha256:${createHash('sha256').update(value).digest('hex')}`; +} + +function redactLifecycleText(value: string): string { + return value + .replace(/[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}/gi, '[EMAIL]') + .replace(/\b\+?\d[\d\s().-]{5,}\d\b/g, '[PHONE]') + .replace(/\b\d{6,}\b/g, '[NUMBER]') + .replace(/\b[^\s@]+@(s\.whatsapp\.net|g\.us|lid|newsletter)\b/gi, '[JID]'); +} + +function previewLifecycleText(value: string): string { + const redacted = redactLifecycleText(value).replace(/\s+/g, ' ').trim(); + if (redacted.length <= LIFECYCLE_PREVIEW_MAX_CHARS) return redacted; + return `${redacted.slice(0, LIFECYCLE_PREVIEW_MAX_CHARS - 1)}…`; +} + +function isLifecycleSafeExtraKey(key: string): boolean { + const lowered = key.toLowerCase(); + return !LIFECYCLE_SENSITIVE_KEY_PARTS.some((part) => lowered.includes(part)); +} + +function setTextLifecycleAttributes( + attributes: Record, + prefix: 'input' | 'output', + value: string | undefined, +): void { + if (value === undefined) return; + attributes[`khal.${prefix}_chars`] = value.length; + attributes[`khal.${prefix}_sha256`] = sha256Digest(value); + attributes[`khal.${prefix}_preview_redacted`] = previewLifecycleText(value); +} + +function setOptionalLifecycleAttributes( + attributes: Record, + pairs: Array<[string, string | undefined]>, +): void { + for (const [key, value] of pairs) { + if (value) attributes[key] = value; + } +} + +function setChatLifecycleAttributes( + attributes: Record, + chatId: string | undefined, +): void { + if (!chatId) return; + attributes['omni.chat_id_sha256'] = sha256Digest(chatId); + attributes['omni.chat_id_preview_redacted'] = previewLifecycleText(chatId); +} + +function setSessionLifecycleAttributes( + attributes: Record, + sessionId: string | undefined, +): void { + if (!sessionId) return; + attributes['khal.session_id'] = sessionId; + attributes['langfuse.session.id'] = sessionId; + attributes['session.id'] = sessionId; +} + +function setExtraLifecycleAttributes( + attributes: Record, + extra: Record | undefined, +): void { + if (!extra) return; + for (const [key, value] of Object.entries(extra)) { + if (!isLifecycleSafeExtraKey(key) || value === undefined || value === null) continue; + if (typeof value === 'string' || typeof value === 'number' || typeof value === 'boolean') { + attributes[`khal.${key}`] = typeof value === 'string' ? previewLifecycleText(value) : value; + } + } +} + +function buildLifecycleSpanAttributes(input: LifecycleSpanAttributeInput): Record { + const attributes: Record = { + 'khal.lifecycle.stage': input.stage, + 'khal.event_type': input.eventType, + 'khal.channel': input.channel, + }; + + setOptionalLifecycleAttributes(attributes, [ + ['khal.provider', input.provider], + ['omni.instance_id', input.instanceId], + ['khal.trace_id', input.traceId], + ['khal.turn.message_id', input.messageId], + ['khal.agent_id', input.agentId], + ]); + setChatLifecycleAttributes(attributes, input.chatId); + setSessionLifecycleAttributes(attributes, input.sessionId); + setTextLifecycleAttributes(attributes, 'input', input.inputText); + setTextLifecycleAttributes(attributes, 'output', input.outputText); + setExtraLifecycleAttributes(attributes, input.extra); + + return attributes; +} + +async function withLifecycleSpan( + name: string, + attributes: Record, + fn: () => Promise, +): Promise { + let callbackStarted = false; + try { + const tracer = trace.getTracer('omni.agent-dispatcher'); + return await tracer.startActiveSpan(name, { attributes }, async (span) => { + callbackStarted = true; + try { + const result = await fn(); + span.setStatus({ code: SpanStatusCode.OK }); + return result; + } catch (error) { + span.recordException(error instanceof Error ? error : new Error(String(error))); + span.setStatus({ code: SpanStatusCode.ERROR, message: error instanceof Error ? error.message : String(error) }); + throw error; + } finally { + span.end(); + } + }); + } catch (error) { + if (callbackStarted) throw error; + log.warn('Lifecycle span wrapper failed before dispatch; continuing without span', { spanName: name }); + return fn(); + } +} + +function emitLifecycleSpan(name: string, attributes: Record): void { + withLifecycleSpan(name, attributes, async () => undefined).catch(() => { + // best-effort — never throw from instrumentation path + }); +} + +function activeProviderTraceContext(): AgentTrigger['traceContext'] { + const activeSpan = trace.getActiveSpan(); + const spanContext = activeSpan?.spanContext(); + if (!spanContext?.traceId || !spanContext?.spanId) return undefined; + + return { + traceId: spanContext.traceId, + spanId: spanContext.spanId, + traceFlags: spanContext.traceFlags, + traceState: spanContext.traceState?.serialize(), + }; +} + // ============================================================================ // Plugin → AckProvider adapter // ============================================================================ @@ -1960,7 +2129,24 @@ async function dispatchViaTurnBasedProvider( // Dispatch (fire-and-forget — agent uses verb commands + omni done) const dispatchStart = Date.now(); - await provider.trigger(trigger); + await withLifecycleSpan( + 'omni.dispatch_to_agno', + buildLifecycleSpanAttributes({ + stage: 'dispatch_to_agno', + eventType: 'user_message_turn', + channel: instance.channel, + provider: provider.schema, + instanceId: instance.id, + chatId, + sessionId: trigger.sessionId, + traceId, + messageId, + agentId: agentRecord.id, + inputText: trigger.content.text, + extra: { mode: 'turn-based', provider_id: provider.id, provider_schema: provider.schema }, + }), + () => provider.trigger(trigger), + ); const dispatchDurationMs = Date.now() - dispatchStart; if (sentryEnabled()) { @@ -2067,77 +2253,135 @@ async function dispatchViaProvider( explicitKhalSessionId, ); - // ── Turn-based mode: delegate to extracted helper ── - if (provider.mode === 'turn-based') { - return dispatchViaTurnBasedProvider(services, instance, provider, trigger, messages, chatId, traceId, db); - } - - // ── Standard (round-trip / fire-and-forget) dispatch ── - const correlationId = messages[0]?.metadata.correlationId; - const dispatchStart = Date.now(); - const result = await provider.trigger(trigger); - const dispatchDurationMs = Date.now() - dispatchStart; - - // Sentry metrics: agent dispatch count and latency - if (sentryEnabled()) { - Sentry.metrics.count('agent.dispatch', 1, { attributes: { provider_type: provider.schema } }); - Sentry.metrics.distribution('agent.dispatch.latency', dispatchDurationMs, { - unit: 'millisecond', - attributes: { provider_type: provider.schema }, - }); - } + const lifecycleBase = { + eventType: 'user_message_turn', + channel, + provider: provider.schema, + instanceId: instance.id, + chatId, + sessionId, + traceId, + messageId: messages[0]?.payload.externalId, + agentId: instance.agentInternalId ?? instance.agentId ?? undefined, + inputText: messageTexts.join('\n'), + }; - // If the agent triggered a handoff during this run (agentPaused: true), - // suppress the response — the handoff message already notified the user. - const chatAfterRun = await services.chats.findByExternalIdSmart(instance.id, chatId); - const handoffTriggered = (chatAfterRun?.settings as { agentPaused?: boolean } | null)?.agentPaused === true; + return withLifecycleSpan( + 'omni.turn', + buildLifecycleSpanAttributes({ + ...lifecycleBase, + stage: 'turn', + extra: { + trigger_type: triggerType, + message_count: messages.length, + provider_id: provider.id, + provider_schema: provider.schema, + }, + }), + async () => { + await withLifecycleSpan( + 'omni.provider_inbound', + buildLifecycleSpanAttributes({ + ...lifecycleBase, + stage: 'provider_inbound', + extra: { trigger_type: triggerType, message_count: messages.length }, + }), + async () => undefined, + ); - if (result && result.parts.length > 0 && !handoffTriggered) { - const selfChat = isSelfChat(chatId, instance.ownerIdentifier); - const rawParts = selfChat ? result.parts.map((p) => `${BOT_PREFIX}${p}`) : result.parts; - // Apply before_message_write hooks to each response part before sending - const parts = await Promise.all(rawParts.map((part) => executeBeforeMessageWriteHooks(instance.id, chatId, part))); - const _fmtMode = (instance.messageFormatMode as 'convert' | 'passthrough') ?? 'convert'; - const replyTo = messages[0]?.payload.replyToId ?? messages[0]?.payload.externalId; + // ── Turn-based mode: delegate to extracted helper ── + if (provider.mode === 'turn-based') { + return dispatchViaTurnBasedProvider(services, instance, provider, trigger, messages, chatId, traceId, db); + } - // T8: Processing send request - recordJourneyCheckpoint(correlationId, 'T8', JOURNEY_STAGES.T8); + // ── Standard (round-trip / fire-and-forget) dispatch ── + const correlationId = messages[0]?.metadata.correlationId; + const dispatchStart = Date.now(); + const result = await withLifecycleSpan( + 'omni.dispatch_to_agno', + buildLifecycleSpanAttributes({ + ...lifecycleBase, + stage: 'dispatch_to_agno', + extra: { trigger_type: triggerType, provider_id: provider.id, provider_schema: provider.schema }, + }), + () => provider.trigger({ ...trigger, traceContext: activeProviderTraceContext() ?? trigger.traceContext }), + ); + const dispatchDurationMs = Date.now() - dispatchStart; + + // Sentry metrics: agent dispatch count and latency + if (sentryEnabled()) { + Sentry.metrics.count('agent.dispatch', 1, { attributes: { provider_type: provider.schema } }); + Sentry.metrics.distribution('agent.dispatch.latency', dispatchDurationMs, { + unit: 'millisecond', + attributes: { provider_type: provider.schema }, + }); + } - await sendResponseParts( - channel, - instance.id, - chatId, - parts, - getSplitDelayConfig(instance), - _fmtMode, - replyTo, - correlationId, - senderAgentId, - ); + // If the agent triggered a handoff during this run (agentPaused: true), + // suppress the response — the handoff message already notified the user. + const chatAfterRun = await services.chats.findByExternalIdSmart(instance.id, chatId); + const handoffTriggered = (chatAfterRun?.settings as { agentPaused?: boolean } | null)?.agentPaused === true; + + if (result && result.parts.length > 0 && !handoffTriggered) { + const selfChat = isSelfChat(chatId, instance.ownerIdentifier); + const rawParts = selfChat ? result.parts.map((p) => `${BOT_PREFIX}${p}`) : result.parts; + // Apply before_message_write hooks to each response part before sending + const parts = await Promise.all( + rawParts.map((part) => executeBeforeMessageWriteHooks(instance.id, chatId, part)), + ); + const _fmtMode = (instance.messageFormatMode as 'convert' | 'passthrough') ?? 'convert'; + const replyTo = messages[0]?.payload.replyToId ?? messages[0]?.payload.externalId; + + // T8: Processing send request + recordJourneyCheckpoint(correlationId, 'T8', JOURNEY_STAGES.T8); + + await withLifecycleSpan( + 'omni.provider_outbound', + buildLifecycleSpanAttributes({ + ...lifecycleBase, + stage: 'provider_outbound', + outputText: parts.join('\n'), + extra: { parts_count: parts.length, provider_id: provider.id, provider_schema: provider.schema }, + }), + () => + sendResponseParts( + channel, + instance.id, + chatId, + parts, + getSplitDelayConfig(instance), + _fmtMode, + replyTo, + correlationId, + senderAgentId, + ), + ); - // T9: Outbound sent via plugin - recordJourneyCheckpoint(correlationId, 'T9', JOURNEY_STAGES.T9); + // T9: Outbound sent via plugin + recordJourneyCheckpoint(correlationId, 'T9', JOURNEY_STAGES.T9); - // T10: Agent chaining — forward response to chained instance if configured - await forwardToChainedInstance(instance, parts, correlationId, messages); - } else if (handoffTriggered) { - log.info('Agent response suppressed — handoff triggered during run', { - instanceId: instance.id, - chatId, - }); - } + // T10: Agent chaining — forward response to chained instance if configured + await forwardToChainedInstance(instance, parts, correlationId, messages); + } else if (handoffTriggered) { + log.info('Agent response suppressed — handoff triggered during run', { + instanceId: instance.id, + chatId, + }); + } - log.info('Agent response via IAgentProvider', { - instanceId: instance.id, - chatId, - parts: result?.parts.length ?? 0, - providerId: result?.metadata.providerId, - durationMs: result?.metadata.durationMs, - triggerType, - traceId, - }); + log.info('Agent response via IAgentProvider', { + instanceId: instance.id, + chatId, + parts: result?.parts.length ?? 0, + providerId: result?.metadata.providerId, + durationMs: result?.metadata.durationMs, + triggerType, + traceId, + }); - return true; + return true; + }, + ); } /** @@ -2200,20 +2444,57 @@ async function dispatchViaLegacy( mediaFiles.length > 0 ? (mediaFiles as unknown as ProviderFile[]) : undefined, ); - const result = await services.agentRunner.run({ - instance, - chatId, - personId, + const lifecycleSessionId = computeSessionId( + instance.agentSessionStrategy ?? 'per_chat', senderId, - senderName, - senderAvatarUrl, - senderPlatformUsername, - chatType, - chatName, - participantCount, - messages: messageTexts, - files: mediaFiles.length > 0 ? mediaFiles : undefined, - }); + chatId, + (rawPl as Record).threadId as string | undefined, + ); + const lifecycleBase = { + eventType: 'user_message_turn', + channel, + provider: 'legacy-agent-runner', + instanceId: instance.id, + chatId, + sessionId: lifecycleSessionId, + traceId, + messageId: messages[0]?.payload.externalId, + agentId: instance.agentInternalId ?? instance.agentId ?? undefined, + inputText: messageTexts.join('\n'), + }; + + emitLifecycleSpan( + 'omni.provider_inbound', + buildLifecycleSpanAttributes({ + ...lifecycleBase, + stage: 'provider_inbound', + extra: { trigger_type: triggerType, message_count: messages.length }, + }), + ); + + const result = await withLifecycleSpan( + 'omni.dispatch_to_agno', + buildLifecycleSpanAttributes({ + ...lifecycleBase, + stage: 'dispatch_to_agno', + extra: { trigger_type: triggerType, provider_schema: 'legacy-agent-runner' }, + }), + () => + services.agentRunner.run({ + instance, + chatId, + personId, + senderId, + senderName, + senderAvatarUrl, + senderPlatformUsername, + chatType, + chatName, + participantCount, + messages: messageTexts, + files: mediaFiles.length > 0 ? mediaFiles : undefined, + }), + ); const correlationId = messages[0]?.metadata.correlationId; const selfChat = isSelfChat(chatId, instance.ownerIdentifier); @@ -2228,16 +2509,26 @@ async function dispatchViaLegacy( // T8: Processing send request recordJourneyCheckpoint(correlationId, 'T8', JOURNEY_STAGES.T8); - await sendResponseParts( - channel, - instance.id, - chatId, - parts, - getSplitDelayConfig(instance), - _fmtMode, - replyTo, - correlationId, - senderAgentId, + await withLifecycleSpan( + 'omni.provider_outbound', + buildLifecycleSpanAttributes({ + ...lifecycleBase, + stage: 'provider_outbound', + outputText: parts.join('\n'), + extra: { parts_count: parts.length, provider_schema: 'legacy-agent-runner' }, + }), + () => + sendResponseParts( + channel, + instance.id, + chatId, + parts, + getSplitDelayConfig(instance), + _fmtMode, + replyTo, + correlationId, + senderAgentId, + ), ); // T9: Outbound sent via plugin @@ -4874,6 +5165,7 @@ export const __test__ = { getDebounceConfig, extractKhalSessionId, buildTriggerHeaders, + buildLifecycleSpanAttributes, createNatsGenieProviderInstance, /** Override the NatsGenieProvider constructor for tests (avoids barrel mock contamination). */ set NatsGenieProviderClass(cls: typeof NatsGenieProvider) { diff --git a/packages/api/src/providers/gemini/videogen.test.ts b/packages/api/src/providers/gemini/videogen.test.ts index 416de6cf..a6287d06 100644 --- a/packages/api/src/providers/gemini/videogen.test.ts +++ b/packages/api/src/providers/gemini/videogen.test.ts @@ -73,7 +73,7 @@ describe('GeminiVideoGenProvider', () => { expect(request.prompt).toContain('1. hero frame'); }); - it('preserves generateAudio for text-to-video requests', async () => { + it('omits generateAudio for text-to-video requests because current Veo 3.1 API rejects it', async () => { generateVideoCalls.length = 0; const provider = new GeminiVideoGenProvider({ getSecret: async () => 'test-gemini-key', @@ -84,10 +84,10 @@ describe('GeminiVideoGenProvider', () => { expect(generateVideoCalls).toHaveLength(1); const request = generateVideoCalls[0] as { config?: Record }; - expect(request.config?.generateAudio).toBe(true); + expect(request.config?.generateAudio).toBeUndefined(); }); - it('can disable generated audio for text-to-video requests', async () => { + it('treats --no-audio as a compatibility no-op while generateAudio is unsupported', async () => { generateVideoCalls.length = 0; const provider = new GeminiVideoGenProvider({ getSecret: async () => 'test-gemini-key', @@ -98,6 +98,6 @@ describe('GeminiVideoGenProvider', () => { expect(generateVideoCalls).toHaveLength(1); const request = generateVideoCalls[0] as { config?: Record }; - expect(request.config?.generateAudio).toBe(false); + expect(request.config?.generateAudio).toBeUndefined(); }); }); diff --git a/packages/api/src/providers/gemini/videogen.ts b/packages/api/src/providers/gemini/videogen.ts index 7c41d27e..c527e1d3 100644 --- a/packages/api/src/providers/gemini/videogen.ts +++ b/packages/api/src/providers/gemini/videogen.ts @@ -70,7 +70,9 @@ export class GeminiVideoGenProvider implements IVideoGenProvider { ...(options?.durationSec !== undefined ? { durationSeconds: options.durationSec } : {}), ...(options?.seed !== undefined ? { seed: options.seed } : {}), ...(options?.resolution !== undefined ? { resolution: options.resolution } : {}), - ...(!options?.imageBase64 ? { generateAudio: options?.audio !== false } : {}), + // The current Gemini Veo 3.1 API rejects generateAudio in this SDK path. + // Keep --no-audio as a no-op compatibility flag until the provider exposes + // a supported audio toggle again. }, }; if (options?.imageBase64) { diff --git a/packages/api/src/routes/v2/__tests__/messages-send-media.test.ts b/packages/api/src/routes/v2/__tests__/messages-send-media.test.ts index 2677100a..81c3cb1d 100644 --- a/packages/api/src/routes/v2/__tests__/messages-send-media.test.ts +++ b/packages/api/src/routes/v2/__tests__/messages-send-media.test.ts @@ -67,4 +67,44 @@ describe('POST /messages/send/media', () => { }, }); }); + + test('forwards WhatsApp voice-note audio as audioBuffer instead of base64', async () => { + const sendMessage = mock(async (_instanceId: string, _message: unknown) => ({ + success: true, + messageId: 'VOICE-MSG-ID', + timestamp: 123, + })); + const app = mountMessagesRoutes(sendMessage); + const audio = Buffer.from('ogg-opus-bytes'); + + const res = await app.request('/messages/send/media', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + instanceId: '11111111-1111-4111-8111-111111111111', + to: '5511999999999@s.whatsapp.net', + type: 'audio', + base64: audio.toString('base64'), + filename: 'voice.ogg', + voiceNote: true, + }), + }); + + expect(res.status).toBe(201); + expect(sendMessage).toHaveBeenCalledTimes(1); + const message = sendMessage.mock.calls[0]?.[1] as { metadata?: Record }; + expect(message).toMatchObject({ + content: { + type: 'audio', + filename: 'voice.ogg', + mimeType: 'audio/ogg; codecs=opus', + }, + metadata: { + ptt: true, + }, + }); + expect(message.metadata?.base64).toBeUndefined(); + expect(Buffer.isBuffer(message.metadata?.audioBuffer)).toBe(true); + expect((message.metadata?.audioBuffer as Buffer).equals(audio)).toBe(true); + }); }); diff --git a/packages/api/src/routes/v2/__tests__/messages-send-reaction.test.ts b/packages/api/src/routes/v2/__tests__/messages-send-reaction.test.ts new file mode 100644 index 00000000..6db9a215 --- /dev/null +++ b/packages/api/src/routes/v2/__tests__/messages-send-reaction.test.ts @@ -0,0 +1,208 @@ +import { describe, expect, mock, test } from 'bun:test'; +import { NotFoundError } from '@omni/core'; +import { Hono } from 'hono'; +import type { AppVariables } from '../../../types'; +import { messagesRoutes } from '../messages'; + +const INSTANCE_ID = '11111111-1111-4111-8111-111111111111'; +const CHAT_ID = '22222222-2222-4222-8222-222222222222'; +const OTHER_CHAT_ID = '44444444-4444-4444-8444-444444444444'; +const OMNI_MESSAGE_ID = '33333333-3333-4333-8333-333333333333'; +const CHAT_EXTERNAL_ID = '51961151926407@lid'; +const MESSAGE_EXTERNAL_ID = '2A0726AEA0EE1EB26093'; + +type MountOptions = { + sendMessage?: ReturnType; + chatFound?: boolean; + messageChatId?: string; + getByIdThrows?: boolean; + getByIdError?: Error; +}; + +function mountMessagesRoutes(options: MountOptions = {}): Hono<{ Variables: AppVariables }> { + const sendMessage = + options.sendMessage ?? + mock(async (_instanceId: string, _message: unknown) => ({ + success: true, + messageId: 'REACTION-MSG-ID', + timestamp: 123, + })); + + const app = new Hono<{ Variables: AppVariables }>(); + app.use('*', async (c, next) => { + c.set('services', { + instances: { + getById: mock(async (id: string) => ({ id, channel: 'whatsapp-baileys' })), + }, + persons: { + getIdentityForChannel: mock(async () => null), + }, + chats: { + getById: mock(async (id: string) => ({ id, externalId: CHAT_EXTERNAL_ID })), + findByExternalIdSmart: mock(async () => + options.chatFound === false + ? null + : { + id: CHAT_ID, + externalId: CHAT_EXTERNAL_ID, + }, + ), + }, + messages: { + getById: mock(async (id: string) => { + if (options.getByIdError) throw options.getByIdError; + if (options.getByIdThrows) throw new NotFoundError('Message', id); + return { + id, + chatId: options.messageChatId ?? CHAT_ID, + externalId: MESSAGE_EXTERNAL_ID, + isFromMe: false, + rawPayload: { + key: { + id: MESSAGE_EXTERNAL_ID, + remoteJid: CHAT_EXTERNAL_ID, + fromMe: false, + }, + }, + }; + }), + getByExternalId: mock(async (_chatId: string, externalId: string) => + externalId === MESSAGE_EXTERNAL_ID + ? { + id: OMNI_MESSAGE_ID, + chatId: CHAT_ID, + externalId: MESSAGE_EXTERNAL_ID, + isFromMe: false, + rawPayload: { + key: { + id: MESSAGE_EXTERNAL_ID, + remoteJid: CHAT_EXTERNAL_ID, + fromMe: false, + }, + }, + } + : null, + ), + }, + } as never); + c.set('channelRegistry', { + get: mock(() => ({ + capabilities: { canSendReaction: true }, + sendMessage, + })), + } as never); + c.set('apiKey', { + id: 'test', + name: 'test', + scopes: ['*'], + instanceIds: null, + expiresAt: null, + } as never); + await next(); + }); + app.route('/messages', messagesRoutes); + return app; +} + +async function postReaction(app: Hono<{ Variables: AppVariables }>, messageId: string) { + return app.request('/messages/send/reaction', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + instanceId: INSTANCE_ID, + to: CHAT_ID, + messageId, + emoji: '👍', + }), + }); +} + +describe('POST /messages/send/reaction', () => { + test('resolves an Omni message UUID to the channel-native external ID before sending', async () => { + const sendMessage = mock(async (_instanceId: string, _message: unknown) => ({ + success: true, + messageId: 'REACTION-MSG-ID', + timestamp: 123, + })); + const app = mountMessagesRoutes({ sendMessage }); + + const res = await postReaction(app, OMNI_MESSAGE_ID); + + expect(res.status).toBe(200); + expect(sendMessage).toHaveBeenCalledTimes(1); + expect(sendMessage.mock.calls[0]?.[1]).toMatchObject({ + to: CHAT_EXTERNAL_ID, + content: { + type: 'reaction', + emoji: '👍', + targetMessageId: MESSAGE_EXTERNAL_ID, + }, + metadata: { + fromMe: false, + }, + }); + }); + + test('fails closed for an Omni message UUID when the chat cannot be resolved', async () => { + const sendMessage = mock(async () => ({ success: true })); + const app = mountMessagesRoutes({ sendMessage, chatFound: false }); + + const res = await postReaction(app, OMNI_MESSAGE_ID); + + expect(res.status).toBeGreaterThanOrEqual(400); + expect(sendMessage).not.toHaveBeenCalled(); + }); + + test('fails closed for an Omni message UUID that belongs to another chat', async () => { + const sendMessage = mock(async () => ({ success: true })); + const app = mountMessagesRoutes({ sendMessage, messageChatId: OTHER_CHAT_ID }); + + const res = await postReaction(app, OMNI_MESSAGE_ID); + + expect(res.status).toBeGreaterThanOrEqual(400); + expect(sendMessage).not.toHaveBeenCalled(); + }); + + test('fails closed for an unknown Omni message UUID', async () => { + const sendMessage = mock(async () => ({ success: true })); + const app = mountMessagesRoutes({ sendMessage, getByIdThrows: true }); + + const res = await postReaction(app, OMNI_MESSAGE_ID); + + expect(res.status).toBeGreaterThanOrEqual(400); + expect(sendMessage).not.toHaveBeenCalled(); + }); + + test('does not mask unexpected message lookup errors as not found', async () => { + const sendMessage = mock(async () => ({ success: true })); + const app = mountMessagesRoutes({ sendMessage, getByIdError: new Error('database unavailable') }); + + const res = await postReaction(app, OMNI_MESSAGE_ID); + + expect(res.status).toBe(500); + expect(sendMessage).not.toHaveBeenCalled(); + }); + + test('preserves external message IDs for plugin fallback when DB lookup misses', async () => { + const sendMessage = mock(async (_instanceId: string, _message: unknown) => ({ + success: true, + messageId: 'REACTION-MSG-ID', + timestamp: 123, + })); + const app = mountMessagesRoutes({ sendMessage }); + const externalMessageId = 'EXTERNAL-NOT-IN-DB'; + + const res = await postReaction(app, externalMessageId); + + expect(res.status).toBe(200); + expect(sendMessage).toHaveBeenCalledTimes(1); + expect(sendMessage.mock.calls[0]?.[1]).toMatchObject({ + content: { + type: 'reaction', + emoji: '👍', + targetMessageId: externalMessageId, + }, + metadata: {}, + }); + }); +}); diff --git a/packages/api/src/routes/v2/messages.ts b/packages/api/src/routes/v2/messages.ts index 0d121840..60c1e6e2 100644 --- a/packages/api/src/routes/v2/messages.ts +++ b/packages/api/src/routes/v2/messages.ts @@ -39,7 +39,7 @@ import { extname, join } from 'node:path'; import { zValidator } from '@hono/zod-validator'; import { sanitizeOutboundText } from '@omni/channel-sdk'; import type { ChannelRegistry, OutgoingContent, OutgoingMessage } from '@omni/channel-sdk'; -import { ERROR_CODES, JOURNEY_STAGES, OmniError, createLogger, getJourneyTracker } from '@omni/core'; +import { ERROR_CODES, JOURNEY_STAGES, NotFoundError, OmniError, createLogger, getJourneyTracker } from '@omni/core'; import type { ChatClosedPayload, CloseContactOutcome } from '@omni/core/events'; import type { ChannelType } from '@omni/core/types'; import type { Database } from '@omni/db'; @@ -116,6 +116,92 @@ function extractReactionTargetParticipant(rawPayload: Record | return typeof participant === 'string' && participant.length > 0 ? participant : undefined; } +async function resolveReactionTarget( + services: Services, + instanceId: string, + resolvedTo: string, + messageId: string, +): Promise<{ targetMessageId: string; metadata: Record }> { + const metadata: Record = {}; + const chat = await services.chats.findByExternalIdSmart(instanceId, resolvedTo); + + if (!chat) { + if (isUUID(messageId)) { + throw new OmniError({ + code: ERROR_CODES.NOT_FOUND, + message: `Reaction target message not found: ${messageId}`, + context: { instanceId, resolvedTo, messageId }, + recoverable: false, + }); + } + + log.warn('Reaction target chat not found in DB; deferring fromMe to channel plugin fallback (#386)', { + instanceId, + resolvedTo, + messageId, + fallback: 'plugin-heuristic', + }); + return { targetMessageId: messageId, metadata }; + } + + const target = isUUID(messageId) + ? await getReactionTargetByOmniId(services, instanceId, chat.id, messageId) + : await services.messages.getByExternalId(chat.id, messageId); + + if (!target) { + log.warn('Reaction target message not found in DB; deferring fromMe to channel plugin fallback (#386)', { + instanceId, + chatId: chat.id, + messageId, + fallback: 'plugin-heuristic', + }); + return { targetMessageId: messageId, metadata }; + } + + metadata.fromMe = target.isFromMe === true; + if (target.isFromMe !== true) { + const participant = extractReactionTargetParticipant( + target.rawPayload as Record | null | undefined, + ); + if (participant) metadata.targetParticipant = participant; + } + + return { targetMessageId: target.externalId, metadata }; +} + +async function getReactionTargetByOmniId( + services: Services, + instanceId: string, + chatId: string, + messageId: string, +): Promise>> { + let target: Awaited>; + + try { + target = await services.messages.getById(messageId); + } catch (error) { + if (error instanceof NotFoundError) { + throw reactionTargetNotFound(instanceId, chatId, messageId); + } + throw error; + } + + if (target.chatId !== chatId) { + throw reactionTargetNotFound(instanceId, chatId, messageId); + } + + return target; +} + +function reactionTargetNotFound(instanceId: string, chatId: string, messageId: string): OmniError { + return new OmniError({ + code: ERROR_CODES.NOT_FOUND, + message: `Reaction target message not found: ${messageId}`, + context: { instanceId, chatId, messageId }, + recoverable: false, + }); +} + /** * Resolve recipient - handles Omni person IDs, Omni chat IDs, and platform IDs (WA JID etc.) * @@ -447,6 +533,21 @@ const sendMediaSchema = z.object({ threadId: z.string().optional().describe('Thread/topic ID (e.g. Telegram forum topic)'), }); +function normalizeSendMediaMimeType(data: z.infer): string { + const inferred = data.mimeType ?? inferMediaMimeType(data.type, data.filename); + if (data.type === 'audio' && data.voiceNote === true && inferred === 'audio/ogg') { + return 'audio/ogg; codecs=opus'; + } + return inferred; +} + +function buildSendMediaMetadata(data: z.infer): Record { + if (data.type === 'audio' && data.voiceNote === true && data.base64) { + return { audioBuffer: Buffer.from(data.base64, 'base64'), ptt: true }; + } + return { base64: data.base64, ptt: data.voiceNote }; +} + // Send reaction schema const sendReactionSchema = z.object({ instanceId: z.string().uuid().describe('Instance ID'), @@ -1061,7 +1162,7 @@ messagesRoutes.post('/send/media', zValidator('json', sendMediaSchema), async (c // Resolve recipient (handles person ID to platform ID resolution) const resolvedTo = await resolveRecipient(data.to, instance.channel, services); - const mediaMimeType = data.mimeType ?? inferMediaMimeType(data.type, data.filename); + const mediaMimeType = normalizeSendMediaMimeType(data); // Build outgoing message const outgoingMessage: OutgoingMessage = { @@ -1074,11 +1175,7 @@ messagesRoutes.post('/send/media', zValidator('json', sendMediaSchema), async (c filename: data.filename, mimeType: mediaMimeType, } as OutgoingContent, - metadata: { - base64: data.base64, - // WhatsApp uses 'ptt' (push-to-talk) flag for voice notes - ptt: data.voiceNote, - }, + metadata: buildSendMediaMetadata(data), }; // T8: API processed the send request @@ -1175,39 +1272,16 @@ messagesRoutes.post('/send/reaction', zValidator('json', sendReactionSchema), as // Note: For reactions, 'to' is typically a chat ID, but we support person ID resolution too const resolvedTo = await resolveRecipient(to, instance.channel, services); - // Look up the target message to determine fromMe (critical for WhatsApp reactions). - // Baileys needs key.fromMe to locate the correct message — if wrong, the reaction - // is silently dropped by WhatsApp. When the target isn't in our DB (history gap - // or unsynced chat), we leave fromMe undefined and let the channel plugin's - // heuristic decide — forcing false here breaks bot-to-own-message reactions (#386). - const reactionMetadata: Record = {}; - const chat = await services.chats.findByExternalIdSmart(instanceId, resolvedTo); - if (chat) { - const target = await services.messages.getByExternalId(chat.id, messageId); - if (target) { - reactionMetadata.fromMe = target.isFromMe === true; - if (target.isFromMe !== true) { - const participant = extractReactionTargetParticipant( - target.rawPayload as Record | null | undefined, - ); - if (participant) reactionMetadata.targetParticipant = participant; - } - } else { - log.warn('Reaction target message not found in DB; deferring fromMe to channel plugin fallback (#386)', { - instanceId, - chatId: chat.id, - messageId, - fallback: 'plugin-heuristic', - }); - } - } else { - log.warn('Reaction target chat not found in DB; deferring fromMe to channel plugin fallback (#386)', { - instanceId, - resolvedTo, - messageId, - fallback: 'plugin-heuristic', - }); - } + // Look up the target message to determine the provider-native ID and fromMe + // (critical for WhatsApp reactions). CLI/history surfaces Omni message UUIDs, + // but Baileys needs the WhatsApp externalId in key.id; sending an Omni UUID can + // return command-level success while WhatsApp silently ignores the reaction. + const { targetMessageId, metadata: reactionMetadata } = await resolveReactionTarget( + services, + instanceId, + resolvedTo, + messageId, + ); // Build outgoing message for reaction. When the target is unknown, omit // metadata so the plugin applies its own fallback (defaults to true for Baileys). @@ -1216,7 +1290,7 @@ messagesRoutes.post('/send/reaction', zValidator('json', sendReactionSchema), as content: { type: 'reaction', emoji, - targetMessageId: messageId, + targetMessageId, } as OutgoingContent, metadata: reactionMetadata, }; diff --git a/packages/api/src/services/__tests__/persons.test.ts b/packages/api/src/services/__tests__/persons.test.ts index 54b4d0a6..dce2d800 100644 --- a/packages/api/src/services/__tests__/persons.test.ts +++ b/packages/api/src/services/__tests__/persons.test.ts @@ -88,6 +88,28 @@ describe('PersonService', () => { service = new PersonService(mockDb as unknown as Database, mockEventBus); }); + describe('search()', () => { + test('returns people whose WhatsApp name only exists on chat participants', async () => { + const participantBackedPerson = { + id: 'person-cadu', + displayName: 'Cadu Cassau', + primaryPhone: null, + primaryEmail: null, + avatarUrl: null, + metadata: null, + createdAt: new Date('2026-04-10T06:36:32Z'), + updatedAt: new Date('2026-06-02T09:36:18Z'), + }; + + (mockDb as unknown as { execute: ReturnType }).execute = mock(async () => [participantBackedPerson]); + + const result = await service.search('Cadu Cassau', 5); + + expect((mockDb as unknown as { execute: ReturnType }).execute).toHaveBeenCalled(); + expect(result).toEqual([participantBackedPerson]); + }); + }); + describe('getIdentityForChannel()', () => { test('returns identity when person has single identity on channel', async () => { const identity = createMockIdentity({ diff --git a/packages/api/src/services/persons.ts b/packages/api/src/services/persons.ts index c1de264d..2b5df3a0 100644 --- a/packages/api/src/services/persons.ts +++ b/packages/api/src/services/persons.ts @@ -15,7 +15,7 @@ import { persons, platformIdentities, } from '@omni/db'; -import { and, desc, eq, ilike, isNotNull, ne, or, sql } from 'drizzle-orm'; +import { and, desc, eq, isNotNull, ne, or, sql } from 'drizzle-orm'; export interface PersonWithIdentities extends Person { identities: PlatformIdentity[]; @@ -76,22 +76,76 @@ export class PersonService { } /** - * Search persons by name, email, or phone + * Search persons by name, email, phone, platform identity, or chat participant display name. + * + * WhatsApp LID DMs frequently have the useful contact name only on chat_participants, + * while the canonical person row is intentionally sparse. Searching persons alone makes + * the identity graph useless for the main operational task: "find my chat with X". */ async search(query: string, limit = 20): Promise { const searchPattern = `%${query}%`; - return this.db - .select() - .from(persons) - .where( - or( - ilike(persons.displayName, searchPattern), - ilike(persons.primaryEmail, searchPattern), - ilike(persons.primaryPhone, searchPattern), - ), + const result = await this.db.execute(sql` + WITH candidates AS ( + SELECT + p.id, + CASE + WHEN p.display_name ILIKE ${searchPattern} ESCAPE '' THEN p.display_name + WHEN pi.platform_username ILIKE ${searchPattern} ESCAPE '' THEN pi.platform_username + WHEN cp.display_name ILIKE ${searchPattern} ESCAPE '' THEN cp.display_name + ELSE COALESCE(NULLIF(p.display_name, ''), NULLIF(pi.platform_username, ''), NULLIF(cp.display_name, '')) + END AS "displayName", + p.primary_phone AS "primaryPhone", + p.primary_email AS "primaryEmail", + p.avatar_url AS "avatarUrl", + p.metadata, + p.created_at AS "createdAt", + p.updated_at AS "updatedAt", + GREATEST( + COALESCE(pi.last_seen_at, 'epoch'::timestamptz), + COALESCE(cp.last_seen_at, 'epoch'::timestamptz), + COALESCE(p.updated_at, 'epoch'::timestamptz) + ) AS rank_ts + FROM persons p + LEFT JOIN platform_identities pi ON pi.person_id = p.id + LEFT JOIN chat_participants cp ON cp.person_id = p.id OR cp.platform_identity_id = pi.id + WHERE + p.display_name ILIKE ${searchPattern} ESCAPE '' + OR p.primary_email ILIKE ${searchPattern} ESCAPE '' + OR p.primary_phone ILIKE ${searchPattern} ESCAPE '' + OR pi.platform_username ILIKE ${searchPattern} ESCAPE '' + OR pi.platform_user_id ILIKE ${searchPattern} ESCAPE '' + OR cp.display_name ILIKE ${searchPattern} ESCAPE '' + OR cp.platform_user_id ILIKE ${searchPattern} ESCAPE '' + ), distinct_candidates AS ( + SELECT DISTINCT ON (id) + id, + "displayName", + "primaryPhone", + "primaryEmail", + "avatarUrl", + metadata, + "createdAt", + "updatedAt", + rank_ts + FROM candidates + ORDER BY id, rank_ts DESC ) - .limit(limit); + SELECT + id, + "displayName", + "primaryPhone", + "primaryEmail", + "avatarUrl", + metadata, + "createdAt", + "updatedAt" + FROM distinct_candidates + ORDER BY rank_ts DESC + LIMIT ${limit} + `); + + return result as unknown as Person[]; } /** diff --git a/packages/channel-a2a/package.json b/packages/channel-a2a/package.json index 551a22a0..f8ce2b5e 100644 --- a/packages/channel-a2a/package.json +++ b/packages/channel-a2a/package.json @@ -1,6 +1,6 @@ { "name": "@omni/channel-a2a", - "version": "2.260531.4", + "version": "2.260602.2", "description": "A2A protocol server channel for Omni", "type": "module", "main": "src/index.ts", diff --git a/packages/channel-discord/package.json b/packages/channel-discord/package.json index 1886a667..4e59375a 100644 --- a/packages/channel-discord/package.json +++ b/packages/channel-discord/package.json @@ -1,6 +1,6 @@ { "name": "@omni/channel-discord", - "version": "2.260531.4", + "version": "2.260602.2", "description": "Discord channel plugin for Omni using discord.js", "type": "module", "main": "src/index.ts", diff --git a/packages/channel-gupshup/package.json b/packages/channel-gupshup/package.json index 90afaf01..19f6f2c8 100644 --- a/packages/channel-gupshup/package.json +++ b/packages/channel-gupshup/package.json @@ -1,6 +1,6 @@ { "name": "@omni/channel-gupshup", - "version": "2.260531.4", + "version": "2.260602.2", "description": "Gupshup WhatsApp BSP channel plugin for Omni", "type": "module", "main": "src/index.ts", diff --git a/packages/channel-internal/package.json b/packages/channel-internal/package.json index 9d743a29..0be563d6 100644 --- a/packages/channel-internal/package.json +++ b/packages/channel-internal/package.json @@ -1,6 +1,6 @@ { "name": "@omni/channel-internal", - "version": "2.260531.4", + "version": "2.260602.2", "description": "Internal agent-to-agent routing channel for Omni", "type": "module", "main": "src/index.ts", diff --git a/packages/channel-sdk/package.json b/packages/channel-sdk/package.json index ac5e0a47..921ae894 100644 --- a/packages/channel-sdk/package.json +++ b/packages/channel-sdk/package.json @@ -1,6 +1,6 @@ { "name": "@omni/channel-sdk", - "version": "2.260531.4", + "version": "2.260602.2", "type": "module", "exports": { ".": { diff --git a/packages/channel-slack/package.json b/packages/channel-slack/package.json index 56949d2c..a1d32bac 100644 --- a/packages/channel-slack/package.json +++ b/packages/channel-slack/package.json @@ -1,6 +1,6 @@ { "name": "@omni/channel-slack", - "version": "2.260531.4", + "version": "2.260602.2", "description": "Slack channel plugin for Omni using Bolt.js with Socket Mode", "type": "module", "main": "src/index.ts", diff --git a/packages/channel-telegram/package.json b/packages/channel-telegram/package.json index 74fb2025..57e11dfc 100644 --- a/packages/channel-telegram/package.json +++ b/packages/channel-telegram/package.json @@ -1,6 +1,6 @@ { "name": "@omni/channel-telegram", - "version": "2.260531.4", + "version": "2.260602.2", "description": "Telegram channel plugin for Omni using grammy", "type": "module", "main": "src/index.ts", diff --git a/packages/channel-twilio-whatsapp/package.json b/packages/channel-twilio-whatsapp/package.json index 3c20133a..2d8e8bec 100644 --- a/packages/channel-twilio-whatsapp/package.json +++ b/packages/channel-twilio-whatsapp/package.json @@ -1,6 +1,6 @@ { "name": "@omni/channel-twilio-whatsapp", - "version": "2.260531.4", + "version": "2.260602.2", "description": "Twilio WhatsApp channel plugin for Omni", "type": "module", "main": "src/index.ts", diff --git a/packages/channel-whatsapp/package.json b/packages/channel-whatsapp/package.json index 35878e2a..293bf13c 100644 --- a/packages/channel-whatsapp/package.json +++ b/packages/channel-whatsapp/package.json @@ -1,6 +1,6 @@ { "name": "@omni/channel-whatsapp", - "version": "2.260531.4", + "version": "2.260602.2", "description": "WhatsApp channel plugin for Omni using Baileys", "type": "module", "main": "src/index.ts", diff --git a/packages/cli/package.json b/packages/cli/package.json index 516ef683..c858bf62 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@automagik/omni", - "version": "2.260531.4", + "version": "2.260602.2", "description": "LLM-optimized CLI for Omni", "type": "module", "bin": { diff --git a/packages/cli/src/commands/persons.ts b/packages/cli/src/commands/persons.ts index 0527958c..8be78051 100644 --- a/packages/cli/src/commands/persons.ts +++ b/packages/cli/src/commands/persons.ts @@ -36,14 +36,16 @@ export function createPersonsCommand(): Command { const persons = results as Array<{ id: string; displayName: string | null; - email: string | null; - phone: string | null; + primaryEmail?: string | null; + primaryPhone?: string | null; + email?: string | null; + phone?: string | null; }>; const items = persons.map((p) => ({ id: p.id, displayName: p.displayName ?? '-', - email: p.email ?? '-', - phone: p.phone ?? '-', + email: p.primaryEmail ?? p.email ?? '-', + phone: p.primaryPhone ?? p.phone ?? '-', })); output.list(items, { emptyMessage: 'No persons found.' }); diff --git a/packages/core/package.json b/packages/core/package.json index 7a86f881..f4cdf4a5 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@omni/core", - "version": "2.260531.4", + "version": "2.260602.2", "type": "module", "exports": { ".": { diff --git a/packages/core/src/providers/__tests__/agno-client.test.ts b/packages/core/src/providers/__tests__/agno-client.test.ts index 08f72737..899ed4f1 100644 --- a/packages/core/src/providers/__tests__/agno-client.test.ts +++ b/packages/core/src/providers/__tests__/agno-client.test.ts @@ -364,6 +364,53 @@ describe('AgnoClient', () => { expect(chunks[3]).toMatchObject({ event: 'RunCompleted', isComplete: true, fullContent: 'Hello World!' }); }); + it('propagates W3C trace context and Khal session headers for streaming runs', async () => { + const sseData = 'event: RunCompleted\ndata: {"run_id": "run-1", "content": "done"}\n\n'; + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode(sseData)); + controller.close(); + }, + }); + + mockImpl.mockResolvedValueOnce( + new Response(stream, { status: 200, headers: { 'Content-Type': 'text/event-stream' } }), + ); + + const client = new AgnoClient(config); + for await (const _chunk of client.stream({ + message: 'Hi!', + agentId: 'agent-1', + userId: 'user-456', + khalSessionId: 'khal-session-stream', + traceContext: { + traceId: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', + spanId: 'bbbbbbbbbbbbbbbb', + traceFlags: 1, + }, + omni: { + instanceId: 'inst-1', + chatId: 'chat-123', + messageId: 'msg-stream-1', + channel: 'gupshup', + }, + })) { + // exhaust stream so the request is issued + } + + const init = mockImpl.mock.calls[0]?.[1] as RequestInit; + const headers = init.headers as Record; + const formData = init.body as FormData; + expect(headers.traceparent).toBe('00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-bbbbbbbbbbbbbbbb-01'); + expect(headers['x-khal-session-id']).toBe('khal-session-stream'); + expect(headers['x-khal-message-id']).toBe('msg-stream-1'); + expect(headers['x-omni-instance-id']).toBe('inst-1'); + expect(headers['x-omni-chat-id']).toBe('chat-123'); + expect(headers['x-omni-channel']).toBe('gupshup'); + expect(formData.get('session_id')).toBe('khal-session-stream'); + }); + it('handles stream errors', async () => { const sseData = 'event: RunFailed\ndata: {"error": "Agent crashed"}\n\n'; diff --git a/packages/db/package.json b/packages/db/package.json index fc83a403..3024707c 100644 --- a/packages/db/package.json +++ b/packages/db/package.json @@ -1,6 +1,6 @@ { "name": "@omni/db", - "version": "2.260531.4", + "version": "2.260602.2", "type": "module", "exports": { ".": { diff --git a/packages/media-processing/package.json b/packages/media-processing/package.json index 36b331fd..7782f231 100644 --- a/packages/media-processing/package.json +++ b/packages/media-processing/package.json @@ -1,6 +1,6 @@ { "name": "@omni/media-processing", - "version": "2.260531.4", + "version": "2.260602.2", "type": "module", "exports": { ".": { diff --git a/packages/plugin-openclaw/package.json b/packages/plugin-openclaw/package.json index 2d1d8a8e..b91ed297 100644 --- a/packages/plugin-openclaw/package.json +++ b/packages/plugin-openclaw/package.json @@ -1,6 +1,6 @@ { "name": "@omni/plugin-openclaw", - "version": "2.260531.4", + "version": "2.260602.2", "description": "Expose Omni as a native messaging channel in OpenClaw", "type": "module", "main": "src/index.ts", diff --git a/packages/sdk/package.json b/packages/sdk/package.json index 3ccbf0cb..f08bd57d 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -1,6 +1,6 @@ { "name": "@omni/sdk", - "version": "2.260531.4", + "version": "2.260602.2", "type": "module", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/packages/voice-client/package.json b/packages/voice-client/package.json index 04f6a794..e7f0ff6b 100644 --- a/packages/voice-client/package.json +++ b/packages/voice-client/package.json @@ -1,6 +1,6 @@ { "name": "@omni/voice-client", - "version": "2.260531.4", + "version": "2.260602.2", "type": "module", "exports": { ".": { diff --git a/plugins/omni/.claude-plugin/plugin.json b/plugins/omni/.claude-plugin/plugin.json index 0f4f72a9..c1d996c3 100644 --- a/plugins/omni/.claude-plugin/plugin.json +++ b/plugins/omni/.claude-plugin/plugin.json @@ -1,6 +1,6 @@ { "name": "omni", - "version": "2.260531.4", + "version": "2.260602.2", "description": "Full Omni platform control — three-tier skill system for agents (omni-agent), first-time setup (omni-setup), and platform ops (omni-ops)", "author": { "name": "Automagik"