RabbitMQ[3]-RabbitMQ如何保证消息的可靠性投递与消费?

news2024/11/29 12:40:21

上篇文章:RabbitMQ的核心概念有哪些?它们的职责是什么?中我们详细介绍了RabbitMQ的工作模式,根据它的工作模式,一条消息从生产者发出,到消费者消费,需要经历以下4个步骤:

  • 生产者将消息发送给RabbitMQ的Exchange交换机;
  • Exchange交换机根据Routing key将消息路由到指定的Queue队列;
  • 息在Queue中暂存,等待消费者消费消息;
  • 消费者从Queue中取出消息消费。

通过这种工作模式,很好地做到了两个系统之间的解耦,并且整个过程是一个异步的过程,producer发送消息后就可以继续处理自己业务逻辑,不需要同步等待consumer的消费结果。

但任何一项技术的引入,除了带来它自身的优点之外,必然也会带来其他的一些缺点。MQ消息中间件虽然可以做到系统之间的解耦以及异步通信,但可能会存在消息丢失的风险。

该工作模式存在的问题——消息可能丢失

什么是消息丢失呢?简单来说,就是producer发送了一条消息出去,但由于某种原因(比如RabbitMQ宕机了),导致consumer没有消费到这条消息,最终导致producer与consumer两个系统的数据与期望结果不一致。

那消息是如何丢失的呢?既然在RabbitMQ的工作模式中,一条消息从producer到达consumer要经过4个步骤,那么在这4步中,任何一步都可能会把消息丢掉:

(1)生产者将消息发送给Exchange交换机:假如producer向Exchange发送了一条消息,由于是异步调用,所以producer不关心Exchange是否收到了这条消息,就继续向下处理自己的业务逻辑。如果在Exchange收到消息之前,RabbitMQ宕机了,那这条消息就丢了。

(2)Exchange交换机接收到消息后,会根据producer发送过来的Routing key将消息路由到匹配的Queue队列中。一般情况下,这一步不会出现什么问题,因为这一步是在RabbitMQ内部实现的,并且Exchange与Queue之间的Routing key都会在开发之前约定好,所以,只要保证producer发送消息时使用的Routing key是真实存在的即可正确地路由到指定的Queue队列。但万一小明在复制代码的时候,手一抖,导致发送消息时的Routing key多了个数字,此时,消息发出去后,Exchange虽然能收到消息,但由于匹配不到Routing key,所以无法将消息路由到Queue队列,那这条消息也算是变相消失了。

(3)消息到达Queue中暂存,等待consumer消费:如果消息成功被路由到了Queue中,此时这条消息会被暂存在RabbitMQ的内存中,等到consumer消费,假如在consumer消费这条消息之前,RabbitMQ宕机了,那么这条消息也会丢失。

(4)consumer从Queue中取走消息消费:如果前面一切顺利,并且消息也成功被consumer从Queue中取走消费,但consumer最后消费发生异常失败了。由于默认情况下,当一条消息被consumer取走后,RabbitMQ就会将这条消息从Queue中直接删除,所以,即使consumer消费失败了,这条消息也会消失,这样也会导致producer与consumer两个系统的数据不一致。

解决方法——问题分解,逐个击破

分析完了消息可能发生丢失的几个步骤,接下来就可以针对这几个步骤进行逐个解决,来保证消息不会丢失,也就是消息的可靠性投递与消费。

1. 保证producer发送消息到RabbitMQ broker的可靠性

通过上面的分析,我们知道,producer发送消息到broker的过程中,丢失消息的原因是producer发送完消息之后,就主观认为消息发送成功了,即使RabbitMQ发生故障导致没有接收到消息,producer也是无法知道的。所以,要保证producer发出去的消息100%被broker接收成功,我们需要让producer发送消息后知道一个结果,然后根据这个结果再做相应的处理。

RabbitMQ提供了两种方式来达到这一目的:一种是Transaction(事务)模式,一种是Confirm(确认)模式。

(1)Transaction模式

Transaction模式类似于我们操作数据库的操作,首先开启一个事务,然后执行sql,最后根据sql执行情况进行commit或者rollback。

在RabbitMQ中实现Transaction模式时,首先要用Channel对象的txSelect()方法将信道设置成事务模式,broker收到该命令后,会向producer返回一个select-ok的命令,表示信道的事务模式设置成功;然后producer就可以向broker发送消息了。在消息发送完成后,producer要调用Channel对象的commit()方法提交事务。整个流程可以用下图表示:

在Transaction模式中,producer只有收到了broker返回的Commit-Ok命令后才能提交成功,若在commit执行之前,RabbitMQ发生故障抛出异常,producer可以将其捕获,然后通过Channel对象的txRollback()方法回滚事务,同时可以重发该消息。

Transaction模式虽然可以保证消息从producer到broker的可靠性投递,但它的缺点也很明显,它是阻塞的,只有当一条消息被成功发送到RabbitMQ之后,才能继续发送下一条消息,这种模式会大幅度降低RabbitMQ的性能,不推荐使用。

(2)Confirm模式(推荐)

针对Transaction模式存在的浪费RabbitMQ性能的问题,RabbitMQ推出了Confirm模式。Confirm模式是一个异步确认的模式,producer发送一条消息后,在等待确认的过程中可以继续发送下一条消息。

要使用Confirm模式,首先要通过Channel对象的confirmSelec()方法将当前Channel设置为Confirm模式,然后,通过该Channel发布消息时,每条消息都会被分配一个唯一的ID(从1开始计数),当这条消息被路由到匹配的Queue队列之后,RabbitMQ就会发送一个确认(ack)给producer(如果是持久化的消息,那么这个确认(ack)会在RabbitMQ将这条消息写入磁盘之后发出),这个确认消息中包含了消息的唯一ID,这样producer就可以知道消息已经成功到达Queue队列了。

当RabbitMQ发生故障导致消息丢失,也会发送一个不确认(nack)的消息给producer,nack消息中也会包含producer发布的消息唯一ID,producer接收到nack的消息之后,可以针对发布失败的消息做相应处理,比如重新发布等。

了解了原理后,接下来看下代码层面实现Confirm模式的三种方式:

(1)单条确认方式

单条确认模式中,每发送一条消息后,通过调用Channel对象的waitForConfirms()方法等待RabbitMQ端确认,主要代码如下:

这种方式实际上是一种同步等待的方式,只有当一条消息被确认之后,才能发送下一条消息,所以,实际使用中不推荐这种方式。

(2)批量确认方式

批量确认方式与单条确认方式使用方法类似,只是将确认的步骤放到了最后,可以一次性发送多条消息,最后统一确认,主要代码如下:

waitForConfirmsOrDie()方法会等最后一条消息被确认或者得到nack时才会结束,这种方式虽然可以做到多条消息并行发送,不用互相等待,但最后确认的时候还是通过同步等待的方式完成的,所以也会造成程序的阻塞,并且当有任意一条消息未确认就会抛出异常,实际使用中不推荐这种方式。

(3)异步确认方式(推荐)

异步确认方式的实现原理是在将Channel设置为Confirm模式后,给该Channel添加一个监听ConfirmListener,ConfirmListener中定义了两个方法,一个是handleAck,用来处理RabbitMQ的ack确认消息,一个是handleNack,用来处理RabbitMQ的nack未确认消息,这两个方法会在RabbitMQ完成消息确认和发生故障导致消息丢失时回调,producer接收到回调时可以执行对应的回调处理。主要代码如下:

异步确认的方式效率很高,多条消息既可以同时发送,不需要互相等待,又不用同步等待确认结果,只需异步监听确认结果即可,所以,实际使用中推荐使用这种方式。

2. 保证Exchange路由消息到Queue的可靠性

上面分析了,当producer发送消息时,由于小明手抖,导致消息的Routing key是一个不存在的key,从而变相丢失的情况,要如何提前规避掉呢?有两种方式:ReturnListener和使用备胎Exchange交换机。

(1)ReturnListener

ReturnListener是一个监听器,作用于Channel信道上,当producer发送一条消息给RabbitMQ后,如果由于Routing key不存在导致消息不可成功到达Queue队列,RabbitMQ就会将这条消息发送回producer的ReturnListener,在ReturnListener的handleReturn方法中,producer可以针对退回的消息做处理。

要使用ReturnListener,在发送消息时要注意,在basicPublish的方法中有一个mandatory的入参,只有将该参数值设置为true才可以正常使用ReturnListener,否则,当Routing key不存在时,消息会被自动丢弃。核心代码如下:

producer运行上述代码之后,就会打印出ReturnListener中的信息,此时,producer可以针对这条消息做业务处理,比如发送提醒信息给相关人员处理,或者更新状态等。但要注意,这里最好不要重发ReturnListener中的消息,因为导致消息被回退的原因就是消息不可达,如果在ReturnListener中重发这条消息的话,那么就有可能进入一个死循环,重发->退回->重发->退回......

(2)备胎Exchange交换机

除了使用ReturnListener,我们还可以使用备胎交换机的方式来解决Routing key不存在导致消息不可达的问题。所谓备胎交换机,是指当producer发送消息的Routing key不存在导致消息不可达时,自动将这条消息转发到另一个提前指定好的交换机上,这台交换机就是备胎交换机。备胎交换机也有自己绑定的Queue队列,当备胎交换机接到消息后,会将消息路由到自己匹配的Queue队列中,然后由订阅了这些Queue队列的消费者消费。

在开发时,如果要使用备胎交换机,也要在发送消息时,将mandatory参数值设置为true,否则,消息就会由于不可达而被RabbitMQ自动丢弃。核心代码如下:

然后我们运行该程序,然后可以在RabbitMQ控制台看到,消息被成功路由到了备胎交换机绑定的Queue队列:

然后我们开启一个消费者消费该Queue,也可以正常消费到这条原本不可达的消息:

3. 保证Queue消息存储的可靠性

消息到达Queue队列之后,在消费者消费之前,RabbitMQ宕机也会导致消息的丢失,所以,为了解决这种问题,我们需要将消息设置成持久化的消息。持久化消息会写入RabbitMQ的磁盘中,RabbitMQ宕机重启后,会恢复磁盘中的持久化消息。

但消息是存储于Queue队列中的,所以,只把消息持久化也不行,也要把Queue也设置成持久化的队列,这样,RabbitMQ宕机重启之后,Queue才不会丢失,否则,即使消息是持久化的,但Queue不是持久化的,那么RabbitMQ重启之后,Queue都不存在了,那么消息也就无处存放,也就相当于没了。

通过代码设置消息持久化和队列持久化很简单,首先看队列持久化:

Channel对象在声明队列的时候,第二个参数 durable 就代表该队列是否持久化队列,设置为 true 表示当前队列是持久化队列。

然后看下消息持久化:

Channel对象在发送消息时,有一个BasicProperties类型的参数,该参数中可以设置一些消息的属性,其中就包括是否持久化的 deliveryMode 属性,2表示持久化消息。

将队列和消息进行持久化可以保证大部分场景下RabbitMQ宕机重启后消息不丢失,但并不能100%保证,因为RabbitMQ接收到持久化消息之后,并不会立刻将消息存入磁盘,而是有一个缓冲buffer,只有当buffer写满了或者每25ms一次才会将数据写入磁盘中,所以,在这之前,消息还是会存在丢失的可能,想要更大程度地保证这种情况下消息不丢失,可以搭建RabbitMQ镜像集群,这个我在以后章节会讲。

上面介绍了队列和消息的持久化,其实Exchange交换机也可以持久化,不过交换机是否持久化对消息的可靠性并没有什么影响,只是非持久化的交换机在RabbitMQ重启之后也会消失,那么producer向该交换机发送消息时就可能会有问题,所以,一般情况下,建议也将交换机持久化:

Channel对象在声明交换机时,有一个durable的参数,该参数设置为true即表示该交换机为持久化交换机。

4. 保证consumer消费消息的可靠性

consumer消费消息时,有一个ack机制,即向RabbitMQ发送一条ack指令,表示消息已经被成功消费,RabbitMQ收到ack指令后,会将消息从本地删除。默认情况下,consumer消费消息是自动ack机制,即消息只要到达consumer,就会向RabbitMQ发送ack,不管consumer是否消费成功。所以,为了保证producer与consumer数据的一致性,我们要使用手动ack的方式确认消息消费成功,即在消息消费完成后,通过代码显式调用发送ack。

首先,我们一起看下实现手动ack的核心代码:

consumer向RabbitMQ发送ack时有三种形式:

(1)reject:表示拒收消息。发送拒收消息时,需要设置一个 requeue 的参数,表示拒收之后,这条消息是否重新回到RabbitMQ的Queue之后,设置为true表示是,false表示否(消息会被删除)。若 requeue 设置为 true,那么消息回归原Queue之后,会被消费者重新消费,这样就会出现死循环,消费->拒绝->回Queue->消费->拒绝->回Queue......所以,一般设置为false。如果设置为true,那么最好限定消费次数,比如同一条消息消费5次之后就直接丢掉。

(2)nack:一般consumer消费消息出现异常时,需要发送nack给MQ,MQ接收到nack指令后,会根据发送nack时设置的requeue参数值来判断是否删除消息,如果requeue为true,那么消息会重新放入Queue队列中,如果requeue为false,消息就会被直接删掉。当requeue设置为true时,为了防止死循环性质的消费,最好限定消费次数,比如同一条消息消费5次之后就直接丢掉。

(3)ack:当consumer成功把消息消费掉后,需要发送ack给MQ,MQ接收到ack指令后,就会把消费成功的消息直接删掉。

补偿机制

经过上面这几个步骤的改造优化,我们的应用程序已经能够保证99.99%场景下消息的可靠性投递与消费了,但由于某些不可控因素,也并不能保证100%的消息可靠性,只有producer明确知道了consumer消费成功了,才能100%保证两边数据的一致性。

因为MQ是异步处理,所以producer是无法通过RabbitMQ知道consumer是否消费成功了,所以,如果要保证两边数据100%一致,consumer在消费完成之后,要给producer发送一条消息通知producer自己消费成功了。

但producer不能一直在那干等着,如果consumer过了1小时还没有发送消息给producer,那么很可能是consumer消费失败了,所以,producer与consumer之间要根据业务场景定义一个反馈超时时间,并在producer后台定义一个定时任务,定时扫描超过指定时间未接收到consumer确认的消息,然后重发消息。重发消息的逻辑中,最好定义一个重发最大次数,比如重发3次后还是不行的话,那可能就是consumer有bug或者发生故障了,就停止重发,等待问题查明再解决。

既然producer可能会重发消息,所以,consumer端一定要做幂等控制(就是已经消费成功的消息不再次消费),要做到幂等控制,消息体中就需要有一个唯一的标识,consumer可以根据这个唯一标识来判断自己是否已经成功消费了这条消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/85152.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java笔记——String类各种方法的使用总结(附带实例)

String类的获取方法 String类实现获取功能的方法有 int length() —— 获取字符串长度 char charAt(int index) —— 获取指定索引处的字符值 int indexOf(int ch) —— 获取指定字符第一次出现的索引位置 int indexOf(String str) —— 获取指定字符串第一次出现的索引位…

ArrayDeque源码解析

ArrayDeque源码解析 问题 (1)什么是双端队列? (2)ArrayDeque 是怎么实现双端队列的? (3)ArrayDeque 是线程安全的吗? (4)ArrayDeque 是有界的…

【正点原子FPGA连载】 第三十五章双目OV5640摄像头HDMI显示实验 摘自【正点原子】DFZU2EG/4EV MPSoC 之FPGA开发指南V1.0

1)实验平台:正点原子MPSoC开发板 2)平台购买地址:https://detail.tmall.com/item.htm?id692450874670 3)全套实验源码手册视频下载地址: http://www.openedv.com/thread-340252-1-1.html 第三十五章双目O…

基于jsp+mysql+ssm二手书交易管理系统-计算机毕业设计

项目介绍 这样一个二手书交易网站为用户提供了一个可以在网上买卖图书的平台,用户可以通过二手书交易管理系统进行注册或登录操作,登录成功后可以查看自己已发布的售书信息或者求购信息。同时,用户可以浏览其他用户发布的售书信息和求购信息…

基于51单片机的多层电梯(1-16层)运行系统仿真设计_层数可改

基于51单片机的多层电梯(1-16层)运行系统仿真设计_层数可改 仿真图proteus 8.9 程序编译器:keil 4/5 编程语言:C语言 设计编号:S0027 视频演示 基于51单片机的多层电梯(1-16层)运行系统仿真设计演示视频主要功能: 结合实际情…

[附源码]Python计算机毕业设计SSM基于web的学生社团管理系统(程序+LW)

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: SSM mybatis Maven Vue 等等组成,B/S模式 M…

[附源码]Python计算机毕业设计SSM基于的二手房交易系统(程序+LW)

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: SSM mybatis Maven Vue 等等组成,B/S模式 Ma…

基于Java+SQL Server2008开发的(WinForm)个人财物管理系统【100010036】

一、需求分析 个人财务管理系统是智能化简单化个人管理的重要的组成部分。并且随着计算机技术的飞速发展,计算机在管理方面应用的旁及,利用计算机来实现个人财务管理势在必行。本文首先介绍了个人财务管理系统的开发目的,其次对个人财务管理…

2022年12月中国数据库排行榜:OceanBase立足创新登榜首,华为腾讯排名上升树雄心

不经一番寒彻骨,怎得梅花扑鼻香。 2022年12月的 墨天轮中国数据库流行度排行榜 火热出炉,本月共有249个数据库参与排名,相比上月新增3个数据库。本月排行榜前十用一句话可以概括为:榜单前十一片红,TODO 格局重洗牌&…

[附源码]Python计算机毕业设计SSM基于web技术的米其林轮胎管理系统(程序+LW)

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: SSM mybatis Maven Vue 等等组成,B/S模式 M…

SpringBoot中使用Spring-Retry重试框架 - 第454篇

悟纤:最近我看到自己之前的try/catch、while代码进行请求的重试,看着很不舒服。 师傅:确实了,为师以前也是写出过这样的一堆难看的代码。 悟纤:那师傅这个事情有解吗? ​师傅:徒儿,…

博客网页制作基础大二dw作业 web课程设计网页制作 个人网页设计与实现 我的个人博客网页开发

🎉精彩专栏推荐👇🏻👇🏻👇🏻 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 💂 作者主页: 【主页——🚀获取更多优质源码】 🎓 web前端期末大作业…

2022全年度平板电视十大热门品牌销量榜单

随着社会的发展,近年来,平板电视成为了彩电转型升级的新方向。随着我国传统彩电进入存量竞争阶段,平板电视已成为我国彩电行业结构调整和转型升级的主要方向。 根据鲸参谋数据统计,今年京东平台平板电视的年度累计销量达到1300多万…

< 在element-ui中: 使用el-tree + el-table组件,联动请求用户数据表格组件 (基础版,后续可能更新) >

文章目录👉 前言👉 一、效果演示👉 二、原理👉 三、实现代码往期内容 💨👉 前言 在 Vue elementUi 开发中,实现通过树状组织机构,点击查询用户信息联动效果! 组件较为简…

MySQL数据库学习(6)

一、MySQL索引简介 索引是一种特殊的数据库结构,由数据表中的一列或多列组合而成,可以用来快速查询数据表中有某一特定值的记录。 通过索引,查询数据时不用读完记录的所有信息,而只是查询索引列。不然的话,数据库系统将…

强化学习_06_pytorch-doubleDQN实践(Pendulum-v1)

环境描述 环境是倒立摆(Inverted Pendulum),该环境下有一个处于随机位置的倒立摆。环境的状态包括倒立摆角度的正弦值,余弦值,角速度;动作为对倒立摆施加的力矩(action Box(-2.0, 2.0, (1,), float32))。…

windows11安装cuda+cudnn

安装Nvidia显卡驱动 如需安装显卡驱动,在官方驱动下载网站找到自己的显卡型号对应的驱动下载并安装:官方驱动 | NVIDIA 安装CUDA 前言 windows10 版本安装 CUDA ,首先需要下载两个安装包 CUDA toolkit(toolkit就是指工具包)cu…

Qt扫盲-QLineEdit理论总结

QLineEdit理论总结1. 简述2. 输入模式3. 输入限制4. 文本操作槽函数3. 信号4. 快捷键5. 外观1. 简述 QLineEdit 是一个有用的编辑功能类,主要是处理输入和编辑单行纯文本 ,主要是单行哦,就用来输入简单,短小的字符串。内部其实已…

极客时间Kafka - 09 Kafka Java Consumer 多线程开发实例

文章目录1. Kafka Java Consumer 设计原理2. 多线程方案3. 代码实现4. 问题思考目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序…

JSP ssh科研管理系统myeclipse开发mysql数据库MVC模式java编程计算机网页设计

一、源码特点 JSP ssh科研管理系统是一套完善的web设计系统(系统采用ssh框架进行设计开发),对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myec…