RabbitMQ - 发布确认高级

news2024/11/29 11:36:34

RabbitMQ - 发布确认高级

  • 发布确认 springboot 版本
  • 回退消息
  • 备份交换机

在生产环境中由于一些不明原因,导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?

发布确认 springboot 版本

确认机制方案:
在这里插入图片描述
代码架构图:
RabbitMQ-00000069
在配置文件当中需要添加

spring.rabbitmq.publisher-confirm-type=correlated
  • NONE 值是禁用发布确认模式,是默认值
  • CORRELATED 值是发布消息成功到交换器后会触发回调方法
  • SIMPLE 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;

代码

1、添加配置类:

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    //声明业务 Exchange
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    // 声明确认队列
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    // 声明确认队列绑定关系
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
                                @Qualifier("confirmExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("key1");
    }
}

2、消息生产者的回调接口

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
    /**
     * 交换机不管是否收到消息的一个回调方法
     *
     * @param correlationData 消息相关数据
     * @param ack             交换机是否收到消息
     * @param cause           为收到消息的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
            log.info("交换机还未收到 id 为:{}消息,原因:{}", id, cause);
        }
    }
}

3、消息生产者

@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MyCallBack myCallBack;

    //依赖注入 rabbitTemplate 之后再设置它的回调对象
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(myCallBack);
    }
    
    /**
     * 消息回调和退回
     *
     * @param message
     */
    @GetMapping("sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {

        //指定消息 id 为 1
        CorrelationData correlationData1 = new CorrelationData("1");
        String routingKey = "key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);
        log.info(routingKey + "发送消息内容:{}", message + routingKey);

        CorrelationData correlationData2 = new CorrelationData("2");
        routingKey = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData2);
        log.info(routingKey + "发送消息内容:{}", message + routingKey);
    }
}

4、消息消费者

@Component
@Slf4j
public class ConfirmConsumer {
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveMsg(Message message) {
        String msg = new String(message.getBody());
        log.info("接受到队列 confirm.queue 消息:{}", msg);
    }

}

访问: http://localhost:8080/confirm/sendMessage/你好

结果分析:
在这里插入图片描述
可以看到,发送了两条消息,第一条消息的 RoutingKey 为 “key1”,第二条消息的 RoutingKey 为 “key2”,两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。

丢弃的消息交换机是不知道的,需要解决告诉生产者消息传送失败

回退消息

Mandatory 参数

rabbitTemplate.setReturnsCallback(myCallBack);

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

1、修改配置

#消息退回
spring.rabbitmq.publisher-returns=true

2、修改回调接口

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    /**
     * 交换机不管是否收到消息的一个回调方法
     *
     * @param correlationData 消息相关数据
     * @param ack             交换机是否收到消息
     * @param cause           为收到消息的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
            log.info("交换机还未收到 id 为:{}消息,原因:{}", id, cause);
        }
    }

    //当消息无法路由的时候的回调方法
    @Override
    public void returnedMessage(ReturnedMessage returned) {

        log.error("消息:{},被交换机 {} 退回,原因:{},路由key:{},code:{}",
                new String(returned.getMessage().getBody()), returned.getExchange(),
                returned.getReplyText(), returned.getRoutingKey(),
                returned.getReplyCode());

    }
}

低版本可能没有 RabbitTemplate.ReturnsCallback 请用 RabbitTemplate.ReturnCallback

@Override
public void returnedMessage(Message message, int replyCode, String replyText, String
exchange, String routingKey) {
	log.info("消息:{}被服务器退回,退回原因:{}, 交换机是:{}, 路由 key:{}",new String(message.getBody()),replyText, exchange, routingKey);
}

3、修改发送者 ProducerController

//依赖注入 rabbitTemplate 之后再设置它的回调对象
@PostConstruct
public void init() {
    //消息回调
    rabbitTemplate.setConfirmCallback(myCallBack);
    /**
     * true:交换机无法将消息进行路由时,会将该消息返回给生产者
     * false:如果发现消息无法进行路由,则直接丢弃
     */
    rabbitTemplate.setMandatory(true);
    //设置回退消息交给谁处理
    rabbitTemplate.setReturnsCallback(myCallBack);
  	//RabbitMQ版本低的是 rabbitTemplate.setReturnCallback(myCallBack);

}

访问: http://localhost:8080/confirm/sendMessage/你好

结果分析:
在这里插入图片描述

备份交换机

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?

前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。 在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。

什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进 入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

代码架构图
在这里插入图片描述
1、修改配置类

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    //关于备份的
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    public static final String WARNING_QUEUE_NAME = "warning.queue";


    /*
    //声明业务 Exchange
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }
    */

    // 声明确认队列
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    // 声明确认队列绑定关系
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
                                @Qualifier("confirmExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("key1");
    }

    //************************以下是关于备份的******************************

    //声明备份 Exchange
    @Bean("backupExchange")
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    //声明确认 Exchange 交换机的备份交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                .durable(true)
                //设置该交换机的备份交换机
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
        return exchangeBuilder.build();
    }


    // 声明警告队列
    @Bean("warningQueue")
    public Queue warningQueue() {
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    // 声明报警队列绑定关系
    @Bean
    public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
                                  @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(queue).to(backupExchange);
    }

    // 声明备份队列
    @Bean("backQueue")
    public Queue backQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    // 声明备份队列绑定关系
    @Bean
    public Binding backupBinding(@Qualifier("backQueue") Queue queue,
                                 @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(queue).to(backupExchange);
    }
}

2、报警消费者

@Component
@Slf4j
public class WarningConsumer {
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    @RabbitListener(queues = WARNING_QUEUE_NAME)
    public void receiveWarningMsg(Message message) {
        String msg = new String(message.getBody());
        log.error("报警发现不可路由消息:{}", msg);
    }
}

之前已写过 confirm.exchange 交换机,由于更改配置,需要删掉,不然会报错
在这里插入图片描述

访问: http://localhost:8080/confirm/sendMessage/你好
在这里插入图片描述

mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/626299.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Steemit 会颠覆 Quora/知乎 甚至 Facebook 吗?

Steemit是基于区块链技术的社交媒体平台,其独特的激励机制吸引了众多用户。然而,是否能够真正颠覆Quora、知乎甚至Facebook这些已经成为社交巨头的平台,仍然存在着许多未知因素。本文将探讨Steemit的优势和挑战,以及其在社交领域中…

数据分析第15课pandas和matplotlib实战

01实战:911 导入: 读取数据: 转换类型:

java html导出添加空行和空格

情景: 要求导出签批单: 格式如下, 要获取“主办处室负责人”和“相关处室会签”环节的处理意见、处理人员和处理日期进行替换,导出word文档。 处理: 主要是如何拼接内容? 方法一: 导出word&…

爬虫如何发送 HTTP 请求

爬虫可以使用 Python 中的 requests 库来发送 HTTP 请求。requests 库提供了简单易用的 API,可以方便地发送 GET、POST 等请求,并且支持设置请求头、请求参数、代理等功能。 以下是一个使用 requests 库发送 GET 请求的示例代码: import re…

国产触控笔哪个牌子好?第三方电容笔推荐

对于那些把iPad当做学习工具的人来说,这已经成为了他们生活中不可缺少的一部分。但没有人买得起苹果原装电容笔,因为苹果电容笔的售价太贵了。因此,最好还是用一支普通的电容笔。我是个一个苹果粉,同时也是个数字发烧友&#xff0…

十年之约 记账表格(会员专享)

* * * 原创:刘教链 * * * 6号,教链发起了十年之约加密投资实证计划。 很多读者、会员纷纷同行。 有朋友问及教链所用记账表格。可根据文章中的介绍自制。为方便会员,教链已将该表格上传至刘教链的加密投资星球,供下载。 另外&…

喜报 | 小米智能语音通讯技术获“深圳人工智能行业应用奖”

日前,2022年度第二届“深圳人工智能奖”正式揭晓。 “小米智能语音技术在手机实时通信中的应用”项目,凭借丰硕的创新成果、广泛的应用场景,获得“深圳人工智能行业应用奖”! “深圳人工智能行业应用奖”旨在表彰人工智能行业应用…

springcloud使用nacos搭建注册中心

nacos安装这里就不细说了,(Nacos下载以及搭建环境_你非柠檬为何心酸142的博客-CSDN博客) 大家也可以去网上安装好,这里主要讲搭建 ,我们需要手动启动nacos, 输入(.\startup.cmd -m standalone),出现一下图标就代表ok 首先是父工程所需要的依…

Java009——Java数据类型变量的简单认识

一、Java数据类型 围绕以下3点学习: 1、什么是Java数据类型? 2、Java数据类型的作用? 3、Java有哪些数据类型? 4、熟悉Java8大基本数据类型 1.1、什么是Java数据类型? 当我们写Java代码时,需要把数据保存…

react设计模式,jsx

1.修改配置项: eject:暴露配置项!!!一旦暴露了就无法还原回去 会报错。这个错误:我们刚才把代码改了,在暴露之前,先让我们把代码提交到git历史区保留下来—防止暴露后的代码覆盖了我们该的代码 …

02_MySQL的索引结构

1. BTree索引 B-Tree即B树,Balance Tree,平衡树,它的高度远小于平衡二叉树的高度。2-3树是最简单的B树结构。B树的阶:节点的最多子节点个数。比如2-3树的阶是3,2-3-4树的阶是4。 1.1 初始化介绍 一颗b树,浅蓝色的块我们…

什么是AI业务流程质检,如何用它做好销售和服务过程监督

近几年,随着语音转写、语义理解和机器学习等技术的成熟,越来越多的企业开始部署基于AI技术的智能质检系统,来帮助坐席、销售和服务团队提高沟通质量管理能力,同时提升沟通中的客户体验。 不过,不论是最初的人工质检&a…

【Word技巧】打印部分内容或者隐藏不打印的内容,如何操作?

在工作中,我们经常需要打印各种Word文档,但有时候,我们只需要打印文档的其中一部分内容,或者有部分内容并不想打印出来,要如何操作呢? 还不了解的小伙伴,可以看看下面的方法哦。 一、设置打印其…

node.js+vue药品药店进销存管理系统jb526

开发语言 node.js 框架:Express 前端:Vue.js 数据库:mysql 数据库工具:Navicat 开发软件:VScode 重点研究的,关键的问题: (1)业务流程; (2)前台…

广发证券传媒互联网首席分析师旷实:大模型引发的创新浪潮不会很快结束丨数据猿专访...

‍数据智能产业创新服务媒体 ——聚焦数智 改变商业 AI大模型引爆了今年一季度的热点。 今年春节期间,来自微软投资的OpenAI旗下产品ChatGPT成为科技行业关注焦点,出现即推热了市场情绪。随后,国内百度首发文心一言,阿里、华为、…

DevExpress WPF功能区控件,更轻松创建应用工具栏!(上)

DevExpress WPF的Ribbon、Toolbar和Menus组件以Microsoft Office为灵感,针对WPF开发人员进行了优化,可帮助您在段时间内模拟当今最流行的商业生产力应用程序。 DevExpress WPF拥有120个控件和库,将帮助您交付满足甚至超出企业需求的高性能业…

深度学习应用篇-计算机视觉-语义分割综述[6]:DeepLab系列简介、DeepLabV3深入解读创新点、训练策略、主要贡献

【深度学习入门到进阶】必看系列,含激活函数、优化策略、损失函数、模型调优、归一化算法、卷积模型、序列模型、预训练模型、对抗神经网络等 专栏详细介绍:【深度学习入门到进阶】必看系列,含激活函数、优化策略、损失函数、模型调优、归一化…

MATLAB 之 数值积分和离散傅里叶变换

这里写目录标题 一、数值积分1. 数值积分基本原理2. 数值积分的实现2.1 变步长辛普森法2.2 自适应积分法2.3 高斯——克朗罗德法2.4 梯形积分法2.5 累计梯形积分 3. 多重定积分的数值求解 二、离散傅里叶变换1. 离散傅里叶变换算法简介2. 离散傅里叶变换的实现 一、数值积分 数…

通信算法之167: (低空无人机)机载通信物理层基带算法设计

一.物理层基带仿真 通信系统的链路级仿真主要可以分成5个部分。 1.系统参数 2.发送机算法 3.信道模型 4.接收机算法 5.统计性能 其中主要组成部分很明显是中间三部分,即发送,信道,接收。但系统参数和统计性能这两部分的适当设计会大大…

在webpack中使用Eslint

一、Eslint介绍 要在webpack中使用Eslint首先我们先了解下什么是Eslint 1. 什么是Eslint ESLint是一个用于在JavaScript代码中发现和报告问题的静态代码分析工具。它可以检测常见的编码错误,如拼写错误、变量未声明、使用未定义的变量等,还可以检测代…