【SpringBoot】SpringBoot整合RabbitMQ消息中间件,实现延迟队列和死信队列

news2025/1/18 20:19:19

  📝个人主页:哈__

期待您的关注 

目录

 一、🔥死信队列

RabbitMQ的工作模式

 死信队列的工作模式

 二、🍉RabbitMQ相关的安装 

三、🍎SpringBoot引入RabbitMQ

1.引入依赖

2.创建队列和交换器

2.1 变量声明 

2.2 创建延迟交换器

2.3 创建延迟队列

2.4 延迟队列绑定延迟交换器

2.5 死信队列配置

3. 添加application.yml

4. 添加RabbitMQListener (消费者)

5. 创建DelayMessageSender 

6. 创建Controller 

7.测试 

四、🍌死信队列的应用场景


 一、🔥死信队列

RabbitMQ的死信队列(Dead Letter Queue,DLQ)是一种特殊的队列,用于接收其他队列中的“死信”消息。所谓“死信”,是指满足一定条件而无法被消费者正确处理的消息,这些条件包括消息被拒绝、消息过期、消息达到最大重试次数等。

当消息成为死信时,RabbitMQ会将其重新发送到指定的死信队列,而不是丢弃它们。这样做的好处是可以对死信进行分析和处理,例如记录日志、重新入队或者进一步处理。

死信队列通常与RabbitMQ的延迟队列(Delayed Message Queue)一起使用,通过延迟队列延迟消息的处理时间,可以更容易地触发消息成为死信的条件,从而进行测试和调试。

死信队列在消息中间件中有许多实际应用场景,主要用于处理无法被正常消费的消息,增强了消息的可靠性和处理能力。以下是一些常见的应用场景:

  1. 延迟消息处理:通过将消息发送到延迟队列,在指定的时间后再将消息发送到目标队列,实现延迟处理消息的功能。

  2. 消息重试:当消费者无法处理消息时,消息可以被重新发送到队列并设置重试次数,达到最大重试次数后转发到死信队列,以便进行进一步处理。

  3. 异常处理:当消息无法被消费者正常处理时(如格式错误、业务异常等),将消息转发到死信队列,用于记录日志、报警或人工处理。

  4. 消息超时处理:当消息在队列中等待时间过长时,可以设置消息的过期时间(TTL),超过时间后将消息转发到死信队列。

  5. 消息路由失败:当消息无法被正确路由到目标队列时,可以将消息发送到死信队列,避免消息丢失。

  6. 消息版本兼容性处理:当消息的格式或内容发生变化时,通过死信队列可以处理老版本消息,确保新版本系统的兼容性。


RabbitMQ的工作模式


 死信队列的工作模式

今天我要实现的就是这个延迟队列和死信队列。生产者首先向延迟队列发送消息,待达到TTL后消息会被转送到死信队列当中,消费者会从死信队列中获取消息进行消费。

 二、🍉RabbitMQ相关的安装 

win10 安装rabbitMQ详细步骤_rabbitmq 安装-CSDN博客

我这里直接引用别人的文章了,下载需要大家去看一看。

RabbitMQ延迟插件的安装。

[超详细]RabbitMQ安装延迟消息插件_rabbitmq安装延迟插件-CSDN博客

三、🍎SpringBoot引入RabbitMQ

1.引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </dependency>

2.创建队列和交换器

这一步是很重要的,如果你配置错误了,消息很可能无法正确的传送。要实现延迟队列和死信队列,我们一共要创建以下几个组件:

  1. 延迟队列
  2. 延迟队列的交换器
  3. 死信队列
  4. 死信队列的交换器

在我们创建了这几个组件之后,我们还要干一些事情,我们需要把这些组件进行组装,如果你不了解RabbitMQ的基础,你可以先看看基础教学,我这里简单的说一下。RabbitMQ中有一种绑定方式,这种绑定方式会把BindingKey和RoutingKey完全匹配的进行绑定,如下图所示,生产者发送了一个BindingKey为“warning”的消息,那么这个消息就会被发送到Queue1和Queue2,这并不难理解。

我们要做的就是把队列和交换器通过一个RoutingKey绑定在一起。


2.1 变量声明 

 接下来的代码要好好看了,首先我们把我们后边要用到的名称变量全部定义出来。因为这个名称起的很长,我们不方便直接使用。创建DeadRabbitConfig。在类中定义如下变量,延迟队列交换器名称、延迟队列名称、延迟队列Routing名称。除此之外还有死信队列交换器名称、死信队列名称和死信Routing名称。

  // 延迟队列交换器名称
    public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";
    // 延迟队列A名称
    public static final String DELAY_QUEUE_A_NAME = "delay.queue.demo.business.queue_a";
    // 延迟队列B名称
    public static final String DELAY_QUEUE_B_NAME = "delay.queue.demo.business.queue_b";
    // 延迟队列routingA名称
    public static final String DELAY_QUEUE_ROUTING_A_NAME = "delay.queue.demo.business.queue_a.routing_key";
    // 延迟队列routingB名称
    public static final String DELAY_QUEUE_ROUTING_B_NAME = "delay.queue.demo.business.queue_b.routing_key";

    // 死信队列
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routing_key";
    public static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routing_key";
    public static final String DEAD_LETTER_QUEUE_A_NAME = "delay.queue.demo.deadletter.queue_a";
    public static final String DEAD_LETTER_QUEUE_B_NAME = "delay.queue.demo.deadletter.queue_b";

2.2 创建延迟交换器

// 注册延迟交换器delayExchange
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return  new DirectExchange(DELAY_EXCHANGE_NAME);
    }

2.3 创建延迟队列

这里的延迟队列需要我们额外的配置一些参数,用于和死信队列进行信息发送。这里我是用了两种不同的方式构建延迟队列A和延迟队列B,在延迟队列A种我没有设置TTL参数,而是通过RabbitMQ的延迟插件实现的,而延迟队列B我设置了TTL为10000ms,也就是十秒,十秒内消息如果没有被消费掉就会发送到死信队列。

// 注册延迟队列A   还要绑定死信交换器和死信routingA
    @Bean("delayQueueA")
    public Queue delayQueueA(){
        Map<String,Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_A_ROUTING_KEY);
        //args.put("x-message-ttl",6000);
        return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(args).build();
    }
    // 注册延迟队列B   还要绑定死信交换器和死信routingB
    @Bean("delayQueueB")
    public Queue delayQueueB(){
        Map<String,Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_B_ROUTING_KEY);
        args.put("x-message-ttl",10000);
        return QueueBuilder.durable(DELAY_QUEUE_B_NAME).withArguments(args).build();
    }

2.4 延迟队列绑定延迟交换器

 // 延迟队列A绑定交换器
    @Bean
    public Binding delayQueueABinding(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange delayExchange){
        return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_A_NAME);
    }
    // 延迟队列B绑定交换器
    @Bean
    public Binding delayQueueBBinding(@Qualifier("delayQueueB") Queue queue,@Qualifier("delayExchange") DirectExchange delayExchange){
        return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_B_NAME);
    }

2.5 死信队列配置

与延迟队列不同的是,死信队列并没有配置延迟参数。

// 注册死信队列A
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUE_A_NAME);
    }
    // 注册死信队列B
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUE_B_NAME);
    }
    // 注册死信交换器
    @Bean
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }
    // 死信队列A绑定死信交换器
    @Bean
    public Binding deadLetterQueueABinding(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange deadLetterExchange){
        return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);
    }
    // 死信队列B绑定死信交换器
    @Bean
    public Binding deadLetterQueueBBinding(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange")DirectExchange deadLetterExchange){
        return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY);
    }

到此为止,RabbitMQ的组件配置完成。


3. 添加application.yml

server:
  port: 8081
spring:
  application:
    name: test-rabbitmq-producer
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

4. 添加RabbitMQListener (消费者)

下方的代码一共有两个消费者,一个消费者获取死信队列A中的消息,另一个消费者获取死信队列B中的消息。

@Component
public class DeadLetterQueueConsumer {
    public static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterQueueConsumer.class);

    @RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_A_NAME,ackMode = "MANUAL")
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        LOGGER.info("当前时间:{},死信队列A收到消息:{}", new Date().toString(), msg);
        System.out.println(message.getMessageProperties().getDeliveryTag());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_B_NAME,ackMode = "MANUAL")
    public void receiveB(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        LOGGER.info("当前时间:{},死信队列B收到消息:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

5. 创建DelayMessageSender 

这里采用的就是两种不同的方式,一种方式是使用插件来延迟消息的发送,另一种是通过TTL参数。

@Component
public class DelayMessageSender {
    @Resource
    RabbitTemplate rabbitTemplate;


    public void sendMessage(String msg,Integer delayTimes){
        switch (delayTimes){
            case 6:
                rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_EXCHANGE_NAME, DeadRabbitConfig.DELAY_QUEUE_ROUTING_A_NAME,msg,new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setExpiration(String.valueOf(6000));
                        return message;
                    }
                });
                break;
            case 10:
                rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_QUEUE_B_NAME,msg);
                break;
        }
    }
}

6. 创建Controller 

@RestController
@RequestMapping("/student")
public class StudentController {
    @Autowired
    DelayMessageSender messageSender;
    @RequestMapping("/send-message")
    public String sendMessage(String msg,Integer delayTimes){
        System.out.println(new Date());
        messageSender.sendMessage(msg,delayTimes);
        return "发送成功";
    }
}

7.测试 

在浏览器中输入以下地址进入RabbitMQ界面。账号密码都是guest。

http://localhost:15672/

 先来看看我们的初始队列。这里是什么都没有的。


然后我们启动项目后在看。我们刚才创建出来的四个队列全部都被加载了出来。


 使用PostMan发送一次请求。


 我们的请求在17s的时候发送到后端,消息打印在23s,说明我们的延迟队列有效果。


接下来我们测试10s的延迟队列。


 10s后死信队列B成功的接收到了消息。

四、🍌死信队列的应用场景

延迟队列通常用于需要延迟执行某些任务或触发某些事件的场景。例如,在电子商务中,可以使用延迟队列实现订单超时未支付自动取消功能。

  • 1.订单创建

    • 用户下单后,系统生成订单,并将订单信息发送到一个普通队列,同时设置一个TTL(Time-To-Live)为30分钟。
    • 这个队列配置了死信交换机(Dead Letter Exchange, DLX),当消息过期后会被转发到死信队列。
  • 2.等待支付

    • 在30分钟内,用户可以完成支付。如果用户在30分钟内支付完成,系统会从普通队列中移除对应的消息并正常处理订单。
  • 3.订单超时处理

    • 如果用户未在30分钟内完成支付,消息会自动过期并转发到死信交换机,进而转发到死信队列。
  • 4.取消订单

    • 系统有一个专门的消费者监听死信队列。当有消息进入死信队列时,消费者会自动处理这些消息,即取消订单、释放库存,并通知用户订单已取消。
  • 5.定时任务(可选):

    • 虽然死信队列已经提供了超时订单的处理,但为了防止消息丢失或处理延迟,可以设置一个定时任务定期检查订单状态,确保所有超时未支付的订单都得到了处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1801992.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java采取擦除式泛型到底兼容了什么场景?

在开始前刚好我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「 Java的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#xff01;Java擦除式泛型是一个妥协,…

高光谱图像聚类的像素-超像素对比学习与伪标签校正

Pixel-Superpixel Contrastive Learning and Pseudo-Label Correction for Hyperspectral Image Clustering 文章目录 Pixel-Superpixel Contrastive Learning and Pseudo-Label Correction for Hyperspectral Image Clustering摘要引言相关方法对比学习 方法超像素对比学习像素…

【Flask开发实战】首页模板

一、前言 前面我们已经完成登录页面的设定&#xff0c;登录后临时调转到“hello flask”的界面。现在我们根据实际首页的设计需要&#xff0c;来完成首页相关内容的开发。一般系统首页会放一些分析数据&#xff0c;多以图表的方式展示&#xff0c;方便使用者了解信息。以防火墙…

JavaScript事件监听之其它事件(页面加载事件、元素滚动事件、页面尺寸事件、M端事件)

目录 1. 页面加载事件(load、DOMContentLoaded)2. 元素滚动事件(scroll)3. 页面尺寸事件3.1 resize3.2 获取元素宽高3.3 获取元素位置(offsetLeft和offsetTop、getBoundingClientRect) 4. M端事件 1. 页面加载事件(load、DOMContentLoaded) load事件&#xff1a; 使用场景: 等…

MyBatis二级缓存开启条件

MyBatis缓存为俩层体系。分为一级缓存和二级缓存。 一级缓存&#xff1a; 一级缓存默认开启&#xff0c;一级缓存的作用域是SqlSession级别的&#xff0c;这意味着当你更换SqlSession之后就不能再利用原来的SqlSession的一级缓存了。不同的SqlSession之间的一级缓存是隔离的。…

基础概念解析:SOCKS5代理究竟是什么?SOCKS5代理ip使用场景有哪些?

在当今数字化时代&#xff0c;网络安全和隐私保护已成为我们日常生活中不可忽视的问题。随着网络攻击手段的日益复杂&#xff0c;如何安全地访问互联网资源成为了一个亟待解决的问题。SOCKS5代理作为一种先进的网络协议&#xff0c;为我们提供了解决这一问题的有效方案。 本文…

实用的 C 盘搬家软件

一、简介 1、一款专门用于 Windows 系统的文件夹移动工具&#xff0c;它允许用户将程序或游戏的安装文件夹从一台驱动器移动到另一台驱动器&#xff0c;或者同一个驱动器内的不同路径&#xff0c;而无需重新安装或破坏现有的程序安装。 二、下载 1、下载地址&#xff1a; 官网链…

3-1RT-Thread时钟管理

这里写自定义目录标题 时钟节拍是RT thread操作系统的最小时间单位。 第一个功能&#xff0c;rt tick值自动加1&#xff0c;在RT thread当中通过RT_USING_SMP定义了多核和单核的场景。第二个功能&#xff0c;检查当前线程的时间片&#xff0c;首先获取当前线程&#xff0c;将当…

AI 写高考作文丨10 款大模型 “交卷”,实力水平如何?

前言 在科技日新月异的今天&#xff0c;人工智能&#xff08;AI&#xff09;已不再是遥不可及的未来科技&#xff0c;而是逐渐融入我们日常生活的实用工具。从智能语音助手到自动驾驶汽车&#xff0c;从智能家居系统到精准医疗诊断&#xff0c;AI技术正以其强大的计算能力和数…

端午搞个零花钱,轻松赚取创业的第一桶金!2024最受欢迎的创业项目,2024新的创业机会

好好的端午节&#xff0c; 净给我添堵&#xff01; 本来我打算在端午节愉快的玩耍&#xff0c; 结果一大早起床却看到舍友在给一堆设备充电&#xff0c; 然后装的整整齐齐&#xff0c; 满满一书包。 我好奇他小子这是要干嘛&#xff1f; 不会是打算今天回去给亲朋好友准备…

Centos7 安装配置SFTP

Centos7安装配置SFTP 更新源安装 OpenSSH 服务启动服务设置为开机自启动新建一个用户 (sftpuser为你要设置的用户的用户名)编辑配置文件设置sftp用户的根目录重启SSH服务代码实现 由于最近工作中需要实现动态上传文件到帆软服务器&#xff0c;但是帆软没有提供相关API&#xff…

Allegro限制走线区域和元器件摆放区域

一、Line to Shape 当你的板框是线条形式时&#xff0c;先将线条闭合成shape&#xff0c;点击Shape–>Compose Shape,选择如下参数&#xff0c;再逐一选中分散线条&#xff0c;选择完毕右键Done即可 二、Z-Cope使用 点击Edit–>Z-Cope&#xff0c;选择如下参数&…

Django项目上线-报错汇总

Django项目上线-报错汇总 下列报错基本都是Python环境相关 pip install 报错 WARNING: pip is configured with locations that require TLS/SSL, however the ssl module in Python is not available. debian运行pip报错ssl module in Python is not available - z417 - 博…

SM481,SM432和利时DCS备件

SM481,SM432和利时DCS备件。POU名只能包含字母、数字、下划线&#xff0c;第一个字符必须是字母或者下划线&#xff0c;且遵循以下原则&#xff1a;SM481,SM432和利时DCS备件。关于重名&#xff0c;不能与变量名、变量组名、POU文件夹名、任务名、SM481,SM432和利时DCS备件。工…

OpenCompass 大模型评测平台C-Eval 基准任务评估实战

1. 引言 在人工智能迅速发展的今天&#xff0c;大型语言模型&#xff08;LLMs&#xff09;在多个领域展现出了巨大的潜力和应用价值。然而&#xff0c;如何评价这些模型的性能&#xff0c;了解它们的优缺点&#xff0c;成为了一个重要课题。OpenCompass&#xff0c;一个由上海…

【Java】解决Java报错:ArrayIndexOutOfBoundsException

文章目录 引言1. 错误详解2. 常见的出错场景2.1 直接访问数组越界2.2 循环中的索引错误2.3 多维数组的错误访问 3. 解决方案3.1 检查数组长度3.2 正确使用循环3.3 多维数组的正确访问 4. 预防措施4.1 使用增强型 for 循环4.2 编写防御性代码4.3 单元测试 结语 引言 在Java编程…

C++学习插曲:“name“的初始化操作由“case“标签跳过

问题 "name"的初始化操作由"case"标签跳过 问题代码 case 3: // 3、删除联系人string name;cout << "请输入删除联系人姓名&#xff1a;" << endl;cin >> name;if (isExistPerson(&abs, name) -1){cout << "…

Linux--进程间通信(system V共享内存)

目录 1.原理部分 2.系统调用接口 参数说明 返回值 1. 函数原型 2. 参数说明 3. 返回值 4. 原理 5. 注意事项 3.使用一下shmget&#xff08;一段代码&#xff09; 4.一个案例&#xff08;一段代码) 1.简单封装一下 2.使用共享内存 2.1挂接&#xff08;shmat&#x…

netty+springboot+vue聊天室(需要了解netty)

先看看这个使用websocket实现的聊天室&#xff0c;因为前端是使用websocket&#xff0c;和下面的demo的前端差不多就不解释实现原理&#xff0c;所以建议还是看看(要是会websocket的大佬请忽略) springbootwebsocketvue聊天室 目录 一、实现内容二、代码实现1.后端2.前端源码…

java基础语法整理 ----- 上

java基础语法 一、变量二、数据类型三、标识符四、键盘录入五、判断语句1. 三种格式2. 练习题 六、switch语句七、循环八、循环控制语句九、方法 一、变量 1.什么是变量&#xff1a; 在程序运行过程中&#xff0c;其值可以发生改变的量从本质上讲&#xff0c;变量是内存中的一…