Skip to content

Conversation

@xuyangzhong
Copy link
Contributor

@xuyangzhong xuyangzhong commented Oct 14, 2025

What is the purpose of the change

Join with the following pattern can be converted to delta join:

  1. All two cdc sources output changelog mode without delete
  2. Pks of these two cdc sources all contain join key, and then upstream nodes of join do not require sending update before

Brief change log

  • Support to optimize to delta join when join can match the optimization pattern.
  • Add UT, IT and Harness test for this optimization.

Verifying this change

Tests are added to verify this pr

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented?

@flinkbot
Copy link
Collaborator

flinkbot commented Oct 14, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@xuyangzhong xuyangzhong force-pushed the delta_join_support_update2 branch from 939312f to 1092702 Compare October 22, 2025 08:35
@xuyangzhong xuyangzhong marked this pull request as ready for review October 22, 2025 08:35
@xuyangzhong xuyangzhong force-pushed the delta_join_support_update2 branch from 1092702 to c65f4d2 Compare October 22, 2025 08:36
@xuyangzhong xuyangzhong force-pushed the delta_join_support_update2 branch from c65f4d2 to 3d28928 Compare October 22, 2025 08:37
val expected = List("+I[1.0, 1, 2022-02-02T02:02:02, 1, 1.0, 2022-02-02T02:02:22]")

// could not optimize into delta join because join keys do not contain indexes strictly
assertThatThrownBy(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed these expected failed tests because these exceptions have been tested in DeltaJoinTest. And I hope the IT tests can purely test the dataset, not for plan.

private static final String FIELD_NAME_RIGHT_JOIN_KEYS = "rightJoinKeys";

private static final String FIELD_NAME_LOOKUP_RIGHT_TABLE_JOIN_SPEC =
"lookupRightTableJoinSpec";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand why the DELTA_JOIN_TRANSFORMATION here doesn't use deltaJoin but to use delta-join.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I'm not entirely sure either. 🧐 I referenced CommonExecLookupJoin and CommonExecSink, and I noticed that the transformations use "-" as separators, while the field names for JSON are in camel case. So, I decided to follow that convention as well.

return false;
}

// if this join output cdc records, the non-equiv condition must be applied on upsert key
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this join output cdc records and has non-equiv condition, upsert key must contain non-equiv condition

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides, I don't quite understand why there is such a restriction here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You raised a very important issue. You can refer FlinkChangelogModeInferenceProgram.scala to understand the problem better. The essence of the issue is whether we can allow the upstream to omit sending Update Before(UB) when a filter is present. We can break down the filtering condition into two scenarios: 1. Filtering only on upsert key columns, and 2. Filtering on non-upsert keys. The distinction between these two cases is that in scenario 1, we can enable further optimization for the upstream and avoid sending UB.

Currently, in the FlinkChangelogModeInferenceProgram, we only check the filter on the Calc node. However, there are similar filters present on the join node (which are divided into join keys and non-equivalent join conditions), and this aspect has been overlooked. To prevent expanding the scope of this pull request, I only ensure that a delta join will not be optimized here (since Delta Join cannot consume UB messages) without altering the existing logic for streaming join.

As for why we don't need to address the join key, that has already been handled here: FlinkChangelogModeInferenceProgram.scala.

Copy link
Contributor Author

@xuyangzhong xuyangzhong Oct 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, the comment here should emphasize that the columns in the non-equivalent condition must all come from the same set of upsert keys. Therefore, I will modify it to:
If this join outputs cdc records and has non-equiv condition, the reference columns in the non-equiv condition must come from the same set of upsert keys.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I did not reuse the logic of isNonUpsertKeyCondition from FlinkChangelogModeInferenceProgram is that it only considers a single upsert key (as seen in this line). In this case, any set of upsert keys can be applicable.

}

@Test
def testFilterOnNonUpsertKeysAfterJoinWithCdcSourceWithoutDelete(): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we set deltaJoin to force and use assertThatThrownBy to display the error reason in the test for failing to convert to deltaJoin

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, we are unable to provide detailed error logs to explain why optimization to Delta Join is not supported. For more information, please refer to FLINK-37954. As for now, I think it's acceptable to display the original Join in the plan instead of the DeltaJoin. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I think it's indeed worth further improvement

@Au-Miner
Copy link
Contributor

Thanks for advancing the feature. Let me leave some comments

@xuyangzhong xuyangzhong force-pushed the delta_join_support_update2 branch from 6cf7fdf to cc55f42 Compare October 23, 2025 11:02
@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Oct 23, 2025
Copy link
Contributor

@Au-Miner Au-Miner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing the comment. It LGTM.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants