Parquet Incremental Sync #768
base: main
… into the parquet table
…ds, interfacing with ConversionSource
I can do the first review for this @the-other-tim-brown @vinishjail97
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetDataManager.java
Outdated
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetDataManager.java
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetDataManager.java
Outdated
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetFileConfig.java
Outdated
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetDataManager.java
Outdated
try (ParquetWriter<Group> writer =
    new ParquetWriter<Group>(
        outputFile,
        new GroupWriteSupport(),
        parquetFileConfig.getCodec(),
        (int) parquetFileConfig.getRowGroupSize(),
        pageSize,
        pageSize, // dictionaryPageSize
        true, // enableDictionary
        false, // enableValidation
        ParquetWriter.DEFAULT_WRITER_VERSION,
        conf)) {
  Group currentGroup = null;
  while ((currentGroup = (Group) reader.read()) != null) {
    writer.write(currentGroup);
Why are we rewriting Parquet files through the writer like this? I think there's a misunderstanding about the Parquet incremental sync feature here.
Parquet incremental sync requirements:
- You have a target table where the Parquet files [p1/f1.parquet, p1/f2.parquet, p2/f1.parquet] have already been synced to, for example, Hudi, Iceberg, and Delta.
- Changes are then made in the source: a new file is added to partition p1 and p2's file is deleted. The incremental sync should now sync only these new changes.
@sapienza88 It's better to align on the approach first before we push PRs. Can you add the approach for Parquet incremental sync to the PR description, or to a Google Doc if possible?
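To make the requirement above concrete, here is a minimal sketch of the diff an incremental sync would need to compute: given the file listing from the last sync and the current listing, it returns the files to add and the files to remove, without rewriting any data. `FileSetDiff` and its members are hypothetical names for illustration, not XTable APIs, and `p1/f3.parquet` in the usage note is a made-up name for the newly added file.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper (not part of XTable): diffs two listings of data-file paths. */
final class FileSetDiff {
  /** Files present now but not at the last sync. */
  final Set<String> added;
  /** Files present at the last sync but missing now. */
  final Set<String> removed;

  private FileSetDiff(Set<String> added, Set<String> removed) {
    this.added = added;
    this.removed = removed;
  }

  /** Computes both set differences; the input sets are not modified. */
  static FileSetDiff between(Set<String> lastSynced, Set<String> current) {
    Set<String> added = new TreeSet<>(current);
    added.removeAll(lastSynced);
    Set<String> removed = new TreeSet<>(lastSynced);
    removed.removeAll(current);
    return new FileSetDiff(added, removed);
  }
}
```

With `lastSynced = [p1/f1.parquet, p1/f2.parquet, p2/f1.parquet]` and `current = [p1/f1.parquet, p1/f2.parquet, p1/f3.parquet]`, `added` contains only `p1/f3.parquet` and `removed` contains only `p2/f1.parquet`.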
@vinishjail97 We simply want to append the file where it belongs in the table, under the right partition. To do that, we need to find the partition path within the table where the file should be placed, which we do through path construction. As far as I know, the only way to write the file is with ParquetWriter. After that, the source can filter the files based on their modification dates.
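The modification-date filtering mentioned above could look roughly like the sketch below. It is an illustration under assumptions, not the PR's code: it walks a local directory with `java.nio.file` rather than the Hadoop `FileSystem` API, and `ModifiedSinceFilter` and `parquetFilesModifiedAfter` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper (not part of XTable): finds Parquet files changed since a sync point. */
final class ModifiedSinceFilter {

  /** Returns the .parquet files under tableRoot whose last-modified time is after lastSync. */
  static List<Path> parquetFilesModifiedAfter(Path tableRoot, Instant lastSync)
      throws IOException {
    // Files.walk holds an open directory stream, so close it with try-with-resources.
    try (Stream<Path> paths = Files.walk(tableRoot)) {
      return paths
          .filter(Files::isRegularFile)
          .filter(p -> p.getFileName().toString().endsWith(".parquet"))
          .filter(p -> modifiedAfter(p, lastSync))
          .sorted()
          .collect(Collectors.toList());
    }
  }

  private static boolean modifiedAfter(Path file, Instant lastSync) {
    try {
      return Files.getLastModifiedTime(file).toInstant().isAfter(lastSync);
    } catch (IOException e) {
      // Rethrow unchecked so the predicate can be used inside the stream pipeline.
      throw new UncheckedIOException(e);
    }
  }
}
```

Note that a filter like this only sees files that still exist, so it can surface additions but not deletions such as the removed file in p2; those would need a comparison against the previously synced listing.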
@vinishjail97 I added some comments on the functions so that the approach is clearer. All of the suggestions above were also taken into account in my last commit.
Important Read
What is the purpose of the pull request
(For example: This pull request implements the sync for delta format.)
Brief change log
(for example:)
Verify this pull request
(Please pick either of the following options)
This pull request is a trivial rework / code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)