Skip to content

Commit

Permalink
Merge branch 'main' into rysweet-ensure-helloagent-in-test-uses-grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
rysweet authored Feb 11, 2025
2 parents b41a6d7 + 392aa14 commit 3b967ed
Show file tree
Hide file tree
Showing 25 changed files with 1,219 additions and 396 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
[
# For main use the workflow target
{ ref: "${{github.ref}}", dest-dir: dev, uv-version: "0.5.13", sphinx-release-override: "dev" },
{ ref: "python-v0.4.5", dest-dir: stable, uv-version: "0.5.13", sphinx-release-override: "stable" },
{ ref: "python-v0.4.6", dest-dir: stable, uv-version: "0.5.13", sphinx-release-override: "stable" },
{ ref: "v0.4.0.dev0", dest-dir: "0.4.0.dev0", uv-version: "0.5.11", sphinx-release-override: "" },
{ ref: "v0.4.0.dev1", dest-dir: "0.4.0.dev1", uv-version: "0.5.11", sphinx-release-override: "" },
{ ref: "v0.4.0.dev2", dest-dir: "0.4.0.dev2", uv-version: "0.5.11", sphinx-release-override: "" },
Expand All @@ -54,6 +54,7 @@ jobs:
{ ref: "v0.4.3", dest-dir: "0.4.3", uv-version: "0.5.13", sphinx-release-override: "" },
{ ref: "v0.4.4", dest-dir: "0.4.4", uv-version: "0.5.13", sphinx-release-override: "" },
{ ref: "python-v0.4.5", dest-dir: "0.4.5", uv-version: "0.5.13", sphinx-release-override: "" },
{ ref: "python-v0.4.6", dest-dir: "0.4.6", uv-version: "0.5.13", sphinx-release-override: "" },
]
steps:
- name: Checkout
Expand Down
2 changes: 1 addition & 1 deletion docs/switcher.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[
{
"name": "0.4.5 (stable)",
"name": "0.4.6 (stable)",
"version": "stable",
"url": "/autogen/stable/",
"preferred": true
Expand Down
54 changes: 42 additions & 12 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,33 @@ private async ValueTask HandlePublish(CloudEvent evt, CancellationToken cancella
}
}

public ValueTask StartAsync(CancellationToken cancellationToken)
public async ValueTask StartAsync(CancellationToken cancellationToken)
{
return this._messageRouter.StartAsync(cancellationToken);
await this._messageRouter.StartAsync(cancellationToken);
if (this._agentsContainer.RegisteredAgentTypes.Count > 0)
{
foreach (var type in this._agentsContainer.RegisteredAgentTypes)
{
await this._client.RegisterAgentAsync(new RegisterAgentTypeRequest
{
Type = type
}, this.CallOptions);
}
}

if (this._agentsContainer.Subscriptions.Count > 0)
{
foreach (var subscription in this._agentsContainer.Subscriptions.Values)
{
await this._client.AddSubscriptionAsync(new AddSubscriptionRequest
{
Subscription = subscription.ToProtobuf()
}, this.CallOptions);
}
}
}

Task IHostedService.StartAsync(CancellationToken cancellationToken) => this._messageRouter.StartAsync(cancellationToken).AsTask();
Task IHostedService.StartAsync(CancellationToken cancellationToken) => this.StartAsync(cancellationToken).AsTask();

public Task StopAsync(CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -344,30 +365,39 @@ public async ValueTask AddSubscriptionAsync(ISubscriptionDefinition subscription
{
this._agentsContainer.AddSubscription(subscription);

var _ = await this._client.AddSubscriptionAsync(new AddSubscriptionRequest
if (this._messageRouter.IsChannelOpen)
{
Subscription = subscription.ToProtobuf()
}, this.CallOptions);
var _ = await this._client.AddSubscriptionAsync(new AddSubscriptionRequest
{
Subscription = subscription.ToProtobuf()
}, this.CallOptions);
}
}

public async ValueTask RemoveSubscriptionAsync(string subscriptionId)
{
this._agentsContainer.RemoveSubscriptionAsync(subscriptionId);

await this._client.RemoveSubscriptionAsync(new RemoveSubscriptionRequest
if (this._messageRouter.IsChannelOpen)
{
Id = subscriptionId
}, this.CallOptions);
await this._client.RemoveSubscriptionAsync(new RemoveSubscriptionRequest
{
Id = subscriptionId
}, this.CallOptions);
}
}

public async ValueTask<AgentType> RegisterAgentFactoryAsync(AgentType type, Func<Contracts.AgentId, IAgentRuntime, ValueTask<IHostableAgent>> factoryFunc)
{
this._agentsContainer.RegisterAgentFactory(type, factoryFunc);

await this._client.RegisterAgentAsync(new RegisterAgentTypeRequest
if (this._messageRouter.IsChannelOpen)
{
Type = type,
}, this.CallOptions);
await this._client.RegisterAgentAsync(new RegisterAgentTypeRequest
{
Type = type,
}, this.CallOptions);
}

return type;
}
Expand Down
4 changes: 4 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcMessageRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public AutoRestartChannel(AgentRpc.AgentRpcClient client,
_shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(shutdownCancellation);
}

public bool Connected { get => _channel is not null; }

public void EnsureConnected()
{
_logger.LogInformation("Connecting to gRPC endpoint " + Environment.GetEnvironmentVariable("AGENT_HOST"));
Expand Down Expand Up @@ -287,6 +289,8 @@ public async Task StopAsync()
this._incomingMessageChannel.Dispose();
}

public bool IsChannelOpen => this._incomingMessageChannel.Connected;

public void Dispose()
{
_outboundMessagesChannel.Writer.TryComplete();
Expand Down
10 changes: 5 additions & 5 deletions dotnet/test/Microsoft.AutoGen.Core.Grpc.Tests/AgentGrpcTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
namespace Microsoft.AutoGen.Core.Grpc.Tests;

[Trait("Category", "GRPC")]
public class AgentGrpcTests
public class AgentGrpcTests : TestBase
{
[Fact]
public async Task AgentShouldNotReceiveMessagesWhenNotSubscribedTest()
{
var fixture = new GrpcAgentRuntimeFixture();
var runtime = (GrpcAgentRuntime)await fixture.Start();
var runtime = (GrpcAgentRuntime)await fixture.StartAsync();

Logger<BaseAgent> logger = new(new LoggerFactory());
TestProtobufAgent agent = null!;
Expand Down Expand Up @@ -45,7 +45,7 @@ await runtime.RegisterAgentFactoryAsync("MyAgent", async (id, runtime) =>
public async Task AgentShouldReceiveMessagesWhenSubscribedTest()
{
var fixture = new GrpcAgentRuntimeFixture();
var runtime = (GrpcAgentRuntime)await fixture.Start();
var runtime = (GrpcAgentRuntime)await fixture.StartAsync();

Logger<BaseAgent> logger = new(new LoggerFactory());
SubscribedProtobufAgent agent = null!;
Expand Down Expand Up @@ -80,7 +80,7 @@ public async Task SendMessageAsyncShouldReturnResponseTest()
{
// Arrange
var fixture = new GrpcAgentRuntimeFixture();
var runtime = (GrpcAgentRuntime)await fixture.Start();
var runtime = (GrpcAgentRuntime)await fixture.StartAsync();

Logger<BaseAgent> logger = new(new LoggerFactory());
await runtime.RegisterAgentFactoryAsync("MyAgent", async (id, runtime) => await ValueTask.FromResult(new TestProtobufAgent(id, runtime, logger)));
Expand Down Expand Up @@ -114,7 +114,7 @@ public ValueTask HandleAsync(TextMessage item, MessageContext messageContext)
public async Task SubscribeAsyncRemoveSubscriptionAsyncAndGetSubscriptionsTest()
{
var fixture = new GrpcAgentRuntimeFixture();
var runtime = (GrpcAgentRuntime)await fixture.Start();
var runtime = (GrpcAgentRuntime)await fixture.StartAsync();
ReceiverAgent? agent = null;
await runtime.RegisterAgentFactoryAsync("MyAgent", async (id, runtime) =>
{
Expand Down
67 changes: 67 additions & 0 deletions dotnet/test/Microsoft.AutoGen.Core.Grpc.Tests/FreePortManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// FreePortManager.cs

using System.Diagnostics;

namespace Microsoft.AutoGen.Core.Grpc.Tests;

internal sealed class FreePortManager
{
private HashSet<int> takenPorts = new();
private readonly object mutex = new();

[DebuggerDisplay($"{{{nameof(Port)}}}")]
internal sealed class PortTicket(FreePortManager portManager, int port) : IDisposable
{
private FreePortManager? portManager = portManager;

public int Port { get; } = port;

public void Dispose()
{
FreePortManager? localPortManager = Interlocked.Exchange(ref this.portManager, null);
localPortManager?.takenPorts.Remove(this.Port);
}

public override string ToString()
{
return this.Port.ToString();
}

public override bool Equals(object? obj)
{
return obj is PortTicket ticket && ticket.Port == this.Port;
}

public override int GetHashCode()
{
return this.Port.GetHashCode();
}

public static implicit operator int(PortTicket ticket) => ticket.Port;
public static implicit operator string(PortTicket ticket) => ticket.ToString();
}

public PortTicket GetAvailablePort()
{
lock (mutex)
{
int port;
do
{
using var listener = new System.Net.Sockets.TcpListener(System.Net.IPAddress.Loopback, 0);
listener.Start();
port = ((System.Net.IPEndPoint)listener.LocalEndpoint).Port;
listener.Stop();
listener.Dispose();
Thread.Yield(); // Let the listener actually shut down before we try to use the port
} while (takenPorts.Contains(port));

takenPorts.Add(port);

Console.WriteLine($"FreePortManager: Yielding port {port}");
Debug.WriteLine($"FreePortManager: Yielding port {port}");
return new PortTicket(this, port);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,69 +7,89 @@
using Microsoft.Extensions.Hosting;

namespace Microsoft.AutoGen.Core.Grpc.Tests;

/// <summary>
/// Fixture for setting up the gRPC agent runtime for testing.
/// </summary>
public sealed class GrpcAgentRuntimeFixture : IDisposable
{
private FreePortManager.PortTicket? portTicket;

/// the gRPC agent runtime.
public AgentsApp? Client { get; private set; }
public AgentsApp? AgentsApp { get; private set; }

/// mock server for testing.
public WebApplication? Server { get; private set; }
public WebApplication? GatewayServer { get; private set; }

public GrpcAgentServiceCollector GrpcRequestCollector { get; }

public GrpcAgentRuntimeFixture()
{
GrpcRequestCollector = new GrpcAgentServiceCollector();
}

/// <summary>
/// Start - gets a new port and starts fresh instances
/// </summary>
public async Task<IAgentRuntime> Start(bool initialize = true)
public async Task<IAgentRuntime> StartAsync(bool startRuntime = true, bool registerDefaultAgent = true)
{
int port = GetAvailablePort(); // Get a new port per test run
this.portTicket = GrpcAgentRuntimeFixture.PortManager.GetAvailablePort(); // Get a new port per test run

// Update environment variables so each test runs independently
Environment.SetEnvironmentVariable("ASPNETCORE_HTTPS_PORTS", port.ToString());
Environment.SetEnvironmentVariable("AGENT_HOST", $"https://localhost:{port}");
Environment.SetEnvironmentVariable("ASPNETCORE_HTTPS_PORTS", portTicket);
Environment.SetEnvironmentVariable("AGENT_HOST", $"https://localhost:{portTicket}");
Environment.SetEnvironmentVariable("ASPNETCORE_ENVIRONMENT", "Development");
Server = ServerBuilder().Result;
await Server.StartAsync().ConfigureAwait(true);
Client = ClientBuilder().Result;
await Client.StartAsync().ConfigureAwait(true);

var worker = Client.Services.GetRequiredService<IAgentRuntime>();
this.GatewayServer = await this.InitializeGateway();
this.AgentsApp = await this.InitializeRuntime(startRuntime, registerDefaultAgent);
var runtime = AgentsApp.Services.GetRequiredService<IAgentRuntime>();

return (worker);
return runtime;
}
private static async Task<AgentsApp> ClientBuilder()

private async Task<AgentsApp> InitializeRuntime(bool callStartAsync, bool registerDefaultAgent)
{
var appBuilder = new AgentsAppBuilder();
appBuilder.AddGrpcAgentWorker();
appBuilder.AddAgent<TestProtobufAgent>("TestAgent");
return await appBuilder.BuildAsync();

if (registerDefaultAgent)
{
appBuilder.AddAgent<TestProtobufAgent>("TestAgent");
}

AgentsApp result = await appBuilder.BuildAsync();

if (callStartAsync)
{
await result.StartAsync().ConfigureAwait(true);
}

return result;
}
private static async Task<WebApplication> ServerBuilder()

private async Task<WebApplication> InitializeGateway()
{
var builder = WebApplication.CreateBuilder();
builder.Services.AddGrpc();
var app = builder.Build();
builder.Services.AddSingleton(this.GrpcRequestCollector);

WebApplication app = builder.Build();
app.MapGrpcService<GrpcAgentServiceFixture>();

await app.StartAsync().ConfigureAwait(true);
return app;
}
private static int GetAvailablePort()
{
using var listener = new System.Net.Sockets.TcpListener(System.Net.IPAddress.Loopback, 0);
listener.Start();
int port = ((System.Net.IPEndPoint)listener.LocalEndpoint).Port;
listener.Stop();
return port;
}

private static readonly FreePortManager PortManager = new();

/// <summary>
/// Stop - stops the agent and ensures cleanup
/// </summary>
public void Stop()
{
(Client as IHost)?.StopAsync(TimeSpan.FromSeconds(30)).GetAwaiter().GetResult();
Server?.StopAsync().GetAwaiter().GetResult();
(AgentsApp as IHost)?.StopAsync(TimeSpan.FromSeconds(30)).GetAwaiter().GetResult();
GatewayServer?.StopAsync().GetAwaiter().GetResult();
portTicket?.Dispose();
}

/// <summary>
Expand Down
Loading

0 comments on commit 3b967ed

Please sign in to comment.