redisson的延时队列机制简述

news2024/11/16 12:46:42

概述

业务中经常会遇到一些延迟执行的需求;通常想到的都是rabbitmq或者rocketmq的延迟消息;
但是系统中不一定集成了mq,但为了控制分布式下的并发,一般redis都是有集成的;
rediskey过期监听那个时间不准确,在集群环境下节点挂了也容易丢失;

那么用redisson的延迟队列,正好可以用来解决轻量级的延时消息;
简单的来说就是消费者生产了一个消息任务,塞到ZSet里(用当前时间戳+延迟时间作为分数),等时间到了,就会放到任务List中,然后消费者真正去执行任务都是从任务List中获取任务;

redisson中的消费者并不是一直轮询获取任务;而是有具体时间的延迟任务,时间到了去任务队列中获取任务;

redisson延时任务机制简述

生产者先将任务pushdelay_queue_timeout等待队列中,延迟时间到了,消费者会把任务从timeout队列挪到SANYOU任务队列中(消费者实际获取任务的队列),然后消费者就能拿到最终要执行的任务了;

这里具体要说的就是客户端通知和获取机制;
消费者在启动时通常都会去get一下队列,达到订阅队列的目的;

RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("SANYOU");
RDelayedQueue<String> delayQueue = redissonClient.getDelayedQueue(blockingQueue);

这样做的目的:
消费者订阅队列,从delay_queue_timeout等待延迟队列中将已经到达时间的任务挪到真正的任务List队列中,然后再将delay_queue_timeout队列中第一个(也就是第一个要执行的)的任务的时间拿到,用这个时间开启一个延迟任务,时间到了之后,会发布一个消息到时间通知channel中;然后客户端监听到这个channel中的消息后,会再次重复上述步骤,让delay_queue_timeout中的任务,可以都放到真正的任务List队列中;

这样有一个好处就是不用一直while扫描等待,客户端的延迟任务时间和delay_queue_timeout中的延迟时间是一样的,可以精准利用cpu,理论上是没有延迟的,但是实际消息数量大量增加,消费者消费比较慢,还是会造成延迟任务消费延迟;

另外由于客户端都是用lua脚本去redis的同一个List队列中获取任务,lua脚本在redis中都是原子任务,而且redis真正的操作是单线程的,所以不会存在任务广播情况(并发获取时,一个任务不会被多个消费者同时拿到);

捞一张图片
在这里插入图片描述

代码Demo


import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


@Slf4j
@Component
public class RedissonDelayQueueConfig implements InitializingBean {

    @Resource
    private RedissonClient redissonClient;

    //延时队列map
    private final Map<String, RDelayedQueue<DelayMessageDTO>> delayQueueMap = new ConcurrentHashMap<>(16);


    /**
     * 消费者初始化所有队列,订阅对应的队列,并开启第一个过期任务的过期时间对应的延迟任务
     */
    @PostConstruct
    public void reScheduleDelayedTasks() {
        DelayQueueEnum[] queueEnums = DelayQueueEnum.values();
        for (DelayQueueEnum queueEnum : queueEnums) {
            RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueEnum.getCode());
            RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        }
    }


    @Override
    public void afterPropertiesSet() {
        // 有新的延迟队列在这里添加,队列消费类需要继承DelayQueueConsumer
        DelayQueueEnum[] queueEnums = DelayQueueEnum.values();

        for (DelayQueueEnum queueEnum : queueEnums) {
            DelayQueueConsumer delayQueueConsumer = SpringUtil.getBean(queueEnum.getBeanName());
            if (delayQueueConsumer == null) {
                throw new ServiceException("queueName=" + queueEnum.getBeanName() + ",delayQueueConsumer=null,请检查配置...");
            }
            // Redisson的延时队列是对另一个队列的再包装,使用时要先将延时消息添加到延时队列中,当延时队列中的消息达到设定的延时时间后,
            // 该延时消息才会进行进入到被包装队列中,因此,我们只需要对被包装队列进行监听即可。
            RBlockingQueue<DelayMessageDTO> rBlockingQueue = redissonClient.getBlockingDeque(queueEnum.getCode());

            //消费者初始化队列
            RDelayedQueue<DelayMessageDTO> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);
            //set到map中方便获取
            delayQueueMap.put(queueEnum.getCode(), rDelayedQueue);
            // 订阅新元素的到来,调用的是takeAsync(),异步执行
            rBlockingQueue.subscribeOnElements(delayQueueConsumer::execute);
        }
    }

    public RedissonClient getRedissonClient() {
        return redissonClient;
    }

    public Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() {
        return delayQueueMap;
    }
}








import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import javax.annotation.Resource;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;


@Slf4j
@Component
public class DelayQueueUtil {

    private static RedissonDelayQueueConfig redissonDelayQueueConfig;

    @Resource
    public void setRedissonDelayQueueConfig(RedissonDelayQueueConfig redissonDelayQueueConfig) {
        DelayQueueUtil.redissonDelayQueueConfig = redissonDelayQueueConfig;
    }

    private static Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() {
        if(null == redissonDelayQueueConfig) return Collections.emptyMap();
        return redissonDelayQueueConfig.getDelayQueueMap();
    }

    private static RedissonClient getRedissonClient() {
        if(null == redissonDelayQueueConfig) return null;
        return redissonDelayQueueConfig.getRedissonClient();
    }

    /**
     * 添加延迟消息
     */
    public static void addDelayMessage(DelayMessageDTO delayMessage) {
        log.info("delayMessage={}", delayMessage);

        Assert.isTrue(getDelayQueueMap().containsKey(delayMessage.getQueueName()), "队列不存在");

        delayMessage.setCreateTime(DateUtil.now());
        if(null == delayMessage.getTimeUnit()){
            delayMessage.setTimeUnit(TimeUnit.SECONDS);
        }

        RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName());
        //移除相同的消息
        rDelayedQueue.remove(delayMessage);

        //添加消息
        rDelayedQueue.offer(delayMessage, delayMessage.getDelayTime(), delayMessage.getTimeUnit());
    }


    /**
     * 移除指定队列中的消息
     */
    public static void removeDelayMessage(DelayMessageDTO delayMessage) {
        log.info("取消:delayMessage={}", delayMessage);
        if (!getDelayQueueMap().containsKey(delayMessage.getQueueName())) {
            log.error("queueName={},该延迟队列不存在,请确认后再试...", delayMessage.getQueueName());
            return;
        }

        RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName());
        rDelayedQueue.remove(delayMessage);
        removeDelayQueue(delayMessage);
    }


    /**
     * 从所有队列中删除消息
     */
    public static void removeDelayQueue(DelayMessageDTO value) {
        DelayQueueEnum[] queueEnums = DelayQueueEnum.values();
        for (DelayQueueEnum queueEnum : queueEnums) {
            RBlockingDeque<Object> blockingDeque = getRedissonClient().getBlockingDeque(queueEnum.getCode());
            RDelayedQueue<Object> delayedQueue = getRedissonClient().getDelayedQueue(blockingDeque);
            delayedQueue.remove(value);
        }
    }



}

参考了大佬的博文
https://lhalcyon.com/delay-task/index.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1397115.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

初识React,基础(1), 安装react,jsx文件,类组件和函数组件,css样式

第一部分:初识react react: 用于构建用户界面的 JavaScript 库全局安装,win r, 命令: npm install create-react-app -g3. 创建一个react应用, 这里我在vscode 里面创建, 创建之后,运行 create-react-app my-appcd my-app npm start 第二部分: redact 组件定义以及使用 rea…

视频增强修复Topaz Video AI

Topaz Video AI是一款强大的视频增强软件&#xff0c;利用人工智能技术对数千个视频进行训练&#xff0c;结合多个输入视频的帧信息来提高素材的分辨率。该软件可将视频的分辨率提高到最高8K&#xff0c;并保持真实的细节和运动一致性。同时&#xff0c;它还能自动修复视频中的…

NFS网络共享存储服务技术攻略

目录 一.NFS 1.定义 2.特点 3.原理 二.服务端NFS配置文件 1.主配置文件 2.文件格式 3.相关命令 三.实验&#xff1a;NFS共享存储服务配置 1.服务端安装nfs-utils和rpcbind软件包 2.服务端新建共享目录给权限 3.服务端修改配置文件/etc/exports 4.服务端关闭防火墙…

【C++干货铺】C++11常用新特性 | 列表初始化 | STL中的变化

个人主页点击直达&#xff1a;小白不是程序媛 C系列专栏&#xff1a;C干货铺 代码仓库&#xff1a;Gitee 目录 C11简介 列表初始化 std::initializer_list std::initializer_list使用场景 decltype关键字 STL中的一些变化 新容器 array forward_list 容器中的一些新…

【Python_PySide6学习笔记(三十一)】基于PySide6实现自定义串口设备连接界面类:可实现串口连接断开、定时发送等功能

基于PySide6实现自定义串口设备连接界面类:可实现串口连接关闭、定时发送等功能 基于PySide6实现自定义串口设备连接界面类:可实现串口连接关闭、定时发送等功能前言一、界面布局二、串口相关功能实现三、完整代码四、调用方法五、实现效果基于PySide6实现自定义串口设备连接…

ChatGPT提示词保姆级教程

现在越来越多提示词教程&#xff0c;本文列个清单&#xff0c;方便以后整理&#xff0c;不定期更新&#xff0c;欢迎关注留言&#xff01; 后续更新欢迎关注 提示词&#xff08;prompt&#xff09;出来后&#xff0c;被称为一个新的岗位诞生&#xff0c;面向提示词工程师。 …

将vue项目打包成桌面客户端实现点击桌面图标直接进入项目

1.下载NW.js 下载地址&#xff1a;NW.js官网 下载完后zip解压 2.文件夹下新建index.html index内容如下&#xff1a; <!DOCTYPE html> <html> <head> </head> <body> <script language"javascript" type"text/javascript&q…

《游戏-01_2D-开发》

首先利用安装好的Unity Hub创建一个unity 2D&#xff08;URP渲染管线&#xff09;项目 选择个人喜欢的操作格局&#xff08;这里采用2 by 3&#xff09; 在Project项目管理中将双栏改为单栏模式&#xff08;个人喜好&#xff09; 找到首选项&#xff08;Preferences&#xff09…

K8s知识点总结_part1

Kubernetes 是什么 为用户提供一个具有普遍意义的容器编排工具。 它着重解决的问题是&#xff1a;大规模集群中的各种运行任务之间的关系处理&#xff0c;这些关系的处理&#xff0c;是作业编排和管理系统最困难的地方。 其能力有&#xff1a; • 基于容器的应用部署、维护和…

C#编程-使用反射检索元数据

使用反射检索元数据 术语反射通常用来指镜像。如果您站在镜子面前,镜子会反射出您的所有物理属性,如:您的身高、肤色和身体结构。在C#中也一样,反射被用于反射程序有关的所有信息。C#程序可以利用反射获得类在运行时的信息。 反射在运行时获取类型信息的过程中被使用。提…

Python自动化报告的输出用例详解

1、设计简单的用例 2、设计用例 以TestBaiduLinks.py命名 # coding:utf-8from selenium import webdriverimport unittestclass BaiduLinks(unittest.TestCase):def setUp(self):base_url https://www.baidu.comself.driver webdriver.Chrome()self.driver.implicitly_wait(…

监控系统——Zabbix

目录 Zabbix概述 Zabbix 监控原理 Zabbix 与 Prometheus的区别 Zabbix 6.0 新特性 Zabbix 6.0 功能组件 Zabbix Server 数据库 Web 界面 Zabbix Agent Zabbix Proxy Java Gateway Zabbix 6.0 部署 部署 zabbix 服务端 添加 zabbix 客户端主机 自定义监控内容…

如何使用idm下载百度网盘的资源

IDM是海内外都非常受欢迎的一款下载管理软件。它支持视频媒体嗅探和多线程下载&#xff0c;能够完美替代谷歌Chrome浏览器、Edge浏览器等浏览器的原生下载功能。在浏览器中单击下载链接时&#xff0c;idm将接管浏览器的原生下载工具并加快下载速度&#xff0c;支持HTTP&#xf…

Architecture Lab:part A 【实现sum_list/rsum_list/copy_block/熟悉Y86-64指令】

Architecture Lab 对应CS:APP的Chap 4——处理器体系结构。Part A要实现三个函数&#xff0c;分别为sum_list/rsum_list/copy_block。建议先得到x86-64指令&#xff0c;然后再转换为Y86-64指令。 准备工作 在misc目录下&#xff0c;键入以下命令用来生成汇编代码。命令执行完…

tui.calender日历创建、删除、编辑事件、自定义样式

全是坑&#x1f573;&#xff01;全是坑&#x1f573;&#xff01;全是坑&#x1f573;&#xff01;能不用就不用&#xff01; 官方文档&#xff1a;https://github.com/nhn/tui.calendar/blob/main/docs/en/apis/calendar.md 实例的一些方法&#xff0c;比如创建、删除、修改、…

【React】脚手架创建项目

文章目录 创建React项目目录结构分析了解PWA脚手架中的webpack 创建React项目 ◼ 创建React项目的命令如下&#xff1a; ​  注意&#xff1a;项目名称不能包含大写字母 ​  另外还有更多创建项目的方式&#xff0c;可以参考GitHub的readme 命令&#xff1a; create-rea…

Redis相关命令详解及其原理

Redis概念 Redis&#xff0c;英文全称是remote dictionary service&#xff0c;也就是远程字典服务。这是kv存储数据库。Redis&#xff0c;包括所有的数据库&#xff0c;都是请求-回应模式&#xff0c;通俗来说就是数据库不会主动地要给前台推送数据&#xff0c;只有前台发送了…

1026 程序运行时间 (15)

要获得一个 C 语言程序的运行时间&#xff0c;常用的方法是调用头文件 time.h&#xff0c;其中提供了 clock() 函数&#xff0c;可以捕捉从程序开始运行到 clock() 被调用时所耗费的时间。这个时间单位是 clock tick&#xff0c;即“时钟打点”。同时还有一个常数 CLK_TCK&…

使用zabbix-proxy进行分布式监控

目录 一、准备4台服务器 二、配置主从复制 1.准备环境 2.主机名解析 3.安装数据库 4.配置主库db1 5.配置从库db2 6.主从状态显示 三、db1&#xff0c;db2配置zabbix-agent 三、zabbix-server的配置 四、zabbix-proxy的配置 1.为您的平台安装和配置Zabbix-proxy a. …

勒索病毒:原理与防御

一、勒索病毒概述 勒索病毒&#xff0c;又称为Ransomware&#xff0c;是一种恶意软件&#xff0c;通过感染电脑系统、服务器或者手机等设备&#xff0c;使用户文件被加密&#xff0c;从而向用户索取赎金以解锁文件。近年来&#xff0c;勒索病毒已经成为网络安全领域的一大公害…