Skip to content

Commit

Permalink
Improve error logging
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Apr 3, 2019
1 parent f0397a1 commit 6ba49a4
Showing 1 changed file with 29 additions and 15 deletions.
44 changes: 29 additions & 15 deletions equinox-sync/Sync/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ module CmdParser =
member __.Stripes = args.GetResult(Stripes,1)
member __.TailInterval = match args.TryGetResult Tail with Some s -> TimeSpan.FromSeconds s |> Some | None -> None
member x.BuildFeedParams() : ReaderSpec =
Log.Warning("Processing in batches of {batchSize} (min {minBatchSize}) with {stripes} stripes", x.StartingBatchSize, x.MinBatchSize, x.Stripes)
Log.Warning("Processing in batches of [{minBatchSize}..{batchSize}] with {stripes} stripes", x.MinBatchSize, x.StartingBatchSize, x.Stripes)
let startPos =
match args.TryGetResult Offset, args.TryGetResult Chunk, args.TryGetResult Percent, args.Contains All with
| Some p, _, _, _ -> Log.Warning("Processing will commence at $all Position {p}", p); Absolute p
Expand Down Expand Up @@ -304,10 +304,15 @@ module Ingester =
let category (s : string) = s.Split([|'-'|], 2, StringSplitOptions.RemoveEmptyEntries) |> Array.head
let cosmosPayloadLimit = 2 * 1024 * 1024 - 1024
let inline cosmosPayloadBytes (x: Equinox.Codec.IEvent<byte[]>) = arrayBytes x.Data + arrayBytes x.Meta + 4
let isMalformedException (e: #exn) =
e.ToString().Contains "SyntaxError: JSON.parse Error: Unexpected input at position"
|| e.ToString().Contains "SyntaxError: JSON.parse Error: Invalid character at position"

let (|TimedOutMessage|RateLimitedMessage|MalformedMessage|Other|) (e: exn) =
match string e with
| m when m.Contains "Microsoft.Azure.Documents.RequestTimeoutException" -> TimedOutMessage
| m when m.Contains "Microsoft.Azure.Documents.RequestRateTooLargeException" -> RateLimitedMessage
| m when m.Contains "SyntaxError: JSON.parse Error: Unexpected input at position"
|| m.Contains "SyntaxError: JSON.parse Error: Invalid character at position" -> MalformedMessage
| _ -> Other

type Result = TimedOut | RateLimited | Malformed of category: string | Ok
type StreamStates() =
let states = System.Collections.Generic.Dictionary<string, StreamState>()
let dirty = System.Collections.Generic.Queue()
Expand All @@ -328,13 +333,18 @@ module Ingester =

member __.Add (item: Batch, ?isMalformed) = updateWritePos item.stream None (defaultArg isMalformed false) [|item.span|]
member __.HandleWriteResult = function
| Writer.Result.Ok (stream, pos) -> updateWritePos stream (Some pos) false null; None
| Writer.Result.Duplicate (stream, pos) -> updateWritePos stream (Some pos) false null; None
| Writer.Result.Conflict overage -> updateWritePos overage.stream (Some overage.span.index) false [|overage.span|]; None
| Writer.Result.Ok (stream, pos) -> updateWritePos stream (Some pos) false null; Ok
| Writer.Result.Duplicate (stream, pos) -> updateWritePos stream (Some pos) false null; Ok
| Writer.Result.Conflict overage -> updateWritePos overage.stream (Some overage.span.index) false [|overage.span|]; Ok
| Writer.Result.Exn (exn, batch) ->
let malformed = isMalformedException exn
let r, malformed =
match exn with
| RateLimitedMessage -> RateLimited, false
| TimedOutMessage -> TimedOut, false
| MalformedMessage -> Malformed (category batch.stream), true
| Other -> Ok, false
__.Add(batch,malformed)
if malformed then Some (category batch.stream) else None
r
member __.TryPending() =
match dirty |> Queue.tryDequeue with
| None -> None
Expand All @@ -351,7 +361,7 @@ module Ingester =
bytesBudget <- bytesBudget - cosmosPayloadBytes y
count <- count + 1
// Reduce the item count when we don't yet know the write position
count < (if Option.isNone state.write then 10 else 1000) && (bytesBudget >= 0 || count = 1)
count <= (if Option.isNone state.write then 10 else 100) && (bytesBudget >= 0 || count = 1)
Some { stream = stream; span = { index = x.index; events = x.events |> Array.takeWhile max2MbMax1000EventsMax10EventsFirstTranche } }
member __.Dump() =
let mutable synced, ready, waiting, malformed = 0, 0, 0, 0
Expand Down Expand Up @@ -383,15 +393,18 @@ module Ingester =
let badCats = CatStats()
let progressTimer = Stopwatch.StartNew()
while not cancellationToken.IsCancellationRequested do
let mutable moreResults = true
let mutable moreResults, rateLimited, timedOut = true, 0, 0
while moreResults do
match results.TryDequeue() with
| true, res ->
incr resultsHandled
match states.HandleWriteResult res with
| None -> res.WriteTo log
| Some cat -> badCats.Ingest cat
| Malformed cat -> badCats.Ingest cat
| RateLimited -> rateLimited <- rateLimited + 1
| TimedOut -> timedOut <- timedOut + 1
| Ok -> res.WriteTo log
| false, _ -> moreResults <- false
if rateLimited <> 0 || timedOut <> 0 then Log.Warning("Failures {rateLimited} Rate-limited, {timedOut} Timed out", rateLimited, timedOut)
let mutable t = Unchecked.defaultof<_>
let mutable toIngest = 4096 * 5
while work.TryTake(&t,fiveMs) && toIngest > 0 do
Expand Down Expand Up @@ -606,9 +619,10 @@ module EventStoreReader =
let adjust batchSize = if batchSize > minBatchSize then batchSize - 128 else batchSize
match work with
| Stream (name,batchSize) ->
use _ = Serilog.Context.LogContext.PushProperty("Stream",name)
use _ = Serilog.Context.LogContext.PushProperty("Tranche",name)
Log.Warning("Reading stream; batch size {bs}", batchSize)
try do! pullStream (conn, batchSize) name postBatch
Log.Warning("completed stream")
with e ->
let bs = adjust batchSize
Log.Warning(e,"Could not read stream, retrying with batch size {bs}", bs)
Expand Down

0 comments on commit 6ba49a4

Please sign in to comment.