RabbitMQ——消息的可靠性处理

news2025/1/24 8:48:11

1.业务分析

        在业务的开发中,我们通常将业务的非核心业务交给MQ来处理,比如支付,在支付过后,我们需要扣减余额,修改支付单状态,修改订单状态,发送短信提醒用户,给用户增加积分等等(可能真是场景并非这么简单,这里举个例子),在这套业务中,修改订单状态,发送短信提醒用户,给用户增加积分这三个业务在特定场景下并非是核心业务,所以把他放在MQ消息队列中进行处理,在非核心业务执行的时候,可能出现多个问题,导致数据不一致,比如用户买了东西,发现自己的钱已经扣了,但是页面显示的还是未支付状态,这种情况就非常严重了,造成这个现象的原因可能有多种,比如,网络丢包,发布消息者挂了,MQ挂了,消费者挂了等等,可能有硬件层面,也可能有软件层面,甚至是网络层面,对于这一系列问题,我们应该尽量的保证消息的可靠性,让数据一致性得到保证,接下来,从三个方面进行分析,以及提出解决方案,不过具体还是得看业务需求,是否需要数据的强一致性。

2.发送者的可靠性

        2-1:发送者重连

        消息的发布有三个角色,发布者,MQ,消费者,在发消息的时候需要和MQ进行连接,这个连接时一个网络连接,如果因为网络问题,消息发送失败,可能会导致数据不一致产生。

解决方案:在发布者的application.yaml中配置

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier,如果是2,第一此初始化1秒,2秒,4秒,以此类推
        max-attempts: 3 # 最大重试次数

        注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

        2-2:发送者确认机制

SpringAMQP提供了Publisher Confirm和Publisher Return两种确认机制。开启确机制认后,当发送者发送消息给MQ后,MQ会返回确认结果给发送者,发送者再证实这个返回结果。返回的结果有以下几种情况:

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

92c4e225b2cc46f1bddcb507065bc3c8.png

返回ACK不需要重发,NACK需要重发

在publisher模块的application.yaml中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

这里 publisher-confirm-type 有三种模式可选:

  • none : 关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

一般推荐使用 correlated,回调机制。

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

@Configuration
@RequiredArgsConstructor
@Slf4j
public class MqConfig {

    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.error("触发returnCallback");
                log.debug("exchange:{}",returnedMessage.getExchange());
                log.debug("message:{}",returnedMessage.getMessage());
                log.debug("routingKey:{}",returnedMessage.getRoutingKey());
                log.debug("replyCode:{}",returnedMessage.getReplyCode());
                log.debug("replyText:{}",returnedMessage.getReplyText());
            }
        });
    }
}

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "simple2.queue"),
            exchange = @Exchange(name = "simple2.direct"),
            key = "simple2"
    ))
    public void testPublisherConfirmRule() {
        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                // Future发生异常时的处理逻辑,基本不会触发
                log.error("Future发生异常时的处理逻辑", ex);
            }

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                if (result.isAck()){
                    log.debug("发送消息成功,收到ACK!");
                }
                else {
                    log.error("发送消息失败,收到NACK!");
                    //TODO: 重发消息

                }
            }
        });
        //交换机名称
        String exchange = "simple2.direct";
        //消息
        String message = "hello mq";
        //发送消息
        rabbitTemplate.convertAndSend(exchange, "simple2", message, cd);
    }

3.MQ的可靠性

        在默认情况下,RabbitMQ会将接受到的消息保存在内存中以降低消息收发的延迟。这样会导致两个问题:

  • 一旦MQ宕机,内存中的消息会丢失
  • 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞

方案一:数据持久化

数据持久化包括三个方面:交换机持久化,队列持久化,消息持久化

SpringAMQP默认生成的交换机和队列以及发消息都是持久化的

方案二:lazy queue(既能保证并发能力,又不用写内存)

        从RabbitMQ的3.6.0版本开始,就增加了LazyQueue的概念,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘,不再存储到内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)在3.12版本后,所有队列都是LazyQueue模式,无法更改。

那如何把queue变成lazy queue,可以基于声明bean的形式,也可以通过注解的方式

@Bean
public Queue lazyQueue(){
    return QueueBuilder
            .durable("lazy.queue")
            .lazy() // 开启Lazy模式
            .build();
}
@RabbitListener(queuesToDeclare = @Queue(
        name = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy") //这两个是固定的
))
public void listenLazyQueue(String msg){
    log.info("接收到 lazy.queue的消息:{}", msg);
}

开启持久化和生产者确认时,RabbitMQ只有在消息持久化完成后才会给生产者返回ACK回执

4.消费者的可靠性

        4-1:消费者确认机制

        消费者确认机制时为了确认消费者是否成功处理消息。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息的处理状态。状态有三种:

  • ack:成功的处理了消息,RabbitMQ队列中删除这个消息
  • nack:消息处理失败,RabbitMQ就会再次给消费者投递消息,持续投递
  • reject:消息处理失败,RabbitMQ队列中删除这个消息(一般是消息内容有问题,所以拒绝)

0bdc481054844c36b234f5020190e542.png

        SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.当业务出现异常时,根据异常判断返回不同结果:
  1. 如果是业务异常,会自动返回nack
  2. 如果是消息处理或校验异常,自动返回reject

需要配置在消费的application.yaml中

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动ack

4-2:失败重试机制

        SpringAMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限的requeue到mq。通过在消费者的application.yaml文件中添加配置来开启重试机制:

spring:
  rabbitmq:
    listener:  # 这里注意区别,发布者有个失败重连机制和这个配置很像
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

        在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

第三种示例图

1d0061a9a9c04b3dae1fbf6915904e7c.png

1)在consumer服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码:

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); //第二个参数是routingKey
    }
}

4-3:幂等性业务

        在程序开发中,是指同一个业务,执行一次或多次对业务状态的运行是一致的。

 

        如果网络问题出现故障,有可能出现把一个业务做了多次,比如扣减库存,总不能扣减两次吧,于是可以采取对每一个消息指定一个消息id,id值唯一,然后执行完这个消息后,把这个消息id存到数据库里,然后每次执行消息的时候,可以去数据库查一下有没有这个消息,如果有,就代表这个消息之前执行过了,于是就不对这个消息做处理。

        在做注册消息转换器为bean的时候,可以设置消息的id,然后我们接受消息的时候用Message来接就可以

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

        用Message接,然后message.getMessageProperties().getMessageId() 获取id,那这个id做业务判断就可以了。

以上是对于非幂等业务的一种方案,但明显这种方案不太好。影响mq性能。

另一种就是基于具体业务逻辑来进行判断来实现业务的幂等,比如我在一个业务执行前,先判断这个业务的一个状态,如果状态以及修改过了,我直接不做处理就行了,如果没有,我在进行修改。

5.延迟消息

        5-1:死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false

  • 消息是一个过期消息,超时无人消费

  • 要投递的队列消息满了,无法投递

        如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

5ec9b7a546924919b0d7ac7fca942f88.png

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息

  2. 收集那些因队列满了而被拒绝的消息

  3. 收集因TTL(有效期)到期的消息

声明正常队列和正常交换机和的时候使用bean方式,因为正常队列要指定死信交换机,

 @Bean
 public Queue normalQueue(){
     return QueueBuilder
             .durable("normal.queue")
             .deadLetterExchange("dlx.direct")
             .build();
 }

发送者示例代码:给正常队列设置一个TTL,如果时间到了,把消息给死信交换机,模拟延迟消息

rabbitTemplate.convertAndSend("normal.direct", "hi", "hello", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("1000000"); 
                return message;
            }
        });

5-2:延迟消息插件

先去社区下载 https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

去查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-plugins

然后进入插件那么目录

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

这样插件就装好了

然后声明一个延迟交换机,基于注解方式

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于bean方式

@Bean
public DirectExchange delayExchange(){
    return ExchangeBuilder
            .directExchange("delay.direct") // 指定交换机类型和名称
            .delayed() // 设置delay的属性为true
            .durable(true) // 持久化
            .build();
}

发送延迟消息:

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

注意:

延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。

因此,不建议设置延迟时间过长的延迟消息

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2174876.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

技术分享|一文读懂三维建模技术

在上一期推文中&#xff0c;我们简要介绍了国产3A级大作游戏《黑神话&#xff1a;悟空》中应用的实时渲染技术&#xff0c;同时&#xff0c;还展示了RflySim工具链中基于Unreal Engine虚幻引擎开发的三维可视化显示软件—RflySim3D/UE5。它利用高逼真度的仿真技术&#xff0c;结…

商家营销工具架构升级总结

今年以来&#xff0c;商家营销工具业务需求井喷&#xff0c;需求数量多且耗时都比较长&#xff0c;技术侧面临很大的压力。因此这篇文章主要讨论营销工具前端要如何应对这样大规模的业务需求。 问题拆解 我们核心面对的问题主要如下&#xff1a; 1. 人力有限 我们除了要支撑存量…

全国省、市、县(区)土地利用类型及面积面板数据(2019-2022年)

土地利用类型是根据土地利用方式和地域差异对土地资源单元进行划分的基本土地地域单元。 2019年-2022年全国省、市、县&#xff08;区&#xff09;土地利用类型及面积面板数据_土地利用类型数据下载资源-CSDN文库https://download.csdn.net/download/2401_84585615/89466102 …

2024版最新Wireshark安装使用教程(非常详细)零基础入门到精通,收藏这一篇就够了_wireshark 4.4.0安装要求

前言 这是大白给粉丝盆友们整理的网络安全渗透测试入门阶段渗透测试工具第9篇。 喜欢的朋友们&#xff0c;记得给大白点赞支持和收藏一下&#xff0c;关注我&#xff0c;学习黑客技术 Wireshark 什么是WireShark&#xff1f;Wireshark 是一个开源抓包工具或者叫网络嗅探器&a…

FPGA-Vivado-IP核-逻辑分析仪(ILA)

ILA IP核 背景介绍 在用FPGA做工程项目时&#xff0c;当Verilog代码写好&#xff0c;我们需要对代码里面的一些关键信号进行上板验证查看。首先&#xff0c;我们可以把需要查看的这些关键信号引出来&#xff0c;接好线通过示波器进行实时监测&#xff0c;但这会用到大量的线材…

ViTamin——视觉-语言时代的可扩展视觉模型设计

人工智能咨询培训老师叶梓 转载标明出处 尽管视觉-语言模型&#xff08;VLMs&#xff09;已经取得了显著的成就&#xff0c;但在图像编码器的选择上&#xff0c;传统的视觉Transformer&#xff08;ViT&#xff09;依然是主流。尽管Transformer在文本编码领域已经证明了其有效性…

无线感知会议系列【5】 无线感知边界-1

前言&#xff1a; 无线感知边界是整个ISAC 里面一个研究的难点和重点。 本篇主要来源于2022 《WiFi感知边界研究-Ubicomp2022论文分享》 感知的相关论文组会 2016年无线感知研究主要是国内高校主导,各种无线感知论坛 2021年无线感知 VIVO,OPPO &#xff0c;华为&#xff0c;国…

LeetCode讲解篇之33. 搜索旋转排序数组

文章目录 题目描述题解思路题解代码 题目描述 题目链接 题解思路 旋转后的数组具备一个特性&#xff0c;如果把数组分割成两部分&#xff0c;必定至少有一部分是递增的&#xff0c;并且其中递增区间可以通过左端点小于右端点这个特征来确定 我们基于这个特性&#xff0c;进…

通信工程学习:什么是MIMO多输入多输出技术

MIMO:多输入多输出技术 MIMO(Multiple-Input Multiple-Output)多输入多输出技术是一种在无线通信中广泛应用的技术,它通过利用多个天线进行数据传输和接收,可以显著提高无线通信系统的性能和容量。以下是对MIMO技术的详细解释: 一、定义与原理 MIMO技术…

XWF使用指南

简介 X-Ways Forensics 是由 Stefan Fleischmann 编写的一个轻量化的应急响应及取证工具&#xff0c;是 WinHex 的法证版本&#xff0c;因此界面逻辑和 WinHex 较为相似。在配置好 mplayer 的情况下&#xff0c;程序总体积在 100MiB 左右&#xff0c;运行时内存占用极低&#…

【数据修复指南】手把手教你使用线性插值填补各类遥感数据缺失——Modis、Landsat和Sentinel

线性插值 1. 写在前面2. MODIS数据插值3. Landsat数据插值3.1 参数修改以适应其他类型的遥感数据3.2 Landsat数据汇总3.3 Sentinel卫星介绍 1. 写在前面 之前我写了使用年内均值或者中值来填补数据控制的方法&#xff0c;这种方法较为简单&#xff0c;不够精确。因此&#xff0…

面向人工智能: 对红酒数据集进行分析 (实验四)

由于直接提供截图是不切实际的&#xff0c;我将详细解释如何使用scikit-learn&#xff08;通常称为sk-learn&#xff09;自带的红酒数据集进行葡萄酒数据的分析与处理。这包括实验要求的分析、数据的初步分析&#xff08;完整性和重复性&#xff09;以及特征之间的关联关系分析…

SAP EWM QM 集成

目录 1 简介 2 业务流程 3 后台配置 4 主数据 5 业务操作 5.1 创建 EWM 交货单 5.2 不同的质检结果导致不同的入库地点 - 质检通过 5.3 不同的质检结果导致不同的入库地点 - 质检失败 1 简介 EWM 与 QM (quality management) 集成,自动 or 手动执行质检流程。质检可以…

现代cpp多线程与并发初探

个人博客:Sekyoro的博客小屋 个人网站:Proanimer的个人网站 在现代c(c20)中,有了jthread和协程的概念,使得我们编写并发程序更加方便. 这里作简单学习. 前言知识 多线程编程 std::thread 用于创建一个执行的线程实例,所以它是一切并发编程的基础,使用时需要包含 <thread…

XSS(内含DVWA)

目录 一.XSS的攻击方式&#xff1a; 1. 反射型 XSS&#xff08;Reflected XSS&#xff09; 2. 存储型 XSS&#xff08;Stored XSS&#xff09; 3. DOM型 XSS&#xff08;DOM-based XSS&#xff09; 总结 二..XSS的危害 三.常见的XSS方式 1.script标签 四.常见基本过滤方…

假期旅行数仓项目--OLAP

需要这个完整离线数仓项目的源码和流程PPT可以私信我&#xff0c;可以帮助解决项目中遇到的问题&#xff0c;做完项目可以让你对数仓有更加清晰的认识 项目流程&#xff1a; 配置文件 kafka server.properties hive : hvie-site.xml 启动mysql 的binlog日志 修改maxwell配置…

QT:常用类与组件

1.设计QQ的界面 widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QPushButton> #include <QLineEdit> #include <QLabel>//自定义类Widget,采用public方式继承QWidget&#xff0c;该类封装了图形化界面的相关操作&#xff…

怎么绕开华为纯净模式安装软件

我是标题 众所周不知&#xff0c;华为鸿蒙系统自带纯净模式&#xff0c;而且 没法关闭 : ) 我反正没找到关闭键 以前或许会有提示&#xff0c;无视风险&#xff0c;“仍要安装”。但我这次遇到的问题是&#xff0c;根本没有这个选项&#xff0c;只有“应用市场”和“取消”&…

动态规划笔记

第一轮面试准备到第26题 一 解题步骤 对于动态规划问题&#xff0c;我将拆解为如下五步曲&#xff0c;这五步都搞清楚了&#xff0c;才能说把动态规划真的掌握了&#xff01; 确定dp数组&#xff08;dp table&#xff09;以及下标的含义确定递推公式dp数组如何初始化确定遍历…

基于yolov8的海上红外目标系统python源码+onnx模型+评估指标曲线+精美GUI界面

【算法介绍】 基于YOLOv8的海上红外目标系统是一项集成了前沿技术的创新解决方案&#xff0c;专为复杂海洋环境下的目标检测而设计。该系统利用YOLOv8深度学习模型的强大目标检测能力&#xff0c;结合红外成像技术&#xff0c;实现了对海上小型船只、浮标、甚至水下潜器等目标…