support produce partitioned table#228
Conversation
There was a problem hiding this comment.
Pull request overview
Adds client-side support for producing/appending records to partitioned tables (issue #202) by routing writes to the correct physical partition path and updating the write pipeline to accept generic InternalRow inputs.
Changes:
- Route append writes for partitioned tables by deriving
PhysicalTablePathfrom row partition values (and factoring shared logic intopartition_getter). - Generalize log write/Arrow building from
GenericRowto&dyn InternalRow(FieldGetter-based extraction) and update writer APIs accordingly. - Update integration tests, examples, and Python/C++ bindings to the new append writer API and add coverage for partitioned-table append.
Reviewed changes
Copilot reviewed 23 out of 23 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| crates/fluss/tests/test_fluss.rs | Switches integration test module inclusion to log_table. |
| crates/fluss/tests/integration/utils.rs | Adds shared create_partitions helper for partitioned-table tests. |
| crates/fluss/tests/integration/table_remote_scan.rs | Updates append writer creation and row appends to new API shape. |
| crates/fluss/tests/integration/log_table.rs | Updates writer creation, row appends, and adds a partitioned-table append integration test. |
| crates/fluss/tests/integration/kv_table.rs | Reuses new create_partitions helper instead of inline partition creation logic. |
| crates/fluss/src/row/mod.rs | Updates InternalRow trait bounds (now Send + Sync). |
| crates/fluss/src/row/compacted/compacted_row.rs | Reorders/moves InternalRow method impls and removes an allow attribute. |
| crates/fluss/src/row/column.rs | Reorders get_char/get_string within InternalRow impl. |
| crates/fluss/src/record/arrow.rs | Changes Arrow row appending to work with &dyn InternalRow and uses FieldGetter for extraction. |
| crates/fluss/src/client/write/sender.rs | Adjusts log message formatting for partition-metadata update handling. |
| crates/fluss/src/client/write/mod.rs | Changes log write record variant to hold &dyn InternalRow and updates append record creation APIs. |
| crates/fluss/src/client/write/batch.rs | Updates tests/callers to pass rows by reference into WriteRecord::for_append. |
| crates/fluss/src/client/write/accumulator.rs | Updates tests/callers to pass rows by reference into WriteRecord::for_append. |
| crates/fluss/src/client/table/writer.rs | Updates writer traits and introduces (currently partial) AppendWriterImpl type definition. |
| crates/fluss/src/client/table/upsert.rs | Centralizes partition physical path computation via partition_getter::get_physical_path. |
| crates/fluss/src/client/table/scanner.rs | Updates tests/callers to pass rows by reference into WriteRecord::for_append. |
| crates/fluss/src/client/table/partition_getter.rs | Adds get_physical_path helper for partitioned vs non-partitioned path routing. |
| crates/fluss/src/client/table/mod.rs | Adjusts exports to expose AppendWriterImpl and re-export writer traits. |
| crates/fluss/src/client/table/log_fetch_buffer.rs | Updates tests/callers to pass rows by reference into WriteRecord::for_append. |
| crates/fluss/src/client/table/append.rs | Implements partition-aware append writer that derives PhysicalTablePath per row/batch. |
| crates/examples/src/example_table.rs | Updates example to new writer creation and append(&row) usage; imports required traits. |
| bindings/python/src/table.rs | Updates Python binding to use AppendWriterImpl and the new writer creation API. |
| bindings/cpp/src/lib.rs | Updates C++ binding to use AppendWriterImpl and handle create_writer() returning Result. |
Comments suppressed due to low confidence (1)
crates/fluss/tests/integration/log_table.rs:1010
- This test uses a fixed
sleep(Duration::from_secs(2))to wait for partitions. Fixed sleeps tend to make CI slower and can still be flaky on slow environments. Consider pollingadmin.list_partition_infos(...)(or similar) until the expected partitions appear (with a timeout), instead of sleeping a fixed amount.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
51937d6 to
5e22f82
Compare
5e22f82 to
9cbdb7e
Compare
|
@leekeiabstraction @fresh-borzoni Could you please help review when you got some time? |
leekeiabstraction
left a comment
There was a problem hiding this comment.
Thank you for the PR! Left some comments PTAL
| /// Callers must ensure all rows in the batch belong to the same partition. | ||
| pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()> { | ||
| let physical_table_path = if self.partition_getter.is_some() && batch.num_rows() > 0 { | ||
| let first_row = ColumnarRow::new(Arc::new(batch.clone())); |
There was a problem hiding this comment.
Cloning the batch seems relatively expensive for getting physical table path, are there other alternatives?
There was a problem hiding this comment.
actually it's not expensive for cloning the batch. It's just swallow copy
| let field_type = self.table_schema.field(idx).data_type(); | ||
| let builder = self.arrow_column_builders.get_mut(idx).unwrap(); | ||
| value.append_to(builder.as_mut(), field_type)?; | ||
| datum.append_to(builder.as_mut(), field_type)?; |
There was a problem hiding this comment.
Is the as_mut() necessary? I can see that the line above already does get_mut(idx).unwrap()
| for (idx, getter) in self.field_getters.iter().enumerate() { | ||
| let datum = getter.get_field(row); | ||
| let field_type = self.table_schema.field(idx).data_type(); | ||
| let builder = self.arrow_column_builders.get_mut(idx).unwrap(); |
There was a problem hiding this comment.
Since we're already in a function that returns Result<bool>, should we handle the case where arrow_column_builders do not have element at idx instead of unwrapping and panic?
Purpose
Linked issue: close #202
Brief change log
Tests
API and Format
Documentation