-
Notifications
You must be signed in to change notification settings - Fork 22
Implement message Aggregator #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: next
Are you sure you want to change the base?
Conversation
JPVenson
commented
Oct 29, 2025
- Added the logfile to gitignore
- Adapted devcontainer to install node
- Added Streaming aggregators for both message endpoints
src/Anthropic.Client.Tests/Services/Messages/MessageServiceTest.cs
Outdated
Show resolved
Hide resolved
| /// </summary> | ||
| /// <typeparam name="TMessage"></typeparam> | ||
| /// <typeparam name="TResult"></typeparam> | ||
| public abstract class SseAggregator<TMessage, TResult> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we make this and its subclasses internal? Since people should use it via the extensions anyway?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can but we dont expose any internal that are dangerous to fiddle with and i would prefer at least giving devs the option to implement an aggregator on their own if they want to which could come in handy for some usecases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure where to leave this comment, but do we need any handling or logic related to not allowing these aggregators to be used more than once for the same async enumerable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i am trying to think of a scenario where you could do this actually but its hard to come up with. They are "hidden" behind that extension method and you really need to invest some work to do that the way they currently work and what the purpose of the IAsyncEnumerable in general is.
I dont think thats a big concern but also thats handled via the collectionTask so even if you try to enumerate multiple times, you get the same result and the same operation.
6b4ff8b to
9378660
Compare
sd-st
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only reviewed some of this PR but just had a few initial comments
| /// <summary> | ||
| /// The aggregation model for a stream of <see cref="BetaRawContentBlockDeltaEvent"/> | ||
| /// </summary> | ||
| public class BetaMessageContentAggregator : SseAggregator<BetaRawMessageStreamEvent, BetaMessage> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@TomerAberbach I noticed in Java we call this a message accumulator rather than aggregator. I am pretty sure internally we only call it an aggregator; does this make sense or do we want to match Java?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
c# has already defined the term "Aggregate" as a linq function that is functionally in the same category so i think it fits better here
| /// </summary> | ||
| /// <typeparam name="TMessage">The raw message base element type.</typeparam> | ||
| /// <typeparam name="TResult">The element type that defines an aggregated <typeparamref name="TMessage"/></typeparam> | ||
| public abstract class SseAggregator<TMessage, TResult> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we also need this to be an IAsyncEnumerable itself? Like one use-case we want to be able to handle, I believe, is that the user may want to lazily use the streaming result, but also accumulate it while they're doing that. I think we'd want something like this:
var myStream = ...;
var aggregator = new MessageAggregator(myStream);
await foreach (var item in aggregator)
{
// we may not actually consume the entire stream and/or use the aggregator at all
if(...) { throw new Exception(); }
}
aggregator.Complete(); // I forget if we decided if we wanted this or not
Console.WriteLine(aggregator.Message);Basically we want to be able to use the stream lazily but still be able to aggregate it at the end.
@TomerAberbach Am I remembering this correctly? And did we want a manual Complete method? I forget what we decided and it's been a while, plus I am having trouble finding it in Slack
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you want to filter the elements that go into the aggregator, you can still do that with linq
GetStreamSource().Where(e => e.Type == "ThatsWhatIWant").Aggregate()
As the aggregator is but a consumer of a IAsyncEnumerable you can do whatever you want including filtering and modifications to it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right but what if you want to also use the streaming output for something, like printing it to the user?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea funny enough i argued the exact same thing in the notion doc ...
I think we can still handle that without much change via linq extension methods. Will add that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I refactored the code and made some small changes so it can be more easily added into a linq tree.
IAsyncEnumerable<RawMessageStreamEvent> responseUpdates = client.Messages.CreateStreaming(
parameters
);
// some streaming endpoints have build-in aggregators that create logically aggregated objects that represent the full stream as its counterpart single object.
var message = responseUpdates.Aggregate();
// you can also add an aggregator as part of your linq chain to get realtime streaming and aggregation
var aggregator = new MessageContentAggregator();
await foreach (RawMessageStreamEvent rawEvent in responseUpdates.CollectAsync(aggregator))
{
// do something with the stream events
if (rawEvent.TryPickContentBlockDelta(out var delta))
{
if (delta.Delta.TryPickThinking(out var thinkingDelta))
{
Console.Write(thinkingDelta.Thinking);
}
else if (delta.Delta.TryPickText(out var textDelta))
{
Console.Write(textDelta.Text);
}
}
}
// and then get the full aggregated message
var fullMessage = responseUpdates.Aggregate();a6ccfa7 to
007b0c0
Compare
Format code
e3899bb to
1ea7675
Compare