Skip to content

Commit cc55f42

Browse files
committed
address comment
1 parent 3d28928 commit cc55f42

File tree

2 files changed

+4
-1
lines changed

2 files changed

+4
-1
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ public class StreamExecDeltaJoin extends ExecNodeBase<RowData>
142142

143143
@JsonProperty(FIELD_NAME_LEFT_UPSERT_KEY)
144144
@JsonInclude(JsonInclude.Include.NON_NULL)
145+
@Nullable
145146
private final int[] leftUpsertKeys;
146147

147148
// left (streaming) side join right (lookup) side
@@ -155,6 +156,7 @@ public class StreamExecDeltaJoin extends ExecNodeBase<RowData>
155156

156157
@JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEY)
157158
@JsonInclude(JsonInclude.Include.NON_NULL)
159+
@Nullable
158160
private final int[] rightUpsertKeys;
159161

160162
// right (streaming) side join left (lookup) side

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,8 @@ private static boolean areJoinConditionsSupported(StreamPhysicalJoin join) {
252252
return false;
253253
}
254254

255-
// if this join output cdc records, the non-equiv condition must be applied on upsert key
255+
// if this join outputs cdc records and has non-equiv condition, the reference columns in
256+
// the non-equiv condition must come from the same set of upsert keys
256257
ChangelogMode changelogMode = getChangelogMode(join);
257258
if (changelogMode.containsOnly(RowKind.INSERT)) {
258259
return true;

0 commit comments

Comments
 (0)