Skip to content

Commit

Permalink
Add overall ingestion stats
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 21, 2019
1 parent 9a4463d commit 52d7200
Showing 1 changed file with 40 additions and 31 deletions.
71 changes: 40 additions & 31 deletions equinox-sync/Sync/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ open FSharp.Control
open Serilog
open System

// NOTE(review): this listing is a rendered diff without +/- markers; the first `StartPos` line (no `Chunk` case)
// appears to be the pre-change declaration and the second its replacement - confirm against the repository.
/// Where in the EventStore $all stream processing should commence: an absolute position, a chunk number,
/// a percentage, or the very start.
type StartPos = Absolute of int64 | Percentage of float | Start
type StartPos = Absolute of int64 | Chunk of int | Percentage of float | Start
/// Reader settings: the start position, the stripe count, the batch size, and any specific streams to read.
type ReaderSpec = { start: StartPos; stripes: int; batchSize: int; streams: string list }
/// Converts a byte count into megabytes (1 MB = 1024 * 1024 bytes), returned as a float.
let mb x =
    let kilobytes = float x / 1024.
    kilobytes / 1024.

Expand Down Expand Up @@ -122,6 +122,7 @@ module CmdParser =
| [<AltCommandLine "-S"; Unique>] LocalSeq
| [<AltCommandLine "-s">] Stream of string
| [<AltCommandLine "-o"; Unique>] Offset of int64
| [<AltCommandLine "-c"; Unique>] Chunk of int
| [<AltCommandLine "-P"; Unique>] Percent of float
| [<AltCommandLine "-i"; Unique>] Stripes of int
| [<AltCommandLine "-t"; Unique>] Tail of intervalMs: int
Expand All @@ -135,6 +136,7 @@ module CmdParser =
| LocalSeq -> "configures writing to a local Seq endpoint at http://localhost:5341, see https://getseq.net"
| Stream _ -> "specific stream(s) to read"
| Offset _ -> "EventStore $all Stream Position to commence from"
| Chunk _ -> "EventStore $all Chunk to commence from"
| Percent _ -> "EventStore $all Stream Position to commence from (as a percentage of current tail position)"
| Stripes _ -> "number of concurrent readers"
| Tail _ -> "attempt to read from tail at specified interval in milliseconds"
Expand All @@ -154,10 +156,11 @@ module CmdParser =
| Some interval -> Log.Warning("Following tail at {seconds}s interval", interval.TotalSeconds)
| None -> Log.Warning "Not following tail"
// Picks the start position from the parsed arguments and logs the choice.
// Precedence when several are supplied: Offset first, then Chunk, then Percent; with none supplied, Start is used.
// NOTE(review): this listing is a rendered diff without +/- markers; the two-way match (Offset, Percent) appears to be
// the pre-change version and the three-way match (adding Chunk) its replacement - confirm against the repository.
let startPos =
match args.TryGetResult Offset, args.TryGetResult Percent with
| Some p, _ -> Log.Warning("Processing will commence at $all Position {p}", p); Absolute p
| _, Some p -> Log.Warning("Processing will commence at $all Percentage {pct:P}", p/100.); Percentage p
| None, None -> Log.Warning "Processing will commence at $all Start"; Start
match args.TryGetResult Offset, args.TryGetResult Chunk, args.TryGetResult Percent with
| Some p, _, _ -> Log.Warning("Processing will commence at $all Position {p}", p); Absolute p
// `StartPos.Chunk` is qualified because the argument case `Chunk` is the name matched by `args.TryGetResult Chunk` above
| _, Some c, _ -> Log.Warning("Processing will commence at $all Chunk {c}", c); StartPos.Chunk c
// The percentage is divided by 100 for logging only; the raw value is what gets stored in `Percentage`
| _, _, Some p -> Log.Warning("Processing will commence at $all Percentage {pct:P}", p/100.); Percentage p
| None, None, None -> Log.Warning "Processing will commence at $all Start"; Start
{ start = startPos; stripes = x.Stripes; batchSize = x.BatchSize; streams = args.GetResults Stream }

/// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args
Expand Down Expand Up @@ -439,7 +442,7 @@ module Ingester =
}

type SliceStatsBuffer(?interval) =
let intervalMs = let t = defaultArg interval (TimeSpan.FromMinutes 1.) in t.TotalMilliseconds |> int64
let intervalMs = let t = defaultArg interval (TimeSpan.FromMinutes 5.) in t.TotalMilliseconds |> int64
let recentCats, accStart = System.Collections.Generic.Dictionary<string,int*int>(), Stopwatch.StartNew()
member __.Ingest(slice: EventStore.ClientAPI.AllEventsSlice) =
let mutable batchBytes = 0
Expand All @@ -451,8 +454,8 @@ type SliceStatsBuffer(?interval) =
| false, _ -> recentCats.[cat] <- (1, eventBytes)
batchBytes <- batchBytes + eventBytes
slice.Events.Length, int64 batchBytes
member __.DumpIfIntervalExpired() =
if accStart.ElapsedMilliseconds > intervalMs then
member __.DumpIfIntervalExpired(?force) =
if accStart.ElapsedMilliseconds > intervalMs || defaultArg force false then
let log = function
| [||] -> ()
| xs ->
Expand All @@ -466,13 +469,20 @@ type SliceStatsBuffer(?interval) =
recentCats.Clear()
accStart.Restart()

// NOTE(review): this listing is a rendered diff without +/- markers; the parameterless header appears to be the
// pre-change declaration and the `?statsInterval` header its replacement - confirm against the repository.
/// Keeps running totals of ingested events and bytes, and logs a cumulative throughput summary at most once per interval.
type OverallStats() =
type OverallStats(?statsInterval) =
// Reporting interval in milliseconds; defaults to 5 minutes when `statsInterval` is not supplied
let intervalMs = let t = defaultArg statsInterval (TimeSpan.FromMinutes 5.) in t.TotalMilliseconds |> int64
// `overallStart` is never restarted in this type, so it measures time since construction;
// `progressStart` is restarted after each report and so measures time since the last one
let overallStart, progressStart = Stopwatch.StartNew(), Stopwatch.StartNew()
let mutable totalEvents, totalBytes = 0L, 0L
/// Adds one batch's event count and byte count to the running totals.
member __.Ingest(batchEvents, batchBytes) =
totalEvents <- totalEvents + batchEvents
totalBytes <- totalBytes + batchBytes
/// Total bytes ingested so far.
member __.Bytes = totalBytes
/// Total events ingested so far.
member __.Events = totalEvents
/// Logs the cumulative event count, volume and average rate if more than the interval has elapsed since the last report.
member __.DumpIfIntervalExpired() =
if progressStart.ElapsedMilliseconds > intervalMs then
let totalMb = mb totalBytes
// totalMb/1024. is the volume in GB; totalMb*1000./elapsedMs is the average MB per second since construction
Log.Warning("Traversed {events} events {gb:n1}GB {mbs}MB/s", totalEvents, totalMb/1024., totalMb*1000./float overallStart.ElapsedMilliseconds)
progressStart.Restart()

type Range(start, sliceEnd : EventStore.ClientAPI.Position option, max : EventStore.ClientAPI.Position) =
member val Current = start with get, set
Expand All @@ -488,17 +498,17 @@ type Range(start, sliceEnd : EventStore.ClientAPI.Position option, max : EventSt
module EventStoreReader =
open EventStore.ClientAPI

// Helpers converting between $all positions and chunk numbers. A chunk spans 256 * 1024 * 1024 bytes (2^28),
// so the chunk number of a position is its CommitPosition shifted right by 28 bits.
// NOTE(review): this listing is a rendered diff without +/- markers, so removed and added lines are interleaved
// (`posFromPercentage` and `chunk` each appear twice). The mask-based bodies appear to be the pre-change versions
// and the `posFromChunk`-based bodies their replacements - confirm against the repository.
let posFromPercentage (pct,max : Position) =
let rawPos = float max.CommitPosition * pct / 100. |> int64
// @scarvel8: event_global_position = 256 x 1024 x 1024 x chunk_number + chunk_header_size (128) + event_position_offset_in_chunk
// NOTE(review): the mask below clears the low 29 bits (0x...E0000000), i.e. it aligns to 512 MiB, which does not
// match the 256 MiB arithmetic in its trailing comment or the 0x...F0000000 mask used in posFromChunkAfter
let chunkBase = rawPos &&& 0xFFFFFFFFE0000000L // rawPos / 256L / 1024L / 1024L * 1024L * 1024L * 256L
// @scarvel8: event_global_position = 256 x 1024 x 1024 x chunk_number + chunk_header_size (128) + event_position_offset_in_chunk
/// Chunk number containing the given position (CommitPosition shifted right by 28 bits).
let chunk (pos: Position) = uint64 pos.CommitPosition >>> 28
/// Position at the base of the given chunk (chunk * 256 * 1024 * 1024), with 0L as the second Position argument.
let posFromChunk (chunk: int) =
let chunkBase = int64 chunk * 1024L * 1024L * 256L
Position(chunkBase,0L)
/// Base position of the chunk containing the point `pct` percent of the way to `max.CommitPosition`.
let posFromPercentage (pct,max : Position) =
let rawPos = Position(float max.CommitPosition * pct / 100. |> int64, 0L)
// The local `chunk` value shadows the `chunk` function for the remainder of this line
let chunk = int (chunk rawPos) in posFromChunk chunk // &&& 0xFFFFFFFFE0000000L // rawPos / 256L / 1024L / 1024L * 1024L * 1024L * 256L
/// Base position of the chunk immediately after the one containing `pos`.
let posFromChunkAfter (pos: Position) =
let chunkBase = pos.CommitPosition &&& 0xFFFFFFFFF0000000L // rawPos / 256L / 1024L / 1024L * 1024L * 1024L * 256L
let nextBase = chunkBase + 256L * 1024L * 1024L
Position(nextBase,0L)
let chunk (pos: Position) =
uint64 pos.CommitPosition >>> 28
let nextChunk = 1 + int (chunk pos)
posFromChunk nextChunk

let fetchMax (conn : IEventStoreConnection) = async {
let! lastItemBatch = conn.ReadAllEventsBackwardAsync(Position.End, 1, resolveLinkTos = false) |> Async.AwaitTaskCorrect
Expand Down Expand Up @@ -557,6 +567,7 @@ module EventStoreReader =
sw.Restart() // restart the clock as we hand off back to the Reader
return! loop ()
else
slicesStats.DumpIfIntervalExpired(force=true)
return currentSlice.IsEndOfStream }
async {
try let! eof = loop ()
Expand All @@ -566,18 +577,15 @@ module EventStoreReader =
/// An item of work held in the feed queue: either a named stream or a tranche (a `Range`), each paired with a batch size.
[<NoComparison>]
type Work =
    /// A stream identified by `name`, with the batch size to use for it
    | Stream of name: string * batchSize: int
    /// A tranche described by `range`, with the batch size to use for it
    | Tranche of range: Range * batchSize: int
type FeedQueue(batchSize, max) =
type FeedQueue(batchSize, max, ?statsInterval) =
let work = ConcurrentQueue()
let add item =
work.Enqueue item
Log.Warning("Added {item}; count: {count}", item, work.Count)
member val OverallStats = OverallStats() with get
member val OverallStats = OverallStats(?statsInterval=statsInterval) with get
member __.AddTranche(range, ?batchSizeOverride) =
add <| Work.Tranche (range, defaultArg batchSizeOverride batchSize)
work.Enqueue <| Work.Tranche (range, defaultArg batchSizeOverride batchSize)
member __.AddTranche(pos, nextPos, ?batchSizeOverride) =
__.AddTranche(Range (pos, Some nextPos, max), ?batchSizeOverride=batchSizeOverride)
member __.AddStream(name, ?batchSizeOverride) =
add <| Work.Stream (name, defaultArg batchSizeOverride batchSize)
work.Enqueue <| Work.Stream (name, defaultArg batchSizeOverride batchSize)
member __.TryDequeue () =
work.TryDequeue()
member __.Process(conn, enumEvents, postBatch, work) = async {
Expand Down Expand Up @@ -605,18 +613,18 @@ module EventStoreReader =
__.AddTranche(range, bs)
}

type Reader(conn : IEventStoreConnection, spec: ReaderSpec, enumEvents, postBatch : Ingester.Batch -> unit, max, ct : CancellationToken) =
let work = FeedQueue(spec.batchSize, max)
type Reader(conn : IEventStoreConnection, spec: ReaderSpec, enumEvents, postBatch : Ingester.Batch -> unit, max, ct : CancellationToken, ?statsInterval) =
let work = FeedQueue(spec.batchSize, max, ?statsInterval=statsInterval)
do for s in spec.streams do work.AddStream(s)
let mutable remainder =
let startPos =
match spec.start with
| StartPos.Start -> Position.Start
| Absolute p -> Position(p, 0L)
| Percentage pct ->
let startPos = posFromPercentage (pct, max)
Log.Warning("Effective Start Position {tranche} (chunk {chunk})", startPos.CommitPosition, chunk startPos)
startPos
| Chunk c -> posFromChunk c
| Percentage pct -> posFromPercentage (pct, max)
Log.Warning("Start Position {pos} (chunk {chunk}, {pct:p1})",
startPos.CommitPosition, chunk startPos, float startPos.CommitPosition/ float max.CommitPosition)
let nextPos = posFromChunkAfter startPos
work.AddTranche(startPos, nextPos)
Some nextPos
Expand All @@ -628,6 +636,7 @@ module EventStoreReader =
let mutable finished = false
while not ct.IsCancellationRequested && not (finished && dop.CurrentCount <> maxDop) do
let! _ = dop.Await()
work.OverallStats.DumpIfIntervalExpired()
let forkRunRelease task = async {
let! _ = Async.StartChild <| async {
do! work.Process(conn, enumEvents, postBatch, task)
Expand Down

0 comments on commit 52d7200

Please sign in to comment.