Skip to content

Commit 4719928

Browse files
committed
[FLINK-34467] add lineage integration for jdbc connector
1 parent 134d858 commit 4719928

File tree

57 files changed

+2264
-20
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+2264
-20
lines changed

.github/workflows/backwards_compatibility.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ jobs:
2929
runs-on: ubuntu-latest
3030
strategy:
3131
matrix:
32-
flink: [1.18-SNAPSHOT, 1.19-SNAPSHOT]
32+
flink: [1.19-SNAPSHOT]
3333
jdk: [8, 11, 17]
3434

3535
env:

.github/workflows/weekly.yml

+1-5
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,7 @@ jobs:
2929
if: github.repository_owner == 'apache'
3030
strategy:
3131
matrix:
32-
flink_branches: [{
33-
flink: 1.19-SNAPSHOT,
34-
jdk: '8, 11, 17, 21',
35-
branch: main
36-
},
32+
flink_branches: [
3733
{
3834
flink: 1.20-SNAPSHOT,
3935
jdk: '8, 11, 17, 21',

.java-version

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Method <org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource.getLineageVertex()> calls method <org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator.getSqlTemplate()> in (JdbcSource.java:215)

flink-connector-jdbc-core/archunit-violations/6cdea252-f400-4c13-bc99-b325f2ebe333

+3-1
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,11 @@ Method <org.apache.flink.connector.jdbc.core.table.sink.JdbcOutputFormatBuilder.
4848
Method <org.apache.flink.connector.jdbc.core.table.sink.JdbcOutputFormatBuilder.getPrimaryKey(org.apache.flink.table.data.RowData, [Lorg.apache.flink.table.data.RowData$FieldGetter;)> has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (JdbcOutputFormatBuilder.java:0)
4949
Method <org.apache.flink.connector.jdbc.core.table.sink.JdbcOutputFormatBuilder.setFieldDataTypes([Lorg.apache.flink.table.types.DataType;)> has parameter of type <[Lorg.apache.flink.table.types.DataType;> in (JdbcOutputFormatBuilder.java:0)
5050
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.createInputSplits(int)> has return type <[Lorg.apache.flink.core.io.InputSplit;> in (JdbcRowDataInputFormat.java:0)
51-
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.getInputSplitAssigner([Lorg.apache.flink.core.io.InputSplit;)> calls constructor <org.apache.flink.api.common.io.DefaultInputSplitAssigner.<init>([Lorg.apache.flink.core.io.InputSplit;)> in (JdbcRowDataInputFormat.java:287)
51+
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.getInputSplitAssigner([Lorg.apache.flink.core.io.InputSplit;)> calls constructor <org.apache.flink.api.common.io.DefaultInputSplitAssigner.<init>([Lorg.apache.flink.core.io.InputSplit;)> in (JdbcRowDataInputFormat.java:295)
5252
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.getInputSplitAssigner([Lorg.apache.flink.core.io.InputSplit;)> has parameter of type <[Lorg.apache.flink.core.io.InputSplit;> in (JdbcRowDataInputFormat.java:0)
5353
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataLookupFunction.getDbConnection()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (JdbcRowDataLookupFunction.java:0)
54+
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataLookupFunction.getLineageVertex()> calls method <org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(org.apache.flink.table.types.DataType)> in (JdbcRowDataLookupFunction.java:242)
55+
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataLookupFunction.getLineageVertex()> calls method <org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType(org.apache.flink.table.types.logical.LogicalType)> in (JdbcRowDataLookupFunction.java:243)
5456
Method <org.apache.flink.connector.jdbc.datasource.connections.xa.SimpleXaConnectionProvider.from(javax.sql.XADataSource)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SimpleXaConnectionProvider.java:0)
5557
Method <org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId.deserialize([B)> calls constructor <org.apache.flink.core.memory.DataInputDeserializer.<init>([B)> in (TransactionId.java:96)
5658
Method <org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId.deserialize([B)> calls method <org.apache.flink.core.memory.DataInputDeserializer.readInt()> in (TransactionId.java:101)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
org.apache.flink.connector.jdbc.lineage.DefaultJdbcExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
2+
org.apache.flink.connector.jdbc.lineage.JdbcLocation$Builder.build(): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
3+
org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
4+
org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated

flink-connector-jdbc-core/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ under the License.
5656
<optional>true</optional>
5757
</dependency>
5858

59+
<dependency>
60+
<groupId>io.openlineage</groupId>
61+
<artifactId>openlineage-sql-java</artifactId>
62+
</dependency>
63+
5964
<!-- Tests -->
6065

6166
<dependency>

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,23 @@
2424
import org.apache.flink.api.common.io.InputFormat;
2525
import org.apache.flink.api.common.io.RichInputFormat;
2626
import org.apache.flink.api.common.io.statistics.BaseStatistics;
27+
import org.apache.flink.api.connector.source.Boundedness;
2728
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
2829
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2930
import org.apache.flink.configuration.Configuration;
3031
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
3132
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder;
3233
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
3334
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
35+
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
36+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
3437
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
3538
import org.apache.flink.core.io.GenericInputSplit;
3639
import org.apache.flink.core.io.InputSplit;
3740
import org.apache.flink.core.io.InputSplitAssigner;
41+
import org.apache.flink.streaming.api.lineage.LineageDataset;
42+
import org.apache.flink.streaming.api.lineage.LineageVertex;
43+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
3844
import org.apache.flink.types.Row;
3945
import org.apache.flink.util.Preconditions;
4046

@@ -53,6 +59,8 @@
5359
import java.sql.Time;
5460
import java.sql.Timestamp;
5561
import java.util.Arrays;
62+
import java.util.Collections;
63+
import java.util.Optional;
5664

5765
/**
5866
* InputFormat to read data from a database and generate Rows. The InputFormat has to be configured
@@ -107,7 +115,7 @@
107115
@Deprecated
108116
@Experimental
109117
public class JdbcInputFormat extends RichInputFormat<Row, InputSplit>
110-
implements ResultTypeQueryable<Row> {
118+
implements LineageVertexProvider, ResultTypeQueryable<Row> {
111119

112120
protected static final long serialVersionUID = 2L;
113121
protected static final Logger LOG = LoggerFactory.getLogger(JdbcInputFormat.class);
@@ -344,6 +352,19 @@ public static JdbcInputFormatBuilder buildJdbcInputFormat() {
344352
return new JdbcInputFormatBuilder();
345353
}
346354

355+
@Override
356+
public LineageVertex getLineageVertex() {
357+
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
358+
new DefaultTypeDatasetFacet(getProducedType());
359+
Optional<String> nameOpt = LineageUtils.nameOf(queryTemplate);
360+
String namespace = LineageUtils.namespaceOf(connectionProvider);
361+
LineageDataset dataset =
362+
LineageUtils.datasetOf(
363+
nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet));
364+
return LineageUtils.sourceLineageVertexOf(
365+
Boundedness.BOUNDED, Collections.singleton(dataset));
366+
}
367+
347368
/** Builder for {@link JdbcInputFormat}. */
348369
public static class JdbcInputFormatBuilder {
349370
private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,16 @@
3434
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
3535
import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
3636
import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
37+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
3738
import org.apache.flink.core.io.SimpleVersionedSerializer;
39+
import org.apache.flink.streaming.api.lineage.LineageDataset;
40+
import org.apache.flink.streaming.api.lineage.LineageVertex;
41+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
3842

3943
import java.io.IOException;
4044
import java.util.Collection;
4145
import java.util.Collections;
46+
import java.util.Optional;
4247

4348
/**
4449
* Flink Sink to produce data into a jdbc database.
@@ -47,7 +52,9 @@
4752
*/
4853
@PublicEvolving
4954
public class JdbcSink<IN>
50-
implements StatefulSink<IN, JdbcWriterState>, TwoPhaseCommittingSink<IN, JdbcCommitable> {
55+
implements LineageVertexProvider,
56+
StatefulSink<IN, JdbcWriterState>,
57+
TwoPhaseCommittingSink<IN, JdbcCommitable> {
5158

5259
private final DeliveryGuarantee deliveryGuarantee;
5360
private final JdbcConnectionProvider connectionProvider;
@@ -113,4 +120,13 @@ public SimpleVersionedSerializer<JdbcCommitable> getCommittableSerializer() {
113120
public SimpleVersionedSerializer<JdbcWriterState> getWriterStateSerializer() {
114121
return new JdbcWriterStateSerializer();
115122
}
123+
124+
@Override
125+
public LineageVertex getLineageVertex() {
126+
Optional<String> nameOpt = LineageUtils.nameOf(queryStatement.query());
127+
String namespace = LineageUtils.namespaceOf(connectionProvider);
128+
LineageDataset dataset =
129+
LineageUtils.datasetOf(nameOpt.orElse(""), namespace, Collections.emptyList());
130+
return LineageUtils.lineageVertexOf(Collections.singleton(dataset));
131+
}
116132
}

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java

+25-1
Original file line numberDiff line numberDiff line change
@@ -34,26 +34,36 @@
3434
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator;
3535
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState;
3636
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase;
37+
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
3738
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader;
3839
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader;
3940
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
4041
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
4142
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer;
4243
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
44+
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
45+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
4346
import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
4447
import org.apache.flink.core.io.SimpleVersionedSerializer;
48+
import org.apache.flink.streaming.api.lineage.LineageDataset;
49+
import org.apache.flink.streaming.api.lineage.LineageVertex;
50+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
4551
import org.apache.flink.util.Preconditions;
4652

4753
import javax.annotation.Nullable;
4854

4955
import java.io.Serializable;
5056
import java.util.ArrayList;
57+
import java.util.Arrays;
58+
import java.util.Collections;
5159
import java.util.Objects;
60+
import java.util.Optional;
5261

5362
/** JDBC source. */
5463
@PublicEvolving
5564
public class JdbcSource<OUT>
56-
implements Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
65+
implements LineageVertexProvider,
66+
Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
5767
ResultTypeQueryable<OUT> {
5868

5969
private final Boundedness boundedness;
@@ -195,4 +205,18 @@ public boolean equals(Object o) {
195205
&& deliveryGuarantee == that.deliveryGuarantee
196206
&& Objects.equals(continuousUnBoundingSettings, that.continuousUnBoundingSettings);
197207
}
208+
209+
@Override
210+
public LineageVertex getLineageVertex() {
211+
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
212+
new DefaultTypeDatasetFacet(getTypeInformation());
213+
SqlTemplateSplitEnumerator enumerator =
214+
(SqlTemplateSplitEnumerator) sqlSplitEnumeratorProvider.create();
215+
Optional<String> nameOpt = LineageUtils.nameOf(enumerator.getSqlTemplate());
216+
String namespace = LineageUtils.namespaceOf(connectionProvider);
217+
LineageDataset dataset =
218+
LineageUtils.datasetOf(
219+
nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet));
220+
return LineageUtils.sourceLineageVertexOf(boundedness, Collections.singleton(dataset));
221+
}
198222
}

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,22 @@
2424
import org.apache.flink.api.common.io.RichInputFormat;
2525
import org.apache.flink.api.common.io.statistics.BaseStatistics;
2626
import org.apache.flink.api.common.typeinfo.TypeInformation;
27+
import org.apache.flink.api.connector.source.Boundedness;
2728
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
2829
import org.apache.flink.configuration.Configuration;
2930
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
3031
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
3132
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
3233
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
34+
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
35+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
3336
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
3437
import org.apache.flink.core.io.GenericInputSplit;
3538
import org.apache.flink.core.io.InputSplit;
3639
import org.apache.flink.core.io.InputSplitAssigner;
40+
import org.apache.flink.streaming.api.lineage.LineageDataset;
41+
import org.apache.flink.streaming.api.lineage.LineageVertex;
42+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
3743
import org.apache.flink.table.data.RowData;
3844
import org.apache.flink.util.Preconditions;
3945

@@ -51,11 +57,13 @@
5157
import java.sql.Time;
5258
import java.sql.Timestamp;
5359
import java.util.Arrays;
60+
import java.util.Collections;
61+
import java.util.Optional;
5462

5563
/** InputFormat for {@link JdbcDynamicTableSource}. */
5664
@Internal
5765
public class JdbcRowDataInputFormat extends RichInputFormat<RowData, InputSplit>
58-
implements ResultTypeQueryable<RowData> {
66+
implements LineageVertexProvider, ResultTypeQueryable<RowData> {
5967

6068
private static final long serialVersionUID = 2L;
6169
private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataInputFormat.class);
@@ -296,6 +304,19 @@ public static Builder builder() {
296304
return new Builder();
297305
}
298306

307+
@Override
308+
public LineageVertex getLineageVertex() {
309+
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
310+
new DefaultTypeDatasetFacet(getProducedType());
311+
Optional<String> nameOpt = LineageUtils.nameOf(queryTemplate);
312+
String namespace = LineageUtils.namespaceOf(connectionProvider);
313+
LineageDataset dataset =
314+
LineageUtils.datasetOf(
315+
nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet));
316+
return LineageUtils.sourceLineageVertexOf(
317+
Boundedness.BOUNDED, Collections.singleton(dataset));
318+
}
319+
299320
/** Builder for {@link JdbcRowDataInputFormat}. */
300321
public static class Builder {
301322
private JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;

0 commit comments

Comments
 (0)