Skip to content

Commit

Permalink
Add Waiting stats by category
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 15, 2019
1 parent c2de0ff commit cebc41f
Showing 1 changed file with 38 additions and 23 deletions.
61 changes: 38 additions & 23 deletions equinox-sync/Sync/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,9 @@ module Ingester =
let inline arrayBytes (x:byte[]) = if x = null then 0 else x.Length

type [<NoComparison>] StreamState = { read: int64 option; write: int64 option; isMalformed : bool; queue: Span[] } with
member __.IsHead = match Array.tryHead __.queue with Some x -> x.pos = defaultArg __.write 0L | None -> false
member __.IsReady = __.queue <> null && not __.isMalformed && __.IsHead
/// Determines whether the head is ready to write (either write position is unknown, or matches)
member __.IsHeady = Array.tryHead __.queue |> Option.exists (fun x -> __.write |> Option.forall (fun w -> w = x.pos))
member __.IsReady = __.queue <> null && not __.isMalformed && __.IsHeady
member __.Size =
if __.queue = null then 0
else __.queue |> Seq.collect (fun x -> x.events) |> Seq.sumBy (fun x -> arrayBytes x.Data + arrayBytes x.Meta + x.EventType.Length*2 + 16)
Expand All @@ -279,6 +280,17 @@ module Ingester =
let items = seq { if s1.queue <> null then yield! s1.queue; if s2.queue <> null then yield! s2.queue }
{ read = optionCombine max s1.read s2.read; write = writePos; isMalformed = s1.isMalformed || s2.isMalformed; queue = Span.merge (defaultArg writePos 0L) items}

/// Gathers stats relating to how many items of a given category have been observed
type CatStats() =
let cats = System.Collections.Generic.Dictionary<string,int>()
member __.Ingest cat =
match cats.TryGetValue cat with
| true, catCount -> cats.[cat] <- catCount + 1
| false, _ -> cats.[cat] <- 1
member __.Any = cats.Count <> 0
member __.Clear() = cats.Clear()
member __.StatsDescending = cats |> Seq.map (|KeyValue|) |> Seq.sortByDescending snd

type StreamStates() =
let states = System.Collections.Generic.Dictionary<string, StreamState>()
let dirty = System.Collections.Generic.Queue()
Expand Down Expand Up @@ -319,29 +331,30 @@ module Ingester =

if not state.IsReady then None else

match state.queue |> Array.tryHead with
| None -> None
| Some x ->
let x = state.queue |> Array.head

let mutable bytesBudget = cosmosPayloadLimit
let mutable count = 0
let max2MbMax1000EventsMax10EventsFirstTranche (y : Equinox.Codec.IEvent<byte[]>) =
bytesBudget <- bytesBudget - cosmosPayloadBytes y
count <- count + 1
count < (if x.pos = 0L then 10 else 1000) && (bytesBudget >= 0 || count = 1)
// Reduce the item count when we don't yet know the write position
count < (if Option.isNone state.write then 10 else 1000) && (bytesBudget >= 0 || count = 1)
Some { stream = stream; span = { pos = x.pos; events = x.events |> Array.takeWhile max2MbMax1000EventsMax10EventsFirstTranche } }
member __.Dump() =
let mutable synced, ready, waiting, malformed = 0, 0, 0, 0
let mutable readyB, waitingB, malformedB = 0L, 0L, 0L
for x in states do
match int64 x.Value.Size with
let waitCats = CatStats()
for KeyValue (stream,state) in states do
match int64 state.Size with
| 0L -> synced <- synced + 1
| sz when x.Value.isMalformed -> malformed <- malformed + 1; malformedB <- malformedB + sz
| sz when x.Value.IsReady -> ready <- ready + 1; readyB <- readyB + sz
| sz -> waiting <- waiting + 1; waitingB <- waitingB + sz
| sz when state.isMalformed -> malformed <- malformed + 1; malformedB <- malformedB + sz
| sz when state.IsReady -> ready <- ready + 1; readyB <- readyB + sz
| sz -> waitCats.Ingest(category stream); waiting <- waiting + 1; waitingB <- waitingB + sz
let mb x = x / 1024L / 1024L
Log.Warning("Queued {dirty} Ready {ready}/{readyMb}MB Waiting {waiting}/{waitingMb}MB Malformed {malformed}/{malformedMb}MB Synced {synced}",
dirty.Count, ready, mb readyB, waiting, mb waitingB, malformed, mb malformedB, synced)
if waitCats.Any then Log.Warning("Waiting {waitCats}", waitCats.StatsDescending)
type Queue(log : Serilog.ILogger, writer : Writer, cancellationToken: CancellationToken, readerQueueLen, ?interval) =
let intervalMs = let t = defaultArg interval (TimeSpan.FromMinutes 1.) in t.TotalMilliseconds |> int64
let states = StreamStates()
Expand All @@ -355,7 +368,7 @@ module Ingester =
let mutable pendingWriterAdd = None
let mutable bytesPended = 0L
let resultsHandled, ingestionsHandled, workPended, eventsPended = ref 0, ref 0, ref 0, ref 0
let badCats = System.Collections.Generic.Dictionary<string,int>()
let badCats = CatStats()
let progressTimer = Stopwatch.StartNew()
while not cancellationToken.IsCancellationRequested do
let mutable moreResults = true
Expand All @@ -365,10 +378,7 @@ module Ingester =
incr resultsHandled
match states.HandleWriteResult res with
| None -> res.WriteTo log
| Some cat ->
match badCats.TryGetValue cat with
| true, catCount -> badCats.[cat] <- catCount + 1
| false, _ -> badCats.[cat] <- 1
| Some cat -> badCats.Ingest cat
| false, _ -> moreResults <- false
let mutable t = Unchecked.defaultof<_>
let mutable toIngest = 4096 * 5
Expand Down Expand Up @@ -405,8 +415,8 @@ module Ingester =
progressTimer.Restart()
Log.Warning("Ingested {ingestions}; Sent {queued} req {events} events; Completed {completed} reqs; Egress {gb:n3}GB",
!ingestionsHandled, !workPended, !eventsPended,!resultsHandled, float bytesPended / 1024. / 1024. / 1024.)
Log.Error("Malformed {badCats}", badCats |> Seq.map (|KeyValue|) |> Seq.sortByDescending snd)
ingestionsHandled := 0; workPended := 0; eventsPended := 0; resultsHandled := 0; badCats.Clear()
if badCats.Any then Log.Error("Malformed {badCats}", badCats.StatsDescending); badCats.Clear()
ingestionsHandled := 0; workPended := 0; eventsPended := 0; resultsHandled := 0
states.Dump()

/// Manages establishing of the writer 'threads' - can be Stop()ped explicitly and/or will stop when caller does
Expand Down Expand Up @@ -458,11 +468,16 @@ type SliceStatsBuffer(?interval) =
batchBytes
member __.DumpIfIntervalExpired() =
if accStart.ElapsedMilliseconds > intervalMs then
Log.Warning("Cats MB/cat/count {@cats}",
recentCats
|> Seq.sortByDescending (fun x -> snd x.Value)
|> Seq.truncate 10
|> Seq.map (fun (KeyValue (s,(c,b))) -> b/1024/1024, s, c))
let log = function
| [||] -> ()
| xs ->
xs
|> Seq.sortByDescending (fun (KeyValue (_,(_,b))) -> b)
|> Seq.truncate 10
|> Seq.map (fun (KeyValue (s,(c,b))) -> b/1024/1024, s, c)
|> fun rendered -> Log.Warning("Processed {@cats} (MB/cat/count)", rendered)
recentCats |> Seq.where (fun x -> x.Key.StartsWith '$' |> not) |> Array.ofSeq |> log
recentCats |> Seq.where (fun x -> x.Key.StartsWith '$') |> Array.ofSeq |> log
recentCats.Clear()
accStart.Restart()

Expand Down

0 comments on commit cebc41f

Please sign in to comment.