这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
RocketMQ版本
- 5.1.0
背景
入口
这里源码入口我们就从broker
启动开始查看吧,然后慢慢到NameServer
由于不知道具体代码在哪,所以我们就漫无目的的找找看吧
想了下算了还是直接搜索registerBroker
试试
我们很快在BrokerController
的start()
方法找到了
这里是有区分broker
和Proxy
是否隔离,然后执行不同的方法
但是核心还是registerBrokerAll
方法
实际我们通过查看registerBrokerAll
方法的时候发现,如果执行了topic
相关的更新操作,也会触发重新注册broker
,这里也正常,因为要更新NameServer
的路由元数据
实际在执行完方法
this.registerBrokerAll(true, false, true);
下面又马上启动了一个定时任务用于注册broker
到NameServer
默认30s执行一次
可配置最大时间为60s一次
registerBrokerAll
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
// 组装Topic配置 即我们的topics.json
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();
topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());
topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
// 组装完成
// 检查broker的权限如果不拥有可读、可写权限
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
// 将所有topic的权限替换为broker的权限
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
topicConfig.getPerm() & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
// 强制注册或者需要注册
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isInBrokerContainer())) {
// 实际的注册逻辑
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
上面的代码基本上有注册了,但是我们还是整理一下逻辑
- 获取
topics.json
的配置文件的topic信息组装成要发送到Nameserve的TopicConfigAndMappingSerializeWrapper
类 - 如果
broker
的权限没有可读可写,就将topic
的所有权限设置为broker
的权限,但是这里不会去更新topics.json
配置文件 - 判断
broker
是否需要注册 - 注册
是否需要注册broker
实际的逻辑是在
List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills, isInBrokerContainer);
我们进去看看
可以看到就是将broker
的自身一些信息发送到NameServer
查询是否需要注册
我们通过请求状态码QUERY_DATA_VERSION
看看NameServer
的处理逻辑
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#queryBrokerTopicConfig
这里主要是判断broker
的topic
信息是否发生了变化,如果发生了则返回true
可以看到这里的注册肯定是针对后续的一些更新,我们第一次启动注册肯定是强制注册不执行这里的逻辑
为false
主要还是topic
更新相关的请求,回去对比是否需要重新注册。
现在我们还是回到主流程,看看注册的处理逻辑
注册doRegisterBrokerAll
org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll
实际的逻辑被封装在BrokerOuterAPI
的registerBrokerAll
方法
这里的代码虽然看着很长,但是实际代码逻辑很简单
主要是组装一个RegisterBrokerRequestHeader
对象,然后发送到NameServer
,其中还做了一个crc32
数据校验
这里还有一个编码技巧,使用了CountDownLatch
并发的向多个NameServer
注册,提升性能
我们进入
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
方法看看
里面是很标准的网络请求代码我们直接通过状态码
public static final int REGISTER_BROKER = 103;
查看NameServer
那边的处理逻辑
NameServer如何处理broker的注册请求
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBroker
注册代码看着挺长的,我们重点分析下
- crc32数据校验
- 对
V3_0_11
之前的版本做兼容处理 - 核心注册方法封装在
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker(java.lang.String, java.lang.String, java.lang.String, long, java.lang.String, java.lang.String, java.lang.Long, java.lang.Boolean, org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper, java.util.List<java.lang.String>, io.netty.channel.Channel)
里面
4. 判断是否允许读取kv配置中顺序消息topic配置
可以看到核心逻辑在3,所以我们进入到这个方法看看
代码有点长,我们来慢慢分析
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final String zoneName,
final Long timeoutMillis,
final Boolean enableActingMaster,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
//加锁
this.lock.writeLock().lockInterruptibly();
//init or update the cluster info
Set<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());
brokerNames.add(brokerName);
// 默认不是第一次注册
boolean registerFirst = false;
// 获取broker信息如果为空则表示为第一次注册
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
// 组装broker信息,放入brokerAddrTable中
brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
this.brokerAddrTable.put(brokerName, brokerData);
}
boolean isOldVersionBroker = enableActingMaster == null;
brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);
brokerData.setZoneName(zoneName);
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
boolean isMinBrokerIdChanged = false;
long prevMinBrokerId = 0;
if (!brokerAddrsMap.isEmpty()) {
prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
}
if (brokerId < prevMinBrokerId) {
isMinBrokerIdChanged = true;
}
// 如果ip 端口相同但是 brokerId不同则删除重复的
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());
//If Local brokerId stateVersion bigger than the registering one,
String oldBrokerAddr = brokerAddrsMap.get(brokerId);
if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {
BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));
if (null != oldBrokerInfo) {
long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();
long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();
if (oldStateVersion > newStateVersion) {
log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
"Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",
clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
//Remove the rejected brokerAddr from brokerLiveTable.
brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));
return result;
}
}
}
if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {
log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",
topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
return null;
}
String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));
// 如果brokerId为0则为master
boolean isMaster = MixAll.MASTER_ID == brokerId;
boolean isPrimeSlave = !isOldVersionBroker && !isMaster
&& brokerId == Collections.min(brokerAddrsMap.keySet());
// 如果 topics.config不为空并且为master,后面这个isPrimeSlave不知道是干嘛的
if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
topicConfigWrapper.getDataVersion(), brokerName,
entry.getValue().getTopicName())) {
final TopicConfig topicConfig = entry.getValue();
if (isPrimeSlave) {
// Wipe write perm for prime slave
topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));
}
// 创建更新实际的消费queue
this.createAndUpdateQueueData(brokerName, topicConfig);
}
}
}
if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
//the topicQueueMappingInfoMap should never be null, but can be empty
for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
}
//Note asset brokerName equal entry.getValue().getBname()
//here use the mappingDetail.bname
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
}
}
}
// 构建broker信息放入brokerLiveTable
BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,
new BrokerLiveInfo(
System.currentTimeMillis(),
timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);
}
//filterServerList没用过
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddrInfo);
} else {
this.filterServerTable.put(brokerAddrInfo, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);
BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);
if (masterLiveInfo != null) {
result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {
notifyMinBrokerIdChanged(brokerAddrsMap, null,
this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
} finally {
this.lock.writeLock().unlock();
}
return result;
}
首先我们看看第一次注册的参数
然后可以看到整体的代码虽然很长,实际的逻辑还是比较简单的
- 组装broker信息,放入brokerAddrTable中
- 创建或者更新queueData数据,也就是
Map<String/* topic */, Map<String, QueueData>> topicQueueTable
- 更新
Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable
- 更新broker的存活信息,即
Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
可以看到主要是还是在Nameserve对broker的一些元数据做维护,比如broker
的topic
信息、queue
信息、broker
的存活信息
总结
总的来说就是broker
启动后会向所有的Nameserver
注册自己的相关元数据信息,然后定时发送心跳。如果执行修改topic
相关的信息,也会同时更新broker
和`Nameserver·上面的元数据信息