前言
前些时间想到利用redis实现延时队列,但是底层的定时器不止如何实现好些,故此研究了一下jdk的Timer。
Timer是一个用于执行定时任务的类,可以单次执行或按指定时间间隔循环执行(直到主动cancel或线程被杀掉)。Timer中任务处理采用了生产者-消费者模型的设计思想。
原理简述
Timer里面维护了一个TimerThread(继承了Thread)和TaskQueue(其实就是一个初始化长度为128的TimerTask数组【TimerTask就是我们的执行任务】),当我们new Timer()的时候就会调用TimerThread.start方法,然后TimerThread线程while(true)循环从TaskQueue中取出任务(TimerTask)执行,那么如何取出执行时间最近的任务呢?前面有提到TaskQueue底层是一个数组,但其实是个小根堆,只要每次取出根元素就是执行时间最近的一个任务了。
ps:TimerTask里面有一个long类型的nextExecutionTime(下次执行时间)变量,小根堆就是根据这个来排序的…
在这里取出最近的任务并不是取的小根堆的根元素,而是TaskQueue中下标为1的元素,也就是第二个元素
组件介绍
Timer关键属性:TaskQueue、TimerThread
TaskQueue:任务队列
一个存储任务的任务池TaskQueue,包含一个初始大小为128的TimerTask数组,负责任务的存储(add)、排序(fixUp、fixDown)、取出(getMin)、清理(removeMin、quickRemove)、循环任务处理(rescheduleMin)以及一些其他基本操作。并通过排序保证队头任务的执行一定是最早的。
根据注释可以知道TimerTask是一个用做一个平衡二叉树的模型,一个父节点array[n]下挂载的两个子节点为array[2n]和array[2n+1].
TimerTask:任务实体
TimerTask是任务实体,Runnable接口的实现类。内部包含用于线程安全的锁lock、用于标记任务状态的字段state、以及一个供用户实现的任务内容抽象方法run().
public abstract class TimerTask implements Runnable {
/** 此对象用于控制对TimerTask内部的访问。 */
final Object lock = new Object();
/** 标记任务状态的字段 初始化为0 */
int state = VIRGIN;
/** 该状态表示任务尚未执行。在TimerTask对象创建时,它的状态就是VIRGIN。*/
static final int VIRGIN = 0;
/**
* 表示任务已经被调度,等待执行。当调用Timer.schedule()方法成功后,任务的状态将变为SCHEDULED。
* 此时任务已经被加入了任务队列中,等待Timer线程按照任务的调度时间来执行。
*/
static final int SCHEDULED = 1;
/** 表示任务已经执行完成。此时任务的run()方法已经执行完毕,但任务对象还没有从任务队列中删除,因为队列中的任务删除是由Timer线程自动完成的。*/
static final int EXECUTED = 2;
/**
* 该状态表示任务已经被取消。当调用TimerTask.cancel()方法取消任务时,任务的状态将变为CANCELLED。
* 此时任务被标记为取消状态,即使它已经被加入了任务队列中,也不会执行。一旦任务被标记为CANCELLED状态,它将永远不会被执行。
*/
static final int CANCELLED = 3;
/**
* 此任务的下一次执行时间,格式为System.currentTimeMillis,假设此任务计划执行。对于重复任务,此字段在每次任务执行之前更新。
*/
long nextExecutionTime;
/**
* 重复任务的周期(毫秒)。正值表示固定利率执行。负值表示固定延迟执行。值0表示非重复任务。
*/
long period = 0;
// ...
TimerThread:事件消费者
一个作为事件消费者的TimerThread,TimerThread中不断获取当前任务队列的队头任务,执行任务。并根据任务是否需要循环决定是移除任务还是将任务按下一次执行时间重新加入到任务队列中。在TimerThread中不断获取待执行任务时,采用了Object.wait()和Object.notify()的机制。Object.wait()保证了任务队列为空时及时释放资源,而当有新的任务时也通过Object.notify()及时恢复任务的遍历。
Timer本身提供了对以上三者操作的封装、实例化和对外暴露运行任务的接口。同时,作为生产者,将用户任务加入到任务队列;对消费者层面,Timer也是和消费者线程唯一绑定的,负责启动消费者线程,并在生产了新的任务后及时通知已经休眠的消费者。提供了多种构造方法和清理接口。
源码解析
Timer的使用demo
public class TimerDemo {
public static void main(String[] args) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
String strDate = format.format(new Date());
try {
Thread.sleep(1000);
} catch (InterruptedException e) { }
System.err.println("strDate1 = " + strDate);
}
}, 5, 5000);
}
}
Timer构造方法:
// Timer.java 131行
public Timer() {
this("Timer-" + serialNumber());
}
// Timer.java 158行
public Timer(String name) {
// 设置线程名称
thread.setName(name);
// 启动TimerThread线程
thread.start();
}
TimerThread继承了Thread类,所以我们看一下它的run方法:
// Timer.java 503行
public void run() {
try {
mainLoop();
} finally {
// Someone killed this Thread, behave as if Timer cancelled
// 【有人杀死了这个线程,表现得就像计时器被取消一样】
synchronized(queue) {
newTasksMayBeScheduled = false;
queue.clear(); // Eliminate obsolete references【消除过时的引用】
}
}
}
跟进去mainLoop()方法
/**
* 主计时器循环
*/
// Timer.java 518行
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
// 线程为空则wait
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
if (queue.isEmpty())
break; // 队列为空,将永远保留;死亡
long currentTime, executionTime;
// 获取距离执行时间最近的任务(返回的是queue[1]的引用)
task = queue.getMin();
synchronized(task.lock) {
// 如果任务的状态是已经取消,则移除该任务,移除后TimerTask数组元素将重新排序
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue; // No action required, poll queue again
}
currentTime = System.currentTimeMillis();
// 该任务的下次执行时间
executionTime = task.nextExecutionTime;
// 若任务的下次执行时间 < 当前时间
if (taskFired = (executionTime<=currentTime)) {
// period值0表示非重复任务。将该任务从任务队列移除,并且将任务的state设置为已执行
if (task.period == 0) { // Non-repeating, remove
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else { // Repeating task, reschedule
// 将与头任务关联的nextExecutionTime设置为指定值,并相应地调整优先级队列。
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
// 还未到执行时间,wait
if (!taskFired)
queue.wait(executionTime - currentTime);
}
// 执行时间到了,执行任务的run方法【这里就是我们自己写的逻辑了】
if (taskFired)
task.run();
} catch(InterruptedException e) {
}
}
}
如上,若queue为空&&newTasksMayBeScheduled为true,则调用wait方法等待,若queue不为空则调用queue.getMin获取距离执行时间最近的任务(返回的是queue[1]的引用),然后判断任务的state和nextExecutionTime(下次执行时间),条件满足则执行任务;如果任务的state是已经取消,则移除该任务,移除后TimerTask数组元素将重新排序,若还未到执行时间则wait。
ps:为什么queue.getMin是返回queue[1]元素?
答:TaskQueue内部使用一个数组queue来存储所有的TimerTask对象。数组的第一个元素queue[0]没有使用,实际的任务从数组的第二个元素queue[1]开始存储。数组中的元素是按照任务的执行时间从小到大排序的,也就是说,queue[1]元素表示最先执行的任务。
接下来再看Timer.schedule()方法:
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
// 进去看看,参数1:任务 参数二:下次执行时间 参数三:重复任务的周期(毫秒)
sched(task, System.currentTimeMillis()+delay, -period);
}
// Timer.java 386行
private void sched(TimerTask task, long time, long period) {
if (time < 0)
throw new IllegalArgumentException("Illegal execution time.");
// Constrain value of period sufficiently to prevent numeric
// overflow while still being effectively infinitely large.
if (Math.abs(period) > (Long.MAX_VALUE >> 1))
period >>= 1;
synchronized(queue) {
if (!thread.newTasksMayBeScheduled)
throw new IllegalStateException("Timer already cancelled.");
synchronized(task.lock) {
if (task.state != TimerTask.VIRGIN)
throw new IllegalStateException(
"Task already scheduled or cancelled");
// 设置任务下次执行时间、任务周期、任务状态
task.nextExecutionTime = time;
task.period = period;
task.state = TimerTask.SCHEDULED;
}
// 将任务添加到队列【必要的时候会对任务队列进行扩容,对小根堆的元素重新排序】
queue.add(task);
// 若当前添加的任务是距离当前执行时间最近的任务则唤醒等待线程【其实就是TimerThread线程】
if (queue.getMin() == task)
queue.notify();
}
}
最后有几点需要注意:
1)在不使用时一定要及时cancel清理,释放资源。
2)当timer中有多任务时,因为后边任务会依赖前边任务执行完,尤其是如果有耗时任务,会发生定时不准确的现象。
3)当存在多任务时,若其中某个因异常而终止,则会退出所有任务的执行(消费者线程被异常终止了)