Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator;
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState;
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase;
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.JdbcSqlSplitterEnumerator;
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.SplitterEnumerator;
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader;
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader;
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
Expand Down Expand Up @@ -71,12 +72,13 @@ public class JdbcSource<OUT>
private final @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings;

private final Configuration configuration;
private final JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider;
private final SplitterEnumerator splitterEnumerator;

protected JdbcConnectionProvider connectionProvider;
private final ResultExtractor<OUT> resultExtractor;
private final DeliveryGuarantee deliveryGuarantee;

@Deprecated
JdbcSource(
Configuration configuration,
JdbcConnectionProvider connectionProvider,
Expand All @@ -85,9 +87,27 @@ public class JdbcSource<OUT>
TypeInformation<OUT> typeInformation,
@Nullable DeliveryGuarantee deliveryGuarantee,
@Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings) {
this(
configuration,
connectionProvider,
new JdbcSqlSplitterEnumerator(sqlSplitEnumeratorProvider),
resultExtractor,
typeInformation,
deliveryGuarantee,
continuousUnBoundingSettings);
}

JdbcSource(
Configuration configuration,
JdbcConnectionProvider connectionProvider,
SplitterEnumerator splitterEnumerator,
ResultExtractor<OUT> resultExtractor,
TypeInformation<OUT> typeInformation,
@Nullable DeliveryGuarantee deliveryGuarantee,
@Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings) {
this.configuration = Preconditions.checkNotNull(configuration);
this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
this.sqlSplitEnumeratorProvider = Preconditions.checkNotNull(sqlSplitEnumeratorProvider);
this.splitterEnumerator = Preconditions.checkNotNull(splitterEnumerator);
this.resultExtractor = Preconditions.checkNotNull(resultExtractor);
this.deliveryGuarantee =
Objects.isNull(deliveryGuarantee) ? DeliveryGuarantee.NONE : deliveryGuarantee;
Expand Down Expand Up @@ -125,7 +145,8 @@ public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> createEnumera
SplitEnumeratorContext<JdbcSourceSplit> enumContext) throws Exception {
return new JdbcSourceEnumerator(
enumContext,
sqlSplitEnumeratorProvider.create(),
splitterEnumerator,
connectionProvider,
continuousUnBoundingSettings,
new ArrayList<>());
}
Expand All @@ -139,7 +160,8 @@ public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> restoreEnumer
checkpoint.getOptionalUserDefinedSplitEnumeratorState();
return new JdbcSourceEnumerator(
enumContext,
sqlSplitEnumeratorProvider.restore(optionalUserDefinedSplitEnumeratorState),
splitterEnumerator.restoreState(optionalUserDefinedSplitEnumeratorState),
connectionProvider,
continuousUnBoundingSettings,
checkpoint.getRemainingSplits());
}
Expand Down Expand Up @@ -167,8 +189,8 @@ public static <OUT> JdbcSourceBuilder<OUT> builder() {
// ---- Visible for testing methods. ---

@VisibleForTesting
public JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> getSqlSplitEnumeratorProvider() {
return sqlSplitEnumeratorProvider;
public SplitterEnumerator getSplitterEnumerator() {
return splitterEnumerator;
}

@VisibleForTesting
Expand Down Expand Up @@ -204,7 +226,7 @@ public boolean equals(Object o) {
return boundedness == that.boundedness
&& Objects.equals(typeInformation, that.typeInformation)
&& Objects.equals(configuration, that.configuration)
&& Objects.equals(sqlSplitEnumeratorProvider, that.sqlSplitEnumeratorProvider)
&& Objects.equals(splitterEnumerator, that.splitterEnumerator)
&& Objects.equals(connectionProvider, that.connectionProvider)
&& Objects.equals(resultExtractor, that.resultExtractor)
&& deliveryGuarantee == that.deliveryGuarantee
Expand All @@ -215,9 +237,8 @@ public boolean equals(Object o) {
public LineageVertex getLineageVertex() {
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
new DefaultTypeDatasetFacet(getTypeInformation());
SqlTemplateSplitEnumerator enumerator =
(SqlTemplateSplitEnumerator) sqlSplitEnumeratorProvider.create();
Optional<String> nameOpt = LineageUtils.tableNameOf(enumerator.getSqlTemplate(), true);
Optional<String> nameOpt =
LineageUtils.tableNameOf(splitterEnumerator.lineageQueries(), true);
String namespace = LineageUtils.namespaceOf(connectionProvider);
LineageDataset dataset =
LineageUtils.datasetOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.core.datastream.source.config.ContinuousUnBoundingSettings;
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase;
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.JdbcSqlSplitterEnumerator;
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.SplitterEnumerator;
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
Expand All @@ -51,7 +54,7 @@
*
* <pre><code>
* JdbcSource&lt;Row> source = JdbcSource.&lt;Row>builder()
* .setSql(validSql)
* .setSplitter(PreparedSplitterEnumerator.of(validSql))
* .setResultExtractor(new RowResultExtractor())
* .setDBUrl(dbUrl)
* .setDriverName(driverName)
Expand All @@ -67,6 +70,7 @@
*
* <pre><code>
*
* String query = "select * from books WHERE author = ?"
* Serializable[][] queryParameters = new String[2][1];
* queryParameters[0] = new String[]{"Kumar"};
* queryParameters[1] = new String[]{"Tan Ah Teck"};
Expand All @@ -78,13 +82,12 @@
* .setUsername(username)
* .setDriverName("org.apache.derby.jdbc.EmbeddedDriver")
* .setDBUrl("jdbc:derby:memory:ebookshop")
* .setSql("select * from books WHERE author = ?")
* .setJdbcParameterValuesProvider(new JdbcGenericParameterValuesProvider(queryParameters))
* .setSplitter(PreparedSplitterEnumerator.of(query, queryParameters))
* .build();
* </code></pre>
*
* @see Row
* @see JdbcParameterValuesProvider
* @see SplitterEnumerator
* @see PreparedStatement
* @see DriverManager
* @see JdbcSource
Expand Down Expand Up @@ -118,7 +121,7 @@ public class JdbcSourceBuilder<OUT> {
private JdbcParameterValuesProvider jdbcParameterValuesProvider;
private @Nullable Serializable optionalSqlSplitEnumeratorState;
private ResultExtractor<OUT> resultExtractor;

private SplitterEnumerator splitterEnumerator;
private JdbcConnectionProvider connectionProvider;

JdbcSourceBuilder() {
Expand All @@ -132,6 +135,12 @@ public class JdbcSourceBuilder<OUT> {
this.autoCommit = true;
}

public JdbcSourceBuilder<OUT> setSplitter(SplitterEnumerator splitterEnumerator) {
this.splitterEnumerator = splitterEnumerator;
return this;
}

@Deprecated
public JdbcSourceBuilder<OUT> setSql(@Nonnull String sql) {
Preconditions.checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sql),
Expand Down Expand Up @@ -197,6 +206,7 @@ public JdbcSourceBuilder<OUT> setContinuousUnBoundingSettings(
* If the value was set as an instance of {@link JdbcSlideTimingParameterProvider}, it's
* required to specify the {@link #continuousUnBoundingSettings}.
*/
@Deprecated
public JdbcSourceBuilder<OUT> setJdbcParameterValuesProvider(
@Nonnull JdbcParameterValuesProvider parameterValuesProvider) {
this.jdbcParameterValuesProvider = Preconditions.checkNotNull(parameterValuesProvider);
Expand Down Expand Up @@ -289,36 +299,52 @@ public JdbcSource<OUT> build() {
JdbcSourceOptions.READER_FETCH_BATCH_SIZE, splitReaderFetchBatchSize);
this.configuration.set(JdbcSourceOptions.AUTO_COMMIT, autoCommit);

Preconditions.checkState(
!StringUtils.isNullOrWhitespaceOnly(sql), "'sql' mustn't be null or empty.");
Preconditions.checkNotNull(resultExtractor, "'resultExtractor' mustn't be null.");
Preconditions.checkNotNull(typeInformation, "'typeInformation' mustn't be null.");

if (Objects.nonNull(continuousUnBoundingSettings)) {
Preconditions.checkArgument(
Objects.nonNull(jdbcParameterValuesProvider)
&& jdbcParameterValuesProvider
instanceof JdbcSlideTimingParameterProvider,
INVALID_SLIDE_TIMING_CONTINUOUS_HINT);
}

if (Objects.nonNull(jdbcParameterValuesProvider)
&& jdbcParameterValuesProvider instanceof JdbcSlideTimingParameterProvider) {
Preconditions.checkArgument(
Objects.nonNull(continuousUnBoundingSettings),
INVALID_CONTINUOUS_SLIDE_TIMING_HINT);
if (this.splitterEnumerator == null) {
Preconditions.checkState(
!StringUtils.isNullOrWhitespaceOnly(sql), "'sql' mustn't be null or empty.");

if (Objects.nonNull(continuousUnBoundingSettings)) {
Preconditions.checkArgument(
Objects.nonNull(jdbcParameterValuesProvider)
&& jdbcParameterValuesProvider
instanceof JdbcSlideTimingParameterProvider,
INVALID_SLIDE_TIMING_CONTINUOUS_HINT);
}

if (Objects.nonNull(jdbcParameterValuesProvider)
&& jdbcParameterValuesProvider instanceof JdbcSlideTimingParameterProvider) {
Preconditions.checkArgument(
Objects.nonNull(continuousUnBoundingSettings),
INVALID_CONTINUOUS_SLIDE_TIMING_HINT);
}

this.splitterEnumerator =
getSplitter(sql, jdbcParameterValuesProvider, optionalSqlSplitEnumeratorState);
}

return new JdbcSource<>(
configuration,
connectionProvider,
new SqlTemplateSplitEnumerator.TemplateSqlSplitEnumeratorProvider()
.setOptionalSqlSplitEnumeratorState(optionalSqlSplitEnumeratorState)
.setSqlTemplate(sql)
.setParameterValuesProvider(jdbcParameterValuesProvider),
splitterEnumerator,
resultExtractor,
typeInformation,
deliveryGuarantee,
continuousUnBoundingSettings);
}

private SplitterEnumerator getSplitter(
String sqlTemplate,
JdbcParameterValuesProvider parameterProvider,
Serializable userDefinedState) {
JdbcSqlSplitEnumeratorBase.Provider<?> provider =
new SqlTemplateSplitEnumerator.TemplateSqlSplitEnumeratorProvider()
.setOptionalSqlSplitEnumeratorState(userDefinedState)
.setSqlTemplate(sqlTemplate)
.setParameterValuesProvider(parameterProvider);

return new JdbcSqlSplitterEnumerator(provider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.jdbc.core.datastream.source.config.ContinuousUnBoundingSettings;
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.SplitterEnumerator;
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
Expand All @@ -50,47 +52,52 @@ public class JdbcSourceEnumerator
private final Boundedness boundedness;
private final LinkedHashMap<Integer, String> readersAwaitingSplit;
private final List<JdbcSourceSplit> unassigned;
private final JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> sqlSplitEnumerator;
private final SplitterEnumerator splitterEnumerator;
private final @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings;
private final JdbcConnectionProvider connectionProvider;

public JdbcSourceEnumerator(
SplitEnumeratorContext<JdbcSourceSplit> context,
JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> sqlSplitEnumerator,
SplitterEnumerator splitterEnumerator,
JdbcConnectionProvider connectionProvider,
ContinuousUnBoundingSettings continuousUnBoundingSettings,
List<JdbcSourceSplit> unassigned) {
this.context = Preconditions.checkNotNull(context);
this.sqlSplitEnumerator = Preconditions.checkNotNull(sqlSplitEnumerator);
this.splitterEnumerator = Preconditions.checkNotNull(splitterEnumerator);
this.continuousUnBoundingSettings = continuousUnBoundingSettings;
this.boundedness =
Objects.isNull(continuousUnBoundingSettings)
? Boundedness.BOUNDED
: Boundedness.CONTINUOUS_UNBOUNDED;
this.unassigned = Preconditions.checkNotNull(unassigned);
this.readersAwaitingSplit = new LinkedHashMap<>();
this.connectionProvider = connectionProvider;
}

@Override
public void start() {
sqlSplitEnumerator.open();
splitterEnumerator.start(connectionProvider);
if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED
&& Objects.nonNull(continuousUnBoundingSettings)) {
context.callAsync(
() -> sqlSplitEnumerator.enumerateSplits(() -> 1024 - unassigned.size() > 0),
() -> splitterEnumerator.enumerateSplits(() -> 1024 - unassigned.size() > 0),
this::processNewSplits,
continuousUnBoundingSettings.getInitialDiscoveryDelay().toMillis(),
continuousUnBoundingSettings.getDiscoveryInterval().toMillis());
} else {
try {
unassigned.addAll(sqlSplitEnumerator.enumerateSplits(() -> true));
} catch (IOException e) {
throw new RuntimeException(e);
}
context.callAsync(
() ->
splitterEnumerator.isAllSplitsFinished()
? Collections.emptyList()
: splitterEnumerator.enumerateSplits(),
(List<JdbcSourceSplit> splits, Throwable error) ->
this.unassigned.addAll(splits));
}
}

@Override
public void close() throws IOException {
sqlSplitEnumerator.close();
splitterEnumerator.close();
}

@Override
Expand Down Expand Up @@ -136,7 +143,7 @@ public JdbcSourceEnumeratorState snapshotState(long checkpointId) throws Excepti
Collections.emptyList(),
Collections.emptyList(),
new ArrayList<>(unassigned),
sqlSplitEnumerator.optionalSqlSplitEnumeratorState);
splitterEnumerator.serializableState());
}

private Optional<JdbcSourceSplit> getNextSplit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
*
* @param <SplitT> JDBC split type.
*/
@Deprecated
@PublicEvolving
public abstract class JdbcSqlSplitEnumeratorBase<SplitT> implements AutoCloseable, Serializable {
private final char[] currentId = "0000000000".toCharArray();
Expand Down Expand Up @@ -78,6 +79,7 @@ public abstract List<JdbcSourceSplit> enumerateSplits(@Nonnull Supplier<Boolean>
*
* @param <SplitT> Split type.
*/
@Deprecated
@PublicEvolving
public interface Provider<SplitT> extends Serializable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.function.Supplier;

/** A split enumerator based on sql-parameters grains. */
@Deprecated
public final class SqlTemplateSplitEnumerator extends JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> {

public static final Logger LOG = LoggerFactory.getLogger(SqlTemplateSplitEnumerator.class);
Expand Down Expand Up @@ -105,6 +106,7 @@ public JdbcParameterValuesProvider getParameterValuesProvider() {
}

/** The {@link TemplateSqlSplitEnumeratorProvider} for {@link SqlTemplateSplitEnumerator}. */
@Deprecated
public static class TemplateSqlSplitEnumeratorProvider
implements JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> {

Expand Down
Loading
Loading