Skip to content

Commit b590638

Browse files
committed
[FLINK-38504][table] Add restore tests for delta join
1 parent 17862df commit b590638

File tree

21 files changed

+2384
-38
lines changed

21 files changed

+2384
-38
lines changed

flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020

2121
import org.apache.flink.table.catalog.TableDistribution;
2222
import org.apache.flink.types.Row;
23+
import org.apache.flink.util.Preconditions;
2324

2425
import javax.annotation.Nullable;
2526

2627
import java.util.ArrayList;
2728
import java.util.Arrays;
2829
import java.util.Collections;
30+
import java.util.HashMap;
2931
import java.util.List;
3032
import java.util.Map;
3133
import java.util.stream.Collectors;
@@ -42,6 +44,7 @@ public final class SinkTestStep extends TableTestStep {
4244
public final @Nullable List<Row> expectedMaterializedRows;
4345
public final @Nullable List<String> expectedMaterializedStrings;
4446
public final boolean testChangelogData;
47+
public final @Nullable int[] deduplicatedFieldIndices;
4548

4649
SinkTestStep(
4750
String name,
@@ -55,17 +58,16 @@ public final class SinkTestStep extends TableTestStep {
5558
@Nullable List<String> expectedAfterRestoreStrings,
5659
@Nullable List<Row> expectedMaterializedRows,
5760
@Nullable List<String> expectedMaterializedStrings,
58-
boolean testChangelogData) {
59-
super(name, schemaComponents, distribution, partitionKeys, options);
60-
boolean hasRowsSet =
61-
expectedBeforeRestore != null
62-
|| expectedAfterRestore != null
63-
|| expectedMaterializedRows != null;
64-
boolean hasStringsSet =
65-
expectedBeforeRestoreStrings != null
66-
|| expectedAfterRestoreStrings != null
67-
|| expectedMaterializedStrings != null;
68-
if (hasRowsSet && hasStringsSet) {
61+
boolean testChangelogData,
62+
@Nullable int[] deduplicatedFieldIndices) {
63+
super(
64+
name,
65+
schemaComponents,
66+
distribution,
67+
partitionKeys,
68+
options,
69+
Collections.emptyList());
70+
if (hasRowsSet() && hasStringsSet()) {
6971
throw new IllegalArgumentException(
7072
"You can not mix Row/String representations in restore data.");
7173
}
@@ -76,6 +78,12 @@ public final class SinkTestStep extends TableTestStep {
7678
this.expectedMaterializedRows = expectedMaterializedRows;
7779
this.expectedMaterializedStrings = expectedMaterializedStrings;
7880
this.testChangelogData = testChangelogData;
81+
this.deduplicatedFieldIndices = deduplicatedFieldIndices;
82+
83+
if (deduplicatedFieldIndices != null && !hasRowsSet()) {
84+
throw new IllegalArgumentException(
85+
"DeduplicatedFieldIndices can only be used for Row representations in restore data.");
86+
}
7987
}
8088

8189
/** Builder for creating a {@link SinkTestStep}. */
@@ -108,16 +116,41 @@ public List<String> getExpectedAfterRestoreAsStrings() {
108116
}
109117

110118
public List<String> getExpectedAsStrings() {
111-
final List<String> data = new ArrayList<>(getExpectedBeforeRestoreAsStrings());
112-
data.addAll(getExpectedAfterRestoreAsStrings());
113-
return data;
119+
if (hasStringsSet() || deduplicatedFieldIndices == null) {
120+
final List<String> data = new ArrayList<>(getExpectedBeforeRestoreAsStrings());
121+
data.addAll(getExpectedAfterRestoreAsStrings());
122+
return data;
123+
}
124+
Preconditions.checkState(hasRowsSet());
125+
final List<Row> data = new ArrayList<>();
126+
if (expectedBeforeRestore != null) {
127+
data.addAll(expectedBeforeRestore);
128+
}
129+
if (expectedAfterRestore != null) {
130+
data.addAll(expectedAfterRestore);
131+
}
132+
133+
Map<List<Object>, Row> deduplicatedMap = new HashMap<>();
134+
for (Row row : data) {
135+
List<Object> key = new ArrayList<>(deduplicatedFieldIndices.length);
136+
for (int i = 0; i < deduplicatedFieldIndices.length; i++) {
137+
key.add(row.getField(deduplicatedFieldIndices[i]));
138+
}
139+
deduplicatedMap.put(key, row);
140+
}
141+
return deduplicatedMap.values().stream().map(Row::toString).collect(Collectors.toList());
114142
}
115143

116144
/**
 * Returns the expected materialized results as strings.
 *
 * <p>Explicitly declared materialized strings take precedence. Materialized rows are rendered
 * via {@link Row#toString()}.
 *
 * @throws UnsupportedOperationException if materialized rows are set together with {@code
 *     deduplicatedFieldIndices}, because deduplication is not applied to materialized rows
 */
public List<String> getExpectedMaterializedResultsAsStrings() {

    if (expectedMaterializedStrings != null) {
        return expectedMaterializedStrings;
    }
    if (expectedMaterializedRows != null) {
        // Reject only when deduplication WAS requested; without indices the rows are
        // returned as declared.
        if (deduplicatedFieldIndices != null) {
            throw new UnsupportedOperationException(
                    "Unsupported to deduplicate data for materialized rows");
        }
        return expectedMaterializedRows.stream()
                .map(Row::toString)
                .collect(Collectors.toList());
@@ -138,6 +171,18 @@ public boolean shouldTestChangelogData() {
138171
return testChangelogData;
139172
}
140173

174+
private boolean hasRowsSet() {
175+
return expectedBeforeRestore != null
176+
|| expectedAfterRestore != null
177+
|| expectedMaterializedRows != null;
178+
}
179+
180+
private boolean hasStringsSet() {
181+
return expectedBeforeRestoreStrings != null
182+
|| expectedAfterRestoreStrings != null
183+
|| expectedMaterializedStrings != null;
184+
}
185+
141186
/** Builder pattern for {@link SinkTestStep}. */
142187
public static final class Builder extends AbstractBuilder<Builder> {
143188

@@ -151,6 +196,8 @@ public static final class Builder extends AbstractBuilder<Builder> {
151196

152197
private boolean testChangelogData = true;
153198

199+
private @Nullable int[] deduplicatedFieldIndices;
200+
154201
private Builder(String name) {
155202
super(name);
156203
}
@@ -203,6 +250,14 @@ public Builder testMaterializedData() {
203250
return this;
204251
}
205252

253+
public Builder deduplicatedFieldIndices(int[] deduplicatedFieldIndices) {
254+
// TODO FLINK-38518 use pk to deduplicate data rather than specific fields.
255+
// This task requires refactoring the current `AbstractBuilder` to separate the
256+
// declaration of the primary key from the `List<String> schemaComponents`.
257+
this.deduplicatedFieldIndices = deduplicatedFieldIndices;
258+
return this;
259+
}
260+
206261
public SinkTestStep build() {
207262
return new SinkTestStep(
208263
name,
@@ -216,7 +271,8 @@ public SinkTestStep build() {
216271
expectedAfterRestoreStrings,
217272
expectedMaterializedBeforeRows,
218273
expectedMaterializedBeforeStrings,
219-
testChangelogData);
274+
testChangelogData,
275+
deduplicatedFieldIndices);
220276
}
221277
}
222278
}

flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SourceTestStep.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,22 @@ public final class SourceTestStep extends TableTestStep {
3434
public final List<Row> dataBeforeRestore;
3535
public final List<Row> dataAfterRestore;
3636

37+
public final boolean treatDataBeforeRestoreAsFullStageData;
38+
3739
/**
 * Creates a source step. The table-level arguments (including {@code indexes}) are handed to
 * the parent constructor; the row data and the full-stage flag are stored on this step as
 * given.
 */
SourceTestStep(
        String name,
        List<String> schemaComponents,
        @Nullable TableDistribution distribution,
        List<String> partitionKeys,
        Map<String, String> options,
        List<List<String>> indexes,
        List<Row> dataBeforeRestore,
        List<Row> dataAfterRestore,
        boolean treatDataBeforeRestoreAsFullStageData) {
    super(name, schemaComponents, distribution, partitionKeys, options, indexes);
    this.treatDataBeforeRestoreAsFullStageData = treatDataBeforeRestoreAsFullStageData;
    this.dataAfterRestore = dataAfterRestore;
    this.dataBeforeRestore = dataBeforeRestore;
}
4954

5055
/** Builder for creating a {@link SourceTestStep}. */
@@ -66,6 +71,8 @@ public static final class Builder extends AbstractBuilder<Builder> {
6671

6772
private final List<Row> dataBeforeRestore = new ArrayList<>();
6873
private final List<Row> dataAfterRestore = new ArrayList<>();
74+
private final List<List<String>> indexes = new ArrayList<>();
75+
private boolean treatDataBeforeRestoreAsFullStageData = false;
6976

7077
private Builder(String name) {
7178
super(name);
@@ -85,15 +92,27 @@ public Builder producedAfterRestore(Row... data) {
8592
return this;
8693
}
8794

95+
public Builder addIndex(String... index) {
96+
this.indexes.add(Arrays.asList(index));
97+
return this;
98+
}
99+
100+
public Builder treatDataBeforeRestoreAsFullStageData() {
101+
this.treatDataBeforeRestoreAsFullStageData = true;
102+
return this;
103+
}
104+
88105
public SourceTestStep build() {
89106
return new SourceTestStep(
90107
name,
91108
schemaComponents,
92109
distribution,
93110
partitionKeys,
94111
options,
112+
indexes,
95113
dataBeforeRestore,
96-
dataAfterRestore);
114+
dataAfterRestore,
115+
treatDataBeforeRestoreAsFullStageData);
97116
}
98117
}
99118
}

flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,16 @@ public Builder setupTableSource(SourceTestStep sourceTestStep) {
317317
return this;
318318
}
319319

320+
/**
321+
* Setup steps for each table source.
322+
*
323+
* <p>Use {@link SourceTestStep.Builder} to construct this step.
324+
*/
325+
public Builder setupTableSources(List<SourceTestStep> sourceTestSteps) {
326+
setupSteps.addAll(sourceTestSteps);
327+
return this;
328+
}
329+
320330
/**
321331
* Setup step for a table sink.
322332
*
@@ -327,6 +337,16 @@ public Builder setupTableSink(SinkTestStep sinkTestStep) {
327337
return this;
328338
}
329339

340+
/**
341+
* Setup steps for each table sink.
342+
*
343+
* <p>Use {@link SinkTestStep.Builder} to construct this step.
344+
*/
345+
public Builder setupTableSinks(List<SinkTestStep> sinkTestSteps) {
346+
setupSteps.addAll(sinkTestSteps);
347+
return this;
348+
}
349+
330350
/**
331351
* Setup step for a model.
332352
*

flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,18 @@
2020

2121
import org.apache.flink.configuration.ConfigOption;
2222
import org.apache.flink.configuration.ConfigurationUtils;
23+
import org.apache.flink.table.api.Schema;
2324
import org.apache.flink.table.api.TableEnvironment;
24-
import org.apache.flink.table.api.TableResult;
25+
import org.apache.flink.table.catalog.Catalog;
26+
import org.apache.flink.table.catalog.CatalogTable;
27+
import org.apache.flink.table.catalog.ObjectPath;
2528
import org.apache.flink.table.catalog.TableDistribution;
29+
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
30+
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
31+
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
2632
import org.apache.flink.table.connector.ChangelogMode;
2733
import org.apache.flink.types.RowKind;
34+
import org.apache.flink.util.Preconditions;
2835

2936
import javax.annotation.Nullable;
3037

@@ -58,25 +65,28 @@ public abstract class TableTestStep implements TestStep {
5865
public final @Nullable TableDistribution distribution;
5966
public final List<String> partitionKeys;
6067
public final Map<String, String> options;
68+
public final List<List<String>> indexes;
6169

6270
/** Stores the table definition parts (name, schema, layout, options, indexes) as given. */
TableTestStep(
        String name,
        List<String> schemaComponents,
        @Nullable TableDistribution distribution,
        List<String> partitionKeys,
        Map<String, String> options,
        List<List<String>> indexes) {
    // Identity and schema.
    this.name = name;
    this.schemaComponents = schemaComponents;
    this.indexes = indexes;
    // Physical layout and connector options.
    this.distribution = distribution;
    this.partitionKeys = partitionKeys;
    this.options = options;
}
7484

75-
public TableResult apply(TableEnvironment env) {
76-
return apply(env, Collections.emptyMap());
85+
public void apply(TableEnvironment env) {
86+
apply(env, Collections.emptyMap());
7787
}
7888

79-
public TableResult apply(TableEnvironment env, Map<String, String> extraOptions) {
89+
public void apply(TableEnvironment env, Map<String, String> extraOptions) {
8090
final Map<String, String> allOptions = new HashMap<>(options);
8191
allOptions.putAll(extraOptions);
8292

@@ -97,7 +107,41 @@ public TableResult apply(TableEnvironment env, Map<String, String> extraOptions)
97107
.map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
98108
.collect(Collectors.joining(",\n")));
99109

100-
return env.executeSql(createTable);
110+
env.executeSql(createTable);
111+
if (indexes.isEmpty()) {
112+
return;
113+
}
114+
115+
Optional<Catalog> currentCatalogOp = env.getCatalog(env.getCurrentCatalog());
116+
Preconditions.checkState(currentCatalogOp.isPresent());
117+
Catalog catalog = currentCatalogOp.get();
118+
119+
String currentDatabaseName = env.getCurrentDatabase();
120+
ObjectPath tablePath = new ObjectPath(currentDatabaseName, name);
121+
CatalogTable oldTable;
122+
try {
123+
oldTable = (CatalogTable) catalog.getTable(tablePath);
124+
catalog.dropTable(tablePath, false);
125+
} catch (TableNotExistException e) {
126+
throw new IllegalStateException(e);
127+
}
128+
Schema schema = oldTable.getUnresolvedSchema();
129+
Schema.Builder schemaBuilder = Schema.newBuilder().fromSchema(schema);
130+
indexes.forEach(schemaBuilder::index);
131+
CatalogTable newTable =
132+
CatalogTable.newBuilder()
133+
.schema(schemaBuilder.build())
134+
.comment(oldTable.getComment())
135+
.partitionKeys(oldTable.getPartitionKeys())
136+
.options(oldTable.getOptions())
137+
.snapshot(oldTable.getSnapshot().orElse(null))
138+
.distribution(oldTable.getDistribution().orElse(null))
139+
.build();
140+
try {
141+
catalog.createTable(tablePath, newTable, false);
142+
} catch (TableAlreadyExistException | DatabaseNotExistException e) {
143+
throw new IllegalStateException(e);
144+
}
101145
}
102146

103147
/** Builder pattern for {@link SourceTestStep} and {@link SinkTestStep}. */

0 commit comments

Comments
 (0)