[lake/hudi] Introduce Hudi lake writer support for tiering tables #3507
Merged
luoyuxia merged 3 commits on Jun 22, 2026
Pull request overview
This PR introduces the writer-side implementation for tiering Fluss tables into Hudi within the fluss-lake-hudi module. It adds Hudi tiering writer scaffolding, Fluss→Flink/Hudi row conversions, buffered batch writes via HoodieFlinkWriteClient, and DFS-backed checkpoint metadata to coordinate Hudi instant initialization across writer subtasks (with committer support intentionally left for follow-up work).
Changes:
- Add Hudi tiering writer components (`HudiLakeTieringFactory`, `HudiLakeWriter`, `RecordWriter`, buffering/conversion utilities).
- Add DFS-backed checkpoint metadata utilities (`CkpMetadata*`) to coordinate instant initialization across splits/rounds.
- Add initial serialization format for writer results (`HudiWriteResult*`) plus targeted unit tests, and add `flink-table-runtime` as a provided dependency.
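The instant coordination summarized above (one split initializes the instant, the others wait for the marker of the same tiering round) can be pictured with a minimal sketch. It is not the PR's `CkpMetadata` code: it assumes a local directory standing in for the DFS path, and the class `InstantCoordinationSketch`, the method `initOrAwait`, and the `<timestamp>.INFLIGHT` file name are hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch: marker files as a tiny "message bus" between writer splits. */
final class InstantCoordinationSketch {
    private InstantCoordinationSketch() {}

    /** Marker path for one tiering round; the naming scheme is an assumption. */
    static Path markerFor(Path metaDir, long roundTimestamp) {
        return metaDir.resolve(roundTimestamp + ".INFLIGHT");
    }

    /**
     * Split 0 creates the marker for the round; every other split polls until the
     * marker exists or the timeout elapses.
     */
    static Path initOrAwait(Path metaDir, int splitIndex, long roundTimestamp, long timeoutMillis) {
        Path marker = markerFor(metaDir, roundTimestamp);
        try {
            if (splitIndex == 0) {
                Files.createDirectories(metaDir);
                try {
                    Files.createFile(marker);
                } catch (FileAlreadyExistsException alreadyInitialized) {
                    // A retried first split finds its own marker; nothing to do.
                }
                return marker;
            }
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (!Files.exists(marker)) {
                if (System.currentTimeMillis() >= deadline) {
                    throw new IllegalStateException("Timed out waiting for round " + roundTimestamp);
                }
                Thread.sleep(10L);
            }
            return marker;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for round " + roundTimestamp, e);
        }
    }
}
```

Keying the marker on the round timestamp is what keeps a waiting split from picking up a marker left behind by an earlier round; how the actual implementation encodes and cleans up these files is defined by the PR's `CkpMetadata` classes, not by this sketch.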
Reviewed changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/writer/FlussRecordAsHudiRowTest.java | Adds unit tests for Fluss LogRecord→Hudi/Flink RowData wrapping and system columns/kind mapping. |
| fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiWriteResultSerializerTest.java | Adds unit tests for HudiWriteResultSerializer versioning and empty result round-trip. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/meta/CkpMetadataProvider.java | Provides table-scoped access/caching for checkpoint metadata coordination objects. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/meta/CkpMetadataFactory.java | Constructs DFS-backed checkpoint metadata using Hudi/Hadoop filesystem utilities. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/meta/CkpMetadata.java | Implements DFS-backed “message bus” files to track instant lifecycle and retention. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/meta/CkpMessage.java | Represents checkpoint metadata messages (instant + state) derived from filenames. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java | Adds Fluss ChangeType→Flink RowKind conversion for Hudi row data. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/writer/RecordWriteBuffer.java | Buffers records into Hudi buckets and flushes batches via HoodieFlinkWriteClient. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/writer/HudiRecordWriter.java | Writes Fluss LogRecords by converting to HoodieFlinkInternalRow and routing/buffering. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/writer/HudiRecordConverter.java | Converts buffered RowData into HoodieRecords using key/partition generation. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/writer/FlussRowAsHudiRow.java | Wraps Fluss InternalRow as Flink RowData for Hudi ingestion. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/writer/FlussRecordAsHudiRow.java | Extends row wrapper to include Fluss system columns and change-type→row-kind mapping. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/writer/FlussMapAsHudiMap.java | Wraps Fluss InternalMap as Flink MapData. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/writer/FlussArrayAsHudiArray.java | Wraps Fluss InternalArray as Flink ArrayData. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/RecordWriter.java | Base writer implementing bucket/file-id routing and completing buffered writes. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiWriteTableInfo.java | Resolves Hudi table metadata/config and creates the Hudi write client used by writers. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiWriteResultSerializer.java | Introduces versioned serializer for Hudi writer results. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiWriteResult.java | Defines writer result container to be consumed by a future committer. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeWriter.java | Implements LakeWriter for Hudi, including instant initialization + record writing lifecycle. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeTieringFactory.java | Implements LakeTieringFactory wiring writer + serializers; committer intentionally unsupported. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCatalogProvider.java | Serializable provider to create/open Hudi catalogs inside tiering subtasks. |
| fluss-lake/fluss-lake-hudi/pom.xml | Adds flink-table-runtime as a provided dependency for the writer buffer implementation. |
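As a rough illustration of the `ChangeType`→`RowKind` conversion listed for `HudiConversions.java`, the sketch below maps change types to row kinds with a plain switch. It is self-contained on purpose: the nested `ChangeType` and `RowKind` enums are stand-ins for the Fluss and Flink types, their constants are assumed for illustration, and treating `APPEND_ONLY` as `INSERT` is an assumption rather than something the PR text states.

```java
/** Hypothetical sketch of a change-type to row-kind mapping. */
final class ChangeTypeMappingSketch {
    private ChangeTypeMappingSketch() {}

    /** Stand-in for the Fluss change type; constants are assumed for illustration. */
    enum ChangeType { APPEND_ONLY, INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Stand-in for the Flink row kind so the sketch compiles without Flink. */
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Maps each change type to a row kind; APPEND_ONLY -> INSERT is an assumption. */
    static RowKind toRowKind(ChangeType changeType) {
        switch (changeType) {
            case APPEND_ONLY:
            case INSERT:
                return RowKind.INSERT;
            case UPDATE_BEFORE:
                return RowKind.UPDATE_BEFORE;
            case UPDATE_AFTER:
                return RowKind.UPDATE_AFTER;
            case DELETE:
                return RowKind.DELETE;
            default:
                throw new IllegalArgumentException("Unsupported change type: " + changeType);
        }
    }
}
```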
luoyuxia reviewed on Jun 22, 2026
luoyuxia approved these changes on Jun 22, 2026
Purpose
Linked issue: #3280
This PR introduces the writer-side implementation for tiering Fluss tables to Hudi.
It builds on the previous lake/hudi PRs that added Hudi catalog/source support and the tiering writer init metadata, such as the split index and tiering round timestamp. This PR only migrates the Hudi writer capability. The Hudi committer and full `LakeStorage#createLakeTieringFactory` enablement will be completed in follow-up PRs.
Brief change log
- Add Hudi lake tiering writer scaffolding:
  - `HudiLakeTieringFactory`
  - `HudiLakeWriter`
  - `HudiWriteResult`
  - `HudiWriteResultSerializer`
  - `HudiWriteTableInfo`
- Add writer-side record conversion and buffering:
  - Convert Fluss `LogRecord`/`InternalRow` into Hudi-compatible Flink `RowData`.
  - Map Fluss `ChangeType` to Flink `RowKind`.
  - Buffer records and flush them in batches via `HoodieFlinkWriteClient`.
- Add Hudi instant coordination metadata:
  - Use `WriterInitContext#splitIndex()` to let only the first split initialize the Hudi instant.
  - Use `WriterInitContext#tieringRoundTimestamp()` to wait for the correct round instant.
- Keep the feature intentionally writer-only for now:
  - Committer creation throws `UnsupportedOperationException`.
  - `HudiLakeStorage#createLakeTieringFactory()` is not enabled yet to avoid exposing incomplete tiering-to-Hudi service behavior.
- Add `flink-table-runtime` as a provided dependency for the Hudi writer buffer.
Tests
- `mvn -q -pl fluss-lake/fluss-lake-hudi -am -DskipITs -Dcheckstyle.skip=true -DfailIfNoTests=false -Dtest=FlussRecordAsHudiRowTest,HudiWriteResultSerializerTest test`
- `git diff --check`
- `mvn clean verify` was not run locally.
API and Format
This PR does not introduce a new user-facing public API.
It adds internal Hudi tiering writer classes and a versioned `HudiWriteResultSerializer`. It also introduces internal checkpoint metadata files under Hudi auxiliary metadata paths for coordinating writer-side instant initialization. The full committable format and commit protocol will be finalized in the follow-up committer PR.
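To make "versioned serializer" concrete, here is a minimal sketch of the general pattern, assuming the version number travels alongside the payload and is checked on read. It is not the PR's `HudiWriteResultSerializer`: the class `VersionedResultSerializerSketch`, the `List<String>` payload of file ids, and the method shapes are hypothetical, and the real result layout is explicitly left open until the committer PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a versioned serializer for a writer result. */
final class VersionedResultSerializerSketch {
    static final int CURRENT_VERSION = 1;

    /** Version the caller stores next to the serialized bytes. */
    int getVersion() {
        return CURRENT_VERSION;
    }

    /** Writes the element count followed by each file id (hypothetical payload). */
    byte[] serialize(List<String> fileIds) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(fileIds.size());
            for (String fileId : fileIds) {
                out.writeUTF(fileId);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Rejects unknown versions before touching the payload. */
    List<String> deserialize(int version, byte[] data) {
        if (version != CURRENT_VERSION) {
            throw new IllegalArgumentException("Unsupported serializer version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            List<String> fileIds = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                fileIds.add(in.readUTF());
            }
            return fileIds;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Checking the version up front lets a later format change add a new branch in `deserialize` while still reading results written by older writers, which is the usual motivation for versioning a format that is not yet final.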
Documentation
No user-facing documentation is added in this PR because Hudi tiering is not fully enabled yet. Documentation should be added when the committer and end-to-end Hudi tiering service are enabled.