RocketMQ 5.1 版本 NameServer 路由管理

news2024/9/21 10:54:30

文章目录

  • 1. 路由管理核心组件介绍
  • 2. RouteInfoManager 路由表
  • 3. 路由管理
    • 3.1 注册 Broker
    • 3.2 注销 Broker
    • 3.3 拼凑 TopicRouteData

此文章基于 RocketMQ 5.1 版本进行分析,与 4.x 版本相比此文章分析的部分源码有很大的区别

1. 路由管理核心组件介绍

路由管理是指维护 Broker、Topic、Queue 和 Consumer Group 之间的对应关系,以及提供给 Producer 和 Consumer 获取这些关系的服务。路由管理涉及到以下几个核心组件:

  • NameServerController:NameServer 的控制器类,负责初始化、启动和关闭 NameServer 的各个组件。
  • RouteInfoManager:NameServer 的核心组件之一,负责维护 Broker、Topic、Queue 和 Consumer Group 的路由信息,以及提供查询、注册和删除的服务。
  • BrokerController:Broker 的控制器类,负责初始化、启动和关闭 Broker 的各个组件。
  • BrokerOuterAPI:Broker 的核心组件之一,负责与 NameServer 通信,定时向 NameServer 注册自身信息,并获取其他 Broker 的信息。
  • MQClientInstance:Producer 和 Consumer 的内部实现类,负责管理 Producer 和 Consumer 的信息,并与 NameServer 和 Broker 通信。
  • MQClientAPIImpl:MQClientInstance 的核心组件之一,负责封装与 NameServer 和 Broker 的通信协议,并执行相应的请求。

本文仅围绕 RouteInfoManager 对 RocketMQ 5.1 版本路由管理进行分析

2. RouteInfoManager 路由表

namesrv的路由管理主要由 RouteInfoManager 对象负责,该对象维护了一个内存中的路由表,包括以下几个部分:

  • topicQueueTable:一个 Map 结构,key是 topic 名称,value 是一个 Map 结构,key 是 brokerName,value 是 QueueData 对象

    topicQueueTable 中的 broker 是一个 broker 组,包含 Master 和 Slave 两部分

     private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
    

    QueueData 类的具体属性如下表所示

    属性名含义
    brokerName存储该队列的 broker 的名称
    readQueueNums读队列的数量,即该队列下有多少个读队列,这个值在创建队列时由用户指定,通常是4
    writeQueueNums写队列的数量,即该队列下有多少个写队列,这个值在创建队列时由用户指定,通常是4
    perm队列的权限,是一个二进制数,如6代表可读写,4代表只可读,2代表只可写,0代表没有权限
    topicSysFlag系统标志
  • brokerAddrTable:一个 Map 结构,key 是 brokerName,value 是 BrokerData 对象

    brokerAddrTable 中的 broker 是一个 broker 组,包含 Master 和 Slave 两部分

     private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
    

    BrokerData 类的具体属性如下表所示

    属性名含义
    cluster当前 broker 所在的集群名称
    brokerName当前 broker 的名称
    brokerAddrs当前 broker 的地址,是一个 Map,key 是 brokerId,value 是单个 broker 实例(一个 Master broker 或一个 Slave broker)的地址
    zoneName当前 broker 所在的 zone
    enableActingMaster当前 broker 是否允许 slave 切换为 master,这个属性主要用于兼容老版本的 HA (High Availability)

    其中 enableActingMaster 属性在 RIP 32 Slave Acting Master Mode 中被添加,表示是否启用 slave acting master 模式,用于旧版本的HA适配。HA是高可用的缩写,指的是RocketMQ的主从架构。旧版本的HA指的是没有 DLedgerController 的主从架构,这种架构下,slave 不能提供消息的发送和消费,也不能执行一些 master 才能执行的操作。如果启用了 slave acting master 模式,当 master 出现故障时,slave可以承担一些 master 的任务,比如读取消息、扫描和转发特殊消息、反向同步元数据等。这样可以提高可用性和容错性。

  • brokerLiveTable:一个 Map 结构,key 是 BrokerAddrInfo 对象,value 是 BrokerLiveInfo 对象

    brokerLiveTable 中的 broker 是一个 broker 实例,如一个 Master broker 或 一个 Slave broker 实例

     private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    

    BrokerAddrInfo 类的具体属性如下表所示

    属性名含义
    clusterName当前 broker 实例所在的集群名称
    brokerAddr当前 broker 实例的地址

    BrokerLiveInfo 类的具体属性如下表所示

    属性名含义
    lastUpdateTimestampbroker 最后一次向 NameServer 发送心跳的时间戳。这个用来检查 broker 是否过期或者不在线
    heartbeatTimeoutMillisbroker 的心跳超时时间。如果 broker 在这个时间内没有发送心跳,就会被 NameServer 认为是离线的
    dataVersionbroker 的 topic 配置的数据版本。这个用来在 broker 注册或者重新注册时,比较和更新 NameServer 中的 topic 信息
    channelbroker 和 NameServer 之间的通道。这个用来进行通信和发送请求或者响应
    haServerAddrbroker 的高可用服务器的地址。这个用来让其他的 broker 或者消费者连接到 broker 进行数据复制或者消费
  • clusterAddrTable:一个 Map 结构,key 是 broker 所在的集群的名称,value 是 broker 名称的集合

    clusterAddrTable 中的 broker 是一个 broker 系统

    private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    
  • filterServerTable:一个 Map 结构,key 是 BrokerAddrInfo 对象,value 是过滤器服务器地址列表。过滤器服务器用于支持消息过滤功能

    filterServerTable 中的 broker 是一个 broker 实例

    private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    
  • topicQueueMappingInfoTable:存储每个 topic 在每个 broker 上的queue信息,key为topic,value为一个map,key为brokerName,value为TopicQueueMappingInfo

    private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
    

    TopicQueueMappingInfo 类的具体属性如下表所示

    属性名含义
    topictopic名称,用于标识topic
    scopetopic的范围,全局或局部,用于区分不同的topic空间
    totalQueues总队列数,用于指定topic的分区数量
    bname所属的broker名称,用于识别托管该topic的broker
    epoch用于防止旧的脏数据,每次更新时递增
    dirty是否是脏数据,用于标记该topic是否需要被删除或更新
    currIdMap逻辑ID和物理ID的映射关系,用于将topic的逻辑队列映射到物理队列

以一个 cluster 为例,上述部分表的内容如下图所示

在这里插入图片描述

3. 路由管理

RouteInfoManager 提供了一系列的方法来更新和查询路由表,比如:

  • registerBroker:用于处理broker注册请求,向路由表中添加或更新broker数据、队列数据和过滤器服务器数据,并返回所有主题的队列数据给broker。
  • unregisterBroker:用于处理broker注销请求,从路由表中删除指定集群、broker名称和broker地址对应的数据。
  • scanNotActiveBroker:用于定时扫描并删除不活跃的broker数据,判断标准是上次更新时间超过2分钟。
  • getAllClusterInfo:用于获取所有集群信息,返回clusterAddrTable的字符串形式。
  • getTopicRouteInfo:用于获取指定主题的路由信息,返回包含broker数据和队列数据的TopicRouteData对象。如果开启了区域模式,还会根据区域名称过滤掉不属于该区域的broker数据和队列数据。
  • getSystemTopicList:用于获取系统内置主题列表,比如TBW102、OFFSET_MOVED_EVENT等。
  • getUnitTopics:用于获取单元化部署的主题列表,即包含“%”符号的主题。
  • getHasUnitSubTopicList:用于获取存在单元化订阅组的主题列表,即订阅组名称包含“%”符号的主题。
  • getHasUnitSubUnUnitTopicList:用于获取存在单元化订阅组但没有单元化部署的主题列表,即订阅组名称包含“%”符号但主题名称不包含“%”符号的主题。

3.1 注册 Broker

注册 Broker 的代码为 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker,注册的 broker 是一个 broker 实例而非 broker 组

public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final String zoneName,
    final Long timeoutMillis,
    final Boolean enableActingMaster,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    
    // ...
}

分为几个步骤

  1. 创建一个 RegisterBrokerResult 对象,用于返回注册结果

    RegisterBrokerResult result = new RegisterBrokerResult();
    
  2. 获取写锁,防止并发修改路由信息

    try {
        this.lock.writeLock().lockInterruptibly();
        // ...
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    } finally {
        this.lock.writeLock().unlock();
    }
    
  3. 更新 cluster 信息,向 clusterAddrTable 中添加 brokerName

    此步骤检查 cluster 是否被注册,即 clusterAddrTable 中是否存在 key 等于 clusterName

    • 如果 clusterAddrTable 中不存在 clusterName,则创建一个新的 Set 并将其作为 value,并赋值给 brokerNames

    • 如果 clusterAddrTable 中存在 clusterName,则直接返回该 value,并赋值给 brokerNames

    把当前 broker 的名称 brokerName 加入到集群对应的 broker 名称集合 brokerNames

    Set<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());
    brokerNames.add(brokerName);
    
  4. 更新 broker 信息,向 brokerAddrTable 添加 BrokerData

    此步骤检查 broker 组是否存在,即 brokerAddrTable 中是否存在 key 等于 brokerName

    • 如果 brokerAddrTable 中不存在 brokerName,则根据传入的参数创建一个新的 BrokerData 并添加进 brokerAddrTable

    • 如果 brokerAddrTable 中存在 brokerName,则检查存在的 broker 组是否是老版本的 HA 架构

      • 如果是老版本则设置 enableActingMaster 属性为 false
      • 否则设置 enableActingMaster 属性为 true。

      enableActingMaster 属性在上述 BrokerData 属性中已经进行了详细介绍

    最后设置/更新 broker 的 zoneName

    boolean registerFirst = false;
    
    // 检查 broker 组是否存在
    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
    if (null == brokerData) { // broker 组不存在
        registerFirst = true;
        // 创建 BrokerData 对象,HashMap 中的 key 为 brokerId,value 为 brokerAddr
        brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
        this.brokerAddrTable.put(brokerName, brokerData);
    }
    
    // 检查 broker 是否是老版本
    // 如果是老版本,则 enableActingMaster 为 null,则将 brokerData.enableActingMaster 值设置为 false
    // 如果不是老版本,则将 brokerData.enableActingMaster 的值设为 enableActingMaster 的值
    boolean isOldVersionBroker = enableActingMaster == null;
    brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);
    brokerData.setZoneName(zoneName);
    
  5. 更新 broker 地址信息,向 brokerAddrTable 的值 BrokerDatabrokerAddrsMap 中移除可能冲突的 broker 地址,再把当前 broker 信息更新到 brokerAddrsMap

    此步骤有以下几部分

    • 如果 brokerAddrsMap 不为空则将 prevMinBrokerId 设为 brokerAddrsMap 中最小的 brokerId 值,否则设为 0

    • 检查正在注册的 brokerId 是否小于 brokerAddrsMap 中的最小 brokerId。如果是,它将 isMinBrokerIdChanged 设置为 true

    • brokerAddrsMap 中移除所有 brokerAddr 相等但是 brokerId 不相等的 broker。如果 brokerAddr 相等,但 brokerId 不相等,可能是由于从主切换重新注册,因此需要先移除旧的 broker

    • 将当前 broker 信息更新到 brokerAddrsMap

    Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
    
    // 添加 brokerId 和 brokerAddr 的映射关系
    boolean isMinBrokerIdChanged = false;
    long prevMinBrokerId = 0;
    // 如果 brokerAddrsMap 不为空,则获取 brokerAddrsMap 中最小的 brokerId
    if (!brokerAddrsMap.isEmpty()) {
        prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
    }
    // 如果传入的 brokerId 小于 brokerAddrsMap 中最小的 brokerId,则将 isMinBrokerIdChanged 设置为 true
    if (brokerId < prevMinBrokerId) {
        isMinBrokerIdChanged = true;
    }
    
    //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
    //The same IP:PORT must only have one record in brokerAddrTable
    // 从 brokerAddrsMap 中移除所有 brokerAddr 相等但是 brokerId 不相等的 broker,避免在从节点切换到主节点或相反操作时,在映射中有相同IP:PORT的重复记录
    brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());
    
    // 6.两种情况直接返回 代码省略...
    
    // 更新 brokerAddrsMap 中 brokerId 和 brokerAddr 的映射关系
    String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
    registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));
    
  6. 两种情况直接返回

    • 一个 broker 实例只能注册一次,如果 brokerId 重复,说明 brokerAddr 也重复,此时需要更新 brokerAddr

      如果现有 broker 的状态版本高于正在注册的 broker 的状态版本,说明现有的 broker 是更新的,不应该被覆盖,因此返回不注册/更新 broker 的结果,并将正在注册的 broker 信息从 brokerLiveTable 中移除

    • 如果 broker 之前没有注册过,并且只有一个topic配置,此时返回null,表示注册失败,因为 broker 不允许只有一个 topic 配置就注册

    //If Local brokerId stateVersion bigger than the registering one,
    String oldBrokerAddr = brokerAddrsMap.get(brokerId);
    
    /*
      该代码检查正在注册的 broker 是否与现有 broker 冲突。
      如果现有 broker 的状态版本高于正在注册的 broker 的状态版本,说明现有的 broker 是更新的,不应该被覆盖。
      此时记录警告,并且不会注册 broker,且正在注册的 broker 将从 brokerLiveTable 中删除。
      这可确保仅使用最新的 broker 信息。
     */
    if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {
        BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));
    
        if (null != oldBrokerInfo) {
            long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();
            long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();
            if (oldStateVersion > newStateVersion) {
                log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
                        "Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",
                    clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
                //Remove the rejected brokerAddr from brokerLiveTable.
                brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));
                return result;
            }
        }
    }
    
    /*
      此代码检查正在注册的代理是否已在 brokerAddrsMap 中,以及 topicConfigWrapper 是否只有一个 topic 配置。
      如果 broker 不在 Map 中,并且只有一个 topic 配置,那么将记录一条警告,指出由于尚未注册 broker,因此无法注册主题配置包装器。
      然后,该方法返回 null,指示注册不成功。
     */
    if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {
        log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",
            topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
        return null;
    }
    
  7. 更新 topicQueueTabletopicQueueMappingInfoTable

    • 如果 broker 是第一次注册或者 topicConfig 发生了变化,则更新 topicQueueTable 中的 QueueData
    • 如果 broker 是第一次注册或者 broker 实例对应的数据版本和参数中的数据版本不同,则更新 topicQueueMappingInfoTable
    boolean isMaster = MixAll.MASTER_ID == brokerId;
    // 这段代码的意义是判断一个broker是否是prime slave。
    // prime slave是一个在broker组中拥有最小brokerId的slave,它可以在原来的master失败时充当master。
    boolean isPrimeSlave = !isOldVersionBroker && !isMaster
        && brokerId == Collections.min(brokerAddrsMap.keySet());
    
    // 如果不是普通的 slave broker,则进行如下操作
    if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {
    
        ConcurrentMap<String, TopicConfig> tcTable =
            topicConfigWrapper.getTopicConfigTable();
        // 如果 tcTable 不为空,则遍历 tcTable
        // 如果 broker 是第一次注册或者 topicConfig 发生了变化,则更新 topicQueueTable 中的 QueueData
        if (tcTable != null) {
            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
                                                               topicConfigWrapper.getDataVersion(), brokerName,
                                                               entry.getValue().getTopicName())) { // 如果是第一次注册或者 topicConfig 发生了变化
                    final TopicConfig topicConfig = entry.getValue();
                    if (isPrimeSlave) {
                        // Wipe write perm for prime slave
                        // 擦除 prime slave 的写权限
                        topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));
                    }
                    // 创建/更新 topicQueueTable 中的 QueueData
                    this.createAndUpdateQueueData(brokerName, topicConfig);
                }
            }
        }
    
        // 如果 broker 是第一次注册或者 broker 实例对应的数据版本和参数中的数据版本不同,则更新 topicQueueMappingInfoTable
        if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
            TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
            Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
            //the topicQueueMappingInfoMap should never be null, but can be empty
            for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
                if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
                    topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
                }
                //Note asset brokerName equal entry.getValue().getBname()
                //here use the mappingDetail.bname
                topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
            }
        }
    }
    
  8. 更新 brokerLiveTable

    // 更新 brokerLiveTable
    BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
    BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,
        new BrokerLiveInfo(
            System.currentTimeMillis(),
            timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
            topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
            channel,
            haServerAddr));
    if (null == prevBrokerLiveInfo) {
        log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);
    }
    
  9. 更新 filterServerTable

    // 更新 filterServerTable
    if (filterServerList != null) {
        if (filterServerList.isEmpty()) {
            this.filterServerTable.remove(brokerAddrInfo);
        } else {
            this.filterServerTable.put(brokerAddrInfo, filterServerList);
        }
    }
    
  10. 更新 result 中主节点的信息

    / 更新 result 中主节点的信息
    if (MixAll.MASTER_ID != brokerId) {
        String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
        if (masterAddr != null) {
            BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);
            BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);
            if (masterLiveInfo != null) {
                result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
                result.setMasterAddr(masterAddr);
            }
        }
    }
    
    if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {
        notifyMinBrokerIdChanged(brokerAddrsMap, null,
            this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
    }
    
  11. 释放写锁,返回RegisterBrokerResult对象

    finally {
        this.lock.writeLock().unlock();
    }
    return result;
    

3.2 注销 Broker

注销 Broker 的代码为 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unregisterBroker,注销的 broker 是一个 broker 实例而非 broker 组

在 RIP 29 Optimize RocketMQ NameServer 中添加了 BatchUnregistrationService 类用来批量注销 Broker,运行代码如下

@Override
public void run() {
    while (!this.isStopped()) {
        try {
            final UnRegisterBrokerRequestHeader request = unregistrationQueue.take();
            Set<UnRegisterBrokerRequestHeader> unregistrationRequests = new HashSet<>();
            unregistrationQueue.drainTo(unregistrationRequests);

            // Add polled request
            unregistrationRequests.add(request);

            this.routeInfoManager.unRegisterBroker(unregistrationRequests);
        } catch (Throwable e) {
            log.error("Handle unregister broker request failed", e);
        }
    }
}

在 run 方法中调用了 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unRegisterBroker 方法,此方法可分为以下几个执行步骤

  1. 更新 brokerLiveTable

    // remove from brokerLiveTable
    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddrInfo);
    log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
        brokerLiveInfo != null ? "OK" : "Failed",
        brokerAddrInfo
    );
    
  2. 更新 filterServerTable

    // remove from filterServerTable
    this.filterServerTable.remove(brokerAddrInfo);
    
  3. 更新 brokerAddrTable

    • 把 brokerData 中和传入的 brokerAddr 相同的地址移除

    • 经过上述操作后如果 brokerAddrs 为空,说明这个 brokerName 已经没有 broker 实例了,则将 brokerAddrTable 中 key 为传入的 brokerName 的元素删除,并将 removeBrokerName 设置为 true

    boolean removeBrokerName = false;
    boolean isMinBrokerIdChanged = false;
    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
    if (null != brokerData) {
        // 如果当前 brokerId 是 brokerData 中最小的 brokerId,则将 isMinBrokerIdChanged 设置为 true
        if (!brokerData.getBrokerAddrs().isEmpty() &&
            unRegisterRequest.getBrokerId().equals(Collections.min(brokerData.getBrokerAddrs().keySet()))) {
            isMinBrokerIdChanged = true;
        }
        // 把 brokerData 中和 brokerAddr 相同的地址移除
        boolean removed = brokerData.getBrokerAddrs().entrySet().removeIf(item -> item.getValue().equals(brokerAddr));
        log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
            removed ? "OK" : "Failed",
            brokerAddrInfo
        );
        // 如果brokerData中没有地址了,说明这个brokerName已经没有broker实例了,需要从brokerAddrTable中移除
        if (brokerData.getBrokerAddrs().isEmpty()) {
            this.brokerAddrTable.remove(brokerName);
            log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
                brokerName
            );
    
            removeBrokerName = true;
        } else if (isMinBrokerIdChanged) { // 如果最小的brokerId发生了变化,需要通知其他broker更新路由信息
            needNotifyBrokerMap.put(brokerName, new BrokerStatusChangeInfo(
                brokerData.getBrokerAddrs(), brokerAddr, null));
        }
    }
    
  4. 更新 clusterAddrTable

    • 如果 removeBrokerName 为 true,则删除 clusterAddrTable 中 key 为传入的 clusterName 的值 Set 中和传入的 brokerName 相等的元素。如果删除元素后此时 Set 集合为空,则将此 cluster 删除。并将删除的 brokerName 添加到 removedBroker 集合中
    • 如果 removeBrokerName 为 false,则将传入的 brokerName 添加到 reducedBroker 集合中
    // 如果brokerName已经没有broker实例了,需要将brokerName从clusterAddrTable中移除
    if (removeBrokerName) {
        Set<String> nameSet = this.clusterAddrTable.get(clusterName);
        if (nameSet != null) {
            boolean removed = nameSet.remove(brokerName);
            log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                removed ? "OK" : "Failed",
                brokerName);
    
            if (nameSet.isEmpty()) {
                this.clusterAddrTable.remove(clusterName);
                log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
                    clusterName
                );
            }
        }
        removedBroker.add(brokerName);
    } else {
        reducedBroker.add(brokerName);
    }
    
  5. 更新 topicQueueTable

    调用了 cleanTopicByUnRegisterRequests 方法,将上一步更新的 removedBroker 和 reducedBroker 作为参数传入方法中

    // 清理topicQueueTable中的无效数据
    cleanTopicByUnRegisterRequests(removedBroker, reducedBroker);
    

    cleanTopicByUnRegisterRequests 方法中清理了 topicQueueTable 中无效的数据

    private void cleanTopicByUnRegisterRequests(Set<String> removedBroker, Set<String> reducedBroker) {
        Iterator<Entry<String, Map<String, QueueData>>> itMap = this.topicQueueTable.entrySet().iterator();
        while (itMap.hasNext()) { // 遍历所有的topic
            Entry<String, Map<String, QueueData>> entry = itMap.next();
    
            String topic = entry.getKey();
            // 遍历topic对应的brokerName和QueueData
            Map<String, QueueData> queueDataMap = entry.getValue();
    
            // 遍历需要移除的brokerName
            for (final String brokerName : removedBroker) {
                // 移除这个brokerName对应的QueueData
                final QueueData removedQD = queueDataMap.remove(brokerName);
                if (removedQD != null) {
                    log.debug("removeTopicByBrokerName, remove one broker's topic {} {}", topic, removedQD);
                }
            }
    
            // 如果topic对应的brokerName都移除了,那么就移除这个topic
            if (queueDataMap.isEmpty()) {
                log.debug("removeTopicByBrokerName, remove the topic all queue {}", topic);
                itMap.remove();
            }
    
            // 遍历需要减少的brokerName
            for (final String brokerName : reducedBroker) {
                final QueueData queueData = queueDataMap.get(brokerName);
    
                if (queueData != null) {
                    // 如果这个brokerName对应的brokerData开启了自动切换master的功能,那么就需要判断这个brokerName对应的brokerData中是否还有master
                    if (this.brokerAddrTable.get(brokerName).isEnableActingMaster()) {
                        // Master has been unregistered, wipe the write perm
                        // 如果这个brokerName对应的brokerData中没有master了,那么就把这个topic的写权限去掉
                        if (isNoMasterExists(brokerName)) {
                            queueData.setPerm(queueData.getPerm() & (~PermName.PERM_WRITE));
                        }
                    }
                }
            }
        }
    }
    
  6. 通知其他 broker 组更新信息

    调用了 notifyMinBrokerIdChanged 方法

    // 如果最小的brokerId发生了变化,需要通知其他broker更新路由信息
    if (!needNotifyBrokerMap.isEmpty() && namesrvConfig.isNotifyMinBrokerIdChanged()) {
        notifyMinBrokerIdChanged(needNotifyBrokerMap);
    }
    

    notifyMinBrokerIdChanged 方法中又调用了重载后的 notifyMinBrokerIdChanged 方法

    private void notifyMinBrokerIdChanged(Map<String, BrokerStatusChangeInfo> needNotifyBrokerMap)
        throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException,
        RemotingTooMuchRequestException {
        for (String brokerName : needNotifyBrokerMap.keySet()) {
            BrokerStatusChangeInfo brokerStatusChangeInfo = needNotifyBrokerMap.get(brokerName);
            BrokerData brokerData = brokerAddrTable.get(brokerName);
            if (brokerData != null && brokerData.isEnableActingMaster()) {
                notifyMinBrokerIdChanged(brokerStatusChangeInfo.getBrokerAddrs(),
                    brokerStatusChangeInfo.getOfflineBrokerAddr(), brokerStatusChangeInfo.getHaBrokerAddr());
            }
        }
    }
    

    在重载的 notifyMinBrokerIdChanged 方法中向非最小 BrokerId 的 Broker 地址通知最小 BrokerId 发生了变化

    private void notifyMinBrokerIdChanged(Map<Long, String> brokerAddrMap, String offlineBrokerAddr,
        String haBrokerAddr)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException,
        RemotingTooMuchRequestException, RemotingConnectException {
        if (brokerAddrMap == null || brokerAddrMap.isEmpty() || this.namesrvController == null) {
            return;
        }
    
        NotifyMinBrokerIdChangeRequestHeader requestHeader = new NotifyMinBrokerIdChangeRequestHeader();
        long minBrokerId = Collections.min(brokerAddrMap.keySet());
        requestHeader.setMinBrokerId(minBrokerId);
        requestHeader.setMinBrokerAddr(brokerAddrMap.get(minBrokerId));
        requestHeader.setOfflineBrokerAddr(offlineBrokerAddr);
        requestHeader.setHaBrokerAddr(haBrokerAddr);
    
        // 选择通知的 Broker 地址,即非最小 BrokerId 的 Broker 地址
        List<String> brokerAddrsNotify = chooseBrokerAddrsToNotify(brokerAddrMap, offlineBrokerAddr);
        log.info("min broker id changed to {}, notify {}, offline broker addr {}", minBrokerId, brokerAddrsNotify, offlineBrokerAddr);
        RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.NOTIFY_MIN_BROKER_ID_CHANGE, requestHeader);
        for (String brokerAddr : brokerAddrsNotify) {
            // 向 Broker 发送通知
            this.namesrvController.getRemotingClient().invokeOneway(brokerAddr, request, 300);
        }
    }
    

3.3 拼凑 TopicRouteData

首先来看 TopicRouteData 的定义,其属性如下

private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
//It could be null or empty
private Map<String/*brokerName*/, TopicQueueMappingInfo> topicQueueMappingByBroker;

拼凑 TopicRouteData 的方法是 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData,此方法可大致分为以下几个执行步骤

  1. topicQueueTable 按照主题名称查询 queueDatas

  2. 根据 queueDatas 中 brokerName 属性从 brokerAddrTable 取得 brokerData 对象,组成 brokerDatas

  3. 根据查询到的 brokerDatafilterServerTable 查询到对应的 filterServer 列表,组装为 Map

    // 从topicQueueTable按照主题名称查询queueDatas
    Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
    if (queueDataMap != null) {
        // 保存查询到的queueDatas
        topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values()));
        foundQueueData = true;
    
        Set<String> brokerNameSet = new HashSet<>(queueDataMap.keySet());
    
        // 根据queueDatas中brokerName属性从brokerAddrTable取得brokerData对象,组成brokerDatas
        for (String brokerName : brokerNameSet) {
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                continue;
            }
            BrokerData brokerDataClone = new BrokerData(brokerData);
    
            // 保存查询到的brokerDatas
            brokerDataList.add(brokerDataClone);
            foundBrokerData = true;
            if (filterServerTable.isEmpty()) {
                continue;
            }
            // 根据查询到的brokerData从filterServerTable查询到对应的filterServer列表,组装为Map
            for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(brokerDataClone.getCluster(), brokerAddr);
                List<String> filterServerList = this.filterServerTable.get(brokerAddrInfo);
                // 保存查询到的filterServer列表
                filterServerMap.put(brokerAddr, filterServerList);
            }
    
        }
    }
    
  4. 返回 topicRouteData,如果有需要会重新设置 Master 的地址

    if (foundBrokerData && foundQueueData) {
    
        topicRouteData.setTopicQueueMappingByBroker(this.topicQueueMappingInfoTable.get(topic));
    
        if (!namesrvConfig.isSupportActingMaster()) {
            return topicRouteData;
        }
    
        if (topic.startsWith(TopicValidator.SYNC_BROKER_MEMBER_GROUP_PREFIX)) {
            return topicRouteData;
        }
    
        if (topicRouteData.getBrokerDatas().size() == 0 || topicRouteData.getQueueDatas().size() == 0) {
            return topicRouteData;
        }
    
        boolean needActingMaster = false;
    
        for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
            if (brokerData.getBrokerAddrs().size() != 0
                && !brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                needActingMaster = true;
                break;
            }
        }
    
        if (!needActingMaster) {
            return topicRouteData;
        }
    
        // 如果需要自动切换master,则执行下面的逻辑
        for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
            final HashMap<Long, String> brokerAddrs = brokerData.getBrokerAddrs();
            // 如果brokerAddrs为空,或者brokerAddrs中包含masterId,或者brokerData中的enableActingMaster为false,那么就跳过
            if (brokerAddrs.size() == 0 || brokerAddrs.containsKey(MixAll.MASTER_ID) || !brokerData.isEnableActingMaster()) {
                continue;
            }
    
            // No master
            for (final QueueData queueData : topicRouteData.getQueueDatas()) {
                if (queueData.getBrokerName().equals(brokerData.getBrokerName())) {
                    // 如果queueData的perm不可写,那么就将brokerAddrs中brokerId最小的brokerAddr的brokerId改为masterId
                    if (!PermName.isWriteable(queueData.getPerm())) {
                        final Long minBrokerId = Collections.min(brokerAddrs.keySet());
                        final String actingMasterAddr = brokerAddrs.remove(minBrokerId);
                        brokerAddrs.put(MixAll.MASTER_ID, actingMasterAddr);
                    }
                    break;
                }
            }
    
        }
    
        return topicRouteData;
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/462662.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

又一次503 service unavailable处理

出现了&#xff1a;503 service unavailable 1&#xff09;查看系统日志 通过事件查看器&#xff0c;查看iis的日志,如下&#xff1a; 在错误信息中提示是 应用程序池提供服务的进程中出现错误。 其他警告也可通过日志目录查看 C:\inetpub\ 出现上述问题的可能是&#xf…

树形结构——JAVA实现

1、树定义和基本术语 节点 package com.young.tree;/*** <p>* Title:树节点&#xff1a;二叉链表结构* </p>** Author: yangyongbing* Date: 2023-04-18 13:25* version: v1.0*/ public class Node<T> {public Node<T> lChild;private T data;public…

CASP15 蛋白质结构域 Domain 的定义和分类

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://blog.csdn.net/caroline_wendy/article/details/130379447 在CASP中&#xff0c;蛋白质结构域(Domain)的类别&#xff0c;包括 FM、FM/TBM、TBM-easy、TBM-hard、not evaluated 等5个类…

25从零开始学Java之数组扩容与数组拷贝的实现过程与原理分析

作者&#xff1a;孙玉昌&#xff0c;昵称【一一哥】&#xff0c;另外【壹壹哥】也是我哦 千锋教育高级教研员、CSDN博客专家、万粉博主、阿里云专家博主、掘金优质作者 前言 在上一篇文章中&#xff0c;壹哥给大家讲解了数组的创建、初始化及遍历方式&#xff0c;这些是我们学…

Cookies和Session案例-注册

1. 注册功能改进 1.1 service 将之前的注册案例的代码进行优化&#xff0c;将获取sqlsession工厂对象、获取sqlsession、获取mapper等操作从servlet中分离出来转变为三层架构的形式 在service目录下创建UserService public class UserService {SqlSessionFactory sqlSessionFa…

办公室组网

1.办公网络组网中,汇聚交换机和接入交换机你会做哪些配置? 接入交换机上配置: (1)VLAN配置:由题意得办公网络中有两个部门,使用VLAN技术将每个部门划入一个局域网中,如部门1属于VLAN 10,部门2属于VLAN20.该网络中还需要额外创建一个VLAN用于管理网络,如VLAN30。在接入…

一文了解国外AIGC头部产品

AIGC是指通过人工智能技术生成的内容&#xff0c;包括文字、图片、音频和视频等。AIGC技术可以基于大量的数据和算法&#xff0c;自动地生成各种类型的内容&#xff0c;可以用于新闻报道、广告宣传、文学创作、游戏设计等各个领域。AIGC技术的优点在于可以大大提高内容生产的效…

GD32F303RCT6开发笔记(一)—— macos环境搭建

macOS vscodegccpyocd环境搭建 1、vscode/arm-none-eabi-/pyocd 安装可百度。 2、pyocd 安装完成后&#xff0c;连接st-link 输入命令后显示如下&#xff0c;说明连接成功。 3、输入命令 pyocd pack find GD32F303RC4、如果没有安装GD32F303RC包 使用命令安装 pyocd pack …

【Vue 基础】vue-cli初始化项目及相关说明

目录 1. 创建项目 2. 项目文件介绍 3. 项目的其它配置 3.1 项目运行时&#xff0c;让浏览器自动打开 3.2 关闭eslint校验功能 3.3 src文件夹简写方法 1. 创建项目 vue create 项目名 2. 项目文件介绍 创建好的项目中包含如下文件&#xff1a; &#xff08;1&#xff09…

基于显扬科技自主研发3D机器视觉HY-M5在易拉罐包装检测的应用

行业现状&#xff1a; 易拉罐包装行业发展迅速&#xff0c;是中国食品工业的重要组成部分。近年来&#xff0c;随着经济水平的提高和生活方式变化&#xff0c;各类预包装食品需求剧增&#xff0c;碳酸饮料和啤酒等饮料消费大幅增加&#xff0c;直接带动易拉罐包装行业高速发展…

方案解析丨数字人主播如何成为电商直播新标配

浙江省政府办公厅近日印发《关于进一步扩大消费促进高质量发展若干举措》支持电子商务直播发展。抢抓电子商务直播快速发展机遇&#xff0c;发展数字人虚拟主播、元宇宙新消费场景等新业态新模式。 随着电商直播快速发展&#xff0c;企业怎么高效地实现引流获客&#xff0c;成为…

【计算机组成原理】数据的表示和运算·进位计数制

&#x1f6a9; 本文已收录至专栏&#xff1a;计算机基础 我们可以通过显示屏看到各种形式的数据信息&#xff0c;但数据是如何在计算机中表示呢&#xff1f;运算器又是如何实现数据的算数、逻辑运算&#xff1f; 十进制数是最适合我们日常使用的一种计数方式&#xff0c;除此之…

随手记录:Livox 时间戳修改为ROS时间戳

参考与前言 传感器类型&#xff1a;Livox-Mid70 参考链接&#xff1a;Ubuntu20.04系统安装Livox ROS Driver 官方驱动&#xff1a;https://github.com/Livox-SDK/livox_ros_driver 碎碎念&#xff1a;之所以要改成rostime主要是 提取pcd的时候发现这个timestamp 300.xxx 打…

我那张被问爆了的漫画头像确实有点东西

不得不说&#xff0c;这个照片变漫画有点东西啊&#xff01;不知道姐妹们有没有发现❗️最近漫画人像它又火起来了&#xff0c;基本上在dy等各大社交软件上&#xff0c;特别是朋友圈已经是刷屏了&#xff5e;随便一张照片经过渲染之后秒变动漫风格照&#xff0c;我就马不停蹄就…

宝塔面板搭建自己的网站,并发布公网远程访问

文章目录 1. 环境安装2. 安装cpolar内网穿透3. 内网穿透4.固定http地址5. 配置二级子域名6.创建一个测试页面 宝塔面板简单几步搭建本地web站点&#xff0c;并做内网穿透&#xff0c;实现公网用户也可以正常远程访问&#xff0c;无需公网IP&#xff0c;无需设置路由器。 1. 环…

这样的速度,还有谁?一个 issue 引发的性能大跃进

前段时间开源了一个关于音频特征提取和分析的小项目&#xff0c;自己是 AI 音频领域方向的&#xff0c;但受限于对音频特征的理解&#xff0c;做研究时总感觉缺乏“底料”&#xff0c;所以当做是学习练手做了这个小东西。 虽然是学习练手的小项目&#xff0c;但也信心满满&…

从盒马来看新零售的全面可行性

来源&#xff5c;新零售 不久前&#xff0c;一家位置极佳的北京老牌超市闭店的消息引发了很多人的唏嘘&#xff0c;这家超市位于北京长安街东侧的万达广场上&#xff0c;曾经作为万达广场的主力店&#xff0c;服务周边居民长达十年之久。 不过&#xff0c;周边的居民很快得知…

哪个牌子的电视盒子好用?经销商总结目前性能最好的电视盒子

做数码经销已经是第九年了&#xff0c;这些年对数码行业也算是颇有研究&#xff0c;大家选购数码产品时都会参考我的建议。今天我将来分享目前性能最好的电视盒子推荐&#xff0c;想知道哪个牌子的电视盒子好用看这篇就足够了。 一&#xff1a;泰捷WEBOX60Pro电视盒子 亮点&a…

27- OCR 光功率计数码管字符识别

要点&#xff1a; 光功率计数码管 1 前言 本案例将使用OCR技术自动识别光功率计显示屏文字&#xff0c;通过本章您可以掌握&#xff1a; PaddleOCR快速使用数据合成方法数据挖掘方法基于现有数据微调 为实现智能读数&#xff0c;通常会采取文本检测文本识别的方案&#xff…

【JavaWeb】jQuery(上)

本章内容 1.jQuery Hello world 2.jQuery 选择器 3.jQuery 过滤器 4.jQuery 元素筛选 1、jQuery 介绍 什么是 jQuery ? jQuery&#xff0c;顾名思义&#xff0c;也就是 JavaScript 和查询&#xff08;Query&#xff09;&#xff0c;它就是辅助 JavaScript 开发的 js 类…