Skip to content

Commit

Permalink
Release 4.0.0-rc.14
Browse files Browse the repository at this point in the history
Target FsCodec 3.0.0-rc.13
Tweaks
  • Loading branch information
bartelink committed Sep 11, 2023
1 parent 832d5e9 commit 84a6f95
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 88 deletions.
117 changes: 78 additions & 39 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -315,19 +315,22 @@ highly recommended to use the following canonical skeleton layout:
```fsharp
module Aggregate
module Stream =
module private Stream =
let [<Literal>] Category = "category"
let id = FsCodec.StreamId.gen Id.toString
(* Optionally, Helpers/Types *)
(* Optionally (rarely) Helpers/Types *)
// NOTE - these types and the union case names reflect the actual storage
// formats and hence need to be versioned with care
[<RequiredQualifiedAccess>]
module Events =
type Snapshotted = ... // NOTE: event body types past tense with same name as case
type Event =
| ...
| [<DataMember(Name = "Snapshotted">] Snapshotted of Snapshotted // NOTE: Snapshotted event explictly named to remind one can/should version it
// optionally: `encode`, `tryDecode` (only if you're doing manual decoding)
let codec = FsCodec ... Codec.Create<Event>(...)
```
Expand All @@ -339,6 +342,44 @@ Some notes about the intents being satisfied here:
sibling code in adjacent `module`s should not be using them directly (in
general interaction should be via the `type Service`)

✅ DO keep `module Stream` visibility `private`, present via `module Reactions`

If the composition of stream names is relevant for Reactions processing, expose relevant helpers in a `module Reactions` facade.
For instance, rather than having external reaction logic refer to `Aggregate.Stream.Category`, expose a facade such as:

```fsharp
module Reactions =
let streamName = Stream.name
let deletionNamePrefix tenantIdStr = $"%s{Stream.Category}-%s{tenantIdStr}"
```

✅ DO use tupled arguments for the `Stream.id` function

All the inputs of which the `StreamId` is composed should be represented as one argument:

```fsharp
✅ let id struct (tenantId, clientId) = FsCodec.StreamId.gen2 TenantId.toString ClientId.toString (tenantId, clientId)
✅ let id = FsCodec.StreamId.gen2 TenantId.toString ClientId.toString
❌ let id tenantId clientId = FsCodec.StreamId.gen2 TenantId.toString ClientId.toString (tenantId, clientId)
```

✅ DO keep `module Stream` visibility `private`, present via `module Reactions`

If the composition of stream names is relevant for Reactions processing, expose relevant helpers in a `module Reactions` facade.
For instance, rather than having external reaction logic refer to `Aggregate.Stream.Category`, expose a facade such as:

```fsharp
module Reactions =
let streamName = Stream.name
let deletionNamePrefix tenantIdStr = $"%s{Stream.Category}-%s{tenantIdStr}"
let [<return: Struct>] (|For|_|) = Stream.tryDecode
let [<return: Struct>] (|Decode|_|) = function
| struct (For id, _) & Streams.Decode dec events -> ValueSome struct (id, events)
| _ -> ValueNone
```

```fsharp
module Fold =
Expand Down Expand Up @@ -411,17 +452,18 @@ either within the `module Aggregate`, or somewhere outside closer to the

```fsharp
let defaultCacheDuration = System.TimeSpan.FromMinutes 20.
let cacheStrategy = Equinox.CosmosStore.CachingStrategy.SlidingWindow (cache, defaultCacheDuration)
let cacheStrategy cache = Equinox.CosmosStore.CachingStrategy.SlidingWindow (cache, defaultCacheDuration)
module EventStore =
let accessStrategy = Equinox.EventStoreDb.AccessStrategy.RollingSnapshots (Fold.isOrigin, Fold.snapshot)
let accessStrategy = Equinox.EventStoreDb.AccessStrategy.RollingSnapshots Fold.Snapshot.config
let category (context, cache) =
Equinox.EventStore.EventStoreCategory(context, Stream.Category, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
Equinox.EventStore.EventStoreCategory(context, Stream.Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy cache)
module Cosmos =
let accessStrategy = Equinox.CosmosStore.AccessStrategy.Snapshot Fold.Snapshot.config
let category (context, cache) =
Equinox.CosmosStore.CosmosStoreCategory(context, Stream.Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy)
Equinox.CosmosStore.CosmosStoreCategory(context, Stream.Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy cache)
```

### `MemoryStore` Storage Binding Module

Expand All @@ -432,7 +474,7 @@ can use the `MemoryStore` in the context of your tests:
```fsharp
module MemoryStore =
let category (store: Equinox.MemoryStore.VolatileStore) =
Equinox.MemoryStore.MemoryStoreCategory(store, Category, Events.codec, Fold.fold, Fold.initial)
Equinox.MemoryStore.MemoryStoreCategory(store, Stream.Category, Events.codec, Fold.fold, Fold.initial)
```

Typically that binding module can live with your test helpers rather than
Expand Down Expand Up @@ -479,7 +521,7 @@ events on a given category of stream:
tests and used to parameterize the Category's storage configuration._.
Sometimes named `apply`)

- `interpret: (context/command etc ->) 'state -> event' list` or `decide: (context/command etc ->) 'state -> 'result*'event list`: responsible for _Deciding_ (in an [idempotent](https://en.wikipedia.org/wiki/Idempotence) manner) how the intention represented by `context/command` should be mapped with regard to the provided `state` in terms of:
- `interpret: (context/command etc ->) 'state -> 'event[]` or `decide: (context/command etc ->) 'state -> 'result * 'event[]`: responsible for _Deciding_ (in an [idempotent](https://en.wikipedia.org/wiki/Idempotence) manner) how the intention represented by `context/command` should be mapped with regard to the provided `state` in terms of:
a) the `'events` that should be written to the stream to record the decision
b) (for the `'result` in the `decide` signature) any response to be returned to the invoker (NB returning a result likely represents a violation of the [CQS](https://en.wikipedia.org/wiki/Command%E2%80%93query_separation) and/or CQRS principles, [see Synchronous Query in the Glossary](#glossary))

Expand Down Expand Up @@ -574,10 +616,10 @@ type Command =
| Remove of itemId: int
let interpret command state =
let has id = state |> List.exits (is id)
let has id = state |> List.exists (is id)
match command with
| Add item -> if has item.id then [] else [Added item]
| Remove id -> if has id then [Removed id] else []
| Add item -> if has item.id then [||] else [| Added item |]
| Remove id -> if has id then [| Removed id |] else [||]
(*
* Optional: Snapshot/Unfold-related functions to allow establish state
Expand Down Expand Up @@ -710,15 +752,15 @@ follow!

```fsharp
type Equinox.Decider(...) =
StoreIntegration
// Run interpret function with present state, retrying with Optimistic Concurrency
member _.Transact(interpret: State -> Event list): Async<unit>
member _.Transact(interpret: 'state -> 'event[]): Async<unit>
// Run decide function with present state, retrying with Optimistic Concurrency, yielding Result on exit
member _.Transact(decide: State -> Result*Event list): Async<Result>
member _.Transact(decide: 'state -> 'result * 'event[]): Async<'result>
// Runs a Null Flow that simply yields a `projection` of `Context.State`
member _.Query(projection: State -> View): Async<View>
member _.Query(projection: 'state -> 'view): Async<'view>
```

### Favorites walkthrough
Expand Down Expand Up @@ -789,8 +831,8 @@ type Command =
| Remove of string
let interpret command state =
match command with
| Add sku -> if state |> List.contains sku then [] else [Added sku]
| Remove sku -> if state |> List.contains sku |> not then [] else [Removed sku]
| Add sku -> if state |> List.contains sku then [||] else [| Added sku |]
| Remove sku -> if state |> List.contains sku |> not then [||] else [| Removed sku |]
```

Command handling should almost invariably be implemented in an
Expand Down Expand Up @@ -1006,13 +1048,13 @@ let fold = Array.fold evolve
type Command = Add of Todo | Update of Todo | Delete of id: int | Clear
let interpret c (state: State) =
match c with
| Add value -> [Added { value with id = state.nextId }]
| Add value -> [| Added { value with id = state.nextId } |]
| Update value ->
match state.items |> List.tryFind (function { id = id } -> id = value.id) with
| Some current when current <> value -> [Updated value]
| _ -> []
| Delete id -> if state.items |> List.exists (fun x -> x.id = id) then [Deleted id] else []
| Clear -> if state.items |> List.isEmpty then [] else [Cleared]
| Some current when current <> value -> [| Updated value |]
| _ -> [||]
| Delete id -> if state.items |> List.exists (fun x -> x.id = id) then [| Deleted id |] else [||]
| Clear -> if state.items |> List.isEmpty then [||] else [| Cleared |]
```

- Note `Add` does not adhere to the normal idempotency constraint, being
Expand Down Expand Up @@ -1130,7 +1172,7 @@ In this case, the Decision Process is `interpret`ing the _Command_ in the
context of a `'state`.

The function signature is:
`let interpret (context, command, args) state: Events.Event list`
`let interpret (context, command, args) state: Events.Event[]`

Note the `'state` is the last parameter; it's computed and supplied by the
Equinox Flow.
Expand All @@ -1147,12 +1189,12 @@ conflicting write have taken place since the loading of the state_

```fsharp
let interpret (context, command) state: Events.Event list =
let interpret (context, command) state: Events.Event[] =
match tryCommand context command state with
| None ->
[] // not relevant / already in effect
[||] // not relevant / already in effect
| Some eventDetails -> // accepted, mapped to event details record
[Event.HandledCommand eventDetails]
[| Events.HandledCommand eventDetails |]
type Service internal (resolve: ClientId -> Equinox.Decider<Events.Event, Fold.State>)
Expand Down Expand Up @@ -1180,20 +1222,20 @@ signature: you're both potentially emitting events and yielding an outcome or
projecting some of the 'state'.

In this case, the signature is: `let decide (context, command, args) state:
'result * Events.Event list`
'result * Events.Event[]`

Note that the return value is a _tuple_ of `('result,Event list):
Note that the return value is a _tuple_ of `('result, Events.Event[])`:
- the `fst` element is returned from `decider.Transact`
- the `snd` element of the tuple represents the events (if any) that should
represent the state change implied by the request.with
represent the state change implied by the request.

Note if the decision function yields events, and a conflict is detected, the
flow may result in the `decide` function being rerun with the conflicting state
until either no events are emitted, or there were on further conflicting writes
supplied by competing writers.

```fsharp
let decide (context, command) state: int * Events.Event list =
let decide (context, command) state: int * Events.Event[] =
// ... if `snd` contains event, they are written
// `fst` (an `int` in this instance) is returned as the outcome to the caller
Expand Down Expand Up @@ -1259,7 +1301,7 @@ let validateInterpret contextAndOrArgsAndOrCommand state =
let validateIdempotent contextAndOrArgsAndOrCommand state' =
let events' = interpret contextAndOrArgsAndOrCommand state'
match events' with
| [|] -> ()
| [||] -> ()
// TODO add clauses to validate edge cases that should still generate events on a re-run
| xs -> failwithf "Not idempotent; Generated %A in response to %A" xs contextAndOrArgsAndOrCommand
```
Expand Down Expand Up @@ -1317,7 +1359,7 @@ type Service internal (resolve: CartId -> Equinox.Decider<Events.Event, Fold.Sta
member _.Run(cartId, optimistic, commands: Command seq, ?prepare): Async<Fold.State> =
let decider = resolve cartId
let opt = if optimistic then Equinox.AnyCachedValue else Equinox.RequireLoad
let opt = if optimistic then Equinox.LoadOption.AnyCachedValue else Equinox.LoadOption.RequireLoad
decider.Transact(fun state -> async {
match prepare with None -> () | Some prep -> do! prep
return interpretMany Fold.fold (Seq.map interpret commands) state }, opt)
Expand Down Expand Up @@ -1379,7 +1421,7 @@ type Accumulator<'event, 'state>(fold: 'state -> 'event[] -> 'state, originState
type Service ... =
member _.Run(cartId, optimistic, commands: Command seq, ?prepare): Async<Fold.State> =
let decider = resolve cartId
let opt = if optimistic then Equinox.AnyCachedValue else Equinox.RequireLoad
let opt = if optimistic then Equinox.LoadOption.AnyCachedValue else Equinox.LoadOption.RequireLoad
decider.Transact(fun state -> async {
match prepare with None -> () | Some prep -> do! prep
let acc = Accumulator(Fold.fold, state)
Expand Down Expand Up @@ -1453,11 +1495,8 @@ Key aspects relevant to the Equinox programming model:
- In general, EventStore provides excellent caching and performance
characteristics intrinsically by virtue of its design

- Projections can be managed by either tailing streams (including the synthetic
`$all` stream) or using the Projections facility - there's no obvious reason
to wrap it, aside from being able to uniformly target CosmosDB (i.e. one
could build an `Equinox.EventStore.Projection` library and an `eqx project
stats es` with very little code).
- Projections can be managed by the `Propulsion.EventStoreDb` library; there is also
an `eqx project stats es` feature).

- In general event streams should be considered append only, with no mutations
or deletes
Expand Down Expand Up @@ -2339,7 +2378,7 @@ For Domain Events in an event-sourced model, their permanence and immutability i

It should be noted with regard to such requirements:
- EventStoreDB does not present any APIs for mutation of events, though deleting events is a fully supported operation (although that can be restricted). Rewrites are typically approached by doing an offline database rebuild.
- `Equinox.Cosmos` and `Equinox.CosmosStore` include support for pruning events (only) from the head of a stream. Obviously, there's nothing stopping you deleting or altering the Batch documents out of band via the underlying CosmosDB APIs directly (Note however that the semantics of document ordering within a logical partition means its strongly advised not to mutate any event Batch documents as this will cause their ordering to become incorrect relative to other events, invalidating a key tenet that Change Feed Processors rely on).
- `Equinox.CosmosStore` includes support for pruning events (only) from the head of a stream. Obviously, there's nothing stopping you deleting or altering the Batch documents out of band via the underlying CosmosDB APIs directly (Note however that the semantics of document ordering within a logical partition means its strongly advised not to mutate any event Batch documents as this will cause their ordering to become incorrect relative to other events, invalidating a key tenet that Change Feed Processors rely on).

### Growth handling strategies

Expand Down
2 changes: 1 addition & 1 deletion samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Store(store) =
CosmosStore.CosmosStoreCategory<'event,'state,_>(store, name, codec.ToJsonElementCodec(), fold, initial, accessStrategy, caching)
| Store.Config.Dynamo (store, caching, unfolds) ->
let accessStrategy = if unfolds then DynamoStore.AccessStrategy.Snapshot snapshot else DynamoStore.AccessStrategy.Unoptimized
DynamoStore.DynamoStoreCategory<'event,'state,_>(store, name, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, accessStrategy, caching)
DynamoStore.DynamoStoreCategory<'event,'state,_>(store, name, FsCodec.Compression.EncodeTryCompress codec, fold, initial, accessStrategy, caching)
| Store.Config.Es (context, caching, unfolds) ->
let accessStrategy = if unfolds then EventStoreDb.AccessStrategy.RollingSnapshots snapshot else EventStoreDb.AccessStrategy.Unoptimized
EventStoreDb.EventStoreCategory<'event,'state,_>(context, name, codec, fold, initial, accessStrategy, caching)
Expand Down
2 changes: 1 addition & 1 deletion samples/Infrastructure/Store.fs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ module Dynamo =

let config (log : ILogger) (cache, unfolds) (a : Arguments) =
a.Connector.LogConfiguration(log)
let client = a.Connector.CreateDynamoDbClient() |> DynamoStoreClient
let client = a.Connector.CreateDynamoStoreClient()
let context = DynamoStoreContext(client, a.Table, maxBytes = a.TipMaxBytes, queryMaxItems = a.QueryMaxItems,
?tipMaxEvents = a.TipMaxEvents, ?archiveTableName = a.ArchiveTable)
context.LogConfiguration(log, "Main", a.Table, ?archiveTableName = a.ArchiveTable)
Expand Down
4 changes: 2 additions & 2 deletions samples/Store/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
<ItemGroup>
<PackageReference Include="FSharp.Core" Version="6.0.7" ExcludeAssets="contentfiles" />

<PackageReference Include="FsCodec.NewtonsoftJson" Version="3.0.0-rc.10.1" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.10.1" />
<PackageReference Include="FsCodec.NewtonsoftJson" Version="3.0.0-rc.13" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.13" />

<ProjectReference Include="..\..\..\src\Equinox\Equinox.fsproj" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Tutorial.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.10.1" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.13" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.1.0" />
<PackageReference Include="Serilog.Sinks.Seq" Version="5.2.0" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Upload.fs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ let decide (value : UploadId) (state : Fold.State) : Choice<UploadId,UploadId> *
| None -> Choice1Of2 value, [| Events.IdAssigned { value = value} |]
| Some value -> Choice2Of2 value, [||]

type Service internal (resolve : CompanyId * PurchaseOrderId -> Equinox.Decider<Events.Event, Fold.State>) =
type Service internal (resolve: struct (CompanyId * PurchaseOrderId) -> Equinox.Decider<Events.Event, Fold.State>) =

member _.Sync(companyId, purchaseOrderId, value) : Async<Choice<UploadId,UploadId>> =
let decider = resolve (companyId, purchaseOrderId)
Expand Down
4 changes: 2 additions & 2 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1347,8 +1347,8 @@ type CosmosStoreCategory<'event, 'state, 'req> =
match access with
| AccessStrategy.Unoptimized -> (fun _ -> false), false, Choice1Of3 ()
| AccessStrategy.LatestKnownEvent -> (fun _ -> true), true, Choice2Of3 (fun events _ -> events |> Array.last |> Array.singleton)
| AccessStrategy.Snapshot (isOrigin, toSnapshot) -> isOrigin, true, Choice2Of3 (fun _ state -> toSnapshot state |> Array.singleton)
| AccessStrategy.MultiSnapshot (isOrigin, unfold) -> isOrigin, true, Choice2Of3 (fun _ state -> unfold state)
| AccessStrategy.Snapshot (isOrigin, toSnapshot) -> isOrigin, true, Choice2Of3 (fun _ -> toSnapshot >> Array.singleton)
| AccessStrategy.MultiSnapshot (isOrigin, unfold) -> isOrigin, true, Choice2Of3 (fun _ -> unfold)
| AccessStrategy.RollingState toSnapshot -> (fun _ -> true), true, Choice3Of3 (fun _ state -> Array.empty, toSnapshot state |> Array.singleton)
| AccessStrategy.Custom (isOrigin, transmute) -> isOrigin, true, Choice3Of3 transmute
{ inherit Equinox.Category<'event, 'state, 'req>(name,
Expand Down
Loading

0 comments on commit 84a6f95

Please sign in to comment.