Duplicated messages when default rebalance happening with 2+ consumers and offset auto commit disabled #136
After change to In other words, for the basic use case, it will be:
public struct KafkaConsumerMessages: Sendable, AsyncSequence {
    ...
    public struct AsyncIterator: AsyncIteratorProtocol {
        public func next() async throws -> Element? {
            let action = self.stateMachineHolder.stateMachine.withLockedValue { $0.nextConsumerPollLoopAction() }
            switch action {
            case .poll(let client):
                if let message = try client.consumerPoll() { // non-blocking call
                    return message
                }
                // + e.g. DispatchQueue + blocking call
            case .suspend:
                try await Task.sleep(for: pollInterval)
            case .terminate:
                return nil
            }
        }
    }
}
I believe this would solve several problems:
@FranzBusch, @felixschlegel what do you think?
I've put the suggestion above into a PR, together with a test that reproduces the issue: #158.
Currently, swift-kafka-client does not support a rebalance callback/event. However, that is pretty important for avoiding duplicated messages from the broker.
The typical scenario for a lot of applications is to receive a message, process it and then commit offset:
or to commit from time to time, plus once at the end of the consumer's lifetime, when bulk message processing is required:
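The two commit patterns described above could look roughly like the following. This is a minimal sketch with hypothetical stand-ins (`Message`, `InMemoryCommitter`, and both functions are invented for illustration and are not the swift-kafka-client API); it assumes a single partition and only records what would be committed.

```swift
// Hypothetical stand-ins for illustration only; not the swift-kafka-client API.
struct Message {
    let partition: Int
    let offset: Int
}

final class InMemoryCommitter {
    // partition -> next offset to read (Kafka commits "last processed + 1")
    private(set) var committed: [Int: Int] = [:]
    private(set) var commitCalls = 0

    func commit(_ message: Message) {
        committed[message.partition] = message.offset + 1
        commitCalls += 1
    }
}

// Pattern 1: process each message, then commit its offset right away.
func processAndCommitEach(_ messages: [Message],
                          committer: InMemoryCommitter,
                          process: (Message) -> Void) {
    for message in messages {
        process(message)
        committer.commit(message)
    }
}

// Pattern 2: commit every `batchSize` messages, plus once at the end.
// Assumes all messages belong to one partition.
func processAndCommitInBatches(_ messages: [Message],
                               batchSize: Int,
                               committer: InMemoryCommitter,
                               process: (Message) -> Void) {
    var lastProcessed: Message?
    var sinceLastCommit = 0
    for message in messages {
        process(message)
        lastProcessed = message
        sinceLastCommit += 1
        if sinceLastCommit == batchSize {
            committer.commit(message)
            sinceLastCommit = 0
        }
    }
    // Final commit at the end of the consumer's lifetime.
    if sinceLastCommit > 0, let last = lastProcessed {
        committer.commit(last)
    }
}
```

In the second pattern, every message processed since the last commit is "in flight" from the broker's point of view, which is the window the rest of this issue is about.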
Usually, to process messages in parallel, topics are divided into partitions that are served by consumers independently.
That leads to a requirement to re-assign those partitions whenever a consumer is added or removed, so that partitions stay evenly distributed.
librdkafka has a default strategy that assigns or removes partitions automatically; however, it also seeks to the latest committed offset from the broker.
With automatic offset commit disabled, it leads to a race, specifically in two places:
messages stream until commit
messages stream
In other words, the scenario is the following:
Although the probability of such a race could be partially lowered in the library itself, it cannot be eliminated without the cooperation of the application, which may still be processing messages. Furthermore, it will break Kafka EOS (exactly-once semantics) for transactions in the future.
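The effect of the race can be illustrated with a small model. This is only a sketch under simplifying assumptions (one partition, consumer A loses it to consumer B, and B resumes from the last committed offset); `duplicatedOffsets` is a hypothetical helper, not part of any library.

```swift
/// Returns the offsets that are delivered to both consumers when a partition
/// moves from consumer A to consumer B during a rebalance.
/// Hypothetical helper for illustration only.
func duplicatedOffsets(totalMessages: Int, processedByA: Int, committedByA: Int) -> [Int] {
    precondition(0 <= committedByA
                 && committedByA <= processedByA
                 && processedByA <= totalMessages)
    // A has processed offsets 0..<processedByA, but committed only 0..<committedByA.
    let seenByA = Set(0..<processedByA)
    // After the rebalance, the new owner B starts from the last committed offset.
    let seenByB = committedByA..<totalMessages
    return seenByB.filter { seenByA.contains($0) }
}

// A processed 6 messages but committed only the first 4:
// offsets 4 and 5 are processed a second time by B.
print(duplicatedOffsets(totalMessages: 10, processedByA: 6, committedByA: 4)) // [4, 5]
```

The duplicates disappear only when everything A has processed is committed before the partition is handed over, which the application cannot guarantee without knowing that a rebalance is in progress.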
Therefore, it would be nice if the library allowed downstream applications to deal with rebalance in one of the following ways (or any other):
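As one possible shape for such handling, the sketch below commits processed offsets when partitions are revoked. All names here (`RebalanceEvent`, `OffsetTracker`) are hypothetical and do not exist in swift-kafka-client; the "commit" only updates an in-memory dictionary where a real client would talk to the broker.

```swift
// Hypothetical types for illustration only; not an existing swift-kafka-client API.
enum RebalanceEvent {
    case assign(partitions: [Int])
    case revoke(partitions: [Int])
}

struct OffsetTracker {
    // partition -> next offset to read
    private(set) var processed: [Int: Int] = [:]
    private(set) var committed: [Int: Int] = [:]

    mutating func markProcessed(partition: Int, offset: Int) {
        processed[partition] = offset + 1
    }

    mutating func handle(_ event: RebalanceEvent) {
        switch event {
        case .assign:
            break // nothing to flush for newly assigned partitions
        case .revoke(let partitions):
            for partition in partitions {
                if let next = processed[partition] {
                    // A real client would commit to the broker here,
                    // before the partition is released.
                    committed[partition] = next
                }
                processed[partition] = nil
            }
        }
    }
}
```

With something like this, the new owner of a revoked partition would resume after the last processed message rather than after the last periodic commit.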
To demonstrate the possible impact, I've used the following example in a branch, based on the example for previous cases: https://github.com/ordo-one/swift-kafka-client/tree/test-rebalance-lead-to-messages-resending. To run it:
swift run -c release Snapshot
As a first step, this sample produces 15_000_000 messages across 6 partitions.
Then it starts 2 consumers within the same consumer group, with a delay of 20 seconds between the starts.
That code has two problems related to rebalance: