From 6b554e3ee929bda65f03bb6d19c2d71449b3c7aa Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Mon, 25 Mar 2019 12:40:54 +0000 Subject: [PATCH] Add MinBatchSize cmdline parameter --- equinox-sync/Sync/Program.fs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/equinox-sync/Sync/Program.fs b/equinox-sync/Sync/Program.fs index a9847f12a..c1b50a43b 100644 --- a/equinox-sync/Sync/Program.fs +++ b/equinox-sync/Sync/Program.fs @@ -9,7 +9,7 @@ open System.Diagnostics open System.Threading type StartPos = Absolute of int64 | Chunk of int | Percentage of float | Start | Ignore -type ReaderSpec = { start: StartPos; stripes: int; batchSize: int; streams: string list; tailInterval: TimeSpan option } +type ReaderSpec = { start: StartPos; streams: string list; tailInterval: TimeSpan option; stripes: int; batchSize: int; minBatchSize: int } let mb x = float x / 1024. / 1024. module CmdParser = @@ -117,6 +117,7 @@ module CmdParser = [] type Arguments = | [] BatchSize of int + | [] MinBatchSize of int | [] Verbose | [] VerboseConsole | [] LocalSeq @@ -132,6 +133,7 @@ module CmdParser = member a.Usage = match a with | BatchSize _ -> "maximum item count to request from feed. Default: 4096" + | MinBatchSize _ -> "minimum item count to drop down to in reaction to read failures. Default: 512" | Verbose -> "request Verbose Logging. Default: off" | VerboseConsole -> "request Verbose Console Logging. Default: off" | LocalSeq -> "configures writing to a local Seq endpoint at http://localhost:5341, see https://getseq.net" @@ -148,11 +150,12 @@ module CmdParser = member __.Verbose = args.Contains Verbose member __.ConsoleMinLevel = if args.Contains VerboseConsole then Serilog.Events.LogEventLevel.Information else Serilog.Events.LogEventLevel.Warning member __.MaybeSeqEndpoint = if args.Contains LocalSeq then Some "http://localhost:5341" else None - member __.BatchSize = args.GetResult(BatchSize,4096) + member __.StartingBatchSize = args.GetResult(BatchSize,4096) + member __.MinBatchSize = args.GetResult(MinBatchSize,512) member __.Stripes = args.GetResult(Stripes,1) member __.TailInterval = match args.TryGetResult Tail with Some s -> TimeSpan.FromSeconds s |> Some | None -> None member x.BuildFeedParams() : ReaderSpec = - Log.Warning("Processing in batches of {batchSize} with {stripes} stripes", x.BatchSize, x.Stripes) + Log.Warning("Processing in batches of {batchSize} (min {minBatchSize}) with {stripes} stripes", x.StartingBatchSize, x.MinBatchSize, x.Stripes) let startPos = match args.TryGetResult Offset, args.TryGetResult Chunk, args.TryGetResult Percent, args.Contains All with | Some p, _, _, _ -> Log.Warning("Processing will commence at $all Position {p}", p); Absolute p @@ -163,7 +166,8 @@ module CmdParser = match x.TailInterval with | Some interval -> Log.Warning("Following tail at {seconds}s interval", interval.TotalSeconds) | None -> Log.Warning "Not following tail" - { start = startPos; stripes = x.Stripes; batchSize = x.BatchSize; streams = args.GetResults Stream; tailInterval = x.TailInterval } + { start = startPos; streams = args.GetResults Stream; tailInterval = x.TailInterval + batchSize = x.StartingBatchSize; minBatchSize = x.MinBatchSize; stripes = x.Stripes } /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args let parse argv : Parameters = @@ -566,9 +570,9 @@ module EventStoreReader = usedEvents <- usedEvents + 1 postBatch { stream = stream; span = { index = pos; events = [| item |]}} if not(ignoreEmptyEof = Some true && batchEvents = 0 && not currentSlice.IsEndOfStream) then // ES doesnt report EOF on the first call :( - Log.Warning("Read {pos,10} {pct:p1} {ft:n3}s {count,4} {mb:n1}MB {categories,3}c {streams,4}s {events,4}e Post {pt:n0}ms", - range.Current.CommitPosition, range.PositionAsRangePercentage, (let e = sw.Elapsed in e.TotalSeconds), batchEvents, mb batchBytes, - usedCats, usedStreams, usedEvents, postSw.ElapsedMilliseconds) + Log.Warning("Read {pos,10} {pct:p1} {ft:n3}s {mb:n1}MB {count,4} {categories,3}c {streams,4}s {events,4}e Post {pt:n0}ms", + range.Current.CommitPosition, range.PositionAsRangePercentage, (let e = sw.Elapsed in e.TotalSeconds), mb batchBytes, + batchEvents, usedCats, usedStreams, usedEvents, postSw.ElapsedMilliseconds) let shouldLoop = range.TryNext currentSlice.NextPosition if shouldLoop && not currentSlice.IsEndOfStream then sw.Restart() // restart the clock as we hand off back to the Reader @@ -584,7 +588,7 @@ module EventStoreReader = | Stream of name: string * batchSize: int | Tranche of range: Range * batchSize : int | Tail of pos: Position * interval: TimeSpan * batchSize : int - type FeedQueue(batchSize, max, ?statsInterval) = + type FeedQueue(batchSize, minBatchSize, max, ?statsInterval) = let work = ConcurrentQueue() member val OverallStats = OverallStats(?statsInterval=statsInterval) member val SlicesStats = SliceStatsBuffer() @@ -599,7 +603,7 @@ module EventStoreReader = member __.TryDequeue () = work.TryDequeue() member __.Process(conn, enumEvents, postBatch, work) = async { - let adjust batchSize = if batchSize > 128 then batchSize - 128 else batchSize + let adjust batchSize = if batchSize > minBatchSize then batchSize - 128 else batchSize match work with | Stream (name,batchSize) -> use _ = Serilog.Context.LogContext.PushProperty("Stream",name) @@ -661,7 +665,7 @@ module EventStoreReader = return true } type Reader(conn : IEventStoreConnection, spec: ReaderSpec, enumEvents, postBatch : Ingester.Batch -> unit, max, ct : CancellationToken, ?statsInterval) = - let work = FeedQueue(spec.batchSize, max, ?statsInterval=statsInterval) + let work = FeedQueue(spec.batchSize, spec.minBatchSize, max, ?statsInterval=statsInterval) do match spec.tailInterval with | Some interval -> work.AddTail(max, interval) | None -> ()