[FLINK-33929][jdbc-connector] Support JDBC String field read Fragment read #87

Open · wants to merge 6 commits into base: main
54 changes: 54 additions & 0 deletions docs/content.zh/docs/connectors/datastream/jdbc.md
Original file line number Diff line number Diff line change
@@ -84,6 +84,60 @@ env.from_collection(

env.execute()
```
### Partitioned reads of string columns
<table class="table table-bordered" style="width: 60%">
    <thead>
      <tr>
        <th class="text-left">Databases supporting string-partitioned reads</th>
        <th class="text-left">Hash expression used for partitioning</th>
      </tr>
    </thead>
    <tbody>
    <tr>
      <td>MySQL</td>
      <td>ABS(MD5(`column`) % `number_of_partitions`)</td>
    </tr>
    <tr>
      <td>Oracle</td>
      <td>MOD(ORA_HASH(`column`), `number_of_partitions`)</td>
    </tr>
    <tr>
      <td>PostgreSQL</td>
      <td>(ABS(HASHTEXT(`column`)) % `number_of_partitions`)</td>
    </tr>
    <tr>
      <td>MS SQL Server</td>
      <td>ABS(HASHBYTES('MD5', `column`) % `number_of_partitions`)</td>
    </tr>
    </tbody>
</table>

<a name="jdbc-catalog"></a>

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Serializable[][] queryParameters = new Long[3][1];
queryParameters[0] = new Long[]{0L};
queryParameters[1] = new Long[]{1L};
queryParameters[2] = new Long[]{2L};

TypeInformation<?>[] fieldTypes = new TypeInformation<?>[]{
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

JdbcInputFormat jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat()
.setDrivername("com.mysql.cj.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/test")
.setUsername("root")
.setPassword("12345678")
.setPartitionColumnTypeString(true) // must be set to true when the partition column is a string
.setQuery("select * from fake_source_sink where ABS(MD5(`name`) % 3) = ?") // adapt the hash expression to the target database
.setRowTypeInfo(rowTypeInfo).setParametersProvider(new JdbcGenericParameterValuesProvider(queryParameters)).finish();
DataStreamSource<Row> input = env.createInput(jdbcInputFormat);
env.execute();
```

{{< /tab >}}
{{< /tabs >}}

47 changes: 46 additions & 1 deletion docs/content.zh/docs/connectors/table/jdbc.md
@@ -353,12 +353,57 @@ ON myTopic.key = MyUserTable.id;
To accelerate reading data in parallel `Source` task instances, Flink provides the partitioned scan feature for JDBC tables.

If any of the following partition scan options is specified, all of them must be specified. They describe how to partition the table when reading in parallel from multiple tasks.
`scan.partition.column` must be a numeric, date, timestamp, or string column of the table. Note that `scan.partition.lower-bound` and `scan.partition.upper-bound` decide the partition stride and filter the rows in the table. For batch jobs, the maximum and minimum values can also be fetched before submitting the Flink job.

- `scan.partition.column`: the column name used to partition the input.
- `scan.partition.num`: the number of partitions.
- `scan.partition.lower-bound`: the smallest value of the first partition.
- `scan.partition.upper-bound`: the largest value of the last partition.

#### Partitioned reads of string columns
<table class="table table-bordered" style="width: 60%">
    <thead>
      <tr>
        <th class="text-left">Databases supporting string-partitioned reads</th>
        <th class="text-left">Hash expression used for partitioning</th>
      </tr>
    </thead>
    <tbody>
    <tr>
      <td>MySQL</td>
      <td>ABS(MD5(`column`) % `number_of_partitions`)</td>
    </tr>
    <tr>
      <td>Oracle</td>
      <td>MOD(ORA_HASH(`column`), `number_of_partitions`)</td>
    </tr>
    <tr>
      <td>PostgreSQL</td>
      <td>(ABS(HASHTEXT(`column`)) % `number_of_partitions`)</td>
    </tr>
    <tr>
      <td>MS SQL Server</td>
      <td>ABS(HASHBYTES('MD5', `column`) % `number_of_partitions`)</td>
    </tr>
    </tbody>
</table>

```SQL
CREATE TABLE my_split_string_read_table (
id INT,
name STRING,
age INT,
email STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test',
'table-name' = 'mysql_table_name',
'username'='root',
'password'='12345678',
'scan.partition.column'='name',
'scan.partition.num'='3',
'scan.partition.lower-bound'='0', -- each split reads the rows where ABS(MD5(`name`) % 3) evaluates to 0, 1, or 2
'scan.partition.upper-bound'='2'
)
```

<a name="lookup-cache"></a>

56 changes: 56 additions & 0 deletions docs/content/docs/connectors/datastream/jdbc.md
@@ -64,6 +64,62 @@ JdbcSink.sink(
{{< /tab >}}
{{< /tabs >}}


#### Partitioned reads of string columns (currently supported only for some databases)
<table class="table table-bordered" style="width: 60%">
    <thead>
      <tr>
        <th class="text-left">Databases supporting string-partitioned reads</th>
        <th class="text-left">Hash expression used for partitioning</th>
      </tr>
    </thead>
    <tbody>
    <tr>
      <td>MySQL</td>
      <td>ABS(MD5(`column`) % `number_of_partitions`)</td>
    </tr>
    <tr>
      <td>Oracle</td>
      <td>MOD(ORA_HASH(`column`), `number_of_partitions`)</td>
    </tr>
    <tr>
      <td>PostgreSQL</td>
      <td>(ABS(HASHTEXT(`column`)) % `number_of_partitions`)</td>
    </tr>
    <tr>
      <td>MS SQL Server</td>
      <td>ABS(HASHBYTES('MD5', `column`) % `number_of_partitions`)</td>
    </tr>
    </tbody>
</table>

<a name="jdbc-catalog"></a>
### Example: partitioned read of a string column

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Serializable[][] queryParameters = new Long[3][1];
queryParameters[0] = new Long[]{0L};
queryParameters[1] = new Long[]{1L};
queryParameters[2] = new Long[]{2L};

TypeInformation<?>[] fieldTypes = new TypeInformation<?>[]{
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

JdbcInputFormat jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat()
.setDrivername("com.mysql.cj.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/test")
.setUsername("root")
.setPassword("12345678")
.setPartitionColumnTypeString(true) // must be set to true when the partition column is a string
.setQuery("select * from fake_source_sink where ABS(MD5(`name`) % 3) = ?") // adapt the hash expression to the target database
.setRowTypeInfo(rowTypeInfo).setParametersProvider(new JdbcGenericParameterValuesProvider(queryParameters)).finish();
DataStreamSource<Row> input = env.createInput(jdbcInputFormat);
env.execute();
```
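Enumerating the bucket parameters by hand scales poorly if the partition count changes. As a minimal sketch (the helper name `buildHashBucketParameters` is hypothetical, not part of the connector API), the rows can be generated in a loop:

```java
import java.io.Serializable;

public class HashBucketParams {
    // Hypothetical helper: one parameter row per hash bucket, so that split i
    // binds i into the "ABS(MD5(`name`) % numPartitions) = ?" predicate.
    static Serializable[][] buildHashBucketParameters(int numPartitions) {
        Serializable[][] params = new Serializable[numPartitions][];
        for (int i = 0; i < numPartitions; i++) {
            params[i] = new Long[] {(long) i};
        }
        return params;
    }

    public static void main(String[] args) {
        Serializable[][] params = buildHashBucketParameters(3);
        System.out.println(params.length);
        System.out.println(params[0][0] + "," + params[1][0] + "," + params[2][0]);
    }
}
```

The resulting array can be handed to `new JdbcGenericParameterValuesProvider(...)` exactly like the literal array in the example above.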

### SQL DML statement and JDBC statement builder

The sink builds one [JDBC prepared statement](https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/java/sql/PreparedStatement.html) from a user-provider SQL string, e.g.:
48 changes: 47 additions & 1 deletion docs/content/docs/connectors/table/jdbc.md
@@ -366,13 +366,59 @@ See [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) for
To accelerate reading data in parallel `Source` task instances, Flink provides partitioned scan feature for JDBC table.

All of the following scan partition options must be specified if any of them is specified. They describe how to partition the table when reading in parallel from multiple tasks.
The `scan.partition.column` must be a numeric, date, timestamp, or string column from the table in question. Notice that `scan.partition.lower-bound` and `scan.partition.upper-bound` are used to decide the partition stride and filter the rows in the table. For a batch job, it is also possible to fetch the max and min values before submitting the Flink job.

- `scan.partition.column`: The column name used for partitioning the input.
- `scan.partition.num`: The number of partitions.
- `scan.partition.lower-bound`: The smallest value of the first partition.
- `scan.partition.upper-bound`: The largest value of the last partition.

#### Partitioned reads of string columns
<table class="table table-bordered" style="width: 60%">
    <thead>
      <tr>
        <th class="text-left">Databases supporting string-partitioned reads</th>
        <th class="text-left">Hash expression used for partitioning</th>
      </tr>
    </thead>
    <tbody>
    <tr>
      <td>MySQL</td>
      <td>ABS(MD5(`column`) % `number_of_partitions`)</td>
    </tr>
    <tr>
      <td>Oracle</td>
      <td>MOD(ORA_HASH(`column`), `number_of_partitions`)</td>
    </tr>
    <tr>
      <td>PostgreSQL</td>
      <td>(ABS(HASHTEXT(`column`)) % `number_of_partitions`)</td>
    </tr>
    <tr>
      <td>MS SQL Server</td>
      <td>ABS(HASHBYTES('MD5', `column`) % `number_of_partitions`)</td>
    </tr>
    </tbody>
</table>

```SQL
CREATE TABLE my_split_string_read_table (
id INT,
name STRING,
age INT,
email STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test',
'table-name' = 'mysql_table_name',
'username'='root',
'password'='12345678',
'scan.partition.column'='name',
'scan.partition.num'='3',
'scan.partition.lower-bound'='0', -- each split reads the rows where ABS(MD5(`name`) % 3) evaluates to 0, 1, or 2
'scan.partition.upper-bound'='2'
)
```
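For reference, with the MySQL expression from the table above and `scan.partition.num` set to 3, each source split is expected to filter on one hash bucket. A sketch of the per-split predicate (this mirrors the documented expression; it is an illustration, not the connector's internal query builder):

```java
public class HashModPredicate {
    // Builds the MySQL-style predicate that selects the rows of one hash bucket.
    static String hashModPredicate(String column, int numPartitions, int bucket) {
        return "ABS(MD5(`" + column + "`) % " + numPartitions + ") = " + bucket;
    }

    public static void main(String[] args) {
        // The three splits of the example table each read one bucket of `name`.
        for (int bucket = 0; bucket < 3; bucket++) {
            System.out.println(hashModPredicate("name", 3, bucket));
        }
    }
}
```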

### Lookup Cache

JDBC connector can be used in temporal join as a lookup source (aka. dimension table). Currently, only sync lookup mode is supported.
@@ -121,6 +121,7 @@ public class JdbcInputFormat extends RichInputFormat<Row, InputSplit>

protected boolean hasNext;
protected Object[][] parameterValues;
protected boolean isPartitionColumnTypeString;

public JdbcInputFormat() {}

@@ -190,7 +191,11 @@ public void closeInputFormat() {
public void open(InputSplit inputSplit) throws IOException {
try {
if (inputSplit != null && parameterValues != null) {
for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
int parameterLength =
isPartitionColumnTypeString
? 1
: parameterValues[inputSplit.getSplitNumber()].length;
for (int i = 0; i < parameterLength; i++) {
Object param = parameterValues[inputSplit.getSplitNumber()][i];
if (param instanceof String) {
statement.setString(i + 1, (String) param);
@@ -265,7 +270,7 @@ public void close() throws IOException {
* Checks whether all data has been read.
*
* @return boolean value indication whether all data has been read.
* @throws IOException
* @throws IOException if an I/O error occurs
*/
@Override
public boolean reachedEnd() throws IOException {
@@ -393,6 +398,12 @@ public JdbcInputFormatBuilder setParametersProvider(
return this;
}

public JdbcInputFormatBuilder setPartitionColumnTypeString(
boolean partitionColumnTypeString) {
format.isPartitionColumnTypeString = partitionColumnTypeString;
return this;
}

public JdbcInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo) {
format.rowTypeInfo = rowTypeInfo;
return this;
@@ -425,6 +436,7 @@ public JdbcInputFormat finish() {
if (format.parameterValues == null) {
LOG.debug("No input splitting configured (data will be read with parallelism 1).");
}

return format;
}
}
@@ -56,6 +56,14 @@ public String quoteIdentifier(String identifier) {
return identifier;
}

@Override
public String hashModForField(String fieldName, int numPartitions) {
throw new IllegalArgumentException(
        "The Db2 dialect does not support hash-based partitioning; column "
                + fieldName
                + " cannot be read in fragments.");
}

@Override
public String dialectName() {
return "Db2";
@@ -59,6 +59,14 @@ public String quoteIdentifier(String identifier) {
return identifier;
}

@Override
public String hashModForField(String fieldName, int numPartitions) {
throw new IllegalArgumentException(
        "The Derby dialect does not support hash-based partitioning; column "
                + fieldName
                + " cannot be read in fragments.");
}

@Override
public String dialectName() {
return "Derby";
@@ -71,6 +71,11 @@ public String quoteIdentifier(String identifier) {
return identifier;
}

@Override
public String hashModForField(String fieldName, int numPartitions) {
return "MOD(ORA_HASH(" + quoteIdentifier(fieldName) + ")," + numPartitions + ")";
}

@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
@@ -40,6 +40,11 @@ public Optional<String> defaultDriverName() {
return Optional.of("org.postgresql.Driver");
}

@Override
public String hashModForField(String fieldName, int numPartitions) {
return "(ABS(HASHTEXT(" + quoteIdentifier(fieldName) + ")) % " + numPartitions + ")";
}

@Override
public String dialectName() {
return "PostgreSQL";
@@ -67,6 +67,11 @@ public String quoteIdentifier(String identifier) {
return identifier;
}

@Override
public String hashModForField(String fieldName, int numPartitions) {
return "ABS(HASHBYTES('MD5', " + quoteIdentifier(fieldName) + ") % " + numPartitions + ")";
}

@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
@@ -64,6 +64,14 @@ public String dialectName() {
return "Trino";
}

@Override
public String hashModForField(String fieldName, int numPartitions) {
throw new IllegalArgumentException(
        "The Trino dialect does not support hash-based partitioning; column "
                + fieldName
                + " cannot be read in fragments.");
}

@Override
public String quoteIdentifier(String identifier) {
return identifier;
@@ -84,6 +84,17 @@ default Optional<String> defaultDriverName() {
*/
String quoteIdentifier(String identifier);

/**
 * Builds a SQL expression that maps the value of a string column to a hash bucket.
 *
 * @param fieldName the name of the partition column.
 * @param numPartitions the number of partitions being read.
 * @return a SQL expression that evaluates to a bucket index in [0, numPartitions).
 */
default String hashModForField(String fieldName, int numPartitions) {
return " ABS(MD5(" + quoteIdentifier(fieldName) + ") % " + numPartitions + ")";
}

/**
* Constructs the dialects upsert statement if supported; such as MySQL's {@code DUPLICATE KEY
* UPDATE}, or PostgreSQL's {@code ON CONFLICT... DO UPDATE SET..}. If supported, the returned