RabbitMQ发送方确认机制

news2024/11/24 9:38:53

1、前言

RabbitMQ消息首先发送到交换机,然后通过路由键【routingKey】和【bindingKey】比较从而将消息发送到对应的队列【queue】上。在这个过程有两个地方消息可能会丢失:

  1. 消息发送到交换机的过程。
  2. 消息从交换机发送到队列的过程。

而RabbitMQ提供了类似于回调函数的机制来告诉发送方消息是否发送成功。这里针对上述的两种情况,RabbitMQ也是给出了以下的应对策略:

  • publisher-confirm:消息到达交换机时会触发。
  • publisher-return:到达交换机但是没有路由到队列,会返回ack以及失败原因。

2、publisher-confirm

在SpringBoot项目的properties文件中加上

spring.rabbitmq.publisher-confirm-type=correlated

该配置有三个值:

  1. none:是禁用发布确认模式,是默认值
  2. correlated:是发布消息成功到交换器后会触发回调方法
  3. simple:有两种效果,第一种和correlated值一样会触发回调方法;第二种在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。

RabbitMQ的配置类实现ConfirmCallback

/**
 * @author LoneWalker
 * @date 2023/4/8
 * @description
 */
@Slf4j
@Configuration
public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback {

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        //设置给rabbitTemplate
        rabbitTemplate.setConfirmCallback(this);
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public DirectExchange getExchange(){
        return new DirectExchange("directExchange",false,false);
    }

    @Bean
    public Queue getQueue(){
        return new Queue("publisher.addUser",true,false,false);
    }

    @Bean
    public Binding getBinding(DirectExchange exchange,Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("publisher.addUser");
    }

    /**
     * 消息成功到达交换机会触发
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("交换机收到消息成功:" + correlationData.getId());
        }else {
            log.error("交换机收到消息失败:" + correlationData.getId() + "原因:" + cause);
        }
    }
}

而需要这个correlationData是因为确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。所以我们改写一下发送消息的方法:

@RequiredArgsConstructor
@Service
public class PublisherServiceImpl implements PublisherService{

    private final RabbitTemplate rabbitTemplate;

    @Override
    public void addUser(User user) {

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("directExchange","publisher.addUser",user,correlationData);
    }
}

然后发送消息:

再模拟一下失败的情况——把交换机名称改成错的:

温馨提示:测试完把交换机名称改回去。

3、publisher-return

在SpringBoot项目的properties文件中添加:

spring.rabbitmq.publisher-returns=true
###消息在没有被队列接收时是否强行退回还是直接丢弃
spring.rabbitmq.template.mandatory=true

RabbitMQ的配置类再实现ReturnsCallback

@Slf4j
@Configuration
public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        //设置给rabbitTemplate
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public DirectExchange getExchange(){
        return new DirectExchange("directExchange",false,false);
    }

    @Bean
    public Queue getQueue(){
        return new Queue("publisher.addUser",true,false,false);
    }

    @Bean
    public Binding getBinding(DirectExchange exchange,Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("publisher.addUser");
    }

    /**
     * 消息成功到达交换机会触发
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("交换机收到消息成功:" + correlationData.getId());
        }else {
            log.error("交换机收到消息失败:" + correlationData.getId() + "原因:" + cause);
        }
    }

    /**
     * 消息未成功到达队列会触发
     * @param returnedMessage
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.error("{}--消息未成功到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId());
    }
}

把路由键改为错误的值:

正常来说消息到达交换机就一定可以到达队列,到不了队列基本上就是代码写错了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/566889.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

中国移动董宁:深耕区块链的第八年,我仍期待挑战丨对话MVP

区块链技术对于多数人来说还是“新鲜”的代名词时,董宁已经成为这项技术的老朋友。 董宁2015年进入区块链领域,现任中国移动研究院技术总监、区块链首席专家。作为“老友”,董宁见证了区块链技术多个爆发式增长和平稳发展的阶段,…

基于STC8G1K08A的水压检测系统

基于STC8G1K08A的水压检测系统 前言先来一饱眼福设计和硬件的选型压力传感器选择单片机的选择WIFI透传模块选择 核心代码的开发STC8G1K08A单片机代码读取水压传感器的电压计算对应电压水的压力值猪场水压正常、漏水、喝光水提醒功能的实现 数据通过ESP8266上报到云端代码的实现…

低功耗定时器(LPTIMER)

概述 LPTIM 是运行在Always-On 电源域下的16bits 低功耗定时/计数器模块。通过选择合适的工作时钟,LPTIM 在在各种低功耗模式下保持运行,并且只消耗很低的功耗。LPTIM 甚至可以在没有内部时钟的条件下工作,因此可实现休眠模式下的外部脉冲计数…

新手怎么玩转Linux

Linux是一个非常强大、灵活和可定制的操作系统,这使得它成为了程序员的首选操作系统之一。程序员喜欢使用Linux的原因有以下几点:开源、稳定性、安全性、命令行界面、社区支持。那么新手改如何玩转Linux呢?跟着我一起来看看吧。 以下是对新…

Meta 开源语音 AI 模型支持 1,100 多种语言

自从ChatGPT火爆以来,各种通用的大型模型层出不穷,GPT4、SAM等等,本周一Meta 又开源了新的语音模型MMS,这个模型号称支持4000多种语言,并且发布了支持1100种语言的预训练模型权重,最主要的是这个模型不仅支…

行业报告 | 2022文化科技十大前沿应用趋势(上)

文 | BFT机器人 前言 Introduction 文化科技是文化科技融合过程中诞生的系列新技术成果,是文化强国和科技强国两大战略的交又领域。2012 年 8月,科技部会同中宣部、财政部、文化部、广电总局、新闻出版总署发布《文化科技创新工程纲要》,开启…

为何AI无法完全理解人类情感?GPT-4能否理解人类的情绪?

在科幻小说和电影里,我们经常看到超级AI人工智能机器人可以理解、感知甚至模拟人类的情感,但在现实世界中,我们距离这个目标还有一段相当长的距离,即使是强大的GPT-4甚至未来的GPT-5。过高夸大AI的体验和性能,往往并不…

gin框架返回json

一、使用gin web框架开发的两种模式: 前端浏览器去请求服务器,服务器把完整的HTML文件的内容返回给前端浏览器Vue、reactor等前端框架都自己提前定义好模板,后端(服务器)只需要返回JSON格式的数据给前端框架即可&…

如何在MyEclipse中使用JavaScript编写代码?

MyEclipse v2022.1.0正式版下载 JavaScript 项目 在 MyEclipse 2021 及更高版本中,JavaScript 支持对大多数 JavaScript 源代码都是开箱即用的——不需要特殊的 JavaScript Eclipse 项目或 JavaScript facet。但是,我们建议使用jsconfig.json文件来指定…

SAP 物料主数据基本数据1视图 参数有效值 字段的作用测试 <转载>

原文链接:https://blog.csdn.net/weixin_40672823/article/details/104773643 1.在物料主数据基本数据1视图中有个字段 参数有效值 如下图 有什么用途? 这个字段作用主要用在 BOM里面,官方说明如下 看说明很难理解下面通过一个业务实例来说明 业务要…

HOOPS平台助力Xometry数字化转型:即时报价产品实现三维模型轻量化、Web端可视化!

所属行业:制造业 挑战:为在线客户的制造平台提供流畅的客户体验、支持使用多种类型CAD文件格式的不同客户群、根据模型提供准确的报价和可制造性反馈、快速准确地可视化定制零 解决方案: HOOPS Platform 提供web端和移动设备的3D数据转换、…

擎创技术流 | 一文读懂eBPF对kubernetes可观测的重要性

一、云原生技术发展的背景与问题 当前,云原生技术主要是以容器技术为基础围绕着 Kubernetes的标准化技术生态,通过标准可扩展的调度、网络、存储、容器运行时接口来提供基础设施,同时通过标准可扩展的声明式资源和控制器来提供运维能力。两层…

Servlet【最复杂的hello world】

目录 一、Hello World 1.创建项目 2.引入依赖 3.创建目录 4.编写代码 4.1 继承 HttpServlet 父类,重写 doGet 方法 4.2 在 doGet 中编写代码,打印 hello world 4.3 给 HelloServlet 加上注解 4.4 完整代码 5.打包代码 6.部署 7.验证程序 二…

分享一个403界面给大家

先看效果图&#xff08;说明&#xff1a;小鬼影会飘来飘去&#xff0c;长时间停留会有小惊喜&#xff0c;具体大家跑一下就知道&#xff09;&#xff1a; 代码如下&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UT…

depends_on 解决 docker 容器依赖问题

如果你经常使用docker-compose启动服务的话&#xff0c;可能会遇到下面的问题&#xff1a;服务 B 依赖服务 A&#xff0c;需要服务 A 先启动&#xff0c;再启动服务 B 举个例子&#xff0c;在部署 kafka 集群的时候&#xff0c;需要启动两个kafka&#xff0c;并使用zookeeper做…

基于云计算和物联网技术开发的智慧校园云平台源码

智慧校园系统是利用物联网和云计算&#xff0c;强调对教学、科研、校园生活和管理的数据采集、智能处理、为管理者和各个角色按需提供智能化的数据分析、教学、学习的智能化服务环境。它包含“智慧环境、智慧学习、智慧服务、智慧管理”等层面的内容。 文末获取联系 它描绘的是…

准备搞个大动作!

目前我们的会员群的同学越来越多&#xff0c;然后我们提供的内容已经从起步篇&#xff0c;趣味篇&#xff0c;工具篇到高级篇了。但是到了高级篇很多内容都跟编程相关&#xff0c;有一点门槛&#xff0c;如果单单看文字是肯定无法满足大家的需求。为了更好的服务大家&#xff0…

跃升数字生产力,九州云受邀出席闵行国际人才月

5月22日&#xff0c;由闵行人才工作领导小组办公室指导、中共闵行区马桥镇委员会及闵行区马桥镇人民政府主办、上海人工智能研究院协办的首届“大零号湾”国际人才月马桥人工智能周成功召开。 本届大会以“AI才共赢 智敬未来”为主题&#xff0c;探讨科技创新的最新动态和趋势&…

如何使用Linux Top命令

Linux中的top命令允许您监视当前正在运行的进程及其使用的系统资源。作为系统管理员&#xff0c;它可能是工具箱中最有用的工具&#xff0c;特别是如果您知道如何使用它的话。所有Linux发行版都预装了top实用程序。通过这个交互式命令&#xff0c;您可以自定义如何浏览进程列表…

电脑蓝屏该如何给电脑重装系统

电脑蓝屏问题是让人头疼的常见故障之一&#xff0c;而重装系统是解决蓝屏问题的有效方法。本文将为您详细介绍如何在电脑蓝屏的情况下进行系统重装&#xff0c;轻松摆脱蓝屏困扰。 工具/原料&#xff1a; 系统版本&#xff1a;windows10系统 品牌型号&#xff1a;华为MateBoo…