Skip to content

Commit

Permalink
Merge pull request #103 from IharBury/exception-handler-cancellation
Browse files Browse the repository at this point in the history
Dispatcher supports exception handler cancellation and success handling
  • Loading branch information
dennisdoomen authored Sep 13, 2017
2 parents f0b7fd6 + 664d576 commit 1cd60ad
Show file tree
Hide file tree
Showing 8 changed files with 689 additions and 113 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,7 @@ Artifacts/**
# Cake related
Build/**
!Build/packages.config
Tools/**
Tools/**

# Visual Studio
.vs/
7 changes: 0 additions & 7 deletions Src/LiquidProjections.Abstractions/CreateSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,4 @@ public class Subscriber
/// </summary>
public Func<SubscriptionInfo, Task> NoSuchCheckpoint { get; set; }
}

public class SubscriptionInfo
{
public string Id { get; set; }

public IDisposable Subscription { get; set; }
}
}
19 changes: 19 additions & 0 deletions Src/LiquidProjections.Abstractions/SubscriptionInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;
using System.Threading;

namespace LiquidProjections.Abstractions
{
public class SubscriptionInfo
{
public string Id { get; set; }

public IDisposable Subscription { get; set; }

/// <summary>
/// The cancellation is requested when the subscription is being cancelled.
/// The cancellation token is disposed and cannot be used after the subscription cancellation is completed.
/// Old versions of event stores do not have the cancellation token.
/// </summary>
public CancellationToken? CancellationToken { get; set; }
}
}
544 changes: 472 additions & 72 deletions Src/LiquidProjections.Testing/MemoryEventSource.cs

Large diffs are not rendered by default.

48 changes: 48 additions & 0 deletions Src/LiquidProjections.Testing/TaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace LiquidProjections.Testing
{
internal static class TaskExtensions
{
public static Task WithWaitCancellation(this Task task, CancellationToken cancellationToken)
{
var taskCompletionSource = new TaskCompletionSource<bool>();
CancellationTokenRegistration registration = cancellationToken.Register(CancelTask, taskCompletionSource);

task.ContinueWith(ContinueTask, Tuple.Create(taskCompletionSource, registration), CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

return taskCompletionSource.Task;
}

private static void CancelTask(object state)
{
var taskCompletionSource = (TaskCompletionSource<bool>)state;
taskCompletionSource.TrySetCanceled();
}

private static void ContinueTask(Task task, object state)
{
var tcsAndRegistration = (Tuple<TaskCompletionSource<bool>, CancellationTokenRegistration>)state;

if (task.IsFaulted && (task.Exception != null))
{
tcsAndRegistration.Item1.TrySetException(task.Exception.InnerException);
}

if (task.IsCanceled)
{
tcsAndRegistration.Item1.TrySetCanceled();
}

if (task.IsCompleted)
{
tcsAndRegistration.Item1.TrySetResult(false);
}

tcsAndRegistration.Item2.Dispose();
}
}
}
73 changes: 49 additions & 24 deletions Src/LiquidProjections/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public IDisposable Subscribe(long? lastProcessedCheckpoint,

return createSubscription(lastProcessedCheckpoint, new Subscriber
{
HandleTransactions = async (transactions, info) => await HandleTransactions(transactions, handler, info),
NoSuchCheckpoint = async info => await HandleUnknownCheckpoint(info, handler, options)
HandleTransactions = async (transactions, info) => await HandleTransactions(transactions, handler, info).ConfigureAwait(false),
NoSuchCheckpoint = async info => await HandleUnknownCheckpoint(info, handler, options).ConfigureAwait(false)
}, options.Id);
}

Expand All @@ -45,35 +45,54 @@ public IDisposable Subscribe(long? lastProcessedCheckpoint,
/// </summary>
public HandleException ExceptionHandler { get; set; } = (e, attempts, info) => Task.FromResult(ExceptionResolution.Abort);

public HandleSuccess SuccessHandler { get; set; } = _ => Task.FromResult(false);

private async Task HandleTransactions(IReadOnlyList<Transaction> transactions, Func<IReadOnlyList<Transaction>, SubscriptionInfo, Task> handler, SubscriptionInfo info)
{
await ExecuteWithPolicy(info, () => handler(transactions, info), abort: exception =>
{
LogProvider.GetLogger(typeof(Dispatcher)).FatalException(
"Projector exception was not handled. Event subscription has been cancelled.",
exception);
await ExecuteWithPolicy(
info,
async () =>
{
await handler(transactions, info).ConfigureAwait(false);
await SuccessHandler(info).ConfigureAwait(false);
},
abort: exception =>
{
LogProvider.GetLogger(typeof(Dispatcher)).FatalException(
"Projector exception was not handled. Event subscription has been cancelled.",
exception);

info.Subscription?.Dispose();
});
info.Subscription?.Dispose();
})
.ConfigureAwait(false);
}

private async Task HandleUnknownCheckpoint(SubscriptionInfo info, Func<IReadOnlyList<Transaction>, SubscriptionInfo, Task> handler, SubscriptionOptions options)
{
if (options.RestartWhenAhead)
{
await ExecuteWithPolicy(
info,
async () =>
{
await options.BeforeRestarting().ConfigureAwait(false);

Subscribe(null, handler, options);
},
abort: exception =>
{
LogProvider.GetLogger(typeof(Dispatcher)).FatalException(
"Failed to restart the projector.",
exception);
},
ignore: () => Subscribe(null, handler, options))
.ConfigureAwait(false);

// Dispose the subscription only after a new subscription is created
// to support CreateSubscription delegates which wait
// until the subscription detects being ahead
// and until all the existing transactions are processed by the new subscription
info.Subscription?.Dispose();

await ExecuteWithPolicy(info, async () =>
{
await options.BeforeRestarting();

Subscribe(null, handler, options);
}, abort: exception =>
{
LogProvider.GetLogger(typeof(Dispatcher)).FatalException(
"Failed to restart the projector.",
exception);
}, ignore: () => Subscribe(null, handler, options));
}
}

Expand All @@ -86,12 +105,12 @@ private async Task ExecuteWithPolicy(SubscriptionInfo info, Func<Task> action, A
try
{
attempts++;
await action();
await action().ConfigureAwait(false);
retry = false;
}
catch (Exception exception)
{
ExceptionResolution resolution = await ExceptionHandler(exception, attempts, info);
ExceptionResolution resolution = await ExceptionHandler(exception, attempts, info).ConfigureAwait(false);
switch (resolution)
{
case ExceptionResolution.Abort:
Expand Down Expand Up @@ -133,7 +152,13 @@ public enum ExceptionResolution
Abort,
Retry
}


/// <summary>
/// Defines the signature for a method that handles a successful transaction dispatching iteration.
/// </summary>
/// <param name="info">Information about the subscription.</param>
public delegate Task HandleSuccess(SubscriptionInfo info);

public class SubscriptionOptions
{
/// <summary>
Expand Down
72 changes: 69 additions & 3 deletions Tests/LiquidProjections.Specs/DispatcherSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using Chill;
Expand Down Expand Up @@ -119,6 +120,68 @@ public void It_should_not_log_anything()
The<FakeLogProvider>().Exception.Should().BeNull();
}
}
public class When_a_projector_throws_an_exception_and_the_exception_handler_has_a_delay_and_the_subscription_is_disposed :
GivenSubject<Dispatcher>
{
private readonly ManualResetEventSlim delayStarted = new ManualResetEventSlim();
private readonly ManualResetEventSlim delayFinished = new ManualResetEventSlim();
private IDisposable subscription;

public When_a_projector_throws_an_exception_and_the_exception_handler_has_a_delay_and_the_subscription_is_disposed()
{
Given(() =>
{
UseThe(new MemoryEventSource());
WithSubject(_ => new Dispatcher(The<MemoryEventSource>().Subscribe));

LogProvider.SetCurrentLogProvider(UseThe(new FakeLogProvider()));

UseThe(new ProjectionException("Some message."));

Subject.ExceptionHandler = async (exc, attempts, subscription) =>
{
delayStarted.Set();

try
{
await Task.Delay(TimeSpan.FromDays(1), subscription.CancellationToken.Value);
}
finally
{
delayFinished.Set();
}

return ExceptionResolution.Retry;
};

subscription = Subject.Subscribe(null, (transaction, info) =>
{
throw The<ProjectionException>();
});

The<MemoryEventSource>().WriteWithoutWaiting(new List<Transaction>());
});

When(() =>
{
if (!delayStarted.Wait(TimeSpan.FromSeconds(10)))
{
throw new InvalidOperationException("The delay has not started in 10 seconds.");
}

subscription.Dispose();
});
}

[Fact]
public void It_should_stop_waiting()
{
if (!delayFinished.Wait(TimeSpan.FromSeconds(10)))
{
throw new InvalidOperationException("The delay has not been cancelled in 10 seconds.");
}
}
}
public class When_a_projector_throws_an_exception_that_can_be_ignored : GivenSubject<Dispatcher>
{
private int attempts;
Expand Down Expand Up @@ -210,7 +273,10 @@ await The<MemoryEventSource>().Write(

Subject.Subscribe(1000, (transactions, info) =>
{
trace.Add("TransactionsReceived");
foreach (var transaction in transactions)
{
trace.Add("TransactionReceived");
}

foreach (var transaction in transactions)
{
Expand All @@ -233,15 +299,15 @@ public async Task It_should_allow_the_subscriber_to_cleanup_before_restarting()
{
await allTransactionsReceived.Task.TimeoutAfter(30.Seconds());

trace.Should().Equal("BeforeRestarting", "TransactionsReceived", "TransactionsReceived");
trace.Should().Equal("BeforeRestarting", "TransactionReceived", "TransactionReceived");
}

[Fact]
public async Task It_should_restart_sending_transactions_from_the_beginning()
{
var transactions = await allTransactionsReceived.Task.TimeoutAfter(30.Seconds());

transactions.First().Checkpoint.Should().Be(999);
transactions.First().Checkpoint.Should().Be(1);
}
}
public class When_the_autorestart_cleanup_action_throws_but_a_retry_is_requested : GivenSubject<Dispatcher>
Expand Down
34 changes: 28 additions & 6 deletions Tests/LiquidProjections.Specs/ProjectorSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public class Given_a_projector_with_an_in_memory_event_source : GivenSubject<Pro
{
protected EventMapBuilder<ProjectionContext> Events;

protected List<Exception> ProjectionExceptions { get; } = new List<Exception>();

public Given_a_projector_with_an_in_memory_event_source()
{
Given(() =>
Expand All @@ -29,9 +31,24 @@ public Given_a_projector_with_an_in_memory_event_source()

protected void StartProjecting()
{
The<MemoryEventSource>().Subscribe(110, new Subscriber
The<MemoryEventSource>().Subscribe(null, new Subscriber
{
HandleTransactions = async (transactions, info) => await Subject.Handle(transactions)
HandleTransactions = async (transactions, info) =>
{
try
{
await Subject.Handle(transactions);
}
catch (OperationCanceledException)
{
// Do nothing.
}
catch (Exception exception)
{
ProjectionExceptions.Add(exception);
info.Subscription.Dispose();
}
}
}, "");
}
}
Expand Down Expand Up @@ -235,26 +252,31 @@ public When_event_handling_fails()
StartProjecting();
});

When(() => The<MemoryEventSource>().Write(The<Transaction>()), deferredExecution: true);
When(() => The<MemoryEventSource>().Write(The<Transaction>()));
}

[Fact]
public void Then_it_should_wrap_the_exception_into_a_projection_exception()
{
WhenAction.ShouldThrow<ProjectionException>()
ProjectionExceptions.Should().ContainSingle()
.Which.Should().BeOfType<ProjectionException>()
.Which.InnerException.Should().BeSameAs(The<InvalidOperationException>());
}

[Fact]
public void Then_it_should_include_the_current_event_into_the_projection_exception()
{
WhenAction.ShouldThrow<ProjectionException>().Which.CurrentEvent.Should().Be(The<EventEnvelope>());
ProjectionExceptions.Should().ContainSingle()
.Which.Should().BeOfType<ProjectionException>()
.Which.CurrentEvent.Should().Be(The<EventEnvelope>());
}

[Fact]
public void Then_it_should_include_the_current_transaction_batch_into_the_projection_exception()
{
WhenAction.ShouldThrow<ProjectionException>().Which.TransactionBatch.Should().BeEquivalentTo(The<Transaction>());
ProjectionExceptions.Should().ContainSingle()
.Which.Should().BeOfType<ProjectionException>()
.Which.TransactionBatch.Should().BeEquivalentTo(The<Transaction>());
}
}

Expand Down

0 comments on commit 1cd60ad

Please sign in to comment.