This connector is based on bahir-flink. The changes relative to bahir include:
- Replace Jedis with Lettuce, changing synchronous IO to asynchronous IO
- Add the Table/SQL API and support for dimension table (lookup) queries
- Add a query cache (incremental and full loading are supported)
- Add support for saving the entire row, which is used for dimension table join queries over multiple fields
- Add rate limiting for online debugging of Flink SQL
- Add support for higher Flink versions (including 1.12, 1.13, 1.14+)
- Unify or add other features such as the expiration policy and the number of concurrent writes
Because the Flink interface used by bahir is relatively old, the changes are relatively large. During development, the stream computing products of Tencent Cloud and Alibaba Cloud were used as references, combining the strengths of both and adding richer functionality.
The value of the command option maps to Redis commands as follows:
command | insert | select | join | delete (Flink CDC's RowKind.DELETE) |
---|---|---|---|---|
set | set | get | get | del |
hset | hset | hget | hget | hdel |
get | set | get | get | del |
hget | hset | hget | hget | hdel |
rpush | rpush | lrange | | |
lpush | lpush | lrange | | |
incrBy incrByFloat | incrBy incrByFloat | get | get | e.g. incrby 2 -> incrby -2 |
hincrBy hincrByFloat | hincrBy hincrByFloat | hget | hget | e.g. hincrby 2 -> hincrby -2 |
zincrby | zincrby | zscore | zscore | e.g. zincrby 2 -> zincrby -2 |
sadd | sadd | srandmember 10 | | srem |
zadd | zadd | zscore | zscore | zrem |
pfadd(hyperloglog) | pfadd(hyperloglog) | | | |
publish | publish | | | |
zrem | zrem | zscore | zscore | |
srem | srem | srandmember 10 | | |
del | del | get | get | |
hdel | hdel | hget | hget | |
decrBy | decrBy | get | get | |
Note: The cdc update operation has the same effect as the cdc insert.
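The delete column for the increment commands in the table above can be read as "apply the same increment with the opposite sign". The following is a minimal sketch of that idea only, using an in-memory map as a stand-in for Redis; the class and method names are hypothetical and are not part of the connector.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration: an in-memory stand-in for Redis counters. */
final class RetractionSketch {
    private final Map<String, Long> counters = new HashMap<>();

    /** Mirrors the effect of the Redis command INCRBY key delta. */
    long incrBy(String key, long delta) {
        return counters.merge(key, delta, Long::sum);
    }

    /** An insert (or update) row applies the increment as-is. */
    long onInsert(String key, long delta) {
        return incrBy(key, delta);
    }

    /** A delete row applies the negated increment, e.g. incrby 2 becomes incrby -2. */
    long onDelete(String key, long delta) {
        return incrBy(key, -delta);
    }
}
```

With this reading, inserting a row with value 2 and later deleting the same row leaves the counter where it started.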
After running mvn package on the command line, copy the generated package flink-connector-redis-1.4.2.jar into Flink's lib directory; no other settings are required.
The project depends on Lettuce (6.2.1) and netty-transport-native-epoll (4.1.82.Final). If these packages are already available, use flink-connector-redis-1.4.2.jar; otherwise, use flink-connector-redis-1.4.2-jar-with-dependencies.jar.
To reference it directly in a development project:
<dependency>
<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.4.2</version>
<!-- When the Lettuce netty-transport-native-epoll dependency is not imported separately -->
<!-- <classifier>jar-with-dependencies</classifier>-->
</dependency>
With value.data.structure = column (the default), there is no need to map the Redis key through a primary key. The key is determined directly by the order of the fields in the DDL, for example:
#Where username is the key and passport is the value.
create table sink_redis(username VARCHAR, passport VARCHAR) with ('command'='set')
#The name is the key of the map structure, the subject is the field, and the score is the value.
create table sink_redis(name VARCHAR, subject VARCHAR, score VARCHAR) with ('command'='hset')
With value.data.structure = row, the value is taken from the entire row, with fields separated by '\01':
create table sink_redis(username VARCHAR, passport VARCHAR) with ('command'='set', 'value.data.structure'='row')
#key: username, value: username\01passport.
create table sink_redis(name VARCHAR, subject VARCHAR, score VARCHAR) with ('command'='hset', 'value.data.structure'='row')
#key: name, field: subject, value: name\01subject\01score.
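To make the row layout concrete, here is a minimal sketch of how a row could be packed into a single value and unpacked again using the '\01' separator. It is plain Java with no Flink or Lettuce dependency; the class and method names are hypothetical, and how the connector itself handles null fields is not covered here.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper showing the row layout; not the connector's own code. */
final class RowValueSketch {
    /** The control character (code point 1) that this document writes as '\01'. */
    static final String SEPARATOR = "\u0001";

    /** Packs all fields of a row, key included, into one Redis value. */
    static String join(List<String> fields) {
        return String.join(SEPARATOR, fields);
    }

    /** Unpacks a stored value into its fields; the -1 limit keeps trailing empty fields. */
    static List<String> split(String value) {
        return Arrays.asList(value.split(SEPARATOR, -1));
    }
}
```

For the hset example, join applied to (tom, math, 150) yields the value stored under key tom and field math, and split recovers the three columns on lookup.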
Field | Default | Type | Description |
---|---|---|---|
connector | (none) | String | redis |
host | (none) | String | Redis IP |
port | 6379 | Integer | Redis port |
password | null | String | null if not set |
database | 0 | Integer | db0 is used by default |
timeout | 2000 | Integer | Connection timeout in ms |
cluster-nodes | (none) | String | Cluster IPs and ports; must not be empty when redis-mode is cluster, e.g. 10.11.80.147:7000,10.11.80.147:7001,10.11.80.147:8000 |
command | (none) | String | Corresponds to the Redis commands above |
redis-mode | (none) | String | Mode type, e.g. single or cluster |
lookup.cache.max-rows | -1 | Integer | Query cache size; reduces repeated Redis queries for the same key |
lookup.cache.ttl | -1 | Integer | Query cache expiration time, in seconds. The query cache is enabled only when neither max-rows nor ttl is -1 |
lookup.max-retries | 1 | Integer | Number of retries on failed query |
lookup.cache.load-all | false | Boolean | When command is hget, load all elements of the Redis map into the cache, which helps with cache penetration |
sink.max-retries | 1 | Integer | Number of retries for write failures |
value.data.structure | column | String | column: the value comes from a single field (for example, with set the key is the first field defined in the DDL and the value is the second field). row: the value is taken from the entire row, with fields separated by '\01' |
set.if.absent | false | Boolean | Execute set/hset only when the key is absent |
io.pool.size | (none) | Integer | Size of the IO thread pool for Lettuce's netty. By default, the number of threads currently available to the JVM, and at least 2 |
event.pool.size | (none) | Integer | Size of the event thread pool for Lettuce's netty. By default, the number of threads currently available to the JVM, and at least 2 |
scan.key | (none) | String | Redis key |
scan.addition.key | (none) | String | Additional Redis key, e.g. the hash field of a map |
scan.range.start | (none) | Integer | lrange start |
scan.range.stop | (none) | Integer | lrange stop |
scan.count | (none) | Integer | srandmember count |
zset.zremrangeby | (none) | String | Whether to execute zremrangeby after zadd. Valid values: SCORE, LEX, RANK |
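The interaction between lookup.cache.max-rows and lookup.cache.ttl can be illustrated with a small cache that is bounded by both row count and age. This is only a sketch under assumptions: the class name, the least-recently-used eviction policy, and the explicit clock parameter are all hypothetical, and the connector's real cache implementation is not shown in this document.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of a size- and time-bounded lookup cache. */
final class LookupCacheSketch {
    /** A cached value together with the instant at which it expires. */
    private static final class Cached {
        final String value;
        final long expiresAtMillis;

        Cached(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final boolean enabled;
    private final long ttlMillis;
    private final LinkedHashMap<String, Cached> entries;

    LookupCacheSketch(int maxRows, int ttlSeconds) {
        // The cache is active only when neither setting is -1.
        this.enabled = maxRows != -1 && ttlSeconds != -1;
        this.ttlMillis = ttlSeconds * 1000L;
        // Access-ordered map that evicts the least recently used row beyond maxRows (assumed policy).
        this.entries =
                new LinkedHashMap<String, Cached>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<String, Cached> eldest) {
                        return size() > maxRows;
                    }
                };
    }

    boolean isEnabled() {
        return enabled;
    }

    /** Stores a value; does nothing when the cache is disabled. */
    void put(String key, String value, long nowMillis) {
        if (enabled) {
            entries.put(key, new Cached(value, nowMillis + ttlMillis));
        }
    }

    /** Returns the cached value, or null on a miss or after expiry (the caller would then query Redis). */
    String get(String key, long nowMillis) {
        Cached cached = enabled ? entries.get(key) : null;
        if (cached == null) {
            return null;
        }
        if (nowMillis >= cached.expiresAtMillis) {
            entries.remove(key);
            return null;
        }
        return cached.value;
    }
}
```

Passing the current time as a parameter keeps the sketch deterministic; a real cache would read the clock itself.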
Field | Default | Type | Description |
---|---|---|---|
ttl | (none) | Integer | Key expiration time in seconds; the ttl is set again on every sink write |
ttl.on.time | (none) | String | Time of day at which the key expires, in LocalTime.toString() format, e.g. 10:00 or 12:12:01. Used only if ttl is not configured |
ttl.key.not.absent | false | Boolean | Used with ttl; the ttl is set when the key does not exist |
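As an illustration of ttl.on.time, the sketch below turns a time of day such as 10:00 into a number of seconds until the key should expire. The rollover rule (a time that has already passed today means the same time tomorrow) is an assumption rather than something this document states, and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;

/** Hypothetical helper; not the connector's own code. */
final class TtlOnTimeSketch {
    /** Seconds from now until the next occurrence of the configured time of day. */
    static long secondsUntil(String ttlOnTime, LocalDateTime now) {
        // LocalTime.parse accepts both 10:00 and 12:12:01.
        LocalTime target = LocalTime.parse(ttlOnTime);
        LocalDateTime expireAt = LocalDateTime.of(now.toLocalDate(), target);
        if (!expireAt.isAfter(now)) {
            // Assumed behavior: the time has already passed today, so expire tomorrow.
            expireAt = expireAt.plusDays(1);
        }
        return Duration.between(now, expireAt).getSeconds();
    }
}
```

The resulting value could then be used the same way as a plain ttl in seconds.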
Field | Default | Type | Description |
---|---|---|---|
sink.limit | false | Boolean | Whether to enable rate limiting for the sink |
sink.limit.max-num | 10000 | Integer | Maximum number of writes per thread |
sink.limit.interval | 100 | String | Interval between writes per thread, in ms |
sink.limit.max-online | 30 * 60 * 1000L | Long | Maximum online time per thread, in ms |
Field | Default | Type | Description |
---|---|---|---|
master.name | (none) | String | master name |
sentinels.info | (none) | String | |
sentinels.password | (none) | String | |
flink type | redis row converter |
---|---|
CHAR | String |
VARCHAR | String |
STRING | String |
BOOLEAN | to String: String.valueOf(boolean val); to boolean: Boolean.valueOf(String str) |
BINARY | to String: Base64.getEncoder().encodeToString; to byte[]: Base64.getDecoder().decode(String str) |
VARBINARY | to String: Base64.getEncoder().encodeToString; to byte[]: Base64.getDecoder().decode(String str) |
DECIMAL | to String: BigDecimal.toString; to DecimalData: DecimalData.fromBigDecimal(new BigDecimal(String str), int precision, int scale) |
TINYINT | to String: String.valueOf(byte val); to byte: Byte.valueOf(String str) |
SMALLINT | to String: String.valueOf(short val); to short: Short.valueOf(String str) |
INTEGER | to String: String.valueOf(int val); to int: Integer.valueOf(String str) |
DATE | to String: the number of days since the epoch, as an int; a date is displayed as 2022-01-01 |
TIME | to String: the number of milliseconds since 0 o'clock, as an int; a time is displayed as 04:04:01.023 |
BIGINT | to String: String.valueOf(long val); to long: Long.valueOf(String str) |
FLOAT | to String: String.valueOf(float val); to float: Float.valueOf(String str) |
DOUBLE | to String: String.valueOf(double val); to double: Double.valueOf(String str) |
TIMESTAMP | to String: the number of milliseconds since the epoch, as a long; to TimestampData: TimestampData.fromEpochMillis(Long.valueOf(String str)) |
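A few of the conversions in the table can be tried out with the JDK alone. The sketch below round-trips BINARY, DATE and TIME values through their string form; it uses LocalDate and LocalTime as stand-ins for Flink's internal int representations, and the class and method names are hypothetical rather than the connector's converter API.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Base64;

/** Hypothetical round-trip helpers mirroring rows of the table above. */
final class RowConverterSketch {
    /** BINARY / VARBINARY: bytes are stored as a Base64 string. */
    static String binaryToString(byte[] bytes) {
        return Base64.getEncoder().encodeToString(bytes);
    }

    static byte[] stringToBinary(String str) {
        return Base64.getDecoder().decode(str);
    }

    /** DATE: stored as the number of days since the epoch. */
    static String dateToString(LocalDate date) {
        return String.valueOf((int) date.toEpochDay());
    }

    static LocalDate stringToDate(String str) {
        return LocalDate.ofEpochDay(Integer.parseInt(str));
    }

    /** TIME: stored as the number of milliseconds since 0 o'clock. */
    static String timeToString(LocalTime time) {
        return String.valueOf((int) (time.toNanoOfDay() / 1_000_000L));
    }

    static LocalTime stringToTime(String str) {
        return LocalTime.ofNanoOfDay(Integer.parseInt(str) * 1_000_000L);
    }
}
```

For example, the date 2022-01-01 is stored as 18993 and the time 04:04:01.023 as 14641023.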
create table sink_redis(name varchar, level varchar, age varchar) with ( 'connector'='redis', 'host'='10.11.80.147','port'='7001', 'redis-mode'='single','password'='******','command'='hset');
-- Insert data into redis first, which is equivalent to the redis command: hset 3 3 100 --
insert into sink_redis select * from (values ('3', '3', '100'));
create table dim_table (name varchar, level varchar, age varchar) with ('connector'='redis', 'host'='10.11.80.147','port'='7001', 'redis-mode'='single', 'password'='*****','command'='hget', 'maxIdle'='2', 'minIdle'='1', 'lookup.cache.max-rows'='10', 'lookup.cache.ttl'='10', 'lookup.max-retries'='3');
-- Generate the sequence 1 to 10 as the data source --
-- One of the rows will be username = 3, level = 3, which will match the data inserted above --
create table source_table (username varchar, level varchar, proctime as procTime()) with ('connector'='datagen', 'rows-per-second'='1', 'fields.username.kind'='sequence', 'fields.username.start'='1', 'fields.username.end'='10', 'fields.level.kind'='sequence', 'fields.level.start'='1', 'fields.level.end'='10');
create table sink_table(username varchar, level varchar,age varchar) with ('connector'='print');
insert into
sink_table
select
s.username,
s.level,
d.age
from
source_table s
left join dim_table for system_time as of s.proctime as d on
d.name = s.username
and d.level = s.level;
-- The line where username is 3 will be associated with the value in redis, and the output will be: 3,3,100
In many cases, dimension tables have multiple fields. This example shows how to use 'value.data.structure'='row' to write multiple fields and query them with a join.
-- init data in redis --
create table sink_redis(uid VARCHAR, score double, score2 double) with ('connector'='redis', 'host'='10.11.69.176', 'port'='6379', 'redis-mode'='single', 'password'='***', 'command'='SET', 'value.data.structure'='row');
insert into sink_redis select * from (values ('1', 10.3, 10.1));
-- 'value.data.structure'='row':value is taken from the entire row and separated by '\01'
-- the value in redis will be: "1\x0110.3\x0110.1" --
-- init data in redis end --
-- create join table --
create table join_table with ('command'='get', 'value.data.structure'='row') like sink_redis;
-- create result table --
create table result_table(uid VARCHAR, username VARCHAR, score double, score2 double) with ('connector'='print');
-- create source table --
create table source_table(uid VARCHAR, username VARCHAR, proc_time as procTime()) with ('connector'='datagen', 'fields.uid.kind'='sequence', 'fields.uid.start'='1', 'fields.uid.end'='2');
-- query --
insert
into
result_table
select
s.uid,
s.username,
j.score,
j.score2
from
source_table as s
join join_table for system_time as of s.proc_time as j on
j.uid = s.uid;
result:
2> +I[2, 1e0fe885a2990edd7f13dd0b81f923713182d5c559b21eff6bda3960cba8df27c69a3c0f26466efaface8976a2e16d9f68b3, null, null]
1> +I[1, 30182e00eca2bff6e00a2d5331e8857a087792918c4379155b635a3cf42a53a1b8f3be7feb00b0c63c556641423be5537476, 10.3, 10.1]
- Sample code path: src/test/java/org.apache.flink.streaming.connectors.redis.datastream.DataStreamTest.java
- hset example, equivalent to the Redis command: hset tom math 152
// Select the Redis command and resolve the matching sink mapper.
Configuration configuration = new Configuration();
configuration.setString(REDIS_MODE, REDIS_CLUSTER);
configuration.setString(REDIS_COMMAND, RedisCommand.HSET.name());
RedisSinkMapper redisMapper =
        (RedisSinkMapper)
                RedisHandlerServices.findRedisHandler(
                                RedisMapperHandler.class, configuration.toMap())
                        .createRedisMapper(configuration);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// One row in (key, field, value) order: tom, math, 152.
GenericRowData genericRowData = new GenericRowData(3);
genericRowData.setField(0, "tom");
genericRowData.setField(1, "math");
genericRowData.setField(2, "152");
DataStream<GenericRowData> dataStream = env.fromElements(genericRowData, genericRowData);

// Schema of the row above; the column names here are illustrative.
ResolvedSchema resolvedSchema =
        ResolvedSchema.of(
                Column.physical("name", DataTypes.STRING()),
                Column.physical("subject", DataTypes.STRING()),
                Column.physical("score", DataTypes.STRING()));

// Retry failed writes up to 3 times.
RedisSinkOptions redisSinkOptions =
        new RedisSinkOptions.Builder().setMaxRetryTimes(3).build();
// Connection settings for the Redis server.
FlinkConfigBase conf =
        new FlinkSingleConfig.Builder()
                .setHost(REDIS_HOST)
                .setPort(REDIS_PORT)
                .setPassword(REDIS_PASSWORD)
                .build();

RedisSinkFunction redisSinkFunction =
        new RedisSinkFunction<>(conf, redisMapper, redisSinkOptions, resolvedSchema);
dataStream.addSink(redisSinkFunction).setParallelism(1);
env.execute("RedisSinkTest");
- Sample code path: src/test/java/org.apache.flink.streaming.connectors.redis.table.SQLInsertTest.java
- set example, equivalent to the Redis command: set test test11
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
String ddl = "create table sink_redis(username VARCHAR, passport VARCHAR) with ( 'connector'='redis', " +
"'cluster-nodes'='10.11.80.147:7000,10.11.80.147:7001','redis-mode'='cluster','password'='******','command'='set')" ;
tEnv.executeSql(ddl);
String sql = " insert into sink_redis select * from (values ('test', 'test11'))";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get()
.getJobExecutionResult()
.get();
IDE: IntelliJ IDEA
Code format: google-java-format + Save Actions
Code check: CheckStyle
Flink 1.12/1.13/1.14+
JDK 1.8, Lettuce 6.2.1
For Flink 1.12, use the following dependency:
<dependency>
<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.1.1-1.12</version>
</dependency>