Feature: expose librdkafka statistics as swift metrics #92

Merged
Changes shown from 23 of 26 commits.

Commits
c870864
introduce statistics for producer
blindspotbounty Jul 26, 2023
633773f
Merge remote-tracking branch 'origin/main' into feature/sc-1976/gsoc-…
blindspotbounty Jul 31, 2023
d9887b9
add statistics to new consumer with events
blindspotbounty Jul 31, 2023
e5f0483
Merge remote-tracking branch 'origin/main' into feature/sc-1976/gsoc-…
blindspotbounty Aug 10, 2023
d55a7fd
fix some artefacts
blindspotbounty Aug 10, 2023
8b4525b
adjust to KeyRefreshAttempts
blindspotbounty Aug 10, 2023
23e08fc
draft: statistics with metrics
blindspotbounty Aug 17, 2023
612a3c4
make structures internal
blindspotbounty Aug 17, 2023
5c10435
Update Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift
blindspotbounty Aug 21, 2023
2be2bd9
Update Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift
blindspotbounty Aug 21, 2023
2cd0f8b
Update Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift
blindspotbounty Aug 21, 2023
abd97de
Update Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift
blindspotbounty Aug 21, 2023
15284e1
address review comments
blindspotbounty Aug 21, 2023
a4ee678
formatting
blindspotbounty Aug 21, 2023
0a0f1b8
map gauges in one place
blindspotbounty Aug 24, 2023
dcdbe21
Merge remote-tracking branch 'origin/main' into feature/sc-1976/gsoc-…
blindspotbounty Aug 29, 2023
5448eb4
move json mode as rd kafka statistics, misc renaming + docc
blindspotbounty Aug 30, 2023
900cb38
Merge branch 'main' into feature/sc-1976/gsoc-expose-librdkafka-stati…
blindspotbounty Sep 5, 2023
a83c970
address review comments
blindspotbounty Oct 10, 2023
4ebdf9d
remove import Metrics
blindspotbounty Oct 11, 2023
05cf1b9
divide producer/consumer configuration
blindspotbounty Oct 20, 2023
3febfcd
apply swiftformat
blindspotbounty Oct 20, 2023
455be80
Merge branch 'main' into feature/sc-1976/gsoc-expose-librdkafka-stati…
blindspotbounty Oct 20, 2023
a96edf7
Merge branch 'main' into feature/sc-1976/gsoc-expose-librdkafka-stati…
blindspotbounty Nov 3, 2023
af05f5b
fix code after conflicts
blindspotbounty Nov 3, 2023
8a3caf3
fix formatting
blindspotbounty Nov 3, 2023
7 changes: 6 additions & 1 deletion Package.swift
@@ -49,6 +49,7 @@ let package = Package(
.package(url: "https://github.com/apple/swift-nio-ssl", from: "2.25.0"),
.package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.1.0"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-metrics", from: "2.4.1"),
// The zstd Swift package produces warnings that we cannot resolve:
// https://github.com/facebook/zstd/issues/3328
.package(url: "https://github.com/facebook/zstd.git", from: "1.5.0"),
@@ -83,6 +84,7 @@ let package = Package(
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle"),
.product(name: "Logging", package: "swift-log"),
.product(name: "Metrics", package: "swift-metrics"),
]
),
.target(
@@ -93,7 +95,10 @@
),
.testTarget(
name: "KafkaTests",
dependencies: ["Kafka"]
dependencies: [
"Kafka",
.product(name: "MetricsTestKit", package: "swift-metrics")
]
),
.testTarget(
name: "IntegrationTests",
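The new swift-metrics dependency only turns the gauges below into real measurements once the application bootstraps a metrics backend. A minimal sketch — `MyMetricsFactory` stands in for any concrete `MetricsFactory` implementation (Prometheus, StatsD, …) and is not provided by this package:

```swift
import Metrics

// Bootstrap exactly once at process startup, before any Gauge is created.
MetricsSystem.bootstrap(MyMetricsFactory())
```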
146 changes: 146 additions & 0 deletions Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift
@@ -0,0 +1,146 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-client open source project
//
// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Metrics

extension KafkaConfiguration {
// MARK: - Metrics

/// Configuration for the consumer metrics emitted by `SwiftKafka`.
public struct ConsumerMetrics: Sendable {
internal var enabled: Bool {
self.updateInterval != nil &&
(self.queuedOperation != nil ||
self.totalKafkaBrokerRequests != nil ||
self.totalKafkaBrokerBytesSent != nil ||
self.totalKafkaBrokerResponses != nil ||
self.totalKafkaBrokerResponsesSize != nil ||
self.totalKafkaBrokerMessagesBytesRecieved != nil ||
self.topicsInMetadataCache != nil)
}

/// Update interval for statistics.
public var updateInterval: Duration?

/// Number of operations (callbacks, events, etc) waiting in the queue.
public var queuedOperation: Gauge?

/// Total number of requests sent to Kafka brokers.
public var totalKafkaBrokerRequests: Gauge?
/// Total number of bytes transmitted to Kafka brokers.
public var totalKafkaBrokerBytesSent: Gauge?
/// Total number of responses received from Kafka brokers.
public var totalKafkaBrokerResponses: Gauge?
/// Total number of bytes received from Kafka brokers.
public var totalKafkaBrokerResponsesSize: Gauge?

    /// Total number of messages consumed from Kafka brokers, not including ignored messages (due to offset, etc.).
public var totalKafkaBrokerMessagesRecieved: Gauge?
/// Total number of message bytes (including framing) received from Kafka brokers.
public var totalKafkaBrokerMessagesBytesRecieved: Gauge?

/// Number of topics in the metadata cache.
public var topicsInMetadataCache: Gauge?

private static func record<T: BinaryInteger>(_ value: T?, to: Gauge?) {
guard let value,
let to else {
return
}
to.record(value)
}

internal func update(with rdKafkaStatistics: RDKafkaStatistics) {
Self.record(rdKafkaStatistics.queuedOperation, to: self.queuedOperation)

Self.record(rdKafkaStatistics.totalKafkaBrokerRequests, to: self.totalKafkaBrokerRequests)
Self.record(rdKafkaStatistics.totalKafkaBrokerBytesSent, to: self.totalKafkaBrokerBytesSent)
Self.record(rdKafkaStatistics.totalKafkaBrokerResponses, to: self.totalKafkaBrokerResponses)
Self.record(rdKafkaStatistics.totalKafkaBrokerResponsesSize, to: self.totalKafkaBrokerResponsesSize)

Self.record(rdKafkaStatistics.totalKafkaBrokerMessagesRecieved, to: self.totalKafkaBrokerMessagesRecieved)
Self.record(rdKafkaStatistics.totalKafkaBrokerMessagesBytesRecieved, to: self.totalKafkaBrokerMessagesBytesRecieved)

Self.record(rdKafkaStatistics.topicsInMetadataCache, to: self.topicsInMetadataCache)
}
}

/// Configuration for the producer metrics emitted by `SwiftKafka`.
public struct ProducerMetrics: Sendable {
internal var enabled: Bool {
self.updateInterval != nil &&
(self.queuedOperation != nil ||
self.queuedProducerMessages != nil ||
self.queuedProducerMessagesSize != nil ||
self.totalKafkaBrokerRequests != nil ||
self.totalKafkaBrokerBytesSent != nil ||
self.totalKafkaBrokerResponses != nil ||
self.totalKafkaBrokerResponsesSize != nil ||
self.totalKafkaBrokerMessagesSent != nil ||
self.totalKafkaBrokerMessagesBytesSent != nil ||
self.topicsInMetadataCache != nil)
}

/// Update interval for statistics.
public var updateInterval: Duration?

/// Number of operations (callbacks, events, etc) waiting in the queue.
public var queuedOperation: Gauge?
/// Current number of queued producer messages.
public var queuedProducerMessages: Gauge?
/// Current total size in bytes of queued producer messages.
public var queuedProducerMessagesSize: Gauge?

/// Total number of requests sent to Kafka brokers.
public var totalKafkaBrokerRequests: Gauge?
/// Total number of bytes transmitted to Kafka brokers.
public var totalKafkaBrokerBytesSent: Gauge?
/// Total number of responses received from Kafka brokers.
public var totalKafkaBrokerResponses: Gauge?
/// Total number of bytes received from Kafka brokers.
public var totalKafkaBrokerResponsesSize: Gauge?

/// Total number of messages transmitted (produced) to Kafka brokers.
public var totalKafkaBrokerMessagesSent: Gauge?
/// Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers.
public var totalKafkaBrokerMessagesBytesSent: Gauge?

/// Number of topics in the metadata cache.
public var topicsInMetadataCache: Gauge?

private static func record<T: BinaryInteger>(_ value: T?, to: Gauge?) {
guard let value,
let to else {
return
}
to.record(value)
}

internal func update(with rdKafkaStatistics: RDKafkaStatistics) {
Self.record(rdKafkaStatistics.queuedOperation, to: self.queuedOperation)
Self.record(rdKafkaStatistics.queuedProducerMessages, to: self.queuedProducerMessages)
Self.record(rdKafkaStatistics.queuedProducerMessagesSize, to: self.queuedProducerMessagesSize)

Self.record(rdKafkaStatistics.totalKafkaBrokerRequests, to: self.totalKafkaBrokerRequests)
Self.record(rdKafkaStatistics.totalKafkaBrokerBytesSent, to: self.totalKafkaBrokerBytesSent)
Self.record(rdKafkaStatistics.totalKafkaBrokerResponses, to: self.totalKafkaBrokerResponses)
Self.record(rdKafkaStatistics.totalKafkaBrokerResponsesSize, to: self.totalKafkaBrokerResponsesSize)

Self.record(rdKafkaStatistics.totalKafkaBrokerMessagesSent, to: self.totalKafkaBrokerMessagesSent)
Self.record(rdKafkaStatistics.totalKafkaBrokerMessagesBytesSent, to: self.totalKafkaBrokerMessagesBytesSent)

Self.record(rdKafkaStatistics.topicsInMetadataCache, to: self.topicsInMetadataCache)
}
}
}
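Opting in from user code amounts to assigning swift-metrics `Gauge`s and an update interval; leaving everything `nil` keeps `enabled == false`, so no statistics event is ever subscribed. A minimal consumer-side sketch (broker address and gauge labels are illustrative, not taken from this diff):

```swift
import Metrics

var configuration = KafkaConsumerConfiguration(
    consumptionStrategy: .group(id: "example-group", topics: ["example-topic"]),
    bootstrapBrokerAddresses: [KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)]
)
// `enabled` requires both an update interval and at least one gauge.
configuration.metrics.updateInterval = .seconds(5)
configuration.metrics.totalKafkaBrokerRequests = Gauge(label: "kafka.consumer.broker.requests.total")
configuration.metrics.topicsInMetadataCache = Gauge(label: "kafka.consumer.metadata.cache.topics")
```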
8 changes: 8 additions & 0 deletions Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift
@@ -207,6 +207,9 @@ public struct KafkaConsumerConfiguration {
/// Reconnect options.
public var reconnect: KafkaConfiguration.ReconnectOptions = .init()

/// Options for librdkafka metrics updates.
public var metrics: KafkaConfiguration.ConsumerMetrics = .init()

/// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
/// Default: `.plaintext`
public var securityProtocol: KafkaConfiguration.SecurityProtocol = .plaintext
@@ -271,6 +274,11 @@ extension KafkaConsumerConfiguration {
resultDict["reconnect.backoff.ms"] = String(reconnect.backoff.rawValue)
resultDict["reconnect.backoff.max.ms"] = String(reconnect.maximumBackoff.inMilliseconds)

if self.metrics.enabled,
let updateInterval = self.metrics.updateInterval {
resultDict["statistics.interval.ms"] = String(updateInterval.inMilliseconds)
}

// Merge with SecurityProtocol configuration dictionary
resultDict.merge(securityProtocol.dictionary) { _, _ in
fatalError("securityProtocol and \(#file) should not have duplicate keys")
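`Duration.inMilliseconds`, used here and again in the producer configuration below, is an internal helper that does not appear in this diff. A plausible sketch, assuming it simply collapses the duration into whole milliseconds for librdkafka's `*.ms` keys:

```swift
extension Duration {
    /// Sketch of the assumed helper: librdkafka configuration values
    /// such as `statistics.interval.ms` are expressed in whole milliseconds.
    internal var inMilliseconds: Int64 {
        self.components.seconds * 1000 +
            self.components.attoseconds / 1_000_000_000_000_000
    }
}
```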
8 changes: 8 additions & 0 deletions Sources/Kafka/Configuration/KafkaProducerConfiguration.swift
@@ -161,6 +161,9 @@ public struct KafkaProducerConfiguration {
/// Reconnect options.
public var reconnect: KafkaConfiguration.ReconnectOptions = .init()

/// Options for librdkafka metrics updates.
public var metrics: KafkaConfiguration.ProducerMetrics = .init()

/// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
/// Default: `.plaintext`
public var securityProtocol: KafkaConfiguration.SecurityProtocol = .plaintext
@@ -212,6 +215,11 @@ extension KafkaProducerConfiguration {
resultDict["reconnect.backoff.ms"] = String(self.reconnect.backoff.rawValue)
resultDict["reconnect.backoff.max.ms"] = String(self.reconnect.maximumBackoff.inMilliseconds)

if self.metrics.enabled,
let updateInterval = self.metrics.updateInterval {
resultDict["statistics.interval.ms"] = String(updateInterval.inMilliseconds)
}

// Merge with SecurityProtocol configuration dictionary
resultDict.merge(self.securityProtocol.dictionary) { _, _ in
fatalError("securityProtocol and \(#file) should not have duplicate keys")
9 changes: 9 additions & 0 deletions Sources/Kafka/KafkaConsumer.swift
@@ -130,6 +130,7 @@ public final class KafkaConsumer: Sendable, Service {
NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure,
KafkaConsumerCloseOnTerminate
>

/// The configuration object of the consumer client.
private let configuration: KafkaConsumerConfiguration
/// A logger.
@@ -203,6 +204,9 @@
if configuration.isAutoCommitEnabled == false {
subscribedEvents.append(.offsetCommit)
}
if configuration.metrics.enabled {
subscribedEvents.append(.statistics)
}

let client = try RDKafkaClient.makeClient(
type: .consumer,
@@ -243,6 +247,9 @@
if configuration.isAutoCommitEnabled == false {
subscribedEvents.append(.offsetCommit)
}
if configuration.metrics.enabled {
subscribedEvents.append(.statistics)
}

let client = try RDKafkaClient.makeClient(
type: .consumer,
@@ -341,6 +348,8 @@
case .consumerMessages(let result):
// We do not support back pressure, we can ignore the yield result
_ = source.yield(result)
case .statistics(let statistics):
self.configuration.metrics.update(with: statistics)
default:
break // Ignore
}
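Because statistics are recorded inside the consumer's own event loop above, the call site needs no extra plumbing beyond running the consumer as a service. A sketch of the expected wiring, assuming the package's public `KafkaConsumer(configuration:logger:)` initializer and swift-service-lifecycle:

```swift
import ServiceLifecycle

let consumer = try KafkaConsumer(configuration: configuration, logger: logger)
let serviceGroup = ServiceGroup(
    services: [consumer],
    gracefulShutdownSignals: [.sigterm],
    logger: logger
)

try await withThrowingTaskGroup(of: Void.self) { group in
    // Runs the poll loop; the configured gauges update every `updateInterval`.
    group.addTask { try await serviceGroup.run() }
    group.addTask {
        for try await message in consumer.messages {
            _ = message // handle the message
        }
    }
    try await group.next()
    group.cancelAll()
}
```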
2 changes: 2 additions & 0 deletions Sources/Kafka/KafkaConsumerEvent.swift
@@ -19,6 +19,8 @@ public enum KafkaConsumerEvent: Sendable, Hashable {

internal init(_ event: RDKafkaClient.KafkaEvent) {
switch event {
case .statistics:
fatalError("Cannot cast \(event) to KafkaConsumerEvent")
case .deliveryReport:
fatalError("Cannot cast \(event) to KafkaConsumerEvent")
case .consumerMessages:
27 changes: 22 additions & 5 deletions Sources/Kafka/KafkaProducer.swift
@@ -116,10 +116,16 @@ public final class KafkaProducer: Service, Sendable {
) throws {
let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))

var subscribedEvents: [RDKafkaEvent] = [.log] // No .deliveryReport here!

if configuration.metrics.enabled {
subscribedEvents.append(.statistics)
}

let client = try RDKafkaClient.makeClient(
type: .producer,
configDictionary: configuration.dictionary,
events: [.log], // No .deliveryReport here!
events: subscribedEvents,
logger: logger
)

@@ -156,10 +162,16 @@
) throws -> (KafkaProducer, KafkaProducerEvents) {
let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))

var subscribedEvents: [RDKafkaEvent] = [.log, .deliveryReport]
// Listen to statistics events when statistics are enabled
if configuration.metrics.enabled {
subscribedEvents.append(.statistics)
}

let client = try RDKafkaClient.makeClient(
type: .producer,
configDictionary: configuration.dictionary,
events: [.log, .deliveryReport],
events: subscribedEvents,
logger: logger
)

@@ -212,9 +224,14 @@
case .pollAndYield(let client, let source):
let events = client.eventPoll()
for event in events {
let producerEvent = KafkaProducerEvent(event)
// Ignore YieldResult as we don't support back pressure in KafkaProducer
_ = source?.yield(producerEvent)
switch event {
case .statistics(let statistics):
self.configuration.metrics.update(with: statistics)
case .deliveryReport(let reports):
_ = source?.yield(.deliveryReports(reports))
case .consumerMessages:
fatalError("Unexpected event for producer \(event)")
}
}
try await Task.sleep(for: self.configuration.pollInterval)
case .flushFinishSourceAndTerminatePollLoop(let client, let source):
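The producer side mirrors the consumer: with metrics configured, `makeProducerWithEvents` subscribes to statistics and the poll loop above routes them into the gauges. A usage sketch (broker address and gauge label are illustrative):

```swift
var configuration = KafkaProducerConfiguration(
    bootstrapBrokerAddresses: [KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)]
)
configuration.metrics.updateInterval = .seconds(10)
configuration.metrics.queuedProducerMessages = Gauge(label: "kafka.producer.queued.messages")

// `.statistics` is subscribed only because `metrics.enabled` is true here.
let (producer, events) = try KafkaProducer.makeProducerWithEvents(
    configuration: configuration,
    logger: logger
)
```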
2 changes: 2 additions & 0 deletions Sources/Kafka/KafkaProducerEvent.swift
@@ -23,6 +23,8 @@ public enum KafkaProducerEvent: Sendable, Hashable {
switch event {
case .deliveryReport(results: let results):
self = .deliveryReports(results)
case .statistics:
fatalError("Cannot cast \(event) to KafkaProducerEvent")
case .consumerMessages:
fatalError("Cannot cast \(event) to KafkaProducerEvent")
}
22 changes: 22 additions & 0 deletions Sources/Kafka/RDKafka/RDKafkaClient.swift
@@ -14,6 +14,7 @@

import Crdkafka
import Dispatch
import class Foundation.JSONDecoder
import Logging

/// Base class for ``KafkaProducer`` and ``KafkaConsumer``,
@@ -311,6 +312,7 @@ final class RDKafkaClient: Sendable {
enum KafkaEvent {
case deliveryReport(results: [KafkaDeliveryReport])
case consumerMessages(result: Result<KafkaConsumerMessage, Error>)
case statistics(RDKafkaStatistics)
}

/// Poll the event `rd_kafka_queue_t` for new events.
Expand Down Expand Up @@ -341,6 +343,10 @@ final class RDKafkaClient: Sendable {
self.handleLogEvent(event)
case .offsetCommit:
self.handleOffsetCommitEvent(event)
case .statistics:
if let forwardEvent = self.handleStatistics(event) {
events.append(forwardEvent)
}
case .none:
// Finished reading events, return early
return events
@@ -392,6 +398,22 @@
// The returned message(s) MUST NOT be freed with rd_kafka_message_destroy().
}

/// Handle event of type `RDKafkaEvent.statistics`.
///
/// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
private func handleStatistics(_ event: OpaquePointer?) -> KafkaEvent? {
let jsonStr = String(cString: rd_kafka_event_stats(event))
do {
if let jsonData = jsonStr.data(using: .utf8) {
let json = try JSONDecoder().decode(RDKafkaStatistics.self, from: jsonData)
return .statistics(json)
}
} catch {
assertionFailure("Error occurred when decoding JSON statistics: \(error) when decoding \(jsonStr)")
}
return nil
}

/// Handle event of type `RDKafkaEvent.log`.
///
/// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
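`RDKafkaStatistics` itself is not part of this diff. A plausible sketch of its shape, assuming a `Codable` view over the top-level fields of librdkafka's statistics JSON (key names follow librdkafka's STATISTICS.md; the exact mapping is an assumption, and the `Recieved` spelling matches the identifiers used elsewhere in this PR):

```swift
struct RDKafkaStatistics: Hashable, Codable {
    // All fields are optional so decoding tolerates payloads
    // that omit some statistics.
    let queuedOperation: Int?
    let queuedProducerMessages: Int?
    let queuedProducerMessagesSize: Int?
    let topicsInMetadataCache: Int?
    let totalKafkaBrokerRequests: Int?
    let totalKafkaBrokerBytesSent: Int?
    let totalKafkaBrokerResponses: Int?
    let totalKafkaBrokerResponsesSize: Int?
    let totalKafkaBrokerMessagesSent: Int?
    let totalKafkaBrokerMessagesBytesSent: Int?
    let totalKafkaBrokerMessagesRecieved: Int?
    let totalKafkaBrokerMessagesBytesRecieved: Int?

    enum CodingKeys: String, CodingKey {
        case queuedOperation = "replyq"
        case queuedProducerMessages = "msg_cnt"
        case queuedProducerMessagesSize = "msg_size"
        case topicsInMetadataCache = "metadata_cache_cnt"
        case totalKafkaBrokerRequests = "tx"
        case totalKafkaBrokerBytesSent = "tx_bytes"
        case totalKafkaBrokerResponses = "rx"
        case totalKafkaBrokerResponsesSize = "rx_bytes"
        case totalKafkaBrokerMessagesSent = "txmsgs"
        case totalKafkaBrokerMessagesBytesSent = "txmsg_bytes"
        case totalKafkaBrokerMessagesRecieved = "rxmsgs"
        case totalKafkaBrokerMessagesBytesRecieved = "rxmsg_bytes"
    }
}
```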