八、发布确认高级

news2025/1/22 17:45:14

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复

如何才能进行 RabbitMQ 的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢:

一、发布确认 springboot 版本

1) 确认机制方案

  • 生产者发送消息时,将消息进行备份(例如放到缓存中)
  • 当交换机收到消息,通过回调函数从备份中清除掉已经收到的消息
  • 通过定时任务将未发送的消息重新投递
    在这里插入图片描述

2)代码架构图
在这里插入图片描述

3)配置文件添加开启发布确认模式
在配置文件当中需要添加:spring.rabbitmq.publisher-confirm-type=correlated

该配置有三种:

  • NONE:禁用发布确认模式,是默认值
  • CORRELATED:发布消息成功到交换器后会触发回调方法
  • SIMPLE: 【自己没有试过】
spring:
  rabbitmq:
    host: 192.168.126.10
    port: 5672
    username: admin
    password: 123
    publisher-confirm-type: none

4)消息生产者

@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MyCallBack myCallBack;

    //依赖注入 rabbitTemplate 之后再设置它的回调对象
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(myCallBack);
    }

    @GetMapping("sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        //指定消息 id 为 1
        CorrelationData correlationData1 = new CorrelationData("1");
        String routingKey = "key1";

        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);
        CorrelationData correlationData2 = new CorrelationData("2");
        routingKey = "key2";

        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData2);
        log.info("发送消息内容:{}", message);
    }

}

5)发布确认回调接口

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if(ack) {
            log.info("交换机已经收到id为{}的消息",id);
        } else {
            log.error("交换机还未收到id为{}的消息,原因{}",id,cause);
        }

    }
}

6)消息消费者

@Component
@Slf4j
public class ConfirmConsumer {
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveMsg(Message message) {
        String msg = new String(message.getBody());
        log.info("接受到队列 confirm.queue 消息:{}", msg);
    }
}

7)测试
http://localhost:8080/confirm/sendMessage/aaaaa
在这里插入图片描述
可以看到,发送了两条消息,第一条消息的 RoutingKey 为 “key1”,第二条消息的RoutingKey 为"key2",两条消息都成功被交换机接收,也收到了交换机的确认回调

但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所以第二条消息被直接丢弃了

如果把配置改为
在这里插入图片描述
结果为 : 发布发布确认的回调函数没起作用
在这里插入图片描述

二、回退消息

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息也被生产者捕获到

1、 Mandatory 参数

通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者

2、 消息生产者代码

@Component
@RequestMapping("/confirm2")
@Slf4j
public class MessageProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
        rabbitTemplate.setConfirmCallback(this);
        /**
         * true:
         * 交换机无法将消息进行路由时,会将该消息返回给生产者
         * false:
         * 如果发现消息无法进行路由,则直接丢弃
         */
        rabbitTemplate.setMandatory(true);
        //设置回退消息交给谁处理
        rabbitTemplate.setReturnCallback(this);
    }

    @GetMapping("sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        //指定消息 id 为 1
        CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());
        String routingKey = "key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);
        log.info("发送消息 id 为:{}内容为{}",correlationData1.getId(),message+"key1");

        CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());
        routingKey = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData2);
        log.info("发送消息 id 为:{}内容为{}",correlationData2.getId(),message+"key2");
    }


    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机收到消息确认成功, id:{}", id);
        } else {
            log.error("消息 id:{}未成功投递到交换机,原因是:{}", id, cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String
            exchange, String routingKey) {
        log.info("消息:{}被服务器退回,退回原因:{}, 交换机是:{}, 路由 key:{}",
                new String(message.getBody()), replyText, exchange, routingKey);
    }
}

测试: 可以看到,当消息无法被路由时,该消息会被通知到消息生产者
在这里插入图片描述

三、备份交换机

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。

但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增
加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?

在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警

1、备份交换机使用架构图

在这里插入图片描述

2、配置类

@Configuration
public class BackupExchangeConfig {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    //声明备份 Exchange
    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }
    //声明确认 Exchange 交换机的备份交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        ExchangeBuilder exchangeBuilder =
                ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                        .durable(true)
                        //设置该交换机的备份交换机
                        .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
        return (DirectExchange)exchangeBuilder.build();
    }

    // 声明确认队列
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    //声明确认队列绑定关系
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
                                @Qualifier("confirmExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("key1");
    }

    // 声明警告队列
    @Bean("warningQueue")
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    // 声明报警队列绑定关系
    @Bean
    public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
                                  @Qualifier("backupExchange") FanoutExchange
                                          backupExchange){
        return BindingBuilder.bind(queue).to(backupExchange);
    }

    // 声明备份队列
    @Bean("backQueue")
    public Queue backQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    // 声明备份队列绑定关系
    @Bean
    public Binding backupBinding(@Qualifier("backQueue") Queue queue,
                                 @Qualifier("backupExchange") FanoutExchange backupExchange){
        return BindingBuilder.bind(queue).to(backupExchange);
    }

}

3、报警消费者

@Component
@Slf4j
public class WarningConsumer {
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    @RabbitListener(queues = WARNING_QUEUE_NAME)
    public void receiveWarningMsg(Message message) {
        String msg = new String(message.getBody());
        log.error("报警发现不可路由消息:{}", msg);
    }
}

4、测试

http://localhost:8080/confirm2/sendMessage/aaaaa
在这里插入图片描述
发现:未被路由的消息确实经过备份交换机投递到 warning 队列

【注意】mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/404520.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

20230311给Ubuntu18.04下的GTX1080M安装驱动

20230311给Ubuntu18.04下的GTX1080M安装驱动 2023/3/11 12:50 2. 安装GTX1080驱动 安装 Nvidia 驱动 367.27 sudo add-apt-repository ppa:graphics-drivers/ppa 第一次运行出现如下的警告: Fresh drivers from upstream, currently shipping Nvidia. ## Curren…

C++语法规则4(C++面向对象)

接口(抽象类) 接口描述了类的行为和功能,而不需要完成类的特定实现。C 接口是使用抽象类来实现的,抽象类与数据抽象互不混淆,数据抽象是一个把实现细节与相关的数据分离开的概念。 如果类中至少有一个函数被声明为纯虚…

在Win 11下使用Visual Studio 2019和cygwin编译JBR(Java SDK 17)源码

很多文章介绍了JDK 8和JDK11源码在Linux编译,很少有人介绍了JDK 17在windows的编译过程,所以写了这篇文章,为什么选用JBR 17版本,因为JBR17 版本集成了HotSwapAgent功能,具体HotSwapAgent有什么用,请看我前…

[N1CTF 2018]eating_cms1

一个cms,先打开环境试了一下弱口令,无效,再试一下万能密码,告诉我有waf,先不想怎么绕过,直接开扫(信息收集)访问register.php注册一个账号进行登录上面的链接尝试用php读文件http://…

学习笔记:基于SpringBoot的牛客网社区项目实现(三)之MyBatis入门

一、数据库建表 二、entity目录下创建user实体类 三、dao目录下创建userMapper映射接口 Mapper public interface UserMapper {User selectById(int id);User selectByName(String username);User selectByEmail(String email);int insertUser(User user);int updateStatus(i…

tun驱动之read

从tun驱动读取的数据,最终来源于用户空间通过write写入的数据,如下所示: inti fd socket(); int f open("/dev/net/tun", O_RDWR) write(fd, buf, len); --> 协议栈 --> t…

3-MATLAB APP Design-切换按钮组和单选按钮组

一、APP 界面设计展示 1.新建一个空白的APP,在此次的学习中,我们会用到编辑字段(文本框)、切换按钮、单选按钮,首先在界面中拖入一个编辑字段(文本框),在文本框中输入内容:切换按钮和单选按钮的使用,调整背景颜色,字体的颜色为黑色,字体的大小调为26. 2.在左侧组件…

Linux各种发行版介绍

Linux已经被广泛应用在人们的日常生活工作用品中,比如手机,智能家居,汽车电子,可穿戴设备等等,只不过很多人并不知道自己使用的电子设备里面运行的是linux系统。看一组数据:1.90%的公有云应用在使用Linux系…

利用Dockerfile开发定制镜像实战.

Dockerfile的原理 dockerfile是一种文本格式的文件,用于描述如何构建Docker镜像。在Dockerfile中,我们可以定义基础镜像、安装依赖、添加文件等操作,最终生成一个可以直接运行的容器镜像。 Dockerfile的原理可以分为以下几个步骤&#xff1a…

如何快速为子公司创建SAP财务账套的操作步骤

相对来说在SAP上配置一家子公司比从0开始创建创建一家公司可以节省很多步骤,因为子公司的很多配置(如科目表,科目,折旧表,折旧代码等)可以沿用母公司的。本文就简单介绍一下创建子公司财务账套的配置步骤.只…

中国省市选择插件

快速使用 1.引用 ChineseCities.min.js 2.拷贝以下布局结构 <select id"province"><option value"请选择城市">请选择省份</option> </select> <select id"city"><option value"请选择城市">请…

无监督对比学习(CL)最新必读经典论文整理分享

对比自监督学习技术是一种很有前途的方法&#xff0c;它通过学习对使两种事物相似或不同的东西进行编码来构建表示。Contrastive learning有很多文章介绍&#xff0c;区别于生成式的自监督方法&#xff0c;如AutoEncoder通过重建输入信号获取中间表示&#xff0c;Contrastive M…

设备树下的LED灯

一、什么是设备树设备树&#xff0c;将这个词分开就是设备和树&#xff0c;描述设备树的文件叫DTS(Device Tree Source)&#xff0c;这个DTS文件采用树形结构描述板级设备&#xff0c;也就是开发板上的设备信息&#xff0c;比如CPU数量、内存基地址、IIC接口上接了哪些设备、SP…

进度计划:什么是关键路径管理 1/2

目录 引言 什么是关键路径法&#xff1f; 为什么 CPM 调度对项目管理很重要&#xff1f; CPM 计划元素 关键路径方如何工作&#xff1f; 引言 关键路径&#xff0c;也称为最长路径&#xff0c;是直接影响项目完成日期的一系列任务。关键路径上的每项任务都称为关键活动。…

蓝桥杯C/C++VIP试题每日一练之芯片测试

💛作者主页:静Yu 🧡简介:CSDN全栈优质创作者、华为云享专家、阿里云社区博客专家,前端知识交流社区创建者 💛社区地址:前端知识交流社区 🧡博主的个人博客:静Yu的个人博客 🧡博主的个人笔记本:前端面试题 个人笔记本只记录前端领域的面试题目,项目总结,面试技…

【C++学习】【STL】deque容器

dequeDouble Ended Queues(双向队列)deque和vector很相似&#xff0c;但是它允许在容器头部快速插入和删除&#xff08;就像在尾部一样&#xff09;。所耗费的时间复杂度也为常数阶O(1)。并且更重要的一点是&#xff0c;deque 容器中存储元素并不能保证所有元素都存储到连续的内…

kubernetes实战与源码学习

1.1 关于Kubernetes的介绍与核心对象概念 关于Kubernetes的介绍与核心对象概念-阿里云开发者社区 k8s架构 核心对象 使用kubeadm10分钟部署k8集群 使用 KuboardSpray 安装kubernetes_v1.23.1 | Kuboard k8s-上部署第一个应用程序 Deployment基本概念 给应用添加service&a…

自组织(Self-organization),自组织临界性(Self-organized criticality)

文章目录1. 自组织1.1 概述1.2 原则1.3 历史1.4 按领域1.4.1 物理1.4.2 化学1.4.3 生物学1.4.4 宇宙学1.4.5 计算机科学1.4.6 控制论1.4.7 社会学1.4.8 经济学1.4.9 运输1.4.10 语言学1.4.11 研究1.5 自发秩序&#xff08;Spontaneous order&#xff09;1.5.1 历史2. 自组织临界…

【LeetCode每日一题】——671.二叉树中第二小的节点

文章目录一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【解题思路】七【题目提示】八【时间频度】九【代码实现】十【提交结果】一【题目类别】 深度优先搜索 二【题目难度】 简单 三【题目编号】 671.二叉树中第二小的节点 四【题目描述】…

PMP项目管理项目进度管理

目录1 项目进度管理概述2 规划进度管理3 定义活动4 排列活动顺序5 估算活动持续时间1 项目进度管理概述 项目进度管理包括为管理项目按时完成所需的各个过程。在工作分解结构的基础上&#xff0c;针对交付工作包的需要&#xff0c;列出为完成项目而必须进行的活动工作&#xf…