
KafkaConsumer: back pressure + improved read speed #139

Merged (5 commits) on Nov 1, 2023

Conversation

felixschlegel (Contributor):

Motivation:

Closes #131.

Example:

I ran an example consumer reading a topic with approx. 11_000_000 messages:

var consumerConfig = KafkaConsumerConfiguration(
    consumptionStrategy: .partition(
        KafkaPartition(rawValue: 0),
        topic: "test-topic",
        offset: KafkaOffset(rawValue: 0) // Important: Read from beginning!
    ),
    bootstrapBrokerAddresses: [self.bootstrapBrokerAddress]
)
consumerConfig.pollInterval = .zero
consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning
consumerConfig.broker.addressFamily = .v4

let consumer = try KafkaConsumer(
    configuration: consumerConfig,
    logger: .kafkaTest
)

let serviceGroupConfiguration = ServiceGroupConfiguration(services: [consumer], logger: .kafkaTest)
let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)

try await withThrowingTaskGroup(of: Void.self) { group in
    // Run Task
    group.addTask {
        try await serviceGroup.run()
    }

    // Consumer Task
    group.addTask {
        var count = 0
        for try await message in consumer.messages {
            _ = message // drop message
            count += 1
            try await Task.sleep(for: .milliseconds(1))
            if count % 1000 == 0 {
                print(count)
            }
        }
    }

    // Wait for Consumer Task to complete
    try await group.next()
    // Shutdown the serviceGroup
    await serviceGroup.triggerGracefulShutdown()
}
Before (without back pressure):

[Screenshot 2023-10-11 at 13 23 53]

After (with back pressure):

[Screenshot 2023-10-11 at 20 48 18]

This result is tolerable since the queued.max.messages.kbytes configuration property defaults to prefetching at most ~65 MB of messages. Exposing queued.max.messages.kbytes will be done in a follow-up PR.

Modifications:

  • re-add KafkaConsumerConfiguration.backPressureStrategy: BackPressureStrategy, currently allowing users to add high-low-watermark backpressure to their KafkaConsumers
  • KafkaConsumer:
    • make KafkaConsumerMessages use NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark as its backpressure strategy
    • remove rd_kafka_poll_set_consumer -> use two separate queues for consumer events and consumer messages so we can exert backpressure on the consumer message queue
    • remove the idle polling mechanism where incoming messages were discarded when KafkaConsumerMessages was terminated -> we now have two independent queues
    • rename .pollForAndYieldMessage -> .pollForEventsAndMessages
    • refactor State and add ConsumerMessagesSequenceState
  • KafkaProducer:
    • rename .consumptionStopped -> .eventConsumptionFinished
  • RDKafkaClient:
    • bring back consumerPoll()
    • eventPoll(): only poll the main queue for events since consumer messages are now handled on a different queue

/// See ``KafkaConsumerConfiguration/BackPressureStrategy-swift.struct`` for more information.
public var backPressureStrategy: BackPressureStrategy = .watermark(
    low: 10,
    high: 50
Contributor Author:

Are you happy with these default values?

Collaborator:

I think these values should work well for a wide range of cases.

Contributor:

Fine by me as well. Kafka is all about people tuning the settings to fit their messages.
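For context, a hedged sketch of how a consumer could opt into different values, reusing the consumerConfig from the example in the PR description (backPressureStrategy and .watermark(low:high:) come from the diff excerpt above; the chosen numbers are purely illustrative):

// Assuming `consumerConfig` from the example above; the values are illustrative.
consumerConfig.backPressureStrategy = .watermark(
    low: 50,   // resume polling the message queue once the buffer drains to 50 messages
    high: 100  // stop polling the message queue once 100 messages are buffered
)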

Motivation:

Closes swift-server#131.

Modifications:

* re-add `KafkaConsumerConfiguration.backPressureStrategy:
  BackPressureStrategy`, currently allowing users to add
  high-low-watermark backpressure to their `KafkaConsumer`s
* `KafkaConsumer`:
    * make `KafkaConsumerMessages` use `NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark`
      as backpressure strategy
    * remove `rd_kafka_poll_set_consumer` -> use two separate queues for
      consumer events and consumer messages so we can exert backpressure
      on the consumer message queue
    * remove idle polling mechanism where incoming messages were
      discarded when `KafkaConsumerMessages` was terminated -> we now
      have two independent queues
    * rename `.pollForAndYieldMessage` -> `.pollForEventsAndMessages`
    * refactor `State` and add `ConsumerMessagesSequenceState`
* `KafkaProducer`:
    * rename `.consumptionStopped` -> `.eventConsumptionFinished`
* `RDKafkaClient`:
    * bring back `consumerPoll()`
    * `eventPoll()`: only poll the main queue for events since consumer messages are now handled on a different queue

defer {
    // Destroy message otherwise poll() will block forever
    rd_kafka_message_destroy(messagePointer)
Collaborator:

Hm, that is out of scope, but could we perhaps not destroy this message and instead retain it inside KafkaConsumerMessage, thus removing the allocations of ByteBuffer and other structures?

Contributor Author:

Yes, please create a separate issue for that 😄
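A hedged sketch of what that could look like (a hypothetical wrapper type, not part of this PR; the Crdkafka module name is an assumption): keep the librdkafka message alive inside a reference type and destroy it only in deinit, so the payload can be exposed without eagerly copying it into a ByteBuffer.

import Crdkafka // assumed name of the package's librdkafka C module

// Hypothetical wrapper, not the PR's API: owns the librdkafka message and
// gives the memory back to librdkafka only when the wrapper is deallocated.
final class RetainedKafkaMessage {
    private let messagePointer: UnsafeMutablePointer<rd_kafka_message_t>

    init(owning messagePointer: UnsafeMutablePointer<rd_kafka_message_t>) {
        self.messagePointer = messagePointer
    }

    // Zero-copy view of the payload backed by librdkafka's own buffer.
    var payload: UnsafeRawBufferPointer {
        UnsafeRawBufferPointer(
            start: messagePointer.pointee.payload,
            count: Int(messagePointer.pointee.len)
        )
    }

    deinit {
        // Safe to destroy only once nobody can reach the payload anymore.
        rd_kafka_message_destroy(messagePointer)
    }
}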

// Poll for new consumer message.
var result: Result<KafkaConsumerMessage, Error>?
do {
    if let message = try client.consumerPoll() {
Collaborator:

It seems that previously we would poll for up to 100 messages before sleeping, and it looks like now it would be just one message.

Contributor:

If I read this correctly, we are polling a single message and then yielding a single message, right? It would probably be better if we read them in batches of 100 and then yielded all of them at once. That would mean we have to acquire the locks a lot less.
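A minimal, self-contained sketch of that batching idea (the types below are illustrative stand-ins, not the actual swift-kafka-client API): drain up to a fixed number of messages from the local queue, then hand the whole batch to the async sequence in a single yield so its internal lock is acquired once per batch instead of once per message.

struct ConsumerMessage { let value: String }

enum YieldResult { case produceMore, stopProducing }

protocol MessagePoller {
    // Returns the next locally queued message, or nil if the queue is currently empty.
    func consumerPoll() throws -> ConsumerMessage?
}

protocol MessageSource {
    // Yields a whole batch at once; the source's lock is taken only once per call.
    func yield(contentsOf batch: [ConsumerMessage]) -> YieldResult
}

func pollAndYieldBatch(
    poller: some MessagePoller,
    source: some MessageSource,
    maxBatchSize: Int = 100
) throws -> YieldResult {
    var batch: [ConsumerMessage] = []
    batch.reserveCapacity(maxBatchSize)

    // Drain up to maxBatchSize messages from the local librdkafka queue.
    while batch.count < maxBatchSize, let message = try poller.consumerPoll() {
        batch.append(message)
    }

    guard !batch.isEmpty else {
        return .produceMore // nothing was buffered locally, keep polling
    }
    // One yield (and therefore one lock acquisition) for the whole batch.
    return source.yield(contentsOf: batch)
}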

Collaborator:

Not sure that is entirely what I meant. This comment related to the changes before the task group was introduced: there was a sleep after every consumed message.
So it is not relevant now, except probably for this part: https://github.com/swift-server/swift-kafka-client/pull/139/files#r1360663871

/// See ``KafkaConsumerConfiguration/BackPressureStrategy-swift.struct`` for more information.
public var backPressureStrategy: BackPressureStrategy = .watermark(
    low: 10,
    high: 50
Collaborator:

I think these values should work well for a wide range of cases.

/// See ``KafkaConsumerConfiguration/BackPressureStrategy-swift.struct`` for more information.
public var backPressureStrategy: BackPressureStrategy = .watermark(
    low: 10,
    high: 50
Contributor:

Fine by me as well. Kafka is all about people tuning the settings to fit their messages.

// Poll for new consumer message.
var result: Result<KafkaConsumerMessage, Error>?
do {
    if let message = try client.consumerPoll() {
Contributor:

We have to do more here than just calling the two separate polls. We probably want to serve the consumer poll in a child task and make it completely driven by the backpressure of the async sequence. However, we need to keep shutdown in mind, so we need to inform the child task about this.

Saying all of that, I am not yet convinced it is the right solution. The one question I am asking myself right now is how we get notified that the consumer queue has more messages again once we go poll-based and decouple the two queue polls. Is there a callback in rdkafka that we can set once we get nil from consumerPoll so that we can enqueue ourselves?

Collaborator:

I believe there is no such notification in librdkafka. I guess the intended approach with the native API is to use a blocking call for the consumer poll, which contradicts the Swift Concurrency contract.
From the librdkafka sample (https://github.com/confluentinc/librdkafka/blob/master/examples/consumer.c#L209C17-L209C55):

for (;;) {
    rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 100 /* 100ms timeout */);
    if (!msg)
        continue; /* no message within the timeout */
    /* ... handle the message ... */
    rd_kafka_message_destroy(msg);
}

Another thought, after checking the librdkafka documentation, is that all callbacks are invoked from queue polls (https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#threads-and-callbacks). That, unfortunately, makes callbacks without polls useless.

I guess a dedicated thread for blocking polls is not in line with Swift evolution. However, with Swift Concurrency it is probably okay to sleep/yield the task for some configurable timeout (e.g. 10 ms) when there are no more messages to read, and otherwise keep reading until we bump into backpressure. Additionally, it is possible to make poll intervals adaptive depending on message flow (#128).
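A rough sketch of that idea (the closures stand in for RDKafkaClient.consumerPoll() and the async sequence source's yield; this is not the PR's actual code): keep polling while messages are available, and sleep for a short, configurable interval only once the local queue runs dry or backpressure kicks in.

// pollOne: returns the next message, or nil when the local queue is empty.
// yieldOne: hands a message to the async sequence and returns false once
//           backpressure asks us to stop producing.
func messagePollLoop<Message>(
    pollOne: () throws -> Message?,
    yieldOne: (Message) -> Bool,
    idleSleep: Duration = .milliseconds(10)
) async throws {
    while !Task.isCancelled {
        if let message = try pollOne() {
            guard yieldOne(message) else {
                // Backpressure: back off briefly before trying again.
                try await Task.sleep(for: idleSleep)
                continue
            }
            // A message was available, so poll again immediately without sleeping.
        } else {
            // The local queue is empty: sleep instead of busy-looping.
            try await Task.sleep(for: idleSleep)
        }
    }
}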

Modifications:

* have two state machines:
    1. consumer state itself
    2. state of consumer messages async sequence
@felixschlegel changed the title from "Add Back Pressure to KafkaConsumer" to "KafkaConsumer: back pressure + improved read speed" on Oct 16, 2023
@felixschlegel (Contributor Author):

Hey folks,

I have implemented your requested changes:

  • have two poll loops, one for normal events (every pollInterval time units) and one for consumer messages (read as long as you can without sleeping)

The poll loop for consumer messages follows a similar approach to what was proposed in #128.
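Roughly, that structure looks like the following sketch (the closures stand in for RDKafkaClient.eventPoll() and the message poll/yield step; the real implementation additionally drives this through the consumer's state machines and handles shutdown, which is omitted here):

func runPollLoops(
    pollEvents: @escaping @Sendable () -> Void,                  // stands in for client.eventPoll()
    pollAndYieldMessages: @escaping @Sendable () throws -> Bool, // true if at least one message was read
    pollInterval: Duration
) async throws {
    try await withThrowingTaskGroup(of: Void.self) { group in
        // Loop 1: serve librdkafka events at a fixed cadence.
        group.addTask {
            while !Task.isCancelled {
                pollEvents()
                try await Task.sleep(for: pollInterval)
            }
        }
        // Loop 2: read consumer messages as fast as backpressure allows,
        // sleeping only when the local queue is empty.
        group.addTask {
            while !Task.isCancelled {
                let readSomething = try pollAndYieldMessages()
                if !readSomething {
                    try await Task.sleep(for: .milliseconds(10))
                }
            }
        }
        try await group.next()
        group.cancelAll()
    }
}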

Also, I want to add a benchmark test evaluating that we achieve good memory usage (back pressure) and good read speeds with this implementation, so it would be good if we can get #140 over the line 😄

Please let me know what you think!

Best,
Felix

Comment on lines +365 to +366
try await group.next()
try await group.next()
Contributor:

We probably want to do a group.cancelAll() after the first one returns without throwing.

Contributor Author:

I'm not sure; consider the following case:

When the client application stops reading the KafkaConsumerMessages sequence, messageRunLoop() will return. However, eventRunLoop() might still be processing a consumer close, so I don't see a benefit in cancelling here, but I'm open to discussion.

case .stopProducing:
    self.stateMachine.withLockedValue { $0.stopProducing() }
case .dropped:
    break
Contributor:

Here we probably want to return, right?

// Poll for new consumer message.
var result: Result<KafkaConsumerMessage, Error>?
do {
    if let message = try client.consumerPoll() {
Contributor:

If I read this correctly, we are polling a single message and then yielding a single message, right? It would probably be better if we read them in batches of 100 and then yielded all of them at once. That would mean we have to acquire the locks a lot less.

@@ -23,8 +23,6 @@ public enum KafkaProducerEvent: Sendable, Hashable {
switch event {
case .deliveryReport(results: let results):
Collaborator:

Hm, I guess it is out of scope here, but should we have backpressure for delivery reports as well?

Contributor:

I don't think we can, since they happen on the eventsQueue, unless there is a way to separate events onto different queues.

Contributor Author:

Yes, the problem is that we should call poll() at regular intervals to serve any queued callbacks/events. As @FranzBusch points out, we would need librdkafka to have a separate "delivery reports queue" for backpressure to make sense here, since some events, such as log events, should still be served even when our events AsyncSequence with all the .deliveryReport events is suspended.

Collaborator:

Ah, yes, sure... We can only separate the log queue from the others...
So we can only implement some "unfair" backpressure, i.e. sometimes poll even after receiving stopProducing, which is a dirty solution...

Contributor Author:

Yes, we could separate the log queue, but then there are also other events like commit confirmations.

Collaborator:

Yeah, I got it. We may close this discussion and think about it in a separate issue, as it is out of scope for this PR anyway and might not be a problem (at least so far).

break
}

self.stateMachine.withLockedValue { $0.newMessagesProduced() }
Collaborator:

I guess it might make sense to continue without sleeping after a message was produced, until we run out of messages again or bump into backpressure.

case .pollForEvents(let client):
    // Event poll to serve any events queued inside of `librdkafka`.
    _ = client.eventPoll()
    try await Task.sleep(for: self.configuration.pollInterval)
Collaborator:

The sleeping here is obviously out of scope and should likely be addressed separately.
What do you think about symmetry with the consumer polls, i.e. sleeping only when we are out of events and otherwise continuing to poll for events?

Contributor Author:

Yes, let's do this in a follow-up PR! This probably relates to issue #128.

}

// Reached the end of the topic+partition queue on the broker
if messagePointer.pointee.err == RD_KAFKA_RESP_ERR__PARTITION_EOF {
Collaborator:

Hm, I wonder whether all errors will now come here or whether some of them will be received in eventPoll().

Contributor:

I think there is going to be a clear split now between message-related errors and run-loop errors.

case .produceMore:
    break
case .stopProducing:
    self.stateMachine.withLockedValue { $0.stopProducing() }
Collaborator:

Contributor Author:

Yes indeed, very good catch!

Modifications:

* `KafkaConsumer`:
    * end consumer message poll loop when async sequence drops message
    * do not sleep if we picked up reading new messages again after we
      finished reading a partition
    * `messageRunLoop`:
        * fix `fatalError` where `newMessagesProduced()` is invoked after `stopProducing()`
    * add func `batchConsumerPoll` that reads a batch of messages to
      avoid acquiring the lock in `messageRunLoop` too often
@felixschlegel (Contributor Author):

We are failing on 5.10 because OpaquePointer is not Sendable. Is that a deliberate change in Swift 5.10? The documentation still states that it should be Sendable.

/// - Parameters:
///   - client: Client used for handling the connection to the Kafka cluster.
///   - maxMessages: Maximum amount of consumer messages to read in this invocation.
private func batchConsumerPoll(
blindspotbounty (Collaborator), Oct 24, 2023:

Sorry for being picky, and it is probably a matter of fine-tuning, but there is a native method in librdkafka that allows fetching a batch within one poll:

RD_EXPORT
ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt,
                               int32_t partition,
                               int timeout_ms,
                               rd_kafka_message_t **rkmessages,
                               size_t rkmessages_size);

UPD: plus it could be too low level...

Contributor:

Let's track that in a follow-up issue.

/// - maxMessages: Maximum amount of consumer messages to read in this invocation.
private func batchConsumerPoll(
    client: RDKafkaClient,
    maxMessages: Int = 100
Collaborator:

We should probably derive this argument in poll from the backpressure configuration. Currently we have a default high watermark of 50 messages, so we may exceed the backpressure limit by a factor of two if we read 100 messages and enqueue them into the stream.
Not sure what is right here, but maybe we can call it with one of the high or low watermark limits?

Contributor:

Yeah, we should probably set this to the difference high - low.
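A minimal illustration of that suggestion, using this PR's default watermarks:

// Derive the batch size from the watermarks so a single batch cannot push the
// buffered message count from the low watermark past the high watermark.
let lowWatermark = 10
let highWatermark = 50
let maxBatchSize = highWatermark - lowWatermark // 40 messages per batchConsumerPoll call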

FranzBusch (Contributor) left a comment:

Overall, this looks good to me now, and I think we should go ahead and merge it and then see if we encounter any problems with it.

Successfully merging this pull request may close these issues.

Enable back pressure for KafkaConsumer