【从零开始学习RabbitMQ | 第二篇】如何确保MQ的可靠性和消费者可靠性

news2024/10/6 4:06:05

目录

前言:

MQ可靠性: 

数据持久化:

Lazy Queue: 

消费者可靠性:

消费者确认机制:

消费失败处理:

MQ保证幂等性:

方法一:

总结:


前言:

在上一篇文章中,我们介绍了如何确保生产者的可靠性,确保消息一定可以到达MQ。

【从零开始学习RabbitMQ | 第一篇】如何确保生产者的可靠性-CSDN博客icon-default.png?t=N7T8https://liyuanxin.blog.csdn.net/article/details/139261125?spm=1001.2014.3001.5502

但是MQ自己也是会丢失消息的,比如MQ的突然宕机或者消息过多造成的阻塞,因此我们这篇文章来介绍一下如何确保MQ的可靠性和消费者可靠性

默认情况下,RabbitMQ会把接收到的消息保存在内存中来降低消息收发的延迟,但这样会导致两个问题:

  • 一旦MQ宕机,内存中的消息会丢失
  • 内存空间有限,当消费者故障或者处理过慢的时候,会导致消息的积压,引发阻塞。 

想要保证MQ的可靠性,主要依赖于两种方式:

  • 数据持久化
  • Lazy Queue 

MQ可靠性: 

数据持久化:

RabbitMQ实现持久化主要有三个部分:

  1. 持久化队列(Durable Queues): 持久化队列指的是队列本身被存储在磁盘上,这样即使RabbitMQ服务器重启,队列也不会丢失。要创建持久化队列,需要在声明队列时设置durable属性为true。持久化队列中的消息默认不是持久化的,需要单独设置每条消息为持久化。

  2. 持久化消息(Persistent Messages): 持久化消息意味着消息本身被存储在磁盘上,因此即使RabbitMQ服务器重启,消息也不会丢失。在发送消息时,需要设置消息的deliveryMode属性为2(即PERSISTENT),这样RabbitMQ就会将消息存储到磁盘上。

  3. 持久化交换机(Durable Exchanges): 持久化交换器与持久化队列类似,指的是交换器被存储在磁盘上,这样服务器重启后交换器依然存在。声明交换器时,也需要设置durable属性为true。持久化交换器确保了消息路由的结构在服务器重启后能够保留。

当我们把消息持久化到磁盘中的时候,就避免了消息挤压造成的阻塞。但是当我们把消息持久化到磁盘中的时候,这个时候是不能接收新的消息的。

Lazy Queue: 

因此实际上把消息持久化到磁盘中不是一个很好的解决方案,在RabbitMQ的3.6.0版本后,增加了Lazy  Queue这个概念。其实就是懒惰队列:

懒惰队列的特性如下:

  • 接收到消息后直接存入磁盘而不是内存(内存只保留最近的消息,默认2048条)。
  • 消费者要消费消息的时候才会从磁盘中读取并加载到内存中。
  • 支持数百万条的消息存储。

在3.12版本后,所有的队列都是Lazy Queue模式,无法更改。

消费者可靠性:

消费者确认机制:

 为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息的结束之后,将会向RabbitMQ发送一个回执,告知自己消息的处理状态 ,消费者一共可以向Rabbitmq回执三种处理状态,分别是:

  • ACK:成功处理消息,RabbitMQ从队列中删除该消息
  • NACK:消息处理失败,RabbitMQ再次投递消息
  • REJECT: 消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

而消费者确认机制并不用我们手写,Spring AMQP已经为我们实现了消费者消息确认机制,并允许我们通过配置文件的形式选择ACK处理方式,有三种:

  • none:不处理,消息投递给消费者之后就会ack,消息会从MQ中删除,不安全
  • manual:手动模式。需要自己调用api发送ack或者reject。
  • auto:托管给Spring AMQP 利用 AOP实现消费者确认机制。当业务正常执行的时候返回ack

当业务异常的时候,根据异常不同返回不同的结果:

  1. 如果是业务异常,自动返回nack
  2. 如果是消息处理或者校验异常,自动返回reject

Spring AMQP的消费者确认机制默认开启auto。

消费失败处理:

如果我们把消费者确认机制配置为auto的时候,当消费者出现异常之后,消息会不断的重新入队到队列,再重新发送给消费者。同样的消息再次引发异常,再次入队,再次发送给消费者。

这样会导致MQ的消息处理数量飙升,带来不必要的压力,因此除了把处理失败的消息重新进行处理之外,我们还应该有其他的处理方案。

我们可以利用Spring 的retry机制,在消费者出现异常的时候使用本地重试,而不是无限制的requeue到MQ队列中。

    listener:
      simple:
        prefetch: 1
        retry:
          enabled: true #开启超时重试机制
          initial-interval: 1000ms #失败后的初始等待时间
          multiplier: 1 #失败后下次等待时长倍数,下次等待时长= Initial - interval * multiplier
          max-attempts: 3 #最大重试次数
          stateless: true #true无状态,false有状态,如果业务中包含事务,就改为fasle

当我们开启重试模式之后,如果重试次数耗尽并且消息依然失败,那么就需要有MessageRecoverer接口来处理,这个接口包含三种不同的实现:

  • RejectAndDontRequeueRecoverer :重试耗尽之后,直接reject,丢弃消息。
  • ImmedIateRequeueMessageRecoverer : 重试耗尽之后,返回nack,消息重新入队
  • RepublishMessageRecoerer:重试耗尽之后,将失败消息投递到指定的交换机

如果消息丢失不会对业务产生重大影响,可以选择RejectAndDontRequeueRecoverer

如果希望消息有机会被重新处理,可以选择ImmediateRequeueMessageRecoverer

RepublishMessageRecoverer则适用于需要对失败消息进行进一步分析或记录的情况。

MQ保证幂等性:

通过上述的各种保证措施,我们基本上可以确保业务至少被执行一次。那对于一些需要保证幂等性的业务,我们要如何保证这个消息只被消费一次呢?

【从零开始学习重要知识点 | 第一篇】快速了解什么是幂等性以及常见解决方案_幂等性问题如何解决分布式锁-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/fckbb/article/details/136331113?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522171689607216777224421340%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=171689607216777224421340&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-1-136331113-null-null.142%5Ev100%5Epc_search_result_base1&utm_term=%E5%B9%82%E7%AD%89%E6%80%A7%20%E6%88%91%E6%98%AF%E4%B8%80%E7%9B%98%E7%89%9B%E8%82%89&spm=1018.2226.3001.4187

方法一:

给每一个消息都设置一个唯一ID,用ID区分是否被消费过。当我们消费一个消息的时候,先在数据库中查询是否存在这个数据的ID,如果不存在就消费。如果存在就说明这个消息之前被消费过。

其实他和我们上面黏贴的文章中介绍的token机制的思想很像。而在MQ中也不需要我们手动去设置唯一ID,可以在消费者的消息转换器中开启:

 我们可以看一看这个setCreateMessageIds这个方法中是如何构造id的:

这里逻辑就很清晰了,实际上就是在发消息的时候做一个if判断,如果我们设置了构造id,并且当前消息配置类的id为空,我们就为这个消息构造一个id。

 但是这种方法也有自己的缺点,也就是对业务逻辑有侵入性,而且还有额外的数据库操作

总结:

        在本文中,我们探讨了RabbitMQ作为领先的开源消息代理,如何通过一系列高级特性和策略来确保消息队列和消费者的高度可靠性。

首先,我们讨论了RabbitMQ的持久化机制,它允许消息和队列在服务器重启后依然保持不变。通过将消息标记为持久化,并在队列上启用持久化选项,我们可以确保关键数据不会因系统故障而丢失。

最后,我们介绍了消息确认机制,这是确保消息被成功处理的关键。消费者在处理完消息后发送确认回执,RabbitMQ只有在收到确认后才会从队列中移除消息。如果消费者在处理过程中失败,未确认的消息将重新入队,供其他消费者处理。

如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1707714.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

windows帐户自动被锁定解决方法

处理方法方法一: 运行-gpedit.msc,打开组策略, 处理方法方法二: 运行-gpedit.msc,打开组策略, 在本地组策略编辑器页面中,选择计算机配置 > Windows设置 > 安全设置 > 账户策略 > 账…

Linux防火墙(以iptables为例)

目录 Linux配置防火墙1. 引言2. 什么是防火墙3. Linux中的防火墙3.1 iptablesiptables命令参数常用方式:3.1.1 安装iptables3.1.2 配置iptables规则3.1.3 示例一:使用iptables配置防火墙规则4. iptables执行过程 Linux配置防火墙 1. 引言 在互联网时代&…

NIUSHOP 开源商城 V6 开源版(商城+分销+VIPCard+上门服务)前端技术探索与实践

摘要: 本文深入探讨了NIUSHOP V6开源商城前端技术的选型、实现与设计理念。NIUSHOP V6作为一款优秀的企业级商城系统,其前端采用了Vite、TypeScript、Vue3及ElementPlus等最新技术栈,为开发者提供了高效、灵活的开发体验。本文将从技术选型、…

某有赞滑块验证码

⚠️前言⚠️ 本文仅用于学术交流。 学习探讨逆向知识,欢迎私信共享学习心得。 如有侵权,联系博主删除。 请勿商用,否则后果自负。 网址 aHR0cHM6Ly9wYXNzcG9ydC55b3V6YW4uY29tL2xvZ2luL3Bhc3N3b3Jk 1. 首先来分析一下参数 1_1. get-beh…

宝塔下应该用 Memcached 还是 Redis?

明月最近在跟几个使用宝塔面板的客户运维的时候发现不少站长不知道如何选择 Memcached 和 Redis,甚至都说不清楚 Memcached 或者 Redis 具体是用来干啥的?甚至还碰到过一个站长 Memcached 和 Redis 都安装了,但一个都没有用,就那么…

vscode远程登录阿里云服务器【使用密钥方式--后期无需再进行密码登录】【外包需要密码】

1:windows主机上生成【私钥】【公钥】 1.1生成公钥时不设置额外密码 1.2生成公钥时设置额外密码【给外包人员使用的方法】 2:在linux服务器中添加【公钥】 3:本地vscode连接linux服务器的配置 操作流程如下 1.1本地终端中【生成免密登录…

python练习题-反转一个只有三位数的整数

【问题描述】&#xff1a;反转一个只有三位数的整数 [示例]&#xff1a;123 321 完整代码如下&#xff1a; nint(input()) if n<100 or n>999: print("请输入三位数&#xff01;") else: gen%10 shin//10%10 bain//100 m100*ge10*shibai…

Pytorch入门需要达到的效果

会搭建深度学习环境和依赖包安装 使用Anaconda创建环境、在pytorch官网安装pytorch、安装依赖包 会使用常见操作&#xff0c;例如matmul&#xff0c;sigmoid&#xff0c;softmax&#xff0c;relu&#xff0c;linear matmul操作见文章torch.matmul()的用法 sigmoid&#xff0…

【算法】位运算算法——判断字符是否唯一

题解&#xff1a;判断字符是否唯一(位运算算法) 目录 1.题目2.题解3.位图参考代码4.细节5.总结 1.题目 题目链接&#xff1a;LINK 2.题解 题解有两种方法&#xff0c; 一是做一个哈希数组&#xff0c;去查重&#xff1b; 二是直接用一个变量每一位来对应表示是否有这个字母…

Linux主机连接腾讯云服务器详细配置

硬件条件 当然你要先有一个云服务器&#xff0c;腾讯云比阿里云便宜一点&#xff0c;所以就用腾讯云了 问了师兄买这个98的就行&#xff0c;选择CentOS&#xff0c;不要选Ubuntu&#xff0c;因为 嗯&#xff0c;大概就是这样 编程测试 云服务器当然是作为服务端 server.cpp…

LabVIEW高低温试验箱控制系统

要实现LabVIEW高低温试验箱控制系统&#xff0c;需要进行硬件配置、软件设计和系统集成&#xff0c;确保LabVIEW能够有效地监控和控制试验箱的温度。以下是详细说明&#xff1a; 硬件配置 选择合适的试验箱&#xff1a; 确定高低温试验箱的型号和品牌。 确认试验箱是否支持外…

SpringBoot 返回值 i18n 自动处理

定义基础通用类 首先定义一波错误码&#xff1a;ResultCode Getter AllArgsConstructor public enum ResultCode {SUCCESS(200, "请求成功", "request.success"),Fail(400, "请求失败", "request.failed"),PASSWORD_NOT_MATCH(1000…

保护“第二生命线”,科技守护颈椎健康

脊柱支撑着人体重量&#xff0c;汇集着众多血管神经&#xff0c;素有“人体第二生命线”之称。在如今快节奏的时代&#xff0c;人们生活方式也在发生着变化&#xff0c;长期低头看手机、伏案久坐等不良生活习惯引发脊柱健康问题&#xff0c;且呈现年轻化趋势。目前&#xff0c;…

2024.05.22学习记录

1、面经复习&#xff1a; Vue组件通讯、vuex、js严格模式、options请求、vue3 Setup 语法糖、React hook 2、代码随想录刷题&#xff1a;动态规划 3、rosebush组件库 完成Alert和Alert测试 Menu组件初步开发

基于单片机的自行车里程监测系统的设计

摘 要 &#xff1a;本设计是一种基于单片机的自行车里程监测系统&#xff0c;采用 STC89C52RC 单片机为核心处理芯片&#xff0c;液晶显示器使用 LCD1602 &#xff0c; 速度测量使用霍尔传感器&#xff0c;温度传感器使用 DS18B20 &#xff0c;时间由时钟芯片 DS1302 进行…

昆仑通态触摸屏组态软件MCGS 嵌入版V7.7.1.7老版触摸屏安装程序

1.MCGS7.7嵌入版用于昆仑通态老版本触摸屏组态开发&#xff0c;具体支持哪些型号组态&#xff0c;可以在软件的工程设置里面查看。新出的触摸屏一般用MCGS Pro版本组态开发&#xff0c;老版本触摸屏必须用MCGS 7.7嵌入版组态开发。 2.MCGS7.7嵌入版支持当下常用的Win7、Win10、…

Nodejs 第七十三章(网关层)

什么是网关层(getway)&#xff1f; 技术选型fastify 速度快适合网关层 fastify教程上一章有讲 网关层是位于客户端和后端服务之间的中间层&#xff0c;用于处理和转发请求。它充当了请求的入口点&#xff0c;并负责将请求路由到适当的后端服务&#xff0c;并将后端服务的响应…

怎样打造一份个性化画册呢?我来教你

在这个数字化的时代&#xff0c;传统的照片已经不能满足我们对个性化回忆的需求。个性化画册&#xff0c;不仅能够承载我们的记忆&#xff0c;还能展现自我风格。今天&#xff0c;就让我来教你如何打造一份属于自己的个性化画册。 1.要制作电子杂志,首先需要选择一款适合自己的…

Debian12 安装留档@Virtual Box

在学蜜罐系统的时候&#xff0c;T-Pot 需要Debian&#xff0c;于是安装Debian12 下载安装光盘 先去中科大下载了12的安装光盘&#xff0c;然后在VirtualBox中创建一个新虚拟机&#xff0c;将安装光盘挂载上。 安装光盘下载地址&#xff1a;https://mirrors.ustc.edu.cn/debi…

隐藏服务器源IP怎么操作,看这一篇学会!

在当今的网络环境中&#xff0c;服务器作为信息和服务的中枢&#xff0c;常驻于公网之上&#xff0c;面临着各式各样的安全威胁&#xff0c;其中&#xff0c;分布式拒绝服务&#xff08;DDoS&#xff09;攻击尤为猖獗&#xff0c;它通过协调大量计算机同时向目标服务器发送请求…