RabbitMQ 如何保证消息可靠性

news2024/12/25 14:01:13

RabbitMQ 如何保证消息可靠性

    • 1. 保证生产者可靠
      • 1.1 生产者确认机制
      • 1.2 实现生产者确认
        • 1.2.1 开启生产者确认机制
        • 1.2.2 定义ReturnCallback
        • 1.3.3.定义ConfirmCallback
      • 1.3 注意
    • 2. 保证MQ可靠
      • 2.1 数据持久化
        • 2.1.1 交换机持久化
        • 2.1.2.队列持久化
        • 2.1.3 消息持久化
        • 2.1.4 注意
    • 3. 保证消费者可靠
      • 3.1 消费者确认机制
      • 3.2 失败重试机制
      • 3.3 失败处理策略
    • 4. 总结

该文章用到的项目代码与MQ请看下面两篇文章:

消息队列 - RabbitMQ

SpringAMQP 快速入门


要保证消息的可靠性先看一下哪些情况会破坏消息的可靠性:

  • 发送消息时丢失:
    • 生产者发送消息到达MQ后未找到Exchange
    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
  • MQ导致消息丢失:
    • 消息到达MQ,保存到队列后,尚未消费就突然宕机
  • 消费者处理消息时:
    • 消息接收后尚未处理突然宕机
    • 消息接收后处理过程中抛出异常

综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:

  • 确保生产者一定把消息发送到MQ
  • 确保MQ不会将消息弄丢
  • 确保消费者一定要处理消息

1. 保证生产者可靠

1.1 生产者确认机制

生产者确认机制是 RabbitMQ 中的一种机制,用于确保消息成功发送到 RabbitMQ 服务器。这个机制可以防止消息在生产者发送后因为某种原因未能成功到达消息队列,从而提高消息的可靠性。

生产者确认机制,包括Publisher ConfirmPublisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ACK的确认信息,代表投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。

1.2 实现生产者确认

1.2.1 开启生产者确认机制

在publisher模块的application.yaml中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执
1.2.2 定义ReturnCallback

ReturnCallback 是 RabbitMQ 提供的回调接口,主要用于处理消息无法路由到队列时的返回情况。当消息发送到交换机,但交换机无法将消息路由到任何队列时,就会触发 ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

@Slf4j
@Configuration
public class MqConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("触发return callback,");
                log.debug("exchange: {}", returned.getExchange());
                log.debug("routingKey: {}", returned.getRoutingKey());
                log.debug("message: {}", returned.getMessage());
                log.debug("replyCode: {}", returned.getReplyCode());
                log.debug("replyText: {}", returned.getReplyText());
            }
        });
    }
}
1.3.3.定义ConfirmCallback

ConfirmCallback 是 RabbitMQ 提供的回调接口,用于处理消息是否成功发送到 RabbitMQ 服务器的确认。当消息成功到达服务器时,触发 ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

在这里插入图片描述

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

在这里插入图片描述

我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback

    @Test
    public void testConfirm1() {
        // 1.创建CorrelationData 这里没传 id ,因为 CorrelationData 无参构造函数里面会创建id
        CorrelationData cd = new CorrelationData();
        // 2.给Future添加ConfirmCallback
        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                // 2.1.Future发生异常时的处理逻辑,基本不会触发
                log.error("send message fail", ex);
            }
            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
                if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                    log.debug("发送消息成功,收到 ack!");
                }else{ // result.getReason(),String类型,返回nack时的异常描述
                    log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
                }
            }
        });
        // 3.发送消息
        rabbitTemplate.convertAndSend("test.topic", "aaa", "hello", cd);
    }

执行结果如下

在这里插入图片描述

消息发送成功收到了ACK,但是没有对应的Routing Key触发了return callback

当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。

而如果连交换机都是错误的,则只会收到nack。

1.3 注意

开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
  • 交换机名称错误:同样是编程错误导致
  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理 nack 就可以了。

2. 保证MQ可靠

消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。

2.1 数据持久化

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化
2.1.1 交换机持久化

在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:

在这里插入图片描述

设置为Durable就是持久化模式,Transient就是临时模式。

2.1.2.队列持久化

在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:

在这里插入图片描述

2.1.3 消息持久化

在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties

delivery-mode:

  • 持久性标志,用于指定消息的持久性。值为 1 表示持久性消息,值为 2 表示非持久性消息。

在这里插入图片描述

2.1.4 注意

在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。


3. 保证消费者可靠

RabbitMQ是阅后即焚机制,一般情况下RabbitMQ确认消息投递到消费者后会立刻删除,如果消费者未正常处理这条消息,该消息就会丢失。

设想这样的场景:

  1. RabbitMQ投递消息给消费者
  2. 消费者获取消息后,返回ACK给RabbitMQ
  3. RabbitMQ删除消息
  4. 消费者宕机,消息尚未处理 or 处理过程中发生异常

这样,消息就丢失了。

3.1 消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或校验异常,自动返回reject;

通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 不做处理

修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理的异常:

@RabbitListener(queues = "simple.queue")
public void listenTestQueueMessage(String msg) throws InterruptedException {
    log.info("spring 消费者接收到消息:【" + msg + "】");
    if (true) {
        throw new MessageConversionException("故意的");
    }
    log.info("消息处理完成");
}

测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。

把确认机制修改为auto:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动ack

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):

在这里插入图片描述

放行以后,由于抛出的是消息转换异常,因此Spring会自动返回reject,所以消息依然会被删除

我们将异常改为RuntimeException类型:

    @RabbitListener(queues = "test.queue1")
    public void listenTestQueueMessage(String msg) throws InterruptedException {
        log.info("spring 消费者接收到消息:【" + msg + "】");
        if (true) {
            throw new RuntimeException("故意的");
        }
        log.info("消息处理完成");
    }

在异常位置打断点,然后再次发送消息测试,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态)

放行以后,由于抛出的是业务异常,所以Spring返回ack,最终消息恢复至Ready状态然后再次投递变为unacked状态,并且没有被RabbitMQ删除:

在这里插入图片描述

3.2 失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力:

在这里插入图片描述

为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

3.3 失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

1.在consumer服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2.定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码如下:

@Configuration
// 条件注解 只有在yml文件中 spring.rabbitmq.listener.simple.retry.enabled 属性的值为 true 才生效
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfiguration {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

测试消息发送,消费者重试3次后就将消息转发到了指定的交换机

在这里插入图片描述

在这里插入图片描述

4. 总结

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置 MessageRecoverer 为 RepublishMessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1310670.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ThingWorx/Vuforia—工业物联网和AR平台

产品概述 ThingWorx是美国PTC公司旗下的一款物联网和AR平台&#xff0c;它提供了适用于IoT的开发工具和能力&#xff0c;使开发者可以为工业物联网快速构建和部署变革性的智能互联解决方案&#xff0c;使创新者能够快速为当今的智能互联世界提供优异的应用程序、解决方案和用户…

自动机器学习是什么?概念及应用

自动机器学习 (Auto Machine Learning) 的应用和方法 随着众多企业在大量场景中开始采用机器学习&#xff0c;前后期处理和优化的数据量及规模指数级增长。企业很难雇用充足的人手来完成与高级机器学习模型相关的所有工作&#xff0c;因此机器学习自动化工具是未来人工智能 (A…

装机DIY-配件价格比较

计算机配件价格比较 &#x1f680;&#x1f680;&#x1f680;&#x1f680;最近无事总刷到DIY装机视频&#xff0c;自己也有兴趣&#xff0c;同时这段时间也在学前端&#xff0c;发现每次比较价格都有重新搜&#xff0c;重新计算&#xff0c;且不同配置也不好比较&#xff0c…

【毕业设计】基于STM32的智能衣柜设计

1、功能说明 功能如下: 1、用stm32控制ds18b20采集温度 2、然后按键可以设置上下限温度&#xff0c; 3、采集的温度低于下限温度时候 打开加热片开始加热&#xff0c; 4、加热到上限温度关闭加热片停止加热&#xff0c; 5、采集的温度可以在oled显示&#xff0c; 6、然后弄个按…

c语言:指针运算

目录 指针类型与整型进行加减 规律 同类型指针减法运算 其他类型的指针运算 一个数据对象的内存位置有两个重要信息&#xff1a; 数据对象的首地址。数据对象占用存储空间大小 指针类型的值存储的是内存地址。内存地址是从0开始&#xff0c;依次加1的整型数据。 指针类…

单元测试二(实验)-云计算2023.12-云南农业大学

1、实践系列课《深入浅出Docker应用》 https://developeraliyun.com/adc/scenarioSeries/713c370e605e4f1fa7be903b80a53556?spma2c6h.27088027.devcloud-scenarioSeriesList.13.5bb75b8aZHOM2w 容器镜像的制作实验要求 创建Dockerfile文件: FROM ubuntu:latest WORKDIR data…

Git应用——代码提交规范 feat ,fix ,style

当前使用 feat 增加新功能fix 修复问题/BUGstyle 代码风格相关无影响运行结果的perf 优化/性能提升refactor 重构revert 撤销修改test 测试相关docs 文档/注释chore 依赖更新/脚手架配置修改等workflow 工作流改进ci 持续集成types 类型定义文件更改wip 开发中 别处看到 fea…

玩转大数据14:分布式计算框架的选择与比较

1. 引言 随着大数据时代的到来&#xff0c;越来越多的企业和组织需要处理海量数据。分布式计算框架提供了一种有效的方式来解决大数据处理的问题。分布式计算框架将计算任务分解成多个子任务&#xff0c;并在多个节点上并行执行&#xff0c;从而提高计算效率。 2. 分布式计算…

【操作系统导论】内存篇——分页

引入 采用 「分段」 的方式&#xff0c;将空间切成 不同长度的分片&#xff0c;会出现 碎片化 问题&#xff0c;随着时间推移&#xff0c;分配内存会越来越困难。 因此&#xff0c;值得考虑「分页」的方法&#xff1a; 将空间分割成 固定长度的分片 &#xff1b; 将物理内存…

斑马zebra目标检测数据集VOC+YOLO格式2300张

斑马是由四百万年前的原马进化出来的&#xff0c;最早出现的斑马可能是细纹斑马。有关史前马科动物的化石现存于美国爱达荷州克文的克文化石床国家博物馆。斑马的史前马为“克文马”&#xff08;美洲斑马或者克文斑马&#xff09;&#xff0c;学名为“Equussimplicidens”&…

智能守护,数据安全稳中求胜!上海迅软DSE助力家具家电行业引领潮流!

随着中国经济的蓬勃发展&#xff0c;家具家电企业正迎来“精品制造”的时代&#xff0c;业内竞争日益激烈。为了提升产品竞争力、扩大市场占有率&#xff0c;企业亟需加强对自主品牌的安全建设&#xff0c;确保品牌的自主知识产权、产品生产资料以及销售信息等核心数据不受泄漏…

Docker真的好难用啊,为什么说它移植性好啊?

看起来你对Docker有点困惑和挑战呀。Docker刚开始确实有点难以入门&#xff0c;但是一旦掌握了它的核心概念和操作&#xff0c;你会发现它其实非常强大和便利。 接下来我会根据你提出的问题和场景&#xff0c;详细地解答。 关于你的实际问题&#xff1a; 刚接触时的困难是正。…

如何实现服务注册与发现?

本文主要讲解如何实现服务注册与发现。 在分布式服务中&#xff0c;服务注册和发现是一个特别重要的概念&#xff0c;为什么需要服务注册和发现&#xff1f;常用的服务发现组件有哪些&#xff1f;服务注册和发现对一致性有哪些要求呢?下面我们就来学习服务发现相关的知识。 …

【五】Python 代理模式

文章目录 5.1 代理模式概述5.1.1 代理介绍5.1.2 代理模式的作用 5.2 代理模式的UML类图5.3 了解不同类型的代理5.3.1虚拟代理5.3.2 远程代理5.3.3 保护代理5.3.4 智能代理 5.4 现实世界中的代理模式5.5 代理模式的优点5.6 门面模式和代理模式之间的比较 5.1 代理模式概述 5.1.…

用XAMPP在Windows系统构建一个本地Web服务器

用XAMPP在Windows系统构建一个本地Web服务器 Build a Local Web Server for Windows with XAMPP By JacksonML 本文简要介绍如何获取和安装XAMPP以实现Windows环境下本地Web服务器的过程&#xff0c;希望对广大网友和学生有所帮助。 所谓本地Web服务器&#xff0c;即使用本地…

Python框架篇(5):FastApi-中间件使用

1.介绍 1.1 官网介绍 "中间件"是一个函数,它在每个请求被特定的路径操作处理之前,以及在每个响应返回之前工作. 它接收你的应用程序的每一个 请求. 然后它可以对这个 请求做一些事情或者执行任何需要的代码. 然后它将 请求传递给应用程序的其他部分 (通过某种 路径操…

slurm 23.11.0集群 debian 11.5 安装

slurm 23.11.0集群 debian 11.5 安装 用途 Slurm(Simple Linux Utility for Resource Management&#xff0c; http://slurm.schedmd.com/ )是开源的、具有容错性和高度可扩展的Linux集群超级计算系统资源管理和作业调度系统。超级计算系统可利用Slurm对资源和作业进行管理&a…

变电站蓄电池在线监测系统(论文+源码)

1. 系统设计 本次课题为变电站蓄电池在线监测系统的设计&#xff0c;其系统架构如图3.1所示&#xff0c;包括了主控制器STC89C52单片机&#xff0c;液晶显示器LCD1602,模数转换器ADC0832&#xff0c;电流传感器ACS712&#xff0c;分压电阻&#xff0c;蜂鸣器以及温度传感器。在…

Amazon SageMaker: 拓展机器学习边界,塑造未来创新趋势

授权说明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 亚马逊云科技开发者社区, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方渠道。 近期在 re:Invent 2023 大会上&#xff0c;亚马逊云科技发布了一…

如何将Galaxybase图数据库应用于电力设备管理

导读 近日&#xff0c;受强冷空气影响&#xff0c;部分北方地区出现不同程度的降雪&#xff0c;并持续降温。据国家电网发布的预警通知&#xff0c;要求启动预警响应和应急机制&#xff0c;密切跟踪灾害预警信息和应急响应情况&#xff0c;滚动研判分析覆冰、积雪、低温等对电…