延迟队列与SpringBoot实战

news2025/1/9 2:32:04

延迟队列与SpringBoot实战

概念

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列

TTL介绍

TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

设置TTL
  • 消息设置TTL

    rabbitTemplate.convertAndSend("X", "XC", message + "ttl:" + ttl, msg -> {
                msg.getMessageProperties().setExpiration(ttl);
                return msg;
            });
    
  • 队列设置TTL

    args.put("x-message-ttl",15000);
    QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    
  • 如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃

代码实战

配置POM
<dependencies>
        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--RabbitMQ 测试依赖-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
配置application
spring.rabbitmq.host=192.168.31.232
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
配置Swagger
package com.vmware.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerConfig {
    @Bean
    public Docket webApiConfig() {
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select().build();
    }

    private ApiInfo webApiInfo() {
        return new ApiInfoBuilder()
                .title("rabbitmq 接口文档")
                .description("本文档描述了 rabbitmq 微服务接口定义")
                .version("1.0")
                .contact(new Contact("name", "url",
                        "email"))
                .build();
    }
}
代码架构图

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是direct,创建一个死信队列 QD,它们的绑定关系如下

在这里插入图片描述

RabbitMQ配置类
package com.vmware.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {
    //普通交换机
    private static final String X_EXCHANGE = "X";
    //死信交换机
    private static final String Y_EXCHANGE = "Y";
    //普通队列A
    private static final String QUEUE_A = "QA";
    //普通队列B
    private static final String QUEUE_B = "QB";
    //普通队列C
    private static final String QUEUE_C = "QC";
    //死信队列D
    private static final String QUEUE_D = "QD";


    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_EXCHANGE);
    }

    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> args = new HashMap<>();
        //设置死信交换机
        args.put("x-dead-letter-exchange", Y_EXCHANGE);
        //设置死信Routing Key
        args.put("x-dead-letter-routing-key", "YD");
        //设置超时
        args.put("x-message-ttl", 10000);
        //构建队列
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }

    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> args = new HashMap<>();
        //设置死信交换机
        args.put("x-dead-letter-exchange", Y_EXCHANGE);
        //设置死心Routing Key
        args.put("x-dead-letter-routing-key", "YD");
        //设置超时ttl
        args.put("x-message-ttl",15000);
        //构建队列
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }

    @Bean("queueC")
    public Queue queueC(){
        Map<String,Object> args=new HashMap<>();
        //设置死信交换机
        args.put("x-dead-letter-exchange", Y_EXCHANGE);
        //设置死信Routing Key
        args.put("x-dead-letter-routing-key", "YD");
        //构建队列
        return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
    }

    @Bean("queueD")
    public Queue queueD(){
        //构建死信队列D
        return QueueBuilder.durable(QUEUE_D).build();
    }

    //绑定普通交换机和队列A
    @Bean
    public Binding queueABindingX(){
        return BindingBuilder.bind(queueA()).to(xExchange()).with("XA");
    }

    //绑定普通交换机与队列B
    @Bean
    public Binding queueBBindingX(){
        return BindingBuilder.bind(queueB()).to(xExchange()).with("XB");
    }

    //绑定普通交换机与队列C
    @Bean
    public Binding queueCBindingX(){
        return BindingBuilder.bind(queueC()).to(xExchange()).with("XC");
    }

    //绑定死信交换机与死信队列
    @Bean
    public Binding queueDBindingY(){
        return BindingBuilder.bind(queueD()).to(yExchange()).with("YD");
    }
}
生产者
package com.vmware.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.Date;

@RestController
@RequestMapping("/ttl")
@Slf4j
public class SendMsgController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @param message 消息
     * @apiNote 生产者代码
     */
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送消息给两个队列:{}", new Date(), message);
        rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10秒的队列" + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为15秒的队列" + message);
    }

    @GetMapping("/sendMsg/{message}/{ttl}")
    public void sendMsg(@PathVariable String message, @PathVariable String ttl) {
        rabbitTemplate.convertAndSend("X", "XC", message + "ttl:" + ttl, msg -> {
            msg.getMessageProperties().setExpiration(ttl);
            return msg;
        });
        log.info("当前时间:{},发送消息:{}给队列:XC,ttl:{}", new Date(), message, ttl);
    }
}

消费者
package com.vmware.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @RabbitListener(queues = {"QD"})
    public void receiveD(Message message, Channel channel) {
        log.info("当前时间:{} 死信队列收到消息:{}", new Date(), message);
    }
}
存在的问题

当生产者发布消息到延迟队列后,消息只能按顺序被消费者消费,当某一消息阻塞时间很长时则会导致其他消息一同阻塞,不能达到ttl到期优先被延时队列的消费者所消费的效果

优化

下载插件rabbitmq_delayed_message_exchange到rabbit的plugin目录下

  • 官网:https://www.rabbitmq.com/community-plugins.html

  • ubuntu下载方式

    cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.2/plugins
    sudo wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
    
  • 启用插件

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
  • 重启服务

    systemctl restart rabbitmq-server
    
  • 安装完成后可以在rabbit交换机页面看到x-delayed-message
    在这里插入图片描述

基于插件的延时队列代码实战

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中
在这里插入图片描述

配置延时队列与交换机
package com.vmware.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayQueueConfig {
    private static final String DELAY_QUEUE_NAME = "delayed.queue";
    private static final String DELAY_EXCHANGE_NAME = "delayed.exchange";
    private static final String DELAY_ROUTING_KEY = "delayed.routingkey";

    @Bean
    public Queue delayQueue(){
        return new Queue(DELAY_QUEUE_NAME);
    }

    @Bean
    public CustomExchange delayExchange(){
        Map<String,Object> args =new HashMap<>();
        args.put("x-delayed-type", "direct");
        /**
         * 1.交换机名称
         * 2.交换机类型:插件类型
         * 3.是否持久化
         * 4.是否自动删除
         */
        return new CustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false,args);
    }

    @Bean
    public Binding delayQueueBindExchange(){
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY).noargs();
    }
}
生产者
package com.vmware.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.Date;

@RestController
@RequestMapping("/ttl")
@Slf4j
public class SendMsgController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
        rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingkey", message, msg -> {
            msg.getMessageProperties().setDelay(delayTime);
            return msg;
        });
        log.info("当前时间:{},发送一条延迟{}毫秒的信息给队列 delayed.queue:{}", new Date(), delayTime, message);
    }
}
消费者
package com.vmware.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @RabbitListener(queues = {"delayed.queue"})
    public void receiveDelayedQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延时队列的消息:{}", new Date(), msg);
    }
}
实际效果
2022-07-19 23:33:18.021  INFO 23040 --- [nio-8080-exec-4] com.vmware.controller.SendMsgController  : 当前时间:Tue Jul 19 23:33:18 CST 2022,发送一条延迟20000毫秒的信息给队列 delayed.queue:哈哈哈
2022-07-19 23:33:23.349  INFO 23040 --- [nio-8080-exec-5] com.vmware.controller.SendMsgController  : 当前时间:Tue Jul 19 23:33:23 CST 2022,发送一条延迟2000毫秒的信息给队列 delayed.queue:哈
2022-07-19 23:33:25.332  INFO 23040 --- [ntContainer#0-1] c.v.consumer.DeadLetterQueueConsumer     : 当前时间:Tue Jul 19 23:33:25 CST 2022,收到延时队列的消息:哈
2022-07-19 23:33:37.830  INFO 23040 --- [ntContainer#0-1] c.v.consumer.DeadLetterQueueConsumer     : 当前时间:Tue Jul 19 23:33:37 CST 2022,收到延时队列的消息:哈哈哈
  • 可以看到前一条延时消息并没有阻塞到后面的消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/484541.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于jQuery------购物车案例

目录 基于jQuery------购物车案例 案例&#xff1a;购物车案例模块-增减商品数量分析 案例&#xff1a;购物车案例模块-修改商品小计分析 案例&#xff1a;购物车案例模块-计算总计和总额 案例&#xff1a;购物车案例模块-删除商品模块 案例&#xff1a;购物车案例模块-选…

从‘discover.partitions‘=‘true‘分析Hive的TBLPROPERTIES

从’discover.partitions’true’分析Hive的TBLPROPERTIES 前言 Hive3.1.2先建表&#xff1a; show databases ;use db_lzy;show tables ;create external table if not exists test_external_20230502(id int,comment1 string,comment2 string ) stored as parquet ;creat…

C语言通过控制台命令行传入参数

Linux 与 windows运行c语言程序 切换到对应目录下 1. gcc hello.c -o hello 2.Linux: ./hello Windows: hello.exe int main(){}默认无参数 但在一些情况下想要直接通过在上述过程中第二步就传入参数而不是使用scanf..之类的输入语句就需要使用有参数的main方法: int main() {…

Docker--harbor私有库部署与管理

目录 一、本地私有仓库 搭建本地私有仓库 Docker容器的重启策略 二、Harbor 1、什么是Harbor 2、Harbor特性 3、Harbor的构成 三、Harbor部署 实验步骤 1、安装Docker-Compose服务 2、部署Harbor服务 1、下载或上传Harbor安装程序 2、修改Harbor安装的配置文件 3、…

基于TI板MSP430 玩转PID

文章目录 前言一、整体框架二、PID算法1. 位置式PID2. 增量式PID3. 比例外置式PID4. 积分限幅、输出限幅和PID参数整定5. 位置式PID和增量式PID的区别及抉择 三、初值获取1. 定时器输入捕获2. 外部中断3. ADC采样 前言 具体啥是PID&#xff0c;我这里不做介绍&#xff0c;网上…

SpringMVC(后)SSM整合

10、文件上传和下载 10.1、文件下载 ResponseEntity用于控制器方法的返回值类型&#xff0c;该控制器方法的返回值就是响应到浏览器的响应报文 使用ResponseEntity实现下载文件的功能 RequestMapping("/testDown") public ResponseEntity<byte[]> testResp…

【Hello Algorithm】复杂度 二分法

作者&#xff1a;小萌新 专栏&#xff1a;算法 作者简介&#xff1a;大二学生 希望能和大家一起进步 本篇博客简介&#xff1a;介绍算法的复杂度 对数器和二分法 复杂度 对数器 二分法 复杂度常数时间操作非常数时间操作时间复杂度空间复杂度 二分法有序数组中找一个值寻找有序…

树的存储和遍历

文章目录 6.5 树与森林6.5.1 树的存储结构1. 双亲表示法(顺序存储结构)2 孩子链表表示法3 孩子兄弟表示法(二叉树表示法) 6.5.2 森林与二叉树的转换1 树转换成二叉树2 二叉树转换成树3 森林转换成二叉树4 二叉树转换成森林 6.5.3 树和森林的遍历1. 树的遍历2. 森林的遍历 6.6 赫…

数据库篇:表设计、创建编辑以及导出导入数据

微信小程序云开发实战-答题积分赛小程序系列 数据库篇:表设计、添加编辑以及导出导入数据 原型: 最终实现界面截图:

Moqui REST API的两种实现方法

实现Restful API的方法 实现REST API有两种方法。第一种&#xff1a; The main tool for building a REST API based on internal services and entity operations is to define resource paths in a Service REST API XML file such as the moqui.rest.xml file in moqui-fr…

chatGPT国内可用镜像源地址

chatGPT国内可用镜像源地址 彷丶徨丶 关注 IP属地: 湖北 0.811 2023.03.15 16:02:16 字数 1,152 阅读 249,582 如果你正在尝试访问Chatgpt网站&#xff0c;但由于某些原因无法访问该网站&#xff0c;那么你可以尝试使用Chatgpt的国内镜像网站。以下是一些Chatgpt国内镜像网站的…

java基础知识——27.动态代理

这篇文章&#xff0c;我们来学一下java的动态代理 目录 1.动态代理的介绍 2.具体的代码实现 1.动态代理的介绍 动态代理&#xff1a;无侵入式的额外给代码增加功能 很不好理解&#xff0c;下面&#xff0c;我们通过两个例子来说明一下什么是动态代理&#xff1a; 例一&a…

shell编程 -- 基础

shell是一个命令行解释器&#xff0c;它接收应用程序/用户命令&#xff0c;然后调用操作系统内核。 linux笔记 链接&#xff1a;https://pan.baidu.com/s/16GZCPfUTRzUqIyGnYwPuUg?pwds5xt 提取码&#xff1a;s5xt 脚本执行 采用bash或者sh脚本的相对路径或绝对路径&#x…

TikTok跨境电商如何选品和营销?

鑫优尚电子商务&#xff1a;TikTok目前发展飞速&#xff0c;全球的MAU是5.6亿。现在作为全球炙手可热的短视频平台&#xff0c;全球流量相当庞大&#xff0c;覆盖75个语种、全球150个国家和地区。 对于从事跨境电商行业的人来说&#xff0c;又怎能错过一个流量这么好的平台呢&a…

ChatGPT注册详细步骤教程-ChatGPT申请教程

注册chatGPT账号的详细经验教程 注册ChatGPT账号是使用这一自然语言生成技术的关键步骤。下面是注册ChatGPT账号的详细经验教程&#xff1a; 访问OpenAI注册页面 在Web浏览器中打开OpenAI注册页面。 2.输入个人信息 在注册页面上&#xff0c;您需要提供以下个人信息&#…

树莓派 二维云台调零控制

目录 舵机的工作原理 案例程序 要求&#xff1a; 程序&#xff1a; 二维云台是通过IIC总线进行控制的&#xff0c;我们可以通过窗口命令输入&#xff1a;i2cdetect -y 1来检测IIC总线是否连接正常。 当有40显示的时候就说明IIC总线正常。 操控舵机我们需要一个PCA9685的模…

【移动端网页布局】流式布局案例 ④ ( Banner 栏制作 | 固定定位 | 标准流 | 百分比宽度设置 )

文章目录 一、Banner 栏样式及核心要点1、实现效果2、核心要点分析 二、完整代码示例1、HTML 标签结构2、CSS 样式3、展示效果 一、Banner 栏样式及核心要点 1、实现效果 在上一篇博客中 , 实现了 搜索栏 , 在本篇博客开始实现 搜索栏 下方的 Banner 栏 ; 2、核心要点分析 Bann…

OpenCV实战(21)——基于随机样本一致匹配图像

OpenCV实战&#xff08;21&#xff09;——基于随机样本一致匹配图像 0. 前言1. 基于随机样本一致匹配图像1.1 计算基本矩阵与匹配集1.2 随机样本一致算法 2. 算法优化2.1 优化基本矩阵2.2 优化匹配集 3. 完整代码小结系列链接 0. 前言 当两台摄像机拍摄同一场景时&#xff0c…

【Vue面试题】Vue2.x生命周期?

文章目录 1.有哪些生命周期&#xff08;系统自带&#xff09;?beforeCreate( 创建前 )created ( 创建后&#xff09;beforeMount (挂载前)mount (挂载后)beforeUpdate (更新前)updated (更新后)beforeDestroy&#xff08;销毁前&#xff09;destroy&#xff08;销毁后&#xf…

突发:深度学习之父Hinton为了警告AI的风险,不惜从谷歌离职!

‍数据智能产业创新服务媒体 ——聚焦数智 改变商业 今天&#xff0c;AI领域发生了一件标志性事件。那就是Hinton 为了能更自由的表达对AI失控的担忧&#xff0c;不惜从工作了10年的谷歌离职&#xff0c;可见他真的深切的感受到了危机。 不久前&#xff0c;纽约时报的一篇采访…