Skip to content

Commit

Permalink
Extended the projector stats with the weighted average speed.
Browse files Browse the repository at this point in the history
  • Loading branch information
dennisdoomen committed Sep 14, 2017
1 parent 9c1995a commit 33d29a1
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 31 deletions.
10 changes: 10 additions & 0 deletions Src/LiquidProjections/Statistics/ProjectionStats.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ public void LogEvent(string projectorId, string body)
this[projectorId].LogEvent(body, nowUtc());
}

/// <summary>
/// Gets the speed in transactions per minute based on a weighted average over the last
/// ten minutes, or <c>null</c> if there is not enough information yet.
/// </summary>
/// <param name="projectorId"></param>
public float? GetSpeed(string projectorId)
{
return this[projectorId].GetSpeed();
}

/// <summary>
/// Calculates the expected time for the projector identified by <paramref name="projectorId"/> to reach a
/// certain <paramref name="targetCheckpoint"/> based on a weighted average over the last
Expand Down
27 changes: 18 additions & 9 deletions Src/LiquidProjections/Statistics/ProjectorStats.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,21 @@ public TimestampedCheckpoint LastCheckpoint
}
}

/// <summary>
/// Calculates the speed in transactions per second based on a weighted average over the last 10 minutes
/// or <c>null</c> if no information is yet available.
/// </summary>
public float? GetSpeed()
{
float? speed = lastMinuteSamples.GetWeightedSpeed();

speed = (speed == null)
? last10MinuteSamples.GetWeightedSpeed()
: last10MinuteSamples.GetWeightedSpeedIncluding(speed.Value);

return speed;
}

/// <summary>
/// Gets a snapshot of the properties stored for this projector at the time of calling.
/// </summary>
Expand Down Expand Up @@ -100,20 +115,14 @@ public void LogEvent(string body, DateTime timestampUtc)
{
return TimeSpan.Zero;
}

float speed = lastMinuteSamples.GetWeightedSpeed();

speed = (speed == 0)
? last10MinuteSamples.GetWeightedSpeed()
: last10MinuteSamples.GetWeightedSpeedIncluding(speed);

if (speed == 0)
float? speed = GetSpeed();
if (speed == null)
{
return null;
}

float secondsWithFractionalPart = (targetCheckpoint - lastCheckpoint.Checkpoint) / speed;

float secondsWithFractionalPart = (targetCheckpoint - lastCheckpoint.Checkpoint) / speed.Value;
if (secondsWithFractionalPart > long.MaxValue)
{
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,17 @@ private void DiscardOlderSamples()
}
}

public float GetWeightedSpeedIncluding(float sample)
public float? GetWeightedSpeedIncluding(float sample)
{
return GetWeightedSpeed(samples.Concat(new[] { sample }));
}

public float GetWeightedSpeed()
public float? GetWeightedSpeed()
{
return GetWeightedSpeed(samples);
}

public float GetWeightedSpeed(IEnumerable<float> effectiveSamples)
public float? GetWeightedSpeed(IEnumerable<float> effectiveSamples)
{
float weightedSum = 0;
int weights = 0;
Expand All @@ -89,7 +89,7 @@ public float GetWeightedSpeed(IEnumerable<float> effectiveSamples)
weightedSum += sample * weight;
}

return weights == 0 ? 0 : weightedSum / weights;
return (weights == 0) ? (float?) null : weightedSum / weights;
}
}
}
20 changes: 10 additions & 10 deletions Tests/LiquidProjections.Specs/EventMapSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ await map.Handle(
[Fact]
public void It_should_properly_pass_the_mapping_to_the_creating_handler()
{
projection.ShouldBeEquivalentTo(new
projection.Should().BeEquivalentTo(new
{
Id = "c350E",
Category = "Hybrids",
Expand Down Expand Up @@ -146,7 +146,7 @@ await map.Handle(
[Fact]
public void It_should_properly_pass_the_mapping_to_the_creating_handler()
{
projection.ShouldBeEquivalentTo(new
projection.Should().BeEquivalentTo(new
{
Id = "c350E",
Category = "Hybrids",
Expand Down Expand Up @@ -227,7 +227,7 @@ await map.Handle(
[Fact]
public void It_should_properly_pass_the_mapping_to_the_creating_handler()
{
projection.ShouldBeEquivalentTo(new
projection.Should().BeEquivalentTo(new
{
Id = "c350E",
Category = "Hybrids",
Expand Down Expand Up @@ -305,7 +305,7 @@ await map.Handle(
[Fact]
public void It_should_properly_pass_the_mapping_to_the_updating_handler()
{
projection.ShouldBeEquivalentTo(new
projection.Should().BeEquivalentTo(new
{
Id = "c350E",
Category = "Hybrids",
Expand Down Expand Up @@ -387,7 +387,7 @@ await map.Handle(
[Fact]
public void It_should_properly_pass_the_mapping_to_the_updating_handler()
{
projection.ShouldBeEquivalentTo(new
projection.Should().BeEquivalentTo(new
{
Id = "c350E",
Category = "Hybrids",
Expand Down Expand Up @@ -461,7 +461,7 @@ await map.Handle(
[Fact]
public void It_should_properly_pass_the_mapping_to_the_deleting_handler()
{
projection.ShouldBeEquivalentTo(new
projection.Should().BeEquivalentTo(new
{
Id = "c350E",
Category = (string) null,
Expand Down Expand Up @@ -528,7 +528,7 @@ await map.Handle(
[Fact]
public void It_should_properly_pass_the_mapping_to_the_deleting_handler()
{
projection.ShouldBeEquivalentTo(new
projection.Should().BeEquivalentTo(new
{
Id = "c350E",
Category = (string)null,
Expand Down Expand Up @@ -629,7 +629,7 @@ await map.Handle(
[Fact]
public void It_should_not_invoke_any_handler()
{
projection.ShouldBeEquivalentTo(new
projection.Should().BeEquivalentTo(new
{
Id = "c350E",
Category = "Electrics",
Expand Down Expand Up @@ -680,7 +680,7 @@ await map.Handle(
[Fact]
public void It_should_invoke_the_right_handler()
{
projection.ShouldBeEquivalentTo(new
projection.Should().BeEquivalentTo(new
{
Id = "c350E",
Category = "Hybrids",
Expand Down Expand Up @@ -907,7 +907,7 @@ await map.Handle(
[Fact]
public void It_should_invoke_the_right_handler()
{
projection.ShouldBeEquivalentTo(new
projection.Should().BeEquivalentTo(new
{
Id = "c350E",
Category = "Hybrids",
Expand Down
21 changes: 15 additions & 6 deletions Tests/LiquidProjections.Specs/ProjectionStatsSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void When_multiple_properties_are_registered_under_the_same_name_it_shoul
var projectorStats = stats.GetForAllProjectors().Should().ContainSingle(s => s.ProjectorId == "myProjector").Subject;

projectorStats.GetProperties().Should().ContainKey("theName");
projectorStats.GetProperties()["theName"].ShouldBeEquivalentTo(new
projectorStats.GetProperties()["theName"].Should().BeEquivalentTo(new
{
Value = "anotherValue",
TimestampUtc = nowUtc
Expand Down Expand Up @@ -86,14 +86,14 @@ public void When_multiple_properties_are_registered_under_different_names_it_sho
var projectorStats = stats.GetForAllProjectors().Should().ContainSingle(s => s.ProjectorId == "myProjector").Subject;

projectorStats.GetProperties().Should().ContainKey("aName");
projectorStats.GetProperties()["aName"].ShouldBeEquivalentTo(new
projectorStats.GetProperties()["aName"].Should().BeEquivalentTo(new
{
Value = "aValue",
TimestampUtc = firstUtc
});

projectorStats.GetProperties().Should().ContainKey("anotherName");
projectorStats.GetProperties()["anotherName"].ShouldBeEquivalentTo(new
projectorStats.GetProperties()["anotherName"].Should().BeEquivalentTo(new
{
Value = "anotherValue",
TimestampUtc = nowUtc
Expand Down Expand Up @@ -122,7 +122,7 @@ public void When_multiple_events_are_registered_it_should_remember_their_timesta
// Assert
//-----------------------------------------------------------------------------------------------------------
var projectorStats = stats.GetForAllProjectors().Should().ContainSingle(s => s.ProjectorId == "myProjector").Subject;
projectorStats.GetEvents().ShouldAllBeEquivalentTo(new[]
projectorStats.GetEvents().Should().BeEquivalentTo(new[]
{
new
{
Expand Down Expand Up @@ -168,6 +168,8 @@ public void When_the_projector_runs_at_a_constant_speed_it_should_use_that_to_ca
//-----------------------------------------------------------------------------------------------------------
TimeSpan? eta = stats.GetTimeToReach("myProjector", 100000);

stats.GetSpeed("myProjector").Should().Be(transactionsPerSecond);

long secondsToComplete = (100000 - checkpoint) / transactionsPerSecond;

eta.Should().Be(TimeSpan.FromSeconds(secondsToComplete));
Expand Down Expand Up @@ -202,6 +204,7 @@ public void When_the_projector_runs_at_a_very_low_speed_it_should_still_calculat
//-----------------------------------------------------------------------------------------------------------
// Assert
//-----------------------------------------------------------------------------------------------------------

TimeSpan? eta = stats.GetTimeToReach("myProjector", 100000);

long secondsToComplete =((100000 - checkpoint) / transactionsPer5Seconds * 5);
Expand Down Expand Up @@ -242,6 +245,8 @@ public void When_the_projectors_speed_increases_it_should_favor_the_higher_speed
TimeSpan? eta = stats.GetTimeToReach("myProjector", checkpoint + 100000);

long weightedAveragePerSecond = 4550;
stats.GetSpeed("myProjector").Should().Be(weightedAveragePerSecond);


long secondsToComplete = 100000 / weightedAveragePerSecond;

Expand Down Expand Up @@ -281,6 +286,7 @@ public void When_the_projectors_speed_decreases_it_should_favor_the_lower_speed_
TimeSpan? eta = stats.GetTimeToReach("myProjector", checkpoint + 100000);

long weightedAveragePerSecond = 645;
stats.GetSpeed("myProjector").Should().Be(weightedAveragePerSecond);

long secondsToComplete = 100000 / weightedAveragePerSecond;

Expand Down Expand Up @@ -342,6 +348,7 @@ public void When_the_projector_runs_for_more_than_10_minutes_it_should_only_eval
long secondsToComplete = (long) (100000 / precalculatedWeightedAveragePerSecond);

eta.Should().Be(TimeSpan.FromSeconds(secondsToComplete));
stats.GetSpeed("myProjector").Should().BeApproximately(precalculatedWeightedAveragePerSecond, 1);
}

[Fact]
Expand All @@ -363,8 +370,8 @@ public void When_the_projector_is_ahead_of_the_requested_checkpoint_the_eta_shou
//-----------------------------------------------------------------------------------------------------------
// Assert
//-----------------------------------------------------------------------------------------------------------
TimeSpan? eta = stats.GetTimeToReach("myProjector", 5000);
eta.Should().Be(TimeSpan.Zero);
stats.GetSpeed("myProjector").Should().Be((10000-1000) / 120);
stats.GetTimeToReach("myProjector", 5000).Should().Be(TimeSpan.Zero);
}

[Fact]
Expand All @@ -384,6 +391,8 @@ public void When_the_projector_has_not_checked_in_yet_it_should_not_provide_an_e
// Assert
//-----------------------------------------------------------------------------------------------------------
eta.Should().NotHaveValue();

stats.GetSpeed("myProjector").Should().BeNull();
}
}
}
4 changes: 2 additions & 2 deletions Tests/LiquidProjections.Specs/ProjectorSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void Then_it_should_have_executed_the_custom_action()
[Fact]
public void Then_it_should_have_created_the_context()
{
context.ShouldBeEquivalentTo(new ProjectionContext
context.Should().BeEquivalentTo(new ProjectionContext
{
Checkpoint = 111,
TransactionId = "MyTransactionId",
Expand Down Expand Up @@ -276,7 +276,7 @@ public void Then_it_should_include_the_current_transaction_batch_into_the_projec
{
ProjectionExceptions.Should().ContainSingle()
.Which.Should().BeOfType<ProjectionException>()
.Which.TransactionBatch.Should().BeEquivalentTo(The<Transaction>());
.Which.TransactionBatch.Should().AllBeEquivalentTo(The<Transaction>());
}
}

Expand Down

0 comments on commit 33d29a1

Please sign in to comment.