From ca8627477f861433c2fb6ad2dbf92b4fb4f0f981 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 10 Apr 2019 11:50:57 +0100 Subject: [PATCH] First cut EventStore tailing and progress writing --- equinox-sync/Sync.Tests/ProgressTests.fs | 36 ++ equinox-sync/Sync.Tests/Sync.Tests.fsproj | 1 + equinox-sync/Sync/Infrastructure.fs | 11 + equinox-sync/Sync/Program.fs | 504 ++++++++++++++-------- 4 files changed, 366 insertions(+), 186 deletions(-) create mode 100644 equinox-sync/Sync.Tests/ProgressTests.fs diff --git a/equinox-sync/Sync.Tests/ProgressTests.fs b/equinox-sync/Sync.Tests/ProgressTests.fs new file mode 100644 index 000000000..c50ca4fb0 --- /dev/null +++ b/equinox-sync/Sync.Tests/ProgressTests.fs @@ -0,0 +1,36 @@ +module ProgressTests + +open SyncTemplate.Program + +open Swensen.Unquote +open Xunit + +let [] ``Empty has zero streams pending or progress to write`` () = + let sut = Progress.State<_>() + None =! sut.ValidatedPos + 0 =! sut.PendingStreamCount + +let [] ``Can add multiple batches`` () = + let sut = Progress.State<_>() + sut.AddBatch(0,["a",1L; "b",2L]) + sut.AddBatch(1,["b",2L; "c",3L]) + None =! sut.ValidatedPos + 3 =! sut.PendingStreamCount + 2 =! sut.PendingBatchCount + +let [] ``Marking Progress Removes batches and updates progress`` () = + let sut = Progress.State<_>() + sut.AddBatch(0,["a",1L; "b",2L]) + sut.MarkStreamProgress("a",1L) + sut.MarkStreamProgress("b",1L) + None =! sut.ValidatedPos + 1 =! sut.PendingStreamCount + 1 =! sut.PendingBatchCount + +let [] ``Marking progress is not persistent`` () = + let sut = Progress.State<_>() + sut.AddBatch(0,["a",1L]) + sut.MarkStreamProgress("a",2L) + sut.AddBatch(1,["a",1L; "b",2L]) + 2 =! sut.PendingStreamCount + 1 =! sut.PendingBatchCount \ No newline at end of file diff --git a/equinox-sync/Sync.Tests/Sync.Tests.fsproj b/equinox-sync/Sync.Tests/Sync.Tests.fsproj index 8e0bfa8c4..a764d7553 100644 --- a/equinox-sync/Sync.Tests/Sync.Tests.fsproj +++ b/equinox-sync/Sync.Tests/Sync.Tests.fsproj @@ -7,6 +7,7 @@ + diff --git a/equinox-sync/Sync/Infrastructure.fs b/equinox-sync/Sync/Infrastructure.fs index 749335c15..3fb477a7b 100644 --- a/equinox-sync/Sync/Infrastructure.fs +++ b/equinox-sync/Sync/Infrastructure.fs @@ -18,6 +18,17 @@ type Async with and d : IDisposable = Console.CancelKeyPress.Subscribe callback in ()) +module Queue = + let tryDequeue (x : System.Collections.Generic.Queue<'T>) = +#if NET461 + if x.Count = 0 then None + else x.Dequeue() |> Some +#else + match x.TryDequeue() with + | false, _ -> None + | true, res -> Some res +#endif + open Equinox.Store // AwaitTaskCorrect type SemaphoreSlim with diff --git a/equinox-sync/Sync/Program.fs b/equinox-sync/Sync/Program.fs index f5c891a40..53d37a823 100644 --- a/equinox-sync/Sync/Program.fs +++ b/equinox-sync/Sync/Program.fs @@ -12,8 +12,24 @@ open System open System.Diagnostics open System.Threading -type StartPos = Absolute of int64 | Chunk of int | Percentage of float | Start | Ignore -type ReaderSpec = { start: StartPos; streams: string list; tailInterval: TimeSpan option; stripes: int; batchSize: int; minBatchSize: int } +type StartPos = Absolute of int64 | Chunk of int | Percentage of float | Tail +type ReaderSpec = + { /// Identifier for this projection and it's state + groupName: string + /// Start position from which forward reading is to commence // Assuming no stored position + start: StartPos + /// Additional streams with which to seed the reading + streams: string list + /// Delay when reading yields an empty batch + tailInterval: TimeSpan + /// Maximum number of stream readers to permit + stripes: int + /// Initial batch size to use when commencing reading + batchSize: int + /// Smallest batch size to degrade to in the presence of failures + minBatchSize: int } +let mb x = float x / 1024. / 1024. +let category (streamName : string) = streamName.Split([|'-'|],2).[0] module CmdParser = open Argu @@ -36,7 +52,6 @@ module CmdParser = #else | [] MinBatchSize of int | [] Stream of string - | [] All | [] Offset of int64 | [] Chunk of int | [] Percent of float @@ -64,31 +79,31 @@ module CmdParser = | BatchSize _ -> "maximum item count to request from feed. Default: 4096" | MinBatchSize _ -> "minimum item count to drop down to in reaction to read failures. Default: 512" | Stream _ -> "specific stream(s) to read" - | All -> "traverse EventStore $all from Start" | Offset _ -> "EventStore $all Stream Position to commence from" | Chunk _ -> "EventStore $all Chunk to commence from" | Percent _ -> "EventStore $all Stream Position to commence from (as a percentage of current tail position)" | Stripes _ -> "number of concurrent readers" - | Tail _ -> "attempt to read from tail at specified interval in Seconds" + | Tail _ -> "attempt to read from tail at specified interval in Seconds. Default: 1" | VerboseConsole -> "request Verbose Console Logging. Default: off" #endif | Verbose -> "request Verbose Logging. Default: off" | Source _ -> "CosmosDb input parameters." and Arguments(a : ParseResults) = - member __.LeaseId = a.GetResult ConsumerGroupName - member __.StartFromHere = a.Contains ForceStartFromHere member __.BatchSize = a.GetResult(BatchSize,1000) member __.MaybeSeqEndpoint = if a.Contains LocalSeq then Some "http://localhost:5341" else None #if cosmos + member __.LeaseId = a.GetResult ConsumerGroupName + member __.StartFromHere = a.Contains ForceStartFromHere member __.LagFrequency = a.TryGetResult LagFreqS |> Option.map TimeSpan.FromSeconds member __.ChangeFeedVerbose = a.Contains ChangeFeedVerbose #else + member __.ConsumerGroupName = a.GetResult ConsumerGroupName member __.VerboseConsole = a.Contains VerboseConsole member __.ConsoleMinLevel = if __.VerboseConsole then Serilog.Events.LogEventLevel.Information else Serilog.Events.LogEventLevel.Warning member __.StartingBatchSize = a.GetResult(BatchSize,4096) member __.MinBatchSize = a.GetResult(MinBatchSize,512) member __.Stripes = a.GetResult(Stripes,1) - member __.TailInterval = match a.TryGetResult Tail with Some s -> TimeSpan.FromSeconds s |> Some | None -> None + member __.TailInterval = a.GetResult(Tail,1.) |> TimeSpan.FromSeconds #endif member __.Verbose = a.Contains Verbose @@ -108,23 +123,17 @@ module CmdParser = x.LagFrequency |> Option.iter (fun s -> Log.Information("Dumping lag stats at {lagS:n0}s intervals", s.TotalSeconds)) disco, db, x.LeaseId, x.StartFromHere, x.BatchSize, x.LagFrequency #else - Log.Information("Processing Lease {leaseId} in Database {db} Collection {coll} in batches of {batchSize}", - x.LeaseId, x.Destination.Database, x.Destination.Collection, x.BatchSize) - if x.StartFromHere then Log.Warning("(If new projector group) Skipping projection of all existing events.") - x.LeaseId, x.StartFromHere, x.BatchSize - member x.BuildFeedParams() : ReaderSpec = - Log.Warning("Processing in batches of [{minBatchSize}..{batchSize}] with {stripes} stripes", x.MinBatchSize, x.StartingBatchSize, x.Stripes) + Log.Warning("Reading in batches of [{minBatchSize}..{batchSize}] with {stripes} stripes", x.MinBatchSize, x.StartingBatchSize, x.Stripes) let startPos = - match a.TryGetResult Offset, a.TryGetResult Chunk, a.TryGetResult Percent, a.Contains All with - | Some p, _, _, _ -> Log.Warning("Processing will commence at $all Position {p}", p); Absolute p - | _, Some c, _, _ -> Log.Warning("Processing will commence at $all Chunk {c}", c); StartPos.Chunk c - | _, _, Some p, _ -> Log.Warning("Processing will commence at $all Percentage {pct:P}", p/100.); Percentage p - | None, None, None, true -> Log.Warning "Processing will commence at $all Start"; Start - | None, None, None, false ->Log.Warning "No $all processing requested"; Ignore - match x.TailInterval with - | Some interval -> Log.Warning("Following tail at {seconds}s interval", interval.TotalSeconds) - | None -> Log.Warning "Not following tail" - { start = startPos; streams = a.GetResults Stream; tailInterval = x.TailInterval + match a.TryGetResult Offset, a.TryGetResult Chunk, a.TryGetResult Percent with + | Some p, _, _ -> Log.Warning("Processing will commence at $all Position {p}", p); Absolute p + | _, Some c, _ -> Log.Warning("Processing will commence at $all Chunk {c}", c); StartPos.Chunk c + | _, _, Some p -> Log.Warning("Processing will commence at $all Percentage {pct:P}", p/100.); Percentage p + | None, None, None -> Log.Warning "Processing will commence at $all Tail"; StartPos.Tail + Log.Information("Processing ConsumerGroupName {groupName} in Database {db} Collection {coll} in batches of {batchSize}", + x.ConsumerGroupName, x.Destination.Database, x.Destination.Collection, x.BatchSize) + Log.Warning("Following tail at {seconds}s interval", let i = x.TailInterval in i.TotalSeconds) + { groupName = x.ConsumerGroupName; start = startPos; streams = a.GetResults Stream; tailInterval = x.TailInterval batchSize = x.StartingBatchSize; minBatchSize = x.MinBatchSize; stripes = x.Stripes } #endif and [] SourceParameters = @@ -283,6 +292,38 @@ module Logging = |> fun c -> c.CreateLogger() Log.ForContext() +module Progress = + open System.Collections.Generic + + type [] internal Chunk<'Pos> = { pos: 'Pos; streamToRequiredIndex : Dictionary } + + type State<'Pos when 'Pos:equality>(?currentPos : 'Pos) = + let pending = Queue<_>() + member val ValidatedPos = currentPos with get, set + member __.MarkStreamProgress(stream, index) = + let mutable dirty = false + for x in pending do + match x.streamToRequiredIndex.TryGetValue stream with + | true, requiredIndex when requiredIndex <= index -> x.streamToRequiredIndex.Remove stream |> ignore; dirty <- true + | _, _ -> () + let headIsComplete () = match pending.TryPeek() with true, head -> Seq.isEmpty head.streamToRequiredIndex | _ -> false + while dirty && headIsComplete () do + let headBatch = pending.Dequeue() + __.ValidatedPos <- Some headBatch.pos + member __.AddBatch(pos, streamWithRequiredIndices : (string * int64) seq) = + let byStream = streamWithRequiredIndices |> Seq.groupBy fst |> Seq.map (fun (s,xs) -> KeyValuePair(s,xs |> Seq.map snd |> Seq.max)) + pending.Enqueue { pos = pos; streamToRequiredIndex = Dictionary byStream } + member __.PendingStreamCount = HashSet(seq { for x in pending do for y in x.streamToRequiredIndex.Keys do yield y }).Count + member __.PeekPendingStreams = seq { for x in pending do for y in x.streamToRequiredIndex.Keys do yield y } |> Seq.distinct + member __.PendingBatchCount = pending.Count + + let tryDump (log: Serilog.ILogger) (x : State<_>) = + match x.PendingStreamCount, x.PendingBatchCount with + | 0, 0 -> () + | streams, batches -> + log.Warning("Progress Waiting: {streams} streams, {batches} batches Pending streams: {queueHead}", + streams, batches, x.PeekPendingStreams |> Seq.truncate 5 ) + module CosmosIngester = open Equinox.Cosmos.Store @@ -347,75 +388,124 @@ module CosmosIngester = if curr.events.Length <> 0 then buffer.Add curr if buffer.Count = 0 then null else buffer.ToArray() - type [] StreamState = { write: int64; queue: Span[] } + let inline arrayBytes (x:byte[]) = if x = null then 0 else x.Length + type [] StreamState = { write: int64 option; queue: Span[] } with + /// Determines whether the head is ready to write (either write position is unknown, or matches) + member __.IsHeady = Array.tryHead __.queue |> Option.exists (fun x -> __.write |> Option.forall (fun w -> w = x.index)) + member __.IsReady = __.queue <> null && __.IsHeady + member __.Size = + if __.queue = null then 0 + else __.queue |> Seq.collect (fun x -> x.events) |> Seq.sumBy (fun x -> arrayBytes x.Data + arrayBytes x.Meta + x.EventType.Length*2 + 16) module StreamState = + let inline optionCombine f (r1: int64 option) (r2: int64 option) = + match r1, r2 with + | Some x, Some y -> f x y |> Some + | None, None -> None + | None, x | x, None -> x let combine (s1: StreamState) (s2: StreamState) : StreamState = - let writePos = max s1.write s2.write + let writePos = optionCombine max s1.write s2.write let items = seq { if s1.queue <> null then yield! s1.queue; if s2.queue <> null then yield! s2.queue } - { write = writePos; queue = Span.merge writePos items} - + { write = writePos; queue = Span.merge (defaultArg writePos 0L) items} + + let cosmosPayloadLimit = 2 * 1024 * 1024 - 1024 + let inline cosmosPayloadBytes (x: Equinox.Codec.IEvent) = arrayBytes x.Data + arrayBytes x.Meta + 4 + + let (|TimedOutMessage|RateLimitedMessage|Other|) (e: exn) = + match string e with + | m when m.Contains "Microsoft.Azure.Documents.RequestTimeoutException" -> TimedOutMessage + | m when m.Contains "Microsoft.Azure.Documents.RequestRateTooLargeException" -> RateLimitedMessage + | _ -> Other + + /// Gathers stats relating to how many items of a given category have been observed + type CatStats() = + let cats = System.Collections.Generic.Dictionary() + member __.Ingest cat = + match cats.TryGetValue cat with + | true, catCount -> cats.[cat] <- catCount + 1 + | false, _ -> cats.[cat] <- 1 + member __.Any = cats.Count <> 0 + member __.Clear() = cats.Clear() + member __.StatsDescending = cats |> Seq.map (|KeyValue|) |> Seq.sortByDescending snd + + type Result = TimedOut | RateLimited | Ok type StreamStates() = let states = System.Collections.Generic.Dictionary() + let dirty = System.Collections.Generic.Queue() + let markDirty stream = if dirty.Contains stream |> not then dirty.Enqueue stream let update stream (state : StreamState) = match states.TryGetValue stream with | false, _ -> states.Add(stream, state) + markDirty stream |> ignore + stream, state | true, current -> let updated = StreamState.combine current state states.[stream] <- updated + if updated.IsReady then markDirty stream |> ignore + stream, updated let updateWritePos stream pos span = update stream { write = pos; queue = span } - member __.Add (item: Batch) = updateWritePos item.stream 0L [|item.span|] + member __.Add (item: Batch) = updateWritePos item.stream None [|item.span|] member __.HandleWriteResult = function - | Writer.Result.Ok (stream, pos) -> updateWritePos stream pos null - | Writer.Result.Duplicate (stream, pos) -> updateWritePos stream pos null - | Writer.Result.Conflict overage -> updateWritePos overage.stream overage.span.index [|overage.span|] - | Writer.Result.Exn (_exn, batch) -> __.Add(batch) - - member __.PendingBatches = - [| for KeyValue (stream, state) in states do - if state.queue <> null then - let x = state.queue |> Array.head - let mutable count = 0 - let max1000EventsMax10EventsFirstTranche (_y : Equinox.Codec.IEvent) = - count <- count + 1 - // Reduce the item count when we don't yet know the write position - count <= (if state.write = 0L then 10 else 1000) - yield { stream = stream; span = { index = x.index; events = x.events |> Array.takeWhile max1000EventsMax10EventsFirstTranche } } |] - - /// Manages distribution of work across a specified number of concurrent writers - type SynchronousWriter(ctx : CosmosContext, log, ?maxDop) = - let states = Queue.StreamStates() - let dop = new SemaphoreSlim(defaultArg maxDop 10) - member __.Add item = states.Add item - member __.Pump(?attempts) = async { - let attempts = defaultArg attempts 5 - let rec loop pending remaining = async { - let! results = Async.Parallel <| seq { for batch in pending -> Writer.write log ctx batch |> dop.Throttle } - let mutable retriesRequired = false - for res in results do - states.HandleWriteResult res - res.WriteTo log - match res with - | Writer.Result.Ok _ | Writer.Result.Duplicate _ -> () - | Writer.Result.Conflict _ | Writer.Result.Exn _ -> retriesRequired <- true - let pending = states.PendingBatches - if not <| Array.isEmpty pending then - if retriesRequired then Log.Warning("Retrying; {count} streams to sync", Array.length pending) - else log.Information("Syncing {count} streams", Array.length pending) - if remaining <= 1 then log.Error("{retries} Sync attempts exceeded", attempts); failwith "Sync failed" - else return! loop pending (remaining-1) } - let pending = states.PendingBatches - let streams = Array.length pending - log.Information("Syncing {count} streams", streams) - do! loop pending attempts - return streams } - -let category (streamName : string) = streamName.Split([|'-'|],2).[0] + | Writer.Result.Ok (stream, pos) -> updateWritePos stream (Some pos) null, Ok + | Writer.Result.Duplicate (stream, pos) -> updateWritePos stream (Some pos) null, Ok + | Writer.Result.Conflict overage -> updateWritePos overage.stream (Some overage.span.index) [|overage.span|], Ok + | Writer.Result.Exn (exn, batch) -> + let r = + match exn with + | RateLimitedMessage -> RateLimited + | TimedOutMessage -> TimedOut + | Other -> Ok + __.Add(batch), r + member __.TryPending() = + match dirty |> Queue.tryDequeue with + | None -> None + | Some stream -> + let state = states.[stream] + + if not state.IsReady then None else + + let x = state.queue |> Array.head + + let mutable bytesBudget = cosmosPayloadLimit + let mutable count = 0 + let max2MbMax1000EventsMax10EventsFirstTranche (y : Equinox.Codec.IEvent) = + bytesBudget <- bytesBudget - cosmosPayloadBytes y + count <- count + 1 + // Reduce the item count when we don't yet know the write position + count <= (if Option.isNone state.write then 10 else 100) && (bytesBudget >= 0 || count = 1) + Some { stream = stream; span = { index = x.index; events = x.events |> Array.takeWhile max2MbMax1000EventsMax10EventsFirstTranche } } + member __.Dump() = + let mutable synced, ready, waiting = 0, 0, 0 + let mutable readyB, waitingB = 0L, 0L + let waitCats = CatStats() + for KeyValue (stream,state) in states do + match int64 state.Size with + | 0L -> synced <- synced + 1 + | sz when state.IsReady -> ready <- ready + 1; readyB <- readyB + sz + | sz -> waitCats.Ingest(category stream); waiting <- waiting + 1; waitingB <- waitingB + sz + Log.Warning("Syncing {dirty} Ready {ready}/{readyMb:n1}MB Waiting {waiting}/{waitingMb:n1}MB Synced {synced}", + dirty.Count, ready, mb readyB, waiting, mb waitingB, synced) + if waitCats.Any then Log.Warning("Waiting {waitCats}", waitCats.StatsDescending) + + type Writers(write, maxDop) = + let work = System.Collections.Concurrent.ConcurrentQueue() + let result = Event<_>() + [] member __.Result = result.Publish + member __.Enqueue item = work.Enqueue item + member __.HasCapacity = work.Count < maxDop * 2 + member __.Pump() = async { + let dop = new SemaphoreSlim(maxDop) + let dispatch item = async { let! res = write item in result.Trigger res } |> dop.Throttle + let! ct = Async.CancellationToken + while not ct.IsCancellationRequested do + match work.TryDequeue() with + | true, item -> do! dispatch item + | _ -> do! Async.Sleep 100 } -#if !cosmos +//#if !cosmos type EventStore.ClientAPI.RecordedEvent with member __.Timestamp = System.DateTimeOffset.FromUnixTimeMilliseconds(__.CreatedEpoch) @@ -495,9 +585,6 @@ module EventStoreSource = let posFromPercentage (pct,max : Position) = let rawPos = Position(float max.CommitPosition * pct / 100. |> int64, 0L) let chunk = int (chunk rawPos) in posFromChunk chunk // &&& 0xFFFFFFFFE0000000L // rawPos / 256L / 1024L / 1024L * 1024L * 1024L * 256L - let posFromChunkAfter (pos: Position) = - let nextChunk = 1 + int (chunk pos) - posFromChunk nextChunk let fetchMax (conn : IEventStoreConnection) = async { let! lastItemBatch = conn.ReadAllEventsBackwardAsync(Position.End, 1, resolveLinkTos = false) |> Async.AwaitTaskCorrect @@ -526,7 +613,7 @@ module EventStoreSource = fetchFrom 0L type [] PullResult = Exn of exn: exn | Eof | EndOfTranche - type ReaderGroup(conn : IEventStoreConnection, enumEvents, postBatch : CosmosIngester.Batch -> unit) = + type ReaderGroup(conn : IEventStoreConnection, enumEvents, postBatch : Position -> CosmosIngester.Batch[] -> unit) = member __.Pump(range : Range, batchSize, slicesStats : SliceStatsBuffer, overallStats : OverallStats, ?ignoreEmptyEof) = let sw = Stopwatch.StartNew() // we'll report the warmup/connect time on the first batch let rec loop () = async { @@ -541,15 +628,15 @@ module EventStoreSource = |> Seq.map (fun (streamId,xs) -> streamId, [| for _s, i, e in xs -> i, e |]) |> Array.ofSeq let usedStreams, usedCats = streams.Length, streams |> Seq.map fst |> Seq.distinct |> Seq.length - let mutable usedEvents = 0 - for stream,streamEvents in streams do - for pos, item in streamEvents do - usedEvents <- usedEvents + 1 - postBatch { stream = stream; span = { index = pos; events = [| item |]}} + let events : CosmosIngester.Batch[] = + [| for stream,streamEvents in streams do + for pos, item in streamEvents do + yield { stream = stream; span = { index = pos; events = [| item |]}} |] + postBatch currentSlice.NextPosition events if not(ignoreEmptyEof = Some true && batchEvents = 0 && not currentSlice.IsEndOfStream) then // ES doesnt report EOF on the first call :( Log.Warning("Read {pos,10} {pct:p1} {ft:n3}s {mb:n1}MB {count,4} {categories,3}c {streams,4}s {events,4}e Post {pt:n0}ms", range.Current.CommitPosition, range.PositionAsRangePercentage, (let e = sw.Elapsed in e.TotalSeconds), mb batchBytes, - batchEvents, usedCats, usedStreams, usedEvents, postSw.ElapsedMilliseconds) + batchEvents, usedCats, usedStreams, events.Length, postSw.ElapsedMilliseconds) let shouldLoop = range.TryNext currentSlice.NextPosition if shouldLoop && not currentSlice.IsEndOfStream then sw.Restart() // restart the clock as we hand off back to the Reader @@ -563,149 +650,189 @@ module EventStoreSource = type [] Work = | Stream of name: string * batchSize: int - | Tranche of range: Range * batchSize : int | Tail of pos: Position * interval: TimeSpan * batchSize : int - type FeedQueue(batchSize, minBatchSize, max, ?statsInterval) = + type FeedQueue(batchSize, minBatchSize, ?statsInterval) = let work = System.Collections.Concurrent.ConcurrentQueue() member val OverallStats = OverallStats(?statsInterval=statsInterval) member val SlicesStats = SliceStatsBuffer() - member __.AddTranche(range, ?batchSizeOverride) = - work.Enqueue <| Work.Tranche (range, defaultArg batchSizeOverride batchSize) - member __.AddTranche(pos, nextPos, ?batchSizeOverride) = - __.AddTranche(Range (pos, Some nextPos, max), ?batchSizeOverride=batchSizeOverride) member __.AddStream(name, ?batchSizeOverride) = work.Enqueue <| Work.Stream (name, defaultArg batchSizeOverride batchSize) member __.AddTail(pos, interval, ?batchSizeOverride) = work.Enqueue <| Work.Tail (pos, interval, defaultArg batchSizeOverride batchSize) member __.TryDequeue () = work.TryDequeue() - member __.Process(conn, enumEvents, postBatch, work) = async { + member __.Process(conn, enumEvents, postItem, shouldTail, postTail, work) = async { let adjust batchSize = if batchSize > minBatchSize then batchSize - 128 else batchSize match work with | Stream (name,batchSize) -> use _ = Serilog.Context.LogContext.PushProperty("Tranche",name) Log.Warning("Reading stream; batch size {bs}", batchSize) - try do! pullStream (conn, batchSize) name postBatch + try do! pullStream (conn, batchSize) name postItem Log.Warning("completed stream") with e -> let bs = adjust batchSize Log.Warning(e,"Could not read stream, retrying with batch size {bs}", bs) __.AddStream(name, bs) return false - | Tranche (range, batchSize) -> - use _ = Serilog.Context.LogContext.PushProperty("Tranche",chunk range.Current) - Log.Warning("Commencing tranche, batch size {bs}", batchSize) - let reader = ReaderGroup(conn, enumEvents, postBatch) - let! res = reader.Pump(range, batchSize, __.SlicesStats, __.OverallStats) - match res with - | PullResult.EndOfTranche -> - Log.Warning("Completed tranche") - __.OverallStats.DumpIfIntervalExpired() - return false - | PullResult.Eof -> - Log.Warning("REACHED THE END!") - __.OverallStats.DumpIfIntervalExpired(true) - return true - | PullResult.Exn e -> - let bs = adjust batchSize - Log.Warning(e, "Could not read All, retrying with batch size {bs}", bs) - __.OverallStats.DumpIfIntervalExpired() - __.AddTranche(range, bs) - return false | Tail (pos, interval, batchSize) -> - let mutable first, count, batchSize, range = true, 0, batchSize, Range(pos,None, Position.Start) + let mutable first, count, pauses, batchSize, range = true, 0, 0, batchSize, Range(pos,None, Position.Start) let statsInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.) let progressIntervalMs, tailIntervalMs = int64 statsInterval.TotalMilliseconds, int64 interval.TotalMilliseconds let progressSw, tailSw = Stopwatch.StartNew(), Stopwatch.StartNew() - let reader = ReaderGroup(conn, enumEvents, postBatch) + let awaitInterval = async { + match tailIntervalMs - tailSw.ElapsedMilliseconds with + | waitTimeMs when waitTimeMs > 0L -> do! Async.Sleep (int waitTimeMs) + | _ -> () + tailSw.Restart() } + let reader = ReaderGroup(conn, enumEvents, postTail) let slicesStats, stats = SliceStatsBuffer(), OverallStats() + use _ = Serilog.Context.LogContext.PushProperty("Tranche", "Tail") while true do let currentPos = range.Current - use _ = Serilog.Context.LogContext.PushProperty("Tranche", "Tail") if first then first <- false Log.Warning("Tailing at {interval}s interval", interval.TotalSeconds) elif progressSw.ElapsedMilliseconds > progressIntervalMs then - Log.Warning("Performed {count} tails to date @ {pos} chunk {chunk}", count, currentPos.CommitPosition, chunk currentPos) + Log.Warning("Performed {count} tails ({pauses} pauses) to date @ {pos} chunk {chunk}", count, pauses, currentPos.CommitPosition, chunk currentPos) progressSw.Restart() count <- count + 1 - let! res = reader.Pump(range,batchSize,slicesStats,stats,ignoreEmptyEof=true) + if shouldTail () then + let! res = reader.Pump(range,batchSize,slicesStats,stats,ignoreEmptyEof=true) + do! awaitInterval + match res with + | PullResult.EndOfTranche | PullResult.Eof -> () + | PullResult.Exn e -> + batchSize <- adjust batchSize + Log.Warning(e, "Tail $all failed, adjusting batch size to {bs}", batchSize) + else + pauses <- pauses + 1 + do! awaitInterval stats.DumpIfIntervalExpired() - match tailIntervalMs - tailSw.ElapsedMilliseconds with - | waitTimeMs when waitTimeMs > 0L -> do! Async.Sleep (int waitTimeMs) - | _ -> () - tailSw.Restart() - match res with - | PullResult.EndOfTranche | PullResult.Eof -> () - | PullResult.Exn e -> - batchSize <- adjust batchSize - Log.Warning(e, "Tail $all failed, adjusting batch size to {bs}", batchSize) return true } - type Reader(conn : IEventStoreConnection, spec: ReaderSpec, enumEvents, postBatch : CosmosIngester.Batch -> unit, max, ct : CancellationToken, ?statsInterval) = - let work = FeedQueue(spec.batchSize, spec.minBatchSize, max, ?statsInterval=statsInterval) - do match spec.tailInterval with - | Some interval -> work.AddTail(max, interval) - | None -> () + type Reader(conn : IEventStoreConnection, spec: ReaderSpec, enumEvents, max, ?statsInterval) = + let work = FeedQueue(spec.batchSize, spec.minBatchSize, ?statsInterval=statsInterval) + do work.AddTail(max, spec.tailInterval) for s in spec.streams do work.AddStream s - let mutable remainder = let startPos = match spec.start with - | StartPos.Start -> Position.Start + | StartPos.Tail -> max | Absolute p -> Position(p, 0L) | Chunk c -> posFromChunk c | Percentage pct -> posFromPercentage (pct, max) - | Ignore -> max Log.Warning("Start Position {pos} (chunk {chunk}, {pct:p1})", startPos.CommitPosition, chunk startPos, float startPos.CommitPosition/ float max.CommitPosition) - if spec.start = Ignore then None - else - let nextPos = posFromChunkAfter startPos - work.AddTranche(startPos, nextPos) - Some nextPos - member __.Pump () = async { - (*if spec.tail then enqueue tail work*) - let maxDop = spec.stripes + Option.count spec.tailInterval + member __.Pump(postItem, shouldTail, postTail) = async { + let maxDop = spec.stripes + 1 let dop = new SemaphoreSlim(maxDop) let mutable finished = false - while not ct.IsCancellationRequested && not (finished && dop.CurrentCount <> maxDop) do + let! ct = Async.CancellationToken + while not ct.IsCancellationRequested do let! _ = dop.Await() work.OverallStats.DumpIfIntervalExpired() let forkRunRelease task = async { let! _ = Async.StartChild <| async { - try let! eof = work.Process(conn, enumEvents, postBatch, task) - if eof then remainder <- None + try let! _ = work.Process(conn, enumEvents, postItem, shouldTail, postTail, task) in () finally dop.Release() |> ignore } return () } match work.TryDequeue() with | true, task -> do! forkRunRelease task + | false, _ when not finished-> + Log.Warning("No further ingestion work to commence") + finished <- true + | _ -> () } + + type [] CoordinationWork<'Pos> = + | Result of CosmosIngester.Writer.Result + | Unbatched of CosmosIngester.Batch + | BatchWithTracking of 'Pos * CosmosIngester.Batch[] + + let every ms f = + let timer = Stopwatch.StartNew() + fun () -> + if timer.ElapsedMilliseconds > ms then + f () + timer.Restart() + + type Coordinator(log : Serilog.ILogger, reader : Reader, cosmosContext, ?maxWriters, ?interval) = + let statsIntervalMs = let t = defaultArg interval (TimeSpan.FromMinutes 1.) in t.TotalMilliseconds |> int64 + let sleepIntervalMs = 100 + let work = System.Collections.Concurrent.ConcurrentQueue() + let buffer = CosmosIngester.Queue.StreamStates() + let writers = CosmosIngester.Writers(CosmosIngester.Writer.write log cosmosContext, defaultArg maxWriters 32) + let tailSyncState = Progress.State() + let pumpReaders = + let postWrite = work.Enqueue << CoordinationWork.Unbatched + let postBatch pos xs = work.Enqueue(CoordinationWork.BatchWithTracking (pos,xs)) + // Yes, there is a race, but its constrained by the number of parallel readers and the fact that batches get ingested quickly here + let shouldTailNow () = tailSyncState.PendingBatchCount < 10 && tailSyncState.PendingStreamCount < 10*4096 + reader.Pump(postWrite, shouldTailNow, postBatch) + let postWriteResult = work.Enqueue << CoordinationWork.Result + + member __.Pump () = async { + use _ = writers.Result.Subscribe postWriteResult + let! _ = Async.StartChild pumpReaders + let! _ = Async.StartChild <| writers.Pump() + let! ct = Async.CancellationToken + let mutable bytesPended = 0L + let resultsHandled, ingestionsHandled, workPended, eventsPended = ref 0, ref 0, ref 0, ref 0 + let mutable rateLimited, timedOut = 0, 0 + let dumpStats () = + if rateLimited <> 0 || timedOut <> 0 then Log.Warning("Failures {rateLimited} Rate-limited, {timedOut} Timed out", rateLimited, timedOut) + Log.Warning("Sent {queued} req {events} events; Completed {completed} reqs; Egress {gb:n3}GB", + !workPended, !eventsPended,!resultsHandled, mb bytesPended / 1024.) + ingestionsHandled := 0; workPended := 0; eventsPended := 0; resultsHandled := 0 + rateLimited <- 0; timedOut <- 0 + buffer.Dump() + let tryDumpStats = every statsIntervalMs dumpStats + let handle = function + | CoordinationWork.Unbatched item -> + buffer.Add item |> ignore + | CoordinationWork.BatchWithTracking(pos, items) -> + for item in items do + buffer.Add item |> ignore + tailSyncState.AddBatch(pos, [|for x in items -> x.stream, x.span.index + int64 x.span.events.Length |]) + | CoordinationWork.Result res -> + incr resultsHandled + let (stream, updatedState), kind = buffer.HandleWriteResult res + match updatedState.write with None -> () | Some wp -> tailSyncState.MarkStreamProgress(stream, wp) + res.WriteTo log + match kind with + | CosmosIngester.Queue.Ok -> res.WriteTo log + | CosmosIngester.Queue.RateLimited -> rateLimited <- rateLimited + 1 + | CosmosIngester.Queue.TimedOut -> timedOut <- timedOut + 1 + let queueWrite (w : CosmosIngester.Batch) = + incr workPended + eventsPended := !eventsPended + w.span.events.Length + bytesPended <- bytesPended + int64 (Array.sumBy CosmosIngester.Queue.cosmosPayloadBytes w.span.events) + writers.Enqueue w + while not ct.IsCancellationRequested do + // 1. propagate read items to buffer; propagate write results to buffer + Progress + match work.TryDequeue() with + | true, item -> + handle item | false, _ -> - match remainder with - | Some pos -> - let nextPos = posFromChunkAfter pos - remainder <- Some nextPos - do! forkRunRelease <| Work.Tranche (Range(pos, Some nextPos, max), spec.batchSize) - | None -> - if finished then do! Async.Sleep 1000 - else Log.Warning("No further ingestion work to commence") - finished <- true } - - let start log (leaseId, startFromHere, batchSize) (conn, spec, enumEvents, cosmosContext) = async { - let! ct = Async.CancellationToken + // 2. After that, [over] provision writers queue + let mutable more = writers.HasCapacity + while more do + match buffer.TryPending() with + | Some w -> queueWrite w; more <- writers.HasCapacity + | None -> (); more <- false + // 3. Periodically emit status info + tryDumpStats () + // TODO trigger periodic progress writing + // 5. Sleep if + do! Async.Sleep sleepIntervalMs } + + let start (log : Serilog.ILogger) (conn, spec, enumEvents) (maxWriters, cosmosContext) = async { let! max = establishMax conn - let writer = CosmosIngester.SynchronousWriter(cosmosContext, log) - let reader = Reader(conn, spec, enumEvents, writer.Add, max, ct) - let! _ = Async.StartChild <| reader.Pump() - return () - } - - //let run (ctx : Equinox.Cosmos.Core.CosmosContext) (source : GesConnection) (spec: ReaderSpec) (writerQueueLen, writerCount, readerQueueLen) = async { - // let! ingester = EventStoreSource.start(source.ReadConnection, writerQueueLen, writerCount, readerQueueLen) - // let! _feeder = EventStoreSource.start(source.ReadConnection, spec, enumEvents, ingester.Add) - // do! Async.AwaitKeyboardInterrupt() } + let reader = Reader(conn, spec, enumEvents, max) + let coordinator = Coordinator(log, reader, cosmosContext, maxWriters) + do! coordinator.Pump () } + let enumEvents catFilter (xs : EventStore.ClientAPI.ResolvedEvent[]) = seq { for e in xs -> let eb = EventStoreSource.esPayloadBytes e @@ -723,7 +850,8 @@ let enumEvents catFilter (xs : EventStore.ClientAPI.ResolvedEvent[]) = seq { Choice2Of2 e | e -> Choice1Of2 (e.EventStreamId, e.EventNumber, Equinox.Codec.Core.EventData.Create(e.EventType, e.Data, e.Metadata, e.Timestamp)) } -#else + +//#else module CosmosSource = open Microsoft.Azure.Documents open Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing @@ -786,21 +914,26 @@ module CosmosSource = //#endif let createRangeSyncHandler log (ctx: Core.CosmosContext) (transform : Microsoft.Azure.Documents.Document -> CosmosIngester.Batch seq) = + let writer = CosmosIngester.Writers(CosmosIngester.Writer.write log ctx, maxDop = 10) + let ingest docs : (*streams*)int * (*events*)int = + let streams, events = System.Collections.Generic.HashSet(), ref 0 + // TODO send this and a writer callback into a coordinator + for x in docs |> Seq.collect transform do + incr events + writer.Enqueue x + streams.Add x.stream |> ignore + !events, streams.Count let sw = Stopwatch.StartNew() // we'll end up reporting the warmup/connect time on the first batch, but that's ok - let writer = CosmosIngester.SynchronousWriter(ctx, log) let processBatch (ctx : IChangeFeedObserverContext) (docs : System.Collections.Generic.IReadOnlyList) = async { sw.Stop() // Stop the clock after ChangeFeedProcessor hands off to us - let pt, events = Stopwatch.Time (fun () -> - let items = docs |> Seq.collect transform |> Array.ofSeq - items |> Array.iter writer.Add - items) - let! et,streams = writer.Pump() |> Stopwatch.Time let r = ctx.FeedResponse - Log.Information("Read {range,2} -{token,6} {count,4} docs {requestCharge,6}RU {l:n1}s Gen {events,5} events {p:n3}s Sync {streams,5} streams {e:n1}s", + let pt, (events,streams) = Stopwatch.Time(fun () -> ingest docs) + Log.Information("Read {range,2} -{token,6} {count,4} docs {requestCharge,6}RU {l:n1}s Gen {events,5} events {p:n3}s Sync {streams,5} streams", ctx.PartitionKeyRangeId, r.ResponseContinuation.Trim[|'"'|], docs.Count, (let c = r.RequestCharge in c.ToString("n1")), - float sw.ElapsedMilliseconds / 1000., events.Length, (let e = pt.Elapsed in e.TotalSeconds), streams, (let e = et.Elapsed in e.TotalSeconds)) + float sw.ElapsedMilliseconds / 1000., events, (let e = pt.Elapsed in e.TotalSeconds), streams) sw.Restart() // restart the clock as we handoff back to the ChangeFeedProcessor } + // TODO add a coordinator child to take charge of writing, fixups and progress ChangeFeedObserver.Create(log, processBatch) let run (sourceDiscovery, source) (auxDiscovery, aux) connectionPolicy (leaseId, forceSkip, batchSize, lagReportFreq : TimeSpan option) @@ -814,7 +947,7 @@ module CosmosSource = ( Log.Logger, sourceDiscovery, connectionPolicy, source, aux, auxDiscovery = auxDiscovery, leasePrefix = leaseId, forceSkipExistingEvents = forceSkip, cfBatchSize = batchSize, createObserver = createRangeProjector, ?reportLagAndAwaitNextEstimation = maybeLogLag) do! Async.AwaitKeyboardInterrupt() } -#endif +//#endif [] let main argv = @@ -823,7 +956,12 @@ let main argv = let destination = args.Destination.Connect "SyncTemplate" |> Async.RunSynchronously let colls = CosmosCollections(args.Destination.Database, args.Destination.Collection) Equinox.Cosmos.Core.CosmosContext(destination, colls, Log.ForContext()) -#if cosmos +#if !cosmos + let log : ILogger = Logging.initialize args.Verbose args.VerboseConsole args.MaybeSeqEndpoint + let esConnection, catFilter = args.Source.Connect(log, log, ConnectionStrategy.ClusterSingle NodePreference.Master) + let spec = args.BuildChangeFeedParams() + EventStoreSource.start log (esConnection.ReadConnection, spec, enumEvents catFilter) (10, target) |> Async.RunSynchronously +#else let log = Logging.initialize args.Verbose args.ChangeFeedVerbose args.MaybeSeqEndpoint let discovery, source, connectionPolicy, catFilter = args.Source.BuildConnectionDetails() let auxDiscovery, aux, leaseId, startFromHere, batchSize, lagFrequency = args.BuildChangeFeedParams() @@ -837,14 +975,8 @@ let main argv = CosmosSource.run (discovery, source) (auxDiscovery, aux) connectionPolicy (leaseId, startFromHere, batchSize, lagFrequency) createSyncHandler -#else - let log = Logging.initialize args.Verbose args.VerboseConsole args.MaybeSeqEndpoint - let esConnection, catFilter = args.Source.Connect(log, log, ConnectionStrategy.ClusterSingle NodePreference.Master) - let leaseId, startFromHere, batchSize = args.BuildChangeFeedParams() - let spec = args.BuildFeedParams() - EventStoreSource.start log (leaseId, startFromHere, batchSize) (esConnection.ReadConnection, spec, enumEvents catFilter, target) -#endif |> Async.RunSynchronously +#endif 0 with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 | CmdParser.InvalidArguments msg -> eprintfn "%s" msg; 1