3030import com .google .common .collect .Maps ;
3131import org .apache .calcite .sql .JoinType ;
3232import org .apache .commons .collections .CollectionUtils ;
33- import org .apache .commons .lang3 .StringUtils ;
3433import org .apache .commons .pool2 .impl .GenericObjectPoolConfig ;
3534import org .apache .flink .api .java .typeutils .RowTypeInfo ;
3635import org .apache .flink .table .dataformat .BaseRow ;
4847import java .io .Closeable ;
4948import java .io .IOException ;
5049import java .sql .SQLException ;
50+ import java .util .Arrays ;
5151import java .util .Calendar ;
5252import java .util .HashSet ;
5353import java .util .List ;
5454import java .util .Map ;
55+ import java .util .Objects ;
5556import java .util .Set ;
5657import java .util .TreeSet ;
5758import java .util .concurrent .atomic .AtomicReference ;
59+ import java .util .regex .Matcher ;
60+ import java .util .regex .Pattern ;
61+
5862/**
5963 * @author yanxi
6064 */
6165public class RedisAllReqRow extends BaseAllReqRow {
6266
6367 private static final long serialVersionUID = 7578879189085344807L ;
6468
69+ private static final Pattern HOST_PORT_PATTERN = Pattern .compile ("(?<host>(.*)):(?<port>\\ d+)*" );
70+
6571 private static final Logger LOG = LoggerFactory .getLogger (RedisAllReqRow .class );
6672
6773 private static final int CONN_RETRY_NUM = 3 ;
@@ -72,9 +78,9 @@ public class RedisAllReqRow extends BaseAllReqRow {
7278
7379 private RedisSideTableInfo tableInfo ;
7480
75- private AtomicReference <Map <String , Map <String , String >>> cacheRef = new AtomicReference <>();
81+ private final AtomicReference <Map <String , Map <String , String >>> cacheRef = new AtomicReference <>();
7682
77- private RedisSideReqRow redisSideReqRow ;
83+ private final RedisSideReqRow redisSideReqRow ;
7884
7985 public RedisAllReqRow (RowTypeInfo rowTypeInfo , JoinInfo joinInfo , List <FieldInfo > outFieldInfoList , AbstractSideTableInfo sideTableInfo ) {
8086 super (new RedisAllSideInfo (rowTypeInfo , joinInfo , outFieldInfoList , sideTableInfo ));
@@ -112,9 +118,9 @@ protected void reloadCache() {
112118 public void flatMap (BaseRow input , Collector <BaseRow > out ) throws Exception {
113119 GenericRow genericRow = (GenericRow ) input ;
114120 Map <String , Object > inputParams = Maps .newHashMap ();
115- for (Integer conValIndex : sideInfo .getEqualValIndex ()){
121+ for (Integer conValIndex : sideInfo .getEqualValIndex ()) {
116122 Object equalObj = genericRow .getField (conValIndex );
117- if (equalObj == null ){
123+ if (equalObj == null ) {
118124 if (sideInfo .getJoinType () == JoinType .LEFT ) {
119125 BaseRow data = fillData (input , null );
120126 RowDataComplete .collectBaseRow (out , data );
@@ -128,11 +134,11 @@ public void flatMap(BaseRow input, Collector<BaseRow> out) throws Exception {
128134
129135 Map <String , String > cacheMap = cacheRef .get ().get (key );
130136
131- if (cacheMap == null ){
132- if (sideInfo .getJoinType () == JoinType .LEFT ){
137+ if (cacheMap == null ) {
138+ if (sideInfo .getJoinType () == JoinType .LEFT ) {
133139 BaseRow data = fillData (input , null );
134140 RowDataComplete .collectBaseRow (out , data );
135- }else {
141+ } else {
136142 return ;
137143 }
138144
@@ -147,7 +153,7 @@ private void loadData(Map<String, Map<String, String>> tmpCache) throws SQLExcep
147153 JedisCommands jedis = null ;
148154 try {
149155 StringBuilder keyPattern = new StringBuilder (tableInfo .getTableName ());
150- for (String key : tableInfo .getPrimaryKeys ()) {
156+ for (int i = 0 ; i < tableInfo .getPrimaryKeys (). size (); i ++ ) {
151157 keyPattern .append ("_" ).append ("*" );
152158 }
153159 jedis = getJedisWithRetry (CONN_RETRY_NUM );
@@ -182,41 +188,54 @@ private JedisCommands getJedis(RedisSideTableInfo tableInfo) {
182188 String url = tableInfo .getUrl ();
183189 String password = tableInfo .getPassword ();
184190 String database = tableInfo .getDatabase () == null ? "0" : tableInfo .getDatabase ();
191+ String masterName = tableInfo .getMasterName ();
185192 int timeout = tableInfo .getTimeout ();
186- if (timeout == 0 ){
187- timeout = 1000 ;
193+ if (timeout == 0 ) {
194+ timeout = 10000 ;
188195 }
189196
190- String [] nodes = StringUtils .split (url , "," );
191- String [] firstIpPort = StringUtils .split (nodes [0 ], ":" );
192- String firstIp = firstIpPort [0 ];
193- String firstPort = firstIpPort [1 ];
194- Set <HostAndPort > addresses = new HashSet <>();
195- Set <String > ipPorts = new HashSet <>();
196- for (String ipPort : nodes ) {
197- ipPorts .add (ipPort );
198- String [] ipPortPair = ipPort .split (":" );
199- addresses .add (new HostAndPort (ipPortPair [0 ].trim (), Integer .valueOf (ipPortPair [1 ].trim ())));
200- }
201- if (timeout == 0 ){
202- timeout = 1000 ;
203- }
197+ String [] nodes = url .split ("," );
204198 JedisCommands jedis = null ;
205199 GenericObjectPoolConfig poolConfig = setPoolConfig (tableInfo .getMaxTotal (), tableInfo .getMaxIdle (), tableInfo .getMinIdle ());
206- switch (RedisType .parse (tableInfo .getRedisType ())){
207- //单机
200+ switch (RedisType .parse (tableInfo .getRedisType ())) {
208201 case STANDALONE :
209- pool = new JedisPool (poolConfig , firstIp , Integer .parseInt (firstPort ), timeout , password , Integer .parseInt (database ));
210- jedis = pool .getResource ();
202+ String firstIp = null ;
203+ String firstPort = null ;
204+ Matcher standalone = HOST_PORT_PATTERN .matcher (nodes [0 ]);
205+ if (standalone .find ()) {
206+ firstIp = standalone .group ("host" ).trim ();
207+ firstPort = standalone .group ("port" ).trim ();
208+ }
209+ if (Objects .nonNull (firstIp )) {
210+ pool = new JedisPool (poolConfig , firstIp , Integer .parseInt (firstPort ), timeout , password , Integer .parseInt (database ));
211+ jedis = pool .getResource ();
212+ } else {
213+ throw new IllegalArgumentException (
214+ String .format ("redis url error. current url [%s]" , nodes [0 ]));
215+ }
211216 break ;
212- //哨兵
213217 case SENTINEL :
214- jedisSentinelPool = new JedisSentinelPool (tableInfo .getMasterName (), ipPorts , poolConfig , timeout , password , Integer .parseInt (database ));
218+ Set <String > ipPorts = new HashSet <>(Arrays .asList (nodes ));
219+ jedisSentinelPool = new JedisSentinelPool (masterName , ipPorts , poolConfig , timeout , password , Integer .parseInt (database ));
215220 jedis = jedisSentinelPool .getResource ();
216221 break ;
217- //集群
218222 case CLUSTER :
219- jedis = new JedisCluster (addresses , timeout , timeout ,1 , poolConfig );
223+ Set <HostAndPort > addresses = new HashSet <>();
224+ // 对ipv6 支持
225+ for (String node : nodes ) {
226+ Matcher matcher = HOST_PORT_PATTERN .matcher (node );
227+ if (matcher .find ()) {
228+ String host = matcher .group ("host" ).trim ();
229+ String portStr = matcher .group ("port" ).trim ();
230+ if (org .apache .commons .lang3 .StringUtils .isNotBlank (host ) && org .apache .commons .lang3 .StringUtils .isNotBlank (portStr )) {
231+ // 转化为int格式的端口
232+ int port = Integer .parseInt (portStr );
233+ addresses .add (new HostAndPort (host , port ));
234+ }
235+ }
236+ }
237+ jedis = new JedisCluster (addresses , timeout , timeout , 10 , password , poolConfig );
238+ break ;
220239 default :
221240 break ;
222241 }
@@ -244,35 +263,32 @@ private JedisCommands getJedisWithRetry(int retryNum) {
244263 return null ;
245264 }
246265
247- private Set <String > getRedisKeys (RedisType redisType , JedisCommands jedis , String keyPattern ){
248- if (!redisType .equals (RedisType .CLUSTER )){
266+ private Set <String > getRedisKeys (RedisType redisType , JedisCommands jedis , String keyPattern ) {
267+ if (!redisType .equals (RedisType .CLUSTER )) {
249268 return ((Jedis ) jedis ).keys (keyPattern );
250269 }
251270 Set <String > keys = new TreeSet <>();
252- Map <String , JedisPool > clusterNodes = ((JedisCluster )jedis ).getClusterNodes ();
253- for (String k : clusterNodes .keySet ()){
271+ Map <String , JedisPool > clusterNodes = ((JedisCluster ) jedis ).getClusterNodes ();
272+ for (String k : clusterNodes .keySet ()) {
254273 JedisPool jp = clusterNodes .get (k );
255- Jedis connection = jp .getResource ();
256- try {
274+ try (Jedis connection = jp .getResource ()) {
257275 keys .addAll (connection .keys (keyPattern ));
258- } catch (Exception e ){
259- LOG .error ("Getting keys error: {}" , e );
260- } finally {
261- connection .close ();
276+ } catch (Exception e ) {
277+ LOG .error ("Getting keys error" , e );
262278 }
263279 }
264280 return keys ;
265281 }
266282
267- private GenericObjectPoolConfig setPoolConfig (String maxTotal , String maxIdle , String minIdle ){
283+ private GenericObjectPoolConfig setPoolConfig (String maxTotal , String maxIdle , String minIdle ) {
268284 GenericObjectPoolConfig config = new GenericObjectPoolConfig ();
269- if (maxTotal != null ){
285+ if (maxTotal != null ) {
270286 config .setMaxTotal (Integer .parseInt (maxTotal ));
271287 }
272- if (maxIdle != null ){
288+ if (maxIdle != null ) {
273289 config .setMaxIdle (Integer .parseInt (maxIdle ));
274290 }
275- if (minIdle != null ){
291+ if (minIdle != null ) {
276292 config .setMinIdle (Integer .parseInt (minIdle ));
277293 }
278294 return config ;
0 commit comments