Patch for equinox-sync/Sync/Program.fs (F#).

Summary of the hunks below:
  * add a `Chunk of int` start position: new StartPos case, new command-line
    parameter with usage text, and a matching branch where the start
    position is resolved;
  * change the SliceStatsBuffer default interval from 1 to 5 minutes and give
    DumpIfIntervalExpired an optional `force` flag (passed as true on the
    `else` branch shown in the EventStoreReader hunk);
  * give OverallStats an optional stats interval plus a DumpIfIntervalExpired
    member that logs "Traversed ..." progress, invoked from the `while` loop
    in the last hunk;
  * introduce posFromChunk and express posFromPercentage / posFromChunkAfter
    through it;
  * remove the `add` helper (and its "Added {item}" log line) from FeedQueue
    and pass ?statsInterval through FeedQueue and Reader.

NOTE(review): line breaks and indentation were restored from a copy in which
every run of whitespace had been collapsed. Line counts per hunk agree with
the @@ headers, but column positions and any alignment padding are a best
guess - verify against the target file (or apply with whitespace ignored).
NOTE(review): `| [] LocalSeq`, `type [] Work`, `Dictionary()` and
`ConcurrentQueue()` look as if text in angle brackets was lost upstream; they
are reproduced exactly as found - confirm against the original commit.

diff --git a/equinox-sync/Sync/Program.fs b/equinox-sync/Sync/Program.fs
index c3a31d7ea..34b9c288a 100644
--- a/equinox-sync/Sync/Program.fs
+++ b/equinox-sync/Sync/Program.fs
@@ -5,7 +5,7 @@ open FSharp.Control
 open Serilog
 open System
 
-type StartPos = Absolute of int64 | Percentage of float | Start
+type StartPos = Absolute of int64 | Chunk of int | Percentage of float | Start
 type ReaderSpec = { start: StartPos; stripes: int; batchSize: int; streams: string list }
 
 let mb x = float x / 1024. / 1024.
@@ -122,6 +122,7 @@ module CmdParser =
         | [] LocalSeq
         | [] Stream of string
         | [] Offset of int64
+        | [] Chunk of int
         | [] Percent of float
         | [] Stripes of int
         | [] Tail of intervalMs: int
@@ -135,6 +136,7 @@ module CmdParser =
                 | LocalSeq -> "configures writing to a local Seq endpoint at http://localhost:5341, see https://getseq.net"
                 | Stream _ -> "specific stream(s) to read"
                 | Offset _ -> "EventStore $all Stream Position to commence from"
+                | Chunk _ -> "EventStore $all Chunk to commence from"
                 | Percent _ -> "EventStore $all Stream Position to commence from (as a percentage of current tail position)"
                 | Stripes _ -> "number of concurrent readers"
                 | Tail _ -> "attempt to read from tail at specified interval in milliseconds"
@@ -154,10 +156,11 @@ module CmdParser =
             | Some interval -> Log.Warning("Following tail at {seconds}s interval", interval.TotalSeconds)
             | None -> Log.Warning "Not following tail"
             let startPos =
-                match args.TryGetResult Offset, args.TryGetResult Percent with
-                | Some p, _ -> Log.Warning("Processing will commence at $all Position {p}", p); Absolute p
-                | _, Some p -> Log.Warning("Processing will commence at $all Percentage {pct:P}", p/100.); Percentage p
-                | None, None -> Log.Warning "Processing will commence at $all Start"; Start
+                match args.TryGetResult Offset, args.TryGetResult Chunk, args.TryGetResult Percent with
+                | Some p, _, _ -> Log.Warning("Processing will commence at $all Position {p}", p); Absolute p
+                | _, Some c, _ -> Log.Warning("Processing will commence at $all Chunk {c}", c); StartPos.Chunk c
+                | _, _, Some p -> Log.Warning("Processing will commence at $all Percentage {pct:P}", p/100.); Percentage p
+                | None, None, None -> Log.Warning "Processing will commence at $all Start"; Start
             { start = startPos; stripes = x.Stripes; batchSize = x.BatchSize; streams = args.GetResults Stream }
 
     /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args
@@ -439,7 +442,7 @@ module Ingester =
     }
 
 type SliceStatsBuffer(?interval) =
-    let intervalMs = let t = defaultArg interval (TimeSpan.FromMinutes 1.) in t.TotalMilliseconds |> int64
+    let intervalMs = let t = defaultArg interval (TimeSpan.FromMinutes 5.) in t.TotalMilliseconds |> int64
     let recentCats, accStart = System.Collections.Generic.Dictionary(), Stopwatch.StartNew()
     member __.Ingest(slice: EventStore.ClientAPI.AllEventsSlice) =
         let mutable batchBytes = 0
@@ -451,8 +454,8 @@ type SliceStatsBuffer(?interval) =
             | false, _ -> recentCats.[cat] <- (1, eventBytes)
             batchBytes <- batchBytes + eventBytes
         slice.Events.Length, int64 batchBytes
-    member __.DumpIfIntervalExpired() =
-        if accStart.ElapsedMilliseconds > intervalMs then
+    member __.DumpIfIntervalExpired(?force) =
+        if accStart.ElapsedMilliseconds > intervalMs || defaultArg force false then
             let log = function
                 | [||] -> ()
                 | xs ->
@@ -466,13 +469,20 @@ type SliceStatsBuffer(?interval) =
             recentCats.Clear()
             accStart.Restart()
 
-type OverallStats() =
+type OverallStats(?statsInterval) =
+    let intervalMs = let t = defaultArg statsInterval (TimeSpan.FromMinutes 5.) in t.TotalMilliseconds |> int64
+    let overallStart, progressStart = Stopwatch.StartNew(), Stopwatch.StartNew()
     let mutable totalEvents, totalBytes = 0L, 0L
     member __.Ingest(batchEvents, batchBytes) =
         totalEvents <- totalEvents + batchEvents
         totalBytes <- totalBytes + batchBytes
     member __.Bytes = totalBytes
     member __.Events = totalEvents
+    member __.DumpIfIntervalExpired() =
+        if progressStart.ElapsedMilliseconds > intervalMs then
+            let totalMb = mb totalBytes
+            Log.Warning("Traversed {events} events {gb:n1}GB {mbs}MB/s", totalEvents, totalMb/1024., totalMb*1000./float overallStart.ElapsedMilliseconds)
+            progressStart.Restart()
 
 type Range(start, sliceEnd : EventStore.ClientAPI.Position option, max : EventStore.ClientAPI.Position) =
     member val Current = start with get, set
@@ -488,17 +498,17 @@ type Range(start, sliceEnd : EventStore.ClientAPI.Position option, max : EventSt
 module EventStoreReader =
     open EventStore.ClientAPI
 
-    let posFromPercentage (pct,max : Position) =
-        let rawPos = float max.CommitPosition * pct / 100. |> int64
-        // @scarvel8: event_global_position = 256 x 1024 x 1024 x chunk_number + chunk_header_size (128) + event_position_offset_in_chunk
-        let chunkBase = rawPos &&& 0xFFFFFFFFE0000000L // rawPos / 256L / 1024L / 1024L * 1024L * 1024L * 256L
+    // @scarvel8: event_global_position = 256 x 1024 x 1024 x chunk_number + chunk_header_size (128) + event_position_offset_in_chunk
+    let chunk (pos: Position) = uint64 pos.CommitPosition >>> 28
+    let posFromChunk (chunk: int) =
+        let chunkBase = int64 chunk * 1024L * 1024L * 256L
         Position(chunkBase,0L)
+    let posFromPercentage (pct,max : Position) =
+        let rawPos = Position(float max.CommitPosition * pct / 100. |> int64, 0L)
+        let chunk = int (chunk rawPos) in posFromChunk chunk // &&& 0xFFFFFFFFE0000000L // rawPos / 256L / 1024L / 1024L * 1024L * 1024L * 256L
     let posFromChunkAfter (pos: Position) =
-        let chunkBase = pos.CommitPosition &&& 0xFFFFFFFFF0000000L // rawPos / 256L / 1024L / 1024L * 1024L * 1024L * 256L
-        let nextBase = chunkBase + 256L * 1024L * 1024L
-        Position(nextBase,0L)
-    let chunk (pos: Position) =
-        uint64 pos.CommitPosition >>> 28
+        let nextChunk = 1 + int (chunk pos)
+        posFromChunk nextChunk
 
     let fetchMax (conn : IEventStoreConnection) = async {
         let! lastItemBatch = conn.ReadAllEventsBackwardAsync(Position.End, 1, resolveLinkTos = false) |> Async.AwaitTaskCorrect
@@ -557,6 +567,7 @@ module EventStoreReader =
                 sw.Restart() // restart the clock as we hand off back to the Reader
                 return! loop ()
             else
+                slicesStats.DumpIfIntervalExpired(force=true)
                 return currentSlice.IsEndOfStream }
         async {
             try let! eof = loop ()
@@ -566,18 +577,15 @@ module EventStoreReader =
     type [] Work =
         | Stream of name: string * batchSize: int
         | Tranche of range: Range * batchSize : int
-    type FeedQueue(batchSize, max) =
+    type FeedQueue(batchSize, max, ?statsInterval) =
         let work = ConcurrentQueue()
-        let add item =
-            work.Enqueue item
-            Log.Warning("Added {item}; count: {count}", item, work.Count)
-        member val OverallStats = OverallStats() with get
+        member val OverallStats = OverallStats(?statsInterval=statsInterval) with get
         member __.AddTranche(range, ?batchSizeOverride) =
-            add <| Work.Tranche (range, defaultArg batchSizeOverride batchSize)
+            work.Enqueue <| Work.Tranche (range, defaultArg batchSizeOverride batchSize)
         member __.AddTranche(pos, nextPos, ?batchSizeOverride) =
             __.AddTranche(Range (pos, Some nextPos, max), ?batchSizeOverride=batchSizeOverride)
         member __.AddStream(name, ?batchSizeOverride) =
-            add <| Work.Stream (name, defaultArg batchSizeOverride batchSize)
+            work.Enqueue <| Work.Stream (name, defaultArg batchSizeOverride batchSize)
         member __.TryDequeue () =
             work.TryDequeue()
         member __.Process(conn, enumEvents, postBatch, work) = async {
@@ -605,18 +613,18 @@ module EventStoreReader =
                 __.AddTranche(range, bs)
         }
 
-    type Reader(conn : IEventStoreConnection, spec: ReaderSpec, enumEvents, postBatch : Ingester.Batch -> unit, max, ct : CancellationToken) =
-        let work = FeedQueue(spec.batchSize, max)
+    type Reader(conn : IEventStoreConnection, spec: ReaderSpec, enumEvents, postBatch : Ingester.Batch -> unit, max, ct : CancellationToken, ?statsInterval) =
+        let work = FeedQueue(spec.batchSize, max, ?statsInterval=statsInterval)
         do for s in spec.streams do work.AddStream(s)
         let mutable remainder =
             let startPos =
                 match spec.start with
                 | StartPos.Start -> Position.Start
                 | Absolute p -> Position(p, 0L)
-                | Percentage pct ->
-                    let startPos = posFromPercentage (pct, max)
-                    Log.Warning("Effective Start Position {tranche} (chunk {chunk})", startPos.CommitPosition, chunk startPos)
-                    startPos
+                | Chunk c -> posFromChunk c
+                | Percentage pct -> posFromPercentage (pct, max)
+            Log.Warning("Start Position {pos} (chunk {chunk}, {pct:p1})",
+                startPos.CommitPosition, chunk startPos, float startPos.CommitPosition/ float max.CommitPosition)
             let nextPos = posFromChunkAfter startPos
             work.AddTranche(startPos, nextPos)
             Some nextPos
@@ -628,6 +636,7 @@ module EventStoreReader =
             let mutable finished = false
             while not ct.IsCancellationRequested && not (finished && dop.CurrentCount <> maxDop) do
                 let! _ = dop.Await()
+                work.OverallStats.DumpIfIntervalExpired()
                 let forkRunRelease task = async {
                     let! _ = Async.StartChild <| async {
                         do! work.Process(conn, enumEvents, postBatch, task)