文章目录
- 1. 前言
- 2. 什么是时间轮算法?
- 2.1 单层时间轮
- 2.2 多层时间轮
- 2.2.1 增加轮次的概念
- 2.2.2 多层次时间轮
- 2.3 小结
- 3. 实现案例
- 3.1 Kafka中的时间轮
- 3.1.1 任务的添加
- 3.1.2 时间轮的推进
- SystemTimer
- 3.1.3 小结
1. 前言
计时器对于故障恢复、基于速率的流量控制、调度算法、控制网络中的数据包生命周期至关重要。
而一般计时器的实现维护成本比较高,比如JDK自带的 Timer、DelayQueue对于任务的进出其时间复杂度为O(logN)。
对于要求高性能且需要保证高频繁大量操作任务的优先级框架,比如Kafka、Netty等框架,重排序的时间复杂度O(logN)是不能满足其要求的。而基于一种时间轮的算法可以实现将这种重排序的时间复杂度降为O(1)。
2. 什么是时间轮算法?
算法来自于生活,我们日常看时间使用手表,一个表盘就可以无限的去循环每一天,通过同样的一个表盘不同的指针来指向不同维度的时间(时分秒),日常中如果我们由大量任务需要进行提醒,可以进行备忘与时钟里的时间进行指定按时提醒。
同样的时间轮算法数据结构其实抽象于手表时钟,时间轮是用环形数组抽象表盘,数组里面的每一个元素就是一个bucket(刻度之间的间隔,也可以指代时间的精度)。bucket内部用双向链表存这待执行的任务,此时添加和删除的链表操作时间复杂度都是o(1)。
2.1 单层时间轮
从图中可以看到此时指针指向的是第一个bucket,一共有八个bucket0~7,假设bucket的时间单位为 1 秒,现在要加入一个延时 6秒的任务,计算方式就是 6 % 8 = 6,即放在下标为6 的那个bucket中,具体的操作只要直接添加到bucket双向链表的tail尾部就行了。
2.2 多层时间轮
比如当我们加一个延迟9s后执行的任务,此时超出表盘的范围时,如何解决呢?同样借鉴于表盘中循环以及多个指针代表不同时间维度的思想,时间轮算法有两种解决方案。
2.2.1 增加轮次的概念
延迟9s任务存放的bucket 下标 = 9%8 = 1,轮数记为 9/8 = 1。
意思就是当循环1轮后,指针指向下表为1的bucket就会触发这个任务。Netty 中的 HashedWheelTimer 使用的就是这种方式。
2.2.2 多层次时间轮
这种概念,就和我们手表里的时分秒不同指针代表不同维度时间概念一样了,只不过这里是分层的设计。
实现的方式如同手表里的指针转动,当秒针走一圈,分针走一格,分针走一圈,时针走一格。
这里三层的时间轮,一共是38=24格bucket, 最多可以延迟88*8=512秒。
多层时间轮,任务存放的位置还会随着时间进行降层变动,比如一个延迟65秒的任务,刚才是放在第三层,时间过了1s后,此时只需要64s就会执行,那么这个任务就会被降层到第二层,随着时间的不断的进行,这个任务最终会降层到第一层等待执行。
为什么要进行降层操作呢? 这是为了保证时间精度的一致性,Kakfa内部用的就是多层次时间轮算法。
2.3 小结
时间轮是一种实现延迟功能(定时器)的高效调度模型算法。其设计思想类似于手表时钟的设计,主要的数据结构为数组+链表,多层时间轮有两种实现方案,一种是轮次时间轮,一种是多层时间轮方案。
3. 实现案例
3.1 Kafka中的时间轮
Kafka的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务链表(TimerTaskList),或者称之为任务槽。TimerTaskList是一个环形的双向链表,链表中的每一项表示的均是定时任务(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。
时间轮由多个时间格组成, 每个时间格代表当前时间轮的基本时间跨度(tickMs) 。时间轮的时间格个数是固定的,可用wheelSize来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式tickMs × wheelSize计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime是tickMs的整数倍。currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList中的所有任务。
3.1.1 任务的添加
// TimerTaskEntry的就是包装了任务,并且记录任务的执行时间 = 延时+当前时间
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
val expiration = timerTaskEntry.expirationMs
if (timerTaskEntry.cancelled) {
// Cancelled
false
} else if (expiration < currentTime + tickMs) { // 如果到期
// Already expired
false
} else if (expiration < currentTime + interval) { // 如果还在本层
// Put in its own bucket
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt) // 计算bucket
bucket.add(timerTaskEntry) // 添加到bucket中的双向链表中
// Set the bucket expiration time
if (bucket.setExpiration(virtualId * tickMs)) { // 更新bucket过期时间
// The bucket needs to be enqueued because it was an expired bucket
// We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced
// and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle
// will pass in the same value and hence return false, thus the bucket with the same expiration will not
// be enqueued multiple times.
queue.offer(bucket) // 将bucket加入delayQueue
}
true
} else {
// Out of the interval. Put it into the parent timer
if (overflowWheel == null) addOverflowWheel()
overflowWheel.add(timerTaskEntry)
}
}
从上面的 add 方法我们知道每次对比都是根据expiration < currentTime + interval 来进行对比的,那currentTime 如何进行推进的呢?
3.1.2 时间轮的推进
Netty 中是通过固定的时间间隔扫描,时候未到就等待来进行时间轮的推动。
而 Kafka 就利用了空间换时间的思想,通过 DelayQueue,来保存每个槽,通过每个槽的过期时间排序。这样拥有最早需要执行任务的槽会有优先获取。如果时候未到,那么 delayQueue.poll 就会阻塞着,这样就不会有空推进的情况发送。
SystemTimer
我们先看下SystemTimer构造器如下:
@threadsafe
class SystemTimer(executorName: String,
tickMs: Long = 1,
wheelSize: Int = 20,
startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer {
// timeout timer
private[this] val taskExecutor = Executors.newFixedThreadPool(1,
(runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable))
private[this] val delayQueue = new DelayQueue[TimerTaskList]()
private[this] val taskCounter = new AtomicInteger(0)
private[this] val timingWheel = new TimingWheel(
tickMs = tickMs,
wheelSize = wheelSize,
startMs = startMs,
taskCounter = taskCounter,
delayQueue
)
其中SystemTimer.advanceClock即为推进的方法
3.1.3 小结
Kafka 用了多层次时间轮来实现,并且是按需创建时间轮,采用任务的绝对时间来判断延期,并且对于每个bucket槽(槽内存放的也是任务的双向链表)都会维护一个过期时间,利用 DelayQueue 来对每个槽的过期时间排序,来进行时间的推进,防止空推进的存在。
每次推进都会更新 currentTime 为当前时间戳,当然做了点微调使得 currentTime 是 tickMs 的整数倍。并且每次推进都会把能降级的任务重新插入降级。
可以看到这里的 DelayQueue 的元素是每个槽,而不是任务,因此数量就少很多了,这应该是权衡了对于槽操作的延时队列的时间复杂度与空推进的影响。