RabbitMQ消息可靠性问题及解决

news2024/11/28 12:56:32

说明:在RabbitMQ消息传递过程中,有以下问题:

  • 消息没发到交换机

  • 消息没发到队列

  • MQ宕机,消息在队列中丢失

  • 消息者接收到消息后,未能正常消费(程序报错),此时消息已在队列中移除

针对以上问题,提供以下解决方案:

  • 消息确认:确认消息是否发送到交换机、队列;

  • 消息持久化:持久化消息,以防MQ宕机造成消息丢失;

  • 消费者消息确认:确认消费者已正确消费消息,才把消息从队列中删除;

在这里插入图片描述

消息确认

可以使用Rabbit MQ提供的publisher confirm机制来避免消息发送到MQ过程丢失。具体实现是,publisher-confirm(发送者确定)、publisher-return(发送者回执),前者判断消息到交换机、后者判断交换机到队列


publisher-confirm(发送者确定)

  • 消息成功投递到交换机,返回ack;

  • 消息未投递到交换机,返回nack;

publisher-return(发送者回执)

  • 消息投递到交换机,但没有到队列,返回ack,即失败原因;

在生产者端添加配置

spring:
  rabbitmq:
    # rabbitMQ相关配置
    host: 118.178.228.175
    port: 5672
    username: root
    password: 123456
    virtual-host: /

    # 开启生产者确认,correlated为异步,simple为同步
    publisher-confirm-type: correlated

    # 开启publish-return功能,基于callback机制
    publisher-returns: true

    # 开启消息路由失败的策略,true是调用returnCallback方法,false是丢弃消息
    template:
      mandatory: true

publisher-return(发送者回执)代码

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

/**
 * 发送者回执实现
 */
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

            /**
             * 回执信息
             * @param message 信息对象
             * @param replyCode 回执码
             * @param replyText 回执内容
             * @param exchange 交换机
             * @param routingKey 路由键值
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息发送队列失败=====replyCode{},replyText{},exchange{},routingKey{},message{}",replyCode,replyText,exchange,routingKey,message);
            }
        });
    }
}

publisher-confirm(发送者确定)代码

    @Test
    public void sendExceptionMessage() {
        // 路由键值
        String routingKey = "exception";

        // 消息
        String message = "This is a exception message";

        // 给消息设置一个唯一ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        // 编写confirmCallBack回调函数
        correlationData.getFuture().addCallback(new SuccessCallback<CorrelationData.Confirm>() {
            @Override
            public void onSuccess(CorrelationData.Confirm confirm) {
                if (confirm.isAck()) {
                    // 消息发送交换机成功
                    log.debug("消息送达至交换机成功");
                } else {
                    // 消息发送交换机失败,打印消息
                    log.error("消息未能送达至交换机,ID{},原因{}", correlationData.getId(), confirm.getReason());
                }
            }
        }, new FailureCallback() {
            // 消息发送交换机异常
            @Override
            public void onFailure(Throwable ex) {
                log.error("消息发送交换机异常,ID:{},原因{}", correlationData.getId(), ex.getMessage());
            }
        });

        rabbitTemplate.convertAndSend("amq.direct", routingKey, message, correlationData);
    }

测试,设置一个不存在的routingKey,被发送者确认(publisher-confirm)捕获到;

// 路由键值
String routingKey = "null";

在这里插入图片描述

设置一个不存在的路由,被发送者回执(publisher-return)捕获到;

rabbitTemplate.convertAndSend("null", routingKey, message, correlationData);

在这里插入图片描述

消息持久化

消息持久化,是指把消息保存到磁盘中,在RabbitMQ宕机或者关机时,重启后,消息仍可以保存下来。消息依赖于交换机、队列,因此持久化消息,同时也需要持久化交换机、队列。

创建一个持久化的交换机、队列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 消息持久化
 */
@Configuration
public class DurableConfig {

    /**
     * 交换机持久化
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        // 三个参数分别是:交换机名、是否持久化、没有队列与之绑定时是否自动删除
        return new DirectExchange("durable.direct",true,false);
    }

    /**
     * 队列持久化
     * @return
     */
    @Bean
    public Queue durableQueue(){
        return QueueBuilder.durable("durable.queue").build();
    }

    /**
     * 交换机与队列绑定
     * @return
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(durableQueue()).to(directExchange()).with("durable");
    }

}

发送一个持久化的消息

    /**
     * 发送持久化消息
     */
    @Test
    public void sendDurableMessage() {
        String routingKey = "durable";

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        Message message = MessageBuilder.withBody("This is a durable message".getBytes(StandardCharsets.UTF_8))
                // 设置该消息未持久化消息
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();

        rabbitTemplate.convertAndSend("durable.direct", routingKey, message, correlationData);
    }

打开RabbitMQ管理平台,可以看到"delivery_mode: 2",表示该消息是持久化消息

在这里插入图片描述

(源码:MessageDeliveryMode类)
在这里插入图片描述

实际上,交换机、队列默认就是持久化的(durable: true),所以不用特意设置;

在这里插入图片描述

消费者消息确认

介绍

消费者消息确认,是为了确保消费者已经消费了消息,才让MQ把该消息删除;

可通过在消费者的配置文件中增加下面这行配置实现,备选项有以下三个:

  • none:关闭ack,表示不做处理,消息发给消费者之后就立即被删除;

  • auto:自动ack,表示由Spring检测代码是否出现异常,出现异常则保留消息,没有异常则删除消息;

  • manual:手动ack,可根据业务手动编写代码,返回ack;

spring:
  rabbitmq:
    listener:
      simple:
      	# 设置消息确认模式
        acknowledge-mode: none

测试:none

可编写代码测试,下面是生产者代码,发送消息

    /**
     * 发送普通消息
     */
    @Test
    public void sendNoneMessage() {
        String directName = "none.direct";

        String routingKey = "none";

        String message = "This is a test message";

        rabbitTemplate.convertAndSend(directName, routingKey, message);
    }

消费者代码有问题,未能正常消费消息

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "none.queue"),
            exchange = @Exchange(name = "none.direct",type = ExchangeTypes.DIRECT),
            key = {"none"}
    ))
    public void getNoneMessage(String normalMessage){
        System.out.println(1/0);
        System.out.println("normalMessage = " + normalMessage);
    }

测试结果,程序报错,消息也没能保留下来

在这里插入图片描述
在这里插入图片描述

测试:auto

更改设置为:auto,重试

在这里插入图片描述

但是消息未被删除

在这里插入图片描述

这种情况,在实际开发中是不能允许,可以通过更改消费失败的重试机制解决。

消费失败重试机制

方法一:设置retry

因为消息被消费失败,消息会一直循环重试,无限循环,导致mq的消息处理飙升,带来不必要的压力,这种情况可以通过在消费者端添加以下配置,限制失败重试的条件来解决:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          # 开启消费者失败重试
          enabled: true
          # 初次失败等待时长为1秒
          initial-interval: 1000
          # 失败的等待时长倍数,即后一次等待的时间是前一次等待时间的多少倍
          multiplier: 1
          # 最多重试次数
          max-attempts: 3
          # true 无状态 false 有状态 如果业务中包含事务 改为false
          stateless: true

开启后,控制台可以发现,信息不回一直循环打印,而是打印数条后停止,日志信息中有提示“Retry Policy Exhausted”(重试策略已用尽)

在这里插入图片描述
这种通过配置的方式,并不会重试数次后仍保留消息,而是重试数次仍失败,随即丢弃消息,消息丢失,这在实际开发中也是不能被允许的。

方法二:路由存储消息

因此,可以通过下面这个方法,把消费失败的消息,通过交换机路由到另外的队列中存储起来,等业务代码被修复,再路由回来消费。

在这里插入图片描述

代码如下

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 错误消息队列
 */
@Configuration
public class ErrorMessageQueueConfig {

    /**
     * 创建一个交换机,用于路由消费失败的消息
     * @return
     */
    @Bean
    public DirectExchange errorExchange(){
        return new DirectExchange("error.direct");
    }

    /**
     * 创建一个队列,用于存储消费失败的消息
     * @return
     */
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    /**
     * 绑定
     * @return
     */
    @Bean
    public Binding errorBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");
    }

    /**
     * 路由,当消费失败时,把消费失败的消息路由到此队列中,路由key为"error"
     * @param rabbitTemplate
     * @return
     */
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
    }
}

可以看到,消息消费失败后并没有被丢失,而是路由到错误队列中存储了起来。因为错误队列没有设置RabbitListener,所以可以存储消息,等带代码问题被排查出来后,可以再针对该队列设置监听方法,消费这部分错误的消息。

在这里插入图片描述

另外,值得一提的是,消费者这边的控制台会报一个警告,提示路由密钥错误。我们可以理解,在RabbitMQ底层,会把消费失败了的消息,统一路由到一个地方去,而我们这种手动把消费失败的消息路由到自定义的队列中的方式,打破了这种“默认的规则”,所以报了一个这样的警告。这种警告是在可控范围内的。

在这里插入图片描述

总结

RabbitMQ发送消息,为了确保消息的可靠性,保证消息能被交换机、队列收到,消息能被正常消费,而不会因消费失败而丢失,提供了对应的一系列方法,并且最后还提供了两种消费失败重试方法,优化了消费过程,非常Nice。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/779848.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android WiFi框架概览

概览 Android 提供默认 Android 框架实现&#xff0c;其中包括对各种 WLAN 协议和模式的支持&#xff0c;这些协议和模式包括&#xff1a; WLAN 基础架构 (STA)网络共享模式或仅限本地模式下的 WLAN 热点 (Soft AP)WLAN 直连&#xff08;点对点&#xff09;WLAN 感知 (NAN)WL…

3.19 Bootstrap 面板(Panels)

文章目录 Bootstrap 面板&#xff08;Panels&#xff09;面板标题面板脚注带语境色彩的面板带表格的面板带列表组的面板 Bootstrap 面板&#xff08;Panels&#xff09; 本章将讲解 Bootstrap 面板&#xff08;Panels&#xff09;。面板组件用于把 DOM 组件插入到一个盒子中。创…

Python采集某网站小视频内容, m3u8视频内容下载

目录标题 前言环境使用:模块使用:代码实现步骤代码展示尾语 前言 嗨喽~大家好呀&#xff0c;这里是魔王呐 ❤ ~! 环境使用: python 3.8 运行代码 pycharm 2021.2 辅助敲代码 模块使用: import requests >>> pip install requests 内置模块 你安装好python环境就…

【机器学习】支持向量机SVM入门

优化目标 相较于之前学习的线性回归和神经网络&#xff0c;支持向量机&#xff08;Supprot Vector Machine&#xff0c;简称SVM&#xff09;在拟合复杂的非线性方程的时候拥有更出色的能力&#xff0c;该算法也是十分经典的算法之一。接下来我们需要学习这种算法 首先我们回顾…

ffplay播放器剖析(4)----音频输出和音频重采样流程

文章目录 1. 音频输出模块1.1 音频输出流程1.2 音频输出模型图 2. 打开SDL音频设备audio_open详解sdl_audio_callbackaudio_decode_frame 3. 音频重采样样本补偿 1. 音频输出模块 1.1 音频输出流程 打开SDL音频设备,设置参数启动SDL音频设备播放SDL音频回调函数读取数据,也就…

虚拟仿真实验室未授权获取账号密码

你应该在以后短暂的岁月里&#xff0c;真正活的不负众爱 漏洞描述 虚拟仿真实验室存在未授权访问漏洞&#xff0c;通过访问构造的Url可以获取敏感信息 漏洞复现 访问漏洞url&#xff1a; /admin/student/studentlist.html?page1成功获取所有用户的账号密码信息 文笔生疏…

遥感目标检测(2)--SCRDet

目录 一、概述 二、三个挑战 三、网络结构 1、SF-Net 2、MDA-Net&#xff08;Multi-Dimensional Attention Network&#xff09; 3、Rotation Branch 四、损失函数 五、实验 一、概述 SCRDet&#xff08;Towards More Robust Detection for Small,Cluttered and Rotate…

实验数据origin作图使用经验总结

使用Origin绘制实验数据图表时&#xff0c;可以遵循以下经验总结&#xff1a; 选择合适的图表类型&#xff1a; 根据实验数据的性质和目的&#xff0c;选择合适的图表类型&#xff0c;例如散点图、折线图、柱状图、饼图等。确保图表类型能够清晰地展示数据趋势和关系。 规范坐…

jenkins中运行python脚本时,报错:collecting ... collected 0 items

【问题描述】&#xff1a;jenkins在windows环境下运行python脚本时总是报collecting … collected 0 items 【问题定位】&#xff1a;jenkins工作目录和python文件目录不一样导致 【解决办法】&#xff1a;需要先把路径切换到项目目录下&#xff0c;再进行运行xxx.py文件&…

zabbix钉钉报警

登录钉钉客户端,创建一个群,把需要收到报警信息的人员都拉到这个群内. 然后点击群右上角 的"群机器人"->"添加机器人"->"自定义", 记录该机器人的webhook值。 添加机器人 在钉钉群中&#xff0c;找到只能群助手 添加机器人 选择自定义机…

springboot 根据不同环境 ,配置不同日志输出路径

logback-spring.xml<?xml version"1.0" encoding"UTF-8"?> <!-- scan&#xff1a;当此属性设置为true时&#xff0c;配置文件如果发生改变&#xff0c;将会被重新加载&#xff0c;默认值为true。 scanPeriod&#xff1a;设置监测配置文件是否有…

《零基础入门学习Python》第057讲:论一只爬虫的自我修养5:正则表达式

如果你在课后有勤加练习&#xff0c;那么你对于字符串的查找应该是已经深恶痛绝了&#xff0c;你发现下载一个网页是很容易的&#xff0c;但是要在网页中查找到你需要的内容&#xff0c;那就是困难的&#xff0c;你发现字符串查找并没有你想象的那么简单&#xff0c;并不是说直…

macOS mysql 8.0 忘记密码

╰─➤ mysql -V mysql Ver 8.0.33 for macos13.3 on arm64 (Homebrew)mysql.server status mysql.server stopskip-grant-tables 启动mysql ─➤ /opt…

Elemui表单合并

原代码形式 <template><el-table:data"tableData"borderstyle"width: 100%"><el-table-columnprop"date"label"日期"width"180"></el-table-column><el-table-columnprop"name"label…

Qt6 Qt Quick UI原型学习QML第五篇

文章目录 效果QML语法父文件 MyQML.qmlQML语法子文件 TLineEditV1.qmlQML语法子文件 TTextEdit.qml 效果 QML语法父文件 MyQML.qml import QtQuick 2.12 import QtQuick.Window 2.12 import QtQuick.Controls 2.12Window {id: windowvisible: truewidth: 600height: 600title:…

【043】解密C++ STL:深入理解并使用 list 容器

解密C STL&#xff1a;深入理解并使用list容器 引言一、list 容器概述二、list容器常用的API2.1、构造函数2.2、数据元素插入和删除操作2.3、大小操作2.4、赋值操作2.5、数据的存取2.6、list容器的反转和排序 三、使用示例总结 引言 &#x1f4a1; 作者简介&#xff1a;一个热爱…

详细解析python视频选择--【思维导图知识范围】

C ,JAVA JAVAWEB ,微信小程序等 都有视频选择的分析。 语言视频选择收录专辑链接C张雪峰推荐选择了计算机专业之后-在大学期间卷起来-【大学生活篇】JAVA黑马B站视频JAVA部分的知识范围、学习步骤详解JAVAWEB黑马B站视频JAVAWEB部分的知识范围、学习步骤详解SpringBootSpringB…

Linux QT通过NFS挂载到Linux开发板上

Linux QT通过NFS挂载到Linux开发板上 说明&#xff1a;这里使用的Linux开发板是正点原子的阿尔法开发板 创建NFS 环境 NFS简介 网络文件系统&#xff0c;英文 Network File System(NFS)&#xff0c;是由 SUN 公司研制的 UNIX 表示层协议 (presentation layer protocol)&…

机器学习术语解析与应用(二)

文章目录 &#x1f340;目标函数&#xff08;Objective Function&#xff09;&#x1f340;GPU加速&#xff08;GPU Acceleration&#xff09;&#x1f340;迁移学习&#xff08;Transfer Learning&#xff09;&#x1f340;自然语言处理&#xff08;Natural Language Processi…

opencv 之 外接多边形(矩形、圆、三角形、椭圆、多边形)使用详解

opencv 之 外接多边形&#xff08;矩形、圆、三角形、椭圆、多边形&#xff09;使用详解 本文主要讲述opencv中的外接多边形的使用&#xff1a; 多边形近似外接矩形、最小外接矩形最小外接圆外接三角形椭圆拟合凸包 将重点讲述最小外接矩形的使用 1. API介绍 #多边形近似 v…