RocketMQ系统性学习-RocketMQ原理分析之Broker接收消息的处理流程

news2024/9/19 19:54:20

Broker接收消息的处理流程?

既然要分析 Broker 接收消息,那么如何找到 Broker 接收消息并进行处理的程序入口呢?

那么消息既然是从生产者开始发送,消息是有单条消息和批量消息之分的,那么消息肯定是有一个标识,当 Broker 接收到消息之后,肯定是需要通过判断消息的标识来区分单条消息和批量消息,那么只需要找到发送消息的标识,再全局搜索,就可以找到这个标识在哪里被处理,被处理的地方一定就是 Broker 接收消息处理的位置了!

那么还是先找到发送消息的位置:DefaultMQProducer # send(Message msg) ,通过层层调用(这里在生产者发送消息流程中讲了)到达了 DefaultMQProducerImpl # this.sendKernelImpl()

在这个方法中就调用到了 MQ 客户端的发送消息的方法 this.mQClientFactory.getMQClientAPIImpl().sendMessage()

在这里真正的通过 Netty 去发送消息到 Broker 中去:

  1. 通过判断消息的类型构造一个 RemotiongCommand 类型的 request 参数

    这里有 4 个构造 request 参数的方法,如下图会走到第三个方法中,那么这里的请求标识为 RequestCode.SEND_MESSAGE_V2

    在这里插入图片描述

  2. this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request) 方法中通过 Netty 将消息发送出去,那么这个方法需要传入一个 request 参数

在上边构造了 request 并且通过 Netty 发送出去,request 的标识为 RequestCode.SEND_MESSAGE_V2 ,那么我们只需要找到处理该标识的 request 的位置,那就是 Broker 处理消息的位置,在 IDEA 中通过 Ctrl+Shift+F 全局搜索这个标识即可:

在这里插入图片描述

可以发现有三个进行 case 判断的地方:

  • 第一个在 PlainAccessResource 类中
  • 第二个在 SendMessageActivity 类中
  • 第三个在 SendMessageRequestHeader 类中

这里第三个 case 判断的地方就是 Broker 处理消息的位置(可以在三个 case 中都 debug,看断点走到哪里就知道了)

那么我们就在第三个 case 判断的位置打上断点

在这里插入图片描述

接下来启动 NameServer,再以 Debug 的方式启动 Broker,再启动生产者,根据调用堆栈信息来找到 Broker 处理消息的整个调用链:

在这里插入图片描述

根据这个堆栈信息,可以发现,调用链是从 NettyServerHandler 的 channelRead0 转移过来的,那么也就是再 NettyServerHandler 这个 Netty 的服务端接收到消息并进行处理,那么我们就在这个堆栈信息中找 Broker 是在哪里对消息进行处理了呢?

就是在 SendMessageProcessor # processRequest 方法中(也就是堆栈顶第3个方法),在这个方法中:

  1. 通过 parseRequestHeader(request) 先对请求头进行解码,也就是根据请求头 RequestCode.SEND_MESSAGE_V2 的类型做一些相应的处理
  2. 接下来通过 buildMsgContext(ctx, requestHeader, request) 创建消息的上下文对象
  3. this.executeSendMessageHookBefore(sendMessageContext) 执行一些消息发送前的钩子(扩展点)
  4. 核心:this.sendMessage() 真正去发送消息

那么在 this.sendMessage() 中就是真正发送消息的逻辑了:

  1. 首先是 preSend(ctx, request, requestHeader) 进行预发送,这里其实就是对发送的消息进行一些检查(Topic 是否合法?Topic 是否与系统默认 Topic 冲突?Topic 的一些配置是否存在?等等信息)

  2. 如果 queueIdInt < 0 是 true 的话,表明生产者没有指定要发送到哪个队列,那么就通过 99999999 % 队列个数 来选择一个队列发送

  3. 将超过最大重试次数的消息发送到 DLQ 死信队列中去

    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig, oriProps)) {
        return response;
    }
    
  4. 接下来判断 Broker 是否开启了 异步模式,如果开启的话,通过 asyncPutMessage() 处理

    如果没有开启 异步模式,通过 putMessage() 处理,这里其实还是调用了 asyncPutMessage(),只不过通过 get() 阻塞等待结果(复用代码)

那么在发送消息的时候,无论是否异步,都会进入到 DefaultMessageStore # asyncPutMessage() 方法中,我们就点进去看看进行了哪些处理:

  1. 执行一些钩子函数,作为扩展点:putMessageHook.executeBeforePutMessage(msg)

  2. 提交文件的写请求:CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg)

    在这个写文件的方法中,主要做一些文件的写操作,以及将文件写入到磁盘中

    1. 获取文件对象:this.mappedFileQueue.getLastMappedFile()
    2. 追加写文件的操作: mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext)
    3. 最后进行刷盘以及高可用的一些处理:handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA)
  3. 打印写文件消耗的时间 this.getSystemClock().now() - beginTime

那么 Broker 总体的接收消息的处理流程就是上边将的这么多了,当然还有一些边边角角的内容没有细说,先了解整体的处理流程,不要提前去学习太多的细节!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1326081.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java中常用的加密算法总结

目前在工作中常用到加密的一些场景&#xff0c;比如密码加密&#xff0c;数据加密&#xff0c;接口参数加密等&#xff0c;故通过本文总结以下常见的加密算法。 1. 对称加密算法 对称加密算法使用相同的密钥进行加密和解密。在Java中&#xff0c;常见的对称加密算法包括&…

活动回顾丨迁飞之路主题艺术墙绘落地大坪大融城

重庆作为鹰飞之城&#xff0c;不仅是数十万猛禽迁飞的必经之路&#xff0c;也是其他珍稀鸟类的家园。守护飞羽精灵&#xff0c;领略迁飞之美&#xff0c;2023年12月19日&#xff0c;传益千里携手重庆工商大学艺术学院党员服务站的志愿者们一起走进大坪大融城开展迁飞之路生态艺…

软件测试工程师,“我“从月10k到月30k进阶自动化测试之路...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 作为手工测试&…

Likeshop单商户高级版商城的二次开发之路

一、产品介绍 likeshop单商户高级版是一款适用于B2C、单商户、自营商城场景的商城系统。它完美契合私域流量变现闭环交易使用&#xff0c;拥有丰富的营销玩法、强大的分销能力&#xff0c;支持DIY多模板&#xff0c;前后端分离。无论您是想要进行商城运营还是二次开发&#xf…

聚观早报 |xPad2 Pro系列学习机发布;华为Mate X5典藏版实力过硬

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 12月21日消息 xPad2 Pro系列学习机发布 华为Mate X5典藏版实力过硬 iQOO Neo9系列标配芯片Q1 亚马逊云科技自研芯…

CentOS 宣布停更3年后,服务器操作系统何去何从?

“CentOS 要停止更新了&#xff1f;” 盯着电脑&#xff0c;某大型企业数字化部门的负责人彭素素看到这个消息&#xff0c;不仅在心里发出了一声惊呼。 2020年&#xff0c;CentOS 停止更新的消息&#xff0c;不仅彭素素所在的企业&#xff0c;对于不少正在使用 CentOS 的厂商…

搞懂这6 个持续集成工具,领先80%测试人!

开发人员喜欢把写的代码当成自己的孩子&#xff0c;他们会被当成艺术品一样呵护。作为家长&#xff0c;总是会认为自己的孩子是最好的&#xff0c;也会尽全力给自己的孩子最好的&#xff0c;就算有时候会超出自己的能力范围。 最终&#xff0c;孩子会走出去&#xff0c;和其他…

【笔试强化】Day 6

文章目录 一、单选1.2.3.4.5.6.7. 二、不定项选择1.2.3. 三、编程1. 把字符串转换成整数解法&#xff1a;代码&#xff1a; 2. 不要二解法&#xff1a;代码&#xff1a; 一、单选 1. 正确答案&#xff1a;D2. 正确答案&#xff1a;B3. 正确答案&#xff1a;D4. 正确答案&#…

Python编程技巧 – 使用正则表达式

Python编程技巧 – 使用正则表达式 Python Programming Skills – Using Regular Expression By JacksonML Python以其强大的功能高居全球编程软件的榜首。它易于学习和使用&#xff0c;使其成为初学者绝佳语言。此外&#xff0c;Python还用于各种应用程序&#xff0c;包括We…

Java Swing学生成绩管理系统期末大作业

1.且看界面 &#xff08;1&#xff09;登录页&#xff08;可记住账号密码&#xff09; &#xff08;2&#xff09;注册弹窗页 &#xff08;3&#xff09;登录弹窗 &#xff08;4&#xff09;还有账号密码错误3次需等待30秒 &#xff08;5&#xff09;成绩展示页面&#xff08;…

【Spring】15 ApplicationContextAware 接口

文章目录 1. 简介2. 作用3. 使用3.1 创建并实现接口3.2 配置 Bean 信息3.3 创建启动类3.4 启动 4. 应用场景总结 Spring 框架提供了许多回调接口&#xff0c;用于在 Bean 的生命周期中执行特定的操作。ApplicationContextAware 接口是其中之一&#xff0c;它允许 Bean 获取对 A…

无代码API集成助力电商平台,提升味分享营销系统效率

无代码开发的革命 在数字化转型的浪潮中&#xff0c;无代码开发正在成为企业提升效率和灵活性的重要工具。特别是在电商领域&#xff0c;高效的客户关系管理&#xff08;CRM&#xff09;系统和客户服务系统对于保持竞争力至关重要。无代码API集成方案如何实现电商系统的优化和…

存在重复元素

题目链接 存在重复元素 题目描述 注意点 无 解答思路 根据Set无法存储相同元素的特点判断nums中是否存在重复元素 代码 class Solution {public boolean containsDuplicate(int[] nums) {Set<Integer> set new HashSet<Integer>();for (int x : nums) {if …

广州华锐互动:VR元宇宙技术为汽车行业带来革命性变化

随着科技的飞速发展&#xff0c;VR元宇宙技术已经深入影响到我们生活的方方面面&#xff0c;汽车行业更是深受其益。这一新兴技术的出现&#xff0c;为汽车行业带来了前所未有的变化。广州华锐互动将VR技术应用于汽车行业&#xff0c;研发了VR汽修培训、3D汽车展厅、特种车辆3D…

JVM内存结构Java内存模型Java对象模型

导图&#xff1a; https://naotu.baidu.com/file/60a0bdcaca7c6b92fcc5f796fe6f6bc9 1.JVM内存结构&&Java内存模型&&Java对象模型 1.1.JVM内存结构 1.2.Java对象模型 Java对象模型表示的是这个对象本身的存储模型,JVM会给这个类创建一个instanceKlass保存在方…

【powershell】Windows环境powershell 运维之历史文件压缩清理

&#x1f984; 个人主页——&#x1f390;开着拖拉机回家_Linux,大数据运维-CSDN博客 &#x1f390;✨&#x1f341; &#x1fa81;&#x1f341;&#x1fa81;&#x1f341;&#x1fa81;&#x1f341;&#x1fa81;&#x1f341; &#x1fa81;&#x1f341;&#x1fa81;&am…

什么是文件包含漏洞?文件包含漏洞利用方法及防御技巧

文章目录 文件包含漏洞文件包含漏洞利用方法如何预防文件包含漏洞文件包含漏洞防御技巧网安学习路线 文件包含漏洞 文件包含漏洞是指在程序执行过程中&#xff0c;将外部文件的内容作为程序代码或数据的一部分来执行或使用&#xff0c;从而导致程序行为异常。攻击者可以利用文…

ansible的脚本---playbook剧本

ansible的脚本---playbook剧本 playbook组成部分 1、tasks任务&#xff1a;包含要在目标主机上执行的操作&#xff0c;使用模块定义这些操作&#xff0c;每个任务都是一个模块的调用 2、varlables变量&#xff1a;存储和传递数据&#xff0c;变量可以自定义&#xff0c;可以…

企业 NAS 升级,如何解决 Windows ACL 权限迁移和配置?

数字化转型是当前时代的必然趋势&#xff0c;它对于企业的创新能力和竞争力的提升至关重要。企业数字化发展过程中会产生大量的非结构化数据&#xff0c;旧有的存储已经不能完全满足企业需求。因此&#xff0c;相应的存储基础设施需要升级换代&#xff0c;以适应新的业务发展。…

本地生活团购外卖怎么做?一招教你轻易入行!

如果说今年生意不好做的话&#xff0c;那么年初做本地生活服务这个赛道的现在是喜忧参半。喜的是在本地生活干团购和外卖把钱给挣上了。忧的是官方清退了所有的全国本地生活服务商。通过官方渠道基本是没的玩了。本来还想着干个三五年。实现车子、房子、票子自由。这计划全落空…