【Alibaba中间件技术系列】「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析

news2025/1/22 15:08:02

RocketMQ目前在国内应该是比较流行的MQ 了,目前本人也在公司的项目中进行使用和研究,借着这个机会,分析一下RocketMQ 发送一条消息到存储一条消息的过程,这样会对以后大家分析和研究RocketMQ相关的问题有一定的帮助。

分析的总体技术范围发送到存储,本文的主要目的是主要是为了认识一条消息并分析被发出且被存储的,代码中,关于 MQ 文件系统的优化,设计等。

来自官方源码example的一段发送代码:

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();

直接看看send方法,send 方法会设置一个默认的 timeout:3秒。默认使用 SYNC 模式,另外有Async和OneWay模式。需要处理方法签名中的 Client 端的异常,网络异常,Broker 端的异常,线程中断异常。

DefaultMQProducerImpl 的 sendDefaultImpl方法就是发送的主要逻辑。

代码里,有个地方可以提一下,关于更新故障时间的策略,RocketMQ有一个类 MQFaultStrategy,用来处理MQ错误,然后对 MQ Server 进行服务降级。

如果发送一条消息在550ms以内,那么就不用降级,如果550毫秒以外,就进行容错降级(熔断)30 秒,以此类推。

再看DefaultMQProducerImpl 的 sendKernelImpl发送到内核的方法实现。

先找到broker的地址。尝试压缩大于4M 的消息(批量消息不压缩),然后执行各种钩子。

  • Request对象(存放数据)
  • Context 上下文对象(存放调用上下文)。

这里会设置一个消息生成时间,即bornTimestamp,后面使用消息轨迹的时候,可以查看。

默认情况下:如果采用SYNC 模式,就调用 MQClientAPIImpl 来发送消息,这一层还是在 Client 模块里,在这一层,会设置更详细的消息细节,构造命令对象。最后调用 remotingClient的 invokeSync 发送消息。

MQClientAPIImpl的sendMessage这一层,会给命令对象设置一个CmdCode,叫SEND_MESSAGE,这个东西就是一个和Broker的契约,Broker会根据这个Code进行不同的策略。

Netty 会使用 Handler 处理出去的数据和返回的数据,我们看看 Client 端 Netty 有哪些 Handler.

Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer() {

                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                }
            });

使用了一个 Encoder,Decoder,空闲处理器,连接管理器,ClientHandler。

XXCoder就是对Cmd对象进行序列化和反序列化的,这里的空闲使用的读写最大空闲时间为120s,超过这个,就会触发空闲事件。

  • RocketMQ就会关闭Channel 连接。而针对空闲事件进行处理的就是连接管理器了。
  • 连接管理器处理空闲、Close、Connect、异常等事件,使用监听器模式,不同的监听器对不同的事件进行处理。另外,这里也许可以借鉴 EventBus,每个事件可以设置多个监听器。

看了RocketMQ中 Netty 的设计,再看看返回值处理就简单了,NettyClientHandler 会在 channelRead0 方法处理 Netty Server 的返回值。对应 RMQ,则是 processMessageReceived 方法。该方法很简洁:

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

其实,这是一个模板方法,固定算法,由子类实现,分为 Request 实现和 Response 实现。我们看看 Response 实现。

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();

        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);
            responseTable.remove(opaque);
            if (responseFuture.getInvokeCallback() != null) {
                executeInvokeCallback(responseFuture);
            } else {
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }

通过 cmd 对象的 Request ID 找到 Feature,执行 responseFuture.putResponse,设置返回值,唤醒阻塞等待的发送线程。

这里还有一个 release 调用,这个和异步发送有关,默认最大同时 65535 个异步请求,具体就不展开了。

到这里,唤醒阻塞的发送线程,返回数据,客户端层面的发送就结束了。

看源码,看到有个 SEND_MESSAGE Code,是 Client 和 Broker Server 的一个约定代码,我们看看这个代码在哪里用的。

在 broker 模块的 BrokerController 类中,有个 registerProcessor 方法,会将 SEND_MESSAGE Code 和一个 SendMessageProcessor 对象绑定。

NettyRemotingServer是处理Request 的类,ServerBootstrap 会在 pipeline 中添加一个 NettyServerHandler处理器,这个处理器的channelRead0方法会调用 NettyRemotingServer的父类processMessageReceived 方法。

从processorTable 里,根据 Cmd Code,也就是 SEND_MESSAGE 获取对应的 Processor

一部分是处理数据的对象,一部分是这个对象所对应的线程池。用于异步处理逻辑,防止阻塞 Netty IO线程。

doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

前后都是执行一些钩子,例如 ACL

RocketMQ会有一个 BrokerController 类,会注册 Code 和 Processor 的绑定关系,BrokerController 也会把这些绑定,注册到 Netty Server 中,当 Netty Server 从 Socket 收到 Cmd 对象,根据 Cmd 对象的 Code,就可以找到对应 Processor 类,对数据进行处理。

中间是处理 Request请求的。这个 processRequest 方法,有很多的实现,SendMessageProcessor的sendMessage 是处理消息的主要逻辑。

消息存储引擎,这里我们看DefaultMessageStore的putMessage 实现。

putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

由于RocketMQ写数据是PageCache里面写的,因此,如果写的慢,就是 PageCache 忙,这里忙的标准是,如果锁文件的时间,超过了 1 秒,那就是忙。

if (this.isOSPageCacheBusy()) {
    return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}

最后调用 PutMessageResult result = this.commitLog.putMessage(msg) 写数据。如果耗时超过 500 毫秒,就会打印日志。这样我们排查问题的时候,可以看看 storeStats 的日志。

result = mappedFile.appendMessage(msg, this.appendMessageCallback)

写完之后,释放锁,如果超过 500 毫秒,打印 cost time 日志。

处理刷盘和slave 同步,这里看刷盘策略和同步策略,是 SYNC 还是 ASYNC。经过我的测试,同步刷盘和异步刷盘的性能差距是 10 倍。

而 Slave 的数据同步,如果用 SYNC 模式,tps 最高也就 2000 多一丢度,为什么?内网,两台机器 ping 一下都要 0.2 毫秒,一秒最多 5000 次,再加上处理逻辑, 2000 已经到顶了,网络成了瓶颈。

我们看看 mappedFile.appendMessage 方法的实现。一路追踪,有个关键逻辑, 在 appendMessagesInner 里:

int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
    byteBuffer.position(currentPos);
    AppendMessageResult result = null;
    if (messageExt instanceof MessageExtBrokerInner) {

        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
    } else if (messageExt instanceof MessageExtBatch) {
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
    } else {
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
    this.wrotePosition.addAndGet(result.getWroteBytes());
    this.storeTimestamp = result.getStoreTimestamp();
    return result;
}

代码中,使用了 mappedFile 从 Linux 映射的 MMap buffer,对数据进行写入。我们看看 doAppend 方法。

  • 如果是 SYNC 模式,执行 CommitLog 的 handleDiskFlush 的方法时,就会立刻刷盘并等待刷盘结果。
  • 如果是 ASYNC 模式,执行 CommitLog 的 handleDiskFlush 的方法时,会通知异步线程进行刷盘,但不等待结果。

如果没有新数据,则为 500ms 执行一次刷盘策略。

简单说下异步刷盘:

默认刷盘 4 页,Linux 一页是 4kb 数据,4页就是 16kb。

如果写的数据减去已经刷的数据,剩下的数据大于等于 4 页,就执行刷盘,执行 mappedByteBuffer.force() 或者 fileChannel.force(false);

分享资源

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
获取以上资源请访问开源项目 点击跳转

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/957636.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何增长LLM推理token,从直觉到数学

背景&#xff1a; 最近大模型输入上文长度增长技术点的研究很火。为何要增长token长度,为何大家如此热衷于增长输入token的长度呢&#xff1f;其实你如果是大模型比价频繁的使用者&#xff0c;这个问题应该不难回答。增长了输入token的长度&#xff0c;那需要多次出入才能得到…

【LeetCode】383. 赎金信 - hashmap/数组

这里写自定义目录标题 2023-8-28 22:54:39 383. 赎金信 2023-8-28 22:54:39 次数 ----> hashmap 和 数组来进行实现。 public class Solution {public boolean canConstruct(String ransomNote, String magazine) {// num 用于存储小写字母出现的次数int[] num new in…

vue报错RangeError: Maximum call stack size exceeded

这种情况&#xff0c;一般是跳转路由时发生此类错误&#xff0c;像我的就是如此。比如路由指向的vue文件里代码有错误&#xff0c;或者设置路由时重定向了路由自己&#xff0c;造成死循环。 1、首先检查自己跳转的路由地址的代码本身是否有语法错误之类的&#xff0c;造成错误…

如何实现的手机实景自动直播,都有哪些功能呢?

手机实景自动直播最近真的太火了&#xff0c;全程只需要一部手机&#xff0c;就能完成24小时直播带货&#xff0c;不需要真人出镜&#xff0c;不需要场地&#xff0c;不需要搭建直播间&#xff0c;只需要一部手机就可以了。真人语音讲解&#xff0c;真人智能回复&#xff0c;实…

「操作系统」1. 基础

前言&#xff1a;操作系统基础八股文 文章目录 一 、操作系统基础1.1 什么是操作系统&#xff1f;1.2 什么是系统调用1.3 什么是中断 &#x1f680; 作者简介&#xff1a;作为某云服务提供商的后端开发人员&#xff0c;我将在这里与大家简要分享一些实用的开发小技巧。在我的职…

腾讯云-对象存储服务(COS)的使用总结-JavaScript篇

简介 对象存储&#xff08;Cloud Object Storage&#xff0c;COS&#xff09;是腾讯云提供的一种存储海量文件的分布式存储服务&#xff0c;具有高扩展性、低成本、可靠安全等优点。通过控制台、API、SDK 和工具等多样化方式&#xff0c;用户可简单、快速地接入 COS&#xff0…

ChatGPT帮助高职院校学生实现个性化自适应学习与对话式学习

一、学习层面&#xff1a;ChatGPT帮助高职院校学生实现个性化自适应学习与对话式学习 1.帮助高职院校学生实现个性化自适应学习 数字技术的飞速发展引起了教育界和学术界对高职院校学生个性化自适应学习的更多关注和支持&#xff0c;其运作机制依赖于人工智能等技术&#xff0…

面经:微服务

文章目录 参考资料一. 微服务概述1. CAP理论2. BASE理论3. SpringBoot 与 SpringCloud对比 二. 服务注册&#xff1a;Zookeeper,Eureka,Nacos,Consul1. Nacos两种健康检查方式&#xff1f;2. nacos中负责负载均衡底层是如何实现的3. Nacos原理4. 临时实例和持久化(非临时)实例 …

微信小程序校园生活小助手+后台管理系统|前后分离VUE

《微信小程序校园生活小助手后台管理系统|前后分离VUE》该项目含有源码、文档等资料、配套开发软件、软件安装教程、项目发布教程等 本系统包含微信小程序前台和Java做的后台管理系统&#xff0c;该后台采用前后台前后分离的形式使用JavaVUE 微信小程序——前台涉及技术&#…

【RISC-V】RISC-V寄存器

一、通用寄存器 32位RISC-V体系结构提供32个32位的整型通用寄存器寄存器别名全称说明X0zero零寄存器可做源寄存器(rs)或目标寄存器(rd)X1ra链接寄存器保存函数返回地址X2sp栈指针寄存器指向栈的地址X3gp全局寄存器用于链接器松弛优化X4tp线程寄存器常用于在OS中保存指向进程控…

金融风控数据分析-信用评分卡建模(附数据集下载地址)

本文引用自&#xff1a; 金融风控&#xff1a;信用评分卡建模流程 - 知乎 (zhihu.com) 在原文的基础上加上了一部分自己的理解&#xff0c;转载在CSDN上作为保留记录。 本文涉及到的数据集可直接从天池上面下载&#xff1a; Give Me Some Credit给我一些荣誉_数据集-阿里云…

数字化新零售平台系统提供商,门店商品信息智慧管理-亿发进销存

传统的批发零售业务模式正面临着市场需求变化的冲击。用户日益注重个性化、便捷性和体验感&#xff0c;新兴的新零售模式迅速崛起&#xff0c;改变了传统的零售格局。如何在保持传统业务的基础上&#xff0c;变革发展&#xff0c;成为了业界亟需解决的问题。 在这一背景下&…

Abaqus三维随机多面体插件—AbyssFish – Random Polyhedron Aggregate

插件介绍 AbyssFish – Random Polyhedron Aggregate 插件可在Abaqus软件内批量生成随机分布的三维多面体骨料模型。插件可指定骨料分布的区域、三种尺寸的粒径分布范围、多面体面数、各尺寸骨料的数量等信息&#xff0c;同时可控制骨料间的最小间距及插件的运行时间控制。 使…

Linux常用工具(pidstat stress cgroup)

目录 1.pidstat 2.stress 3.cgroup 4.使用cgroup进行内存限制 5.使用cgroup进行cpu使用率控制 1.pidstat 安装和使用(centos): yum install sysstats yum remove sysstats pidstat -u&#xff08;默认&#xff09;&#xff0c;查看进程cpu使用情况&#xff1a; pidstat …

Spring MVC: 请求参数的获取

Spring MVC 前言通过 RequestParam 注解获取请求参数RequestParam用法 通过 ServletAPI 获取请求参数通过实体类对象获取请求参数附 前言 在 Spring MVC 介绍中&#xff0c;谈到前端控制器 DispatcherServlet 接收客户端请求&#xff0c;依据处理器映射 HandlerMapping 配置调…

解决 beego上传文件时 报http: no such file 错误

上传时文件上传失败: 关键报错的代码: //获得文件名filename := header.Filename//上传文件//注意,这里SaveToFile参数要跟传入的文件名的key一致,否则就会报http: no such fileerr = f.SaveToFile(filename, "./static/file/"+filename)if err != nil {logs.Error(e…

大数据课程K13——Spark的距离度量相似度度量

文章作者邮箱:yugongshiye@sina.cn 地址:广东惠州 ▲ 本章节目的 ⚪ 掌握Spark的距离度量和相似度度量; ⚪ 掌握Spark的欧氏距离; ⚪ 掌握Spark的曼哈顿距离; ⚪ 掌握Spark的切比雪夫距离; ⚪ 掌握Spark的最小二乘法; 一、距离度量和相似度度量 1. …

linux系统(centos、Ubuntu、银河服务器)备份

制作u盘启动盘 下载usblive系统镜像 Get Kali | Kali Linux 下载u盘启动工具 balenaEtcher - Flash OS images to SD cards & USB drives 点击下载&#xff0c;等待下载完成 双击安装&#xff0c;等待安装完成 双击 启动 选择镜像 选择U盘 开始烧录 等地制作完成 进入…

PowerBuilder连接SQLITE3

PowerBuilder,一个古老的IDE,打算陆续发些相关的,也许还有人需要,内容可能涉及其他作者,但基本都是基于本人实践整理,如涉及归属,请联系. SQLite,轻型数据库,相对与PowerBuilder来说是个新事务,故发数来,以供参考. PB中使用OLE Microsoft OLE DB方式进行连接,如下 // Profile…

苹果启动2024年SRDP计划:邀请安全专家使用定制iPhone寻找漏洞

苹果公司昨天&#xff08;8月30日&#xff09;正式宣布开始接受2024 年iPhone安全研究设备计划的申请&#xff0c;iOS 安全研究人员可以在 10 月底之前申请安全研究设备 SRD。 SRD设备是专门向安全研究人员提供的iPhone14Pro&#xff0c;该设备具有专为安全研究而设计的特殊硬…