diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java index 962cc5505414d..07ae35a6e82cc 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java @@ -80,6 +80,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.List; import java.util.Map; @@ -117,6 +119,9 @@ public class StreamExecDeltaJoin extends ExecNodeBase "lookupRightTableJoinSpec"; private static final String FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC = "lookupLeftTableJoinSpec"; + private static final String FIELD_NAME_LEFT_UPSERT_KEY = "leftUpsertKey"; + private static final String FIELD_NAME_RIGHT_UPSERT_KEY = "rightUpsertKey"; + private static final String FIELD_NAME_JOIN_TYPE = "joinType"; public static final String FIELD_NAME_ASYNC_OPTIONS = "asyncOptions"; @@ -135,6 +140,11 @@ public class StreamExecDeltaJoin extends ExecNodeBase @JsonProperty(FIELD_NAME_LEFT_JOIN_KEYS) private final int[] leftJoinKeys; + @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEY) + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + private final int[] leftUpsertKeys; + // left (streaming) side join right (lookup) side @JsonProperty(FIELD_NAME_LOOKUP_RIGHT_TABLE_JOIN_SPEC) private final DeltaJoinSpec lookupRightTableJoinSpec; @@ -144,6 +154,11 @@ public class StreamExecDeltaJoin extends ExecNodeBase @JsonProperty(FIELD_NAME_RIGHT_JOIN_KEYS) private final int[] rightJoinKeys; + @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEY) + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + private final int[] rightUpsertKeys; + // right 
(streaming) side join left (lookup) side @JsonProperty(FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC) private final DeltaJoinSpec lookupLeftTableJoinSpec; @@ -153,9 +168,11 @@ public StreamExecDeltaJoin( FlinkJoinType flinkJoinType, // delta join args related with the left side int[] leftJoinKeys, + @Nullable int[] leftUpsertKeys, DeltaJoinSpec lookupRightTableJoinSpec, // delta join args related with the right side int[] rightJoinKeys, + @Nullable int[] rightUpsertKeys, DeltaJoinSpec lookupLeftTableJoinSpec, InputProperty leftInputProperty, InputProperty rightInputProperty, @@ -168,8 +185,10 @@ public StreamExecDeltaJoin( ExecNodeContext.newPersistedConfig(StreamExecDeltaJoin.class, tableConfig), flinkJoinType, leftJoinKeys, + leftUpsertKeys, lookupRightTableJoinSpec, rightJoinKeys, + rightUpsertKeys, lookupLeftTableJoinSpec, Lists.newArrayList(leftInputProperty, rightInputProperty), outputType, @@ -184,9 +203,11 @@ public StreamExecDeltaJoin( @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig, @JsonProperty(FIELD_NAME_JOIN_TYPE) FlinkJoinType flinkJoinType, @JsonProperty(FIELD_NAME_LEFT_JOIN_KEYS) int[] leftJoinKeys, + @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEY) @Nullable int[] leftUpsertKeys, @JsonProperty(FIELD_NAME_LOOKUP_RIGHT_TABLE_JOIN_SPEC) DeltaJoinSpec lookupRightTableJoinSpec, @JsonProperty(FIELD_NAME_RIGHT_JOIN_KEYS) int[] rightJoinKeys, + @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEY) @Nullable int[] rightUpsertKeys, @JsonProperty(FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC) DeltaJoinSpec lookupLeftTableJoinSpec, @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List inputProperties, @@ -197,8 +218,10 @@ public StreamExecDeltaJoin( this.flinkJoinType = flinkJoinType; this.leftJoinKeys = leftJoinKeys; + this.leftUpsertKeys = leftUpsertKeys; this.lookupRightTableJoinSpec = lookupRightTableJoinSpec; this.rightJoinKeys = rightJoinKeys; + this.rightUpsertKeys = rightUpsertKeys; this.lookupLeftTableJoinSpec = lookupLeftTableJoinSpec; this.asyncLookupOptions = 
asyncLookupOptions; } @@ -237,17 +260,15 @@ protected Transformation translateToPlanInternal( RowDataKeySelector leftJoinKeySelector = KeySelectorUtil.getRowDataSelector( classLoader, leftJoinKeys, InternalTypeInfo.of(leftStreamType)); - // currently, delta join only supports consuming INSERT-ONLY stream RowDataKeySelector leftUpsertKeySelector = - getUpsertKeySelector(new int[0], leftStreamType, classLoader); + getUpsertKeySelector(leftUpsertKeys, leftStreamType, classLoader); // right side selector RowDataKeySelector rightJoinKeySelector = KeySelectorUtil.getRowDataSelector( classLoader, rightJoinKeys, InternalTypeInfo.of(rightStreamType)); - // currently, delta join only supports consuming INSERT-ONLY stream RowDataKeySelector rightUpsertKeySelector = - getUpsertKeySelector(new int[0], rightStreamType, classLoader); + getUpsertKeySelector(rightUpsertKeys, rightStreamType, classLoader); StreamOperatorFactory operatorFactory = createAsyncLookupDeltaJoin( @@ -485,9 +506,9 @@ public RexNode visitInputRef(RexInputRef inputRef) { } private RowDataKeySelector getUpsertKeySelector( - int[] upsertKey, RowType rowType, ClassLoader classLoader) { + @Nullable int[] upsertKey, RowType rowType, ClassLoader classLoader) { final int[] rightUpsertKeys; - if (upsertKey.length > 0) { + if (upsertKey != null && upsertKey.length > 0) { rightUpsertKeys = upsertKey; } else { rightUpsertKeys = IntStream.range(0, rowType.getFields().size()).toArray(); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeltaJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeltaJoin.java index 7c0f11cdc616d..217ad55a5705c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeltaJoin.java +++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeltaJoin.java @@ -22,39 +22,35 @@ import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; import org.apache.flink.table.planner.plan.nodes.exec.spec.DeltaJoinSpec; import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeltaJoin; import org.apache.flink.table.planner.plan.utils.FunctionCallUtil; +import org.apache.flink.table.planner.plan.utils.JoinTypeUtil; import org.apache.flink.table.planner.plan.utils.RelExplainUtil; +import org.apache.flink.table.planner.plan.utils.UpsertKeyUtil; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; -import org.apache.flink.table.runtime.operators.join.FlinkJoinType; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.BiRel; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelWriter; -import org.apache.calcite.rel.core.JoinInfo; -import org.apache.calcite.rel.hint.Hintable; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.hint.RelHint; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexNode; +import java.util.Collections; import java.util.List; import java.util.Optional; import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig; /** Stream physical RelNode for delta join. 
*/ -public class StreamPhysicalDeltaJoin extends BiRel implements StreamPhysicalRel, Hintable { - - private final FlinkJoinType joinType; - - private final RexNode originalJoinCondition; - - private final com.google.common.collect.ImmutableList hints; +public class StreamPhysicalDeltaJoin extends Join implements StreamPhysicalRel { private final RelDataType rowType; @@ -70,15 +66,20 @@ public StreamPhysicalDeltaJoin( List hints, RelNode left, RelNode right, - FlinkJoinType joinType, + JoinRelType joinType, RexNode originalJoinCondition, DeltaJoinSpec lookupRightTableJoinSpec, DeltaJoinSpec lookupLeftTableJoinSpec, RelDataType rowType) { - super(cluster, traitSet, left, right); - this.hints = com.google.common.collect.ImmutableList.copyOf(hints); - this.joinType = joinType; - this.originalJoinCondition = originalJoinCondition; + super( + cluster, + traitSet, + hints, + left, + right, + originalJoinCondition, + Collections.emptySet(), + joinType); this.lookupRightTableJoinSpec = lookupRightTableJoinSpec; this.lookupLeftTableJoinSpec = lookupLeftTableJoinSpec; this.rowType = rowType; @@ -97,15 +98,20 @@ public ExecNode translateToExecNode() { // scenarios to enhance throughput as much as possible. 
true, AsyncDataStream.OutputMode.ORDERED); + FlinkRelMetadataQuery fmq = + FlinkRelMetadataQuery.reuseOrCreate(this.getCluster().getMetadataQuery()); - JoinInfo joinInfo = JoinInfo.of(left, right, originalJoinCondition); + int[] leftUpsertKey = UpsertKeyUtil.smallestKey(fmq.getUpsertKeys(left)).orElse(null); + int[] rightUpsertKey = UpsertKeyUtil.smallestKey(fmq.getUpsertKeys(right)).orElse(null); return new StreamExecDeltaJoin( config, - joinType, + JoinTypeUtil.getFlinkJoinType(joinType), joinInfo.leftKeys.toIntArray(), + leftUpsertKey, lookupRightTableJoinSpec, joinInfo.rightKeys.toIntArray(), + rightUpsertKey, lookupLeftTableJoinSpec, InputProperty.DEFAULT, InputProperty.DEFAULT, @@ -120,16 +126,21 @@ public boolean requireWatermark() { } @Override - public RelNode copy(RelTraitSet traitSet, List inputs) { - assert inputs.size() == 2; + public Join copy( + RelTraitSet traitSet, + RexNode conditionExpr, + RelNode left, + RelNode right, + JoinRelType joinType, + boolean semiJoinDone) { return new StreamPhysicalDeltaJoin( getCluster(), traitSet, hints, - inputs.get(0), - inputs.get(1), + left, + right, joinType, - originalJoinCondition, + conditionExpr, lookupRightTableJoinSpec, lookupLeftTableJoinSpec, rowType); @@ -147,12 +158,13 @@ public com.google.common.collect.ImmutableList getHints() { @Override public RelWriter explainTerms(RelWriter pw) { - return super.explainTerms(pw) - .item("joinType", joinType.toString()) + return pw.input("left", left) + .input("right", right) + .item("joinType", JoinTypeUtil.getFlinkJoinType(joinType).toString()) .item( "where", getExpressionString( - originalJoinCondition, + condition, JavaScalaConversionUtil.toScala(this.getRowType().getFieldNames()) .toList(), JavaScalaConversionUtil.toScala(Optional.empty()), diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DeltaJoinRewriteRule.java 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DeltaJoinRewriteRule.java index e4c90fea9a245..14529f69ab84a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DeltaJoinRewriteRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DeltaJoinRewriteRule.java @@ -25,7 +25,6 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeltaJoin; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin; import org.apache.flink.table.planner.plan.utils.DeltaJoinUtil; -import org.apache.flink.table.planner.plan.utils.JoinTypeUtil; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelRule; @@ -44,7 +43,9 @@ *
  • The join is an INNER join. *
  • There is at least one join key pair in the join. *
  • The downstream nodes of this join can accept duplicate changes. - *
  • All join inputs are insert only streams. + *
  • All join inputs have changelog mode "I" or "I, UA". + *
  • If this join outputs update records, the non-equi conditions must only reference columns + * contained in one upsert key of this join. *
  • All upstream nodes of this join are in {@code * DeltaJoinUtil#ALL_SUPPORTED_DELTA_JOIN_UPSTREAM_NODES} *
  • The join keys include at least one complete index in each source table of the join input. @@ -92,7 +93,7 @@ private StreamPhysicalDeltaJoin convertToDeltaJoin(StreamPhysicalJoin join) { join.getHints(), join.getLeft(), join.getRight(), - JoinTypeUtil.getFlinkJoinType(join.getJoinType()), + join.getJoinType(), join.getCondition(), lookupRightTableJoinSpec, lookupLeftTableJoinSpec, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java index 43bd7e52463c8..ab228502030a6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java @@ -25,9 +25,12 @@ import org.apache.flink.table.connector.source.LookupTableSource; import org.apache.flink.table.functions.AsyncTableFunction; import org.apache.flink.table.functions.UserDefinedFunction; +import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery; import org.apache.flink.table.planner.plan.nodes.exec.spec.DeltaJoinSpec; +import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec; import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeltaJoin; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntermediateTableScan; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin; @@ -53,10 +56,12 @@ import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexUtil; 
+import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.mapping.IntPair; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -75,7 +80,10 @@ public class DeltaJoinUtil { *

    More physical nodes can be added to support more patterns for delta join. */ private static final Set> ALL_SUPPORTED_DELTA_JOIN_UPSTREAM_NODES = - Sets.newHashSet(StreamPhysicalTableSourceScan.class, StreamPhysicalExchange.class); + Sets.newHashSet( + StreamPhysicalTableSourceScan.class, + StreamPhysicalExchange.class, + StreamPhysicalDropUpdateBefore.class); private DeltaJoinUtil() {} @@ -95,8 +103,8 @@ public static boolean canConvertToDeltaJoin(StreamPhysicalJoin join) { return false; } - // currently, only join with append-only inputs is supported - if (!areAllInputsInsertOnly(join)) { + // currently, only join that consumes +I and +U is supported + if (!areAllInputsInsertOrUpdateAfter(join)) { return false; } @@ -240,7 +248,29 @@ private static List> getAllIndexesColumnsOfTable( private static boolean areJoinConditionsSupported(StreamPhysicalJoin join) { JoinInfo joinInfo = join.analyzeCondition(); // there must be one pair of join key - return !joinInfo.pairs().isEmpty(); + if (joinInfo.pairs().isEmpty()) { + return false; + } + + // if this join outputs cdc records and has non-equiv condition, the reference columns in + // the non-equiv condition must come from the same set of upsert keys + ChangelogMode changelogMode = getChangelogMode(join); + if (changelogMode.containsOnly(RowKind.INSERT)) { + return true; + } + JoinSpec joinSpec = join.joinSpec(); + Optional nonEquiCond = joinSpec.getNonEquiCondition(); + if (nonEquiCond.isEmpty()) { + return true; + } + ImmutableBitSet fieldRefIndices = + ImmutableBitSet.of( + RexNodeExtractor.extractRefInputFields( + Collections.singletonList(nonEquiCond.get()))); + FlinkRelMetadataQuery fmq = + FlinkRelMetadataQuery.reuseOrCreate(join.getCluster().getMetadataQuery()); + Set upsertKeys = fmq.getUpsertKeys(join); + return upsertKeys.stream().anyMatch(uk -> uk.contains(fieldRefIndices)); } private static boolean areAllJoinTableScansSupported(StreamPhysicalJoin join) { @@ -300,8 +330,9 @@ private static TableScan 
getTableScan(RelNode node) { node = unwrapNode(node, true); // support to get table across more nodes if we support more nodes in // `ALL_SUPPORTED_DELTA_JOIN_UPSTREAM_NODES` - if (node instanceof StreamPhysicalExchange) { - return getTableScan(((StreamPhysicalExchange) node).getInput()); + if (node instanceof StreamPhysicalExchange + || node instanceof StreamPhysicalDropUpdateBefore) { + return getTableScan(node.getInput(0)); } Preconditions.checkState(node instanceof TableScan); @@ -339,25 +370,29 @@ private static boolean canJoinOutputDuplicateChanges(StreamPhysicalJoin join) { return DuplicateChanges.ALLOW.equals(duplicateChanges); } - private static boolean areAllInputsInsertOnly(StreamPhysicalJoin join) { + private static boolean areAllInputsInsertOrUpdateAfter(StreamPhysicalJoin join) { for (RelNode input : join.getInputs()) { - if (!isInsertOnly(unwrapNode(input, false))) { + if (!onlyProduceInsertOrUpdateAfter(unwrapNode(input, false))) { return false; } } return true; } - private static boolean isInsertOnly(StreamPhysicalRel node) { - ChangelogMode changelogMode = - JavaScalaConversionUtil.toJava(ChangelogPlanUtils.getChangelogMode(node)) - .orElseThrow( - () -> - new IllegalStateException( - String.format( - "Unable to derive changelog mode from node %s. This is a bug.", - node))); - return changelogMode.containsOnly(RowKind.INSERT); + private static boolean onlyProduceInsertOrUpdateAfter(StreamPhysicalRel node) { + ChangelogMode changelogMode = getChangelogMode(node); + Set allKinds = changelogMode.getContainedKinds(); + return !allKinds.contains(RowKind.UPDATE_BEFORE) && !allKinds.contains(RowKind.DELETE); + } + + private static ChangelogMode getChangelogMode(StreamPhysicalRel node) { + return JavaScalaConversionUtil.toJava(ChangelogPlanUtils.getChangelogMode(node)) + .orElseThrow( + () -> + new IllegalStateException( + String.format( + "Unable to derive changelog mode from node %s. 
This is a bug.", + node))); } private static StreamPhysicalRel unwrapNode(RelNode node, boolean transposeToChildBlock) { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java index d31ef6531eaf6..96de677864fd2 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java @@ -1962,8 +1962,9 @@ private List deduplicateDataByPk(List data) { Row pk = extractPk(row); RowKind originalRowKind = row.getKind(); if (originalRowKind == RowKind.INSERT || originalRowKind == RowKind.UPDATE_AFTER) { - row.setKind(RowKind.INSERT); - pkMap.put(pk, row); + Row copiedRow = copyRow(row); + copiedRow.setKind(RowKind.INSERT); + pkMap.put(pk, copiedRow); } else { pkMap.remove(pk); } @@ -1971,6 +1972,14 @@ private List deduplicateDataByPk(List data) { return new ArrayList<>(pkMap.values()); } + private Row copyRow(Row oldRow) { + Row newRow = new Row(oldRow.getKind(), oldRow.getArity()); + for (int i = 0; i < newRow.getArity(); i++) { + newRow.setField(i, oldRow.getField(i)); + } + return newRow; + } + private Row extractPk(Row row) { Object[] pk = new Object[primaryKeyIndices.length]; for (int i = 0; i < primaryKeyIndices.length; i++) { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml index 7cd943cb20a83..ca486728f6db0 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml +++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml @@ -37,30 +37,6 @@ Sink(table=[default_catalog.default_database.append_snk], fields=[a0, a1, a2, a3 : +- TableSourceScan(table=[[default_catalog, default_database, src1]], fields=[a0, a1, a2, a3]) +- Exchange(distribution=[hash[b1, b2]]) +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1]) -]]> - - - - - - - - - - - @@ -298,6 +274,84 @@ Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, b : +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[]]], fields=[a0, a1, a2, a3]) +- Exchange(distribution=[hash[b1, b2]]) +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1]) +]]> + + + + + b0]]> + + + ($3, $4)]) + +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner]) + :- LogicalTableScan(table=[[default_catalog, default_database, no_delete_src1]]) + +- LogicalTableScan(table=[[default_catalog, default_database, no_delete_src2]]) +]]> + + + (a3, b0))], select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA]) + :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UA]) + : +- DropUpdateBefore(changelogMode=[I,UA]) + : +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src1]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA]) + +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA]) + +- DropUpdateBefore(changelogMode=[I,UA]) + +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA]) +]]> + + + + + b1]]> + + + ($0, $6)]) + +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner]) + :- LogicalTableScan(table=[[default_catalog, default_database, no_delete_src1]]) + +- LogicalTableScan(table=[[default_catalog, default_database, 
no_delete_src2]]) +]]> + + + (a0, b1))], select=[a0, a1, a2, a3, b0, b2, b1], changelogMode=[I,UA]) + :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UA]) + : +- DropUpdateBefore(changelogMode=[I,UA]) + : +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src1]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA]) + +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA]) + +- DropUpdateBefore(changelogMode=[I,UA]) + +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA]) +]]> + + + + + + + + + + + @@ -567,6 +621,86 @@ Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, b Sink(table=[default_catalog.default_database.snk2], fields=[a0, a1, a2, a3, b0, b2, b1]) +- Reused(reference_id=[1]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -618,6 +752,54 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[2],[4 : +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2]) +- Exchange(distribution=[hash[b1, b2]]) +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1]) +]]> + + + + + + + + + + + + + + + + + + + + + + @@ -675,27 +857,27 @@ Sink(table=[default_catalog.default_database.tmp_snk], fields=[a0, a1, a2, a3, b ]]> - + - + diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala index 50347c42915e8..138d7fa691bff 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala @@ -26,6 +26,7 @@ import 
org.apache.flink.table.planner.JMap import org.apache.flink.table.planner.utils.{TableTestBase, TestingTableEnvironment} import org.assertj.core.api.Assertions.assertThatThrownBy +import org.assertj.core.util.Maps import org.junit.jupiter.api.{BeforeEach, Test} import java.util.{Collections, HashMap => JHashMap} @@ -76,6 +77,33 @@ class DeltaJoinTest extends TableTestBase { .build() ) + addTable( + "no_delete_src1", + Schema + .newBuilder() + .column("a0", DataTypes.INT.notNull) + .column("a1", DataTypes.DOUBLE.notNull) + .column("a2", DataTypes.STRING.notNull) + .column("a3", DataTypes.INT) + .primaryKey("a0", "a1", "a2") + .index("a1", "a2") + .build(), + Maps.newHashMap("changelog-mode", "I,UA,UB") + ) + + addTable( + "no_delete_src2", + Schema + .newBuilder() + .column("b0", DataTypes.INT) + .column("b2", DataTypes.STRING.notNull) + .column("b1", DataTypes.DOUBLE.notNull) + .primaryKey("b1", "b2") + .index("b2") + .build(), + Maps.newHashMap("changelog-mode", "I,UA,UB") + ) + addTable( "snk", Schema @@ -90,6 +118,21 @@ class DeltaJoinTest extends TableTestBase { .primaryKey("l0", "r0") .build() ) + + addTable( + "snk_for_cdc_src", + Schema + .newBuilder() + .column("l0", DataTypes.INT.notNull) + .column("l1", DataTypes.DOUBLE.notNull) + .column("l2", DataTypes.STRING.notNull) + .column("l3", DataTypes.INT) + .column("r0", DataTypes.INT) + .column("r2", DataTypes.STRING.notNull) + .column("r1", DataTypes.DOUBLE.notNull) + .primaryKey("l0", "l1", "l2", "r1", "r2") + .build() + ) } @Test @@ -188,6 +231,30 @@ class DeltaJoinTest extends TableTestBase { "where a3 > b0") } + @Test + def testFilterOnNonUpsertKeysAfterJoinWithCdcSourceWithoutDelete(): Unit = { + util.verifyRelPlanInsert( + "insert into snk_for_cdc_src select * from no_delete_src1 " + + "join no_delete_src2 " + + "on a1 = b1 " + + "and a2 = b2 " + + "where a3 > b0", + ExplainDetail.CHANGELOG_MODE + ) + } + + @Test + def testFilterOnUpsertKeysAfterJoinWithCdcSourceWithoutDelete(): Unit = { + 
util.verifyRelPlanInsert( + "insert into snk_for_cdc_src select * from no_delete_src1 " + + "join no_delete_src2 " + + "on a1 = b1 " + + "and a2 = b2 " + + "where a0 > b1", + ExplainDetail.CHANGELOG_MODE + ) + } + @Test def testMultiRootsWithReusingJoinView(): Unit = { util.tableConfig.set( @@ -372,7 +439,7 @@ class DeltaJoinTest extends TableTestBase { } @Test - def testCdcSource(): Unit = { + def testSourceWithAllRowKinds(): Unit = { util.tableConfig.set( ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE, UpsertMaterialize.NONE) @@ -387,6 +454,99 @@ class DeltaJoinTest extends TableTestBase { "and src1.a2 = cdc_src.b2") } + @Test + def testPKContainsJoinKeyAndTwoSourcesNoDelete(): Unit = { + util.verifyRelPlanInsert( + "insert into snk_for_cdc_src " + + "select * from no_delete_src1 join no_delete_src2 " + + "on a1 = b1 " + + "and a2 = b2", + ExplainDetail.CHANGELOG_MODE) + } + + @Test + def testPKNotContainJoinKeyAndTwoSourcesNoDelete(): Unit = { + util.tableConfig.set( + ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE, + UpsertMaterialize.NONE) + + util.verifyRelPlanInsert( + "insert into snk_for_cdc_src " + + "select * from no_delete_src1 join no_delete_src2 " + + "on a0 = b0 " + + "and a1 = b1 " + + "and a2 = b2", + ExplainDetail.CHANGELOG_MODE + ) + } + + @Test + def testPKContainJoinKeyAndOnlyOneSourceNoDelete(): Unit = { + util.tableEnv.executeSql(""" + |create table all_changelog_src with ( + | 'changelog-mode' = 'I,UA,UB,D' + |) like no_delete_src1 + |""".stripMargin) + + util.verifyRelPlanInsert( + "insert into snk_for_cdc_src " + + "select * from all_changelog_src join no_delete_src2 " + + "on a1 = b1 " + + "and a2 = b2", + ExplainDetail.CHANGELOG_MODE + ) + } + + @Test + def testPKContainsJoinKeyAndSourceNoUBAndD(): Unit = { + // FLINK-38489 Currently, ChangelogNormalize will always generate changelog mode with D, + // and Join with D cannot be optimized into Delta Join + util.tableEnv.executeSql(""" + |create table 
no_delete_and_update_before_src1 with ( + | 'changelog-mode' = 'I,UA' + |) like no_delete_src1 + |""".stripMargin) + + util.tableEnv.executeSql(""" + |create table no_delete_and_update_before_src2 with ( + | 'changelog-mode' = 'I,UA' + |) like no_delete_src2 + |""".stripMargin) + + util.verifyRelPlanInsert( + "insert into snk_for_cdc_src " + + "select * from no_delete_and_update_before_src1 " + + "join no_delete_and_update_before_src2 " + + "on a1 = b1 " + + "and a2 = b2", + ExplainDetail.CHANGELOG_MODE + ) + } + + @Test + def testJoinAppendOnlySourceAndSourceWithoutDelete(): Unit = { + util.tableConfig.set( + ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE, + UpsertMaterialize.NONE) + + util.tableEnv.executeSql(""" + |create table tmp_snk ( + | primary key (r1, r2) not enforced + |) like snk_for_cdc_src ( + | EXCLUDING CONSTRAINTS + |) + |""".stripMargin) + + // the join could not be converted into the delta join + // because the upsert key of the join is `null` + util.verifyRelPlanInsert( + "insert into tmp_snk " + + "select * from src1 join no_delete_src2 " + + "on a1 = b1 " + + "and a2 = b2", + ExplainDetail.CHANGELOG_MODE) + } + @Test def testSourceWithSourceAbilities(): Unit = { util.tableEnv.executeSql( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala index 063fe973c67e9..0c5d881d6608e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala @@ -30,13 +30,14 @@ import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, St import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters} import 
org.apache.flink.types.Row -import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy} +import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.{BeforeEach, TestTemplate} import org.junit.jupiter.api.extension.ExtendWith import javax.annotation.Nullable import java.time.LocalDateTime +import java.util.Objects.requireNonNull import java.util.concurrent.TimeUnit import scala.collection.JavaConversions._ @@ -81,7 +82,18 @@ class DeltaJoinITCase(enableCache: Boolean) extends StreamingTestBase { "+I[1.0, 1, 2021-01-01T01:01:01, 1, 1.0, 2021-01-01T01:01:01]", "+I[2.0, 2, 2022-02-02T02:02:02, 2, 2.0, 2022-02-02T02:02:22]" ) - testUpsertResult(List("a1"), List("b1"), data1, data2, "a1 = b1", expected, 6) + + testUpsertResult( + newTestSpecBuilder() + .withLeftIndex(List("a1")) + .withRightIndex(List("b1")) + .withLeftData(data1) + .withRightData(data2) + .withJoinCondition("a1 = b1") + .withSinkPk(List("l0", "r0")) + .withExpectedData(expected) + .withExpectedLookupFunctionInvokeCount(6) + .build()) } @TestTemplate @@ -106,34 +118,18 @@ class DeltaJoinITCase(enableCache: Boolean) extends StreamingTestBase { "+I[2.0, 2, 2022-02-02T02:02:02, 2, 2.0, 2022-02-02T02:02:02]", "+I[1.0, 1, 2021-01-01T01:01:01, 1, 1.0, 2021-01-01T01:01:01]" ) - testUpsertResult(List("a1"), List("b1"), data1, data2, "a1 = b1 and a2 = b2", expected, 6) - } - - @TestTemplate - def testJoinKeyNotContainsIndex(): Unit = { - val data1 = List( - changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 1, 1, 1, 1, 1)), - changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022, 2, 2, 2, 2, 2)), - // mismatch - changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2023, 3, 3, 3, 3, 3)) - ) - val data2 = List( - changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021, 1, 1, 1, 1, 11)), - changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022, 2, 2, 2, 2, 22)), - // mismatch - changelogRow("+I", Int.box(99), 
Double.box(99.0), LocalDateTime.of(2099, 2, 2, 2, 2, 2)) - ) - - // TestValuesRuntimeFunctions#KeyedUpsertingSinkFunction will change the RowKind from - // "+U" to "+I" - val expected = List("+I[1.0, 1, 2022-02-02T02:02:02, 1, 1.0, 2022-02-02T02:02:22]") - - // could not optimize into delta join because join keys do not contain indexes strictly - assertThatThrownBy( - () => - testUpsertResult(List("a0", "a1"), List("b0", "b1"), data1, data2, "a1 = b1", expected, 6)) - .hasMessageContaining("The current sql doesn't support to do delta join optimization.") + testUpsertResult( + newTestSpecBuilder() + .withLeftIndex(List("a1")) + .withRightIndex(List("b1")) + .withLeftData(data1) + .withRightData(data2) + .withJoinCondition("a1 = b1 and a2 = b2") + .withSinkPk(List("l0", "r0")) + .withExpectedData(expected) + .withExpectedLookupFunctionInvokeCount(6) + .build()) } @TestTemplate @@ -157,14 +153,18 @@ class DeltaJoinITCase(enableCache: Boolean) extends StreamingTestBase { val expected = List( "+I[1.0, 1, 2022-02-02T02:02:02, 1, 1.0, 2022-02-02T02:02:22]" ) + testUpsertResult( - List("a1"), - List("b1"), - data1, - data2, - "a1 = b1", - expected, - if (enableCache) 4 else 6) + newTestSpecBuilder() + .withLeftIndex(List("a1")) + .withRightIndex(List("b1")) + .withLeftData(data1) + .withRightData(data2) + .withJoinCondition("a1 = b1") + .withSinkPk(List("l0", "r0")) + .withExpectedData(expected) + .withExpectedLookupFunctionInvokeCount(if (enableCache) 4 else 6) + .build()) } @TestTemplate @@ -192,13 +192,17 @@ class DeltaJoinITCase(enableCache: Boolean) extends StreamingTestBase { "+I[1.0, 2, 2021-01-01T01:01:01, 2, 1.0, 2021-01-01T01:01:01]" ) testUpsertResult( - List("a1"), - List("b1"), - data1, - data2, - "a1 = b1 and a2 = b2", - expected, - if (enableCache) 4 else 6) + newTestSpecBuilder() + .withLeftIndex(List("a1")) + .withRightIndex(List("b1")) + .withLeftData(data1) + .withRightData(data2) + .withJoinCondition("a1 = b1 and a2 = b2") + .withSinkPk(List("l0", 
"r0")) + .withExpectedData(expected) + .withExpectedLookupFunctionInvokeCount(if (enableCache) 4 else 6) + .build() + ) } @TestTemplate @@ -222,99 +226,147 @@ class DeltaJoinITCase(enableCache: Boolean) extends StreamingTestBase { val expected = List("+I[2.0, 2, 2023-03-03T03:03:03, 2, 2.0, 2022-02-02T02:02:22]") testUpsertResult( - List("a0"), - List("b0"), - data1, - data2, - "a0 = b0 and a1 = b1 and a2 > b2", - expected, - 6) + newTestSpecBuilder() + .withLeftIndex(List("a0")) + .withRightIndex(List("b0")) + .withLeftData(data1) + .withRightData(data2) + .withJoinCondition("a0 = b0 and a1 = b1 and a2 > b2") + .withSinkPk(List("l0", "r0")) + .withExpectedData(expected) + .withExpectedLookupFunctionInvokeCount(6) + .build()) } @TestTemplate - def testWithNonEquiCondition2(): Unit = { + def testCdcSourceWithoutDelete(): Unit = { val data1 = List( + // pk1 changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 1, 1, 1, 1, 1)), - changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2023, 3, 3, 3, 3, 3)), + changelogRow("-U", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 1, 1, 1, 1, 1)), + changelogRow("+U", Double.box(11.0), Int.box(1), LocalDateTime.of(2021, 1, 1, 1, 1, 11)), + // pk2 + changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022, 2, 2, 2, 2, 2)), + changelogRow("-U", Double.box(2.0), Int.box(2), LocalDateTime.of(2022, 2, 2, 2, 2, 2)), + changelogRow("+U", Double.box(22.0), Int.box(2), LocalDateTime.of(2022, 2, 2, 2, 2, 22)), + // mismatch + changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2023, 3, 3, 3, 3, 3)) + ) + + val data2 = List( + // pk1 + changelogRow("+I", Int.box(1), Double.box(12.0), LocalDateTime.of(2021, 1, 1, 1, 1, 12)), + changelogRow("-U", Int.box(1), Double.box(12.0), LocalDateTime.of(2021, 1, 1, 1, 1, 12)), + changelogRow("+U", Int.box(1), Double.box(13.0), LocalDateTime.of(2021, 1, 1, 1, 1, 13)), + // pk2 + changelogRow("+I", Int.box(2), Double.box(22.0), 
LocalDateTime.of(2022, 2, 2, 2, 2, 22)), + changelogRow("-U", Int.box(2), Double.box(22.0), LocalDateTime.of(2022, 2, 2, 2, 2, 22)), + changelogRow("+U", Int.box(2), Double.box(23.0), LocalDateTime.of(2022, 2, 2, 2, 2, 23)), + // mismatch + changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099, 2, 2, 2, 2, 2)) + ) + + // TestValuesRuntimeFunctions#KeyedUpsertingSinkFunction will change the RowKind from + // "+U" to "+I" + val expected = List( + "+I[11.0, 1, 2021-01-01T01:01:11, 1, 13.0, 2021-01-01T01:01:13]", + "+I[22.0, 2, 2022-02-02T02:02:22, 2, 23.0, 2022-02-02T02:02:23]" + ) + + testUpsertResult( + newTestSpecBuilder() + .withLeftIndex(List("a0")) + .withRightIndex(List("b0")) + .withLeftPk(List("a0")) + .withRightPk(List("b0")) + .withSinkPk(List("l0", "r0")) + .withLeftChangelogMode("I,UA,UB") + .withRightChangelogMode("I,UA,UB") + .withLeftData(data1) + .withRightData(data2) + .withJoinCondition("a0 = b0") + .withExpectedData(expected) + .withExpectedLookupFunctionInvokeCount(if (enableCache) 6 else 10) + .build()) + } + + @TestTemplate + def testFilterFieldsAfterJoin(): Unit = { + val data1 = List( + changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 1, 1, 1, 1, 1)), + changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022, 2, 2, 2, 2, 2)), // mismatch changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2033, 3, 3, 3, 3, 3)) ) val data2 = List( changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021, 1, 1, 1, 1, 11)), - changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022, 2, 2, 2, 2, 22)), + changelogRow("+I", Int.box(2), Double.box(3.0), LocalDateTime.of(2022, 2, 2, 2, 2, 33)), // mismatch changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099, 2, 2, 2, 2, 2)) ) // TestValuesRuntimeFunctions#KeyedUpsertingSinkFunction will change the RowKind from // "+U" to "+I" - val expected = List("+I[2.0, 2, 2023-03-03T03:03:03, 2, 2.0, 
2022-02-02T02:02:22]") + val expected = List("+I[2.0, 2, 2022-02-02T02:02:02, 2, 3.0, 2022-02-02T02:02:33]") - // could not optimize into delta join because there is calc between join and source - assertThatThrownBy( - () => - testUpsertResult( - List("a0"), - List("b0"), - data1, - data2, - "a0 = b0 and a1 = b1 and a2 > TO_TIMESTAMP('2021-01-01 01:01:11')", - expected, - 6)) - .hasMessageContaining("The current sql doesn't support to do delta join optimization.") - - // could not optimize into delta join because there is calc between join and source - assertThatThrownBy( - () => - testUpsertResult( - List("a0"), - List("b0"), - data1, - data2, - "a0 = b0 and b1 > 1.0", - expected, - 12)) - .hasMessageContaining("The current sql doesn't support to do delta join optimization.") + testUpsertResult( + newTestSpecBuilder() + .withLeftIndex(List("a0")) + .withRightIndex(List("b0")) + .withLeftData(data1) + .withRightData(data2) + .withJoinCondition("a0 = b0") + .withFilterAfterJoin("a1 <> b1") + .withSinkPk(List("l0", "r0")) + .withExpectedData(expected) + .withExpectedLookupFunctionInvokeCount(6) + .build()) } @TestTemplate - def testFilterFieldsBeforeJoin(): Unit = { + def testFilterFieldsAfterJoinWithCdcSourceWithoutDelete(): Unit = { val data1 = List( + // pk1 changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 1, 1, 1, 1, 1)), - changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022, 2, 2, 2, 2, 2)), + changelogRow("-U", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 1, 1, 1, 1, 1)), + changelogRow("+U", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 1, 1, 1, 1, 2)), + // pk2 + changelogRow("+I", Double.box(2.0), Int.box(3), LocalDateTime.of(2022, 2, 2, 2, 2, 2)), + changelogRow("-U", Double.box(2.0), Int.box(3), LocalDateTime.of(2022, 2, 2, 2, 2, 2)), + changelogRow("+U", Double.box(2.0), Int.box(3), LocalDateTime.of(2022, 2, 2, 2, 2, 3)), // mismatch changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2033, 
3, 3, 3, 3, 3)) ) val data2 = List( + // pk1 changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021, 1, 1, 1, 1, 11)), - changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022, 2, 2, 2, 2, 22)), + changelogRow("+I", Int.box(3), Double.box(2.0), LocalDateTime.of(2022, 2, 2, 2, 2, 22)), // mismatch changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099, 2, 2, 2, 2, 2)) ) // TestValuesRuntimeFunctions#KeyedUpsertingSinkFunction will change the RowKind from // "+U" to "+I" - val expected = List("+I[1.0, 1, 2021-01-01T01:01:01, 1, 1.0, 2021-01-01T01:01:11]") - - // could not optimize into delta join because there is calc between join and source - assertThatThrownBy( - () => - testUpsertResult( - List("a0"), - List("b0"), - data1, - data2, - "a0 = b0 and a1 = b1 and a2 = TO_TIMESTAMP('2021-01-01 01:01:01')", - expected, - 6)) - .hasMessageContaining("The current sql doesn't support to do delta join optimization.") - - // could not optimize into delta join because there is calc between join and source - assertThatThrownBy( - () => testUpsertResult(List(), List(), data1, data2, "a0 = b0 and b1 = 1.0", expected, 12)) - .hasMessageContaining("The current sql doesn't support to do delta join optimization.") + val expected = List("+I[2.0, 3, 2022-02-02T02:02:03, 3, 2.0, 2022-02-02T02:02:22]") + + testUpsertResult( + newTestSpecBuilder() + .withLeftIndex(List("a0")) + .withRightIndex(List("b0")) + .withLeftPk(List("a0", "a1")) + .withRightPk(List("b0", "b1")) + .withSinkPk(List("l0", "r0", "l1", "r1")) + .withLeftData(data1) + .withRightData(data2) + .withLeftChangelogMode("I,UA,UB") + .withRightChangelogMode("I,UA,UB") + .withJoinCondition("a0 = b0") + .withFilterAfterJoin("a1 < b0") + .withExpectedData(expected) + .withExpectedLookupFunctionInvokeCount(if (enableCache) 5 else 8) + .build()) } @TestTemplate @@ -363,85 +415,6 @@ class DeltaJoinITCase(enableCache: Boolean) extends StreamingTestBase { 
assertThat(AsyncTestValueLookupFunction.invokeCount.get()).isEqualTo(6) } - @TestTemplate - def testProjectFieldsBeforeJoin(): Unit = { - val data1 = List( - changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 1, 1, 1, 1, 1)), - changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022, 2, 2, 2, 2, 2)), - // mismatch - changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2033, 3, 3, 3, 3, 3)) - ) - - val data2 = List( - changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021, 1, 1, 1, 1, 11)), - changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022, 2, 2, 2, 2, 22)), - // mismatch - changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099, 2, 2, 2, 2, 2)) - ) - - prepareTable(List("a0"), List("b0"), data1, data2) - tEnv.executeSql(s""" - |create table projectedSink( - | l0 int, - | r0 int, - | l1 double, - | l2 timestamp(3), - | primary key(l0, r0) not enforced - |) with ( - | 'connector' = 'values', - | 'bounded' = 'false', - | 'sink-insert-only' = 'false' - |) - |""".stripMargin) - - // could not optimize into delta join - // because there is ProjectPushDownSpec between join and source - assertThatThrownBy(() => tEnv.executeSql(""" - |insert into projectedSink - | select - | testLeft.a0, - | testRight.b0, - | testLeft.a1, - | testLeft.a2 - | from testLeft - | join testRight - | on a0 = b0 - |""".stripMargin)) - .hasMessageContaining("The current sql doesn't support to do delta join optimization.") - } - - @TestTemplate - def testProjectFieldsBeforeJoin2(): Unit = { - val data1 = List( - changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 1, 1, 1, 1, 1)), - changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022, 2, 2, 2, 2, 2)), - // mismatch - changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2033, 3, 3, 3, 3, 3)) - ) - - val data2 = List( - changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021, 1, 1, 1, 1, 11)), 
- changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022, 2, 2, 2, 2, 22)), - // mismatch - changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099, 2, 2, 2, 2, 2)) - ) - - prepareTable(List("a0"), List("b0"), data1, data2) - - // could not optimize into delta join because there is calc between join and source - assertThatThrownBy( - () => - tEnv - .executeSql( - s"insert into testSnk(l0, l1, l2, r0, r1, r2) " + - "select * from ( " + - " select a0, a1, a2 from testLeft" + - ") join testRight " + - "on a1 = b1") - .await()) - .hasMessageContaining("The current sql doesn't support to do delta join optimization.") - } - @TestTemplate def testFailOverAndRestore(): Unit = { // enable checkpoint, we are using failing source to force have a complete checkpoint @@ -468,14 +441,16 @@ class DeltaJoinITCase(enableCache: Boolean) extends StreamingTestBase { val expected = List("+I[2.0, 2, 2022-02-02T02:02:02, 2, 2.0, 2022-02-02T02:02:22]") testUpsertResult( - List("a0"), - List("b0"), - data1, - data2, - "a0 = b0 and a1 = b1", - expected, - null, - testFailingSource = true) + newTestSpecBuilder() + .withLeftIndex(List("a0")) + .withRightIndex(List("b0")) + .withLeftData(data1) + .withRightData(data2) + .withJoinCondition("a0 = b0 and a1 = b1") + .withSinkPk(List("l0", "r0")) + .withExpectedData(expected) + .withTestFailingSource(true) + .build()) } /** TODO add index in DDL. 
*/ @@ -510,26 +485,35 @@ class DeltaJoinITCase(enableCache: Boolean) extends StreamingTestBase { catalog.createTable(tablePath, newResolvedTable, false) } - private def testUpsertResult( - leftIndex: List[String], - rightIndex: List[String], - leftData: List[Row], - rightData: List[Row], - joinKeyStr: String, - expected: List[String], - @Nullable expectedLookupFunctionInvokeCount: Integer, - testFailingSource: Boolean = false): Unit = { - prepareTable(leftIndex, rightIndex, leftData, rightData, testFailingSource) + private def testUpsertResult(testSpec: TestSpec): Unit = { + prepareTable( + testSpec.leftIndex, + testSpec.rightIndex, + testSpec.leftPk.orNull, + testSpec.rightPk.orNull, + testSpec.sinkPk, + testSpec.leftData, + testSpec.rightData, + testSpec.testFailingSource, + testSpec.leftChangelogMode, + testSpec.rightChangelogMode + ) + val sql = + s""" + | insert into testSnk + | select * from testLeft join testRight on ${testSpec.joinCondition} + | ${if (testSpec.filterAfterJoin.isEmpty) "" else s"where ${testSpec.filterAfterJoin.get}"} + |""".stripMargin tEnv - .executeSql(s"insert into testSnk select * from testLeft join testRight on $joinKeyStr") + .executeSql(sql) .await(60, TimeUnit.SECONDS) val result = TestValuesTableFactory.getResultsAsStrings("testSnk") - assertThat(result.sorted).isEqualTo(expected.sorted) - if (expectedLookupFunctionInvokeCount != null) { + assertThat(result.sorted).isEqualTo(testSpec.expected.sorted) + if (testSpec.expectedLookupFunctionInvokeCount.isDefined) { assertThat(AsyncTestValueLookupFunction.invokeCount.get()) - .isEqualTo(expectedLookupFunctionInvokeCount) + .isEqualTo(testSpec.expectedLookupFunctionInvokeCount.get) } } @@ -537,40 +521,67 @@ class DeltaJoinITCase(enableCache: Boolean) extends StreamingTestBase { leftIndex: List[String], rightIndex: List[String], leftData: List[Row], + rightData: List[Row]): Unit = { + prepareTable( + leftIndex, + rightIndex, + null, + null, + List("l0", "r0"), + leftData, + rightData, + 
testFailingSource = false, + "I", + "I") + } + + private def prepareTable( + leftIndex: List[String], + rightIndex: List[String], + @Nullable leftPk: List[String], + @Nullable rightPk: List[String], + sinkPk: List[String], + leftData: List[Row], rightData: List[Row], - testFailingSource: Boolean = false): Unit = { + testFailingSource: Boolean, + leftChangelogMode: String, + rightChangelogMode: String): Unit = { tEnv.executeSql("drop table if exists testLeft") - tEnv.executeSql(s""" - |create table testLeft( - | a1 double, - | a0 int, - | a2 timestamp(3) - |) with ( - | 'connector' = 'values', - | 'bounded' = 'false', - | 'changelog-mode' = 'I', - | 'data-id' = '${TestValuesTableFactory.registerData(leftData)}', - | 'async' = 'true', - | 'failing-source' = '$testFailingSource' - |) - |""".stripMargin) + tEnv.executeSql( + s""" + |create table testLeft( + | a1 double, + | a0 int, + | a2 timestamp(3) + | ${if (leftPk == null) "" else s", primary key (${leftPk.mkString(",")}) not enforced"} + |) with ( + | 'connector' = 'values', + | 'bounded' = 'false', + | 'changelog-mode' = '$leftChangelogMode', + | 'data-id' = '${TestValuesTableFactory.registerData(leftData)}', + | 'async' = 'true', + | 'failing-source' = '$testFailingSource' + |) + |""".stripMargin) addIndex("testLeft", leftIndex) tEnv.executeSql("drop table if exists testRight") - tEnv.executeSql(s""" - |create table testRight( - | b0 int, - | b1 double, - | b2 timestamp(3) - |) with ( - | 'connector' = 'values', - | 'bounded' = 'false', - | 'changelog-mode' = 'I', - | 'data-id' = '${TestValuesTableFactory.registerData(rightData)}', - | 'async' = 'true', - | 'failing-source' = '$testFailingSource' - |) - |""".stripMargin) + tEnv.executeSql( + s""" + |create table testRight( + | b0 int, + | b1 double, + | b2 timestamp(3) + | ${if (rightPk == null) "" else s", primary key (${rightPk.mkString(",")}) not enforced"} + |) with ( + | 'connector' = 'values', + | 'bounded' = 'false', + | 'changelog-mode' = 
'$rightChangelogMode', + | 'data-id' = '${TestValuesTableFactory.registerData(rightData)}', + | 'async' = 'true', + | 'failing-source' = '$testFailingSource' + |) + |""".stripMargin) addIndex("testRight", rightIndex) tEnv.executeSql("drop table if exists testSnk") @@ -582,7 +593,7 @@ class DeltaJoinITCase(enableCache: Boolean) extends StreamingTestBase { | r0 int, | r1 double, | r2 timestamp(3), - | primary key(l0, r0) not enforced + | primary key(${sinkPk.mkString(",")}) not enforced |) with ( | 'connector' = 'values', | 'bounded' = 'false', @@ -591,6 +602,133 @@ class DeltaJoinITCase(enableCache: Boolean) extends StreamingTestBase { |""".stripMargin) } + private def newTestSpecBuilder(): TestSpecBuilder = { + new TestSpecBuilder + } + + private case class TestSpec( + leftIndex: List[String], + rightIndex: List[String], + leftPk: Option[List[String]], + rightPk: Option[List[String]], + sinkPk: List[String], + leftData: List[Row], + rightData: List[Row], + joinCondition: String, + filterAfterJoin: Option[String], + expected: List[String], + expectedLookupFunctionInvokeCount: Option[Int], + testFailingSource: Boolean, + leftChangelogMode: String, + rightChangelogMode: String + ) + + private class TestSpecBuilder { + private var leftIndex: Option[List[String]] = None + private var rightIndex: Option[List[String]] = None + private var leftPk: Option[List[String]] = None + private var rightPk: Option[List[String]] = None + private var sinkPk: Option[List[String]] = None + private var joinCondition: Option[String] = None + private var filterAfterJoin: Option[String] = None + private var leftData: Option[List[Row]] = None + private var rightData: Option[List[Row]] = None + private var expectedData: Option[List[String]] = None + private var expectedLookupFunctionInvokeCount: Option[Int] = None + private var testFailingSource: Boolean = false + private var leftChangelogMode: String = "I" + private var rightChangelogMode: String = "I" + + def withLeftIndex(index: 
List[String]): TestSpecBuilder = { + leftIndex = Some(requireNonNull(index)) + this + } + + def withRightIndex(index: List[String]): TestSpecBuilder = { + rightIndex = Some(requireNonNull(index)) + this + } + + def withLeftPk(pk: List[String]): TestSpecBuilder = { + leftPk = Some(requireNonNull(pk)) + this + } + + def withRightPk(pk: List[String]): TestSpecBuilder = { + rightPk = Some(requireNonNull(pk)) + this + } + + def withSinkPk(pk: List[String]): TestSpecBuilder = { + sinkPk = Some(requireNonNull(pk)) + this + } + + def withLeftData(data: List[Row]): TestSpecBuilder = { + leftData = Some(requireNonNull(data)) + this + } + + def withRightData(data: List[Row]): TestSpecBuilder = { + rightData = Some(requireNonNull(data)) + this + } + + def withJoinCondition(condition: String): TestSpecBuilder = { + joinCondition = Some(requireNonNull(condition)) + this + } + + def withFilterAfterJoin(filter: String): TestSpecBuilder = { + filterAfterJoin = Some(requireNonNull(filter)) + this + } + + def withExpectedData(expected: List[String]): TestSpecBuilder = { + this.expectedData = Some(requireNonNull(expected)) + this + } + + def withExpectedLookupFunctionInvokeCount(count: Int): TestSpecBuilder = { + expectedLookupFunctionInvokeCount = Some(requireNonNull(count)) + this + } + + def withTestFailingSource(flag: Boolean): TestSpecBuilder = { + testFailingSource = requireNonNull(flag) + this + } + + def withLeftChangelogMode(mode: String): TestSpecBuilder = { + leftChangelogMode = requireNonNull(mode) + this + } + + def withRightChangelogMode(mode: String): TestSpecBuilder = { + rightChangelogMode = requireNonNull(mode) + this + } + + def build(): TestSpec = { + TestSpec( + requireNonNull(leftIndex.orNull), + requireNonNull(rightIndex.orNull), + leftPk, + rightPk, + requireNonNull(sinkPk.orNull), + requireNonNull(leftData.orNull), + requireNonNull(rightData.orNull), + requireNonNull(joinCondition.orNull), + filterAfterJoin, + requireNonNull(expectedData.orNull), + 
expectedLookupFunctionInvokeCount, + testFailingSource, + leftChangelogMode, + rightChangelogMode + ) + } + + } } object DeltaJoinITCase { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperator.java index 45fc04b239c72..2505550104bc7 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperator.java @@ -357,8 +357,9 @@ private void tryProcess() throws Exception { private void processElement(StreamRecord element, int inputIndex) throws Exception { Preconditions.checkArgument( - RowKind.INSERT == element.getValue().getRowKind(), - "Currently, delta join only supports to consume append only stream."); + RowKind.INSERT == element.getValue().getRowKind() + || RowKind.UPDATE_AFTER == element.getValue().getRowKind(), + "Currently, delta join only supports to consume insert record or update after record."); tryProcess(); StreamRecord record; boolean isLeft = isLeft(inputIndex); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java index 17b286e9819aa..de52c8f2be2bd 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java @@ -35,7 +35,7 @@ import 
org.apache.flink.table.data.conversion.DataStructureConverters; import org.apache.flink.table.runtime.collector.TableFunctionCollector; import org.apache.flink.table.runtime.collector.TableFunctionResultFuture; -import org.apache.flink.table.runtime.generated.GeneratedFunctionWrapper; +import org.apache.flink.table.runtime.generated.GeneratedFunction; import org.apache.flink.table.runtime.generated.GeneratedResultFutureWrapper; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.join.lookup.keyordered.AecRecord; @@ -67,7 +67,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -83,6 +82,7 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.binaryrow; import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -94,77 +94,9 @@ public class StreamingDeltaJoinOperatorTest { private static final int CACHE_SIZE = 10; // the data snapshot of the left/right table when joining - private static final LinkedList leftTableCurrentData = new LinkedList<>(); - private static final LinkedList rightTableCurrentData = new LinkedList<>(); - - /** - * Mock sql like the following. - * - *

    -     *      CREATE TABLE leftSrc(
    -     *          left_value INT,
    -     *          left_jk1 BOOLEAN,
    -     *          left_jk2_lk STRING,
    -     *          INDEX(left_jk2_lk)
    -     *      )
    -     * 
    - * - *
    -     *      CREATE TABLE rightSrc(
    -     *          right_jk2 STRING,
    -     *          right_value INT,
    -     *          right_jk1_lk BOOLEAN,
    -     *          INDEX(right_jk1_lk)
    -     *      )
    -     * 
    - * - *
    -     *     select * from leftSrc join rightSrc
    -     *      on leftSrc.left_jk1 = rightSrc.right_jk1_lk
    -     *      and leftSrc.left_jk2_lk = rightSrc.right_jk2
    -     * 
    - * - *

    For right lookup table(left stream side delta join right table), the join key is - * [right_jk1_lk, right_jk2], and it will be split into lookup key [right_jk1_lk] and {@code - * DeltaJoinSpec#remainingCondition} [right_jk2]. - * - *

    For left lookup table(right stream side delta join left table), the join key is [left_jk1, - * left_jk2_lk], and it will be split into lookup key [left_jk2_lk] and {@code - * DeltaJoinSpec#remainingCondition} [left_jk1]. - */ - - // left join key: - // left lookup key: - private static final RowType leftRowType = - RowType.of( - new LogicalType[] {new IntType(), new BooleanType(), VarCharType.STRING_TYPE}, - new String[] {"left_value", "left_jk1", "left_jk2_lk"}); - - private static final InternalTypeInfo leftTypeInfo = InternalTypeInfo.of(leftRowType); - - private static final int[] leftJoinKeyIndices = new int[] {1, 2}; - - // right join key: - // right lookup key: - private static final RowType rightRowType = - RowType.of( - new LogicalType[] {VarCharType.STRING_TYPE, new IntType(), new BooleanType()}, - new String[] {"right_jk2", "right_value", "right_jk1_lk"}); - - private static final InternalTypeInfo rightTypeInfo = - InternalTypeInfo.of(rightRowType); - - private static final int[] rightJoinKeyIndices = new int[] {2, 0}; - - private static final RowDataKeySelector leftJoinKeySelector = - HandwrittenSelectorUtil.getRowDataSelector( - leftJoinKeyIndices, leftRowType.getChildren().toArray(new LogicalType[0])); - private static final RowDataKeySelector rightJoinKeySelector = - HandwrittenSelectorUtil.getRowDataSelector( - rightJoinKeyIndices, rightRowType.getChildren().toArray(new LogicalType[0])); - - private static final int[] outputFieldIndices = - IntStream.range(0, leftTypeInfo.getArity() + rightTypeInfo.getArity()).toArray(); + // + private static final HashMap leftTableCurrentData = new HashMap<>(); + private static final HashMap rightTableCurrentData = new HashMap<>(); @Parameters(name = "EnableCache = {0}") public static List parameters() { @@ -181,46 +113,6 @@ public static List parameters() { @BeforeEach public void beforeEach() throws Exception { - testHarness = createDeltaJoinOperatorTestHarness(); - testHarness.setup(); - testHarness.open(); - 
StreamingDeltaJoinOperator operator = unwrapOperator(testHarness); - // set external failure cause consumer to prevent hang - testHarness - .getEnvironment() - .setExternalFailureCauseConsumer( - error -> { - latestException = Optional.of(error); - // DO NOT throw exception up again to avoid hang - }); - operator.setAsyncExecutionController( - new MyAsyncExecutionControllerDelegate(operator.getAsyncExecutionController())); - prepareOperatorRuntimeInfo(operator); - - assertor = - new RowDataHarnessAssertor( - getOutputType().getChildren().toArray(new LogicalType[0]), - // sort the result by the output upsert key - (o1, o2) -> { - for (int keyIndex : outputFieldIndices) { - LogicalType type = getOutputType().getChildren().get(keyIndex); - RowData.FieldGetter getter = - RowData.createFieldGetter(type, keyIndex); - - int compareResult = - Objects.requireNonNull(getter.getFieldOrNull(o1)) - .toString() - .compareTo( - Objects.requireNonNull( - getter.getFieldOrNull(o2)) - .toString()); - - if (compareResult != 0) { - return compareResult; - } - } - return o1.toString().compareTo(o2.toString()); - }); MyAsyncFunction.leftInvokeCount.set(0); MyAsyncFunction.rightInvokeCount.set(0); MyAsyncExecutionControllerDelegate.insertTableDataAfterEmit = true; @@ -228,7 +120,9 @@ public void beforeEach() throws Exception { @AfterEach public void afterEach() throws Exception { - testHarness.close(); + if (assertor != null) { + testHarness.close(); + } leftTableCurrentData.clear(); rightTableCurrentData.clear(); latestException = Optional.empty(); @@ -236,7 +130,11 @@ public void afterEach() throws Exception { } @TestTemplate - void testJoinBothAppendOnlyTables() throws Exception { + void testJoinBothLogTables() throws Exception { + LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE; + initTestHarness(testSpec); + initAssertor(testSpec); + StreamRecord leftRecord1 = insertRecord(100, true, "jklk1"); StreamRecord leftRecord2 = insertRecord(100, false, "jklk2"); 
testHarness.processElement1(leftRecord1); @@ -293,6 +191,8 @@ void testJoinBothAppendOnlyTables() throws Exception { DeltaJoinCache cache = unwrapCache(testHarness); if (enableCache) { + RowType leftRowType = testSpec.getLeftInputRowType(); + RowType rightRowType = testSpec.getRightInputRowType(); Map> expectedLeftCacheData = newHashMap( binaryrow(true, "jklk1"), @@ -339,8 +239,102 @@ void testJoinBothAppendOnlyTables() throws Exception { } } + @TestTemplate + void testJoinBothPkTables() throws Exception { + PkPkTableJoinTestSpec testSpec = PkPkTableJoinTestSpec.INSTANCE; + initTestHarness(testSpec); + initAssertor(testSpec); + + StreamRecord leftRecordK1V1 = insertRecord(100, true, "Tom"); + StreamRecord leftRecordK2V1 = insertRecord(101, false, "Tom"); + // mismatch + StreamRecord leftRecordK3V1 = insertRecord(1999, false, "Jim"); + testHarness.processElement1(leftRecordK1V1); + testHarness.processElement1(leftRecordK2V1); + testHarness.processElement1(leftRecordK3V1); + + waitAllDataProcessed(); + final ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + assertor.assertOutputEqualsSorted( + "result mismatch", expectedOutput, testHarness.getOutput()); + + StreamRecord rightRecordK1V1 = insertRecord("Tom", 200, true); + StreamRecord rightRecordK2V1 = insertRecord("Tom", 201, false); + // mismatch + StreamRecord rightRecordK3V1 = insertRecord("Sam", 2999, false); + testHarness.processElement2(rightRecordK1V1); + testHarness.processElement2(rightRecordK2V1); + testHarness.processElement2(rightRecordK3V1); + + waitAllDataProcessed(); + expectedOutput.add(insertRecord(100, true, "Tom", "Tom", 200, true)); + expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 200, true)); + expectedOutput.add(insertRecord(100, true, "Tom", "Tom", 201, false)); + expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 201, false)); + assertor.assertOutputEqualsSorted( + "result mismatch", expectedOutput, testHarness.getOutput()); + + StreamRecord 
leftRecordK1V2 = updateAfterRecord(1000, true, "Tom"); + testHarness.processElement1(leftRecordK1V2); + + waitAllDataProcessed(); + expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 200, true)); + expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 201, false)); + assertor.assertOutputEqualsSorted( + "result mismatch", expectedOutput, testHarness.getOutput()); + + StreamRecord rightRecordK1V2 = updateAfterRecord("Tom", 2000, true); + StreamRecord rightRecordK2V2 = updateAfterRecord("Tom", 2001, false); + testHarness.processElement2(rightRecordK1V2); + testHarness.processElement2(rightRecordK2V2); + + waitAllDataProcessed(); + expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 2000, true)); + expectedOutput.add(updateAfterRecord(101, false, "Tom", "Tom", 2000, true)); + expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 2001, false)); + expectedOutput.add(updateAfterRecord(101, false, "Tom", "Tom", 2001, false)); + assertor.assertOutputEqualsSorted( + "result mismatch", expectedOutput, testHarness.getOutput()); + + DeltaJoinCache cache = unwrapCache(testHarness); + if (enableCache) { + Map> expectedLeftCacheData = + newHashMap( + binaryrow("Tom"), + newHashMap( + binaryrow(true, "Tom"), + leftRecordK1V2.getValue(), + binaryrow(false, "Tom"), + leftRecordK2V1.getValue()), + binaryrow("Sam"), + Collections.emptyMap()); + + Map> expectedRightCacheData = + newHashMap( + binaryrow("Tom"), + newHashMap( + binaryrow("Tom", true), + rightRecordK1V2.getValue(), + binaryrow("Tom", false), + rightRecordK2V2.getValue()), + binaryrow("Jim"), + Collections.emptyMap()); + verifyCacheData(cache, expectedLeftCacheData, expectedRightCacheData, 5, 3, 4, 2); + assertThat(MyAsyncFunction.leftInvokeCount.get()).isEqualTo(2); + assertThat(MyAsyncFunction.rightInvokeCount.get()).isEqualTo(2); + } else { + verifyCacheData(cache, Collections.emptyMap(), Collections.emptyMap(), 0, 0, 0, 0); + 
assertThat(MyAsyncFunction.leftInvokeCount.get()).isEqualTo(4); + assertThat(MyAsyncFunction.rightInvokeCount.get()).isEqualTo(5); + } + } + @TestTemplate void testBlockingWithSameJoinKey() throws Exception { + LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE; + initTestHarness(testSpec); + initAssertor(testSpec); + // block the async function MyAsyncFunction.block(); @@ -377,6 +371,9 @@ void testBlockingWithSameJoinKey() throws Exception { assertThat(recordsBuffer.getActiveBuffer().size()).isEqualTo(3); assertThat(recordsBuffer.getBlockingBuffer().size()).isEqualTo(2); + RowDataKeySelector leftJoinKeySelector = testSpec.getLeftJoinKeySelector(); + RowDataKeySelector rightJoinKeySelector = testSpec.getRightJoinKeySelector(); + RowData joinKey1 = leftJoinKeySelector.getKey(insertRecord(100, true, "jklk1").getValue()); RowData joinKey2 = leftJoinKeySelector.getKey(insertRecord(100, false, "jklk2").getValue()); RowData joinKey3 = @@ -411,6 +408,8 @@ void testBlockingWithSameJoinKey() throws Exception { DeltaJoinCache cache = unwrapCache(testHarness); if (enableCache) { + RowType leftRowType = testSpec.getLeftInputRowType(); + RowType rightRowType = testSpec.getRightInputRowType(); Map> expectedLeftCacheData = newHashMap( binaryrow(true, "jklk1"), @@ -457,29 +456,33 @@ void testBlockingWithSameJoinKey() throws Exception { * source and delta-join). 
*/ @TestTemplate - void testTableDataVisibleBeforeJoin() throws Exception { + void testLogTableDataVisibleBeforeJoin() throws Exception { + LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE; + initTestHarness(testSpec); + initAssertor(testSpec); + MyAsyncExecutionControllerDelegate.insertTableDataAfterEmit = false; // prepare the data first to mock all following requests were in flight between source and // delta-join final StreamRecord leftRecord1 = insertRecord(100, true, "jklk1"); - insertLeftTable(leftRecord1); + insertLeftTable(testSpec, leftRecord1); final StreamRecord leftRecord2 = insertRecord(200, true, "jklk1"); - insertLeftTable(leftRecord2); + insertLeftTable(testSpec, leftRecord2); final StreamRecord rightRecord1 = insertRecord("jklk1", 300, true); - insertRightTable(rightRecord1); + insertRightTable(testSpec, rightRecord1); // mismatch final StreamRecord rightRecord2 = insertRecord("jklk2", 500, false); - insertRightTable(rightRecord2); + insertRightTable(testSpec, rightRecord2); final StreamRecord leftRecord3 = insertRecord(800, true, "jklk1"); - insertLeftTable(leftRecord3); + insertLeftTable(testSpec, leftRecord3); final StreamRecord rightRecord3 = insertRecord("jklk1", 1000, true); - insertRightTable(rightRecord3); + insertRightTable(testSpec, rightRecord3); testHarness.processElement1(leftRecord1); testHarness.processElement1(leftRecord2); @@ -525,6 +528,8 @@ void testTableDataVisibleBeforeJoin() throws Exception { DeltaJoinCache cache = unwrapCache(testHarness); if (enableCache) { + RowType leftRowType = testSpec.getLeftInputRowType(); + RowType rightRowType = testSpec.getRightInputRowType(); Map> expectedLeftCacheData = newHashMap( binaryrow(true, "jklk1"), @@ -557,8 +562,134 @@ void testTableDataVisibleBeforeJoin() throws Exception { } } + /** + * This test is used to test the scenario where the right stream side joined out a record from + * the left table that has not been sent to the delta-join operator (maybe is in flight 
between + * source and delta-join). + */ + @TestTemplate + void testPkTableDataVisibleBeforeJoin() throws Exception { + PkPkTableJoinTestSpec testSpec = PkPkTableJoinTestSpec.INSTANCE; + initTestHarness(testSpec); + initAssertor(testSpec); + + MyAsyncExecutionControllerDelegate.insertTableDataAfterEmit = false; + + // prepare the data first to mock all following requests were in flight between source and + // delta-join + final StreamRecord leftRecordK1V1 = insertRecord(100, true, "Tom"); + insertLeftTable(testSpec, leftRecordK1V1); + final StreamRecord leftRecordK1V2 = updateAfterRecord(1000, true, "Tom"); + insertLeftTable(testSpec, leftRecordK1V2); + + final StreamRecord leftRecordK2V1 = insertRecord(101, false, "Tom"); + insertLeftTable(testSpec, leftRecordK2V1); + + // mismatch + final StreamRecord leftRecordK3V1 = insertRecord(101, false, "Jim"); + insertLeftTable(testSpec, leftRecordK3V1); + final StreamRecord leftRecordK3V2 = updateAfterRecord(1001, false, "Jim"); + insertLeftTable(testSpec, leftRecordK3V2); + + final StreamRecord rightRecordK1V1 = insertRecord("Tom", 200, true); + insertRightTable(testSpec, rightRecordK1V1); + final StreamRecord rightRecordK1V2 = updateAfterRecord("Tom", 2000, true); + insertRightTable(testSpec, rightRecordK1V2); + final StreamRecord rightRecordK1V3 = updateAfterRecord("Tom", 20000, true); + insertRightTable(testSpec, rightRecordK1V3); + + final StreamRecord rightRecordK2V1 = insertRecord("Tom", 201, false); + insertRightTable(testSpec, rightRecordK2V1); + + // mismatch + final StreamRecord rightRecordK3V1 = insertRecord("Sam", 999, false); + insertRightTable(testSpec, rightRecordK3V1); + + final ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.processElement1(leftRecordK1V1); + expectedOutput.add(insertRecord(100, true, "Tom", "Tom", 20000, true)); + expectedOutput.add(insertRecord(100, true, "Tom", "Tom", 201, false)); + + testHarness.processElement1(leftRecordK1V2); + 
expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 20000, true)); + expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 201, false)); + + testHarness.processElement1(leftRecordK2V1); + expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 20000, true)); + expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 201, false)); + + testHarness.processElement1(leftRecordK3V1); + testHarness.processElement1(leftRecordK3V2); + + testHarness.processElement2(rightRecordK1V1); + expectedOutput.add(insertRecord(1000, true, "Tom", "Tom", 200, true)); + expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 200, true)); + + testHarness.processElement2(rightRecordK1V2); + expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 2000, true)); + expectedOutput.add(updateAfterRecord(101, false, "Tom", "Tom", 2000, true)); + + testHarness.processElement2(rightRecordK1V3); + expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 20000, true)); + expectedOutput.add(updateAfterRecord(101, false, "Tom", "Tom", 20000, true)); + + testHarness.processElement2(rightRecordK2V1); + expectedOutput.add(insertRecord(1000, true, "Tom", "Tom", 201, false)); + expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 201, false)); + + testHarness.processElement2(rightRecordK3V1); + + waitAllDataProcessed(); + + assertor.assertOutputEqualsSorted( + "result mismatch", expectedOutput, testHarness.getOutput()); + + TableAsyncExecutionController aec = unwrapAEC(testHarness); + assertThat(aec.getBlockingSize()).isEqualTo(0); + assertThat(aec.getInFlightSize()).isEqualTo(0); + assertThat(aec.getFinishSize()).isEqualTo(0); + + DeltaJoinCache cache = unwrapCache(testHarness); + if (enableCache) { + Map> expectedLeftCacheData = + newHashMap( + binaryrow("Tom"), + newHashMap( + binaryrow(true, "Tom"), + leftRecordK1V2.getValue(), + binaryrow(false, "Tom"), + leftRecordK2V1.getValue()), + binaryrow("Sam"), + Collections.emptyMap()); + + Map> expectedRightCacheData 
= + newHashMap( + binaryrow("Tom"), + newHashMap( + binaryrow("Tom", true), + rightRecordK1V3.getValue(), + binaryrow("Tom", false), + rightRecordK2V1.getValue()), + binaryrow("Jim"), + Collections.emptyMap()); + + verifyCacheData(cache, expectedLeftCacheData, expectedRightCacheData, 5, 3, 5, 3); + assertThat(MyAsyncFunction.leftInvokeCount.get()).isEqualTo(2); + assertThat(MyAsyncFunction.rightInvokeCount.get()).isEqualTo(2); + } else { + verifyCacheData(cache, Collections.emptyMap(), Collections.emptyMap(), 0, 0, 0, 0); + assertThat(MyAsyncFunction.leftInvokeCount.get()).isEqualTo(5); + assertThat(MyAsyncFunction.rightInvokeCount.get()).isEqualTo(5); + } + } + @TestTemplate void testCheckpointAndRestore() throws Exception { + LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE; + initTestHarness(testSpec); + initAssertor(testSpec); + // block the async function MyAsyncFunction.block(); @@ -599,19 +730,19 @@ void testCheckpointAndRestore() throws Exception { MyAsyncFunction.block(); // restoring - testHarness = createDeltaJoinOperatorTestHarness(); + testHarness = createDeltaJoinOperatorTestHarness(testSpec); testHarness.setup(); StreamingDeltaJoinOperator operator = unwrapOperator(testHarness); operator.setAsyncExecutionController( - new MyAsyncExecutionControllerDelegate(operator.getAsyncExecutionController())); + new MyAsyncExecutionControllerDelegate( + testSpec, operator.getAsyncExecutionController())); latestException = Optional.empty(); testHarness.initializeState(snapshot); testHarness.open(); - prepareOperatorRuntimeInfo(operator); aec = unwrapAEC(testHarness); assertThat(aec.getBlockingSize()).isEqualTo(2); @@ -641,6 +772,8 @@ void testCheckpointAndRestore() throws Exception { DeltaJoinCache cache = unwrapCache(testHarness); if (enableCache) { + RowType leftRowType = testSpec.getLeftInputRowType(); + RowType rightRowType = testSpec.getRightInputRowType(); Map> expectedLeftCacheData = newHashMap( binaryrow(true, "jklk1"), @@ -671,6 +804,10 
@@ void testCheckpointAndRestore() throws Exception { @TestTemplate void testClearLegacyStateWhenCheckpointing() throws Exception { + LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE; + initTestHarness(testSpec); + initAssertor(testSpec); + // block the async function MyAsyncFunction.block(); @@ -722,6 +859,10 @@ void testClearLegacyStateWhenCheckpointing() throws Exception { @TestTemplate void testMeetExceptionWhenLookup() throws Exception { + LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE; + initTestHarness(testSpec); + initAssertor(testSpec); + Throwable expectedException = new IllegalStateException("Mock to fail"); MyAsyncFunction.setExpectedThrownException(expectedException); @@ -739,6 +880,52 @@ void testMeetExceptionWhenLookup() throws Exception { .isEqualTo(expectedException); } + private void initTestHarness(AbstractTestSpec testSpec) throws Exception { + testHarness = createDeltaJoinOperatorTestHarness(testSpec); + testHarness.setup(); + testHarness.open(); + StreamingDeltaJoinOperator operator = unwrapOperator(testHarness); + // set external failure cause consumer to prevent hang + testHarness + .getEnvironment() + .setExternalFailureCauseConsumer( + error -> { + latestException = Optional.of(error); + // DO NOT throw exception up again to avoid hang + }); + operator.setAsyncExecutionController( + new MyAsyncExecutionControllerDelegate( + testSpec, operator.getAsyncExecutionController())); + } + + private void initAssertor(AbstractTestSpec testSpec) { + RowType outputRowType = testSpec.getOutputRowType(); + assertor = + new RowDataHarnessAssertor( + outputRowType.getChildren().toArray(new LogicalType[0]), + // sort the result by the output upsert key + (o1, o2) -> { + for (int keyIndex : testSpec.getOutputFieldIndices()) { + LogicalType type = outputRowType.getChildren().get(keyIndex); + RowData.FieldGetter getter = + RowData.createFieldGetter(type, keyIndex); + + int compareResult = + 
Objects.requireNonNull(getter.getFieldOrNull(o1)) + .toString() + .compareTo( + Objects.requireNonNull( + getter.getFieldOrNull(o2)) + .toString()); + + if (compareResult != 0) { + return compareResult; + } + } + return o1.toString().compareTo(o2.toString()); + }); + } + private void verifyCacheData( DeltaJoinCache actualCache, Map> expectedLeftCacheData, @@ -820,58 +1007,68 @@ private void waitAllDataProcessed() throws Exception { } private KeyedTwoInputStreamOperatorTestHarness - createDeltaJoinOperatorTestHarness() throws Exception { + createDeltaJoinOperatorTestHarness(AbstractTestSpec testSpec) throws Exception { TaskMailbox mailbox = new TaskMailboxImpl(); MailboxProcessor mailboxProcessor = new MailboxProcessor(controller -> {}, mailbox, StreamTaskActionExecutor.IMMEDIATE); DataStructureConverter leftFetcherConverter = (DataStructureConverter) - DataStructureConverters.getConverter(leftTypeInfo.getDataType()); - - RowDataKeySelector leftUpsertKeySelector = getUpsertKeySelector(leftRowType, null); - RowDataKeySelector rightUpsertKeySelector = getUpsertKeySelector(rightRowType, null); + DataStructureConverters.getConverter( + testSpec.getLeftTypeInfo().getDataType()); AsyncDeltaJoinRunner leftAsyncFunction = new AsyncDeltaJoinRunner( - new GeneratedFunctionWrapper<>(new MyAsyncFunction()), + new GeneratedFunction<>("", "", new Object[0]) { + @Override + public MyAsyncFunction newInstance(ClassLoader classLoader) { + return new MyAsyncFunction(testSpec, false); + } + }, leftFetcherConverter, new GeneratedResultFutureWrapper<>(new TestingFetcherResultFuture()), - leftTypeInfo.toRowSerializer(), - leftJoinKeySelector, - leftUpsertKeySelector, - rightJoinKeySelector, - rightUpsertKeySelector, + testSpec.getLeftTypeInfo().toRowSerializer(), + testSpec.getLeftJoinKeySelector(), + testSpec.getLeftUpsertKeySelector(), + testSpec.getRightJoinKeySelector(), + testSpec.getRightUpsertKeySelector(), AEC_CAPACITY, false, enableCache); DataStructureConverter 
rightFetcherConverter = (DataStructureConverter) - DataStructureConverters.getConverter(rightTypeInfo.getDataType()); + DataStructureConverters.getConverter( + testSpec.getRightTypeInfo().getDataType()); AsyncDeltaJoinRunner rightAsyncFunction = new AsyncDeltaJoinRunner( - new GeneratedFunctionWrapper<>(new MyAsyncFunction()), + new GeneratedFunction<>("", "", new Object[0]) { + @Override + public MyAsyncFunction newInstance(ClassLoader classLoader) { + return new MyAsyncFunction(testSpec, true); + } + }, rightFetcherConverter, new GeneratedResultFutureWrapper<>(new TestingFetcherResultFuture()), - rightTypeInfo.toRowSerializer(), - leftJoinKeySelector, - leftUpsertKeySelector, - rightJoinKeySelector, - rightUpsertKeySelector, + testSpec.getRightTypeInfo().toRowSerializer(), + testSpec.getLeftJoinKeySelector(), + testSpec.getLeftUpsertKeySelector(), + testSpec.getRightJoinKeySelector(), + testSpec.getRightUpsertKeySelector(), AEC_CAPACITY, true, enableCache); - InternalTypeInfo joinKeyTypeInfo = leftJoinKeySelector.getProducedType(); + InternalTypeInfo joinKeyTypeInfo = + testSpec.getLeftJoinKeySelector().getProducedType(); StreamingDeltaJoinOperator operator = new StreamingDeltaJoinOperator( rightAsyncFunction, leftAsyncFunction, - leftJoinKeySelector, - rightJoinKeySelector, + testSpec.getLeftJoinKeySelector(), + testSpec.getRightJoinKeySelector(), -1L, AEC_CAPACITY, new TestProcessingTimeService(), @@ -879,41 +1076,19 @@ private void waitAllDataProcessed() throws Exception { mailbox, 0, StreamTaskActionExecutor.IMMEDIATE, mailboxProcessor), CACHE_SIZE, CACHE_SIZE, - leftRowType, - rightRowType); + testSpec.getLeftInputRowType(), + testSpec.getRightInputRowType()); return new KeyedTwoInputStreamOperatorTestHarness<>( operator, - leftJoinKeySelector, - rightJoinKeySelector, + testSpec.getLeftJoinKeySelector(), + testSpec.getRightJoinKeySelector(), joinKeyTypeInfo, 1, 1, 0, - leftTypeInfo.toSerializer(), - rightTypeInfo.toSerializer()); - } - - private 
RowDataKeySelector getUpsertKeySelector(RowType rowType, @Nullable int[] upsertKey) { - if (upsertKey == null) { - upsertKey = IntStream.range(0, rowType.getFieldCount()).toArray(); - } - return HandwrittenSelectorUtil.getRowDataSelector( - upsertKey, rowType.getChildren().toArray(new LogicalType[0])); - } - - private void prepareOperatorRuntimeInfo(StreamingDeltaJoinOperator operator) { - unwrapAsyncFunction(operator, true).tagInvokingSideDuringRuntime(true); - unwrapAsyncFunction(operator, false).tagInvokingSideDuringRuntime(false); - } - - private MyAsyncFunction unwrapAsyncFunction( - StreamingDeltaJoinOperator operator, boolean unwrapLeft) { - if (unwrapLeft) { - return (MyAsyncFunction) operator.getLeftTriggeredUserFunction().getFetcher(); - } else { - return (MyAsyncFunction) operator.getRightTriggeredUserFunction().getFetcher(); - } + testSpec.getLeftTypeInfo().toSerializer(), + testSpec.getRightTypeInfo().toSerializer()); } private TableAsyncExecutionController unwrapAEC( @@ -941,36 +1116,27 @@ private DeltaJoinCache unwrapCache( return cacheInLeftRunner; } - private RowType getOutputType() { - return RowType.of( - Stream.concat( - leftRowType.getChildren().stream(), - rightRowType.getChildren().stream()) - .toArray(LogicalType[]::new), - Stream.concat( - leftRowType.getFieldNames().stream(), - rightRowType.getFieldNames().stream()) - .toArray(String[]::new)); + private void insertLeftTable(AbstractTestSpec testSpec, StreamRecord record) { + insertTableData(testSpec, record, true); } - private void insertLeftTable(StreamRecord record) { - insertTableData(record, true); + private void insertRightTable(AbstractTestSpec testSpec, StreamRecord record) { + insertTableData(testSpec, record, false); } - private void insertRightTable(StreamRecord record) { - insertTableData(record, false); - } - - private static void insertTableData(StreamRecord record, boolean insertLeftTable) { + private static void insertTableData( + AbstractTestSpec testSpec, StreamRecord 
record, boolean insertLeftTable) { RowData rowData = record.getValue(); try { if (insertLeftTable) { synchronized (leftTableCurrentData) { - leftTableCurrentData.add(rowData); + RowData upsertKey = testSpec.getLeftUpsertKeySelector().getKey(rowData); + leftTableCurrentData.put(upsertKey, rowData); } } else { synchronized (rightTableCurrentData) { - rightTableCurrentData.add(rowData); + RowData upsertKey = testSpec.getRightUpsertKeySelector().getKey(rowData); + rightTableCurrentData.put(upsertKey, rowData); } } } catch (Exception e) { @@ -1018,11 +1184,12 @@ public static class MyAsyncFunction extends RichAsyncFunction { private static Optional expectedThrownException = Optional.empty(); - // ===== runtime info ===== - private Boolean treatRightAsLookupTable; + private final AbstractTestSpec testSpec; + private final boolean treatRightAsLookupTable; - public void tagInvokingSideDuringRuntime(boolean isLeftInvoking) { - this.treatRightAsLookupTable = isLeftInvoking; + private MyAsyncFunction(AbstractTestSpec testSpec, boolean treatRightAsLookupTable) { + this.testSpec = testSpec; + this.treatRightAsLookupTable = treatRightAsLookupTable; } public static void block() throws Exception { @@ -1054,29 +1221,29 @@ public void asyncInvoke(final RowData input, final ResultFuture resultFu lock.await(); } - LinkedList lookupTableData; + HashMap lookupTableData; RowDataKeySelector streamSideJoinKeySelector; RowDataKeySelector lookupSideJoinKeySelector; - if (Objects.requireNonNull(treatRightAsLookupTable)) { + if (treatRightAsLookupTable) { synchronized (rightTableCurrentData) { - lookupTableData = new LinkedList<>(rightTableCurrentData); + lookupTableData = new HashMap<>(rightTableCurrentData); } - streamSideJoinKeySelector = leftJoinKeySelector.copy(); - lookupSideJoinKeySelector = rightJoinKeySelector.copy(); + streamSideJoinKeySelector = testSpec.getLeftJoinKeySelector(); + lookupSideJoinKeySelector = testSpec.getRightJoinKeySelector(); leftInvokeCount.incrementAndGet(); } 
else { synchronized (leftTableCurrentData) { - lookupTableData = new LinkedList<>(leftTableCurrentData); + lookupTableData = new HashMap<>(leftTableCurrentData); } - streamSideJoinKeySelector = rightJoinKeySelector.copy(); - lookupSideJoinKeySelector = leftJoinKeySelector.copy(); + streamSideJoinKeySelector = testSpec.getRightJoinKeySelector(); + lookupSideJoinKeySelector = testSpec.getLeftJoinKeySelector(); rightInvokeCount.incrementAndGet(); } List results = new ArrayList<>(); - for (RowData row : lookupTableData) { + for (RowData row : lookupTableData.values()) { if (streamSideJoinKeySelector .getKey(input) .equals(lookupSideJoinKeySelector.getKey(row))) { @@ -1139,6 +1306,7 @@ private static class MyAsyncExecutionControllerDelegate private static boolean insertTableDataAfterEmit = true; public MyAsyncExecutionControllerDelegate( + AbstractTestSpec testSpec, TableAsyncExecutionController innerAec) { super( innerAec.getAsyncInvoke(), @@ -1153,6 +1321,7 @@ public MyAsyncExecutionControllerDelegate( int inputIndex = inputIndexAwareEntry.getInputIndex(); //noinspection unchecked insertTableData( + testSpec, (StreamRecord) inputIndexAwareEntry.getInputElement(), inputIndex == 0); } @@ -1178,4 +1347,205 @@ public void complete(Collection result) { getResultFuture().complete((Collection) result); } } + + private abstract static class AbstractTestSpec { + + abstract RowType getLeftInputRowType(); + + final InternalTypeInfo getLeftTypeInfo() { + return InternalTypeInfo.of(getLeftInputRowType()); + } + + abstract Optional getLeftUpsertKey(); + + final RowDataKeySelector getLeftUpsertKeySelector() { + return getUpsertKeySelector(getLeftInputRowType(), getLeftUpsertKey().orElse(null)); + } + + abstract RowType getRightInputRowType(); + + final InternalTypeInfo getRightTypeInfo() { + return InternalTypeInfo.of(getRightInputRowType()); + } + + abstract Optional getRightUpsertKey(); + + final RowDataKeySelector getRightUpsertKeySelector() { + return 
getUpsertKeySelector(getRightInputRowType(), getRightUpsertKey().orElse(null)); + } + + abstract int[] getLeftJoinKeyIndices(); + + final RowDataKeySelector getLeftJoinKeySelector() { + return HandwrittenSelectorUtil.getRowDataSelector( + getLeftJoinKeyIndices(), + getLeftInputRowType().getChildren().toArray(new LogicalType[0])); + } + + abstract int[] getRightJoinKeyIndices(); + + final RowDataKeySelector getRightJoinKeySelector() { + return HandwrittenSelectorUtil.getRowDataSelector( + getRightJoinKeyIndices(), + getRightInputRowType().getChildren().toArray(new LogicalType[0])); + } + + final RowType getOutputRowType() { + return RowType.of( + Stream.concat( + getLeftInputRowType().getChildren().stream(), + getRightInputRowType().getChildren().stream()) + .toArray(LogicalType[]::new), + Stream.concat( + getLeftInputRowType().getFieldNames().stream(), + getRightInputRowType().getFieldNames().stream()) + .toArray(String[]::new)); + } + + final int[] getOutputFieldIndices() { + return IntStream.range(0, getOutputRowType().getFieldCount()).toArray(); + } + + private RowDataKeySelector getUpsertKeySelector( + RowType rowType, @Nullable int[] upsertKey) { + if (upsertKey == null) { + upsertKey = IntStream.range(0, rowType.getFieldCount()).toArray(); + } + return HandwrittenSelectorUtil.getRowDataSelector( + upsertKey, rowType.getChildren().toArray(new LogicalType[0])); + } + } + + /** + * Mock sql like the following. + * + *
    +     *      CREATE TABLE leftSrc(
    +     *          left_value INT,
    +     *          left_jk1 BOOLEAN,
    +     *          left_jk2_index STRING,
    +     *          INDEX(left_jk2_index)
    +     *      )
    +     * 
    + * + *
    +     *      CREATE TABLE rightSrc(
    +     *          right_jk2 STRING,
    +     *          right_value INT,
    +     *          right_jk1_index BOOLEAN,
    +     *          INDEX(right_jk1_index)
    +     *      )
    +     * 
    + * + *
    +     *      SELECT * FROM leftSrc JOIN rightSrc
    +     *          ON leftSrc.left_jk1 = rightSrc.right_jk1_index
    +     *          AND leftSrc.left_jk2_index = rightSrc.right_jk2
    +     * 
    + */ + private static class LogLogTableJoinTestSpec extends AbstractTestSpec { + + private static final LogLogTableJoinTestSpec INSTANCE = new LogLogTableJoinTestSpec(); + + @Override + RowType getLeftInputRowType() { + return RowType.of( + new LogicalType[] {new IntType(), new BooleanType(), VarCharType.STRING_TYPE}, + new String[] {"left_value", "left_jk1", "left_jk2_index"}); + } + + @Override + RowType getRightInputRowType() { + return RowType.of( + new LogicalType[] {VarCharType.STRING_TYPE, new IntType(), new BooleanType()}, + new String[] {"right_jk2", "right_value", "right_jk1_index"}); + } + + @Override + Optional getLeftUpsertKey() { + return Optional.empty(); + } + + @Override + Optional getRightUpsertKey() { + return Optional.empty(); + } + + @Override + int[] getLeftJoinKeyIndices() { + return new int[] {1, 2}; + } + + @Override + int[] getRightJoinKeyIndices() { + return new int[] {2, 0}; + } + } + + /** + * Mock sql like the following. + * + *
    +     *      CREATE TABLE leftSrc(
    +     *          left_value INT,
    +     *          left_pk1 BOOLEAN,
    +     *          left_pk2_jk_index STRING,
    +     *          PRIMARY KEY (left_pk1, left_pk2_jk_index) NOT ENFORCED,
    +     *          INDEX(left_pk2_jk_index)
    +     *      )
    +     * 
    + * + *
    +     *      CREATE TABLE rightSrc(
    +     *          right_pk2_jk_index STRING,
    +     *          right_value INT,
    +     *          right_pk1 BOOLEAN,
    +     *          PRIMARY KEY (right_pk2_jk_index, right_pk1) NOT ENFORCED,
    +     *          INDEX(right_pk2_jk_index)
    +     *      )
    +     * 
    + * + *
    +     *      SELECT * FROM leftSrc JOIN rightSrc
    +     *          ON leftSrc.left_pk2_jk_index = rightSrc.right_pk2_jk_index
    +     * 
    + */ + private static class PkPkTableJoinTestSpec extends AbstractTestSpec { + + private static final PkPkTableJoinTestSpec INSTANCE = new PkPkTableJoinTestSpec(); + + @Override + RowType getLeftInputRowType() { + return RowType.of( + new LogicalType[] {new IntType(), new BooleanType(), VarCharType.STRING_TYPE}, + new String[] {"left_value", "left_pk1", "left_pk2_jk_index"}); + } + + @Override + RowType getRightInputRowType() { + return RowType.of( + new LogicalType[] {VarCharType.STRING_TYPE, new IntType(), new BooleanType()}, + new String[] {"right_pk2_jk_index", "right_value", "right_pk1"}); + } + + @Override + Optional getLeftUpsertKey() { + return Optional.of(new int[] {1, 2}); + } + + @Override + Optional getRightUpsertKey() { + return Optional.of(new int[] {0, 2}); + } + + @Override + int[] getLeftJoinKeyIndices() { + return new int[] {2}; + } + + @Override + int[] getRightJoinKeyIndices() { + return new int[] {0}; + } + } }