RabbitMQ-延迟队列

news2025/1/11 20:03:56
  一、介绍

         延迟队列,队列内部是有序的,最重要的特性就体现在他的延迟属性上,延时队列中的元素是希望在指定时间到了或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。       

  二、springboot整合  

        1、引入相关依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!--RabbitMQ依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--RabbitMQ测试依赖-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

        2、添加相关配置

spring.rabbitmq.host=192.168.xxx.165
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123

      3、添加swagger配置类

@Configuration
@EnableSwagger2
public class SwaggerConfig {
   public Docket webApiConfig(){
       return new Docket (DocumentationType.SWAGGER_2)
               .groupName("webApi")
               .apiInfo(webApiInfo())
               .select()
               .build();
   }
   private ApiInfo webApiInfo(){
       return new ApiInfoBuilder()
               .title("rabbitmq接口文档")
               .description("本文档描述了rabbitmq微服务接口定义")
               .version("1.0")
               .contact(new Contact("zhangsan","xxx","954544@qq.com"))
               .build();
   }
}

       4、添加rabbitMQ配置类

@Configuration
public class TtlQueueConfig {
     //普通交换机名称
    public static final String X_EXCHANGE ="X";
    //死信交换机名称
    public static final String Y_DEAD_LETTER_EXCHANGE ="Y";
    //普通队列名称
    public static final String QUEUE_A ="QA";
    public static final String QUEUE_B ="QB";
    //死信队列名称
    public static final String DEAD_LETTER_QUEUE ="QD";

    //声明xExchange
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }
    //声明yExchange
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }
    //声明队列  ttl为10s
    @Bean("queueA")
    public Queue queueA(){
        HashMap<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信routingKey
        arguments.put("x-dead-letter-routing-key","YD");
        //设置TTL 单位ms
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }
    //声明队列  ttl为10s
    @Bean("queueB")
    public Queue queueB(){
        HashMap<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信routingKey
        arguments.put("x-dead-letter-routing-key","YD");
        //设置TTL 单位ms
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }
    //死信队列
    @Bean("queueD")
    public Queue queueD(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }
    //绑定
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
            @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
            @Qualifier("yExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueD).to(xExchange).with("YD");
    }
}

       5、生产者代码

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMessageController {
      
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //开始发消息
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{},发送一条消息给两个队列:{}",new Date().toString(),message);
        rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列"+message);
        rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列"+message);
    }
}

        6、消费者代码

Slf4j
@Component
public class DeadLetterQueueConsumer {
      //接收消息
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列的消息:{}",new Date(),msg);

    }
}
  三、延时队列优化

        基于上面代码增加第三个普通队列配置,以及修改生产者代码

        增加配置

//普通队列名称
    public static final String QUEUE_C ="QC";

    //声明队列QC
    @Bean("queueC")
    public Queue queueC(){
        Map<String, Object> arguments = new HashMap<>(2);
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        arguments.put("x-dead-letter-routing-key","YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
    }
    //绑定
    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }

      另写一个生产者

@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
        log.info("当前时间:{},发送一条时长{}毫秒TTl信息给队列QC:{}",new Date(),ttlTime,message);
        rabbitTemplate.convertAndSend("X","XC",message,(msg)->{
            //发送消息的时候 延迟时长
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }

      以上代码会引发出另外一个问题:就是如果使用在消息属性上设置ttl的方式,消息可能并不会按时“死亡”,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时间很长,而第二个消息延时时间很短,第二个消息并不会得到优先执行。

  四、RabbitMQ插件实现延迟队列

        1、安装延时队列插件

             插件地址:https://www.rabbitmq.com/community-plugins.html

             插件名称:rabbitmq_delayed_message_exchange

             解压放到RabbitMQ的插件目录。

            进入RabbitMQ的安装目录下的plugins目录,执行下面命令让该插件生效,然后重启RabbitMQ(重启之前先执行chmod 644 rabbitmq_delayed_message_exchange-3.8.0.ez 给该文件添加读权限)

/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable  rabbitmq_delayed_message_exchange
systemctl restart rabbitmq-server

   重启之后即可看到(本质其实作用在了交换机)

     2、添加延时队列配置

public class DelayedQueueConfig {
    //交换机
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    //队列
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    //routingKey
    public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";
    
    //声明交换机
    @Bean
    public CustomExchange delayedExchange(){
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type","direct");
        /**
         * 参数
         * 1、交换机名称
         * 2、交换机类型
         * 3、是否需要持久化
         * 4、是否需要自动删除
         * 5、其他参数
         */
        return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",
                true,false,arguments);
    }
    
    //声明队列
    @Bean
    public Queue delayedQueue(){
        return new Queue(DELAYED_QUEUE_NAME);
    }
    //绑定
    @Bean
    public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
                                                      @Qualifier("delayedExchange") CustomExchange delayedExchange){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    } 
}

       生产者

//开始发消息 基于插件延时消息
    @GetMapping("/sendDelayedMsg/{message}/{delayedTime}")
    public void sendMsg(@PathVariable String message,@PathVariable Integer delayedTime){
        log.info("当前时间:{},发送一条时长{}毫秒延迟信息给队列delayed.queue:{}",new Date(),delayedTime,message);
        rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
                DelayedQueueConfig.DELAYED_EXCHANGE_NAME,message,(msg)->{
            //发送消息的时候 延迟时长
            msg.getMessageProperties().setDelay(delayedTime);
            return msg;
        });
    }

        消费者

@Slf4j
@Component
public class DelayedQueueConsumer {
     @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message, Channel channel){
         String msg = new String(message.getBody());
         log.info("当前时间:{},收到延迟队列消息:{},",new Date().toString(),msg);
     }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/340972.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TCP的协议格式 --- 20字节固定长度 + 40字节可选数据

目录 一、 20字节的固定长度 16位源端口和16位目的端口号&#xff0c;32位序号&#xff0c;32位确认属序号&#xff0c;4位首部长度&#xff08;需要乘4&#xff09; 保留&#xff08;6位&#xff09; 16位窗口大小 16位的校验和16位的紧急指针 二、40字节可选数据 1.2.1、…

软件设计师教程(六)计算机系统知识-操作系统知识

软件设计师教程 软件设计师教程&#xff08;一&#xff09;计算机系统知识-计算机系统基础知识 软件设计师教程&#xff08;二&#xff09;计算机系统知识-计算机体系结构 软件设计师教程&#xff08;三&#xff09;计算机系统知识-计算机体系结构 软件设计师教程&#xff08;…

最新中文版FL Studio21水果软件下载安装图文教程

FL Studio是目前流行广泛使用人数最多音乐编曲制作软件&#xff0c;这款软件相信广大网友并不陌生&#xff0c;今天带来的是FL中文版本&#xff0c;所有的功能都能在线编辑&#xff0c;用户直接就能操作&#xff0c;同时因为是21水果是最新版&#xff0c;所以增加了新的功能&am…

【Spring Cloud总结】1、服务提供者与服务消费者快速上手

目录 文件结构 代码 1、api 1.1实体类&#xff08;Dept &#xff09; 1.2数据库 2、provider 2.1 DeptController 2.2 DeptDao 2.3 DeptService 2.4 DeptServiceImpl 2.5 application.yml 3、consumer 3.1 ConfigBean 3.2 DeptConsumerController 测试 1.启动…

创建阿里云物联网平台

创建阿里云物联网平台 对云平台设备创建过程做记录&#xff0c;懒得再看视频 文章参考视频&#xff1a;https://www.bilibili.com/video/BV1jP4y1E7TJ?p26&vd_source50694678ae937a743c59db6b5ff46c31 阿里云&#xff1a;https://www.aliyun.com 1&#xff0e;物联网平…

基于jsp的网络电子相册的设计与实现

技术&#xff1a;Java、JSP等摘要&#xff1a;随着科学技术的不断进步&#xff0c;云技术以及大数据的不断完善&#xff0c;越来越多的网络忠实用户告别了冲洗相片的时代&#xff0c;他们更喜欢将相片上传至网络&#xff0c;这样就省去了携带和查找的麻烦&#xff0c;随时随地只…

大数据技术之Hudi

Hudi概述 1.1 Hudi简介 Apache Hudi&#xff08;Hadoop Upserts Delete and Incremental&#xff09;是下一代流数据湖平台。Apache Hudi将核心仓库和数据库功能直接引入数据湖。Hudi提供了表、事务、高效的upserts/delete、高级索引、流摄取服务、数据集群/压缩优化和并发&a…

Vue3 的基础使用(详细)

一、Vite创建Vue3 项目 npm init vitelatest vue3-ts-vite -- --template vue创建成功后用npm install命令安装依赖运行项目 vue3vite初始化项目的基础结构 启动成功的页面 二、Vue3基本语法 1、定义全局变量 <template><h1>{{msg}}</h1><div><a…

常见漏洞之 Fastjson

数据来源 01 Fastjson相关介绍 》Fastjson概述 》Fastjson历史漏洞 02 Fastson的识别与漏洞发现 》Fastjson寻找 》Fastjson漏洞发现&#xff08;利用 dnslog&#xff09; 03 修复建议 建议1&#xff1a;使用fastjson1.2.83版本&#xff1b; Github地址&#xff1a;https:…

MySQL 高级查询

目录1.左关联2.右关联3.子查询4.联合查询5.分组查询1.左关联 MySQL中的左关联&#xff08;Left Join&#xff09;是一种基于共同列的连接操作&#xff0c; 它将左侧表中的所有行与右侧表中匹配的行结合在一起&#xff0c; 如果右侧表中没有匹配的行&#xff0c;则结果集中右侧…

[数据库]基本数据类型

●&#x1f9d1;个人主页:你帅你先说. ●&#x1f4c3;欢迎点赞&#x1f44d;关注&#x1f4a1;收藏&#x1f496; ●&#x1f4d6;既选择了远方&#xff0c;便只顾风雨兼程。 ●&#x1f91f;欢迎大家有问题随时私信我&#xff01; ●&#x1f9d0;版权&#xff1a;本文由[你帅…

nodejs下载安装以及配置全局变量

一、下载 官网下载&#xff1a; 1、https://nodejs.org/dist/v10.16.3/node-v10.16.3-win-x64.zip 2、http://nodejs.cn/download/ 注&#xff1a;根据自己的项目对应电的nodejs版本去下载对应的&#xff0c;否则肯出现项目无法运行的情况 二、安装 无脑下一步即可&#xff0…

【2021/反事实/POI推荐】Improving location recommendation with urban knowledge graph

文章全文首发&#xff1a;码农的科研笔记&#xff08;公众号&#xff09; 原文&#xff1a;https://arxiv.org/abs/2111.01013 1 动机 位置推荐定义为推荐地理位置给用户&#xff0c;现有推荐无法无法很好的建模地理位置属性&#xff0c;这导致推荐结果是次优的。同时作者希望…

引入QQ邮箱发送验证码进行安全校验

最近想给自己的项目在注册时加点安全校验&#xff0c;本想着使用短信验证码&#xff0c;奈何囊中羞涩只能退而求次改用QQ邮箱验证注册~ 一.需求分析 场景&#xff1a;用户输入自己的邮箱&#xff0c;点击获取验证码&#xff0c;后台会发送一封邮件到对应邮箱中。 分析&#x…

element表单搜索框与表格高度自适应

一般在后台管理系统中&#xff0c;表单搜索框和表格的搭配是非常常见的&#xff0c;如下所示&#xff1a; 在该图中&#xff0c;搜索框有五个&#xff0c;分为了两行排列。但根据大多数的UI标准&#xff0c;搜索框默认只显示一行&#xff0c;多余的需要进行隐藏。此时的页面被…

【Flutter入门到进阶】跨平台相关-Flutter的选择

1.回顾Android渲染机制工作流程 1.1 图例 1.2 说明 1.Android内部自己通过skia引起完成图像构建 2.Android通过surfacefilinger来完成图像与驱动之间的处理 2 自建渲染引擎渲染方案 自建渲染引擎渲染方案&#xff0c;是有别于Web渲染采用WebView容器进行渲染UI、原生渲染…

软件设计(十)--计算机系统知识

软件设计&#xff08;九&#xff09;https://blog.csdn.net/ke1ying/article/details/128990035 一、效验码 奇偶效验&#xff1a;是一种最简单的效验方法。基本思想是&#xff1a;通过在编码中增加一个效验位来使编码中1的个数为奇数&#xff08;奇效验&#xff09;或者为偶…

微内核架构

QNX微内核架构 设计原则 最小化内核功能 将操作系统功能移到用户态&#xff0c;成为Server“服务”。在用户模块之间&#xff0c;通过消息传递机制通信。 在宏内核中&#xff0c;文件系统和磁盘驱动都是运行在内核态&#xff0c;应用通过内核调用文件系统&#xff0c;文件系…

Ubuntu搭建博客typecho

提示 见过这样类型的blog吧&#xff0c;现在就是最详细的搭建过程。 第一步 搭建apache2环境 安装命令 sudo apt -y install apache2 apache2-utils访问项目地址&#xff1a; /var/www/html配置文件在: /etc/apache2/日志在&#xff1a; /var/log/apache2/ 修改配置文件 vim…