rocketmq源码-pull模式拉取消息、同步拉取消息

news2025/1/24 11:41:14

前言

上一篇博客,记录的是push模式,异步发送netty请求拉取消息的代码,这篇博客主要记录consumer发送同步netty请求,去拉取消息的逻辑,但是对于同步发送请求,需要结合LitePullConsumer来看

在Lite PullConsumer中有两种方式,分别是:subscribe和assign模式,这两种模式的区别,我的理解是:前者是mq帮我们进行负载均衡,后者我们可以按照自己的需求去进行负载均衡,给当前消费者分配messageQueue

但是这两种模式的相同点是:都是采用的pull模式,需要在消费者这一端,主动的去pull消息

对于使用同步发送请求的consumer,需要这样使用
在这里插入图片描述

所以可以看到,对于pull模式,就是在业务代码中自己去poll的,接着我们来看源码

源码

start()

org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl#start

如图1所示:在这个start()方法中,有两个方法需要着重关注下,就是mqClientFactory.start()和operateAfterRunning()
前面这个方法已经看了好多次了,后面这个方法是assign模式的时候,会取启动task任务,这个后面再说

图1:
在这里插入图片描述

我们先来看mqClientFactory.start(),和这篇博客有关系的,是这里面负载均衡的逻辑
这个调用链中间的逻辑,比较简答,就不介绍了,我们主要关注和pull模式拉取消息有关的逻辑

this.rebalanceService.start();
org.apache.rocketmq.client.impl.consumer.RebalanceService#run
	org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance
		org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl#doRebalance
			org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance
				org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic

在这里进行负载均衡的时候,区分了集群模式和广播模式,至于如何进行负载均衡,后面会单独起一篇博客记录,这里只关系和pull模式拉取消息有关的逻辑
我们会发现,不管是集群模式,还是广播模式,都会调用中两个方法
第一个方法updateProcessQueueTableInRebalance 是更新当前consumer所需要处理的messageQueue
第二个方法messageQueueChanged,对于push模式,暂时看懂处理逻辑,但是对于pullLite模式,有很重的一个步骤
在这里插入图片描述

对于litePull模式,在这个方法中会调用updatePullTask()这个方法,这个方法,是启动了task任务,这个task任务是为了拉取消息的
在这里插入图片描述

这里在update的时候,可以看到,会先从内存中taskTable中取任务,如果是刚启动,这里肯定是null,所以关键的逻辑在startPullTask()中
在这里插入图片描述

可以看到,这里是根据messageQueue进行遍历,初始化pullTask对象,如果pullTask对象不在内存中,就先设置到内存中,然后再启动task任务;这里为什么只需要调用schedule调用一次呢?我们知道,拉取消息的话,是一个持久的逻辑,就是需要不停的去调用,那就不用想了,肯定是在task中,会再次调用schedule去拉取消息

在这里插入图片描述
前面铺垫了这么多,就是为了讲清楚这里的task任务是在什么时候启动的,现在我们可以看到,是在负载均衡之后,根据最新的负载均衡结果,每个messageQueue对应一个task,然后启动task,去拉取消息

对于PullTaskImpl的run()方法,不贴全部的代码了,看关键逻辑
这里圈起来的四块代码,是四块比较关键的逻辑

  1. 这里的pull请求,就是取发起同步netty请求的地方
  2. 第二部分是针对同步返回的pullResult进行处理
  3. 第三部分是更新当前messageQueue的offset,表示已经处理到哪里了,下次去拉取消息时会用到
  4. 重新调度当前task任务,继续拉取消息

在这里插入图片描述

pull() 发起同步netty请求

这里只需要一张截图,就全说明白了,如果看过前面一篇push模式的博客,就会发现,对于pull模式,这里设置的sync 同步模式发送请求
在这里插入图片描述

submitConsumeRequest

这里也是只需要一张截图,在从broker获取到返回参数之后,这里会把消息取出来,然后包装成ConsumeRequest对象;我们下面 消费者poll()拉取消息 这一节会看到,消费者去拉取消息的时候,是通过poll()方法拉取的,在底层,就是从这个cache中获取的
在这里插入图片描述

第三点和第四点就不说了,比较简单,没有什么特殊逻辑

operateAfterRunning()

前面在消费者启动的时候,除了调用mqClientFactory.start(),还会operateAfterRunning()调用这个方法

对于assign模式,这里会调用updateAssignPullTask();这个方法很重要,也是启动拉取消息任务的地方
在这里插入图片描述
对于updateAssign方法,也一样,会去调用startPullTask方法
但是在调用之前,会根据入参的mqNewSet进行remove的操作,也就是说,如果现在consumer处理的是messageQueue1、messageQueue2;但是现在consumer自己分配了messageQueue之后,可能consumer只处理messageQueue1,那在这里,就会把messageQueue2对应的task给停掉,因为这里把cancelled属性设置为了true
在这里插入图片描述

在这里插入图片描述
这里对于assign模式,一直有个疑问,明明在启动负载均衡策略的时候,会启动一次task,为什么还会接着再判断一遍呢?因为我感觉负载均衡逻辑执行了之后,已经启动了task,这里再判断应该也不会再启动一遍,因为task已经在taskTable中了
但是直到我看到了assign模式的使用方法之后,我明白了
这里可以看到,在start之后,assign模式,会自己再去根据自己的逻辑,去计算当前消费者要处理哪些messageQueue,然后调用assign方法更新

在这里插入图片描述
这里可以看到,在consumer自己重新计算分配了messageQueue之后,会调用assign方法更新consumer所需要处理的messageQueue,并且会对task任务进行一遍重新处理
在这里插入图片描述

消费者poll()拉取消息

这里poll的逻辑也很简单,就是从一个blockingQueue中拉取request信息,然后从request对象中获取到当前那要处理的消息,把解析到的消息,返回给consumer;
在前面 submitConsumeRequest 这一节中,有看到消息是怎么放到consumeRequestCache中的,所以对于pull的模式,就是这么简单
在这里插入图片描述

总结

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/92142.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C语言基础—运算符及优先级

本章主要讲解运算符的优先级和结合顺序 知识点&#xff1a; 运算符分类&#xff0c;记忆了解13种运算符注意运算符的易错点运算符的优先级 运算符及优先级运算符分类❗ 运算符易错不能直接连续判断“< 值 <”关于判断两个浮点数是否相等逻辑表达式&#xff08;布尔型&am…

CSS:border-image

border-image属性对图像的规格和比例比较高&#xff0c;导致使用成本比较高。另外&#xff0c;常见的场景中&#xff0c;大家更倾向于扁平化而不是非拟物化&#xff0c;边框装饰通常在项目中不会出现。 border-image是由多个CSS属性缩写的&#xff0c;比如: border-image-sou…

hc32和stm32 can波特率设置

前言 笔者在调试一款新的mcu的can通信时候&#xff0c;最麻烦的是波特率设置。由于没有弄明白其计算原理&#xff0c;经常出错&#xff0c;且不同的波特率有不同的采样点的要求。浪费了不少时间。这次一次搞明白can波特率的计算公式。 can波特率计算 在ISO 11898-1-2015 标准…

音视频基础概念(2)——音频

目录 1. 基本知识 2.采样率和采样位数 3.音频编码 4. 声道数 5. 码率 6. 音频格式 日常生活中&#xff0c;音视频随处可见&#xff0c;包括视频、音频、编解码、封装容器、音视频等概念。 1. 基本知识 音频数据的承载方式最常用的是脉冲编码调制&#xff0c;即PCM。于…

JAVA面试(2022年Java常见面试问题)

1、谈谈你对Spring中IOC和AOP的理解。 答案&#xff1a; 2、谈谈Spring的bean的创建过程和生命周期。 答案&#xff1a; 3、谈一下JVM的内存分配和垃圾回收机制。 答案&#xff1a; 4、谈一下你使用比较多的设计模式和场景。 答案&#xff1a; 5、谈一些mysql的事务隔离。 …

什么是文件系统?

【推荐阅读】 一文了解Linux上TCP的几个内核参数调优 一文剖析Linux内核中内存管理 分析linux启动内核源码 文件系统是操作系统用于明确存储设备&#xff08;常见的是磁盘&#xff0c;也有基于NAND Flash的固态硬盘&#xff09;或分区上的文件的方法和数据结构&#xff0c;…

传奇列表上传登录器公告小窗口怎么修改

传奇列表上传登录器公告小窗口怎么修改 很多小伙伴不会上传列表&#xff0c;我是艾西今天给大家分享下怎么上传列表 我们开始实操&#xff08;纯教学分享&#xff09; 在我们的网站文件夹里创建一个列表.txt 在浏览器里找一个列表模板例&#xff1a;www.pkp123.cn:88&#xff…

1570_AURIX_TC275_SCU_ERU

全部学习汇总&#xff1a; GreyZhang/g_TC275: happy hacking for TC275! (github.com) 从系统的逻辑图看&#xff0c;能够很清楚看到这个模块的功能处理过程&#xff1a;首先是边沿信号的识别&#xff0c;接着是根据判断进行置位处理&#xff0c;最后进行工作触发。如果设置的…

使用Nordic的nRF52840 Dongle配合Wireshark对蓝牙设备抓包(BLE)

硬件准备&#xff1a; 1&#xff09;nRF52840 Dongle 2&#xff09;待抓包的蓝牙设备 软件准备&#xff1a; 1&#xff09;Python 2&#xff09;Wireshark 3&#xff09;nRF Sniffer for Bluetooth LE Python安装 需要注意的是下载的nRF Sniffer for Bluetooth LE版本是否…

K8s 之 Deployment 应用案例

目录一、YAML 配置文件二、运行服务三、更新 Deployment3.1 动态伸缩容3.2 触发上线四、故障自动转移五、指定节点运行 Pod六、删除 deployment一、YAML 配置文件 我们要清楚&#xff0c;在 K8s 中有两种创建资源的方式&#xff1a; &#xff08;1&#xff09;命令行方式&…

【GO】 K8s 管理系统项目[API部分--Pod]

K8s 管理系统项目[API部分–Pod] 前端: Vueelement plus 后端: gogin 1. 功能设计 2. 初始化 2.1创建项目 2.2 配置goproxy GOPROXYhttps://goproxy.cn 2.3 添加格式化工具 2.4 安装模块 go get k8s.io/client-go/tools/clientcmd go get k8s.io/api/core/v1 go get k8s.i…

使用 docker buildx 构建跨平台 Go 镜像

目录 前提 docker buildx 启用 Buildx builder 实例 构建驱动 buildx 的跨平台构建策略 一次构建多个架构 Go 镜像实践 源代码和 Dockerfile 执行跨平台构建 验证构建结果 如何交叉编译 Golang 的 CGO 项目 准备交叉编译环境和依赖 交叉编译 CGO 示例 总结 参考链接…

供水设备远程监控客户案例

一、客户介绍 客户积累多年的技术研发和工程运维经验&#xff0c;对传统的恒压供水工程所面临的维护难、维修难、运维效率低和能耗管控弱等诸多问题有深刻的体会&#xff0c;经过广泛调研&#xff0c;客户最终选择使用蓝蜂物联网的云平台和边缘计算产品对恒压供水设备和工程进行…

数据处理指令(一)—— 搬移指令MOV、MVN

数据处理指令指的是和数学运算、逻辑运算相关的指令&#xff0c;比如加减乘、与或非、赋值比较等 目录 1、MOV —— 直接搬移 (1) MOV 指令格式 (2) MOV生成指令的策略&#xff08;MOV的优点&#xff09; (3) MOV 只能搬移“立即数”的原因&#xff08;MOV的缺点&#x…

问卷设计一:问卷题目哪些有类型和注意要点?

问卷法常被人们应用于社会调查中&#xff0c;它能反馈出最真实的社会信息。所以&#xff0c;很多企业为了最大程度地了解市场&#xff0c;也经常使用问卷调查法进行研究。不过&#xff0c;想要发挥出问卷法的最大用处&#xff0c;前提是要将问卷设计规范并且可量化。 想要设计…

用ArkTs在鸿蒙系统上画一个世界杯海报

偶然看到了CSDN关于世界杯的征文活动&#xff1a; 用代码画一个足球&#xff1f; 哈哈很有意思&#xff01; 想了想&#xff0c;画一个自定义View&#xff08;足球&#xff09;&#xff0c;当然是使用Canvas了&#xff0c;但除了Canvas还有没有其它方法呢&#xff1f;那是必须…

c语言算数转换 操作符

【题目名称】下面代码的结果是&#xff1a;( b)#include <stdio.h> int main() {int a, b, c;a 5;c a;b c, c, a, a;//逗号表达式从左向右以此计算 表达式结果是最后一个表达式b a c; //a9 先算加后算加等printf("a %d b %d c %d\n:", a, b, c);retu…

【软件测试】工作瓶颈?测试的出路在哪?

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 3年软件测试经验&am…

【内网安全-隧道技术】SMB、ICMP、DNS隧道、SSH协议

目录 一、基础知识 二、隧道技术 1、简介&#xff1a; 2、SMB隧道 3、ICMP隧道 4、DNS隧道 5、SSH协议 6、控制上线-插件 一、基础知识 【内网安全-基础】基础知识、信息收集、工具https://blog.csdn.net/qq_53079406/article/details/128292587?spm1001.2014.3001.55…

Seata 术语

爬虫组件分析目录概述需求&#xff1a;设计思路实现思路分析1.TC (Transaction Coordinator) - 事务协调者2.TM (Transaction Manager) - 事务管理器3.RM参考资料和推荐阅读Survive by day and develop by night. talk for import biz , show your perfect code,full busy&…