Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36816] Support source parallelism setting for JDBC connector #148

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/content.zh/docs/connectors/table/jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,14 @@ ON myTopic.key = MyUserTable.id;
它决定了每个语句是否在事务中自动提交。有些 JDBC 驱动程序,特别是
<a href="https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor">Postgres</a>,可能需要将此设置为 false 以便流化结果。</td>
</tr>
<tr>
<td><h5>scan.parallelism</h5></td>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you add a link to some documentation for this - as there is for autocommit please.

<td>可选</td>
<td>否</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>定义 JDBC source 算子的并行度。默认情况下会使用全局默认并行度。</td>
</tr>
<tr>
<td><h5>lookup.cache</h5></td>
<td>可选</td>
Expand Down
8 changes: 8 additions & 0 deletions docs/content/docs/connectors/table/jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ Connector Options
which determines whether each statement is committed in a transaction automatically. Some JDBC drivers, specifically
<a href="https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor">Postgres</a>, may require this to be set to false in order to stream results.</td>
</tr>
<tr>
<td><h5>scan.parallelism</h5></td>
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Defines the parallelism of the JDBC source operator. If not set, the global default parallelism is used.</td>
</tr>
<tr>
<td><h5>lookup.cache</h5></td>
<td>optional</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class JdbcConnectorOptions {
// Scan options
// -----------------------------------------------------------------------------------------

// Parallelism of the JDBC source operator. Reuses Flink's generic
// 'scan.parallelism' option (FactoryUtil.SOURCE_PARALLELISM) rather than
// declaring a connector-specific key, so the key/semantics stay consistent
// with other connectors. No default — when absent, the planner falls back
// to the global default parallelism.
public static final ConfigOption<Integer> SCAN_PARALLELISM = FactoryUtil.SOURCE_PARALLELISM;

public static final ConfigOption<String> SCAN_PARTITION_COLUMN =
ConfigOptions.key("scan.partition.column")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.PASSWORD;
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_AUTO_COMMIT;
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_FETCH_SIZE;
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARALLELISM;
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARTITION_COLUMN;
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARTITION_LOWER_BOUND;
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARTITION_NUM;
Expand Down Expand Up @@ -167,6 +168,7 @@ private JdbcReadOptions getJdbcReadOptions(ReadableConfig readableConfig) {
final Optional<String> partitionColumnName =
readableConfig.getOptional(SCAN_PARTITION_COLUMN);
final JdbcReadOptions.Builder builder = JdbcReadOptions.builder();
readableConfig.getOptional(SCAN_PARALLELISM).ifPresent(builder::setParallelism);
if (partitionColumnName.isPresent()) {
builder.setPartitionColumnName(partitionColumnName.get());
builder.setPartitionLowerBound(readableConfig.get(SCAN_PARTITION_LOWER_BOUND));
Expand Down Expand Up @@ -243,6 +245,7 @@ public Set<ConfigOption<?>> optionalOptions() {
optionalOptions.add(COMPATIBLE_MODE);
optionalOptions.add(USERNAME);
optionalOptions.add(PASSWORD);
optionalOptions.add(SCAN_PARALLELISM);
optionalOptions.add(SCAN_PARTITION_COLUMN);
optionalOptions.add(SCAN_PARTITION_LOWER_BOUND);
optionalOptions.add(SCAN_PARTITION_UPPER_BOUND);
Expand Down Expand Up @@ -316,6 +319,13 @@ private void validateConfigOptions(ReadableConfig config, ClassLoader classLoade
}
}

if (config.getOptional(SCAN_PARALLELISM).isPresent() && config.get(SCAN_PARALLELISM) <= 0) {
throw new IllegalArgumentException(
String.format(
"The value of '%s' option should be positive, but is %s.",
SCAN_PARALLELISM.key(), config.get(SCAN_PARALLELISM)));
}

checkAllOrNone(config, new ConfigOption[] {LOOKUP_CACHE_MAX_ROWS, LOOKUP_CACHE_TTL});

if (config.get(LOOKUP_MAX_RETRIES) < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
builder.setRowDataTypeInfo(
runtimeProviderContext.createTypeInformation(physicalRowDataType));

return InputFormatProvider.of(builder.build());
return InputFormatProvider.of(builder.build(), readOptions.getParallelism());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class JdbcReadOptions implements Serializable {

private final int fetchSize;
private final boolean autoCommit;
private final Integer parallelism;

private JdbcReadOptions(
String query,
Expand All @@ -41,7 +42,8 @@ private JdbcReadOptions(
Long partitionUpperBound,
Integer numPartitions,
int fetchSize,
boolean autoCommit) {
boolean autoCommit,
Integer parallelism) {
this.query = query;
this.partitionColumnName = partitionColumnName;
this.partitionLowerBound = partitionLowerBound;
Expand All @@ -50,6 +52,7 @@ private JdbcReadOptions(

this.fetchSize = fetchSize;
this.autoCommit = autoCommit;
this.parallelism = parallelism;
}

public Optional<String> getQuery() {
Expand Down Expand Up @@ -80,6 +83,10 @@ public boolean getAutoCommit() {
return autoCommit;
}

/**
 * Returns the configured parallelism of the JDBC source operator, or {@code null} when
 * unset (callers then fall back to the global default parallelism).
 */
public Integer getParallelism() {
return parallelism;
}

/** Creates a new {@link Builder} for constructing {@code JdbcReadOptions}. */
public static Builder builder() {
return new Builder();
}
Expand All @@ -94,7 +101,8 @@ public boolean equals(Object o) {
&& Objects.equals(partitionUpperBound, options.partitionUpperBound)
&& Objects.equals(numPartitions, options.numPartitions)
&& Objects.equals(fetchSize, options.fetchSize)
&& Objects.equals(autoCommit, options.autoCommit);
&& Objects.equals(autoCommit, options.autoCommit)
&& Objects.equals(parallelism, options.parallelism);
} else {
return false;
}
Expand All @@ -110,6 +118,7 @@ public static class Builder {

protected int fetchSize = 0;
protected boolean autoCommit = true;
protected Integer scanParallelism;

/** optional, SQL query statement for this JDBC source. */
public Builder setQuery(String query) {
Expand Down Expand Up @@ -159,6 +168,12 @@ public Builder setAutoCommit(boolean autoCommit) {
return this;
}

/**
 * optional, parallelism of the JDBC source operator; when unset, the global default
 * parallelism is used.
 */
public Builder setParallelism(int parallelism) {
this.scanParallelism = parallelism;
return this;
}

public JdbcReadOptions build() {
return new JdbcReadOptions(
query,
Expand All @@ -167,7 +182,8 @@ public JdbcReadOptions build() {
partitionUpperBound,
numPartitions,
fetchSize,
autoCommit);
autoCommit,
scanParallelism);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ void testJdbcReadProperties() {
properties.put("scan.partition.num", "10");
properties.put("scan.fetch-size", "20");
properties.put("scan.auto-commit", "false");
properties.put("scan.parallelism", "13");

DynamicTableSource actual = createTableSource(SCHEMA, properties);

Expand All @@ -141,6 +142,7 @@ void testJdbcReadProperties() {
.setNumPartitions(10)
.setFetchSize(20)
.setAutoCommit(false)
.setParallelism(13)
.build();
JdbcDynamicTableSource expected =
new JdbcDynamicTableSource(
Expand Down Expand Up @@ -366,6 +368,14 @@ void testJdbcValidation() {
assertThatThrownBy(() -> createTableSource(SCHEMA, finalProperties7))
.hasStackTraceContaining(
"The value of 'connection.max-retry-timeout' option must be in second granularity and shouldn't be smaller than 1 second, but is 100ms.");

// scan.parallelism must be >= 1
properties = getAllOptions();
properties.put("scan.parallelism", "-1");
Map<String, String> finalProperties8 = properties;
assertThatThrownBy(() -> createTableSource(SCHEMA, finalProperties8))
.hasStackTraceContaining(
"The value of 'scan.parallelism' option should be positive, but is -1.");
}

@Test
Expand Down