RabbitMQ之消费者可靠性

news2024/11/29 2:31:27

文章目录

  • 前言
  • 一、消费者确认机制
  • 二、失败重试机制
  • 三、失败处理策略
  • 四、业务幂等性
    • 唯一消息ID
    • 业务判断
  • 五、兜底方案
  • 总结


前言

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

  • 消息投递的过程中出现了网络故障
  • 消费者接收到消息后突然宕机
  • 消费者接收到消息后,因处理不当导致异常

一旦发生上述情况,消息也会丢失。因此,RabbitMQ必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。
但问题来了:RabbitMQ如何得知消费者的处理状态呢?


一、消费者确认机制

  • 为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

    • ack:成功处理消息,RabbitMQ从队列中删除该消息
    • nack:消息处理失败,RabbitMQ需要再次投递消息
    • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
  • 一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack。

  • 由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

    • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
    • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
    • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
      • 如果是业务异常,会自动返回nack;
      • 如果是消息处理或校验异常,自动返回reject;

通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 不做处理

模拟一个消息处理的异常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
    log.info("spring 消费者接收到消息:【" + msg + "】");
    if (true) {
        throw new MessageConversionException("故意的");
    }
    log.info("消息处理完成");
}

测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。

我们再次把确认机制修改为auto:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动ack

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):

在这里插入图片描述

放行以后,由于抛出的是消息转换异常,因此Spring会自动返回reject,所以消息依然会被删除:

在这里插入图片描述
我们将异常改为RuntimeException类型(此时配置还是auto):

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
    log.info("spring 消费者接收到消息:【" + msg + "】");
    if (true) {
        throw new RuntimeException("故意的");
    }
    log.info("消息处理完成");
}

在异常位置打断点,然后再次发送消息测试,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):

在这里插入图片描述
异常,所以Spring返回ack,最终消息恢复至Ready状态,并且没有被RabbitMQ删除:

在这里插入图片描述
所以当我们把配置改为auto时,如果是业务异常,会自动返回nack,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。

二、失败重试机制

  • 当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
  • 极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力。
  • 当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次。
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject。

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试。
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃。

三、失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

定义处理失败消息的交换机和队列:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

四、业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据id删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行。
然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递

我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。
举例:

  • 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
  • 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
  • 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
  • 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID
  • 业务状态判断

唯一消息ID

这个思路非常简单:

  • 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  • 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  • 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

我们该如何给消息添加唯一ID呢?
其实很简单,SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。

以Jackson的消息转换器为例:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

业务判断

  • 业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
  • 例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。
  • 相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。

以支付修改订单的业务为例:

@Override
public void markOrderPaySuccess(Long orderId) {
    // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
    lambdaUpdate()
            .set(Order::getStatus, 2)
            .set(Order::getPayTime, LocalDateTime.now())
            .eq(Order::getId, orderId)
            .eq(Order::getStatus, 1)
            .update();
}

我们在where条件中除了判断id以外,还加上了status必须为1的条件。如果条件不符(说明订单已支付),则SQL匹配不到数据,根本不会执行。

五、兜底方案

  • 虽然我们利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息100%的可靠。万一真的MQ通知失败该怎么办呢?
  • 假设我们要做一个支付项目,有没有其它兜底方案,能够确保订单的支付状态一致呢?
  • 其实思想很简单:既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。
  • 通常我们采取的措施就是利用定时任务定期查询,例如每隔20秒就查询一次,并判断支付状态。如果发现订单已经支付,则立刻更新订单状态为已支付即可。

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

总结

以上就是实现消息可靠性的详细讲解。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1254049.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++17中std::byte的使用

C17中std::byte的定义如下:std::byte本质上代表一个字节的值 enum class byte : unsigned char {}; 与unsigned char一样,std::byte可以用来访问其它对象(对象表示)占用的原始内存(raw memory),但与unsigned char不同的是,它不是字符类型&am…

基于springboot实现应急救援物资管理系统项目【项目源码】

基于springboot实现应急救援物资管理系统演示 JAVA简介 JavaScript是一种网络脚本语言,广泛运用于web应用开发,可以用来添加网页的格式动态效果,该语言不用进行预编译就直接运行,可以直接嵌入HTML语言中,写成js语言&a…

【H5 Canvas】【平面几何】特殊图形绘制(箭头/正多边/正多尖角形等)

文章目录 直线/弧线 箭头 直线/弧线 箭头 // startX,startY 起始坐标 // endX,endY 结束坐标 // angel 圆弧角度,取值[0,PI]; 0表示画直线箭头,否则画圆弧箭头 CanvasRenderingContext2D.prototype.drawArrow function(startX,startY,endX,endY,angel)…

Pinctrl子系统和GPIO子系统

Pinctrl子系统: 借助Princtr子系统来设置一个Pin的复用和电气属性; pinctrl子系统主要做的工作是:1. 获取设备树中的PIN信息;2.根据获取到的pin信息来设置的Pin的复用功能;3.根据获取到的pin信息去设置pin的电气特性…

C++ 通过SQLite实现命令行工具

本文介绍了一个基于 C、SQLite 和 Boost 库的简单交互式数据库操作 Shell。该 Shell 允许用户通过命令行输入执行各种数据库操作,包括添加、删除主机信息,设置主机到特定主机组,以及显示主机和主机组列表。通过调用 SQLite3 库实现数据库连接…

零基础学Linux内核:1、Linux源码组织架构

文章目录 前言一、Linux内核的特征二、Linux操作系统结构1.Linux在系统中的位置2.Linux内核的主要子系统3、Linux系统主要数据结构 三、linux内核源码组织1、下载Linux源码2、Linux版本号3、linux源码架构目录讲解 前言 这里将是我们从零开始学习Linux的第一节,这节…

python 基于opencv和face_recognition的人脸识别

python 基于opencv和face_recognition的人脸识别 代码如下: 使用一个photos存放你需要识别的照片,注意一个人一张就行 然后通过下面代码注册用户,之后启动程序,就会调用摄像头进行识别了。 AddPhoto(“发哥”, “./photos/fag…

京东APP在哪里找到如何申请价格保护查看购买商品价格保护情况的记录信息?

京东价格保护是一项优质售后服务,用户在京东购买商品后,如果该商品在保护期内降价,用户可以申请价格保护,京东将补差价或返还京豆。这项服务旨在保障用户权益,让用户在购买商品时更加安心。用户在购买商品后&#xff0…

python-opencv 人脸68点特征点检测

python-opencv 人脸68点特征点检测 不是很难,主要还是掉包,来看一下代码啊: # coding: utf-8 # 导包 import numpy as np import dlib import cv2class face_emotion(object):def __init__(self):# 人脸检测器对象,通过它拿到人…

2024年天津天狮学院专升本食品质量与安全专业《分析化学》考纲

2024年天津天狮学院食品质量与安全专业高职升本入学考试《分析化学》考试大纲 一、考试性质 《分析化学》专业课程考试是天津天狮学院食品质量与安全专业高职升本入学考试 的必考科目之一,其性质是考核学生是否达到了升入本科继续学习的要求而进行的选拔性考试。《…

JSP 条件动作标签之if标签详解

好 上文 JSP JSTL引入依赖并演示基础使用我们导入了 JSTL的JAR 并 演示了 IF标签的基础使用 本文 我们来说说 平时开发的常用标签 这里 我们需要先强调一下 常用标签 操作的全部都是域对象 首先 我们来看 条件动作标签 条件动作标签的特点是 依赖于某些域对象值 控制页面输出…

【机器学习 | 聚类】关于聚类最全评价方法大全,确定不收藏?

🤵‍♂️ 个人主页: AI_magician 📡主页地址: 作者简介:CSDN内容合伙人,全栈领域优质创作者。 👨‍💻景愿:旨在于能和更多的热爱计算机的伙伴一起成长!!&…

队列实现栈VS栈实现队列

目录 【1】用队列实现栈 思路分析 ​ 易错总结 Queue.c&Queue.h手撕队列 声明栈MyStack 创建&初始化栈myStackCreate 压栈myStackPush 出栈&返回栈顶元素myStackPop 返回栈顶元素myStackTop 判断栈空否myStackEmpty 释放空间myStackFree MyStack总代码…

【LeetCode:828. 统计子串中的唯一字符 | 贡献法 乘法原理】

🚀 算法题 🚀 🌲 算法刷题专栏 | 面试必备算法 | 面试高频算法 🍀 🌲 越难的东西,越要努力坚持,因为它具有很高的价值,算法就是这样✨ 🌲 作者简介:硕风和炜,…

YOLOv5小目标检测层

目录 一、原理 二、yaml配置文件 一、原理 小目标检测层,就是增加一个检测头,增加一层锚框,用来检测输入图像中像素较小的目标 二、yaml配置文件 # YOLOv5 🚀 by Ultralytics, GPL-3.0 license# Parameters nc: 3 # number of classes depth_multiple: 0.33 # model…

Matplotlib网格子图_Python数据分析与可视化

Matplotlib网格子图 plt.subplot()绘制子图调整子图之间的间隔plt.subplots创建网格 plt.subplot()绘制子图 若干彼此对齐的行列子图是常见的可视化任务,matplotlib拥有一些可以轻松创建它们的简便方法。最底层且最常用的方法是plt.subplot()。 这个函数在一个网格…

RocketMq 队列(MessageQueue)

RocketMq是阿里出品(基于MetaQ)的开源中间件,已捐赠给Apache基金会并成为Apache的顶级项目。基于java语言实现,十万级数据吞吐量,ms级处理速度,分布式架构,功能强大,扩展性强。 官方…

C++二分查找:统计点对的数目

本题其它解法 C双指针算法:统计点对的数目 本周推荐阅读 C二分算法:得到子序列的最少操作次数 本文涉及的基础知识点 二分查找算法合集 题目 给你一个无向图,无向图由整数 n ,表示图中节点的数目,和 edges 组成…

赢麻了!义乌一个村有5000个网红,有人年收租就300万!

#义乌一村电商年成交额超300亿# ,在中国,电商行业的发展可谓是日新月异,而位于浙江省义乌市的江北下朱村,正是这股潮流的一个典型代表。这个村子,处处弥漫着“直播”的气息,仿佛每个人都在为这个新兴行业助力。 江北下…

openEuler Linux 部署 FineBi

openEuler Linux 部署 FineBi 部署环境 环境版本openEuler Linux22.03MySQL8.0.35JDK1.8FineBi6.0 环境准备 升级系统内核和软件 yum -y updatereboot安装常用工具软件 yum -y install vim tar net-tools 安装MySQL8 将 MySQL Yum 存储库添加到系统的存储库列表中 sudo…