diff --git a/src/Propulsion.Cosmos/ChangeFeedProcessor.fs b/src/Propulsion.Cosmos/ChangeFeedProcessor.fs index ece0a51b..b1ee3abf 100644 --- a/src/Propulsion.Cosmos/ChangeFeedProcessor.fs +++ b/src/Propulsion.Cosmos/ChangeFeedProcessor.fs @@ -132,7 +132,7 @@ type ChangeFeedProcessor = // NB for lag reporting to work correctly, it is of course still important that the writing take place, and that it be written via the CFP lib feedProcessorOptions.CheckpointFrequency.ExplicitCheckpoint <- true // Max Items is not emphasized as a control mechanism as it can only be used meaningfully when events are highly regular in size - maxDocuments |> Option.iter (fun mi -> feedProcessorOptions.MaxItemCount <- Nullable mi) + maxDocuments |> Option.iter (fun mi -> feedProcessorOptions.MaxItemCount <- mi) let mkD cid (dc : DocumentClient) = DocumentCollectionInfo(Uri=dc.ServiceEndpoint,ConnectionPolicy=dc.ConnectionPolicy,DatabaseName=cid.database,CollectionName=cid.container) ChangeFeedProcessorBuilder() diff --git a/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs b/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs index 2575349f..90b06d40 100644 --- a/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs +++ b/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs @@ -138,7 +138,7 @@ type ChangeFeedProcessor = .GetChangeFeedProcessorBuilderWithManualCheckpoint(processorName_, Container.ChangeFeedHandlerWithManualCheckpoint handler) .WithLeaseContainer(leases) .WithPollInterval(feedPollDelay) - .WithLeaseConfiguration(acquireInterval = Nullable leaseAcquireInterval, expirationInterval = Nullable leaseTtl, renewInterval = Nullable leaseRenewInterval) + .WithLeaseConfiguration(acquireInterval = leaseAcquireInterval, expirationInterval = leaseTtl, renewInterval = leaseRenewInterval) .WithInstanceName(leaseOwnerId) .WithLeaseAcquireNotification(Container.ChangeFeedMonitorLeaseAcquireDelegate acquireAsync) .WithLeaseReleaseNotification(Container.ChangeFeedMonitorLeaseReleaseDelegate releaseAsync) diff --git a/src/Propulsion.DynamoStore.Lambda/Propulsion.DynamoStore.Lambda.fsproj b/src/Propulsion.DynamoStore.Lambda/Propulsion.DynamoStore.Lambda.fsproj index 0e1d4e2d..433410ca 100644 --- a/src/Propulsion.DynamoStore.Lambda/Propulsion.DynamoStore.Lambda.fsproj +++ b/src/Propulsion.DynamoStore.Lambda/Propulsion.DynamoStore.Lambda.fsproj @@ -1,6 +1,7 @@ net6.0 + false true Lambda @@ -21,8 +22,6 @@ - - diff --git a/src/Propulsion.DynamoStore/DynamoStoreIndexer.fs b/src/Propulsion.DynamoStore/DynamoStoreIndexer.fs index 1b25be5c..fd4de0f2 100644 --- a/src/Propulsion.DynamoStore/DynamoStoreIndexer.fs +++ b/src/Propulsion.DynamoStore/DynamoStoreIndexer.fs @@ -27,5 +27,5 @@ type DynamoStoreIndexer(log : Serilog.ILogger, context, cache, epochBytesCutoff, /// NOTE regardless of concurrency within a process, it's critical to avoid having >1 writer hitting the same trancheId as this will result on continual conflicts member _.IngestWithoutConcurrency(trancheId, spans) = async { let ingester = ingester trancheId - let! originTranche = ingester.ActiveIngestionEpochId() - return! ingester.IngestMany(originTranche, spans) |> Async.Ignore } + let! originEpoch = ingester.ActiveIngestionEpochId() + return! ingester.IngestMany(originEpoch, spans) |> Async.Ignore } diff --git a/src/Propulsion.DynamoStore/DynamoStoreSource.fs b/src/Propulsion.DynamoStore/DynamoStoreSource.fs index 00a92c94..c1af5d02 100644 --- a/src/Propulsion.DynamoStore/DynamoStoreSource.fs +++ b/src/Propulsion.DynamoStore/DynamoStoreSource.fs @@ -117,15 +117,14 @@ module private Impl = let events = Array.sub items (span.i - items[0].Index |> int) span.c.Length for e in events do ({ stream = IndexStreamId.toStreamName span.p; event = e } : StreamEvent) |] } let mutable prevLoaded, batchIndex = 0L, 0 - let report i len = + let report (i : int option) len = if largeEnough && hydrating then - let i = Option.toNullable i match cache.Count with | loadedNow when prevLoaded <> loadedNow -> prevLoaded <- loadedNow let eventsLoaded = cache.Values |> Seq.sumBy Array.length log.Information("DynamoStoreSource {sourceId}/{trancheId}/{epochId}@{offset}/{totalChanges} {result} {batch} {events}e Loaded {loadedS}/{loadingS}s {loadedE}/{loadingE}e", - sourceId, string tid, string epochId, i, version, "Hydrated", batchIndex, len, cache.Count, streamEvents.Count, eventsLoaded, chosenEvents) + sourceId, string tid, string epochId, Option.toNullable i, version, "Hydrated", batchIndex, len, cache.Count, streamEvents.Count, eventsLoaded, chosenEvents) | _ -> () batchIndex <- batchIndex + 1 for i, spans in state.changes do diff --git a/tools/Propulsion.Tool/Program.fs b/tools/Propulsion.Tool/Program.fs index d46444be..46cc6db3 100644 --- a/tools/Propulsion.Tool/Program.fs +++ b/tools/Propulsion.Tool/Program.fs @@ -6,6 +6,8 @@ open Propulsion.Tool.Args open Serilog open System +module CosmosInit = Equinox.CosmosStore.Core.Initialization + [] type Parameters = | [] Verbose @@ -36,18 +38,17 @@ and [] InitAuxParameters = | Mode _ -> "Configure RU mode to use Container-level RU, Database-level RU, or Serverless allocations (Default: Use Container-level allocation)." | Suffix _ -> "Specify Container Name suffix (default: `-aux`)." | Cosmos _ -> "Cosmos Connection parameters." - -and CosmosInitInfo(p : ParseResults) = - let throughputSpec = - if p.Contains Autoscale then Equinox.CosmosStore.Core.Initialization.Throughput.Autoscale (p.GetResult(Rus, 4000)) - else Equinox.CosmosStore.Core.Initialization.Throughput.Manual (p.GetResult(Rus, 400)) +and CosmosModeType = Container | Db | Serverless +and CosmosInitArguments(p : ParseResults) = + let rusOrDefault value = p.GetResult(Rus, value) + let throughput auto = if auto then CosmosInit.Throughput.Autoscale (rusOrDefault 4000) else CosmosInit.Throughput.Manual (rusOrDefault 400) member val ProvisioningMode = - match p.GetResult(Mode, CosmosModeType.Container) with - | CosmosModeType.Container -> Equinox.CosmosStore.Core.Initialization.Provisioning.Container throughputSpec - | CosmosModeType.Db -> Equinox.CosmosStore.Core.Initialization.Provisioning.Database throughputSpec - | CosmosModeType.Serverless -> - if p.Contains Rus || p.Contains Autoscale then missingArg "Cannot specify RU/s or Autoscale in Serverless mode" - Equinox.CosmosStore.Core.Initialization.Provisioning.Serverless + match p.GetResult(Mode, CosmosModeType.Container), p.Contains Autoscale with + | CosmosModeType.Container, auto -> CosmosInit.Provisioning.Container (throughput auto) + | CosmosModeType.Db, auto -> CosmosInit.Provisioning.Database (throughput auto) + | CosmosModeType.Serverless, auto when auto || p.Contains Rus -> missingArg "Cannot specify RU/s or Autoscale in Serverless mode" + | CosmosModeType.Serverless, _ -> CosmosInit.Provisioning.Serverless + and [] CheckpointParameters = | [] Source of Propulsion.Feed.SourceId | [] Tranche of Propulsion.Feed.TrancheId @@ -99,7 +100,6 @@ and [] StatsParameters = member a.Usage = a |> function | Cosmos _ -> "Specify CosmosDB parameters." | Dynamo _ -> "Specify DynamoDB parameters." -and CosmosModeType = Container | Db | Serverless let [] appName = "propulsion-tool" @@ -108,8 +108,7 @@ module CosmosInit = let aux (c, p : ParseResults) = match p.GetSubCommand() with | InitAuxParameters.Cosmos sa -> - let a = Args.Cosmos.Arguments(c, sa) - let mode = (CosmosInitInfo p).ProvisioningMode + let mode, a = (CosmosInitArguments p).ProvisioningMode, Args.Cosmos.Arguments(c, sa) let client = a.ConnectLeases() match mode with | Equinox.CosmosStore.Core.Initialization.Provisioning.Container throughput ->