Commit

- Args processing polish
- F# 6 polish (implicit Nullable)
bartelink committed May 31, 2022
1 parent 2bbe428 commit 1d493d0
Showing 6 changed files with 20 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/Propulsion.Cosmos/ChangeFeedProcessor.fs
@@ -132,7 +132,7 @@ type ChangeFeedProcessor =
// NB for lag reporting to work correctly, it is of course still important that the writing take place, and that it be written via the CFP lib
feedProcessorOptions.CheckpointFrequency.ExplicitCheckpoint <- true
// Max Items is not emphasized as a control mechanism as it can only be used meaningfully when events are highly regular in size
- maxDocuments |> Option.iter (fun mi -> feedProcessorOptions.MaxItemCount <- Nullable mi)
+ maxDocuments |> Option.iter (fun mi -> feedProcessorOptions.MaxItemCount <- mi)
let mkD cid (dc : DocumentClient) =
DocumentCollectionInfo(Uri=dc.ServiceEndpoint,ConnectionPolicy=dc.ConnectionPolicy,DatabaseName=cid.database,CollectionName=cid.container)
ChangeFeedProcessorBuilder()
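The Nullable removals in this file and in the CosmosStore variant below lean on F# 6's additional type-directed conversions, which wrap a plain value into a `Nullable<_>` when the target type is known. A minimal sketch of the idea, using a hypothetical `FeedOptions` stand-in rather than the actual SDK type:

```fsharp
open System

// Hypothetical stand-in for an SDK options type exposing a Nullable<int> property
type FeedOptions() =
    member val MaxItemCount = Nullable<int>() with get, set

let applyMaxItems (maxDocuments : int option) (opts : FeedOptions) =
    // Before F# 6, the wrapping had to be spelled out:
    //   maxDocuments |> Option.iter (fun mi -> opts.MaxItemCount <- Nullable mi)
    // With F# 6, the compiler inserts the Nullable conversion at the assignment:
    maxDocuments |> Option.iter (fun mi -> opts.MaxItemCount <- mi)
```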
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/ChangeFeedProcessor.fs
@@ -138,7 +138,7 @@ type ChangeFeedProcessor =
.GetChangeFeedProcessorBuilderWithManualCheckpoint(processorName_, Container.ChangeFeedHandlerWithManualCheckpoint handler)
.WithLeaseContainer(leases)
.WithPollInterval(feedPollDelay)
- .WithLeaseConfiguration(acquireInterval = Nullable leaseAcquireInterval, expirationInterval = Nullable leaseTtl, renewInterval = Nullable leaseRenewInterval)
+ .WithLeaseConfiguration(acquireInterval = leaseAcquireInterval, expirationInterval = leaseTtl, renewInterval = leaseRenewInterval)
.WithInstanceName(leaseOwnerId)
.WithLeaseAcquireNotification(Container.ChangeFeedMonitorLeaseAcquireDelegate acquireAsync)
.WithLeaseReleaseNotification(Container.ChangeFeedMonitorLeaseReleaseDelegate releaseAsync)
@@ -1,6 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
+ <DisableImplicitFSharpCoreReference>false</DisableImplicitFSharpCoreReference>

<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
<AWSProjectType>Lambda</AWSProjectType>
@@ -21,8 +22,6 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.0.0" PrivateAssets="All" />

<PackageReference Include="FSharp.Core" Version="4.7.2" />

<PackageReference Include="Amazon.Lambda.Core" Version="2.1.0" />
<PackageReference Include="Amazon.Lambda.DynamoDBEvents" Version="2.1.0" />
<PackageReference Include="Amazon.Lambda.Serialization.SystemTextJson" Version="2.3.0" />
4 changes: 2 additions & 2 deletions src/Propulsion.DynamoStore/DynamoStoreIndexer.fs
@@ -27,5 +27,5 @@ type DynamoStoreIndexer(log : Serilog.ILogger, context, cache, epochBytesCutoff,
/// NOTE regardless of concurrency within a process, it's critical to avoid having >1 writer hitting the same trancheId as this will result on continual conflicts
member _.IngestWithoutConcurrency(trancheId, spans) = async {
let ingester = ingester trancheId
- let! originTranche = ingester.ActiveIngestionEpochId()
- return! ingester.IngestMany(originTranche, spans) |> Async.Ignore }
+ let! originEpoch = ingester.ActiveIngestionEpochId()
+ return! ingester.IngestMany(originEpoch, spans) |> Async.Ignore }
5 changes: 2 additions & 3 deletions src/Propulsion.DynamoStore/DynamoStoreSource.fs
@@ -117,15 +117,14 @@ module private Impl =
let events = Array.sub items (span.i - items[0].Index |> int) span.c.Length
for e in events do ({ stream = IndexStreamId.toStreamName span.p; event = e } : StreamEvent) |] }
let mutable prevLoaded, batchIndex = 0L, 0
- let report i len =
+ let report (i : int option) len =
if largeEnough && hydrating then
- let i = Option.toNullable i
match cache.Count with
| loadedNow when prevLoaded <> loadedNow ->
prevLoaded <- loadedNow
let eventsLoaded = cache.Values |> Seq.sumBy Array.length
log.Information("DynamoStoreSource {sourceId}/{trancheId}/{epochId}@{offset}/{totalChanges} {result} {batch} {events}e Loaded {loadedS}/{loadingS}s {loadedE}/{loadingE}e",
- sourceId, string tid, string epochId, i, version, "Hydrated", batchIndex, len, cache.Count, streamEvents.Count, eventsLoaded, chosenEvents)
+ sourceId, string tid, string epochId, Option.toNullable i, version, "Hydrated", batchIndex, len, cache.Count, streamEvents.Count, eventsLoaded, chosenEvents)
| _ -> ()
batchIndex <- batchIndex + 1
for i, spans in state.changes do
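For context, a trimmed-down sketch of the reshaped `report` helper above (the logger argument and message template here are illustrative, not the real ones): the position travels as an `int option` and is converted with `Option.toNullable` only at the logging call, so the structured log presumably shows a plain number or null rather than an option wrapper.

```fsharp
// Illustrative analogue of the report helper; assumes a Serilog ILogger is passed in
let report (log : Serilog.ILogger) (i : int option) (len : int) =
    // Option.toNullable yields a Nullable<int>: the value when Some, null when None
    log.Information("Hydrated batch at {offset} ({count} items)", Option.toNullable i, len)
```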
27 changes: 13 additions & 14 deletions tools/Propulsion.Tool/Program.fs
@@ -6,6 +6,8 @@ open Propulsion.Tool.Args
open Serilog
open System

+ module CosmosInit = Equinox.CosmosStore.Core.Initialization

[<NoEquality; NoComparison>]
type Parameters =
| [<AltCommandLine("-V")>] Verbose
@@ -36,18 +38,17 @@ and [<NoComparison; NoEquality>] InitAuxParameters =
| Mode _ -> "Configure RU mode to use Container-level RU, Database-level RU, or Serverless allocations (Default: Use Container-level allocation)."
| Suffix _ -> "Specify Container Name suffix (default: `-aux`)."
| Cosmos _ -> "Cosmos Connection parameters."

- and CosmosInitInfo(p : ParseResults<InitAuxParameters>) =
- let throughputSpec =
- if p.Contains Autoscale then Equinox.CosmosStore.Core.Initialization.Throughput.Autoscale (p.GetResult(Rus, 4000))
- else Equinox.CosmosStore.Core.Initialization.Throughput.Manual (p.GetResult(Rus, 400))
+ and CosmosModeType = Container | Db | Serverless
+ and CosmosInitArguments(p : ParseResults<InitAuxParameters>) =
+ let rusOrDefault value = p.GetResult(Rus, value)
+ let throughput auto = if auto then CosmosInit.Throughput.Autoscale (rusOrDefault 4000) else CosmosInit.Throughput.Manual (rusOrDefault 400)
member val ProvisioningMode =
- match p.GetResult(Mode, CosmosModeType.Container) with
- | CosmosModeType.Container -> Equinox.CosmosStore.Core.Initialization.Provisioning.Container throughputSpec
- | CosmosModeType.Db -> Equinox.CosmosStore.Core.Initialization.Provisioning.Database throughputSpec
- | CosmosModeType.Serverless ->
- if p.Contains Rus || p.Contains Autoscale then missingArg "Cannot specify RU/s or Autoscale in Serverless mode"
- Equinox.CosmosStore.Core.Initialization.Provisioning.Serverless
+ match p.GetResult(Mode, CosmosModeType.Container), p.Contains Autoscale with
+ | CosmosModeType.Container, auto -> CosmosInit.Provisioning.Container (throughput auto)
+ | CosmosModeType.Db, auto -> CosmosInit.Provisioning.Database (throughput auto)
+ | CosmosModeType.Serverless, auto when auto || p.Contains Rus -> missingArg "Cannot specify RU/s or Autoscale in Serverless mode"
+ | CosmosModeType.Serverless, _ -> CosmosInit.Provisioning.Serverless

and [<NoEquality; NoComparison>] CheckpointParameters =
| [<AltCommandLine "-s"; Mandatory>] Source of Propulsion.Feed.SourceId
| [<AltCommandLine "-t"; Mandatory>] Tranche of Propulsion.Feed.TrancheId
@@ -99,7 +100,6 @@ and [<NoComparison; NoEquality>] StatsParameters =
member a.Usage = a |> function
| Cosmos _ -> "Specify CosmosDB parameters."
| Dynamo _ -> "Specify DynamoDB parameters."
- and CosmosModeType = Container | Db | Serverless

let [<Literal>] appName = "propulsion-tool"

@@ -108,8 +108,7 @@ module CosmosInit =
let aux (c, p : ParseResults<InitAuxParameters>) =
match p.GetSubCommand() with
| InitAuxParameters.Cosmos sa ->
- let a = Args.Cosmos.Arguments(c, sa)
- let mode = (CosmosInitInfo p).ProvisioningMode
+ let mode, a = (CosmosInitArguments p).ProvisioningMode, Args.Cosmos.Arguments(c, sa)
let client = a.ConnectLeases()
match mode with
| Equinox.CosmosStore.Core.Initialization.Provisioning.Container throughput ->
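The Args-processing polish above folds the Serverless validation into the provisioning match via a tuple pattern and a `when` guard, replacing the nested `if`. A simplified, Argu-free sketch of the same shape (the `Mode`, `Throughput` and `Provisioning` types here are illustrative stand-ins, not the Equinox ones):

```fsharp
type Mode = Container | Db | Serverless
type Throughput = Autoscale of maxRu : int | Manual of ru : int
type Provisioning = ProvisionContainer of Throughput | ProvisionDatabase of Throughput | ProvisionServerless

let provisioning (mode : Mode) (autoscale : bool) (rus : int option) =
    // Default RU/s mirror the values used above: 4000 max for autoscale, 400 for manual
    let throughput auto = if auto then Autoscale (defaultArg rus 4000) else Manual (defaultArg rus 400)
    match mode, autoscale with
    | Container, auto -> ProvisionContainer (throughput auto)
    | Db, auto -> ProvisionDatabase (throughput auto)
    | Serverless, auto when auto || Option.isSome rus -> failwith "Cannot specify RU/s or Autoscale in Serverless mode"
    | Serverless, _ -> ProvisionServerless
```

Called as `provisioning Serverless false None` this yields `ProvisionServerless`, while supplying RU/s or Autoscale in Serverless mode trips the guard, matching the intent of the consolidated match above.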
