RabbitMQ-如何保证消息不丢失

news2024/11/19 15:21:51

RabbitMQ常用于 异步发送,mysql,redis,es之间的数据同步 ,分布式事务,削峰填谷等.....

在微服务中,rabbitmq是我们经常用到的消息中间件。它能够异步的在各个业务之中进行消息的接受和发送,那么如何保证rabbitmq的消息不丢失就显得尤为重要。

首先要分析问题,我们就要明确rabbitmq在什么时候可能会出现消息丢失的情况呢?

我们直接说结果

RabbitMQ在每个阶段都有可能使消息发生丢失

我们在这里把他们简单归结为三个层面

层面一 :生产者发送消息没有到达交换机或者没有到达绑定的队列。

层面二:RabbitMQ宕机可能导致的消息的丢失。

层面三:消费者宕机导致消息丢失。

层面一的解决方法常见的是

1.生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到Mq的过程中丢失,消息发送到Mq以后,会返回一个结果给发送者,表示消息的发送成功。

情况一:发送成功 生产者正常发送消息到队列之后会返回一个publish-confirm ack 这个意思是告诉生产者已经接收到消息了。

情况二:发送失败 这里的发送失败有两种,一种是生产者发送到交换机失败 此时返回 publish-confirm nack  。第二种是生产者发送到队列失败 返回 publish-return ack。

开启生产者确认机制的代码如下 ,在生产者的配置文件中加入以下配置
 

spring:
  rabbitmq:
    publisher-confirm-type: correlated #开启生产者确认机制
    publisher-returns: true

这里的

publisher-confirm-type:有三种模式可以选择:

第一种是none:代表关闭confirm机制

第二种是 simple:表示同步阻塞并等待mq的回执消息,即发送完消息后不能干其他的事情,只能等待mq的回执,很显然这样效率很低。

第三种是correlated:MQ异步回调方式返回回执消息,即生产者发送完消息后可以干其他的事情,直到接收到mq的回执。很明显这种效率要优于第二种。

配置return callback的代码如下,每个RabbitTemplate只能配置一个 代码如下
 

package com.itheima.publisher.com.it.heima.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

/**
 * @Auther: QuJingChuan
 * @Date: 2024/1/13 10:34
 * @Description:
 */
@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //配置回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.debug("收到消息return的callback,  {},{},{},{},{}",
                        returnedMessage.getExchange(),
                        returnedMessage.getRoutingKey(),
                        returnedMessage.getMessage(),
                        returnedMessage.getReplyCode(),
                        returnedMessage.getReplyText());
            }
        });
    }
}

Confirm Callback需要每次发消息的时候都要配置(要制定发消息的id方便回执的时候直到是谁发的消息)这里写一个测试类方便大家看。

 @Test
    void testConfirmCallback() throws InterruptedException {
        //创建cd 参数为每次发送消息的id
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //添加confirmCallBack
        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                //这种情况一般是运行出现bug,一般不会发生。
                log.error("消息回调失败",ex);
            }

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                log.debug("收到confirm callback 回执");
                if (result.isAck()){
                    //消息发送成功
                    log.debug("消息发送成功收到ack");
                }else {
                    //消息发送失败
                    log.debug("消息发送失败收到nack,原因:{}",result.getReason());
                    //TODO 重发消息等业务
                }
            }
        });

        rabbitTemplate.convertAndSend("amqp.test","amqptest","hello qjc",correlationData);

        Thread.sleep(2000);
    }

那么我们如何解决这个问题呢
方案一:重发消息 

方案二:记录日志

方案三:保存到数据库中定时发送,发送成功后删除表中的数据。

方案四:交给人工处理。

~生产者确认机制需要额外的网络和系统的资源开销,尽量不要使用。

~如果业务需要,那么无需开启publisher-return机制,因为一般路由失败都是自己业务的原因。

~对于nack消息可以有限次数的重试,依然失败则记录异常消息。

层面二的解决方法常见的是

2.消息持久化

由于mq是基于内存存储消息的,那么在mq服务宕机等一些情况下可能导致消息的丢失。同时内存空间有限,当消费者出现故障或者处理过慢,会导致消息积压,mq会对消息做迁移(page out 写入磁盘)从而引发mq阻塞。我们将消息存储在磁盘上就避免了这个问题。

一 :持久化交换机。

这里要选择Durable,因为Transient是临时交换机,当mq宕机后会消失。

代码展示
 

 @Bean
    public DirectExchange simpleExchange(){
        //分别是三个参数 交换机名称 是否持久化 当没有队列绑定时是否自动删除
        return new DirectExchange("qjc.exchange",true,false);
    }

二 :持久化队列。

这个与交换机类似,在此不做赘述。

代码展示

@Bean
    public Queue simpleQueue(){
        //springamqp在使用QueueBuilder来创建队列的时候,默认就是持久化的
        return QueueBuilder.durable("qjc.queue").build();
    }

三 :持久化消息。

这里选择delivery mode 选择2 ,1是不持久的。

代码展示

 Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
如果不选择持久化队列,交换机,消息的话我们还有另一种方案

Lazy Queue(惰性队列)

惰性队列的特征如下

~接受到消息的时候直接存入磁盘而非内存(内存中只保留最近的消息)

~消费者需要消息的时候才会从磁盘中取出数据加载到内存

~支持数百万条的消息存储

在mq3.12版本后,所有的队列都是Lazy Queue模式,无法更改。

如果各位小伙伴的版本低于3.12那我这里提供了两种方式创建惰性队列

或用注解声明

    @RabbitListener(queuesToDeclare = @Queue(
            name = "lazy.queue",
            durable = "true",
            arguments = @Argument(name = "x-queue-mode",value = "lazy")
    ))
    public void listenLazyQueue(String msg){
        log.debug("接收到lazyqueue的消息" + msg);
    }

3.消费者确认机制

RabbitMQ支持消费者确认机制,即:当消费者处理消息后可以向mq发送ack回执,mq收到消息后会在队列中删除该消息。

SpringAMQP已经实现了消息确认的功能,并且允许我们通过配置文件选择ack的处理方式,有三种方式。

- none: 不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用  
- manual: 手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活  
- auto: 自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.  
当业务出现异常时,根据异常判断返回不同结果:  
- 如果是业务异常,会自动返回nack  
- 如果是消息处理或校验异常,自动返回reject

注意我们需要再消费者的配置文件中加入参数

这就是mq保证消息不丢失的一些方式和解决方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1418960.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

代码随想录算法刷题训练营day19

代码随想录算法刷题训练营day19&#xff1a;LeetCode(404)左叶子之和、LeetCode(112)路径总和、LeetCode(113)路径总和 II、LeetCode(105)从前序与中序遍历序列构造二叉树、LeetCode(106)从中序与后序遍历序列构造二叉树 LeetCode(404)左叶子之和 题目 代码 /*** Definitio…

GitHub 上传文件夹到远程仓库、再次上传修改文件、如何使用lfs上传大文件、github报错一些问题

按照大家的做法&#xff0c;把自己遇到的问题及解决方案写出来&#xff08;注意&#xff1a;Error里面有些方法有时候我用可以成功&#xff0c;有时候我用也不能成功&#xff0c;写出来仅供参考&#xff0c;实在不行重头再clone&#xff0c;add&#xff0c;commit&#xff0c;p…

Virtual Assistant for Smartphone;Denoising Autoencoder;CrossMAE

本文首发于公众号&#xff1a;机器感知 Virtual Assistant for Smartphone&#xff1b;Denoising Autoencoder&#xff1b;CrossMAE The Case for Co-Designing Model Architectures with Hardware While GPUs are responsible for training the vast majority of state-of-t…

dvwa,xss反射型lowmedium

xss&#xff0c;反射型&#xff0c;low&&medium low发现xss本地搭建实操 medium作为初学者的我第一次接触比较浅的绕过思路 low 发现xss 本关无过滤 <script>alert(/xss/)</script> //或 <script>confirm(/xss/)</script> //或 <script&…

vulnhub靶场之EMPIRE:BREAKOUT

一.环境搭建 1.靶场描述 Description Back to the Top Difficulty: Easy This box was created to be an Easy box, but it can be Medium if you get lost. For hints discord Server ( https://discord.gg/7asvAhCEhe ) 2.靶场地址 https://www.vulnhub.com/entry/empire-…

Canny边缘检测算法(python 实现)

1. 应用高斯滤波来平滑(模糊)图像&#xff0c;目的是去除噪声 2. 计算梯度强度和方向&#xff0c;寻找边缘&#xff0c;即灰度强度变化最强的位置 3应用非最大抑制技术NMS来消除边误检模糊&#xff08;blurred&#xff09;的边界变得清晰&#xff08;sharp&#xff09;。保留了…

力扣题目训练(3)

2024年1月27日力扣题目训练 2024年1月27日力扣题目训练290. 单词规律292. Nim 游戏303. 区域和检索 - 数组不可变91. 解码方法92. 反转链表 II41. 缺失的第一个正数 2024年1月27日力扣题目训练 2024年1月27日第三天编程训练&#xff0c;今天主要是进行一些题训练&#xff0c;包…

机器学习算法实战案例:使用 Transformer 模型进行时间序列预测实战(升级版)

时间序列预测是一个经久不衰的主题&#xff0c;受自然语言处理领域的成功启发&#xff0c;transformer模型也在时间序列预测有了很大的发展。 本文可以作为学习使用Transformer 模型的时间序列预测的一个起点。 文章目录 机器学习算法实战案例系列答疑&技术交流数据集数据…

Java RC4加密算法

一、RC4加密算法 在密码学中&#xff0c;RC4&#xff08;来自Rivest Cipher 4的缩写&#xff09;是一种流加密算法&#xff0c;密钥长度可变。它加解密使用相同的密钥&#xff0c;因此也属于对称加密算法。 百度百科 - RC4&#xff1a;https://baike.baidu.com/item/RC4/34545…

揭秘1688商品详情API接口:一探阿里巴巴的亿级商品数据宝藏

一、概述 1688商品详情API接口是阿里巴巴提供的一套应用程序接口&#xff0c;允许第三方开发者获取1688平台上的商品详情信息。通过使用这个接口&#xff0c;开发者可以获取到商品的详细属性、规格参数、价格等信息&#xff0c;从而进行深度分析和挖掘&#xff0c;进一步优化和…

selenium元素定位---元素点击交互异常解决方法

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 点击文末小卡片 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;薪资嘎嘎涨 1、异常原因 在编写ui自动化时&#xff0c;执行报错元素无法点击&#xff1a;ElementClickIn…

基础算法之Huffman编码

// Type your code here, or load an example. #include<iostream> #include<string> #include<queue> #include <unordered_map> #include <vector>using namespace std;//树节点结构 struct Node {char ch;int freq;Node *left;Node *right;No…

【数据结构】(一)从绪论到各种线性表

目录 一、绪论Introduction 1、数据结构 2、逻辑结构&#xff08;数据元素之间的相互关系&#xff09; 3、物理结构&#xff08;数据逻辑结构在计算机中的存储形式&#xff09; 4、数据类型&#xff08;一组性质相同的值的集合及定义在此集合上的一些操作的总称&#xff09…

幻兽帕鲁服务器多少钱?2024年Palworld游戏主机费用

幻兽帕鲁服务器多少钱&#xff1f;价格便宜&#xff0c;阿里云4核16G幻兽帕鲁专属服务器32元1个月、66元3个月&#xff0c;4核32G配置113元1个月、339元3个月&#xff1b;腾讯云4核16G14M服务器66元1个月、277元3个月、1584元一年。阿腾云atengyun.com分享阿里云和腾讯云palwor…

Python算法题集_找到字符串中所有字母异位词

本文为Python算法题集之一的代码示例 题目438&#xff1a;找到字符串中所有字母异位词 说明&#xff1a;给定两个字符串 s 和 p&#xff0c;找到 s 中所有 p 的 异位词 的子串&#xff0c;返回这些子串的起始索引。不考虑答案输出的顺序。 异位词 指由相同字母重排列形成的字…

Java基础—面向对象OOP—18三大特性:封装、继承与多态

由于本身理解还不是很到位&#xff0c;所以写的很绕&#xff0c;后续待补充优化 1、封装&#xff08;底层&#xff09;&#xff1a;该露的露&#xff0c;该藏的藏 高内聚&#xff1a;类的内部数据操作细节自己完成&#xff0c;不允许外部干涉低耦合&#xff1a;仅暴露少量的方…

休息日的思考与额外题——链表

文章目录 前言链表知识点 一、 92. 反转链表 II二、21. 合并两个有序链表总结 前言 一个本硕双非的小菜鸡&#xff0c;备战24年秋招&#xff0c;计划二刷完卡子哥的刷题计划&#xff0c;加油&#xff01; 二刷决定精刷了&#xff0c;于是参加了卡子哥的刷题班&#xff0c;训练…

2023年算法SAO-CNN-BiLSTM-ATTENTION回归预测(matlab)

2023年算法SAO-CNN-BiLSTM-ATTENTION回归预测&#xff08;matlab&#xff09; SAO-CNN-BiLSTM-Attention雪消融优化器优化卷积-长短期记忆神经网络结合注意力机制的数据回归预测 Matlab语言。 雪消融优化器( SAO) 是受自然界中雪的升华和融化行为的启发&#xff0c;开发了一种…

Linux true/false区分

bash的数值代表和其它代表相反&#xff1a;0表示true&#xff1b;非0代表false。 #!/bin/sh PIDFILE"pid"# truenginx进程运行 falsenginx进程未运行 checkRunning(){# -f true表示普通文件if [ -f "$PIDFILE" ]; then# -z 字符串长度为0trueif [ -z &qu…

shell脚本——条件语句

目录 一、条件语句 1、test命令测试条件表达式 2、整数数值比较 3、字符串比较 4、逻辑测试&#xff08;短路运算&#xff09; 5、双中括号 二、if语句 1、 分支结构 1.1 单分支结果 1.2 双分支 1.3 多分支 2、case 一、条件语句 条件测试&#xff1a;判断某需求是…