Skip to content

Commit

Permalink
Correct (and tune) equinox-sync template progress/error management (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Apr 12, 2019
1 parent 464eef6 commit d8272f1
Show file tree
Hide file tree
Showing 7 changed files with 521 additions and 194 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `dotnet new eqxetl` is now `dotnet new eqxsync`
- `dotnet new eqxsync` now supports command-line category white/blacklist [#18](https://github.com/jet/dotnet-templates/pull/18)
- `dotnet new eqxsync` now supports command-line selection of an `aux` collection in either the `source` or destination collections [#18](https://github.com/jet/dotnet-templates/pull/18)
- `dotnet new eqxsync` has separated processing for progress computation, progress writing and batch loading (this also happens to be the only way in which to balance throughput with correctness in the context of a ChangeFeedProcessor) [#19](https://github.com/jet/dotnet-templates/pull/19)
- Targets `Equinox`.* v `2.0.0-preview5`
- `dotnet new eqxprojector` now uses `Jet.ConfluentKafka.FSharp 1.0.0-rc2` (which uses `Confluent.Kafka 1.0.0-RC3`, `librdkafka 1.0.0`)

### Removed
### Fixed555
### Fixed

<a name="2.0.0"></a>
## [2.0.0] - 2019-03-26
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module SyncTemplate.Tests.IngesterTests
module SyncTemplate.Tests.CosmosIngesterTests

open SyncTemplate.Program.Ingester
open SyncTemplate.Program.CosmosIngester
open Swensen.Unquote
open Xunit

Expand Down
38 changes: 38 additions & 0 deletions equinox-sync/Sync.Tests/ProgressTests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
module ProgressTests

open SyncTemplate.Program

open Swensen.Unquote
open Xunit

let [<Fact>] ``Empty has zero streams pending or progress to write`` () =
let sut = Progress.State<_>()
let validatedPos, batches = sut.Validate(fun _ -> None)
None =! validatedPos
0 =! batches

let [<Fact>] ``Can add multiple batches`` () =
let sut = Progress.State<_>()
sut.AppendBatch(0,["a",1L; "b",2L])
sut.AppendBatch(1,["b",2L; "c",3L])
let validatedPos, batches = sut.Validate(fun _ -> None)
None =! validatedPos
2 =! batches

let [<Fact>] ``Marking Progress Removes batches and updates progress`` () =
let sut = Progress.State<_>()
sut.AppendBatch(0,["a",1L; "b",2L])
sut.MarkStreamProgress("a",1L)
sut.MarkStreamProgress("b",1L)
let validatedPos, batches = sut.Validate(fun _ -> None)
None =! validatedPos
1 =! batches

let [<Fact>] ``Marking progress is not persistent`` () =
let sut = Progress.State<_>()
sut.AppendBatch(0,["a",1L])
sut.MarkStreamProgress("a",2L)
sut.AppendBatch(1,["a",1L; "b",2L])
let validatedPos, batches = sut.Validate(fun _ -> None)
Some 0 =! validatedPos
1 =! batches
3 changes: 2 additions & 1 deletion equinox-sync/Sync.Tests/Sync.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="Tests.fs" />
<Compile Include="CosmosIngesterTests.fs" />
<Compile Include="ProgressTests.fs" />
</ItemGroup>

<ItemGroup>
Expand Down
11 changes: 11 additions & 0 deletions equinox-sync/Sync/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ type Async with
and d : IDisposable = Console.CancelKeyPress.Subscribe callback
in ())

module Queue =
let tryDequeue (x : System.Collections.Generic.Queue<'T>) =
#if NET461
if x.Count = 0 then None
else x.Dequeue() |> Some
#else
match x.TryDequeue() with
| false, _ -> None
| true, res -> Some res
#endif

open Equinox.Store // AwaitTaskCorrect

type SemaphoreSlim with
Expand Down
655 changes: 465 additions & 190 deletions equinox-sync/Sync/Program.fs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions equinox-sync/Sync/Sync.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<PackageReference Include="Equinox.Cosmos.Projection" Version="2.0.0-preview5" />
<PackageReference Include="Microsoft.Azure.DocumentDB.ChangeFeedProcessor" Version="2.2.6" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
<PackageReference Include="Serilog.Sinks.Seq" Version="4.0.0" />
</ItemGroup>

</Project>

0 comments on commit d8272f1

Please sign in to comment.