RocketMQ系统性学习-RocketMQ原理分析之消费者的接收消息流程

news2025/1/11 7:41:32

文章目录

      • 消费者的接收消息流程

消费者的接收消息流程

还是先把消费者接收消息的流程图贴出来,再细说代码流程:
在这里插入图片描述

首先先从消费者的业务调用出发

// 创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_group");
// ...
// 注册监听消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.println(new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// 启动消费者
consumer.start();

那么我们就从 consumer.start() 进入,看一下消费者的启动逻辑,该方法的核心代码也就是:

this.defaultMQPushConsumerImpl.start();

那么进入到这个 start 方法,这里进行了一些配置以及客户端的启动:

  1. 通过 checkConfig() 检查消费组的一些配置:名称是否符合规范、消费者的线程数、消费者的监听等等
  2. 之后再设置一些属性
  3. 通过 mQClientFactory.start() 启动客户端

那么我们进入到启动客户端这个逻辑,我们猜测这里 start 之后,可能就可以进行消息的拉取了,那么在 start 这个方法中,看到了有下边这一行:

this.pullMessageService.start();

这不正是拉取消息的服务吗?点进去之后,发现就是启动了一个线程,这个线程呢就是 this,那么我们点进去这个 start 方法是定义在 ServiceThread 类中,这个类并没有定义 run 方法,因此呢,这个 run 方法应该是定义在了子类 PullMessageService 类中,点进去找到 run 方法,可以看到在 run 方法中就会不停地去 messageRequestQueue 中拉取数据:

MessageRequest messageRequest = this.messageRequestQueue.take();

既然在这里拉取数据了,那么数据是什么时候放到 messageRequestQueue 中的呢?

只需要搜一下哪里调用到了 this.messageRequestQueue.put 就可以知道了,找到之后呢,我们在这一行打个断点,再去启动生产者,就可以知道整个调用链了,

在这里插入图片描述

那么根据栈调用情况呢,可以发现这一行是通过 RebalanceService 的 run 方法进入的,那么这个 RebalanceService 一定是在哪里作为一个线程被启动了

在这里插入图片描述

那么呢,我们之前说了在启动客户端的时候,调用 this.pullMessageService.start() 启动了这个线程,那么在下一行就启动了 rebalanceService 这个线程:

在这里插入图片描述

因此呢,就通过 debug 的方式找到了向 messageRequestQueue 中存放消息就是在 RebalanceService 这个线程中做的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1323127.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用极狐gitlab初始化导入本地项目

本地有项目的情况需要同步到极狐gitlab上 第一步&#xff1a; 在gitlab上新创建一个空项目 ⚠️⚠️⚠️这里需要注意红色圈住的地方一定不要选择&#xff0c;因为选择了这个后续会有不必要的麻烦 第二步 在本地项目中删除原来的.git文件(这一步如果是新项目可以忽略&#…

c# OpenCV 基本绘画(直线、椭圆、矩形、圆、多边形、文本)(四)

我们将在这里演示如何使用几何形状和文本注释图像。 Cv2.Line() 绘制直线 Cv2.Ellipse() 绘制椭圆Cv2.Rectangle() 绘制矩形Cv2.Circle() 绘制圆Cv2.FillPoly() 绘制多边形Cv2.PutText() 绘制文本 一、绘制直线 Cv2.Line(image, start_point, end_point, color, thickness) …

CleanMyMac X 4 for Mac(Mac优化清理工具)v4.14.6中文破解版

CleanMyMac X for Mac中文破解版只需两个简单步骤就可以把系统里那些乱七八糟的无用文件统统清理掉&#xff0c;节省宝贵的磁盘空间。cleanmymac x个人认为X代表界面上的最大升级&#xff0c;功能方面有更多增加&#xff0c;与最新macOS系统更加兼容&#xff0c;流畅地与系统性…

云计算:FusionCompute 通过 FreeNAS 添加SAN存储

目录 一、实验 1.环境准备 2.FusionCompute添加CNA 3.在存储中创建LUN资源映射给CNA节点 3.添加存储资源关联CNA主机节点 4.扫描存储资源 5.将存储设备添加为数据存储 二、问题 1.FusionCompute中存储如何分类 2.存储资源与存储设备有何区别 3.FusionCompute支持哪些…

JavaScript基础篇

目录 1.初始JavaScript 2.Js数据类型 2.1强制转换类型 1.转换为String类型 2.转换为Number类型 3.转换为 Boolean 4.转义符 2.2运算符 2.3分支结构 1.初始JavaScript <!-- 1. 文件引入 --> <!--<script src"./js/index.js"></script>-…

微软官宣放出一个「小模型」,仅2.7B参数,击败Llama2和Gemini Nano 2

就在前一阵谷歌深夜炸弹直接对标 GPT-4 放出 Gemini 之后&#xff0c;微软这两天也紧锣密鼓进行了一系列动作。尽管时间日趋圣诞假期&#xff0c;但是两家巨头硬碰硬的军备竞赛丝毫没有停止的意思。 就在昨日&#xff0c;微软官宣放出一个“小模型” Phi-2&#xff0c;这个 Ph…

在 VMware 虚拟机上安装黑苹果(Hackintosh):免费 macOS ISO 镜像下载及安装教程

在 VMware 虚拟机上安装黑苹果(Hackintosh)&#xff1a;免费 macOS ISO 镜像下载及安装教程 VMware 虚拟机解锁 macOS 安装选项使用 macOS iso 系统镜像安装使用 OpenCore 做引导程序安装 在 VMware 虚拟机上安装黑苹果(Hackintosh)&#xff1a;免费 macOS ISO 镜像下载及安装…

RPC(5):AJAX跨域请求处理

接上一篇RPC&#xff08;4&#xff09;&#xff1a;HttpClient实现RPC之POST请求进行修改。 1 修改客户端项目 1.1 修改maven文件 修改后配置文件如下&#xff1a; <dependencyManagement><dependencies><dependency><groupId>org.springframework.b…

【线性代数】两个向量组等价,其中一个向量组线性无关,另一个向量组也是线性无关吗?

一、问题 两个向量组等价,其中一个向量组线性无关,另一个向量组也是线性无关吗? 二、答案 不一定,当两个向量组中的向量个数也相同时,结论才成立.若向量个数不相同,结论不成立. 例如&#xff1a; 向量组一&#xff1a;(1,0),(0,1) 向量组二&#xff1a;(1,0),(0,1),(1,1) 两…

MyBatis ${}和#{}区别

sql防注入底层jdbc类型转换当简单类型参数$不防止Statment不转换value#防止preparedStatement转换任意 除模糊匹配外&#xff0c;杜绝使用${} MyBatis教程&#xff0c;大家可以借鉴 MyBatis 教程_w3cschoolMyBatis 是支持定制化 SQL、存储过程以及高级映射的优秀的持久层框架。…

【IDEA】Intellij IDEA相关配置

IDEA 全称 IntelliJ IDEA&#xff0c;是java编程语言的集成开发环境。IntelliJ在业界被公认为最好的Java开发工具&#xff0c;尤其在智能代码助手、代码自动提示、重构、JavaEE支持、各类版本工具(git、svn等)、JUnit、CVS整合、代码分析、 创新的GUI设计等方面的功能可以说是超…

锐捷配置完全stub区域

一、实验拓扑 二、实验目的 在运行OSPF协议的网络中&#xff0c;配置STU区域可以减少路由器的路由条目&#xff0c;减小路由器的压力&#xff0c;有效提高路由器的性能。 三、实验配置 第一步&#xff1a;全局配置OSPF R1 ruijie>enable R1#conf terminal R1(config)#hos…

API资源对象StorageClass;Ceph存储;搭建Ceph集群;k8s使用ceph

API资源对象StorageClass;Ceph存储;搭建Ceph集群;k8s使用ceph API资源对象StorageClass SC的主要作用在于&#xff0c;自动创建PV&#xff0c;从而实现PVC按需自动绑定PV。 下面我们通过创建一个基于NFS的SC来演示SC的作用。 要想使用NFS的SC&#xff0c;还需要安装一个NFS…

七:爬虫-数据解析之正则表达式

七&#xff1a;正则表达式概述 正则表达式&#xff0c;又称规则表达式,&#xff08;Regular Expression&#xff0c;在代码中常简写为regex、regexp或RE&#xff09;&#xff0c;是一种文本模式&#xff0c;包括普通字符&#xff08;例如&#xff0c;a 到 z 之间的字母&#xf…

华清远见嵌入式学习——ARM——作业1

要求&#xff1a; 代码&#xff1a; mov r0,#0 用于加mov r1,#1 初始值mov r2,#101 终止值loop: cmp r1,r2addne r0,r0,r1addne r1,r1,#1bne loop 效果&#xff1a;

【AI图集】猫狗的自动化合成图集

猫是一种哺乳动物&#xff0c;通常被人们作为宠物饲养。它们有柔软的毛发&#xff0c;灵活的身体和尖锐的爪子。猫是肉食性动物&#xff0c;主要以肉类为食&#xff0c;但也可以吃一些蔬菜和水果。猫通常在夜间活动&#xff0c;因此它们需要足够的玩具和活动空间来保持健康和快…

STM32启动流程详解(超全,startup_stm32xx.s分析)

单片机上电后执行的第一段代码 1.初始化堆栈指针 SP_initial_sp 2.初始化 PC 指针Reset_Handler 3.初始化中断向量表 4.配置系统时钟 5.调用 C 库函数_main 初始化用户堆栈&#xff0c;然后进入 main 函数。 在正式讲解之前&#xff0c;我们需要了解STM32的启动模式。 STM32的…

css实现0.5px宽度/高度显——属性: transform: scale

在大多数设备上&#xff0c;实际上无法直接使用 CSS 来精确地创建 0.5 像素的边框。因为大多数屏幕的最小渲染单位是一个物理像素&#xff0c;所以通常只能以整数像素单位渲染边框。但是&#xff0c;有一些技巧可以模拟出看起来像是 0.5 像素的边框。 这里介绍使用&#xff1a…

Jmeter基础和概念(超详细整理)

JMeter 介绍&#xff1a; 一个非常优秀的开源的性能测试工具。 优点&#xff1a;你用着用着就会发现它的重多优点&#xff0c;当然不足点也会呈现出来。 从性能工具的原理划分&#xff1a; Jmeter工具和其他性能工具在原理上完全一致&#xff0c;工具包含4个部分&#xff1a…

机器学习---bagging与随机森林

1. bagging算法 集成学习有两个流派&#xff1a;一个是boosting派系&#xff0c;它的特点是各个弱学习器之间有依赖关系。另一种是 bagging流派&#xff0c;它的特点是各个弱学习器之间没有依赖关系&#xff0c;可以并行拟合。 Bagging的弱学习器之间的确没有boosting那样的联…