Netty主要应用用于网络通信,Netty还有一个非常重要的应用领域,即时通信系统IM, 在IM聊天系统中,有成千上万条条链路, Netty是如何管理这些链路的呢 ? Netty还有一套自带的心跳检测机制,这套检测机制的原理是通过创建多个定时任务ScheduleFutureTask,定时一段时间与客户端进行通信 ,确保连接可用。
Netty时间轮的解读
数据结构
时间轮其实就是一种环形的数据结构,其设计参考了时钟转动的思维,可以想象成时钟,分成很多格子,一个格子代表一段时间(时间越短,精度越高,过段没有意义,根据具体使用场景裁定)。并用一个链表表示该格子上的到期任务,一个指针随着时间一格一格转动,并执行相应格子中的到期任务。任务通过取摸决定放入那个格子。如下图所示:
都知道时钟有指针,刻度,每刻度表示的时长等属性, Netty的时间轮设计也差不多, 只是时钟的指针有时,分,秒,而Netty只用了一个指针,那么Netty是如何把定时任务加到时间轮的呢? 下面先看一幅时间轮的构造图。
中间的圆轮代表一个时间周期,轮子上的每个节点关联的链表代表该时间点要触发的任务。如上图所示,假设一个格子是1秒,则整个wheel能表示的时间段为8s,假如当前指针指向2,此时需要调度一个3s后执行的任务,显然应该加入到(2+3=5)的方格中,指针再走3次就可以执行了;如果任务要在10s后执行,应该等指针走完一个round零2格再执行,因此应放入4,同时将round(1)保存到任务中。检查到期任务时应当只执行round为0的,格子上其他任务的round应减1。
从图中可以看出,当指针指向某一个刻度时, 综会把此刻度中的所有的task任务一一取出并运行,在解读Netty的时间轮代码前 。
- 时间轮的指针走一轮是多久?
- 时间轮采用什么容器存储这些task的?
- 定时任务的运行时间若晚于指针走一轮的终点,则此时任务放在哪个刻度。
- 刻度的时间间隔标注为tickDuration, 同时将时间轮的一轮的刻度总数标注为wheelLen,两者都是时间轮属性,可以通过构造方法由使用者传入,这样就可以得到时间轮指针走一轮的时长 = tickDuration * wheelLen。
- 当指针运行到某一刻度时,需要把映射在此刻度上所有的任务都取出来 ,而刻度总数在时间轮初始化后就固定了。 因此与Map相似 , 采用数组标识wheel[] 加链表的方式来存储这些task,数组的大小固定为图中的N , 刻度编号就是wheel[]的下标 。
- 每个时间轮启动都会记录其启动时间,同时每个定时任务都有其确定的执行时间,用这个执行时间减去时间轮的启动时间,再除以刻度的持续时长,就能获取这个定时任务需要指针走过多少刻度才运行,标注为calculated。
时间轮本身记录了当前指针已经走过多少刻度,标注为tick,通过caclulated,tick ,时间轮刻度总数wheelLen计算定时任务在哪一刻度上执行(此刻度标注为stopIndex) ,需要分以下几种情况进行处理。
- 当calculated < tick时, 说明这项任务已经是旧的任务了,可以立即执行。因此stopIndex = tick 。
- 当(calculated - tick ) <= wheelLen时, stopIndex = (calculated - tick )。
- 当(calculated - tick) > wheelLen 时, calculated肯定大于wheelLen,若wheelLen是2的整数次幂, 则可以运用与运算stopIndex = calculated & (wheelLen - 1), 若wheelLen 不是2的整数次幂,则把它转换成距离最近的2个整数次幂即可。
时间轮源码剖析之初始化构建
经过以上的3个问题进行分析,对时间轮的构造有了基本的认知,了解了时间轮内部属性特性,以及定时任务与刻度的映射关系,但具体时间轮是如何运行的,它的指针是如何跳动的,这都需要通过仔细阅读Netty的时间轮源码来寻找答案 , 时间轮源码分为两部分,第一部分包含时间轮的核心属性,初始化构建,启动和定时检测任务的添加,第二部分主要是对时间轮的时钟Worker线程的剖析,线程的核心功能有时钟指针的刻度跳动,超时任务处理,任务的取消等。
public interface Timer { Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); Set<Timeout> stop(); }
HashedWheelTimer本质上是一个Timer,用于将任务定时执行,newTimeout用于添加任务,stop用于终止Timer执行.
在分析源码之前我们先看一下netty时间轮实现中的核心组件,以便于分析过程中有比较清晰的脉络关系:
public class HashedWheelTimerTest { public static void main(String[] args) throws Exception { CountDownLatch countDownLatch = new CountDownLatch(1); HashedWheelTimer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 16); final long start = System.currentTimeMillis(); timer.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { System.out.println("task execute,current timestamp=" + (System.currentTimeMillis() - start)); countDownLatch.countDown(); } }, 2000, TimeUnit.MILLISECONDS); countDownLatch.await(); System.out.println("============================" + (System.currentTimeMillis() - start)); timer.stop(); } }
看结果输出 :
这个例子的测试目的很简单,就是创建一个TimerTask任务,只有当这个任务执行完,才停止 timer,从而打印停止时间 。 进入HashedWheelTimer的构造方法 。
static final InternalLogger logger = InternalLoggerFactory.getInstance(HashedWheelTimer.class); private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger(); //时间轮的实例个数 private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean(); // 在服务过程中,时间轮实例个数不能超过64个 private static final int INSTANCE_COUNT_LIMIT = 64; private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1); // 刻度持续时最小值,不能小于这个最小值 private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance() .newResourceLeakDetector(HashedWheelTimer.class, 1); // 内存泄漏检测 private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER = // 原子性更新时间轮工作状态,防止多线程重复操作 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState"); private final ResourceLeakTracker<HashedWheelTimer> leak; // 内存泄漏检测虚引用 private final Worker worker = new Worker(); // 用于构建时间轮工作线程的Runnable掌控指针的跳动 private final Thread workerThread; // 时间轮工作线程 // 时间轮的3种工作状态分别为初始化,已经启动正在运行,停止 public static final int WORKER_STATE_INIT = 0; public static final int WORKER_STATE_STARTED = 1; public static final int WORKER_STATE_SHUTDOWN = 2; @SuppressWarnings({ "unused", "FieldMayBeFinal" }) private volatile int workerState; // 0 - init, 1 - started, 2 - shut down private final long tickDuration; // 每刻度的持续时间 // 此数组用于存储映射在时间轮刻度上的 private final HashedWheelBucket[] wheel; private final int mask; // 时间轮总格子数 -1 private final CountDownLatch startTimeInitialized = new CountDownLatch(1); // 同步计数器,时间轮Workder 线程启动后,将同步给调用时间轮的线程 // 超时task任务队列,先将任务放入到这个队列中, 再在Worker 线程中队列中取出并放入wheel[]的链表中 private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue(); // 取消的task任务存放队列,在Worker线程中会检测是否有任务需要取消 , 若有,则找到对应的链表,并修改这些取消任务的前后任务的指针 private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue(); private final AtomicLong pendingTimeouts = new AtomicLong(0); private final long maxPendingTimeouts; // 时间轮最多容纳多少定时检测任务,默认为-1,无限制 private volatile long startTime; // 时间轮启动时间 public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) { this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel); } public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) { this(threadFactory, tickDuration, unit, ticksPerWheel, true); } public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) { this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1); } /** * 时间轮构造函数 * @param threadFactory 线程工厂,用于创建线程 * @param tickDuration 刻度持续时长 * @param unit 刻度持续时长单位 * @param ticksPerWheel 时间轮总刻度数 * @param leakDetection 是否开启内存泄漏检测 * @param maxPendingTimeouts 时间轮可接受最大定时检测任务数 */ public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } if (unit == null) { throw new NullPointerException("unit"); } if (tickDuration <= 0) { throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); } if (ticksPerWheel <= 0) { throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); } // Normalize ticksPerWheel to power of two and initialize the wheel. 对时间轮刻度数进行格式化,转换成高ticksPerWheel最近的2的整数次幂,并初始化wheel 数组 wheel = createWheel(ticksPerWheel); mask = wheel.length - 1; // Convert tickDuration to nanos. 把刻度持续时长转换成纳秒,这样更加精确 long duration = unit.toNanos(tickDuration); // Prevent overflow. 检测持续时长不能太长,但也不能太短 if (duration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } // MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1) 刻度持续时最小值,不能小于这个最小值 if (duration < MILLISECOND_NANOS) { if (logger.isWarnEnabled()) { logger.warn("Configured tickDuration %d smaller then %d, using 1ms.", tickDuration, MILLISECOND_NANOS); } this.tickDuration = MILLISECOND_NANOS; } else { this.tickDuration = duration; } workerThread = threadFactory.newThread(worker); // 构建时间轮的Worker线程 leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; // 是否需要内存泄漏检测 this.maxPendingTimeouts = maxPendingTimeouts; // 最大定时检测任务个数 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && // INSTANCE_COUNT_LIMIT 默认为64 , 时间轮实例个数检测,超过64个会告警 WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); } } private static void reportTooManyInstances() { if (logger.isErrorEnabled()) { String resourceType = simpleClassName(HashedWheelTimer.class); logger.error("You are creating too many " + resourceType + " instances. " + resourceType + " is a shared resource that must be reused across the JVM," + "so that only a few instances are created."); } }
上面都是一些初始化时间轮的代码,很简单,接下来看创建时间轮的方法 。
/** * 格式化总刻度数,初始化时间轮容器 * @param ticksPerWheel * @return */ private static HashedWheelBucket[] createWheel(int ticksPerWheel) { if (ticksPerWheel <= 0) { throw new IllegalArgumentException( "ticksPerWheel must be greater than 0: " + ticksPerWheel); } if (ticksPerWheel > 1073741824) { throw new IllegalArgumentException( "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel); } ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); // 格式化 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; for (int i = 0; i < wheel.length; i ++) { wheel[i] = new HashedWheelBucket(); } return wheel; } // 找到离ticksPerWheel最近的2个整数次幂 private static int normalizeTicksPerWheel(int ticksPerWheel) { int normalizedTicksPerWheel = 1; while (normalizedTicksPerWheel < ticksPerWheel) { normalizedTicksPerWheel <<= 1; } return normalizedTicksPerWheel; }
从时间轮的创建方法得知,我们创建了一个HashedWheelBucket数组,而HashedWheelBucket数组的长度为2的幂次方倍,而HashedWheelBucket就是任务链表。
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); // 需等待执行任务数+ 1 , 同时判断是否超过最大限制 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } start(); // 若时间轮Worker线程未启动,则需要启动 // Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. // 根据定时任务延时执行时间与时间轮启动时间,获取相对的时间轮开始后的任务执行延时时间,因为时间轮开始启动时间不是会改变的, 所以通过这个时间可以获取时钟需要跳动的刻度 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // Guard against overflow. if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } // 构建定时检测任务,并将其添加到新增定时检测任务队列中, 在Worker线程中,会从队列中取出定时检测任务并发放入缓存数组wheel中 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); timeouts.add(timeout); return timeout; }
重点看时间轮启动方法 。
public void start() { // 时间轮启动 switch (WORKER_STATE_UPDATER.get(this)) { // 根据时间轮状态进行对应的处理 case WORKER_STATE_INIT: // 当时间轮处于初始化状态时,需要启动它 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { // 原子性启动 workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } // Wait until the startTime is initialized by the worker. while (startTime == 0) { // 等待Worker 线程初始化成功 try { startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } }
启动方法中重点看workerThread.start(), 如果多个线程同时调用start()方法,只有一个线程CAS抢锁成功,并调用workerThread.start( )代码,而其他的线程都在等待startTimeInitialized.await(),那其他等待的线程什么时候从等待中唤醒呢?请看Worker任务的执行方法 。
大家可能对CountDownLatch使用得比较少,我们可以看一个例子来理解CountDownLatch在这里的原理 。
public class HashedWheelTimerTest2 { public static void main(String[] args) throws Exception { CountDownLatch countDownLatch = new CountDownLatch(1); final long start = System.currentTimeMillis(); for(int i = 0 ;i < 3 ;i ++){ new Thread(new Runnable() { @Override public void run() { try { countDownLatch.await(); System.out.println("===========" + (System.currentTimeMillis() - start)); } catch (InterruptedException e) { throw new RuntimeException(e); } } }).start(); } new Thread(new Runnable() { @Override public void run() { try { System.out.println("----------start------------------"); Thread.sleep(3000); countDownLatch.countDown(); System.out.println("--------------end -----------------"); } catch (InterruptedException e) { throw new RuntimeException(e); } } }).start(); } }
执行结果
只有当countDown()方法执行完之后,所有的wait()方法才会继续向下执行,接下来进入Worker的run()方法 。
private final class Worker implements Runnable { private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>(); // 当调用了时间轮的stop()方法后,将获取其未执行完的任务 private long tick; // 时钟指针的跳动次数 @Override public void run() { // Initialize the startTime. startTime = System.nanoTime(); // 时间轮启动的时间 if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. startTime = 1; } // Notify the other threads waiting for the initialization at start(). Worker 线程初始化了,通知调用时间轮启动的线程 startTimeInitialized.countDown(); do { // 获取下一刻度时间轮总体的执行时间,记录这个时间与时间轮启动时间和大于当前时间时, 线程会睡眠到这个时间点 final long deadline = waitForNextTick(); if (deadline > 0) { int idx = (int) (tick & mask); // 获取刻度的编号,即wheel 数组的下标 processCancelledTasks(); // 先处理需要取消的任务 HashedWheelBucket bucket = wheel[idx]; // 获取刻度所在的缓存链表 transferTimeoutsToBuckets(); // 把新增加的定时任务加入wheel数组的缓存链表中 bucket.expireTimeouts(deadline); // 循环执行刻度所在的缓存链表 tick++; // 执行完后,指针才正式跳动 } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); // 时间轮状态需要为已经启动状态 // Fill the unprocessedTimeouts so we can return them from stop() method. for (HashedWheelBucket bucket: wheel) { // 运行到这里说明时间轮停止了,需要把未处理的任务返回 bucket.clearTimeouts(unprocessedTimeouts); } for (;;) { // 刚刚加入还未来得及放入时间轮缓存中的超时任务 ,也需要捞出并放入到unprocessedTimeouts中一起返回 HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } processCancelledTasks(); // 处理需要取消的任务 } }
Worker线程是整个时间轮的核心,它拥有一个属性——tick。 tick与时间刻度有一定的关联,指针每经过一个刻度后,tick++; tick与mask
(时间轮总格子数-1)进行与操作后,就是时间轮指针的 当前刻度序号。在Worker线程中,tick做了以下4件事。
- 等待下一刻度运行时间到来。
- 从取消任务队列中获取需要取消的任务并处理。
- 从任务队列中获取需要执行的定时检测任务,并把它们放入对 应的刻度链表中。
- 从当前刻度链表中取出需要执行的定时检测任务,并循环执行 这些定时检测任务的run()方法。
上边操作用图描述如下:
接下来看到下一个刻度时需要等待多少毫秒。
private long waitForNextTick() { long deadline = tickDuration * (tick + 1); // 获取下一刻度时间轮总体的执行时间 for (;;) { final long currentTime = System.nanoTime() - startTime; // 当前时间 - 启动时间 // 计算需要睡眠的毫秒时间 , 由于在将纳秒转化毫秒时需要除以1000000,因此需要加上999999,以防赴丢失尾数,任务被提前执行 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; if (sleepTimeMs <= 0) { // 当睡眠时间小于,且 等于Long.MiN_VALUE时,直跳过此刻度,否则不睡眠,直接执行任务 if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { return currentTime; } } // Check if we run on windows, as if thats the case we will need // to round the sleepTime as workaround for a bug that only affect // the JVM if it runs on windows. // // See https://github.com/netty/netty/issues/356 Window 操作系统特殊处理, 其Sleep函数是以10ms 为单位进行延时的, // 也就是说,所有小于10且大于0的情况都是10ms, 所有大于 10且小于20的情况都是20ms , 因此这里做了特殊的处理, 对于小于10ms 的,直接不睡眠。 对于 大于 10ms的,去掉层尾数 if (PlatformDependent.isWindows()) { sleepTimeMs = sleepTimeMs / 10 * 10; } try { Thread.sleep(sleepTimeMs); } catch (InterruptedException ignored) { // 当发生异常,发现时间轮状态为WORKER_STATE_SHUTDOWN时,立刻返回 if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } } }
再来看取消的任务处理。
private void processCancelledTasks() { for (;;) { HashedWheelTimeout timeout = cancelledTimeouts.poll(); if (timeout == null) { // all processed break; } try { timeout.remove(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown while process a cancellation task", t); } } } } void remove() { HashedWheelBucket bucket = this.bucket; if (bucket != null) { bucket.remove(this); } else { timer.pendingTimeouts.decrementAndGet(); } } public HashedWheelTimeout remove(HashedWheelTimeout timeout) { HashedWheelTimeout next = timeout.next; // remove timeout that was either processed or cancelled by updating the linked-list if (timeout.prev != null) { timeout.prev.next = next; } if (timeout.next != null) { timeout.next.prev = timeout.prev; } if (timeout == head) { // if timeout is also the tail we need to adjust the entry too if (timeout == tail) { tail = null; head = null; } else { head = next; } } else if (timeout == tail) { // if the timeout is the tail modify the tail to be the prev node. tail = timeout.prev; } // null out prev, next and bucket to allow for GC. timeout.prev = null; timeout.next = null; timeout.bucket = null; timeout.timer.pendingTimeouts.decrementAndGet(); return next; }
processCancelledTasks()方法的实现原理也很简单,从cancelledTimeouts队列中取出HashedWheelTimeout,并从bucket链表中移除即可。 那怎样才能将HashedWheelTimeout加入到cancelledTimeouts上呢? 从HashedWheelTimeout的cancel()方法中可以看到 。
public boolean cancel() { // only update the state it will be removed from HashedWheelBucket on next tick. if (!compareAndSetState(ST_INIT, ST_CANCELLED)) { return false; } // If a task should be canceled we put this to another queue which will be processed on each tick. // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible. timer.cancelledTimeouts.add(this); return true; }
因此,可以推出时间轮取消方法的使用。
当处理完所有的取消任务后,此时会调用transferTimeoutsToBuckets()将所有未加入到链表的任务不回到bucket链表中,接下来看transferTimeoutsToBuckets()的源码实现。
private void transferTimeoutsToBuckets() { // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just // adds new timeouts in a loop. for (int i = 0; i < 100000; i++) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { // all processed break; } if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { // Was cancelled in the meantime. continue; } // 每个时间轮启动都会记录其启动时间,同时,每个定时任务都有其确定的执行开始时间,用这个执行开始时间减去时间轮的启动时间, // 再除以刻度的持续时长,就能获取这个定时任务需要指针走过多少刻度才运行,标注为calculated。 long calculated = timeout.deadline / tickDuration; timeout.remainingRounds = (calculated - tick) / wheel.length; final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past. int stopIndex = (int) (ticks & mask); HashedWheelBucket bucket = wheel[stopIndex]; bucket.addTimeout(timeout); } }
这个方法其实还是简单的,通过当前HashedWheelTimeout对象的deadline 执行时间计算出HashedWheelTimeout应该在时间轮的哪个刻度执行,而timeout.remainingRounds 表示时间轮指针运行几圈后执行,如果timeout.remainingRounds为0,则表示指针在当前圈就执行。
如图中的1节点的任务,需要指针运行1圈后才执行,如果当前指针指向的tick为0,那么指针走过的tick为0,1,2,3,4,5,6,7,0,1,2,3。 大家应该明白remainingRounds的含义了吧。 而指针每走一圈,remainingRounds的值就减少1,当remainingRounds=0时,即任务可以执行,接下来看HashedWheelTimeout的执行方法 。
public void expireTimeouts(long deadline) { HashedWheelTimeout timeout = head; // process all timeouts while (timeout != null) { HashedWheelTimeout next = timeout.next; if (timeout.remainingRounds <= 0) { next = remove(timeout); if (timeout.deadline <= deadline) { timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. throw new IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } else if (timeout.isCancelled()) { next = remove(timeout); } else { // remainingRounds减少1 timeout.remainingRounds --; } timeout = next; } } public void expire() { if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { return; } try { task.run(this); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t); } } }
接下来看时间轮的停止方法 。
public Set<Timeout> stop() { if (Thread.currentThread() == workerThread) { throw new IllegalStateException( HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from " + TimerTask.class.getSimpleName()); } if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { // workerState can be 0 or 2 at this moment - let it always be 2. if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) { INSTANCE_COUNTER.decrementAndGet(); if (leak != null) { boolean closed = leak.close(this); assert closed; } } return Collections.emptySet(); } try { boolean interrupted = false; while (workerThread.isAlive()) { workerThread.interrupt(); try { workerThread.join(100); } catch (InterruptedException ignored) { interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } } finally { INSTANCE_COUNTER.decrementAndGet(); if (leak != null) { boolean closed = leak.close(this); assert closed; } } return worker.unprocessedTimeouts(); }
对于时间轮的停止方法,我们需要注意上面加粗代码,因为当前HashedWheelTimer的workerState的值可能是0,也可能是1,当然,是1的可能性更大,因此先使用WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)来修改时间轮的状态,如果修改失败,那HashedWheelTimer的workerState的值可能的值可能是1,因此再调用WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN)修改workerState的状态为WORKER_STATE_SHUTDOWN。 从细节处可以看到Netty能性能的把控。 如果CAS操作将workerState的值修改为WORKER_STATE_SHUTDOWN,此时需要修改总时间轮的个数INSTANCE_COUNTER 减1,接着调用worker.unprocessedTimeouts()方法返回所有未处理的Timeout,大家有没有好奇,workerThread.join(100)这一行代码的含义。
为什么要等0.1秒呢? 大家发现没有,在等待workerThread线程0.1秒后,返回所有的unprocessedTimeouts,而unprocessedTimeouts从何而来呢? 请看Worker的run()方法 。
public void clearTimeouts(Set<Timeout> set) { for (;;) { HashedWheelTimeout timeout = pollTimeout(); if (timeout == null) { return; } if (timeout.isExpired() || timeout.isCancelled()) { continue; } set.add(timeout); } }
因此现在知道为什么等待工作线程0.1秒了吧,等待工作线程将已经添加到链表中并没有被取消的HashedWheelTimeout,以及未被添加到链表中的HashedWheelTimeout 保存到unprocessedTimeouts中。
多重时间轮
当时间跨度很大时,提升单层时间轮的tickDuration可以减少空转次数,但会导致时间精度变低,层级时间轮既可以避免精度降低,又可以避免指针空转次数,如果有时间跨度较长的定时任务,则可交给层级时间轮去调度
源码总结
HashedWheelTimer时间轮是一个高性能,低消耗的数据结构,它适合用非准实时,延迟的短平快任务,比如心跳检测和会话探活,对于可靠性要求比较严格的延迟任务,时间轮目前并不是比较好的解决方案:
原生时间轮是单机的,在分布式和多实例部署的场景中乏力
宕机重新恢复执行,原生时间轮的存储是Mpsc队列,毫无疑问是内存存储,如果出现宕机或者重启,数据是不可恢复的
对于类似订单超时取消的场景,可以考虑时间轮+zk + db的方式实现,zk做中心化控制,避免超时任务在多节点重复执行,也即是数据去重,db做为延时任务的持久化存储,宕机可恢复;具体方案可行性有待考量,感兴趣可以自己推演。
参考文章
Netty时间轮
在实际的开发中,可能有有发行时间轮的需求,因此这里将时间轮代码给摘取出来,以供将来使用。
代码地址
https://gitee.com/quyixiao/hashed-wheel-timer.git
到这里时间轮代码也分析得差不多了, Netty的代码也分析得差不多了,历时半年时间,Netty源码的解析也告一段落,今后的一段时间可能去研究ZooKeeper源码,下一篇博客见。
Netty测试代码
https://gitee.com/quyixiao/netty-netty-4.1.38.Final.git