Skip to content

Commit 9281216

Browse files
committed
[feat-34267][redis] opt code.
1 parent 3b34b09 commit 9281216

File tree

6 files changed

+124
-115
lines changed

6 files changed

+124
-115
lines changed

redis5/pom.xml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,18 @@
1515
<module>redis5-sink</module>
1616
<module>redis5-side</module>
1717
</modules>
18-
18+
<dependencies>
19+
<dependency>
20+
<groupId>com.dtstack.flink</groupId>
21+
<artifactId>sql.core</artifactId>
22+
<version>1.0-SNAPSHOT</version>
23+
<scope>provided</scope>
24+
</dependency>
25+
<dependency>
26+
<groupId>redis.clients</groupId>
27+
<artifactId>jedis</artifactId>
28+
<version>2.9.0</version>
29+
</dependency>
30+
</dependencies>
1931

2032
</project>

redis5/redis5-side/pom.xml

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,6 @@
1111
<modelVersion>4.0.0</modelVersion>
1212
<artifactId>sql.side.redis</artifactId>
1313
<name>redis-side</name>
14-
<dependencies>
15-
<dependency>
16-
<groupId>com.dtstack.flink</groupId>
17-
<artifactId>sql.core</artifactId>
18-
<version>1.0-SNAPSHOT</version>
19-
<scope>provided</scope>
20-
</dependency>
21-
<dependency>
22-
<groupId>redis.clients</groupId>
23-
<artifactId>jedis</artifactId>
24-
<version>2.8.0</version>
25-
</dependency>
26-
</dependencies>
27-
2814

2915
<modules>
3016
<module>redis-side-core</module>

redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java

Lines changed: 55 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import com.google.common.collect.Maps;
3131
import org.apache.calcite.sql.JoinType;
3232
import org.apache.commons.collections.CollectionUtils;
33-
import org.apache.commons.lang3.StringUtils;
3433
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
3534
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3635
import org.apache.flink.table.dataformat.BaseRow;
@@ -48,10 +47,12 @@
4847
import java.io.Closeable;
4948
import java.io.IOException;
5049
import java.sql.SQLException;
50+
import java.util.Arrays;
5151
import java.util.Calendar;
5252
import java.util.HashSet;
5353
import java.util.List;
5454
import java.util.Map;
55+
import java.util.Objects;
5556
import java.util.Set;
5657
import java.util.TreeSet;
5758
import java.util.concurrent.atomic.AtomicReference;
@@ -117,9 +118,9 @@ protected void reloadCache() {
117118
public void flatMap(BaseRow input, Collector<BaseRow> out) throws Exception {
118119
GenericRow genericRow = (GenericRow) input;
119120
Map<String, Object> inputParams = Maps.newHashMap();
120-
for(Integer conValIndex : sideInfo.getEqualValIndex()){
121+
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
121122
Object equalObj = genericRow.getField(conValIndex);
122-
if(equalObj == null){
123+
if (equalObj == null) {
123124
if (sideInfo.getJoinType() == JoinType.LEFT) {
124125
BaseRow data = fillData(input, null);
125126
RowDataComplete.collectBaseRow(out, data);
@@ -133,11 +134,11 @@ public void flatMap(BaseRow input, Collector<BaseRow> out) throws Exception {
133134

134135
Map<String, String> cacheMap = cacheRef.get().get(key);
135136

136-
if (cacheMap == null){
137-
if(sideInfo.getJoinType() == JoinType.LEFT){
137+
if (cacheMap == null) {
138+
if (sideInfo.getJoinType() == JoinType.LEFT) {
138139
BaseRow data = fillData(input, null);
139140
RowDataComplete.collectBaseRow(out, data);
140-
}else{
141+
} else {
141142
return;
142143
}
143144

@@ -187,49 +188,54 @@ private JedisCommands getJedis(RedisSideTableInfo tableInfo) {
187188
String url = tableInfo.getUrl();
188189
String password = tableInfo.getPassword();
189190
String database = tableInfo.getDatabase() == null ? "0" : tableInfo.getDatabase();
191+
String masterName = tableInfo.getMasterName();
190192
int timeout = tableInfo.getTimeout();
191-
if (timeout == 0){
192-
timeout = 1000;
193-
}
194-
195-
String[] nodes = StringUtils.split(url, ",");
196-
String[] firstIpPort = StringUtils.split(nodes[0], ":");
197-
String firstIp = firstIpPort[0];
198-
String firstPort = firstIpPort[1];
199-
Set<HostAndPort> addresses = new HashSet<>();
200-
Set<String> ipPorts = new HashSet<>();
201-
202-
// 对ipv6 支持
203-
for (String node : nodes) {
204-
ipPorts.add(node);
205-
Matcher matcher = HOST_PORT_PATTERN.matcher(node);
206-
if (matcher.find()) {
207-
String host = matcher.group("host").trim();
208-
String portStr = matcher.group("port").trim();
209-
if (StringUtils.isNotBlank(host) && StringUtils.isNotBlank(portStr)) {
210-
// 转化为int格式的端口
211-
int port = Integer.parseInt(portStr);
212-
addresses.add(new HostAndPort(host, port));
213-
}
214-
}
193+
if (timeout == 0) {
194+
timeout = 10000;
215195
}
216196

197+
String[] nodes = url.split(",");
217198
JedisCommands jedis = null;
218199
GenericObjectPoolConfig poolConfig = setPoolConfig(tableInfo.getMaxTotal(), tableInfo.getMaxIdle(), tableInfo.getMinIdle());
219-
switch (RedisType.parse(tableInfo.getRedisType())){
220-
//单机
200+
switch (RedisType.parse(tableInfo.getRedisType())) {
221201
case STANDALONE:
222-
pool = new JedisPool(poolConfig, firstIp, Integer.parseInt(firstPort), timeout, password, Integer.parseInt(database));
223-
jedis = pool.getResource();
202+
String firstIp = null;
203+
String firstPort = null;
204+
Matcher standalone = HOST_PORT_PATTERN.matcher(nodes[0]);
205+
if (standalone.find()) {
206+
firstIp = standalone.group("host").trim();
207+
firstPort = standalone.group("port").trim();
208+
}
209+
if (Objects.nonNull(firstIp)) {
210+
pool = new JedisPool(poolConfig, firstIp, Integer.parseInt(firstPort), timeout, password, Integer.parseInt(database));
211+
jedis = pool.getResource();
212+
} else {
213+
throw new IllegalArgumentException(
214+
String.format("redis url error. current url [%s]", nodes[0]));
215+
}
224216
break;
225-
//哨兵
226217
case SENTINEL:
227-
jedisSentinelPool = new JedisSentinelPool(tableInfo.getMasterName(), ipPorts, poolConfig, timeout, password, Integer.parseInt(database));
218+
Set<String> ipPorts = new HashSet<>(Arrays.asList(nodes));
219+
jedisSentinelPool = new JedisSentinelPool(masterName, ipPorts, poolConfig, timeout, password, Integer.parseInt(database));
228220
jedis = jedisSentinelPool.getResource();
229221
break;
230-
//集群
231222
case CLUSTER:
232-
jedis = new JedisCluster(addresses, timeout, timeout,1, poolConfig);
223+
Set<HostAndPort> addresses = new HashSet<>();
224+
// 对ipv6 支持
225+
for (String node : nodes) {
226+
Matcher matcher = HOST_PORT_PATTERN.matcher(node);
227+
if (matcher.find()) {
228+
String host = matcher.group("host").trim();
229+
String portStr = matcher.group("port").trim();
230+
if (org.apache.commons.lang3.StringUtils.isNotBlank(host) && org.apache.commons.lang3.StringUtils.isNotBlank(portStr)) {
231+
// 转化为int格式的端口
232+
int port = Integer.parseInt(portStr);
233+
addresses.add(new HostAndPort(host, port));
234+
}
235+
}
236+
}
237+
jedis = new JedisCluster(addresses, timeout, timeout, 10, password, poolConfig);
238+
break;
233239
default:
234240
break;
235241
}
@@ -257,35 +263,32 @@ private JedisCommands getJedisWithRetry(int retryNum) {
257263
return null;
258264
}
259265

260-
private Set<String> getRedisKeys(RedisType redisType, JedisCommands jedis, String keyPattern){
261-
if(!redisType.equals(RedisType.CLUSTER)){
266+
private Set<String> getRedisKeys(RedisType redisType, JedisCommands jedis, String keyPattern) {
267+
if (!redisType.equals(RedisType.CLUSTER)) {
262268
return ((Jedis) jedis).keys(keyPattern);
263269
}
264270
Set<String> keys = new TreeSet<>();
265-
Map<String, JedisPool> clusterNodes = ((JedisCluster)jedis).getClusterNodes();
266-
for(String k : clusterNodes.keySet()){
271+
Map<String, JedisPool> clusterNodes = ((JedisCluster) jedis).getClusterNodes();
272+
for (String k : clusterNodes.keySet()) {
267273
JedisPool jp = clusterNodes.get(k);
268-
Jedis connection = jp.getResource();
269-
try {
274+
try (Jedis connection = jp.getResource()) {
270275
keys.addAll(connection.keys(keyPattern));
271-
} catch (Exception e){
272-
LOG.error("Getting keys error: {}", e);
273-
} finally {
274-
connection.close();
276+
} catch (Exception e) {
277+
LOG.error("Getting keys error", e);
275278
}
276279
}
277280
return keys;
278281
}
279282

280-
private GenericObjectPoolConfig setPoolConfig(String maxTotal, String maxIdle, String minIdle){
283+
private GenericObjectPoolConfig setPoolConfig(String maxTotal, String maxIdle, String minIdle) {
281284
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
282-
if (maxTotal != null){
285+
if (maxTotal != null) {
283286
config.setMaxTotal(Integer.parseInt(maxTotal));
284287
}
285-
if (maxIdle != null){
288+
if (maxIdle != null) {
286289
config.setMaxIdle(Integer.parseInt(maxIdle));
287290
}
288-
if (minIdle != null){
291+
if (minIdle != null) {
289292
config.setMinIdle(Integer.parseInt(minIdle));
290293
}
291294
return config;

redis5/redis5-side/redis-side-core/pom.xml

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,7 @@
1212
</parent>
1313

1414
<artifactId>sql.side.redis.core</artifactId>
15-
<dependencies>
16-
<dependency>
17-
<groupId>com.dtstack.flink</groupId>
18-
<artifactId>sql.core</artifactId>
19-
<version>1.0-SNAPSHOT</version>
20-
<scope>provided</scope>
21-
</dependency>
22-
</dependencies>
15+
2316
<packaging>jar</packaging>
2417

2518

redis5/redis5-sink/pom.xml

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,6 @@
1616
<name>redis-sink</name>
1717
<url>http://maven.apache.org</url>
1818

19-
<dependencies>
20-
<dependency>
21-
<groupId>com.dtstack.flink</groupId>
22-
<artifactId>sql.core</artifactId>
23-
<version>1.0-SNAPSHOT</version>
24-
<scope>provided</scope>
25-
</dependency>
26-
<dependency>
27-
<groupId>redis.clients</groupId>
28-
<artifactId>jedis</artifactId>
29-
<version>2.9.0</version>
30-
</dependency>
31-
</dependencies>
32-
3319
<build>
3420
<plugins>
3521
<plugin>

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java

Lines changed: 55 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,11 @@
3737

3838
import java.io.Closeable;
3939
import java.io.IOException;
40+
import java.util.Arrays;
4041
import java.util.HashSet;
4142
import java.util.List;
4243
import java.util.Map;
44+
import java.util.Objects;
4345
import java.util.Set;
4446
import java.util.regex.Matcher;
4547
import java.util.regex.Pattern;
@@ -121,41 +123,68 @@ private GenericObjectPoolConfig setPoolConfig(String maxTotal, String maxIdle, S
121123
private void establishConnection() {
122124
GenericObjectPoolConfig poolConfig = setPoolConfig(maxTotal, maxIdle, minIdle);
123125
String[] nodes = StringUtils.split(url, ",");
124-
String[] firstIpPort = StringUtils.split(nodes[0], ":");
125-
String firstIp = firstIpPort[0];
126-
String firstPort = firstIpPort[1];
127-
Set<HostAndPort> addresses = new HashSet<>();
128-
Set<String> ipPorts = new HashSet<>();
129-
130-
// 对ipv6 支持
131-
for (String node : nodes) {
132-
ipPorts.add(node);
133-
Matcher matcher = HOST_PORT_PATTERN.matcher(node);
134-
if (matcher.find()) {
135-
String host = matcher.group("host").trim();
136-
String portStr = matcher.group("port").trim();
137-
if (StringUtils.isNotBlank(host) && StringUtils.isNotBlank(portStr)) {
138-
// 转化为int格式的端口
139-
int port = Integer.parseInt(portStr);
140-
addresses.add(new HostAndPort(host, port));
141-
}
142-
}
143-
}
144126

145-
switch (RedisType.parse(redisType)) {
127+
switch (RedisType.parse(redisType)){
146128
case STANDALONE:
147-
pool = new JedisPool(poolConfig, firstIp, Integer.parseInt(firstPort), timeout, password, Integer.parseInt(database));
148-
jedis = pool.getResource();
129+
String firstIp = null;
130+
String firstPort = null;
131+
Matcher standalone = HOST_PORT_PATTERN.matcher(nodes[0]);
132+
if (standalone.find()) {
133+
firstIp = standalone.group("host").trim();
134+
firstPort = standalone.group("port").trim();
135+
}
136+
if (Objects.nonNull(firstIp)) {
137+
pool = new JedisPool(
138+
poolConfig,
139+
firstIp,
140+
Integer.parseInt(firstPort),
141+
timeout,
142+
password,
143+
Integer.parseInt(database));
144+
145+
jedis = pool.getResource();
146+
} else {
147+
throw new IllegalArgumentException(
148+
String.format("redis url error. current url [%s]", nodes[0]));
149+
}
149150
break;
150151
case SENTINEL:
151-
jedisSentinelPool = new JedisSentinelPool(masterName, ipPorts, poolConfig, timeout, password, Integer.parseInt(database));
152+
Set<String> ipPorts = new HashSet<>(Arrays.asList(nodes));
153+
jedisSentinelPool = new JedisSentinelPool(
154+
masterName,
155+
ipPorts,
156+
poolConfig,
157+
timeout,
158+
password,
159+
Integer.parseInt(database));
160+
152161
jedis = jedisSentinelPool.getResource();
153162
break;
154163
case CLUSTER:
155-
jedis = new JedisCluster(addresses, timeout, timeout, 10, password, poolConfig);
164+
Set<HostAndPort> addresses = new HashSet<>();
165+
// 对ipv6 支持
166+
for (String node : nodes) {
167+
Matcher matcher = HOST_PORT_PATTERN.matcher(node);
168+
if (matcher.find()) {
169+
String host = matcher.group("host").trim();
170+
String portStr = matcher.group("port").trim();
171+
if (StringUtils.isNotBlank(host) && StringUtils.isNotBlank(portStr)) {
172+
// 转化为int格式的端口
173+
int port = Integer.parseInt(portStr);
174+
addresses.add(new HostAndPort(host, port));
175+
}
176+
}
177+
}
178+
jedis = new JedisCluster(
179+
addresses,
180+
timeout,
181+
timeout,
182+
10,
183+
password,
184+
poolConfig);
156185
break;
157186
default:
158-
throw new RuntimeException("unsupported redis type[ " + redisType + "]");
187+
throw new IllegalArgumentException("unsupported redis type[ " + redisType + "]");
159188
}
160189
}
161190

0 commit comments

Comments
 (0)