这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
RocketMQ版本
- version: 5.1.0
RocketMQ中consumer消费模型
在了解RocketMQ的Rebalance机制之前,我们必须先简单了解下rocketmq的消费模型
我们知道在我们创建topic的时候需要指定一个参数就是读队列数
这里假设我们的topic是xiaozoujishu-topic
,我们的读队列数
是4个,我们同一gid
下的集群消费模式的消费者有两个,那么我们消费者是如何消费消息的呢
首先需要明确的是:
- 这里我们的消费模式是集群消费
- queue的负载均衡算法是使用默认的
AllocateMessageQueueAveragely
(平均分配)
假设我们项目刚开始只有一个消费者,那么我们的消费队列分配就如下:
四个队列分配给一个消费者
此时如果我们再启动一个消费者,那么这时候就会进行Rebalance
,然后此时我们的队列分配就变成如下:
所以通过上面的队列分配我就知道Rebalance
是个啥了,我们下面对Rebalance
进行一些定义
RocketMQ的Rebalance是什么
Rebalance(重新平衡)机制指的是:将一个Topic下的多个队列(queue),在同一个消费者组(consumer group)(gid)下的多个消费者实例(consumer instance)之间进行重新分配
Rebalance的目的
从上面可以看出Rebalance
的本意是把一个topic的queue分配给合适的consumer,本意其实是为了提升消息的并行处理能力
但是Rebalance
也带来了一些危害,后面我们会重点分析下
Rebalance
的触发原因
我们这里先说结论
- 订阅Topic的队列数量变化
- 消费者组信息变化
这里是最深层的原因,就是topic
的队列数量、消费组信息
实际我们可以将这些归结为Rebalance
的元数据,这些元数据的变更,就会引起clinet的Rebalance
注意RocketMQ的
Rebalance
是发生在client
这些元数据都在管broker
管理
核心就是这三个类
- TopicConfigManager
- SubscriptionGroupManager
- ConsumerManager
只要这个三个类的信息有变化,client
就会进行Rebalance
。
下面我们可以具体说下什么情况下会让这三个类变化
订阅Topic的队列数量变化
什么情况下订阅Topic的队列数量会变化呢?
- broker扩容
- broker缩容
- broker宕机(本质也是类似缩容)
消费者组信息变化
什么时候消费者组信息会变化呢?
核心就是consumer的上下线,具体细分又可以分为如下原因:
- 服务日常滚动升级
- 服务扩容
- 服务订阅消息发生变化
源码分析
上面大致介绍了Rebalance
的触发原因,现在我们结合源码来具体分析下
我们就从consumer
的启动开始分析吧
这里我们以最简单的demo为例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(TOPIC, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.printf("Consumer Started.%n");
这里我们直接注意到
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
这个方法,看名字就知道是client
向所有的broker
发送心跳
我们进入到sendHeartbeatToAllBrokerWithLock
方法看看
private void sendHeartbeatToAllBroker() {
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
if (producerEmpty && consumerEmpty) {
log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
return;
}
if (this.brokerAddrTable.isEmpty()) {
return;
}
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
for (Entry<String, HashMap<Long, String>> brokerClusterInfo : this.brokerAddrTable.entrySet()) {
String brokerName = brokerClusterInfo.getKey();
HashMap<Long, String> oneTable = brokerClusterInfo.getValue();
if (oneTable == null) {
continue;
}
for (Entry<Long, String> singleBrokerInstance : oneTable.entrySet()) {
Long id = singleBrokerInstance.getKey();
String addr = singleBrokerInstance.getValue();
if (addr == null) {
continue;
}
if (consumerEmpty && MixAll.MASTER_ID != id) {
continue;
}
try {
int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version);
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatData.toString());
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.warn("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.warn("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr, e);
}
}
}
}
}
这段代码主要是通过this.brokerAddrTable.entrySet()
获取到所有的master broker地址,然后进行心跳发送
具体的心跳发送代码实际是在下面代码中进行的
int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
我们进入到该方法会发现和我们之前分析的一样,就是发送一个请求到broker,请求码是RequestCode.HEART_BEAT
我们看看RequestCode.HEART_BEAT
的调用找到`broker的处理逻辑
很快我们通过方法名就能定位到处理client的请求的方法是ClientManageProcessor
类的processRequest
我们具体进去看看这个方法
可以看到具体的逻辑被封装在return this.heartBeat(ctx, request);
这个方法中,所以我们需要再进去看看
进去这个方法我们能看到一个比较核心的方法
registerConsumer
很明显这个方法就是注册consumer的方法
这个方法里面和Rebalance
相关比较核心的方法就是这三个
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
这里我们可以看看clientChannelInfo
里面是个啥玩意
具体深入到updateChannel
方法里面就是判断是否为新的client,是就更新channelInfoTable
2.updateSubscription
这个方法就是判断订阅关系是否发生了变化并更新订阅关系
callConsumerIdsChangeListener
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
这个方法就是通知client进行Rebalance
,具体的实现是参考了类似事件订阅的方式去实现的,这里是发送了一个CHANGE
事件
这里我们可以简单看看事件定义的类型有哪些
我们直接看看具体的事件处理类
可以看到实现类有多个,我们直接看broker
模块的DefaultConsumerIdsChangeListener
类即可
可以看到这里是给该group所有的client发送Rebalance
消息
具体的消息状态码是
RequestCode.NOTIFY_CONSUMER_IDS_CHANGED
client Rebalance
通过上面我们大致找到了整个通信过程,但是实际的Rebalance
是发生在client,所以我们还是需要继续回到client
的代码
我们通过状态码RequestCode.NOTIFY_CONSUMER_IDS_CHANGED
找到client
的处理类ClientRemotingProcessor
实际处理方法就是
this.mqClientFactory.rebalanceImmediately();
我们进入这个方法看看这里最终就是唤醒阻塞的Rebalance
线程
所以实际的方法调用还是在RebalanceService
的 run
方法
最终还是调用的是MQConsumerInner
接口中的doRebalance
方法
这里有个细节,为什么不是直接调用一个静态方法,要搞这么多花里胡哨的唤醒线程操作?
原来是cleint
也会定时去Rebalance
默认是20s一次,可以配置
可以通过参数rocketmq.client.rebalance.waitInterval
去配置
那么为什么client
还要自己去循环Rebalance
原来这里是防止因为网络等其他原因丢失了broker
的请求,后续网络回复了,也能进行进行Rebalance
下面我们继续看看Rebalance
的实现细节
这里我们以常用的DefaultMQPushConsumerImpl
为例
实际这里最终调用的还是抽象类RebalanceImpl
的doRebalance
方法
可以看到这里的Rebalance
是按照topic
的维度
我们先理解订阅单个topic
的原理
这里的就是先对topic
的queue排序,然后对consumer
排序,
然后调用AllocateMessageQueueStrategy
的allocate方法
这里我们暂时只分析默认的平均分配算法(AllocateMessageQueueAveragely),也就是我们最先说的分配算法。其他算法可以详细分析
这里的分配方式就是我们前面画图的,比如4个queue,2个consumer,那么就是每个consumer2个queue。
简单举例就是我们的 queue有q1、q2、q3、q4
consumer有 c1、c2
那么就是
c1:q1、q2
c2:q2、q3
需要注意的是如果consumer大于queue数量,多出的consumer就不会被分配到queue
client什么时候触发Rebalance
上面分析了这么多原理,这里我们总结下client
什么时候会触发Rebalance
- consumer启动时会向所有master broker发送心跳,然后
broker
发送信息通知所有consumer
触发Rebalance
- 启动完成后consumer会周期的触发
Rebalance
,防止因为网络等问题丢失broker
的通知而没有Rebalance
- 当consumer停止时,也会通过之前分析的事件机制,触发注销
comsuer
事件然后通知所有的comsuer
触发Rebalance
总结
这里我们详细介绍了client
是如何触发Rebalance
的,以及触发Rebalance
的时机,也介绍了Rebalance
的好处。
实际还有很多细节我们限于篇幅暂未分析。
后面我们会继续分析Rebalance
的坏处和一些详细的Rebalance
算法
参考
- RocketMQ源码
- 博客