RocketMQ源码

news2024/10/6 14:24:55

RocketMQ的核心三流程

  • 启动流程

RocketMQ服务端由两部分组成NameServer和Broker,NameServer是服务的注册中心,Broker会把自己的地址注册到NameServer,生产者和消费者启动的时候会先从NameServer获取Broker的地址,再去从Broker发送和接受消息。

  • 消息生产流程

Producer将消息写入到RocketMQ集群中Broker中具体的Queue。

  • 消息消费流程

Comsumer从RocketMQ集群中拉取对应的消息并进行消费确认。

NameServer源码分析

NameServer整体流程

NameServer是整个RocketMQ的“大脑”,它是RocketMQ的服务注册中心,所以RocketMQ需要先启动NameServer再启动Rocket中的Broker。
在这里插入图片描述

  • NameServer启动
    启动监听,等待Broker、Producer、Consumer连接。Broker在启动时向所有NameServer注册,生产者发送消息之前从NameServer获取Broker服务器地址列表,然后根据负载均衡算法从列表中选择一台服务器进行消息消费发送。消费者在订阅某个主题的消息之前从NameServer获取Broker服务器地址列表(有可能是集群),但是消费者选择从Broker中订阅消息,订阅规则由Broker配置决定。
  • 路由注册
    Broker启动后向所有NameServer发送路由及心跳信息。
  • 路由剔除
    一处心跳超时的Broker相关路由信息。NameServer与,每台Broker服务保持长连接,并间隔10s检查Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除。这样就可以实现RocketMQ的高可用。

NameServer启动流程

在这里插入图片描述
启动NameServer
加载KV配置
构建NRS(Netty Remoting Server)通讯,接受路由、心跳信息
构建定时任务(剔除超时的Broker)

加载KV配置

核心解读NamesrvController类中createNamesrvController()
在这里插入图片描述

构建NRS通讯接收路由、心跳信息

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

构建定时任务剔除超时Broker

在这里插入图片描述
核心控制器会启动定时任务: 每隔10s扫描一次Broker,移除不活跃的Broker。

Broker每隔30s向NameServer发送一个心跳包,心跳包包含BrokerId,Broker地址,Broker名称,Broker所属集群名称、Broker关联的FilterServer列表。但是如果Broker宕机,NameServer无法收到心跳包,此时NameServer如何来剔除这些失效的Broker呢?NameServer会每隔10s扫描brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp的时间戳距当前时间超过120s,则认为Broker失效,移除该Broker,关闭与Broker连接,同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。

在这里插入图片描述

RocketMQ有两个触发点来删除路由信息:

  • NameServer定期扫描brokerLiveTable检测上次心跳包与当前系统的时间差,如果时间超过120s,则需要移除broker。

  • Broker在正常关闭的情况下,会执行unregisterBroker指令这两种方式路由删除的方法都是一样的,都是从相关路由表中删除与该broker相关的信息。

    在消费者启动之后,第一步都要从NameServer中获取Topic相关信息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1302096.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

BEUEC品牌比例阀放大器厂家

HE-SP1/HE-SP2/HE-SP2-U/HE-MPS1/HE-MPS2/HE-MAPQ-V/HE-MAPQ-K/HE-MPT2-Y/HE-MPT2-U比例放大器适配控制各种不带位置反馈比例阀; BEUEC品牌比例放大器控制如博世力士乐(Bosch Rexroth)、伊顿威格士(EATON Vickers)、油…

shiro入门demo

搭建springboot项目&#xff0c;引入以下依赖&#xff1a; <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--单元测试--><depe…

[CIKM 2023] 基于会话推荐的双通道多稀疏图注意网络

Bi-channel Multiple Sparse Graph Attention Networks for Session-based Recommendation GitHub - QEpiphany/MSGAT: Multiple Sparse Graph Attention Networks for Session-based Recommendation 摘要 基于会话的推荐&#xff08;SBR&#xff09;最近受到了极大的关注&…

安装程序无法自动安装Virtual Machine Communication Interface Sockets(VSock)驱动程序

环境情况&#xff1a; 物理机win10系统 虚拟机windowserver08系统 vmware 16.0的版本 问题触发&#xff1a; 在虚拟机win7系统上安装vmware tools出现提示&#xff0c;报错信息“安装程序无法自动安装Virtual Machine Communication Interface Sockets&#xff08;VSock&a…

iPhone 数据恢复:iMyFone D-Back iOS

iMyFone D-Back iOS 最佳 iPhone 数据恢复&#xff0c;最好的 iPhone 数据恢复软件&#xff0c;恢复成功率最高。 直接从iOS设备、iTunes/iCloud/第三方程序备份快速恢复数据。 有选择地恢复已删除的照片、WhatsApp、消息和 18 多种其他数据类型。 仅通过 iCloud 帐户访问即可从…

jmeter 压测需要的部分配置

修改jmeter 目录的bin目录下的jmeter.properties文件 解除KeepAlive设置 修改接口的高级中的实现和超时 解除httpclient4.retrycount前的注释符并将0修改为1 即修改为&#xff1a;httpclient4.retrycount1 解除httpclient4.idletimeout前的注释符并修改为合适间隔 即修改为…

java--Arrays类、自定义排序规则Comparable、自定义比较器Comparator

1.Arrays 用来操作数组的一个工具类。 2.Arrays类提供的常见方法 3.如果数组中存储的是对象&#xff0c;如何排序 方法一&#xff1a;让该对象的类实现Comparable(比较规则)接口&#xff0c;然后重写CompareTo方法&#xff0c;自己来定制比较规则。 方法二&#xff1a;使用下…

​ 轻量应用服务器:亚马逊云科技打造全球领先的云计算解决方案

随着“第四次工业革命”的爆炸式发展&#xff0c;众多企业都将自己的业务与迅速发展的应用开发和网站建设领域高度绑定。而对于众多有上云需求的企业和个人用户来说&#xff0c;选择一款自己的服务器配置就成为了一项至关重要的任务。而随着需求端的不断扩大&#xff0c;云服务…

ssm基于HTML5的出租车管理系统论文

摘 要 网络技术和计算机技术发展至今&#xff0c;已经拥有了深厚的理论基础&#xff0c;并在现实中进行了充分运用&#xff0c;尤其是基于计算机运行的软件更是受到各界的关注。加上现在人们已经步入信息时代&#xff0c;所以对于信息的宣传和管理就很关键。因此出租车信息的管…

工业级路由器在货运物流仓储管理中的应用

工业级路由器在货运物流仓储管理中扮演着重要的角色&#xff0c;为整个物流系统提供了稳定可靠的网络连接和数据传输支持。下面将从以下几个方面介绍工业级路由器在货运物流仓储管理中的应用。 实时监控和追踪&#xff1a;工业级路由器通过与各种传感器、监控设备和物联网设备的…

〖Python网络爬虫实战㊷〗- 极验滑块介绍(四)

订阅&#xff1a;新手可以订阅我的其他专栏。免费阶段订阅量1000 python项目实战 Python编程基础教程系列&#xff08;零基础小白搬砖逆袭) 说明&#xff1a;本专栏持续更新中&#xff0c;订阅本专栏前必读关于专栏〖Python网络爬虫实战〗转为付费专栏的订阅说明作者&#xff1…

Vue项目使用Sortable.js实现拖拽功能

想了解更多-可前往 Sortable.js官网 查看组件属性及参数 安装组件&#xff08;我这里使用的是NPM安装&#xff09; npm install sortablejs --save在需要使用拖拽功能的页面中使用&#xff08;完整功能代码&#xff09; <div class"tag_box"><div class&q…

十六、机器学习进阶知识:线性回归与逻辑回归算法

文章目录 1、线性回归1.1 一元线性回归及实例1.2 多元线性回归及实例 2、逻辑回归2.1 逻辑回归与线性回归的区别2.2 逻辑函数2.3 逻辑回归的概念2.4 损失函数及参数的确定2.6 逻辑回归实例 1、线性回归 回归分析&#xff08;Regression Analysis&#xff09;是确定两种或两种以…

学习 NVIDIA Omniverse 的最基础概念

无用的前言 近两年关于 Omniverse 的宣传一直很多&#xff0c;可我一直没去了解&#xff0c;连它是个啥都不知道。最近正好有契机需要了解它&#xff0c;于是我今天抽时间看了些它的官方介绍&#xff0c;并按照自己的理解梳理在这里。 官方资料索引 Omniverse 官网主页&…

HashMap常见面试问题

简述HashMap原理&#xff1f; HashMap基于数组加链表的方式来实现&#xff0c;数组下标通过hash值来计算&#xff0c;当下表冲突时&#xff0c;就会进行遍历链表&#xff0c;当链表长度大于8的时候会转化为红黑树。 HashMap的put过程&#xff1f; put的第一步是计算hash值&a…

自定义异步任务管理器和线程

import com.lancoo.common.utils.Threads; import com.lancoo.common.utils.spring.SpringUtils;import java.util.TimerTask; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit;/*** 异步任务管理器* * author lancoo*/ public c…

apisix下自定义 Nginx 配置

apisix下自定义 Nginx 配置 在apisix配置文件/conf/config.yaml中添加nginx配置。生成的nginx.conf配置文件如下&#xff1a;说明&#xff1a; APISIX 会通过 apisix/cli/ngx_tpl.lua 这个模板和 conf/config-default.yaml 加 conf/config.yaml 的配置生成 Nginx 配置文件。 在…

如果将视频转化为gif格式图

1.选择视频转换GIF&#xff1a; 2.添加视频文件&#xff1a; 3.点击“开始”&#xff1a; 4.选择设置&#xff0c;将格式选择为1080P更加清晰&#xff1a; 5.输出后的效果图&#xff1a;

传统渠道与互联网新零售较量中:2023年之后电商如何引流裂变?

传统渠道与互联网新零售较量中&#xff1a;2023年之后电商如何引流裂变&#xff1f; 互联网新零售是指通过互联网技术和数据驱动&#xff0c;打造以消费者为中心、以线上线下融合为特点的全新零售模式&#xff0c;也是近年来电商行业转型升级的必然趋势。目前普通市场竞争激烈…

图解python | 元组

1.Python元组 Python的元组与列表类似&#xff0c;不同之处在于元组的元素不能修改。 元组使用小括号&#xff0c;列表使用方括号。 元组创建很简单&#xff0c;只需要在括号中添加元素&#xff0c;并使用逗号隔开即可。 python 复制代码 tup1 (ByteDance, ShowMeAI, 199…