RocketMQ 源码分析——Producer

news2025/1/9 14:17:07

文章目录

  • 消息发送代码实现
  • 消息发送者启动流程
    • 检查配置
    • 获得MQ客户端实例
    • 启动实例
    • 定时任务
  • Producer 消息发送流程
    • 选择队列
      • 默认选择队列策略
      • 故障延迟机制策略*
      • 两种策略的选择
  • 技术亮点:ThreadLocal

消息发送代码实现

下面是一个生产者发送消息的demo(同步发送)

image.png

主要做了几件事:

  • 初始化一个生产者(DefaultMQProducer)对象
  • 设置 NameServer 的地址
  • 启动生产者
  • 发送消息

消息发送者启动流程

image.png

DefaultMQProducerImpl类start()

image.png

检查配置

DefaultMQProducerImpl

image.png

获得MQ客户端实例

整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表。DefaultMQProducerImpl类start()

image.png

一个clientId只会创建一个MQClientInstance

image.png

clientId生成规则:IP@instanceName@unitName

image.png

RocketMQ中消息发送者、消息消费者都属于”客户端“。每一个客户端就是一个MQClientInstance,每一个ClientConfig对应一个实例。

不同的生产者、消费端如果引用同一个客户端配置(ClientConfig),则它们共享一个MQClientInstance实例。所以我们在定义的的时候要注意这种问题(生产者和消费者如果分组名相同容易导致这个问题)

image.png

启动实例

MQClientInstance类start()

image.png

定时任务

MQClientInstance类startScheduledTask()

image.png

Producer 消息发送流程

我们从一个生产者案例的代码进入代码可知:DefaultMQProducerImpl中的sendDefaultImpl()是生产者消息发送的核心方法

image.png

image.png

从核心方法可知消息发送就是4个步骤:验证消息、查找路由、选择队列、消息发送。

image.png

image.png

选择队列

默认选择队列策略

采用了最简单的轮询算法,这种算法有个很好的特性就是,保证每一个Queue队列的消息投递数量尽可能均匀。这种算法只要消息投递过程中没有发生重试的话,基本上可以保证每一个Queue队列的消息投递数量尽可能均匀。当然如果投递中发生问题,比如第一次投递就失败,那么很大的可能性是集群状态下的一台Broker挂了,所以在重试发送中进行规避。这样设置也是比较合理的。

故障延迟机制策略*

采用此策略后,每次向Broker成功或者异常的发送,RocketMQ都会计算出该Borker的可用时间(发送结束时间-发送开始时间,失败的按照30S计算),并且保存,方便下次发送时做筛选。

image.png

除了记录Broker的发送消息时长之外,还要计算一个Broker的不可用时长。这里采用一个经验值:

如果发送时长在550ms之内,不可用时长为0。

达到550ms,不可用时长为30S

达到1000ms,不可用时长为60S

达到2000ms,不可用时长为120S

达到3000ms,不可用时长为180S

达到15000ms,不可用时长为600S

image.png

image.png

有了以上的Broker规避信息后发送消息就非常简单了。

在开启故障延迟机制策略步骤如下:

  1. 根据消息队列表时做轮训
  2. 选好一个队列
  3. 判断该队列所在Broker是否可用
  4. 如果是可用则返回该队列,队列选择逻辑结束
  5. 如果不可用,则接着步骤2继续
  6. 如果都不可用,则随机选一个

代码如下:

image.png

两种策略的选择

从这种策略上可以很明显看到,默认队列选择是轮训策略,而故障延迟选择队列则是优先考虑消息的发送时长短的队列。那么如何选择呢?

首先RocketMQ默认的发送失败有重试策略,默认是2,也就是如果向不同的Broker发送三次都失败了那么这条消息的发送就失败了,作为RocketMQ肯定是尽力要确保消息发送成功。所以给出以下建议。

如果是网络比较好的环境,推荐默认策略,毕竟网络问题导致的发送失败几率比较小。

如果是网络不太好的环境,推荐故障延迟机制,消息队列选择时,会在一段时间内过滤掉RocketMQ认为不可用的broker,以此来避免不断向宕机的broker发送消息,从而实现消息发送高可用。

当然以上成立的条件是一个Topic创建在2个Broker以上的的基础上。

技术亮点:ThreadLocal

image.png

image.png

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1022671.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux环境中数据误删除后恢复指导

一、背景 在很多Linux系统运维工作中,很多人会遇到敲错命令,或复制命令出错,或直接执行了rm -rf命令,事后才恍然大悟,闯下大祸,抛开问题,如果真的遇到这种情况,我们该如何应对呢&…

如何使用IP归属地查询API来追踪网络活动

引言 在当今数字化世界中,了解网络活动的源头和位置对于网络安全、市场研究和用户体验至关重要。IP归属地查询API是一种强大的工具,可以帮助您追踪网络活动并获取有关IP地址的重要信息。本文将探讨如何使用IP归属地查询API来追踪网络活动,以…

[C++ 网络协议] 多播与广播

目录 1. 多播 1.1 多播的使用情形 1.2 多播的原理 1.3 如何实现多播 1.4 多播的代码实现 2. 广播 2.1 广播与多播的区别 2.2 广播的分类 2.3 实现广播 1. 多播 1.1 多播的使用情形 考虑一种情形,你要向10000名用户发送数据,此时如果用TCP提供服…

MDPI模板报错的问题---提示缺少sty文件

MDPI模板报错的问题—提示缺少sty文件 平时大多数提交IEEE trans模板时大多使用CTEX编译,然而,MDPI模板需要用texlive,二者之间如果先安装CTEX后安装texlive将会导致库文件的冲突。结果将会报缺少sty的文件错。网上提供了很多解决方案&#…

Nmap安装和使用详解

Nmap安装和使用详解 Nmap概述功能概述运行方式 Nmap安装官方文档参考:Nmap参数详解目标说明主机发现端口扫描Nmap将目标主机端口分成6种状态:Nmap产生结果是基于机器的响应报文,而这些主机可能是不可信任的,会产生一些迷惑或者误导…

如何将 ONLYOFFICE 协作空间与单页面应用集成

2023 年春季,我们推出了 ONLYOFFICE 协作空间,这是一个先进的联合办公平台,旨在加强与客户、合作伙伴和第三方的文档协作。使用可自定义的房间和高级安全功能可以彻底改变您的文档协作方式。在本博文中,我们将以 GitHub Pages 为例…

Linux之操作文件命令

目录 一、阅览文件 1、cat 2、head 3、tail 4、more 5、less 二、过滤命令——grep 1、格式 2、选项 3、匹配模式 三、cut切割命令 1、格式 2、选项 四、sort排序命令 1、格式 2、选项 五、uniq去重命令 六、替换文件中的字符显示tr 1、格式 2、选项 七、…

第8章 MySQL的数据目录

8.1 数据库和文件系统的关系 像 InnoDB 、 MyISAM 这样的存储引擎都是把表存储在磁盘上的,而操作系统用来管理磁盘的又被称为 文件系统 ,所以用专业一点的话来表述就是:像 InnoDB 、 MyISAM 这样的存储引擎都是把表存储在文件系统上的。当我…

API接口采集电商平台阿里巴巴中国站获得1688商品评论数据货品评分、评价内容接口调用指南

淘宝API商品评论接口,主要用于获取某个商品的评价信息。通过该接口,我们可以获取到商品的所有评价内容、评价时间、评价等级等相关信息,帮助我们更好地了解用户对商品的反馈,进而进行数据分析和业务优化。 1688.item_review-获得…

投票制作创建流量主小程序开发

很多企业、团队、门店商家有创建投票活动的需求,单独开发一套成本过高,所以会找一些市面上可以创建投票活动的工具。基于此开发了一款可以创建制作投票活动的小程序。 小程序主要是投票活动的制作、创建,可以接入流量主广告和会员功能&#…

成集云 | 金蝶K3集成聚水潭ERP(金蝶K3主管库存)| 解决方案

源系统成集云目标系统 方案介绍 金蝶K3是一款ERP软件,它集成了供应链管理、财务管理、人力资源管理、客户关系管理、办公自动化、商业分析、移动商务、集成接口及行业插件等业务管理组件。以成本管理为目标,计划与流程控制为主线,通过对成本…

温度传感器的精度受什么影响

温度传感器(temperature transducer)是指能感受温度并转换成可用输出信号的传感器。是早开发,应用极其广的一类传感器,是温度测量仪表的核心部分。现代的温度传感器外形非常得小,这样更加让它广泛应用在生产实践的各个…

PowerDesigner 与 mysql 同步数据

PowerDesigner 连接上数据库 创建数据库表 table_5 选择: 点击确认后弹出 点击run执行 刷新数据库表,已创建成功 修改测试表1,新增一个字段 取消全选 选择数据库,勾选修改的表,如果全部勾选的话,就…

3D动画制作和渲染需要什么样的硬件规格?

动画是艺术与技术的令人兴奋的融合,为无限的创造力提供了广阔的画布。为了将创意愿景变为现实,动画师需要适合其工艺的强大计算资源。每个动画项目都有不同的硬件需求,无论是制作简单的 2D 动画还是构建复杂的 3D 世界。因此,有抱…

字符串 (3)--- KMP 算法的扩展

对于个长度为n的字符串s。定义函数z[i]表示s和s[i,n-1](即以 s[i] 开头的后缀)的最长公共前缀(LCP)的长度。 z被称为s的Z函数。特别地,z[0] 0。 如同大多数字符串主题所介绍的算法,其关键在于&#xff0c…

深度学习之模型压缩、加速模型推理

简介 当将一个机器学习模型部署到生产环境中时,通常需要满足一些在模型原型阶段没有考虑到的要求。例如,在生产中使用的模型将不得不处理来自不同用户的大量请求。因此,您将希望进行优化,以获得较低的延迟和/或吞吐量。 延迟&…

MyBatis-Plus 使用拦截器实现数据权限控制

平时开发中遇到根据当前用户的角色,只能查看数据权限范围的数据需求。列表实现方案有两种,一是在开发初期就做好判断赛选,但如果这个需求是中途加的,或不希望每个接口都加一遍,就可以方案二加拦截器的方式。在mybatis执…

BIT-4-数组

一维数组的创建和初始化一维数组的使用 一维数组在内存中的存储 二维数组的创建和初始化二维数组的使用二维数组在内存中的存储 数组越界数组作为函数参数数组的应用实例1:三子棋 数组的应用实例2:扫雷游戏 1. 一维数组的创建和初始化 1.1 数组的创建 …

IOS17正式版今日发布

北京时间9月19日凌晨,苹果公司正式向全球用户推送了期待已久的iOS 17正式版。此次更新为iPhone带来了多项激动人心的功能,包括对“电话”、“信息”、FaceTime通话的重大更新,“待机显示”以及音乐、小组件、Safari浏览器的升级等。 据了解&…

二叉树顺序结构及实现

👉二叉树顺序结构及实现 1.二叉树的顺序结构2.堆的概念及结构3.堆的实现3.1堆向下调整算法3.2堆向上调整算法 4.堆的创建4.1堆创建方法14.1.1构建堆结构体4.1.2堆的初始化4.1.3堆数据添加向上调整4.1.4主函数内容 4.2堆的创建方法24.2.1堆数据添加向下调整 4.3堆数据…