From a3157c24707e87686569e39396d827e6eabfca0c Mon Sep 17 00:00:00 2001 From: Jicheng Lu <103353@smsassist.com> Date: Tue, 17 Jun 2025 13:17:28 -0500 Subject: [PATCH 01/14] init message hub and chathub observers --- Directory.Packages.props | 1 + .../BotSharp.Abstraction.csproj | 1 + .../Observables/Models/HubObserveData.cs | 6 + .../Observables/Models/ObserveDataBase.cs | 6 + .../Conversations/ConversationPlugin.cs | 3 + .../Observables/Queues/MessageHub.cs | 45 +++++++ .../BotSharp.Plugin.ChatHub/ChatHubPlugin.cs | 13 +- .../Observers/ChatHubObserver.cs | 119 ++++++++++++++++++ 8 files changed, 193 insertions(+), 1 deletion(-) create mode 100644 src/Infrastructure/BotSharp.Abstraction/Observables/Models/HubObserveData.cs create mode 100644 src/Infrastructure/BotSharp.Abstraction/Observables/Models/ObserveDataBase.cs create mode 100644 src/Infrastructure/BotSharp.Core/Observables/Queues/MessageHub.cs create mode 100644 src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs diff --git a/Directory.Packages.props b/Directory.Packages.props index cec2ac886..38e68d3ad 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -26,6 +26,7 @@ + diff --git a/src/Infrastructure/BotSharp.Abstraction/BotSharp.Abstraction.csproj b/src/Infrastructure/BotSharp.Abstraction/BotSharp.Abstraction.csproj index 97e645d0a..2008c6a2e 100644 --- a/src/Infrastructure/BotSharp.Abstraction/BotSharp.Abstraction.csproj +++ b/src/Infrastructure/BotSharp.Abstraction/BotSharp.Abstraction.csproj @@ -36,6 +36,7 @@ + diff --git a/src/Infrastructure/BotSharp.Abstraction/Observables/Models/HubObserveData.cs b/src/Infrastructure/BotSharp.Abstraction/Observables/Models/HubObserveData.cs new file mode 100644 index 000000000..a6d10cce5 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Observables/Models/HubObserveData.cs @@ -0,0 +1,6 @@ +namespace BotSharp.Abstraction.Observables.Models; + +public class HubObserveData : ObserveDataBase +{ + public RoleDialogModel Data { get; set; } = null!; +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Observables/Models/ObserveDataBase.cs b/src/Infrastructure/BotSharp.Abstraction/Observables/Models/ObserveDataBase.cs new file mode 100644 index 000000000..177732726 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Observables/Models/ObserveDataBase.cs @@ -0,0 +1,6 @@ +namespace BotSharp.Abstraction.Observables.Models; + +public abstract class ObserveDataBase +{ + public IServiceProvider ServiceProvider { get; set; } = null!; +} diff --git a/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs b/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs index 13ee1de62..e8e18eb84 100644 --- a/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs +++ b/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs @@ -10,6 +10,7 @@ using BotSharp.Core.Routing.Reasoning; using BotSharp.Core.Templating; using BotSharp.Core.Translation; +using BotSharp.Core.Observables.Queues; using Microsoft.Extensions.Configuration; namespace BotSharp.Core.Conversations; @@ -41,6 +42,8 @@ public void RegisterDI(IServiceCollection services, IConfiguration config) return settingService.Bind("GoogleApi"); }); + services.AddSingleton(); + services.AddScoped(); services.AddScoped(); services.AddScoped(); diff --git a/src/Infrastructure/BotSharp.Core/Observables/Queues/MessageHub.cs b/src/Infrastructure/BotSharp.Core/Observables/Queues/MessageHub.cs new file mode 100644 index 000000000..ed6772b42 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core/Observables/Queues/MessageHub.cs @@ -0,0 +1,45 @@ +using BotSharp.Abstraction.Observables.Models; +using System.Reactive.Subjects; + +namespace BotSharp.Core.Observables.Queues; + +public class MessageHub +{ + private readonly ILogger _logger; + private readonly ISubject _observable = new Subject(); + public IObservable Events => _observable; + + public MessageHub(ILogger logger) + { + _logger = logger; + } + + /// + /// Push an item to the observers. + /// + /// + public void Push(HubObserveData item) + { + _logger.LogInformation($"Pushing item to observers: {item.Data.Content}"); + _observable.OnNext(item); + } + + /// + /// Send a complete notification to the observers. + /// This will stop the observers from receiving data. + /// + public void Complete() + { + _observable.OnCompleted(); + } + + /// + /// Send an error notification to the observers. + /// This will stop the observers from receiving data. + /// + /// + public void Error(Exception error) + { + _observable.OnError(error); + } +} diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/ChatHubPlugin.cs b/src/Plugins/BotSharp.Plugin.ChatHub/ChatHubPlugin.cs index 725655fce..b1fb144f6 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/ChatHubPlugin.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/ChatHubPlugin.cs @@ -1,5 +1,8 @@ using BotSharp.Abstraction.Crontab; +using BotSharp.Core.Observables.Queues; using BotSharp.Plugin.ChatHub.Hooks; +using BotSharp.Plugin.ChatHub.Observers; +using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.Configuration; namespace BotSharp.Plugin.ChatHub; @@ -7,7 +10,7 @@ namespace BotSharp.Plugin.ChatHub; /// /// The dialogue channel connects users, AI assistants and customer service representatives. /// -public class ChatHubPlugin : IBotSharpPlugin +public class ChatHubPlugin : IBotSharpPlugin, IBotSharpAppPlugin { public string Id => "6e52d42d-1e23-406b-8599-36af36c83209"; public string Name => "Chat Hub"; @@ -28,4 +31,12 @@ public void RegisterDI(IServiceCollection services, IConfiguration config) services.AddScoped(); services.AddScoped(); } + + public void Configure(IApplicationBuilder app) + { + var services = app.ApplicationServices; + var queue = services.GetRequiredService(); + var logger = services.GetRequiredService>(); + queue.Events.Subscribe(new ChatHubObserver(logger)); + } } diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs new file mode 100644 index 000000000..5699945f1 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs @@ -0,0 +1,119 @@ +using BotSharp.Abstraction.Conversations.Dtos; +using BotSharp.Abstraction.Observables.Models; +using BotSharp.Abstraction.SideCar; +using BotSharp.Abstraction.Users.Dtos; +using BotSharp.Plugin.ChatHub.Hooks; +using Microsoft.AspNetCore.SignalR; + +namespace BotSharp.Plugin.ChatHub.Observers; + +public class ChatHubObserver : IObserver +{ + private readonly ILogger _logger; + private IServiceProvider _services; + private IUserIdentity _user; + + private const string RECEIVE_CLIENT_MESSAGE = "OnMessageReceivedFromClient"; + private const string GENERATE_SENDER_ACTION = "OnSenderActionGenerated"; + + public ChatHubObserver(ILogger logger) + { + _logger = logger; + } + + public void OnCompleted() + { + _logger.LogInformation($"{nameof(ChatHubObserver)} receives complete notification."); + } + + public void OnError(Exception error) + { + _logger.LogError(error, $"{nameof(ChatHubObserver)} receives error notification: {error.Message}"); + } + + public void OnNext(HubObserveData value) + { + _services = value.ServiceProvider; + _user = _services.GetRequiredService(); + + ReceiveMessage(value.Data).ConfigureAwait(false).GetAwaiter().GetResult(); + } + + private async Task ReceiveMessage(RoleDialogModel message) + { + if (!AllowSendingMessage()) return; + + var conv = _services.GetRequiredService(); + var userService = _services.GetRequiredService(); + var sender = await userService.GetMyProfile(); + + // Update console conversation UI for CSR + var model = new ChatResponseDto() + { + ConversationId = conv.ConversationId, + MessageId = message.MessageId, + Payload = message.Payload, + Text = !string.IsNullOrEmpty(message.SecondaryContent) ? message.SecondaryContent : message.Content, + Sender = UserDto.FromUser(sender) + }; + await ReceiveClientMessage(conv.ConversationId, model); + + // Send typing-on to client + var action = new ConversationSenderActionModel + { + ConversationId = conv.ConversationId, + SenderAction = SenderActionEnum.TypingOn + }; + + await GenerateSenderAction(conv.ConversationId, action); + } + + private async Task ReceiveClientMessage(string conversationId, ChatResponseDto model) + { + try + { + var settings = _services.GetRequiredService(); + var chatHub = _services.GetRequiredService>(); + + if (settings.EventDispatchBy == EventDispatchType.Group) + { + await chatHub.Clients.Group(conversationId).SendAsync(RECEIVE_CLIENT_MESSAGE, model); + } + else + { + await chatHub.Clients.User(_user.Id).SendAsync(RECEIVE_CLIENT_MESSAGE, model); + } + } + catch (Exception ex) + { + _logger.LogWarning(ex, $"Failed to receive assistant message in {nameof(ChatHubConversationHook)} (conversation id: {conversationId})"); + } + } + + private bool AllowSendingMessage() + { + var sidecar = _services.GetService(); + return sidecar == null || !sidecar.IsEnabled(); + } + + private async Task GenerateSenderAction(string conversationId, ConversationSenderActionModel action) + { + try + { + var settings = _services.GetRequiredService(); + var chatHub = _services.GetRequiredService>(); + if (settings.EventDispatchBy == EventDispatchType.Group) + { + await chatHub.Clients.Group(conversationId).SendAsync(GENERATE_SENDER_ACTION, action); + } + else + { + await chatHub.Clients.User(_user.Id).SendAsync(GENERATE_SENDER_ACTION, action); + } + } + catch (Exception ex) + { + _logger.LogWarning(ex, $"Failed to generate sender action in {nameof(ChatHubConversationHook)} (conversation id: {conversationId})"); + } + } +} From f4a74258c23edc595a81cabe185ea0252e8e3e8b Mon Sep 17 00:00:00 2001 From: Jicheng Lu <103353@smsassist.com> Date: Tue, 17 Jun 2025 17:57:57 -0500 Subject: [PATCH 02/14] temp save --- .../Conversations/IConversationService.cs | 7 +- .../Observables/Models/HubObserveData.cs | 1 + .../Routing/IRoutingService.cs | 1 + .../Services/ConversationService.Stream.cs | 86 ++++++++++++ .../Observables/Queues/MessageHub.cs | 1 - .../Routing/RoutingService.InstructStream.cs | 49 +++++++ .../Controllers/ConversationController.cs | 24 ++++ .../Observers/ChatHubObserver.cs | 125 ++++++++++++++---- .../Providers/Chat/ChatCompletionProvider.cs | 53 +++++++- 9 files changed, 309 insertions(+), 38 deletions(-) create mode 100644 src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.Stream.cs create mode 100644 src/Infrastructure/BotSharp.Core/Routing/RoutingService.InstructStream.cs diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationService.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationService.cs index 656b397d8..fccab0fa5 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationService.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationService.cs @@ -36,10 +36,15 @@ public interface IConversationService /// Received the response from AI Agent /// Task SendMessage(string agentId, - RoleDialogModel lastDialog, + RoleDialogModel message, PostbackMessageModel? replyMessage, Func onResponseReceived); + + Task StreamMessage(string agentId, + RoleDialogModel lastDialog, + PostbackMessageModel? replyMessage); + List GetDialogHistory(int lastCount = 100, bool fromBreakpoint = true, IEnumerable? includeMessageTypes = null); Task CleanHistory(string agentId); diff --git a/src/Infrastructure/BotSharp.Abstraction/Observables/Models/HubObserveData.cs b/src/Infrastructure/BotSharp.Abstraction/Observables/Models/HubObserveData.cs index a6d10cce5..bf771cb4d 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Observables/Models/HubObserveData.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Observables/Models/HubObserveData.cs @@ -2,5 +2,6 @@ namespace BotSharp.Abstraction.Observables.Models; public class HubObserveData : ObserveDataBase { + public string EventName { get; set; } = null!; public RoleDialogModel Data { get; set; } = null!; } diff --git a/src/Infrastructure/BotSharp.Abstraction/Routing/IRoutingService.cs b/src/Infrastructure/BotSharp.Abstraction/Routing/IRoutingService.cs index 97be88741..5dbf8d3fd 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Routing/IRoutingService.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Routing/IRoutingService.cs @@ -41,6 +41,7 @@ public interface IRoutingService /// /// Task InstructDirect(Agent agent, RoleDialogModel message, List dialogs); + Task InstructStream(Agent agent, RoleDialogModel message, List dialogs); Task GetConversationContent(List dialogs, int maxDialogCount = 100); diff --git a/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.Stream.cs b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.Stream.cs new file mode 100644 index 000000000..f650a28f5 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.Stream.cs @@ -0,0 +1,86 @@ +using BotSharp.Abstraction.Hooks; +using BotSharp.Abstraction.Infrastructures.Enums; +using BotSharp.Abstraction.Routing.Enums; +using BotSharp.Abstraction.Routing.Settings; + +namespace BotSharp.Core.Conversations.Services; + +public partial class ConversationService +{ + public async Task StreamMessage(string agentId, + RoleDialogModel message, + PostbackMessageModel? replyMessage) + { + var conversation = await GetConversationRecordOrCreateNew(agentId); + var agentService = _services.GetRequiredService(); + Agent agent = await agentService.LoadAgent(agentId); + + var content = $"Received [{agent.Name}] {message.Role}: {message.Content}"; + _logger.LogInformation(content); + + message.CurrentAgentId = agent.Id; + if (string.IsNullOrEmpty(message.SenderId)) + { + message.SenderId = _user.Id; + } + + var conv = _services.GetRequiredService(); + var dialogs = conv.GetDialogHistory(); + + var statistics = _services.GetRequiredService(); + + RoleDialogModel response = message; + bool stopCompletion = false; + + // Enqueue receiving agent first in case it stop completion by OnMessageReceived + var routing = _services.GetRequiredService(); + routing.Context.SetMessageId(_conversationId, message.MessageId); + + // Save payload in order to assign the payload before hook is invoked + if (replyMessage != null && !string.IsNullOrEmpty(replyMessage.Payload)) + { + message.Payload = replyMessage.Payload; + } + + var hooks = _services.GetHooksOrderByPriority(message.CurrentAgentId); + foreach (var hook in hooks) + { + hook.SetAgent(agent) + .SetConversation(conversation); + + if (replyMessage == null || string.IsNullOrEmpty(replyMessage.FunctionName)) + { + await hook.OnMessageReceived(message); + } + else + { + await hook.OnPostbackMessageReceived(message, replyMessage); + } + + // Interrupted by hook + if (message.StopCompletion) + { + stopCompletion = true; + routing.Context.Pop(); + break; + } + } + + if (!stopCompletion) + { + // Routing with reasoning + var settings = _services.GetRequiredService(); + + // reload agent in case it has been changed by hook + if (message.CurrentAgentId != agent.Id) + { + agent = await agentService.LoadAgent(message.CurrentAgentId); + } + + await routing.InstructStream(agent, message, dialogs); + routing.Context.ResetRecursiveCounter(); + } + + return true; + } +} diff --git a/src/Infrastructure/BotSharp.Core/Observables/Queues/MessageHub.cs b/src/Infrastructure/BotSharp.Core/Observables/Queues/MessageHub.cs index ed6772b42..9950a613b 100644 --- a/src/Infrastructure/BotSharp.Core/Observables/Queues/MessageHub.cs +++ b/src/Infrastructure/BotSharp.Core/Observables/Queues/MessageHub.cs @@ -20,7 +20,6 @@ public MessageHub(ILogger logger) /// public void Push(HubObserveData item) { - _logger.LogInformation($"Pushing item to observers: {item.Data.Content}"); _observable.OnNext(item); } diff --git a/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InstructStream.cs b/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InstructStream.cs new file mode 100644 index 000000000..08c72be1c --- /dev/null +++ b/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InstructStream.cs @@ -0,0 +1,49 @@ +namespace BotSharp.Core.Routing; + +public partial class RoutingService +{ + public async Task InstructStream(Agent agent, RoleDialogModel message, List dialogs) + { + var conv = _services.GetRequiredService(); + var storage = _services.GetRequiredService(); + storage.Append(conv.ConversationId, message); + + dialogs.Add(message); + Context.SetDialogs(dialogs); + + var routing = _services.GetRequiredService(); + routing.Context.Push(agent.Id, "instruct directly"); + var agentId = routing.Context.GetCurrentAgentId(); + + // Update next action agent's name + var agentService = _services.GetRequiredService(); + + if (agent.Disabled) + { + var content = $"This agent ({agent.Name}) is disabled, please install the corresponding plugin ({agent.Plugin.Name}) to activate this agent."; + + message = RoleDialogModel.From(message, role: AgentRole.Assistant, content: content); + dialogs.Add(message); + } + else + { + var provider = agent.LlmConfig.Provider; + var model = agent.LlmConfig.Model; + + if (provider == null || model == null) + { + var agentSettings = _services.GetRequiredService(); + provider = agentSettings.LlmConfig.Provider; + model = agentSettings.LlmConfig.Model; + } + + var chatCompletion = CompletionProvider.GetChatCompletion(_services, + provider: provider, + model: model); + + await chatCompletion.GetChatCompletionsStreamingAsync(agent, dialogs, async data => { }); + } + + return true; + } +} diff --git a/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs b/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs index d3da550ff..a3cedc1e1 100644 --- a/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs +++ b/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs @@ -377,6 +377,30 @@ await conv.SendMessage(agentId, inputMsg, return response; } + + [HttpPost("/conversation/{agentId}/{conversationId}/stream")] + public async Task StreamMessage( + [FromRoute] string agentId, + [FromRoute] string conversationId, + [FromBody] NewMessageModel input) + { + var conv = _services.GetRequiredService(); + var inputMsg = new RoleDialogModel(AgentRole.User, input.Text) + { + MessageId = !string.IsNullOrWhiteSpace(input.InputMessageId) ? input.InputMessageId : Guid.NewGuid().ToString(), + CreatedAt = DateTime.UtcNow + }; + + var routing = _services.GetRequiredService(); + routing.Context.SetMessageId(conversationId, inputMsg.MessageId); + + conv.SetConversationId(conversationId, input.States); + SetStates(conv, input); + + await conv.StreamMessage(agentId, inputMsg, replyMessage: input.Postback); + } + + [HttpPost("/conversation/{agentId}/{conversationId}/sse")] public async Task SendMessageSse([FromRoute] string agentId, [FromRoute] string conversationId, [FromBody] NewMessageModel input) { diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs index 5699945f1..dcf332b01 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs @@ -1,7 +1,6 @@ using BotSharp.Abstraction.Conversations.Dtos; using BotSharp.Abstraction.Observables.Models; using BotSharp.Abstraction.SideCar; -using BotSharp.Abstraction.Users.Dtos; using BotSharp.Plugin.ChatHub.Hooks; using Microsoft.AspNetCore.SignalR; @@ -11,9 +10,10 @@ public class ChatHubObserver : IObserver { private readonly ILogger _logger; private IServiceProvider _services; - private IUserIdentity _user; - private const string RECEIVE_CLIENT_MESSAGE = "OnMessageReceivedFromClient"; + private const string BEFORE_RECEIVE_LLM_STREAM_MESSAGE = "BeforeReceiveLlmStreamMessage"; + private const string ON_RECEIVE_LLM_STREAM_MESSAGE = "OnReceiveLlmStreamMessage"; + private const string AFTER_RECEIVE_LLM_STREAM_MESSAGE = "AfterReceiveLlmStreamMessage"; private const string GENERATE_SENDER_ACTION = "OnSenderActionGenerated"; public ChatHubObserver(ILogger logger) @@ -34,41 +34,106 @@ public void OnError(Exception error) public void OnNext(HubObserveData value) { _services = value.ServiceProvider; - _user = _services.GetRequiredService(); - - ReceiveMessage(value.Data).ConfigureAwait(false).GetAwaiter().GetResult(); + + var message = value.Data; + var model = new ChatResponseDto(); + if (value.EventName == BEFORE_RECEIVE_LLM_STREAM_MESSAGE + || value.EventName == AFTER_RECEIVE_LLM_STREAM_MESSAGE) + { + var conv = _services.GetRequiredService(); + model = new ChatResponseDto() + { + ConversationId = conv.ConversationId, + MessageId = message.MessageId, + Text = string.Empty, + Sender = new() + { + FirstName = "AI", + LastName = "Assistant", + Role = AgentRole.Assistant + } + }; + + var action = new ConversationSenderActionModel + { + ConversationId = conv.ConversationId, + SenderAction = value.EventName == BEFORE_RECEIVE_LLM_STREAM_MESSAGE ? SenderActionEnum.TypingOn : SenderActionEnum.TypingOff + }; + + GenerateSenderAction(conv.ConversationId, action).ConfigureAwait(false).GetAwaiter().GetResult(); + } + else if (value.EventName == AFTER_RECEIVE_LLM_STREAM_MESSAGE) + { + //var conv = _services.GetRequiredService(); + //model = new ChatResponseDto() + //{ + // ConversationId = conv.ConversationId, + // MessageId = message.MessageId, + // Text = string.Empty, + // Sender = new() + // { + // FirstName = "AI", + // LastName = "Assistant", + // Role = AgentRole.Assistant + // } + //}; + + //var action = new ConversationSenderActionModel + //{ + // ConversationId = conv.ConversationId, + // SenderAction = SenderActionEnum.TypingOff + //}; + + //GenerateSenderAction(conv.ConversationId, action).ConfigureAwait(false).GetAwaiter().GetResult(); + + //var storage = _services.GetRequiredService(); + //storage.Append(conv.ConversationId, message); + } + else if (value.EventName == ON_RECEIVE_LLM_STREAM_MESSAGE) + { + var conv = _services.GetRequiredService(); + model = new ChatResponseDto() + { + ConversationId = conv.ConversationId, + MessageId = message.MessageId, + Text = !string.IsNullOrEmpty(message.SecondaryContent) ? message.SecondaryContent : message.Content, + Function = message.FunctionName, + RichContent = message.SecondaryRichContent ?? message.RichContent, + Data = message.Data, + Sender = new() + { + FirstName = "AI", + LastName = "Assistant", + Role = AgentRole.Assistant + } + }; + } + + OnReceiveAssistantMessage(value.EventName, model.ConversationId, model).ConfigureAwait(false).GetAwaiter().GetResult(); } - private async Task ReceiveMessage(RoleDialogModel message) + private async Task ReceiveLlmStreamResponse(RoleDialogModel message) { - if (!AllowSendingMessage()) return; - var conv = _services.GetRequiredService(); - var userService = _services.GetRequiredService(); - var sender = await userService.GetMyProfile(); - - // Update console conversation UI for CSR var model = new ChatResponseDto() { ConversationId = conv.ConversationId, MessageId = message.MessageId, - Payload = message.Payload, Text = !string.IsNullOrEmpty(message.SecondaryContent) ? message.SecondaryContent : message.Content, - Sender = UserDto.FromUser(sender) - }; - await ReceiveClientMessage(conv.ConversationId, model); - - // Send typing-on to client - var action = new ConversationSenderActionModel - { - ConversationId = conv.ConversationId, - SenderAction = SenderActionEnum.TypingOn + Function = message.FunctionName, + RichContent = message.SecondaryRichContent ?? message.RichContent, + Data = message.Data, + Sender = new() + { + FirstName = "AI", + LastName = "Assistant", + Role = AgentRole.Assistant + } }; - - await GenerateSenderAction(conv.ConversationId, action); + await OnReceiveAssistantMessage(ON_RECEIVE_LLM_STREAM_MESSAGE, conv.ConversationId, model); } - private async Task ReceiveClientMessage(string conversationId, ChatResponseDto model) + private async Task OnReceiveAssistantMessage(string @event, string conversationId, ChatResponseDto model) { try { @@ -77,11 +142,12 @@ private async Task ReceiveClientMessage(string conversationId, ChatResponseDto m if (settings.EventDispatchBy == EventDispatchType.Group) { - await chatHub.Clients.Group(conversationId).SendAsync(RECEIVE_CLIENT_MESSAGE, model); + await chatHub.Clients.Group(conversationId).SendAsync(@event, model); } else { - await chatHub.Clients.User(_user.Id).SendAsync(RECEIVE_CLIENT_MESSAGE, model); + var user = _services.GetRequiredService(); + await chatHub.Clients.User(user.Id).SendAsync(@event, model); } } catch (Exception ex) @@ -108,7 +174,8 @@ private async Task GenerateSenderAction(string conversationId, ConversationSende } else { - await chatHub.Clients.User(_user.Id).SendAsync(GENERATE_SENDER_ACTION, action); + var user = _services.GetRequiredService(); + await chatHub.Clients.User(user.Id).SendAsync(GENERATE_SENDER_ACTION, action); } } catch (Exception ex) diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs index 17ee10124..83a96266d 100644 --- a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs @@ -1,4 +1,6 @@ using BotSharp.Abstraction.Hooks; +using BotSharp.Core.Observables.Queues; +using ModelContextProtocol.Protocol.Types; using OpenAI.Chat; namespace BotSharp.Plugin.OpenAI.Providers.Chat; @@ -185,7 +187,20 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, List(); var response = chatClient.CompleteChatStreamingAsync(messages, options); + var messageId = conversations.LastOrDefault()?.MessageId ?? string.Empty; + + hub.Push(new() + { + ServiceProvider = _services, + EventName = "BeforeReceiveLlmStreamMessage", + Data = new RoleDialogModel(AgentRole.Assistant, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId + } + }); await foreach (var choice in response) { @@ -194,23 +209,47 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, List Date: Tue, 17 Jun 2025 22:23:43 -0500 Subject: [PATCH 03/14] refine stream --- .../Observers/ChatHubObserver.cs | 78 +++++++------------ .../Providers/Chat/ChatCompletionProvider.cs | 4 +- 2 files changed, 32 insertions(+), 50 deletions(-) diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs index dcf332b01..c7a852fa6 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs @@ -23,7 +23,7 @@ public ChatHubObserver(ILogger logger) public void OnCompleted() { - _logger.LogInformation($"{nameof(ChatHubObserver)} receives complete notification."); + _logger.LogWarning($"{nameof(ChatHubObserver)} receives complete notification."); } public void OnError(Exception error) @@ -35,10 +35,11 @@ public void OnNext(HubObserveData value) { _services = value.ServiceProvider; + if (!AllowSendingMessage()) return; + var message = value.Data; var model = new ChatResponseDto(); - if (value.EventName == BEFORE_RECEIVE_LLM_STREAM_MESSAGE - || value.EventName == AFTER_RECEIVE_LLM_STREAM_MESSAGE) + if (value.EventName == BEFORE_RECEIVE_LLM_STREAM_MESSAGE) { var conv = _services.GetRequiredService(); model = new ChatResponseDto() @@ -57,37 +58,37 @@ public void OnNext(HubObserveData value) var action = new ConversationSenderActionModel { ConversationId = conv.ConversationId, - SenderAction = value.EventName == BEFORE_RECEIVE_LLM_STREAM_MESSAGE ? SenderActionEnum.TypingOn : SenderActionEnum.TypingOff + SenderAction = SenderActionEnum.TypingOn }; GenerateSenderAction(conv.ConversationId, action).ConfigureAwait(false).GetAwaiter().GetResult(); } else if (value.EventName == AFTER_RECEIVE_LLM_STREAM_MESSAGE) { - //var conv = _services.GetRequiredService(); - //model = new ChatResponseDto() - //{ - // ConversationId = conv.ConversationId, - // MessageId = message.MessageId, - // Text = string.Empty, - // Sender = new() - // { - // FirstName = "AI", - // LastName = "Assistant", - // Role = AgentRole.Assistant - // } - //}; - - //var action = new ConversationSenderActionModel - //{ - // ConversationId = conv.ConversationId, - // SenderAction = SenderActionEnum.TypingOff - //}; - - //GenerateSenderAction(conv.ConversationId, action).ConfigureAwait(false).GetAwaiter().GetResult(); - - //var storage = _services.GetRequiredService(); - //storage.Append(conv.ConversationId, message); + var conv = _services.GetRequiredService(); + model = new ChatResponseDto() + { + ConversationId = conv.ConversationId, + MessageId = message.MessageId, + Text = message.Content, + Sender = new() + { + FirstName = "AI", + LastName = "Assistant", + Role = AgentRole.Assistant + } + }; + + var action = new ConversationSenderActionModel + { + ConversationId = conv.ConversationId, + SenderAction = SenderActionEnum.TypingOff + }; + + GenerateSenderAction(conv.ConversationId, action).ConfigureAwait(false).GetAwaiter().GetResult(); + + var storage = _services.GetRequiredService(); + storage.Append(conv.ConversationId, message); } else if (value.EventName == ON_RECEIVE_LLM_STREAM_MESSAGE) { @@ -112,27 +113,6 @@ public void OnNext(HubObserveData value) OnReceiveAssistantMessage(value.EventName, model.ConversationId, model).ConfigureAwait(false).GetAwaiter().GetResult(); } - private async Task ReceiveLlmStreamResponse(RoleDialogModel message) - { - var conv = _services.GetRequiredService(); - var model = new ChatResponseDto() - { - ConversationId = conv.ConversationId, - MessageId = message.MessageId, - Text = !string.IsNullOrEmpty(message.SecondaryContent) ? message.SecondaryContent : message.Content, - Function = message.FunctionName, - RichContent = message.SecondaryRichContent ?? message.RichContent, - Data = message.Data, - Sender = new() - { - FirstName = "AI", - LastName = "Assistant", - Role = AgentRole.Assistant - } - }; - await OnReceiveAssistantMessage(ON_RECEIVE_LLM_STREAM_MESSAGE, conv.ConversationId, model); - } - private async Task OnReceiveAssistantMessage(string @event, string conversationId, ChatResponseDto model) { try diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs index 83a96266d..67ac7c66d 100644 --- a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs @@ -190,6 +190,7 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, List(); var response = chatClient.CompleteChatStreamingAsync(messages, options); var messageId = conversations.LastOrDefault()?.MessageId ?? string.Empty; + var allText = string.Empty; hub.Push(new() { @@ -219,6 +220,7 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, List GetChatCompletionsStreamingAsync(Agent agent, List Date: Wed, 18 Jun 2025 15:08:52 -0500 Subject: [PATCH 04/14] temp save --- .../Providers/Chat/ChatCompletionProvider.cs | 45 +++++++++---------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs index 67ac7c66d..e3ea89fec 100644 --- a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs @@ -208,37 +208,36 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, List Date: Wed, 18 Jun 2025 16:42:58 -0500 Subject: [PATCH 05/14] init tool calling --- .../Streams/RealtimeTextStream.cs} | 15 +++++-- .../Realtime/RealTimeCompletionProvider.cs | 7 +-- .../Providers/Chat/ChatCompletionProvider.cs | 44 ++++++++++++------- 3 files changed, 42 insertions(+), 24 deletions(-) rename src/{Plugins/BotSharp.Plugin.GoogleAI/Models/Realtime/RealtimeTranscriptionResponse.cs => Infrastructure/BotSharp.Core/Infrastructures/Streams/RealtimeTextStream.cs} (81%) diff --git a/src/Plugins/BotSharp.Plugin.GoogleAI/Models/Realtime/RealtimeTranscriptionResponse.cs b/src/Infrastructure/BotSharp.Core/Infrastructures/Streams/RealtimeTextStream.cs similarity index 81% rename from src/Plugins/BotSharp.Plugin.GoogleAI/Models/Realtime/RealtimeTranscriptionResponse.cs rename to src/Infrastructure/BotSharp.Core/Infrastructures/Streams/RealtimeTextStream.cs index 0a383c80a..e041a53b5 100644 --- a/src/Plugins/BotSharp.Plugin.GoogleAI/Models/Realtime/RealtimeTranscriptionResponse.cs +++ b/src/Infrastructure/BotSharp.Core/Infrastructures/Streams/RealtimeTextStream.cs @@ -1,12 +1,12 @@ using System.IO; -namespace BotSharp.Plugin.GoogleAI.Models.Realtime; +namespace BotSharp.Core.Infrastructures.Streams; -internal class RealtimeTranscriptionResponse : IDisposable +public class RealtimeTextStream : IDisposable { - public RealtimeTranscriptionResponse() + public RealtimeTextStream() { - + } private bool _disposed = false; @@ -20,6 +20,13 @@ public Stream? ContentStream } } + public long Length => _contentStream.Length; + + public bool IsNullOrEmpty() + { + return _contentStream == null || Length == 0; + } + public void Collect(string text) { if (_disposed) return; diff --git a/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Realtime/RealTimeCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Realtime/RealTimeCompletionProvider.cs index 2e95cfa24..f262efd72 100644 --- a/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Realtime/RealTimeCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Realtime/RealTimeCompletionProvider.cs @@ -1,11 +1,12 @@ -using System.Threading; using BotSharp.Abstraction.Hooks; using BotSharp.Abstraction.Realtime.Models.Session; +using BotSharp.Core.Infrastructures.Streams; using BotSharp.Core.Session; using BotSharp.Plugin.GoogleAI.Models.Realtime; using GenerativeAI; using GenerativeAI.Types; using GenerativeAI.Types.Converters; +using System.Threading; namespace BotSharp.Plugin.GoogleAi.Providers.Realtime; @@ -33,8 +34,8 @@ public class GoogleRealTimeProvider : IRealTimeCompletion UnknownTypeHandling = JsonUnknownTypeHandling.JsonElement }; - private RealtimeTranscriptionResponse _inputStream = new(); - private RealtimeTranscriptionResponse _outputStream = new(); + private RealtimeTextStream _inputStream = new(); + private RealtimeTextStream _outputStream = new(); private bool _isBlocking = false; private RealtimeHubConnection _conn; diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs index e3ea89fec..8643ebc21 100644 --- a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs @@ -1,7 +1,10 @@ using BotSharp.Abstraction.Hooks; +using BotSharp.Core.Infrastructures.Streams; using BotSharp.Core.Observables.Queues; +using EntityFrameworkCore.BootKit; using ModelContextProtocol.Protocol.Types; using OpenAI.Chat; +using static Microsoft.EntityFrameworkCore.DbLoggerCategory; namespace BotSharp.Plugin.OpenAI.Providers.Chat; @@ -188,9 +191,7 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, List(); - var response = chatClient.CompleteChatStreamingAsync(messages, options); var messageId = conversations.LastOrDefault()?.MessageId ?? string.Empty; - var allText = string.Empty; hub.Push(new() { @@ -203,23 +204,30 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, List(); + + await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options)) { + if (choice.ToolCallUpdates != null) + { + toolCalls.AddRange(choice.ToolCallUpdates); + } + if (choice.FinishReason == ChatFinishReason.FunctionCall || choice.FinishReason == ChatFinishReason.ToolCalls) { - var update = choice.ToolCallUpdates?.FirstOrDefault()?.FunctionArgumentsUpdate?.ToString() ?? string.Empty; - _logger.LogCritical($"Tool Call (reason: {choice.FinishReason}): {update}"); + var functionName = toolCalls.FirstOrDefault(x => !string.IsNullOrEmpty(x.FunctionName))?.FunctionName; + var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList(); + var functionArgument = string.Join(string.Empty, args); - //await onMessageReceived(new RoleDialogModel(AgentRole.Assistant, update) - //{ - // //RenderedInstruction = string.Join("\r\n", renderedInstructions) - //}); + _logger.LogCritical($"Tool Call: {functionName}({functionArgument})"); } else if (!choice.ContentUpdate.IsNullOrEmpty()) { var text = choice.ContentUpdate[0]?.Text ?? string.Empty; - allText += text; - _logger.LogCritical($"Content update (reason: {choice.FinishReason}) {text}"); + textStream.Collect(text); + + _logger.LogCritical($"Content update: {text}"); var content = new RoleDialogModel(AgentRole.Assistant, text) { @@ -232,11 +240,6 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, List GetChatCompletionsStreamingAsync(Agent agent, List ToolCalls { get; set; } = []; } \ No newline at end of file From 0ef474aea5dc8c430596b796c1097686f9af5e7b Mon Sep 17 00:00:00 2001 From: Jicheng Lu <103353@smsassist.com> Date: Wed, 18 Jun 2025 16:45:42 -0500 Subject: [PATCH 06/14] get toolcall id --- .../Providers/Chat/ChatCompletionProvider.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs index 8643ebc21..f9b7df133 100644 --- a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs @@ -216,11 +216,13 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, List !string.IsNullOrEmpty(x.FunctionName))?.FunctionName; + var meta = toolCalls.FirstOrDefault(x => !string.IsNullOrEmpty(x.FunctionName)); + var functionName = meta?.FunctionName; + var toolCallId = meta?.ToolCallId; var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList(); var functionArgument = string.Join(string.Empty, args); - _logger.LogCritical($"Tool Call: {functionName}({functionArgument})"); + _logger.LogCritical($"Tool Call (id: {toolCallId}) => {functionName}({functionArgument})"); } else if (!choice.ContentUpdate.IsNullOrEmpty()) { From d62340732dff245e85897730b4851020ec5ae7de Mon Sep 17 00:00:00 2001 From: Jicheng Lu Date: Thu, 19 Jun 2025 12:20:12 -0500 Subject: [PATCH 07/14] refine streaming --- .../Providers/Chat/ChatCompletionProvider.cs | 60 +++++++++++++------ 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs index f9b7df133..af207e121 100644 --- a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs @@ -2,9 +2,12 @@ using BotSharp.Core.Infrastructures.Streams; using BotSharp.Core.Observables.Queues; using EntityFrameworkCore.BootKit; +using Fluid; using ModelContextProtocol.Protocol.Types; using OpenAI.Chat; +using System.Xml; using static Microsoft.EntityFrameworkCore.DbLoggerCategory; +using static System.Net.Mime.MediaTypeNames; namespace BotSharp.Plugin.OpenAI.Providers.Chat; @@ -206,25 +209,20 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, List(); + var responseMessage = new RoleDialogModel(AgentRole.Assistant, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId + }; await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options)) { - if (choice.ToolCallUpdates != null) + if (!choice.ToolCallUpdates.IsNullOrEmpty()) { toolCalls.AddRange(choice.ToolCallUpdates); } - if (choice.FinishReason == ChatFinishReason.FunctionCall || choice.FinishReason == ChatFinishReason.ToolCalls) - { - var meta = toolCalls.FirstOrDefault(x => !string.IsNullOrEmpty(x.FunctionName)); - var functionName = meta?.FunctionName; - var toolCallId = meta?.ToolCallId; - var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList(); - var functionArgument = string.Join(string.Empty, args); - - _logger.LogCritical($"Tool Call (id: {toolCallId}) => {functionName}({functionArgument})"); - } - else if (!choice.ContentUpdate.IsNullOrEmpty()) + if (!choice.ContentUpdate.IsNullOrEmpty()) { var text = choice.ContentUpdate[0]?.Text ?? string.Empty; textStream.Collect(text); @@ -243,17 +241,45 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, List !string.IsNullOrEmpty(x.FunctionName)); + var functionName = meta?.FunctionName; + var toolCallId = meta?.ToolCallId; + var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList(); + var functionArgument = string.Join(string.Empty, args); + + _logger.LogCritical($"Tool Call (id: {toolCallId}) => {functionName}({functionArgument})"); + + responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + ToolCallId = toolCallId, + FunctionName = functionName, + FunctionArgs = functionArgument + }; + + } + else if (choice.FinishReason.HasValue) + { + var allText = textStream.GetText(); + _logger.LogCritical($"Text Content: {allText}"); + + responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) + { + CurrentAgentId = agent.Id, + MessageId = messageId + }; + } } hub.Push(new() { ServiceProvider = _services, EventName = "AfterReceiveLlmStreamMessage", - Data = new RoleDialogModel(AgentRole.Assistant, textStream.GetText()) - { - CurrentAgentId = agent.Id, - MessageId = messageId - } + Data = responseMessage }); return true; From 5551561a07f8b3b1953e185723c09f23bd639bdd Mon Sep 17 00:00:00 2001 From: Jicheng Lu <103353@smsassist.com> Date: Fri, 20 Jun 2025 14:57:16 -0500 Subject: [PATCH 08/14] change method name --- .../BotSharp.OpenAPI/Controllers/ConversationController.cs | 2 +- .../Providers/Chat/ChatCompletionProvider.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs b/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs index a3cedc1e1..0552b19f6 100644 --- a/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs +++ b/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs @@ -379,7 +379,7 @@ await conv.SendMessage(agentId, inputMsg, [HttpPost("/conversation/{agentId}/{conversationId}/stream")] - public async Task StreamMessage( + public async Task SendMessageStream( [FromRoute] string agentId, [FromRoute] string conversationId, [FromBody] NewMessageModel input) diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs index af207e121..38d1455ed 100644 --- a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs @@ -242,7 +242,7 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, List !string.IsNullOrEmpty(x.FunctionName)); var functionName = meta?.FunctionName; From 7008667a1ae7042eb919406480b1f4c97ccdde7e Mon Sep 17 00:00:00 2001 From: Jicheng Lu <103353@smsassist.com> Date: Mon, 23 Jun 2025 21:43:25 -0500 Subject: [PATCH 09/14] support invoke function --- .../Conversations/IConversationService.cs | 5 -- .../Conversations/Models/RoleDialogModel.cs | 6 +- .../Settings/ConversationSetting.cs | 1 + .../MLTasks/IChatCompletion.cs | 5 +- .../Routing/IRoutingService.cs | 3 +- .../Services/ConversationService.Stream.cs | 86 ------------------- .../Demo/Functions/GetWeatherFn.cs | 2 +- .../Routing/Reasoning/InstructExecutor.cs | 3 +- .../Routing/RoutingService.InstructStream.cs | 49 ----------- .../Routing/RoutingService.InvokeAgent.cs | 20 +++-- .../BotSharp.Core/Routing/RoutingService.cs | 3 +- .../Controllers/ConversationController.cs | 23 ----- .../Hooks/ChatHubConversationHook.cs | 2 +- .../Observers/ChatHubObserver.cs | 42 ++++----- .../Providers/Chat/ChatCompletionProvider.cs | 17 ++-- .../BotSharp.LLM.Tests/ChatCompletionTests.cs | 14 +-- 16 files changed, 67 insertions(+), 214 deletions(-) delete mode 100644 src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.Stream.cs delete mode 100644 src/Infrastructure/BotSharp.Core/Routing/RoutingService.InstructStream.cs diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationService.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationService.cs index fccab0fa5..322d86b6f 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationService.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationService.cs @@ -40,11 +40,6 @@ Task SendMessage(string agentId, PostbackMessageModel? replyMessage, Func onResponseReceived); - - Task StreamMessage(string agentId, - RoleDialogModel lastDialog, - PostbackMessageModel? replyMessage); - List GetDialogHistory(int lastCount = 100, bool fromBreakpoint = true, IEnumerable? includeMessageTypes = null); Task CleanHistory(string agentId); diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/Models/RoleDialogModel.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/Models/RoleDialogModel.cs index de2697595..6a8af2d62 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Conversations/Models/RoleDialogModel.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/Models/RoleDialogModel.cs @@ -117,6 +117,9 @@ public class RoleDialogModel : ITrackableMessage [JsonIgnore(Condition = JsonIgnoreCondition.Always)] public string RenderedInstruction { get; set; } = string.Empty; + [JsonIgnore(Condition = JsonIgnoreCondition.Always)] + public bool IsStreaming { get; set; } + private RoleDialogModel() { } @@ -159,7 +162,8 @@ public static RoleDialogModel From(RoleDialogModel source, Payload = source.Payload, StopCompletion = source.StopCompletion, Instruction = source.Instruction, - Data = source.Data + Data = source.Data, + IsStreaming = source.IsStreaming }; } } diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/Settings/ConversationSetting.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/Settings/ConversationSetting.cs index c4f131dc7..980569dc4 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Conversations/Settings/ConversationSetting.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/Settings/ConversationSetting.cs @@ -12,6 +12,7 @@ public class ConversationSetting public bool EnableContentLog { get; set; } public bool EnableStateLog { get; set; } public bool EnableTranslationMemory { get; set; } + public bool EnableStreaming { get; set; } public CleanConversationSetting CleanSetting { get; set; } = new(); public RateLimitSetting RateLimit { get; set; } = new(); } diff --git a/src/Infrastructure/BotSharp.Abstraction/MLTasks/IChatCompletion.cs b/src/Infrastructure/BotSharp.Abstraction/MLTasks/IChatCompletion.cs index 7cf52fe47..4d93f307b 100644 --- a/src/Infrastructure/BotSharp.Abstraction/MLTasks/IChatCompletion.cs +++ b/src/Infrastructure/BotSharp.Abstraction/MLTasks/IChatCompletion.cs @@ -23,7 +23,6 @@ Task GetChatCompletionsAsync(Agent agent, Func onMessageReceived, Func onFunctionExecuting); - Task GetChatCompletionsStreamingAsync(Agent agent, - List conversations, - Func onMessageReceived); + Task GetChatCompletionsStreamingAsync(Agent agent, + List conversations) => Task.FromResult(new RoleDialogModel(AgentRole.Assistant, string.Empty)); } diff --git a/src/Infrastructure/BotSharp.Abstraction/Routing/IRoutingService.cs b/src/Infrastructure/BotSharp.Abstraction/Routing/IRoutingService.cs index 5dbf8d3fd..fb542d5f4 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Routing/IRoutingService.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Routing/IRoutingService.cs @@ -30,7 +30,7 @@ public interface IRoutingService //int GetRecursiveCounter(); //void SetRecursiveCounter(int counter); - Task InvokeAgent(string agentId, List dialogs); + Task InvokeAgent(string agentId, List dialogs, bool useStream = false); Task InvokeFunction(string name, RoleDialogModel messages); Task InstructLoop(Agent agent, RoleDialogModel message, List dialogs); @@ -41,7 +41,6 @@ public interface IRoutingService /// /// Task InstructDirect(Agent agent, RoleDialogModel message, List dialogs); - Task InstructStream(Agent agent, RoleDialogModel message, List dialogs); Task GetConversationContent(List dialogs, int maxDialogCount = 100); diff --git a/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.Stream.cs b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.Stream.cs deleted file mode 100644 index f650a28f5..000000000 --- a/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.Stream.cs +++ /dev/null @@ -1,86 +0,0 @@ -using BotSharp.Abstraction.Hooks; -using BotSharp.Abstraction.Infrastructures.Enums; -using BotSharp.Abstraction.Routing.Enums; -using BotSharp.Abstraction.Routing.Settings; - -namespace BotSharp.Core.Conversations.Services; - -public partial class ConversationService -{ - public async Task StreamMessage(string agentId, - RoleDialogModel message, - PostbackMessageModel? replyMessage) - { - var conversation = await GetConversationRecordOrCreateNew(agentId); - var agentService = _services.GetRequiredService(); - Agent agent = await agentService.LoadAgent(agentId); - - var content = $"Received [{agent.Name}] {message.Role}: {message.Content}"; - _logger.LogInformation(content); - - message.CurrentAgentId = agent.Id; - if (string.IsNullOrEmpty(message.SenderId)) - { - message.SenderId = _user.Id; - } - - var conv = _services.GetRequiredService(); - var dialogs = conv.GetDialogHistory(); - - var statistics = _services.GetRequiredService(); - - RoleDialogModel response = message; - bool stopCompletion = false; - - // Enqueue receiving agent first in case it stop completion by OnMessageReceived - var routing = _services.GetRequiredService(); - routing.Context.SetMessageId(_conversationId, message.MessageId); - - // Save payload in order to assign the payload before hook is invoked - if (replyMessage != null && !string.IsNullOrEmpty(replyMessage.Payload)) - { - message.Payload = replyMessage.Payload; - } - - var hooks = _services.GetHooksOrderByPriority(message.CurrentAgentId); - foreach (var hook in hooks) - { - hook.SetAgent(agent) - .SetConversation(conversation); - - if (replyMessage == null || string.IsNullOrEmpty(replyMessage.FunctionName)) - { - await hook.OnMessageReceived(message); - } - else - { - await hook.OnPostbackMessageReceived(message, replyMessage); - } - - // Interrupted by hook - if (message.StopCompletion) - { - stopCompletion = true; - routing.Context.Pop(); - break; - } - } - - if (!stopCompletion) - { - // Routing with reasoning - var settings = _services.GetRequiredService(); - - // reload agent in case it has been changed by hook - if (message.CurrentAgentId != agent.Id) - { - agent = await agentService.LoadAgent(message.CurrentAgentId); - } - - await routing.InstructStream(agent, message, dialogs); - routing.Context.ResetRecursiveCounter(); - } - - return true; - } -} diff --git a/src/Infrastructure/BotSharp.Core/Demo/Functions/GetWeatherFn.cs b/src/Infrastructure/BotSharp.Core/Demo/Functions/GetWeatherFn.cs index a78ad4ec7..cb6bcd886 100644 --- a/src/Infrastructure/BotSharp.Core/Demo/Functions/GetWeatherFn.cs +++ b/src/Infrastructure/BotSharp.Core/Demo/Functions/GetWeatherFn.cs @@ -17,7 +17,7 @@ public GetWeatherFn(IServiceProvider services) public async Task Execute(RoleDialogModel message) { message.Content = $"It is a sunny day!"; - //message.StopCompletion = true; + message.StopCompletion = false; return true; } } \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Core/Routing/Reasoning/InstructExecutor.cs b/src/Infrastructure/BotSharp.Core/Routing/Reasoning/InstructExecutor.cs index c93e133ce..42a3d6528 100644 --- a/src/Infrastructure/BotSharp.Core/Routing/Reasoning/InstructExecutor.cs +++ b/src/Infrastructure/BotSharp.Core/Routing/Reasoning/InstructExecutor.cs @@ -57,7 +57,8 @@ await HookEmitter.Emit(_services, async hook => await hook.OnRouti } else { - var ret = await routing.InvokeAgent(agentId, dialogs); + var convSettings = _services.GetRequiredService(); + var ret = await routing.InvokeAgent(agentId, dialogs, convSettings.EnableStreaming); } var response = dialogs.Last(); diff --git a/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InstructStream.cs b/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InstructStream.cs deleted file mode 100644 index 08c72be1c..000000000 --- a/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InstructStream.cs +++ /dev/null @@ -1,49 +0,0 @@ -namespace BotSharp.Core.Routing; - -public partial class RoutingService -{ - public async Task InstructStream(Agent agent, RoleDialogModel message, List dialogs) - { - var conv = _services.GetRequiredService(); - var storage = _services.GetRequiredService(); - storage.Append(conv.ConversationId, message); - - dialogs.Add(message); - Context.SetDialogs(dialogs); - - var routing = _services.GetRequiredService(); - routing.Context.Push(agent.Id, "instruct directly"); - var agentId = routing.Context.GetCurrentAgentId(); - - // Update next action agent's name - var agentService = _services.GetRequiredService(); - - if (agent.Disabled) - { - var content = $"This agent ({agent.Name}) is disabled, please install the corresponding plugin ({agent.Plugin.Name}) to activate this agent."; - - message = RoleDialogModel.From(message, role: AgentRole.Assistant, content: content); - dialogs.Add(message); - } - else - { - var provider = agent.LlmConfig.Provider; - var model = agent.LlmConfig.Model; - - if (provider == null || model == null) - { - var agentSettings = _services.GetRequiredService(); - provider = agentSettings.LlmConfig.Provider; - model = agentSettings.LlmConfig.Model; - } - - var chatCompletion = CompletionProvider.GetChatCompletion(_services, - provider: provider, - model: model); - - await chatCompletion.GetChatCompletionsStreamingAsync(agent, dialogs, async data => { }); - } - - return true; - } -} diff --git a/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InvokeAgent.cs b/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InvokeAgent.cs index 7bfd93523..775057186 100644 --- a/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InvokeAgent.cs +++ b/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InvokeAgent.cs @@ -4,7 +4,7 @@ namespace BotSharp.Core.Routing; public partial class RoutingService { - public async Task InvokeAgent(string agentId, List dialogs) + public async Task InvokeAgent(string agentId, List dialogs, bool useStream = false) { var agentService = _services.GetRequiredService(); var agent = await agentService.LoadAgent(agentId); @@ -30,8 +30,16 @@ public async Task InvokeAgent(string agentId, List dialog provider: provider, model: model); + RoleDialogModel response; var message = dialogs.Last(); - var response = await chatCompletion.GetChatCompletions(agent, dialogs); + if (useStream) + { + response = await chatCompletion.GetChatCompletionsStreamingAsync(agent, dialogs); + } + else + { + response = await chatCompletion.GetChatCompletions(agent, dialogs); + } if (response.Role == AgentRole.Function) { @@ -45,8 +53,9 @@ public async Task InvokeAgent(string agentId, List dialog message.FunctionArgs = response.FunctionArgs; message.Indication = response.Indication; message.CurrentAgentId = agent.Id; + message.IsStreaming = response.IsStreaming; - await InvokeFunction(message, dialogs); + await InvokeFunction(message, dialogs, useStream); } else { @@ -59,6 +68,7 @@ public async Task InvokeAgent(string agentId, List dialog message = RoleDialogModel.From(message, role: AgentRole.Assistant, content: response.Content); message.CurrentAgentId = agent.Id; + message.IsStreaming = response.IsStreaming; dialogs.Add(message); Context.SetDialogs(dialogs); } @@ -66,7 +76,7 @@ public async Task InvokeAgent(string agentId, List dialog return true; } - private async Task InvokeFunction(RoleDialogModel message, List dialogs) + private async Task InvokeFunction(RoleDialogModel message, List dialogs, bool useStream = false) { // execute function // Save states @@ -102,7 +112,7 @@ private async Task InvokeFunction(RoleDialogModel message, List InstructDirect(Agent agent, RoleDialogModel m } else { - var ret = await routing.InvokeAgent(agentId, dialogs); + var convSettings = _services.GetRequiredService(); + var ret = await routing.InvokeAgent(agentId, dialogs, convSettings.EnableStreaming); } var response = dialogs.Last(); diff --git a/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs b/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs index 0552b19f6..bd131ca1e 100644 --- a/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs +++ b/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs @@ -378,29 +378,6 @@ await conv.SendMessage(agentId, inputMsg, } - [HttpPost("/conversation/{agentId}/{conversationId}/stream")] - public async Task SendMessageStream( - [FromRoute] string agentId, - [FromRoute] string conversationId, - [FromBody] NewMessageModel input) - { - var conv = _services.GetRequiredService(); - var inputMsg = new RoleDialogModel(AgentRole.User, input.Text) - { - MessageId = !string.IsNullOrWhiteSpace(input.InputMessageId) ? input.InputMessageId : Guid.NewGuid().ToString(), - CreatedAt = DateTime.UtcNow - }; - - var routing = _services.GetRequiredService(); - routing.Context.SetMessageId(conversationId, inputMsg.MessageId); - - conv.SetConversationId(conversationId, input.States); - SetStates(conv, input); - - await conv.StreamMessage(agentId, inputMsg, replyMessage: input.Postback); - } - - [HttpPost("/conversation/{agentId}/{conversationId}/sse")] public async Task SendMessageSse([FromRoute] string agentId, [FromRoute] string conversationId, [FromBody] NewMessageModel input) { diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs index a41abe874..79604c98f 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs @@ -105,7 +105,7 @@ public override async Task OnPostbackMessageReceived(RoleDialogModel message, Po public override async Task OnResponseGenerated(RoleDialogModel message) { - if (!AllowSendingMessage()) return; + if (!AllowSendingMessage() || message.IsStreaming) return; var conv = _services.GetRequiredService(); var state = _services.GetRequiredService(); diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs index c7a852fa6..7da61c76d 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs @@ -65,30 +65,30 @@ public void OnNext(HubObserveData value) } else if (value.EventName == AFTER_RECEIVE_LLM_STREAM_MESSAGE) { - var conv = _services.GetRequiredService(); - model = new ChatResponseDto() + if (message.IsStreaming) { - ConversationId = conv.ConversationId, - MessageId = message.MessageId, - Text = message.Content, - Sender = new() + var conv = _services.GetRequiredService(); + model = new ChatResponseDto() { - FirstName = "AI", - LastName = "Assistant", - Role = AgentRole.Assistant - } - }; - - var action = new ConversationSenderActionModel - { - ConversationId = conv.ConversationId, - SenderAction = SenderActionEnum.TypingOff - }; - - GenerateSenderAction(conv.ConversationId, action).ConfigureAwait(false).GetAwaiter().GetResult(); + ConversationId = conv.ConversationId, + MessageId = message.MessageId, + Text = message.Content, + Sender = new() + { + FirstName = "AI", + LastName = "Assistant", + Role = AgentRole.Assistant + } + }; + + var action = new ConversationSenderActionModel + { + ConversationId = conv.ConversationId, + SenderAction = SenderActionEnum.TypingOff + }; - var storage = _services.GetRequiredService(); - storage.Append(conv.ConversationId, message); + GenerateSenderAction(conv.ConversationId, action).ConfigureAwait(false).GetAwaiter().GetResult(); + } } else if (value.EventName == ON_RECEIVE_LLM_STREAM_MESSAGE) { diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs index 38d1455ed..bb8c484a7 100644 --- a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs @@ -1,13 +1,7 @@ using BotSharp.Abstraction.Hooks; using BotSharp.Core.Infrastructures.Streams; using BotSharp.Core.Observables.Queues; -using EntityFrameworkCore.BootKit; -using Fluid; -using ModelContextProtocol.Protocol.Types; using OpenAI.Chat; -using System.Xml; -using static Microsoft.EntityFrameworkCore.DbLoggerCategory; -using static System.Net.Mime.MediaTypeNames; namespace BotSharp.Plugin.OpenAI.Providers.Chat; @@ -187,7 +181,7 @@ public async Task GetChatCompletionsAsync(Agent agent, return true; } - public async Task GetChatCompletionsStreamingAsync(Agent agent, List conversations, Func onMessageReceived) + public async Task GetChatCompletionsStreamingAsync(Agent agent, List conversations) { var client = ProviderHelper.GetClient(Provider, _model, _services); var chatClient = client.GetChatClient(_model); @@ -227,7 +221,9 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, List GetChatCompletionsStreamingAsync(Agent agent, List x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList(); var functionArgument = string.Join(string.Empty, args); +#if DEBUG _logger.LogCritical($"Tool Call (id: {toolCallId}) => {functionName}({functionArgument})"); +#endif responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty) { @@ -270,7 +268,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, List GetChatCompletionsStreamingAsync(Agent agent, List([new RoleDialogModel(AgentRole.User, "write a poem about stars")]); + RoleDialogModel reply = null; - var result = await chatCompletion.GetChatCompletionsStreamingAsync(agent,conversation, async (received) => + var messages = new List { - reply = received; - }); - result.ShouldBeTrue(); + new RoleDialogModel(AgentRole.User, "write a poem about stars") + }; + var result = await chatCompletion.GetChatCompletionsStreamingAsync(agent, messages); + + result.ShouldNotBeNull(); reply.ShouldNotBeNull(); reply.Content.ShouldNotBeNullOrEmpty(); } From 3d57d149b47c5f68ba6d6f7b4186cb9b3162f946 Mon Sep 17 00:00:00 2001 From: Jicheng Lu <103353@smsassist.com> Date: Mon, 23 Jun 2025 21:47:58 -0500 Subject: [PATCH 10/14] clean code --- .../BotSharp.Abstraction/Routing/IRoutingService.cs | 4 ---- .../BotSharp.Core/Routing/RoutingService.InvokeAgent.cs | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/src/Infrastructure/BotSharp.Abstraction/Routing/IRoutingService.cs b/src/Infrastructure/BotSharp.Abstraction/Routing/IRoutingService.cs index fb542d5f4..33803a98e 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Routing/IRoutingService.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Routing/IRoutingService.cs @@ -26,10 +26,6 @@ public interface IRoutingService /// RoutingRule[] GetRulesByAgentId(string id); - //void ResetRecursiveCounter(); - //int GetRecursiveCounter(); - //void SetRecursiveCounter(int counter); - Task InvokeAgent(string agentId, List dialogs, bool useStream = false); Task InvokeFunction(string name, RoleDialogModel messages); Task InstructLoop(Agent agent, RoleDialogModel message, List dialogs); diff --git a/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InvokeAgent.cs b/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InvokeAgent.cs index 775057186..eb60e9255 100644 --- a/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InvokeAgent.cs +++ b/src/Infrastructure/BotSharp.Core/Routing/RoutingService.InvokeAgent.cs @@ -76,7 +76,7 @@ public async Task InvokeAgent(string agentId, List dialog return true; } - private async Task InvokeFunction(RoleDialogModel message, List dialogs, bool useStream = false) + private async Task InvokeFunction(RoleDialogModel message, List dialogs, bool useStream) { // execute function // Save states From 5f211d6aa5c92a5e872dc68dc184044f5f258bfb Mon Sep 17 00:00:00 2001 From: Jicheng Lu <103353@smsassist.com> Date: Tue, 24 Jun 2025 08:34:38 -0500 Subject: [PATCH 11/14] minor change --- .../Observers/ChatHubObserver.cs | 41 +++++++++---------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs index 7da61c76d..97cb89a4e 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs @@ -63,32 +63,29 @@ public void OnNext(HubObserveData value) GenerateSenderAction(conv.ConversationId, action).ConfigureAwait(false).GetAwaiter().GetResult(); } - else if (value.EventName == AFTER_RECEIVE_LLM_STREAM_MESSAGE) + else if (value.EventName == AFTER_RECEIVE_LLM_STREAM_MESSAGE && message.IsStreaming) { - if (message.IsStreaming) + var conv = _services.GetRequiredService(); + model = new ChatResponseDto() { - var conv = _services.GetRequiredService(); - model = new ChatResponseDto() - { - ConversationId = conv.ConversationId, - MessageId = message.MessageId, - Text = message.Content, - Sender = new() - { - FirstName = "AI", - LastName = "Assistant", - Role = AgentRole.Assistant - } - }; - - var action = new ConversationSenderActionModel + ConversationId = conv.ConversationId, + MessageId = message.MessageId, + Text = message.Content, + Sender = new() { - ConversationId = conv.ConversationId, - SenderAction = SenderActionEnum.TypingOff - }; + FirstName = "AI", + LastName = "Assistant", + Role = AgentRole.Assistant + } + }; - GenerateSenderAction(conv.ConversationId, action).ConfigureAwait(false).GetAwaiter().GetResult(); - } + var action = new ConversationSenderActionModel + { + ConversationId = conv.ConversationId, + SenderAction = SenderActionEnum.TypingOff + }; + + GenerateSenderAction(conv.ConversationId, action).ConfigureAwait(false).GetAwaiter().GetResult(); } else if (value.EventName == ON_RECEIVE_LLM_STREAM_MESSAGE) { From 2b465722df0a2cec34a52c84580947c12f859430 Mon Sep 17 00:00:00 2001 From: Jicheng Lu Date: Tue, 24 Jun 2025 22:09:13 -0500 Subject: [PATCH 12/14] use stream message from state and add token usage --- .../Conversations/Dtos/ChatResponseDto.cs | 3 ++ .../Settings/ConversationSetting.cs | 1 - .../Routing/Reasoning/InstructExecutor.cs | 5 +-- .../BotSharp.Core/Routing/RoutingService.cs | 5 +-- .../Hooks/ChatHubConversationHook.cs | 9 ++++-- .../Providers/Chat/ChatCompletionProvider.cs | 31 +++++++++++++++++++ 6 files changed, 47 insertions(+), 7 deletions(-) diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/Dtos/ChatResponseDto.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/Dtos/ChatResponseDto.cs index b391bc7b1..19b5e8b28 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Conversations/Dtos/ChatResponseDto.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/Dtos/ChatResponseDto.cs @@ -35,6 +35,9 @@ public class ChatResponseDto : InstructResult [JsonPropertyName("has_message_files")] public bool HasMessageFiles { get; set; } + [JsonPropertyName("is_streaming")] + public bool IsStreaming { get; set; } + [JsonPropertyName("created_at")] public DateTime CreatedAt { get; set; } = DateTime.UtcNow; } diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/Settings/ConversationSetting.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/Settings/ConversationSetting.cs index 980569dc4..c4f131dc7 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Conversations/Settings/ConversationSetting.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/Settings/ConversationSetting.cs @@ -12,7 +12,6 @@ public class ConversationSetting public bool EnableContentLog { get; set; } public bool EnableStateLog { get; set; } public bool EnableTranslationMemory { get; set; } - public bool EnableStreaming { get; set; } public CleanConversationSetting CleanSetting { get; set; } = new(); public RateLimitSetting RateLimit { get; set; } = new(); } diff --git a/src/Infrastructure/BotSharp.Core/Routing/Reasoning/InstructExecutor.cs b/src/Infrastructure/BotSharp.Core/Routing/Reasoning/InstructExecutor.cs index 42a3d6528..18d87cb9a 100644 --- a/src/Infrastructure/BotSharp.Core/Routing/Reasoning/InstructExecutor.cs +++ b/src/Infrastructure/BotSharp.Core/Routing/Reasoning/InstructExecutor.cs @@ -57,8 +57,9 @@ await HookEmitter.Emit(_services, async hook => await hook.OnRouti } else { - var convSettings = _services.GetRequiredService(); - var ret = await routing.InvokeAgent(agentId, dialogs, convSettings.EnableStreaming); + var state = _services.GetRequiredService(); + var useStreamMsg = state.GetState("use_stream_message"); + var ret = await routing.InvokeAgent(agentId, dialogs, bool.TryParse(useStreamMsg, out var useStream) && useStream); } var response = dialogs.Last(); diff --git a/src/Infrastructure/BotSharp.Core/Routing/RoutingService.cs b/src/Infrastructure/BotSharp.Core/Routing/RoutingService.cs index bc9e1da22..e4764b92b 100644 --- a/src/Infrastructure/BotSharp.Core/Routing/RoutingService.cs +++ b/src/Infrastructure/BotSharp.Core/Routing/RoutingService.cs @@ -51,8 +51,9 @@ public async Task InstructDirect(Agent agent, RoleDialogModel m } else { - var convSettings = _services.GetRequiredService(); - var ret = await routing.InvokeAgent(agentId, dialogs, convSettings.EnableStreaming); + var state = _services.GetRequiredService(); + var useStreamMsg = state.GetState("use_stream_message"); + var ret = await routing.InvokeAgent(agentId, dialogs, bool.TryParse(useStreamMsg, out var useStream) && useStream); } var response = dialogs.Last(); diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs index 79604c98f..7bc4600af 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs @@ -105,7 +105,7 @@ public override async Task OnPostbackMessageReceived(RoleDialogModel message, Po public override async Task OnResponseGenerated(RoleDialogModel message) { - if (!AllowSendingMessage() || message.IsStreaming) return; + if (!AllowSendingMessage()) return; var conv = _services.GetRequiredService(); var state = _services.GetRequiredService(); @@ -118,6 +118,7 @@ public override async Task OnResponseGenerated(RoleDialogModel message) RichContent = message.SecondaryRichContent ?? message.RichContent, Data = message.Data, States = state.GetStates(), + IsStreaming = message.IsStreaming, Sender = new() { FirstName = "AI", @@ -133,7 +134,11 @@ public override async Task OnResponseGenerated(RoleDialogModel message) SenderAction = SenderActionEnum.TypingOff }; - await GenerateSenderAction(conv.ConversationId, action); + if (!message.IsStreaming) + { + await GenerateSenderAction(conv.ConversationId, action); + } + await ReceiveAssistantMessage(conv.ConversationId, json); await base.OnResponseGenerated(message); } diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs index bb8c484a7..16e56b25a 100644 --- a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs @@ -1,6 +1,9 @@ +using Azure; using BotSharp.Abstraction.Hooks; using BotSharp.Core.Infrastructures.Streams; using BotSharp.Core.Observables.Queues; +using BotSharp.Plugin.OpenAI.Models.Realtime; +using Fluid; using OpenAI.Chat; namespace BotSharp.Plugin.OpenAI.Providers.Chat; @@ -190,6 +193,13 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, var hub = _services.GetRequiredService(); var messageId = conversations.LastOrDefault()?.MessageId ?? string.Empty; + var contentHooks = _services.GetHooks(agent.Id); + // Before chat completion hook + foreach (var hook in contentHooks) + { + await hook.BeforeGenerating(agent, conversations); + } + hub.Push(new() { ServiceProvider = _services, @@ -201,8 +211,11 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, } }); + using var textStream = new RealtimeTextStream(); var toolCalls = new List(); + ChatTokenUsage? tokenUsage = null; + var responseMessage = new RoleDialogModel(AgentRole.Assistant, string.Empty) { CurrentAgentId = agent.Id, @@ -211,6 +224,8 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options)) { + tokenUsage = choice.Usage; + if (!choice.ToolCallUpdates.IsNullOrEmpty()) { toolCalls.AddRange(choice.ToolCallUpdates); @@ -281,6 +296,22 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, Data = responseMessage }); + + var inputTokenDetails = tokenUsage?.InputTokenDetails; + // After chat completion hook + foreach (var hook in contentHooks) + { + await hook.AfterGenerated(responseMessage, new TokenStatsModel + { + Prompt = prompt, + Provider = Provider, + Model = _model, + TextInputTokens = (tokenUsage?.InputTokenCount ?? 0) - (inputTokenDetails?.CachedTokenCount ?? 0), + CachedTextInputTokens = inputTokenDetails?.CachedTokenCount ?? 0, + TextOutputTokens = tokenUsage?.OutputTokenCount ?? 0 + }); + } + return responseMessage; } From 0aff924dd9a15a8ee1e33793b78c18a38f739656 Mon Sep 17 00:00:00 2001 From: Jicheng Lu <103353@smsassist.com> Date: Fri, 27 Jun 2025 11:48:13 -0500 Subject: [PATCH 13/14] minor change --- src/Plugins/BotSharp.Plugin.ChatHub/Hooks/StreamingLogHook.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/StreamingLogHook.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/StreamingLogHook.cs index 720f3fd0a..b499438e7 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/StreamingLogHook.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/StreamingLogHook.cs @@ -74,7 +74,7 @@ public override async Task OnPostbackMessageReceived(RoleDialogModel message, Po var log = $"{GetMessageContent(message)}"; var replyContent = JsonSerializer.Serialize(replyMsg, _options.JsonSerializerOptions); - log += $"\r\n```json\r\n{replyContent}\r\n```"; + log += $"\r\n\r\n```json\r\n{replyContent}\r\n```"; var input = new ContentLogInputModel(conversationId, message) { @@ -233,7 +233,7 @@ public override async Task OnResponseGenerated(RoleDialogModel message) if (message.RichContent != null || message.SecondaryRichContent != null) { var richContent = JsonSerializer.Serialize(message.SecondaryRichContent ?? message.RichContent, _localJsonOptions); - log += $"\r\n```json\r\n{richContent}\r\n```"; + log += $"\r\n\r\n```json\r\n{richContent}\r\n```"; } var input = new ContentLogInputModel(conv.ConversationId, message) From aac09ca3bdab6786729a8475c0646347fc2116fc Mon Sep 17 00:00:00 2001 From: Jicheng Lu <103353@smsassist.com> Date: Fri, 27 Jun 2025 13:49:44 -0500 Subject: [PATCH 14/14] implement streaming chat interface --- .../MLTasks/IChatCompletion.cs | 2 +- .../Conversations/ConversationPlugin.cs | 3 +- .../Observables/Queues/MessageHub.cs | 13 +- .../Providers/ChatCompletionProvider.cs | 3 +- .../BotSharp.Plugin.AzureOpenAI.csproj | 2 +- .../Providers/Chat/ChatCompletionProvider.cs | 125 +++++++++++++++-- .../BotSharp.Plugin.ChatHub/ChatHubPlugin.cs | 5 +- .../BotSharp.Plugin.DeepSeekAI.csproj | 2 +- .../Providers/Chat/ChatCompletionProvider.cs | 131 +++++++++++++++--- .../Chat/GeminiChatCompletionProvider.cs | 35 +---- .../Chat/PalmChatCompletionProvider.cs | 2 +- .../Providers/ChatCompletionProvider.cs | 4 +- .../BotSharp.Plugin.LLamaSharp.csproj | 2 +- .../Providers/ChatCompletionProvider.cs | 64 ++++++++- .../Providers/ChatCompletionProvider.cs | 2 +- .../Providers/ChatCompletionProvider.cs | 2 +- ...osoftExtensionsAIChatCompletionProvider.cs | 4 +- .../Providers/Chat/ChatCompletionProvider.cs | 5 +- .../SemanticKernelChatCompletionProvider.cs | 2 +- .../BotSharp.Plugin.SparkDesk.csproj | 2 +- .../Providers/ChatCompletionProvider.cs | 79 ++++++++--- 21 files changed, 375 insertions(+), 114 deletions(-) diff --git a/src/Infrastructure/BotSharp.Abstraction/MLTasks/IChatCompletion.cs b/src/Infrastructure/BotSharp.Abstraction/MLTasks/IChatCompletion.cs index 4d93f307b..51f89375e 100644 --- a/src/Infrastructure/BotSharp.Abstraction/MLTasks/IChatCompletion.cs +++ b/src/Infrastructure/BotSharp.Abstraction/MLTasks/IChatCompletion.cs @@ -24,5 +24,5 @@ Task GetChatCompletionsAsync(Agent agent, Func onFunctionExecuting); Task GetChatCompletionsStreamingAsync(Agent agent, - List conversations) => Task.FromResult(new RoleDialogModel(AgentRole.Assistant, string.Empty)); + List conversations); } diff --git a/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs b/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs index e8e18eb84..a559877f8 100644 --- a/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs +++ b/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs @@ -12,6 +12,7 @@ using BotSharp.Core.Translation; using BotSharp.Core.Observables.Queues; using Microsoft.Extensions.Configuration; +using BotSharp.Abstraction.Observables.Models; namespace BotSharp.Core.Conversations; @@ -42,7 +43,7 @@ public void RegisterDI(IServiceCollection services, IConfiguration config) return settingService.Bind("GoogleApi"); }); - services.AddSingleton(); + services.AddSingleton>(); services.AddScoped(); services.AddScoped(); diff --git a/src/Infrastructure/BotSharp.Core/Observables/Queues/MessageHub.cs b/src/Infrastructure/BotSharp.Core/Observables/Queues/MessageHub.cs index 9950a613b..affb142da 100644 --- a/src/Infrastructure/BotSharp.Core/Observables/Queues/MessageHub.cs +++ b/src/Infrastructure/BotSharp.Core/Observables/Queues/MessageHub.cs @@ -1,15 +1,14 @@ -using BotSharp.Abstraction.Observables.Models; using System.Reactive.Subjects; namespace BotSharp.Core.Observables.Queues; -public class MessageHub +public class MessageHub where T : class { - private readonly ILogger _logger; - private readonly ISubject _observable = new Subject(); - public IObservable Events => _observable; + private readonly ILogger> _logger; + private readonly ISubject _observable = new Subject(); + public IObservable Events => _observable; - public MessageHub(ILogger logger) + public MessageHub(ILogger> logger) { _logger = logger; } @@ -18,7 +17,7 @@ public MessageHub(ILogger logger) /// Push an item to the observers. /// /// - public void Push(HubObserveData item) + public void Push(T item) { _observable.OnNext(item); } diff --git a/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs index 285e8abdb..50d67a1bb 100644 --- a/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs @@ -96,8 +96,7 @@ public Task GetChatCompletionsAsync(Agent agent, List con throw new NotImplementedException(); } - public Task GetChatCompletionsStreamingAsync(Agent agent, List conversations, - Func onMessageReceived) + public Task GetChatCompletionsStreamingAsync(Agent agent, List conversations) { throw new NotImplementedException(); } diff --git a/src/Plugins/BotSharp.Plugin.AzureOpenAI/BotSharp.Plugin.AzureOpenAI.csproj b/src/Plugins/BotSharp.Plugin.AzureOpenAI/BotSharp.Plugin.AzureOpenAI.csproj index 494326fac..372fb3de5 100644 --- a/src/Plugins/BotSharp.Plugin.AzureOpenAI/BotSharp.Plugin.AzureOpenAI.csproj +++ b/src/Plugins/BotSharp.Plugin.AzureOpenAI/BotSharp.Plugin.AzureOpenAI.csproj @@ -16,7 +16,7 @@ - + diff --git a/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs index ab7135ef8..605bceb85 100644 --- a/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs @@ -1,6 +1,9 @@ using Azure; using BotSharp.Abstraction.Files.Utilities; using BotSharp.Abstraction.Hooks; +using BotSharp.Abstraction.Observables.Models; +using BotSharp.Core.Infrastructures.Streams; +using BotSharp.Core.Observables.Queues; using OpenAI.Chat; using System.ClientModel; @@ -203,39 +206,133 @@ public async Task GetChatCompletionsAsync(Agent agent, return true; } - public async Task GetChatCompletionsStreamingAsync(Agent agent, List conversations, Func onMessageReceived) + public async Task GetChatCompletionsStreamingAsync(Agent agent, List conversations) { var client = ProviderHelper.GetClient(Provider, _model, _services); var chatClient = client.GetChatClient(_model); var (prompt, messages, options) = PrepareOptions(agent, conversations); - var response = chatClient.CompleteChatStreamingAsync(messages, options); + var hub = _services.GetRequiredService>(); + var messageId = conversations.LastOrDefault()?.MessageId ?? string.Empty; - await foreach (var choice in response) + var contentHooks = _services.GetHooks(agent.Id); + // Before chat completion hook + foreach (var hook in contentHooks) + { + await hook.BeforeGenerating(agent, conversations); + } + + hub.Push(new() + { + ServiceProvider = _services, + EventName = "BeforeReceiveLlmStreamMessage", + Data = new RoleDialogModel(AgentRole.Assistant, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId + } + }); + + using var textStream = new RealtimeTextStream(); + var toolCalls = new List(); + ChatTokenUsage? tokenUsage = null; + + var responseMessage = new RoleDialogModel(AgentRole.Assistant, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId + }; + + await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options)) { - if (choice.FinishReason == ChatFinishReason.FunctionCall || choice.FinishReason == ChatFinishReason.ToolCalls) + tokenUsage = choice.Usage; + + if (!choice.ToolCallUpdates.IsNullOrEmpty()) + { + toolCalls.AddRange(choice.ToolCallUpdates); + } + + if (!choice.ContentUpdate.IsNullOrEmpty()) { - var update = choice.ToolCallUpdates?.FirstOrDefault()?.FunctionArgumentsUpdate?.ToString() ?? string.Empty; - Console.Write(update); + var text = choice.ContentUpdate[0]?.Text ?? string.Empty; + textStream.Collect(text); - await onMessageReceived(new RoleDialogModel(AgentRole.Assistant, update) +#if DEBUG + _logger.LogCritical($"Content update: {text}"); +#endif + + var content = new RoleDialogModel(AgentRole.Assistant, text) { - RenderedInstruction = string.Join("\r\n", renderedInstructions) + CurrentAgentId = agent.Id, + MessageId = messageId + }; + hub.Push(new() + { + ServiceProvider = _services, + EventName = "OnReceiveLlmStreamMessage", + Data = content }); - continue; } - if (choice.ContentUpdate.IsNullOrEmpty()) continue; + if (choice.FinishReason == ChatFinishReason.ToolCalls || choice.FinishReason == ChatFinishReason.FunctionCall) + { + var meta = toolCalls.FirstOrDefault(x => !string.IsNullOrEmpty(x.FunctionName)); + var functionName = meta?.FunctionName; + var toolCallId = meta?.ToolCallId; + var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList(); + var functionArgument = string.Join(string.Empty, args); - _logger.LogInformation(choice.ContentUpdate[0]?.Text); +#if DEBUG + _logger.LogCritical($"Tool Call (id: {toolCallId}) => {functionName}({functionArgument})"); +#endif - await onMessageReceived(new RoleDialogModel(choice.Role?.ToString() ?? ChatMessageRole.Assistant.ToString(), choice.ContentUpdate[0]?.Text ?? string.Empty) + responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + ToolCallId = toolCallId, + FunctionName = functionName, + FunctionArgs = functionArgument + }; + } + else if (choice.FinishReason.HasValue) { - RenderedInstruction = string.Join("\r\n", renderedInstructions) + var allText = textStream.GetText(); + _logger.LogCritical($"Text Content: {allText}"); + + responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + IsStreaming = true + }; + } + } + + hub.Push(new() + { + ServiceProvider = _services, + EventName = "AfterReceiveLlmStreamMessage", + Data = responseMessage + }); + + + var inputTokenDetails = tokenUsage?.InputTokenDetails; + // After chat completion hook + foreach (var hook in contentHooks) + { + await hook.AfterGenerated(responseMessage, new TokenStatsModel + { + Prompt = prompt, + Provider = Provider, + Model = _model, + TextInputTokens = (tokenUsage?.InputTokenCount ?? 0) - (inputTokenDetails?.CachedTokenCount ?? 0), + CachedTextInputTokens = inputTokenDetails?.CachedTokenCount ?? 0, + TextOutputTokens = tokenUsage?.OutputTokenCount ?? 0 }); } - return true; + return responseMessage; } protected (string, IEnumerable, ChatCompletionOptions) PrepareOptions(Agent agent, List conversations) diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/ChatHubPlugin.cs b/src/Plugins/BotSharp.Plugin.ChatHub/ChatHubPlugin.cs index b1fb144f6..d3c3f2acb 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/ChatHubPlugin.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/ChatHubPlugin.cs @@ -1,4 +1,5 @@ using BotSharp.Abstraction.Crontab; +using BotSharp.Abstraction.Observables.Models; using BotSharp.Core.Observables.Queues; using BotSharp.Plugin.ChatHub.Hooks; using BotSharp.Plugin.ChatHub.Observers; @@ -35,8 +36,8 @@ public void RegisterDI(IServiceCollection services, IConfiguration config) public void Configure(IApplicationBuilder app) { var services = app.ApplicationServices; - var queue = services.GetRequiredService(); - var logger = services.GetRequiredService>(); + var queue = services.GetRequiredService>(); + var logger = services.GetRequiredService>>(); queue.Events.Subscribe(new ChatHubObserver(logger)); } } diff --git a/src/Plugins/BotSharp.Plugin.DeepSeekAI/BotSharp.Plugin.DeepSeekAI.csproj b/src/Plugins/BotSharp.Plugin.DeepSeekAI/BotSharp.Plugin.DeepSeekAI.csproj index 2f7e326ad..3f9a26ce0 100644 --- a/src/Plugins/BotSharp.Plugin.DeepSeekAI/BotSharp.Plugin.DeepSeekAI.csproj +++ b/src/Plugins/BotSharp.Plugin.DeepSeekAI/BotSharp.Plugin.DeepSeekAI.csproj @@ -15,7 +15,7 @@ - + diff --git a/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs index 42ce1ac93..bcbcc2a47 100644 --- a/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs @@ -1,8 +1,11 @@ -using Microsoft.Extensions.Logging; -using OpenAI.Chat; using BotSharp.Abstraction.Files; -using BotSharp.Plugin.DeepSeek.Providers; using BotSharp.Abstraction.Hooks; +using BotSharp.Abstraction.Observables.Models; +using BotSharp.Core.Infrastructures.Streams; +using BotSharp.Core.Observables.Queues; +using BotSharp.Plugin.DeepSeek.Providers; +using Microsoft.Extensions.Logging; +using OpenAI.Chat; namespace BotSharp.Plugin.DeepSeekAI.Providers.Chat; @@ -170,39 +173,133 @@ public async Task GetChatCompletionsAsync(Agent agent, List GetChatCompletionsStreamingAsync(Agent agent, List conversations, Func onMessageReceived) + public async Task GetChatCompletionsStreamingAsync(Agent agent, List conversations) { var client = ProviderHelper.GetClient(Provider, _model, _services); var chatClient = client.GetChatClient(_model); var (prompt, messages, options) = PrepareOptions(agent, conversations); - var response = chatClient.CompleteChatStreamingAsync(messages, options); + var hub = _services.GetRequiredService>(); + var messageId = conversations.LastOrDefault()?.MessageId ?? string.Empty; + + var contentHooks = _services.GetHooks(agent.Id); + // Before chat completion hook + foreach (var hook in contentHooks) + { + await hook.BeforeGenerating(agent, conversations); + } - await foreach (var choice in response) + hub.Push(new() { - if (choice.FinishReason == ChatFinishReason.FunctionCall || choice.FinishReason == ChatFinishReason.ToolCalls) + ServiceProvider = _services, + EventName = "BeforeReceiveLlmStreamMessage", + Data = new RoleDialogModel(AgentRole.Assistant, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId + } + }); + + using var textStream = new RealtimeTextStream(); + var toolCalls = new List(); + ChatTokenUsage? tokenUsage = null; + + var responseMessage = new RoleDialogModel(AgentRole.Assistant, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId + }; + + await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options)) + { + tokenUsage = choice.Usage; + + if (!choice.ToolCallUpdates.IsNullOrEmpty()) + { + toolCalls.AddRange(choice.ToolCallUpdates); + } + + if (!choice.ContentUpdate.IsNullOrEmpty()) { - var update = choice.ToolCallUpdates?.FirstOrDefault()?.FunctionArgumentsUpdate?.ToString() ?? string.Empty; - _logger.LogInformation(update); + var text = choice.ContentUpdate[0]?.Text ?? string.Empty; + textStream.Collect(text); - await onMessageReceived(new RoleDialogModel(AgentRole.Assistant, update) +#if DEBUG + _logger.LogCritical($"Content update: {text}"); +#endif + + var content = new RoleDialogModel(AgentRole.Assistant, text) + { + CurrentAgentId = agent.Id, + MessageId = messageId + }; + hub.Push(new() { - RenderedInstruction = string.Join("\r\n", renderedInstructions) + ServiceProvider = _services, + EventName = "OnReceiveLlmStreamMessage", + Data = content }); - continue; } - if (choice.ContentUpdate.IsNullOrEmpty()) continue; + if (choice.FinishReason == ChatFinishReason.ToolCalls || choice.FinishReason == ChatFinishReason.FunctionCall) + { + var meta = toolCalls.FirstOrDefault(x => !string.IsNullOrEmpty(x.FunctionName)); + var functionName = meta?.FunctionName; + var toolCallId = meta?.ToolCallId; + var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList(); + var functionArgument = string.Join(string.Empty, args); - _logger.LogInformation(choice.ContentUpdate[0]?.Text); +#if DEBUG + _logger.LogCritical($"Tool Call (id: {toolCallId}) => {functionName}({functionArgument})"); +#endif - await onMessageReceived(new RoleDialogModel(choice.Role?.ToString() ?? ChatMessageRole.Assistant.ToString(), choice.ContentUpdate[0]?.Text ?? string.Empty) + responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + ToolCallId = toolCallId, + FunctionName = functionName, + FunctionArgs = functionArgument + }; + } + else if (choice.FinishReason.HasValue) { - RenderedInstruction = string.Join("\r\n", renderedInstructions) + var allText = textStream.GetText(); + _logger.LogCritical($"Text Content: {allText}"); + + responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + IsStreaming = true + }; + } + } + + hub.Push(new() + { + ServiceProvider = _services, + EventName = "AfterReceiveLlmStreamMessage", + Data = responseMessage + }); + + + var inputTokenDetails = tokenUsage?.InputTokenDetails; + // After chat completion hook + foreach (var hook in contentHooks) + { + await hook.AfterGenerated(responseMessage, new TokenStatsModel + { + Prompt = prompt, + Provider = Provider, + Model = _model, + TextInputTokens = (tokenUsage?.InputTokenCount ?? 0) - (inputTokenDetails?.CachedTokenCount ?? 0), + CachedTextInputTokens = inputTokenDetails?.CachedTokenCount ?? 0, + TextOutputTokens = tokenUsage?.OutputTokenCount ?? 0 }); } - return true; + return responseMessage; } public void SetModelName(string model) diff --git a/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Chat/GeminiChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Chat/GeminiChatCompletionProvider.cs index 1ead28e65..1d67eac8a 100644 --- a/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Chat/GeminiChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Chat/GeminiChatCompletionProvider.cs @@ -159,40 +159,9 @@ public async Task GetChatCompletionsAsync(Agent agent, List GetChatCompletionsStreamingAsync(Agent agent, List conversations, Func onMessageReceived) + public Task GetChatCompletionsStreamingAsync(Agent agent, List conversations) { - var client = ProviderHelper.GetGeminiClient(Provider, _model, _services); - var chatClient = client.CreateGenerativeModel(_model.ToModelId()); - var (prompt, messages) = PrepareOptions(chatClient,agent, conversations); - - var asyncEnumerable = chatClient.StreamContentAsync(messages); - - await foreach (var response in asyncEnumerable) - { - if (response.GetFunction() != null) - { - var func = response.GetFunction(); - var update = func?.Args?.ToJsonString().ToString() ?? string.Empty; - _logger.LogInformation(update); - - await onMessageReceived(new RoleDialogModel(AgentRole.Assistant, update) - { - RenderedInstruction = string.Join("\r\n", renderedInstructions) - }); - continue; - } - - if (response.Text().IsNullOrEmpty()) continue; - - _logger.LogInformation(response.Text()); - - await onMessageReceived(new RoleDialogModel(response.Candidates?.LastOrDefault()?.Content?.Role?.ToString() ?? AgentRole.Assistant.ToString(), response.Text() ?? string.Empty) - { - RenderedInstruction = string.Join("\r\n", renderedInstructions) - }); - } - - return true; + throw new NotImplementedException(); } public void SetModelName(string model) diff --git a/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Chat/PalmChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Chat/PalmChatCompletionProvider.cs index 72a47adc0..e992fd603 100644 --- a/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Chat/PalmChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Chat/PalmChatCompletionProvider.cs @@ -145,7 +145,7 @@ public Task GetChatCompletionsAsync(Agent agent, List con throw new NotImplementedException(); } - public Task GetChatCompletionsStreamingAsync(Agent agent, List conversations, Func onMessageReceived) + public Task GetChatCompletionsStreamingAsync(Agent agent, List conversations) { throw new NotImplementedException(); } diff --git a/src/Plugins/BotSharp.Plugin.HuggingFace/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.HuggingFace/Providers/ChatCompletionProvider.cs index 460677ba5..5a38b0a9c 100644 --- a/src/Plugins/BotSharp.Plugin.HuggingFace/Providers/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.HuggingFace/Providers/ChatCompletionProvider.cs @@ -76,9 +76,9 @@ public async Task GetChatCompletionsAsync(Agent agent, List GetChatCompletionsStreamingAsync(Agent agent, List conversations, Func onMessageReceived) + public Task GetChatCompletionsStreamingAsync(Agent agent, List conversations) { - return true; + throw new NotImplementedException(); } public void SetModelName(string model) diff --git a/src/Plugins/BotSharp.Plugin.LLamaSharp/BotSharp.Plugin.LLamaSharp.csproj b/src/Plugins/BotSharp.Plugin.LLamaSharp/BotSharp.Plugin.LLamaSharp.csproj index af7a6237a..80807a265 100644 --- a/src/Plugins/BotSharp.Plugin.LLamaSharp/BotSharp.Plugin.LLamaSharp.csproj +++ b/src/Plugins/BotSharp.Plugin.LLamaSharp/BotSharp.Plugin.LLamaSharp.csproj @@ -15,7 +15,7 @@ - + diff --git a/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs index 45f41e95d..321b9aee8 100644 --- a/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs @@ -1,5 +1,12 @@ using BotSharp.Abstraction.Agents; +using BotSharp.Abstraction.Hooks; using BotSharp.Abstraction.Loggers; +using BotSharp.Abstraction.Observables.Models; +using BotSharp.Core.Infrastructures.Streams; +using BotSharp.Core.Observables.Queues; +using Microsoft.AspNetCore.SignalR; +using static LLama.Common.ChatHistory; +using static System.Net.Mime.MediaTypeNames; namespace BotSharp.Plugin.LLamaSharp.Providers; @@ -159,12 +166,8 @@ public async Task GetChatCompletionsAsync(Agent agent, return true; } - public async Task GetChatCompletionsStreamingAsync(Agent agent, List conversations, Func onMessageReceived) + public async Task GetChatCompletionsStreamingAsync(Agent agent, List conversations) { - string totalResponse = ""; - var content = string.Join("\r\n", conversations.Select(x => $"{x.Role}: {x.Content}")).Trim(); - content += $"\r\n{AgentRole.Assistant}: "; - var state = _services.GetRequiredService(); var model = state.GetState("model", "llama-2-7b-chat.Q8_0"); @@ -180,13 +183,60 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, List>(); + var messageId = conversations.LastOrDefault()?.MessageId ?? string.Empty; + + hub.Push(new() + { + ServiceProvider = _services, + EventName = "BeforeReceiveLlmStreamMessage", + Data = new RoleDialogModel(AgentRole.Assistant, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId + } + }); + + using var textStream = new RealtimeTextStream(); + var responseMessage = new RoleDialogModel(AgentRole.Assistant, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId + }; + await foreach (var response in executor.InferAsync(agent.Instruction, inferenceParams)) { Console.Write(response); - totalResponse += response; + textStream.Collect(response); + + var content = new RoleDialogModel(AgentRole.Assistant, response) + { + CurrentAgentId = agent.Id, + MessageId = messageId + }; + hub.Push(new() + { + ServiceProvider = _services, + EventName = "OnReceiveLlmStreamMessage", + Data = content + }); } - return true; + responseMessage = new RoleDialogModel(AgentRole.Assistant, textStream.GetText()) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + IsStreaming = true + }; + + hub.Push(new() + { + ServiceProvider = _services, + EventName = "AfterReceiveLlmStreamMessage", + Data = responseMessage + }); + + return responseMessage; } public void SetModelName(string model) diff --git a/src/Plugins/BotSharp.Plugin.LangChain/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.LangChain/Providers/ChatCompletionProvider.cs index aa6b1175f..9e2aa1d46 100644 --- a/src/Plugins/BotSharp.Plugin.LangChain/Providers/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.LangChain/Providers/ChatCompletionProvider.cs @@ -65,7 +65,7 @@ public Task GetChatCompletionsAsync(Agent agent, List con throw new NotImplementedException(); } - public Task GetChatCompletionsStreamingAsync(Agent agent, List conversations, Func onMessageReceived) + public Task GetChatCompletionsStreamingAsync(Agent agent, List conversations) { throw new NotImplementedException(); } diff --git a/src/Plugins/BotSharp.Plugin.MetaGLM/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.MetaGLM/Providers/ChatCompletionProvider.cs index c1d4bef1f..8b3ecea37 100644 --- a/src/Plugins/BotSharp.Plugin.MetaGLM/Providers/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.MetaGLM/Providers/ChatCompletionProvider.cs @@ -235,7 +235,7 @@ public Task GetChatCompletionsAsync(Agent agent, List con throw new NotImplementedException(); } - public Task GetChatCompletionsStreamingAsync(Agent agent, List conversations, Func onMessageReceived) + public Task GetChatCompletionsStreamingAsync(Agent agent, List conversations) { throw new NotImplementedException(); } diff --git a/src/Plugins/BotSharp.Plugin.MicrosoftExtensionsAI/MicrosoftExtensionsAIChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.MicrosoftExtensionsAI/MicrosoftExtensionsAIChatCompletionProvider.cs index d95a25ae0..e510a9a24 100644 --- a/src/Plugins/BotSharp.Plugin.MicrosoftExtensionsAI/MicrosoftExtensionsAIChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.MicrosoftExtensionsAI/MicrosoftExtensionsAIChatCompletionProvider.cs @@ -169,8 +169,10 @@ public Task GetChatCompletionsAsync(Agent agent, List con throw new NotImplementedException(); /// - public Task GetChatCompletionsStreamingAsync(Agent agent, List conversations, Func onMessageReceived) => + public Task GetChatCompletionsStreamingAsync(Agent agent, List conversations) + { throw new NotImplementedException(); + } private sealed class NopAIFunction(string name, string description, JsonElement schema) : AIFunction { diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs index 16e56b25a..314697c83 100644 --- a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs @@ -1,5 +1,6 @@ using Azure; using BotSharp.Abstraction.Hooks; +using BotSharp.Abstraction.Observables.Models; using BotSharp.Core.Infrastructures.Streams; using BotSharp.Core.Observables.Queues; using BotSharp.Plugin.OpenAI.Models.Realtime; @@ -190,7 +191,7 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, var chatClient = client.GetChatClient(_model); var (prompt, messages, options) = PrepareOptions(agent, conversations); - var hub = _services.GetRequiredService(); + var hub = _services.GetRequiredService>(); var messageId = conversations.LastOrDefault()?.MessageId ?? string.Empty; var contentHooks = _services.GetHooks(agent.Id); @@ -210,7 +211,6 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, MessageId = messageId } }); - using var textStream = new RealtimeTextStream(); var toolCalls = new List(); @@ -273,7 +273,6 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, FunctionName = functionName, FunctionArgs = functionArgument }; - } else if (choice.FinishReason.HasValue) { diff --git a/src/Plugins/BotSharp.Plugin.SemanticKernel/SemanticKernelChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.SemanticKernel/SemanticKernelChatCompletionProvider.cs index 3f742777d..57277399d 100644 --- a/src/Plugins/BotSharp.Plugin.SemanticKernel/SemanticKernelChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.SemanticKernel/SemanticKernelChatCompletionProvider.cs @@ -94,7 +94,7 @@ public Task GetChatCompletionsAsync(Agent agent, List con throw new NotImplementedException(); } /// - public Task GetChatCompletionsStreamingAsync(Agent agent, List conversations, Func onMessageReceived) + public Task GetChatCompletionsStreamingAsync(Agent agent, List conversations) { throw new NotImplementedException(); } diff --git a/src/Plugins/BotSharp.Plugin.SparkDesk/BotSharp.Plugin.SparkDesk.csproj b/src/Plugins/BotSharp.Plugin.SparkDesk/BotSharp.Plugin.SparkDesk.csproj index 51f26f872..d49d41df4 100644 --- a/src/Plugins/BotSharp.Plugin.SparkDesk/BotSharp.Plugin.SparkDesk.csproj +++ b/src/Plugins/BotSharp.Plugin.SparkDesk/BotSharp.Plugin.SparkDesk.csproj @@ -15,7 +15,7 @@ - + diff --git a/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs index ba0aa220f..035e03e0c 100644 --- a/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs @@ -1,6 +1,10 @@ using BotSharp.Abstraction.Agents; using BotSharp.Abstraction.Agents.Enums; using BotSharp.Abstraction.Loggers; +using BotSharp.Abstraction.Observables.Models; +using BotSharp.Core.Infrastructures.Streams; +using BotSharp.Core.Observables.Queues; +using Microsoft.AspNetCore.SignalR; namespace BotSharp.Plugin.SparkDesk.Providers; @@ -143,34 +147,77 @@ public async Task GetChatCompletionsAsync(Agent agent, List GetChatCompletionsStreamingAsync(Agent agent, List conversations, Func onMessageReceived) + public async Task GetChatCompletionsStreamingAsync(Agent agent, List conversations) { var client = new SparkDeskClient(appId: _settings.AppId, apiKey: _settings.ApiKey, apiSecret: _settings.ApiSecret); var (prompt, messages, funcall) = PrepareOptions(agent, conversations); + var messageId = conversations.LastOrDefault()?.MessageId ?? string.Empty; + var hub = _services.GetRequiredService>(); + + hub.Push(new() + { + ServiceProvider = _services, + EventName = "BeforeReceiveLlmStreamMessage", + Data = new RoleDialogModel(AgentRole.Assistant, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId + } + }); + + var responseMessage = new RoleDialogModel(AgentRole.Assistant, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId + }; + + using var textStream = new RealtimeTextStream(); await foreach (StreamedChatResponse response in client.ChatAsStreamAsync(modelVersion: _settings.ModelVersion, messages, functions: funcall.Length == 0 ? null : funcall)) { - if (response.FunctionCall !=null) + if (response.FunctionCall != null) { - await onMessageReceived(new RoleDialogModel(AgentRole.Function, response.Text) - { + responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty) + { CurrentAgentId = agent.Id, + MessageId = messageId, + ToolCallId = response.FunctionCall.Name, FunctionName = response.FunctionCall.Name, - FunctionArgs = response.FunctionCall.Arguments, - RenderedInstruction = string.Join("\r\n", renderedInstructions) - }); - continue; + FunctionArgs = response.FunctionCall.Arguments + }; } - - await onMessageReceived(new RoleDialogModel(AgentRole.Assistant, response.Text) + else { - CurrentAgentId = agent.Id, - RenderedInstruction = string.Join("\r\n", renderedInstructions) - }); - - } + textStream.Collect(response.Text); + responseMessage = new RoleDialogModel(AgentRole.Assistant, response.Text) + { + CurrentAgentId = agent.Id, + MessageId = messageId + }; - return true; + hub.Push(new() + { + ServiceProvider = _services, + EventName = "OnReceiveLlmStreamMessage", + Data = responseMessage + }); + } + } + + if (responseMessage.Role == AgentRole.Assistant) + { + responseMessage.Content = textStream.GetText(); + responseMessage.IsStreaming = true; + } + + hub.Push(new() + { + ServiceProvider = _services, + EventName = "AfterReceiveLlmStreamMessage", + Data = responseMessage + }); + + return responseMessage; } public void SetModelName(string model)