Skip to content

Commit

Permalink
MessageDb: propulsion checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Nov 18, 2022
1 parent 993ffd0 commit f2d75ba
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 160 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Propulsion.EventStoreDb`: Ported `EventStore` to target `Equinox.EventStore` >= `4.0.0` (using the gRPC interface) [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.CosmosStore3`: Special cased version of `Propulsion.CosmosStore` to target `Equinox.CosmosStore` v `[3.0.7`-`3.99.0]` **Deprecated; Please migrate to `Propulsion.CosmosStore` by updating `Equinox.CosmosStore` dependencies to `4.0.0`** [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.DynamoStore`: `Equinox.CosmosStore`-equivalent functionality for `Equinox.DynamoStore`. Combines elements of `CosmosStore`, `SqlStreamStore`, `Feed` [#140](https://github.com/jet/propulsion/pull/143) [#140](https://github.com/jet/propulsion/pull/143) [#177](https://github.com/jet/propulsion/pull/177)
- `Propulsion.MessageDb`: `FeedSource` for [MessageDb](http://docs.eventide-project.org/user-guide/message-db/) [#181](https://github.com/jet/propulsion/pull/181) :pray: [@nordfjord](https://github.com/nordfjord)
- `Propulsion.MessageDb`: `FeedSource` and `CheckpointStore` for [MessageDb](http://docs.eventide-project.org/user-guide/message-db/) [#181](https://github.com/jet/propulsion/pull/181) :pray: [@nordfjord](https://github.com/nordfjord)
- `Propulsion.MemoryStore`: `MemoryStoreSource` to align with other sources for integration testing. Includes *deterministic* `AwaitCompletion` as per `Propulsion.Feed`-based Sources [#165](https://github.com/jet/propulsion/pull/165)
- `Propulsion.SqlStreamStore`: Added `startFromTail` [#173](https://github.com/jet/propulsion/pull/173)
- `Propulsion.Tool`: `checkpoint` commandline option; enables viewing or overriding checkpoints [#141](https://github.com/jet/propulsion/pull/141)
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ The ubiquitous `Serilog` dependency is solely on the core module, not any sinks.

- `Propulsion.Tool` [![Tool NuGet](https://img.shields.io/nuget/v/Propulsion.Tool.svg)](https://www.nuget.org/packages/Propulsion.Tool/): Tool used to initialize a Change Feed Processor `aux` container for `Propulsion.Cosmos` and demonstrate basic projection, including to Kafka. See [quickstart](#quickstart).

- CosmosDB: Initialize `-aux` Container for ChangeFeedProcessor
- CosmosDB/DynamoStore/EventStoreDB/Feed/SqlStreamStore: adjust checkpoints
- CosmosDB/DynamoStore/EventStoreDB/MessageDb: walk change feeds/indexes and/or project to Kafka
- DynamoStore: validate and/or reindex DynamoStore Index
- MessageDb: Initialize a checkpoints table in a Postgres Database
- `init` CosmosDB: Initialize an `-aux` Container for ChangeFeedProcessor
- `initpg` : MessageDb: Initialize a checkpoints table in a Postgres Database
- `index`: DynamoStore: validate and/or reindex DynamoStore Index
- `checkpoint`: CosmosDB/DynamoStore/EventStoreDB/Feed/SqlStreamStore: adjust checkpoints in DynamoStore/CosmosStore/Postgres
- `project`: CosmosDB/DynamoStore/EventStoreDB/MessageDb: walk change feeds/indexes and/or project to Kafka

## Deprecated components

Expand Down
13 changes: 0 additions & 13 deletions src/Propulsion.MessageDb/Internal.fs

This file was deleted.

74 changes: 40 additions & 34 deletions src/Propulsion.MessageDb/MessageDbSource.fs
Original file line number Diff line number Diff line change
@@ -1,24 +1,31 @@
namespace Propulsion.MessageDb

open FSharp.Control
open FsCodec
open FsCodec.Core
open FSharp.Control
open NpgsqlTypes
open Propulsion.Feed
open Propulsion.Feed.Core
open Propulsion.Internal
open System
open System.Data.Common
open System.Diagnostics

module internal Npgsql =

let connect connectionString ct = task {
let conn = new Npgsql.NpgsqlConnection(connectionString)
do! conn.OpenAsync(ct)
return conn }

module Internal =

open Propulsion.Feed
open System.Threading.Tasks
open Propulsion.Infrastructure // AwaitTaskCorrect

module Core =
type MessageDbCategoryClient(connectionString) =
let connect = Npgsql.connect connectionString
let parseRow (reader: DbDataReader) =
let readNullableString idx = if reader.IsDBNull(idx) then None else Some (reader.GetString idx)
let streamName = reader.GetString(8)
let event = TimelineEvent.Create(
let event = FsCodec.Core.TimelineEvent.Create(
index = reader.GetInt64(0),
eventType = reader.GetString(1),
data = ReadOnlyMemory(Text.Encoding.UTF8.GetBytes(reader.GetString 2)),
Expand All @@ -28,9 +35,9 @@ module Core =
?causationId = readNullableString 6,
context = reader.GetInt64(9),
timestamp = DateTimeOffset(DateTime.SpecifyKind(reader.GetDateTime(7), DateTimeKind.Utc)))

struct(StreamName.parse streamName, event)
member _.ReadCategoryMessages(category: TrancheId, fromPositionInclusive: int64, batchSize: int, ct) = task {

member _.ReadCategoryMessages(category: TrancheId, fromPositionInclusive: int64, batchSize: int, ct) : Task<Propulsion.Feed.Core.Batch<_>> = task {
use! conn = connect ct
let command = conn.CreateCommand(CommandText = "select position, type, data, metadata, id::uuid,
(metadata::jsonb->>'$correlationId')::text,
Expand All @@ -44,66 +51,65 @@ module Core =
let mutable checkpoint = fromPositionInclusive

use! reader = command.ExecuteReaderAsync(ct)
let events = [| while reader.Read() do yield parseRow reader |]
let events = [| while reader.Read() do parseRow reader |]

checkpoint <- match Array.tryLast events with Some (_, ev) -> unbox<int64> ev.Context | None -> checkpoint

return { checkpoint = Position.parse checkpoint; items = events; isTail = events.Length = 0 } }
member _.ReadCategoryLastVersion(category: TrancheId, ct) = task {
return ({ checkpoint = Position.parse checkpoint; items = events; isTail = events.Length = 0 } : Propulsion.Feed.Core.Batch<_>) }

member _.ReadCategoryLastVersion(category: TrancheId, ct) : Task<int64> = task {
use! conn = connect ct
let command = conn.CreateCommand(CommandText = "select max(global_position) from messages where category(stream_name) = @Category;")
command.Parameters.AddWithValue("Category", NpgsqlDbType.Text, TrancheId.toString category) |> ignore

use! reader = command.ExecuteReaderAsync(ct)
return if reader.Read() then reader.GetInt64(0) else 0L }

module private Impl =
open Core
open Propulsion.Infrastructure // AwaitTaskCorrect

let readBatch batchSize (store : MessageDbCategoryClient) (category, pos) : Async<Propulsion.Feed.Core.Batch<_>> = async {
let internal readBatch batchSize (store : MessageDbCategoryClient) (category, pos) : Async<Propulsion.Feed.Core.Batch<_>> = async {
let! ct = Async.CancellationToken
let positionInclusive = Position.toInt64 pos
let! x = store.ReadCategoryMessages(category, positionInclusive, batchSize, ct) |> Async.AwaitTaskCorrect
return x }
return! store.ReadCategoryMessages(category, positionInclusive, batchSize, ct) |> Async.AwaitTaskCorrect }

let readTailPositionForTranche (store : MessageDbCategoryClient) trancheId : Async<Propulsion.Feed.Position> = async {
let internal readTailPositionForTranche (store : MessageDbCategoryClient) trancheId : Async<Propulsion.Feed.Position> = async {
let! ct = Async.CancellationToken
let! lastEventPos = store.ReadCategoryLastVersion(trancheId, ct) |> Async.AwaitTaskCorrect
return Position.parse lastEventPos }

type MessageDbSource
type MessageDbSource internal
( log : Serilog.ILogger, statsInterval,
client: Core.MessageDbCategoryClient, batchSize, tailSleepInterval,
client: Internal.MessageDbCategoryClient, batchSize, tailSleepInterval,
checkpoints : Propulsion.Feed.IFeedCheckpointStore, sink : Propulsion.Streams.Default.Sink,
categories,
// Override default start position to be at the tail of the index. Default: Replay all events.
?startFromTail,
?sourceId) =
tranches, ?startFromTail, ?sourceId) =
inherit Propulsion.Feed.Core.TailingFeedSource
( log, statsInterval, defaultArg sourceId FeedSourceId.wellKnownId, tailSleepInterval, checkpoints,
( if startFromTail <> Some true then None
else Some (Impl.readTailPositionForTranche client)),
else Some (Internal.readTailPositionForTranche client)),
sink,
(fun req -> asyncSeq {
let sw = Stopwatch.StartNew()
let! b = Impl.readBatch batchSize client req
let sw = Stopwatch.start ()
let! b = Internal.readBatch batchSize client req
yield sw.Elapsed, b }),
string)
new (log, statsInterval, connectionString, batchSize, tailSleepInterval, checkpoints, sink, trancheIds, ?startFromTail, ?sourceId) =
MessageDbSource(log, statsInterval, Core.MessageDbCategoryClient(connectionString),
batchSize, tailSleepInterval, checkpoints, sink, trancheIds, ?startFromTail=startFromTail, ?sourceId=sourceId)
new( log, statsInterval,
connectionString, batchSize, tailSleepInterval,
checkpoints, sink,
categories,
// Override default start position to be at the tail of the index. Default: Replay all events.
?startFromTail, ?sourceId) =
MessageDbSource(log, statsInterval, Internal.MessageDbCategoryClient(connectionString),
batchSize, tailSleepInterval, checkpoints, sink,
categories |> Array.map Propulsion.Feed.TrancheId.parse,
?startFromTail=startFromTail, ?sourceId=sourceId)

abstract member ListTranches : unit -> Async<Propulsion.Feed.TrancheId array>
default _.ListTranches() = async { return categories |> Array.map TrancheId.parse }
default _.ListTranches() = async { return tranches }

abstract member Pump : unit -> Async<unit>
default x.Pump() = base.Pump(x.ListTranches)

abstract member Start : unit -> Propulsion.SourcePipeline<Propulsion.Feed.Core.FeedMonitor>
default x.Start() = base.Start(x.Pump())


/// Pumps to the Sink until either the specified timeout has been reached, or all items in the Source have been fully consumed
member x.RunUntilCaughtUp(timeout : TimeSpan, statsInterval : IntervalTimer) = task {
let sw = Stopwatch.start ()
Expand Down
5 changes: 2 additions & 3 deletions src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,24 @@

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Propulsion\Infrastructure.fs">
<Link>Infrastructure.fs</Link>
</Compile>
<Compile Include="Internal.fs" />
<Compile Include="Types.fs" />
<Compile Include="MessageDbSource.fs" />
<Compile Include="ReaderCheckpoint.fs" />
<Content Include="Readme.md" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Npgsql" Version="6.0.7" />
</ItemGroup>


<ItemGroup>
<ProjectReference Include="..\Propulsion.Feed\Propulsion.Feed.fsproj" />
</ItemGroup>
Expand Down
112 changes: 58 additions & 54 deletions src/Propulsion.MessageDb/ReaderCheckpoint.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,69 +5,73 @@ open NpgsqlTypes
open Propulsion.Feed
open Propulsion.Infrastructure

let [<Literal>] TableName = "propulsion_checkpoint"

let table = "propulsion_checkpoint"
module internal Impl =

let createIfNotExists (conn : NpgsqlConnection, schema: string) =
let cmd = conn.CreateCommand(CommandText = $"create table if not exists {schema}.{table} (
source text not null,
tranche text not null,
consumer_group text not null,
position bigint not null,
primary key (source, tranche, consumer_group));")
cmd.ExecuteNonQueryAsync() |> Async.AwaitTaskCorrect |> Async.Ignore<int>
open System.Threading
open System.Threading.Tasks

let commitPosition (conn : NpgsqlConnection, schema: string) source tranche (consumerGroup : string) (position : int64)
= async {
let cmd = conn.CreateCommand(CommandText = $"insert into {schema}.{table}(source, tranche, consumer_group, position)
values (@Source, @Tranche, @ConsumerGroup, @Position)
on conflict (source, tranche, consumer_group)
do update set position = @Position;")
cmd.Parameters.AddWithValue("Source", NpgsqlDbType.Text, SourceId.toString source) |> ignore
cmd.Parameters.AddWithValue("Tranche", NpgsqlDbType.Text, TrancheId.toString tranche) |> ignore
cmd.Parameters.AddWithValue("ConsumerGroup", NpgsqlDbType.Text, consumerGroup) |> ignore
cmd.Parameters.AddWithValue("Position", NpgsqlDbType.Bigint, position) |> ignore
let createIfNotExists (conn : NpgsqlConnection, schema: string) ct = task {
let cmd = conn.CreateCommand(CommandText = $"create table if not exists {schema}.{TableName} (
source text not null,
tranche text not null,
consumer_group text not null,
position bigint not null,
primary key (source, tranche, consumer_group));")
do! cmd.ExecuteNonQueryAsync(ct) : Task }

let! ct = Async.CancellationToken
do! cmd.ExecuteNonQueryAsync(ct) |> Async.AwaitTaskCorrect |> Async.Ignore<int> }
let commitPosition (conn : NpgsqlConnection, schema: string) source tranche (consumerGroup : string) (position : int64) ct = task {
let cmd = conn.CreateCommand(CommandText = $"insert into {schema}.{TableName}(source, tranche, consumer_group, position)
values (@Source, @Tranche, @ConsumerGroup, @Position)
on conflict (source, tranche, consumer_group)
do update set position = @Position;")
cmd.Parameters.AddWithValue("Source", NpgsqlDbType.Text, SourceId.toString source) |> ignore
cmd.Parameters.AddWithValue("Tranche", NpgsqlDbType.Text, TrancheId.toString tranche) |> ignore
cmd.Parameters.AddWithValue("ConsumerGroup", NpgsqlDbType.Text, consumerGroup) |> ignore
cmd.Parameters.AddWithValue("Position", NpgsqlDbType.Bigint, position) |> ignore
do! cmd.ExecuteNonQueryAsync(ct) :> Task }

let tryGetPosition (conn : NpgsqlConnection, schema : string) source tranche (consumerGroup : string) = async {
let cmd = conn.CreateCommand(CommandText = $"select position from {schema}.{table}
where source = @Source
and tranche = @Tranche
and consumer_group = @ConsumerGroup")
cmd.Parameters.AddWithValue("Source", NpgsqlDbType.Text, SourceId.toString source) |> ignore
cmd.Parameters.AddWithValue("Tranche", NpgsqlDbType.Text, TrancheId.toString tranche) |> ignore
cmd.Parameters.AddWithValue("ConsumerGroup", NpgsqlDbType.Text, consumerGroup) |> ignore
let tryGetPosition (conn : NpgsqlConnection, schema : string) source tranche (consumerGroup : string) (ct : CancellationToken) = task {
let cmd = conn.CreateCommand(CommandText = $"select position from {schema}.{TableName}
where source = @Source
and tranche = @Tranche
and consumer_group = @ConsumerGroup")
cmd.Parameters.AddWithValue("Source", NpgsqlDbType.Text, SourceId.toString source) |> ignore
cmd.Parameters.AddWithValue("Tranche", NpgsqlDbType.Text, TrancheId.toString tranche) |> ignore
cmd.Parameters.AddWithValue("ConsumerGroup", NpgsqlDbType.Text, consumerGroup) |> ignore
use! reader = cmd.ExecuteReaderAsync(ct)
return if reader.Read() then ValueSome (reader.GetInt64 0) else ValueNone }

let! ct = Async.CancellationToken
use! reader = cmd.ExecuteReaderAsync(ct) |> Async.AwaitTaskCorrect
return if reader.Read() then ValueSome (reader.GetInt64 0) else ValueNone }

type CheckpointStore(connString : string, schema: string, consumerGroupName, defaultCheckpointFrequency) =
let connect = Npgsql.connect connString

member _.CreateSchemaIfNotExists() = async {
let exec connString f= async {
let! ct = Async.CancellationToken
use! conn = connect ct |> Async.AwaitTaskCorrect
return! createIfNotExists (conn, schema) }
use! conn = connect connString ct |> Async.AwaitTaskCorrect
return! f conn ct |> Async.AwaitTaskCorrect }

interface IFeedCheckpointStore with
type CheckpointStore(connString : string, schema : string, consumerGroupName, defaultCheckpointFrequency : System.TimeSpan) =
let exec f = Impl.exec connString f
let setPos source tranche pos =
let commit conn = Impl.commitPosition (conn, schema) source tranche consumerGroupName (Position.toInt64 pos)
exec commit

member _.Start(source, tranche, ?establishOrigin) = async {
let! ct = Async.CancellationToken
use! conn = connect ct |> Async.AwaitTaskCorrect
let! maybePos = tryGetPosition (conn, schema) source tranche consumerGroupName
let! pos =
match maybePos, establishOrigin with
| ValueSome pos, _ -> async { return Position.parse pos }
| ValueNone, Some f -> f
| ValueNone, None -> async { return Position.initial }
return defaultCheckpointFrequency, pos }
member _.CreateSchemaIfNotExists() : Async<unit> =
let creat conn = Impl.createIfNotExists (conn, schema)
exec creat

member _.Commit(source, tranche, pos) = async {
let! ct = Async.CancellationToken
use! conn = connect ct |> Async.AwaitTaskCorrect
return! commitPosition (conn, schema) source tranche consumerGroupName (Position.toInt64 pos) }
member _.Override(source, tranche, pos : Position) : Async<unit> =
setPos source tranche pos

interface IFeedCheckpointStore with

member _.Start(source, tranche, ?establishOrigin) =
let start conn ct = task {
let! maybePos = Impl.tryGetPosition (conn, schema) source tranche consumerGroupName ct |> Async.AwaitTaskCorrect
let! pos =
match maybePos, establishOrigin with
| ValueSome pos, _ -> async { return Position.parse pos }
| ValueNone, Some f -> f
| ValueNone, None -> async { return Position.initial }
return struct (defaultCheckpointFrequency, pos) }
exec start
member _.Commit(source, tranche, pos) : Async<unit> =
setPos source tranche pos
Loading

0 comments on commit f2d75ba

Please sign in to comment.