单机环境下定时任务的基本原理和常见解决方案之时间轮原理分析
- 时间轮
- Netty时间轮使用
- Netty 时间轮 HashedWheelTimer 源码分析
- 向时间轮里添加task
- WorkerThread线程执行时间轮任务
- 多层时间轮
- 总结
时间轮
生活中的时钟想必大家都不陌生,而时间轮的设计思想就是来源于生活中的时钟,这个从其命名就可以看出。
时间轮是一种环形的数据结构,我们可以将其想象成时钟的样子,时间轮上有许多格子(bucket),每个格子代表一段时间,时间轮的精度取决于一个格子的代表的时间,比如时间轮的格子是一秒跳一次,那么其调度任务的精度就是1秒,小于一秒的任务无法被时间轮调度。
时间轮上的bucket数量是有限的,而任务的数量是可以无限大的(理论上),所以时间轮使用一个链表来存储放在某个格子上的定时任务。
如下图所示 :
假设一个格子是1秒,整个时间轮有10个格子,那走一圈就表示10s,假如当前指针指向1,此时需要调度一个12s后执的任务,应该等指针走完一圈+2格再执行,因此应放入序号为3的格子,同时将round(1)保存到任务中。
指针随着时间一格格转动,走到每个格子,则检查格子中是否有可以执行的任务。此时时间轮指将链表里round=0的任务取出来执行,其他任务的round都减1。
简单总结一下,时间轮通过数组+链表的形式来存储定时任务,每个任务存放的bucket的计算公式:
(预计时间-时间轮开始时间)/(每一格的时间*时间轮的bucket数) 对应的商就是round,余数就是bucket的下标(本质还是取模)
Netty 需要管理大量的连接,每一个连接都会有很多检测超时任务,比如发送超时、心跳检测间隔等。
它提供了工具类 HashedWheelTimer 来实现延迟任务。该工具类就是采用时间轮原理来实现的。
Netty时间轮使用
后续的源码分析都是基于4.1.80版本的源码
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.80.Final</version>
</dependency>
public static void main(String[] args) {
//创建一个HashedWheelTimer时间轮,有16个格的轮子,每一秒走一个格子
Timer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 16);
System.out.println(Calendar.getInstance().getTime() + "开始执行任务...");
//添加任务到时间轮中
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) {
System.out.println(Calendar.getInstance().getTime() + ":执行任务1");
}
}, 5, TimeUnit.SECONDS);
timer.newTimeout(timeout ->
System.out.println(Calendar.getInstance().getTime() + ":执行任务2"), 8,
TimeUnit.SECONDS);
}
Netty 时间轮 HashedWheelTimer 源码分析
构造方法的三个参数分别代表
- tickDuration 每一tick的时间,走一格是多久
- timeUnit tickDuration的时间单位
- ticksPerWheel 时间轮一共有多个格子,即一圈表示多少个tick。
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) {
this.worker = new Worker();
//一个CountDownLatch
this.startTimeInitialized = new CountDownLatch(1);
//mpsc队列
this.timeouts = PlatformDependent.newMpscQueue();
//mpsc队列
this.cancelledTimeouts = PlatformDependent.newMpscQueue();
this.pendingTimeouts = new AtomicLong(0L);
ObjectUtil.checkNotNull(threadFactory, "threadFactory");
ObjectUtil.checkNotNull(unit, "unit");
ObjectUtil.checkPositive(tickDuration, "tickDuration");
ObjectUtil.checkPositive(ticksPerWheel, "ticksPerWheel");
//创建时间轮,默认创建512个轮 就是创建一个长度为512的HashedWheelBucket数组
this.wheel = createWheel(ticksPerWheel);
this.mask = this.wheel.length - 1;
//默认tickDuration=100ms
long duration = unit.toNanos(tickDuration);
if (duration >= Long.MAX_VALUE / (long)this.wheel.length) {
throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / (long)this.wheel.length));
} else {
if (duration < MILLISECOND_NANOS) {
logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS);
this.tickDuration = MILLISECOND_NANOS;
} else {
this.tickDuration = duration;
}
//创建一个线程workerThread,此时未启动(延迟启动,当有任务添加后再启动)
this.workerThread = threadFactory.newThread(this.worker);
this.leak = !leakDetection && this.workerThread.isDaemon() ? null : leakDetector.track(this);
this.maxPendingTimeouts = maxPendingTimeouts;
if (INSTANCE_COUNTER.incrementAndGet() > 64 && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
}
向时间轮里添加task
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(task, "task");
ObjectUtil.checkNotNull(unit, "unit");
long pendingTimeoutsCount = this.pendingTimeouts.incrementAndGet();
//如果maxPendingTimeouts>0,则表示对于存储的任务有上限,默认无限制
if (this.maxPendingTimeouts > 0L && pendingTimeoutsCount > this.maxPendingTimeouts) {
this.pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending timeouts (" + this.maxPendingTimeouts + ")");
} else {
//启动workerThread线程
this.start();
//判断当前任务还要多长时间执行(这里的startTime就是workerThread的启动时间,执行到这的时候startTime一定有值,否则this.start()会阻塞)
long deadline = System.nanoTime() + unit.toNanos(delay) - this.startTime;
if (delay > 0L && deadline < 0L) {
deadline = Long.MAX_VALUE;
}
//封装成HashedWheelTimeout,并将其加入到MpscQueue(timeouts队列)
//MPSC: Multi producer, Single consumer FIFO
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
this.timeouts.add(timeout);
return timeout;
}
}
workerThread线程延迟启动
public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {
case 0:
//通过CAS保证线程安全,workThread线程只会启动一次
if (WORKER_STATE_UPDATER.compareAndSet(this, 0, 1)) {
this.workerThread.start();
}
case 1:
break;
case 2:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
//当startTime==0L,表示workThread线程还没有启动,通过CountDownLatch阻塞在这,直到workThread线程启动
while(this.startTime == 0L) {
try {
this.startTimeInitialized.await();
} catch (InterruptedException var2) {
}
}
}
WorkerThread线程执行时间轮任务
public void run() {
//设置startTime
HashedWheelTimer.this.startTime = System.nanoTime();
if (HashedWheelTimer.this.startTime == 0L) {
HashedWheelTimer.this.startTime = 1L;
}
HashedWheelTimer.this.startTimeInitialized.countDown();
int idx;
HashedWheelBucket bucket;
//自旋直到wheelTimer被关闭
do {
//计算时间轮走到下一个tick的时间点(如果没有到时间则通过sleep休眠等待),这里返回的deadline是当前时间距离时间轮启动经过的时间(deadline小于0说明数据异常,不执行操作)
long deadline = this.waitForNextTick();
if (deadline > 0L) {
//根据tick与轮的大小取模 得到当前tick所在的bucket的下标
idx = (int)(this.tick & (long)HashedWheelTimer.this.mask);
//处理已经取消的task(将取消队列里的任务从bucket丢弃,如果已经放入到bucket里的话)
this.processCancelledTasks();
bucket = HashedWheelTimer.this.wheel[idx];
//将timeouts队列中缓存的任务取出加入到时间轮中
this.transferTimeoutsToBuckets();
//处理当前bucket所有的到期任务
bucket.expireTimeouts(deadline);
++this.tick;
}
} while(HashedWheelTimer.WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 1);
//wheelTimer被关闭后的处理,取出每一个bucket里还没被执行的任务,放到unprocessedTimeouts中
HashedWheelBucket[] var5 = HashedWheelTimer.this.wheel;
int var2 = var5.length;
for(idx = 0; idx < var2; ++idx) {
bucket = var5[idx];
bucket.clearTimeouts(this.unprocessedTimeouts);
}
while(true) {
HashedWheelTimeout timeout = (HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
//处理所有被取消的任务
if (timeout == null) {
this.processCancelledTasks();
return;
}
//将加到timeouts队列里的任务添加到把未处理的unprocessedTimeouts队列中
if (!timeout.isCancelled()) {
this.unprocessedTimeouts.add(timeout);
}
}
}
主要流程:
1.如果HashedWheelTimer未关闭,则等待到达下一个tick的时间(未到则sleep)
2.到达下一tick时间后
- 1)将已取消的任务丢弃
- 2)然后将timeouts队列里的任务迁移到bucket对应的位置上
- 3)获取当前tick对应的bucket,执行其中已经到达执行时间的任务
3.如果HashedWheelTimer已关闭,则将bucket里还没被执行的任务和timeouts队列里未取消的任务,统一放到unprocessedTimeouts队列中。
然后统一处理取消队列里的任务(processCancelledTasks) 也就是说已取消的任务在取消操作时只是放入到取消队列里,并没有从timeouts队列或者bucket里移除
private long waitForNextTick() {
//计算下一个tick的时间点,该时间是相对时间轮启动时间的相对时间
long deadline = HashedWheelTimer.this.tickDuration * (this.tick + 1L);
//自旋
while(true) {
//计算时间轮启动后经过的时间
long currentTime = System.nanoTime() - HashedWheelTimer.this.startTime;
//判断需要休眠的时间
long sleepTimeMs = (deadline - currentTime + 999999L) / 1000000L;
if (sleepTimeMs <= 0L) {
if (currentTime == Long.MIN_VALUE) {
return -9223372036854775807L;
}
//如果当前时间大于下一个tick的时间,则直接返回(说明到执行任务的时间了),否则sleep
return currentTime;
}
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10L * 10L;
if (sleepTimeMs == 0L) {
sleepTimeMs = 1L;
}
}
try {
//休眠对应的时间
Thread.sleep(sleepTimeMs);
} catch (InterruptedException var8) {
if (HashedWheelTimer.WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 2) {
return Long.MIN_VALUE;
}
}
}
}
private void transferTimeoutsToBuckets() {
//从timeouts队列中获取任务,每次最多只能获取10万个任务
for(int i = 0; i < 100000; ++i) {
HashedWheelTimeout timeout = (HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
//timeout==null说明timeouts队列已经空了
if (timeout == null) {
break;
}
//计算执行该任务需要放到哪个bucket下并且对应的round为多少
if (timeout.state() != 1) {
//计算任务的执行时间
//这里的deadline是任务执行时间相对时间轮开始时间的时间,也就是计算从时间轮的开始时间算起,需要经过多少次tick
long calculated = timeout.deadline / HashedWheelTimer.this.tickDuration;
timeout.remainingRounds = (calculated - this.tick) / (long)HashedWheelTimer.this.wheel.length;
//calculated和tick去较大者,就是说如果当前任务的执行时间已过期,则让其在当前tick执行
long ticks = Math.max(calculated, this.tick);
//计算该任务要在哪个bucket下执行
int stopIndex = (int)(ticks & (long)HashedWheelTimer.this.mask);
HashedWheelBucket bucket = HashedWheelTimer.this.wheel[stopIndex];
//HashedWheelBucket底层是一个HashedWheelTimeout的链表
bucket.addTimeout(timeout);
}
}
}
处理已到执行时间的任务
// 这里的deadline是当前时间距离时间轮启动经过的时间
public void expireTimeouts(long deadline) {
HashedWheelTimeout next;
//从头遍历HashedWheelTimeout链表
for(HashedWheelTimeout timeout = this.head; timeout != null; timeout = next) {
next = timeout.next;
if (timeout.remainingRounds <= 0L) {
next = this.remove(timeout);
//说明当前任务的执行时间大于deadline,中间可能哪里出现故障,抛出异常
if (timeout.deadline > deadline) {
throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
//调用timeout.expire方法,执行task,通过ImmediateExecutor线程池执行任务,实际就是调用task的run方法
timeout.expire();
} else if (timeout.isCancelled()) {
next = this.remove(timeout);
} else {
//未到执行时间,remainingRounds-1
--timeout.remainingRounds;
}
}
}
多层时间轮
当时间跨度很大时,提升单层时间轮的 tickDuration 可以减少空转次数,但会导致时间精度变低,使用多层时间轮既可以避免精度降低,也能减少空转次数。
如果有时间跨度较长的定时任务,则可以交给多层级时间轮去调度。
假设有一个设置为5天14 小时40 分30 秒后执行的定时任务,在 tickDuration = 1s 的单层时间轮中,需要经过:5x24x60x60+14x60x60+40x60+30 数十万次tick才能被执行。
但在 wheel1 tickDuration = 1 天,wheel2 tickDuration = 1 小时,wheel3 tickDuration = 1 分,wheel4 tickDuration = 1 秒 的四层时间轮中,只需要经过 5+14+40+30 次tick就可以了。
总结
while+sleep | Timer | ScheduledThreadPoolExecutor | HashedWheelTimer | |
---|---|---|---|---|
实现方式 | while+sleep | 最小堆 | 最小堆 | 基于时间轮 |
写入效率 | - | O(logN) | O(logN) | 类HashMap,近似O(1) |
查询效率 | - | O(1) | O(1) | 近似O(1) |
优点 | 实现简单 O(1) | 可以对大量定时任务进行统一调度 | 线程池执行,有异常捕获机制 | 写入性能高 |
缺点 | 对于大量定时任务不便于管理 | 单线程执行;没有异常捕获机制 | 写入效率较低,在需要大量添加定时任务的场景下会影响性能 | 单线程执行;没有异常捕捉机制 |
注意下,HashedWheelTimer 的写入和查询效率都是近似O(1),由于链表的存在,如果要执行任务的存放在长链表的末尾,那他的查询性能可能会很差,HashMap通过扰动函数来将减少hash冲突,时间轮也可以通过设置合适的时间精度,来减少hash冲突
Netty对时间轮的实现是基于他的使用场景,我们可以根据不同的业务场景对时间轮进行优化
- 将所有的任务交给线程池执行,避免单个任务的执行耗时较长影响下一个任务的执行
- 可以给每个bucket设置一个线程池来执行这个bucket的任务
- 假设需要在同一时刻,执行大量比较耗时的任务,那么可以通过MQ解耦,然后使用消费者并发执行任务,提高性能
。。。。
选择哪一种方式来实现定时/延迟任务取决于各自的业务场景。