Skip to content

Commit 7026c3c

Browse files
committed
add restore test for calc on source
1 parent b26bebe commit 7026c3c

File tree

26 files changed

+3470
-62
lines changed

26 files changed

+3470
-62
lines changed

flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SourceTestStep.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,19 @@ public static Builder newBuilder(String name) {
5757
return new Builder(name);
5858
}
5959

60+
public SourceTestStep withNewOptions(Map<String, String> newOptions) {
61+
return new SourceTestStep(
62+
name,
63+
schemaComponents,
64+
distribution,
65+
partitionKeys,
66+
newOptions,
67+
indexes,
68+
dataBeforeRestore,
69+
dataAfterRestore,
70+
treatDataBeforeRestoreAsFullStageData);
71+
}
72+
6073
@Override
6174
public TestKind getKind() {
6275
return dataBeforeRestore.isEmpty()

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeltaJoinRestoreTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,13 @@ public List<TableTestProgram> programs() {
3737
DeltaJoinTestPrograms.DELTA_JOIN_WITH_JOIN_KEY_EQUALS_INDEX,
3838
DeltaJoinTestPrograms.DELTA_JOIN_WITH_JOIN_KEY_CONTAINS_INDEX,
3939
DeltaJoinTestPrograms.DELTA_JOIN_WITH_NON_EQUIV_CONDITION,
40+
DeltaJoinTestPrograms.DELTA_JOIN_WITH_CALC_ON_SOURCE,
41+
DeltaJoinTestPrograms.DELTA_JOIN_WITH_CALC_ON_SOURCE_AND_FILTER_PUSHED_DOWN,
4042
DeltaJoinTestPrograms.DELTA_JOIN_WITH_CACHE,
43+
DeltaJoinTestPrograms.DELTA_JOIN_WITH_CACHE_AND_CALC_ON_SOURCE,
4144
DeltaJoinTestPrograms.DELTA_JOIN_WITH_CDC_SOURCE_WITHOUT_DELETE,
42-
DeltaJoinTestPrograms.DELTA_JOIN_WITH_CACHE_AND_CDC_SOURCE_WITHOUT_DELETE);
45+
DeltaJoinTestPrograms.DELTA_JOIN_WITH_CALC_ON_CDC_SOURCE_WITHOUT_DELETE,
46+
DeltaJoinTestPrograms.DELTA_JOIN_WITH_CACHE_AND_CDC_SOURCE_WITHOUT_DELETE,
47+
DeltaJoinTestPrograms.DELTA_JOIN_WITH_CACHE_AND_CALC_ON_CDC_SOURCE_WITHOUT_DELETE);
4348
}
4449
}

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeltaJoinTestPrograms.java

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.types.RowKind;
2828

2929
import java.util.Arrays;
30+
import java.util.HashMap;
3031
import java.util.Map;
3132
import java.util.stream.Collectors;
3233
import java.util.stream.Stream;
@@ -168,6 +169,70 @@ public class DeltaJoinTestPrograms {
168169
+ "on a1 = b1 and a2 <> b2")
169170
.build();
170171

172+
public static final TableTestProgram DELTA_JOIN_WITH_CALC_ON_SOURCE =
173+
TableTestProgram.of(
174+
"delta-join-with-calc-on-source",
175+
"validates delta join with calc on source")
176+
.setupConfig(
177+
OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
178+
OptimizerConfigOptions.DeltaJoinStrategy.FORCE)
179+
.setupTableSources(
180+
DELTA_JOIN_WITH_JOIN_KEY_EQUALS_INDEX.getSetupSourceTestSteps())
181+
.setupTableSink(
182+
SinkTestStep.newBuilder("snk")
183+
.addSchema(addPk2Schema(SINK_TABLE_BASE_SCHEMA, "a0", "b0"))
184+
.addOptions(TABLE_BASE_OPTIONS)
185+
.testMaterializedData()
186+
// deduplicate data by pk
187+
.deduplicatedFieldIndices(new int[] {1, 3})
188+
.consumedBeforeRestore(Row.of(6, 5.0, "l-5-1", 5.0, "r-5-1", 6))
189+
.consumedAfterRestore(Row.of(6, 5.0, "l-5-2", 5.0, "r-5-1", 6))
190+
.build())
191+
.runSql(
192+
"insert into snk "
193+
+ "select new_a1, a0, a2, b0, b2, new_b1 from ( "
194+
+ " select a0, a1, a2, a1 + 1 as new_a1 from leftSrc "
195+
+ " where a1 = 1 or a1 = 5 "
196+
+ ") join ("
197+
+ " select b0, b1, b1 + 1 as new_b1, b2 from rightSrc "
198+
+ " where b0 = cast(3.0 as double) or b0 = cast(5.0 as double) "
199+
+ ") "
200+
+ "on a1 = b1 and a0 = b0")
201+
.build();
202+
203+
public static final TableTestProgram DELTA_JOIN_WITH_CALC_ON_SOURCE_AND_FILTER_PUSHED_DOWN =
204+
TableTestProgram.of(
205+
"delta-join-with-calc-on-source-and-filter-pushed-down",
206+
"validates delta join with calc on source and filter pushed down")
207+
.setupConfig(
208+
OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
209+
OptimizerConfigOptions.DeltaJoinStrategy.FORCE)
210+
.setupConfig(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED, true)
211+
.setupTableSources(
212+
DELTA_JOIN_WITH_CALC_ON_SOURCE.getSetupSourceTestSteps().stream()
213+
.map(
214+
sourceTestStep -> {
215+
String filterableFields;
216+
if (sourceTestStep.name.equals("leftSrc")) {
217+
filterableFields = "a1";
218+
} else if (sourceTestStep.name.equals("rightSrc")) {
219+
filterableFields = "b0";
220+
} else {
221+
throw new IllegalStateException(
222+
"Unknown test table name: "
223+
+ sourceTestStep.name);
224+
}
225+
Map<String, String> oldOptions =
226+
new HashMap<>(sourceTestStep.options);
227+
oldOptions.put(
228+
"filterable-fields", filterableFields);
229+
return sourceTestStep.withNewOptions(oldOptions);
230+
})
231+
.collect(Collectors.toList()))
232+
.setupTableSinks(DELTA_JOIN_WITH_CALC_ON_SOURCE.getSetupSinkTestSteps())
233+
.runSql(DELTA_JOIN_WITH_CALC_ON_SOURCE.getRunSqlTestStep().sql)
234+
.build();
235+
171236
public static final TableTestProgram DELTA_JOIN_WITH_CACHE =
172237
TableTestProgram.of("delta-join-with-cache", "validates delta join with cache")
173238
.setupConfig(
@@ -180,6 +245,19 @@ public class DeltaJoinTestPrograms {
180245
.runSql(DELTA_JOIN_WITH_NON_EQUIV_CONDITION.getRunSqlTestStep().sql)
181246
.build();
182247

248+
public static final TableTestProgram DELTA_JOIN_WITH_CACHE_AND_CALC_ON_SOURCE =
249+
TableTestProgram.of(
250+
"delta-join-with-cache-and-calc-on-source",
251+
"validates delta join with cache and calc on source")
252+
.setupConfig(
253+
OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
254+
OptimizerConfigOptions.DeltaJoinStrategy.FORCE)
255+
.setupConfig(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED, true)
256+
.setupTableSources(DELTA_JOIN_WITH_CALC_ON_SOURCE.getSetupSourceTestSteps())
257+
.setupTableSinks(DELTA_JOIN_WITH_CALC_ON_SOURCE.getSetupSinkTestSteps())
258+
.runSql(DELTA_JOIN_WITH_CALC_ON_SOURCE.getRunSqlTestStep().sql)
259+
.build();
260+
183261
public static final TableTestProgram DELTA_JOIN_WITH_CDC_SOURCE_WITHOUT_DELETE =
184262
TableTestProgram.of(
185263
"delta-join-with-cdc-source-without-delete",
@@ -259,6 +337,44 @@ public class DeltaJoinTestPrograms {
259337
+ "on a1 = b1")
260338
.build();
261339

340+
public static final TableTestProgram DELTA_JOIN_WITH_CALC_ON_CDC_SOURCE_WITHOUT_DELETE =
341+
TableTestProgram.of(
342+
"delta-join-with-calc-on-cdc-source-without-delete",
343+
"validates delta join with calc on cdc source without delete ")
344+
.setupConfig(
345+
OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
346+
OptimizerConfigOptions.DeltaJoinStrategy.FORCE)
347+
.setupConfig(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED, true)
348+
.setupTableSources(
349+
DELTA_JOIN_WITH_CDC_SOURCE_WITHOUT_DELETE.getSetupSourceTestSteps())
350+
.setupTableSink(
351+
SinkTestStep.newBuilder("snk")
352+
.addSchema(
353+
addPk2Schema(
354+
SINK_TABLE_BASE_SCHEMA, "a0", "b0", "a1", "b1"))
355+
.addOptions(TABLE_BASE_OPTIONS)
356+
.testMaterializedData()
357+
// deduplicate data by pk
358+
.deduplicatedFieldIndices(new int[] {0, 1, 3, 5})
359+
.consumedBeforeRestore(
360+
Row.of(1, 1.0, "left-pk1-2-s", 2.0, "right-pk1-1-s", 1),
361+
Row.of(1, 2.0, "left-pk2-1-s", 2.0, "right-pk1-1-s", 1))
362+
.consumedAfterRestore(
363+
Row.of(1, 1.0, "left-pk1-2-s", 2.0, "right-pk1-2-s", 1),
364+
Row.of(1, 2.0, "left-pk2-2-s", 2.0, "right-pk1-2-s", 1))
365+
.build())
366+
.runSql(
367+
"insert into snk "
368+
+ "select a1, a0, new_a2, b0, new_b2, b1 from ( "
369+
+ " select a0, a1, a2, concat(a2, '-s') as new_a2 from leftSrc "
370+
+ " where a0 = cast(1.0 as double) or a0 = cast(2.0 as double) "
371+
+ ") join ("
372+
+ " select b0, b1, concat(b2, '-s') as new_b2, b2 from rightSrc "
373+
+ " where b0 = cast(2.0 as double) or b0 = cast(3.0 as double) "
374+
+ ") "
375+
+ "on a1 = b1")
376+
.build();
377+
262378
public static final TableTestProgram DELTA_JOIN_WITH_CACHE_AND_CDC_SOURCE_WITHOUT_DELETE =
263379
TableTestProgram.of(
264380
"delta-join-with-cache-and-cdc-source-without-delete",
@@ -274,6 +390,29 @@ public class DeltaJoinTestPrograms {
274390
.runSql(DELTA_JOIN_WITH_CDC_SOURCE_WITHOUT_DELETE.getRunSqlTestStep().sql)
275391
.build();
276392

393+
public static final TableTestProgram
394+
DELTA_JOIN_WITH_CACHE_AND_CALC_ON_CDC_SOURCE_WITHOUT_DELETE =
395+
TableTestProgram.of(
396+
"delta-join-with-cache-and-calc-on-cdc-source-without-delete",
397+
"validates delta join with cache and calc on cdc source without delete")
398+
.setupConfig(
399+
OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
400+
OptimizerConfigOptions.DeltaJoinStrategy.FORCE)
401+
.setupConfig(
402+
ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED,
403+
true)
404+
.setupTableSources(
405+
DELTA_JOIN_WITH_CALC_ON_CDC_SOURCE_WITHOUT_DELETE
406+
.getSetupSourceTestSteps())
407+
.setupTableSinks(
408+
DELTA_JOIN_WITH_CALC_ON_CDC_SOURCE_WITHOUT_DELETE
409+
.getSetupSinkTestSteps())
410+
.runSql(
411+
DELTA_JOIN_WITH_CALC_ON_CDC_SOURCE_WITHOUT_DELETE
412+
.getRunSqlTestStep()
413+
.sql)
414+
.build();
415+
277416
private static String[] addPk2Schema(String[] originalSchema, String... pkCols) {
278417
return Stream.concat(
279418
Arrays.stream(originalSchema),

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,6 @@ public void generateTestSetupFiles(TableTestProgram program) throws Exception {
326326
Files.move(savepointPath, savepointDirPath, StandardCopyOption.ATOMIC_MOVE);
327327
}
328328

329-
// @Disabled
330329
@ParameterizedTest
331330
@MethodSource("createSpecs")
332331
@Order(1)

0 commit comments

Comments
 (0)