From 4b0c0388eb34d289cbbffea81f50358b3aff4d5c Mon Sep 17 00:00:00 2001 From: Joao Boto Date: Mon, 15 Dec 2025 16:55:02 +0100 Subject: [PATCH] [FLINK-38733] Add new SplitterEnumerator on JdbcSource --- .../core/datastream/source/JdbcSource.java | 43 ++-- .../datastream/source/JdbcSourceBuilder.java | 74 ++++--- .../enumerator/JdbcSourceEnumerator.java | 31 +-- .../JdbcSqlSplitEnumeratorBase.java | 2 + .../SqlTemplateSplitEnumerator.java | 2 + .../splitter/JdbcSqlSplitterEnumerator.java | 71 +++++++ .../splitter/PreparedSplitterEnumerator.java | 127 ++++++++++++ .../PreparedSplitterNumericParameters.java | 87 ++++++++ .../SlideTimingSplitterEnumerator.java | 185 ++++++++++++++++++ .../splitter/SplitterEnumerator.java | 96 +++++++++ .../splitter/SqlSplitterEnumerator.java | 111 +++++++++++ .../connector/jdbc/lineage/LineageUtils.java | 8 +- .../CompositeJdbcParameterValuesProvider.java | 1 + .../JdbcGenericParameterValuesProvider.java | 1 + .../JdbcNumericBetweenParametersProvider.java | 1 + .../split/JdbcParameterValuesProvider.java | 1 + .../JdbcSlideTimingParameterProvider.java | 1 + .../source/JdbcSourceBuilderTest.java | 48 +++-- .../source/JdbcSourceStreamRelatedITCase.java | 1 + .../enumerator/JdbcSourceEnumeratorTest.java | 38 +++- ...PreparedSplitterNumericParametersTest.java | 116 +++++++++++ 21 files changed, 978 insertions(+), 67 deletions(-) create mode 100644 flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/JdbcSqlSplitterEnumerator.java create mode 100644 flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/PreparedSplitterEnumerator.java create mode 100644 flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/PreparedSplitterNumericParameters.java create mode 100644 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/SlideTimingSplitterEnumerator.java create mode 100644 flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/SplitterEnumerator.java create mode 100644 flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/SqlSplitterEnumerator.java create mode 100644 flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/PreparedSplitterNumericParametersTest.java diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java index c861d6768..3bb316f4f 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java @@ -35,7 +35,8 @@ import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator; import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState; import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase; -import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.JdbcSqlSplitterEnumerator; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.SplitterEnumerator; import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader; import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader; import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; @@ -71,12 +72,13 @@ public class JdbcSource private final @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings; private final Configuration configuration; - private final JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider; + private final SplitterEnumerator splitterEnumerator; protected JdbcConnectionProvider connectionProvider; private final ResultExtractor resultExtractor; private final DeliveryGuarantee deliveryGuarantee; + @Deprecated JdbcSource( Configuration configuration, JdbcConnectionProvider connectionProvider, @@ -85,9 +87,27 @@ public class JdbcSource TypeInformation typeInformation, @Nullable DeliveryGuarantee deliveryGuarantee, @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings) { + this( + configuration, + connectionProvider, + new JdbcSqlSplitterEnumerator(sqlSplitEnumeratorProvider), + resultExtractor, + typeInformation, + deliveryGuarantee, + continuousUnBoundingSettings); + } + + JdbcSource( + Configuration configuration, + JdbcConnectionProvider connectionProvider, + SplitterEnumerator splitterEnumerator, + ResultExtractor resultExtractor, + TypeInformation typeInformation, + @Nullable DeliveryGuarantee deliveryGuarantee, + @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings) { this.configuration = Preconditions.checkNotNull(configuration); this.connectionProvider = Preconditions.checkNotNull(connectionProvider); - this.sqlSplitEnumeratorProvider = Preconditions.checkNotNull(sqlSplitEnumeratorProvider); + this.splitterEnumerator = Preconditions.checkNotNull(splitterEnumerator); this.resultExtractor = Preconditions.checkNotNull(resultExtractor); this.deliveryGuarantee = Objects.isNull(deliveryGuarantee) ? 
DeliveryGuarantee.NONE : deliveryGuarantee; @@ -125,7 +145,8 @@ public SplitEnumerator createEnumera SplitEnumeratorContext enumContext) throws Exception { return new JdbcSourceEnumerator( enumContext, - sqlSplitEnumeratorProvider.create(), + splitterEnumerator, + connectionProvider, continuousUnBoundingSettings, new ArrayList<>()); } @@ -139,7 +160,8 @@ public SplitEnumerator restoreEnumer checkpoint.getOptionalUserDefinedSplitEnumeratorState(); return new JdbcSourceEnumerator( enumContext, - sqlSplitEnumeratorProvider.restore(optionalUserDefinedSplitEnumeratorState), + splitterEnumerator.restoreState(optionalUserDefinedSplitEnumeratorState), + connectionProvider, continuousUnBoundingSettings, checkpoint.getRemainingSplits()); } @@ -167,8 +189,8 @@ public static JdbcSourceBuilder builder() { // ---- Visible for testing methods. --- @VisibleForTesting - public JdbcSqlSplitEnumeratorBase.Provider getSqlSplitEnumeratorProvider() { - return sqlSplitEnumeratorProvider; + public SplitterEnumerator getSplitterEnumerator() { + return splitterEnumerator; } @VisibleForTesting @@ -204,7 +226,7 @@ public boolean equals(Object o) { return boundedness == that.boundedness && Objects.equals(typeInformation, that.typeInformation) && Objects.equals(configuration, that.configuration) - && Objects.equals(sqlSplitEnumeratorProvider, that.sqlSplitEnumeratorProvider) + && Objects.equals(splitterEnumerator, that.splitterEnumerator) && Objects.equals(connectionProvider, that.connectionProvider) && Objects.equals(resultExtractor, that.resultExtractor) && deliveryGuarantee == that.deliveryGuarantee @@ -215,9 +237,8 @@ public boolean equals(Object o) { public LineageVertex getLineageVertex() { DefaultTypeDatasetFacet defaultTypeDatasetFacet = new DefaultTypeDatasetFacet(getTypeInformation()); - SqlTemplateSplitEnumerator enumerator = - (SqlTemplateSplitEnumerator) sqlSplitEnumeratorProvider.create(); - Optional nameOpt = LineageUtils.tableNameOf(enumerator.getSqlTemplate(), true); + Optional 
nameOpt = + LineageUtils.tableNameOf(splitterEnumerator.lineageQueries(), true); String namespace = LineageUtils.namespaceOf(connectionProvider); LineageDataset dataset = LineageUtils.datasetOf( diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java index 87b39f401..c4a6b14cf 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java @@ -24,7 +24,10 @@ import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.core.datastream.source.config.ContinuousUnBoundingSettings; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase; import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.JdbcSqlSplitterEnumerator; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.SplitterEnumerator; import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; @@ -51,7 +54,7 @@ * *

  * JdbcSource<Row> source = JdbcSource.<Row>builder()
- *           .setSql(validSql)
+ *           .setSplitter(PreparedSplitterEnumerator.of(validSql))
  *           .setResultExtractor(new RowResultExtractor())
  *           .setDBUrl(dbUrl)
  *           .setDriverName(driverName)
@@ -67,6 +70,7 @@
  *
  * 

  *
+ * String query = "select * from books WHERE author = ?";
  * Serializable[][] queryParameters = new String[2][1];
  * queryParameters[0] = new String[]{"Kumar"};
  * queryParameters[1] = new String[]{"Tan Ah Teck"};
@@ -78,13 +82,12 @@
  *          .setUsername(username)
  * 			.setDriverName("org.apache.derby.jdbc.EmbeddedDriver")
  * 			.setDBUrl("jdbc:derby:memory:ebookshop")
- * 			.setSql("select * from books WHERE author = ?")
- * 			.setJdbcParameterValuesProvider(new JdbcGenericParameterValuesProvider(queryParameters))
+ *          .setSplitter(PreparedSplitterEnumerator.of(query, queryParameters))
  *          .build();
  * 
* * @see Row - * @see JdbcParameterValuesProvider + * @see SplitterEnumerator * @see PreparedStatement * @see DriverManager * @see JdbcSource @@ -118,7 +121,7 @@ public class JdbcSourceBuilder { private JdbcParameterValuesProvider jdbcParameterValuesProvider; private @Nullable Serializable optionalSqlSplitEnumeratorState; private ResultExtractor resultExtractor; - + private SplitterEnumerator splitterEnumerator; private JdbcConnectionProvider connectionProvider; JdbcSourceBuilder() { @@ -132,6 +135,12 @@ public class JdbcSourceBuilder { this.autoCommit = true; } + public JdbcSourceBuilder setSplitter(SplitterEnumerator splitterEnumerator) { + this.splitterEnumerator = splitterEnumerator; + return this; + } + + @Deprecated public JdbcSourceBuilder setSql(@Nonnull String sql) { Preconditions.checkArgument( !StringUtils.isNullOrWhitespaceOnly(sql), @@ -197,6 +206,7 @@ public JdbcSourceBuilder setContinuousUnBoundingSettings( * If the value was set as an instance of {@link JdbcSlideTimingParameterProvider}, it's * required to specify the {@link #continuousUnBoundingSettings}. 
*/ + @Deprecated public JdbcSourceBuilder setJdbcParameterValuesProvider( @Nonnull JdbcParameterValuesProvider parameterValuesProvider) { this.jdbcParameterValuesProvider = Preconditions.checkNotNull(parameterValuesProvider); @@ -289,36 +299,52 @@ public JdbcSource build() { JdbcSourceOptions.READER_FETCH_BATCH_SIZE, splitReaderFetchBatchSize); this.configuration.set(JdbcSourceOptions.AUTO_COMMIT, autoCommit); - Preconditions.checkState( - !StringUtils.isNullOrWhitespaceOnly(sql), "'sql' mustn't be null or empty."); Preconditions.checkNotNull(resultExtractor, "'resultExtractor' mustn't be null."); Preconditions.checkNotNull(typeInformation, "'typeInformation' mustn't be null."); - if (Objects.nonNull(continuousUnBoundingSettings)) { - Preconditions.checkArgument( - Objects.nonNull(jdbcParameterValuesProvider) - && jdbcParameterValuesProvider - instanceof JdbcSlideTimingParameterProvider, - INVALID_SLIDE_TIMING_CONTINUOUS_HINT); - } - - if (Objects.nonNull(jdbcParameterValuesProvider) - && jdbcParameterValuesProvider instanceof JdbcSlideTimingParameterProvider) { - Preconditions.checkArgument( - Objects.nonNull(continuousUnBoundingSettings), - INVALID_CONTINUOUS_SLIDE_TIMING_HINT); + if (this.splitterEnumerator == null) { + Preconditions.checkState( + !StringUtils.isNullOrWhitespaceOnly(sql), "'sql' mustn't be null or empty."); + + if (Objects.nonNull(continuousUnBoundingSettings)) { + Preconditions.checkArgument( + Objects.nonNull(jdbcParameterValuesProvider) + && jdbcParameterValuesProvider + instanceof JdbcSlideTimingParameterProvider, + INVALID_SLIDE_TIMING_CONTINUOUS_HINT); + } + + if (Objects.nonNull(jdbcParameterValuesProvider) + && jdbcParameterValuesProvider instanceof JdbcSlideTimingParameterProvider) { + Preconditions.checkArgument( + Objects.nonNull(continuousUnBoundingSettings), + INVALID_CONTINUOUS_SLIDE_TIMING_HINT); + } + + this.splitterEnumerator = + getSplitter(sql, jdbcParameterValuesProvider, optionalSqlSplitEnumeratorState); } return new 
JdbcSource<>( configuration, connectionProvider, - new SqlTemplateSplitEnumerator.TemplateSqlSplitEnumeratorProvider() - .setOptionalSqlSplitEnumeratorState(optionalSqlSplitEnumeratorState) - .setSqlTemplate(sql) - .setParameterValuesProvider(jdbcParameterValuesProvider), + splitterEnumerator, resultExtractor, typeInformation, deliveryGuarantee, continuousUnBoundingSettings); } + + private SplitterEnumerator getSplitter( + String sqlTemplate, + JdbcParameterValuesProvider parameterProvider, + Serializable userDefinedState) { + JdbcSqlSplitEnumeratorBase.Provider provider = + new SqlTemplateSplitEnumerator.TemplateSqlSplitEnumeratorProvider() + .setOptionalSqlSplitEnumeratorState(userDefinedState) + .setSqlTemplate(sqlTemplate) + .setParameterValuesProvider(parameterProvider); + + return new JdbcSqlSplitterEnumerator(provider); + } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumerator.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumerator.java index c545a08f5..16078ad33 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumerator.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumerator.java @@ -23,7 +23,9 @@ import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.connector.jdbc.core.datastream.source.config.ContinuousUnBoundingSettings; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.SplitterEnumerator; import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import 
org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -50,16 +52,18 @@ public class JdbcSourceEnumerator private final Boundedness boundedness; private final LinkedHashMap readersAwaitingSplit; private final List unassigned; - private final JdbcSqlSplitEnumeratorBase sqlSplitEnumerator; + private final SplitterEnumerator splitterEnumerator; private final @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings; + private final JdbcConnectionProvider connectionProvider; public JdbcSourceEnumerator( SplitEnumeratorContext context, - JdbcSqlSplitEnumeratorBase sqlSplitEnumerator, + SplitterEnumerator splitterEnumerator, + JdbcConnectionProvider connectionProvider, ContinuousUnBoundingSettings continuousUnBoundingSettings, List unassigned) { this.context = Preconditions.checkNotNull(context); - this.sqlSplitEnumerator = Preconditions.checkNotNull(sqlSplitEnumerator); + this.splitterEnumerator = Preconditions.checkNotNull(splitterEnumerator); this.continuousUnBoundingSettings = continuousUnBoundingSettings; this.boundedness = Objects.isNull(continuousUnBoundingSettings) @@ -67,30 +71,33 @@ public JdbcSourceEnumerator( : Boundedness.CONTINUOUS_UNBOUNDED; this.unassigned = Preconditions.checkNotNull(unassigned); this.readersAwaitingSplit = new LinkedHashMap<>(); + this.connectionProvider = connectionProvider; } @Override public void start() { - sqlSplitEnumerator.open(); + splitterEnumerator.start(connectionProvider); if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED && Objects.nonNull(continuousUnBoundingSettings)) { context.callAsync( - () -> sqlSplitEnumerator.enumerateSplits(() -> 1024 - unassigned.size() > 0), + () -> splitterEnumerator.enumerateSplits(() -> 1024 - unassigned.size() > 0), this::processNewSplits, continuousUnBoundingSettings.getInitialDiscoveryDelay().toMillis(), continuousUnBoundingSettings.getDiscoveryInterval().toMillis()); } else { - try { - unassigned.addAll(sqlSplitEnumerator.enumerateSplits(() -> true)); - } catch 
(IOException e) { - throw new RuntimeException(e); - } + context.callAsync( + () -> + splitterEnumerator.isAllSplitsFinished() + ? Collections.emptyList() + : splitterEnumerator.enumerateSplits(), + (List splits, Throwable error) -> + this.unassigned.addAll(splits)); } } @Override public void close() throws IOException { - sqlSplitEnumerator.close(); + splitterEnumerator.close(); } @Override @@ -136,7 +143,7 @@ public JdbcSourceEnumeratorState snapshotState(long checkpointId) throws Excepti Collections.emptyList(), Collections.emptyList(), new ArrayList<>(unassigned), - sqlSplitEnumerator.optionalSqlSplitEnumeratorState); + splitterEnumerator.serializableState()); } private Optional getNextSplit() { diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSqlSplitEnumeratorBase.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSqlSplitEnumeratorBase.java index dd3eb34bc..2a2797cde 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSqlSplitEnumeratorBase.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSqlSplitEnumeratorBase.java @@ -34,6 +34,7 @@ * * @param JDBC split type. */ +@Deprecated @PublicEvolving public abstract class JdbcSqlSplitEnumeratorBase implements AutoCloseable, Serializable { private final char[] currentId = "0000000000".toCharArray(); @@ -78,6 +79,7 @@ public abstract List enumerateSplits(@Nonnull Supplier * * @param Split type. 
*/ + @Deprecated @PublicEvolving public interface Provider extends Serializable { diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/SqlTemplateSplitEnumerator.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/SqlTemplateSplitEnumerator.java index b4565d950..227f195cb 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/SqlTemplateSplitEnumerator.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/SqlTemplateSplitEnumerator.java @@ -38,6 +38,7 @@ import java.util.function.Supplier; /** A split enumerator based on sql-parameters grains. */ +@Deprecated public final class SqlTemplateSplitEnumerator extends JdbcSqlSplitEnumeratorBase { public static final Logger LOG = LoggerFactory.getLogger(SqlTemplateSplitEnumerator.class); @@ -105,6 +106,7 @@ public JdbcParameterValuesProvider getParameterValuesProvider() { } /** The {@link TemplateSqlSplitEnumeratorProvider} for {@link SqlTemplateSplitEnumerator}. 
*/ + @Deprecated public static class TemplateSqlSplitEnumeratorProvider implements JdbcSqlSplitEnumeratorBase.Provider { diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/JdbcSqlSplitterEnumerator.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/JdbcSqlSplitterEnumerator.java new file mode 100644 index 000000000..3f3abf9b9 --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/JdbcSqlSplitterEnumerator.java @@ -0,0 +1,71 @@ +package org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** A split enumerator for JdbcSqlSplitEnumeratorBase. 
*/ +@Deprecated +@Internal +public class JdbcSqlSplitterEnumerator implements SplitterEnumerator { + private final JdbcSqlSplitEnumeratorBase.Provider provider; + private JdbcSqlSplitEnumeratorBase base; + private boolean finished; + + public JdbcSqlSplitterEnumerator(JdbcSqlSplitEnumeratorBase.Provider provider) { + this.provider = provider; + this.base = provider.create(); + this.finished = false; + } + + @Override + public void start(JdbcConnectionProvider connectionProvider) { + base.open(); + } + + @Override + public void close() { + base.close(); + } + + @Override + public boolean isAllSplitsFinished() { + return this.finished; + } + + @Override + public List enumerateSplits() { + try { + this.finished = true; + return base.enumerateSplits(() -> true); + } catch (Exception e) { + throw new RuntimeException("Error enumerating splits", e); + } + } + + @Override + public List lineageQueries() { + List queries = new ArrayList<>(); + if (base instanceof SqlTemplateSplitEnumerator) { + queries.add(((SqlTemplateSplitEnumerator) base).getSqlTemplate()); + } + return queries; + } + + @Override + public Serializable serializableState() { + return null; + } + + @Override + public SplitterEnumerator restoreState(Serializable state) { + this.base = provider.restore(state); + return this; + } +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/PreparedSplitterEnumerator.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/PreparedSplitterEnumerator.java new file mode 100644 index 000000000..2e3c20cb0 --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/PreparedSplitterEnumerator.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** A split enumerator based on sql-parameters grains. 
*/ +@PublicEvolving +public class PreparedSplitterEnumerator extends SqlSplitterEnumerator { + public static final Logger LOG = LoggerFactory.getLogger(PreparedSplitterEnumerator.class); + + private final Serializable[][] sqlParameters; + private boolean finished; + + protected PreparedSplitterEnumerator(String sqlTemplate, Serializable[][] sqlParameters) { + super(Preconditions.checkNotNull(sqlTemplate)); + this.sqlParameters = Preconditions.checkNotNull(sqlParameters); + this.finished = false; + } + + public static PreparedSplitterEnumerator of( + String sqlTemplate, Serializable[][] sqlParameters) { + return new PreparedSplitterEnumerator(sqlTemplate, sqlParameters); + } + + public static PreparedSplitterEnumerator of(String sqlTemplate) { + return new PreparedSplitterEnumerator(sqlTemplate, new Serializable[0][]); + } + + public static PreparedSplitterEnumerator of( + String sqlTemplate, long minValue, long maxValue, long batchSize) { + Serializable[][] parameters = + new PreparedSplitterNumericParameters(minValue, maxValue) + .withBatchSize(batchSize) + .getParameterValues(); + return new PreparedSplitterEnumerator(sqlTemplate, parameters); + } + + public static PreparedSplitterEnumerator of( + String sqlTemplate, long minValue, long maxValue, int batchNum) { + Serializable[][] parameters = + new PreparedSplitterNumericParameters(minValue, maxValue) + .withBatchNum(batchNum) + .getParameterValues(); + return new PreparedSplitterEnumerator(sqlTemplate, parameters); + } + + @Override + public void start(JdbcConnectionProvider connectionProvider) {} + + @Override + public void close() {} + + @Override + public boolean isAllSplitsFinished() { + return this.finished; + } + + @VisibleForTesting + public Serializable[][] getSqlParameters() { + return sqlParameters; + } + + @Override + public List enumerateSplits() { + List splitList = super.enumerateSplits(); + this.finished = true; + return splitList; + } + + @Override + public @Nullable Serializable 
serializableState() { + return null; + } + + @Override + public PreparedSplitterEnumerator restoreState(Serializable state) { + return this; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof PreparedSplitterEnumerator)) { + return false; + } + if (!super.equals(o)) { + return false; + } + PreparedSplitterEnumerator that = (PreparedSplitterEnumerator) o; + return Objects.deepEquals(sqlParameters, that.sqlParameters); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), Arrays.deepHashCode(sqlParameters)); + } +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/PreparedSplitterNumericParameters.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/PreparedSplitterNumericParameters.java new file mode 100644 index 000000000..ba1c7aec6 --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/PreparedSplitterNumericParameters.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * This query parameters generator is a helper class to parameterize from/to queries on a numeric + * column. The generated array of from/to values will be equally sized to batchSize (apart from the + * last one), ranging from minVal up to maxVal. + */ +@Internal +public class PreparedSplitterNumericParameters implements Serializable { + + private final long minVal; + private final long maxVal; + private long batchSize; + private int batchNum; + + public PreparedSplitterNumericParameters(long minVal, long maxVal) { + this.minVal = minVal; + this.maxVal = maxVal; + this.batchNum = 0; + this.batchSize = 0; + } + + public PreparedSplitterNumericParameters withBatchSize(long batchSize) { + Preconditions.checkArgument(batchSize > 0, "Batch size must be positive"); + + long maxElemCount = (maxVal - minVal) + 1; + if (batchSize > maxElemCount) { + batchSize = maxElemCount; + } + this.batchSize = batchSize; + this.batchNum = (int) Math.ceil((double) maxElemCount / batchSize); + return this; + } + + public PreparedSplitterNumericParameters withBatchNum(int batchNum) { + Preconditions.checkArgument(batchNum > 0, "Batch number must be positive"); + + long maxElemCount = (maxVal - minVal) + 1; + if (batchNum > maxElemCount) { + batchNum = (int) maxElemCount; + } + this.batchNum = batchNum; + this.batchSize = (long) Math.ceil((double) maxElemCount / batchNum); + return this; + } + + public Serializable[][] getParameterValues() { + Preconditions.checkState( + batchSize > 0, + "Batch size and batch number must be positive. 
Have you called `withBatchSize` or `withBatchNum`?"); + + long maxElemCount = (maxVal - minVal) + 1; + long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum; + + Serializable[][] parameters = new Serializable[batchNum][2]; + long start = minVal; + for (int i = 0; i < batchNum; i++) { + long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0); + parameters[i] = new Long[] {start, end}; + start = end + 1; + } + return parameters; + } +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/SlideTimingSplitterEnumerator.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/SlideTimingSplitterEnumerator.java new file mode 100644 index 000000000..4f458fccc --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/SlideTimingSplitterEnumerator.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
 * A split enumerator based on sliding timing windows: each split covers the span {@code
 * [startMills, startMills + slideSpanMills]} and consecutive windows advance by {@code
 * slideStepMills}. Windows only become eligible once the wall clock (shifted by {@code
 * splitGenerateDelayMillis}) has passed their end.
 */
@PublicEvolving
public class SlideTimingSplitterEnumerator extends SqlSplitterEnumerator {
    public static final Logger LOG = LoggerFactory.getLogger(SlideTimingSplitterEnumerator.class);

    private final long slideStepMills;
    private final long slideSpanMills;
    private final long splitGenerateDelayMillis;

    // Advances each time a window is emitted; checkpointed through serializableState().
    private @Nonnull Long startMills;

    protected SlideTimingSplitterEnumerator(
            String sqlTemplate,
            Long startMills,
            long slideSpanMills,
            long slideStepMills,
            long splitGenerateDelayMillis) {
        super(sqlTemplate);
        this.startMills = Preconditions.checkNotNull(startMills);
        this.slideStepMills = slideStepMills;
        this.slideSpanMills = slideSpanMills;
        this.splitGenerateDelayMillis = splitGenerateDelayMillis;
    }

    @Override
    public void start(JdbcConnectionProvider connectionProvider) {}

    @Override
    public void close() {}

    @Override
    public boolean isAllSplitsFinished() {
        // A timing-based enumerator is unbounded: new windows keep becoming available.
        return false;
    }

    /** Emits one {@code {start, end}} parameter pair per currently available window. */
    @Override
    @VisibleForTesting
    public Serializable[][] getSqlParameters() {
        List<Serializable[]> tmpList = new ArrayList<>();
        while (nextSplitAvailable(startMills)) {
            Serializable[] params = new Serializable[] {startMills, startMills + slideSpanMills};
            tmpList.add(params);
            startMills += slideStepMills;
        }
        return tmpList.toArray(new Serializable[0][]);
    }

    @Override
    public @Nullable Serializable serializableState() {
        return this.startMills;
    }

    @Override
    public SlideTimingSplitterEnumerator restoreState(Serializable state) {
        Preconditions.checkArgument((Long) state > 0L);
        this.startMills = (Long) state;
        return this;
    }

    /**
     * A window starting at {@code nextSpanStartMillis} is available once the current time has
     * passed the (delay-shifted) start by at least one full span.
     */
    private boolean nextSplitAvailable(Long nextSpanStartMillis) {
        final long delayedNextSpanStartMillis = nextSpanStartMillis + splitGenerateDelayMillis;
        final long currentAvailableMillis = currentAvailableMillis();
        return currentAvailableMillis >= delayedNextSpanStartMillis
                && (currentAvailableMillis - delayedNextSpanStartMillis >= slideSpanMills);
    }

    private Long currentAvailableMillis() {
        return System.currentTimeMillis();
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SlideTimingSplitterEnumerator)) {
            return false;
        }
        if (!super.equals(o)) {
            return false;
        }
        SlideTimingSplitterEnumerator that = (SlideTimingSplitterEnumerator) o;
        return slideStepMills == that.slideStepMills
                && slideSpanMills == that.slideSpanMills
                && splitGenerateDelayMillis == that.splitGenerateDelayMillis
                && Objects.equals(startMills, that.startMills);
    }

    @Override
    public int hashCode() {
        return Objects.hash(
                super.hashCode(),
                slideStepMills,
                slideSpanMills,
                splitGenerateDelayMillis,
                startMills);
    }

    public static SlideTimingSplitterEnumeratorBuilder builder() {
        return new SlideTimingSplitterEnumeratorBuilder();
    }

    /** A slide split enumerator builder. */
    public static class SlideTimingSplitterEnumeratorBuilder {
        private String sqlTemplate;
        private Long startMills;
        private Long slideSpanMills;
        private Long slideStepMills;
        private Long splitGenerateDelayMillis;

        public SlideTimingSplitterEnumeratorBuilder setSqlTemplate(String sqlTemplate) {
            this.sqlTemplate = sqlTemplate;
            return this;
        }

        public SlideTimingSplitterEnumeratorBuilder setStartMills(Long startMills) {
            this.startMills = startMills;
            return this;
        }

        public SlideTimingSplitterEnumeratorBuilder setSlideSpanMills(Long slideSpanMills) {
            this.slideSpanMills = slideSpanMills;
            return this;
        }

        public SlideTimingSplitterEnumeratorBuilder setSlideStepMills(Long slideStepMills) {
            this.slideStepMills = slideStepMills;
            return this;
        }

        public SlideTimingSplitterEnumeratorBuilder setSplitGenerateDelayMillis(
                Long splitGenerateDelayMillis) {
            this.splitGenerateDelayMillis = splitGenerateDelayMillis;
            return this;
        }

        public SlideTimingSplitterEnumerator build() {
            Preconditions.checkArgument(startMills > 0L, "'startMillis' must be greater than 0.");
            // BUGFIX: was `slideSpanMills > 0 || slideStepMills > 0`, contradicting the error
            // message. Both must be positive: the span sizes each window and the step advances
            // it — a non-positive step would make getSqlParameters() loop forever, since
            // startMills would never move past the availability horizon.
            Preconditions.checkArgument(
                    slideSpanMills > 0 && slideStepMills > 0,
                    "parameters must satisfy slideSpanMills > 0 and slideStepMills > 0");
            Preconditions.checkArgument(
                    splitGenerateDelayMillis >= 0L,
                    "parameters must satisfy splitGenerateDelayMillis >= 0");
            return new SlideTimingSplitterEnumerator(
                    sqlTemplate,
                    startMills,
                    slideSpanMills,
                    slideStepMills,
                    splitGenerateDelayMillis);
        }
    }
}
package org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;

import javax.annotation.Nonnull;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/** Interface for jdbc sql split enumerator. */
@PublicEvolving
public interface SplitterEnumerator extends AutoCloseable, Serializable {

    /**
     * Start the enumerator.
     *
     * @param connectionProvider The JDBC connection provider.
     */
    void start(JdbcConnectionProvider connectionProvider);

    /** Close the enumerator. */
    void close();

    /** All splits have been enumerated. */
    boolean isAllSplitsFinished();

    /** Enumerate the JDBC splits. */
    List<JdbcSourceSplit> enumerateSplits();

    /**
     * Enumerate the JDBC splits, evaluating the supplier to decide whether the next batch of
     * splits may be produced.
     *
     * @param splitGettable If the next batch splits are gettable.
     * @return The result splits generated by the split enumerator.
     */
    default List<JdbcSourceSplit> enumerateSplits(@Nonnull Supplier<Boolean> splitGettable) {
        return enumerateSplits(splitGettable.get());
    }

    /**
     * Enumerate the JDBC splits, or return an empty list when the next batch of splits is not yet
     * gettable.
     *
     * @param splitGettable If the next batch splits are gettable.
     * @return The result splits generated by the split enumerator.
     */
    default List<JdbcSourceSplit> enumerateSplits(boolean splitGettable) {
        return splitGettable ? enumerateSplits() : Collections.emptyList();
    }

    /**
     * Get lineage queries for splits.
     *
     * @return The lineage queries.
     */
    List<String> lineageQueries();

    /**
     * Get the serializable state of the enumerator.
     *
     * @return The serializable state.
     */
    Serializable serializableState();

    /**
     * Restore the enumerator state from the given serializable state.
     *
     * @param state The serializable state.
     * @return The restored SplitterEnumerator.
     */
    SplitterEnumerator restoreState(Serializable state);
}
package org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.jdbc.core.datastream.source.split.CheckpointedOffset;
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** A sql base template split enumerator. */
public abstract class SqlSplitterEnumerator implements SplitterEnumerator {
    public static final Logger LOG = LoggerFactory.getLogger(SqlSplitterEnumerator.class);

    // Split ids are generated as a fixed-width, zero-padded decimal counter that is
    // incremented in place; starts at "0000000000" so the first id is "0000000001".
    private final char[] currentId = "0000000000".toCharArray();

    private final String sqlTemplate;

    protected SqlSplitterEnumerator(String sqlTemplate) {
        this.sqlTemplate = Preconditions.checkNotNull(sqlTemplate);
    }

    /** Supplies the per-split parameter rows; each row parameterizes one split's query. */
    @VisibleForTesting
    protected abstract Serializable[][] getSqlParameters();

    protected String getSqlTemplate() {
        return this.sqlTemplate;
    }

    @Override
    public List<JdbcSourceSplit> enumerateSplits() {
        final Serializable[][] parameterRows = getSqlParameters();
        final List<JdbcSourceSplit> splits = new ArrayList<>(Math.max(parameterRows.length, 1));
        if (parameterRows.length == 0) {
            // No parameters at all: emit a single split that runs the bare template.
            splits.add(createSplit(null));
            return splits;
        }
        for (Serializable[] row : parameterRows) {
            splits.add(createSplit(row));
        }
        return splits;
    }

    @Override
    public List<String> lineageQueries() {
        return Collections.singletonList(getSqlTemplate());
    }

    /** Builds one split for the template with the given parameter row (may be null). */
    protected JdbcSourceSplit createSplit(Serializable[] paramArr) {
        return new JdbcSourceSplit(
                getNextId(), getSqlTemplate(), paramArr, new CheckpointedOffset());
    }

    /** Returns the next split id by bumping the zero-padded decimal counter in place. */
    protected final String getNextId() {
        // We just count upward, so incrementing the char representation directly is cheaper
        // than incrementing an integer and re-formatting it to a padded string every time.
        bumpDigit(currentId, currentId.length - 1);
        return new String(currentId);
    }

    // Recursive ripple-carry increment of the decimal digit at `pos`.
    private static void bumpDigit(char[] digits, int pos) {
        char digit = digits[pos];
        digit++;

        if (digit > '9') {
            digit = '0';
            bumpDigit(digits, pos - 1);
        }
        digits[pos] = digit;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SqlSplitterEnumerator)) {
            return false;
        }
        SqlSplitterEnumerator that = (SqlSplitterEnumerator) o;
        return Objects.equals(sqlTemplate, that.sqlTemplate);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sqlTemplate);
    }
}
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java index eebeac5ab..ded972ef9 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java @@ -24,6 +24,7 @@ import java.io.Serializable; /** Combine 2 {@link JdbcParameterValuesProvider} into 1. */ +@Deprecated @Internal public class CompositeJdbcParameterValuesProvider implements JdbcParameterValuesProvider { JdbcParameterValuesProvider a; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcGenericParameterValuesProvider.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcGenericParameterValuesProvider.java index d96379821..c219070b4 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcGenericParameterValuesProvider.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcGenericParameterValuesProvider.java @@ -27,6 +27,7 @@ * This splits generator actually does nothing but wrapping the query parameters computed by the * user before creating the {@link JdbcInputFormat} instance. 
*/ +@Deprecated @PublicEvolving public class JdbcGenericParameterValuesProvider implements JdbcParameterValuesProvider { diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java index 617e6c526..28c600d05 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java @@ -37,6 +37,7 @@ *

You can take advantage of this class to automatically generate the parameters of the BETWEEN * clause, based on the passed constructor parameters. */ +@Deprecated public class JdbcNumericBetweenParametersProvider implements JdbcParameterValuesProvider { private final long minVal; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcParameterValuesProvider.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcParameterValuesProvider.java index 3f1c2aede..a8699ede4 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcParameterValuesProvider.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcParameterValuesProvider.java @@ -28,6 +28,7 @@ * run (i.e. splits). Each query will be parameterized using a row of the matrix provided by each * {@link JdbcParameterValuesProvider} implementation. */ +@Deprecated @PublicEvolving public interface JdbcParameterValuesProvider extends Serializable { diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcSlideTimingParameterProvider.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcSlideTimingParameterProvider.java index eae5891ed..de4cb00df 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcSlideTimingParameterProvider.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcSlideTimingParameterProvider.java @@ -28,6 +28,7 @@ import java.util.List; /** The parameters provider generate parameters by slide timing window strategy. 
*/ +@Deprecated @PublicEvolving public class JdbcSlideTimingParameterProvider implements JdbcParameterValuesProvider { diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java index d116433b5..488ae807b 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java @@ -22,8 +22,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.core.datastream.source.config.ContinuousUnBoundingSettings; -import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.JdbcSqlSplitterEnumerator; import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.core.datastream.source.split.CheckpointedOffset; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider; @@ -53,7 +55,7 @@ class JdbcSourceBuilderTest { private final String dbUrl = "dbUrl"; private final ResultExtractor extractor = ResultExtractor.ofRowResultExtractor(); private final JdbcParameterValuesProvider parameterValuesProvider = - new JdbcNumericBetweenParametersProvider(0, 3); + new JdbcNumericBetweenParametersProvider(0, 
3).ofBatchSize(1); private final TypeInformation typeInformation = new TypeHint() {}.getTypeInfo(); @@ -74,13 +76,15 @@ void testSetSql() { .isInstanceOf(IllegalArgumentException.class); assertThatThrownBy(() -> JdbcSource.builder().setSql(emptySql)) .isInstanceOf(IllegalArgumentException.class); - assertThatThrownBy(() -> JdbcSource.builder().setDBUrl(dbUrl).build()) - .isInstanceOf(IllegalStateException.class); + // For valid. JdbcSource jdbcSource = sourceBuilder.build(); - SqlTemplateSplitEnumerator sqlEnumerator = - (SqlTemplateSplitEnumerator) jdbcSource.getSqlSplitEnumeratorProvider().create(); - assertThat(sqlEnumerator.getSqlTemplate()).isEqualTo(validSql); + assertThat(jdbcSource.getSplitterEnumerator()) + .isInstanceOf(JdbcSqlSplitterEnumerator.class); + JdbcSqlSplitterEnumerator sqlEnumerator = + (JdbcSqlSplitterEnumerator) jdbcSource.getSplitterEnumerator(); + + assertThat(sqlEnumerator.lineageQueries()).containsExactly(validSql); } @Test @@ -89,10 +93,32 @@ void testSetParameterProvider() { .isInstanceOf(NullPointerException.class); JdbcSource jdbcSource = sourceBuilder.setJdbcParameterValuesProvider(parameterValuesProvider).build(); - SqlTemplateSplitEnumerator sqlSplitEnumerator = - (SqlTemplateSplitEnumerator) jdbcSource.getSqlSplitEnumeratorProvider().create(); - assertThat(sqlSplitEnumerator.getParameterValuesProvider()) - .isEqualTo(parameterValuesProvider); + assertThat(jdbcSource.getSplitterEnumerator()) + .isInstanceOf(JdbcSqlSplitterEnumerator.class); + JdbcSqlSplitterEnumerator sqlEnumerator = + (JdbcSqlSplitterEnumerator) jdbcSource.getSplitterEnumerator(); + assertThat(sqlEnumerator.enumerateSplits()) + .containsExactly( + new JdbcSourceSplit( + "0000000001", + validSql, + new Serializable[] {0L, 0L}, + new CheckpointedOffset()), + new JdbcSourceSplit( + "0000000002", + validSql, + new Serializable[] {1L, 1L}, + new CheckpointedOffset()), + new JdbcSourceSplit( + "0000000003", + validSql, + new Serializable[] {2L, 2L}, + new 
CheckpointedOffset()), + new JdbcSourceSplit( + "0000000004", + validSql, + new Serializable[] {3L, 3L}, + new CheckpointedOffset())); } @Test diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java index 23814eede..16a9453d0 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java @@ -149,6 +149,7 @@ void testForNormalCaseWithoutFailure( .addSink(new TestingSinkFunction()); waitExpectation(client, env, () -> collectedRecords.size() >= TESTING_ENTRIES_SIZE); + assertThat(collectedRecords.size()).isEqualTo(testEntries.size()); assertThat(collectedRecords).containsExactlyInAnyOrderElementsOf(testEntries); } diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorTest.java index 268fb73f9..1014db162 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorTest.java @@ -19,21 +19,19 @@ package org.apache.flink.connector.jdbc.core.datastream.source.enumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.SplitterEnumerator; import 
org.apache.flink.connector.jdbc.core.datastream.source.split.CheckpointedOffset; import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import javax.annotation.Nonnull; - -import java.io.IOException; import java.io.Serializable; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.function.Supplier; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -108,14 +106,40 @@ private static JdbcSourceEnumerator createEnumerator( return new JdbcSourceEnumerator( context, - new JdbcSqlSplitEnumeratorBase(null) { + new SplitterEnumerator() { + @Override + public void start(JdbcConnectionProvider connectionProvider) {} + + @Override + public void close() {} + @Override - public @Nonnull List enumerateSplits( - @Nonnull Supplier splitGettable) throws IOException { + public boolean isAllSplitsFinished() { + return false; + } + + @Override + public List enumerateSplits() { return Collections.emptyList(); } + + @Override + public List lineageQueries() { + return Collections.emptyList(); + } + + @Override + public Serializable serializableState() { + return null; + } + + @Override + public SplitterEnumerator restoreState(Serializable state) { + return null; + } }, null, + null, Arrays.stream(splits).collect(Collectors.toList())); } } diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/PreparedSplitterNumericParametersTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/PreparedSplitterNumericParametersTest.java new file mode 100644 index 000000000..34ea58fd3 --- 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter;

import org.junit.jupiter.api.Test;

import java.io.Serializable;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link PreparedSplitterNumericParameters}. */
class PreparedSplitterNumericParametersTest {

    @Test
    void testBatchSizeDivisible() {
        Serializable[][] parameters =
                new PreparedSplitterNumericParameters(-5, 9).withBatchSize(3).getParameterValues();

        long[][] expected = {
            new long[] {-5, -3},
            new long[] {-2, 0},
            new long[] {1, 3},
            new long[] {4, 6},
            new long[] {7, 9}
        };
        check(expected, parameters);
    }

    @Test
    void testBatchSizeNotDivisible() {
        Serializable[][] parameters =
                new PreparedSplitterNumericParameters(-5, 11).withBatchSize(4).getParameterValues();

        long[][] expected = {
            new long[] {-5, -2},
            new long[] {-1, 2},
            new long[] {3, 5},
            new long[] {6, 8},
            new long[] {9, 11}
        };
        check(expected, parameters);
    }

    @Test
    void testBatchSizeTooLarge() {
        Serializable[][] parameters =
                new PreparedSplitterNumericParameters(0, 2).withBatchSize(5).getParameterValues();

        long[][] expected = {new long[] {0, 2}};
        check(expected, parameters);
    }

    @Test
    void testBatchNumDivisible() {
        Serializable[][] parameters =
                new PreparedSplitterNumericParameters(-5, 9).withBatchNum(5).getParameterValues();

        long[][] expected = {
            new long[] {-5, -3},
            new long[] {-2, 0},
            new long[] {1, 3},
            new long[] {4, 6},
            new long[] {7, 9}
        };
        check(expected, parameters);
    }

    @Test
    void testBatchNumNotDivisible() {
        Serializable[][] parameters =
                new PreparedSplitterNumericParameters(-5, 11).withBatchNum(5).getParameterValues();

        long[][] expected = {
            new long[] {-5, -2},
            new long[] {-1, 2},
            new long[] {3, 5},
            new long[] {6, 8},
            new long[] {9, 11}
        };
        check(expected, parameters);
    }

    @Test
    void testBatchNumTooLarge() {
        Serializable[][] parameters =
                new PreparedSplitterNumericParameters(0, 2).withBatchNum(5).getParameterValues();

        long[][] expected = {
            new long[] {0, 0},
            new long[] {1, 1},
            new long[] {2, 2}
        };
        check(expected, parameters);
    }

    @Test
    void testBatchMaxMinTooLarge() {
        Serializable[][] parameters =
                new PreparedSplitterNumericParameters(2260418954055131340L, 3875220057236942850L)
                        .withBatchNum(3)
                        .getParameterValues();

        long[][] expected = {
            new long[] {2260418954055131340L, 2798685988449068491L},
            new long[] {2798685988449068492L, 3336953022843005643L},
            new long[] {3336953022843005644L, 3875220057236942795L}
        };
        check(expected, parameters);
    }

    /** Asserts the generated from/to matrix matches the expected bounds element by element. */
    private void check(long[][] expected, Serializable[][] actual) {
        assertThat(actual).hasDimensions(expected.length, expected[0].length);
        for (int i = 0; i < expected.length; i++) {
            for (int j = 0; j < expected[i].length; j++) {
                assertThat(((Long) actual[i][j]).longValue()).isEqualTo(expected[i][j]);
            }
        }
    }
}