RabbitMQ —— 深入发布确认

news2024/11/18 19:38:00

前言

        面对在实际的生产环境中RabbitMQ服务宕机或者重启导致消息在投递阶段丢失的问题,我们需要采用消息的发布确认和回退消息两种机制来保证消息的不丢失。在这篇文章中,荔枝同样以demo实例的方式来梳理相关的知识,希望能够帮助到有需要的小伙伴~~~


文章目录

前言

一、发布确认——回调接口和回退消息

1.1 回调接口 

1.2 回退消息 

二、备份交换机

总结


一、发布确认——回调接口和回退消息

        在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢:

1.1 回调接口 

        前面我们讲到为了防止极端情况下的消息丢失,需要在原有发布确认模式中进行增强。这种丢消息的情况由两种:一是交换机接收不到消息;二是队列接收不到消息。我们先讲第一种情况:交换机接收不到消息的处理机制:回调接口。

配置类配置

要记得开启配置类,有三个参数:None/Correlated/Simple,其中simple会有信道关闭的风险。 

spring.rabbitmq.publisher-confirm-type=correlated

消息发布者 

        消息发布者需要注意在这里需要传入一个correlationData对象,该对象只是一个对消息的唯一标识而已,如果在消息发布者处没有传入,那么回调接口类在重写ConfirmCallback接口的实现方法的时候就不能获取到消息的id值。

package com.crj.rabbitmqtestspringboot.controller;

import com.crj.rabbitmqtestspringboot.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

//发布确认的生产者
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {
    private RabbitTemplate rabbitTemplate;
    //发消息
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){

        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);
    }
}

可以看到convertAndSend有多种重载的方法,这里我们按需求选择。 

消息接收者

package com.crj.rabbitmqtestspringboot.controller.consumer;

import com.crj.rabbitmqtestspringboot.config.ConfirmConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//接收消息
@Component
public class Consumer {
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public  void  receiveConfirmMessage(Message message){

        System.out.println(new String(message.getBody()));
    }
}

回调接口的实现类

回调接口实现类需要实现RabbitTemplate类中的ConfirmCallback接口,并通过重写接口原来的confirm方法来获取交换机是否获取消息的状态信息。

回调方法被调用的时机:

  • 发消息 交换机接收到了会开始回调
  • 发消息 交换机接收失败  开始回调

重写的confirm方法中的参数:

  • correlationData 保存回调信息的ID机器相关信息,String类型
  • ack  交换机是否接收到消息,boolean类型
  • reason  失败的原因,String类型
package com.crj.rabbitmqtestspringboot.controller.callback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Slf4j
@Component
public class MyCallback implements RabbitTemplate.ConfirmCallback {
    /**
     * 因为该接口是内部接口,因此当RabbitTemplate调用ConfirmCallback接口并不能调用到我们定义的实现类
     * 所以我们需要做一下注入的操作
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //注入
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     *交换机确认回调方法
     * 回调方法被调用的时机:
     * 1.发消息 交换机接收到了会开始回调
     * 2.发消息 交换机接收失败  开始回调
     *
     *
     * @param correlationData 保存回调信息的ID机器相关信息
     * @param ack  交换机是否接收到消息boolean
     * @param reason  失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String reason) {
        String id = correlationData != null ? correlationData.getId() : "";
        if(ack){
            log.info("交换机已经收到Id为{}的消息",id);
        }else {
            log.info("交换机为收到Id为{}的消息,原因:{}",id,reason);
        }
    }
}

        这里还需要注意的是:由于ConfirmCallback接口是内部接口,因此当RabbitTemplate实例化后调用ConfirmCallback接口实现类的时候并不能调用到我们定义的实现类,因此我们需要做一下注入的操作。

@PostConstruct注解:当依赖注入完成后用于执行初始化的方法,并且只会被执行一次,该注解的执行顺序必须按照上述demo注解的顺序!!!

1.2 回退消息 

        在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃的。因此为了让无法被路由的消息被处理,我们可以通过设置mandatory参数可以在当消息传递过程中不可达时将消息返回给生产者。 简单来说就是:队列如果接收不到消息就会使用回退消息,而消息只有在消息不可达的时候才会采用回退消息给生产者

配置类中开启回退消息

spring.rabbitmq.publisher-returns=true

接口实现类

这里需要注意的是returnedMessage接口方法的重写,荔枝学习的教程重写该方法时候有五个参数,但现在好像已经改了一下,重写该方法的时候仅有一个参数作为方法的形参!

package com.crj.rabbitmqtestspringboot.controller.callback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Slf4j
@Component
public class MyCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;
    //注入
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String reason) {
        String id = correlationData != null ? correlationData.getId() : "";
        if(ack){
            log.info("交换机已经收到Id为{}的消息",id);
        }else {
            log.info("交换机为收到Id为{}的消息,原因:{}",id,reason);
        }
    }

    //重写ReturnsCallback接口的方法,只有在消息不可达的时候才会调用
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.err.println("ReturnedMessage: " + returnedMessage);
    }
}

二、备份交换机

        除了上面的回退消息的解决方法,我们还可以通过加一个备份交换机来备份消息。备份交换机顾名思义就是在原来接收消息的交换机没有确定接收到消息的时候(宕机或者服务中断),将消息发送到备份交换机备份。

关联直接交换机和备份交换机,方便消息转发。这里荔枝仅给出关键代码:withArgument这里会传递一个map参数,设置备份交换机。

//声明交换机
@Bean ("confirmExchange")
public DirectExchange confirmExchange(){

    return ExchangeBuilder.directExchange(CONFIRM EXCHANGE NAME).durable(true)
            .withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
}

需要注意的是:当消息回退和备份交换机机制同时开启的时候,备份交换机的优先级更高。 


总结

        发布确认机制和回退消息机制其实就是为了保证消息队列中的消息的安全性,也是为了防止在发送后消息的不丢失以及回退重发来保证业务功能的正常运行。从这两个角度去理解整个机制会有更好的体会哈哈哈哈~

今朝已然成为过去,明日依然向往未来!我是荔枝,在技术成长之路上与您相伴~~~

如果博文对您有帮助的话,可以给荔枝一键三连嘿,您的支持和鼓励是荔枝最大的动力!

如果博文内容有误,也欢迎各位大佬在下方评论区批评指正!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1025138.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

土耳其市场最全开发攻略

2023年6月1日起,亚马逊土耳其站开始正式面向中国卖家试运营。土耳其这个古丝绸之路西域的重要节点,再一次吸引了来自东方众商家的目光。 土耳其是一个著名的贸易中心,向世界提供许多出口产品。土耳其总统埃尔多安指出,根据调整后…

[Linux 基础] linux基础指令(2)head,tail,Cal,find,grep,zip/unzip,tar,bc,uname

文章目录 1、head指令2、tail指令引申:如何拿到中间行内容方案一:方案二:方案三: 补充指令:(1) wc -l 文件名(2) uniq 文件名(3) sort 文件名 3、时间相关的指令4、Cal指令5、find指令:(灰常重要…

如何设置代理ip服务器地址

目录 前言 一、使用HTTP代理服务器 1. Python代码 2. Java代码 二、使用SOCKS代理服务器 1. Python代码 2. Java代码 三、使用代理池 1. Python代码 2. Java代码 总结 前言 代理服务器是一种可以隐藏真实IP地址并且保护用户隐私的工具。在某些情况下,比…

「聊设计模式」之备忘录模式(Memento)

🏆本文收录于《聊设计模式》专栏,专门攻坚指数级提升,助你一臂之力,带你早日登顶🚀,欢迎持续关注&&收藏&&订阅! 前言 设计模式是软件设计中经典的解决方案,旨在解决…

中科驭数联合处理器芯片全国重点实验室获得FPL 2023最佳论文奖

在2023年的FPGA领域顶级会议FPL (International Conference on Field Programmable Logic and Applications) 上,由中科驭数团队、中国科学院计算技术研究所处理器芯片全国重点实验室团队共同完成的论文《Co-ViSu: a Video Super-Resolution Accelerator Exploiting…

高教杯数学建模竞赛A题文章写作要点与示范

数学建模竞赛写作最重要的一点 LaTeX 很重要 非常重要 非常重要一定要规范 美观 写作注意事项 标准的附录详实的支撑材料和清晰的支撑材料说明 文章中所有的图片都应该包含在支撑材料中正确得引用参考文献模型的评价部分应当包含 模型优点模型缺点改进方案图像绘制应当标准假设…

通过uni.chooseImage返回的临时路径转为base64

uniapp官方API文档:https://uniapp.dcloud.net.cn/api/media/image.html#chooseimage 代码在后面 chooseimage的succes函数中的res.tempFilePaths,是图片的一个临时路径,没法直接传给后端接口使用,且接口需要的是base64格式的 ge…

使用dokcer部署分布式任务调度平台XXL-JOB

XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。 部署docker略有需要看下面文档即可 ​​​​​​yum安装docker以及安装指定版本docker_yum安装指定版本dock…

迅为RK3588开发板修改编译工具路径

1 因为此章节以 rknn_yolov5_demo 在 RK3588 Linux 64 位平台上运行为例,所以修改 examples/rknn_yolov5_demo/build-XXX.sh 的编译工具路径,如下图所示: 2 修改 build-linux_RK3588.sh 文件,将 TOOL_CHAIN 修改为 gcc-arm-10.3…

set和map通过一颗红黑树进行封装

T是什么我不知道,但是我知道set 那么T一定得是key , map一定得是pair的数据类型 得用一颗模板是红黑树,实例化出map和set ---------------------------------------------------------------------------------------------------------…

如何与QVC 建立EDI连接?

QVC,全称为Quality, Value, Convenience(品质、价值、便利),成立于1986年,是一家全球领先的零售电视和在线零售商。作为一家多渠道零售商,QVC致力于为客户提供高品质、独特的商品,通过电视、互联…

山石网科国产化防火墙,打造全方位边界安全解决方案

互联网的快速发展促进了各行各业的信息化建设,但也随之带来了诸多网络安全风险。大部分组织机构采用统一互联网接入方案,互联网出口承担着内部用户访问互联网的统一出口和对外信息服务的入口,因此在该区域部署相匹配的安全防护手段必不可少。…

Spring WebFlux使用未加前缀的双通配符模式绕过安全性CVE-2023-34034

文章目录 0.前言漏洞漏洞介绍描述 1.参考文档2.基础介绍3.解决方案3.1. 升级版本 4.漏洞修复源码分析5. 漏洞利用示例 0.前言 背景:公司项目扫描到 WebFlux中使用"**"作为模式会导致Spring Security和Spring WebFlux之间 CVE-2023-34034漏洞 漏洞 高 | 2…

junit.Test误踩坑,识别不到@Test注解,无法运行测试方法

问题的出现源自于下面的一段代码: 在这一段代码中,只看到可以运行的main方法,无法看到test方法可以运行的标志。 只能运行main()方法。 开始排查,对junit包的导入进行检查,发现是没有问题的。 怀疑是否是IntelliJ IDE…

lazada跨境电商商品数据采集

Lazada跨境电商商品数据采集可以使用以下两种方法: 手动采集:可以在Lazada网站上手动搜索商品信息,然后复制粘贴到Excel表格中。这种方法比较麻烦,需要逐个搜索和记录商品信息,适合采集数量较少的商品数据。使用采集软…

无涯教程-JavaScript - PI函数

描述 PI函数返回数字3.14159265358979,数学常数pi,精确到15位数字。 语法 PI ()争论 PI函数语法没有参数。 适用性 Excel 2007,Excel 2010,Excel 2013,Excel 2016 Example JavaScript 中的 PI函数 - 无涯教程网无涯教程网提供描述PI函数返回数字3.14159265358979,数学常…

基于Android+OpenCV+CNN+Keras的智能手语数字实时翻译——深度学习算法应用(含Python、ipynb工程源码)+数据集(二)

目录 前言总体设计系统整体结构图系统流程图 运行环境模块实现1. 数据预处理2. 数据增强3. 模型构建1)定义模型结构2)优化损失函数 相关其它博客工程源代码下载其它资料下载 前言 本项目依赖于Keras深度学习模型,旨在对手语进行分类和实时识…

Vue.js入门模板语法[上] 及Vue.js实现购物车---详细讲解

前言 前面我们学习了Vue的基础入门,接下来我们学习有关Vue的模板语法,学习Vue语法能提高我们的前端开发效率 Vue.js 使用了基于 HTML 的模板语法,允许开发者声明式地将 DOM 绑定至底层 Vue 实例的数据。所有 Vue.js 的模板都是合法的 HTML &a…

肖sir__mysql之存储练习题__013

实验 一、 实验要求: 理解存储过程的概念掌握存储过程的语法格式、使用方法掌握存 储过程的创建、执行 二、实验前提: – drop table if exists student; – Create table student – (Id varchar(255), #学号 – Name varchar(255), #姓名 – Roomid…

4、wireshark使用教程

文章目录 一、wireshark简介二、环境三、wireshark抓包三、wireshark过滤器使用 一、wireshark简介 Wireshark是使用最广泛的一款「开源抓包软件」,常用来检测网络问题、攻击溯源、或者分析底层通信机制。 Wireshark抓包原理: 单机情况:电脑…