Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -117,6 +119,9 @@ public class StreamExecDeltaJoin extends ExecNodeBase<RowData>
"lookupRightTableJoinSpec";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand why DELTA_JOIN_TRANSFORMATION here uses "delta-join" rather than "deltaJoin".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I'm not entirely sure either. 🧐 I referenced CommonExecLookupJoin and CommonExecSink, and I noticed that the transformations use "-" as separators, while the field names for JSON are in camel case. So, I decided to follow that convention as well.

private static final String FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC = "lookupLeftTableJoinSpec";

private static final String FIELD_NAME_LEFT_UPSERT_KEY = "leftUpsertKey";
private static final String FIELD_NAME_RIGHT_UPSERT_KEY = "rightUpsertKey";

private static final String FIELD_NAME_JOIN_TYPE = "joinType";

public static final String FIELD_NAME_ASYNC_OPTIONS = "asyncOptions";
Expand All @@ -135,6 +140,11 @@ public class StreamExecDeltaJoin extends ExecNodeBase<RowData>
@JsonProperty(FIELD_NAME_LEFT_JOIN_KEYS)
private final int[] leftJoinKeys;

@JsonProperty(FIELD_NAME_LEFT_UPSERT_KEY)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
private final int[] leftUpsertKeys;

// left (streaming) side join right (lookup) side
@JsonProperty(FIELD_NAME_LOOKUP_RIGHT_TABLE_JOIN_SPEC)
private final DeltaJoinSpec lookupRightTableJoinSpec;
Expand All @@ -144,6 +154,11 @@ public class StreamExecDeltaJoin extends ExecNodeBase<RowData>
@JsonProperty(FIELD_NAME_RIGHT_JOIN_KEYS)
private final int[] rightJoinKeys;

@JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEY)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
private final int[] rightUpsertKeys;

// right (streaming) side join left (lookup) side
@JsonProperty(FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC)
private final DeltaJoinSpec lookupLeftTableJoinSpec;
Expand All @@ -153,9 +168,11 @@ public StreamExecDeltaJoin(
FlinkJoinType flinkJoinType,
// delta join args related with the left side
int[] leftJoinKeys,
@Nullable int[] leftUpsertKeys,
DeltaJoinSpec lookupRightTableJoinSpec,
// delta join args related with the right side
int[] rightJoinKeys,
@Nullable int[] rightUpsertKeys,
DeltaJoinSpec lookupLeftTableJoinSpec,
InputProperty leftInputProperty,
InputProperty rightInputProperty,
Expand All @@ -168,8 +185,10 @@ public StreamExecDeltaJoin(
ExecNodeContext.newPersistedConfig(StreamExecDeltaJoin.class, tableConfig),
flinkJoinType,
leftJoinKeys,
leftUpsertKeys,
lookupRightTableJoinSpec,
rightJoinKeys,
rightUpsertKeys,
lookupLeftTableJoinSpec,
Lists.newArrayList(leftInputProperty, rightInputProperty),
outputType,
Expand All @@ -184,9 +203,11 @@ public StreamExecDeltaJoin(
@JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
@JsonProperty(FIELD_NAME_JOIN_TYPE) FlinkJoinType flinkJoinType,
@JsonProperty(FIELD_NAME_LEFT_JOIN_KEYS) int[] leftJoinKeys,
@JsonProperty(FIELD_NAME_LEFT_UPSERT_KEY) @Nullable int[] leftUpsertKeys,
@JsonProperty(FIELD_NAME_LOOKUP_RIGHT_TABLE_JOIN_SPEC)
DeltaJoinSpec lookupRightTableJoinSpec,
@JsonProperty(FIELD_NAME_RIGHT_JOIN_KEYS) int[] rightJoinKeys,
@JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEY) @Nullable int[] rightUpsertKeys,
@JsonProperty(FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC)
DeltaJoinSpec lookupLeftTableJoinSpec,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
Expand All @@ -197,8 +218,10 @@ public StreamExecDeltaJoin(

this.flinkJoinType = flinkJoinType;
this.leftJoinKeys = leftJoinKeys;
this.leftUpsertKeys = leftUpsertKeys;
this.lookupRightTableJoinSpec = lookupRightTableJoinSpec;
this.rightJoinKeys = rightJoinKeys;
this.rightUpsertKeys = rightUpsertKeys;
this.lookupLeftTableJoinSpec = lookupLeftTableJoinSpec;
this.asyncLookupOptions = asyncLookupOptions;
}
Expand Down Expand Up @@ -237,17 +260,15 @@ protected Transformation<RowData> translateToPlanInternal(
RowDataKeySelector leftJoinKeySelector =
KeySelectorUtil.getRowDataSelector(
classLoader, leftJoinKeys, InternalTypeInfo.of(leftStreamType));
// currently, delta join only supports consuming INSERT-ONLY stream
RowDataKeySelector leftUpsertKeySelector =
getUpsertKeySelector(new int[0], leftStreamType, classLoader);
getUpsertKeySelector(leftUpsertKeys, leftStreamType, classLoader);

// right side selector
RowDataKeySelector rightJoinKeySelector =
KeySelectorUtil.getRowDataSelector(
classLoader, rightJoinKeys, InternalTypeInfo.of(rightStreamType));
// currently, delta join only supports consuming INSERT-ONLY stream
RowDataKeySelector rightUpsertKeySelector =
getUpsertKeySelector(new int[0], rightStreamType, classLoader);
getUpsertKeySelector(rightUpsertKeys, rightStreamType, classLoader);

StreamOperatorFactory<RowData> operatorFactory =
createAsyncLookupDeltaJoin(
Expand Down Expand Up @@ -485,9 +506,9 @@ public RexNode visitInputRef(RexInputRef inputRef) {
}

private RowDataKeySelector getUpsertKeySelector(
int[] upsertKey, RowType rowType, ClassLoader classLoader) {
@Nullable int[] upsertKey, RowType rowType, ClassLoader classLoader) {
final int[] rightUpsertKeys;
if (upsertKey.length > 0) {
if (upsertKey != null && upsertKey.length > 0) {
rightUpsertKeys = upsertKey;
} else {
rightUpsertKeys = IntStream.range(0, rowType.getFields().size()).toArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,39 +22,35 @@
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DeltaJoinSpec;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeltaJoin;
import org.apache.flink.table.planner.plan.utils.FunctionCallUtil;
import org.apache.flink.table.planner.plan.utils.JoinTypeUtil;
import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
import org.apache.flink.table.planner.plan.utils.UpsertKeyUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.BiRel;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.hint.Hintable;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;

/** Stream physical RelNode for delta join. */
public class StreamPhysicalDeltaJoin extends BiRel implements StreamPhysicalRel, Hintable {

private final FlinkJoinType joinType;

private final RexNode originalJoinCondition;

private final com.google.common.collect.ImmutableList<RelHint> hints;
public class StreamPhysicalDeltaJoin extends Join implements StreamPhysicalRel {

private final RelDataType rowType;

Expand All @@ -70,15 +66,20 @@ public StreamPhysicalDeltaJoin(
List<RelHint> hints,
RelNode left,
RelNode right,
FlinkJoinType joinType,
JoinRelType joinType,
RexNode originalJoinCondition,
DeltaJoinSpec lookupRightTableJoinSpec,
DeltaJoinSpec lookupLeftTableJoinSpec,
RelDataType rowType) {
super(cluster, traitSet, left, right);
this.hints = com.google.common.collect.ImmutableList.copyOf(hints);
this.joinType = joinType;
this.originalJoinCondition = originalJoinCondition;
super(
cluster,
traitSet,
hints,
left,
right,
originalJoinCondition,
Collections.emptySet(),
joinType);
this.lookupRightTableJoinSpec = lookupRightTableJoinSpec;
this.lookupLeftTableJoinSpec = lookupLeftTableJoinSpec;
this.rowType = rowType;
Expand All @@ -97,15 +98,20 @@ public ExecNode<?> translateToExecNode() {
// scenarios to enhance throughput as much as possible.
true,
AsyncDataStream.OutputMode.ORDERED);
FlinkRelMetadataQuery fmq =
FlinkRelMetadataQuery.reuseOrCreate(this.getCluster().getMetadataQuery());

JoinInfo joinInfo = JoinInfo.of(left, right, originalJoinCondition);
int[] leftUpsertKey = UpsertKeyUtil.smallestKey(fmq.getUpsertKeys(left)).orElse(null);
int[] rightUpsertKey = UpsertKeyUtil.smallestKey(fmq.getUpsertKeys(right)).orElse(null);

return new StreamExecDeltaJoin(
config,
joinType,
JoinTypeUtil.getFlinkJoinType(joinType),
joinInfo.leftKeys.toIntArray(),
leftUpsertKey,
lookupRightTableJoinSpec,
joinInfo.rightKeys.toIntArray(),
rightUpsertKey,
lookupLeftTableJoinSpec,
InputProperty.DEFAULT,
InputProperty.DEFAULT,
Expand All @@ -120,16 +126,21 @@ public boolean requireWatermark() {
}

@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
assert inputs.size() == 2;
public Join copy(
RelTraitSet traitSet,
RexNode conditionExpr,
RelNode left,
RelNode right,
JoinRelType joinType,
boolean semiJoinDone) {
return new StreamPhysicalDeltaJoin(
getCluster(),
traitSet,
hints,
inputs.get(0),
inputs.get(1),
left,
right,
joinType,
originalJoinCondition,
conditionExpr,
lookupRightTableJoinSpec,
lookupLeftTableJoinSpec,
rowType);
Expand All @@ -147,12 +158,13 @@ public com.google.common.collect.ImmutableList<RelHint> getHints() {

@Override
public RelWriter explainTerms(RelWriter pw) {
return super.explainTerms(pw)
.item("joinType", joinType.toString())
return pw.input("left", left)
.input("right", right)
.item("joinType", JoinTypeUtil.getFlinkJoinType(joinType).toString())
.item(
"where",
getExpressionString(
originalJoinCondition,
condition,
JavaScalaConversionUtil.toScala(this.getRowType().getFieldNames())
.toList(),
JavaScalaConversionUtil.toScala(Optional.empty()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeltaJoin;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin;
import org.apache.flink.table.planner.plan.utils.DeltaJoinUtil;
import org.apache.flink.table.planner.plan.utils.JoinTypeUtil;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
Expand All @@ -44,7 +43,9 @@
* <li>The join is INNER join.
* <li>There is at least one join key pair in the join.
* <li>The downstream nodes of this join can accept duplicate changes.
* <li>All join inputs are insert only streams.
* <li>All join inputs produce a changelog of either "I" or "I, UA".
* <li>If this join outputs update records, the non-equiv conditions must be applied on upsert
* keys of this join.
* <li>All upstream nodes of this join are in {@code
* DeltaJoinUtil#ALL_SUPPORTED_DELTA_JOIN_UPSTREAM_NODES}
* <li>The join keys include at least one complete index in each source table of the join input.
Expand Down Expand Up @@ -92,7 +93,7 @@ private StreamPhysicalDeltaJoin convertToDeltaJoin(StreamPhysicalJoin join) {
join.getHints(),
join.getLeft(),
join.getRight(),
JoinTypeUtil.getFlinkJoinType(join.getJoinType()),
join.getJoinType(),
join.getCondition(),
lookupRightTableJoinSpec,
lookupLeftTableJoinSpec,
Expand Down
Loading