Skip to content

Commit 3b34b09

Browse files
committed
[feat-34267][redis] redis side support ipv6.
1 parent 7691047 commit 3b34b09

File tree

2 files changed

+37
-27
lines changed

2 files changed

+37
-27
lines changed

redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,18 @@
5555
import java.util.Set;
5656
import java.util.TreeSet;
5757
import java.util.concurrent.atomic.AtomicReference;
58+
import java.util.regex.Matcher;
59+
import java.util.regex.Pattern;
60+
5861
/**
5962
* @author yanxi
6063
*/
6164
public class RedisAllReqRow extends BaseAllReqRow {
6265

6366
private static final long serialVersionUID = 7578879189085344807L;
6467

68+
private static final Pattern HOST_PORT_PATTERN = Pattern.compile("(?<host>(.*)):(?<port>\\d+)*");
69+
6570
private static final Logger LOG = LoggerFactory.getLogger(RedisAllReqRow.class);
6671

6772
private static final int CONN_RETRY_NUM = 3;
@@ -72,9 +77,9 @@ public class RedisAllReqRow extends BaseAllReqRow {
7277

7378
private RedisSideTableInfo tableInfo;
7479

75-
private AtomicReference<Map<String, Map<String, String>>> cacheRef = new AtomicReference<>();
80+
private final AtomicReference<Map<String, Map<String, String>>> cacheRef = new AtomicReference<>();
7681

77-
private RedisSideReqRow redisSideReqRow;
82+
private final RedisSideReqRow redisSideReqRow;
7883

7984
public RedisAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
8085
super(new RedisAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
@@ -147,7 +152,7 @@ private void loadData(Map<String, Map<String, String>> tmpCache) throws SQLExcep
147152
JedisCommands jedis = null;
148153
try {
149154
StringBuilder keyPattern = new StringBuilder(tableInfo.getTableName());
150-
for (String key : tableInfo.getPrimaryKeys()) {
155+
for (int i = 0; i < tableInfo.getPrimaryKeys().size(); i++) {
151156
keyPattern.append("_").append("*");
152157
}
153158
jedis = getJedisWithRetry(CONN_RETRY_NUM);
@@ -193,14 +198,22 @@ private JedisCommands getJedis(RedisSideTableInfo tableInfo) {
193198
String firstPort = firstIpPort[1];
194199
Set<HostAndPort> addresses = new HashSet<>();
195200
Set<String> ipPorts = new HashSet<>();
196-
for (String ipPort : nodes) {
197-
ipPorts.add(ipPort);
198-
String[] ipPortPair = ipPort.split(":");
199-
addresses.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim())));
200-
}
201-
if (timeout == 0){
202-
timeout = 1000;
201+
202+
// 对ipv6 支持
203+
for (String node : nodes) {
204+
ipPorts.add(node);
205+
Matcher matcher = HOST_PORT_PATTERN.matcher(node);
206+
if (matcher.find()) {
207+
String host = matcher.group("host").trim();
208+
String portStr = matcher.group("port").trim();
209+
if (StringUtils.isNotBlank(host) && StringUtils.isNotBlank(portStr)) {
210+
// 转化为int格式的端口
211+
int port = Integer.parseInt(portStr);
212+
addresses.add(new HostAndPort(host, port));
213+
}
214+
}
203215
}
216+
204217
JedisCommands jedis = null;
205218
GenericObjectPoolConfig poolConfig = setPoolConfig(tableInfo.getMaxTotal(), tableInfo.getMaxIdle(), tableInfo.getMinIdle());
206219
switch (RedisType.parse(tableInfo.getRedisType())){

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public class RedisAsyncReqRow extends BaseAsyncReqRow {
6666

6767
private RedisSideTableInfo redisSideTableInfo;
6868

69-
private RedisSideReqRow redisSideReqRow;
69+
private final RedisSideReqRow redisSideReqRow;
7070

7171
public RedisAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
7272
super(new RedisAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
@@ -92,15 +92,15 @@ private void buildRedisClient(RedisSideTableInfo tableInfo){
9292
case STANDALONE:
9393
RedisURI redisURI = RedisURI.create("redis://" + url);
9494
redisURI.setPassword(password);
95-
redisURI.setDatabase(Integer.valueOf(database));
95+
redisURI.setDatabase(Integer.parseInt(database));
9696
redisClient = RedisClient.create(redisURI);
9797
connection = redisClient.connect();
9898
async = connection.async();
9999
break;
100100
case SENTINEL:
101101
RedisURI redisSentinelURI = RedisURI.create("redis-sentinel://" + url);
102102
redisSentinelURI.setPassword(password);
103-
redisSentinelURI.setDatabase(Integer.valueOf(database));
103+
redisSentinelURI.setDatabase(Integer.parseInt(database));
104104
redisSentinelURI.setSentinelMasterId(redisSideTableInfo.getMasterName());
105105
redisClient = RedisClient.create(redisSentinelURI);
106106
connection = redisClient.connect();
@@ -129,21 +129,18 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, BaseRow input, Re
129129
return;
130130
}
131131
RedisFuture<Map<String, String>> future = ((RedisHashAsyncCommands) async).hgetall(key);
132-
future.thenAccept(new Consumer<Map<String, String>>() {
133-
@Override
134-
public void accept(Map<String, String> values) {
135-
if (MapUtils.isNotEmpty(values)) {
136-
try {
137-
BaseRow row = fillData(input, values);
138-
dealCacheData(key,CacheObj.buildCacheObj(ECacheContentType.SingleLine, row));
139-
RowDataComplete.completeBaseRow(resultFuture, row);
140-
} catch (Exception e) {
141-
dealFillDataError(input, resultFuture, e);
142-
}
143-
} else {
144-
dealMissKey(input, resultFuture);
145-
dealCacheData(key, CacheMissVal.getMissKeyObj());
132+
future.thenAccept(values -> {
133+
if (MapUtils.isNotEmpty(values)) {
134+
try {
135+
BaseRow row = fillData(input, values);
136+
dealCacheData(key,CacheObj.buildCacheObj(ECacheContentType.SingleLine, row));
137+
RowDataComplete.completeBaseRow(resultFuture, row);
138+
} catch (Exception e) {
139+
dealFillDataError(input, resultFuture, e);
146140
}
141+
} else {
142+
dealMissKey(input, resultFuture);
143+
dealCacheData(key, CacheMissVal.getMissKeyObj());
147144
}
148145
});
149146
}

0 commit comments

Comments
 (0)