Feature: expose librdkafka statistics as swift metrics #92

Merged
Commits: 26 total (changes shown from 3 commits)
c870864
introduce statistics for producer
blindspotbounty Jul 26, 2023
633773f
Merge remote-tracking branch 'origin/main' into feature/sc-1976/gsoc-…
blindspotbounty Jul 31, 2023
d9887b9
add statistics to new consumer with events
blindspotbounty Jul 31, 2023
e5f0483
Merge remote-tracking branch 'origin/main' into feature/sc-1976/gsoc-…
blindspotbounty Aug 10, 2023
d55a7fd
fix some artefacts
blindspotbounty Aug 10, 2023
8b4525b
adjust to KeyRefreshAttempts
blindspotbounty Aug 10, 2023
23e08fc
draft: statistics with metrics
blindspotbounty Aug 17, 2023
612a3c4
make structures internal
blindspotbounty Aug 17, 2023
5c10435
Update Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift
blindspotbounty Aug 21, 2023
2be2bd9
Update Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift
blindspotbounty Aug 21, 2023
2cd0f8b
Update Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift
blindspotbounty Aug 21, 2023
abd97de
Update Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift
blindspotbounty Aug 21, 2023
15284e1
address review comments
blindspotbounty Aug 21, 2023
a4ee678
formatting
blindspotbounty Aug 21, 2023
0a0f1b8
map gauges in one place
blindspotbounty Aug 24, 2023
dcdbe21
Merge remote-tracking branch 'origin/main' into feature/sc-1976/gsoc-…
blindspotbounty Aug 29, 2023
5448eb4
move json mode as rd kafka statistics, misc renaming + docc
blindspotbounty Aug 30, 2023
900cb38
Merge branch 'main' into feature/sc-1976/gsoc-expose-librdkafka-stati…
blindspotbounty Sep 5, 2023
a83c970
address review comments
blindspotbounty Oct 10, 2023
4ebdf9d
remove import Metrics
blindspotbounty Oct 11, 2023
05cf1b9
divide producer/consumer configuration
blindspotbounty Oct 20, 2023
3febfcd
apply swiftformat
blindspotbounty Oct 20, 2023
455be80
Merge branch 'main' into feature/sc-1976/gsoc-expose-librdkafka-stati…
blindspotbounty Oct 20, 2023
a96edf7
Merge branch 'main' into feature/sc-1976/gsoc-expose-librdkafka-stati…
blindspotbounty Nov 3, 2023
af05f5b
fix code after conflicts
blindspotbounty Nov 3, 2023
8a3caf3
fix formatting
blindspotbounty Nov 3, 2023
2 changes: 2 additions & 0 deletions Package.swift
@@ -47,6 +47,7 @@ let package = Package(
// The zstd Swift package produces warnings that we cannot resolve:
// https://github.com/facebook/zstd/issues/3328
.package(url: "https://github.com/facebook/zstd.git", from: "1.5.0"),
.package(url: "https://github.com/swift-extras/swift-extras-json.git", .upToNextMajor(from: "0.6.0")),
],
targets: [
.target(
@@ -76,6 +77,7 @@ let package = Package(
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle"),
.product(name: "Logging", package: "swift-log"),
.product(name: "ExtrasJSON", package: "swift-extras-json"),
]
),
.systemLibrary(
7 changes: 7 additions & 0 deletions Sources/SwiftKafka/Configuration/KafkaConfiguration.swift
@@ -206,3 +206,10 @@ public enum KafkaConfiguration {
public static let v6 = IPAddressFamily(description: "v6")
}
}

extension Duration {
// Total duration converted to whole milliseconds
internal var totalMilliseconds: Int64 {
self.components.seconds * 1000 + self.components.attoseconds / 1_000_000_000_000_000
}
}
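
Usage sketch (not part of the diff): Duration stores seconds and attoseconds, and one millisecond is 10^15 attoseconds, which is what the division above accounts for. For a 1.5-second interval:

let interval: Duration = .milliseconds(1500)
// components.seconds == 1, components.attoseconds == 500_000_000_000_000_000
// 1 * 1000 + 500_000_000_000_000_000 / 1_000_000_000_000_000 == 1500
let totalMs = interval.totalMilliseconds // 1500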
13 changes: 13 additions & 0 deletions Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift
@@ -23,6 +23,18 @@ public struct KafkaConsumerConfiguration {
/// Default: `.milliseconds(100)`
public var pollInterval: Duration = .milliseconds(100)

/// Interval at which librdkafka reports statistics.
/// 0ms - disabled (default)
/// >= 1ms - statistics provided at the specified interval
public var statisticsInterval: Duration = .zero {
didSet {
precondition(
self.statisticsInterval.totalMilliseconds > 0 || self.statisticsInterval == .zero,
"Lowest granularity is milliseconds"
)
}
}

/// The strategy used for consuming messages.
/// See ``KafkaConfiguration/ConsumptionStrategy`` for more information.
public var consumptionStrategy: KafkaConfiguration.ConsumptionStrategy
@@ -128,6 +140,7 @@ extension KafkaConsumerConfiguration {
resultDict["group.id"] = groupID
}

resultDict["statistics.interval.ms"] = String(self.statisticsInterval.totalMilliseconds)
resultDict["session.timeout.ms"] = String(session.timeoutMilliseconds)
resultDict["heartbeat.interval.ms"] = String(heartbeatIntervalMilliseconds)
resultDict["max.poll.interval.ms"] = String(maxPollInvervalMilliseconds)
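
Usage sketch for the new consumer option (the consumption-strategy initializer, group ID, and topic names are assumptions, not part of this diff):

var config = KafkaConsumerConfiguration(
    consumptionStrategy: .group(id: "example-group", topics: ["example-topic"])
)
config.statisticsInterval = .milliseconds(500) // librdkafka reports statistics twice per second
// config.statisticsInterval = .microseconds(500) // would trap: lowest granularity is milliseconds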
13 changes: 13 additions & 0 deletions Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift
@@ -20,6 +20,18 @@ public struct KafkaProducerConfiguration {
/// Default: `.milliseconds(100)`
public var pollInterval: Duration = .milliseconds(100)

/// Interval at which librdkafka reports statistics.
/// 0ms - disabled (default)
/// >= 1ms - statistics provided at the specified interval
public var statisticsInterval: Duration = .zero {
didSet {
precondition(
self.statisticsInterval.totalMilliseconds > 0 || self.statisticsInterval == .zero,
"Lowest granularity is milliseconds"
)
}
}

/// Maximum timeout for flushing outstanding produce requests when the ``KafkaProducer`` is shutting down.
/// Default: `10000`
public var flushTimeoutMilliseconds: Int = 10000 {
@@ -107,6 +119,7 @@ extension KafkaProducerConfiguration {
internal var dictionary: [String: String] {
var resultDict: [String: String] = [:]

resultDict["statistics.interval.ms"] = String(self.statisticsInterval.totalMilliseconds)
resultDict["enable.idempotence"] = String(self.enableIdempotence)
resultDict["queue.buffering.max.messages"] = String(self.queue.bufferingMaxMessages)
resultDict["queue.buffering.max.kbytes"] = String(self.queue.bufferingMaxKBytes)
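
The producer side mirrors the consumer option; a minimal sketch, assuming a parameterless initializer:

var producerConfig = KafkaProducerConfiguration()
producerConfig.statisticsInterval = .seconds(5)
// Forwarded to librdkafka as "statistics.interval.ms" = "5000" by the dictionary above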
109 changes: 76 additions & 33 deletions Sources/SwiftKafka/KafkaConsumer.swift
@@ -22,6 +22,7 @@ import ServiceLifecycle
/// `NIOAsyncSequenceProducerDelegate` that closes the producer when
/// `didTerminate()` is invoked.
internal struct KafkaConsumerCloseOnTerminate: Sendable {
let isMessageSequence: Bool
let stateMachine: NIOLockedValueBox<KafkaConsumer.StateMachine>
}

@@ -31,7 +32,7 @@ extension KafkaConsumerCloseOnTerminate: NIOAsyncSequenceProducerDelegate {
}

func didTerminate() {
self.stateMachine.withLockedValue { $0.messageSequenceTerminated() }
self.stateMachine.withLockedValue { $0.messageSequenceTerminated(isMessageSequence: isMessageSequence) }
}
}

@@ -121,6 +122,12 @@ public final class KafkaConsumer: Sendable, Service {
NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure,
KafkaConsumerCloseOnTerminate
>
typealias ProducerEvents = NIOAsyncSequenceProducer<
KafkaConsumerEvent,
NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure,
KafkaConsumerCloseOnTerminate
>

/// The configuration object of the consumer client.
private let config: KafkaConsumerConfiguration
/// A logger.
@@ -146,7 +153,8 @@
client: RDKafkaClient,
stateMachine: NIOLockedValueBox<StateMachine>,
config: KafkaConsumerConfiguration,
logger: Logger
logger: Logger,
eventSource: ProducerEvents.Source? = nil
) throws {
self.config = config
self.stateMachine = stateMachine
@@ -155,7 +163,7 @@
let sourceAndSequence = NIOThrowingAsyncSequenceProducer.makeSequence(
elementType: KafkaConsumerMessage.self,
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
delegate: KafkaConsumerCloseOnTerminate(stateMachine: self.stateMachine)
delegate: KafkaConsumerCloseOnTerminate(isMessageSequence: true, stateMachine: self.stateMachine)
)

self.messages = KafkaConsumerMessages(
@@ -166,7 +174,8 @@
self.stateMachine.withLockedValue {
$0.initialize(
client: client,
source: sourceAndSequence.source
source: sourceAndSequence.source,
eventSource: eventSource
)
}

@@ -242,6 +251,11 @@ public final class KafkaConsumer: Sendable, Service {
if config.enableAutoCommit == false {
subscribedEvents.append(.offsetCommit)
}
// Don't listen for statistics even when configured,
// as no event sequence has been instantiated yet.
// if config.statisticsInterval != .zero {
// subscribedEvents.append(.statistics)
// }

let client = try RDKafkaClient.makeClient(
type: .consumer,
@@ -250,20 +264,22 @@
logger: logger
)

let consumer = try KafkaConsumer(
client: client,
stateMachine: stateMachine,
config: config,
logger: logger
)

let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
elementType: KafkaConsumerEvent.self,
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
delegate: KafkaConsumerCloseOnTerminate(stateMachine: stateMachine)
delegate: KafkaConsumerCloseOnTerminate(isMessageSequence: false, stateMachine: stateMachine)
)

let eventsSequence = KafkaConsumerEvents(wrappedSequence: sourceAndSequence.sequence)

let consumer = try KafkaConsumer(
client: client,
stateMachine: stateMachine,
config: config,
logger: logger,
eventSource: sourceAndSequence.source
)

return (consumer, eventsSequence)
}

@@ -321,7 +337,7 @@
while !Task.isCancelled {
let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() }
switch nextAction {
case .pollForAndYieldMessage(let client, let source):
case .pollForAndYieldMessage(let client, let source, let eventSource):
let events = client.eventPoll()
for event in events {
switch event {
Expand All @@ -332,8 +348,11 @@ public final class KafkaConsumer: Sendable, Service {
_ = source.yield(message)
case .failure(let error):
source.finish()
eventSource?.finish()
throw error
}
case .statistics(let statistics):
_ = eventSource?.yield(.statistics(statistics))
default:
break // Ignore
}
@@ -383,8 +402,9 @@
client: client,
logger: self.logger
)
case .triggerGracefulShutdownAndFinishSource(let client, let source):
case .triggerGracefulShutdownAndFinishSource(let client, let source, let eventSource):
source.finish()
eventSource?.finish()
self._triggerGracefulShutdown(
client: client,
logger: self.logger
@@ -428,17 +448,20 @@ extension KafkaConsumer {
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
/// - Parameter eventSource: ``NIOAsyncSequenceProducer/Source`` used for yielding new events.
case initializing(
client: RDKafkaClient,
source: Producer.Source
source: Producer.Source,
eventSource: ProducerEvents.Source?
)
/// The ``KafkaConsumer`` is consuming messages.
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
/// - Parameter eventSource: ``NIOAsyncSequenceProducer/Source`` used for yielding new events.
case consuming(
client: RDKafkaClient,
source: Producer.Source
source: Producer.Source,
eventSource: ProducerEvents.Source?
)
/// Consumer is still running but the messages asynchronous sequence was terminated.
/// All incoming messages will be dropped.
@@ -461,14 +484,16 @@
/// not yet available when the normal initialization occurs.
mutating func initialize(
client: RDKafkaClient,
source: Producer.Source
source: Producer.Source,
eventSource: ProducerEvents.Source?
) {
guard case .uninitialized = self.state else {
fatalError("\(#function) can only be invoked in state .uninitialized, but was invoked in state \(self.state)")
}
self.state = .initializing(
client: client,
source: source
source: source,
eventSource: eventSource
)
}

@@ -480,7 +505,8 @@
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
case pollForAndYieldMessage(
client: RDKafkaClient,
source: Producer.Source
source: Producer.Source,
eventSource: ProducerEvents.Source?
)
/// The ``KafkaConsumer`` stopped consuming messages or
/// is in the process of shutting down.
@@ -502,8 +528,8 @@
fatalError("\(#function) invoked while still in state \(self.state)")
case .initializing:
fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages")
case .consuming(let client, let source):
return .pollForAndYieldMessage(client: client, source: source)
case .consuming(let client, let source, let eventSource):
return .pollForAndYieldMessage(client: client, source: source, eventSource: eventSource)
case .consumptionStopped(let client):
return .pollWithoutYield(client: client)
case .finishing(let client):
@@ -532,10 +558,11 @@
switch self.state {
case .uninitialized:
fatalError("\(#function) invoked while still in state \(self.state)")
case .initializing(let client, let source):
case .initializing(let client, let source, let eventSource):
self.state = .consuming(
client: client,
source: source
source: source,
eventSource: eventSource
)
return .setUpConnection(client: client)
case .consuming, .consumptionStopped, .finishing, .finished:
@@ -545,16 +572,30 @@

/// The messages asynchronous sequence was terminated.
/// All incoming messages will be dropped.
mutating func messageSequenceTerminated() {
mutating func messageSequenceTerminated(isMessageSequence: Bool) {
switch self.state {
case .uninitialized:
fatalError("\(#function) invoked while still in state \(self.state)")
case .initializing:
fatalError("Call to \(#function) before setUpConnection() was invoked")
case .consumptionStopped:
fatalError("messageSequenceTerminated() must not be invoked more than once")
case .consuming(let client, _):
self.state = .consumptionStopped(client: client)
if isMessageSequence {
fatalError("messageSequenceTerminated() must not be invoked more than once")
}
case .consuming(let client, let source, let eventSource):
// Only move to stopped consumption if the messages sequence was finished
if isMessageSequence {
self.state = .consumptionStopped(client: client)
// If the message sequence is being terminated, the class deinit has been called
// (see the `messages` field); this is the last chance to finish the `eventSource`
eventSource?.finish()
}
else {
// Messages are still being consumed; only the event sequence was finished.
// Presumably no one wants to listen to events anymore,
// though this may be problematic for rebalancing
self.state = .consuming(client: client, source: source, eventSource: nil)
}
case .finishing, .finished:
break
}
@@ -576,7 +617,7 @@
fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets")
case .consumptionStopped:
fatalError("Cannot store offset when consumption has been stopped")
case .consuming(let client, _):
case .consuming(let client, _, _):
return .storeOffset(client: client)
case .finishing, .finished:
fatalError("\(#function) invoked while still in state \(self.state)")
@@ -607,7 +648,7 @@
fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets")
case .consumptionStopped:
fatalError("Cannot commit when consumption has been stopped")
case .consuming(let client, _):
case .consuming(let client, _, _):
return .commitSync(client: client)
case .finishing, .finished:
return .throwClosedError
@@ -628,7 +669,8 @@
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
case triggerGracefulShutdownAndFinishSource(
client: RDKafkaClient,
source: Producer.Source
source: Producer.Source,
eventSource: ProducerEvents.Source?
)
}

@@ -642,11 +684,12 @@
fatalError("\(#function) invoked while still in state \(self.state)")
case .initializing:
fatalError("subscribe() / assign() should have been invoked before \(#function)")
case .consuming(let client, let source):
case .consuming(let client, let source, let eventSource):
self.state = .finishing(client: client)
return .triggerGracefulShutdownAndFinishSource(
client: client,
source: source
source: source,
eventSource: eventSource
)
case .consumptionStopped(let client):
self.state = .finishing(client: client)
6 changes: 5 additions & 1 deletion Sources/SwiftKafka/KafkaConsumerEvent.swift
@@ -14,11 +14,15 @@

/// An enumeration representing events that can be received through the ``KafkaConsumerEvents`` asynchronous sequence.
public enum KafkaConsumerEvent: Sendable, Hashable {
/// Statistics from librdkafka
case statistics(KafkaStatistics)
/// - Important: Always provide a `default` case when switching over this `enum`.
case DO_NOT_SWITCH_OVER_THIS_EXHAUSITVELY

internal init(_ event: RDKafkaClient.KafkaEvent) {
internal init?(_ event: RDKafkaClient.KafkaEvent) {
switch event {
case .statistics(let stat):
self = .statistics(stat)
case .deliveryReport:
fatalError("Cannot cast \(event) to KafkaConsumerEvent")
case .consumerMessages:
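
Consumption sketch for the new statistics event (the makeConsumerWithEvents factory name and the config/logger values are assumptions based on the tuple-returning function in this diff):

let (consumer, events) = try KafkaConsumer.makeConsumerWithEvents(config: config, logger: logger)
Task {
    for await event in events {
        switch event {
        case .statistics(let statistics):
            print("librdkafka statistics: \(statistics)")
        default:
            break // always provide a default case, as the enum's doc comment warns
        }
    }
}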