Skip to content

Commit

Permalink
First cut EventStore tailing and progress writing
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Apr 10, 2019
1 parent 423e93c commit b40aefd
Show file tree
Hide file tree
Showing 4 changed files with 366 additions and 186 deletions.
36 changes: 36 additions & 0 deletions equinox-sync/Sync.Tests/ProgressTests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
module ProgressTests

open SyncTemplate.Program

open Swensen.Unquote
open Xunit

let [<Fact>] ``Empty has zero streams pending or progress to write`` () =
let sut = Progress.State<_>()
None =! sut.ValidatedPos
0 =! sut.PendingStreamCount

let [<Fact>] ``Can add multiple batches`` () =
let sut = Progress.State<_>()
sut.AddBatch(0,["a",1L; "b",2L])
sut.AddBatch(1,["b",2L; "c",3L])
None =! sut.ValidatedPos
3 =! sut.PendingStreamCount
2 =! sut.PendingBatchCount

let [<Fact>] ``Marking Progress Removes batches and updates progress`` () =
let sut = Progress.State<_>()
sut.AddBatch(0,["a",1L; "b",2L])
sut.MarkStreamProgress("a",1L)
sut.MarkStreamProgress("b",1L)
None =! sut.ValidatedPos
1 =! sut.PendingStreamCount
1 =! sut.PendingBatchCount

let [<Fact>] ``Marking progress is not persistent`` () =
let sut = Progress.State<_>()
sut.AddBatch(0,["a",1L])
sut.MarkStreamProgress("a",2L)
sut.AddBatch(1,["a",1L; "b",2L])
2 =! sut.PendingStreamCount
1 =! sut.PendingBatchCount
1 change: 1 addition & 0 deletions equinox-sync/Sync.Tests/Sync.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="ProgressTests.fs" />
<Compile Include="CosmosIngesterTests.fs" />
</ItemGroup>

Expand Down
11 changes: 11 additions & 0 deletions equinox-sync/Sync/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ type Async with
and d : IDisposable = Console.CancelKeyPress.Subscribe callback
in ())

module Queue =
let tryDequeue (x : System.Collections.Generic.Queue<'T>) =
#if NET461
if x.Count = 0 then None
else x.Dequeue() |> Some
#else
match x.TryDequeue() with
| false, _ -> None
| true, res -> Some res
#endif

open Equinox.Store // AwaitTaskCorrect

type SemaphoreSlim with
Expand Down
Loading

0 comments on commit b40aefd

Please sign in to comment.