Skip to content

Commit

Permalink
Add MinBatchSize cmdline parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 26, 2019
1 parent 882e5c0 commit 6b554e3
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions equinox-sync/Sync/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ open System.Diagnostics
open System.Threading

type StartPos = Absolute of int64 | Chunk of int | Percentage of float | Start | Ignore
type ReaderSpec = { start: StartPos; stripes: int; batchSize: int; streams: string list; tailInterval: TimeSpan option }
type ReaderSpec = { start: StartPos; streams: string list; tailInterval: TimeSpan option; stripes: int; batchSize: int; minBatchSize: int }
let mb x = float x / 1024. / 1024.

module CmdParser =
Expand Down Expand Up @@ -117,6 +117,7 @@ module CmdParser =
[<NoEquality; NoComparison>]
type Arguments =
| [<AltCommandLine "-m"; Unique>] BatchSize of int
| [<AltCommandLine "-b"; Unique>] MinBatchSize of int
| [<AltCommandLine "-v"; Unique>] Verbose
| [<AltCommandLine "-vc"; Unique>] VerboseConsole
| [<AltCommandLine "-S"; Unique>] LocalSeq
Expand All @@ -132,6 +133,7 @@ module CmdParser =
member a.Usage =
match a with
| BatchSize _ -> "maximum item count to request from feed. Default: 4096"
| MinBatchSize _ -> "minimum item count to drop down to in reaction to read failures. Default: 512"
| Verbose -> "request Verbose Logging. Default: off"
| VerboseConsole -> "request Verbose Console Logging. Default: off"
| LocalSeq -> "configures writing to a local Seq endpoint at http://localhost:5341, see https://getseq.net"
Expand All @@ -148,11 +150,12 @@ module CmdParser =
member __.Verbose = args.Contains Verbose
member __.ConsoleMinLevel = if args.Contains VerboseConsole then Serilog.Events.LogEventLevel.Information else Serilog.Events.LogEventLevel.Warning
member __.MaybeSeqEndpoint = if args.Contains LocalSeq then Some "http://localhost:5341" else None
member __.BatchSize = args.GetResult(BatchSize,4096)
member __.StartingBatchSize = args.GetResult(BatchSize,4096)
member __.MinBatchSize = args.GetResult(MinBatchSize,512)
member __.Stripes = args.GetResult(Stripes,1)
member __.TailInterval = match args.TryGetResult Tail with Some s -> TimeSpan.FromSeconds s |> Some | None -> None
member x.BuildFeedParams() : ReaderSpec =
Log.Warning("Processing in batches of {batchSize} with {stripes} stripes", x.BatchSize, x.Stripes)
Log.Warning("Processing in batches of {batchSize} (min {minBatchSize}) with {stripes} stripes", x.StartingBatchSize, x.MinBatchSize, x.Stripes)
let startPos =
match args.TryGetResult Offset, args.TryGetResult Chunk, args.TryGetResult Percent, args.Contains All with
| Some p, _, _, _ -> Log.Warning("Processing will commence at $all Position {p}", p); Absolute p
Expand All @@ -163,7 +166,8 @@ module CmdParser =
match x.TailInterval with
| Some interval -> Log.Warning("Following tail at {seconds}s interval", interval.TotalSeconds)
| None -> Log.Warning "Not following tail"
{ start = startPos; stripes = x.Stripes; batchSize = x.BatchSize; streams = args.GetResults Stream; tailInterval = x.TailInterval }
{ start = startPos; streams = args.GetResults Stream; tailInterval = x.TailInterval
batchSize = x.StartingBatchSize; minBatchSize = x.MinBatchSize; stripes = x.Stripes }

/// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args
let parse argv : Parameters =
Expand Down Expand Up @@ -566,9 +570,9 @@ module EventStoreReader =
usedEvents <- usedEvents + 1
postBatch { stream = stream; span = { index = pos; events = [| item |]}}
if not(ignoreEmptyEof = Some true && batchEvents = 0 && not currentSlice.IsEndOfStream) then // ES doesnt report EOF on the first call :(
Log.Warning("Read {pos,10} {pct:p1} {ft:n3}s {count,4} {mb:n1}MB {categories,3}c {streams,4}s {events,4}e Post {pt:n0}ms",
range.Current.CommitPosition, range.PositionAsRangePercentage, (let e = sw.Elapsed in e.TotalSeconds), batchEvents, mb batchBytes,
usedCats, usedStreams, usedEvents, postSw.ElapsedMilliseconds)
Log.Warning("Read {pos,10} {pct:p1} {ft:n3}s {mb:n1}MB {count,4} {categories,3}c {streams,4}s {events,4}e Post {pt:n0}ms",
range.Current.CommitPosition, range.PositionAsRangePercentage, (let e = sw.Elapsed in e.TotalSeconds), mb batchBytes,
batchEvents, usedCats, usedStreams, usedEvents, postSw.ElapsedMilliseconds)
let shouldLoop = range.TryNext currentSlice.NextPosition
if shouldLoop && not currentSlice.IsEndOfStream then
sw.Restart() // restart the clock as we hand off back to the Reader
Expand All @@ -584,7 +588,7 @@ module EventStoreReader =
| Stream of name: string * batchSize: int
| Tranche of range: Range * batchSize : int
| Tail of pos: Position * interval: TimeSpan * batchSize : int
type FeedQueue(batchSize, max, ?statsInterval) =
type FeedQueue(batchSize, minBatchSize, max, ?statsInterval) =
let work = ConcurrentQueue()
member val OverallStats = OverallStats(?statsInterval=statsInterval)
member val SlicesStats = SliceStatsBuffer()
Expand All @@ -599,7 +603,7 @@ module EventStoreReader =
member __.TryDequeue () =
work.TryDequeue()
member __.Process(conn, enumEvents, postBatch, work) = async {
let adjust batchSize = if batchSize > 128 then batchSize - 128 else batchSize
let adjust batchSize = if batchSize > minBatchSize then batchSize - 128 else batchSize
match work with
| Stream (name,batchSize) ->
use _ = Serilog.Context.LogContext.PushProperty("Stream",name)
Expand Down Expand Up @@ -661,7 +665,7 @@ module EventStoreReader =
return true }

type Reader(conn : IEventStoreConnection, spec: ReaderSpec, enumEvents, postBatch : Ingester.Batch -> unit, max, ct : CancellationToken, ?statsInterval) =
let work = FeedQueue(spec.batchSize, max, ?statsInterval=statsInterval)
let work = FeedQueue(spec.batchSize, spec.minBatchSize, max, ?statsInterval=statsInterval)
do match spec.tailInterval with
| Some interval -> work.AddTail(max, interval)
| None -> ()
Expand Down

0 comments on commit 6b554e3

Please sign in to comment.