Skip to content


Add backoff impl
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Nov 30, 2021
1 parent 9b98bfc commit 372c82f
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src/Propulsion.Cosmos/CosmosPruner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ module Pruner =
totalDeferred <- totalDeferred + deferred

/// Used to render exceptions that don't fall into the rate-limiting or timed-out categories
override _.HandleExn(log, exn) =
override _.HandleExn(log, _stream, exn) =
match classify exn with
| ExceptionKind.RateLimited | ExceptionKind.TimedOut ->
() // Outcomes are already included in the statistics - no logging is warranted
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/CosmosStorePruner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ module Pruner =
totalDeferred <- totalDeferred + deferred

/// Used to render exceptions that don't fall into the rate-limiting or timed-out categories
override _.HandleExn(log, exn) =
override _.HandleExn(log, _stream, exn) =
match classify exn with
| ExceptionKind.RateLimited | ExceptionKind.TimedOut ->
() // Outcomes are already included in the statistics - no logging is warranted
Expand Down
62 changes: 49 additions & 13 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,10 @@ module Scheduling =
let timeSpanFromStopwatchTicks = function
| ticks when ticks > 0L -> TimeSpan.FromSeconds(double ticks / ticksPerSecond)
| _ -> TimeSpan.Zero
let private dateTimeOffsetToTimeStamp (dto : DateTimeOffset) : int64 =
let now, nowTs = DateTimeOffset.UtcNow, System.Diagnostics.Stopwatch.GetTimestamp()
let totalWaitS = (dto - now).TotalSeconds
nowTs + int64 (totalWaitS * ticksPerSecond)
type private StreamState = { ts : int64; mutable count : int }
let private walkAges (state : Dictionary<_, _>) =
let now = System.Diagnostics.Stopwatch.GetTimestamp()
Expand Down Expand Up @@ -548,31 +552,54 @@ module Scheduling =
| true, v -> v.count <- v.count + 1
| false, _ -> state.Add(sn, { ts = startTs; count = 1 })
member _.State = walkAges state |> renderState
/// Maintains a list of streams that have been marked to backing off on processing
type private Waiting() =
let state = Dictionary<FsCodec.StreamName, StreamState>() // NOTE ts is a cutoff time, not a start time here
let prune cutoff =
for sn in [| for kv in state do if kv.Value.ts <= cutoff then kv.Key |] do
state.Remove sn |> ignore
let walk now = if state.Count = 0 then Seq.empty else seq { for x in state.Values -> struct (x.ts - now, x.count) }
member _.HandleBackOff(sn, untilTs) = state.Add(sn, { ts = untilTs; count = 1 })
member _.CanDispatch(sn, ts) =
match state.TryGetValue sn with
| true, { ts = until } when until > ts -> false
| true, _ -> state.Remove sn |> ignore; true
| false, _ -> true
member _.State : (int * int) * (TimeSpan * TimeSpan) =
let now = System.Diagnostics.Stopwatch.GetTimestamp()
prune now
walk now |> renderState
type Monitor() =
let active, failing, stuck = Active(), Repeating(), Repeating()
let active, failing, stuck, waiting = Active(), Repeating(), Repeating(), Waiting()
let emit (log : ILogger) state (streams, attempts) (oldest : TimeSpan, newest : TimeSpan) =
log.Information(" {state} {streams} for {newest:n1}-{oldest:n1}s, {attempts} attempts",
state, streams, newest.TotalSeconds, oldest.TotalSeconds, attempts)
member _.CanDispatch(sn, ts) =
waiting.CanDispatch(sn, ts)
member _.HandleStarted(sn, ts) =
active.HandleStarted(sn, ts)
member _.HandleResult(sn, succeeded, progressed) =
let startTs = active.TakeFinished(sn)
failing.HandleResult(sn, not succeeded, startTs)
stuck.HandleResult(sn, succeeded && not progressed, startTs)
member _.HandleBackoff(sn, untilTs) =
waiting.HandleBackOff(sn, dateTimeOffsetToTimeStamp untilTs)
member _.DumpState(log : ILogger) =
let inline dump state (streams, attempts) ages =
if streams <> 0 then
emit log state (streams, attempts) ages
active.State ||> dump "active"
failing.State ||> dump "failing"
stuck.State ||> dump "stalled"
waiting.State ||> dump "waiting"
member _.EmitMetrics(log : ILogger) =
let inline report state (streams, attempts) (oldest : TimeSpan, newest : TimeSpan) =
let m = Log.Metric.Stuck (state, streams, oldest.TotalSeconds, newest.TotalSeconds)
emit (log |> Log.metric m) state (streams, attempts) (oldest, newest)
active.State ||> report "active"
failing.State ||> report "failing"
stuck.State ||> report "stalled"
waiting.State ||> report "waiting"

/// Gathers stats pertaining to the core projection/ingestion activity
Expand Down Expand Up @@ -621,6 +648,14 @@ module Scheduling =
if stucksDue () then
mon.EmitMetrics metricsLog

abstract BackoffUntil : stream : FsCodec.StreamName * until : DateTimeOffset -> unit
default _.BackoffUntil(stream, until) =
mon.HandleBackoff(stream, until)

/// Enables one to configure backoffs for streams that are failing
abstract CanDispatch : stream : FsCodec.StreamName * stopwatchTicks : int64 -> bool
default _.CanDispatch(stream, stopwatchTicks) = mon.CanDispatch(stream, stopwatchTicks)

abstract MarkStarted : stream : FsCodec.StreamName * stopwatchTicks : int64 -> unit
default _.MarkStarted(stream, stopwatchTicks) =
mon.HandleStarted(stream, stopwatchTicks)
Expand Down Expand Up @@ -681,27 +716,28 @@ module Scheduling =
let inner = DopDispatcher<TimeSpan * FsCodec.StreamName * bool * 'R>(maxDop)

// On each iteration, we try to fill the in-flight queue, taking the oldest and/or heaviest streams first
let tryFillDispatcher (pending, markStarted) project markBusy =
let tryFillDispatcher (pending, canDispatchAt, markStarted) project markBusy =
let mutable hasCapacity, dispatched = inner.HasCapacity, false
if hasCapacity then
let potential : seq<DispatchItem<byte[]>> = pending ()
let xs = potential.GetEnumerator()
let ts = System.Diagnostics.Stopwatch.GetTimestamp()
while xs.MoveNext() && hasCapacity do
let item = xs.Current
let succeeded = inner.TryAdd(project ts item)
if succeeded then
markStarted (, ts)
hasCapacity <- succeeded
dispatched <- dispatched || succeeded // if we added any request, we'll skip sleeping
if canDispatchAt (, ts) then
let succeeded = inner.TryAdd(project ts item)
if succeeded then
markStarted (, ts)
hasCapacity <- succeeded
dispatched <- dispatched || succeeded // if we added any request, we'll skip sleeping
hasCapacity, dispatched

member _.Pump() = inner.Pump()
[<CLIEvent>] member _.Result = inner.Result
member _.State = inner.State
member _.TryReplenish (pending, markStarted) project markStreamBusy =
tryFillDispatcher (pending, markStarted) project markStreamBusy
member _.TryReplenish (pending, canDispatchAt, markStarted) project markStreamBusy =
tryFillDispatcher (pending, canDispatchAt, markStarted) project markStreamBusy

/// Defines interface between Scheduler (which owns the pending work) and the Dispatcher which periodically selects work to commence based on a policy
type IDispatcher<'P, 'R, 'E> =
Expand Down Expand Up @@ -739,7 +775,7 @@ module Scheduling =

interface IDispatcher<'P, 'R, 'E> with
override _.TryReplenish pending markStreamBusy =
inner.TryReplenish (pending, stats.MarkStarted) project markStreamBusy
inner.TryReplenish (pending, stats.CanDispatch, stats.MarkStarted) project markStreamBusy
[<CLIEvent>] override _.Result = inner.Result
override _.InterpretProgress(streams : StreamStates<_>, stream : FsCodec.StreamName, res : Choice<'P, 'E>) =
interpretProgress streams stream res
Expand Down Expand Up @@ -992,9 +1028,9 @@ type Stats<'Outcome>(log : ILogger, statsInterval, statesInterval) =
exnEvents <- exnEvents + es
exnBytes <- exnBytes + int64 bs
resultExnOther <- resultExnOther + 1
this.HandleExn(log.ForContext("stream", stream).ForContext("events", es).ForContext("duration", duration), exn)
this.HandleExn(log.ForContext("stream", stream).ForContext("events", es).ForContext("duration", duration), stream, exn)
abstract member HandleOk : outcome : 'Outcome -> unit
abstract member HandleExn : log : ILogger * exn : exn -> unit
abstract member HandleExn : log : ILogger * streamName : FsCodec.StreamName * exn : exn -> unit

module Projector =

Expand Down
2 changes: 1 addition & 1 deletion tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ module Helpers =
inherit Propulsion.Streams.Stats<unit>(log, statsInterval, stateInterval)

override _.HandleOk(()) = ()
override _.HandleExn(log, exn) = log.Information(exn, "Unhandled")
override _.HandleExn(log, _stream, exn) = log.Information(exn, "Unhandled")

let runConsumersBatch log (config : KafkaConsumerConfig) (numConsumers : int) (timeout : TimeSpan option) (handler : ConsumerCallback) = async {
let mkConsumer (consumerId : int) = async {
Expand Down
2 changes: 1 addition & 1 deletion tools/Propulsion.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ type Stats(log, statsInterval, statesInterval) =
inherit Propulsion.Streams.Stats<unit>(log, statsInterval=statsInterval, statesInterval=statesInterval)
member val StatsInterval = statsInterval
override _.HandleOk(_log) = ()
override _.HandleExn(_log, _exn) = ()
override _.HandleExn(_log, _stream, _exn) = ()

let main argv =
Expand Down

0 comments on commit 372c82f

Please sign in to comment.