-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction #20907
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
| */ | ||
| public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> { | ||
|
|
||
| private static final Logger log = LoggerFactory.getLogger(ConsumerGroup.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have to use the LogContext from the GroupMetadataManager, otherwise we won't prefix log lines with [GroupCoordinator id=%d]. When adding LogContext to the constructor, it should go as the first parameter.
...rdinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
Show resolved
Hide resolved
| * @param assignment The assignment. | ||
| * @param expectedEpoch The expected epoch. | ||
| * @throws IllegalStateException if the epoch does not match the expected one. | ||
| * @throws IllegalStateException if the epoch does not exist. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking about this again, I think we can also hit the IllegalStateException case
Member A is assigned topic 1 partition X
[removed by compaction] Member A is unassigned topic 1 partition X
Member B is assigned topic 1 partition X
Member B is unassigned topic 1 partition X
Member A is assigned topic 2 partition Y
so we have to also turn that into log message and add another test.
The same for streams.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean hitting the same IllegalStateException or the second check in the method? I made a second test with that order for the ConsumerGroup and didn't have any more issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm referring to the remaining IllegalStateException case Cannot remove the epoch %d from %s because it does not have any epoch
| .build(); | ||
|
|
||
| // Member A is assigned partition bar-0,1 and member B is assigned partition bar-2 | ||
| assignor.prepareGroupAssignment(new GroupAssignment( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be replaying the records directly, as if we are loading the group coordinator. The records can be created using GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord.
The same for streams
...rdinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
...rdinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
...rdinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
| public ConsumerGroup( | ||
| LogContext logContext, | ||
| SnapshotRegistry snapshotRegistry, | ||
| String groupId | ||
| ) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove the other constructor? In the tests we can fill in the extra parameter with new LogContext().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the second constructor can just call the first unless it really needs to be removed? That way it's still not as redundant.
| } | ||
|
|
||
| @Test | ||
| public void testConsumerGroupResolvesOnCompaction() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to simpify the tests by removing the consumer group setup, target assignment records and heartbearts? To check what's in the currentPartitionEpoch map, we can assert on consumerGroup.currentPartitionEpoch(topicId, partitionId).
The group coordinator sometimes throws an IllegalStateException when concurrent compaction is in progress but the issues self-resolve if allowed to complete. This PR replaces the exception with a log message for Consumer groups and Streams groups.
Testing: Added 2 unit tests to verify that this change does not introduce new bugs.