【RabbitMQ】可靠性策略(幂等,消息持久化)

news2024/11/24 13:43:17

MQ可靠性策略

  • 发送者的可靠性问题
    • 生产者的重连
    • 生产者确认
  • MQ的可靠性
    • 数据持久化
    • Lazy Queue
  • 消费者的可靠性问题
    • 消费者确认机制
    • 消息失败处理
  • 业务幂等性
  • 简答问题

发送者的可靠性问题

生产者的重连

可能存在由于网络波动,出现的客户端连接MQ失败,我们可以通过配置文件配置解决

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
         enabled:  true #开启超时重试机制
         initial-interval: 1000ms #失败后的初始等待时间
         multiplier: 1 #失败后下次的等待时长倍数,下次等待时间= initial-interval * multiplier
         max-attempts: 3 #最大重试次数

生产者确认

RabbitMQ提供了Publisher Confirm和Publisher Return两种确认机制,开机确认机制后,在MQ成功收到消息后会返回确认消息给生产者,返回的结果有以下几种情况:

  1. 消息投递到了MO,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
  2. 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  3. 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  4. 其他情况都会返回NACK,告知投递失败

通过配置文件配置生产者的消息类型:

spring:
   rabbitmq:
      publisher-confirm-type: correlated #开启publisher confirm机制,并设置confirm类型
      publisher-returns: true # 开启publisher return机制

这里的publisher-confirm-type 有三种模式可选:
none:关闭confirm机制
simple: 同步阻塞等待MQ的回执消息
correlated:MQ异步调用方式返回回执消息

异步回调方式:

我们完成一个任务将消息交由消息队列中,就进行别的任务了,当消息队列返回异常问题,在过来进行对应的处理

我们需要调用ReturnCallback函数完成消息失败后的操作:
在使用之前需要配置ReturnCallback,每个RabbitTemplate只能配置一个ReturnCallback

@Configuration
public class CommonConfig implements ApplicationContextAware{
   @Override
   public void setApplicationContext(ApplicationContext applicationContext) throws BeansException{
   //获取RabbitTemplate
   RabbitTemplate rabbittemplate =applicationContext.getBean(RabbitTemplate.class);
   //设置ReturnCallback
   rabbitTemplate.setReturnCallback(message,replyCode,replyText,exchange,routingKey)->{
   //处理操作
   }
   }
}

通过ConfirmCallback来处理消息失败:
每一个消息指定一个ConfirmCallback

void test() throws InterruptedException{
    CorrelationData cd= new CorrelationData();
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.confirm>(){
    @Override
    public void onFailure(Throwable ex){
    //Future 发生异常是的逻辑处理,基本不会触发
    }
    @Override
    public void onSuccess(CorrelationData.confirm result){
      //Future接收到回执的处理逻辑,参数中的result就是回执内容
      if(result.isAck()){ //result.isack,boolean类型,true代表ack回执,false表示nack回执
         //处理逻辑
         
      }else{
      //异常处理
      }
    }
    });
}
rabbittemplate.convertAndSend("","",cd);

MQ的可靠性

在默认情况下,Rabbitmq会将接收到的数据保存在内存中以降低消息收发的延迟,这样会有问题:

  1. 一旦MQ宕机,内存的消息会丢失
  2. 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞(消息对队列将消息保存到磁盘,此时MQ阻塞)

数据持久化

RabbitMQ的数据持久化包括:

  1. 交换机持久化(Durable 永久的,Transient临时的)
  2. 队列持久化(Durable 永久的,Transient临时的)
  3. 消息持久化

消息的持久化:

void test(){
   Message message =MessageBuilder
                   .withBody("hello".getBytes(StandardCharsets.UTF-8))
                   .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();
}

会将消息在过程中持久化到磁盘不会导致MQ阻塞

消息的持久化性能不是很高,可以通过Lazy Queue进行消息的持久化

Lazy Queue

惰性队列
特征:

  1. 接收到消息后直接存入磁盘而非内存(内存只保留最近消息,默认2048条)
  2. 消费者要消费消息时才会从磁盘中读取并加载到内存
  3. 支持百万条数据的消息存储

在3.12 版本后,所有的队列都是Lazy Queue模式,无法更改
在这里插入图片描述
在java中要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy:

@Bean
public Queue lazyQueue(){
  return QueueBuilder
         .durable("lazy.queue")
         .lazy() //开启lazy模式
         .build();
}

通过注解也可以实现

@RabbitListener(queuesToDeclare= @Queue(
                name="lazy.queue",
                durable="true",//持久化
                arguments =@Argument(name="x-queue-mode",value="lazy")))
public void listenLazyQueue(String msg){
             //消费处理
}

消费者的可靠性问题

消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制,当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
ack:成功处理消息,RabbitMQ从队列中删除该信息
nack:消息处理失败,RabbitMQ需要再次投递信息
reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该信息
那么如何实现呢:
SprinaAMOP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式
有三种方式:
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
auto:自动模式。SprinGAMOP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.当业务出现异常时:
如果是业务异常,则会自动返回nack
如果是消息处理或校验异常,自动返回reject

spring:
   rabbitmq:
      listener:
         simple:
            prefetch: 1
            acknowledge-mode: none #none,关闭

消息失败处理

如果消费者返回nack,那就会重复进行,这样大大影响效率
我们可以利用Spring的retry机制,在消费者出现异常的时利用本地重试:

spring:
  rabbitmq:
    listener:
      simple:
          prefetch: 1
          retry:
             enabled: true  #开启消费者失败重试
             initial-interval: 1000ms #初始的失败等待时间
             multiplier: 1 #下次失败的等待时长的倍数
             max-attempts: 3 # 最大重试次数
             stateless: true # true无状态,false有状态,如果业务中包含事务,这里改为false

在开启重试模式之后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息,默认这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
在这里插入图片描述

将失败消息重新投递到error交换机中,可以绑定error消息队列,将来发送信息给开发人员等操作消息

将失败策略改为RepublishMessageRecoverer:

  1. 首先,定义接收失败消息的交换机,队列绑定
  2. 定义RepublishMessageRecoverer
@Bean
public MessageRecoverer test(RabbitTemplate rabbittemplate){
     return new RepublishMessageRecoverer(rabbitTemplate,"交换机名称","key值")
}

业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x)),在程序开发中,则指同一个业务,执行一次或多次对业务状态的影响是一致的
幂等的使用场景:防止某一数据被重复进行修改
幂等业务:根据id的查询业务,根据id的删除业务等
非幂等:用户下单,扣减库存等

如何实现幂等:
方案一唯一消息id
给每一个消息都设置一个唯一id,利用id区分是否重复消息:

  1. 每一条消息都生成一个唯一id,与消息一起投递给消费者
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息id保存到数据库
  3. 如果下次又收到相同的消息,去数据库查询判断是否存在,存在则为重复消息放弃处理
@Bean
public MessageConverter messageConverter(){
    //定义消息转换器
    Jackson2JsonMessageConverter jjmc= new jackson2JsonMessageConverter();
    //配置自动创建消息id,用于识别不同消息,也可以在业务中基于id判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

但是这样会造成额外的操作冗余,比如还需要写数据库等等
方案二:结合业务逻辑,基于业务本身做判断

简答问题

如何保证支付服务与交易服务之间的订单状态一致性:

  1. 首先,支付服务会在正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步
  2. 其次,为了保证消息的可靠性,我们采取了生产者确认机制,消费者确认,消费者失败重试等策略,确保消息投递和处理的可靠性,同时可开启了MQ的持久化,避免因服务宕机导致消息丢失
  3. 最后,我们还会交易服务更新订单状态时作业业务幂等判断,避免因消息重复导致订单异常

如果交易服务处理失败,还有什么方案:
在交易服务设置定时任务,定期查询订单生产状态,这样即使MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1640654.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

VG做mirror引起的块偏移

事件起因 Oracle10.2环境 Aix操作系统使用aix的lvm技术。制作vg的mirror。以此来替换掉老的存储。 做mirror前&#xff0c;数据库已完全关闭 故障现象 在启动数据库时&#xff0c;发现IO错误。该系统的spfile&#xff0c;ctl&#xff0c;dbf均是用lv做的裸设备。其中dbf是使…

LIUNX系统编程:进程间通信-管道(2)

目录 1.管道通信的4种情况 1.1管道内没有数据&&写端不关闭 1.2管道内数据被写满&&写端不关闭 1.3写端写了一部分&#xff0c;关闭写端 1.4读端不读直接关闭&#xff0c;写端再写 1.管道通信的4种情况 1.1管道内没有数据&&写端不关闭 代码验证 #…

Qt之信号与槽

槽的本质&#xff1a;对信号响应的函数。 信号函数和槽函数通常位于某个类中&#xff0c;和普通的成员函数相⽐&#xff0c;它们的特别之处在于&#xff1a; 信号函数⽤ signals 关键字修饰&#xff0c;槽函数⽤ public slots、protected slots 或者 private slots 修饰。sign…

【Linux】环境变量、进程替换、wait/waitpid

文章目录 一、环境变量1. 查看环境变量的方法1.1 env1.2 echo $环境变量名 2. 在代码中使用环境变量的方法2.1 命令行参数传参2.2 environ变量2.3 getenv( )函数 3. export 二、进程替换1. execl2. execlp3. execle4. execv5. execvp6. execvpe7. 补充7.1 命名理解7.2 返回值 三…

数据结构可视化(适合考研党)

废话不多说传送门 还在疑惑平衡二叉树、红黑树、B树、B树怎么插入构建的吗&#xff0c;不要慌张&#xff0c;这个网站会一步一步来演示.&#xff0c;听了咸鱼的课还不够&#xff0c;需要自己动手模拟一下各种数据结构的CRUD&#xff01;&#xff01;

中华科技控股集团:人工智能标准化引领者与数字化服务新航程的启航者

4月30日, 矗立于时代科技潮头的中华科技控股集团&#xff0c;自2010年在香港这片国际金融沃土上诞生以来&#xff0c;便以其独特的国资背景与全球化视野&#xff0c;肩负起推动中国科技进步与产业升级的重任。作为国资委麾下的重要一员&#xff0c;中华科技始终坚持创新驱动发展…

数据结构--栈与队列【您的关注是我创作的动力!】

文章目录 栈什么是栈&#xff1f;栈的具体实现 队列什么是队列&#xff1f;队列的实现 栈 什么是栈&#xff1f; 栈也是顺序表的一种&#xff0c;栈的逻辑实现是先进后出&#xff08;后进先出&#xff09;就跟子弹夹一样。 具体逻辑就是它只允许在固定的一端进行数据的插入与…

Spring Boot中使用Redis和Lua脚本实现延时队列

码到三十五 &#xff1a; 个人主页 延时队列是一种常见的需求。延时队列允许我们延迟处理某些任务&#xff0c;这在处理需要等待一段时间后才能执行的操作时特别有用&#xff0c;如发送提醒、定时任务等。文中&#xff0c;将介绍如何在Spring Boot环境下使用Redis和Lua脚本来实…

【JAVA基础之反射】反射详解

&#x1f525;作者主页&#xff1a;小林同学的学习笔录 &#x1f525;mysql专栏&#xff1a;小林同学的专栏 1.反射 1.1 概述 是在运行状态中&#xff0c;对于任意一个类&#xff0c;都能够知道这个类的所有属性和方法&#xff1b; 对于任意一个对象&#xff0c;都能够调用它…

Edge 浏览器键入时如何关闭显示搜索和站点建议

Edge 浏览器键入时如何关闭显示搜索和站点建议

ssm104园区停车管理系统+jsp

园区停车管理系统的设计与实现 摘 要 网络技术和计算机技术发展至今&#xff0c;已经拥有了深厚的理论基础&#xff0c;并在现实中进行了充分运用&#xff0c;尤其是基于计算机运行的软件更是受到各界的关注。加上现在人们已经步入信息时代&#xff0c;所以对于信息的宣传和管…

基于SSM SpringBoot vue宾馆网上预订综合业务服务系统

基于SSM SpringBoot vue宾馆网上预订综合业务服务系统 系统功能 首页 图片轮播 宾馆信息 饮食美食 休闲娱乐 新闻资讯 论坛 留言板 登录注册 个人中心 后台管理 登录注册 个人中心 用户管理 客房登记管理 客房调整管理 休闲娱乐管理 类型信息管理 论坛管理 系统管理 新闻资讯…

【VueUse】超越基本功能的高级 Vue 元素操作

在vue开发中我们经常需要操作DOM元素&#xff0c;从简单的添加类到动态创建元素&#xff0c;这些操作都是不可避免的。而在VueUse库中&#xff0c;Elements相关API函数为我们提供了一系列强大而灵活的工具&#xff0c;帮助我们更轻松地处理DOM元素。无论是优雅地处理元素、动态…

25计算机考研院校数据分析 | 哈尔滨工业大学

哈尔滨工业大学&#xff08;Harbin Institute of Technology&#xff09;&#xff0c;简称哈工大&#xff0c; 校本部位于黑龙江省哈尔滨市&#xff0c;是由工业和信息化部直属的全国重点大学&#xff0c;位列国家“双一流”、“985工程”、“211工程”&#xff0c;九校联盟 、…

数据结构与算法之经典排序算法

一、简单排序 在我们的程序中&#xff0c;排序是非常常见的一种需求&#xff0c;提供一些数据元素&#xff0c;把这些数据元素按照一定的规则进行排序。比如查询一些订单按照订单的日期进行排序&#xff0c;再比如查询一些商品&#xff0c;按照商品的价格进行排序等等。所以&a…

ServiceNow 研究:通过RAG减少结构化输出中的幻觉

论文地址&#xff1a;https://arxiv.org/pdf/2404.08189 原文地址&#xff1a;rag-hallucination-structure-research-by-servicenow 在灾难性遗忘和模型漂移中&#xff0c;幻觉仍然是一个挑战。 2024 年 4 月 18 日 灾难性遗忘&#xff1a; 这是在序列学习或连续学习环境中出现…

Costas-Barker序列模糊函数仿真

文章目录 前言一、Costas 序列二、Barker 码三、Costas-Barker 序列模糊函数仿真1、MATLAB 核心代码2、仿真结果①、Costas-Barker 模糊函数图②、Costas-Barker 距离模糊函数图③、Costas-Barker 速度模糊函数图 四、资源自取 前言 Costas 码是一种用于载波同步的频率调制序列…

20232810 2023-2024-2 《网络攻防实践》实验七

一、实践内容 &#xff08;1&#xff09;使用Metasploit进行Linux远程渗透攻击 任务&#xff1a;使用Metasploit渗透测试软件&#xff0c;攻击Linux靶机上的Samba服务Usermap_script安全漏洞&#xff0c;获取目标Linux靶机的主机访问权限。实践步骤如下&#xff1a; ①启动Met…

字节跳动发起AI战争 寻找下一个TikTok

现如今在字节跳动&#xff0c;已近乎隐退的张一鸣&#xff0c;只重点关注两件事&#xff1a;其一&#xff0c;是风暴中的TikTok&#xff1b;其二&#xff0c;就是字节跳动正在全力追赶的AI战略业务。 提及字节的AI战略远望,多个接近字节的人士均认为,以Flow部门出品最为“正统…

缩小COCO数据集

在运行YOLOS模型的过程中&#xff0c;需要使用到COCO2017这个数据集&#xff0c;但从实验运行来看&#xff0c;其所需时间无疑是相当漫长&#xff0c;预计可能需要近几十天才能完成&#xff0c;因此便考虑缩小COCO数据集大小&#xff0c;即尽可能在遵循其分布的情况下&#xff…