深入浅出消息队列----【阶段总结篇】
- 总览
- nameSrv
- Broker
- producer(生产者)
- consumer(消费者)
- 串联起来
本文仅是文章笔记,整理了原文章中重要的知识点、记录了个人的看法
文章来源:编程导航-鱼皮【yes哥深入浅出消息队列专栏】
总览
nameSrv
它会保存 Topic 与 Broker 的关联信息,Broker 将自己的信息上报给 nameSrv。
生产者从 nameSrv 获得它要发送的 Topic 对应的 Broker 信息。
消费者从 nameSrv 获取它要消费的 Topic 对应的 Broker 信息。
nameSrv 虽然有集群的概念,但是集群内的 nameSrv 互不通信,他们都是独立的存在。
生产者会随机跟一个 nameSrv 建连交互,每 30s 从 nameSrv 拉取最新的 Topic 相关数据。
消费者会随机跟一个 nameSrv 建连交互,每 30s 从 nameSrv 拉取最新的 Topic 相关数据。
Broker 需要跟集群里面所有的 nameSrv 建立交互,每 30s 上报自身信息给 nameSrv。
nameSrv 会维护 Broker 的信息,如果 Broker 下线了,会移除对应的 Broker 信息,这样生产者和消费者拉取 Topic 相关信息时就能感知到 Broker 下线了。
Broker
Broker 启动之后会跟 nameSrv 集群里面的每一台 nameSrv 建立连接,并且定时上报自己的信息给 nameSrv,包含自己的 IP 信息、Topic 信息等。
Broker 存储 Topic 消息,而 Topic 会分多个队列,如果有 Broker 集群,那么一个 Topic 的多个队列会分散在一个 Broker 集群的不同的 Broker 中。
比如 TopicA 有 4 个队列,那么此时 Broker 集群里,BrokerA 存 1、2 两个队列,BrokerB 存 3、4 两个队列。
Broker 会将不同的 Topic 消息都写入到同一个文件,即 commitlog 里,所有的消息追加写入到一个文件,这叫顺序写,根据磁盘的物理结构,顺序写的效率很高。
Broker 会有一个后台线程,一直将新加入的 commitlog 里面的消息,映射到 consumerQueue 中:
consumerQueue 对应的就是 Topic 队列的概念,消费者直接消费的对象也是 consumerQueue。
consumerQueue 里面存储的是消息在 commitlog 里面的起始物理地址、长度、taghash。
消费者消费的时候先找到对应的 consumerQueue,再根据 consumerQueue 里面的数据,找到 commitlog 得到对应的消息内容。
除了 commitlog 和 consumerQueue,broker 还有个 IndexFile,即消息的索引文件。
我们在发送消息时,可以给消息设置索引,这样方便我们通过 key 直接查询对应的消息:
producer(生产者)
producer 启动之后就会选择一台 nameSrv 进行建连,从 nameSrv 拉取自己想要发送的 Topic 对应的 Broker 信息。
紧接着跟 Broker 建立连接,后续发送消息直接跟 Broker 进行交互。
按照消息的类别可以分为 5 大;类,分别是
- 普通消息
- 顺序消息
- 全局顺序
- 局部顺序
- 延迟消息
- 批量消息
- 事务消息
然后是发送消息的三种方式,分别是
- 同步发送
- 异步发送
- 单向发送
consumer(消费者)
consumer 启动之后就会选择一台 nameSrv 进行建连,从 nameSrv 拉取自己想要消费的 Topic 对应的 Broker 信息。
跟 Broker 建立连接后,后续的消费消息就直接跟 Broker 进行交互。
消费者从 Broker 获取消息有两种方式:
- Broker 推给 consumer
- consumer 主动从 Broker 拉取
现在主流的消息队列都是采取拉的方式来实现消息的获取,因为推消息不容易把控消费者的消费情况,如果消费不过来还使劲推送消息,容易导致消费者直接挂掉。
当然朴素的拉消息也存在问题,比如拉取的不及时,或者忙拉取的情况,因此拉消息的时候都是基于长轮询拉取,即消费者向 Broker 发送拉取请求,如果当前有消息则立马返回,如果没有消息那么 Broker 会先 hold 住这个请求。
然后等着,如果此时有生产者发送消息过来,则立马响应这些消息给消费者。
如果没有消息则等待一定时间后返回消费者无消息的响应,然后消费者立马再次请求,如此循环往复。
对了,RocketMQ 虽然对应的消费者实现类有 pullConsumer、pushConsumer,但是底层都是用的拉模式,无非是封装的看着像推的罢了。
consumer 还有重平衡的操作来实现消费的负载均衡,consumer 消费某个 Topic 的消息,实际是去 Topic 下的某个队列去拉取消息。
而一个 Topic 往往会有多个队列,也同时会有一个消费组一起来消费这个 Topic 的消息,一个消费组往往由多个消费者组成。
消费组里面的消费者们会瓜分消费 Topic 的队列们,正常情况下瓜分结束后不需要有什么变动。
但是如果当前有消费者挂了,这时候需要重新瓜分下队列的归属,留存的消费者需要顶上挂了的消费者负责的队列,也就是需要重平衡。
当然,不仅仅是挂了之后要重平衡,新的消费者加入也会触发这个动作。
Broker 知晓新消费者上线后,会通知一个组的消费者进行重平衡,默认每个消费者 20s 也会主动进行一次重平衡。
这是属于客户端的负载均衡。
重平衡后,每个消费者指定自己需要拉取哪个队列消息,构建对应的请求去 Broker 拉取即可。
然后还有个消费进度的概念。
由于集群模式下,Topic 下的UIlie是被消费组内的消费者瓜分的,如果发送老的消费者下线,或者新的消费者上线都需要重平衡,即每个 consumer 负责的队列会发生变化。
因此接手新队列的 consumer 需要知道老 consumer 对当前队列的消费进度,总不能发送重平衡后让 consumer 从这个队列的最早的一条消息开始消费吧?
所以集群模式下,消费进度需要保存在 Broker 中,这样重平衡之后 consumer 可以从 Broker 知晓消费进度。
消费进度的更新时 consumer 去 Broker 拉取请求时顺带将当前的消费进度带上给 Broker 的。
而广播模式下,消费者之间没有互帮互助的关系,各管各的,所以消费进度仅保存在他们本地。
如果消费失败了怎么办?
consumer 会将消息发回给 Broker,消息的 Topic 为 %RETRY% + ConsumerGroupName,后续会重新消费这个消息。
如果一直消费失败,默认重试 16 次后,消息回进入死信队列,不会再进行重试。
串联起来
nameSrv 启动后,待命。
Broker 启动,将自身的信息包括 IP、端口、Topic 等上传给 nameSrv,等待 producer 发送消息,等待 consumer 消费消息。
producer 启动,连上 nameSrv,从它身上获取 Broker 信息,跟对应的 Broker 建立连接,建立连接后发送消息给 Broker。
Broker 将消息存储到 commitlog 文件中,并分发到 consumerQueue,等待 consumer 来消费拉取消息。
consumer 启动,连上 nameSrv,从它身上获取 Broker 信息,跟对应的 Broker 建立连接,建立连接后发送拉取请求给 Broker。
Broker 根据对应的 Topic、队列ID、消息点位,找到 consumerQueue 的消息,再解析找到对应 commitlog 得到消息内容,然后返回给 consumer。
consumer 消费消息,随后上报自己的消费进度给 Broker。