RocketMQ5.1 NameServer 路由管理

news2024/11/27 4:38:01

文章目录

  • 1. 路由管理核心组件介绍
  • 2. RouteInfoManager 路由表
  • 3. 路由管理
    • 3.1 注册 Broker
    • 3.2 注销 Broker
    • 3.3 拼凑 TopicRouteData

此文章基于 RocketMQ 5.1 版本进行分析,与 4.x 版本相比此文章分析的部分源码有很大的区别

1. 路由管理核心组件介绍

路由管理是指维护 Broker、Topic、Queue 和 Consumer Group 之间的对应关系,以及提供给 Producer 和 Consumer 获取这些关系的服务。路由管理涉及到以下几个核心组件:

  • NameServerController:NameServer 的控制器类,负责初始化、启动和关闭 NameServer 的各个组件。
  • RouteInfoManager:NameServer 的核心组件之一,负责维护 Broker、Topic、Queue 和 Consumer Group 的路由信息,以及提供查询、注册和删除的服务。
  • BrokerController:Broker 的控制器类,负责初始化、启动和关闭 Broker 的各个组件。
  • BrokerOuterAPI:Broker 的核心组件之一,负责与 NameServer 通信,定时向 NameServer 注册自身信息,并获取其他 Broker 的信息。
  • MQClientInstance:Producer 和 Consumer 的内部实现类,负责管理 Producer 和 Consumer 的信息,并与 NameServer 和 Broker 通信。
  • MQClientAPIImpl:MQClientInstance 的核心组件之一,负责封装与 NameServer 和 Broker 的通信协议,并执行相应的请求。

本文仅围绕 RouteInfoManager 对 RocketMQ 5.1 版本路由管理进行分析

2. RouteInfoManager 路由表

namesrv的路由管理主要由 RouteInfoManager 对象负责,该对象维护了一个内存中的路由表,包括以下几个部分:

  • topicQueueTable:一个 Map 结构,key是 topic 名称,value 是一个 Map 结构,key 是 brokerName,value 是 QueueData 对象

    topicQueueTable 中的 broker 是一个 broker 组,包含 Master 和 Slave 两部分

     private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
    

    QueueData 类的具体属性如下表所示

    属性名含义
    brokerName存储该队列的 broker 的名称
    readQueueNums读队列的数量,即该队列下有多少个读队列,这个值在创建队列时由用户指定,通常是4
    writeQueueNums写队列的数量,即该队列下有多少个写队列,这个值在创建队列时由用户指定,通常是4
    perm队列的权限,是一个二进制数,如6代表可读写,4代表只可读,2代表只可写,0代表没有权限
    topicSysFlag系统标志
  • brokerAddrTable:一个 Map 结构,key 是 brokerName,value 是 BrokerData 对象

    brokerAddrTable 中的 broker 是一个 broker 组,包含 Master 和 Slave 两部分

     private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
    

    BrokerData 类的具体属性如下表所示

    属性名含义
    cluster当前 broker 所在的集群名称
    brokerName当前 broker 的名称
    brokerAddrs当前 broker 的地址,是一个 Map,key 是 brokerId,value 是单个 broker 实例(一个 Master broker 或一个 Slave broker)的地址
    zoneName当前 broker 所在的 zone
    enableActingMaster当前 broker 是否允许 slave 切换为 master,这个属性主要用于兼容老版本的 HA (High Availability)

    其中 enableActingMaster 属性在 RIP 32 Slave Acting Master Mode 中被添加,表示是否启用 slave acting master 模式,用于旧版本的HA适配。HA是高可用的缩写,指的是RocketMQ的主从架构。旧版本的HA指的是没有 DLedgerController 的主从架构,这种架构下,slave 不能提供消息的发送和消费,也不能执行一些 master 才能执行的操作。如果启用了 slave acting master 模式,当 master 出现故障时,slave可以承担一些 master 的任务,比如读取消息、扫描和转发特殊消息、反向同步元数据等。这样可以提高可用性和容错性。

  • brokerLiveTable:一个 Map 结构,key 是 BrokerAddrInfo 对象,value 是 BrokerLiveInfo 对象

    brokerLiveTable 中的 broker 是一个 broker 实例,如一个 Master broker 或 一个 Slave broker 实例

     private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    

    BrokerAddrInfo 类的具体属性如下表所示

    属性名含义
    clusterName当前 broker 实例所在的集群名称
    brokerAddr当前 broker 实例的地址

    BrokerLiveInfo 类的具体属性如下表所示

    属性名含义
    lastUpdateTimestampbroker 最后一次向 NameServer 发送心跳的时间戳。这个用来检查 broker 是否过期或者不在线
    heartbeatTimeoutMillisbroker 的心跳超时时间。如果 broker 在这个时间内没有发送心跳,就会被 NameServer 认为是离线的
    dataVersionbroker 的 topic 配置的数据版本。这个用来在 broker 注册或者重新注册时,比较和更新 NameServer 中的 topic 信息
    channelbroker 和 NameServer 之间的通道。这个用来进行通信和发送请求或者响应
    haServerAddrbroker 的高可用服务器的地址。这个用来让其他的 broker 或者消费者连接到 broker 进行数据复制或者消费
  • clusterAddrTable:一个 Map 结构,key 是 broker 所在的集群的名称,value 是 broker 名称的集合

    clusterAddrTable 中的 broker 是一个 broker 系统

    private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    
  • filterServerTable:一个 Map 结构,key 是 BrokerAddrInfo 对象,value 是过滤器服务器地址列表。过滤器服务器用于支持消息过滤功能

    filterServerTable 中的 broker 是一个 broker 实例

    private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    
  • topicQueueMappingInfoTable:存储每个 topic 在每个 broker 上的queue信息,key为topic,value为一个map,key为brokerName,value为TopicQueueMappingInfo

    private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
    

    TopicQueueMappingInfo 类的具体属性如下表所示

    属性名含义
    topictopic名称,用于标识topic
    scopetopic的范围,全局或局部,用于区分不同的topic空间
    totalQueues总队列数,用于指定topic的分区数量
    bname所属的broker名称,用于识别托管该topic的broker
    epoch用于防止旧的脏数据,每次更新时递增
    dirty是否是脏数据,用于标记该topic是否需要被删除或更新
    currIdMap逻辑ID和物理ID的映射关系,用于将topic的逻辑队列映射到物理队列

以一个 cluster 为例,上述部分表的内容如下图所示

在这里插入图片描述

3. 路由管理

RouteInfoManager 提供了一系列的方法来更新和查询路由表,比如:

  • registerBroker:用于处理broker注册请求,向路由表中添加或更新broker数据、队列数据和过滤器服务器数据,并返回所有主题的队列数据给broker。
  • unregisterBroker:用于处理broker注销请求,从路由表中删除指定集群、broker名称和broker地址对应的数据。
  • scanNotActiveBroker:用于定时扫描并删除不活跃的broker数据,判断标准是上次更新时间超过2分钟。
  • getAllClusterInfo:用于获取所有集群信息,返回clusterAddrTable的字符串形式。
  • getTopicRouteInfo:用于获取指定主题的路由信息,返回包含broker数据和队列数据的TopicRouteData对象。如果开启了区域模式,还会根据区域名称过滤掉不属于该区域的broker数据和队列数据。
  • getSystemTopicList:用于获取系统内置主题列表,比如TBW102、OFFSET_MOVED_EVENT等。
  • getUnitTopics:用于获取单元化部署的主题列表,即包含“%”符号的主题。
  • getHasUnitSubTopicList:用于获取存在单元化订阅组的主题列表,即订阅组名称包含“%”符号的主题。
  • getHasUnitSubUnUnitTopicList:用于获取存在单元化订阅组但没有单元化部署的主题列表,即订阅组名称包含“%”符号但主题名称不包含“%”符号的主题。

3.1 注册 Broker

注册 Broker 的代码为 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker,注册的 broker 是一个 broker 实例而非 broker 组

public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final String zoneName,
    final Long timeoutMillis,
    final Boolean enableActingMaster,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    
    // ...
}

分为几个步骤

  1. 创建一个 RegisterBrokerResult 对象,用于返回注册结果

    RegisterBrokerResult result = new RegisterBrokerResult();
    
  2. 获取写锁,防止并发修改路由信息

    try {
        this.lock.writeLock().lockInterruptibly();
        // ...
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    } finally {
        this.lock.writeLock().unlock();
    }
    
  3. 更新 cluster 信息,向 clusterAddrTable 中添加 brokerName

    此步骤检查 cluster 是否被注册,即 clusterAddrTable 中是否存在 key 等于 clusterName

    • 如果 clusterAddrTable 中不存在 clusterName,则创建一个新的 Set 并将其作为 value,并赋值给 brokerNames

    • 如果 clusterAddrTable 中存在 clusterName,则直接返回该 value,并赋值给 brokerNames

    把当前 broker 的名称 brokerName 加入到集群对应的 broker 名称集合 brokerNames

    Set<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());
    brokerNames.add(brokerName);
    
  4. 更新 broker 信息,向 brokerAddrTable 添加 BrokerData

    此步骤检查 broker 组是否存在,即 brokerAddrTable 中是否存在 key 等于 brokerName

    • 如果 brokerAddrTable 中不存在 brokerName,则根据传入的参数创建一个新的 BrokerData 并添加进 brokerAddrTable

    • 如果 brokerAddrTable 中存在 brokerName,则检查存在的 broker 组是否是老版本的 HA 架构

      • 如果是老版本则设置 enableActingMaster 属性为 false
      • 否则设置 enableActingMaster 属性为 true。

      enableActingMaster 属性在上述 BrokerData 属性中已经进行了详细介绍

    最后设置/更新 broker 的 zoneName

    boolean registerFirst = false;
    
    // 检查 broker 组是否存在
    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
    if (null == brokerData) { // broker 组不存在
        registerFirst = true;
        // 创建 BrokerData 对象,HashMap 中的 key 为 brokerId,value 为 brokerAddr
        brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
        this.brokerAddrTable.put(brokerName, brokerData);
    }
    
    // 检查 broker 是否是老版本
    // 如果是老版本,则 enableActingMaster 为 null,则将 brokerData.enableActingMaster 值设置为 false
    // 如果不是老版本,则将 brokerData.enableActingMaster 的值设为 enableActingMaster 的值
    boolean isOldVersionBroker = enableActingMaster == null;
    brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);
    brokerData.setZoneName(zoneName);
    
  5. 更新 broker 地址信息,向 brokerAddrTable 的值 BrokerDatabrokerAddrsMap 中移除可能冲突的 broker 地址,再把当前 broker 信息更新到 brokerAddrsMap

    此步骤有以下几部分

    • 如果 brokerAddrsMap 不为空则将 prevMinBrokerId 设为 brokerAddrsMap 中最小的 brokerId 值,否则设为 0

    • 检查正在注册的 brokerId 是否小于 brokerAddrsMap 中的最小 brokerId。如果是,它将 isMinBrokerIdChanged 设置为 true

    • brokerAddrsMap 中移除所有 brokerAddr 相等但是 brokerId 不相等的 broker。如果 brokerAddr 相等,但 brokerId 不相等,可能是由于从主切换重新注册,因此需要先移除旧的 broker

    • 将当前 broker 信息更新到 brokerAddrsMap

    Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
    
    // 添加 brokerId 和 brokerAddr 的映射关系
    boolean isMinBrokerIdChanged = false;
    long prevMinBrokerId = 0;
    // 如果 brokerAddrsMap 不为空,则获取 brokerAddrsMap 中最小的 brokerId
    if (!brokerAddrsMap.isEmpty()) {
        prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
    }
    // 如果传入的 brokerId 小于 brokerAddrsMap 中最小的 brokerId,则将 isMinBrokerIdChanged 设置为 true
    if (brokerId < prevMinBrokerId) {
        isMinBrokerIdChanged = true;
    }
    
    //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
    //The same IP:PORT must only have one record in brokerAddrTable
    // 从 brokerAddrsMap 中移除所有 brokerAddr 相等但是 brokerId 不相等的 broker,避免在从节点切换到主节点或相反操作时,在映射中有相同IP:PORT的重复记录
    brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());
    
    // 6.两种情况直接返回 代码省略...
    
    // 更新 brokerAddrsMap 中 brokerId 和 brokerAddr 的映射关系
    String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
    registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));
    
  6. 两种情况直接返回

    • 一个 broker 实例只能注册一次,如果 brokerId 重复,说明 brokerAddr 也重复,此时需要更新 brokerAddr

      如果现有 broker 的状态版本高于正在注册的 broker 的状态版本,说明现有的 broker 是更新的,不应该被覆盖,因此返回不注册/更新 broker 的结果,并将正在注册的 broker 信息从 brokerLiveTable 中移除

    • 如果 broker 之前没有注册过,并且只有一个topic配置,此时返回null,表示注册失败,因为 broker 不允许只有一个 topic 配置就注册

    //If Local brokerId stateVersion bigger than the registering one,
    String oldBrokerAddr = brokerAddrsMap.get(brokerId);
    
    /*
      该代码检查正在注册的 broker 是否与现有 broker 冲突。
      如果现有 broker 的状态版本高于正在注册的 broker 的状态版本,说明现有的 broker 是更新的,不应该被覆盖。
      此时记录警告,并且不会注册 broker,且正在注册的 broker 将从 brokerLiveTable 中删除。
      这可确保仅使用最新的 broker 信息。
     */
    if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {
        BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));
    
        if (null != oldBrokerInfo) {
            long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();
            long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();
            if (oldStateVersion > newStateVersion) {
                log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
                        "Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",
                    clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
                //Remove the rejected brokerAddr from brokerLiveTable.
                brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));
                return result;
            }
        }
    }
    
    /*
      此代码检查正在注册的代理是否已在 brokerAddrsMap 中,以及 topicConfigWrapper 是否只有一个 topic 配置。
      如果 broker 不在 Map 中,并且只有一个 topic 配置,那么将记录一条警告,指出由于尚未注册 broker,因此无法注册主题配置包装器。
      然后,该方法返回 null,指示注册不成功。
     */
    if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {
        log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",
            topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
        return null;
    }
    
  7. 更新 topicQueueTabletopicQueueMappingInfoTable

    • 如果 broker 是第一次注册或者 topicConfig 发生了变化,则更新 topicQueueTable 中的 QueueData
    • 如果 broker 是第一次注册或者 broker 实例对应的数据版本和参数中的数据版本不同,则更新 topicQueueMappingInfoTable
    boolean isMaster = MixAll.MASTER_ID == brokerId;
    // 这段代码的意义是判断一个broker是否是prime slave。
    // prime slave是一个在broker组中拥有最小brokerId的slave,它可以在原来的master失败时充当master。
    boolean isPrimeSlave = !isOldVersionBroker && !isMaster
        && brokerId == Collections.min(brokerAddrsMap.keySet());
    
    // 如果不是普通的 slave broker,则进行如下操作
    if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {
    
        ConcurrentMap<String, TopicConfig> tcTable =
            topicConfigWrapper.getTopicConfigTable();
        // 如果 tcTable 不为空,则遍历 tcTable
        // 如果 broker 是第一次注册或者 topicConfig 发生了变化,则更新 topicQueueTable 中的 QueueData
        if (tcTable != null) {
            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
                                                               topicConfigWrapper.getDataVersion(), brokerName,
                                                               entry.getValue().getTopicName())) { // 如果是第一次注册或者 topicConfig 发生了变化
                    final TopicConfig topicConfig = entry.getValue();
                    if (isPrimeSlave) {
                        // Wipe write perm for prime slave
                        // 擦除 prime slave 的写权限
                        topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));
                    }
                    // 创建/更新 topicQueueTable 中的 QueueData
                    this.createAndUpdateQueueData(brokerName, topicConfig);
                }
            }
        }
    
        // 如果 broker 是第一次注册或者 broker 实例对应的数据版本和参数中的数据版本不同,则更新 topicQueueMappingInfoTable
        if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
            TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
            Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
            //the topicQueueMappingInfoMap should never be null, but can be empty
            for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
                if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
                    topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
                }
                //Note asset brokerName equal entry.getValue().getBname()
                //here use the mappingDetail.bname
                topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
            }
        }
    }
    
  8. 更新 brokerLiveTable

    // 更新 brokerLiveTable
    BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
    BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,
        new BrokerLiveInfo(
            System.currentTimeMillis(),
            timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
            topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
            channel,
            haServerAddr));
    if (null == prevBrokerLiveInfo) {
        log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);
    }
    
  9. 更新 filterServerTable

    // 更新 filterServerTable
    if (filterServerList != null) {
        if (filterServerList.isEmpty()) {
            this.filterServerTable.remove(brokerAddrInfo);
        } else {
            this.filterServerTable.put(brokerAddrInfo, filterServerList);
        }
    }
    
  10. 更新 result 中主节点的信息

    / 更新 result 中主节点的信息
    if (MixAll.MASTER_ID != brokerId) {
        String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
        if (masterAddr != null) {
            BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);
            BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);
            if (masterLiveInfo != null) {
                result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
                result.setMasterAddr(masterAddr);
            }
        }
    }
    
    if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {
        notifyMinBrokerIdChanged(brokerAddrsMap, null,
            this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
    }
    
  11. 释放写锁,返回RegisterBrokerResult对象

    finally {
        this.lock.writeLock().unlock();
    }
    return result;
    

3.2 注销 Broker

注销 Broker 的代码为 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unregisterBroker,注销的 broker 是一个 broker 实例而非 broker 组

在 RIP 29 Optimize RocketMQ NameServer 中添加了 BatchUnregistrationService 类用来批量注销 Broker,运行代码如下

@Override
public void run() {
    while (!this.isStopped()) {
        try {
            final UnRegisterBrokerRequestHeader request = unregistrationQueue.take();
            Set<UnRegisterBrokerRequestHeader> unregistrationRequests = new HashSet<>();
            unregistrationQueue.drainTo(unregistrationRequests);

            // Add polled request
            unregistrationRequests.add(request);

            this.routeInfoManager.unRegisterBroker(unregistrationRequests);
        } catch (Throwable e) {
            log.error("Handle unregister broker request failed", e);
        }
    }
}

在 run 方法中调用了 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unRegisterBroker 方法,此方法可分为以下几个执行步骤

  1. 更新 brokerLiveTable

    // remove from brokerLiveTable
    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddrInfo);
    log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
        brokerLiveInfo != null ? "OK" : "Failed",
        brokerAddrInfo
    );
    
  2. 更新 filterServerTable

    // remove from filterServerTable
    this.filterServerTable.remove(brokerAddrInfo);
    
  3. 更新 brokerAddrTable

    • 把 brokerData 中和传入的 brokerAddr 相同的地址移除

    • 经过上述操作后如果 brokerAddrs 为空,说明这个 brokerName 已经没有 broker 实例了,则将 brokerAddrTable 中 key 为传入的 brokerName 的元素删除,并将 removeBrokerName 设置为 true

    boolean removeBrokerName = false;
    boolean isMinBrokerIdChanged = false;
    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
    if (null != brokerData) {
        // 如果当前 brokerId 是 brokerData 中最小的 brokerId,则将 isMinBrokerIdChanged 设置为 true
        if (!brokerData.getBrokerAddrs().isEmpty() &&
            unRegisterRequest.getBrokerId().equals(Collections.min(brokerData.getBrokerAddrs().keySet()))) {
            isMinBrokerIdChanged = true;
        }
        // 把 brokerData 中和 brokerAddr 相同的地址移除
        boolean removed = brokerData.getBrokerAddrs().entrySet().removeIf(item -> item.getValue().equals(brokerAddr));
        log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
            removed ? "OK" : "Failed",
            brokerAddrInfo
        );
        // 如果brokerData中没有地址了,说明这个brokerName已经没有broker实例了,需要从brokerAddrTable中移除
        if (brokerData.getBrokerAddrs().isEmpty()) {
            this.brokerAddrTable.remove(brokerName);
            log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
                brokerName
            );
    
            removeBrokerName = true;
        } else if (isMinBrokerIdChanged) { // 如果最小的brokerId发生了变化,需要通知其他broker更新路由信息
            needNotifyBrokerMap.put(brokerName, new BrokerStatusChangeInfo(
                brokerData.getBrokerAddrs(), brokerAddr, null));
        }
    }
    
  4. 更新 clusterAddrTable

    • 如果 removeBrokerName 为 true,则删除 clusterAddrTable 中 key 为传入的 clusterName 的值 Set 中和传入的 brokerName 相等的元素。如果删除元素后此时 Set 集合为空,则将此 cluster 删除。并将删除的 brokerName 添加到 removedBroker 集合中
    • 如果 removeBrokerName 为 false,则将传入的 brokerName 添加到 reducedBroker 集合中
    // 如果brokerName已经没有broker实例了,需要将brokerName从clusterAddrTable中移除
    if (removeBrokerName) {
        Set<String> nameSet = this.clusterAddrTable.get(clusterName);
        if (nameSet != null) {
            boolean removed = nameSet.remove(brokerName);
            log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                removed ? "OK" : "Failed",
                brokerName);
    
            if (nameSet.isEmpty()) {
                this.clusterAddrTable.remove(clusterName);
                log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
                    clusterName
                );
            }
        }
        removedBroker.add(brokerName);
    } else {
        reducedBroker.add(brokerName);
    }
    
  5. 更新 topicQueueTable

    调用了 cleanTopicByUnRegisterRequests 方法,将上一步更新的 removedBroker 和 reducedBroker 作为参数传入方法中

    // 清理topicQueueTable中的无效数据
    cleanTopicByUnRegisterRequests(removedBroker, reducedBroker);
    

    cleanTopicByUnRegisterRequests 方法中清理了 topicQueueTable 中无效的数据

    private void cleanTopicByUnRegisterRequests(Set<String> removedBroker, Set<String> reducedBroker) {
        Iterator<Entry<String, Map<String, QueueData>>> itMap = this.topicQueueTable.entrySet().iterator();
        while (itMap.hasNext()) { // 遍历所有的topic
            Entry<String, Map<String, QueueData>> entry = itMap.next();
    
            String topic = entry.getKey();
            // 遍历topic对应的brokerName和QueueData
            Map<String, QueueData> queueDataMap = entry.getValue();
    
            // 遍历需要移除的brokerName
            for (final String brokerName : removedBroker) {
                // 移除这个brokerName对应的QueueData
                final QueueData removedQD = queueDataMap.remove(brokerName);
                if (removedQD != null) {
                    log.debug("removeTopicByBrokerName, remove one broker's topic {} {}", topic, removedQD);
                }
            }
    
            // 如果topic对应的brokerName都移除了,那么就移除这个topic
            if (queueDataMap.isEmpty()) {
                log.debug("removeTopicByBrokerName, remove the topic all queue {}", topic);
                itMap.remove();
            }
    
            // 遍历需要减少的brokerName
            for (final String brokerName : reducedBroker) {
                final QueueData queueData = queueDataMap.get(brokerName);
    
                if (queueData != null) {
                    // 如果这个brokerName对应的brokerData开启了自动切换master的功能,那么就需要判断这个brokerName对应的brokerData中是否还有master
                    if (this.brokerAddrTable.get(brokerName).isEnableActingMaster()) {
                        // Master has been unregistered, wipe the write perm
                        // 如果这个brokerName对应的brokerData中没有master了,那么就把这个topic的写权限去掉
                        if (isNoMasterExists(brokerName)) {
                            queueData.setPerm(queueData.getPerm() & (~PermName.PERM_WRITE));
                        }
                    }
                }
            }
        }
    }
    
  6. 通知其他 broker 组更新信息

    调用了 notifyMinBrokerIdChanged 方法

    // 如果最小的brokerId发生了变化,需要通知其他broker更新路由信息
    if (!needNotifyBrokerMap.isEmpty() && namesrvConfig.isNotifyMinBrokerIdChanged()) {
        notifyMinBrokerIdChanged(needNotifyBrokerMap);
    }
    

    notifyMinBrokerIdChanged 方法中又调用了重载后的 notifyMinBrokerIdChanged 方法

    private void notifyMinBrokerIdChanged(Map<String, BrokerStatusChangeInfo> needNotifyBrokerMap)
        throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException,
        RemotingTooMuchRequestException {
        for (String brokerName : needNotifyBrokerMap.keySet()) {
            BrokerStatusChangeInfo brokerStatusChangeInfo = needNotifyBrokerMap.get(brokerName);
            BrokerData brokerData = brokerAddrTable.get(brokerName);
            if (brokerData != null && brokerData.isEnableActingMaster()) {
                notifyMinBrokerIdChanged(brokerStatusChangeInfo.getBrokerAddrs(),
                    brokerStatusChangeInfo.getOfflineBrokerAddr(), brokerStatusChangeInfo.getHaBrokerAddr());
            }
        }
    }
    

    在重载的 notifyMinBrokerIdChanged 方法中向非最小 BrokerId 的 Broker 地址通知最小 BrokerId 发生了变化

    private void notifyMinBrokerIdChanged(Map<Long, String> brokerAddrMap, String offlineBrokerAddr,
        String haBrokerAddr)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException,
        RemotingTooMuchRequestException, RemotingConnectException {
        if (brokerAddrMap == null || brokerAddrMap.isEmpty() || this.namesrvController == null) {
            return;
        }
    
        NotifyMinBrokerIdChangeRequestHeader requestHeader = new NotifyMinBrokerIdChangeRequestHeader();
        long minBrokerId = Collections.min(brokerAddrMap.keySet());
        requestHeader.setMinBrokerId(minBrokerId);
        requestHeader.setMinBrokerAddr(brokerAddrMap.get(minBrokerId));
        requestHeader.setOfflineBrokerAddr(offlineBrokerAddr);
        requestHeader.setHaBrokerAddr(haBrokerAddr);
    
        // 选择通知的 Broker 地址,即非最小 BrokerId 的 Broker 地址
        List<String> brokerAddrsNotify = chooseBrokerAddrsToNotify(brokerAddrMap, offlineBrokerAddr);
        log.info("min broker id changed to {}, notify {}, offline broker addr {}", minBrokerId, brokerAddrsNotify, offlineBrokerAddr);
        RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.NOTIFY_MIN_BROKER_ID_CHANGE, requestHeader);
        for (String brokerAddr : brokerAddrsNotify) {
            // 向 Broker 发送通知
            this.namesrvController.getRemotingClient().invokeOneway(brokerAddr, request, 300);
        }
    }
    

3.3 拼凑 TopicRouteData

首先来看 TopicRouteData 的定义,其属性如下

private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
//It could be null or empty
private Map<String/*brokerName*/, TopicQueueMappingInfo> topicQueueMappingByBroker;

拼凑 TopicRouteData 的方法是 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData,此方法可大致分为以下几个执行步骤

  1. topicQueueTable 按照主题名称查询 queueDatas

  2. 根据 queueDatas 中 brokerName 属性从 brokerAddrTable 取得 brokerData 对象,组成 brokerDatas

  3. 根据查询到的 brokerDatafilterServerTable 查询到对应的 filterServer 列表,组装为 Map

    // 从topicQueueTable按照主题名称查询queueDatas
    Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
    if (queueDataMap != null) {
        // 保存查询到的queueDatas
        topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values()));
        foundQueueData = true;
    
        Set<String> brokerNameSet = new HashSet<>(queueDataMap.keySet());
    
        // 根据queueDatas中brokerName属性从brokerAddrTable取得brokerData对象,组成brokerDatas
        for (String brokerName : brokerNameSet) {
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                continue;
            }
            BrokerData brokerDataClone = new BrokerData(brokerData);
    
            // 保存查询到的brokerDatas
            brokerDataList.add(brokerDataClone);
            foundBrokerData = true;
            if (filterServerTable.isEmpty()) {
                continue;
            }
            // 根据查询到的brokerData从filterServerTable查询到对应的filterServer列表,组装为Map
            for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(brokerDataClone.getCluster(), brokerAddr);
                List<String> filterServerList = this.filterServerTable.get(brokerAddrInfo);
                // 保存查询到的filterServer列表
                filterServerMap.put(brokerAddr, filterServerList);
            }
    
        }
    }
    
  4. 返回 topicRouteData,如果有需要会重新设置 Master 的地址

    if (foundBrokerData && foundQueueData) {
    
        topicRouteData.setTopicQueueMappingByBroker(this.topicQueueMappingInfoTable.get(topic));
    
        if (!namesrvConfig.isSupportActingMaster()) {
            return topicRouteData;
        }
    
        if (topic.startsWith(TopicValidator.SYNC_BROKER_MEMBER_GROUP_PREFIX)) {
            return topicRouteData;
        }
    
        if (topicRouteData.getBrokerDatas().size() == 0 || topicRouteData.getQueueDatas().size() == 0) {
            return topicRouteData;
        }
    
        boolean needActingMaster = false;
    
        for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
            if (brokerData.getBrokerAddrs().size() != 0
                && !brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                needActingMaster = true;
                break;
            }
        }
    
        if (!needActingMaster) {
            return topicRouteData;
        }
    
        // 如果需要自动切换master,则执行下面的逻辑
        for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
            final HashMap<Long, String> brokerAddrs = brokerData.getBrokerAddrs();
            // 如果brokerAddrs为空,或者brokerAddrs中包含masterId,或者brokerData中的enableActingMaster为false,那么就跳过
            if (brokerAddrs.size() == 0 || brokerAddrs.containsKey(MixAll.MASTER_ID) || !brokerData.isEnableActingMaster()) {
                continue;
            }
    
            // No master
            for (final QueueData queueData : topicRouteData.getQueueDatas()) {
                if (queueData.getBrokerName().equals(brokerData.getBrokerName())) {
                    // 如果queueData的perm不可写,那么就将brokerAddrs中brokerId最小的brokerAddr的brokerId改为masterId
                    if (!PermName.isWriteable(queueData.getPerm())) {
                        final Long minBrokerId = Collections.min(brokerAddrs.keySet());
                        final String actingMasterAddr = brokerAddrs.remove(minBrokerId);
                        brokerAddrs.put(MixAll.MASTER_ID, actingMasterAddr);
                    }
                    break;
                }
            }
    
        }
    
        return topicRouteData;
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/475121.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

亚马逊、沃尔玛、ebay测评出现风控、砍单、封号怎么解决?

大家好&#xff0c;我是亚马逊测评珑哥&#xff0c;提前祝各位跨境朋友五一假期愉快。 很多卖家和工作室的朋友加珑哥&#xff0c;沟通中很多朋友都在问为什么测评中一直被砍单&#xff0c;封号是什么原因&#xff1f;其实测评不是你随便买个IP&#xff0c;或者买几个买家号就…

轻松掌握mysql慢查询定位与优化知识点

在这里插入图片描述 1、利用工具定位慢sql 1、运维工具Skywalking 1、定位到慢接口 2、追踪慢sql的执行情况 2、利用MySQL的日志定位慢sql 在调式阶段才开启慢日志的查询&#xff0c;因为会损耗一些性能。 3、分析是否正确使用了索引 当我们已经定位到具体哪个sql较慢时&…

【计算几何】帝国边界划分问题【Voronoi图的原理】

一、说明 Voronoi 单元也称为泰森多边形。 Voronoi 图在许多领域都有实际和理论应用&#xff0c;主要是在科学和技术领域&#xff0c;但也在视觉艺术领域使用。Voronoi 图以数学家 Georgy Voronoy 的名字命名&#xff0c;也称为 Voronoi 镶嵌、Voronoi 分解、Voronoi 分区或 Di…

减少过拟合:暂退法

文章目录 &#xff08;一&#xff09;过拟合&#xff08;二&#xff09;暂退法 &#xff08;一&#xff09;过拟合 1.过拟合产生的原因 (1)根本原因&#xff1a; 我们都知道模型参数的优化方式&#xff1a;反向传播更新梯度&#xff0c;然后随机梯度下降。 也非常清楚模型参…

【Java笔试强训 9】

&#x1f389;&#x1f389;&#x1f389;点进来你就是我的人了博主主页&#xff1a;&#x1f648;&#x1f648;&#x1f648;戳一戳,欢迎大佬指点! 欢迎志同道合的朋友一起加油喔&#x1f93a;&#x1f93a;&#x1f93a; 目录 一、选择题 二、编程题 &#x1f525;另类加法…

弗洛伊德算法(求最短路径)

弗洛伊德算法介绍 和迪杰斯特拉算法一 样&#xff0c; 弗洛伊德(Floyd)算法也是一种用于寻找给定的加权图中顶点间最短路径的算法。弗洛伊德算法(Floyd)计算图中各个顶点之间的最短路径迪杰斯特拉算法用于计算图中某-一个顶点到其他项点的最短路径。弗洛伊德算法VS迪杰斯特拉算…

【数据库架构】PostgreSQL的最佳群集高可用性方案

如果您的系统依赖PostgreSQL数据库并且您正在寻找HA的集群解决方案&#xff0c;我们希望提前告知您这是一项复杂的任务&#xff0c;但并非不可能实现。 我们将讨论一些解决方案&#xff0c;您可以从中选择对您的容错要求。 PostgreSQL本身不支持任何多主群集解决方案&#xff0…

Python Unet ++ :医学图像分割,医学细胞分割,Unet医学图像处理,语义分割

一&#xff0c;语义分割&#xff1a;分割领域前几年的发展 图像分割是机器视觉任务的一个重要基础任务&#xff0c;在图像分析、自动驾驶、视频监控等方面都有很重要的作用。图像分割可以被看成一个分类任务&#xff0c;需要给每个像素进行分类&#xff0c;所以就比图像分类任务…

C++-FFmpeg-8-(1)基本概念与原理-rtsp-I、P、B 帧-DTS、PTS-

目录 1.rtsp是什么&#xff1f; 2. I、P、B 帧 3.DTS、PTS 4.rtsp协议抓包分析&#xff1f; 1.rtsp是什么&#xff1f; 流程&#xff1a; 鉴权&#xff1a; 2种 &#xff1a;basice64 Digest 哈希值 哈希值不可逆。nonce 做的单项散列&#xff08;MD5,SHA512&#xff0…

HTML(二) -- 表格设计

目录 1. 基本格式&#xff1a; 表格常用属性&#xff1a; 2. 表格标签 为什么使用表格&#xff1f; 简单通用、结构稳定数据显示的非常的规整、可读性非常好 1. 基本格式&#xff1a; <table style"border: 1px solid black;" border"1px">&l…

AWE2023什么值得看?智哪儿带你五大关键词读懂AWE2023

4月27至30日&#xff0c;2023年中国家电及消费电子博览会&#xff08;AWE 2023&#xff09;在上海浦东新国际博览中心开展。 作为与德国IFA、美国CES并肩的全球前三国际家电及消费电子展览会&#xff0c;时隔两年AWE终于重启。沉淀两年&#xff0c;它的规模也是历年最大&#x…

QT+OpenCV配置

QTOpenCV配置 1 下载CMake2 安装CMake3 下载OPenCV4 配置环境变量4.1 配置QT环境变量4.2 配置CMake环境变量4.3 重启电脑生效 5 CMake编译OPenCV5.1 解决报错 6 测试 1 下载CMake 链接&#xff1a;https://cmake.org/download/ 2 安装CMake 3 下载OPenCV 链接&#xff1a;htt…

本地elasticsearch中文分词器 ik分词器安装及使用

ElasticSearch 内置了分词器&#xff0c;如标准分词器、简单分词器、空白词器等。但这些分词器对我们最常使用的中文并不友好&#xff0c;不能按我们的语言习惯进行分词。 ik分词器就是一个标准的中文分词器。它可以根据定义的字典对域进行分词&#xff0c;并且支持用户配置自…

网络设备中VRRP协议和Linux服务器中keepalived的两个区别

1、什么是VRRP&#xff1f;keepalived又是什么&#xff1f; VRRP全称是Virtual Router Redundancy Protocol&#xff0c;即虚拟路由冗余协议。它的主要目的是在一个网络中提供冗余的路由。当一个三层网络设备或服务器出现故障时&#xff0c;VRRP可以确保网络仍能正常工作。VRR…

在.NET Core中正确使用HttpClient的方式

HttpClient 是 .NET Framework、.NET Core 或 .NET 5以上版本中的一个类&#xff0c;用于向 Web API 发送 HTTP 请求并接收响应。它提供了一些简单易用的方法&#xff0c;如 GET、POST、PUT 和 DELETE&#xff0c;可以很容易地构造和发送 HTTP 请求&#xff0c;并处理响应数据。…

算法之时间复杂度---数据结构

目录 前言&#xff1a; 1.时间复杂度 1.1时间复杂度的理解 1.2规模与基本操作执行次数 1.3大O渐进表示法 1.4计算基本操作的次数 2.常见的时间复杂度及其优劣比较 ❤博主CSDN&#xff1a;啊苏要学习 ▶专栏分类&#xff1a;数据结构◀ 学习数据结构是一件有趣的事情&…

2023五一数学建模B题完整模型代码【原创首发】

已经完成五一数学建模全部内容&#xff0c;大家可以文末查看&#xff01;&#xff01;供参考使用&#xff01; 摘要 随着网络购物的普及和发展&#xff0c;快递行业需求持续增长&#xff0c;对于快递公司来说&#xff0c;准确预测运输需求以及合理规划运输线路和仓库布局变得…

分享一个有意思的键盘,我们就只会ctrl c+v

先上效果图&#xff1a; 再上代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><style>* {border: 0;box-sizing: border-box;margin: 0;padding: 0;}:roo…

【pyTorch学习笔记④】PyTorch基础·中篇

文章目录 三、Numpy与Tensor3.Tensor的索引4.Tensor的广播机制5.逐元素操作6.归并操作7.比较操作8.矩阵操作9.PyTorch与Numpy的比较 相关推荐 三、Numpy与Tensor 3.Tensor的索引 &#xff08;1&#xff09;item&#xff1a;若Tensor为单元素&#xff0c;则返回标量&#xff0…

驱动开发:通过MDL映射实现多次通信

在前几篇文章中LyShark通过多种方式实现了驱动程序与应用层之间的通信&#xff0c;这其中就包括了通过运用SystemBuf缓冲区通信&#xff0c;运用ReadFile读写通信&#xff0c;运用PIPE管道通信&#xff0c;以及运用ASYNC反向通信&#xff0c;这些通信方式在应对一收一发模式的时…