Skip to content

Commit

Permalink
Target Equinox 2.0.0-preview5
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Apr 12, 2019
1 parent 8bd1dde commit 464eef6
Show file tree
Hide file tree
Showing 11 changed files with 50 additions and 52 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `dotnet new eqxetl` is now `dotnet new eqxsync`
- `dotnet new eqxsync` now supports command-line category white/blacklist [#18](https://github.com/jet/dotnet-templates/pull/18)
- `dotnet new eqxsync` now supports command-line selection of an `aux` collection in either the `source` or destination collections [#18](https://github.com/jet/dotnet-templates/pull/18)
- Targets `Equinox`.* v `2.0.0-preview4`
- Targets `Equinox`.* v `2.0.0-preview5`
- `dotnet new eqxprojector` now uses `Jet.ConfluentKafka.FSharp 1.0.0-rc2` (which uses `Confluent.Kafka 1.0.0-RC3`, `librdkafka 1.0.0`)

### Removed
### Fixed
### Fixed555

<a name="2.0.0"></a>
## [2.0.0] - 2019-03-26
Expand Down
4 changes: 2 additions & 2 deletions equinox-projector/Consumer/Consumer.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
<ItemGroup>
<PackageReference Include="Argu" Version="5.2.0" />
<PackageReference Include="Destructurama.FSharp.NetCore" Version="1.0.14" />
<PackageReference Include="Equinox.Codec" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.Projection" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.Codec" Version="2.0.0-preview5" />
<PackageReference Include="Equinox.Projection" Version="2.0.0-preview5" />
<PackageReference Include="Jet.ConfluentKafka.FSharp" Version="1.0.0-rc2" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
</ItemGroup>
Expand Down
56 changes: 27 additions & 29 deletions equinox-projector/Projector/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,6 @@ module CmdParser =
let parser = ArgumentParser.Create<Parameters>(programName = programName)
parser.ParseCommandLine argv |> Arguments

// Illustrates how to emit direct to the Console using Serilog
// Other topographies can be achieved by using various adapters and bridges, e.g., SerilogTarget or Serilog.Sinks.NLog
module Logging =
let initialize verbose changeLogVerbose =
Log.Logger <-
LoggerConfiguration().Destructure.FSharpTypes().Enrich.FromLogContext()
|> fun c -> if verbose then c.MinimumLevel.Debug() else c
// LibLog writes to the global logger, so we need to control the emission if we don't want to pass loggers everywhere
|> fun c -> let cfpl = if changeLogVerbose then Serilog.Events.LogEventLevel.Debug else Serilog.Events.LogEventLevel.Warning
c.MinimumLevel.Override("Microsoft.Azure.Documents.ChangeFeedProcessor", cfpl)
|> fun c -> c.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code)
.CreateLogger()

let run discovery connectionPolicy source
(aux, leaseId, forceSkip, batchSize, lagReportFreq : TimeSpan option)
createRangeProjector = async {
Expand All @@ -162,39 +149,50 @@ let mkRangeProjector (broker, topic) =
let cfg = KafkaProducerConfig.Create("ProjectorTemplate", broker, Acks.Leader, compression = CompressionType.Lz4)
let producer = KafkaProducer.Create(Log.Logger, cfg, topic)
let disposeProducer = (producer :> IDisposable).Dispose
let projectBatch (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList<Microsoft.Azure.Documents.Document>) = async {
let projectBatch (log : ILogger) (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList<Microsoft.Azure.Documents.Document>) = async {
sw.Stop() // Stop the clock after ChangeFeedProcessor hands off to us
let toKafkaEvent (e: DocumentParser.IEvent) : RenderedEvent =
// Late breaking hack compensating for the way the Equinox.Codec JsonConverter renders invalid json
// TODO remove when it's fixed
let meta' = if e.Meta = null || e.Meta.Length = 0 then System.Text.Encoding.UTF8.GetBytes("{}") else e.Meta
{ s = e.Stream; i = e.Index; c = e.EventType; t = e.Timestamp; d = e.Data; m = meta' }
let toKafkaEvent (e: DocumentParser.IEvent) : RenderedEvent = { s = e.Stream; i = e.Index; c = e.EventType; t = e.Timestamp; d = e.Data; m = e.Meta }
let pt,events = (fun () -> docs |> Seq.collect DocumentParser.enumEvents |> Seq.map toKafkaEvent |> Array.ofSeq) |> Stopwatch.Time
let es = [| for e in events -> e.s, JsonConvert.SerializeObject e |]
let! et,_ = producer.ProduceBatch es |> Stopwatch.Time
let r = ctx.FeedResponse
let! et,() = async {
let! _ = producer.ProduceBatch es
do! ctx.CheckpointAsync() |> Async.AwaitTaskCorrect } |> Stopwatch.Time

Log.Information("Read {range,2} -{token,6} {count,4} docs {requestCharge,6}RU {l:n1}s Parse {events,5} events {p:n3}s Emit {e:n1}s",
ctx.PartitionKeyRangeId, r.ResponseContinuation.Trim[|'"'|], docs.Count, (let c = r.RequestCharge in c.ToString("n1")),
log.Information("Read -{token,6} {count,4} docs {requestCharge,6}RU {l:n1}s Parse {events,5} events {p:n3}s Emit {e:n1}s",
ctx.FeedResponse.ResponseContinuation.Trim[|'"'|], docs.Count, (let c = ctx.FeedResponse.RequestCharge in c.ToString("n1")),
float sw.ElapsedMilliseconds / 1000., events.Length, (let e = pt.Elapsed in e.TotalSeconds), (let e = et.Elapsed in e.TotalSeconds))
sw.Restart() // restart the clock as we handoff back to the ChangeFeedProcessor
}
ChangeFeedObserver.Create(Log.Logger, projectBatch, disposeProducer)
ChangeFeedObserver.Create(Log.Logger, projectBatch, dispose = disposeProducer)
//#else
let createRangeHandler () =
let sw = Stopwatch.StartNew() // we'll end up reporting the warmup/connect time on the first batch, but that's ok
let processBatch (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList<Microsoft.Azure.Documents.Document>) = async {
let processBatch (log : ILogger) (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList<Microsoft.Azure.Documents.Document>) = async {
sw.Stop() // Stop the clock after ChangeFeedProcessor hands off to us
let pt,events = (fun () -> docs |> Seq.collect DocumentParser.enumEvents |> Seq.length) |> Stopwatch.Time
let r = ctx.FeedResponse
Log.Information("Read {range,2} -{token,6} {count,4} docs {requestCharge,6}RU {l:n1}s Parse {events,5} events {p:n3}s",
ctx.PartitionKeyRangeId, r.ResponseContinuation.Trim[|'"'|], docs.Count, (let c = r.RequestCharge in c.ToString("n1")),
float sw.ElapsedMilliseconds / 1000., events, (let e = pt.Elapsed in e.TotalSeconds))
let! ct,() = async { do! ctx.CheckpointAsync() |> Async.AwaitTaskCorrect } |> Stopwatch.Time
log.Information("Read -{token,6} {count,4} docs {requestCharge,6}RU {l:n1}s Parse {events,5} events {p:n3}s Checkpoint {c:n3}s",
ctx.FeedResponse.ResponseContinuation.Trim[|'"'|], docs.Count, (let c = ctx.FeedResponse.RequestCharge in c.ToString("n1")),
float sw.ElapsedMilliseconds / 1000., events, (let e = pt.Elapsed in e.TotalSeconds), (let e = ct.Elapsed in e.TotalSeconds))
sw.Restart() // restart the clock as we handoff back to the ChangeFeedProcessor
}
ChangeFeedObserver.Create(Log.Logger, processBatch)
//#endif

// Illustrates how to emit direct to the Console using Serilog
// Other topographies can be achieved by using various adapters and bridges, e.g., SerilogTarget or Serilog.Sinks.NLog
module Logging =
let initialize verbose changeLogVerbose =
Log.Logger <-
LoggerConfiguration().Destructure.FSharpTypes().Enrich.FromLogContext()
|> fun c -> if verbose then c.MinimumLevel.Debug() else c
// LibLog writes to the global logger, so we need to control the emission if we don't want to pass loggers everywhere
|> fun c -> let cfpl = if changeLogVerbose then Serilog.Events.LogEventLevel.Debug else Serilog.Events.LogEventLevel.Warning
c.MinimumLevel.Override("Microsoft.Azure.Documents.ChangeFeedProcessor", cfpl)
|> fun c -> let t = "[{Timestamp:HH:mm:ss} {Level:u3}] {partitionKeyRangeId} {Message:lj} {NewLine}{Exception}"
c.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t)
.CreateLogger()

[<EntryPoint>]
let main argv =
try let args = CmdParser.parse argv
Expand Down
4 changes: 2 additions & 2 deletions equinox-projector/Projector/Projector.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
<ItemGroup>
<PackageReference Include="Argu" Version="5.2.0" />
<PackageReference Include="Destructurama.FSharp.NetCore" Version="1.0.14" />
<PackageReference Include="Equinox.Cosmos.Projection" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.Cosmos.Projection" Version="2.0.0-preview5" />
<!--#if (kafka)-->
<PackageReference Include="Equinox.Projection" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.Projection" Version="2.0.0-preview5" />
<PackageReference Include="Jet.ConfluentKafka.FSharp" Version="1.0.0-rc2" />
<!--#endif-->
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
Expand Down
4 changes: 2 additions & 2 deletions equinox-sync/Sync/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -326,15 +326,15 @@ let run (sourceDiscovery, source) (auxDiscovery, aux) connectionPolicy (leaseId,
let createRangeSyncHandler log (ctx: Core.CosmosContext) (transform : Microsoft.Azure.Documents.Document -> Ingester.Batch seq) =
let sw = Stopwatch.StartNew() // we'll end up reporting the warmup/connect time on the first batch, but that's ok
let writer = Ingester.SynchronousWriter(ctx, log)
let processBatch (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList<Microsoft.Azure.Documents.Document>) = async {
let processBatch (log : ILogger) (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList<Microsoft.Azure.Documents.Document>) = async {
sw.Stop() // Stop the clock after ChangeFeedProcessor hands off to us
let pt, events = Stopwatch.Time (fun () ->
let items = docs |> Seq.collect transform |> Array.ofSeq
items |> Array.iter writer.Add
items)
let! et,streams = writer.Pump() |> Stopwatch.Time
let r = ctx.FeedResponse
Log.Information("Read {range,2} -{token,6} {count,4} docs {requestCharge,6}RU {l:n1}s Gen {events,5} events {p:n3}s Sync {streams,5} streams {e:n1}s",
log.Information("Read {range,2} -{token,6} {count,4} docs {requestCharge,6}RU {l:n1}s Gen {events,5} events {p:n3}s Sync {streams,5} streams {e:n1}s",
ctx.PartitionKeyRangeId, r.ResponseContinuation.Trim[|'"'|], docs.Count, (let c = r.RequestCharge in c.ToString("n1")),
float sw.ElapsedMilliseconds / 1000., events.Length, (let e = pt.Elapsed in e.TotalSeconds), streams, (let e = et.Elapsed in e.TotalSeconds))
sw.Restart() // restart the clock as we handoff back to the ChangeFeedProcessor
Expand Down
2 changes: 1 addition & 1 deletion equinox-sync/Sync/Sync.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<ItemGroup>
<PackageReference Include="Argu" Version="5.2.0" />
<PackageReference Include="Destructurama.FSharp.NetCore" Version="1.0.14" />
<PackageReference Include="Equinox.Cosmos.Projection" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.Cosmos.Projection" Version="2.0.0-preview5" />
<PackageReference Include="Microsoft.Azure.DocumentDB.ChangeFeedProcessor" Version="2.2.6" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
</ItemGroup>
Expand Down
8 changes: 4 additions & 4 deletions equinox-testbed/Testbed/Testbed.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
<ItemGroup>
<PackageReference Include="Argu" Version="5.2.0" />
<PackageReference Include="Destructurama.FSharp.NetCore" Version="1.0.14" />
<PackageReference Include="Equinox.Cosmos" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.EventStore" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.MemoryStore" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.Tools.TestHarness" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.Cosmos" Version="2.0.0-preview5" />
<PackageReference Include="Equinox.EventStore" Version="2.0.0-preview5" />
<PackageReference Include="Equinox.MemoryStore" Version="2.0.0-preview5" />
<PackageReference Include="Equinox.Tools.TestHarness" Version="2.0.0-preview5" />
<PackageReference Include="FSharp.UMX" Version="1.0.0-preview-001" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
Expand Down
2 changes: 1 addition & 1 deletion equinox-web-csharp/Domain/Domain.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Equinox" Version="2.0.0-preview4" />
<PackageReference Include="Equinox" Version="2.0.0-preview5" />
<PackageReference Include="FSharp.Core" Version="4.5.4" />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
</ItemGroup>
Expand Down
8 changes: 4 additions & 4 deletions equinox-web-csharp/Web/Web.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Equinox.Codec" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.Cosmos" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.EventStore" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.MemoryStore" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.Codec" Version="2.0.0-preview5" />
<PackageReference Include="Equinox.Cosmos" Version="2.0.0-preview5" />
<PackageReference Include="Equinox.EventStore" Version="2.0.0-preview5" />
<PackageReference Include="Equinox.MemoryStore" Version="2.0.0-preview5" />
<PackageReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Serilog.AspNetCore" Version="2.1.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
Expand Down
2 changes: 1 addition & 1 deletion equinox-web/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox" Version="2.0.0-preview4" />
<PackageReference Include="Equinox" Version="2.0.0-preview5" />
<PackageReference Include="FSharp.UMX" Version="1.0.0-preview-001" />
<PackageReference Include="TypeShape" Version="6.0.0" />
</ItemGroup>
Expand Down
8 changes: 4 additions & 4 deletions equinox-web/Web/Web.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox.Codec" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.Cosmos" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.EventStore" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.MemoryStore" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.Codec" Version="2.0.0-preview5" />
<PackageReference Include="Equinox.Cosmos" Version="2.0.0-preview5" />
<PackageReference Include="Equinox.EventStore" Version="2.0.0-preview5" />
<PackageReference Include="Equinox.MemoryStore" Version="2.0.0-preview5" />
<PackageReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Serilog.AspNetCore" Version="2.1.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
Expand Down

0 comments on commit 464eef6

Please sign in to comment.