RabbitMQ 保证消息不丢失的几种手段

news2024/11/16 17:37:38

文章目录

  • 1.RabbitMQ消息丢失的三种情况
  • 2.RabbitMQ消息丢失解决方案
    • 2.1 针对生产者
      • 2.1.1 方案1 :开启RabbitMQ事务
      • 2.1.2 方案2:使用confirm机制
    • 2.2 Exchange路由到队列失败
    • 2.3 RabbitMq自身问题导致的消息丢失问题解决方案
      • 2.3.1 消息持久化
      • 2.3.2 设置集群镜像模式
      • 2.3.3 消息补偿机制
    • 2.3 针对消费者
  • 3.总结

在使用消息队列时,面对复杂的网络状况,我们必须要考虑如何确保消息能够正常消费。在分析如何保证消息不丢失的问题之前,我们需要对症下药,什么样的情况会导致消息丢失。

1.RabbitMQ消息丢失的三种情况

在弄清消息丢失的情况之前,我们先看看一条消息从产生到最终消费会经历哪些过程。
在这里插入图片描述

上面的图是官网中关于一条消息发送的整个流程,消息会经历下面几个流程:

  • 生产者将消息发送到Exchange
  • Exchange根据Routing Key路由到Queue
  • 消费者订阅Queue,从Queue中获取数据消费

通过上面的RabbitMQ发送消息的模型我们可以知道在下面几个过程中消息可能会丢失:
在这里插入图片描述

第一种:生产者弄丢了数据。生产者将消息发送到Exchange时丢失。例如在发送过程中因为网络原因发送失败,亦或者是因为发送到了一个不存在的Exchange。

第二种:路由失败。这种情况就是消息已经发送到Exchange了,但是Exchange将消息根据Routing Key路由到对应的Queue时失败,例如这个Exchange根本就没有绑定Queue等等。

第三种:客户端在处理消息时失败。客户端已经获取了消息,但是在处理消息过程中出现异常,没有对异常做处理,导致消息丢失了。

上面这几种情况都是消息在向不同的模块传递时失败导致消息丢失了,如果上面的情况都能解决也并不能保证消息不会丢失,如果RabbitMQ服务宕机了,如果这些消息没有被持久化,等RabbitMQ服务重启之后,这些没有持久化的消息也将丢失。

分析了这么多的情况可能会导致消息丢失,下面将根据各种情况对应的分析来解决。

2.RabbitMQ消息丢失解决方案

在这里插入图片描述

2.1 针对生产者

生产者发送消息到Exchange失败

对于网络原因导致消息发送到Exchange失败这个我们很好感知,我们只需要对发送异常做处理即可。排除这个原因,默认情况下生产者将消息发送到Exchange是不会返回任何信息给生产者,至于消息是不是真的到了服务端作为生产者根本无从可知。

对于这个问题RabbitMQ中有两种方式可以用来解决问题:

  • 通过事务机制实现
  • 通过发送方确认机制实现

2.1.1 方案1 :开启RabbitMQ事务

可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。

值得我们注意的是:RabbitMQ中的事务与数据库的事务有稍许不同,数据库每次都需要打开事务,且最后与之对应的有commit或者rollback,而RabbitMQ中channel中的事务只需要开启一次,可以多次commit或者rollback。

开启事务的样例如下:

// 开启事务  
channel.txSelect();  
try {  
   // 这里发送消息  
} catch (Exception e) {  
   channel.txRollback(); 
// 这里再次重发这条消息
}
// 提交事务  
channel.txCommit(); 

这样看可能不太直观,下面我简单写一段使用RabbitMQ的代码,然后给大家解释一下

//channel开启事务
channel.txSelect();
//发送3条消息
String msgTemplate = "测试事务消息内容[%d]";
channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,1).getBytes(StandardCharsets.UTF_8));
channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,2).getBytes(StandardCharsets.UTF_8));
channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,3).getBytes(StandardCharsets.UTF_8));
//消息回滚
channel.txRollback();
//成功提交
channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,4).getBytes(StandardCharsets.UTF_8));
channel.txCommit();

上面的方法中一共发送了4次消息,前三次发送后最后调用了txRollback,这将导致前三条消息回滚而没有发送成功。而第四次发送之后调用commit,最后在RabbitMQ中只会有一条消息。

虽然事务可以保证消息一定被提交到服务器,而且在客户端编码方面足够简单。但是它也不是那么完美,在性能方面事务会带来较大的性能影响。RabbitMQ 事务机制是同步的,你提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,因为太耗性能。

2.1.2 方案2:使用confirm机制

事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的。

confirm机制是为了解决事务性能问题的一种方案,我们可以通过使用channel.confirmSelect方法开启confirm模式,在生产者开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;

如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。

代码样例:

  • 生产者
public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟主机
        connectionFactory.setVirtualHost("/");

        //创建一个链接
        Connection connection = connectionFactory.newConnection();

        //创建channel
        Channel channel = connection.createChannel();

        //消息的确认模式
        channel.confirmSelect();

        String exchangeName="test_confirm_exchange";
        String routeKey="confirm.test";
        String msg="RabbitMQ send message confirm test!";
        for (int i=0;i<5;i++){
            channel.basicPublish(exchangeName,routeKey,null,msg.getBytes());
        }
        //确定监听事件
        channel.addConfirmListener(new ConfirmListener() {

            /**
             *  消息成功发送
             * @param deliveryTag   消息唯一标签
             * @param multiple  是否批量
             * @throws IOException
             */
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("**********Ack*********");
            }

            /**
             *  消息没有成功发送
             * @param deliveryTag
             * @param multiple
             * @throws IOException
             */
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("**********No Ack*********");
            }

        });
    }

  • 消费者
public static void main(String[] args) throws  Exception{
        System.out.println("======消息接收start==========");
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟主机
        connectionFactory.setVirtualHost("/");
        //创建链接
        Connection connection = connectionFactory.newConnection();

        //创建channel
        Channel channel = connection.createChannel();
        String exchangeName="test_confirm_exchange";
        String exchangeType="topic";
        //声明Exchange
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        String queueName="test_confirm_queue";
        //声明队列
        channel.queueDeclare(queueName,true,false,false,null);
        String routeKey="confirm.#";
        //绑定队列和交换机
        channel.queueBind(queueName,exchangeName,routeKey);
            channel.basicConsume(queueName, true, new DefaultConsumer(channel) {

                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接收到消息::"+new String(body));
                }
            });

    }

需要注意的是confirm机制与事务是不能共存的,简单的说就是开启事务就无法使用confirm,开启confirm就无法使用事务。

2.2 Exchange路由到队列失败

在生产者将消息推送到RabbitMQ时,我们可以通过事务或者confirm模式来保证消息不会丢失。但是这两种措施只能保证消息到达Exchange,如果我们的消息无法根据RoutingKey到达对应的Queue中,那么我们的消息最后就会丢失。

对于这种情况,RabbitMQ中在发送消息时提供了mandatory参数。如果mandatory为true时,Exchange根据自身的类型和RoutingKey无法找到对应的Queue,它将不会丢掉该消息,而是会将消息返回给生产者。

代码样例:

//创建Exchange
channel.exchangeDeclare("mandatory.exchange", BuiltinExchangeType.DIRECT, true, false, new HashMap<>());
//创建Queue
channel.queueDeclare("mandatory.queue", true, false, false, new HashMap<>());
//绑定路由
channel.queueBind("mandatory.queue", "mandatory.exchange", "mandatory");
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        log.error("replyCode = {},replyText ={},exchange={},routingKey={},body={}",replyCode,replyText,exchange,routingKey,new String(body));
    }
});
//设置mandatory = true
//void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
channel.basicPublish("mandatory.exchange", "mandatory-1",true, new AMQP.BasicProperties(), "测试mandatory的消息".getBytes(StandardCharsets.UTF_8));

在我们调用BasicPublish方法的时候,我们设置了mandatory为true,同时还给channel设置了ReturnListener用来监听路由到队列失败的消息。

2.3 RabbitMq自身问题导致的消息丢失问题解决方案

RabbitMQ本身主要应对三点:

  • 要保证rabbitMQ不丢失消息,那么就需要开启rabbitMQ的持久化机制,即把消息持久化到硬盘上,这样即使rabbitMQ挂掉在重启后仍然可以从硬盘读取消息;

  • 如果rabbitMQ单点故障怎么办,这种情况倒不会造成消息丢失,这里就要提到rabbitMQ的3种安装模式,单机模式、普通集群模式、镜像集群模式,这里要保证rabbitMQ的高可用就要配合HAPROXY做镜像集群模式;

  • 如果硬盘坏掉怎么保证消息不丢失。

2.3.1 消息持久化

RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失,所以就要对消息进行持久化处理。

在RabbitMQ中,我们可以通过将durable的值设置为true来保证持久化。如何持久化,下面具体说明下。要想做到消息持久化,必须满足以下三个条件,缺一不可。

  • Exchange 设置持久化

  • Queue 设置持久化

  • Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

2.3.2 设置集群镜像模式

先来介绍下RabbitMQ三种部署模式:

  • 单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。

  • 普通模式:消息只会存在与当前节点中,并不会同步到其他节点,当前节点宕机,有影响的业务会瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。

  • 镜像模式:消息会同步到其他节点上,可以设置同步的节点个数,但吞吐量会下降。属于RabbitMQ的HA方案

为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。
在这里插入图片描述

如果想解决上面途中问题,保证消息不丢失,需要采用HA 镜像模式队列。

下面介绍下三种HA策略模式:

  • 同步至所有的

  • 同步最多N个机器

  • 只同步至符合指定名称的nodes

但是:HA 镜像队列有一个很大的缺点就是系统的吞吐量会有所下降。

2.3.3 消息补偿机制

系统是在一个复杂的环境,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,但是仍然会遇到消息丢失的问题,如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,这种情况下消息仍然会丢失。

为了避免上面这个问题,我们可以让生产端首先将业务数据以及消息数据入库,需要在同一个事务中,消息数据入库失败,则整体回滚。
在这里插入图片描述

然后我们根据消息表中消息状态,失败则进行消息补偿措施,重新发送消息处理。
在这里插入图片描述

2.3 针对消费者

消费者获取消息后处理消息失败

通过上面的方式我们保证了从生产者到RabbitMQ消息不会丢失,现在到了消费者消费消息了。

在消费者处理业务时,可能由于我们业务代码的异常导致消息没有被正常处理完,但是消息已经从RabbitMQ中的队列移除了,这样我们的消息就丢失了。

我同样也可以通过ACK确认机制去避免这种情况

在生产者发送消息到RabbitMQ时我们可以通过ack来确认消息是否到达了服务端,与之类似的是,消费者在消费消息时同样提供手动ack模式。默认情况下,消费者从队列中获取消息后会自动ack,我们可以通过手动ack来保证消费者主动的控制ack行为,这样我们可以避免业务异常导致消息丢失的情况。

DeliverCallback deliverCallback = new DeliverCallback() {
    @Override
    public void handle(String consumerTag, Delivery message) throws IOException {
        try {
            byte[] body = message.getBody();
            String messageContent = new String(body, StandardCharsets.UTF_8);
            if("error".equals(messageContent)){
                throw new RuntimeException("业务异常");
            }
            log.info("收到的消息内容:{}",messageContent);
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        }catch (Exception e){
            log.info("消费消息失败!重回队列!");
            channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);
        }
    }
};
CancelCallback cancelCallback = new CancelCallback() {
    @Override
    public void handle(String consumerTag) throws IOException {
        log.info("取消订阅:{}",consumerTag);
    }
};
channel.basicConsume("confirm.queue",false,deliverCallback,cancelCallback);

3.总结

我们通过分析消息从生产者发送消息到消费者消费消息的全过程,得出了消息可能丢失的几种场景,并给出了相应的解决方案,如果需要保证消息在整条链路中不丢失,那就需要生产端、mq自身与消费端共同去保障。

生产端:对生产的消息进行状态标记,开启confirm机制,依据mq的响应来更新消息状态,使用定时任务重新投递超时的消息,多次投递失败进行报警。

mq自身:开启持久化,并在落盘后再进行ack。如果是镜像部署模式,需要在同步到多个副本之后再进行ack。

消费端:开启手动ack模式,在业务处理完成后再进行ack,并且需要保证幂等。

整个过程如下图所示:
在这里插入图片描述

通过以上的处理,理论上不存在消息丢失的情况,但是系统的吞吐量以及性能有所下降。在实际开发中,需要考虑消息丢失的影响程度,来做出对可靠性以及性能之间的权衡。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/419979.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

无废话硬核分享:Linux 基础知识点总结很详细,全的很,吐血奉献

Linux 的学习对于一个程序员的重要性是不言而喻的。前端开发相比后端开发&#xff0c;接触 Linux 机会相对较少&#xff0c;因此往往容易忽视它。但是学好它却是程序员必备修养之一。 Linux 基础 操作系统 操作系统Operating System简称OS&#xff0c;是软件的一部分&#x…

【0基础学爬虫】爬虫基础之数据存储

大数据时代&#xff0c;各行各业对数据采集的需求日益增多&#xff0c;网络爬虫的运用也更为广泛&#xff0c;越来越多的人开始学习网络爬虫这项技术&#xff0c;K哥爬虫此前已经推出不少爬虫进阶、逆向相关文章&#xff0c;为实现从易到难全方位覆盖&#xff0c;特设【0基础学…

物联网时代的网络安全

近年来&#xff0c;物联网 (IoT) 彻底改变了我们的生活和工作方式。从智能家居到自动驾驶汽车&#xff0c;物联网设备在我们的日常生活中变得越来越普遍。 根据 Statista 的一份报告&#xff0c;到 2025 年将有超过 750 亿个物联网 (IoT) 设备投入使用。 然而&#xff0c;这…

c++之STl容器-string

目录 容器的分类 string string的概念 string的初始化 string的遍历 string的一些基本操作 char*类型和string类型互转 字符串的连接 字符串的查找和替换 string的截断和删除 容器的分类 在实际的开发过程中&#xff0c;数据结构本身的重要性不会逊于操作于数据结构的算…

SpringMVC03-文件上传、异常处理、拦截器

SpringMVC03 SpringMVC的文件上传 一 、文件上传的前端必要前提 form 表单的 entcype取值必须是&#xff1a;multipart/form-data。默认值&#xff1a;application/x-www-form-urlencoded&#xff0c;是表单请求正文的类型method 属性取值必须是 post提供一个文件选择域 二…

利用ChatGPT,一分钟制作思维导图

大家好&#xff0c;我是易安&#xff01; 今天我来教你如何使用ChatGPT&#xff0c;一分钟制作出一份思维导图 大纲选题 想到一个课题&#xff0c;然后人工梳理出内容大纲&#xff0c;是个挺费精力的事情。但利用ChatGPT来做这件事. 5秒就可以搞定啦&#xff01; 例如&#xf…

Python安全攻防之第二章Python语言基础

2.3 Python模块的安装与使用python模块的安装pip3 install 模块名称py -3 -m pip install 模块名称python模块的导入与使用&#xff08;1&#xff09;Import模块名称采用“Import模块名称”方式时&#xff0c;需要在对象前面加上模块名称作为前缀&#xff0c;具体形式为“模块名…

Nextcloud去掉URL中的index.php以及强制https(Win10子系统WSL)

一、Nextcloud去掉URL中的index.php 1、启用相关模块 cd /var/www/nextcloud #进入程序目录sudo chmod -R 777 .htaccess #设置.htaccess文件权限可读写sudo a2enmod envaudo a2enmod rewrite #启用rewrite模块2、修改nextcloud配置文件 vim /var/www/nextcloud/config/…

Redis数据备份与恢复

Redis数据备份与恢复 文章目录Redis数据备份与恢复1. Redis备份的方式2. RDB持久化2.1 什么是RDB&#xff1f;2.2 Fork操作2.3 save VS bgsave2.4 关于RDB备份的一些配置项2.5 RDB的备份与恢复2.6 RDB的自动触发2.7 RDB的优势与劣势3. AOF持久化3.1 什么是AOF&#xff1f;3.2 A…

hypothesis testing假设检验

假设检验是什么 比如一家巧克力工厂生产的巧克力每个1g&#xff0c;一个工人说&#xff0c;机器在维修之后生产的巧克力不是1g&#xff0c;为了验证工人说的是否正确&#xff0c;需进行假设检验。 随机挑选50个巧克力&#xff0c;计算平均重量。 H0&#xff1a;每个巧克力1g H…

Seatunnel-2.3.0源码解析

一、概述 SeaTunnel是一个简单易用的数据集成框架&#xff0c;在企业中&#xff0c;由于开发时间或开发部门不通用&#xff0c;往往有多个异构的、运行在不同的软硬件平台上的信息系统同时运行。数据集成是把不同来源、格式、特点性质的数据在逻辑上或物理上有机地集中&#x…

Spring学习笔记(二)【CGLIB浅拷贝BeanCopier的使用和详解】

CGLIB浅拷贝BeanCopier的使用和详解 一、bean拷贝工具 bean拷贝工具类比较 常用的bean拷贝工具类当中&#xff0c;主要有Apache提供的beanUtils、Spring提供的beanUtils、Cglib提供的beanCopier&#xff0c;性能上分析如下表所示&#xff08;该表来自网上的数据&#xff09; …

探索Apache Hudi核心概念 (3) - Compaction

Compaction是MOR表的一项核心机制&#xff0c;Hudi利用Compaction将MOR表产生的Log File合并到新的Base File中。本文我们会通过Notebook介绍并演示Compaction的运行机制&#xff0c;帮助您理解其工作原理和相关配置。 1. 运行 Notebook 本文使用的Notebook是&#xff1a;《A…

此战成硕,我成功上岸西南交通大学了~~~

友友们&#xff0c;好久不见&#xff0c;很长时间没有更一个正式点的文章了&#xff01; 是因为我在去年年底忙着准备初试&#xff0c;今年年初在准备复试&#xff0c;直到3月底拟录取后&#xff0c;终于可以写下这篇上岸贴&#xff0c;和大家分享一下考研至上岸的一个过程 文章…

springboot+thymeleaf实现发Html邮件自由

2019年&#xff0c;我刚接触测试架构和测试开发类的工作时&#xff0c;经常会有自动化发邮件的功能&#xff0c;大都是从各个平台自动化统计一些数据出来&#xff0c;每周定时发一封邮件给领导交差&#xff0c;回过头来再看看我发的邮件&#xff0c;不美观&#xff0c;不专业。…

Android Jetpack:现代化Android开发的利器

Android Jetpack&#xff1a;现代化Android开发的利器 引言 随着移动应用的快速发展和用户体验的不断提升&#xff0c;现代化的Android应用开发变得愈发复杂和多样化。为了提高开发效率、简化代码、加速应用迭代&#xff0c;Google推出了Android Jetpack组件&#xff0c;成为现…

springboot零基础到项目实战

推荐教程&#xff1a; springboot零基础到项目实战 SpringBoot这门技术课程所包含的技术点其实并不是很多&#xff0c;但是围绕着SpringBoot的周边知识&#xff0c;也就是SpringBoot整合其他技术&#xff0c;这样的知识量很大&#xff0c;例如SpringBoot整合MyBatis等等。因此…

gitlab-ce升级方法

Centos7升级gitlab-ce&#xff1a; 1、记录当前版本 在升级前一定要做好备份&#xff0c;记录自己当前的gitlab-ce的版本&#xff1a; yum list | grep gitlab-ce 2、编辑/etc/gitlab/gitlab.rb文件&#xff1a; 1&#xff09;将下面几行注释取消。 说明&#xff1a; 1&am…

pytorch进阶学习(二):使用DataLoader读取自己的数据集

上一节使用的是官方数据集fashionminist进行训练&#xff0c;这节课使用自己搜集的数据集来进行数据的获取和训练。所需资源教学视频&#xff1a;https://www.bilibili.com/video/BV1by4y1b7hX/?spm_id_from333.1007.top_right_bar_window_history.content.click&vd_sourc…

建立数据驱动,关键字驱动和混合Selenium框架这些你了解吗

一、什么是Selenium框架&#xff1f; Selenium框架是一种代码结构&#xff0c;用于简化代码维护和提高代码可读性。框架涉及将整个代码分成较小的代码段&#xff0c;以测试特定的功能。 该代码的结构使得“数据集”与实际的“测试用例”分开&#xff0c;后者将测试Web应用程序…