目录
一、简介
1.1 继承关系
1.2 使用
1.3 例子
二、源码分析
2.1 构造方法
2.2 主要的四种提交执行任务的方法
2.3 内部类 ScheduledFutureTask
2.3 scheduleAtFixedRate()方法
2.4 delayedExecute()方法
2.5 ScheduledFutureTask类的run()方法
2.6 内部类 DelayedWorkQueue
2.7 其它
三、总结
一、简介
前面我们一起学习了ThreadPoolExecutor线程池普通任务、未来任务的执行流程,今天我们再来学习一种新的任务——定时任务。
定时任务是我们经常会用到的一种任务,它表示在未来某个时刻执行,或者未来按照某种规则重复执行的任务。
Java是通过ScheduledThreadPoolExecutor线程池实现的定时任务。定时任务的核心实现就是靠DelayedWorkQueue延迟队列实现的。ScheduledThreadPoolExecutor 使用的任务队列是DelayQueue,该队列封装了一个 PriorityQueue,PriorityQueue 会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask 的 time 变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask 的 squenceNumber 变量小的先执行)。
1.1 继承关系
1.2 使用
常用:
java.util.concurrent.ScheduledThreadPoolExecutor#schedule 定时任务 (过多久之后执行,且只执行一次)
java.util.concurrent.ScheduledThreadPoolExecutor#scheduleAtFixedRate 固定速率连续执行
java.util.concurrent.ScheduledThreadPoolExecutor#scheduleWithFixedDelay 非固定速率连续执行
1.3 例子
创建一个定时线程池,用它来跑四种不同的定时任务。
public class ThreadPoolTest03 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建一个定时线程池
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(5);
System.out.println("start: " + System.currentTimeMillis());
// 执行一个无返回值任务,5秒后执行,只执行一次
scheduledThreadPoolExecutor.schedule(() -> {
System.out.println("spring: " + System.currentTimeMillis());
}, 5, TimeUnit.SECONDS);
// 执行一个有返回值任务,5秒后执行,只执行一次
ScheduledFuture<String> future = scheduledThreadPoolExecutor.schedule(() -> {
System.out.println("inner summer: " + System.currentTimeMillis());
return "outer summer: ";
}, 5, TimeUnit.SECONDS);
// 获取返回值
System.out.println(future.get() + System.currentTimeMillis());
// 按固定频率执行一个任务,每2秒执行一次,1秒后执行
// 任务开始时的2秒后
scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
System.out.println("autumn: " + System.currentTimeMillis());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
}, 1, 2, TimeUnit.SECONDS);
// 按固定延时执行一个任务,每延时2秒执行一次,1秒执行
// 任务结束时的2秒后
scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
System.out.println("winter: " + System.currentTimeMillis());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
}, 1, 2, TimeUnit.SECONDS);
}
}
定时任务总体分为四种:
(1)未来执行一次的任务,无返回值;
(2)未来执行一次的任务,有返回值;
(3)未来按固定频率重复执行的任务;
(4)未来按固定延时重复执行的任务;
二、源码分析
2.1 构造方法
初始化ScheduledThreadPoolExecutor。其实本质还是调用的其父类ThreadPoolExecutor的构造方法来创建线程池的。
// 只需要指定核心线程数即可,最大线程数固定为Integer.MAX_VALUE,即无限大,keepAliveTime固定为0,即如果没有待执行的任务,线程池的线程会立即退出,使用的阻塞队列实现是DelayedWorkQueue
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); // 这个是延迟队列
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
2.2 主要的四种提交执行任务的方法
public interface ScheduledExecutorService extends ExecutorService {
// 给定的延迟时间delay之后,才会执行任务command
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
// 给定的延迟时间delay之后,才会执行任务callable
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
/**
* 在给定的初始化延时initialDelay之后,固定频率地周期性执行任务command。
* 也就是说任务第一次运行时间是initialDelay,第二次运行时间是initialDelay+period,
* 第三次是initialDelay + period*2等等。 所以频率是相同地。
*
* 但是有一个问题,如果任务运行时间大于周期时间period该怎么办?
* 其实是这样的,在initialDelay之后开始运行任务,当任务完成之后,
* 将当前时间与initialDelay+period时间进行比较,如果小于initialDelay+period时间,那么等待,
* 如果大于initialDelay+period时间,那么就直接执行第二次任务
* 固定速率就是说每一次执行的时间就是上一次执行的时间加一个固定的时长
*
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
/**
* 在给定的初始化延时initialDelay之后,开始执行任务,任务执行完成之后,
* 等待delay时间,再一次执行任务。
* 因为它是等待任务完成之后,再进行延迟,就不会受任务完成时间长短地影响。
* 非固定速率就是说每一次执行的时间是上一次执行完成之后的时间加一个固定时长,因为任务执行时间是不确定的,所以就是非固定速率
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
下面这四个参数就对应着scheduledExecutorService.scheduleWithFixedDelay和scheduledExecutorService.scheduleAtFixedRate的参数:
- command:执行线程,即任务
- initialDelay:初始化延时 启动以后多久执行
- period:两次开始执行最小间隔时间 距离上次执行的时间
- unit:计时单位
2.3 内部类 ScheduledFutureTask
该类就类似于ThreadPoolExecutor中的FutureTask,就是用来包装任务的一个类。
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
// 任务的序列号
private final long sequenceNumber;
// 任务的执行时间点 / 任务执行结束的时间点 这个是根据调用的scheduleAtFixedRate还是scheduleWithFixedDelay决定的
private long time;
// 任务的执行周期,单位是纳秒
private final long period;
// 该任务在延迟队列中的节点,延迟队列存储任务的容器就是RunnableScheduledFuture类型的数组,ScheduledFutureTask和RunnableScheduledFuture是建立起了关联关系的
RunnableScheduledFuture<V> outerTask = this;
// 该任务在延迟队列中的索引下标,延迟队列存储任务的容器就是RunnableScheduledFuture类型的数组
int heapIndex;
// 构造方法
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 构造方法
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 构造方法
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 获取任务还有多久就可以执行了,单位是纳秒
public long getDelay(TimeUnit unit) {
// 任务要执行的时间减去当前时间,就是距离任务开始执行还有多久
return unit.convert(time - now(), NANOSECONDS);
}
// 比较两个任务的大小,比较的是距离任务开始执行还有多久,距离越短,优先级越高
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
/*
* 判断任务是否是周期性的任务
* 如果任务是周期性的,那么就会重新设置任务的执行时间,然后重新入队
*/
public boolean isPeriodic() {
return period != 0;
}
// 设置任务的下一次执行时间
private void setNextRunTime() {
long p = period;
// 上一次任务开始执行的时间加上执行间隔周期就是下一次任务要执行的时间
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
// 取消任务
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
// 该方法是真正执行定时任务的地方。任务执行完毕之后,如果是周期任务的话,则重新设置任务的执行时间,然后重新入队
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
}
下面我们先以按固定频率重复执行任务为例进行源码解析。
2.3 scheduleAtFixedRate()方法
提交一个按固定频率执行的任务。
// command:要执行的定时任务
// initialDelay:第一次要延时多久执行
// period:每次执行任务的间隔
// unit:时间单位
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
// 参数判断
// command和unit不能为空
if (command == null || unit == null)
throw new NullPointerException();
// 间隔不能小于等于0
if (period <= 0)
throw new IllegalArgumentException();
// 将普通Runnable任务包装成ScheduledFutureTask(其实就类似于普通线程池执行未来任务时将普通的任务包装成FutureTask的方式)
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
// 钩子方法,将传入的Runnable任务再包装成RunnableScheduledFuture,并且将该任务的RunnableScheduledFuture和该任务的ScheduledFutureTask关联起来,用来将任务加入到延迟队列中,这里认为t==sft
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 延时执行任务
delayedExecute(t);
return t;
}
可以看到,这里的处理跟未来任务类似,都是包装成另一个任务类型对象,再拿去执行,不同的是这里交给了delayedExecute()方法去执行,这个方法是干嘛的呢?
2.4 delayedExecute()方法
将任务加入到延迟队列中,并延时执行任务。
delayedExecute#ScheduledThreadPoolExecutor
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 如果线程池关闭了,执行拒绝策略
if (isShutdown())
reject(task);
else {
// 先把任务加入到延迟队列中去
super.getQueue().add(task);
// 再次检查线程池状态
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 保证有足够有线程执行任务,并且去启动工作线程
ensurePrestart();
}
}
ThreadPoolExecutor中的 ensurePrestart()方法
void ensurePrestart() {
// 获取工作线程数
int wc = workerCountOf(ctl.get());
// 创建工作线程
// 注意,这里没有传入firstTask参数,因为上面先把任务扔到队列中去了
// 另外,没用上maxPoolSize参数,所以最大线程数量在定时线程池中实际是没有用的
// 如果线程池中的线程数小于核心线程数,那么就创建一个核心线程
if (wc < corePoolSize)
// 调用addWorker()这一步就是ThreadPoolExecutor连起来了,这一步就会创建工作线程,并调用工作现成的start()方法启动工作线程,他就会持续的在阻塞队列中获取任务去执行了
// 而工作线程的run()方法,就是调用的runWorker()方法,在该方法中实现了从阻塞队列中获取任务并执行的操作,铜鼓getTask()获取到的任务类型就是ScheduledFutureTask。而执行任务的时候,本质还是ScheduledFutureTask对象调用自己的run()方法,因为定时任务线程池中ScheduledFutureTask就相当于ThreadPoolExecutor的FutureTask
addWorker(null, true);
// 如果线程池中的线程数等于0,那么就创建一个非核心线程
else if (wc == 0)
// 调用addWorker()这一步就是ThreadPoolExcutor连起来了。通过getTask()从队列中获取任务去执行就是在runWorker()方法中调用的
addWorker(null, false);
}
到这里就结束了?!
实际上,这里只是控制任务能不能被执行,将任务加入到了延迟队列中,真正执行任务的地方在任务的run()方法中。
还记得上面的任务被包装成了ScheduledFutureTask类的实例吗?所以,我们只要看ScheduledFutureTask的run()方法就可以了。因为启动工作线程,工作线程去阻塞队列中获取任务执行的过程已经在ThreadPoolExecutoe源码章节讲过了。
2.5 ScheduledFutureTask类的run()方法
定时任务执行的地方。
public void run() {
// 是否周期性执行
boolean periodic = isPeriodic();
// 线程池状态判断,如果不是RUNNING状态,取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 一次性任务,这种只会执行一次的任务直接调用父类的run()方法,这个父类实际上是FutureTask
// 这里我们不再讲解,在ThreadPoolExecutor源码章节已经讲了FutureTask.run()方法的源码
else if (!periodic)
ScheduledFutureTask.super.run();
// 周期性任务,先调用父类的runAndReset()方法,这个父类也是FutureTask
// 本文主要分析下面的部分
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下次执行的时间,ScheduledFutureTask中的方法
setNextRunTime();
// 重复执行
reExecutePeriodic(outerTask);
}
}
可以看到,对于重复性任务,先调用FutureTask的runAndReset()方法去执行任务,再调用setNextRunTime()设置下次执行的时间,最后再调用reExecutePeriodic()方法来使任务重新开始执行。
FutureTask的runAndReset()方法与run()方法基本一样,只是其任务运行完毕后不会把状态修改为NORMAL。
// 基本和FutureTask.run()方法是一样的
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
// 执行任务
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
再来看看reExecutePeriodic()方法。
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 线程池状态检查
if (canRunInCurrentRunState(true)) {
// 再次把任务扔到任务队列中
super.getQueue().add(task);
// 再次检查线程池状态
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 保证工作线程足够
ensurePrestart();
}
}
到这里是不是豁然开朗了,原来定时线程池执行重复任务是在任务执行完毕后,又把任务扔回了任务队列中。
重复性的问题解决了,那么,它是怎么控制任务在某个时刻执行的呢?
OK,这就轮到我们的延时队列登场了。
2.6 内部类 DelayedWorkQueue
我们知道,线程池执行任务时需要从任务队列中拿任务,而普通的任务队列,如果里面有任务就直接拿出来了,但是延时队列不一样,它里面的任务,如果没有到时间也是拿不出来的,这也是前面分析中一上来就把任务扔进队列且创建Worker没有传入firstTask的原因。
说了这么多,它到底是怎么实现的呢?
定时任务线程池用的是哪种队列来实现的?
答:延时队列。定时任务线程池中并没有直接使用并发集合中的DelayQueue,而是自己又实现了一个DelayedWorkQueue,不过跟DelayQueue的实现原理是一样的。
延时队列使用什么数据结构来实现的呢?
答:堆(DelayQueue中使用的是优先级队列,而优先级队列使用的堆;DelayedWorkQueue直接使用的堆)。
延时队列内部是使用“堆”这种数据结构来实现的。
我们这里只拿一个take()方法出来分析。
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
try {
// 自旋
for (;;) {
// 堆顶任务
RunnableScheduledFuture<?> first = queue[0];
// 如果队列为空,则等待
if (first == null)
available.await();
else {
// 还有多久到时间
long delay = first.getDelay(NANOSECONDS);
// 如果小于等于0,说明这个任务到时间了,可以从队列中出队了
if (delay <= 0)
// 出队,然后堆化
return finishPoll(first);
// 还没到时间
first = null;
// 如果前面有线程在等待,当前线程直接进入等待状态
if (leader != null)
available.await();
// 如果前面没有有线程在等待
else {
// 当前线程作为leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待上面计算的延时时间,再自动唤醒
available.awaitNanos(delay);
} finally {
// 唤醒后再次获得锁后把leader再置空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
// 相当于唤醒下一个等待的任务
available.signal();
// 解锁
lock.unlock();
}
}
大致的原理是,利用堆的特性获取最快到时间的任务,即堆顶的任务:
(1)如果堆顶的任务到时间了,就让它从队列中出队;
(2)如果堆顶的任务还没到时间,就看它还有多久到时间,利用条件锁等待这段时间,待时间到了后重新走(1)的判断;
这样就解决了可以在指定时间后执行任务。
上面讲的是固定周期性执行任务scheduleAtFixedRate,其实固定延时执行任务scheduleWithFixedDelay源码流程上和scheduleAtFixedRate基本上完全一样。唯一的区别就是他们两个对ScheduledFutureTask对象的time属性存储的数据的不同,scheduleAtFixedRate存储的是任务开始的时间点,scheduleWithFixedDelay存储的是任务执行结束的时间点。这也就导致了在算任务下一次开始时间点是计算出来的是不同的,一个是用任务开始时间 + 任务周期,一个是用任务执行结束时间 + 任务周期。这也就实现了两者不同的周其性质。
2.7 其它
其实,ScheduledThreadPoolExecutor也是可以使用execute()或者submit()提交任务的,只不过它们会被当成0延时的任务来执行一次。
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
三、总结
实现定时任务有两个问题要解决,分别是指定未来某个时刻执行任务、重复执行。
(1)指定某个时刻执行任务,是通过延时队列的特性来解决的;
(2)重复执行,是通过在任务执行后再次把任务加入到队列中来解决的。
定时任务线程池的运行时序图:
到这里基本上定时任务线程池的源码解析就结束了,这种线程池是比较经典的实现方式,整体上来说,效率相对不是特别高,因为所有的工作线程共用同一个队列,每次从队列中取任务都要加锁解锁操作。
相关文章:【线程池】Java的线程池
【线程池】Java线程池的核心参数
【线程池】Executors框架创建线程池
【线程池】ScheduledExecutorService接口和ScheduledThreadPoolExecutor定时任务线程池使用详解 【线程池】线程池的拒绝策略(饱和策略)
【线程池】线程池的ctl属性详解
【线程池】史上最全的ThreadPoolExecutor源码详解
【阻塞队列】阻塞队列DelayedWorkQueue源码详解