RabbitMQ------延迟队列(整合SpringBoot以及使用延迟插件实现真正延时)(七)

news2024/12/30 2:04:05

RabbitMQ------延迟队列(七)

延迟队列

延迟队列,内部是有序的,特点:延时属性。
简单讲:延时队列是用来存放需要在指定时间被处理的元素队列。
是基于死信队列的消息过期场景。

适用场景

1.订单在十分钟之内未支付则自动取消。
2.用户注册后,三天内没有登陆,则短信提醒。
特点:需要在某个事件发生之后或者之前的特定事件点完成莫一项任务。

整合SpringBoot

导入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

<!--        RabbitMQ依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.34</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.6.1</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.6.1</version>
        </dependency>
 <!--        RabbitMQ测试依赖-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <version>2.4.7</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

在配置文件application.properties中写明rabbitmq的IP、端口、用户名以及密码

spring.rabbitmq.host=192.168.200.139
spring.rabbitmq.prot=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123

架构图如图所示
在这里插入图片描述
队列1设置的过期时间为10s,队列2设置的过期时间为40s。
代码分三部分:生产者、消费者、以及交换机队列整体作为一部分。
生产者和消费者都不在进行交换机以及队列的声明。
交换机以及队列配置类的书写:

/**
 * TTL队列  配置文件类代码
 */
@Configuration
public class TTLQueueConfig {
    //普通交换机
    public static final String X_EXCHANGE = "X";
    //死信交换机
    public static final String Y_EXCHANGE = "Y";
    //普通队列1 过期时间10s
    public static final String QA_QUEUE = "QA";
    //普通队列2 过期时间40s
    public static final String QB_QUEUE = "QB";
    //死信队列
    public static final String QD_QUEUE = "QD";

    //声明X交换机
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }

    //声明X交换机
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Y_EXCHANGE);
    }

    //声明普通队列1
    @Bean("queueA")
    public Queue queueA(){
        Map<String,Object> arguments = new HashMap<>();
        //设置死信队列
        arguments.put("x-dead-letter-exchange",QD_QUEUE);
        //设置routingkey
        arguments.put("x-dead-letter-routing-key","YD");
        //设置ttl
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QA_QUEUE).withArguments(arguments).build();
    }

    //声明普通队列2
    @Bean("queueB")
    public Queue queueB(){
        Map<String,Object> arguments = new HashMap<>();
        //设置死信队列
        arguments.put("x-dead-letter-exchange",QD_QUEUE);
        //设置routingkey
        arguments.put("x-dead-letter-routing-key","YD");
        //设置ttl
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(QB_QUEUE).withArguments(arguments).build();
    }

    //声明死信队列
    @Bean("queueD")
    public Queue queueD(){
        return QueueBuilder.durable(QD_QUEUE).build();
    }
    
    //绑定
    //通过容器名字进行捆绑,绑定普通队列A和交换价X
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    //通过容器名字进行捆绑,绑定普通队列B和交换价X
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XA");
    }
    //通过容器名字进行捆绑,绑定死信队列D和交换价Y
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
                                  @Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

生产者代码示例:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 发送延迟
 * 生产者
 */
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //开始发消息
    @GetMapping("/send/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{},发送一条消息给两个ttl队列:{}",new Date().toString(),message);

        /**
         * 交换机
         * routingkey
         * message
         */
        rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10秒的队列:"+message);
        rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40秒的队列:"+message);
    }
}

消费者代码示例:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 队列TTL 消费者
 */
@Slf4j
@Component
public class DeadLetterConsumer {

    //接收消息
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws Exception {
        String string = new String(message.getBody(),"UTF-8");
        log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),string);

    }
}

注意导包,不要导错了。

结果:第一条消息在10s后变成死信消息,然后被消费者消费掉,第二条消息在40s后变成死信消息,然后被消费者消费掉,这样就达成了延迟队列的目的。

局限性:每增加一个延时需求,都需要新增一个普通队列。这样是不合理的。
优化:只有一个延时队列,由生产者指定需要延时多久

延时队列优化,由生产者指定延时时间

增加一个队列QC,QC不设置过期时间,过期时间由生产者指定。
配置类代码新增QC,不设置存活时间,由生产者发送

    //设置普通队列
    public static final String QC_QUEUE = "QC";

    //设置普通队列
    @Bean("queueC")
    public  Queue queueC(){
        HashMap<String,Object> arguments = new HashMap<>();
        //设置死信队列
        arguments.put("x-dead-letter-exchange",QD_QUEUE);
        //设置routingkey
        arguments.put("x-dead-letter-routing-key","YD");
        return QueueBuilder.durable(QC_QUEUE).withArguments(arguments).build();
    }
    //绑定
    //通过容器名字进行捆绑,绑定普通队列A和交换价X
    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }

生产者新增代码

//开始发消息
    @GetMapping("/sendExpirationMessage/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
        log.info("当前时间:{},发送一条消息给QC,ttl队列:{},过期时间为:{}ms",new Date().toString(),message,ttlTime);

        /**
         * 交换机
         * routingkey
         * message
         * MessagePostProcessor,可以设置存活时间
         */
        //ttlTime设置置过期时间
        rabbitTemplate.convertAndSend("X","XC","消息来自ttl的队列:"+message,msg->{
            //发送消息时,设置存活时间
            msg.getMessageProperties().setExpiration(ttlTime);
         return msg;   
        });
    }

使用这种方式,消息并不会按时死亡。因为RabbitMQ只会检测第一个消息是否过期,如果过期,会被放入死信队列。

经过测试发现,第一个发送20s过期的消息,第二个发送2s过期的消息,结果依然是20s后,20s消息被消费,之后,2s消息才会被消费。 说明延时队列是按顺序执行。如果第一个消息延时很久,后续消息也会延时,并不会优先执行。

此现象只能通过,基于插件的RabbitMQ进行弥补,自身无法弥补这个缺陷。

RabbitMQ插件实现延时队列

安装插件

在官网上下载

https://www.rabbitmq.com/community-plugins.html

下载rabbitmq_delayed_message_exchange插件。解压放在RabbitMQ的插件目录。
进入RabbitMQ的安装目录下的plgins目录,执行以下命令让该插件生效,然后重启RabbitMQ。

--  3.8.8代表rabbitmq版本
-- 目录如下
cd   /usr/lib/rabbitmq/rabbitmq_server_3.8.8/plugins
-- 安装命令,不用写插件版本号
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
-- 重启rabbitmq
systemctl restart  rabbitmq-server(安装时的服务名)

重启好后打开rabbitmq的管理端页面,可以在Exchanges目录下,Add a new exchange,Type 中,会增加一个x-delayed-message的选项。
使用插件,结构更加简单
代表由交换机进行延迟,而不是队列了。
在这里插入图片描述
配置类书写
当Bean中不指定名称时,名称默认方法名
自定义交换机时,需要指定交换机类型,而之前未自定交换机,直接创建的DirectExchange交换机

/**
 * 延迟交换机
 */
@Configuration
public class DelayedQueueConfig {

    //延迟
    public static final String DELAYED_EXCHANGE = "delayed.exchange";

    //延迟队列
    public static final String DELAYED_QUEUE = "delayed.queue";

    //routingkey
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";


    //声明  自定义交换机,基于插件
    @Bean
    public CustomExchange delayedExchange(){
        //String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
        /**
         * 交换机名字
         * 类型
         * 是否持久化
         * 是否自动删除
         * 自定义参数
         */
        HashMap<String,Object> arguments = new HashMap();
        arguments.put("x-delayed-type","direct");
        return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",
                false,false,arguments);
    }

    //声明延迟队列
    @Bean
    public Queue queueDe(){
        return new Queue(DELAYED_QUEUE);
    }

    //绑定  当Bean中不指定名称时,名称默认方法名
    @Bean
    public Binding queueBindingExchange(
            @Qualifier("queueDe") Queue queue,
            @Qualifier("delayedExchange") CustomExchange customExchange){
        return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

生产者以及消费者代码与之前相同。
结论:可以实现根据过期时间,消费消息。

延时队列也有很多其他的选择,比如Java的DelayQueue,利用Redis的Zset,利用Quartz或者利用kafka的时间轮,各有特点,需要合适的场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/20820.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux(centos7)安装MySQL5.7

Linux 安装MySQL5.7 数据库 所有的安装方式是基于手动式的安装&#xff0c;也就是整体的下载然后配置 rpm与yum之间的关系 rpm 是Linux 免除编译安装带来的安装方式&#xff0c;而yum 是在rpm 上面的进一步的优化&#xff0c;换句话说yum 既包含了rpm 的简单安装&#xff0c…

百度地图自定义覆盖物(html)格式

<style type"text/css"> body, html{ width: 100%; height: 100%; overflow: hidden; margin: 0; font-family: "微软雅黑"; display: flex; justify-content: space-between; } #cont…

使用html+css实现一个静态页面(厦门旅游网站制作6个页面) 旅游网页设计制作 HTML5期末考核大作业,网站——美丽家乡。 学生旅行 游玩 主题住宿网页

&#x1f468;‍&#x1f393;静态网站的编写主要是用 HTML DⅣV CSSJS等来完成页面的排版设计&#x1f469;‍&#x1f393;&#xff0c;一般的网页作业需要融入以下知识点&#xff1a;div布局、浮动定位、高级css、表格、表单及验证、js轮播图、音频视频Fash的应用、uli、下拉…

FL Studio2023水果完整中文版音乐制作软件

FL Studio2023水果中文版是一款由 Image Line 公司研发几近完美的虚拟音乐工作站,同时也是知名的音乐制作软件。它让你的计算机就像是全功能的录音室&#xff0c;漂亮的大混音盘&#xff0c;先进的创作工具&#xff0c;让你的音乐突破想象力的限制。它可以播放由你指定或加入的…

IP包头分析

数据来源 IP包头长度 ip包头的长度在20-60个字节间&#xff0c;一般是20字节&#xff08;固定部分&#xff09;&#xff0c;可选项最大是40个字节&#xff08;比较少用&#xff09;。 第一行 版本 就是指出IP数据包是什么版本&#xff1b;常见的版本就是0100 IPV4和 0110 IPV6…

机器学习中基本符号表示和常用术语

目录一. 基本符号表示二. 常用术语1. 精准率计算&#xff08;precision&#xff09;2.召回率计算&#xff08;recall&#xff09;3.准确率的计算&#xff08;accuracy&#xff09;4.F1 Score5. G分数6.一. 基本符号表示 TP &#xff08;true positive&#xff09;&#xff1a;预…

【Python】基础语法(安装,常变量,类型,注释,运算符)

目录python环境搭建安装Python安装pycharmpython基础语法常量和表达式变量和数据类型变量数据类型注释输入输出运算符算术运算符关系运算符逻辑运算符赋值运算符xdm,最近更新一些学习Python基础知识的内容,感谢支持!python环境搭建 俗话说工欲善其事必先利其器,要想学习Python开…

新知实验室TRTC初体验

小记 一次偶然的邂逅,让我知道了TRTC实时音视频这个神奇的东西,于是便开始研究起来这个鬼东西,本以为是一个很简单的东西,调用一下SDK就完事了 , 谁知道它的文档并不是很齐全,这一点还需要多多努力啊!!! 正文 实时音视频&#xff08;TRTC&#xff09; 是腾讯云提供的一套低…

现代对称密码

乘积密码 因为语言特性&#xff0c;用代替和置换是不安全的&#xff0c;可以考虑用多次的加密增强密码强度。多次加密想要提高密码强度&#xff0c;要求多次加密不能成为一个群&#xff0c;那么加密就可以被重复并且组合复杂度会增加。 分组密码 分组密码就是把明文分组后进…

Linux进阶-Shell编程与环境变量

目录 定义变量&#xff1a; 使用变量&#xff1a; 将命令的结果赋值给变量&#xff1a; 删除变量&#xff1a;unset 退出当前进程&#xff1a;exit 读取从键盘输入的数据 &#xff1a;read 对整数进行数字运算&#xff1a;(()) 逻辑与或&#xff1a; 检测某个条件是否成…

【Java八股文总结】之MySQL数据库

文章目录数据库一、基本概念二、MySQL数据库2.1 MySQL基础1、MySQL数据库的优点&#xff1f;2、MySQL支持的数据类型有&#xff1f;Q&#xff1a;varchar 和 char 的区别&#xff1f;Q&#xff1a;blob 和 text 的区别&#xff1f;Q&#xff1a;datetime 和 timestamp 的区别&a…

DI依赖注入-P8,P9,P10,P11

1.构造器注入 之前写过了~~~~ 2.Set方式注入【重点】 3.拓展方式注入 2.Set方式注入【重点】 【环境搭建】 1.复杂类型 2.真实测试对象 四个文件 Student实体类的创建&#xff1a; 主要是依据官方文档来建立。那个Address也是为了测试不同的类型&#xff0c;而创建的引…

攻防世界misc2-1

misc2-1 题目描述&#xff1a;无 题目环境&#xff1a;https://download.csdn.net/download/m0_59188912/87094620 打开图片&#xff0c;发现无法显示。 使用winhex打开&#xff0c;从其中一段看出这是逆序图片。 使用python脚本将其正序排列。 脚本源码&#xff1a; f1open(‘…

5G无线技术基础自学系列 | SA及NSA组网架构

素材来源&#xff1a;《5G无线网络规划与优化》 一边学习一边整理内容&#xff0c;并与大家分享&#xff0c;侵权即删&#xff0c;谢谢支持&#xff01; 附上汇总贴&#xff1a;5G无线技术基础自学系列 | 汇总_COCOgsta的博客-CSDN博客 3GPP为新空中接口定义了两种部署配置&a…

操作系统笔记

文章目录一、操作系统的定义1.1 操作系统的功能和目标1.2 操作系统的特征1.3 操作系统的发展和分类1.4 操作系统的运行机制1.5 操作系统内核1.6 操作系统的体系结构二、中断机制中断和异常三、系统调用3.1 系统调用的分类&#xff08;按功能分配&#xff09;3.2 系统调用和库函…

整夜我的背影是一条踏往星空的道路

Brigit Pegeen Kelly&#xff0c;1951 - 2016.08.14&#xff0c;美国诗人、教师&#xff0c;在加利福尼亚州帕洛阿尔托出生&#xff0c;在印第安纳南部长达&#xff0c;成年后的大部分时间都在伊利诺州中部度过。一位非常注重隐私的女性&#xff0c;她的生活很少为人所知。[1][…

计算机基础学习(好文必看)

好长时间没发文章了&#xff0c;主要是以输入为主。 那么&#xff0c;给小伙伴们推荐一些计算机基础学习的知识&#xff0c;自己感觉挺不错的。 计算机基础学习&#xff1a; 1. 小林师傅是一位宝藏博主&#xff0c;非常厉害&#xff0c;各大平台都有他的文章&#xff0c;我是…

Day08--自定义组件的样式

提纲挈领&#xff1a; ************************************************************************************************************ 我的操作&#xff1a; 1》在app.wxss里面定义一个全局样式&#xff1a; 2》在home.wxml里面使用全局样式。 3》在test2.wxml里面使用…

Prometheus系列(一)安装

1 安装 Prometheus Server 官网&#xff1a;https://prometheus.io/ 下载&#xff1a;https://prometheus.io/download/ 手册&#xff1a;https://prometheus.io/docs/introduction/overview/ Prometheus 基于 Golang 编写&#xff0c;编译后的软件包&#xff0c;不依赖于任何的…

如何画业务流程图?

业务流程图是用来描述客户业务作业方式的有效手段&#xff0c;它可以清晰地客户业务流程中涉及的人员角色、业务活动、业务数据以及他们之间的关系&#xff0c;是用来澄清需求的有效手段。一个典型的业务流程图如下图所示&#xff1a; 在画业务流程图时有哪些注意事项呢&#x…