Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18379: Enforce resigned cannot transition to any other state in same epoch #19236

Open
wants to merge 10 commits into
base: trunk
Choose a base branch
from
10 changes: 9 additions & 1 deletion raft/src/main/java/org/apache/kafka/raft/QuorumState.java
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ public void transitionToResigned(List<ReplicaKey> preferredSuccessors) {
*/
public void transitionToUnattached(int epoch, OptionalInt leaderId) {
int currentEpoch = state.epoch();
if (epoch < currentEpoch || (epoch == currentEpoch && !isProspective())) {
if (epoch < currentEpoch || (epoch == currentEpoch && (!isProspective() || isResigned()))) {
throw new IllegalStateException(
String.format(
"Cannot transition to Unattached with epoch %d from current state %s",
Expand Down Expand Up @@ -576,6 +576,14 @@ public void transitionToFollower(int epoch, int leaderId, Endpoints endpoints) {
state
)
);
} else if (isResigned()) {
throw new IllegalStateException(
String.format(
"Cannot transition to Follower within same epoch %s from state %s",
epoch,
state
)
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2505,7 +2505,6 @@ public void testLeaderToAnyStateLowerEpoch(KRaftVersion kraftVersion) {
/**
* Test transitions from Resigned
*/
// KAFKA-18379 to fill in the rest of the cases
@ParameterizedTest
@EnumSource(value = KRaftVersion.class)
public void testResignedToFollowerInSameEpoch(KRaftVersion kraftVersion) {
Expand All @@ -2519,8 +2518,7 @@ public void testResignedToFollowerInSameEpoch(KRaftVersion kraftVersion) {
state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
assertTrue(state.isResigned());
assertThrows(IllegalStateException.class, () -> state.transitionToFollower(epoch, localId, voters.listeners(localId)));
// KAFKA-18379 will fix this
state.transitionToFollower(epoch, node1, voters.listeners(node1));
assertThrows(IllegalStateException.class, () -> state.transitionToFollower(epoch, node1, voters.listeners(node1)));
}

@ParameterizedTest
Expand Down