Skip to content

Commit

Permalink
Merge pull request #96 from dennisdoomen/SupportAutorestart
Browse files Browse the repository at this point in the history
Detect and recover projectors that are ahead of the event store.
  • Loading branch information
dennisdoomen authored May 29, 2017
2 parents 34009c4 + f06507d commit 6e68f84
Show file tree
Hide file tree
Showing 18 changed files with 458 additions and 139 deletions.
6 changes: 5 additions & 1 deletion Samples/ExampleHost/App.config
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="Microsoft.Owin" publicKeyToken="31bf3856ad364e35" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-3.0.1.0" newVersion="3.0.1.0" />
<bindingRedirect oldVersion="0.0.0.0-3.1.0.0" newVersion="3.1.0.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="System.Web.Http.Owin" publicKeyToken="31bf3856ad364e35" culture="neutral" />
Expand All @@ -25,6 +25,10 @@
<assemblyIdentity name="Microsoft.Owin.Hosting" publicKeyToken="31bf3856ad364e35" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-3.0.1.0" newVersion="3.0.1.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-10.0.0.0" newVersion="10.0.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>
2 changes: 1 addition & 1 deletion Samples/ExampleHost/CountsProjector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public CountsProjector(Dispatcher dispatcher, InMemoryDatabase store)

public void Start()
{
dispatcher.Subscribe(null, async transactions =>
dispatcher.Subscribe(null, async (transactions, info) =>
{
await documentCountProjector.Handle(transactions);
});
Expand Down
25 changes: 10 additions & 15 deletions Samples/ExampleHost/ExampleHost.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,20 @@
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>
<ItemGroup>
<Reference Include="Microsoft.Owin, Version=3.0.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Owin.3.0.1\lib\net45\Microsoft.Owin.dll</HintPath>
<Private>True</Private>
<Reference Include="Microsoft.Owin, Version=3.1.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Owin.3.1.0\lib\net45\Microsoft.Owin.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Owin.Diagnostics, Version=3.0.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Owin.Diagnostics.3.0.1\lib\net45\Microsoft.Owin.Diagnostics.dll</HintPath>
<Private>True</Private>
<Reference Include="Microsoft.Owin.Diagnostics, Version=3.1.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Owin.Diagnostics.3.1.0\lib\net45\Microsoft.Owin.Diagnostics.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Owin.Host.HttpListener, Version=3.0.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Owin.Host.HttpListener.3.0.1\lib\net45\Microsoft.Owin.Host.HttpListener.dll</HintPath>
<Private>True</Private>
<Reference Include="Microsoft.Owin.Host.HttpListener, Version=3.1.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Owin.Host.HttpListener.3.1.0\lib\net45\Microsoft.Owin.Host.HttpListener.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Owin.Hosting, Version=3.0.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Owin.Hosting.3.0.1\lib\net45\Microsoft.Owin.Hosting.dll</HintPath>
<Private>True</Private>
<Reference Include="Microsoft.Owin.Hosting, Version=3.1.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Owin.Hosting.3.1.0\lib\net45\Microsoft.Owin.Hosting.dll</HintPath>
</Reference>
<Reference Include="Newtonsoft.Json, Version=6.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<HintPath>..\..\packages\Newtonsoft.Json.6.0.4\lib\net45\Newtonsoft.Json.dll</HintPath>
<Private>True</Private>
<Reference Include="Newtonsoft.Json, Version=10.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<HintPath>..\..\packages\Newtonsoft.Json.10.0.2\lib\net45\Newtonsoft.Json.dll</HintPath>
</Reference>
<Reference Include="Owin, Version=1.0.0.0, Culture=neutral, PublicKeyToken=f0ebd12fd5e55cc5, processorArchitecture=MSIL">
<HintPath>..\..\packages\Owin.1.0\lib\net40\Owin.dll</HintPath>
Expand Down
17 changes: 10 additions & 7 deletions Samples/ExampleHost/JsonFileEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Linq;
using System.Runtime.Serialization.Formatters;
using System.Threading.Tasks;
using LiquidProjections.Abstractions;
using Newtonsoft.Json;

namespace LiquidProjections.ExampleHost
Expand All @@ -25,9 +26,11 @@ public JsonFileEventStore(string filePath, int pageSize)
entryQueue = new Queue<ZipArchiveEntry>(zip.Entries.Where(e => e.Name.EndsWith(".json")));
}

public IDisposable Subscribe(long? lastProcessedCheckpoint, Func<IReadOnlyList<Transaction>, Task> handler, string subscriptionId)
public IDisposable Subscribe(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId)
{
var subscriber = new Subscriber(lastProcessedCheckpoint ?? 0, handler);
var subscription = new Subscription(
lastProcessedCheckpoint ?? 0,
transactions => subscriber.HandleTransactions(transactions, null));

Task.Run(async () =>
{
Expand All @@ -39,13 +42,13 @@ public IDisposable Subscribe(long? lastProcessedCheckpoint, Func<IReadOnlyList<T
// Start loading the next page on a separate thread while we have the subscriber handle the previous transactions.
loader = LoadNextPageAsync();

await subscriber.Send(transactions);
await subscription.Send(transactions);

transactions = await loader;
}
});

return subscriber;
return subscription;
}

private Task<Transaction[]> LoadNextPageAsync()
Expand All @@ -72,7 +75,7 @@ private Task<Transaction[]> LoadNextPageAsync()
Body = JsonConvert.DeserializeObject(json, new JsonSerializerSettings
{
TypeNameHandling = TypeNameHandling.All,
TypeNameAssemblyFormat = FormatterAssemblyStyle.Full
TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full
})
});
}
Expand Down Expand Up @@ -105,13 +108,13 @@ public void Dispose()
zip = null;
}

internal class Subscriber : IDisposable
internal class Subscription : IDisposable
{
private readonly long lastProcessedCheckpoint;
private readonly Func<IReadOnlyList<Transaction>, Task> handler;
private bool disposed;

public Subscriber(long lastProcessedCheckpoint, Func<IReadOnlyList<Transaction>, Task> handler)
public Subscription(long lastProcessedCheckpoint, Func<IReadOnlyList<Transaction>, Task> handler)
{
this.lastProcessedCheckpoint = lastProcessedCheckpoint;
this.handler = handler;
Expand Down
12 changes: 6 additions & 6 deletions Samples/ExampleHost/packages.config
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
<package id="Microsoft.AspNet.WebApi.Client" version="5.2.3" targetFramework="net45" />
<package id="Microsoft.AspNet.WebApi.Core" version="5.2.3" targetFramework="net45" />
<package id="Microsoft.AspNet.WebApi.Owin" version="5.2.3" targetFramework="net452" />
<package id="Microsoft.Owin" version="3.0.1" targetFramework="net452" />
<package id="Microsoft.Owin.Diagnostics" version="3.0.1" targetFramework="net452" />
<package id="Microsoft.Owin.Host.HttpListener" version="3.0.1" targetFramework="net452" />
<package id="Microsoft.Owin.Hosting" version="3.0.1" targetFramework="net452" />
<package id="Microsoft.Owin.SelfHost" version="3.0.1" targetFramework="net452" />
<package id="Newtonsoft.Json" version="6.0.4" targetFramework="net45" />
<package id="Microsoft.Owin" version="3.1.0" targetFramework="net45" />
<package id="Microsoft.Owin.Diagnostics" version="3.1.0" targetFramework="net45" />
<package id="Microsoft.Owin.Host.HttpListener" version="3.1.0" targetFramework="net45" />
<package id="Microsoft.Owin.Hosting" version="3.1.0" targetFramework="net45" />
<package id="Microsoft.Owin.SelfHost" version="3.1.0" targetFramework="net45" />
<package id="Newtonsoft.Json" version="10.0.2" targetFramework="net45" />
<package id="Owin" version="1.0" targetFramework="net452" />
<package id="TinyIoC" version="1.3" targetFramework="net452" developmentDependency="true" />
</packages>
41 changes: 34 additions & 7 deletions Src/LiquidProjections.Abstractions/CreateSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,40 @@ namespace LiquidProjections.Abstractions
/// <summary>
/// Creates a subscription on an event store, starting at the transaction following
/// <paramref name="lastProcessedCheckpoint"/>, identified by <paramref name="subscriptionId"/>, and which
/// passed transactions to the provided <paramref name="handler"/>.
/// passes any transactions to the provided <paramref name="subscriber"/>.
/// </summary>
/// <param name="lastProcessedCheckpoint"></param>
/// <param name="handler"></param>
/// <param name="subscriptionId"></param>
/// <returns></returns>
/// <param name="lastProcessedCheckpoint">
/// The checkpoint of the transaction the subscriber has last seen, or <c>null</c> to start from the beginning.
/// </param>
/// <param name="subscriber">
/// An object wrapping the various handlers that the subscription will use.
/// </param>
/// <param name="subscriptionId">
/// Identifies this subscription and helps distinct multiple subscriptions.
/// </param>
/// <returns>
/// A disposable object that will cancel the subscription.
/// </returns>
public delegate IDisposable CreateSubscription(long? lastProcessedCheckpoint,
Func<IReadOnlyList<Transaction>, Task> handler,
string subscriptionId);
Subscriber subscriber, string subscriptionId);

public class Subscriber
{
/// <summary>
/// Represents a handler that will receive the transactions that the event store pushes to the subscriber.
/// </summary>
public Func<IReadOnlyList<Transaction>, SubscriptionInfo, Task> HandleTransactions { get; set; }

/// <summary>
/// Represents a handler that the event store will use if the requested checkpoint does not exist.
/// </summary>
public Func<SubscriptionInfo, Task> NoSuchCheckpoint { get; set; }
}

public class SubscriptionInfo
{
public string Id { get; set; }

public IDisposable Subscription { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<ProjectConfiguration>
<Settings>
<PreviouslyBuiltSuccessfully>True</PreviouslyBuiltSuccessfully>
</Settings>
</ProjectConfiguration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<ProjectConfiguration>
<Settings>
<PreviouslyBuiltSuccessfully>True</PreviouslyBuiltSuccessfully>
</Settings>
</ProjectConfiguration>
66 changes: 48 additions & 18 deletions Src/LiquidProjections.Testing/MemoryEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,50 @@
using System.Globalization;
using System.Linq;
using System.Threading.Tasks;
using LiquidProjections.Testing;
using LiquidProjections.Abstractions;

namespace LiquidProjections
namespace LiquidProjections.Testing
{
public class MemoryEventSource
{
private readonly int batchSize;
private long lastCheckpoint;

private readonly List<Subscriber> subscribers = new List<Subscriber>();
private readonly List<Subscription> subscribers = new List<Subscription>();
private readonly List<Transaction> history = new List<Transaction>();

public MemoryEventSource(int batchSize = 10)
{
this.batchSize = batchSize;
}

public IDisposable Subscribe(long? lastProcessedCheckpoint, Func<IReadOnlyList<Transaction>, Task> handler, string subscriptionId)
public IDisposable Subscribe(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId)
{
lastCheckpoint = lastProcessedCheckpoint ?? 0;
var subscriber = new Subscriber(lastCheckpoint, batchSize, handler);
var subscription = new Subscription(lastCheckpoint, batchSize, subscriber, subscriptionId);

subscribers.Add(subscriber);
subscribers.Add(subscription);

Func<Task> asyncAction = async () =>
async Task AsyncAction()
{
if (history.LastOrDefault()?.Checkpoint < lastProcessedCheckpoint)
{
await subscriber.NoSuchCheckpoint(new SubscriptionInfo
{
Id = subscriptionId,
Subscription = subscription
});
}

foreach (Transaction transaction in history)
{
await subscriber.Send(new[] { transaction }).ConfigureAwait(false);
await subscription.Send(new[] {transaction}).ConfigureAwait(false);
}
};
}

asyncAction().ConfigureAwait(false).GetAwaiter().GetResult();
AsyncAction().ConfigureAwait(false).GetAwaiter().GetResult();

return subscriber;
return subscription;
}


Expand Down Expand Up @@ -102,29 +111,46 @@ public async Task<Transaction> WriteWithHeaders(object anEvent, IDictionary<stri

return transaction;
}

public bool HasSubscriptionForId(string subscriptionId)
{
Subscription subscription = subscribers.SingleOrDefault(s => s.Id == subscriptionId);
return (subscription != null) && !subscription.IsDisposed;
}
}

internal class Subscriber : IDisposable
internal class Subscription : IDisposable
{
private readonly long lastProcessedCheckpoint;
private readonly int batchSize;
private readonly Func<IReadOnlyList<Transaction>, Task> handler;
private readonly Subscriber subscriber;
private readonly string subscriptionId;
private bool disposed = false;

public Subscriber(long lastProcessedCheckpoint, int batchSize, Func<IReadOnlyList<Transaction>, Task> handler)

public Subscription(long lastProcessedCheckpoint, int batchSize,
Subscriber subscriber, string subscriptionId)
{
this.lastProcessedCheckpoint = lastProcessedCheckpoint;
this.batchSize = batchSize;
this.handler = handler;
this.subscriber = subscriber;
this.subscriptionId = subscriptionId;
}

public async Task Send(IEnumerable<Transaction> transactions)
{
if (!disposed)
{
foreach (var batch in transactions.Where(t => t.Checkpoint > lastProcessedCheckpoint).InBatchesOf(batchSize))
var subscriptionInfo = new SubscriptionInfo
{
await handler(new ReadOnlyCollection<Transaction>(batch.ToList())).ConfigureAwait(false);
Id = subscriptionId,
Subscription = this
};

Transaction[] requestedTransactions = transactions.Where(t => t.Checkpoint > lastProcessedCheckpoint).ToArray();
foreach (var batch in requestedTransactions.InBatchesOf(batchSize))
{
await subscriber.HandleTransactions(new ReadOnlyCollection<Transaction>(batch.ToList()), subscriptionInfo)
.ConfigureAwait(false);
}
}
}
Expand All @@ -133,5 +159,9 @@ public void Dispose()
{
disposed = true;
}

public bool IsDisposed => disposed;

public string Id => subscriptionId;
}
}
Loading

0 comments on commit 6e68f84

Please sign in to comment.