Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18796: Added more information to error message when assertion fails for acquisition lock timeout #19247

Merged
merged 3 commits into from
Mar 24, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/main/java/kafka/server/share/SharePartition.java
Original file line number Diff line number Diff line change
@@ -187,6 +187,10 @@ public static RecordState forId(byte id) {
throw new IllegalArgumentException("Unknown record state id: " + id);
}
}

public byte id() {
return this.id;
}
}

/**
72 changes: 47 additions & 25 deletions core/src/test/java/kafka/server/share/SharePartitionTest.java
Original file line number Diff line number Diff line change
@@ -2912,7 +2912,7 @@ public void testAcquisitionLockForAcquiringSingleRecord() throws InterruptedExce
sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask() == null &&
sharePartition.timer().size() == 0,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(0L, List.of())));

assertEquals(1, sharePartitionMetrics.acquisitionLockTimeoutPerSec().count());
assertTrue(sharePartitionMetrics.acquisitionLockTimeoutPerSec().meanRate() > 0);
@@ -2938,7 +2938,7 @@ public void testAcquisitionLockForAcquiringMultipleRecords() throws InterruptedE
&& sharePartition.cachedState().get(10L).batchDeliveryCount() == 1
&& sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(10L, List.of())));

assertEquals(5, sharePartitionMetrics.acquisitionLockTimeoutPerSec().count());
assertTrue(sharePartitionMetrics.acquisitionLockTimeoutPerSec().meanRate() > 0);
@@ -2973,7 +2973,7 @@ public void testAcquisitionLockForAcquiringMultipleRecordsWithOverlapAndNewBatch
sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask() == null &&
sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(0L, List.of(), 5L, List.of())));

assertEquals(10, sharePartitionMetrics.acquisitionLockTimeoutPerSec().count());
assertTrue(sharePartitionMetrics.acquisitionLockTimeoutPerSec().meanRate() > 0);
@@ -2997,7 +2997,7 @@ public void testAcquisitionLockForAcquiringSameBatchAgain() throws InterruptedEx
sharePartition.nextFetchOffset() == 10 &&
sharePartition.cachedState().get(10L).batchState() == RecordState.AVAILABLE,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(10L, List.of())));

// Acquire the same batch again.
fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5);
@@ -3036,7 +3036,7 @@ public void testAcquisitionLockOnAcknowledgingSingleRecordBatch() throws Interru
sharePartition.cachedState().get(0L).batchDeliveryCount() == 1 &&
sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask() == null,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(0L, List.of())));
}

@Test
@@ -3065,7 +3065,7 @@ public void testAcquisitionLockOnAcknowledgingMultipleRecordBatch() throws Inter
sharePartition.cachedState().get(5L).batchDeliveryCount() == 1 &&
sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(5L, List.of())));
}

@Test
@@ -3115,7 +3115,7 @@ public void testAcquisitionLockOnAcknowledgingMultipleRecordBatchWithGapOffsets(
sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null &&
sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(1L, List.of(), 5L, List.of(), 10L, List.of())));

assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(1L).batchState());
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).batchState());
@@ -3142,7 +3142,7 @@ public void testAcquisitionLockForAcquiringSubsetBatchAgain() throws Interrupted
sharePartition.cachedState().get(10L).batchState() == RecordState.AVAILABLE &&
sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(10L, List.of())));

// Acquire subset of records again.
fetchAcquiredRecords(sharePartition, memoryRecords(3, 12), 3);
@@ -3176,7 +3176,7 @@ public void testAcquisitionLockForAcquiringSubsetBatchAgain() throws Interrupted
expectedOffsetStateMap.equals(sharePartition.cachedState().get(10L).offsetState());
},
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(10L, List.of(10L, 11L, 12L, 13L, 14L, 15L, 16L, 17L))));
assertNull(sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(10L).offsetState().get(11L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(10L).offsetState().get(12L).acquisitionLockTimeoutTask());
@@ -3264,7 +3264,7 @@ public void testAcquisitionLockOnAcknowledgingMultipleSubsetRecordBatchWithGapOf
expectedOffsetStateMap2.equals(sharePartition.cachedState().get(10L).offsetState());
},
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(5L, List.of(5L, 6L), 10L, List.of(10L, 11L, 12L, 13L, 14L, 15L, 16L, 17L, 18L, 19L, 20L))));

assertNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask());
@@ -3308,7 +3308,7 @@ public void testAcquisitionLockTimeoutCauseMaxDeliveryCountExceed() throws Inter
sharePartition.cachedState().get(10L).batchDeliveryCount() == 1 &&
sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(10L, List.of())));

fetchAcquiredRecords(sharePartition, memoryRecords(10, 10), 10);

@@ -3326,7 +3326,7 @@ public void testAcquisitionLockTimeoutCauseMaxDeliveryCountExceed() throws Inter
sharePartition.cachedState().get(10L).batchDeliveryCount() == 2 &&
sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(10L, List.of())));
}

@Test
@@ -3350,7 +3350,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForward() throws InterruptedE
sharePartition.cachedState().get(0L).batchDeliveryCount() == 1 &&
sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask() == null,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(0L, List.of())));

fetchAcquiredRecords(sharePartition, memoryRecords(5, 0), 5);

@@ -3384,7 +3384,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForward() throws InterruptedE
expectedOffsetStateMap.equals(sharePartition.cachedState().get(0L).offsetState());
},
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(0L, List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L))));

assertNull(sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(0L).offsetState().get(1L).acquisitionLockTimeoutTask());
@@ -3422,7 +3422,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForwardAndClearCachedState()
sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE &&
sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask() == null,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(0L, List.of())));

fetchAcquiredRecords(sharePartition, memoryRecords(10, 0), 10);

@@ -3437,7 +3437,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForwardAndClearCachedState()
sharePartition.cachedState().isEmpty() &&
sharePartition.nextFetchOffset() == 10,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of()));
}

@Test
@@ -3459,7 +3459,7 @@ public void testAcknowledgeAfterAcquisitionLockTimeout() throws InterruptedExcep
sharePartition.cachedState().get(5L).batchState() == RecordState.AVAILABLE &&
sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(5L, List.of())));

// Acknowledge with ACCEPT type should throw InvalidRecordStateException since they've been released due to acquisition lock timeout.
CompletableFuture<Void> ackResult = sharePartition.acknowledge(MEMBER_ID,
@@ -3525,7 +3525,7 @@ public void testAcquisitionLockAfterDifferentAcknowledges() throws InterruptedEx
expectedOffsetStateMap.equals(sharePartition.cachedState().get(5L).offsetState());
},
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(5L, List.of(5L, 6L, 7L, 8L, 9L))));

assertNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask());
@@ -3563,7 +3563,7 @@ public void testAcquisitionLockOnBatchWithWriteShareGroupStateFailure() throws I
sharePartition.cachedState().get(5L).batchState() == RecordState.AVAILABLE &&
sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask() == null,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(5L, List.of())));
}

@Test
@@ -3609,7 +3609,7 @@ public void testAcquisitionLockOnOffsetWithWriteShareGroupStateFailure() throws
expectedOffsetStateMap.equals(sharePartition.cachedState().get(5L).offsetState());
},
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(5L, List.of(5L, 6L, 7L, 8L, 9L, 10L))));

assertNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask());
@@ -4939,7 +4939,7 @@ public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovement() throws
sharePartition.cachedState().get(35L).offsetState().equals(expectedOffsetStateMap3);
},
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(5L, List.of(5L, 6L, 7L, 8L, 9L), 20L, List.of(20L, 21L, 22L, 23L, 24L), 25L, List.of(), 30L, List.of(), 35L, List.of(35L, 36L, 37L, 38L, 39L))));

assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(10L).batchMemberId());
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(10L).batchState());
@@ -4973,7 +4973,7 @@ public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovementToStartOf
sharePartition.cachedState().get(10L).batchMemberId().equals(EMPTY_MEMBER_ID) &&
sharePartition.cachedState().get(10L).batchState() == RecordState.AVAILABLE,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(5L, List.of(), 10L, List.of())));
}

@Test
@@ -5008,7 +5008,7 @@ public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovementToMiddleO
sharePartition.cachedState().get(5L).batchState() == RecordState.ARCHIVED;
},
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of(5L, List.of(), 10L, List.of(10L, 11L, 12L, 13L, 14L))));
}

@Test
@@ -5193,7 +5193,7 @@ public void testLsoMovementThenAcquisitionLockTimeoutThenAcknowledge() throws In
() -> sharePartition.nextFetchOffset() == 7 && sharePartition.cachedState().isEmpty() &&
sharePartition.startOffset() == 7 && sharePartition.endOffset() == 7,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of()));

fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5);

@@ -5242,7 +5242,7 @@ public void testLsoMovementThenAcquisitionLockTimeoutThenAcknowledgeBatchLastOff
() -> sharePartition.nextFetchOffset() == 3 && sharePartition.cachedState().isEmpty() &&
sharePartition.startOffset() == 3 && sharePartition.endOffset() == 3,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> ACQUISITION_LOCK_NEVER_GOT_RELEASED);
() -> assertionFailedMessage(sharePartition, Map.of()));

fetchAcquiredRecords(sharePartition, memoryRecords(2, 3), 2);
fetchAcquiredRecords(sharePartition, memoryRecords(3, 5), 3);
@@ -6648,6 +6648,28 @@ public void testAcquireWhenBatchesRemovedForFetchOffsetForSameCachedBatch() {
});
});
}

private String assertionFailedMessage(SharePartition sharePartition, Map<Long, List<Long>> offsets) {
StringBuilder errorMessage = new StringBuilder(ACQUISITION_LOCK_NEVER_GOT_RELEASED + String.format(
" timer size: %d, next fetch offset: %d\n",
sharePartition.timer().size(),
sharePartition.nextFetchOffset()));
for (Map.Entry<Long, List<Long>> entry : offsets.entrySet()) {
if (entry.getValue() != null && !entry.getValue().isEmpty()) {
errorMessage.append(String.format("batch start offset: %d\n", entry.getKey()));
for (Long offset : entry.getValue()) {
errorMessage.append(String.format("\toffset: %d, offset state: %s, offset acquisition lock timeout task present: %b\n",
offset, sharePartition.cachedState().get(entry.getKey()).offsetState().get(offset).state().id(),
sharePartition.cachedState().get(entry.getKey()).offsetState().get(offset).acquisitionLockTimeoutTask() == null));
}
} else {
errorMessage.append(String.format("batch start offset: %d, batch state: %s, batch acquisition lock timeout task present: %b\n",
entry.getKey(), sharePartition.cachedState().get(entry.getKey()).batchState().id(),
sharePartition.cachedState().get(entry.getKey()).batchAcquisitionLockTimeoutTask() == null));
}
}
return errorMessage.toString();
}

private FetchPartitionData fetchPartitionData(Records records) {
return fetchPartitionData(records, 0);