[test] Add Rust integration coverage for RecordBatchLogReader bounded reads.#559
Conversation
|
@fresh-borzoni @leekeiabstraction Could you please help review this PR when you have time? |
There was a problem hiding this comment.
@slfan1989 Ty for the PR, great addition, LGTM overall, couple of minor comments 👍
Let's move this to a separate file for testing RecordBatchLogReader feature and then add some additional scenarios:
*until_offsets_with_empty_range (stop == start -> no batches)
- until_offsets_past_end_of_log (graceful finish past actual end)
- until_offsets_multi_bucket (multiple buckets in the HashMap)
also extract_ids_from_batches overlaps with extract_ids in test_project test, mb we wish to factor this out to utils and reuse.
|
@charlesdong1991 Can you also take a look, please? |
There was a problem hiding this comment.
Pull request overview
Adds Rust integration tests to cover RecordBatchLogReader bounded-read semantics end-to-end (closing #558), complementing existing unit/Python coverage.
Changes:
- Added an integration test verifying
RecordBatchLogReader::new_until_offsetsstops before an explicit upper offset. - Added an integration test verifying
RecordBatchLogReader::new_until_latestreturns all records present at reader creation for partitioned log tables. - Updated an internal comment to point to the new Rust integration coverage location.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
crates/fluss/tests/integration/log_table.rs |
Adds two new Rust integration tests and a shared helper for validating bounded reader behavior. |
crates/fluss/src/client/table/reader.rs |
Updates an in-code comment to reference the new Rust integration coverage. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| fn extract_ids_from_batches(batches: &[ScanBatch]) -> Vec<i32> { | ||
| batches | ||
| .iter() | ||
| .flat_map(|b| { | ||
| let batch = b.batch(); | ||
| (0..batch.num_rows()).map(move |i| { | ||
| batch | ||
| .column(0) | ||
| .as_any() | ||
| .downcast_ref::<Int32Array>() | ||
| .expect("id column should be Int32") | ||
| .value(i) | ||
| }) | ||
| }) | ||
| .collect() | ||
| } |
@fresh-borzoni Thank you for the review and suggestions! Addressed most of the comments:
I left The current Supporting graceful completion would require snapshot-style semantics by querying latest offsets at reader creation, which needs I think that should be handled separately from this test-only PR. Thanks again! |
charlesdong1991
left a comment
There was a problem hiding this comment.
thanks for the PR, overall very nice, just minor comments!
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn until_latest_reads_partitioned_table() { |
There was a problem hiding this comment.
query_latest_offsets should be applied on both partitioned and non-partitioned tables, can we check if non-partition one has integration test? and if not, add it?
There was a problem hiding this comment.
Thanks for pointing this out!
Added until_latest_reads_non_partitioned_table, so new_until_latest now has integration coverage for both non-partitioned and partitioned tables.
| .expect("Failed to list partition infos") | ||
| { | ||
| scanner | ||
| .subscribe_partition(partition.get_partition_id(), 0, EARLIEST_OFFSET) |
There was a problem hiding this comment.
i assume we use 0 here because default bucket number if 1, can you add an inline comment here for future reference if we want to have multiple buckets?
There was a problem hiding this comment.
Added an inline comment explaining that bucket 0 is used because the table uses the default single-bucket layout, and that future multi-bucket coverage should subscribe all buckets per partition.
| */ | ||
|
|
||
| #[cfg(test)] | ||
| mod reader_test { |
There was a problem hiding this comment.
until_offsets_past_end_of_log (graceful finish past actual end)
i think what @fresh-borzoni meant might be the scenario where stop_at past current end, and future records arrive and cross it, which i think doesn't need API change, basically to cover wait and then resume behaviour of the loop.
API change probably is needed if we want immediate graceful completion against arbtrary offsets
There was a problem hiding this comment.
yes, I should have been more specific. Sorry for the confusion
There was a problem hiding this comment.
Got it, that makes sense. I interpreted past_end_of_log as immediate completion when the requested stop offset is beyond the current log end, which would require querying/clamping latest offsets.
I'll add a test for the wait-and-resume scenario instead: create a reader with a stop offset past the current end, start collecting, append more records later so the log crosses the stop offset, and verify the reader resumes and finishes.
|
@slfan1989 Ty for the changes, LGTM |
@fresh-borzoni Thanks for the review! I have rebased the branch on the latest |
fresh-borzoni
left a comment
There was a problem hiding this comment.
@slfan1989 LGTM 👍
@charlesdong1991 Do you have additional comments?
charlesdong1991
left a comment
There was a problem hiding this comment.
thanks for the changes, very nice! 👍
|
@slfan1989 Thank you for the contribution, merged 👍 |
Purpose
Linked issue: close #558
Add Rust-level integration coverage for
RecordBatchLogReaderbounded reads.RecordBatchLogReaderalready has unit coverage for batch filtering and Python binding coverage forto_arrow_batch_readerguard/drop behavior, but it lacks Rust end-to-end integration coverage for:new_until_offsetsstopping semanticsnew_until_latestwith partitioned log tablesThis PR adds those missing integration tests.
Brief change log
Added a Rust integration test for
RecordBatchLogReader::new_until_offsets.Added a Rust integration test for
RecordBatchLogReader::new_until_lateston partitioned tables.USandEUpartitions.Updated the comment in
RecordBatchLogReaderto point to the new Rust integration coverage.Tests
Verified locally.
API and Format
No API or storage format changes.
Documentation
No user-facing documentation changes.