9999import java .io .File ;
100100import java .io .IOException ;
101101import java .nio .file .Path ;
102+ import java .time .Duration ;
102103import java .util .ArrayList ;
103104import java .util .Arrays ;
104105import java .util .Collections ;
157158import static org .apache .fluss .testutils .DataTestUtils .genMemoryLogRecordsByObject ;
158159import static org .apache .fluss .testutils .DataTestUtils .getKeyValuePairs ;
159160import static org .apache .fluss .testutils .DataTestUtils .row ;
161+ import static org .apache .fluss .testutils .common .CommonTestUtils .retry ;
160162import static org .assertj .core .api .Assertions .assertThat ;
161163import static org .assertj .core .api .Assertions .assertThatThrownBy ;
162164
@@ -638,6 +640,9 @@ void testPutKvWithOutOfBatchSequence() throws Exception {
638640 future ::complete );
639641 assertThat (future .get ()).containsOnly (new PutKvResultForBucket (tb , 5 ));
640642
643+ // Wait for async flush so HW advances before reading CDC log.
644+ waitForFlush (tb );
645+
641646 // 2. get the cdc-log of this batch (data1).
642647 List <Tuple2 <ChangeType , Object []>> expectedLogForData1 =
643648 Arrays .asList (
@@ -726,6 +731,9 @@ void testPutKvWithOutOfBatchSequence() throws Exception {
726731 future ::complete );
727732 assertThat (future .get ()).containsOnly (new PutKvResultForBucket (tb , 8 ));
728733
734+ // Wait for async flush so HW advances.
735+ waitForFlush (tb );
736+
729737 // 6. get the cdc-log of this batch (data2).
730738 List <Tuple2 <ChangeType , Object []>> expectedLogForData2 =
731739 Arrays .asList (
@@ -789,6 +797,9 @@ void testPutKvWithDeleteNonExistsKey() throws Exception {
789797 future ::complete );
790798 assertThat (future .get ()).containsOnly (new PutKvResultForBucket (tb , 18 ));
791799
800+ // Wait for async flush so HW advances before reading CDC log.
801+ waitForFlush (tb );
802+
792803 // 2. get the cdc-log of these batches.
793804 CompletableFuture <Map <TableBucket , FetchLogResultForBucket >> future1 =
794805 new CompletableFuture <>();
@@ -851,6 +862,9 @@ void testLookup() throws Exception {
851862 future1 ::complete );
852863 assertThat (future1 .get ()).containsOnly (new PutKvResultForBucket (tb , 8 ));
853864
865+ // Wait for async flush to complete so data is visible in RocksDB.
866+ waitForFlush (tb );
867+
854868 // second lookup key in table, key = 1, value = 1, "a1".
855869 Object [] value1 = DATA_1_WITH_KEY_AND_VALUE .get (3 ).f1 ;
856870 byte [] value1Bytes =
@@ -891,6 +905,10 @@ void testLookupWithInsertIfNotExists() throws Exception {
891905
892906 List <byte []> inserted = lookupWithInsert (tb , Arrays .asList (key100 , key200 )).lookupValues ();
893907 assertThat (inserted ).hasSize (2 ).allMatch (Objects ::nonNull );
908+
909+ // Wait for async flush so data is visible via plain lookup.
910+ waitForFlush (tb );
911+
894912 verifyLookup (tb , key100 , inserted .get (0 ));
895913 verifyLookup (tb , key200 , inserted .get (1 ));
896914
@@ -916,6 +934,10 @@ void testLookupWithInsertIfNotExists() throws Exception {
916934 List <byte []> mixed = lookupWithInsert (tb , Arrays .asList (key100 , key300 )).lookupValues ();
917935 assertThat (mixed .get (0 )).isEqualTo (inserted .get (0 )); // existing
918936 assertThat (mixed .get (1 )).isNotNull (); // newly inserted
937+
938+ // Wait for async flush so key300 is visible via plain lookup.
939+ waitForFlush (tb );
940+
919941 verifyLookup (tb , key300 , mixed .get (1 ));
920942
921943 // Verify that only one new log record was created for key300
@@ -966,6 +988,9 @@ void testLookupWithInsertIfNotExistsAutoIncrement() throws Exception {
966988 InternalRow row3 = valueDecoder .decodeValue (mixed .get (1 )).row ;
967989 assertThat (row3 .getLong (2 )).isEqualTo (3L ); // continues sequence
968990
991+ // Wait for async flush so HW advances and log is readable.
992+ waitForFlush (tb );
993+
969994 FetchLogResultForBucket logResult = fetchLog (tb , 0L );
970995 assertThat (logResult .getHighWatermark ()).isEqualTo (3L );
971996 LogRecords records = logResult .records ();
@@ -1064,6 +1089,9 @@ void testConcurrentLookupWithInsertIfNotExistsAutoIncrement() throws Exception {
10641089 // Values should be 1, 2, 3 (in any order due to concurrency)
10651090 assertThat (autoIncrementValues ).containsExactlyInAnyOrder (1L , 2L , 3L );
10661091
1092+ // Wait for async flush so HW advances.
1093+ waitForFlush (tb );
1094+
10671095 // Verify exactly 3 changelog entries were written (one per unique key)
10681096 FetchLogResultForBucket logResult = fetchLog (tb , 0L );
10691097 // Only the first upsert for a given primary key generates changelog records. Subsequent
@@ -1109,6 +1137,10 @@ void testLookupWithInsertIfNotExistsMultiBucket() throws Exception {
11091137 byte [] value0 = inserted .get (tb0 ).lookupValues ().get (0 );
11101138 byte [] value1 = inserted .get (tb1 ).lookupValues ().get (0 );
11111139
1140+ // Wait for async flush so data is visible via plain lookup.
1141+ waitForFlush (tb0 );
1142+ waitForFlush (tb1 );
1143+
11121144 // Verify inserted values via lookup
11131145 verifyLookup (tb0 , key0 , value0 );
11141146 verifyLookup (tb1 , key1 , value1 );
@@ -1187,6 +1219,10 @@ void testPrefixLookup() throws Exception {
11871219 PUT_KV_VERSION ,
11881220 future ::complete );
11891221 assertThat (future .get ()).containsOnly (new PutKvResultForBucket (tb , 4 ));
1222+
1223+ // Wait for async flush so data is visible for prefix lookup.
1224+ waitForFlush (tb );
1225+
11901226 // second prefix lookup in table, prefix key = (1, "a").
11911227 Object [] prefixKey1 = new Object [] {1 , "a" };
11921228 CompactedKeyEncoder keyEncoder = new CompactedKeyEncoder (rowType , new int [] {0 , 1 });
@@ -1269,6 +1305,9 @@ void testLimitScanPrimaryKeyTable() throws Exception {
12691305 future1 ::complete );
12701306 assertThat (future1 .get ()).containsOnly (new PutKvResultForBucket (tb , 8 ));
12711307
1308+ // Wait for async flush so data is visible in RocksDB for limit scan.
1309+ waitForFlush (tb );
1310+
12721311 // second, limit scan from table with limit
12731312 builder .append (DEFAULT_SCHEMA_ID , compactedRow (DATA1_ROW_TYPE , new Object [] {1 , "a1" }));
12741313 future = new CompletableFuture <>();
@@ -2585,4 +2624,18 @@ void testStopReplicaSweepsOrphanDirsForNoneReplica() throws Exception {
25852624 // LogManager should no longer hold the log.
25862625 assertThat (logManager .getLog (tb )).isNotPresent ();
25872626 }
2627+
2628+ /**
2629+ * Wait for the async KV flush to complete and high watermark to advance to at least the current
2630+ * log end offset. This is needed because the KV flush scheduler runs on background threads.
2631+ */
2632+ private void waitForFlush (TableBucket tb ) {
2633+ Replica replica = replicaManager .getReplicaOrException (tb );
2634+ long expectedOffset = replica .getLocalLogEndOffset ();
2635+ retry (
2636+ Duration .ofSeconds (10 ),
2637+ () ->
2638+ assertThat (replica .getLogHighWatermark ())
2639+ .isGreaterThanOrEqualTo (expectedOffset ));
2640+ }
25882641}
0 commit comments