Skip to content

Commit

Permalink
Target Equinox 2rc1, Propulsion 101rc3
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jun 18, 2019
1 parent 53cde21 commit fd5e389
Show file tree
Hide file tree
Showing 14 changed files with 71 additions and 121 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added

- `eqxprojector -k`'s `Consumer` offers a `StreamSpan`-based API for ordered, de-deduplicated consumption without concurrent executions at stream level [#24](https://github.com/jet/dotnet-templates/pull/24)
- `eqxprojector -k`'s `Consumer` summarizes processing outcomes in example using new support for same in `Propulsion.Kafka` [#25](https://github.com/jet/dotnet-templates/pull/25)
- `eqxprojector -k -n`'s `Producer` offers a parallel producer mode which runs all projections in parallel without constraints (or need to synthesize streams) [#24](https://github.com/jet/dotnet-templates/pull/24)
- `eqxsync` has EventStore Sink support via `cosmos` ... `es` commandline option [#23](https://github.com/jet/dotnet-templates/pull/23)

### Changed

- `eqxtestbed`, `eqxweb`, `eqxwebcs` now target `Equinox 2.0.0-rc1`
- `eqxprojector` `-k` now targets `Jet.ConfluentKafka.FSharp` + `Propulsion.Kafka` v `1.0.1-rc3` [#24](https://github.com/jet/dotnet-templates/pull/24)
- `eqxsync` now targets `Propulsion.Cosmos`,`Propulsion.EventStore` v `1.0.1-rc2` [#24](https://github.com/jet/dotnet-templates/pull/24)
- `eqxsync` now targets `Propulsion.Cosmos`,`Propulsion.EventStore` v `1.0.1-rc3` [#24](https://github.com/jet/dotnet-templates/pull/24)

### Removed
### Fixed
Expand Down
2 changes: 1 addition & 1 deletion equinox-projector/Consumer/Consumer.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<ItemGroup>
<PackageReference Include="Argu" Version="5.4.0" />
<PackageReference Include="Destructurama.FSharp.NetCore" Version="1.0.14" />
<PackageReference Include="Equinox.Codec" Version="2.0.0-preview9" />
<PackageReference Include="Equinox.Codec" Version="2.0.0-rc1" />
<PackageReference Include="Propulsion.Kafka" Version="1.0.1-rc3" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
</ItemGroup>
Expand Down
22 changes: 17 additions & 5 deletions equinox-testbed/Testbed/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,11 @@ module CmdParser =
let createStoreLog verbose verboseConsole maybeSeqEndpoint =
let c = LoggerConfiguration().Destructure.FSharpTypes()
let c = if verbose then c.MinimumLevel.Debug() else c
//#if eventStore
let c = c.WriteTo.Sink(Equinox.EventStore.Log.InternalMetrics.Stats.LogSink())
//#endif
//#if cosmos
let c = c.WriteTo.Sink(Storage.Cosmos.RuCounterSink())
let c = c.WriteTo.Sink(Equinox.Cosmos.Store.Log.InternalMetrics.Stats.LogSink())
//#endif
let c = c.WriteTo.Console((if verbose && verboseConsole then LogEventLevel.Debug else LogEventLevel.Warning), theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code)
let c = match maybeSeqEndpoint with None -> c | Some endpoint -> c.WriteTo.Seq(endpoint)
Expand Down Expand Up @@ -155,21 +158,30 @@ module LoadTest =
for r in results do
resultFile.Information("Aggregate: {aggregate}", r)
log.Information("Run completed; Current memory allocation: {bytes:n2} MiB", (GC.GetTotalMemory(true) |> float) / 1024./1024.)
//#if cosmos
//#if cosmos || eventStore

match storeConfig with
//#if cosmos
| Storage.StorageConfig.Cosmos _ ->
Storage.Cosmos.dumpStats duration log
//#if eventStore || memory
Equinox.Cosmos.Store.Log.InternalMetrics.dump log
//#endif
//#if eventStore
| Storage.StorageConfig.Es _ ->
Equinox.EventStore.Log.InternalMetrics.dump log
//#endif
//#if memory
| _ -> ()
//#endif
//#endif

let createDomainLog verbose verboseConsole maybeSeqEndpoint =
let c = LoggerConfiguration().Destructure.FSharpTypes().Enrich.FromLogContext()
let c = if verbose then c.MinimumLevel.Debug() else c
//#if eventStore
let c = c.WriteTo.Sink(Equinox.EventStore.Log.InternalMetrics.Stats.LogSink())
//#endif
//#if cosmos
let c = c.WriteTo.Sink(Storage.Cosmos.RuCounterSink())
let c = c.WriteTo.Sink(Equinox.Cosmos.Store.Log.InternalMetrics.Stats.LogSink())
//#endif
let c = c.WriteTo.Console((if verboseConsole then LogEventLevel.Debug else LogEventLevel.Information), theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code)
let c = match maybeSeqEndpoint with None -> c | Some endpoint -> c.WriteTo.Seq(endpoint)
Expand Down
8 changes: 4 additions & 4 deletions equinox-testbed/Testbed/Services.fs
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,18 @@ type StreamResolver(storage) =
match storage with
//#if memoryStore || (!cosmos && !eventStore)
| Storage.StorageConfig.Memory store ->
Equinox.MemoryStore.MemoryResolver(store, fold, initial).Resolve
Equinox.MemoryStore.Resolver(store, fold, initial).Resolve
//#endif
//#if eventStore
| Storage.StorageConfig.Es (gateway, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.EventStore.GesResolver<'event,'state>(gateway, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
Equinox.EventStore.Resolver<'event,'state>(gateway, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
//#endif
//#if cosmos
| Storage.StorageConfig.Cosmos (gateway, caching, unfolds, databaseId, collectionId) ->
let store = Equinox.Cosmos.CosmosStore(gateway, databaseId, collectionId)
let store = Equinox.Cosmos.Context(gateway, databaseId, collectionId)
let accessStrategy = if unfolds then Equinox.Cosmos.AccessStrategy.Snapshot snapshot |> Some else None
Equinox.Cosmos.CosmosResolver<'event,'state>(store, codec, fold, initial, caching, ?access = accessStrategy).Resolve
Equinox.Cosmos.Resolver<'event,'state>(store, codec, fold, initial, caching, ?access = accessStrategy).Resolve
//#endif

type ServiceBuilder(storageConfig, handlerLog) =
Expand Down
77 changes: 7 additions & 70 deletions equinox-testbed/Testbed/Storage.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ type StorageConfig =
| Memory of Equinox.MemoryStore.VolatileStore
//#endif
//#if eventStore
| Es of Equinox.EventStore.GesGateway * Equinox.EventStore.CachingStrategy option * unfolds: bool
| Es of Equinox.EventStore.Context * Equinox.EventStore.CachingStrategy option * unfolds: bool
//#endif
//#if cosmos
| Cosmos of Equinox.Cosmos.CosmosGateway * Equinox.Cosmos.CachingStrategy * unfolds: bool * databaseId: string * collectionId: string
| Cosmos of Equinox.Cosmos.Gateway * Equinox.Cosmos.CachingStrategy * unfolds: bool * databaseId: string * collectionId: string
//#endif

//#if (memoryStore || (!cosmos && !eventStore))
Expand Down Expand Up @@ -72,14 +72,14 @@ module Cosmos =
open Equinox.Cosmos
open Serilog

let private createGateway connection maxItems = CosmosGateway(connection, CosmosBatchingPolicy(defaultMaxItems=maxItems))
let private createGateway connection maxItems = Gateway(connection, BatchingPolicy(defaultMaxItems=maxItems))
let private context (log: ILogger, storeLog: ILogger) (a : Arguments) =
let (Discovery.UriAndKey (endpointUri,_)) as discovery = a.Connection|> Discovery.FromConnectionString
log.Information("CosmosDb {mode} {connection} Database {database} Collection {collection}",
a.Mode, endpointUri, a.Database, a.Collection)
Log.Information("CosmosDb timeout {timeout}s; Throttling retries {retries}, max wait {maxRetryWaitTime}s",
(let t = a.Timeout in t.TotalSeconds), a.Retries, a.MaxRetryWaitTime)
let connector = CosmosConnector(a.Timeout, a.Retries, a.MaxRetryWaitTime, storeLog, mode=a.Mode)
let connector = Connector(a.Timeout, a.Retries, a.MaxRetryWaitTime, storeLog, mode=a.Mode)
discovery, a.Database, a.Collection, connector
let config (log: ILogger, storeLog) (cache, unfolds, batchSize) info =
let discovery, dbName, collName, connector = context (log, storeLog) info
Expand All @@ -91,69 +91,6 @@ module Cosmos =
else CachingStrategy.NoCaching
StorageConfig.Cosmos (createGateway conn batchSize, cacheStrategy, unfolds, dbName, collName)

open Serilog.Events
open Equinox.Cosmos.Store

let inline (|Stats|) ({ interval = i; ru = ru }: Log.Measurement) = ru, let e = i.Elapsed in int64 e.TotalMilliseconds

let (|CosmosReadRc|CosmosWriteRc|CosmosResyncRc|CosmosResponseRc|) = function
| Log.Tip (Stats s)
| Log.TipNotFound (Stats s)
| Log.TipNotModified (Stats s)
| Log.Query (_,_, (Stats s)) -> CosmosReadRc s
// slices are rolled up into batches so be sure not to double-count
| Log.Response (_,(Stats s)) -> CosmosResponseRc s
| Log.SyncSuccess (Stats s)
| Log.SyncConflict (Stats s) -> CosmosWriteRc s
| Log.SyncResync (Stats s) -> CosmosResyncRc s
let (|SerilogScalar|_|) : LogEventPropertyValue -> obj option = function
| (:? ScalarValue as x) -> Some x.Value
| _ -> None
let (|CosmosMetric|_|) (logEvent : LogEvent) : Log.Event option =
match logEvent.Properties.TryGetValue("cosmosEvt") with
| true, SerilogScalar (:? Log.Event as e) -> Some e
| _ -> None
type RuCounter =
{ mutable rux100: int64; mutable count: int64; mutable ms: int64 }
static member Create() = { rux100 = 0L; count = 0L; ms = 0L }
member __.Ingest (ru, ms) =
System.Threading.Interlocked.Increment(&__.count) |> ignore
System.Threading.Interlocked.Add(&__.rux100, int64 (ru*100.)) |> ignore
System.Threading.Interlocked.Add(&__.ms, ms) |> ignore
type RuCounterSink() =
static member val Read = RuCounter.Create()
static member val Write = RuCounter.Create()
static member val Resync = RuCounter.Create()
interface Serilog.Core.ILogEventSink with
member __.Emit logEvent = logEvent |> function
| CosmosMetric (CosmosReadRc stats) -> RuCounterSink.Read.Ingest stats
| CosmosMetric (CosmosWriteRc stats) -> RuCounterSink.Write.Ingest stats
| CosmosMetric (CosmosResyncRc stats) -> RuCounterSink.Resync.Ingest stats
| _ -> ()

let dumpStats duration (log: Serilog.ILogger) =
let stats =
[ "Read", RuCounterSink.Read
"Write", RuCounterSink.Write
"Resync", RuCounterSink.Resync ]
let mutable totalCount, totalRc, totalMs = 0L, 0., 0L
let logActivity name count rc lat =
log.Information("{name}: {count:n0} requests costing {ru:n0} RU (average: {avg:n2}); Average latency: {lat:n0}ms",
name, count, rc, (if count = 0L then Double.NaN else rc/float count), (if count = 0L then Double.NaN else float lat/float count))
for name, stat in stats do
let ru = float stat.rux100 / 100.
totalCount <- totalCount + stat.count
totalRc <- totalRc + ru
totalMs <- totalMs + stat.ms
logActivity name stat.count ru stat.ms
logActivity "TOTAL" totalCount totalRc totalMs
let measures : (string * (TimeSpan -> float)) list =
[ "s", fun x -> x.TotalSeconds
"m", fun x -> x.TotalMinutes
"h", fun x -> x.TotalHours ]
let logPeriodicRate name count ru = log.Information("rp{name} {count:n0} = ~{ru:n0} RU", name, count, ru)
for uom, f in measures do let d = f duration in if d <> 0. then logPeriodicRate uom (float totalCount/d |> int64) (totalRc/d)

//#endif
//#if eventStore
/// To establish a local node to run the tests against:
Expand Down Expand Up @@ -192,12 +129,12 @@ module EventStore =
member __.ConcurrentOperationsLimit = a.GetResult(ConcurrentOperationsLimit,5000)

let private connect (log: Serilog.ILogger) (dnsQuery, heartbeatTimeout, col) (username, password) (operationTimeout, operationRetries) =
GesConnector(username, password, reqTimeout=operationTimeout, reqRetries=operationRetries,
Connector(username, password, reqTimeout=operationTimeout, reqRetries=operationRetries,
heartbeatTimeout=heartbeatTimeout, concurrentOperationsLimit = col,
log=(if log.IsEnabled(Serilog.Events.LogEventLevel.Debug) then Logger.SerilogVerbose log else Logger.SerilogNormal log),
tags=["M", Environment.MachineName; "I", Guid.NewGuid() |> string])
.Establish("TestbedTemplate", Discovery.GossipDns dnsQuery, ConnectionStrategy.ClusterTwinPreferSlaveReads)
let private createGateway connection batchSize = GesGateway(connection, GesBatchingPolicy(maxBatchSize = batchSize))
let private createContext connection batchSize = Context(connection, BatchingPolicy(maxBatchSize = batchSize))
let config (log: Serilog.ILogger, storeLog) (cache, unfolds, batchSize) (args : ParseResults<Parameters>) =
let a = Arguments(args)
let (timeout, retries) as operationThrottling = a.Timeout, a.Retries
Expand All @@ -211,5 +148,5 @@ module EventStore =
let c = Caching.Cache("TestbedTemplate", sizeMb = 50)
CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
StorageConfig.Es ((createGateway conn batchSize), cacheStrategy, unfolds)
StorageConfig.Es ((createContext conn batchSize), cacheStrategy, unfolds)
//#endif
8 changes: 4 additions & 4 deletions equinox-testbed/Testbed/Testbed.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
<ItemGroup>
<PackageReference Include="Argu" Version="5.4.0" />
<PackageReference Include="Destructurama.FSharp.NetCore" Version="1.0.14" />
<PackageReference Include="Equinox.Cosmos" Version="2.0.0-preview9" />
<PackageReference Include="Equinox.EventStore" Version="2.0.0-preview9" />
<PackageReference Include="Equinox.MemoryStore" Version="2.0.0-preview9" />
<PackageReference Include="Equinox.Tools.TestHarness" Version="2.0.0-preview9" />
<PackageReference Include="Equinox.Cosmos" Version="2.0.0-rc1" />
<PackageReference Include="Equinox.EventStore" Version="2.0.0-rc1" />
<PackageReference Include="Equinox.MemoryStore" Version="2.0.0-rc1" />
<PackageReference Include="Equinox.Tools.TestHarness" Version="2.0.0-rc1" />
<PackageReference Include="FSharp.UMX" Version="1.0.0-preview-001" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
Expand Down
2 changes: 1 addition & 1 deletion equinox-web-csharp/Domain/Domain.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Equinox" Version="2.0.0-preview9" />
<PackageReference Include="Equinox" Version="2.0.0-rc1" />
<PackageReference Include="FSharp.Core" Version="4.5.4" />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
</ItemGroup>
Expand Down
14 changes: 7 additions & 7 deletions equinox-web-csharp/Web/CosmosContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class CosmosContext : EquinoxContext
{
readonly Caching.Cache _cache;

CosmosStore _store;
Context _store;
readonly Func<Task> _connect;

public CosmosContext(CosmosConfig config)
Expand All @@ -47,21 +47,21 @@ public CosmosContext(CosmosConfig config)
{
var gateway = await Connect("App", config.Mode, discovery, timeout, retriesOn429Throttling,
(int)timeout.TotalSeconds);
var collectionMapping = new CosmosCollections(config.Database, config.Collection);
var collectionMapping = new Collections(config.Database, config.Collection);

_store = new CosmosStore(gateway, collectionMapping);
_store = new Context(gateway, collectionMapping);
};
}

internal override async Task Connect() => await _connect();

static async Task<CosmosGateway> Connect(string appName, ConnectionMode mode, Discovery discovery, TimeSpan operationTimeout,
static async Task<Gateway> Connect(string appName, ConnectionMode mode, Discovery discovery, TimeSpan operationTimeout,
int maxRetryForThrottling, int maxRetryWaitSeconds)
{
var log = Log.ForContext<CosmosContext>();
var c = new CosmosConnector(operationTimeout, maxRetryForThrottling, maxRetryWaitSeconds, log, mode: mode);
var c = new Connector(operationTimeout, maxRetryForThrottling, maxRetryWaitSeconds, log, mode: mode);
var conn = await FSharpAsync.StartAsTask(c.Connect(appName, discovery), null, null);
return new CosmosGateway(conn, new CosmosBatchingPolicy(defaultMaxItems: 500));
return new Gateway(conn, new BatchingPolicy(defaultMaxItems: 500));
}

public override Func<Target,Equinox.Store.IStream<TEvent, TState>> Resolve<TEvent, TState>(
Expand All @@ -79,7 +79,7 @@ static async Task<CosmosGateway> Connect(string appName, ConnectionMode mode, Di
var cacheStrategy = _cache == null
? null
: CachingStrategy.NewSlidingWindow(_cache, TimeSpan.FromMinutes(20));
var resolver = new CosmosResolver<TEvent, TState>(_store, codec, FuncConvert.FromFunc(fold), initial, cacheStrategy, accessStrategy);
var resolver = new Resolver<TEvent, TState>(_store, codec, FuncConvert.FromFunc(fold), initial, cacheStrategy, accessStrategy);
return t => resolver.Resolve.Invoke(t);
}
}
Expand Down
12 changes: 6 additions & 6 deletions equinox-web-csharp/Web/EventStoreContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,26 @@ public class EventStoreContext : EquinoxContext
{
readonly Caching.Cache _cache;

GesGateway _gateway;
Context _connection;
readonly Func<Task> _connect;

public EventStoreContext(EventStoreConfig config)
{
_cache = new Caching.Cache("Es", config.CacheMb);
_connect = async () => _gateway = await Connect(config);
_connect = async () => _connection = await Connect(config);
}

internal override async Task Connect() => await _connect();

static async Task<GesGateway> Connect(EventStoreConfig config)
static async Task<Context> Connect(EventStoreConfig config)
{
var log = Logger.NewSerilogNormal(Serilog.Log.ForContext<EventStoreContext>());
var c = new GesConnector(config.Username, config.Password, reqTimeout: TimeSpan.FromSeconds(5), reqRetries: 1);
var c = new Connector(config.Username, config.Password, reqTimeout: TimeSpan.FromSeconds(5), reqRetries: 1);

var conn = await FSharpAsync.StartAsTask(
c.Establish("Twin", Discovery.NewGossipDns(config.Host), ConnectionStrategy.ClusterTwinPreferSlaveReads),
null, null);
return new GesGateway(conn, new GesBatchingPolicy(maxBatchSize: 500));
return new Context(conn, new BatchingPolicy(maxBatchSize: 500));
}

public override Func<Target,IStream<TEvent, TState>> Resolve<TEvent, TState>(
Expand All @@ -66,7 +66,7 @@ public override Func<Target,IStream<TEvent, TState>> Resolve<TEvent, TState>(
var cacheStrategy = _cache == null
? null
: CachingStrategy.NewSlidingWindow(_cache, TimeSpan.FromMinutes(20));
var resolver = new GesResolver<TEvent, TState>(_gateway, codec, FuncConvert.FromFunc(fold),
var resolver = new Resolver<TEvent, TState>(_connection, codec, FuncConvert.FromFunc(fold),
initial, cacheStrategy, accessStrategy);
return t => resolver.Resolve.Invoke(t);
}
Expand Down
2 changes: 1 addition & 1 deletion equinox-web-csharp/Web/MemoryStoreContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public override Func<Target,IStream<TEvent, TState>> Resolve<TEvent, TState>(
Func<TEvent, bool> isOrigin = null,
Func<TState, TEvent> compact = null)
{
var resolver = new MemoryResolver<TEvent, TState>(_store, FuncConvert.FromFunc(fold), initial);
var resolver = new Resolver<TEvent, TState>(_store, FuncConvert.FromFunc(fold), initial);
return target => resolver.Resolve.Invoke(target);
}

Expand Down
Loading

0 comments on commit fd5e389

Please sign in to comment.