Skip to content

Commit c7f0a5a

Browse files
1 parent 544275c commit c7f0a5a

File tree

26 files changed

+2513
-44
lines changed

26 files changed

+2513
-44
lines changed

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java

+1-18
Original file line numberDiff line numberDiff line change
@@ -125,17 +125,10 @@ public AbstractJdbcCatalog(
125125
checkNotNull(userClassLoader);
126126
checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));
127127

128-
validateJdbcUrl(baseUrl);
129-
130128
this.userClassLoader = userClassLoader;
129+
this.connectionProperties = Preconditions.checkNotNull(connectionProperties);
131130
this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
132131
this.defaultUrl = getDatabaseUrl(defaultDatabase);
133-
this.connectionProperties = Preconditions.checkNotNull(connectionProperties);
134-
checkArgument(
135-
!StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(USER_KEY)));
136-
checkArgument(
137-
!StringUtils.isNullOrWhitespaceOnly(
138-
connectionProperties.getProperty(PASSWORD_KEY)));
139132
}
140133

141134
protected String getDatabaseUrl(String databaseName) {
@@ -576,16 +569,6 @@ protected String getSchemaTableName(ObjectPath tablePath) {
576569
throw new UnsupportedOperationException();
577570
}
578571

579-
/**
580-
* URL has to be without database, like "jdbc:dialect://localhost:1234/" or
581-
* "jdbc:dialect://localhost:1234" rather than "jdbc:dialect://localhost:1234/db".
582-
*/
583-
protected static void validateJdbcUrl(String url) {
584-
String[] parts = url.trim().split("\\/+");
585-
586-
checkArgument(parts.length == 2);
587-
}
588-
589572
@Override
590573
public boolean equals(Object o) {
591574
if (this == o) {

flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSinkITCase.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ private void createTestDataTempView(StreamTableEnvironment tEnv, String viewName
198198
}
199199

200200
@Test
201-
void testReal() throws Exception {
201+
protected void testReal() throws Exception {
202202
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
203203
env.getConfig().enableObjectReuse();
204204
StreamTableEnvironment tEnv =
@@ -290,7 +290,7 @@ void testAppend() throws Exception {
290290
}
291291

292292
@Test
293-
void testBatchSink() throws Exception {
293+
protected void testBatchSink() throws Exception {
294294
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
295295

296296
String tableName = "batchSink";

flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,10 @@
5252
/** Base table operations. * */
5353
public abstract class TableBase<T> implements TableManaged {
5454

55-
private final String name;
56-
private final TableField[] fields;
55+
public static final String DEFAULT_PRIMARY_KEY_CONSTRAINT_NAME = "PRIMARY";
56+
57+
protected final String name;
58+
protected final TableField[] fields;
5759

5860
protected TableBase(String name, TableField[] fields) {
5961
Preconditions.checkArgument(name != null && !name.isEmpty(), "Table name must be defined");
@@ -69,7 +71,7 @@ public String getTableName() {
6971
return name;
7072
}
7173

72-
private Stream<TableField> getStreamFields() {
74+
protected Stream<TableField> getStreamFields() {
7375
return Arrays.stream(this.fields);
7476
}
7577

@@ -120,7 +122,7 @@ public int[] getTableTypes() {
120122
.toArray();
121123
}
122124

123-
public Schema getTableSchema() {
125+
public Schema getTableSchema(String pkConstraintName) {
124126
Schema.Builder schema = Schema.newBuilder();
125127
getStreamFields().forEach(field -> schema.column(field.getName(), field.getDataType()));
126128

@@ -129,11 +131,15 @@ public Schema getTableSchema() {
129131
.filter(TableField::isPkField)
130132
.map(TableField::getName)
131133
.collect(Collectors.joining(", "));
132-
schema.primaryKeyNamed("PRIMARY", pkFields);
134+
schema.primaryKeyNamed(pkConstraintName, pkFields);
133135

134136
return schema.build();
135137
}
136138

139+
public Schema getTableSchema() {
140+
return getTableSchema(DEFAULT_PRIMARY_KEY_CONSTRAINT_NAME);
141+
}
142+
137143
public ResolvedSchema getTableResolvedSchema() {
138144
return ResolvedSchema.of(
139145
getStreamFields()

flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableRow.java

+2
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ protected JdbcResultSetBuilder<Row> getResultSetBuilder() {
9191
} else if (type.getConversionClass().equals(LocalDateTime.class)) {
9292
ps.setTimestamp(
9393
i + 1, Timestamp.valueOf(row.<LocalDateTime>getFieldAs(i)));
94+
} else if (type.getConversionClass().equals(Float.class)) {
95+
ps.setFloat(i + 1, row.<Float>getFieldAs(i));
9496
} else {
9597
ps.setObject(i + 1, row.getField(i));
9698
}

flink-connector-jdbc-spanner/pom.xml

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<parent>
7+
<groupId>org.apache.flink</groupId>
8+
<artifactId>flink-connector-jdbc-parent</artifactId>
9+
<version>3.3-SNAPSHOT</version>
10+
</parent>
11+
12+
<artifactId>flink-connector-jdbc-spanner</artifactId>
13+
<name>Flink : Connectors : JDBC : Spanner</name>
14+
15+
<packaging>jar</packaging>
16+
17+
<properties>
18+
<spanner.version>2.26.1</spanner.version>
19+
</properties>
20+
21+
<dependencies>
22+
23+
<dependency>
24+
<groupId>org.apache.flink</groupId>
25+
<artifactId>flink-connector-jdbc-core</artifactId>
26+
<version>${project.version}</version>
27+
</dependency>
28+
29+
<dependency>
30+
<groupId>org.apache.flink</groupId>
31+
<artifactId>flink-connector-jdbc-core</artifactId>
32+
<version>${project.version}</version>
33+
<type>test-jar</type>
34+
<scope>test</scope>
35+
</dependency>
36+
37+
<dependency>
38+
<groupId>org.apache.flink</groupId>
39+
<artifactId>flink-table-api-java-bridge</artifactId>
40+
<version>${flink.version}</version>
41+
<scope>provided</scope>
42+
<optional>true</optional>
43+
</dependency>
44+
45+
<dependency>
46+
<groupId>org.apache.flink</groupId>
47+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
48+
<version>${flink.version}</version>
49+
<scope>test</scope>
50+
</dependency>
51+
<dependency>
52+
<groupId>org.apache.flink</groupId>
53+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
54+
<version>${flink.version}</version>
55+
<type>test-jar</type>
56+
<scope>test</scope>
57+
</dependency>
58+
59+
<dependency>
60+
<groupId>org.apache.flink</groupId>
61+
<artifactId>flink-test-utils</artifactId>
62+
<version>${flink.version}</version>
63+
<scope>test</scope>
64+
</dependency>
65+
66+
67+
<!-- Spanner -->
68+
<dependency>
69+
<groupId>com.google.cloud</groupId>
70+
<artifactId>google-cloud-spanner-jdbc</artifactId>
71+
<version>${spanner.version}</version>
72+
<scope>provided</scope>
73+
</dependency>
74+
75+
<!-- Assertions test dependencies -->
76+
77+
<dependency>
78+
<groupId>org.assertj</groupId>
79+
<artifactId>assertj-core</artifactId>
80+
<version>${assertj.version}</version>
81+
<scope>test</scope>
82+
</dependency>
83+
84+
<!-- Spanner test -->
85+
<dependency>
86+
<groupId>org.testcontainers</groupId>
87+
<artifactId>gcloud</artifactId>
88+
<scope>test</scope>
89+
</dependency>
90+
<dependency>
91+
<groupId>org.testcontainers</groupId>
92+
<artifactId>jdbc</artifactId>
93+
<scope>test</scope>
94+
</dependency>
95+
96+
</dependencies>
97+
98+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.jdbc.spanner.database;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.connector.jdbc.core.database.JdbcFactory;
23+
import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
24+
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
25+
import org.apache.flink.connector.jdbc.spanner.database.catalog.SpannerCatalog;
26+
import org.apache.flink.connector.jdbc.spanner.database.dialect.SpannerDialect;
27+
28+
/** Factory for {@link SpannerDialect}. */
29+
@Internal
30+
public class SpannerFactory implements JdbcFactory {
31+
@Override
32+
public boolean acceptsURL(String url) {
33+
return url.startsWith("jdbc:cloudspanner:");
34+
}
35+
36+
@Override
37+
public JdbcDialect createDialect() {
38+
return new SpannerDialect();
39+
}
40+
41+
@Override
42+
public JdbcCatalog createCatalog(
43+
ClassLoader classLoader,
44+
String catalogName,
45+
String defaultDatabase,
46+
String username,
47+
String pwd,
48+
String baseUrl) {
49+
return new SpannerCatalog(
50+
classLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
51+
}
52+
}

0 commit comments

Comments (0)