diff --git a/equinox-ingest/Ingest/Infrastructure.fs b/equinox-ingest/Ingest/Infrastructure.fs index 6c4cd7e28..816652591 100644 --- a/equinox-ingest/Ingest/Infrastructure.fs +++ b/equinox-ingest/Ingest/Infrastructure.fs @@ -1,5 +1,5 @@ [] -module private IngestTemplate.Infrastructure +module private Infrastructure open Equinox.Store // AwaitTaskCorrect open System @@ -15,16 +15,12 @@ type SemaphoreSlim with return! Async.AwaitTaskCorrect task } -module Queue = - let tryDequeue (x : System.Collections.Generic.Queue<'T>) = -#if NET461 - if x.Count = 0 then None - else x.Dequeue() |> Some -#else - match x.TryDequeue() with - | false, _ -> None - | true, res -> Some res -#endif + /// Throttling wrapper which waits asynchronously until the semaphore has available capacity + member semaphore.Throttle(workflow : Async<'T>) : Async<'T> = async { + let! _ = semaphore.Await() + try return! workflow + finally semaphore.Release() |> ignore + } #nowarn "21" // re AwaitKeyboardInterrupt #nowarn "40" // re AwaitKeyboardInterrupt diff --git a/equinox-ingest/Ingest/Ingest.fsproj b/equinox-ingest/Ingest/Ingest.fsproj index a6e638dd7..094ebc154 100644 --- a/equinox-ingest/Ingest/Ingest.fsproj +++ b/equinox-ingest/Ingest/Ingest.fsproj @@ -9,6 +9,7 @@ + diff --git a/equinox-ingest/Ingest/Program.fs b/equinox-ingest/Ingest/Program.fs index bd529e2b8..7b41878be 100644 --- a/equinox-ingest/Ingest/Program.fs +++ b/equinox-ingest/Ingest/Program.fs @@ -1,6 +1,7 @@ module IngestTemplate.Program open Equinox.Store // Infra +open SyncTemplate open FSharp.Control open Serilog open System @@ -8,8 +9,8 @@ open System.Collections.Concurrent open System.Diagnostics open System.Threading -type StartPos = Absolute of int64 | Chunk of int | Percentage of float | Start | Ignore -type ReaderSpec = { start: StartPos; streams: string list; tailInterval: TimeSpan option; stripes: int; batchSize: int; minBatchSize: int } +type StartPos = Absolute of int64 | Chunk of int | Percentage of float | Start +type ReaderSpec = { start: StartPos; stripes: int; batchSize: int; minBatchSize: int } let mb x = float x / 1024. / 1024. module CmdParser = @@ -119,13 +120,10 @@ module CmdParser = | [] Verbose | [] VerboseConsole | [] LocalSeq - | [] Stream of string - | [] All - | [] Offset of int64 + | [] Position of int64 | [] Chunk of int | [] Percent of float | [] Stripes of int - | [] Tail of intervalS: float | [] Es of ParseResults interface IArgParserTemplate with member a.Usage = @@ -135,13 +133,10 @@ module CmdParser = | Verbose -> "request Verbose Logging. Default: off" | VerboseConsole -> "request Verbose Console Logging. Default: off" | LocalSeq -> "configures writing to a local Seq endpoint at http://localhost:5341, see https://getseq.net" - | Stream _ -> "specific stream(s) to read" - | All -> "traverse EventStore $all from Start" - | Offset _ -> "EventStore $all Stream Position to commence from" + | Position _ -> "EventStore $all Stream Position to commence from" | Chunk _ -> "EventStore $all Chunk to commence from" | Percent _ -> "EventStore $all Stream Position to commence from (as a percentage of current tail position)" | Stripes _ -> "number of concurrent readers" - | Tail _ -> "attempt to read from tail at specified interval in Seconds" | Es _ -> "specify EventStore parameters" and Arguments(args : ParseResults) = member val EventStore = EventStore.Arguments(args.GetResult Es) @@ -151,21 +146,15 @@ module CmdParser = member __.StartingBatchSize = args.GetResult(BatchSize,4096) member __.MinBatchSize = args.GetResult(MinBatchSize,512) member __.Stripes = args.GetResult(Stripes,1) - member __.TailInterval = match args.TryGetResult Tail with Some s -> TimeSpan.FromSeconds s |> Some | None -> None member x.BuildFeedParams() : ReaderSpec = Log.Warning("Processing in batches of [{minBatchSize}..{batchSize}] with {stripes} stripes", x.MinBatchSize, x.StartingBatchSize, x.Stripes) let startPos = - match args.TryGetResult Offset, args.TryGetResult Chunk, args.TryGetResult Percent, args.Contains All with - | Some p, _, _, _ -> Log.Warning("Processing will commence at $all Position {p}", p); Absolute p - | _, Some c, _, _ -> Log.Warning("Processing will commence at $all Chunk {c}", c); StartPos.Chunk c - | _, _, Some p, _ -> Log.Warning("Processing will commence at $all Percentage {pct:P}", p/100.); Percentage p - | None, None, None, true -> Log.Warning "Processing will commence at $all Start"; Start - | None, None, None, false ->Log.Warning "No $all processing requested"; Ignore - match x.TailInterval with - | Some interval -> Log.Warning("Following tail at {seconds}s interval", interval.TotalSeconds) - | None -> Log.Warning "Not following tail" - { start = startPos; streams = args.GetResults Stream; tailInterval = x.TailInterval - batchSize = x.StartingBatchSize; minBatchSize = x.MinBatchSize; stripes = x.Stripes } + match args.TryGetResult Position, args.TryGetResult Chunk, args.TryGetResult Percent with + | Some p, _, _ -> Log.Warning("Processing will commence at $all Position {p}", p); Absolute p + | _, Some c, _ -> Log.Warning("Processing will commence at $all Chunk {c}", c); StartPos.Chunk c + | _, _, Some p -> Log.Warning("Processing will commence at $all Percentage {pct:P}", p/100.); Percentage p + | None, None, None -> Log.Warning "Processing will commence at $all Start"; Start + { start = startPos; batchSize = x.StartingBatchSize; minBatchSize = x.MinBatchSize; stripes = x.Stripes } /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args let parse argv : Arguments = @@ -187,270 +176,67 @@ module Logging = |> fun c -> match maybeSeqEndpoint with None -> c | Some endpoint -> c.WriteTo.Seq(endpoint) |> fun c -> c.CreateLogger() -module Ingester = - open Equinox.Cosmos.Core - open Equinox.Cosmos.Store - - type [] Span = { index: int64; events: Equinox.Codec.IEvent[] } - type [] Batch = { stream: string; span: Span } - - module Writer = - type [] Result = - | Ok of stream: string * updatedPos: int64 - | Duplicate of stream: string * updatedPos: int64 - | Conflict of overage: Batch - | Exn of exn: exn * batch: Batch - member __.WriteTo(log: ILogger) = - match __ with - | Ok (stream, pos) -> log.Information("Wrote {stream} up to {pos}", stream, pos) - | Duplicate (stream, pos) -> log.Information("Ignored {stream} (synced up to {pos})", stream, pos) - | Conflict overage -> log.Information("Requeing {stream} {pos} ({count} events)", overage.stream, overage.span.index, overage.span.events.Length) - | Exn (exn, batch) -> log.Warning(exn,"Writing {stream} failed, retrying {count} events ....", batch.stream, batch.span.events.Length) - let private write (ctx : CosmosContext) ({ stream = s; span={ index = p; events = e}} as batch) = async { - let stream = ctx.CreateStream s - Log.Information("Writing {s}@{i}x{n}",s,p,e.Length) - try let! res = ctx.Sync(stream, { index = p; etag = None }, e) - match res with - | AppendResult.Ok pos -> return Ok (s, pos.index) - | AppendResult.Conflict (pos, _) | AppendResult.ConflictUnknown pos -> - match pos.index, p + e.LongLength with - | actual, expectedMax when actual >= expectedMax -> return Duplicate (s, pos.index) - | actual, _ when p >= actual -> return Conflict batch - | actual, _ -> - Log.Debug("pos {pos} batch.pos {bpos} len {blen} skip {skip}", actual, p, e.LongLength, actual-p) - return Conflict { stream = s; span = { index = actual; events = e |> Array.skip (actual-p |> int) } } - with e -> return Exn (e, batch) } - - /// Manages distribution of work across a specified number of concurrent writers - type WriteQueue(ctx : CosmosContext, queueLen, ct : CancellationToken) = - let buffer = new BlockingCollection<_>(ConcurrentQueue(), queueLen) - let result = Event<_>() - let child = async { - let! ct = Async.CancellationToken // i.e. cts.Token - for item in buffer.GetConsumingEnumerable(ct) do - let! res = write ctx item - result.Trigger res } - member internal __.StartConsumers n = - for _ in 1..n do - Async.StartAsTask(child, cancellationToken=ct) |> ignore - - /// Supply an item to be processed - member __.TryAdd(item, timeout : TimeSpan) = buffer.TryAdd(item, int timeout.TotalMilliseconds, ct) - [] member __.Result = result.Publish - - let inline arrayBytes (x:byte[]) = if x = null then 0 else x.Length - - type [] StreamState = { read: int64 option; write: int64 option; isMalformed : bool; queue: Span[] } with - /// Determines whether the head is ready to write (either write position is unknown, or matches) - member __.IsHeady = Array.tryHead __.queue |> Option.exists (fun x -> __.write |> Option.forall (fun w -> w = x.index)) - member __.IsReady = __.queue <> null && not __.isMalformed && __.IsHeady - member __.Size = - if __.queue = null then 0 - else __.queue |> Seq.collect (fun x -> x.events) |> Seq.sumBy (fun x -> arrayBytes x.Data + arrayBytes x.Meta + x.EventType.Length*2 + 16) - module StreamState = - module Span = - let private (|Max|) x = x.index + x.events.LongLength - let private trim min (Max m as x) = - // Full remove - if m <= min then { index = min; events = [||] } - // Trim until min - elif m > min && x.index < min then { index = min; events = x.events |> Array.skip (min - x.index |> int) } - // Leave it - else x - let merge min (xs : Span seq) = - let buffer = ResizeArray() - let mutable curr = { index = min; events = [||]} - for x in xs |> Seq.sortBy (fun x -> x.index) do - match curr, trim min x with - // no data incoming, skip - | _, x when x.events.Length = 0 -> - () - // Not overlapping, no data buffered -> buffer - | c, x when c.events.Length = 0 -> - curr <- x - // Overlapping, join - | Max cMax as c, x when cMax >= x.index -> - curr <- { c with events = Array.append c.events (trim cMax x).events } - // Not overlapping, new data - | c, x -> - buffer.Add c - curr <- x - if curr.events.Length <> 0 then buffer.Add curr - if buffer.Count = 0 then null else buffer.ToArray() - - let inline optionCombine f (r1: int64 option) (r2: int64 option) = - match r1, r2 with - | Some x, Some y -> f x y |> Some - | None, None -> None - | None, x | x, None -> x - let combine (s1: StreamState) (s2: StreamState) : StreamState = - let writePos = optionCombine max s1.write s2.write - let items = seq { if s1.queue <> null then yield! s1.queue; if s2.queue <> null then yield! s2.queue } - { read = optionCombine max s1.read s2.read; write = writePos; isMalformed = s1.isMalformed || s2.isMalformed; queue = Span.merge (defaultArg writePos 0L) items} - - /// Gathers stats relating to how many items of a given category have been observed - type CatStats() = - let cats = System.Collections.Generic.Dictionary() - member __.Ingest cat = - match cats.TryGetValue cat with - | true, catCount -> cats.[cat] <- catCount + 1 - | false, _ -> cats.[cat] <- 1 - member __.Any = cats.Count <> 0 - member __.Clear() = cats.Clear() - member __.StatsDescending = cats |> Seq.map (|KeyValue|) |> Seq.sortByDescending snd - - let category (s : string) = s.Split([|'-'|], 2, StringSplitOptions.RemoveEmptyEntries) |> Array.head - let cosmosPayloadLimit = 2 * 1024 * 1024 - 1024 - let inline cosmosPayloadBytes (x: Equinox.Codec.IEvent) = arrayBytes x.Data + arrayBytes x.Meta + 4 - let (|TimedOutMessage|RateLimitedMessage|MalformedMessage|Other|) (e: exn) = - match string e with - | m when m.Contains "Microsoft.Azure.Documents.RequestTimeoutException" -> TimedOutMessage - | m when m.Contains "Microsoft.Azure.Documents.RequestRateTooLargeException" -> RateLimitedMessage - | m when m.Contains "SyntaxError: JSON.parse Error: Unexpected input at position" - || m.Contains "SyntaxError: JSON.parse Error: Invalid character at position" -> MalformedMessage - | _ -> Other - - type Result = TimedOut | RateLimited | Malformed of category: string | Ok - type StreamStates() = - let states = System.Collections.Generic.Dictionary() - let dirty = System.Collections.Generic.Queue() - let markDirty stream = if dirty.Contains stream |> not then dirty.Enqueue stream - - let update stream (state : StreamState) = - Log.Debug("Updated {s} r{r} w{w}", stream, state.read, state.write) - match states.TryGetValue stream with - | false, _ -> - states.Add(stream, state) - markDirty stream |> ignore - | true, current -> - let updated = StreamState.combine current state - states.[stream] <- updated - if updated.IsReady then markDirty stream |> ignore - let updateWritePos stream pos isMalformed span = - update stream { read = None; write = pos; isMalformed = isMalformed; queue = span } - - member __.Add (item: Batch, ?isMalformed) = updateWritePos item.stream None (defaultArg isMalformed false) [|item.span|] - member __.HandleWriteResult = function - | Writer.Result.Ok (stream, pos) -> updateWritePos stream (Some pos) false null; Ok - | Writer.Result.Duplicate (stream, pos) -> updateWritePos stream (Some pos) false null; Ok - | Writer.Result.Conflict overage -> updateWritePos overage.stream (Some overage.span.index) false [|overage.span|]; Ok - | Writer.Result.Exn (exn, batch) -> - let r, malformed = - match exn with - | RateLimitedMessage -> RateLimited, false - | TimedOutMessage -> TimedOut, false - | MalformedMessage -> Malformed (category batch.stream), true - | Other -> Ok, false - __.Add(batch,malformed) - r - member __.TryPending() = - match dirty |> Queue.tryDequeue with - | None -> None - | Some stream -> - let state = states.[stream] - - if not state.IsReady then None else - - let x = state.queue |> Array.head - - let mutable bytesBudget = cosmosPayloadLimit - let mutable count = 0 - let max2MbMax1000EventsMax10EventsFirstTranche (y : Equinox.Codec.IEvent) = - bytesBudget <- bytesBudget - cosmosPayloadBytes y - count <- count + 1 - // Reduce the item count when we don't yet know the write position - count <= (if Option.isNone state.write then 10 else 100) && (bytesBudget >= 0 || count = 1) - Some { stream = stream; span = { index = x.index; events = x.events |> Array.takeWhile max2MbMax1000EventsMax10EventsFirstTranche } } - member __.Dump() = - let mutable synced, ready, waiting, malformed = 0, 0, 0, 0 - let mutable readyB, waitingB, malformedB = 0L, 0L, 0L - let waitCats = CatStats() - for KeyValue (stream,state) in states do - match int64 state.Size with - | 0L -> synced <- synced + 1 - | sz when state.isMalformed -> malformed <- malformed + 1; malformedB <- malformedB + sz - | sz when state.IsReady -> ready <- ready + 1; readyB <- readyB + sz - | sz -> waitCats.Ingest(category stream); waiting <- waiting + 1; waitingB <- waitingB + sz - Log.Warning("Syncing {dirty} Ready {ready}/{readyMb:n1}MB Waiting {waiting}/{waitingMb:n1}MB Malformed {malformed}/{malformedMb:n1}MB Synced {synced}", - dirty.Count, ready, mb readyB, waiting, mb waitingB, malformed, mb malformedB, synced) - if waitCats.Any then Log.Warning("Waiting {waitCats}", waitCats.StatsDescending) - - type SyncQueue(log : Serilog.ILogger, writer : Writer.WriteQueue, cancellationToken: CancellationToken, readerQueueLen, ?interval) = - let intervalMs = let t = defaultArg interval (TimeSpan.FromMinutes 1.) in t.TotalMilliseconds |> int64 - let states = StreamStates() - let results = ConcurrentQueue<_>() - let work = new BlockingCollection<_>(ConcurrentQueue<_>(), readerQueueLen) - - member __.Add item = work.Add item - member __.HandleWriteResult = results.Enqueue - member __.Pump() = - let fiveMs = TimeSpan.FromMilliseconds 5. - let mutable pendingWriterAdd = None - let mutable bytesPended = 0L - let resultsHandled, ingestionsHandled, workPended, eventsPended = ref 0, ref 0, ref 0, ref 0 - let badCats = CatStats() - let progressTimer = Stopwatch.StartNew() - while not cancellationToken.IsCancellationRequested do - let mutable moreResults, rateLimited, timedOut = true, 0, 0 - while moreResults do - match results.TryDequeue() with - | true, res -> - incr resultsHandled - match states.HandleWriteResult res with - | Malformed cat -> badCats.Ingest cat - | RateLimited -> rateLimited <- rateLimited + 1 - | TimedOut -> timedOut <- timedOut + 1 - | Ok -> res.WriteTo log - | false, _ -> moreResults <- false - if rateLimited <> 0 || timedOut <> 0 then Log.Warning("Failures {rateLimited} Rate-limited, {timedOut} Timed out", rateLimited, timedOut) - let mutable t = Unchecked.defaultof<_> - let mutable toIngest = 4096 * 5 - while work.TryTake(&t,fiveMs) && toIngest > 0 do - incr ingestionsHandled - toIngest <- toIngest - 1 - states.Add t - let mutable moreWork = true - while moreWork do - let wrk = - match pendingWriterAdd with - | Some w -> - pendingWriterAdd <- None - Some w - | None -> - let pending = states.TryPending() - match pending with - | Some p -> Some p - | None -> - moreWork <- false - None - match wrk with - | None -> () - | Some w -> - if not (writer.TryAdd(w,fiveMs)) then - moreWork <- false - pendingWriterAdd <- Some w - else - incr workPended - eventsPended := !eventsPended + w.span.events.Length - bytesPended <- bytesPended + int64 (Array.sumBy cosmosPayloadBytes w.span.events) - - if progressTimer.ElapsedMilliseconds > intervalMs then - progressTimer.Restart() - Log.Warning("Ingested {ingestions}; Sent {queued} req {events} events; Completed {completed} reqs; Egress {gb:n3}GB", - !ingestionsHandled, !workPended, !eventsPended,!resultsHandled, mb bytesPended / 1024.) - if badCats.Any then Log.Error("Malformed {badCats}", badCats.StatsDescending); badCats.Clear() - ingestionsHandled := 0; workPended := 0; eventsPended := 0; resultsHandled := 0 - states.Dump() +type Coordinator(log : Serilog.ILogger, writers : CosmosIngester.Writers, cancellationToken: CancellationToken, readerQueueLen, ?interval) = + let intervalMs = let t = defaultArg interval (TimeSpan.FromMinutes 1.) in t.TotalMilliseconds |> int64 + let states = CosmosIngester.StreamStates() + let results = ConcurrentQueue<_>() + let work = new BlockingCollection<_>(ConcurrentQueue<_>(), readerQueueLen) + + member __.Add item = work.Add item + member __.HandleWriteResult = results.Enqueue + member __.Pump() = + let _ = writers.Result.Subscribe __.HandleWriteResult // codependent, wont worry about unsubcribing + let fiveMs = TimeSpan.FromMilliseconds 5. + let mutable pendingWriterAdd = None + let mutable bytesPended = 0L + let resultsHandled, ingestionsHandled, workPended, eventsPended = ref 0, ref 0, ref 0, ref 0 + let badCats = CosmosIngester.CatStats() + let progressTimer = Stopwatch.StartNew() + while not cancellationToken.IsCancellationRequested do + let mutable moreResults, rateLimited, timedOut = true, 0, 0 + while moreResults do + match results.TryDequeue() with + | true, res -> + incr resultsHandled + match states.HandleWriteResult res with + | (stream, _), CosmosIngester.Malformed -> CosmosIngester.category stream |> badCats.Ingest + | _, CosmosIngester.RateLimited -> rateLimited <- rateLimited + 1 + | _, CosmosIngester.TimedOut -> timedOut <- timedOut + 1 + | _, CosmosIngester.Ok -> res.WriteTo log + | false, _ -> moreResults <- false + if rateLimited <> 0 || timedOut <> 0 then Log.Warning("Failures {rateLimited} Rate-limited, {timedOut} Timed out", rateLimited, timedOut) + let mutable t = Unchecked.defaultof<_> + let mutable toIngest = 4096 * 5 + while work.TryTake(&t,fiveMs) && toIngest > 0 do + incr ingestionsHandled + toIngest <- toIngest - 1 + states.Add t |> ignore + let mutable moreWork = true + while writers.HasCapacity && moreWork do + let pending = states.TryReady(writers.IsStreamBusy) + match pending with + | None -> moreWork <- false + | Some w -> + incr workPended + eventsPended := !eventsPended + w.span.events.Length + bytesPended <- bytesPended + int64 (Array.sumBy CosmosIngester.cosmosPayloadBytes w.span.events) + + if progressTimer.ElapsedMilliseconds > intervalMs then + progressTimer.Restart() + Log.Warning("Ingested {ingestions}; Sent {queued} req {events} events; Completed {completed} reqs; Egress {gb:n3}GB", + !ingestionsHandled, !workPended, !eventsPended,!resultsHandled, mb bytesPended / 1024.) + if badCats.Any then Log.Error("Malformed {badCats}", badCats.StatsDescending); badCats.Clear() + ingestionsHandled := 0; workPended := 0; eventsPended := 0; resultsHandled := 0 + states.Dump log /// Manages establishing of the writer 'threads' - can be Stop()ped explicitly and/or will stop when caller does - let start(ctx : CosmosContext, writerQueueLen, writerCount, readerQueueLen) = async { + static member Start(log, ctx, writerQueueLen, writerCount, readerQueueLen) = async { let! ct = Async.CancellationToken - let writer = Writer.WriteQueue(ctx, writerQueueLen, ct) - let queue = SyncQueue(Log.Logger, writer, ct, readerQueueLen) - let _ = writer.Result.Subscribe queue.HandleWriteResult // codependent, wont worry about unsubcribing - writer.StartConsumers writerCount - let! _ = Async.StartChild(async { queue.Pump() }) - return queue + let writers = CosmosIngester.Writers(CosmosIngester.Writer.write log ctx, writerCount, writerQueueLen) + let instance = Coordinator(log, writers, ct, readerQueueLen) + let! _ = Async.StartChild <| writers.Pump() + let! _ = Async.StartChild(async { instance.Pump() }) + return instance } type EventStore.ClientAPI.RecordedEvent with @@ -459,7 +245,7 @@ type EventStore.ClientAPI.RecordedEvent with module EventStoreReader = open EventStore.ClientAPI - let inline esRecPayloadBytes (x: EventStore.ClientAPI.RecordedEvent) = Ingester.arrayBytes x.Data + Ingester.arrayBytes x.Metadata + let inline esRecPayloadBytes (x: EventStore.ClientAPI.RecordedEvent) = CosmosIngester.arrayBytes x.Data + CosmosIngester.arrayBytes x.Metadata let inline esPayloadBytes (x: EventStore.ClientAPI.ResolvedEvent) = esRecPayloadBytes x.Event + x.OriginalStreamId.Length * 2 type SliceStatsBuffer(?interval) = @@ -469,7 +255,7 @@ module EventStoreReader = lock recentCats <| fun () -> let mutable batchBytes = 0 for x in slice.Events do - let cat = Ingester.category x.OriginalStreamId + let cat = CosmosIngester.category x.OriginalStreamId let eventBytes = esPayloadBytes x match recentCats.TryGetValue cat with | true, (currCount, currSize) -> recentCats.[cat] <- (currCount + 1, currSize+eventBytes) @@ -547,20 +333,9 @@ module EventStoreReader = Log.Warning(e,"Could not establish max position") do! Async.Sleep 5000 return Option.get max } - let pullStream (conn : IEventStoreConnection, batchSize) stream (postBatch : Ingester.Batch -> unit) = - let rec fetchFrom pos = async { - let! currentSlice = conn.ReadStreamEventsBackwardAsync(stream, pos, batchSize, resolveLinkTos=true) |> Async.AwaitTaskCorrect - if currentSlice.IsEndOfStream then return () else - let events = - [| for x in currentSlice.Events -> - let e = x.Event - Equinox.Codec.Core.EventData.Create(e.EventType, e.Data, e.Metadata, e.Timestamp) :> Equinox.Codec.IEvent |] - postBatch { stream = stream; span = { index = currentSlice.FromEventNumber; events = events } } - return! fetchFrom currentSlice.NextEventNumber } - fetchFrom 0L type [] PullResult = Exn of exn: exn | Eof | EndOfTranche - type ReaderGroup(conn : IEventStoreConnection, enumEvents, postBatch : Ingester.Batch -> unit) = + type ReaderGroup(conn : IEventStoreConnection, enumEvents, postBatch : CosmosIngester.Batch -> unit) = member __.Pump(range : Range, batchSize, slicesStats : SliceStatsBuffer, overallStats : OverallStats, ?ignoreEmptyEof) = let sw = Stopwatch.StartNew() // we'll report the warmup/connect time on the first batch let rec loop () = async { @@ -596,9 +371,7 @@ module EventStoreReader = with e -> return Exn e } type [] Work = - | Stream of name: string * batchSize: int | Tranche of range: Range * batchSize : int - | Tail of pos: Position * interval: TimeSpan * batchSize : int type FeedQueue(batchSize, minBatchSize, max, ?statsInterval) = let work = ConcurrentQueue() member val OverallStats = OverallStats(?statsInterval=statsInterval) @@ -607,25 +380,11 @@ module EventStoreReader = work.Enqueue <| Work.Tranche (range, defaultArg batchSizeOverride batchSize) member __.AddTranche(pos, nextPos, ?batchSizeOverride) = __.AddTranche(Range (pos, Some nextPos, max), ?batchSizeOverride=batchSizeOverride) - member __.AddStream(name, ?batchSizeOverride) = - work.Enqueue <| Work.Stream (name, defaultArg batchSizeOverride batchSize) - member __.AddTail(pos, interval, ?batchSizeOverride) = - work.Enqueue <| Work.Tail (pos, interval, defaultArg batchSizeOverride batchSize) member __.TryDequeue () = work.TryDequeue() member __.Process(conn, enumEvents, postBatch, work) = async { let adjust batchSize = if batchSize > minBatchSize then batchSize - 128 else batchSize match work with - | Stream (name,batchSize) -> - use _ = Serilog.Context.LogContext.PushProperty("Tranche",name) - Log.Warning("Reading stream; batch size {bs}", batchSize) - try do! pullStream (conn, batchSize) name postBatch - Log.Warning("completed stream") - with e -> - let bs = adjust batchSize - Log.Warning(e,"Could not read stream, retrying with batch size {bs}", bs) - __.AddStream(name, bs) - return false | Tranche (range, batchSize) -> use _ = Serilog.Context.LogContext.PushProperty("Tranche",chunk range.Current) Log.Warning("Commencing tranche, batch size {bs}", batchSize) @@ -645,44 +404,10 @@ module EventStoreReader = Log.Warning(e, "Could not read All, retrying with batch size {bs}", bs) __.OverallStats.DumpIfIntervalExpired() __.AddTranche(range, bs) - return false - | Tail (pos, interval, batchSize) -> - let mutable first, count, batchSize, range = true, 0, batchSize, Range(pos,None, Position.Start) - let statsInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.) - let progressIntervalMs, tailIntervalMs = int64 statsInterval.TotalMilliseconds, int64 interval.TotalMilliseconds - let progressSw, tailSw = Stopwatch.StartNew(), Stopwatch.StartNew() - let reader = ReaderGroup(conn, enumEvents, postBatch) - let slicesStats, stats = SliceStatsBuffer(), OverallStats() - while true do - let currentPos = range.Current - use _ = Serilog.Context.LogContext.PushProperty("Tranche", "Tail") - if first then - first <- false - Log.Warning("Tailing at {interval}s interval", interval.TotalSeconds) - elif progressSw.ElapsedMilliseconds > progressIntervalMs then - Log.Warning("Performed {count} tails to date @ {pos} chunk {chunk}", count, currentPos.CommitPosition, chunk currentPos) - progressSw.Restart() - count <- count + 1 - let! res = reader.Pump(range,batchSize,slicesStats,stats,ignoreEmptyEof=true) - stats.DumpIfIntervalExpired() - match tailIntervalMs - tailSw.ElapsedMilliseconds with - | waitTimeMs when waitTimeMs > 0L -> do! Async.Sleep (int waitTimeMs) - | _ -> () - tailSw.Restart() - match res with - | PullResult.EndOfTranche | PullResult.Eof -> () - | PullResult.Exn e -> - batchSize <- adjust batchSize - Log.Warning(e, "Tail $all failed, adjusting batch size to {bs}", batchSize) - return true } + return false } - type Reader(conn : IEventStoreConnection, spec: ReaderSpec, enumEvents, postBatch : Ingester.Batch -> unit, max, ct : CancellationToken, ?statsInterval) = + type Reader(conn : IEventStoreConnection, spec: ReaderSpec, enumEvents, postBatch : CosmosIngester.Batch -> unit, max, ct : CancellationToken, ?statsInterval) = let work = FeedQueue(spec.batchSize, spec.minBatchSize, max, ?statsInterval=statsInterval) - do match spec.tailInterval with - | Some interval -> work.AddTail(max, interval) - | None -> () - for s in spec.streams do - work.AddStream s let mutable remainder = let startPos = match spec.start with @@ -690,18 +415,16 @@ module EventStoreReader = | Absolute p -> Position(p, 0L) | Chunk c -> posFromChunk c | Percentage pct -> posFromPercentage (pct, max) - | Ignore -> max Log.Warning("Start Position {pos} (chunk {chunk}, {pct:p1})", startPos.CommitPosition, chunk startPos, float startPos.CommitPosition/ float max.CommitPosition) - if spec.start = Ignore then None + if spec.start = StartPos.Start then None else let nextPos = posFromChunkAfter startPos work.AddTranche(startPos, nextPos) Some nextPos member __.Pump () = async { - (*if spec.tail then enqueue tail work*) - let maxDop = spec.stripes + Option.count spec.tailInterval + let maxDop = spec.stripes let dop = new SemaphoreSlim(maxDop) let mutable finished = false while not ct.IsCancellationRequested && not (finished && dop.CurrentCount <> maxDop) do @@ -748,15 +471,16 @@ let enumEvents (xs : EventStore.ClientAPI.ResolvedEvent[]) = seq { || e.EventStreamId.EndsWith("_checkpoints") || e.EventStreamId.EndsWith("_checkpoint") -> Choice2Of2 e - | e when eb > Ingester.cosmosPayloadLimit -> - Log.Error("ES Event Id {eventId} (#{index} in {stream}, type {type}) size {eventSize} exceeds Cosmos ingestion limit {maxCosmosBytes}", e.EventId, e.EventNumber, e.EventStreamId, e.EventType, eb, Ingester.cosmosPayloadLimit) + | e when eb > CosmosIngester.cosmosPayloadLimit -> + Log.Error("ES Event Id {eventId} (#{index} in {stream}, type {type}) size {eventSize} exceeds Cosmos ingestion limit {maxCosmosBytes}", + e.EventId, e.EventNumber, e.EventStreamId, e.EventType, eb, CosmosIngester.cosmosPayloadLimit) Choice2Of2 e | e -> Choice1Of2 (e.EventStreamId, e.EventNumber, Equinox.Codec.Core.EventData.Create(e.EventType, e.Data, e.Metadata, e.Timestamp)) } let run (ctx : Equinox.Cosmos.Core.CosmosContext) (source : GesConnection) (spec: ReaderSpec) (writerQueueLen, writerCount, readerQueueLen) = async { - let! ingester = Ingester.start(ctx, writerQueueLen, writerCount, readerQueueLen) - let! _feeder = EventStoreReader.start(source.ReadConnection, spec, enumEvents, ingester.Add) + let! coodinator = Coordinator.Start(Log.Logger, ctx, writerQueueLen, writerCount, readerQueueLen) + let! _ = EventStoreReader.start(source.ReadConnection, spec, enumEvents, coodinator.Add) do! Async.AwaitKeyboardInterrupt() } [] diff --git a/equinox-sync/Sync/CosmosIngester.fs b/equinox-sync/Sync/CosmosIngester.fs index 37808409d..4da331597 100644 --- a/equinox-sync/Sync/CosmosIngester.fs +++ b/equinox-sync/Sync/CosmosIngester.fs @@ -3,10 +3,11 @@ open Equinox.Cosmos.Core open Equinox.Cosmos.Store open Serilog +open System.Collections.Concurrent open System.Collections.Generic open System.Threading -let private arrayBytes (x:byte[]) = if x = null then 0 else x.Length +let arrayBytes (x:byte[]) = if x = null then 0 else x.Length let private mb x = float x / 1024. / 1024. let category (streamName : string) = streamName.Split([|'-'|],2).[0] @@ -57,7 +58,7 @@ module Writer = || m.Contains "SyntaxError: JSON.parse Error: Invalid character at position" -> MalformedMessage | _ -> Other -type [] StreamState = { write: int64 option; queue: Span[] } with +type [] StreamState = { isMalformed : bool; write: int64 option; queue: Span[] } with member __.TryGap() = if __.queue = null then None else @@ -65,7 +66,7 @@ type [] StreamState = { write: int64 option; queue: Span[] } with | Some w, Some { index = i } when i > w -> Some (w, i-w) | _ -> None member __.IsReady = - if __.queue = null then false + if __.queue = null || __.isMalformed then false else match __.write, Array.tryHead __.queue with | Some w, Some { index = i } -> i = w @@ -114,7 +115,7 @@ module StreamState = let combine (s1: StreamState) (s2: StreamState) : StreamState = let writePos = optionCombine max s1.write s2.write let items = let (NNA q1, NNA q2) = s1.queue, s2.queue in Seq.append q1 q2 - { write = writePos; queue = Span.merge (defaultArg writePos 0L) items} + { write = writePos; queue = Span.merge (defaultArg writePos 0L) items; isMalformed = s1.isMalformed || s2.isMalformed} /// Gathers stats relating to how many items of a given category have been observed type CatStats() = @@ -128,6 +129,17 @@ type CatStats() = member __.Clear() = cats.Clear() member __.StatsDescending = cats |> Seq.map (|KeyValue|) |> Seq.sortByDescending snd +module Queue = + let tryDequeue (x : System.Collections.Generic.Queue<'T>) = +#if NET461 + if x.Count = 0 then None + else x.Dequeue() |> Some +#else + match x.TryDequeue() with + | false, _ -> None + | true, res -> Some res +#endif + type ResultKind = TimedOut | RateLimited | Malformed | Ok type StreamStates() = @@ -152,26 +164,26 @@ type StreamStates() = //elif Option.isNone state.write then // Log.Information("None {s} {w} {sz}", stream, updated.write, updated.Size) stream, updated - let updateWritePos stream pos span = - update stream { write = pos; queue = span } + let updateWritePos stream pos isMalformed span = + update stream { write = pos; queue = span; isMalformed = isMalformed } - member __.Add(item: Batch) = updateWritePos item.stream None [|item.span|] + member __.Add(item: Batch, ?isMalformed) = updateWritePos item.stream None (defaultArg isMalformed false) [|item.span|] member __.TryGetStreamWritePos stream = match states.TryGetValue stream with true, value -> value.write | _ -> None member __.HandleWriteResult = function - | Writer.Result.Ok (stream, pos) -> updateWritePos stream (Some pos) null, Ok - | Writer.Result.Duplicate (stream, pos) -> updateWritePos stream (Some pos) null, Ok - | Writer.Result.PartialDuplicate overage -> updateWritePos overage.stream (Some overage.span.index) [|overage.span|], Ok + | Writer.Result.Ok (stream, pos) -> updateWritePos stream (Some pos) false null, Ok + | Writer.Result.Duplicate (stream, pos) -> updateWritePos stream (Some pos) false null, Ok + | Writer.Result.PartialDuplicate overage -> updateWritePos overage.stream (Some overage.span.index) false [|overage.span|], Ok | Writer.Result.PrefixMissing (overage,pos) -> markGap overage.stream - updateWritePos overage.stream (Some pos) [|overage.span|], Ok + updateWritePos overage.stream (Some pos) false [|overage.span|], Ok | Writer.Result.Exn (exn, batch) -> - let r = + let r, malformed = match exn with - | Writer.RateLimitedMessage -> RateLimited - | Writer.TimedOutMessage -> TimedOut - | Writer.MalformedMessage -> Malformed - | Writer.Other -> Ok - __.Add(batch), r + | Writer.RateLimitedMessage -> RateLimited, false + | Writer.TimedOutMessage -> TimedOut, false + | Writer.MalformedMessage -> Malformed, true + | Writer.Other -> Ok, false + __.Add(batch, malformed), r member __.TryGap() : (string*int64*int) option = let rec aux () = match gap |> Queue.tryDequeue with @@ -210,13 +222,16 @@ type StreamStates() = for x in blocked do markDirty x res member __.Dump(log : ILogger) = - let mutable synced, ready, waiting = 0, 0, 0 - let mutable readyB, waitingB = 0L, 0L + let mutable synced, ready, waiting, malformed = 0, 0, 0, 0 + let mutable readyB, waitingB, malformedB = 0L, 0L, 0L let waitCats, readyCats, readyStreams = CatStats(), CatStats(), CatStats() for KeyValue (stream,state) in states do match int64 state.Size with | 0L -> synced <- synced + 1 + | sz when state.isMalformed -> + malformed <- malformed + 1 + malformedB <- malformedB + sz | sz when state.IsReady -> readyCats.Ingest(category stream) readyStreams.Ingest(sprintf "%s@%A" stream state.write, int sz) @@ -226,18 +241,53 @@ type StreamStates() = waitCats.Ingest(category stream) waiting <- waiting + 1 waitingB <- waitingB + sz - log.Information("Streams Synced {synced} Dirty {dirty} Ready {ready}/{readyMb:n1}MB Awaiting prefix {waiting}/{waitingMb:n1}MB", - synced, dirty.Count, ready, mb readyB, waiting, mb waitingB) + log.Information("Streams Synced {synced} Dirty {dirty} Ready {ready}/{readyMb:n1}MB Awaiting prefix {waiting}/{waitingMb:n1}MB Malformed {malformed}/{malformedMb:n1}MB Synced {synced}", + synced, dirty.Count, ready, mb readyB, waiting, mb waitingB, malformed, mb malformedB) if waitCats.Any then log.Warning("Waiting {waitCats}", waitCats.StatsDescending) if readyCats.Any then log.Information("Ready {readyCats} {readyStreams}", readyCats.StatsDescending, readyStreams.StatsDescending) -type Writers(write, maxDop) = - let work = System.Collections.Concurrent.ConcurrentQueue() +type RefCounted<'T> = { mutable refCount: int; value: 'T } + +// via https://stackoverflow.com/a/31194647/11635 +type SemaphorePool(gen : unit -> SemaphoreSlim) = + let inners: Dictionary> = Dictionary() + + let getOrCreateSlot key = + lock inners <| fun () -> + match inners.TryGetValue key with + | true, inner -> + inner.refCount <- inner.refCount + 1 + inner.value + | false, _ -> + let value = gen () + inners.[key] <- { refCount = 1; value = value } + value + let slotReleaseGuard key : System.IDisposable = + { new System.IDisposable with + member __.Dispose() = + lock inners <| fun () -> + let item = inners.[key] + match item.refCount with + | 1 -> inners.Remove key |> ignore + | current -> item.refCount <- current - 1 } + + member __.ExecuteAsync(k,f) = async { + let x = getOrCreateSlot k + use _ = slotReleaseGuard k + return! f x } + + member __.Execute(k,f) = + let x = getOrCreateSlot k + use _l = slotReleaseGuard k + f x + + type Writers(write, maxDop, ?maxQueueLen) = + let work = ConcurrentQueue() let result = Event() - let locks = Infrastructure.SemaphorePool(fun () -> new SemaphoreSlim 1) + let locks = SemaphorePool(fun () -> new SemaphoreSlim 1) [] member __.Result = result.Publish member __.Enqueue item = work.Enqueue item - member __.HasCapacity = work.Count < maxDop + member __.HasCapacity = work.Count < maxDop && maxQueueLen |> Option.forall (fun max -> work.Count < max) member __.IsStreamBusy stream = let checkBusy (x : SemaphoreSlim) = x.CurrentCount = 0 locks.Execute(stream,checkBusy) diff --git a/equinox-sync/Sync/Infrastructure.fs b/equinox-sync/Sync/Infrastructure.fs index 5c1e5a55b..165e548b2 100644 --- a/equinox-sync/Sync/Infrastructure.fs +++ b/equinox-sync/Sync/Infrastructure.fs @@ -1,10 +1,10 @@ [] -module private SyncTemplate.Infrastructure +module private Infrastructure +open Equinox.Store // AwaitTaskCorrect open System open System.Threading open System.Threading.Tasks -open System.Collections.Generic #nowarn "21" // re AwaitKeyboardInterrupt #nowarn "40" // re AwaitKeyboardInterrupt @@ -19,22 +19,9 @@ type Async with and d : IDisposable = Console.CancelKeyPress.Subscribe callback in ()) -module Queue = - let tryDequeue (x : System.Collections.Generic.Queue<'T>) = -#if NET461 - if x.Count = 0 then None - else x.Dequeue() |> Some -#else - match x.TryDequeue() with - | false, _ -> None - | true, res -> Some res -#endif - -open Equinox.Store // AwaitTaskCorrect - type SemaphoreSlim with /// F# friendly semaphore await function - member semaphore.Await(?timeout : TimeSpan) = async { + member semaphore.Await(?timeout : System.TimeSpan) = async { let! ct = Async.CancellationToken let timeout = defaultArg timeout Timeout.InfiniteTimeSpan let task = semaphore.WaitAsync(timeout, ct) @@ -46,39 +33,4 @@ type SemaphoreSlim with let! _ = semaphore.Await() try return! workflow finally semaphore.Release() |> ignore - } - -type RefCounted<'T> = { mutable refCount: int; value: 'T } - -// via https://stackoverflow.com/a/31194647/11635 -type SemaphorePool(gen : unit -> SemaphoreSlim) = - let inners: Dictionary> = Dictionary() - - let getOrCreateSlot key = - lock inners <| fun () -> - match inners.TryGetValue key with - | true, inner -> - inner.refCount <- inner.refCount + 1 - inner.value - | false, _ -> - let value = gen () - inners.[key] <- { refCount = 1; value = value } - value - let slotReleaseGuard key : IDisposable = - { new System.IDisposable with - member __.Dispose() = - lock inners <| fun () -> - let item = inners.[key] - match item.refCount with - | 1 -> inners.Remove key |> ignore - | current -> item.refCount <- current - 1 } - - member __.ExecuteAsync(k,f) = async { - let x = getOrCreateSlot k - use _ = slotReleaseGuard k - return! f x } - - member __.Execute(k,f) = - let x = getOrCreateSlot k - use _l = slotReleaseGuard k - f x \ No newline at end of file + } \ No newline at end of file