Skip to content

Commit

Permalink
XML comments added for the memory event source
Browse files Browse the repository at this point in the history
  • Loading branch information
IharBury committed Sep 13, 2017
1 parent 2f5d228 commit 664d576
Showing 1 changed file with 121 additions and 0 deletions.
121 changes: 121 additions & 0 deletions Src/LiquidProjections.Testing/MemoryEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

namespace LiquidProjections.Testing
{
/// <summary>
/// An event source which stores all the transactions in memory and has methods which are convenient for testing.
/// </summary>
public class MemoryEventSource
{
private readonly int batchSize;
Expand All @@ -18,11 +21,36 @@ public class MemoryEventSource
private TaskCompletionSource<bool> historyGrowthTaskCompletionSource = new TaskCompletionSource<bool>();
private readonly object syncRoot = new object();

/// <summary>
/// Creates a new instance of the event source.
/// </summary>
/// <param name="batchSize">
/// The maximum number of transactions in batches which are handled by the subscribers. The default is 10.
/// </param>
public MemoryEventSource(int batchSize = 10)
{
this.batchSize = batchSize;
}

/// <summary>
/// Creates a new subscription which will handle all the transactions after the given checkpoint.
/// Waits (synchronously) for the subscription to process all the transactions that are already in the event source.
/// If the given <paramref name="lastProcessedCheckpoint"/> is ahead of the event source
/// and that is ignored by the handler,
/// waits (synchronously) for a replacement subscription to be created
/// and to process all the transactions that are already in the event source.
/// The replacement subscription is not returned.
/// The code that creates the replacement subscription is responsible for cancelling it.
/// </summary>
/// <param name="lastProcessedCheckpoint">
/// If has value, only the transactions with checkpoints greater than the given value will be processed.
/// </param>
/// <param name="subscriber">The <see cref="Subscriber"/> which will handle the transactions.</param>
/// <param name="subscriptionId">An arbitrary string identifying the subscription.</param>
/// <returns>
/// An object implementing the <see cref="IDisposable"/> interface.
/// Disposing the object will cancel the subscription asynchronously.
/// </returns>
public IDisposable Subscribe(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId)
{
return SubscribeAsync(lastProcessedCheckpoint, subscriber, subscriptionId)
Expand All @@ -31,6 +59,25 @@ public IDisposable Subscribe(long? lastProcessedCheckpoint, Subscriber subscribe
.GetResult();
}

/// <summary>
/// Creates a new subscription which will handle all the transactions after the given checkpoint.
/// Waits asynchronously for the subscription to process all the transactions that are already in the event source.
/// If the given <paramref name="lastProcessedCheckpoint"/> is ahead of the event source
/// and that is ignored by the handler,
/// waits asynchronously for a replacement subscription to be created
/// and to process all the transactions that are already in the event source.
/// The replacement subscription is not returned.
/// The code that creates the replacement subscription is responsible for cancelling it.
/// </summary>
/// <param name="lastProcessedCheckpoint">
/// If has value, only the transactions with checkpoints greater than the given value will be processed.
/// </param>
/// <param name="subscriber">The <see cref="Subscriber"/> which will handle the transactions.</param>
/// <param name="subscriptionId">An arbitrary string identifying the subscription.</param>
/// <returns>
/// A task that returns an object implementing the <see cref="IDisposable"/> interface.
/// Disposing the object will cancel the subscription asynchronously.
/// </returns>
public async Task<IDisposable> SubscribeAsync(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId)
{
Subscription subscription = SubscribeWithoutWaitingInternal(lastProcessedCheckpoint, subscriber, subscriptionId);
Expand Down Expand Up @@ -63,6 +110,22 @@ public async Task<IDisposable> SubscribeAsync(long? lastProcessedCheckpoint, Sub
return subscription;
}

/// <summary>
/// Creates a new subscription which will handle all the transactions after the given checkpoint.
/// Does not wait for the subscription to process any transactions.
/// If the given <paramref name="lastProcessedCheckpoint"/> is ahead of the event source
/// and that is ignored by the handler,
/// does not wait for a replacement subscription to be created.
/// </summary>
/// <param name="lastProcessedCheckpoint">
/// If has value, only the transactions with checkpoints greater than the given value will be processed.
/// </param>
/// <param name="subscriber">The <see cref="Subscriber"/> which will handle the transactions.</param>
/// <param name="subscriptionId">An arbitrary string identifying the subscription.</param>
/// <returns>
/// An object implementing the <see cref="IDisposable"/> interface.
/// Disposing the object will cancel the subscription asynchronously.
/// </returns>
public IDisposable SubscribeWithoutWaiting(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId)
{
return SubscribeWithoutWaitingInternal(lastProcessedCheckpoint, subscriber, subscriptionId);
Expand All @@ -81,13 +144,27 @@ private Subscription SubscribeWithoutWaitingInternal(long? lastProcessedCheckpoi
return subscription;
}

/// <summary>
/// Adds a new transaction containing the given events to the end of the event source.
/// An incremental checkpoint number is automatically generated for the transaction.
/// Waits for all the subscriptions to process the transaction.
/// </summary>
/// <param name="events">The events to be included into the transaction.</param>
/// <returns>A task returning the created transaction.</returns>
public async Task<Transaction> Write(params object[] events)
{
Transaction transaction = WriteWithoutWaiting(events);
await WaitForAllSubscriptions().ConfigureAwait(false);
return transaction;
}

/// <summary>
/// Adds a new transaction containing the given events to the end of the event source.
/// An incremental checkpoint number is automatically generated for the transaction.
/// Does not wait for the transaction to be processed by any subscriptions.
/// </summary>
/// <param name="events">The events to be included into the transaction.</param>
/// <returns>A task returning the created transaction.</returns>
public Transaction WriteWithoutWaiting(params object[] events)
{
Transaction transaction = new Transaction
Expand All @@ -103,12 +180,27 @@ public Transaction WriteWithoutWaiting(params object[] events)
return transaction;
}

/// <summary>
/// Adds the given transactions to the end of the event source.
/// If a transaction has <c>-1</c> instead of a valid checkpoint number,
/// an incremental checkpoint number is automatically generated for the transaction.
/// Waits for all the subscriptions to process the transactions.
/// </summary>
/// <param name="transactions">The transactions to be added.</param>
/// <returns>A task that completes after all the subscriptions have processed the transactions.</returns>
public Task Write(params Transaction[] transactions)
{
WriteWithoutWaiting(transactions);
return WaitForAllSubscriptions();
}

/// <summary>
/// Adds the given transactions to the end of the event source.
/// If a transaction has <c>-1</c> instead of a valid checkpoint number,
/// an incremental checkpoint number is automatically generated for the transaction.
/// Does not wait for the transaction to be processed by any subscriptions.
/// </summary>
/// <param name="transactions">The transactions to be added.</param>
public void WriteWithoutWaiting(params Transaction[] transactions)
{
if (transactions.Any())
Expand Down Expand Up @@ -144,13 +236,31 @@ public void WriteWithoutWaiting(params Transaction[] transactions)
}
}

/// <summary>
/// Adds a new transaction containing the given event to the end of the event source.
/// Allows to specify headers for the transaction.
/// An incremental checkpoint number is automatically generated for the transaction.
/// Waits for all the subscriptions to process the transaction.
/// </summary>
/// <param name="anEvent">The event to be included into the transaction.</param>
/// <param name="headers">The headers for the transaction.</param>
/// <returns>A task returning the created transaction.</returns>
public async Task<Transaction> WriteWithHeaders(object anEvent, IDictionary<string, object> headers)
{
Transaction transaction = WriteWithHeadersWithoutWaiting(anEvent, headers);
await WaitForAllSubscriptions().ConfigureAwait(false);
return transaction;
}

/// <summary>
/// Adds a new transaction containing the given event to the end of the event source.
/// Allows to specify headers for the transaction.
/// An incremental checkpoint number is automatically generated for the transaction.
/// Does not wait for the transaction to be processed by any subscriptions.
/// </summary>
/// <param name="anEvent">The event to be included into the transaction.</param>
/// <param name="headers">The headers for the transaction.</param>
/// <returns>A task returning the created transaction.</returns>
public Transaction WriteWithHeadersWithoutWaiting(object anEvent, IDictionary<string, object> headers)
{
Transaction transaction = new Transaction
Expand All @@ -170,6 +280,11 @@ public Transaction WriteWithHeadersWithoutWaiting(object anEvent, IDictionary<st
return transaction;
}

/// <summary>
/// Waits for all the subscriptions to process all the transaction which are already in the event source
/// but not yet processed by a subscription.
/// </summary>
/// <returns>A task that completes after all the subscriptions have processed the transactions.</returns>
public async Task WaitForAllSubscriptions()
{
List<Subscription> subscriptionsAtStart;
Expand All @@ -194,6 +309,12 @@ public async Task WaitForAllSubscriptions()
}
}

/// <summary>
/// Checks whether the event source has a non-cancelled subscription with the given identifier.
/// </summary>
/// <param name="subscriptionId">
/// The identifier of the subscription which was specified when the subscription was created.
/// </param>
public bool HasSubscriptionForId(string subscriptionId)
{
lock (syncRoot)
Expand Down

0 comments on commit 664d576

Please sign in to comment.