浅析RabbitMQ死信队列

news2024/11/15 14:07:46

原文首发于公众号【CSJerry】
在这里插入图片描述

在现代分布式系统中,消息队列扮演着至关重要的角色。它们可以实现应用程序之间的异步通信,并确保数据的可靠传输和处理。而在这个领域中,RabbitMQ作为一种强大而受欢迎的消息队列解决方案,具备了高效、可靠和灵活的特性。

然而,即使使用了RabbitMQ,我们仍然会遇到一些不可预料的情况,例如消费者无法处理某些消息、消息过期或者队列溢出等。为了解决这些问题,RabbitMQ引入了死信队列(Dead Letter Queue)的概念,为开发人员提供了一种有效的错误处理机制。

那么,究竟什么是死信队列呢?

本文结合Spring Boot使用RabbitMQ的死信队列,着重从是什么、为什么、怎么用几个方面对死信队列进行简单介绍。

1. 是什么:

  • 死信队列(Dead Letter Queue)是一种特殊的消息队列,用于存储无法被消费的消息。
  • 当消息满足某些条件无法被正常消费时,将被发送到死信队列中进行处理。
  • 死信队列提供了一种延迟处理、异常消息处理等场景的解决方案。

2. 为什么

  • 用来处理消费者无法正确处理的消息,避免消息丢失或积压
  • 实现延迟消息处理,例如订单超时未支付,可以将该消息发送到死信队列,然后再进行后续处理。
  • 用于实现消息重试机制,当消费者处理失败时,将消息重新发送到死信队列进行重试。
  • 提高了系统的可伸缩性和容错性,能够应对高并发和异常情况。

3. 怎么用

  1. 在Spring Boot中配置和使用死信队列:
    • 首先,在pom.xml文件中添加RabbitMQ的依赖项。
    • 然后,在application.properties文件中配置RabbitMQ连接信息。
    • 接下来,创建生产者和消费者代码,并通过注解将队列和交换机进行绑定。
    • 在队列的声明中添加死信队列的相关参数,如x-dead-letter-exchangex-dead-letter-routing-key等。
    • 最后,在消费者中编写处理消息的逻辑,包括对异常消息进行处理,并设置是否重新发送到死信队列。

简而言之,死信队列可以认为是一个正常队列的备用队列(或者说是兜底队列),当正常队列的消息无法消费的时候mq会重新把该消息发送到死信交换机,由死信交换机根据路由键将消息投递到备用队列,启动服务备用方案。

消息从正常队列到死信队列的三种情况:

1、消息被否定确认使用 channel.basicNackchannel.basicReject ,并且此时requeue 属性被设置为false

2、消息在队列中的时间超过了设置的TTL())时间。

3、消息数量超过了队列的容量限制()。

当一个队列中的消息满足上述三种情况任一个时,改消息就会从原队列移至死信队列,若改队列没有绑定死信队列则消息被丢弃。

4. 实战

以下是一个简单的Spring Boot集成RabbitMQ的死信队列示例代码:

  • 配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=123456
# 开启消费者手动确认
spring.rabbitmq.listener.type=direct

# 发送到队列失败时的手动处理
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.publisher-returns=true

# 发送到交换机手动确认
spring.rabbitmq.publisher-confirm-type=simple
  • 配置类
@Configuration
@Slf4j
public class RabbitCof {

    @Resource
    private MqKeys mqKeys;

    @Bean("normalQueue")
    public Queue normalQueue() {
        /**
         * 为普通队列绑定交换机
         */
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", mqKeys.DIE_EXCHANGE);
        args.put("x-dead-letter-routing-key", mqKeys.DIE_ROUTING_KEY);
        args.put("x-message-ttl", 1000); // 队列中的消息未被消费则1秒后过期
        return new Queue(mqKeys.NORMAL_QUEUE, true, false, false, args);
    }

    @Bean("normalExchange")
    public Exchange normalExchange() {
        return new DirectExchange(mqKeys.NORMAL_EXCHANGE);
    }

    @Bean("normalBind")
    public Binding normalBinding(@Qualifier("normalQueue") Queue normalQueue, @Qualifier("normalExchange") Exchange normalExchange) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(mqKeys.ROUTING_KEY).noargs();
    }

    /**
     * 死信队列
     * @return
     */
    @Bean("dieQueue")
    public Queue dlQueue() {
        return new Queue(mqKeys.DIE_QUEUE, true, false, false);
    }

    /**
     * 死信交换机
     * @return
     */
    @Bean("dieExchange")
    public Exchange dlExchange() {
        return new DirectExchange(mqKeys.DIE_EXCHANGE);
    }

    @Bean("dieBind")
    public Binding dlBinding(@Qualifier("dieQueue") Queue dlQueue, @Qualifier("dieExchange") Exchange dlExchange) {
        return BindingBuilder.bind(dlQueue).to(dlExchange).with(mqKeys.DIE_ROUTING_KEY).noargs();
    }



    @Resource
    private ConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate() {

        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        /**
         * 消费者确认收到消息后,手动ack回调处理
         * spring.rabbitmq.publisher-confirm-type=simple
         */
        rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause)->{
            if(!ack) {
                log.info("消息投递到交换机失败,correlationData={} ,ack={}, cause={}", correlationData == null ? "null" : correlationData.getId(), ack, cause);
            } else {
                log.info("消息成功投递到交换机,correlationData={} ,ack={}, cause={}", correlationData == null ? "null" : correlationData.getId(), ack, cause);
            }
        });

        /**
         * 消息投递到队列失败回调处理
         * spring.rabbitmq.listener.direct.acknowledge-mode=manual
         * spring.rabbitmq.publisher-returns=true
         */
        rabbitTemplate.setReturnsCallback((returnedMessage)->{
            Message message = returnedMessage.getMessage();
            log.error("分发到到队列失败, body->{}", message.getBody());
        });
        return rabbitTemplate;
    }
}
  • 生产者类
@Component
public class Producer {

    @Resource
    private  MqKeys mqKeys;

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend(mqKeys.NORMAL_EXCHANGE, mqKeys.ROUTING_KEY, message);
    }
}
  • 消费者类
@Component
@RabbitListener(queues = "normal.queue")
@Slf4j
public class Consumer {

    @RabbitHandler
    public void handleMessage(String data, Message message, Channel channel) {
        boolean success = false;
        int retryCount = 3;
        System.out.println(message.toString());
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        while (!success && retryCount-- > 0){
            try {
                // 处理消息
                log.info("收到消息: {}, deliveryTag = {}", data, deliveryTag);
                // 正常处理完毕,手动确认,此处不确认让他进入死信队列
//                success = true;
//                channel.basicAck(deliveryTag, false);
                Thread.sleep(3 * 1000L);
            }catch (Exception e){
                log.error("程序异常:{}", e.getMessage());
            }
        }
        // 达到最大重试次数后仍然消费失败
        if(!success){
            try {
                log.info("move to die queue");
                // 手动拒绝,移至死信队列
                /**
                 *
                 deliveryTag – the tag from the received AMQP.Basic.GetOk or AMQP.Basic.Deliver
                 multiple – true to reject all messages up to and including the supplied delivery tag; false to reject just the supplied delivery tag.
                 requeue – true if the rejected message(s) should be requeued rather than discarded/dead-lettered
                 */
                channel.basicNack(deliveryTag, false, false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

以上代码演示了如何在Spring Boot中配置一个普通队列和一个死信队列,然后通过生产者发送消息到普通队列,在消费者中处理消息,并模拟了当发生异常时将消息重新发送到死信队列。

参考连接

  • [rabbit 官网]Dead Letter Exchanges — RabbitMQ

  • 具体代码仓库

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/839538.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

c语言——求n之内的素数和

//求n之内的素数和 //列如&#xff1a;2、3、5等 #include<stdio.h> #include<math.h> int main() {int i,j,k,n0;scanf("%d",&n);for(i2;i<n;i){k(int)sqrt(i);for(j2;j<k;j)if(i%j0)break;if(j>k){printf("%d,",i);n;if(n%50)p…

23.8.5总结(Web博客项目)

用户搜索&#xff0c;标签搜索。 主页面加上了标签和用户推荐 管理员页面还需要修改 还有这些功能点没有实现&#xff1a; 右键删除评论 点赞次数已达上限 删除博客 消息页面&#xff0c;收消息&#xff08;点赞和收藏要给被点赞这个博主发消息&#xff09; 管理员页面&#…

day52-Redis

Redis 1.Redis 1.1 RESP连接Redis 1.2 定义&#xff1a;是一个高性能的key-value数据库&#xff08;非关系型数据库&#xff09; 1.3 数据类型&#xff1a; key键的类型是字符串类型&#xff1b; 值的类型有五种&#xff1a;字符串String&#xff0c;哈希hash&#xff0…

layui之layer弹出层的icon数字及效果展示

layer的icon样式 icon如果在信息提示弹出层值(type为0)可以传入0-6&#xff0c;icon与图标对应关系如下&#xff1a; 如果是加载层&#xff08;type为3&#xff09;可以传入0-2&#xff0c;icon与图标对应关系如下&#xff1a;

javaAPI(五):System、Math、BigInteger、BigDecimal

System类 System类代表系统&#xff0c;系统级的很多属性和控制方法都放置在该类的内部。该类位于java.lang包。 由于该类的构造器是private的&#xff0c;所以无法创建该类的对象&#xff0c;也就是无法实例化该类。其内部的成员&#xff0c;所以也可以很方便的进行调用。变量…

STM32入门学习之独立看门狗(IWDG)

1.STM32的独立看门狗是一个具有独立时钟的片上外设。通常&#xff0c;为了防止程序卡死&#xff0c;可以设置看门狗定时复位。当看看门狗被使能之后&#xff0c;会按初始化时设置的计数值进行计数。当根据计数值计数的倒数时间为0时&#xff0c;便会自动复位程序&#xff0c;即…

嵌入式开发学习(STC51-12-I2C/IIC)

内容 在数码管右3位显示数字&#xff0c;从0开始&#xff0c;按K1键将数据写入到EEPROM内保存&#xff0c;按K2键读取EEPROM内保存的数据&#xff0c;按K3键显示数据加1&#xff0c;按K4键显示数据清零&#xff0c;最大能写入的数据是255&#xff1b; I2C介绍 I2C简介 I2C&…

CNN成长路:从AlexNet到EfficientNet(01)

一、说明 在 10年的深度学习中&#xff0c;进步是多么迅速&#xff01;早在 2012 年&#xff0c;Alexnet 在 ImageNet 上的准确率就达到了 63.3% 的 Top-1。现在&#xff0c;我们超过90%的EfficientNet架构和师生训练&#xff08;teacher-student&#xff09;。 如果我们在 Ima…

echarts 饼图的label放置于labelLine引导线上方

一般的饼图基础配置后长这样。 想要实现将文本放置在引导线上方&#xff0c;效果长这样 const options {// ...series: [{label: {padding: [0, -40],},labelLine: {length: 10,length2: 50,},labelLayout: {verticalAlign: "bottom",dy: -10,},},], };label.padd…

2配置篇:基础功能配置

前言 在上一章节中,我们学习了 NestJS CLI 的用法,得到了一套基础的项目工程。最开始做项目对比的时候也提到过,NestJS 作为一款自定义程度较高的框架,CLI 直接提供的基础功能虽然并不完善,但同时也为开发者提供了非常多的内置或配套的功能例如高速缓存、日志拦截、过滤器…

栈和队列(一) 栈操作详解

文章目录 一、物理结构和逻辑结构二、栈1、什么是栈2、栈中一些基本操作的实现Stack.hStack.c栈的初始化栈的销毁入栈出栈获得栈顶元素获得栈的元素数判断栈空 三、利用栈解决问题 一、物理结构和逻辑结构 栈和队列都属于逻辑结构&#xff0c;它们既可以用数组实现也可以用链表…

【小沐学前端】VuePress制作在线电子书、技术文档(VuePress + Markdown + node)

文章目录 1、简介1.1 VuePress简介1.2 它是如何工作的&#xff1f; 2、安装node3、安装VuePress4、配置VuePress4.1 修改标题4.2 修改导航条4.3 修改右侧栏4.4 修改正文 结语 1、简介 Vue驱动的静态网站生成器&#xff0c;生成的网页内容放到自己服务器上管理&#xff0c;可用于…

极光笔记 | 浅谈企业级SaaS产品的客户成长旅程管理(上)—— 分析篇

本文作者&#xff1a;陈伟&#xff08;极光用户体验部高级总监&#xff09; “企业级SaaS产品与C端互联网产品特征差异很大&#xff0c;有些甚至是截然相反&#xff0c;这些特征也会成为后续客户成长旅程的重要影响变量。本文就如何设计并服务好企业级SaaS产品客户成长旅程进行…

VUE之JWT前后端分离认证,学生管理系统

参考资料: SpringBoot搭建教程 SpringCloud搭建教程 JWT视频教程 JWT官网 Vue视频教程 JWT视频参考资料、VUE视频资料,及前后端demo 特别有参考价值的JWT博客1 特别有参考价值的JWT博客2 cookie、localstorage和sessionStorage的区别1 cookie、localstorage和sessi…

第1章 什么是JavaScript

引言 JavaScript最早诞生的原因是希望表单验证可以在客户端得到解决。频繁通过服务端的请求来验证表单缓慢的网速让页面每次刷新都考验着人们的耐心。 如今的js不再局限简单的表单验证&#xff0c;能够实现复杂的计算与交互&#xff0c;包括闭包、匿名&#xff08;lambda&…

Android Glide MemorySizeCalculator计算值,Kotlin

Android Glide MemorySizeCalculator计算值,Kotlin for (i in 100..1000 step 50) {val calculator MemorySizeCalculator.Builder(this).setMemoryCacheScreens(i.toFloat()).setBitmapPoolScreens(i.toFloat()).setMaxSizeMultiplier(0.8f).setLowMemoryMaxSizeMultiplier(0…

使用U盘重装Windows10系统详细步骤及配图【官方纯净版】

文章目录 1.制作启动盘1.1准备U盘及一台电脑1.2下载win10安装包 2.安装操作系统2.1插入系统安装盘2.2设置启动盘为第一启动项2.3开始安装操作系统 3.安装成功后进入图形界面3.1启动问题3.2驱动问题3.3调出"控制面板"3.4给磁盘分区 4.win10激活 前天下午不知道怎么想的…

springBean生命周期解析

本文基于Spring5.3.7 参考&#xff1a; kykangyuky Spring中bean的生命周期 阿斌Java之路 SpringBean的生命周期&#xff0c; 杨开振 JavaEE互联网轻量级框架整合开发 黑马程序员 JavaEE企业级应用开发教程 马士兵 Spring源码讲解 一. SpringBean生命周期流程图 二. 示例代码 …

升级到mybatis-plus,系统启动的一些问题

在分表后mybatis-plus删除操作失效等问题处理 mybatis-plus 旧系统重构遇到的种种问题 在这三篇文章中&#xff0c;我花了近1个月时间重构了28个微服务&#xff0c;当中遇到的一些问题&#xff0c;但是发布到pretest环境&#xff0c;却还有启动问题&#xff0c;看来系统重构不是…

变压器参数测定中空载实验和短路实验的理解

确定变压器的参数是在《电机学》和《电力系统分析》中非常重要的一个环节&#xff0c;这里用自己习惯的方式讲一下怎样理解 首先要讲下变压器的额定参数&#xff0c;这个也是个常考的知识点 额定功率&#xff0c;即视在功率&#xff0c;电压电流&#xff0c;单位是VA或者kVA额…