RabbitMQ消费者的可靠性

news2024/9/22 13:41:28

目录

一、消费者确认

二、失败重试机制

2.1、失败处理策略

三、业务幂等性

3.1、唯一消息ID

 3.2、业务判断

3.3、兜底方案


一、消费者确认

RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或校验异常,自动返回reject;

 通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring:
  rabbitmq:
    host: 192.168.200.129 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: admin # 用户名
    password: 123 # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
        acknowledge-mode: none

当使用none模式时

生产者发送一条消息

 消费者接受消息时抛异常

 

测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。

把确认机制修改为auto

spring:
  rabbitmq:
    host: 192.168.200.129 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: admin # 用户名
    password: 123 # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
        acknowledge-mode: auto

 再次发送消息

 

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):

当我们把配置改为auto时,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。

二、失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。 极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力。

为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容

spring:
  rabbitmq:
    host: 192.168.200.129 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: admin # 用户名
    password: 123 # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
        acknowledge-mode: auto #消息确认
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃
2.1、失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了

因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

在消费者里创建一个异常消息配置类

@Configuration
@Slf4j
//当配置文件中spring.rabbitmq.listener.simple.retry.enabled 属性为ture时配置类才生效
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
    //消息处理失败交换机
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    //消息处理失败队列
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    //绑定关系
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    //定义一个RepublishMessageRecoverer,关联队列和交换机
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        log.error("加载RepublishMessageRecoverer");
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

 当接收消息出现异常时,会创建error.queue队列

 可以查看到异常信息

三、业务幂等性

何为幂等性? 幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。 在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据id删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行。 然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递

我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。 举例:

  1. 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
  2. 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
  3. 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
  4. 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID
  • 业务状态判断
3.1、唯一消息ID
  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。 以Jackson的消息转换器为例 

在生产者和消费者的启动类里加一个配置

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

 测试生产者发送消息会产生一个id

 3.2、业务判断

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。 例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。

以支付修改订单的业务为例,我们需要修改OrderServiceImpl中的markOrderPaySuccess方法:

 @Override
    public void markOrderPaySuccess(Long orderId) {
        // 1.查询订单
        Order old = getById(orderId);
        // 2.判断订单状态
        if (old == null || old.getStatus() != 1) {
            // 订单不存在或者订单状态不是1,放弃处理
            return;
        }
        // 3.尝试更新订单
        Order order = new Order();
        order.setId(orderId);
        order.setStatus(2);
        order.setPayTime(LocalDateTime.now());
        updateById(order);
    }

上述代码逻辑上符合了幂等判断的需求,但是由于判断和更新是两步动作,因此在极小概率下可能存在线程安全问题。

@Override
public void markOrderPaySuccess(Long orderId) {
    // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
    lambdaUpdate()
            .set(Order::getStatus, 2)
            .set(Order::getPayTime, LocalDateTime.now())
            .eq(Order::getId, orderId)
            .eq(Order::getStatus, 1)
            .update();
}

注意看,上述代码等同于这样的SQL语句:

UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1

我们在where条件中除了判断id以外,还加上了status必须为1的条件。如果条件不符(说明订单已支付),则SQL匹配不到数据,根本不会执行。

3.3、兜底方案

我们可以在交易服务设置定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。
 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1144153.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何学好机器学习?机器学习一定需要下面这几方面知识!

学好机器学习需要有一定的数学基础和计算机编程基础。总结一下,学机器学习需要下面这几方面知识: 数学基础:机器学习中很多方面都设计到了数学知识,较好的数学知识可以在理解和应用机器学习中发挥积极的作用,一般包括…

翻页电子杂志制作方法,看完有手就行

你还在使用传统的纸质杂志进行宣传推广嘛?不打算紧跟时代的发展,融入互联网制作在线翻页电子杂志嘛!电子杂志具有成本低,易于传播,宣传速度快,推广范围大,而且制作电子杂志也很easy噢&#xff0…

支持Lrc2024 Boris FX Optics最新 for mac

Boris FX Optics是一款强大的图像处理软件,提供了各种专业级的特效和滤镜,用于增强和改善视频和图像的外观。该软件适用于照片编辑、摄影、视频制作等领域,具有广泛的应用范围。 Boris FX Optics功能包括但不限于以下几个方面: …

iOS iGameGuardian修改器检测方案

一直以来,iOS 系统的安全性、稳定性都是其与安卓竞争的主力卖点。这要归功于 iOS 系统独特的闭源生态,应用软件上架会经过严格审核与测试。所以,iOS端的作弊手段,总是在尝试绕过 App Store 的审查。 常见的 iOS 游戏作弊&#xf…

删除A文件夹中 AB文件夹共有的文件

import ospath r"Z:\Users\86152\Desktop\chui shen-sw" chui_shen os.listdir(path)# print(chui_shen) # print(len(chui_shen)) for i in chui_shen:os.remove(rZ:\Users\86152\Desktop\ce shen-sw/ i)A文件夹: B文件夹: 我这个情况是 A文…

十九、类型信息(2)

本章概要 Class 对象 类字面常量泛化的 Class 引用cast() 方法 Class 对象 要理解 RTTI 在 Java 中的工作原理,首先必须知道类型信息在运行时是如何表示的。这项工作是由称为 **Class**对象 的特殊对象完成的,它包含了与类有关的信息。实际上&#x…

STM32串口通信

数据通信的基础概念 在单片机的应用中,数据通信是必不可少的一部分,比如:单片机和上位机、单片机和外 围器件之间,它们都有数据通信的需求。由于设备之间的电气特性、传输速率、可靠性要求各 不相同,于是就有了各种通信…

基于布谷鸟算法的无人机航迹规划-附代码

基于布谷鸟算法的无人机航迹规划 文章目录 基于布谷鸟算法的无人机航迹规划1.布谷鸟搜索算法2.无人机飞行环境建模3.无人机航迹规划建模4.实验结果4.1地图创建4.2 航迹规划 5.参考文献6.Matlab代码 摘要:本文主要介绍利用布谷鸟算法来优化无人机航迹规划。 1.布谷鸟…

[Python进阶] 消息框、弹窗:pymsgbox.alert

6.18 消息框、弹窗:pymsgbox.alert 作用: 显示带有文本和单个OK按钮的简单消息框。返回所单击按钮的文本。 参数: text “”, 消息框标题 title “”, 消息框内容 button pymsgbox.OK_TEXT, 消息框自带的按钮,默认为&#xff…

@TableField(fill = FieldFill.INSERT)这个注解的作用

TableField 是 MyBatis-Plus提供的一个注解,用于标注实体类的属性与数据库表的字段之间的映射关系。当你在一个实体类的属性上使用 TableField(fill FieldFill.INSERT) 注解时,你告诉 MyBatis-Plus 在插入记录时自动填充这个字段。 FieldFill.INSERT 是一…

Docker 网络管理及资源控制

目录 1 Docker 网络 1.1 Docker 网络实现原理 1.2 Docker 的网络模式 1.3 网络模式详解 1.3.1 host模式 1.3.2 container模式 1.3.3 none模式 1.3.4 bridge模式 1.3.5 自定义网络 1.4 创建自定义网络 2 资源控制 2.1 CPU 资源控制 2.2 对内存使用的限制 2.3 对磁盘…

如何在【逻辑回归】中优化控制正则化程度的超参数C

一.逻辑回归基本介绍 逻辑回归也称作logistic回归,是一种广义的线性回归分析模型,主要是用来解决二分类问题(也可以解决多分类问题)。通过训练集来训练模型,并在训练结束后对测试集进行分类。 通过激活函数&…

我该如何入门Python机器学习?

我是在研一的时候开始学习机器学习的。相对于题主来说,我更是一窍不通,Python都没有一点点基础。总结一下我在学习Python的过程,以及自己在学习机器学习过程中的用到的优质资源,也总结一下我的学习心得。 一、怎么学习Python&…

开放式耳机百元价位怎么选、公认最好的百元开放式耳机

开放式耳机采用挂耳式的佩戴方式,不需封闭耳道,这一创新设计允许我们欣赏音乐的同时保持对周围环境的感知,从而在户外运动、通勤或其他活动中提供更安全的体验。而且,在预算有限的情况下,我们可以在百元价位范围内找到…

JS中Map对象与object的区别

若想了解Map对象可以阅读本人这篇ES6初步了解Map Map对象与object有什么区别?让我为大家介绍一下吧! 共同点 二者都是以key-value的形式对数据进行存储 const obj {name:"zs",age:18}console.log(obj)let m new Map()m.set("name&quo…

shell算数运算指令、shell的if分支结构使用场景及相关代码

1.shell算数运算的指令 (( )) $[ ] let expr expr的字符串运算 例子: 2.shell的if分支结构 例子:

【C++】C++入门(上)--命名空间 输入输出 缺省参数 函数重载

目录 一 命名空间 1 命名空间的定义 2 命名空间的使用 二 C输入和输出 1 输出 2 输入 三 缺省参数 1 缺省参数概念 2 缺省参数分类 (1) 全缺省参数 (2)半缺省参数 四 函数重载 1 函数重载概念 2 分类 1 参数类型不同 2 参数个数不同 3 参数类型顺序不同 3 C为什…

Python中json的用法

python 中 json的用法 一、JSON 的介绍二、json和python的转换1) python 的字典或列表转换为json2) json转换为python的字典或列表 一、JSON 的介绍 Json本质上一个带有特定格式的字符串,json是一种在各个编程语言中流通的数据格式,负责不同…

Android底层摸索改BUG(二):Android系统移除预置APP

首先我先提供以下博主博文,对相关知识点可以提供理解、解决、思考的 Android 系统如何预装第三方应用以及常见问题汇集android Android.mk属性说明及预置系统app操作说明系Android 中去除系统原生apk的方法 取消预置APK方法一: 其实就是上面的链接3&a…

基于springboot实现休闲娱乐代理售票平台系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现休闲娱乐代理售票平台系统演示 摘要 网络的广泛应用给生活带来了十分的便利。所以把休闲娱乐代理售票管理与现在网络相结合,利用java技术建设休闲娱乐代理售票系统,实现休闲娱乐代理售票的信息化。则对于进一步提高休闲娱乐代理售票管…