Support $changelog auxiliary table for flink connector #356

Open

wuchong opened this issue Feb 8, 2025 · 2 comments · May be fixed by #510
wuchong (Member) commented Feb 8, 2025

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

This issue aims to enhance our Flink connector by introducing support for the $changelog auxiliary table. This feature is essential for consuming and processing change data capture (CDC) events seamlessly within Flink streaming jobs.

Fluss primary key tables support change data capture to track row-level changes for updates and deletes. When streaming-reading a primary key table, the Flink connector emits records with Flink-native RowKind (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) to enable stateful computation on changelogs. On the other hand, there are many use cases where users want to consume the plain change logs without converting them into Flink-native RowKind. This feature is similar to Paimon's $audit_log table and Databricks' table_changes(..) query.

Solution

Implementation

  1. FlinkCatalog supports getTable for the <table_name>$changelog table path, and the returned table should include the additional metadata columns (see the schema below); a hedged sketch of this resolution follows the list.
  2. FlinkRecordEmitter of FlinkSourceReader should use a special FlussRowToFlinkRowConverter that converts the Fluss InternalRow into Flink RowData with the additional metadata columns.
  3. CoordinatorService#createTable should validate that the table being created does not use the system-reserved column names (_change_type, _log_offset, _commit_timestamp); see the validation sketch after the catalog sketch.
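
As a rough illustration of step 1, here is a minimal sketch of how the catalog could resolve the `$changelog` suffix and append the metadata columns. The `CHANGELOG_SUFFIX` constant and the `loadBaseTable` helper are assumptions made for this sketch, not existing Fluss identifiers:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;

/** Sketch of resolving "<table_name>$changelog" inside FlinkCatalog#getTable. */
public class ChangelogTableResolutionSketch {

    // Assumed constant; the real suffix handling may live elsewhere.
    private static final String CHANGELOG_SUFFIX = "$changelog";

    public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException {
        String name = tablePath.getObjectName();
        if (!name.endsWith(CHANGELOG_SUFFIX)) {
            return loadBaseTable(tablePath);
        }
        // Resolve the underlying table, then expose it with the metadata columns.
        String baseName = name.substring(0, name.length() - CHANGELOG_SUFFIX.length());
        CatalogTable baseTable =
                loadBaseTable(new ObjectPath(tablePath.getDatabaseName(), baseName));
        Schema schema =
                Schema.newBuilder()
                        .fromSchema(baseTable.getUnresolvedSchema())
                        .column("_change_type", DataTypes.STRING().notNull())
                        .column("_log_offset", DataTypes.BIGINT().notNull())
                        .column("_commit_timestamp", DataTypes.TIMESTAMP_LTZ(3).notNull())
                        .build();
        return CatalogTable.of(
                schema,
                baseTable.getComment(),
                baseTable.getPartitionKeys(),
                baseTable.getOptions());
    }

    // Stand-in for the catalog's regular lookup path.
    private CatalogTable loadBaseTable(ObjectPath tablePath) throws TableNotExistException {
        throw new UnsupportedOperationException("stand-in for the real lookup");
    }
}
```

For step 3, a similarly hedged sketch of the server-side check; the method name and exception type are assumptions for illustration:

```java
import java.util.Arrays;
import java.util.List;

/** Sketch of reserved-column validation for CoordinatorService#createTable. */
public class ReservedColumnValidationSketch {

    private static final List<String> RESERVED_COLUMNS =
            Arrays.asList("_change_type", "_log_offset", "_commit_timestamp");

    // Rejects tables that declare system-reserved column names.
    public static void validateColumnNames(List<String> columnNames) {
        for (String column : columnNames) {
            if (RESERVED_COLUMNS.contains(column)) {
                throw new IllegalArgumentException(
                        "Column name '" + column + "' is reserved for the $changelog table.");
            }
        }
    }
}
```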

Schema of the $changelog table

| Column Name | Type | Values |
|---|---|---|
| _change_type | STRING | +I, -U, +U, -D |
| _log_offset | BIGINT | the offset of the log record |
| _commit_timestamp | TIMESTAMP_LTZ | the timestamp at which the change was committed |

Reference: https://docs.databricks.com/en/delta/delta-change-data-feed.html#what-is-the-schema-for-the-change-data-feed
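
For reference, a hypothetical usage sketch of reading the auxiliary table from a Flink job. The catalog name `fluss_catalog` and table name `orders` are made up for illustration; note that identifiers containing `$` must be escaped with backticks in Flink SQL:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ChangelogReadExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // "fluss_catalog" and "orders" are hypothetical names.
        tEnv.executeSql("USE CATALOG fluss_catalog");
        // The $changelog table exposes the raw change log together with the
        // _change_type, _log_offset, and _commit_timestamp metadata columns,
        // instead of records carrying Flink-native RowKind.
        tEnv.executeSql("SELECT * FROM `orders$changelog`").print();
    }
}
```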

Anything else?

You can take Paimon's $audit_log implementation as an example: https://github.com/apache/paimon/blob/release-1.0/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java#L69

Willingness to contribute

  • I'm willing to submit a PR!
MehulBatra (Contributor) commented:

I would like to take a stab at it! @wuchong

wuchong (Member, Author) commented Feb 22, 2025

Thank you, @MehulBatra. I've assigned the task to you. Please feel free to reach out if you need any additional guidance after your investigation.
