diff --git a/equinox-ingest/Ingest/Infrastructure.fs b/equinox-ingest/Ingest/Infrastructure.fs deleted file mode 100644 index 816652591..000000000 --- a/equinox-ingest/Ingest/Infrastructure.fs +++ /dev/null @@ -1,36 +0,0 @@ -[] -module private Infrastructure - -open Equinox.Store // AwaitTaskCorrect -open System -open System.Threading -open System.Threading.Tasks - -type SemaphoreSlim with - /// F# friendly semaphore await function - member semaphore.Await(?timeout : TimeSpan) = async { - let! ct = Async.CancellationToken - let timeout = defaultArg timeout Timeout.InfiniteTimeSpan - let task = semaphore.WaitAsync(timeout, ct) - return! Async.AwaitTaskCorrect task - } - - /// Throttling wrapper which waits asynchronously until the semaphore has available capacity - member semaphore.Throttle(workflow : Async<'T>) : Async<'T> = async { - let! _ = semaphore.Await() - try return! workflow - finally semaphore.Release() |> ignore - } - -#nowarn "21" // re AwaitKeyboardInterrupt -#nowarn "40" // re AwaitKeyboardInterrupt - -type Async with - static member Sleep(t : TimeSpan) : Async = Async.Sleep(int t.TotalMilliseconds) - /// Asynchronously awaits the next keyboard interrupt event - static member AwaitKeyboardInterrupt () : Async = - Async.FromContinuations(fun (sc,_,_) -> - let isDisposed = ref 0 - let rec callback _ = Task.Run(fun () -> if Interlocked.Increment isDisposed = 1 then d.Dispose() ; sc ()) |> ignore - and d : IDisposable = Console.CancelKeyPress.Subscribe callback - in ()) \ No newline at end of file diff --git a/equinox-ingest/Ingest/Ingest.fsproj b/equinox-ingest/Ingest/Ingest.fsproj deleted file mode 100644 index 094ebc154..000000000 --- a/equinox-ingest/Ingest/Ingest.fsproj +++ /dev/null @@ -1,26 +0,0 @@ - - - - Exe - netcoreapp2.1;net461 - 5 - $(DefineConstants);NET461 - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/equinox-sync/Ingest/Ingest.fsproj b/equinox-sync/Ingest/Ingest.fsproj new file mode 100644 index 000000000..9f8ae20d5 --- /dev/null +++ b/equinox-sync/Ingest/Ingest.fsproj @@ -0,0 +1,17 @@ + + + + Exe + netcoreapp2.1 + 5 + + + + + + + + + + + \ No newline at end of file diff --git a/equinox-ingest/Ingest/Program.fs b/equinox-sync/Ingest/Program.fs similarity index 100% rename from equinox-ingest/Ingest/Program.fs rename to equinox-sync/Ingest/Program.fs diff --git a/equinox-sync/Sync/CosmosIngester.fs b/equinox-sync/Sync/CosmosIngester.fs index 4da331597..6c0f0ef2a 100644 --- a/equinox-sync/Sync/CosmosIngester.fs +++ b/equinox-sync/Sync/CosmosIngester.fs @@ -129,17 +129,6 @@ type CatStats() = member __.Clear() = cats.Clear() member __.StatsDescending = cats |> Seq.map (|KeyValue|) |> Seq.sortByDescending snd -module Queue = - let tryDequeue (x : System.Collections.Generic.Queue<'T>) = -#if NET461 - if x.Count = 0 then None - else x.Dequeue() |> Some -#else - match x.TryDequeue() with - | false, _ -> None - | true, res -> Some res -#endif - type ResultKind = TimedOut | RateLimited | Malformed | Ok type StreamStates() = diff --git a/equinox-sync/Sync/Infrastructure.fs b/equinox-sync/Sync/Infrastructure.fs index 165e548b2..30f313320 100644 --- a/equinox-sync/Sync/Infrastructure.fs +++ b/equinox-sync/Sync/Infrastructure.fs @@ -1,5 +1,5 @@ [] -module private Infrastructure +module SyncTemplate.Infrastructure open Equinox.Store // AwaitTaskCorrect open System @@ -19,6 +19,17 @@ type Async with and d : IDisposable = Console.CancelKeyPress.Subscribe callback in ()) +module Queue = + let tryDequeue (x : System.Collections.Generic.Queue<'T>) = +#if NET461 + if x.Count = 0 then None + else x.Dequeue() |> Some +#else + match x.TryDequeue() with + | false, _ -> None + | true, res -> Some res +#endif + type SemaphoreSlim with /// F# friendly semaphore await function member semaphore.Await(?timeout : System.TimeSpan) = async { diff --git a/equinox-sync/equinox-sync.sln b/equinox-sync/equinox-sync.sln index 857f5d668..be656b901 100644 --- a/equinox-sync/equinox-sync.sln +++ b/equinox-sync/equinox-sync.sln @@ -11,6 +11,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution EndProject Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Sync.Tests", "Sync.Tests\Sync.Tests.fsproj", "{1A5997B1-48F3-43FC-B5AD-661EF4B8B15D}" EndProject +Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Ingest", "Ingest\Ingest.fsproj", "{BB7079A7-53E8-4843-8981-78DD025F8C91}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -25,6 +27,10 @@ Global {1A5997B1-48F3-43FC-B5AD-661EF4B8B15D}.Debug|Any CPU.Build.0 = Debug|Any CPU {1A5997B1-48F3-43FC-B5AD-661EF4B8B15D}.Release|Any CPU.ActiveCfg = Release|Any CPU {1A5997B1-48F3-43FC-B5AD-661EF4B8B15D}.Release|Any CPU.Build.0 = Release|Any CPU + {BB7079A7-53E8-4843-8981-78DD025F8C91}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BB7079A7-53E8-4843-8981-78DD025F8C91}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BB7079A7-53E8-4843-8981-78DD025F8C91}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BB7079A7-53E8-4843-8981-78DD025F8C91}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE