解决RabbitMQ设置x-max-length队列最大长度后不进入死信队列

news2024/11/15 8:23:36

解决RabbitMQ设置x-max-length队列最大长度后不进入死信队列

  • 问题发现
  • 问题解决
    • 方法一:只监听死信队列,在死信队列里面处理业务逻辑
    • 方法二:修改预取值

问题发现

最近再学习RabbitMQ过程中,看到关于死信队列内容:

来自队列的消息可以是 “死信”,这意味着当以下四个事件中的任何一个发生时,这些消息将被重新发布到 Exchange

  1. 使用 basic.rejectbasic.nackrequeue 参数设置为 false 的使用者否定该消息
  2. 消息由于每条消息的 TTL 而过期
  3. 队列超出了长度限制

之前的文章里面有讲解过TTL过期后不进入死信队列的疑惑和解决办法,然后上手去实践另一个死信队列的方法,结果又是一道坑等着我,示例代码如下:

默认自动应答模式

@Configuration
public class MQConfig {
    /**
     * 死信队列
     * @return
     */
    @Bean
    public Queue deadQueue(){
        return new Queue("dead_queue");
    }
    /**
     * 死信队列交换机
     * @return
     */
    @Bean
    public DirectExchange deadExchange(){
        return new DirectExchange("dead.exchange");
    }

    /**
     * 死信队列和死信交换机绑定
     * @return
     */
    @Bean
    public Binding deadBinding(){
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
    }

    /**
     * 普通队列
     * @return
     */
    @Bean
    public Queue queue(){
//          方法一
//        Queue normalQueue = new Queue("normal_queue");
//        normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
//        normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
//        normalQueue.addArgument("x-max-length", 5);//设置队列最大长度
//        normalQueue.addArgument("x-overflow","reject-publish");//最近发布的消息将被丢弃
//        方法二
        return  QueueBuilder.durable("normal_queue")
                .deadLetterExchange("dead.exchange")
                .deadLetterRoutingKey("dead")
                .maxLength(5) // 设置队列最大长度为5
                .build();
    }
    /**
     * 普通交换机
     * @return
     */
    @Bean
    public DirectExchange normalExchange(){
        return new DirectExchange("normal.exchange");
    }

    /**
     * 普通队列和普通交换机绑定
     * @return
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(normalExchange()).with("normal");
    }
}

监听普通队列消费方,示例代码如下:

@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {
    private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);

    @RabbitHandler
    public void receive(String msg) {
        log.info("收到消息:"+msg);
    }
}

监听死信队列消费方,示例代码如下:

@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
    private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);

    @RabbitHandler
    public void receive(String msg) {
        log.info("死信队列收到消息:{}",msg);
    }
}

发送方发送10条消息,示例代码如下:

@Component
public class MQSender {
    @Autowired
    private RabbitTemplate template;

    public void send() {
        for (int i = 0; i < 10; i++) {
            String msg = "hello world_"+i;
            template.convertAndSend("normal.exchange", "normal", msg);
        }
    }
}

然后调用send()方法,执行结果如图:

在这里插入图片描述

按道理会将队列前面的5条消息进入死信队列,然后剩下的五条消息正常消费才对,我们检查一下队列是否设置成功,如图所示

在这里插入图片描述
设置没有问题,那就很懵逼了。。。

问题解决

我们先在页面上向普通队交换机发送10条消息,然后查看它的状态,如图所示:
在这里插入图片描述
超过5条消息就会放入死信队列中,如图所示:

.
然后再看一下,默认行为是从队列前面删除或死信消息,如图所示:

在这里插入图片描述
我们可以看到普通队列存放的是最后5条消息,前面的5条消息进入死信队列。也就是说,再没有进入普通消费者之前会将队列前面删除或死信消息(进入消费者之前将消息进行分配)。

方法一:只监听死信队列,在死信队列里面处理业务逻辑

这种做法也是网上大多数文章的一种处理方法(另外一种情况就是进入普通消费者,还没被消费完的情况下,消费者挂了,然后队列就会重新分配,将从队列前面删除或者进入死信队列),如图所示:

在这里插入图片描述

但是这些做法都是基于没有普通消费者监听的情况下进行的,感觉和我理解的略有偏差(应该是再有普通消费者监听和死信队列监听的情况下,发送消息时会对消息进行分配处理)。

发送方代码和配置的代码就不重复展示了(参考之前示例),监听死信队列(自动确认模式和手动确认模式都一样),示例代码如下:

@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
    private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);

    @RabbitHandler
    public void receive(String msg,Message message,Channel channel) throws IOException {
        log.info("死信队列收到消息:{}",msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    }
}

调用send()方法,执行结果如图:

在这里插入图片描述
然后再死信队列里面处理业务逻辑即可。

方法二:修改预取值

再监听普通队列时抛异常或者手动拒绝重新进入队列,这两种方式都不能达到我想要的效果,我发现只要有普通消费方监听就会进入消费,然后我就在想是不是因为预期值的问题导致,可能我消息数量太少了,然后默认预期值太高导致消息直接进入消费,然后将预取值改为1自动确认模式,示例代码如下:

spring.rabbitmq.listener.simple.prefetch=1

发送方代码和配置的代码就不重复展示了(参考之前示例),监听普通队列,示例代码如下:

@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {
    private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);

    @RabbitHandler
    public void receive(String msg) throws InterruptedException {
        log.info("收到消息:"+msg);
        // 模拟业务处理队列等待场景
        Thread.sleep(10000);
    }
}

监听死信队列,示例代码如下:

@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
    private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);

    @RabbitHandler
    public void receive(String msg) {
        log.info("死信队列收到消息:{}",msg);

    }
}

调用send()方法,执行结果如图:

在这里插入图片描述
当然有的时候也不一定完全按照你设置的最大长度进入死信队列,有的时候消费速度太快(队列的第一个已经被消费了的情况),得看实际情况,至少可以确保再设置了大于队列最大长度时是可以正常进入死信队列的。归根结底:消息数量太少了

另外我们再来介绍一下溢出方式,一般默认情况下溢出方式为:drop-head(从队列前面删除或者进入死信队列),除此之外还有两种:

  • reject-publishreject-publish-dlx:最近发布的消息将被丢弃。reject-publishreject-publish-dlx 之间的区别在于 reject-publish-dlx 也是死信拒绝消息。

将配置的溢出模式改为reject-publish,示例代码如下:

@Configuration
public class MQConfig {
    /**
     * 死信队列
     * @return
     */
    @Bean
    public Queue deadQueue(){
        return new Queue("dead_queue");
    }
    /**
     * 死信队列交换机
     * @return
     */
    @Bean
    public DirectExchange deadExchange(){
        return new DirectExchange("dead.exchange");
    }

    /**
     * 死信队列和死信交换机绑定
     * @return
     */
    @Bean
    public Binding deadBinding(){
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
    }

    /**
     * 普通队列
     * @return
     */
    @Bean
    public Queue queue() {
//          方法一
//        Queue normalQueue = new Queue("normal_queue");
//        normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
//        normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
//        normalQueue.addArgument("x-max-length", 5);//设置队列最大长度
//        normalQueue.addArgument("x-overflow","reject-publish");//最近发布的消息将被丢弃
//        方法二
        return  QueueBuilder.durable("normal_queue")
                .deadLetterExchange("dead.exchange")
                .deadLetterRoutingKey("dead")
                .maxLength(5) // 设置队列最大长度为5
                .overflow(QueueBuilder.Overflow.dropHead)
                .build();
    }
    /**
     * 普通交换机
     * @return
     */
    @Bean
    public DirectExchange normalExchange(){
        return new DirectExchange("normal.exchange");
    }

    /**
     * 普通队列和普通交换机绑定
     * @return
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(normalExchange()).with("normal");
    }
}

重新创建队列,调用send()方法,如图所示:

在这里插入图片描述
由图可以,死信队列并不会受到消息。

然后我们再来看看将溢出模式设置为reject-publish-dlxQueueBuilder.Overflow没有该参数,手动定义),示例代码如下:

@Configuration
public class MQConfig {
    //忽略死信队列创建和绑定过程,参考前面示例...

    /**
     * 普通队列
     * @return
     */
    @Bean
    public Queue queue() {
        Queue normalQueue = new Queue("normal_queue");
        normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
        normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
        normalQueue.addArgument("x-max-length", 5);//设置队列最大长度
        normalQueue.addArgument("x-overflow","reject-publish-dlx"); //最近发布的消息进入死信队列
        return normalQueue;
    }
    //忽略普通队列创建过程,参考前面示例...
}

重新创建队列,调用send()方法,如图所示:

在这里插入图片描述

如果你有更好的方法或者不同的理解,欢迎评论区交流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2156604.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机组成原理——存储系统

计算机组成原理——存储系统 存储器层次结构 存储器层次结构如下&#xff1a; 寄存器&#xff08;CPU&#xff09;Cache&#xff08;高速缓冲存储器&#xff09;主存磁盘磁带、光盘等 按照上述层次结构&#xff0c;自下而上速度依次增快、容量相对依次渐小、造价越来越高昂…

OpenHarmony(鸿蒙南向开发)——小型系统内核(LiteOS-A)【文件系统】上

往期知识点记录&#xff1a; 鸿蒙&#xff08;HarmonyOS&#xff09;应用层开发&#xff08;北向&#xff09;知识点汇总 鸿蒙&#xff08;OpenHarmony&#xff09;南向开发保姆级知识点汇总~ 子系统开发内核 轻量系统内核&#xff08;LiteOS-M&#xff09; 轻量系统内核&#…

锂电池基础知识

1. 电池的发展史 电池是将化学能转变为电能的装置&#xff0c;通过电池内部的化学反应向外部提供直流电能 1800年Vote伏打电堆 1835年英国Daniel丹尼尔电池 1859年法国Plante铅酸蓄电池 1866年法国Leclanche锌锰电池 1899年瑞典Jungner镍镉电池 1950年Urry碱性电池 1990年索尼…

鸿蒙OpenHarmony【轻量系统内核扩展组件(C++支持)】子系统开发

C支持 基本概念 C作为目前使用最广泛的编程语言之一&#xff0c;支持类、封装、重载等特性&#xff0c;是在C语言基础上开发的一种面向对象的编程语言。 运行机制 C代码的识别主要由编译器支持&#xff0c;系统主要对全局对象进行构造函数调用&#xff0c;进行初始化操作。…

【漏洞复现】用友 NC-Cloud queryStaffByName Sql注入漏洞

免责声明&#xff1a; 本文内容旨在提供有关特定漏洞或安全漏洞的信息&#xff0c;以帮助用户更好地了解可能存在的风险。公布此类信息的目的在于促进网络安全意识和技术进步&#xff0c;并非出于任何恶意目的。阅读者应该明白&#xff0c;在利用本文提到的漏洞信息或进行相关测…

智能农业系统——作物生长模型

橙蜂智能公司致力于提供先进的人工智能和物联网解决方案&#xff0c;帮助企业优化运营并实现技术潜能。公司主要服务包括AI数字人、AI翻译、AI知识库、大模型服务等。其核心价值观为创新、客户至上、质量、合作和可持续发展。 橙蜂智农的智慧农业产品涵盖了多方面的功能&#x…

windows 驱动实例分析系列-COM驱动案例讲解

COM也被称之为串口,这是一种非常简单的通讯接口,这种结构简单的接口被广泛的应用在开发中,几乎所有系统都能支持这种通讯接口,它有RS232和RS485等分支,但一般我们都会使用RS232作为常见的串口,因为它足够简单和高效。 几乎所有的开发板,都会提供用于烧录、调试、日志的…

《Pyramid Vision Transformer》论文笔记

原文笔记 What 为了解决VIT在视觉任务上的局限性并且探究Transformer模型在视觉任务上的应用&#xff0c;这项工作提出了一种纯 Transformer 主干&#xff0c;称为 Pyramid Vision Transformer (PVT)&#xff0c;它可以作为 CNN 主干在许多下游任务中的替代方案&#xff0c;包…

【人工智能】Linux系统Mamba安装流程

在编译安装 mamba 之前&#xff0c;你需要确保已安装正常的PyTorch环境。 # 安装必要的系统依赖 sudo apt update sudo apt install build-essential # 安装mamba依赖 pip install packaging wheel # 克隆仓库 git clone https://github.com/Dao-AILab/causal-conv1d.git git …

【二等奖论文】2024年华为杯研赛D题成品论文(后续会更新)

您的点赞收藏是我继续更新的最大动力&#xff01; 一定要点击如下的卡片&#xff0c;那是获取资料的入口&#xff01; 点击链接获取【2024华为杯研赛资料汇总】&#xff1a; https://qm.qq.com/q/jTIeGzwkSchttps://qm.qq.com/q/jTIeGzwkSc 题 目&#xff1a; 大数据驱动的…

fastadmin 根据选择数据来传参给selectpage输入框

文章目录 js代码php代码&#xff1a;完结 js代码 $(document).on(change,#table .bs-checkbox [type"checkbox"],function(){let url$(#chuancan).attr(data-url)urlurl.split(?)[0]let idsTable.api.selectedids(table)if(ids.length){let u_id[]ids.forEach(eleme…

torch.embedding 报错 IndexError: index out of range in self

文章目录 1. 报错2. 原因3. 解决方法 1. 报错 torch.embedding 报错&#xff1a; IndexError: index out of range in self2. 原因 首先看下正常情况&#xff1a; import torch import torch.nn.functional as Finputs torch.tensor([[1, 2, 4, 5], [4, 3, 2, 9]]) embedd…

【C++ Primer Plus习题】17.3

大家好,这里是国中之林! ❥前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。有兴趣的可以点点进去看看← 问题: 解答: #include <iostream> #include <fstream> using namesp…

PHP、Java等其他语言转Go时选择GoFly快速快速开发框架指南

概要 经过一年多的发展GoFly快速开发框架已被一千多家科技企业或开发者用于项目开发&#xff0c;它的简单易学得到其他语言转Go首选框架。且企业版的发展为GoFly社区提供资金&#xff0c;这使得GoFly快速框架得到良好的发展&#xff0c;GoFly技术团队加大投入反哺科技企业和开…

数据结构之搜索二叉树

目录 一、什么是搜索二叉树 基本概念 特点 注意事项 二、搜索二叉树的C实现 2.0 构造与析构 2.1 插入 2.2 查找 2.3 删除 2.3.1 无牵无挂型 2.3.2 独生子女型 2.3.3 儿女双全型 三、搜索二叉树的应用 3.1 key搜索 3.2 key/value搜索 一、什么是搜索二叉树 搜索二…

EAGLE——探索混合编码器的多模态大型语言模型的设计空间

概述 准确解释复杂视觉信息的能力是多模态大型语言模型 (MLLM) 的关键重点。最近的研究表明&#xff0c;增强的视觉感知可显著减少幻觉并提高分辨率敏感任务&#xff08;例如光学字符识别和文档分析&#xff09;的性能。最近的几种 MLLM 通过利用视觉编码器的混合来实现这一点…

科研绘图系列:R语言ggplot2画热图(heatmap)

文章目录 介绍加载R包导入数据数据预处理画图导出数据系统信息介绍 热图(Heatmap)是一种数据可视化技术,它通过颜色的变化来表示数据的大小或者密度。热图通常用于展示两个变量之间的关系,或者在二维空间上展示数据的分布情况。以下是热图可以表示的一些内容: 数据分布:…

网络原理 HTTP与HTTPS协议

博主主页: 码农派大星. 数据结构专栏:Java数据结构 数据库专栏:MySQL数据库 JavaEE专栏:JavaEE 关注博主带你了解更多计算机网络知识 目录 1.HTTP概念 2.HTTP报文格式 3.HTTP请求 1.首行 1.1URL 1.2 GET⽅法 1.3 POST⽅法 1.4 其他⽅法 2.请求头&#xff08;head…

JVM面试问题集

什么是JVM? 了解过字节码文件的组成吗? 说一下运行时数据区 哪些区域会出现内存溢出&#xff0c;会有什么现象? JM在JDK6-8之间在内存区域上有什么不同 类的生命周期 什么是类加载器 什么是双亲委派机制 打破双亲委派机制 Tomcat的自定义类加载器

【网络通信基础与实践番外一】多图预警之图解UDP和TCP前置知识

参考大佬的文章https://www.cnblogs.com/cxuanBlog/p/14059379.html 一、宏观架构中的传输层 在计算机中&#xff0c;任何一个可以交换信息的介质都可以称为端系统。计算机网络的运输层则负责把报文从一端运输到另一端&#xff0c;运输层实现了让两个互不相关的主机进行了逻辑…