Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-34467] add lineage integration for jdbc connector #149

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/backwards_compatibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
flink: [1.18-SNAPSHOT, 1.19-SNAPSHOT]
flink: [1.20-SNAPSHOT]
jdk: [8, 11, 17]

env:
Expand Down
6 changes: 1 addition & 5 deletions .github/workflows/weekly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@ jobs:
if: github.repository_owner == 'apache'
strategy:
matrix:
flink_branches: [{
flink: 1.19-SNAPSHOT,
jdk: '8, 11, 17, 21',
branch: main
},
flink_branches: [
{
flink: 1.20-SNAPSHOT,
jdk: '8, 11, 17, 21',
Expand Down
1 change: 1 addition & 0 deletions .java-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
11
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Method <org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource.getLineageVertex()> calls method <org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator.getSqlTemplate()> in (JdbcSource.java:215)

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
org.apache.flink.connector.jdbc.lineage.DefaultJdbcExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.jdbc.lineage.JdbcLocation$Builder.build(): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
5 changes: 5 additions & 0 deletions flink-connector-jdbc-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ under the License.
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-sql-java</artifactId>
</dependency>

<!-- Tests -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,23 @@
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

Expand All @@ -53,6 +59,8 @@
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;

/**
* InputFormat to read data from a database and generate Rows. The InputFormat has to be configured
Expand Down Expand Up @@ -107,7 +115,7 @@
@Deprecated
@Experimental
public class JdbcInputFormat extends RichInputFormat<Row, InputSplit>
implements ResultTypeQueryable<Row> {
implements LineageVertexProvider, ResultTypeQueryable<Row> {

protected static final long serialVersionUID = 2L;
protected static final Logger LOG = LoggerFactory.getLogger(JdbcInputFormat.class);
Expand Down Expand Up @@ -344,6 +352,19 @@ public static JdbcInputFormatBuilder buildJdbcInputFormat() {
return new JdbcInputFormatBuilder();
}

@Override
public LineageVertex getLineageVertex() {
    // Describe this input format as a bounded lineage source: dataset name comes from
    // the SQL query template, namespace from the JDBC connection, and the produced
    // row type is attached as a facet.
    DefaultTypeDatasetFacet typeFacet = new DefaultTypeDatasetFacet(getProducedType());
    String datasetName = LineageUtils.nameOf(queryTemplate).orElse("");
    String datasetNamespace = LineageUtils.namespaceOf(connectionProvider);
    LineageDataset dataset =
            LineageUtils.datasetOf(
                    datasetName, datasetNamespace, Collections.singletonList(typeFacet));
    return LineageUtils.sourceLineageVertexOf(
            Boundedness.BOUNDED, Collections.singleton(dataset));
}

/** Builder for {@link JdbcInputFormat}. */
public static class JdbcInputFormatBuilder {
private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;

/**
* Flink Sink to produce data into a jdbc database.
Expand All @@ -47,7 +52,9 @@
*/
@PublicEvolving
public class JdbcSink<IN>
implements StatefulSink<IN, JdbcWriterState>, TwoPhaseCommittingSink<IN, JdbcCommitable> {
implements LineageVertexProvider,
StatefulSink<IN, JdbcWriterState>,
TwoPhaseCommittingSink<IN, JdbcCommitable> {

private final DeliveryGuarantee deliveryGuarantee;
private final JdbcConnectionProvider connectionProvider;
Expand Down Expand Up @@ -113,4 +120,13 @@ public SimpleVersionedSerializer<JdbcCommitable> getCommittableSerializer() {
public SimpleVersionedSerializer<JdbcWriterState> getWriterStateSerializer() {
return new JdbcWriterStateSerializer();
}

@Override
public LineageVertex getLineageVertex() {
    // Model the sink target as a lineage dataset: name derived from the write query,
    // namespace from the JDBC connection; no extra facets are attached.
    String datasetName = LineageUtils.nameOf(queryStatement.query()).orElse("");
    String datasetNamespace = LineageUtils.namespaceOf(connectionProvider);
    LineageDataset dataset =
            LineageUtils.datasetOf(datasetName, datasetNamespace, Collections.emptyList());
    return LineageUtils.lineageVertexOf(Collections.singleton(dataset));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,35 @@
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator;
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState;
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase;
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader;
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader;
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;

/** JDBC source. */
@PublicEvolving
public class JdbcSource<OUT>
implements Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
implements LineageVertexProvider,
Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
ResultTypeQueryable<OUT> {

private final Boundedness boundedness;
Expand Down Expand Up @@ -195,4 +205,18 @@ public boolean equals(Object o) {
&& deliveryGuarantee == that.deliveryGuarantee
&& Objects.equals(continuousUnBoundingSettings, that.continuousUnBoundingSettings);
}

@Override
public LineageVertex getLineageVertex() {
    DefaultTypeDatasetFacet defaultTypeDatasetFacet =
            new DefaultTypeDatasetFacet(getTypeInformation());
    // Only SqlTemplateSplitEnumerator exposes the SQL template used to derive the
    // dataset name. Guard the cast so a custom JdbcSqlSplitEnumeratorBase
    // implementation does not make lineage collection fail with a
    // ClassCastException; in that case the dataset name falls back to "".
    Optional<String> nameOpt = Optional.empty();
    Object enumerator = sqlSplitEnumeratorProvider.create();
    if (enumerator instanceof SqlTemplateSplitEnumerator) {
        nameOpt =
                LineageUtils.nameOf(
                        ((SqlTemplateSplitEnumerator) enumerator).getSqlTemplate());
    }
    String namespace = LineageUtils.namespaceOf(connectionProvider);
    LineageDataset dataset =
            LineageUtils.datasetOf(
                    nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet));
    return LineageUtils.sourceLineageVertexOf(boundedness, Collections.singleton(dataset));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,22 @@
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;

Expand All @@ -51,11 +57,13 @@
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;

/** InputFormat for {@link JdbcDynamicTableSource}. */
@Internal
public class JdbcRowDataInputFormat extends RichInputFormat<RowData, InputSplit>
implements ResultTypeQueryable<RowData> {
implements LineageVertexProvider, ResultTypeQueryable<RowData> {

private static final long serialVersionUID = 2L;
private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataInputFormat.class);
Expand Down Expand Up @@ -296,6 +304,19 @@ public static Builder builder() {
return new Builder();
}

@Override
public LineageVertex getLineageVertex() {
    // Report this input format as a bounded lineage source; the dataset name is
    // taken from the query template, the namespace from the JDBC connection, and
    // the produced RowData type is recorded as a facet.
    DefaultTypeDatasetFacet typeFacet = new DefaultTypeDatasetFacet(getProducedType());
    String datasetName = LineageUtils.nameOf(queryTemplate).orElse("");
    String datasetNamespace = LineageUtils.namespaceOf(connectionProvider);
    LineageDataset dataset =
            LineageUtils.datasetOf(
                    datasetName, datasetNamespace, Collections.singletonList(typeFacet));
    return LineageUtils.sourceLineageVertexOf(
            Boundedness.BOUNDED, Collections.singleton(dataset));
}

/** Builder for {@link JdbcRowDataInputFormat}. */
public static class Builder {
private JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,26 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
import org.apache.flink.table.types.utils.TypeConversions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -46,14 +54,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** A lookup function for {@link JdbcDynamicTableSource}. */
@Internal
public class JdbcRowDataLookupFunction extends LookupFunction {
public class JdbcRowDataLookupFunction extends LookupFunction implements LineageVertexProvider {

private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataLookupFunction.class);
private static final long serialVersionUID = 2L;
Expand All @@ -67,6 +76,7 @@ public class JdbcRowDataLookupFunction extends LookupFunction {

private final List<String> resolvedPredicates;
private final Serializable[] pushdownParams;
private final RowType producedType;

private transient FieldNamedPreparedStatement statement;

Expand Down Expand Up @@ -106,12 +116,12 @@ public JdbcRowDataLookupFunction(
.getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
JdbcDialect jdbcDialect = options.getDialect();
this.jdbcDialectConverter = jdbcDialect.getRowConverter(rowType);
this.lookupKeyRowConverter =
jdbcDialect.getRowConverter(
RowType.of(
Arrays.stream(keyTypes)
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new)));
this.producedType =
RowType.of(
Arrays.stream(keyTypes)
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new));
this.lookupKeyRowConverter = jdbcDialect.getRowConverter(producedType);
this.resolvedPredicates = resolvedPredicates;
this.pushdownParams = pushdownParams;
}
Expand Down Expand Up @@ -224,4 +234,19 @@ public void close() throws IOException {
public Connection getDbConnection() {
return connectionProvider.getConnection();
}

@Override
public LineageVertex getLineageVertex() {
    // A lookup join reads from the JDBC table, so expose it as a bounded lineage
    // source. The produced logical row type is converted back to legacy
    // TypeInformation for the type facet.
    DefaultTypeDatasetFacet typeFacet =
            new DefaultTypeDatasetFacet(
                    LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
                            TypeConversions.fromLogicalToDataType(producedType)));
    String datasetName = LineageUtils.nameOf(query).orElse("");
    String datasetNamespace = LineageUtils.namespaceOf(connectionProvider);
    LineageDataset dataset =
            LineageUtils.datasetOf(
                    datasetName, datasetNamespace, Collections.singletonList(typeFacet));
    return LineageUtils.sourceLineageVertexOf(
            Boundedness.BOUNDED, Collections.singleton(dataset));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,8 @@ public Connection reestablishConnection() throws SQLException, ClassNotFoundExce
closeConnection();
return getOrEstablishConnection();
}

/**
 * Returns the JDBC database URL configured in this provider's {@code jdbcOptions}.
 *
 * @return the configured JDBC URL
 */
public String getDbURL() {
    return this.jdbcOptions.getDbURL();
}
}
Loading