rocketmq源码-consumer拉取消息(push模式)

news2024/11/26 8:33:14

前言

在前面consumer启动的博客中,有说过,在启动过程中,有两个比较重要的逻辑,分别是负载均衡和拉取消息的service,这篇博客,主要记录拉取消息的service,因为前面的demo和这篇笔记中的demo,都是基于push模式来学习的,所以前面的笔记都是基于push模式的,但是最近看了下pull模式,和push模式的代码还是有点区别的,所以后面单独起一篇博客,记录pull模式的逻辑

源码

org.apache.rocketmq.client.impl.consumer.PullMessageService#run

前面有说过,拉取消息,使用的是这个service,所以我们从这个service的入口处开始看起
这里会发现,只有两行代码

  1. 从一个queue中拉取到一个pullRequest请求体
  2. 然后调用pullMessage方法
    这里的这个queue很重要,这个queue中存放的是拉取消息的请求,会再调用pullMessage()方法拉取到消息,各个消费者处理完之后,再放到pullRequestQueue中一个请求,所以这里就会在本次拉取消息的请求完成之后,接着取pullRequest,再次触发拉取消息的请求

这里有一个很重要的逻辑:既然拉取消息的请求,是从pullRequestQueue中开始的,那在consumer启动之后,第一次拉取消息的pullRequest是什么放进队列里面的呢?因为这里看到只是去取数据,总要有一个地方,先放进去一个请求,才会开始拉取消息;第一次把pullRequest放到queue中,是在负载均衡分配了messageQueue之后,会构建pullRequest,然后把请求放到queue中,在后面负载均衡的service笔记中会记录
在这里插入图片描述

我们接着来看pullMessage()的相关逻辑:
会发现,在这个方法中,也没有太多的逻辑,就是把当前消费者信息取出来,强转成功push类型的consumer,然后调用其pullMessage()
在这里插入图片描述

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

在这个方法中,会有几个简单的流控判断
在这里插入图片描述

这里有一个pullCallBack方法,我给先暂时折叠起来了,这个方法,并不会立即执行,而是在异步拉取消息的时候,接收到broker的回调时,会通过这里的callback方法去解析返回的消息信息,所以暂时先不看
接着会通过pullKernelimpl去发起拉取消息的netty请求
在这里插入图片描述

org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl

在pullKernelImpl的方法里面,主要是根据brokerName获取到brokerAddr,然后构建netty请求的请求体,我们就不再贴代码了,里面的代码比较简单,并且不复杂,我们直接看发送请求的代码

org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessage

在这个方法中,会先拼接拉取消息的code码
然后根据当前拉取消息的模式,调用不同的方法拉取消息
在这里插入图片描述

同步请求(这个后面博客单独详细说)

可以看到,同步请求,没有回调,就是等拿到response结果之后,再去处理;对于同步请求,通常和pull模式有关联
上面所讲的顺序和并行消费,我感觉只有push模式,才会有这个区分,对于pull模式的话,是consumer自己去拉取消费者的,好像没有看到区分并行、顺序消费的逻辑
所以这里的同步请求,不做过多解释,在后面讲解pull模式的时候,再详细说
在这里插入图片描述
同步请求的处理逻辑中,就是根据当前返回结果,构建了一个pullResult对象,然后返回
在这里插入图片描述
对于同步请求,在这个方法中,我们先只需要知道,当consumer发起拉取消息请求的时候,同步请求会等待返回结果,然后返回pullResult对象

异步发送请求

可以看到,异步发送请求时,这里是把回调回来的pullResult交给pullCallBack方法去处理了,所以这里当异步回调回来之后,会有pullCallBack来处理,也就是前面我说的先暂时折叠起来的方法
在这里插入图片描述
所以,我们要回来,去看pullCallBack的方法,在callBack的onSuccess方法中,大致我分了三个逻辑
这三个逻辑和下面图中三个红框截出来的代码一一对应

  1. 根据tag进行过滤,这是consumer在订阅topic的时候,可以指定过滤条件,在这里会根据tag进行一层过滤
  2. 将消息信息放到了processQueue中,然后通过submitConsumeRequest()方法,将请求信息交给consumer的回调方法
  3. 在消费者处理完成之后,会再次把pullRequest请求信息,放到队列中,继续发起下次请求,在case FOUND:这个case中的第二行代码,会设置pullRequest的nextOffset属性,这样就确保了下次不会拉取到重复的消息

在这里插入图片描述

这里我们看下上面截图中第二个框圈起来的 submitConsumeRequest()方法,在这个方法中,分为顺序消费和并行消费

顺序消费

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#submitConsumeRequest

对于顺序消费,这里就是直接提交了一个任务到线程池中

在这里插入图片描述

在其task的run方法中,会先对当前messageQueue进行加锁
在这里插入图片描述

中间会for循环,遍历本次拉取到的消息,然后依次调用下面这个方法,去处理,这里的messageListener是consumer在启动之前,程序员自己注册的
在这里插入图片描述
在这里插入图片描述
这里我们可以看到,不管消息有多少,对于顺序消费的时候,只会有一个线程处理,并且这个线程在处理的时候,会对messageQueue加锁,只处理这个messageQueue中的消息,所以,对于一个messageQueue中的消息,一定是有序的

并行消费

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest

在这里插入图片描述
对于并行消费,我们可以发现,这里通过for循环去提交consumeRequest任务,可能会存在并行处理的场景,因为for循环的时候,一个消息,就提交了一个任务到线程池中
在这里插入图片描述
可以看到,在异步消费的时候,并没有加锁的逻辑,也不可能有加锁的逻辑,因为是多个线程,在并行的处理消息,加锁就会有问题,加锁就不是并行消费了

总结

对于消费者有两种模式:
pull和push
但是在push模式中,又分为了顺序消费和并行消费
pull模式,就是消费者的业务代码中,自己去拉取消息,处理完了,继续去拉下一批消息,也就是我们常说的,根据自己的消费能力去处理消息
但是push模式,是在定时的去拉取broke中的消息,然后就回调业务上的处理逻辑

在push模式中,底层发送netty请求,是异步发送的,在接收到broker返回的response之后,会通过callBack方法进行处理,在处理的过程中,会进行tag的过滤,最后将解析到的msg,通过业务上注册的messageListener进行回调处理

对于push模式,在本次拉取消息的结果处理完之后,会继续发起下一次拉取消息的请求

下面这个是push模式拉取到消息之后的处理逻辑
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/90315.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

电磁场知识整理------2022/12/14

电磁场知识整理1、数学基础麦克斯韦方程2、麦克斯韦方程组的近似情况2.1 恒定电场2.2 恒定电流场2.3 恒定磁场2.4 动态电磁场2.5 时谐电磁场3、电磁辐射与电磁波写在最后学习资源:慕课上浙大的工程电磁场与波。 工程电磁场与波 1、数学基础 正交坐标系:…

Android 线上卡顿监控

文章目录1. 卡顿与ANR的关系2. 卡顿原理3. 卡顿监控3.1 WatchDog3.2 Looper Printer3.2.1 监控TouchEvent卡顿3.2.2 监控IdleHandler卡顿3.2.3 监控SyncBarrier泄漏4. 小结平时看博客或者学知识,学到的东西比较零散,没有独立的知识模块概念,而…

leetcode 375. 猜数字大小 II-【python3详细图解】递归+记忆化搜索与动态规划

题目 我们正在玩一个猜数游戏,游戏规则如下: 我从 1 到 n 之间选择一个数字。你来猜我选了哪个数字。如果你猜到正确的数字,就会 赢得游戏 。如果你猜错了,那么我会告诉你,我选的数字比你的 更大或者更小 &#xff0c…

二十、JavaScript——逻辑非

! 逻辑非- &#xff01;可以对一个值进行非运算 - 它可以对一个布尔值进行取反操作 true 变成 false false 变成 true - 如果对一个非布尔值进行取反&#xff0c;它会将其先转换为布尔值&#xff0c;再进行取反操作 可以利用这个特点将其他类型转换为布尔值 <script>/*! …

Hybrid模式下,如何实现热更新?

做过开发的小伙伴应该对“热更新”不陌生吧&#xff01;热更新就是指在游戏或软件更新的时候&#xff0c;不用再重新下载安装包进行安装&#xff0c;而是在启动应用程序的时候&#xff0c;在内部进行资源或代码的更新。那么如今&#xff0c;市场为什么越来越多地选择热更新技术…

数据结构——图最全总结(期末复习必备)

目录 图 定义&#xff1a; 基本术语&#xff1a; 图的存储结构 邻接矩阵 邻接表 十字链表 邻接多重表 图的遍历 深度优先搜索(Depth First Search,DFS) 广度优先搜索(Breadth First Search,BFS) 图的应用 最小生成树 普利姆算法 克鲁斯卡尔算法 最短路径 单源最短…

优蓝冲刺港股:上半年期内亏损过亿 主打蓝领人才服务

雷递网 雷建平 12月14日优蓝国际控股股份有限公司&#xff08;简称&#xff1a;“优蓝”&#xff09;日前递交招股书&#xff0c;准备在香港上市。上半年期内亏损1.18亿优蓝是一家蓝领终身服务平台&#xff0c;旨在成为蓝领人才的首选终身服务平台。截至最后实际可行日期&#…

[附源码]Nodejs计算机毕业设计基于博客系统的UI手机界面展示Express(程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程。欢迎交流 项目运行 环境配置&#xff1a; Node.js Vscode Mysql5.7 HBuilderXNavicat11VueExpress。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分…

vue3插槽(匿名插槽-具名插槽-插槽作用域-动态插槽)

文章目录容器布局匿名插槽具名插槽作用域插槽动态插槽容器布局 &#x1f468;&#x1f3fb; parent.vue <script setup lang"ts"> import { ref, useAttrs, defineProps } from "vue"; import children from ./children.vue</script><tem…

界面控件DevExpress WinForm——HTML-CSS感知控件介绍

DevExpress WinForm拥有180组件和UI库&#xff0c;能为Windows Forms平台创建具有影响力的业务解决方案。DevExpress WinForm能完美构建流畅、美观且易于使用的应用程序&#xff0c;无论是Office风格的界面&#xff0c;还是分析处理大批量的业务数据&#xff0c;它都能轻松胜任…

疫情防控|Springboot+小程序+校园疫情防控系统设计与实现

作者主页&#xff1a;编程指南针 作者简介&#xff1a;Java领域优质创作者、CSDN博客专家 、掘金特邀作者、多年架构师设计经验、腾讯课堂常驻讲师 主要内容&#xff1a;Java项目、毕业设计、简历模板、学习资料、面试题库、技术互助 收藏点赞不迷路 关注作者有好处 文末获取源…

【刷题笔记】之牛客面试必刷TOP101(二分查找-I + 二维数组中的查找 + 寻找峰值 + 数组中的逆序对 + 旋转数组的最小数字 + 比较版本号)

目录 1. 二分查找-I 2. 二维数组中的查找 3. 寻找峰值 4. 数组中的逆序对 5. 旋转数组的最小数字 6. 比较版本号 1. 二分查找-I 题目链接&#xff1a;二分查找-I_牛客题霸_牛客网 (nowcoder.com) 题目要求&#xff1a; 上代码 import java.util.*;public class Solut…

Spring MVC学习 | 视图RESTFul

文章目录一、视图1.1 视图对象View1.2 ThymeleafView1.3 转发视图1.4 重定向视图1.5 视图控制器二、RESTFul2.1 简介2.2 PUT和DELETE请求的实现2.2.1 HiddenHttpMethodFilter过滤器2.2.2 实现PUT请求2.2.3 实现DELETE请求学习视频&#x1f3a5;&#xff1a;https://www.bilibil…

Python 元组(Tuple)操作详解

Python的元组与列表类似&#xff0c;不同之处在于元组的元素不能修改,元组使用小括号,列表使用方括号,元组创建很简单,只需要在括号中添加元素,并使用逗号隔开即可 一、创建元组 代码如下: 1 2 3 tup1 (physics, chemistry, 1997, 2000); tup2 (1, 2, 3, 4, 5 ); tup3 &qu…

Redis实现朋友圈,微博等Feed流功能,实现Feed流微服务(业务场景、实现思路和环境搭建)

文章目录业务场景Feed流相关概念Feed流特征Feed流分类实现思路环境搭建数据库表结构新建Feeds功能微服务ms-feeds配置类 RedisTemplateConfigurationREST配置类 RestTemplateConfigurationFeeds 实体类FeedsVO 响应类业务场景 在互联网领域&#xff0c;尤其现在的移动互联网时…

Linux环境下MySQL的安装与使用

目录 一&#xff1a;安装MYSQL说明 1.1 查看是否安装过MySQL 1.2 MYSQL的卸载 二&#xff1a;MySQL在Linux下的安装 三&#xff1a;MYSQL登录 3.1 首次登录 3.2 修改密码 3.3 设置远程登录 一&#xff1a;安装MYSQL说明 1.1 查看是否安装过MySQL 检查rpm安装包 rpm -…

JAVA毕业设计——基于ssm高校共享单车管理系统 (源代码+数据库)604

代码地址 https://github.com/ynwynw/webike-public 毕业设计所有选题地址 https://github.com/ynwynw/allProject 基于ssm高校共享单车管理系统 (源代码数据库)604 一、系统介绍 用户管理&#xff0c;服务点管理&#xff0c;单车管理&#xff0c;分类管理&#xff0c;学生管…

基于java+springboot+mybatis+vue+mysql的大学生体质测试管理系统

项目介绍 随着我国大学生数量的不断增加&#xff0c;个个高校对大学生的体质也开始高度的进行重视&#xff0c;只有拥有了高强健康体质的大学生才能够全身心的投入到学习和工作中&#xff0c;为了能够更好的对大学生的体质进行检测我们通过java编程语言&#xff0c;后端采用sp…

redis之哨兵机制

0. 前言 我们知道&#xff0c;只有主库才能有写操作&#xff0c;而从库只能进行读操作&#xff0c;那么当主库宕机后&#xff0c;如何保证服务的正常进行呢&#xff1f; 本文主要介绍的是 Redis 提供的哨兵机制&#xff0c;通过哨兵监控主库的状况&#xff0c;如果发现主库下…

Python迭代法Iteration的讲解及求解海藻问题、方程问题实战(超详细 附源码)

一、迭代法简介 迭代法&#xff08;iteration&#xff09;是现代计算机求解问题的一种基本形式。迭代法与其说是一种算法&#xff0c;更是一种思想&#xff0c;它不像传统数学解析方法那样一步到位得到精确解&#xff0c;而是步步为营&#xff0c;逐次推进&#xff0c;逐步接近…