diff --git a/equinox-sync/Sync/Infrastructure.fs b/equinox-sync/Sync/Infrastructure.fs
index d8c3fa2b3..588ae2ab8 100644
--- a/equinox-sync/Sync/Infrastructure.fs
+++ b/equinox-sync/Sync/Infrastructure.fs
@@ -1,6 +1,7 @@
 [<AutoOpen>]
 module private SyncTemplate.Infrastructure
 
+open Equinox.Store // AwaitTaskCorrect
 open System
 open System.Threading
 open System.Threading.Tasks
@@ -17,3 +18,23 @@ type Async with
             let rec callback _ = Task.Run(fun () -> if Interlocked.Increment isDisposed = 1 then d.Dispose() ; sc ()) |> ignore
             and d : IDisposable = Console.CancelKeyPress.Subscribe callback
             in ())
+
+type SemaphoreSlim with
+    /// F# friendly semaphore await function
+    member semaphore.Await(?timeout : TimeSpan) = async {
+        let! ct = Async.CancellationToken
+        let timeout = defaultArg timeout Timeout.InfiniteTimeSpan
+        let task = semaphore.WaitAsync(timeout, ct)
+        return! Async.AwaitTaskCorrect task
+    }
+
+module Queue =
+    let tryDequeue (x : System.Collections.Generic.Queue<'T>) =
+#if NET461
+        if x.Count = 0 then None
+        else x.Dequeue() |> Some
+#else
+        match x.TryDequeue() with
+        | false, _ -> None
+        | true, res -> Some res
+#endif
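Aside, for illustration only (not part of the patch): a minimal sketch of how the two helpers above are meant to be consumed; the function name and values are made up, and it assumes the AutoOpen'd Infrastructure module is in scope.

    open System
    open System.Threading
    open System.Collections.Generic

    // throttle with the F#-friendly SemaphoreSlim.Await, drain a Queue with Queue.tryDequeue
    let consumeSketch () = async {
        let dop = new SemaphoreSlim(2)                       // cap concurrency at 2
        let pending = Queue<int>([ 1; 2; 3 ])
        let! _acquired = dop.Await(TimeSpan.FromSeconds 5.)  // Async<bool>: false if the wait timed out
        try
            match pending |> Queue.tryDequeue with           // None rather than an exception when empty
            | Some item -> printfn "dequeued %d" item
            | None -> printfn "nothing pending"
        finally dop.Release() |> ignore }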
diff --git a/equinox-sync/Sync/Program.fs b/equinox-sync/Sync/Program.fs
index 000bb7f3f..c3a31d7ea 100644
--- a/equinox-sync/Sync/Program.fs
+++ b/equinox-sync/Sync/Program.fs
@@ -6,7 +6,8 @@ open Serilog
 open System
 
 type StartPos = Absolute of int64 | Percentage of float | Start
-type ReaderSpec = { start: StartPos; batchSize: int; streams: string list }
+type ReaderSpec = { start: StartPos; stripes: int; batchSize: int; streams: string list }
+let mb x = float x / 1024. / 1024.
 
 module CmdParser =
     open Argu
@@ -100,12 +101,10 @@
                         concurrentOperationsLimit=col, log=log, tags=["M", Environment.MachineName; "I", Guid.NewGuid() |> string])
                     .Establish("ProjectorTemplate", discovery, connection)
             member val Cosmos = Cosmos.Info(args.GetResult Cosmos)
-            member __.CreateGateway conn = GesGateway(conn, GesBatchingPolicy(maxBatchSize = args.GetResult(MaxItems,4096)))
             member __.Host = match args.TryGetResult Host with Some x -> x | None -> envBackstop "Host" "EQUINOX_ES_HOST"
             member __.Port = match args.TryGetResult Port with Some x -> Some x | None -> Environment.GetEnvironmentVariable "EQUINOX_ES_PORT" |> Option.ofObj |> Option.map int
             member __.User = match args.TryGetResult Username with Some x -> x | None -> envBackstop "Username" "EQUINOX_ES_USERNAME"
             member __.Password = match args.TryGetResult Password with Some x -> x | None -> envBackstop "Password" "EQUINOX_ES_PASSWORD"
-            member val CacheStrategy = let c = Caching.Cache("ProjectorTemplate", sizeMb = 50) in CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.)
             member __.Connect(log: ILogger, storeLog, connection) =
                 let (timeout, retries) as operationThrottling = args.GetResult(Timeout,20.) |> TimeSpan.FromSeconds, args.GetResult(Retries,3)
                 let heartbeatTimeout = args.GetResult(HeartbeatTimeout,1.5) |> TimeSpan.FromSeconds
@@ -124,6 +123,8 @@
        | [] Stream of string
        | [] Offset of int64
        | [] Percent of float
+       | [] Stripes of int
+       | [] Tail of intervalMs: int
        | [] Es of ParseResults<EventStore.Arguments>
        interface IArgParserTemplate with
            member a.Usage =
@@ -135,6 +136,8 @@
                | Stream _ -> "specific stream(s) to read"
                | Offset _ -> "EventStore $all Stream Position to commence from"
                | Percent _ -> "EventStore $all Stream Position to commence from (as a percentage of current tail position)"
+               | Stripes _ -> "number of concurrent readers"
+               | Tail _ -> "attempt to read from tail at specified interval in milliseconds"
                | Es _ -> "specify EventStore parameters"
    and Parameters(args : ParseResults<Arguments>) =
        member val EventStore = EventStore.Info(args.GetResult Es)
@@ -142,14 +145,20 @@
        member __.ConsoleMinLevel = if args.Contains VerboseConsole then LogEventLevel.Information else LogEventLevel.Warning
        member __.MaybeSeqEndpoint = if args.Contains LocalSeq then Some "http://localhost:5341" else None
        member __.BatchSize = args.GetResult(BatchSize,4096)
+       member __.Stripes = args.GetResult(Stripes,1)
+       member __.TailInterval = match args.TryGetResult Tail with Some s -> s |> float |> TimeSpan.FromMilliseconds |> Some | None -> None
        member x.BuildFeedParams() : ReaderSpec =
-           Log.Information("Processing in batches of {batchSize}", x.BatchSize)
+           Log.Warning("Processing in batches of {batchSize}", x.BatchSize)
+           Log.Warning("Reading with {stripes} stripes", x.Stripes)
+           match x.TailInterval with
+           | Some interval -> Log.Warning("Following tail at {seconds}s interval", interval.TotalSeconds)
+           | None -> Log.Warning "Not following tail"
            let startPos =
                match args.TryGetResult Offset, args.TryGetResult Percent with
                | Some p, _ -> Log.Warning("Processing will commence at $all Position {p}", p); Absolute p
                | _, Some p -> Log.Warning("Processing will commence at $all Percentage {pct:P}", p/100.); Percentage p
                | None, None -> Log.Warning "Processing will commence at $all Start"; Start
-           { start = startPos; batchSize = x.BatchSize; streams = args.GetResults Stream }
+           { start = startPos; stripes = x.Stripes; batchSize = x.BatchSize; streams = args.GetResults Stream }
 
    /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args
    let parse argv : Parameters =
@@ -166,7 +175,8 @@ module Logging =
            .Destructure.FSharpTypes()
            .Enrich.FromLogContext()
        |> fun c -> if verbose then c.MinimumLevel.Debug() else c
-       |> fun c -> c.WriteTo.Console(consoleMinLevel, theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code)
+       |> fun c -> let t = "[{Timestamp:HH:mm:ss} {Level:u3}] {Tranche} {Message:lj} {Properties} {NewLine}{Exception}"
+                   c.WriteTo.Console(consoleMinLevel, theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t)
        |> fun c -> match maybeSeqEndpoint with None -> c | Some endpoint -> c.WriteTo.Seq(endpoint)
        |> fun c -> c.CreateLogger()
@@ -322,14 +332,9 @@ module Ingester =
            __.Add(batch,malformed)
            if malformed then Some (category batch.stream) else None
        member __.TryPending() =
-#if NET461
-           if dirty.Count = 0 then None else
-           let stream = dirty.Dequeue()
-#else
-           match dirty.TryDequeue() with
-           | false, _ -> None
-           | true, stream ->
-#endif
+           match dirty |> Queue.tryDequeue with
+           | None -> None
+           | Some stream ->
            let state = states.[stream]
            if not state.IsReady then None else
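Aside, for illustration only (not part of the patch): the shape of the ReaderSpec the new Stripes/Tail arguments above feed into. The concrete values are made up, and the actual Argu flag spellings come from attributes not visible in this hunk.

    // e.g. start at 98.5% of the current $all write position, 4 concurrent stripes, default batch size
    let exampleSpec : ReaderSpec =
        { start = Percentage 98.5; stripes = 4; batchSize = 4096; streams = [] }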
@@ -354,8 +359,7 @@
                | sz when state.isMalformed -> malformed <- malformed + 1; malformedB <- malformedB + sz
                | sz when state.IsReady -> ready <- ready + 1; readyB <- readyB + sz
                | sz -> waitCats.Ingest(category stream); waiting <- waiting + 1; waitingB <- waitingB + sz
-           let mb x = x / 1024L / 1024L
-           Log.Warning("Syncing {dirty} Ready {ready}/{readyMb}MB Waiting {waiting}/{waitingMb}MB Malformed {malformed}/{malformedMb}MB Synced {synced}",
+           Log.Warning("Syncing {dirty} Ready {ready}/{readyMb:n1}MB Waiting {waiting}/{waitingMb:n1}MB Malformed {malformed}/{malformedMb:n1}MB Synced {synced}",
                dirty.Count, ready, mb readyB, waiting, mb waitingB, malformed, mb malformedB, synced)
            if waitCats.Any then Log.Warning("Waiting {waitCats}", waitCats.StatsDescending)
@@ -418,7 +422,7 @@
                if progressTimer.ElapsedMilliseconds > intervalMs then
                    progressTimer.Restart()
                    Log.Warning("Ingested {ingestions}; Sent {queued} req {events} events; Completed {completed} reqs; Egress {gb:n3}GB",
-                       !ingestionsHandled, !workPended, !eventsPended,!resultsHandled, float bytesPended / 1024. / 1024. / 1024.)
+                       !ingestionsHandled, !workPended, !eventsPended,!resultsHandled, mb bytesPended / 1024.)
                    if badCats.Any then Log.Error("Malformed {badCats}", badCats.StatsDescending); badCats.Clear()
                    ingestionsHandled := 0; workPended := 0; eventsPended := 0; resultsHandled := 0
                    states.Dump()
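Aside, for illustration only (not part of the patch): why the shared float-based mb helper replaces the local integer one — a worked example with an arbitrary byte count.

    let mb x = float x / 1024. / 1024.
    let bytes = 1_572_864L              // 1.5 MiB
    let oldMb = bytes / 1024L / 1024L   // 1L: integer division truncates the fraction
    let newMb = mb bytes                // 1.5: matches the :n1 format hints in the Syncing log line
    let egressGb = mb bytes / 1024.     // the same helper reused for the Egress {gb:n3}GB figure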
@@ -434,50 +438,10 @@ module Ingester =
         return queue }
 
-open EventStore.ClientAPI
-
-module Reader =
-
-    let loadSpecificStreams (conn:IEventStoreConnection, batchSize) streams (postBatch : (Ingester.Batch -> unit)) =
-        let fetchStream stream =
-            let rec fetchFrom pos = async {
-                let! currentSlice = conn.ReadStreamEventsBackwardAsync(stream, pos, batchSize, resolveLinkTos=true) |> Async.AwaitTaskCorrect
-                if currentSlice.IsEndOfStream then return () else
-                let events =
-                    [| for x in currentSlice.Events ->
-                        let e = x.Event
-                        Equinox.Codec.Core.EventData.Create (e.EventType, e.Data, e.Metadata) :> Equinox.Codec.IEvent |]
-                postBatch { stream = stream; span = { pos = currentSlice.FromEventNumber; events = events } }
-                return! fetchFrom currentSlice.NextEventNumber }
-            fetchFrom 0L
-        async {
-            for stream in streams do
-                do! fetchStream stream }
-
-open Equinox.EventStore
-open Equinox.Cosmos
-
-let enumEvents (slice : AllEventsSlice) = seq {
-    for e in slice.Events ->
-        let eb = Ingester.esPayloadBytes e
-        match e.Event with
-        | e when not e.IsJson
-            || e.EventType.StartsWith("compacted",StringComparison.OrdinalIgnoreCase)
-            || e.EventStreamId.StartsWith("$")
-            || e.EventStreamId.EndsWith("_checkpoints")
-            || e.EventStreamId.EndsWith("_checkpoint")
-            || e.EventStreamId = "thor_useast2_to_backup_qa2_main" ->
-            Choice2Of2 e
-        | e when eb > Ingester.cosmosPayloadLimit ->
-            Log.Error("ES Event Id {eventId} size {eventSize} exceeds Cosmos ingestion limit {maxCosmosBytes}", e.EventId, eb, Ingester.cosmosPayloadLimit)
-            Choice2Of2 e
-        | e -> Choice1Of2 (e.EventStreamId, e.EventNumber, Equinox.Codec.Core.EventData.Create(e.EventType, e.Data, e.Metadata))
-}
-
 type SliceStatsBuffer(?interval) =
     let intervalMs = let t = defaultArg interval (TimeSpan.FromMinutes 1.) in t.TotalMilliseconds |> int64
     let recentCats, accStart = System.Collections.Generic.Dictionary(), Stopwatch.StartNew()
-    member __.Ingest(slice: AllEventsSlice) =
+    member __.Ingest(slice: EventStore.ClientAPI.AllEventsSlice) =
         let mutable batchBytes = 0
         for x in slice.Events do
             let cat = Ingester.category x.OriginalStreamId
@@ -486,7 +450,7 @@ type SliceStatsBuffer(?interval) =
            | true, (currCount, currSize) -> recentCats.[cat] <- (currCount + 1, currSize+eventBytes)
            | false, _ -> recentCats.[cat] <- (1, eventBytes)
            batchBytes <- batchBytes + eventBytes
-        slice.Events.Length, batchBytes
+        slice.Events.Length, int64 batchBytes
     member __.DumpIfIntervalExpired() =
        if accStart.ElapsedMilliseconds > intervalMs then
            let log = function
@@ -505,77 +469,215 @@ type OverallStats() =
     let mutable totalEvents, totalBytes = 0L, 0L
     member __.Ingest(batchEvents, batchBytes) =
-        totalEvents <- totalEvents + int64 batchEvents
-        totalBytes <- totalBytes + int64 batchBytes
+        totalEvents <- totalEvents + batchEvents
+        totalBytes <- totalBytes + batchBytes
     member __.Bytes = totalBytes
     member __.Events = totalEvents
 
-let run (destination : CosmosConnection, colls) (source : GesConnection) (spec: ReaderSpec) (writerQueueLen, writerCount, readerQueueLen) = async {
-    let fetchMax = async {
-        let! lastItemBatch = source.ReadConnection.ReadAllEventsBackwardAsync(Position.End, 1, resolveLinkTos = false) |> Async.AwaitTaskCorrect
-        let max = lastItemBatch.NextPosition.CommitPosition
-        Log.Warning("EventStore Write Position @ {pos}", max)
+type Range(start, sliceEnd : EventStore.ClientAPI.Position option, max : EventStore.ClientAPI.Position) =
+    member val Current = start with get, set
+    member __.TryNext(pos: EventStore.ClientAPI.Position) =
+        __.Current <- pos
+        __.IsCompleted
+    member __.IsCompleted =
+        match sliceEnd with
+        | Some send when __.Current.CommitPosition >= send.CommitPosition -> false
+        | _ -> true
+    member __.PositionAsRangePercentage = float __.Current.CommitPosition/float max.CommitPosition
+
+module EventStoreReader =
+    open EventStore.ClientAPI
+
+    let posFromPercentage (pct,max : Position) =
+        let rawPos = float max.CommitPosition * pct / 100. |> int64
+        // @scarvel8: event_global_position = 256 x 1024 x 1024 x chunk_number + chunk_header_size (128) + event_position_offset_in_chunk
+        let chunkBase = rawPos &&& 0xFFFFFFFFE0000000L // rawPos / 256L / 1024L / 1024L * 1024L * 1024L * 256L
+        Position(chunkBase,0L)
+    let posFromChunkAfter (pos: Position) =
+        let chunkBase = pos.CommitPosition &&& 0xFFFFFFFFF0000000L // rawPos / 256L / 1024L / 1024L * 1024L * 1024L * 256L
+        let nextBase = chunkBase + 256L * 1024L * 1024L
+        Position(nextBase,0L)
+    let chunk (pos: Position) =
+        uint64 pos.CommitPosition >>> 28
+
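Aside, for illustration only (not part of the patch): a worked example of the chunk arithmetic above. Per the @scarvel8 note, EventStore lays $all out in 256MB chunks, so masking off the low bits of a commit position yields the containing chunk's base; the sample position is arbitrary.

    let chunkSize = 256L * 1024L * 1024L            // 268_435_456
    let pos = 3_000_000_000L                        // some $all commit position
    let chunkNumber = uint64 pos >>> 28             // 11UL, i.e. pos / chunkSize
    let chunkBase = pos &&& 0xFFFFFFFFF0000000L     // 2_952_790_016L = 11L * chunkSize
    let nextTrancheStart = chunkBase + chunkSize    // what posFromChunkAfter computes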
+    let fetchMax (conn : IEventStoreConnection) = async {
+        let! lastItemBatch = conn.ReadAllEventsBackwardAsync(Position.End, 1, resolveLinkTos = false) |> Async.AwaitTaskCorrect
+        let max = lastItemBatch.NextPosition
+        Log.Warning("EventStore {chunks} chunks Write Position @ {pos} ", chunk max, max.CommitPosition)
         return max }
-    let followAll (postBatch : Ingester.Batch -> unit) = async {
-        let mutable currentPos =
-            match spec.start with
-            | Absolute p -> EventStore.ClientAPI.Position(p,0L)
-            | Percentage _ -> Position.End // placeholder, will be overwritten below
-            | Start -> Position.Start
-        let overall, slicesStats = OverallStats(), SliceStatsBuffer()
-        let run max = async {
-            let sw = Stopwatch.StartNew() // we'll report the warmup/connect time on the first batch
-            let rec loop () = async {
-                let! currentSlice = source.ReadConnection.ReadAllEventsForwardAsync(currentPos, spec.batchSize, resolveLinkTos = false) |> Async.AwaitTaskCorrect
-                sw.Stop() // Stop the clock after ChangeFeedProcessor hands off to us
-                let batchEvents, batchBytes = slicesStats.Ingest currentSlice in overall.Ingest(batchEvents, batchBytes)
-                slicesStats.DumpIfIntervalExpired()
-                let streams =
-                    enumEvents currentSlice
-                    |> Seq.choose (function Choice1Of2 e -> Some e | Choice2Of2 _ -> None)
-                    |> Seq.groupBy (fun (streamId,_eventNumber,_eventData) -> streamId)
-                    |> Seq.map (fun (streamId,xs) -> streamId, [| for _s, i, e in xs -> i, e |])
-                    |> Array.ofSeq
-                let usedCats = streams |> Seq.map fst |> Seq.distinct |> Seq.length
-
-                let postSw = Stopwatch.StartNew()
-                let usedEvents = ref 0
-                for stream,streamEvents in streams do
-                    for pos, item in streamEvents do
-                        incr usedEvents
-                        postBatch { stream = stream; span = { pos = pos; events = [| item |]}}
-                let currentOffset = currentSlice.NextPosition.CommitPosition
-                Log.Warning("Read {count} {ft:n3}s {mb:n1}MB {gb:n3}GB Process c {categories,2} s {streams,4} e {events,4} {pt:n0}ms Pos @ {pos} {pct:p1}",
-                    batchEvents, (let e = sw.Elapsed in e.TotalSeconds), float batchBytes / 1024. / 1024., float overall.Bytes / 1024. / 1024. / 1024.,
-                    usedCats, streams.Length, !usedEvents, postSw.ElapsedMilliseconds,
-                    currentOffset, float currentOffset/float max)
-                if currentSlice.IsEndOfStream then Log.Warning("Completed {total:n0}", overall.Events)
-                sw.Restart() // restart the clock as we hand off back to the Reader
-                if not currentSlice.IsEndOfStream then
-                    currentPos <- currentSlice.NextPosition
-                    return! loop () }
-            do! loop () }
-        let mutable finished = false
+    let establishMax (conn : IEventStoreConnection) = async {
         let mutable max = None
-        while not finished do
-            try if max = None then
-                    let! currMax = fetchMax
-                    max <- Some currMax
-                    match spec.start with
-                    | Percentage pct ->
-                        let rawPos = float currMax * pct / 100. |> int64
-                        // @scarvel8: event_global_position = 256 x 1024 x 1024 x chunk_number + chunk_header_size (128) + event_position_offset_in_chunk
-                        let chunkBase = rawPos &&& 0xFFFFFFFFE0000000L // rawPos / 256L / 1024L / 1024L * 1024L * 1024L * 256L ;)
-                        Log.Warning("Effective Start Position {pos} (chunk {chunk})", chunkBase, chunkBase >>> 29)
-                        currentPos <- EventStore.ClientAPI.Position(chunkBase,0L)
-                    | _ -> ()
-                do! run (Option.get max)
-                finished <- true
-            with e -> Log.Warning(e,"Ingestion error") }
+        while Option.isNone max do
+            try let! max_ = fetchMax conn
+                max <- Some max_
+            with e ->
+                Log.Warning(e,"Could not establish max position")
+                do! Async.Sleep 5000
+        return Option.get max }
+    let pullStream (conn : IEventStoreConnection, batchSize) stream (postBatch : Ingester.Batch -> unit) =
+        let rec fetchFrom pos = async {
+            let! currentSlice = conn.ReadStreamEventsBackwardAsync(stream, pos, batchSize, resolveLinkTos=true) |> Async.AwaitTaskCorrect
+            if currentSlice.IsEndOfStream then return () else
+            let events =
+                [| for x in currentSlice.Events ->
+                    let e = x.Event
+                    Equinox.Codec.Core.EventData.Create (e.EventType, e.Data, e.Metadata) :> Equinox.Codec.IEvent |]
+            postBatch { stream = stream; span = { pos = currentSlice.FromEventNumber; events = events } }
+            return! fetchFrom currentSlice.NextEventNumber }
+        fetchFrom 0L
+
+    type [<RequireQualifiedAccess>] PullResult = Exn of exn: exn | Eof | EndOfTranche
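Aside, for illustration only (not part of the patch): how a Range behaves as pullSourceRange (below) drives it; the positions are made up. Note that IsCompleted as written returns true while the tranche end has not yet been passed, which is how pullSourceRange's shouldLoop consumes it.

    open EventStore.ClientAPI
    let max = Position(1_000_000_000L, 0L)                      // current write position
    let range = Range(Position.Start, Some (Position(268_435_456L, 0L)), max)
    let keepGoing = range.TryNext(Position(100_000_000L, 0L))   // true: still short of the 256MB tranche end
    let pct = range.PositionAsRangePercentage                   // 0.1, logged as {pct:p1} = 10.0%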
+    let pullSourceRange (conn : IEventStoreConnection, batchSize) (range : Range) enumEvents (postBatch : Ingester.Batch -> unit) =
+        let stats, slicesStats = OverallStats(), SliceStatsBuffer()
+        let sw = Stopwatch.StartNew() // we'll report the warmup/connect time on the first batch
+        let rec loop () = async {
+            let! currentSlice = conn.ReadAllEventsForwardAsync(range.Current, batchSize, resolveLinkTos = false) |> Async.AwaitTaskCorrect
+            sw.Stop() // Stop the clock after ChangeFeedProcessor hands off to us
+            let postSw = Stopwatch.StartNew()
+            let batchEvents, batchBytes = slicesStats.Ingest currentSlice in stats.Ingest(int64 batchEvents, batchBytes)
+            slicesStats.DumpIfIntervalExpired()
+            let streams =
+                enumEvents currentSlice.Events
+                |> Seq.choose (function Choice1Of2 e -> Some e | Choice2Of2 _ -> None)
+                |> Seq.groupBy (fun (streamId,_eventNumber,_eventData) -> streamId)
+                |> Seq.map (fun (streamId,xs) -> streamId, [| for _s, i, e in xs -> i, e |])
+                |> Array.ofSeq
+            let usedStreams, usedCats = streams.Length, streams |> Seq.map fst |> Seq.distinct |> Seq.length
+            let mutable usedEvents = 0
+            for stream,streamEvents in streams do
+                for pos, item in streamEvents do
+                    usedEvents <- usedEvents + 1
+                    postBatch { stream = stream; span = { pos = pos; events = [| item |]}}
+            let shouldLoop = range.TryNext currentSlice.NextPosition
+            Log.Warning("Read {count} {ft:n3}s {mb:n1}MB Process c {categories,2} s {streams,4} e {events,4} {pt:n0}ms Pos @ {pos} {pct:p1}",
+                batchEvents, (let e = sw.Elapsed in e.TotalSeconds), mb batchBytes,
+                usedCats, usedStreams, usedEvents, postSw.ElapsedMilliseconds,
+                currentSlice.NextPosition.CommitPosition, range.PositionAsRangePercentage)
+            if shouldLoop && not currentSlice.IsEndOfStream then
+                sw.Restart() // restart the clock as we hand off back to the Reader
+                return! loop ()
+            else
+                return currentSlice.IsEndOfStream }
+        async {
+            try let! eof = loop ()
+                return (if eof then Eof else EndOfTranche), range, stats
+            with e -> return Exn e, range, stats }
+
+    type [<NoComparison>] Work =
+        | Stream of name: string * batchSize: int
+        | Tranche of range: Range * batchSize : int
+    type FeedQueue(batchSize, max) =
+        let work = ConcurrentQueue()
+        let add item =
+            work.Enqueue item
+            Log.Warning("Added {item}; count: {count}", item, work.Count)
+        member val OverallStats = OverallStats() with get
+        member __.AddTranche(range, ?batchSizeOverride) =
+            add <| Work.Tranche (range, defaultArg batchSizeOverride batchSize)
+        member __.AddTranche(pos, nextPos, ?batchSizeOverride) =
+            __.AddTranche(Range (pos, Some nextPos, max), ?batchSizeOverride=batchSizeOverride)
+        member __.AddStream(name, ?batchSizeOverride) =
+            add <| Work.Stream (name, defaultArg batchSizeOverride batchSize)
+        member __.TryDequeue () =
+            work.TryDequeue()
+        member __.Process(conn, enumEvents, postBatch, work) = async {
+            let adjust batchSize = if batchSize > 128 then batchSize / 2 else batchSize
+            match work with
+            | Stream (name,batchSize) ->
+                use _ = Serilog.Context.LogContext.PushProperty("Stream",name)
+                Log.Warning("Reading stream; batch size {bs}", batchSize)
+                try do! pullStream (conn, batchSize) name postBatch
+                with e ->
+                    let bs = adjust batchSize
+                    Log.Warning(e,"Could not read stream, retrying with batch size {bs}", bs)
+                    __.AddStream(name, bs)
+            | Tranche (range, batchSize) ->
+                use _ = Serilog.Context.LogContext.PushProperty("Tranche",chunk range.Current)
+                Log.Warning("Reading chunk; batch size {bs}", batchSize)
+                let! eofOption, range, stats = pullSourceRange (conn, batchSize) range enumEvents postBatch
+                lock __.OverallStats <| fun () -> __.OverallStats.Ingest(stats.Events, stats.Bytes)
+                match eofOption with
+                | PullResult.EndOfTranche -> Log.Warning("Completed tranche")
+                | PullResult.Eof -> Log.Warning("REACHED THE END!")
+                | PullResult.Exn e ->
+                    let bs = adjust batchSize
+                    Log.Warning(e, "Could not read All, retrying with batch size {bs}", bs)
+                    __.AddTranche(range, bs)
+        }
+
+    type Reader(conn : IEventStoreConnection, spec: ReaderSpec, enumEvents, postBatch : Ingester.Batch -> unit, max, ct : CancellationToken) =
+        let work = FeedQueue(spec.batchSize, max)
+        do  for s in spec.streams do work.AddStream(s)
+        let mutable remainder =
+            let startPos =
+                match spec.start with
+                | StartPos.Start -> Position.Start
+                | Absolute p -> Position(p, 0L)
+                | Percentage pct ->
+                    let startPos = posFromPercentage (pct, max)
+                    Log.Warning("Effective Start Position {tranche} (chunk {chunk})", startPos.CommitPosition, chunk startPos)
+                    startPos
+            let nextPos = posFromChunkAfter startPos
+            work.AddTranche(startPos, nextPos)
+            Some nextPos
+
+        member __.Pump () = async {
+            (*if spec.tail then enqueue tail work*)
+            let maxDop = spec.stripes
+            let dop = new SemaphoreSlim(maxDop)
+            let mutable finished = false
+            while not ct.IsCancellationRequested && not (finished && dop.CurrentCount <> maxDop) do
+                let! _ = dop.Await()
+                let forkRunRelease task = async {
+                    let! _ = Async.StartChild <| async {
+                        do! work.Process(conn, enumEvents, postBatch, task)
+                        dop.Release() |> ignore }
+                    return () }
+                match work.TryDequeue() with
+                | true, task ->
+                    do! forkRunRelease task
+                | false, _ ->
+                    match remainder with
+                    | None ->
+                        finished <- true
+                        Log.Warning("Processing completed")
+                    | Some pos ->
+                        let nextPos = posFromChunkAfter pos
+                        remainder <- Some nextPos
+                        do! forkRunRelease <| Work.Tranche (Range(pos, Some nextPos, max), spec.batchSize) }
+
+    let start (conn, spec, enumEvents, postBatch) = async {
+        let! ct = Async.CancellationToken
+        let! max = establishMax conn
+        let reader = Reader(conn, spec, enumEvents, postBatch, max, ct)
+        let! _ = Async.StartChild <| reader.Pump()
+        return ()
+    }
+
+open Equinox.Cosmos
+open Equinox.EventStore
+
+let enumEvents (xs : EventStore.ClientAPI.ResolvedEvent[]) = seq {
+    for e in xs ->
+        let eb = Ingester.esPayloadBytes e
+        match e.Event with
+        | e when not e.IsJson
+            || e.EventType.StartsWith("compacted",StringComparison.OrdinalIgnoreCase)
+            || e.EventStreamId.StartsWith("$")
+            || e.EventStreamId.EndsWith("_checkpoints")
+            || e.EventStreamId.EndsWith("_checkpoint")
+            || e.EventStreamId = "thor_useast2_to_backup_qa2_main" ->
+            Choice2Of2 e
+        | e when eb > Ingester.cosmosPayloadLimit ->
+            Log.Error("ES Event Id {eventId} size {eventSize} exceeds Cosmos ingestion limit {maxCosmosBytes}", e.EventId, eb, Ingester.cosmosPayloadLimit)
+            Choice2Of2 e
+        | e -> Choice1Of2 (e.EventStreamId, e.EventNumber, Equinox.Codec.Core.EventData.Create(e.EventType, e.Data, e.Metadata))
+}
+
+let run (destination : CosmosConnection, colls) (source : GesConnection) (spec: ReaderSpec) (writerQueueLen, writerCount, readerQueueLen) = async {
     let ctx = Equinox.Cosmos.Core.CosmosContext(destination, colls, Log.Logger)
-    let! ingester = Ingester.start(ctx, writerQueueLen, writerCount, readerQueueLen)
-    let! _ = Async.StartChild (Reader.loadSpecificStreams (source.ReadConnection, spec.batchSize) spec.streams ingester.Add)
-    let! _ = Async.StartChild (followAll ingester.Add)
+    let! ingester = Ingester.start(ctx, writerQueueLen, writerCount, readerQueueLen)
+    let! _feeder = EventStoreReader.start(source.ReadConnection, spec, enumEvents, ingester.Add)
     do! Async.AwaitKeyboardInterrupt() }
 
 [<EntryPoint>]
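Aside, for illustration only (not part of the patch): a self-contained sketch of the fan-out pattern Reader.Pump uses — a SemaphoreSlim caps the degree of parallelism (one slot per stripe) while Async.StartChild forks each work item and releases the slot when it completes. The sleep stands in for FeedQueue.Process and the names are made up.

    open System.Threading

    let pumpSketch (workItems : int list) maxDop = async {
        let dop = new SemaphoreSlim(maxDop)
        for _item in workItems do
            do! dop.WaitAsync() |> Async.AwaitTask            // acquire a stripe
            let! _child = Async.StartChild <| async {
                try do! Async.Sleep 100                       // stand-in for FeedQueue.Process
                finally dop.Release() |> ignore }
            ()
        while dop.CurrentCount <> maxDop do                   // crude drain before returning
            do! Async.Sleep 50 }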