Skip to content

Commit

Permalink
Tuning
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed May 16, 2022
1 parent a59ae74 commit 032b2e0
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 9 deletions.
7 changes: 7 additions & 0 deletions Propulsion.sln
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.DynamoStore", "s
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.DynamoStore.Lambda", "src\Propulsion.DynamoStore.Lambda\Propulsion.DynamoStore.Lambda.fsproj", "{0EE2957C-92FC-4D8D-9783-A48B9BE032B7}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.DynamoStore", "..\equinox\src\Equinox.DynamoStore\Equinox.DynamoStore.fsproj", "{BC68A82F-B702-4742-97A7-7B04D4293E47}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -115,6 +117,10 @@ Global
{0EE2957C-92FC-4D8D-9783-A48B9BE032B7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0EE2957C-92FC-4D8D-9783-A48B9BE032B7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0EE2957C-92FC-4D8D-9783-A48B9BE032B7}.Release|Any CPU.Build.0 = Release|Any CPU
{BC68A82F-B702-4742-97A7-7B04D4293E47}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BC68A82F-B702-4742-97A7-7B04D4293E47}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BC68A82F-B702-4742-97A7-7B04D4293E47}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BC68A82F-B702-4742-97A7-7B04D4293E47}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -134,6 +140,7 @@ Global
{A1964569-C6A9-46D3-8DD9-01E04AF8CC81} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898}
{0B1D505F-FD80-428E-92E5-4367E95E96DF} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898}
{0EE2957C-92FC-4D8D-9783-A48B9BE032B7} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898}
{BC68A82F-B702-4742-97A7-7B04D4293E47} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DF04AF73-7412-46E5-9CC8-15CB48E3139A}
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore.Lambda/Connector.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Configuration(?tryGet) =
open Equinox.DynamoStore

type Connector(serviceUrl, accessKey, secretKey, table) =
let retries, timeout, queryMaxItems, maxBytes = 10, System.TimeSpan.FromSeconds 60., 50, 64*1024
let retries, timeout, queryMaxItems, maxBytes = 10, System.TimeSpan.FromSeconds 60., 100, 48*1024

let conn = DynamoStoreConnector(serviceUrl, accessKey, secretKey, retries, timeout)
let client = conn.CreateClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
"region": "",
"configuration": "Release",
"function-runtime": "dotnet6",
"function-memory-size": 128,
"function-timeout": 120,
"function-memory-size": 192,
"function-timeout": 180,
"function-architecture": "arm64",
"function-handler": "Propulsion.DynamoStore.Lambda::Propulsion.DynamoStore.Lambda.Function::FunctionHandler"
}
11 changes: 8 additions & 3 deletions src/Propulsion.DynamoStore/AppendsEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,20 @@ type Service internal (shouldClose, resolve : AppendsTrancheId * AppendsEpochId

let isSelf p = IndexStreamId.toStreamName p |> FsCodec.StreamName.splitCategoryAndId |> fst = Category
if spans |> Array.exists (function { p = p } -> isSelf p) then invalidArg (nameof spans) "Writes to indices should be filtered prior to indexing"
decider.Transact(Ingest.decide shouldClose spans, if assumeEmpty = Some true then Equinox.AssumeEmpty else Equinox.AllowStale)
decider.TransactEx((fun (c : Equinox.ISyncContext<_>) -> async {
return Ingest.decide (shouldClose c.Version) spans c.State
}), (fun r _c -> r), if assumeEmpty = Some true then Equinox.AssumeEmpty else Equinox.AllowStale)

module Config =

let private resolveStream (context, cache) =
let cat = Config.createUnoptimized Events.codec Fold.initial Fold.fold (context, Some cache)
cat.Resolve
let create log maxItemsPerEpoch store =
let shouldClose totalItems = totalItems >= maxItemsPerEpoch
let create (log : Serilog.ILogger) (maxVersion, maxItemsPerEpoch) store =
let shouldClose version totalStreams =
let closing = version >= maxVersion || totalStreams >= maxItemsPerEpoch
if closing then log.Information("Closing v{version} Streams {streams}", version, totalStreams)
closing
let resolve = streamName >> resolveStream store >> Config.createDecider log
Service(shouldClose, resolve)

Expand Down
5 changes: 3 additions & 2 deletions src/Propulsion.DynamoStore/DynamoStoreIndexer.fs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
namespace Propulsion.DynamoStore

type DynamoStoreIndexer(log : Serilog.ILogger, context, cache, ?maxItemsPerEpoch, ?storeLog) =
type DynamoStoreIndexer(log : Serilog.ILogger, context, cache, ?maxItemsPerEpoch, ?maxVersion, ?storeLog) =
let maxVersion = defaultArg maxVersion 2_000
let maxItemsPerEpoch = defaultArg maxItemsPerEpoch 100_000
do if maxItemsPerEpoch > AppendsEpoch.MaxItemsPerEpoch then invalidArg (nameof maxItemsPerEpoch) "Cannot exceed AppendsEpoch.MaxItemsPerEpoch"
let storeLog = defaultArg storeLog log
let log = log.ForContext<DynamoStoreIndexer>()

let ingester =
let epochs = AppendsEpoch.Config.create storeLog maxItemsPerEpoch (context, cache)
let epochs = AppendsEpoch.Config.create storeLog (maxVersion, maxItemsPerEpoch) (context, cache)
let index = AppendsIndex.Config.create storeLog (context, cache)
let createIngester trancheId =
let log = log.ForContext("trancheId", trancheId)
Expand Down
4 changes: 3 additions & 1 deletion src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@

<PackageReference Include="FSharp.Core" Version="4.7.2" />

<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-beta.2" />
<!-- <PackageReference Include="Equinox.DynamoStore" Version="4.0.0-beta.4" />-->
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.2.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Propulsion.Feed\Propulsion.Feed.fsproj" />
<ProjectReference Include="..\..\..\equinox\src\Equinox\Equinox.fsproj" />
<ProjectReference Include="..\..\..\equinox\src\Equinox.DynamoStore\Equinox.DynamoStore.fsproj" />
</ItemGroup>

</Project>

0 comments on commit 032b2e0

Please sign in to comment.