23.RocketMQ之NameServer处理Broker心跳包,更新本地路由信息

news2024/9/29 21:30:04

NameServer处理Broker心跳包,更新本地路由信息

DefaultRequestProcessor继承自NettyRequestProcessor:处理各种客户端的请求,如果请求类型是为REGISTER_BROKER,则将请求转发到RouteInfoManager#regiesterBroker,主要是服务器端 或者客户端或者broker发送心跳,看下如何处理请求类型是为REGISTER_BROKER的请求吧

DefaultRequestProcessor#processRequest

java @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { ​ if (ctx != null) { log.debug("receive request, {} {} {}", request.getCode(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), request); } ​ switch (request.getCode()) { case RequestCode.PUT_KV_CONFIG: return this.putKVConfig(ctx, request); case RequestCode.GET_KV_CONFIG: return this.getKVConfig(ctx, request); case RequestCode.DELETE_KV_CONFIG: return this.deleteKVConfig(ctx, request); case RequestCode.QUERY_DATA_VERSION: return queryBrokerTopicConfig(ctx, request); //注册broker case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } //取消注册broker     case RequestCode.UNREGISTER_BROKER: return this.unregisterBroker(ctx, request); //拉取路由元信息 case RequestCode.GET_ROUTEINTO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request); case RequestCode.GET_BROKER_CLUSTER_INFO: return this.getBrokerClusterInfo(ctx, request); case RequestCode.WIPE_WRITE_PERM_OF_BROKER: return this.wipeWritePermOfBroker(ctx, request); case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: return getAllTopicListFromNameserver(ctx, request); case RequestCode.DELETE_TOPIC_IN_NAMESRV: return deleteTopicInNamesrv(ctx, request); case RequestCode.GET_KVLIST_BY_NAMESPACE: return this.getKVListByNamespace(ctx, request); case RequestCode.GET_TOPICS_BY_CLUSTER: return this.getTopicsByCluster(ctx, request); case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: return this.getSystemTopicListFromNs(ctx, request); case RequestCode.GET_UNIT_TOPIC_LIST: return this.getUnitTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: return this.getHasUnitSubTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST: return this.getHasUnitSubUnUnitTopicList(ctx, request); //更新nameConfig case RequestCode.UPDATE_NAMESRV_CONFIG: return this.updateConfig(ctx, request); //拉取nameConfig case RequestCode.GET_NAMESRV_CONFIG: return this.getConfig(ctx, request); default: break; } return null; }

注册broker,维护topic和队列信息

DefaultRequestProcessor#registerBroker

```java public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand (RegisterBrokerResponseHeader.class);

final RegisterBrokerResponseHeader responseHeader
            = (RegisterBrokerResponseHeader)response.readCustomHeader();

final RegisterBrokerRequestHeader requestHeader
            = (RegisterBrokerRequestHeader)request
                        .decodeCommandCustomHeader          
                            (RegisterBrokerRequestHeader.class);

​ if (!checksum(ctx, request, requestHeader)) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("crc32 not match"); return response; } TopicConfigSerializeWrapper topicConfigWrapper; if (request.getBody() != null) { topicConfigWrapper = TopicConfigSerializeWrapper .decode(request.getBody(), TopicConfigSerializeWrapper.class); } else { topicConfigWrapper = new TopicConfigSerializeWrapper(); topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0)); topicConfigWrapper.getDataVersion().setTimestamp(0); }

//注册broker 维护路由信息
RegisterBrokerResult result = this.namesrvController
                                .getRouteInfoManager()
                                .registerBroker(
                                    requestHeader.getClusterName(),
                                    requestHeader.getBrokerAddr(),
                                    requestHeader.getBrokerName(),
                                    requestHeader.getBrokerId(),
                                    requestHeader.getHaServerAddr(),
                                    topicConfigWrapper,
                                    null,
                                    ctx.channel());

responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
byte[] jsonValue = this.namesrvController
                        .getKvConfigManager()
                        .getKVListByNamespace
                        (NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;

} ```

RouteInfoManager#registerBroker

```java public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List filterServerList, final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { try { //加锁 this.lock.writeLock().lockInterruptibly(); //维护clusterAddrTable Set brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet (); this.clusterAddrTable.put(clusterName, brokerNames); } //将当前broker添加到集合中 brokerNames.add(brokerName);

boolean registerFirst = false;
        //维护brokerAddrTable
        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
        if (null == brokerData) {
            registerFirst = true;
            brokerData = new BrokerData
                    (clusterName, brokerName, new HashMap<Long, String>());
            this.brokerAddrTable.put(brokerName, brokerData);
        }
        //非第一次注册,更新Broker
        Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();

        Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Long, String> item = it.next();
            if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                it.remove();
            }
        }

​ String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); //维护topicQueueTable if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged (brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {

ConcurrentMap<String, TopicConfig> tcTable = 
                            topicConfigWrapper.getTopicConfigTable();

                if (tcTable != null) {
                    for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                        //HashMap<String/* topic */, List<QueueData>> topicQueueTable;
                        this.createAndUpdateQueueData(brokerName, entry.getValue());
                    }
                }
            }
        }
        //维护brokerLiveTable
        BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
            new BrokerLiveInfo(
                System.currentTimeMillis(),
                topicConfigWrapper.getDataVersion(),
                channel,
                haServerAddr));
        if (null == prevBrokerLiveInfo) {
            log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
        }
        //维护filterServerList
        if (filterServerList != null) {
            if (filterServerList.isEmpty()) {
                this.filterServerTable.remove(brokerAddr);
            } else {
                this.filterServerTable.put(brokerAddr, filterServerList);
            }
        }

​ if (MixAll.MASTERID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTERID); if (masterAddr != null) { BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr); if (brokerLiveInfo != null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("registerBroker Exception", e); } ​ return result; } ```

RouteInfoManager#createAndUpdateQueueData

```java private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) { // HashMap > topicQueueTable; //创建QueueData QueueData就是topicQueueTable对应的List中的元素 QueueData queueData = new QueueData(); queueData.setBrokerName(brokerName); queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); queueData.setReadQueueNums(topicConfig.getReadQueueNums()); queueData.setPerm(topicConfig.getPerm()); queueData.setTopicSynFlag(topicConfig.getTopicSysFlag()); //获得topicQueueTable中队列集合 List queueDataList = this.topicQueueTable.get(topicConfig.getTopicName()); //topicQueueTable为空,则直接创建一个list并将queueData添加到队列集合 if (null == queueDataList) { queueDataList = new LinkedList (); queueDataList.add(queueData); this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList); log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData); } else { //判断是否是新的队列 boolean addNewOne = true; Iterator it = queueDataList.iterator(); while (it.hasNext()) { QueueData qd = it.next();

//假如原来topicA对应的是braoker-a broker-b
            //现在新增broker-c
            //那么qd.getBrokerName().equals(brokerName) 肯定是不相等的
            //这样才能加入queueDataList

            //如果brokerName相同,代表不是新的队列
            if (qd.getBrokerName().equals(brokerName)) {
                    if (qd.equals(queueData)) {
                        addNewOne = false;
                    } else {
                        it.remove();
                    }
                }
        }
    //如果是新的队列,则添加队列到queueDataList
    if (addNewOne) {
        queueDataList.add(queueData);
    }
}

} ```

image.png

生产者&消费者如何定时拉取路由元信息?

是在哪里请求的,多久请求一次?

RocketMQ路由发现是非实时的,当Topic路由出现变化后,NameServer不会主动推送给客户端,而是由客户端定时拉取主题最新的路由.客户端指的是消费者和生产者。

发起getRouteInfoByTopic请求的是MQClientInstance。生产者和消费者都会启动MQClientInstance。

MQClientInstance会启动1个定时任务每30秒拉一次最新的路由信息。

此时会由DefaultRequestProcessor处理这些请求。请求的key是GET_ROUTEINFO_BY_TOPIC.

具体代码如下:

java case RequestCode.GET_ROUTEINFO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request);

DefaultRequestProcessor#getRouteInfoByTopic

java public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); // 调用RouteInfoManager的方法 // 从路由表topicQueueTable、brokerAddrTable、filterServerTable中 // 分别填充TopicRouteData的List<QueueData>、List<BrokerData>、filterServer TopicRouteData topicRouteData = this.namesrvController .getRouteInfoManager() .pickupTopicRouteData(requestHeader.getTopic()); //如果找到主题对应你的路由信息并且该主题为顺序消息 //则从NameServer KVConfig中获取关于顺序消息相关的配置填充路由信息 if (topicRouteData != null) { if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) { String orderTopicConf = this.namesrvController .getKvConfigManager().getKVConfig (NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, requestHeader.getTopic()); topicRouteData.setOrderTopicConf(orderTopicConf); } ​ byte[] content = topicRouteData.encode(); response.setBody(content); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } ​ response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark ("No topic route info in name server for the topic: " + requestHeader.getTopic()+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/711491.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

go语言环境安装

文章目录 环境介绍安装软件包步骤环境变量设置来一个经典的hello worldNice 最近的项目需要用到go来开发了&#xff0c;前几天就已经在看书了&#xff0c;今天是个周末&#xff0c;先在家里的机器上把环境搭好&#xff0c;特此记录一下。 环境介绍 下载地址&#xff1a;https:…

RRT 算法研究(附 Python / C++ 实现)

RRT 算法研究 参考 机器人路径规划、轨迹优化课程-第五讲-RRT算法原理和代码讲解 机器人路径规划之RRT算法(附C源码) RRT算法(快速拓展随机树)的Python实现 《基于改进RRT算法的路径规划研究》 《面向室内复杂场景的移动机器人快速路径规划算法研究》 理论基础 RRT&#xff0…

meb stm32开发

matlab1028b以上 stm32cubemx5.6.0以上 stm32-mat/target 教程与代码分享 - 知乎 安装好这些后&#xff0c;打开matlab&#xff0c;打开路径STM32-MAT\STM32 打开simulink&#xff0c;view-lib 可以看到 在STM32CUBEMX完成底层配置&#xff0c;生成ioc文件

UI的绘制流程

1.App的启动流程 每个App都是一个独立的进程&#xff0c;当一个app启动的时候&#xff0c;当前进程也被启动&#xff0c;在Android中有一个类ActivityThread&#xff0c;就是进程的初始类&#xff0c;其中main方法就是整个app的入口。ActivityThread并不是一个线程&#xff0c;…

Java并发编程中的JMM、3个基本属性、synchronized和volatile

1、Java内存模型JMM (Java Meemory Model) JMM规定&#xff0c;所有变量均存储在主内存中每个线程都有自己的工作内存&#xff0c;保存了该线程中用到的变量的主内存副本拷贝线程对变量的所有操作&#xff0c;必须在自己的工作内存中&#xff0c;不可直接读写主内存不同线程无法…

2023-6-29-第十一式代理模式

&#x1f37f;*★,*:.☆(&#xffe3;▽&#xffe3;)/$:*.★* &#x1f37f; &#x1f4a5;&#x1f4a5;&#x1f4a5;欢迎来到&#x1f91e;汤姆&#x1f91e;的csdn博文&#x1f4a5;&#x1f4a5;&#x1f4a5; &#x1f49f;&#x1f49f;喜欢的朋友可以关注一下&#xf…

C++primer(第五版)第八章(IO库)

8.1 IO库 上表中以w开头的类型和函数是C标准库为了支持使用宽字符的语言而定义的一组类型和对象来操纵wchar_t类型的数据.(然而我没有遇到过) 8.1.1 IO对象无拷贝或赋值 IO对象不能拷贝或赋值,通常用引用方式传递和返回流,由于读写一个IO对象回改变其状态,因此传递和返回的引…

Cetos7.x连接不上网络解决办法

Cetos7.x连接不上网络解决办法 Cetos7.x连接不上网络解决办法 在VM中设置网络连接为桥接&#xff0c;修改后仍无法连接网络 ##配置centos7中ens33&#xff0c;将默认的no修改为yes 启动CentOS系统&#xff0c;并打开一个连接终端会话&#xff0c;使用root登录&#xff1b;进…

tomcat多台应该怎么能设置

一个tomcat一般能处理5000-1000的并发量但是还是远远不够我们可以设置多台来满足我们的要求 首先进入tomcat目录 配置tomcat环境变量 vim /etc/profile.d/tomcat.sh 然后刷新 source /etc/profile.d/tomcat.sh 修改tomcat1里面的配置文件 然后进入tomcat1中的启动bin程序中…

Docker安装、常见命令、安装常见容器(Mysql、Redis等)

目录 一、Docker安装 二、Docker常见命令 2.1 镜像命令 2.2 容器命令 2.3 总结 2.4 容器挂载-容器卷技术 三、Docker安装mysql容器 3.1 下载镜像文件 3.2 创建实例并启动 3.3 MySQL 配置 3.4 进入容器文件系统 四、Docker安装Redis 一、Docker安装 官网安装指引&a…

SSM框架原理畅谈之SpringMVC

SpringMVC 一、Java SE Servlet标准1.1 Servlet 接口1.2 HttpServletRequest 接口1.3 HttpServletResponse 接口1.4 Cookie 对象1.5 Filter 接口1.6 HttpSession 接口 二、SpringMVC2.1 Spring MVC核心概念2.2 DispatcherServlet2.3 DispatcherServlet.init()2.4 DispatcherSer…

第三章 搜索与图论(一)——深搜,广搜,图的存储与拓扑序列

文章目录 深度优先搜索广度优先搜索树和图的存储图的深搜 拓扑序深搜练习题842. 排列数字843. n-皇后问题 广搜练习题844. 走迷宫845. 八数码 树和图的存储与遍历练习题846. 树的重心847. 图中点的层次 拓扑序练习题848. 有向图的拓扑序列 深度优先搜索 数据结构&#xff1a; …

常见排序算法详解

文章目录 前言1. 排序算法简介2 算法效率2.1 度量一个程序执行时间两种方法2.2 时间频度2.3 时间复杂度2.4 常见的时间复杂度2.5 平均和最坏时间复杂度 3. 常见排序算法详解3.1 基数排序 (Radix Sort)(1) 算法过程(2)代码实现 3.2 冒泡排序 (Bubble Sort)(1) 算法过程(2) 代码实…

2023年7月2日leetcode每日一题打卡——125.验证回文串

一、题目描述与要求 125. 验证回文串 - 力扣&#xff08;LeetCode&#xff09; 题目描述 如果在将所有大写字符转换为小写字符、并移除所有非字母数字字符之后&#xff0c;短语正着读和反着读都一样。则可以认为该短语是一个 回文串 。 字母和数字都属于字母数字字符。 给…

学习系统编程No.28【多线程概念实战】

引言&#xff1a; 北京时间&#xff1a;2023/6/29/15:33&#xff0c;刚刚更新完博客&#xff0c;目前没什么状态&#xff0c;不好趁热打铁&#xff0c;需要去睡一会会&#xff0c;昨天睡的有点迟&#xff0c;然后忘记把7点到8点30之间的4个闹钟关掉了&#xff0c;恶心了我自己…

C语言学习(三十)---枚举、位段、联合体

这几天在往实习的地方弄东西&#xff0c;比较累&#xff0c;因此没有更新&#xff0c;在几天前我们学习了内存操作函数&#xff0c;其与之前学习的字符串操作函数相比&#xff0c;适用范围更加广泛&#xff0c;大家要注意掌握学习&#xff0c;今天我们将学习枚举、位段和联合体…

闲置BROOKSTONE Rover间谍车重生记

22年春节在家&#xff0c;哪也去不了&#xff0c;收拾出来一个多年前的玩具&#xff0c;全名叫BROOKSTONE Rover revolution&#xff0c;长这个样子。 尽管是7年前的产品了&#xff0c;科技感依旧挺足 印象中能手机控制&#xff0c;并且能语音对讲。只是网上找到的安卓版应用已…

xenomai内核解析--xenomai实时线程创建流程

版权声明&#xff1a;本文为本文为博主原创文章&#xff0c;未经同意&#xff0c;禁止转载。如有错误&#xff0c;欢迎指正&#xff0c;博客地址&#xff1a;https://blog.csdn.net/qq_22654551?typeblog 文章目录 问题概述1 libCobalt中调用非实时POSIX接口2 阶段1 linux线程…

02_jQuery与Ajax

jquery jquery的作用 他是js的库 处理html,事件,实现动画效果,方便的为网站提供AJAX交互 命名格式 .ji:体积大,用于学习和debug使用 .min.js:压缩的文件,体积小,用于线上环境使用 使用方法 必须先在页面文件中进行引用 $就是jQuery 注意: jQuery是DOM的封装 jQuery和…

Spring Boot 中的服务网关是什么,原理,如何使用

Spring Boot 中的服务网关是什么&#xff0c;原理&#xff0c;如何使用 在微服务架构中&#xff0c;服务网关是一个非常重要的组件。它可以作为所有微服务的入口&#xff0c;负责路由、负载均衡、安全性和监控等方面的功能。Spring Boot 提供了一系列的服务网关工具&#xff0…