Skip to content

Commit

Permalink
Rysweet 5207 net runtime interface to match python add registration t…
Browse files Browse the repository at this point in the history
…o interface and inmemoryruntime (microsoft#5215)

* add Registry abstractions and Registry Storage to Core/Contracts
* brings Grpc in line with these abstractions
* add registeragenttype to in memory runtime. Note it's not necessary to
call this because we register all the agents with reflection unless you
tell the runtime not to.....

## Why are these changes needed?

Bringing the .NET more in line with the python

## Related issue number

close microsoft#5207 

## Checks

- [] I've included any doc changes needed for
https://microsoft.github.io/autogen/. See
https://microsoft.github.io/autogen/docs/Contribute#documentation to
build and test documentation locally.
- [x ] I've added tests (if relevant) corresponding to the changes
introduced in this PR.
- [x] I've made sure all auto checks have passed.

---------

Co-authored-by: Copilot <[email protected]>
  • Loading branch information
rysweet and Copilot authored Jan 27, 2025
1 parent e582072 commit 0aed066
Show file tree
Hide file tree
Showing 27 changed files with 1,003 additions and 272 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,7 @@ samples/apps/autogen-studio/autogenstudio/models/test/
notebook/coding

# dotnet artifacts
artifacts
artifacts

# project data
registry.json
4 changes: 4 additions & 0 deletions dotnet/.editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -701,3 +701,7 @@ generated_code = true

# IDE1591 Missing XML comment for publicly visible type or member
dotnet_diagnostic.CS1591.severity = none

[I*.cs]
# dont warn on missing accessibility modifiers for interfaces
dotnet_diagnostic.IDE0040.severity = none
15 changes: 15 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Contracts/AgentsRegistryState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// AgentsRegistryState.cs
using System.Collections.Concurrent;

namespace Microsoft.AutoGen.Contracts;
public class AgentsRegistryState
{
public ConcurrentDictionary<string, HashSet<string>> AgentsToEventsMap { get; set; } = new ConcurrentDictionary<string, HashSet<string>>();
public ConcurrentDictionary<string, HashSet<string>> AgentsToTopicsMap { get; set; } = [];
public ConcurrentDictionary<string, HashSet<string>> TopicToAgentTypesMap { get; set; } = [];
public ConcurrentDictionary<string, HashSet<string>> EventsToAgentTypesMap { get; set; } = [];
public ConcurrentDictionary<string, HashSet<Subscription>> GuidSubscriptionsMap { get; set; } = [];
public ConcurrentDictionary<string, AgentId> AgentTypes { get; set; } = [];
public string Etag { get; set; } = Guid.NewGuid().ToString();
}
24 changes: 24 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Contracts/IAgent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IAgent.cs

using Google.Protobuf;

namespace Microsoft.AutoGen.Contracts;

public interface IAgent
{
AgentId AgentId { get; }
IAgentRuntime Worker { get; }
ValueTask<List<Subscription>> GetSubscriptionsAsync();
ValueTask<AddSubscriptionResponse> SubscribeAsync(string topic);
ValueTask<RemoveSubscriptionResponse> UnsubscribeAsync(Guid id);
ValueTask<RemoveSubscriptionResponse> UnsubscribeAsync(string topic);
Task StoreAsync(AgentState state, CancellationToken cancellationToken = default);
Task<T> ReadAsync<T>(AgentId agentId, CancellationToken cancellationToken = default) where T : IMessage, new();
ValueTask PublishMessageAsync(IMessage message, string topic, string source, string key, CancellationToken token = default);
ValueTask PublishMessageAsync<T>(T message, string topic, string source, CancellationToken token = default) where T : IMessage;
ValueTask PublishMessageAsync<T>(T message, string topic, CancellationToken token = default) where T : IMessage;
ValueTask PublishMessageAsync<T>(T message, CancellationToken token = default) where T : IMessage;
Task<RpcResponse> HandleRequestAsync(RpcRequest request);
Task HandleObjectAsync(object item, CancellationToken cancellationToken = default);
}
93 changes: 93 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Contracts/IAgentRuntime.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IAgentRuntime.cs

using Google.Protobuf;
namespace Microsoft.AutoGen.Contracts;

/// <summary>
/// Defines the common surface for agent runtime implementations.
/// </summary>
public interface IAgentRuntime
{
/// <summary>
/// Gets the dependency injection service provider for the runtime.
/// </summary>
IServiceProvider RuntimeServiceProvider { get; }

/// <summary>
/// Registers a new agent type asynchronously.
/// </summary>
/// <param name="request">The request containing the agent type details.</param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, CancellationToken cancellationToken = default);

/// <summary>
/// to be removed in favor of send_message
/// Sends a request to and agent.
/// </summary>
/// <param name="agent">The agent sending the request.</param>
/// <param name="request">The request to be sent.</param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
ValueTask RuntimeSendRequestAsync(IAgent agent, RpcRequest request, CancellationToken cancellationToken = default);

/// <summary>
/// Sends a response to the above request.
/// /// to be removed in favor of send_message
/// </summary>
/// <param name="response">The response to be sent.</param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
ValueTask RuntimeSendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes a message to a topic.
/// </summary>
/// <param name="message">The message to be published.</param>
/// <param name="topic">The topic to publish the message to.</param>
/// <param name="sender">The agent sending the message.</param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
ValueTask PublishMessageAsync(IMessage message, TopicId topic, IAgent? sender, CancellationToken? cancellationToken = default);

/// <summary>
/// Saves the state of an agent asynchronously.
/// </summary>
/// <param name="value">The state to be saved.</param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
ValueTask SaveStateAsync(AgentState value, CancellationToken cancellationToken = default);

/// <summary>
/// Loads the state of an agent asynchronously.
/// </summary>
/// <param name="agentId">The ID of the agent whose state is to be loaded.</param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>A task that represents the asynchronous operation, containing the agent state.</returns>
ValueTask<AgentState> LoadStateAsync(AgentId agentId, CancellationToken cancellationToken = default);

/// <summary>
/// Adds a subscription to a topic.
/// </summary>
/// <param name="request">The request containing the subscription types.</param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>A task that represents the asynchronous operation, containing the response.</returns>
ValueTask<AddSubscriptionResponse> AddSubscriptionAsync(AddSubscriptionRequest request, CancellationToken cancellationToken = default);

/// <summary>
/// Removes a subscription.
/// </summary>
/// <param name="request">The request containing the subscription id.</param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>A task that represents the asynchronous operation, containing the response.</returns>
ValueTask<RemoveSubscriptionResponse> RemoveSubscriptionAsync(RemoveSubscriptionRequest request, CancellationToken cancellationToken = default);

/// <summary>
/// Gets the list of subscriptions.
/// </summary>
/// <param name="request">The request containing the subscription query details.</param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>A task that represents the asynchronous operation, containing the list of subscriptions.</returns>
ValueTask<List<Subscription>> GetSubscriptionsAsync(GetSubscriptionsRequest request, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
@@ -1,84 +1,55 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IRegistry.cs
namespace Microsoft.AutoGen.Contracts;

using Microsoft.AutoGen.Contracts;

namespace Microsoft.AutoGen.Runtime.Grpc.Abstractions;

/// <summary>
/// Interface for managing agent registration, placement, and subscriptions.
/// </summary>
public interface IRegistry
{
/// <summary>
/// Gets or places an agent based on the provided agent ID.
/// </summary>
/// <param name="agentId">The ID of the agent.</param>
/// <returns>A tuple containing the worker and a boolean indicating if it's a new placement.</returns>
ValueTask<(IGateway? Worker, bool NewPlacement)> GetOrPlaceAgent(AgentId agentId);

/// <summary>
/// Removes a worker from the registry.
/// </summary>
/// <param name="worker">The worker to remove.</param>
/// <returns>A task representing the asynchronous operation.</returns>
ValueTask RemoveWorker(IGateway worker);

//AgentsRegistryState State { get; set; }
/// <summary>
/// Registers a new agent type with the specified worker.
/// </summary>
/// <param name="request">The request containing agent type details.</param>
/// <param name="worker">The worker to register the agent type with.</param>
/// <returns>A task representing the asynchronous operation.</returns>
ValueTask RegisterAgentType(RegisterAgentTypeRequest request, IGateway worker);

/// <summary>
/// Adds a new worker to the registry.
/// </summary>
/// <param name="worker">The worker to add.</param>
/// <returns>A task representing the asynchronous operation.</returns>
ValueTask AddWorker(IGateway worker);
/// <remarks>removing CancellationToken from here as it is not compatible with Orleans Serialization</remarks>
ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, IAgentRuntime worker);

/// <summary>
/// Unregisters an agent type from the specified worker.
/// </summary>
/// <param name="type">The type of the agent to unregister.</param>
/// <param name="worker">The worker to unregister the agent type from.</param>
/// <returns>A task representing the asynchronous operation.</returns>
ValueTask UnregisterAgentType(string type, IGateway worker);

/// <summary>
/// Gets a compatible worker for the specified agent type.
/// </summary>
/// <param name="type">The type of the agent.</param>
/// <returns>A task representing the asynchronous operation, with the compatible worker as the result.</returns>
ValueTask<IGateway?> GetCompatibleWorker(string type);
/// <remarks>removing CancellationToken from here as it is not compatible with Orleans Serialization</remarks>
ValueTask UnregisterAgentTypeAsync(string type, IAgentRuntime worker);

/// <summary>
/// Gets a list of agents subscribed to and handling the specified topic and event type.
/// </summary>
/// <param name="topic">The topic to check subscriptions for.</param>
/// <param name="eventType">The event type to check subscriptions for.</param>
/// <returns>A task representing the asynchronous operation, with the list of agent IDs as the result.</returns>
ValueTask<List<string>> GetSubscribedAndHandlingAgents(string topic, string eventType);
ValueTask<List<string>> GetSubscribedAndHandlingAgentsAsync(string topic, string eventType);

/// <summary>
/// Subscribes an agent to a topic.
/// </summary>
/// <param name="request">The subscription request.</param>
/// <returns>A task representing the asynchronous operation.</returns>
/// <remarks>removing CancellationToken from here as it is not compatible with Orleans Serialization</remarks>
ValueTask SubscribeAsync(AddSubscriptionRequest request);

/// <summary>
/// Unsubscribes an agent from a topic.
/// </summary>
/// <param name="request">The unsubscription request.</param>
/// <returns>A task representing the asynchronous operation.</returns>
/// <remarks>removing CancellationToken from here as it is not compatible with Orleans Serialization</remarks>
ValueTask UnsubscribeAsync(RemoveSubscriptionRequest request); // TODO: This should have its own request type.

/// <summary>
/// Gets the subscriptions for a specified agent type.
/// </summary>
/// <returns>A task representing the asynchronous operation, with the subscriptions as the result.</returns>
ValueTask<List<Subscription>> GetSubscriptions(GetSubscriptionsRequest request);
ValueTask<List<Subscription>> GetSubscriptionsAsync(GetSubscriptionsRequest request);
}
20 changes: 20 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Contracts/IRegistryStorage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IRegistryStorage.cs

namespace Microsoft.AutoGen.Contracts;

public interface IRegistryStorage
{
/// <summary>
/// Populates the Registry state from the storage.
/// </summary>
/// <param name="cancellationToken"></param>
Task<AgentsRegistryState> ReadStateAsync(CancellationToken cancellationToken = default);
/// <summary>
/// Writes the Registry state to the storage.
/// </summary>
/// <param name="state"></param>
/// <param name="cancellationToken"></param>
/// <returns>the etag that was written</returns>
ValueTask<string> WriteStateAsync(AgentsRegistryState state, CancellationToken cancellationToken = default);
}
6 changes: 2 additions & 4 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -295,19 +295,17 @@ await WriteChannelAsync(new Message
{
await WriteChannelAsync(new Message { Response = response }, cancellationToken).ConfigureAwait(false);
}

public new async ValueTask RuntimeSendRequestAsync(Agent agent, RpcRequest request, CancellationToken cancellationToken = default)
public new async ValueTask RuntimeSendRequestAsync(IAgent agent, RpcRequest request, CancellationToken cancellationToken = default)
{
var requestId = Guid.NewGuid().ToString();
_pendingRequests[requestId] = (agent, request.RequestId);
_pendingRequests[requestId] = ((Agent)agent, request.RequestId);
request.RequestId = requestId;
await WriteChannelAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false);
}
public new async ValueTask RuntimeWriteMessage(Message message, CancellationToken cancellationToken = default)
{
await WriteChannelAsync(message, cancellationToken).ConfigureAwait(false);
}

public async ValueTask RuntimePublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default)
{
await WriteChannelAsync(new Message { CloudEvent = @event }, cancellationToken).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public static IHostApplicationBuilder AddGrpcAgentWorker(this IHostApplicationBu
});
var assemblies = AppDomain.CurrentDomain.GetAssemblies();
builder.Services.TryAddSingleton(DistributedContextPropagator.Current);
builder.Services.AddSingleton<IRegistryStorage, RegistryStorage>();
builder.Services.AddSingleton<IRegistry, Registry>();
builder.Services.AddSingleton<IAgentRuntime, GrpcAgentRuntime>();
builder.Services.AddSingleton<IHostedService>(sp => (IHostedService)sp.GetRequiredService<IAgentRuntime>());
builder.Services.AddKeyedSingleton("AgentsMetadata", (sp, key) =>
Expand Down
2 changes: 1 addition & 1 deletion dotnet/src/Microsoft.AutoGen/Core/Agent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Microsoft.AutoGen.Core;
/// <summary>
/// Represents the base class for an agent in the AutoGen system.
/// </summary>
public abstract class Agent
public abstract class Agent : IAgent
{
private readonly object _lock = new();
private readonly ConcurrentDictionary<string, TaskCompletionSource<RpcResponse>> _pendingRequests = [];
Expand Down
17 changes: 12 additions & 5 deletions dotnet/src/Microsoft.AutoGen/Core/AgentRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
namespace Microsoft.AutoGen.Core;

/// <summary>
/// Represents a worker that manages agents and handles messages.
/// An InMemory single-process implementation of <see cref="IAgentRuntime"/>.
/// </summary>
/// <remarks>
/// Initializes a new instance of the <see cref="AgentRuntime"/> class.
/// Responsible for message routing and delivery.
/// </remarks>
/// <param name="hostApplicationLifetime">The application lifetime.</param>
/// <param name="serviceProvider">The service provider.</param>
Expand All @@ -29,6 +29,13 @@ public class AgentRuntime(
private readonly ConcurrentDictionary<string, List<Subscription>> _subscriptionsByAgentType = new();
private readonly ConcurrentDictionary<string, List<string>> _subscriptionsByTopic = new();
private readonly ConcurrentDictionary<Guid, IDictionary<string, string>> _subscriptionsByGuid = new();
private readonly IRegistry _registry = serviceProvider.GetRequiredService<IRegistry>();

/// <inheritdoc />
public override async ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, CancellationToken cancellationToken = default)
{
await _registry.RegisterAgentTypeAsync(request, this);
}

/// <inheritdoc />
public override ValueTask SaveStateAsync(AgentState value, CancellationToken cancellationToken = default)
Expand All @@ -52,16 +59,16 @@ public override ValueTask<AgentState> LoadStateAsync(AgentId agentId, Cancellati
}
}
/// <inheritdoc />
public new async ValueTask RuntimeSendRequestAsync(Agent agent, RpcRequest request, CancellationToken cancellationToken = default)
public override async ValueTask RuntimeSendRequestAsync(IAgent agent, RpcRequest request, CancellationToken cancellationToken = default)
{
var requestId = Guid.NewGuid().ToString();
_pendingClientRequests[requestId] = (agent, request.RequestId);
_pendingClientRequests[requestId] = ((Agent)agent, request.RequestId);
request.RequestId = requestId;
await _mailbox.Writer.WriteAsync(request, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc />
public new ValueTask RuntimeSendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default)
public override ValueTask RuntimeSendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default)
{
return _mailbox.Writer.WriteAsync(new Message { Response = response }, cancellationToken);
}
Expand Down
16 changes: 5 additions & 11 deletions dotnet/src/Microsoft.AutoGen/Core/AgentRuntimeBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,12 @@ public async Task RunMessagePump()
}
}
}
public async ValueTask PublishMessageAsync(IMessage message, TopicId topic, Agent? sender, CancellationToken? cancellationToken = default)
public abstract ValueTask RegisterAgentTypeAsync(RegisterAgentTypeRequest request, CancellationToken cancellationToken = default);
public async ValueTask PublishMessageAsync(IMessage message, TopicId topic, IAgent? sender, CancellationToken? cancellationToken = default)
{
var topicString = topic.Type + "." + topic.Source;
sender ??= RuntimeServiceProvider.GetRequiredService<Client>();
await PublishEventAsync(message.ToCloudEvent(key: sender.GetType().Name, topic: topicString), sender, cancellationToken.GetValueOrDefault()).ConfigureAwait(false);
await PublishEventAsync(message.ToCloudEvent(key: sender.GetType().Name, topic: topicString), (Agent)sender, cancellationToken.GetValueOrDefault()).ConfigureAwait(false);
}
public abstract ValueTask SaveStateAsync(AgentState value, CancellationToken cancellationToken = default);
public abstract ValueTask<AgentState> LoadStateAsync(AgentId agentId, CancellationToken cancellationToken = default);
Expand Down Expand Up @@ -202,14 +203,7 @@ private async ValueTask DispatchEventsToAgentsAsync(CloudEvent cloudEvent, Cance
}
await Task.WhenAll(taskList).ConfigureAwait(false);
}
public abstract ValueTask RuntimeSendRequestAsync(IAgent agent, RpcRequest request, CancellationToken cancellationToken = default);

public ValueTask RuntimeSendRequestAsync(Agent agent, RpcRequest request, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}

public ValueTask RuntimeSendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
public abstract ValueTask RuntimeSendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default);
}
Loading

0 comments on commit 0aed066

Please sign in to comment.