Target FsKafka 1.7.0, remove .Kafka0 #119


Merged: 2 commits, Sep 20, 2021
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -17,8 +17,13 @@ The `Unreleased` section name is replaced by the expected version of next release
- `CosmosStore`: Target `Microsoft.Azure.Cosmos` (V3 CFP) `3.0.21`
- `CosmosStore`: rename `maxDocuments` to `maxItems`
- `Pipeline`: Renamed `AwaitCompletion` to `AwaitShutdown` [#118](https://github.com/jet/propulsion/pull/118)
- `Kafka`: Target [`FsKafka [1.7.0, 1.9.99)`](https://github.com/jet/FsKafka/releases/tag/1.7.0)

### Removed

- `Kafka0`: Triggered by [removal of underlying `FsKafka0`](https://github.com/jet/FsKafka/releases/tag/1.7.0) [#119](https://github.com/jet/propulsion/pull/119)
- `Kafka`: Removed support for `net461`, [triggered by removal of underlying `FsKafka0`](https://github.com/jet/FsKafka/releases/tag/1.7.0) [#119](https://github.com/jet/propulsion/pull/119)
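For reference, the `[1.7.0, 1.9.99)` pin above uses NuGet interval notation (inclusive lower bound, exclusive upper bound); a hypothetical downstream project wanting to mirror the same constraint could express it as:

```xml
<!-- Sketch only: accepts FsKafka 1.7.0 up to, but excluding, 1.9.99 -->
<ItemGroup>
  <PackageReference Include="FsKafka" Version="[1.7.0, 1.9.99)" />
</ItemGroup>
```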

### Fixed

- Fix CI to stop emitting builds with incorrect `AssemblyVersion 1.0.0.0` (updated MinVer to `2.5.0`)
14 changes: 0 additions & 14 deletions Propulsion.sln
@@ -35,10 +35,6 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Propulsion.EventStore", "sr
EndProject
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Propulsion.Kafka.Integration", "tests\Propulsion.Kafka.Integration\Propulsion.Kafka.Integration.fsproj", "{F59736C3-1D6F-4DB4-AEAA-8210691B3A89}"
EndProject
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Propulsion.Kafka0", "src\Propulsion.Kafka0\Propulsion.Kafka0.fsproj", "{DF94FCE3-25A7-42FE-9CE5-11969302EE4C}"
EndProject
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Propulsion.Kafka0.Integration", "tests\Propulsion.Kafka0.Integration\Propulsion.Kafka0.Integration.fsproj", "{74C2044E-4F3F-41FC-AA42-B1A0D71D0AD0}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tools", "tools", "{8DF0ED1F-DE93-45A6-B2E3-A077731EA1A3}"
EndProject
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Propulsion.Tool", "tools\Propulsion.Tool\Propulsion.Tool.fsproj", "{0DBF5289-D429-406E-A195-1AB7785F51C1}"
@@ -79,14 +75,6 @@ Global
{F59736C3-1D6F-4DB4-AEAA-8210691B3A89}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F59736C3-1D6F-4DB4-AEAA-8210691B3A89}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F59736C3-1D6F-4DB4-AEAA-8210691B3A89}.Release|Any CPU.Build.0 = Release|Any CPU
{DF94FCE3-25A7-42FE-9CE5-11969302EE4C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DF94FCE3-25A7-42FE-9CE5-11969302EE4C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DF94FCE3-25A7-42FE-9CE5-11969302EE4C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DF94FCE3-25A7-42FE-9CE5-11969302EE4C}.Release|Any CPU.Build.0 = Release|Any CPU
{74C2044E-4F3F-41FC-AA42-B1A0D71D0AD0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{74C2044E-4F3F-41FC-AA42-B1A0D71D0AD0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{74C2044E-4F3F-41FC-AA42-B1A0D71D0AD0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{74C2044E-4F3F-41FC-AA42-B1A0D71D0AD0}.Release|Any CPU.Build.0 = Release|Any CPU
{0DBF5289-D429-406E-A195-1AB7785F51C1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0DBF5289-D429-406E-A195-1AB7785F51C1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0DBF5289-D429-406E-A195-1AB7785F51C1}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -114,8 +102,6 @@ Global
{A3615CCC-1CBC-463A-8E1C-5FC67440C5E8} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898}
{1564D314-0676-49F7-A7A1-CCA3AAF0A45F} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898}
{F59736C3-1D6F-4DB4-AEAA-8210691B3A89} = {302B09C4-7F38-4CF7-93B9-1B7A6035386E}
{DF94FCE3-25A7-42FE-9CE5-11969302EE4C} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898}
{74C2044E-4F3F-41FC-AA42-B1A0D71D0AD0} = {302B09C4-7F38-4CF7-93B9-1B7A6035386E}
{0DBF5289-D429-406E-A195-1AB7785F51C1} = {8DF0ED1F-DE93-45A6-B2E3-A077731EA1A3}
{8E15F0BD-2D8F-42AE-A789-90A2F5C84CF2} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898}
{356294D8-DF59-4903-9A9C-03F0F459B2A3} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898}
3 changes: 1 addition & 2 deletions README.md
@@ -25,8 +25,7 @@ The components within this repository are delivered as a multi-targeted Nuget pa

- `Propulsion.EventStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.EventStore.svg)](https://www.nuget.org/packages/Propulsion.EventStore/). Provides bindings to [EventStore](https://www.eventstore.org), writing via `Propulsion.EventStore.EventStoreSink` [Depends](https://www.fuget.org/packages/Propulsion.EventStore) on `Equinox.EventStore` v `2.6.0`, `Serilog`
- `Propulsion.Feed` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Feed.svg)](https://www.nuget.org/packages/Propulsion.Feed/) Provides helpers for streamwise consumption of a feed of information with an arbitrary interface (e.g. a third-party Feed API), including the maintenance of checkpoints within such a feed. [Depends](https://www.fuget.org/packages/Propulsion.Feed) on `Propulsion` and an `IFeedCheckpointStore` implementation (e.g. from `Propulsion.Cosmos` or `Propulsion.CosmosStore`)
- `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan`. [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `FsKafka` v `1.5.x`, `Serilog`
- `Propulsion.Kafka0` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka0.svg)](https://www.nuget.org/packages/Propulsion.Kafka0/). Same functionality/purpose as `Propulsion.Kafka` but uses `FsKafka0` instead of `FsKafka` in order to target an older `Confluent.Kafka`/`librdkafka` version pairing for interoperability with systems that have a hard dependency on that. [Depends](https://www.fuget.org/packages/Propulsion.Kafka0) on `FsKafka0` v `1.5.x` (which depends on `Confluent.Kafka [0.11.3]`, `librdkafka.redist [0.11.4]`), `Serilog`
- `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan`. [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `FsKafka` v `1.7.0`-`1.9.99`, `Serilog`
- `Propulsion.SqlStreamStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.SqlStreamStore.svg)](https://www.nuget.org/packages/Propulsion.SqlStreamStore/). Provides bindings to [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore), maintaining checkpoints in a SQL table using Dapper [Depends](https://www.fuget.org/packages/Propulsion.SqlStreamStore) on `SqlStreamStore`, `Dapper` v `2.0`, `Microsoft.Data.SqlClient` v `1.1.3`, `Serilog`

The ubiquitous `Serilog` dependency is solely on the core module, not any sinks; i.e., you can configure it to emit to `NLog` etc.
7 changes: 2 additions & 5 deletions build.proj
@@ -19,19 +19,16 @@
<Exec Command="dotnet pack src/Propulsion.Feed $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.SqlStreamStore $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.Kafka $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.Kafka0 $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack tools/Propulsion.Tool $(Cfg) $(PackOptions)" />
</Target>

<Target Name="VSTest">
<Exec Command="dotnet test tests/Propulsion.Tests $(Cfg) $(TestOptions)" />
<!-- NB these two suites need to be run explicitly in serial or they fail on CI (i.e. doing dotnet test on the .sln causes various hangs etc.)-->
<!-- NB previously, when there were multiple integration tests running concurrently, they failed on CI (i.e. doing dotnet test on the .sln causes various hangs etc.)-->
<!-- Wild guess says this is down to having two rdkafkas in a single process but who knows.-->
<Exec Command="dotnet test tests/Propulsion.Kafka.Integration $(Cfg) $(TestOptions)" />
<!-- Separated as it seems that mixing Kafka and Kafka0 can cause hangs -->
<Exec Command="dotnet test tests/Propulsion.Kafka0.Integration $(Cfg) $(TestOptions)" />
</Target>

<Target Name="Build" DependsOnTargets="Pack;VSTest" />

</Project>
27 changes: 0 additions & 27 deletions src/Propulsion.Kafka/Binding.fs

This file was deleted.

48 changes: 17 additions & 31 deletions src/Propulsion.Kafka/Consumers.fs
@@ -49,23 +49,12 @@ module private Impl =
16 + len m.Key + len m.Value |> int64
let inline mb x = float x / 1024. / 1024.

#if !KAFKA0 // TODO push overload down into FsKafka
#nowarn "0040"
module private Binding =

[<AutoOpen>]
module internal Shims =

type FsKafka.Core.InFlightMessageCounter with
member __.AwaitThreshold(ct : CancellationToken, consumer : IConsumer<_,_>, ?busyWork) =
// Avoid having our assignments revoked due to MAXPOLL (exceeding max.poll.interval.ms between calls to .Consume)
let showConsumerWeAreStillAlive () =
let tps = consumer.Assignment
consumer.Pause(tps)
match busyWork with Some f -> f () | None -> ()
let _ = consumer.Consume(1)
consumer.Resume(tps)
__.AwaitThreshold(ct, showConsumerWeAreStillAlive)
#endif
let mapConsumeResult (result : ConsumeResult<string,string>) =
let m = Binding.message result
if m = null then invalidOp "Cannot dereference null message"
KeyValuePair(m.Key, m.Value)

/// Continuously polls across the assigned partitions, building spans; periodically (at intervals of `emitInterval`), `submit`s accumulated messages as
/// checkpointable Batches
@@ -88,10 +77,11 @@ type KafkaIngestionEngine<'Info>
let mkSubmission topicPartition span : Submission.SubmissionBatch<'S, 'M> =
let checkpoint () =
counter.Delta(-span.reservation) // counterbalance Delta(+) per ingest, below
Binding.storeOffset log consumer span.highWaterMark
try consumer.StoreOffset(span.highWaterMark)
with e -> log.Error(e, "Consuming... storing offsets failed")
{ source = topicPartition; onCompletion = checkpoint; messages = span.messages.ToArray() }
let ingest result =
let m = FsKafka.Binding.message result
let ingest (result : ConsumeResult<string, string>) =
let m = result.Message
if m = null then invalidOp "Cannot dereference null message"
let sz = approximateMessageBytes m
counter.Delta(+sz) // counterbalanced by Delta(-) in checkpoint(), below
@@ -121,23 +111,19 @@
try while not ct.IsCancellationRequested do
match counter.IsOverLimitNow(), remainingIngestionWindow () with
| true, _ ->
#if KAFKA0
let busyWork () =
submit()
maybeLogStats()
Thread.Sleep 1
counter.AwaitThreshold(ct, busyWork)
#else
let busyWork () =
submit()
maybeLogStats()
counter.AwaitThreshold(ct, consumer, busyWork)
#endif
| false, None ->
submit()
maybeLogStats()
| false, Some intervalRemainder ->
Binding.tryConsume log consumer intervalRemainder ingest
try match consumer.Consume(intervalRemainder) with
| null -> ()
| message -> ingest message
with| :? OperationCanceledException -> log.Warning("Consuming... cancelled")
| :? ConsumeException as e -> log.Warning(e, "Consuming... exception")
finally
submit () // We don't want to leak our reservations against the counter and want to pass on messages we ingested
dumpStats () // Unconditional logging when completing
@@ -160,9 +146,9 @@ type ConsumerPipeline private (inner : IConsumer<string, string>, task : Task<un
float config.Buffering.maxInFlightBytes / 1024. / 1024. / 1024., maxDelay.TotalSeconds, maxItems)
let limiterLog = log.ForContext(Serilog.Core.Constants.SourceContextPropertyName, Core.Constants.messageCounterSourceContext)
let limiter = Core.InFlightMessageCounter(limiterLog, config.Buffering.minInFlightBytes, config.Buffering.maxInFlightBytes)
let consumer, closeConsumer = Binding.createConsumer log config.Inner // teardown is managed by ingester.Pump()
let consumer = ConsumerBuilder.WithLogging(log, config.Inner) // teardown is managed by ingester.Pump()
consumer.Subscribe config.Topics
let ingester = KafkaIngestionEngine<'M>(log, limiter, consumer, closeConsumer, mapResult, submit, maxItems, maxDelay, statsInterval=statsInterval)
let ingester = KafkaIngestionEngine<'M>(log, limiter, consumer, consumer.Close, mapResult, submit, maxItems, maxDelay, statsInterval=statsInterval)
let cts = new CancellationTokenSource()
let ct = cts.Token
let tcs = TaskCompletionSource<unit>()
@@ -334,7 +320,7 @@ module Core =
FsCodec.Core.TimelineEvent.Create(index, String.Empty, data, context = context)

let toStreamName defaultCategory (result : ConsumeResult<string, string>) =
let m = Binding.message result
let m = result.Message
if m = null then invalidOp "Cannot dereference null message"
parseMessageKey defaultCategory m.Key

21 changes: 3 additions & 18 deletions src/Propulsion.Kafka/Producers.fs
@@ -31,33 +31,18 @@ type Producer

/// Execute a producer operation, including recording of the latency statistics for the operation
/// NOTE: the `execute` function is expected to throw in the event of a failure to produce (this is the standard semantic for all Confluent.Kafka ProduceAsync APIs)
#if KAFKA0
/// NOTE: Confluent.Kafka0 APIs are expected to adhere to V1 semantics (i.e. throwing in the event of a failure to produce)
/// However, a failure check is incorporated here as a backstop
member __.Produce(execute : KafkaProducer -> Async<Message<_,_>>) : Async<Message<_,_>> = async {
#else
member __.Produce(execute : KafkaProducer -> Async<'r>) : Async<'r> = async {
#endif
let producer = producers.[System.Threading.Interlocked.Increment(&robin) % producers.Length]
let sw = System.Diagnostics.Stopwatch.StartNew()
let! res = execute producer
#if KAFKA0
if res.Error.HasError then return invalidOp res.Error.Reason // CK 1.x throws, we do the same here for consistency
#endif
produceStats.Record sw.Elapsed
return res }

#if KAFKA0
/// Throws if producing fails, for consistency with Confluent.Kafka >= 1.0 APIs
/// NOTE The underlying 0.11.x Confluent.Kafka drivers do not throw; This implementation throws if the response `.Error.HasError` for symmetry with the Confluent.Kafka >= 1 behavior
/// NOTE Propulsion.Kafka (i.e. not using Propulsion.Kafka0) adds an optional `headers` argument
member __.Produce(key, value) =
__.Produce(fun producer -> producer.ProduceAsync(key, value)) |> Async.Ignore
#else
/// Throws if producing fails, per normal Confluent.Kafka 1.x semantics
member __.Produce(key, value, ?headers) =
__.Produce(fun producer -> producer.ProduceAsync(key, value, ?headers=headers) |> Async.Ignore)
#endif
match headers with
| Some h -> __.Produce(fun producer -> producer.ProduceAsync(key, value, h) |> Async.Ignore)
| None -> __.Produce(fun producer -> producer.ProduceAsync(key, value) |> Async.Ignore)
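As a usage sketch (hypothetical caller; assumes an already-configured `Propulsion.Kafka.Producer` instance), the reworked overload above can be invoked as:

```fsharp
// Hypothetical usage sketch: produce a single message, relying on the
// Confluent.Kafka 1.x semantics of throwing if the produce fails.
// The key and payload values here are illustrative only.
let produceOne (producer : Propulsion.Kafka.Producer) = async {
    // Without headers; add the optional `headers` argument to attach metadata
    do! producer.Produce("customer-1234", """{"event":"Greeted"}""")
}
```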

[<Obsolete("Please migrate code to an appropriate Produce overload")>]
/// Throws if producing fails, per normal Confluent.Kafka 1.x semantics
10 changes: 4 additions & 6 deletions src/Propulsion.Kafka/Propulsion.Kafka.fsproj
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0;net461</TargetFrameworks>
<TargetFramework>netstandard2.0</TargetFramework>
<WarningLevel>5</WarningLevel>
<IsTestProject>false</IsTestProject>
<DisableImplicitFSharpCoreReference>true</DisableImplicitFSharpCoreReference>
@@ -12,7 +12,6 @@
<ItemGroup>
<Compile Include="Codec.fs" />
<Compile Include="Infrastructure.fs" />
<Compile Include="Binding.fs" />
<Compile Include="Consumers.fs" />
<Compile Include="Producers.fs" />
<Compile Include="ProducerSinks.fs" />
@@ -22,16 +21,15 @@
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="MinVer" Version="2.5.0" PrivateAssets="All" />

<PackageReference Include="FSharp.Core" Version="3.1.2.5" Condition=" '$(TargetFramework)' == 'net461' " />
<PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' != 'net461' " />
<PackageReference Include="FSharp.Core" Version="4.3.4" />
<PackageReference Include="System.ValueTuple" Version="4.5.0" />

<PackageReference Include="FsCodec.NewtonsoftJson" Version="2.0.0" />
<PackageReference Include="FsKafka" Version="[1.5.0,1.6.0)" />
<PackageReference Include="FsKafka" Version="[1.7.0, 1.9.99)" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Propulsion\Propulsion.fsproj" />
</ItemGroup>

</Project>
27 changes: 0 additions & 27 deletions src/Propulsion.Kafka0/Binding.fs

This file was deleted.
