redis 实现延迟队列及其他实现延迟队列

news2024/9/28 9:19:11

1、 前言

1.1、什么是延迟队列?

延时队列相比于普通队列最大的区别就体现在其延时的属性上,普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。从某种意义上来讲,延迟队列的结构并不像一个队列,而更像是一种以时间为权重的有序堆结构。

1.2、应用场景

我们在一些业务场景中,经常会遇到一些需要经历一段时间后,或者到达某个时间节点才会执行的功能。就比如以下这些场景:
新建一个订单,在规定时间内未支付需要自动取消
外卖或者打车在预计时间到达的前十分钟提醒骑手或者司机即将超时
快递收货后在规定时间内用户没有确认收货会自动确认收货
预定的会议在会议开始前十分钟会去提醒你尽快加入会议
每日周报在截止半小时前会提醒你尽快提交

1.3、为什么要使用延迟队列

对于一些数据量小并且对数据的时效性不怎么要求的项目来说,最简单有效的方法就是写一个定时任务去扫描数据库以达到业务的实现。当然,如果在数据达到数百万或者千万级别的时候,如果去定时扫描数据库,容易挨揍哈。想信大家也有所了解,当数据达到这种地步的时候,还去定时扫表会非常低效,甚至对于那些定时间隔比较小的情景来说,这一遍还没扫完下一遍就要开始了。这时候如果用延迟队列的话或许会很有效。

实现延迟队列的几种途径
Quartz 定时任务
DelayQueue 延迟队列
Redis sorted set
Redis 过期键监听回调
RabbitMQ死信队列
RabbitMQ基于插件实现延迟队列
wheel时间轮算法

2、Redis sorted set

在Redis中,zet作为有序集合,可以利用其有序的特性,将任务添加到zset中,将任务的到期时间作为score,利用zset的默认有序特性,获取score值最小的元素(也就是最近到期的任务),判断系统时间与该任务的到期时间大小,如果达到到期时间,就执行业务,并删除该到期任务,继续判断下一个元素,如果没有到期,就sleep一段时间(比如1秒),如果集合为空,也sleep一段时间。

在这里插入图片描述

通过zadd命令向队列delayqueue中添加元素,并设置score值表示元素过期的时间;向delayqueue添加三个order1、order2、order3,分别是10秒、20秒、30秒后过期。

zadd delayqueue 3 order3
消费端轮询队列delayqueue,将元素排序后取最小时间与当前时间比对,如小于当前时间代表已经过期移除key。

/**
 * 消费消息
 */
public void pollOrderQueue() {

    while (true) {
        Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);
        String value = ((Tuple) set.toArray()[0]).getElement();
        int score = (int) ((Tuple) set.toArray()[0]).getScore();
        Calendar cal = Calendar.getInstance();
        int nowSecond = (int) (cal.getTimeInMillis() / 1000);
        if (nowSecond >= score) {
            jedis.zrem(DELAY_QUEUE, value);
            System.out.println(sdf.format(new Date()) + " removed key:" + value);
        }
        if (jedis.zcard(DELAY_QUEUE) <= 0) {
            System.out.println(sdf.format(new Date()) + " zset empty ");
            return;
        }
        Thread.sleep(1000);
    }
} 

我们看到执行结果符合预期:

2020-05-07 13:24:09 add finished.
2020-05-07 13:24:19 removed key:order1
2020-05-07 13:24:29 removed key:order2
2020-05-07 13:24:39 removed key:order3
2020-05-07 13:24:39 zset empty 

3、Redis 过期键监听回调

Redis的key过期回调事件,也能达到延迟队列的效果,简单来说我们开启监听key是否过期的事件,一旦key过期会触发一个callback事件。
修改redis.conf文件开启notify-keyspace-events Ex。
notify-keyspace-events Ex

Redis监听配置,注入Bean RedisMessageListenerContainer。
其次,配置redis监听器 最后,编写redis key过期监听回调方法

@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
         return container;
    }
} 

编写Redis过期回调监听方法,必须继承KeyExpirationEventMessageListener ,有点类似于MQ的消息监听。

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
    super(listenerContainer);
}

@Override
public void onMessage(Message message, byte[] pattern) {
      String expiredKey = message.toString();
      System.out.println("监听到key:" + expiredKey + "已过期");
    }
} 

到这代码就编写完成,非常的简单,接下来测试一下效果,在redis-cli客户端添加一个key并给定3s的过期时间。
set xiaofu 123 ex 3
在控制台成功监听到了这个过期的key。
监听到过期的key为:xiaofu

4、Quartz定时任务

Quartz一款非常经典任务调度框架,在Redis、RabbitMQ还未广泛应用时,超时未支付取消订单功能都是由定时任务实现的。
导入Quartz依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>


>在启动类中使用@EnableScheduling注解开启定时任务功能。
```java
@SpringBootApplication
@EnableScheduling
public class DelayQueueApplication {
    public static void main(String[] args) {
        SpringApplication.run(DelayQueueApplication.class, args);
    }
}

编写定时任务

@Slf4j
@Component
public class QuartzDemo {
    /**
     * 每隔五秒开启一次任务
     */
    @Scheduled(cron = "0/5 * * * * ? ")
    public void process(){
        log.info("--------------定时任务测试--------------");
    }
}

5、DelayQueue 延迟队列

JDK中提供了一组实现延迟队列的API,位于Java.util.concurrent包下DelayQueue。
DelayQueue是一个BlockingQueue(无界阻塞)队列,它本质就是封装了一个PriorityQueue(优先队列),PriorityQueue内部使用完全二叉堆(不知道的自行了解哈)来实现队列元素排序,我们在向DelayQueue队列中添加元素时,会给元素一个Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了Delay时间才允许从队列中取出。队列中可以放基本数据类型或自定义实体类,在存放基本数据类型时,优先队列中元素默认升序排列,自定义实体类就需要我们根据类属性值比较计算了。
先简单实现一下看看效果,添加三个order入队DelayQueue,分别设置订单在当前时间的5秒、10秒、15秒后取消。

要实现DelayQueue延时队列,队中元素要implements Delayed 接口,这哥接口里只有一个getDelay方法,用于设置延期时间。Order类中compareTo方法负责对队列中的元素进行排序。

public class Order implements Delayed {
/**
 * 延迟时间
 */
@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private long time;
String name;

public Order(String name, long time, TimeUnit unit) {
    this.name = name;
    this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}

@Override
public long getDelay(TimeUnit unit) {
    return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
    Order Order = (Order) o;
    long diff = this.time - Order.time;
    if (diff <= 0) {
        return -1;
    } else {
        return 1;
    }
}
} 

DelayQueue的put方法是线程安全的,因为put方法内部使用了ReentrantLock锁进行线程同步。DelayQueue还提供了两种出队的方法poll()和take() , poll()为非阻塞获取,没有到期的元素直接返回null;take()阻塞方式获取,没有到期的元素线程将会等待。

public class DelayQueueDemo {

public static void main(String[] args) throws InterruptedException {
    Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
    Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
    Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
    DelayQueue<Order> delayQueue = new DelayQueue<>();
    delayQueue.put(Order1);
    delayQueue.put(Order2);
    delayQueue.put(Order3);

    System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
    while (delayQueue.size() != 0) {
        /**
         * 取队列头部元素是否过期
         */
        Order task = delayQueue.poll();
        if (task != null) {
            System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.name,  
            LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        }
        Thread.sleep(1000);
    }
}
} 

上边只是简单的实现入队与出队的操作,实际开发中会有专门的线程,负责消息的入队与消费。
执行后看到结果如下,Order1、Order2、Order3 分别在 5秒、10秒、15秒后被执行,至此就用DelayQueue实现了延时队列。
订单延迟队列开始时间:2020-05-06 14:59:09
订单:{Order1}被取消, 取消时间:{2020-05-06 14:59:14}
订单:{Order2}被取消, 取消时间:{2020-05-06 14:59:19}
订单:{Order3}被取消, 取消时间:{2020-05-06 14:59:24}

6、 RabbitMQ 延时队列

利用RabbitMQ做延时队列是比较常见的一种方式,而实际上RabbitMQ自身并没有直接支持提供延迟队列功能,而是通过 RabbitMQ 消息队列的 TTL和 DXL这两个属性间接实现的。
先来认识一下 TTL和 DXL两个概念:
Time To Live(TTL):
TTL 顾名思义:指的是消息的存活时间,RabbitMQ可以通过x-message-tt参数来设置指定Queue(队列)和 Message(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒。
RabbitMQ 可以从两种维度设置消息过期时间,分别是队列和消息本身:
设置队列过期时间,那么队列中所有消息都具有相同的过期时间。
设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息TTL都可以不同。
如果同时设置队列和队列中消息的TTL,则TTL值以两者中较小的值为准。而队列中的消息存在队列中的时间,一旦超过TTL过期时间则成为Dead Letter(死信)。
Dead Letter Exchanges(DLX)

总结
为了让大家更容易理解,上边的代码写的都比较简单粗糙,几种实现方式的demo已经都提交到GitHub 地址:https://github.com/chengxy-nds/delayqueue,感兴趣的小伙伴可以下载跑一跑。
可能写的有不够完善的地方,如哪里有错误或者不明了的,欢迎大家踊跃指正!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/349503.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

虹科新品|HK-Edgility敏捷边缘计算套件

敏捷、经济高效且易于管理大规模IT服务基础设施 业务动态和数字化转型将业务资源和业务用户都推到了传统 IT 边界之外。业务资源和服务被虚拟化并迁移到云端。随着数字工作空间的发展和业务用户需要随时随地访问业务资源&#xff0c;越来越多的业务资源被推向网络边缘。 由于…

研报精选230216

目录 【行业230216东吴证券】环保行业月报&#xff1a;2023M1环卫新能源渗透率大增至11.91%&#xff0c;上海地区渗透率高达77%【行业230216国元证券】国元新食饮&#xff1a;一图君&#xff1a;22年白酒产量&#xff1a;同降6.2%【行业230216浙商证券】农林牧渔点评报告&#…

认识数据库

今天为大家带来数据库的知识 &#x1f680;1.什么是数据库 &#x1f680;2.数据库的分类 &#x1f680;3.数据库的存储 1.首先来聊一聊MySQL数据库的定义 已经学习了数据结构 数据结构是一门研究数据如何存储的学科&#xff0c;当数据是少量的时候&#xff0c;用数据结构…

在vue2使用百度脑图的kityminder-core进行二次开发思维导图,给节点绑定数据后添加新的图标

需求说明&#xff1a;在给某个节点绑定文件数据后&#xff0c;用户并不能一眼看出哪个节点上绑定了数据&#xff0c;因此需要在绑定文件数据后给节点上加一个图标用于标识。 添加图标 1、在kityminder-core/src/module/file.js文件中添加代码 &#xff08;file.js文件如何添加…

ChatGPT国内镜像站初体验:聊天、Python代码生成等

ChatGPT国内镜像站试用&#xff0c;聊天、Python代码生成。 (本文获得CSDN质量评分【92】)【学习的细节是欢悦的历程】Python 官网&#xff1a;https://www.python.org/ Free&#xff1a;大咖免费“圣经”教程《 python 完全自学教程》&#xff0c;不仅仅是基础那么简单…… …

基于离散时间频率增益传感器的P级至M级PMU模型的实现(Matlab代码实现)

&#x1f468;‍&#x1f393;个人主页&#xff1a;研学社的博客&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5;&#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密…

[足式机器人]Part3机构运动微分几何学分析与综合Ch01-4 平面运动微分几何学——【读书笔记】

本文仅供学习使用 本文参考&#xff1a; 《机构运动微分几何学分析与综合》-王德伦、汪伟 《微分几何》吴大任 Ch01-4 平面运动微分几何学1.2.3-2 点轨迹的Euler-Savary公式1.2.4 高阶曲率理论1.2.3-2 点轨迹的Euler-Savary公式 例1-7&#xff1a; 平面曲柄摇杆机构的 Euler-Sa…

VS中scanf为什么会报错

简单来讲&#xff0c;scanf会强行输入内容&#xff0c;所以是不安全的。 我们来看这样的例子&#xff1a; #include <iostream> using namespace std;int main() {char demo[3];scanf("%s", demo);return 0; }对于以上代码&#xff0c;当我们输入字符不超过3时…

联合身份验证与Cognito

Hello大家好&#xff0c;我们接下来讨论AWS联合身份验证的内容。 AWS联合身份验证 对于考试&#xff0c;联合身份验证部分是一块非常重要的内容。那什么是联合身份验证&#xff0c;它是做什么用的呢&#xff1f; 联合身份验证&#xff0c;是用来允许AWS外部用户&#xff0c;如…

vue2中使用 Tinymce 5.1.0使用过程举例

一、背景 vue-cli 版本 &#xff1a;vue/cli 4.5.15 查看脚手架版本的方法&#xff1a;Win R 打开运行输入&#xff1a; cmd &#xff0c;打开控制台输入 vue -V vue 版本&#xff1a;"vue": "^2.6.11", 二、安装 Tinymce 1、 --no-fund 是因为提示你…

C语言(表达式,语句和副作用和序列号)

目录 一.表达式 二.语句 三.副作用和序列点 一.表达式 由运算符和运算对象组合。最简单得表达式是一个单独的运算对象&#xff0c;以次为基础可以建立复杂的表达式 4 421 a*&#xff08;b c/d)/20 运算对象可以是常量&#xff0c;变量或二者得组合。一些表达式由子表达…

python--石头剪刀布游戏(列表)

本使用了下面几篇文章的知识&#xff1a; python(8)--列表初阶使用_码银的博客-CSDN博客 python(7)--if语句_码银的博客-CSDN博客 一、学习目标 利用列表实现石头剪刀布游戏 二、实验环境 Pycharm社区版、win11 三、代码 先贴代码&#xff0c;有需要的直接拿&#xff0c;想要进…

Hive提升篇-Hive修改事务

简介 Hive 默认是不允许数据更新操作的&#xff0c;毕竟它不擅长&#xff0c;即使在0.14版本后&#xff0c;做一些额外的配置便可开启Hive数据更新操作。而在海量数据场景下做update、delete之类的行级数据操作时&#xff0c;效率并不如意。 简单使用 修改HIVE_HOME/conf/hi…

JS逆向寻找生成bid变量的加密算法,一顿操作猛如虎,结果发现原来是混淆代码

分享一下最近我JS逆向的心得。 我最近使用Python爬取某个网站某个链接&#xff0c;cookie必须加入qgqp_b_id参数才能获取数据。 这个参数是一个32位字符串&#xff0c;通过浏览器的开发者工具分析网页源代码&#xff0c;了解到这个qgqp_b_id变量不是服务器返回给客户端的&…

Whids:一款针对Windows操作系统的开源EDR

关于Whids Whids是一款针对Windows操作系统的开源EDR&#xff0c;该工具所实现的检测引擎基于先前的 Gene项目构建&#xff0c;并专门设计可以根据用户定义的规则匹配Windows事件。 功能特性 1、为社区提供一款功能强大且开源的Windows EDR&#xff1b; 2、支持检测规则透明化…

有了java基础,迅速学完Python并做了一份笔记

面向过程Python简介Python和Java的解释方式对比Java&#xff1a;源代码 -> 编译成class -> Jvm解释运行Python&#xff1a;源代码 -> Python解释器解释运行我经常和身边的Java开发者开玩笑说&#xff1a;“Java真变态&#xff0c;别的语言都是要么直接编译要么直接解释…

无线蓝牙耳机哪个好用?好用的无线蓝牙耳机推荐

随着苹果取消3.5mm耳机孔&#xff0c;近几年蓝牙耳机便逐渐取代有线耳机以强势的姿态闯入人们的日常生活。听歌、游戏、运动等&#xff0c;使用蓝牙耳机的人越来越多。经常看到有人问&#xff0c;无线蓝牙耳机哪个好用&#xff1f;针对这个问题&#xff0c;我来给大家推荐几款好…

消息称索尼计划为PS5推出两款蓝牙耳机,Find My蓝牙耳机用途广

根据国外科技媒体 Insider Gaming 报道&#xff0c;索尼计划进一步丰富 PlayStation 5 的配件生态&#xff0c;将会推出两款耳机&#xff0c;一款采用类似于 AirPods 的 TWS 设计&#xff0c;另一款则是无线头戴式耳机。 消息称 TWS 耳机的内部代号为“Project Nomad”&#…

Debug分支在什么场景下使用?怎样创建Debug分支?

在项目的正常开发过程中&#xff0c;之前发布过的版本可能很会出bug&#xff0c;这时就需要停下来现在的开发任务&#xff0c;先去修改bug&#xff0c;完成后再回来继续开发任务。git中stash提供了保存现场的功能&#xff0c;可以把当前工作区、暂存区中的内容不需要提交而保存…

Elasticsearch:Text vs. Keyword - 它们之间的差异以及它们的行为方式

很多刚开始学习 Elasticsearch 的人经常会混淆 text 和 keyword 字段数据类型。 它们之间的区别很简单&#xff0c;但非常关键。 在本文中&#xff0c;我将讨论两者之间的区别、如何使用它们、它们的行为方式以及使用哪一种。 区别 它们之间的关键区别在于&#xff0c;Elastic…