RocketMQ源码阅读-Broker消息接收

news2025/1/24 22:49:39

RocketMQ源码阅读-Broker消息接收

  • 1. 从单元测试入手
  • 2. Broker启动流程
  • 3. Broker接收消息
  • 4. Broker接收消息时序图
  • 5. 小结

Broker接收 Producer发送的消息。

Broker在RocketMQ中也是一个独立的Model,rocketmq-broker。

Broker的核心类为SendMessageProcessor。
image.png

1. 从单元测试入手

同样从单元测试入手,看Broker接收消息的流程。
SendMessageProcessor的单元测试类为org.apache.rocketmq.broker.processor.SendMessageProcessorTest。
image.png
包含上面这些方法,其中init()方法是启动类,其他是测试流程的方法。
先看init()方法中Broker的启动流程。

2. Broker启动流程

单元测试中的inti()方法为:

@Before
public void init() {
    brokerController.setMessageStore(messageStore);
    when(messageStore.now()).thenReturn(System.currentTimeMillis());
    Channel mockChannel = mock(Channel.class);
    when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
    when(handlerContext.channel()).thenReturn(mockChannel);
    when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt());
    sendMessageProcessor = new SendMessageProcessor(brokerController);
}

init方法新建一个SendMessageProcessor,并一个传入brokerController,指定一个messageStore:

  • BrokerController: Broker的管理器
  • MessageStore: 消息存储的接口

继续看接收消息的流程SendMessageProcessor#sendMessage。

3. Broker接收消息

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                      RemotingCommand request) throws RemotingCommandException {
    SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.consumerSendMsgBack(ctx, request);
        default:
            // 解析请求
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return null;
            }

            // 发送请求Context。在hook场景下使用
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            // hook:处理发送消息前逻辑
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

            RemotingCommand response;
            if (requestHeader.isBatch()) {
                // 处理批量发消息逻辑
                response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                // 处理发送消息逻辑
                response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
            }

            this.executeSendMessageHookAfter(response, mqtraceContext);
            return response;
    }
}

可以看到,此方法负责解析RPC请求,最终是调用SendMessageProcessor#sendMessage方法:

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                    final RemotingCommand request,
                                    final SendMessageContext sendMessageContext,
                                    final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

    // 初始化响应
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

    response.setOpaque(request.getOpaque());

    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("receive SendMessage request command, {}", request);

    // 如果未开始接收消息,抛出系统异常
    final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    if (this.brokerController.getMessageStore().now() < startTimstamp) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
        return response;
    }

    // 消息配置(Topic配置)校验
    response.setCode(-1);
    super.msgCheck(ctx, requestHeader, response);
    if (response.getCode() != -1) {
        return response;
    }

    final byte[] body = request.getBody();

    // 如果队列小于0,从可用队列随机选择
    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    if (queueIdInt < 0) {
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    }

    // 创建MessageExtBrokerInner
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);

    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
        return response;
    }

    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setPropertiesString(requestHeader.getProperties());
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    PutMessageResult putMessageResult = null;
    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    // 校验是否不允许发送事务消息
    if (traFlag != null && Boolean.parseBoolean(traFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
    } else {
        // 添加消息
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    }
	// 处理result
    return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

}

可以看到,此方法进行了topic的校验,并创建创建MessageExtBrokerInner,随后添加消息的流程主要是调用了MessageStore#putMessage方法:
MessageStore是接口,其默认实现为DefaultMessageStore:
image.png
DefaultMessageStore#putMessage方法:

public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    if (this.shutdown) {
        log.warn("message store has shutdown, so putMessage is forbidden");
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

	// 从节点不允许写入
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is slave mode, so putMessage is forbidden ");
        }

        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }
	// store是否允许写入
    if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
        }

        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    } else {
        this.printTimes.set(0);
    }
	// 消息过长
    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }
	// 消息附加属性过长
    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }

    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    long beginTime = this.getSystemClock().now();
	// 添加消息到commitLog
    PutMessageResult result = this.commitLog.putMessage(msg);

    long eclipseTime = this.getSystemClock().now() - beginTime;
    if (eclipseTime > 500) {
        log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

    if (null == result || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }

    return result;
}

可以看到,首先是检查Broker是否可以写入,从节点不能写入,然后消息经过一系列的格式与大小校验,最终通过CommitLog.putMessage进行存储。

4. Broker接收消息时序图

SendMessageProcessor_processRequest.png

5. 小结

Producer作为客户端发送消息后,Broker作为服务端需要接收消息并存储消息。
本篇分析了broker接收消息的流程:

  • SendMessageProcessor#processRequest是RPC执行接收消息的方法,此方法主要负责解析RPC请求
  • processRequest调用SendMessageProcessor#sendMessage方法,进行了topic的校验,并创建创建MessageExtBrokerInner,随后添加消息的流程主要是调用了DefaultMessageStore#putMessage方法
  • DefaultMessageStore#putMessage方法检查Broker是否可以写入,从节点不能写入,然后消息经过一系列的格式与大小校验,最终通过commitLog.putMessage进行存储

消息存储是通过CommitLong#putMessage进行的,这个流程在下一篇《RocketMQ源码阅读-Broker消息存储》学习。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1383555.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue项目之.env文件.env.dev、test、pro

.env文件是vue运行项目时的环境配置文件。 .env: 全局默认配置文件&#xff0c;所有环境(开发、测试、生产等&#xff09;均会加载并合并该文件 .env.development(开发环境默认命名) 开发环境的配置&#xff0c;文件名默认为.env.development,如果需要改名也是可以的&#xf…

[小程序]定位功能实现

第一步:首先要认识三个小程序的 api wx.chooseLocation 和 wx.getLocation 和 wx.openLocation (1).wx.chooseLocation 用于在小程序中选择地理位置。当用户点击选择位置按钮时&#xff0c;小程序会调起地图选择界面&#xff0c;用户可以在地图上选择一个位置&#xff0c;并可以…

openfeign服务启动成功但是注册不上nacos? 我看看怎么个事儿!

spring-cloud-starter-alibaba-nacos-discovery和spring-boot-starter-web不得不说的秘密 ! 直接上答案: 给你的服务加上springbootweb依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifa…

C语言程序设计——程序流程控制方法(二)

循环结构 while语句 while(表达式){代码块; }do{代码块; }while(表达式)while语句分为do-while和while两种&#xff0c;区别在于循环之前是不是先执行一次循环的内容&#xff0c;可以类似于i和i的关系&#xff0c;本质上来讲是相同的。当表达式为真时&#xff0c;则会执行一次…

Python二级:二叉树问题求解

一、题源 在Python二级考试中前10道基础题是必考题&#xff0c;虽然没有什么卵用&#xff0c;但是你得分不达标&#xff0c;还不让你过&#xff0c;没有办法只好硬着头皮去刷题了。这10道题中有一个二叉树题比较难&#xff0c;现摘录如下&#xff0c;同时给出gpt-4的解答&…

【数据开发】BI数据报表之数据可测试性设计与分析

文章目录 1、什么是BI&数据报表2、什么是可测试性3、数据测试与方法3.1 数据准确性与对比&#xff08;重要&#xff09;3.2 数据安全性 1、什么是BI&数据报表 数据报表是一种数据可视化工具 用于将数据以图表、表格和其他可视化形式呈现出来&#xff0c;以便用户可以…

mysql-实战案例 (超详细版)

&#x1f389;欢迎您来到我的MySQL基础复习专栏 ☆* o(≧▽≦)o *☆哈喽~我是小小恶斯法克&#x1f379; ✨博客主页&#xff1a;小小恶斯法克的博客 &#x1f388;该系列文章专栏&#xff1a;重拾MySQL &#x1f379;文章作者技术和水平很有限&#xff0c;如果文中出现错误&am…

【DC系列教程2--Timing and Area Constrains】

DC系列教程2--Timing and Area Constrains Lab Flow:依赖输入Design SpecificationLab Demo Goal: determin the unit of time in the target library; //设置时间精度Create a Design Compiler timing and area constrains file based on a provided schematic and specifacat…

Airflow大揭秘:如何让大数据任务调度变得简单高效?

介绍&#xff1a;Airflow是一个开源的、用于创建、调度和监控数据管道的工作流平台。这个平台使用Python编写&#xff0c;并通过有向无环图&#xff08;Directed Acyclic Graph, DAG&#xff09;来管理任务流程&#xff0c;使得用户不需要知道业务数据的具体内容&#xff0c;只…

AI副业拆解:随心所欲地替换任何内容

在瞬息万变的世界里&#xff0c;保持“物体ID”的核心特质&#xff0c;同时创造无限可能的新内容&#xff0c;这是一场市场需求与技术挑战的双重交响。此刻&#xff0c;为您揭开一款颠覆性创新产品——ReplaceAnything框架。 直击痛点&#xff0c;破茧成蝶&#xff0c;Replace…

RV1126边缘计算AI盒子,支持4-6路1080p视频,2T 算力

1 产品概述 信迈推出基于瑞芯微Rockchip RV1126架构的AI边缘计算主板&#xff0c;RV1126芯片是四核ARM Cortex-A7,1.5GHz&#xff0c; RSIC-V 200MHz CPU &#xff0c;NPU2.0Tops。AI边缘计算主板外围接口丰富&#xff0c;拥有超强扩展性&#xff0c;可广泛应用在智慧安防、工…

Brc20钱包横评推荐:谁更适合玩铭文?

加密货币的世界越来越热闹&#xff0c;新的创意层出不穷&#xff01;最近&#xff0c;BRC-20 通证标准成了这个圈子的新宠儿&#xff0c;这是在比特币网络上诞生的一种超酷的新型可替代通证。和以太坊的 ERC-20 通证一样牛&#xff0c;但 BRC-20 通证是 Ordinals 协议的杰作&am…

洛谷 P1439 【模板】最长公共子序列【线性dp+dp模型转换】

原题链接&#xff1a;https://www.luogu.com.cn/problem/P1439 题目描述 给出 1,2,…,n 的两个排列 P1​ 和 P2​ &#xff0c;求它们的最长公共子序列。 输入格式 第一行是一个数 n。 接下来两行&#xff0c;每行为 n 个数&#xff0c;为自然数 1,2,…,n 的一个排列。 输…

Deepin使用记录-deepin安装docker

引用 本来想在deepin中直接安装mysql的开发环境的&#xff0c;但想到还是安装docker&#xff0c;然后在docker下安装比较方便&#xff0c;所以就有了本篇文章&#xff0c;先在deepin下安装docker。 经过本次安装&#xff0c;发现在deepin下安装docker是非常的简单&#xff0c…

自动执行 Active Directory 清理

Active Directory &#xff08;AD&#xff09; 可帮助 IT 管理员分层存储组织的资源&#xff0c;包括用户、组以及计算机和打印机等设备&#xff0c;这有助于管理员集中创建基于帐户和组的规则&#xff0c;并通过创建不合规的自动日志来强制执行和确保合规性。 不时清理AD是保…

ruoyi后台管理系统部署-3-安装redis

centos7安装redis 1. yum 安装 查看是否安装了redis yum installed list | grep redis ps -ef | grep redis安装epel 仓库&#xff08;仓库是软件包下载的&#xff0c;类似maven&#xff0c;nuget&#xff09; yum install epel-release搜索 redis 包 yum search redis安装…

YOLOv8 Ultralytics:使用Ultralytics框架进行SAM图像分割

YOLOv8 Ultralytics&#xff1a;使用Ultralytics框架进行SAM图像分割 前言相关介绍前提条件实验环境安装环境项目地址LinuxWindows 使用Ultralytics框架进行SAM图像分割参考文献 前言 由于本人水平有限&#xff0c;难免出现错漏&#xff0c;敬请批评改正。更多精彩内容&#xf…

第 3 章 稀疏数组和队列

文章目录 3.1 稀疏 sparsearray 数组3.1.1 先看一个实际的需求3.1.2 基本介绍3.1.3 应用实例3.1.4 课后练习 3.2 队列3.2.1 队列的一个使用场景3.2.2 队列介绍3.2.3 数组模拟队列思路3.2.4 数组模拟环形队列 3.1 稀疏 sparsearray 数组 3.1.1 先看一个实际的需求  编写的五…

【EI会议征稿通知】第三届机器视觉、自动识别与检测国际学术会议(MVAID 2024)

第三届机器视觉、自动识别与检测国际学术会议(MVAID 2024) 2024 3rd International Conference on Machine Vision, Automatic Identification and Detection 第三届机器视觉、自动识别与检测国际学术会议(MVAID 2024)定于2024年4月26至28日在中国昆明隆重举行。MVAID 2024将…

关于git与git-lfs对文件压缩存储方面的研究

先说结论&#xff0c;git使用了Delta增量压缩算法&#xff0c;git-lfs实测没有进行任何压缩&#xff0c;这个结论让我很震惊。 测试过程如下&#xff1a; 测试git仓库自身的压缩 准备一个包含许多杂项文件的文件夹&#xff0c;大概几百M&#xff0c;要保证有一个txt文本文件…