你可见过如此细致的延时任务详解

news2024/11/19 15:27:21

概述

延时任务相信大家都不陌生,在现实的业务中应用场景可以说是比比皆是。例如订单下单 15 分钟未支付直接取消,外卖超时自动赔付等等。这些情况下,我们该怎么设计我们的服务的实现呢?

笨一点的方法自然是定时任务去数据库进行轮询。但是当业务量较大,事件处理比较费时的时候,我们的系统和数据库往往会面临巨大的压力,如果采用这种方式或许会导致数据库和系统的崩溃。那么有什么好办法吗?今天我来为大家介绍几种实现延时任务的办法。

JAVA DelayQueue

你没看错,java 内部有内置延时队列,位于 java concurrent 包内。

DelayQueue是一个 jdk 中自带的延时队列实现,他的实现依赖于可重入锁ReentrantLock以及条件锁Condition和优先队列PriorityQueue。而且本质上他也是一个阻塞队列。那么他是如何实现延时效果的呢。

DelayQueue 的实现原理

首先DelayQueue队列中的元素必须继承一个接口叫做Delayed,我们找到这个类

    public interface Delayed extends Comparable<Delayed> {        long getDelay(TimeUnit unit);    }

复制代码

发现这个类内部定义了一个返回值为long的方法getDelay,这个方法用来定义队列中的元素的过期时间,所有需要放在队列中的元素,必须实现这个方法。

然后我们来看看延迟队列的队列是如何操作的,我们就拿最典型的offertake来看:

    public boolean offer(E e) {        final ReentrantLock lock = this.lock;        lock.lock();        try {            q.offer(e);            if (q.peek() == e) {                leader = null;                available.signal();            }            return true;        } finally {            lock.unlock();        }    }

复制代码

offer操作平平无奇,甚至直接调用到了优先队列的 offer 来将队列根据延时进行排序,只不过加了个锁,做了些数据的调整,没有什么深入的地方,但是take的实现看上去就很复杂了。(注意,Dalayed 继承了Comparable方法,所以是可以直接用优先队列来排序的,只要你自己实现了compareTo方法)我尝试加了些注释让各位看得更明白些:

    public E take() throws InterruptedException {        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            // 自选操作            for (;;) {                // 获取队列第一个元素,如果队列为空                // 阻塞住直到有新元素加入队列,offer等方法调用signal唤醒线程                E first = q.peek();                if (first == null)                    available.await();                else {                    // 如果队列中有元素                    long delay = first.getDelay(NANOSECONDS);                    // 判断延时时间,如果到时间了,直接取出数据并return                    if (delay <= 0)                        return q.poll();                    first = null;                    // 如果leader为空则阻塞                    if (leader != null)                        available.await();                    else {                        // 获取当前线程                        Thread thisThread = Thread.currentThread();                        // 设置leader为当前线程                        leader = thisThread;                        try {                            // 阻塞延时时间                            available.awaitNanos(delay);                        } finally {                            if (leader == thisThread)                                leader = null;                        }                    }                }            }        } finally {            if (leader == null && q.peek() != null)                available.signal();            lock.unlock();        }    }

复制代码

我们可以看到take的实现依靠了无限自旋,直到第一个队列元素过了超时时间后才会返回,否则等待他的只有被阻塞。

DelayQueue 实现延时队列的优缺点

看了源码后,我们应该对DelayQueue的实现有了一个大致的了解,也对他的优缺点有了一定的理解。他的优点很明显:

  1. java 原生支持,不需要引入第三方工具

  2. 线程安全,即插即用使用方便

但是他的缺点也是很明显的:

  1. 不支持分布式,并且数据放在内存中,没有持久化的支持,服务宕机会丢失数据

  2. 插入时使用的是优先队列的排序,时间复杂度较高,并且对于队列中的任务不能很好的管理

所以有没有更好的延时队列的实现呢,我们继续看下去~

时间轮算法

时间轮算法是一个被设计出来处理延时任务的算法,现实中的应用可以在kafka以及netty等项目中找到类似的实现。

时间轮的具体实现

所谓时间轮,顾名思义,他是一个类似于时钟的结构,即他的主结构是一个环形数组,如图:

环形数组中存放的是一个一个的链表,链表中存放着需要执行的任务,我们设定好数组中执行的间隔,假设我们的环形数组的长度是 60,每个数组的执行间隔为 1s,那么我们会在每过 1s 就会执行数组下一个元素中的链表中的元素。如果只是这样,那么我们将无法处理 60 秒之外的延时任务,这显然不合适,所以我们会在每个任务中加上一个参数圈数,来表明任务会在几圈后执行。假如我们有一个任务是在 150s 后执行,那么他应该在 30s 的位置,同时圈数应该为 2。我们每次执行一个链表中的任务的时候会把当圈需要执行的任务取出执行,然后把他从链表中删除,如果任务不是当圈执行,则修改他的圈数,将圈数减 1,于是一个简单的时间轮出炉了。

那么这样的时间轮有什么优缺点呢?

先来说优点吧:

  1. 相比DelayQueue来说,时间轮的插入更加的高效,时间复杂度为 O(1)

  2. 实现简单清晰,任务调度更加方便合理

当然他的缺点也不少:

  1. 他和DelayQueue一样不支持分布式,并且数据放在内存中,没有持久化的支持,服务宕机会丢失数据

  2. 数组间的间隔设置会影响任务的精度

  3. 由于不同圈数的任务会在同一个链表中,执行到每个数组元素时需要遍历所有的链表数据,效率会很低

进阶优化版时间轮算法

刚才提到了一些时间轮算法的缺点,那么是不是有一些方法来进行下优化?这里我来介绍一下时间轮的优化版本。

之前我们提到不同圈数的任务会在同一个链表中被重复遍历影响效率,这种情况下我们可以进行如下优化:将时间轮进行分层

我们可以看到图中,我们采用了多层级的设计,上图中分了三层,每层都是 60 格,第一个轮盘中的间隔为 1 小时,我们的数据每一次都是插入到这个轮盘中,每当这个轮盘经过一个小时后来到下一个刻度,就会取出其中的所有元素,按照延迟时间放入到第二个象征着分钟的轮盘中,以此类推。

这样的实现好处可以说是显而易见的:

  1. 首先避免了当时间跨度较大时空间的浪费

  2. 每一次到达刻度的时候我们不用再像以前那样遍历链表取出需要的数据,而是可以一次性全部拿出来,大大节约了操作的时间

时间轮算法的应用

时间轮算法可能在之前大家没有听说过,但是他在各个地方都有着不小的作用。linux 的定时器的实现中就有时间轮的身影,同样如果你是一个喜好看源码的读者,你也可能会在kafka以及netty中找到他的实现。

kafka

kafka 中应用了时间轮算法,他的实现和之前提到的进阶版时间轮没有太大的区别,只有在一点上:kafka内部实现的时间轮应用到了DelayQueue

    @nonthreadsafe    private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
    private[this] val interval = tickMs * wheelSize    private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
    private[this] var currentTime = startMs - (startMs % tickMs)        @volatile private[this] var overflowWheel: TimingWheel = null
    private[this] def addOverflowWheel(): Unit = {        synchronized {        if (overflowWheel == null) {            overflowWheel = new TimingWheel(            tickMs = interval,            wheelSize = wheelSize,            startMs = currentTime,            taskCounter = taskCounter,            queue            )        }        }    }
    def add(timerTaskEntry: TimerTaskEntry): Boolean = {        val expiration = timerTaskEntry.expirationMs
        if (timerTaskEntry.cancelled) {        false        } else if (expiration < currentTime + tickMs) {        false        } else if (expiration < currentTime + interval) {        val virtualId = expiration / tickMs        val bucket = buckets((virtualId % wheelSize.toLong).toInt)        bucket.add(timerTaskEntry)
        if (bucket.setExpiration(virtualId * tickMs)) {            queue.offer(bucket)        }        true        } else {        if (overflowWheel == null) addOverflowWheel()        overflowWheel.add(timerTaskEntry)        }    }
    def advanceClock(timeMs: Long): Unit = {        if (timeMs >= currentTime + tickMs) {        currentTime = timeMs - (timeMs % tickMs)
        if (overflowWheel != null) overflowWheel.advanceClock(currentTime)        }    }    }

复制代码

上面是 kafka 内部的实现(使用的语言是 scala),我们可以看到实现非常的简洁,并且使用到了DelayQueue。我们刚才已经讨论过了DelayQueue的优缺点,查看源码后我们已经可以有一个大致的结论了:DelayQueue在 kafka 的时间轮中的作用是负责推进任务的,为的就是防止在时间轮中由于任务比较稀疏而造成的"空推进"。DelayQueue的触发机制可以很好的避免这一点,同时由于DelayQueue的插入效率较低,所以仅用于底层的推进,任务的插入由时间轮来操作,两者配置,可以实现效率和资源的平衡。

netty

netty的内部也有时间轮的实现HashedWheelTimer

HashedWheelTimer的实现要比 kafka 内部的实现复杂许多,和 kafka 不同的是,它的内部推进不是依靠的DelayQueue而是自己实现了一套,源码太长,有兴趣的读者可以自己去看一下。

小结

时间轮说了这么多,我们可以看到他的效率是很出众的,但是还是有这么一个问题:他不支持分布式。当我们的业务很复杂,需要分布式的时候,时间轮显得力不从心,那么这个时候有什么好一点的延时队列的选择呢?我们或许可以尝试使用第三方的工具

redis 延时队列

其实啊说起延时,我们如果常用redis的话,就会想起redis是存在过期机制的,那么我们是否可以利用这个机制来实现一个延时队列呢?

redis自带 key 的过期机制,而且可以设置过期后的回调方法。基于此特性,我们可以非常容易就完成一个延时队列,任务进来时,设定定时时间,并且配置好过期回调方法即可。

除了使用redis的过期机制之外,我们也可以利用它自带的zset来实现延时队列。zset支持高性能的排序,因此我们任务进来时可以将时间戳作为排序的依据,以此将任务的执行先后进行有序的排列,这样也能实现延时队列。

zset实现延时队列的好处:

  1. 支持高性能排序

  2. redis本身的高可用和高性能以及持久性

mq 延时队列

rocketmq 延时消息

rocketmq天然支持延时消息,他的延时消息分为 18 个等级,每个等级对应不同的延时时间。

那么他的原理是怎样的呢?

rocketmqbroker收到消息后会将消息写入commitlog,并且判断这个消息是否是延时消息(即 delay 属性是否大于 0),之后如果判断确实是延时消息,那么他不会马上写入,而是通过转发的方式将消息放入对应的延时topic(18 个延时级别对应 18 个topic

rocketmq会有一个定时任务进行轮询,如果任务的延迟时间已经到了就发往指定的topic

这个设计比较的简单粗暴,但是缺点也十分明显:

  1. 延时是固定的,如果想要的延迟超出 18 个级别就没办法实现

  2. 无法实现精准延时,队列的堆积等等情况也会导致执行产生误差

rocketmq 的精准延时消息

rocketmq本身是不支持的精确延迟的,他的商业版本ons倒是支持。不过rocketmq的社区中有相应的解决方案。方案是借助于时间轮算法来实现的,感兴趣的朋友可以自行去社区查看。(社区中的一些未被合并的 pr 是不错的实现参考)

总结

延时队列的实现千千万,但是如果要在生产中大规模使用,那么大部分情况下其实都避不开时间轮算法。改进过的时间轮算法可以做到精准延时,持久化,高性能,高可用性,可谓是完美。但是话又说回来,其他的延时方式就无用了吗?其实不是的,所有的方式都是需要匹配自己的使用场景。如果你是极少量数据的轮询,那么定时轮询数据库或许才是最佳的解决方案,而不是无脑的引入复杂的延时队列。如果是单机的任务,那么 jdk 的延时队列也是不错的选择。

本文介绍的这些延时队列只是为了向大家展示他们的原理和优缺点,具体的使用还需要结合自己业务的场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/29424.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

华为机试 - 滑动窗口最大和

目录 题目描述 输入描述 输出描述 用例 题目解析 算法源码 题目描述 有一个N个整数的数组&#xff0c;和一个长度为M的窗口&#xff0c;窗口从数组内的第一个数开始滑动直到窗口不能滑动为止&#xff0c; 每次窗口滑动产生一个窗口和&#xff08;窗口内所有数的和&…

常用的框架技术-09 Spring Security Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录1.Spring Security简介1.1 Spring Security概述1.2 Spring Security历史发展1.3 产品的对比1.3.1 Spring Security1.3.2 Shiro1.4 Spring Security 核心类1.4.1 Auth…

qemu 线程 vhost

[rootlocalhost cloud_images]# lsmod | grep vhost_net vhost_net 262144 0 vhost 262144 1 vhost_net tap 262144 1 vhost_net tun 262144 2 vhost_net [rootlocalhost cloud_images]#vhost-net网卡的…

[附源码]SSM计算机毕业设计基于实时定位的超市配送业务管理JAVA

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

低碳世界杂志低碳世界杂志社低碳世界编辑部2022年第7期目录

节能环保 挥发性有机物的全厂控制措施 董少军; 1-3 《低碳世界》投稿&#xff1a;cnqikantg126.com 佛山市市政排水管网通沟污泥处理处置工艺设计 张红; 4-6 “双碳”背景下海岸带地区适应气候变化评估与对策研究 王鸿浩;邬乐雅;吴晓晨;张丽佳;黄婧蓼琦;胡斐…

【毕业设计】基于情感分析的网络舆情热点分析系统

文章目录0 前言1 课题背景2 数据处理3 文本情感分析3.1 情感分析-词库搭建3.2 文本情感分析实现3.3 建立情感倾向性分析模型4 数据可视化工具4.1 django框架介绍4.2 ECharts5 Django使用echarts进行可视化展示5.1 修改setting.py连接mysql数据库5.2 导入数据5.3 使用echarts可视…

Java编程实战9:统计只差一个字符的子串数目

目录统计只差一个字符的子串数目题目示例 1示例 2示例 3示例 4提示解答解题思路完整代码统计只差一个字符的子串数目 题目 给你两个字符串 s 和 t &#xff0c;请你找出 s 中的非空子串的数目&#xff0c;这些子串满足替换 一个不同字符 以后&#xff0c;是 t 串的子串。换言…

实验1:Arduino的nRF24L01单向收发实验

实验结果: 发送端发送“Hello World”,发送成功打印1 接收端接收到“Hello World”,在串口中打印出“Hello World” OK,直接讲代码 因为我用的Arduino和nRF24L01 是用扩展板连接的,而我的嵌入式硬件开发,也就是AD实在不擅长,就不解释了 其中(9,10)CE,CSN 那么我…

通关算法题之 ⌈数组⌋ 下

二分搜索 704. 二分查找 给定一个 n 个元素有序的&#xff08;升序&#xff09;整型数组 nums 和一个目标值 target &#xff0c;写一个函数搜索 nums 中的 target&#xff0c;如果目标值存在返回下标&#xff0c;否则返回 -1。 输入: nums [-1,0,3,5,9,12], target 9 输出…

【后台技术】异步编程指北,问题和重点

导语&#xff1a;同步、异步&#xff0c;并发、并行、串行&#xff0c;这些名词在我们的开发中会经常遇到&#xff0c;这里对异步编程做一个详细的归纳总结&#xff0c;希望可以对这方面的开发有一些帮助。 内容大纲&#xff1a; 1、几个名词的概念 多任务的时候&#xff0c;…

jmeter压力测试报告

出版社智能智造测试报告 &#xff08;二期版本&#xff09; 2022年11月 目 录 1. 测试背景 1.1. 项目背景 1.2. 测试目的 1.3. 测试时间 1.4. 测试资源 1.5. 参考资料 2. 测试范围 3. 性能需求指标 3.1. 业界指标 4. 测试工具 5. 测试环境 5.1. 阿里云测试环境软…

搭建Gitlab

Gitlab是目前被广泛使用的基于git的开源代码管理平台, 基于Ruby on Rails构建, 主要针对软件开发过程中产生的代码和文档进行管理 一、搭建gitlab服务器&#xff0c;统一管理软件项目 第一步&#xff1a; 创建一个4G内存的虚拟机&#xff0c;否则很容易启动不了&#xff0c;报…

(附源码)计算机毕业设计Java“华商转转”平台的设计和实现

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; Springboot mybatis Maven Vue 等等组成&#xff0c;B/…

python常用进制转换

整数之间转换 # 1. 10 -> 16 hex(number)# 2. 10 -> 2 bin(number)# 3. 10 -> 8 oct(number)# 4. x进制 -> 10 int(Union[str, bytes, bytearray],basex) ------------------ print(int("0x16", base16)) // 22字符串转整数 # 10进制 val int(10) pri…

SPP-学习笔记

Spatial Pyramid Pooling in Deep Convolutional Networks for Visual Recognition SPP提出的原因 1、现有的深度卷积神经网络(spp出现之前的)需要固定大小的输入图像(例如224224)。往往需要对图片裁剪或者resize&#xff0c;导致图片信息损失或者产生几何畸变。这样可能会损…

奥比中光亮相全球1024开发者节,与科大讯飞达成战略合作

作者 | 奥比中光 编辑 | 3D视觉开发者社区 11月17日-23日&#xff0c;第五届世界声博会暨2022科大讯飞全球1024开发者节在安徽合肥举办&#xff0c;奥比中光作为3D视觉感知头部企业参展&#xff0c;并与科大讯飞达成战略合作&#xff0c;共同赋能3D视觉行业应用开发。 本次参…

如何利用现代工具来管理多项目

多项目管理是如今现代企业管理时常常遇到的一个难题。不同于单项目管理&#xff0c;多个项目同时进行管理要复杂得很多。而单纯的手工管理方式已经满足不了多管理的复杂需求&#xff0c;项目负责人想要保障在预定的时间内&#xff0c;又快又好地完成整体项目&#xff0c;便需要…

工厂模式解耦-交由spring来完成

上面两个小节一直在谈论解耦&#xff0c;从入门的多例到升级的单例BeanFactory工厂类是我们自己手工写的。 BeanFactory主要做了3件事&#xff1a; 1.读取配置文件&#xff08;可以是properties或xml类型的文件&#xff0c;示例中用的是properties文件&#xff09; 2.获取类…

OC RSA加密解密

好久好久没有更新了。。。你们等的急不急。。这不&#xff0c;我就姗姗来迟了。。。本文重点讲解一下iOS系统下的RSA加密解密问题。 一般为了安全&#xff0c;私钥是不会给前端暴露出来 的&#xff0c;只会通过私钥生成一个公开的公钥提供给外部对数据进行加密。将加密后的数据…

残差网络ResNet解读

一、残差网络的定义 残差网络的核心是解决增加深度带来的退化问题&#xff0c;这样能够通过单纯增加网络深度来提高网络性能。 残差单元以短连接的形式&#xff0c;将单元的输入直接与单元输出加在一起&#xff0c;然后再进行激活。 Weight为抽取特征的网络层 Addition时xl和…