@@ -21,16 +21,6 @@ namespace Elastic.Documentation.Api.Infrastructure.Adapters.AskAi;
2121/// <param name="Data">The accumulated data from all "data:" fields</param>
2222public record SseEvent ( string ? EventType , string Data ) ;
2323
24- /// <summary>
25- /// Holds the results of processing a stream, including metrics for OpenTelemetry
26- /// </summary>
27- public class StreamProcessingResult
28- {
29- public bool HasErrorEvents { get ; set ; }
30- public List < string > FinishReasons { get ; } = [ ] ;
31- public List < object > OutputMessages { get ; } = [ ] ;
32- }
33-
3424/// <summary>
3525/// Base class for stream transformers that handles common streaming logic
3626/// </summary>
@@ -144,11 +134,17 @@ private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, Activi
144134 private async Task ProcessStreamAsync ( PipeReader reader , PipeWriter writer , Activity ? parentActivity , CancellationToken cancellationToken )
145135 {
146136 using var activity = StreamTransformerActivitySource . StartActivity ( "transform_stream" ) ;
147- var result = new StreamProcessingResult ( ) ;
137+
138+ if ( parentActivity ? . Id != null )
139+ _ = activity ? . SetParentId ( parentActivity . Id ) ;
140+
148141 List < MessagePart > outputMessageParts = [ ] ;
149142 await foreach ( var sseEvent in ParseSseEventsAsync ( reader , cancellationToken ) )
150143 {
151- AskAiEvent ? transformedEvent = null ;
144+ using var parseActivity = StreamTransformerActivitySource . StartActivity ( "parse_event" ) ;
145+ // parseActivity automatically inherits from Activity.Current (transform_stream)
146+
147+ AskAiEvent ? transformedEvent ;
152148 try
153149 {
154150 // Parse JSON once in base class
@@ -162,91 +158,79 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Acti
162158 {
163159 Logger . LogError ( ex , "Failed to parse JSON from SSE event for transformer {TransformerType}. EventType: {EventType}, Data: {Data}" ,
164160 GetType ( ) . Name , sseEvent . EventType , sseEvent . Data ) ;
165- var commonTags = new ActivityTagsCollection ( [
166- new KeyValuePair < string , object ? > ( "sse.type" , sseEvent . EventType ) ,
167- new KeyValuePair < string , object ? > ( "sse.data" , sseEvent . Data )
168- ] ) ;
169-
170- // Add error event to activity for JSON parsing failures
171- _ = activity ? . AddEvent (
172- new ActivityEvent (
173- name : "Failed to parse JSON" ,
174- timestamp : DateTimeOffset . UtcNow ,
175- commonTags
176- )
177- ) ;
178- _ = activity ? . SetStatus ( ActivityStatusCode . Error , "Failed to parse JSON from SSE event" ) ;
179- _ = activity ? . AddException ( ex ) ;
180- throw ; // Re-throw to be handled by caller
161+ throw ;
181162 }
182163
183164 if ( transformedEvent == null )
184165 continue ;
185166
167+ // Set event type tag on parse_event activity
168+ _ = parseActivity ? . SetTag ( "ask_ai.event" , transformedEvent . GetType ( ) . Name ) ;
169+
186170 switch ( transformedEvent )
187171 {
188172 case AskAiEvent . ConversationStart conversationStart :
189173 {
190174 _ = parentActivity ? . SetTag ( "gen_ai.conversation.id" , conversationStart . ConversationId ) ;
191175 _ = activity ? . SetTag ( "gen_ai.conversation.id" , conversationStart . ConversationId ) ;
192- _ = activity ? . AddEvent ( new ActivityEvent ( "conversation_start" ) ) ;
193176 break ;
194177 }
195178 case AskAiEvent . Reasoning reasoning :
196179 {
197- _ = activity ? . AddEvent ( new ActivityEvent ( "conversation_start" ) ) ;
198180 outputMessageParts . Add ( new MessagePart ( "reasoning" , reasoning . Message ?? string . Empty ) ) ;
199181 break ;
200182 }
201183 case AskAiEvent . Chunk :
202184 {
203- _ = activity ? . AddEvent ( new ActivityEvent ( "message_chunk" ) ) ;
185+ // Event type already tagged above
204186 break ;
205187 }
206188
207189 case AskAiEvent . ErrorEvent errorEvent :
208190 {
209- _ = activity ? . AddEvent ( new ActivityEvent ( "error" ) ) ;
210191 _ = activity ? . SetStatus ( ActivityStatusCode . Error , "AI provider error event" ) ;
211192 _ = activity ? . SetTag ( "error.type" , "AIProviderError" ) ;
212193 _ = activity ? . SetTag ( "error.message" , errorEvent . Message ) ;
194+ _ = parseActivity ? . SetStatus ( ActivityStatusCode . Error , errorEvent . Message ) ;
213195 break ;
214196 }
215197 case AskAiEvent . ToolCall :
216198 {
217- _ = activity ? . AddEvent ( new ActivityEvent ( "tool_call" ) ) ;
199+ // Event type already tagged above
218200 break ;
219201 }
220202 case AskAiEvent . SearchToolCall searchToolCall :
221203 {
222- _ = activity ? . AddEvent ( new ActivityEvent ( "search_tool_call" , tags : [
223- new KeyValuePair < string , object ? > ( "search.query" , searchToolCall . SearchQuery )
224- ] ) ) ;
204+ _ = parseActivity ? . SetTag ( "search.query" , searchToolCall . SearchQuery ) ;
225205 break ;
226206 }
227207 case AskAiEvent . ToolResult toolResult :
228208 {
229- _ = activity ? . AddEvent ( new ActivityEvent ( "tool_result" , tags : [
230- new KeyValuePair < string , object ? > ( "tool.result_summary" , toolResult . Result )
231- ] ) ) ;
209+ _ = parseActivity ? . SetTag ( "tool.result_summary" , toolResult . Result ) ;
232210 break ;
233211 }
234212 case AskAiEvent . ChunkComplete chunkComplete :
235213 {
236- var parts = outputMessageParts . Prepend ( new MessagePart ( "text" , chunkComplete . FullContent ) ) . ToArray ( ) ;
237- var outputMessages = new OutputMessage ( "assistant" , parts , "stop" ) ;
238- _ = activity ? . SetTag ( "gen_ai.output.messages" , JsonSerializer . Serialize ( outputMessages , ApiJsonContext . Default . OutputMessage ) ) ;
239- _ = activity ? . AddEvent ( new ActivityEvent ( "message_chunk_complete" ) ) ;
214+ outputMessageParts . Add ( new MessagePart ( "text" , chunkComplete . FullContent ) ) ;
240215 break ;
241216 }
242217 case AskAiEvent . ConversationEnd :
243218 {
244- _ = activity ? . AddEvent ( new ActivityEvent ( "conversation_end" ) ) ;
219+ // Event type already tagged above
245220 break ;
246221 }
247222 }
248223 await WriteEventAsync ( transformedEvent , writer , cancellationToken ) ;
249224 }
225+
226+ // Set output messages tag once after all events are processed
227+ if ( outputMessageParts . Count > 0 )
228+ {
229+ var outputMessages = new OutputMessage ( "assistant" , outputMessageParts . ToArray ( ) , "stop" ) ;
230+ var outputMessagesJson = JsonSerializer . Serialize ( outputMessages , ApiJsonContext . Default . OutputMessage ) ;
231+ _ = parentActivity ? . SetTag ( "gen_ai.output.messages" , outputMessagesJson ) ;
232+ _ = activity ? . SetTag ( "gen_ai.output.messages" , outputMessagesJson ) ;
233+ }
250234 }
251235
252236 /// <summary>
@@ -261,14 +245,10 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Acti
261245 /// <summary>
262246 /// Write a transformed event to the output stream
263247 /// </summary>
264- protected async Task WriteEventAsync ( AskAiEvent ? transformedEvent , PipeWriter writer , CancellationToken cancellationToken )
248+ private async Task WriteEventAsync ( AskAiEvent ? transformedEvent , PipeWriter writer , CancellationToken cancellationToken )
265249 {
266250 if ( transformedEvent == null )
267251 return ;
268-
269- // Don't create spans for each token - too noisy and expensive
270- // Just track metrics at the stream level
271-
272252 try
273253 {
274254 // Serialize as base AskAiEvent type to include the type discriminator
@@ -292,26 +272,17 @@ protected async Task WriteEventAsync(AskAiEvent? transformedEvent, PipeWriter wr
292272 /// Parse Server-Sent Events (SSE) from a PipeReader following the W3C SSE specification.
293273 /// This method handles the standard SSE format with event:, data:, and comment lines.
294274 /// </summary>
295- protected async IAsyncEnumerable < SseEvent > ParseSseEventsAsync (
275+ private static async IAsyncEnumerable < SseEvent > ParseSseEventsAsync (
296276 PipeReader reader ,
297277 [ EnumeratorCancellation ] CancellationToken cancellationToken )
298278 {
299- using var activity = StreamTransformerActivitySource . StartActivity ( "gen_ai.agent.parse" ) ;
300- _ = ( activity ? . SetTag ( "gen_ai.agent.name" , GetAgentId ( ) ) ) ;
301- _ = ( activity ? . SetTag ( "gen_ai.provider.name" , GetAgentProvider ( ) ) ) ;
302-
303279 string ? currentEvent = null ;
304280 var dataBuilder = new StringBuilder ( ) ;
305- var eventsParsed = 0 ;
306- var readOperations = 0 ;
307- var totalBytesRead = 0L ;
308281
309282 while ( ! cancellationToken . IsCancellationRequested )
310283 {
311- readOperations ++ ;
312284 var result = await reader . ReadAsync ( cancellationToken ) ;
313285 var buffer = result . Buffer ;
314- totalBytesRead += buffer . Length ;
315286
316287 // Process all complete lines in the buffer
317288 while ( TryReadLine ( ref buffer , out var line ) )
@@ -322,47 +293,33 @@ protected async IAsyncEnumerable<SseEvent> ParseSseEventsAsync(
322293
323294 // Event type line
324295 if ( line . StartsWith ( "event:" , StringComparison . Ordinal ) )
325- {
326- currentEvent = line . Substring ( 6 ) . Trim ( ) ;
327- }
296+ currentEvent = line [ 6 ..] . Trim ( ) ;
328297 // Data line
329298 else if ( line . StartsWith ( "data:" , StringComparison . Ordinal ) )
330- {
331- _ = dataBuilder . Append ( line . Substring ( 5 ) . Trim ( ) ) ;
332- }
299+ _ = dataBuilder . Append ( line [ 5 ..] . Trim ( ) ) ;
333300 // Empty line - marks end of event
334301 else if ( string . IsNullOrEmpty ( line ) )
335302 {
336- if ( dataBuilder . Length > 0 )
337- {
338- eventsParsed ++ ;
339- yield return new SseEvent ( currentEvent , dataBuilder . ToString ( ) ) ;
340- currentEvent = null ;
341- _ = dataBuilder . Clear ( ) ;
342- }
303+ if ( dataBuilder . Length <= 0 )
304+ continue ;
305+ yield return new SseEvent ( currentEvent , dataBuilder . ToString ( ) ) ;
306+ currentEvent = null ;
307+ _ = dataBuilder . Clear ( ) ;
343308 }
344309 }
345310
346311 // Tell the PipeReader how much of the buffer we consumed
347312 reader . AdvanceTo ( buffer . Start , buffer . End ) ;
348313
349314 // Stop reading if there's no more data coming
350- if ( result . IsCompleted )
351- {
352- // Yield any remaining event that hasn't been terminated with an empty line
353- if ( dataBuilder . Length > 0 )
354- {
355- eventsParsed ++ ;
356- yield return new SseEvent ( currentEvent , dataBuilder . ToString ( ) ) ;
357- }
358- break ;
359- }
360- }
315+ if ( ! result . IsCompleted )
316+ continue ;
361317
362- // Set metrics on the activity using GenAI conventions
363- _ = ( activity ? . SetTag ( "gen_ai.response.sse_event_count" , eventsParsed ) ) ;
364- _ = ( activity ? . SetTag ( "gen_ai.response.bytes_received" , totalBytesRead ) ) ;
365- _ = ( activity ? . SetTag ( "gen_ai.response.read_operations" , readOperations ) ) ;
318+ // Yield any remaining event that hasn't been terminated with an empty line
319+ if ( dataBuilder . Length > 0 )
320+ yield return new SseEvent ( currentEvent , dataBuilder . ToString ( ) ) ;
321+ break ;
322+ }
366323 }
367324
368325 /// <summary>
0 commit comments