From ea4a4c5a7f7702785c2d8082ee65138ffb752a42 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Fri, 31 Oct 2025 01:21:41 +0100 Subject: [PATCH 01/14] Clean up OTel instrumentation --- Directory.Packages.props | 12 +- .../SearchOrAskAi/AskAi/AskAiEvent.ts | 8 +- .../SearchOrAskAi/AskAi/ChatMessage.tsx | 6 +- .../AskAi/StreamingAiMessage.tsx | 2 +- .../AskAi/AskAiEvent.cs | 12 +- .../AskAi/AskAiUsecase.cs | 47 +--- .../SerializationContext.cs | 12 + .../AskAi/AgentBuilderStreamTransformer.cs | 17 +- .../AskAi/LlmGatewayStreamTransformer.cs | 20 +- .../Adapters/AskAi/StreamTransformerBase.cs | 258 +++++++----------- ...ic.Documentation.Api.Infrastructure.csproj | 3 + .../OpenTelemetryExtensions.cs | 52 ++++ .../Program.cs | 51 +--- .../docs-builder/Http/DocumentationWebHost.cs | 1 + src/tooling/docs-builder/docs-builder.csproj | 2 + 15 files changed, 230 insertions(+), 273 deletions(-) create mode 100644 src/api/Elastic.Documentation.Api.Infrastructure/OpenTelemetryExtensions.cs diff --git a/Directory.Packages.props b/Directory.Packages.props index 3dce2b56a..3176770de 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -25,7 +25,7 @@ - + @@ -76,10 +76,10 @@ - - - - + + + + @@ -99,4 +99,4 @@ - \ No newline at end of file + diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/AskAiEvent.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/AskAiEvent.ts index 019cc8ed9..dae62305a 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/AskAiEvent.ts +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/AskAiEvent.ts @@ -4,8 +4,8 @@ import * as z from 'zod' // Event type constants for type-safe referencing export const EventTypes = { CONVERSATION_START: 'conversation_start', - CHUNK: 'chunk', - CHUNK_COMPLETE: 'chunk_complete', + MESSAGE_CHUNK: 'message_chunk', + MESSAGE_COMPLETE: 'message_complete', SEARCH_TOOL_CALL: 'search_tool_call', TOOL_CALL: 'tool_call', TOOL_RESULT: 'tool_result', @@ -23,14 +23,14 @@ export const ConversationStartEventSchema = z.object({ }) export const ChunkEventSchema = z.object({ - type: z.literal(EventTypes.CHUNK), + type: z.literal(EventTypes.MESSAGE_CHUNK), id: z.string(), timestamp: z.number(), content: z.string(), }) export const ChunkCompleteEventSchema = z.object({ - type: z.literal(EventTypes.CHUNK_COMPLETE), + type: z.literal(EventTypes.MESSAGE_COMPLETE), id: z.string(), timestamp: z.number(), fullContent: z.string(), diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.tsx index dc033c3fd..720d20854 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.tsx @@ -144,7 +144,7 @@ const computeAiStatus = ( m.type === EventTypes.SEARCH_TOOL_CALL || m.type === EventTypes.TOOL_CALL || m.type === EventTypes.TOOL_RESULT || - m.type === EventTypes.CHUNK + m.type === EventTypes.MESSAGE_CHUNK ) .sort((a, b) => a.timestamp - b.timestamp) @@ -166,9 +166,9 @@ const computeAiStatus = ( case EventTypes.TOOL_RESULT: return STATUS_MESSAGES.ANALYZING - case EventTypes.CHUNK: { + case EventTypes.MESSAGE_CHUNK: { const allContent = events - .filter((m) => m.type === EventTypes.CHUNK) + .filter((m) => m.type === EventTypes.MESSAGE_CHUNK) .map((m) => m.content) .join('') diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx index 1143465f6..b1a9d08f2 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx @@ -35,7 +35,7 @@ export const StreamingAiMessage = ({ if (event.conversationId && !threadId) { setThreadId(event.conversationId) } - } else if (event.type === EventTypes.CHUNK) { + } else if (event.type === EventTypes.MESSAGE_CHUNK) { contentRef.current += event.content } else if (event.type === EventTypes.ERROR) { // Handle error events from the stream diff --git a/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiEvent.cs b/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiEvent.cs index d2eb11b69..2ad3e003c 100644 --- a/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiEvent.cs +++ b/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiEvent.cs @@ -11,8 +11,8 @@ namespace Elastic.Documentation.Api.Core.AskAi; /// [JsonPolymorphic(TypeDiscriminatorPropertyName = "type")] [JsonDerivedType(typeof(ConversationStart), typeDiscriminator: "conversation_start")] -[JsonDerivedType(typeof(Chunk), typeDiscriminator: "chunk")] -[JsonDerivedType(typeof(ChunkComplete), typeDiscriminator: "chunk_complete")] +[JsonDerivedType(typeof(MessageChunk), typeDiscriminator: "message_chunk")] +[JsonDerivedType(typeof(MessageComplete), typeDiscriminator: "message_complete")] [JsonDerivedType(typeof(SearchToolCall), typeDiscriminator: "search_tool_call")] [JsonDerivedType(typeof(ToolCall), typeDiscriminator: "tool_call")] [JsonDerivedType(typeof(ToolResult), typeDiscriminator: "tool_result")] @@ -33,7 +33,7 @@ string ConversationId /// /// Streaming text chunk from AI /// - public sealed record Chunk( + public sealed record MessageChunk( string Id, long Timestamp, string Content @@ -42,7 +42,7 @@ string Content /// /// Complete message when streaming is done /// - public sealed record ChunkComplete( + public sealed record MessageComplete( string Id, long Timestamp, string FullContent @@ -111,8 +111,8 @@ string Message /// [JsonSerializable(typeof(AskAiEvent))] [JsonSerializable(typeof(AskAiEvent.ConversationStart))] -[JsonSerializable(typeof(AskAiEvent.Chunk))] -[JsonSerializable(typeof(AskAiEvent.ChunkComplete))] +[JsonSerializable(typeof(AskAiEvent.MessageChunk))] +[JsonSerializable(typeof(AskAiEvent.MessageComplete))] [JsonSerializable(typeof(AskAiEvent.SearchToolCall))] [JsonSerializable(typeof(AskAiEvent.ToolCall))] [JsonSerializable(typeof(AskAiEvent.ToolResult))] diff --git a/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs b/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs index 554bb3545..525aca2ec 100644 --- a/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs +++ b/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs @@ -3,6 +3,8 @@ // See the LICENSE file in the project root for more information using System.Diagnostics; +using System.Text.Json; +using Elastic.Documentation.Api.Core; using Microsoft.Extensions.Logging; namespace Elastic.Documentation.Api.Core.AskAi; @@ -16,41 +18,22 @@ public class AskAiUsecase( public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx) { - // Start activity for the chat request - DO NOT use 'using' because the stream is consumed later - // The activity will be passed to the transformer which will dispose it when the stream completes - var activity = AskAiActivitySource.StartActivity("chat", ActivityKind.Client); - - // Generate a correlation ID for tracking if this is a new conversation - // For first messages (no ThreadId), generate a temporary ID that will be updated when the provider responds - var correlationId = askAiRequest.ThreadId ?? $"temp-{Guid.NewGuid()}"; - - // Set GenAI semantic convention attributes - _ = (activity?.SetTag("gen_ai.operation.name", "chat")); - _ = (activity?.SetTag("gen_ai.conversation.id", correlationId)); // Will be updated when we receive ConversationStart with actual ID - _ = (activity?.SetTag("gen_ai.usage.input_tokens", askAiRequest.Message.Length)); // Approximate token count - - // Custom attributes for tracking our abstraction layer - // We use custom attributes because we don't know the actual GenAI provider (OpenAI, Anthropic, etc.) - // or model (gpt-4, claude, etc.) - those are abstracted by AgentBuilder/LlmGateway - _ = (activity?.SetTag("docs.ai.gateway", streamTransformer.AgentProvider)); // agent-builder or llm-gateway - _ = (activity?.SetTag("docs.ai.agent_name", streamTransformer.AgentId)); // docs-agent or docs_assistant - - // Add GenAI prompt event - _ = (activity?.AddEvent(new ActivityEvent("gen_ai.content.prompt", - timestamp: DateTimeOffset.UtcNow, - tags: - [ - new KeyValuePair("gen_ai.prompt", askAiRequest.Message), - new KeyValuePair("gen_ai.system_instructions", AskAiRequest.SystemPrompt) - ]))); - - logger.LogDebug("Processing AskAiRequest: {Request}", askAiRequest); - + logger.LogInformation("Starting AskAI chat with {AgentProvider} and {AgentId}", streamTransformer.AgentProvider, streamTransformer.AgentId); + var activity = AskAiActivitySource.StartActivity($"chat", ActivityKind.Client); + _ = activity?.SetTag("gen_ai.operation.name", "chat"); + _ = activity?.SetTag("gen_ai.provider.name", streamTransformer.AgentProvider); // agent-builder or llm-gateway + _ = activity?.SetTag("gen_ai.agent.id", streamTransformer.AgentId); // docs-agent or docs_assistant + var inputMessages = new[] + { + new InputMessage("user", [new MessagePart("text", askAiRequest.Message)]) + }; + var inputMessagesJson = JsonSerializer.Serialize(inputMessages, ApiJsonContext.Default.InputMessageArray); + _ = activity?.SetTag("gen_ai.input.messages", inputMessagesJson); + logger.LogInformation("AskAI input message: {InputMessage}", askAiRequest.Message); + logger.LogInformation("Streaming AskAI response"); var rawStream = await askAiGateway.AskAi(askAiRequest, ctx); - // The stream transformer will handle disposing the activity when streaming completes var transformedStream = await streamTransformer.TransformAsync(rawStream, activity, ctx); - return transformedStream; } } diff --git a/src/api/Elastic.Documentation.Api.Core/SerializationContext.cs b/src/api/Elastic.Documentation.Api.Core/SerializationContext.cs index 9cae0eb22..00e519bdb 100644 --- a/src/api/Elastic.Documentation.Api.Core/SerializationContext.cs +++ b/src/api/Elastic.Documentation.Api.Core/SerializationContext.cs @@ -8,9 +8,21 @@ namespace Elastic.Documentation.Api.Core; +/// +/// Types for OpenTelemetry telemetry serialization (AOT-compatible) +/// +public record MessagePart(string Type, string Content); + +public record InputMessage(string Role, MessagePart[] Parts); + +public record OutputMessage(string Role, MessagePart[] Parts, string FinishReason); [JsonSerializable(typeof(AskAiRequest))] [JsonSerializable(typeof(SearchRequest))] [JsonSerializable(typeof(SearchResponse))] +[JsonSerializable(typeof(InputMessage))] +[JsonSerializable(typeof(OutputMessage))] +[JsonSerializable(typeof(MessagePart))] +[JsonSerializable(typeof(InputMessage[]))] [JsonSourceGenerationOptions(PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase)] public partial class ApiJsonContext : JsonSerializerContext; diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs index 901b059f8..791f042d4 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs @@ -26,9 +26,7 @@ public class AgentBuilderStreamTransformer(ILogger - new AskAiEvent.Chunk(id, timestamp, textChunk.GetString()!), + new AskAiEvent.MessageChunk(id, timestamp, textChunk.GetString()!), "message_complete" when innerData.TryGetProperty("message_content", out var fullContent) => - new AskAiEvent.ChunkComplete(id, timestamp, fullContent.GetString()!), + new AskAiEvent.MessageComplete(id, timestamp, fullContent.GetString()!), "reasoning" => // Parse reasoning message if available @@ -76,7 +74,7 @@ public class AgentBuilderStreamTransformer(ILogger lo new AskAiEvent.ConversationStart(id, timestamp, Guid.NewGuid().ToString()), "ai_message_chunk" when messageData.TryGetProperty("content", out var content) => - new AskAiEvent.Chunk(id, timestamp, content.GetString()!), + new AskAiEvent.MessageChunk(id, timestamp, content.GetString()!), "ai_message" when messageData.TryGetProperty("content", out var fullContent) => - new AskAiEvent.ChunkComplete(id, timestamp, fullContent.GetString()!), + new AskAiEvent.MessageComplete(id, timestamp, fullContent.GetString()!), "tool_call" when messageData.TryGetProperty("toolCalls", out var toolCalls) => TransformToolCall(id, timestamp, toolCalls), @@ -56,6 +56,8 @@ public class LlmGatewayStreamTransformer(ILogger lo "agent_end" => new AskAiEvent.ConversationEnd(id, timestamp), + "error" => ParseErrorEvent(id, timestamp, messageData), + "chat_model_start" or "chat_model_end" => null, // Skip model lifecycle events @@ -110,4 +112,18 @@ public class LlmGatewayStreamTransformer(ILogger lo Logger.LogWarning("Unknown LLM Gateway event type: {Type}", type); return null; } + + private AskAiEvent.ErrorEvent ParseErrorEvent(string id, long timestamp, JsonElement messageData) + { + // LLM Gateway error format: {error: "...", message: "..."} + var errorMessage = messageData.TryGetProperty("message", out var msgProp) + ? msgProp.GetString() + : messageData.TryGetProperty("error", out var errProp) + ? errProp.GetString() + : null; + + Logger.LogError("Error event received from LLM Gateway: {ErrorMessage}", errorMessage ?? "Unknown error"); + + return new AskAiEvent.ErrorEvent(id, timestamp, errorMessage ?? "Unknown error occurred"); + } } diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs index f374f8e54..60eede7f0 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs @@ -8,6 +8,7 @@ using System.Runtime.CompilerServices; using System.Text; using System.Text.Json; +using Elastic.Documentation.Api.Core; using Elastic.Documentation.Api.Core.AskAi; using Microsoft.Extensions.Logging; @@ -50,16 +51,8 @@ public abstract class StreamTransformerBase(ILogger logger) : IStreamTransformer /// public string AgentProvider => GetAgentProvider(); - public Task TransformAsync(Stream rawStream, Activity? parentActivity, CancellationToken cancellationToken = default) + public Task TransformAsync(Stream rawStream, Activity? parentActivity, Cancel cancellationToken = default) { - // Create a child activity for the transformation - DO NOT use 'using' because streaming happens asynchronously - var activity = StreamTransformerActivitySource.StartActivity($"chat {GetAgentId()}", ActivityKind.Client); - _ = (activity?.SetTag("gen_ai.operation.name", "chat")); - - // Custom attributes for tracking our abstraction layer - _ = (activity?.SetTag("docs.ai.gateway", GetAgentProvider())); - _ = (activity?.SetTag("docs.ai.agent_name", GetAgentId())); - // Configure pipe for low-latency streaming var pipeOptions = new PipeOptions( minimumSegmentSize: 1024, // Smaller segments for faster processing @@ -76,8 +69,8 @@ public Task TransformAsync(Stream rawStream, Activity? parentActivity, C // Start processing task to transform and write events to pipe // Note: We intentionally don't await this task as we need to return the stream immediately // The pipe handles synchronization and backpressure between producer and consumer - // Pass both parent and child activities - they will be disposed when streaming completes - _ = ProcessPipeAsync(reader, pipe.Writer, parentActivity, activity, cancellationToken); + // Pass parent activity - it will be disposed when streaming completes + _ = ProcessPipeAsync(reader, pipe.Writer, parentActivity, cancellationToken); // Return the read side of the pipe as a stream return Task.FromResult(pipe.Reader.AsStream()); @@ -87,11 +80,8 @@ public Task TransformAsync(Stream rawStream, Activity? parentActivity, C /// Process the pipe reader and write transformed events to the pipe writer. /// This runs concurrently with the consumer reading from the output stream. /// - private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, Activity? parentActivity, Activity? transformActivity, CancellationToken cancellationToken) + private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, Activity? parentActivity, CancellationToken cancellationToken) { - using var activity = StreamTransformerActivitySource.StartActivity("gen_ai.agent.pipe"); - _ = (activity?.SetTag("transformer.type", GetType().Name)); - try { try @@ -100,50 +90,12 @@ private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, Activi } catch (OperationCanceledException ex) { - // Cancellation is expected and not an error - log as debug Logger.LogDebug(ex, "Stream processing was cancelled for transformer {TransformerType}", GetType().Name); - _ = (activity?.SetTag("gen_ai.response.error", true)); - _ = (activity?.SetTag("gen_ai.response.error_type", "OperationCanceledException")); - - // Add error event to activity - _ = (activity?.AddEvent(new ActivityEvent("gen_ai.error", - timestamp: DateTimeOffset.UtcNow, - tags: - [ - new KeyValuePair("gen_ai.error.type", "OperationCanceledException"), - new KeyValuePair("gen_ai.error.message", "Stream processing was cancelled"), - new KeyValuePair("gen_ai.transformer.type", GetType().Name) - ]))); - - try - { - await writer.CompleteAsync(ex); - await reader.CompleteAsync(ex); - } - catch (Exception completeEx) - { - Logger.LogError(completeEx, "Error completing pipe after cancellation for transformer {TransformerType}", GetType().Name); - } - return; } catch (Exception ex) { Logger.LogError(ex, "Error transforming stream for transformer {TransformerType}. Stream processing will be terminated.", GetType().Name); - _ = (activity?.SetTag("gen_ai.response.error", true)); - _ = (activity?.SetTag("gen_ai.response.error_type", ex.GetType().Name)); - _ = (activity?.SetTag("gen_ai.response.error_message", ex.Message)); - - // Add error event to activity - _ = (activity?.AddEvent(new ActivityEvent("gen_ai.error", - timestamp: DateTimeOffset.UtcNow, - tags: - [ - new KeyValuePair("gen_ai.error.type", ex.GetType().Name), - new KeyValuePair("gen_ai.error.message", ex.Message), - new KeyValuePair("gen_ai.transformer.type", GetType().Name), - new KeyValuePair("gen_ai.error.stack_trace", ex.StackTrace ?? "") - ]))); - + _ = parentActivity?.SetTag("error.type", ex.GetType().Name); try { await writer.CompleteAsync(ex); @@ -169,32 +121,30 @@ private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, Activi } finally { - // Always dispose activities, regardless of how we exit - transformActivity?.Dispose(); parentActivity?.Dispose(); } } + /// /// Process the raw stream and write transformed events to the pipe writer. /// Default implementation parses SSE events and JSON, then calls TransformJsonEvent. /// - protected virtual async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Activity? parentActivity, CancellationToken cancellationToken) + /// Stream processing result with metrics and captured output + private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Activity? parentActivity, CancellationToken cancellationToken) { - using var activity = StreamTransformerActivitySource.StartActivity("gen_ai.agent.stream"); - - // Custom attributes for tracking our abstraction layer - _ = (activity?.SetTag("docs.ai.gateway", GetAgentProvider())); - _ = (activity?.SetTag("docs.ai.agent_name", GetAgentId())); + using var activity = StreamTransformerActivitySource.StartActivity("transform_stream"); - var eventCount = 0; - var jsonParseErrors = 0; + if (parentActivity?.Id != null) + _ = activity?.SetParentId(parentActivity.Id); + List outputMessageParts = []; await foreach (var sseEvent in ParseSseEventsAsync(reader, cancellationToken)) { - eventCount++; - AskAiEvent? transformedEvent = null; + using var parseActivity = StreamTransformerActivitySource.StartActivity("parse_event"); + // parseActivity automatically inherits from Activity.Current (transform_stream) + AskAiEvent? transformedEvent; try { // Parse JSON once in base class @@ -206,39 +156,82 @@ protected virtual async Task ProcessStreamAsync(PipeReader reader, PipeWriter wr } catch (JsonException ex) { - jsonParseErrors++; Logger.LogError(ex, "Failed to parse JSON from SSE event for transformer {TransformerType}. EventType: {EventType}, Data: {Data}", GetType().Name, sseEvent.EventType, sseEvent.Data); - - // Add error event to activity for JSON parsing failures - _ = (activity?.AddEvent(new ActivityEvent("gen_ai.error", - timestamp: DateTimeOffset.UtcNow, - tags: - [ - new KeyValuePair("gen_ai.error.type", "JsonException"), - new KeyValuePair("gen_ai.error.message", ex.Message), - new KeyValuePair("gen_ai.transformer.type", GetType().Name), - new KeyValuePair("gen_ai.sse.event_type", sseEvent.EventType ?? "unknown"), - new KeyValuePair("gen_ai.sse.data", sseEvent.Data) - ]))); + throw; } - if (transformedEvent != null) + if (transformedEvent == null) + continue; + + // Set event type tag on parse_event activity + _ = parseActivity?.SetTag("ask_ai.event", transformedEvent.GetType().Name); + + switch (transformedEvent) { - // Update parent activity with conversation ID when we receive ConversationStart events - if (transformedEvent is AskAiEvent.ConversationStart conversationStart) - { - _ = (parentActivity?.SetTag("gen_ai.conversation.id", conversationStart.ConversationId)); - _ = (activity?.SetTag("gen_ai.conversation.id", conversationStart.ConversationId)); - } + case AskAiEvent.ConversationStart conversationStart: + { + _ = parentActivity?.SetTag("gen_ai.conversation.id", conversationStart.ConversationId); + _ = activity?.SetTag("gen_ai.conversation.id", conversationStart.ConversationId); + break; + } + case AskAiEvent.Reasoning reasoning: + { + outputMessageParts.Add(new MessagePart("reasoning", reasoning.Message ?? string.Empty)); + break; + } + case AskAiEvent.MessageChunk: + { + // Event type already tagged above + break; + } - await WriteEventAsync(transformedEvent, writer, cancellationToken); + case AskAiEvent.ErrorEvent errorEvent: + { + _ = activity?.SetStatus(ActivityStatusCode.Error, "AI provider error event"); + _ = activity?.SetTag("error.type", "AIProviderError"); + _ = activity?.SetTag("error.message", errorEvent.Message); + _ = parseActivity?.SetStatus(ActivityStatusCode.Error, errorEvent.Message); + break; + } + case AskAiEvent.ToolCall: + { + // Event type already tagged above + break; + } + case AskAiEvent.SearchToolCall searchToolCall: + { + _ = parseActivity?.SetTag("search.query", searchToolCall.SearchQuery); + break; + } + case AskAiEvent.ToolResult toolResult: + { + _ = parseActivity?.SetTag("tool.result_summary", toolResult.Result); + break; + } + case AskAiEvent.MessageComplete chunkComplete: + { + outputMessageParts.Add(new MessagePart("text", chunkComplete.FullContent)); + Logger.LogInformation("AskAI output message: {OutputMessage}", chunkComplete.FullContent); + break; + } + case AskAiEvent.ConversationEnd: + { + // Event type already tagged above + break; + } } + await WriteEventAsync(transformedEvent, writer, cancellationToken); } - // Set metrics on the activity using GenAI conventions - _ = (activity?.SetTag("gen_ai.response.token_count", eventCount)); - _ = (activity?.SetTag("gen_ai.response.error_count", jsonParseErrors)); + // Set output messages tag once after all events are processed + if (outputMessageParts.Count > 0) + { + var outputMessages = new OutputMessage("assistant", outputMessageParts.ToArray(), "stop"); + var outputMessagesJson = JsonSerializer.Serialize(outputMessages, ApiJsonContext.Default.OutputMessage); + _ = parentActivity?.SetTag("gen_ai.output.messages", outputMessagesJson); + _ = activity?.SetTag("gen_ai.output.messages", outputMessagesJson); + } } /// @@ -253,35 +246,17 @@ protected virtual async Task ProcessStreamAsync(PipeReader reader, PipeWriter wr /// /// Write a transformed event to the output stream /// - protected async Task WriteEventAsync(AskAiEvent? transformedEvent, PipeWriter writer, CancellationToken cancellationToken) + private async Task WriteEventAsync(AskAiEvent? transformedEvent, PipeWriter writer, CancellationToken cancellationToken) { if (transformedEvent == null) return; - - using var activity = StreamTransformerActivitySource.StartActivity("gen_ai.agent.token"); - - // Custom attributes for tracking our abstraction layer - _ = (activity?.SetTag("docs.ai.gateway", GetAgentProvider())); - _ = (activity?.SetTag("docs.ai.agent_name", GetAgentId())); - _ = (activity?.SetTag("gen_ai.response.token_type", transformedEvent.GetType().Name)); - try { - // Add GenAI completion event for each token/chunk - _ = (activity?.AddEvent(new ActivityEvent("gen_ai.content.completion", - timestamp: DateTimeOffset.UtcNow, - tags: - [ - new KeyValuePair("gen_ai.completion", JsonSerializer.Serialize(transformedEvent, AskAiEventJsonContext.Default.AskAiEvent)) - ]))); - // Serialize as base AskAiEvent type to include the type discriminator var json = JsonSerializer.Serialize(transformedEvent, AskAiEventJsonContext.Default.AskAiEvent); var sseData = $"data: {json}\n\n"; var bytes = Encoding.UTF8.GetBytes(sseData); - _ = (activity?.SetTag("gen_ai.response.token_size", bytes.Length)); - // Write to pipe and flush immediately for real-time streaming _ = await writer.WriteAsync(bytes, cancellationToken); _ = await writer.FlushAsync(cancellationToken); @@ -290,18 +265,6 @@ protected async Task WriteEventAsync(AskAiEvent? transformedEvent, PipeWriter wr { Logger.LogError(ex, "Error writing event to stream for transformer {TransformerType}. EventType: {EventType}", GetType().Name, transformedEvent.GetType().Name); - - // Add error event to activity - _ = (activity?.AddEvent(new ActivityEvent("gen_ai.error", - timestamp: DateTimeOffset.UtcNow, - tags: - [ - new KeyValuePair("gen_ai.error.type", ex.GetType().Name), - new KeyValuePair("gen_ai.error.message", ex.Message), - new KeyValuePair("gen_ai.transformer.type", GetType().Name), - new KeyValuePair("gen_ai.event.type", transformedEvent.GetType().Name) - ]))); - throw; // Re-throw to be handled by caller } } @@ -310,26 +273,17 @@ protected async Task WriteEventAsync(AskAiEvent? transformedEvent, PipeWriter wr /// Parse Server-Sent Events (SSE) from a PipeReader following the W3C SSE specification. /// This method handles the standard SSE format with event:, data:, and comment lines. /// - protected async IAsyncEnumerable ParseSseEventsAsync( + private static async IAsyncEnumerable ParseSseEventsAsync( PipeReader reader, [EnumeratorCancellation] CancellationToken cancellationToken) { - using var activity = StreamTransformerActivitySource.StartActivity("gen_ai.agent.parse"); - _ = (activity?.SetTag("gen_ai.agent.name", GetAgentId())); - _ = (activity?.SetTag("gen_ai.provider.name", GetAgentProvider())); - string? currentEvent = null; var dataBuilder = new StringBuilder(); - var eventsParsed = 0; - var readOperations = 0; - var totalBytesRead = 0L; while (!cancellationToken.IsCancellationRequested) { - readOperations++; var result = await reader.ReadAsync(cancellationToken); var buffer = result.Buffer; - totalBytesRead += buffer.Length; // Process all complete lines in the buffer while (TryReadLine(ref buffer, out var line)) @@ -340,24 +294,18 @@ protected async IAsyncEnumerable ParseSseEventsAsync( // Event type line if (line.StartsWith("event:", StringComparison.Ordinal)) - { - currentEvent = line.Substring(6).Trim(); - } + currentEvent = line[6..].Trim(); // Data line else if (line.StartsWith("data:", StringComparison.Ordinal)) - { - _ = dataBuilder.Append(line.Substring(5).Trim()); - } + _ = dataBuilder.Append(line[5..].Trim()); // Empty line - marks end of event else if (string.IsNullOrEmpty(line)) { - if (dataBuilder.Length > 0) - { - eventsParsed++; - yield return new SseEvent(currentEvent, dataBuilder.ToString()); - currentEvent = null; - _ = dataBuilder.Clear(); - } + if (dataBuilder.Length <= 0) + continue; + yield return new SseEvent(currentEvent, dataBuilder.ToString()); + currentEvent = null; + _ = dataBuilder.Clear(); } } @@ -365,22 +313,14 @@ protected async IAsyncEnumerable ParseSseEventsAsync( reader.AdvanceTo(buffer.Start, buffer.End); // Stop reading if there's no more data coming - if (result.IsCompleted) - { - // Yield any remaining event that hasn't been terminated with an empty line - if (dataBuilder.Length > 0) - { - eventsParsed++; - yield return new SseEvent(currentEvent, dataBuilder.ToString()); - } - break; - } - } + if (!result.IsCompleted) + continue; - // Set metrics on the activity using GenAI conventions - _ = (activity?.SetTag("gen_ai.response.token_count", eventsParsed)); - _ = (activity?.SetTag("gen_ai.request.input_size", totalBytesRead)); - _ = (activity?.SetTag("gen_ai.model.operation_count", readOperations)); + // Yield any remaining event that hasn't been terminated with an empty line + if (dataBuilder.Length > 0) + yield return new SseEvent(currentEvent, dataBuilder.ToString()); + break; + } } /// diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Elastic.Documentation.Api.Infrastructure.csproj b/src/api/Elastic.Documentation.Api.Infrastructure/Elastic.Documentation.Api.Infrastructure.csproj index e9b64a3b9..5d1c6c895 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Elastic.Documentation.Api.Infrastructure.csproj +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Elastic.Documentation.Api.Infrastructure.csproj @@ -19,6 +19,9 @@ + + + diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/OpenTelemetryExtensions.cs b/src/api/Elastic.Documentation.Api.Infrastructure/OpenTelemetryExtensions.cs new file mode 100644 index 000000000..c9651b03d --- /dev/null +++ b/src/api/Elastic.Documentation.Api.Infrastructure/OpenTelemetryExtensions.cs @@ -0,0 +1,52 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using Elastic.OpenTelemetry; +using Microsoft.Extensions.Hosting; +using OpenTelemetry; +using OpenTelemetry.Metrics; +using OpenTelemetry.Trace; + +namespace Elastic.Documentation.Api.Infrastructure; + +public static class OpenTelemetryExtensions +{ + /// + /// Configures Elastic OpenTelemetry (EDOT) for the Docs API. + /// Only enables if OTEL_EXPORTER_OTLP_ENDPOINT environment variable is set. + /// + /// The web application builder + /// The builder for chaining + public static TBuilder AddDocsApiOpenTelemetry( + this TBuilder builder) + where TBuilder : IHostApplicationBuilder + { + var options = new ElasticOpenTelemetryOptions + { + SkipOtlpExporter = true, // Disable OTLP exporter + SkipInstrumentationAssemblyScanning = true // Disable instrumentation assembly scanning for AOT + }; + + _ = builder.AddElasticOpenTelemetry(options, edotBuilder => + { + _ = edotBuilder + .WithElasticLogging() + .WithElasticTracing(tracing => + { + _ = tracing + .AddSource("Elastic.Documentation.Api.AskAi") + .AddSource("Elastic.Documentation.Api.StreamTransformer") + .AddAspNetCoreInstrumentation() + .AddHttpClientInstrumentation(); + }) + .WithElasticMetrics(metrics => + { + _ = metrics + .AddAspNetCoreInstrumentation() + .AddHttpClientInstrumentation(); + }); + }); + return builder; + } +} diff --git a/src/api/Elastic.Documentation.Api.Lambda/Program.cs b/src/api/Elastic.Documentation.Api.Lambda/Program.cs index c37e90269..a9f51dbda 100644 --- a/src/api/Elastic.Documentation.Api.Lambda/Program.cs +++ b/src/api/Elastic.Documentation.Api.Lambda/Program.cs @@ -8,49 +8,12 @@ using Elastic.Documentation.Api.Core.AskAi; using Elastic.Documentation.Api.Core.Search; using Elastic.Documentation.Api.Infrastructure; -using Elastic.Documentation.ServiceDefaults; -using OpenTelemetry; -using OpenTelemetry.Metrics; -using OpenTelemetry.Trace; try { - var process = System.Diagnostics.Process.GetCurrentProcess(); - Console.WriteLine($"Starting Lambda application... Memory: {process.WorkingSet64 / 1024 / 1024} MB"); - var builder = WebApplication.CreateSlimBuilder(args); - process.Refresh(); - Console.WriteLine($"WebApplication builder created. Memory: {process.WorkingSet64 / 1024 / 1024} MB"); - // Add logging configuration for Lambda - _ = builder.Services.AddElasticDocumentationLogging(LogLevel.Information); - process.Refresh(); - Console.WriteLine($"Logging configured. Memory: {process.WorkingSet64 / 1024 / 1024} MB"); - - _ = builder.AddElasticOpenTelemetry(edotBuilder => - { - _ = edotBuilder - .WithElasticTracing(tracing => - { - _ = tracing - .AddAspNetCoreInstrumentation() - .AddHttpClientInstrumentation() - .AddSource("Elastic.Documentation.Api.AskAi") - .AddSource("Elastic.Documentation.Api.StreamTransformer"); - }) - .WithElasticLogging() - .WithElasticMetrics(metrics => - { - _ = metrics - .AddAspNetCoreInstrumentation() - .AddHttpClientInstrumentation() - .AddProcessInstrumentation() - .AddRuntimeInstrumentation(); - }); - }); - - process.Refresh(); - Console.WriteLine($"Elastic OTel configured. Memory: {process.WorkingSet64 / 1024 / 1024} MB"); + _ = builder.AddDocsApiOpenTelemetry(); // If we are running in Lambda Web Adapter response_stream mode, configure Kestrel to listen on port 8080 // Otherwise, configure AWS Lambda hosting for API Gateway HTTP API @@ -69,21 +32,11 @@ _ = builder.Services.AddAWSLambdaHosting(LambdaEventSource.HttpApi, new SourceGeneratorLambdaJsonSerializer()); _ = builder.WebHost.UseKestrelHttpsConfiguration(); } - - process.Refresh(); - Console.WriteLine($"Kestrel configured to listen on port 8080. Memory: {process.WorkingSet64 / 1024 / 1024} MB"); - var environment = Environment.GetEnvironmentVariable("ENVIRONMENT"); - Console.WriteLine($"Environment: {environment}"); + Console.WriteLine($"Docs Environment: {environment}"); builder.Services.AddElasticDocsApiUsecases(environment); - process.Refresh(); - Console.WriteLine($"Elastic docs API use cases added. Memory: {process.WorkingSet64 / 1024 / 1024} MB"); - var app = builder.Build(); - process.Refresh(); - Console.WriteLine($"Application built successfully. Memory: {process.WorkingSet64 / 1024 / 1024} MB"); - var v1 = app.MapGroup("/docs/_api/v1"); v1.MapElasticDocsApiEndpoints(); Console.WriteLine("API endpoints mapped"); diff --git a/src/tooling/docs-builder/Http/DocumentationWebHost.cs b/src/tooling/docs-builder/Http/DocumentationWebHost.cs index dfc0a9aef..82f36367b 100644 --- a/src/tooling/docs-builder/Http/DocumentationWebHost.cs +++ b/src/tooling/docs-builder/Http/DocumentationWebHost.cs @@ -44,6 +44,7 @@ IConfigurationContext configurationContext _writeFileSystem = writeFs; var builder = WebApplication.CreateSlimBuilder(); _ = builder.AddDocumentationServiceDefaults(); + #if DEBUG builder.Services.AddElasticDocsApiUsecases("dev"); #endif diff --git a/src/tooling/docs-builder/docs-builder.csproj b/src/tooling/docs-builder/docs-builder.csproj index 432973fc4..a153355b2 100644 --- a/src/tooling/docs-builder/docs-builder.csproj +++ b/src/tooling/docs-builder/docs-builder.csproj @@ -23,6 +23,8 @@ + + From 2ecc35dcad5ee91e8a4b3448eb2e1ceb5e55c5f5 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Mon, 3 Nov 2025 23:32:25 +0100 Subject: [PATCH 02/14] Add comment --- .../OpenTelemetryExtensions.cs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/OpenTelemetryExtensions.cs b/src/api/Elastic.Documentation.Api.Infrastructure/OpenTelemetryExtensions.cs index c9651b03d..3588e8961 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/OpenTelemetryExtensions.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/OpenTelemetryExtensions.cs @@ -24,7 +24,11 @@ public static TBuilder AddDocsApiOpenTelemetry( { var options = new ElasticOpenTelemetryOptions { - SkipOtlpExporter = true, // Disable OTLP exporter + // TODO: I don't think we really want to set `SkipOtlpExporter=true`. + // But without it, EDOT is sending duplicated traces and spans to the OTLP endpoint. + // Needs investigation. + // *However*, this makes it work correctly. + SkipOtlpExporter = true, SkipInstrumentationAssemblyScanning = true // Disable instrumentation assembly scanning for AOT }; From ef762908c834b0b1af22e82e402ead85e303b4e9 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Mon, 3 Nov 2025 23:41:44 +0100 Subject: [PATCH 03/14] Fix naming --- .../AskAi/StreamingAiMessage.tsx | 9 +++- .../SearchOrAskAi/AskAi/useAskAi.ts | 46 ++++++++++++++++--- .../Adapters/AskAi/StreamTransformerBase.cs | 6 +-- 3 files changed, 50 insertions(+), 11 deletions(-) diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx index b1a9d08f2..0befdbe1d 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx @@ -48,7 +48,14 @@ export const StreamingAiMessage = ({ updateAiMessage(message.id, contentRef.current, 'complete') } }, - onError: () => { + onError: (error) => { + console.error('[AI Provider] Error in StreamingAiMessage:', { + messageId: message.id, + errorMessage: error.message, + errorStack: error.stack, + errorName: error.name, + fullError: error, + }) updateAiMessage( message.id, message.content || 'Error occurred', diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts index 340939103..41ef560b5 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts @@ -61,18 +61,44 @@ export const useAskAi = (props: Props): UseAskAiResponse => { const onMessage = useCallback( (sseEvent: EventSourceMessage) => { try { - // Parse and validate the canonical AskAiEvent format + // Parse JSON first const rawData = JSON.parse(sseEvent.data) - const askAiEvent = AskAiEventSchema.parse(rawData) - processMessage(askAiEvent) + // Use safeParse with reportInput to include input data in validation errors + const result = AskAiEventSchema.safeParse(rawData, { + reportInput: true, + }) + + if (!result.success) { + // Log detailed validation errors with input data + console.error('[AI Provider] Failed to parse SSE event:', { + eventId: sseEvent.id || 'unknown', + eventType: sseEvent.event || 'unknown', + rawEventData: sseEvent.data, + validationErrors: result.error.issues, + }) + throw new Error( + `Event validation failed: ${result.error.issues.map((e) => `${e.path.join('.')}: ${e.message}`).join('; ')}` + ) + } + + processMessage(result.data) } catch (error) { + // Handle JSON parsing errors or other unexpected errors + if (error instanceof Error && error.message.includes('Event validation failed')) { + // Already logged above, just re-throw + throw error + } + + // Log JSON parsing or other errors console.error('[AI Provider] Failed to parse SSE event:', { - eventData: sseEvent.data, - error: - error instanceof Error ? error.message : String(error), + eventId: sseEvent.id || 'unknown', + eventType: sseEvent.event || 'unknown', + rawEventData: sseEvent.data, + error: error instanceof Error ? error.message : String(error), + errorStack: error instanceof Error ? error.stack : undefined, }) - // Re-throw to trigger onError handler + throw new Error( `Event parsing failed: ${error instanceof Error ? error.message : String(error)}` ) @@ -86,6 +112,12 @@ export const useAskAi = (props: Props): UseAskAiResponse => { headers, onMessage, onError: (error) => { + console.error('[AI Provider] Error in useFetchEventSource:', { + errorMessage: error.message, + errorStack: error.stack, + errorName: error.name, + fullError: error, + }) setError(error) props.onError?.(error) }, diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs index 60eede7f0..93d5834a9 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs @@ -209,10 +209,10 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Acti _ = parseActivity?.SetTag("tool.result_summary", toolResult.Result); break; } - case AskAiEvent.MessageComplete chunkComplete: + case AskAiEvent.MessageComplete messageComplete: { - outputMessageParts.Add(new MessagePart("text", chunkComplete.FullContent)); - Logger.LogInformation("AskAI output message: {OutputMessage}", chunkComplete.FullContent); + outputMessageParts.Add(new MessagePart("text", messageComplete.FullContent)); + Logger.LogInformation("AskAI output message: {OutputMessage}", messageComplete.FullContent); break; } case AskAiEvent.ConversationEnd: From 78c74368d1fef61926d717f0105757e3dfdfcdf4 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Tue, 4 Nov 2025 00:12:35 +0100 Subject: [PATCH 04/14] Run prettier --- .../web-components/SearchOrAskAi/AskAi/useAskAi.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts index 41ef560b5..3020a9997 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts @@ -85,7 +85,10 @@ export const useAskAi = (props: Props): UseAskAiResponse => { processMessage(result.data) } catch (error) { // Handle JSON parsing errors or other unexpected errors - if (error instanceof Error && error.message.includes('Event validation failed')) { + if ( + error instanceof Error && + error.message.includes('Event validation failed') + ) { // Already logged above, just re-throw throw error } @@ -95,8 +98,10 @@ export const useAskAi = (props: Props): UseAskAiResponse => { eventId: sseEvent.id || 'unknown', eventType: sseEvent.event || 'unknown', rawEventData: sseEvent.data, - error: error instanceof Error ? error.message : String(error), - errorStack: error instanceof Error ? error.stack : undefined, + error: + error instanceof Error ? error.message : String(error), + errorStack: + error instanceof Error ? error.stack : undefined, }) throw new Error( From 7681f14a85f5a8aeb7e76f69c6f57a5a9fe77a4f Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Tue, 4 Nov 2025 10:19:17 +0100 Subject: [PATCH 05/14] Fix duplicate spans and other adjustments --- .../AskAi/AgentBuilderStreamTransformer.cs | 9 +++--- .../Adapters/AskAi/StreamTransformerBase.cs | 32 ++++++++++++------- .../OpenTelemetryExtensions.cs | 11 ++----- 3 files changed, 28 insertions(+), 24 deletions(-) diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs index 791f042d4..d8e6b1946 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs @@ -24,10 +24,6 @@ public class AgentBuilderStreamTransformer(ILogger + ParseErrorEvent(id, timestamp, json), + "conversation_id_set" when innerData.TryGetProperty("conversation_id", out var convId) => new AskAiEvent.ConversationStart(id, timestamp, convId.GetString()!), @@ -126,7 +125,7 @@ private static AskAiEvent ParseToolCallEvent(string id, long timestamp, JsonElem return new AskAiEvent.ToolCall(id, timestamp, toolCallId ?? id, toolId ?? "unknown", args); } - private static AskAiEvent.ErrorEvent ParseErrorEventFromRoot(string id, long timestamp, JsonElement root) + private static AskAiEvent.ErrorEvent ParseErrorEvent(string id, long timestamp, JsonElement root) { // Agent Builder sends: {"error":{"code":"...","message":"...","meta":{...}}} var errorMessage = root.TryGetProperty("error", out var errorProp) && diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs index 93d5834a9..b42612ca6 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs @@ -133,7 +133,7 @@ private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, Activi /// Stream processing result with metrics and captured output private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Activity? parentActivity, CancellationToken cancellationToken) { - using var activity = StreamTransformerActivitySource.StartActivity("transform_stream"); + using var activity = StreamTransformerActivitySource.StartActivity(nameof(ProcessStreamAsync)); if (parentActivity?.Id != null) _ = activity?.SetParentId(parentActivity.Id); @@ -141,9 +141,7 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Acti List outputMessageParts = []; await foreach (var sseEvent in ParseSseEventsAsync(reader, cancellationToken)) { - using var parseActivity = StreamTransformerActivitySource.StartActivity("parse_event"); - // parseActivity automatically inherits from Activity.Current (transform_stream) - + using var parseActivity = StreamTransformerActivitySource.StartActivity("AskAI Event"); AskAiEvent? transformedEvent; try { @@ -162,10 +160,15 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Acti } if (transformedEvent == null) + { + Logger.LogWarning("Transformed event is null for transformer {TransformerType}. Skipping event. EventType: {EventType}", + GetType().Name, sseEvent.EventType); continue; + } // Set event type tag on parse_event activity - _ = parseActivity?.SetTag("ask_ai.event", transformedEvent.GetType().Name); + _ = parseActivity?.SetTag("ask_ai.event.type", transformedEvent.GetType().Name); + _ = parseActivity?.SetTag("gen_ai.respone.id", transformedEvent.Id); switch (transformedEvent) { @@ -173,15 +176,18 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Acti { _ = parentActivity?.SetTag("gen_ai.conversation.id", conversationStart.ConversationId); _ = activity?.SetTag("gen_ai.conversation.id", conversationStart.ConversationId); + Logger.LogDebug("AskAI conversation started: {ConversationId}", conversationStart.ConversationId); break; } case AskAiEvent.Reasoning reasoning: { + Logger.LogDebug("AskAI reasoning: {ReasoningMessage}", reasoning.Message); outputMessageParts.Add(new MessagePart("reasoning", reasoning.Message ?? string.Empty)); break; } - case AskAiEvent.MessageChunk: + case AskAiEvent.MessageChunk messageChunk: { + Logger.LogDebug("AskAI message chunk: {ChunkContent}", messageChunk.Content); // Event type already tagged above break; } @@ -192,21 +198,24 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Acti _ = activity?.SetTag("error.type", "AIProviderError"); _ = activity?.SetTag("error.message", errorEvent.Message); _ = parseActivity?.SetStatus(ActivityStatusCode.Error, errorEvent.Message); + Logger.LogError("AskAI error event: {Message}", errorEvent.Message); break; } - case AskAiEvent.ToolCall: + case AskAiEvent.ToolCall toolCall: { // Event type already tagged above + Logger.LogDebug("AskAI tool call: {ToolCall}", toolCall.ToolName); break; } case AskAiEvent.SearchToolCall searchToolCall: { _ = parseActivity?.SetTag("search.query", searchToolCall.SearchQuery); + Logger.LogDebug("AskAI search tool call: {SearchQuery}", searchToolCall.SearchQuery); break; } case AskAiEvent.ToolResult toolResult: { - _ = parseActivity?.SetTag("tool.result_summary", toolResult.Result); + Logger.LogDebug("AskAI tool result: {ToolResult}", toolResult.Result); break; } case AskAiEvent.MessageComplete messageComplete: @@ -215,9 +224,9 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Acti Logger.LogInformation("AskAI output message: {OutputMessage}", messageComplete.FullContent); break; } - case AskAiEvent.ConversationEnd: + case AskAiEvent.ConversationEnd conversationEnd: { - // Event type already tagged above + Logger.LogDebug("AskAI conversation end: {ConversationId}", conversationEnd.Id); break; } } @@ -275,7 +284,8 @@ private async Task WriteEventAsync(AskAiEvent? transformedEvent, PipeWriter writ /// private static async IAsyncEnumerable ParseSseEventsAsync( PipeReader reader, - [EnumeratorCancellation] CancellationToken cancellationToken) + [EnumeratorCancellation] Cancel cancellationToken + ) { string? currentEvent = null; var dataBuilder = new StringBuilder(); diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/OpenTelemetryExtensions.cs b/src/api/Elastic.Documentation.Api.Infrastructure/OpenTelemetryExtensions.cs index 3588e8961..9f424bb44 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/OpenTelemetryExtensions.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/OpenTelemetryExtensions.cs @@ -24,19 +24,14 @@ public static TBuilder AddDocsApiOpenTelemetry( { var options = new ElasticOpenTelemetryOptions { - // TODO: I don't think we really want to set `SkipOtlpExporter=true`. - // But without it, EDOT is sending duplicated traces and spans to the OTLP endpoint. - // Needs investigation. - // *However*, this makes it work correctly. - SkipOtlpExporter = true, SkipInstrumentationAssemblyScanning = true // Disable instrumentation assembly scanning for AOT }; _ = builder.AddElasticOpenTelemetry(options, edotBuilder => { _ = edotBuilder - .WithElasticLogging() - .WithElasticTracing(tracing => + .WithLogging() + .WithTracing(tracing => { _ = tracing .AddSource("Elastic.Documentation.Api.AskAi") @@ -44,7 +39,7 @@ public static TBuilder AddDocsApiOpenTelemetry( .AddAspNetCoreInstrumentation() .AddHttpClientInstrumentation(); }) - .WithElasticMetrics(metrics => + .WithMetrics(metrics => { _ = metrics .AddAspNetCoreInstrumentation() From ea2935e9613151f8bcb2cac2a8e3fc2461dbc2b3 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Tue, 4 Nov 2025 11:29:08 +0100 Subject: [PATCH 06/14] Fix setting conversationId --- .../AskAi/AskAiUsecase.cs | 5 +++-- .../AskAi/IStreamTransformer.cs | 3 ++- .../AskAi/AgentBuilderStreamTransformer.cs | 2 +- .../AskAi/LlmGatewayStreamTransformer.cs | 11 +++++----- .../Adapters/AskAi/StreamTransformerBase.cs | 20 ++++++++++--------- 5 files changed, 23 insertions(+), 18 deletions(-) diff --git a/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs b/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs index 525aca2ec..245ae84fd 100644 --- a/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs +++ b/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs @@ -4,7 +4,6 @@ using System.Diagnostics; using System.Text.Json; -using Elastic.Documentation.Api.Core; using Microsoft.Extensions.Logging; namespace Elastic.Documentation.Api.Core.AskAi; @@ -23,6 +22,8 @@ public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx) _ = activity?.SetTag("gen_ai.operation.name", "chat"); _ = activity?.SetTag("gen_ai.provider.name", streamTransformer.AgentProvider); // agent-builder or llm-gateway _ = activity?.SetTag("gen_ai.agent.id", streamTransformer.AgentId); // docs-agent or docs_assistant + if (askAiRequest.ThreadId is not null) + _ = activity?.SetTag("gen_ai.conversation.id", askAiRequest.ThreadId); var inputMessages = new[] { new InputMessage("user", [new MessagePart("text", askAiRequest.Message)]) @@ -33,7 +34,7 @@ public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx) logger.LogInformation("Streaming AskAI response"); var rawStream = await askAiGateway.AskAi(askAiRequest, ctx); // The stream transformer will handle disposing the activity when streaming completes - var transformedStream = await streamTransformer.TransformAsync(rawStream, activity, ctx); + var transformedStream = await streamTransformer.TransformAsync(rawStream, askAiRequest.ThreadId, activity, ctx); return transformedStream; } } diff --git a/src/api/Elastic.Documentation.Api.Core/AskAi/IStreamTransformer.cs b/src/api/Elastic.Documentation.Api.Core/AskAi/IStreamTransformer.cs index b5b2455bd..097d17d98 100644 --- a/src/api/Elastic.Documentation.Api.Core/AskAi/IStreamTransformer.cs +++ b/src/api/Elastic.Documentation.Api.Core/AskAi/IStreamTransformer.cs @@ -23,8 +23,9 @@ public interface IStreamTransformer /// Transforms a raw SSE stream into a stream of AskAiEvent objects /// /// Raw SSE stream from gateway (Agent Builder, LLM Gateway, etc.) + /// Thread/conversation ID (if known) /// Parent activity to track the streaming operation (will be disposed when stream completes) /// Cancellation token /// Stream containing SSE-formatted AskAiEvent objects - Task TransformAsync(Stream rawStream, System.Diagnostics.Activity? parentActivity, CancellationToken cancellationToken = default); + Task TransformAsync(Stream rawStream, string? threadId, System.Diagnostics.Activity? parentActivity, CancellationToken cancellationToken = default); } diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs index d8e6b1946..38efb93d8 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs @@ -18,7 +18,7 @@ public class AgentBuilderStreamTransformer(ILogger AgentBuilderAskAiGateway.ModelName; protected override string GetAgentProvider() => AgentBuilderAskAiGateway.ProviderName; - protected override AskAiEvent? TransformJsonEvent(string? eventType, JsonElement json) + protected override AskAiEvent? TransformJsonEvent(string? threadId, string? eventType, JsonElement json) { var type = eventType ?? "message"; var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs index 2c2857696..f5b5a42e3 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs @@ -18,7 +18,7 @@ public class LlmGatewayStreamTransformer(ILogger lo { protected override string GetAgentId() => LlmGatewayAskAiGateway.ModelName; protected override string GetAgentProvider() => LlmGatewayAskAiGateway.ProviderName; - protected override AskAiEvent? TransformJsonEvent(string? eventType, JsonElement json) + protected override AskAiEvent? TransformJsonEvent(string? threadId, string? eventType, JsonElement json) { // LLM Gateway format: ["custom", {type: "...", ...}] if (json.ValueKind != JsonValueKind.Array || json.GetArrayLength() < 2) @@ -34,12 +34,13 @@ public class LlmGatewayStreamTransformer(ILogger lo var id = message.GetProperty("id").GetString()!; var messageData = message.GetProperty("data"); + // LLM gateway does not emit conversation start events with thread IDs + // so we create a synthetic conversation start event here + if (threadId is null) + return new AskAiEvent.ConversationStart(id, timestamp, Guid.NewGuid().ToString()); + return type switch { - "agent_start" => - // LLM Gateway doesn't provide conversation ID, so generate one - new AskAiEvent.ConversationStart(id, timestamp, Guid.NewGuid().ToString()), - "ai_message_chunk" when messageData.TryGetProperty("content", out var content) => new AskAiEvent.MessageChunk(id, timestamp, content.GetString()!), diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs index b42612ca6..3fd101187 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs @@ -51,7 +51,7 @@ public abstract class StreamTransformerBase(ILogger logger) : IStreamTransformer /// public string AgentProvider => GetAgentProvider(); - public Task TransformAsync(Stream rawStream, Activity? parentActivity, Cancel cancellationToken = default) + public Task TransformAsync(Stream rawStream, string? threadId, Activity? parentActivity, Cancel cancellationToken = default) { // Configure pipe for low-latency streaming var pipeOptions = new PipeOptions( @@ -70,7 +70,7 @@ public Task TransformAsync(Stream rawStream, Activity? parentActivity, C // Note: We intentionally don't await this task as we need to return the stream immediately // The pipe handles synchronization and backpressure between producer and consumer // Pass parent activity - it will be disposed when streaming completes - _ = ProcessPipeAsync(reader, pipe.Writer, parentActivity, cancellationToken); + _ = ProcessPipeAsync(reader, pipe.Writer, threadId, parentActivity, cancellationToken); // Return the read side of the pipe as a stream return Task.FromResult(pipe.Reader.AsStream()); @@ -80,13 +80,13 @@ public Task TransformAsync(Stream rawStream, Activity? parentActivity, C /// Process the pipe reader and write transformed events to the pipe writer. /// This runs concurrently with the consumer reading from the output stream. /// - private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, Activity? parentActivity, CancellationToken cancellationToken) + private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, string? threadId, Activity? parentActivity, CancellationToken cancellationToken) { try { try { - await ProcessStreamAsync(reader, writer, parentActivity, cancellationToken); + await ProcessStreamAsync(reader, writer, threadId, parentActivity, cancellationToken); } catch (OperationCanceledException ex) { @@ -131,7 +131,7 @@ private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, Activi /// Default implementation parses SSE events and JSON, then calls TransformJsonEvent. /// /// Stream processing result with metrics and captured output - private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Activity? parentActivity, CancellationToken cancellationToken) + private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, string? threadId, Activity? parentActivity, CancellationToken cancellationToken) { using var activity = StreamTransformerActivitySource.StartActivity(nameof(ProcessStreamAsync)); @@ -141,7 +141,6 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Acti List outputMessageParts = []; await foreach (var sseEvent in ParseSseEventsAsync(reader, cancellationToken)) { - using var parseActivity = StreamTransformerActivitySource.StartActivity("AskAI Event"); AskAiEvent? transformedEvent; try { @@ -150,7 +149,7 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Acti var root = doc.RootElement; // Subclass transforms JsonElement to AskAiEvent - transformedEvent = TransformJsonEvent(sseEvent.EventType, root); + transformedEvent = TransformJsonEvent(threadId, sseEvent.EventType, root); } catch (JsonException ex) { @@ -166,9 +165,11 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Acti continue; } + using var parseActivity = StreamTransformerActivitySource.StartActivity("AskAI Event"); + // Set event type tag on parse_event activity _ = parseActivity?.SetTag("ask_ai.event.type", transformedEvent.GetType().Name); - _ = parseActivity?.SetTag("gen_ai.respone.id", transformedEvent.Id); + _ = parseActivity?.SetTag("gen_ai.response.id", transformedEvent.Id); switch (transformedEvent) { @@ -247,10 +248,11 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Acti /// Transform a parsed JSON event into an AskAiEvent. /// Subclasses implement provider-specific transformation logic. /// + /// The conversation/thread ID, if available /// The SSE event type (from "event:" field), or null if not present /// The parsed JSON data from the "data:" field /// The transformed AskAiEvent, or null to skip this event - protected abstract AskAiEvent? TransformJsonEvent(string? eventType, JsonElement json); + protected abstract AskAiEvent? TransformJsonEvent(string? threadId, string? eventType, JsonElement json); /// /// Write a transformed event to the output stream From 0d2a09af9b5a803ad13a804a48b1de22dbbc72a1 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Tue, 4 Nov 2025 11:35:20 +0100 Subject: [PATCH 07/14] Change naming from threadId to conversationId --- .../SearchOrAskAi/AskAi/Chat.test.tsx | 8 ++++---- .../SearchOrAskAi/AskAi/ChatMessage.test.tsx | 10 +++++----- .../AskAi/StreamingAiMessage.tsx | 12 +++++------ .../SearchOrAskAi/AskAi/chat.store.test.ts | 2 +- .../SearchOrAskAi/AskAi/chat.store.ts | 20 +++++++++---------- .../SearchOrAskAi/AskAi/useAskAi.ts | 8 ++++---- .../AskAi/AskAiUsecase.cs | 8 ++++---- .../AskAi/IStreamTransformer.cs | 4 ++-- .../AskAi/AgentBuilderAskAiGateway.cs | 5 ++--- .../AskAi/AgentBuilderStreamTransformer.cs | 2 +- .../Adapters/AskAi/LlmGatewayAskAiGateway.cs | 2 +- .../AskAi/LlmGatewayStreamTransformer.cs | 4 ++-- .../Adapters/AskAi/StreamTransformerBase.cs | 16 +++++++-------- 13 files changed, 50 insertions(+), 51 deletions(-) diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/Chat.test.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/Chat.test.tsx index 2383034a8..70773b284 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/Chat.test.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/Chat.test.tsx @@ -94,14 +94,14 @@ describe('Chat Component', () => { id: '1', type: 'user' as const, content: 'What is Elasticsearch?', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), }, { id: '2', type: 'ai' as const, content: 'Elasticsearch is a search engine...', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), status: 'complete' as const, }, @@ -245,14 +245,14 @@ describe('Chat Component', () => { id: '1', type: 'user' as const, content: 'Question', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), }, { id: '2', type: 'ai' as const, content: 'Answer...', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), status: 'streaming' as const, }, diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.test.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.test.tsx index ba7b04b6b..5aa1ba5c3 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.test.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.test.tsx @@ -13,7 +13,7 @@ describe('ChatMessage Component', () => { id: '1', type: 'user', content: 'What is Elasticsearch?', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), } @@ -44,7 +44,7 @@ describe('ChatMessage Component', () => { id: '2', type: 'ai', content: 'Elasticsearch is a distributed search engine...', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), status: 'complete', } @@ -102,7 +102,7 @@ describe('ChatMessage Component', () => { id: '3', type: 'ai', content: 'Elasticsearch is...', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), status: 'streaming', } @@ -142,7 +142,7 @@ describe('ChatMessage Component', () => { id: '4', type: 'ai', content: 'Previous content...', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), status: 'error', } @@ -176,7 +176,7 @@ describe('ChatMessage Component', () => { id: '5', type: 'ai', content: '# Heading\n\n**Bold text** and *italic*', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), status: 'complete', } diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx index 0befdbe1d..7b526d34f 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx @@ -3,7 +3,7 @@ import { ChatMessage } from './ChatMessage' import { ChatMessage as ChatMessageType, useChatActions, - useThreadId, + useConversationId, } from './chat.store' import { useAskAi } from './useAskAi' import * as React from 'react' @@ -22,18 +22,18 @@ export const StreamingAiMessage = ({ updateAiMessage, hasMessageBeenSent, markMessageAsSent, - setThreadId, + setConversationId, } = useChatActions() - const threadId = useThreadId() + const conversationId = useConversationId() const contentRef = useRef('') const { events, sendQuestion } = useAskAi({ - threadId: threadId ?? undefined, + conversationId: conversationId ?? undefined, onEvent: (event) => { if (event.type === EventTypes.CONVERSATION_START) { // Capture conversationId from backend on first request - if (event.conversationId && !threadId) { - setThreadId(event.conversationId) + if (event.conversationId && !conversationId) { + setConversationId(event.conversationId) } } else if (event.type === EventTypes.MESSAGE_CHUNK) { contentRef.current += event.content diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.test.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.test.ts index feaded37c..a354c2434 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.test.ts +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.test.ts @@ -94,7 +94,7 @@ describe('chat.store', () => { // Verify fresh state expect(chatStore.getState().chatMessages).toHaveLength(0) - expect(chatStore.getState().threadId).toBeNull() + expect(chatStore.getState().conversationId).toBeNull() // Start new conversation act(() => { diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.ts index 4c4008f06..b85145c4e 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.ts +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.ts @@ -5,7 +5,7 @@ export interface ChatMessage { id: string type: 'user' | 'ai' content: string - threadId: string + conversationId: string timestamp: number status?: 'streaming' | 'complete' | 'error' question?: string // For AI messages, store the question @@ -16,7 +16,7 @@ const sentAiMessageIds = new Set() interface ChatState { chatMessages: ChatMessage[] - threadId: string | null + conversationId: string | null actions: { submitQuestion: (question: string) => void updateAiMessage: ( @@ -24,7 +24,7 @@ interface ChatState { content: string, status: ChatMessage['status'] ) => void - setThreadId: (threadId: string) => void + setConversationId: (conversationId: string) => void clearChat: () => void hasMessageBeenSent: (id: string) => boolean markMessageAsSent: (id: string) => void @@ -33,7 +33,7 @@ interface ChatState { export const chatStore = create((set) => ({ chatMessages: [], - threadId: null, // Start with null - will be set by backend on first request + conversationId: null, // Start with null - will be set by backend on first request actions: { submitQuestion: (question: string) => { set((state) => { @@ -41,7 +41,7 @@ export const chatStore = create((set) => ({ id: uuidv4(), type: 'user', content: question, - threadId: state.threadId ?? '', + conversationId: state.conversationId ?? '', timestamp: Date.now(), } @@ -50,7 +50,7 @@ export const chatStore = create((set) => ({ type: 'ai', content: '', question, - threadId: state.threadId ?? '', + conversationId: state.conversationId ?? '', timestamp: Date.now(), status: 'streaming', } @@ -77,13 +77,13 @@ export const chatStore = create((set) => ({ })) }, - setThreadId: (threadId: string) => { - set({ threadId }) + setConversationId: (conversationId: string) => { + set({ conversationId }) }, clearChat: () => { sentAiMessageIds.clear() - set({ chatMessages: [], threadId: null }) + set({ chatMessages: [], conversationId: null }) }, hasMessageBeenSent: (id: string) => sentAiMessageIds.has(id), @@ -95,5 +95,5 @@ export const chatStore = create((set) => ({ })) export const useChatMessages = () => chatStore((state) => state.chatMessages) -export const useThreadId = () => chatStore((state) => state.threadId) +export const useConversationId = () => chatStore((state) => state.conversationId) export const useChatActions = () => chatStore((state) => state.actions) diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts index 3020a9997..f37ded9c2 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts @@ -11,7 +11,7 @@ const MESSAGE_THROTTLE_MS = 25 // Throttle messages to prevent UI flooding export const AskAiRequestSchema = z.object({ message: z.string(), - threadId: z.string().optional(), + conversationId: z.string().optional(), }) export type AskAiRequest = z.infer @@ -26,7 +26,7 @@ export interface UseAskAiResponse { interface Props { onEvent?: (event: AskAiEvent) => void onError?: (error: Error) => void - threadId?: string + conversationId?: string } export const useAskAi = (props: Props): UseAskAiResponse => { @@ -136,7 +136,7 @@ export const useAskAi = (props: Props): UseAskAiResponse => { setEvents([]) clearQueue() lastSentQuestionRef.current = question - const payload = createAskAiRequest(question, props.threadId) + const payload = createAskAiRequest(question, props.conversationId) try { await sendMessage(payload) @@ -148,7 +148,7 @@ export const useAskAi = (props: Props): UseAskAiResponse => { } } }, - [props.threadId, sendMessage, abort, clearQueue] + [props.conversationId, sendMessage, abort, clearQueue] ) useEffect(() => { diff --git a/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs b/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs index 245ae84fd..5e7a45697 100644 --- a/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs +++ b/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs @@ -22,8 +22,8 @@ public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx) _ = activity?.SetTag("gen_ai.operation.name", "chat"); _ = activity?.SetTag("gen_ai.provider.name", streamTransformer.AgentProvider); // agent-builder or llm-gateway _ = activity?.SetTag("gen_ai.agent.id", streamTransformer.AgentId); // docs-agent or docs_assistant - if (askAiRequest.ThreadId is not null) - _ = activity?.SetTag("gen_ai.conversation.id", askAiRequest.ThreadId); + if (askAiRequest.ConversationId is not null) + _ = activity?.SetTag("gen_ai.conversation.id", askAiRequest.ConversationId); var inputMessages = new[] { new InputMessage("user", [new MessagePart("text", askAiRequest.Message)]) @@ -34,12 +34,12 @@ public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx) logger.LogInformation("Streaming AskAI response"); var rawStream = await askAiGateway.AskAi(askAiRequest, ctx); // The stream transformer will handle disposing the activity when streaming completes - var transformedStream = await streamTransformer.TransformAsync(rawStream, askAiRequest.ThreadId, activity, ctx); + var transformedStream = await streamTransformer.TransformAsync(rawStream, askAiRequest.ConversationId, activity, ctx); return transformedStream; } } -public record AskAiRequest(string Message, string? ThreadId) +public record AskAiRequest(string Message, string? ConversationId) { public static string SystemPrompt => """ diff --git a/src/api/Elastic.Documentation.Api.Core/AskAi/IStreamTransformer.cs b/src/api/Elastic.Documentation.Api.Core/AskAi/IStreamTransformer.cs index 097d17d98..56435df6d 100644 --- a/src/api/Elastic.Documentation.Api.Core/AskAi/IStreamTransformer.cs +++ b/src/api/Elastic.Documentation.Api.Core/AskAi/IStreamTransformer.cs @@ -23,9 +23,9 @@ public interface IStreamTransformer /// Transforms a raw SSE stream into a stream of AskAiEvent objects /// /// Raw SSE stream from gateway (Agent Builder, LLM Gateway, etc.) - /// Thread/conversation ID (if known) + /// Thread/conversation ID (if known) /// Parent activity to track the streaming operation (will be disposed when stream completes) /// Cancellation token /// Stream containing SSE-formatted AskAiEvent objects - Task TransformAsync(Stream rawStream, string? threadId, System.Diagnostics.Activity? parentActivity, CancellationToken cancellationToken = default); + Task TransformAsync(Stream rawStream, string? conversationId, System.Diagnostics.Activity? parentActivity, CancellationToken cancellationToken = default); } diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs index ab5ce75c5..cdf53b3c9 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs @@ -26,14 +26,13 @@ public class AgentBuilderAskAiGateway(HttpClient httpClient, IParameterProvider public const string ProviderName = "agent-builder"; public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx = default) { - // Only include conversation_id if threadId is provided (subsequent requests) var agentBuilderPayload = new AgentBuilderPayload( askAiRequest.Message, "docs-agent", - askAiRequest.ThreadId); + askAiRequest.ConversationId); var requestBody = JsonSerializer.Serialize(agentBuilderPayload, AgentBuilderContext.Default.AgentBuilderPayload); - logger.LogInformation("Sending to Agent Builder with conversation_id: {ConversationId}", askAiRequest.ThreadId ?? "(null - first request)"); + logger.LogInformation("Sending to Agent Builder with conversation_id: {ConversationId}", askAiRequest.ConversationId ?? "(null - first request)"); var kibanaUrl = await parameterProvider.GetParam("docs-kibana-url", false, ctx); var kibanaApiKey = await parameterProvider.GetParam("docs-kibana-apikey", true, ctx); diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs index 38efb93d8..1b5e3d1a3 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs @@ -18,7 +18,7 @@ public class AgentBuilderStreamTransformer(ILogger AgentBuilderAskAiGateway.ModelName; protected override string GetAgentProvider() => AgentBuilderAskAiGateway.ProviderName; - protected override AskAiEvent? TransformJsonEvent(string? threadId, string? eventType, JsonElement json) + protected override AskAiEvent? TransformJsonEvent(string? conversationId, string? eventType, JsonElement json) { var type = eventType ?? "message"; var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs index 86b8d4a39..8626af230 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs @@ -68,7 +68,7 @@ public static LlmGatewayRequest CreateFromRequest(AskAiRequest request) => new ChatInput("user", AskAiRequest.SystemPrompt), new ChatInput("user", request.Message) ], - ThreadId: request.ThreadId ?? "elastic-docs-" + Guid.NewGuid() + ThreadId: request.ConversationId ?? "elastic-docs-" + Guid.NewGuid() ); } diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs index f5b5a42e3..ff5217e4e 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs @@ -18,7 +18,7 @@ public class LlmGatewayStreamTransformer(ILogger lo { protected override string GetAgentId() => LlmGatewayAskAiGateway.ModelName; protected override string GetAgentProvider() => LlmGatewayAskAiGateway.ProviderName; - protected override AskAiEvent? TransformJsonEvent(string? threadId, string? eventType, JsonElement json) + protected override AskAiEvent? TransformJsonEvent(string? conversationId, string? eventType, JsonElement json) { // LLM Gateway format: ["custom", {type: "...", ...}] if (json.ValueKind != JsonValueKind.Array || json.GetArrayLength() < 2) @@ -36,7 +36,7 @@ public class LlmGatewayStreamTransformer(ILogger lo // LLM gateway does not emit conversation start events with thread IDs // so we create a synthetic conversation start event here - if (threadId is null) + if (conversationId is null) return new AskAiEvent.ConversationStart(id, timestamp, Guid.NewGuid().ToString()); return type switch diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs index 3fd101187..94029b874 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs @@ -51,7 +51,7 @@ public abstract class StreamTransformerBase(ILogger logger) : IStreamTransformer /// public string AgentProvider => GetAgentProvider(); - public Task TransformAsync(Stream rawStream, string? threadId, Activity? parentActivity, Cancel cancellationToken = default) + public Task TransformAsync(Stream rawStream, string? conversationId, Activity? parentActivity, Cancel cancellationToken = default) { // Configure pipe for low-latency streaming var pipeOptions = new PipeOptions( @@ -70,7 +70,7 @@ public Task TransformAsync(Stream rawStream, string? threadId, Activity? // Note: We intentionally don't await this task as we need to return the stream immediately // The pipe handles synchronization and backpressure between producer and consumer // Pass parent activity - it will be disposed when streaming completes - _ = ProcessPipeAsync(reader, pipe.Writer, threadId, parentActivity, cancellationToken); + _ = ProcessPipeAsync(reader, pipe.Writer, conversationId, parentActivity, cancellationToken); // Return the read side of the pipe as a stream return Task.FromResult(pipe.Reader.AsStream()); @@ -80,13 +80,13 @@ public Task TransformAsync(Stream rawStream, string? threadId, Activity? /// Process the pipe reader and write transformed events to the pipe writer. /// This runs concurrently with the consumer reading from the output stream. /// - private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, string? threadId, Activity? parentActivity, CancellationToken cancellationToken) + private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, string? conversationId, Activity? parentActivity, CancellationToken cancellationToken) { try { try { - await ProcessStreamAsync(reader, writer, threadId, parentActivity, cancellationToken); + await ProcessStreamAsync(reader, writer, conversationId, parentActivity, cancellationToken); } catch (OperationCanceledException ex) { @@ -131,7 +131,7 @@ private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, string /// Default implementation parses SSE events and JSON, then calls TransformJsonEvent. /// /// Stream processing result with metrics and captured output - private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, string? threadId, Activity? parentActivity, CancellationToken cancellationToken) + private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, string? conversationId, Activity? parentActivity, CancellationToken cancellationToken) { using var activity = StreamTransformerActivitySource.StartActivity(nameof(ProcessStreamAsync)); @@ -149,7 +149,7 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, stri var root = doc.RootElement; // Subclass transforms JsonElement to AskAiEvent - transformedEvent = TransformJsonEvent(threadId, sseEvent.EventType, root); + transformedEvent = TransformJsonEvent(conversationId, sseEvent.EventType, root); } catch (JsonException ex) { @@ -248,11 +248,11 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, stri /// Transform a parsed JSON event into an AskAiEvent. /// Subclasses implement provider-specific transformation logic. /// - /// The conversation/thread ID, if available + /// The conversation/thread ID, if available /// The SSE event type (from "event:" field), or null if not present /// The parsed JSON data from the "data:" field /// The transformed AskAiEvent, or null to skip this event - protected abstract AskAiEvent? TransformJsonEvent(string? threadId, string? eventType, JsonElement json); + protected abstract AskAiEvent? TransformJsonEvent(string? conversationId, string? eventType, JsonElement json); /// /// Write a transformed event to the output stream From 075c8c52a21697e1d856eff25311400bf6aa602a Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Tue, 4 Nov 2025 22:20:40 +0100 Subject: [PATCH 08/14] formatting --- .../web-components/SearchOrAskAi/AskAi/chat.store.ts | 3 ++- .../Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts | 5 ++++- .../Adapters/AskAi/LlmGatewayStreamTransformer.cs | 8 -------- .../Adapters/AskAi/StreamTransformerFactory.cs | 4 ++-- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.ts index b85145c4e..7ca915b9a 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.ts +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.ts @@ -95,5 +95,6 @@ export const chatStore = create((set) => ({ })) export const useChatMessages = () => chatStore((state) => state.chatMessages) -export const useConversationId = () => chatStore((state) => state.conversationId) +export const useConversationId = () => + chatStore((state) => state.conversationId) export const useChatActions = () => chatStore((state) => state.actions) diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts index f37ded9c2..319234b5b 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts @@ -136,7 +136,10 @@ export const useAskAi = (props: Props): UseAskAiResponse => { setEvents([]) clearQueue() lastSentQuestionRef.current = question - const payload = createAskAiRequest(question, props.conversationId) + const payload = createAskAiRequest( + question, + props.conversationId + ) try { await sendMessage(payload) diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs index ff5217e4e..2e150bba5 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs @@ -2,9 +2,6 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information -using System.Buffers; -using System.IO.Pipelines; -using System.Text; using System.Text.Json; using Elastic.Documentation.Api.Core.AskAi; using Microsoft.Extensions.Logging; @@ -34,11 +31,6 @@ public class LlmGatewayStreamTransformer(ILogger lo var id = message.GetProperty("id").GetString()!; var messageData = message.GetProperty("data"); - // LLM gateway does not emit conversation start events with thread IDs - // so we create a synthetic conversation start event here - if (conversationId is null) - return new AskAiEvent.ConversationStart(id, timestamp, Guid.NewGuid().ToString()); - return type switch { "ai_message_chunk" when messageData.TryGetProperty("content", out var content) => diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerFactory.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerFactory.cs index 049bdd27c..4d9937871 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerFactory.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerFactory.cs @@ -39,9 +39,9 @@ private IStreamTransformer GetTransformer() public string AgentId => GetTransformer().AgentId; public string AgentProvider => GetTransformer().AgentProvider; - public async Task TransformAsync(Stream rawStream, System.Diagnostics.Activity? parentActivity, CancellationToken cancellationToken = default) + public async Task TransformAsync(Stream rawStream, string? conversationId, System.Diagnostics.Activity? parentActivity, Cancel cancellationToken = default) { var transformer = GetTransformer(); - return await transformer.TransformAsync(rawStream, parentActivity, cancellationToken); + return await transformer.TransformAsync(rawStream, conversationId, parentActivity, cancellationToken); } } From 986c816f6811dd5992c37eecc32dcefefd81b48a Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Tue, 4 Nov 2025 23:02:01 +0100 Subject: [PATCH 09/14] Fix message rendering --- .../Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.tsx index 720d20854..56497f4fb 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.tsx @@ -65,7 +65,7 @@ interface ChatMessageProps { const getAccumulatedContent = (messages: AskAiEvent[]) => { return messages - .filter((m) => m.type === 'chunk') + .filter((m) => m.type === EventTypes.MESSAGE_CHUNK) .map((m) => m.content) .join('') } From 5d92861983a7799eb1fcaab13bdc59b9dc1969af Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Tue, 4 Nov 2025 23:07:47 +0100 Subject: [PATCH 10/14] Simplify streaming message --- .../SearchOrAskAi/AskAi/ChatMessage.tsx | 13 +++---------- .../SearchOrAskAi/AskAi/StreamingAiMessage.tsx | 14 +++++++------- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.tsx index 56497f4fb..a026415c0 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.tsx @@ -63,13 +63,6 @@ interface ChatMessageProps { onRetry?: () => void } -const getAccumulatedContent = (messages: AskAiEvent[]) => { - return messages - .filter((m) => m.type === EventTypes.MESSAGE_CHUNK) - .map((m) => m.content) - .join('') -} - const splitContentAndReferences = ( content: string ): { mainContent: string; referencesJson: string | null } => { @@ -279,9 +272,9 @@ export const ChatMessage = ({ ) } - const content = - streamingContent || - (events.length > 0 ? getAccumulatedContent(events) : message.content) + // Use streamingContent during streaming, otherwise use message.content from store + // message.content is updated atomically with status when CONVERSATION_END arrives + const content = streamingContent || message.content const hasError = message.status === 'error' || !!error diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx index 7b526d34f..05a1abbc5 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx @@ -6,7 +6,6 @@ import { useConversationId, } from './chat.store' import { useAskAi } from './useAskAi' -import * as React from 'react' import { useEffect, useRef } from 'react' interface StreamingAiMessageProps { @@ -45,7 +44,7 @@ export const StreamingAiMessage = ({ 'error' ) } else if (event.type === EventTypes.CONVERSATION_END) { - updateAiMessage(message.id, contentRef.current, 'complete') + updateAiMessage(message.id, message.content || contentRef.current, 'complete') } }, onError: (error) => { @@ -85,15 +84,16 @@ export const StreamingAiMessage = ({ markMessageAsSent, ]) + // Always use contentRef.current if it has content (regardless of status) + // This way we don't need to save to message.content and can just use streamingContent + const streamingContentToPass = + isLast && contentRef.current ? contentRef.current : undefined + return ( ) } From e7e3b0a285c71a5f4b6879cbeff50de9f1740687 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Wed, 5 Nov 2025 08:16:01 +0100 Subject: [PATCH 11/14] Run prettier --- .../SearchOrAskAi/AskAi/StreamingAiMessage.tsx | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx index 05a1abbc5..e90ad5547 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx @@ -44,7 +44,11 @@ export const StreamingAiMessage = ({ 'error' ) } else if (event.type === EventTypes.CONVERSATION_END) { - updateAiMessage(message.id, message.content || contentRef.current, 'complete') + updateAiMessage( + message.id, + message.content || contentRef.current, + 'complete' + ) } }, onError: (error) => { From b8e68936d103a680a49e34bb0d287b3a844e1dba Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Wed, 5 Nov 2025 09:51:40 +0100 Subject: [PATCH 12/14] Move StreamTransformerBase to core package --- .../AskAi/StreamTransformerBase.cs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) rename src/api/{Elastic.Documentation.Api.Infrastructure/Adapters => Elastic.Documentation.Api.Core}/AskAi/StreamTransformerBase.cs (98%) diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs b/src/api/Elastic.Documentation.Api.Core/AskAi/StreamTransformerBase.cs similarity index 98% rename from src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs rename to src/api/Elastic.Documentation.Api.Core/AskAi/StreamTransformerBase.cs index 94029b874..1a2ca22b7 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs +++ b/src/api/Elastic.Documentation.Api.Core/AskAi/StreamTransformerBase.cs @@ -8,11 +8,9 @@ using System.Runtime.CompilerServices; using System.Text; using System.Text.Json; -using Elastic.Documentation.Api.Core; -using Elastic.Documentation.Api.Core.AskAi; using Microsoft.Extensions.Logging; -namespace Elastic.Documentation.Api.Infrastructure.Adapters.AskAi; +namespace Elastic.Documentation.Api.Core.AskAi; /// /// Represents a parsed Server-Sent Event (SSE) From e563ac1bda97a9bddd8754e7c077e75418dbdbab Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Wed, 5 Nov 2025 10:00:13 +0100 Subject: [PATCH 13/14] Combine aiProviderStore and chat.store --- .../SearchOrAskAi/AskAi/AiProviderSelector.tsx | 9 ++++----- .../SearchOrAskAi/AskAi/aiProviderResolver.ts | 6 ------ .../SearchOrAskAi/AskAi/aiProviderStore.ts | 18 ------------------ .../SearchOrAskAi/AskAi/chat.store.ts | 10 ++++++++++ .../SearchOrAskAi/AskAi/useAskAi.ts | 2 +- 5 files changed, 15 insertions(+), 30 deletions(-) delete mode 100644 src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/aiProviderResolver.ts delete mode 100644 src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/aiProviderStore.ts diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/AiProviderSelector.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/AiProviderSelector.tsx index 1a2e720d2..ece8fa231 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/AiProviderSelector.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/AiProviderSelector.tsx @@ -1,5 +1,5 @@ /** @jsxImportSource @emotion/react */ -import { useAiProviderStore } from './aiProviderStore' +import { useChatActions, useAiProvider, type AiProvider } from './chat.store' import { EuiRadioGroup } from '@elastic/eui' import type { EuiRadioGroupOption } from '@elastic/eui' import { css } from '@emotion/react' @@ -22,16 +22,15 @@ const options: EuiRadioGroupOption[] = [ ] export const AiProviderSelector = () => { - const { provider, setProvider } = useAiProviderStore() + const provider = useAiProvider() + const { setAiProvider } = useChatActions() return (
- setProvider(id as 'AgentBuilder' | 'LlmGateway') - } + onChange={(id) => setAiProvider(id as AiProvider)} name="aiProvider" legend={{ children: 'AI Provider', diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/aiProviderResolver.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/aiProviderResolver.ts deleted file mode 100644 index 0b7d4f12c..000000000 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/aiProviderResolver.ts +++ /dev/null @@ -1,6 +0,0 @@ -/** - * AI Provider selection - user-controlled via UI - * This file is kept for backwards compatibility but now just exports the type - */ - -export type AiProvider = 'AgentBuilder' | 'LlmGateway' diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/aiProviderStore.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/aiProviderStore.ts deleted file mode 100644 index ef1902b6f..000000000 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/aiProviderStore.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { create } from 'zustand' - -type AiProvider = 'AgentBuilder' | 'LlmGateway' - -interface AiProviderState { - provider: AiProvider - setProvider: (provider: AiProvider) => void -} - -export const useAiProviderStore = create((set) => ({ - provider: 'LlmGateway', // Default to LLM Gateway - setProvider: (provider: AiProvider) => { - console.log(`[AI Provider] Switched to ${provider}`) - set({ provider }) - }, -})) - -export const useAiProvider = () => useAiProviderStore((state) => state.provider) diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.ts index 7ca915b9a..dcf128117 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.ts +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.ts @@ -1,6 +1,8 @@ import { v4 as uuidv4 } from 'uuid' import { create } from 'zustand/react' +export type AiProvider = 'AgentBuilder' | 'LlmGateway' + export interface ChatMessage { id: string type: 'user' | 'ai' @@ -17,6 +19,7 @@ const sentAiMessageIds = new Set() interface ChatState { chatMessages: ChatMessage[] conversationId: string | null + aiProvider: AiProvider actions: { submitQuestion: (question: string) => void updateAiMessage: ( @@ -25,6 +28,7 @@ interface ChatState { status: ChatMessage['status'] ) => void setConversationId: (conversationId: string) => void + setAiProvider: (provider: AiProvider) => void clearChat: () => void hasMessageBeenSent: (id: string) => boolean markMessageAsSent: (id: string) => void @@ -34,6 +38,7 @@ interface ChatState { export const chatStore = create((set) => ({ chatMessages: [], conversationId: null, // Start with null - will be set by backend on first request + aiProvider: 'LlmGateway', // Default to LLM Gateway actions: { submitQuestion: (question: string) => { set((state) => { @@ -81,6 +86,10 @@ export const chatStore = create((set) => ({ set({ conversationId }) }, + setAiProvider: (provider: AiProvider) => { + set({ aiProvider: provider }) + }, + clearChat: () => { sentAiMessageIds.clear() set({ chatMessages: [], conversationId: null }) @@ -97,4 +106,5 @@ export const chatStore = create((set) => ({ export const useChatMessages = () => chatStore((state) => state.chatMessages) export const useConversationId = () => chatStore((state) => state.conversationId) +export const useAiProvider = () => chatStore((state) => state.aiProvider) export const useChatActions = () => chatStore((state) => state.actions) diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts index 319234b5b..9335d1222 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts @@ -1,5 +1,5 @@ import { AskAiEvent, AskAiEventSchema } from './AskAiEvent' -import { useAiProvider } from './aiProviderStore' +import { useAiProvider } from './chat.store' import { useFetchEventSource } from './useFetchEventSource' import { useMessageThrottling } from './useMessageThrottling' import { EventSourceMessage } from '@microsoft/fetch-event-source' From ae267947aaa44d5303098d5d6a6c7072804c8476 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Wed, 5 Nov 2025 10:16:09 +0100 Subject: [PATCH 14/14] Fix tests --- src/Elastic.Documentation.Site/Assets/styles.css | 2 +- .../Assets/web-components/SearchOrAskAi/AskAi/Chat.test.tsx | 2 ++ .../Assets/web-components/SearchOrAskAi/Search/Search.test.tsx | 2 ++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Elastic.Documentation.Site/Assets/styles.css b/src/Elastic.Documentation.Site/Assets/styles.css index 298ca1a84..fb3bbf8a6 100644 --- a/src/Elastic.Documentation.Site/Assets/styles.css +++ b/src/Elastic.Documentation.Site/Assets/styles.css @@ -1,5 +1,5 @@ @import 'tailwindcss'; -@config "../tailwind.config.js"; +@config '../tailwind.config.js'; @import './fonts.css'; @import './theme.css'; diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/Chat.test.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/Chat.test.tsx index 70773b284..6171b8f26 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/Chat.test.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/Chat.test.tsx @@ -9,9 +9,11 @@ jest.mock('./chat.store', () => ({ getState: jest.fn(), }, useChatMessages: jest.fn(() => []), + useAiProvider: jest.fn(() => 'LlmGateway'), useChatActions: jest.fn(() => ({ submitQuestion: jest.fn(), clearChat: jest.fn(), + setAiProvider: jest.fn(), })), })) diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/Search/Search.test.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/Search/Search.test.tsx index 56d509d3d..734d31e8a 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/Search/Search.test.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/Search/Search.test.tsx @@ -26,6 +26,7 @@ jest.mock('../AskAi/chat.store', () => ({ useChatActions: jest.fn(() => ({ submitQuestion: jest.fn(), clearChat: jest.fn(), + setAiProvider: jest.fn(), })), })) @@ -66,6 +67,7 @@ describe('Search Component', () => { mockUseChatActions.mockReturnValue({ submitQuestion: mockSubmitQuestion, clearChat: mockClearChat, + setAiProvider: jest.fn(), }) mockUseModalActions.mockReturnValue({ setModalMode: mockSetModalMode,