Skip to content

Commit

Permalink
Merge branch 'develop-v0' into feature/entity-framework-includes
Browse files Browse the repository at this point in the history
  • Loading branch information
rasmus authored Aug 30, 2021
2 parents 206c13e + 3fcc354 commit a80626e
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 6 deletions.
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

* Fix: Source IDs are now added to snapshots
* Fix: InMemoryReadStore will not break on unmodified update result
* Fix: Allow the use of explicitly implemented interfaces in the read model
* New: added extension methods to the `EventFlow.EntityFramework` package that allow
us to configure [eager loading of related data](https://docs.microsoft.com/en-us/ef/core/querying/related-data/eager). Example usage:
```csharp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,19 @@ public void Apply(IReadModelContext context, IDomainEvent<ThingyAggregate, Thing
}
}

public class ExplicitPingReadModel : IReadModel,
IAmReadModelFor<ThingyAggregate, ThingyId, ThingyPingEvent>
{
public bool PingEventsReceived { get; private set; }

void IAmReadModelFor<ThingyAggregate, ThingyId, ThingyPingEvent>.Apply(
IReadModelContext context,
IDomainEvent<ThingyAggregate, ThingyId, ThingyPingEvent> domainEvent)
{
PingEventsReceived = true;
}
}

public class TheOtherPingReadModel : IReadModel,
IAmReadModelFor<ThingyAggregate, ThingyId, ThingyPingEvent>
{
Expand All @@ -71,6 +84,21 @@ public async Task ApplyAsync(IReadModelContext context, IDomainEvent<ThingyAggre
}
}

public class AsyncExplicitPingReadModel : IReadModel,
IAmAsyncReadModelFor<ThingyAggregate, ThingyId, ThingyPingEvent>
{
public bool PingEventsReceived { get; private set; }

async Task IAmAsyncReadModelFor<ThingyAggregate, ThingyId, ThingyPingEvent>.ApplyAsync(
IReadModelContext context,
IDomainEvent<ThingyAggregate, ThingyId, ThingyPingEvent> domainEvent,
CancellationToken cancellationToken)
{
await Task.Delay(50, cancellationToken).ConfigureAwait(false);
PingEventsReceived = true;
}
}

public class DomainErrorAfterFirstReadModel : IReadModel,
IAmReadModelFor<ThingyAggregate, ThingyId, ThingyDomainErrorAfterFirstEvent>
{
Expand Down Expand Up @@ -222,6 +250,28 @@ await Sut.UpdateReadModelAsync(
readModel.PingEventsReceived.Should().BeTrue();
}

[Test]
public async Task ExplicitInterfaceReadModelReceivesEvent()
{
// Arrange
var events = new[]
{
ToDomainEvent(A<ThingyPingEvent>()),
};
var readModel = new ExplicitPingReadModel();

// Act
await Sut.UpdateReadModelAsync(
readModel,
events,
A<IReadModelContext>(),
CancellationToken.None)
.ConfigureAwait(false);

// Assert
readModel.PingEventsReceived.Should().BeTrue();
}

[Test]
public async Task AsyncReadModelReceivesEvent()
{
Expand All @@ -243,5 +293,27 @@ await Sut.UpdateReadModelAsync(
// Assert
readModel.PingEventsReceived.Should().BeTrue();
}

[Test]
public async Task AsyncExplicitInterfaceReadModelReceivesEvent()
{
// Arrange
var events = new[]
{
ToDomainEvent(A<ThingyPingEvent>()),
};
var readModel = new AsyncExplicitPingReadModel();

// Act
await Sut.UpdateReadModelAsync(
readModel,
events,
A<IReadModelContext>(),
CancellationToken.None)
.ConfigureAwait(false);

// Assert
readModel.PingEventsReceived.Should().BeTrue();
}
}
}
38 changes: 32 additions & 6 deletions Source/EventFlow/ReadStores/ReadModelDomainEventApplier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,15 @@ public async Task<bool> UpdateReadModelAsync<TReadModel>(
domainEvent.EventType,
t =>
{
var domainEventType = typeof(IDomainEvent<,,>).MakeGenericType(domainEvent.AggregateType,
domainEvent.GetIdentity().GetType(), t);
var identityType = domainEvent.GetIdentity().GetType();
var aggregateType = domainEvent.AggregateType;
var eventType = typeof(IDomainEvent<,,>).MakeGenericType(aggregateType, identityType, t);

var methodSignature = new[] {typeof(IReadModelContext), domainEventType};
var methodInfo = readModelType.GetTypeInfo().GetMethod(ApplyMethodName, methodSignature);
// first try: does it implement the synchronous 'Apply' method?

var interfaceType = typeof(IAmReadModelFor<,,>).MakeGenericType(aggregateType, identityType, t);
var methodParams = new[] {typeof(IReadModelContext), eventType};
var methodInfo = GetMethod(readModelType, interfaceType, ApplyMethodName, methodParams);

if (methodInfo != null)
{
Expand All @@ -72,8 +76,11 @@ public async Task<bool> UpdateReadModelAsync<TReadModel>(
return new ApplyMethod(method);
}

var asyncMethodSignature = new[] {typeof(IReadModelContext), domainEventType, typeof(CancellationToken)};
methodInfo = readModelType.GetTypeInfo().GetMethod(ApplyAsyncMethodName, asyncMethodSignature);
// second try: does it implement the asynchronous 'Apply' method?

var asyncInterfaceType = typeof(IAmAsyncReadModelFor<,,>).MakeGenericType(aggregateType, identityType, t);
var asyncMethodParams = new[] {typeof(IReadModelContext), eventType, typeof(CancellationToken)};
methodInfo = GetMethod(readModelType, asyncInterfaceType, ApplyAsyncMethodName, asyncMethodParams);

if (methodInfo != null)
{
Expand All @@ -82,6 +89,8 @@ public async Task<bool> UpdateReadModelAsync<TReadModel>(
return new ApplyMethod(method);
}

// no matching 'Apply' method found

return null;
});

Expand All @@ -95,6 +104,23 @@ public async Task<bool> UpdateReadModelAsync<TReadModel>(
return appliedAny;
}

private static MethodInfo GetMethod(Type instanceType, Type interfaceType, string name, Type[] parameters)
{
var methodInfo = instanceType
.GetTypeInfo()
.GetMethod(name, parameters);

if (methodInfo != null)
{
return methodInfo;
}

var type = interfaceType.GetTypeInfo();
return type.IsAssignableFrom(instanceType)
? type.GetMethod(name, parameters)
: default;
}

private class ApplyMethod
{
private readonly Func<IReadModel, IReadModelContext, IDomainEvent, CancellationToken, Task> _asyncMethod;
Expand Down

0 comments on commit a80626e

Please sign in to comment.