diff --git a/sdk/core/System.ClientModel/src/Internal/SSE/AsyncServerSentEventEnumerator.cs b/sdk/core/System.ClientModel/src/Internal/SSE/AsyncServerSentEventEnumerator.cs deleted file mode 100644 index 3e1ecc0e6f77..000000000000 --- a/sdk/core/System.ClientModel/src/Internal/SSE/AsyncServerSentEventEnumerator.cs +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -using System.Collections.Generic; -using System.IO; -using System.Threading; -using System.Threading.Tasks; - -namespace System.ClientModel.Internal; - -internal sealed class AsyncServerSentEventEnumerator : IAsyncEnumerator -{ - private readonly ReadOnlyMemory _terminalEvent; - private readonly CancellationToken _cancellationToken; - - private ServerSentEventReader? _reader; - private ServerSentEvent _current; - - public ServerSentEvent Current => _current; - - public AsyncServerSentEventEnumerator(Stream contentStream, string terminalEvent, CancellationToken cancellationToken = default) - { - _reader = new(contentStream); - _cancellationToken = cancellationToken; - _terminalEvent = terminalEvent.AsMemory(); - } - - public async ValueTask MoveNextAsync() - { - if (_reader is null) - { - throw new ObjectDisposedException(nameof(AsyncServerSentEventEnumerator)); - } - - ServerSentEvent? nextEvent = await _reader.TryGetNextEventAsync(_cancellationToken).ConfigureAwait(false); - - if (nextEvent.HasValue) - { - if (nextEvent.Value.Data.Span.SequenceEqual(_terminalEvent.Span)) - { - _current = default; - return false; - } - - _current = nextEvent.Value; - return true; - } - - return false; - } - - public async ValueTask DisposeAsync() - { - await DisposeAsyncCore().ConfigureAwait(false); - - GC.SuppressFinalize(this); - } - - private async ValueTask DisposeAsyncCore() - { - if (_reader is not null) - { - await _reader.DisposeAsync().ConfigureAwait(false); - _reader = null; - } - } -} diff --git a/sdk/core/System.ClientModel/src/Internal/SSE/AsyncSseItemCollection.cs b/sdk/core/System.ClientModel/src/Internal/SSE/AsyncSseItemCollection.cs new file mode 100644 index 000000000000..2c95d80ab682 --- /dev/null +++ b/sdk/core/System.ClientModel/src/Internal/SSE/AsyncSseItemCollection.cs @@ -0,0 +1,105 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Collections.Generic; +using System.Formats.Sse; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace System.ClientModel.Internal; + +internal class AsyncSseItemCollection : IAsyncEnumerable> +{ + private readonly Func> _getSseStreamAsync; + private readonly Func _parseItem; + private readonly Func, bool>? _isTerminalEvent; + + public AsyncSseItemCollection(Func> getSseStreamAsync, + Func parseItem, + Func, bool>? isTerminalEvent = default) + { + _getSseStreamAsync = getSseStreamAsync; + _parseItem = parseItem; + _isTerminalEvent = isTerminalEvent; + } + + public IAsyncEnumerator> GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + return new AsyncSseItemEnumerator(_getSseStreamAsync, _parseItem, _isTerminalEvent, cancellationToken); + } + + private sealed class AsyncSseItemEnumerator : IAsyncEnumerator> + { + private readonly Func> _getSseStreamAsync; + private readonly Func _parseItem; + private readonly Func, bool>? _isTerminalEvent; + private readonly CancellationToken _cancellationToken; + + private ServerSentEventReader? _reader; + private bool _started; + private SseItem _current; + + public SseItem Current => _current; + + public AsyncSseItemEnumerator(Func> getSseStreamAsync, + Func parseItem, + Func, bool>? isTerminalEvent = default, + CancellationToken cancellationToken = default) + { + _getSseStreamAsync = getSseStreamAsync; + _parseItem = parseItem; + _isTerminalEvent = isTerminalEvent; + _cancellationToken = cancellationToken; + } + + public async ValueTask MoveNextAsync() + { + if (_reader is null && _started) + { + throw new ObjectDisposedException(nameof(AsyncSseItemCollection)); + } + + // TODO: consolidate with above + if (_reader is null) + { + Stream stream = await _getSseStreamAsync().ConfigureAwait(false); + _reader = new(stream, _parseItem); + _started = true; + } + + SseItem? nextEvent = await _reader.TryGetNextEventAsync(_cancellationToken).ConfigureAwait(false); + + if (nextEvent.HasValue) + { + if (_isTerminalEvent is not null && + _isTerminalEvent(nextEvent.Value)) + { + _current = default; + return false; + } + + _current = nextEvent.Value; + return true; + } + + return false; + } + + public async ValueTask DisposeAsync() + { + await DisposeAsyncCore().ConfigureAwait(false); + + GC.SuppressFinalize(this); + } + + private async ValueTask DisposeAsyncCore() + { + if (_reader is not null) + { + await _reader.DisposeAsync().ConfigureAwait(false); + _reader = null; + } + } + } +} diff --git a/sdk/core/System.ClientModel/src/Internal/SSE/PendingSseItem.cs b/sdk/core/System.ClientModel/src/Internal/SSE/PendingSseItem.cs new file mode 100644 index 000000000000..99bad935eb1a --- /dev/null +++ b/sdk/core/System.ClientModel/src/Internal/SSE/PendingSseItem.cs @@ -0,0 +1,66 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Collections.Generic; +using System.Diagnostics; + +namespace System.Formats.Sse; + +internal struct PendingSseItem +{ + private const char LF = '\n'; + + private List? _dataFields; + + // TODO: I think making the fields nullable makes them take up more space? + + public int DataLength { get; set; } + public List DataFields => _dataFields ??= new(); + public ServerSentEventField? EventTypeField { get; set; } + public ServerSentEventField? IdField { get; set; } + public ServerSentEventField? RetryField { get; set; } + + public SseItem ToSseItem(Func itemParser) + { + SseItem item = default; + + // Per spec, if event type buffer is empty, set event.type to "message". + item.EventType = EventTypeField.HasValue ? + EventTypeField.Value.Value.ToString() : + "message"; + + if (IdField.HasValue) + { + item.Id = IdField.Value.Value.ToString(); + } + + if (RetryField.HasValue) + { +#if NETSTANDARD2_0 + item.ReconnectionTime = int.TryParse(RetryField.Value.Value.ToString(), out int retry) ? TimeSpan.FromMilliseconds(retry) : null; +#else + item.ReconnectionTime = int.TryParse(RetryField.Value.Value.Span, out int retry) ? TimeSpan.FromMilliseconds(retry) : null; +#endif + } + + Debug.Assert(DataLength > 0); + + Memory buffer = new(new char[DataLength]); + + int curr = 0; + + foreach (ServerSentEventField field in DataFields) + { + Debug.Assert(field.FieldType == ServerSentEventFieldKind.Data); + + field.Value.Span.CopyTo(buffer.Span.Slice(curr)); + buffer.Span[curr + field.Value.Length] = LF; + curr += field.Value.Length + 1; + } + + // remove trailing LF and parse as T. + item.Data = itemParser(buffer.Slice(0, buffer.Length - 1).ToString()); + + return item; + } +} diff --git a/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEvent.cs b/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEvent.cs deleted file mode 100644 index 130274b78a2d..000000000000 --- a/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEvent.cs +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -using System.Collections.Generic; -using System.Diagnostics; - -namespace System.ClientModel.Internal; - -internal struct PendingEvent -{ - private List? _dataFields; - - public int DataLength { get; set; } - public List DataFields => _dataFields ??= new(); - public ServerSentEventField? EventNameField { get; set; } - public ServerSentEventField? IdField { get; set; } - public ServerSentEventField? RetryField { get; set; } -} - -// SSE specification: https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream -internal readonly struct ServerSentEvent -{ - private const char LF = '\n'; - - // Gets the value of the SSE "event type" buffer, used to distinguish between event kinds. - public ReadOnlyMemory EventName { get; } - // Gets the value of the SSE "data" buffer, which holds the payload of the server-sent event. - public ReadOnlyMemory Data { get; } - // Gets the value of the "last event ID" buffer, with which a user agent can reestablish a session. - public ReadOnlyMemory LastEventId { get; } - // If present, gets the defined "retry" value for the event, which represents the delay before reconnecting. - public TimeSpan? ReconnectionTime { get; } - - internal ServerSentEvent(PendingEvent pending) - { - if (pending.EventNameField.HasValue) - { - EventName = pending.EventNameField.Value.Value; - } - - if (pending.IdField.HasValue) - { - LastEventId = pending.IdField.Value.Value; - } - - if (pending.RetryField.HasValue) - { -#if NETSTANDARD2_0 - ReconnectionTime = int.TryParse(pending.RetryField.Value.Value.ToString(), out int retry) ? TimeSpan.FromMilliseconds(retry) : null; -#else - ReconnectionTime = int.TryParse(pending.RetryField.Value.Value.Span, out int retry) ? TimeSpan.FromMilliseconds(retry) : null; -#endif - } - - Debug.Assert(pending.DataLength > 0); - - Memory buffer = new(new char[pending.DataLength]); - - int curr = 0; - - foreach (ServerSentEventField field in pending.DataFields) - { - Debug.Assert(field.FieldType == ServerSentEventFieldKind.Data); - - field.Value.Span.CopyTo(buffer.Span.Slice(curr)); - buffer.Span[curr + field.Value.Length] = LF; - curr += field.Value.Length + 1; - } - - // remove trailing LF. - Data = buffer.Slice(0, buffer.Length - 1); - - if (EventName.Length == 0) - { - // Per spec, if event type buffer is empty, set event.type to "message". - EventName = "message".ToCharArray(); - } - } -} diff --git a/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventField.cs b/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventField.cs index c484946fa37f..d52ce822c275 100644 --- a/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventField.cs +++ b/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventField.cs @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -namespace System.ClientModel.Internal; +namespace System.Formats.Sse; // SSE specification: https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream internal readonly struct ServerSentEventField diff --git a/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventFieldKind.cs b/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventFieldKind.cs index 48d1884a2a7a..8cff2141050b 100644 --- a/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventFieldKind.cs +++ b/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventFieldKind.cs @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -namespace System.ClientModel.Internal; +namespace System.Formats.Sse; // SSE specification: https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream internal enum ServerSentEventFieldKind diff --git a/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventReader.cs b/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventReader.cs index 3785b58cf68d..ab468170e88a 100644 --- a/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventReader.cs +++ b/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventReader.cs @@ -5,38 +5,40 @@ using System.Threading; using System.Threading.Tasks; -namespace System.ClientModel.Internal; +namespace System.Formats.Sse; // TODO: Different sync and async readers to dispose differently? -internal sealed class ServerSentEventReader : IDisposable, IAsyncDisposable +internal sealed class ServerSentEventReader : IDisposable, IAsyncDisposable { + private readonly Func _parseItem; private Stream? _stream; private StreamReader? _reader; public int? LastEventId { get; private set; } - public ServerSentEventReader(Stream stream) + public ServerSentEventReader(Stream stream, Func parseItem) { _stream = stream; _reader = new StreamReader(stream); + _parseItem = parseItem; } - /// - /// Synchronously retrieves the next server-sent event from the underlying stream, blocking until a new event is - /// available and returning null once no further data is present on the stream. - /// - /// An optional cancellation token that can abort subsequent reads. - /// - /// The next in the stream, or null once no more data can be read from the stream. - /// - public ServerSentEvent? TryGetNextEvent(CancellationToken cancellationToken = default) + ///// + ///// Synchronously retrieves the next server-sent event from the underlying stream, blocking until a new event is + ///// available and returning null once no further data is present on the stream. + ///// + ///// An optional cancellation token that can abort subsequent reads. + ///// + ///// The next in the stream, or null once no more data can be read from the stream. + ///// + public SseItem? TryGetNextEvent(CancellationToken cancellationToken = default) { if (_reader is null) { - throw new ObjectDisposedException(nameof(ServerSentEventReader)); + throw new ObjectDisposedException(nameof(ServerSentEventReader)); } - PendingEvent pending = default; + PendingSseItem pending = default; while (true) { cancellationToken.ThrowIfCancellationRequested(); @@ -54,27 +56,27 @@ public ServerSentEventReader(Stream stream) if (dispatch) { - return new ServerSentEvent(pending); + return pending.ToSseItem(_parseItem); } } } - /// - /// Asynchronously retrieves the next server-sent event from the underlying stream, blocking until a new event is - /// available and returning null once no further data is present on the stream. - /// - /// An optional cancellation token that can abort subsequent reads. - /// - /// The next in the stream, or null once no more data can be read from the stream. - /// - public async Task TryGetNextEventAsync(CancellationToken cancellationToken = default) + ///// + ///// Asynchronously retrieves the next server-sent event from the underlying stream, blocking until a new event is + ///// available and returning null once no further data is present on the stream. + ///// + ///// An optional cancellation token that can abort subsequent reads. + ///// + ///// The next in the stream, or null once no more data can be read from the stream. + ///// + public async Task?> TryGetNextEventAsync(CancellationToken cancellationToken = default) { if (_reader is null) { - throw new ObjectDisposedException(nameof(ServerSentEventReader)); + throw new ObjectDisposedException(nameof(ServerSentEventReader)); } - PendingEvent pending = default; + PendingSseItem pending = default; while (true) { cancellationToken.ThrowIfCancellationRequested(); @@ -92,12 +94,12 @@ public ServerSentEventReader(Stream stream) if (dispatch) { - return new ServerSentEvent(pending); + return pending.ToSseItem(_parseItem); } } } - private static void ProcessLine(string line, ref PendingEvent pending, out bool dispatch) + private static void ProcessLine(string line, ref PendingSseItem pending, out bool dispatch) { dispatch = false; @@ -122,7 +124,7 @@ private static void ProcessLine(string line, ref PendingEvent pending, out bool switch (field.FieldType) { case ServerSentEventFieldKind.Event: - pending.EventNameField = field; + pending.EventTypeField = field; break; case ServerSentEventFieldKind.Data: pending.DataLength += field.Value.Length + 1; diff --git a/sdk/core/System.ClientModel/src/Internal/SSE/SseItemOfT.cs b/sdk/core/System.ClientModel/src/Internal/SSE/SseItemOfT.cs new file mode 100644 index 000000000000..daa3937c1755 --- /dev/null +++ b/sdk/core/System.ClientModel/src/Internal/SSE/SseItemOfT.cs @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Text; + +namespace System.Formats.Sse; + +internal struct SseItem +{ + // TODO: why nullable? + //public string? Event { get; set; } = "message"; + + public string EventType { get; set; } + + public T Data { get; set; } + + // TODO: is the event the best place for id and retry? + // would a different place make more sense? + + //public string LastEventId { get; set; } + public string? Id { get; set; } + + // TODO: name? + //public TimeSpan ReconnectionInterval { get; set; } + public TimeSpan? ReconnectionTime { get; set; } + + //internal SseItem(string name, T data, int? id, string retry) + //{ + // Data = data; + // Event = eventName; + // LastEventId = id; + // // TODO: retry + //} +} diff --git a/sdk/core/System.ClientModel/src/Internal/SSE/SseParser.cs b/sdk/core/System.ClientModel/src/Internal/SSE/SseParser.cs new file mode 100644 index 000000000000..d22f9f0d9886 --- /dev/null +++ b/sdk/core/System.ClientModel/src/Internal/SSE/SseParser.cs @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Collections.Generic; +using System.IO; +using System.Threading.Tasks; + +namespace System.Formats.Sse; + +internal static class SseParser +{ + //// The proposed API relies on ref structs being able to be used as a + //// generic parameters and on Func being annotated accordingly. This + //// is on track to happen for .NET 9 / C# 13, but if it doesn't, a + //// different shape might be needed. + //public static IAsyncEnumerable> ParseAsync( + // Stream sseStream, + // Func, T> itemParser, + // CancellationToken cancellationToken = default) + //{ + // throw new NotImplementedException(); + //} + + // TODO: why is string be nullable? + // TODO: is Func the best type for terminal event check? + // TODO: responsibility for disposal of stream? -> obtain lazily? + // TODO: I think, don't take cancellationToken? + + public static IAsyncEnumerable> ParseAsync( + Func> getSseStreamAsync, + Func parseItem, + Func, bool>? isTerminalEvent = default) + { + throw new NotImplementedException(); + } +} diff --git a/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/AsyncServerSentEventEnumeratorTests.cs b/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/AsyncServerSentEventEnumeratorTests.cs index 66c7b8b6a6b6..7749a772a31a 100644 --- a/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/AsyncServerSentEventEnumeratorTests.cs +++ b/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/AsyncServerSentEventEnumeratorTests.cs @@ -16,7 +16,7 @@ public class AsyncServerSentEventEnumeratorTests public async Task EnumeratesEvents() { using Stream contentStream = BinaryData.FromString(_mockContent).ToStream(); - AsyncServerSentEventEnumerator enumerator = new(contentStream, "[DONE]"); + AsyncSseItemCollection enumerator = new(contentStream, "[DONE]"); int i = 0; while (await enumerator.MoveNextAsync()) @@ -38,7 +38,7 @@ public void ThrowsIfCancelled() CancellationToken token = new(true); using Stream contentStream = BinaryData.FromString(_mockContent).ToStream(); - AsyncServerSentEventEnumerator enumerator = new(contentStream, "[DONE]", token); + AsyncSseItemCollection enumerator = new(contentStream, "[DONE]", token); Assert.ThrowsAsync(async () => await enumerator.MoveNextAsync()); } @@ -60,7 +60,7 @@ public async Task StopsOnStringBasedTerminalEvent() """; using Stream contentStream = BinaryData.FromString(mockContent).ToStream(); - AsyncServerSentEventEnumerator enumerator = new(contentStream, "~stop~"); + AsyncSseItemCollection enumerator = new(contentStream, "~stop~"); List events = new();