rocketmq源码-broker处理consumer拉取消息请求

news2025/1/23 7:07:25

前言

在前面consumer拉取消息的博客中,有说过,对于consumer,在拉取消息的时候,是需要指定code码的,在consumer去broker拉取消息的时候,指定的code码是:PULL_MESSAGE,所以这篇博客,我们简单看下broker在接收到netty请求之后,是如何处理的

源码

在broker这边,是PullMessageProcessor 这个类来处理拉取消息请求的,我们先说为什么是这个类来处理

这是netty服务端在接收到client的请求之后,通过handler进行处理

org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler#channelRead0
	org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived
		org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand

这里有个关键的逻辑,我们可以看到:
1、先是从一个map集合中,根据key找到了一个处理类,然后判断处理类如果不为null,就使用map中获取到的processor
2、然后调用processor.processRequest()方法
在这里插入图片描述
这里我们先看下processorTable这个map是在哪里赋值的

这个registerProcessor()方法是在broker启动的时候,调用的,不再贴这个的调用链路了
在这里会根据code码以及对应的processor,调用了NettyRemotingService的registerProcessor方法
在这里插入图片描述

可以看到,这里会把code和对应的processor放到了这个map集合中
在这里插入图片描述

那接下来,我们来看具体解析请求参数,并拉取消息的逻辑

org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel,
org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)

对于这里处理拉取消息请求的逻辑,我还没有完全看明白所有的代码逻辑,所以我先把主核心的链路捞出来;我目前只看懂了两步
1、从commitLog根据size和offset拉取消息
2、将消息组装到response对象中

从commitLog拉取消息

在尝试拉取消息的时候,会看到,底层是通过commitLog来拉取消息的,将拉取到的消息,设置到返回对象中
在这里插入图片描述
在这里插入图片描述

将消息设置到返回参数中

这里可以看到,是根据从commitLog中取出来的数据,转换成了byte[]数组,然后把数组放到response的body字段中;

这里的body就是真正存放消息的地方,我们可以从consumer拿到response之后的解析逻辑来反向验证

在这里插入图片描述

consumer在接收到broker返回的请求之后,会把body中的数据,放到了messageBinary字段上
我们再往前看,对messageBinary字段是怎么用的
在这里插入图片描述
在这里插入图片描述
这里看到,会把messageBinary转换为msgList去使用;所以broker在返回参数的body字段中,设置的是消息体信息
在这里插入图片描述

总结

这里看起来,是比较简单明了的,就是根据当前请求中的offset和size,从commitLog中拉取消息数据,然后返回就可以了;当然,中间的很多细节都没有看,因为逻辑太多了,实在看不过来,先把核心逻辑串起来 后面再慢慢补中间的细节

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/91598.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[附源码]Nodejs计算机毕业设计基于的婚恋系统Express(程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程。欢迎交流 项目运行 环境配置: Node.js Vscode Mysql5.7 HBuilderXNavicat11VueExpress。 项目技术: Express框架 Node.js Vue 等等组成,B/S模式 Vscode管理前后端分…

APISIX Ingress 如何支持自定义插件

摘要:本篇主要介绍了 Ingress 资源相关的语义,以及如何对 Ingress 资源进行能力的扩展。 作者:张晋涛,API7.ai 云原生技术专家,Apache APISIX PMC 成员,Apache APISIX Ingress Controller 项目维护者。 Ing…

基于C++ 实现简易图书管理系统【100010046】

图书管理系统 基于 C 实现简易图书管理系统 该项目是在学习完 C 语言后,独立完成设计开发的简易图书管理系统 设计的基本要求 基本完成对图书系统的设计,包含基本的功能,无界面设计。 要有明显的分类,对不同的进入者有不同的…

机器学习算法原理归纳总结:回归、聚类、支持向量、推荐、降维与神经网络

机器学习算法原理归纳总结:回归、聚类、支持向量、推荐、降维与神经网络 本文重点参考:唐宇迪博士的课程PPT [特别鸣谢] 完整版资料下载:机器学习算法原理详解代码实战 1.回归算法 2.逻辑回归 3.决策树 决策树实际上是根据样本的特征个数对样…

汇编语言第二章:寄存器

2. 寄存器 寄存器进行信息的存储,对于汇编程序员来说,CPU 中的主要部件是寄存器。8086CPU 有 14 个寄存器,这些寄存器分别是: AX BX CX DX SI DI SP BP IP CS SS DS ES PSW通用寄存器 8086所有的寄存器都是 16 位的&#xff0c…

Android Rust JNI系列教程(二) 创建第一个Rust JNI项目

前言 提到JNI,大家都会想到C,C.不过如今rust又给我们增加了一个选项,借助rust的jni库(https://github.com/jni-rs/jni-rs),我们可以很方便的使Android与rust交互.从本章起,我们将逐步地了解使用rust实现一些经典的jni方法. 创建Rust项目 创建工程 在命令行输入命令: cargo…

超算/先进计算的发展与应用是什么?

经过近十年的快速发展,我国在超算领域的实力已达到世界先进水平。1993年,我国第一台高性能计算机“曙光一号并行机”研制成功,打破了国外IT巨头对我国超算技术的垄断。 自此,我国不断加快超级计算机研制步伐。从全球超级计算机TO…

精华推荐 | 【MySQL技术专题】「主从同步架构」全面详细透析MySQL的三种主从复制(Replication)机制的原理和实战开发(原理+实战)

前提概要 随着应用业务数据不断的增大,应用的响应速度不断下降,在检测过程中我们不难发现大多数的请求都是查询操作。此时,我们可以将数据库扩展成主从复制模式,将读操作和写操作分离开来,多台数据库分摊请求&#xff…

NEUQACM双周赛(三)

目录7-1 打字(C)题目描述:输入格式:输出格式:输入样例1:输出样例1:输入样例2:输出样例2:解题思路:7-2 分香肠(C,最大公约数)题目描述:输入格式:输出格式:输入样例:输出样例:解题思路…

节能降耗 | AIRIOT智慧电力综合管理解决方案

电力技术的发展推动各行各业的生产力,与此同时,企业中高能耗设备的应用以及输配电过程中的电能损耗,也在一定程度上加剧了电能供应压力。以工业制造业为例,企业的管理水平、能耗结构、生产组织方式都关系到能源的有效利用率&#…

电子招投标系统nodejs+vue+elementui

前端技术:nodejsvueelementui 前端:HTML5,CSS3、JavaScript、VUE 1、 node_modules文件夹(有npn install产生) 这文件夹就是在创建完项目后,cd到项目目录执行npm install后生成的文件夹,下载了项目需要的依赖项。 2、packag…

电商新模式——链动2+1模式为你带来社交电商新思路

随着流量入口价值的降低,电商 IP 时代的来临,移动社交电商获得了飞速的发展,在运营与营销的过程中,商家们往往为了降低营销成本,主动制造消费理由,通过各类促销、折扣来刺激消费,然而在回归商业…

Web3中文|NFT如何促进教育的发展?

自问世以来,NFT已经被应用于教育、艺术等多个领域。不过,相较于艺术行业,大多数人对NFT在教育界的作用知之甚少。 那么,就让我们来看看它们在课堂内外的影响都有哪些。 得益于区块链技术,NFT可以提高教育质量&#x…

【蓝桥杯选拔赛真题52】Scratch正话反说 少儿编程scratch图形化编程 蓝桥杯选拔赛真题讲解

目录 scratch正话反说 一、题目要求 编程实现 二、案例分析 1、角色分析

Android Rust JNI系列教程(三) Rust与Android互相调用

前言 Rust的JNI流程以及方法实际上和我们常见的C JNI是十分相似的.我们本章将使用Rust实现常见的JNI调用功能.关于更多的用法,可参考官方示例,github地址为https://github.com/jni-rs/jni-rs/blob/master/example/mylib/src/lib.rs. 基本交互功能实现 1. Java传String,返回b…

Java集合复习

文章目录集合概述、collection集合体系特点Collection集合的遍历增强for循环集合概述、collection集合体系特点 集合都是支持泛型的&#xff0c;但是集合只能存储对象&#xff0c;因此集合也叫做对象集合。 public static void main(String[] args) {Collection<String>l…

分布式 | 令人头疼的堆外内存泄露怎么排查?

作者&#xff1a;鲍凤其 爱可生 dble 团队开发成员&#xff0c;主要负责 dble 需求开发&#xff0c;故障排查和社区问题解答。少说废话&#xff0c;放码过来。 本文来源&#xff1a;原创投稿 *爱可生开源社区出品&#xff0c;原创内容未经授权不得随意使用&#xff0c;转载请联…

OpenFace Win10 运行和抽离部分代码

需求&#xff1a;提取出OpenFace中的GazeAnaLyser 中的部分代码往一个写好的接口里面放&#xff0c;主要实现提取面部的所有关键点&#xff0c;估计出视线的功能&#xff1b; 一&#xff0c;openface的安装与使用 在win10上把openface跑起来这个链接够用了&#xff0c;这里主要…

非零基础自学Golang 第5章 流程控制 5.2 循环控制

非零基础自学Golang 文章目录非零基础自学Golang第5章 流程控制5.2 循环控制5.2.1 for循环5.2.2 break 跳出循环5.2.3 continue 继续循环第5章 流程控制 5.2 循环控制 5.2.1 for循环 Go语言中的循环逻辑通过for关键字实现。不同于其他编程语言&#xff0c;Go语言没有while关…

NeurIPS2022 | OmniVL: 用于Image-Language和Video-Language任务的通用模型

原文标题&#xff1a;OmniVL: One Foundation Model for Image-Language and Video-Language Tasks 论文链接&#xff1a;OmniVL: One Foundation Model for Image-Language and Video-Language Tasks | OpenReview 三模态统一的工作。 一、问题提出 旨在设计一个全视觉语言…