Skip to content

Commit

Permalink
Dotnet Grpc worker implementation (#5245)
Browse files Browse the repository at this point in the history
Co-authored-by: Jacob Alber <[email protected]>
Co-authored-by: Ryan Sweet <[email protected]>
  • Loading branch information
3 people authored Feb 5, 2025
1 parent 9030f75 commit 08f9830
Show file tree
Hide file tree
Showing 35 changed files with 1,861 additions and 269 deletions.
34 changes: 34 additions & 0 deletions .github/workflows/dotnet-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,38 @@ jobs:
- name: Unit Test V2
run: dotnet test --no-build -bl --configuration Release --filter "Category=UnitV2"

grpc-unit-tests:
name: Dotnet Grpc unit tests
needs: paths-filter
if: needs.paths-filter.outputs.hasChanges == 'true'
defaults:
run:
working-directory: dotnet
strategy:
fail-fast: false
matrix:
os: [ ubuntu-latest ]
runs-on: ${{ matrix.os }}
timeout-minutes: 30
steps:
- uses: actions/checkout@v4
with:
lfs: true
- name: Setup .NET 8.0
uses: actions/setup-dotnet@v4
with:
dotnet-version: '8.0.x'
- name: Install dev certs
run: dotnet --version && dotnet dev-certs https --trust
- name: Restore dependencies
run: |
# dotnet nuget add source --name dotnet-tool https://pkgs.dev.azure.com/dnceng/public/_packaging/dotnet-tools/nuget/v3/index.json --configfile NuGet.config
dotnet restore -bl
- name: Build
run: dotnet build --no-restore --configuration Release -bl /p:SignAssembly=true
- name: GRPC tests
run: dotnet test --no-build -bl --configuration Release --filter "Category=GRPC"

integration-test:
strategy:
fail-fast: true
Expand Down Expand Up @@ -224,6 +256,8 @@ jobs:
with:
dotnet-version: '8.0.x'
global-json-file: dotnet/global.json
- name: Install dev certs
run: dotnet --version && dotnet dev-certs https --trust
- name: Restore dependencies
run: |
dotnet restore -bl
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/dotnet-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ jobs:
run: |
echo "Build AutoGen"
dotnet build --no-restore --configuration Release -bl /p:SignAssembly=true
- run: sudo dotnet dev-certs https --trust --no-password
- name: Unit Test
run: dotnet test --no-build -bl --configuration Release
env:
Expand Down
14 changes: 14 additions & 0 deletions dotnet/AutoGen.sln
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Hello", "Hello", "{F42F9C8E
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.Core.Grpc", "src\Microsoft.AutoGen\Core.Grpc\Microsoft.AutoGen.Core.Grpc.csproj", "{3D83C6DB-ACEA-48F3-959F-145CCD2EE135}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GettingStartedGrpc", "samples\GettingStartedGrpc\GettingStartedGrpc.csproj", "{C3740DF1-18B1-4607-81E4-302F0308C848}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.Core.Grpc.Tests", "test\Microsoft.AutoGen.Core.Grpc.Tests\Microsoft.AutoGen.Core.Grpc.Tests.csproj", "{23A028D3-5EB1-4FA0-9CD1-A1340B830579}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -306,6 +310,14 @@ Global
{AAD593FE-A49B-425E-A9FE-A0022CD25E3D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AAD593FE-A49B-425E-A9FE-A0022CD25E3D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AAD593FE-A49B-425E-A9FE-A0022CD25E3D}.Release|Any CPU.Build.0 = Release|Any CPU
{C3740DF1-18B1-4607-81E4-302F0308C848}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C3740DF1-18B1-4607-81E4-302F0308C848}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C3740DF1-18B1-4607-81E4-302F0308C848}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C3740DF1-18B1-4607-81E4-302F0308C848}.Release|Any CPU.Build.0 = Release|Any CPU
{23A028D3-5EB1-4FA0-9CD1-A1340B830579}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{23A028D3-5EB1-4FA0-9CD1-A1340B830579}.Debug|Any CPU.Build.0 = Debug|Any CPU
{23A028D3-5EB1-4FA0-9CD1-A1340B830579}.Release|Any CPU.ActiveCfg = Release|Any CPU
{23A028D3-5EB1-4FA0-9CD1-A1340B830579}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -359,6 +371,8 @@ Global
{3D83C6DB-ACEA-48F3-959F-145CCD2EE135} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{AAD593FE-A49B-425E-A9FE-A0022CD25E3D} = {F42F9C8E-7BD9-4687-9B63-AFFA461AF5C1}
{F42F9C8E-7BD9-4687-9B63-AFFA461AF5C1} = {CE0AA8D5-12B8-4628-9589-DAD8CB0DDCF6}
{C3740DF1-18B1-4607-81E4-302F0308C848} = {CE0AA8D5-12B8-4628-9589-DAD8CB0DDCF6}
{23A028D3-5EB1-4FA0-9CD1-A1340B830579} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {93384647-528D-46C8-922C-8DB36A382F0B}
Expand Down
34 changes: 34 additions & 0 deletions dotnet/samples/GettingStartedGrpc/Checker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Checker.cs

using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.Core;
using Microsoft.Extensions.Hosting;
using TerminationF = System.Func<int, bool>;

namespace GettingStartedGrpcSample;

[TypeSubscription("default")]
public class Checker(
AgentId id,
IAgentRuntime runtime,
IHostApplicationLifetime hostApplicationLifetime,
TerminationF runUntilFunc
) :
BaseAgent(id, runtime, "Modifier", null),
IHandle<Events.CountUpdate>
{
public async ValueTask HandleAsync(Events.CountUpdate item, MessageContext messageContext)
{
if (!runUntilFunc(item.NewCount))
{
Console.WriteLine($"\nChecker:\n{item.NewCount} passed the check, continue.");
await this.PublishMessageAsync(new Events.CountMessage { Content = item.NewCount }, new TopicId("default"));
}
else
{
Console.WriteLine($"\nChecker:\n{item.NewCount} failed the check, stopping.");
hostApplicationLifetime.StopApplication();
}
}
}
26 changes: 26 additions & 0 deletions dotnet/samples/GettingStartedGrpc/GettingStartedGrpc.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<RootNamespace>getting_started</RootNamespace>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.AutoGen\Contracts\Microsoft.AutoGen.Contracts.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AutoGen\Core\Microsoft.AutoGen.Core.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AutoGen\Core.Grpc\Microsoft.AutoGen.Core.Grpc.csproj" />
</ItemGroup>


<ItemGroup>
<Protobuf Include="message.proto" GrpcServices="Client" Link="Protos\message.proto" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Grpc.Tools" PrivateAssets="All" />
</ItemGroup>

</Project>
29 changes: 29 additions & 0 deletions dotnet/samples/GettingStartedGrpc/Modifier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Modifier.cs

using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.Core;

using ModifyF = System.Func<int, int>;

namespace GettingStartedGrpcSample;

[TypeSubscription("default")]
public class Modifier(
AgentId id,
IAgentRuntime runtime,
ModifyF modifyFunc
) :
BaseAgent(id, runtime, "Modifier", null),
IHandle<Events.CountMessage>
{

public async ValueTask HandleAsync(Events.CountMessage item, MessageContext messageContext)
{
int newValue = modifyFunc(item.Content);
Console.WriteLine($"\nModifier:\nModified {item.Content} to {newValue}");

var updateMessage = new Events.CountUpdate { NewCount = newValue };
await this.PublishMessageAsync(updateMessage, topic: new TopicId("default"));
}
}
36 changes: 36 additions & 0 deletions dotnet/samples/GettingStartedGrpc/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Program.cs
using GettingStartedGrpcSample;
using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.Core;
using Microsoft.AutoGen.Core.Grpc;
using Microsoft.Extensions.DependencyInjection.Extensions;
using ModifyF = System.Func<int, int>;
using TerminationF = System.Func<int, bool>;

ModifyF modifyFunc = (int x) => x - 1;
TerminationF runUntilFunc = (int x) =>
{
return x <= 1;
};

AgentsAppBuilder appBuilder = new AgentsAppBuilder();
appBuilder.AddGrpcAgentWorker("http://localhost:50051");

appBuilder.Services.TryAddSingleton(modifyFunc);
appBuilder.Services.TryAddSingleton(runUntilFunc);

appBuilder.AddAgent<Checker>("Checker");
appBuilder.AddAgent<Modifier>("Modifier");

var app = await appBuilder.BuildAsync();
await app.StartAsync();

// Send the initial count to the agents app, running on the `local` runtime, and pass through the registered services via the application `builder`
await app.PublishMessageAsync(new GettingStartedGrpcSample.Events.CountMessage
{
Content = 10
}, new TopicId("default"));

// Run until application shutdown
await app.WaitForShutdownAsync();
11 changes: 11 additions & 0 deletions dotnet/samples/GettingStartedGrpc/message.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
syntax = "proto3";

option csharp_namespace = "GettingStartedGrpcSample.Events";

message CountMessage {
int32 content = 1;
}

message CountUpdate {
int32 new_count = 1;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// AgentsAppBuilderExtensions.cs

using System.Diagnostics;
using Grpc.Core;
using Grpc.Net.Client.Configuration;
using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.Protobuf;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
namespace Microsoft.AutoGen.Core.Grpc;

public static class AgentsAppBuilderExtensions
{
private const string _defaultAgentServiceAddress = "http://localhost:53071";

// TODO: How do we ensure AddGrpcAgentWorker and UseInProcessRuntime are mutually exclusive?
public static AgentsAppBuilder AddGrpcAgentWorker(this AgentsAppBuilder builder, string? agentServiceAddress = null)
{
builder.Services.AddGrpcClient<AgentRpc.AgentRpcClient>(options =>
{
options.Address = new Uri(agentServiceAddress ?? builder.Configuration.GetValue("AGENT_HOST", _defaultAgentServiceAddress));
options.ChannelOptionsActions.Add(channelOptions =>
{
var loggerFactory = new LoggerFactory();
if (Debugger.IsAttached)
{
channelOptions.HttpHandler = new SocketsHttpHandler
{
EnableMultipleHttp2Connections = false,
KeepAlivePingDelay = TimeSpan.FromSeconds(200),
KeepAlivePingTimeout = TimeSpan.FromSeconds(100),
KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always
};
}
else
{
channelOptions.HttpHandler = new SocketsHttpHandler
{
EnableMultipleHttp2Connections = true,
KeepAlivePingDelay = TimeSpan.FromSeconds(20),
KeepAlivePingTimeout = TimeSpan.FromSeconds(10),
KeepAlivePingPolicy = HttpKeepAlivePingPolicy.WithActiveRequests
};
}

var methodConfig = new MethodConfig
{
Names = { MethodName.Default },
RetryPolicy = new RetryPolicy
{
MaxAttempts = 5,
InitialBackoff = TimeSpan.FromSeconds(1),
MaxBackoff = TimeSpan.FromSeconds(5),
BackoffMultiplier = 1.5,
RetryableStatusCodes = { StatusCode.Unavailable }
}
};

channelOptions.ServiceConfig = new() { MethodConfigs = { methodConfig } };
channelOptions.ThrowOperationCanceledOnCancellation = true;
});
});

builder.Services.TryAddSingleton(DistributedContextPropagator.Current);
builder.Services.AddSingleton<IAgentRuntime, GrpcAgentRuntime>();
builder.Services.AddHostedService<GrpcAgentRuntime>(services =>
{
return (services.GetRequiredService<IAgentRuntime>() as GrpcAgentRuntime)!;
});

return builder;
}
}
43 changes: 43 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/CloudEventExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// CloudEventExtensions.cs

using Microsoft.AutoGen.Contracts;

namespace Microsoft.AutoGen.Core.Grpc;

internal static class CloudEventExtensions
{
// Convert an ISubscrptionDefinition to a Protobuf Subscription
internal static CloudEvent CreateCloudEvent(Google.Protobuf.WellKnownTypes.Any payload, TopicId topic, string dataType, AgentId? sender, string messageId)
{
var attributes = new Dictionary<string, CloudEvent.Types.CloudEventAttributeValue>
{
{
Constants.DATA_CONTENT_TYPE_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = Constants.DATA_CONTENT_TYPE_PROTOBUF_VALUE }
},
{
Constants.DATA_SCHEMA_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = dataType }
},
{
Constants.MESSAGE_KIND_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = Constants.MESSAGE_KIND_VALUE_PUBLISH }
}
};

if (sender != null)
{
var senderNonNull = (AgentId)sender;
attributes.Add(Constants.AGENT_SENDER_TYPE_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = senderNonNull.Type });
attributes.Add(Constants.AGENT_SENDER_KEY_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = senderNonNull.Key });
}

return new CloudEvent
{
ProtoData = payload,
Type = topic.Type,
Source = topic.Source,
Id = messageId,
Attributes = { attributes }
};

}
}
21 changes: 21 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/Constants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Constants.cs

namespace Microsoft.AutoGen.Core.Grpc;

public static class Constants
{
public const string DATA_CONTENT_TYPE_PROTOBUF_VALUE = "application/x-protobuf";
public const string DATA_CONTENT_TYPE_JSON_VALUE = "application/json";
public const string DATA_CONTENT_TYPE_TEXT_VALUE = "text/plain";

public const string DATA_CONTENT_TYPE_ATTR = "datacontenttype";
public const string DATA_SCHEMA_ATTR = "dataschema";
public const string AGENT_SENDER_TYPE_ATTR = "agagentsendertype";
public const string AGENT_SENDER_KEY_ATTR = "agagentsenderkey";

public const string MESSAGE_KIND_ATTR = "agmsgkind";
public const string MESSAGE_KIND_VALUE_PUBLISH = "publish";
public const string MESSAGE_KIND_VALUE_RPC_REQUEST = "rpc_request";
public const string MESSAGE_KIND_VALUE_RPC_RESPONSE = "rpc_response";
}
Loading

0 comments on commit 08f9830

Please sign in to comment.