diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index 8d7773de..ed90fd05 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -502,21 +502,32 @@ mod table_test { writer.flush().await.unwrap(); use arrow::array::Int32Array; - let batches = scanner.poll(Duration::from_secs(10)).await.unwrap(); - let all_ids: Vec = batches - .iter() - .flat_map(|b| { - let batch = b.batch(); - (0..batch.num_rows()).map(move |i| { - batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap() - .value(i) + + fn extract_ids(batches: &[fluss::record::ScanBatch]) -> Vec { + batches + .iter() + .flat_map(|b| { + let batch = b.batch(); + (0..batch.num_rows()).map(move |i| { + batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .value(i) + }) }) - }) - .collect(); + .collect() + } + + // poll may return partial results if not all batches are available yet, + // so we accumulate across multiple polls until we have the expected count. + let mut all_ids = Vec::new(); + let deadline = tokio::time::Instant::now() + Duration::from_secs(10); + while all_ids.len() < 6 && tokio::time::Instant::now() < deadline { + let batches = scanner.poll(Duration::from_secs(5)).await.unwrap(); + all_ids.extend(extract_ids(&batches)); + } // Test 2: Order should be preserved across multiple batches assert_eq!(all_ids, vec![1, 2, 3, 4, 5, 6]); @@ -529,21 +540,12 @@ mod table_test { .unwrap(); writer.flush().await.unwrap(); - let more = scanner.poll(Duration::from_secs(10)).await.unwrap(); - let new_ids: Vec = more - .iter() - .flat_map(|b| { - let batch = b.batch(); - (0..batch.num_rows()).map(move |i| { - batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap() - .value(i) - }) - }) - .collect(); + let mut new_ids = Vec::new(); + let deadline = tokio::time::Instant::now() + Duration::from_secs(10); + while new_ids.len() < 2 && tokio::time::Instant::now() < deadline { + let more = scanner.poll(Duration::from_secs(5)).await.unwrap(); + new_ids.extend(extract_ids(&more)); + } // Test 3: Subsequent polls should not return duplicate data (offset continuation) assert_eq!(new_ids, vec![7, 8]); @@ -552,21 +554,12 @@ mod table_test { // Server returns all records from start of batch, but client truncates to subscription offset let trunc_scanner = table.new_scan().create_record_batch_log_scanner().unwrap(); trunc_scanner.subscribe(0, 3).await.unwrap(); - let trunc_batches = trunc_scanner.poll(Duration::from_secs(10)).await.unwrap(); - let trunc_ids: Vec = trunc_batches - .iter() - .flat_map(|b| { - let batch = b.batch(); - (0..batch.num_rows()).map(move |i| { - batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap() - .value(i) - }) - }) - .collect(); + let mut trunc_ids = Vec::new(); + let deadline = tokio::time::Instant::now() + Duration::from_secs(10); + while trunc_ids.len() < 5 && tokio::time::Instant::now() < deadline { + let trunc_batches = trunc_scanner.poll(Duration::from_secs(5)).await.unwrap(); + trunc_ids.extend(extract_ids(&trunc_batches)); + } // Subscribing from offset 3 should return [4,5,6,7,8], not [1,2,3,4,5,6,7,8] assert_eq!(trunc_ids, vec![4, 5, 6, 7, 8]);