ScheduledExecutorService
一、背景
大家好呀,上周我们公司由于定时线程池使用不当出了一个故障,几千万的单子可能没了
给兄弟们分享分享这个坑,希望兄弟们以后别踩!
业务中大量的使用定时线程池(ScheduledExecutorService
)执行任务,有时候会忽略掉 Try/Catch
的异常判断
当任务执行报错时,会导致整个定时线程池挂掉,影响业务的正常需求
二、问题
我们来模仿一个生产的例子:
-
合作方修改频率低且合作方允许最终一致性
-
我们有一个定时任务每隔
60
秒去MySQL
拉取全量的合作方
数据放至 合作方缓存(本地缓存) 中 -
当客户请求时,我们去缓存中拿取合作方即可
这样的生产例子应该存在于绝大数公司,代码如下:
public class Demo {
// 创建定时线程池
private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
private List<String> partnerCache = new ArrayList<>();
@PostConstruct
public void init() {
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// 启动时每隔60秒执行一次数据库的刷新
// 将数据缓存至本地
loadPartner();
}
}, 3, 60, TimeUnit.SECONDS);
}
public void loadPartner() {
// 查询数据库当前最新合作方数据
List<String> partnerList = queryPartners();
// 合作方数据放至缓存
partnerCache.clear();
partnerCache.addAll(partnerList);
}
public List<String> queryPartners() {
// 数据库挂了!
throw new RuntimeException();
}
}
运行上述样例,我们会发现程序不停止,输出一遍 Load start!
,一直在运行,但后续不输出 Load start!
这个时候我们可以确认:异常确实导致当前任务不再执行
1、为什么任务报错会影响定时线程池?
2、定时线程池是真的挂掉了嘛?
3、定时线程池内部是如何执行的?
跟着这三个问题,我们一起来看一看 ScheduledExecutorService
的原理介绍
三、原理剖析
对于 ScheduledExecutorService
来说,本质上是 延时队列 + 线程池
1、延时队列介绍
DelayQueue
是一个无界的 BlockingQueue
,用于放置实现了Delayed接口的对象,只能在到期时才能从队列中取走。
这种队列是有序的,即队头对象的延迟到期时间最长。
我们看一下延时队列里对象的属性:
class MyDelayedTask implements Delayed{
// 当前任务创建时间
private long start = System.currentTimeMillis();
// 延时时间
private long time ;
// 初始化
public MyDelayedTask(long time) {
this.time = time;
}
/**
* 需要实现的接口,获得延迟时间(用过期时间-当前时间)
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
/**
* 用于延迟队列内部比较排序(当前时间的延迟时间 - 比较对象的延迟时间)
*/
@Override
public int compareTo(Delayed o) {
MyDelayedTask o1 = (MyDelayedTask) o;
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
所以,延时队列的实现原理也很简单:
- 生产端:投递消息时增加时间戳(当前时间+延时时间)
- 消费端:用当前时间与时间戳进行比较,若小于则消费,反之则循环等待
2、线程池的原理介绍
- 当前的线程池个数低于核心线程数,直接添加核心线程即可
- 当前的线程池个数大于核心线程数,将任务添加至阻塞队列中
- 如果添加阻塞队列失败,则需要添加非核心线程数处理任务
- 如果添加非核心线程数失败(满了),执行拒绝策略
3、定时线程的原理
我们从定时线程池的创建看:scheduledExecutorService.scheduleAtFixedRate(myTask, 3L, 1L, TimeUnit.SECONDS);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
// 初始化我们的任务
// triggerTime:延时的实现
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 将当前任务丢进延时队列
super.getQueue().add(task);
// 创建核心线程并启动
ensurePrestart();
}
// 时间轮算法
private long triggerTime(long delay, TimeUnit unit) {
return now() + delay;
}
从这里我们可以得到结论:定时线程池通过延时队列来达到定时的目的
有一个问题:我们仅仅向 Queue
里面放了一个任务,他是怎么保证执行多次的呢?
带着这个问题,我们看一下他拉取任务启动的代码:
for (;;) {
// 从延时队列中获取任务
Runnable r = workQueue.take();
}
public RunnableScheduledFuture<?> take(){
for (;;) {
// 获取队列第一个任务
RunnableScheduledFuture<?> first = queue[0];
// 【重点】如果当前队列任务为空,则等待
if (first == null){
available.await();
}
// 获取当前任务的时间
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0){
// 弹出当前任务
return finishPoll(first);
}
}
}
// 时间戳减去当前时间
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
当拿到任务(ScheduledFutureTask)之后,会执行任务:task.run()
public void run() {
// 执行当前的任务
if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
protected boolean runAndReset() {
if (state != NEW){
return false;
}
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
// 执行任务
c.call();
// 【重点!!!】如果任务正常执行成功的话,这里会将ran置为true
// 如果你的任务有问题,会被下面直接捕捉到,不会将此处的ran置为true
ran = true;
} catch (Throwable ex) {
// 出现异常会将state置为EXCEPTIONAL
// 标记当前任务执行失败并将异常赋值到结果
setException(ex);
}finally {
s = state;
}
}
}
// ran:当前任务是否执行成功
// s:当前任务状态
// ran为false:当前任务执行失败
// s == NEW = false:当前任务状态出现异常
return ran && s == NEW;
}
如果我们的 runAndReset
返回 false
的话,那么进不去 setNextRunTime
该方法:
if (ScheduledFutureTask.super.runAndReset()) {
// 修改当前任务的Time
setNextRunTime();
// 将任务重新丢进队列
reExecutePeriodic(outerTask);
}
最终,任务没有办法被丢进队列,我们的线程无法拿到任务执行,一直在等待。
四、结论
通过上面的分析,我们回头看一下开篇的三个问题:
1、为什么任务报错会影响定时线程池?
- 任务报错不会影响线程池,只是线程池将当前任务给丢失,没有继续放到队列中
2、定时线程池是真的挂掉了嘛?
- 定时线程池没有挂,挂的只是报错的任务
3、定时线程池内部是如何执行的?
- 线程池 + 延时队列
所以,通过上述的讲解,我们应该认识到:定时任务一定要加Try Catch,不然一旦发生异常
不然,你就会和作者一样,背故障让公司损失几千万,血的教训!
在阿里巴巴的开发文档中也曾说过:
前人总结的经验真的是血的教训,一定要听劝【哭】。
需要阿里巴巴开发文档书籍版的小伙伴点击获取先关资料