RabbitMq:RabbitMq消息中的相关处理 ③

news2025/1/22 22:05:36

一、解耦思想

        在 RabbitMQ 在设计的时候,特意让生产者和消费者分离,也就是消息的发布和消息的消费之间是解耦的。

        生产者与消费者之间的直连,少了很多的灵活性和策略的制定。还有一种解耦的思想存在。

  二、消息的可靠性保证与性能关系

        消息的可靠性保证:

                                1.靠事务机制(存在事务的几阶段过程,性能下降严重)

                                2.确定机制--异步回复机制(轻量级处理机制)

        相比之下,发送方确认机制最大的好处在于它提供异步回复机制,一旦发布一条消息生产者程序可以在等待信道返回确认的同时继续发布下一条消息,当消息得到最终确认之后,生产者程序可以通过回调 方法来处理该确认消息,

confirm的三种实现方式

        方式一:channel.waitForConfirms()普通发送方确认模式;消息到达交换器,就会返回 true。

        方式二:channel.waitForConfirmsOrDie()批量confirm模式,每发送一批消息之后,调用waitForConfirms()方法,等待服务端confirm,这种批量确认的模式极大的提高了confirm效率,但只要有一个消息未到达交换器就会抛出 IOException 异常。客户端往往需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话(网络不稳定),效率也会不升反降;

        方式三:channel.addConfirmListener()异步监听发送方确认模式; 通过监听器的返回效果和批量confirm模式类似,都是每隔一段时间确认一批消息,区别就是不会造成生产者阻塞,但依旧会导致重复消息

        在 RabbitMQ 中,有不同的投递机制(生产者),但是每一种机制都对性能有一定的影响。一般来讲速度快的可靠性低,可靠性好的性能差,具体怎么使用需要根据你的应用程序来定,所以说没有最好的方式,只有最合适的方式。只有把你的项目和技术相结合,才能找到适合你的平衡。

三、消息推拉模式

        Consumer消费消息,并向服务器进行应答。表明这个消息已经消费完了还是可以继续让别人消费。主要收集了两种消费方式

推模式(push)

        1:推模式接收消息是最有效的一种消息处理方式。channel.basicConsume(queneName,consumer)方法将信道(channel)设置成投递模式,直到取消队列的订阅为止;在投递模式期间,当消息到达RabbitMQ时,RabbitMQ会自动地、不断地投递消息给匹配的消费者,而不需要消费端手动来拉取,当然投递消息的个数还是会受到channel.basicQos的限制。

        2:推模式将消息提前推送给消费者,消费者必须设置一个缓冲区缓存这些消息。优点是消费者总是有一堆在内存中待处理的消息,所以当真正去消费消息时效率很高。缺点就是缓冲区可能会溢出。

        3:由于推模式是信息到达RabbitMQ后,就会立即被投递给匹配的消费者,所以实时性非常好,消费者能及时得到最新的消息。

        4:Consumer等待rabbitMQ 服务器将message推送过来再消费。一般是启一个一直挂起的线程来等待。

拉模式(pull)

        1:如果只想从队列中获取单条消息而不是持续订阅,则可以使用channel.basicGet方法来进行消费消息。

        2:拉模式在消费者需要时才去消息中间件拉取消息,这段网络开销会明显增加消息延迟,降低系统吞吐量。 

        3:由于拉模式需要消费者手动去RabbitMQ中拉取消息,所以实时性较差;消费者难以获取实时消息,具体什么时候能拿到新消息完全取决于消费者什么时候去拉取消息。

结论

1:不能在循环中使用拉模式来模拟推模式,因为拉模式每次都需要去消息中间件中拉取消息来消费,所以会严重影响RabbitMQ性能。

2:要想实现高吞吐量,消费者需要使用推模式。

  • 其次,官方特意提到的message的持久性。关键的message不能因为服务出现问题而被忽略。还要注意,官方特意提到,所有的queue是不能被多次定义的。如果一个queue在开始时被声明为durable,那在后面再次声明这个queue时,即使声明为 not durable,那这个queue的结果也还是durable的。
  • 然后,是中间件最为关键的分发方式。这里,RabbitMQ默认是采用的fair dispatch,也叫round-robin模式,就是把消息轮询,在所有consumer中轮流发送。这种方式,没有考虑消息处理的复杂度以及consumer的处理能力。而他们改进后的方案,是consumer可以向服务器声明一个prefetchCount,我把他叫做预处理能力值。channel.basicQos(prefetchCount);表示当前这个consumer可以同时处理几个message。这样服务器在进行消息发送前,会检查这个consumer当前正在处理中的message(message已经发送,但是未收到consumer的basicAck)有几个,如果超过了这个consumer节点的能力值,就不再往这个consumer发布。
    这种模式,官方也指出还是有问题的,消息有可能全部阻塞,所有consumer节点都超过了能力值,那消息就阻塞在服务器上,这时需要自己及时发现这个问题,采取措施,比如增加consumer节点或者其他策略
    *Note about queue size
    If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.*
    另外 官网上没有深入提到的,就是还是没有考虑到message处理的复杂程度。有的message处理可能很简单,有的可能很复杂,现在还是将所有message的处理程度当成一样的。还是有缺陷的,但是目前也只看到dubbo里对单个服务有权重值的概念,涉及到了这个问题。

四、消息的丢失      

1、producer生产者丢失消息
        原因:生产者发送消息由于网络等原因并没有发送到RabbitMq
解决方案:
        1.1、开启RabbitMq事务机制
        生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消 息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit,类似我们数据库数据库事务机制。

        1.2、开启 confirm 模式
        在生产者端设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 ID,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息已经收到。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且可以结合这个机制在自己业务里维护每个消息 ID 的状态,如果超过一定时间还没接收到这个消息的回调,那么可以业务主动重发。

        事务机制和 confirm 机制优劣:
        事务机制是同步的,提交一个事务之后会阻塞,吞吐量会下来,耗性能。
        confirm 机制是异步的,流程不会阻塞,吞吐量较高,性能较好。

2、broker消息中间件自身丢失消息
        原因:RabbitMq收到生产者的消息后还没有来得及持久化到磁盘,又或者创建队列没有持久化以及消息并没有设置为持久化,在Mq故障宕机后都会有消息丢失的情况。
        解决方案:
        2.1、创建队列queue的时候设置队列持久化
        2.2、mq配置deliveryMode == 2 消息持久化

        重点:必须同时设置队列持久化和消息持久化,再结合生产者的confrim模式,才能保证消息准确投递到broker并保证进入磁盘。

3、consumer消费者丢失消息
        原因:消费者自动ack配置情况下,业务代码异常或者其他故障消息并没有处理完成也会自动ack。RabbitMq消息ack后就会丢弃,这就导致异常情况下的消息丢失了。
        解决方案:
        3.1 关闭RabbitMq自动ack,业务代码成功消费了消息手动调用Mq ack,让Mq丢弃消息;如果业务代码异常则直接nack,让Mq重新推送消息进行处理。当然,在要求比较高的情况下也可以异常数据进入死信队列,保证数据的完整性。

        

五、消息的重复消费/消费顺序

        什么情况下会造成消息重复呢?

        假设我们的消费者在执行完任务后,准备显式调用 basicAck给RabbitMQ删除消息时。此时消费者宕机了。

        此时我们的RabbitMQ检测到自己队列所对应的消费者掉线了,就会将这个消息重新入队并等待下一个消费者连接进来后继续投递消费。那么问题就很明显了,上一个消费者已经消费了消息,而新连接的消费者再次消费了该消息,导致了重复消费。

        无论如何,手动提交的方式也已经解决了最大的麻烦(消息丢失的问题)。

QoS 预取模式(重复消息)
        在确认消息被接收之前,消费者可以预先要求接收一定数量的消息,在处理完一定数量的消息后,批量进行确认。如果消费者应用程序在确认消息之前崩溃,则所有在RabbitMQ中未确认的消息将被重新发送给其他消费者。很明显,这样也会导致重复消息的产生。

        我们不难发现,无论是生产者还是消费者,但凡是批量操作,总会让重复消息的概率大大提升。奈何批量操作的速度快啊,因此还是要用。

        我们发现在一个消息从生产者到消费者的过程中,至少有三种情况导致重复消息的产生。那么重复消息该如何处理呢?

消费者的事务
        当然,我们的消费者端也可以采用事务的方式来确保消息的准确性。但是依旧是效率问题导致这个方式非常鸡肋,不再赘述了。

消息幂等性处理重复消息
        让每个消息携带一个全局的唯一ID,即可保证消息的幂等性,具体消费过程为:

        消费者获取到消息后先根据id去查询redis/db是否存在该消息(或者放到内存容器中)。
        如果不存在,则正常消费,消费完毕后写入redis/db
        如果存在,则证明消息被消费过,直接丢弃。
原来说的那么高大上,无非就是交给我们消费者自己去做去重校验。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1472722.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

APP自动化第一步:Appium环境搭建

一、安装Appium Python client包 1.直接cmd窗口输入pip install Appium-Python-Client 2.要确保安装匹配版本的selenium和appium 使用命令pip install selenium -U 首先进入网盘下载这三个软件的压缩包 二、安装Appium Server 1.双击打开压缩包Appium 2.双击进行安装。 3.点…

卡玛网● 46. 携带研究材料 ● 01背包问题,你该了解这些! 滚动数组 力扣● 416. 分割等和子集

开始背包问题,掌握0-1背包和完全背包即可,注:0-1背包是完全背包的基础。 0-1背包问题:有n件物品和一个最多能背重量为w 的背包。第i件物品的重量是weight[i],得到的价值是value[i] 。每件物品只能用一次,求…

最短路径——通过Dynamo批量创建行进路线

今天我们来聊聊Revit2020新增的一个功能——布线分析,这个功能还是挺有意思的,只是需要”桌子“以后多开放点API就好了,今天我们就简单的试用一下这个功能。 打开Revit2020我们在分析选项卡下,最右侧可以找到布线分析功能栏&am…

JavaScript事件机制

JavaScript事件机制描述的是事件在DOM里面的传递顺序,以及可以对这些事件做出如何的响应。 DOM事件流存在三个阶段: ①事件捕获阶段(从window对象传导到目标节点)、 ②处于目标阶段(在目标节点上触发)、 ③事件冒泡阶段(从目标节点传导回window对象)。 在…

JVM虚拟机初步了解

各位小伙伴们大家好,欢迎来到这个小扎扎的专栏 总结 | 提效 | 拓展,在这个系列专栏中记录了博主在学习期间总结的大块知识点;以及日常工作中遇到的各种技术点 ┗|`O′|┛ 🌆 题目速览 1. JVM的位置2. JVM的体系结构3…

python(ch2)

可变长编码和不可变长编码 可变长编码是指不同字符使用不同数量的字节进行编码。例如,UTF-8 编码中,ASCII 字符使用 1 个字节编码,而其他语言的字符使用 2 个或更多字节编码。 不可变长编码是指所有字符都使用相同数量的字节进行编码。例如…

【数据结构与算法】动态规划法解题20240227

动态规划法 一、什么是动态规划二、动态规划的解题步骤三、509. 斐波那契数1、动规五部曲: 四、70. 爬楼梯1、动规五部曲: 五、746. 使用最小花费爬楼梯1、动规五部曲: 一、什么是动态规划 动态规划,英文:Dynamic Pro…

物资管理新篇章:Java+SpringBoot实战

✍✍计算机编程指导师 ⭐⭐个人介绍:自己非常喜欢研究技术问题!专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目:有源码或者技术上的问题欢迎在评论区一起讨论交流! ⚡⚡ Java实战 |…

【Python_Zebra斑马打印机编程学习笔记(四)】ZPL的一些简单指令

ZPL的一些简单指令 ZPL的一些简单指令前言一、ZPL 介绍二、ZPL 语法解析1、标签开始、标签结束2、标签原点位置设置3、标签长度设置4、标签文本打印深度设置5、标签打印宽度设置6、标签方向设置7、标签元素定位8、标签绘制矩形9、标签输入字段10、标签设置字段字体、大小11、标…

【论文阅读-PRIVGUARD】Day3:1-2节

PRIVGUARD: Privacy Regulation Compliance Made Easier(PRIVGUARD:更轻松地遵守隐私规定) 摘要 持续遵守如GDPR和CCPA等隐私法规已经成为从小型创业公司到商业巨头的公司的一项昂贵负担。罪魁祸首是当今合规过程中对人工审核的严重依赖&…

Pytorch训练RCAN QAT超分模型

Pytorch训练RCAN QAT超分模型 版本信息测试步骤准备数据集创建容器生成文件列表创建文件列表的代码执行脚本,生成文件列表训练RCAN模型准备工作修改开源代码编写训练代码执行训练脚本可视化本文以RCAN超分模型为例,演示了QAT的训练过程,步骤如下: 先训练FP32模型再加载FP32训练…

壹[1],图像源

1,工具名称:图像源 2,参数说明 2.1,图像源 注: 本地图像,使用本地图片以及本地图像文件夹 相机,连接的相机 SDK,使用相机的SDK,而不是海康SDK 2.2,像素格式 注&…

Jeecg项目部署

说明:Jeecg是一款低代码开发平台,简单说是一款现成的项目,该项目集成了许多功能,我们可以在这个项目之上开发自己的业务代码。 本文介绍Jeecg项目的部署,包括后端jeecg-boot项目、前端vue3项目。前端项目在本地Window…

VScode连接远端服务器一直输入密码解决方法

文章目录 1 关闭远程连接2打开命令面板3 输入remote-ssh: kill vs code server on host… 1 关闭远程连接 2打开命令面板 3 输入remote-ssh: kill vs code server on host… remote-ssh: kill vs code server on host… 然后一路回车(选中出问题的主机),输一遍密码…

真正理解微软Windows程序运行机制——窗口机制(第一部分)

我是荔园微风,作为一名在IT界整整25年的老兵,今天说说Windows程序的运行机制。经常被问到MFC到底是一个什么技术,为了解释这个我之前还写过帖子,但是很多人还是不理解。其实这没什么,我在学生时代也被这个问题困绕过。…

【日常聊聊】Sora- 探索AI视频模型的无限可能

🍎个人博客:个人主页 🏆个人专栏:日常聊聊 ⛳️ 功不唐捐,玉汝于成 目录 前言 正文 方向一:技术解析 方向二:应用场景 方向三:未来展望 方向四:伦理与创意 方向…

深入理解JS的执行上下文、词法作用域和闭包(下)

🤍 前端开发工程师、技术日更博主、已过CET6 🍨 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 🕠 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 🍚 蓝桥云课签约作者、上架课程《Vue.js 和 E…

【PX4SimulinkGazebo联合仿真】在Simulink中使用ROS2控制无人机沿自定义圆形轨迹正向飞行(带偏航角控制)并在Gazebo中可视化

在Simulink中使用ROS2控制无人机沿自定义圆形轨迹正向飞行(带偏航角控制)并在Gazebo中可视化 系统架构Matlab官方例程Control a Simulated UAV Using ROS 2 and PX4 Bridge运行所需的环境配置PX4&Simulink&Gazebo联合仿真实现方法建立Simulink模…

vue3自定义实现悬浮固定按钮组件

目录 一、需求描述二、代码解读三、结果展示 一、需求描述 需要5个固定的悬浮圆,居于页面的右侧。鼠标悬浮在圆上面会显示对应的文字提示其中包含返回顶部悬浮圆,当页面滑至底部时出现,点击页面滑到顶部。点击按钮会给出弹窗 二、代码解读…

LCR 172. 统计目标成绩的出现次数

解题思路&#xff1a;二分查找 题解一 class Solution {public int countTarget(int[] scores, int target) {// 搜索右边界 rightint i 0, j scores.length - 1;while(i < j) {int m (i j) / 2;if(scores[m] < target) i m 1;else j m - 1;}int right i;// 若数…