RabbitMQ-5.消费者的可靠性

news2025/1/18 22:13:38

消费者的可靠性

  • 5.消费者的可靠性
    • 5.1.消费者确认机制
    • 5.2.失败重试机制
    • 5.3.失败处理策略
    • 5.4.业务幂等性
      • 5.4.1.唯一消息ID
      • 5.4.2.业务判断
    • 5.5.兜底方案

5.消费者的可靠性

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

  • 消息投递的过程中出现了网络故障
  • 消费者接收到消息后突然宕机
  • 消费者接收到消息后,因处理不当导致异常

一旦发生上述情况,消息也会丢失。因此,RabbitMQ必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。
但问题来了:RabbitMQ如何得知消费者的处理状态呢?

5.1.消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或校验异常,自动返回reject;

返回Reject的常见异常有:

Starting with version 1.3.2, the default ErrorHandler is now a ConditionalRejectingErrorHandler that rejects (and does not requeue) messages that fail with an irrecoverable error. Specifically, it rejects messages that fail with the following errors:

  • o.s.amqp…MessageConversionException: Can be thrown when converting the incoming message payload using a MessageConverter.
  • o.s.messaging…MessageConversionException: Can be thrown by the conversion service if additional conversion is required when mapping to a @RabbitListener method.
  • o.s.messaging…MethodArgumentNotValidException: Can be thrown if validation (for example, @Valid) is used in the listener and the validation fails.
  • o.s.messaging…MethodArgumentTypeMismatchException: Can be thrown if the inbound message was converted to a type that is not correct for the target method. For example, the parameter is declared as Message but Message is received.
  • java.lang.NoSuchMethodException: Added in version 1.6.3.
  • java.lang.ClassCastException: Added in version 1.6.3.

通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 不做处理

修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理的异常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
    log.info("spring 消费者接收到消息:【" + msg + "】");
    if (true) {
        throw new MessageConversionException("故意的");
    }
    log.info("消息处理完成");
}

测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。

我们再次把确认机制修改为auto:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动ack

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):
image.png
放行以后,由于抛出的是消息转换异常,因此Spring会自动返回reject,所以消息依然会被删除:
image.png

我们将异常改为RuntimeException类型:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
    log.info("spring 消费者接收到消息:【" + msg + "】");
    if (true) {
        throw new RuntimeException("故意的");
    }
    log.info("消息处理完成");
}

在异常位置打断点,然后再次发送消息测试,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):
image.png放行以后,由于抛出的是业务异常,所以Spring返回ack,最终消息恢复至Ready状态,并且没有被RabbitMQ删除:
image.png
当我们把配置改为auto时,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。

5.2.失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力:
image.png

当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

5.3.失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

1)在consumer服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码如下:

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

5.4.业务幂等性

何为幂等性?
幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据id删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行。
然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递

我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。
举例:

  1. 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
  2. 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
  3. 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
  4. 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID
  • 业务状态判断

5.4.1.唯一消息ID

这个思路非常简单:

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

我们该如何给消息添加唯一ID呢?
其实很简单,SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。
以Jackson的消息转换器为例:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

5.4.2.业务判断

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。

以支付修改订单的业务为例,我们需要修改OrderServiceImpl中的markOrderPaySuccess方法:

@Override
public void markOrderPaySuccess(Long orderId) {
    // 1.查询订单
    Order old = getById(orderId);
    // 2.判断订单状态
    if (old == null || old.getStatus() != 1) {
        // 订单不存在或者订单状态不是1,放弃处理
        return;
    }
    // 3.尝试更新订单
    Order order = new Order();
    order.setId(orderId);
    order.setStatus(2);
    order.setPayTime(LocalDateTime.now());
    updateById(order);
}

上述代码逻辑上符合了幂等判断的需求,但是由于判断和更新是两步动作,因此在极小概率下可能存在线程安全问题。

我们可以合并上述操作为这样:

@Override
public void markOrderPaySuccess(Long orderId) {
    // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
    lambdaUpdate()
            .set(Order::getStatus, 2)
            .set(Order::getPayTime, LocalDateTime.now())
            .eq(Order::getId, orderId)
            .eq(Order::getStatus, 1)
            .update();
}

注意看,上述代码等同于这样的SQL语句:

UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1

我们在where条件中除了判断id以外,还加上了status必须为1的条件。如果条件不符(说明订单已支付),则SQL匹配不到数据,根本不会执行。

5.5.兜底方案

虽然我们利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息100%的可靠。万一真的MQ通知失败该怎么办呢?
有没有其它兜底方案,能够确保订单的支付状态一致呢?

其实思想很简单:既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。
流程如下:

图中黄色线圈起来的部分就是MQ通知失败后的兜底处理方案,由交易服务自己主动去查询支付状态。

不过需要注意的是,交易服务并不知道用户会在什么时候支付,如果查询的时机不正确(比如查询的时候用户正在支付中),可能查询到的支付状态也不正确。
那么问题来了,我们到底该在什么时间主动查询支付状态呢?

这个时间是无法确定的,因此,通常我们采取的措施就是利用定时任务定期查询,例如每隔20秒就查询一次,并判断支付状态。如果发现订单已经支付,则立刻更新订单状态为已支付即可。
定时任务大家之前学习过,具体的实现这里就不再赘述了。

至此,消息可靠性的问题已经解决了。

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1438859.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

jvm几个常见面试题整理

1. Full GC触发机制有如下5种情况。 (1)调用System.gc()时,系统建议执行Full GC,但是不必然执行。(2)老年代空间不足。(3)方法区空间不足。(4)老年代的最大可用连续空间小于历次晋升到老年代对象的平均大小就会进行Full GC。(5)由Eden区、S0(From)区向S…

前端vite+vue3——自动化配置路由布局

文章目录 ⭐前言💖vue3系列文章 ⭐ 自动化配置路由💖引入vite版本自定义目录映射💖自动化读取文件下的路由💖main入口加载路由💖入口app.vue配置💖layout基础布局配置💖效果 ⭐总结⭐结束 ⭐前言…

python实现中国剩余定理

中国剩余定理又称孙子定理,是数论中一个重要定理。最早可见于我国的数学著作《孙子算经》卷下“物不知数”问题,原文如下: 有物不知其数,三三数之剩二,五五数之剩三,七七数之剩二。问物几何?即…

车载网络测试 - 总线基础 - CAN总线负载计算

我想做过CAN总线测试的都有遇到过拉高总线负载相关的测试,这个时候我们一般都会通过增加报文的数量或者减小报文的周期来实现,但是CAN总线上的负载到底是如何计算的呢?我想很多人都会有这个疑问吧,那么今天我们一起来看下如何计算…

CX341A 安装驱动与刷固件

参考 驱动安装1 DPDK编译:支持Mellanox 25Gbps网卡 - 知乎 NVIDIA Mellanox CX网卡固件、驱动系列操作 - 知乎 驱动安装2 Mellanox网卡驱动安装指南 Mellanox OFED_崇尚匀速 追求极致的技术博客_51CTO博客 驱动与固件: 家用万兆网络指南 6 - 比…

当前的脑机交互更像是自动化交互,而不是智能化交互

脑机交互是指通过直接连接人类大脑与外部设备,实现人与计算机、机器或其他设备之间的交互。目前的脑机交互技术还存在许多挑战和限制,因此可以说脑机交互还远远不成熟。当前的脑机交互更像是自动化交互,而不是智能化交互。 目前的脑机交互技术…

ArcGIS学习(六)地理数据库

ArcGIS学习(六)地理数据库 上个任务我们讲了一个非常重要的知识点一一坐标系。这个任务我们带来另外一个很重要的知识点一一地理数据库。 地理数据库的内容相比于坐标系简单很多! 首先,先让我们来学习下地理数据库的理论。 ArcGIS 中的地理数据库(Geodatabase)是一个用…

Verilog刷题笔记22

题目: Build a priority encoder for 8-bit inputs. Given an 8-bit vector, the output should report the first (least significant) bit in the vector that is 1. Report zero if the input vector has no bits that are high. For example, the input 8’b100…

2019年江苏省职教高考计算机技能考试——一道程序改错题的分析

题目:函数将str字符串中的5个数字字符串转换为整数,并保存在二维数组m的最后一行,各元素为3、-4、16、18、6。并经函数move处理后,运行结果如下: 18 6 3 -4 16 16 18 6 3 -4 -4 16 …

Spark安装(Yarn模式)

一、解压 链接:https://pan.baidu.com/s/1O8u1SEuLOQv2Yietea_Uxg 提取码:mb4h tar -zxvf /opt/software/spark-3.0.3-bin-hadoop3.2.tgz -C /opt/module/spark-yarn mv spark-3.0.3-bin-hadoop3.2/ spark-yarn 二、配置环境变量 vim /etc/profile…

macbook电脑如何永久删除app软件?

在使用MacBook的过程中,我们经常会下载各种App来满足日常的工作和娱乐需求。然而,随着时间的积累,这些App不仅占据了宝贵的硬盘空间,还可能拖慢电脑的运行速度。那么,如何有效地管理和删除这些不再需要的App呢&#xf…

【51单片机】外部中断和定时器中断

目录 中断系统中断介绍中断概念 中断结构及相关寄存器中断结构中断相关寄存器 外部中断实验外部中断配置软件设计实验现象 定时器中断定时器介绍51 单片机定时器原理51 单片机定时/计数器结构51 单片机定时/计数器的工作方式 定时器配置硬件设计软件设计实验现象 中断系统 本章…

运维必会篇-日志(错误日志,二进制日志,查询日志,慢查询日志)

日志 错误日志 错误日志是 MySQL 中最重要的日志之一,它记录了当 mysqld 启动和停止时,以及服务器在运行过 程中发生任何严重错误时的相关信息。当数据库出现任何故障导致无法正常使用时,建议首先查看此日 志。 该日志是默认开启的&#x…

SpringBoot 事务管理Transactional 数据回滚 数据一致性

介绍 SpringBoot当中的事物他保证了一致性,要么全部一起成功(提交),要么一起失败,失败(回滚)后数据会回到当初的样子,是一组操作的集合。 事物类型 开启事物提交事物回滚事物 案…

计算机毕业设计 | SSM超市进销存管理系统(附源码)

1,绪论 1.1 开发背景 世界上第一个购物中心诞生于美国纽约,外国人迈克尔库伦开设了第一家合作商店,为了更好地吸引大量客流量,迈克尔库伦精心设计了低价策略,通过大量进货把商品价格压低,通过商店一次性集…

面试经典150题——两数之和 II - 输入有序数组

"The only limit to our realization of tomorrow will be our doubts of today." - Franklin D. Roosevelt 1. 题目描述 2. 题目分析与解析 2.1 思路一——暴力求解 暴力求解的思路就是通过两次for循环,外层循环遍历整个数组,内层循环遍…

蓝桥杯Web应用开发-CSS3 新特性【练习二:获得焦点验证】

页面上有一个姓名输入框和一个密码输入框&#xff0c;当聚焦输入框时&#xff0c;输入框的背景颜色会发生改变&#xff0c; 新建一个 index3.html 文件&#xff0c;在其中写入以下内容。 <!DOCTYPE html> <html lang"en"><head><meta charset&…

画出TCP三次握手和四次挥手的示意图,并且总结TCP和UDP的区别

三次握手 第一次握手&#xff1a;客户端发送SYN包&#xff08;SYN1, seq0&#xff09;给服务器&#xff0c;并进入SYN_SENT状态&#xff0c;等待服务器返回确认包。第二次握手&#xff1a;服务器接收到SYN包&#xff0c;确认客户端的SYN&#xff0c;发送ACK包&#xff08;ACK1 …

c语言--指针数组(详解)

目录 一、什么是指针数组&#xff1f;二、指针数组模拟二维数组 一、什么是指针数组&#xff1f; 指针数组是指针还是数组&#xff1f; 我们类比一下&#xff0c;整型数组&#xff0c;是存放整型的数组&#xff0c;字符数组是存放字符的数组。 那指针数组呢&#xff1f;是存放…

css绘制向左三角形_纯css 实现三角形

首先这个思路的讲解 就是用到了 border 边框这个属性 一个div 可以设置四边边框 我们先把其他三条边都去掉 只留下一个边框 其他 在设置底边框的宽度 再把内容区域设置为0 就可以了 下面是代码 <div></div>div {width:0;height:0;border-top:30px solid red ;bor…