RabbitMQ 死信队列应用

news2024/11/23 15:27:04

1. 概念

死信队列(Dead Letter Queue)是在消息队列系统中的一种特殊队列,用于存储无法被消费的消息。消息可能会因为多种原因变成“死信”,例如消息过期、消息被拒绝、消息队列长度超过限制等。当消息变成“死信”时,它们会被路由到死信队列中,以便进行进一步处理或分析。 死信队列能够帮助系统进行消息跟踪、监控和处理异常情况,是消息队列系统中的重要组成部分。

2. 应用场景

死信队列在消息队列系统中有多种应用场景,包括但不限于以下几个方面:

  • 延迟消息处理:实现延迟消息投递,例如实现消息的定时投递、消息重试机制等。

  • 任务调度:用于实现任务调度系统,例如延迟执行任务、失败重试任务等。

  • 异常处理:处理消息消费失败或超时的情况,对异常消息进行统一处理。

  • 业务流程控制:实现业务流程中的状态控制和超时处理,例如订单超时取消、支付超时处理等。

  • 监控和统计:对异常消息进行统计和分析,用于系统性能监控和问题排查。

这些应用场景展示了死信队列的灵活性和实用性,在实际系统开发中具有广泛的应用价值。

3. 造成消息进入死信队列的原因

消息成为死信的原因有以下几种:

  • 消息被拒绝(basic.reject或basic.nack),并且requeue标志被设置为false。若参数requeue为true,则表示还可以将此跳消息重新塞回普通队列,若为false则消息被拒绝后直接进入死信队列。

  • 消息过期。在生产者设置生产时设置,若消费者未在过期时间内消费消息,则消息被转发到死信队列中。("x-message-ttl")

  • 队列达到最大长度。当普通队列中消息堆积数量长度达到了maxLength,则会将新接收的消息转发到死信队列中去,从而避免消息丢失。

4. 死信队列工作流程图

5. 代码示例

5.1 引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.7.15</version>
</dependency>

5.2 RabbitMQ配置

@Configuration
public class RabbitConfig {

    /**
     * 死信队列消息模型构建----------------------------------------------------------------------------------
     **/
    // 创建普通队列
    @Bean
    public Queue basicQueue() {
        Map<String, Object> params = new HashMap<>(8);
        // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
        params.put("x-dead-letter-exchange", Exchange.DEMO_DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
        params.put("x-dead-letter-routing-key", RoutingKey.DEMO_DEAD_ROUTING_KEY);
        // 注意这里是毫秒单位,这里我们给10秒
        params.put("x-message-ttl", 10*1000);
        return new Queue(MyQueue.DEMO_CONSUMER_QUEUE, true, false, false, params);
    }

    //创建“基本消息模型”的基本交换机,面向生产者
    @Bean
    public TopicExchange basicExchange() {
        //创建并返回基本交换机实例
        return new TopicExchange(Exchange.DEMO_BASIC_NORMAL_EXCHANGE, true, false);
    }

    //创建“基本消息模型”的基本绑定(基本交换机+基本路由),面向生产者
    @Bean
    public Binding basicBinding() {
        //创建并返回基本消息模型中的基本绑定(注意这里是正常交换机跟死信队列绑定在一定,不叫死信路由)
        return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(RoutingKey.DEMO_ROUTING_KEY);
    }

    // 创建死信交换机
    @Bean
    public TopicExchange deadLetterExchange() {
        //创建并返回死信交换机实例
        return new TopicExchange(Exchange.DEMO_DEAD_LETTER_EXCHANGE, true, false);
    }

    // 创建第二个中转站
    // 创建死信队列
    @Bean
    public Queue deadLetterQueue() {
        return new Queue(MyQueue.DEMO_DEAD_LETTER_QUEUE, true);
    }

    // 创建死信路由及其绑定
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(RoutingKey.DEMO_DEAD_ROUTING_KEY);
    }


    public static class Exchange {
        public static final String DEMO_BASIC_NORMAL_EXCHANGE = "demo.basic.exchange";

        public static final String DEMO_DEAD_LETTER_EXCHANGE = "demo.dead.letter.exchange";
    }

    public static class RoutingKey {
        //交换机与报表队列绑定的RoutingKey
        public static final String DEMO_ROUTING_KEY = "demo.basic.routing.key";

        public static final String DEMO_DEAD_ROUTING_KEY = "demo.dead.routing.key";
    }

    /**
     * 队列名称
     * @author peng.zhang
     * @date 2024/01/30
     */
    public static class MyQueue {
        //报表队列名称
        public static final String DEMO_CONSUMER_QUEUE = "demo.basic.queue";

        //死信队列名称
        public static final String DEMO_DEAD_LETTER_QUEUE = "demo.dead.letter.queue";
    }
}

5.3 消息生产者

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息到死信队列
     */
    @PostMapping("/testDeadQueue")
    public String testDeadQueue() {
        // 设置生产者到交换机的确认回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("correlationData:{}, ack:{}, cause:{}", JSON.toJSONString(correlationData), ack, cause);
        });
        // 设置消息未被队列接收时的返回回调
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, ex, routing) -> {
            log.info("message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}", JSON.toJSONString(message),
                    replyCode, replyText, ex, routing);
        });
        // 生成关联数据并发送消息到交换机
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        // 消息内容
        String messageBody = StrUtil.format("this message send at {}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"));
        rabbitTemplate.convertAndSend(RabbitConfig.Exchange.DEMO_BASIC_NORMAL_EXCHANGE, RabbitConfig.RoutingKey.DEMO_ROUTING_KEY, messageBody, correlationData);
        log.info(">>>>>{}, 发送消息:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), messageBody);
        return "OK";
    }

}

5.4 消息消费者

@Component
@Slf4j
public class DeadLetterConsumer {
    /**
     * 监听 DEMO_CONSUMER_QUEUE 并处理传入的消息。
     * 为测试目的抛出 IOException 以模拟异常。
     *
     * @param messageBody 消息负载
     * @param headers     消息头
     * @param channel     用于消息确认的通道
     * @throws IOException 如果抛出异常
     */
    @RabbitListener(queues = RabbitConfig.MyQueue.DEMO_CONSUMER_QUEUE)
    @RabbitHandler
    public void testBasicQueueAndThrowsException(@Payload String messageBody, @Headers Map<String, Object> headers, Channel channel) throws IOException {
        /**
         * Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,
         * 以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。
         * RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。
         */
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);

        log.info(">>>>>{} 普通队列消费, tag = {}, 消息内容:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), tag, messageBody);
        /**
         *  multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认
         *  如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认
         */
        // ACK,确认一条消息已经被消费
//        channel.basicAck(deliveryTag, false);

        // 对应的业务操作。。。。。
        // doBusiness();

        // 模拟消息拒绝
        channel.basicNack(tag, false, false);
    }

    /**
     * 处理业务逻辑
     */
    private void doBusiness() {
        System.out.println("here do some business code");
    }


    /**
     * 监听死信队列并处理消息。
     *
     * @param data    消息内容
     * @param tag     消息标签
     * @param channel 通道
     */
    @RabbitListener(queues = RabbitConfig.MyQueue.DEMO_DEAD_LETTER_QUEUE)
    @RabbitHandler
    public void fromDeadLetter(@Payload String data, @Header(AmqpHeaders.DELIVERY_TAG) long tag, Channel channel) {
        log.info(">>>>>{} 死信队列消费, tag = {}, 消息内容:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), tag, data);
        // 对应的业务操作。。。。。
    }
}

5.5 YML配置

spring:
  rabbitmq:
    username: rabbitmq
    password: rabbitmq
    port: 5672
    host: 127.0.0.1
    #publisher-confirm-type参数有三个可选值:
    #SIMPLE:会触发回调方法,相当于单个确认(发一条确认一条)。
    #CORRELATED:消息从生产者发送到交换机后触发回调方法。
    #NONE(默认):关闭发布确认模式。
    publisher-confirm-type: correlated
    template:
      receive-timeout: 1800000
      reply-timeout: 1800000
      retry:
        enabled: false
    listener:
      direct:
        retry:
          enabled: true
        default-requeue-rejected: false
      simple:
        retry:
          # 是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
          enabled: true
          # 最大重试次数
          max-attempts: 1
          # 重试间隔时间(单位毫秒)
          initial-interval: 10000
          # 重试最大时间间隔(单位毫秒)
          max-interval: 300000
          # 应用于前一重试间隔的乘法器
          multiplier: 5
        default-requeue-rejected: false

5.6 控制台输出

从控制台可以看出,消息被拒绝后,大概10秒后死信队列消息被消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1425005.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

对付勒索病毒,复杂的往往无法落地

一道道复杂门墙防护安全&#xff0c; 还是一个精密的锁更安全&#xff1f; &#x1f447;&#x1f447;&#x1f447; 在网络数据安全问题频发的当下&#xff0c;除了常规的备份、灾备措施以外&#xff0c;企业是否有做好应对最坏情况的准备&#xff1f;一旦病毒绕过了一道道…

极限的运算法则【高数笔记】

【定理】 1. 无穷小量 * 有界 无穷小量 简单理解为&#xff1a;0 乘以任何数都等于 0 &#xff0c;因为常数 0 是无穷小量 2. 设 lim f&#xff08;x&#xff09; a , lim g (x) b 加减&#xff1a;lim[f(x) g(x) ] lim f(x) g(x) a b 乘&#xff1a;lim[f(x)…

[Visual Studio] vs 2022中如何创建空白的解决方案

在Visual Studio 2022中创建一个空白的解决方案非常简单。请按照以下步骤操作&#xff1a; 打开Visual Studio。 在启动页面上&#xff0c;选择“创建新的项目”。 在“创建新项目”的对话框中&#xff0c;搜索“空白”。 在中间搜索结果中&#xff0c;选择“空白解决方案”…

同时添加多个的远程桌面工具,Windows远程桌面设置多用户同时登录

Windows Server 版本上的 Windows 远程桌面服务 (RDS) 允许多个用户同时登录。 但是&#xff0c;在标准的Windows桌面版本&#xff08;例如Windows 10&#xff09;上&#xff0c;默认情况下&#xff0c;远程桌面是为单个用户一次登录而设计的。 这被称为“管理远程桌面”模式。…

蓝桥杯2024/1/31----第十届省赛题笔记

题目要求&#xff1a; 1、 基本要求 1.1 使用大赛组委会提供的国信长天单片机竞赛实训平台&#xff0c;完成本试题的程序设计 与调试。 1.2 选手在程序设计与调试过程中&#xff0c;可参考组委会提供的“资源数据包”。 1.3 请注意&#xff1a; 程序编写、调试完成后选手…

vmware读取坏掉的虚拟磁盘vmdk文件

vmware centos 7系统挂了&#xff0c;开不了机&#xff0c;重新安装ubuntu22系统&#xff0c;挂载centos7的vmdk文件。 1.找到坏掉的系统的vmdk文件 2.新系统添加硬盘 选择刚才旧系统的vmdk文件&#xff1a; 完成以后&#xff0c;看到多了一块硬盘&#xff1a; 3.读取新硬盘 &…

计算机毕业设计 | springboot 多功能商城 购物网站(附源码)

1&#xff0c; 概述 国家大力推进信息化建设的大背景下&#xff0c;城市网络基础设施和信息化应用水平得到了极大的提高和提高。特别是在经济发达的沿海地区&#xff0c;商业和服务业也比较发达&#xff0c;公众接受新事物的能力和消费水平也比较高。开展商贸流通产业的信息化…

SpringMVC-基本概念

一、引子 我们在上篇文章Spring集成Web中抛出了一个问题&#xff1a;为什么我们一直在自用Java Web阶段使用的Servlet来承接客户端浏览器的请求呢&#xff0c;我们熟知甚至是已经在日常开发中经常使用的Controller又与之有什么关系呢&#xff1f;我们将在本篇文章解答读者的这…

GitHub的使用操作

记得看目录哦&#xff01; 1. 创建仓库2. 下载desktop3. 把创建的库克隆到本地4. 文件拷贝到本地仓库![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/7171ac6c4ca14e3b8d22717121f79c9e.png)5. 在网址后面加/compare进行比较6. 给系统添加功能 1. 创建仓库 2. 下载…

C练习——插入有序数组

题目&#xff1a; 编写一个函数&#xff0c;实现在一个升序数组中查找x应插入位置&#xff0c;将x插入数组中&#xff0c;并使其入仍升序 解析&#xff1a; 插入是数组基本操作&#xff0c;从第零位遍历数组与x比较&#xff0c;x小于或等于arr[i]时&#xff0c;从arr[i]开始…

SD-WAN和MPLS的区别以及如何选择?

网络连接技术的选择对企业来说至关重要。SD-WAN&#xff08;软件定义广域网&#xff09;和MPLS&#xff08;多协议标签交换&#xff09;是两种备受关注的网络连接方案。它们在架构、带宽、成本和管理等方面存在显著区别&#xff0c;企业应了解清楚这些区别再进行选择。 SD-WAN采…

openharmony开发版应用安装签名

配置签名信息 应用/服务在真机设备上运行&#xff0c;需要提前为应用/服务进行签名&#xff0c;DevEco Studio为开发者提供了自动化签名方案&#xff0c;可以一键完成应用/服务签名。具体操作如下&#xff1a; 单击File > Project Structure > Project > Signing Con…

神经网络与深度学习Pytorch版 Softmax回归 笔记

Softmax回归 目录 Softmax回归 1. 独热编码 2. Softmax回归的网络架构是一个单层的全连接神经网络。 3. Softmax回归模型概述及其在多分类问题中的应用 4. Softmax运算在多分类问题中的应用及其数学原理 5. 小批量样本分类的矢量计算表达式 6. 交叉熵损失函数 7. 模型预…

leetcode刷题(剑指offer)54.螺旋矩阵

54.螺旋矩阵 给你一个 m 行 n 列的矩阵 matrix &#xff0c;请按照 顺时针螺旋顺序 &#xff0c;返回矩阵中的所有元素。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,2,3],[4,5,6],[7,8,9]] 输出&#xff1a;[1,2,3,6,9,8,7,4,5]示例 2&#xff1a; 输入&#xff1a;ma…

【数据库数据恢复】Oracle数据库ASM磁盘组数据恢复案例

oracle数据库故障&分析&#xff1a; oracle数据库ASM磁盘组掉线&#xff0c;ASM实例不能挂载。数据库管理员尝试修复数据库&#xff0c;但是没有成功。 oracle数据库数据恢复过程&#xff1a; 1、将oracle数据库所涉及磁盘以只读方式备份。后续的数据分析和数据恢复操作都…

帆软报表简单插入多个图表和表格

前言 帆软报表支持在一个普通报表&#xff08;*.cpt&#xff09;中插入多个图表或者文字表格。 系统环境 操作系统为 Windows 11 家庭版。帆软报表版本为 FinReport 9 &#xff0c; 数据库为 Oracle 11C 。 操作步骤 1、新建 cpt 报表 注意选择 “普通报表” 不是其他类型…

10个React状态管理库推荐

本文将为您推荐十款实用的React状态管理库&#xff0c;帮助您打造出高效、可维护的前端应用。让我们一起看看这些库的魅力所在&#xff01; 在前端开发中&#xff0c;状态管理是至关重要的一环。React作为一款流行的前端框架&#xff0c;其强大的状态管理功能备受开发者青睐。…

Maven的Docker镜像二次打包,再次推送至Harbor中

之所以如此操作&#xff0c;主要原因是&#xff0c;官版的镜像中默认的setting.xml已内置好&#xff0c;不容易修改&#xff0c; 重新二次打包&#xff0c;可以指定我们自己的setting.xml配置&#xff0c;配置自己的私服地址以及解决默认Maven仓库国内下载速度慢的问题 一、创…

elk之基础概念

写在前面 本文一起看下es的基础概念&#xff0c;比较枯燥的内容说&#xff0c;但不看又不行。开始。 1&#xff1a;document 文档&#xff0c;是es搜索存储数据的最小单元&#xff0c;相当于是MySQL的一行记录&#xff0c;但es中是一个json&#xff0c;如下是一个通过logsta…

SpringBoot实战(二十六)集成SFTP

目录 一、SFTP简介二、SpringBoot 集成2.1 Maven 依赖2.2 application.yml 配置2.3 DemoController.java 接口2.4 SftpService.java2.5 DemoServiceImpl.java 实现类2.6 SftpUtils.java 工具类2.7 执行结果1&#xff09;上传文件2&#xff09;下载文件3&#xff09;重命名文件&…