从零开始读RocketMq源码(二)Message的发送详解

news2025/1/27 13:05:35

目录

前言

准备

消息发送方式

深入源码

消息发送模式

选择发送方式

同步发送消息

校验消息体

获取Topic订阅信息

高级特性-消息重投

选择消息队列-负载均衡

装载消息体发送消息

压缩消息内容

构造发送message的请求的Header

更新broker故障信息

异步发送消息

总结


前言

上一篇我们已经对RocketMq生产者启动源码进行了学习《从零开始读RocketMq源码(一)生产者启动》那么本篇我们将对生产者发送消息的源码进行学习

准备

如果没看前一篇的,这里还是要强调本篇的rocketmq版本

首先我们从github上拉取rocketmqd的源码链接到本地,使用idea打开。

源码地址:https://github.com/apache/rocketmq

目前最新版本为:5.2.0

那么我们在idea上切换分支为 release-5.2.0

注:请保持和本篇的版本一直,方便后面文章中给出的代码块定位

消息发送方式

在读源码之前我们先了解下mq支持的发送消息的类型。

消息的发送方式有三种,但我们最常用的是同步的方式发送

  • sync 同步:消息发送后,必须等待消息的发送结果返回后,才能发送下一条消息
  • async 异步:消息发送后,不用等待返回结果,直接发送下一条数据,但会设置一个回调方法接收返回结果
  • oneway 单向:消息发送后,不会返回结果,也不会等待,也不会设置回调方法。适用场景日志收集、监控数据和快速通知等对可靠性要求不高但需要高性能的场景

深入源码

首先进入外层的producer.send()方法中

//源码位置:
//包名:org.apache.rocketmq.example.simple
//文件名:Producer
//行数:42
SendResult sendResult = producer.send(msg);

消息发送模式

//源码位置:
//包名:org.apache.rocketmq.client.producer
//文件名:DefaultMQProducer
//行数:431
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    msg.setTopic(withNamespace(msg.getTopic()));
    //批量发送
    if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
        return sendByAccumulator(msg, null, null);
    } else {
        //单条发送
        return sendDirect(msg, null, null);
    }
}
  1. 自动批处理发送 -sendByAccumulator()
  • 该方法用于将消息累积到一个批处理容器中,等待足够的消息数量或达到某个时间间隔后,再进行批量发送。
  • 可以显著减少发送次数,提高吞吐量。

     2. 直接发送 -sendDirect()

  • 适用于即时发送或消息已经是批处理消息的情况

本章的重点就是直接发送消息,这也是开发中使用最频发的方式

选择发送方式

//源码位置:
//包名:org.apache.rocketmq.client.producer
//文件名:DefaultMQProducer
//行数:720
public SendResult sendDirect(Message msg, MessageQueue mq,
                             SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
    // send in sync mode
    if (sendCallback == null) {
        if (mq == null) {
            //同步不指定队列
            return this.defaultMQProducerImpl.send(msg);
        } else {
            //同步指定队列
            return this.defaultMQProducerImpl.send(msg, mq);
        }
    } else {
        if (mq == null) {
            //异步不指定队列
            this.defaultMQProducerImpl.send(msg, sendCallback);
        } else {
            //异步指定队列
            this.defaultMQProducerImpl.send(msg, mq, sendCallback);
        }
        return null;
    }
}

有上面代码可以知道,方法中提供了三个参数设置:

  • msg :消息体,这个为必填项
  • sendCallback :消息回调对象,如果这个参数不为空,则为异步发送,为空则为同步发送
  • mq :指定的队列(指定与不指定的区别在于后续是否需要对队列负载均衡,下面源码中会讲到)

根据最开始生产者发送消息,我们只传入了msg,所以本次重点看同步不指定队列代码实现

同步发送消息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:1525
public SendResult send(Message msg,
                       long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

跟踪代码我们可以看到,方法中我们默认设置了CommunicationMode.SYNC 同步发送模式,并且回调参数为空,以及设置了默认超时时间3s

校验消息体

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:704
Validators.checkMessage(msg, this.defaultMQProducer);

该方法就是校验消息内容是否合规

  • 校验消息内容是否不为空,消息大小是否超过最大值maxMessageSize = 1024 * 1024 * 4; // 4M
  • 校验消息发送的topic是否为不为空,以及topic的长度是否超过默认最长值127

获取Topic订阅信息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:709
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

该方法通过消息体中的topic名称获取topic的订阅信息,该方法在我们上一篇生产者启动中已经出现过了,深入方法内部其实就是先从本地topicPublishInfoTable map中获取数据,没有则从远程nameserver中拉取

高级特性-消息重投

这是rocketMq中一个重要的特性,消息如果投递失败了,会重新投递

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:715
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

这段代码就是获取总过重投的次数:

不难看出,只有发送方式为同步发送时才为1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() =3次,其余发送方式都只有一次机会。

只有同步发送消息才支持消息重投,如果第一次投递失败了,mq还回重试2次投递

找到上面源码位置往下看,其实可以看到下面代码就是使用了一个for循环来进行重投

选择消息队列-负载均衡

通过上面我们知道,最开始并没有指定队列,所以需要程序来获取一个队列。

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:724
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex);

因为自动创建的topic,会被默认分配4个队列(生产环境为手动创建topic以及设置队列数量),所以我们必须使用负载均衡保证队列的合理分配到不同队列上,减轻单个队列的压力

  • topicPublishInfo:为消息发送到指定topic的订阅信息
  • lastBrokerName :为上一次选择的broker名称(如果在集群模式下,topic也会存在于多个broker上,因此记录上一次选择的broker名称可以避免连续选择同一个 Broker,从而实现更好的负载均衡和容错处理
  • resetIndex :重置队列索引位置(根据源码逻辑可知,当消息进行重新投递时会重置topic订阅消息中队列的索引位置)

深入上面源码会发现,队列负载均衡的算法获取索引策略默认就是轮询

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:TopicPublishInfo
//行数:101
int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());

负载均衡策略

  1. 轮询策略 (Round-Robin)
  2. 随机策略 (Random)
  3. 一致性哈希策略 (Consistent Hashing)
  4. 权重随机策略 (Weighted Random)
  5. 最少连接策略 (Least Connections)

装载消息体发送消息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:740
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

该方法就是发现消息的核心方法了,不管是同步发送还是异步发送都会执行该方法

做一些发送消息前的准备,接下深入该方法查看

压缩消息内容

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:898
if (this.tryToCompressMessage(msg)) {
    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
    sysFlag |= compressType.getCompressionFlag();
    msgBodyCompressed = true;
}
  • 首先判断消息是否大于4k( compressMsgBodyOverHowmuch = 1024 * 4),大于则进行压缩,小于则不处理
//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:1070
byte[] data = compressor.compress(body, compressLevel);
  • 传入消息体以及压缩的等级,这里大佬们提供了三种压缩实现,分别基于三种不同的压缩框架

在我们日常工作中,如果需要压缩内容,也可以参考大佬们的实现,学习源码不仅仅是了解框架的本身,也要吸取优秀的地方合理运用

构造发送message的请求的Header

message是Producer发送给Broker的一个请求,我们可以把内容抽象成两部分组成:请求头请求体

  • 请求体就是消息本身数据
  • 请求头 SendMessageRequestHeader 则包含了各种必要的数据,比如topicmessaeQueue等等,更多可直接查看请求头对象源码

最后就是使用基于netty实现的远程调用发送消息到broker中

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:1016
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
    brokerAddr,
    brokerName,
    msg,
    requestHeader,
    timeout - costTimeSync,
    communicationMode,
    context,
    this);

更新broker故障信息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:742
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);

程序执行到这个位置,说明前面消息发送的流程全部执行完成了,那么我们也知道了消息发送的结果,从而知道broker服务的状态情况,我们需要把当前的broker故障情况更新到 faultItemTable 本地map中,供后续对broker服务的故障规避faultItemTable 该map在前一篇生产者启动中也提到过。

异步发送消息

选择发送方式代码中当sendCallback!=null时则进入异步发送消息

跟踪源码我们可知,异步发送其实就是创建了一个单独的线程,使用Runnable对象实现,因为会返回一个执行结果

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:550
Runnable runnable = new Runnable() {
    @Override
    public void run() {
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeout > costTime) {
            try {
                sendDefaultImpl(msg, CommunicationMode.ASYNC, newCallBack, timeout - costTime);
            } catch (Exception e) {
                newCallBack.onException(e);
            }
        } else {
            newCallBack.onException(
                new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
        }
    }
    executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
};
  • sendDefaultImpl() 该方法就是和同步发送调用的同一个了,唯一区别就是类型 CommunicationMode.ASYNC 和存在回调方法newCallBack
  • executeAsyncMessageSend() 执行异步消息发送

总结

本篇对生产者发送消息源码进行了跟踪学习,你是否也有所收获呢。下一篇我们将对rocketMq的核心组件Broker进行源码解读,Broker负责接收和存储消息,管理消息队列,并将消息分发给消费者, 是担任连接生产者和消费者,确保消息的高效传输和存储,保证系统的可靠性和性能的重要角色。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1909152.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

根据空格、制表符、回车符等分割字符串re.split

【小白从小学Python、C、Java】 【考研初试复试毕业设计】 【Python基础AI数据分析】 根据空格、制表符、 回车符等分割字符串 re.split [太阳]选择题 根据给定的Python代码,哪个选项是正确的? import re pattern r\s print(f"【显示】pattern{…

软件工程面向对象 超市管理系统 需求分析 系统设计 课程设计报告

1、引言 系统简述 超市管理系统的功能主要有前台管理和后台管理两个大块。其使用对象 有超市管理人员和超市销售人员两类。超市管理系统主要为了实现商品输 入、 输出管理数据的自动化, 提高商品统计信息的实时性, 减轻人工劳动强 度从而节省人力成本。实…

国产操作系统安装配置auditd审计工具 _ 统信 _ 麒麟 _ 中科方德

原文链接:国产操作系统安装配置auditd审计工具 | 统信 | 麒麟 | 中科方德 Hello,大家好啊!今天给大家带来一篇在国产桌面操作系统上部署auditd审计工具的文章。auditd是Linux审计系统的核心守护进程,用于记录系统安全相关的事件和…

Java根据经纬度获取两点之间的距离

Java根据经纬度获取两点之间的距离,最近在实现类似于钉钉打卡签到的需求,因为对精度要求不是很高,所以可以通过一个球面距离的公式来求两点距离,这里将地球当成一个球体,实际上地球是一个不规则的球体,所以…

14-22 剑和远方2 - 深度神经网络中的学习机制

概论 在第一部分中,我们深入探讨了人工智能的兴衰简史以及推动人工智能发展的努力。我们研究了一个简单的感知器,以了解其组件以及简单的 ANN 如何处理数据和权重层。在简单的 ANN 中,不会对数据执行特定操作。ANN 中的激活函数是一个线性函…

模拟考试小程序的设计

管理员账户功能包括:系统首页,个人中心,科目管理,复习资料管理,参考文献管理,用户管理,留言板管理,论坛管理 微信端账号功能包括:系统首页,复习资料&#xf…

搭建ASP+Mssql站点

SQL Server 2022 链接:https://pan.baidu.com/s/1U2zkXacbjOVNsAq-JQ80tA?pwdgosm 提取码:gosm MSSQLi-labs 链接:https://pan.baidu.com/s/1K6rCIaeSzaBBtQKD9Uk1fw?pwdveyn 提取码:veyn SQL Server 2022下载安装 安装成功…

【前端从入门到精通:第十一课: JS基本语法】

独闯JavaScript 了解JavaScript 为什么学习JavaScript JavaScript 是 web 开发者必学的三种语言之一: HTML 定义网页的内容 CSS 规定网页的布局 JavaScript 对网页行为进行编程 我们学习 JavaScript,以及 JavaScript 如何与 HTML 和 CSS 协同工作的知…

重大更新!800GB谷歌倾斜摄影最新OSGB数据免费大放送

自3月20日谷歌倾斜摄影OSGB转换工具V0.1版本发布以来,期间更新了V0.2、V1.0、V1.1、V1.2共4个版本,目前V1.2版本功能已经比较完善和稳定,实现了的当初产品规划的绝大部分功能。基于此,我将之前免费分享的数据重新下载和生成&#…

MyBatis框架学习笔记(二):原生API 的调用 和 注解的使用

1 MyBatis原生API 1.1 原生API 快速入门需求 在笔记一案例的基础上将增删改查,使用 MyBatis 原生的 API 完成,就是直接通过SqlSession 接口的方法来完成 1.2 原生API 快速入门-代码实现 创建 src\test\java\com\hspedu\mapper\MyBatisNativeTest.jav…

BUG: npm error `electron_mirror` is not a valid npm option

npm error electron_mirror is not a valid npm option 环境 windows 11 node v20.15.0 npm v10.7.0详情 在运行 npm run mirror 命令时出现错误。这是一个设置镜像的命令。 我是没事干了,运行这个命令,这个命令在我这里根本就是运行不了。这个命令一…

完美解决AttributeError: ‘DataFrame‘ object has no attribute ‘ix‘的正确解决方法,亲测有效!!!

完美解决AttributeError: ‘DataFrame’ object has no attribute ix’的正确解决方法,亲测有效!!! 亲测有效 完美解决AttributeError: DataFrame object has no attribute ix的正确解决方法,亲测有效!&…

服务器数据恢复—同品牌不同系列服务器raid5阵列数据恢复方案分析

RAID5磁盘阵列数据恢复案例一: 服务器数据恢复环境: 一台某品牌LH6000系列服务器,通过NetRaid阵列卡将4块硬盘组建为一组RAID5磁盘阵列。操作系统都为Window server,数据库是SQLServer。 服务器故障: LH6000系列服务器…

并发编程工具集——StampedLock-比ReadWriteLock更快的锁(上篇)(十八)

StampedLock 支持的三种锁模式 写锁、悲观读锁和乐观读 StampedLock与ReadWriteLock的异同 ReadWriteLock支持两种:读锁、写锁相同点:其中,写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的语义非常类似,允许多个线程同时获取…

tessy 单元测试:小白入门指导手册

目录 1,创建单元测试工程目录 2,导入单元测试源文件 一:创建测试文件夹(最好和代码目录一一对应,方便查找) 二:选择测试环境 三:添加源文件 四:分析源文件 3,编写单元测试用例 一:设置函数参数的传输方向 二:添加单元测试用例 三:编辑单元测试用例数据 …

MATLAB中d2c函数用法

目录 语法 说明 示例 将离散时间传递函数转换为连续时间 将识别出的离散时间传递函数转换为连续时间 在将已识别的离散时间传递函数模型转换为连续时间模型后,重新生成协方差信息 d2c函数的功能是将模型从离散时间转换为连续时间。 语法 sysc d2c(sysd) sy…

atcoder 357 F Two Sequence Queries (线段树板子)

题目: 分析: 线段树 代码: // Problem: F - Two Sequence Queries // Contest: AtCoder - SuntoryProgrammingContest2024(AtCoder Beginner Contest 357) // URL: https://atcoder.jp/contests/abc357/tasks/abc357_…

【最新鸿蒙应用开发】——Navigation路由管理

Navigation路由 1.引言 一多开发的项目适合使用Navigation进行统一的页面路由管理。Navigation还提供统一的标题栏、工具栏、菜单栏,并且自带导航返回功能。另外,Navigation还支持一些Router不支持的功能,比如:自带的路由拦截功…

运行时异常与一般异常的异同

运行时异常与一般异常的异同 1、运行时异常(Runtime Exception)1.1 特点 2、 一般异常(Checked Exception)2.1 特点 3、异同点总结3.1 相同点3.2 不同点 4、总结 💖The Begin💖点点关注,收藏不迷…

Transformer构架的优劣及常见问题

Transformer构架的优劣 优点 长距离依赖关系建模:通过自注意力机制,Transformer能够有效捕捉长距离依赖关系,适用于处理长序列和涉及远距离语义关系的任务。并行计算能力:多头注意力机制的并行计算特性极大提高了训练和推理效率…