RabbitMQ 消息丢失解决 (高级发布确认、消息回退与重发、备份交换机)

news2024/12/28 10:15:34

目录

一、发布确认SpringBoot版本

确认机制图例:

代码实战:

代码架构图:

1.1交换机的发布确认

添加配置类

消息消费者

消息生产者发布消息后的回调接口

测试:

 1.2回退消息并重发(队列的发布确认)

修改回调接口

生产者:

测试:

二、备份交换机

实战

生产者

报警消费者:

测试:


一、发布确认SpringBoot版本

        首先发布消息后进行备份在缓存里,如果消息成功发布确认到交换机,则从缓存里删除该消息,如果没有成功发布,则设置一个定时任务,重新从缓存里获取消息发布到交换机,直到成功发布到交换机。

确认机制图例:

代码实战:

一个交换机:confirm.exchange,一个队列:confirm.queue,一个消费者:confirm.consumer

其中交换机类型时 direct,与队列关联的 routingKey 是 key1

代码架构图:

1.1交换机的发布确认

配置文件中添加:

server:
  port: 8888
spring:
  rabbitmq:
    host: 192.168.163.132
    port: 5672
    username: 2252631565
    password: 2252631565
#    高级发布确认 发布消息成功后将会触发回调方法
    publisher-confirm-type: correlated
  • NONE 值是禁用发布确认模式,是默认值
  • CORRELATED 值是发布消息成功到交换器后会触发回调方法
  • SIMPLE 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;

添加配置类

声明交换机和队列,并且将交换机和队列进行绑定:

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE="confirm.exchange";
    public static final String CONFIRM_QUEUE="confirm.queue";
    public static final String ROUTING_KEY="key1";

    @Bean
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE,false,false);
    }

    @Bean
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }

    @Bean
    public Binding EAndQBind(@Qualifier("confirmExchange") DirectExchange exchange,@Qualifier("confirmQueue")Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}

消息生产者

也可以说是 Controller 层,在这里发送两条消息给两个交换机,其中一个交换机是我们设置好的,另一个交换机不存在;这样就可以清晰看出交换机应答效果。

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    //高级发布确认模式
    @GetMapping("/sendConfirmMsg/{message}")
    public void sendConfirmMsg(@PathVariable String message){
        log.info("发送一条时长为的消息给第一个队列内容是:{}",new Date().toString(),message);
        CorrelationData correlationData=new CorrelationData("1");
        correlationData.setReturnedMessage(new org.springframework.amqp.core.Message(message.getBytes()));
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.ROUTING_KEY,message,correlationData);
        //向一个不存在的交换机发送消息
        log.info("发送一条时长为的消息给第一个队列内容是:{}",new Date().toString(),message);
        CorrelationData correlationData2=new CorrelationData("2");
        correlationData2.setReturnedMessage(new org.springframework.amqp.core.Message(message.getBytes()));
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE+123,ConfirmConfig.ROUTING_KEY,message,correlationData2);
    }

}

消息消费者

监听 confirm.queue 队列

@Slf4j
@Component
public class ConfirmLetterQueue {
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)
    public void confirmConsumer(Message message, Channel channel){
        log.info("收到了消息:{}",new String(message.getBody()));
    }
}

消息生产者发布消息后的回调接口

只要生产者发布消息,交换机不管是否收到消息,都会调用该类的 confirm 方法

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        //注入
        rabbitTemplate.setConfirmCallback(this::confirm);
    }
    /*交换机确认回调
    1.交换机收到了消息 触发回调
    1.1 correlationData(我们在发消息的时候自己创建的) 消息的ID以及消息内容
    1.2 ack 交换机收到消息 true
    1.3 cause 交换机收到消息的原因 null
    ---------------------------------
    2.交换机未收到消息 触发回调
    2.1 correlationData 消息的ID以及消息内容
    2.2 ack 交换机未收到消息 false
    2.3 cause 失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            log.info("发送消息到交换机成功!消息体为:{}",new String(correlationData.getReturned().getMessage().getBody()));
        }else {
            log.info("发送消息到交换机失败!原因为:{}",cause.toString());
        }
    }
}

测试:

 效果很明显,我们配置的交换机成功收到消息并转发给队列;不存在的交换机没有接受到消息并作出反应。

 1.2回退消息并重发(队列的发布确认)

在配置文件中开启消息回退功能

server:
  port: 8888
spring:
  rabbitmq:
    host: 192.168.163.133
    port: 5672
    username: 2252631565
    password: 2252631565
#    高级发布确认 发布消息成功后将会触发回调方法
    publisher-confirm-type: correlated
    #    消息回退 当消息未路由至队列时触发
    publisher-returns: true

修改回调接口

实现 RabbitTemplate.ReturnsCallback 接口,并实现方法

@Slf4j
@Component
public class MyCallBack  implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        //注入
        rabbitTemplate.setConfirmCallback(this::confirm);
        rabbitTemplate.setReturnsCallback(this::returnedMessage);
    }
    /*交换机确认回调
    1.交换机收到了消息 触发回调
    1.1 correlationData(我们在发消息的时候自己创建的) 消息的ID以及消息内容
    1.2 ack 交换机收到消息 true
    1.3 cause 交换机收到消息的原因 null
    ---------------------------------
    2.交换机未收到消息 触发回调
    2.1 correlationData 消息的ID以及消息内容
    2.2 ack 交换机未收到消息 false
    2.3 cause 失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            log.info("时间:{}发送消息到交换机成功!",new Date());
        }else {
            log.info("发送消息到交换机失败!原因为:{}",cause.toString());
        }
    }


    //当消息未路由到队列时触发 只有失败时才触发 若消息发送至延迟队列则一定会触发回退 记得根据交换机名称排除延迟队列
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.error("消息:'{}',被交换机:{}回退,回退原因为:{},routingKey为:{}"
                ,new String(returned.getMessage().getBody())
                ,returned.getExchange()
                ,returned.getReplyText()
                ,returned.getRoutingKey());
        //10s后消息重发
        try {
            Thread.sleep(10000);
            log.info("时间:{},生产者重新发消息",new Date());
            rabbitTemplate.convertAndSend(returned.getExchange(),ConfirmConfig.ROUTING_KEY,new String(returned.getMessage().getBody()));
        }catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

生产者:

向交换机中发送消息,指定错误的routingkey,触发队列回退消息并重发消息。

    //高级发布确认模式
    @GetMapping("/sendConfirmMsg/{message}")
    public void sendConfirmMsg(@PathVariable String message){
        //向一个不存在的队列发送消息
        log.info("时间:{}生产者发送一条的消息给第一个队列内容是:{}",new Date().toString(),message);
        CorrelationData correlationData2=new CorrelationData("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.ROUTING_KEY+123,message,correlationData2);
    }

测试:

回退未进入队列的消息并重新发送消息。 

二、备份交换机

        什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout (扇出),这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进 入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警并可以重发消息。

实战

需要一个备份交换机 backup.exchange,类型为 fanout,该交换机发送消息到队列 backup.queue 和 warning.queue。

 修改高级确认发布 配置类

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE="confirm.exchange";
    public static final String CONFIRM_QUEUE="confirm.queue";
    public static final String ROUTING_KEY="key1";

    //备份交换机
    public static final String BACKUP_EXCHANGE="backup.exchange";
    //备份队列
    public static final String BACKUP_QUEUE="backup.queue";
    //报警队列
    public static final String WARNING_QUEUE="warning.queue";
    @Bean
    public DirectExchange confirmExchange(){
        //绑定确认交换机与备份交换机
        Map<String,Object> argument=new HashMap<>();
        argument.put("alternate-exchange",BACKUP_EXCHANGE);
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).withArguments(argument).build();
    }

    //备份交换机
    @Bean
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE);
    }
    @Bean
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }
    //备份队列
    @Bean
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE).build();
    }
    //警告队列
    @Bean
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE).build();
    }
    @Bean
    public Binding EAndQBind(@Qualifier("confirmExchange") DirectExchange exchange,@Qualifier("confirmQueue")Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
    //绑定备份交换机与两个队列
    @Bean
    public Binding BAndBBing(@Qualifier("backupExchange") FanoutExchange exchange,@Qualifier("backupQueue")Queue queue){
        return BindingBuilder.bind(queue).to(exchange);
    }
    @Bean
    public Binding BAndWBing(@Qualifier("backupExchange") FanoutExchange exchange,@Qualifier("warningQueue")Queue queue){
        return BindingBuilder.bind(queue).to(exchange);
    }
}

生产者

        生产者发送两条消息 一个配置正确的路由,另一个是错误的路由。预期目标是正确路由正常接收消息,错误路由传输的信息由警告队列接收。

    //高级发布确认模式
    @GetMapping("/sendConfirmMsg/{message}")
    public void sendConfirmMsg(@PathVariable String message){
        log.info("时间:{}生产者发送两条消息队列内容是:{}",new Date().toString(),message);
        CorrelationData correlationData=new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.ROUTING_KEY,message,correlationData);
        //向一个不存在的队列发送消息
        CorrelationData correlationData2=new CorrelationData("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.ROUTING_KEY+123,message,correlationData2);
    }

报警消费者:

接收不可路由的消息

/**
 * 报警消费者
 */
@Slf4j
@Component
public class WarningConsumer {
    //监听报警消息
    @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE)
    public void receiveWarningMsg(Message message){
        String msg=new String(message.getBody());
        log.info("报警发现不可路由消息:{},重发消息",msg);
    }
}

测试:

         生产者发送两条消息 一个配置正确的路由,另一个是错误的路由。预期目标是正确路由正常接收消息,错误路由传输的信息由警告队列接收。

        在此案例中,也设置了消息回退的配置,但是没有触发消息回退。由此得出:备份交换机的优先级更高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1219957.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

让文字在盒子中水平居中与垂直居中

简单方法&#xff1a; 1.先用text-align: center;将文字垂直居中。 2.再用line-height: Xpx;将元素的行高设置为与父元素同样的高度。&#xff08;这里的X代表父元素的高度&#xff09; 举例&#xff1a; 对于该网页的代码如下&#xff1a; <!DOCTYPE html> <html&…

servlet页面以及控制台输出中文乱码

如图&#xff1a; servlet首页面&#xff1a; servlet映射页面&#xff1a; 以及控制台输出打印信息&#xff1a; 以上页面均出现中文乱码 下面依次解决&#xff1a; 1、首页面中文乱码 检查你的html或者jsp页面中meta字符集 如图设置成utf-8 然后重启一下tomcat 2、servl…

Eclipse切换中文环境

PACK包链接 地址&#xff0c;进入后可以看到不同版本的包。 要选择跟自己Eclipse版本一致的包&#xff0c;比如我的Eclipse启动界面如下&#xff0c;我就要找Helios的包&#xff08; Juno、Indigo、Helios、Kepler这些具体怎么划分的我也不清楚&#xff09;。 在线安装 打…

AODNet

【20231117】读研期间没有对阅读的文章进行总结&#xff0c;没想到毕业反而有了机会。即日起会对阅读过的文章要点进行梳理记录&#xff0c;希望这一习惯能够坚持下去。 学术的角度&#xff1a;看论文要学习作者如何逻辑严谨的自证 落地的角度&#xff1a;只用看以下六点&#…

Oracle OCM考试(史上最详细的介绍,需要19c OCP的证书)

Oracle 19c OCM考试和之前版本的OCM考试差不多&#xff0c;对于考生来说最大的难点是题量大&#xff0c;每场3小时&#xff0c;一共4场&#xff0c;敲键盘敲得手抽筋。姚远老师&#xff08;v:dataace&#xff09;的很多Oracle OCP学员都对19c OCM考试很有兴趣&#xff0c;这里给…

企业数字化转型的好处?_光点科技

企业数字化转型是当今商业世界中一个至关重要的议题。数字化转型不仅仅意味着采用新技术&#xff0c;而是涉及到企业在文化、运营和客户体验方面的根本变革。那么&#xff0c;企业数字化转型的好处是什么呢&#xff1f; 1.数字化转型可以显著提高企业的运营效率。 通过自动化流…

8.jib-maven-plugin构建springboot项目镜像,docker部署配置

目录 1.构建、推送镜像 1.1 执行脚本 2.2 pom.xml配置 ​2.部署镜像服务 2.1 执行脚本 2.2 compose文件 3.docker stack常用命令 介绍&#xff1a;使用goole jib插件构建镜像&#xff0c;docker stack启动部署服务&#xff1b; 通过执行两个脚本既可以实现构建镜像、部…

前端对用户名密码加密处理,后端解密

一. 正常表单提交如图&#xff0c;可以看见输入的用户名密码等 二. 使用crypto-js.min.js进行前端加密处理 js资源地址链接&#xff1a;https://pan.baidu.com/s/1kfQZ1SzP6DUz50D–g_ziQ 提取码&#xff1a;1234 前端代码 <script type"text/javascript" src&q…

2023年亚太杯数学建模思路 - 复盘:校园消费行为分析

文章目录 0 赛题思路1 赛题背景2 分析目标3 数据说明4 数据预处理5 数据分析5.1 食堂就餐行为分析5.2 学生消费行为分析 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 赛题背景 校园一卡通是集…

2023年09月 Scratch(一级)真题解析#中国电子学会#全国青少年软件编程等级考试

Scratch等级考试(1~4级)全部真题・点这里 一、单选题(共25题,每题2分,共50分) 第1题 下列哪项内容是不可以修改的?( ) A:角色名称 B:造型名称 C:舞台名称 D:背景名称 答案:C 第2题 要给“古诗朗诵”作品录制配音,可以使用下列哪个按钮?( ) A: B: C:…

LLM建模了什么,为什么需要RAG

LLM近期研究是井喷式产出&#xff0c;如此多的文章该处何处下手&#xff0c;他们到底又在介绍些什么、解决什么问题呢&#xff1f;“为学日增&#xff0c;为道日损”&#xff0c;我们该如何从如此多的论文中找到可以“损之又损以至于无”的更本质道或者说是这个方向的核心模型。…

ElementUI及ElementUI Plus Axure RP高保真交互元件库及模板库

基于ElementUI2.0及ElementUI Plus3.0二次创作的ElementUI 元件库。2个版本的原型图内容会有所不同&#xff0c;ElementUI Plus3.0的交互更加丰富和高级。你可以同时使用这两个版本。 不仅包含Element UI 2.0版&#xff0c;还包含Element Plus 3版本。Element 2版支持Axure 8&…

解析 Python requests 库 POST 请求中的参数顺序问题

在这篇文章中&#xff0c;我们将探讨一个用户在使用Python的requests库进行POST请求时遇到的问题&#xff0c;即参数顺序的不一致。用户通过Fiddler进行网络抓包&#xff0c;发现请求体中的参数顺序与他设置的顺序不符。我们将深入了解POST请求的工作原理&#xff0c;并提供解决…

css继承属性

在css中&#xff0c;继承是指的是给父元素设置一些属性&#xff0c;后代元素会自动拥有这些属性 关于继承属性&#xff0c;可以分成&#xff1a; 字体系列属性文本系列属性元素可见性表格布局属性列表属性引用光标属性 继承中比较特殊的几点&#xff1a; a 标签的字体颜色不…

HTTP HTTPS 独特的魅力

目录 HTTP协议 HTTP协议的工作过程 首行 请求头&#xff08;header&#xff09; HOST Content-Length​编辑 User-Agent&#xff08;简称UA&#xff09; Referer Cookie 空行 正文&#xff08;body&#xff09; HTTP响应详解 状态码 报文格式 HTTP响应格式 如何…

将ECharts图表插入到Word文档中

文章目录 在后端调用JS代码准备ECharts库生成Word文档项目地址库封装本文示例 EChartsGen_DocTemplateTool_Sample 如何通过ECharts在后台生成图片&#xff0c;然后插入到Word文档中&#xff1f; 首先要解决一个问题&#xff1a;总所周知&#xff0c;ECharts是前端的一个图表库…

asp.net实验室设备管理系统VS开发sqlserver数据库web结构c#编程计算机网页源码项目

一、源码特点 asp.net实验室设备管理系统 是一套完善的web设计管理系统&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 asp.net实验室设备管理系统1 二、功能介绍 本系统使用Microsoft Visual Studio 2019为开发工具&#xff0c;SQL …

国鑫受邀出席2023松山湖软件和信息服务业高质量发展大会

为推动粤港澳大湾区的软件和先进制造产业的融合发展&#xff0c;“2023松山湖软件和信息服务业高质量发展大会”于今日在松山湖畔隆重举办&#xff0c;会议以“推动软件和制造业深度融合发展&#xff0c;打造软件和信息服务业集聚高地”为主题&#xff0c;聚焦工业软件应用、智…

解析CAD图纸出现乱码的原因及解决方法

解析CAD图纸出现乱码的原因及解决方法 CAD&#xff08;计算机辅助设计&#xff09;是现代工程设计中不可或缺的工具&#xff0c;它能够帮助工程师们高效地完成复杂的设计任务。然而&#xff0c;有时在使用CAD软件过程中&#xff0c;可能会遇到图纸出现乱码的问题&#xff0c;影…

【python零基础入门学习】python进阶篇之数据库连接-PyMysql-全都是干货-一起来学习吧!!!

本站以分享各种运维经验和运维所需要的技能为主 《python零基础入门》&#xff1a;python零基础入门学习 《python运维脚本》&#xff1a; python运维脚本实践 《shell》&#xff1a;shell学习 《terraform》持续更新中&#xff1a;terraform_Aws学习零基础入门到最佳实战 《k8…