19
19
package com .dtstack .flink .sql .sink .redis ;
20
20
21
21
import com .dtstack .flink .sql .outputformat .AbstractDtRichOutputFormat ;
22
+ import com .dtstack .flink .sql .sink .redis .enums .RedisType ;
23
+ import com .google .common .collect .Maps ;
22
24
import org .apache .commons .lang3 .StringUtils ;
23
25
import org .apache .commons .pool2 .impl .GenericObjectPoolConfig ;
24
26
import org .apache .flink .api .common .typeinfo .TypeInformation ;
35
37
36
38
import java .io .Closeable ;
37
39
import java .io .IOException ;
38
- import java .util .HashMap ;
39
- import java .util .HashSet ;
40
- import java .util .LinkedList ;
41
- import java .util .List ;
42
- import java .util .Set ;
40
+ import java .util .*;
43
41
44
42
/**
45
43
* @author yanxi
@@ -49,7 +47,7 @@ public class RedisOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
49
47
50
48
private String url ;
51
49
52
- private String database ;
50
+ private String database = "0" ;
53
51
54
52
private String tableName ;
55
53
@@ -71,7 +69,7 @@ public class RedisOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
71
69
72
70
protected List <String > primaryKeys ;
73
71
74
- protected int timeout ;
72
+ protected int timeout = 10000 ;
75
73
76
74
private JedisPool pool ;
77
75
@@ -121,29 +119,21 @@ private void establishConnection() {
121
119
String [] ipPortPair = StringUtils .split (ipPort , ":" );
122
120
addresses .add (new HostAndPort (ipPortPair [0 ].trim (), Integer .valueOf (ipPortPair [1 ].trim ())));
123
121
}
124
- if (timeout == 0 ){
125
- timeout = 10000 ;
126
- }
127
- if (database == null )
128
- {
129
- database = "0" ;
130
- }
131
122
132
- switch (redisType ){
133
- //单机
134
- case 1 :
123
+ switch (RedisType .parse (redisType )){
124
+ case STANDALONE :
135
125
pool = new JedisPool (poolConfig , firstIp , Integer .parseInt (firstPort ), timeout , password , Integer .parseInt (database ));
136
126
jedis = pool .getResource ();
137
127
break ;
138
- //哨兵
139
- case 2 :
128
+ case SENTINEL :
140
129
jedisSentinelPool = new JedisSentinelPool (masterName , ipPorts , poolConfig , timeout , password , Integer .parseInt (database ));
141
130
jedis = jedisSentinelPool .getResource ();
142
131
break ;
143
- //集群
144
- case 3 :
132
+ case CLUSTER :
145
133
jedis = new JedisCluster (addresses , timeout , timeout , 10 , password , poolConfig );
134
+ break ;
146
135
default :
136
+ throw new RuntimeException ("unsupport redis type[ " + redisType + "]" );
147
137
}
148
138
}
149
139
@@ -158,36 +148,14 @@ public void writeRecord(Tuple2 record) throws IOException {
158
148
if (row .getArity () != fieldNames .length ) {
159
149
return ;
160
150
}
161
-
162
- HashMap <String , Integer > map = new HashMap <>(8 );
163
- for (String primaryKey : primaryKeys ) {
164
- for (int i = 0 ; i < fieldNames .length ; i ++) {
165
- if (fieldNames [i ].equals (primaryKey )) {
166
- map .put (primaryKey , i );
167
- }
168
- }
169
- }
170
-
171
- List <String > kvList = new LinkedList <>();
172
- for (String primaryKey : primaryKeys ){
173
- StringBuilder primaryKv = new StringBuilder ();
174
- int index = map .get (primaryKey ).intValue ();
175
- primaryKv .append (primaryKey ).append (":" ).append (row .getField (index ));
176
- kvList .add (primaryKv .toString ());
177
- }
178
-
179
- String perKey = String .join (":" , kvList );
180
- for (int i = 0 ; i < fieldNames .length ; i ++) {
181
- StringBuilder key = new StringBuilder ();
182
- key .append (tableName ).append (":" ).append (perKey ).append (":" ).append (fieldNames [i ]);
183
-
184
- String value = "null" ;
185
- Object field = row .getField (i );
186
- if (field != null ) {
187
- value = field .toString ();
188
- }
189
- jedis .set (key .toString (), value );
151
+ Map <String , Object > refData = Maps .newHashMap ();
152
+ for (int i = 0 ; i < fieldNames .length ; i ++){
153
+ refData .put (fieldNames [i ], row .getField (i ));
190
154
}
155
+ String redisKey = buildCacheKey (refData );
156
+ refData .entrySet ().forEach (e ->{
157
+ jedis .hset (redisKey , e .getKey (), String .valueOf (e .getValue ()));
158
+ });
191
159
192
160
if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ){
193
161
LOG .info (record .toString ());
@@ -211,6 +179,17 @@ public void close() throws IOException {
211
179
212
180
}
213
181
182
+ public String buildCacheKey (Map <String , Object > refData ) {
183
+ StringBuilder keyBuilder = new StringBuilder (tableName );
184
+ for (String primaryKey : primaryKeys ){
185
+ if (!refData .containsKey (primaryKey )){
186
+ return null ;
187
+ }
188
+ keyBuilder .append ("_" ).append (refData .get (primaryKey ));
189
+ }
190
+ return keyBuilder .toString ();
191
+ }
192
+
214
193
public static RedisOutputFormatBuilder buildRedisOutputFormat (){
215
194
return new RedisOutputFormatBuilder ();
216
195
}
0 commit comments