基于Redis的4种延时队列实现方式

news2025/4/19 17:25:21

延时队列是一种特殊的消息队列,它允许消息在指定的时间后被消费。在微服务架构、电商系统和任务调度场景中,延时队列扮演着关键角色。例如,订单超时自动取消、定时提醒、延时支付等都依赖延时队列实现。

Redis作为高性能的内存数据库,具备原子操作、数据结构丰富和简单易用的特性,本文将介绍基于Redis实现分布式延时队列的四种方式。

1. 基于Sorted Set的延时队列

原理

利用Redis的Sorted Set(有序集合),将消息ID作为member,执行时间戳作为score进行存储。通过ZRANGEBYSCORE命令可以获取到达执行时间的任务。

代码实现

public class RedisZSetDelayQueue {
    private final StringRedisTemplate redisTemplate;
    private final String queueKey = "delay_queue:tasks";
    
    public RedisZSetDelayQueue(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    
    /**
     * 添加延时任务
     * @param taskId 任务ID
     * @param taskInfo 任务信息(JSON字符串)
     * @param delayTime 延迟时间(秒)
     */
    public void addTask(String taskId, String taskInfo, long delayTime) {
        // 计算执行时间
        long executeTime = System.currentTimeMillis() + delayTime * 1000;
        
        // 存储任务详情
        redisTemplate.opsForHash().put("delay_queue:details", taskId, taskInfo);
        
        // 添加到延时队列
        redisTemplate.opsForZSet().add(queueKey, taskId, executeTime);
        
        System.out.println("Task added: " + taskId + ", will execute at: " + executeTime);
    }
    
    /**
     * 轮询获取到期任务
     */
    public List<String> pollTasks() {
        long now = System.currentTimeMillis();
        
        // 获取当前时间之前的任务
        Set<String> taskIds = redisTemplate.opsForZSet()
                .rangeByScore(queueKey, 0, now);
        
        if (taskIds == null || taskIds.isEmpty()) {
            return Collections.emptyList();
        }
        
        // 获取任务详情
        List<String> tasks = new ArrayList<>();
        for (String taskId : taskIds) {
            String taskInfo = (String) redisTemplate.opsForHash()
                    .get("delay_queue:details", taskId);
            
            if (taskInfo != null) {
                tasks.add(taskInfo);
                
                // 从集合和详情中移除任务
                redisTemplate.opsForZSet().remove(queueKey, taskId);
                redisTemplate.opsForHash().delete("delay_queue:details", taskId);
            }
        }
        
        return tasks;
    }
    
    // 定时任务示例
    public void startTaskProcessor() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> {
            try {
                List<String> tasks = pollTasks();
                for (String task : tasks) {
                    processTask(task);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, 0, 1, TimeUnit.SECONDS);
    }
    
    private void processTask(String taskInfo) {
        System.out.println("Processing task: " + taskInfo);
        // 实际任务处理逻辑
    }
}

优缺点

优点

  • 实现简单,易于理解
  • 任务按执行时间自动排序
  • 支持精确的时间控制

缺点

  • 需要轮询获取到期任务,消耗CPU资源
  • 大量任务情况下,ZRANGEBYSCORE操作可能影响性能
  • 没有消费确认机制,需要额外实现

2. 基于List + 定时轮询的延时队列

原理

这种方式使用多个List作为存储容器,按延迟时间的不同将任务分配到不同的队列中。通过定时轮询各个队列,将到期任务移动到一个立即执行队列。

代码实现

public class RedisListDelayQueue {
    private final StringRedisTemplate redisTemplate;
    private final String readyQueueKey = "delay_queue:ready";  // 待处理队列
    private final Map<Integer, String> delayQueueKeys;  // 延迟队列,按延时时间分级
    
    public RedisListDelayQueue(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        
        // 初始化不同延迟级别的队列
        delayQueueKeys = new HashMap<>();
        delayQueueKeys.put(5, "delay_queue:delay_5s");     // 5秒
        delayQueueKeys.put(60, "delay_queue:delay_1m");    // 1分钟
        delayQueueKeys.put(300, "delay_queue:delay_5m");   // 5分钟
        delayQueueKeys.put(1800, "delay_queue:delay_30m"); // 30分钟
    }
    
    /**
     * 添加延时任务
     */
    public void addTask(String taskInfo, int delaySeconds) {
        // 选择合适的延迟队列
        String queueKey = selectDelayQueue(delaySeconds);
        
        // 任务元数据,包含任务信息和执行时间
        long executeTime = System.currentTimeMillis() + delaySeconds * 1000;
        String taskData = executeTime + ":" + taskInfo;
        
        // 添加到延迟队列
        redisTemplate.opsForList().rightPush(queueKey, taskData);
        System.out.println("Task added to " + queueKey + ": " + taskData);
    }
    
    /**
     * 选择合适的延迟队列
     */
    private String selectDelayQueue(int delaySeconds) {
        // 找到最接近的延迟级别
        int closestDelay = delayQueueKeys.keySet().stream()
                .filter(delay -> delay >= delaySeconds)
                .min(Integer::compareTo)
                .orElse(Collections.max(delayQueueKeys.keySet()));
                
        return delayQueueKeys.get(closestDelay);
    }
    
    /**
     * 移动到期任务到待处理队列
     */
    public void moveTasksToReadyQueue() {
        long now = System.currentTimeMillis();
        
        // 遍历所有延迟队列
        for (String queueKey : delayQueueKeys.values()) {
            boolean hasMoreTasks = true;
            
            while (hasMoreTasks) {
                // 查看队列头部任务
                String taskData = redisTemplate.opsForList().index(queueKey, 0);
                if (taskData == null) {
                    hasMoreTasks = false;
                    continue;
                }
                
                // 解析任务执行时间
                long executeTime = Long.parseLong(taskData.split(":", 2)[0]);
                
                // 检查是否到期
                if (executeTime <= now) {
                    // 通过LPOP原子性地移除队列头部任务
                    String task = redisTemplate.opsForList().leftPop(queueKey);
                    
                    // 任务可能被其他进程处理,再次检查
                    if (task != null) {
                        // 提取任务信息并添加到待处理队列
                        String taskInfo = task.split(":", 2)[1];
                        redisTemplate.opsForList().rightPush(readyQueueKey, taskInfo);
                        System.out.println("Task moved to ready queue: " + taskInfo);
                    }
                } else {
                    // 队列头部任务未到期,无需检查后面的任务
                    hasMoreTasks = false;
                }
            }
        }
    }
    
    /**
     * 获取待处理任务
     */
    public String getReadyTask() {
        return redisTemplate.opsForList().leftPop(readyQueueKey);
    }
    
    /**
     * 启动任务处理器
     */
    public void startTaskProcessors() {
        // 定时移动到期任务
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        
        // 移动任务线程
        scheduler.scheduleAtFixedRate(this::moveTasksToReadyQueue, 0, 1, TimeUnit.SECONDS);
        
        // 处理任务线程
        scheduler.scheduleAtFixedRate(() -> {
            String task = getReadyTask();
            if (task != null) {
                processTask(task);
            }
        }, 0, 100, TimeUnit.MILLISECONDS);
    }
    
    private void processTask(String taskInfo) {
        System.out.println("Processing task: " + taskInfo);
        // 实际任务处理逻辑
    }
}

优缺点

优点

  • 分级队列设计,降低单队列压力
  • 相比Sorted Set占用内存少
  • 支持队列监控和任务优先级

缺点

  • 延迟时间精度受轮询频率影响
  • 实现复杂度高
  • 需要维护多个队列
  • 时间判断和队列操作非原子性,需特别处理并发问题

3. 基于发布/订阅(Pub/Sub)的延时队列

原理

结合Redis发布/订阅功能与本地时间轮算法,实现延迟任务的分发和处理。任务信息存储在Redis中,而时间轮负责任务的调度和发布。

代码实现

public class RedisPubSubDelayQueue {
    private final StringRedisTemplate redisTemplate;
    private final String TASK_TOPIC = "delay_queue:task_channel";
    private final String TASK_HASH = "delay_queue:tasks";
    
    private final HashedWheelTimer timer;
    
    public RedisPubSubDelayQueue(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        
        // 初始化时间轮,刻度100ms,轮子大小512
        this.timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);
        
        // 启动消息订阅
        subscribeTaskChannel();
    }
    
    /**
     * 添加延时任务
     */
    public void addTask(String taskId, String taskInfo, long delaySeconds) {
        // 存储任务信息到Redis
        redisTemplate.opsForHash().put(TASK_HASH, taskId, taskInfo);
        
        // 添加到时间轮
        timer.newTimeout(timeout -> {
            // 发布任务就绪消息
            redisTemplate.convertAndSend(TASK_TOPIC, taskId);
        }, delaySeconds, TimeUnit.SECONDS);
        
        System.out.println("Task scheduled: " + taskId + ", delay: " + delaySeconds + "s");
    }
    
    /**
     * 订阅任务通道
     */
    private void subscribeTaskChannel() {
        redisTemplate.getConnectionFactory().getConnection().subscribe(
            (message, pattern) -> {
                String taskId = new String(message.getBody());
                
                // 获取任务信息
                String taskInfo = (String) redisTemplate.opsForHash().get(TASK_HASH, taskId);
                
                if (taskInfo != null) {
                    // 处理任务
                    processTask(taskId, taskInfo);
                    
                    // 删除任务
                    redisTemplate.opsForHash().delete(TASK_HASH, taskId);
                }
            }, 
            TASK_TOPIC.getBytes()
        );
    }
    
    private void processTask(String taskId, String taskInfo) {
        System.out.println("Processing task: " + taskId + " - " + taskInfo);
        // 实际任务处理逻辑
    }
    
    // 模拟HashedWheelTimer类
    public static class HashedWheelTimer {
        private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        private final long tickDuration;
        private final TimeUnit unit;
        
        public HashedWheelTimer(long tickDuration, TimeUnit unit, int wheelSize) {
            this.tickDuration = tickDuration;
            this.unit = unit;
        }
        
        public void newTimeout(TimerTask task, long delay, TimeUnit timeUnit) {
            long delayMillis = timeUnit.toMillis(delay);
            scheduler.schedule(
                () -> task.run(null), 
                delayMillis, 
                TimeUnit.MILLISECONDS
            );
        }
        
        public interface TimerTask {
            void run(Timeout timeout);
        }
        
        public interface Timeout {
        }
    }
}

优缺点

优点

  • 即时触发,无需轮询
  • 高效的时间轮算法
  • 可以跨应用订阅任务
  • 分离任务调度和执行,降低耦合

缺点

  • 依赖本地时间轮,非纯Redis实现
  • Pub/Sub模式无消息持久化,可能丢失消息
  • 服务重启时需要重建时间轮
  • 订阅者需要保持连接

4. 基于Redis Stream的延时队列

原理

Redis 5.0引入的Stream是一个强大的数据结构,专为消息队列设计。结合Stream的消费组和确认机制,可以构建可靠的延时队列。

代码实现

public class RedisStreamDelayQueue {
    private final StringRedisTemplate redisTemplate;
    private final String delayQueueKey = "delay_queue:stream";
    private final String consumerGroup = "delay_queue_consumers";
    private final String consumerId = UUID.randomUUID().toString();
    
    public RedisStreamDelayQueue(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        
        // 创建消费者组
        try {
            redisTemplate.execute((RedisCallback<String>) connection -> {
                connection.streamCommands().xGroupCreate(
                    delayQueueKey.getBytes(), 
                    consumerGroup, 
                    ReadOffset.from("0"), 
                    true
                );
                return "OK";
            });
        } catch (Exception e) {
            // 消费者组可能已存在
            System.out.println("Consumer group may already exist: " + e.getMessage());
        }
    }
    
    /**
     * 添加延时任务
     */
    public void addTask(String taskInfo, long delaySeconds) {
        long executeTime = System.currentTimeMillis() + delaySeconds * 1000;
        
        Map<String, Object> task = new HashMap<>();
        task.put("executeTime", String.valueOf(executeTime));
        task.put("taskInfo", taskInfo);
        
        redisTemplate.opsForStream().add(delayQueueKey, task);
        System.out.println("Task added: " + taskInfo + ", execute at: " + executeTime);
    }
    
    /**
     * 获取待执行的任务
     */
    public List<String> pollTasks() {
        long now = System.currentTimeMillis();
        List<String> readyTasks = new ArrayList<>();
        
        // 读取尚未处理的消息
        List<MapRecord<String, Object, Object>> records = redisTemplate.execute(
            (RedisCallback<List<MapRecord<String, Object, Object>>>) connection -> {
                return connection.streamCommands().xReadGroup(
                    consumerGroup.getBytes(),
                    consumerId.getBytes(),
                    StreamReadOptions.empty().count(10),
                    StreamOffset.create(delayQueueKey.getBytes(), ReadOffset.from(">"))
                );
            }
        );
        
        if (records != null) {
            for (MapRecord<String, Object, Object> record : records) {
                String messageId = record.getId().getValue();
                Map<Object, Object> value = record.getValue();
                
                long executeTime = Long.parseLong((String) value.get("executeTime"));
                String taskInfo = (String) value.get("taskInfo");
                
                // 检查任务是否到期
                if (executeTime <= now) {
                    readyTasks.add(taskInfo);
                    
                    // 确认消息已处理
                    redisTemplate.execute((RedisCallback<String>) connection -> {
                        connection.streamCommands().xAck(
                            delayQueueKey.getBytes(),
                            consumerGroup.getBytes(),
                            messageId.getBytes()
                        );
                        return "OK";
                    });
                    
                    // 可选:从流中删除消息
                    redisTemplate.opsForStream().delete(delayQueueKey, messageId);
                } else {
                    // 任务未到期,放回队列
                    redisTemplate.execute((RedisCallback<String>) connection -> {
                        connection.streamCommands().xAck(
                            delayQueueKey.getBytes(),
                            consumerGroup.getBytes(),
                            messageId.getBytes()
                        );
                        return "OK";
                    });
                    
                    // 重新添加任务(可选:使用延迟重新入队策略)
                    Map<String, Object> newTask = new HashMap<>();
                    newTask.put("executeTime", String.valueOf(executeTime));
                    newTask.put("taskInfo", taskInfo);
                    redisTemplate.opsForStream().add(delayQueueKey, newTask);
                }
            }
        }
        
        return readyTasks;
    }
    
    /**
     * 启动任务处理器
     */
    public void startTaskProcessor() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> {
            try {
                List<String> tasks = pollTasks();
                for (String task : tasks) {
                    processTask(task);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, 0, 1, TimeUnit.SECONDS);
    }
    
    private void processTask(String taskInfo) {
        System.out.println("Processing task: " + taskInfo);
        // 实际任务处理逻辑
    }
}

优缺点

优点

  • 支持消费者组和消息确认,提供可靠的消息处理
  • 内置消息持久化机制
  • 支持多消费者并行处理
  • 消息ID包含时间戳,便于排序

缺点

  • 要求Redis 5.0+版本
  • 实现相对复杂
  • 仍需轮询获取到期任务
  • 对未到期任务的处理相对繁琐

性能对比与选型建议

实现方式性能可靠性实现复杂度内存占用适用场景
Sorted Set★★★★☆★★★☆☆任务量适中,需要精确调度
List + 轮询★★★★★★★★☆☆高并发,延时精度要求不高
Pub/Sub + 时间轮★★★★★★★☆☆☆实时性要求高,可容忍服务重启丢失
Stream★★★☆☆★★★★★可靠性要求高,需要消息确认

总结

在实际应用中,可根据系统规模、性能需求、可靠性要求和实现复杂度等因素进行选择,也可以组合多种方式打造更符合业务需求的延时队列解决方案。无论选择哪种实现,都应关注可靠性、性能和监控等方面,确保延时队列在生产环境中稳定运行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2338134.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

时序预测 | Matlab实现基于VMD-WOA-ELM和VMD-ELM变分模态分解结合鲸鱼算法优化极限学习机时间序列预测

时序预测 | Matlab实现基于VMD-WOA-ELM和VMD-ELM变分模态分解结合鲸鱼算法优化极限学习机时间序列预测 目录 时序预测 | Matlab实现基于VMD-WOA-ELM和VMD-ELM变分模态分解结合鲸鱼算法优化极限学习机时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.Matlab…

基于EasyX库开发的球球大作战游戏

目录 球球大作战 一、开发环境 二、流程图预览 三、代码逻辑 1、初始化时间 2、设置开始界面大小 3、设置开始界面 4、让玩家选择速度 5、设置玩家小球、人机小球、食物的属性 6、一次性把图绘制到界面里 7、进入死循环 8、移动玩家小球 9、移动人机 10、食物刷新…

《系统分析师-第三阶段—总结(一)》

背景 采用三遍读书法进行阅读&#xff0c;此阶段是第三遍。 过程 第一章 第二章 总结 在这个过程中&#xff0c;对导图的规范越来越清楚&#xff0c;开始结构化&#xff0c;找关系&#xff0c;找联系。

AI——K近邻算法

文章目录 一、什么是K近邻算法二、KNN算法流程总结三、Scikit-learn工具1、安装2、导入3、简单使用 三、距离度量1、欧式距离2、曼哈顿距离3、切比雪夫距离4、闵可夫斯基距离5、K值的选择6、KD树 一、什么是K近邻算法 如果一个样本在特征空间中的k个最相似&#xff08;即特征空…

用 NLP + Streamlit,把问卷变成能说话的反馈

网罗开发 &#xff08;小红书、快手、视频号同名&#xff09; 大家好&#xff0c;我是 展菲&#xff0c;目前在上市企业从事人工智能项目研发管理工作&#xff0c;平时热衷于分享各种编程领域的软硬技能知识以及前沿技术&#xff0c;包括iOS、前端、Harmony OS、Java、Python等…

TCP/IP和UDP协议的发展历程

TCP/IP和UDP协议的发展历程 引言 互联网的发展史是人类技术创新的辉煌篇章&#xff0c;而在这一发展过程中&#xff0c;通信协议发挥了奠基性的作用。TCP/IP&#xff08;传输控制协议/互联网协议&#xff09;和UDP&#xff08;用户数据报协议&#xff09;作为互联网通信的基础…

Function Calling的时序图(含示例)

&#x1f9cd; 用户&#xff1a; 发起请求&#xff0c;输入 prompt&#xff08;比如&#xff1a;“请告诉我北京的天气”&#xff09;。 &#x1f7ea; 应用&#xff1a; 将用户输入的 prompt 和函数定义&#xff08;包括函数名、参数结构等&#xff09;一起发给 OpenAI。 …

若依框架修改左侧菜单栏默认选中颜色

1.variables.sacc中修改为想要的颜色 2.给目标设置使用的颜色

搜广推校招面经七十八

字节推荐算法 一、实习项目&#xff1a;多任务模型中的每个任务都是做什么&#xff1f;怎么确定每个loss的权重 这个根据实际情况来吧。如果实习时候用了moe&#xff0c;就可能被问到。 loss权重的话&#xff0c;直接根据任务的重要性吧。。。 二、特征重要性怎么判断的&…

广搜bfs-P1443 马的遍历

P1443 马的遍历 题目来源-洛谷 题意 要求马到达棋盘上任意一个点最少要走几步 思路 国际棋盘规则是马的走法是-日字形&#xff0c;也称走马日&#xff0c;即x,y一个是走两步&#xff0c;一个是一步 要求最小步数&#xff0c;所以考虑第一次遍历到的点即为最小步数&#xff…

强化学习算法系列(六):应用最广泛的算法——PPO算法

强化学习算法 &#xff08;一&#xff09;动态规划方法——策略迭代算法(PI)和值迭代算法(VI) &#xff08;二&#xff09;Model-Free类方法——蒙特卡洛算法(MC)和时序差分算法(TD) &#xff08;三&#xff09;基于动作值的算法——Sarsa算法与Q-Learning算法 &#xff08;四…

AI Agents系列之AI代理架构体系

1. 引言 智能体架构是定义智能体组件如何组织和交互的蓝图,使智能体能够感知其环境、推理并采取行动。本质上,它就像是智能体的数字大脑——集成了“眼睛”(传感器)、“大脑”(决策逻辑)和“手”(执行器),用于处理信息并采取行动。 选择正确的架构对于构建有效的智能…

2025海外代理IP测评:Bright Data,ipfoxy,smartproxy,ipipgo,kookeey,ipidea哪个值得推荐?

近年来&#xff0c;随着全球化和跨境业务需求的不断扩大“海外代理IP”逐渐成为企业和个人在多样化场景中的重要工具。无论是进行数据采集、广告验证、社交媒体管理&#xff0c;还是跨境电商平台运营&#xff0c;选择合适的代理IP服务商都显得尤为重要。然而&#xff0c;市场上…

Android守护进程——Vold (Volume Daemon)

简介 介绍&#xff1a;Vold 是用来管理 android 系统的存储设备&#xff0c;如U盘、SD卡、磁盘等移动设备的热插拔、挂载、卸载、格式化 框架结构&#xff1a;Vold 在系统中以守护进程存在&#xff0c;是一个单独的进程。处于Kernel和Framework之间&#xff0c;是两个层级连接…

vue3+vite 实现.env全局配置

首先创建.env文件 VUE_APP_BASE_APIhttp://127.0.0.1/dev-api 然后引入依赖&#xff1a; pnpm install dotenv --save-dev 引入完成后&#xff0c;在vite.config.js配置文件内加入以下内容&#xff1a; const env dotenv.config({ path: ./.env }).parsed define: { // 将…

AI 组件库是什么?如何影响UI的开发?

AI组件库是基于人工智能技术构建的、面向用户界面&#xff08;UI&#xff09;开发的预制模块集合。它们结合了传统UI组件&#xff08;如按钮、表单、图表&#xff09;与AI能力&#xff08;如机器学习、自然语言处理、计算机视觉&#xff09;&#xff0c;旨在简化开发流程并增强…

OpenCV day6

函数内容接上文&#xff1a;OpenCV day4-CSDN博客 , OpenCV day5-CSDN博客 目录 平滑&#xff08;模糊&#xff09; 25.cv2.blur()&#xff1a; 26.cv2.boxFilter(): 27.cv2.GaussianBlur()&#xff1a; 28.cv2.medianBlur(): 29.cv2.bilateralFilter()&#xff1a; 锐…

【AI飞】AutoIT入门七(实战):python操控autoit解决csf视频批量转换(有点难,AI都不会)

背景&#xff1a; 终极目标&#xff1a;通过python调用大模型&#xff0c;获得结果&#xff0c;然后根据返回信息&#xff0c;控制AutoIT操作电脑软件&#xff0c;执行具体工作。让AI更具有执行力。 已完成部分&#xff1a; 关于python调用大模型的&#xff0c;可以参考之前的…

MARA/MARC表 PSTAT字段

最近要开发一个维护物料视图的功能。其中PSTAT字段是来记录已经维护的视图的。这里记录一下视图和其对应的字母。 MARA还有个VPSTA&#xff08;完整状态&#xff09;字段&#xff0c;不过在我试的时候每次PSTAT出现一个它就增加一个&#xff0c;不知道具体是为什么。 最近一直…

学习型组织与系统思考

真正的学习型组织不是只关注个人的学习&#xff0c;而是关注整个系统的学习。—彼得圣吉 在这两年里&#xff0c;越来越多的企业开始询问是否可以将系统思考的内容内化给自己的内训师&#xff0c;进而在公司内部进行教学。我非常理解企业这样做的动机&#xff0c;毕竟内部讲师…