【RocketMQ】负载均衡源码分析

news2024/12/28 3:24:39

RocketMQ在集群模式下,同一个消费组内,一个消息队列同一时间只能分配给组内的某一个消费者,也就是一条消息只能被组内的一个消费者进行消费,为了合理的对消息队列进行分配,于是就有了负载均衡。

img

接下来以集群模式下的消息推模式DefaultMQPushConsumerImpl为例,看一下负载均衡的过程。

消费者负载均衡

首先,消费者在启动时会做如下操作:

  1. 从NameServer更新当前消费者订阅主题的路由信息;
  2. 向Broker发送心跳,注册消费者;
  3. 唤醒负载均衡服务,触发一次负载均衡;
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    public synchronized void start() throws MQClientException {
        // ...
        // 更新当前消费者订阅主题的路由信息
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        // 向Broker发送心跳
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        // 唤醒负载均衡服务
        this.mQClientFactory.rebalanceImmediately();
    }
}

更新主题路由信息

为了保证消费者拿到的主题路由信息是最新的(topic下有几个消息队列、消息队列的分布信息等),在进行负载均衡之前首先要更新主题的路由信息,在updateTopicSubscribeInfoWhenSubscriptionChanged方法中可以看到,首先获取了当前消费者订阅的所有主题信息(一个消费者可以订阅多个主题),然后进行遍历,向NameServer发送请求,更新每一个主题的路由信息,保证路由信息是最新的:

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
        // 获取当前消费者订阅的主题信息
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            // 遍历订阅的主题信息
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                // 从NameServer更新主题的路由信息
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            }
        }
    }
}

注册消费者

发送心跳

由于Broker需要感知消费者数量的增减,所以每个消费者在启动的时候,会调用sendHeartbeatToAllBrokerWithLock向Broker发送心跳包,进行消费者注册:

public class MQClientInstance {
    public void sendHeartbeatToAllBrokerWithLock() {
        if (this.lockHeartbeat.tryLock()) {
            try {
                // 调用sendHeartbeatToAllBroker向Broker发送心跳
                this.sendHeartbeatToAllBroker();
                this.uploadFilterClassSource();
            } catch (final Exception e) {
                log.error("sendHeartbeatToAllBroker exception", e);
            } finally {
                this.lockHeartbeat.unlock();
            }
        } else {
            log.warn("lock heartBeat, but failed. [{}]", this.clientId);
        }
    }
}

sendHeartbeatToAllBroker方法中,可以看到从brokerAddrTable中获取了所有的Broker进行遍历(主从模式下也会向从节点发送请求注册),调用MQClientAPIImplsendHearbeat方法向每一个Broker发送心跳请求进行注册:

public class MQClientInstance {
    // Broker路由表
    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
        new ConcurrentHashMap<String, HashMap<Long, String>>();
    // 发送心跳
    private void sendHeartbeatToAllBroker() {
        final HeartbeatData heartbeatData = this.prepareHeartbeatData();
        // ...
        if (!this.brokerAddrTable.isEmpty()) {
            long times = this.sendHeartbeatTimesTotal.getAndIncrement();
            // 获取所有的Broker进行遍历, key为 Broker Name, value为同一个name下的所有Broker实例(主从模式下Broker的name一致)
            Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, HashMap<Long, String>> entry = it.next();
                String brokerName = entry.getKey(); // broker name
                // 获取同一个Broker Name下的所有Broker实例
                HashMap<Long, String> oneTable = entry.getValue();
                if (oneTable != null) {
                    // 遍历所有的实例
                    for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                        Long id = entry1.getKey();
                        String addr = entry1.getValue();
                        if (addr != null) { // 如果地址不为空
                            // ...
                            try {
                                // 发送心跳
                                int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
                                // ...
                            } catch (Exception e) {
                                // ...
                            }
                        }
                    }
                }
            }
        }
    }
}

MQClientAPIImplsendHearbeat方法中,可以看到构建了HEART_BEAT请求,然后向Broker发送:

public class MQClientAPIImpl {
   public int sendHearbeat(final String addr, final HeartbeatData heartbeatData, final long timeoutMillis
    ) throws RemotingException, MQBrokerException, InterruptedException {
        // 创建HEART_BEAT请求
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
        request.setLanguage(clientConfig.getLanguage());
        request.setBody(heartbeatData.encode());
        // 发送请求
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        // ...
    }
}

心跳请求处理

Broker在启动时注册了HEART_BEAT请求的处理器,可以看到请求处理器是ClientManageProcessor

public class BrokerController {
    public void registerProcessor() {
        ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
        // 注册HEART_BEAT请求的处理器ClientManageProcessor
        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    }
}

进入到ClientManageProcessorprocessRequest方法,如果请求是HEART_BEAT类型会调用heartBeat方法进行处理,这里也能看还有UNREGISTER_CLIENT类型的请求,从名字上可以看出是与取消注册有关的(这个稍后再说):

public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.HEART_BEAT: // 处理心跳请求
                return this.heartBeat(ctx, request);
            case RequestCode.UNREGISTER_CLIENT: // 取消注册请求
                return this.unregisterClient(ctx, request);
            case RequestCode.CHECK_CLIENT_CONFIG:
                return this.checkClientConfig(ctx, request);
            default:
                break;
        }
        return null;
    }
}

进入到heartBeat方法,可以看到,调用了ConsumerManagerregisterConsumer注册消费者:

public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
        // ...
        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
            // ...
            // 注册Consumer
            boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(),
                data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable);
            // ...
        }
        // ...
        return response;
    }
}

进行注册

ConsumerManagerregisterConsumer方法的理逻辑如下:

  1. 根据组名称获取该消费者组的信息ConsumerGroupInfo对象。如果获取为空,会创建一个ConsumerGroupInfo,记录了消费者组的相关信息;
  2. 判断消费者是否发生了变更,如果如果发生了变化,会触发CHANGE变更事件(这个稍后再看);
  3. 触发REGISTER注册事件;
public class ConsumerManager {
    public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
        // 根据组名称获取消费者组信息
        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
        if (null == consumerGroupInfo) { // 如果为空新增ConsumerGroupInfo对象
            ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
            ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
            consumerGroupInfo = prev != null ? prev : tmp;
        }
        boolean r1 =
            consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                consumeFromWhere);
        boolean r2 = consumerGroupInfo.updateSubscription(subList);
        // 如果有变更
        if (r1 || r2) {
            if (isNotifyConsumerIdsChangedEnable) {
                // 通知变更
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
            }
        }
        // 注册Consumer
        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
        return r1 || r2;
    }
}

进入到DefaultConsumerIdsChangeListenerhandle方法中,可以看到如果是REGISTER事件,会通过ConsumerFilterManagerregister方法进行注册,注册的详细过程这里先不展开讲解:

public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
    @Override
    public void handle(ConsumerGroupEvent event, String group, Object... args) {
        if (event == null) {
            return;
        }
        switch (event) {
            case CHANGE:// 如果是消费者变更事件
                // ...
                break;
            case UNREGISTER: // 如果是取消注册事件
                this.brokerController.getConsumerFilterManager().unRegister(group);
                break;
            case REGISTER: // 如果是注册事件
                if (args == null || args.length < 1) {
                    return;
                }
                Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
                // 进行注册
                this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
                break;
            default:
                throw new RuntimeException("Unknown event " + event);
        }
    }
}

负载均衡

经过以上步骤之后,会调用MQClientInstance的rebalanceImmediately唤醒负载均衡服务进行一次负载均衡,为消费者分配消息队列,需要注意的是负载均衡是由消费者端执行

// MQClientInstance
public class MQClientInstance {
    private final RebalanceService rebalanceService;

    public void rebalanceImmediately() {
        // 唤醒负载均衡服务
        this.rebalanceService.wakeup();
    }
}

// RebalanceService
public class RebalanceService extends ServiceThread {
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            // 负载均衡
            this.mqClientFactory.doRebalance();
        }
        log.info(this.getServiceName() + " service end");
    }
}

负载均衡的过程在【RocketMQ】消息的拉取一文中已经讲解,这里挑一些重点内容看一下。

在负载均衡的时候,首先会获取当前消费者订阅的主题信息,对订阅的主题进行遍历,对每一个主题进行负载均衡,重新分配:

public abstract class RebalanceImpl {
    public void doRebalance(final boolean isOrder) {
        // 获取订阅的主题信息
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            // 遍历所有订阅的主题
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    // 根据主题进行负载均衡
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }
        this.truncateMessageQueueNotMyTopic();
    }
}

根据主题进行负载均衡

rebalanceByTopic方法中根据消费模式进行了判断然后对主题进行负载均衡,这里我们关注集群模式下的负载均衡:

  1. topicSubscribeInfoTable中根据主题获取对应的消息队列集合,这一步可以得到主题下的所有消息队列信息

  2. 根据主题信息和消费者组名称,获取所有订阅了该主题的消费者ID集合,这一步得到了订阅该主题的所有消费者

  3. 如果主题对应的消息队列集合和消费者ID都不为空,对消息队列集合和消费ID集合进行排序,排序是为了接下来进行分配;

  4. 获取设置的分配策略,根据分配策略,为消费者分配对应的消费队列,以平均分配策略为例,它会根据消息队列的数量和消费者的个数计算每个消费者分配的队列个数:

    img

  5. 根据最新分配的消息队列信息,调用updateProcessQueueTableInRebalance更新当前消费者消费的处理队列ProcessQueue信息

public abstract class RebalanceImpl {

    // 根据主题进行负载均衡
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: { // 广播模式
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // ... 
                break;
            }
            case CLUSTERING: { // 集群模式
                // 根据主题获取订阅的消息队列
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // 获取所有订阅了该主题的消费者id
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                // ...
                if (mqSet != null && cidAll != null) { // 如果都不为空
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
                    // 对消息队列排序
                    Collections.sort(mqAll);
                    // 对消费者排序
                    Collections.sort(cidAll);
                    // 获取分配策略
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                    List<MessageQueue> allocateResult = null;
                    try {
                        // 根据分配策略,为消费者分配消费队列
                        allocateResult = strategy.allocate(
                            this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
                    } catch (Throwable e) {
                       // ...
                    }
                    // 分配给当前消费的消费队列
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        // 将分配结果加入到结果集合中
                        allocateResultSet.addAll(allocateResult);
                    }
                    // 根据分配信息更新处理队列
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    // ...
                }
                break;
            }
            default:
                break;
        }
    }
}

更新处理队列

负载均衡之后,消费者负责的消息队列有可能发生变化,一个消息队列MessageQueue对应一个处理队列ProcessQueueprocessQueueTable记录了消费者负责的队列信息,此时需要对其进行更新,处理逻辑如下:

  1. processQueueTable进行遍历,处理每一个消息队列,这一步主要是判断重新分配之后,processQueueTable中记录的某些消息队列是否已经不再由当前消费者负责,如果是需要将消息队列置为dropped,表示删除,之后消费者不再从此消费队列中拉取消息;

  2. 判断是否有新分配给当前消费者的消息队列,如果某个消息队列在最新分配给当前消费者的消息队列集合mqSet中,但是不在processQueueTable中,

    中,进行以下处理:

    • 计算消息拉取偏移量,也就是从哪个位置开始消费,如果消息拉取偏移量大于0,创建ProcessQueue,并放入处理队列表中processQueueTable
    • 构建PullRequest,设置消息的拉取信息,并加入到拉取消息请求集合pullRequestList

    经过这一步,如果分配给当前消费者的消费队列不在processQueueTable中,就会构建拉取请求PullRequest,然后调用dispatchPullRequest处理消息拉取请求,之后会从该消息队列拉取消息,详细过程可参考【RocketMQ】消息的拉取。

public abstract class RebalanceImpl {
    // 处理队列表,KEY为消息队列,VALUE为对应的处理信息
    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
    // 负载均衡,topic表示当前要进行负载均衡的主题,mqSet中记录了重新分配给当前消费者的消息队列
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
        boolean changed = false;
        // 处理队列表
        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            // 获取消息队列
            MessageQueue mq = next.getKey();
            // 获取处理队列
            ProcessQueue pq = next.getValue();
            // 主题是否一致
            if (mq.getTopic().equals(topic)) {
                // 如果队列集合中不包含当前的队列
                if (!mqSet.contains(mq)) {
                    // 设置为dropped
                    pq.setDropped(true);
                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                        it.remove();
                        changed = true;
                        log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                    }
                } else if (pq.isPullExpired()) { // 是否过期
                    switch (this.consumeType()) {
                        case CONSUME_ACTIVELY:
                            break;
                        case CONSUME_PASSIVELY:
                            pq.setDropped(true); // 设置为删除
                            // ...
                            break;
                        default:
                            break;
                    }
                }
            }
        }
        // 创建拉取请求集合
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        // 遍历本次分配的消息队列集合
        for (MessageQueue mq : mqSet) {
            // 如果之前不在processQueueTable中
            if (!this.processQueueTable.containsKey(mq)) {
                // ...
                // 创建ProcessQueue
                ProcessQueue pq = new ProcessQueue();
                long nextOffset = -1L;
                try {
                    // 计算消息拉取偏移量
                    nextOffset = this.computePullFromWhereWithException(mq);
                } catch (Exception e) {
                    log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
                    continue;
                }
                // 如果偏移量大于等于0
                if (nextOffset >= 0) {
                    // 放入处理队列表中
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    // 如果之前已经存在,不需要进行处理
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        // 如果之前不存在,构建PullRequest,之后会加入到阻塞队列中,进行消息拉取
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);// 设置消费组
                        pullRequest.setNextOffset(nextOffset);// 设置拉取偏移量
                        pullRequest.setMessageQueue(mq);// 设置消息队列
                        pullRequest.setProcessQueue(pq);// 设置处理队列
                        pullRequestList.add(pullRequest);// 加入到拉取消息请求集合
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }
        // 添加消息拉取请求
        this.dispatchPullRequest(pullRequestList);
        return changed;
    }
}

Rebalance的触发

消费者启动时触发

在文章开头已经讲过,消费者在启动时会进行一次负载均衡,这里便不再赘述。

消费者变更时触发

在消费者注册时讲到,如果发现消费者有变更会触发变更事件,当处于以下两种情况之一时会被判断为消费者发生了变化,需要进行负载均衡:

  • 当前注册的消费者对应的Channel对象之前不存在;

  • 当前注册的消费者订阅的主题信息发生了变化,也就是消费者订阅的主题有新增或者删除;

public class ConsumerManager {
    
    /**
     *  注册消费者
     * @param group 消费者组名称
     * @param clientChannelInfo 注册的消费者对应的Channel信息
     * @param consumeType 消费类型
     * @param messageModel 
     * @param consumeFromWhere 消费消息的位置
     * @param subList 消费者订阅的主题信息
     * @param isNotifyConsumerIdsChangedEnable 是否通知变更
     * @return
     */
    public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
        // 根据组名称获取消费者组信息
        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
        if (null == consumerGroupInfo) { // 如果为空新增
            ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
            ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
            consumerGroupInfo = prev != null ? prev : tmp;
        }
        // 更新Channel
        boolean r1 =
            consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                consumeFromWhere);
        // 更新订阅信息
        boolean r2 = consumerGroupInfo.updateSubscription(subList);
        // 如果有变更
        if (r1 || r2) {
            if (isNotifyConsumerIdsChangedEnable) {
                // 通知变更,consumerGroupInfo中存储了该消费者组下的所有消费者的channel
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
            }
        }
        // 注册Consumer
        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
        return r1 || r2;
    }
}

Channel变更

updateChannel方法中,首先将变更状态updated初始化为false,然后根据消费者的channel从channelInfoTable路由表中获取对应的ClientChannelInfo对象:

  • 如果ClientChannelInfo对象获取为空,表示之前不存在该消费者的channel信息,将其加入到路由表中,变更状态置为true,表示消费者有变化;

  • 如果获取不为空,判断clientid是否一致,如果不一致更新为最新的channel信息,但是变更状态updated不发生变化;

也就是说,如果注册的消费者之前不存在,那么将变更状态置为true,表示消费者数量发生了变化。

 
// key为消费者对应的channle,value为chanel信息
   private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
        new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
   public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
        MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
        boolean updated = false; // 变更状态初始化为false
        this.consumeType = consumeType;
        this.messageModel = messageModel;
        this.consumeFromWhere = consumeFromWhere;
        // 从channelInfoTable中获取对应的Channel信息, 
        ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
        if (null == infoOld) { // 如果为空
            // 新增
            ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
            if (null == prev) { // 如果之前不存在
                log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
                    messageModel, infoNew.toString());
                // 变更状态置为true
                updated = true;
            }
            infoOld = infoNew;
        } else {
            // 如果之前存在,判断clientid是否一致,如果不一致更新为最新的channel
            if (!infoOld.getClientId().equals(infoNew.getClientId())) { 
                log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ", this.groupName, infoOld.toString(), infoNew.toString());
                this.channelInfoTable.put(infoNew.getChannel(), infoNew);
            }
        }
        this.lastUpdateTimestamp = System.currentTimeMillis();
        infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);
        return updated;
    }

主题信息订阅变更

updateSubscription方法中,主要判断了消费的主题订阅信息是否发生了变化,subscriptionTable中记录了之前记录的订阅信息:

  1. 判断是否有新增的主题订阅信息,主要是通过subscriptionTable是否存在某个主题进行判断的:
    • 如果不存在,表示之前没有订阅过某个主题的信息,将其加入到subscriptionTable中,并将变更状态置为true,表示主题订阅信息有变化;
    • 如果subscriptionTable中存在某个主题的订阅信息,表示之前就已订阅,将其更新为最新的,但是变更状态不发生变化;
  2. 判断是否有删除的主题,主要是通过subscriptionTable和subList的对比进行判断的,如果有删除的主题,将变更状态置为true;

如果消费者订阅的主题发生了变化,比如有新增加的主题或者删除了某个主题的订阅,会被判断为主题订阅信息发生了变化。

public class ConsumerGroupInfo {
    // 记录了订阅的主题信息,key为topic,value为订阅信息
    private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
        new ConcurrentHashMap<String, SubscriptionData>();   
    
    public boolean updateSubscription(final Set<SubscriptionData> subList) {
        boolean updated = false;
        // 遍历订阅的主题信息
        for (SubscriptionData sub : subList) {
            //根据主题获取订阅信息
            SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
            // 如果获取为空
            if (old == null) {
                // 加入到subscriptionTable
                SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
                if (null == prev) {
                    updated = true; // 变更状态置为true
                    log.info("subscription changed, add new topic, group: {} {}", this.groupName, sub.toString());
                }
            } else if (sub.getSubVersion() > old.getSubVersion()) { // 如果版本发生了变化
                if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                    log.info("subscription changed, group: {} OLD: {} NEW: {}", this.groupName, old.toString(), sub.toString());
                }
                // 更新为最新的订阅信息
                this.subscriptionTable.put(sub.getTopic(), sub);
            }
        }
        Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
        // 进行遍历,这一步主要是判断有没有取消订阅的主题
        while (it.hasNext()) {
            Entry<String, SubscriptionData> next = it.next();
            String oldTopic = next.getKey();
            boolean exist = false;
            // 遍历最新的订阅信息
            for (SubscriptionData sub : subList) {
                // 如果在旧的订阅信息中存在就终止,继续判断下一个主题
                if (sub.getTopic().equals(oldTopic)) {
                    exist = true;
                    break;
                }
            }
            // 走到这里,表示有取消订阅的主题
            if (!exist) {
                log.warn("subscription changed, group: {} remove topic {} {}",this.groupName, oldTopic, next.getValue().toString());
                // 进行删除
                it.remove();
                // 变更状态置为true
                updated = true;
            }
        }
        this.lastUpdateTimestamp = System.currentTimeMillis();
        return updated;
    }
}

变更请求发送

上面讲解了两种被判定为消费者发生变化的情况,被判定为变化之后,会触调用DefaultConsumerIdsChangeListener中的handle方法触发变更事件,在方法中传入了消费者组下的所有消费者的channel对象,会发送变更请求通知该消费者组下的所有消费者,进行负载均衡。

DefaultConsumerIdsChangeListener中处理变更事件时,会对消费组下的所有消费者遍历,调用notifyConsumerIdsChanged方法向每一个消费者发送变更请求:

public class ConsumerManager {
   
    public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
        // ...
        // 更新Channel
        boolean r1 =
            consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                consumeFromWhere);
        // 更新订阅信息
        boolean r2 = consumerGroupInfo.updateSubscription(subList);
        // 如果有变更
        if (r1 || r2) {
            if (isNotifyConsumerIdsChangedEnable) {
                // 触发变更事件,consumerGroupInfo中存储了该消费者组下的所有消费者的channel
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
            }
        }
        // ...
    }
}

// DefaultConsumerIdsChangeListener
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
    @Override
    public void handle(ConsumerGroupEvent event, String group, Object... args) {
        if (event == null) {
            return;
        }
        switch (event) {
            case CHANGE:// 如果是消费者变更事件
                case CHANGE:
                if (args == null || args.length < 1) {
                    return;
                }
                // 获取所有的消费者对应的channel
                List<Channel> channels = (List<Channel>) args[0];
                if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
                    for (Channel chl : channels) {
                        // 向每一个消费者发送变更请求
                        this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
                    }
                }
                break;
             // ...
        }
    }
}

请求的发送是在Broker2ClientnotifyConsumerIdsChanged方法中实现的,可以看到会创建NOTIFY_CONSUMER_IDS_CHANGED请求并发送:

    
public class Broker2Client {
    public void notifyConsumerIdsChanged(
        final Channel channel,
        final String consumerGroup) {
        if (null == consumerGroup) {
            log.error("notifyConsumerIdsChanged consumerGroup is null");
            return;
        }
        NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
        requestHeader.setConsumerGroup(consumerGroup);
        // 创建变更通知请求
        RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
        try {
            // 发送请求
            this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
        } catch (Exception e) {
            log.error("notifyConsumerIdsChanged exception. group={}, error={}", consumerGroup, e.toString());
        }
    }
}

变更通知请求处理

消费者对Broker发送的NOTIFY_CONSUMER_IDS_CHANGED的请求处理在ClientRemotingProcessorprocessRequest方法中,它会调用notifyConsumerIdsChanged方法进行处理,在notifyConsumerIdsChanged方法中可以看到触发了一次负载均衡:

public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.CHECK_TRANSACTION_STATE:
                return this.checkTransactionState(ctx, request);
            case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED: // NOTIFY_CONSUMER_IDS_CHANGED请求处理
                // 处理变更请求
                return this.notifyConsumerIdsChanged(ctx, request); 
            // ...
            default:
                break;
        }
        return null;
    }
    
    public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        try {
            // ...
            // 触发负载均衡
            this.mqClientFactory.rebalanceImmediately();
        } catch (Exception e) {
            log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e));
        }
        return null;
    }
}

消费者停止时触发

消费者在停止时,需要将当前消费者负责的消息队列分配给其他消费者进行消费,所以在shutdown方法中会调用MQClientInstanceunregisterConsumer方法取消注册:

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
   public synchronized void shutdown(long awaitTerminateMillis) {
        switch (this.serviceState) {
            case CREATE_JUST:
                break;
            case RUNNING:
                this.consumeMessageService.shutdown(awaitTerminateMillis);
                this.persistConsumerOffset();
                // 取消注册
                this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
                this.mQClientFactory.shutdown();
                // ...
                break;
            case SHUTDOWN_ALREADY:
                break;
            default:
                break;
        }
    }
}

unregisterConsumer方法中,又调用了unregisterClient方法取消注册,与注册消费者的逻辑相似,它会向所有的Broker发送取消注册的请求:

public class MQClientInstance {
    public synchronized void unregisterConsumer(final String group) {
        this.consumerTable.remove(group);
        // 取消注册
        this.unregisterClient(null, group);
    }
    
    private void unregisterClient(final String producerGroup, final String consumerGroup) {
        // 获取所有的Broker
        Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
        // 进行遍历
        while (it.hasNext()) {
            // ...
            if (oneTable != null) {
                for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                    String addr = entry1.getValue();
                    if (addr != null) {
                        try {
                            // 发送取消注册请求
                            this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, clientConfig.getMqClientApiTimeout());
                            // ...
                        } // ...
                    }
                }
            }
        }
    }
}

取消注册请求的发送是在MQClientAPIImplunregisterClient方法实现的,可以看到构建了UNREGISTER_CLIENT请求并发送:

public class MQClientAPIImpl { 
    public void unregisterClient(final String addr, final String clientID, final String producerGroup,  final String consumerGroup, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        // ...
        requestHeader.setConsumerGroup(consumerGroup);
        // 构建UNREGISTER_CLIENT请求
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
        // 发送请求
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        // ...
    }
}

与注册消费者的请求处理一样,Broker对UNREGISTER_CLIENT的请求同样是在ClientManageProcessorprocessRequest中处理的,对于UNREGISTER_CLIENT请求是调用unregisterClient方法处理的,里面又调用了ConsumerManagerunregisterConsumer方法进行取消注册:

public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.HEART_BEAT: // 处理心跳请求
                return this.heartBeat(ctx, request);
            case RequestCode.UNREGISTER_CLIENT: // 取消注册请求
                return this.unregisterClient(ctx, request);
            case RequestCode.CHECK_CLIENT_CONFIG:
                return this.checkClientConfig(ctx, request);
            default:
                break;
        }
        return null;
    }
    
    public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        // ...
        {
            final String group = requestHeader.getConsumerGroup();
            if (group != null) {
                // ...
                // 取消消费者的注册
                this.brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable);
            }
        }
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
}

ConsumerManagerunregisterConsumer方法中,可以看到触发了取消注册事件,之后如果开启了允许通知变更,会触发变更事件,变更事件在上面已经讲解过,它会通知消费者组下的所有消费者进行一次负载均衡:


public class ConsumerManager {    
    public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        boolean isNotifyConsumerIdsChangedEnable) {
        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
        if (null != consumerGroupInfo) {
            consumerGroupInfo.unregisterChannel(clientChannelInfo);
            if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
                ConsumerGroupInfo remove = this.consumerTable.remove(group);
                if (remove != null) {
                    // 触发取消注册事件
                    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
                }
            }
            // 触发消费者变更事件
            if (isNotifyConsumerIdsChangedEnable) {
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
            }
        }
    }
}

消费者定时触发

RebalanceService的run方法中,可以看到设置了等待时间,默认是20s,所以消费者本身也会定时执行负载均衡:

public class RebalanceService extends ServiceThread {
    private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
  
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval); // 等待
            // 负载均衡
            this.mqClientFactory.doRebalance();
        }
        log.info(this.getServiceName() + " service end");
    }
}

总结

参考

田守枝-深入理解RocketMQ Rebalance机制

RocketMQ版本:4.9.3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/417141.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

智优ERP的升级版智优E3_ERP,可以自定义列,和自定义打印公司logo

新版的智优E3_ERP系统&#xff0c;新增了许多供自定义的列。 系统能够解决的企业管理问题&#xff1a; 一、日常的出入库管理、收付款管理、往来对账、移动加权平均成本核算、以及相关数据的查询分析&#xff1b; 二、订单的跟单管理&#xff08;包括销售跟单、采购跟单、生产…

ElasticSearch常用查询操作

ES查询 一般我们使用ES最多的就是查询&#xff0c;今天就讲一下ES的查询。这里我是建了一个person的索引。 "person" : {"aliases" : { },"mappings" : {"properties" : {"address" : {"type" : "text"…

[LeetCode周赛复盘] 第 102 场双周赛20230415

[LeetCode周赛复盘] 第 102 场双周赛20230415 一、本周周赛总结二、 6333. 查询网格图中每一列的宽度1. 题目描述2. 思路分析3. 代码实现三、6334. 一个数组所有前缀的分数1. 题目描述2. 思路分析3. 代码实现四、6335. 二叉树的堂兄弟节点 II1. 题目描述2. 思路分析3. 代码实现…

English Learning - L2 第 15 次小组纠音 助动词弱读和重音节奏 2023.4.15 周六

English Learning - L2 第 15 次小组纠音 助动词弱读和重音节奏 2023.4.15 周六共性问题have has /hv/ /hz/ 弱读成 /həv/ /həz/fine left /faɪn/ /left/late changed train /leɪt/ /ʧeɪnʤd/ /treɪn/ 中的 eɪmoment problem time /ˈməʊmənt/ /ˈprɒbləm/ /taɪm…

4.10~4.11学习总结

ER图的学习&#xff1a; 学习了ER图相关知识&#xff0c;并绘制了项目大概的ER图 详细笔记博客&#xff1a;http://t.csdn.cn/YOJxq MySQL的学习&#xff1a; 函数 学习了字符串函数&#xff0c;数值函数&#xff0c;日期函数&#xff0c;流程函数。 约束 作用于表中字段的规则…

改善Instagram客户服务的6个技巧

Instagram仍然是全球前四大社交网络&#xff0c;按用户数量排名。它通过其创新的过滤器、内容创建工具、视频和卷轴选项继续增长并推动流量。这是一个平台&#xff0c;世界顶级名人和有影响力的人可以为全球用户提供有趣和令人印象深刻的内容。 但不仅仅是一个娱乐平台&#xf…

Nestjs实战干货-概况-异常过滤器-Exception filters

异常过滤器 Nest 带有一个内置的异常层&#xff0c;负责处理应用程序中所有未处理的异常。当应用程序代码未处理异常时&#xff0c;该层会捕获该异常&#xff0c;然后自动发送适当的用户友好响应。 开箱即用&#xff0c;此操作由内置的全局异常过滤器执行&#xff0c;该过滤器…

三、Locust任务(task)详解

当一个负载测试开始时&#xff0c;将为每个模拟用户创建一个用户类的实例&#xff0c;他们将在自己的绿色线程中开始运行。当这些用户运行时&#xff0c;他们会选择执行的任务&#xff0c;睡眠一段时间&#xff0c;然后选择一个新的任务&#xff0c;如此循环。 这些任务是正常…

二、Java 并发编程(4)

本章概要 Java 中的锁 乐观锁悲观锁自旋锁synchronizedReentrantLocksynchronized 与 ReentrantLock 对比SemaphoreAtomicInteger可重入锁公平锁和非公平锁读写锁共享锁和独占锁重量级锁和轻量级锁偏向锁分段锁同步锁和死锁如何进行锁优化 2.6 Java 中的锁 Java 中的锁主要…

【C语言进阶:动态内存管理】C/C++中程序内存区域的划分

⚡C/C中程序内存区域的划分 C/C程序内存分配的几个区域&#xff1a; 栈区&#xff08;stack&#xff09;&#xff1a;在执行函数时&#xff0c;函数内局部变量的存储单元都可以在栈上创建&#xff0c;函数执行结 束时这些存储单元自动被释放。栈内存分配运算内置于处理器的指…

day8 互斥锁/读写锁的概念及使用、死锁的避免

目录 互斥锁的概念和使用 线程通信 - 互斥 互斥锁的创建和销毁 互斥锁的创建 互斥锁的销毁 互斥锁的使用 申请锁 释放锁 互斥锁的概念和使用 线程通信 - 互斥 临界资源&#xff1a; 一次只允许一个任务&#xff08;进程、线程&#xff09;访问的共享资源&#xff1b…

Maven-依赖管理

一. 依赖管理 1. maven-依赖管理-依赖配置 依赖&#xff1a;指当前项目运行所需要的jar包。一个项目中可以引入多个依赖&#xff1a; 例如&#xff1a;在当前工程中&#xff0c;我们需要用到logback来记录日志&#xff0c;此时就可以在maven工程的pom.xml文件中&#xff0c…

Python让ChatGPT全自动改写生成文章教程

ChatGPT是一个在自然语言处理领域非常先进的文本生成模型&#xff0c;它能够产生高质量、连贯的文章。它受到了广泛的关注&#xff0c;因为它可以自动生成大量的文本&#xff0c;从而减轻了人工写作的负担。怎么使用chatgpt批量改写文章&#xff1f;最简单的方式就是找到一家接…

I.MX6U开发板使用OTG烧写系统

1.系统烧写 在实际的产品开发中肯定不可能通过网络来运行&#xff0c;否则没网的时候产品岂不 是就歇菜了。因此我们需要将 uboot、linux kernel、.dtb(设备树)和 rootfs 这四个文件烧写到板子 上的 EMMC、NAND 或 QSPI Flash 等其他存储设备上&#xff0c;这样不管有没有网络我…

SpringCloud 使用sentinel

一、添加依赖 <dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> </dependency> 二、配置文件配置地址 spring:cloud:sentinel:transport:dashboard: localhost:8080三…

机器学习:基于逻辑回归对优惠券使用情况预测分析

机器学习&#xff1a;基于逻辑回归对优惠券使用情况预测分析 作者&#xff1a;i阿极 作者简介&#xff1a;Python领域新星作者、多项比赛获奖者&#xff1a;博主个人首页 &#x1f60a;&#x1f60a;&#x1f60a;如果觉得文章不错或能帮助到你学习&#xff0c;可以点赞&#x…

Elasticsearch:集群管理的一些建议

在之前的文章 “Elasticsearch&#xff1a;集群管理” &#xff0c;我们对集群管理做了一些介绍。在今天的文章中&#xff0c;我们接着来聊一下有关配置的方面的问题。这在很大程度上取决于你的用例&#xff0c;是索引还是搜索繁重。 我们将在这里讨论在集群设置方面我们需要关…

中国算力的想象力有多大?|产业特稿

巨头入场和“东数西算”的助推&#xff0c;让中国离这个万亿级算力蓝海更近了一步。 作者|思杭 编辑|皮爷 出品|产业家 2023年初&#xff0c;在青岛、济南、日照等12座城市&#xff0c;一座座崭新的大型数据中心拔地而起。 其中&#xff0c;最引人瞩目的属2月23日&#xff…

文件上传漏洞 --- php邂逅windows通用上传缺陷

目录 后端源码 前端源码 后端代码审计 方式一绕过原理 --- 冒号加特性 验证及结果 方式二绕过原理 --- 数据流 验证及结果 环境需求 php5.2.17IIS环境&#xff0c;可以下载phpstuday2018来满足环境的要求。 后端源码 <?php //U-Mail demo ... if(isset($_POST[sub…

【AI能否取代设计师】「Stable Diffusion」AI绘画黑科技将告诉你答案

上一篇文章&#xff1a;【AI绘画】我以Midjourney为主学习AI绘画效果咋样&#xff1f;_山楂山楂丸的博客-CSDN博客 目录 前言 一、「Stable Diffusion」 是什么 二、「Stable Diffusion」上手演练 三、竟然还有ChatGPT&#xff1f; 四、「Stable Diffusion」作品展示 五、…