RabbitMQ - 如保证消息的可靠性?

news2025/1/15 18:27:19

目录

一、消息可靠性

1.1、生产者消息确认(生产者角度)

1.1.1、理论

1.1.2、实践

1.2、消息持久化(消息角度)

1.2.1、理论

1.3、消费者消息确认(消费者角度)

1.3.1、理论

1.3.2、实践

1.4、失败重试机制(失败后的处理机制)

1.4.1、理论


一、消息可靠性


1.1、生产者消息确认(生产者角度)

1.1.1、理论

在生产者这边,RabbitMQ 提供了 消息确认机制 来确保生产者的消息到达队列。

具体的,生产者将消息发送给 MQ 之后,会返回一个结果给生产者,表示消息是否处理成功,具体有以下两种响应:

  1. publish-confirm 响应
    1. 消息成功投递到交换机,返回 ack.
    2. 消息未投递到交换机(比如交换机不存在,或者是交换机名字写错了),返回 nack.
  2. publish-return 响应
    1. 消息投递到交换机,但是没有路由到队列(比如指定的队列名字写错了),返回 ack,以及路由失败的原因.

最后生产者这边的回调接收到响应后,根据不同的 ack 执行不同的“策略”(类似于你去买书,然后拿到书以后具体要干啥,都由你决定).

Ps:确认机制发送消息时,需要给每一个消息设置一个全局唯一的 id, 以区分不同消息,避免 ack 冲突.

1.1.2、实践

a)再 publisher 微服务的 application.yml 中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated 
    publisher-returns: true 
    template:
      mandatory: true

配置说明:

  1. publish-confirm-type:开启publisher-confirm,这里支持两种类型,
    1. simple(不推荐,类似死等,占用资源):同步等待confirm结果,直到超时.
    2. correlated(推荐):异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback.
  2. publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback.
  3. template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息.

b)每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", 
                    replyCode, replyText, exchange, routingKey, message.toString());
        });
    }
}

Ps:ApplicationContextAware 就是 Spring 容器启动时的要执行的通知接口,通过 setApplicationContext 方法实现具体的通知.

c)生产者发送消息,指定 ID,消息  ConfirmCallback

@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
    // 消息体
    String message = "hello, spring amqp!";
    // 消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 添加callback
    correlationData.getFuture().addCallback(
            result -> {
                if(result.isAck()){ 
                    // ack,消息成功
                    log.debug("消息发送成功, ID:{}", correlationData.getId());
                }else{
                    // nack,消息失败
                    log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
                }
            },
            ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    // 发送消息
    rabbitTemplate.convertAndSend("amq.direct", "simple", message, correlationData);
}

1.2、消息持久化(消息角度)

1.2.1、理论

MQ 默认时内存存储消息,通过开启持久化功能(设置 durable = true),就可以将消息持久化到文件中,保证保证消息不丢失.

Ps:消息要持久化的前提是交换机(不一定,但最好是)和队列是持久化的.

1.2.2、实践

a)交换机持久化

@Bean
public DirectExchange simpleExchange(){
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 
    return new DirectExchange("simple.direct", true, false);
}

b)队列持久化

@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder构建队列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}

c)消息持久化

    public void testDurableMessage() {
        //1.构造一个持久的消息
        Message message = MessageBuilder.withBody("hello".getBytes())
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
        rabbitTemplate.convertAndSend("simple.queue", message);
    }

Ps:delivery_mode = 2 就表示消息要持久化.

1.3、消费者消息确认(消费者角度)

1.3.1、理论

RabbitMQ 支持消费者确认机制,即:消费者处理消息后可以向 MQ 发送 ack 回执,MQ收到ack回执后才会删除该消息.

SpringAMQP 允许配置三种确认模式:

  • manual:手动ack,需要在消费者执行的消息代码结束后,调用api发送ack。
  • auto:自动ack,由 spring 监测消费者的执行的消费代码是否出现异常,没有异常则返回ack;抛出异常则返回nack,然后会将消息重新加入到队列,再发送给消费者,然后再次异常...,无限循环.
  • none:关闭ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除

1.3.2、实践

这里只需要配置以下 application.yml 文件,添加以下配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: none # none,关闭ack;manual,手动ack;auto:自动ack

1.4、失败重试机制(失败后的处理机制)

1.4.1、理论

刚刚讲到,消费者消费确认,SpringAMQP 提供了三种确认模式,其中 auto 这种方式,在消费者执行消费代码遇到异常时,会重新将消息加入到队列中,然后发送给消费者,再次异常,无限循环,导致 mq 的消息处理飙升,带来不必要的压力.

假设消费任务如下:

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        System.out.println("消费者接收到消息:" + msg);
        System.out.println("开始消费!");
        System.out.println(1/0);
        System.out.println("消费完成!");
    }
}

我们可以利用 Spring 的 retry 机制,在消费者出现异常时,利用本地重试,而不是无限制的加入到 mq 队列,只需要对消费者的配置文件进行以下配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初始的失败等待时长为1秒
          multiplier: 3 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 4 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

在开启重试模式以后,若重试次数耗尽,并且消息依然失败,则需要有 MessageRecoverer 接口来处理,他包含三种不同的实现:

  1. RejectAndDontRequeueRecoverer(默认方式):重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  2. ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  3. RepublishMessageRecoverer(推荐方式):重试耗尽后,将失败消息投递到指定的交换机,再由交换机投递到指定的队列.
     

上述第三种方式比较推荐,如下图:

1.4.2、实践

这里就测试以下推荐方案 RepublishMessageRecoverer

a)首先要定义用来接收失败消息的交换机、队列、绑定关系,最后定义 RepublishMessageRecoverer(Bean 的方式注入,覆盖 Spring 默认的方案):

@Configuration
public class ErrorMessageConfig {

    @Bean
    public DirectExchange errorMessageExchange() {
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue", true);
    }

    @Bean
    public Binding errorBinding() {
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }

}

b)定义消费者执行的消费任务

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        System.out.println("消费者接收到消息:" + msg);
        System.out.println("开始消费!");
        System.out.println(1/0);
        System.out.println("消费完成!");
    }
}

c)启动消费者,如下:

d)查看失败队列中具体信息(异常栈信息和信息信息)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1000571.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

母婴用品小程序开发

母婴用品小程序商城开发 商品分类与搜索: 提供母婴用品的分类,如奶粉、尿片、婴儿服装等。 用户可以根据需求进行搜索,快速找到所需的母婴用品。 商品详情与评价: 展示母婴用品的详细信息,包括商品图片、价格、规…

使用Cpolar和极简主义文件管理器构建个人云储存平台并进行公网访问

文章目录 1. 前言2.Tiny File Manager网站搭建2.1.Tiny file manager下载和安装2.2 Tiny file manager网页测试2.2 Tiny file manager网页测试3. 本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4. 公网访问测试总结 1. 前言 文件共享和查阅是现在网络最常见的应用场景&am…

再见ChatGPT 无需代码能力创建自定义ChatGPT

【无需代码技能】您可以使用3个不同的LLM创建自己的自定义ChatGPT GPT-4 Llama 2Falcon LLM 这是如何与Dante AI创建自己的聊天机器人 Dante AI是一个去中心化的人工智能平台,允许用户创建和训练自己的AI模型。它使用基于区块链的系统来管理数据和交易&#xff…

UG\NX二次开发 获取曲面uv中心点 UF_MODL_ask_face_props

文章作者:里海 来源网站:王牌飞行员_里海_里海NX二次开发3000例,里海BlockUI专栏,C\C++-CSDN博客 简介: UG\NX二次开发 获取曲面uv中心点 UF_MODL_ask_face_props。 效果: 代码: #include "me.hpp"void AskFaceMidpoint() {//选择面tag_t face …

C语言猜数字游戏详解及代码优化

目录 引言: 1.程序思路: 2.代码实现: 2.1.生成游戏菜单: 2.2.构建主函数框架: 2.3.构建游戏函数: 3.游戏源码: 4.程序优化: 🎈优化后源码: 结论…

Power Series and Laplace Transforms

See https://math.libretexts.org/Bookshelves/Analysis/Supplemental_Modules_(Analysis)/Ordinary_Differential_Equations/6%3A_Power_Series_and_Laplace_Transforms

计算机毕设之基于Hadoop+springboot的物品租赁系统的设计与实现(前后端分离,内含源码+文档+教程)

该系统基于Hadoop平台,利用Java语言、MySQL数据库,结合目前流行的 B/S架构,将物品租赁管理的各个方面都集中到数据库中,以便于用户的需要。在确保系统稳定的前提下,能够实现多功能模块的设计和应用。该系统由管理员功能…

编译CentOS Stream 8系统的OpenSSHV9.4rpm安装包

目前OpenSSH版本已至9.4,其作为操作系统底层管理平台软件,需要保持更新以免遭受安全攻击,编译生成rpm包是生产环境中批量升级的最佳途径。编译软件包时与当前的运行环境有较大关系,请注意本安装包系在CentOS Stream 8原生系统纯净…

游戏发行平台都有什么服务和功能?

游戏发行平台通常提供一系列服务和功能,以帮助游戏开发商将游戏推向市场,并为玩家提供游戏。以下是一些常见的游戏发行平台服务和功能: 1、游戏发布 发行平台允许游戏开发商将游戏上传到平台上,以供玩家下载和安装。 2、游戏销售…

【C++】构造函数与析构函数概念简介 ( 构造函数和析构函数引入 | 构造函数定义与调用 | 析构函数定义与调用 | 代码示例 )

文章目录 一、构造函数和析构函数引入二、构造函数简介1、构造函数定义2、构造函数调用3、代码示例 - 构造函数定义与调用 三、析构函数简介1、析构函数定义2、析构函数调用3、代码示例 - 析构函数定义与调用 一、构造函数和析构函数引入 在 C 语言中 , 创建对象时 , 需要进行对…

【数据结构】二叉树的链式存储结构

【数据结构】二叉树的链式存储结构 二叉树的存储结构 typedef int BTDataType; // 二叉树的结构 typedef struct BinaryTreeNode {BTDataType data; // 树的值struct BinaryTreeNode *left; // 左孩子struct BinaryTreeNode *right;// 右孩子 } BinaryTreeNode;二…

华为云云服务器云耀L实例评测 | 上手华为云耀L实例:一篇教学文章就够了

🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🦄 博客首页——🐅🐾猫头虎的博客🎐 🐳 《面试题大全专栏》 🦕 文章图文…

Linux驱动【day2】

mychrdev.c: #include <linux/init.h> #include <linux/module.h> #include <linux/fs.h> #include<linux/uaccess.h> #include<linux/io.h> #include"head.h" unsigned int major; // 保存主设备号 char kbuf[128]{0}; unsigned int…

Stable Diffusion WebUI内存不够爆CUDA Out of memory怎么办?

在我们运行SD的时候,我们经常会爆CUDA Out of memory。 我们应该怎么办呢? 这是因为我们的显存或者内存不够了。 如果你是用cpu来跑图的则表示内存不够,这个时候就需要换个大点的内存了。 如果你是用gpu来跑图的就说明你显存不够用咯,这时候咋办呢? 下面我将一一述说…

企业架构LNMP学习笔记39

MySQL读写分离案例实现&#xff1a; 搭建M-S复制 主从复制的原理&#xff1a;主服务器开启bin-log&#xff08;记录了写操作&#xff09;&#xff0c;从服务器获取到主服务器的bin-log&#xff0c;记录到relay-log中。从服务器在通过异步的线程方式&#xff0c;对于relay-log…

明星为何会偷税?我国的交税政策是?

近几年常常会看到某明星偷税漏税塌房的&#xff0c;从最开始的震惊&#xff0c;到后面的习以为常&#xff1a;很多明星都在偷税漏税啊。那么明星为什么会是偷税漏税的高发区&#xff1f; 交税标准 个人 根据我国的税务相关法律规定&#xff0c;个人收入每月超过5000&#xf…

微服务高可用容灾架构设计

导语 相对于过去单体或 SOA 架构&#xff0c;建设微服务架构所依赖的组件发生了改变&#xff0c;因此分析与设计高可用容灾架构方案的思路也随之改变&#xff0c;本文对微服务架构落地过程中的几种常见容灾高可用方案展开分析。 作者介绍 刘冠军 腾讯云中间件中心架构组负责…

windows本地验证码识别工具

windows本地验证码识别小工具 - 可以用在windows系统中&#xff0c;并可以集成在Java或python程序中 演示视频如下&#xff1a;可用于识别4-7位的字母数字组合的验证码&#xff08;识别准确率在70% - 80%&#xff09;。 验证码识别演示 本项目未开源&#xff0c;如需使用请联…

IntelliJ IDEA 远程调试 Tomcat

准备工作 明确远程服务器的 IP 地址&#xff0c;比如我是&#xff1a;192.168.92.128 关掉服务器防火墙&#xff1a;service iptables stop 本地 Remote Server 配置 添加 Remote Server&#xff0c;如下图 复制 Remote Server 自动生成的 JVM 参数&#xff0c;等下有用&…