Skip to content

Commit 3981ce0

Browse files
authored
feat: Pull message process (doocs#121)
* feat: add pull message process
1 parent 94f9e33 commit 3981ce0

File tree

2 files changed

+117
-0
lines changed

2 files changed

+117
-0
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@
283283
- [RocketMQ IndexFile详解](docs/rocketmq/rocketmq-indexfile.md)
284284
- [RocketMQ 消费者启动流程](docs/rocketmq/rocketmq-consumer-start.md)
285285
- [RocketMQ 消息拉取流程](docs/rocketmq/rocketmq-pullmessage.md)
286+
- [RocketMQ Broker 处理拉取消息请求流程](docs/rocketmq/rocketmq-pullmessage-processor.md)
286287

287288
## 番外篇(JDK 1.8)
288289

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
该文所涉及的 RocketMQ 源码版本为 4.9.3。
2+
3+
# RocketMQ broker 处理拉取消息请求流程
4+
5+
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.ChannelHandlerContext, org.apache.rocketmq.remoting.protocol.RemotingCommand)
6+
7+
第 1 步、`校验broker是否可读`
8+
9+
```java
10+
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
11+
response.setCode(ResponseCode.NO_PERMISSION);
12+
response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
13+
return response;
14+
}
15+
```
16+
17+
第 2 步、`根据消费组获取订阅信息`
18+
19+
```java
20+
SubscriptionGroupConfig subscriptionGroupConfig =
21+
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
22+
```
23+
24+
第 3 步、`校验是否允许消费`
25+
26+
```java
27+
if (!subscriptionGroupConfig.isConsumeEnable()) {
28+
response.setCode(ResponseCode.NO_PERMISSION);
29+
response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
30+
return response;
31+
}
32+
```
33+
34+
第 4 步、`根据主题获取对应的配置信息`
35+
36+
```java
37+
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
38+
if (null == topicConfig) {
39+
log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
40+
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
41+
response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
42+
return response;
43+
}
44+
```
45+
46+
第 5 步、`校验主题对应的队列`
47+
48+
```java
49+
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
50+
String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
51+
requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
52+
log.warn(errorInfo);
53+
response.setCode(ResponseCode.SYSTEM_ERROR);
54+
response.setRemark(errorInfo);
55+
return response;
56+
}
57+
```
58+
59+
第 6 步、`如果配置了消息过滤表达式,根据表达式进行构建consumerFilterData,如果没有,则根据主题构建`
60+
61+
```java
62+
consumerFilterData = ConsumerFilterManager.build(
63+
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
64+
requestHeader.getExpressionType(), requestHeader.getSubVersion()
65+
66+
consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
67+
requestHeader.getConsumerGroup());
68+
```
69+
70+
7 步、`校验如果不是Tag过滤,是否开启了自定义属性过滤,如果没有开启,不允许操作 只有使用push推送模式的消费者才能用使用SQL92标准的sql语句,pull拉取模式的消费者是不支持这个功能的。`
71+
72+
```java
73+
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
74+
&& !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
75+
response.setCode(ResponseCode.SYSTEM_ERROR);
76+
response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
77+
return response;
78+
}
79+
```
80+
81+
8 步、`根据是否支持重试过滤创建不同的MessageFilter`
82+
83+
```java
84+
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
85+
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
86+
this.brokerController.getConsumerFilterManager());
87+
} else {
88+
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
89+
this.brokerController.getConsumerFilterManager());
90+
}
91+
```
92+
93+
9 步、`根据消费组、主题、队列、偏移量、最大拉取消息数量、消息过滤器查找信息`
94+
95+
```java
96+
final GetMessageResult getMessageResult =
97+
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
98+
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
99+
100+
```
101+
102+
10 步、`消息为空 设置code为系统错误 返回response`
103+
104+
```java
105+
response.setCode(ResponseCode.SYSTEM_ERROR);
106+
response.setRemark("store getMessage return null");
107+
```
108+
109+
11 步、`提交偏移量`
110+
111+
```java
112+
if (storeOffsetEnable) {
113+
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
114+
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
115+
}
116+
```

0 commit comments

Comments
 (0)