Skip to content

Commit

Permalink
Refactored EventBus to have a single generic method responsible for e…
Browse files Browse the repository at this point in the history
…vent with and without envelope
  • Loading branch information
oskardudycz committed Nov 26, 2022
1 parent b1d84b6 commit 0a42dbb
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 61 deletions.
116 changes: 80 additions & 36 deletions Core.Marten/Repository/MartenRepository.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
using System.Diagnostics;
using Core.Aggregates;
using Core.OpenTelemetry;
using Core.Tracing;
using Marten;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;

namespace Core.Marten.Repository;

Expand All @@ -19,57 +23,97 @@ Task<long> Delete(T aggregate, long? expectedVersion = null, TraceMetadata? even
public class MartenRepository<T>: IMartenRepository<T> where T : class, IAggregate
{
private readonly IDocumentSession documentSession;
private readonly IActivityScope activityScope;
private readonly TextMapPropagator propagator = Propagators.DefaultTextMapPropagator;

public MartenRepository(
IDocumentSession documentSession
IDocumentSession documentSession,
IActivityScope activityScope
)
{
this.documentSession = documentSession;
this.activityScope = activityScope;
}

public Task<T?> Find(Guid id, CancellationToken cancellationToken) =>
documentSession.Events.AggregateStreamAsync<T>(id, token: cancellationToken);

public async Task<long> Add(T aggregate, TraceMetadata? traceMetadata = null,
CancellationToken cancellationToken = default)
{
documentSession.CorrelationId = traceMetadata?.CorrelationId?.Value;
documentSession.CausationId = traceMetadata?.CausationId?.Value;

var events = aggregate.DequeueUncommittedEvents();

documentSession.Events.StartStream<Aggregate>(
aggregate.Id,
events
public Task<long> Add(T aggregate, TraceMetadata? traceMetadata = null,
CancellationToken cancellationToken = default) =>
activityScope.Run($"{typeof(MartenRepository<T>).Name}/{nameof(Add)}",
async (activity, ct) =>
{
// documentSession.CorrelationId = activity?.TraceId.ToHexString();
// documentSession.CausationId = activity?.HasRemoteParent == true ? activity.ParentId : null;
documentSession.CorrelationId = traceMetadata?.CorrelationId?.Value;
documentSession.CausationId = traceMetadata?.CausationId?.Value;

if (activity?.Context != null)
{
propagator.Inject(new PropagationContext(activity.Context, Baggage.Current), documentSession,
InjectTraceContextIntoBasicProperties);
}

var events = aggregate.DequeueUncommittedEvents();

documentSession.Events.StartStream<Aggregate>(
aggregate.Id,
events
);

await documentSession.SaveChangesAsync(ct);

return (long)events.Length;
},
new StartActivityOptions { Tags = { { TelemetryTags.Logic.Entity, typeof(T).Name } } },
cancellationToken
);

await documentSession.SaveChangesAsync(cancellationToken);

return events.Length;
}

public async Task<long> Update(T aggregate, long? expectedVersion = null, TraceMetadata? traceMetadata = null,
CancellationToken cancellationToken = default)
{
documentSession.CorrelationId = traceMetadata?.CorrelationId?.Value;
documentSession.CausationId = traceMetadata?.CausationId?.Value;

var events = aggregate.DequeueUncommittedEvents();

var nextVersion = (expectedVersion ?? aggregate.Version) + events.Length;

documentSession.Events.Append(
aggregate.Id,
nextVersion,
events
public Task<long> Update(T aggregate, long? expectedVersion = null, TraceMetadata? traceMetadata = null,
CancellationToken cancellationToken = default) =>
activityScope.Run($"{typeof(MartenRepository<T>).Name}/{nameof(Add)}",
async (activity, ct) =>
{
documentSession.CorrelationId = traceMetadata?.CorrelationId?.Value;
documentSession.CausationId = traceMetadata?.CausationId?.Value;

if (activity?.Context != null)
{
propagator.Inject(new PropagationContext(activity.Context, Baggage.Current), documentSession,
InjectTraceContextIntoBasicProperties);
}

var events = aggregate.DequeueUncommittedEvents();

var nextVersion = (expectedVersion ?? aggregate.Version) + events.Length;

documentSession.Events.Append(
aggregate.Id,
nextVersion,
events
);

await documentSession.SaveChangesAsync(ct);

return nextVersion;
},
new StartActivityOptions { Tags = { { TelemetryTags.Logic.Entity, typeof(T).Name } } },
cancellationToken
);

await documentSession.SaveChangesAsync(cancellationToken);

return nextVersion;
}

public Task<long> Delete(T aggregate, long? expectedVersion = null, TraceMetadata? traceMetadata = null,
CancellationToken cancellationToken = default) =>
Update(aggregate, expectedVersion, traceMetadata, cancellationToken);

private void InjectTraceContextIntoBasicProperties(IDocumentSession session, string key, string value)
{
try
{
session.SetHeader(key, value);
}
catch (Exception ex)
{
Console.WriteLine("Failed to inject trace context." + ex.Message);
}
}
}
2 changes: 2 additions & 0 deletions Core.Marten/Subscriptions/MartenEventPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
using Marten;
using Marten.Events;
using Microsoft.Extensions.DependencyInjection;
using OpenTelemetry.Context.Propagation;

namespace Core.Marten.Subscriptions;

public class MartenEventPublisher: IMartenEventsConsumer
{
private readonly IServiceProvider serviceProvider;
private readonly TextMapPropagator propagator = Propagators.DefaultTextMapPropagator;

public MartenEventPublisher(
IServiceProvider serviceProvider
Expand Down
2 changes: 1 addition & 1 deletion Core/Commands/CommandBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public Task Send<TCommand>(TCommand command, CancellationToken ct = default)

return activityScope.Run(
activityName,
token => retryPolicy.ExecuteAsync(c => commandHandler.Handle(command, c), token),
(_, token) => retryPolicy.ExecuteAsync(c => commandHandler.Handle(command, c), token),
new StartActivityOptions { Tags = {{ TelemetryTags.CommandHandling.Command, commandName }}},
ct
);
Expand Down
50 changes: 32 additions & 18 deletions Core/Events/EventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,37 @@ AsyncPolicy retryPolicy
this.retryPolicy = retryPolicy;
}

private async Task Publish<TEvent>(TEvent @event, CancellationToken ct)
private async Task Publish<TEvent>(EventEnvelope<TEvent> eventEnvelope, CancellationToken ct)
where TEvent : notnull
{
var eventEnvelope = @event as IEventEnvelope;
using var scope = serviceProvider.CreateScope();
using var tracingScope = createTracingScope(scope.ServiceProvider, eventEnvelope);

var eventName = eventEnvelope?.Data.GetType().Name ?? typeof(TEvent).Name;
var eventName = eventEnvelope.Data.GetType().Name;

var activityOptions = new StartActivityOptions
{
ParentId = eventEnvelope.Metadata.Trace?.CausationId?.Value,
Tags = { { TelemetryTags.EventHandling.Event, eventName } }
};

var eventEnvelopeHandlers =
scope.ServiceProvider.GetServices<IEventHandler<EventEnvelope<TEvent>>>();

foreach (var eventHandler in eventEnvelopeHandlers)
{
var activityName = $"{eventHandler.GetType().Name}/{eventName}";

await activityScope.Run(
activityName,
(_, token) => retryPolicy.ExecuteAsync(c => eventHandler.Handle(eventEnvelope, c), token),
activityOptions,
ct
);
}

// publish also just event data
// thanks to that both handlers with envelope and without will be called
var eventHandlers =
scope.ServiceProvider.GetServices<IEventHandler<TEvent>>();

Expand All @@ -51,30 +74,21 @@ private async Task Publish<TEvent>(TEvent @event, CancellationToken ct)

await activityScope.Run(
activityName,
token => retryPolicy.ExecuteAsync(c => eventHandler.Handle(@event, c), token),
new StartActivityOptions
{
ParentId = eventEnvelope?.Metadata?.Trace?.CausationId?.Value,
Tags = {{ TelemetryTags.EventHandling.Event, eventName }}
},
(_, token) => retryPolicy.ExecuteAsync(c => eventHandler.Handle(eventEnvelope.Data, c), token),
activityOptions,
ct
);
}
}

public async Task Publish(IEventEnvelope eventEnvelope, CancellationToken ct)
public Task Publish(IEventEnvelope eventEnvelope, CancellationToken ct)
{
// publish also just event data
// thanks to that both handlers with envelope and without will be called
await (Task)GetGenericPublishFor(eventEnvelope.Data)
.Invoke(this, new[] { eventEnvelope.Data, ct })!;

await (Task)GetGenericPublishFor(eventEnvelope)
return (Task)GetGenericPublishFor(eventEnvelope)
.Invoke(this, new object[] { eventEnvelope, ct })!;
}

private static MethodInfo GetGenericPublishFor(object @event) =>
PublishMethods.GetOrAdd(@event.GetType(), eventType =>
private static MethodInfo GetGenericPublishFor(IEventEnvelope @event) =>
PublishMethods.GetOrAdd(@event.Data.GetType(), eventType =>
typeof(EventBus)
.GetMethods(BindingFlags.Instance | BindingFlags.NonPublic)
.Single(m => m.Name == nameof(Publish) && m.GetGenericArguments().Any())
Expand Down
47 changes: 42 additions & 5 deletions Core/OpenTelemetry/ActivityScope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,26 @@ public interface IActivityScope

Task Run(
string name,
Func<CancellationToken, Task> run,
Func<Activity?, CancellationToken, Task> run,
CancellationToken ct
) => Run(name, run, new StartActivityOptions(), ct);

Task Run(
string name,
Func<CancellationToken, Task> run,
Func<Activity?, CancellationToken, Task> run,
StartActivityOptions options,
CancellationToken ct
);

Task<TResult> Run<TResult>(
string name,
Func<Activity?, CancellationToken, Task<TResult>> run,
CancellationToken ct
) => Run(name, run, new StartActivityOptions(), ct);

Task<TResult> Run<TResult>(
string name,
Func<Activity?, CancellationToken, Task<TResult>> run,
StartActivityOptions options,
CancellationToken ct
);
Expand All @@ -39,18 +52,42 @@ public class ActivityScope: IActivityScope

public async Task Run(
string name,
Func<CancellationToken, Task> run,
Func<Activity?, CancellationToken, Task> run,
StartActivityOptions options,
CancellationToken ct
)
{
using var activity = Start(name, options) ?? Activity.Current;

try
{
await run(activity, ct);

activity?.SetStatus(ActivityStatusCode.Ok);
}
catch
{
activity?.SetStatus(ActivityStatusCode.Error);
throw;
}
}

public async Task<TResult> Run<TResult>(
string name,
Func<Activity?, CancellationToken, Task<TResult>> run,
StartActivityOptions options,
CancellationToken ct
)
{
using var activity = Start(name, options);
using var activity = Start(name, options) ?? Activity.Current;

try
{
await run(ct);
var result = await run(activity, ct);

activity?.SetStatus(ActivityStatusCode.Ok);

return result;
}
catch
{
Expand Down
5 changes: 4 additions & 1 deletion Core/OpenTelemetry/TelemetryExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ OpenTelemetryOptions options
options.ConfigureTracerProvider(builder
.AddSource(ActivitySourceProvider.DefaultSourceName)
.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
.AddHttpClientInstrumentation(o =>
{
o.RecordException = true;
})
.SetResourceBuilder(
ResourceBuilder.CreateDefault()
.AddService(serviceName)
Expand Down
5 changes: 5 additions & 0 deletions Core/OpenTelemetry/TelemetryTags.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ namespace Core.OpenTelemetry;

public static class TelemetryTags
{
public static class Logic
{
public const string Entity = $"{ActivitySourceProvider.DefaultSourceName}.entity";
}

public static class CommandHandling
{
public const string Command = $"{ActivitySourceProvider.DefaultSourceName}.command";
Expand Down
11 changes: 11 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ services:
networks:
- pg_network

jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "5775:5775/udp"
- "6831:6831/udp"
- "6832:6832/udp"
- "5778:5778"
- "16686:16686"
- "14268:14268"
- "9411:9411"

#######################################################
# EventStoreDB
#######################################################
Expand Down

0 comments on commit 0a42dbb

Please sign in to comment.