From 4b93fb1c41403527fc2aa02318b0d85c609c5fd6 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Thu, 24 Feb 2022 08:06:37 +0100 Subject: [PATCH 1/9] Move to .NET Standard 2.1 to get IAsyncEnumerable --- .../EventFlow.MsSql.Tests.csproj | 2 +- Source/EventFlow.MsSql/EventFlow.MsSql.csproj | 2 +- Source/EventFlow.Sql/EventFlow.Sql.csproj | 2 +- .../EventFlow.TestHelpers.csproj | 2 +- Source/EventFlow.Tests/EventFlow.Tests.csproj | 2 +- Source/EventFlow/EventFlow.csproj | 11 ++--------- 6 files changed, 7 insertions(+), 14 deletions(-) diff --git a/Source/EventFlow.MsSql.Tests/EventFlow.MsSql.Tests.csproj b/Source/EventFlow.MsSql.Tests/EventFlow.MsSql.Tests.csproj index 221bebad4..57f11831a 100644 --- a/Source/EventFlow.MsSql.Tests/EventFlow.MsSql.Tests.csproj +++ b/Source/EventFlow.MsSql.Tests/EventFlow.MsSql.Tests.csproj @@ -1,7 +1,7 @@  - netcoreapp2.1;netcoreapp3.1;net6.0 + netcoreapp3.1;net6.0 True False False diff --git a/Source/EventFlow.MsSql/EventFlow.MsSql.csproj b/Source/EventFlow.MsSql/EventFlow.MsSql.csproj index 051260875..a6b8044fe 100644 --- a/Source/EventFlow.MsSql/EventFlow.MsSql.csproj +++ b/Source/EventFlow.MsSql/EventFlow.MsSql.csproj @@ -1,7 +1,7 @@  - netstandard2.0;netcoreapp2.1;netcoreapp3.1;net6.0 + netstandard2.1;netcoreapp3.1;net6.0 True True False diff --git a/Source/EventFlow.Sql/EventFlow.Sql.csproj b/Source/EventFlow.Sql/EventFlow.Sql.csproj index ede1b251b..d27fdb154 100644 --- a/Source/EventFlow.Sql/EventFlow.Sql.csproj +++ b/Source/EventFlow.Sql/EventFlow.Sql.csproj @@ -1,7 +1,7 @@  - netstandard2.0;netcoreapp2.1;netcoreapp3.1;net6.0 + netstandard2.1;netcoreapp3.1;net6.0 True True False diff --git a/Source/EventFlow.TestHelpers/EventFlow.TestHelpers.csproj b/Source/EventFlow.TestHelpers/EventFlow.TestHelpers.csproj index 123e83f2e..0ef9425bc 100644 --- a/Source/EventFlow.TestHelpers/EventFlow.TestHelpers.csproj +++ b/Source/EventFlow.TestHelpers/EventFlow.TestHelpers.csproj @@ -1,7 +1,7 @@  - netstandard2.0;netcoreapp2.1;netcoreapp3.1;net6.0 + netstandard2.1;netcoreapp3.1;net6.0 True False False diff --git a/Source/EventFlow.Tests/EventFlow.Tests.csproj b/Source/EventFlow.Tests/EventFlow.Tests.csproj index 273bd9ada..d9f01591d 100644 --- a/Source/EventFlow.Tests/EventFlow.Tests.csproj +++ b/Source/EventFlow.Tests/EventFlow.Tests.csproj @@ -1,7 +1,7 @@  - netcoreapp2.1;netcoreapp3.1;net6.0 + netcoreapp3.1;net6.0 True False False diff --git a/Source/EventFlow/EventFlow.csproj b/Source/EventFlow/EventFlow.csproj index 4dd6838a8..6c1c909b2 100644 --- a/Source/EventFlow/EventFlow.csproj +++ b/Source/EventFlow/EventFlow.csproj @@ -1,7 +1,7 @@  - netstandard2.0;netcoreapp2.1;netcoreapp3.1;net6.0 + netstandard2.1;netcoreapp3.1;net6.0 True True False @@ -36,14 +36,7 @@ - - - - - - - - + From 2a60e1712443049d5f3bdfa95c0395d0ec2065dc Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Wed, 8 Jun 2022 22:34:52 +0200 Subject: [PATCH 2/9] PoC how a replacement might look --- .../EventUpgradeExplorationTest.cs | 7 ++- .../EventStores/EventUpgradeManagerTests.cs | 24 ++++----- Source/EventFlow/EventStores/EventUpgrader.cs | 51 +++++++++++++++++++ .../EventStores/IEventUpgradeContext.cs | 28 ++++++++++ .../EventFlow/EventStores/IEventUpgrader.cs | 8 ++- 5 files changed, 100 insertions(+), 18 deletions(-) create mode 100644 Source/EventFlow/EventStores/EventUpgrader.cs create mode 100644 Source/EventFlow/EventStores/IEventUpgradeContext.cs diff --git a/Source/EventFlow.Tests/Exploration/EventUpgradeExplorationTest.cs b/Source/EventFlow.Tests/Exploration/EventUpgradeExplorationTest.cs index b845d45ef..38a231363 100644 --- a/Source/EventFlow.Tests/Exploration/EventUpgradeExplorationTest.cs +++ b/Source/EventFlow.Tests/Exploration/EventUpgradeExplorationTest.cs @@ -124,7 +124,7 @@ public class UpgradeEventV2 : IAggregateEvent { } - public class UpgradeV1ToV2 : IEventUpgrader + public class UpgradeV1ToV2 : EventUpgrader { private readonly IDomainEventFactory _domainEventFactory; @@ -134,11 +134,10 @@ public UpgradeV1ToV2( _domainEventFactory = domainEventFactory; } - public IEnumerable> Upgrade( + protected override IEnumerable> Upgrade( IDomainEvent domainEvent) { - var v1 = domainEvent as IDomainEvent; - yield return v1 == null + yield return !(domainEvent is IDomainEvent v1) ? domainEvent : _domainEventFactory.Upgrade(domainEvent, new UpgradeEventV2()); } diff --git a/Source/EventFlow.Tests/UnitTests/EventStores/EventUpgradeManagerTests.cs b/Source/EventFlow.Tests/UnitTests/EventStores/EventUpgradeManagerTests.cs index 22188b208..7da7f9ae5 100644 --- a/Source/EventFlow.Tests/UnitTests/EventStores/EventUpgradeManagerTests.cs +++ b/Source/EventFlow.Tests/UnitTests/EventStores/EventUpgradeManagerTests.cs @@ -122,7 +122,7 @@ public class TestEventV2 : AggregateEvent { } public class TestEventV3 : AggregateEvent { } public class DamagedEvent : AggregateEvent { } - public class UpgradeTestEventV1ToTestEventV2 : IEventUpgrader + public class UpgradeTestEventV1ToTestEventV2 : EventUpgrader { private readonly IDomainEventFactory _domainEventFactory; @@ -131,16 +131,16 @@ public UpgradeTestEventV1ToTestEventV2(IDomainEventFactory domainEventFactory) _domainEventFactory = domainEventFactory; } - public IEnumerable> Upgrade(IDomainEvent domainEvent) + protected override IEnumerable> Upgrade( + IDomainEvent domainEvent) { - var testEvent1 = domainEvent as IDomainEvent; - yield return testEvent1 == null + yield return !(domainEvent is IDomainEvent _) ? domainEvent : _domainEventFactory.Upgrade(domainEvent, new TestEventV2()); } } - public class UpgradeTestEventV2ToTestEventV3 : IEventUpgrader + public class UpgradeTestEventV2ToTestEventV3 : EventUpgrader { private readonly IDomainEventFactory _domainEventFactory; @@ -149,21 +149,21 @@ public UpgradeTestEventV2ToTestEventV3(IDomainEventFactory domainEventFactory) _domainEventFactory = domainEventFactory; } - public IEnumerable> Upgrade(IDomainEvent domainEvent) + protected override IEnumerable> Upgrade( + IDomainEvent domainEvent) { - var testEvent2 = domainEvent as IDomainEvent; - yield return testEvent2 == null + yield return !(domainEvent is IDomainEvent _) ? domainEvent : _domainEventFactory.Upgrade(domainEvent, new TestEventV3()); } } - public class DamagedEventRemover : IEventUpgrader + public class DamagedEventRemover : EventUpgrader { - public IEnumerable> Upgrade(IDomainEvent domainEvent) + protected override IEnumerable> Upgrade( + IDomainEvent domainEvent) { - var damagedEvent = domainEvent as IDomainEvent; - if (damagedEvent == null) + if (!(domainEvent is IDomainEvent _)) { yield return domainEvent; } diff --git a/Source/EventFlow/EventStores/EventUpgrader.cs b/Source/EventFlow/EventStores/EventUpgrader.cs new file mode 100644 index 000000000..0651c4e9e --- /dev/null +++ b/Source/EventFlow/EventStores/EventUpgrader.cs @@ -0,0 +1,51 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2022 Rasmus Mikkelsen +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Collections.Generic; +using System.Runtime.CompilerServices; +using System.Threading; +using EventFlow.Aggregates; +using EventFlow.Core; + +#pragma warning disable CS1998 + +namespace EventFlow.EventStores +{ + public abstract class EventUpgrader : IEventUpgrader + where TAggregate : IAggregateRoot + where TIdentity : IIdentity + { + protected abstract IEnumerable> Upgrade( + IDomainEvent domainEvent); + + public async IAsyncEnumerable> UpgradeAsync( + IDomainEvent domainEvent, + IEventUpgradeContext eventUpgradeContext, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + foreach (var upgradedDomainEvent in Upgrade(domainEvent)) + { + yield return upgradedDomainEvent; + } + } + } +} diff --git a/Source/EventFlow/EventStores/IEventUpgradeContext.cs b/Source/EventFlow/EventStores/IEventUpgradeContext.cs new file mode 100644 index 000000000..ea1df4291 --- /dev/null +++ b/Source/EventFlow/EventStores/IEventUpgradeContext.cs @@ -0,0 +1,28 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2022 Rasmus Mikkelsen +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +namespace EventFlow.EventStores +{ + public interface IEventUpgradeContext + { + } +} diff --git a/Source/EventFlow/EventStores/IEventUpgrader.cs b/Source/EventFlow/EventStores/IEventUpgrader.cs index 63137c745..15c575eeb 100644 --- a/Source/EventFlow/EventStores/IEventUpgrader.cs +++ b/Source/EventFlow/EventStores/IEventUpgrader.cs @@ -22,6 +22,7 @@ // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System.Collections.Generic; +using System.Threading; using EventFlow.Aggregates; using EventFlow.Core; @@ -31,6 +32,9 @@ public interface IEventUpgrader where TAggregate : IAggregateRoot where TIdentity : IIdentity { - IEnumerable> Upgrade(IDomainEvent domainEvent); + IAsyncEnumerable> UpgradeAsync( + IDomainEvent domainEvent, + IEventUpgradeContext eventUpgradeContext, + CancellationToken cancellationToken); } -} \ No newline at end of file +} From 536529a3b9cbae384c5fa0d801fa479c8aa9140c Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Wed, 8 Jun 2022 22:50:27 +0200 Subject: [PATCH 3/9] Just need to convert the hard bits now --- .../EventStores/EventUpgradeManagerTests.cs | 19 +++++++++++-------- Source/EventFlow/EventFlow.csproj | 1 + .../EventFlow/EventStores/EventStoreBase.cs | 12 ++++++++++-- .../EventStores/EventUpgradeManager.cs | 17 ++++++++++------- .../EventStores/IEventUpgradeManager.cs | 12 ++++++++---- 5 files changed, 40 insertions(+), 21 deletions(-) diff --git a/Source/EventFlow.Tests/UnitTests/EventStores/EventUpgradeManagerTests.cs b/Source/EventFlow.Tests/UnitTests/EventStores/EventUpgradeManagerTests.cs index 7da7f9ae5..cfa56f81f 100644 --- a/Source/EventFlow.Tests/UnitTests/EventStores/EventUpgradeManagerTests.cs +++ b/Source/EventFlow.Tests/UnitTests/EventStores/EventUpgradeManagerTests.cs @@ -23,6 +23,9 @@ using System; using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; using EventFlow.Aggregates; using EventFlow.EventStores; using EventFlow.TestHelpers; @@ -64,20 +67,20 @@ public void SetUp() } [Test] - public void EmptyListReturnsEmptyList() + public async Task EmptyListReturnsEmptyList() { // Arrange var events = new IDomainEvent[] { }; // Act - var upgradedEvents = Sut.Upgrade(events); + var upgradedEvents = await Sut.UpgradeAsync(events.ToAsyncEnumerable(), CancellationToken.None).ToArrayAsync(); // Assert upgradedEvents.Should().BeEmpty(); } [Test] - public void EventWithNoUpgradersIsReturned() + public async Task EventWithNoUpgradersIsReturned() { // Arrange var events = new[] @@ -87,15 +90,15 @@ public void EventWithNoUpgradersIsReturned() }; // Act - var upgradedEvents = Sut.Upgrade(events); + var upgradedEvents = await Sut.UpgradeAsync(events.ToAsyncEnumerable(), CancellationToken.None).ToArrayAsync(); // Assert - upgradedEvents.Count.Should().Be(2); + upgradedEvents.Length.Should().Be(2); upgradedEvents.Should().Contain(events); } [Test] - public void EventsAreUpgradedToLatestVersion() + public async Task EventsAreUpgradedToLatestVersion() { // Arrange var events = new[] @@ -107,10 +110,10 @@ public void EventsAreUpgradedToLatestVersion() }; // Act - var upgradedEvents = Sut.Upgrade(events); + var upgradedEvents = await Sut.UpgradeAsync(events.ToAsyncEnumerable(), CancellationToken.None).ToArrayAsync(); // Assert - upgradedEvents.Count.Should().Be(3); + upgradedEvents.Length.Should().Be(3); foreach (var upgradedEvent in upgradedEvents) { upgradedEvent.Should().BeAssignableTo>(); diff --git a/Source/EventFlow/EventFlow.csproj b/Source/EventFlow/EventFlow.csproj index ee7a17e7c..6692fd63e 100644 --- a/Source/EventFlow/EventFlow.csproj +++ b/Source/EventFlow/EventFlow.csproj @@ -32,6 +32,7 @@ + All diff --git a/Source/EventFlow/EventStores/EventStoreBase.cs b/Source/EventFlow/EventStores/EventStoreBase.cs index fe7680733..17e08134d 100644 --- a/Source/EventFlow/EventStores/EventStoreBase.cs +++ b/Source/EventFlow/EventStores/EventStoreBase.cs @@ -131,7 +131,12 @@ public async Task LoadAllEventsAsync( var domainEvents = (IReadOnlyCollection) allCommittedEventsPage.CommittedDomainEvents .Select(e => _eventJsonSerializer.Deserialize(e)) .ToList(); - domainEvents = _eventUpgradeManager.Upgrade(domainEvents); + + // TODO: Pass a real IAsyncEnumerable instead + domainEvents = await _eventUpgradeManager.UpgradeAsync( + domainEvents.ToAsyncEnumerable(), + cancellationToken).ToArrayAsync(cancellationToken); + return new AllEventsPage(allCommittedEventsPage.NextGlobalPosition, domainEvents); } @@ -170,7 +175,10 @@ public virtual async Task> Upgrade { get; } + public Func> Upgrade { get; } - public EventUpgraderCacheItem(Type eventUpgraderType, Func> upgrade) + public EventUpgraderCacheItem(Type eventUpgraderType, Func> upgrade) { EventUpgraderType = eventUpgraderType; Upgrade = upgrade; @@ -61,9 +62,11 @@ public EventUpgradeManager( _serviceProvider = serviceProvider; } - public IReadOnlyCollection Upgrade(IReadOnlyCollection domainEvents) + public IAsyncEnumerable UpgradeAsync( + IAsyncEnumerable domainEvents, + CancellationToken cancellationToken) { - return Upgrade((IEnumerable) domainEvents).ToList(); + return Upgrade((IEnumerable) domainEvents); } private IEnumerable Upgrade(IEnumerable domainEvents) @@ -116,8 +119,8 @@ private IEnumerable Upgrade(IEnumerable domainEvents .OrderBy(d => d.AggregateSequenceNumber); } - public IReadOnlyCollection> Upgrade( - IReadOnlyCollection> domainEvents) + public IAsyncEnumerable> UpgradeAsync( + IAsyncEnumerable> domainEvents, CancellationToken cancellationToken) where TAggregate : IAggregateRoot where TIdentity : IIdentity { diff --git a/Source/EventFlow/EventStores/IEventUpgradeManager.cs b/Source/EventFlow/EventStores/IEventUpgradeManager.cs index 993608d60..b81d918e9 100644 --- a/Source/EventFlow/EventStores/IEventUpgradeManager.cs +++ b/Source/EventFlow/EventStores/IEventUpgradeManager.cs @@ -22,6 +22,7 @@ // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System.Collections.Generic; +using System.Threading; using EventFlow.Aggregates; using EventFlow.Core; @@ -29,11 +30,14 @@ namespace EventFlow.EventStores { public interface IEventUpgradeManager { - IReadOnlyCollection Upgrade(IReadOnlyCollection domainEvents); + IAsyncEnumerable UpgradeAsync( + IAsyncEnumerable domainEvents, + CancellationToken cancellationToken); - IReadOnlyCollection> Upgrade( - IReadOnlyCollection> domainEvents) + IAsyncEnumerable> UpgradeAsync( + IAsyncEnumerable> domainEvents, + CancellationToken cancellationToken) where TAggregate : IAggregateRoot where TIdentity : IIdentity; } -} \ No newline at end of file +} From d4016134b9a031e379b779ecc2867d237f1029ec Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Wed, 8 Jun 2022 23:36:51 +0200 Subject: [PATCH 4/9] Simple case done --- .../EventStores/EventUpgradeManagerTests.cs | 29 ++--- .../EventStores/EventUpgradeManager.cs | 118 +++++------------- 2 files changed, 41 insertions(+), 106 deletions(-) diff --git a/Source/EventFlow.Tests/UnitTests/EventStores/EventUpgradeManagerTests.cs b/Source/EventFlow.Tests/UnitTests/EventStores/EventUpgradeManagerTests.cs index cfa56f81f..e66a0e58c 100644 --- a/Source/EventFlow.Tests/UnitTests/EventStores/EventUpgradeManagerTests.cs +++ b/Source/EventFlow.Tests/UnitTests/EventStores/EventUpgradeManagerTests.cs @@ -33,7 +33,7 @@ using EventFlow.TestHelpers.Aggregates.Events; using EventFlow.TestHelpers.Aggregates.ValueObjects; using FluentAssertions; -using Moq; +using Microsoft.Extensions.DependencyInjection; using NUnit.Framework; namespace EventFlow.Tests.UnitTests.EventStores @@ -41,29 +41,18 @@ namespace EventFlow.Tests.UnitTests.EventStores [Category(Categories.Unit)] public class EventUpgradeManagerTests : TestsFor { - private Mock _serviceProviderMock; + private ServiceProvider _serviceProvider; [SetUp] public void SetUp() { - _serviceProviderMock = InjectMock(); - - _serviceProviderMock - .Setup(r => r.GetService(typeof(IEnumerable>))) - .Returns(new IEventUpgrader[] - { - new UpgradeTestEventV1ToTestEventV2(DomainEventFactory), - new UpgradeTestEventV2ToTestEventV3(DomainEventFactory), - new DamagedEventRemover(), - }); - _serviceProviderMock - .Setup(r => r.GetService(typeof(IEnumerable>))) - .Returns(new object[] - { - new UpgradeTestEventV1ToTestEventV2(DomainEventFactory), - new UpgradeTestEventV2ToTestEventV3(DomainEventFactory), - new DamagedEventRemover(), - }); + _serviceProvider = new ServiceCollection() + .AddSingleton>(new UpgradeTestEventV1ToTestEventV2(DomainEventFactory)) + .AddSingleton>(new UpgradeTestEventV2ToTestEventV3(DomainEventFactory)) + .AddSingleton>(new DamagedEventRemover()) + .BuildServiceProvider(); + + Inject(_serviceProvider); } [Test] diff --git a/Source/EventFlow/EventStores/EventUpgradeManager.cs b/Source/EventFlow/EventStores/EventUpgradeManager.cs index 989a325d3..16478f983 100644 --- a/Source/EventFlow/EventStores/EventUpgradeManager.cs +++ b/Source/EventFlow/EventStores/EventUpgradeManager.cs @@ -22,14 +22,13 @@ // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; -using System.Reflection; +using System.Runtime.CompilerServices; using System.Threading; +using System.Threading.Tasks; using EventFlow.Aggregates; using EventFlow.Core; -using EventFlow.Extensions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -37,20 +36,6 @@ namespace EventFlow.EventStores { public class EventUpgradeManager : IEventUpgradeManager { - private static readonly ConcurrentDictionary EventUpgraderCacheItems = new ConcurrentDictionary(); - - private class EventUpgraderCacheItem - { - public Type EventUpgraderType { get; } - public Func> Upgrade { get; } - - public EventUpgraderCacheItem(Type eventUpgraderType, Func> upgrade) - { - EventUpgraderType = eventUpgraderType; - Upgrade = upgrade; - } - } - private readonly ILogger _logger; private readonly IServiceProvider _serviceProvider; @@ -62,92 +47,53 @@ public EventUpgradeManager( _serviceProvider = serviceProvider; } + public IAsyncEnumerable UpgradeAsync( IAsyncEnumerable domainEvents, CancellationToken cancellationToken) { - return Upgrade((IEnumerable) domainEvents); + throw new NotImplementedException(); } - private IEnumerable Upgrade(IEnumerable domainEvents) + public async IAsyncEnumerable> UpgradeAsync( + IAsyncEnumerable> domainEvents, + [EnumeratorCancellation] CancellationToken cancellationToken) + where TAggregate : IAggregateRoot + where TIdentity : IIdentity { - var domainEventList = domainEvents.ToList(); - if (!domainEventList.Any()) - { - return Enumerable.Empty(); - } + var eventUpgraders = _serviceProvider.GetService>>() + .OrderBy(u => u.GetType().Name) + .ToList(); - var eventUpgraders = domainEventList - .Select(d => d.AggregateType) - .Distinct() - .ToDictionary( - t => t, - t => - { - var cache = GetCache(t); - var upgraders = _serviceProvider.GetServices(cache.EventUpgraderType) - .OrderBy(u => u.GetType().Name) - .ToList(); - return new - { - EventUpgraders = upgraders, - cache.Upgrade - }; - }); - - if (!eventUpgraders.Any()) + await foreach (var domainEvent in domainEvents.WithCancellation(cancellationToken)) { - return Enumerable.Empty(); - } + var upgradeDomainEvents = new List>{domainEvent}; - if (_logger.IsEnabled(LogLevel.Trace)) - { - _logger.LogTrace( - "Upgrading {DomainEventCount} events and found these event upgraders to use: {EventUpgraderTypes}", - domainEventList.Count, - eventUpgraders.Values.SelectMany(a => a.EventUpgraders.Select(e => e.GetType().PrettyPrint())).ToList()); - } - - return domainEventList - .SelectMany(e => - { - var a = eventUpgraders[e.AggregateType]; - return a.EventUpgraders.Aggregate( - (IEnumerable) new[] {e}, - (de, up) => de.SelectMany(ee => a.Upgrade(up, ee))); - }) - .OrderBy(d => d.AggregateSequenceNumber); - } - - public IAsyncEnumerable> UpgradeAsync( - IAsyncEnumerable> domainEvents, CancellationToken cancellationToken) - where TAggregate : IAggregateRoot - where TIdentity : IIdentity - { - return Upgrade(domainEvents.Cast()).Cast>().ToList(); - } + foreach (var eventUpgrader in eventUpgraders) + { + var buffer = new List>(); - private static EventUpgraderCacheItem GetCache(Type aggregateType) - { - return EventUpgraderCacheItems.GetOrAdd( - aggregateType, - t => + foreach (var upgradeDomainEvent in upgradeDomainEvents) { - var aggregateRootInterface = t.GetTypeInfo().GetInterfaces().SingleOrDefault(i => i.GetTypeInfo().IsGenericType && i.GetGenericTypeDefinition() == typeof(IAggregateRoot<>)); - if (aggregateRootInterface == null) + await foreach (var upgradedDomainEvent in eventUpgrader.UpgradeAsync(upgradeDomainEvent, DummyContext.Instance, cancellationToken)) { - throw new ArgumentException($"Type '{t.PrettyPrint()}' is not a '{typeof(IAggregateRoot<>).PrettyPrint()}'", nameof(aggregateType)); + buffer.Add(upgradedDomainEvent); } + } - var arguments = aggregateRootInterface.GetTypeInfo().GetGenericArguments(); - var eventUpgraderType = typeof(IEventUpgrader<,>).MakeGenericType(t, arguments[0]); + upgradeDomainEvents = buffer; + } - var invokeUpgrade = ReflectionHelper.CompileMethodInvocation>>(eventUpgraderType, "Upgrade"); + foreach (var upgradeDomainEvent in upgradeDomainEvents) + { + yield return upgradeDomainEvent; + } + } + } - return new EventUpgraderCacheItem( - eventUpgraderType, - invokeUpgrade); - }); + private class DummyContext : IEventUpgradeContext + { + public static IEventUpgradeContext Instance { get; } = new DummyContext(); } } } From 4e9b92fddaea932a7380b4603c5bc49eeb9150e3 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Sun, 6 Nov 2022 12:14:22 +0100 Subject: [PATCH 5/9] Implemented event upgraders --- .../EventUpgradeExplorationTest.cs | 2 +- .../EventStores/EventUpgradeManagerTests.cs | 6 +- .../EventStores/EventUpgradeManager.cs | 86 ++++++++++++++++--- Source/EventFlow/EventStores/EventUpgrader.cs | 27 +++++- .../EventFlow/EventStores/IEventUpgrader.cs | 10 ++- 5 files changed, 114 insertions(+), 17 deletions(-) diff --git a/Source/EventFlow.Tests/Exploration/EventUpgradeExplorationTest.cs b/Source/EventFlow.Tests/Exploration/EventUpgradeExplorationTest.cs index 38a231363..be09c9177 100644 --- a/Source/EventFlow.Tests/Exploration/EventUpgradeExplorationTest.cs +++ b/Source/EventFlow.Tests/Exploration/EventUpgradeExplorationTest.cs @@ -124,7 +124,7 @@ public class UpgradeEventV2 : IAggregateEvent { } - public class UpgradeV1ToV2 : EventUpgrader + public class UpgradeV1ToV2 : EventUpgraderNonAsync { private readonly IDomainEventFactory _domainEventFactory; diff --git a/Source/EventFlow.Tests/UnitTests/EventStores/EventUpgradeManagerTests.cs b/Source/EventFlow.Tests/UnitTests/EventStores/EventUpgradeManagerTests.cs index e66a0e58c..c70566a6c 100644 --- a/Source/EventFlow.Tests/UnitTests/EventStores/EventUpgradeManagerTests.cs +++ b/Source/EventFlow.Tests/UnitTests/EventStores/EventUpgradeManagerTests.cs @@ -114,7 +114,7 @@ public class TestEventV2 : AggregateEvent { } public class TestEventV3 : AggregateEvent { } public class DamagedEvent : AggregateEvent { } - public class UpgradeTestEventV1ToTestEventV2 : EventUpgrader + public class UpgradeTestEventV1ToTestEventV2 : EventUpgraderNonAsync { private readonly IDomainEventFactory _domainEventFactory; @@ -132,7 +132,7 @@ protected override IEnumerable> Upgrade( } } - public class UpgradeTestEventV2ToTestEventV3 : EventUpgrader + public class UpgradeTestEventV2ToTestEventV3 : EventUpgraderNonAsync { private readonly IDomainEventFactory _domainEventFactory; @@ -150,7 +150,7 @@ protected override IEnumerable> Upgrade( } } - public class DamagedEventRemover : EventUpgrader + public class DamagedEventRemover : EventUpgraderNonAsync { protected override IEnumerable> Upgrade( IDomainEvent domainEvent) diff --git a/Source/EventFlow/EventStores/EventUpgradeManager.cs b/Source/EventFlow/EventStores/EventUpgradeManager.cs index 16478f983..3c5045ca8 100644 --- a/Source/EventFlow/EventStores/EventUpgradeManager.cs +++ b/Source/EventFlow/EventStores/EventUpgradeManager.cs @@ -22,6 +22,8 @@ // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System; +using System.Collections; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; @@ -29,6 +31,7 @@ using System.Threading.Tasks; using EventFlow.Aggregates; using EventFlow.Core; +using EventFlow.Extensions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -48,11 +51,41 @@ public EventUpgradeManager( } - public IAsyncEnumerable UpgradeAsync( + public async IAsyncEnumerable UpgradeAsync( IAsyncEnumerable domainEvents, - CancellationToken cancellationToken) + [EnumeratorCancellation] CancellationToken cancellationToken) { - throw new NotImplementedException(); + var eventUpgradeContext = new EventUpgradeContext(); + + await foreach (var domainEvent in domainEvents.WithCancellation(cancellationToken)) + { + var upgradeDomainEvents = new List { domainEvent }; + if (!eventUpgradeContext.TryGet(domainEvent.AggregateType, out var eventUpgraders)) + { + eventUpgraders = ResolveUpgraders(domainEvent.AggregateType, domainEvent.IdentityType); + eventUpgradeContext.Add(domainEvent.AggregateType, eventUpgraders); + } + + foreach (var eventUpgrader in eventUpgraders) + { + var buffer = new List(); + + foreach (var upgradeDomainEvent in upgradeDomainEvents) + { + await foreach (var upgradedDomainEvent in eventUpgrader.UpgradeAsync(upgradeDomainEvent, eventUpgradeContext, cancellationToken)) + { + buffer.Add(upgradedDomainEvent); + } + } + + upgradeDomainEvents = buffer; + } + + foreach (var upgradeDomainEvent in upgradeDomainEvents) + { + yield return upgradeDomainEvent; + } + } } public async IAsyncEnumerable> UpgradeAsync( @@ -61,9 +94,9 @@ public async IAsyncEnumerable> UpgradeAsync< where TAggregate : IAggregateRoot where TIdentity : IIdentity { - var eventUpgraders = _serviceProvider.GetService>>() - .OrderBy(u => u.GetType().Name) - .ToList(); + var eventUpgradeContext = new EventUpgradeContext(); + var eventUpgraders = ResolveUpgraders(typeof(TAggregate), typeof(TIdentity)); + eventUpgradeContext.Add(typeof(TAggregate), eventUpgraders); await foreach (var domainEvent in domainEvents.WithCancellation(cancellationToken)) { @@ -75,9 +108,9 @@ public async IAsyncEnumerable> UpgradeAsync< foreach (var upgradeDomainEvent in upgradeDomainEvents) { - await foreach (var upgradedDomainEvent in eventUpgrader.UpgradeAsync(upgradeDomainEvent, DummyContext.Instance, cancellationToken)) + await foreach (var upgradedDomainEvent in eventUpgrader.UpgradeAsync(upgradeDomainEvent, eventUpgradeContext, cancellationToken)) { - buffer.Add(upgradedDomainEvent); + buffer.Add((IDomainEvent) upgradedDomainEvent); } } @@ -91,9 +124,42 @@ public async IAsyncEnumerable> UpgradeAsync< } } - private class DummyContext : IEventUpgradeContext + private IReadOnlyCollection ResolveUpgraders(Type aggregateType, Type identityType) { - public static IEventUpgradeContext Instance { get; } = new DummyContext(); + var type = typeof(IEventUpgrader<,>).MakeGenericType(aggregateType, identityType); + return _serviceProvider.GetServices(type) + .OrderBy(u => u.GetType().Name) + .Select(u => (IEventUpgrader)u) + .ToList(); + } + + private class EventUpgradeContext : IEventUpgradeContext + { + private readonly ConcurrentDictionary _eventUpgrades = new ConcurrentDictionary(); + + public bool TryGet(Type aggregateType, out IReadOnlyCollection upgraders) + { + if (!_eventUpgrades.TryGetValue(aggregateType, out var u)) + { + upgraders = null; + return false; + } + + upgraders = (IReadOnlyCollection)u; + return true; + } + + public void Add( + Type aggregateType, + IReadOnlyCollection upgraders) + { + if (!_eventUpgrades.TryAdd(aggregateType, upgraders)) + { + throw new ArgumentOutOfRangeException( + nameof(aggregateType), + $"Upgraders for {aggregateType.PrettyPrint()} already added"); + } + } } } } diff --git a/Source/EventFlow/EventStores/EventUpgrader.cs b/Source/EventFlow/EventStores/EventUpgrader.cs index 0651c4e9e..b92864c82 100644 --- a/Source/EventFlow/EventStores/EventUpgrader.cs +++ b/Source/EventFlow/EventStores/EventUpgrader.cs @@ -23,6 +23,7 @@ using System.Collections.Generic; using System.Runtime.CompilerServices; using System.Threading; +using System.Threading.Tasks; using EventFlow.Aggregates; using EventFlow.Core; @@ -30,14 +31,14 @@ namespace EventFlow.EventStores { - public abstract class EventUpgrader : IEventUpgrader + public abstract class EventUpgraderNonAsync : EventUpgrader where TAggregate : IAggregateRoot where TIdentity : IIdentity { protected abstract IEnumerable> Upgrade( IDomainEvent domainEvent); - public async IAsyncEnumerable> UpgradeAsync( + public override async IAsyncEnumerable> UpgradeAsync( IDomainEvent domainEvent, IEventUpgradeContext eventUpgradeContext, [EnumeratorCancellation] CancellationToken cancellationToken) @@ -48,4 +49,26 @@ public async IAsyncEnumerable> UpgradeAsync( } } } + + public abstract class EventUpgrader : IEventUpgrader + where TAggregate : IAggregateRoot + where TIdentity : IIdentity + { + public virtual async IAsyncEnumerable UpgradeAsync( + IDomainEvent domainEvent, + IEventUpgradeContext eventUpgradeContext, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + var castDomainEvent = (IDomainEvent) domainEvent; + await foreach (var e in UpgradeAsync(castDomainEvent, eventUpgradeContext, cancellationToken).WithCancellation(cancellationToken)) + { + yield return e; + } + } + + public abstract IAsyncEnumerable> UpgradeAsync( + IDomainEvent domainEvent, + IEventUpgradeContext eventUpgradeContext, + CancellationToken cancellationToken); + } } diff --git a/Source/EventFlow/EventStores/IEventUpgrader.cs b/Source/EventFlow/EventStores/IEventUpgrader.cs index 15c575eeb..80002c6a7 100644 --- a/Source/EventFlow/EventStores/IEventUpgrader.cs +++ b/Source/EventFlow/EventStores/IEventUpgrader.cs @@ -28,7 +28,15 @@ namespace EventFlow.EventStores { - public interface IEventUpgrader + public interface IEventUpgrader + { + IAsyncEnumerable UpgradeAsync( + IDomainEvent domainEvent, + IEventUpgradeContext eventUpgradeContext, + CancellationToken cancellationToken); + } + + public interface IEventUpgrader : IEventUpgrader where TAggregate : IAggregateRoot where TIdentity : IIdentity { From f841014879a5838a1bf74f2609c117513d4dd912 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Sun, 6 Nov 2022 12:24:21 +0100 Subject: [PATCH 6/9] Make the context a bit more useful --- .../EventStores/EventUpgradeContext.cs | 59 +++++++++++++++++++ .../EventStores/EventUpgradeManager.cs | 38 +----------- .../EventStores/IEventUpgradeContext.cs | 8 +++ 3 files changed, 70 insertions(+), 35 deletions(-) create mode 100644 Source/EventFlow/EventStores/EventUpgradeContext.cs diff --git a/Source/EventFlow/EventStores/EventUpgradeContext.cs b/Source/EventFlow/EventStores/EventUpgradeContext.cs new file mode 100644 index 000000000..782f72c84 --- /dev/null +++ b/Source/EventFlow/EventStores/EventUpgradeContext.cs @@ -0,0 +1,59 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2022 Rasmus Mikkelsen +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections; +using System.Collections.Concurrent; +using System.Collections.Generic; +using EventFlow.Extensions; + +namespace EventFlow.EventStores +{ + public class EventUpgradeContext : IEventUpgradeContext + { + protected ConcurrentDictionary EventUpgrades { get; } = new ConcurrentDictionary(); + + public virtual bool TryGetUpgraders(Type aggregateType, out IReadOnlyCollection upgraders) + { + if (!EventUpgrades.TryGetValue(aggregateType, out var u)) + { + upgraders = null; + return false; + } + + upgraders = (IReadOnlyCollection)u; + return true; + } + + public virtual void AddUpgraders( + Type aggregateType, + IReadOnlyCollection upgraders) + { + if (!EventUpgrades.TryAdd(aggregateType, upgraders)) + { + throw new ArgumentOutOfRangeException( + nameof(aggregateType), + $"Upgraders for {aggregateType.PrettyPrint()} already added"); + } + } + } +} \ No newline at end of file diff --git a/Source/EventFlow/EventStores/EventUpgradeManager.cs b/Source/EventFlow/EventStores/EventUpgradeManager.cs index 3c5045ca8..95b9c6c43 100644 --- a/Source/EventFlow/EventStores/EventUpgradeManager.cs +++ b/Source/EventFlow/EventStores/EventUpgradeManager.cs @@ -22,8 +22,6 @@ // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System; -using System.Collections; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; @@ -31,7 +29,6 @@ using System.Threading.Tasks; using EventFlow.Aggregates; using EventFlow.Core; -using EventFlow.Extensions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -60,10 +57,10 @@ public async IAsyncEnumerable UpgradeAsync( await foreach (var domainEvent in domainEvents.WithCancellation(cancellationToken)) { var upgradeDomainEvents = new List { domainEvent }; - if (!eventUpgradeContext.TryGet(domainEvent.AggregateType, out var eventUpgraders)) + if (!eventUpgradeContext.TryGetUpgraders(domainEvent.AggregateType, out var eventUpgraders)) { eventUpgraders = ResolveUpgraders(domainEvent.AggregateType, domainEvent.IdentityType); - eventUpgradeContext.Add(domainEvent.AggregateType, eventUpgraders); + eventUpgradeContext.AddUpgraders(domainEvent.AggregateType, eventUpgraders); } foreach (var eventUpgrader in eventUpgraders) @@ -96,7 +93,7 @@ public async IAsyncEnumerable> UpgradeAsync< { var eventUpgradeContext = new EventUpgradeContext(); var eventUpgraders = ResolveUpgraders(typeof(TAggregate), typeof(TIdentity)); - eventUpgradeContext.Add(typeof(TAggregate), eventUpgraders); + eventUpgradeContext.AddUpgraders(typeof(TAggregate), eventUpgraders); await foreach (var domainEvent in domainEvents.WithCancellation(cancellationToken)) { @@ -132,34 +129,5 @@ private IReadOnlyCollection ResolveUpgraders(Type aggregateType, .Select(u => (IEventUpgrader)u) .ToList(); } - - private class EventUpgradeContext : IEventUpgradeContext - { - private readonly ConcurrentDictionary _eventUpgrades = new ConcurrentDictionary(); - - public bool TryGet(Type aggregateType, out IReadOnlyCollection upgraders) - { - if (!_eventUpgrades.TryGetValue(aggregateType, out var u)) - { - upgraders = null; - return false; - } - - upgraders = (IReadOnlyCollection)u; - return true; - } - - public void Add( - Type aggregateType, - IReadOnlyCollection upgraders) - { - if (!_eventUpgrades.TryAdd(aggregateType, upgraders)) - { - throw new ArgumentOutOfRangeException( - nameof(aggregateType), - $"Upgraders for {aggregateType.PrettyPrint()} already added"); - } - } - } } } diff --git a/Source/EventFlow/EventStores/IEventUpgradeContext.cs b/Source/EventFlow/EventStores/IEventUpgradeContext.cs index ea1df4291..d11b18a45 100644 --- a/Source/EventFlow/EventStores/IEventUpgradeContext.cs +++ b/Source/EventFlow/EventStores/IEventUpgradeContext.cs @@ -20,9 +20,17 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using System; +using System.Collections.Generic; + namespace EventFlow.EventStores { public interface IEventUpgradeContext { + bool TryGetUpgraders(Type aggregateType, out IReadOnlyCollection upgraders); + + void AddUpgraders( + Type aggregateType, + IReadOnlyCollection upgraders); } } From bd171c0dbf787caf26bd8d37b90fde7f5aa37406 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Fri, 11 Nov 2022 10:59:21 +0100 Subject: [PATCH 7/9] More work --- RELEASE_NOTES.md | 6 + .../ThingyEmitUpgradableEventsCommand.cs | 63 +++++++ .../Aggregates/Events/ThingyPingEvent.cs | 4 +- .../Events/ThingyUpgradableV1Event.cs | 30 ++++ .../Events/ThingyUpgradableV2Event.cs | 30 ++++ .../Events/ThingyUpgradableV3Event.cs | 30 ++++ .../Aggregates/ThingyAggregate.cs | 43 ++++- .../ThingyUpgradableV1ToV2EventUpgrader.cs | 55 ++++++ .../ThingyUpgradableV2ToV3EventUpgrader.cs | 55 ++++++ .../Suites/TestSuiteForEventStore.cs | 162 +++++++++++------- ...ConcurrentInMemoryEventPersistanceTests.cs | 11 +- .../ReadStores/BaseReadModelTests.cs | 4 +- Source/EventFlow/EventFlowOptions.cs | 1 + .../EventFlow/EventStores/EventStoreBase.cs | 2 + .../EventStores/EventUpgradeContextFactory.cs | 36 ++++ .../EventStores/EventUpgradeManager.cs | 57 +++++- Source/EventFlow/EventStores/IEventStore.cs | 1 + .../EventStores/IEventUpgradeContext.cs | 4 +- .../IEventUpgradeContextFactory.cs | 33 ++++ .../EventStores/IEventUpgradeManager.cs | 1 + .../ReadStores/ReadModelPopulator.cs | 7 +- 21 files changed, 554 insertions(+), 81 deletions(-) create mode 100644 Source/EventFlow.TestHelpers/Aggregates/Commands/ThingyEmitUpgradableEventsCommand.cs create mode 100644 Source/EventFlow.TestHelpers/Aggregates/Events/ThingyUpgradableV1Event.cs create mode 100644 Source/EventFlow.TestHelpers/Aggregates/Events/ThingyUpgradableV2Event.cs create mode 100644 Source/EventFlow.TestHelpers/Aggregates/Events/ThingyUpgradableV3Event.cs create mode 100644 Source/EventFlow.TestHelpers/Aggregates/Upgraders/ThingyUpgradableV1ToV2EventUpgrader.cs create mode 100644 Source/EventFlow.TestHelpers/Aggregates/Upgraders/ThingyUpgradableV2ToV3EventUpgrader.cs create mode 100644 Source/EventFlow/EventStores/EventUpgradeContextFactory.cs create mode 100644 Source/EventFlow/EventStores/IEventUpgradeContextFactory.cs diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 62b2cb051..174089301 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,6 +10,12 @@ https://github.com/eventflow/EventFlow/blob/develop-v1/MIGRATION_GUIDE.md Changes since 1.0.5001-alpha +* New/breaking: `IEventUpgrader<,>` are now (finally) async +* Fix/breaking: Event upgraders are now used during read model population. As the upgraders + are re-used across multiple aggregates, there is a high likelihood that some additions are + needed in any existing upgraders. Upgraders are stored on the new `IEventUpgradeContext`, + which is created by the new `IEventUpgradeContextFactory`. Replace this if you need addition + context during event upgrades * Fix: `SnapshotAggregateRoot` now correctly loads previous source IDs as well adds the current source ID that triggered the snapshot. This causes the `DuplicateOperationException` to be correctly thrown if a duplicate source diff --git a/Source/EventFlow.TestHelpers/Aggregates/Commands/ThingyEmitUpgradableEventsCommand.cs b/Source/EventFlow.TestHelpers/Aggregates/Commands/ThingyEmitUpgradableEventsCommand.cs new file mode 100644 index 000000000..cf05c19d3 --- /dev/null +++ b/Source/EventFlow.TestHelpers/Aggregates/Commands/ThingyEmitUpgradableEventsCommand.cs @@ -0,0 +1,63 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2022 Rasmus Mikkelsen +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Commands; + +namespace EventFlow.TestHelpers.Aggregates.Commands +{ + public class ThingyEmitUpgradableEventsCommand : Command + { + public int UpgradableEventV1Count { get; } + public int UpgradableEventV2Count { get; } + public int UpgradableEventV3Count { get; } + + public ThingyEmitUpgradableEventsCommand( + ThingyId aggregateId, + int upgradableEventV1Count, + int upgradableEventV2Count, + int upgradableEventV3Count) + : base(aggregateId) + { + UpgradableEventV1Count = upgradableEventV1Count; + UpgradableEventV2Count = upgradableEventV2Count; + UpgradableEventV3Count = upgradableEventV3Count; + } + } + + public class ThingyEmitUpgradableEventsCommandHandler : CommandHandler + { + public override Task ExecuteAsync( + ThingyAggregate aggregate, + ThingyEmitUpgradableEventsCommand command, + CancellationToken cancellationToken) + { + aggregate.EmitUpgradableEvents( + command.UpgradableEventV1Count, + command.UpgradableEventV2Count, + command.UpgradableEventV3Count); + + return Task.CompletedTask; + } + } +} diff --git a/Source/EventFlow.TestHelpers/Aggregates/Events/ThingyPingEvent.cs b/Source/EventFlow.TestHelpers/Aggregates/Events/ThingyPingEvent.cs index d71a85d19..3e4fce1a3 100644 --- a/Source/EventFlow.TestHelpers/Aggregates/Events/ThingyPingEvent.cs +++ b/Source/EventFlow.TestHelpers/Aggregates/Events/ThingyPingEvent.cs @@ -1,6 +1,6 @@ // The MIT License (MIT) // -// Copyright (c) 2015-2021 Rasmus Mikkelsen +// Copyright (c) 2015-2022 Rasmus Mikkelsen // Copyright (c) 2015-2021 eBay Software Foundation // https://github.com/eventflow/EventFlow // @@ -30,7 +30,7 @@ namespace EventFlow.TestHelpers.Aggregates.Events [EventVersion("ThingyPing", 1)] public class ThingyPingEvent : AggregateEvent { - public PingId PingId { get; private set; } + public PingId PingId { get; } public ThingyPingEvent(PingId pingId) { diff --git a/Source/EventFlow.TestHelpers/Aggregates/Events/ThingyUpgradableV1Event.cs b/Source/EventFlow.TestHelpers/Aggregates/Events/ThingyUpgradableV1Event.cs new file mode 100644 index 000000000..9970df9bb --- /dev/null +++ b/Source/EventFlow.TestHelpers/Aggregates/Events/ThingyUpgradableV1Event.cs @@ -0,0 +1,30 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2022 Rasmus Mikkelsen +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using EventFlow.Aggregates; + +namespace EventFlow.TestHelpers.Aggregates.Events +{ + public class ThingyUpgradableV1Event : AggregateEvent + { + } +} diff --git a/Source/EventFlow.TestHelpers/Aggregates/Events/ThingyUpgradableV2Event.cs b/Source/EventFlow.TestHelpers/Aggregates/Events/ThingyUpgradableV2Event.cs new file mode 100644 index 000000000..f2843a44f --- /dev/null +++ b/Source/EventFlow.TestHelpers/Aggregates/Events/ThingyUpgradableV2Event.cs @@ -0,0 +1,30 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2022 Rasmus Mikkelsen +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using EventFlow.Aggregates; + +namespace EventFlow.TestHelpers.Aggregates.Events +{ + public class ThingyUpgradableV2Event : AggregateEvent + { + } +} diff --git a/Source/EventFlow.TestHelpers/Aggregates/Events/ThingyUpgradableV3Event.cs b/Source/EventFlow.TestHelpers/Aggregates/Events/ThingyUpgradableV3Event.cs new file mode 100644 index 000000000..215aeddd6 --- /dev/null +++ b/Source/EventFlow.TestHelpers/Aggregates/Events/ThingyUpgradableV3Event.cs @@ -0,0 +1,30 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2022 Rasmus Mikkelsen +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using EventFlow.Aggregates; + +namespace EventFlow.TestHelpers.Aggregates.Events +{ + public class ThingyUpgradableV3Event : AggregateEvent + { + } +} diff --git a/Source/EventFlow.TestHelpers/Aggregates/ThingyAggregate.cs b/Source/EventFlow.TestHelpers/Aggregates/ThingyAggregate.cs index 6606f9934..e979139d6 100644 --- a/Source/EventFlow.TestHelpers/Aggregates/ThingyAggregate.cs +++ b/Source/EventFlow.TestHelpers/Aggregates/ThingyAggregate.cs @@ -41,7 +41,10 @@ namespace EventFlow.TestHelpers.Aggregates [AggregateName("Thingy")] public class ThingyAggregate : SnapshotAggregateRoot, IEmit, - IEmit + IEmit, + IEmit, + IEmit, + IEmit { // ReSharper disable once NotAccessedField.Local private readonly IScopedContext _scopedContext; @@ -58,6 +61,10 @@ public class ThingyAggregate : SnapshotAggregateRoot SnapshotVersions { get; private set; } = new ThingySnapshotVersion[] {}; public bool IsDeleted { get; private set; } + public int UpgradableEventV1Received { get; private set; } + public int UpgradableEventV2Received { get; private set; } + public int UpgradableEventV3Received { get; private set; } + public ThingyAggregate(ThingyId id, IScopedContext scopedContext) : base(id, SnapshotEveryFewVersionsStrategy.With(SnapshotEveryVersion)) { @@ -130,11 +137,45 @@ public void RequestSagaException() Emit(new ThingySagaExceptionRequestedEvent()); } + public void EmitUpgradableEvents( + int upgradableEventV1Count, + int upgradableEventV2Count, + int upgradableEventV3Count) + { + void EmitCount(int count) + where T : IAggregateEvent, new() + { + for (var i = 0; i < count; i++) + { + Emit(new T()); + } + } + + EmitCount(upgradableEventV1Count); + EmitCount(upgradableEventV2Count); + EmitCount(upgradableEventV3Count); + } + public void Apply(ThingyDomainErrorAfterFirstEvent e) { DomainErrorAfterFirstReceived = true; } + public void Apply(ThingyUpgradableV1Event aggregateEvent) + { + UpgradableEventV1Received++; + } + + public void Apply(ThingyUpgradableV2Event aggregateEvent) + { + UpgradableEventV2Received++; + } + + public void Apply(ThingyUpgradableV3Event _) + { + UpgradableEventV3Received++; + } + void IEmit.Apply(ThingyDeletedEvent e) { IsDeleted = true; diff --git a/Source/EventFlow.TestHelpers/Aggregates/Upgraders/ThingyUpgradableV1ToV2EventUpgrader.cs b/Source/EventFlow.TestHelpers/Aggregates/Upgraders/ThingyUpgradableV1ToV2EventUpgrader.cs new file mode 100644 index 000000000..dd1655c04 --- /dev/null +++ b/Source/EventFlow.TestHelpers/Aggregates/Upgraders/ThingyUpgradableV1ToV2EventUpgrader.cs @@ -0,0 +1,55 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2022 Rasmus Mikkelsen +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Collections.Generic; +using EventFlow.Aggregates; +using EventFlow.EventStores; +using EventFlow.TestHelpers.Aggregates.Events; + +namespace EventFlow.TestHelpers.Aggregates.Upgraders +{ + public class ThingyUpgradableV1ToV2EventUpgrader : EventUpgraderNonAsync + { + private readonly IDomainEventFactory _domainEventFactory; + + public ThingyUpgradableV1ToV2EventUpgrader( + IDomainEventFactory domainEventFactory) + { + _domainEventFactory = domainEventFactory; + } + + protected override IEnumerable> Upgrade( + IDomainEvent domainEvent) + { + if (!(domainEvent is IDomainEvent _)) + { + yield return domainEvent; + } + else + { + yield return _domainEventFactory.Upgrade( + domainEvent, + new ThingyUpgradableV2Event()); + } + } + } +} diff --git a/Source/EventFlow.TestHelpers/Aggregates/Upgraders/ThingyUpgradableV2ToV3EventUpgrader.cs b/Source/EventFlow.TestHelpers/Aggregates/Upgraders/ThingyUpgradableV2ToV3EventUpgrader.cs new file mode 100644 index 000000000..c1f108707 --- /dev/null +++ b/Source/EventFlow.TestHelpers/Aggregates/Upgraders/ThingyUpgradableV2ToV3EventUpgrader.cs @@ -0,0 +1,55 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2022 Rasmus Mikkelsen +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Collections.Generic; +using EventFlow.Aggregates; +using EventFlow.EventStores; +using EventFlow.TestHelpers.Aggregates.Events; + +namespace EventFlow.TestHelpers.Aggregates.Upgraders +{ + public class ThingyUpgradableV2ToV3EventUpgrader : EventUpgraderNonAsync + { + private readonly IDomainEventFactory _domainEventFactory; + + public ThingyUpgradableV2ToV3EventUpgrader( + IDomainEventFactory domainEventFactory) + { + _domainEventFactory = domainEventFactory; + } + + protected override IEnumerable> Upgrade( + IDomainEvent domainEvent) + { + if (!(domainEvent is IDomainEvent _)) + { + yield return domainEvent; + } + else + { + yield return _domainEventFactory.Upgrade( + domainEvent, + new ThingyUpgradableV3Event()); + } + } + } +} diff --git a/Source/EventFlow.TestHelpers/Suites/TestSuiteForEventStore.cs b/Source/EventFlow.TestHelpers/Suites/TestSuiteForEventStore.cs index 2fe8c6786..1fa7161e4 100644 --- a/Source/EventFlow.TestHelpers/Suites/TestSuiteForEventStore.cs +++ b/Source/EventFlow.TestHelpers/Suites/TestSuiteForEventStore.cs @@ -1,6 +1,6 @@ // The MIT License (MIT) // -// Copyright (c) 2015-2021 Rasmus Mikkelsen +// Copyright (c) 2015-2022 Rasmus Mikkelsen // Copyright (c) 2015-2021 eBay Software Foundation // https://github.com/eventflow/EventFlow // @@ -54,7 +54,7 @@ public abstract class TestSuiteForEventStore : IntegrationTest public async Task NewAggregateCanBeLoaded() { // Act - var testAggregate = await LoadAggregateAsync(ThingyId.New).ConfigureAwait(false); + var testAggregate = await LoadAggregateAsync(ThingyId.New); // Assert testAggregate.Should().NotBeNull(); @@ -66,11 +66,11 @@ public async Task EventsCanBeStored() { // Arrange var id = ThingyId.New; - var testAggregate = await LoadAggregateAsync(id).ConfigureAwait(false); + var testAggregate = await LoadAggregateAsync(id); testAggregate.Ping(PingId.New); // Act - var domainEvents = await testAggregate.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None).ConfigureAwait(false); + var domainEvents = await testAggregate.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None); // Assert domainEvents.Count.Should().Be(1); @@ -90,12 +90,12 @@ public async Task AggregatesCanBeLoaded() { // Arrange var id = ThingyId.New; - var testAggregate = await LoadAggregateAsync(id).ConfigureAwait(false); + var testAggregate = await LoadAggregateAsync(id); testAggregate.Ping(PingId.New); - await testAggregate.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None).ConfigureAwait(false); + await testAggregate.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None); // Act - var loadedTestAggregate = await LoadAggregateAsync(id).ConfigureAwait(false); + var loadedTestAggregate = await LoadAggregateAsync(id); // Assert loadedTestAggregate.Should().NotBeNull(); @@ -109,14 +109,14 @@ public async Task EventsCanContainUnicodeCharacters() { // Arrange var id = ThingyId.New; - var testAggregate = await LoadAggregateAsync(id).ConfigureAwait(false); + var testAggregate = await LoadAggregateAsync(id); var message = new ThingyMessage(ThingyMessageId.New, "😉"); testAggregate.AddMessage(message); - await testAggregate.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None).ConfigureAwait(false); + await testAggregate.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None); // Act - var loadedTestAggregate = await LoadAggregateAsync(id).ConfigureAwait(false); + var loadedTestAggregate = await LoadAggregateAsync(id); // Assert loadedTestAggregate.Messages.Single().Message.Should().Be("😉"); @@ -128,17 +128,17 @@ public async Task AggregateEventStreamsAreSeperate() // Arrange var id1 = ThingyId.New; var id2 = ThingyId.New; - var aggregate1 = await LoadAggregateAsync(id1).ConfigureAwait(false); - var aggregate2 = await LoadAggregateAsync(id2).ConfigureAwait(false); + var aggregate1 = await LoadAggregateAsync(id1); + var aggregate2 = await LoadAggregateAsync(id2); aggregate1.Ping(PingId.New); aggregate2.Ping(PingId.New); aggregate2.Ping(PingId.New); // Act - await aggregate1.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None).ConfigureAwait(false); - await aggregate2.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None).ConfigureAwait(false); - aggregate1 = await LoadAggregateAsync(id1).ConfigureAwait(false); - aggregate2 = await LoadAggregateAsync(id2).ConfigureAwait(false); + await aggregate1.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None); + await aggregate2.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None); + aggregate1 = await LoadAggregateAsync(id1); + aggregate2 = await LoadAggregateAsync(id2); // Assert aggregate1.Version.Should().Be(1); @@ -153,15 +153,19 @@ public async Task DomainEventCanBeLoaded() var id2 = ThingyId.New; var pingId1 = PingId.New; var pingId2 = PingId.New; - var aggregate1 = await LoadAggregateAsync(id1).ConfigureAwait(false); - var aggregate2 = await LoadAggregateAsync(id2).ConfigureAwait(false); + var aggregate1 = await LoadAggregateAsync(id1); + var aggregate2 = await LoadAggregateAsync(id2); aggregate1.Ping(pingId1); aggregate2.Ping(pingId2); - await aggregate1.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None).ConfigureAwait(false); - await aggregate2.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None).ConfigureAwait(false); + await aggregate1.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None); + await aggregate2.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None); // Act - var domainEvents = await EventStore.LoadAllEventsAsync(GlobalPosition.Start, 200, CancellationToken.None).ConfigureAwait(false); + var domainEvents = await EventStore.LoadAllEventsAsync( + GlobalPosition.Start, + 200, + new EventUpgradeContext(), + CancellationToken.None); // Assert domainEvents.DomainEvents.Count.Should().BeGreaterOrEqualTo(2); @@ -172,10 +176,10 @@ public async Task LoadingOfEventsCanStartLater() { // Arrange var id = ThingyId.New; - await PublishPingCommandsAsync(id, 5).ConfigureAwait(false); + await PublishPingCommandsAsync(id, 5); // Act - var domainEvents = await EventStore.LoadEventsAsync(id, 3, CancellationToken.None).ConfigureAwait(false); + var domainEvents = await EventStore.LoadEventsAsync(id, 3, CancellationToken.None); // Assert domainEvents.Should().HaveCount(3); @@ -191,38 +195,58 @@ public async Task AggregateCanHaveMultipleCommits() var id = ThingyId.New; // Act - var aggregate = await LoadAggregateAsync(id).ConfigureAwait(false); + var aggregate = await LoadAggregateAsync(id); aggregate.Ping(PingId.New); - await aggregate.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None).ConfigureAwait(false); - aggregate = await LoadAggregateAsync(id).ConfigureAwait(false); + await aggregate.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None); + aggregate = await LoadAggregateAsync(id); aggregate.Ping(PingId.New); - await aggregate.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None).ConfigureAwait(false); - aggregate = await LoadAggregateAsync(id).ConfigureAwait(false); + await aggregate.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None); + aggregate = await LoadAggregateAsync(id); // Assert aggregate.PingsReceived.Count.Should().Be(2); } + [Test] + public async Task EventsAreUpgraded() + { + // Arrange + var id = ThingyId.New; + const int version1 = 3; + const int version2 = 5; + const int version3 = 1; + + // Act + await CommandBus.PublishAsync( + new ThingyEmitUpgradableEventsCommand(id, version1, version2, version3)); + + // Assert + var aggregate = await LoadAggregateAsync(id); + aggregate.UpgradableEventV1Received.Should().Be(0); + aggregate.UpgradableEventV2Received.Should().Be(0); + aggregate.UpgradableEventV3Received.Should().Be(version1 + version2 + version3); + } + [Test] public async Task AggregateEventStreamsCanBeDeleted() { // Arrange var id1 = ThingyId.New; var id2 = ThingyId.New; - var aggregate1 = await LoadAggregateAsync(id1).ConfigureAwait(false); - var aggregate2 = await LoadAggregateAsync(id2).ConfigureAwait(false); + var aggregate1 = await LoadAggregateAsync(id1); + var aggregate2 = await LoadAggregateAsync(id2); aggregate1.Ping(PingId.New); aggregate2.Ping(PingId.New); aggregate2.Ping(PingId.New); - await aggregate1.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None).ConfigureAwait(false); - await aggregate2.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None).ConfigureAwait(false); + await aggregate1.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None); + await aggregate2.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None); // Act - await EventStore.DeleteAggregateAsync(id2, CancellationToken.None).ConfigureAwait(false); + await EventStore.DeleteAggregateAsync(id2, CancellationToken.None); // Assert - aggregate1 = await LoadAggregateAsync(id1).ConfigureAwait(false); - aggregate2 = await LoadAggregateAsync(id2).ConfigureAwait(false); + aggregate1 = await LoadAggregateAsync(id1); + aggregate2 = await LoadAggregateAsync(id2); aggregate1.Version.Should().Be(1); aggregate2.Version.Should().Be(0); } @@ -232,10 +256,10 @@ public async Task NoEventsEmittedIsOk() { // Arrange var id = ThingyId.New; - var aggregate = await LoadAggregateAsync(id).ConfigureAwait(false); + var aggregate = await LoadAggregateAsync(id); // Act - await aggregate.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None).ConfigureAwait(false); + await aggregate.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None); } [Test] @@ -243,12 +267,16 @@ public async Task NextPositionIsIdOfNextEvent() { // Arrange var id = ThingyId.New; - var aggregate = await LoadAggregateAsync(id).ConfigureAwait(false); + var aggregate = await LoadAggregateAsync(id); aggregate.Ping(PingId.New); - await aggregate.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None).ConfigureAwait(false); + await aggregate.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None); // Act - var domainEvents = await EventStore.LoadAllEventsAsync(GlobalPosition.Start, 10, CancellationToken.None).ConfigureAwait(false); + var domainEvents = await EventStore.LoadAllEventsAsync( + GlobalPosition.Start, + 10, + new EventUpgradeContext(), + CancellationToken.None); // Assert domainEvents.NextGlobalPosition.Value.Should().NotBe(string.Empty); @@ -260,14 +288,18 @@ public async Task LoadingFirstPageShouldLoadCorrectEvents() // Arrange var id = ThingyId.New; var pingIds = new[] {PingId.New, PingId.New, PingId.New}; - var aggregate = await LoadAggregateAsync(id).ConfigureAwait(false); + var aggregate = await LoadAggregateAsync(id); aggregate.Ping(pingIds[0]); aggregate.Ping(pingIds[1]); aggregate.Ping(pingIds[2]); - await aggregate.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None).ConfigureAwait(false); + await aggregate.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None); // Act - var domainEvents = await EventStore.LoadAllEventsAsync(GlobalPosition.Start, 200, CancellationToken.None).ConfigureAwait(false); + var domainEvents = await EventStore.LoadAllEventsAsync( + GlobalPosition.Start, + 200, + new EventUpgradeContext(), + CancellationToken.None); // Assert domainEvents.DomainEvents.OfType>().Should().Contain(e => e.AggregateEvent.PingId == pingIds[0]); @@ -279,15 +311,15 @@ public async Task OptimisticConcurrency() { // Arrange var id = ThingyId.New; - var aggregate1 = await LoadAggregateAsync(id).ConfigureAwait(false); - var aggregate2 = await LoadAggregateAsync(id).ConfigureAwait(false); + var aggregate1 = await LoadAggregateAsync(id); + var aggregate2 = await LoadAggregateAsync(id); aggregate1.DomainErrorAfterFirst(); aggregate2.DomainErrorAfterFirst(); // Act - await aggregate1.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None).ConfigureAwait(false); - await ThrowsExceptionAsync(() => aggregate2.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None)).ConfigureAwait(false); + await aggregate1.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None); + await ThrowsExceptionAsync(() => aggregate2.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None)); } [Test] @@ -297,21 +329,21 @@ public async Task AggregatesCanUpdatedAfterOptimisticConcurrency() var id = ThingyId.New; var pingId1 = PingId.New; var pingId2 = PingId.New; - var aggregate1 = await LoadAggregateAsync(id).ConfigureAwait(false); - var aggregate2 = await LoadAggregateAsync(id).ConfigureAwait(false); + var aggregate1 = await LoadAggregateAsync(id); + var aggregate2 = await LoadAggregateAsync(id); aggregate1.Ping(pingId1); aggregate2.Ping(pingId2); - await aggregate1.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None).ConfigureAwait(false); - await ThrowsExceptionAsync(() => aggregate2.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None)).ConfigureAwait(false); + await aggregate1.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None); + await ThrowsExceptionAsync(() => aggregate2.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None)); // Act - aggregate1 = await LoadAggregateAsync(id).ConfigureAwait(false); + aggregate1 = await LoadAggregateAsync(id); aggregate1.PingsReceived.Single().Should().Be(pingId1); aggregate1.Ping(pingId2); - await aggregate1.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None).ConfigureAwait(false); + await aggregate1.CommitAsync(EventStore, SnapshotStore, SourceId.New, CancellationToken.None); // Assert - aggregate1 = await LoadAggregateAsync(id).ConfigureAwait(false); + aggregate1 = await LoadAggregateAsync(id); aggregate1.PingsReceived.Should().BeEquivalentTo(new[] {pingId1, pingId2}); } @@ -329,18 +361,18 @@ public async Task MultipleScopes() var commandBus = serviceScope.ServiceProvider.GetRequiredService(); await commandBus.PublishAsync( new ThingyPingCommand(id, pingId1)) - .ConfigureAwait(false); + ; } using (var serviceScope = ServiceProvider.CreateScope()) { var commandBus = serviceScope.ServiceProvider.GetRequiredService(); await commandBus.PublishAsync( new ThingyPingCommand(id, pingId2)) - .ConfigureAwait(false); + ; } // Assert - var aggregate = await LoadAggregateAsync(id).ConfigureAwait(false); + var aggregate = await LoadAggregateAsync(id); aggregate.PingsReceived.Should().BeEquivalentTo(new []{pingId1, pingId2}); } @@ -354,7 +386,7 @@ public async Task PublishedDomainEventsHaveAggregateSequenceNumbers() // Act await CommandBus.PublishAsync( new ThingyMultiplePingsCommand(id, pingIds)) - .ConfigureAwait(false); + ; // Assert PublishedDomainEvents.Count.Should().Be(10); @@ -369,13 +401,13 @@ public async Task PublishedDomainEventsContinueAggregateSequenceNumbers() var pingIds = Many(10); await CommandBus.PublishAsync( new ThingyMultiplePingsCommand(id, pingIds)) - .ConfigureAwait(false); + ; _publishedDomainEvents.Clear(); // Act await CommandBus.PublishAsync( new ThingyMultiplePingsCommand(id, pingIds)) - .ConfigureAwait(false); + ; // Assert PublishedDomainEvents.Count.Should().Be(10); @@ -393,19 +425,19 @@ public virtual async Task LoadAllEventsAsyncFindsEventsAfterLargeGaps() foreach (var id in ids) { var command = new ThingyPingCommand(id, PingId.New); - await CommandBus.PublishAsync(command).ConfigureAwait(false); + await CommandBus.PublishAsync(command); } foreach (var id in ids.Skip(1).Take(5)) { await EventPersistence.DeleteEventsAsync(id, CancellationToken.None) - .ConfigureAwait(false); + ; } // Act var result = await EventStore - .LoadAllEventsAsync(GlobalPosition.Start, 5, CancellationToken.None) - .ConfigureAwait(false); + .LoadAllEventsAsync(GlobalPosition.Start, 5, new EventUpgradeContext(), CancellationToken.None) + ; // Assert result.DomainEvents.Should().HaveCount(5); @@ -437,7 +469,7 @@ private static async Task ThrowsExceptionAsync(Func action) try { - await action().ConfigureAwait(false); + await action(); } catch (Exception e) { diff --git a/Source/EventFlow.Tests/UnitTests/EventStores/ConcurrentInMemoryEventPersistanceTests.cs b/Source/EventFlow.Tests/UnitTests/EventStores/ConcurrentInMemoryEventPersistanceTests.cs index ee4b44168..a36d21fb0 100644 --- a/Source/EventFlow.Tests/UnitTests/EventStores/ConcurrentInMemoryEventPersistanceTests.cs +++ b/Source/EventFlow.Tests/UnitTests/EventStores/ConcurrentInMemoryEventPersistanceTests.cs @@ -1,6 +1,6 @@ // The MIT License (MIT) // -// Copyright (c) 2015-2021 Rasmus Mikkelsen +// Copyright (c) 2015-2022 Rasmus Mikkelsen // Copyright (c) 2015-2021 eBay Software Foundation // https://github.com/eventflow/EventFlow // @@ -39,7 +39,6 @@ using EventFlow.TestHelpers.Aggregates.Events; using EventFlow.TestHelpers.Aggregates.ValueObjects; using FluentAssertions; -using Microsoft.Extensions.Logging; using NUnit.Framework; namespace EventFlow.Tests.UnitTests.EventStores @@ -71,7 +70,11 @@ public async Task MultipleInstances() Task.WaitAll(tasks.ToArray()); // Assert - var allEvents = await store.LoadAllEventsAsync(GlobalPosition.Start, Int32.MaxValue, CancellationToken.None); + var allEvents = await store.LoadAllEventsAsync( + GlobalPosition.Start, + int.MaxValue, + new EventUpgradeContext(), + CancellationToken.None); allEvents.DomainEvents.Count.Should().Be(NumberOfEvents * DegreeOfParallelism); } @@ -83,7 +86,7 @@ private EventStoreBase CreateStore() var snapshotStore = Mock(); var factory = new DomainEventFactory(); var persistence = new InMemoryEventPersistence(Logger()); - var upgradeManager = new EventUpgradeManager(Logger(), serviceProvider); + var upgradeManager = new EventUpgradeManager(Logger(), serviceProvider, new EventUpgradeContextFactory()); var definitionService = new EventDefinitionService(Logger(), Mock()); definitionService.Load(typeof(ThingyPingEvent)); var serializer = new EventJsonSerializer(new JsonSerializer(), definitionService, factory); diff --git a/Source/EventFlow.Tests/UnitTests/ReadStores/BaseReadModelTests.cs b/Source/EventFlow.Tests/UnitTests/ReadStores/BaseReadModelTests.cs index 73de9888b..e2c543222 100644 --- a/Source/EventFlow.Tests/UnitTests/ReadStores/BaseReadModelTests.cs +++ b/Source/EventFlow.Tests/UnitTests/ReadStores/BaseReadModelTests.cs @@ -53,6 +53,8 @@ public abstract class BaseReadModelTests : TestsFor(new EventUpgradeContextFactory()); + _eventStoreMock = InjectMock(); _eventStoreData = null; _serviceProviderMock = InjectMock(); @@ -72,7 +74,7 @@ public void SetUp() .Returns(ReadModelPageSize); _eventStoreMock - .Setup(s => s.LoadAllEventsAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Setup(s => s.LoadAllEventsAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .Returns((s, p, c) => Task.FromResult(GetEvents(s, p))); _readStoreManagerMock .Setup(m => m.ReadModelType) diff --git a/Source/EventFlow/EventFlowOptions.cs b/Source/EventFlow/EventFlowOptions.cs index f256b459d..a99fa10fd 100644 --- a/Source/EventFlow/EventFlowOptions.cs +++ b/Source/EventFlow/EventFlowOptions.cs @@ -182,6 +182,7 @@ private void RegisterDefaults(IServiceCollection serviceCollection) serviceCollection.TryAddTransient(); serviceCollection.TryAddTransient(); + serviceCollection.TryAddSingleton(); serviceCollection.TryAddSingleton(); serviceCollection.TryAddTransient(); serviceCollection.TryAddTransient(); diff --git a/Source/EventFlow/EventStores/EventStoreBase.cs b/Source/EventFlow/EventStores/EventStoreBase.cs index 17e08134d..286904c53 100644 --- a/Source/EventFlow/EventStores/EventStoreBase.cs +++ b/Source/EventFlow/EventStores/EventStoreBase.cs @@ -119,6 +119,7 @@ public virtual async Task LoadAllEventsAsync( GlobalPosition globalPosition, int pageSize, + IEventUpgradeContext eventUpgradeContext, CancellationToken cancellationToken) { if (pageSize <= 0) throw new ArgumentOutOfRangeException(nameof(pageSize)); @@ -135,6 +136,7 @@ public async Task LoadAllEventsAsync( // TODO: Pass a real IAsyncEnumerable instead domainEvents = await _eventUpgradeManager.UpgradeAsync( domainEvents.ToAsyncEnumerable(), + eventUpgradeContext, cancellationToken).ToArrayAsync(cancellationToken); return new AllEventsPage(allCommittedEventsPage.NextGlobalPosition, domainEvents); diff --git a/Source/EventFlow/EventStores/EventUpgradeContextFactory.cs b/Source/EventFlow/EventStores/EventUpgradeContextFactory.cs new file mode 100644 index 000000000..8d3887676 --- /dev/null +++ b/Source/EventFlow/EventStores/EventUpgradeContextFactory.cs @@ -0,0 +1,36 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2022 Rasmus Mikkelsen +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Threading; +using System.Threading.Tasks; + +namespace EventFlow.EventStores +{ + public class EventUpgradeContextFactory : IEventUpgradeContextFactory + { + public Task CreateAsync( + CancellationToken _) + { + return Task.FromResult(new EventUpgradeContext()); + } + } +} diff --git a/Source/EventFlow/EventStores/EventUpgradeManager.cs b/Source/EventFlow/EventStores/EventUpgradeManager.cs index 95b9c6c43..dbda74887 100644 --- a/Source/EventFlow/EventStores/EventUpgradeManager.cs +++ b/Source/EventFlow/EventStores/EventUpgradeManager.cs @@ -29,6 +29,7 @@ using System.Threading.Tasks; using EventFlow.Aggregates; using EventFlow.Core; +using EventFlow.Extensions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -38,22 +39,23 @@ public class EventUpgradeManager : IEventUpgradeManager { private readonly ILogger _logger; private readonly IServiceProvider _serviceProvider; + private readonly IEventUpgradeContextFactory _eventUpgradeContextFactory; public EventUpgradeManager( ILogger logger, - IServiceProvider serviceProvider) + IServiceProvider serviceProvider, + IEventUpgradeContextFactory eventUpgradeContextFactory) { _logger = logger; _serviceProvider = serviceProvider; + _eventUpgradeContextFactory = eventUpgradeContextFactory; } - public async IAsyncEnumerable UpgradeAsync( IAsyncEnumerable domainEvents, + IEventUpgradeContext eventUpgradeContext, [EnumeratorCancellation] CancellationToken cancellationToken) { - var eventUpgradeContext = new EventUpgradeContext(); - await foreach (var domainEvent in domainEvents.WithCancellation(cancellationToken)) { var upgradeDomainEvents = new List { domainEvent }; @@ -91,7 +93,7 @@ public async IAsyncEnumerable> UpgradeAsync< where TAggregate : IAggregateRoot where TIdentity : IIdentity { - var eventUpgradeContext = new EventUpgradeContext(); + var eventUpgradeContext = await _eventUpgradeContextFactory.CreateAsync(cancellationToken); var eventUpgraders = ResolveUpgraders(typeof(TAggregate), typeof(TIdentity)); eventUpgradeContext.AddUpgraders(typeof(TAggregate), eventUpgraders); @@ -103,6 +105,14 @@ public async IAsyncEnumerable> UpgradeAsync< { var buffer = new List>(); + if (_logger.IsEnabled(LogLevel.Trace)) + { + _logger.LogTrace( + "Using upgrader {EventUpgraderType} to upgrade {DomainEventType}", + eventUpgrader.GetType().PrettyPrint(), + domainEvent.GetType().PrettyPrint()); + } + foreach (var upgradeDomainEvent in upgradeDomainEvents) { await foreach (var upgradedDomainEvent in eventUpgrader.UpgradeAsync(upgradeDomainEvent, eventUpgradeContext, cancellationToken)) @@ -111,6 +121,41 @@ public async IAsyncEnumerable> UpgradeAsync< } } + if (_logger.IsEnabled(LogLevel.Trace)) + { + if (buffer.Count == 0) + { + _logger.LogTrace( + "Event upgrader {EventUpgraderType} removed the {DomainEventType} from the history!", + eventUpgrader.GetType().PrettyPrint(), + domainEvent.EventType.PrettyPrint()); + } + else if (buffer.Count == 1 && ReferenceEquals(buffer[0], domainEvent)) + { + _logger.LogTrace( + "Event upgrader {EventUpgraderType} did not do anything to {DomainEventType}", + eventUpgrader.GetType().PrettyPrint(), + domainEvent.EventType.PrettyPrint()); + } + else if (buffer.Count == 1) + { + _logger.LogTrace( + "Event upgrader {EventUpgraderType} upgraded {DomainEventType} to {UpgradedDomainEventType}", + eventUpgrader.GetType().PrettyPrint(), + domainEvent.EventType.PrettyPrint(), + buffer[0].EventType.PrettyPrint()); + } + else + { + var prettyNames = buffer.Select(e => e.EventType.PrettyPrint()).ToArray(); + _logger.LogTrace( + "Event upgrader {EventUpgraderType} upgraded {DomainEventType} to the following events {UpgradedDomainEventTypes}", + eventUpgrader.GetType().PrettyPrint(), + domainEvent.EventType.PrettyPrint(), + prettyNames); + } + } + upgradeDomainEvents = buffer; } @@ -121,7 +166,7 @@ public async IAsyncEnumerable> UpgradeAsync< } } - private IReadOnlyCollection ResolveUpgraders(Type aggregateType, Type identityType) + protected virtual IReadOnlyCollection ResolveUpgraders(Type aggregateType, Type identityType) { var type = typeof(IEventUpgrader<,>).MakeGenericType(aggregateType, identityType); return _serviceProvider.GetServices(type) diff --git a/Source/EventFlow/EventStores/IEventStore.cs b/Source/EventFlow/EventStores/IEventStore.cs index 72042baec..ee51027b7 100644 --- a/Source/EventFlow/EventStores/IEventStore.cs +++ b/Source/EventFlow/EventStores/IEventStore.cs @@ -43,6 +43,7 @@ Task>> StoreAsync LoadAllEventsAsync( GlobalPosition globalPosition, int pageSize, + IEventUpgradeContext eventUpgradeContext, CancellationToken cancellationToken); Task>> LoadEventsAsync( diff --git a/Source/EventFlow/EventStores/IEventUpgradeContext.cs b/Source/EventFlow/EventStores/IEventUpgradeContext.cs index d11b18a45..4a335df4f 100644 --- a/Source/EventFlow/EventStores/IEventUpgradeContext.cs +++ b/Source/EventFlow/EventStores/IEventUpgradeContext.cs @@ -27,7 +27,9 @@ namespace EventFlow.EventStores { public interface IEventUpgradeContext { - bool TryGetUpgraders(Type aggregateType, out IReadOnlyCollection upgraders); + bool TryGetUpgraders( + Type aggregateType, + out IReadOnlyCollection upgraders); void AddUpgraders( Type aggregateType, diff --git a/Source/EventFlow/EventStores/IEventUpgradeContextFactory.cs b/Source/EventFlow/EventStores/IEventUpgradeContextFactory.cs new file mode 100644 index 000000000..6d16bef1a --- /dev/null +++ b/Source/EventFlow/EventStores/IEventUpgradeContextFactory.cs @@ -0,0 +1,33 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2022 Rasmus Mikkelsen +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Threading; +using System.Threading.Tasks; + +namespace EventFlow.EventStores +{ + public interface IEventUpgradeContextFactory + { + Task CreateAsync( + CancellationToken cancellationToken); + } +} diff --git a/Source/EventFlow/EventStores/IEventUpgradeManager.cs b/Source/EventFlow/EventStores/IEventUpgradeManager.cs index b81d918e9..b56760f0d 100644 --- a/Source/EventFlow/EventStores/IEventUpgradeManager.cs +++ b/Source/EventFlow/EventStores/IEventUpgradeManager.cs @@ -32,6 +32,7 @@ public interface IEventUpgradeManager { IAsyncEnumerable UpgradeAsync( IAsyncEnumerable domainEvents, + IEventUpgradeContext eventUpgradeContext, CancellationToken cancellationToken); IAsyncEnumerable> UpgradeAsync( diff --git a/Source/EventFlow/ReadStores/ReadModelPopulator.cs b/Source/EventFlow/ReadStores/ReadModelPopulator.cs index 944053f75..e462962d5 100644 --- a/Source/EventFlow/ReadStores/ReadModelPopulator.cs +++ b/Source/EventFlow/ReadStores/ReadModelPopulator.cs @@ -42,17 +42,20 @@ public class ReadModelPopulator : IReadModelPopulator private readonly IEventFlowConfiguration _configuration; private readonly IEventStore _eventStore; private readonly IServiceProvider _serviceProvider; + private readonly IEventUpgradeContextFactory _eventUpgradeContextFactory; public ReadModelPopulator( ILogger logger, IEventFlowConfiguration configuration, IEventStore eventStore, - IServiceProvider serviceProvider) + IServiceProvider serviceProvider, + IEventUpgradeContextFactory eventUpgradeContextFactory) { _logger = logger; _configuration = configuration; _eventStore = eventStore; _serviceProvider = serviceProvider; + _eventUpgradeContextFactory = eventUpgradeContextFactory; } public Task PurgeAsync( @@ -101,6 +104,7 @@ public async Task PopulateAsync( { var stopwatch = Stopwatch.StartNew(); var readStoreManagers = ResolveReadStoreManagers(readModelType); + var eventUpgradeContext = await _eventUpgradeContextFactory.CreateAsync(cancellationToken); var readModelTypes = new[] { @@ -140,6 +144,7 @@ public async Task PopulateAsync( var allEventsPage = await _eventStore.LoadAllEventsAsync( currentPosition, _configuration.PopulateReadModelEventPageSize, + eventUpgradeContext, cancellationToken) .ConfigureAwait(false); totalEvents += allEventsPage.DomainEvents.Count; From 974e82f1da1f54143d9bc02a1a964a95c0fc98dc Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Fri, 11 Nov 2022 11:18:30 +0100 Subject: [PATCH 8/9] Updated docs --- MIGRATION_GUIDE.md | 13 +++++++++++-- RELEASE_NOTES.md | 16 ++++++++++++++-- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/MIGRATION_GUIDE.md b/MIGRATION_GUIDE.md index bf8d2a5a6..54adc4b04 100644 --- a/MIGRATION_GUIDE.md +++ b/MIGRATION_GUIDE.md @@ -52,6 +52,13 @@ that wasn't possible to add before as introducing them would cause breaking chan This allows for connection strings to be fetched runtime from external sources. +- **Event upgraders are now async and works with the read model populator:** In + version 0.x, event upgraders aren't applied when events are loaded and + re-populated to read models using the `IReadModelPopulator`. This is fixed for + version 1.x, which properly will require some change to upgraders if the + re-population feature is used. + + ## Changes to supported .NET versions With the 1.x release, EventFlow limits the amount of supported .NET versions, to @@ -82,6 +89,10 @@ of EventFlow from 0.x to 1.x. - Since there is no change to the underlying storage, creating a release that only has EventFlow upgraded is highly recommended. This enables easy rollback if you encounter unexpected problems +- Since the `IEventUpgrader<,>` has changed significantly, consider using the new + base class `EventUpgraderNonAsync` in any existing upgraders. It provides an + `abstract` method with the same signature as the old interface that can be + overridden, making the switch significantly easier ## NuGet packages removed @@ -182,5 +193,3 @@ environments. - `IQueryProcessor.Process` - `IReadModelPopulator.Populate` - `IReadModelPopulator.Purge` - - diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 174089301..93f9f6714 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -8,9 +8,12 @@ as recommendations on how to do the migration. https://github.com/eventflow/EventFlow/blob/develop-v1/MIGRATION_GUIDE.md -Changes since 1.0.5001-alpha +Changes since last 1.x pre-release, `1.0.5001-alpha` -* New/breaking: `IEventUpgrader<,>` are now (finally) async +* New/breaking: `IEventUpgrader<,>` are now (finally) async. For an easy upgrade experience, + use the new base class `EventUpgraderNonAsync` for any existing upgraders. Its a `abstract` + class that implements the updated interface and provides a `abstract` method with the same + signature as the previous interface * Fix/breaking: Event upgraders are now used during read model population. As the upgraders are re-used across multiple aggregates, there is a high likelihood that some additions are needed in any existing upgraders. Upgraders are stored on the new `IEventUpgradeContext`, @@ -37,6 +40,15 @@ Complete 1.0 change log * New/breaking: SQL connection strings are now fetched from the `SqlConfiguration.GetConnectionStringAsync(...)` instead of a property, allowing more control of the connection string used at runtime +* New/breaking: `IEventUpgrader<,>` are now (finally) async. For an easy upgrade experience, + use the new base class `EventUpgraderNonAsync` for any existing upgraders. Its a `abstract` + class that implements the updated interface and provides a `abstract` method with the same + signature as the previous interface +* Fix/breaking: Event upgraders are now used during read model population. As the upgraders + are re-used across multiple aggregates, there is a high likelihood that some additions are + needed in any existing upgraders. Upgraders are stored on the new `IEventUpgradeContext`, + which is created by the new `IEventUpgradeContextFactory`. Replace this if you need addition + context during event upgrades * New: Its now possible to change the execution timeout for database migrations using the `SetUpgradeExecutionTimeout(...)` on the SQL configuration * Breaking: Removed the following dead and/or confusion MSSQL attributes. The real ones From f82e1894959324289c5a4a86e421dccaa3fb9818 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Fri, 11 Nov 2022 11:20:48 +0100 Subject: [PATCH 9/9] Fix a few tests --- .../UnitTests/ReadStores/BaseReadModelTests.cs | 2 +- .../UnitTests/ReadStores/ReadModelPopulatorTests.cs | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/Source/EventFlow.Tests/UnitTests/ReadStores/BaseReadModelTests.cs b/Source/EventFlow.Tests/UnitTests/ReadStores/BaseReadModelTests.cs index e2c543222..d4b349504 100644 --- a/Source/EventFlow.Tests/UnitTests/ReadStores/BaseReadModelTests.cs +++ b/Source/EventFlow.Tests/UnitTests/ReadStores/BaseReadModelTests.cs @@ -75,7 +75,7 @@ public void SetUp() _eventStoreMock .Setup(s => s.LoadAllEventsAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns((s, p, c) => Task.FromResult(GetEvents(s, p))); + .Returns((s, p, uc, c) => Task.FromResult(GetEvents(s, p))); _readStoreManagerMock .Setup(m => m.ReadModelType) .Returns(typeof(TReadModel)); diff --git a/Source/EventFlow.Tests/UnitTests/ReadStores/ReadModelPopulatorTests.cs b/Source/EventFlow.Tests/UnitTests/ReadStores/ReadModelPopulatorTests.cs index 161eb5498..f02919eb9 100644 --- a/Source/EventFlow.Tests/UnitTests/ReadStores/ReadModelPopulatorTests.cs +++ b/Source/EventFlow.Tests/UnitTests/ReadStores/ReadModelPopulatorTests.cs @@ -21,8 +21,12 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using EventFlow.TestHelpers; +using NUnit.Framework; + namespace EventFlow.Tests.UnitTests.ReadStores { + [Category(Categories.Unit)] public class ReadModelPopulatorTests : BaseReadModelTests { }