最近项目中需要用到超时操作,对于不是特别优秀的timer和DelayQueue没有看。
- Timer 是单线程模式。如果某个 TimerTask 执行时间很久,会影响其他任务的调度。
- Timer 的任务调度是基于系统绝对时间的,如果系统时间不正确,可能会出现问题。
- TimerTask 如果执行出现异常,Timer 并不会捕获,会导致线程终止,其他任务永远不会执行。
- 相比于 Timer,DelayQueue 只实现了任务管理的功能,需要与异步线程配合使用。
着重看了一下ScheduledThreadPoolExecutor和时间轮算法,以下一些大致讲解和我的理解。
ScheduledThreadPoolExecutor
前置知识
继承于ThreadPoolExecutor,也就是说调用线程执行任务的流程是一样的,详见ensurePrestart();方法。
在ScheduledThreadPoolExecutor中使用的阻塞队列是DelayedWorkQueue,一个自定义的类。内部使用以下queue存放定时任务。
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
上边提到的这个queue,是一个模拟优先级队列的堆(定时任务类实现了compareTo的方法),
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
查询时间复杂度log(n),可以从siftUp方法看出来是以何种规则把任务存放到数组。
大致流程
创建一个定时任务
把任务放在优先级阻塞队列
新增worker(ThreadPoolExecutor的流程)从阻塞队列拿出任务
判断任务时间(任务中会有预期执行时间,时间不到则调用available.awaitNanos(delay);阻塞)
由于是优先级队列,所以都从queue[0]去取任务,一个线程阻塞,其他线程available.await();也阻塞
具体代码:
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first = queue[0];
return (first == null || first.getDelay(NANOSECONDS) > 0)
? null
: finishPoll(first);
} finally {
lock.unlock();
}
}
个人看法
面对海量任务插入和删除的场景,会遇到比较严重的性能瓶颈
原因有以下两方面:
1. 自定义的阻塞队列queue不能自定义初始容量,只有16,需要考虑是否会频繁扩容
2. 就是上方提到的以下两个方法会导致 DelayedWorkQueue 数据结构频繁发生变化
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
/**
* Sifts element added at top down to its heap-ordered spot.
* Call only when holding lock.
*/
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
时间轮算法
原理
原理很简单,一张图直接概括(偷来的图,下图是netty的实现),只看原理不看类名
时间的维度是无限的,但是钟表却能表示无限的时间。所以这个时间轮算法和钟表相似都可以表示无限的时间,只要把圈数和槽位记录好即可。
需要注意的是,时间轮算法虽然看起来是物理上的⚪,但其实只是逻辑上的回环,走到末尾时重新回到头。
类比钟表,时间轮上也有一个个的插槽(slot),一个插槽代表的是时间间隔(interval),时间间隔越小,时间表示越精确。
大致流程
放入定时任务
封装任务记录剩余圈数,槽位等等
扫描槽位以及链表,扫描到对应槽位找到剩余圈数为0的执行
个人看法,因为这只是算法思想,所以以下不是缺陷而是需要注意的点
- 如果长时间没有到期任务,那么会存在时间轮空推进的现象。
- 只适用于处理耗时较短的任务,由于 Worker 是单线程的,如果一个任务执行的时间过长,会造成 Worker 线程阻塞。
- 相比传统定时器的实现方式,内存占用较大。