From 5d0539aec7b9ba823a61f38cebb68f4312176a4d Mon Sep 17 00:00:00 2001 From: Ihar Bury Date: Tue, 12 Sep 2017 12:50:30 +0200 Subject: [PATCH 1/6] An optional subscription cancellation token is added to the subscription information class. This will allow exception handlers to be cancelled when the event store implementation sets the cancellation token. --- .../CreateSubscription.cs | 7 ------- .../SubscriptionInfo.cs | 19 +++++++++++++++++++ 2 files changed, 19 insertions(+), 7 deletions(-) create mode 100644 Src/LiquidProjections.Abstractions/SubscriptionInfo.cs diff --git a/Src/LiquidProjections.Abstractions/CreateSubscription.cs b/Src/LiquidProjections.Abstractions/CreateSubscription.cs index a81ad31..c820b5a 100644 --- a/Src/LiquidProjections.Abstractions/CreateSubscription.cs +++ b/Src/LiquidProjections.Abstractions/CreateSubscription.cs @@ -36,11 +36,4 @@ public class Subscriber /// public Func NoSuchCheckpoint { get; set; } } - - public class SubscriptionInfo - { - public string Id { get; set; } - - public IDisposable Subscription { get; set; } - } } \ No newline at end of file diff --git a/Src/LiquidProjections.Abstractions/SubscriptionInfo.cs b/Src/LiquidProjections.Abstractions/SubscriptionInfo.cs new file mode 100644 index 0000000..c46bbc8 --- /dev/null +++ b/Src/LiquidProjections.Abstractions/SubscriptionInfo.cs @@ -0,0 +1,19 @@ +using System; +using System.Threading; + +namespace LiquidProjections.Abstractions +{ + public class SubscriptionInfo + { + public string Id { get; set; } + + public IDisposable Subscription { get; set; } + + /// + /// The cancellation is requested when the subscription is being cancelled. + /// The cancellation token is disposed and cannot be used after the subscription cancellation is completed. + /// Old versions of event stores do not have the cancellation token. + /// + public CancellationToken? CancellationToken { get; set; } + } +} \ No newline at end of file From 997ce03b7dcb3754d58f6df1f7556de1280b231d Mon Sep 17 00:00:00 2001 From: Ihar Bury Date: Wed, 13 Sep 2017 14:20:45 +0200 Subject: [PATCH 2/6] The memory event source supports projecting transactions asynchronously --- .gitignore | 5 +- .../MemoryEventSource.cs | 422 +++++++++++++++--- .../TaskExtensions.cs | 48 ++ Src/LiquidProjections/Dispatcher.cs | 32 +- .../DispatcherSpecs.cs | 9 +- .../LiquidProjections.Specs/ProjectorSpecs.cs | 34 +- 6 files changed, 456 insertions(+), 94 deletions(-) create mode 100644 Src/LiquidProjections.Testing/TaskExtensions.cs diff --git a/.gitignore b/.gitignore index f3d6500..8226c8f 100644 --- a/.gitignore +++ b/.gitignore @@ -178,4 +178,7 @@ Artifacts/** # Cake related Build/** !Build/packages.config -Tools/** \ No newline at end of file +Tools/** + +# Visual Studio +.vs/ diff --git a/Src/LiquidProjections.Testing/MemoryEventSource.cs b/Src/LiquidProjections.Testing/MemoryEventSource.cs index 288aa6f..bbcf265 100644 --- a/Src/LiquidProjections.Testing/MemoryEventSource.cs +++ b/Src/LiquidProjections.Testing/MemoryEventSource.cs @@ -3,6 +3,7 @@ using System.Collections.ObjectModel; using System.Globalization; using System.Linq; +using System.Threading; using System.Threading.Tasks; using LiquidProjections.Abstractions; @@ -11,10 +12,11 @@ namespace LiquidProjections.Testing public class MemoryEventSource { private readonly int batchSize; - private long lastCheckpoint; - - private readonly List subscribers = new List(); + private readonly List subscriptions = new List(); private readonly List history = new List(); + private long lastHistoryCheckpoint; + private TaskCompletionSource historyGrowthTaskCompletionSource = new TaskCompletionSource(); + private readonly object syncRoot = new object(); public MemoryEventSource(int batchSize = 10) { @@ -23,35 +25,70 @@ public MemoryEventSource(int batchSize = 10) public IDisposable Subscribe(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId) { - lastCheckpoint = lastProcessedCheckpoint ?? 0; - var subscription = new Subscription(lastCheckpoint, batchSize, subscriber, subscriptionId); + return SubscribeAsync(lastProcessedCheckpoint, subscriber, subscriptionId) + .ConfigureAwait(false) + .GetAwaiter() + .GetResult(); + } - subscribers.Add(subscription); + public async Task SubscribeAsync(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId) + { + Subscription subscription = SubscribeWithoutWaitingInternal(lastProcessedCheckpoint, subscriber, subscriptionId); - async Task AsyncAction() + try { - if (history.LastOrDefault()?.Checkpoint < lastProcessedCheckpoint) - { - await subscriber.NoSuchCheckpoint(new SubscriptionInfo - { - Id = subscriptionId, - Subscription = subscription - }); - } + await subscription.WaitForCheckingWhetherItIsAhead().ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // Do nothing. + } - foreach (Transaction transaction in history) - { - await subscription.Send(new[] {transaction}).ConfigureAwait(false); - } + long checkpointAtStart; + + lock (syncRoot) + { + checkpointAtStart = lastHistoryCheckpoint; } - AsyncAction().ConfigureAwait(false).GetAwaiter().GetResult(); + try + { + await subscription.WaitUntilCheckpoint(checkpointAtStart).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // Do nothing. + } return subscription; } + public IDisposable SubscribeWithoutWaiting(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId) + { + return SubscribeWithoutWaitingInternal(lastProcessedCheckpoint, subscriber, subscriptionId); + } + + private Subscription SubscribeWithoutWaitingInternal(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId) + { + var subscription = new Subscription(lastProcessedCheckpoint ?? 0, batchSize, subscriber, subscriptionId, this); + + lock (syncRoot) + { + subscriptions.Add(subscription); + } + + subscription.Start(); + return subscription; + } public async Task Write(params object[] events) + { + Transaction transaction = WriteWithoutWaiting(events); + await WaitForAllSubscriptions().ConfigureAwait(false); + return transaction; + } + + public Transaction WriteWithoutWaiting(params object[] events) { Transaction transaction = new Transaction { @@ -61,39 +98,60 @@ public async Task Write(params object[] events) }).ToArray() }; - await Write(transaction).ConfigureAwait(false); + WriteWithoutWaiting(transaction); return transaction; } + + public Task Write(params Transaction[] transactions) + { + WriteWithoutWaiting(transactions); + return WaitForAllSubscriptions(); + } - public async Task Write(params Transaction[] transactions) + public void WriteWithoutWaiting(params Transaction[] transactions) { - foreach (var transaction in transactions) + if (transactions.Any()) { - if (transaction.Checkpoint == -1) - { - transaction.Checkpoint = (++lastCheckpoint); - } - else + lock (syncRoot) { - lastCheckpoint = transaction.Checkpoint; - } + foreach (Transaction transaction in transactions) + { + if (transaction.Checkpoint == -1) + { + lastHistoryCheckpoint++; + transaction.Checkpoint = lastHistoryCheckpoint; + } + else + { + lastHistoryCheckpoint = transaction.Checkpoint; + } - if (string.IsNullOrEmpty(transaction.Id)) - { - transaction.Id = transaction.Checkpoint.ToString(CultureInfo.InvariantCulture); - } + if (string.IsNullOrEmpty(transaction.Id)) + { + transaction.Id = transaction.Checkpoint.ToString(CultureInfo.InvariantCulture); + } - history.Add(transaction); - } + history.Add(transaction); + } - foreach (var subscriber in subscribers) - { - await subscriber.Send(transactions).ConfigureAwait(false); + TaskCompletionSource oldHistoryGrowthTaskCompletionSource = historyGrowthTaskCompletionSource; + historyGrowthTaskCompletionSource = new TaskCompletionSource(); + + // Execute continuations asynchronously. + Task.Run(() => oldHistoryGrowthTaskCompletionSource.SetResult(false)); + } } } public async Task WriteWithHeaders(object anEvent, IDictionary headers) + { + Transaction transaction = WriteWithHeadersWithoutWaiting(anEvent, headers); + await WaitForAllSubscriptions().ConfigureAwait(false); + return transaction; + } + + public Transaction WriteWithHeadersWithoutWaiting(object anEvent, IDictionary headers) { Transaction transaction = new Transaction { @@ -107,61 +165,281 @@ public async Task WriteWithHeaders(object anEvent, IDictionary subscriptionsAtStart; + long checkpointAtStart; + + lock (syncRoot) + { + subscriptionsAtStart = subscriptions.ToList(); + checkpointAtStart = lastHistoryCheckpoint; + } + + foreach (Subscription subscription in subscriptionsAtStart) + { + try + { + await subscription.WaitUntilCheckpoint(checkpointAtStart).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // Do nothing. + } + } + } public bool HasSubscriptionForId(string subscriptionId) { - Subscription subscription = subscribers.SingleOrDefault(s => s.Id == subscriptionId); - return (subscription != null) && !subscription.IsDisposed; + lock (syncRoot) + { + Subscription subscription = subscriptions.SingleOrDefault(aSubscription => aSubscription.Id == subscriptionId); + return (subscription != null) && !subscription.IsDisposed; + } } - } - - internal class Subscription : IDisposable - { - private readonly long lastProcessedCheckpoint; - private readonly int batchSize; - private readonly Subscriber subscriber; - private readonly string subscriptionId; - private bool disposed = false; - public Subscription(long lastProcessedCheckpoint, int batchSize, - Subscriber subscriber, string subscriptionId) + private bool IsFutureCheckpoint(long checkpoint) { - this.lastProcessedCheckpoint = lastProcessedCheckpoint; - this.batchSize = batchSize; - this.subscriber = subscriber; - this.subscriptionId = subscriptionId; + lock (syncRoot) + { + return checkpoint > lastHistoryCheckpoint; + } } - public async Task Send(IEnumerable transactions) + private int GetNextTransactionIndex(long checkpoint) { - if (!disposed) + lock (syncRoot) { - var subscriptionInfo = new SubscriptionInfo + int index = 0; + + while (index < history.Count) { - Id = subscriptionId, - Subscription = this - }; + if (history[index].Checkpoint > checkpoint) + { + break; + } - Transaction[] requestedTransactions = transactions.Where(t => t.Checkpoint > lastProcessedCheckpoint).ToArray(); - foreach (var batch in requestedTransactions.InBatchesOf(batchSize)) - { - await subscriber.HandleTransactions(new ReadOnlyCollection(batch.ToList()), subscriptionInfo) - .ConfigureAwait(false); + index++; } + + return index; } } - public void Dispose() + private Task WaitForNewTransactions() { - disposed = true; + lock (syncRoot) + { + return historyGrowthTaskCompletionSource.Task; + } } - public bool IsDisposed => disposed; + private Transaction[] GetTransactionsFromIndex(int startIndex) + { + lock (syncRoot) + { + int count = history.Count - startIndex; + var result = new Transaction[count]; + history.CopyTo(startIndex, result, 0, count); + return result; + } + } + + private class Subscription : IDisposable + { + private long lastProcessedCheckpoint; + private readonly int batchSize; + private readonly Subscriber subscriber; + private readonly MemoryEventSource memoryEventSource; + private bool isDisposed; + private CancellationTokenSource cancellationTokenSource; + private readonly object syncRoot = new object(); + private Task task; + private TaskCompletionSource progressCompletionSource = new TaskCompletionSource(); + private readonly TaskCompletionSource waitForCheckingWhetherItIsAheadCompletionSource = new TaskCompletionSource(); + + public Subscription(long lastProcessedCheckpoint, int batchSize, + Subscriber subscriber, string subscriptionId, MemoryEventSource memoryEventSource) + { + this.lastProcessedCheckpoint = lastProcessedCheckpoint; + this.batchSize = batchSize; + this.subscriber = subscriber; + Id = subscriptionId; + this.memoryEventSource = memoryEventSource; + } + + public void Start() + { + if (task != null) + { + throw new InvalidOperationException("Already started."); + } + + lock (syncRoot) + { + if (isDisposed) + { + throw new ObjectDisposedException(nameof(Subscription)); + } + + cancellationTokenSource = new CancellationTokenSource(); + + SubscriptionInfo info = new SubscriptionInfo + { + Id = Id, + Subscription = this + }; + + task = Task.Factory.StartNew( + async () => + { + try + { + await RunAsync(info).ConfigureAwait(false); + } + catch (Exception) + { + Dispose(); + } + }, + cancellationTokenSource.Token, + TaskCreationOptions.DenyChildAttach | TaskCreationOptions.LongRunning, + TaskScheduler.Default) + .Unwrap(); + } + } + + private async Task RunAsync(SubscriptionInfo info) + { + long oldLastProcessedCheckpoint; + + lock (syncRoot) + { + oldLastProcessedCheckpoint = lastProcessedCheckpoint; + } + + if (memoryEventSource.IsFutureCheckpoint(oldLastProcessedCheckpoint)) + { + await subscriber.NoSuchCheckpoint(info).ConfigureAwait(false); + } + +#pragma warning disable 4014 + // Run continuations asynchronously. + Task.Run(() => waitForCheckingWhetherItIsAheadCompletionSource.TrySetResult(false)); +#pragma warning restore 4014 + + int nextTransactionIndex = memoryEventSource.GetNextTransactionIndex(oldLastProcessedCheckpoint); + + while (!cancellationTokenSource.IsCancellationRequested) + { + Task waitForNewTransactions = memoryEventSource.WaitForNewTransactions(); + Transaction[] transactions = memoryEventSource.GetTransactionsFromIndex(nextTransactionIndex); + + Transaction[] requestedTransactions = transactions + .Where(transaction => transaction.Checkpoint > oldLastProcessedCheckpoint) + .ToArray(); + + foreach (IList batch in requestedTransactions.InBatchesOf(batchSize)) + { + await subscriber.HandleTransactions(new ReadOnlyCollection(batch.ToList()), info) + .ConfigureAwait(false); + } - public string Id => subscriptionId; + if (requestedTransactions.Any()) + { + lock (syncRoot) + { + lastProcessedCheckpoint = requestedTransactions[requestedTransactions.Length - 1].Checkpoint; + + if (!isDisposed) + { + TaskCompletionSource oldProgressCompletionSource = progressCompletionSource; + progressCompletionSource = new TaskCompletionSource(); + +#pragma warning disable 4014 + // Run continuations asynchronously. + Task.Run(() => oldProgressCompletionSource.SetResult(lastProcessedCheckpoint)); +#pragma warning restore 4014 + } + } + } + + nextTransactionIndex += transactions.Length; + + await waitForNewTransactions + .WithWaitCancellation(cancellationTokenSource.Token) + .ConfigureAwait(false); + } + } + + public void Dispose() + { + lock (syncRoot) + { + if (!isDisposed) + { + isDisposed = true; + + // Run continuations and wait for the subscription task asynchronously. + Task.Run(() => + { + progressCompletionSource.SetCanceled(); + waitForCheckingWhetherItIsAheadCompletionSource.TrySetCanceled(); + + if (cancellationTokenSource != null) + { + if (!cancellationTokenSource.IsCancellationRequested) + { + cancellationTokenSource.Cancel(); + } + + task?.Wait(); + cancellationTokenSource.Dispose(); + } + }); + } + } + } + + public bool IsDisposed + { + get + { + lock (syncRoot) + { + return isDisposed; + } + } + } + + public string Id { get; } + + public async Task WaitUntilCheckpoint(long checkpoint) + { + while (true) + { + Task progressTask; + + lock (syncRoot) + { + progressTask = progressCompletionSource.Task; + + if (lastProcessedCheckpoint >= checkpoint) + { + return; + } + } + + await progressTask.ConfigureAwait(false); + } + } + + public Task WaitForCheckingWhetherItIsAhead() => waitForCheckingWhetherItIsAheadCompletionSource.Task; + } } } diff --git a/Src/LiquidProjections.Testing/TaskExtensions.cs b/Src/LiquidProjections.Testing/TaskExtensions.cs new file mode 100644 index 0000000..bcbd1b3 --- /dev/null +++ b/Src/LiquidProjections.Testing/TaskExtensions.cs @@ -0,0 +1,48 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace LiquidProjections.Testing +{ + internal static class TaskExtensions + { + public static Task WithWaitCancellation(this Task task, CancellationToken cancellationToken) + { + var taskCompletionSource = new TaskCompletionSource(); + CancellationTokenRegistration registration = cancellationToken.Register(CancelTask, taskCompletionSource); + + task.ContinueWith(ContinueTask, Tuple.Create(taskCompletionSource, registration), CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); + + return taskCompletionSource.Task; + } + + private static void CancelTask(object state) + { + var taskCompletionSource = (TaskCompletionSource)state; + taskCompletionSource.TrySetCanceled(); + } + + private static void ContinueTask(Task task, object state) + { + var tcsAndRegistration = (Tuple, CancellationTokenRegistration>)state; + + if (task.IsFaulted && (task.Exception != null)) + { + tcsAndRegistration.Item1.TrySetException(task.Exception.InnerException); + } + + if (task.IsCanceled) + { + tcsAndRegistration.Item1.TrySetCanceled(); + } + + if (task.IsCompleted) + { + tcsAndRegistration.Item1.TrySetResult(false); + } + + tcsAndRegistration.Item2.Dispose(); + } + } +} \ No newline at end of file diff --git a/Src/LiquidProjections/Dispatcher.cs b/Src/LiquidProjections/Dispatcher.cs index 90d794f..74727de 100644 --- a/Src/LiquidProjections/Dispatcher.cs +++ b/Src/LiquidProjections/Dispatcher.cs @@ -61,19 +61,27 @@ private async Task HandleUnknownCheckpoint(SubscriptionInfo info, Func + { + await options.BeforeRestarting(); + + Subscribe(null, handler, options); + }, + abort: exception => + { + LogProvider.GetLogger(typeof(Dispatcher)).FatalException( + "Failed to restart the projector.", + exception); + }, + ignore: () => Subscribe(null, handler, options)); + + // Dispose the subscription only after a new subscription is created + // to support CreateSubscription delegates which wait + // until the subscription detects being ahead + // and until all the existing transactions are processed by the new subscription info.Subscription?.Dispose(); - - await ExecuteWithPolicy(info, async () => - { - await options.BeforeRestarting(); - - Subscribe(null, handler, options); - }, abort: exception => - { - LogProvider.GetLogger(typeof(Dispatcher)).FatalException( - "Failed to restart the projector.", - exception); - }, ignore: () => Subscribe(null, handler, options)); } } diff --git a/Tests/LiquidProjections.Specs/DispatcherSpecs.cs b/Tests/LiquidProjections.Specs/DispatcherSpecs.cs index 90994d2..4be2c71 100644 --- a/Tests/LiquidProjections.Specs/DispatcherSpecs.cs +++ b/Tests/LiquidProjections.Specs/DispatcherSpecs.cs @@ -210,7 +210,10 @@ await The().Write( Subject.Subscribe(1000, (transactions, info) => { - trace.Add("TransactionsReceived"); + foreach (var transaction in transactions) + { + trace.Add("TransactionReceived"); + } foreach (var transaction in transactions) { @@ -233,7 +236,7 @@ public async Task It_should_allow_the_subscriber_to_cleanup_before_restarting() { await allTransactionsReceived.Task.TimeoutAfter(30.Seconds()); - trace.Should().Equal("BeforeRestarting", "TransactionsReceived", "TransactionsReceived"); + trace.Should().Equal("BeforeRestarting", "TransactionReceived", "TransactionReceived"); } [Fact] @@ -241,7 +244,7 @@ public async Task It_should_restart_sending_transactions_from_the_beginning() { var transactions = await allTransactionsReceived.Task.TimeoutAfter(30.Seconds()); - transactions.First().Checkpoint.Should().Be(999); + transactions.First().Checkpoint.Should().Be(1); } } public class When_the_autorestart_cleanup_action_throws_but_a_retry_is_requested : GivenSubject diff --git a/Tests/LiquidProjections.Specs/ProjectorSpecs.cs b/Tests/LiquidProjections.Specs/ProjectorSpecs.cs index d283a29..133b01e 100644 --- a/Tests/LiquidProjections.Specs/ProjectorSpecs.cs +++ b/Tests/LiquidProjections.Specs/ProjectorSpecs.cs @@ -15,6 +15,8 @@ public class Given_a_projector_with_an_in_memory_event_source : GivenSubject Events; + protected List ProjectionExceptions { get; } = new List(); + public Given_a_projector_with_an_in_memory_event_source() { Given(() => @@ -29,9 +31,24 @@ public Given_a_projector_with_an_in_memory_event_source() protected void StartProjecting() { - The().Subscribe(110, new Subscriber + The().Subscribe(null, new Subscriber { - HandleTransactions = async (transactions, info) => await Subject.Handle(transactions) + HandleTransactions = async (transactions, info) => + { + try + { + await Subject.Handle(transactions); + } + catch (OperationCanceledException) + { + // Do nothing. + } + catch (Exception exception) + { + ProjectionExceptions.Add(exception); + info.Subscription.Dispose(); + } + } }, ""); } } @@ -235,26 +252,31 @@ public When_event_handling_fails() StartProjecting(); }); - When(() => The().Write(The()), deferredExecution: true); + When(() => The().Write(The())); } [Fact] public void Then_it_should_wrap_the_exception_into_a_projection_exception() { - WhenAction.ShouldThrow() + ProjectionExceptions.Should().ContainSingle() + .Which.Should().BeOfType() .Which.InnerException.Should().BeSameAs(The()); } [Fact] public void Then_it_should_include_the_current_event_into_the_projection_exception() { - WhenAction.ShouldThrow().Which.CurrentEvent.Should().Be(The()); + ProjectionExceptions.Should().ContainSingle() + .Which.Should().BeOfType() + .Which.CurrentEvent.Should().Be(The()); } [Fact] public void Then_it_should_include_the_current_transaction_batch_into_the_projection_exception() { - WhenAction.ShouldThrow().Which.TransactionBatch.Should().BeEquivalentTo(The()); + ProjectionExceptions.Should().ContainSingle() + .Which.Should().BeOfType() + .Which.TransactionBatch.Should().BeEquivalentTo(The()); } } From 7c8d678d9402cb271eb8d71eabd2b00ac7e7477c Mon Sep 17 00:00:00 2001 From: Ihar Bury Date: Wed, 13 Sep 2017 14:32:25 +0200 Subject: [PATCH 3/6] The memory event source supports the cancellation token in the subscription info --- .../MemoryEventSource.cs | 3 +- .../DispatcherSpecs.cs | 63 +++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/Src/LiquidProjections.Testing/MemoryEventSource.cs b/Src/LiquidProjections.Testing/MemoryEventSource.cs index bbcf265..31184de 100644 --- a/Src/LiquidProjections.Testing/MemoryEventSource.cs +++ b/Src/LiquidProjections.Testing/MemoryEventSource.cs @@ -292,7 +292,8 @@ public void Start() SubscriptionInfo info = new SubscriptionInfo { Id = Id, - Subscription = this + Subscription = this, + CancellationToken = cancellationTokenSource.Token }; task = Task.Factory.StartNew( diff --git a/Tests/LiquidProjections.Specs/DispatcherSpecs.cs b/Tests/LiquidProjections.Specs/DispatcherSpecs.cs index 4be2c71..d525bca 100644 --- a/Tests/LiquidProjections.Specs/DispatcherSpecs.cs +++ b/Tests/LiquidProjections.Specs/DispatcherSpecs.cs @@ -2,6 +2,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Chill; @@ -119,6 +120,68 @@ public void It_should_not_log_anything() The().Exception.Should().BeNull(); } } + public class When_a_projector_throws_an_exception_and_the_exception_handler_has_a_delay_and_the_subscription_is_disposed : + GivenSubject + { + private readonly ManualResetEventSlim delayStarted = new ManualResetEventSlim(); + private readonly ManualResetEventSlim delayFinished = new ManualResetEventSlim(); + private IDisposable subscription; + + public When_a_projector_throws_an_exception_and_the_exception_handler_has_a_delay_and_the_subscription_is_disposed() + { + Given(() => + { + UseThe(new MemoryEventSource()); + WithSubject(_ => new Dispatcher(The().Subscribe)); + + LogProvider.SetCurrentLogProvider(UseThe(new FakeLogProvider())); + + UseThe(new ProjectionException("Some message.")); + + Subject.ExceptionHandler = async (exc, attempts, subscription) => + { + delayStarted.Set(); + + try + { + await Task.Delay(TimeSpan.FromDays(1), subscription.CancellationToken.Value); + } + finally + { + delayFinished.Set(); + } + + return ExceptionResolution.Retry; + }; + + subscription = Subject.Subscribe(null, (transaction, info) => + { + throw The(); + }); + + The().WriteWithoutWaiting(new List()); + }); + + When(() => + { + if (!delayStarted.Wait(TimeSpan.FromSeconds(10))) + { + throw new InvalidOperationException("The delay has not started in 10 seconds."); + } + + subscription.Dispose(); + }); + } + + [Fact] + public void It_should_stop_waiting() + { + if (!delayFinished.Wait(TimeSpan.FromSeconds(10))) + { + throw new InvalidOperationException("The delay has not been cancelled in 10 seconds."); + } + } + } public class When_a_projector_throws_an_exception_that_can_be_ignored : GivenSubject { private int attempts; From 3cb7d7f47bafe60515ed4adea9e7d8e874ff1cb7 Mon Sep 17 00:00:00 2001 From: Ihar Bury Date: Wed, 13 Sep 2017 14:46:34 +0200 Subject: [PATCH 4/6] Success handler added to the dispatcher --- Src/LiquidProjections/Dispatcher.cs | 31 +++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/Src/LiquidProjections/Dispatcher.cs b/Src/LiquidProjections/Dispatcher.cs index 74727de..3fb3c08 100644 --- a/Src/LiquidProjections/Dispatcher.cs +++ b/Src/LiquidProjections/Dispatcher.cs @@ -45,16 +45,25 @@ public IDisposable Subscribe(long? lastProcessedCheckpoint, /// public HandleException ExceptionHandler { get; set; } = (e, attempts, info) => Task.FromResult(ExceptionResolution.Abort); + public HandleSuccess SuccessHandler { get; set; } = _ => Task.FromResult(false); + private async Task HandleTransactions(IReadOnlyList transactions, Func, SubscriptionInfo, Task> handler, SubscriptionInfo info) { - await ExecuteWithPolicy(info, () => handler(transactions, info), abort: exception => - { - LogProvider.GetLogger(typeof(Dispatcher)).FatalException( - "Projector exception was not handled. Event subscription has been cancelled.", - exception); + await ExecuteWithPolicy( + info, + async () => + { + await handler(transactions, info); + await SuccessHandler(info); + }, + abort: exception => + { + LogProvider.GetLogger(typeof(Dispatcher)).FatalException( + "Projector exception was not handled. Event subscription has been cancelled.", + exception); - info.Subscription?.Dispose(); - }); + info.Subscription?.Dispose(); + }); } private async Task HandleUnknownCheckpoint(SubscriptionInfo info, Func, SubscriptionInfo, Task> handler, SubscriptionOptions options) @@ -141,7 +150,13 @@ public enum ExceptionResolution Abort, Retry } - + + /// + /// Defines the signature for a method that handles a successful transaction dispatching iteration. + /// + /// Information about the subscription. + public delegate Task HandleSuccess(SubscriptionInfo info); + public class SubscriptionOptions { /// From 2f5d228be7a70eeab9f3ac1eada0132ff337c829 Mon Sep 17 00:00:00 2001 From: Ihar Bury Date: Wed, 13 Sep 2017 14:48:54 +0200 Subject: [PATCH 5/6] Dispatcher never uses the current synchronization context to execute task continuations --- Src/LiquidProjections/Dispatcher.cs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/Src/LiquidProjections/Dispatcher.cs b/Src/LiquidProjections/Dispatcher.cs index 3fb3c08..903b291 100644 --- a/Src/LiquidProjections/Dispatcher.cs +++ b/Src/LiquidProjections/Dispatcher.cs @@ -34,8 +34,8 @@ public IDisposable Subscribe(long? lastProcessedCheckpoint, return createSubscription(lastProcessedCheckpoint, new Subscriber { - HandleTransactions = async (transactions, info) => await HandleTransactions(transactions, handler, info), - NoSuchCheckpoint = async info => await HandleUnknownCheckpoint(info, handler, options) + HandleTransactions = async (transactions, info) => await HandleTransactions(transactions, handler, info).ConfigureAwait(false), + NoSuchCheckpoint = async info => await HandleUnknownCheckpoint(info, handler, options).ConfigureAwait(false) }, options.Id); } @@ -53,8 +53,8 @@ await ExecuteWithPolicy( info, async () => { - await handler(transactions, info); - await SuccessHandler(info); + await handler(transactions, info).ConfigureAwait(false); + await SuccessHandler(info).ConfigureAwait(false); }, abort: exception => { @@ -63,7 +63,8 @@ await ExecuteWithPolicy( exception); info.Subscription?.Dispose(); - }); + }) + .ConfigureAwait(false); } private async Task HandleUnknownCheckpoint(SubscriptionInfo info, Func, SubscriptionInfo, Task> handler, SubscriptionOptions options) @@ -74,7 +75,7 @@ await ExecuteWithPolicy( info, async () => { - await options.BeforeRestarting(); + await options.BeforeRestarting().ConfigureAwait(false); Subscribe(null, handler, options); }, @@ -84,7 +85,8 @@ await ExecuteWithPolicy( "Failed to restart the projector.", exception); }, - ignore: () => Subscribe(null, handler, options)); + ignore: () => Subscribe(null, handler, options)) + .ConfigureAwait(false); // Dispose the subscription only after a new subscription is created // to support CreateSubscription delegates which wait @@ -103,12 +105,12 @@ private async Task ExecuteWithPolicy(SubscriptionInfo info, Func action, A try { attempts++; - await action(); + await action().ConfigureAwait(false); retry = false; } catch (Exception exception) { - ExceptionResolution resolution = await ExceptionHandler(exception, attempts, info); + ExceptionResolution resolution = await ExceptionHandler(exception, attempts, info).ConfigureAwait(false); switch (resolution) { case ExceptionResolution.Abort: From 664d5760696a22149ee6488d01c402394a026a89 Mon Sep 17 00:00:00 2001 From: Ihar Bury Date: Wed, 13 Sep 2017 16:40:57 +0200 Subject: [PATCH 6/6] XML comments added for the memory event source --- .../MemoryEventSource.cs | 121 ++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/Src/LiquidProjections.Testing/MemoryEventSource.cs b/Src/LiquidProjections.Testing/MemoryEventSource.cs index 31184de..e8c8f59 100644 --- a/Src/LiquidProjections.Testing/MemoryEventSource.cs +++ b/Src/LiquidProjections.Testing/MemoryEventSource.cs @@ -9,6 +9,9 @@ namespace LiquidProjections.Testing { + /// + /// An event source which stores all the transactions in memory and has methods which are convenient for testing. + /// public class MemoryEventSource { private readonly int batchSize; @@ -18,11 +21,36 @@ public class MemoryEventSource private TaskCompletionSource historyGrowthTaskCompletionSource = new TaskCompletionSource(); private readonly object syncRoot = new object(); + /// + /// Creates a new instance of the event source. + /// + /// + /// The maximum number of transactions in batches which are handled by the subscribers. The default is 10. + /// public MemoryEventSource(int batchSize = 10) { this.batchSize = batchSize; } + /// + /// Creates a new subscription which will handle all the transactions after the given checkpoint. + /// Waits (synchronously) for the subscription to process all the transactions that are already in the event source. + /// If the given is ahead of the event source + /// and that is ignored by the handler, + /// waits (synchronously) for a replacement subscription to be created + /// and to process all the transactions that are already in the event source. + /// The replacement subscription is not returned. + /// The code that creates the replacement subscription is responsible for cancelling it. + /// + /// + /// If has value, only the transactions with checkpoints greater than the given value will be processed. + /// + /// The which will handle the transactions. + /// An arbitrary string identifying the subscription. + /// + /// An object implementing the interface. + /// Disposing the object will cancel the subscription asynchronously. + /// public IDisposable Subscribe(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId) { return SubscribeAsync(lastProcessedCheckpoint, subscriber, subscriptionId) @@ -31,6 +59,25 @@ public IDisposable Subscribe(long? lastProcessedCheckpoint, Subscriber subscribe .GetResult(); } + /// + /// Creates a new subscription which will handle all the transactions after the given checkpoint. + /// Waits asynchronously for the subscription to process all the transactions that are already in the event source. + /// If the given is ahead of the event source + /// and that is ignored by the handler, + /// waits asynchronously for a replacement subscription to be created + /// and to process all the transactions that are already in the event source. + /// The replacement subscription is not returned. + /// The code that creates the replacement subscription is responsible for cancelling it. + /// + /// + /// If has value, only the transactions with checkpoints greater than the given value will be processed. + /// + /// The which will handle the transactions. + /// An arbitrary string identifying the subscription. + /// + /// A task that returns an object implementing the interface. + /// Disposing the object will cancel the subscription asynchronously. + /// public async Task SubscribeAsync(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId) { Subscription subscription = SubscribeWithoutWaitingInternal(lastProcessedCheckpoint, subscriber, subscriptionId); @@ -63,6 +110,22 @@ public async Task SubscribeAsync(long? lastProcessedCheckpoint, Sub return subscription; } + /// + /// Creates a new subscription which will handle all the transactions after the given checkpoint. + /// Does not wait for the subscription to process any transactions. + /// If the given is ahead of the event source + /// and that is ignored by the handler, + /// does not wait for a replacement subscription to be created. + /// + /// + /// If has value, only the transactions with checkpoints greater than the given value will be processed. + /// + /// The which will handle the transactions. + /// An arbitrary string identifying the subscription. + /// + /// An object implementing the interface. + /// Disposing the object will cancel the subscription asynchronously. + /// public IDisposable SubscribeWithoutWaiting(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId) { return SubscribeWithoutWaitingInternal(lastProcessedCheckpoint, subscriber, subscriptionId); @@ -81,6 +144,13 @@ private Subscription SubscribeWithoutWaitingInternal(long? lastProcessedCheckpoi return subscription; } + /// + /// Adds a new transaction containing the given events to the end of the event source. + /// An incremental checkpoint number is automatically generated for the transaction. + /// Waits for all the subscriptions to process the transaction. + /// + /// The events to be included into the transaction. + /// A task returning the created transaction. public async Task Write(params object[] events) { Transaction transaction = WriteWithoutWaiting(events); @@ -88,6 +158,13 @@ public async Task Write(params object[] events) return transaction; } + /// + /// Adds a new transaction containing the given events to the end of the event source. + /// An incremental checkpoint number is automatically generated for the transaction. + /// Does not wait for the transaction to be processed by any subscriptions. + /// + /// The events to be included into the transaction. + /// A task returning the created transaction. public Transaction WriteWithoutWaiting(params object[] events) { Transaction transaction = new Transaction @@ -103,12 +180,27 @@ public Transaction WriteWithoutWaiting(params object[] events) return transaction; } + /// + /// Adds the given transactions to the end of the event source. + /// If a transaction has -1 instead of a valid checkpoint number, + /// an incremental checkpoint number is automatically generated for the transaction. + /// Waits for all the subscriptions to process the transactions. + /// + /// The transactions to be added. + /// A task that completes after all the subscriptions have processed the transactions. public Task Write(params Transaction[] transactions) { WriteWithoutWaiting(transactions); return WaitForAllSubscriptions(); } + /// + /// Adds the given transactions to the end of the event source. + /// If a transaction has -1 instead of a valid checkpoint number, + /// an incremental checkpoint number is automatically generated for the transaction. + /// Does not wait for the transaction to be processed by any subscriptions. + /// + /// The transactions to be added. public void WriteWithoutWaiting(params Transaction[] transactions) { if (transactions.Any()) @@ -144,6 +236,15 @@ public void WriteWithoutWaiting(params Transaction[] transactions) } } + /// + /// Adds a new transaction containing the given event to the end of the event source. + /// Allows to specify headers for the transaction. + /// An incremental checkpoint number is automatically generated for the transaction. + /// Waits for all the subscriptions to process the transaction. + /// + /// The event to be included into the transaction. + /// The headers for the transaction. + /// A task returning the created transaction. public async Task WriteWithHeaders(object anEvent, IDictionary headers) { Transaction transaction = WriteWithHeadersWithoutWaiting(anEvent, headers); @@ -151,6 +252,15 @@ public async Task WriteWithHeaders(object anEvent, IDictionary + /// Adds a new transaction containing the given event to the end of the event source. + /// Allows to specify headers for the transaction. + /// An incremental checkpoint number is automatically generated for the transaction. + /// Does not wait for the transaction to be processed by any subscriptions. + /// + /// The event to be included into the transaction. + /// The headers for the transaction. + /// A task returning the created transaction. public Transaction WriteWithHeadersWithoutWaiting(object anEvent, IDictionary headers) { Transaction transaction = new Transaction @@ -170,6 +280,11 @@ public Transaction WriteWithHeadersWithoutWaiting(object anEvent, IDictionary + /// Waits for all the subscriptions to process all the transaction which are already in the event source + /// but not yet processed by a subscription. + /// + /// A task that completes after all the subscriptions have processed the transactions. public async Task WaitForAllSubscriptions() { List subscriptionsAtStart; @@ -194,6 +309,12 @@ public async Task WaitForAllSubscriptions() } } + /// + /// Checks whether the event source has a non-cancelled subscription with the given identifier. + /// + /// + /// The identifier of the subscription which was specified when the subscription was created. + /// public bool HasSubscriptionForId(string subscriptionId) { lock (syncRoot)