18-RocketMQ源码解读

news2025/1/13 10:30:36

NameServer启动

1、功能回顾
NameServer的核心作用
一是维护Broker的服务地址并进行及时的更新。
二是给Producer和Consumer提供服务获取Broker列表。
2、启动流程-源码重点
整个NameServer的核心就是一个NamesrvController对象。这个controller对象就跟java Web开发中的Controller功能类似,都是响应客户端请求的。
在创建NamesrvController对象时,有两个关键的配置
NamesrvConfig 这个是NameServer自己运行需要的配置信息。
NettyServerConfig 包含Netty服务端的配置参数,默认占用了9876端口。可以在配置文件中覆盖。
然后在启动服务时,启动几个重要组件:
RemotingServer 这个就是用来响应请求的。
还有一个定时任务会定时扫描不活动的Broker。这个Broker管理是通过routeInfoManager这个功能组件。
在关闭服务时,关闭了四个东西
RemotingServer
remotingExecutor Netty服务线程池;
scheduledExecutorService 定时任务;
fileWatchService 这个是用来跟踪TLS配置的。
从启动和关闭这两个关键步骤,我们可以总结出NameServer的组件其实并不是很多,整个NameServer的结构是这样:
在这里插入图片描述
三、Broker启动
1、功能回顾
Broker是整个RocketMQ的业务核心,所有消息存储、转发这些最为重要的业务都是在Broker中进行处理的。

而Broker的内部架构,有点类似于JavaWeb开发的MVC架构。有Controller负责响应请求,各种Service
组件负责具体业务,然后还有负责消息存盘的功能模块则类似于Dao。

所以我们这一段的重点,是通过Broker的启动过程,观察总结出Broker的内部结构。
2、源码重点
Broker启动的入口在BrokerStartup这个类,可以从他的main方法开始调试。
重点也是围绕一个BrokerController对象,先创建,然后再启动。
首先:
在BrokerStartup.createBrokerController方法中可以看到Broker的几个核心配置:
BrokerConfig
NettyServerConfig :Netty服务端占用了10911端口。同样也可以在配置文件中覆盖。
NettyClientConfig
MessageStoreConfig
然后:
在BrokerController.start方法可以看到启动了一大堆Broker的核心服务,

this.messageStore.start();启动核心的消息存储组件
this.remotingServer.start();
this.fastRemotingServer.start(); 启动两个Netty服务
this.brokerOuterAPI.start();启动客户端,往外发请求
BrokerController.this.registerBrokerAll: 向NameServer注册心跳。
this.brokerStatsManager.start();
this.brokerFastFailure.start();这也是一些负责具体业务的功能组件

Broker的一个整体结构
在这里插入图片描述
四、Broker注册
1、功能回顾
Broker会在启动时向NameServer注册自己的服务信息,并且会定时的往NameServer发送心跳信息。而NameServer会维护Broker的路由列表,并对路由列表进行实时更新。
2、源码重点
BrokerController.this.registerBrokerAll方法会发起向NameServer注册心跳。启动时会立即注册,同时也会启动一个线程池,以10秒延迟,默认30秒的间隔 持续向NameServer发送心跳。

BrokerController.this.registerBrokerAll这个方法就是注册心跳的入口。
在这里插入图片描述
然后,在NameServer中也会启动一个定时任务,扫描不活动的Broker。具体观察NamesrvController.initialize方法
五、Producer
1、功能回顾
Producer有两种
一种是普通发送者:DefaultMQProducer。这个只需要构建一个Netty客户端,往Broker发送消息就行了。注意,异步回调只是在Producer接收到Broker的响应后自行调整流程,不需要提供Netty服务。
另一种是事务消息发送者: TransactionMQProducer。这个需要构建一个Netty客户端,往Broker发送消息。同时也要构建Netty服务端,供Broker回查本地事务状态。
2、源码重点
整个Producer的流程,大致分两个步骤
start方法,进行一大堆的准备工作
各种各样的send方法,进行消息发送。
首先 --Broker的核心启动流程:
在mQClientFactory的start方法中,启动了生产者的一大堆重要服务。

然后在DefaultMQProducerImpl的start方法中,又回到了生产者的mqClientFactory的启动过程,这中间有服务状态的管理。
其次:Borker路由信息的管理
Producer需要拉取Broker列表,然后跟Broker建立连接等等很多核心的流程,其实都是在发送消息时建立的。因为在启动时,还不知道要拉取哪个Topic的Broker列表呢。所以对于这个问题,我们关注的重点,不应该是start方法,而是send方法

而对NameServer的地址管理,则是散布在启动和发送的多个过程当中,并且NameServer地址可以通过一个Http服务来获取。

Send方法中,首先需要获得Topic的路由信息。这会从本地缓存中获取,如果本地缓存中没有,就从NameServer中去申请。
路由信息大致的管理流程:
在这里插入图片描述然后 关于Producer的负载均衡。
在之前介绍RocketMQ的顺序消息时,讲到了Producer的负载均衡策略,默认会把消息平均的发送到所有MessageQueue里的。

获取路由信息后,会选出一个MessageQueue去发送消息。这个选MessageQueue的方法就是一个索引自增然后取模的方式。
然后
在发送Netty请求时,实际上是指定的MessageQueue,而不是Topic。Topic只是用来找MessageQueue。

然后根据MessageQueue再找所在的Broker,往Broker发送请求。
六、消息存储
1、功能回顾
Producer把消息发到了Broker,接下来就关注下Broker接收到消息后是如何把消息进行存储的。
commitLog:消息存储目录
config:运行期间一些配置信息
consumerqueue:消息消费队列存储目录
index:消息索引文件存储目录
abort:如果存在改文件寿命Broker非正常关闭
checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerquueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。
2、源码重点:
消息存储的入口在:DefaultMessageStore.putMessage
1-commitLog写入
CommitLog的doAppend方法就是Broker写入消息的实际入口。这个方法最终会把消息追加到MappedFile映射的一块内存里,并没有直接写入磁盘。写入消息的过程是串行的,一次只会允许一个线程写入。
2-分发ConsumeQueue和IndexFile
当CommitLog写入一条消息后,在DefaultMessageStore的start方法中,会启动一个后台线程reputMessageService每隔1毫秒就会去拉取CommitLog中最新更新的一批消息,然后分别转发到ComsumeQueue和IndexFile里去,这就是他底层的实现逻辑。

并且,如果服务异常宕机,会造成CommitLog和ConsumeQueue、IndexFile文件不一致,有消息写入CommitLog后,没有分发到索引文件,这样消息就丢失了。DefaultMappedStore的load方法提供了恢复索引文件的方法,入口在load方法。
3、文件同步刷盘与异步刷盘
入口:CommitLog.putMessage -> CommitLog.handleDiskFlush
其中主要涉及到是否开启了对外内存。TransientStorePoolEnable。如果开启了堆外内存,会在启动时申请一个跟CommitLog文件大小一致的堆外内存,这部分内存就可以确保不会被交换到虚拟内存中。
4、过期文件删除
入口: DefaultMessageStore.addScheduleTask ->DefaultMessageStore.this.cleanFilesPeriodically()

默认情况下, Broker会启动后台线程,每60秒,检查CommitLog、ConsumeQueue文件。然后对超过72小时的数据进行删除。也就是说,默认情况下, RocketMQ只会保存3天内的数据。这个时间可以通过fileReservedTime来配置。注意他删除时,并不会检查消息是否被消费了。整个文件存储的核心入口入口在DefaultMessageStore的start方法中。
在这里插入图片描述5文件存储部分的总结:
RocketMQ的存储文件包括消息文件(Commitlog)、消息消费队列文件(ConsumerQueue)、Hash索引文件(IndexFile)、监测点文件(checkPoint)、abort(关闭异常文件)。单个消息存储文件、消息消费队列文件、Hash索引文件长度固定以便使用内存映射机制进行文件的读写操作。RocketMQ组织文件以文件的起始偏移量来命令文件,这样根据偏移量能快速定位到真实的物理文件。RocketMQ基于内存映射文件机制提供了同步刷盘和异步刷盘两种机制,异步刷盘是指在消息存储时先追加到内存映射文件,然后启动专门的刷盘线程定时将内存中的文件数据刷写到磁盘。

CommitLog,消息存储文件,RocketMQ为了保证消息发送的高吞吐量,采用单一文件存储所有主题消息,保证消息存储是完全的顺序写,但这样给文件读取带来了不便,为此RocketMQ为了方便消息消费构建了消息消费队列文件,基于主题与队列进行组织,同时RocketMQ为消息实现了Hash索引,可以为消息设置索引键,根据所以能够快速从CommitLog文件中检索消息。

当消息达到CommitLog后,会通过ReputMessageService线程接近实时地将消息转发给消息消费队列文件与索引文件。为了安全起见,RocketMQ引入abort文件,记录Broker的停机是否是正常关闭还是异常关闭,在重启Broker时为了保证CommitLog文件,消息消费队列文件与Hash索引文件的正确性,分别采用不同策略来恢复文件。

RocketMQ不会永久存储消息文件、消息消费队列文件,而是启动文件过期机制并在磁盘空间不足或者默认凌晨4点删除过期文件,文件保存72小时并且在删除文件时并不会判断该消息文件上的消息是否被消费。

七、消费者
1、功能回顾
消费者也是有两种,推模式消费者和拉模式消费者。消费者的使用过程也跟生产者差不多,都是先start()然后再开始消费。

消费者以消费者组的模式开展。消费者组之间有集群模式和广播模式两种消费模式。我们就要了解下这两种集群模式是如何做的逻辑封装。

然后我们关注下消费者端的负载均衡的原理。即消费者是如何绑定消费队列的。

最后我们来关注下在推模式的消费者中,MessageListenerConcurrently 和MessageListenerOrderly这两种消息监听器的处理逻辑到底有什么不同,为什么后者能保持消息顺序。

2、源码重点:
1、启动
DefaultMQPushConsumer.start方法启动过程不用太过关注,有个概念就行,然后客户端启动的核心是mQClientFactory 主要是启动了一大堆的服务。这些服务可以结合具体场景再进行深入。
例如pullMessageService主要处理拉取消息服务,
rebalanceService主要处理客户端的负载均衡
2、消息拉取:
拉模式: PullMessageService
PullRequest里有messageQueue和processQueue,其中messageQueue负责拉取消息,拉取到后,将消息存入processQueue,进行处理。 存入后就可以清空messageQueue,继续拉取了。
在这里插入图片描述3 客户端负载均衡策略
在消费者示例的start方法中,启动RebalanceService,这个是客户端进行负载均衡策略的启动服务。他只负责根据负载均衡策略获取当前客户端分配到的MessageQueue示例。
五种负载策略,可以由Consumer的allocateMessageQueueStrategy属性来选择
最常用的是AllocateMessageQueueAveragely平均分配和AllocateMessageQueueAveragelyByCircle平均轮询分配。

平均分配是把MessageQueue按组内的消费者个数平均分配。
而平均轮询分配就是把MessageQueue按组内的消费者一个一个轮询分配。

例如,六个队列q1,q2,q3,q4,q5,q6,分配给三个消费者c1,c2,c3
平均分配的结果就是: c1:{q1,q2},c2:{q3,q4},c3{q5,q6}
平均轮询分配的结果就是: c1:{q1,q4},c2:{q2,q5},c3:{q3,q6}

4 并发消费与顺序消费的过程
消费的过程依然是在DefaultMQPushConsumerImpl的 consumeMessageService中。他有两个子类ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService。其中最主要的差别是ConsumeMessageOrderlyService会在消费前把队列锁起来,优先保证拉取同一个队列里的消息。

消费过程的入口在DefaultMQPushConsumerImpl的pullMessage中定义的PullCallback中。

八、延迟消息
1、功能回顾
延迟消息的核心使用方法就是在Message中设定一个MessageDelayLevel参数,对应18个延迟级别。然后Broker中会创建一个默认的Schedule_Topic主题,这个主题下有18个队列,对应18个延迟级别。消息发过来之后,会先把消息存入Schedule_Topic主题中对应的队列。然后等延迟时间到了,再转发到目标队列,推送给消费者进行消费。
整个延迟消息的实现方式是这样的:

在这里插入图片描述
2、源码重点
延迟消息的处理入口在scheduleMessageService这个组件中。 他会在broker启动时也一起加载。

1、消息写入:
代码见CommitLog.putMessage方法。在CommitLog写入消息时,会判断消息的延迟级别,然后修改Message的Topic和Queue,达到转储Message的目的。
2、消息转储到目标Topic

这个转储的核心服务是scheduleMessageService,他也是Broker启动过程中的一个功能组件、然后ScheduleMessageService会每隔1秒钟执行一个executeOnTimeup任务,将消息从延迟队列中写入正常Topic中。 代码见ScheduleMessageService中的DeliverDelayedMessageTimerTask.executeOnTimeup方法

3 消费者部分小结:
RocketMQ消息消费方式分别为集群模式、广播模式。

消息队列负载由RebalanceService线程默认每隔20s进行一次消息队列负载,根据当前消费者组内消费者个数与主题队列数量按照某一种负载算法进行队列分配,分配原则为同一个消费者可以分配多个消息消费队列,同一个消息消费队列同一个时间只会分配给一个消费者。

消息拉取由PullMessageService线程根据RebalanceService线程创建的拉取任务进行拉取,默认每次拉取32条消息,提交给消费者消费线程后继续下一次消息拉取。如果消息消费过慢产生消息堆积会触发消息消费拉取流控。

并发消息消费指消费线程池中的线程可以并发对同一个消息队列的消息进行消费,消费成功后,取出消息队列中最小的消息偏移量作为消息消费进度偏移量存储在于消息消费进度存储文件中,集群模式消息消费进度存储在Broker(消息服务器),广播模式消息消费进度存储在消费者端。

RocketMQ不支持任意精度的定时调度消息,只支持自定义的消息延迟级别,例如1s、2s、5s等,可通过在broker配置文件中设置messageDelayLevel。

顺序消息一般使用集群模式,是指对消息消费者内的线程池中的线程对消息消费队列只能串行消费。与并发消息消费最本质的区别是消息消费时必须成功锁定消息消费队列,在Broker端会存储消息消费队列的锁占用情况。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/124950.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

3D可视化大屏是如何实现的?

3D可视化是指拥有3D效果的数据可视化,对于所要展示的数据可视化内容还原出真实场景,并实时接入数据,在面对复杂操作时灵活应对,使得整个场景在大屏上的展示更具立体、更具科技感、更具易用性。 物联网时代,可视化大屏的…

【发表案例】传感器网络及电路类,仅1个月26天录用

【期刊简介】IF:1.0-2.0,JCR4区,中科院4区 【检索情况】SCI 在检,正刊 【征稿领域】自主传感器网络的高级接口电路及其应用 【参考周期】2个月左右录用 【截稿日期】2023.1.31 重要时间节点:仅1个月26天录用 2022/12…

神经网络中常用的权重初始化方法及为何不能全初始化为0

1.权重初始化的重要性 神经网络的训练过程中的参数学习时基于梯度下降算法进行优化的。梯度下降法需要在开始训练时给每个参数赋予一个初始值。这个初始值的选取十分重要。在神经网络的训练中如果将权重全部初始化为0,则第一遍前向传播过程中,所有隐藏层…

深度学习笔记:感知机

感知机(perceptron)为神经网络的起源算法。感知机接受多个输入信号,输出一个信号。感知机信号只有0和1。 在上图的感知机中,x1和x2两个输入信号会分别乘以其对应权重(weight) w1和w2,传入神经元。神经元计算传来信号综…

Disentangled Face Attribute Editing via Instance-Aware Latent Space Search翻译

论文地址 代码地址 摘要 最近的研究表明,生成对抗网络(GAN)的潜空间中存在一组丰富的语义方向,这使得各种面部属性编辑应用成为可能。然而,现有的方法可能会遇到属性变化不好的问题,从而导致在更改所需属…

JS中数组对象使用

文章目录一、创建数组对象二、数组翻转1.检测数组2.翻转数组:三、添加数组元素1.push方法2.unshift方法四、删除数组元素1.pop方法2.shift方法🤘案例1五、数组排序六、数组索引方法1.indexof(数组元素)2.lastIndexOf方法🤟案例2七、数组转化为…

数字验证学习笔记——SystemVerilog芯片验证16 ——约束控制块随机函数

一、约束块控制 一个类可以包含多个约束块。可以把不同约束块用于不同测试。一般情况下,各个约束块之间的约束内容是互相协调不违背的,因此通过随机函数产生随机数时可以找到合适的解 如果子类继承父类,也继承了父类的约束,这个时…

基于蒙特卡诺的电动汽车充电负荷曲线研究(充电开始时间,充电电量,充电功率)(Matlab代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

【C++】 STL-vector模拟实现

文章目录vector源码的内容:成员变量默认构造函数构造函数1-无参构造构造函数2 -使用n个相同的值构造构造函数3-使用迭代器区间构造拷贝构造函数**传统写法**现代写法赋值重载函数opeartor传统写法现代写法析构函数迭代器begin & end任意类型vector容器迭代器通用遍历方式:容…

paddleOCRv3之四: rec识别部分用 tensorRT(C++)部署

文章目录1. 简介:速度测试2. paddle 模型转onnx3. onnx转为tensorRT的engine模型4. tensorRT在vs2017中的配置5. 源码1. 简介: tensorRT是nvdia GPU模型部署的一个框架,似乎只是部分开源,github地址.大多数时候用这个框架去部署模…

十九、Docker容器监控之CAdvisor+InfluxDB+Granfana

1、概述 Docker自带查询容器状态的命令:docker stats,可以看到容器的ID\名称、占用CPU、内存等信息 但是我们不能时时刻刻的盯着这个命令,并且这个都是实时数据不能留痕,如果这个时候某一个容器挂了,我们想查看下当时…

webpack性能优化

splitChunks webpack splitChunks minSize: 只有到目标文件超过这个minSize时才会分包。cacheGroups: 可以对某个第三方包进行单独分离出来 例如: splitChunks: {minSize: 300 * 1024,chunks: all,name: aaa,cacheGroups: {jquery: {name: jquery,test…

SCADA平台在风电场测量的应用,实现风电场的高效管理

一、应用背景 随着煤碳、石油等能源的逐渐枯竭,人类越来越重视可再生能源的利用。风能作为一种清洁的可再生能源日益受到世界各国的重视。中国风能储量大,分布面广,仅陆地上的风能储量就约2.53亿千瓦。我国的风电发展起步较晚,但…

大数据教学实训沙盘介绍

沙盘的作用主要有3个: 1、采集真实数据,解决教学中缺少真实数据的困扰; 2、形成从数据采集、预处理、挖掘建模、模型部署的业务闭环,可以把构建模型发布到沙盘系统上,根据模型产生真实的反馈不断的修正模型精度&#x…

DoIP协议从入门到精通系列——车辆声明

上篇文章对DoIP中物理连接做了说明和描述,介绍了以太网应用到车载网络中重要的两个组织: IEEE;OPEN联盟。本文主要对物理连接后,车辆进行自属信息声明过程做一个完整描述。 一、基础信息 DoIP协议标准由一个或多个DoIP实体实施,具体取决于车辆的网络架构。如下图是车辆网…

SuperMap iServer在不同系统中设置开机自启动--Windows篇

目录前言1.删除已有的 SuperMap iServer 系统服务2.注册 SuperMap iServer 系统服务3.设置 SuperMap iServer 系统服务开机自启动实例作者:kxj 前言 在成功部署SuperMap iServer之后,每次重启电脑都需要手动去启动iServer,如何能让iServer在…

HTML5 Web Worker(多线程处理)

文章目录HTML5 Web Worker(多线程处理)概述简单使用处理复杂数据HTML5 Web Worker(多线程处理) 概述 JavaScript的执行环境是单线程的,也就是一次只能执行一个任务。如果遇到多个任务时,只能排队依次执行。 在HTML5中,可以使用Web Worker创…

小程序集成Three.js,使用npm安装gsap动画库

0.视频演示 three.js集成gsap创建物体动画gsap作为简单易用的补间动画库,获得开发者一致好评。 在小程序中,我们集成了Three.js第三方库,可以创建和加载模型及场景,但是做动画还是需要第三方库的支持。 下面详细说明如何在小程序…

Java SPI机制详解

一、什么是SPI SPI全称Service Provider Interface,是Java提供的一种服务发现机制。实现服务接口和服务实现的解耦。 Java SPI 实际上是“基于接口的编程+策略模式+配置文件”组合实现的动态加载机制,实现不修改任何代码的情况下…

不错的一个麦肯锡信任公式

1)可信度:这人是不是专家。 你是否让他人可以相信你这个人。这取决于你解决问题的能力、经验、专业知识、资源等等;这个人的专业能力是否真有别人说的那么出色,是否能够胜任这份工作呢?过往的履历中是否做过足以让我值…