Log polish etc
Minor tuning
Straggler struct tuples
bartelink committed Aug 24, 2022
1 parent f4be4ee commit 2127ebc
Showing 18 changed files with 199 additions and 169 deletions.
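
The recurring change in this commit is the migration of remaining ("straggler") reference tuples to struct tuples. As background, a standalone F# illustration (not code from this diff): reference tuples compile to heap-allocated System.Tuple instances, while struct tuples compile to System.ValueTuple and avoid a per-value allocation, which adds up on per-event hot paths.

```fsharp
// Standalone illustration: the syntax differs only by the `struct` keyword,
// in the type, the constructor and the pattern alike.
let heapPair : int64 * string = 17L, "event"                     // System.Tuple`2 on the heap
let stackPair : struct (int64 * string) = struct (17L, "event")  // System.ValueTuple`2, no allocation
let struct (index, payload) = stackPair                          // deconstruction mirrors construction
printfn "%d %s" index payload
```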
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/CosmosStoreSource.fs
@@ -64,7 +64,7 @@ type CosmosSource =
sw.Stop() // Stop the clock after ChangeFeedProcessor hands off to us
let epoch, age = ctx.FeedResponse.ResponseContinuation.Trim[|'"'|] |> int64, DateTime.UtcNow - docs.[docs.Count-1].Timestamp
let! pt, (cur,max) = rangeIngester.Ingest {epoch = epoch; checkpoint = ctx.Checkpoint(); items = mapContent docs; onCompletion = ignore } |> Stopwatch.Time
-let readS, postS, rc = float sw.ElapsedMilliseconds / 1000., (let e = pt.Elapsed in e.TotalSeconds), ctx.FeedResponse.RequestCharge
+let readS, postS, rc = sw.ElapsedSeconds, (let e = pt.Elapsed in e.TotalSeconds), ctx.FeedResponse.RequestCharge
let m = Log.Metric.Read {
database = context.source.database; container = context.source.container; group = context.leasePrefix; rangeId = int ctx.PartitionKeyRangeId
token = epoch; latency = sw.Elapsed; rc = rc; age = age; docs = docs.Count
4 changes: 2 additions & 2 deletions src/Propulsion.CosmosStore/ReaderCheckpoint.fs
@@ -80,7 +80,7 @@ let decideStart establishOrigin at freq state = async {
| Fold.NotStarted ->
let! origin = establishOrigin
let config, checkpoint = mk at freq origin
-return (configFreq config, checkpoint.pos), [Events.Started { config = config; origin = checkpoint}]
+return struct (configFreq config, checkpoint.pos), [Events.Started { config = config; origin = checkpoint}]
| Fold.Running s ->
return (configFreq s.config, s.state.pos), [] }

@@ -114,7 +114,7 @@ type Service internal (resolve : SourceId * TrancheId * string -> Decider<Events

/// Start a checkpointing series with the supplied parameters
/// Yields the checkpoint interval and the starting position
-member _.Start(source, tranche, ?establishOrigin) : Async<TimeSpan * Position> =
+member _.Start(source, tranche, ?establishOrigin) : Async<struct (TimeSpan * Position)> =
let decider = resolve (source, tranche, consumerGroupName)
let establishOrigin = match establishOrigin with None -> async { return Position.initial } | Some f -> f
decider.TransactAsync(decideStart establishOrigin DateTimeOffset.UtcNow defaultCheckpointFrequency)
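To see how the revised signature reads at a call site, a hypothetical consumer (the names below are stand-ins; only Start's shape comes from the diff):

```fsharp
// Hypothetical usage sketch: the struct pattern in let! deconstructs the
// struct tuple returned by Start without a heap allocation.
let report (service : Service) source tranche = async {
    let! struct (freq, pos) = service.Start(source, tranche)
    printfn "Checkpointing every %O, starting from %A" freq pos }
```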
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/AppendsEpoch.fs
@@ -131,7 +131,7 @@ module Config =
/// only deserializing events pertaining to things we have not seen before
module Reader =

-type Event = int64 * Events.Event
+type Event = (struct (int64 * Events.Event))
let codec : FsCodec.IEventCodec<Event, _, _> = EventCodec.withIndex<Events.Event>

type State = { changes : struct (int * Events.StreamSpan array) array; closed : bool }
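The doubled parentheses in the rewritten abbreviation are required, not stylistic; a standalone snippet shows why:

```fsharp
// In a type abbreviation, a bare `struct` keyword would be parsed as the start
// of a struct type definition (struct ... end), so a struct-tuple abbreviation
// needs wrapping parentheses. IndexedEvent is an illustrative name.
type IndexedEvent = (struct (int64 * string))
let example : IndexedEvent = struct (17L, "payload")
```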
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/DynamoDbExport.fs
@@ -12,7 +12,7 @@ module DynamoDbJsonParser =
and [<Struct>] NumVal = { N : string }
and ListVal<'t> = { L : 't[] }

-let read (path : string) : seq<string * DynamoStoreIndex.EventSpan> = seq {
+let read (path : string) : seq<struct (string * DynamoStoreIndex.EventSpan)> = seq {
use r = new StreamReader(path)
let mutable more = true
while more do
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/DynamoStoreIndex.fs
@@ -104,7 +104,7 @@ module Reader =
string epochId, totalEvents, totalStreams, spans.Length, state.changes.Length, Propulsion.Internal.mb sizeB, t.TotalSeconds)
return spans, state.closed, sizeB }

-let loadIndex (log, storeLog, context) trancheId gapsLimit: Async<Buffer * int64> = async {
+let loadIndex (log, storeLog, context) trancheId gapsLimit: Async<struct (Buffer * int64)> = async {
let indexEpochs = AppendsEpoch.Reader.Config.create storeLog context
let mutable epochId, more, totalB, totalSpans = AppendsEpochId.initial, true, 0L, 0L
let state = Buffer()
4 changes: 2 additions & 2 deletions src/Propulsion.DynamoStore/DynamoStoreSource.fs
@@ -67,7 +67,7 @@ module private Impl =
let materializeIndexEpochAsBatchesOfStreamEvents
(log : Serilog.ILogger, sourceId, storeLog) (hydrating, maybeLoad, loadDop) batchCutoff (context : DynamoStoreContext)
(AppendsTrancheId.Parse tid, Checkpoint.Parse (epochId, offset))
-: AsyncSeq<System.TimeSpan * Propulsion.Feed.Core.Batch<_>> = asyncSeq {
+: AsyncSeq<struct (System.TimeSpan * Propulsion.Feed.Core.Batch<_>)> = asyncSeq {
let epochs = AppendsEpoch.Reader.Config.create storeLog context
let sw = System.Diagnostics.Stopwatch.StartNew()
let! _maybeSize, version, state = epochs.Read(tid, epochId, offset)
@@ -130,7 +130,7 @@ module private Impl =
if buffer.Count <> 0 && buffer.Count + pending.Length > batchCutoff then
let! hydrated = materializeSpans
report (Some i) hydrated.Length
-yield sw.Elapsed, sliceBatch epochId i hydrated // not i + 1 as the batch does not include these changes
+yield struct (sw.Elapsed, sliceBatch epochId i hydrated) // not i + 1 as the batch does not include these changes
sw.Reset()
buffer.Clear()
buffer.AddRange(pending)
10 changes: 5 additions & 5 deletions src/Propulsion.DynamoStore/Types.fs
@@ -67,10 +67,10 @@ module internal Config =
let createWithOriginIndex codec initial fold context minIndex =
// TOCONSIDER include way to limit item count being read
// TOCONSIDER implement a loader hint to pass minIndex to the query as an additional filter
-let isOrigin (i, _) = i <= minIndex
+let isOrigin struct (i, _) = i <= minIndex
// There _should_ always be an event at minIndex - if there isn't for any reason, the load might go back one event too far
// Here we trim it for correctness (although Propulsion would technically ignore it)
-let trimPotentialOverstep = Seq.filter (fun (i, _e) -> i >= minIndex)
+let trimPotentialOverstep = Seq.filter (fun struct (i, _e) -> i >= minIndex)
let accessStrategy = AccessStrategy.MultiSnapshot (isOrigin, fun _ -> failwith "writing not applicable")
create codec initial (fun s -> trimPotentialOverstep >> fold s) accessStrategy (context, None)

@@ -81,9 +81,9 @@ module internal EventCodec =
let private withUpconverter<'c, 'e when 'c :> TypeShape.UnionContract.IUnionContract> up : FsCodec.IEventCodec<'e, _, _> =
let down (_ : 'e) = failwith "Unexpected"
FsCodec.SystemTextJson.Codec.Create<'e, 'c, _>(up, down) |> FsCodec.Deflate.EncodeTryDeflate
-let withIndex<'c when 'c :> TypeShape.UnionContract.IUnionContract> : FsCodec.IEventCodec<int64 * 'c, _, _> =
-let up (raw : FsCodec.ITimelineEvent<_>, e) = raw.Index, e
-withUpconverter<'c, int64 * 'c> up
+let withIndex<'c when 'c :> TypeShape.UnionContract.IUnionContract> : FsCodec.IEventCodec<struct (int64 * 'c), _, _> =
+let up (raw : FsCodec.ITimelineEvent<_>, e) = struct (raw.Index, e)
+withUpconverter<'c, struct (int64 * 'c)> up

module internal Async =

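The Config and EventCodec hunks show the pattern-matching side of the migration; distilled into a standalone example with dummy data:

```fsharp
// Struct-tuple patterns work in named functions and lambdas alike; the `struct`
// keyword in the pattern must match the struct-ness of the value being matched.
let isOrigin struct (i, _) = i <= 0L
let events = [ struct (0L, "a"); struct (1L, "b"); struct (2L, "c") ]
printfn "%b" (events |> List.head |> isOrigin)      // true
events
|> Seq.filter (fun struct (i, _e) -> i >= 1L)       // trimming, as in trimPotentialOverstep
|> Seq.iter (fun struct (i, e) -> printfn "%d: %s" i e)
```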
2 changes: 1 addition & 1 deletion src/Propulsion.Feed/FeedReader.fs
@@ -101,7 +101,7 @@ type FeedReader
crawl :
bool // lastWasTail : may be used to induce a suitable backoff when repeatedly reading from tail
* Position // checkpointPosition
--> AsyncSeq<TimeSpan * Batch<Streams.Default.EventBody>>,
+-> AsyncSeq<struct (TimeSpan * Batch<Streams.Default.EventBody>)>,
// <summary>Feed a batch into the ingester. Internal checkpointing decides which Commit callback will be called
// Throwing will tear down the processing loop, which is intended; we fail fast on poison messages
// In the case where the number of batches reading has gotten ahead of processing exceeds the limit,
30 changes: 14 additions & 16 deletions src/Propulsion.Feed/FeedSource.fs
@@ -3,6 +3,7 @@ namespace Propulsion.Feed.Core
open FSharp.Control
open Propulsion
open Propulsion.Feed
+open Propulsion.Internal
open System
open System.Collections.Generic
open System.Threading.Tasks
@@ -40,7 +41,6 @@ type FeedSourceBase internal
return! Async.Raise e }

let choose f (xs : KeyValuePair<_, _> array) = [| for x in xs do match f x.Value with ValueNone -> () | ValueSome v' -> struct (x.Key, v') |]
-let elapsedSeconds (x : System.Diagnostics.Stopwatch) = float x.ElapsedMilliseconds / 1000.
let checkForActivity () =
match positions.Current() with
| xs when xs |> Array.forall (fun (kv : KeyValuePair<_, TrancheState>) -> kv.Value.IsEmpty) -> Array.empty
@@ -53,17 +53,15 @@
startPositions <- checkForActivity ()
return startPositions }
let awaitCompletion starting (delayMs : int) includeSubsequent logInterval = async {
-let maybeLog =
-let logDue = Internal.intervalCheck logInterval
-fun () ->
-if logDue () then
-let currentRead, completed =
-let current = positions.Current()
-current |> choose (fun v -> v.read), current |> choose (fun v -> v.completed)
-if includeSubsequent then
-log.Information("Feed Awaiting All. Current {current} Completed {completed} Starting {starting}",
-currentRead, completed, starting)
-else log.Information("Feed Awaiting Starting {starting} Completed {completed}", starting, completed)
+let logInterval = IntervalTimer logInterval
+let logStatus () =
+let currentRead, completed =
+let current = positions.Current()
+current |> choose (fun v -> v.read), current |> choose (fun v -> v.completed)
+if includeSubsequent then
+log.Information("FeedSource Awaiting All. Current {current} Completed {completed} Starting {starting}",
+currentRead, completed, starting)
+else log.Information("FeedSource Awaiting Starting {starting} Completed {completed}", starting, completed)
let isComplete () =
let current = positions.Current()
let completed = current |> choose (fun v -> v.completed)
Expand All @@ -76,14 +74,14 @@ type FeedSourceBase internal
current |> Array.forall (fun kv -> kv.Value.IsEmpty) // All submitted work (including follow-on work), completed
|| (not includeSubsequent && originalStartedAreAllCompleted ())
while not (isComplete ()) && not sink.IsCompleted do
-maybeLog ()
+if logInterval.IfDueRestart() then logStatus()
do! Async.Sleep delayMs }

/// Propagates exceptions raised by <c>readTranches</c> or <c>crawl</c>,
member internal _.Pump
( readTranches : unit -> Async<TrancheId[]>,
// Responsible for managing retries and back offs; yielding an exception will result in abend of the read loop
-crawl : TrancheId -> bool * Position -> AsyncSeq<TimeSpan * Batch<_>>) =
+crawl : TrancheId -> bool * Position -> AsyncSeq<struct (TimeSpan * Batch<_>)>) =
// TODO implement behavior to pick up newly added tranches by periodically re-running readTranches
// TODO when that's done, remove workaround in readTranches
pump readTranches crawl
@@ -144,7 +142,7 @@ and TrancheState =
type TailingFeedSource
( log : Serilog.ILogger, statsInterval : TimeSpan,
sourceId, tailSleepInterval : TimeSpan,
-crawl : TrancheId * Position -> AsyncSeq<TimeSpan * Batch<_>>,
+crawl : TrancheId * Position -> AsyncSeq<struct (TimeSpan * Batch<_>)>,
checkpoints : IFeedCheckpointStore, establishOrigin : (TrancheId -> Async<Position>) option, sink : Propulsion.Streams.Default.Sink,
renderPos,
?logReadFailure,
@@ -243,7 +241,7 @@ type FeedSource
let sw = System.Diagnostics.Stopwatch.StartNew()
let! page = readPage (trancheId, pos)
let items' = page.items |> Array.map (fun x -> struct (streamName, x))
-yield sw.Elapsed, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail } : Core.Batch<_>)
+yield struct (sw.Elapsed, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail } : Core.Batch<_>))
}

/// Drives the continual loop of reading and checkpointing each tranche until a fault occurs. <br/>
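This file (and MemoryStoreSource plus Ingestion below) replaces the closure-based intervalCheck helper with an IntervalTimer from Propulsion.Internal. The type's definition is not part of this diff; a minimal reconstruction consistent with the members exercised here (a TimeSpan constructor, IfDueRestart, RemainingMs) might look like:

```fsharp
open System
open System.Diagnostics

// Illustrative reconstruction only; the real type lives in Propulsion.Internal.
type IntervalTimer(period : TimeSpan) =
    let sw = Stopwatch.StartNew()
    // Milliseconds until the interval next elapses, floored at zero so the
    // result can be passed straight to Task.Delay
    member _.RemainingMs = max 0 (int (period - sw.Elapsed).TotalMilliseconds)
    // True when the interval has elapsed; restarts the clock as a side effect
    member _.IfDueRestart() =
        if sw.Elapsed >= period then sw.Restart(); true
        else false
```

Compared with a captured-closure `logDue ()` flag, a small type like this keeps the restart semantics explicit at the call site and exposes the remaining time for sleep bounding.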
6 changes: 3 additions & 3 deletions src/Propulsion.Feed/PeriodicSource.fs
@@ -49,13 +49,13 @@ type PeriodicSource
( log : Serilog.ILogger, statsInterval : TimeSpan, sourceId,
// The <c>AsyncSeq</c> is expected to manage its own resilience strategy (retries etc). <br/>
// Yielding an exception will result in the <c>Pump<c/> loop terminating, tearing down the source pipeline
-crawl : TrancheId -> AsyncSeq<TimeSpan * SourceItem<_> array>, refreshInterval : TimeSpan,
+crawl : TrancheId -> AsyncSeq<struct (TimeSpan * SourceItem<_> array)>, refreshInterval : TimeSpan,
checkpoints : IFeedCheckpointStore, sink : Propulsion.Streams.Default.Sink,
?renderPos) =
inherit Core.FeedSourceBase(log, statsInterval, sourceId, checkpoints, None, sink, defaultArg renderPos DateTimeOffsetPosition.render)

// We don't want to checkpoint for real until we know the scheduler has handled the full set of pages in the crawl.
-let crawl trancheId (_wasLast, position) : AsyncSeq<TimeSpan * Core.Batch<_>> = asyncSeq {
+let crawl trancheId (_wasLast, position) : AsyncSeq<struct (TimeSpan * Core.Batch<_>)> = asyncSeq {
let startDate = DateTimeOffsetPosition.getDateTimeOffset position
let dueDate = startDate + refreshInterval
match dueDate - DateTimeOffset.UtcNow with
@@ -83,7 +83,7 @@ type PeriodicSource
let items = Array.zeroCreate ready
buffer.CopyTo(0, items, 0, ready)
buffer.RemoveRange(0, ready)
-yield elapsed, ({ items = items; checkpoint = position; isTail = false } : Core.Batch<_> )
+yield struct (elapsed, ({ items = items; checkpoint = position; isTail = false } : Core.Batch<_>))
elapsed <- TimeSpan.Zero
| _ -> ()
let items, checkpoint =
2 changes: 1 addition & 1 deletion src/Propulsion.Kafka/ProducerSinks.fs
@@ -56,7 +56,7 @@ type StreamsProducerSink =

static member Start
( log : ILogger, maxReadAhead, maxConcurrentStreams,
-prepare : StreamName * StreamSpan<_> -> Async<string*string>,
+prepare : struct (StreamName * StreamSpan<_>) -> Async<struct (string*string)>,
producer : Producer,
stats : Sync.Stats<unit>, statsInterval,
// Default 1 ms
19 changes: 8 additions & 11 deletions src/Propulsion.MemoryStore/MemoryStoreSource.fs
@@ -120,22 +120,19 @@ type MemoryStoreSource<'F>(log, store : Equinox.MemoryStore.VolatileStore<'F>, s
let delayMs =
let delay = defaultArg delay (TimeSpan.FromMilliseconds 1.)
int delay.TotalMilliseconds
-let maybeLog =
-let logInterval = defaultArg logInterval (TimeSpan.FromSeconds 10.)
-let logDue = intervalCheck logInterval
-fun () ->
-if logDue () then
-let completed = match Volatile.Read &completed with -1L -> Nullable() | x -> Nullable x
-if includeSubsequent then
-log.Information("Awaiting Completion of all Batches. Starting Epoch {epoch} Current Epoch {current} Completed Epoch {completed}",
-startingEpoch, Volatile.Read &prepared, completed)
-else log.Information("Awaiting Completion of Starting Epoch {startingEpoch} Completed Epoch {completed}", startingEpoch, completed)
+let logInterval = IntervalTimer(defaultArg logInterval (TimeSpan.FromSeconds 10.))
+let logStatus () =
+let completed = match Volatile.Read &completed with -1L -> Nullable() | x -> Nullable x
+if includeSubsequent then
+log.Information("Awaiting Completion of all Batches. Starting Epoch {epoch} Current Epoch {current} Completed Epoch {completed}",
+startingEpoch, Volatile.Read &prepared, completed)
+else log.Information("Awaiting Completion of Starting Epoch {startingEpoch} Completed Epoch {completed}", startingEpoch, completed)
let isComplete () =
let currentCompleted = Volatile.Read &completed
Volatile.Read &prepared = currentCompleted // All submitted work (including follow-on work), completed
|| (currentCompleted >= startingEpoch && not includeSubsequent) // At or beyond starting point
while not (isComplete ()) && not sink.IsCompleted do
-maybeLog ()
+if logInterval.IfDueRestart() then logStatus ()
do! Async.Sleep delayMs
// If the sink Faulted, let the awaiter observe the associated Exception that triggered the shutdown
if sink.IsCompleted && not sink.RanToCompletion then
2 changes: 1 addition & 1 deletion src/Propulsion/Feed.fs
@@ -26,5 +26,5 @@ module Position =
type IFeedCheckpointStore =

/// Determines the starting position, and checkpointing frequency for a given tranche
-abstract member Start: source: SourceId * tranche : TrancheId * ?establishOrigin : Async<Position> -> Async<TimeSpan * Position>
+abstract member Start: source: SourceId * tranche : TrancheId * ?establishOrigin : Async<Position> -> Async<struct (TimeSpan * Position)>
abstract member Commit: source: SourceId * tranche : TrancheId * pos: Position -> Async<unit>
12 changes: 5 additions & 7 deletions src/Propulsion/Ingestion.fs
@@ -52,7 +52,7 @@ type private Stats(log : ILogger, partitionId, statsInterval : TimeSpan) =
let mutable validatedEpoch, committedEpoch : int64 option * int64 option = None, None
let mutable commitFails, commits = 0, 0
let mutable cycles, batchesPended, streamsPended, eventsPended = 0, 0, 0, 0
-let statsInterval = timeRemaining statsInterval
+member val Interval = IntervalTimer statsInterval

member _.DumpStats(activeReads, maxReads) =
log.Information("Ingester {partitionId} Ahead {activeReads}/{maxReads} @ {validated} (committed: {committed}, {commits} commits) Ingested {batches} ({streams:n0}s {events:n0}e) Cycles {cycles}",
@@ -76,9 +76,8 @@ type private Stats(log : ILogger, partitionId, statsInterval : TimeSpan) =
streamsPended <- streamsPended + streams
eventsPended <- eventsPended + events

-member _.Ingest() =
+member x.RecordCycle() =
cycles <- cycles + 1
-statsInterval ()

/// Buffers items read from a range, unpacking them out of band from the reading so that can overlap
/// On completion of the unpacking, they get submitted onward to the Submitter which will buffer them for us
Expand Down Expand Up @@ -112,10 +111,9 @@ type Ingester<'Items> private
Task.start (fun () -> progressWriter.Pump ct)
while not ct.IsCancellationRequested do
while applyIncoming handleIncoming || applyMessages stats.Handle do ()
-let timeToNextStatsMs = let struct (due, nextStatsIntervalMs) = stats.Ingest()
-if due then let struct (active, max) = maxRead.State in stats.DumpStats(active, max)
-int nextStatsIntervalMs
-do! Task.WhenAny(awaitIncoming ct, awaitMessage ct, Task.Delay(timeToNextStatsMs)) :> Task }
+stats.RecordCycle()
+if stats.Interval.IfDueRestart() then let struct (active, max) = maxRead.State in stats.DumpStats(active, max)
+do! Task.WhenAny(awaitIncoming ct, awaitMessage ct, Task.Delay(stats.Interval.RemainingMs)) :> Task }
// arguably the impl should be submitting while unpacking but
// - maintaining consistency between incoming order and submit order is required
// - in general maxRead will be double maxSubmit so this will only be relevant in catchup situations
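The reshaped loop separates cycle counting (RecordCycle) from interval-driven stats emission. Schematically, reusing the IntervalTimer sketch above, with stand-ins for the Ingester internals:

```fsharp
open System.Threading
open System.Threading.Tasks

// Schematic only: doWork stands in for applyIncoming/applyMessages, dumpStats
// for Stats.DumpStats; the timer both gates emission and bounds the sleep.
let pump (interval : IntervalTimer) (doWork : unit -> bool) (dumpStats : unit -> unit) (ct : CancellationToken) = task {
    while not ct.IsCancellationRequested do
        while doWork () do ()                          // drain all pending work
        if interval.IfDueRestart() then dumpStats ()   // emit stats at most once per interval
        do! Task.Delay(interval.RemainingMs) }         // sleep no longer than the next due time
```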