【RabbitMQ】死信队列、延迟队列

news2024/9/20 22:59:39

死信队列

死信,简单理解就是因为种种原因,无法被消费的消息。

有死信,自然就有死信队列。当一个消息在一个队列中变成死信消息之后,就会被重新发送到另一个交换器中,这个交换器就是DLX(Dead Letter Exchange),绑定该交换器的队列,就被称为死信队列DLQ(Dead Letter Queue)。

消息变成死信消息一般是由于以下几条:

  • 队列达到最大长度
  • 消息过期
  • 消息被拒绝,即消息确认机中手动确认的两种拒绝情况,并且不允许重新入队

队列达到最大长度

spring:
  rabbitmq:
    host: 43.138.108.125
    port: 5672
    username: admin
    password: admin
    virtual-host: mq-springboot-test
@Configuration
public class DeadConfig {

    // 正常队列,当正常队列中的消息出现一些不确定情况时,消息就会进入死信交换机中

    @Bean("normalQueue")
    public Queue normalQueue() {
        return QueueBuilder.durable(Constants.NORMAL_QUEUE)
                .deadLetterExchange(Constants.DEAD_EXCHANGE) // 设置死信交换机
                .deadLetterRoutingKey("dead") // 设置死信队列的路由键为dead
                .maxLength(3) // 设置队列的最大长度为3
                .build();
    }

    @Bean("normalExchange")
    public Exchange normalExchange() {
        return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).durable(true).build();
    }

    @Bean("normalQueueBind")
    public Binding normalQueueBind(@Qualifier("normalQueue") Queue queue,
                                   @Qualifier("normalExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
    }

    // 死信队列

    @Bean("deadQueue")
    public Queue deadQueue() {
        return QueueBuilder.durable(Constants.DEAD_QUEUE).build();
    }

    @Bean("deadExchange")
    public Exchange deadExchange() {
        return ExchangeBuilder.directExchange(Constants.DEAD_EXCHANGE).durable(true).build();
    }

    @Bean("deadQueueBind")
    public Binding deadQueueBind(@Qualifier("deadQueue") Queue queue,
                                 @Qualifier("deadExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("dead").noargs();
    }

}
@RestController
@RequestMapping("/dead")
public class DeadController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping
    public void deadQueue() {
        this.rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "hello 死信");
        System.out.println("正常队列发送消息成功");
    }

}
@Configuration
public class DeadListener {

    @RabbitListener(queues = Constants.DEAD_QUEUE)
    public void deadListener(String msg) {
        System.out.println("死信队列接收到消息:" + msg);
    }

}

在上述代码中,主要内容是声明了正常队列、交换机和绑定关系以及声明死信队列、死信交换机以及其绑定关系、正常队列的生产者代码、死亡队列的消费者代码。

队列达到最大长度和死信消息要转发到的DLX和路由键都是由正常队列在声明时进行绑定的。

启动上述程序之后,当正常队列存在三条消息之时,假设再来消息,那么消息就要进入死信交换机,从而路由到死信队列了。如下图可以看出,当发送第四条消息之后,死信队列的消费者就消费了一条消息:

在上述图片中,D表示队列是持久化的,Lim表示队列有最大长度,DLX表示队列存在死信交换机、DLK表示队列存在路由键。把鼠标放在这些字母上方,详细的消息都会表示。

在下述代码中,主要是对上述代码改进之后地方的指出,并没有把所有的代码全部给出。

消息过期

消息过期分为两种,一种是设置队列过期时间让消息过期,另一种是设置消息过期时间让消息过期,都可以进行测试。

设置队列过期时间

    @Bean("normalQueue")
    public Queue normalQueue() {
        return QueueBuilder.durable(Constants.NORMAL_QUEUE)
                .deadLetterExchange(Constants.DEAD_EXCHANGE) // 设置死信交换机
                .deadLetterRoutingKey("dead") // 设置死信队列的路由键为dead
//                .maxLength(3) // 设置队列的最大长度为3
                .ttl(5 * 1000) // 设置队列的过期时间为5秒
                .build();
    }

 由上图以及结合代码可以看出,将消息由正常生产者发送给Broker之后,大概5秒钟之后,消息过期。此时消息就会发送给死信交换机,从而交给其对应的消费者消费。

设置消息的过期时间

@Slf4j
@RestController
@RequestMapping("/dead")
public class DeadController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping
    public void deadQueue() {
        // 设置消息的过期时间
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");
                return message;
            }
        };
        this.rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "hello 死信", messagePostProcessor);
        log.info("死信队列发送成功");
    }

}

同样,结合上图和代码来说,19秒的时候消息发送功,24秒的时候死信消费者消费消息成功。

消息被拒绝

spring:
  rabbitmq:
    host: 43.138.108.125
    port: 5672
    username: admin
    password: admin
    virtual-host: mq-springboot-test
    listener:
      simple:
        acknowledge-mode: manual # 消息确认机制,手动确认
@Slf4j
@Configuration
public class DeadListener {

    // 正常队列接收消息
    @RabbitListener(queues = Constants.NORMAL_QUEUE)
    public void normalListener(Channel channel, String msg, Message message) throws IOException {
        try {
            log.info("正常队列监听器接收消息:{}", msg);
            int num = 3 / 0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        } catch (Exception e) {
            log.error("正常队列监听器接收消息异常:{}", e.getMessage());
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    // 死信队列接收消息
    @RabbitListener(queues = Constants.DEAD_QUEUE)
    public void deadListener(String msg, Channel channel, Message message) throws IOException {
        try {
            log.info("死信队列监听器接收消息:{}", msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        } catch (Exception e) {
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }

}

由上图以及代码可以看到,当消息的确认机制是手动确认时,当出现异常并且拒绝消息重新入队以后,消息就会来到死信队列中。

使用场景

用户支付订单之后,支付系统会给订单系统返回当前订单的支付状态。为了保证支付信息不丢失,需要使用到死信队列机制。当消息消费异常时,将消息投入到死信队列,由订单系统的其他消费者来监听这个队列,并对数据进行处理(比如发送工单等,进行人工确认)。

消息重试:将死信消息发送到原队列或另一个队列进行重试处理。

消息丢弃:直接丢弃这些无法处理的消息,避免占用系统资源。

日志收集:将死信消息做为日志收集起来,用户后续分析和问题定位。

延迟队列

概念

延迟队列就是消息发送之后,并不想让消费者立即拿到消息,而是在等待特定时间之后,消费者才能拿到消息进行消费

应用场景

  1. 用户发起退款后,24小时内商家未处理,默认退款
  2. 用户注册成功后,三天后发送短信,提高用户活跃度
  3. 预定会议后,在会议开始前15分钟提醒众人参加会议
  4. 用户通过手机远程遥控家里的智能设备在指定时间进行工作,这就可以使用延迟队列。用户发送消息到延迟队列,当指定时间到了再将指令推送到智能设备。

实现方法

  1. RabbitMQ本身并没有实现延迟队列,因此可以使用TTL + 死信队列的方式来实现延迟队列。
  2. 安装延迟队列插件来实现延迟队列。

TTL + 死信队列

@Configuration
public class MockDelayConfig {

    @Bean("mockDelayNormalQueue")
    public Queue mockDelayNormalQueue() {
        return QueueBuilder.durable(Constants.MOCk_DELAY_NORMAL_QUEUE)
                .ttl(5000 * 10) // 设置消息过期时间为50秒
                .deadLetterExchange(Constants.MOCK_DELAY_DEAD_EXCHANGE) // 设置死信交换机
                .deadLetterRoutingKey("mock.delay.dead") // 设置死信路由键
                .build();
    }

    @Bean("mockDelayNormalExchange")
    public Exchange mockDelayNormalExchange() {
        return ExchangeBuilder.directExchange(Constants.MOCk_DELAY_NORMAL_EXCHANGE).durable(true).build();
    }

    @Bean("mockDelayNormalQueueBind")
    public Binding mockDelayNormalQueueBind(@Qualifier("mockDelayNormalQueue") Queue queue,
                                           @Qualifier("mockDelayNormalExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("mock.delay.normal").noargs();
    }

    @Bean("mockDelayDeadQueue")
    public Queue mockDelayDeadQueue() {
        return QueueBuilder.durable(Constants.MOCK_DELAY_DEAD_QUEUE).build();
    }

    @Bean("mockDelayDeadExchange")
    public Exchange mockDelayDeadExchange() {
        return ExchangeBuilder.directExchange(Constants.MOCK_DELAY_DEAD_EXCHANGE).durable(true).build();
    }

    @Bean("mockDelayDeadQueueBind")
    public Binding mockDelayDeadQueueBind(@Qualifier("mockDelayDeadQueue") Queue queue,
                                         @Qualifier("mockDelayDeadExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("mock.delay.dead").noargs();
    }

}
@Slf4j
@RestController
@RequestMapping("/mockDelay")
public class MockDelayController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping
    public void mockDelayQueue() {
        this.rabbitTemplate.convertAndSend(Constants.MOCk_DELAY_NORMAL_EXCHANGE,
                "mock.delay.normal", "hello 延迟队列");
        log.info("延迟队列生产者发送成功");
    }

}
@Slf4j
@Configuration
public class MockDelayListener {

    @RabbitListener(queues = Constants.MOCK_DELAY_DEAD_QUEUE)
    public void mockDelayListener(String msg) {
        log.info("模拟延迟队列消费者接收到消息:" + msg);
    }

}

在上述代码中,实现的功能是生产者发送消息后,消费者在50秒之后获得消息,对消息进行消费:

在TTL一文中,已经说明了RabbitMQ只会检查队首消息是否过期,不会扫描整个队列。因此如果想要放在模拟延迟队列中的消息过期时间不一致,那就会出现死信消息无法被及时处理的情况。因此,我们想要模拟实现延迟队列,就要确保队列中所有消息的过期时间是一致的。如果存在时间不一致的情况,我们就可以使用不同的模拟延迟队列来实现。

延迟队列插件

下载插件:官方网站进行下载(注意版本对应关系)

启动插件

rabbitma-plusins list // 查看插件列表

rabbitmq-plugins enable rabbitmq_delayed_message_exchange // 启动插件

service rabbitmq-server restart # 重启服务

如下图,当交换机中有了x-delayed-message就表示延迟插件安装成功 

代码测试

@Configuration
public class DelayConfig {

    @Bean("delayQueue")
    public Queue delayQueue() {
        return QueueBuilder.durable(Constants.DELAY_QUEUE).build();
    }

    @Bean("delayExchange")
    public Exchange delayExchange() {
        return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE)
                .delayed() // 延迟交换机
                .durable(true) // 持久化
                .build();
    }

    @Bean("delayQueueBind")
    public Binding delayQueueBind(@Qualifier("delayQueue") Queue delayQueue,
                                   @Qualifier("delayExchange") Exchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay").noargs();
    }

}
@Slf4j
@RestController
@RequestMapping("/delay")
public class DelayController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping
    public void delayQueue() {
        for(int i = 0; i < 5; i++) {
            // 随机生成延迟时间
            Random random = new Random();
            int time = random.nextInt(20);
            // 消息处理器,设置延迟时间
            MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setDelayLong((long) (time * 1000)); // 设置延迟时间
                    return message;
                }
            };
            // 发送消息到延迟队列
            this.rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "hello 延迟队列 " + i, messagePostProcessor);
            log.info("发送延迟队列第" + i + "消息成功,延迟时间为:" + time);
        }
    }

}
@Slf4j
@Configuration
public class DelayListener {

    @RabbitListener(queues = Constants.DELAY_QUEUE)
    public void delayListener(String msg) {
        log.info("延迟队列监听器,接收到的消息:{}", msg);
    }

}

本质上,延迟插件就是让消息停留在交换机中,等到延迟时间结束之后,再发送到对应的队列中去。 

两者对比

使用TTL + 死信队列的好处是不需要额外安装插件。缺点是受消息的延迟时间影响,同一个队列中的消息必须延迟时间相同。

使用延迟队列插件的好处是不受延迟时间影响,同一队列中的所有消息延迟时间可以不同,额外的插件使得延迟队列的实现比较容易。缺点是需要依赖特定的插件,并且插件的版本必须和对应的RabbitMQ相对应。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2150359.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

sicp每日一题[1.1-1.29]

补一下之前的题目 Exercise 1.1 Below is a sequence of expressions. What is the result printed by the interpreter in response to each expression? Assume that the sequence is to be evaluated in the order in which it is presented. Exercise 1.2 Translate the …

【C++ Primer Plus习题】16.10

大家好,这里是国中之林! ❥前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。有兴趣的可以点点进去看看← 问题: 解答: #include <iostream> #include <string> #include <…

Vue2学习笔记(02条件渲染 、监视数据的原理)

1、v-if和v-show的区别 2、Vue监视数据的原理

8.安卓逆向-安卓开发基础-安卓四大组件1

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a;图灵Python学院 本人写的内容纯属胡编乱造&#xff0c;全都是合成造假&#xff0c;仅仅只是为了娱乐&#xff0c;请不要盲目相信。 工…

Python酷库之旅-第三方库Pandas(122)

目录 一、用法精讲 541、pandas.DataFrame.take方法 541-1、语法 541-2、参数 541-3、功能 541-4、返回值 541-5、说明 541-6、用法 541-6-1、数据准备 541-6-2、代码示例 541-6-3、结果输出 542、pandas.DataFrame.truncate方法 542-1、语法 542-2、参数 542-3…

Linux进阶命令-scp

作者介绍&#xff1a;简历上没有一个精通的运维工程师。希望大家多多关注作者&#xff0c;下面的思维导图也是预计更新的内容和当前进度(不定时更新)。 经过上一章Linux日志的讲解&#xff0c;我们对Linux系统自带的日志服务已经有了一些了解。我们接下来将讲解一些进阶命令&am…

有没有自带财务管理功能的海外仓系统?

在全球化的商业环境中&#xff0c;海外仓作为连接国际市场的物流枢纽&#xff0c;其重要性日益凸显。然而&#xff0c;随着业务范围的扩展和费用类型的多样化&#xff0c;海外仓在财务管理上面临着诸多挑战。传统的手工计费和对账方式不仅耗时费力&#xff0c;而且容易出错&…

常用的k8s容器网络模式有哪些?

常用的k8s容器网络模式包括Bridge模式、Host模式、Overlay模式、Flannel模式、CNI&#xff08;ContainerNetworkInterface&#xff09;模式。K8s的容器网络模式多种多样&#xff0c;每种模式都有其特点和适用场景。Bridge模式适用于简单的容器通信场景&#xff1b;Host模式适用…

将阮一峰老师的《ES6入门教程》的源码拷贝本地运行和发布

你好同学&#xff0c;我是沐爸&#xff0c;欢迎点赞、收藏、评论和关注。 阮一峰老师的《ES6入门教程》应该是很多同学学习 ES6 知识的重要参考吧&#xff0c;应该也有很多同学在看该文档的时候&#xff0c;想知道这个教程的前端源码是怎么实现的&#xff0c;也可能有同学下载…

掌握Python-uinput:打造你的输入设备控制大师

文章目录 掌握Python-uinput&#xff1a;打造你的输入设备控制大师背景&#xff1a;为何Python-uinput不可或缺&#xff1f;Python-uinput是什么&#xff1f;如何安装Python-uinput&#xff1f;简单库函数使用方法创建虚拟设备模拟按键模拟鼠标移动模拟滚轮滚动关闭设备 场景应…

IP Source Guard技术原理与应用

目录 IP Source Guard概述 IP Source Guard源数据表项 IP Source Guard源数据-静态添加 IP Source Guard查看 IP Source Guard使用注意事项 IP Source Guard概述 局域网IP冲突给网络的运维带来很大困扰存在以下风险&#xff1a; 使用手工配置IP地址的方式上网&#xff0c…

Redis——C++库redisplusplus在Linux环境下的安装

目录 第一步&#xff0c;安装hiredis第二步&#xff0c;下载redis源码第三步&#xff0c;编译/安装 redis-plus-plus使用redis-plus-plus(以Centos为例)Ubuntu的Makefile 第一步&#xff0c;安装hiredis redis-plus-plus 是基于 hiredis 实现的&#xff0c;而hiredis 是⼀个 C…

【图像检索】基于傅里叶描述子的形状特征图像检索,matlab实现

博主简介&#xff1a;matlab图像代码项目合作&#xff08;扣扣&#xff1a;3249726188&#xff09; ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 本次案例是基于傅里叶描述子的形状特征图像检索&#xff0c;用matlab实现。 一、案例背景和算法…

企业数字化底座与数字化转型方案(可编辑的81页PPT)

方案介绍&#xff1a;在当今数字化转型的浪潮中&#xff0c;企业数字化底座与数字化转型方案是企业应对市场变化、提升竞争力的关键举措。通过构建数字化底座&#xff0c;实现数据的集中管理和共享&#xff1b;通过数字化转型方案的实施&#xff0c;推动企业的全面数字化改造。…

进阶版水仙花数水是指一个n位数,各个位数字的n次方之和等于该数字本身

两种方法&#xff1a; 第一种&#xff0c;是输入一个数值&#xff0c;判断是否为水仙花数 //打印水仙花数 //水仙花数是指一个n位数&#xff0c;各个位数字的n次方之和等于该数字本身 //如&#xff1a;1531^35^33^3 // //分析&#xff1a; //153/1015 //15/101 //1/100 #incl…

.whl文件下载及pip安装

以安装torch_sparse库为例 一、找到自己需要的版本&#xff0c;点击下载。 去GitHub的pyg-team主页中找到pytorch-geometric包。网址如下&#xff1a; pyg-team/pytorch_geometric​github.com/pyg-team/pytorch_geometric 然后点击如图中Additional Libraries位置的here&am…

Qt 多线程TCP客户端使用QTimer进行重连服务器———附带详细代码和讲解

文章目录 0 背景1 原理1.1 QThread的线程归属1.2 Qtimer使用1.3 TCP客户端使用 2 问题解决2.1 解决思路2.2 解决方法 3 完整的代码示例3.1 tcp_client类3.2 主界面类 附录参考 0 背景 在子线程中&#xff0c;使用Qtimer来进行定时重连TCP服务器&#xff0c;总是会出现跨线程创…

U盘显示未被格式化:深度解析、恢复策略与预防之道

现象透视&#xff1a;U显示未被格式化的迷局 在日常的数字生活中&#xff0c;U盘作为我们随身携带的数据仓库&#xff0c;承载着无数重要的文件与回忆。然而&#xff0c;当U盘突然弹出“未被格式化”的警告时&#xff0c;这份便捷瞬间转化为焦虑与不安。这一提示不仅意味着U盘…

uboot:源码分析-启动第一阶段-start.S解析

start.S引入 进入start.S文件中&#xff0c;发现57行中就是_start标号的定义处 SourceInsight中添加行号 在SI中&#xff0c;如果我们知道我们要找的文件的名字&#xff0c;但是我们又不知道他在哪个目录下&#xff0c;我们要怎样找到并打开这个文件&#xff1f;方法是在SI中先…

多重指针变量(n重指针变量)实例分析

0 前言 指针之于C语言&#xff0c;就像子弹于枪械。没了子弹的枪械虽然可以用来肉搏&#xff0c;却失去了迅速解决、优雅解决战斗的能力。但上了膛的枪械也非常危险&#xff0c;时刻要注意是否上了保险&#xff0c;使用C语言的指针也是如此&#xff0c;要万分小心&#xff0c;…