RabbitMQ高级

news2024/9/27 4:54:34

文章目录

  • 一.消息可靠性
    • 1.生产者消息确认
    • 2.消息持久化
    • 3.消费者确认
    • 4.消费者失败重试


MQ的一些常见问题

1.消息可靠性问题:如何确保发送的消息至少被消费一次

2.延迟消息问题:如何实现消息的延迟投递

3.高可用问题:如何避免单点的MQ故障而导致的不可用问题

4.消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题

一.消息可靠性

消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?

  • -发送时丢失:

    • 生产者发送的消息未送达exchange

    • 消息到达exchange后未到达queue

  • MQ宕机,queue将消息丢失

  • consumer接收到消息后未消费就宕机

1.生产者消息确认

生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

  • publisher-confirm,发送者确认

消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack

  • publisher-return,发送者回执
    消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突

在这里插入图片描述

简单来说:在publisher-confirm下的nack是消息投递到交换机失败返回的信息;在publisher-confirm下的ack是消息成功到达了消费者;在publisher-return下的ack是消息到达了交换机但是路由失败的返回信息


SpringAMQP实现生产者确认

一.想要实现生产者消息确认机制,需要在配置文件编写开启代码,即在微服务的application.yml中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated 
    publisher-returns: true 
    template:
      mandatory: true

配置说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型

    • simple:同步等待confirm结果,直到超时

    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback(推荐使用)

  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback

  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

二.配置ReturnCallback

ReturnCallback是消息到达交换机但是没有成功进行路由的回调函数(作用于全局)

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", 
                    replyCode, replyText, exchange, routingKey, message.toString());
        });
    }
}

对于代码中的ApplicationContext是负责管理和组织Spring应用中的各个组件,如bean、配置文件等.

通过实现ApplicationContextAware这个接口,bean可以获取对ApplicationContext的引用,并因此获得访问应用上下文中的其他bean、资源和容器特性的能力,所以说实现了接口等同于获取到了bean容器,就可以获取到 rabbitTemplate并进行设置唯一的ReturnCallback

在回调函数中,消息路由失败会返回很多信息,其中使用路由键,消息的交换机的名称可以实现重发消息

三.在生产者类中发送消息并同时实现ConfirmCallback

ConfirmCallback同样是回调函数,与ReturnCallback不同的是ConfirmCallback可以创建多次

ConfirmCallback是对消息还没有进入到交换机就丢失的一种消息返回策略,当丢失后,执行回调函数并可以记录消息的失败原因和UUID,成功也是同理

@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
    // 消息体
    String message = "hello, spring amqp!";
    // 消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 添加callback
    correlationData.getFuture().addCallback(
            result -> {
                if(result.isAck()){ 
                    // ack,消息成功
                    log.debug("消息发送成功, ID:{}", correlationData.getId());
                }else{
                    // nack,消息失败
                    log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
                }
            },
            ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    // 发送消息
    rabbitTemplate.convertAndSend("amq.direct", "simple", message, correlationData);
}

需要注意的是:在手动添加交换机的过程中,想要使用通配符"#"的话,应该设置交换机为topic类型!

总结:

SpringAMQP中处理消息确认的几种情况:

  • publisher-comfirm:

    • 消息成功发送到exchange,返回ack

    • 消息发送失败,没有到达交换机,返回nack

    • 消息发送过程中出现异常,没有收到回执

  • 消息成功发送到exchange,但没有路由到queue,调用ReturnCallback


2.消息持久化

MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。

1.交换机持久化:

@Bean
public DirectExchange simpleExchange(){
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 
    return new DirectExchange("simple.direct", true, false);
}

2.队列持久化:

@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder构建队列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}

3.消息持久化,SpringAMQP中的的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定

Message msg = MessageBuilder
        .withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化 
        .build();

但在springamqp中,其实已经在声明交换机和队列的时候将其持久化了,发送消息的方法convertAndSend()内部也将消息做了持久化,了解消息持久化的设置方法可以将以后不是很重要的交换机,队列,消息设置为非持久化


3.消费者确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。

  • auto(推荐):自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack,抛出异常后,会不断重新发送消息即失败重试机制

  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

配置方式是修改application.yml文件,添加下面配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: none # none,关闭ack;manual,手动ack;auto:自动ack

4.消费者失败重试

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:

在这里插入图片描述

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初始的失败等待时长为1秒
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

消费者失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐使用)

第三种处理策略是将重试失败的消息重新投递到指定的交换机,然后在投递到指定的队列中,形成了一个交换机-队列的错误消息存放容器,在这个容器中存放不仅有错误消息,还有错误消息头的异常栈信息

在这里插入图片描述

实现方式:

1.定义接收失败消息的交换机、队列及其绑定关系:

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true); 
}
@Bean
public Binding errorBinding(){
    return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}

2.定义RepublishMessageRecoverer:

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); 
}

总结:如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列

  • 开启持久化功能,确保消息未消费前在队列中不会丢失

  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack

  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1364043.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI教我学编程之C#关键字

AI教我学编程系列学习第三课 — C#关键字 前言重点先知关键字分类保留字上下文关键字 对话AI首遇波澜调整指令第一次第二次第三次直到我提出如下指令 人工智能?阶段总结 知识拓展1、Ecma和ISO是什么?2、System,dllhost.exe,taskmg…

服务器磁盘挂载及格式化

一边学习,一边总结,一边分享! 写在前面 最近一直折腾组装的电脑,来回折腾了很久关于我花费六千多组了台window+Linux主机,目前基本是可以使用了。对于Windows主机配置基本是没问题,一直在使用,以及桌面化软件,都可以自己安装,只是说这台主机有些软件可能一时半会安装…

《数字图像处理》课程设计

第1题:感兴趣目标提取 主要内容: 对输入的自然图像进行感兴趣目标提取,给出所有10副图的结果(图像教师提供)。 参考步骤: 对图像进行初始分割,可选择的初始分割方法包括分水岭方法&#xff0…

Unity中Shader面片一直面向摄像机(个性化修改及适配BRP)

文章目录 前言一、个性化修改面向摄像机效果1、把上一篇文章中求的 Z轴基向量 投影到 XoZ平面上2、其余步骤和之前的一致3、在属性面板定义一个变量,控制面片面向摄像机的类型4、效果 二、适配BRP三、最终代码 前言 在上一篇文章中,我们用Shader实现了面…

Cannot resolve property ‘driverClassName‘

已解决 Cannot resolve property 错误 最近在学习spring时遇到了下面的问题: spring读取不到property的name属性,报红,编译不通过,上网查到了两种解决方案,如下: 1、重新加载spring文件就可以解决问题了&a…

1870_使用flx来增强counsel-M-x的模糊匹配功能

Grey 全部学习内容汇总: https://github.com/GreyZhang/editors_skills 1870_使用flx来增强counsel-M-x的模糊匹配功能 这一次算是趁热打铁,把之前优化掉了的counsel-M-x的匹配功能再推进一步。虽然还是没有达到spacemacs中的乱序匹配效果&#xff0c…

Mybatis入门源码二:sql执行

后面开始分析sql执行的源码流程也就是这一部分 一、factory.openSession() 重点关注configuration.newExecutor这个方法,获取事务处理器比较简单,就是获取一个jdbc的事务管理器。 这个方法通过传入的执行器类型来创建不同的执行器,有simp…

学习笔记——C++中数据的输入 cin

作用:用于从键盘中获取数据 关键字:cin 语法:cin>>变量 类型:C中数据的输入主要包含:整形(int)浮点型(float,double float),字符型&…

[C#]使用DlibDotNet人脸检测人脸68特征点识别人脸5特征点识别人脸对齐人脸比对FaceMesh

【官方框架地址】 https://github.com/takuya-takeuchi/DlibDotNet 【算法介绍】 DlibDotNet是一个开源的.NET库,用于实现机器学习和计算机视觉应用。它基于C库dlib,通过C/CLI封装了dlib的所有功能,为.NET开发者提供了简单易用的API。以下是…

爬虫网易易盾滑块案例:某乎

声明: 该文章为学习使用,严禁用于商业用途和非法用途,违者后果自负,由此产生的一切后果均与作者无关 一、滑块初步分析 js运行 atob(‘aHR0cHM6Ly93d3cuemhpaHUuY29tL3NpZ25pbg’) 拿到网址,浏览器打开网站&#xff0…

Android学习(一):Android Studio安装与配置

Android学习(一):Android Studio安装与配置 一、安装 下载地址 下载zip文件,免安装。 二、下载资源 启动后,出现该弹框,点击Cancel。 点击Next 默认,点击Next。 点击Next。 点击Finish 开始…

liunx 巡检命令

top 查看cup使用率 - top 查看cup使用率 退出输入q 第一行top 1.当前系统时间 2.up 运行时间 3.users当前连接的终端数量 4.load average 负载的平均值 第二行 tasks 任务 1. 128 total #为当前系统进程总数 2. 1 running #为当前系统进程总…

前端--基础 常用标签-超链接标签 外部链接( herf 和 target)

目录 超链接标签 &#xff1a; 超链接的语法格式 &#xff1a; 超链接的属性 &#xff1a; 超链接的分类 &#xff1a; 外部链接 &#xff1a; 超链接标签 &#xff1a; # 在 HTML 标签中&#xff0c;<a> 标签用于定义超链接&#xff0c;作用是从一个页面…

屏幕截图--Snagit

Snagit是一款优秀的屏幕、文本和视频捕获、编辑与转换软件。它不仅可以捕获静止的图像&#xff0c;还能获得动态的图像和声音。软件界面干净清爽&#xff0c;功能板块一目了然&#xff0c;为用户提供专业的屏幕录制方案。可以根据自己的需求调整录制视频的分辨率、帧数、输出格…

HNU-数据库系统-讨论课2

第二次小班讨论课安排如下: 主题: 数据库系统设计与应用开发。 目的&#xff1a;让学生通过练习和讨论充分掌握数据库系统的设计与应用开发。 内容: 设计和实现一个数据库及应用系统。设计内容包括系统分析、系统设计、 数据库设计&#xff08;需求分析、概念结构设计、逻辑…

图神经网络|9.3 邻接矩阵的变换

由于邻接矩阵中一般不会&#xff08;i,i&#xff09;等于1&#xff0c;除非第i个点上有自环。 而如果用邻接矩阵去乘上特征矩阵&#xff0c;那么将丢失自身向自身的贡献。 此时可以再邻接矩阵的基础上&#xff0c;再加上一个单位阵&#xff0c;从而使得最终的结果包含自身对整体…

P4994 终于结束的起点————C

目录 终于结束的起点题目背景题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 样例 #2样例输入 #2样例输出 #2 提示样例 1 解释数据范围提示 解题思路Code运行结果 终于结束的起点 题目背景 终于结束的起点 终于写下句点 终于我们告别 终于我们又回到原点 …… 一个个…

RK3399平台入门到精通系列讲解(实验篇)lseek 函数进行读写位置的调整

🚀返回总目录 文章目录 一、lseek 函数的使用二、信号驱动 IO 实验源码2.1、Makefile2.2、驱动部分代码2.3、测试应用代码lseek() 是一个用于文件定位的系统调用函数,用于在文件中移动读写位置指针。在 Linux 中,它常常被用来修改文件读写的位置。lseek() 可以被用于随机存…

[嵌入式C][入门篇] 快速掌握基础4 (普通函数,递归函数,函数指针,弱函数)

开发环境&#xff1a; 网页版&#xff1a;跳转本地开发(Vscode)&#xff1a;跳转 文章目录 一、简介二、普通函数&#xff08;1&#xff09;定义函数&#xff08;2&#xff09;调用函数1. 传值调用2. 传地址调用 三、递归函数&#xff08;套娃&#xff09;(1) 示例代码 四、函…

记.backward()报错

最近我在模型训练损失里加入了LPIPS深度感知损失&#xff0c;训练的时候就出现了如上的报错&#xff0c;具体解释为&#xff1a;调用梯度反向传播loss.backward()时&#xff0c;我们计算梯度&#xff0c;需要一个标量的loss(即该loss张量的维度为1,只包含一个元素&#xff09;&…