Skip to content

Commit

Permalink
Add backoff impl
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 6, 2021
1 parent e300a63 commit a2a1c90
Showing 1 changed file with 47 additions and 11 deletions.
58 changes: 47 additions & 11 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,10 @@ module Scheduling =
/// Converts a count of Stopwatch ticks into a TimeSpan, dividing by ticksPerSecond to obtain seconds.
/// Zero or negative counts yield TimeSpan.Zero.
let timeSpanFromStopwatchTicks (ticks : int64) =
    if ticks > 0L then TimeSpan.FromSeconds(double ticks / ticksPerSecond)
    else TimeSpan.Zero
/// Translates a wall-clock cutoff into a value on the same timebase as Stopwatch.GetTimestamp(),
/// by scaling the interval between now and `dto` by ticksPerSecond and offsetting the current timestamp with it.
/// A cutoff in the past yields a value earlier than the current timestamp.
/// Intervals too large to represent saturate at the int64 bounds instead of wrapping.
let private dateTimeOffsetToTimeStamp (dto : DateTimeOffset) : int64 =
    // Sample both clocks together so the wall-clock delta can be applied to the stopwatch reading
    let now, nowTs = DateTimeOffset.UtcNow, System.Diagnostics.Stopwatch.GetTimestamp()
    let waitTicks = (dto - now).TotalSeconds * ticksPerSecond
    // A far-future cutoff (e.g. DateTimeOffset.MaxValue) can scale beyond the int64 range, and the result of an
    // out-of-range float -> int64 conversion is unspecified; saturate so 'indefinitely' stays in the future
    // NOTE(review): assumes nowTs is non-negative, as Stopwatch timestamps are in practice
    if waitTicks >= double (Int64.MaxValue - nowTs) then Int64.MaxValue
    elif waitTicks <= double Int64.MinValue then Int64.MinValue + nowTs
    else nowTs + int64 waitTicks
/// Per-stream tracking entry: a Stopwatch-based timestamp plus a count that is incremented in place
type private StreamState =
    {   ts : int64
        mutable count : int }
let private walkAges (state : Dictionary<_, _>) =
let now = System.Diagnostics.Stopwatch.GetTimestamp()
Expand Down Expand Up @@ -551,32 +555,55 @@ module Scheduling =
| true, v -> v.count <- v.count + 1
| false, _ -> state.Add(sn, { ts = startTs; count = 1 })
member _.State = walkAges state |> renderState
/// Maintains the set of streams that have been marked as backing off from processing until a cutoff timestamp
type private Waiting() =
    let state = Dictionary<FsCodec.StreamName, StreamState>() // NOTE ts is a cutoff time, not a start time here
    // Drops every entry whose cutoff is at or before the supplied timestamp
    // (keys are snapshotted into an array first, as the dictionary cannot be mutated while it is being enumerated)
    let prune cutoff =
        for sn in [| for kv in state do if kv.Value.ts <= cutoff then kv.Key |] do
            state.Remove sn |> ignore
    // Yields (cutoff - now, count) per tracked stream
    let walk now = if state.Count = 0 then Seq.empty else seq { for x in state.Values -> struct (x.ts - now, x.count) }
    /// Records that `sn` should not be dispatched until the Stopwatch timestamp `untilTs`.
    /// A repeated request for a stream that is already waiting replaces the cutoff and bumps its count
    /// (Dictionary.Add would throw an ArgumentException for a key that is already present)
    member _.HandleBackOff(sn, untilTs) =
        match state.TryGetValue sn with
        | true, v -> state.[sn] <- { ts = untilTs; count = v.count + 1 }
        | false, _ -> state.Add(sn, { ts = untilTs; count = 1 })
    /// Returns false while `sn` has a cutoff later than `ts`; otherwise true, discarding any elapsed entry for the stream
    member _.CanDispatch(sn, ts) =
        match state.TryGetValue sn with
        | true, { ts = until } when until > ts -> false
        | true, _ -> state.Remove sn |> ignore; true
        | false, _ -> true
    /// Prunes entries whose cutoff has passed as of the current Stopwatch timestamp, then summarizes the remainder via renderState
    member _.State : (int * int) * (TimeSpan * TimeSpan) =
        let now = System.Diagnostics.Stopwatch.GetTimestamp()
        prune now
        walk now |> renderState
/// Collates all state and reactions to manage the list of busy streams based on callbacks/notifications from the Dispatcher
/// Four groups are tracked and reported: "active", "failing", "stalled" (held in `stuck`) and "waiting"
type Monitor() =
// NOTE(review): the next two bindings look like the before/after pair of a single changed diff line;
// only the second one defines `waiting`, which the members below rely on - confirm against the actual file
let active, failing, stuck = Active(), Repeating(), Repeating()
let active, failing, stuck, waiting = Active(), Repeating(), Repeating(), Waiting()
// Writes one summary line for a group; note the template renders the newest age before the oldest
let emit (log : ILogger) state (streams, attempts) (oldest : TimeSpan, newest : TimeSpan) =
log.Information(" {state} {streams} for {newest:n1}-{oldest:n1}s, {attempts} attempts",
state, streams, newest.TotalSeconds, oldest.TotalSeconds, attempts)
// Whether the stream may be dispatched at timestamp `ts`; the decision is delegated to the waiting list
member _.CanDispatch(sn, ts) =
waiting.CanDispatch(sn, ts)
// Registers that processing of the stream has started at timestamp `ts`
member _.HandleStarted(sn, ts) =
active.HandleStarted(sn, ts)
// Takes the stream's start timestamp from the active set, then notifies the failing and stuck trackers:
// `failing` is passed `not succeeded`; `stuck` is passed `succeeded && not progressed`
member _.HandleResult(sn, succeeded, progressed) =
let startTs = active.TakeFinished(sn)
failing.HandleResult(sn, not succeeded, startTs)
stuck.HandleResult(sn, succeeded && not progressed, startTs)
// Registers a backoff for the stream; `untilTs` is converted via dateTimeOffsetToTimeStamp before being handed to the waiting list
member _.HandleBackoff(sn, untilTs) =
waiting.HandleBackOff(sn, dateTimeOffsetToTimeStamp untilTs)
// Logs a summary line for each group, skipping groups that currently have no streams
member _.DumpState(log : ILogger) =
let inline dump state (streams, attempts) ages =
if streams <> 0 then
emit log state (streams, attempts) ages
active.State ||> dump "active"
failing.State ||> dump "failing"
stuck.State ||> dump "stalled"
waiting.State ||> dump "waiting"
// Logs a summary line for every group (including empty ones), using a logger passed through Log.metric
// with a StreamsBusy metric built from the group's values
member _.EmitMetrics(log : ILogger) =
let inline report state (streams, attempts) (oldest : TimeSpan, newest : TimeSpan) =
let m = Log.Metric.StreamsBusy (state, streams, oldest.TotalSeconds, newest.TotalSeconds)
emit (log |> Log.metric m) state (streams, attempts) (oldest, newest)
active.State ||> report "active"
failing.State ||> report "failing"
stuck.State ||> report "stalled"
waiting.State ||> report "waiting"

/// Gathers stats pertaining to the core projection/ingestion activity
[<AbstractClass>]
Expand Down Expand Up @@ -625,6 +652,14 @@ module Scheduling =
if stucksDue () then
mon.EmitMetrics metricsLog

/// Requests that the given stream be held back until the specified time; the default implementation relays the request to mon.HandleBackoff
abstract BackoffUntil : stream : FsCodec.StreamName * until : DateTimeOffset -> unit
default _.BackoffUntil(stream, until) = mon.HandleBackoff(stream, until)

/// Decides whether the given stream may be dispatched at the supplied Stopwatch timestamp.
/// Overriding this enables one to configure backoffs for streams that are failing; by default the answer comes from mon.CanDispatch
abstract CanDispatch : stream : FsCodec.StreamName * stopwatchTicks : int64 -> bool
default _.CanDispatch(stream, stopwatchTicks) =
    mon.CanDispatch(stream, stopwatchTicks)

abstract MarkStarted : stream : FsCodec.StreamName * stopwatchTicks : int64 -> unit
default _.MarkStarted(stream, stopwatchTicks) =
mon.HandleStarted(stream, stopwatchTicks)
Expand Down Expand Up @@ -685,27 +720,28 @@ module Scheduling =
// Underlying dispatcher, constructed with maxDop; its items are (TimeSpan, stream name, bool, 'R) tuples
// NOTE(review): maxDop presumably caps the degree of parallelism and the bool presumably flags an outcome - confirm against DopDispatcher
let inner = DopDispatcher<TimeSpan * FsCodec.StreamName * bool * 'R>(maxDop)

// On each iteration, we try to fill the in-flight queue, taking the oldest and/or heaviest streams first
// Returns (hasCapacity, dispatched): whether the inner dispatcher can still accept work, and whether anything was queued in this pass
// NOTE(review): this span appears to interleave the removed and added lines of a diff - two signatures, and two variants of
// the loop body (the second is guarded by canDispatchAt); confirm which lines are live before relying on it
let tryFillDispatcher (pending, markStarted) project markBusy =
let tryFillDispatcher (pending, canDispatchAt, markStarted) project markBusy =
let mutable hasCapacity, dispatched = inner.HasCapacity, false
if hasCapacity then
let potential : seq<DispatchItem<byte[]>> = pending ()
// NOTE(review): the enumerator is bound with `let`, so nothing disposes it here - consider `use`
let xs = potential.GetEnumerator()
// A single timestamp is taken per pass and shared by every item considered in it
let ts = System.Diagnostics.Stopwatch.GetTimestamp()
// Stops at the first rejected TryAdd, as that clears hasCapacity
while xs.MoveNext() && hasCapacity do
let item = xs.Current
// Loop body variant without the canDispatchAt check
let succeeded = inner.TryAdd(project ts item)
if succeeded then
markBusy item.stream
markStarted (item.stream, ts)
hasCapacity <- succeeded
dispatched <- dispatched || succeeded // if we added any request, we'll skip sleeping
// Loop body variant that only attempts items canDispatchAt accepts at `ts`; items it rejects are passed over
if canDispatchAt (item.stream, ts) then
let succeeded = inner.TryAdd(project ts item)
if succeeded then
markBusy item.stream
markStarted (item.stream, ts)
hasCapacity <- succeeded
dispatched <- dispatched || succeeded // if we added any request, we'll skip sleeping
hasCapacity, dispatched

/// Delegates to the inner dispatcher's Pump()
member _.Pump() =
    inner.Pump()
/// Re-exposes the inner dispatcher's Result event
[<CLIEvent>]
member _.Result = inner.Result
/// Re-exposes the inner dispatcher's State
member _.State =
    inner.State
// Public entry point that forwards its arguments to tryFillDispatcher unchanged
// NOTE(review): two variants appear here, apparently the before/after of a diff; the second additionally threads
// canDispatchAt through - confirm which one is live
member _.TryReplenish (pending, markStarted) project markStreamBusy =
tryFillDispatcher (pending, markStarted) project markStreamBusy
member _.TryReplenish (pending, canDispatchAt, markStarted) project markStreamBusy =
tryFillDispatcher (pending, canDispatchAt, markStarted) project markStreamBusy

/// Defines interface between Scheduler (which owns the pending work) and the Dispatcher which periodically selects work to commence based on a policy
type IDispatcher<'P, 'R, 'E> =
Expand Down Expand Up @@ -743,7 +779,7 @@ module Scheduling =

interface IDispatcher<'P, 'R, 'E> with
// Forwards to the inner dispatcher's TryReplenish, supplying callbacks taken from `stats`
override _.TryReplenish pending markStreamBusy =
// NOTE(review): the next two lines look like the before/after of a single diff line; the second also passes stats.CanDispatch
inner.TryReplenish (pending, stats.MarkStarted) project markStreamBusy
inner.TryReplenish (pending, stats.CanDispatch, stats.MarkStarted) project markStreamBusy
// Re-exposes the inner dispatcher's Result event
[<CLIEvent>] override _.Result = inner.Result
override _.InterpretProgress(streams : StreamStates<_>, stream : FsCodec.StreamName, res : Choice<'P, 'E>) =
interpretProgress streams stream res
Expand Down

0 comments on commit a2a1c90

Please sign in to comment.