文章目录
- 1.常见定时任务实现
- 2.时间轮算法
- 3.HashedWheelTimer源码分析
- 3.1 内部结构分析
- 3.2 构造方法
- 3.3 添加任务
- 3.4 工作线程Worker
- 3.5 停止时间轮
- 4.HashWheelTimer总结
1.常见定时任务实现
定时器的使用场景包括:成月统计报表、财务对账、会员积分结算、邮件推送等,它一般有三种表现形式:按固定周期定时执行、延迟一定时间后执行、指定某个时刻执行。
定时器的本质是设计一种数据结构,能够存储和调度任务集合,而且 deadline 越近的任务拥有更高的优先级。那么定时器如何知道一个任务是否到期了呢?定时器需要通过轮询的方式来实现,每隔一个时间片去检查任务是否到期。
定时器的内部结构一般需要一个任务队列和一个异步轮询线程。
JDK原生支持三种定时器实现: Timer、DelayedQueue 和 ScheduledThreadPoolExecutor
2.时间轮算法
时间轮算法的设计思想就来源于钟表,时间轮可以理解为一种环形结构,像钟表一样被分为多个 slot 槽位。
每个 slot 代表一个时间段,每个 slot 中可以存放多个任务,使用的是链表结构保存该时间段到期的所有任务。时间轮通过一个时针随着时间一个个 slot 转动,并执行 slot 中的所有到期任务。
时间轮定时器最大的优势就是, 任务的新增和取消都是 O(1) 时间复杂度, 而且只需要一个线程就可以驱动时间轮进行工作。
处理冲突的方法采用的是拉链法。在任务数量比较多的场景下, 适当增加时间轮的 slot 数量, 可以减少时针转动时遍历的任务个数。
3.HashedWheelTimer源码分析
3.1 内部结构分析
创建任务 newTimeout() 和停止所有未执行任务 stop()。Timer 可以认为是上层的时间轮调度器,通过 newTimeout() 方法可以提交一个任务 TimerTask,并返回一个 Timeout。
public class HashedWheelTimerTest {
public static void main(String[] args) {
Timer timer = new HashedWheelTimer();
Timeout timeout1 = timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) {
System.out.println("timeout1: " + new Date());
}
}, 10, TimeUnit.SECONDS);
if (!timeout1.isExpired()) {
timeout1.cancel();
}
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws InterruptedException {
System.out.println("timeout2: " + new Date());
Thread.sleep(5000);
}
}, 1, TimeUnit.SECONDS);
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) {
System.out.println("timeout3: " + new Date());
}
}, 3, TimeUnit.SECONDS);
}
}
timeout2: Mon Nov 09 19:57:04 CST 2020
timeout3: Mon Nov 09 19:57:09 CST 2020
newTimeout() 启动了三个 TimerTask, timeout1被取消了, timeout2 和 timeout3 分别应该在 1s 和 3s 后执行
3.2 构造方法
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts) {
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
// Convert tickDuration to nanos.
long duration = unit.toNanos(tickDuration);
workerThread = threadFactory.newThread(worker); // 创建工作线程
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; // 是否开启内存泄漏检测
this.maxPendingTimeouts = maxPendingTimeouts; // 最大允许等待任务数,HashedWheelTimer 中任务超出该阈值时会抛出异常
// 如果 HashedWheelTimer 的实例数超过 64,会打印错误日志
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
- threadFactory,线程池,但是只创建了一个线程;
- tickDuration,时针每次 tick 的时间,相当于时针间隔多久走到下一个 slot;
- unit,表示 tickDuration 的时间单位;
- ticksPerWheel,时间轮上一共有多少个 slot,默认 512 个。分配的 slot 越多,占用的内存空间就越大;
- leakDetection,是否开启内存泄漏检测;
- maxPendingTimeouts,最大允许等待任务数。
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(
"ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException(
"ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
}
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i ++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
private static int normalizeTicksPerWheel(int ticksPerWheel) {
int normalizedTicksPerWheel = 1;
while (normalizedTicksPerWheel < ticksPerWheel) {
normalizedTicksPerWheel <<= 1;
}
return normalizedTicksPerWheel;
}
创建 HashedWheelBucket 数组, 每个 HashedWheelBucket 表示时间轮中一个 slot, HashedWheelBucket 内部是一个双向链表结构, 双向链表的每个节点持有一个 HashedWheelTimeout 对象, HashedWheelTimeout 代表一个定时任务, 每个 HashedWheelBucket 都包含双向链表 head 和 tail 两个 HashedWheelTimeout 节点, 可以实现不同方向进行链表遍历。
normalizeTicksPerWheel() 方法的作用就是找到不小于ticksPerWheel 的最小 2 次幂。
3.3 添加任务
newTimeout()
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// 省略其他代码
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
start(); // 1. 如果 worker 线程没有启动,需要启动
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // 计算任务的 deadline
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); // 2. 创建定时任务
timeouts.add(timeout); // 3. 添加任务到 Mpsc Queue
return timeout;
}
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
- 启动工作线程
- 根据用户传入的任务延迟时间, 计算deadline
- 创建工作线程
- 添加任务到Mpsc Queue, Mpsc Queue 可以理解为多生产者单消费者的线程安全队列, HashedWheelTimer 是想借助 Mpsc Queue 保证多线程向时间轮添加任务的线程安全性。
HashedWheelTimer 的工作线程采用了懒启动的方式, 样做的好处是在时间轮中没有任务时, 可以避免工作线程空转而造成性能损耗。
public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// Wait until the startTime is initialized by the worker.
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
通过 CAS 操作获取工作线程的状态, 如果启动了, 则跳过, 没有启动, 利用CAS改变工作线程状态, 然后启动工作线程, 启动的过程是直接调用的 Thread#start() 方法。
3.4 工作线程Worker
工作线程 Worker 是时间轮的核心引擎, 随着时针的转动, 到期任务的处理都由 Worker 处理完成。
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>(); // 未处理任务列表
private long tick;
@Override
public void run() {
startTime = System.nanoTime();
if (startTime == 0) {
startTime = 1;
}
startTimeInitialized.countDown();
do {
final long deadline = waitForNextTick(); // 1. 计算下次 tick 的时间, 然后sleep 到下次 tick
if (deadline > 0) { // 可能因为溢出或者线程中断,造成 deadline <= 0
int idx = (int) (tick & mask); // 2. 获取当前 tick 在 HashedWheelBucket 数组中对应的下标
processCancelledTasks(); // 3. 移除被取消的任务
HashedWheelBucket bucket =
wheel[idx];
transferTimeoutsToBuckets(); // 4. 从 Mpsc Queue 中取出任务加入对应的 slot 中
bucket.expireTimeouts(deadline); // 5. 执行到期的任务
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// 时间轮退出后,取出 slot 中未执行且未被取消的任务,并加入未处理任务列表,以便 stop() 方法返回
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
// 将还没来得及添加到 slot 中的任务取出,如果任务未取消则加入未处理任务列表,以便 stop() 方法返回
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}
}
工作线程 Worker 的核心执行流程是代码中的 do-while 循环, 只要 Worker 处于 STARTED 状态, 就执行do - while。
- 通过 waitForNextTick() 方法计算出时针到下一次 tick 的时间间隔, sleep到下一个tick
- 通过位运算获取当前 tick 在 HashedWheelBucket 数组中对应的下标
- 移除被取消的任务
- 从 Mpsc Queue 中取出任务加入对应的 HashedWheelBucket 中
- 执行当前 HashedWheelBucket 中的到期任务
计算下次 tick 的时间, 然后sleep 到下次 tick
private long waitForNextTick() {
long deadline = tickDuration * (tick + 1);
for (;;) {
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
根据 tickDuration 可以推算出下一次 tick 的 deadline, deadline 减去当前时间就可以得到需要 sleep 的等待时间。
从 Mpsc Queue 中取出任务加入对应的 HashedWheelBucket 中
private void transferTimeoutsToBuckets() {
// 每次时针 tick 最多只处理 100000 个任务,以防阻塞 Worker 线程
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
continue;
}
long calculated = timeout.deadline / tickDuration; // 计算任务需要经过多少个 tick
timeout.remainingRounds = (calculated - tick) / wheel.length; // 计算任务需要在时间轮中经历的圈数 remainingRounds
final long ticks = Math.max(calculated, tick); // 如果任务在 timeouts 队列里已经过了执行时间, 那么会加入当前 HashedWheelBucket 中
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
根据用户设置的任务 deadline, 可以计算出任务需要经过多少次 tick 才能开始执行以及需要在时间轮中转动圈数 remainingRounds, remainingRounds 会记录在 HashedWheelTimeout 中, 在执行任务的时候 remainingRounds 会被使用到。
执行当前 HashedWheelBucket 中的到期任务
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
next = remove(timeout);
if (timeout.deadline <= deadline) {
timeout.expire(); // 执行任务
} else {
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
timeout.remainingRounds --; // 未到执行时间,remainingRounds 减 1
}
timeout = next;
}
}
从头开始遍历 HashedWheelBucket 中的双向链表, 如果 remainingRounds <=0, 执行任务, timeout.expire() 内部就是调用了 TimerTask 的 run() 方法。如果任务已经被取消, 从链表删除, 否则表示任务的执行时间未到, remainigRounds - 1, 等待下一圈。
Worker 还会执行一些后置的收尾工作, Worker 会从每个 HashedWheelBucket 取出未执行且未取消的任务, 以及未添加到 HashedWheelBucket 的任务, stop() 处理。
3.5 停止时间轮
@Override
public Set<Timeout> stop() {
// Worker 线程无法停止时间轮
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(
HashedWheelTimer.class.getSimpleName() +
".stop() cannot be called from " +
TimerTask.class.getSimpleName());
}
// 尝试通过 CAS 操作将工作线程的状态更新为 SHUTDOWN 状态
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
return Collections.emptySet();
}
try {
boolean interrupted = false;
while (workerThread.isAlive()) {
workerThread.interrupt(); // 中断 Worker 线程
try {
workerThread.join(100);
} catch (InterruptedException ignored) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}
return worker.unprocessedTimeouts(); // 返回未处理任务的列表
}
Worker 线程无法停止时间轮, 为了防止有定时任务发起停止时间轮的恶意操作。
- 尝试通过 CAS 操作将工作线程的状态更新为 SHUTDOWN 状态
- 中断工作线程 Worker
- 将未处理的任务列表返回给上层
4.HashWheelTimer总结
- HashedWheelTimeout,任务的封装类,包含任务的到期时间 deadline、需要经历的圈数 remainingRounds 等属性。
- HashedWheelBucket,相当于时间轮的每个 slot,内部采用双向链表保存了当前需要执行的 HashedWheelTimeout 列表。
- Worker,HashedWheelTimer 的核心工作引擎,负责处理定时任务。
Kafka 时间轮的内部结构与 Netty 类似, 数组中的每个 slot 代表一个 Bucket, Bucket 中保存定时任务列表 TimerTaskList, TimerTaskList 同样采用双向链表的结构实现, 链表中的时间任务是TimerTaskEntry