全网最细RocketMQ源码四:消息存储

news2024/9/28 1:22:51

看完上一章之后,有没有很好奇,生产者发送完消息之后,server是如何存储,这一章节就来学习

入口

SendMessageProcessor.processRequest
在这里插入图片描述
在这里插入图片描述

  private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

        if (response.getCode() != -1) {
            return CompletableFuture.completedFuture(response);
        }

        final byte[] body = request.getBody();

        int queueIdInt = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        if (queueIdInt < 0) {
            queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
        }

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);

        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return CompletableFuture.completedFuture(response);
        }

        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

        CompletableFuture<PutMessageResult> putMessageResult = null;
        Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
        // 使用defaultMessageStore.aysncPutMessage存储
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
    }

实际真正的负责存储就是DefaultMessageStore, 不过在讲述DefaultMessageStore的时候,我们是自底往上学,因为DefaultMessageStore比较复杂,从顶往下学容易学乱。先从地基开始,然后再看高楼大厦

MappedFile

public class MappedFile extends ReferenceResource {
    // 内存页大小:4k
    public static final int OS_PAGE_SIZE = 1024 * 4;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // 当前进程下 所有的 mappedFile占用的总虚拟内存大小
    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);

    // 当前进程下 所有的 mappedFile个数
    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
    // 当前mappedFile数据写入点
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // 当前mappedFIle数据落盘位点(flushedPosition 之前的数据 都是安全数据,flushedPosition~wrotePosition之间的数据 属于脏页)
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    // 文件大小
    protected int fileSize;
    // 文件通道
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    protected ByteBuffer writeBuffer = null;
    protected TransientStorePool transientStorePool = null;

    // 文件名称(commitLog ConsumeQueue:文件名就是 第一条消息的 物理偏移量   索引文件: 年月日小时分钟秒.. )
    private String fileName;
    // 文件名转long
    private long fileFromOffset;
    // 文件对象
    private File file;
    // 内存映射缓冲区,访问虚拟内存
    private MappedByteBuffer mappedByteBuffer;
    // 该文件下 保存的第一条 msg 的存储时间
    private volatile long storeTimestamp = 0;
    // 当前文件如果是 目录内 有效文件的 首文件的话,该值为true
    private boolean firstCreateInQueue = false;
  • 构造方法
    在这里插入图片描述
    在这里插入图片描述

  • appendMessage方法
    在这里插入图片描述
    在这里插入图片描述

  • appendMessage(byte[] data)
    在这里插入图片描述

  • flush
    在这里插入图片描述

MappedFileQueue

public class MappedFileQueue {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);

    private static final int DELETE_FILES_BATCH_MAX = 10;

    // mfq 管理的目录(CommitLog: ../store/commitlog  或者  consumeQueue: ../store/xxx_topic/0)
    private final String storePath;

    // 目录下每个文件大小(commitLog文件 默认 1g     consumeQueue 文件 默认 600w字节)
    private final int mappedFileSize;

    // list,目录下的每个 mappedFile 都加入该list
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

    // 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,内部线程处理完后 会返回给我们结果  结果 就是 mappedFile对象。
    private final AllocateMappedFileService allocateMappedFileService;

    // 目录的刷盘位点(它的值: curMappedFile.fileName + curMappedFile.wrotePosition)
    private long flushedWhere = 0;
    private long committedWhere = 0;

    // 当前目录下最后一条msg存储时间
    private volatile long storeTimestamp = 0;
  • load方法
    在这里插入图片描述

  • getLastMappedFile

 /**
     * 参数1:startOffset ,文件起始偏移量
     * 参数2:needCreate ,当 list 为空时,是否创建mappedFile
     */
    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        // 该值控制是否创建 mappedFile,当需要创建mappedFile时,它充当文件名的结尾
        // 两种情况会创建:
        // 1. list内没有mappedFile
        // 2. list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..
        long createOffset = -1;

        MappedFile mappedFileLast = getLastMappedFile();

        if (mappedFileLast == null) {// 情况1  list内没有mappedFile
            // createOffset 取值 必须是 mappedFileSize 的倍数 或者 0
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }

        if (mappedFileLast != null && mappedFileLast.isFull()) { // 情况2  list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..
            // 上一个文件名 转long + mappedFileSize
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }


        if (createOffset != -1 && needCreate) {// 这里面是创建 新的 mappedFile 的逻辑。
            // 获取待创建文件的 绝对路径(下次即将要创建的文件名)
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);

            // 获取 下下次 要创建的文件的 绝对路径
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);

            MappedFile mappedFile = null;


            if (this.allocateMappedFileService != null) {
                // 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,
                // 内部线程处理完后 会返回给我们结果  结果 就是 mappedFile对象。

                // 当mappedFileSize >= 1g 的话,这里创建的mappedFile 会执行它的 预热方法。
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            } else {
                try {
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }


            if (mappedFile != null) {
                if (this.mappedFiles.isEmpty()) {
                    mappedFile.setFirstCreateInQueue(true);
                }
                this.mappedFiles.add(mappedFile);
            }

            return mappedFile;
        }

        return mappedFileLast;
    }

  • flush
 /**
     * @param flushLeastPages (0 表示强制刷新, > 0 脏页数据必须达到 flushLeastPages 才刷新)
     * @return boolean true 表示本次刷盘无数据落盘   false 表示本次刷盘有数据落盘
     */
    public boolean flush(final int flushLeastPages) {
        boolean result = true;

        // 获取当前正在刷盘的文件 (正在顺序写的mappedFile)
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);

        if (mappedFile != null) {
            // 获取mappedFile 最后一条消息的存储时间
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            // 调用mf 刷盘 ,返回mf的最新的落盘位点
            int offset = mappedFile.flush(flushLeastPages);
            // mf起始偏移量 + mf最新的落盘位点
            long where = mappedFile.getFileFromOffset() + offset;
            // true 表示本次刷盘无数据落盘   false 表示本次刷盘有数据落盘
            result = where == this.flushedWhere;
            // 将最新的目录刷盘位点 赋值给 flushedWhere
            this.flushedWhere = where;

            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }
        return result;
    }

CommitLog

ConsumeQueue

DefaultMessageStore

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1387133.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

低频信号发生器

前言 最近我快期末考试了&#xff0c;有点忙着复习。没时间写文章&#xff0c;不过学会了焊接 挺开心的所以买几套。 焊得怎么样这就是我们今天故事的主角“低频信号发生器”&#xff08;由于要用到所以这是购买链接&#xff09; 好&#xff0c;故事开始&#xff1a; 如何将…

如何在Eclipse IDE中安装TestNG插件

目录 使用Eclipse Marketplace安装TestNG插件 通过输入URL安装TestNG 1.点击安装新软件 2.输入URL以安装TestNG 3.遵循正常的安装过程 4.重新启动Eclipse 在Eclipse中安装TestNG插件的视频 在这篇文章中&#xff0c;我们将介绍如何在Eclipse IDE中安装TestNG插件&#x…

个人数据备份方案分享(源自一次悲惨经历)

文章目录 1 起源2 备份架构2.1 生活照片2.2 生活录音2.3 微信文件2.4 工作文件2.5 笔记、影视音乐、书籍 3 使用工具介绍3.1 小米云服务3.2 中国移动云盘3.3 小米移动硬盘&#xff08;1T&#xff09;3.4 FreeFileSync 4 总结 1 起源 本文的灵感源于我个人的一次不幸遭遇&#…

C++I/O流——(3)文件输入/输出(第二节)

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 含泪播种的人一定能含笑收获&#xff…

领域驱动设计——DDD领域驱动设计进阶

摘要 进阶篇主要讲解领域事件、DDD 分层架构、几种常见的微服务架构模型以及中台设计思想等内容。如何通过领域事件实现微服务解耦&#xff1f;、怎样进行微服务分层设计&#xff1f;、如何实现层与层之间的服务协作&#xff1f;、通过几种微服务架构模型的对比分析&#xff0…

快速排序【hoare版本】【挖坑法】【双指针法】(数据结构)

快速排序是Hoare于1962年提出的一种二叉树结构的交换排序方法&#xff0c;其基本思想为&#xff1a;任取待排序元素序列中 的某元素作为基准值&#xff0c;按照该排序码将待排序集合分割成两子序列&#xff0c;左子序列中所有元素均小于基准值&#xff0c;右子序列中所有元素均…

文件上传进阶绕过(二)4个技巧和靶场实战

★★免责声明★★ 文章中涉及的程序(方法)可能带有攻击性&#xff0c;仅供安全研究与学习之用&#xff0c;读者将信息做其他用途&#xff0c;由Ta承担全部法律及连带责任&#xff0c;文章作者不承担任何法律及连带责任。 0、环境准备 请移步《文件上传靶场实战&#xff1a;upl…

小迪安全第二天

文章目录 一、Web应用&#xff0c;架构搭建二、web应用环境架构类三、web应用安全漏洞分类总结 一、Web应用&#xff0c;架构搭建 #网站搭建前置知识 域名&#xff0c;子域名&#xff0c;dns,http/https,证书等 二、web应用环境架构类 理解不同web应用组成角色功能架构 开发…

基于JavaWeb+BS架构+SpringBoot+Vue+Hadoop短视频流量数据分析与可视化系统的设计和实现

基于JavaWebBS架构SpringBootVueHadoop短视频流量数据分析与可视化系统的设计和实现 文末获取源码Lun文目录前言主要技术系统设计功能截图订阅经典源码专栏Java项目精品实战案例《500套》 源码获取 文末获取源码 Lun文目录 目  录 目  录 I 1绪 论 1 1.1开发背景 1 1.2开…

【C++】__declspec含义

目录 一、__declspec(dllexport)如果这篇文章对你有所帮助&#xff0c;渴望获得你的一个点赞&#xff01; 一、__declspec(dllexport) __declspec(dllexport) 是 Microsoft Visual C 编译器提供的一个扩展&#xff0c;用于指示一个函数或变量在 DLL&#xff08;动态链接库&…

VueCli-自定义创建项目

参考 1.安装脚手架 (已安装可以跳过) npm i vue/cli -g2.创建项目 vue create 项目名 // 如&#xff1a; vue create dn-demo键盘上下键 - 选择自定义选型 Vue CLI v5.0.8 ? Please pick a preset:Default ([Vue 3] babel, eslint)Default ([Vue 2] babel, eslint) > M…

OpenHarmony——Linux之IR驱动

Linux之IR驱动 背景 在光谱中波长自760nm至400um的电磁波称为红外线&#xff0c;它是一种不可见光。红外遥控成本很低&#xff0c;以前广泛应用在电视&#xff0c;空调等电器的控制上面&#xff0c;现在随着蓝牙遥控器慢慢普及&#xff0c;红外遥控越来越少&#xff0c;但在某…

leetcode 142 环形链表II

题目 给定一个链表的头节点 head &#xff0c;返回链表开始入环的第一个节点。 如果链表无环&#xff0c;则返回 null。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则链表中存在环。 为了表示给定链表中的环&#xff0c;评测系统内部使…

阳光抑郁症测试

大部分人对抑郁症的理解&#xff0c;就是每天无精打采&#xff0c;死气沉沉&#xff0c;可实际上&#xff0c;还有一种阳光抑郁症&#xff0c;完全不是这个样子。这种抑郁症的人&#xff0c;做事情非常有活力&#xff0c;魅力十足&#xff0c;给人感觉十分有自信&#xff0c;但…

2024Flutter岗位面试题总结

StatelessWidget和StatefulWidget的区别是什么&#xff1f; StatelessWidget是一个不可变的类&#xff0c;充当UI布局中某些部分的蓝图&#xff0c;当某个组件在显示期间不需要改变&#xff0c;或者说没有状态&#xff08;State&#xff09;&#xff0c;你可以使用它。 Statef…

什么是DDOS高防ip?DDOS高防ip是怎么防护攻击的

随着互联网的快速发展&#xff0c;网络安全问题日益突出&#xff0c;DDoS攻击和CC攻击等网络威胁对企业和网站的正常运营造成了巨大的威胁。为了解决这些问题&#xff0c;高防IP作为一种网络安全服务应运而生。高防IP通过实时监测和分析流量&#xff0c;识别和拦截恶意流量&…

MySQL 协议(非常详细适合小白学习)

MySQL 查询过程 MySQL 查询过程大致如下&#xff1a; 1&#xff09;客户端与服务器端建立连接&#xff1b; 2&#xff09;客户端登陆 MySQL&#xff1b; 3&#xff09;客户端向服务器端发起一条请求&#xff1b; 4&#xff09;服务器端先检查查询缓存&#xff0c;如果命中缓…

ArkUI-X跨平台已至,何需其它!

运行环境 DevEco Studio&#xff1a;4.0Release OpenHarmony SDK API10 开发板&#xff1a;润和DAYU200 自从写了一篇ArkUI-X跨平台的文章之后&#xff0c;好多人都说对这个项目十分关注。 那么今天我们就来完整的梳理一下这个项目。 1、ArkUI-X 我们之前可能更多接触的…

运动模型非线性扩展卡尔曼跟踪融合滤波算法(Matlab仿真)

卡尔曼滤波的原理和理论在CSDN已有很多文章&#xff0c;这里不再赘述&#xff0c;仅分享个人的理解和Matlab仿真代码。 1 单目标跟踪 匀速转弯&#xff08;CTRV&#xff09;运动模型下&#xff0c;摄像头输出目标状态camera_state [x, y, theta, v]&#xff0c;雷达输出目标状…

windows的换行符与linux风格的换行符不同的问题

问题展示&#xff1a; 说明&#xff1a; 出现这个错误的原因是脚本文件包含了windows风格换行符&#xff08;‘\r\n’&#xff09;&#xff0c;而在linux环境下&#xff0c;通常使用unix风格的换行符&#xff08;‘\n’&#xff09;.这个问题通常在windows环境下编辑脚本文件然…