深入解析 RocketMQ 中的 BrokerOuterAPI 组件​

news2025/4/6 9:18:48

在 RocketMQ 这一高性能分布式消息队列系统中,BrokerOuterAPI 组件犹如一座桥梁,连接着 Broker 与外部世界,在系统的运行、管理以及与其他组件交互过程中发挥着极为关键的作用。本文将深入探讨 BrokerOuterAPI 组件的内部机制、核心功能以及其在实际应用场景中的价值。​

一、BrokerOuterAPI 组件概述​

BrokerOuterAPI 并非一个孤立的模块,而是一组封装了 Broker 对外提供服务接口的集合。它涵盖了与客户端(Producer、Consumer)、其他 Broker 以及 NameServer 等进行通信和交互的关键逻辑。通过这些接口,Broker 能够接收并处理各种请求,实现消息的发送、消费、存储管理以及集群状态同步等核心功能。

二、核心功能剖析​

1.主要属性信息

    /**
     * netty客户端的组件
     */
    private final RemotingClient remotingClient;
    /**
     * 地址
     */
    private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
    /**
     * nameServer地址
     */
    private String nameSrvAddr = null;
    /**
     * 固定大小的线程池 4-10个
     */
    private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
        new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
public class TopAddressing {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
    /**
     * nameServer的地址
     */
    private String nsAddr;
    /**
     * ws地址
     */
    private String wsAddr;
    /**
     * 单元名称
     */
    private String unitName;
}

2.核心方法

2.1fetchNameServerAddr

在 RocketMQ 中,BrokerOuterAPIfetchNameServerAddr方法在整个系统的架构和运行中扮演着不可或缺的角色。

方法功能

fetchNameServerAddr方法主要用于获取 NameServer 的地址信息。NameServer 在 RocketMQ 集群中承担着路由管理的重要职责,它维护着 Broker 的地址、主题与队列的映射关系等关键信息。Broker 需要与 NameServer 保持紧密通信,无论是注册自身信息、获取最新的路由数据,还是进行心跳检测等操作,都依赖于准确的 NameServer 地址。而fetchNameServerAddr方法正是为 Broker 提供了获取这些关键地址信息的途径。通过调用此方法,Broker 能够知晓 NameServer 的网络位置,进而建立起与 NameServer 之间的有效连接,确保后续各种交互操作得以顺利进行。

方法调用时机
  1. Broker 启动阶段:当 Broker 启动时,它首先需要知道 NameServer 的地址,以便能够向其注册自身信息并获取初始的路由数据。此时,Broker 会调用fetchNameServerAddr方法来获取 NameServer 的地址。例如,在一个新搭建的 RocketMQ 集群中,各个 Broker 节点在启动过程中,会通过此方法获取到预先配置或动态发现的 NameServer 地址,然后与 NameServer 建立连接,完成注册流程,使得自身能够被纳入到整个集群的管理体系中。

  2. 地址变更或重连场景:在 RocketMQ 集群运行过程中,可能会出现 NameServer 地址变更的情况,比如由于集群的扩容、网络架构调整等原因,NameServer 的地址发生了改变。或者当 Broker 与 NameServer 之间的连接因为网络故障等原因断开时,Broker 需要重新连接到 NameServer。在这些场景下,Broker 会再次调用fetchNameServerAddr方法,以获取最新有效的 NameServer 地址,从而重新建立连接,恢复与 NameServer 之间的通信,保证系统的正常运行。

代码


    //获取NameServer的地址
    public String fetchNameServerAddr() {
        try {
            //获取到nameServer的地址
            String addrs = this.topAddressing.fetchNSAddr();
            if (addrs != null && !UtilAll.isBlank(addrs)) {
                if (!addrs.equals(this.nameSrvAddr)) {
                    log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, addrs);
                    this.updateNameServerAddressList(addrs);
                    this.nameSrvAddr = addrs;
                    return nameSrvAddr;
                }
            }
        } catch (Exception e) {
            log.error("fetchNameServerAddr Exception", e);
        }
        return nameSrvAddr;
    }

    //更新nameServer的地址
    public void updateNameServerAddressList(final String addrs) {
        List<String> lst = new ArrayList<String>();
        String[] addrArray = addrs.split(";");
        for (String addr : addrArray) {
            lst.add(addr);
        }
        //针对remotingClient 更新nameServer的地址
        this.remotingClient.updateNameServerAddressList(lst);
    }

2.2 registerBrokerAll

在 RocketMQ 里,BrokerOuterAPI中的registerBrokerAll方法是实现 Broker 在集群中注册与信息同步的关键。下面我将详细为你介绍它的功能、工作流程以及在集群管理中的重要性。

方法功能

registerBrokerAll方法主要用于 Broker 向 NameServer 注册自身信息,并且会同步一些关键的配置与状态数据,以确保 NameServer 掌握整个集群的最新布局与各 Broker 详细信息。这个方法的执行,让 Producer 和 Consumer 能够通过 NameServer 获取到准确的 Broker 地址与相关属性,从而实现消息的发送与消费。

工作流程
  1. 构建注册请求数据:当 Broker 启动或者检测到自身状态有重大变化(如新增或移除消息队列等)时,会调用registerBrokerAll方法。方法内部首先会收集一系列要注册的信息,包括 Broker 的唯一标识符(brokerId)、所属集群名称(clusterName)、Broker 地址(brokerAddr)、Master Broker 地址(如果当前 Broker 是 Slave 角色)、Broker 所支持的消息类型、当前 Broker 存储的消息队列信息(包括每个主题下的队列数量与分布情况)以及 Broker 的配置参数等。

  2. 向 NameServer 发送请求:将上述构建好的注册信息封装成网络请求,通过与 NameServer 建立的网络连接,发送到 NameServer 集群中的各个节点。RocketMQ 中的 NameServer 通常以集群形式部署,以保证高可用性和负载均衡,所以registerBrokerAll方法会确保注册信息同步到所有 NameServer 节点。

  3. NameServer 处理注册请求:NameServer 接收到注册请求后,会进行一系列处理。它会检查注册信息的完整性与合法性,比如验证brokerId是否唯一、clusterName是否存在等。若信息合法,NameServer 会将 Broker 的信息更新到其内部维护的路由表中。这个路由表记录了集群中所有 Broker 的详细信息,包括它们的地址、所属集群、负责的主题与队列等,是 Producer 和 Consumer 进行消息路由的重要依据。

  4. 返回注册结果:NameServer 处理完注册请求后,会向 Broker 返回注册结果。如果注册成功,Broker 会收到确认信息,表明其已经成功在 NameServer 中注册,并且后续可以正常接收来自 Producer 和 Consumer 的请求;若注册失败,NameServer 会返回失败原因,Broker 可能需要根据错误信息进行相应调整后重新尝试注册。

代码:

 /**
     *  在这里 broker通过netty客户端组件进行向NameSever组件发起注册请求
     * @param clusterName 集群名称
     * @param brokerAddr broker地址
     * @param brokerName broker名字
     * @param brokerId brokerid
     * @param haServerAddr 高可用地址
     * @param topicConfigWrapper topic的元数据
     * @param filterServerList 过滤服务器
     * @param oneway 是否oneway请求
     * @param timeoutMills 超时时间
     * @param compressed 是否启用压缩
     * @return
     */
    public List<RegisterBrokerResult> registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills,
        final boolean compressed) {

        //初始化了一个List集合 用来存放Broker的注册结果的返回值
        final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
        //获取nameServerAddressList的集合
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();

        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
            //构建注册请求的请求头信息
            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setCompressed(compressed);
            //构建注册请求的请求体信息
            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            final byte[] body = requestBody.encode(compressed);
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);

            //这块进行搞了一个CountDownLatch 只有向所有的NameServer注册完成之后才能继续执行
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(() -> {
                    try {
                        //真正的执行注册的操作
                        RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                        if (result != null) {
                            registerBrokerResultList.add(result);
                        }
                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }

            try {
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
        }

        return registerBrokerResultList;
    }

    private RegisterBrokerResult registerBroker(
        final String namesrvAddr,
        final boolean oneway,
        final int timeoutMills,
        final RegisterBrokerRequestHeader requestHeader,
        final byte[] body
    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
        InterruptedException {
        //下面两行代码将请求头和请求体封装到RemotingCommand中
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
        request.setBody(body);

        //这个oneway为true的时候就是单向发送请求,不需要等待响应 属于一种特殊情况
        if (oneway) {
            try {
                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
            } catch (RemotingTooMuchRequestException e) {
                // Ignore
            }
            return null;
        }
        // 真正执行发送请求的代码
        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                RegisterBrokerResponseHeader responseHeader =
                    (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                RegisterBrokerResult result = new RegisterBrokerResult();
                result.setMasterAddr(responseHeader.getMasterAddr());
                result.setHaServerAddr(responseHeader.getHaServerAddr());
                if (response.getBody() != null) {
                    result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
                }
                return result;
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
    }

2.3unregisterBrokerAll

在 RocketMQ 的 BrokerOuterAPI 组件里,unregisterBrokerAll方法在集群管理方面扮演着特殊且重要的角色。它主要用于从 NameServer 中注销特定 Broker 的所有相关信息,通常在 Broker 节点需要从集群中彻底移除或进行重大变更时被调用。

方法功能
  1. 信息移除unregisterBrokerAll方法会向 NameServer 发起请求,将该 Broker 在 NameServer 维护的路由表中所有与之相关的记录删除。这包括 Broker 的地址、所属集群名称、Broker 角色(主节点 Master 或从节点 Slave),以及该 Broker 所负责的主题与队列等信息。NameServer 依靠这些信息来引导生产者(Producer)和消费者(Consumer)与正确的 Broker 节点进行通信,当 Broker 通过此方法注销后,NameServer 会更新内部数据结构,使得其他组件不再能通过 NameServer 找到该 Broker 的相关信息。

  2. 关联资源清理:在 Broker 自身内部,该方法会触发一系列关联资源的清理操作。例如,它会关闭与其他 Broker 节点用于数据同步的网络连接,停止对消息存储相关资源的维护(如关闭一些文件句柄、释放缓存资源等),因为该 Broker 即将不再参与集群的数据处理和存储工作。同时,Broker 也会清理本地维护的与其他组件(如 Producer、Consumer)交互的会话信息等。

方法调用时机
  1. Broker 正常下线:当运维人员计划对某个 Broker 节点进行硬件升级、软件版本更新等操作,需要将该 Broker 从集群中暂时移除时,会调用unregisterBrokerAll方法。在操作完成且确保 Broker 符合上线条件后,再通过registerBroker方法重新注册到集群中。这样可以保证在 Broker 下线期间,不会有新的请求被路由到该节点,避免出现服务中断或数据不一致问题。

  2. Broker 故障处理:如果某个 Broker 节点出现严重故障,无法正常提供服务,并且短时间内难以修复,为了保障整个集群的稳定性和可用性,会立即调用unregisterBrokerAll方法将其从集群中注销。与此同时,集群中的其他 Broker 节点会根据配置和相关机制,接管故障 Broker 原本负责的部分工作,例如从节点(Slave)可能会切换为主节点(Master),继续提供消息存储和服务。

代码:

//broker的下线请求
    public void unregisterBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId
    ) {
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null) {
            for (String namesrvAddr : nameServerAddressList) {
                try {
                    this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
                    log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
                } catch (Exception e) {
                    log.warn("unregisterBroker Exception, {}", namesrvAddr, e);
                }
            }
        }
    }

    public void unregisterBroker(
        final String namesrvAddr,
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId
    ) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_BROKER, requestHeader);

        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
    }

三、总结与展望​

BrokerOuterAPI 组件作为 RocketMQ 中 Broker 对外交互的窗口,承载了消息发送、消费以及集群管理等核心功能,是 RocketMQ 能够高效、可靠运行的重要基石。深入理解 BrokerOuterAPI 的内部机制和功能,有助于开发者更好地优化 RocketMQ 的应用,提升分布式系统的性能和稳定性。随着分布式技术的不断发展和应用场景的日益复杂,相信 BrokerOuterAPI 组件也将持续演进,为 RocketMQ 在更多领域的广泛应用提供有力支撑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2329121.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot2.7.x整合nacos+seata

1、nacos及seata下载地址 Nacos Server 下载 | Nacos 官网 Seata Java Download | Apache Seata 这里的seata版本用2.1.0版本。 2、启动nacos D:\本地-seata-nacos\nacos-server\bin>startup.cmd -m standalone 3、修改seata的conf下的application的内容 这里的数据库…

为 IDEA 设置管理员权限

IDEA 安装目录 兼容性选择管理员身份运行程序 之后 IDEA 中的操作&#xff08;包括终端中的操作&#xff09;都是管理员权限的了

单片机学习笔记8.定时器

IAP15W4K58S4定时/计数器结构工作原理 定时器工作方式控制寄存器TMOD 不能进行位寻址&#xff0c;只能对整个寄存器进行赋值 寄存器地址D7D6D5D4D3D2D1D0复位值TMOD89HGATEC/(T低电平有效)M1M0GATEC/(T低电平有效)M1M000000000B D0-D3为T0控制&#xff0c;D4-D7为T1控制 GAT…

vue3实现markdown预览和编辑

Markdown作为一种轻量级标记语言&#xff0c;已经成为开发者编写文档的首选工具之一。在Vue3项目中集成Markdown编辑和预览功能可以极大地提升内容管理体验。本文将介绍如何使用Vditor这一强大的开源Markdown编辑器在Vue3项目中实现这一功能。 一、Vditor简介 Vditor是一款浏…

高并发秒杀系统接入层如何设计

博主介绍&#xff1a;✌全网粉丝5W&#xff0c;全栈开发工程师&#xff0c;从事多年软件开发&#xff0c;在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战&#xff0c;博主也曾写过优秀论文&#xff0c;查重率极低&#xff0c;在这方面有丰富的经验…

C++异常处理 throw try catch

C 异常处理概述 C 异常处理机制提供了一种在程序运行时捕获错误或异常情况的方式。异常处理的目的是使得程序在遇到错误时能够优雅地终止或恢复&#xff0c;并防止程序出现崩溃。C 使用 try, throw, 和 catch 关键字来实现异常处理。 异常处理的基本结构&#xff1a; throw: …

纯css实现环形进度条

需要在中实现一个定制化的环形进度条&#xff0c;最终效果如图&#xff1a; 使用代码 <divclass"circular-progress":style"{--progress: nextProgress,--color: endSliderColor,--size: isFull ? 60rpx : 90rpx,}"><div class"inner-conte…

0基础 | 硬件 | 电源系统 一

降压电路LDO 几乎所有LDO都是基于此拓扑结构 图 拓扑结构 LDO属于线性电源&#xff0c;通过控制开关管的导通程度实现稳压&#xff0c;输出纹波小&#xff0c;无开关噪声 线性电源&#xff0c;IoutIin&#xff0c;发热功率P电压差△U*电流I&#xff0c;转换效率Vo/Vi LDO不适…

获取KUKA机器人诊断文件KRCdiag的方法

有时候在进行售后问题时需要获取KUKA机器人的诊断文件KRCdiag&#xff0c;通过以下方法可以获取KUKA机器人的诊断文件KRCdiag&#xff1a; 1、将U盘插到控制柜内的任意一个USB接口&#xff1b; 2、依次点【主菜单】—【文件】—【存档】—【USB&#xff08;控制柜&#xff09…

一周学会Pandas2 Python数据处理与分析-NumPy数据类型

锋哥原创的Pandas2 Python数据处理与分析 视频教程&#xff1a; 2025版 Pandas2 Python数据处理与分析 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili NumPy 提供了丰富的数据类型&#xff08;dtypes&#xff09;&#xff0c;主要用于高效数值计算。以下是 NumPy 的主要…

Redis核心机制-缓存、分布式锁

目录 缓存 缓存更新策略 定期生成 实时生成 缓存问题 缓存预热&#xff08;Cache preheating&#xff09; 缓存穿透&#xff08;Cache penetration&#xff09; 缓存雪崩&#xff08;Cache avalanche&#xff09; 缓存击穿&#xff08;Cache breakdown&#xff09; 分…

如何在Ubuntu上安装Dify

如何在Ubuntu上安装Dify 如何在Ubuntu上安装docker 使用apt安装 # Add Dockers official GPG key: sudo apt-get update sudo apt-get install ca-certificates curl sudo install -m 0755 -d /etc/apt/keyrings sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg…

Python FastApi(13):APIRouter

如果你正在开发一个应用程序或 Web API&#xff0c;很少会将所有的内容都放在一个文件中。FastAPI 提供了一个方便的工具&#xff0c;可以在保持所有灵活性的同时构建你的应用程序。假设你的文件结构如下&#xff1a; . ├── app # 「app」是一个 Python 包…

【算法竞赛】状态压缩型背包问题经典应用(蓝桥杯2019A4分糖果)

在蓝桥杯中遇到的这道题&#xff0c;看上去比较普通&#xff0c;但其实蕴含了很巧妙的“状态压缩 背包”的思想&#xff0c;本文将从零到一&#xff0c;详细解析这个问题。 目录 一、题目 二、思路分析&#xff1a;状态压缩 最小覆盖 1. 本质&#xff1a;最小集合覆盖问题…

常微分方程 1

slow down and take your time 定积分应用回顾常微分方程的概述一阶微分方程可分离变量齐次方程三阶线性微分方程 一阶线性微分方程不定积分的被积分函数出现了绝对值梳理微分方程的基本概念题型 1 分离变量题型 2 齐次方程5.4 题型 3 一阶线性微分方程知识点5.55.6 尾声 定积分…

Web前端页面搭建

1.在D盘中创建www文件 cmd进入窗口命令windowsR 切换盘符d: 进入创建的文件夹 在文件夹里安装tp框架 在PS中打开tp文件 创建网站&#xff0c;根目录到public 在浏览器中打开网页 修改文件目录名称 在public目录中的。htaccess中填写下面代码 <IfModule mod_rewrite.c >…

开源 LLM 应用开发平台 Dify 全栈部署指南(Docker Compose 方案)

开源 LLM 应用开发平台 Dify 全栈部署指南&#xff08;Docker Compose 方案&#xff09; 一、部署环境要求与前置检查 1.1 硬件最低配置 组件要求CPU双核及以上内存4GB 及以上磁盘空间20GB 可用空间 1.2 系统兼容性验证 ✅ 官方支持系统&#xff1a; Ubuntu 20.04/22.04 L…

BN 层的作用, 为什么有这个作用?

BN 层&#xff08;Batch Normalization&#xff09;——这是深度神经网络中非常重要的一环&#xff0c;它大大改善了网络的训练速度、稳定性和收敛效果。 &#x1f9e0; 一句话理解 BN 层的作用&#xff1a; Batch Normalization&#xff08;批归一化&#xff09;通过标准化每一…

金仓数据库KCM认证考试介绍【2025年4月更新】

KCM&#xff08;金仓认证大师&#xff09;认证是金仓KES数据库的顶级认证&#xff0c;学员需通过前置KCA、KCP认证才能考KCM认证。 KCM培训考试一般1-2个月一次&#xff0c;KCM报名费原价为1.8万&#xff0c;当前优惠价格是1万&#xff08;趋势是&#xff1a;费用越来越高&…

如何通过句块训练法(Chunks)提升英语口语

真正说一口流利英语的人&#xff0c;并不是会造句的人&#xff0c;而是擅长“调取句块”的人。下面我们从原理、方法、场景、资源几个维度展开&#xff0c;告诉你怎么用“句块训练法&#xff08;Chunks&#xff09;”快速提升英语口语&#xff1a; 一、什么是“句块”&#xff…