RocketMQ在集群模式下,同一个消费组内,一个消息队列同一时间只能分配给组内的某一个消费者,也就是一条消息只能被组内的一个消费者进行消费,为了合理的对消息队列进行分配,于是就有了负载均衡。
接下来以集群模式下的消息推模式DefaultMQPushConsumerImpl
为例,看一下负载均衡的过程。
消费者负载均衡
首先,消费者在启动时会做如下操作:
- 从NameServer更新当前消费者订阅主题的路由信息;
- 向Broker发送心跳,注册消费者;
- 唤醒负载均衡服务,触发一次负载均衡;
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public synchronized void start() throws MQClientException {
// ...
// 更新当前消费者订阅主题的路由信息
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
// 向Broker发送心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 唤醒负载均衡服务
this.mQClientFactory.rebalanceImmediately();
}
}
更新主题路由信息
为了保证消费者拿到的主题路由信息是最新的(topic下有几个消息队列、消息队列的分布信息等),在进行负载均衡之前首先要更新主题的路由信息,在updateTopicSubscribeInfoWhenSubscriptionChanged
方法中可以看到,首先获取了当前消费者订阅的所有主题信息(一个消费者可以订阅多个主题),然后进行遍历,向NameServer发送请求,更新每一个主题的路由信息,保证路由信息是最新的:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
// 获取当前消费者订阅的主题信息
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
// 遍历订阅的主题信息
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
// 从NameServer更新主题的路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
}
}
}
}
注册消费者
发送心跳
由于Broker需要感知消费者数量的增减,所以每个消费者在启动的时候,会调用sendHeartbeatToAllBrokerWithLock
向Broker发送心跳包,进行消费者注册:
public class MQClientInstance {
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
// 调用sendHeartbeatToAllBroker向Broker发送心跳
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed. [{}]", this.clientId);
}
}
}
在sendHeartbeatToAllBroker
方法中,可以看到从brokerAddrTable
中获取了所有的Broker进行遍历(主从模式下也会向从节点发送请求注册),调用MQClientAPIImpl
的sendHearbeat
方法向每一个Broker发送心跳请求进行注册:
public class MQClientInstance {
// Broker路由表
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>();
// 发送心跳
private void sendHeartbeatToAllBroker() {
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
// ...
if (!this.brokerAddrTable.isEmpty()) {
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
// 获取所有的Broker进行遍历, key为 Broker Name, value为同一个name下的所有Broker实例(主从模式下Broker的name一致)
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey(); // broker name
// 获取同一个Broker Name下的所有Broker实例
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
// 遍历所有的实例
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) { // 如果地址不为空
// ...
try {
// 发送心跳
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
// ...
} catch (Exception e) {
// ...
}
}
}
}
}
}
}
}
在MQClientAPIImpl
的sendHearbeat
方法中,可以看到构建了HEART_BEAT
请求,然后向Broker发送:
public class MQClientAPIImpl {
public int sendHearbeat(final String addr, final HeartbeatData heartbeatData, final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
// 创建HEART_BEAT请求
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
request.setLanguage(clientConfig.getLanguage());
request.setBody(heartbeatData.encode());
// 发送请求
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
// ...
}
}
心跳请求处理
Broker在启动时注册了HEART_BEAT
请求的处理器,可以看到请求处理器是ClientManageProcessor
:
public class BrokerController {
public void registerProcessor() {
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
// 注册HEART_BEAT请求的处理器ClientManageProcessor
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
}
}
进入到ClientManageProcessor
的processRequest
方法,如果请求是HEART_BEAT
类型会调用heartBeat
方法进行处理,这里也能看还有UNREGISTER_CLIENT
类型的请求,从名字上可以看出是与取消注册有关的(这个稍后再说):
public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.HEART_BEAT: // 处理心跳请求
return this.heartBeat(ctx, request);
case RequestCode.UNREGISTER_CLIENT: // 取消注册请求
return this.unregisterClient(ctx, request);
case RequestCode.CHECK_CLIENT_CONFIG:
return this.checkClientConfig(ctx, request);
default:
break;
}
return null;
}
}
进入到heartBeat
方法,可以看到,调用了ConsumerManager
的registerConsumer
注册消费者:
public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
// ...
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
// ...
// 注册Consumer
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(),
data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable);
// ...
}
// ...
return response;
}
}
进行注册
ConsumerManager
的registerConsumer
方法的理逻辑如下:
- 根据组名称获取该消费者组的信息
ConsumerGroupInfo
对象。如果获取为空,会创建一个ConsumerGroupInfo,记录了消费者组的相关信息; - 判断消费者是否发生了变更,如果如果发生了变化,会触发
CHANGE
变更事件(这个稍后再看); - 触发
REGISTER
注册事件;
public class ConsumerManager {
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// 根据组名称获取消费者组信息
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) { // 如果为空新增ConsumerGroupInfo对象
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
boolean r2 = consumerGroupInfo.updateSubscription(subList);
// 如果有变更
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
// 通知变更
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
// 注册Consumer
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
}
进入到DefaultConsumerIdsChangeListener
的handle
方法中,可以看到如果是REGISTER
事件,会通过ConsumerFilterManager
的register
方法进行注册,注册的详细过程这里先不展开讲解:
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
if (event == null) {
return;
}
switch (event) {
case CHANGE:// 如果是消费者变更事件
// ...
break;
case UNREGISTER: // 如果是取消注册事件
this.brokerController.getConsumerFilterManager().unRegister(group);
break;
case REGISTER: // 如果是注册事件
if (args == null || args.length < 1) {
return;
}
Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
// 进行注册
this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
break;
default:
throw new RuntimeException("Unknown event " + event);
}
}
}
负载均衡
经过以上步骤之后,会调用MQClientInstance的rebalanceImmediately
唤醒负载均衡服务进行一次负载均衡,为消费者分配消息队列,需要注意的是负载均衡是由消费者端执行:
// MQClientInstance
public class MQClientInstance {
private final RebalanceService rebalanceService;
public void rebalanceImmediately() {
// 唤醒负载均衡服务
this.rebalanceService.wakeup();
}
}
// RebalanceService
public class RebalanceService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
// 负载均衡
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
负载均衡的过程在【RocketMQ】消息的拉取一文中已经讲解,这里挑一些重点内容看一下。
在负载均衡的时候,首先会获取当前消费者订阅的主题信息,对订阅的主题进行遍历,对每一个主题进行负载均衡,重新分配:
public abstract class RebalanceImpl {
public void doRebalance(final boolean isOrder) {
// 获取订阅的主题信息
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
// 遍历所有订阅的主题
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
// 根据主题进行负载均衡
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
}
根据主题进行负载均衡
rebalanceByTopic
方法中根据消费模式进行了判断然后对主题进行负载均衡,这里我们关注集群模式下的负载均衡:
-
从
topicSubscribeInfoTable
中根据主题获取对应的消息队列集合,这一步可以得到主题下的所有消息队列信息; -
根据主题信息和消费者组名称,获取所有订阅了该主题的消费者ID集合,这一步得到了订阅该主题的所有消费者;
-
如果主题对应的消息队列集合和消费者ID都不为空,对消息队列集合和消费ID集合进行排序,排序是为了接下来进行分配;
-
获取设置的分配策略,根据分配策略,为消费者分配对应的消费队列,以平均分配策略为例,它会根据消息队列的数量和消费者的个数计算每个消费者分配的队列个数:
-
根据最新分配的消息队列信息,调用
updateProcessQueueTableInRebalance
更新当前消费者消费的处理队列ProcessQueue
信息
public abstract class RebalanceImpl {
// 根据主题进行负载均衡
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: { // 广播模式
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// ...
break;
}
case CLUSTERING: { // 集群模式
// 根据主题获取订阅的消息队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 获取所有订阅了该主题的消费者id
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
// ...
if (mqSet != null && cidAll != null) { // 如果都不为空
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 对消息队列排序
Collections.sort(mqAll);
// 对消费者排序
Collections.sort(cidAll);
// 获取分配策略
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 根据分配策略,为消费者分配消费队列
allocateResult = strategy.allocate(
this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
} catch (Throwable e) {
// ...
}
// 分配给当前消费的消费队列
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
// 将分配结果加入到结果集合中
allocateResultSet.addAll(allocateResult);
}
// 根据分配信息更新处理队列
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
// ...
}
break;
}
default:
break;
}
}
}
更新处理队列
负载均衡之后,消费者负责的消息队列有可能发生变化,一个消息队列MessageQueue
对应一个处理队列ProcessQueue
,processQueueTable
记录了消费者负责的队列信息,此时需要对其进行更新,处理逻辑如下:
-
对
processQueueTable
进行遍历,处理每一个消息队列,这一步主要是判断重新分配之后,processQueueTable
中记录的某些消息队列是否已经不再由当前消费者负责,如果是需要将消息队列置为dropped,表示删除,之后消费者不再从此消费队列中拉取消息; -
判断是否有新分配给当前消费者的消息队列,如果某个消息队列在最新分配给当前消费者的消息队列集合
mqSet
中,但是不在processQueueTable
中,中,进行以下处理:
- 计算消息拉取偏移量,也就是从哪个位置开始消费,如果消息拉取偏移量大于0,创建ProcessQueue,并放入处理队列表中
processQueueTable
- 构建
PullRequest
,设置消息的拉取信息,并加入到拉取消息请求集合pullRequestList
中
经过这一步,如果分配给当前消费者的消费队列不在
processQueueTable
中,就会构建拉取请求PullRequest
,然后调用dispatchPullRequest处理消息拉取请求,之后会从该消息队列拉取消息,详细过程可参考【RocketMQ】消息的拉取。 - 计算消息拉取偏移量,也就是从哪个位置开始消费,如果消息拉取偏移量大于0,创建ProcessQueue,并放入处理队列表中
public abstract class RebalanceImpl {
// 处理队列表,KEY为消息队列,VALUE为对应的处理信息
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
// 负载均衡,topic表示当前要进行负载均衡的主题,mqSet中记录了重新分配给当前消费者的消息队列
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
// 处理队列表
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
// 获取消息队列
MessageQueue mq = next.getKey();
// 获取处理队列
ProcessQueue pq = next.getValue();
// 主题是否一致
if (mq.getTopic().equals(topic)) {
// 如果队列集合中不包含当前的队列
if (!mqSet.contains(mq)) {
// 设置为dropped
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) { // 是否过期
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true); // 设置为删除
// ...
break;
default:
break;
}
}
}
}
// 创建拉取请求集合
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
// 遍历本次分配的消息队列集合
for (MessageQueue mq : mqSet) {
// 如果之前不在processQueueTable中
if (!this.processQueueTable.containsKey(mq)) {
// ...
// 创建ProcessQueue
ProcessQueue pq = new ProcessQueue();
long nextOffset = -1L;
try {
// 计算消息拉取偏移量
nextOffset = this.computePullFromWhereWithException(mq);
} catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}
// 如果偏移量大于等于0
if (nextOffset >= 0) {
// 放入处理队列表中
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
// 如果之前已经存在,不需要进行处理
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
// 如果之前不存在,构建PullRequest,之后会加入到阻塞队列中,进行消息拉取
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);// 设置消费组
pullRequest.setNextOffset(nextOffset);// 设置拉取偏移量
pullRequest.setMessageQueue(mq);// 设置消息队列
pullRequest.setProcessQueue(pq);// 设置处理队列
pullRequestList.add(pullRequest);// 加入到拉取消息请求集合
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 添加消息拉取请求
this.dispatchPullRequest(pullRequestList);
return changed;
}
}
Rebalance的触发
消费者启动时触发
在文章开头已经讲过,消费者在启动时会进行一次负载均衡,这里便不再赘述。
消费者变更时触发
在消费者注册时讲到,如果发现消费者有变更会触发变更事件,当处于以下两种情况之一时会被判断为消费者发生了变化,需要进行负载均衡:
-
当前注册的消费者对应的Channel对象之前不存在;
-
当前注册的消费者订阅的主题信息发生了变化,也就是消费者订阅的主题有新增或者删除;
public class ConsumerManager {
/**
* 注册消费者
* @param group 消费者组名称
* @param clientChannelInfo 注册的消费者对应的Channel信息
* @param consumeType 消费类型
* @param messageModel
* @param consumeFromWhere 消费消息的位置
* @param subList 消费者订阅的主题信息
* @param isNotifyConsumerIdsChangedEnable 是否通知变更
* @return
*/
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// 根据组名称获取消费者组信息
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) { // 如果为空新增
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
// 更新Channel
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
// 更新订阅信息
boolean r2 = consumerGroupInfo.updateSubscription(subList);
// 如果有变更
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
// 通知变更,consumerGroupInfo中存储了该消费者组下的所有消费者的channel
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
// 注册Consumer
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
}
Channel变更
在updateChannel
方法中,首先将变更状态updated
初始化为false,然后根据消费者的channel从channelInfoTable
路由表中获取对应的ClientChannelInfo
对象:
-
如果ClientChannelInfo对象获取为空,表示之前不存在该消费者的channel信息,将其加入到路由表中,变更状态置为true,表示消费者有变化;
-
如果获取不为空,判断clientid是否一致,如果不一致更新为最新的channel信息,但是变更状态
updated
不发生变化;
也就是说,如果注册的消费者之前不存在,那么将变更状态置为true,表示消费者数量发生了变化。
// key为消费者对应的channle,value为chanel信息
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
boolean updated = false; // 变更状态初始化为false
this.consumeType = consumeType;
this.messageModel = messageModel;
this.consumeFromWhere = consumeFromWhere;
// 从channelInfoTable中获取对应的Channel信息,
ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
if (null == infoOld) { // 如果为空
// 新增
ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
if (null == prev) { // 如果之前不存在
log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
messageModel, infoNew.toString());
// 变更状态置为true
updated = true;
}
infoOld = infoNew;
} else {
// 如果之前存在,判断clientid是否一致,如果不一致更新为最新的channel
if (!infoOld.getClientId().equals(infoNew.getClientId())) {
log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ", this.groupName, infoOld.toString(), infoNew.toString());
this.channelInfoTable.put(infoNew.getChannel(), infoNew);
}
}
this.lastUpdateTimestamp = System.currentTimeMillis();
infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);
return updated;
}
主题信息订阅变更
updateSubscription
方法中,主要判断了消费的主题订阅信息是否发生了变化,subscriptionTable中记录了之前记录的订阅信息:
- 判断是否有新增的主题订阅信息,主要是通过subscriptionTable是否存在某个主题进行判断的:
- 如果不存在,表示之前没有订阅过某个主题的信息,将其加入到subscriptionTable中,并将变更状态置为true,表示主题订阅信息有变化;
- 如果subscriptionTable中存在某个主题的订阅信息,表示之前就已订阅,将其更新为最新的,但是变更状态不发生变化;
- 判断是否有删除的主题,主要是通过subscriptionTable和subList的对比进行判断的,如果有删除的主题,将变更状态置为true;
如果消费者订阅的主题发生了变化,比如有新增加的主题或者删除了某个主题的订阅,会被判断为主题订阅信息发生了变化。
public class ConsumerGroupInfo {
// 记录了订阅的主题信息,key为topic,value为订阅信息
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>();
public boolean updateSubscription(final Set<SubscriptionData> subList) {
boolean updated = false;
// 遍历订阅的主题信息
for (SubscriptionData sub : subList) {
//根据主题获取订阅信息
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
// 如果获取为空
if (old == null) {
// 加入到subscriptionTable
SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
if (null == prev) {
updated = true; // 变更状态置为true
log.info("subscription changed, add new topic, group: {} {}", this.groupName, sub.toString());
}
} else if (sub.getSubVersion() > old.getSubVersion()) { // 如果版本发生了变化
if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
log.info("subscription changed, group: {} OLD: {} NEW: {}", this.groupName, old.toString(), sub.toString());
}
// 更新为最新的订阅信息
this.subscriptionTable.put(sub.getTopic(), sub);
}
}
Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
// 进行遍历,这一步主要是判断有没有取消订阅的主题
while (it.hasNext()) {
Entry<String, SubscriptionData> next = it.next();
String oldTopic = next.getKey();
boolean exist = false;
// 遍历最新的订阅信息
for (SubscriptionData sub : subList) {
// 如果在旧的订阅信息中存在就终止,继续判断下一个主题
if (sub.getTopic().equals(oldTopic)) {
exist = true;
break;
}
}
// 走到这里,表示有取消订阅的主题
if (!exist) {
log.warn("subscription changed, group: {} remove topic {} {}",this.groupName, oldTopic, next.getValue().toString());
// 进行删除
it.remove();
// 变更状态置为true
updated = true;
}
}
this.lastUpdateTimestamp = System.currentTimeMillis();
return updated;
}
}
变更请求发送
上面讲解了两种被判定为消费者发生变化的情况,被判定为变化之后,会触调用DefaultConsumerIdsChangeListener
中的handle
方法触发变更事件,在方法中传入了消费者组下的所有消费者的channel对象,会发送变更请求通知该消费者组下的所有消费者,进行负载均衡。
DefaultConsumerIdsChangeListener
中处理变更事件时,会对消费组下的所有消费者遍历,调用notifyConsumerIdsChanged
方法向每一个消费者发送变更请求:
public class ConsumerManager {
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// ...
// 更新Channel
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
// 更新订阅信息
boolean r2 = consumerGroupInfo.updateSubscription(subList);
// 如果有变更
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
// 触发变更事件,consumerGroupInfo中存储了该消费者组下的所有消费者的channel
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
// ...
}
}
// DefaultConsumerIdsChangeListener
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
if (event == null) {
return;
}
switch (event) {
case CHANGE:// 如果是消费者变更事件
case CHANGE:
if (args == null || args.length < 1) {
return;
}
// 获取所有的消费者对应的channel
List<Channel> channels = (List<Channel>) args[0];
if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
for (Channel chl : channels) {
// 向每一个消费者发送变更请求
this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
}
}
break;
// ...
}
}
}
请求的发送是在Broker2Client
的notifyConsumerIdsChanged
方法中实现的,可以看到会创建NOTIFY_CONSUMER_IDS_CHANGED
请求并发送:
public class Broker2Client {
public void notifyConsumerIdsChanged(
final Channel channel,
final String consumerGroup) {
if (null == consumerGroup) {
log.error("notifyConsumerIdsChanged consumerGroup is null");
return;
}
NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
// 创建变更通知请求
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
try {
// 发送请求
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error("notifyConsumerIdsChanged exception. group={}, error={}", consumerGroup, e.toString());
}
}
}
变更通知请求处理
消费者对Broker发送的NOTIFY_CONSUMER_IDS_CHANGED
的请求处理在ClientRemotingProcessor
的processRequest
方法中,它会调用notifyConsumerIdsChanged
方法进行处理,在notifyConsumerIdsChanged
方法中可以看到触发了一次负载均衡:
public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.CHECK_TRANSACTION_STATE:
return this.checkTransactionState(ctx, request);
case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED: // NOTIFY_CONSUMER_IDS_CHANGED请求处理
// 处理变更请求
return this.notifyConsumerIdsChanged(ctx, request);
// ...
default:
break;
}
return null;
}
public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
try {
// ...
// 触发负载均衡
this.mqClientFactory.rebalanceImmediately();
} catch (Exception e) {
log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e));
}
return null;
}
}
消费者停止时触发
消费者在停止时,需要将当前消费者负责的消息队列分配给其他消费者进行消费,所以在shutdown
方法中会调用MQClientInstance
的unregisterConsumer
方法取消注册:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public synchronized void shutdown(long awaitTerminateMillis) {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
this.consumeMessageService.shutdown(awaitTerminateMillis);
this.persistConsumerOffset();
// 取消注册
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
this.mQClientFactory.shutdown();
// ...
break;
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}
}
在unregisterConsumer
方法中,又调用了unregisterClient
方法取消注册,与注册消费者的逻辑相似,它会向所有的Broker发送取消注册的请求:
public class MQClientInstance {
public synchronized void unregisterConsumer(final String group) {
this.consumerTable.remove(group);
// 取消注册
this.unregisterClient(null, group);
}
private void unregisterClient(final String producerGroup, final String consumerGroup) {
// 获取所有的Broker
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
// 进行遍历
while (it.hasNext()) {
// ...
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
String addr = entry1.getValue();
if (addr != null) {
try {
// 发送取消注册请求
this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, clientConfig.getMqClientApiTimeout());
// ...
} // ...
}
}
}
}
}
}
取消注册请求的发送是在MQClientAPIImpl
中unregisterClient
方法实现的,可以看到构建了UNREGISTER_CLIENT
请求并发送:
public class MQClientAPIImpl {
public void unregisterClient(final String addr, final String clientID, final String producerGroup, final String consumerGroup, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
// ...
requestHeader.setConsumerGroup(consumerGroup);
// 构建UNREGISTER_CLIENT请求
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
// 发送请求
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
// ...
}
}
与注册消费者的请求处理一样,Broker对UNREGISTER_CLIENT
的请求同样是在ClientManageProcessor
的processRequest
中处理的,对于UNREGISTER_CLIENT
请求是调用unregisterClient方法处理的,里面又调用了ConsumerManager
的unregisterConsumer
方法进行取消注册:
public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.HEART_BEAT: // 处理心跳请求
return this.heartBeat(ctx, request);
case RequestCode.UNREGISTER_CLIENT: // 取消注册请求
return this.unregisterClient(ctx, request);
case RequestCode.CHECK_CLIENT_CONFIG:
return this.checkClientConfig(ctx, request);
default:
break;
}
return null;
}
public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
// ...
{
final String group = requestHeader.getConsumerGroup();
if (group != null) {
// ...
// 取消消费者的注册
this.brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable);
}
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
}
在ConsumerManager
的unregisterConsumer
方法中,可以看到触发了取消注册事件,之后如果开启了允许通知变更,会触发变更事件,变更事件在上面已经讲解过,它会通知消费者组下的所有消费者进行一次负载均衡:
public class ConsumerManager {
public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
boolean isNotifyConsumerIdsChangedEnable) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null != consumerGroupInfo) {
consumerGroupInfo.unregisterChannel(clientChannelInfo);
if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(group);
if (remove != null) {
// 触发取消注册事件
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
}
}
// 触发消费者变更事件
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
}
}
消费者定时触发
在RebalanceService
的run方法中,可以看到设置了等待时间,默认是20s,所以消费者本身也会定时执行负载均衡:
public class RebalanceService extends ServiceThread {
private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval); // 等待
// 负载均衡
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
总结
参考
田守枝-深入理解RocketMQ Rebalance机制
RocketMQ版本:4.9.3