RocketMQ的push消费方式实现的太聪明了

news2024/11/29 11:34:42

最近仍然畅游在RocketMQ的源码中,这几天刚好翻到了消费者的源码,发现RocketMQ的对于push消费方式的实现简直太聪明了,所以趁着我脑子里还有点印象的时候,赶紧来写一篇文章,来掰扯一下,防止过两天就忘得一干二净了。

MQ消费方式

消费方式就是指消费者如何从MQ中获取到消息,分为两种方式,push(推方式)和pull(拉方式)。

1、push(推方式)

push,顾名思义,就是推的意思。就是当MQ收到生产者产生的消息的时候,会主动将消息推送到消费者进行消费,这种模式就叫push,也就是MQ将消息推给到消费者的意思。

图片

push模式

push这种模式的好处就是响应快,消息的实时性比较高,一旦消息MQ收到消息,那么就能立马将消息推送给消费者,消费者也就能立马收到消息进行消费。

但是这种push的模式,有个缺点就是一旦消息量比较大时,对消费者性能要求比较高,因为是消费者无法控制MQ消息的推送速度,一旦消息量大,那么消费者消费消息的压力就比较大。

2、pull(拉方式)

push是MQ主动给消费者推消息,那么pull呢?刚好跟push相反,就是消费者主动去MQ中拉取消息。

图片

pull模式

那么pull的优缺点自然也就跟push刚好相反。因为是消费者主动去MQ中拉取消息,那么消费者可根据自身消费的情况,决定何时去拉取消息,主动权在自己手上,这样消费者的压力就会相对小点;但是缺点也很明显,那么就会实时性相对于push方式会低一些,因为你得决定拉的时间间隔。

其实想想,消费方式就跟拿快递一样,快递就是一个消息,我自己就是消费者,快递要么快递小哥主动送(push)到家,要么我自己去快递站拿(pull)。

RocketMQ对于消费方式的实现

上一节说了消费消息的两种方式push和pull,或者说算一种理念。尚大的周阳老师有一句经常说的话我比较赞同,那就是“天上飞的理念,必然有落地的实现”。所以push或者pull到底如何落地,得看具体的MQ的产品了。

而RocketMQ作为阿里开源的一款高性能、功能丰富的MQ,自然同时实现了push和pull的两种消费方式,用户可以选择在项目中使用push还是pull。

图片

push模式的实现

图片

pull模式的实现

但是一般情况下,项目中都是使用push的方式来消费,因为pull除了时实性差外,pull方式还得让开发人员主动去维护消息消费进度,增加额外的操作。

所以接下来就着重讲一下RocketMQ是如何实现push的逻辑。

RocketMQ聪明地实现push的原因

上文说到push模式的优点是时实性好,但是缺点就是消费者压力会比较大,所以,难道实现push模式,只能舍弃压力的控制么?

就在这时,RocketMQ大喊了一声

图片

是的,RocketMQ对于push模式做到了实时和压力的平衡,这主要是因为RocketMQ的push模式其实算是一个“伪push”模式,真正底层的实现还是基于pull。

到这里可能有的小伙伴比较迷糊,怎么push变成“伪push”了,还是用pull实现的,到底是push还是pull?

图片

前面我说过,push和pull只是一种理论,具体的实现看MQ。

所以RocketMQ为了兼顾两者,就选择通过消费者主动拉消息来实现push的效果,这也是为什么我称为“伪push”的原因,RocketMQ都给封装好了,让你用起来感觉是MQ主动push消息给你的。

既然底层是pull,那么RokcetMQ在实现消费者的逻辑的时候,就可以很容易实现控制压力的效果,毕竟这是“拉”方式天然自带的buff;但是如何通过pull实现push的时实的优点呢?毕竟鱼和熊掌我RokcetMQ偏要兼得。

这时这就不得不提到一种叫“长轮询”的机制。

轮询与长轮询

轮询与长轮询都属于pull的实现,都是由客户端主动给服务端发送请求,拉取数据。套到MQ中,就是都是消费者主动去MQ拉消息。

轮询

轮询是指不管服务端数据有无更新,客户端每隔定长时间请求拉取一次数据,可能有更新数据返回,也可能什么都没有。

图片

再拿快递举例子,轮询就好比,小明买的iphone 13 pro max快递到了,显示正在派送中,但是小明等不及了,于是就去快递站拿,但是快递还没放到快递站,但是小明的心里急啊,他忍受不了相思之苦,于是小明每隔5分钟就往快递站跑一次,问一下快递到了没,到了就拿回来。这就是轮询的意思,也就是不论有没有数据,客户端都会每隔一定时间去请求一次服务端。

来分析一下拿快递的例子的问题:

  • 每隔5分钟就往快递站跑,那不是累死个小明么。

  • 还有一个问题,假设刚跑到快递站,快递没到,就回去了,但是刚到家的时候,快递到了,于是又等了5分钟,再去快递站终于拿到快递了,但是其实快递都到了几分钟了,你还是没有第一时间拿到快递,这就造成了延迟。

从而对应到程序中,就是会产生如下问题

  • 对于消息而言,会一直产生,这就要求消费者不停地间隔一定时间去拉取消息,即使没有消息也需要去请求,就会造成大量无用的请求,白白浪费大量耗费服务器内存和宽带资源。

  • 可能造成数据的延迟

长轮询

说长轮询概念之前,先来救救小明吧,毕竟小明可不想狗带。

既然原先小明每隔5分钟跑一次,那么是不是可以换种思路,当快递还没到的时候,让小明不要回来,直接在快递站待着,当快递到的时候,才让小明拿着快递回家。这下小明就喜死了,既可以有时间刷刷某音,逛逛某东,还可以在第一时间拿到13 pro max。

图片

所以这种可以在快递站等待的机制,就叫长轮询。

长轮询也是客户端请求服务端,如果服务端有数据,那么就立马返回,客户端再次请求;当服务端不存在数据的时候,服务端并不会给客户端响应,而是将请求给hold住,当服务端有数据的时候才会给客户端响应,返回数据。

所以长轮询可以解决如下问题

  • 解决轮询带来的频繁请求服务端但是没有的问题

  • 一旦新的数据到了,那么消费者能立马就可以获取到新的数据,所以从效果上,有点像是push的感觉。

但是长轮询也会带来服务端代码实现逻辑复杂的问题,当然相比于优点来说,都不太重要。

push消费方式源码探究

理论都讲完了,接下来就到了show me the code的时间了,来看看RocketMQ的是如何通过长轮询机制来实现压力和时实的平衡。

这里我画了一张push模式下消费者消费流程图。

消费者拉取消息的逻辑

  • ①消费者有一个后台线程,会去处理拉取消息(PullRequest)

  • ②先去判断有没有过多消息没有消费,如果有的话,那么就间隔一定时间再次从①开始执行拉取消息的逻辑

  • ③消费者没有过多消息没有消费,那么就会直接向MQ发送拉取消息的请求,有消息就返回,没有消息就hold住请求,等有新的消息到的时候才返回

  • ④消费者获取到消息之后,会去找用户自定义的消息处理逻辑的实现(MessageListener的实现)去消费消息,同时会再次拉取消息,继续从①开始执行逻辑

1、消费者拉取消息控制压力源码

当消费者准备去拉消息的时候,会先去判断当前消费者消费的压力再决定是否去拉取消息。

RocketMQ提供了两种判断消费压力逻辑,一种是基于还未消费的消息的数量的大小,还有一种是基于还未消费的消息所占内存的大小。

图片

控制压力源码

  • 判断还未消息的数量,数量太多就等会再执行重新执行拉取消息的逻辑

  • 判断还未消息的大小,如果还未消息的消息占用的内存过大,就等会再执行重新执行拉取消息的逻辑

总的一句话就是,当消费者消费的压力过大时,就不会去拉取消息,而是等待一定的时间再去执行拉取消息的逻辑,如果压力还是很大,就还继续等,如此循环,直到消费者的消费压力小于阈值的时候,才会真正的发送请求到MQ中拉取消息。

2、MQ将请求hold住源码

当服务端未找到消息时,就将请求进行挂起,存起来

图片

请求hold住源码

拉取不到消息时,会调用PullRequestHoldService的suspendPullRequest方法讲请求存储起来。PullRequestHoldService是用来存储拉取请求的类。

PullRequestHoldService

suspendPullRequest方法会将请求分类,放到ManyPullRequest里,然后用一个ConcurrentHashMap进行存储

3、MQ收到消息响应给消费者的源码

NotifyMessageArrivingListener

当生产者发送的消息达到MQ的时候,MQ会回调NotifyMessageArrivingListener的arriving方法,之后就会调用PullRequestHoldService的notifyMessageArriving方法,MQ会重新处理拉取消息的逻辑,此时就能找到最新来的那条消息,从而将最新的消息通过网络返回给消费者。

图片

notifyMessageArriving和返回消息逻辑

最后

所以从以上的分析可以看出,RocketMQ对于push的消费方式的实现是基于长轮询机制来实现的,同时平衡了时实和压力,这其实就很nice了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1256304.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[网络] 5. TCP 链接的建立与释放~汇总

大部分内容源于网络加之个人理解~巨人的肩膀有多大决定你可以看得多远~ 文章目录 1. 三次握手说一下三次握手的过程为什么是三次握手 2. 四次挥手说一下四次挥手的过程为什么需要四次挥手有可能出现三次挥手吗,什么时候会出现呢?为…

video标签在h5中被劫持问题

将video的视频链接转为blob export const encryptionVideo (options: URL) > {return new Promise((resolve, reject) > {window.URL window.URL || window.webkitURL;var xhr new XMLHttpRequest();xhr.open(GET, options.url, true);xhr.responseType blob;xhr.onl…

正则化与正则剪枝

写在前面:本博客仅作记录学习之用,部分图片来自网络,如需引用请注明出处,同时如有侵犯您的权益,请联系删除! 文章目录 引言正则化为什么会过拟合拉格朗日与正则化梯度衰减与正则化 应用解决过拟合网络剪枝 …

JPA 自关联 设置单向多对一

Spring boot 3 JPA中,遇到一个需求,建一个数据字典表: Dictionary,存放两级数据,第一级为字典项目,第二级为项目内容,查询时要把parent_id对应父项的名称也一起查出来,返回前端。 …

VUE简易计划清单

目录 效果预览图 完整代码 效果预览图 完整代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>…

Web前端 ---- 【Vue】什么?代码堆在一起不好维护?辛辛苦苦改造的单文件组件用不了?看我直接Vue Cli脚手架安排

目录 前言 单文件组件 什么是单文件组件 单文件组件的内容 Es6模块的导入和导出 创建单文件组件 Vue Cli脚手架 前言 继上篇文章Vue组件的使用介绍了如何使用Vue组件&#xff0c;但是发现有一个很重要的问题&#xff0c;就是代码复用性很差&#xff0c;并且无法提供样式…

文件的写入和读取操作

题目&#xff1a; 编写一个程序&#xff0c;实现以下功能&#xff1a; 1. 创建一个新的文本文件&#xff0c;并将用户输入的数据写入文件中。 2. 打开已存在的文本文件&#xff0c;并将其中的数据显示在屏幕上。 #include <stdio.h> #include <stdlib.h> void wri…

事件委派+自定义属性+编程式导航实现路由跳转及传参

当我们页面中有许多a标签需要实现点击跳转到同一个页面并携带不同的参数时&#xff0c;我们就可以使用事件委派自定义属性编程式导航 的方式&#xff0c;用最小的内存实现路由跳转的最大效率。 为什么我们不用router-link 进行跳转&#xff1f; 要知道&#xff0c;我们页面中…

汇编:关于栈的知识

1.入栈和出栈指令 2. SS与SP 3. 入栈与出栈 3.1 执行push ax ↑↑ 3.2 执行pop ax ↓↓ 3.3 栈顶超界的问题 4. 寄存器赋值 基于8086CPU编程时&#xff0c;可以将一段内存当作栈来使用。一个栈段最大可以设为64KB&#xff08;0-FFFFH&#xff09;。 1.入栈和出栈指令…

058-第三代软件开发-文件Model

第三代软件开发-文件Model 文章目录 第三代软件开发-文件Model项目介绍文件Model 关键字&#xff1a; Qt、 Qml、 关键字3、 关键字4、 关键字5 项目介绍 欢迎来到我们的 QML & C 项目&#xff01;这个项目结合了 QML&#xff08;Qt Meta-Object Language&#xff09;…

[计算机网络]应用层概述

0.写在前面: 该层为教学模型的最后一层,某种意义上来说是最接近各位开发者的一层,正因如此,这层中的很多定义和概念大家都有属于自己的理解, 完全按照书本反而才是异类,因此在这里我会去结合我做前端开发的一些经验,来处理和讲解一些概念,另外本层中的部分协议也不会过多阐述了…

提升逼格,自己搭建博客网站不求人

背景 对于一个热爱分享知识和经验的大佬来说&#xff0c;搭建一个自己的个人博客是十分必要的。因为各个免费写博客平台都会有每天写博客限制&#xff0c;比如我现在这篇文章的限制&#xff0c;就是每天最多发表3篇&#xff0c;同时还给我的博客添加一大波广告&#xff0c;真是…

async函数和await关键字

async写在一个函数a前面&#xff0c;该函数变为异步函数&#xff0c;可在里面使用await关键字&#xff0c;await后面一般跟一个promise对象&#xff08;axios函数返回一个promise对象&#xff0c;里面有异步任务&#xff09;&#xff0c;await会原地等待该异步任务结果&#xf…

JVM基础篇:垃圾回收

1.前言 1.1C/C的内存管理 在C/C这类没有自动垃圾回收机制的语言中&#xff0c;一个对象如果不再使用&#xff0c;需要手动释放&#xff0c;否则就会出现内存泄漏。我们称这种释放对象的过程为垃圾回收&#xff0c;而需要程序员编写代码进行回收的方式为手动回收。内存泄漏指的…

WiFi 发射链路 MCS 自适应机制介绍

链路适配是指发射机选择最优的MCS向特定的接收机发送数据的过程。链路自适应算法的实现有其特殊性&#xff0c;但通常基于测量的数据包错误率(PER)。大多数算法监视PER并调整MCS以跟踪一个最佳的长期平均值&#xff0c;以平衡由于使用更高MCS发送更短数据包而减少的开销和由于更…

坚鹏:中国工商银行数字化背景下银行公司业务如何快速转型培训

中国工商银行作为全球最大的银行&#xff0c;资产规模超过40万亿元&#xff0c;最近几年围绕“数字生态、数字资产、数字技术、数字基建、数字基因”五维布局&#xff0c;深入推进数字化转型&#xff0c;加快形成体系化、生态化实施路径&#xff0c;促进科技与业务加速融合&…

jupyter notebook 添加conda环境变量为内核(kenel)

第一步&#xff1a;安装ipykernel 在激活环境后&#xff0c;需要安装ipykernel包&#xff0c;以便将Conda环境添加到Jupyter Notebook中。使用以下命令安装&#xff1a; pip install ipykernel第二步&#xff1a;将Conda环境添加到Jupyter 需要将Conda环境添加到Jupyter Not…

在拼多多,照见热气腾腾的平凡人生

文 | 螳螂观察 作者 | 易不二 内容丰富的《鲁迅日记》里&#xff0c;经常会出现“xx日晴&#xff0c;无事”的记载。 如果按照年份算&#xff0c;在被记载的日子里&#xff0c;每年鲁迅都有一二十天的时间是“无事”的状态。 很难想象&#xff0c;为人类历史文明前进照亮了…

2024年最新最全的Jmeter接口测试必会技能:jmeter对图片验证码的处理

jmeter对图片验证码的处理 在web端的登录接口经常会有图片验证码的输入&#xff0c;而且每次登录时图片验证码都是随机的&#xff1b;当通过jmeter做接口登录的时候要对图片验证码进行识别出图片中的字段&#xff0c;然后再登录接口中使用&#xff1b; 通过jmeter对图片验证码…

error LNK2038: 检测到“RuntimeLibrary”的不匹配项 解决方法

问题&#xff1a; 我们在使用Visual Studio编程的时候偶尔会遇到以下三种报错&#xff1a; error LNK2038: 检测到“RuntimeLibrary”的不匹配项: 值“MD_DynamicRelease”不匹配值“MDd_DynamicDebug” &#xff08;引用的是release模式&#xff0c;但设置成debug模式了…