Skip to content

Commit 7691047

Browse files
committed
[feat-34267][redis] redis support ipv6.
1 parent d022e4b commit 7691047

File tree

1 file changed

+23
-12
lines changed

1 file changed

+23
-12
lines changed

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,17 @@
4141
import java.util.List;
4242
import java.util.Map;
4343
import java.util.Set;
44+
import java.util.regex.Matcher;
45+
import java.util.regex.Pattern;
4446

4547
/**
4648
* @author yanxi
4749
*/
48-
public class RedisOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
50+
public class RedisOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean, Row>> {
4951
private static final Logger LOG = LoggerFactory.getLogger(RedisOutputFormat.class);
5052

53+
private static final Pattern HOST_PORT_PATTERN = Pattern.compile("(?<host>(.*)):(?<port>\\d+)*");
54+
5155
protected String[] fieldNames;
5256

5357
protected TypeInformation<?>[] fieldTypes;
@@ -82,8 +86,6 @@ public class RedisOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
8286

8387
private JedisSentinelPool jedisSentinelPool;
8488

85-
private GenericObjectPoolConfig poolConfig;
86-
8789
private RedisOutputFormat() {
8890
}
8991

@@ -117,17 +119,27 @@ private GenericObjectPoolConfig setPoolConfig(String maxTotal, String maxIdle, S
117119
}
118120

119121
private void establishConnection() {
120-
poolConfig = setPoolConfig(maxTotal, maxIdle, minIdle);
122+
GenericObjectPoolConfig poolConfig = setPoolConfig(maxTotal, maxIdle, minIdle);
121123
String[] nodes = StringUtils.split(url, ",");
122124
String[] firstIpPort = StringUtils.split(nodes[0], ":");
123125
String firstIp = firstIpPort[0];
124126
String firstPort = firstIpPort[1];
125127
Set<HostAndPort> addresses = new HashSet<>();
126128
Set<String> ipPorts = new HashSet<>();
127-
for (String ipPort : nodes) {
128-
ipPorts.add(ipPort);
129-
String[] ipPortPair = StringUtils.split(ipPort, ":");
130-
addresses.add(new HostAndPort(ipPortPair[0].trim(), Integer.parseInt(ipPortPair[1].trim())));
129+
130+
// 对ipv6 支持
131+
for (String node : nodes) {
132+
ipPorts.add(node);
133+
Matcher matcher = HOST_PORT_PATTERN.matcher(node);
134+
if (matcher.find()) {
135+
String host = matcher.group("host").trim();
136+
String portStr = matcher.group("port").trim();
137+
if (StringUtils.isNotBlank(host) && StringUtils.isNotBlank(portStr)) {
138+
// 转化为int格式的端口
139+
int port = Integer.parseInt(portStr);
140+
addresses.add(new HostAndPort(host, port));
141+
}
142+
}
131143
}
132144

133145
switch (RedisType.parse(redisType)) {
@@ -148,13 +160,12 @@ private void establishConnection() {
148160
}
149161

150162
@Override
151-
public void writeRecord(Tuple2 record) throws IOException {
152-
Tuple2<Boolean, Row> tupleTrans = record;
153-
Boolean retract = tupleTrans.getField(0);
163+
public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
164+
Boolean retract = record.getField(0);
154165
if (!retract) {
155166
return;
156167
}
157-
Row row = tupleTrans.getField(1);
168+
Row row = record.getField(1);
158169
if (row.getArity() != fieldNames.length) {
159170
return;
160171
}

0 commit comments

Comments
 (0)