MQ高级:RabbitMQ小细节

news2024/10/2 3:11:02

在之前的学习中,我们只介绍了消息的发送,但是没有考虑到异常的情况,今天我们就介绍一些异常情况,和细节的部分。

目录

生产者可靠性

生产者重连

生产者确认

MQ可靠性

持久化

Lazy Queue

消费者可靠性

消费者确认机制

失败重试机制

业务幂等性

延迟消息

死信交换机

延迟消息插件


生产者可靠性

生产者重连

有时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间

  template:
    retry:
      enabled: true # 开启超时重试机制
      initial-interval: 1000ms # 失败后的初始等待时间
      multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长=initial-interval * multiplier
      max-attempts: 3 # 最大重试次数

在停止mq服务之后,运行代码,会发现测试失败,因为连接失败。最大重试次数设置的是3,此处就重试了3次再停止。

当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMOP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,
会影响业务性能。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

生产者确认

在开启了生产者确认机制后,在MQ成功收到消息后会返回确认消息ACK给生产者,如果有异常,会返回NACK。

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    # 开启publisher confirm机制,并设置confirm类型为correlated
    publisher-returns: true
    # 开启publisher return机制

 这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MO的回执消息
  • correlated:MO异步回调方式返回回执消息

但是!生产者确认需要额外的网络和系统资源开销,尽量不要使用。
如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己业务问题。
对于nack消息可以有限次数重试,依然失败则记录异常消息。

MQ可靠性

解决了生产者可靠性,还需要解决MQ的可靠性。

通过生产者发送的消息被MQ存放到内存中,经过某些特殊情况或者MQ重启后,这部分数据会丢失。并且内存空间是有限的,当消费者故障或者处理太慢时,会导致消息积压,导致MQ阻塞。

持久化

为了解决这个问题,MQ引入了数据持久化,包括交换机持久化、队列持久化、消息持久化。

前两者只需要在创建的时候设置成Durable(默认)即可。

消息持久化默认是不开启的,要手动开启。

当不开启磁盘持久化,消息会全部存放在内存中。但是发送消息过多,会占满内存。之后多出来的消息会存放到Paged out中,也就是磁盘中。等待内存中的消息被处理完后,会再把磁盘中的消息加载到内存中,再继续处理。

当开启了磁盘持久化,接收到的消息会在内存和磁盘中都存一份。此时处理的消息是从内存中处理。内存会在将要满的时候清理一次,再继续完成消息处理。磁盘则会把所有消息都保存下来。

Lazy Queue

Lazy Queue惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

并且惰性队列的性能很高,比之前的几种性能都会好一些。

消费者可靠性

消费者确认机制

当不开启消费者确认机制,生产者投递了一条消息,不管消费者是否处理完了,会马上被RabbitMQ删除,当做已经处理完了。但是如果消费者出现网络波动或者其他异常情况,会导致没有接收到这条消息,生产者这边还会认为消费者已经接收到消息了。告知RabbitMQ自己消息处理状态。处理消息结束后,应该向RabbitMQ发送一个回执,

回执有三种可选值:

  • ack:成功处理消息,RabbitMO从队列中删除该消息
  • nack:消息处理失败,RabbitMO需要再次投递消息
  • reiect:消息处理失败并拒绝该消息,RabbitMO从队列中删除该消息

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

  • none:不处理消息确认。消息投递给消费者后立刻ack,消息会立刻从MQ删除。这种方式非常不安全,不建议使用。

  • manual:手动模式。需要在业务代码中手动调用API发送ack或reject。虽然存在业务侵入,但提供了更大的灵活性。

  • auto:自动模式。SpringAMQP利用AOP对消息处理逻辑进行环绕增强。当业务正常执行时,自动返回ack。当业务出现异常时,根据异常类型自动返回不同的结果:
    1.如果是业务异常,自动返回nack。
    2.如果是消息处理或校验异常,自动返回reject。

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto  # 可以设置为 none, manual, auto

失败重试机制

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,形成无限循环。这会导致MQ的消息处理量飙升,给系统带来不必要的压力。

我们可以利用Spring的retry机制,在消费者出现异常时进行本地重试,而不是无限制地requeue到MQ队列。并且指定最大重试次数。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
    retry:
      enabled: true  # 开启消费者失败重试
      initial-interval: 1000ms  # 初始的失败等待时长为1秒
      multiplier: 1  # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
      max-attempts: 3  # 最大重试次数
      stateless: true  # true无状态; false有状态。如果业务中包含事务,这里改为false

  

在开启重试模式后,如果重试次数耗尽且消息依然失败,则需要有MessageRecoverer接口来处理。MessageRecoverer包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式。
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

当通过最后一种方式重试耗尽后,我们可以额外设置一个队列,比如error.queue,当发送失败的消息进入这个队列后,再通过邮件强提醒这样的机制推送给工作人员,可以有效解决消息发送失败的极端情况。

业务幂等性

在程序开发中,业务幂等性指的是同一个业务,执行一次或者执行多次对业务的状态是没有影响的。

唯一消息id
方案一,是给每个消息都设置一个唯一id,利用id区分是否是重复消息:

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

业务判断

方案二,是结合业务逻辑,基于业务本身做判断。以我们的业务为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付。只有未支付订单才需要修改,其它状态不做处理。

如何保证支付服务与交易服务之间的订单状态一致性?

为确保支付服务与交易服务之间的订单状态一致性,我们采取了以下措施:

  1. 消息通知

    • 支付服务在用户支付成功后,通过MQ(消息队列)发送消息通知交易服务,以完成订单状态的同步。
  2. 消息可靠性策略

    • 采用生产者确认机制、消费者确认和消费者失败重试等策略,确保消息的可靠投递和处理。
    • 开启MQ的持久化功能,避免因服务宕机导致消息丢失。
  3. 业务幂等性

    • 在交易服务更新订单状态时进行业务幂等性判断,防止因消息重复消费导致订单状态异常。

如果交易服务消息处理失败,有什么兜底方案?

  • 定时任务
    • 在交易服务中设置定时任务,定期查询订单的支付状态。
    • 即使MQ通知失败,定时任务也可以作为兜底方案,确保订单支付状态的最终一致性。

延迟消息

延迟消息是消息队列中的一种重要功能,它允许消息在被发送到消息队列后并不会立即被消费者消费,而是在经过特定的时间延迟后才能被消费者获取和处理。这种特性在很多业务场景中都非常有用,比如订单处理超时、定时提醒等。

比如,用户A下单某商品的最后一件,订单确认后,迟迟不支付,但是又占用着这个名额。等到很久以后取消这个订单,此时想买的人没买到,商家没有卖掉,而这个人又没有买。这种情况用延迟消息就能很好的解决这个问题。当用户下单商品后,会设置一个延迟消息,假设30分钟内没有下单,这个延迟消息就会被发送到MQ,提醒数据库这个人订单超时了,强制让这个人取消订单。

死信交换机

当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

死信消息,经过死信交换机可以变成延迟消息。

当publisher发布一个消息后,通过交换机进入队列。通过手动设置一个过期时间,让消息变成死信消息,此时消息会自动进入通过dead-letter-exchang设置的交换机dlx.direct,再一步步的进入到consumer。

延迟消息插件


RabbitMO的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机
当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

@RabbitListener(bindings=@QueueBinding(
value=@Queue(name="delay.queue", durable="true"),
exchange=@Exchange(name="delay.direct",delayed="true"),
key="delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}",msg);
}
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct")
.delayed()//设置delay的属性为true
.durable(true)//持久化
.build();
}

发送消息时需要通过消息头x-delay来设置过期时间:

@Test
void testPublisherDelayMessage() {
    //1.创建消息
    String message = "hello, delayed message";
    //2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            //添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2183659.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LoadRunner实战测试解析:记录一次性能测试过程

环境准备 PC: Windows7/XP LoadRunner11: 与win10及以上版本不兼容 Nmon: 性能监控工具,部署到被测服务器 LoadRunner破解安装 下载地址:https://pan.baidu.com/s/1WJjcFWhrkWW-GgYwXdEniQ 提取码:f4z…

基于Spark的汽车行业大数据分析及可视化系统

作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏:…

NetApp 混合闪存 FAS 统一存储平台

挑战 简化内部和公有云中的数据管理 各种规模的企业在精简其存储运维方面正面临越来越大的挑战。他们存储和备份的数据量不断增长,而预算却在缩减。他们需要一个既能满足内部环境要求,又能结合公有云战略的解决方案。 解决方案 兼顾容量与性能的存储&…

设计模式-策略模式-200

优点:用来消除 if-else、switch 等多重判断的代码,消除 if-else、switch 多重判断 可以有效应对代码的复杂性。 缺点:会增加类的数量,有的时候没必要为了消除几个if-else而增加很多类,尤其是那些类型又长又臭的 原始代…

scratch棒球运动 2024年9月中国电子学会图形化编程 少儿编程 scratch编程等级考试一级真题和答案解析

目录 scratch棒球运动 一、题目要求 1、准备工作 2、功能实现 二、案例分析 1、角色分析 2、背景分析 3、前期准备 三、解题思路 1、思路分析 2、详细过程 四、程序编写 五、考点分析 六、 推荐资料 1、入门基础 2、蓝桥杯比赛 3、考级资料 4、视频课程 5、…

进程--信号量

信号量是什么 资源的竞争 资源竞争 : 当多个进程同时访问共享资源时,会产生资源竞争,最终最导致数据混乱临界资源 : 不允许同时有多个进程访问的资源,包括硬件资源(CPU、内存、存储器以及其他外围设备)与软件资源(共享代码段、共享数据结构…

SpringCloudEureka实战:搭建EurekaServer

1、依赖引入 <dependencies><!-- 注册中心 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-server</artifactId></dependency> </dependencies> <de…

66.对cplusplus网的strtok函数的详细解释(补第56篇的翻译)

56.【C语言】字符函数和字符串函数(strtok函数) 点我跳转 目录 56.【C语言】字符函数和字符串函数(strtok函数) 点我跳转 1.原文 2. 翻译 1.原文 原文链接: cplusplus的介绍 点我跳转 2. 翻译 函数 strtok char * strtok ( char * str, const char * delimiters ); Spli…

如何构建一个生产级的AI平台(3)?

书接上回&#xff0c;继续往下讲,本节会说一下模型的路由和网关 模型的路由和网关 随着应用程序复杂性的增加和涉及的模型越来越多&#xff0c; 出现了两种类型的工具来帮助使用多个模型&#xff1a;路由和网关 1. 路由 应用程序可以使用不同的模型来响应不同类型的查询。 …

平衡二叉搜索树删除的实现

前言 上期讲了平衡二叉搜索树的插入&#xff0c;这一期我们来讲讲删除。同时&#xff0c;二叉搜索树的简介不会出现在本篇博客之中&#xff0c;如有需要可以查看上一篇博客《平衡二叉搜索树插入的实现》。 平衡二叉搜索树插入的实现-CSDN博客文章浏览阅读659次&#xff0c;点赞…

三、I/O控制器

1.主要功能 接受和识别CPU发出的命令(要有控制寄存器) 向CPU报告设备的状态(要有状态寄存器) 数据交换(要有数据寄存器&#xff0c;暂存输入/输出的数据) 地址识别(由I/0逻辑实现) 2.组成 CPU与控制器之间的接口(实现控制器与CPU之间的通信) I/0逻辑(负责识别CPU发出的命…

鸿蒙NEXT开发-ArkUI(基于最新api12稳定版)

注意&#xff1a;博主有个鸿蒙专栏&#xff0c;里面从上到下有关于鸿蒙next的教学文档&#xff0c;大家感兴趣可以学习下 如果大家觉得博主文章写的好的话&#xff0c;可以点下关注&#xff0c;博主会一直更新鸿蒙next相关知识 专栏地址: https://blog.csdn.net/qq_56760790/…

自定义注解加 AOP 实现服务接口鉴权以及内部认证

注解 何谓注解&#xff1f; 在Java中&#xff0c;注解&#xff08;Annotation&#xff09;是一种特殊的语法&#xff0c;用符号开头&#xff0c;是 Java5 开始引入的新特性&#xff0c;可以看作是一种特殊的注释&#xff0c;主要用于修饰类、方法或者变量&#xff0c;提供某些信…

中英翻译神器!轻松搞定跨文化沟通

大家好&#xff01;今天咱们来聊聊那些你生活中不可或缺的翻译小助手&#xff1b;不论你是个英语小白&#xff0c;还是希望更快地了解外国文献、掌握外媒信息&#xff0c;或者是从事需要大量翻译工作的小伙伴&#xff0c;总有一款翻译工具能帮你省时省力&#xff0c;让你的生活…

DBCP数据库连接池以及在Tomcat中配置JNDI数据源

前言 数据库连接 数据库连接是指在计算机系统中建立起应用程序与数据库之间的连接通道&#xff0c;用于进行数据的读取和写入操作。通过数据库连接&#xff0c;应用程序可以与数据库进行交互&#xff0c;执行各种数据库操作&#xff0c;如查询数据、插入数据、更新数据和删除数…

算法题总结(四)——螺旋矩阵

螺旋矩阵 59、螺旋矩阵二 给你一个正整数 n &#xff0c;生成一个包含 1 到 n2 所有元素&#xff0c;且元素按顺时针顺序螺旋排列的 n x n 正方形矩阵 matrix 。 示例 1&#xff1a; 输入&#xff1a;n 3 输出&#xff1a;[[1,2,3],[8,9,4],[7,6,5]]示例 2&#xff1a; 输…

2.点位管理开发(续)及设计思路——帝可得后台管理系统

目录 前言一、页面原型二、修改1、页面展示2、新增 3 、总结思路 前言 提示&#xff1a;本篇继续点位管理的改造 一、页面原型 页面展示新增 二、修改 1、页面展示 页面修改&#xff1a;修改标签换行、顺序顺序、地址过长时换行问题&#xff1b; <el-table v-loading…

七,MyBatis-Plus 扩展功能:乐观锁,代码生成器,执行SQL分析打印(实操详细使用)

七&#xff0c;MyBatis-Plus 扩展功能&#xff1a;乐观锁&#xff0c;代码生成器&#xff0c;执行SQL分析打印&#xff08;实操详细使用&#xff09; 文章目录 七&#xff0c;MyBatis-Plus 扩展功能&#xff1a;乐观锁&#xff0c;代码生成器&#xff0c;执行SQL分析打印&#…

愿祖国富强!肌肉水凝胶的奥秘,自协调与光驱动,运动模式大揭秘

大家好&#xff0c;在这个国庆佳节&#xff0c;我们一同感受科技的魅力。今天来了解一种特殊的肌肉样水凝胶&#xff0c;它通过自协调形状变化和摩擦调节&#xff0c;能在光的引导下实现多样运动。这一成果为软机器人发展带来新契机——《Light-steered locomotion of muscle-l…

基于ScriptableObject设计游戏数据表

前言 本篇文章是针对之前对于ScriptableObject概念讲解的实际应用之一&#xff0c;在游戏开发中&#xff0c;我们可以使用该类来设计编辑器时的可读写数据表或者运行时的只读数据表。本文将针对运行时的只读数据表的应用进行探索&#xff0c;并且结合自定义的本地持久化存储方式…