Redis 实现的延时队列组件

news2025/1/19 17:19:07

最近看开源看到一个好用的延时队列组件,已经上生产。代码量很少,主要就是利用Redis监听过期键实现的。然后搞点策略模式柔和柔和。利用Spring Start 封装了一下,全是俺掌握的知识,稍微研究了下就搞懂了。觉得挺有用的,这里分享一下。

Redis 过期键监听

之前写责任链手撸二级缓存的时候,也是借助过期键监听器来更新二级缓存的,详情挪步
CaffeineCache+Redis 接入系统做二层缓存,SPI 思路实现(借鉴 mybatis 二级缓存、自动装配源码)

效果

效果前提: Redis 开启了过期键通知:config set notify-keyspace-events Ex

根据 code 值发布延时任务(10s)。
在这里插入图片描述
对应的code 的处理器,10s后收到通知进行处理任务

在这里插入图片描述
在这里插入图片描述

基于这套组件可实现的功能:订单超时自动取消、会议前 30 分钟自动提醒、订单到点自动收货等,比MQ灵活性更高,RocketMq 老版只支持最高30 分钟的延时任务,这套组件可以指定任意时间。且可无限扩展 topic,满足不同类型的业务。缺点就是严重依赖Redis,需要保证Redis的高可用

RedisExpiredListener配置

利用ApplicationContextAware注册所有的messageHandleRouter处理器,当有消息过来时解析消息格式中的CODE,根据CODE把任务分发给具体的某个messageHandleRouter实现类进行处理。进行业务隔离。

package com.zzh.mybatisplus5.mq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

import java.util.HashMap;

public class RedisExpiredListener implements MessageListener, ApplicationContextAware {

    /**
     * 客户端监听订阅的topic,当有消息的时候,会触发该方法;
     * 并不能得到value, 只能得到key。
     * 姑且理解为: redis服务在key失效时(或失效后)通知到java服务某个key失效了, 那么在java中不可能得到这个redis-key对应的redis-value。
     */
    protected HashMap<Integer, DelayedMessageHandler> handlerRouter;

    private static final Logger logger = LoggerFactory.getLogger(RedisExpiredListener.class);

    @Override
    public void onMessage(Message message, byte[] bytes) {
        String expiredKey = message.toString();
        // TASK:CODE:VALUE结构
        String[] split = expiredKey.split(":");
        if (split.length < 2 || !expiredKey.startsWith("TASK:")) {
            return;
        }
        logger.info("[Redis键失效通知] key=" + expiredKey);
        StringBuilder value = new StringBuilder();
        for (int i = 2; i < split.length; i++) {
            value.append(split[i]);
            if (i != split.length - 1) {
                value.append(":");
            }
        }
        int code = Integer.parseInt(split[1]);
        DelayedMessageHandler handler = handlerRouter.get(code);
        if (handler != null) {
            handler.handle(value.toString());
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.handlerRouter = (HashMap<Integer, DelayedMessageHandler>) applicationContext.getBean("messageHandleRouter");
    }
}

DelayedMessageQueue实现类

基础配置类,里面配置了监听哪个 Redis 库的过期事件

package com.zzh.mybatisplus5.mq;

import com.zzh.mybatisplus5.component.CacheComponent;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.Callable;


public class RedisNotifyDelayedMessageQueueImpl implements DelayedMessageQueue {

    @Autowired
    private CacheComponent cacheComponent;

    @Override
    public Boolean publishTask(Integer code, String value, Integer delay) {
        if (delay < 0) {
            delay = 1;
        }
        cacheComponent.putRaw(assembleKey(code, value), "", delay);
        return true;
    }

    @Override
    public Boolean deleteTask(Integer code, String value) {
        cacheComponent.del(assembleKey(code, value));
        return true;
    }

    @Override
    public Long getTaskTime(Integer code, String value) {
        return cacheComponent.getKeyExpire(assembleKey(code, value));
    }

    @Override
    public Boolean publishTask(Callable task, Integer delay) {
        throw new RuntimeException();
    }

    public String assembleKey(Integer code, String value) {
        if (value == null) {
            value = "";
        }
        StringBuilder sb = new StringBuilder("TASK:");
        sb.append(code + ":");
        sb.append(value);
        return sb.toString();
    }
}

Redis 配置

package com.zzh.mybatisplus5.mq;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Configuration
public class RedisAutoConfig {

    private static final Logger logger = LoggerFactory.getLogger(RedisAutoConfig.class);

    @Value("${spring.redis.database}")
    private Integer cacheDB;

    @Bean
    public Map<Integer, DelayedMessageHandler> messageHandleRouter(List<DelayedMessageHandler> delayedMessageHandlerList) {
        return delayedMessageHandlerList.stream().collect(Collectors.toMap(DelayedMessageHandler::getCode, v -> v));
    }

    @Bean
    public RedisExpiredListener redisExpiredListener() {
        return new RedisExpiredListener();
    }

    /**
     * 指定 redis 库运行 config set notify-keyspace-events Ex 即可,不然监听无法生效
     * redis服务端需要配置 notify-keyspace-events 参数 ,至少包含k或者e
     * K 键空间通知,所有通知以 __keyspace@<db>__ 为前缀
     * E 键事件通知,所有通知以 __keyevent@<db>__ 为前缀
     * g DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知
     * $ 字符串命令的通知
     * l 列表命令的通知
     * s 集合命令的通知
     * h 哈希命令的通知
     * z 有序集合命令的通知
     * x 过期事件:每当有过期键被删除时发送
     * e 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送
     * A 参数 g$lshzxe 的别名
     *
     * @后边可以指定db库,*代表所有库,0代表0库 __keyevent@0__:expired 0库过期的数据
     * __keyspace@0__:mykey   0库mykey这个键的所有操作
     * __keyevent@0__:del     0库所有del这个命令
     */
    @Bean
    public RedisMessageListenerContainer container(LettuceConnectionFactory defaultLettuceConnectionFactory, RedisExpiredListener expiredListener) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(defaultLettuceConnectionFactory);
        //监听指定库的过期key
        container.addMessageListener(expiredListener, new PatternTopic("__keyevent@" + cacheDB + "__:expired"));
        return container;
    }

    @Bean
    public DelayedMessageQueue delayedMessageQueue() {
        return new RedisNotifyDelayedMessageQueueImpl();
    }

    @Bean
    public LettuceConnectionFactory defaultLettuceConnectionFactory(
            RedisConfiguration defaultRedisConfig,GenericObjectPoolConfig defaultPoolConfig) {
        LettuceClientConfiguration clientConfig =
                LettucePoolingClientConfiguration.builder().commandTimeout(Duration.ofMillis(5000))
                        .poolConfig(defaultPoolConfig).build();
        return new LettuceConnectionFactory(defaultRedisConfig, clientConfig);
    }

    @Bean
    public RedisTemplate<String, String> redisTemplate(
            LettuceConnectionFactory defaultLettuceConnectionFactory) {
        RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(defaultLettuceConnectionFactory);
        return redisTemplate;
    }

    @Bean
    public StringRedisTemplate stringRedisTemplate(LettuceConnectionFactory defaultLettuceConnectionFactory) {
        StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
        stringRedisTemplate.setConnectionFactory(defaultLettuceConnectionFactory);
        return stringRedisTemplate;
    }


    @Configuration
    public static class DefaultRedisConfig {
        @Value("${spring.redis.master-name}")
        private String masterName;
        @Value("${spring.redis.mode}")
        private String mode;
        @Value("${spring.redis.host:127.0.0.1:6379}")
        private String host;
        @Value("${spring.redis.password:}")
        private String password;
        @Value("${spring.redis.database:0}")
        private Integer database;

        @Value("${spring.redis.lettuce.pool.max-active:8}")
        private Integer maxActive;
        @Value("${spring.redis.lettuce.pool.max-idle:8}")
        private Integer maxIdle;
        @Value("${spring.redis.lettuce.pool.max-wait:-1}")
        private Long maxWait;
        @Value("${spring.redis.lettuce.pool.min-idle:0}")
        private Integer minIdle;

        @Bean
        public GenericObjectPoolConfig defaultPoolConfig() {
            GenericObjectPoolConfig config = new GenericObjectPoolConfig();
            config.setMaxTotal(maxActive);
            config.setMaxIdle(maxIdle);
            config.setMinIdle(minIdle);
            config.setMaxWaitMillis(maxWait);
            return config;
        }

        @Bean
        public RedisConfiguration defaultRedisConfig() {
            return getRedisConfiguration(masterName, mode, host, password, database);
        }

    }

    private static RedisConfiguration getRedisConfiguration(String masterName, String mode, String host, String password, Integer database) {
        if (mode.equals("single")) {
            RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
            String[] hostArray = host.split(":");
            config.setHostName(hostArray[0]);
            config.setPassword(RedisPassword.of(password));
            config.setPort(Integer.parseInt(hostArray[1]));
            config.setDatabase(database);
            return config;
        } else if (mode.equals("sentinel")) {
            RedisSentinelConfiguration configuration = new RedisSentinelConfiguration();
            configuration.setMaster(masterName);
            String[] hostList = host.split(",");
            List<RedisNode> serverList = new LinkedList<>();
            for (String hostItem : hostList) {
                String[] hostArray = hostItem.split(":");
                RedisServer redisServer = new RedisServer(hostArray[0], Integer.parseInt(hostArray[1]));
                serverList.add(redisServer);
            }
            configuration.setSentinels(serverList);
            logger.info("[Redis] 哨兵节点: masterName={}, host={}", masterName, host);
            return configuration;
        } else {
            return null;
        }
    }

}

顶级策略接口

没啥好说的,老三样

package com.zzh.mybatisplus5.mq;

import java.util.concurrent.Callable;

public interface DelayedMessageQueue {


    /**
     * 添加延迟秒数,RingDelayedMessageQueueImpl专用,单机实现
     * * @param delay seconds
     * @return
     */
    public Boolean publishTask(Callable task, Integer delay);

    /**
     * RedisNotifyDelayedMessageQueueImpl专用,集群实现
     * 这两个都会被拼接为 TASK:(随机码):CODE:VALUE 当成key存入redis中,因为回调时只会返回key,而不会返回key对应的值
     * @param code 回调时用来选择的Handler的CODE
     * @param value 回调时使用的值
     * @param delay 多少秒后调用
     * @return
     */
    public Boolean publishTask(Integer code, String value, Integer delay);

    /**
     * 删除已有任务
     * @param code
     * @param value
     * @return
     */
    public Boolean deleteTask(Integer code, String value);

    /**
     * 获取指定任务还有多少时间执行,如果不存在,返回-2
     * @param code
     * @param value
     * @return
     */
    public Long getTaskTime(Integer code, String value);
}
package com.zzh.mybatisplus5.mq;

/**
 * 延迟消息处理器
 */
public interface DelayedMessageHandler {

    /**
     *
     * @param value
     * @return 处理成功的返回大于0结果,失败返回0
     */
    public int handle(String value);

    public int getCode();
}

延时队列设计思路

和我之前使用策略模式封装的多个OSS使用,写法简直是一毛一样,详情挪步。策略模式调优(多Oss存储导致代码冗余的问题)在这里插入图片描述

延时队列消息丢失怎么解决

开个定时任务,每隔一分钟定时进行扫表,加了索引、延时消息丢失不多的情况下,查数据会很快。扫到有超时的订单,接着丢到 Redis 延时队列里面,双重保险。同时定时任务加个分布式锁,一台机器运行即可。​代码爆红是因为,我拉的开源项目,没跑起来直接看的源码。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1910408.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

人话学Python-基础篇-数字计算

一&#xff1a;数字类型 对于最常见的数据类型,数字在Python中分为三类&#xff1a; 整型(int) 表示的是整数类型的所有数字&#xff0c;包括正整数&#xff0c;负整数和0。和C语言不同的是&#xff0c;Python中的int型没有范围的限制&#xff0c;理论上可以从无限小的整数取到…

想实现随时随地远程访问?解析可道云teamOS内网穿透功能

在数字化时代&#xff0c;无论是个人还是企业&#xff0c;都面临着数据共享与远程访问的迫切需求。 比如我有时会需要在家中加班&#xff0c;急需访问公司内网中的某个关键文件。 然而&#xff0c;由于公网与内网的天然隔阂&#xff0c;这些需求往往难以实现。这时&#xff0c…

智慧运维管理平台建设方案(PPT原件)

1、智慧运维系统建设背景 2、智慧运维系统建设目标 3、智慧运维系统建设内容 4、智慧运维系统建设技术 5、智慧运维系统建设流程 6、智慧运维系统建设收益 软件全套资料获取及学习&#xff1a;本文末个人名片直接获取或者进主页。

前端工程师

15年前&#xff0c;前端主流的框架jquery&#xff0c;那个时候还没有前端工程师,后端开发人员既要写后台业务逻辑&#xff0c;又要写页面设计&#xff0c;还要应付IE不同版本浏览器兼容问题&#xff0c;非常的繁琐、难搞。 现在前端框架很多、很强大&#xff0c;前端开发工程师…

响应式的vue框架搭建个人博客网站模板

Vue框架搭建个人博客站点&#xff0c;html5 响应式个人博客模板 微信扫码免费获取源码

(NeurIPS,2022)Knowledge-CLIP:使用知识图谱进行CLIP

文章目录 Contrastive Language-Image Pre-Training with Knowledge Graphs相关资料摘要引言回顾CLIPKnowledge-CLIP数据准备模型架构训练目标 Contrastive Language-Image Pre-Training with Knowledge Graphs 相关资料 论文&#xff1a;Contrastive Language-Image Pre-Tra…

基于springboot+vue+uniapp的机电公司管理信息系统

开发语言&#xff1a;Java框架&#xff1a;springbootuniappJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#…

js计算两个日期直接的间隔天,2018/12/14到2017/11/10有多少天

const startDate new Date(2017-11-10)const endDate new Date(2018-12-14)const diffTime Math.abs(endDate - startDate)const diffDays Math.ceil(diffTime / (1000 * 60 * 60 * 24))console.log(diffDays) // 输出天数差 人工智能学习网站 https://chat.xutongbao.top…

如何将libwebsockets库编译为x86架构

在之前的文章中&#xff0c;我们已经详细介绍了如何交叉编译libwebsockets并将其部署到ELF 1开发板上。然而在调试阶段&#xff0c;发现将libwebsockets在Ubuntu环境下编译为x86架构可能更为方便和高效。 通过在主机环境中编译运用x86架构下的libwebsockets库&#xff0c;可以…

详细解读COB显示屏使用的共阴技术原理

倒装COB显示屏技术中采用的共阴技术是一种旨在提升能效并且减少驱动功耗的LED驱动方式&#xff0c;常规LED显示屏一般采用共阳极或者独立驱动的方式&#xff0c;而共阴技术就有所不同了&#xff0c;其基本原理如下&#xff1a; 一、基本概念&#xff1a;   共阴技术是指在LED…

AGI 之 【Hugging Face】 的【文本分类】的[数据集][文本转换成词元]的简单整理

AGI 之 【Hugging Face】 的【文本分类】的[数据集][文本转换成词元]的简单整理 目录 AGI 之 【Hugging Face】 的【文本分类】的[数据集][文本转换成词元]的简单整理 一、简单介绍 二、文本分类 三、数据集 1、Hugging Face Datasets 库 2、如果我的数据集不在Hub上那该…

中国科学院院士丁汉:人形机器人——机器人与人工智能结合的爆发点

工业制造是国民经济的重要支柱&#xff0c;是实现发展升级的国之重器。早在 2002 年&#xff0c;党的十六大就曾提出&#xff0c;坚持以信息化带动工业化&#xff0c;以工业化促进信息化&#xff0c;走出一条科技含量高、经济效益好、资源消耗低、环境污染少、人力资源优势得到…

tableau折线图绘制 - 4

tableau折线图绘制 1. 电影数量变化折线图绘制1.1 折线图及显示标签1.2 排除异常值1.3 修改纵坐标名称1.4 最大值注释 2.电影票房变化折线图2.1 数据类型位置拖拽2.2 折线图绘制2.3 修改标签数据格式 3. 2015年电影数量与票房变化折线图3.1 年份筛选3.2 横轴单位单位设置3.3 单…

TensorFlow系列:第一讲:环境搭建

简介&#xff1a; 这是TensorFlow系列教程&#xff0c;先从实战应用&#xff0c;再到原理分析&#xff0c;讲解如何使用大模型进行图像识别。 下载安装Mini Anaconda&#xff0c;配置环境变量 配置镜像 查看当前conda配置 conda config --show channels增加channel conda …

北斗防爆手持终端在化工厂的安全性能分析

北斗防爆手持终端在化工厂中的应用显著提升了安全性能&#xff0c;其卓越的防爆设计、高精度定位与监控功能、实时通信能力以及多功能集成特性&#xff0c;共同构筑了化工厂安全生产的坚实防线&#xff0c;确保了巡检人员与设备在复杂环境下的安全作业与高效管理。 北斗防爆手持…

分布式一致性算法:Raft学习

分布式一致性算法&#xff1a;Raft学习 1 什么是分布式系统&#xff1f; 分布式系统是由一组通过网络进行通信、为了完成共同的任务而协调工作的计算机节点组成的系统。这些节点可能位于不同的物理位置&#xff0c;但它们协同工作以提供一个统一的计算平台或服务。分布式系统…

SQLite 命令行客户端 + HTA 实现简易UI

SQLite 命令行客户端 HTA 实现简易UI SQLite 客户端.hta目录结构参考资料 仅用于探索可行性&#xff0c;就只实现了 SELECT。 SQLite 客户端.hta <!DOCTYPE html> <html> <head><meta http-equiv"Content-Type" content"text/html; cha…

昇思25天学习打卡营第11天|基于MindSpore的GPT2文本摘要

数据集 准备nlpcc2017摘要数据&#xff0c;内容为新闻正文及其摘要&#xff0c;总计50000个样本。 数据需要预处理&#xff0c;如下 原始数据格式&#xff1a; article: [CLS] article_context [SEP] summary: [CLS] summary_context [SEP] 预处理后的数据格式&#xff1a; […

EHS管理体系,重塑造企业竞争力的关键密码

在当今这个快速发展的时代&#xff0c;企业面临着前所未有的挑战与机遇。随着全球环保意识的普遍觉醒&#xff0c;以及社会各界对企业社会责任的日益关注&#xff0c;EHS&#xff08;环境&#xff0c;健康&#xff0c;安全&#xff09;管理体系成为了企业稳健前行的重要基石。它…

GPU发展史(二):改变游戏规则的3Dfx Voodoo

小伙伴们&#xff0c;大家好呀&#xff0c;我是老猫。 在上一篇GPU发展史&#xff08;一&#xff09;文章中&#xff0c;我们介绍了1976-1995期间早期显卡的发展故事&#xff0c;今天我们将介绍在1995-1999年这段时间显卡的故事&#xff0c;而这段故事的主角就是——3Dfx 提起…