从emq实时读取数据
主流版本
Sync | emqxsource、emqxreader |
---|---|
SQL | emqx-x |
- broker
- 描述:连接URL信息
- 必选:是
- 参数类型:string
- 默认值:无
- topic
- 描述:订阅主题
- 必选:是
- 参数类型:string
- 默认值:无
- username
- 描述:认证用户名
- 必选:否
- 参数类型:string
- 默认值:无
- password
- 描述:认证密码
- 必选:否
- 参数类型:string
- 默认值:无
- isCleanSession
- 描述:是否清除session
- false:MQTT服务器保存于客户端会话的的主题与确认位置;
- true:MQTT服务器不保存于客户端会话的的主题与确认位置
- 必选:否
- 参数类型:boolean
- 默认值:true
- 描述:是否清除session
- qos
- 描述:服务质量
- 0:AT_MOST_ONCE,至多一次;
- 1:AT_LEAST_ONCE,至少一次;
- 2:EXACTLY_ONCE,精准一次;
- 必选:否
- 参数类型:int
- 默认值:2
- 描述:服务质量
- codec
- 描述:编码解码器类型,支持 json、plain
- plain:将kafka获取到的消息字符串存储到一个key为message的map中,如:
{"message":"{\"key\": \"key\", \"message\": \"value\"}"}
- plain:将kafka获取到的消息字符串按照json格式进行解析
- 若该字符串为json格式
- 当其中含有message字段时,原样输出,如:
{"key": "key", "message": "value"}
- 当其中不包含message字段时,增加一个key为message,value为原始消息字符串的键值对,如:
{"key": "key", "value": "value", "message": "{\"key\": \"key\", \"value\": \"value\"}"}
- 当其中含有message字段时,原样输出,如:
- 若改字符串不为json格式,则按照plain类型进行处理
- 若该字符串为json格式
- plain:将kafka获取到的消息字符串存储到一个key为message的map中,如:
- 必选:否
- 参数类型:string
- 默认值:plain
- 描述:编码解码器类型,支持 json、plain
- connector
- 描述:emqx-x
- 必选:是
- 参数类型:String
- 默认值:无
- broker
- 描述:连接信息tcp://localhost:1883
- 必选:是
- 参数类型:String
- 默认值:无
- topic
- 描述:主题
- 必选:是
- 参数类型:String
- 默认值:无
- isCleanSession
- 描述:是否清除session
- false:MQTT服务器保存于客户端会话的的主题与确认位置;
- true:MQTT服务器不保存于客户端会话的的主题与确认位置
- 必选:否
- 参数类型:String
- 默认值:true
- 描述:是否清除session
- qos
- 描述:服务质量
- 0:AT_MOST_ONCE,至多一次;
- 1:AT_LEAST_ONCE,至少一次;
- 2:EXACTLY_ONCE,精准一次;
- 必选:否
- 参数类型:String
- 默认值:2
- 描述:服务质量
- username
- 描述:username
- 必选:否
- 参数类型:String
- 默认值:无
- password
- 描述:password
- 必选:否
- 参数类型:String
- 默认值:无
- format
- 描述:数据来源格式
- 必选:否
- 参数类型:String
- 默认值:json
支持 | BOOLEAN、TINYINT、SMALLINT、INT、BIGINT、FLOAT、DOUBLE、DECIMAL、STRING、VARCHAR、CHAR、TIMESTAMP、DATE、BINARY、ARRAY、MAP、STRUCT、LIST、ROW |
---|---|
暂不支持 | 其他 |
见项目内chunjun-examples
文件夹。