RabbitMQ如何保证消息的可靠性6000字详解

news2024/11/27 19:56:36

RabbitMQ通过生产者、消费者以及MQ Broker达到了解耦的特点,实现了异步通讯等一些优点,但是在消息的传递中引入了MQ Broker必然会带来一些其他问题,比如如何保证消息在传输过程中可靠性(即不让数据丢失,发送一次消息就会被消费一次)?这篇博客将详细从生产者,MQ Broker以及消费者的角度讲解如何保证消息的可靠性!

1,消息丢失的情况

1.1 消息传递流程图如下

 Producer -> exchange ->queue -> Consumer(其中exchange和queue属于MQ Broker的组件)

1.2 消息可能丢失的情况

  • 生产者给交换机exchange的过程中发生数据丢失;
  • 交换机exchange路由给队列queue的过程中发生数据丢失;
  • 消息到达MQ的一瞬间,MQ发生了宕机的情况造成数据丢失;
  • 消费者从队列queue中取出消息进行消费的一瞬间消费者宕机了造成数据丢失。

2,生产者确认机制

生产者确认机制主要是站在生产者的角度来保证消息的可靠性,针对的是生产者给交换机发送消息以及交换机给队列发送消息的过程中数据丢失的情况!

2.1 书写配置信息

# 配置日志信息
logging:
  pattern:
    dateformat: HH:mm:ss:SSS
  level:
    cn.itcast: debug

spring:
  rabbitmq:
    host: 123.207.72.43 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: admin
    password: 123
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    #消息发送失败时执行returnCallback回调函数
    template:
      mandatory: true
  • publisher-confirm-type表示开启publisher-confirm;这个参数有两种类型,分别是correlated和simple(correlated代表异步等待回调,类似于js中发送的ajax请求的回调函数,MQ返回结果时会执行定义的confirmCallback函数;simple代表同步等待confirm结果直到超时);
  • publisher-returns表示开启publish-return功能,同样是基于callback机制,不过是定义returnCallback;
  • template.mandatory定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息。

2.2 定义return回调机制

我们使用的是SpringBoot来整合的RabbitMQ,所以不论是return回调还是confim回调都是用rabbittemplate对象进行定义的。

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //获取获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            //记录日志
            log.error("消息发送队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",
                    replyCode,replyText,exchange,routingKey,message.toString());
            //如果需要的话进行消息的重发
        });
    }
}

注意:

  1. 一个RabbitTemplate只能配置一个ReturnCallback,所以需要在项目启动的时候进行定义,这样rabbitTemplate就是全局唯一的了(也可以采用PostConstruct注解中的init方法进行定义);
  2. ApplicationContextAware是Spring创建完Bean工厂之后的通知方法,当Spring创建完Bean工厂之后就可以在Spring容器中拿到RabbitTemplate对象了;
  3. 配置ReturnCallback时可以采用匿名内部类的方法简化代码,如果消息发送失败可以根据需要进行消息重发操作。

2.3 定义confirm回调机制

ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同,可以通过测试方法进行定义。

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage() throws InterruptedException {
        //1.准备消息
        String message = "hello spring amqp";

        //2.准备CorrelationData
        //2.1 消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //2.2 准备准备ConfirmCallback
        correlationData.getFuture().addCallback(confirm -> {
            if (confirm.isAck()) {
                log.debug("消息成功投递到交换机!消息ID:{}", correlationData.getId());
            } else {
                log.error("消息投递到交换机上失败!消息ID:{}", correlationData.getId());
                //重发消息
            }
        }, throwable -> {
            //记录日志
            log.error("发送消息失败!",throwable);
            //重发消息
        });

        //3.发送消息
        rabbitTemplate.convertAndSend("amq.topic","a.simple.hello",message,correlationData);
        //加上休眠时间 避免mq连接直接关闭
        Thread.sleep(1000);
    }

注意:

  1. 生产者给交换机发送的消息数据很多的,为了区分每个消息的归属,每个消息都要附属上一个ID信息,可以采用UUID的方式生成唯一身份标识;
  2. 在发送消息的时候需要增加一个correlation变量,这个变量记录了两个东西(1.每个消息的ID 2.定义的cinfirm回调机制);
  3. 加上线程休眠的操作是为了避免消息发送到交换机之后mq的连接直接关闭,这样会导致返回ack的错误。

3,消息持久化

消息持久化是站在MQ Broker的角度来保证消息的可靠性的,将交换机、队列以及消息设置成持久化的从而避免MQ宕机造成消息的丢失!

3.1 交换机持久化

@Bean
    public DirectExchange simpleDirect(){
        return new DirectExchange("simple.direct",true,false);
    }

第二个参数设置成true就是让就交换机是可持久化的,第三个参数是是否自动删除,一般设为false;

3.2 队列持久化

@Bean
    public Queue simpleQueue(){
        return QueueBuilder.durable("simple.queue").build();
    }

durable的意思就是可持久化的,传入队列名称然后进行build操作,这样创建的队列就是一个可持久化的队列;

3.3 消息持久化

将交换机和队列设置为持久化的之后重启MQ服务器之后消息依然会丢失,因为发送的消息不是可持久化的,所以也需要将消息设置成可持久化的

4,消费者消息确认

消费者消息确认是站在消费者的角度来保证消息可靠性的,消息者处理完一条消息之后需要给MQ Broker返回一条ACK表示消息处理完成!

4.1 三种确认模式

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack;
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack;
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除。

4.2 none模式的演示

1.修改消费者工程中的配置文件

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 关闭ack

2.监听一个队列,在监听的方法中模拟一个异常情况,观察消息是否会被删除

@RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
        //这里模拟一个异常
        System.out.println(1 / 0);
        log.info("消费者处理消息成功!");
    }

3.在rabbitmq控制台模拟发送一条消息,观察抛出异常之后消息是否会重发

 抛出异常消费者并没有处理消息成功,再观察控制台是否将消息删除:

 队列中已经没有消息了,说明消息被删除了!

消费者确认机制为none的时候,只要消费者拿到消息之后MQ就会把消息删除,不关心消费者是否将消息成功处理!

4.3 auto模式的演示

1.修改消费者工程中的配置文件

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 关闭ack

 2.监听一个队列,在监听的方法中模拟一个异常情况,观察消息是否会被删除

@RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
        //这里模拟一个异常
        System.out.println(1 / 0);
        log.info("消费者处理消息成功!");
    }

3.在rabbitmq控制台模拟发送一条消息,观察抛出异常之后消息是否会重发

消费者确认机制为auto的时候,消费者拿到消息之后MQ并不会立刻删除队列中的消息,只有消费者成功处理完消息之后给队列返回一个ack的时候队列才会删除消息!

5, 消费者失败重试机制

我们发现当消费者确认机制为auto时,如果代码中出现了异常,消息会进行重复入队列(requeue)的操作,重复入队的操作对于MQ来说开销会非常大,消息处理飙升,所以引入了失败重试机制:当代码中出现了异常的时候,消费者内部会进行重发的操作(可以控制重发的时间和次数),如果超过设置的重发次数消费者还未成功处理消息默认将消息丢弃!

5.1 本地重试

Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列,可以在消费者工程的yml文件中添加如下配置:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 3 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 4 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

 4次重发之后消息还未成功处理spring抛出了AmqpRejectAndDontRequeueException异常,这是失败之后的默认处理方式,默认消费者给队列返回了ack,此时队列会将消息从队列中删除!

5.2 失败策略

失败达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息,默认就是这种方式;
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队;
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。

如果消息这个消息比较重要,达到最大重试次数之后这个消息不能被丢弃该怎么办,此时就可以使用RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

@Configuration
public class ErrorMessageConfig {
    //定义失败之后处理的交换机和队列
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }

    //将交换机和队列进行绑定
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }


    //定义一个RepublishMessageRecoverer,替换spring默认的处理机制​
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

流程图如下:

 6, 如何保证RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列;
  • 开启持久化功能,确保消息未消费前在队列中不会丢失;
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack;
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/764136.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

GDB调试基础知识

文章目录 概念准备工作常用命令说明启动与退出给程序设置参数/获取设置参数GDB使用帮助查看当前文件代码查看非当前文件代码查看及设置显示的行数断点操作调试操作 概念 GDB 是由 GNU 软件系统社区提供的调试工具,同 GCC 配套组成了一套完整的开发环境,…

Python基础教程——60个基础练习(三)

41-字符串格式化 "%s is %s years old" % (bob, 23) # 常用 "%s is %d years old" % (bob, 23) # 常用 "%s is %d years old" % (bob, 23.5) # %d 是整数 常用 "%s is %f years old" % (bob, 23.5) "%s is %5.2f…

ListBox基本用法

作用:列表框,用于以列表的形式展示数据。 常用属性: 允许多列显示数据 添加数据项集合 常用事件: 选择项变化时触发该事件 后台代码示范: //列表框项目选择变化时被触发private void listBox1_SelectedIndexChanged…

Flutter 跳转应用市场评分——超简洁实现

最近在做flutter跳转去应用市场评分的功能,虽然是一个很小的功能,但是要做的既简单又高效,同时又能把细节考虑到,还是有坑要走的,这边记录一下。 背景 做应用市场相关的运营,在app内增加评分引导&#xf…

经典目标检测R-CNN系列(1)开山之作R-CNN

经典目标检测R-CNN系列(1)开山之作R-CNN 2014年,大神RBG(Ross Girshick)等人将卷积神经网络(Convolutional Neural Network,CNN)应用于目标检测任务中,在PASCAL VOC 2012数据集上,能…

vue 如何发布并部署到服务器

一般情况npm run build即可 从而生成vue代码直接放到服务器即可 这里的具体情况要看package.json里面的配置从而使用命令 会生成dist就是该项目的发布包

软件测试项目经验重要吗?

目前从行业薪资排名看,IT行业是我们普通人能够接触到的高薪行业,像金融、银行和投行等高薪职位,张雪峰老师在他的视频中分析过,不是一般人可以拿捏的。IT行业的大部分岗位需要专业的技能,留给我们这些非计算机专业科班…

实现微信机器人开发,个微api

首先微信聊天机器人,是一种通过自然语言模拟人类进行对话的程序。通常运行在特定的软件平台上,如PC平台或者移动终端设备平台。 有兴趣的可以去进行测试(E云管家),功能十分全面 文档测试过程中实现多项功能进行管理 …

数据结构--线性表的链式存储结构

这里写目录标题 链式存储结构链表简介格式分类头结点位置示意图与不带头结点的区别 链表的特点 单链表定义链表的代码实现简介实操 基本操作的实现初始化单链表销毁单链表清空单链表求单链表表长 二级目录二级目录二级目录二级目录二级目录二级目录 链式存储结构 链表 简介 格…

QML学习day1

QML学习day1 main.qml import QtQuick 2.15 import QtQuick.Window 2.15 import QtQuick.Controls 2.5Window {width: 640height: 480visible: truecolor:"blue"title: qsTr("Hello World")Button {//按钮id: btn1width: 50height: 50focus: true //聚焦…

P106-100组A卡(R5 240)指南

P106-100组A卡(R5 240)指南 不建议小白尝试 不建议小白尝试 不建议小白尝试文章目录 P106-100组A卡(R5 240)指南资料合集硬件软件基础卸载所有原驱动安装驱动修改注册表自动调用——只改一个注册表手动调用——改两个注册表 劝退…

软件设计原则

在软件开发中,为了提高软件系统的可维护性和可复用性,增加软件的可扩展性和灵活性,程序员要尽量根据6条原则来开发程序,从而提高软件开发效率、节约软件开发成本和维护成本。 开闭原则 对扩展开放,对修改关闭。在程序…

Leecode402:移掉 K 位数字

这道题一看想的是可能用回溯或者什么别的方法,但是那样的话时间复杂度非常高,而且也不适用于动态规划,所以观察的话,可以知道从前往后判断的话肯定是前面越小越好,所以只需要前面最小,整体就最小。因此从前…

子网掩码详解

1 子网掩码 IP地址是以网络号和主机号来标示网络上的主机的,我们把网络号相同的主机称之为本地网络,网络号不相同的主机称之为远程网络主机,本地网络中的主机可以直接相互通信;远程网络中的主机要相互通信必须通过本地网关&#…

酸蚀刻对钛医药材料纳米形态表面特性及活化能的影响

引言 由于商业纯钛(CP Ti)具有抗腐蚀性,并且具有哦合适的机械性能以及生物相容性,因此,目前一直被用作牙科植入材料。为了在临床手术中获得高水平的成功,CP Ti的表面质量和形貌是影响植入手术结果的最关键因素之一,近…

GPT使用技巧

五大原则 想要让ChatGPT产出有效的回答,需要遵循以下五个原则: 提问清晰: 请尽可能清晰地描述您的问题 简明扼要: 请尽量使用简单的语言和简洁的句子来表达您的问题 确认问题: 请确认您的问题是清晰、明确和完整…

python接口自动化--token登录(详解)

简介 为了验证用户登录情况以及减轻服务器的压力,减少频繁的查询数据库,使服务器更加健壮。有些登录不是用 cookie 来验证的,是用 token 参数来判断是否登录。token 传参有两种一种是放在请求头里,本质上是跟 cookie 是一样的&…

攻不下dfs不参加比赛(十一)

标题 为什么练dfs题目为什么练dfs 相信学过数据结构的朋友都知道dfs(深度优先搜索)是里面相当重要的一种搜索算法,可能直接说大家感受不到有条件的大家可以去看看一些算法比赛。这些比赛中每一届或多或少都会牵扯到dfs,可能提到dfs大家都知道但是我们为了避免眼高手低有的东…

手把手教会你使用Markdown

目录 一、Markdown是什么 二、Markdown优点 三、Markdown的基本语法 一、Markdown是什么 Markdown 是一种轻量级标记语言,创始人为约翰格鲁伯(John Gruber)。 Markdown 允许人们使用易读易写的纯文本格式编写文档,然后转换成…

四川大学计算机考研分析

关注我们的微信公众号 姚哥计算机考研 更多详情欢迎咨询 四川大学(B)考研难度(☆☆☆☆) 四川大学计算机考研招生学院是计算机学院、网络空间安全学院和视觉合成图形图像技术国防重点学科实验室。目前均已出拟录取名单。 四川…