RabbitMQ ---- 发布高级确认

news2025/3/12 13:31:19

RabbitMQ ---- 发布高级确认

  • 1. 发布确认 springboot 版本
    • 1.1 确认机制方案
    • 1.2 代码架构图
    • 1.3 配置文件
    • 1.4 添加配置类
    • 1.5 消息生产者
    • 1.6 回调接口
    • 1.7 消息消费者
    • 1.8 结果分析
  • 2. 回退消息
    • 2.1 Mandatory 参数
    • 2.2 回调接口
    • 2.3 结果分析
  • 3. 备份交换机
    • 3.1 代码架构图
    • 3.2 修改配置类
    • 3.3 报警消费者
    • 3.4 测试注意事项
    • 3.5 结果分析

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?

1. 发布确认 springboot 版本

1.1 确认机制方案

在这里插入图片描述

1.2 代码架构图

在这里插入图片描述

1.3 配置文件

在配置文件中添加

spring.rabbitmq.publisher-confirm-type=correlated
  • NONE: 禁用发布确认,是默认值
  • CORRELATED: 发布消息成功到交换器后会触发回调方法
  • SIMPLE: 经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker

在这里插入图片描述

1.4 添加配置类

/**
 * 配置类
 * @author dell
 * @date 2023/7/12 11:30
 */

@Configuration
public class ConfirmConfig {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    // 声明业务 Exchange
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    // 声明确认队列
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    // 声明确认队列绑定关系
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("key1");
    }

}

1.5 消息生产者

/**
 * 消息生产者
 * @author dell
 * @date 2023/7/12 11:37
 */

@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

    @Resource
    private RabbitTemplate rabbitTemplate;

    private MyCallBack myCallBack;

    // 依赖注入 rabbitTemplate 之后再设置它的回调对象
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(myCallBack);
    }

    @GetMapping("sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        // 指定消息 id 为 1
        CorrelationData correlationData1 = new CorrelationData("1");
        String  routingKey = "key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);
        CorrelationData correlationData2 = new CorrelationData("2");
        routingKey = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData2);
        log.info("发送消息内容: {}", message);
    }

}

1.6 回调接口

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
    /**
     * 交换机不管是否收到消息的一个回调方法
     * CorrelationData 消息相关数据
     * ack 交换机是否收到消息
     * cause 为收到消息的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
        }
    }
}

1.7 消息消费者

/**
 * 消息消费者
 * @author dell
 * @date 2023/7/12 11:48
 */

@Component
@Slf4j
public class ConfirmConsumer {

    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    
    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveMsg(Message message) {
        String msg = new String(message.getBody());
        log.info("接受到队列 confirm.queue 消息: {}", msg);
    }

}

1.8 结果分析

在这里插入图片描述

可以看到,发送了两条消息,第一条消息的 RoutingKey 为 “key1”,第二条消息的 RoutingKey 为 “key2”,两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条小的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能够接收这个消息,所以第二条消息被直接丢弃了。

2. 回退消息

2.1 Mandatory 参数

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

2.2 回调接口

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     * 交换机不管是否收到消息的一个回调方法
     * CorrelationData 消息相关数据
     * ack 交换机是否收到消息
     * cause 为收到消息的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
        }
    }

    // 当消息无法路由的时候的回调方法
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}",new String(returnedMessage.getMessage().getBody()), returnedMessage.getExchange(), returnedMessage.getReplyText(), returnedMessage.getRoutingKey());
    }

}

2.3 结果分析

在这里插入图片描述

3. 备份交换机

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

3.1 代码架构图

在这里插入图片描述

3.2 修改配置类

/**
 * 配置类
 *
 * @author dell
 * @date 2023/7/12 11:30
 */

@Configuration
public class ConfirmConfig {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    // 声明确认队列
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    // 声明确认队列绑定关系
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("key1");
    }

    // 声明备份 Exchange
    @Bean("backupExchange")
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    // 声明确认 Exchange 交换机的备份交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                .durable(true)
                // 设置该交换机的备份交换机
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
        return (DirectExchange) exchangeBuilder.build();
    }

    // 声明警告队列
    @Bean("warningQueue")
    public Queue warningQueue() {
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    // 声明报警队列绑定关系
    @Bean
    public Binding warningBinding(@Qualifier("warningQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(queue).to(backupExchange);
    }

    // 声明备份队列
    @Bean("backQueue")
    public Queue backQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    // 声明备份队列绑定关系
    @Bean
    public Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(queue).to(backupExchange);
    }
    
}

3.3 报警消费者

/**
 * 报警消费者
 * @author dell
 * @date 2023/7/13 0:07
 */

@Component
@Slf4j
public class WarningConsumer {

    public static final String WARNING_QUEUE_NAME = "warning.queue";

    @RabbitListener(queues = WARNING_QUEUE_NAME)
    public void receiveWarningMsg(Message message) {
        String msg = new String(message.getBody());
        log.error("报警发现不可路由消息:{}", msg);
    }

}

3.4 测试注意事项

重启项目的时候需要把原来的 confirm.exchange 删除因为我们修改了其绑定属性,不然会报错

在这里插入图片描述

3.5 结果分析

在这里插入图片描述

mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/761905.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CAD可以转换成PDF吗?教你简单好用的转换方法

PDF格式是一种通用格式,可以在不同的设备和操作系统上轻松打开和查看,这使得共享和协作变得更加容易和高效。尤其是在远程工作的情况下,PDF格式能够让团队成员更方便地分享和合作,不受地理位置和设备的限制。那么怎么将CAD文件转换…

7. Java + Selenium 环境搭建

前提:Java 版本最低要求为 8;推荐使用 chrome 浏览器 chrome Java 1. 下载 chrome 浏览器(推荐) 2. 查看 chrome 浏览器版本 重点记住前两位即可。 3. 下载 chrome 浏览器驱动 下载链接: https://chromedriver.…

IPD跟敏捷、DevOps一样吗?有什么区别?

1992年在激烈的全球市场竞争下,IBM遭遇到了严重的财政困难,公司销售收入停止增长,利润急剧下降。经过内部分析,IBM发现他们在研发费用、研发损失费用和产品上市时间等几个方面远远落后于业界最佳。为了重新获得市场竞争优势&#…

SpringBoot源码分析(6)--SpringBootExceptionReporter/异常报告器

文章目录 一、前言二、异常报告器介绍2.1、作用2.2、接口定义2.3、FailureAnalyzer错误分析器2.4、FailureAnalysisReporter错误报告器 三 、SpringBootExceptionReporter源码分析四、shutdownHook介绍4.1、背景4.2、什么是Shutdown Hook4.3、什么时候会调用Shutdown Hook4.4、…

MYSQL 5.7.17 安装版 的配置文件

解压版解压后都有 my.ini配置文件,安装版要查找这个配置文件可以查看 MYSQL Workbench --> 左侧 INSTANCE --> Options File ,然后可以看到底部 Configuration File所处的位置,即为my.ini的路径。

医疗设备如何保障?蓄电池自动监测,简直太牛了!

蓄电池监控在医院中扮演着重要的角色,确保在电力故障或断电时医院能够继续供电,保障医疗设备和关键系统的正常运行。 通过监测蓄电池的状态、充电状态和容量,以及触发警报和提醒,监控系统可以提前发现蓄电池的故障或异常情况&…

计算机网络 day8 动态路由 - NAT - SNAT实验 - VMware的网卡的3种模式

目录 动态路由:IGP 和 EGP 参考网课:4.6.1 路由选择协议概述_哔哩哔哩_bilibili ​编辑 IGP(Interior Gateway Protocol)内部网关协议: EGP(Interior Gateway Protocol)外部网关协议&#x…

专精特新如何养成?先搞清楚成长路径和核心能力激活高质量发展!

头雁勤,群雁便能“春风一夜到衡阳”。群雁齐飞,最重要的是头雁引领。 当前加快中小企业数字化转型正当其时,“专精特新”企业势必将肩负起“领头雁”之任,为中小企业转型发展做出表率。 装备制造业 专精特新“主力军” 纵观目前…

SpringBoot Data JPA 集成多租户

背景: ​ iot-kit项目用的是SpringBoot JPA,不是Mybatis,项目中需要引入多租户。 文章中心思想: 通过Hibernate Filters 和AspectJ 切面编程,实现SpringBoot JPA多租户 什么是多租户 ​ 多租户我理解就是一个网站允…

【EXCEL】通过url获取网页表格数据

目录 0.环境 1.背景 2.具体操作 0.环境 windows excel2021 1.背景 之前我用python的flask框架的爬虫爬取过豆瓣网的电影信息,没想到excel可以直接通过url去获取网页表格内的信息,比如下图这是电影信息界面 即将上映电影 (douban.com) 通过excel操作&…

Cache——让CPU更快地执行你的代码

概要 Cache对性能的影响 首先我们要知道,CPU访问内存时,不是直接去访问内存的,而是先访问缓存(cache)。 当缓存中已经有了我们要的数据时,CPU就会直接从缓存中读数据,而不是从内存中读。 CPU…

Python基础编程案例之编写交互式博客系统

文章目录 1、博客系统的需求描述2、面向用户层面各功能的设计思路与代码编写2.1.定义文章库2.2.文章的发布2.3.删除文章2.4.修改文章的标题以及内容2.5.在评论区添加评论2.6.删除文章中的某条评论2.7.阅读文章2.8.对文章进行点赞2.9.对文章进行收藏2.10.对文章进行打赏2.11.查询…

WorkPlus AI助理:结合ChatGPT对话能力与企业数据,助力企业级AI构建!

WorkPlus AI助理是基于GPT和私有数据构建智能知识库和个性化AI,能够帮助企业生成博客、白皮书、社交媒体帖子、新闻稿等等,这些内容可以用于推广产品、服务,增强品牌形象和知名度。此外,利用WorkPlus AI助理还可以生成电子邮件、利…

基于linux串口实现语音刷抖音

目录 1.开发逻辑图及模块 2.编程实现语音和开发板通信 3.手机接入Linux热拔插相关,打开手机开发者模式允许USB调试 4.用shell指令来操作手机屏幕,模拟手动滑屏幕 5.最终主程序代码 1.开发逻辑图及模块 逻辑图: 模块 (1)语音…

读kafka生产端源码,窥kafka设计之道(上)

1. kafka 高吞吐之道-------异步提交批量发送 简约的发送接口----后面隐藏着并不简单的设计 kafka发送消息的接口非常简约,在简约的表面上,其背后却并不简单。先看下发送接口 kafkaProducer.send(new ProducerRecord(topic,msg), new Callback() {Ove…

8、链路层以太网协议,ARP协议32

网络层IP协议描述了通信中的起点到终点,但是数据不是飞过去的,是经过了大量的中间节点转发完成的。 一、以太网协议 1、MAC地址 物理硬件地址,是每一块网卡在出厂时设定的地址,固定且不可修改(早期,现在可…

当DevOps遇到AI,黑马迎来3.0时代丨IDCF

随着GhatGPT的爆火,人工智能和研发效能,无疑成为了2023的两个最重要的关键词。大规模语言模型LLM和相关应用的快速发展正在对研发团队的工作方式产生深远影响,这几乎象征着新的生产力革命的到来。 那么,作为一名工程师&#xff0…

Chat GPT是什么,初学者怎么使用Chat GPT,需要注意些什么

目录 Chat GPT是什么 初学者怎么使用Chat GPT 使用Chat GPT需要注意什么 一些简单的prompt示例 Chat GPT是什么 Chat GPT是由OpenAI开发的一种大型语言模型,它基于GPT(Generative Pre-trained Transformer)架构。GPT是一种基于深度学习的…

【Matlab】智能优化算法_遗传算法GA

【Matlab】智能优化算法_遗传算法GA 1.背景介绍2.数学模型3.文件结构4.详细代码及注释4.1 crossover.m4.2 elitism.m4.3 GeneticAlgorithm.m4.4 initialization.m4.5 Main.m4.6 mutation.m4.7 selection.m4.8 Sphere.m 5.运行结果6.参考文献 1.背景介绍 遗传算法(Ge…

(学习笔记)TCP 为什么是三次握手?不是两次、四次?

常规回答:“因为三次握手才能保证双方具有接收和发送的能力” 原因一:避免历史连接 三次握手的首要原因是为了防止旧的重复连接初始化造成混乱。 假设:客户端先发送了SYN(seq90)报文,然后客户端宕机了,而且这个SYN报…