Skip to content

Commit 0b0fe8c

Browse files
author
gituser
committed
Merge branch '1.10_release_4.0.x' into 1.10_release_4.1.x
2 parents 41c3c20 + dea1375 commit 0b0fe8c

File tree

2 files changed

+36
-0
lines changed

2 files changed

+36
-0
lines changed

localTest/pom.xml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,32 @@
111111
<version>1.0-SNAPSHOT</version>
112112
</dependency>
113113

114+
115+
<dependency>
116+
<groupId>com.dtstack.flink</groupId>
117+
<artifactId>sql.postgresql</artifactId>
118+
<version>1.0-SNAPSHOT</version>
119+
</dependency>
120+
121+
<dependency>
122+
<groupId>com.dtstack.flink</groupId>
123+
<artifactId>sql.side.all.postgresql</artifactId>
124+
<version>1.0-SNAPSHOT</version>
125+
</dependency>
126+
127+
<dependency>
128+
<groupId>com.dtstack.flink</groupId>
129+
<artifactId>sql.sink.postgresql</artifactId>
130+
<version>1.0-SNAPSHOT</version>
131+
</dependency>
132+
133+
<dependency>
134+
<groupId>com.dtstack.flink</groupId>
135+
<artifactId>sql.side.async.postgresql</artifactId>
136+
<version>1.0-SNAPSHOT</version>
137+
</dependency>
138+
139+
114140
<dependency>
115141
<groupId>com.dtstack.flink</groupId>
116142
<artifactId>sql.hbase</artifactId>

postgresql/postgresql-sink/src/main/java/com/dtstack/flink/sql/sink/postgresql/PostgresqlDialect.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121

2222
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
23+
import org.apache.commons.lang3.StringUtils;
2324

2425
import java.util.Arrays;
2526
import java.util.Optional;
@@ -48,9 +49,18 @@ public Optional<String> getUpsertStatement(String schema, String tableName, Stri
4849
.collect(Collectors.joining(", "));
4950

5051
String updateClause = Arrays.stream(fieldNames)
52+
.filter(item -> !Arrays.asList(uniqueKeyFields).contains(item))
5153
.map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
5254
.collect(Collectors.joining(", "));
5355

56+
if (StringUtils.isBlank(updateClause)) {
57+
return Optional.of(
58+
getInsertIntoStatement(schema, tableName, fieldNames, null)
59+
+ " ON CONFLICT ("
60+
+ uniqueColumns
61+
+ ") DO NOTHING");
62+
}
63+
5464
return Optional.of(getInsertIntoStatement(schema, tableName, fieldNames, null) +
5565
" ON CONFLICT (" + uniqueColumns + ")" +
5666
" DO UPDATE SET " + updateClause

0 commit comments

Comments
 (0)