From 464eef6467763a3aa628a3c8f3ae396475f76864 Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Fri, 12 Apr 2019 09:30:38 +0100
Subject: [PATCH] Target Equinox 2.0.0-preview5

---
 CHANGELOG.md                                 |  4 +-
 equinox-projector/Consumer/Consumer.fsproj   |  4 +-
 equinox-projector/Projector/Program.fs       | 56 ++++++++++----------
 equinox-projector/Projector/Projector.fsproj |  4 +-
 equinox-sync/Sync/Program.fs                 |  4 +-
 equinox-sync/Sync/Sync.fsproj                |  2 +-
 equinox-testbed/Testbed/Testbed.fsproj       |  8 +--
 equinox-web-csharp/Domain/Domain.csproj      |  2 +-
 equinox-web-csharp/Web/Web.csproj            |  8 +--
 equinox-web/Domain/Domain.fsproj             |  2 +-
 equinox-web/Web/Web.fsproj                   |  8 +--
 11 files changed, 50 insertions(+), 52 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 15ba1e76f..2bbe84e6b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,11 +14,11 @@ The `Unreleased` section name is replaced by the expected version of next release
 - `dotnet new eqxetl` is now `dotnet new eqxsync`
 - `dotnet new eqxsync` now supports command-line category white/blacklist [#18](https://github.com/jet/dotnet-templates/pull/18)
 - `dotnet new eqxsync` now supports command-line selection of an `aux` collection in either the `source` or destination collections [#18](https://github.com/jet/dotnet-templates/pull/18)
-- Targets `Equinox`.* v `2.0.0-preview4`
+- Targets `Equinox`.* v `2.0.0-preview5`
 - `dotnet new eqxprojector` now uses `Jet.ConfluentKafka.FSharp 1.0.0-rc2` (which uses `Confluent.Kafka 1.0.0-RC3`, `librdkafka 1.0.0`)

 ### Removed
 ### Fixed

 ## [2.0.0] - 2019-03-26

diff --git a/equinox-projector/Consumer/Consumer.fsproj b/equinox-projector/Consumer/Consumer.fsproj
index f3fe90179..eaf6ae431 100644
--- a/equinox-projector/Consumer/Consumer.fsproj
+++ b/equinox-projector/Consumer/Consumer.fsproj
@@ -14,8 +14,8 @@

-
-
+
+

diff --git a/equinox-projector/Projector/Program.fs b/equinox-projector/Projector/Program.fs
index 24fa4d355..add139bf7 100644
--- a/equinox-projector/Projector/Program.fs
+++ b/equinox-projector/Projector/Program.fs
@@ -130,19 +130,6 @@ module CmdParser =
        let parser = ArgumentParser.Create(programName = programName)
        parser.ParseCommandLine argv |> Arguments

// Illustrates how to emit direct to the Console using Serilog
// Other topographies can be achieved by using various adapters and bridges, e.g., SerilogTarget or Serilog.Sinks.NLog
module Logging =
    let initialize verbose changeLogVerbose =
        Log.Logger <-
            LoggerConfiguration().Destructure.FSharpTypes().Enrich.FromLogContext()
            |> fun c -> if verbose then c.MinimumLevel.Debug() else c
            // LibLog writes to the global logger, so we need to control the emission if we don't want to pass loggers everywhere
            |> fun c -> let cfpl = if changeLogVerbose then Serilog.Events.LogEventLevel.Debug else Serilog.Events.LogEventLevel.Warning
                        c.MinimumLevel.Override("Microsoft.Azure.Documents.ChangeFeedProcessor", cfpl)
            |> fun c -> c.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code)
                .CreateLogger()

let run discovery connectionPolicy source (aux, leaseId, forceSkip, batchSize, lagReportFreq : TimeSpan option) createRangeProjector = async {
@@ -162,39 +149,50 @@ let mkRangeProjector (broker, topic) =
    let cfg = KafkaProducerConfig.Create("ProjectorTemplate", broker, Acks.Leader, compression = CompressionType.Lz4)
    let producer = KafkaProducer.Create(Log.Logger, cfg, topic)
    let disposeProducer = (producer :> IDisposable).Dispose
-    let projectBatch (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList) = async {
+    let 
projectBatch (log : ILogger) (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList) = async { sw.Stop() // Stop the clock after ChangeFeedProcessor hands off to us - let toKafkaEvent (e: DocumentParser.IEvent) : RenderedEvent = - // Late breaking hack compensating for the way the Equinox.Codec JsonConverter renders invalid json - // TODO remove when it's fixed - let meta' = if e.Meta = null || e.Meta.Length = 0 then System.Text.Encoding.UTF8.GetBytes("{}") else e.Meta - { s = e.Stream; i = e.Index; c = e.EventType; t = e.Timestamp; d = e.Data; m = meta' } + let toKafkaEvent (e: DocumentParser.IEvent) : RenderedEvent = { s = e.Stream; i = e.Index; c = e.EventType; t = e.Timestamp; d = e.Data; m = e.Meta } let pt,events = (fun () -> docs |> Seq.collect DocumentParser.enumEvents |> Seq.map toKafkaEvent |> Array.ofSeq) |> Stopwatch.Time let es = [| for e in events -> e.s, JsonConvert.SerializeObject e |] - let! et,_ = producer.ProduceBatch es |> Stopwatch.Time - let r = ctx.FeedResponse + let! et,() = async { + let! _ = producer.ProduceBatch es + do! ctx.CheckpointAsync() |> Async.AwaitTaskCorrect } |> Stopwatch.Time - Log.Information("Read {range,2} -{token,6} {count,4} docs {requestCharge,6}RU {l:n1}s Parse {events,5} events {p:n3}s Emit {e:n1}s", - ctx.PartitionKeyRangeId, r.ResponseContinuation.Trim[|'"'|], docs.Count, (let c = r.RequestCharge in c.ToString("n1")), + log.Information("Read -{token,6} {count,4} docs {requestCharge,6}RU {l:n1}s Parse {events,5} events {p:n3}s Emit {e:n1}s", + ctx.FeedResponse.ResponseContinuation.Trim[|'"'|], docs.Count, (let c = ctx.FeedResponse.RequestCharge in c.ToString("n1")), float sw.ElapsedMilliseconds / 1000., events.Length, (let e = pt.Elapsed in e.TotalSeconds), (let e = et.Elapsed in e.TotalSeconds)) sw.Restart() // restart the clock as we handoff back to the ChangeFeedProcessor } - ChangeFeedObserver.Create(Log.Logger, projectBatch, disposeProducer) + ChangeFeedObserver.Create(Log.Logger, projectBatch, dispose = disposeProducer) //#else let createRangeHandler () = let sw = Stopwatch.StartNew() // we'll end up reporting the warmup/connect time on the first batch, but that's ok - let processBatch (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList) = async { + let processBatch (log : ILogger) (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList) = async { sw.Stop() // Stop the clock after ChangeFeedProcessor hands off to us let pt,events = (fun () -> docs |> Seq.collect DocumentParser.enumEvents |> Seq.length) |> Stopwatch.Time - let r = ctx.FeedResponse - Log.Information("Read {range,2} -{token,6} {count,4} docs {requestCharge,6}RU {l:n1}s Parse {events,5} events {p:n3}s", - ctx.PartitionKeyRangeId, r.ResponseContinuation.Trim[|'"'|], docs.Count, (let c = r.RequestCharge in c.ToString("n1")), - float sw.ElapsedMilliseconds / 1000., events, (let e = pt.Elapsed in e.TotalSeconds)) + let! ct,() = async { do! 
ctx.CheckpointAsync() |> Async.AwaitTaskCorrect } |> Stopwatch.Time + log.Information("Read -{token,6} {count,4} docs {requestCharge,6}RU {l:n1}s Parse {events,5} events {p:n3}s Checkpoint {c:n3}s", + ctx.FeedResponse.ResponseContinuation.Trim[|'"'|], docs.Count, (let c = ctx.FeedResponse.RequestCharge in c.ToString("n1")), + float sw.ElapsedMilliseconds / 1000., events, (let e = pt.Elapsed in e.TotalSeconds), (let e = ct.Elapsed in e.TotalSeconds)) sw.Restart() // restart the clock as we handoff back to the ChangeFeedProcessor } ChangeFeedObserver.Create(Log.Logger, processBatch) //#endif +// Illustrates how to emit direct to the Console using Serilog +// Other topographies can be achieved by using various adapters and bridges, e.g., SerilogTarget or Serilog.Sinks.NLog +module Logging = + let initialize verbose changeLogVerbose = + Log.Logger <- + LoggerConfiguration().Destructure.FSharpTypes().Enrich.FromLogContext() + |> fun c -> if verbose then c.MinimumLevel.Debug() else c + // LibLog writes to the global logger, so we need to control the emission if we don't want to pass loggers everywhere + |> fun c -> let cfpl = if changeLogVerbose then Serilog.Events.LogEventLevel.Debug else Serilog.Events.LogEventLevel.Warning + c.MinimumLevel.Override("Microsoft.Azure.Documents.ChangeFeedProcessor", cfpl) + |> fun c -> let t = "[{Timestamp:HH:mm:ss} {Level:u3}] {partitionKeyRangeId} {Message:lj} {NewLine}{Exception}" + c.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t) + .CreateLogger() + [] let main argv = try let args = CmdParser.parse argv diff --git a/equinox-projector/Projector/Projector.fsproj b/equinox-projector/Projector/Projector.fsproj index 78bf8aed1..3f1c4460c 100644 --- a/equinox-projector/Projector/Projector.fsproj +++ b/equinox-projector/Projector/Projector.fsproj @@ -14,9 +14,9 @@ - + - + diff --git a/equinox-sync/Sync/Program.fs b/equinox-sync/Sync/Program.fs index 2e4aa444c..e0af1a013 100644 --- a/equinox-sync/Sync/Program.fs +++ b/equinox-sync/Sync/Program.fs @@ -326,7 +326,7 @@ let run (sourceDiscovery, source) (auxDiscovery, aux) connectionPolicy (leaseId, let createRangeSyncHandler log (ctx: Core.CosmosContext) (transform : Microsoft.Azure.Documents.Document -> Ingester.Batch seq) = let sw = Stopwatch.StartNew() // we'll end up reporting the warmup/connect time on the first batch, but that's ok let writer = Ingester.SynchronousWriter(ctx, log) - let processBatch (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList) = async { + let processBatch (log : ILogger) (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList) = async { sw.Stop() // Stop the clock after ChangeFeedProcessor hands off to us let pt, events = Stopwatch.Time (fun () -> let items = docs |> Seq.collect transform |> Array.ofSeq @@ -334,7 +334,7 @@ let createRangeSyncHandler log (ctx: Core.CosmosContext) (transform : Microsoft. items) let! 
et,streams = writer.Pump() |> Stopwatch.Time let r = ctx.FeedResponse - Log.Information("Read {range,2} -{token,6} {count,4} docs {requestCharge,6}RU {l:n1}s Gen {events,5} events {p:n3}s Sync {streams,5} streams {e:n1}s", + log.Information("Read {range,2} -{token,6} {count,4} docs {requestCharge,6}RU {l:n1}s Gen {events,5} events {p:n3}s Sync {streams,5} streams {e:n1}s", ctx.PartitionKeyRangeId, r.ResponseContinuation.Trim[|'"'|], docs.Count, (let c = r.RequestCharge in c.ToString("n1")), float sw.ElapsedMilliseconds / 1000., events.Length, (let e = pt.Elapsed in e.TotalSeconds), streams, (let e = et.Elapsed in e.TotalSeconds)) sw.Restart() // restart the clock as we handoff back to the ChangeFeedProcessor diff --git a/equinox-sync/Sync/Sync.fsproj b/equinox-sync/Sync/Sync.fsproj index 8067bc8a7..57fc8d717 100644 --- a/equinox-sync/Sync/Sync.fsproj +++ b/equinox-sync/Sync/Sync.fsproj @@ -14,7 +14,7 @@ - + diff --git a/equinox-testbed/Testbed/Testbed.fsproj b/equinox-testbed/Testbed/Testbed.fsproj index 9ecdc8bf9..58b155157 100644 --- a/equinox-testbed/Testbed/Testbed.fsproj +++ b/equinox-testbed/Testbed/Testbed.fsproj @@ -17,10 +17,10 @@ - - - - + + + + diff --git a/equinox-web-csharp/Domain/Domain.csproj b/equinox-web-csharp/Domain/Domain.csproj index 6fb54a1ce..6e5cc8f78 100755 --- a/equinox-web-csharp/Domain/Domain.csproj +++ b/equinox-web-csharp/Domain/Domain.csproj @@ -5,7 +5,7 @@ - + diff --git a/equinox-web-csharp/Web/Web.csproj b/equinox-web-csharp/Web/Web.csproj index 9dd21f7ac..bf6ec770f 100755 --- a/equinox-web-csharp/Web/Web.csproj +++ b/equinox-web-csharp/Web/Web.csproj @@ -6,10 +6,10 @@ - - - - + + + + diff --git a/equinox-web/Domain/Domain.fsproj b/equinox-web/Domain/Domain.fsproj index 8faf4c316..400b8c103 100644 --- a/equinox-web/Domain/Domain.fsproj +++ b/equinox-web/Domain/Domain.fsproj @@ -12,7 +12,7 @@ - + diff --git a/equinox-web/Web/Web.fsproj b/equinox-web/Web/Web.fsproj index 1904f41a7..f435746f4 100644 --- a/equinox-web/Web/Web.fsproj +++ b/equinox-web/Web/Web.fsproj @@ -11,10 +11,10 @@ - - - - + + + +
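
Illustrative note (not part of the patch): the Program.fs changes above move to the preview5 handler shape, where each range handler takes the range-scoped Serilog ILogger as an extra first argument and checkpoints explicitly via ctx.CheckpointAsync(). A minimal F# sketch of that shape follows. It assumes the ChangeFeedObserver and IChangeFeedObserverContext types and the Async.AwaitTaskCorrect helper referenced in the diff, and assumes the batch arrives as an IReadOnlyList<Microsoft.Azure.Documents.Document> (the generic parameters were elided above); the createNoOpRangeHandler name is hypothetical.

    open System.Collections.Generic
    open Microsoft.Azure.Documents
    open Serilog

    // Minimal no-op handler wired the same way as processBatch in the diff above
    let createNoOpRangeHandler () =
        let handle (log : ILogger) (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList<Document>) = async {
            // a real handler parses/emits the documents here before checkpointing
            do! ctx.CheckpointAsync() |> Async.AwaitTaskCorrect
            log.Information("Handled {count} docs", docs.Count) }
        ChangeFeedObserver.Create(Log.Logger, handle)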