diff --git a/Package.swift b/Package.swift
index 8d463327..c13229ec 100644
--- a/Package.swift
+++ b/Package.swift
@@ -49,6 +49,7 @@ let package = Package(
         .package(url: "https://github.com/apple/swift-nio-ssl", from: "2.25.0"),
         .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.1.0"),
         .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
+        .package(url: "https://github.com/apple/swift-metrics", from: "2.4.1"),
         // The zstd Swift package produces warnings that we cannot resolve:
         // https://github.com/facebook/zstd/issues/3328
         .package(url: "https://github.com/facebook/zstd.git", from: "1.5.0"),
@@ -83,6 +84,7 @@ let package = Package(
                 .product(name: "NIOCore", package: "swift-nio"),
                 .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"),
                 .product(name: "Logging", package: "swift-log"),
+                .product(name: "Metrics", package: "swift-metrics"),
             ]
         ),
         .target(
@@ -93,7 +95,10 @@ let package = Package(
         ),
         .testTarget(
             name: "KafkaTests",
-            dependencies: ["Kafka"]
+            dependencies: [
+                "Kafka",
+                .product(name: "MetricsTestKit", package: "swift-metrics"),
+            ]
         ),
         .testTarget(
             name: "IntegrationTests",
diff --git a/Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift b/Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift
new file mode 100644
index 00000000..e9878c99
--- /dev/null
+++ b/Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift
@@ -0,0 +1,147 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the swift-kafka-client open source project
+//
+// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+import Metrics
+
+extension KafkaConfiguration {
+    // MARK: - Metrics
+
+    /// Configuration for the consumer metrics emitted by `SwiftKafka`.
+    public struct ConsumerMetrics: Sendable {
+        internal var enabled: Bool {
+            self.updateInterval != nil &&
+                (self.queuedOperation != nil ||
+                    self.totalKafkaBrokerRequests != nil ||
+                    self.totalKafkaBrokerBytesSent != nil ||
+                    self.totalKafkaBrokerResponses != nil ||
+                    self.totalKafkaBrokerResponsesSize != nil ||
+                    self.totalKafkaBrokerMessagesReceived != nil ||
+                    self.totalKafkaBrokerMessagesBytesReceived != nil ||
+                    self.topicsInMetadataCache != nil)
+        }
+
+        /// Update interval for statistics.
+        public var updateInterval: Duration?
+
+        /// Number of operations (callbacks, events, etc.) waiting in the queue.
+        public var queuedOperation: Gauge?
+
+        /// Total number of requests sent to Kafka brokers.
+        public var totalKafkaBrokerRequests: Gauge?
+        /// Total number of bytes transmitted to Kafka brokers.
+        public var totalKafkaBrokerBytesSent: Gauge?
+        /// Total number of responses received from Kafka brokers.
+        public var totalKafkaBrokerResponses: Gauge?
+        /// Total number of bytes received from Kafka brokers.
+        public var totalKafkaBrokerResponsesSize: Gauge?
+
+        /// Total number of messages consumed, not including ignored messages (due to offset, etc.), from Kafka brokers.
+        public var totalKafkaBrokerMessagesReceived: Gauge?
+        /// Total number of message bytes (including framing) received from Kafka brokers.
+        public var totalKafkaBrokerMessagesBytesReceived: Gauge?
+
+        /// Number of topics in the metadata cache.
+        public var topicsInMetadataCache: Gauge?
+
+        private static func record<T: BinaryInteger>(_ value: T?, to: Gauge?) {
+            guard let value,
+                  let to else {
+                return
+            }
+            to.record(value)
+        }
+
+        internal func update(with rdKafkaStatistics: RDKafkaStatistics) {
+            Self.record(rdKafkaStatistics.queuedOperation, to: self.queuedOperation)
+
+            Self.record(rdKafkaStatistics.totalKafkaBrokerRequests, to: self.totalKafkaBrokerRequests)
+            Self.record(rdKafkaStatistics.totalKafkaBrokerBytesSent, to: self.totalKafkaBrokerBytesSent)
+            Self.record(rdKafkaStatistics.totalKafkaBrokerResponses, to: self.totalKafkaBrokerResponses)
+            Self.record(rdKafkaStatistics.totalKafkaBrokerResponsesSize, to: self.totalKafkaBrokerResponsesSize)
+
+            Self.record(rdKafkaStatistics.totalKafkaBrokerMessagesReceived, to: self.totalKafkaBrokerMessagesReceived)
+            Self.record(rdKafkaStatistics.totalKafkaBrokerMessagesBytesReceived, to: self.totalKafkaBrokerMessagesBytesReceived)
+
+            Self.record(rdKafkaStatistics.topicsInMetadataCache, to: self.topicsInMetadataCache)
+        }
+    }
+
+    /// Configuration for the producer metrics emitted by `SwiftKafka`.
+    public struct ProducerMetrics: Sendable {
+        internal var enabled: Bool {
+            self.updateInterval != nil &&
+                (self.queuedOperation != nil ||
+                    self.queuedProducerMessages != nil ||
+                    self.queuedProducerMessagesSize != nil ||
+                    self.totalKafkaBrokerRequests != nil ||
+                    self.totalKafkaBrokerBytesSent != nil ||
+                    self.totalKafkaBrokerResponses != nil ||
+                    self.totalKafkaBrokerResponsesSize != nil ||
+                    self.totalKafkaBrokerMessagesSent != nil ||
+                    self.totalKafkaBrokerMessagesBytesSent != nil ||
+                    self.topicsInMetadataCache != nil)
+        }
+
+        /// Update interval for statistics.
+        public var updateInterval: Duration?
+
+        /// Number of operations (callbacks, events, etc.) waiting in the queue.
+        public var queuedOperation: Gauge?
+        /// Current number of queued producer messages.
+        public var queuedProducerMessages: Gauge?
+        /// Current total size in bytes of queued producer messages.
+        public var queuedProducerMessagesSize: Gauge?
+
+        /// Total number of requests sent to Kafka brokers.
+        public var totalKafkaBrokerRequests: Gauge?
+        /// Total number of bytes transmitted to Kafka brokers.
+        public var totalKafkaBrokerBytesSent: Gauge?
+        /// Total number of responses received from Kafka brokers.
+        public var totalKafkaBrokerResponses: Gauge?
+        /// Total number of bytes received from Kafka brokers.
+        public var totalKafkaBrokerResponsesSize: Gauge?
+
+        /// Total number of messages transmitted (produced) to Kafka brokers.
+        public var totalKafkaBrokerMessagesSent: Gauge?
+        /// Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers.
+        public var totalKafkaBrokerMessagesBytesSent: Gauge?
+
+        /// Number of topics in the metadata cache.
+        public var topicsInMetadataCache: Gauge?
+
+        private static func record<T: BinaryInteger>(_ value: T?, to: Gauge?) {
+            guard let value,
+                  let to else {
+                return
+            }
+            to.record(value)
+        }
+
+        internal func update(with rdKafkaStatistics: RDKafkaStatistics) {
+            Self.record(rdKafkaStatistics.queuedOperation, to: self.queuedOperation)
+            Self.record(rdKafkaStatistics.queuedProducerMessages, to: self.queuedProducerMessages)
+            Self.record(rdKafkaStatistics.queuedProducerMessagesSize, to: self.queuedProducerMessagesSize)
+
+            Self.record(rdKafkaStatistics.totalKafkaBrokerRequests, to: self.totalKafkaBrokerRequests)
+            Self.record(rdKafkaStatistics.totalKafkaBrokerBytesSent, to: self.totalKafkaBrokerBytesSent)
+            Self.record(rdKafkaStatistics.totalKafkaBrokerResponses, to: self.totalKafkaBrokerResponses)
+            Self.record(rdKafkaStatistics.totalKafkaBrokerResponsesSize, to: self.totalKafkaBrokerResponsesSize)
+
+            Self.record(rdKafkaStatistics.totalKafkaBrokerMessagesSent, to: self.totalKafkaBrokerMessagesSent)
+            Self.record(rdKafkaStatistics.totalKafkaBrokerMessagesBytesSent, to: self.totalKafkaBrokerMessagesBytesSent)
+
+            Self.record(rdKafkaStatistics.topicsInMetadataCache, to: self.topicsInMetadataCache)
+        }
+    }
+}
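Usage sketch (not part of the diff; group ID, topic, and gauge label are placeholders): metrics are opt-in. `enabled` requires both an update interval and at least one gauge, so an interval alone never subscribes the client to librdkafka's statistics events. Recorded values flow to whatever backend the process bootstraps via `MetricsSystem.bootstrap(_:)` — the tests further down use `TestMetrics`.

```swift
import Kafka
import Metrics

var config = KafkaConsumerConfiguration(
    consumptionStrategy: .group(id: "example-group", topics: ["example-topic"]), // placeholders
    bootstrapBrokerAddresses: []
)
config.metrics.updateInterval = .milliseconds(500) // becomes "statistics.interval.ms" = "500"
config.metrics.totalKafkaBrokerMessagesReceived = Gauge(label: "kafka.consumer.messages.received")
// `config.metrics.enabled` is now true: an interval plus at least one gauge.
```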
diff --git a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift
index 60215250..dae5691f 100644
--- a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift
+++ b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift
@@ -238,6 +238,9 @@ public struct KafkaConsumerConfiguration {
     /// Reconnect options.
     public var reconnect: KafkaConfiguration.ReconnectOptions = .init()
 
+    /// Options for librdkafka metrics updates.
+    public var metrics: KafkaConfiguration.ConsumerMetrics = .init()
+
     /// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
     /// Default: `.plaintext`
     public var securityProtocol: KafkaConfiguration.SecurityProtocol = .plaintext
@@ -302,6 +305,11 @@ extension KafkaConsumerConfiguration {
         resultDict["reconnect.backoff.ms"] = String(reconnect.backoff.rawValue)
         resultDict["reconnect.backoff.max.ms"] = String(reconnect.maximumBackoff.inMilliseconds)
 
+        if self.metrics.enabled,
+           let updateInterval = self.metrics.updateInterval {
+            resultDict["statistics.interval.ms"] = String(updateInterval.inMilliseconds)
+        }
+
         // Merge with SecurityProtocol configuration dictionary
         resultDict.merge(securityProtocol.dictionary) { _, _ in
             fatalError("securityProtocol and \(#file) should not have duplicate keys")
diff --git a/Sources/Kafka/Configuration/KafkaProducerConfiguration.swift b/Sources/Kafka/Configuration/KafkaProducerConfiguration.swift
index a774905b..2a345b02 100644
--- a/Sources/Kafka/Configuration/KafkaProducerConfiguration.swift
+++ b/Sources/Kafka/Configuration/KafkaProducerConfiguration.swift
@@ -161,6 +161,9 @@ public struct KafkaProducerConfiguration {
     /// Reconnect options.
     public var reconnect: KafkaConfiguration.ReconnectOptions = .init()
 
+    /// Options for librdkafka metrics updates.
+    public var metrics: KafkaConfiguration.ProducerMetrics = .init()
+
     /// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
     /// Default: `.plaintext`
     public var securityProtocol: KafkaConfiguration.SecurityProtocol = .plaintext
@@ -212,6 +215,11 @@ extension KafkaProducerConfiguration {
         resultDict["reconnect.backoff.ms"] = String(self.reconnect.backoff.rawValue)
         resultDict["reconnect.backoff.max.ms"] = String(self.reconnect.maximumBackoff.inMilliseconds)
 
+        if self.metrics.enabled,
+           let updateInterval = self.metrics.updateInterval {
+            resultDict["statistics.interval.ms"] = String(updateInterval.inMilliseconds)
+        }
+
         // Merge with SecurityProtocol configuration dictionary
         resultDict.merge(self.securityProtocol.dictionary) { _, _ in
             fatalError("securityProtocol and \(#file) should not have duplicate keys")
diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift
index 1163f862..78d7b858 100644
--- a/Sources/Kafka/KafkaConsumer.swift
+++ b/Sources/Kafka/KafkaConsumer.swift
@@ -146,6 +146,7 @@ public final class KafkaConsumer: Sendable, Service {
         NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
         KafkaConsumerMessagesDelegate
     >
+
     /// The configuration object of the consumer client.
     private let configuration: KafkaConsumerConfiguration
     /// A logger.
@@ -222,6 +223,9 @@ public final class KafkaConsumer: Sendable, Service {
         if configuration.isAutoCommitEnabled == false {
             subscribedEvents.append(.offsetCommit)
         }
+        if configuration.metrics.enabled {
+            subscribedEvents.append(.statistics)
+        }
 
         let client = try RDKafkaClient.makeClient(
             type: .consumer,
@@ -262,6 +266,9 @@ public final class KafkaConsumer: Sendable, Service {
         if configuration.isAutoCommitEnabled == false {
             subscribedEvents.append(.offsetCommit)
         }
+        if configuration.metrics.enabled {
+            subscribedEvents.append(.statistics)
+        }
 
         let client = try RDKafkaClient.makeClient(
             type: .consumer,
@@ -374,7 +381,15 @@ public final class KafkaConsumer: Sendable, Service {
             switch nextAction {
             case .pollForEvents(let client):
                 // Event poll to serve any events queued inside of `librdkafka`.
-                _ = client.eventPoll()
+                let events = client.eventPoll()
+                for event in events {
+                    switch event {
+                    case .statistics(let statistics):
+                        self.configuration.metrics.update(with: statistics)
+                    default:
+                        break
+                    }
+                }
                 try await Task.sleep(for: self.configuration.pollInterval)
             case .terminatePollLoop:
                 return
diff --git a/Sources/Kafka/KafkaConsumerEvent.swift b/Sources/Kafka/KafkaConsumerEvent.swift
index f654797b..089ed5f3 100644
--- a/Sources/Kafka/KafkaConsumerEvent.swift
+++ b/Sources/Kafka/KafkaConsumerEvent.swift
@@ -19,6 +19,8 @@ public enum KafkaConsumerEvent: Sendable, Hashable {
 
     internal init(_ event: RDKafkaClient.KafkaEvent) {
         switch event {
+        case .statistics:
+            fatalError("Cannot cast \(event) to KafkaConsumerEvent")
         case .deliveryReport:
             fatalError("Cannot cast \(event) to KafkaConsumerEvent")
         }
diff --git a/Sources/Kafka/KafkaProducer.swift b/Sources/Kafka/KafkaProducer.swift
index 2c6dc4fc..81742f3b 100644
--- a/Sources/Kafka/KafkaProducer.swift
+++ b/Sources/Kafka/KafkaProducer.swift
@@ -116,10 +116,16 @@ public final class KafkaProducer: Service, Sendable {
     ) throws {
         let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))
 
+        var subscribedEvents: [RDKafkaEvent] = [.log] // No .deliveryReport here!
+
+        if configuration.metrics.enabled {
+            subscribedEvents.append(.statistics)
+        }
+
         let client = try RDKafkaClient.makeClient(
             type: .producer,
             configDictionary: configuration.dictionary,
-            events: [.log], // No .deliveryReport here!
+            events: subscribedEvents,
             logger: logger
         )
 
@@ -156,10 +162,16 @@ public final class KafkaProducer: Service, Sendable {
     ) throws -> (KafkaProducer, KafkaProducerEvents) {
         let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))
 
+        var subscribedEvents: [RDKafkaEvent] = [.log, .deliveryReport]
+        // Listen to statistics events when statistics are enabled.
+        if configuration.metrics.enabled {
+            subscribedEvents.append(.statistics)
+        }
+
         let client = try RDKafkaClient.makeClient(
             type: .producer,
             configDictionary: configuration.dictionary,
-            events: [.log, .deliveryReport],
+            events: subscribedEvents,
             logger: logger
         )
 
@@ -212,9 +224,12 @@ public final class KafkaProducer: Service, Sendable {
             case .pollAndYield(let client, let source):
                 let events = client.eventPoll()
                 for event in events {
-                    let producerEvent = KafkaProducerEvent(event)
-                    // Ignore YieldResult as we don't support back pressure in KafkaProducer
-                    _ = source?.yield(producerEvent)
+                    switch event {
+                    case .statistics(let statistics):
+                        self.configuration.metrics.update(with: statistics)
+                    case .deliveryReport(let reports):
+                        _ = source?.yield(.deliveryReports(reports))
+                    }
                 }
                 try await Task.sleep(for: self.configuration.pollInterval)
             case .flushFinishSourceAndTerminatePollLoop(let client, let source):
diff --git a/Sources/Kafka/KafkaProducerEvent.swift b/Sources/Kafka/KafkaProducerEvent.swift
index 9684f146..a4a27262 100644
--- a/Sources/Kafka/KafkaProducerEvent.swift
+++ b/Sources/Kafka/KafkaProducerEvent.swift
@@ -23,6 +23,8 @@ public enum KafkaProducerEvent: Sendable, Hashable {
         switch event {
         case .deliveryReport(results: let results):
             self = .deliveryReports(results)
+        case .statistics:
+            fatalError("Cannot cast \(event) to KafkaProducerEvent")
         }
     }
 }
diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift
index 623f2c34..014a3e8c 100644
--- a/Sources/Kafka/RDKafka/RDKafkaClient.swift
+++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift
@@ -14,6 +14,7 @@
 
 import Crdkafka
 import Dispatch
+import class Foundation.JSONDecoder
 import Logging
 
 /// Base class for ``KafkaProducer`` and ``KafkaConsumer``,
@@ -295,6 +296,7 @@ final class RDKafkaClient: Sendable {
     /// Swift wrapper for events from `librdkafka`'s event queue.
     enum KafkaEvent {
         case deliveryReport(results: [KafkaDeliveryReport])
+        case statistics(RDKafkaStatistics)
     }
 
     /// Poll the event `rd_kafka_queue_t` for new events.
@@ -321,6 +323,10 @@ final class RDKafkaClient: Sendable {
                 self.handleLogEvent(event)
             case .offsetCommit:
                 self.handleOffsetCommitEvent(event)
+            case .statistics:
+                if let forwardEvent = self.handleStatistics(event) {
+                    events.append(forwardEvent)
+                }
             case .none:
                 // Finished reading events, return early
                 return events
@@ -352,6 +358,22 @@ final class RDKafkaClient: Sendable {
         return .deliveryReport(results: deliveryReportResults)
     }
 
+    /// Handle event of type `RDKafkaEvent.statistics`.
+    ///
+    /// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
+    private func handleStatistics(_ event: OpaquePointer?) -> KafkaEvent? {
+        let jsonStr = String(cString: rd_kafka_event_stats(event))
+        do {
+            if let jsonData = jsonStr.data(using: .utf8) {
+                let json = try JSONDecoder().decode(RDKafkaStatistics.self, from: jsonData)
+                return .statistics(json)
+            }
+        } catch {
+            assertionFailure("Error occurred when decoding JSON statistics: \(error) when decoding \(jsonStr)")
+        }
+        return nil
+    }
+
     /// Handle event of type `RDKafkaEvent.log`.
     ///
     /// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
diff --git a/Sources/Kafka/RDKafka/RDKafkaStatistics.swift b/Sources/Kafka/RDKafka/RDKafkaStatistics.swift
new file mode 100644
index 00000000..96ceb4b2
--- /dev/null
+++ b/Sources/Kafka/RDKafka/RDKafkaStatistics.swift
@@ -0,0 +1,45 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the swift-kafka-client open source project
+//
+// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+// MARK: - RDKafkaStatistics
+
+struct RDKafkaStatistics: Hashable, Codable {
+    let queuedOperation: Int?
+    let queuedProducerMessages: Int?
+    let queuedProducerMessagesSize: Int?
+    let topicsInMetadataCache: Int?
+    let totalKafkaBrokerRequests: Int?
+    let totalKafkaBrokerBytesSent: Int?
+    let totalKafkaBrokerResponses: Int?
+    let totalKafkaBrokerResponsesSize: Int?
+    let totalKafkaBrokerMessagesSent: Int?
+    let totalKafkaBrokerMessagesBytesSent: Int?
+    let totalKafkaBrokerMessagesReceived: Int?
+    let totalKafkaBrokerMessagesBytesReceived: Int?
+
+    enum CodingKeys: String, CodingKey {
+        case queuedOperation = "replyq"
+        case queuedProducerMessages = "msg_cnt"
+        case queuedProducerMessagesSize = "msg_size"
+        case topicsInMetadataCache = "metadata_cache_cnt"
+        case totalKafkaBrokerRequests = "tx"
+        case totalKafkaBrokerBytesSent = "tx_bytes"
+        case totalKafkaBrokerResponses = "rx"
+        case totalKafkaBrokerResponsesSize = "rx_bytes"
+        case totalKafkaBrokerMessagesSent = "txmsgs"
+        case totalKafkaBrokerMessagesBytesSent = "txmsg_bytes"
+        case totalKafkaBrokerMessagesReceived = "rxmsgs"
+        case totalKafkaBrokerMessagesBytesReceived = "rxmsg_bytes"
+    }
+}
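The coding keys map top-level fields of librdkafka's statistics JSON. A decoding sketch with made-up values (illustrative only — the type is internal, so outside the module this needs `@testable import Kafka`). Real payloads carry many more fields, which `JSONDecoder` ignores, and missing keys decode to `nil` because every property is optional:

```swift
import Foundation

// Hand-written sample; all numbers are made up.
let sample = """
    {"replyq": 0, "msg_cnt": 7, "msg_size": 896, "metadata_cache_cnt": 1,
     "tx": 12, "tx_bytes": 3072, "rx": 12, "rx_bytes": 8192,
     "txmsgs": 7, "txmsg_bytes": 896, "rxmsgs": 0, "rxmsg_bytes": 0}
    """

let statistics = try JSONDecoder().decode(RDKafkaStatistics.self, from: Data(sample.utf8))
print(statistics.totalKafkaBrokerRequests ?? 0) // 12
```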
diff --git a/Tests/KafkaTests/KafkaConsumerTests.swift b/Tests/KafkaTests/KafkaConsumerTests.swift
index 212853c6..fc3da31a 100644
--- a/Tests/KafkaTests/KafkaConsumerTests.swift
+++ b/Tests/KafkaTests/KafkaConsumerTests.swift
@@ -12,9 +12,12 @@
 //
 //===----------------------------------------------------------------------===//
 
+@testable import CoreMetrics // for MetricsSystem.bootstrapInternal
 import struct Foundation.UUID
 @testable import Kafka
 import Logging
+import Metrics
+import MetricsTestKit
 import ServiceLifecycle
 import XCTest
 
@@ -33,6 +36,17 @@ import XCTest
 // zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
 
 final class KafkaConsumerTests: XCTestCase {
+    var metrics: TestMetrics! = TestMetrics()
+
+    override func setUp() async throws {
+        MetricsSystem.bootstrapInternal(self.metrics)
+    }
+
+    override func tearDown() async throws {
+        self.metrics = nil
+        MetricsSystem.bootstrapInternal(NOOPMetricsHandler.instance)
+    }
+
     func testConsumerLog() async throws {
         let recorder = LogEventRecorder()
         let mockLogger = Logger(label: "kafka.test.consumer.log") {
@@ -82,4 +96,35 @@ final class KafkaConsumerTests: XCTestCase {
             )
         }
     }
+
+    func testConsumerStatistics() async throws {
+        let uniqueGroupID = UUID().uuidString
+        var config = KafkaConsumerConfiguration(
+            consumptionStrategy: .group(id: uniqueGroupID, topics: ["this-topic-does-not-exist"]),
+            bootstrapBrokerAddresses: []
+        )
+
+        config.metrics.updateInterval = .milliseconds(100)
+        config.metrics.queuedOperation = .init(label: "operations")
+
+        let consumer = try KafkaConsumer(configuration: config, logger: .kafkaTest)
+
+        let svcGroupConfig = ServiceGroupConfiguration(services: [consumer], logger: .kafkaTest)
+        let serviceGroup = ServiceGroup(configuration: svcGroupConfig)
+
+        try await withThrowingTaskGroup(of: Void.self) { group in
+            // Run Task
+            group.addTask {
+                try await serviceGroup.run()
+            }
+
+            try await Task.sleep(for: .seconds(1))
+
+            // Shutdown the serviceGroup
+            await serviceGroup.triggerGracefulShutdown()
+        }
+
+        let value = try metrics.expectGauge("operations").lastValue
+        XCTAssertNotNil(value)
+    }
 }
diff --git a/Tests/KafkaTests/KafkaProducerTests.swift b/Tests/KafkaTests/KafkaProducerTests.swift
index f4124898..de083b97 100644
--- a/Tests/KafkaTests/KafkaProducerTests.swift
+++ b/Tests/KafkaTests/KafkaProducerTests.swift
@@ -12,8 +12,11 @@
 //
 //===----------------------------------------------------------------------===//
 
+@testable import CoreMetrics // for MetricsSystem.bootstrapInternal
 @testable import Kafka
 import Logging
+import Metrics
+import MetricsTestKit
 import NIOCore
 import ServiceLifecycle
 import XCTest
@@ -38,6 +41,7 @@ final class KafkaProducerTests: XCTestCase {
     let kafkaPort: Int = .init(ProcessInfo.processInfo.environment["KAFKA_PORT"] ?? "9092")!
     var bootstrapBrokerAddress: KafkaConfiguration.BrokerAddress!
     var config: KafkaProducerConfiguration!
+    var metrics: TestMetrics! = TestMetrics()
 
     override func setUpWithError() throws {
         self.bootstrapBrokerAddress = KafkaConfiguration.BrokerAddress(
             host: self.kafkaHost,
@@ -49,11 +53,16 @@ final class KafkaProducerTests: XCTestCase {
             bootstrapBrokerAddresses: [self.bootstrapBrokerAddress]
         )
         self.config.broker.addressFamily = .v4
+
+        MetricsSystem.bootstrapInternal(self.metrics)
     }
 
     override func tearDownWithError() throws {
         self.bootstrapBrokerAddress = nil
         self.config = nil
+
+        self.metrics = nil
+        MetricsSystem.bootstrapInternal(NOOPMetricsHandler.instance)
     }
 
     func testSend() async throws {
@@ -340,4 +349,32 @@ final class KafkaProducerTests: XCTestCase {
 
         XCTAssertNil(producerCopy)
     }
+
+    func testProducerStatistics() async throws {
+        self.config.metrics.updateInterval = .milliseconds(100)
+        self.config.metrics.queuedOperation = .init(label: "operations")
+
+        let producer = try KafkaProducer(
+            configuration: self.config,
+            logger: .kafkaTest
+        )
+
+        let svcGroupConfig = ServiceGroupConfiguration(services: [producer], logger: .kafkaTest)
+        let serviceGroup = ServiceGroup(configuration: svcGroupConfig)
+
+        try await withThrowingTaskGroup(of: Void.self) { group in
+            // Run Task
+            group.addTask {
+                try await serviceGroup.run()
+            }
+
+            try await Task.sleep(for: .seconds(1))
+
+            // Shutdown the serviceGroup
+            await serviceGroup.triggerGracefulShutdown()
+        }
+
+        let value = try metrics.expectGauge("operations").lastValue
+        XCTAssertNotNil(value)
+    }
 }
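Putting the pieces together outside the tests — a minimal sketch of the feature this diff adds. Broker address, gauge labels, and logger label are placeholders, and a real `MetricsFactory` (e.g. a StatsD or Prometheus backend) is assumed to have been bootstrapped at process start-up:

```swift
import Kafka
import Logging
import Metrics
import ServiceLifecycle

var config = KafkaProducerConfiguration(
    bootstrapBrokerAddresses: [KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)]
)
config.metrics.updateInterval = .seconds(1)
config.metrics.queuedProducerMessages = Gauge(label: "kafka.producer.queued.messages")   // placeholder label
config.metrics.totalKafkaBrokerMessagesSent = Gauge(label: "kafka.producer.messages.sent") // placeholder label

let logger = Logger(label: "kafka.metrics.example")
let producer = try KafkaProducer(configuration: config, logger: logger)

// While the service runs, the producer's poll loop records each configured gauge
// roughly once per `updateInterval`.
let serviceGroup = ServiceGroup(
    configuration: .init(services: [producer], logger: logger)
)
try await serviceGroup.run()
```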