RabbitMQ消息的发布确认机制详解

news2025/1/11 5:05:19

RabbitMQ发布确认机制确保消息从生产者成功传输到交换机和队列,提高系统可靠性。在Spring Boot项目中,通过配置publisher-confirm-typepublisher-returns,启用发布确认和消息返回机制。配置RabbitTemplate的确认回调和返回回调,可以捕捉消息传输状态,处理不同传输结果。测试场景包括消息无法到达交换机、消息到达交换机但无法到达队列以及消息成功到达队列。通过合理设置和优化,可以确保高并发环境下的消息可靠传输,适用于金融支付、电商系统等对消息传输可靠性要求高的场景。

1. RabbitMQ发布确认机制概述

发布确认(Publisher Confirms)是RabbitMQ提供的一种机制,用于确保消息从生产者发送到RabbitMQ服务器并被成功处理。与事务机制不同,发布确认的性能开销更小,非常适合高吞吐量的场景。发布确认机制提供了两种类型的确认:

  • 消息到达交换机(Exchange)后的确认
  • 消息从交换机路由到队列(Queue)后的确认

2. 配置文件中添加发布确认相关配置

在Spring Boot项目中,通过配置文件来启用发布确认机制非常方便。以下是需要添加到application.propertiesapplication.yml中的配置:

# 消息到达交换机后会回调发送者
spring.rabbitmq.publisher-confirm-type=correlated
# 消息无法路由到队列时回调发送者
spring.rabbitmq.publisher-returns=true

配置解释:

  • publisher-confirm-type:设置为correlated表示使用CorrelationData来关联确认与发送的消息。
  • publisher-returns:设置为true表示启用消息返回机制,当消息无法路由到队列时会触发回调。

3. 发布确认类型

在Spring AMQP中,发布确认类型通过ConfirmType枚举类来定义:

public enum ConfirmType {
    SIMPLE,     // 使用 RabbitTemplate#waitForConfirms() 或 waitForConfirmsOrDie()
    CORRELATED, // 使用 CorrelationData 关联确认与发送的消息
    NONE        // 不启用发布确认
}

4. 配置RabbitTemplate

为了使用发布确认机制,需要配置RabbitTemplate,包括设置确认回调和返回回调:

@Slf4j
@Configuration
public class RabbitTemplateConfig {

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);

        // 设置mandatory为true,当找不到队列时,broker会调用basic.return方法将消息返还给生产者
        rabbitTemplate.setMandatory(true);

        // 设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息已经到达Exchange");
            } else {
                log.info("消息没有到达Exchange");
            }
            if (correlationData != null) {
                log.info("相关数据:" + correlationData);
            }
            if (cause != null) {
                log.info("原因:" + cause);
            }
        });

        // 设置返回回调
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息无法到达队列时触发");
            log.info("ReturnCallback:     " + "消息:" + message);
            log.info("ReturnCallback:     " + "回应码:" + replyCode);
            log.info("ReturnCallback:     " + "回应信息:" + replyText);
            log.info("ReturnCallback:     " + "交换机:" + exchange);
            log.info("ReturnCallback:     " + "路由键:" + routingKey);
        });

        return rabbitTemplate;
    }
}

5. 配置测试交换机和队列

为了测试发布确认机制,我们需要配置相应的交换机和队列:

@Slf4j
@Configuration
public class ConfirmConfig {

    @Bean
    public Queue confirmQueue() {
        return new Queue(Constant.CONFIRM_QUEUE, false);
    }

    @Bean
    DirectExchange confirmExchange() {
        DirectExchange directExchange = new DirectExchange(Constant.CONFIRM_EXCHANGE, false, false);
        directExchange.addArgument("alternate-exchange", Constant.CONFIRM_BACKUP_EXCHANGE);
        return directExchange;
    }

    @Bean
    Binding bindingConfirm() {
        return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(Constant.CONFIRM_ROUTING_KEY);
    }

    @Bean
    FanoutExchange backupExchange() {
        return new FanoutExchange(Constant.CONFIRM_BACKUP_EXCHANGE, false, false);
    }

    @Bean
    public Queue backupQueue() {
        return new Queue(Constant.CONFIRM_BACKUP_QUEUE, false);
    }

    @Bean
    public Queue warningQueue() {
        return new Queue(Constant.CONFIRM_WARNING_QUEUE, false);
    }

    @Bean
    Binding bindingConfirmBackup() {
        return BindingBuilder.bind(backupQueue()).to(backupExchange());
    }

    @Bean
    Binding bindingConfirmWarning() {
        return BindingBuilder.bind(warningQueue()).to(backupExchange());
    }
}

6. 测试场景及分析

6.1 消息无法到达交换机

测试代码:

@Autowired
RabbitTemplate rabbitTemplate;
String msg = "一条用于发布确认的消息";

@GetMapping("/noExchange")
public void noExchange() {
    rabbitTemplate.convertAndSend("noExchange", "noExchange", msg);
}

配置了rabbitTemplate.setMandatory(true),当消息无法到达交换机时会回调:

ConfirmCallback 消息没有到达Exchange
ConfirmCallback 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noExchange' in vhost '/', class-id=60, method-id=40)

6.2 消息到达交换机但无法到达队列

测试代码:

@GetMapping("/toExchange")
public void toExchange() {
    rabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE, "xxx.xxx.xxx", msg);
}

输出:

ConfirmCallback 消息已经到达Exchange

没有收到无法到达队列的消息,是因为配置了备份队列,消息被路由到了备份队列。

6.3 注掉备份队列再试

修改配置:

@Bean
DirectExchange confirmExchange() {
    DirectExchange directExchange = new DirectExchange(Constant.CONFIRM_EXCHANGE, true, false);
    return directExchange;
}

测试结果:

消息无法到达队列时触发
ReturnCallback:     消息:(Body:'一条用于发布确认的消息' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback:     回应码:312
ReturnCallback:     回应信息:NO_ROUTE
ReturnCallback:     交换机:myConfirmExchange
ReturnCallback:     路由键:xxx.xxx.xxx
ConfirmCallback 消息已经到达Exchange

此时,ConfirmCallbackReturnCallback都被调用了。

6.4 成功到达队列

测试代码:

@GetMapping("/toQueue")
public void toQueue() {
    rabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE, Constant.CONFIRM_ROUTING_KEY, msg);
}

输出:

ConfirmCallback 消息已经到达Exchange

7. 发布确认流程

下图展示了RabbitMQ发布确认流程:

image.png

8. 深入解析RabbitMQ发布确认机制

8.1 事务机制与发布确认机制的对比

事务机制和发布确认机制都是确保消息可靠投递的手段,但它们在实现和性能方面有明显区别:

  • 事务机制:通过txSelecttxCommittxRollback实现,性能开销较大,不适合高并发场景。
  • 发布确认机制:通过异步确认消息是否成功到达交换机和队列,性能开销小,适合高并发场景。

8.2 发布确认机制的优缺点

优点
  1. 性能高:相比事务机制,发布确认机制对性能的影响较小。
  2. 异步处理:使用回调函数处理确认结果,不阻塞消息发送。
  3. 可靠性高:确保消息成功到达交换机和队列,提高系统可靠性。
缺点
  1. 实现复杂:需要配置和处理回调函数,增加了代码复杂度。
  2. 延迟高:确认机制引入了额外的网络延迟。

8.3 发布确认机制的应用场景

  1. 金融支付系统:确保支付消息的可靠传输,避免重复支付或支付丢失。
  2. 电商系统:确保订单消息的可靠传输,避免订单丢失或重复处理。
  3. 日志系统:确保日志消息的可靠传输,避免日志丢

失。

8.4 发布确认机制的最佳实践

  1. 合理设置超时时间:在高并发场景下,设置合理的超时时间,避免消息发送阻塞。
  2. 优化回调函数:回调函数中避免复杂逻辑,确保回调处理快速完成。
  3. 监控和报警:建立监控机制,及时发现和处理消息投递失败问题。

9. 总结

本文详细介绍了RabbitMQ消息的发布确认机制,包括配置、实现及其在不同场景下的表现。通过合理配置和使用发布确认机制,可以有效提高消息传输的可靠性,确保消息在高并发环境下的可靠投递。希望本文能够帮助读者深入理解并应用RabbitMQ的发布确认机制,提高系统的可靠性和性能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1797597.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java面试——中间件

OpenFeign 1、openFeign是一个HTTP客户端&#xff0c;它融合了springmvc的注解&#xff0c;使之可以用REST风格的映射来请求转发。 2、可以把openFegin理解为是controller层或是service层。可以取代springmvc控制层作为请求映射&#xff0c;亦或是作为service层处理逻辑&#…

镜头效果技术在AI绘画中的革新作用

随着人工智能技术的飞速发展&#xff0c;AI绘画已经成为艺术与科技交汇的前沿领域。在这一领域中&#xff0c;镜头效果技术的应用不仅为艺术家和设计师们提供了全新的创作工具&#xff0c;更在艺术创作中扮演了革命性的角色。本文将深入探讨镜头效果技术在AI绘画中的应用&#…

【uni-app】uniapp页面与组件生命周期介绍

uniapp应用开发过程中经常会在不同的时机触发一些事件&#xff0c;这篇文章主要是总结一下uniapp常用的一些生命周期钩子。 不同的环境运行可能有差异,下图为微信小程序执行图示 1. 应用生命周期 函数名说明onLaunch当uni-app 初始化完成时触发&#xff08;全局只触发一次&…

Ubuntu 22.04安装cuda及Pytorch教程

文章目录 1、安装显卡驱动2、安装CUDA3、安装cuDNN4、安装pyTorch5、卸载CUDA参考资料 服务器重装系统后&#xff0c;需要重新安装显卡驱动、cuda及Pytorch等&#xff0c;有些步骤容易忘记&#xff0c;这里记录一下。这里我的服务器配置以及安装版本的情况如下&#xff1a; 服…

E: Unable to locate package ros-kinetic-usb-cam

mkdir -p USB/src && cd USB/src catkin_init_workspace git clone https://github.com/bosch-ros-pkg/usb_cam.git cd .. catkin_make source devel/setup.bash echo "source ~/USB/devel/setup.bash" >> ~/.bashrc source ~/.bashrc 编译过程报错&…

undefined symbol: _ZN3c104impl8GPUTrace13gpu mmcv

这里写自定义目录标题 ImportError: //python3.8/site-packages/mmcv/_ext.cpython-38-x86_64-linux-gnu.so: undefined symbol: _ZN3c104impl8GPUTrace13gpuTraceStateEERROR conda.cli.main_run:execute(49): 这样的问题往往都是版本不匹配导致的 pytorch的版本&#xff0c;m…

Flink 基于 TDMQ Apache Pulsar 的离线场景使用实践

背景 Apache Flink 是一个开源的流处理和批处理框架&#xff0c;具有高吞吐量、低延迟的流式引擎&#xff0c;支持事件时间处理和状态管理&#xff0c;以及确保在机器故障时的容错性和一次性语义。Flink 的核心是一个分布式流数据处理引擎&#xff0c;支持 Java、Scala、Pytho…

软件管理及部分命令

sed命令 格式&#xff1a; sed [选项] 操作 目标文件 选项&#xff1a; -i&#xff1a;修改原始文件【如果不加-i&#xff0c;那就是仅仅修改内存中的文件副本】 案例&#xff1a;将1.txt中的tom修改成jerry。 sed -i "s/tom/jerry/g" 1.txt 将1…

数字驱动:企业发展的火箭助推器!

​ 在这个数字经济时代&#xff0c;数据就像火箭燃料&#xff0c;而数字驱动则是那强大的火箭助推器&#xff01;它正以惊人的力量助力企业飞速发展&#xff01; 数字驱动&#xff0c;助力企业发展的超强引擎&#xff01; 用数据说话&#xff0c;决策不再盲目&#xff01; 以数…

Java学习书籍推荐

本文推荐了Java基础&#xff0c;并发&#xff0c;虚拟机学习过程中&#xff0c;比较好的书籍&#xff0c;如果大家需要视频教程&#xff0c;可参考【软件开发】Java学习路线 或者B站文件夹同时会收藏其他Java视频&#xff0c;感谢关注。 指路&#xff1a;Java学习-创建者&…

RabbitMQ--Hello World(基础详解)

文章目录 先决条件RabbitMQ 初识RabbitMQ--Hello World发送接收 更多相关内容可查看 先决条件 本教程假定 RabbitMQ 已安装并在标准端口 &#xff08;5672&#xff09; 上运行。如果你 使用不同的主机、端口或凭据&#xff0c;连接设置将需要 调整。如未安装可查看Windows下载…

短视频矩阵系统----可视化剪辑独立开发(采用php)

短视频矩阵系统源头技术开发&#xff1a; 打磨短视频矩阵系统的开发规则核心框架可以按照以下几个步骤进行&#xff1a; 明确系统需求&#xff1a;首先明确系统的功能需求&#xff0c;包括短视频的上传、编辑、发布、播放等环节。确定系统的目标用户和主要的使用场景&#xff…

【数据结构】栈和队列-->理解和实现(赋源码)

Toc 欢迎光临我的Blog&#xff0c;喜欢就点歌关注吧♥ 前面介绍了顺序表、单链表、双向循环链表&#xff0c;基本上已经结束了链表的讲解&#xff0c;今天谈一下栈、队列。可以简单的说是前面学习的一特殊化实现&#xff0c;但是总体是相似的。 前言 栈是一种特殊的线性表&…

深入ES6:解锁 JavaScript 类与继承的高级玩法

个人主页&#xff1a;学习前端的小z 个人专栏&#xff1a;JavaScript 精粹 本专栏旨在分享记录每日学习的前端知识和学习笔记的归纳总结&#xff0c;欢迎大家在评论区交流讨论&#xff01; ES5、ES6介绍 文章目录 &#x1f4af;Class&#x1f35f;1 类的由来&#x1f35f;2 co…

手把手Linux高可hadoop集群的搭建

高可用集群的搭建 在搭建高可用集群之前&#xff0c;如果搭建了完全分布式hadoop&#xff0c;先执行stop-all.sh停掉所有的服务&#xff0c;只保留jdk和zookeeper的2个服务&#xff0c;然后再去搭建。 目标&#xff1a; 高可用集群简介部署Hadoop高可用集群 一&#xff0e;…

java:spring cloud使用tcc-transaction实现分布式事务

# 安装tcc-transaction server和dashboard 参考这篇文章【https://changmingxie.github.io/zh-cn/docs/ops/server/deploy-alone.html】里面有mysql的建表脚本&#xff0c;先将数据库建好。 下载tcc-transaction cd /chz/install/tcc-transaction wget https://github.com/ch…

webgl_framebuffer_texture

ThreeJS 官方案例学习&#xff08;webgl_framebuffer_texture&#xff09; 1.效果图 2.源码 <template><div><div id"container"></div><div id"selection"><div></div></div></div> </templa…

嵌入式Linux系统编程 — 2.3 标准I/O库:格式化I/O

目录 1 格式化I/O简介 2 格式化输出 2.1 格式化输出函数简介 2.2 格式控制字符串 format 2.3 示例程序 3 格式化输入 3.1 格式化输入简介 3.2 格式控制字符串 format 3.3 示例程序 1 格式化I/O简介 在先前示例代码中&#xff0c;经常使用库函数 printf() 来输出程序中…

操作系统教材第6版——个人笔记6

3.3.4 页面调度 页面调度 当主存空间已满而又需要装入新页时&#xff0c;页式虚拟存储管理必须按照一定的算法把已在主存的一些页调出去 #主存满加新&#xff0c;把已在主存一些页调出选择淘汰页的工作称为页面调度 选择淘汰页的算法称为页面调度算法 页面调度算法设计不当&a…

【递归、搜索与回溯】递归、搜索与回溯准备+递归主题

递归、搜索与回溯准备递归主题 1.递归2.搜索3.回溯与剪枝4.汉诺塔问题5.合并两个有序链表6.反转链表7.两两交换链表中的节点8.Pow(x, n)-快速幂&#xff08;medium&#xff09; 点赞&#x1f44d;&#x1f44d;收藏&#x1f31f;&#x1f31f;关注&#x1f496;&#x1f496; 你…