RocketMQ5.0消息消费<二> _ 消息队列负载均衡机制
一、消费队列负载均衡概览
RocketMQ默认一个主题下有4个消费队列,集群模式下同一消费组内要求每个消费队列在同一时刻只能被一个消费者消费。那么集群模式下多个消费者是如何负载主题的多个消费队列呢?并且如果有新的消费者加入时,消费队列又会如何重新分布呢?
RocketMQ消费端每20s周期执行一次消费队列重新负载,每次进行队列重新负载时会从Broker实时查询当前消费组内所有消费者,并且对消息队列、消费者列表进行排序,这样新加入的消费者就会在队列重新分布时分配到消费队列从而消费消息。如下所示,是消息拉取与消费队列负载均衡的交互图。
消息拉取与消费队列负载均衡的交互流程
二、消费队列负载均衡实现
- 负载均衡UML
2. 启动RebalanceService线程
参考《RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取》
章节,消费者启动时,当前消费者添加到MQClientInstance#consumerTable属性中,并启动MQClientInstance实例。启动MQClientInstance实例时,会启动org.apache.rocketmq.client.impl.consumer.RebalanceService消费队列负载均衡服务线程。下图所示是该线程run()调用链。
以下代码是MQClientInstance维护整个JVM的所有生产者和消费者的属性。
// 生产者容器
private final ConcurrentMap<String/* 生产组 */, MQProducerInner> producerTable = new ConcurrentHashMap<>();
// 消费者容器
private final ConcurrentMap<String/* 消费组 */, MQConsumerInner> consumerTable = new ConcurrentHashMap<>();
org.apache.rocketmq.client.impl.consumer.RebalanceService#run()周期20s执行负载均衡任务。-Drocketmq.client.rebalance. waitlnterval参数修改执行周期,默认20s。
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
// 线程等待20s
this.waitForRunning(waitInterval);
// topic下消费队列的负载均衡
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance方法遍历MQClientInstance实例中所有消费组下消费者。每一个消费者DefaultMQPushConsumerImpl拥有一个org.apache.rocketmq.client.impl.consumer.RebalanceImpl对象(实现负载均衡),给每个消费者找到一个消费队列(重新负载)。
// 消费队列负载均衡
public void doRebalance() {
for (Map.Entry<String/* 消费组 */, MQConsumerInner> entry : this.consumerTable.entrySet()) {
// 获取消费者
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
// 消费者负载均衡
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
3. PUSH模式负载均衡
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#doRebalance是PUSH模式的负载均衡的入口方法,其调用链如下。
每个消费者DefaultMQPushConsumerImpl拥有一个RebalanceImpl对象,其中org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance方法是对消费者的所有订阅主题进行负载均衡,即:消费者的所有订阅主题重新分配一个或多个消费队列来进行消费。其代码如下。注意事项:
- Map<String/* topic */, SubscriptionData> subTable:获取当前消费者订阅的主题信息;
- rebalanceByTopic():每个主题进行重新负载均衡
/**
* 对消费者订阅的每个topic进行消费队列重新负载
* step1:获取消费者订阅的主题信息,注意:消费者可以订阅多个主题
* step2:遍历消费者的每个topic
* step3:消费者订阅的topic进行消费队列重新负载
* {@link RebalanceImpl#rebalanceByTopic(String, boolean)}
* @param isOrder 是否顺序消息
* @return true所有topic重新负载成功
*/
public boolean doRebalance(final boolean isOrder) {
boolean balanced = true;
// 获取消费者订阅的主题信息,注意:消费者可以订阅多个主题
Map<String/* topic */, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
// 遍历消费者的每个topic
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
if (!clientRebalance(topic) && tryQueryAssignment(topic)) {
balanced = this.getRebalanceResultFromBroker(topic, isOrder);
} else {
// 消费者订阅的topic进行消费队列重新负载
balanced = this.rebalanceByTopic(topic, isOrder);
}
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalance Exception", e);
balanced = false;
}
}
}
}
this.truncateMessageQueueNotMyTopic();
return balanced;
}
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic方法是对每个主题进行重新负载均衡的核心逻辑,如下代码所示。 这里介绍集群模式下负载均衡,注意事项:
- MQClientInstance#findConsumerIdList():从Broker上获取所有订阅该topic且同属一个消费组的所有消费者ID。
- 对消费队列、消费者ID集合排序:原因是同一个消费组内视图一致,确保同一个消费队列不会被多个消费者分配。
- AllocateMessageQueueStrategy#allocate:根据均衡策略,获取当前消费者的消息队列。
- RebalanceImpl#updateProcessQueueTableInRebalance:重新负载后,消费者对应的分配后的消息队列是否变化:
新增、删除(其他消费者占用)。
/**
* 消费者订阅的topic进行消费队列重新负载
* 集群模式下的步骤:
* step1:从主题订阅信息缓存表(topicSubscribeInfoTable)中获取当前topic的消费队列
* step2:从Broker上获取所有订阅该topic + 同属一个消费组 的所有消费者ID
* step3:对消费队列、消费者ID排序,很重要,原因是:同一个消费组内视图一致,确保同一个消费队列不会被多个消费者分配
* step4:根据均衡策略,获取当前消费者的消息队列
* {@link AllocateMessageQueueStrategy#allocate}
* step5:消费者对应的分配消息队列是否变化: 新增、删除(其他消费者占用)
* {@link RebalanceImpl#updateProcessQueueTableInRebalance}
* @param topic 主题
* @param isOrder 是否是顺序消息
* @return true重新分配消息队列成功
*/
private boolean rebalanceByTopic(final String topic, final boolean isOrder) {
boolean balanced = true;
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet);
}
balanced = mqSet.equals(getWorkingMessageQueue(topic));
} else {
this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
// 从主题订阅信息缓存表中获取当前topic的消费队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 从Broker上获取所有订阅该topic + 同属一个消费组 的所有消费者ID
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
/*
消费队列、消费者ID排序很重要:同一个消费组内视图一致,确保同一个消费队列不会被多个消费者分配
*/
// 消费队列排序
Collections.sort(mqAll);
// 消费者ID排序
Collections.sort(cidAll);
// 均衡策略
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 根据均衡策略,获取当前消费者的消息队列
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(), // 当前消费者ID
mqAll,
cidAll);
} catch (Throwable e) {
log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);
return false;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// 消费者对应的分配消息队列是否变化: 新增、删除(其他消费者占用)
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"client rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
balanced = allocateResultSet.equals(getWorkingMessageQueue(topic));
}
break;
}
default:
break;
}
return balanced;
}
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance重新分配后消费队列集合与上次负载的分配集合是否改变(新增或删除)来重新拉取消息。如下代码所示。
-
删除(消费队列分配给其他消费者):暂停消费并移除,且持久化待移除消费队列的消费进度。
-
新增(缓存表没有的消费队列):
step1:删除内存中该消费队列的消费进度;
step2:创建broker的消费队列;
step3:从磁盘中获取该消费队列的消费进度(若进度<0时,则根据配置矫正消费进度),创建拉取消息请求。
-
新增消费队列:重新创建拉取请求PullRequest加入到PullMessageService线程中,唤醒该线程拉取消息RebalanceImpl#dispatchPullRequest。
-
若是顺序消息:是局部顺序消息,尝试向Broker请求锁定该消费队列,锁定失败延迟时则重新负载。
/**
* 消费者对应的分配消息队列是否变化
* step1:消费队列缓存表中不在本次均衡分配的消费队列时,则暂停消费并移除,且持久化待移除消费队列的消费进度;
* step2:本次均衡分配的消费队列不在消费队列缓存表中,则新增:
* 1):删除内存中该消费队列的消费进度;
* 2):创建broker的消费队列;
* 3):从磁盘中获取该消费队列的消费进度(若进度<0时,则根据配置矫正消费进度),创建拉取消息请求
* {@link RebalanceImpl#computePullFromWhere}
* step3: 新增消费队列,则创建{@link PullRequest}加入到{@link PullMessageService},唤醒该线程拉取消息
* {@link RebalanceImpl#dispatchPullRequest}
* step4:顺序消息时,则尝试向Broker请求锁定该消费队列,锁定失败延迟重新负载
* @param topic 主题
* @param mqSet 本次均衡分配的消费队列
* @param isOrder 是否顺序
* @return true变化;false未改变
*/
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
// drop process queues no longer belong me 当前消费队列不在分配队列中
HashMap<MessageQueue, ProcessQueue> removeQueueMap = new HashMap<MessageQueue, ProcessQueue>(this.processQueueTable.size());
// 遍历当前消费队列缓存表
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
// 是该topic的消费队列
if (mq.getTopic().equals(topic)) {
// 当前消费队列不在现有的分配消息队列中,则暂停消费、废弃当前消费队列并移除(分配给其他消费者)
if (!mqSet.contains(mq)) {
pq.setDropped(true);
removeQueueMap.put(mq, pq);
} else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) {
pq.setDropped(true);
removeQueueMap.put(mq, pq);
log.error("[BUG]doRebalance, {}, try remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
}
}
// remove message queues no longer belong me 移除不在分配的消费队列
for (Entry<MessageQueue, ProcessQueue> entry : removeQueueMap.entrySet()) {
MessageQueue mq = entry.getKey();
ProcessQueue pq = entry.getValue();
/*
判断是否将{@link MessageQueue}、{@link ProcessQueue}缓存表中移除
a. 持久化待移除的{@link MessageQueue}消费进度;
b. 顺序消息时,需先解锁队列
*/
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
this.processQueueTable.remove(mq);
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
}
// add new message queue 遍历本次负载均衡分配的消费队列,缓存表中没有,则新增的消费队列
boolean allMQLocked = true; // 消费队列是否有锁定(顺序消息使用)
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
// 新增的消费队列
if (!this.processQueueTable.containsKey(mq)) {
// 若是顺序消息,则尝试向Broker请求锁定该消费队列,锁定失败延迟重新负载
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
allMQLocked = false;
continue;
}
// 删除内存中该消费队列的消费进度
this.removeDirtyOffset(mq);
// 创建broker的消费队列
ProcessQueue pq = createProcessQueue(topic);
pq.setLocked(true);
// 从磁盘中获取该消费队列的消费进度(若进度<0时,则根据配置矫正消费进度),创建拉取消息请求
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
// 创建拉取消息请求
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 锁定消费队列失败,延迟重新负载
if (!allMQLocked) {
mQClientFactory.rebalanceLater(500);
}
// 将拉取消息对象{@link PullRequest}加入到{@link PullMessageService},唤醒该线程拉取消息
this.dispatchPullRequest(pullRequestList, 500);
return changed;
}
根据RebalanceImpl#updateProcessQueueTableInRebalance来判定消费者对应的分配到的消息队列是否变化(新增或删除)时,若是新增,则先删除内存消费进度,再从Broker端获取该消费队列的消费进度;若是删除,持久化消费进度同时删除旧的消费队列。
a. 删除操作
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#removeUnnecessaryMessageQueue负载均衡时删除未分配的消费队列,其调用链如下。
b. 新增操作
先删除该消费队列旧的内存消费进度,执行方法RebalanceImpl#removeDirtyOffset,其调用链如下。
再从Broker磁盘获取该消费队列消费进度,执行RebalanceImpl#computePullFromWhere,其调用链如下。
三、负载均衡策略
org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy是消费队列负载均衡策略的接口,其有6个实现类,UML图如下。其中:
-
AllocateMessageQueueAveragely:平均分配算法(默认),如:8个消息消费队列q1、q2、q3、q4、q5、q6、q7、q8,有3个消费者c1、c2、c3,则分配如下:
c1:q1、q2、q3 c2:q4、q5、q6 c3:q7、q8
-
AllocateMessageQueueAveragelyByCircle:平均轮询算法,如:8个消息消费队列q1、q2、q3、q4、q5、q6、q7、q8,有3个消费者c1、c2、c3,则分配如下:
c1:q1、q4、q7 c2:q2、q5、q8 c3:q3、q6
四、参考资料
- https://blog.csdn.net/Weixiaohuai/article/details/123898841
- https://www.cnblogs.com/alisystemsoftware/p/16935521.html