-
Notifications
You must be signed in to change notification settings - Fork 174
PMQ设计
PMQ的消息模型如下:
- ConsumerGroup:表示一些topic集合,被一组consumer订阅,这组consumer就形成了一个消费者组。
- topic:即消息主题,topic包含多个queue(类似于kafka中Partition)。
- Queue: 即存储消息的队列,每个queue对应数据库中的一张表(PMQ的消息存储在数据库中)。
下图是PMQ架构模块的概览:
上图中PMQ架构模块说明如下:
- Broker:PMQ的服务端
- Portal:PMQ的管理界面
- Producers:PMQ的消息生产者
- Consumers:PMQ的消费者
- Metadata:PMQ的元数据库
- Message:PMQ的消息库
Broker是PMQ的服务端,核心功能如下:
- 提供消息发送接口
- 提供消息拉取接口
- 提供心跳注册接口
- 重平衡功能(队列和consumer的重新分配)
- 元数据同步
- 偏移提交接口
Portal:PMQ的管理界面,核心功能如下:
- consumerGroup、topic、数据节点(消息节点)的初始化创建和治理
- 订阅关系的添加(绑定consumerGroup和topic的订阅关系)
- 消费治理(偏移调整、消费启停、消费端监控等)
- 消息查询、失败消息的手动重新发送
- 队列的分配、队列扩容、队列缩容、队列读写类型的设置
- 界面操作的审计日志和权限控制
- 各种监控统计报表
- 历史消息清理的定时器、元数据处理的定时器(portal集群中的实例,会抢占这些定时器,执行清理工作)。
Producers和Consumers属于同一个PMQ客户端,核心功能如下:
- 消息的同步发送和异步发送
- 消息的拉取和消费
- 客户端心跳和偏移提交
- 失败消息重试和发送
Metadata是PMQ的元数据库,类似于rocketmq的namesrv,核心功能如下:
- 存储consumerGroup、topic以及它们的订阅关系
- 存储消息库的节点信息
- 存储queue(消息表)的信息、topic和queue的分配关系
- 存储consumer的消费偏移(即消费位置)
Message是PMQ的消息库,分为普通消息库和失败消息库(库结构相同)。
- 普通消息库用于存储我们发送的常规消息
- 失败消息库用于存储处理失败的消息
PMQ元数据库中,核心表的对应关系如下图所示:
-
DbNode
- 记录消息库的节点信息,每一条记录表示一个消息库节点(记录了消息库的:ip、端口、库名、等连接信息)。
-
Queue
- 记录queue(消息队列)的信息(每个queue对应数据库中的一张表)。记录topic与queue的分配信息。
-
Topic
- 消息主题,拥有多个queue来存储消息。
-
QueueOffset
- 记录consumer和queue的对应关系(即某个consumer消费某个queue的消息)。记录消费偏移(即consumer消费到了什么位置)。
-
Consumer
- 消费者。
-
ConsumerGroup
- 一些topic集合,被一组consumer订阅,这组consumer就形成了一个consumerGroup。
-
ConsumerGroupConsumer
- 记录ConsumerGroup和Consumer的对应关系。
-
ConsumerGroupTopic
- 记录consumerGroup和topic的订阅关系。
几乎所有消息系统都拥有三个核心功能:消息发送、消息存储、消息拉取消费。下面我们简要看下PMQ这三个功能的逻辑实现:
上图简单描述了PMQ消息的发送、存储、消费流程:
- PMQ的客户端(producer),发送消息到服务端Broker。
- 服务端收到消息后,存储到queue中。
- 客户端(consumer),从服务端拉取消息消费。
看到这里是不是有几个疑问:
-
每个topic有多个queue,服务端是怎样把消息存到某个queue中的呢?
-
每个topic有多个queue,consumer怎么知道自己应该拉取那个queue的消息呢?
-
如果某个consumer挂掉了,它对应的queue应该让谁来消费呢?
-
如果某个broker服务端挂了,会有什么影响呢?
下文将通过四个小结来解答上述问题
服务端是怎样存储消息的呢?请看下图:
如上图所示,producer要往TopicA中发消息,消息的发送存储流程如下:
- producer的发送请求,通过负载均衡打到一台broker上。
- broker1收到请求之后,根据TopicA的队列数量和计数器,对queue轮询存储。
consumer怎么知道自己应该拉取那个queue的消息呢?
在consumer注册时,服务端会对consumer进行队列分配,分配逻辑如下:
如上图所示,consumer注册时,服务端会根据consumerGroup下的consumer数量、consumerGroup订阅的所有topic的队列数量,进行队列分配。分配关系记录在queueOffset表中(consumer下线时,清理器会清除掉该consumer的分配记录)。
例如上图中consumerGroup有3个consumer,consumerGroup订阅的topic(topic1,topic2,topic3)一共有6个队列。分配逻辑就是把这6个队列,平均的分配给这3个消费者组。
那么问题来了,如果consumer的数量变化了(增加或者减少),或者某个topic的队列数变化了,就会出现consumer没有分配到queue或者queue没有分配到consumer的问题。怎么解决呢?请看下文的重平衡设计
为了解决consumer和topic数量动态变化造成的问题,我们引入了重平衡(即consumer和queue的动态分配),如下图所示:
如上图所示,重平衡流程如下:
- PMQ有一个重平衡器,它用来监控consumer的加入和退出、topic的扩容和缩容。
- 当某一个consumerGroup下的consumer数量发生变化,或者该consumerGroup订阅的topic的queue数量发生了变化,就会触发重平衡器对该consumerGroup进行重平衡操作。
- 重平衡器对需要重平衡的consumerGroup,进行consumer和queue的重新分配。
如果个别broker服务端挂了怎么办?不用担心,PMQ的broker是无状态的,可以水平扩容。如下图所示:
由于broker是无状态的,所以可以水平扩容、易于升级、个别broker挂无影响。
PMQ通过消息数据库的主备,来保证高可用,如下图所示:
如上图所示:topic可以水平扩容(即增加topic的队列数量)。每一个queue(消息表)都有主备,保证了的高可用。
上图中broker表示PMQ的服务端模块,消费者表示PMQ的消费端模块。
上图简要描述了PMQ消费端的实现原理:
- 消费线程:根据消费偏移(即该消费者当前的消费位置),从broker拉取消息到本地消费队列。
- 应用消费线程:从本地消费队列中拉取消息,进行消费。
- 偏移提交线程:定时提交当前的消费位置到broker。
- 心跳线程:定时上报心跳到broker。
- 失败消息重试线程:对失败消息进行重试消费。
PMQ的消息发送分为同步发送和异步发送。
上图中Producer表示PMQ的发送端,Broker表示PMQ的服务端。
同步发送比较简单。发送端把消息发送到broker,broker把消息存储到消息库,然后返回存储结果。
上图简要描述了PMQ异步发送端的实现原理:
- 应用发送线程:把消息发送到本地缓冲队列。
- 批量发送线程:把缓冲队列中的消息批量发送到broker。
- broker把消息批量插入到消息库。
场景 | 影响 | 原因 |
---|---|---|
个别broker服务端下线 | 无影响 | broker无状态,客户端重连其它broker |
个别消息库挂 | 无影响 | 主从保证消息库高可用 |
元数据库挂 | 仅影响元数据管理功能,现有的生产和消费无影响。 | 客户端和broker会缓存元数据 |
portal挂 | 仅影响元数据管理功能,现有的生产和消费无影响。 | |
个别消费实例挂 | 无影响 | PMQ的动态重平衡功能,会把队列重新分配给存活的消费实例 |
PMQ客户端和服务端目前支持CAT自动打点,所以如果自己公司内部部署了CAT的话,只要引入cat-client后PMQ就会自动启用CAT打点。
如果不使用CAT的话,也不用担心,只要不引入cat-client,PMQ是不会启用CAT打点的。