[FLINK-38504][table] Add restore tests for delta join #27117
base: master
...ble/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
data.addAll(expectedAfterRestore);
}

Map<List<Object>, Row> deduplicatedMap = new HashMap<>();
The logic in this section, which uses deduplicatedFieldIndices to modify expectedBeforeRestore and expectedAfterRestore, could instead be implemented by calling a function in getExpectedBeforeRestoreAsStrings and getExpectedAfterRestoreAsStrings. It's up to you, of course, but I personally find the latter easier to understand.
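For readers following the thread, here is a minimal sketch of what deduplicating rows by a set of field indices can look like. It is not the PR's implementation: the class and method names are hypothetical, a row is modeled as a plain `List<Object>` instead of Flink's `Row` so the example runs without Flink, and "the last row for a key wins" is an assumption. A `LinkedHashMap` replaces the `HashMap` from the diff only to make the output order deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: a row is a List<Object> so the sketch runs without Flink.
class DedupByFieldsSketch {

    // Keeps one row per key, where the key is the list of values at keyIndices.
    // Assumption: when two rows share a key, the later row replaces the earlier one.
    static List<List<Object>> dedupByFields(List<List<Object>> rows, int[] keyIndices) {
        Map<List<Object>, List<Object>> deduplicated = new LinkedHashMap<>();
        for (List<Object> row : rows) {
            List<Object> key = new ArrayList<>(keyIndices.length);
            for (int idx : keyIndices) {
                key.add(row.get(idx));
            }
            // put() on an existing key keeps its original position and swaps the value.
            deduplicated.put(key, row);
        }
        return new ArrayList<>(deduplicated.values());
    }

    public static void main(String[] args) {
        List<List<Object>> rows = List.of(
                Arrays.<Object>asList(1, "a", 10),
                Arrays.<Object>asList(2, "b", 20),
                Arrays.<Object>asList(1, "a", 30));
        // Deduplicate on the first two fields.
        System.out.println(dedupByFields(rows, new int[] {0, 1}));
        // prints [[1, a, 30], [2, b, 20]]
    }
}
```

Because the key is built from typed field values, this only works while the rows still carry their schema, that is, before they are converted to strings.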
Currently, expectedBeforeRestoreStrings and expectedAfterRestoreStrings hold rows that have already been converted to strings and have lost their schema, so they cannot be deduplicated by column. (One way to handle this might be to remove the leading and trailing parentheses and split each string on commas.)
However, the tests do not need this for the string case right now, so I haven't implemented it yet.
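The string-splitting workaround mentioned above could be sketched roughly as follows. This is an illustration only, not code from the PR: the class and method names are hypothetical, and the row-string format `+I[1, a, 10]` with square brackets is an assumption (adjust the delimiters if the actual strings use parentheses).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of deduplicating already-stringified rows by column.
class DedupRowStringsSketch {

    // Splits a row string such as "+I[1, a, 10]" (assumed format) into its field strings.
    static List<String> splitFields(String rowString) {
        int open = rowString.indexOf('[');
        int close = rowString.lastIndexOf(']');
        if (open < 0 || close < open) {
            throw new IllegalArgumentException("Unexpected row string: " + rowString);
        }
        String body = rowString.substring(open + 1, close);
        return Arrays.asList(body.split(",\\s*"));
    }

    // Keeps the last row string seen for each key built from the fields at keyIndices.
    static List<String> dedupByFields(List<String> rowStrings, int[] keyIndices) {
        Map<List<String>, String> deduplicated = new LinkedHashMap<>();
        for (String rowString : rowStrings) {
            List<String> fields = splitFields(rowString);
            List<String> key = new ArrayList<>(keyIndices.length);
            for (int idx : keyIndices) {
                key.add(fields.get(idx));
            }
            deduplicated.put(key, rowString);
        }
        return new ArrayList<>(deduplicated.values());
    }

    public static void main(String[] args) {
        List<String> rows = List.of("+I[1, a, 10]", "+I[2, b, 20]", "+U[1, a, 30]");
        System.out.println(dedupByFields(rows, new int[] {0, 1}));
    }
}
```

The sketch also shows why this approach is fragile: any field value that itself contains a comma shifts the field positions, which is one reason to deduplicate on the typed rows instead.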
...c/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeltaJoinTestPrograms.java
Outdated, resolved (2 threads)
// The difference between `registeredDataForFullStage` and `registeredData` is that
// `registeredData` is used for data delivered from the source to downstream, while the rows in
// `registeredDataForFullStage` will not be sent to downstream and are only used for lookup.
private static final Map<String, Collection<Row>> registeredDataForFullStage = new HashMap<>();
I'm curious why it's named registeredDataForFullStage instead of ForCurrentStage or ForCurrentTable.
I have renamed it to registeredConsumedData.
Unlike registeredRowData, registeredConsumedData will not be re-consumed by the source and sent to downstream operators.
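To make the distinction concrete, here is a small sketch of a registry that separates rows to be emitted from rows that are only visible to lookups. It is not the PR's code: the class, the method names, and the `List<Object>` row model are hypothetical, and the choice that a lookup sees both collections is an assumption not stated in the thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch; rows are modeled as List<Object> instead of Flink's Row.
class SourceDataRegistrySketch {
    // Rows the source emits to downstream operators.
    private final Map<String, List<List<Object>>> registeredData = new HashMap<>();
    // Rows treated as already consumed: never emitted again, used for lookup only.
    private final Map<String, List<List<Object>>> registeredConsumedData = new HashMap<>();

    void registerData(String tableId, List<List<Object>> rows) {
        registeredData.computeIfAbsent(tableId, k -> new ArrayList<>()).addAll(rows);
    }

    void registerConsumedData(String tableId, List<List<Object>> rows) {
        registeredConsumedData.computeIfAbsent(tableId, k -> new ArrayList<>()).addAll(rows);
    }

    // Only the to-be-emitted rows are returned; consumed rows are deliberately skipped.
    List<List<Object>> rowsToEmit(String tableId) {
        return new ArrayList<>(registeredData.getOrDefault(tableId, List.of()));
    }

    // Assumption: a lookup sees consumed rows as well as rows that will be emitted.
    List<List<Object>> lookup(String tableId, int keyIndex, Object keyValue) {
        List<List<Object>> matches = new ArrayList<>();
        for (Map<String, List<List<Object>>> registry
                : List.of(registeredConsumedData, registeredData)) {
            for (List<Object> row : registry.getOrDefault(tableId, List.of())) {
                if (Objects.equals(row.get(keyIndex), keyValue)) {
                    matches.add(row);
                }
            }
        }
        return matches;
    }
}
```

In a restore scenario this split lets the table on the lookup side contain rows from before the restore without the source replaying them downstream.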
Thanks for advancing the feature. Let me leave some comments!
LGTM, thanks.
What is the purpose of the change
Add restore tests for delta join.
Brief change log
Verifying this change
New and existing tests verify this PR.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation