-
Notifications
You must be signed in to change notification settings - Fork 14.7k
KAFKA-19765: Store the last used assignment configuration in the group metadata #20702
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR! I left a lot of comments, but nothing fundamental.
group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
Outdated
Show resolved
Hide resolved
// We bump the group epoch. | ||
int groupEpoch = group.groupEpoch() + 1; | ||
records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch, group.metadataHash(), group.validatedTopologyEpoch())); | ||
records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch, group.metadataHash(), group.validatedTopologyEpoch(), Map.of())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait, why are dropping the assignment configuration on the floor here? We should use group.assignmentConfiguration here.
...oordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
Outdated
Show resolved
Hide resolved
...est/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
Show resolved
Hide resolved
...rc/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements support for storing assignment configurations in Kafka Streams group metadata to enable dynamic configuration changes that affect task assignment. The implementation tracks assignment configurations and bumps the group epoch when these configurations change, ensuring proper rebalancing behavior.
- Added assignment configuration storage to streams group metadata
- Implemented group epoch increment when assignment configurations change
- Updated all test cases to accommodate the new assignment configuration parameter
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
Show a summary per file
File | Description |
---|---|
StreamsGroupMetadataValue.json | Added AssignmentConfigs field to store assignment configuration parameters as key-value pairs |
StreamsGroup.java | Added assignmentConfigs field and related methods to track assignment configurations |
StreamsCoordinatorRecordHelpers.java | Updated newStreamsGroupMetadataRecord to include assignment configurations parameter |
GroupMetadataManager.java | Added logic to detect assignment config changes and bump group epoch accordingly |
StreamsGroupBuilder.java | Updated test builder to pass assignment configs parameter |
StreamsCoordinatorRecordHelpersTest.java | Updated test cases to include assignment configs parameter |
GroupMetadataManagerTest.java | Updated all test cases and added new test for assignment config changes |
Comments suppressed due to low confidence (1)
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:1
- Extra space before Map.of - should be single space between parameters.
/*
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
streamsGroup.setLastAssignmentConfigs( | ||
value.assignmentConfigs().stream() | ||
.collect(Collectors.toMap( | ||
StreamsGroupMetadataValue.AssignmentConfig::key, | ||
StreamsGroupMetadataValue.AssignmentConfig::value | ||
)) | ||
); |
Copilot
AI
Oct 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The schema marks AssignmentConfigs as nullable with a default of null; for older (pre-field) or null-valued records value.assignmentConfigs() will be null, causing a NullPointerException when calling stream(). Guard with a null/empty check (e.g., List<...> configs = value.assignmentConfigs(); if (configs != null) { ... } else set empty) to ensure backward-compatible replay.
streamsGroup.setLastAssignmentConfigs( | |
value.assignmentConfigs().stream() | |
.collect(Collectors.toMap( | |
StreamsGroupMetadataValue.AssignmentConfig::key, | |
StreamsGroupMetadataValue.AssignmentConfig::value | |
)) | |
); | |
if (value.assignmentConfigs() != null) { | |
streamsGroup.setLastAssignmentConfigs( | |
value.assignmentConfigs().stream() | |
.collect(Collectors.toMap( | |
StreamsGroupMetadataValue.AssignmentConfig::key, | |
StreamsGroupMetadataValue.AssignmentConfig::value | |
)) | |
); | |
} else { | |
streamsGroup.setLastAssignmentConfigs(Collections.emptyMap()); | |
} |
Copilot uses AI. Check for mistakes.
group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
Outdated
Show resolved
Hide resolved
"about": "The hash of all topics in the group." }, | ||
{ "name": "ValidatedTopologyEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "default": -1, "type": "int32", | ||
"about": "The topology epoch whose topics where validated to be present in a valid configuration in the metadata." } | ||
"about": "The topology epoch whose topics where validated to be present in a valid configuration in the metadata." }, |
Copilot
AI
Oct 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected spelling of 'where' to 'were'.
"about": "The topology epoch whose topics where validated to be present in a valid configuration in the metadata." }, | |
"about": "The topology epoch whose topics were validated to be present in a valid configuration in the metadata." }, |
Copilot uses AI. Check for mistakes.
...oordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates, looks really good. Left 4 minor nits, but this is mostly ready to merge.
{ "name": "ValidatedTopologyEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "default": -1, "type": "int32", | ||
"about": "The topology epoch whose topics where validated to be present in a valid configuration in the metadata." } | ||
"about": "The topology epoch whose topics where validated to be present in a valid configuration in the metadata." }, | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, we typically use 2 lines per field in these files
} | ||
|
||
@Test | ||
public void testNewStreamsGroupEpochRecordNullGroupId() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should be called testNewStreamsGroupMetadataRecord (sorry, not your change, but while you are at it...)
Objects.requireNonNull(assignmentConfigs, "assignmentConfigs should not be null here"); | ||
|
||
List<StreamsGroupMetadataValue.AssignmentConfig> assignmentConfigList = new ArrayList<>(); | ||
if (!assignmentConfigs.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think it's easier to read without this if
"nullableVersions": "0+", | ||
"tag": 1, | ||
"default": null, | ||
"about": "Assignment configuration parameters as key-value pairs." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "Last used assignment configuration..."?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, yes. I missed that we are not handling null
in replay
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks!
What
Ticket: https://issues.apache.org/jira/browse/KAFKA-19765
In KIP-1071, there are configurations that affect the assignment, and
that can be configured dynamically.
the StreamsGroupMetadata as a collection of key-value pairs.
bumped.
Reviewers: Lucas Brutschy [email protected]