From 5a036a9cb77c34c38494a534c28694130144683c Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Tue, 4 May 2021 14:04:03 +0100 Subject: [PATCH 01/20] Handle overflows of requests --- dotnet-templates.sln | 19 +++ .../.template.config/template.json | 18 ++ .../Domain.Tests/Domain.Tests.fsproj | 26 +++ .../Domain.Tests/TipEpochCarryForward.fs | 22 +++ equinox-patterns/Domain/Domain.fsproj | 18 ++ equinox-patterns/Domain/Epoch.fs | 156 ++++++++++++++++++ equinox-patterns/Domain/Infrastructure.fs | 26 +++ equinox-patterns/Domain/Types.fs | 12 ++ equinox-patterns/Patterns.sln | 48 ++++++ 9 files changed, 345 insertions(+) create mode 100644 equinox-patterns/.template.config/template.json create mode 100644 equinox-patterns/Domain.Tests/Domain.Tests.fsproj create mode 100644 equinox-patterns/Domain.Tests/TipEpochCarryForward.fs create mode 100644 equinox-patterns/Domain/Domain.fsproj create mode 100644 equinox-patterns/Domain/Epoch.fs create mode 100644 equinox-patterns/Domain/Infrastructure.fs create mode 100644 equinox-patterns/Domain/Types.fs create mode 100644 equinox-patterns/Patterns.sln diff --git a/dotnet-templates.sln b/dotnet-templates.sln index e20dd46f0..4c1cd7e25 100644 --- a/dotnet-templates.sln +++ b/dotnet-templates.sln @@ -111,6 +111,15 @@ EndProjectSection EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Pruner", "propulsion-pruner\Pruner.fsproj", "{A0FB44F5-15E5-47C8-81E5-1991269849CB}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "eqxPatterns", "eqxPatterns", "{76721F1E-851C-4970-A276-DF61FCE3DA23}" +ProjectSection(SolutionItems) = preProject + equinox-patterns\.template.config\template.json = equinox-patterns\.template.config\template.json +EndProjectSection +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain", "equinox-patterns\Domain\Domain.fsproj", "{8D9867A9-1B5D-4AD3-A890-ACC81D011C00}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain.Tests", "equinox-patterns\Domain.Tests\Domain.Tests.fsproj", "{C899EB07-FEC8-41F8-95DC-203DA80F5A32}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -191,6 +200,14 @@ Global {A0FB44F5-15E5-47C8-81E5-1991269849CB}.Debug|Any CPU.Build.0 = Debug|Any CPU {A0FB44F5-15E5-47C8-81E5-1991269849CB}.Release|Any CPU.ActiveCfg = Release|Any CPU {A0FB44F5-15E5-47C8-81E5-1991269849CB}.Release|Any CPU.Build.0 = Release|Any CPU + {8D9867A9-1B5D-4AD3-A890-ACC81D011C00}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8D9867A9-1B5D-4AD3-A890-ACC81D011C00}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8D9867A9-1B5D-4AD3-A890-ACC81D011C00}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8D9867A9-1B5D-4AD3-A890-ACC81D011C00}.Release|Any CPU.Build.0 = Release|Any CPU + {C899EB07-FEC8-41F8-95DC-203DA80F5A32}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C899EB07-FEC8-41F8-95DC-203DA80F5A32}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C899EB07-FEC8-41F8-95DC-203DA80F5A32}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C899EB07-FEC8-41F8-95DC-203DA80F5A32}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {F66A5BFE-7C81-44DC-97DE-3FD8C83B8F06} = {B72FFAAE-7801-41B2-86F5-FD90E97A30F7} @@ -210,5 +227,7 @@ Global {83BA87C3-6288-40F4-BC4F-EC3A54586CDF} = {DAE9E2B9-EDA2-4064-B0CE-FD5294549374} {B9976751-C3A6-4F8B-BEF4-278382D8EAA6} = {BBD6F425-C2F4-4857-BD68-2B13D0B4EDDE} {A0FB44F5-15E5-47C8-81E5-1991269849CB} = {7F80222A-2687-4E91-8F5B-4717DD13DEF5} + {8D9867A9-1B5D-4AD3-A890-ACC81D011C00} = {76721F1E-851C-4970-A276-DF61FCE3DA23} + {C899EB07-FEC8-41F8-95DC-203DA80F5A32} = {76721F1E-851C-4970-A276-DF61FCE3DA23} EndGlobalSection EndGlobal diff --git a/equinox-patterns/.template.config/template.json b/equinox-patterns/.template.config/template.json new file mode 100644 index 000000000..ae9dce542 --- /dev/null +++ b/equinox-patterns/.template.config/template.json @@ -0,0 +1,18 @@ +{ + "$schema": "http://json.schemastore.org/template", + "author": "@jet @bartelink", + "classifications": [ + "Equinox", + "Event Sourcing", + "Closing Books" + ], + "tags": { + "language": "F#", + "type": "project" + }, + "identity": "Equinox.Patterns", + "name": "Equinox Patterns Sample", + "shortName": "eqxPatterns", + "sourceName": "Patterns", + "preferNameDirectory": true +} \ No newline at end of file diff --git a/equinox-patterns/Domain.Tests/Domain.Tests.fsproj b/equinox-patterns/Domain.Tests/Domain.Tests.fsproj new file mode 100644 index 000000000..c63ab4cfd --- /dev/null +++ b/equinox-patterns/Domain.Tests/Domain.Tests.fsproj @@ -0,0 +1,26 @@ + + + + netcoreapp3.1 + 5 + false + Library + + + + + + + + + + + + + + + + + + + diff --git a/equinox-patterns/Domain.Tests/TipEpochCarryForward.fs b/equinox-patterns/Domain.Tests/TipEpochCarryForward.fs new file mode 100644 index 000000000..f139861a9 --- /dev/null +++ b/equinox-patterns/Domain.Tests/TipEpochCarryForward.fs @@ -0,0 +1,22 @@ +/// This test and implementation pairing demonstrates how one might accomplish a pattern +module Patterns.Domain.Tests.TipEpochCarryForward + +open Patterns.Domain +open Patterns.Domain.Epoch +open Swensen.Unquote +open Xunit + +[] +let ``Happy path`` () = + let store = Equinox.MemoryStore.VolatileStore() + let service = MemoryStore.create store + let go = Async.RunSynchronously + let add epoch events = service.Transact(EpochId.parse epoch, (fun es _state -> (), es), events) |> go + let read epoch = service.Read(EpochId.parse epoch) |> go + add 0 [Events.Added {items = [| "a"; "b" |]}] + test <@ Fold.Open [|"a"; "b"|] = read 0 @> + add 1 [Events.Added {items = [| "c"; "d" |]}] + test <@ Fold.Closed ([|"a"; "b"|], [|"a"; "b"|]) = read 0 @> + test <@ Fold.Open [|"a"; "b"; "c"; "d" |] = read 1 @> + add 0 [Events.Added {items = [| "e" |]}] + test <@ Fold.Open [|"a"; "b"; "c"; "d"; "e" |] = read 1 @> diff --git a/equinox-patterns/Domain/Domain.fsproj b/equinox-patterns/Domain/Domain.fsproj new file mode 100644 index 000000000..bc2b04d0e --- /dev/null +++ b/equinox-patterns/Domain/Domain.fsproj @@ -0,0 +1,18 @@ + + + + netstandard2.0 + + + + + + + + + + + + + + diff --git a/equinox-patterns/Domain/Epoch.fs b/equinox-patterns/Domain/Epoch.fs new file mode 100644 index 000000000..07da66cc6 --- /dev/null +++ b/equinox-patterns/Domain/Epoch.fs @@ -0,0 +1,156 @@ +/// Illustrates a high level approach to how one might manage a chained series of epochs which can be logically Closed +/// When the target Epoch is Closed, all write attempts attempts are required to adhere to a protocol consisting of +/// a) all preceding epochs must be closed, idempotently computing and persisting or honoring a previous computed balance +/// b) the decision is processed within the target epoch (which may be either Open, or being opened as part of this flow) +/// c) if appropriate, the target epoch may be closed as part of the same decision flow if `decideCarryForward` yields Some +module Patterns.Domain.Epoch + +let [] Category = "Epoch" +let streamName epochId = FsCodec.StreamName.create Category (EpochId.toString epochId) + +// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care +module Events = + + type ItemIds = { items : string[] } + type Balance = ItemIds + type Event = + | BroughtForward of Balance + | Added of ItemIds + | Removed of ItemIds + | CarriedForward of Balance + interface TypeShape.UnionContract.IUnionContract + let codec = FsCodec.NewtonsoftJson.Codec.Create() + +module Fold = + + type State = + | Initial + | Open of items : OpenState + | Closed of items : string[] * carryingForward : string[] + and OpenState = string[] + let initial : State = Initial + let (|Items|) = function Initial -> [||] | Open i | Closed (i, _) -> i + open Events + let evolve (Items items) = function + | BroughtForward e + | Added e -> Open (Array.append items e.items) + | Removed e -> Open (items |> Array.except e.items) + | CarriedForward e -> Closed (items, e.items) + let fold = Seq.fold evolve + + /// Handles one-time opening of the Epoch, if applicable + let maybeOpen (getIncomingBalance : unit -> Async) state = async { + match state with + | Initial -> let! balance = getIncomingBalance () + return (), [BroughtForward balance] + | Open _ + | Closed _ -> return (), [] } + + /// Handles attempting to apply the request to the stream (assuming it's not already closed) + /// The `decide` function can signal a need to close and/or split the request by emitting it as the residual + let tryIngest (decide : 'req -> State -> 'req * 'result * Event list) req = function + | Initial -> failwith "Invalid tryIngest; stream not Open" + | Open _ as s -> let residual, result, events = decide req s + (residual, Some result), events + | Closed _ -> (req, None), [] + + /// Yields or computes the Balance to be Carried forward and/or application of the event representing that decision + let maybeClose (decideCarryForward : 'residual -> OpenState -> Async) residual state = async { + match state with + | Initial -> return failwith "Invalid maybeClose; stream not Open" + | Open s -> let! cf = decideCarryForward residual s + let events = cf |> Option.map CarriedForward |> Option.toList + return (residual, cf), events + | Closed (_, b) -> return (residual, Some { items = b }), [] } + +[] +type Rules<'request, 'result> = + { getIncomingBalance : unit -> Async + decideIngestion : 'request -> Fold.State -> 'request * 'result * Events.Event list + decideCarryForward : 'request -> Fold.OpenState -> Async } + +/// The result of the overall ingestion, consisting of +type Result<'request, 'result> = + { /// residual of the request, in the event where it was not possible to ingest it completely + residual : 'request + /// The result of the decision (assuming processing took place) + result : 'result option + /// balance being carried forward in the event that the successor epoch has yet to have the BroughtForward event generated + carryForward : Events.Balance option } + +/// Decision function ensuring the high level rules of an Epoch are adhered to viz. +/// 1. Streams must open with a BroughtForward event (obtained via Rules.getIncomingBalance if this is an uninitialized Epoch) +/// 2. (If the Epoch has not closed) Rules.decide gets to map the request to events and a residual +/// 3. Rules.decideCarryForward may trigger the closing of the Epoch based on the residual and the stream State by emitting Some balance +let decideIngestWithCarryForward rules req s : Async * Events.Event list> = async { + let acc = Accumulator(s, Fold.fold) + do! acc.TransactAsync(Fold.maybeOpen rules.getIncomingBalance) + let residual, result = acc.Transact(Fold.tryIngest rules.decideIngestion req) + let! residual, carryForward = acc.TransactAsync(Fold.maybeClose rules.decideCarryForward residual) + return { residual = residual; result = result; carryForward = carryForward }, acc.Events +} + +/// Manages Application of Requests to the Epoch's stream, including closing preceding epochs as appropriate +type Service(resolve : EpochId -> Equinox.Decider) = + + let calcBalance state = + let createEventsBalance items : Events.Balance = { items = items } + async { return createEventsBalance state } + + /// Walks back as far as necessary to ensure any preceding Epochs that are not yet Closed are, then closes the target if necessary + /// Yields the accumulated balance to be carried forward into the next epoch + member private x.Close epochId : Async = + let rules : Rules = + { getIncomingBalance = fun () -> x.Close epochId + decideIngestion = fun () _state -> (), (), [] + decideCarryForward = fun () state -> async { let! bal = calcBalance state in return Some bal } } // always close + let decider = resolve epochId + decider.TransactEx((fun c -> decideIngestWithCarryForward rules () c.State), fun r _c -> Option.get r.carryForward) + + /// Runs the decision function on the specified Epoch, closing and bringing forward balances from preceding Epochs if necessary + member private x.TryTransact(epochId, getIncoming, decide : 'request -> Fold.State -> 'request * 'result * Events.Event list, request) : Async> = + let rules : Rules<'request, 'result> = + { getIncomingBalance = getIncoming + decideIngestion = fun request state -> let residual, result, events = decide request state in residual, result, events + decideCarryForward = fun _res _state -> async { return None } } // never close + let decider = resolve epochId + decider.TransactEx((fun c -> decideIngestWithCarryForward rules request c.State), fun r _c -> r) + + /// Runs the decision function on the specified Epoch, closing and bringing forward balances from preceding Epochs if necessary + member x.Transact(epochId, decide : 'request -> Fold.State -> 'result * Events.Event list, request : 'request) : Async<'result> = + let rec aux epochId getIncoming request = async { + let decide req state = + let res, es = decide (Option.get req) state + None, res, es + match! x.TryTransact(epochId, getIncoming, decide, request) with + | { residual = None; result = Some r } -> return r + | { residual = r; carryForward = cf } -> return! aux (EpochId.next epochId) (fun () -> async { return Option.get cf }) r } + let getIncoming () = + match EpochId.tryPrev epochId with + | None -> calcBalance [||] + | Some prevEpochId -> x.Close prevEpochId + aux epochId getIncoming (Some request) +(* +/// The result of the overall ingestion, consisting of +type Result<'request, 'result> = + { /// residual of the request, in the event where it was not possible to ingest it completely + residual : 'request + /// The result of the decision (assuming processing took place) + result : 'result option + /// balance being carried forward in the event that the successor epoch has yet to have the BroughtForward event generated + carryForward : Events.Balance option } +*) + /// Exposes the full state to a reader (which is appropriate for a demo but is an anti-pattern in the general case) + member _.Read epochId = + let decider = resolve epochId + decider.Query id + +let private create resolveStream = + let resolve id = Equinox.Decider(Serilog.Log.ForContext(), streamName id |> resolveStream, maxAttempts = 3) + Service(resolve) + +module MemoryStore = + + let create store = + let cat = Equinox.MemoryStore.MemoryStoreCategory(store, Events.codec, Fold.fold, Fold.initial) + create cat.Resolve diff --git a/equinox-patterns/Domain/Infrastructure.fs b/equinox-patterns/Domain/Infrastructure.fs new file mode 100644 index 000000000..11216ab31 --- /dev/null +++ b/equinox-patterns/Domain/Infrastructure.fs @@ -0,0 +1,26 @@ +[] +module Patterns.Domain.Infrastructure + +/// Buffers events accumulated from a series of decisions while evolving the presented `state` to reflect said proposed `Events` +type Accumulator<'event, 'state>(originState, fold : 'state -> 'event seq -> 'state) = + let pendingEvents = ResizeArray() + let mutable state = originState + + let apply (events : 'event seq) = + pendingEvents.AddRange events + state <- fold state events + + /// Run a decision function, buffering and applying any Events yielded + member _.Transact decide = + let r, events = decide state + apply events + r + + /// Run an Async decision function, buffering and applying any Events yielded + member _.TransactAsync decide = async { + let! r, events = decide state + apply events + return r } + + /// Accumulated events based on the Decisions applied to date + member _.Events = List.ofSeq pendingEvents diff --git a/equinox-patterns/Domain/Types.fs b/equinox-patterns/Domain/Types.fs new file mode 100644 index 000000000..b42507895 --- /dev/null +++ b/equinox-patterns/Domain/Types.fs @@ -0,0 +1,12 @@ +namespace Patterns.Domain + +open FSharp.UMX +type EpochId = int +and [] epochId + +module EpochId = + + let parse (value : int) : EpochId = %value + let tryPrev (value : EpochId) : EpochId option = match %value with 0 -> None | x -> Some %(x - 1) + let next (value : EpochId) : EpochId = %(%value + 1) + let toString (value : EpochId) : string = string %value diff --git a/equinox-patterns/Patterns.sln b/equinox-patterns/Patterns.sln new file mode 100644 index 000000000..43e15bedb --- /dev/null +++ b/equinox-patterns/Patterns.sln @@ -0,0 +1,48 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 +VisualStudioVersion = 15.0.26124.0 +MinimumVisualStudioVersion = 15.0.26124.0 +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain", "Domain\Domain.fsproj", "{CD2BEF24-B21D-4454-9769-8BD7C23A7FB6}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain.Tests", "Domain.Tests\Domain.Tests.fsproj", "{136AA8F5-A808-46F0-9246-3A3271F3539E}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Debug|x64 = Debug|x64 + Debug|x86 = Debug|x86 + Release|Any CPU = Release|Any CPU + Release|x64 = Release|x64 + Release|x86 = Release|x86 + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {136AA8F5-A808-46F0-9246-3A3271F3539E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {136AA8F5-A808-46F0-9246-3A3271F3539E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {136AA8F5-A808-46F0-9246-3A3271F3539E}.Debug|x64.ActiveCfg = Debug|Any CPU + {136AA8F5-A808-46F0-9246-3A3271F3539E}.Debug|x64.Build.0 = Debug|Any CPU + {136AA8F5-A808-46F0-9246-3A3271F3539E}.Debug|x86.ActiveCfg = Debug|Any CPU + {136AA8F5-A808-46F0-9246-3A3271F3539E}.Debug|x86.Build.0 = Debug|Any CPU + {136AA8F5-A808-46F0-9246-3A3271F3539E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {136AA8F5-A808-46F0-9246-3A3271F3539E}.Release|Any CPU.Build.0 = Release|Any CPU + {136AA8F5-A808-46F0-9246-3A3271F3539E}.Release|x64.ActiveCfg = Release|Any CPU + {136AA8F5-A808-46F0-9246-3A3271F3539E}.Release|x64.Build.0 = Release|Any CPU + {136AA8F5-A808-46F0-9246-3A3271F3539E}.Release|x86.ActiveCfg = Release|Any CPU + {136AA8F5-A808-46F0-9246-3A3271F3539E}.Release|x86.Build.0 = Release|Any CPU + {CD2BEF24-B21D-4454-9769-8BD7C23A7FB6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {CD2BEF24-B21D-4454-9769-8BD7C23A7FB6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {CD2BEF24-B21D-4454-9769-8BD7C23A7FB6}.Debug|x64.ActiveCfg = Debug|Any CPU + {CD2BEF24-B21D-4454-9769-8BD7C23A7FB6}.Debug|x64.Build.0 = Debug|Any CPU + {CD2BEF24-B21D-4454-9769-8BD7C23A7FB6}.Debug|x86.ActiveCfg = Debug|Any CPU + {CD2BEF24-B21D-4454-9769-8BD7C23A7FB6}.Debug|x86.Build.0 = Debug|Any CPU + {CD2BEF24-B21D-4454-9769-8BD7C23A7FB6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {CD2BEF24-B21D-4454-9769-8BD7C23A7FB6}.Release|Any CPU.Build.0 = Release|Any CPU + {CD2BEF24-B21D-4454-9769-8BD7C23A7FB6}.Release|x64.ActiveCfg = Release|Any CPU + {CD2BEF24-B21D-4454-9769-8BD7C23A7FB6}.Release|x64.Build.0 = Release|Any CPU + {CD2BEF24-B21D-4454-9769-8BD7C23A7FB6}.Release|x86.ActiveCfg = Release|Any CPU + {CD2BEF24-B21D-4454-9769-8BD7C23A7FB6}.Release|x86.Build.0 = Release|Any CPU + EndGlobalSection +EndGlobal From 90ba454be8c5c95f9d413ba361ce8492d1910970 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Tue, 4 May 2021 14:17:38 +0100 Subject: [PATCH 02/20] Tidy --- .../Domain.Tests/TipEpochCarryForward.fs | 3 ++- equinox-patterns/Domain/Epoch.fs | 18 +++++------------- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/equinox-patterns/Domain.Tests/TipEpochCarryForward.fs b/equinox-patterns/Domain.Tests/TipEpochCarryForward.fs index f139861a9..f7eb1f713 100644 --- a/equinox-patterns/Domain.Tests/TipEpochCarryForward.fs +++ b/equinox-patterns/Domain.Tests/TipEpochCarryForward.fs @@ -11,7 +11,8 @@ let ``Happy path`` () = let store = Equinox.MemoryStore.VolatileStore() let service = MemoryStore.create store let go = Async.RunSynchronously - let add epoch events = service.Transact(EpochId.parse epoch, (fun es _state -> (), es), events) |> go + let decide events _state = None, (), events + let add epoch events = service.Transact(EpochId.parse epoch, decide, events) |> go let read epoch = service.Read(EpochId.parse epoch) |> go add 0 [Events.Added {items = [| "a"; "b" |]}] test <@ Fold.Open [|"a"; "b"|] = read 0 @> diff --git a/equinox-patterns/Domain/Epoch.fs b/equinox-patterns/Domain/Epoch.fs index 07da66cc6..ae1ee38c4 100644 --- a/equinox-patterns/Domain/Epoch.fs +++ b/equinox-patterns/Domain/Epoch.fs @@ -117,11 +117,12 @@ type Service(resolve : EpochId -> Equinox.Decider) = decider.TransactEx((fun c -> decideIngestWithCarryForward rules request c.State), fun r _c -> r) /// Runs the decision function on the specified Epoch, closing and bringing forward balances from preceding Epochs if necessary - member x.Transact(epochId, decide : 'request -> Fold.State -> 'result * Events.Event list, request : 'request) : Async<'result> = + /// Processing completes when `decide` yields None for the residual of the 'request + member x.Transact(epochId, decide : 'request -> Fold.State -> 'request option * 'result * Events.Event list, request : 'request) : Async<'result> = let rec aux epochId getIncoming request = async { let decide req state = - let res, es = decide (Option.get req) state - None, res, es + let residual, result, es = decide (Option.get req) state + residual, result, es match! x.TryTransact(epochId, getIncoming, decide, request) with | { residual = None; result = Some r } -> return r | { residual = r; carryForward = cf } -> return! aux (EpochId.next epochId) (fun () -> async { return Option.get cf }) r } @@ -130,16 +131,7 @@ type Service(resolve : EpochId -> Equinox.Decider) = | None -> calcBalance [||] | Some prevEpochId -> x.Close prevEpochId aux epochId getIncoming (Some request) -(* -/// The result of the overall ingestion, consisting of -type Result<'request, 'result> = - { /// residual of the request, in the event where it was not possible to ingest it completely - residual : 'request - /// The result of the decision (assuming processing took place) - result : 'result option - /// balance being carried forward in the event that the successor epoch has yet to have the BroughtForward event generated - carryForward : Events.Balance option } -*) + /// Exposes the full state to a reader (which is appropriate for a demo but is an anti-pattern in the general case) member _.Read epochId = let decider = resolve epochId From d9368740f16dd58dd2e9339d29a9f146aecf958a Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Tue, 4 May 2021 17:31:34 +0100 Subject: [PATCH 03/20] Better test cases --- .../Domain.Tests/TipEpochCarryForward.fs | 25 +++++++++++++------ equinox-patterns/Domain/Epoch.fs | 18 ++++++------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/equinox-patterns/Domain.Tests/TipEpochCarryForward.fs b/equinox-patterns/Domain.Tests/TipEpochCarryForward.fs index f7eb1f713..e5d061b18 100644 --- a/equinox-patterns/Domain.Tests/TipEpochCarryForward.fs +++ b/equinox-patterns/Domain.Tests/TipEpochCarryForward.fs @@ -10,14 +10,23 @@ open Xunit let ``Happy path`` () = let store = Equinox.MemoryStore.VolatileStore() let service = MemoryStore.create store - let go = Async.RunSynchronously - let decide events _state = None, (), events - let add epoch events = service.Transact(EpochId.parse epoch, decide, events) |> go - let read epoch = service.Read(EpochId.parse epoch) |> go - add 0 [Events.Added {items = [| "a"; "b" |]}] + let decide items _state = + let apply = Array.truncate 2 items + let overflow = Array.skip apply.Length items + (match overflow with [||] -> None | xs -> Some xs), // Apply max of two events + (), // result + [Events.Added {items = apply }] + let add epoch events = service.Transact(EpochId.parse epoch, decide, events) |> Async.RunSynchronously + let read epoch = service.Read(EpochId.parse epoch) |> Async.RunSynchronously + add 0 [| "a"; "b" |] test <@ Fold.Open [|"a"; "b"|] = read 0 @> - add 1 [Events.Added {items = [| "c"; "d" |]}] + add 1 [| "c"; "d" |] test <@ Fold.Closed ([|"a"; "b"|], [|"a"; "b"|]) = read 0 @> test <@ Fold.Open [|"a"; "b"; "c"; "d" |] = read 1 @> - add 0 [Events.Added {items = [| "e" |]}] - test <@ Fold.Open [|"a"; "b"; "c"; "d"; "e" |] = read 1 @> + let items epoch = read epoch |> Fold.(|Items|) + add 1 [| "e"; "f"; "g" |] // >2 items, therefore triggers an overflow + test <@ [|"a"; "b"; "c"; "d"; "e"; "f" |] = items 1 @> + test <@ [|"a"; "b"; "c"; "d"; "e"; "f"; "g" |] = items 2 @> + test <@ Fold.Initial = read 3 @> + add 3 [| "h" |] + test <@ Fold.Open [|"a"; "b"; "c"; "d"; "e"; "f"; "g"; "h" |] = read 3 @> diff --git a/equinox-patterns/Domain/Epoch.fs b/equinox-patterns/Domain/Epoch.fs index ae1ee38c4..c693a59b9 100644 --- a/equinox-patterns/Domain/Epoch.fs +++ b/equinox-patterns/Domain/Epoch.fs @@ -96,6 +96,7 @@ type Service(resolve : EpochId -> Equinox.Decider) = let calcBalance state = let createEventsBalance items : Events.Balance = { items = items } async { return createEventsBalance state } + let genBalance state = async { let! bal = calcBalance state in return Some bal } /// Walks back as far as necessary to ensure any preceding Epochs that are not yet Closed are, then closes the target if necessary /// Yields the accumulated balance to be carried forward into the next epoch @@ -103,27 +104,25 @@ type Service(resolve : EpochId -> Equinox.Decider) = let rules : Rules = { getIncomingBalance = fun () -> x.Close epochId decideIngestion = fun () _state -> (), (), [] - decideCarryForward = fun () state -> async { let! bal = calcBalance state in return Some bal } } // always close + decideCarryForward = fun () -> genBalance } // always close let decider = resolve epochId decider.TransactEx((fun c -> decideIngestWithCarryForward rules () c.State), fun r _c -> Option.get r.carryForward) /// Runs the decision function on the specified Epoch, closing and bringing forward balances from preceding Epochs if necessary - member private x.TryTransact(epochId, getIncoming, decide : 'request -> Fold.State -> 'request * 'result * Events.Event list, request) : Async> = + member private x.TryTransact(epochId, getIncoming, decide : 'request -> Fold.State -> 'request * 'result * Events.Event list, request, shouldClose) : Async> = let rules : Rules<'request, 'result> = { getIncomingBalance = getIncoming - decideIngestion = fun request state -> let residual, result, events = decide request state in residual, result, events - decideCarryForward = fun _res _state -> async { return None } } // never close + decideIngestion = fun request state -> let residual, result, events = decide request state in residual, result, events + decideCarryForward = fun res state -> async { if shouldClose res then return! genBalance state else return None } } // also close, if we should let decider = resolve epochId decider.TransactEx((fun c -> decideIngestWithCarryForward rules request c.State), fun r _c -> r) /// Runs the decision function on the specified Epoch, closing and bringing forward balances from preceding Epochs if necessary /// Processing completes when `decide` yields None for the residual of the 'request member x.Transact(epochId, decide : 'request -> Fold.State -> 'request option * 'result * Events.Event list, request : 'request) : Async<'result> = - let rec aux epochId getIncoming request = async { - let decide req state = - let residual, result, es = decide (Option.get req) state - residual, result, es - match! x.TryTransact(epochId, getIncoming, decide, request) with + let rec aux epochId getIncoming req = async { + let decide req state = decide (Option.get req) state + match! x.TryTransact(epochId, getIncoming, decide, req, Option.isSome) with | { residual = None; result = Some r } -> return r | { residual = r; carryForward = cf } -> return! aux (EpochId.next epochId) (fun () -> async { return Option.get cf }) r } let getIncoming () = @@ -144,5 +143,6 @@ let private create resolveStream = module MemoryStore = let create store = +// let cat = Equinox.MemoryStore.MemoryStoreCategory(store, FsCodec.Box.Codec.Create(), Fold.fold, Fold.initial) let cat = Equinox.MemoryStore.MemoryStoreCategory(store, Events.codec, Fold.fold, Fold.initial) create cat.Resolve From f445b586fd11244d18a890b5d32bebcb13009b4d Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Tue, 4 May 2021 17:36:05 +0100 Subject: [PATCH 04/20] rename to Period --- .../Domain.Tests/Domain.Tests.fsproj | 2 +- ...ryForward.fs => PeriodsCarryingForward.fs} | 10 +-- equinox-patterns/Domain/Domain.fsproj | 2 +- .../Domain/{Epoch.fs => Period.fs} | 69 +++++++++---------- equinox-patterns/Domain/Types.fs | 14 ++-- 5 files changed, 48 insertions(+), 49 deletions(-) rename equinox-patterns/Domain.Tests/{TipEpochCarryForward.fs => PeriodsCarryingForward.fs} (77%) rename equinox-patterns/Domain/{Epoch.fs => Period.fs} (67%) diff --git a/equinox-patterns/Domain.Tests/Domain.Tests.fsproj b/equinox-patterns/Domain.Tests/Domain.Tests.fsproj index c63ab4cfd..2c9fc12dc 100644 --- a/equinox-patterns/Domain.Tests/Domain.Tests.fsproj +++ b/equinox-patterns/Domain.Tests/Domain.Tests.fsproj @@ -8,7 +8,7 @@ - + diff --git a/equinox-patterns/Domain.Tests/TipEpochCarryForward.fs b/equinox-patterns/Domain.Tests/PeriodsCarryingForward.fs similarity index 77% rename from equinox-patterns/Domain.Tests/TipEpochCarryForward.fs rename to equinox-patterns/Domain.Tests/PeriodsCarryingForward.fs index e5d061b18..634730331 100644 --- a/equinox-patterns/Domain.Tests/TipEpochCarryForward.fs +++ b/equinox-patterns/Domain.Tests/PeriodsCarryingForward.fs @@ -1,8 +1,8 @@ /// This test and implementation pairing demonstrates how one might accomplish a pattern -module Patterns.Domain.Tests.TipEpochCarryForward +module Patterns.Domain.Tests.PeriodsCarryingForward open Patterns.Domain -open Patterns.Domain.Epoch +open Patterns.Domain.Period open Swensen.Unquote open Xunit @@ -16,14 +16,14 @@ let ``Happy path`` () = (match overflow with [||] -> None | xs -> Some xs), // Apply max of two events (), // result [Events.Added {items = apply }] - let add epoch events = service.Transact(EpochId.parse epoch, decide, events) |> Async.RunSynchronously - let read epoch = service.Read(EpochId.parse epoch) |> Async.RunSynchronously + let add period events = service.Transact(PeriodId.parse period, decide, events) |> Async.RunSynchronously + let read period = service.Read(PeriodId.parse period) |> Async.RunSynchronously add 0 [| "a"; "b" |] test <@ Fold.Open [|"a"; "b"|] = read 0 @> add 1 [| "c"; "d" |] test <@ Fold.Closed ([|"a"; "b"|], [|"a"; "b"|]) = read 0 @> test <@ Fold.Open [|"a"; "b"; "c"; "d" |] = read 1 @> - let items epoch = read epoch |> Fold.(|Items|) + let items period = read period |> Fold.(|Items|) add 1 [| "e"; "f"; "g" |] // >2 items, therefore triggers an overflow test <@ [|"a"; "b"; "c"; "d"; "e"; "f" |] = items 1 @> test <@ [|"a"; "b"; "c"; "d"; "e"; "f"; "g" |] = items 2 @> diff --git a/equinox-patterns/Domain/Domain.fsproj b/equinox-patterns/Domain/Domain.fsproj index bc2b04d0e..cfe50e374 100644 --- a/equinox-patterns/Domain/Domain.fsproj +++ b/equinox-patterns/Domain/Domain.fsproj @@ -7,7 +7,7 @@ - + diff --git a/equinox-patterns/Domain/Epoch.fs b/equinox-patterns/Domain/Period.fs similarity index 67% rename from equinox-patterns/Domain/Epoch.fs rename to equinox-patterns/Domain/Period.fs index c693a59b9..568318971 100644 --- a/equinox-patterns/Domain/Epoch.fs +++ b/equinox-patterns/Domain/Period.fs @@ -1,12 +1,12 @@ -/// Illustrates a high level approach to how one might manage a chained series of epochs which can be logically Closed -/// When the target Epoch is Closed, all write attempts attempts are required to adhere to a protocol consisting of -/// a) all preceding epochs must be closed, idempotently computing and persisting or honoring a previous computed balance -/// b) the decision is processed within the target epoch (which may be either Open, or being opened as part of this flow) -/// c) if appropriate, the target epoch may be closed as part of the same decision flow if `decideCarryForward` yields Some -module Patterns.Domain.Epoch +/// Illustrates a high level approach to how one might manage a chained series of periods which can be logically Closed +/// All write attempts adhere to a common protocol to effect this semantic:- +/// a) all preceding periods must be closed, idempotently i) computing and persisting a balance OR ii) honoring a previously recorded one +/// b) the decision is processed within the target period (which may be either Open, or being opened as part of this flow) +/// c) if appropriate, the target period may be closed as part of the same decision flow if `decideCarryForward` yields Some +module Patterns.Domain.Period -let [] Category = "Epoch" -let streamName epochId = FsCodec.StreamName.create Category (EpochId.toString epochId) +let [] Category = "Period" +let streamName periodId = FsCodec.StreamName.create Category (PeriodId.toString periodId) // NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = @@ -38,7 +38,7 @@ module Fold = | CarriedForward e -> Closed (items, e.items) let fold = Seq.fold evolve - /// Handles one-time opening of the Epoch, if applicable + /// Handles one-time opening of the Period, if applicable let maybeOpen (getIncomingBalance : unit -> Async) state = async { match state with | Initial -> let! balance = getIncomingBalance () @@ -75,13 +75,13 @@ type Result<'request, 'result> = residual : 'request /// The result of the decision (assuming processing took place) result : 'result option - /// balance being carried forward in the event that the successor epoch has yet to have the BroughtForward event generated + /// balance being carried forward in the event that the successor period has yet to have the BroughtForward event generated carryForward : Events.Balance option } -/// Decision function ensuring the high level rules of an Epoch are adhered to viz. -/// 1. Streams must open with a BroughtForward event (obtained via Rules.getIncomingBalance if this is an uninitialized Epoch) -/// 2. (If the Epoch has not closed) Rules.decide gets to map the request to events and a residual -/// 3. Rules.decideCarryForward may trigger the closing of the Epoch based on the residual and the stream State by emitting Some balance +/// Decision function ensuring the high level rules of an Period are adhered to viz. +/// 1. Streams must open with a BroughtForward event (obtained via Rules.getIncomingBalance if this is an uninitialized Period) +/// 2. (If the Period has not closed) Rules.decide gets to map the request to events and a residual +/// 3. Rules.decideCarryForward may trigger the closing of the Period based on the residual and/or the State by emitting Some balance let decideIngestWithCarryForward rules req s : Async * Events.Event list> = async { let acc = Accumulator(s, Fold.fold) do! acc.TransactAsync(Fold.maybeOpen rules.getIncomingBalance) @@ -90,50 +90,50 @@ let decideIngestWithCarryForward rules req s : Async * Eve return { residual = residual; result = result; carryForward = carryForward }, acc.Events } -/// Manages Application of Requests to the Epoch's stream, including closing preceding epochs as appropriate -type Service(resolve : EpochId -> Equinox.Decider) = +/// Manages Application of Requests to the Period's stream, including closing preceding periods as appropriate +type Service(resolve : PeriodId -> Equinox.Decider) = let calcBalance state = let createEventsBalance items : Events.Balance = { items = items } async { return createEventsBalance state } let genBalance state = async { let! bal = calcBalance state in return Some bal } - /// Walks back as far as necessary to ensure any preceding Epochs that are not yet Closed are, then closes the target if necessary - /// Yields the accumulated balance to be carried forward into the next epoch - member private x.Close epochId : Async = + /// Walks back as far as necessary to ensure any preceding Periods that are not yet Closed are, then closes the target if necessary + /// Yields the accumulated balance to be carried forward into the next period + member private x.Close periodId : Async = let rules : Rules = - { getIncomingBalance = fun () -> x.Close epochId + { getIncomingBalance = fun () -> x.Close periodId decideIngestion = fun () _state -> (), (), [] decideCarryForward = fun () -> genBalance } // always close - let decider = resolve epochId + let decider = resolve periodId decider.TransactEx((fun c -> decideIngestWithCarryForward rules () c.State), fun r _c -> Option.get r.carryForward) - /// Runs the decision function on the specified Epoch, closing and bringing forward balances from preceding Epochs if necessary - member private x.TryTransact(epochId, getIncoming, decide : 'request -> Fold.State -> 'request * 'result * Events.Event list, request, shouldClose) : Async> = + /// Runs the decision function on the specified Period, closing and bringing forward balances from preceding Periods if necessary + member private x.TryTransact(periodId, getIncoming, decide : 'request -> Fold.State -> 'request * 'result * Events.Event list, request, shouldClose) : Async> = let rules : Rules<'request, 'result> = { getIncomingBalance = getIncoming decideIngestion = fun request state -> let residual, result, events = decide request state in residual, result, events decideCarryForward = fun res state -> async { if shouldClose res then return! genBalance state else return None } } // also close, if we should - let decider = resolve epochId + let decider = resolve periodId decider.TransactEx((fun c -> decideIngestWithCarryForward rules request c.State), fun r _c -> r) - /// Runs the decision function on the specified Epoch, closing and bringing forward balances from preceding Epochs if necessary + /// Runs the decision function on the specified Period, closing and bringing forward balances from preceding Periods if necessary /// Processing completes when `decide` yields None for the residual of the 'request - member x.Transact(epochId, decide : 'request -> Fold.State -> 'request option * 'result * Events.Event list, request : 'request) : Async<'result> = - let rec aux epochId getIncoming req = async { + member x.Transact(periodId, decide : 'request -> Fold.State -> 'request option * 'result * Events.Event list, request : 'request) : Async<'result> = + let rec aux periodId getIncoming req = async { let decide req state = decide (Option.get req) state - match! x.TryTransact(epochId, getIncoming, decide, req, Option.isSome) with + match! x.TryTransact(periodId, getIncoming, decide, req, Option.isSome) with | { residual = None; result = Some r } -> return r - | { residual = r; carryForward = cf } -> return! aux (EpochId.next epochId) (fun () -> async { return Option.get cf }) r } + | { residual = r; carryForward = cf } -> return! aux (PeriodId.next periodId) (fun () -> async { return Option.get cf }) r } let getIncoming () = - match EpochId.tryPrev epochId with + match PeriodId.tryPrev periodId with | None -> calcBalance [||] - | Some prevEpochId -> x.Close prevEpochId - aux epochId getIncoming (Some request) + | Some prevPeriodId -> x.Close prevPeriodId + aux periodId getIncoming (Some request) /// Exposes the full state to a reader (which is appropriate for a demo but is an anti-pattern in the general case) - member _.Read epochId = - let decider = resolve epochId + member _.Read periodId = + let decider = resolve periodId decider.Query id let private create resolveStream = @@ -143,6 +143,5 @@ let private create resolveStream = module MemoryStore = let create store = -// let cat = Equinox.MemoryStore.MemoryStoreCategory(store, FsCodec.Box.Codec.Create(), Fold.fold, Fold.initial) let cat = Equinox.MemoryStore.MemoryStoreCategory(store, Events.codec, Fold.fold, Fold.initial) create cat.Resolve diff --git a/equinox-patterns/Domain/Types.fs b/equinox-patterns/Domain/Types.fs index b42507895..50b2540b7 100644 --- a/equinox-patterns/Domain/Types.fs +++ b/equinox-patterns/Domain/Types.fs @@ -1,12 +1,12 @@ namespace Patterns.Domain open FSharp.UMX -type EpochId = int -and [] epochId +type PeriodId = int +and [] periodId -module EpochId = +module PeriodId = - let parse (value : int) : EpochId = %value - let tryPrev (value : EpochId) : EpochId option = match %value with 0 -> None | x -> Some %(x - 1) - let next (value : EpochId) : EpochId = %(%value + 1) - let toString (value : EpochId) : string = string %value + let parse (value : int) : PeriodId = %value + let tryPrev (value : PeriodId) : PeriodId option = match %value with 0 -> None | x -> Some %(x - 1) + let next (value : PeriodId) : PeriodId = %(%value + 1) + let toString (value : PeriodId) : string = string %value From 3089549a5c488872cb1f69fbb37f2299f3897e76 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Tue, 4 May 2021 18:47:29 +0100 Subject: [PATCH 05/20] Readme/tidying --- README.md | 3 +++ dotnet-templates.sln | 1 + .../.template.config/template.json | 5 +++- .../Domain.Tests/PeriodsCarryingForward.fs | 2 +- equinox-patterns/README.md | 26 +++++++++++++++++++ 5 files changed, 35 insertions(+), 2 deletions(-) create mode 100644 equinox-patterns/README.md diff --git a/README.md b/README.md index 365f0bdb3..4b407fa8b 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,9 @@ These templates focus solely on Consistent Processing using Equinox Stores: - [`eqxweb`](equinox-web/README.md) - Boilerplate for an ASP .NET Core 2 Web App, with an associated storage-independent Domain project using [Equinox](https://github.com/jet/equinox). - [`eqxwebcs`](equinox-web-csharp/README.md) - Boilerplate for an ASP .NET Core 2 Web App, with an associated storage-independent Domain project using [Equinox](https://github.com/jet/equinox), _ported to C#_. - [`eqxtestbed`](equinox-testbed/README.md) - Host that allows running back-to-back benchmarks when prototyping models using [Equinox](https://github.com/jet/equinox), using different stores and/or store configuration parameters. +- [`eqxPatterns`](equinox-patterns/README.md) - Equinox Skeleton Deciders and Tests implementing various event sourcing patterns: + - [x] Period (with Rolling Balance carried forward) + - [ ] Epochs/Series with deduplication ## [Propulsion](https://github.com/jet/propulsion) related diff --git a/dotnet-templates.sln b/dotnet-templates.sln index 4c1cd7e25..07fed446a 100644 --- a/dotnet-templates.sln +++ b/dotnet-templates.sln @@ -114,6 +114,7 @@ EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "eqxPatterns", "eqxPatterns", "{76721F1E-851C-4970-A276-DF61FCE3DA23}" ProjectSection(SolutionItems) = preProject equinox-patterns\.template.config\template.json = equinox-patterns\.template.config\template.json + equinox-patterns\README.md = equinox-patterns\README.md EndProjectSection EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain", "equinox-patterns\Domain\Domain.fsproj", "{8D9867A9-1B5D-4AD3-A890-ACC81D011C00}" diff --git a/equinox-patterns/.template.config/template.json b/equinox-patterns/.template.config/template.json index ae9dce542..cc83d7f40 100644 --- a/equinox-patterns/.template.config/template.json +++ b/equinox-patterns/.template.config/template.json @@ -4,7 +4,10 @@ "classifications": [ "Equinox", "Event Sourcing", - "Closing Books" + "Closing Books", + "Epoch", + "Series", + "Period" ], "tags": { "language": "F#", diff --git a/equinox-patterns/Domain.Tests/PeriodsCarryingForward.fs b/equinox-patterns/Domain.Tests/PeriodsCarryingForward.fs index 634730331..5d0b02504 100644 --- a/equinox-patterns/Domain.Tests/PeriodsCarryingForward.fs +++ b/equinox-patterns/Domain.Tests/PeriodsCarryingForward.fs @@ -1,4 +1,4 @@ -/// This test and implementation pairing demonstrates how one might accomplish a pattern +/// Integration suite for `Period` module Patterns.Domain.Tests.PeriodsCarryingForward open Patterns.Domain diff --git a/equinox-patterns/README.md b/equinox-patterns/README.md new file mode 100644 index 000000000..dc7dfea03 --- /dev/null +++ b/equinox-patterns/README.md @@ -0,0 +1,26 @@ +# Equinox Patterns + +This template provides a grab-bag of example Deciders, including illustrations of the following generic techniques: + +## `Period` with Rolling Balance carried forward + +Consists of: + +- `Period`: a Decider that manages ingestion into a chain of periods with each one + a) Carrying Forward a Balance from its immediate predecessor `Period`; then + b) being open for transactions for a period of time; ultimately + c) carrying forward a closing balance to its successor Period + +Notes: +- A given `Period` can thus be read without any need to load any preceding periods, as by definition, all relevant information has been `CarriedForward` +- Any `Period` is guaranteed to have been initialized; thus the preceding epochs can safely be archived the moment the first event has been written to a given `Period` + +## Epochs/Series with deduplication + +Consists of: + +- `Epoch`: A given atomic set of items that have been ingested. May be closed at an arbitrary point in time by any writer. +- `Series`: Records the identifier of the current active Epoch of the series. + +Notes: +- The deduplication logic is such that the ingestion logic can, given a starting Epoch Id, can 100% guarantee exactly a single copy of the item will be stored in the series as a whole. From ea7cda6c2bd2efa1256d1808afb3c9f57d089b18 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Tue, 4 May 2021 18:54:33 +0100 Subject: [PATCH 06/20] Add test --- tests/Equinox.Templates.Tests/DotnetBuild.fs | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/Equinox.Templates.Tests/DotnetBuild.fs b/tests/Equinox.Templates.Tests/DotnetBuild.fs index cfab3a937..834cbe9dc 100644 --- a/tests/Equinox.Templates.Tests/DotnetBuild.fs +++ b/tests/Equinox.Templates.Tests/DotnetBuild.fs @@ -49,6 +49,7 @@ type DotnetBuild(output : ITestOutputHelper, folder : EquinoxTemplatesFixture) = let [] eqxTestbed () = run "eqxTestbed" [] let [] eqxShipping () = run "eqxShipping" [] + let [] eqxPatterns () = run "eqxPatterns" [] [)>] let [] proProjector args = run "proProjector" args let [] proProjectorSynth () = run "proProjector" ["--source cosmos"; "--kafka"; "--synthesizeSequence"] From 30519743f75a99a16864a3511c192e5ae14bc69c Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Tue, 4 May 2021 18:56:47 +0100 Subject: [PATCH 07/20] Add changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 546cb6fa1..384bce275 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Added +- `eqxPatterns`: `Period`: Skeleton Deciders+Tests for `Period` with Rolling Balance [#89](https://github.com/jet/dotnet-templates/pull/89) +- `eqxPatterns`: `Series`+`Epoch`: Skeleton Deciders+Tests for deduplicated ingestion of items [#89](https://github.com/jet/dotnet-templates/pull/89) - `eqxProjector --source cosmos --kafka --synthesizeSequence`: Sample code for custom parsing of document changes [#84](https://github.com/jet/dotnet-templates/pull/84) ### Changed From b8a079885eda620792cbfb9ef3969bffcc728f62 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 6 May 2021 08:16:07 +0100 Subject: [PATCH 08/20] Remove private members --- equinox-patterns/Domain/Period.fs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/equinox-patterns/Domain/Period.fs b/equinox-patterns/Domain/Period.fs index 568318971..cac3c3cfc 100644 --- a/equinox-patterns/Domain/Period.fs +++ b/equinox-patterns/Domain/Period.fs @@ -100,16 +100,16 @@ type Service(resolve : PeriodId -> Equinox.Decider) = /// Walks back as far as necessary to ensure any preceding Periods that are not yet Closed are, then closes the target if necessary /// Yields the accumulated balance to be carried forward into the next period - member private x.Close periodId : Async = + let rec close periodId : Async = let rules : Rules = - { getIncomingBalance = fun () -> x.Close periodId + { getIncomingBalance = fun () -> close periodId decideIngestion = fun () _state -> (), (), [] decideCarryForward = fun () -> genBalance } // always close let decider = resolve periodId decider.TransactEx((fun c -> decideIngestWithCarryForward rules () c.State), fun r _c -> Option.get r.carryForward) /// Runs the decision function on the specified Period, closing and bringing forward balances from preceding Periods if necessary - member private x.TryTransact(periodId, getIncoming, decide : 'request -> Fold.State -> 'request * 'result * Events.Event list, request, shouldClose) : Async> = + let tryTransact periodId getIncoming (decide : 'request -> Fold.State -> 'request * 'result * Events.Event list) request shouldClose : Async> = let rules : Rules<'request, 'result> = { getIncomingBalance = getIncoming decideIngestion = fun request state -> let residual, result, events = decide request state in residual, result, events @@ -119,16 +119,16 @@ type Service(resolve : PeriodId -> Equinox.Decider) = /// Runs the decision function on the specified Period, closing and bringing forward balances from preceding Periods if necessary /// Processing completes when `decide` yields None for the residual of the 'request - member x.Transact(periodId, decide : 'request -> Fold.State -> 'request option * 'result * Events.Event list, request : 'request) : Async<'result> = + member _.Transact(periodId, decide : 'request -> Fold.State -> 'request option * 'result * Events.Event list, request : 'request) : Async<'result> = let rec aux periodId getIncoming req = async { let decide req state = decide (Option.get req) state - match! x.TryTransact(periodId, getIncoming, decide, req, Option.isSome) with + match! tryTransact periodId getIncoming decide req Option.isSome with | { residual = None; result = Some r } -> return r | { residual = r; carryForward = cf } -> return! aux (PeriodId.next periodId) (fun () -> async { return Option.get cf }) r } let getIncoming () = match PeriodId.tryPrev periodId with | None -> calcBalance [||] - | Some prevPeriodId -> x.Close prevPeriodId + | Some prevPeriodId -> close prevPeriodId aux periodId getIncoming (Some request) /// Exposes the full state to a reader (which is appropriate for a demo but is an anti-pattern in the general case) From b36514c390116944bc0af85ea207a663196c3315 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 6 May 2021 08:24:45 +0100 Subject: [PATCH 09/20] Tidy --- equinox-patterns/Domain/Period.fs | 2 +- equinox-patterns/Domain/Types.fs | 1 + tests/Equinox.Templates.Tests/DotnetBuild.fs | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/equinox-patterns/Domain/Period.fs b/equinox-patterns/Domain/Period.fs index cac3c3cfc..686b3ad7a 100644 --- a/equinox-patterns/Domain/Period.fs +++ b/equinox-patterns/Domain/Period.fs @@ -91,7 +91,7 @@ let decideIngestWithCarryForward rules req s : Async * Eve } /// Manages Application of Requests to the Period's stream, including closing preceding periods as appropriate -type Service(resolve : PeriodId -> Equinox.Decider) = +type Service internal (resolve : PeriodId -> Equinox.Decider) = let calcBalance state = let createEventsBalance items : Events.Balance = { items = items } diff --git a/equinox-patterns/Domain/Types.fs b/equinox-patterns/Domain/Types.fs index 50b2540b7..8dab85937 100644 --- a/equinox-patterns/Domain/Types.fs +++ b/equinox-patterns/Domain/Types.fs @@ -1,6 +1,7 @@ namespace Patterns.Domain open FSharp.UMX + type PeriodId = int and [] periodId diff --git a/tests/Equinox.Templates.Tests/DotnetBuild.fs b/tests/Equinox.Templates.Tests/DotnetBuild.fs index 834cbe9dc..c02b8ed00 100644 --- a/tests/Equinox.Templates.Tests/DotnetBuild.fs +++ b/tests/Equinox.Templates.Tests/DotnetBuild.fs @@ -47,9 +47,9 @@ type DotnetBuild(output : ITestOutputHelper, folder : EquinoxTemplatesFixture) = let [] ``*pending*`` () = run "proProjector" ["--source cosmos"; "--kafka"; "--synthesizeSequence"] #endif + let [] eqxPatterns () = run "eqxPatterns" [] let [] eqxTestbed () = run "eqxTestbed" [] let [] eqxShipping () = run "eqxShipping" [] - let [] eqxPatterns () = run "eqxPatterns" [] [)>] let [] proProjector args = run "proProjector" args let [] proProjectorSynth () = run "proProjector" ["--source cosmos"; "--kafka"; "--synthesizeSequence"] From 8821bb1a8951481223c6ff6c8e668c8932639e8d Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 6 May 2021 09:12:22 +0100 Subject: [PATCH 10/20] ItemId --- .../Domain.Tests/PeriodsCarryingForward.fs | 21 ++++++------ equinox-patterns/Domain/Period.fs | 6 ++-- equinox-patterns/Domain/Types.fs | 32 ++++++++++++++++++- 3 files changed, 45 insertions(+), 14 deletions(-) diff --git a/equinox-patterns/Domain.Tests/PeriodsCarryingForward.fs b/equinox-patterns/Domain.Tests/PeriodsCarryingForward.fs index 5d0b02504..2921bbe26 100644 --- a/equinox-patterns/Domain.Tests/PeriodsCarryingForward.fs +++ b/equinox-patterns/Domain.Tests/PeriodsCarryingForward.fs @@ -3,6 +3,7 @@ module Patterns.Domain.Tests.PeriodsCarryingForward open Patterns.Domain open Patterns.Domain.Period +open FSharp.UMX open Swensen.Unquote open Xunit @@ -18,15 +19,15 @@ let ``Happy path`` () = [Events.Added {items = apply }] let add period events = service.Transact(PeriodId.parse period, decide, events) |> Async.RunSynchronously let read period = service.Read(PeriodId.parse period) |> Async.RunSynchronously - add 0 [| "a"; "b" |] - test <@ Fold.Open [|"a"; "b"|] = read 0 @> - add 1 [| "c"; "d" |] - test <@ Fold.Closed ([|"a"; "b"|], [|"a"; "b"|]) = read 0 @> - test <@ Fold.Open [|"a"; "b"; "c"; "d" |] = read 1 @> + add 0 [| %"a"; %"b" |] + test <@ Fold.Open [| %"a"; %"b"|] = read 0 @> + add 1 [| %"c"; %"d" |] + test <@ Fold.Closed ([| %"a"; %"b"|], [| %"a"; %"b"|]) = read 0 @> + test <@ Fold.Open [| %"a"; %"b"; %"c"; %"d" |] = read 1 @> let items period = read period |> Fold.(|Items|) - add 1 [| "e"; "f"; "g" |] // >2 items, therefore triggers an overflow - test <@ [|"a"; "b"; "c"; "d"; "e"; "f" |] = items 1 @> - test <@ [|"a"; "b"; "c"; "d"; "e"; "f"; "g" |] = items 2 @> + add 1 [| %"e"; %"f"; %"g" |] // >2 items, therefore triggers an overflow + test <@ [| %"a"; %"b"; %"c"; %"d"; %"e"; %"f" |] = items 1 @> + test <@ [| %"a"; %"b"; %"c"; %"d"; %"e"; %"f"; %"g" |] = items 2 @> test <@ Fold.Initial = read 3 @> - add 3 [| "h" |] - test <@ Fold.Open [|"a"; "b"; "c"; "d"; "e"; "f"; "g"; "h" |] = read 3 @> + add 3 [| %"h" |] + test <@ Fold.Open [| %"a"; %"b"; %"c"; %"d"; %"e"; %"f"; %"g"; %"h" |] = read 3 @> diff --git a/equinox-patterns/Domain/Period.fs b/equinox-patterns/Domain/Period.fs index 686b3ad7a..6cc1d4384 100644 --- a/equinox-patterns/Domain/Period.fs +++ b/equinox-patterns/Domain/Period.fs @@ -11,7 +11,7 @@ let streamName periodId = FsCodec.StreamName.create Category (PeriodId.toString // NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = - type ItemIds = { items : string[] } + type ItemIds = { items : ItemId[] } type Balance = ItemIds type Event = | BroughtForward of Balance @@ -26,8 +26,8 @@ module Fold = type State = | Initial | Open of items : OpenState - | Closed of items : string[] * carryingForward : string[] - and OpenState = string[] + | Closed of items : ItemId[] * carryingForward : ItemId[] + and OpenState = ItemId[] let initial : State = Initial let (|Items|) = function Initial -> [||] | Open i | Closed (i, _) -> i open Events diff --git a/equinox-patterns/Domain/Types.fs b/equinox-patterns/Domain/Types.fs index 8dab85937..6ab31a7de 100644 --- a/equinox-patterns/Domain/Types.fs +++ b/equinox-patterns/Domain/Types.fs @@ -4,10 +4,40 @@ open FSharp.UMX type PeriodId = int and [] periodId - module PeriodId = let parse (value : int) : PeriodId = %value let tryPrev (value : PeriodId) : PeriodId option = match %value with 0 -> None | x -> Some %(x - 1) let next (value : PeriodId) : PeriodId = %(%value + 1) let toString (value : PeriodId) : string = string %value + +type ItemId = string +and [] itemId +module ItemId = + + let parse (value : string) : ItemId = %value + let toString (value : ItemId) : string = %value + +type TrancheId = string +and [] trancheId +module TrancheId = + + let parse (value : string) : TrancheId = %value + let toString (value : TrancheId) : string = %value + +type EpochId = int +and [] epochId +module EpochId = + +// let unknown = -1 +// let initial = 0 +// let next (value : EpochId) : EpochId = % (%value + 1) +// let value (value : EpochId) : int = %value + let toString (value : EpochId) : string = string %value + +type [] seriesId +type SeriesId = string +module SeriesId = + + let wellKnownId : SeriesId = % "0" + let toString (value : SeriesId) : string = %value From 8ba66e72d0b51a7d90d27a5cf886a1f8883e9427 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 6 May 2021 12:03:32 +0100 Subject: [PATCH 11/20] Epoch/Series/Ingester initial impl --- README.md | 4 +- equinox-patterns/Domain/Domain.fsproj | 7 +- equinox-patterns/Domain/Epoch.fs | 142 ++++++++++++++++++++++ equinox-patterns/Domain/Infrastructure.fs | 17 +++ equinox-patterns/Domain/Ingester.fs | 128 +++++++++++++++++++ equinox-patterns/Domain/Period.fs | 4 +- equinox-patterns/Domain/Series.fs | 78 ++++++++++++ equinox-patterns/Domain/Types.fs | 36 ++++-- 8 files changed, 400 insertions(+), 16 deletions(-) create mode 100644 equinox-patterns/Domain/Epoch.fs create mode 100644 equinox-patterns/Domain/Ingester.fs create mode 100644 equinox-patterns/Domain/Series.fs diff --git a/README.md b/README.md index 4b407fa8b..dde272ace 100644 --- a/README.md +++ b/README.md @@ -10,8 +10,8 @@ These templates focus solely on Consistent Processing using Equinox Stores: - [`eqxwebcs`](equinox-web-csharp/README.md) - Boilerplate for an ASP .NET Core 2 Web App, with an associated storage-independent Domain project using [Equinox](https://github.com/jet/equinox), _ported to C#_. - [`eqxtestbed`](equinox-testbed/README.md) - Host that allows running back-to-back benchmarks when prototyping models using [Equinox](https://github.com/jet/equinox), using different stores and/or store configuration parameters. - [`eqxPatterns`](equinox-patterns/README.md) - Equinox Skeleton Deciders and Tests implementing various event sourcing patterns: - - [x] Period (with Rolling Balance carried forward) - - [ ] Epochs/Series with deduplication + - [x] Periods with Rolling Balance carried forward + - [x] Epochs/Series/Ingester with deduplication ## [Propulsion](https://github.com/jet/propulsion) related diff --git a/equinox-patterns/Domain/Domain.fsproj b/equinox-patterns/Domain/Domain.fsproj index cfe50e374..797918f02 100644 --- a/equinox-patterns/Domain/Domain.fsproj +++ b/equinox-patterns/Domain/Domain.fsproj @@ -1,17 +1,22 @@  - netstandard2.0 + + netcoreapp3.1 + + + + diff --git a/equinox-patterns/Domain/Epoch.fs b/equinox-patterns/Domain/Epoch.fs new file mode 100644 index 000000000..4fbc80f0b --- /dev/null +++ b/equinox-patterns/Domain/Epoch.fs @@ -0,0 +1,142 @@ +module Patterns.Domain.Epoch + +let [] Category = "Epoch" +let streamName (trancheId : TrancheId, epochId : EpochId) = FsCodec.StreamName.compose Category [TrancheId.toString trancheId; EpochId.toString epochId] + +// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care +[] +module Events = + + type Ingested = { items : Item[] } + and Item = { id : ItemId; payload : string } + type Snapshotted = { ids : ItemId[]; closed : bool } + type Event = + | Ingested of Ingested + | Closed + | Snapshotted of Snapshotted + interface TypeShape.UnionContract.IUnionContract + let codec = FsCodec.NewtonsoftJson.Codec.Create() + +let itemId (x : Events.Item) : ItemId = x.id +let (|ItemIds|) : Events.Item[] -> ItemId[] = Array.map itemId + +module Fold = + + type State = ItemId[] * bool + let initial = [||], false + let evolve (ids, closed) = function + | Events.Ingested { items = ItemIds ingestedIds } -> (Array.append ids ingestedIds, closed) + | Events.Closed -> (ids, true) + | Events.Snapshotted e -> (e.ids, e.closed) + let fold : State -> Events.Event seq -> State = Seq.fold evolve + + let isOrigin = function Events.Snapshotted _ -> true | _ -> false + let toSnapshot (ids, closed) = Events.Snapshotted { ids = ids; closed = closed } + +let notAlreadyIn (ids : ItemId seq) = + let ids = System.Collections.Generic.HashSet ids + fun (x : Events.Item) -> (not << ids.Contains) x.id + +type Result = { accepted : ItemId[]; rejected : Events.Item[]; content : ItemId[]; closed : bool } + +// Note there aren't ever rejected Items in this implementation; the size of an epoch may actually exceed the capacity +// Pros for not rejecting: +// - snapshots should compress well +// - we want to avoid a second roundtrip +// - splitting a batched write into multiple writes with multiple events misrepresents the facts +// i.e. we did not have 10 items 2s ago and 3 just now - we had 13 2s ago +let decide capacity candidates (currentIds, closed as state) = + match closed, candidates |> Array.filter (notAlreadyIn currentIds) with + | true, freshCandidates -> { accepted = [||]; rejected = freshCandidates; content = currentIds; closed = closed }, [] + | false, [||] -> { accepted = [||]; rejected = [||]; content = currentIds; closed = closed }, [] + | false, freshItems -> + let events = + let closingNow = Array.length currentIds + Array.length freshItems >= capacity + let maybeClosingEvent = if closingNow then [ Events.Closed ] else [] + Events.Ingested { items = freshItems } :: maybeClosingEvent + let currentIds, closed = Fold.fold state events + let (ItemIds addedItemIds) = freshItems + { accepted = addedItemIds; rejected = [||]; content = currentIds; closed = closed }, events + +(* Alternate implementation of the `decide` above employing the `Accumulator` helper. + While it's a subjective thing, the takeaway should be that an accumulator rarely buys one code clarity *) +let equivalentDecideUsingAnAccumulator capacity candidates (currentIds, closed as initialState) = + match closed, candidates |> Array.filter (notAlreadyIn currentIds) with + | true, freshCandidates -> { accepted = [||]; rejected = freshCandidates; content = currentIds; closed = closed }, [] + | false, [||] -> { accepted = [||]; rejected = [||]; content = currentIds; closed = closed }, [] + | false, freshItems -> + let acc = Accumulator(initialState, Fold.fold) + acc.Transact(fun _ -> [ Events.Ingested { items = freshItems } ]) + let currentIds, closed = + acc.Transact(fun (currentIds, _) -> + let closing = Array.length currentIds >= capacity + ((currentIds, closing), if closing then [Events.Closed] else [])) + let (ItemIds addedItemIds) = freshItems + { accepted = addedItemIds; rejected = [||]; content = currentIds; closed = closed }, acc.Events + +/// Used by the Ingester to manages ingestion of items into the epoch, i.e. the Write side +type IngestionService internal (capacity, resolve : TrancheId * EpochId -> Equinox.Decider) = + + /// Obtains a complete list of all the items in the specified trancheId/epochId + member _.ReadIds(trancheId, epochId) : Async = + let decider = resolve (trancheId, epochId) + decider.Query fst + + /// Ingest the supplied items. Yields relevant elements of the post-state to enable generation of stats + /// and facilitate deduplication of incoming items in order to avoid null store round-trips where possible + member _.Ingest(trancheId, epochId, items) : Async = + let decider = resolve (trancheId, epochId) + decider.Transact(decide capacity items) + +let private create capacity resolveStream = + let resolve = streamName >> resolveStream Equinox.AllowStale >> Equinox.createDecider + IngestionService(capacity, resolve) + +module Cosmos = + + open Equinox.CosmosStore + + let accessStrategy = AccessStrategy.Snapshot (Fold.isOrigin, Fold.toSnapshot) + let private resolve (context, cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let resolver = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + fun opt sn -> resolver.Resolve(sn, opt) + let create capacity (context, cache) = + create capacity (resolve (context, cache)) + +/// Custom Fold and caching logic compared to the IngesterService +/// - When reading, we want the full Items +/// - Caching only for one minute +/// - There's no value in using the snapshot +module Reader = + + type ReadState = Events.Item[] * bool + let initial = [||], false + let evolve (es, closed as state) = function + | Events.Ingested e -> Array.append es e.items, closed + | Events.Closed -> (es, true) + | Events.Snapshotted _ -> state // there's nothing useful in the snapshot for us to take + let fold : ReadState -> Events.Event seq -> ReadState = Seq.fold evolve + + type Service private (resolve : TrancheId * EpochId -> Equinox.Decider) = + + /// Returns all the items currently held in the stream + member _.Read(trancheId, epochId) : Async = + let decider = resolve (trancheId, epochId) + decider.Query id + + static member internal Create(resolveStream) = + let resolve = streamName >> resolveStream >> Equinox.createDecider + Service(resolve) + + module Cosmos = + + open Equinox.CosmosStore + + let accessStrategy = AccessStrategy.Unoptimized + let resolve (context, cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 1.) + let resolver = CosmosStoreCategory(context, Events.codec, fold, initial, cacheStrategy, accessStrategy) + resolver.Resolve + let create (context, cache) = + Service.Create(resolve (context, cache)) diff --git a/equinox-patterns/Domain/Infrastructure.fs b/equinox-patterns/Domain/Infrastructure.fs index 11216ab31..1b92797a8 100644 --- a/equinox-patterns/Domain/Infrastructure.fs +++ b/equinox-patterns/Domain/Infrastructure.fs @@ -1,6 +1,11 @@ [] module Patterns.Domain.Infrastructure +module Equinox = + + let createDecider stream = + Equinox.Decider(Serilog.Log.Logger, stream, maxAttempts = 3) + /// Buffers events accumulated from a series of decisions while evolving the presented `state` to reflect said proposed `Events` type Accumulator<'event, 'state>(originState, fold : 'state -> 'event seq -> 'state) = let pendingEvents = ResizeArray() @@ -16,11 +21,23 @@ type Accumulator<'event, 'state>(originState, fold : 'state -> 'event seq -> 'st apply events r + /// Run a decision function that does not yield a result + member x.Transact decide = + x.Transact(fun state -> (), decide state) + /// Run an Async decision function, buffering and applying any Events yielded member _.TransactAsync decide = async { let! r, events = decide state apply events return r } + /// Run an Async decision function that does not yield a result + member x.TransactAsync decide = async { + let! events = decide state + apply events } + +// /// Projects from the present state including accumulated events +// member _.Query f = f state + /// Accumulated events based on the Decisions applied to date member _.Events = List.ofSeq pendingEvents diff --git a/equinox-patterns/Domain/Ingester.fs b/equinox-patterns/Domain/Ingester.fs new file mode 100644 index 000000000..296ca3c58 --- /dev/null +++ b/equinox-patterns/Domain/Ingester.fs @@ -0,0 +1,128 @@ +/// Application Service that controls the ingestion of Items into a chain of `Epoch` streams per Tranche +/// - the `Series` aggregate maintains a pointer to the current Epoch for each Tranche +/// - as `Epoch`s complete (have `Closed` events logged), we update the `active` Epoch in the Series to reference the new one +module Patterns.Domain.Ingester + +open Equinox.Core +open FSharp.UMX + +type internal IdsCache() = + // NOTE: Bounded only by relatively low number of physical items IRL + let all = System.Collections.Generic.HashSet() + static member Create init = let x = IdsCache() in x.Add init; x + member _.Add ids = all.UnionWith ids + member _.Contains id = all.Contains id + +/// Maintains active EpochId in a thread-safe manner while ingesting items into the chain of `epochs` indexed by the `series` +/// Prior to first add, reads `lookBack` batches to seed the cache, in order to minimize the number of duplicated items we ingest +type ServiceForTranche internal (log : Serilog.ILogger, trancheId, epochs : Epoch.IngestionService, series : Series.Service, lookBack, linger) = + + // Maintains what we believe to be the currently open EpochId + // NOTE not valid/initialized until invocation of `previousIds.AwaitValue()` has completed + let uninitializedSentinel = %EpochId.unknown + let mutable activeEpochId = uninitializedSentinel + let effectiveEpochId () = if activeEpochId = uninitializedSentinel then EpochId.initial else %activeEpochId + + // establish the pre-existing items from which the previousIds cache will be seeded + let loadPreviousEpochs loadDop : Async = async { + match! series.TryReadIngestionEpochId trancheId with + | None -> + log.Information("No starting epoch registered for {trancheId}", trancheId) + return Array.empty + | Some startingId -> + log.Information("Walking back from {trancheId}/{epochId}", trancheId, startingId) + activeEpochId <- %startingId + let readEpoch epochId = + log.Information("Reading {trancheId}/{epochId}", trancheId, epochId) + epochs.ReadIds(trancheId, epochId) + return! Async.Parallel(seq { for epochId in (max 0 (%startingId - lookBack)) .. (%startingId - 1) -> readEpoch %epochId }, loadDop) } + + // ItemIds cache - used to maintain a list of items that have already been ingested in order to avoid db round-trips + let previousIds : AsyncCacheCell = AsyncCacheCell <| async { + let! batches = loadPreviousEpochs 4 + return IdsCache.Create(Seq.concat batches) } + + let tryIngest items = async { + let! previousIds = previousIds.AwaitValue() + let firstEpochId = effectiveEpochId () + + let rec aux epochId ingestedItems items = async { + let dup, freshItems = items |> Array.partition (Epoch.itemId >> previousIds.Contains) + let fullCount = Array.length items + let dropping = fullCount - Array.length freshItems + if dropping <> 0 then + log.Information("Ignoring {count}/{fullCount} duplicate ids: {ids} for {trancheId}/{epochId}", dropping, fullCount, dup, trancheId, epochId) + if Array.isEmpty freshItems then + return ingestedItems + else + let! res = epochs.Ingest(trancheId, epochId, freshItems) + let ingestedItemIds = Array.append ingestedItems res.accepted + if (not << Array.isEmpty) res.accepted then + log.Information("Added {count} items to {trancheId}/{epochId}", res.accepted.Length, trancheId, epochId) + // The adding is potentially redundant; we don't care + previousIds.Add res.content + // Any writer noticing we've moved to a new Epoch shares the burden of marking it active in the Series + if not res.closed && activeEpochId < EpochId.value epochId then + log.Information("Marking {trancheId}/{epochId} active", trancheId, epochId) + do! series.MarkIngestionEpochId(trancheId, epochId) + System.Threading.Interlocked.CompareExchange(&activeEpochId, %epochId, activeEpochId) |> ignore + match res.rejected with + | [||] -> return ingestedItemIds + | remaining -> return! aux (EpochId.next epochId) ingestedItemIds remaining } + return! aux firstEpochId [||] (Array.concat items) + } + + /// Within the processing for a given Tranche, we have a Scheduler running N streams concurrently + /// If each thread works in isolation, they'll conflict with each other as they feed the ticket into the batch in epochs.Ingest + /// Instead, we enable concurrent requests to coalesce by having requests converge in this AsyncBatchingGate + /// This has the following critical effects: + /// - Traffic to CosmosDB is naturally constrained to a single flight in progress + /// (BatchingGate does not admit next batch until current has succeeded or throws) + /// - RU consumption for writing to the batch is optimized (1 write inserting 1 event document vs N writers writing N) + /// - Peak throughput is more consistent as latency is not impacted by the combination of having to: + /// a) back-off, re-read and retry if there's a concurrent write Optimistic Concurrency Check failure when writing the stream + /// b) enter a prolonged period of retries if multiple concurrent writes trigger rate limiting and 429s from CosmosDB + /// c) readers will less frequently encounter sustained 429s on the batch + let batchedIngest = AsyncBatchingGate(tryIngest, linger) + + /// Upon startup, we initialize the ItemIds cache from recent epochs; we want to kick that process off before our first ingest + member _.Initialize() = previousIds.AwaitValue() |> Async.Ignore + + /// Attempts to feed the items into the sequence of epochs. + /// Returns the subset that actually got fed in this time around. + member _.IngestMany(items : Epoch.Events.Item[]) : Async = async { + let! results = batchedIngest.Execute items + return System.Linq.Enumerable.Intersect(Seq.map Epoch.itemId items, results) + } + + /// Attempts to feed the item into the sequence of batches. + /// Returns true if the item actually got included into an Epoch this time around. + member _.TryIngest(item : Epoch.Events.Item) : Async = async { + let! result = batchedIngest.Execute(Array.singleton item) + return result |> Array.contains (Epoch.itemId item) + } + +let private createServiceForTranche (epochs, lookBackLimit) series linger trancheId = + let log = Serilog.Log.ForContext() + ServiceForTranche(log, trancheId, epochs, series, lookBack=lookBackLimit, linger=linger) + +/// Each ServiceForTranche maintains significant state (set of itemIds looking back through e.g. 100 epochs), which we obv need to cache +type Service internal (createForTranche : TrancheId -> ServiceForTranche) = + + // Its important we don't risk >1 instance https://andrewlock.net/making-getoradd-on-concurrentdictionary-thread-safe-using-lazy/ + // while it would be safe, there would be a risk of incurring the cost of multiple initialization loops + let forTranche = System.Collections.Concurrent.ConcurrentDictionary>() + let build trancheId = lazy createForTranche trancheId + + member _.ForTranche trancheId : ServiceForTranche = + forTranche.GetOrAdd(trancheId, build).Value + +module Cosmos = + + let create (context, cache) = + let maxItemsPerEpoch, lookBackLimit = 10_000, 100 + let epochs = Epoch.Cosmos.create maxItemsPerEpoch (context, cache) + let series = Series.Cosmos.create (context, cache) + let linger = System.TimeSpan.FromMilliseconds 200. + let createForTranche = createServiceForTranche (epochs, lookBackLimit) series linger + Service(createForTranche) diff --git a/equinox-patterns/Domain/Period.fs b/equinox-patterns/Domain/Period.fs index 6cc1d4384..2a850e21e 100644 --- a/equinox-patterns/Domain/Period.fs +++ b/equinox-patterns/Domain/Period.fs @@ -42,9 +42,9 @@ module Fold = let maybeOpen (getIncomingBalance : unit -> Async) state = async { match state with | Initial -> let! balance = getIncomingBalance () - return (), [BroughtForward balance] + return [BroughtForward balance] | Open _ - | Closed _ -> return (), [] } + | Closed _ -> return [] } /// Handles attempting to apply the request to the stream (assuming it's not already closed) /// The `decide` function can signal a need to close and/or split the request by emitting it as the residual diff --git a/equinox-patterns/Domain/Series.fs b/equinox-patterns/Domain/Series.fs new file mode 100644 index 000000000..ffda359ca --- /dev/null +++ b/equinox-patterns/Domain/Series.fs @@ -0,0 +1,78 @@ +/// Maintains a pointer into the Epoch chain for each Tranche +/// Allows an Ingester to quickly determine the current Epoch into which it should commence writing +/// As an Epoch is marked `Closed`, the Ingester will mark a new Epoch `Started` on this aggregate +module Patterns.Domain.Series + +let [] Category = "Series" +let streamName seriesId = FsCodec.StreamName.create Category (SeriesId.toString seriesId) + +// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care +[] +module Events = + + type Event = + | Started of {| trancheId : TrancheId; epochId : EpochId |} + | Snapshotted of {| active : Map |} + interface TypeShape.UnionContract.IUnionContract + let codec = FsCodec.NewtonsoftJson.Codec.Create() + +module Fold = + + type State = Map + let initial = Map.empty + let evolve state = function + | Events.Started e -> state |> Map.add e.trancheId e.epochId + | Events.Snapshotted e -> e.active + let fold : State -> Events.Event seq -> State = Seq.fold evolve + + let isOrigin = function Events.Snapshotted _ -> true | _ -> false + let toSnapshot s = Events.Snapshotted {| active = s |} + +let tryFindEpochOfTranche : TrancheId -> Fold.State -> EpochId option = Map.tryFind + +let interpret trancheId epochId (state : Fold.State) = + [if state |> tryFindEpochOfTranche trancheId |> Option.forall (fun cur -> cur < epochId) && epochId >= EpochId.initial then + yield Events.Started {| trancheId = trancheId; epochId = epochId |}] + +type Service internal (resolve_ : Equinox.ResolveOption option -> SeriesId -> Equinox.Decider, ?seriesId) = + + let resolveUncached = resolve_ None + let resolveCached = resolve_ (Some Equinox.AllowStale) + + // For now we have a single global sequence. This provides us an extension point should we ever need to reprocess + // NOTE we use a custom id in order to isolate data for acceptance tests + let seriesId = defaultArg seriesId SeriesId.wellKnownId + + /// Exposes the set of tranches for which data is held + /// Never yields a cached value, in order to ensure a reader can traverse all tranches + member _.Read() : Async = + let decider = resolveUncached seriesId + decider.Query id + + /// Determines the current active epoch for the specified `trancheId` + /// Uses cached values as epoch transitions are rare, and caller needs to deal with the inherent race condition in any case + member _.TryReadIngestionEpochId trancheId : Async = + let decider = resolveCached seriesId + decider.Query(tryFindEpochOfTranche trancheId) + + /// Mark specified `epochId` as live for the purposes of ingesting + /// Writers are expected to react to having writes to an epoch denied (due to it being Closed) by anointing a successor via this + member _.MarkIngestionEpochId(trancheId, epochId) : Async = + let decider = resolveCached seriesId + decider.Transact(interpret trancheId epochId) + +let private create seriesOverride resolveStream = + let resolve opt = streamName >> resolveStream opt >> Equinox.createDecider + Service(resolve, ?seriesId = seriesOverride) + +module Cosmos = + + open Equinox.CosmosStore + + let accessStrategy = AccessStrategy.Snapshot (Fold.isOrigin, Fold.toSnapshot) + let private resolve (context, cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let resolver = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + fun opt sn -> resolver.Resolve(sn, ?option = opt) + + let create (context, cache) = create None (resolve (context, cache)) diff --git a/equinox-patterns/Domain/Types.fs b/equinox-patterns/Domain/Types.fs index 6ab31a7de..dafa82914 100644 --- a/equinox-patterns/Domain/Types.fs +++ b/equinox-patterns/Domain/Types.fs @@ -2,6 +2,10 @@ namespace Patterns.Domain open FSharp.UMX +/// Identifies a single period within a temporally linked chain of periods +/// Each Period commences with a Balance `BroughtForward` based on what the predecessor Period +/// has decided should be `CarriedForward` +/// TODO prefix the name with a relevant Domain term type PeriodId = int and [] periodId module PeriodId = @@ -11,13 +15,9 @@ module PeriodId = let next (value : PeriodId) : PeriodId = %(%value + 1) let toString (value : PeriodId) : string = string %value -type ItemId = string -and [] itemId -module ItemId = - - let parse (value : string) : ItemId = %value - let toString (value : ItemId) : string = %value - +/// TODO replace the terms `tranche`, `trancheId` and type `TrancheId` +/// with a domain specific term that represents the nature of the separation +/// i.e. it might be a TenantId or a FulfilmentCenterId type TrancheId = string and [] trancheId module TrancheId = @@ -25,16 +25,30 @@ module TrancheId = let parse (value : string) : TrancheId = %value let toString (value : TrancheId) : string = %value +/// Identifies an Epoch that holds a set of Items (within a Tranche) +/// TODO prefix name with a Domain term referencing the Item being managed type EpochId = int and [] epochId module EpochId = -// let unknown = -1 -// let initial = 0 -// let next (value : EpochId) : EpochId = % (%value + 1) -// let value (value : EpochId) : int = %value + let unknown = -1 + let initial = 0 + let next (value : EpochId) : EpochId = % (%value + 1) + let value (value : EpochId) : int = %value let toString (value : EpochId) : string = string %value +/// Identifies an Item stored within an Epoch +/// TODO rename or prefix name with a Domain term referencing the Item being managed + +type ItemId = string +and [] itemId +module ItemId = + + let parse (value : string) : ItemId = %value + let toString (value : ItemId) : string = %value + +/// Identifies a group of chained Epochs (pe Tranche within the series) +/// TODO prefix name with a Domain term referencing the Item being managed type [] seriesId type SeriesId = string module SeriesId = From df362995ebc4b0a58fb32afbb30d9ffc4a481754 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 6 May 2021 13:13:36 +0100 Subject: [PATCH 12/20] Implement capacity-driven residuals --- equinox-patterns/Domain/Epoch.fs | 34 +++++++++++++++++------------ equinox-patterns/Domain/Ingester.fs | 22 +++++++++++-------- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/equinox-patterns/Domain/Epoch.fs b/equinox-patterns/Domain/Epoch.fs index 4fbc80f0b..a64c1c7ce 100644 --- a/equinox-patterns/Domain/Epoch.fs +++ b/equinox-patterns/Domain/Epoch.fs @@ -37,7 +37,7 @@ let notAlreadyIn (ids : ItemId seq) = let ids = System.Collections.Generic.HashSet ids fun (x : Events.Item) -> (not << ids.Contains) x.id -type Result = { accepted : ItemId[]; rejected : Events.Item[]; content : ItemId[]; closed : bool } +type Result = { accepted : ItemId[]; residual : Events.Item[]; content : ItemId[]; closed : bool } // Note there aren't ever rejected Items in this implementation; the size of an epoch may actually exceed the capacity // Pros for not rejecting: @@ -47,32 +47,38 @@ type Result = { accepted : ItemId[]; rejected : Events.Item[]; content : ItemId[ // i.e. we did not have 10 items 2s ago and 3 just now - we had 13 2s ago let decide capacity candidates (currentIds, closed as state) = match closed, candidates |> Array.filter (notAlreadyIn currentIds) with - | true, freshCandidates -> { accepted = [||]; rejected = freshCandidates; content = currentIds; closed = closed }, [] - | false, [||] -> { accepted = [||]; rejected = [||]; content = currentIds; closed = closed }, [] + | true, freshCandidates -> { accepted = [||]; residual = freshCandidates; content = currentIds; closed = closed }, [] + | false, [||] -> { accepted = [||]; residual = [||]; content = currentIds; closed = closed }, [] | false, freshItems -> + let capacityNow = capacity freshItems currentIds + let acceptingCount = min capacityNow freshItems.Length + let closing = acceptingCount = capacityNow + let ItemIds addedItemIds as itemsIngested, residualItems = Array.splitAt acceptingCount freshItems let events = - let closingNow = Array.length currentIds + Array.length freshItems >= capacity - let maybeClosingEvent = if closingNow then [ Events.Closed ] else [] - Events.Ingested { items = freshItems } :: maybeClosingEvent + [ if closing then yield Events.Closed + if (not << Array.isEmpty) itemsIngested then yield Events.Ingested { items = itemsIngested } ] let currentIds, closed = Fold.fold state events - let (ItemIds addedItemIds) = freshItems - { accepted = addedItemIds; rejected = [||]; content = currentIds; closed = closed }, events + { accepted = addedItemIds; residual = residualItems; content = currentIds; closed = closed }, events (* Alternate implementation of the `decide` above employing the `Accumulator` helper. While it's a subjective thing, the takeaway should be that an accumulator rarely buys one code clarity *) let equivalentDecideUsingAnAccumulator capacity candidates (currentIds, closed as initialState) = match closed, candidates |> Array.filter (notAlreadyIn currentIds) with - | true, freshCandidates -> { accepted = [||]; rejected = freshCandidates; content = currentIds; closed = closed }, [] - | false, [||] -> { accepted = [||]; rejected = [||]; content = currentIds; closed = closed }, [] + | true, freshCandidates -> { accepted = [||]; residual = freshCandidates; content = currentIds; closed = closed }, [] + | false, [||] -> { accepted = [||]; residual = [||]; content = currentIds; closed = closed }, [] | false, freshItems -> + let capacityNow = capacity freshItems currentIds + let acceptingCount = min capacityNow freshItems.Length + let closing = acceptingCount = capacityNow + let ItemIds addedItemIds as itemsIngested, residualItems = Array.splitAt acceptingCount freshItems + let acc = Accumulator(initialState, Fold.fold) - acc.Transact(fun _ -> [ Events.Ingested { items = freshItems } ]) + if (not << Array.isEmpty) itemsIngested then acc.Transact(fun _ -> [ Events.Ingested { items = itemsIngested } ]) let currentIds, closed = acc.Transact(fun (currentIds, _) -> - let closing = Array.length currentIds >= capacity ((currentIds, closing), if closing then [Events.Closed] else [])) - let (ItemIds addedItemIds) = freshItems - { accepted = addedItemIds; rejected = [||]; content = currentIds; closed = closed }, acc.Events + + { accepted = addedItemIds; residual = residualItems; content = currentIds; closed = closed }, acc.Events /// Used by the Ingester to manages ingestion of items into the epoch, i.e. the Write side type IngestionService internal (capacity, resolve : TrancheId * EpochId -> Equinox.Decider) = diff --git a/equinox-patterns/Domain/Ingester.fs b/equinox-patterns/Domain/Ingester.fs index 296ca3c58..973bb3276 100644 --- a/equinox-patterns/Domain/Ingester.fs +++ b/equinox-patterns/Domain/Ingester.fs @@ -20,8 +20,9 @@ type ServiceForTranche internal (log : Serilog.ILogger, trancheId, epochs : Epoc // Maintains what we believe to be the currently open EpochId // NOTE not valid/initialized until invocation of `previousIds.AwaitValue()` has completed let uninitializedSentinel = %EpochId.unknown - let mutable activeEpochId = uninitializedSentinel - let effectiveEpochId () = if activeEpochId = uninitializedSentinel then EpochId.initial else %activeEpochId + let mutable activeEpochId_ = uninitializedSentinel + // NOTE see above - must not be called prior to previousIds.AwaitValue() + let effectiveEpochId () = if activeEpochId_ = uninitializedSentinel then EpochId.initial else %activeEpochId_ // establish the pre-existing items from which the previousIds cache will be seeded let loadPreviousEpochs loadDop : Async = async { @@ -31,7 +32,7 @@ type ServiceForTranche internal (log : Serilog.ILogger, trancheId, epochs : Epoc return Array.empty | Some startingId -> log.Information("Walking back from {trancheId}/{epochId}", trancheId, startingId) - activeEpochId <- %startingId + activeEpochId_ <- %startingId let readEpoch epochId = log.Information("Reading {trancheId}/{epochId}", trancheId, epochId) epochs.ReadIds(trancheId, epochId) @@ -62,11 +63,11 @@ type ServiceForTranche internal (log : Serilog.ILogger, trancheId, epochs : Epoc // The adding is potentially redundant; we don't care previousIds.Add res.content // Any writer noticing we've moved to a new Epoch shares the burden of marking it active in the Series - if not res.closed && activeEpochId < EpochId.value epochId then + if not res.closed && activeEpochId_ < EpochId.value epochId then log.Information("Marking {trancheId}/{epochId} active", trancheId, epochId) do! series.MarkIngestionEpochId(trancheId, epochId) - System.Threading.Interlocked.CompareExchange(&activeEpochId, %epochId, activeEpochId) |> ignore - match res.rejected with + System.Threading.Interlocked.CompareExchange(&activeEpochId_, %epochId, activeEpochId_) |> ignore + match res.residual with | [||] -> return ingestedItemIds | remaining -> return! aux (EpochId.next epochId) ingestedItemIds remaining } return! aux firstEpochId [||] (Array.concat items) @@ -120,9 +121,12 @@ type Service internal (createForTranche : TrancheId -> ServiceForTranche) = module Cosmos = let create (context, cache) = - let maxItemsPerEpoch, lookBackLimit = 10_000, 100 - let epochs = Epoch.Cosmos.create maxItemsPerEpoch (context, cache) + let maxItemsPerEpoch = 10_000 + let remainingBatchCapacity _candidateItems currentItems = + let l = Array.length currentItems + max 0 (maxItemsPerEpoch - l) + let epochs = Epoch.Cosmos.create remainingBatchCapacity (context, cache) let series = Series.Cosmos.create (context, cache) - let linger = System.TimeSpan.FromMilliseconds 200. + let linger, lookBackLimit = System.TimeSpan.FromMilliseconds 200., 100 let createForTranche = createServiceForTranche (epochs, lookBackLimit) series linger Service(createForTranche) From 93a49980310cbff7cf407ae83bb75899d3ece96b Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 6 May 2021 13:22:56 +0100 Subject: [PATCH 13/20] Unite Epoch/Period under Item placeholder name --- equinox-patterns/Domain/Domain.fsproj | 6 +-- .../Domain/{Epoch.fs => ItemEpoch.fs} | 10 ++--- .../Domain/{Ingester.fs => ItemIngester.fs} | 30 ++++++------- .../Domain/{Series.fs => ItemSeries.fs} | 22 ++++----- equinox-patterns/Domain/Types.fs | 45 +++++++++---------- 5 files changed, 56 insertions(+), 57 deletions(-) rename equinox-patterns/Domain/{Epoch.fs => ItemEpoch.fs} (93%) rename equinox-patterns/Domain/{Ingester.fs => ItemIngester.fs} (86%) rename equinox-patterns/Domain/{Series.fs => ItemSeries.fs} (81%) diff --git a/equinox-patterns/Domain/Domain.fsproj b/equinox-patterns/Domain/Domain.fsproj index 797918f02..ab72c52a5 100644 --- a/equinox-patterns/Domain/Domain.fsproj +++ b/equinox-patterns/Domain/Domain.fsproj @@ -9,9 +9,9 @@ - - - + + + diff --git a/equinox-patterns/Domain/Epoch.fs b/equinox-patterns/Domain/ItemEpoch.fs similarity index 93% rename from equinox-patterns/Domain/Epoch.fs rename to equinox-patterns/Domain/ItemEpoch.fs index a64c1c7ce..c20ec0fc6 100644 --- a/equinox-patterns/Domain/Epoch.fs +++ b/equinox-patterns/Domain/ItemEpoch.fs @@ -1,7 +1,7 @@ -module Patterns.Domain.Epoch +module Patterns.Domain.ItemEpoch -let [] Category = "Epoch" -let streamName (trancheId : TrancheId, epochId : EpochId) = FsCodec.StreamName.compose Category [TrancheId.toString trancheId; EpochId.toString epochId] +let [] Category = "ItemEpoch" +let streamName (trancheId : ItemTrancheId, epochId : ItemEpochId) = FsCodec.StreamName.compose Category [ItemTrancheId.toString trancheId; ItemEpochId.toString epochId] // NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care [] @@ -81,7 +81,7 @@ let equivalentDecideUsingAnAccumulator capacity candidates (currentIds, closed a { accepted = addedItemIds; residual = residualItems; content = currentIds; closed = closed }, acc.Events /// Used by the Ingester to manages ingestion of items into the epoch, i.e. the Write side -type IngestionService internal (capacity, resolve : TrancheId * EpochId -> Equinox.Decider) = +type IngestionService internal (capacity, resolve : ItemTrancheId * ItemEpochId -> Equinox.Decider) = /// Obtains a complete list of all the items in the specified trancheId/epochId member _.ReadIds(trancheId, epochId) : Async = @@ -124,7 +124,7 @@ module Reader = | Events.Snapshotted _ -> state // there's nothing useful in the snapshot for us to take let fold : ReadState -> Events.Event seq -> ReadState = Seq.fold evolve - type Service private (resolve : TrancheId * EpochId -> Equinox.Decider) = + type Service private (resolve : ItemTrancheId * ItemEpochId -> Equinox.Decider) = /// Returns all the items currently held in the stream member _.Read(trancheId, epochId) : Async = diff --git a/equinox-patterns/Domain/Ingester.fs b/equinox-patterns/Domain/ItemIngester.fs similarity index 86% rename from equinox-patterns/Domain/Ingester.fs rename to equinox-patterns/Domain/ItemIngester.fs index 973bb3276..5e992563b 100644 --- a/equinox-patterns/Domain/Ingester.fs +++ b/equinox-patterns/Domain/ItemIngester.fs @@ -1,7 +1,7 @@ /// Application Service that controls the ingestion of Items into a chain of `Epoch` streams per Tranche /// - the `Series` aggregate maintains a pointer to the current Epoch for each Tranche /// - as `Epoch`s complete (have `Closed` events logged), we update the `active` Epoch in the Series to reference the new one -module Patterns.Domain.Ingester +module Patterns.Domain.ItemIngester open Equinox.Core open FSharp.UMX @@ -15,14 +15,14 @@ type internal IdsCache() = /// Maintains active EpochId in a thread-safe manner while ingesting items into the chain of `epochs` indexed by the `series` /// Prior to first add, reads `lookBack` batches to seed the cache, in order to minimize the number of duplicated items we ingest -type ServiceForTranche internal (log : Serilog.ILogger, trancheId, epochs : Epoch.IngestionService, series : Series.Service, lookBack, linger) = +type ServiceForTranche internal (log : Serilog.ILogger, trancheId, epochs : ItemEpoch.IngestionService, series : ItemSeries.Service, lookBack, linger) = // Maintains what we believe to be the currently open EpochId // NOTE not valid/initialized until invocation of `previousIds.AwaitValue()` has completed - let uninitializedSentinel = %EpochId.unknown + let uninitializedSentinel = %ItemEpochId.unknown let mutable activeEpochId_ = uninitializedSentinel // NOTE see above - must not be called prior to previousIds.AwaitValue() - let effectiveEpochId () = if activeEpochId_ = uninitializedSentinel then EpochId.initial else %activeEpochId_ + let effectiveEpochId () = if activeEpochId_ = uninitializedSentinel then ItemEpochId.initial else %activeEpochId_ // establish the pre-existing items from which the previousIds cache will be seeded let loadPreviousEpochs loadDop : Async = async { @@ -48,7 +48,7 @@ type ServiceForTranche internal (log : Serilog.ILogger, trancheId, epochs : Epoc let firstEpochId = effectiveEpochId () let rec aux epochId ingestedItems items = async { - let dup, freshItems = items |> Array.partition (Epoch.itemId >> previousIds.Contains) + let dup, freshItems = items |> Array.partition (ItemEpoch.itemId >> previousIds.Contains) let fullCount = Array.length items let dropping = fullCount - Array.length freshItems if dropping <> 0 then @@ -63,13 +63,13 @@ type ServiceForTranche internal (log : Serilog.ILogger, trancheId, epochs : Epoc // The adding is potentially redundant; we don't care previousIds.Add res.content // Any writer noticing we've moved to a new Epoch shares the burden of marking it active in the Series - if not res.closed && activeEpochId_ < EpochId.value epochId then + if not res.closed && activeEpochId_ < ItemEpochId.value epochId then log.Information("Marking {trancheId}/{epochId} active", trancheId, epochId) do! series.MarkIngestionEpochId(trancheId, epochId) System.Threading.Interlocked.CompareExchange(&activeEpochId_, %epochId, activeEpochId_) |> ignore match res.residual with | [||] -> return ingestedItemIds - | remaining -> return! aux (EpochId.next epochId) ingestedItemIds remaining } + | remaining -> return! aux (ItemEpochId.next epochId) ingestedItemIds remaining } return! aux firstEpochId [||] (Array.concat items) } @@ -91,16 +91,16 @@ type ServiceForTranche internal (log : Serilog.ILogger, trancheId, epochs : Epoc /// Attempts to feed the items into the sequence of epochs. /// Returns the subset that actually got fed in this time around. - member _.IngestMany(items : Epoch.Events.Item[]) : Async = async { + member _.IngestMany(items : ItemEpoch.Events.Item[]) : Async = async { let! results = batchedIngest.Execute items - return System.Linq.Enumerable.Intersect(Seq.map Epoch.itemId items, results) + return System.Linq.Enumerable.Intersect(Seq.map ItemEpoch.itemId items, results) } /// Attempts to feed the item into the sequence of batches. /// Returns true if the item actually got included into an Epoch this time around. - member _.TryIngest(item : Epoch.Events.Item) : Async = async { + member _.TryIngest(item : ItemEpoch.Events.Item) : Async = async { let! result = batchedIngest.Execute(Array.singleton item) - return result |> Array.contains (Epoch.itemId item) + return result |> Array.contains (ItemEpoch.itemId item) } let private createServiceForTranche (epochs, lookBackLimit) series linger trancheId = @@ -108,11 +108,11 @@ let private createServiceForTranche (epochs, lookBackLimit) series linger tranch ServiceForTranche(log, trancheId, epochs, series, lookBack=lookBackLimit, linger=linger) /// Each ServiceForTranche maintains significant state (set of itemIds looking back through e.g. 100 epochs), which we obv need to cache -type Service internal (createForTranche : TrancheId -> ServiceForTranche) = +type Service internal (createForTranche : ItemTrancheId -> ServiceForTranche) = // Its important we don't risk >1 instance https://andrewlock.net/making-getoradd-on-concurrentdictionary-thread-safe-using-lazy/ // while it would be safe, there would be a risk of incurring the cost of multiple initialization loops - let forTranche = System.Collections.Concurrent.ConcurrentDictionary>() + let forTranche = System.Collections.Concurrent.ConcurrentDictionary>() let build trancheId = lazy createForTranche trancheId member _.ForTranche trancheId : ServiceForTranche = @@ -125,8 +125,8 @@ module Cosmos = let remainingBatchCapacity _candidateItems currentItems = let l = Array.length currentItems max 0 (maxItemsPerEpoch - l) - let epochs = Epoch.Cosmos.create remainingBatchCapacity (context, cache) - let series = Series.Cosmos.create (context, cache) + let epochs = ItemEpoch.Cosmos.create remainingBatchCapacity (context, cache) + let series = ItemSeries.Cosmos.create (context, cache) let linger, lookBackLimit = System.TimeSpan.FromMilliseconds 200., 100 let createForTranche = createServiceForTranche (epochs, lookBackLimit) series linger Service(createForTranche) diff --git a/equinox-patterns/Domain/Series.fs b/equinox-patterns/Domain/ItemSeries.fs similarity index 81% rename from equinox-patterns/Domain/Series.fs rename to equinox-patterns/Domain/ItemSeries.fs index ffda359ca..6f0943de7 100644 --- a/equinox-patterns/Domain/Series.fs +++ b/equinox-patterns/Domain/ItemSeries.fs @@ -1,24 +1,24 @@ /// Maintains a pointer into the Epoch chain for each Tranche /// Allows an Ingester to quickly determine the current Epoch into which it should commence writing /// As an Epoch is marked `Closed`, the Ingester will mark a new Epoch `Started` on this aggregate -module Patterns.Domain.Series +module Patterns.Domain.ItemSeries -let [] Category = "Series" -let streamName seriesId = FsCodec.StreamName.create Category (SeriesId.toString seriesId) +let [] Category = "ItemSeries" +let streamName seriesId = FsCodec.StreamName.create Category (ItemSeriesId.toString seriesId) // NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care [] module Events = type Event = - | Started of {| trancheId : TrancheId; epochId : EpochId |} - | Snapshotted of {| active : Map |} + | Started of {| trancheId : ItemTrancheId; epochId : ItemEpochId |} + | Snapshotted of {| active : Map |} interface TypeShape.UnionContract.IUnionContract let codec = FsCodec.NewtonsoftJson.Codec.Create() module Fold = - type State = Map + type State = Map let initial = Map.empty let evolve state = function | Events.Started e -> state |> Map.add e.trancheId e.epochId @@ -28,20 +28,20 @@ module Fold = let isOrigin = function Events.Snapshotted _ -> true | _ -> false let toSnapshot s = Events.Snapshotted {| active = s |} -let tryFindEpochOfTranche : TrancheId -> Fold.State -> EpochId option = Map.tryFind +let tryFindEpochOfTranche : ItemTrancheId -> Fold.State -> ItemEpochId option = Map.tryFind let interpret trancheId epochId (state : Fold.State) = - [if state |> tryFindEpochOfTranche trancheId |> Option.forall (fun cur -> cur < epochId) && epochId >= EpochId.initial then + [if state |> tryFindEpochOfTranche trancheId |> Option.forall (fun cur -> cur < epochId) && epochId >= ItemEpochId.initial then yield Events.Started {| trancheId = trancheId; epochId = epochId |}] -type Service internal (resolve_ : Equinox.ResolveOption option -> SeriesId -> Equinox.Decider, ?seriesId) = +type Service internal (resolve_ : Equinox.ResolveOption option -> ItemSeriesId -> Equinox.Decider, ?seriesId) = let resolveUncached = resolve_ None let resolveCached = resolve_ (Some Equinox.AllowStale) // For now we have a single global sequence. This provides us an extension point should we ever need to reprocess // NOTE we use a custom id in order to isolate data for acceptance tests - let seriesId = defaultArg seriesId SeriesId.wellKnownId + let seriesId = defaultArg seriesId ItemSeriesId.wellKnownId /// Exposes the set of tranches for which data is held /// Never yields a cached value, in order to ensure a reader can traverse all tranches @@ -51,7 +51,7 @@ type Service internal (resolve_ : Equinox.ResolveOption option -> SeriesId -> Eq /// Determines the current active epoch for the specified `trancheId` /// Uses cached values as epoch transitions are rare, and caller needs to deal with the inherent race condition in any case - member _.TryReadIngestionEpochId trancheId : Async = + member _.TryReadIngestionEpochId trancheId : Async = let decider = resolveCached seriesId decider.Query(tryFindEpochOfTranche trancheId) diff --git a/equinox-patterns/Domain/Types.fs b/equinox-patterns/Domain/Types.fs index dafa82914..c9d861c84 100644 --- a/equinox-patterns/Domain/Types.fs +++ b/equinox-patterns/Domain/Types.fs @@ -15,30 +15,29 @@ module PeriodId = let next (value : PeriodId) : PeriodId = %(%value + 1) let toString (value : PeriodId) : string = string %value -/// TODO replace the terms `tranche`, `trancheId` and type `TrancheId` -/// with a domain specific term that represents the nature of the separation +/// TODO replace the terms `tranche`, `trancheId` and type `TrancheId` with domain term that represents the nature of the separation /// i.e. it might be a TenantId or a FulfilmentCenterId -type TrancheId = string -and [] trancheId -module TrancheId = +type ItemTrancheId = string +and [] itemTrancheId +module ItemTrancheId = - let parse (value : string) : TrancheId = %value - let toString (value : TrancheId) : string = %value + let parse (value : string) : ItemTrancheId = %value + let toString (value : ItemTrancheId) : string = %value /// Identifies an Epoch that holds a set of Items (within a Tranche) -/// TODO prefix name with a Domain term referencing the Item being managed -type EpochId = int -and [] epochId -module EpochId = +/// TODO replace `Item` with a Domain term referencing the specific element being managed +type ItemEpochId = int +and [] itemEpochId +module ItemEpochId = - let unknown = -1 - let initial = 0 - let next (value : EpochId) : EpochId = % (%value + 1) - let value (value : EpochId) : int = %value - let toString (value : EpochId) : string = string %value + let unknown = -1 + let initial = 0 + let next (value : ItemEpochId) : ItemEpochId = % (%value + 1) + let value (value : ItemEpochId) : int = %value + let toString (value : ItemEpochId) : string = string %value /// Identifies an Item stored within an Epoch -/// TODO rename or prefix name with a Domain term referencing the Item being managed +/// TODO replace `Item` with a Domain term referencing the specific element being managed type ItemId = string and [] itemId @@ -48,10 +47,10 @@ module ItemId = let toString (value : ItemId) : string = %value /// Identifies a group of chained Epochs (pe Tranche within the series) -/// TODO prefix name with a Domain term referencing the Item being managed -type [] seriesId -type SeriesId = string -module SeriesId = +/// TODO replace `Item` with a Domain term referencing the specific element being managed +type [] itemSeriesId +type ItemSeriesId = string +module ItemSeriesId = - let wellKnownId : SeriesId = % "0" - let toString (value : SeriesId) : string = %value + let wellKnownId : ItemSeriesId = % "0" + let toString (value : ItemSeriesId) : string = %value From 5466a5016909bdb9f624938f94877cf1bee88297 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 7 May 2021 10:35:11 +0100 Subject: [PATCH 14/20] Add tests for unique checking --- .../Domain.Tests/Domain.Tests.fsproj | 5 +- .../Domain.Tests/Infrastructure.fs | 34 ++++++++++ .../Domain.Tests/ItemIngesterTests.fs | 67 +++++++++++++++++++ equinox-patterns/Domain/ItemEpoch.fs | 38 +++++++---- equinox-patterns/Domain/ItemIngester.fs | 18 ++++- equinox-patterns/Domain/ItemSeries.fs | 16 +++-- equinox-patterns/Domain/Period.fs | 2 +- 7 files changed, 156 insertions(+), 24 deletions(-) create mode 100644 equinox-patterns/Domain.Tests/Infrastructure.fs create mode 100644 equinox-patterns/Domain.Tests/ItemIngesterTests.fs diff --git a/equinox-patterns/Domain.Tests/Domain.Tests.fsproj b/equinox-patterns/Domain.Tests/Domain.Tests.fsproj index 2c9fc12dc..873b1ab64 100644 --- a/equinox-patterns/Domain.Tests/Domain.Tests.fsproj +++ b/equinox-patterns/Domain.Tests/Domain.Tests.fsproj @@ -8,12 +8,15 @@ + + - + + diff --git a/equinox-patterns/Domain.Tests/Infrastructure.fs b/equinox-patterns/Domain.Tests/Infrastructure.fs new file mode 100644 index 000000000..e5070c2f9 --- /dev/null +++ b/equinox-patterns/Domain.Tests/Infrastructure.fs @@ -0,0 +1,34 @@ +[] +module Patterns.Domain.Tests.Infrastructure + +open FSharp.UMX +open System +open FsCheck +open Patterns.Domain + +(* Generic FsCheck helpers *) + +let (|Id|) (x : Guid) = x.ToString "N" |> UMX.tag +let inline mkId () = Guid.NewGuid() |> (|Id|) +let (|Ids|) (xs : Guid[]) = xs |> Array.map (|Id|) + +type DomainArbs() = + + static member Item : Arbitrary = Arb.fromGen <| gen { + let! r = Arb.Default.Derive() |> Arb.toGen + let id = mkId () // TODO why doesnt `let (Id id) = Arb.generate` generate fresh every time? + return { r with id = id } + } + +type DomainProperty() = inherit FsCheck.Xunit.PropertyAttribute(Arbitrary=[|typeof|], QuietOnSuccess=true) + +/// Inspired by AutoFixture.XUnit's AutoDataAttribute - generating test data without the full Property Based Tests experience +/// By using this instead of Property, the developer has +/// a) asserted by using this property instead of [] +/// b) indirectly validated by running the tests frequently locally in DEBUG mode +/// that running the test multiple times is not a useful thing to do +#if !DEBUG +type AutoDataAttribute() = inherit FsCheck.Xunit.PropertyAttribute(Arbitrary=[|typeof|], MaxTest=1, QuietOnSuccess=true) +#else +type AutoDataAttribute() = inherit FsCheck.Xunit.PropertyAttribute(Arbitrary=[|typeof|], MaxTest=5, QuietOnSuccess=true) +#endif \ No newline at end of file diff --git a/equinox-patterns/Domain.Tests/ItemIngesterTests.fs b/equinox-patterns/Domain.Tests/ItemIngesterTests.fs new file mode 100644 index 000000000..ea6698f49 --- /dev/null +++ b/equinox-patterns/Domain.Tests/ItemIngesterTests.fs @@ -0,0 +1,67 @@ +module Patterns.Domain.Tests.ItemIngesterTests + +open Patterns.Domain +open Patterns.Domain.ItemIngester +open FsCheck.Xunit +open FSharp.UMX +open Swensen.Unquote + +let linger, lookBackLimit, maxPickTicketsPerBatch = System.TimeSpan.FromMilliseconds 1., 2, 5 + +let createSut store trancheId = + // While we use ~ 200ms when hitting Cosmos, there's no value in doing so in the context of these property based tests + let service = MemoryStore.Create(store, linger=linger, maxItemsPerEpoch=maxPickTicketsPerBatch, lookBackLimit=lookBackLimit) + service.ForTranche trancheId + +let [] properties shouldInitialize shouldUseSameSut (Id trancheId) initialItems items = + let store = Equinox.MemoryStore.VolatileStore() + Async.RunSynchronously <| async { + // Initialize with some items + let initialSut = createSut store trancheId + if shouldInitialize then do! initialSut.Initialize() + let! initialResult = initialSut.IngestMany(initialItems) + let initialExpected = initialItems |> Seq.map ItemEpoch.itemId |> Array.ofSeq + test <@ set initialExpected = set initialResult @> + + // Add some extra + let sut = if shouldUseSameSut then initialSut else createSut store trancheId + if shouldInitialize then do! sut.Initialize() + let! result = sut.IngestMany items + let expected = items |> Seq.map ItemEpoch.itemId |> Seq.except initialExpected |> Seq.distinct + test <@ set expected = set result @> + + // Add the same stuff for a different tranche; the data should be completely independent from an ingestion perspective + let differentTranche = %(sprintf "%s2" %trancheId) + let differentSutSameStore = createSut store differentTranche + let! independentResult = differentSutSameStore.IngestMany(Array.append initialItems items) + test <@ set initialResult + set result = set independentResult @> + } + +let [] ``lookBack is limited`` (Id trancheId) genItem = + let store = Equinox.MemoryStore.VolatileStore() + Async.RunSynchronously <| async { + // Initialize with more items than the lookBack accommodates + let initialSut = createSut store trancheId + let itemCount = + // Fill up lookBackLimit batches, and another one as batch 0 that we will not look include in the load + (lookBackLimit+1) * maxPickTicketsPerBatch + // Add one more so we end up with an active batchId = lookBackLimit + + 1 + let items = Array.init itemCount (fun _ -> genItem () ) + test <@ Array.distinct items = items @> + let batch0 = Array.take maxPickTicketsPerBatch items + let batchesInLookBack = Array.skip maxPickTicketsPerBatch items + let! b0Added = initialSut.IngestMany batch0 + let b0Added = Array.ofSeq b0Added + test <@ maxPickTicketsPerBatch = Array.length b0Added @> + let! batchesInLookBackAdded = initialSut.IngestMany batchesInLookBack + test <@ itemCount = Set.count (set b0Added + set batchesInLookBackAdded) @> + + // Now try to add the same items - the first batch worth should not be deduplicated + let sut = createSut store trancheId + let! result = sut.IngestMany items + let result = Array.ofSeq result + test <@ itemCount = itemCount + && result.Length = maxPickTicketsPerBatch + && set result = set b0Added @> + } diff --git a/equinox-patterns/Domain/ItemEpoch.fs b/equinox-patterns/Domain/ItemEpoch.fs index c20ec0fc6..7c9ebbf68 100644 --- a/equinox-patterns/Domain/ItemEpoch.fs +++ b/equinox-patterns/Domain/ItemEpoch.fs @@ -98,17 +98,23 @@ let private create capacity resolveStream = let resolve = streamName >> resolveStream Equinox.AllowStale >> Equinox.createDecider IngestionService(capacity, resolve) +module MemoryStore = + + let create capacity store = + let cat = Equinox.MemoryStore.MemoryStoreCategory(store, Events.codec, Fold.fold, Fold.initial) + let resolveStream opt sn = cat.Resolve(sn, opt) + create capacity resolveStream + module Cosmos = open Equinox.CosmosStore let accessStrategy = AccessStrategy.Snapshot (Fold.isOrigin, Fold.toSnapshot) - let private resolve (context, cache) = - let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) - let resolver = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) - fun opt sn -> resolver.Resolve(sn, opt) let create capacity (context, cache) = - create capacity (resolve (context, cache)) + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let cat = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + let resolveStream opt sn = cat.Resolve(sn, opt) + create capacity resolveStream /// Custom Fold and caching logic compared to the IngesterService /// - When reading, we want the full Items @@ -124,25 +130,29 @@ module Reader = | Events.Snapshotted _ -> state // there's nothing useful in the snapshot for us to take let fold : ReadState -> Events.Event seq -> ReadState = Seq.fold evolve - type Service private (resolve : ItemTrancheId * ItemEpochId -> Equinox.Decider) = + type Service internal (resolve : ItemTrancheId * ItemEpochId -> Equinox.Decider) = /// Returns all the items currently held in the stream member _.Read(trancheId, epochId) : Async = let decider = resolve (trancheId, epochId) decider.Query id - static member internal Create(resolveStream) = - let resolve = streamName >> resolveStream >> Equinox.createDecider - Service(resolve) + let private create resolveStream = + let resolve = streamName >> resolveStream >> Equinox.createDecider + Service resolve + + module MemoryStore = + + let create store = + let cat = Equinox.MemoryStore.MemoryStoreCategory(store, Events.codec, fold, initial) + create cat.Resolve module Cosmos = open Equinox.CosmosStore let accessStrategy = AccessStrategy.Unoptimized - let resolve (context, cache) = - let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 1.) - let resolver = CosmosStoreCategory(context, Events.codec, fold, initial, cacheStrategy, accessStrategy) - resolver.Resolve let create (context, cache) = - Service.Create(resolve (context, cache)) + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 1.) + let cat = CosmosStoreCategory(context, Events.codec, fold, initial, cacheStrategy, accessStrategy) + create cat.Resolve diff --git a/equinox-patterns/Domain/ItemIngester.fs b/equinox-patterns/Domain/ItemIngester.fs index 5e992563b..31cf6e70d 100644 --- a/equinox-patterns/Domain/ItemIngester.fs +++ b/equinox-patterns/Domain/ItemIngester.fs @@ -118,15 +118,27 @@ type Service internal (createForTranche : ItemTrancheId -> ServiceForTranche) = member _.ForTranche trancheId : ServiceForTranche = forTranche.GetOrAdd(trancheId, build).Value +let private maxItemsPerEpoch = 10_000 +let private linger, lookBackLimit = System.TimeSpan.FromMilliseconds 200., 100 + +type MemoryStore() = + + static member Create(store, linger, maxItemsPerEpoch, lookBackLimit) = + let remainingBatchCapacity _candidateItems currentItems = + let l = Array.length currentItems + max 0 (maxItemsPerEpoch - l) + let epochs = ItemEpoch.MemoryStore.create remainingBatchCapacity store + let series = ItemSeries.MemoryStore.create store + let createForTranche = createServiceForTranche (epochs, lookBackLimit) series linger + Service createForTranche + module Cosmos = let create (context, cache) = - let maxItemsPerEpoch = 10_000 let remainingBatchCapacity _candidateItems currentItems = let l = Array.length currentItems max 0 (maxItemsPerEpoch - l) let epochs = ItemEpoch.Cosmos.create remainingBatchCapacity (context, cache) let series = ItemSeries.Cosmos.create (context, cache) - let linger, lookBackLimit = System.TimeSpan.FromMilliseconds 200., 100 let createForTranche = createServiceForTranche (epochs, lookBackLimit) series linger - Service(createForTranche) + Service createForTranche diff --git a/equinox-patterns/Domain/ItemSeries.fs b/equinox-patterns/Domain/ItemSeries.fs index 6f0943de7..b2844e956 100644 --- a/equinox-patterns/Domain/ItemSeries.fs +++ b/equinox-patterns/Domain/ItemSeries.fs @@ -65,14 +65,20 @@ let private create seriesOverride resolveStream = let resolve opt = streamName >> resolveStream opt >> Equinox.createDecider Service(resolve, ?seriesId = seriesOverride) +module MemoryStore = + + let create store = + let cat = Equinox.MemoryStore.MemoryStoreCategory(store, Events.codec, Fold.fold, Fold.initial) + let resolveStream opt sn = cat.Resolve(sn, ?option = opt) + create None resolveStream + module Cosmos = open Equinox.CosmosStore let accessStrategy = AccessStrategy.Snapshot (Fold.isOrigin, Fold.toSnapshot) - let private resolve (context, cache) = + let create (context, cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) - let resolver = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) - fun opt sn -> resolver.Resolve(sn, ?option = opt) - - let create (context, cache) = create None (resolve (context, cache)) + let cat = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + let resolveStream opt sn = cat.Resolve(sn, ?option = opt) + create None resolveStream diff --git a/equinox-patterns/Domain/Period.fs b/equinox-patterns/Domain/Period.fs index 2a850e21e..0cea1981a 100644 --- a/equinox-patterns/Domain/Period.fs +++ b/equinox-patterns/Domain/Period.fs @@ -138,7 +138,7 @@ type Service internal (resolve : PeriodId -> Equinox.Decider(), streamName id |> resolveStream, maxAttempts = 3) - Service(resolve) + Service resolve module MemoryStore = From d5fd5e15f1a30407784c1dcc7ecb845106256ef5 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sat, 8 May 2021 08:21:57 +0100 Subject: [PATCH 15/20] Polish --- equinox-patterns/Domain/Infrastructure.fs | 4 +-- equinox-patterns/Domain/ItemEpoch.fs | 34 +++++------------------ equinox-patterns/Domain/ItemSeries.fs | 16 +++++------ equinox-patterns/Domain/Period.fs | 21 ++++++++++++-- 4 files changed, 36 insertions(+), 39 deletions(-) diff --git a/equinox-patterns/Domain/Infrastructure.fs b/equinox-patterns/Domain/Infrastructure.fs index 1b92797a8..3ef7d2da2 100644 --- a/equinox-patterns/Domain/Infrastructure.fs +++ b/equinox-patterns/Domain/Infrastructure.fs @@ -22,8 +22,8 @@ type Accumulator<'event, 'state>(originState, fold : 'state -> 'event seq -> 'st r /// Run a decision function that does not yield a result - member x.Transact decide = - x.Transact(fun state -> (), decide state) +// member x.Transact decide = +// x.Transact(fun state -> (), decide state) /// Run an Async decision function, buffering and applying any Events yielded member _.TransactAsync decide = async { diff --git a/equinox-patterns/Domain/ItemEpoch.fs b/equinox-patterns/Domain/ItemEpoch.fs index 7c9ebbf68..178e98643 100644 --- a/equinox-patterns/Domain/ItemEpoch.fs +++ b/equinox-patterns/Domain/ItemEpoch.fs @@ -11,9 +11,9 @@ module Events = and Item = { id : ItemId; payload : string } type Snapshotted = { ids : ItemId[]; closed : bool } type Event = - | Ingested of Ingested + | Ingested of Ingested | Closed - | Snapshotted of Snapshotted + | Snapshotted of Snapshotted interface TypeShape.UnionContract.IUnionContract let codec = FsCodec.NewtonsoftJson.Codec.Create() @@ -55,31 +55,11 @@ let decide capacity candidates (currentIds, closed as state) = let closing = acceptingCount = capacityNow let ItemIds addedItemIds as itemsIngested, residualItems = Array.splitAt acceptingCount freshItems let events = - [ if closing then yield Events.Closed - if (not << Array.isEmpty) itemsIngested then yield Events.Ingested { items = itemsIngested } ] + [ if (not << Array.isEmpty) itemsIngested then yield Events.Ingested { items = itemsIngested } + if closing then yield Events.Closed ] let currentIds, closed = Fold.fold state events { accepted = addedItemIds; residual = residualItems; content = currentIds; closed = closed }, events -(* Alternate implementation of the `decide` above employing the `Accumulator` helper. - While it's a subjective thing, the takeaway should be that an accumulator rarely buys one code clarity *) -let equivalentDecideUsingAnAccumulator capacity candidates (currentIds, closed as initialState) = - match closed, candidates |> Array.filter (notAlreadyIn currentIds) with - | true, freshCandidates -> { accepted = [||]; residual = freshCandidates; content = currentIds; closed = closed }, [] - | false, [||] -> { accepted = [||]; residual = [||]; content = currentIds; closed = closed }, [] - | false, freshItems -> - let capacityNow = capacity freshItems currentIds - let acceptingCount = min capacityNow freshItems.Length - let closing = acceptingCount = capacityNow - let ItemIds addedItemIds as itemsIngested, residualItems = Array.splitAt acceptingCount freshItems - - let acc = Accumulator(initialState, Fold.fold) - if (not << Array.isEmpty) itemsIngested then acc.Transact(fun _ -> [ Events.Ingested { items = itemsIngested } ]) - let currentIds, closed = - acc.Transact(fun (currentIds, _) -> - ((currentIds, closing), if closing then [Events.Closed] else [])) - - { accepted = addedItemIds; residual = residualItems; content = currentIds; closed = closed }, acc.Events - /// Used by the Ingester to manages ingestion of items into the epoch, i.e. the Write side type IngestionService internal (capacity, resolve : ItemTrancheId * ItemEpochId -> Equinox.Decider) = @@ -95,14 +75,14 @@ type IngestionService internal (capacity, resolve : ItemTrancheId * ItemEpochId decider.Transact(decide capacity items) let private create capacity resolveStream = - let resolve = streamName >> resolveStream Equinox.AllowStale >> Equinox.createDecider + let resolve = streamName >> resolveStream (Some Equinox.AllowStale) >> Equinox.createDecider IngestionService(capacity, resolve) module MemoryStore = let create capacity store = let cat = Equinox.MemoryStore.MemoryStoreCategory(store, Events.codec, Fold.fold, Fold.initial) - let resolveStream opt sn = cat.Resolve(sn, opt) + let resolveStream opt sn = cat.Resolve(sn, ?option = opt) create capacity resolveStream module Cosmos = @@ -113,7 +93,7 @@ module Cosmos = let create capacity (context, cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) let cat = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) - let resolveStream opt sn = cat.Resolve(sn, opt) + let resolveStream opt sn = cat.Resolve(sn, ?option = opt) create capacity resolveStream /// Custom Fold and caching logic compared to the IngesterService diff --git a/equinox-patterns/Domain/ItemSeries.fs b/equinox-patterns/Domain/ItemSeries.fs index b2844e956..a14ee861f 100644 --- a/equinox-patterns/Domain/ItemSeries.fs +++ b/equinox-patterns/Domain/ItemSeries.fs @@ -11,8 +11,8 @@ let streamName seriesId = FsCodec.StreamName.create Category (ItemSeriesId.toStr module Events = type Event = - | Started of {| trancheId : ItemTrancheId; epochId : ItemEpochId |} - | Snapshotted of {| active : Map |} + | Started of {| trancheId : ItemTrancheId; epochId : ItemEpochId |} + | Snapshotted of {| active : Map |} interface TypeShape.UnionContract.IUnionContract let codec = FsCodec.NewtonsoftJson.Codec.Create() @@ -21,7 +21,7 @@ module Fold = type State = Map let initial = Map.empty let evolve state = function - | Events.Started e -> state |> Map.add e.trancheId e.epochId + | Events.Started e -> state |> Map.add e.trancheId e.epochId | Events.Snapshotted e -> e.active let fold : State -> Events.Event seq -> State = Seq.fold evolve @@ -36,8 +36,8 @@ let interpret trancheId epochId (state : Fold.State) = type Service internal (resolve_ : Equinox.ResolveOption option -> ItemSeriesId -> Equinox.Decider, ?seriesId) = - let resolveUncached = resolve_ None - let resolveCached = resolve_ (Some Equinox.AllowStale) + let resolve = resolve_ None + let resolveStale = resolve_ (Some Equinox.AllowStale) // For now we have a single global sequence. This provides us an extension point should we ever need to reprocess // NOTE we use a custom id in order to isolate data for acceptance tests @@ -46,19 +46,19 @@ type Service internal (resolve_ : Equinox.ResolveOption option -> ItemSeriesId - /// Exposes the set of tranches for which data is held /// Never yields a cached value, in order to ensure a reader can traverse all tranches member _.Read() : Async = - let decider = resolveUncached seriesId + let decider = resolve seriesId decider.Query id /// Determines the current active epoch for the specified `trancheId` /// Uses cached values as epoch transitions are rare, and caller needs to deal with the inherent race condition in any case member _.TryReadIngestionEpochId trancheId : Async = - let decider = resolveCached seriesId + let decider = resolveStale seriesId decider.Query(tryFindEpochOfTranche trancheId) /// Mark specified `epochId` as live for the purposes of ingesting /// Writers are expected to react to having writes to an epoch denied (due to it being Closed) by anointing a successor via this member _.MarkIngestionEpochId(trancheId, epochId) : Async = - let decider = resolveCached seriesId + let decider = resolveStale seriesId decider.Transact(interpret trancheId epochId) let private create seriesOverride resolveStream = diff --git a/equinox-patterns/Domain/Period.fs b/equinox-patterns/Domain/Period.fs index 0cea1981a..4e0d6338d 100644 --- a/equinox-patterns/Domain/Period.fs +++ b/equinox-patterns/Domain/Period.fs @@ -132,16 +132,33 @@ type Service internal (resolve : PeriodId -> Equinox.Decider(), streamName id |> resolveStream, maxAttempts = 3) + let resolve = streamName >> resolveStream (Some Equinox.AllowStale) >> Equinox.createDecider Service resolve module MemoryStore = let create store = let cat = Equinox.MemoryStore.MemoryStoreCategory(store, Events.codec, Fold.fold, Fold.initial) - create cat.Resolve + let resolveStream opt sn = cat.Resolve(sn, ?option = opt) + create resolveStream + +module Cosmos = + + open Equinox.CosmosStore + + // Not using snapshots, on the basis that the writes are all coming from this process, so the cache will be sufficient + // to make reads cheap enough, with the benefit of writes being cheaper as you're not paying to maintain the snapshot + let accessStrategy = AccessStrategy.Unoptimized + let create (context, cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let cat = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + let resolveStream opt sn = cat.Resolve(sn, ?option = opt) + create resolveStream From 4274ff78bc4ee28975a81c28e20ae8719cab4bb3 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sat, 8 May 2021 08:28:05 +0100 Subject: [PATCH 16/20] Format --- equinox-patterns/Domain/Period.fs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/equinox-patterns/Domain/Period.fs b/equinox-patterns/Domain/Period.fs index 4e0d6338d..89366afee 100644 --- a/equinox-patterns/Domain/Period.fs +++ b/equinox-patterns/Domain/Period.fs @@ -14,10 +14,10 @@ module Events = type ItemIds = { items : ItemId[] } type Balance = ItemIds type Event = - | BroughtForward of Balance - | Added of ItemIds - | Removed of ItemIds - | CarriedForward of Balance + | BroughtForward of Balance + | Added of ItemIds + | Removed of ItemIds + | CarriedForward of Balance interface TypeShape.UnionContract.IUnionContract let codec = FsCodec.NewtonsoftJson.Codec.Create() @@ -25,8 +25,8 @@ module Fold = type State = | Initial - | Open of items : OpenState - | Closed of items : ItemId[] * carryingForward : ItemId[] + | Open of items : OpenState + | Closed of items : ItemId[] * carryingForward : ItemId[] and OpenState = ItemId[] let initial : State = Initial let (|Items|) = function Initial -> [||] | Open i | Closed (i, _) -> i @@ -136,8 +136,8 @@ type Service internal (resolve : PeriodId -> Equinox.Decider> resolveStream (Some Equinox.AllowStale) >> Equinox.createDecider From bf729ef638254077411b4b3fcd89f56a6413b0f9 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sat, 8 May 2021 08:39:10 +0100 Subject: [PATCH 17/20] Polish --- equinox-patterns/Domain/Period.fs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/equinox-patterns/Domain/Period.fs b/equinox-patterns/Domain/Period.fs index 89366afee..1239a28ff 100644 --- a/equinox-patterns/Domain/Period.fs +++ b/equinox-patterns/Domain/Period.fs @@ -16,7 +16,6 @@ module Events = type Event = | BroughtForward of Balance | Added of ItemIds - | Removed of ItemIds | CarriedForward of Balance interface TypeShape.UnionContract.IUnionContract let codec = FsCodec.NewtonsoftJson.Codec.Create() @@ -25,8 +24,8 @@ module Fold = type State = | Initial - | Open of items : OpenState - | Closed of items : ItemId[] * carryingForward : ItemId[] + | Open of items : OpenState + | Closed of items : ItemId[] * carryingForward : ItemId[] and OpenState = ItemId[] let initial : State = Initial let (|Items|) = function Initial -> [||] | Open i | Closed (i, _) -> i @@ -34,7 +33,6 @@ module Fold = let evolve (Items items) = function | BroughtForward e | Added e -> Open (Array.append items e.items) - | Removed e -> Open (items |> Array.except e.items) | CarriedForward e -> Closed (items, e.items) let fold = Seq.fold evolve From 9fb50dd92165bdeb8767ecc2cc10947a8e1343f8 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sat, 8 May 2021 09:24:01 +0100 Subject: [PATCH 18/20] Formatting --- equinox-patterns/Domain/Period.fs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/equinox-patterns/Domain/Period.fs b/equinox-patterns/Domain/Period.fs index 1239a28ff..0d89f5464 100644 --- a/equinox-patterns/Domain/Period.fs +++ b/equinox-patterns/Domain/Period.fs @@ -14,9 +14,9 @@ module Events = type ItemIds = { items : ItemId[] } type Balance = ItemIds type Event = - | BroughtForward of Balance - | Added of ItemIds - | CarriedForward of Balance + | BroughtForward of Balance + | Added of ItemIds + | CarriedForward of Balance interface TypeShape.UnionContract.IUnionContract let codec = FsCodec.NewtonsoftJson.Codec.Create() @@ -63,18 +63,18 @@ module Fold = [] type Rules<'request, 'result> = - { getIncomingBalance : unit -> Async - decideIngestion : 'request -> Fold.State -> 'request * 'result * Events.Event list - decideCarryForward : 'request -> Fold.OpenState -> Async } + { getIncomingBalance: unit -> Async + decideIngestion : 'request -> Fold.State -> 'request * 'result * Events.Event list + decideCarryForward: 'request -> Fold.OpenState -> Async } /// The result of the overall ingestion, consisting of type Result<'request, 'result> = { /// residual of the request, in the event where it was not possible to ingest it completely - residual : 'request + residual : 'request /// The result of the decision (assuming processing took place) - result : 'result option + result : 'result option /// balance being carried forward in the event that the successor period has yet to have the BroughtForward event generated - carryForward : Events.Balance option } + carryForward : Events.Balance option } /// Decision function ensuring the high level rules of an Period are adhered to viz. /// 1. Streams must open with a BroughtForward event (obtained via Rules.getIncomingBalance if this is an uninitialized Period) From 49e551b6fac2f5d7057a96266ebdcbb934f447d2 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sun, 9 May 2021 16:11:24 +0100 Subject: [PATCH 19/20] Fixed comments, thanks Rambert! --- equinox-patterns/Domain/ItemIngester.fs | 6 +++++- equinox-patterns/Domain/Period.fs | 5 +++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/equinox-patterns/Domain/ItemIngester.fs b/equinox-patterns/Domain/ItemIngester.fs index 31cf6e70d..bafd04941 100644 --- a/equinox-patterns/Domain/ItemIngester.fs +++ b/equinox-patterns/Domain/ItemIngester.fs @@ -86,7 +86,11 @@ type ServiceForTranche internal (log : Serilog.ILogger, trancheId, epochs : Item /// c) readers will less frequently encounter sustained 429s on the batch let batchedIngest = AsyncBatchingGate(tryIngest, linger) - /// Upon startup, we initialize the ItemIds cache from recent epochs; we want to kick that process off before our first ingest + /// Optional API enabling one to preemptively initialize the cache based on the walk back of previous epochs to load + /// ItemIds for deduplication (which would otherwise add to the latency of the first incoming request(s)). It's safe + /// to ignore the result of this call (i.e., trigger it via Async.Start) as the usual semantics of + /// AsyncBatchingGate pertain:- everyone gets to see the exception from the batch being processed, including + /// that initialization, and a failed attempt is never cached) member _.Initialize() = previousIds.AwaitValue() |> Async.Ignore /// Attempts to feed the items into the sequence of epochs. diff --git a/equinox-patterns/Domain/Period.fs b/equinox-patterns/Domain/Period.fs index 0d89f5464..fba227132 100644 --- a/equinox-patterns/Domain/Period.fs +++ b/equinox-patterns/Domain/Period.fs @@ -131,8 +131,9 @@ type Service internal (resolve : PeriodId -> Equinox.Decider Date: Sun, 9 May 2021 16:22:32 +0100 Subject: [PATCH 20/20] More comment fixes --- equinox-patterns/Domain/ItemEpoch.fs | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/equinox-patterns/Domain/ItemEpoch.fs b/equinox-patterns/Domain/ItemEpoch.fs index 178e98643..1d8ab6b1f 100644 --- a/equinox-patterns/Domain/ItemEpoch.fs +++ b/equinox-patterns/Domain/ItemEpoch.fs @@ -39,17 +39,17 @@ let notAlreadyIn (ids : ItemId seq) = type Result = { accepted : ItemId[]; residual : Events.Item[]; content : ItemId[]; closed : bool } -// Note there aren't ever rejected Items in this implementation; the size of an epoch may actually exceed the capacity -// Pros for not rejecting: -// - snapshots should compress well -// - we want to avoid a second roundtrip -// - splitting a batched write into multiple writes with multiple events misrepresents the facts -// i.e. we did not have 10 items 2s ago and 3 just now - we had 13 2s ago let decide capacity candidates (currentIds, closed as state) = match closed, candidates |> Array.filter (notAlreadyIn currentIds) with | true, freshCandidates -> { accepted = [||]; residual = freshCandidates; content = currentIds; closed = closed }, [] | false, [||] -> { accepted = [||]; residual = [||]; content = currentIds; closed = closed }, [] | false, freshItems -> + // NOTE we in some cases end up triggering splitting of a request (or set of requests coalesced in the AsyncBatchingGate) + // In some cases it might be better to be a little tolerant and not be rigid about limiting things as + // - snapshots should compress well (no major incremental cost for a few more items) + // - its always good to avoid a second store roundtrip + // - splitting a batched write into multiple writes with multiple events misrepresents the facts i.e. we did + // not have 10 items 2s ago and 3 just now - we had 13 2s ago let capacityNow = capacity freshItems currentIds let acceptingCount = min capacityNow freshItems.Length let closing = acceptingCount = capacityNow @@ -61,9 +61,14 @@ let decide capacity candidates (currentIds, closed as state) = { accepted = addedItemIds; residual = residualItems; content = currentIds; closed = closed }, events /// Used by the Ingester to manages ingestion of items into the epoch, i.e. the Write side -type IngestionService internal (capacity, resolve : ItemTrancheId * ItemEpochId -> Equinox.Decider) = +type IngestionService internal + ( capacity : Events.Item[] -> ItemId[] -> int, // let outer layers decide how many candidate items to accept given supplied Epoch state + resolve : ItemTrancheId * ItemEpochId -> Equinox.Decider) = /// Obtains a complete list of all the items in the specified trancheId/epochId + /// NOTE this exposes stale cached state - which is OK for the ingestion use case it needs to deal with such stale + /// state as part of handling normal race conditions, but should not be passed to anyone that's going to infer the + /// current state or version of the Epoch member _.ReadIds(trancheId, epochId) : Async = let decider = resolve (trancheId, epochId) decider.Query fst @@ -72,6 +77,9 @@ type IngestionService internal (capacity, resolve : ItemTrancheId * ItemEpochId /// and facilitate deduplication of incoming items in order to avoid null store round-trips where possible member _.Ingest(trancheId, epochId, items) : Async = let decider = resolve (trancheId, epochId) + /// NOTE decider which will initially transact against potentially stale cached state, which will trigger a + /// resync if another writer has gotten in before us. This is a conscious decision in this instance; the bulk + /// of writes are presumed to be coming from within this same process decider.Transact(decide capacity items) let private create capacity resolveStream = @@ -99,7 +107,7 @@ module Cosmos = /// Custom Fold and caching logic compared to the IngesterService /// - When reading, we want the full Items /// - Caching only for one minute -/// - There's no value in using the snapshot +/// - There's no value in using the snapshot as it does not have the full state module Reader = type ReadState = Events.Item[] * bool