@@ -2912,7 +2912,7 @@ public void testAcquisitionLockForAcquiringSingleRecord() throws InterruptedExce
2912
2912
sharePartition .cachedState ().get (0L ).batchAcquisitionLockTimeoutTask () == null &&
2913
2913
sharePartition .timer ().size () == 0 ,
2914
2914
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
2915
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
2915
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 0L , List . of ())) );
2916
2916
2917
2917
assertEquals (1 , sharePartitionMetrics .acquisitionLockTimeoutPerSec ().count ());
2918
2918
assertTrue (sharePartitionMetrics .acquisitionLockTimeoutPerSec ().meanRate () > 0 );
@@ -2938,7 +2938,7 @@ public void testAcquisitionLockForAcquiringMultipleRecords() throws InterruptedE
2938
2938
&& sharePartition .cachedState ().get (10L ).batchDeliveryCount () == 1
2939
2939
&& sharePartition .cachedState ().get (10L ).batchAcquisitionLockTimeoutTask () == null ,
2940
2940
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
2941
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
2941
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 10L , List . of ())) );
2942
2942
2943
2943
assertEquals (5 , sharePartitionMetrics .acquisitionLockTimeoutPerSec ().count ());
2944
2944
assertTrue (sharePartitionMetrics .acquisitionLockTimeoutPerSec ().meanRate () > 0 );
@@ -2973,7 +2973,7 @@ public void testAcquisitionLockForAcquiringMultipleRecordsWithOverlapAndNewBatch
2973
2973
sharePartition .cachedState ().get (0L ).batchAcquisitionLockTimeoutTask () == null &&
2974
2974
sharePartition .cachedState ().get (5L ).batchAcquisitionLockTimeoutTask () == null ,
2975
2975
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
2976
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
2976
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 0L , List . of (), 5L , List . of ())) );
2977
2977
2978
2978
assertEquals (10 , sharePartitionMetrics .acquisitionLockTimeoutPerSec ().count ());
2979
2979
assertTrue (sharePartitionMetrics .acquisitionLockTimeoutPerSec ().meanRate () > 0 );
@@ -2997,7 +2997,7 @@ public void testAcquisitionLockForAcquiringSameBatchAgain() throws InterruptedEx
2997
2997
sharePartition .nextFetchOffset () == 10 &&
2998
2998
sharePartition .cachedState ().get (10L ).batchState () == RecordState .AVAILABLE ,
2999
2999
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
3000
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
3000
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 10L , List . of ())) );
3001
3001
3002
3002
// Acquire the same batch again.
3003
3003
fetchAcquiredRecords (sharePartition , memoryRecords (5 , 10 ), 5 );
@@ -3036,7 +3036,7 @@ public void testAcquisitionLockOnAcknowledgingSingleRecordBatch() throws Interru
3036
3036
sharePartition .cachedState ().get (0L ).batchDeliveryCount () == 1 &&
3037
3037
sharePartition .cachedState ().get (0L ).batchAcquisitionLockTimeoutTask () == null ,
3038
3038
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
3039
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
3039
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 0L , List . of ())) );
3040
3040
}
3041
3041
3042
3042
@ Test
@@ -3065,7 +3065,7 @@ public void testAcquisitionLockOnAcknowledgingMultipleRecordBatch() throws Inter
3065
3065
sharePartition .cachedState ().get (5L ).batchDeliveryCount () == 1 &&
3066
3066
sharePartition .cachedState ().get (5L ).batchAcquisitionLockTimeoutTask () == null ,
3067
3067
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
3068
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
3068
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 5L , List . of ())) );
3069
3069
}
3070
3070
3071
3071
@ Test
@@ -3115,7 +3115,7 @@ public void testAcquisitionLockOnAcknowledgingMultipleRecordBatchWithGapOffsets(
3115
3115
sharePartition .cachedState ().get (5L ).batchAcquisitionLockTimeoutTask () == null &&
3116
3116
sharePartition .cachedState ().get (10L ).batchAcquisitionLockTimeoutTask () == null ,
3117
3117
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
3118
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
3118
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 1L , List . of (), 5L , List . of (), 10L , List . of ())) );
3119
3119
3120
3120
assertEquals (RecordState .AVAILABLE , sharePartition .cachedState ().get (1L ).batchState ());
3121
3121
assertEquals (RecordState .ACKNOWLEDGED , sharePartition .cachedState ().get (5L ).batchState ());
@@ -3142,7 +3142,7 @@ public void testAcquisitionLockForAcquiringSubsetBatchAgain() throws Interrupted
3142
3142
sharePartition .cachedState ().get (10L ).batchState () == RecordState .AVAILABLE &&
3143
3143
sharePartition .cachedState ().get (10L ).batchAcquisitionLockTimeoutTask () == null ,
3144
3144
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
3145
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
3145
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 10L , List . of ())) );
3146
3146
3147
3147
// Acquire subset of records again.
3148
3148
fetchAcquiredRecords (sharePartition , memoryRecords (3 , 12 ), 3 );
@@ -3176,7 +3176,7 @@ public void testAcquisitionLockForAcquiringSubsetBatchAgain() throws Interrupted
3176
3176
expectedOffsetStateMap .equals (sharePartition .cachedState ().get (10L ).offsetState ());
3177
3177
},
3178
3178
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
3179
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
3179
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 10L , List . of ( 10L , 11L , 12L , 13L , 14L , 15L , 16L , 17L ))) );
3180
3180
assertNull (sharePartition .cachedState ().get (10L ).offsetState ().get (10L ).acquisitionLockTimeoutTask ());
3181
3181
assertNull (sharePartition .cachedState ().get (10L ).offsetState ().get (11L ).acquisitionLockTimeoutTask ());
3182
3182
assertNull (sharePartition .cachedState ().get (10L ).offsetState ().get (12L ).acquisitionLockTimeoutTask ());
@@ -3264,7 +3264,7 @@ public void testAcquisitionLockOnAcknowledgingMultipleSubsetRecordBatchWithGapOf
3264
3264
expectedOffsetStateMap2 .equals (sharePartition .cachedState ().get (10L ).offsetState ());
3265
3265
},
3266
3266
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
3267
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
3267
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 5L , List . of ( 5L , 6L ), 10L , List . of ( 10L , 11L , 12L , 13L , 14L , 15L , 16L , 17L , 18L , 19L , 20L ))) );
3268
3268
3269
3269
assertNull (sharePartition .cachedState ().get (5L ).offsetState ().get (5L ).acquisitionLockTimeoutTask ());
3270
3270
assertNull (sharePartition .cachedState ().get (5L ).offsetState ().get (6L ).acquisitionLockTimeoutTask ());
@@ -3308,7 +3308,7 @@ public void testAcquisitionLockTimeoutCauseMaxDeliveryCountExceed() throws Inter
3308
3308
sharePartition .cachedState ().get (10L ).batchDeliveryCount () == 1 &&
3309
3309
sharePartition .cachedState ().get (10L ).batchAcquisitionLockTimeoutTask () == null ,
3310
3310
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
3311
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
3311
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 10L , List . of ())) );
3312
3312
3313
3313
fetchAcquiredRecords (sharePartition , memoryRecords (10 , 10 ), 10 );
3314
3314
@@ -3326,7 +3326,7 @@ public void testAcquisitionLockTimeoutCauseMaxDeliveryCountExceed() throws Inter
3326
3326
sharePartition .cachedState ().get (10L ).batchDeliveryCount () == 2 &&
3327
3327
sharePartition .cachedState ().get (10L ).batchAcquisitionLockTimeoutTask () == null ,
3328
3328
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
3329
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
3329
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 10L , List . of ())) );
3330
3330
}
3331
3331
3332
3332
@ Test
@@ -3350,7 +3350,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForward() throws InterruptedE
3350
3350
sharePartition .cachedState ().get (0L ).batchDeliveryCount () == 1 &&
3351
3351
sharePartition .cachedState ().get (0L ).batchAcquisitionLockTimeoutTask () == null ,
3352
3352
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
3353
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
3353
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 0L , List . of ())) );
3354
3354
3355
3355
fetchAcquiredRecords (sharePartition , memoryRecords (5 , 0 ), 5 );
3356
3356
@@ -3384,7 +3384,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForward() throws InterruptedE
3384
3384
expectedOffsetStateMap .equals (sharePartition .cachedState ().get (0L ).offsetState ());
3385
3385
},
3386
3386
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
3387
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
3387
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 0L , List . of ( 0L , 1L , 2L , 3L , 4L , 5L , 6L , 7L , 8L , 9L ))) );
3388
3388
3389
3389
assertNull (sharePartition .cachedState ().get (0L ).offsetState ().get (0L ).acquisitionLockTimeoutTask ());
3390
3390
assertNull (sharePartition .cachedState ().get (0L ).offsetState ().get (1L ).acquisitionLockTimeoutTask ());
@@ -3422,7 +3422,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForwardAndClearCachedState()
3422
3422
sharePartition .cachedState ().get (0L ).batchState () == RecordState .AVAILABLE &&
3423
3423
sharePartition .cachedState ().get (0L ).batchAcquisitionLockTimeoutTask () == null ,
3424
3424
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
3425
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
3425
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 0L , List . of ())) );
3426
3426
3427
3427
fetchAcquiredRecords (sharePartition , memoryRecords (10 , 0 ), 10 );
3428
3428
@@ -3437,7 +3437,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForwardAndClearCachedState()
3437
3437
sharePartition .cachedState ().isEmpty () &&
3438
3438
sharePartition .nextFetchOffset () == 10 ,
3439
3439
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
3440
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
3440
+ () -> assertionFailedMessage ( sharePartition , Map . of ()) );
3441
3441
}
3442
3442
3443
3443
@ Test
@@ -3459,7 +3459,7 @@ public void testAcknowledgeAfterAcquisitionLockTimeout() throws InterruptedExcep
3459
3459
sharePartition .cachedState ().get (5L ).batchState () == RecordState .AVAILABLE &&
3460
3460
sharePartition .cachedState ().get (5L ).batchAcquisitionLockTimeoutTask () == null ,
3461
3461
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
3462
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
3462
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 5L , List . of ())) );
3463
3463
3464
3464
// Acknowledge with ACCEPT type should throw InvalidRecordStateException since they've been released due to acquisition lock timeout.
3465
3465
CompletableFuture <Void > ackResult = sharePartition .acknowledge (MEMBER_ID ,
@@ -3525,7 +3525,7 @@ public void testAcquisitionLockAfterDifferentAcknowledges() throws InterruptedEx
3525
3525
expectedOffsetStateMap .equals (sharePartition .cachedState ().get (5L ).offsetState ());
3526
3526
},
3527
3527
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
3528
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
3528
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 5L , List . of ( 5L , 6L , 7L , 8L , 9L ))) );
3529
3529
3530
3530
assertNull (sharePartition .cachedState ().get (5L ).offsetState ().get (5L ).acquisitionLockTimeoutTask ());
3531
3531
assertNull (sharePartition .cachedState ().get (5L ).offsetState ().get (6L ).acquisitionLockTimeoutTask ());
@@ -3563,7 +3563,7 @@ public void testAcquisitionLockOnBatchWithWriteShareGroupStateFailure() throws I
3563
3563
sharePartition .cachedState ().get (5L ).batchState () == RecordState .AVAILABLE &&
3564
3564
sharePartition .cachedState ().get (5L ).batchAcquisitionLockTimeoutTask () == null ,
3565
3565
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
3566
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
3566
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 5L , List . of ())) );
3567
3567
}
3568
3568
3569
3569
@ Test
@@ -3609,7 +3609,7 @@ public void testAcquisitionLockOnOffsetWithWriteShareGroupStateFailure() throws
3609
3609
expectedOffsetStateMap .equals (sharePartition .cachedState ().get (5L ).offsetState ());
3610
3610
},
3611
3611
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
3612
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
3612
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 5L , List . of ( 5L , 6L , 7L , 8L , 9L , 10L ))) );
3613
3613
3614
3614
assertNull (sharePartition .cachedState ().get (5L ).offsetState ().get (5L ).acquisitionLockTimeoutTask ());
3615
3615
assertNull (sharePartition .cachedState ().get (5L ).offsetState ().get (6L ).acquisitionLockTimeoutTask ());
@@ -4939,7 +4939,7 @@ public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovement() throws
4939
4939
sharePartition .cachedState ().get (35L ).offsetState ().equals (expectedOffsetStateMap3 );
4940
4940
},
4941
4941
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
4942
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
4942
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 5L , List . of ( 5L , 6L , 7L , 8L , 9L ), 20L , List . of ( 20L , 21L , 22L , 23L , 24L ), 25L , List . of (), 30L , List . of (), 35L , List . of ( 35L , 36L , 37L , 38L , 39L ))) );
4943
4943
4944
4944
assertEquals (EMPTY_MEMBER_ID , sharePartition .cachedState ().get (10L ).batchMemberId ());
4945
4945
assertEquals (RecordState .ARCHIVED , sharePartition .cachedState ().get (10L ).batchState ());
@@ -4973,7 +4973,7 @@ public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovementToStartOf
4973
4973
sharePartition .cachedState ().get (10L ).batchMemberId ().equals (EMPTY_MEMBER_ID ) &&
4974
4974
sharePartition .cachedState ().get (10L ).batchState () == RecordState .AVAILABLE ,
4975
4975
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
4976
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
4976
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 5L , List . of (), 10L , List . of ())) );
4977
4977
}
4978
4978
4979
4979
@ Test
@@ -5008,7 +5008,7 @@ public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovementToMiddleO
5008
5008
sharePartition .cachedState ().get (5L ).batchState () == RecordState .ARCHIVED ;
5009
5009
},
5010
5010
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
5011
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
5011
+ () -> assertionFailedMessage ( sharePartition , Map . of ( 5L , List . of (), 10L , List . of ( 10L , 11L , 12L , 13L , 14L ))) );
5012
5012
}
5013
5013
5014
5014
@ Test
@@ -5193,7 +5193,7 @@ public void testLsoMovementThenAcquisitionLockTimeoutThenAcknowledge() throws In
5193
5193
() -> sharePartition .nextFetchOffset () == 7 && sharePartition .cachedState ().isEmpty () &&
5194
5194
sharePartition .startOffset () == 7 && sharePartition .endOffset () == 7 ,
5195
5195
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
5196
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
5196
+ () -> assertionFailedMessage ( sharePartition , Map . of ()) );
5197
5197
5198
5198
fetchAcquiredRecords (sharePartition , memoryRecords (5 , 10 ), 5 );
5199
5199
@@ -5242,7 +5242,7 @@ public void testLsoMovementThenAcquisitionLockTimeoutThenAcknowledgeBatchLastOff
5242
5242
() -> sharePartition .nextFetchOffset () == 3 && sharePartition .cachedState ().isEmpty () &&
5243
5243
sharePartition .startOffset () == 3 && sharePartition .endOffset () == 3 ,
5244
5244
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS ,
5245
- () -> ACQUISITION_LOCK_NEVER_GOT_RELEASED );
5245
+ () -> assertionFailedMessage ( sharePartition , Map . of ()) );
5246
5246
5247
5247
fetchAcquiredRecords (sharePartition , memoryRecords (2 , 3 ), 2 );
5248
5248
fetchAcquiredRecords (sharePartition , memoryRecords (3 , 5 ), 3 );
@@ -6648,6 +6648,28 @@ public void testAcquireWhenBatchesRemovedForFetchOffsetForSameCachedBatch() {
6648
6648
});
6649
6649
});
6650
6650
}
6651
+
6652
+ private String assertionFailedMessage (SharePartition sharePartition , Map <Long , List <Long >> offsets ) {
6653
+ StringBuilder errorMessage = new StringBuilder (ACQUISITION_LOCK_NEVER_GOT_RELEASED + String .format (
6654
+ " timer size: %d, next fetch offset: %d\n " ,
6655
+ sharePartition .timer ().size (),
6656
+ sharePartition .nextFetchOffset ()));
6657
+ for (Map .Entry <Long , List <Long >> entry : offsets .entrySet ()) {
6658
+ if (entry .getValue () != null && !entry .getValue ().isEmpty ()) {
6659
+ errorMessage .append (String .format ("batch start offset: %d\n " , entry .getKey ()));
6660
+ for (Long offset : entry .getValue ()) {
6661
+ errorMessage .append (String .format ("\t offset: %d, offset state: %s, offset acquisition lock timeout task present: %b\n " ,
6662
+ offset , sharePartition .cachedState ().get (entry .getKey ()).offsetState ().get (offset ).state ().id (),
6663
+ sharePartition .cachedState ().get (entry .getKey ()).offsetState ().get (offset ).acquisitionLockTimeoutTask () == null ));
6664
+ }
6665
+ } else {
6666
+ errorMessage .append (String .format ("batch start offset: %d, batch state: %s, batch acquisition lock timeout task present: %b\n " ,
6667
+ entry .getKey (), sharePartition .cachedState ().get (entry .getKey ()).batchState ().id (),
6668
+ sharePartition .cachedState ().get (entry .getKey ()).batchAcquisitionLockTimeoutTask () == null ));
6669
+ }
6670
+ }
6671
+ return errorMessage .toString ();
6672
+ }
6651
6673
6652
6674
private FetchPartitionData fetchPartitionData (Records records ) {
6653
6675
return fetchPartitionData (records , 0 );
0 commit comments