Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EventStoreSource #1

Merged
merged 2 commits into from
Jun 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ The `Unreleased` section name is replaced by the expected version of next releas
## [Unreleased]

### Added

- `Propulsion.EventStore.EventStoreSource` (productized from `Equinox.Templates`'s `eqxsync`)

### Changed

- Targets `Microsoft.Azure.DocumentDB.ChangeFeedProcessor` v `2.2.7`, which includes critical lease management improvements
Expand Down
5 changes: 3 additions & 2 deletions src/Propulsion.Cosmos/CosmosSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,10 @@ type CosmosSink =
let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.)
let projectionStats = Internal.CosmosStats(log.ForContext<Internal.CosmosStats>(), categorize, statsInterval, stateInterval)
let dispatcher = Propulsion.Streams.Scheduling.Dispatcher<_>(maxConcurrentStreams)
let dumpStats (s : Scheduling.StreamStates<_>) l = s.Dump(l, Propulsion.Streams.Buffering.StreamState.eventsSize, categorize)
let streamScheduler = Internal.CosmosSchedulingEngine.Create(log, cosmosContexts, dispatcher, projectionStats, dumpStats)
let dumpStreams (s : Scheduling.StreamStates<_>) l = s.Dump(l, Propulsion.Streams.Buffering.StreamState.eventsSize, categorize)
let streamScheduler = Internal.CosmosSchedulingEngine.Create(log, cosmosContexts, dispatcher, projectionStats, dumpStreams)
let mapBatch onCompletion (x : Submission.SubmissionBatch<StreamEvent<_>>) : Scheduling.StreamsBatch<_> =
let onCompletion () = x.onCompletion(); onCompletion()
Scheduling.StreamsBatch.Create(onCompletion, x.messages) |> fst
let submitBatch (x : Scheduling.StreamsBatch<_>) : int =
streamScheduler.Submit x
Expand Down
110 changes: 110 additions & 0 deletions src/Propulsion.EventStore/Checkpoint.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
module Propulsion.EventStore.Checkpoint

open FSharp.UMX
open System // must shadow UMX to use DateTimeOffSet

type CheckpointSeriesId = string<checkpointSeriesId>
and [<Measure>] checkpointSeriesId
module CheckpointSeriesId = let ofGroupName (groupName : string) = UMX.tag groupName

// NB - these schemas reflect the actual storage formats and hence need to be versioned with care
module Events =
type Checkpoint = { at: DateTimeOffset; nextCheckpointDue: DateTimeOffset; pos: int64 }
type Config = { checkpointFreqS: int }
type Started = { config: Config; origin: Checkpoint }
type Checkpointed = { config: Config; pos: Checkpoint }
type Unfolded = { config: Config; state: Checkpoint }
type Event =
| Started of Started
| Checkpointed of Checkpointed
| Overrode of Checkpointed
| [<System.Runtime.Serialization.DataMember(Name="state-v1")>]
Unfolded of Unfolded
interface TypeShape.UnionContract.IUnionContract

module Folds =
type State = NotStarted | Running of Events.Unfolded

let initial : State = NotStarted
let private evolve _ignoreState = function
| Events.Started { config = cfg; origin=originState } -> Running { config = cfg; state = originState }
| Events.Checkpointed e | Events.Overrode e -> Running { config = e.config; state = e.pos }
| Events.Unfolded runningState -> Running runningState
let fold (state: State) = Seq.fold evolve state
let isOrigin _state = true // we can build a state from any of the events and/or an unfold
let unfold state =
match state with
| NotStarted -> failwith "should never produce a NotStarted state"
| Running state -> Events.Unfolded {config = state.config; state=state.state}

/// We only want to generate a first class event every N minutes, while efficiently writing contingent on the current etag value
//let postProcess events state =
// let checkpointEventIsRedundant (e: Events.Checkpointed) (s: Events.Unfolded) =
// s.state.nextCheckpointDue = e.pos.nextCheckpointDue
// && s.state.pos <> e.pos.pos
// match events, state with
// | [Events.Checkpointed e], (Running state as s) when checkpointEventIsRedundant e state ->
// [],unfold s
// | xs, state ->
// xs,unfold state

type Command =
| Start of at: DateTimeOffset * checkpointFreq: TimeSpan * pos: int64
| Override of at: DateTimeOffset * checkpointFreq: TimeSpan * pos: int64
| Update of at: DateTimeOffset * pos: int64

module Commands =
let interpret command (state : Folds.State) =
let mkCheckpoint at next pos = { at=at; nextCheckpointDue = next; pos = pos } : Events.Checkpoint
let mk (at : DateTimeOffset) (interval: TimeSpan) pos : Events.Config * Events.Checkpoint=
let freq = int interval.TotalSeconds
let next = at.AddSeconds(float freq)
{ checkpointFreqS = freq }, mkCheckpoint at next pos
match command, state with
| Start (at, freq, pos), Folds.NotStarted ->
let config, checkpoint = mk at freq pos
[Events.Started { config = config; origin = checkpoint}]
| Override (at, freq, pos), Folds.Running _ ->
let config, checkpoint = mk at freq pos
[Events.Overrode { config = config; pos = checkpoint}]
| Update (at,pos), Folds.Running state ->
// Force a write every N seconds regardless of whether the position has actually changed
if state.state.pos = pos && at < state.state.nextCheckpointDue then [] else
let freq = TimeSpan.FromSeconds <| float state.config.checkpointFreqS
let config, checkpoint = mk at freq pos
[Events.Checkpointed { config = config; pos = checkpoint}]
| c, s -> failwithf "Command %A invalid when %A" c s

type Service(log, resolveStream, ?maxAttempts) =
let (|AggregateId|) (id : CheckpointSeriesId) = Equinox.AggregateId ("Sync", % id)
let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolveStream id, defaultArg maxAttempts 3)
let execute (Stream stream) cmd = stream.Transact(Commands.interpret cmd)

/// Determines the present state of the CheckpointSequence
member __.Read(Stream stream) =
stream.Query id

/// Start a checkpointing series with the supplied parameters
/// NB will fail if already existing; caller should select to `Start` or `Override` based on whether Read indicates state is Running Or NotStarted
member __.Start(id, freq: TimeSpan, pos: int64) =
execute id <| Command.Start(DateTimeOffset.UtcNow, freq, pos)

/// Override a checkpointing series with the supplied parameters
/// NB fails if not already initialized; caller should select to `Start` or `Override` based on whether Read indicates state is Running Or NotStarted
member __.Override(id, freq: TimeSpan, pos: int64) =
execute id <| Command.Override(DateTimeOffset.UtcNow, freq, pos)

/// Ingest a position update
/// NB fails if not already initialized; caller should ensure correct initialization has taken place via Read -> Start
member __.Commit(id, pos: int64) =
execute id <| Command.Update(DateTimeOffset.UtcNow, pos)

// General pattern is that an Equinox Service is a singleton and calls pass an inentifier for a stream per call
// This light wrapper means we can adhere to that general pattern yet still end up with lef=gible code while we in practice only maintain a single checkpoint series per running app
type CheckpointSeries(name, log, resolveStream) =
let seriesId = CheckpointSeriesId.ofGroupName name
let inner = Service(log, resolveStream)
member __.Read = inner.Read seriesId
member __.Start(freq, pos) = inner.Start(seriesId, freq, pos)
member __.Override(freq, pos) = inner.Override(seriesId, freq, pos)
member __.Commit(pos) = inner.Commit(seriesId, pos)
Loading