RocketMQ 5.x broker注册到Nameserve源码分析

news2024/12/26 20:50:06

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

RocketMQ版本

  • 5.1.0

背景

入口

这里源码入口我们就从broker启动开始查看吧,然后慢慢到NameServer

由于不知道具体代码在哪,所以我们就漫无目的的找找看吧

想了下算了还是直接搜索registerBroker试试

我们很快在BrokerControllerstart()方法找到了

这里是有区分brokerProxy是否隔离,然后执行不同的方法
但是核心还是registerBrokerAll方法

实际我们通过查看registerBrokerAll方法的时候发现,如果执行了topic相关的更新操作,也会触发重新注册broker,这里也正常,因为要更新NameServer的路由元数据

实际在执行完方法

this.registerBrokerAll(true, false, true);

下面又马上启动了一个定时任务用于注册brokerNameServer

默认30s执行一次

可配置最大时间为60s一次

registerBrokerAll


public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
        // 组装Topic配置 即我们的topics.json
        TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();

        topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());
        topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());

        topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
            entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
        ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
        // 组装完成
        // 检查broker的权限如果不拥有可读、可写权限
        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
            ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
            for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                // 将所有topic的权限替换为broker的权限
                TopicConfig tmp =
                    new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                        topicConfig.getPerm() & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());
                topicConfigTable.put(topicConfig.getTopicName(), tmp);
            }
            topicConfigWrapper.setTopicConfigTable(topicConfigTable);
        }
        // 强制注册或者需要注册
        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills(),
            this.brokerConfig.isInBrokerContainer())) {
            // 实际的注册逻辑
            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
        }
    }

上面的代码基本上有注册了,但是我们还是整理一下逻辑

  1. 获取topics.json的配置文件的topic信息组装成要发送到Nameserve的TopicConfigAndMappingSerializeWrapper
  2. 如果broker的权限没有可读可写,就将topic的所有权限设置为broker的权限,但是这里不会去更新topics.json配置文件
  3. 判断broker是否需要注册
  4. 注册

是否需要注册broker

实际的逻辑是在

List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills, isInBrokerContainer);

我们进去看看

可以看到就是将broker的自身一些信息发送到NameServer查询是否需要注册

我们通过请求状态码QUERY_DATA_VERSION看看NameServer的处理逻辑

  • org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#queryBrokerTopicConfig

这里主要是判断brokertopic信息是否发生了变化,如果发生了则返回true

可以看到这里的注册肯定是针对后续的一些更新,我们第一次启动注册肯定是强制注册不执行这里的逻辑

false主要还是topic更新相关的请求,回去对比是否需要重新注册。

现在我们还是回到主流程,看看注册的处理逻辑

注册doRegisterBrokerAll

  • org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll

实际的逻辑被封装在BrokerOuterAPIregisterBrokerAll方法

这里的代码虽然看着很长,但是实际代码逻辑很简单
主要是组装一个RegisterBrokerRequestHeader对象,然后发送到NameServer,其中还做了一个crc32数据校验

这里还有一个编码技巧,使用了CountDownLatch并发的向多个NameServer注册,提升性能

我们进入

RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);

方法看看

里面是很标准的网络请求代码我们直接通过状态码

public static final int REGISTER_BROKER = 103;

查看NameServer那边的处理逻辑

NameServer如何处理broker的注册请求

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBroker

注册代码看着挺长的,我们重点分析下

  1. crc32数据校验
  2. V3_0_11之前的版本做兼容处理
  3. 核心注册方法封装在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker(java.lang.String, java.lang.String, java.lang.String, long, java.lang.String, java.lang.String, java.lang.Long, java.lang.Boolean, org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper, java.util.List<java.lang.String>, io.netty.channel.Channel) 里面


4. 判断是否允许读取kv配置中顺序消息topic配置

可以看到核心逻辑在3,所以我们进入到这个方法看看

代码有点长,我们来慢慢分析

public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final String zoneName,
        final Long timeoutMillis,
        final Boolean enableActingMaster,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            //加锁
            this.lock.writeLock().lockInterruptibly();

            //init or update the cluster info
            Set<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());
            brokerNames.add(brokerName);
            // 默认不是第一次注册
            boolean registerFirst = false;
            // 获取broker信息如果为空则表示为第一次注册
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                // 组装broker信息,放入brokerAddrTable中
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }

            boolean isOldVersionBroker = enableActingMaster == null;
            brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);
            brokerData.setZoneName(zoneName);

            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();

            boolean isMinBrokerIdChanged = false;
            long prevMinBrokerId = 0;
            if (!brokerAddrsMap.isEmpty()) {
                prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
            }

            if (brokerId < prevMinBrokerId) {
                isMinBrokerIdChanged = true;
            }
            // 如果ip 端口相同但是 brokerId不同则删除重复的
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());

            //If Local brokerId stateVersion bigger than the registering one,
            String oldBrokerAddr = brokerAddrsMap.get(brokerId);
            if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {
                BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));

                if (null != oldBrokerInfo) {
                    long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();
                    long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();
                    if (oldStateVersion > newStateVersion) {
                        log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
                                "Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",
                            clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
                        //Remove the rejected brokerAddr from brokerLiveTable.
                        brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));
                        return result;
                    }
                }
            }

            if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {
                log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",
                    topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
                return null;
            }

            String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
            registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));
            // 如果brokerId为0则为master
            boolean isMaster = MixAll.MASTER_ID == brokerId;
            boolean isPrimeSlave = !isOldVersionBroker && !isMaster
                && brokerId == Collections.min(brokerAddrsMap.keySet());
            // 如果 topics.config不为空并且为master,后面这个isPrimeSlave不知道是干嘛的
            if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {

                ConcurrentMap<String, TopicConfig> tcTable =
                    topicConfigWrapper.getTopicConfigTable();
                if (tcTable != null) {
                    for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                        if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
                            topicConfigWrapper.getDataVersion(), brokerName,
                            entry.getValue().getTopicName())) {
                            final TopicConfig topicConfig = entry.getValue();
                            if (isPrimeSlave) {
                                // Wipe write perm for prime slave
                                topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));
                            }
                            // 创建更新实际的消费queue
                            this.createAndUpdateQueueData(brokerName, topicConfig);
                        }
                    }
                }

                if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
                    TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
                    Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
                    //the topicQueueMappingInfoMap should never be null, but can be empty
                    for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
                        if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
                            topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
                        }
                        //Note asset brokerName equal entry.getValue().getBname()
                        //here use the mappingDetail.bname
                        topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
                    }
                }
            }
            // 构建broker信息放入brokerLiveTable
            BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
                    topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);
            }
            //filterServerList没用过
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddrInfo);
                } else {
                    this.filterServerTable.put(brokerAddrInfo, filterServerList);
                }
            }

            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);
                    BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);
                    if (masterLiveInfo != null) {
                        result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }

            if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {
                notifyMinBrokerIdChanged(brokerAddrsMap, null,
                    this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        } finally {
            this.lock.writeLock().unlock();
        }

        return result;
    }

首先我们看看第一次注册的参数

然后可以看到整体的代码虽然很长,实际的逻辑还是比较简单的

  1. 组装broker信息,放入brokerAddrTable中
  2. 创建或者更新queueData数据,也就是Map<String/* topic */, Map<String, QueueData>> topicQueueTable
  3. 更新Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable
  4. 更新broker的存活信息,即Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

可以看到主要是还是在Nameserve对broker的一些元数据做维护,比如brokertopic信息、queue信息、broker的存活信息

总结

总的来说就是broker启动后会向所有的Nameserver注册自己的相关元数据信息,然后定时发送心跳。如果执行修改topic相关的信息,也会同时更新broker和`Nameserver·上面的元数据信息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/846906.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Three.js阴影

目录 Three.js入门 Three.js光源 Three.js阴影 使用灯光后&#xff0c;场景中就会产生阴影。物体的背面确实在黑暗中&#xff0c;这称为核心阴影&#xff08;core shadow&#xff09;。我们缺少的是落下的阴影&#xff08;drop shadow&#xff09;&#xff0c;即对象在其他…

汇总当下的AI绘画模型

AI绘画从今年过年那阵儿兴起&#xff0c;到现在(2023.8)已经半年过去了&#xff0c;涌现了很多风格迥异的模型&#xff0c;我在这里简单汇总一些。 一、写实人物类 1.1 AWPortrait 比较拟真的人物肖像 1.2 XXMix_9realistic 2.5D人物模型&#xff0c;因为画面带有一丝油画的…

从零开始学习 Java:简单易懂的入门指南之API、String类(八)

常用API 1.API1.1API概述1.2如何使用API帮助文档 2.String类2.1String类概述2.2String类的特点2.3String类的构造方法2.4创建字符串对象两种方式的区别2.5字符串的比较2.5.1号的作用2.5.2equals方法的作用 2.6用户登录案例2.6.1案例需求2.6.2代码实现 2.7遍历字符串案例2.7.1案…

【SQL应知应会】表分区(四)• Oracle版

欢迎来到爱书不爱输的程序猿的博客, 本博客致力于知识分享&#xff0c;与更多的人进行学习交流 本文收录于SQL应知应会专栏,本专栏主要用于记录对于数据库的一些学习&#xff0c;有基础也有进阶&#xff0c;有MySQL也有Oracle 分区表 • Oracle版 前言一、分区表1.什么是表分区…

深入大B行业,什么是最有力的敲门砖?

引言&#xff1a;2023上半年&#xff0c; 能扛过外部环境各种变化&#xff0c; 这样的科技公司就很不容易了。 【全球云观察 &#xff5c; 热点关注】在当前后疫情时代下&#xff0c;全球经济增长处于的低增长期&#xff0c;这对所有科技企业的发展带来了直接影响。 有业内人…

【ChatGPT 指令大全】怎么使用ChatGPT来辅助学习英语

在当今全球化的社会中&#xff0c;英语已成为一门世界性的语言&#xff0c;掌握良好的英语技能对个人和职业发展至关重要。而借助人工智能的力量&#xff0c;ChatGPT为学习者提供了一个有价值的工具&#xff0c;可以在学习过程中提供即时的帮助和反馈。在本文中&#xff0c;我们…

解决VScode远程服务器时opencv和matplotlib无法直接显示图像的问题

解决VScode远程服务器时opencv和matplotlib无法直接显示图像的问题 1、本方案默认本地已经安装了VScode与MobaXterm2、在服务器端3、在本地端安装MobaXterm4、测试5、opencv显示测试&#xff08;测试过程中需保持MobaXterm开启的状态&#xff09;6、 matplotlib显示测试&#x…

【Linux】操作系统与冯诺依曼体系——深度解析(软硬件层面)

​ 前言 大家好吖&#xff0c;欢迎来到 YY 滴 Linux系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过Linux的老铁&#xff0c;从软硬件层面向大家介绍操作系统与冯诺依曼体系&#xff0c; 主要内容含&#xff1a; 欢迎订阅 YY滴Linux专栏&#xff01;更多干货持…

如何找到优质跨境电商补单资源

做跨境电商的卖家都知道&#xff0c;补单&#xff08;测评&#xff09;可以帮助他们的产品快速提升销量、评论&#xff0c;获得排名&#xff0c;打造爆款&#xff0c;但是现在市面上有大量的测评机构资源是烂资源&#xff0c;机刷&#xff0c;黑卡等一系列不法手段层出不穷&…

润和软件人才评定报名系统正式上线,培养openEuler专业生态人才

8月3日&#xff0c;江苏润和软件股份有限公司&#xff08;以下简称“润和软件”&#xff09;自主研发的人才评定报名系统正式上线运行&#xff0c;欢迎大家咨询报名&#xff01; 2022年10月&#xff0c;润和软件申请并通过了openEuler开源社区理事会评审&#xff0c;成为openE…

条件语义相似度-CSTS

C-STS: Conditional Semantic Textual Similarity 语义文本相似度&#xff08;STS&#xff09;&#xff1a;测量一对句子之间的相似程度。在本质上是一个模棱两可的任务&#xff0c;因为句子相似度取决于某一特定方面。 条件语义文本相似度&#xff08;C-STS&#xff09;&…

破解难题:精准评估研发工作量的艺术

引言 在当今的软件研发环境中&#xff0c;评估研发工作量已经成为了一个重要且不容忽视的话题。无论是研发团队的日常工作&#xff0c;还是项目的战略规划&#xff0c;都离不开对工作量的精准评估。然而&#xff0c;评估研发工作量并非易事&#xff0c;它涉及到多个方面的挑战…

MongoDB创建用户 、数据库、索引等基础操作

MongoDB的权限认证是相对来说比较复杂的&#xff0c;不同的库创建后需要创建用户来管理。 本机中的MongoDB是docker 启动的&#xff0c;所以先进入docker的镜像中 docker exec -it mongodb bash 这样就进入到了镜像MongoDB中&#xff0c;然后输入命令连接MongoDB数据库 注…

LLM - Transformer LLaMA2 结构分析与 LoRA 详解

目录 一.引言 二.图说 LLM 1.Transformer 结构 ◆ Input、Output Embedding ◆ PositionEmbedding ◆ Multi-Head-Attention ◆ ADD & Norm ◆ Feed Forward ◆ Linear & Softmax 2.不同 LLM 结构 ◆ Encoder-Only ◆ Encoder-Decoder ◆ Decoder-Only …

在线识别文字提取,好用的方法速速收下

在现代社会&#xff0c;识别文字提取已经成为了一项非常重要的技能。随着网络技术的不断发展&#xff0c;现在我们已经可以通过在线工具来识别文字并提取出所需要的信息。本文将分享一些好用的方法和注意事项&#xff0c;帮助大家更好地进行在线识别文字提取。 OCR技术 OCR技术…

来了!8月12日KCC成都站线下读书会活动诚邀您参加!

设计丨朱亿钦 相关阅读 | Related Reading 历史与今天的交融&#xff1a;KCC杭州 Meetup 圆满完成 KCC上海第二次活动读书会圆满举办&#xff01; KCC成都首次非正式闭门会圆满成功 开源社简介 开源社成立于 2014 年&#xff0c;是由志愿贡献于开源事业的个人成员&#xff0c;依…

IPTV为什么要直连光猫

最佳答案 IPTV机顶盒之所以要与光猫连接&#xff0c;而且必须用网线&#xff0c;不能用无线网络&#xff0c;这是因为运营商的业务模式决定的。单纯从技术层面来说&#xff0c;运营商的IPTV业务有组播和OTT两种模式&#xff0c;目前OTT模式基本被淘汰。 所谓的OTT模式&#x…

大学python题库及答案解析,大学python程序设计题库

本篇文章给大家谈谈大学python题库及答案解析&#xff0c;以及python期末编程题及答案&#xff0c;希望对各位有所帮助&#xff0c;不要忘了收藏本站喔。 发表时间&#xff1a;2020-07-07 一、填空题&#xff08;15分&#xff09; 使用print()函数将多个字符串’How’、’are ’…

如何使Python Docker镜像安全、快速、小巧

一、说明 在微服务领域&#xff0c;拥有安全、高效和紧凑的 Docker 映像对于成功部署至关重要。本博客将探讨有助于构建此类映像的关键因素&#xff0c;包括不以 root 用户身份运行映像的重要性、在构建映像时更新和升级包、在编写 Dockerfile 指令时考虑 Docker 的层架构&…

嵌入式虚拟仿真实验教学平台之登录注册功能使用

登录注册功能的使用 本文将介绍嵌入式虚拟仿真实验教学平台的账号如何注册以及登录账号。 注册账号 1、首先谷歌或Edge等主浏览器中输入https://app.puliedu.com/网址&#xff0c;然后会跳转到登录页&#xff0c;如下所示: 2、点击上图中框中的新注册账号&#xff0c;跳转…