RabbitMQ系列(17)--延迟队列的简介与实现

news2025/1/16 13:44:04

1、延迟队列的概念

延迟队列内部是有序的,重要的特性体现在它的延迟属性上,延迟队列中的元素希望在指定时间到了之后或之前取出处理,简单的说延迟队列就是用来存放需要在指定时间被处理的元素的队列。

2、延迟队列的应用场景

(1)订单指定时间内未支付则自动取消

(2)用户发起退款,指定时间内未处理则通知相关运营人员

3、定时任务和延迟队列的取舍

以上场景都有一个特点,那就是都需要在某个事件发生前或发生后执行一项任务,如生成订单后,在十分钟后检查订单状态,未支付的订单将关闭,这种场景也可以用定时任务来处理,但数据量比价少的话确实可以用定时任务来处理,但在活动期间,订单的数据量可能会变得很庞大,对于庞大的数据,定时任务很难在1秒内检查完订单,从而不能及时的关闭未支付的订单,而且用定时任务来检查订单会给数据库带来很大的压力,所以在数据量大的情况下,定时任务无法满足业务需求且性能低下

4、延迟队列架构图 (后面我们就根据这个架构图进行代码的设计与实现)

  

5、延迟队列的实现

(1)新建一个名为config的包,用于装实现特定配置的代码

效果图:

(2)在config包里新建一个名为TtlQueueConfig的类用于编写配置队列延迟的代码

代码如下:

package com.ken.springbootrqbbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 用于配置TTL队列的延迟时间
 */
@Configuration
public class TtlQueueConfig {

    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机的名称
    public static final String DEAD_EXCHANGE = "dead_exchange";

    //普通队列的名称
    public static final String NORMAL_QUEUE01 = "normal_queue01";
    public static final String NORMAL_QUEUE02 = "normal_queue02";

    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    //声明普通交换机
    @Bean("normalExchange")
    public DirectExchange normalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE);
    }

    //声明交换机交换机
    @Bean("deadExchange")
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE);
    }

    //声明普通队列,TTL为10S
    @Bean("normalQueue01")
    public Queue normalQueue01() {
        Map<String, Object> arguments = new HashMap<>();
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutignKey
        arguments.put("x-dead-letter-routing-key","dead");
        //设置TTL
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(NORMAL_QUEUE01).withArguments(arguments).build();
    }

    //声明普通队列,TTL为40S
    @Bean("normalQueue02")
    public Queue normalQueue02() {
        Map<String, Object> arguments = new HashMap<>();
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutignKey
        arguments.put("x-dead-letter-routing-key","dead");
        //设置TTL
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(NORMAL_QUEUE02).withArguments(arguments).build();
    }

    //声明死信队列
    @Bean("deadQueue")
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    //绑定队列1和普通交换机
    @Bean
    public Binding queue01BindNormalExchange(@Qualifier("normalQueue01") Queue normalQueue01,
                                             @Qualifier("normalExchange") DirectExchange normalExchange) {
        return BindingBuilder.bind(normalQueue01).to(normalExchange).with("normal01");
    }

    //绑定队列2和普通交换机
    @Bean
    public Binding queue02BindNormalExchange(@Qualifier("normalQueue02") Queue normalQueue02,
                                             @Qualifier("normalExchange") DirectExchange normalExchange) {
        return BindingBuilder.bind(normalQueue02).to(normalExchange).with("normal02");
    }

    //绑定队列2和普通交换机
    @Bean
    public Binding deadQueueBindDeadExchange(@Qualifier("deadQueue") Queue deadQueue,
                                             @Qualifier("deadExchange") DirectExchange deadExchange) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with("dead");
    }

}

(3)新建一个名为controller的包,用于装控制层的代码

效果图:

(4)新建一个名为SendMsgController的类用于充当生产者用于发送消息

 代码如下:

package com.ken.springbootrqbbitmq.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 发送延迟消息
 */
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date().toString(),message);
        rabbitTemplate.convertAndSend("normal_exchange","normal01","消息来着ttl为10s的队列:" + message);
        rabbitTemplate.convertAndSend("normal_exchange","normal02","消息来着ttl为40s的队列:" + message);
    }

}

(5)新建一个名为consumer的包,用于装消费者的代码

效果图:

(6)新建一个名为DeadQueueConsumer的类用于消费死信队列里的消息

代码如下:

package com.ken.springbootrqbbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 死信队列消费者
 */
@Slf4j
@Component
public class DeadQueueConsumer {

    //接收消息
    @RabbitListener(queues = "dead_queue")
    public void receiveMsg(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
    }

}

(7)进入项目的启动类启动项目

(8)启动完毕后在浏览器地址栏输入http://localhost:8080/ttl/sendMsg/参数往队列发送消息

 (9)查看控制台的输出,发现分别在10s和40s后进行输出,这证明我们的延迟队列成功运行

6、延迟队列的优化

虽然上述能实现延迟队列,但上述的实现过程是一个队列只能延迟固定的已经设置好的时间,若想增加一个新的时间需要,用上述的实现方法就只能新增一个队列,这样很麻烦,所以我们需要优化延迟队列

(1)延迟队列优化架构图 (后面我们就根据这个架构图对延迟队列进行优化)

(2)修改config包里TtlQueueConfig类的代码,多加一些关于NormalQueue03队列的配置

代码如下:

package com.ken.springbootrqbbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 用于配置TTL队列的延迟时间
 */
@Configuration
public class TtlQueueConfig {

    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机的名称
    public static final String DEAD_EXCHANGE = "dead_exchange";

    //普通队列的名称
    public static final String NORMAL_QUEUE01 = "normal_queue01";
    public static final String NORMAL_QUEUE02 = "normal_queue02";
    //自定义延迟时间队列的名称
    public static final String NORMAL_QUEUE03 = "normal_queue03";

    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    //声明普通交换机
    @Bean("normalExchange")
    public DirectExchange normalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE);
    }

    //声明交换机交换机
    @Bean("deadExchange")
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE);
    }

    //声明普通队列,TTL为10S
    @Bean("normalQueue01")
    public Queue normalQueue01() {
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutignKey
        arguments.put("x-dead-letter-routing-key","dead");
        //设置TTL
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(NORMAL_QUEUE01).withArguments(arguments).build();
    }

    //声明普通队列,TTL为40S
    @Bean("normalQueue02")
    public Queue normalQueue02() {
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutignKey
        arguments.put("x-dead-letter-routing-key","dead");
        //设置TTL
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(NORMAL_QUEUE02).withArguments(arguments).build();
    }

    //声明普通队列,TTL为40S
    @Bean("normalQueue03")
    public Queue normalQueue03() {
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutignKey
        arguments.put("x-dead-letter-routing-key","dead");
        //设置TTL
        return QueueBuilder.durable(NORMAL_QUEUE03).withArguments(arguments).build();
    }

    //声明死信队列
    @Bean("deadQueue")
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    //绑定队列1和普通交换机
    @Bean
    public Binding queue01BindNormalExchange(@Qualifier("normalQueue01") Queue normalQueue01,
                                             @Qualifier("normalExchange") DirectExchange normalExchange) {
        return BindingBuilder.bind(normalQueue01).to(normalExchange).with("normal01");
    }

    //绑定队列2和普通交换机
    @Bean
    public Binding queue02BindNormalExchange(@Qualifier("normalQueue02") Queue normalQueue02,
                                             @Qualifier("normalExchange") DirectExchange normalExchange) {
        return BindingBuilder.bind(normalQueue02).to(normalExchange).with("normal02");
    }

    //绑定队列3和普通交换机
    @Bean
    public Binding queue03BindNormalExchange(@Qualifier("normalQueue03") Queue normalQueue03,
                                             @Qualifier("normalExchange") DirectExchange normalExchange) {
        return BindingBuilder.bind(normalQueue03).to(normalExchange).with("normal03");
    }

    //绑定死信队列和死信交换机
    @Bean
    public Binding deadQueueBindDeadExchange(@Qualifier("deadQueue") Queue deadQueue,
                                             @Qualifier("deadExchange") DirectExchange deadExchange) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with("dead");
    }

}

(3)修改controller包里SendMsgController类的代码,多加一个调用自定义延迟时间NormalQueue03队列的接口

代码如下:

package com.ken.springbootrqbbitmq.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 发送延迟消息
 */
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date().toString(),message);
        rabbitTemplate.convertAndSend("normal_exchange","normal01","消息来着ttl为10s的队列:" + message);
        rabbitTemplate.convertAndSend("normal_exchange","normal02","消息来着ttl为40s的队列:" + message);
    }

    @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {
        log.info("当前时间:{},发送一条时长{}毫秒的TTL消息给normal03队列:{}", new Date(),ttlTime,message);
        rabbitTemplate.convertAndSend("normal_exchange","normal03",message,msg -> {
            //发送消息的时候延迟时长
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }

}

 (4)进入项目的启动类重新启动项目

(5)启动完毕后分别在浏览器地址栏输http://localhost:8080/ttl/sendExpirationMsg/第一个参数/20000和http://localhost:8080/ttl/sendExpirationMsg/第二个参数/2000队列发送消息

例:

 (6)查看控制台的输出,发现第一条消息在20s后进行了输出,这证明我们优化后的延迟队列成功运行​,但当我们发送多条消息时,消息可能不会按时"死亡"从而不能按时把消息发送到死信队列,如图里的第二条消息,在第一条消息被消费后紧跟着被消费,而不是隔2秒后被消费,这是因为RabbitMQ只会检查第一条消息是否过期,过期则会被扔进死信队列,如果第一条消息延迟时间很长,第二条消息延迟时间很短,第二条消息也并不会被优先消费,而是等到第一条消息被消费后第二条消息再被消费,这时需要我们用另一种方法去实现延迟队列(另一种方法放在下一篇文章介绍)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/739155.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python实现本地电脑启动HTTP服务

在Python中&#xff0c;可以使用Python内置的http.server模块来启动一个简单的HTTP服务器。以下是一个简单的Python代码示例&#xff0c;实现本地电脑启动HTTP服务&#xff1a; import http.server import socketserverport 8081# 在当前目录下启动http服务器 Handler http.…

JavaWeb 速通JavaScript

目录 一、JavaScript快速入门 1.基本介绍 : 2.JavaScript特点 : 3.JavaScript使用方式 : 1 方式一 : 写在 2 方式二 : 以外部文件形式引入 PS : 注意事项 4.JavaScript查错方式 : 二、JavaScript数据类型 1.变量 : 2.数据类型 : 3.特殊值 : 三、JavaScript运算符 1.算…

解决forest低版本请求不安全的网站出现SSL认证不通过问题

文章目录 前言解决问题的步骤1、当然是百度2、官网3、看源码4、GPT5、直接去gitee上看源代码的问题 解决一解决二 前言 先说结论&#xff1a;无法解决 那既然无法解决&#xff0c;为啥还要写这样一篇文章呢&#xff0c;是因为这个问题我弄了一天多&#xff0c;我觉得有必要记…

SpringBoot开启子线程执行任务

目录 一、EnableAsync 二、Async 三、测试 一、EnableAsync 二、Async Service public class IotLocationServiceImpl {Asyncpublic void testA() {try {// 模拟阻塞Thread.sleep(5000);System.out.println("子线程执行完毕");} catch (InterruptedException e) {…

WPS Office AI实战:智能表格化身智能助理

前面我们已经拿 WPS AI 对Word文字、PPT幻灯片、PDF 做了开箱体验&#xff0c;还没有看过的小伙伴&#xff0c;请翻看以前的文章&#xff0c;本文开始对【智能表格】进行AI开箱测验。 表格在日常的数据处理中占绝对地位&#xff0c;但表格处理并不是每一个人都擅长&#xff0c;…

《向量数据库指南》——Milvus 中的向量索引概览和平面索引

目录 Milvus 中的向量索引 索引概览 平面索引 在之前的教程中,我们简单介绍了单词 Embedding 示例,了解了 Embedding 的强大,以及如何在向量数据库中进行向量存储和索引。此外,我们也简单介绍了最近邻搜索算法,这个问题涉及根据所选距离度量找到距离查询向量最接近的向…

spring boot+MySQL智慧食堂”设计与实现(包论文)

“智慧食堂”系统运用springboot框架开发&#xff0c;MIS的总体思想&#xff0c;以及MYSQL等技术的支持下共同完成了该系统的开发&#xff0c;实现了“智慧食堂”系统管理的信息化&#xff0c;使用户体验到更优秀的“智慧食堂”管理系统&#xff0c;管理员管理操作将更加方便&a…

青岛大学_王卓老师【数据结构与算法】Week05_01_栈和队列的定义和特点1_学习笔记

本文是个人学习笔记&#xff0c;素材来自青岛大学王卓老师的教学视频。 一方面用于学习记录与分享&#xff0c; 另一方面是想让更多的人看到这么好的《数据结构与算法》的学习视频。 如有侵权&#xff0c;请留言作删文处理。 课程视频链接&#xff1a; 数据结构与算法基础…

常用排序,快速排序,归并排序算法讲解

文章目录 快速排序归并排序 排序有很多种算法&#xff0c;常听的十大排序有&#xff1a;冒泡排序、选择排序、插入排序、快速排序、归并排序、堆排序、希尔排序、计数排序、基数排序、桶排序。 这里只介绍两个常用的算法。 排序&#xff1a; 快速排序归并排序 你可能想知道…

python安装cartopy

1.安装cartopy 创建新环境&#xff1a;https://www.machinelearningplus.com/deployment/conda-create-environment-and-everything-you-need-to-know-to-manage-conda-virtual-environment/ conda create --name mlenv python3.7.5 激活环境后&#xff0c;安装cartopy包&#…

IDEA+springboot+mybatis+shiro+bootstrap+Mysql网上书店管理系统源码

IDEAspringbootmybatisshirobootstrapMysql网上书店管理系统 一、系统介绍1.环境配置 二、系统展示1. 首页2.图书详情3.登录4. 注册5. 购物车6. 个人信息7.我的订单8.填写订单9.用户管理10.添加用户11.店铺列表12.添加店铺13.角色列表14.添加角色15.权限管理16.店铺信息17.我的…

十、SkyWalking链路追踪

解决什么问题&#xff1f; 串联整个调用链路&#xff0c;快速定位问题。缕清服务之间的依赖关系。对各个微服务接口的进行性能分析。跟踪整个业务流程的调用处理顺序。 一、skywalking介绍 1、skywalking是什么&#xff1f; skywalking是一个国产开源框架&#xff0c;2015年…

wordpress 单页中子页面列表制作方法

一、目标&#xff1a; 1、父页面显示所有子页面&#xff0c;如果是子页面就显示子页面对应父页面下的所有子页面。 2、选中的子页面链接显示不一样的样式。 二、代码 <?php$current_page_id get_the_ID(); // 获取当前页面的ID// 判断当前页面是否为父页面$is_parent_…

[数据结构 -- 手撕排序第三篇] 冒泡排序

目录 1、常见的排序算法 1.1 交换排序基本思想 2、冒泡排序的实现 2.1 基本思想 2.2 单趟排序 2.2.1 单趟排序分析 2.2.2 单趟排序实现代码 3、冒泡排序完整代码实现 3.1 思路分析 3.2 代码实现 4、时间复杂度 5、优化算法 5.1 优化算法思路 5.2 优化算法代码实现…

【Linux网络】网络基础(一)

Linux网络基础 1 网络背景1.1 网络发展1.2 局域网和广域网 2 认识“协议”2.1 网络协议分层&#xff08;高内聚&#xff0c;低耦合思想&#xff09;2.1.1 高内聚&#xff0c;低耦合简单理解2.1.2 协议分层的简单例子 3 OSI七层模型3.1 TCP/IP五层&#xff08;或四层&#xff09…

前端LayUI框架快速上手实现登入注册

目录 一、Layui简介 1. 什么是LayUI 2. LayUI的特点 二、LayUI入门 1. LayUI下载 2. LayUI入门使用 1. 在web项目中导入LayUI文件中的layui文件 2. 在JSP页面引入css、js文件 3. 制作一个简单的猜字游戏 4.效果展示 三、Layui实现简单登入功能 环境搭建 一、Layui简…

spring boot+MySQL实现班级综合测评管理系统

随着电子技术的普及和快速发展&#xff0c;线上管理系统被广泛的使用&#xff0c;有很多事业单位和商业机构都在实现电子信息化管理&#xff0c;班级综合测评管理也不例外&#xff0c;由比较传统的人工管理转向了电子化、信息化、系统化的管理。 传统的班级综合测评管理系统&am…

运维工具的学习(1)--Curl

1.1 curl简介 ​Curl是一个非常实用的、用来与服务器之间传输数据的工具&#xff1b;支持的协议包括 (DICT, FILE, FTP, FTPS, GOPHER, HTTP, HTTPS, IMAP, IMAPS, LDAP, LDAPS, POP3, POP3S, RTMP, RTSP, SCP, SFTP, SMTP, SMTPS, TELNET and TFTP)&#xff0c;curl设计为无用…

Kafka入门,offset的默认维护位置(二十一)

offset的默认维护位置 0.9版本之前&#xff1a;consumer默认将offset保持在zookeeper中 从0.9版本开始&#xff0c;consumer默认将offset保存在kafka一个内置的topic中&#xff0c;该topic为__consumer_offsets __consumer_offsets 主题里面采用key和value方式存储数据&#x…

【谷粒学院】NUXT框架

一、服务端渲染技术NUXT 1、什么是服务端渲染 服务端渲染又称SSR (Server Side Render)是在服务端完成页面的内容&#xff0c;而不是在客户端通过AJAX获取数据。 服务器端渲染(SSR)的优势主要在于&#xff1a;更好的 SEO&#xff0c;由于搜索引擎爬虫抓取工具可以直接查看完全…