Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<PackageVersion Include="AWSSDK.SQS" Version="4.0.2" />
<PackageVersion Include="AWSSDK.S3" Version="4.0.7.14" />
<PackageVersion Include="Elastic.OpenTelemetry" Version="1.1.0" />
<PackageVersion Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.12.0" />
<PackageVersion Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.13.0" />
<PackageVersion Include="KubernetesClient" Version="17.0.14" />
<PackageVersion Include="Elastic.Aspire.Hosting.Elasticsearch" Version="9.3.0" />
<PackageVersion Include="Elastic.Clients.Elasticsearch" Version="9.1.4" />
Expand Down Expand Up @@ -76,10 +76,10 @@
<ItemGroup>
<PackageVersion Include="Microsoft.Extensions.Http.Resilience" Version="9.7.0" />
<PackageVersion Include="Microsoft.Extensions.ServiceDiscovery" Version="9.4.0" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.12.0" />
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.12.0" />
<PackageVersion Include="OpenTelemetry.Instrumentation.Http" Version="1.12.0" />
<PackageVersion Include="OpenTelemetry.Instrumentation.Runtime" Version="1.12.0" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.13.1" />
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.13.1" />
<PackageVersion Include="OpenTelemetry.Instrumentation.Http" Version="1.13.0" />
<PackageVersion Include="OpenTelemetry.Instrumentation.Runtime" Version="1.13.0" />
</ItemGroup>
<!-- Test packages -->
<ItemGroup>
Expand All @@ -99,4 +99,4 @@
</PackageVersion>
<PackageVersion Include="xunit.v3" Version="2.0.2" />
</ItemGroup>
</Project>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import * as z from 'zod'
// Event type constants for type-safe referencing
export const EventTypes = {
CONVERSATION_START: 'conversation_start',
CHUNK: 'chunk',
CHUNK_COMPLETE: 'chunk_complete',
MESSAGE_CHUNK: 'message_chunk',
MESSAGE_COMPLETE: 'message_complete',
SEARCH_TOOL_CALL: 'search_tool_call',
TOOL_CALL: 'tool_call',
TOOL_RESULT: 'tool_result',
Expand All @@ -23,14 +23,14 @@ export const ConversationStartEventSchema = z.object({
})

export const ChunkEventSchema = z.object({
type: z.literal(EventTypes.CHUNK),
type: z.literal(EventTypes.MESSAGE_CHUNK),
id: z.string(),
timestamp: z.number(),
content: z.string(),
})

export const ChunkCompleteEventSchema = z.object({
type: z.literal(EventTypes.CHUNK_COMPLETE),
type: z.literal(EventTypes.MESSAGE_COMPLETE),
id: z.string(),
timestamp: z.number(),
fullContent: z.string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ const computeAiStatus = (
m.type === EventTypes.SEARCH_TOOL_CALL ||
m.type === EventTypes.TOOL_CALL ||
m.type === EventTypes.TOOL_RESULT ||
m.type === EventTypes.CHUNK
m.type === EventTypes.MESSAGE_CHUNK
)
.sort((a, b) => a.timestamp - b.timestamp)

Expand All @@ -166,9 +166,9 @@ const computeAiStatus = (
case EventTypes.TOOL_RESULT:
return STATUS_MESSAGES.ANALYZING

case EventTypes.CHUNK: {
case EventTypes.MESSAGE_CHUNK: {
const allContent = events
.filter((m) => m.type === EventTypes.CHUNK)
.filter((m) => m.type === EventTypes.MESSAGE_CHUNK)
.map((m) => m.content)
.join('')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export const StreamingAiMessage = ({
if (event.conversationId && !threadId) {
setThreadId(event.conversationId)
}
} else if (event.type === EventTypes.CHUNK) {
} else if (event.type === EventTypes.MESSAGE_CHUNK) {
contentRef.current += event.content
} else if (event.type === EventTypes.ERROR) {
// Handle error events from the stream
Expand Down
12 changes: 6 additions & 6 deletions src/api/Elastic.Documentation.Api.Core/AskAi/AskAiEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ namespace Elastic.Documentation.Api.Core.AskAi;
/// </summary>
[JsonPolymorphic(TypeDiscriminatorPropertyName = "type")]
[JsonDerivedType(typeof(ConversationStart), typeDiscriminator: "conversation_start")]
[JsonDerivedType(typeof(Chunk), typeDiscriminator: "chunk")]
[JsonDerivedType(typeof(ChunkComplete), typeDiscriminator: "chunk_complete")]
[JsonDerivedType(typeof(MessageChunk), typeDiscriminator: "message_chunk")]
[JsonDerivedType(typeof(MessageComplete), typeDiscriminator: "message_complete")]
[JsonDerivedType(typeof(SearchToolCall), typeDiscriminator: "search_tool_call")]
[JsonDerivedType(typeof(ToolCall), typeDiscriminator: "tool_call")]
[JsonDerivedType(typeof(ToolResult), typeDiscriminator: "tool_result")]
Expand All @@ -33,7 +33,7 @@ string ConversationId
/// <summary>
/// Streaming text chunk from AI
/// </summary>
public sealed record Chunk(
public sealed record MessageChunk(
string Id,
long Timestamp,
string Content
Expand All @@ -42,7 +42,7 @@ string Content
/// <summary>
/// Complete message when streaming is done
/// </summary>
public sealed record ChunkComplete(
public sealed record MessageComplete(
string Id,
long Timestamp,
string FullContent
Expand Down Expand Up @@ -111,8 +111,8 @@ string Message
/// </summary>
[JsonSerializable(typeof(AskAiEvent))]
[JsonSerializable(typeof(AskAiEvent.ConversationStart))]
[JsonSerializable(typeof(AskAiEvent.Chunk))]
[JsonSerializable(typeof(AskAiEvent.ChunkComplete))]
[JsonSerializable(typeof(AskAiEvent.MessageChunk))]
[JsonSerializable(typeof(AskAiEvent.MessageComplete))]
[JsonSerializable(typeof(AskAiEvent.SearchToolCall))]
[JsonSerializable(typeof(AskAiEvent.ToolCall))]
[JsonSerializable(typeof(AskAiEvent.ToolResult))]
Expand Down
47 changes: 15 additions & 32 deletions src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// See the LICENSE file in the project root for more information

using System.Diagnostics;
using System.Text.Json;
using Elastic.Documentation.Api.Core;
using Microsoft.Extensions.Logging;

namespace Elastic.Documentation.Api.Core.AskAi;
Expand All @@ -16,41 +18,22 @@ public class AskAiUsecase(

public async Task<Stream> AskAi(AskAiRequest askAiRequest, Cancel ctx)
{
// Start activity for the chat request - DO NOT use 'using' because the stream is consumed later
// The activity will be passed to the transformer which will dispose it when the stream completes
var activity = AskAiActivitySource.StartActivity("chat", ActivityKind.Client);

// Generate a correlation ID for tracking if this is a new conversation
// For first messages (no ThreadId), generate a temporary ID that will be updated when the provider responds
var correlationId = askAiRequest.ThreadId ?? $"temp-{Guid.NewGuid()}";

// Set GenAI semantic convention attributes
_ = (activity?.SetTag("gen_ai.operation.name", "chat"));
_ = (activity?.SetTag("gen_ai.conversation.id", correlationId)); // Will be updated when we receive ConversationStart with actual ID
_ = (activity?.SetTag("gen_ai.usage.input_tokens", askAiRequest.Message.Length)); // Approximate token count

// Custom attributes for tracking our abstraction layer
// We use custom attributes because we don't know the actual GenAI provider (OpenAI, Anthropic, etc.)
// or model (gpt-4, claude, etc.) - those are abstracted by AgentBuilder/LlmGateway
_ = (activity?.SetTag("docs.ai.gateway", streamTransformer.AgentProvider)); // agent-builder or llm-gateway
_ = (activity?.SetTag("docs.ai.agent_name", streamTransformer.AgentId)); // docs-agent or docs_assistant

// Add GenAI prompt event
_ = (activity?.AddEvent(new ActivityEvent("gen_ai.content.prompt",
timestamp: DateTimeOffset.UtcNow,
tags:
[
new KeyValuePair<string, object?>("gen_ai.prompt", askAiRequest.Message),
new KeyValuePair<string, object?>("gen_ai.system_instructions", AskAiRequest.SystemPrompt)
])));

logger.LogDebug("Processing AskAiRequest: {Request}", askAiRequest);

logger.LogInformation("Starting AskAI chat with {AgentProvider} and {AgentId}", streamTransformer.AgentProvider, streamTransformer.AgentId);
var activity = AskAiActivitySource.StartActivity($"chat", ActivityKind.Client);
_ = activity?.SetTag("gen_ai.operation.name", "chat");
_ = activity?.SetTag("gen_ai.provider.name", streamTransformer.AgentProvider); // agent-builder or llm-gateway
_ = activity?.SetTag("gen_ai.agent.id", streamTransformer.AgentId); // docs-agent or docs_assistant
var inputMessages = new[]
{
new InputMessage("user", [new MessagePart("text", askAiRequest.Message)])
};
var inputMessagesJson = JsonSerializer.Serialize(inputMessages, ApiJsonContext.Default.InputMessageArray);
_ = activity?.SetTag("gen_ai.input.messages", inputMessagesJson);
logger.LogInformation("AskAI input message: {InputMessage}", askAiRequest.Message);
logger.LogInformation("Streaming AskAI response");
var rawStream = await askAiGateway.AskAi(askAiRequest, ctx);

// The stream transformer will handle disposing the activity when streaming completes
var transformedStream = await streamTransformer.TransformAsync(rawStream, activity, ctx);

return transformedStream;
}
}
Expand Down
12 changes: 12 additions & 0 deletions src/api/Elastic.Documentation.Api.Core/SerializationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,21 @@

namespace Elastic.Documentation.Api.Core;

/// <summary>
/// Types for OpenTelemetry telemetry serialization (AOT-compatible)
/// </summary>
public record MessagePart(string Type, string Content);

public record InputMessage(string Role, MessagePart[] Parts);

public record OutputMessage(string Role, MessagePart[] Parts, string FinishReason);

[JsonSerializable(typeof(AskAiRequest))]
[JsonSerializable(typeof(SearchRequest))]
[JsonSerializable(typeof(SearchResponse))]
[JsonSerializable(typeof(InputMessage))]
[JsonSerializable(typeof(OutputMessage))]
[JsonSerializable(typeof(MessagePart))]
[JsonSerializable(typeof(InputMessage[]))]
[JsonSourceGenerationOptions(PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase)]
public partial class ApiJsonContext : JsonSerializerContext;
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ public class AgentBuilderStreamTransformer(ILogger<AgentBuilderStreamTransformer

// Special handling for error events - they may have a different structure
if (type == "error")
{
return ParseErrorEventFromRoot(id, timestamp, json);
}

// Most Agent Builder events have data nested in a "data" property
if (!json.TryGetProperty("data", out var innerData))
Expand All @@ -43,10 +41,10 @@ public class AgentBuilderStreamTransformer(ILogger<AgentBuilderStreamTransformer
new AskAiEvent.ConversationStart(id, timestamp, convId.GetString()!),

"message_chunk" when innerData.TryGetProperty("text_chunk", out var textChunk) =>
new AskAiEvent.Chunk(id, timestamp, textChunk.GetString()!),
new AskAiEvent.MessageChunk(id, timestamp, textChunk.GetString()!),

"message_complete" when innerData.TryGetProperty("message_content", out var fullContent) =>
new AskAiEvent.ChunkComplete(id, timestamp, fullContent.GetString()!),
new AskAiEvent.MessageComplete(id, timestamp, fullContent.GetString()!),

"reasoning" =>
// Parse reasoning message if available
Expand Down Expand Up @@ -76,7 +74,7 @@ public class AgentBuilderStreamTransformer(ILogger<AgentBuilderStreamTransformer
return null;
}

private AskAiEvent.Reasoning ParseReasoningEvent(string id, long timestamp, JsonElement innerData)
private static AskAiEvent.Reasoning ParseReasoningEvent(string id, long timestamp, JsonElement innerData)
{
// Agent Builder sends: {"data":{"reasoning":"..."}}
var message = innerData.TryGetProperty("reasoning", out var reasoningProp)
Expand All @@ -86,7 +84,7 @@ private AskAiEvent.Reasoning ParseReasoningEvent(string id, long timestamp, Json
return new AskAiEvent.Reasoning(id, timestamp, message ?? "Thinking...");
}

private AskAiEvent.ToolResult ParseToolResultEvent(string id, long timestamp, JsonElement innerData)
private static AskAiEvent.ToolResult ParseToolResultEvent(string id, long timestamp, JsonElement innerData)
{
// Extract tool_call_id and results
var toolCallId = innerData.TryGetProperty("tool_call_id", out var tcId) ? tcId.GetString() : id;
Expand All @@ -99,7 +97,7 @@ private AskAiEvent.ToolResult ParseToolResultEvent(string id, long timestamp, Js
return new AskAiEvent.ToolResult(id, timestamp, toolCallId ?? id, result);
}

private AskAiEvent ParseToolCallEvent(string id, long timestamp, JsonElement innerData)
private static AskAiEvent ParseToolCallEvent(string id, long timestamp, JsonElement innerData)
{
// Extract fields from Agent Builder's tool_call structure
var toolCallId = innerData.TryGetProperty("tool_call_id", out var tcId) ? tcId.GetString() : id;
Expand Down Expand Up @@ -128,16 +126,13 @@ private AskAiEvent ParseToolCallEvent(string id, long timestamp, JsonElement inn
return new AskAiEvent.ToolCall(id, timestamp, toolCallId ?? id, toolId ?? "unknown", args);
}

private AskAiEvent.ErrorEvent ParseErrorEventFromRoot(string id, long timestamp, JsonElement root)
private static AskAiEvent.ErrorEvent ParseErrorEventFromRoot(string id, long timestamp, JsonElement root)
{
// Agent Builder sends: {"error":{"code":"...","message":"...","meta":{...}}}
var errorMessage = root.TryGetProperty("error", out var errorProp) &&
errorProp.TryGetProperty("message", out var msgProp)
? msgProp.GetString()
: null;

Logger.LogError("Error event received from Agent Builder: {ErrorMessage}", errorMessage ?? "Unknown error");

return new AskAiEvent.ErrorEvent(id, timestamp, errorMessage ?? "Unknown error occurred");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ public class LlmGatewayStreamTransformer(ILogger<LlmGatewayStreamTransformer> lo
new AskAiEvent.ConversationStart(id, timestamp, Guid.NewGuid().ToString()),

"ai_message_chunk" when messageData.TryGetProperty("content", out var content) =>
new AskAiEvent.Chunk(id, timestamp, content.GetString()!),
new AskAiEvent.MessageChunk(id, timestamp, content.GetString()!),

"ai_message" when messageData.TryGetProperty("content", out var fullContent) =>
new AskAiEvent.ChunkComplete(id, timestamp, fullContent.GetString()!),
new AskAiEvent.MessageComplete(id, timestamp, fullContent.GetString()!),

"tool_call" when messageData.TryGetProperty("toolCalls", out var toolCalls) =>
TransformToolCall(id, timestamp, toolCalls),
Expand All @@ -56,6 +56,8 @@ public class LlmGatewayStreamTransformer(ILogger<LlmGatewayStreamTransformer> lo
"agent_end" =>
new AskAiEvent.ConversationEnd(id, timestamp),

"error" => ParseErrorEvent(id, timestamp, messageData),

"chat_model_start" or "chat_model_end" =>
null, // Skip model lifecycle events

Expand Down Expand Up @@ -110,4 +112,18 @@ public class LlmGatewayStreamTransformer(ILogger<LlmGatewayStreamTransformer> lo
Logger.LogWarning("Unknown LLM Gateway event type: {Type}", type);
return null;
}

private AskAiEvent.ErrorEvent ParseErrorEvent(string id, long timestamp, JsonElement messageData)
{
// LLM Gateway error format: {error: "...", message: "..."}
var errorMessage = messageData.TryGetProperty("message", out var msgProp)
? msgProp.GetString()
: messageData.TryGetProperty("error", out var errProp)
? errProp.GetString()
: null;

Logger.LogError("Error event received from LLM Gateway: {ErrorMessage}", errorMessage ?? "Unknown error");

return new AskAiEvent.ErrorEvent(id, timestamp, errorMessage ?? "Unknown error occurred");
}
}
Loading
Loading