From 4025bb0b4c846e70c7dbd425981796b49e00c5ee Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Tue, 18 Feb 2020 08:44:25 +0000 Subject: [PATCH] Complete Process Manager Apply --- equinox-fc/Domain/InventoryTransaction.fs | 78 ++++++++++------------- 1 file changed, 35 insertions(+), 43 deletions(-) diff --git a/equinox-fc/Domain/InventoryTransaction.fs b/equinox-fc/Domain/InventoryTransaction.fs index 7d5c4d988..a0088fcab 100644 --- a/equinox-fc/Domain/InventoryTransaction.fs +++ b/equinox-fc/Domain/InventoryTransaction.fs @@ -33,7 +33,8 @@ module Events = | Added of Added (* Successful completion *) - | Completed // terminal + | Logged + | Completed interface TypeShape.UnionContract.IUnionContract let codec = FsCodec.NewtonsoftJson.Codec.Create() @@ -42,6 +43,7 @@ module Fold = type State = | Initial | Running of RunningState + | Logging of TerminalState | Completed of TerminalState and RunningState = | Adjust of Events.AdjustmentRequested @@ -61,10 +63,10 @@ module Fold = let evolve state event = match state, event with (* Adjustment Process *) - | Initial, Events.AdjustmentRequested e -> - Running (Adjust e) - | Running (Adjust s), Events.Adjusted -> - Completed (Adjusted s) + | Initial, Events.AdjustmentRequested r -> + Running (Adjust r) + | Running (Adjust r), Events.Adjusted -> + Logging (Adjusted r) (* Transfer Process *) | Initial, Events.TransferRequested e -> @@ -73,57 +75,47 @@ module Fold = | Running (Transfer (Requested s)), Events.Failed -> Completed (TransferFailed s) - | Running (Transfer (Requested s)), Events.Removed e as ee -> + | Running (Transfer (Requested s)), Events.Removed e -> Running (Transfer (Adding { request = s; removed = e })) - | Running (Transfer (Adding s)), Events.Added e as ee -> + | Running (Transfer (Adding s)), Events.Added e -> Running (Transfer (Added { request = s.request; removed = s.removed; added = e })) - | Running (Transfer (Added s)), Events.Completed as ee -> - Completed (Transferred s) + | Running (Transfer (Added s)), Events.Completed -> + Logging (Transferred s) + + (* Log result *) + | Logging s, Events.Logged -> + Completed s (* Any disallowed state changes represent gaps in the model, so we fail fast *) | state, event -> failwithf "Unexpected %A when %A" event state let fold : State -> Events.Event seq -> State = Seq.fold evolve -type Command = - | Adjust of Events.AdjustmentRequested - | Transfer of Events.TransferRequested - -type Result = { complete : bool; } - -/// Holds events accumulated from a series of decisions while also evolving the presented `state` to reflect the pended events -type private Accumulator() = - let acc = ResizeArray() - member __.Ingest(state) : 'res * Events.Event list -> 'res * Fold.State = function - | res, [] -> res, state - | res, [e] -> acc.Add e; res, Fold.evolve state e - | res, xs -> acc.AddRange xs; res, Fold.fold state (Seq.ofList xs) - member __.Accumulated = List.ofSeq acc - -type Update = - | AdjustmentCompleted - -let decide command updates (state : Fold.State) : Result * Events.Event list = - - let acc = Accumulator() - let started, state = - acc.Ingest state <| - match state, command with - | Fold.Initial, Adjust e -> true, [ Events.AdjustmentRequested e ] - | Fold.Initial, Transfer e -> true, [ Events.TransferRequested e ] - - // TOCONSIDER validate conflicts - - | _ -> false, [] - - { complete = false }, acc.Accumulated + /// Validates an event actually represents an acceptable, non-redundant state transition + let filter event state = + match state, event with + | Initial, Events.AdjustmentRequested _ + | Initial, Events.TransferRequested _ + | Running (Adjust _), Events.Adjusted + | Running (Transfer (Requested _)), Events.Failed + | Running (Transfer (Requested _)), Events.Removed _ + | Running (Transfer (Adding _)), Events.Added _ + | Logging _, Events.Logged -> + [event] + | _ -> [] + +/// Given an event from the Process's timeline, yields the State, in order that it can be completed +let decide update (state : Fold.State) : Fold.State * Events.Event list = + let events = Fold.filter update state + let state' = Fold.fold state events + state', events type Service internal (log, resolve, maxAttempts) = let resolve (Events.For streamId) = Equinox.Stream(log, resolve streamId, maxAttempts) - member __.IngestShipped(inventoryId, transactionId, command, updates) : Async = + member __.Apply(transactionId, update) : Async = let stream = resolve transactionId - stream.Transact(decide command updates) + stream.Transact(decide update) let createService resolve = Service(Serilog.Log.ForContext(), resolve, maxAttempts = 2)