redisson 延迟队列实现任务过期监听

news2025/1/10 17:26:10

一、需求:

任务超过一个小时以后,如果还为待执行状态,则自动转为结束状态。

二、实现:

  1. 创建延迟队列的监听任务RedisDelayedQueueListener,消费延迟队列;
  2. 创建新增延迟队列的类,用于创建延迟队列;
  3. 整体初始化,把监听任务与spring绑定,扫描各个监听延迟队列的实现类,并开启单独线程,监听任务;
  4. 创建延迟任务。

三、实现步骤:

1.引入redisson依赖,这里直接引入springboot整合好的依赖,如果引用原生的依赖,需要自己配置redissonClient Bean。

<dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson-spring-boot-starter</artifactId>
        <version>3.10.5</version>
</dependency>

2.创建延时队列监听接口,定义延时队列到期事件处理方法,消费延时队列

/**
 * redis 队列事件监听,需要实现这个方法
 * @param <T>
 */
public interface RedisDelayedQueueListener<T> {

    /**
     * 执行方法
     * @param t
     */
    void invoke(T t);
}

3.具体的延时队列消费实现

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 订单支付过期监听
 */
@Component
@Slf4j
public class OrderPayExpirationListener implements RedisDelayedQueueListener<String>{

    @Autowired
    private ITOrderService orderService;

    @Override
    public void invoke(String orderId) {
        log.info("===" + orderId + ===");
        //查询到订单,判断为未支付,修改订单状态
        TOrder order = orderService.lambdaQuery().eq(TOrder::getOrderId, orderId).one();
        if (order.getOrderStatus() == 1) { //订单未支付
            TOrder tOrder = new TOrder();
            tOrder.setOrderId(orderId);
            tOrder.setOrderStatus(0); //更新订单为取消状态
            orderService.updateById(tOrder);
        }
    }
}

4.初始化,把监听任务与spring绑定

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.Map;

/**
 * redis 延时队列初始化
 */
@Component
@Slf4j
public class RedisDelayedQueueInit implements ApplicationContextAware {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 获取应用上下文并获取相应的接口实现类
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
        for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {
            String listenerName = taskEventListenerEntry.getValue().getClass().getName();
            startThread(listenerName, taskEventListenerEntry.getValue());
        }
    }

    /**
     * 启动线程获取队列
     * @param queueName 队列名称
     * @param redisDelayedQueueListener 任务回调监听
     */
    private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        //由于此线程需要常驻,可以新建线程,不用交给线程池管理
        Thread thread = new Thread(() -> {
            log.info("启动监听队列线程" + queueName);
            while (true) {
                try {
                    T t = blockingFairQueue.take();
                    log.info("监听队列线程{},获取到值:{}", queueName, JSON.toJSONString(t));
                    redisDelayedQueueListener.invoke(t);
                } catch (Exception e) {
                    log.info("监听队列线程错误,", e);
                }
            }
        });
        thread.setName(queueName);
        thread.start();
    }
}

5.创建延时任务

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;

/**
 * Redis 延时队列
 */
@Component
@Slf4j
public class RedisDelayedQueue {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 添加对象进延时队列
     * @param putInData 添加数据
     * @param delay     延时时间
     * @param timeUnit  时间单位
     * @param queueName 队列名称
     * @param <T>
     */
    private <T> void addQueue(T putInData,long delay, TimeUnit timeUnit, String queueName){
        log.info("添加延迟队列,监听名称:{},时间:{},时间单位:{},内容:{}" , queueName, delay, timeUnit,putInData);
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        delayedQueue.offer(putInData, delay, timeUnit);
    }

    /**
     * 添加队列-秒
     *
     * @param t     DTO传输类
     * @param delay 时间数量
     * @param <T>   泛型
     */
    public <T> void addQueueSeconds(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
        addQueue(t, delay, TimeUnit.SECONDS, clazz.getName());
    }

    /**
     * 添加队列-分
     *
     * @param t     DTO传输类
     * @param delay 时间数量
     * @param <T>   泛型
     */
    public <T> void addQueueMinutes(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
        addQueue(t, delay, TimeUnit.MINUTES, clazz.getName());
    }

    /**
     * 添加队列-时
     *
     * @param t     DTO传输类
     * @param delay 时间数量
     * @param <T>   泛型
     */
    public <T> void addQueueHours(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
        addQueue(t, delay, TimeUnit.HOURS, clazz.getName());
    }
    /**
     * 添加队列-天
     *
     * @param t     DTO传输类
     * @param delay 时间数量
     * @param <T>   泛型
     */
    public <T> void addQueueDays(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
        addQueue(t, delay, TimeUnit.DAYS, clazz.getName());
    }
}

6.此时只需要再下单成功的方法里面新增以下逻辑即可

@Autowired
 private RedisDelayedQueue redisDelayedQueue;

 //将订单id放入延时队列,配置过期监听的处理类
redisDelayedQueue.addQueueHours(id,2, OrderPayExpirationListener.class);

以上参考:https://www.cnblogs.com/huaixiaonian/p/16978606.html

四、我的优化

4.1 此场景中,ApplicationContextAware存在的问题

介绍ApplicationContextAware和ApplicationRunner的区别

  • ApplicationContextAware:在Bean初始化过程中initializeBean()函数中;(项目没启动完成)
  • ApplicationRunner:在所有bean都初始化完成后调用,在AfterFinish中执行;

因此ApplicationContextAware初始化会有两个问题:

  1. 未完全启动完成就监听,可能会导致消费队列的相关类未全部加载完成,导致在启动完成前这段时间,消息消费异常;
  2. 代码里是新建线程异步消费,当有系统启动异常时,线程还在启动着,会不断打印log.info(“监听队列线程错误,”, e);

4.1.2 优化一:ApplicationRunner替代ApplicationContextAware

@Slf4j
@Component
public class RedisDelayedQueueInitRunner implements ApplicationRunner {
 .......
}
@Override
    public void run(ApplicationArguments args) {
        String listenerName = String.format("XX", redisDelayedQueueListener.getClass().getSimpleName());
        startThread(listenerName, redisDelayedQueueListener);
    }

4.2 上次关闭的时候的消息到期了,不会马上发送

上次关闭的时候的消息到期了,不会马上发送,要等新消息来,才会消费。

原因:因为是在添加消息的时候才初始化管道的:
在这里插入图片描述
解决方法:这个地方吧管道开启就可以了
在这里插入图片描述
这个是在启动的时候去执行 要在invoke 方法里面捕获,防止启动失败了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2156916.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LeetCode 热题 100 回顾17

干货分享&#xff0c;感谢您的阅读&#xff01;原文见&#xff1a;LeetCode 热题 100 回顾_力code热题100-CSDN博客 一、哈希部分 1.两数之和 &#xff08;简单&#xff09; 题目描述 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标…

HTML翻牌器:用CSS和HTML元素创造动态数字展示

HTML翻牌器&#xff1a;用CSS和HTML元素创造动态数字展示 前言 翻牌器是一种数字动态展示形式&#xff0c;在生活中常见的例如翻牌计分、翻牌时钟等。 之所以以翻牌的形式是因为其物理设计的原因使其只能滚动翻牌展示数字&#xff0c;在电子显示设备不普及时&#xff0c;使用…

PMBOK® 第六版 估算活动持续时间

目录 读后感—PMBOK第六版 目录 在项目管理中&#xff0c;尤其是在软件开发这样的复杂项目中&#xff0c;工作内容是多种多样的。从需求分析、设计、编码到测试和部署&#xff0c;每个阶段都有其独特的挑战和不确定性。 没有人能独自完成所有估算工作并做到绝对精准。估算涉及…

【Unity Shader】Special Effects(九)Vortex 旋涡(UI)

源码:[点我获取源码] 索引 Vortex 旋涡思路分析旋涡中心旋涡旋转旋涡强度旋涡动画Vortex 旋涡 旋涡效果可以将一张图像以指定点作为旋涡中心,呈顺时针旋涡动画效果,使用动画播放器: 思路分析 首先,旋涡特效的核心也即是旋转(特别是uv坐标的旋转); 在此基础上,旋涡中…

Vue(15)——组合式API②

生命周期函数 选项式组合式beforeCreate/createdsetupbeforeMountonBeforeMount mountedonMounedbeforeUpdateonBeforeUpdateupdatedonUpdatedbeforeUnmountonBeforeUnmountunmountedonUnmounted 父子通信 父传子基本思想&#xff1a; 父组件中给子组件绑定属性…

Vue Devtools -----一条龙安装教程 + 解决安装使用过程的一些问题

一条龙安装教程&#xff08;首次 安装看这里&#xff09; 点击下方网址 进入下载页面 安装 |Vue 开发工具 (vuejs.org)https://devtools-v6.vuejs.org/guide/installation.html 选择适合自己浏览器的版本 以Edge为例&#xff0c;点击下载即可 我以为已经下载过了&#xff0c;…

BUUCTF-MISC-数据包中的线索

下载题目文件&#xff0c;解压发现是一段流量包 使用Wireshark打开 首先过滤HTTP数据流 然后追踪HTTP数据流 通过追踪数据流可以发现 流7 当中有一段base64编码&#xff0c;我们尝试解码 base64基本特征 Base64编码只包含64个字符&#xff1a;大写字母&#xff08;A-Z&#x…

计算机网络笔记002

### 课堂讨论对话 **学生A**: 老师&#xff0c;计算机网络的组成是怎样的&#xff1f;&#x1f914; **老师**: 非常好的问题&#xff01;计算机网络主要由硬件、软件和通信协议三部分组成。我们先从硬件开始讨论吧。 **学生B**: 硬件包括哪些设备呢&#xff1f;&#x1f60…

cmd快速进入文件夹目录下

首先&#xff0c;将文件夹直接点击左键拖动至cmd窗口中&#xff0c;就可以得到目录路径。 还有就是&#xff0c;在命令行直接敲入D:或者C:就可以在磁盘之间进行转换&#xff0c;注意冒号不要丢。 再有&#xff0c;如果进入某磁盘中的一个文件夹&#xff0c;使用cd命令。路径获取…

zabbix email 告警

配置媒介、触发器动作&#xff08;动作、操作&#xff09; 为用户 定义媒体&#xff0c;比如电子邮件地址 动作 - 条件

[图解]静态关系和动态关系

1 00:00:01,060 --> 00:00:04,370 首先我们来看静态关系和动态关系 2 00:00:06,160 --> 00:00:10,040 我们要尽量基于静态关系来建立动态关系 3 00:00:11,740 --> 00:00:13,740 不能够在没有这个的基础上 4 00:00:14,220 --> 00:00:17,370 没有这个的情况下就胡…

2024PHP彩虹工具网源码一个多功能工具箱程序支持72种常用站长和开发等工具

安装&#xff1a; PHP>7.4 伪静态设置Thinkphp 设置/public为网站运行目录 访问你的域名/install进行安装即可 安装扩展 sg11 &#xff0c;fileinfo &#xff0c; ionCube 常用功能 站长工具&#xff1a;ICP备案查询、IP地址查询、域名Whios查询、腾讯域名拦截查询、Mysql…

828华为云征文 | 构建高效搜索解决方案,Elasticsearch Kibana的完美结合

前言 构建高效搜索解决方案&#xff0c;FlexusX服务器与Elasticsearch & Kibana的完美结合&#xff0c;为企业带来云端搜索新体验。FlexusX实例以其卓越性能与灵活扩展性&#xff0c;确保高并发搜索的流畅运行。部署Elasticsearch&#xff0c;享受分布式搜索的精准与快速&a…

从Yargs源码学习中间件的设计

yargs中间件介绍 yargs 是一个用于解析命令行参数的流行库&#xff0c;它能帮助开发者轻松地定义 CLI&#xff08;命令行接口&#xff09;&#xff0c;并提供参数处理、命令组织、help文本自动生成等功能。今天我们来学习一下它对中间件的支持。 中间件的API详细信息&#xff0…

Python | Leetcode Python题解之第430题扁平化多级双向链表

题目&#xff1a; 题解&#xff1a; class Solution:def flatten(self, head: "Node") -> "Node":def dfs(node: "Node") -> "Node":cur node# 记录链表的最后一个节点last Nonewhile cur:nxt cur.next# 如果有子节点&#…

旋转机械故障数据集 全网首发

旋转机械故障 数据集 11G资料 泵、齿轮箱、电机、流量、液压系统、轴承(西储大学、辛辛那提大学、FEMTO、MOSFET)、PHM08挑战数据集、我闪发动机降级模拟数据集、铣床等 旋转机械故障数据集 数据集描述 该数据集是一个综合性的旋转机械故障检测和诊断数据集&#xff0c;旨在…

【ChatGPT】提示词助力广告文案、PPT制作与书籍推荐的高效新模式

博客主页&#xff1a; [小ᶻZ࿆] 本文专栏: AIGC | ChatGPT 文章目录 &#x1f4af;前言&#x1f4af;高效广告推销文案提示词使用方法 &#x1f4af;AI自动生成PPT全流程提示词使用方法 &#x1f4af;精选书籍推荐爆款文案提示词使用方法 &#x1f4af;小结 &#x1f4af;…

【VUE3.0】动手做一套像素风的前端UI组件库---Radio

目录 引言做之前先仔细看看UI设计稿解读一下都有哪些元素&#xff1a;参考下成熟的组件库&#xff0c;看看还需要做什么&#xff1f; 代码编写1. 设计group包裹选项的组件group.vueitem.vue 2. 让group的v-model和item的value联动起来3. 完善一下item的指示器样式4. 补充禁用模…

【测试】——JUnit

&#x1f4d6; 前言&#xff1a;JUnit 是一个流行的 Java 测试框架&#xff0c;主要用于编写和运行单元测试&#xff0c;用来管理测试用例。本文采用JUnit 5 目录 &#x1f552; 1. 添加依赖&#x1f552; 2. 注解&#x1f558; 2.1 Test&#x1f558; 2.2 BeforeAll AfterAll&…

【Docker】基于docker compose部署artifactory-cpp-ce服务

基于docker compose部署artifactory-cpp-ce服务 1 环境准备2 必要文件创建与编写3 拉取镜像-创建容器并后台运行4 访问JFog Artifactory 服务 1 环境准备 docker 以及其插件docker compose &#xff0c;我使用的版本如下图所示&#xff1a; postgresql 的jdbc驱动, 我使用的是…