diff --git a/equinox-sync/Sync/Program.fs b/equinox-sync/Sync/Program.fs index cb2e047c1..e3ece63a8 100644 --- a/equinox-sync/Sync/Program.fs +++ b/equinox-sync/Sync/Program.fs @@ -253,7 +253,8 @@ module Ingester = let inline arrayBytes (x:byte[]) = if x = null then 0 else x.Length type [] StreamState = { read: int64 option; write: int64 option; isMalformed : bool; queue: Span[] } with - member __.IsHead = match Array.tryHead __.queue with Some x -> x.pos = defaultArg __.write 0L | None -> false + /// Determines whether the head is ready to write (either write position is unknown, or matches) + member __.IsHead = Array.tryHead __.queue |> Option.exists (fun x -> __.write |> Option.forall (fun w -> w = x.pos)) member __.IsReady = __.queue <> null && not __.isMalformed && __.IsHead member __.Size = if __.queue = null then 0 @@ -279,6 +280,17 @@ module Ingester = let items = seq { if s1.queue <> null then yield! s1.queue; if s2.queue <> null then yield! s2.queue } { read = optionCombine max s1.read s2.read; write = writePos; isMalformed = s1.isMalformed || s2.isMalformed; queue = Span.merge (defaultArg writePos 0L) items} + /// Gathers stats relating to how many items of a given category have been observed + type CatStats() = + let cats = System.Collections.Generic.Dictionary() + member __.Ingest cat = + match cats.TryGetValue cat with + | true, catCount -> cats.[cat] <- catCount + 1 + | false, _ -> cats.[cat] <- 1 + member __.Any = cats.Count <> 0 + member __.Clear() = cats.Clear() + member __.StatsDescending = cats |> Seq.map (|KeyValue|) |> Seq.sortByDescending snd + type StreamStates() = let states = System.Collections.Generic.Dictionary() let dirty = System.Collections.Generic.Queue() @@ -319,29 +331,30 @@ module Ingester = if not state.IsReady then None else - match state.queue |> Array.tryHead with - | None -> None - | Some x -> + let x = state.queue |> Array.head let mutable bytesBudget = cosmosPayloadLimit let mutable count = 0 let max2MbMax1000EventsMax10EventsFirstTranche (y : Equinox.Codec.IEvent) = bytesBudget <- bytesBudget - cosmosPayloadBytes y count <- count + 1 - count < (if x.pos = 0L then 10 else 1000) && (bytesBudget >= 0 || count = 1) + // Reduce the item count when we don't yet know the write position + count < (if Option.isNone state.write then 10 else 1000) && (bytesBudget >= 0 || count = 1) Some { stream = stream; span = { pos = x.pos; events = x.events |> Array.takeWhile max2MbMax1000EventsMax10EventsFirstTranche } } member __.Dump() = let mutable synced, ready, waiting, malformed = 0, 0, 0, 0 let mutable readyB, waitingB, malformedB = 0L, 0L, 0L - for x in states do - match int64 x.Value.Size with + let waitCats = CatStats() + for KeyValue (stream,state) in states do + match int64 state.Size with | 0L -> synced <- synced + 1 - | sz when x.Value.isMalformed -> malformed <- malformed + 1; malformedB <- malformedB + sz - | sz when x.Value.IsReady -> ready <- ready + 1; readyB <- readyB + sz - | sz -> waiting <- waiting + 1; waitingB <- waitingB + sz + | sz when state.isMalformed -> malformed <- malformed + 1; malformedB <- malformedB + sz + | sz when state.IsReady -> ready <- ready + 1; readyB <- readyB + sz + | sz -> waitCats.Ingest(category stream); waiting <- waiting + 1; waitingB <- waitingB + sz let mb x = x / 1024L / 1024L Log.Warning("Queued {dirty} Ready {ready}/{readyMb}MB Waiting {waiting}/{waitingMb}MB Malformed {malformed}/{malformedMb}MB Synced {synced}", dirty.Count, ready, mb readyB, waiting, mb waitingB, malformed, mb malformedB, synced) + if waitCats.Any then Log.Warning("Waiting {waitCats}", waitCats.StatsDescending) type Queue(log : Serilog.ILogger, writer : Writer, cancellationToken: CancellationToken, readerQueueLen, ?interval) = let intervalMs = let t = defaultArg interval (TimeSpan.FromMinutes 1.) in t.TotalMilliseconds |> int64 let states = StreamStates() @@ -355,7 +368,7 @@ module Ingester = let mutable pendingWriterAdd = None let mutable bytesPended = 0L let resultsHandled, ingestionsHandled, workPended, eventsPended = ref 0, ref 0, ref 0, ref 0 - let badCats = System.Collections.Generic.Dictionary() + let badCats = CatStats() let progressTimer = Stopwatch.StartNew() while not cancellationToken.IsCancellationRequested do let mutable moreResults = true @@ -365,10 +378,7 @@ module Ingester = incr resultsHandled match states.HandleWriteResult res with | None -> res.WriteTo log - | Some cat -> - match badCats.TryGetValue cat with - | true, catCount -> badCats.[cat] <- catCount + 1 - | false, _ -> badCats.[cat] <- 1 + | Some cat -> badCats.Ingest cat | false, _ -> moreResults <- false let mutable t = Unchecked.defaultof<_> let mutable toIngest = 4096 * 5 @@ -405,8 +415,8 @@ module Ingester = progressTimer.Restart() Log.Warning("Ingested {ingestions}; Sent {queued} req {events} events; Completed {completed} reqs; Egress {gb:n3}GB", !ingestionsHandled, !workPended, !eventsPended,!resultsHandled, float bytesPended / 1024. / 1024. / 1024.) - Log.Error("Malformed {badCats}", badCats |> Seq.map (|KeyValue|) |> Seq.sortByDescending snd) - ingestionsHandled := 0; workPended := 0; eventsPended := 0; resultsHandled := 0; badCats.Clear() + if badCats.Any then Log.Error("Malformed {badCats}", badCats.StatsDescending); badCats.Clear() + ingestionsHandled := 0; workPended := 0; eventsPended := 0; resultsHandled := 0 states.Dump() /// Manages establishing of the writer 'threads' - can be Stop()ped explicitly and/or will stop when caller does @@ -458,11 +468,16 @@ type SliceStatsBuffer(?interval) = batchBytes member __.DumpIfIntervalExpired() = if accStart.ElapsedMilliseconds > intervalMs then - Log.Warning("Cats MB/cat/count {@cats}", - recentCats - |> Seq.sortByDescending (fun x -> snd x.Value) - |> Seq.truncate 10 - |> Seq.map (fun (KeyValue (s,(c,b))) -> b/1024/1024, s, c)) + let log = function + | [||] -> () + | xs -> + xs + |> Seq.sortByDescending (fun (KeyValue (_,(_,b))) -> b) + |> Seq.truncate 10 + |> Seq.map (fun (KeyValue (s,(c,b))) -> b/1024/1024, s, c) + |> fun rendered -> Log.Warning("Processed {@cats} (MB/cat/count)", rendered) + recentCats |> Seq.where (fun x -> x.Key.StartsWith '$' |> not) |> Array.ofSeq |> log + recentCats |> Seq.where (fun x -> x.Key.StartsWith '$') |> Array.ofSeq |> log recentCats.Clear() accStart.Restart()