RocketMQ源码学习笔记:Broker接受消息和发送消息

news2024/11/24 0:16:10

这是本人学习的总结,主要学习资料如下

  • 马士兵教育
  • rocketMq官方文档

目录

  • 1、Overview
  • 2、技术亮点
    • 2.1、消息写入时的自旋锁和可重入锁
    • 2.2、堆外内存机制
      • 2.2.1、Overview
      • 2.2.2、源码
        • 2.2.2.1、开启堆外内存的条件
        • 2.2.2.2、堆外内存的初始化
        • 2.2.2.3、写消息到堆外内存
        • 2.2.2.4、堆外内存同步数据到磁盘

1、Overview

这是Broker中类的架构图。

在这里插入图片描述

发送和接收消息的代码流程是从上到下的,比如接受消息的流程就是SendMessageProcessor#processRequest-> DefaultMessageStore#asyncPutMessage -> CommitLog#asyncPutMessage -> MappedFile#appendMessage

2、技术亮点

2.1、消息写入时的自旋锁和可重入锁

CommitLog的构造方法中,初始化了这么一个锁。在写入消息时会调用这个锁的lock()unlock()方法。

this.putMessageLock = defaultMessageStore.getMessageStoreConfig()
.isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() 
: new PutMessageSpinLock();

默认情况下是自旋锁,我们也可以配置成可重入锁。

我们看看PutMessageSpinLock怎么实现的。

public class PutMessageSpinLock implements PutMessageLock {
    //true: Can lock, false : in lock.
    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
    @Override
    public void lock() {
        boolean flag;
        do {
            flag = this.putMessageSpinLock.compareAndSet(true, false);
        }
        while (!flag);
    }
    @Override
    public void unlock() {
        this.putMessageSpinLock.compareAndSet(false, true);
    }
}

这个自旋实现的很简单,就是不断地循环然后通过CAS加锁解锁,所以这个锁不会阻塞线程,不涉及操作系统上下文切换,只是CPU空转。

PutMessageReentrantLock则更简单,它直接使用ReentrantLock来加锁解锁。所以可能会导致线程阻塞或者挂起。

官方文档建议,异步刷盘时使用自旋锁,同步刷盘使用可重入锁。

因为异步刷盘速度快,消息到Borker内存就可以返回发送成功,占有锁的时间较少,自旋锁能有最大的效率。

同步刷盘需要等到消息写入磁盘后才能返回发送成功,占有所得时间较长,用自旋锁会导致大量线程空转占用CPU。所以需要用可重入锁将获取锁失败的线程挂起。

2.2、堆外内存机制

2.2.1、Overview

堆外内存机制用于高并发的场景。

因为高并发会在JVM中产生大量的对象,很可能会频繁地触发GC导致STW暂停业务线程。

堆外内存是指从内存中开辟一个新的空间,这个空间的回收不受GC的控制,完全交给开发者。

这片堆外内存会被当成一个缓存,Broker接受到的消息对象会存放到堆外内存中,然后定时从把消息从堆外内存中刷到磁盘。

因为堆外内存的垃圾回收不受GC控制,而是交给开发者,所以就能保证垃圾回收的频率够低,保证业务线程尽可能少地暂停。

这是消息写入时,普通模式和开启堆外内存时的流程图。
在这里插入图片描述


2.2.2、源码

2.2.2.1、开启堆外内存的条件

MessageStoreConfig中可以看到什么情况才会被RocketMQ认为当前开启了堆外内存。

public boolean isTransientStorePoolEnable() {
    return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
        && BrokerRole.SLAVE != getBrokerRole();
}

可以看到,三个条件同时满足才能开启堆外内存。

  1. Broker.conf中设置transientStorePoolEnable=true
  2. 刷盘方式是异步刷盘:第二个刷盘方式必须是异步刷盘。这是因为同步刷盘要求数据写到磁盘后才返回ACK给生产者,这需要较长的时间。但堆外内存的意义就是为了满足高并发,同步刷盘与之相违背,所以只能是异步刷盘。
  3. 当前的Broker必须是Master:因为主从架构中,从节点只能只能被消费者读消息不能被生产者写消息,而堆外内存只是一个写数据时的缓存,读数据还是得从磁盘中读。所以从节点开启堆外内存没意义,反而会占用内存影响性能。

2.2.2.2、堆外内存的初始化

初始化的内容比较简单,靠外部配置就足够的话,一般是在BrokerStartup#createBrokerController中。比较复杂的则是在BrokerController#createBrokerController中。

堆外内存和DefaultMessageStore有关,初始化在DefaultMessageStore的构造方法。下面是相关代码。

if (messageStoreConfig.isTransientStorePoolEnable()) {
    this.transientStorePool.init();
}
public void init() {
    // poolSize = 5 by default
    for (int i = 0; i < poolSize; i++) {
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);

        final long address = ((DirectBuffer) byteBuffer).address();
        Pointer pointer = new Pointer(address);
        LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

        availableBuffers.offer(byteBuffer);
    }
}

从代码中可以看到,初始化具体做的事是新建默认5个ByteBuffer对象,然后存放在availableBuffers中,availableBuffers是一个队列。

这5个ByteBuffer是映射到堆外内存的缓存,后续将通过这几个缓存向堆外内存中放数据。

之后在初始化BrokerController#intialize()中,会通过线程AllocateMappedFileService调用MappedFile#init()方法,将上面初始化好的availableBuffers通过transientStorePool#borrowBuffer()传给MappedFilewriteBuffer。这样写消息时,MappedFile就可以通过writeBuffer向堆外内存写数据。

// init with off-heap
public void init(final String fileName, final int fileSize,
    final TransientStorePool transientStorePool) throws IOException {
    init(fileName, fileSize);
    this.writeBuffer = transientStorePool.borrowBuffer();
    this.transientStorePool = transientStorePool;
}

2.2.2.3、写消息到堆外内存

写消息的流程是SendMessageProcessor#processRequest-> DefaultMessageStore#asyncPutMessage -> CommitLog#asyncPutMessage -> MappedFile#appendMessage -> MappdFile#appendMessagInner,即使开启堆外内存也是一样。

所以我们可以来看看MappedFile#appendMessagInner怎么实现写消息到堆外内存。这里截取关键片段。

 // 如果writeBuffer不为空,则开启了堆外内存,否则用正常的mappedByteBuffer
 ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
 byteBuffer.position(currentPos);
 AppendMessageResult result;
 if (messageExt instanceof MessageExtBrokerInner) {
     result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
             (MessageExtBrokerInner) messageExt, putMessageContext);
 } else if (messageExt instanceof MessageExtBatch) {
     result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
             (MessageExtBatch) messageExt, putMessageContext);
 } else {
     return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
 }

代码中可以看到,关键点是writeBuffer。之前提到开启了堆外内存,那初始化时会将堆外内存的映射缓存传给MappedFile中的workBuffer;如果没开启堆外内存则writeBuffer为null。

所以开启堆外内存就向writeBuffer写数据到堆外内存;没有开启就向mappedByteBuffer写数据到磁盘。


2.2.2.4、堆外内存同步数据到磁盘

堆外内存只是一个缓存,最终数据应该同步到磁盘。RocketMQ设置了一个定时线程做这个工作,叫CommitRealTimeService

默认200ms同步一次。因为MQ就是用于异步的场景,所以写完数据至少200ms后才能读到也是可以忍耐的。

具体的代码不展示,没什么值得说的地方。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1877106.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

昇思25天学习打卡营第2天|数据集Dataset

学习目标&#xff1a;熟练掌握mindspore.dataset mindspore.dataset中有常用的视觉、文本、音频开源数据集供下载&#xff0c;点赞、关注收藏哦 了解mindspore.dataset mindspore.dataset应用实践 拓展自定义数据集 昇思平台学习时间记录: 一、关于mindspore.dataset minds…

华测视频RTK,AR实景导航

华测导航视频测量RTK技术,通过融合卫星导航、惯导与视频摄影测量算法,让“所见即所测”成为现实,让测量工作变得更加智能、高效。 视频测量RTK:智能测绘的新里程碑 华测RTK的性能和广泛应用,在市场中获得了用户的认可,平均每10位用户中即有6位推荐。其视频测量功能通过引入自动…

【硬件视界2】什么是CPU和GPU?有什么区别?

名人说&#xff1a;莫听穿林打叶声&#xff0c;何妨吟啸且徐行。—— 苏轼《定风波莫听穿林打叶声》 本篇笔记整理&#xff1a;Code_流苏(CSDN)&#xff08;一个喜欢古诗词和编程的Coder&#x1f60a;&#xff09; 目录 1、CPU (中央处理器)①主要作用②特点 2、 GPU (图形处理…

HarmonyOS Next开发学习手册——弹性布局 (Flex)

概述 弹性布局&#xff08; Flex &#xff09;提供更加有效的方式对容器中的子元素进行排列、对齐和分配剩余空间。常用于页面头部导航栏的均匀分布、页面框架的搭建、多行数据的排列等。 容器默认存在主轴与交叉轴&#xff0c;子元素默认沿主轴排列&#xff0c;子元素在主轴…

Workbench密码登录登录失败

Workbench密码登录登录失败操作系统禁用了密码登录方式&#xff0c;会导致使用了正确的用户名和密码仍无法登录 sudo vim /etc/ssh/sshd_config 输入O进入编辑 改完后重启 systemctl restart sshd.service 登录报错 有试了几遍登上了 可能是改完还要等一会儿

ros1仿真导航机器人 基础传感器数据读取

仅为学习记录和一些自己的思考&#xff0c;不具有参考意义。 1 仿真环境 gazebo、rviz、ros1 2 机器人模型 <?xml version"1.0"?> <robot name"wpb_home_gazebo"><link name"base_footprint"><visual><origin …

第6章_libmodbus使用

文章目录 第6章 libmodbus使用6.1 libmodbus开发库6.1.1 功能概要6.1.2 源码获取6.1.3 源码阅读1. 新建工程2. 同步文件3.打开工程4. 操作示例5. 快捷键 6.1.4 libmodbus与应用程序的关系 6.2 libmodbus源代码解析6.2.1 核心函数6.2.2 框架分析与数据结构6.2.3 情景分析1. 初始…

ASUS华硕A豆14笔记本电脑I421EAYB,I421EQYB_ADOL14EA工厂模式原厂Win11系统安装包下载

适用型号&#xff1a;ADOL14EA笔记本I421EAYB、I421EQYB 链接&#xff1a;https://pan.baidu.com/s/1krU8m_lbApyUfZQo5E4cCQ?pwd0ewl 提取码&#xff1a;0ewl 华硕原装WIN11系统工厂安装包&#xff0c;带有MyASUS WinRE RECOVERY恢复功能、自带所有驱动、出厂主题壁纸、系…

磁共振图像MRI重建实现

最近涉及到了磁共振图像MRI的重建&#xff0c;网络上相关的实现比较少&#xff0c;因此进行实现记录。 磁共振图像MRI重建实现 1.配置代码环境2.MRI数据集处理3.配置数据集以及模型文件5.训练 1.配置代码环境 这里介绍一个很好的开源项目&#xff0c;git为&#xff1a; https…

【SpringMVC】_SpringMVC实现留言墙

目录 1. 需求分析 2. 接口定义 2.1 提交留言 2.2 获取全部留言 3. 响应数据 4. 服务器代码 4.1 MessageInfo 文件 4.2 MessageController 文件 5. 前端页面代码 5. 运行测试 1. 需求分析 实现如下页面&#xff1a; 1、输入留言信息&#xff0c;点击提交后&#xff0…

京东618风云再起,极空间私有云蝉联销冠,AI NAS技术绘就行业新篇章

2023年极空间私有云占据京东618全周期网络存储成交额的榜首&#xff0c;而在今年618极空间延续了往年佳绩再次斩获销冠之位&#xff0c;这已是其连续两年在京东618中夺得销售冠军。自进军NAS行业以来&#xff0c;极空间不仅深耕于智能存储技术&#xff0c;更积极投身AI研发&…

6.27-6.29 旧c语言

#include<stdio.h> struct stu {int num;float score;struct stu *next; }; void main() {struct stu a,b,c,*head;//静态链表a.num 1;a.score 10;b.num 2;b.score 20;c.num 3;c.score 30;head &a;a.next &b;b.next &c;do{printf("%d,%5.1f\n&…

JeecgBoot中如何对敏感信息进行脱敏处理?

数据脱敏即将一些敏感信息通过加密、格式化等方式处理&#xff0c;展示给用户一个新的或是格式化后的信息&#xff0c;避免了敏感信息的暴露。 一、接口脱敏注解 针对接口数据实现脱敏加密&#xff0c;只加密&#xff0c;一般此方案用于数据加密展示。 1.1 注解介绍 注解作用域…

C语言 | Leetcode C语言题解之第204题计数质数

题目&#xff1a; 题解&#xff1a; int countPrimes(int n) {if (n < 2) {return 0;}int isPrime[n];int primes[n], primesSize 0;memset(isPrime, 0, sizeof(isPrime));for (int i 2; i < n; i) {if (!isPrime[i]) {primes[primesSize] i;}for (int j 0; j < …

stm32学习笔记---ADC模数转换器(代码部分)AD单通道/多通道

目录 第一个代码&#xff1a;AD单通道 ADC初始化步骤 ADC相关的库函数 RCC_ADCCLKConfig 三个初始化相关函数 ADC_Cmd ADC_DMACmd ADC_ITConfig 四个校准相关函数 ADC_SoftwareStartConvCmd ADC_GetSoftwareStartConvStatus ADC_GetFlagStatus ADC_RegularChannel…

Flask之电子邮件

前言&#xff1a;本博客仅作记录学习使用&#xff0c;部分图片出自网络&#xff0c;如有侵犯您的权益&#xff0c;请联系删除 目录 一、使用Flask-Mail发送电子邮件 1.1、配置Flask-Mail 1.2、构建邮件数据 1.3、发送邮件 二、使用事务邮件服务SendGrid 2.1、注册SendGr…

【2024最新华为OD-C/D卷试题汇总】[支持在线评测] LYA的字符串拼接游戏(200分) - 三语言AC题解(Python/Java/Cpp)

&#x1f36d; 大家好这里是清隆学长 &#xff0c;一枚热爱算法的程序员 ✨ 本系列打算持续跟新华为OD-C/D卷的三语言AC题解 &#x1f4bb; ACM银牌&#x1f948;| 多次AK大厂笔试 &#xff5c; 编程一对一辅导 &#x1f44f; 感谢大家的订阅➕ 和 喜欢&#x1f497; &#x1f…

动手学深度学习(Pytorch版)代码实践 -卷积神经网络-21多输入多输出通道

21多输入多输出通道 import torch from d2l import torch as d2ldef corr2d(X, K):"""计算二维互相关运算"""h, w K.shapeY torch.zeros((X.shape[0] - h 1, X.shape[1] - w 1))for i in range(Y.shape[0]):for j in range(Y.shape[1]):Y[i,…

【操作系统期末速成】 EP02 | 学习笔记(基于五道口一只鸭)

文章目录 一、前言&#x1f680;&#x1f680;&#x1f680;二、正文&#xff1a;☀️☀️☀️2.1 考点二&#xff1a;操作系统的功能及接口2.2 考点三&#xff1a;操作系统的发展及分类2.3 考点四&#xff1a;操作系统的运行环境&#xff08;重要&#xff09; 一、前言&#x…

私域流量的深度解析与电商应用

一、私域流量的核心价值 在当今数字化时代&#xff0c;流量成为了企业发展的重要资源。与公域流量相比&#xff0c;私域流量以其独有的私有性和可复用性&#xff0c;为企业提供了与用户建立深度联系的机会。私域流量不仅有助于企业精准触达目标用户&#xff0c;还能通过数据分…