RabbitMQ的死信队列和延迟队列

news2025/1/12 13:18:46

文章目录

    • 死信队列
    • 如何配置死信队列
      • 死信队列的应用场景
      • Spring Boot实现RabbitMQ的死信队列
    • 延迟队列
      • 方案优劣:
      • 延迟队列的实现有两种方式:

死信队列

1)“死信”是RabbitMQ中的一种消息机制。
2)消息变成死信,可能是由于以下的原因
● 消息被拒绝
● 消息过期
● 队列达到最大长度
3)死信队列
当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX(Dead-Letter-Exchange ) ,绑定 DLX 的队列就称之为死信队列。
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

如何配置死信队列

  1. 配置业务队列,绑定到业务交换机上
  2. 为业务队列配置死信交换机和路由key
  3. 为死信交换机配置死信队列
    注意:并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。
    有了死信交换机和路由key后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。
    具体因为队列消息过期而被投递到死信队列的流程:
    在这里插入图片描述

死信队列其实并没有什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。
死信消息的生命周期:
1)业务消息被投入业务队列
2)消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作
3)被nck或reject的消息由RabbitMQ投递到死信交换机中
4)死信交换机将消息投入相应的死信队列
5)死信队列的消费者消费死信消息
死信消息是RabbitMQ为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中,当你明白了这些之后,这些Exchange和Queue想怎样配合就能怎么配合。比如从死信队列拉取消息,然后发送邮件、短信、钉钉通知来通知开发人员关注。或者将消息重新投递到一个队列然后设置过期时间,来进行延时消费

死信队列的应用场景

一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等,当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息。
通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了。

Spring Boot实现RabbitMQ的死信队列

当您在使用Spring Boot实现RabbitMQ的死信队列时,您需要完成以下步骤:

  1. 添加Maven依赖
    确保您的pom.xml文件中包含以下Maven依赖:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

这将为您的应用程序提供RabbitMQ的基本支持。
2. 配置RabbitMQ连接信息
在application.properties或application.yml文件中添加RabbitMQ的连接信息,包括主机地址、用户名、密码等。
3. 创建RabbitMQConfig类
创建一个配置类,用于定义交换机、队列以及它们之间的绑定关系。在这个类中,您需要定义普通队列、死信队列、死信交换机,并将它们进行绑定。
4. 创建消费者类
创建一个消费者类,使用@RabbitListener注解标记需要监听的死信队列,并在方法上使用@RabbitHandler注解来处理接收到的死信消息。
5. 发送消息到普通队列
在需要发送消息的地方,使用RabbitTemplate发送消息到普通队列中。当消息因为过期或被拒绝接收时,会被标记为死信消息,并根据参数设置转发到死信交换机中,然后路由到死信队列中。

下面是一个简单的示例代码,演示了如何在Spring Boot中实现RabbitMQ的死信队列,并消费死信队列的消息:

// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {

    @Bean
    public Queue myQueue() {
        return QueueBuilder.durable("my_queue")
                .withArgument("x-dead-letter-exchange", "dlx_exchange")
                .withArgument("x-dead-letter-routing-key", "dlq_queue")
                .build();
    }

    @Bean
    public Queue dlqQueue() {
        return QueueBuilder.durable("dlq_queue").build();
    }

    @Bean
    public Exchange dlxExchange() {
        return ExchangeBuilder.directExchange("dlx_exchange").durable(true).build();
    }

    @Bean
    public Binding binding(Queue myQueue, Exchange dlxExchange) {
        return BindingBuilder.bind(myQueue).to(dlxExchange).with("my_queue").noargs();
    }
}

// DeadLetterQueueConsumer.java
@Component
@RabbitListener(queues = "dlq_queue")
public class DeadLetterQueueConsumer {

    @RabbitHandler
    public void processDeadLetterMessage(String message) {
        System.out.println("Received message from dead letter queue: " + message);
        // 处理接收到的死信消息
    }
}

// RabbitMQService.java
@Service
public class RabbitMQService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        rabbitTemplate.convertAndSend("my_queue", "Hello, RabbitMQ!");
    }
}

通过以上步骤,您就可以在Spring Boot项目中实现RabbitMQ的死信队列,并消费死信队列的消息。

延迟队列

延迟队列存储的对象是对应的延迟消息;所谓“延迟消息” 是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
在RabbitMQ中延迟队列可以通过 过期时间 + 死信队列 来实现;具体如下流程图所示:

在这里插入图片描述

RabbitMQ 的基因中没有延时队列这回事,它不能直接指定一个队列类型为延时队列,然后去延时处理,但是经过上面两节的铺垫,我们可以将 TTL+DLX 相结合,这就能组成一个延时队列。
设想一个场景,下完订单之后 15 分钟未付款我们就要将订单关闭,这就是一个很经典的演示消费的场景,如果拿 RabbitMQ 来做,我们就需要结合 TTL+DLX 了。
先把订单消息设置好 15 分钟过期时间,然后过期后队列将消息转发给我们设置好的 DLX-Exchange,DLX-Exchange 再将分发给它绑定的队列,我们的消费者再消费这个队列中的消息,就做到了延时十五分钟消费。

RabbitMQ 有两个特性,一个是 Time-To-Live Extensions,另一个是 Dead Letter Exchanges。
Time-To-Live Extensions
RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后 “死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。
Dead Letter Exchanges
在 RabbitMQ 中,一共有三种消息的 “死亡” 形式:
● 消息被拒绝。通过调用 basic.reject 或者 basic.nack 并且设置的 requeue 参数为 false;
● 消息因为设置了TTL而过期;
● 队列达到最大长度。

DLX同一般的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当队列中有 DLX 消息时,RabbitMQ就会自动的将 DLX 消息重新发布到设置的 Exchange 中去,进而被路由到另一个队列,publish 可以监听这个队列中消息做相应的处理。
由上简介大家可以看出,RabbitMQ本身是不支持延迟队列的,只是他的特性让勤劳的 中国脱发群体 急中生智(为了完成任务)弄出了这么一套可用的方案。
可用的方案就是:

  1. 如果有事件需要延迟那么将该事件发送到MQ 队列中,为需要延迟的消息设置一个TTL;
  2. TTL到期后就会自动进入设置好的DLX,然后由DLX转发到配置好的实际消费队列;
  3. 消费该队列的延迟消息,处理事件。

方案优劣:

优点:
大品牌组件,用的放心。如果面临大数据量需求可以很容易的横向扩展,同时消息支持持久化,有问题可回滚。
缺点:

  1. 配置麻烦,额外增加一个死信交换机和一个死信队列的配置;
  2. RabbitMQ 是一个消息中间件,TTL 和 DLX 只是他的一个特性,将延迟队列绑定在一个功能软件的某一个特性上,可能会有风险。不要杠,当你们组不用 RabbitMQ 的时候迁移很痛苦;
  3. 消息队列具有先进先出的特点,如果第一个进入队列的消息 A 的延迟是10分钟,第二个进入队列的消息B 的延迟是5分钟,期望的是谁先到 TTL谁先出,但是事实是B已经到期了,而还要等到 A 的延迟10分钟结束A先出之后,B 才能出。所以在设计的时候需要考虑不同延迟的消息要放到不同的队列。另外该问题官方已经给出了插件来支持:插件地址。

延迟队列的实现有两种方式:

通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;
使用 RabbitMQ-delayed-message-exchange 插件实现延迟功能。
针对任务丢失的代价过大,高并发的场景
优点: 支持集群,分布式,高并发场景;
缺点: 引入额外的消息队列,增加项目的部署和维护的复杂度。
场景:为一个委托指定期限,委托到期后,委托关系终止,相关业务权限移交回原拥有者 这里采用的是RabbitMq的死信队列加TTL消息转化为延迟队列的方式(RabbitMq没有延时队列)

①声明一个队列设定其的死信队列  
@Configuration
public class MqConfig {
    public static final String GLOBAL_RABBIT_TEMPLATE = "rabbitTemplateGlobal";
 
    public static final String DLX_EXCHANGE_NAME = "dlxExchange";
    public static final String AUTH_EXCHANGE_NAME = "authExchange";
 
    public static final String DLX_QUEUE_NAME = "dlxQueue";
    public static final String AUTH_QUEUE_NAME = "authQueue";
    public static final String DLX_AUTH_QUEUE_NAME = "dlxAuthQueue";
 
    @Bean
    @Qualifier(GLOBAL_RABBIT_TEMPLATE)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
 
    @Bean
    @Qualifier(AUTH_EXCHANGE_NAME)
    public Exchange authExchange() {
        return ExchangeBuilder.directExchange (AUTH_EXCHANGE_NAME).durable (true).build ();
    }
 
    /**
     * 死信交换机
     * @return
     */
    @Bean
    @Qualifier(DLX_EXCHANGE_NAME)
    public Exchange dlxExchange() {
        return ExchangeBuilder.directExchange (DLX_EXCHANGE_NAME).durable (true).build ();
    }
 
    /**
     * 记录日志的死信队列
     * @return
     */
    @Bean
    @Qualifier(DLX_QUEUE_NAME)
    public Queue dlxQueue() {
        // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        return QueueBuilder.durable (DLX_QUEUE_NAME).build ();
    }
 
    /**
     * 委托授权专用队列
     * @return
     */
    @Bean
    @Qualifier(AUTH_QUEUE_NAME)
    public Queue authQueue() {
        return QueueBuilder
                .durable (AUTH_QUEUE_NAME)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", "dlx_auth")
                .build ();
    }
 
    /**
     * 委托授权专用死信队列
     * @return
     */
    @Bean
    @Qualifier(DLX_AUTH_QUEUE_NAME)
    public Queue dlxAuthQueue() {
        // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        return QueueBuilder
                .durable (DLX_AUTH_QUEUE_NAME)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", "dlx_key")
                .build ();
    }
 
    @Bean
    public Binding bindDlxQueueExchange(@Qualifier(DLX_QUEUE_NAME) Queue dlxQueue, @Qualifier(DLX_EXCHANGE_NAME) Exchange dlxExchange){
        return BindingBuilder.bind (dlxQueue).to (dlxExchange).with ("dlx_key").noargs ();
    }
 
    /**
     * 委托授权专用死信队列绑定关系
     * @param dlxAuthQueue
     * @param dlxExchange
     * @return
     */
    @Bean
    public Binding bindDlxAuthQueueExchange(@Qualifier(DLX_AUTH_QUEUE_NAME) Queue dlxAuthQueue, @Qualifier(DLX_EXCHANGE_NAME) Exchange dlxExchange){
        return BindingBuilder.bind (dlxAuthQueue).to (dlxExchange).with ("dlx_auth").noargs ();
    }
 
    /**
     * 委托授权专用队列绑定关系
     * @param authQueue
     * @param authExchange
     * @return
     */
    @Bean
    public Binding bindAuthQueueExchange(@Qualifier(AUTH_QUEUE_NAME) Queue authQueue, @Qualifier(AUTH_EXCHANGE_NAME) Exchange authExchange){
        return BindingBuilder.bind (authQueue).to (authExchange).with ("auth").noargs ();
    }
 
}
 

 ②发送含过期时间的消息  
 向授权交换机,发送路由为"auth"的消息(指定了业务所需的超时时间) =》发向MqConfig.AUTH_QUEUE_NAME 队列  
rabbitTemplate.convertAndSend(MqConfig.AUTH_EXCHANGE_NAME, "auth", "类型:END,信息:{id:1,fromUserId:111,toUserId:222,beginData:20201204,endData:20211104}", message -> {
            /**
             * MessagePostProcessor:消息后置处理
             * 为消息设置属性,然后返回消息,相当于包装消息的类
             */
 
            //业务逻辑:过期时间=xxxx
            String ttl = "5000";
            //设置消息的过期时间
            message.getMessageProperties ().setExpiration (ttl);
            return message;
        });
复制代码
③超时后队列MqConfig.AUTH_QUEUE_NAME会将消息转发至其配置的死信路由"dlx_auth",监听该死信队列即可消费定时的消息
 	/**
     * 授权定时处理
     * @param channel
     * @param message
     */
    @RabbitListener(queues = MqConfig.DLX_AUTH_QUEUE_NAME)
    public void dlxAuthQ(Channel channel, Message message) throws IOException {
        System.out.println ("\n死信原因:" + message.getMessageProperties ().getHeaders ().get ("x-first-death-reason"));
        //1.判断消息类型:1.BEGIN 2.END
        try {
            //2.1 类型为授权到期(END)
            //2.1.1 修改报件办理人
            //2.1.2 修改授权状态为0(失效)
 
            //2.2 类型为授权开启(BEGIN)
            //2.2.1 修改授权状态为1(开启)
            System.out.println (new String(message.getBody (), Charset.forName ("utf8")));
            channel.basicAck (message.getMessageProperties ().getDeliveryTag (),  false);
            System.out.println ("已处理,授权相关信息修改成功");
        } catch (Exception e) {
            //拒签消息
            channel.basicNack (message.getMessageProperties ().getDeliveryTag (), false, false);
            System.out.println ("授权相关信息处理失败, 进入死信队列记录日志");
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1466538.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于Python网络爬虫的IT招聘就业岗位可视化分析推荐系统

文章目录 基于Python网络爬虫的IT招聘就业岗位可视化分析推荐系统项目概述招聘岗位数据爬虫分析系统展示用户注册登录系统首页IT招聘数据开发岗-javaIT招聘数据开发岗-PythonIT招聘数据开发岗-Android算法方面运维方面测试方面招聘岗位薪资多维度精准预测招聘岗位分析推荐 结语…

《TCP/IP详解 卷一》第6章 DHCP

目录 6.1 引言 6.2 DHCP 6.2.1 地址池和租用 6.2.2 DHCP和BOOTP消息格式 6.2.3 DHCP和BOOTP选项 6.2.4 DHCP协议操作 6.2.5 DHCPv6 6.2.6 DCHP中继 6.2.7 DHCP认证 6.2.8 重新配置扩展 6.2.9 快速确认 6.2.10 位置信息&#xff08;LCI和LoST&#xff09; 6.2.11 移…

GPT-SoVITS 快速声音克隆使用案例:webui、api接口

参考: https://github.com/RVC-Boss/GPT-SoVITS 环境: Python 3.10 PyTorch 2.1.2, CUDA 12.0 安装包: 1、使用: 1)下载项目 git clone https://github.com/RVC-Boss/GPT-SoVITS.git2)下载预训练模型 https://huggingface.co/lj1995/GPT-SoVITS 下载模型文件放到GPT…

Vue2响应式原理分析(数据代理与数据劫持)

综述&#xff1a; 我们都知道&#xff0c;每个Vue的应用都是通过new一个Vue构造函数从而创造出来一个vm实例对象&#xff0c;el&#xff08;elect&#xff09;配置项为通过id选择器#root选择index页面中的根dom元素进行绑定&#xff0c;data配置项则为vue模板中用到的源数据。 …

python 层次分析(AHP)

文章目录 一、算法原理二、案例分析2.1 构建指标层判断矩阵2.2 求各指标权重2.2.1 算术平均法&#xff08;和积法&#xff09;2.2.2 几何平均法&#xff08;方根法&#xff09; 2.3 一致性检验2.3.1 求解最大特征根值2.3.2 求解CI、RI、CR值2.3.3 一致性判断 2.4 分别求解方案层…

算法沉淀——FloodFill 算法(leetcode真题剖析)

算法沉淀——FloodFill 算法 01.图像渲染02.岛屿数量03.岛屿的最大面积04.被围绕的区域05.太平洋大西洋水流问题06.扫雷游戏07.衣橱整理 Flood Fill&#xff08;泛洪填充&#xff09;算法是一种图像处理的基本算法&#xff0c;用于填充连通区域。该算法通常从一个种子点开始&am…

【DDD】学习笔记-薪资管理系统的测试驱动开发2

测试驱动开发的过程 满足简单设计并编写新的测试 当代码满足重用性和可读性之后&#xff0c;就应遵循简单设计的第四条原则“若无必要&#xff0c;勿增实体”&#xff0c;不要盲目地考虑为其增加新的软件元素。这时&#xff0c;需要暂时停止重构&#xff0c;编写新的测试。 …

2.23数据结构

单向循环链表 创建单向循环链表&#xff0c;创建节点 &#xff0c;头插&#xff0c;按位置插入&#xff0c;输出&#xff0c;尾删&#xff0c;按位置删除功能 //main.c #include "loop_list.h" int main() {loop_p Hcreate_head();insert_head(H,12);insert_head(…

计算机网络-网络层,运输层,应用层

网络层/网际层 网络层的主要任务包括&#xff1a; 提供逻辑上的端到端通信&#xff1a;网络层负责确定数据的传输路径&#xff0c;使数据能够从源主机传输到目标主机&#xff0c;即实现端到端的通信。数据包的路由和转发&#xff1a;网络层根据目标主机的地址信息&#xff0c…

vue项目使用vue2-org-tree

实现方式 安装依赖 npm i vue2-org-tree使用的vue页面引入 <template><div class"container"><div class"oTree" ><vue2-org-tree name"test":data"data":horizontal"horizontal":collapsable"…

【服务器数据恢复】通过reed-solomon算法恢复raid6数据的案例

服务器数据恢复环境&#xff1a; 一台网站服务器中有一组由6块磁盘组建的RAID6磁盘阵列&#xff0c;操作系统层面运行MySQL数据库和存放一些其他类型文件。 服务器故障&#xff1a; 该服务器在工作过程中&#xff0c;raid6磁盘阵列中有两块磁盘先后离线&#xff0c;不知道是管理…

LabVIEW开发FPGA的高速并行视觉检测系统

LabVIEW开发FPGA的高速并行视觉检测系统 随着智能制造的发展&#xff0c;视觉检测在生产线中扮演着越来越重要的角色&#xff0c;尤其是在质量控制方面。传统的基于PLC的视觉检测系统受限于处理速度和准确性&#xff0c;难以满足当前生产需求的高速和高精度要求。为此&#xf…

【python】yolo目标检测模型转为onnx,及trt/engine模型的tensorrt轻量级模型部署

代码参考&#xff1a; Tianxiaomo/pytorch-YOLOv4: PyTorch ,ONNX and TensorRT implementation of YOLOv4 (github.com)https://github.com/Tianxiaomo/pytorch-YOLOv4这个大佬对于各种模型转化写的很全&#xff0c;然后我根据自己的需求修改了部分源码&#xff0c;稍微简化了…

【区块链】联盟链

区块链中的联盟链 写在最前面**FAQs** 联盟链&#xff1a;区块链技术的新兴力量**联盟链的定义****联盟链的技术架构**共识机制智能合约加密技术身份认证 **联盟链的特点**高效性安全性可控性隐私保护 **联盟链的应用场景****金融服务****供应链管理****身份验证****跨境支付**…

VSCODE include错误 找不到 stdio.h

解决办法&#xff1a; Ctrl Shift P 打开命令面板&#xff0c; 键入 “Select Intellisense Configuration”&#xff08;下图是因为我在写文章之前已经用过这个命令&#xff0c;所以这个历史记录出现在了第一行&#xff09; 再选择“Use gcc.exe ”&#xff08;后面的Foun…

网络原理-TCP/IP(7)

目录 网络层 路由选择 数据链路层 认识以太网 以太网帧格式 认识MAC地址 对比理解MAC地址和IP地址 认识MTU ARP协议 ARP协议的作用 ARP协议工作流程 重要应用层协议DNS(Domain Name System) DNS背景 NAT技术 NAT IP转换过程 NAPT NAT技术的优缺点 网络层 路由…

如何将建筑白模叠加到三维地球上?

​ 通过以下方法可以将建筑白模叠加到三维地球上。 方法/步骤 下载三维地图浏览器 http://www.geosaas.com/download/map3dbrowser.exe&#xff0c;安装完成后桌面上出现”三维地图浏览器“图标。 2、双击桌面图标打开”三维地图浏览器“ 3、点击“建筑白模”菜单&…

基于java(Springboot)学生信息管理和新生报到系统设计与实现

博主介绍&#xff1a;黄菊华老师《Vue.js入门与商城开发实战》《微信小程序商城开发》图书作者&#xff0c;CSDN博客专家&#xff0c;在线教育专家&#xff0c;CSDN钻石讲师&#xff1b;专注大学生毕业设计教育和辅导。 所有项目都配有从入门到精通的基础知识视频课程&#xff…

康威生命游戏

康威生命游戏 康威生命游戏(Conway’s Game of Life)是康威发明的细胞自动机。 生命游戏有几个简单的规则&#xff1a; 细胞有两种状态&#xff0c;存活或死亡&#xff0c;每个细胞以自身为中心与周围的八格细胞互动。 对于存活的细胞&#xff1a; 当周围的细胞过少(<2)或…

C# cass10 面积计算

运行环境Visual Studio 2022 c# cad2016 cass10 通过面积计算得到扩展数据&#xff0c;宗地面积 &#xff0c;房屋占地面积&#xff0c;房屋使用面积 一、主要步骤 获取当前AutoCAD应用中的活动文档、数据库和编辑器对象。创建一个选择过滤器&#xff0c;限制用户只能选择&q…