RocketMQ学习笔记:生产者Producer

news2025/1/10 16:42:18

DefaultMQProducer

根据上文:RocketMQ学习笔记:消息Message - 掘金 (juejin.cn),我们定位到Producer中的这一行代码:

java

复制代码

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start();

  1. 通过new DefaultMQProducer("ProducerGroupName")实例化一个生产者对象。这里的ProducerGroupName在5.x的版本已经移除了,他主要说的是生产者分组,主要是保证Producer发送同一类消息且发送逻辑一致。.
  2. 通过setNamesrvAddr("127.0.0.1:9876"),是指明设置设置NameServer地址。(记得这句话吗:Producer、Consumer、Broker在启动时,会自行将数据注册到NameServer中)。
  3. 启动这个生产者实例。

DefaultMQProducer是继承ClientConfig的,ClientConfig主要是做RocketMQ客户端公共配置的,setNamesrvAddr就是其方法,后续我们再介绍这个ClientConfig类。

DefaultMQProducer的其他推荐资料:RocketMQ源码解析:手把手教老婆看懂DefaultMQProducer_小虚竹的博客-CSDN博客,该文章介绍的很详细。

干活的是DefaultMQProducerImpl

构造它需要初始化一些基本属性,才能方便后面干活。

在构造方法中,我们继续往下定位发现,在DefaultMQProducer中其实有一个成员属性是defaultMQProducerImpl,我们记住:它是真正做消息传输的事:

 

java

复制代码

public class DefaultMQProducer extends ClientConfig implements MQProducer { protected final transient DefaultMQProducerImpl defaultMQProducerImpl; ...

DefaultMQProducer的构造方法中,实例化了DefaultMQProducerImpl

同时在上面第3步中,执行的

 

java

复制代码

producer.start();

也是在执行DefaultMQProducerImplstart()

了解一下它

当在DefaultMQProducer调用以下代码时,会去创建DefaultMQProducerImpl,它是 RocketMQ 生产者的实现类。该类的主要作用是提供一个异步发送线程池,用于处理生产者发送消息的异步任务。

 

ini

复制代码

defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);

调用以下的代码,主要是构造DefaultMQProducerImpl并且初始化一些属性,主要是线程池和队列。

  • asyncSenderThreadPoolQueue:容量为 50000。该队列用于存储异步发送任务,当队列已满时,新的任务将被阻塞,直到队列中有空间可用。
  • defaultAsyncSenderExecutor:线程池中的线程数量与 CPU 核心数量相同;线程存活时间为 60 秒,即如果一个线程在 60 秒内没有执行任务,则会被回收。
 

java

复制代码

public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) { // 保存传入的 DefaultMQProducer 对象和 RPCHook 对象 this.defaultMQProducer = defaultMQProducer; this.rpcHook = rpcHook; // 创建一个有界阻塞队列,容量为 50000,用于存储异步发送任务 this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000); // 创建一个异步发送线程池 this.defaultAsyncSenderExecutor = new ThreadPoolExecutor( // 设置核心线程数为当前可用的处理器数量 Runtime.getRuntime().availableProcessors(), // 设置最大线程数为当前可用的处理器数量 Runtime.getRuntime().availableProcessors(), // 设置线程存活时间为 60 秒 1000 * 60, TimeUnit.MILLISECONDS, // 使用上面创建的有界阻塞队列作为任务队列 this.asyncSenderThreadPoolQueue, // 创建一个 ThreadFactory,用于创建新的线程 new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { // 创建一个新的线程,名称为 AsyncSenderExecutor_1、AsyncSenderExecutor_2、... return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet()); } }); }

启动生产者实例

 

java

复制代码

producer.start();

通过producer.start();其实是调用this.defaultMQProducerImpl.start();这也就证明了干活的是DefaultMQProducerImpl

最后看一下发送消息

回到Producer类中,看最后Producer做了什么事情。

 

java

复制代码

SendResult sendResult = producer.send(msg);

根据代码一步步深入,主要是以下这样的逻辑,我们直接看到最后的地方:

send最后执行的是sendDefaltImpl这个方法。 它主要做这些事情:

  1. 校验消息
 

java

复制代码

Validators.checkMessage(msg, this.defaultMQProducer);

根据checkMessage源码,他主要校验:

  • 消息对象是否为空
  • topic是否合法
    • 是否为空的校验
    • 是不是系统主题的校验
  • 检查消息体(实际放的内容)是否为空
  • 检查消息体的长度是否为0
  • 检查消息的大小有没有超过默认值,默认值是4M
  1. 拿 Topic 名称去 NameServer 换取详情(挖坑)
  2. 消息重投,同步传输默认重传3次(包含传输的1次),否则1次。

最后,消息传递的方式有:同步、异步(回调函数)、还有一个是发就得了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/517651.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023年3月GESP能力等级认证C++一级真题

一、单选题&#xff08;每题2分&#xff0c;共30分&#xff09; 1.以下不属于计算机输入设备的有&#xff08;B &#xff09;。(2分) A&#xff0e;键盘 B&#xff0e;音箱 C&#xff0e;鼠标 D&#xff0e;传感器 2.计算机系统中存储的基本单位用 B 来表示&#xff0c;它…

Git 常用命令笔记

下载安装这里就不赘述了&#xff0c;直接下一步就行&#xff01; 一、常用命令 1. 增加删除/文件 添加当前目录的所有文件到暂存区 git add .添加指定文件到暂存区 git add [file1] [file2] ...添加指定目录到暂存区&#xff0c;包括子目录 git add [dir]对于同一个文件的多…

多种方法解决There is no tracking information for the current branch的错误

文章目录 1. 复现错误2. 分析错误3. 解决错误3.1 远程有分支3.2 远程无分支 4. 总结 1. 复现错误 今天发布某版本的项目&#xff0c;准备创建个v0point1分支&#xff0c;后期如果修改该版本&#xff0c;直接在该分支上修改即可。 首先&#xff0c;使用git branch v0point1命令…

问道游戏私人服务器架设+详细搭建教程+外网教程

搭建条件: 1、服务器一台, 2、下载服务端 搭建教程&#xff1a; 1.先安装宝塔 2、放行安全组的相应端口 具体要放行的端口有&#xff1a;3306、888、8888、5000、8101、8110、8120、8160-8168&#xff08;这个是范围之8160是一线&#xff0c;依次类推&#xff09; 3、安装数据库…

别点了!CAS登录对接,这个Bug让你反复登录!

目录 引言 背景描述 问题描述 问题排查 软件测试工程师发展规划路线 引言 你是否曾经在登录一个网站时&#xff0c;不断输入账号密码&#xff0c;却发现自己总是无法成功登录&#xff1f;或者你是否曾经遇到过跨域问题导致的登录失败&#xff1f; 今天我要和大家分享的就…

Speech and Language Processing之神经网络

上面这句话很好的解释了一件事&#xff0c;就是“大力出奇迹” &#xff0c;当神经元的数目足够足够多的时候&#xff0c;机器所能做到的事情就很复杂、很难理解了&#xff0c;这是不是说明chatgpt的成功也是因为大&#xff1f; 现代神经网络是一个由小型计算单元组成的网络&am…

前端 Web 性能清单

&#x1f482; 个人网站:【海拥】【摸鱼游戏】【神级源码资源网】&#x1f91f; 前端学习课程&#xff1a;&#x1f449;【28个案例趣学前端】【400个JS面试题】&#x1f485; 想寻找共同学习交流、摸鱼划水的小伙伴&#xff0c;请点击【摸鱼学习交流群】 提高 Web 应用程序的性…

HS6621系列低功耗国产蓝牙芯片 支持蓝牙5.1

HS6621CxC是一个功耗优化的蓝牙低功耗和专有的2.4 ghz应用真正的芯片上系统(SOC)解决方案。它集成了一个具有蓝牙基带和丰富外设的低功耗射频收发器I0扩展。HS6621CxC还集成了电源管理&#xff0c;提供高效率电源管理。它的目标是2.4 G蓝牙低功耗系统&#xff0c;人机界面设备(…

尚无忧【已对接硬件】共享自习室,共享麻将馆,共享茶室,共享空间,共享台球室,共享健身房无人值thinkphp开发

1、定位功能&#xff1a;可定位附近是否有店 2、能通过关键字搜索现有的店铺 3、个性轮播图展示&#xff0c;系统公告消息提醒 4、个性化功能展示&#xff0c;智能排序&#xff0c;距离、价格排序 5、现有店铺清单展示&#xff0c;订房可查看房间单价&#xff0c;根据日期、…

面试了一位6年的软件测试,一问三不知,他还反怼我...

最近看了很多简历&#xff0c;很多候选人年限不小&#xff0c;但是做的都是一些非常传统的项目&#xff0c;想着也不能通过简历就直接否定一个人&#xff0c;何况现在大环境越来 越难&#xff0c;大家找工作也不容易&#xff0c;于是就打算见一见。 在沟通中发现&#xff0c;由…

linux 修改 /etc/locale.conf无效问题处理办法

问题背景&#xff1a; 我在做测试系统文档转换成其他格式文档时&#xff0c;按照系统要求配置系统的编码格式为&#xff1a;utf-8 但是 尤其是设置&#xff1a;LC_ALLZh_CN.UTF- 8 但是 即使 我已经设置了 /etc/locale.conf内容如下&#xff1a; 并且source /etc/locale.con…

【FPGA-DSP】第十期:sysgen算法封装与调用

参考视频教程第10期 - sysgen算法封装与调用 - 基于FPGA的数字信号处理系统开发笔记_哔哩哔哩_bilibili 该教程主要实现如何将sysgen编写的算法模块给实际的应用起来 添加封装有两种方式&#xff1a; 在Vivado中使用ip核添加算法模块封装在Sysgen中将算法模块封装 Sysgen开发…

为何使用 B+ 树而非二叉查找树或 B 树做索引?

二叉树 B-Tree BTree 一、为何使用 B 树而非二叉查找树做索引&#xff1f; 我们知道二叉树的查找效率为 O(logn)&#xff0c;当树过高时&#xff0c;查找效率会下降。另外由于我们的索引文件并不小&#xff0c;所以是存储在磁盘上的。 文件系统需要从磁盘读取数据时&#xff0c…

【Vue工程】010-UnoCSS 即时按需原子 CSS 引擎

【Vue工程】010-UnoCSS 即时按需原子 CSS 引擎 文章目录 【Vue工程】010-UnoCSS 即时按需原子 CSS 引擎一、概述1、简介2、官网 二、基本使用1、安装2、修改 vite.config.ts3、根目录创建 uno.config.ts4、在 main.ts 中引入5、VS Code 安装 UnoCSS 插件6、在组件中使用7、访问…

ABAP 好用的事务码工具记录(持续更新)

性能优化类 SM50-某个程序RUNNING时间过长的时候&#xff0c;可以直接跳转到对应程序位置。 使用说明&#xff1a;一般选择仅活动进程&#xff0c;过滤列表中的活动程序和用户名&#xff0c;这样可以快速的定位到进程。 通过管理>>程序>>调试直接跳转到程序运行的…

VS2019配置opencv4.6.0手把手一步一步实现

引言&#xff1a;配置环境真是让人痛苦不堪&#xff0c;踩了无数个坑&#xff0c;网上的文章五花八门&#xff0c;完全不知道参考哪个&#xff0c;直接劝退。为了能顺利配置&#xff0c;此处进行记录&#xff0c;以后可以回过头来看&#xff0c;也分享给大家。 我提供了两种方…

基于AT89C51单片机的温度检测报警设计

点击链接获取Keil源码与Project Backups仿真图&#xff1a; https://download.csdn.net/download/qq_64505944/87777752?spm1001.2014.3001.5503 源码获取 主要内容&#xff1a; 基于51单片机设计一个温度检测报警器&#xff0c;至少具有以下功能&#xff1a;温度的检测和超…

音频信号处理库librosa

参考&#xff1a; 1. librosa官网 2. librosa语音信号处理 3. 语音信号处理库 ——Librosa 4. librosa音频处理教程 5. Python音频信号处理库函数librosa介绍 0 谱分析函数 1. librosa 读取信号 librosa.load(path, sr22050, monoTrue, offset0.0, durationNone)读取音频文件…

深度linux社区版 20.8 安装 nvidia-docker,启动Stable Deffision WebUI docker 容器

以下为失败的记录&#xff0c;成功方法直接跳到末尾。 环境 说明&#xff1a; 深度apt 源中无法直接安装nvidia-docker 下载源码 github地址&#xff1a; GitHub - NVIDIA/nvidia-docker: Build and run Docker containers leveraging NVIDIA GPUs 下载最新release https:…

Win10系统D盘满了怎么清理隐藏的垃圾文件?

Win10系统D盘满了怎么清理隐藏的垃圾文件&#xff1f;电脑磁盘满了之后&#xff0c;就无法存在其它的文件了&#xff0c;有用户的电脑D盘空间满了&#xff0c;那么这个情况怎么去将里面隐藏的一些垃圾文件进行清理呢?接下来我们一起来看看解决的方法分享吧。 方法一&#xff1…