-
Notifications
You must be signed in to change notification settings - Fork 174
Java客户端使用指南
-
如果是spring boot项目,添加如下Maven依赖:
<dependency> <groupId>com.ppdai.infrastructure</groupId> <artifactId>mq-client-springboot</artifactId> <version>*****</version>(推荐最新版) </dependency>
-
如果是spring项目,添加如下Maven依赖:
<dependency> <groupId>com.ppdai.infrastructure</groupId> <artifactId>mq-client-spring</artifactId> <version>*****</version>(推荐最新版) </dependency>
PMQ的管理界面上方,可以查看客户端最新版本。
-
添加配置(producer和consumer都需要在application.properties中添加如下配置,支持yml)
//添加服务端broker的域名地址 mq.broker.url=http://localhost:8080
-
为topic生成token(这一步为非必须步骤)
为了防止topic乱发的情况,我们可以在“消息主题管理”页面为我们的topic生成token令牌,步骤如下所示:
(1)生成token
进入“消息主题管理”页面,在我们的topic:test1的最右侧,点击“生成token”按钮,然后在上图的token列就可以看到为test1生成的token。
(2)为topic:test1添加token配置 在application.properties中添加如下配置:
test1-token="上一步生成的token"
-
发消息
import org.springframework.core.env.Environment; @Autowired private Environment env; MqClient.publish("test1", env.getProperty("test1-token", ""), new ProducerDataDto(""));
其中“test1”是我们发送消息的topic。
env.getProperty("test1-token", "")用于读取我们为test1添加的token配置。
new ProducerDataDto("")用于创建我们的消息。
用户可以通过接口调用,进行消息的发送。格式如下:
接口:localhost:8080/api/client/consumer/publish(服务端broker的调用接口)
参数:
{
"topicName": "testProxy",
"msgs": [{
"bizId":"12345",
"tag":"678",
"head":{"key":"value"},
"body":"body"
}],
"lan":"python",
"sdkVersion":"1",
"clientIp":"sendIp"
}
说明:sendIp是你的发送Ip。
-
添加配置(producer和consumer都需要在application.properties中添加如下配置,支持yml)
//添加服务端broker的域名地址 mq.broker.url=http://localhost:8080
-
添加订阅关系
-
添加订阅关系配置文件(/src/main/resources/messageQueue/messageQueue.xml)
<?xml version="1.0" encoding="UTF-8" ?> <messageQueue> <consumer groupName="test1sub" alarmEmails="****@****.com"> <topics> <topic name="test1" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic> <topic name="test4" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic> </topics> </consumer> </messageQueue>
-
在PMQ的portal管理界面上,创建consumerGroup:test1sub、创建topic:test1和test2、创建test1sub与test1和test2的订阅关系(具体步骤请参考界面操作指南)。
其中 receiverType指定的是topic的消息处理类(每个topic都可以指定自己的处理类,也可以共用一个处理类)。
注意:
-
一个消费者组可以订阅多个topic,消费者组中失败的topic不用订阅(失败会自动生成)。
-
receiverType中配置的类必须存在。
-
-
编写对应的处理类
public class TestSub implements com.ppdai.infrastructure.mq.biz.event.ISubscriber { public TestSub() { } @Override public List<Long> onMessageReceived(List<MessageDto> messages) { // TODO Auto-generated method stub return null; } }
如果处理成功则return null,如果处理失败则返回失败消息的id(失败消息会被发送到对应的失败topic中)。
为了满足其他非Java语言的应用使用PMQ,我们开发了代理服务mq-proxy-2。使用代理模式的步骤如下
- 部署mq-proxy-2
- 在application.properties中添加如下格式的配置
mq.proxy.data=
{
"Test1Sub": {
"ipLst": "11.30.13.111,11.254.220.88",
"exeUrl": "http://localhost:8081/proxy1",
"hsUrl": "http://localhost:8081/hs"
},
"Test2Sub": {
"ipLst": "12.20.19.109,13.254.220.88",
"exeUrl": "http://localhost:8081/proxy1",
"hsUrl": "http://localhost:8081/hs"
}
}
配置说明:
Test1Sub:消费者组的名字(用户根据自己的消费者组填写)。
ipLst:第一步部署的mq-proxy-2实例的ip列表。
exeUrl:用户应用需要暴露给mq-proxy-2的消息处理接口(该接口中,用户需要实现自己的消息处理逻辑)。
hsUrl: 用户应用需要暴露给mq-proxy-2的健康检查接口。