RocketMQ Broker消息处理流程及部分源码解析

news2025/1/13 13:46:01

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年2月10日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • 消息处理流程
  • 消息存储目录结构
  • `SendMessage`源码
    • `processRequest`
    • `sendMessage`

消息处理流程

  1. SendMessageProcessor处理类接收到消息
  2. DefaultMessageStore实例将消息变成IndexFileConsumeQueueCommitLog对象
  3. 上述对象转成内存映射对象后进行落盘
    在这里插入图片描述

消息存储目录结构

RocketMQ的文件存储在store文件夹里,里面包含commitlogconfigconsumerqueueindex文件夹和abortcheckpoint两个文件。

文件夹:

  • commitlog存储写入到commitLog的消息内容
  • config存储配置信息
  • consumerqueue存储消费者队列信息
  • index存储消息队列的索引文件

文件:

  • abort标记RocketMQ是否正常退出
  • checkpoint存储commitlogconfigconsumerqueueindex文件的刷盘时间
├── abort
├── checkpoint
├── commitlog
│   ├── 00000000000000000000
│   ├── 00000000001073741824
├── config
│   ├── consumerFilter.json
│   ├── consumerOffset.json
│   ├── delayOffset.json
│   ├── subscriptionGroup.json
│   ├── topics.json
├── consumequeue
│   ├── TopicA
│   ├── TopicB
│   ├── TopicC
├── index
│   ├── 00000000000000000000
│   ├── 00000000001073741824

RocketMQ内有专门对应磁盘上存储文件的封装类:

  1. CommitLog:对应commitlog文件
  2. ConsumeQueue:对应consumerqueue文件
  3. IndexFile:对应index文件
  4. MappedFile:直接内存映射业务的封装类,通过操作该类实例,可以把消息写入内存映射缓冲区,或将消息刷盘
  5. MappedFileQueue:连续物理存储的封装类,可以通过offset快速定位消息所在的MappedFile
  6. MappedFileBuff:堆外内存

SendMessage源码

SendMessageProcessor是接收消息的一个钩子函数,该类的对象将会处理发送到Broker的消息

processRequest

主要流程已在代码片注释中给出:

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    private List<ConsumeMessageHook> consumeMessageHookList;

	public SendMessageProcessor(final BrokerController brokerController) {
        super(brokerController);
    }

	public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                  RemotingCommand request) throws RemotingCommandException {
        final SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.asyncConsumerSendMsgBack(ctx, request);
            default:
            	// 解析请求体
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return CompletableFuture.completedFuture(null);
                }
                // 建立消息上下文
                mqtraceContext = buildMsgContext(ctx, requestHeader);
                // 发送消息前的逻辑
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
                
                if (requestHeader.isBatch()) {
                    return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
                }
        }
    }

	/**
	* ......
	**/

}

根据源码可以看出,首先解析发送消息的请求SendMessageRequestHeader,然后调用asyncSend(Batch)Message方法进行消息的发送。

该类提供了发送或接收消息的钩子函数:如果发送消息,则调用sendMessage方法,如果是接收消息则调用pullMessage拉取消息的方法。


sendMessage

消息发送给Broker服务器时,调用的是sendMessage方法接收并存储消息,主要流程已在代码片注释中给出:

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
		// 初始化响应
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        // 构建响应头
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

        if (response.getCode() != -1) {
            return CompletableFuture.completedFuture(response);
        }

        final byte[] body = request.getBody();

        int queueIdInt = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        if (queueIdInt < 0) {
            queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
        }

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);

        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return CompletableFuture.completedFuture(response);
        }
		// 设置消息体数据
        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        MessageAccessor.setProperties(msgInner, origProps);
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        // 获取Broker集群名称
        String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
        // 同步等待消息存储成功
        if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
            String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
            origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
        } else {
            // 异步
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        }
        CompletableFuture<PutMessageResult> putMessageResult = null;
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
        	// Broker拒绝接收消息
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/335926.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

wait 和 notify

由于线程之间是抢占式执行的, 因此线程之间执行的先后顺序难以预知.但是实际开发中有时候我们希望合理的协调多个线程之间的执行先后顺序. wait() / wait(long timeout): 让当前线程进入等待状态.notify() / notifyAll(): 唤醒在当前对象上等待的线程. 注意: wait, notify,…

ChatGPT大火,你被这四种手段“割韭菜”了嘛?

目录 黑灰产利用ChatGPT的牟利方式 1、贩卖ChatGPT账号 2、售卖ChatGPT注册工具 3、制作山寨版ChatGPT 4、制作ChatGPT教程 ChatGPT对业务安全的帮助 提升漏洞挖掘的广度和精度 提升业务安全情报的效率和广度 ChatGPT自身的安全隐患 2022年11月&#xff0c;ChatGPT发布时&#…

FastJson序列化和反序列化时处理数据

FastJson序列化和反序列化时处理数据序列化时处理数据反序列化时处理json数据中的值https://github.com/alibaba/fastjson/wiki/PropertyProcessable_cn https://www.cnblogs.com/ariter/p/14254342.html 序列化时处理数据 1、自定义注解用来标识json数据需要处理的属性 impo…

ESP32S3系列--SPI主机驱动详解(二)

一、目的 在上一篇《ESP32S3系列--SPI主机驱动详解(一)》我们介绍了ESP32S3的SPI外设的基本情况以及主机驱动的一些知识点,包括主机驱动的特点、总线的初始化、从设备的加入、传输模式分类等等。 本篇我们将从代码角度帮助大家进一步理解传输接口的一些细节问题。 二、实战 …

6个常见的 PHP 安全性攻击

了解常见的PHP应用程序安全威胁&#xff0c;可以确保你的PHP应用程序不受攻击。因此&#xff0c;本文将列出 6个常见的 PHP 安全性攻击&#xff0c;欢迎大家来阅读和学习。 1、SQL注入 SQL注入是一种恶意攻击&#xff0c;用户利用在表单字段输入SQL语句的方式来影响正常的SQL执…

Blender——物体的随机分布

问题描述将正方体随机分布在平面上。问题解决点击编辑-->偏好设置。在【插件】中的【物体】类型中勾选【Object: Scatter Objects】。右下的活动工具与工作区设置中就会出现【物体散列】的模块&#xff0c;可以调节各参数。选中正方体&#xff0c;按着Shift&#xff0c;选中…

关于 OAuth 你又了解哪些?

作者罗锦华&#xff0c;API7.ai 技术专家/技术工程师&#xff0c;开源项目 pgcat&#xff0c;lua-resty-ffi&#xff0c;lua-resty-inspect 的作者。 OAuth 的背景 OAuth&#xff0c;O 是 Open&#xff0c;Auth 是授权&#xff0c;也就是开放授权的意思。OAuth 始于 2006 年&a…

error when starting dev server:Error: Failed to resolve vue/compiler-sfc.

对于node 的包管理工具&#xff0c;我一般习惯用 yarn&#xff0c;但是最近使用 yarn 创建前端项目的时候出了一些问题。yarn create vite vite-project报错如下&#xff1a;error when starting dev server:Error: Failed to resolve vue/compiler-sfc.vitejs/plugin-vue requ…

[Arxiv 2022] A Novel Plug-in Module for Fine-Grained Visual Classification

Contents MethodPlug-in ModuleLoss functionExperimentsReferencesMethod Plug-in Module Backbone:为了帮助模型抽取出不同尺度的特征,作者在 backbone 里加入了 FPNWeakly Supervised Selector:假设 backbone 的 i i

LayUI渲染数据失败之Ajax异步交互

案例 在layui中调用jquery的ajxa&#xff0c;返回数据&#xff0c;赋值给全局变量&#xff0c;通过DOM渲染到页面。 //定义变量 let sale;//定义请求 $.ajax({type: "GET",url: "http://localhost:8080/product/sale",data: null,dataType: "json&q…

离散数学 课时一 命题逻辑的基本概念

1 命题 1、命题&#xff1a;可以判断其真值的陈述句 2、真值&#xff1a;真或者假(1或者0) 3、真命题&#xff1a;真值为真的命题 4、假命题&#xff1a;真值为假的命题 5、原子命题&#xff1a;不可以再被分解成更简单的命题 6、复合命题&#xff1a;由原子命题通过联结词联结…

12 Day:内存管理

前言&#xff1a;今天我们要完成我们操作系统的内存管理&#xff0c;以及一些数据结构和小组件的实现&#xff0c;在此之前大家需要了解我们前几天一些重要文件的内存地址存放在哪&#xff0c;以便我们更好的去编写内存管理模块 一&#xff0c;实现ASSERT断言 不知道大家有没有…

< Linux >:Linux 进程概念 (4)

目录 五、孤儿进程 六、进程优先级 6.1、基本概念 6.2、查看时实系统进程 6.3、PRI and NI 七、其他概念 四、X 状态&#xff1a;死亡状态 所谓进程处于 X 状态(死亡状态)代表的就是该进程已经死亡了&#xff0c;即操作系统可以随时回收它的资源(操作系统也可以…

代码质量与安全 | 开发人员必备的安全编码实践指南

在任何新的软件开发项目开始时&#xff0c;您就应该考虑软件安全。开始一个新项目或许会令人望而生畏&#xff0c;因为有许多的决定要做&#xff0c;有许多想法必须考虑清楚。通常来说&#xff0c;这些决定和想法包括了定义项目需求、选择正确的流程、选择正确的工具以及确保软…

QML- 导入库包语法

QML- 导入库包语法一、概述二、Import语句的语法1. Module (namespace) 模块(命名空间)导入1. 非模块命名空间的导入2. 导入到限定局部命名空间2. 路径 import1. 本地目录导入2. 远程目录3. JavaScript资源导入三、QML导入路径四、调试一、概述 import 语句其实在QML文档里面体…

Springboot扩展点之SmartInstantiationAwareBeanPostProcessor

前言这是Springboot扩展点系列的第5篇了&#xff0c;主要介绍一下SmartInstantiationAwareBeanPostProcessor扩展点的功能特性、和实现方式。SmartInstantiationAwareBeanPostProcessor与其他扩展点最明显的不同&#xff0c;就是在实际的业务开发场景中应用到的机会并不多&…

机器学习框架sklearn之特征降维

目录特征降维概念特征选择过滤式①低方差特征过滤②相关系数③主成分分析特征降维 0维 标量 1维 向量 2维 矩阵 概念 降维是指在某些限定条件下&#xff0c;降低随机变量&#xff08;特征&#xff09;个数&#xff0c;得到一组“不相关”主变量的过程 注&#xff1a;正是…

微信小程序 java 医生预约挂号答疑问询系统

生预约答疑系统用户端是基于微信小程序端&#xff0c;医生和管理员是基于网页后端。本系统分为用户&#xff0c;管理员&#xff0c;医生三个角色&#xff0c;用户的主要功能是注册登陆小程序&#xff0c;查看新闻资讯&#xff0c;查看医生列表&#xff0c;预约医生&#xff0c;…

【unity细节】关于资源商店(Package Maneger)无法下载资源问题的解决

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 秩沅 原创 收录于专栏&#xff1a;unity细节和bug ⭐关于资源商店为何下载不了的问题⭐ 文章目录⭐关于资源商店为何下载不了的问题…

鸟哥的Linux私房菜读书笔记:文件系统的简单操作

磁盘与目录的容量 现在我们知道磁盘的整体数据实在superblock区块中,但是每个个别文件的容量则在inode当中记载的. 那在命令行下面该如何显示处这几个数据呢? df:列出文件系统的整体磁盘书用量du:评估文件系统的磁盘使用量(常用在推估目录所占容量)df先来说明一下范例一所输…