Skip to content

Commit

Permalink
Add Seq support
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Apr 12, 2019
1 parent 637d91a commit 73f2a16
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 15 deletions.
29 changes: 15 additions & 14 deletions equinox-sync/Sync/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ module CmdParser =
[<NoEquality; NoComparison>]
type Parameters =
| [<MainCommand; ExactlyOnce>] ConsumerGroupName of string
| [<AltCommandLine "-S"; Unique>] LocalSeq
| [<AltCommandLine "-as"; Unique>] LeaseCollectionSource of string
| [<AltCommandLine "-ad"; Unique>] LeaseCollectionDestination of string
| [<AltCommandLine "-l"; Unique>] LagFreqS of float
Expand All @@ -37,6 +38,7 @@ module CmdParser =
member a.Usage =
match a with
| ConsumerGroupName _ -> "Projector consumer group name."
| LocalSeq -> "configures writing to a local Seq endpoint at http://localhost:5341, see https://getseq.net"
| ForceStartFromHere _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event."
| BatchSize _ -> "maximum item count to request from feed. Default: 1000"
| LeaseCollectionSource _ ->"specify Collection Name for Leases collection, within `source` connection/database (default: `source`'s `collection` + `-aux`)."
Expand All @@ -46,6 +48,7 @@ module CmdParser =
| Source _ -> "CosmosDb input parameters."
| Verbose -> "request Verbose Logging. Default: off"
and Arguments(a : ParseResults<Parameters>) =
member __.MaybeSeqEndpoint = if a.Contains LocalSeq then Some "http://localhost:5341" else None
member __.LeaseId = a.GetResult ConsumerGroupName
member __.BatchSize = a.GetResult(BatchSize,1000)
member __.StartFromHere = a.Contains ForceStartFromHere
Expand Down Expand Up @@ -217,19 +220,17 @@ module CosmosIngester =
| Exn of exn: exn * batch: Batch
member __.WriteTo(log: ILogger) =
match __ with
| Ok (stream, pos) ->
log.Information("Wrote {stream} up to {pos}", stream, pos)
| Duplicate (stream, pos) ->
log.Information("Ignored {stream} (synced up to {pos})", stream, pos)
| PartialDuplicate overage ->
log.Information("Requeing {stream} {pos} ({count} events)", overage.stream, overage.span.index, overage.span.events.Length)
| PrefixMissing ({stream=stream; span=span},pos) ->
log.Information("Waiting {stream} missing {gap} events ({count} events @ {pos})", stream, span.index-pos, span.events.Length, span.index)
| Exn (exn, batch) ->
log.Warning(exn,"Writing {stream} failed, retrying {count} events ....", batch.stream, batch.span.events.Length)
| Ok (stream, pos) -> log.Information("Wrote {stream} up to {pos}", stream, pos)
| Duplicate (stream, pos) -> log.Information("Ignored {stream} (synced up to {pos})", stream, pos)
| PartialDuplicate overage -> log.Information("Requeing {stream} {pos} ({count} events)",
overage.stream, overage.span.index, overage.span.events.Length)
| PrefixMissing (batch,pos) -> log.Information("Waiting {stream} missing {gap} events ({count} events @ {pos})",
batch.stream, batch.span.index-pos, batch.span.events.Length, batch.span.index)
| Exn (exn, batch) -> log.Warning(exn,"Writing {stream} failed, retrying {count} events ....",
batch.stream, batch.span.events.Length)
let write (log : ILogger) (ctx : CosmosContext) ({ stream = s; span = { index = p; events = e}} as batch) = async {
let stream = ctx.CreateStream s
log.Debug("Writing {s}@{i}x{n}",s,p,e.Length)
log.Information("Writing {s}@{i}x{n}",s,p,e.Length)
try let! res = ctx.Sync(stream, { index = p; etag = None }, e)
match res with
| AppendResult.Ok pos -> return Ok (s, pos.index)
Expand All @@ -238,7 +239,7 @@ module CosmosIngester =
| actual, expectedMax when actual >= expectedMax -> return Duplicate (s, pos.index)
| actual, _ when p > actual -> return PrefixMissing (batch, actual)
| actual, _ ->
log.Debug("pos {pos} batch.pos {bpos} len {blen} skip {skip}", actual, p, e.LongLength, actual-p)
log.Information("pos {pos} batch.pos {bpos} len {blen} skip {skip}", actual, p, e.LongLength, actual-p)
return PartialDuplicate { stream = s; span = { index = actual; events = e |> Array.skip (actual-p |> int) } }
with e -> return Exn (e, batch) }

Expand Down Expand Up @@ -684,15 +685,15 @@ let main argv =
let destination = args.Destination.Connect "SyncTemplate" |> Async.RunSynchronously
let colls = CosmosCollections(args.Destination.Database, args.Destination.Collection)
Equinox.Cosmos.Core.CosmosContext(destination, colls, Log.ForContext<Core.CosmosContext>())
let log = Logging.initialize args.Verbose args.ChangeFeedVerbose
let log = Logging.initialize args.Verbose args.ChangeFeedVerbose args.MaybeSeqEndpoint
let discovery, source, connectionPolicy, catFilter = args.Source.BuildConnectionDetails()
let auxDiscovery, aux, leaseId, startFromHere, batchSize, lagFrequency = args.BuildChangeFeedParams()
#if marveleqx
let createSyncHandler () = CosmosSource.createRangeSyncHandler log target (CosmosSource.transformV0 catFilter)
#else
let createSyncHandler () = CosmosSource.createRangeSyncHandler log target (CosmosSource.transformOrFilter catFilter)
// Uncomment to test marveleqx mode
let createSyncHandler () = CosmosSource.createRangeSyncHandler log target (CosmosSource.transformV0 catFilter)
// let createSyncHandler () = CosmosSource.createRangeSyncHandler log target (CosmosSource.transformV0 catFilter)
#endif
CosmosSource.run (discovery, source) (auxDiscovery, aux) connectionPolicy
(leaseId, startFromHere, batchSize, lagFrequency)
Expand Down
3 changes: 2 additions & 1 deletion equinox-sync/Sync/Sync.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
<ItemGroup>
<PackageReference Include="Argu" Version="5.2.0" />
<PackageReference Include="Destructurama.FSharp.NetCore" Version="1.0.14" />
<PackageReference Include="Equinox.Cosmos.Projection" Version="2.0.0-preview4" />
<PackageReference Include="Equinox.Cosmos.Projection" Version="2.0.0-preview5" />
<PackageReference Include="Microsoft.Azure.DocumentDB.ChangeFeedProcessor" Version="2.2.6" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
<PackageReference Include="Serilog.Sinks.Seq" Version="4.0.0" />
</ItemGroup>

</Project>

0 comments on commit 73f2a16

Please sign in to comment.