RabbitMQ入门指南(九):消费者可靠性

news2025/2/1 16:51:31

专栏导航

RabbitMQ入门指南

从零开始了解大数据


目录

专栏导航

前言

一、消费者确认机制

二、失败重试机制

三、失败处理策略

四、业务幂等性

1.通过唯一标识符保证操作的幂等性

2.通过业务判断保证操作的幂等性

总结


前言

RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。


当RabbitMQ向消费者投递消息后,了解消费者的处理状态是非常重要的。因为消息的投递并不代表消费者一定能够正确地消费这些消息,可能会出现各种故障:

  • 网络故障:在消息投递过程中,如果RabbitMQ和消费者之间的网络连接出现故障,可能会导致消息无法正确投递给消费者。
  • 消费者宕机:如果消费者在接收消息后突然宕机,那么消息可能无法被正确处理。
  • 消费者处理异常:消费者在接收到消息后,由于处理不当或者出现异常情况,可能会导致消息处理失败。

以上情况都可能导致消息丢失,因此RabbitMQ需要知道消费者的处理状态,以便在消息处理失败时重新投递消息。

一、消费者确认机制

RabbitMQ的消费者确认机制Consumer Acknowledgement)是一种确保消息被成功处理的机制。当消费者处理消息结束后,需要向RabbitMQ发送一个回执,以告知消息的处理状态。这个机制对于确保消息的可靠传递非常重要,因为它可以防止消息在消费者端处理失败而没有被正确处理的情况。

回执有三种可选值:

  • ACK(确认):表示消费者成功处理了消息,RabbitMQ会从队列中删除该消息。
  • NACK(否定确认):表示消息处理失败,RabbitMQ需要再次投递该消息。
  • REJECT(拒绝):表示消息处理失败并且被拒绝,RabbitMQ会从队列中删除该消息。

在实际应用中,一般使用ACK或NACK两种方式。REJECT方式的使用相对较少,通常只在消息格式存在问题,即存在开发错误的情况下使用。因此大多数情况下需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ACK,处理失败时返回NACK。

在consumer服务的application.yml文件中添加配置修改Spring AMQP的ACK处理方式 :

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto

RabbitMQ 支持三种不同的确认模式,这些模式通过acknowledge-mode属性进行配置:

  • none:关闭ACK。消费者接收到消息后不需要发送任何确认给发送者,发送者将继续发送下一条消息。在这种模式下,如果消费者处理消息失败,消息将会丢失,无法保证消息的可靠性。
  • manual:手动ACK。消费者接收到消息后需要手动发送确认给发送者,发送者才会继续发送下一条消息。在这种模式下,如果消费者处理消息失败,可以手动发送NACK给发送者,告诉发送者这条消息处理失败,以便发送者重新发送消息。这种模式可以保证消息的可靠性,但需要消费者手动处理确认和NACK。
  • auto:自动ACK。Spring AMQP提供了一种自动的消息确认机制。它利用AOP(面向切面编程)对消息处理逻辑做了环绕增强。当业务正常执行时,Spring AMQP会自动返回ACK。当业务出现异常时,根据异常判断返回不同结果:业务异常,自动返回NACK;消息处理或校验异常,自动返回REJECT。

二、失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。 如果消费者持续出现异常,消息会不断地在队列中重新排队并重新发送,这可能会导致消息处理延迟和队列持续增长,给系统带来不必要的压力。

为了解决这个问题,Spring框架提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到MQ队列 。

在consumer服务的application.yml文件中添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 1000ms
          multiplier: 1
          max-attempts: 3
          stateless: true
enabled: true开启消费者失败重试
initial-interval: 1000ms初始的等待时长
multiplier: 1每次重试的等待时长是上次等待时长的倍数
max-attempts: 3最大重试次数
stateless: truetrue表示重试是无状态的,即每次重试都是独立的,不会考虑之前的重试状态。如果业务中包含事务,需要改为false。

通过这样的配置,当消费者出现异常时,消息会在本地进行重试,而不是无限期地重新排队发送。在达到最大重试次数后,SpringAMQP会抛出AmqpRejectAndDontRequeueException异常,并将消息从队列中删除。这意味着最后一次处理消息的结果是失败的,并且消息不会被重新排队发送给消费者。

这种失败重试机制可以有效地减少消息处理的延迟和队列的增长,提高系统的稳定性和可用性。当然,在使用失败重试机制时,也需要考虑到业务逻辑和异常处理的合理性,避免因过度重试而导致的问题。

三、失败处理策略

在失败重试机制中,当达到最大重试次数后,消息会被直接丢弃。尽管这在某些场景中可能是可接受的,但对于那些对消息可靠性要求极高的业务来说,这显然是一个潜在的风险点。

Spring AMQP为此提供了强大的支持,允许开发人员自定义重试次数耗尽后的消息处理策略。这个策略是由MessageRecovery接口来定义的,它有三种不同的实现方式:

  • RejectAndDontRequeueRecoverer:当重试次数耗尽后,直接拒绝消息,并丢弃该消息。这是默认的处理方式。
  • ImmediateRequeueMessageRecoverer:当重试次数耗尽后,返回一个NACK给生产者,使消息重新入队,以便再次发送。
  • RepublishMessageRecoverer:当重试次数耗尽后,可以将失败的消息投递到一个指定的交换机和队列中,这个交换机和队列专门用来存放异常的消息。

在处理策略中,一种比较合适的方式是使用RepublishMessageRecoverer。当消息失败后,它会被投递到一个特定的、专门用于存放异常消息的队列中。这个队列可以由人工进行集中处理,使得开发人员可以更精细地处理和诊断问题。

在consumer服务中定义处理失败消息的交换机和队列:

    @Bean
    public DirectExchange errorExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }

定义一个RepublishMessageRecoverer:

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码如下:

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorConfiguration {

    @Bean
    public DirectExchange errorExchange() {
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

通过这样的配置,当消息在尝试多次重试后仍然失败时,它们会被自动投递到定义的异常消息队列中。这样就可以集中处理这些异常消息,进行进一步的诊断或处理。这种策略为开发人员在处理复杂分布式系统中的消息问题提供了一种更加专业和灵活的方式。

四、业务幂等性

在计算机科学和软件开发中,幂等性是一个重要的概念。简单来说,如果一个操作或函数不论执行一次还是多次,其结果都是相同的,那么称这个操作或函数是幂等的。在业务处理中,幂等性尤其关键。它可以保证系统的稳定性,确保在某些异常情况下,多次执行某个业务操作不会对业务状态产生不一致的结果。幂等性的重要性在于它能够避免因重复执行操作而产生的数据不一致、状态冲突等问题。在涉及金融交易、库存管理、用户认证等关键领域,幂等性是确保系统稳定和数据准确的重要前提。

1.通过唯一标识符保证操作的幂等性

为每个操作生成唯一的标识符(如ID),并在系统中跟踪这些标识符以检测重复操作。当接收到具有已知标识符的操作时,可以跳过重复的操作。Spring AMQP的MessageConverter自带了MessageID的功能,只要开启这个功能即可。

Jackson的消息转换器示例:

@Bean
public MessageConverter messageConverter(){
    // 定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 配置自动创建消息ID,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

在publisher服务中编写测试类,并利用RabbitTemplate实现消息发送:

    @Test
    void testSendMessage2Queue() {
        String queueName = "demo.queue";
        String msg = "Idempotent Test";
        rabbitTemplate.convertAndSend(queueName, msg);
    }

运行测试用例,查看结果:

2.通过业务判断保证操作的幂等性

业务判断,是一种基于业务逻辑和状态的检查,以确定是否对重复的请求或消息进行处理。在多种业务场景中,这一策略的思路各有不同。

比如在支付订单的案例中,业务逻辑主要是支付并将订单状态从“未支付”修改为“已支付”(需要防止重复支付)。因此,在执行这一业务时,可以判断订单的状态是否为“未支付”。若状态不是“未支付”,则说明该订单已经被处理过,无需重复处理。与基于唯一标识符的方案相比,业务判断方案无需对原有数据库进行改造,因此更为推荐。

以支付修改订单的业务为例:

    @Override
    public void markOrderPaySuccess(Long orderId) {
        // 查询订单
        Order order = getById(orderId);
        // 判断订单状态,订单不存在或者订单状态不是1,放弃处理
        if (order == null || order.getStatus() != 1) {
            return;
        }
        // 尝试更新订单
        order.setStatus(2);
        order.setPayTime(LocalDateTime.now());
        orderService.updateById(order);
    }

以上代码示例判断和更新是两步动作 ,极小概率下可能存在线程安全问题,所以可以进行以下修改:

    @Override
    public void markOrderPaySuccess(Long orderId) {
        // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
        orderService.lambdaUpdate()
                .set(Order::getStatus, 2)
                .set(Order::getPayTime, LocalDateTime.now())
                .eq(Order::getId, orderId)
                .eq(Order::getStatus, 1)
                .update();
    }
 

总结

RabbitMQ是一个开源的消息队列软件,旨在提供可靠的消息传递和消息队列功能。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容,希望对大家有所帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1338157.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【python与机器学习3】感知机和门电路:与门,或门,非门等

目录 1 电子和程序里的与门,非门,或门,与非门 ,或非门,异或门 1.1 基础电路 1.2 所有的电路情况 1.3 电路的符号 1.4 各种电路对应的实际电路图 2 各种具体的电路 2.1 与门(and gate) 2…

python降低图像的灰度分辨率——冈萨雷斯数字图像处理

原理: 降低图像的灰度分辨率是指减少图像中不同灰度级别的数量,从而使图像看起来更加粗糙或简化。这可以通过减少灰度级别的数量或重新映射灰度级别来实现。以下是一些常见的降低图像灰度分辨率的原理和方法: 灰度量化(Gray Lev…

《PCI Express体系结构导读》随记 —— 第I篇 第1章 PCI总线的基本知识(5)

接前一篇文章:《PCI Express体系结构导读》随记 —— 第I篇 第1章 PCI总线的基本知识(4) 1.1 PCI总线的组成 PCI总线作为处理器系统的本地总线,是处理器系统的一个组成部件。因此,讲述PCI总线的组成结构,不…

ADRC-跟踪微分器TD的Maltab实现及参数整定

目录 问题描述: 跟踪微分器TD基本概念: Matlab及其实现: 跟踪效果: 例1:跟踪信号 sin(t) 0.5*rand(1,1)。 例2:跟踪部分时段为方波的信号,具体形式见代码get_command。 参数整定&#xf…

【贪心算法】专题练习一

欢迎来到Cefler的博客😁 🕌博客主页:那个传说中的man的主页 🏠个人专栏:题目解析 🌎推荐文章:题目大解析(3) 前言 1.什么是贪心算法?——贪婪鼠目寸光 贪心策…

【Windows】共享文件夹拍照还原防火墙设置(入站,出站设置)---图文并茂详细讲解

目录 一 共享文件夹(两种形式) 1.1 普通共享与高级共享区别 1.2 使用 二 拍照还原 2.1 是什么 2.2 使用 三 防火墙设置(入栈,出站设置) 3.1 引入 3.2 入站出站设置 3.2.1入站出站含义 3.3入站设置 3.4安装jdk 3.5使用tomcat进行访…

【C#】Visual Studio 2022 远程调试配置教程

在某些特殊的情况下,开发机和调试机可能不是同一台设备,此时就需要远程调试了。 开发机配置 首先需要确保两台机器在同一局域网下。 创建共享文件夹 随便找个地方新建一个文件夹,用来放编译结果。例如我这里是 D:\DebuggingWorkspace\。 …

git集成github(一):主要步骤

一、创建仓库 1、创建本地git仓库 在pcharm主界面顶栏,点击VCS,再点击创建git仓库,然后选择项目根路径,点击确认。这时,可以看到顶栏的VCS变成了git。 2、远程仓库下载到本地 打开一个远程仓库,点击code…

“C语言“——scanf()、getchar() 、putchar()、之间的关系

scanf函数说明 scanf函数是对来自于标准输入流的输入数据作格式转换,并将转换结果保存至format后面的实参所指向的对象。 而const char*format 指向的字符串为格式控制字符串,它指定了可输入的字符串以及赋值时转换方法。 简单来说给一个打印格式(输入…

PYTHON数据处理:CSV和JSON

#CSV和JSON格式的数据在python上的处理 CSV和JSON数据类型都是都是常见的两种在python中的数据分析类型,这里我有两个入门项目详细讲解这两种数据的处理。 处理一个CSV形式的地方的天气的数据,然后创建一个表格; 分析JSON形式的地震数据&…

扩展mybatis-plus,保留逻辑删、逻辑查的前提下,扩展硬删除、硬查询

引入相关依赖 <!-- 提示&#xff1a;1. common-mybatis-plus:2100.8.8 中只有4个类文件&#xff0c;是对硬删除、硬查询的扩展支持&#xff0c;如果你不想引入依赖的话&#xff0c;你可以把这四个文件复制到自己的项目中即可2. common-mybatis-plus:2100.8.8 对应mybatis-p…

CEEMDAN +组合预测模型(CNN-LSTM + ARIMA)

目录 往期精彩内容&#xff1a; 前言 1 风速数据CEEMDAN分解与可视化 1.1 导入数据 1.2 CEEMDAN分解 2 数据集制作与预处理 3 基于CEEMADN的 CNN-LSTM 模型预测 3.1 定义CEEMDAN-CNN-LSTM预测模型 3.2 定义模型参数 3.3 模型训练&#xff0c;训练结果 4 基于ARIMA的…

数据结构与算法:基于比较的排序算法:选择、冒泡、插入、归并的动图演示和java代码,排序时间复杂度、空间复杂度、稳定性总结表格

选择排序 选择排序是先在0~N-1上选择一个最小值排到最前面&#xff0c;然后再在1到N-1上选一个次小的&#xff0c;以此类推。 public static selectionSort(int[] arr){if(arrnull||arr.length<2){return;} //每次从i n-1 选一个最小的放前面for(int i0;i<arr.length-…

深入Apache Commons Config:管理和使用配置文件

第1章&#xff1a;引言 咱们都知道&#xff0c;在软件开发中&#xff0c;管理配置文件是一件既重要又让人头疼的事。想象一下&#xff0c;咱们的应用程序有一堆设置需要调整&#xff0c;比如数据库的连接信息、应用的端口号&#xff0c;或者是一些功能的开关。如果这些信息硬编…

Spring高手之路-SpringBean的生命周期

目录 SpringBean的生命周期 整体介绍 详细介绍 1.实例化Bean 2.设置属性值 3.检查Aware 4.调用BeanPostProcessor的前置处理方法 5.调用InitializingBean的afterPropertiesSet方法 6.调用自定义init-method方法 7.调用BeanPostProcessor的后置处理方法 8.注册Destru…

【深入之Java进阶篇】fastjson的反序列化漏洞(详解总结)

✔️ fastjson的反序列化漏 1️⃣典型解析2️⃣拓展知识仓1️⃣AutoType2️⃣AutoType 有何错?3️⃣ 绕过checkAutotype&#xff0c;黑客与fastjson的博弈4️⃣autoType不开启也能被攻击?5️⃣利用异常进行攻击6️⃣AutoType 安全模式? 1️⃣典型解析 当我们使用fastjson进行…

ueditor富文本编辑器中图片上传地址配置以及抓取远程图片地址的配置

一&#xff1a;图片上传保存地址配置 打开文件ueditor.php,找到imagePathFormat进行修改即可 一&#xff1a;远程抓取图片配置 打开文件ueditor.config.js,找到catchRemoteImageEnable&#xff0c;取消注释即可

Unity之ShaderGraph如何实现瓶装水效果

前言 有一个场景在做效果时,有一个水瓶放到桌子上的设定,但是模型只做了个水瓶,里面是空的,所以我就想办法,如何做出来瓶中装睡的效果,最好是能跟随瓶子有液体流动的效果。 如下图所示: 水面实现 水面效果 液体颜色设置 因为液体有边缘颜色和内里面颜色,所以要分开…

strlen和sizeof的初步理解

大家好我是Beilef&#xff0c;一个美好的下我接触到编程并且逐渐喜欢。我虽然不是科班出身但是我会更加努力地去学&#xff0c;有啥不对的地方请斧正 文章目录 目录 文章目录 前言 想必大家对sizeof肯定很了解&#xff0c;那对strlen又了解多少。其实这个问题应该让不少人困扰。…

内网穿透的应用-Ubuntu安装XRDP远程桌面结合内网穿透实现远程桌面Ubuntu

文章目录 一、 同个局域网内远程桌面Ubuntu二、使用Windows远程桌面连接三、公网环境系统远程桌面Ubuntu1. 注册cpolar账号并安装2. 创建隧道&#xff0c;映射3389端口3. Windows远程桌面Ubuntu 四、 配置固定公网地址远程Ubuntu1. 保留固定TCP地址2. 配置固定的TCP地址3. 使用…