Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.Net: Process Cloud Events - Publish Events Abstractions #10477

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c775597
moving event data to a separate class, adding plumbing to capture sou…
esttenorio Feb 11, 2025
79562b7
new abstractions of proxyStep related files
esttenorio Feb 11, 2025
1584e57
LocalRuntime related plumbing of proxyStep - unhooking IExternal from…
esttenorio Feb 11, 2025
3fd8f53
updating existing cloud event uts - working for localRuntime
esttenorio Feb 11, 2025
a89d5af
fixing spelling typos
esttenorio Feb 11, 2025
a893e10
cloud event abstractions working with dapr runtime
esttenorio Feb 27, 2025
55687fd
addressing comments
esttenorio Feb 27, 2025
0c6c030
addressing more comments -> proxybuilder related
esttenorio Feb 27, 2025
4e04bf0
adding component UTs related to proxyBuilder and KernelProcessProxy
esttenorio Feb 27, 2025
55d3277
fixing spelling typo
esttenorio Feb 27, 2025
e715c01
moving shared testing components to shared utilities + adding local r…
esttenorio Feb 28, 2025
bb4dd3b
adding more uts
esttenorio Mar 1, 2025
752798d
removing class external messaging interface from processbuilder in th…
esttenorio Mar 3, 2025
2424c0e
fixing pipeline build errors
esttenorio Mar 3, 2025
1d43e3b
removing emit internal event from proxy
esttenorio Mar 3, 2025
5592a6e
fixing #pragma warning restore CA2211 warning
esttenorio Mar 3, 2025
e2bdd7a
removing verify
esttenorio Mar 3, 2025
f7af833
fixing ut
esttenorio Mar 3, 2025
407b9c9
Merge branch 'main' into estenori/cloudEventsPublishAbstraction
esttenorio Mar 3, 2025
90e885e
Merge branch 'main' into estenori/cloudEventsPublishAbstraction
esttenorio Mar 4, 2025
279387d
fixing uts
esttenorio Mar 4, 2025
ab44b2a
addressing comment
esttenorio Mar 4, 2025
432d79c
more integration tests + moving steps to common steps
esttenorio Mar 4, 2025
eef446c
fixing test formatting error
esttenorio Mar 4, 2025
d123edb
fixing localproxy tests
esttenorio Mar 4, 2025
9c67dfc
fixing formatting
esttenorio Mar 4, 2025
adfec30
addressing comments
esttenorio Mar 5, 2025
7ec9643
missing comment fix
esttenorio Mar 5, 2025
1846b80
fixing error
esttenorio Mar 5, 2025
5425c18
missing change after renaming
esttenorio Mar 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ public interface IExternalKernelProcessMessageChannel
/// <param name="externalTopicEvent">name of the topic to be used externally as the event name</param>
/// <param name="eventData">data to be transmitted externally</param>
/// <returns></returns>
abstract Task EmitExternalEventAsync(string externalTopicEvent, object? eventData);
public abstract Task EmitExternalEventAsync(string externalTopicEvent, KernelProcessProxyMessage eventData);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,10 @@ public abstract class KernelProcessContext
/// </summary>
/// <returns></returns>
public abstract Task<IExternalKernelProcessMessageChannel?> GetExternalMessageChannelAsync();

/// <summary>
/// Gets the id of the running process instance
/// </summary>
/// <returns></returns>
public abstract Task<string> GetProcessIdAsync();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;
using Microsoft.SemanticKernel.Process;
using Microsoft.SemanticKernel.Process.Models;

namespace Microsoft.SemanticKernel;

/// <summary>
/// A serializable representation of a ProcessProxy.
/// </summary>
public sealed record KernelProcessProxy : KernelProcessStepInfo
{
/// <summary>
/// Proxy metadata used for linking specific SK events to external events and viceversa
/// </summary>
public KernelProcessProxyStateMetadata? ProxyMetadata { get; init; }

/// <summary>
/// Creates a new instance of the <see cref="KernelProcess"/> class.
/// </summary>
/// <param name="state">The process state.</param>
/// <param name="edges">The edges for the map.</param>
public KernelProcessProxy(KernelProcessStepState state, Dictionary<string, List<KernelProcessEdge>> edges)
: base(typeof(KernelProxyStep), state, edges)
{
Verify.NotNullOrWhiteSpace(state.Name, $"{nameof(state)}.{nameof(KernelProcessStepState.Name)}");
Verify.NotNullOrWhiteSpace(state.Id, $"{nameof(state)}.{nameof(KernelProcessStepState.Id)}");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Runtime.Serialization;
using System.Text.Json.Serialization;

namespace Microsoft.SemanticKernel;

/// <summary>
/// A serializable representation of an internal message used in a process runtime received by proxy steps.
/// </summary>
/// <remarks>
/// Initializes a new instance of the <see cref="KernelProcessProxyMessage"/> class.
/// </remarks>
[DataContract]
public sealed record KernelProcessProxyMessage
{
/// <summary>
/// Id of the SK process that emits the external event
/// </summary>
[DataMember]
[JsonPropertyName("processId")]
public string? ProcessId { get; init; }

/// <summary>
/// Name of the SK process that triggers sending the event externally
/// </summary>
[DataMember]
[JsonPropertyName("triggerEventId")]
public string? TriggerEventId { get; init; }

/// <summary>
/// Topic name used for publishing process event data externally
/// </summary>
[DataMember]
[JsonPropertyName("externalTopicName")]
public string ExternalTopicName { get; init; } = string.Empty;
/// <summary>
/// Event name used for publishing process event as another process event with a different event name
/// </summary>
[DataMember]
[JsonPropertyName("proxyEventName")]
public string? ProxyEventName { get; init; }
/// <summary>
/// Data to be emitted
/// </summary>
[DataMember]
[JsonPropertyName("eventData")]
public object? EventData { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,16 @@ public ValueTask EmitEventAsync(
/// Emit an external event to through a <see cref="IExternalKernelProcessMessageChannel"/>
/// component if connected from within the SK process
/// </summary>
/// <param name="topicName"></param>
/// <param name="processEventData"></param>
/// <param name="processEventData">data containing event details</param>
/// <returns></returns>
/// <exception cref="KernelException"></exception>
public async Task EmitExternalEventAsync(string topicName, object? processEventData = null)
public async Task EmitExternalEventAsync(KernelProcessProxyMessage processEventData)
{
if (this._externalMessageChannel == null)
{
throw new KernelException($"External message channel not configured for step with topic {topicName}");
throw new KernelException($"External message channel not configured for step with topic {processEventData.ExternalTopicName}");
}

await this._externalMessageChannel.EmitExternalEventAsync(topicName, processEventData).ConfigureAwait(false);
await this._externalMessageChannel.EmitExternalEventAsync(processEventData.ExternalTopicName, processEventData).ConfigureAwait(false);
}
}
35 changes: 35 additions & 0 deletions dotnet/src/Experimental/Process.Abstractions/KernelProxyStep.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Threading.Tasks;

namespace Microsoft.SemanticKernel.Process;

/// <summary>
/// Internal SK KernelProcessStep preconfigured to be used when emitting SK events outside of the SK Process Framework or inside with a different event name
/// </summary>
public sealed class KernelProxyStep : KernelProcessStep
{
/// <summary>
/// SK Function names in this SK Step as entry points
/// </summary>
public static class Functions
{
/// <summary>
/// Function name used to emit events externally
/// </summary>
public const string EmitExternalEvent = nameof(EmitExternalEvent);
}

/// <summary>
/// Step function used to emit events externally
/// </summary>
/// <param name="context">instance of <see cref="KernelProcessStepContext"/></param>
/// <param name="proxyEvent">event data passed to proxy step</param>
/// <returns></returns>
[KernelFunction(Functions.EmitExternalEvent)]
public Task EmitExternalEventAsync(KernelProcessStepContext context, KernelProcessProxyMessage proxyEvent)
{
Verify.NotNull(proxyEvent.ExternalTopicName, nameof(proxyEvent.ExternalTopicName));
return context.EmitExternalEventAsync(proxyEvent);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Runtime.Serialization;
using System.Text.Json.Serialization;

namespace Microsoft.SemanticKernel.Process.Models;

/// <summary>
/// Process state used for State Persistence serialization
/// </summary>
public sealed record class KernelProcessProxyEventMetadata
{
/// <summary>
/// Name of the topic to be emitted externally
/// </summary>
[DataMember]
[JsonPropertyName("topicName")]
public string TopicName { get; set; } = string.Empty;

/// <summary>
/// Internal id used to identify the SK event
/// </summary>
[DataMember]
[JsonPropertyName("eventId")]
public string EventId { get; set; } = string.Empty;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Text.Json.Serialization;

namespace Microsoft.SemanticKernel.Process.Models;

/// <summary>
/// Process state used for State Persistence serialization
/// </summary>
public sealed record class KernelProcessProxyStateMetadata : KernelProcessStepStateMetadata
{
/// <summary>
/// List of publish topics that can be used by the SK process
/// </summary>
[DataMember]
[JsonPropertyName("publishTopics")]
public List<string> PublishTopics { get; set; } = [];

/// <summary>
/// Map that stores which process events trigger external topic to be published and internal metadata information
/// </summary>
[DataMember]
[JsonPropertyName("eventMetadata")]
public Dictionary<string, KernelProcessProxyEventMetadata> EventMetadata { get; set; } = [];
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Microsoft.SemanticKernel.Process.Models;
[JsonPolymorphic(TypeDiscriminatorPropertyName = "$type", UnknownDerivedTypeHandling = JsonUnknownDerivedTypeHandling.FallBackToNearestAncestor)]
[JsonDerivedType(typeof(KernelProcessStepStateMetadata), typeDiscriminator: nameof(ProcessConstants.SupportedComponents.Step))]
[JsonDerivedType(typeof(KernelProcessMapStateMetadata), typeDiscriminator: nameof(ProcessConstants.SupportedComponents.Map))]
[JsonDerivedType(typeof(KernelProcessProxyStateMetadata), typeDiscriminator: nameof(ProcessConstants.SupportedComponents.Proxy))]
[JsonDerivedType(typeof(KernelProcessStateMetadata), typeDiscriminator: nameof(ProcessConstants.SupportedComponents.Process))]
public record class KernelProcessStepStateMetadata
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.SemanticKernel.Process.Internal;
internal class ProcessEventData
{
/// <summary>
/// SK Process Event Id, id assigned during runtime
/// </summary>
public string EventId { get; init; } = string.Empty;

/// <summary>
/// SK Process Event Name, human readable, defined when creating the process builder
/// </summary>
public string EventName { get; init; } = string.Empty;
}
17 changes: 17 additions & 0 deletions dotnet/src/Experimental/Process.Core/ProcessBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.SemanticKernel.Process;
using Microsoft.SemanticKernel.Process.Internal;
using Microsoft.SemanticKernel.Process.Models;

Expand Down Expand Up @@ -242,6 +243,22 @@ public ProcessMapBuilder AddMapStepFromProcess(ProcessBuilder process, IReadOnly
return this.AddStep(mapBuilder, aliases);
}

/// <summary>
/// Adds proxy step to the process that allows emitting events externally. For making use of it, there should be an implementation
/// of <see cref="IExternalKernelProcessMessageChannel"/> passed.
/// For now, the current implementation only allows for 1 implementation of <see cref="IExternalKernelProcessMessageChannel"/> at the time.
/// </summary>
/// <param name="externalTopics">topic names to be used externally</param>
/// <param name="name">name of the proxy step</param>
/// <param name="aliases">Aliases that have been used by previous versions of the step, used for supporting backward compatibility when reading old version Process States</param>
/// <returns>An instance of <see cref="ProcessProxyBuilder"/></returns>
public ProcessProxyBuilder AddProxyStep(IReadOnlyList<string> externalTopics, string? name = null, IReadOnlyList<string>? aliases = null)
{
ProcessProxyBuilder proxyBuilder = new(externalTopics, name ?? nameof(KernelProxyStep));

return this.AddStep(proxyBuilder, aliases);
}

/// <summary>
/// Provides an instance of <see cref="ProcessStepEdgeBuilder"/> for defining an edge to a
/// step inside the process for a given external event.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public ProcessEdgeBuilder SendEventTo(ProcessFunctionTargetBuilder target)
}

this.Target = target;
ProcessStepEdgeBuilder edgeBuilder = new(this.Source, this.EventId) { Target = this.Target };
ProcessStepEdgeBuilder edgeBuilder = new(this.Source, this.EventId, this.EventId) { Target = this.Target };
this.Source.LinkTo(this.EventId, edgeBuilder);

return new ProcessEdgeBuilder(this.Source, this.EventId);
Expand Down
93 changes: 93 additions & 0 deletions dotnet/src/Experimental/Process.Core/ProcessProxyBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.SemanticKernel.Process;
using Microsoft.SemanticKernel.Process.Internal;
using Microsoft.SemanticKernel.Process.Models;

namespace Microsoft.SemanticKernel;

/// <summary>
/// Provides functionality to allow emitting external messages from within the SK
/// process.
/// </summary>
public sealed class ProcessProxyBuilder : ProcessStepBuilder<KernelProxyStep>
{
/// <summary>
/// Initializes a new instance of the <see cref="ProcessProxyBuilder"/> class.
/// </summary>
internal ProcessProxyBuilder(IReadOnlyList<string> externalTopics, string name)
: base(name)
{
if (externalTopics.Count == 0)
{
throw new ArgumentException("No topic names registered");
}

this._externalTopicUsage = externalTopics.ToDictionary(topic => topic, topic => false);
if (this._externalTopicUsage.Count < externalTopics.Count)
{
throw new ArgumentException("Topic names registered must be different");
}
}

/// <summary>
/// Version of the proxy step, used when saving the state of the step.
/// </summary>
public string Version { get; init; } = "v1";

internal readonly Dictionary<string, bool> _externalTopicUsage;

// For supporting multiple step edges getting linked to the same external topic, current implementation needs to be updated
// to instead have a list of potential edges in case event names in different steps have same name
internal readonly Dictionary<string, KernelProcessProxyEventMetadata> _eventMetadata = [];

internal ProcessFunctionTargetBuilder GetExternalFunctionTargetBuilder()
{
return new ProcessFunctionTargetBuilder(this, functionName: KernelProxyStep.Functions.EmitExternalEvent, parameterName: "proxyEvent");
}

internal void LinkTopicToStepEdgeInfo(string topicName, ProcessStepBuilder sourceStep, ProcessEventData eventData)
{
if (!this._externalTopicUsage.TryGetValue(topicName, out bool usedTopic))
{
throw new InvalidOperationException($"Topic name {topicName} is not registered as proxy publish event, register first before using");
}

if (usedTopic)
{
throw new InvalidOperationException($"Topic name {topicName} is is already linked to another step edge");
}

this._eventMetadata[eventData.EventName] = new() { EventId = eventData.EventId, TopicName = topicName };
this._externalTopicUsage[topicName] = true;
}

/// <inheritdoc/>
internal override KernelProcessStepInfo BuildStep(KernelProcessStepStateMetadata? stateMetadata = null)
{
if (this._externalTopicUsage.All(topic => !topic.Value))
{
throw new InvalidOperationException("Proxy step does not have linked steps to it, link step edges to proxy or remove proxy step");
}

KernelProcessProxyStateMetadata proxyMetadata = new()
{
Name = this.Name,
Id = this.Id,
EventMetadata = this._eventMetadata,
PublishTopics = this._externalTopicUsage.ToList().Select(topic => topic.Key).ToList(),
};

// Build the edges first
var builtEdges = this.Edges.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.Select(e => e.Build()).ToList());

KernelProcessStepState state = new(this.Name, this.Version, this.Id);

return new KernelProcessProxy(state, builtEdges)
{
ProxyMetadata = proxyMetadata
};
}
}
4 changes: 2 additions & 2 deletions dotnet/src/Experimental/Process.Core/ProcessStepBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public ProcessStepEdgeBuilder OnEvent(string eventId)
{
// scope the event to this instance of this step
var scopedEventId = this.GetScopedEventId(eventId);
return new ProcessStepEdgeBuilder(this, scopedEventId);
return new ProcessStepEdgeBuilder(this, scopedEventId, eventId);
}

/// <summary>
Expand Down Expand Up @@ -229,7 +229,7 @@ protected ProcessStepBuilder(string name)
/// <summary>
/// Provides functionality for incrementally defining a process step.
/// </summary>
public sealed class ProcessStepBuilder<TStep> : ProcessStepBuilder where TStep : KernelProcessStep
public class ProcessStepBuilder<TStep> : ProcessStepBuilder where TStep : KernelProcessStep
{
/// <summary>
/// The initial state of the step. This may be null if the step does not have any state.
Expand Down
Loading
Loading