RabbitMQ--重试机制

news2025/1/11 5:15:19

原文网址:RabbitMQ--重试机制_IT利刃出鞘的博客-CSDN博客

简介

说明

        本文介绍RabbitMQ的重试机制。

问题描述

        消费者默认是自动提交,如果消费时出现了RuntimException,会导致消息直接重新入队,再次投递(进入队首),进入死循环,继而导致后面的消息被阻塞。

        消息阻塞带来的后果是:后边的消息无法被消费;RabbitMQ服务端继续接收消息,占内存和磁盘越来越多。

RabbitMQ的自动确认

自动确认分四种情况(第一就是正常消费,其他三种为异常情况)

  1. 消息成功被消费,没有抛出异常,则自动确认,回复ack。
    不涉及requeue,毕竟已经成功了。requeue是对被拒绝的消息生效。
  2. 当抛出ImmediateAcknowledgeAmqpException异常的时候,则视为成功消费,确认该消息。
  3. 当抛出AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue = false(该异常会在重试超过限制后抛出)
  4. 抛出其他的异常,消息会被拒绝,且requeue = true

        我遇到的是第四种情况,导致mq消息阻塞,并且消费者一直在消费同一条消息,然后抛异常,此时就进入了死循环。

消息未被确认时如下图所示:

RabbitMQ的重试机制

        本处使用spring-rabbit中自带的重试功能解决上述问题。

注意

        重试并不是RabbitMQ重新发送了消息,仅仅是消费者内部进行的重试,换句话说就是重试跟mq没有任何关系。

        不管消息被消费了之后是手动确认还是自动确认,代码中不能使用try/catch捕获异常,否则重试机制失效。

重试机制有2种情况

  1. 消息是自动确认时,如果抛出了异常导致多次重试都失败,消息被自动确认,消息就丢失了
  2. 消息是手动确认时,如果抛出了异常导致多次重试都失败,消息没被确认,也无法nack,就一直是unacked状态,导致消息积压。

RabbitMQ的重试的实例

配置

application.yml

spring:
  # RabbitMQ服务配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        # 重试机制
        retry:
          enabled: true #是否开启消费者重试
          max-attempts: 3 #最大重试次数
          initial-interval: 5000ms #重试间隔时间(单位毫秒)
          max-interval: 1200000ms #重试最大时间间隔(单位毫秒)
          # 乘子。间隔时间*乘子=下一次的间隔时间,不能超过max-interval
          # 以本处为例:第一次间隔 5 秒,第二次间隔 10 秒,以此类推
          multiplier: 2

代码

@RabbitListener(queues = "meat_queue")
public void processMeatTwo(String message) throws InterruptedException {
    System.out.println("processMeatTwo消费了队列meat_queue的消息:" + message);
    Thread.sleep(1000);
    //模拟异常
    String is = null;
    is.toString();
}

结果

        可以看到,消息重试了5次,之后会抛出ListenerExecutionFailedException的异常。后面附带着Retry Policy Exhausted,提示我们重试次数已经用尽了。

        消息重试次数用尽后,消息就会被抛弃。

重试完之后对消息的处理

概述

        消息在重试完之后,会调用MessageRecoverer接口的recover方法。MessageRecoverer接口有如下三个实现类(看它们名字即可知道含义):

  • RejectAndDontRequeueRecoverer:拒绝而且不把消息重新放入队列(默认)
  • RepublishMessageRecoverer:重新发布消息
  • ImmediateRequeueMessageRecoverer:立即把消息重新放入队列

处理示例

        默认情况下是RejectAndDontRequeueRecoverer:拒绝而且不把消息重新放入队列。我们可以使用RepublishMessageRecoverer,重新发布消息,将它发布到其他队列,后边对它进行补偿处理。

先创建一个异常队列,然后与交换机绑定进行绑定,绑定之后设置MessageRecoverer。

@Configuration
public class MQErrorConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static String errorTopicExchange = "error-topic-exchange";
    private static String errorQueue = "error-queue";
    private static String errorRoutingKey = "error-routing-key";

    //创建异常交换机
    @Bean
    public TopicExchange errorTopicExchange(){
        return new TopicExchange(errorTopicExchange, true, false);
    }

    //创建异常队列
    @Bean
    public Queue errorQueue(){
        return new Queue(errorQueue, true);
    }

    //队列与交换机进行绑定
    @Bean
    public Binding BindingErrorQueueAndExchange(Queue errorQueue, TopicExchange errorTopicExchange){
        return BindingBuilder.bind(errorQueue).to(errorTopicExchange).with(errorRoutingKey);
    }

    //设置MessageRecoverer
    @Bean
    public MessageRecoverer messageRecoverer(){
        //AmqpTemplate和RabbitTemplate都可以
        return new RepublishMessageRecoverer(rabbitTemplate, errorTopicExchange, errorRoutingKey);
    }
}

查看处理结果:

        通过控制台可以看到,消息重试5次以后直接以新的routingKey发送到了配置的交换机中,此时再查看监控页面,可以看原始队列中已经没有消息了,但是配置的异常队列中存在一条消息:

源码分析

        上面的例子在测试中发现了一个问题,就是经过5次重试以后,控制台输出了一个异常的堆栈日志,然后队列中的数据也被ack掉了(自动ack模式),首先我们看一下这个异常日志是什么。

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted

        出现消息被消费掉并且出现上述异常的原因是因为在构建SimpleRabbitListenerContainerFactoryConfigurer类时使用了MessageRecoverer接口,这个接口有一个cover方法,用来实现重试完成之后对消息的处理,源码如下:

ListenerRetry retryConfig = configuration.getRetry();
if (retryConfig.isEnabled()) {
    RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()
            : RetryInterceptorBuilder.stateful();
    RetryTemplate retryTemplate = new RetryTemplateFactory(this.retryTemplateCustomizers)
            .createRetryTemplate(retryConfig, RabbitRetryTemplateCustomizer.Target.LISTENER);
    builder.retryOperations(retryTemplate);
    MessageRecoverer recoverer = (this.messageRecoverer != null) ? this.messageRecoverer
            : new RejectAndDontRequeueRecoverer(); // 1
    builder.recoverer(recoverer);
    factory.setAdviceChain(builder.build());

        注意看1处的代码,默认使用的是RejectAndDontRequeueRecoverer实现类,根据实现类的名字我们就可以看出来该实现类的作用就是拒绝并且不会将消息重新发回队列,我们可以看一下这个实现类的具体内容:

public class RejectAndDontRequeueRecoverer implements MessageRecoverer {
    protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class); // NOSONAR protected
    @Override
    public void recover(Message message, Throwable cause) {
        if (this.logger.isWarnEnabled()) {
            this.logger.warn("Retries exhausted for message " + message, cause);
        }
        throw new ListenerExecutionFailedException("Retry Policy Exhausted",
                    new AmqpRejectAndDontRequeueException(cause), message);
    }
}

        上述源码给出了异常的来源,但是未看到拒绝消息的代码,猜测应该是使用aop的方式实现的,此处不再继续深究。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/81097.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【iOS】—— MVVM模式

MVVM模式 文章目录MVVM模式为什么使用MVVM&#xff1f;MVVM分别代表什么含义&#xff1f;MVVM通信关系MVVM模式的优缺点优点:缺点:概括总结MVVM文件分类为什么使用MVVM&#xff1f; iOS中&#xff0c;我们使用的大部分都是MVC架构。虽然MVC的层次明确&#xff0c;但是由于功能日…

C# 11新特性之file关键字

C#11 添加了文件作用域类型功能&#xff1a;一个新的 file 修饰符&#xff0c;可以应用于任何类型定义以限制其只能在当前文件中使用。这样&#xff0c;我们可以在一个项目中拥有多个同名的类。 目录示例file不可以与其他修饰符一起使用file可以修饰的类型file 不可修饰的类型f…

主流报表开发工具FastReport.Net全新发布,邀您体验最新版试用

FastReport .Net是一款适用于 WinForms、Blazor Server、ASP.NET、MVC、.NET 6 和 .NET Core 的报告生成工具。FastReport代表着“速度”、“可靠”和“品质”&#xff0c;是当今主流的报表开发工具。 该产品在本月进行了重大版本v2023的发布&#xff0c;接下来让我们一起看看…

【OpenCV-Python】教程:4-9 特征匹配 match

OpenCV Python 特征匹配 【目标】 特征匹配Brute-Force Matcher 和 FLANN Matcher 【理论】 Brute-Force Matcher字面意思是蛮力匹配器&#xff0c;所以它的过程也很简单&#xff0c;从一个集合里取出一个特征描述子&#xff0c;然后与第二个集合里的特征逐个的进行匹配比较…

传统MES架构的智能化改进---python在Aprol上的实践

一、开题依据 MES是属于生产车间级的管理信息系统。作为生产与计划之间的信息“集线器”&#xff0c;MES 主要包括以下功能模块&#xff1a;工序详细调度、资源分配和状态管理、生产单元分配、过程管理、人力资源管理、维护管理、质量管理、文档控制、产品跟踪和产品清单管理、…

Solidworks导出为URDF用于MoveIT总结(带prismatic)

环境 Solidwoks2018 SP0 / Solidwoks2021 SP5&#xff1b;Ubuntu20.04&#xff1b;ROS1 Noetic; Solidwoks2018 SP0对于平移副有问题&#xff0c;显示不出来&#xff0c;Solidwoks2021 SP5没有问题。 官网有段话&#xff1a; There is a known STL export bug with SolidWork…

Jdk Tomcat 安装教程 — 2022.12.11

文章目录一、安装jdk教程二、tomcat 安装三、修改Tomcat端口号安装Tomcat之前要确保安装jdk一、安装jdk教程 安装vim命令包&#xff0c;此操作如果执行不了&#xff0c;需要使用root权限执行 执行如下命令&#xff1a; yum install -y vim-enhanced2. 下载jdk安装包&#xff…

3D打印切片软件Cura入门

安装好之后&#xff0c;添加一台打印机&#xff0c;参数可以随便设置。 Cura安装包&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1T1MBcZYBCVfhtFKDBjypmQ?pwd2022 提取码&#xff1a;2022 基本操作 按住鼠标右键不放&#xff1a;旋转 按住滚轮不放&#xff1…

制作USB启动盘(U盘安装ubuntu20.04)

文章目录制作USB启动盘&#xff08;U盘安装ubuntu20.04&#xff09;制作USB启动盘的工具ubuntu20.04系统安装u盘制作进入bios制作USB启动盘&#xff08;U盘安装ubuntu20.04&#xff09; 制作USB启动盘的工具 制作USB启动盘的工具有rufus&#xff0c;UNetbootin&#xff0c;Un…

汇编语言—第1章 各类存储芯片及内存空间

1、各类存储器芯片 一台PC机中&#xff0c;装有多个存储类芯片&#xff0c;这些存储器芯片从物理连接上来看是独立的、不同的器件。 &#xff08;1&#xff09;随机存储器 用于存放供CPU使用的绝大部分程序和数据 &#xff08;2&#xff09;装有BIOS&#xff08;Basic Input/Ou…

AI遮天传 DL-反馈神经网络RNN

本文会先介绍动态系统的概念&#xff0c;然后介绍两种简单的反馈神经网络&#xff0c;然后再介绍两种门控神经网络(LSTM, GRU)&#xff0c;最后是关于反馈神经网络的应用(本次以语音识别为例)。 RNN: Recurrent neural network&#xff0c;一般叫它“反馈神经网络”或者“循环神…

1565_AURIX_TC275_开关电源模式相关寄存器

全部学习汇总&#xff1a; GreyZhang/g_TC275: happy hacking for TC275! (github.com) 这个寄存器可以设置开关电源的开关频率&#xff0c;之前在文档中看到过这个默认的数值是1.5M的频率&#xff0c;现在看来应该是这个1.56M的一个近似了。准确的数值不是1.5M而是1.56M。 1. …

任务四:标准化组织概览

标准化组织概览一、标准化组织1、ITU电信标准化部门无线电通信部门电信发展部门2、3GPP3、3GPP24、CCSA二、TDD-LTE与FDD-LTE系统的对比三、LTE技术特点及基本指标1、LTE主要技术特点2、峰值数据速率3、控制面延迟4、用户面延迟5、用户吞吐量6、频谱效率7、移动性8、覆盖9、频谱…

同事跳槽拿下阿里 P6Offer,程序员:会点基础还真不行

前阵子&#xff0c;同事程序员 H 偷偷的向阿里菜鸟投递了自己的简历... 不久后程序员 H 就收到了阿里菜鸟的面试通知&#xff0c;经历 5 轮面试&#xff0c;一举成功拿下 offer 并定级 P6。 小天趁着未来的阿里大佬还在身边&#xff0c;向程序员 H 讨教了一下面试阿里菜鸟的经…

法则三:架构师如何在一定时间内最大化自己的增量价值

法则三&#xff1a;架构师如何在一定时间内最大化自己的增量价值 作为一个架构师&#xff0c;必须要创造足够的商业价值&#xff0c;才能保障自己职业的长期。 那么你作为架构师&#xff0c;该如何为你的公司、部门或团队提供可量化的增量价值呢&#xff1f; 主要有扩大收入与…

2022.12.11 学习周报

文章目录摘要文献阅读1.题目2.摘要3.传统RNN存在的问题4.RNN与IndRNN的对比4.1 隐含层状态更新公式4.2 结构示意图4.3 IndRNN的优势5.IndRNN的分析5.1 RNN5.2 LSTM5.3 IndRNN的初始化5.4 梯度截断5.5 IndRNN6.实验结果6.1 Adding Problem6.2 Sequential MNIST Classification6.…

Spring 体系常用项目

如今做Java尤其是web几乎是避免不了和Spring打交道了&#xff0c;但是Spring是这样的大而全&#xff0c;新鲜名词不断产生&#xff0c;学起来给人一种凌乱的感觉&#xff0c;我就在这里总结一下&#xff0c;理顺头绪。 Spring Spring 概述 Spring 是一个开源框架&#xff0c…

R语言学习笔记——基础篇:第六章-基本图形

R语言 R语言学习笔记——入门篇&#xff1a;第六章-基本图形 文章目录R语言一、条形图1.1、垂直与水平条形图补——数据为因子时绘制垂直与水平条形图1.2、堆砌条形图与分组条形图1.3、数据整合条形图1.4、条形图的微调1.5、棘状图二、饼图三、直方图四、核密度图4.1、简易核密…

CSS基础-装饰,基线,光标类型,边框圆角(胶囊),文字溢出,元素隐藏,边框合并,css画三角形...

CSS基础-装饰 目录CSS基础-装饰1.1 认识基线(了解)1.2 文字对齐问1.3 垂直对齐方式(拓展)项目中 vertical-align 可以解决的问题2.1 光标类型3.1 边框圆角3.2 边框圆角的常见应用4.1 溢出部分显示效果5.1 元素本身隐藏(拓展)元素整体透明度(拓展)边框合并(拓展)用CSS画三角形技…

关键字(五):const和volatile

关键字一."令人误解"的关键字—const1.const的基本特质2.const的各种应用场景1.修饰变量2.修饰数组3.修饰指针4.修饰函数的参数5.修饰返回值二.最易变的关键字—volatile一."令人误解"的关键字—const 1.const的基本特质 简单的示例 const所修饰的变量不可…