Skip to content

Commit 5543195

Browse files
authored
[FLINK-38511][table] Join that consumes cdc source without delete may be converted to delta join (#27111)
1 parent 4f46693 commit 5543195

File tree

10 files changed

+1461
-532
lines changed

10 files changed

+1461
-532
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@
8080
import org.slf4j.Logger;
8181
import org.slf4j.LoggerFactory;
8282

83+
import javax.annotation.Nullable;
84+
8385
import java.util.Arrays;
8486
import java.util.List;
8587
import java.util.Map;
@@ -117,6 +119,9 @@ public class StreamExecDeltaJoin extends ExecNodeBase<RowData>
117119
"lookupRightTableJoinSpec";
118120
private static final String FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC = "lookupLeftTableJoinSpec";
119121

122+
private static final String FIELD_NAME_LEFT_UPSERT_KEY = "leftUpsertKey";
123+
private static final String FIELD_NAME_RIGHT_UPSERT_KEY = "rightUpsertKey";
124+
120125
private static final String FIELD_NAME_JOIN_TYPE = "joinType";
121126

122127
public static final String FIELD_NAME_ASYNC_OPTIONS = "asyncOptions";
@@ -135,6 +140,11 @@ public class StreamExecDeltaJoin extends ExecNodeBase<RowData>
135140
@JsonProperty(FIELD_NAME_LEFT_JOIN_KEYS)
136141
private final int[] leftJoinKeys;
137142

143+
@JsonProperty(FIELD_NAME_LEFT_UPSERT_KEY)
144+
@JsonInclude(JsonInclude.Include.NON_NULL)
145+
@Nullable
146+
private final int[] leftUpsertKeys;
147+
138148
// left (streaming) side join right (lookup) side
139149
@JsonProperty(FIELD_NAME_LOOKUP_RIGHT_TABLE_JOIN_SPEC)
140150
private final DeltaJoinSpec lookupRightTableJoinSpec;
@@ -144,6 +154,11 @@ public class StreamExecDeltaJoin extends ExecNodeBase<RowData>
144154
@JsonProperty(FIELD_NAME_RIGHT_JOIN_KEYS)
145155
private final int[] rightJoinKeys;
146156

157+
@JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEY)
158+
@JsonInclude(JsonInclude.Include.NON_NULL)
159+
@Nullable
160+
private final int[] rightUpsertKeys;
161+
147162
// right (streaming) side join left (lookup) side
148163
@JsonProperty(FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC)
149164
private final DeltaJoinSpec lookupLeftTableJoinSpec;
@@ -153,9 +168,11 @@ public StreamExecDeltaJoin(
153168
FlinkJoinType flinkJoinType,
154169
// delta join args related with the left side
155170
int[] leftJoinKeys,
171+
@Nullable int[] leftUpsertKeys,
156172
DeltaJoinSpec lookupRightTableJoinSpec,
157173
// delta join args related with the right side
158174
int[] rightJoinKeys,
175+
@Nullable int[] rightUpsertKeys,
159176
DeltaJoinSpec lookupLeftTableJoinSpec,
160177
InputProperty leftInputProperty,
161178
InputProperty rightInputProperty,
@@ -168,8 +185,10 @@ public StreamExecDeltaJoin(
168185
ExecNodeContext.newPersistedConfig(StreamExecDeltaJoin.class, tableConfig),
169186
flinkJoinType,
170187
leftJoinKeys,
188+
leftUpsertKeys,
171189
lookupRightTableJoinSpec,
172190
rightJoinKeys,
191+
rightUpsertKeys,
173192
lookupLeftTableJoinSpec,
174193
Lists.newArrayList(leftInputProperty, rightInputProperty),
175194
outputType,
@@ -184,9 +203,11 @@ public StreamExecDeltaJoin(
184203
@JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
185204
@JsonProperty(FIELD_NAME_JOIN_TYPE) FlinkJoinType flinkJoinType,
186205
@JsonProperty(FIELD_NAME_LEFT_JOIN_KEYS) int[] leftJoinKeys,
206+
@JsonProperty(FIELD_NAME_LEFT_UPSERT_KEY) @Nullable int[] leftUpsertKeys,
187207
@JsonProperty(FIELD_NAME_LOOKUP_RIGHT_TABLE_JOIN_SPEC)
188208
DeltaJoinSpec lookupRightTableJoinSpec,
189209
@JsonProperty(FIELD_NAME_RIGHT_JOIN_KEYS) int[] rightJoinKeys,
210+
@JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEY) @Nullable int[] rightUpsertKeys,
190211
@JsonProperty(FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC)
191212
DeltaJoinSpec lookupLeftTableJoinSpec,
192213
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
@@ -197,8 +218,10 @@ public StreamExecDeltaJoin(
197218

198219
this.flinkJoinType = flinkJoinType;
199220
this.leftJoinKeys = leftJoinKeys;
221+
this.leftUpsertKeys = leftUpsertKeys;
200222
this.lookupRightTableJoinSpec = lookupRightTableJoinSpec;
201223
this.rightJoinKeys = rightJoinKeys;
224+
this.rightUpsertKeys = rightUpsertKeys;
202225
this.lookupLeftTableJoinSpec = lookupLeftTableJoinSpec;
203226
this.asyncLookupOptions = asyncLookupOptions;
204227
}
@@ -237,17 +260,15 @@ protected Transformation<RowData> translateToPlanInternal(
237260
RowDataKeySelector leftJoinKeySelector =
238261
KeySelectorUtil.getRowDataSelector(
239262
classLoader, leftJoinKeys, InternalTypeInfo.of(leftStreamType));
240-
// currently, delta join only supports consuming INSERT-ONLY stream
241263
RowDataKeySelector leftUpsertKeySelector =
242-
getUpsertKeySelector(new int[0], leftStreamType, classLoader);
264+
getUpsertKeySelector(leftUpsertKeys, leftStreamType, classLoader);
243265

244266
// right side selector
245267
RowDataKeySelector rightJoinKeySelector =
246268
KeySelectorUtil.getRowDataSelector(
247269
classLoader, rightJoinKeys, InternalTypeInfo.of(rightStreamType));
248-
// currently, delta join only supports consuming INSERT-ONLY stream
249270
RowDataKeySelector rightUpsertKeySelector =
250-
getUpsertKeySelector(new int[0], rightStreamType, classLoader);
271+
getUpsertKeySelector(rightUpsertKeys, rightStreamType, classLoader);
251272

252273
StreamOperatorFactory<RowData> operatorFactory =
253274
createAsyncLookupDeltaJoin(
@@ -485,9 +506,9 @@ public RexNode visitInputRef(RexInputRef inputRef) {
485506
}
486507

487508
private RowDataKeySelector getUpsertKeySelector(
488-
int[] upsertKey, RowType rowType, ClassLoader classLoader) {
509+
@Nullable int[] upsertKey, RowType rowType, ClassLoader classLoader) {
489510
final int[] rightUpsertKeys;
490-
if (upsertKey.length > 0) {
511+
if (upsertKey != null && upsertKey.length > 0) {
491512
rightUpsertKeys = upsertKey;
492513
} else {
493514
rightUpsertKeys = IntStream.range(0, rowType.getFields().size()).toArray();

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeltaJoin.java

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,39 +22,35 @@
2222
import org.apache.flink.table.api.TableConfig;
2323
import org.apache.flink.table.api.config.ExecutionConfigOptions;
2424
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
25+
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
2526
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
2627
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
2728
import org.apache.flink.table.planner.plan.nodes.exec.spec.DeltaJoinSpec;
2829
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeltaJoin;
2930
import org.apache.flink.table.planner.plan.utils.FunctionCallUtil;
31+
import org.apache.flink.table.planner.plan.utils.JoinTypeUtil;
3032
import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
33+
import org.apache.flink.table.planner.plan.utils.UpsertKeyUtil;
3134
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
32-
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
3335

3436
import org.apache.calcite.plan.RelOptCluster;
3537
import org.apache.calcite.plan.RelTraitSet;
36-
import org.apache.calcite.rel.BiRel;
3738
import org.apache.calcite.rel.RelNode;
3839
import org.apache.calcite.rel.RelWriter;
39-
import org.apache.calcite.rel.core.JoinInfo;
40-
import org.apache.calcite.rel.hint.Hintable;
40+
import org.apache.calcite.rel.core.Join;
41+
import org.apache.calcite.rel.core.JoinRelType;
4142
import org.apache.calcite.rel.hint.RelHint;
4243
import org.apache.calcite.rel.type.RelDataType;
4344
import org.apache.calcite.rex.RexNode;
4445

46+
import java.util.Collections;
4547
import java.util.List;
4648
import java.util.Optional;
4749

4850
import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;
4951

5052
/** Stream physical RelNode for delta join. */
51-
public class StreamPhysicalDeltaJoin extends BiRel implements StreamPhysicalRel, Hintable {
52-
53-
private final FlinkJoinType joinType;
54-
55-
private final RexNode originalJoinCondition;
56-
57-
private final com.google.common.collect.ImmutableList<RelHint> hints;
53+
public class StreamPhysicalDeltaJoin extends Join implements StreamPhysicalRel {
5854

5955
private final RelDataType rowType;
6056

@@ -70,15 +66,20 @@ public StreamPhysicalDeltaJoin(
7066
List<RelHint> hints,
7167
RelNode left,
7268
RelNode right,
73-
FlinkJoinType joinType,
69+
JoinRelType joinType,
7470
RexNode originalJoinCondition,
7571
DeltaJoinSpec lookupRightTableJoinSpec,
7672
DeltaJoinSpec lookupLeftTableJoinSpec,
7773
RelDataType rowType) {
78-
super(cluster, traitSet, left, right);
79-
this.hints = com.google.common.collect.ImmutableList.copyOf(hints);
80-
this.joinType = joinType;
81-
this.originalJoinCondition = originalJoinCondition;
74+
super(
75+
cluster,
76+
traitSet,
77+
hints,
78+
left,
79+
right,
80+
originalJoinCondition,
81+
Collections.emptySet(),
82+
joinType);
8283
this.lookupRightTableJoinSpec = lookupRightTableJoinSpec;
8384
this.lookupLeftTableJoinSpec = lookupLeftTableJoinSpec;
8485
this.rowType = rowType;
@@ -97,15 +98,20 @@ public ExecNode<?> translateToExecNode() {
9798
// scenarios to enhance throughput as much as possible.
9899
true,
99100
AsyncDataStream.OutputMode.ORDERED);
101+
FlinkRelMetadataQuery fmq =
102+
FlinkRelMetadataQuery.reuseOrCreate(this.getCluster().getMetadataQuery());
100103

101-
JoinInfo joinInfo = JoinInfo.of(left, right, originalJoinCondition);
104+
int[] leftUpsertKey = UpsertKeyUtil.smallestKey(fmq.getUpsertKeys(left)).orElse(null);
105+
int[] rightUpsertKey = UpsertKeyUtil.smallestKey(fmq.getUpsertKeys(right)).orElse(null);
102106

103107
return new StreamExecDeltaJoin(
104108
config,
105-
joinType,
109+
JoinTypeUtil.getFlinkJoinType(joinType),
106110
joinInfo.leftKeys.toIntArray(),
111+
leftUpsertKey,
107112
lookupRightTableJoinSpec,
108113
joinInfo.rightKeys.toIntArray(),
114+
rightUpsertKey,
109115
lookupLeftTableJoinSpec,
110116
InputProperty.DEFAULT,
111117
InputProperty.DEFAULT,
@@ -120,16 +126,21 @@ public boolean requireWatermark() {
120126
}
121127

122128
@Override
123-
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
124-
assert inputs.size() == 2;
129+
public Join copy(
130+
RelTraitSet traitSet,
131+
RexNode conditionExpr,
132+
RelNode left,
133+
RelNode right,
134+
JoinRelType joinType,
135+
boolean semiJoinDone) {
125136
return new StreamPhysicalDeltaJoin(
126137
getCluster(),
127138
traitSet,
128139
hints,
129-
inputs.get(0),
130-
inputs.get(1),
140+
left,
141+
right,
131142
joinType,
132-
originalJoinCondition,
143+
conditionExpr,
133144
lookupRightTableJoinSpec,
134145
lookupLeftTableJoinSpec,
135146
rowType);
@@ -147,12 +158,13 @@ public com.google.common.collect.ImmutableList<RelHint> getHints() {
147158

148159
@Override
149160
public RelWriter explainTerms(RelWriter pw) {
150-
return super.explainTerms(pw)
151-
.item("joinType", joinType.toString())
161+
return pw.input("left", left)
162+
.input("right", right)
163+
.item("joinType", JoinTypeUtil.getFlinkJoinType(joinType).toString())
152164
.item(
153165
"where",
154166
getExpressionString(
155-
originalJoinCondition,
167+
condition,
156168
JavaScalaConversionUtil.toScala(this.getRowType().getFieldNames())
157169
.toList(),
158170
JavaScalaConversionUtil.toScala(Optional.empty()),

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DeltaJoinRewriteRule.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeltaJoin;
2626
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin;
2727
import org.apache.flink.table.planner.plan.utils.DeltaJoinUtil;
28-
import org.apache.flink.table.planner.plan.utils.JoinTypeUtil;
2928

3029
import org.apache.calcite.plan.RelOptRuleCall;
3130
import org.apache.calcite.plan.RelRule;
@@ -44,7 +43,9 @@
4443
* <li>The join is INNER join.
4544
* <li>There is at least one join key pair in the join.
4645
* <li>The downstream nodes of this join can accept duplicate changes.
47-
* <li>All join inputs are insert only streams.
46+
* <li>All join inputs are with changelog "I" or "I, UA".
47+
* <li>If this join outputs update records, the non-equiv conditions must be applied on upsert
48+
* keys of this join.
4849
* <li>All upstream nodes of this join are in {@code
4950
* DeltaJoinUtil#ALL_SUPPORTED_DELTA_JOIN_UPSTREAM_NODES}
5051
* <li>The join keys include at least one complete index in each source table of the join input.
@@ -92,7 +93,7 @@ private StreamPhysicalDeltaJoin convertToDeltaJoin(StreamPhysicalJoin join) {
9293
join.getHints(),
9394
join.getLeft(),
9495
join.getRight(),
95-
JoinTypeUtil.getFlinkJoinType(join.getJoinType()),
96+
join.getJoinType(),
9697
join.getCondition(),
9798
lookupRightTableJoinSpec,
9899
lookupLeftTableJoinSpec,

0 commit comments

Comments
 (0)