RabbitMQ --- 死信交换机

news2025/1/11 4:03:52

一、简介

1.1、什么是死信交换机

什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false

  • 消息是一个过期消息,超时无人消费

  • 要投递的队列消息满了,无法投递

 

如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。

 

如图,一个消息被消费者拒绝了,变成了死信:

因为simple.queue绑定了死信交换机 dl.direct,因此死信会投递给这个交换机:  

如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:  

另外,队列将死信投递给死信交换机时,必须知道两个信息:

  • 死信交换机名称

  • 死信交换机与死信队列绑定的RoutingKey

这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。

 

 

1.2、利用死信交换机接收死信(拓展)

在失败重试策略中,默认的RejectAndDontRequeueRecoverer会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。

我们可以给simple.queue添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。

我们在consumer服务中,定义一组死信交换机、死信队列:  

// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2(){
    return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
        .deadLetterExchange("dl.direct") // 指定死信交换机
        .build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){
    return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){
    return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){
    return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}

 

1.3、总结

什么样的消息会成为死信?

  • 消息被消费者reject或者返回nack

  • 消息超时未消费

  • 队列满了

死信交换机的使用场景是什么?

  • 如果队列绑定了死信交换机,死信会投递到死信交换机;

  • 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。

 

 

二、TTL

一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:

  • 消息所在的队列设置了超时时间

  • 消息本身设置了超时时间

 

2.1、接收超时死信的死信交换机

在consumer服务的SpringRabbitListener中,定义一个新的消费者,并且声明 死信交换机、死信队列:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dl.ttl.queue", durable = "true"),
    exchange = @Exchange(name = "dl.ttl.direct"),
    key = "ttl"
))
public void listenDlQueue(String msg){
    log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}

 

2.2、声明一个队列,并且指定TTL

要给队列设置超时时间,需要在声明队列时配置x-message-ttl属性:

@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化
        .ttl(10000) // 设置队列的超时时间,10秒
        .deadLetterExchange("dl.ttl.direct") // 指定死信交换机
        .build();
}

注意,这个队列设定了死信交换机为dl.ttl.direct

声明交换机,将ttl与交换机绑定:

@Bean
public DirectExchange ttlExchange(){
    return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
    return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}

发送消息,但是不要指定TTL:

@Test
public void testTTLQueue() {
    // 创建消息
    String message = "hello, ttl queue";
    // 消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 发送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    // 记录日志
    log.debug("发送消息成功");
}

发送消息的日志:

查看下接收消息的日志:  

因为队列的TTL值是10000ms,也就是10秒。可以看到消息发送与接收之间的时差刚好是10秒。  

 

2.3、发送消息时,设定TTL

在发送消息时,也可以指定TTL:

@Test
public void testTTLMsg() {
    // 创建消息
    Message message = MessageBuilder
        .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
        .setExpiration("5000")
        .build();
    // 消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 发送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    log.debug("发送消息成功");
}

查看发送消息日志:

接收消息日志:  

这次,发送与接收的延迟只有5秒。说明当队列、消息都设置了TTL时,任意一个到期就会成为死信。  

 

2.4、总结

消息超时的两种方式是?

  • 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信

  • 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信

如何实现发送一个消息20秒后消费者才收到消息?

  • 给消息的目标队列指定死信交换机

  • 将消费者监听的队列绑定到死信交换机

  • 发送消息时给消息设置超时时间为20秒

 

 

三、延迟队列

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

延迟队列的使用场景包括:

  • 延迟发送短信

  • 用户下单,如果用户在15 分钟内未支付,则自动取消

  • 预约工作会议,20分钟后自动通知所有参会人员

因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。

这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:Community Plugins — RabbitMQ

使用方式可以参考官网地址:Scheduling Messages with RabbitMQ | RabbitMQ - Blog

 

3.1、安装DelayExchange插件

安装MQ


执行下面的命令来运行MQ容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management

安装DelayExchange插件


官方的安装指南地址为:Scheduling Messages with RabbitMQ | RabbitMQ - Blog

上述文档是基于linux原生安装RabbitMQ,然后安装插件。

 

因为我们之前是基于Docker安装RabbitMQ,所以下面我们会讲解基于Docker来安装RabbitMQ插件。


下载插件


RabbitMQ有一个官方的插件社区,地址为:Community Plugins — RabbitMQ

其中包含各种各样的插件,包括我们要使用的DelayExchange插件:

 

大家可以去对应的GitHub页面下载3.8.9版本的插件,地址为Release v3.8.9 · rabbitmq/rabbitmq-delayed-message-exchange · GitHub这个对应RabbitMQ的3.8.5以上版本。


上传插件


因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。如果不是基于Docker的,重新创建Docker容器。

我们之前设定的RabbitMQ的数据卷名称为mq-plugins,所以我们使用下面命令查看数据卷:

docker volume inspect mq-plugins

可以得到下面结果:

接下来,将插件上传到这个目录即可:

 


安装插件


最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq,所以执行下面命令:

docker exec -it mq bash

执行时,请将其中的 -it 后面的mq替换为你自己的容器名.

进入容器内部后,执行下面命令开启插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 

3.2、DelayExchange原理

DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:

  • 接收消息

  • 判断消息是否具备x-delay属性

  • 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间

  • 返回routing not found结果给消息发送者

  • x-delay时间到期后,重新投递消息到指定队列

 

3.3、使用DelayExchange

插件的使用也非常简单:声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可。

1)声明DelayExchange交换机

基于注解方式(推荐):

也可以基于@Bean的方式:  

 

2)发送消息

发送消息时,一定要携带x-delay属性,指定延迟的时间:

 

3.4、总结

延迟队列插件的使用步骤包括哪些?

•声明一个交换机,添加delayed属性为true

•发送消息时,添加x-delay头,值为超时时间

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/499205.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

超详细-自动化测试从选型到落地,2023年我从10k涨到了18k*14薪...

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 Python自动化测试&…

系统集成项目管理工程师 下午 真题 及考点(2021年上下半年)

文章目录 2021年下半年试题一:第18章 项目风险管理,风险应对策略,风险的性质(客观、偶然、相对、社会、不确定)试题二:第9章 项目成本管理,执行绩效(即CV和SV)&#xff0…

HAL库版FreeRTOS(中)

目录 FreeRTOS 任务切换PendSV 异常PendSV 中断服务函数FreeRTOS 确定下一个要运行的任务函数vTaskSwitchContext()函数taskSELECT_HIGHEST_PRIORITY_TASK() PendSV 异常何时触发FreeRTOS 时间片调度实验功能设计软件设计下载验证 FreeRTOS 内核控制函数FreeRTOS 内核控制函数预…

蓝牙设备的名称与MAC地址及UUID

每个蓝牙设备都具有各自的地址和名称,他们之间通过唯一通过地址和名称进行数据交互。本文详细讲述了蓝牙设备的名称和地址的格式及作用。 名称 蓝牙设备具有各自的名称,通常为字母与数字的组合. MAC地址 与Ethernet相同,MAC地址为48bit的…

VTK安装路径检查

/usr/include/vtk-7.1——————VTK头文件

番剧更新表及番剧详情数据库

访问【WRITE-BUG数字空间】_[内附完整源码和文档] 该项目立足于目前各大平台网站的番剧信息较为分散,用户需要辗转多个平台才能获取较为完整的番剧信息的背景下,实现了各大平台网站番剧信息的整合。将各大平台网站的番剧更新信息及番剧详情信息整合制表…

MATLAB 之 基本概述

文章目录 一、MATLAB 主要功能1. 数值计算功能2. 符号计算功能3. 绘图功能4. 程序设计语言功能5. 工具箱的扩展功能 二、MATLAB 操作界面1. 主窗口2. 命令行窗口3. 当前文件夹窗口4. 工作区窗口5. 搜索路径 三、MATLAB 基本操作1. 交互式命令操作1.1 命令行1.2 续行符1.3 命令行…

开关电源基础01:电源变换器基础(1)-关于缘起

说在开头 我相信各位胖友们通过对《阻容感基础》,《信号完整性基础》以及《半导体器件基础》艰苦卓绝地钻研,已为 “硬功夫” 这门绝世武功,打下了坚实的入门基础,入门之日简直就是指日可待(我xxx,都半年了…

【数据结构】单链表详解

☃️个人主页:fighting小泽 🌸作者简介:目前正在学习C语言和数据结构 🌼博客专栏:数据结构 🏵️欢迎关注:评论👊🏻点赞👍🏻留言💪&…

阿里云服务器镜像系统怎么选择?超详细教程

阿里云服务器镜像怎么选择?云服务器操作系统镜像分为Linux和Windows两大类,Linux可以选择Alibaba Cloud Linux,Windows可以选择Windows Server 2022数据中心版64位中文版,阿里云百科来详细说下阿里云服务器操作系统有哪些&#xf…

Buf 教程 - 使用 Protobuf 生成 Golang 代码和 Typescript 类型定义

简介 Buf 是一款更高效、开发者友好的 Protobuf API 管理工具,不仅支持代码生成,还支持插件和 Protobuf 格式化。 我们可以使用 Buf 替代原本基于 Protoc 的代码生成流程,一方面可以统一管理团队 Protoc 插件的版本、代码生成配置&#xff…

测试之路,2023年软件测试市场领域有哪些变化?突破走得更远...

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 Python自动化测试&…

Linux - 第12节 - 网络编程套接字

1.预备知识 1.1.理解源IP地址和目的IP地址 因特网上的每台计算机都有一个唯一的IP地址,如果一台主机上的数据要传输到另一台主机,那么对端主机的IP地址就应该作为该数据传输时的目的IP地址。但仅仅知道目的IP地址是不够的,当对端主机收到该数…

【Java校招面试】基础知识(七)——数据库

目录 前言一、数据库索引二、数据库锁三、数据库事务四、数据库连接池后记 前言 本篇主要介绍数据库的相关内容。 “基础知识”是本专栏的第一个部分,本篇博文是第六篇博文,如有需要,可: 点击这里,返回本专栏的索引文…

Sourcetree介绍及使用

Sourcetree是一个操作简单但功能强大的免费Git客户端管理工具,可应用在Windows和Mac平台。 Sourcetree的安装: 1.从Sourcetree | Free Git GUI for Mac and Windows 下载SourceTreeSetup-3.4.12.exe; 2.双击SourceTreeSetup-3.4.12.exe&#…

【C++】动态规划

参考博客:动态规划详解 1. 什么是动态规划 动态规划(英语:Dynamic programming,简称 DP),是一种在数学、管理科学、计算机科学、经济学和生物信息学中使用的,通过把原问题分解为相对简单的子问…

Linux LED 驱动开发实验

1、LED 灯驱动原理 Linux 下的任何外设驱动,最终都是要配置相应的硬件寄存器。LED 灯驱动最 终也是对 I.MX6ULL 的 IO 口进行配置,在 Linux 下编写驱动要符合 Linux 的驱动框架。I.MX6U-ALPHA 开发板上的 LED 连接到 I.MX6ULL 的 GPIO1_IO03 这个引脚上&…

Day963.如何拆分数据 -遗留系统现代化实战

如何拆分数据 Hi,我是阿昌,今天学习记录的是关于如何拆分数据的内容。 如何拆分数据,这个场景在建设新老城区,甚至与其他城市(外部系统)交互时都非常重要。 作为开发人员,理想中的业务数据存…

C++《vector类的使用介绍》

本文主要介绍vector一些常见的接口函数的使用 文章目录 一、vector的介绍二、vector的使用2.1vector构造函数2.2迭代器的使用2.3空间增长问题2.4增删查改问题 一、vector的介绍 vector是表示可变大小数组的序列容器。就像数组一样,vector也采用的连续存储空间来存储…

比赛记录:Codeforces Round 871 (Div. 4) A~H

传送门:CF A题:A. Love Story 简单比对一下即可解决 #include <bits/stdc.h> using namespace std; typedef long long ll; #define root 1,n,1 #define ls rt<<1 #define rs rt<<1|1 #define lson l,mid,rt<<1 #define rson mid1,r,rt<<1|1 …