死信是什么,如何运用RabbitMQ的死信机制?

news2024/12/28 22:07:21

系列文章目录

手把手教你,本地RabbitMQ服务搭建(windows)
消息队列选型——为什么选择RabbitMQ
RabbitMQ 五种消息模型
RabbitMQ 能保证消息可靠性吗
推或拉? RabbitMQ 消费模式该如何选择


死信是什么,如何运用RabbitMQ的死信机制?

  • 系列文章目录
  • 前言
  • 一、死信与AMQP
  • 二、产生死信的场景
    • 1. 消费失败
    • 2. 超时
      • 消息TTL
      • 队列TTL
    • 3. 队列饱和
  • 三、死信的处理
    • 1. DLX 死信交换机
    • 2. 死信队列
    • 3. 一些细节逻辑
      • (1)死信的路由问题
      • (2)死信的循环问题
  • 四、死信功能Demo
  • 五、死信的应用
    • 1. 消息堆积报警
    • 2. 异常消息检查
    • 3. 延迟消费


前言

我们在上次讨论RabbitMQ的消息可靠性时,已经提到了死信队列(详见系列文章《RabbitMQ 能保证消息可靠性吗》),死信概念是RabbitMQ的重要特性,官网也有该特性的介绍,那么这种设计有什么用,我们又该怎么使用死信呢?一起开始本次的学习吧

在这里插入图片描述


一、死信与AMQP

死信是指由于某些原因无法被正常投递到目标地址的邮件或消息,而在MQ的语义下,就是无法被消费的mq消息。

从AMQP的规范原文中(AMQP0-9 版本协议文档),我们也可以看到死信相关的说明:
在这里插入图片描述

The server SHOULD track the number of times a message has been delivered to clients and when a message is redelivered a certain number of times ­ e.g. 5 times ­ without being acknowledged,the server SHOULD consider the message to be unprocessable (possibly causing client
applications to abort), and move the message to a dead letter queue.
The server SHOULD track the number of times a message has been delivered to clients and when a message is redelivered a certain number of times ­ e.g. 5 times ­ without being acknowledged,the server SHOULD consider the message to be unprocessable (possibly causing client
applications to abort), and move the message to a dead letter queue.
服务器应跟踪消息已传递给客户端的次数,当消息被重新传递一定次数(例如5次)而未得到确认时,服务器应认为该消息无法处理(可能导致客户端要中止的应用程序),并将消息移动到死信队列中。

因此,不难看出RabbitMQ 有死信这种设计,主要是遵从了AMQP规范,其目的是针对一些暂时无法处理的消息,避免其无限循环的同时,也能保证这些MQ消息不会因为暂时无法处理而丢失,能够帮助开发者更好地控制消息的处理流程,提高系统的可靠性和稳定性。

二、产生死信的场景

1. 消费失败

即消费者无法处理消息,或处理失败,最后返回给rabbitMQ服务器一个否定的Ack,并且要求不要重新入队,一般有以下两个方法

// 拒绝单个消息
void basicReject(long deliveryTag, boolean requeue);
// 拒绝一个或多个消息
void basicNack(long deliveryTag, boolean multiple, boolean requeue);

我们以一段 basicReject的代码为例:

// 新建消费者
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("Received message: " + message);

        // 模拟处理消息失败
        boolean messageProcessedSuccessfully = false;

        if (!messageProcessedSuccessfully) {
            System.out.println("Message processing failed, rejecting message...");
            // 注意第二个参数设置为为false
            channel.basicReject(envelope.getDeliveryTag(), false);
            System.out.println("Message rejected");
        }
    }
};
// 推模式,消费者监听队列
channel.basicConsume(QUEUE_NAME, true, consumer); 

需要注意的是,如果是Ack因为网络原因没有发送到RabbitMQ服务器,消息是不会因此就置为死信的。同样,如果Ack 的参数为 requeue = true ,消息也不会被置为死信,而是重新发送到队列尾部(尾部不一定是最后)

2. 超时

超时(Expiration) 一定是因为有个限时(Time-To-Live),而在rabbitMQ中,我们可以为队列和消息设置其TTL。需要注意的是:为队列设置TTL,并不代表队列本身的有效时长,而是指分发进入该队列的消息的有效时长,当消息进入该队列久于该时长,则消息超时.

消息TTL

消息的有效时长设置,是通过 AMQP.BasicProperties 进行设置的

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                     .deliveryMode(2) // 消息持久化
                     .expiration("60000") // 有效时长60秒
                     .build();
channel.basicPublish("", queueName, properties, message.getBytes("UTF-8"));

如上述代码,即消息如果在60秒内没有被消费,则会被自动从队列中移除

队列TTL

为队列设置时长,则需要在声明队列时加上参数 x-message-ttl

Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000);
channel.queueDeclare(queueName, false, false, false, args);

同样,此时队列中的消息如果在60秒内没有被消费,则会被自动从队列中移除,当队列TTL 和 消息TTL同时设置时,取其中的较小值,为消息有效期

3. 队列饱和

同设置有效时间一样,我们也可以给队列设置个消息上限(消息条数或数据量大小),使用到的参数分别为 x-max-lengthx-max-length-bytes

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length", 10); // 最大10条
args.put("x-max-length-bytes", 1000);  // 最大1000长度的 byte数组
channel.queueDeclare("myqueue", false, false, false, args);

如果超出该限制,rabbitMQ默认将从队列头部拿消息进行移除(一般是队列中最老的),当然这种溢出的处理策略也有其他选择,比如设置 x-overflow 参数为 drop-head (默认), reject-publishreject-publish-dlx

channel.queueDeclare(QUEUE_NAME, true, false, false, 
     new java.util.HashMap<String, Object>() {{
          put("x-overflow", "reject-publish");
     }});

三种策略的含义如下:

  • drop-head:丢弃队列头部
  • reject-publish:拒绝新消息入队尾,如果消息开启了"发布确认",则向消息发布者发送nack
  • reject-publish-dlx:与 reject-publish 一样,但如果指定死信交换机,会将该消息转发至死信交换机

三、死信的处理

1. DLX 死信交换机

针对死信的处理,可以选择丢弃和死信交换机(在配置了死信交换机的情况下),我们讨论的自然是后者,这里就画一幅死信交换机的流程图
在这里插入图片描述
我们不难发现,死信的现象总是在队列中发生的,因此我们可以给队列设置一个"死信交换机",当出现死信的时候,队列就可以把死信转发给死信交换机。其代码如下

channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
// 为队列 myqueue 设置一个死信交换机,该死信交换机的名字为some.exchange.name
channel.queueDeclare("myqueue", false, false, false, args);

必须注意的是,所谓的”死信交换机“的功能其实和普通交换机并无区别,它只是因为特殊用法,而被我们叫做”死信交换机“而已

当为一个队列设置了死信交换机时,我们可以在管理面板上,看到该队列的DLX标志。
在这里插入图片描述

2. 死信队列

绑定在死信交换机上的队列就叫死信队列,因为交换机本身并不存消息,所以死信最终是存储于死信队列中,当然,死信队列本身与普通队列,功能上也没有什么区别。

3. 一些细节逻辑

(1)死信的路由问题

死信从队列转发给死信交换机,也是带有路由键的,如果我们没有特别设置,那么路由键就是消息自身的路由键,如果我们做了如下设置,那么所有由该队列发给死信交换机的消息,路由键都会变成 messageDead.dl

    Map<String, Object> args = new HashMap<String, Object>();
    // 设置一个死信交换机
    args.put("x-dead-letter-exchange", dlxName);
    // 设置死信路由键,此处将所有死信的路由键设置为"messageDead.dl"
    args.put("x-dead-letter-routing-key", "messageDead.dl");
    channel.queueDeclare(QUEUE_NAME, false, false, false, args);

(2)死信的循环问题

一般的消息流程是由交换机分发给队列,然而死信却是从队列发送给交换机。因此不难想象,能否构建一个环状结构,让一个死信又经由死信交换机,最后又回到同一个队列呢?我们不卖关子,环可以构建,但消息不会反复,直接摘抄官方文档如下:

It is possible to form a cycle of message dead-lettering. For instance, this can happen when a queue dead-letters messages to the default exchange without specifying a dead-letter routing key. Messages in such cycles (i.e. messages that reach the same queue twice) will be dropped if there was no rejections in the entire cycle.
有可能形成消息死信的循环。例如,当将死信消息发送到默认交换机而不指定死信路由键时,可能会发生这种情况。如果整个周期中没有拒绝,则此类循环中的消息(即两次到达同一队列的消息)将被丢弃。

四、死信功能Demo

学习完上面的内容,我们实际来运行个demo测试下死信的功能是否如上所诉。

如下代码,我们给一个长度为20的队列设置了死信交换机,然后向该队列发送三十条路由键为”messageAlive“的MQ消息,而与死信交换机绑定的死信队列,则监听着"messageDead.#"的路由

public class AsyncPublisher {

    private final static String QUEUE_NAME = "message_queue";
    private static final int MESSAGE_COUNT = 30;
    private static ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        String message = "Hello, RabbitMQ!";

        // 声明个容量20条的,带死信交换机的队列
        channel.exchangeDeclare("myExchange", "topic");
        declareQueueWithDLX(channel);
        channel.queueBind(QUEUE_NAME,"myExchange","messageAlive");

        // 异步发布确认
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmCallback() {
            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("Sucess to publish message.");
                if (multiple) {
                    ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
                    confirmed.clear();
                } else {
                    outstandingConfirms.remove(deliveryTag);
                }
            }
        }, new ConfirmCallback() {
            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("Failed to publish message.");
            }
        });

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            long nextSeqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("myExchange", "messageAlive", null, message.getBytes());
            outstandingConfirms.put(nextSeqNo, message);
        }
        System.out.println("All messages published successfully.");

        channel.close();
        connection.close();
    }

    static void declareQueueWithDLX(Channel channel) throws IOException {
        String dlxName = "some.exchange.name";
        String dlqName = "some.exchange.queue";
        // 声明个交换机,作为死信交换机,类型为topic
        channel.exchangeDeclare(dlxName, "topic");
        // 声明个死信队列
        channel.queueDeclare(dlqName, false, false, false, null);
        // 将死信队列与死信交换机绑定,此处设定路由
        channel.queueBind(dlqName, dlxName, "messageDead.#");

        Map<String, Object> args = new HashMap<String, Object>();
        // 设置一个死信交换机
        args.put("x-dead-letter-exchange", dlxName);
        // 设置队列最大消息量为 20
        args.put("x-max-length", 20);
        //args.put("x-dead-letter-routing-key", "messageDead.dl");
        channel.queueDeclare(QUEUE_NAME, false, false, false, args);
    }
}

预测最终结果:正常队列20条消息是满的,而死信队列因为设置了路由,且关注的是"messageDead.#" , 当死信使用着原始的路由键messageAlive进入死信交换机时,无法被分发到任何队列,所以死信队列一条消息也不会被分发到,我们看看结果:

在这里插入图片描述
结果符合预期,如果我们把上文 declareQueueWithDLX 方法中的

 args.put("x-dead-letter-routing-key", "messageDead.dl");

取消注释,即让死信路由键生效,那么所有发送给死信交换机的消息,都会使用该路由键 “messageDead.dl” ,而不是原来消息的路由键。我们删除队列,再运行一次,预测死信队列将出现十条死信。
在这里插入图片描述
结果符合预期,且其中队列的 DLK 标志即 x-dead-letter-routing-key

五、死信的应用

我们把目光回到开头,介绍死信的部分,我们提及了死信机制的目的:是针对一些暂时无法处理的消息,避免其无限循环的同时,也能保证这些MQ消息不会因为暂时无法处理而丢失,能够帮助开发者更好地控制消息的处理流程,提高系统的可靠性和稳定性。

那么在实际中,我们会如何运用死信队列完成上述目的呢?

1. 消息堆积报警

我们可以使用定长队列,如果消费端的消费能力长期小于生产者的生产能力,将会导致大量消息堆积在MQ的队列中,此时使用定长队列,就能在消息溢出时,进入死信交换机->死信队列,只要我们为死信队列建立一个消费着,就可以及时获取到消息堆积情况,并发出报警了

2. 异常消息检查

当一些消息因为暂时无法处理,而被消费端拒收时,此时为队列设置死信转发,就能避免让该消息重复入队并被循环获取,同时还让该异常消息备份进死信队列中,而不至于丢失。这些消息后续可以取出进行重新处理,或分析其异常原因

3. 延迟消费

可以为队列设置TTL,配合死信机制达到延时队列的效果,对于某些需要延迟处理的消息,可以将其发送到TTL队列中,等待一定的时间后,再将其投递到死信队列中,然后被死信队列消费者消费掉。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/693258.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用影刀RPA合并excel数据

合并不同sheet&#xff1a; 先获取不同sheet中的表头&#xff0c;合并所有表头并去重存作列表&#xff0c;新建excel在第一行插入该表头数据,作合并数据存放使用循环方法&#xff0c;依次获取每个sheet的数据&#xff0c;用if判断表头是否在1的列表中&#xff0c;在的话则将整…

618技术揭秘:探究竞速榜页面核心前端技术 | 京东云技术团队

前言 H5页面作为移动端Web应用的重要形式之一&#xff0c;已经成为了现代Web开发的热门话题。在H5页面的开发过程中&#xff0c;前端技术的应用至关重要。本文将探究京东竞速榜H5页面的核心前端技术&#xff0c;包括动画、样式配置化、皮肤切换、海报技术、调试技巧等方面&…

easyX库图像处理相关函数

0.图像处理相关函数与类型概览 您好&#xff0c;这里是limou3434&#xff0c;本次我将给您带来的是easyX的图像处理相关接口。 如果您感兴趣也可以看看我的其他内容。 函数或数据类型描述IMAGE保存图像的对象。loadimage读取图片文件。putimage在当前绘图设备上绘制指定图像…

10--Gradle进阶 - Gradle任务的执行

10--Gradle进阶 - Gradle任务的执行 前言 在前面的篇章中&#xff0c;我们尝试执行了 gradle 的 task&#xff0c;但是不清楚有哪些语法&#xff0c;下面来介绍一下。 任务执行 任务执行语法&#xff1a;gradle [taskName...] [--option-name...]。 分类解释常见的任务&#xf…

沉浸式三维虚拟展厅交互体验科技感十足

随着科技的不断发展进步&#xff0c;展厅的表现形式也变得多样化&#xff0c;紧跟时代发展步伐&#xff0c;迭代创新。 3D虚拟展厅具有四大优势 一、降低成本&#xff0c;提高效率 3D“VR线上展厅”将艺术优势资源转到线上搭建的艺术线上展平台&#xff0c;相对传统艺术展来说有…

ModaHub魔搭社区:详解向量数据库Milvus的Mishards:集群分片中间件(二)

目录 元数据 服务发现 元数据 元数据记录了底层数据的组织结构信息。在分布式系统中&#xff0c;Milvus 写节点是元数据唯一的生产者&#xff0c;而 Mishards 节点、Milvus 写节点和读节点都是元数据的消费者。目前版本的 Milvus 只支持 MySQL 和 SQLite 作为元数据的存储后…

SpringBoot整合网易邮箱

SpringBoot整合邮箱 1&#xff0c;开启POP3/SMTP/IMAP服务 注意&#xff1a;每个邮箱的密码唯一&#xff0c;不要随意分享给他人 最后就是这个样子了 2&#xff0c;整合测试 2.1&#xff0c;pom.xml <?xml version"1.0" encoding"UTF-8"?> <…

Just KNIME it [S2C13] 机器学习的可解释性

朋友们&#xff0c;Just KNIME it 还有在跟进吗? 本季已经到 13 期啦。 本期探讨的主题是机器学习的可解释性问题&#xff0c;快随指北君一起看看吧。 挑战 挑战13&#xff1a;揭示犯罪率之迷 难度&#xff1a;中等 情境描述&#xff1a;作为一名在房地产公司任职的数据科学家…

Apikit 自学日记:发起文档测试-HTTP

HTTP 功能入口&#xff1a;API管理应用 / 选中某个项目 / API文档菜单 / 选中某一API文档 / 点击“测试”TAB API文档测试页&#xff0c;可对该API文档描述的接口进行快速测试。API文档测试页分为地址控制栏、请求控制区、返回展示区&#xff0c;以及测试辅助工具区共四个部分…

Android进阶之路 - 深入浅出字体、字体库

当时组内临时接到一个换字体库的需求&#xff0c;这个需求相对简单&#xff0c;因为手头有其他事情&#xff0c;同时之前也没换过字体库&#xff0c;就交给了同事去做了&#xff1b;现在有时间就好好充实下自己 ( 我写的也未必全对&#xff0c;如有不足可直接提出&#xff0c;相…

vue 组件基本使用方法

前言:vue 可以比较灵活的使用 html的片段&#xff0c;并将html的片段进行数据隔离&#xff0c;参数也可以互相传递&#xff0c;组件与组件之间也可以进行数据的交互 合理的使用组件可以避免重复代码或者很方便的调用第三方组件库 vue组件 简单实例组件传参实际应用父子组件交互…

右键文件夹添加指定打开的程序(如:IDEA、PyCharm等)

一.打开注册表 使用winR打开运行界面(默认为左下角)输入regedit 二.进入指定目录 1.找不到可直接复制路径: 计算机\HKEY_LOCAL_MACHINE\SOFTWARE\Classes\Directory\shell 2.在当前路径shell下右键->新建->项->命名IDEA &#xff08;名字任意即可&#xff09;,修改默认…

利用影刀RPA批量导出excel中的图片并重命名

本程序要求excel中的图片应该符合以下截图中的格式&#xff1a; 图片和名称应该处于同一行&#xff0c;而且图片应该是嵌入在单元格中 程序参考&#xff1a;

linux系统Nginx服务Rewrite重写

文章目录 一、Rewrite跳转场景二、Rewrite跳转实现三、Rewrite实际场景1.Nginx跳转需求的实现方式2.rewrite放在 server{}、if{}、location{}段中3.对域名或参数字符串 四、Nginx正则表达式1.常用的正则表达式元字符2.正则表达式的优点 五、Rewrite命令1.Rewrite命令语法2.flag…

数字IC前端学习笔记:仲裁轮询(六)

相关文章 数字IC前端学习笔记&#xff1a;LSFR&#xff08;线性反馈移位寄存器&#xff09; 数字IC前端学习笔记&#xff1a;跨时钟域信号同步 数字IC前端学习笔记&#xff1a;信号同步和边沿检测 数字IC前端学习笔记&#xff1a;锁存器Latch的综合 数字IC前端学习笔记&am…

数据库学习3

主键使用 主键的使用 CREATE TABLE t17 (id INT PRIMARY KEY ,name VARCHAR(32), email VARCHAR(32)); 主键列的值不可以重复 INSERT INTO t17 VALUES(1,jack,jacksohu.com); INSERT INTO t17 VALUES(2,tom,tomsohu.com); INSERT INTO t17 VALUES(1,hsp,hspsohu.com); SELECT …

《计算机系统与网络安全》 第九章 访问控制技术

&#x1f337;&#x1f341; 博主 libin9iOak带您 Go to New World.✨&#x1f341; &#x1f984; 个人主页——libin9iOak的博客&#x1f390; &#x1f433; 《面试题大全》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33…

2022(二等奖)C859基于WebGIS的南京市排污口管理系统

作品介绍 一、需求分析 &#xff08;一&#xff09;社会需求 《国务院办公厅关于加强入河入海排污口监督管理工作的实施意见》明确提出&#xff0c;入河入海排污口(以下简称排污口&#xff09;是指直接或通过管道、沟、渠等排污通道向环境水体排放污水的口门&#xff0c;是流…

Day40

思维导图 练习 定义一个命名空间Myspace&#xff0c;包含以下函数&#xff1a;将一个字符串中的所有单词进行反转&#xff0c;并输出反转后的结果。例如&#xff0c;输入字符串为"Hello World"&#xff0c;输出结果为"olleH dlroW"&#xff0c;并在主函数…

Visual C++中的虚函数和纯虚函数的定义

我是荔园微风&#xff0c;作为一名在IT界整整25年的老兵&#xff0c;今天来说说Visual C中的虚函数和纯虚函数。 直接说虚函数和纯虚函数有很多人会直接晕&#xff0c;但是来看这篇帖子的很多人是有JAVA或其他面象对象编程基础的&#xff0c;我要不就先作个类比&#xff0c;究…