RabbitMQ-消息队列:发布确认高级

news2025/1/11 8:13:33

18、发布确认高级

在生产环境中由于一些不明原因,导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?

1、确认机制方案

在这里插入图片描述

2、代码架构图

在这里插入图片描述

在配置文件当中需要添加

spring.rabbitmq.publisher-confirm-type=correlated
  • NONE 值是禁用发布确认模式,是默认值。

  • CORRELATED 值是发布消息成功到交换器后会触发回调方法

  • SIMPLE(最好不用) 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。(相当于单一发布)

3、正常发送接收代码

(1)配置类

在这里插入图片描述

在这里插入图片描述

(2)生产者

在这里插入图片描述

(3)消费者

在这里插入图片描述

(4)测试效果:

在这里插入图片描述

4、回调接口

在这里插入图片描述

如果是交换机出了问题,队列出不出问题,都无济于事,交换机出了问题,交换机的缓存就没什么作用了,所以需要通过回调解决交换机宕机的问题

(1)修改生产者代码,添加回调接口方法

在这里插入图片描述

(2)查看源码

在这里插入图片描述

实现内部接口函数的注意点:

在这里插入图片描述

交换机确认回调方法

在这里插入图片描述

(4)创建回调函数的三部曲

在这里插入图片描述

发送方设置回调函数的CorrelationData

在这里插入图片描述

(5)配置开启交换机回调服务

在这里插入图片描述

测试效果:

在这里插入图片描述

交换机宕掉

在这里插入图片描述

测试效果:

在这里插入图片描述

结果:出现找不到交换机的错误

队列宕掉

在这里插入图片描述

测试效果:

访问: http://localhost:8080/confirm/sendMessage/LBJ

在这里插入图片描述
在这里插入图片描述

结果:会丢失RK对应的队列消息

可以看到,发送了两条消息,第一条消息的 RoutingKey 为 “key1”,第二条消息的 RoutingKey 为 “key2”,两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了丢弃的消息交换机是不知道的,需要解决告诉生产者消息传送失败

代码实现

(1)配置类代码

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    //声明业务 Exchange
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    // 声明确认队列
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    // 声明确认队列绑定关系
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
                                @Qualifier("confirmExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("key1");
    }
}

(2)消息生产者的回调接口

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
    /**
     * 交换机不管是否收到消息的一个回调方法
     *
     * @param correlationData 消息相关数据
     * @param ack             交换机是否收到消息
     * @param cause           为收到消息的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
            log.info("交换机还未收到 id 为:{}消息,原因:{}", id, cause);
        }
    }

}

(3)消息生产者

@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MyCallBack myCallBack;

    //依赖注入 rabbitTemplate 之后再设置它的回调对象
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(myCallBack);
    }
    
    /**
     * 消息回调和退回
     *
     * @param message
     */
    @GetMapping("sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {

        //指定消息 id 为 1
        CorrelationData correlationData1 = new CorrelationData("1");
        String routingKey = "key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);
        log.info(routingKey + "发送消息内容:{}", message + routingKey);

        CorrelationData correlationData2 = new CorrelationData("2");
        routingKey = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData2);
        log.info(routingKey + "发送消息内容:{}", message + routingKey);

    }

}

(4)消息消费者

@Component
@Slf4j
public class ConfirmConsumer {
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveMsg(Message message) {
        String msg = new String(message.getBody());
        log.info("接受到队列 confirm.queue 消息:{}", msg);
    }

}

5、回退消息

Mandatory 参数

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如 果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何 让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参 数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

(1)开启发布消息回退消息的核心配置

在这里插入图片描述

#消息退回
spring.rabbitmq.publisher-returns=true

(2)继承回调接口

在这里插入图片描述

(3)实现回调函数

在这里插入图片描述

 @Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

        log.error("消息{},被交换机{}退回,退回原因:{},路由Key:{}",new String(message.getBody()),exchange,replyText,routingKey);
    }

测试效果1:

在这里插入图片描述

报错原因:没有注入自定义的回调函数到接口里面

在这里插入图片描述

测试效果2:

在这里插入图片描述

6、备份交换机

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?

前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。 在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。

备份交换机可以理解为 RabbitMQ 中交换机的 “备胎”当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们①在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们②还可以建立一个报警队列,用独立的消费者来进行监测和报警。

在这里插入图片描述

代码实现

(1)配置类中定义、声明交换机、队列

在这里插入图片描述

在这里插入图片描述

(2)按照架构图绑定交换机和队列

在这里插入图片描述

(3)设置备份交换机

在这里插入图片描述

@Configuration
public class ConfirmConfig {
 public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
 public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
 public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
 public static final String BACKUP_QUEUE_NAME = "backup.queue";
 public static final String WARNING_QUEUE_NAME = "warning.queue";
 // 声明确认队列
 @Bean("confirmQueue")
 public Queue confirmQueue(){
 return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
 }
 //声明确认队列绑定关系
 @Bean
 public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
 @Qualifier("confirmExchange") DirectExchange exchange){
 return BindingBuilder.bind(queue).to(exchange).with("key1");
 }
 //声明备份 Exchange
 @Bean("backupExchange")
 public FanoutExchange backupExchange(){
 return new FanoutExchange(BACKUP_EXCHANGE_NAME);
 }
 //声明确认 Exchange 交换机的备份交换机
 @Bean("confirmExchange")
 public DirectExchange confirmExchange(){
 ExchangeBuilder exchangeBuilder =
 ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
 .durable(true)
//设置该交换机的备份交换机
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
 return (DirectExchange)exchangeBuilder.build();
 }
 // 声明警告队列
 @Bean("warningQueue")
 public Queue warningQueue(){
 return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
 }
 // 声明报警队列绑定关系
 @Bean
 public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
 @Qualifier("backupExchange") FanoutExchange 
backupExchange){
 return BindingBuilder.bind(queue).to(backupExchange);
 }
 // 声明备份队列
 @Bean("backQueue")
 public Queue backQueue(){
 return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
 }
 // 声明备份队列绑定关系
 @Bean
 public Binding backupBinding(@Qualifier("backQueue") Queue queue,
 @Qualifier("backupExchange") FanoutExchange backupExchange){
 return BindingBuilder.bind(queue).to(backupExchange);
 }
}

(4)报警消费者(转发消费者自己拓展)

在这里插入图片描述

测试效果:

测试之前:

去RabbitMQ的web管理界面删除已经建立的交换机,否则会和将要建立的交换机和备用交换机起冲突

重新启动项目的时候需要把原来的 confirm.exchange 删除因为我们修改了其绑定属性

在这里插入图片描述

备用交换机成功路由

在这里插入图片描述

小结

mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先 级高,经过上面结果显示答案是备份交换机优先级高

在这里插入图片描述

回退消息的优先级低

RabbitMQ-消息队列:发布确认高级 到此完结,笔者归纳、创作不易,大佬们给个3连再起飞吧

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1466905.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LiveQing视频点播流媒体RTMP推流服务功能-支持配置开启 HTTPS 服务什么时候需要开启HTTPS服务

LiveQing视频点播流媒体RTMP推流服务功能支持配置开启 HTTPS 服务什么时候需要开启HTTPS服务 1、配置开启HTTPS1.1、准备https证书1.1.1、选择Nginx类型证书下载 1.2、配置 开启 HTTPS1.2.1 web页面配置1.2.2 配置文件配置 2、验证HTTPS服务3、为什么要开启HTTPS3.1、安全性要求…

HTML+CSS:动态搜索框

效果演示 这段代码实现了一个简单的搜索栏效果。页面背景为从天蓝色到深蓝色的渐变色,搜索栏包括一个圆形背景的搜索图标和一个输入框。当用户点击搜索图标时,输入框会从搜索图标的位置滑出,显示一个输入框和一个清除按钮。用户可以在输入框中…

【JAVA】Tomcat的安装

目录 官网链接 下载安装程序 解压 启动Tomcat 问题 其他文件介绍 官网链接 Apache Tomcat - Welcome!https://tomcat.apache.org/ 下载安装程序 进入官网后,点击如下tomcat 8: 进入tomcat 8后选择zip文件下载 解压 Tomcat是一个基于java实现的“绿色软件…

八、线性代数二 ,矩阵的秩

目录 1、矩阵子式的定义与子式个数的计算: 2、矩阵秩的定义: 3、矩阵秩的计算方法: 4、矩阵秩的 性质: 线性代数四——几个重要的矩阵点积_线性代数 矩阵点积-CSDN博客 1、矩阵子式的定义与子式个数的计算: 概念&…

【数学建模规则】2024年第九届数维杯大学生数学建模挑战赛参赛指南

一、竞赛介绍 数维杯大学生数学建模挑战赛每年分为两场,每年上半年为数维杯国赛(5月,俗称小国赛),下半年为数维杯国际赛(11月),2023年第八届数维杯大学生数学建模挑战赛共有近1.4万名学生参赛,…

vue : 无法加载文件 C:\Program Files\nodejs\node_global\vue.ps1,因为在此系统上禁止运行脚本。

解决方法: 打开PowerShell,在命令框输入set-ExecutionPolicy RemoteSigned 在PowerShell中输入会出现如下图,输入y即可。

【Flink精讲】Flink组件通信

主要指三个进程中的通讯 CliFrontendYarnJobClusterEntrypointTaskExecutorRunner Flink内部节点之间的通讯使用Akka,比如JobManager和TaskManager之间。而operator之间的数据传输是利用Netty。 RPC是统称,Akka,Netty是实现 Akka与Ac…

网络编程 TCP/UDP通信

网络编程 TCP/UDP通信 1. 0.0.0.0地址与客户端bind函数2. UDP 服务器与客户端通信3. TCP 服务器与客户端通信 1. 0.0.0.0地址与客户端bind函数 0.0.0.0的地址作用 在网络编程中,0.0.0.0是一个特殊的IP地址,通常用于表示"任意地址"或"所有…

Linux:ACL权限,特殊位和隐藏属性

目录 一.什么是ACL 二.操作步骤 ① 添加测试目录、用户、组,并将用户添加到组 ② 修改目录的所有者和所属组 ③ 设定权限 ④ 为临时用户分配权限 ⑤ 验证acl权限 ⑥ 控制组的acl权限 三. 删除ACL权限 一.什么是ACL 访问控制列表 (Access Control List):ACL 通…

面试经典150题——快乐数

​"Success is not final, failure is not fatal: It is the courage to continue that counts." - Winston Churchill 1. 题目描述 2. 题目分析与解析 2.1 思路一 还是最简单的,模拟最直观的思路,就是进行一个while循环。比如:…

Unity资源加密解决方案

据统计,全球范围内超过50%的游戏均使用Unity创作而成,作为游戏开发市场第一大游戏引擎占有者,Unity已经全面覆盖到各个游戏平台。 全球游戏引擎市场占有率 由于体量庞大,Unity游戏已成为受游戏黑灰产攻击的重灾区,因游…

rider 缺少iisexpress

File C:/Program Files (x86)/IIS Express/iisexpress.exe doesn’t exist iisexpress下载 64位系统只能安装64位,32位系统安装32位 安装完成之后就有了

【Java程序设计】【C00284】基于Springboot的校园疫情防控管理系统(有论文)

基于Springboot的校园疫情防控管理系统(有论文) 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的校园疫情防控系统 本系统分为系统功能模块、管理员功能模块以及学生功能模块。 系统功能模块:在系统首页可以查…

【无刷电机学习】各种电机优势比较

目录 0 参考出处 1 有刷与无刷比较 2 交流与直流比较 3 内转子与外转子比较 4 Delta型与Y型定子绕向比较 5 低压BLDC的一些优点 0 参考出处 【仅作自学记录,不出于任何商业目的。如有侵权,请联系删除,谢谢!】 维基百科…

个人简历补充

个人简历补充 1.对工作的认识2.八股文和知识面3.框架/架构角度深扒3.1 前端3.1.1 mPaaS(移动领域)3.1.2 普通前端项目框架3.1.3 微前端 3.2 后端 持续更新 1.对工作的认识 2.八股文和知识面 前端(基础知识 / 开发能力 / 总结输出能力&#xf…

易宝OA DownloadFile 任意文件读取漏洞复现

0x01 产品简介 易宝OA系统是一种专门为企业和机构的日常办公工作提供服务的综合性软件平台,具有信息管理、 流程管理 、知识管理(档案和业务管理)、协同办公等多种功能。 0x02 漏洞概述 易宝OA系统DownloadFile接口处存在任意文件读取漏洞,未授权的攻击者可以利用此漏洞…

knife4j springboot3使用

简介 在日常开发中,写接口文档是我们必不可少的,而Knife4j就是一个接口文档工具,可以看作是Swagger的升级版,但是界面比Swagger更好看,功能更丰富 使用 我使用的是springboot3.2.3 knife4j 4.3.0,knife4j 4.4版本有…

Windows 远程控制 Mac 电脑怎么操作

要从 Windows 远程控制 Mac 电脑,您可以使用内置 macOS 功能或第三方软件解决方案。以下是一些方法: 一、使用内置 macOS 功能(屏幕共享) 1、在 macOS 上启用屏幕共享 转至系统偏好设置 > 共享;选中“屏幕共享”…

Linux--自定义shell

shell shell就是操作系统提供给用户与操作系统进行交互的命令行界面。它可以理解为一个用户与操作系统之间的接口,用户可以通过输入命令来执行各种操作,如文件管理、进程控制、软件安装等。Shell还可以通过脚本编程实现自动化任务。 常见的Unix系统中使…