feat(observability): add Omni curated lifecycle spans#688
Conversation
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: ASSERTIVE Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces human lifecycle observability helpers and OpenTelemetry span wrapping for agent dispatch flows, ensuring sensitive PII and secrets are redacted from attributes. It also adds corresponding tests, including verification of trace context propagation in streaming runs. The review feedback suggests using a nullish check in setTextLifecycleAttributes to safely handle potential null values, and refining the error handling in withLifecycleSpan to guarantee that any OpenTelemetry SDK errors do not disrupt the core application flow if the business logic executes successfully.
| function setTextLifecycleAttributes( | ||
| attributes: Record<string, LifecycleAttributeValue>, | ||
| prefix: 'input' | 'output', | ||
| value: string | undefined, | ||
| ): void { | ||
| if (value === undefined) return; |
There was a problem hiding this comment.
To prevent potential runtime TypeError crashes if value is null (which can happen with database fields or external payloads even if typed as string | undefined), use a nullish check if (value == null) instead of strict equality to undefined.
| function setTextLifecycleAttributes( | |
| attributes: Record<string, LifecycleAttributeValue>, | |
| prefix: 'input' | 'output', | |
| value: string | undefined, | |
| ): void { | |
| if (value === undefined) return; | |
| function setTextLifecycleAttributes( | |
| attributes: Record<string, LifecycleAttributeValue>, | |
| prefix: 'input' | 'output', | |
| value: string | undefined | null, | |
| ): void { | |
| if (value == null) return; |
| async function withLifecycleSpan<T>( | ||
| name: string, | ||
| attributes: Record<string, LifecycleAttributeValue>, | ||
| fn: () => Promise<T>, | ||
| ): Promise<T> { | ||
| let callbackStarted = false; | ||
| try { | ||
| const tracer = trace.getTracer('omni.agent-dispatcher'); | ||
| return await tracer.startActiveSpan(name, { attributes }, async (span) => { | ||
| callbackStarted = true; | ||
| try { | ||
| const result = await fn(); | ||
| span.setStatus({ code: SpanStatusCode.OK }); | ||
| return result; | ||
| } catch (error) { | ||
| span.recordException(error instanceof Error ? error : new Error(String(error))); | ||
| span.setStatus({ code: SpanStatusCode.ERROR, message: error instanceof Error ? error.message : String(error) }); | ||
| throw error; | ||
| } finally { | ||
| span.end(); | ||
| } | ||
| }); | ||
| } catch (error) { | ||
| if (callbackStarted) throw error; | ||
| log.warn('Lifecycle span wrapper failed before dispatch; continuing without span', { spanName: name }); | ||
| return fn(); | ||
| } | ||
| } |
There was a problem hiding this comment.
If an error occurs within the OpenTelemetry SDK itself (e.g., during span.end() or span finalization) after the business logic (fn()) has successfully completed, the current implementation will rethrow the OTel error because callbackStarted is true. Instrumentation should be completely fail-safe and never disrupt the core application flow. Consider tracking whether the business logic itself threw an error, and only rethrow if the error originated from fn().
async function withLifecycleSpan<T>(
name: string,
attributes: Record<string, LifecycleAttributeValue>,
fn: () => Promise<T>,
): Promise<T> {
let businessError: unknown = undefined;
let hasBusinessError = false;
try {
const tracer = trace.getTracer('omni.agent-dispatcher');
return await tracer.startActiveSpan(name, { attributes }, async (span) => {
try {
const result = await fn();
span.setStatus({ code: SpanStatusCode.OK });
return result;
} catch (error) {
hasBusinessError = true;
businessError = error;
span.recordException(error instanceof Error ? error : new Error(String(error)));
span.setStatus({ code: SpanStatusCode.ERROR, message: error instanceof Error ? error.message : String(error) });
throw error;
} finally {
try {
span.end();
} catch (otelError) {
log.warn('Failed to end lifecycle span', { error: String(otelError) });
}
}
});
} catch (error) {
if (hasBusinessError) {
throw businessError;
}
log.warn('Lifecycle span wrapper failed; continuing without span', { spanName: name, error: String(error) });
return fn();
}
}There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 14d2f24d4a
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| attributes['khal.session_id'] = sessionId; | ||
| attributes['langfuse.session.id'] = sessionId; | ||
| attributes['session.id'] = sessionId; |
There was a problem hiding this comment.
Avoid exporting raw session IDs to spans
For the default per_chat/per_user strategies, computeSessionId() returns the raw external chat/user ID (for WhatsApp this is often a phone-bearing JID, and per_thread embeds it too), but these new lifecycle attributes send that value verbatim to OTel/Langfuse. This bypasses the hashing/redaction used for chatId and leaks PII into observability data whenever an instance does not supply an explicit safe Khal session ID; hash/redact these fields or only emit the raw value when it is known to be non-PII.
Useful? React with 👍 / 👎.
Adds minimal curated lifecycle spans for Omni inbound/dispatch/outbound/turn flow with PII-safe lifecycle attributes and strengthens Agno session/trace propagation tests.\n\nLocal guards:\n- bun test packages/api/src/plugins/tests/agent-dispatcher.test.ts packages/core/src/providers/tests/agno-client.test.ts → 86 pass / 0 fail\n- pre-push typecheck → 21 successful / 21 total\n- pre-push full suite → 3964 pass / 292 skip / 0 fail across 307 files\n\nHML gate intent: deploy this exact SHA, send two controlled Gupshup/WhatsApp messages in the same session, query Langfuse by sessionId, and return the Langfuse session URL plus ordered trace IDs.