【RabbitMQ学习日记】——死信队列与延迟队列

news2025/1/9 15:19:55

一、死信队列

1.1 相关概念

死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumerqueue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列

应用场景:

  • 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中
  • 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

1.2 死信的来源

  • 消息TTL过期 【Time to live 存活时间】
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.rejectbasic.nack)并且 requeue=false 【消息应答被拒绝并且不能重新返回队列】

1.3 死信实战

🌔 1、我们先看一下案例的整体架构图

  • 一个生产者、两个消费者
    • 一个消费者从正常队列接收消息、另一个从死信队列接收消息
      在这里插入图片描述

🌔 2、演示第一种情况:消息TTL过期

  • 我们需要思考如何将普通队列与死信交换机关联起来?
    • 此时就用到了我们声明队列的第四个参数,准备一个 map 集合
      • 以添加键值对的方式设置 死信交换机 和 routingKey
      //正常队列绑定死信队列信息
      Map<String, Object> params = new HashMap<>();
      //正常队列设置死信交换机
      params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
      //正常队列设置死信 routingKey
      params.put("x-dead-letter-routing-key", "lisi");
      
  • 第二个我们需要思考如何设置过期时间
    • 首先,过期时间肯定是设置给消息的,所以应该在生产者发出消息前进行设置

    • 这就用到了我们发送消息的第三个参数,通过 AMQP.BasicProperties 指明过期时间

      AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
      
  • 接下来是代码部分和效果演示:

1️⃣ 生产者

package com.atguigu.rabbitmq.eight;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

/**
 * @author Bonbons
 * @version 1.0
 * 死信队列的生产者
 */
public class Producer {
    public static final String NORMAL_QUEUE = "normal_queue";
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //为了实现消息的存活时间为10s
        AMQP.BasicProperties properties = new AMQP.BasicProperties()
                .builder().expiration("10000").build();
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            //演示过期成为死信消息
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes("UTF-8"));
        }
    }
}

2️⃣ 消费者1

package com.atguigu.rabbitmq.eight;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Bonbons
 * @version 1.0
 * 演示我们死信队列的消费者1号
 */
public class Consumer01 {
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static final String DEAD_EXCHANGE = "dead_exchange";
    public static final String NORMAL_QUEUE = "normal_queue";
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception{
        //获取信道
        Channel channel = RabbitMqUtils.getChannel();

        //声明交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);


        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routingKey
        params.put("x-dead-letter-routing-key", "lisi");

        //声明队列 [此处用到了第四个参数,实现将普通队列与死信交换机关联起来]
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        //将我们的队列和交换机绑定到一起
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");

        System.out.println("等待接收消息......");
        channel.basicConsume(NORMAL_QUEUE, true, (consumersTag, message) -> {
            String msg = new String(message.getBody(), "UTF-8");
           	System.out.println("Consumer01接收到的消息: " + msg);
        }, consumersTag -> {});
    }
}

3️⃣ 消费者2

package com.atguigu.rabbitmq.eight;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Bonbons
 * @version 1.0
 * 演示我们死信队列的消费者2号,就负责接收死信队列的消息
 */
public class Consumer02 {
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception{
        //获取信道
        Channel channel = RabbitMqUtils.getChannel();
        //最普通的消费者
        System.out.println("等待接收消息......");
        channel.basicConsume(DEAD_QUEUE, true, (consumersTag, message) -> {
            System.out.println("Consumer02接收到的消息: " + new String(message.getBody(), "UTF-8"));
        }, consumersTag -> {});

    }
}
  • 我们通过不启动消费者1(C1)来演示消息过期的效果
    • 因为没有消费者1,当生产者的消息发过来就没有消费者进行处理,此处10s后就会成为死信消息
    • 成为死信消息后就会从当前的这个队列发往死信交换机
    • 死信交换机将死信消息发往死信队列,我们的第二个消费者从死信队列获取消息并消费

请添加图片描述
请添加图片描述
在这里插入图片描述

🌔 3、演示第二种情况:队列达到最大长度

  • 只需要在上面的代码中进行部分修改
    • 第一步,去掉生产者中消息的过期时间参数设置
      在这里插入图片描述

    • 第二步 ,在消费者C1的声明普通队列的map集合参数加入一条关于队列长度的限制

      • params.put("x-max-length", 6);
        在这里插入图片描述

重点: 当我们想要修改已经存在队列、交换机的属性时,需要将已经存在的删除,然后运行代码重新创建

  • 通过RabbitMQWeb管理工具我们可以看出达到了预期的测试效果:
    • 发送十条消息,正常队列里有六条消息,死信队列里有四条消息

请添加图片描述

🌔 4、第三种情况:消息被拒并不能返回队列

  • 对于生产者与第二种情况一致,消费者C2与第一种情况一致
  • 与前两种情况最大的不同在于消费者C1的设定,接下来我们看一下这部分的代码
    • 要关闭自动应答
package com.atguigu.rabbitmq.eight;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Bonbons
 * @version 1.0
 * 演示我们死信队列的消费者1号
 */
public class Consumer01 {
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static final String DEAD_EXCHANGE = "dead_exchange";
    public static final String NORMAL_QUEUE = "normal_queue";
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception{
        //获取信道
        Channel channel = RabbitMqUtils.getChannel();

        //声明交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);


        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routingKey
        params.put("x-dead-letter-routing-key", "lisi");
        //设置正常队列的长度限制
//        params.put("x-max-length", 6);

        //声明队列 [此处用到了第四个参数,实现将普通队列与死信交换机关联起来]
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        //将我们的队列和交换机绑定到一起
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");

        System.out.println("等待接收消息......");
        //此处要开启手动应答,因为如果设置为true(自动应答)就不存在拒绝消息这一说了
        channel.basicConsume(NORMAL_QUEUE, false, (consumersTag, message) -> {
            String msg = new String(message.getBody(), "UTF-8");
            if(msg.equals("info5")){
                System.out.println("Consumer01接收到的消息: " + msg + ": 此消息是被C1拒绝的");
                //拒绝消息,并设置不重新添加到队列 >> 变成死信消息
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
            }else{
                System.out.println("Consumer01接收到的消息: " + msg);
            }
        }, consumersTag -> {});
    }
}

在这里插入图片描述
在这里插入图片描述
请添加图片描述

二、延迟队列

  • 此部分开始,我们不再使用简单的Maven项目,而是创建一个SpringBoot项目来演示

2.1 延迟队列的概念

  • 延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理
  • 简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列
  • 对于我们死信队列的C2消费者来说,它就是一个延迟队列

2.2 延迟队列使用场景

  • 1.订单在十分钟之内未支付则自动取消
  • 2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  • 3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
  • 4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  • 5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

在这里插入图片描述
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。【引出我们的延迟队列】

2.3 RabbitMQ 中的 TTL

  • TTL 是什么呢?
    • TTLRabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。
    • 换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。
    • 如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL

🌔 1、如何为消息设置 TTL 呢?

  • 在我们使用 RabbitTemplateconvertAndSend 方法发送消息时,通过参数设置消息的过期时间 【SpringBoot】
    在这里插入图片描述

🌔 2、如何为队列设置 TTL 呢?

  • 在我们创建队列之间,设置队列 map 集合参数时,设置我们队列的过期时间 【SpringBoot】
    在这里插入图片描述

🌔 3、两者有什么区别呢?

  • 区别:
    • 第一种方式:如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),
    • 第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间
    • 另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期
    • 如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

2.4 整合 SpringBoot

在这里插入图片描述

🌔 1、创建项目

在这里插入图片描述
🌔 2、添加依赖 【此处直接贴上我们的pom.xml】

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.14</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.atguigu.rabbitmq</groupId>
    <artifactId>springboot-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq</name>
    <description>springboot-rabbitmq</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--RabbitMQ 测试依赖-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

🌔 3、修改配置文件 【连接到我们的RabbitMQ

spring.rabbitmq.host=8.130.95.101
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123

🌔 4、添加 Swagger 配置类

  • 根据在代码中使用自定义的注解来生成接口文档,这个在前后端分离的项目中很重要
  • 这样做的好处是 在开发接口时可以通过swagger 将接口文档定义好,同时也方便以后的维护
package com.atguigu.rabbitmq.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerConfig {
    @Bean
    public Docket webApiConfig() {
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
    }

    private ApiInfo webApiInfo() {
        return new ApiInfoBuilder()
                .title("rabbitmq 接口文档")
                .description("本文档描述了 rabbitmq 微服务接口定义")
                .version("1.0")
                .contact(new Contact("enjoy6288", "http://atguigu.com",
                        "1551388580@qq.com"))
                .build();
    }
}

2.5 队列 TTL

  • 就是通过死信队列的方式实现延迟队列,接下来直接通过案例演示
  • 首先,我们要知道:
    • 声明工作通过配置类完成 【就是定义声明队列、交换机】
    • 生产者通过controller完成 【以浏览器发送请求的方式让生产者发送消息】
    • 消费者通过监听器完成 【通过监听器监听对应的队列,来完成消费者的功能】

1️⃣ 代码架构图

  • 创建两个队列 QAQB,两者队列 TTL 分别设置为 10S40S
  • 然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct
  • 创建一个死信队列 QD,它们的绑定关系如下

在这里插入图片描述

2️⃣ 配置类代码 【定义队列、交换机部分】

package com.atguigu.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Bonbons
 * @version 1.0
 * 延迟队列演示配置类
 */
@Configuration
public class TtlQueueConfig {
    //普通交换机
    public static final String X_EXCHANGE = "X";
    //死信交换机
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //普通队列
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信队列
    public static final String DEAD_LETTER_QUEUE = "QD";

    //声明我们的两个交换机
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //声明我们的两个普通队列
    @Bean("queueA")
    public Queue queueA(){
        Map<String, Object> arguments = new HashMap<>(3);//3代表初始map的长度
        //设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //设置routingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        //设置过期时间
        arguments.put("x-message-ttl", 10000);

        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }

    @Bean("queueB")
    public Queue queueB(){
        Map<String, Object> arguments = new HashMap<>(3);//3代表初始map的长度
        //设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //设置routingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        //设置过期时间
        arguments.put("x-message-ttl", 40000);

        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }

    //声明我们的死信队列
    @Bean("queueD")
    public Queue queueD(){
//        return new Queue(DEAD_LETTER_QUEUE); //两种创建队列的方式均可
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    //将我们的两个普通队列绑定到普通交换机X上
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    //将我们的死信队列绑定到死信交换机上
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
                                  @Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }

3️⃣ 消息生产者 【编写一个处理请求的控制器】

package com.atguigu.rabbitmq.controller;

import com.atguigu.rabbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * @author Bonbons
 * @version 1.0
 */
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
    @Autowired
    RabbitTemplate rabbitTemplate;
    //发送我们基础死信消息
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{}, 发送一条消息给两个TTL队列: {}", new Date().toString(), message);
        rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列" + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列" + message);
    }
}

4️⃣ 消息消费者 【一个组件类里定义Rabbit监听器】

package com.atguigu.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


import java.util.Date;

/**
 * @author Bonbons
 * @version 1.0
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {
    //接收消息
    @RabbitListener(queues = "QD")
    public void receiveMsg(Message message, Channel channel) throws Exception{
        String msg = new String(message.getBody());
        log.info("当前时间: {}, 收到死信队列的消息: {}", new Date().toString(), msg);
    }
}
  • 接下来我们通过浏览器发送请求进行测试:

请添加图片描述
请添加图片描述

  • 第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了
  • 但是基于上述这种情况,每增加一个新的时间需求,就要新增一个队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

2.6 延迟队列优化

  • 就是我们先不为队列设置 TTL,而是通过浏览器发起请求的占位符来携带我们的过期时间
    • 注意此处的过期时间,我们是设置给消息的,并没有设置给队列
  • 映射URL绑定的占位符 带占位符的URL是 Spring3.0 新增的功能

1️⃣ 代码架构图

  • 基于上面通过TTL设置的延迟队列,我们新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间

在这里插入图片描述
2️⃣ 配置类代码

  • 只需要在我上面的配置类 TtlQueueConfig.java 代码里增加下面的内容
//优化我们的延迟队列,再设置一个普通队列,不为其设置过期时间,让它做公共队列
    public static final String QUEUE_C = "QC";
    //声明这个队列
    @Bean
    public Queue queueC(){
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //设置routingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
    }
    //绑定到我们的交换机X
    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }

3️⃣ 消息生产者代码 【增加了一个新的请求映射】

  • 在我们上面的生产者代码文件中进行添加即可
//发送我们优化后的延迟队列的消息
    @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message, @PathVariable String ttlTime){
        log.info("当前时间:{}, 发送一条时长为{}毫秒的消息给TTL队列QC: {}", new Date().toString(), ttlTime, message);
        //使用springboot为我们提供的rabbitTemplate来发送延迟消息
        rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
            //设置过期时间
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }
  • 对于消费者代码不变,接下来我们发起请求进行测试:
    • 我们先发起一个过期时间长的请求,再发起一个过期时间短的请求,这样便于说明优化后延迟队列的问题

请添加图片描述

请添加图片描述

  • 看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“
  • 因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

2.7 Rabbitmq 插件实现延迟队列

请添加图片描述

  • 对于上文优化后延迟队列存在的问题,我们可以通过使用 RabbitMQ 为我们提供的插件解决
    • 我们需要知道,这种基于插件的==实现延迟的效果是在交换机处完成的 ==

1️⃣ 安装延迟队列插件 【在我们的第一篇配置文章资源包里有】

  • 在官网上下载 https://www.rabbitmq.com/community-plugins.html,下载 rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录
    在这里插入图片描述

  • 进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ

    • cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
    • rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    • rabbitmqctl start_app
      在这里插入图片描述
  • 通过 RabbitMQweb 控制台可以看到添加插件前后交换机处的变换

在这里插入图片描述

  • 插件安装好了之后,就可以演示如何使用我们基于插件的方式实现延迟队列

2️⃣ 代码架构图

  • 在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:
    在这里插入图片描述

3️⃣ 配置类代码

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中

新建的配置类

package com.atguigu.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Bonbons
 * @version 1.0
 */
@Configuration
public class DelayedQueueConfig {
    //定义我们的交换机、队列、RoutingKey
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    //声明交换机
    @Bean("delayedExchange")
    public CustomExchange delayedExchange(){
        Map<String, Object> arguments = new HashMap<>();
        //延迟类型
        arguments.put("x-delayed-type", "direct");
        /**
         * 1、交换机的名字
         * 2、交换机的类型 [指明是延迟消息]
         * 3、是否需要持久化
         * 4、是否开启自动删除
         * 5、其他参数
         */
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message",
                true, false, arguments);
    }
    //声明队列
    @Bean
    public Queue delayedQueue(){
        return new Queue(DELAYED_QUEUE_NAME);
    }
    //将交换机与队列进行绑定
    @Bean
    public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
                                                      @Qualifier("delayedExchange") CustomExchange delayedExchange){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

4️⃣ 消息生产者

  • 在原来的生产者代码里添加
    //发送我们使用插件的延迟队列消息
    @GetMapping("/sendDelayedMsg/{message}/{delayedTime}")
    public void sendMsg(@PathVariable String message, @PathVariable Integer delayedTime){
        log.info("当前时间:{}, 发送一条时长为{}毫秒的消息给延迟队列delayed.queue: {}", new Date().toString(), delayedTime, message);
        rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
            //设置延迟消息
            msg.getMessageProperties().setDelay(delayedTime);
            return msg;
        });
    }

5️⃣ 消息消费者

新的监听器

package com.atguigu.rabbitmq.consumer;

import com.atguigu.rabbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * @author Bonbons
 * @version 1.0
 */
@Slf4j
@Component
public class DelayQueueConsumer {
    //设置监听器
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayQueue(Message message){
        String msg = new String(message.getBody());
        log.info("当前时间: {}, 收到延迟队列的消息: {}", new Date().toString(), msg);
    }
}
  • 发起请求,测试效果

请添加图片描述
请添加图片描述

6️⃣ 总结

  • 延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。
  • 另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
  • 当然,延时队列还有很多其它选择,比如利用 JavaDelayQueue,利用 Rediszset,利用 Quartz或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/417693.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

云擎未来,智信天下 | 2023移动云大会来了!

新三年&#xff0c;新征程 2023年作为新三年开局之年 移动云又将以怎样的 全新品牌形象、全新战略规划 向“一流云服务商”战略目标勇毅前行&#xff1f; 答案就在这里&#xff1a; 2023移动云大会&#xff0c;官宣定档&#xff01; 2023.4.25 - 4.26 苏州金鸡湖国际会…

MATLAB配置C/C++库(Visual Studio,MinGW-w64 C/C++ 编译器)问题(包括低版本matlab配置高版本VS)

问题描述 使用matlab加载C语言的库函数时&#xff0c;需要提前配置好C/C编译器&#xff0c;否则在matlab中使用 loadlibrary 加载C /C库中的函数时候&#xff0c;会报错&#xff1a; “未找到支持的编译器或 SDK。您可以安装免费提供的 MinGW-w64 C/C 编译器&#xff1b;请参…

软考第三章 广域通信网

广域通信网 1.公共交换电话网 公共交换电话网PSTN&#xff1a;是为了话音通信而建立的网络&#xff0c;在有些地方用户仍然通过电话线拨号上网 1.1 电话系统的结构 电话系统是一个高度冗余的分级网络。用户电话通过一对铜线连接到最近的端局。 公共电话网由本地网和长途网组…

一文速学数模-最优化算法(二)梯度下降算法一文详解+Python代码

目录 前言 一、梯度下降法简述 二、梯度下降算法原理理解 1.梯度 2.梯度定义 3.梯度下降 4.损失函数(loss function) 5.学习率(步长) 三、梯度下降算法代码展示 消失和爆炸梯度 前言 最近会不断更新深度学习系列文章(全实战性可运行代码)加入到我的一文速学-数学建模…

Git项目同时推送到GitHub和Gitee详细操作

文章目录前言一、创建仓库【Create a new repository】二、初始化三、配置公钥四、密钥验证五、代码推送总结前言 将Git项目同时推送到GitHub和Gitee的好处如下&#xff1a; 提高代码可见性和协作性&#xff1a;GitHub和Gitee都是知名的代码托管平台&#xff0c;推送代码到这两…

大数据能力提升项目|学生成果展系列之五

导读为了发挥清华大学多学科优势&#xff0c;搭建跨学科交叉融合平台&#xff0c;创新跨学科交叉培养模式&#xff0c;培养具有大数据思维和应用创新的“π”型人才&#xff0c;由清华大学研究生院、清华大学大数据研究中心及相关院系共同设计组织的“清华大学大数据能力提升项…

JavaScript【十】JavaScript事件

文章目录&#x1f31f;前言&#x1f31f;事件&#x1f31f;绑定事件的方式&#xff1a;&#x1f31f;标签绑定事件&#xff1a;&#x1f31f;Document对象来绑定事件&#xff1a;on事件type&#x1f31f; 事件监听:使同一个对象的同一事件绑定多个事件处理程序。兼容IE9及以上。…

Zephyr RTOS应用开发(nrf5340)

目录 概述 开发环境安装 创建一个新的Zephyr应用 构建应用并刷写到开发板 概述 Zephyr™项目是一个采用Apache 2.0协议许可&#xff0c;Linux基金会托管的协作项目。针对低功耗、小型内存微处理器设备开发的物联网嵌入式小型、可扩展的实时操作系统&#xff0c;支持多种硬件…

redis哨兵机制详解

文章目录前言监控&#xff08;Monitoring&#xff09;自动故障转移&#xff08;Automatic failover&#xff09;配置提供者&#xff08;Configuration provider&#xff09;通知&#xff08;Notification&#xff09;哨兵集群的组建哨兵监控Redis库主库下线的判定主观下线客观下…

ORB-SLAM2原理分析

原理分析 ORB-SLAM2是一种基于单目、双目和RGB-D相机的实时视觉SLAM系统&#xff0c;用于在无GPS信号或有限的传感器信息情况下&#xff0c;构建三维地图并定位相机的位置和姿态。ORB-SLAM2采用了ORB特征点提取和描述符匹配技术&#xff0c;以及图优化和闭环检测算法&#xff…

分布式系统监控zabbix安装部署及使用

目录 一、zabbix监控 1、什么是zabbix 2、zabbix功能 3、zabbix运行机制 4、zabbix的3种架构 ①C/S架构 ②zabbix-proxy-client架构 ③master-zabbix-client架构 5、zabbix工作原理及数据走向 6、zabbix监控模式 7、zabbix部署 8、zabbix图形化页面显示设置 二、Z…

技术复盘(3)--ElasticSearch

技术复盘--ElasticSearch技术复盘(3)--ElasticSearch资料地址概述对比solrwindows下使用esIK分词器介绍es基本命令集成springboot以及调用api技术复盘(3)–ElasticSearch ElasticSearch7.x 资料地址 ElasticSearch官网&#xff1a;https://www.elastic.co/ ElasticSearch-he…

unity3d:网络同步,状态同步,源码,C#服务器demo

协议数据单元 网络同步包最小单元PDU // 预测的基础数据类型 public class PDU { public uint UID; //玩家的唯一id public PDUType type; //PDU类型 public Vector3 position; // 位置 public Vector3 forward; // 朝向 public float speed; // 速度: 速度为…

【STL十一】无序容器(哈希容器)—— unordered_map、unordered_set

【STL十一】无序容器&#xff08;哈希容器&#xff09;—— unordered_map、unordered_set一、简介1、关联容器和无序容器不同2、无序容器特点二、头文件三、模板类四、无序容器的内部结构1、管理桶2、内部结构五、unordered_map成员函数1、迭代器2、元素访问3、容量4、修改操作…

CV大模型应用:Grounded-Segment-Anything实现目标分割、检测与风格迁移

Grounded-Segment-Anything实现目标分割、检测与风格迁移 文章目录Grounded-Segment-Anything实现目标分割、检测与风格迁移一、Segment-Anything介绍二、Grounded-Segment-Anything1、简介2、测试一、Segment-Anything介绍 代码链接&#xff1a;https://github.com/facebookr…

Direct3D 12——混合——雾

实现雾化效果的流程如下&#xff1a;如图所示&#xff0c;首先指明雾的颜色、由摄像机到雾气的最近距离以及雾 的分散范围(即从雾到摄像机的最近距离至雾能完全覆盖物体的这段范围)&#xff0c;接下来再将网格三角形上点 的颜色置为原色与雾色的加权平均值&#xff1a; foggedC…

Python爬虫之多线程加快爬取速度

之前我们学习了动态翻页我们实现了网页的动态的分页&#xff0c;此时我们可以爬取所有的公开信息了&#xff0c;经过几十个小时的不懈努力&#xff0c;一共获取了 16万 条数据&#xff0c;但是软件的效率实在是有点低了&#xff0c;看了下获取 10 万条数据的时间超过了 56 个小…

【技巧】Word“只读方式”的设置与取消

如果你担心在阅读Word文档的时候&#xff0c;不小心修改并保存了内容&#xff0c;那就给文档设置“只读方式”吧&#xff0c;这样就算不小心做了修改也不能随意保存。 Word文档的“只读方式”有两种模式&#xff0c;对此不清楚的小伙伴&#xff0c;来看看如何设置和取消吧。 模…

第一次作业

作业内容&#xff1a;1&#xff0c;atd和crond的区别 2&#xff0c;指定在2023/08/26 09&#xff1a;00将时间写入testmail.txt文件中 3&#xff0c;指定在每天凌晨4&#xff1a;00将该时间点之前的系统日志信息备份到个目录下&#xff08;/var/log/messages &#xff09;&…

华为手表开发:WATCH 3 Pro(17)传感器订阅指南针

华为手表开发&#xff1a;WATCH 3 Pro&#xff08;17&#xff09;传感器订阅指南针初环境与设备指南针传感器介绍与说明鸿蒙开发文件夹&#xff1a;文件新增展示的文本标记index.hmlindex.cssindex.js初 希望能写一些简单的教程和案例分享给需要的人 鸿蒙可穿戴开发 环境与设…