diff --git a/Directory.Packages.props b/Directory.Packages.props index b115ab6b7..b179a20d1 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -111,10 +111,10 @@ - + - - + + diff --git a/src/Infrastructure/BotSharp.Abstraction/Functions/IFunctionCallback.cs b/src/Infrastructure/BotSharp.Abstraction/Functions/IFunctionCallback.cs index b842b5b6d..2019a0577 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Functions/IFunctionCallback.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Functions/IFunctionCallback.cs @@ -2,6 +2,7 @@ namespace BotSharp.Abstraction.Functions; public interface IFunctionCallback { + string Provider => "Botsharp"; string Name { get; } /// diff --git a/src/Infrastructure/BotSharp.Abstraction/MCP/Models/McpServerConfigModel.cs b/src/Infrastructure/BotSharp.Abstraction/MCP/Models/McpServerConfigModel.cs index 2e0967b94..0a80bb895 100644 --- a/src/Infrastructure/BotSharp.Abstraction/MCP/Models/McpServerConfigModel.cs +++ b/src/Infrastructure/BotSharp.Abstraction/MCP/Models/McpServerConfigModel.cs @@ -12,22 +12,21 @@ public class McpServerConfigModel /// public string Name { get; set; } = null!; - /// - /// The type of transport to use. - /// - [JsonPropertyName("transport_type")] - public string TransportType { get; set; } = null!; - - /// - /// For stdio transport: path to the executable - /// For HTTP transport: base URL of the server - /// - public string? Location { get; set; } + public McpSseServerConfig? SseConfig { get; set; } + public McpStdioServerConfig? StdioConfig { get; set; } +} - /// - /// Additional transport-specific configuration. - /// - [JsonPropertyName("transport_options")] - [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] - public Dictionary? TransportOptions { get; set; } +public class McpSseServerConfig +{ + public string EndPoint { get; set; } = null!; + public TimeSpan ConnectionTimeout { get; init; } = TimeSpan.FromSeconds(30); + public Dictionary? AdditionalHeaders { get; set; } } + +public class McpStdioServerConfig +{ + public string Command { get; set; } = null!; + public IList? Arguments { get; set; } + public Dictionary? EnvironmentVariables { get; set; } + public TimeSpan ShutdownTimeout { get; set; } = TimeSpan.FromSeconds(5); +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Abstraction/MCP/Models/McpServerOptionModel.cs b/src/Infrastructure/BotSharp.Abstraction/MCP/Models/McpServerOptionModel.cs new file mode 100644 index 000000000..964203c92 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/MCP/Models/McpServerOptionModel.cs @@ -0,0 +1,19 @@ +namespace BotSharp.Abstraction.MCP.Models; + +public class McpServerOptionModel : IdName +{ + public IEnumerable Tools { get; set; } = []; + + public McpServerOptionModel() : base() + { + + } + + public McpServerOptionModel( + string id, + string name, + IEnumerable tools) : base(id, name) + { + Tools = tools ?? []; + } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/MCP/Services/IMcpService.cs b/src/Infrastructure/BotSharp.Abstraction/MCP/Services/IMcpService.cs index 7f0546992..71952dacf 100644 --- a/src/Infrastructure/BotSharp.Abstraction/MCP/Services/IMcpService.cs +++ b/src/Infrastructure/BotSharp.Abstraction/MCP/Services/IMcpService.cs @@ -2,5 +2,5 @@ namespace BotSharp.Abstraction.MCP.Services; public interface IMcpService { - IEnumerable GetServerConfigs() => []; + IEnumerable GetServerConfigs() => []; } diff --git a/src/Infrastructure/BotSharp.Abstraction/Models/IdName.cs b/src/Infrastructure/BotSharp.Abstraction/Models/IdName.cs index e5f7d8b5b..107c7dd8f 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Models/IdName.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Models/IdName.cs @@ -8,6 +8,11 @@ public class IdName [JsonPropertyName("name")] public string Name { get; set; } = default!; + public IdName() + { + + } + public IdName(string id, string name) { Id = id; diff --git a/src/Infrastructure/BotSharp.Abstraction/Models/MessageState.cs b/src/Infrastructure/BotSharp.Abstraction/Models/MessageState.cs index 15fd6a377..ff26e981d 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Models/MessageState.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Models/MessageState.cs @@ -3,7 +3,7 @@ namespace BotSharp.Abstraction.Models; public class MessageState { public string Key { get; set; } - public string Value { get; set; } + public object Value { get; set; } [JsonPropertyName("active_rounds")] public int ActiveRounds { get; set; } = -1; @@ -13,7 +13,7 @@ public MessageState() } - public MessageState(string key, string value, int activeRounds = -1) + public MessageState(string key, object value, int activeRounds = -1) { Key = key; Value = value; diff --git a/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeHubConnection.cs b/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeHubConnection.cs index b3132df71..0f2af400e 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeHubConnection.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeHubConnection.cs @@ -13,6 +13,7 @@ public class RealtimeHubConnection public Func OnModelMessageReceived { get; set; } = null!; public Func OnModelAudioResponseDone { get; set; } = null!; public Func OnModelUserInterrupted { get; set; } = null!; + public Func OnUserSpeechDetected { get; set; } = () => string.Empty; public void ResetResponseState() { diff --git a/src/Infrastructure/BotSharp.Abstraction/Utilities/StringExtensions.cs b/src/Infrastructure/BotSharp.Abstraction/Utilities/StringExtensions.cs index af222d0dd..94bda0cd3 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Utilities/StringExtensions.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Utilities/StringExtensions.cs @@ -108,4 +108,27 @@ public static bool IsPrimitiveValue(this string value) uint.TryParse(value, out _) || ulong.TryParse(value, out _); } + + + public static string ConvertToString(this T? value, JsonSerializerOptions? jsonOptions = null) + { + if (value == null) + { + return string.Empty; + } + + if (value is string s) + { + return s; + } + + if (value is JsonElement elem + && elem.ValueKind == JsonValueKind.String) + { + return elem.ToString(); + } + + var str = JsonSerializer.Serialize(value, jsonOptions); + return str; + } } diff --git a/src/Infrastructure/BotSharp.Core.MCP/BotSharpMCPExtensions.cs b/src/Infrastructure/BotSharp.Core.MCP/BotSharpMCPExtensions.cs index ee0654596..38522fa9f 100644 --- a/src/Infrastructure/BotSharp.Core.MCP/BotSharpMCPExtensions.cs +++ b/src/Infrastructure/BotSharp.Core.MCP/BotSharpMCPExtensions.cs @@ -4,7 +4,6 @@ using BotSharp.Core.MCP.Services; using BotSharp.Core.MCP.Settings; using Microsoft.Extensions.Configuration; -using ModelContextProtocol; using ModelContextProtocol.Client; namespace BotSharp.Core.MCP; @@ -21,7 +20,7 @@ public static IServiceCollection AddBotSharpMCP(this IServiceCollection services { services.AddScoped(); var settings = config.GetSection("MCP").Get(); - services.AddScoped(provider => { return settings; }); + services.AddScoped(provider => settings); if (settings != null && settings.Enabled && !settings.McpServerConfigs.IsNullOrEmpty()) { @@ -42,18 +41,18 @@ public static IServiceCollection AddBotSharpMCP(this IServiceCollection services return services; } - private static async Task RegisterFunctionCall(IServiceCollection services, McpServerConfig server, McpClientManager clientManager) + private static async Task RegisterFunctionCall(IServiceCollection services, McpServerConfigModel server, McpClientManager clientManager) { var client = await clientManager.GetMcpClientAsync(server.Id); var tools = await client.ListToolsAsync(); foreach (var tool in tools) { - services.AddScoped(provider => { return tool; }); + services.AddScoped(provider => tool); services.AddScoped(provider => { - var funcTool = new McpToolAdapter(provider, tool, clientManager); + var funcTool = new McpToolAdapter(provider, server.Name, tool, clientManager); return funcTool; }); } diff --git a/src/Infrastructure/BotSharp.Core.MCP/Functions/McpToolAdapter.cs b/src/Infrastructure/BotSharp.Core.MCP/Functions/McpToolAdapter.cs index 2d4fdc702..ad0272199 100644 --- a/src/Infrastructure/BotSharp.Core.MCP/Functions/McpToolAdapter.cs +++ b/src/Infrastructure/BotSharp.Core.MCP/Functions/McpToolAdapter.cs @@ -6,17 +6,24 @@ namespace BotSharp.Core.MCP.Functions; public class McpToolAdapter : IFunctionCallback { + private readonly string _provider; private readonly McpClientTool _tool; private readonly McpClientManager _clientManager; private readonly IServiceProvider _services; - public McpToolAdapter(IServiceProvider services, McpClientTool tool, McpClientManager client) + public McpToolAdapter( + IServiceProvider services, + string serverName, + McpClientTool tool, + McpClientManager client) { _services = services ?? throw new ArgumentNullException(nameof(services)); _tool = tool ?? throw new ArgumentNullException(nameof(tool)); _clientManager = client ?? throw new ArgumentNullException(nameof(client)); + _provider = serverName; } + public string Provider => _provider; public string Name => _tool.Name; public async Task Execute(RoleDialogModel message) diff --git a/src/Infrastructure/BotSharp.Core.MCP/Managers/McpClientManager.cs b/src/Infrastructure/BotSharp.Core.MCP/Managers/McpClientManager.cs index c11163b38..a5441e669 100644 --- a/src/Infrastructure/BotSharp.Core.MCP/Managers/McpClientManager.cs +++ b/src/Infrastructure/BotSharp.Core.MCP/Managers/McpClientManager.cs @@ -1,5 +1,6 @@ using BotSharp.Core.MCP.Settings; using ModelContextProtocol.Client; +using ModelContextProtocol.Protocol.Transport; namespace BotSharp.Core.MCP.Managers; @@ -14,9 +15,33 @@ public McpClientManager(McpSettings mcpSettings) public async Task GetMcpClientAsync(string serverId) { - return await McpClientFactory.CreateAsync( - _mcpSettings.McpServerConfigs.Where(x=> x.Name == serverId).First(), - _mcpSettings.McpClientOptions); + var config = _mcpSettings.McpServerConfigs.Where(x => x.Id == serverId).FirstOrDefault(); + + IClientTransport transport; + if (config.SseConfig != null) + { + transport = new SseClientTransport(new SseClientTransportOptions + { + Name = config.Name, + Endpoint = new Uri(config.SseConfig.EndPoint) + }); + } + else if (config.StdioConfig != null) + { + transport = new StdioClientTransport(new StdioClientTransportOptions + { + Name = config.Name, + Command = config.StdioConfig.Command, + Arguments = config.StdioConfig.Arguments, + EnvironmentVariables = config.StdioConfig.EnvironmentVariables + }); + } + else + { + throw new ArgumentNullException("Invalid MCP server configuration!"); + } + + return await McpClientFactory.CreateAsync(transport, _mcpSettings.McpClientOptions); } public void Dispose() diff --git a/src/Infrastructure/BotSharp.Core.MCP/Services/McpService.cs b/src/Infrastructure/BotSharp.Core.MCP/Services/McpService.cs index 786f0857f..bbc2c3f4d 100644 --- a/src/Infrastructure/BotSharp.Core.MCP/Services/McpService.cs +++ b/src/Infrastructure/BotSharp.Core.MCP/Services/McpService.cs @@ -16,17 +16,26 @@ public McpService( _logger = logger; } - public IEnumerable GetServerConfigs() + public IEnumerable GetServerConfigs() { + var options = new List(); var settings = _services.GetRequiredService(); var configs = settings?.McpServerConfigs ?? []; - return configs.Select(x => new McpServerConfigModel + + foreach (var config in configs) { - Id = x.Id, - Name = x.Name, - TransportType = x.TransportType, - TransportOptions = x.TransportOptions, - Location = x.Location - }); + var tools = _services.GetServices() + .Where(x => x.Provider == config.Name) + .Select(x => x.Name); + + options.Add(new McpServerOptionModel + { + Id = config.Id, + Name = config.Name, + Tools = tools + }); + } + + return options; } } diff --git a/src/Infrastructure/BotSharp.Core.MCP/Settings/MCPSettings.cs b/src/Infrastructure/BotSharp.Core.MCP/Settings/MCPSettings.cs index cd4bfd0f2..2867712f9 100644 --- a/src/Infrastructure/BotSharp.Core.MCP/Settings/MCPSettings.cs +++ b/src/Infrastructure/BotSharp.Core.MCP/Settings/MCPSettings.cs @@ -1,5 +1,4 @@ using ModelContextProtocol.Client; -using ModelContextProtocol; namespace BotSharp.Core.MCP.Settings; @@ -7,6 +6,6 @@ public class McpSettings { public bool Enabled { get; set; } = true; public McpClientOptions McpClientOptions { get; set; } - public List McpServerConfigs { get; set; } = new(); + public List McpServerConfigs { get; set; } = []; } diff --git a/src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs b/src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs index 4eae3a648..0c36eee3d 100644 --- a/src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs +++ b/src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs @@ -150,6 +150,9 @@ await _completer.Connect(_conn, var data = _conn.OnModelUserInterrupted(); await (responseToUser?.Invoke(data) ?? Task.CompletedTask); } + + var res = _conn.OnUserSpeechDetected(); + await (responseToUser?.Invoke(res) ?? Task.CompletedTask); }); } diff --git a/src/Infrastructure/BotSharp.Core.Realtime/Services/WaveStreamChannel.cs b/src/Infrastructure/BotSharp.Core.Realtime/Services/WaveStreamChannel.cs index b10004534..34f823799 100644 --- a/src/Infrastructure/BotSharp.Core.Realtime/Services/WaveStreamChannel.cs +++ b/src/Infrastructure/BotSharp.Core.Realtime/Services/WaveStreamChannel.cs @@ -38,10 +38,13 @@ public async Task ConnectAsync(string conversationId) // Initialize audio output for streaming var waveFormat = new WaveFormat(24000, 16, 1); // 24000 Hz, 16-bit PCM, Mono _bufferedWaveProvider = new BufferedWaveProvider(waveFormat); - _bufferedWaveProvider.BufferLength = 1024 * 1024; // Buffer length + _bufferedWaveProvider.BufferDuration = TimeSpan.FromMinutes(10); _bufferedWaveProvider.DiscardOnBufferOverflow = true; - - _waveOut = new WaveOutEvent(); + + _waveOut = new WaveOutEvent() + { + DeviceNumber = 0 + }; _waveOut.Init(_bufferedWaveProvider); _waveOut.Play(); } diff --git a/src/Infrastructure/BotSharp.Core/Agents/Services/AgentService.LoadAgent.cs b/src/Infrastructure/BotSharp.Core/Agents/Services/AgentService.LoadAgent.cs index d3ceea8ba..e10b79ccd 100644 --- a/src/Infrastructure/BotSharp.Core/Agents/Services/AgentService.LoadAgent.cs +++ b/src/Infrastructure/BotSharp.Core/Agents/Services/AgentService.LoadAgent.cs @@ -67,7 +67,7 @@ public async Task LoadAgent(string id, bool loadUtility = true) hook.OnSamplesLoaded(agent.Samples); } - if (loadUtility) + if (loadUtility && !agent.Utilities.IsNullOrEmpty()) { hook.OnAgentUtilityLoaded(agent); } diff --git a/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationStateService.cs b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationStateService.cs index 37c8c7604..9e101b745 100644 --- a/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationStateService.cs +++ b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationStateService.cs @@ -15,6 +15,7 @@ limitations under the License. ******************************************************************************/ using BotSharp.Abstraction.Conversations.Enums; +using BotSharp.Abstraction.Options; using BotSharp.Abstraction.SideCar; namespace BotSharp.Core.Conversations.Services; @@ -69,9 +70,11 @@ public IConversationStateService SetState(string name, T value, bool isNeedVe return this; } + var options = _services.GetRequiredService(); + var defaultRound = -1; var preValue = string.Empty; - var currentValue = value.ToString(); + var currentValue = value.ConvertToString(options.JsonSerializerOptions); var curActive = true; StateKeyValue? pair = null; StateValue? prevLeafNode = null; diff --git a/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationStorage.cs b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationStorage.cs index 75d1a5012..49852eb22 100644 --- a/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationStorage.cs +++ b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationStorage.cs @@ -19,75 +19,7 @@ public ConversationStorage( public void Append(string conversationId, RoleDialogModel dialog) { - var agentId = dialog.CurrentAgentId; - var db = _services.GetRequiredService(); - var dialogElements = new List(); - - // Prevent duplicate record to be inserted - /*var dialogs = db.GetConversationDialogs(conversationId); - if (dialogs.Any(x => x.MetaData.MessageId == dialog.MessageId && x.Content == dialog.Content)) - { - return; - }*/ - - if (dialog.Role == AgentRole.Function) - { - var meta = new DialogMetaData - { - Role = dialog.Role, - AgentId = agentId, - MessageId = dialog.MessageId, - MessageType = dialog.MessageType, - FunctionName = dialog.FunctionName, - CreatedTime = dialog.CreatedAt - }; - - var content = dialog.Content.RemoveNewLine(); - if (string.IsNullOrEmpty(content)) - { - return; - } - dialogElements.Add(new DialogElement - { - MetaData = meta, - Content = dialog.Content, - SecondaryContent = dialog.SecondaryContent, - Payload = dialog.Payload - }); - } - else - { - var meta = new DialogMetaData - { - Role = dialog.Role, - AgentId = agentId, - MessageId = dialog.MessageId, - MessageType = dialog.MessageType, - SenderId = dialog.SenderId, - FunctionName = dialog.FunctionName, - CreatedTime = dialog.CreatedAt - }; - - var content = dialog.Content.RemoveNewLine(); - if (string.IsNullOrEmpty(content)) - { - return; - } - - var richContent = dialog.RichContent != null ? JsonSerializer.Serialize(dialog.RichContent, _options.JsonSerializerOptions) : null; - var secondaryRichContent = dialog.SecondaryRichContent != null ? JsonSerializer.Serialize(dialog.SecondaryRichContent, _options.JsonSerializerOptions) : null; - dialogElements.Add(new DialogElement - { - MetaData = meta, - Content = dialog.Content, - SecondaryContent = dialog.SecondaryContent, - RichContent = richContent, - SecondaryRichContent = secondaryRichContent, - Payload = dialog.Payload - }); - } - - db.AppendConversationDialogs(conversationId, dialogElements); + Append(conversationId, [dialog]); } public void Append(string conversationId, IEnumerable dialogs) diff --git a/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs b/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs index b0a0fa6d9..55e3009f6 100644 --- a/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs +++ b/src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs @@ -33,7 +33,7 @@ public async Task NewConversation([FromRoute] string agen var conv = new Conversation { AgentId = agentId, - Channel = channel == default ? ConversationChannel.OpenAPI : channel.Value, + Channel = channel == default ? ConversationChannel.OpenAPI : channel.Value.ToString(), Tags = config.Tags ?? new(), TaskId = config.TaskId }; diff --git a/src/Infrastructure/BotSharp.OpenAPI/Controllers/McpController.cs b/src/Infrastructure/BotSharp.OpenAPI/Controllers/McpController.cs index 0524f0fbc..7b74a37ec 100644 --- a/src/Infrastructure/BotSharp.OpenAPI/Controllers/McpController.cs +++ b/src/Infrastructure/BotSharp.OpenAPI/Controllers/McpController.cs @@ -13,7 +13,7 @@ public McpController(IServiceProvider services) } [HttpGet("/mcp/server-configs")] - public IEnumerable GetMcpServerConfigs() + public IEnumerable GetMcpServerConfigs() { var mcp = _services.GetRequiredService(); return mcp.GetServerConfigs(); diff --git a/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Realtime/RealTimeCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Realtime/RealTimeCompletionProvider.cs index f9ef590ab..cdebc5f6d 100644 --- a/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Realtime/RealTimeCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Realtime/RealTimeCompletionProvider.cs @@ -119,17 +119,18 @@ private Task AttachEvents(MultiModalLiveClient client) { client.Connected += (sender, e) => { - _logger.LogInformation("Google Realtime Client connected"); + _logger.LogInformation("Google Realtime Client connected."); onModelReady(); }; client.Disconnected += (sender, e) => { - _logger.LogInformation("Google Realtime Client disconnected"); + _logger.LogInformation("Google Realtime Client disconnected."); }; client.MessageReceived += async (sender, e) => { + _logger.LogInformation("User message received."); if (e.Payload.SetupComplete != null) { onConversationItemCreated(_client.ConnectionId.ToString()); @@ -156,12 +157,14 @@ private Task AttachEvents(MultiModalLiveClient client) }; client.GenerationInterrupted += (sender, e) => - { + { + _logger.LogInformation("Audio generation interrupted."); onUserInterrupted(); }; client.AudioReceiveCompleted += (sender, e) => - { + { + _logger.LogInformation("Audio receive completed."); onModelAudioResponseDone(); }; diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Models/Realtime/SessionConversationUpdate.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Models/Realtime/SessionConversationUpdate.cs new file mode 100644 index 000000000..e2b12f57a --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Models/Realtime/SessionConversationUpdate.cs @@ -0,0 +1,11 @@ +namespace BotSharp.Plugin.OpenAI.Models.Realtime; + +public class SessionConversationUpdate +{ + public string RawResponse { get; set; } + + public SessionConversationUpdate() + { + + } +} diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/RealTimeCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/RealTimeCompletionProvider.cs index f4f3120f0..959ed4578 100644 --- a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/RealTimeCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/RealTimeCompletionProvider.cs @@ -1,6 +1,6 @@ using BotSharp.Plugin.OpenAI.Models.Realtime; +using BotSharp.Plugin.OpenAI.Providers.Realtime.Session; using OpenAI.Chat; -using System.Net.WebSockets; namespace BotSharp.Plugin.OpenAI.Providers.Realtime; @@ -18,7 +18,7 @@ public class RealTimeCompletionProvider : IRealTimeCompletion private readonly BotSharpOptions _options; protected string _model = "gpt-4o-mini-realtime-preview"; - private ClientWebSocket _webSocket; + private RealtimeChatSession _session; public RealTimeCompletionProvider( OpenAiSettings settings, @@ -45,20 +45,11 @@ public async Task Connect(RealtimeHubConnection conn, var realtimeModelSettings = _services.GetRequiredService(); _model = realtimeModelSettings.Model; - var settingsService = _services.GetRequiredService(); - var settings = settingsService.GetSetting(Provider, _model); - - _webSocket?.Dispose(); - _webSocket = new ClientWebSocket(); - _webSocket.Options.SetRequestHeader("Authorization", $"Bearer {settings.ApiKey}"); - _webSocket.Options.SetRequestHeader("OpenAI-Beta", "realtime=v1"); - - await _webSocket.ConnectAsync(new Uri($"wss://api.openai.com/v1/realtime?model={_model}"), CancellationToken.None); + _session?.Dispose(); + _session = new RealtimeChatSession(_services, _options); + await _session.ConnectAsync(Provider, _model, CancellationToken.None); - if (_webSocket.State == WebSocketState.Open) - { - // Receive a message - _ = ReceiveMessage(conn, + _ = ReceiveMessage(conn, onModelReady, onModelAudioDeltaReceived, onModelAudioResponseDone, @@ -67,15 +58,11 @@ public async Task Connect(RealtimeHubConnection conn, onConversationItemCreated, onInputAudioTranscriptionCompleted, onInterruptionDetected); - } } public async Task Disconnect() { - if (_webSocket.State == WebSocketState.Open) - { - await _webSocket.CloseAsync(WebSocketCloseStatus.Empty, null, CancellationToken.None); - } + _session?.Disconnect(); } public async Task AppenAudioBuffer(string message) @@ -137,7 +124,7 @@ await SendEventToModel(new private async Task ReceiveMessage(RealtimeHubConnection conn, Action onModelReady, - Action onModelAudioDeltaReceived, + Action onModelAudioDeltaReceived, Action onModelAudioResponseDone, Action onModelAudioTranscriptDone, Action> onModelResponseDone, @@ -145,33 +132,9 @@ private async Task ReceiveMessage(RealtimeHubConnection conn, Action onUserAudioTranscriptionCompleted, Action onInterruptionDetected) { - var buffer = new byte[1024 * 1024 * 32]; - // Model response timeout - var settings = _services.GetRequiredService(); - var timeout = settings.ModelResponseTimeout; - WebSocketReceiveResult? result = default; - - do + await foreach (SessionConversationUpdate update in _session.ReceiveUpdatesAsync(CancellationToken.None)) { - Array.Clear(buffer, 0, buffer.Length); - - var taskWorker = _webSocket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None); - var taskTimer = Task.Delay(1000 * timeout); - var completedTask = await Task.WhenAny(taskWorker, taskTimer); - - if (completedTask == taskWorker) - { - result = taskWorker.Result; - } - else - { - _logger.LogWarning($"Timeout {timeout} seconds waiting for Model response."); - await TriggerModelInference("Response user immediately"); - continue; - } - - // Convert received data to text/audio (Twilio sends Base64-encoded audio) - string receivedText = Encoding.UTF8.GetString(buffer, 0, result.Count); + var receivedText = update?.RawResponse; if (string.IsNullOrEmpty(receivedText)) { continue; @@ -179,8 +142,6 @@ private async Task ReceiveMessage(RealtimeHubConnection conn, var response = JsonSerializer.Deserialize(receivedText); - _logger.LogDebug($"{nameof(RealTimeCompletionProvider)} received: {response.Type} {receivedText.Length}"); - if (response.Type == "error") { _logger.LogError($"{response.Type}: {receivedText}"); @@ -217,6 +178,11 @@ private async Task ReceiveMessage(RealtimeHubConnection conn, _logger.LogDebug($"{response.Type}: {receivedText}"); onModelAudioDeltaReceived(audio.Delta, audio.ItemId); } + else + { + _logger.LogDebug($"{response.Type}: {receivedText}"); + onModelAudioDeltaReceived(audio.Delta, audio.ItemId); + } } else if (response.Type == "response.audio.done") { @@ -248,27 +214,14 @@ private async Task ReceiveMessage(RealtimeHubConnection conn, // Handle user interuption onInterruptionDetected(); } - - } while (!result.CloseStatus.HasValue); - - await _webSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None); + } } public async Task SendEventToModel(object message) { - if (_webSocket.State != WebSocketState.Open) - { - return; - } - - if (message is not string data) - { - data = JsonSerializer.Serialize(message, _options.JsonSerializerOptions); - } + if (_session == null) return; - var buffer = Encoding.UTF8.GetBytes(data); - - await _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Text, true, CancellationToken.None); + await _session.SendEventToModel(message); } public async Task UpdateSession(RealtimeHubConnection conn) diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/Session/AiWebsocketPipelineResponse.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/Session/AiWebsocketPipelineResponse.cs new file mode 100644 index 000000000..2a62cdc91 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/Session/AiWebsocketPipelineResponse.cs @@ -0,0 +1,116 @@ +using System.ClientModel.Primitives; +using System.Net; +using System.Net.WebSockets; + +namespace BotSharp.Plugin.OpenAI.Providers.Realtime.Session; + +public class AiWebsocketPipelineResponse : PipelineResponse +{ + + public AiWebsocketPipelineResponse() + { + + } + + private int _status; + public override int Status => _status; + + private string _reasonPhrase; + public override string ReasonPhrase => _reasonPhrase; + + private MemoryStream _contentStream = new(); + public override Stream? ContentStream + { + get + { + return _contentStream != null ? _contentStream : new MemoryStream(); + } + set => throw new NotImplementedException(); + } + + private BinaryData _content; + public override BinaryData Content + { + get + { + if (_content == null) + { + _content = new(_contentStream.ToArray()); + } + return _content; + } + } + + protected override PipelineResponseHeaders HeadersCore => throw new NotImplementedException(); + + public bool IsComplete { get; private set; } = false; + + + public void CollectReceivedResult(WebSocketReceiveResult receivedResult, BinaryData receivedBytes) + { + if (ContentStream.Length == 0) + { + _status = ConvertWebsocketCloseStatusToHttpStatus(receivedResult.CloseStatus ?? WebSocketCloseStatus.Empty); + _reasonPhrase = receivedResult.CloseStatusDescription?? (receivedResult.CloseStatus ?? WebSocketCloseStatus.Empty).ToString(); + } + else if (receivedResult.MessageType != WebSocketMessageType.Text) + { + throw new NotImplementedException($"{nameof(AiWebsocketPipelineResponse)} currently supports only text messages."); + } + + var rawBytes = receivedBytes.ToArray(); + _contentStream.Position = _contentStream.Length; + _contentStream.Write(rawBytes, 0, rawBytes.Length); + _contentStream.Position = 0; + IsComplete = receivedResult.EndOfMessage; + } + + public override BinaryData BufferContent(CancellationToken cancellationToken = default) + { + return Content; + } + + public override ValueTask BufferContentAsync(CancellationToken cancellationToken = default) + { + return new ValueTask(Task.FromResult(Content)); + } + + public override void Dispose() + { + ContentStream?.Dispose(); + } + + private static int ConvertWebsocketCloseStatusToHttpStatus(WebSocketCloseStatus status) + { + int res; + + switch (status) + { + case WebSocketCloseStatus.Empty: + case WebSocketCloseStatus.NormalClosure: + res = (int)HttpStatusCode.OK; + break; + case WebSocketCloseStatus.EndpointUnavailable: + case WebSocketCloseStatus.ProtocolError: + case WebSocketCloseStatus.InvalidMessageType: + case WebSocketCloseStatus.InvalidPayloadData: + case WebSocketCloseStatus.PolicyViolation: + res = (int)HttpStatusCode.BadRequest; + break; + case WebSocketCloseStatus.MessageTooBig: + res = (int)HttpStatusCode.RequestEntityTooLarge; + break; + case WebSocketCloseStatus.MandatoryExtension: + res = 418; + break; + case WebSocketCloseStatus.InternalServerError: + res = (int)HttpStatusCode.InternalServerError; + break; + default: + res = (int)HttpStatusCode.InternalServerError; + break; + } + + return res; + } +} diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/Session/AsyncWebsocketDataCollectionResult.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/Session/AsyncWebsocketDataCollectionResult.cs new file mode 100644 index 000000000..38b46c905 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/Session/AsyncWebsocketDataCollectionResult.cs @@ -0,0 +1,38 @@ +using System.ClientModel; +using System.Net.WebSockets; + +namespace BotSharp.Plugin.OpenAI.Providers.Realtime.Session; + +public class AsyncWebsocketDataCollectionResult : AsyncCollectionResult +{ + private readonly WebSocket _webSocket; + private readonly CancellationToken _cancellationToken; + + public AsyncWebsocketDataCollectionResult( + WebSocket webSocket, + CancellationToken cancellationToken) + { + _webSocket = webSocket; + _cancellationToken = cancellationToken; + } + + public override ContinuationToken? GetContinuationToken(ClientResult page) + { + return null; + } + + public override async IAsyncEnumerable GetRawPagesAsync() + { + await using var enumerator = new AsyncWebsocketDataResultEnumerator(_webSocket, _cancellationToken); + while (await enumerator.MoveNextAsync().ConfigureAwait(false)) + { + yield return enumerator.Current; + } + } + + protected override async IAsyncEnumerable GetValuesFromPageAsync(ClientResult page) + { + await Task.CompletedTask; + yield return page; + } +} diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/Session/AsyncWebsocketDataResultEnumerator.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/Session/AsyncWebsocketDataResultEnumerator.cs new file mode 100644 index 000000000..1d0a3da2d --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/Session/AsyncWebsocketDataResultEnumerator.cs @@ -0,0 +1,52 @@ +using System; +using System.Buffers; +using System.ClientModel; +using System.Net.WebSockets; + +namespace BotSharp.Plugin.OpenAI.Providers.Realtime.Session; + +public class AsyncWebsocketDataResultEnumerator : IAsyncEnumerator +{ + private readonly WebSocket _webSocket; + private readonly CancellationToken _cancellationToken; + private readonly byte[] _buffer; + + public AsyncWebsocketDataResultEnumerator( + WebSocket webSocket, + CancellationToken cancellationToken) + { + _webSocket = webSocket; + _cancellationToken = cancellationToken; + _buffer = ArrayPool.Shared.Rent(1024 * 32); + } + + public ClientResult Current { get; private set; } + + public ValueTask DisposeAsync() + { + _webSocket?.Dispose(); + return new ValueTask(Task.CompletedTask); + } + + public async ValueTask MoveNextAsync() + { + var response = new AiWebsocketPipelineResponse(); + while (!response.IsComplete) + { + var receivedResult = await _webSocket.ReceiveAsync(new(_buffer), _cancellationToken); + + if (receivedResult.CloseStatus.HasValue) + { + Current = null; + return false; + } + + var receivedBytes = _buffer.AsMemory(0, receivedResult.Count); + var receivedData = BinaryData.FromBytes(receivedBytes); + response.CollectReceivedResult(receivedResult, receivedData); + } + + Current = ClientResult.FromResponse(response); + return true; + } +} diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/Session/RealtimeChatSession.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/Session/RealtimeChatSession.cs new file mode 100644 index 000000000..788c2a000 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/Session/RealtimeChatSession.cs @@ -0,0 +1,109 @@ +using BotSharp.Plugin.OpenAI.Models.Realtime; +using System.ClientModel; +using System.Net.WebSockets; +using System.Runtime.CompilerServices; + +namespace BotSharp.Plugin.OpenAI.Providers.Realtime.Session; + +public class RealtimeChatSession : IDisposable +{ + private readonly IServiceProvider _services; + private readonly BotSharpOptions _options; + + private ClientWebSocket _webSocket; + private readonly object _singleReceiveLock = new(); + private readonly SemaphoreSlim _clientEventSemaphore = new(initialCount: 1, maxCount: 1); + private AsyncWebsocketDataCollectionResult _receivedCollectionResult; + + public RealtimeChatSession( + IServiceProvider services, + BotSharpOptions options) + { + _services = services; + _options = options; + } + + public async Task ConnectAsync(string provider, string model, CancellationToken cancellationToken = default) + { + var settingsService = _services.GetRequiredService(); + var settings = settingsService.GetSetting(provider, model); + + _webSocket?.Dispose(); + _webSocket = new ClientWebSocket(); + _webSocket.Options.SetRequestHeader("Authorization", $"Bearer {settings.ApiKey}"); + _webSocket.Options.SetRequestHeader("OpenAI-Beta", "realtime=v1"); + + await _webSocket.ConnectAsync(new Uri($"wss://api.openai.com/v1/realtime?model={model}"), cancellationToken); + } + + public async IAsyncEnumerable ReceiveUpdatesAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) + { + await foreach (ClientResult result in ReceiveInnerUpdatesAsync()) + { + var update = HandleSessionResult(result); + yield return update; + } + } + + public async IAsyncEnumerable ReceiveInnerUpdatesAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) + { + lock (_singleReceiveLock) + { + _receivedCollectionResult ??= new(_webSocket, cancellationToken); + } + + await foreach (var result in _receivedCollectionResult) + { + yield return result; + } + } + + private SessionConversationUpdate HandleSessionResult(ClientResult result) + { + using var response = result.GetRawResponse(); + var bytes = response.Content.ToArray(); + var text = Encoding.UTF8.GetString(bytes, 0, bytes.Length); + return new SessionConversationUpdate + { + RawResponse = text + }; + } + + public async Task SendEventToModel(object message) + { + if (_webSocket.State != WebSocketState.Open) + { + return; + } + + await _clientEventSemaphore.WaitAsync().ConfigureAwait(false); + + try + { + if (message is not string data) + { + data = JsonSerializer.Serialize(message, _options.JsonSerializerOptions); + } + + var buffer = Encoding.UTF8.GetBytes(data); + await _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Text, true, CancellationToken.None); + } + finally + { + _clientEventSemaphore.Release(); + } + } + + public async Task Disconnect() + { + if (_webSocket.State == WebSocketState.Open) + { + await _webSocket.CloseAsync(WebSocketCloseStatus.Empty, null, CancellationToken.None); + } + } + + public void Dispose() + { + _webSocket?.Dispose(); + } +} diff --git a/src/WebStarter/appsettings.json b/src/WebStarter/appsettings.json index 69625129f..7ab6f7fd6 100644 --- a/src/WebStarter/appsettings.json +++ b/src/WebStarter/appsettings.json @@ -218,9 +218,9 @@ { "Id": "PizzaServer", "Name": "PizzaServer", - "TransportType": "sse", - "TransportOptions": [], - "Location": "http://localhost:58905/sse" + "SseConfig": { + "Endpoint": "http://localhost:58905/sse" + } } ] }, diff --git a/tests/BotSharp.PizzaBot.MCPServer/Properties/launchSettings.json b/tests/BotSharp.PizzaBot.MCPServer/Properties/launchSettings.json index 617e672ca..3333cfcd9 100644 --- a/tests/BotSharp.PizzaBot.MCPServer/Properties/launchSettings.json +++ b/tests/BotSharp.PizzaBot.MCPServer/Properties/launchSettings.json @@ -2,7 +2,7 @@ "profiles": { "BotSharp.PizzaBot.MCPServer": { "commandName": "Project", - "launchBrowser": true, + "launchBrowser": false, "environmentVariables": { "ASPNETCORE_ENVIRONMENT": "Development" }, diff --git a/tests/BotSharp.PizzaBot.MCPServer/Tools/MakePayment.cs b/tests/BotSharp.PizzaBot.MCPServer/Tools/MakePayment.cs index 5016f5bbf..5937a489b 100644 --- a/tests/BotSharp.PizzaBot.MCPServer/Tools/MakePayment.cs +++ b/tests/BotSharp.PizzaBot.MCPServer/Tools/MakePayment.cs @@ -1,3 +1,4 @@ +using ModelContextProtocol; using ModelContextProtocol.Server; using System.ComponentModel; using System.ComponentModel.DataAnnotations; @@ -14,11 +15,11 @@ public static string Make_Payment( { if (order_number is null) { - throw new McpServerException("Missing required argument 'order_number'"); + throw new McpException("Missing required argument 'order_number'"); } if (order_number is null) { - throw new McpServerException("Missing required argument 'total_amount'"); + throw new McpException("Missing required argument 'total_amount'"); } return "Payment proceed successfully. Thank you for your business. Have a great day!"; } diff --git a/tests/BotSharp.PizzaBot.MCPServer/Tools/PizzaPrices.cs b/tests/BotSharp.PizzaBot.MCPServer/Tools/PizzaPrices.cs index e8aa677c3..326f217d2 100644 --- a/tests/BotSharp.PizzaBot.MCPServer/Tools/PizzaPrices.cs +++ b/tests/BotSharp.PizzaBot.MCPServer/Tools/PizzaPrices.cs @@ -1,3 +1,4 @@ +using ModelContextProtocol; using ModelContextProtocol.Server; using System.ComponentModel; using System.ComponentModel.DataAnnotations; @@ -16,11 +17,11 @@ public static string GetPizzaPrices( { if (pizza_type is null) { - throw new McpServerException("Missing required argument 'pizza_type'"); + throw new McpException("Missing required argument 'pizza_type'"); } if (quantity <= 0) { - throw new McpServerException("Missing required argument 'quantity'"); + throw new McpException("Missing required argument 'quantity'"); } double unit_price = 0; if (pizza_type.ToString() == "Pepperoni Pizza") diff --git a/tests/BotSharp.PizzaBot.MCPServer/Tools/PlaceOrder.cs b/tests/BotSharp.PizzaBot.MCPServer/Tools/PlaceOrder.cs index d6b82bd76..541c992c7 100644 --- a/tests/BotSharp.PizzaBot.MCPServer/Tools/PlaceOrder.cs +++ b/tests/BotSharp.PizzaBot.MCPServer/Tools/PlaceOrder.cs @@ -1,3 +1,4 @@ +using ModelContextProtocol; using ModelContextProtocol.Server; using System.ComponentModel; using System.ComponentModel.DataAnnotations; @@ -15,15 +16,15 @@ public static string PlaceAnOrder( { if (pizza_type is null) { - throw new McpServerException("Missing required argument 'pizza_type'"); + throw new McpException("Missing required argument 'pizza_type'"); } if (quantity <= 0) { - throw new McpServerException("Missing required argument 'quantity'"); + throw new McpException("Missing required argument 'quantity'"); } if (unit_price < 0) { - throw new McpServerException("Missing required argument 'unit_price'"); + throw new McpException("Missing required argument 'unit_price'"); } return "The order number is P123-01: {order_number = \"P123-01\" }"; diff --git a/tests/BotSharp.Test.RealtimeVoice/Program.cs b/tests/BotSharp.Test.RealtimeVoice/Program.cs index bb6285123..1e6c94f5d 100644 --- a/tests/BotSharp.Test.RealtimeVoice/Program.cs +++ b/tests/BotSharp.Test.RealtimeVoice/Program.cs @@ -49,13 +49,20 @@ conn.OnModelUserInterrupted = () => JsonSerializer.Serialize(new { - @event = "clear" + @event = "interrupted" }); +conn.OnUserSpeechDetected = () => + JsonSerializer.Serialize(new + { + @event = "speech_detected" + }); + + await hub.ConnectToModel(async data => { var response = JsonSerializer.Deserialize(data); - if (response.Event == "clear") + if (response.Event == "speech_detected") { channel.ClearBuffer(); } @@ -81,6 +88,7 @@ await hub.ConnectToModel(async data => DisplayAudioLevel(audioLevel); } while (result.Status == StreamChannelStatus.Open); + int CalculateAudioLevel(byte[] buffer, int bytesRecorded) { // Simple audio level calculation (RMS) @@ -123,5 +131,5 @@ void DisplayAudioLevel(int level) // Display audio level as a bar Console.Write("\rMicrophone: ["); Console.Write(new string('#', displayLevel).PadRight(sep, ' ')); - Console.Write("]"); + Console.Write("]\r"); } diff --git a/tests/BotSharp.Test.RealtimeVoice/appsettings.json b/tests/BotSharp.Test.RealtimeVoice/appsettings.json index d8c2730f6..e0ffcb8c4 100644 --- a/tests/BotSharp.Test.RealtimeVoice/appsettings.json +++ b/tests/BotSharp.Test.RealtimeVoice/appsettings.json @@ -48,11 +48,9 @@ }, "RealtimeModel": { - "Provider": "google-ai", - "Model": "gemini-2.0-flash-live-001", "InputAudioFormat": "pcm16", "OutputAudioFormat": "pcm16", - "InterruptResponse": false, + "MaxResponseOutputTokens": 4096 }, "PluginLoader": {