文章目录
- 1. 路由管理核心组件介绍
- 2. RouteInfoManager 路由表
- 3. 路由管理
- 3.1 注册 Broker
- 3.2 注销 Broker
- 3.3 拼凑 TopicRouteData
此文章基于 RocketMQ 5.1 版本进行分析,与 4.x 版本相比此文章分析的部分源码有很大的区别
1. 路由管理核心组件介绍
路由管理是指维护 Broker、Topic、Queue 和 Consumer Group 之间的对应关系,以及提供给 Producer 和 Consumer 获取这些关系的服务。路由管理涉及到以下几个核心组件:
- NameServerController:NameServer 的控制器类,负责初始化、启动和关闭 NameServer 的各个组件。
- RouteInfoManager:NameServer 的核心组件之一,负责维护 Broker、Topic、Queue 和 Consumer Group 的路由信息,以及提供查询、注册和删除的服务。
- BrokerController:Broker 的控制器类,负责初始化、启动和关闭 Broker 的各个组件。
- BrokerOuterAPI:Broker 的核心组件之一,负责与 NameServer 通信,定时向 NameServer 注册自身信息,并获取其他 Broker 的信息。
- MQClientInstance:Producer 和 Consumer 的内部实现类,负责管理 Producer 和 Consumer 的信息,并与 NameServer 和 Broker 通信。
- MQClientAPIImpl:MQClientInstance 的核心组件之一,负责封装与 NameServer 和 Broker 的通信协议,并执行相应的请求。
本文仅围绕 RouteInfoManager
对 RocketMQ 5.1 版本路由管理进行分析
2. RouteInfoManager 路由表
namesrv的路由管理主要由 RouteInfoManager
对象负责,该对象维护了一个内存中的路由表,包括以下几个部分:
-
topicQueueTable:一个 Map 结构,key是 topic 名称,value 是一个 Map 结构,key 是 brokerName,value 是 QueueData 对象
topicQueueTable 中的 broker 是一个 broker 组,包含 Master 和 Slave 两部分
private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
QueueData 类的具体属性如下表所示
属性名 含义 brokerName 存储该队列的 broker 的名称 readQueueNums 读队列的数量,即该队列下有多少个读队列,这个值在创建队列时由用户指定,通常是4 writeQueueNums 写队列的数量,即该队列下有多少个写队列,这个值在创建队列时由用户指定,通常是4 perm 队列的权限,是一个二进制数,如6代表可读写,4代表只可读,2代表只可写,0代表没有权限 topicSysFlag 系统标志 -
brokerAddrTable:一个 Map 结构,key 是 brokerName,value 是 BrokerData 对象
brokerAddrTable 中的 broker 是一个 broker 组,包含 Master 和 Slave 两部分
private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
BrokerData 类的具体属性如下表所示
属性名 含义 cluster 当前 broker 所在的集群名称 brokerName 当前 broker 的名称 brokerAddrs 当前 broker 的地址,是一个 Map,key 是 brokerId,value 是单个 broker 实例(一个 Master broker 或一个 Slave broker)的地址 zoneName 当前 broker 所在的 zone enableActingMaster 当前 broker 是否允许 slave 切换为 master,这个属性主要用于兼容老版本的 HA (High Availability) 其中 enableActingMaster 属性在 RIP 32 Slave Acting Master Mode 中被添加,表示是否启用 slave acting master 模式,用于旧版本的HA适配。HA是高可用的缩写,指的是RocketMQ的主从架构。旧版本的HA指的是没有 DLedgerController 的主从架构,这种架构下,slave 不能提供消息的发送和消费,也不能执行一些 master 才能执行的操作。如果启用了 slave acting master 模式,当 master 出现故障时,slave可以承担一些 master 的任务,比如读取消息、扫描和转发特殊消息、反向同步元数据等。这样可以提高可用性和容错性。
-
brokerLiveTable:一个 Map 结构,key 是 BrokerAddrInfo 对象,value 是 BrokerLiveInfo 对象
brokerLiveTable 中的 broker 是一个 broker 实例,如一个 Master broker 或 一个 Slave broker 实例
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
BrokerAddrInfo 类的具体属性如下表所示
属性名 含义 clusterName 当前 broker 实例所在的集群名称 brokerAddr 当前 broker 实例的地址 BrokerLiveInfo 类的具体属性如下表所示
属性名 含义 lastUpdateTimestamp broker 最后一次向 NameServer 发送心跳的时间戳。这个用来检查 broker 是否过期或者不在线 heartbeatTimeoutMillis broker 的心跳超时时间。如果 broker 在这个时间内没有发送心跳,就会被 NameServer 认为是离线的 dataVersion broker 的 topic 配置的数据版本。这个用来在 broker 注册或者重新注册时,比较和更新 NameServer 中的 topic 信息 channel broker 和 NameServer 之间的通道。这个用来进行通信和发送请求或者响应 haServerAddr broker 的高可用服务器的地址。这个用来让其他的 broker 或者消费者连接到 broker 进行数据复制或者消费 -
clusterAddrTable:一个 Map 结构,key 是 broker 所在的集群的名称,value 是 broker 名称的集合
clusterAddrTable 中的 broker 是一个 broker 系统
private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
-
filterServerTable:一个 Map 结构,key 是 BrokerAddrInfo 对象,value 是过滤器服务器地址列表。过滤器服务器用于支持消息过滤功能
filterServerTable 中的 broker 是一个 broker 实例
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
-
topicQueueMappingInfoTable:存储每个 topic 在每个 broker 上的queue信息,key为topic,value为一个map,key为brokerName,value为TopicQueueMappingInfo
private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
TopicQueueMappingInfo 类的具体属性如下表所示
属性名 含义 topic topic名称,用于标识topic scope topic的范围,全局或局部,用于区分不同的topic空间 totalQueues 总队列数,用于指定topic的分区数量 bname 所属的broker名称,用于识别托管该topic的broker epoch 用于防止旧的脏数据,每次更新时递增 dirty 是否是脏数据,用于标记该topic是否需要被删除或更新 currIdMap 逻辑ID和物理ID的映射关系,用于将topic的逻辑队列映射到物理队列
以一个 cluster 为例,上述部分表的内容如下图所示
3. 路由管理
RouteInfoManager 提供了一系列的方法来更新和查询路由表,比如:
- registerBroker:用于处理broker注册请求,向路由表中添加或更新broker数据、队列数据和过滤器服务器数据,并返回所有主题的队列数据给broker。
- unregisterBroker:用于处理broker注销请求,从路由表中删除指定集群、broker名称和broker地址对应的数据。
- scanNotActiveBroker:用于定时扫描并删除不活跃的broker数据,判断标准是上次更新时间超过2分钟。
- getAllClusterInfo:用于获取所有集群信息,返回clusterAddrTable的字符串形式。
- getTopicRouteInfo:用于获取指定主题的路由信息,返回包含broker数据和队列数据的TopicRouteData对象。如果开启了区域模式,还会根据区域名称过滤掉不属于该区域的broker数据和队列数据。
- getSystemTopicList:用于获取系统内置主题列表,比如TBW102、OFFSET_MOVED_EVENT等。
- getUnitTopics:用于获取单元化部署的主题列表,即包含“%”符号的主题。
- getHasUnitSubTopicList:用于获取存在单元化订阅组的主题列表,即订阅组名称包含“%”符号的主题。
- getHasUnitSubUnUnitTopicList:用于获取存在单元化订阅组但没有单元化部署的主题列表,即订阅组名称包含“%”符号但主题名称不包含“%”符号的主题。
3.1 注册 Broker
注册 Broker 的代码为 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker
,注册的 broker 是一个 broker 实例而非 broker 组
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final String zoneName,
final Long timeoutMillis,
final Boolean enableActingMaster,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
// ...
}
分为几个步骤
-
创建一个 RegisterBrokerResult 对象,用于返回注册结果
RegisterBrokerResult result = new RegisterBrokerResult();
-
获取写锁,防止并发修改路由信息
try { this.lock.writeLock().lockInterruptibly(); // ... } catch (Exception e) { log.error("registerBroker Exception", e); } finally { this.lock.writeLock().unlock(); }
-
更新 cluster 信息,向
clusterAddrTable
中添加 brokerName此步骤检查 cluster 是否被注册,即
clusterAddrTable
中是否存在 key 等于clusterName
-
如果
clusterAddrTable
中不存在clusterName
,则创建一个新的 Set 并将其作为 value,并赋值给brokerNames
-
如果
clusterAddrTable
中存在clusterName
,则直接返回该 value,并赋值给brokerNames
把当前 broker 的名称
brokerName
加入到集群对应的 broker 名称集合brokerNames
中Set<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>()); brokerNames.add(brokerName);
-
-
更新 broker 信息,向
brokerAddrTable
添加 BrokerData此步骤检查 broker 组是否存在,即
brokerAddrTable
中是否存在 key 等于brokerName
-
如果
brokerAddrTable
中不存在brokerName
,则根据传入的参数创建一个新的BrokerData
并添加进brokerAddrTable
中 -
如果
brokerAddrTable
中存在brokerName
,则检查存在的 broker 组是否是老版本的 HA 架构- 如果是老版本则设置
enableActingMaster
属性为 false - 否则设置
enableActingMaster
属性为 true。
enableActingMaster
属性在上述 BrokerData 属性中已经进行了详细介绍 - 如果是老版本则设置
最后设置/更新 broker 的 zoneName
boolean registerFirst = false; // 检查 broker 组是否存在 BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { // broker 组不存在 registerFirst = true; // 创建 BrokerData 对象,HashMap 中的 key 为 brokerId,value 为 brokerAddr brokerData = new BrokerData(clusterName, brokerName, new HashMap<>()); this.brokerAddrTable.put(brokerName, brokerData); } // 检查 broker 是否是老版本 // 如果是老版本,则 enableActingMaster 为 null,则将 brokerData.enableActingMaster 值设置为 false // 如果不是老版本,则将 brokerData.enableActingMaster 的值设为 enableActingMaster 的值 boolean isOldVersionBroker = enableActingMaster == null; brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster); brokerData.setZoneName(zoneName);
-
-
更新 broker 地址信息,向
brokerAddrTable
的值BrokerData
的brokerAddrsMap
中移除可能冲突的 broker 地址,再把当前 broker 信息更新到brokerAddrsMap
中此步骤有以下几部分
-
如果
brokerAddrsMap
不为空则将 prevMinBrokerId 设为brokerAddrsMap
中最小的 brokerId 值,否则设为 0 -
检查正在注册的 brokerId 是否小于
brokerAddrsMap
中的最小 brokerId。如果是,它将 isMinBrokerIdChanged 设置为 true -
从
brokerAddrsMap
中移除所有 brokerAddr 相等但是 brokerId 不相等的 broker。如果 brokerAddr 相等,但 brokerId 不相等,可能是由于从主切换重新注册,因此需要先移除旧的 broker -
将当前 broker 信息更新到
brokerAddrsMap
中
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); // 添加 brokerId 和 brokerAddr 的映射关系 boolean isMinBrokerIdChanged = false; long prevMinBrokerId = 0; // 如果 brokerAddrsMap 不为空,则获取 brokerAddrsMap 中最小的 brokerId if (!brokerAddrsMap.isEmpty()) { prevMinBrokerId = Collections.min(brokerAddrsMap.keySet()); } // 如果传入的 brokerId 小于 brokerAddrsMap 中最小的 brokerId,则将 isMinBrokerIdChanged 设置为 true if (brokerId < prevMinBrokerId) { isMinBrokerIdChanged = true; } //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT> //The same IP:PORT must only have one record in brokerAddrTable // 从 brokerAddrsMap 中移除所有 brokerAddr 相等但是 brokerId 不相等的 broker,避免在从节点切换到主节点或相反操作时,在映射中有相同IP:PORT的重复记录 brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()); // 6.两种情况直接返回 代码省略... // 更新 brokerAddrsMap 中 brokerId 和 brokerAddr 的映射关系 String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr); registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));
-
-
两种情况直接返回
-
一个 broker 实例只能注册一次,如果 brokerId 重复,说明 brokerAddr 也重复,此时需要更新 brokerAddr
如果现有 broker 的状态版本高于正在注册的 broker 的状态版本,说明现有的 broker 是更新的,不应该被覆盖,因此返回不注册/更新 broker 的结果,并将正在注册的 broker 信息从
brokerLiveTable
中移除 -
如果 broker 之前没有注册过,并且只有一个topic配置,此时返回null,表示注册失败,因为 broker 不允许只有一个 topic 配置就注册
//If Local brokerId stateVersion bigger than the registering one, String oldBrokerAddr = brokerAddrsMap.get(brokerId); /* 该代码检查正在注册的 broker 是否与现有 broker 冲突。 如果现有 broker 的状态版本高于正在注册的 broker 的状态版本,说明现有的 broker 是更新的,不应该被覆盖。 此时记录警告,并且不会注册 broker,且正在注册的 broker 将从 brokerLiveTable 中删除。 这可确保仅使用最新的 broker 信息。 */ if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) { BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr)); if (null != oldBrokerInfo) { long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion(); long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion(); if (oldStateVersion > newStateVersion) { log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " + "Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.", clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion); //Remove the rejected brokerAddr from brokerLiveTable. brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr)); return result; } } } /* 此代码检查正在注册的代理是否已在 brokerAddrsMap 中,以及 topicConfigWrapper 是否只有一个 topic 配置。 如果 broker 不在 Map 中,并且只有一个 topic 配置,那么将记录一条警告,指出由于尚未注册 broker,因此无法注册主题配置包装器。 然后,该方法返回 null,指示注册不成功。 */ if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) { log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.", topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr); return null; }
-
-
更新
topicQueueTable
和topicQueueMappingInfoTable
- 如果 broker 是第一次注册或者 topicConfig 发生了变化,则更新
topicQueueTable
中的QueueData
- 如果 broker 是第一次注册或者 broker 实例对应的数据版本和参数中的数据版本不同,则更新
topicQueueMappingInfoTable
boolean isMaster = MixAll.MASTER_ID == brokerId; // 这段代码的意义是判断一个broker是否是prime slave。 // prime slave是一个在broker组中拥有最小brokerId的slave,它可以在原来的master失败时充当master。 boolean isPrimeSlave = !isOldVersionBroker && !isMaster && brokerId == Collections.min(brokerAddrsMap.keySet()); // 如果不是普通的 slave broker,则进行如下操作 if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) { ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); // 如果 tcTable 不为空,则遍历 tcTable // 如果 broker 是第一次注册或者 topicConfig 发生了变化,则更新 topicQueueTable 中的 QueueData if (tcTable != null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion(), brokerName, entry.getValue().getTopicName())) { // 如果是第一次注册或者 topicConfig 发生了变化 final TopicConfig topicConfig = entry.getValue(); if (isPrimeSlave) { // Wipe write perm for prime slave // 擦除 prime slave 的写权限 topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE)); } // 创建/更新 topicQueueTable 中的 QueueData this.createAndUpdateQueueData(brokerName, topicConfig); } } } // 如果 broker 是第一次注册或者 broker 实例对应的数据版本和参数中的数据版本不同,则更新 topicQueueMappingInfoTable if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper); Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap(); //the topicQueueMappingInfoMap should never be null, but can be empty for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) { if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) { topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>()); } //Note asset brokerName equal entry.getValue().getBname() //here use the mappingDetail.bname topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue()); } } }
- 如果 broker 是第一次注册或者 topicConfig 发生了变化,则更新
-
更新 brokerLiveTable
// 更新 brokerLiveTable BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr); BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo, new BrokerLiveInfo( System.currentTimeMillis(), timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis, topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(), channel, haServerAddr)); if (null == prevBrokerLiveInfo) { log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr); }
-
更新 filterServerTable
// 更新 filterServerTable if (filterServerList != null) { if (filterServerList.isEmpty()) { this.filterServerTable.remove(brokerAddrInfo); } else { this.filterServerTable.put(brokerAddrInfo, filterServerList); } }
-
更新 result 中主节点的信息
/ 更新 result 中主节点的信息 if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr); BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo); if (masterLiveInfo != null) { result.setHaServerAddr(masterLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) { notifyMinBrokerIdChanged(brokerAddrsMap, null, this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr()); }
-
释放写锁,返回RegisterBrokerResult对象
finally { this.lock.writeLock().unlock(); } return result;
3.2 注销 Broker
注销 Broker 的代码为 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unregisterBroker
,注销的 broker 是一个 broker 实例而非 broker 组
在 RIP 29 Optimize RocketMQ NameServer 中添加了 BatchUnregistrationService 类用来批量注销 Broker,运行代码如下
@Override
public void run() {
while (!this.isStopped()) {
try {
final UnRegisterBrokerRequestHeader request = unregistrationQueue.take();
Set<UnRegisterBrokerRequestHeader> unregistrationRequests = new HashSet<>();
unregistrationQueue.drainTo(unregistrationRequests);
// Add polled request
unregistrationRequests.add(request);
this.routeInfoManager.unRegisterBroker(unregistrationRequests);
} catch (Throwable e) {
log.error("Handle unregister broker request failed", e);
}
}
}
在 run 方法中调用了 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unRegisterBroker
方法,此方法可分为以下几个执行步骤
-
更新 brokerLiveTable
// remove from brokerLiveTable BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddrInfo); log.info("unregisterBroker, remove from brokerLiveTable {}, {}", brokerLiveInfo != null ? "OK" : "Failed", brokerAddrInfo );
-
更新 filterServerTable
// remove from filterServerTable this.filterServerTable.remove(brokerAddrInfo);
-
更新 brokerAddrTable
-
把 brokerData 中和传入的 brokerAddr 相同的地址移除
-
经过上述操作后如果 brokerAddrs 为空,说明这个 brokerName 已经没有 broker 实例了,则将 brokerAddrTable 中 key 为传入的 brokerName 的元素删除,并将 removeBrokerName 设置为 true
boolean removeBrokerName = false; boolean isMinBrokerIdChanged = false; BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null != brokerData) { // 如果当前 brokerId 是 brokerData 中最小的 brokerId,则将 isMinBrokerIdChanged 设置为 true if (!brokerData.getBrokerAddrs().isEmpty() && unRegisterRequest.getBrokerId().equals(Collections.min(brokerData.getBrokerAddrs().keySet()))) { isMinBrokerIdChanged = true; } // 把 brokerData 中和 brokerAddr 相同的地址移除 boolean removed = brokerData.getBrokerAddrs().entrySet().removeIf(item -> item.getValue().equals(brokerAddr)); log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}", removed ? "OK" : "Failed", brokerAddrInfo ); // 如果brokerData中没有地址了,说明这个brokerName已经没有broker实例了,需要从brokerAddrTable中移除 if (brokerData.getBrokerAddrs().isEmpty()) { this.brokerAddrTable.remove(brokerName); log.info("unregisterBroker, remove name from brokerAddrTable OK, {}", brokerName ); removeBrokerName = true; } else if (isMinBrokerIdChanged) { // 如果最小的brokerId发生了变化,需要通知其他broker更新路由信息 needNotifyBrokerMap.put(brokerName, new BrokerStatusChangeInfo( brokerData.getBrokerAddrs(), brokerAddr, null)); } }
-
-
更新 clusterAddrTable
- 如果 removeBrokerName 为 true,则删除 clusterAddrTable 中 key 为传入的 clusterName 的值 Set 中和传入的 brokerName 相等的元素。如果删除元素后此时 Set 集合为空,则将此 cluster 删除。并将删除的 brokerName 添加到 removedBroker 集合中
- 如果 removeBrokerName 为 false,则将传入的 brokerName 添加到 reducedBroker 集合中
// 如果brokerName已经没有broker实例了,需要将brokerName从clusterAddrTable中移除 if (removeBrokerName) { Set<String> nameSet = this.clusterAddrTable.get(clusterName); if (nameSet != null) { boolean removed = nameSet.remove(brokerName); log.info("unregisterBroker, remove name from clusterAddrTable {}, {}", removed ? "OK" : "Failed", brokerName); if (nameSet.isEmpty()) { this.clusterAddrTable.remove(clusterName); log.info("unregisterBroker, remove cluster from clusterAddrTable {}", clusterName ); } } removedBroker.add(brokerName); } else { reducedBroker.add(brokerName); }
-
更新 topicQueueTable
调用了
cleanTopicByUnRegisterRequests
方法,将上一步更新的 removedBroker 和 reducedBroker 作为参数传入方法中// 清理topicQueueTable中的无效数据 cleanTopicByUnRegisterRequests(removedBroker, reducedBroker);
cleanTopicByUnRegisterRequests
方法中清理了 topicQueueTable 中无效的数据private void cleanTopicByUnRegisterRequests(Set<String> removedBroker, Set<String> reducedBroker) { Iterator<Entry<String, Map<String, QueueData>>> itMap = this.topicQueueTable.entrySet().iterator(); while (itMap.hasNext()) { // 遍历所有的topic Entry<String, Map<String, QueueData>> entry = itMap.next(); String topic = entry.getKey(); // 遍历topic对应的brokerName和QueueData Map<String, QueueData> queueDataMap = entry.getValue(); // 遍历需要移除的brokerName for (final String brokerName : removedBroker) { // 移除这个brokerName对应的QueueData final QueueData removedQD = queueDataMap.remove(brokerName); if (removedQD != null) { log.debug("removeTopicByBrokerName, remove one broker's topic {} {}", topic, removedQD); } } // 如果topic对应的brokerName都移除了,那么就移除这个topic if (queueDataMap.isEmpty()) { log.debug("removeTopicByBrokerName, remove the topic all queue {}", topic); itMap.remove(); } // 遍历需要减少的brokerName for (final String brokerName : reducedBroker) { final QueueData queueData = queueDataMap.get(brokerName); if (queueData != null) { // 如果这个brokerName对应的brokerData开启了自动切换master的功能,那么就需要判断这个brokerName对应的brokerData中是否还有master if (this.brokerAddrTable.get(brokerName).isEnableActingMaster()) { // Master has been unregistered, wipe the write perm // 如果这个brokerName对应的brokerData中没有master了,那么就把这个topic的写权限去掉 if (isNoMasterExists(brokerName)) { queueData.setPerm(queueData.getPerm() & (~PermName.PERM_WRITE)); } } } } } }
-
通知其他 broker 组更新信息
调用了
notifyMinBrokerIdChanged
方法// 如果最小的brokerId发生了变化,需要通知其他broker更新路由信息 if (!needNotifyBrokerMap.isEmpty() && namesrvConfig.isNotifyMinBrokerIdChanged()) { notifyMinBrokerIdChanged(needNotifyBrokerMap); }
notifyMinBrokerIdChanged
方法中又调用了重载后的notifyMinBrokerIdChanged
方法private void notifyMinBrokerIdChanged(Map<String, BrokerStatusChangeInfo> needNotifyBrokerMap) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, RemotingTooMuchRequestException { for (String brokerName : needNotifyBrokerMap.keySet()) { BrokerStatusChangeInfo brokerStatusChangeInfo = needNotifyBrokerMap.get(brokerName); BrokerData brokerData = brokerAddrTable.get(brokerName); if (brokerData != null && brokerData.isEnableActingMaster()) { notifyMinBrokerIdChanged(brokerStatusChangeInfo.getBrokerAddrs(), brokerStatusChangeInfo.getOfflineBrokerAddr(), brokerStatusChangeInfo.getHaBrokerAddr()); } } }
在重载的
notifyMinBrokerIdChanged
方法中向非最小 BrokerId 的 Broker 地址通知最小 BrokerId 发生了变化private void notifyMinBrokerIdChanged(Map<Long, String> brokerAddrMap, String offlineBrokerAddr, String haBrokerAddr) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException, RemotingTooMuchRequestException, RemotingConnectException { if (brokerAddrMap == null || brokerAddrMap.isEmpty() || this.namesrvController == null) { return; } NotifyMinBrokerIdChangeRequestHeader requestHeader = new NotifyMinBrokerIdChangeRequestHeader(); long minBrokerId = Collections.min(brokerAddrMap.keySet()); requestHeader.setMinBrokerId(minBrokerId); requestHeader.setMinBrokerAddr(brokerAddrMap.get(minBrokerId)); requestHeader.setOfflineBrokerAddr(offlineBrokerAddr); requestHeader.setHaBrokerAddr(haBrokerAddr); // 选择通知的 Broker 地址,即非最小 BrokerId 的 Broker 地址 List<String> brokerAddrsNotify = chooseBrokerAddrsToNotify(brokerAddrMap, offlineBrokerAddr); log.info("min broker id changed to {}, notify {}, offline broker addr {}", minBrokerId, brokerAddrsNotify, offlineBrokerAddr); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_MIN_BROKER_ID_CHANGE, requestHeader); for (String brokerAddr : brokerAddrsNotify) { // 向 Broker 发送通知 this.namesrvController.getRemotingClient().invokeOneway(brokerAddr, request, 300); } }
3.3 拼凑 TopicRouteData
首先来看 TopicRouteData
的定义,其属性如下
private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
//It could be null or empty
private Map<String/*brokerName*/, TopicQueueMappingInfo> topicQueueMappingByBroker;
拼凑 TopicRouteData
的方法是 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData
,此方法可大致分为以下几个执行步骤
-
从
topicQueueTable
按照主题名称查询queueDatas
-
根据
queueDatas
中 brokerName 属性从brokerAddrTable
取得brokerData
对象,组成brokerDatas
-
根据查询到的
brokerData
从filterServerTable
查询到对应的 filterServer 列表,组装为 Map// 从topicQueueTable按照主题名称查询queueDatas Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic); if (queueDataMap != null) { // 保存查询到的queueDatas topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values())); foundQueueData = true; Set<String> brokerNameSet = new HashSet<>(queueDataMap.keySet()); // 根据queueDatas中brokerName属性从brokerAddrTable取得brokerData对象,组成brokerDatas for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { continue; } BrokerData brokerDataClone = new BrokerData(brokerData); // 保存查询到的brokerDatas brokerDataList.add(brokerDataClone); foundBrokerData = true; if (filterServerTable.isEmpty()) { continue; } // 根据查询到的brokerData从filterServerTable查询到对应的filterServer列表,组装为Map for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) { BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(brokerDataClone.getCluster(), brokerAddr); List<String> filterServerList = this.filterServerTable.get(brokerAddrInfo); // 保存查询到的filterServer列表 filterServerMap.put(brokerAddr, filterServerList); } } }
-
返回
topicRouteData
,如果有需要会重新设置 Master 的地址if (foundBrokerData && foundQueueData) { topicRouteData.setTopicQueueMappingByBroker(this.topicQueueMappingInfoTable.get(topic)); if (!namesrvConfig.isSupportActingMaster()) { return topicRouteData; } if (topic.startsWith(TopicValidator.SYNC_BROKER_MEMBER_GROUP_PREFIX)) { return topicRouteData; } if (topicRouteData.getBrokerDatas().size() == 0 || topicRouteData.getQueueDatas().size() == 0) { return topicRouteData; } boolean needActingMaster = false; for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) { if (brokerData.getBrokerAddrs().size() != 0 && !brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) { needActingMaster = true; break; } } if (!needActingMaster) { return topicRouteData; } // 如果需要自动切换master,则执行下面的逻辑 for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) { final HashMap<Long, String> brokerAddrs = brokerData.getBrokerAddrs(); // 如果brokerAddrs为空,或者brokerAddrs中包含masterId,或者brokerData中的enableActingMaster为false,那么就跳过 if (brokerAddrs.size() == 0 || brokerAddrs.containsKey(MixAll.MASTER_ID) || !brokerData.isEnableActingMaster()) { continue; } // No master for (final QueueData queueData : topicRouteData.getQueueDatas()) { if (queueData.getBrokerName().equals(brokerData.getBrokerName())) { // 如果queueData的perm不可写,那么就将brokerAddrs中brokerId最小的brokerAddr的brokerId改为masterId if (!PermName.isWriteable(queueData.getPerm())) { final Long minBrokerId = Collections.min(brokerAddrs.keySet()); final String actingMasterAddr = brokerAddrs.remove(minBrokerId); brokerAddrs.put(MixAll.MASTER_ID, actingMasterAddr); } break; } } } return topicRouteData; }