Skip to content

Commit

Permalink
Adds an API to track a projector's progress
Browse files Browse the repository at this point in the history
Statistics (#1)

Various improvements
  • Loading branch information
dennisdoomen committed Jul 13, 2017
1 parent 320afe8 commit f8c2cdc
Show file tree
Hide file tree
Showing 10 changed files with 751 additions and 1 deletion.
1 change: 1 addition & 0 deletions Src/LiquidProjections/LiquidProjections.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="LibLog" Version="4.2.6" />
<PackageReference Include="Microsoft.CSharp" Version="4.3.0" />
<PackageReference Include="System.Dynamic.Runtime" Version="4.3.0" />
</ItemGroup>
Expand Down
17 changes: 17 additions & 0 deletions Src/LiquidProjections/Statistics/Event.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;

namespace LiquidProjections.Statistics
{
public class Event
{
public Event(string body, DateTime timestampUtc)
{
Body = body;
TimestampUtc = timestampUtc;
}

public string Body { get; }

public DateTime TimestampUtc { get; }
}
}
73 changes: 73 additions & 0 deletions Src/LiquidProjections/Statistics/ProjectionStats.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

namespace LiquidProjections.Statistics
{
/// <summary>
/// Provides a thread-safe place to store all kinds of run-time information about the progress of a projector.
/// </summary>
public class ProjectionStats
{
private readonly Func<DateTime> nowUtc;
private readonly ConcurrentDictionary<string, ProjectorStats> stats = new ConcurrentDictionary<string, ProjectorStats>();

public ProjectionStats(Func<DateTime> nowUtc)
{
this.nowUtc = nowUtc;
}

/// <summary>
/// Should be called to track the progress of a projector and use that to calculate an ETA.
/// </summary>
public void TrackProgress(string projectorId, long checkpoint)
{
this[projectorId].TrackProgress(checkpoint, nowUtc());
}

/// <summary>
/// Can be used to store projector-specific properties that characterize the projector's configuration or state.
/// </summary>
/// <remarks>
/// Each property is identified by a <paramref name="name"/>. This class only keeps the latest value
/// for each property.
/// </remarks>
public void StoreProperty(string projectorId, string name, string value)
{
this[projectorId].StoreProperty(name, value, nowUtc());
}

/// <summary>
/// Can be used to store information that happened that can help diagnose the state or failure of a projector.
/// </summary>
public void LogEvent(string projectorId, string body)
{
this[projectorId].LogEvent(body, nowUtc());
}

/// <summary>
/// Calculates the expected time for the projector identified by <paramref name="projectorId"/> to reach a
/// certain <paramref name="targetCheckpoint"/> based on a weighted average over the last
/// ten minutes, or <c>null</c> if there is not enough information yet. Use <see cref="TrackProgress"/> to report
/// progress.
/// </summary>
public TimeSpan? GetTimeToReach(string projectorId, long targetCheckpoint)
{
return this[projectorId].GetTimeToReach(targetCheckpoint);
}

private ProjectorStats this[string projectorId]
{
get
{
return stats.GetOrAdd(projectorId, id => new ProjectorStats(id));
}
}

public IEnumerable<ProjectorStats> GetForAllProjectors()
{
return stats.ToArray().Select(projectorStatsById => projectorStatsById.Value);
}
}
}
137 changes: 137 additions & 0 deletions Src/LiquidProjections/Statistics/ProjectorStats.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

namespace LiquidProjections.Statistics
{
/// <summary>
/// Contains statistics and information about a particular projector.
/// </summary>
/// <remarks>
/// An instance of this class is safe for use in multi-threaded solutions.
/// </remarks>
public class ProjectorStats
{
private readonly object eventsSyncObject = new object();
private readonly object progressSyncObject = new object();
private readonly ConcurrentDictionary<string, Property> properties;
private readonly List<Event> events;

private readonly WeightedProjectionSpeedCalculator lastMinuteSamples =
new WeightedProjectionSpeedCalculator(12, TimeSpan.FromSeconds(5));

private readonly WeightedProjectionSpeedCalculator last10MinuteSamples
= new WeightedProjectionSpeedCalculator(9, TimeSpan.FromMinutes(1));

private TimestampedCheckpoint lastCheckpoint;

public ProjectorStats(string projectorId)
{
properties = new ConcurrentDictionary<string, Property>();
events = new List<Event>();
ProjectorId = projectorId;
}

public ProjectorStats(string projectorId, IDictionary<string, Property> properties, IEnumerable<Event> events)
{
this.properties = new ConcurrentDictionary<string, Property>(properties);
this.events = this.events.ToList();
ProjectorId = projectorId;
}

public string ProjectorId { get; }

public TimestampedCheckpoint LastCheckpoint
{
get
{
lock (progressSyncObject)
{
return lastCheckpoint;
}
}
}

/// <summary>
/// Gets a snapshot of the properties stored for this projector at the time of calling.
/// </summary>
public IDictionary<string, Property> GetProperties() => properties.ToArray().ToDictionary(p => p.Key, p => p.Value);

/// <summary>
/// Gets a snapshot of the events stored for this projector at the time of calling.
/// </summary>
public IReadOnlyList<Event> GetEvents()
{
lock (eventsSyncObject)
{
return events.ToReadOnly();
}
}

public void StoreProperty(string key, string value, DateTime timestampUtc)
{
properties[key] = new Property(value, timestampUtc);
}

public void LogEvent(string body, DateTime timestampUtc)
{
lock (eventsSyncObject)
{
events.Add(new Event(body, timestampUtc));
}
}

/// <summary>
/// Calculates the expected time for a projector to reach a certain <paramref name="targetCheckpoint"/> based
/// on a weighted average over the last ten minutes, or <c>null</c> if there is not enough information yet.
/// </summary>
public TimeSpan? GetTimeToReach(long targetCheckpoint)
{
lock (progressSyncObject)
{
if (lastCheckpoint == null)
{
return null;
}

if (targetCheckpoint <= lastCheckpoint.Checkpoint)
{
return TimeSpan.Zero;
}

float speed = lastMinuteSamples.GetWeightedSpeed();

speed = (speed == 0)
? last10MinuteSamples.GetWeightedSpeed()
: last10MinuteSamples.GetWeightedSpeedIncluding(speed);

if (speed == 0)
{
return null;
}

float secondsWithFractionalPart = (targetCheckpoint - lastCheckpoint.Checkpoint) / speed;

if (secondsWithFractionalPart > long.MaxValue)
{
return null;
}

long secondsWithoutFractionalPart = (long) secondsWithFractionalPart;

return TimeSpan.FromSeconds(secondsWithoutFractionalPart);
}
}

public void TrackProgress(long checkpoint, DateTime timestampUtc)
{
lock (progressSyncObject)
{
lastMinuteSamples.Record(checkpoint, timestampUtc);
last10MinuteSamples.Record(checkpoint, timestampUtc);
lastCheckpoint = new TimestampedCheckpoint(checkpoint, timestampUtc);
}
}
}
}
17 changes: 17 additions & 0 deletions Src/LiquidProjections/Statistics/Property.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;

namespace LiquidProjections.Statistics
{
public class Property
{
public string Value { get; }

public DateTime TimestampUtc { get; }

public Property(string value, DateTime timestampUtc)
{
Value = value;
TimestampUtc = timestampUtc;
}
}
}
17 changes: 17 additions & 0 deletions Src/LiquidProjections/Statistics/TimestampedCheckpoint.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;

namespace LiquidProjections.Statistics
{
public class TimestampedCheckpoint
{
public TimestampedCheckpoint(long checkpoint, DateTime timestampUtc)
{
Checkpoint = checkpoint;
TimestampUtc = timestampUtc;
}

public long Checkpoint { get; }

public DateTime TimestampUtc { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace LiquidProjections.Statistics
{
/// <summary>
/// Calculates the weighted speed in transactions per second.
/// </summary>
/// <remarks>
/// This class is not thread-safe.
/// A <see cref="Monitor"/> or another synchronization method should be used to ensure thread-safe usage.
/// </remarks>
public class WeightedProjectionSpeedCalculator
{
private readonly TimeSpan threshold;
private readonly int maxNrOfSamples;
private readonly Queue<float> samples = new Queue<float>();
private DateTime? lastSampleTimeStampUtc;
private long? lastCheckpoint;

public WeightedProjectionSpeedCalculator(int maxNrOfSamples, TimeSpan threshold)
{
this.maxNrOfSamples = maxNrOfSamples;
this.threshold = threshold;
}

private bool HasBaselineBeenSet => lastSampleTimeStampUtc != null;

public void Record(long checkpoint, DateTime timestampUtc)
{
if (HasBaselineBeenSet)
{
TimeSpan interval = timestampUtc - lastSampleTimeStampUtc.Value;

if (interval > threshold)
{
long delta = checkpoint - lastCheckpoint.Value;

samples.Enqueue((float) (delta / interval.TotalSeconds));

lastCheckpoint = checkpoint;
lastSampleTimeStampUtc = timestampUtc;

DiscardOlderSamples();
}
}
else
{
SetBaseline(checkpoint, timestampUtc);
}
}

private void SetBaseline(long checkpoint, DateTime timestampUtc)
{
lastCheckpoint = checkpoint;
lastSampleTimeStampUtc = timestampUtc;
}

private void DiscardOlderSamples()
{
while (samples.Count > maxNrOfSamples)
{
samples.Dequeue();
}
}

public float GetWeightedSpeedIncluding(float sample)
{
return GetWeightedSpeed(samples.Concat(new[] { sample }));
}

public float GetWeightedSpeed()
{
return GetWeightedSpeed(samples);
}

public float GetWeightedSpeed(IEnumerable<float> effectiveSamples)
{
float weightedSum = 0;
int weights = 0;
int weight = 0;

foreach (float sample in effectiveSamples)
{
weight++;
weights += weight;
weightedSum += sample * weight;
}

return weights == 0 ? 0 : weightedSum / weights;
}
}
}
5 changes: 4 additions & 1 deletion Tests/LiquidProjections.Specs/DispatcherSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ public When_a_projector_throws_an_exception()

When(() =>
{
return The<MemoryEventSource>().Write(new List<Transaction>());
return The<MemoryEventSource>().Write(new List<Transaction>
{
new Transaction()
});
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
<Compile Include="DispatcherSpecs.cs" />
<Compile Include="EventMapSpecs.cs" />
<Compile Include="ProjectionExceptionSpecs.cs" />
<Compile Include="ProjectionStatsSpecs.cs" />
<Compile Include="ProjectorSpecs.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TaskExtensions.cs" />
Expand Down
Loading

0 comments on commit f8c2cdc

Please sign in to comment.