RocketMq架构和源码解析

news2025/4/8 18:41:19

NameServer作为注册中心,提供路由注册、路由踢出、路由发现功能,舍弃强一致,保证高可用,集群中各个节点不会实时通讯,其中一个节点下线之后,会提供另外一个节点保证路由功能。

Broker:消息中转角色,负责存储消息、转发消息。Broker服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。Broker服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

Producer:负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到Broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

Consumer:负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

除了上面说的三个核心组件外,还有Topic这个概念下面也会多次提到:

Topic:表示一类消息的集合,每个Topic包含若干条消息,每条消息只能属于一个Topic,是RocketMQ进行消息订阅的基本单位。一个Topic可以分片在多个Broker集群上,每一个Topic分片包含多个queue

 NameServer架构

 启动入口:

org.apache.rocketmq.namesrv.NamesrvController#initialize

 public boolean initialize() {
        loadConfig();
        initiateNetworkComponents();
        initiateThreadExecutors();
        registerProcessor();
        startScheduleService();
        initiateSslContext();
        initiateRpcHooks();
        return true;
 }

路由注册

Broker服务器在启动的时候会想NameServer集群中所有的NameServer发送心跳信号进行注册,并会每隔30秒向nameserver发送心跳,告诉NameServer自己活着。NameServer接收到Broker发送的心跳包之后,会记录该broker信息,并保存最近一次收到心跳包的时间。

Broker服务发送心跳包,对外提供Topic配置

org.apache.rocketmq.broker.BrokerController#start

org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll

    public void start() throws Exception {

        this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();

        if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster() || this.brokerConfig.isEnableControllerMode()) {
            isIsolated = true;
        }

        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }

        startBasicService();

        if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
            changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
            this.registerBrokerAll(true, false, true);
        }

        scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
            @Override
            public void run2() {
                try {
                    if (System.currentTimeMillis() < shouldStartTime) {
                        BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
                        return;
                    }
                    if (isIsolated) {
                        BrokerController.LOG.info("Skip register for broker is isolated");
                        return;
                    }
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    BrokerController.LOG.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));

        if (this.brokerConfig.isEnableSlaveActingMaster()) {
            scheduleSendHeartbeat();

            scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
                @Override
                public void run2() {
                    try {
                        BrokerController.this.syncBrokerMemberGroup();
                    } catch (Throwable e) {
                        BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);
                    }
                }
            }, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
        }

        if (this.brokerConfig.isEnableControllerMode()) {
            scheduleSendHeartbeat();
        }

        if (brokerConfig.isSkipPreOnline()) {
            startServiceWithoutCondition();
        }
    }


   
    public List<RegisterBrokerResult> registerBrokerAll(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final boolean oneway,
            final int timeoutMills,
            final boolean enableActingMaster,
            final boolean compressed,
            final Long heartbeatTimeoutMillis,
            final BrokerIdentity brokerIdentity) {

        final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
        List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setEnableActingMaster(enableActingMaster);
            requestHeader.setCompressed(false);
            if (heartbeatTimeoutMillis != null) {
                requestHeader.setHeartbeatTimeoutMillis(heartbeatTimeoutMillis);
            }

            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper));
            requestBody.setFilterServerList(filterServerList);
            final byte[] body = requestBody.encode(compressed);
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {
                    @Override
                    public void run2() {
                        try {
                            RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }

                            LOGGER.info("Registering current broker to name server completed. TargetHost={}", namesrvAddr);
                        } catch (Exception e) {
                            LOGGER.error("Failed to register current broker to name server. TargetHost={}", namesrvAddr, e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                });
            }

            try {
                if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
                    LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", timeoutMills);
                }
            } catch (InterruptedException ignore) {
            }
        }

        return registerBrokerResultList;
    }

NameServer注册broker信息,集群名称、broker名称、ha注册中心地址、超时时间

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker

    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final String zoneName,
        final Long timeoutMillis,
        final Boolean enableActingMaster,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            this.lock.writeLock().lockInterruptibly();

            //init or update the cluster info
            Set<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }

            boolean isOldVersionBroker = enableActingMaster == null;
            brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);
            brokerData.setZoneName(zoneName);

            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();

            boolean isMinBrokerIdChanged = false;
            long prevMinBrokerId = 0;
            if (!brokerAddrsMap.isEmpty()) {
                prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
            }

            if (brokerId < prevMinBrokerId) {
                isMinBrokerIdChanged = true;
            }

            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());

            //If Local brokerId stateVersion bigger than the registering one,
            String oldBrokerAddr = brokerAddrsMap.get(brokerId);
            if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {
                BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));

                if (null != oldBrokerInfo) {
                    long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();
                    long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();
                    if (oldStateVersion > newStateVersion) {
                        log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
                                "Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",
                            clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
                        //Remove the rejected brokerAddr from brokerLiveTable.
                        brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));
                        return result;
                    }
                }
            }

            if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {
                log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",
                    topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
                return null;
            }

            String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
            registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));

            boolean isMaster = MixAll.MASTER_ID == brokerId;
            boolean isPrimeSlave = !isOldVersionBroker && !isMaster
                && brokerId == Collections.min(brokerAddrsMap.keySet());

            if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {

                ConcurrentMap<String, TopicConfig> tcTable =
                    topicConfigWrapper.getTopicConfigTable();
                if (tcTable != null) {
                    for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                        if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
                            topicConfigWrapper.getDataVersion(), brokerName,
                            entry.getValue().getTopicName())) {
                            final TopicConfig topicConfig = entry.getValue();
                            if (isPrimeSlave) {
                                // Wipe write perm for prime slave
                                topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));
                            }
                            this.createAndUpdateQueueData(brokerName, topicConfig);
                        }
                    }
                }

                if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
                    TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
                    Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
                    //the topicQueueMappingInfoMap should never be null, but can be empty
                    for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
                        if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
                            topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
                        }
                        //Note asset brokerName equal entry.getValue().getBname()
                        //here use the mappingDetail.bname
                        topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
                    }
                }
            }

            BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
                    topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);
            }

            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddrInfo);
                } else {
                    this.filterServerTable.put(brokerAddrInfo, filterServerList);
                }
            }

            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);
                    BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);
                    if (masterLiveInfo != null) {
                        result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }

            if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {
                notifyMinBrokerIdChanged(brokerAddrsMap, null,
                    this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        } finally {
            this.lock.writeLock().unlock();
        }

        return result;
    }

路由剔除:NameServer和每个Broker保持长连接,每隔30秒接收Broker发送的心跳包,同时自身每个10秒扫描BrokerLiveTable,比较上次收到心跳时间和当前时间比较是否大于120秒,如果超过,那么认为Broker不可用,剔除路由表中该Broker相关信息。

路由发现:路由发现不是实时的,路由变化后,NameServer不主动推给客户端,等待producer定期拉取最新路由信息。这样的设计方式降低了NameServer实现的复杂性,当路由发生变化时通过在消息发送端的容错机制来保证消息发送的高可用(这块内容会在后续介绍producer消息发送时介绍,本文不展开讲解)。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1253824.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

为什么网上大量程序员卡35岁年龄招聘,而从来不报道测试、技术支持、售前售后工程师呢?

其实&#xff0c;网上只报道程序员卡35岁&#xff0c;这个说法并不成立。 而是普遍卡35岁&#xff0c;但并没有明确的一个职业类别。 随便搜一下&#xff0c;一眼望过去&#xff0c;其实已经波及很多行业了。 但如果你把IT从业人员合并报道&#xff0c;确实容易给人一种“程序…

Docker Swarm总结+service创建和部署、overlay网络以及Raft算法(2/4)

博主介绍&#xff1a;Java领域优质创作者,博客之星城市赛道TOP20、专注于前端流行技术框架、Java后端技术领域、项目实战运维以及GIS地理信息领域。 &#x1f345;文末获取源码下载地址&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&#x1f3fb;…

【Go实现】实践GoF的23种设计模式:备忘录模式

上一篇&#xff1a;【Go实现】实践GoF的23种设计模式&#xff1a;命令模式 简单的分布式应用系统&#xff08;示例代码工程&#xff09;&#xff1a;https://github.com/ruanrunxue/Practice-Design-Pattern–Go-Implementation 简介 相对于代理模式、工厂模式等设计模式&…

红队攻防实战系列一之metasploit

百目无她&#xff0c;百书质华&#xff0c;君当醒悟&#xff0c;建我中华 本文首发于先知社区&#xff0c;原创作者即是本人 前言 在红队攻防中&#xff0c;我们主要在外网进行信息收集&#xff0c;通过cms或者其他漏洞拿到shell&#xff0c;之后通过免杀木马将windows或lin…

【通讯协议】gRPC和Webhook

RPC&#xff08;Remote procedure Call&#xff09;之所以被称为“远程”&#xff0c;是因为在微服务架构下&#xff0c;当服务部署到不同的服务器上时&#xff0c;它可以实现远程服务之间的通信。从用户的角度来看&#xff0c;它的作用就像本地函数调用。 下图说明了gRPC的整…

java io 流,输入流和输出流;节点流和处理流;字节流和字符流

文章目录 java 中 IO 流分为几种?按照流的流向分&#xff0c;可以分为输入流和输出流&#xff1b;按照流的角色划分为节点流和处理流。IO流主要的分类方式有以下3种&#xff1a; java中的IO流也是工作中使用到比较频繁的一个内容&#xff0c;今天以这篇文章来了解它的概念和整…

第十七章 解读PyTorch断点训练(工具)

主要有以下几方面的内容&#xff1a; 对于多步长训练需要保存lr_schedule初始化随机数种子保存每一代最好的结果 简单详细介绍 最近在尝试用CIFAR10训练分类问题的时候&#xff0c;由于数据集体量比较大&#xff0c;训练的过程中时间比较长&#xff0c;有时候想给停下来&…

下一代图片压缩格式 AVIF

长期以来我们都在为了在网络上使用什么样的图片格式而进行纠结。我们所熟知的或者运用到 Web 应用中的图片格式无非就是 PNG、JPG、GIF、SVG 或者 WebP。 HEIC是一种图像格式&#xff0c;上线时间还比较短&#xff0c;只有4年左右。 自iOS 11和 macOS High Sierra&#xff08…

基于 Gin 的 HTTPS 代理 Demo

上次写了 基于 Gin 的 HTTP 代理 Demo 之后&#xff0c;对这方面还是蛮感兴趣的&#xff0c;所以就接着继续走下去。为了这个主题的内容&#xff0c;我斥巨资购入了一本二手的 《HTTP 权威指南》&#xff0c;因为我知道这本书里面有我想要的知识。在我还在大学的时候&#xff0…

Kerberos 高可用配置和验证

参考 https://cloud.tencent.com/developer/article/1078314 https://mp.weixin.qq.com/s?__bizMzI4OTY3MTUyNg&mid2247485861&idx1&snbb930a497f63ac5e63ed20c64643eec5 机器准备 Kerberos主 ip-172-31-22-86.ap-southeast-1.compute.internal 7.common2.hado…

美国季节性干旱数据集

美国季节性干旱数据集 美国干旱展望栅格数据集由国家气象局气候预测中心生成。它在每个月的最后一天发布&#xff0c;提供下个月的干旱前景信息。“美国季节性干旱展望”数据集每月发布一次&#xff0c;特别是每月的第三个星期四。该数据集对美国不同地区发生干旱的可能性进行…

Linux加强篇005-用户身份与文件权限

目录 前言 1. 用户身份与能力 2. 文件权限与归属 3. 文件的特殊权限 4. 文件的隐藏属性 5. 文件访问控制列表 6. su命令与sudo服务 前言 悟已往之不谏&#xff0c;知来者之可追。实迷途其未远&#xff0c;觉今是而昨非。舟遥遥以轻飏&#xff0c;风飘飘而吹衣。问征夫以…

AIGC创作系统ChatGPT网站源码、支持最新GPT-4-Turbo模型、GPT-4图片对话能力+搭建部署教程

一、AI创作系统 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI…

居家适老化设计第三十一条---卫生间水龙头

以上产品图片均来源于淘宝 侵权联系删除 居家适老化中&#xff0c;水龙头是一个非常重要的设备。水龙头的选择应该考虑到老年人的特点和需求。首先&#xff0c;水龙头的操作应该简单方便&#xff0c;老年人手部灵活性可能不如年轻人&#xff0c;因此水龙头应该设计成易于转动和…

计算机网络常考计算题之循环冗余校验(宝典教学)

文章目录 奇偶效验循环冗余校验例题四步走另一种题型 本文讲述了计算机考研中易出现的循环冗余校验&#xff0c;点赞关注收藏不迷路哦 我是一名双非计算机本科生&#xff0c;希望我的文章可以帮助到你。 奇偶效验 奇偶校验&#xff1a;也可以检测数据在传输过程中是否出现错误…

Ceph----RBD块存储的使用:详细实践过程实战版

RBD 方式的 工作 流程&#xff1a; 1、客户端创建一个pool&#xff0c;并指定pg数量&#xff0c;创建 rbd 设备并map 到文件系统&#xff1b; 2、用户写入数据&#xff0c;ceph进行对数据切块&#xff0c;每个块的大小默认为 4M&#xff0c;每个 块名字是 object序号&#xff…

【UnLua】在 Lua 中定义 UE 反射类型

【UnLua】在 Lua 中定义 UE 反射类型 用法 启动编辑器时遍历 Defines 目录下 lua 脚本来加载 UE 反射类型&#xff08;开个临时的 Lua VM 即可&#xff09;直接像 -- define a uenum in lua UEnum.EEnumGuestSomethingElse {Value1 1;Value2 2; }-- use it like a native …

算法基础之单链表

单链表 核心思想&#xff1a; 用数组模拟链表(new节点非常慢 用数组模拟快) e[N] 表示节点value ne[N]表示next指针指向 (空节点为-1) #include<iostream>using namespace std;const int N100010;//head头结点的指针//e[N] 表示节点value ne[N]表示next指针指向 //idx…

聚簇索引和非聚簇索引的区别;什么是回表

聚簇索引和非聚簇索引的区别 什么是聚簇索引&#xff1f;&#xff08;重点&#xff09; 聚簇索引就是将数据(一行一行的数据)跟索引结构放到一块&#xff0c;InnoDB存储引擎使用的就是聚簇索引&#xff1b; 注意点&#xff1a; 1、InnoDB使用的是聚簇索引&#xff08;聚簇索…

学习.NET验证模块FluentValidation的基本用法(续2:其它常见用法)

FluentValidation模块支持调用When和Unless函数设置验证规则的执行条件&#xff0c;其中when函数设置的是满足条件时执行&#xff0c;而Unless函数则是满足条件时不执行&#xff0c;这两个函数的使用示例如及效果如下所示&#xff1a; public AppInfoalidator() {RuleFor(x>…