RabbitMQ发布确认高级篇(RabbitMQ Release Confirmation Advanced Edition)

news2025/1/8 5:56:50

系统学习消息队列——RabbitMQ的发布确认高级篇

简介

‌RabbitMQ是一个开源的消息代理软件,实现了‌高级消息队列协议(AMQP)‌,主要用于在分布式系统中进行消息传递。RabbitMQ由‌‌Erlang语言编写,具有高性能、健壮性和可伸缩性,适用于各种规模的企业应用‌。

基本概念和功能

RabbitMQ作为一个消息中间件,主要功能包括接收和转发消息,支持“生产者-消费者模型”。生产者不断向消息队列中写入消息,而消费者则从队列中读取或订阅消息。RabbitMQ支持多种消息传递模式,如普通模式、工作模式、发布/订阅模式等,以满足不同的应用场景需求‌。

架构和关键组件

RabbitMQ的架构基于生产者-消费者模型,通过队列实现消息的存储和转发。队列具有先进先出(FIFO)的特性,并且可以设置持久化、独占、自动删除等属性。RabbitMQ还引入了交换机和路由键等概念,以实现更灵活和复杂的消息路由和分发机制‌。

应用场景和优势

RabbitMQ广泛应用于需要高并发处理、流量削峰、系统解耦和提高可靠性的场景。其优势包括:

  • ‌高性能‌:RabbitMQ能够处理高并发请求,确保系统的稳定运行。
  • ‌高可靠性‌:通过消息的持久化存储和故障恢复机制,确保消息不会丢失。
  • ‌灵活性‌:支持多种消息传递模式和路由规则,满足复杂的应用需求

1.消息发布确认的方案

2.消息的回退

3.备份交换机

1.消息发布确认的方案
在前面的文章中,系统学习消息队列——RabbitMQ的消息发布确认,我们一定程度上学习了消息的发布确认的基础,但是在生产环境中,由于RabbitMq的重启,RabbitMQ在重启过程中投递失败,导致消息丢失,需要手动处理和恢复。那么我们该如何保证当RabbitMQ不可用的时候,消息的稳定投递呢?

我们采取下面的方案:

我们将要发送消息做一个持久化,发送消息的时候,我们持久化一份到数据库或者缓存中,当发送消息失败的时候,我们进行一次重新发送。所以在发送消息的时候,我们要进行代码业务逻辑的处理:

yml:

server:
port:11000
spring:
rabbitmq:
host:127.0.0.1
port:5672
username:guest
password:guest
publisher-confirm-type:correlated

publisher-confirm-type这个参数一共有三种配置方法:

NONE:
禁用发布确认,是默认值。
CORRELATED:
发布消息后,交换机会触发回调方法。
SIMPLE:
有两种效果:
1:和CORRELATED一样会触发回调方法
2:发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。

回调方法类:

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

/**
     * 交换机是否收到消息的回调方法
     * CorrelationData 消息相关数据
     * ack 交换机是否收到消息
     * cause 交换机未收到消息的原因
     */
@Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("交换机已经收到 id 为:{}的消息", correlationData.getId());
        } else {
log.info("交换机还未收到 id 为:{}消息,由于原因:{}", correlationData.getId(), cause);
        }
    }

}

队列配置类:

@Configuration
publicclassConfirmQueueConfig {

publicstatic final StringCONFIRM_EXCHANGE_NAME = "confirm.exchange";
publicstatic final StringCONFIRM_QUEUE_NAME = "confirm.queue";

@Autowired
privateMyCallBack myCallBack;
@Autowired
privateRabbitTemplate rabbitTemplate;

//依赖注入 rabbitTemplate 之后再设置它的回调对象
@PostConstruct
publicvoidinit() {
        rabbitTemplate.setConfirmCallback(myCallBack);
    }

//声明业务 Exchange
@Bean("confirmExchange")
publicDirectExchangeconfirmExchange(){
returnnewDirectExchange(CONFIRM_EXCHANGE_NAME);
    }

// 声明确认队列
@Bean("confirmQueue")
publicQueueconfirmQueue(){
returnQueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

// 声明确认队列绑定关系
@Bean
publicBindingqueueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange){
returnBindingBuilder.bind(queue).to(exchange).with("key1");
    }


}

生产者:

@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    @Autowired
private RabbitTemplate rabbitTemplate;


    @GetMapping("sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
//指定消息 id 为 1
        CorrelationData correlationData1 = new CorrelationData("1");
//这个key1是有交换机的key,会发送成功
        String routingKey = "key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);
//这个交换机不存在,会发送失败
        CorrelationData correlationData2 = new CorrelationData("2");
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME+"1", routingKey, message + routingKey, correlationData2);
        CorrelationData correlationData3 = new CorrelationData("3");
//这个key2是没有交换机的key,会发送失败
        routingKey = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData3);
        log.info("发送消息内容:{}", message);
    }
}

消费者:

@Component
@Slf4j
publicclassConfirmConsumer {

publicstatic final StringCONFIRM_QUEUE_NAME = "confirm.queue";

@RabbitListener(queues =CONFIRM_QUEUE_NAME)
publicvoidreceiveMsg(Message message){
String msg=newString(message.getBody());
        log.info("接受到队列 confirm.queue 消息:{}",msg);
    }


}

我们发送信息:
http://localhost:11000/confir...可以啊

我们发送三条消息:
一条是有交换机有队列的消息
二条是没有交换机的消息
三条是有交换机没有队列的消息

结果如下:

我们可以看出:
第一条消息正常消费
第二条消息找不到交换机,抛异常了
第三条消息绑定键找不到队列,这条消息直接被抛弃了

2.消息的回退

我们发现第三条消息的反馈并不是很好,在仅仅开启了生产者确认机制的情况下,交换机收到消息后,会直接给生产者发送确认消息,如果该消息不可路由,那么消息会直接被抛弃,此时生产者是不知道这条消息被丢弃的。所以我们这里要引入消息的回退机制,如果消息不能路由到队列,就会有一个通知,通过设置mandatory参数可以将不可抵达队列的消息返回给生产者。

回调处理逻辑:

@Component
@Slf4j
publicclassMyCallBackimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

/**
     * 交换机是否收到消息的回调方法
     * CorrelationData 消息相关数据
     * ack 交换机是否收到消息
     * cause 交换机未收到消息的原因
     */
@Override
publicvoidconfirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", correlationData.getId());
        } else {
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}", correlationData.getId(), cause);
        }
    }


@Override
publicvoidreturnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error(" 消 息 {}, 被 交 换 机 {} 退 回 , 退 回 原 因 :{}, 路 由 key:{}",
newString(message.getBody()),
                exchange,
                replyText,
                routingKey);
    }



}

修改一下前面那个配置类的方法:

//依赖注入 rabbitTemplate 之后再设置它的回调对象
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(myCallBack);
/**
         * true:
         * 交换机无法将消息进行路由时,会将该消息返回给生产者
         * false:
         * 如果发现消息无法进行路由,则直接丢弃
         */
        rabbitTemplate.setMandatory(true);
//设置回退消息交给谁处理
        rabbitTemplate.setReturnCallback(myCallBack);

    }

继续发送消息:http://localhost:11000/confir...可以啊

我们发现,交换机路由不到的队列,也会有反馈了:

3.备份交换机

有了前面那个mandatory参数和回退消息,我们对于无法投递到目的地的消息,可以进行处理了。但是我们在处理这些日志的时候,顶多就是打印了一下日志,然后触发报警,接着手动进行处理。通过日志收集这些无法到达路由的消息非常不优雅,而且手动复制日志非常容易出错。而且mandatory参数设置,还得增加配置类,增加了复杂性。

如果我们不想丢失消息,又不想增加配置类,该怎么做呢?在前面学习死信队列的时候系统学习消息队列——RabbitMQ的死信队列,我们可以为队列设置死信交换机来处理那些失败的消息。

RabbitMQ中有备份交换机这种存在,它就像死信交换机一样,可以用来处理那些路由不到的消息,当交换机接收到一份不可路由的消息的时候,我们就会把这条消息转发到备份交换机中,由备份交换机进行统一处理。

@Configuration
publicclassConfirmQueueConfig {


publicstatic final StringCONFIRM_EXCHANGE_NAME = "confirm.exchange";
publicstatic final StringCONFIRM_QUEUE_NAME = "confirm.queue";
publicstatic final StringBACKUP_EXCHANGE_NAME = "backup.exchange";
publicstatic final StringBACKUP_QUEUE_NAME = "backup.queue";
publicstatic final StringWARNING_QUEUE_NAME = "warning.queue";

// 声明确认队列
@Bean("confirmQueue")
publicQueueconfirmQueue(){
returnQueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }
//声明确认队列绑定关系
@Bean
publicBindingqueueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange){
returnBindingBuilder.bind(queue).to(exchange).with("key1");
    }

//声明备份 Exchange
@Bean("backupExchange")
publicFanoutExchangebackupExchange(){
returnnewFanoutExchange(BACKUP_EXCHANGE_NAME);
    }

//声明确认 Exchange 交换机的备份交换机
@Bean("confirmExchange")
publicDirectExchangeconfirmExchange(){
//设置该交换机的备份交换机
ExchangeBuilder exchangeBuilder =
ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                    .durable(true)
                    .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME); 
return (DirectExchange)exchangeBuilder.build();
    }
// 声明警告队列
@Bean("warningQueue")
publicQueuewarningQueue(){
returnQueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }
// 声明报警队列绑定关系
@Bean
publicBindingwarningBinding(@Qualifier("warningQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange
                                          backupExchange){
returnBindingBuilder.bind(queue).to(backupExchange);
    }
// 声明备份队列
@Bean("backQueue")
publicQueuebackQueue(){
returnQueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }
// 声明备份队列绑定关系
@Bean
publicBindingbackupBinding(@Qualifier("backQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange backupExchange){
returnBindingBuilder.bind(queue).to(backupExchange);
    }

}

我们发现,不可路由的消息被发现后,就被送到了报警的备份队列里面。

而且这种配置的优先级,比mandatory参数更高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2272483.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

封装/前线修饰符/Idea项目结构/package/impore

目录 1. 封装的情景引入 2. 封装的体现 3. 权限修饰符 4. Idea 项目结构 5. package 关键字 6. import 关键字 7. 练习 程序设计:高内聚,低耦合; 高内聚:将类的内部操作“隐藏”起来,不需要外界干涉&#xff1b…

【代码随想录】刷题记录(89)-分发糖果

题目描述: n 个孩子站成一排。给你一个整数数组 ratings 表示每个孩子的评分。 你需要按照以下要求,给这些孩子分发糖果: 每个孩子至少分配到 1 个糖果。相邻两个孩子评分更高的孩子会获得更多的糖果。 请你给每个孩子分发糖果&#xff0…

Ae:合成设置 - 3D 渲染器

Ae菜单:合成/合成设置 Composition/Composition Settings 快捷键:Ctrl K After Effects “合成设置”对话框中的3D 渲染器 3D Renderer选项卡用于选择和配置合成的 3D 渲染器类型,所选渲染器决定了合成中的 3D 图层可以使用的功能&#xff0…

掌握RabbitMQ:全面知识点汇总与实践指南

前言 RabbitMQ 是基于 AMQP 高级消息队列协议的消息队列技术。 特点:它通过发布/订阅模型,实现了服务间的高度解耦。因为消费者不需要确保提供者的存在。 作用:服务间异步通信;顺序消费;定时任务;请求削…

react构建项目报错 `npm install --no-audit --save @testing-l

这应该是我们想构建 react18 的项目,但是 通过 npx create-react-app my-app进行构建时,给我们安装的依赖是 react 19 下面提供一下我的解决方法: 第一步:在 package.json 中把依赖 react19 改为 react 18 第二步:添…

App窗口创建流程(Android12 )

有关的窗口对象 PhoneWindowActivityThread#performLaunchActivity {Activity.attach}Surface new ViewRootImpl 创建null对象mSurface.transferFrom(getOrCreateBLASTSurface())//填充内容创建native层的SurfaceLayerSurfaceFlinger::createLayerRenderSurfaceSurfaceFlinger…

LLM之Agent(十三)| 使用 PydanticAI 框架构建多代理LLM 系统(保姆教程)

Pydantic 是 Python 生态系统中的强大平台,每月下载量超过 2.85 亿次。现在,Pydantic的创始人也正在通过 Pydantic AI 涉足 AI 的前沿领域,Pydantic AI 是一个专为构建由生成式 AI 提供支持的生产级应用程序的框架。在本文中,我们将深入探讨 Pydantic AI 的独特之处、它的主…

常用的数据结构API概览

List ArrayList 1、在初始化一个ArrayList的时候&#xff0c;如果我想同时set一些值 比如存放int[ ] List<int[]> list new ArrayList(Arrays.asList(new int[]{intervals[0][0],intervals[0][1]}));//或者int[] temp new int[]{intervals[0][0],intervals[0][1]}…

年会游戏大全 完整版见考试宝

企业年会游戏大全&#xff08;35个&#xff09; 1.泡泡糖 游戏准备&#xff1a;主持人召集若干人上台&#xff0c;人数最好是奇数。 游戏规则&#xff1a;当大家准备好时&#xff0c;主持人喊“泡泡糖”大家要回应“粘什么”&#xff0c;主持人随机想到身体的某个部位&#x…

用豆包去除文章Ai味和重复率,实操教程

用AI生成的文章总是有“AI味”或者重复率高的问题&#xff1f; 今天就教你如何使用豆包轻松去除这些问题 让你的文章更自然、更具个人风格&#xff01;✍️✨ 详细版指令教程都整理了&#xff0c;纯粹F享啦~

【论文复现】改进麻雀搜索算法优化冷水机组的最优负载调配问题

目录 1.摘要2.麻雀搜索算法SSA原理3.改进策略4.结果展示5.参考文献6.代码获取 1.摘要 为了应对暖通空调&#xff08;HVAC&#xff09;系统由于不当负荷分配导致的高能源消耗问题&#xff0c;本文提出了一种改进麻雀搜索算法&#xff08;ISSA&#xff09;。ISSA算法旨在在满足负…

分布式ID生成-雪花算法实现无状态

雪花算法这里不再赘述&#xff0c;其缺点是有状态&#xff08;多副本隔离时&#xff0c;依赖手动配置workId和datacenterId&#xff09;&#xff0c;代码如下&#xff1a; /*** 雪花算法ID生成器*/ public class SnowflakeIdWorker {/*** 开始时间截 (2017-01-01)*/private st…

四、对象图

对象图 、对象图概述 含义&#xff1a; 对象图显示了某一时刻的一组对象及它们之间的关系。 作用&#xff1a; 对象图可以看做是类图的实例&#xff0c;用来表达各个对象在某一时刻的状态。 组成&#xff1a; 对象图中的建模元素主要有对象和链&#xff0c;对象是类的实…

2025/1/4期末复习 密码学 按老师指点大纲复习

我们都要坚信&#xff0c;道路越是曲折&#xff0c;前途越是光明。 --------------------------------------------------------------------------------------------------------------------------------- 现代密码学 第五版 杨波 第一章 引言 1.1三大主动攻击 1.中断…

【已解决】Django连接mysql报错Did you install mysqlclient?

解决报错&#xff1a;from err django.core.exceptions.ImproperlyConfigured: Error loading MySQLdb module. Did you install mysqlclient&#xff1f; 在终端执行python manage.py makemigrations报错问题汇总 错误1&#xff1a;已安装mysqlclient&#xff0c;提示Did yo…

【C语言】可移植性陷阱与缺陷(七): 除法运算时发生的截断

在C语言编程中&#xff0c;除法运算可能会引发一些与可移植性相关的问题&#xff0c;特别是当涉及到整数除法时发生的截断&#xff08;truncation&#xff09;。不同平台对于整数除法的行为和处理方式可能会有所不同&#xff0c;这可能导致代码在不同编译器或硬件平台上的行为不…

有限元分析学习——Anasys Workbanch第一阶段笔记(7)对称问题预备水杯案例分析

目录 1 序言 2 水杯案例 2.1 添加新材料 2.2 水压设置 2.3 约束边界条件设置及其结果 2.3.1 全约束固定(压缩桌面、Fixed support固定水杯底面) 2.3.2 单方面位移约束(压缩桌面、Displacement约束软弹簧) 2.3.3 接触约束(不压缩桌面、Fixed support 固定桌面、Frictional…

Spring Boot(4)使用 IDEA 搭建 Spring Boot+MyBatis 项目全流程实战

文章目录 一、⚡搞个引言二、⚡开始搭建 Spring Boot 项目吧&#xff01;2.1 启动 IDEA 并创建新项目2.2 选择项目依赖2.3 完成项目创建 三、&#x1f4d8;项目结构剖析四、✍配置数据库连接五、✍ 创建 MyBatis 相关组件5.1 实体类&#xff08;Entity&#xff09;5.2 Mapper 接…

[服务器][教程]Ubuntu24.04 Server开机自动挂载硬盘教程

1. 查看硬盘ID ls -l /dev/disk/by-uuid可以看到对应的UUID所对应的分区 2. 创建挂载文件夹 创建好文件夹即可 3. 修改配置文件 sudo vim /etc/fstab把对应的UUID和创建的挂载目录对应即可 其中# Personal mount points下面的是自己新添加的 &#xff1a;分区定位&#xff…

抢先体验:人大金仓数据库管理系统KingbaseES V9 最新版本 CentOS 7.9 部署体验

一、简介 KingbaseES 是中国人大金仓信息技术股份有限公司自主研发的一款通用关系型数据库管理系统&#xff08;RDBMS&#xff09;。 作为国产数据库的杰出代表&#xff0c;它专为中国市场设计&#xff0c;广泛应用于政府、金融、能源、电信等关键行业&#xff0c;以高安全性…