spring-boot 整合 redisson 实现延时队列(文末有彩蛋)

news2025/1/22 12:54:18

应用场景

通常在一些需要经历一段时间或者到达某个指定时间节点才会执行的功能,比如以下这些场景:

  • 订单超时提醒
  • 收货自动确认
  • 会议提醒
  • 代办事项提醒

为什么使用延时队列

对于数据量小且实时性要求不高的需求来说,最简单的方法就是定时扫描数据库。

但是,当数量达到数百万、上千万级别且时,定时扫库就显得非常低效且消耗资源,

甚至有些时间间隔小实时性要求高的情况,上一次扫描还没结束,下一次就又开始了,

这时候如果使用延时队列就会比较合适

延时队列的几种方式:

  • Quartz 定时任务实现扫库
  • DelayQueue JDK中提供了一组实现延迟队列的API
  • Redis sorted set
  • Redis 过期键监听回调
  • RabbitMQ 死信队列
  • RabbitMQ 基于插件实现延迟队列
  • Wheel 时间轮训算法

Redisson 实现延时队列

顾名思义 Redis son 就是 Redis 的儿子,举个栗子先:

1.引入 pom

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>${lastest.version}</version>
</dependency>

2.封装一个 RedissonQueue 类

@Service
public class RedissonQueue {

    public static final String QUEUE = "delayQueue";

    // 默认超时时间,30秒
    public static final Integer DEFAULT_TIMEOUT = 30;

    @Resource
    private RedissonClient redissonClient;

    // 加入任务并设置到期时间
    public void offer(String taskId, Integer timeout) {
        RDelayedQueue<String> delayedQueue = delayedQueue();
        delayedQueue.offer(taskId, Objects.isNull(timeout) ? DEFAULT_TIMEOUT : timeout, TimeUnit.SECONDS);
    }

    // 移除任务
    public void remove(String taskId) {
        RDelayedQueue<String> delayedQueue = delayedQueue();
        delayedQueue.removeIf(messageId -> messageId.equals(taskId));
    }

    // 任务列表
    public RDelayedQueue<String> delayedQueue() {
        RBlockingDeque<String> blockingDeque = blockingDeque();
        return redissonClient.getDelayedQueue(blockingDeque);
    }

    public RBlockingDeque<String> blockingDeque() {
        return redissonClient.getBlockingDeque(QUEUE);
    }

    public boolean isShutdown() {
        return redissonClient.isShutdown();
    }

    public void shutdown() {
        redissonClient.shutdown();
    }

}

3.交给 Spring 管理

@Slf4j
@Service
public class RedissonService implements ApplicationRunner {

    @Resource
    private RedissonQueue redissonQueue;

    @Resource(name = "threadPoolTaskExecutor")
    private ThreadPoolTaskExecutor executor;

    @Override
    public void run(ApplicationArguments args) {
        RBlockingDeque<String> blockingDeque = redissonQueue.blockingDeque();
        executor.execute(() -> {
            while (true) {
                if (redissonQueue.isShutdown()) {
                    return;
                } else {
                    String messageId = null;
                    try {
                        messageId = blockingDeque.take();
                    } catch (InterruptedException e) {
                        log.warn("RedissonConsumer error:{}", e.getMessage());
                    }
                    if (!Objects.isNull(messageId) && !messageId.isEmpty()) {
                        log.warn("timeout messageId : {}", messageId);
                    }
                }
            }
        });

    }

    // 初始化,启动服务就执行一次
    @PostConstruct
    public void init() {
        redissonQueue.delayedQueue();
    }

    @PreDestroy
    public void shutdown() {
        redissonQueue.shutdown();
    }

}

4.测试接口

@Operation(summary = "添加任务", description = "添加任务")
@PostMapping
public ResponseEntity<?> add(@RequestParam(value = "taskId", required = false) String taskId,
                             @RequestParam(value = "timeout", required = false) Integer timeout) {
    taskId = StringUtils.isEmpty(taskId) ? String.valueOf(snowflake.nextId()) : taskId;
    redissonQueue.offer(taskId, timeout);
    return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}

@Operation(summary = "移除任务", description = "移除任务")
@DeleteMapping(value = "/{taskId}")
public ResponseEntity<?> remove(@PathVariable("taskId") String taskId) {
    redissonQueue.remove(taskId);
    return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}

5.测试结果

添加10个任务

在这里插入图片描述

删除第1个任务

在这里插入图片描述

可以看到第一个任务删除后没有被执行(没有设置到期时间,默认为30秒到期)

在这里插入图片描述

实现原理

  • redisson_delay_queue_timeout:delayQueue,sorted set 数据类型,存放所有延迟任务,按延迟任务的到期时间戳(提交任务时间戳 +
    延迟时间)排序,所以列表最前面第一个元素就是整个延迟队列中最早被执行的任务。
  • redisson_delay_queue:delayQueue,list 数据类型,也是存放所有任务。
  • delayQueue,list 数据类型,被称为目标队列,这个里面存放的任务都是已经到延迟时间的,可以被消费者获取的任务,所以上面示例中
    RBlockingQueue 的 take 方法是从此目标队列中获取任务的。
  • redisson_delay_queue_channel:delayQueue,是一个 channel,用来通知客户端开启一个延迟任务
  • 生产者提交任务时将任务放到 redisson_delay_queue_timeout:delayQueue 中,提交任务的时间戳+延迟时间
  • 客户端会有一个延迟任务,这个延迟任务会向 Redis Server 发送一段 lua 脚本,Redis 执行 lua 脚本中的命令,此操作是原子性的

lua 脚本主要干两件事

  • 将到了延迟时间的任务从 redisson_delay_queue_timeout:delayQueue 中移除,存到 delayQueue 这个目标队列
  • 获取到 redisson_delay_queue_timeout:delayQueue 中最早到期时间的任务的到期时间戳,发布到 redisson_delay_queue_channel:
    delayQueue channel 中

当客户端监听到 redisson_delay_queue_channel:delayQueue 这个 channel 的消息时,会再次提交一个客户端延迟任务,延迟时间就是消息(最早到期时间任务的到期时间戳)当前时间戳
这个时间其实也就是 redisson_delay_queue_channel:delayQueue 中最早到期时间的任务的剩余的延迟时间。
一旦时间来到最早到期时间任务的到期时间戳,redisson_delay_queue_timeout:delayQueue 中最早到期时间的任务已经到期,客户端的延迟任务也同时到期,
于是开始执行 lua 脚本操作,及时将到期任务放到目标队列中。然后再次发布剩余的延迟任务中最早到期任务的到期时间戳到 channel
中,
如此循环运行下去,保证 redisson_delay_queue_timeout:delayQueue 中到期数据能及时放到目标队列中。
这里存在一个特殊情况,需要项目启动时就执行一次延时队列。因为由于没有客户端延迟任务的执行,
可能会出现 redisson_delay_queue_timeout:delayQueue 队列中有到期但是没有被放到目标队列的可能,启动就执行一次是为了保证到期的数据能被及时放到目标队列中。

结论

  • Redisson 方案理论上没有延迟,但当消息数量剧增,消费者消费缓慢这种情况下,可能会导致延迟任务消费的延迟。

  • 消息丢失问题 Redisson 方案最大程度上减轻消息丢失的可能性,因为所有任务都是存在 list 和 sorted set 两种数据类型中,Redis
    有持久化机制。除非整个 redis 集群宕机,可能丢失一小部分数据。

  • 广播任务问题,是不会出现的,因为每个客户端都是从同一个目标队列中获取任务。

Redisson 这种实现方案是比较合适且靠谱的,一般中小型项目建议用 Redisson 实现延迟队列,规模较大的项目直接上 MQ。

整合DEMO仓库地址

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1940869.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

sql常见50道查询练习题

sql常见50道查询练习题 1. 表创建1.1 表创建1.2 数据插入 2. 简单查询例题(3题&#xff09;2.1 查询"李"姓老师的数量2.2 查询男生、女生人数2.3 查询名字中含有"风"字的学生信息 3. 日期相关例题(6题&#xff09;3.1 查询各学生的年龄3.2 查询本周过生日的…

73、Flink 的 DataStream API 生产实践总结

0、汇总 1.可以使用 Maven 命令、CURL 命令、IDEA 手动创建 Flink 项目&#xff1b;2.可以使用 Maven Shade 插件将必需的依赖项打包进应用程序 jar 中&#xff1b;3.应该在 Flink 集群的 lib 文件夹内配置需要的&#xff08;核心&#xff09;依赖项&#xff1b;4.应该将程序中…

探索球形气膜的独特魅力:移动性与经济性的结合—轻空间

球形气膜结构因其独特的设计和多功能性而备受青睐。它不仅在空间利用方面有着显著优势&#xff0c;还展现出出色的移动性和经济性。以下是球形气膜的关键特点&#xff1a; 灵活的移动性 球形气膜以其轻便且易于移动的特性而闻名。这种结构设计使得气膜可以在不同场地之间快速组…

测试管理工具、自动化测试工具、跨浏览器测试工具 推荐

测试管理工具 1&#xff09;Xray Xray 是排名第一的手动与自动化测试管理应用&#xff0c;专为质量保证而设计。它是一个功能齐全的工具&#xff0c;能够无缝集成于 Jira 中。其目的是通过有效和高效的测试帮助公司提高产品质量。 功能特点&#xff1a; 需求、测试、缺陷和执…

Docker核心技术:容器技术要解决哪些问题

云原生学习路线导航页&#xff08;持续更新中&#xff09; 本文是 Docker核心技术 系列文章&#xff1a;容器技术要解决哪些问题&#xff0c;其他文章快捷链接如下&#xff1a; 应用架构演进容器技术要解决哪些问题&#xff08;本文&#xff09;Docker的基本使用Docker是如何实…

MySQL学习记录 —— 이십삼 MySQL服务器文件系统(3)

文章目录 1、数据字典2、系统表各种系统表 Mysql Schema是⼀个系统库&#xff0c;表中存储了MySQL服务器运行时所需的信息。广义上&#xff0c;mysql schema包含存储MySQL程序基本数据的数据字典和用于其他操作目的的系统表。数据字典表和系统表位于数据目录下一个名为mysql.ib…

Spring-Aop源码解析(二)

书接上文&#xff0c;上文说到&#xff0c;specificInterceptors 不为空则执行createProxy方法创建代理对象&#xff0c;即下图的createProxy方法开始执行&#xff0c;生成代理对象&#xff0c;生成代理对象有两种方式&#xff0c;JDK和CGLIB。 createAopProxy就是决定使用哪…

(ISPRS,2021)具有遥感知识图谱的鲁棒深度对齐网络用于零样本和广义零样本遥感图像场景分类

文章目录 Robust deep alignment network with remote sensing knowledge graph for zero-shot and generalized zero-shot remote sensing image scene classification相关资料摘要引言遥感知识图谱的表示学习遥感知识图谱的构建实体和关系的语义表示学习创建遥感场景类别的语…

【Git-驯化】手把手搭建Mac电脑中git环境配置以及连接github仓库

【Git-驯化】手把手搭建Mac电脑中git环境配置以及连接github仓库 本次修炼方法请往下查看 &#x1f308; 欢迎莅临我的个人主页 &#x1f448;这里是我工作、学习、实践 IT领域、真诚分享 踩坑集合&#xff0c;智慧小天地&#xff01; &#x1f387; 免费获取相关内容文档关…

【Linux】HTTP 协议

目录 1. URL2. HTTP 协议2.1. HTTP 请求2.2. HTTP 响应 1. URL URL 表示着是统一资源定位符(Uniform Resource Locator), 就是 web 地址&#xff0c;俗称“网址”; 每个有效的 URL 可以通过互联网访问唯一的资源, 是互联网上标准资源的地址; URL 的主要由四个部分组成: sche…

【MySQL-17】存储过程-[变量篇]详解-(系统变量&用户定义变量&局部变量)

前言 大家好吖&#xff0c;欢迎来到 YY 滴MySQL系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过C的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; YY的《C》专栏YY的《C11》专栏YY的《Lin…

Pytorch使用前期准备

一、检查英伟达驱动和CUDA Toolkit是否正确安装 1.任务管理器性能选项卡中能正确显示显卡型号则表示显卡驱动正确安装 2. CUDA Toolkit会跟随pytorch自动安装 二、虚拟环境的准备 Miniconda — Anaconda documentationhttps://docs.anaconda.com/miniconda/ 1.安装anaconda或者…

Linux实用操作三

文章目录 Linux实用操作三网络传输ping命令介绍&#xff1a;示例&#xff1a; wget命令介绍&#xff1a;示例&#xff1a; curl命令介绍&#xff1a;示例&#xff1a; 端口介绍&#xff1a;端口的划分&#xff1a;查看端口占用&#xff1a; 进程管理进程介绍&#xff1a;查看进…

二十一、【机器学习】【非监督学习】- 谱聚类 (Spectral Clustering)​​

系列文章目录 第一章 【机器学习】初识机器学习 第二章 【机器学习】【监督学习】- 逻辑回归算法 (Logistic Regression) 第三章 【机器学习】【监督学习】- 支持向量机 (SVM) 第四章【机器学习】【监督学习】- K-近邻算法 (K-NN) 第五章【机器学习】【监督学习】- 决策树…

转置卷积方法

一、定义 1、卷积神经网络层通常会减少&#xff08;或保持不变&#xff09;采样输入图像的空间维度&#xff08;高和宽&#xff09;&#xff0c;另一种类型的卷积神经网络层&#xff0c;它可以增加上采样中间层特征图的空间维度&#xff0c; 用于逆转下采样导致的空间尺寸减小…

StringBuilder, Stringbuffer,StringJoiner

StringBuilder StringBuilder 代表可变字符串对象&#xff0c;相当于是一个容器&#xff0c;里面装的字符串是可以改变的&#xff0c;就是用来操作字符串的。 StringBuilder 比String更适合做字符串的修改操作&#xff0c;效率更高&#xff0c;代码更加的简洁。 public clas…

职升网:咨询工程师考试科目难不难?

咨询工程师考试包含四个科目&#xff0c;它们分别是《宏观经济政策与发展规划》、《工程项目组织与管理》、《项目决策分析与评价》以及《现代咨询方法与实务》。每个科目都有其独特的难度和特点。 《宏观经济政策与发展规划》&#xff1a;这一科目被认为是备考中相对容易的科…

ubuntu20.04支持win10远程桌面连接

1. 安装xrdp sudo apt install xrdp 2. 检查xrdp状态 sudo systemctl status xrdp 要处于running状态 3.&#xff08;若为Ubuntu 20&#xff09;添加xrdp至ssl-cert sudo adduser xrdp ssl-cert 4. 重启服务 sudo systemctl restart xrdp 5. window 远程桌面连接&am…

AVL树超详解上

前言 学习过了二叉树以及二叉搜索树后&#xff08;不了解二叉搜索树的朋友可以先看看这篇博客&#xff0c;二叉搜索树详解-CSDN博客&#xff09;&#xff0c;我们在一般情况下对于二叉搜索树的插入与查询时间复杂度都是O(lgN)&#xff0c;是十分快的&#xff0c;但是在一些特殊…

太速科技-基于XCVU9P+ C6678的8T8R的无线MIMO平台

基于XCVU9P C6678的8T8R的无线MIMO平台 一、板卡概述 板卡基于TI TMS320C6678 DSP和XCVU9P高性能FPGA&#xff0c;FPGA接入4片AD9361 无线射频&#xff0c;构建8输入8输出的无线MIMO平台&#xff0c;丰富的FPGA资源和8核DSP为算法验证和信号处理提供强大能力。 二…