Netty源码—10.Netty工具之时间轮二

news2025/4/1 17:27:44

大纲

1.什么是时间轮

2.HashedWheelTimer是什么

3.HashedWheelTimer的使用

4.HashedWheelTimer的运行流程

5.HashedWheelTimer的核心字段

6.HashedWheelTimer的构造方法

7.HashedWheelTimer添加任务和执行任务

8.HashedWheelTimer的完整源码

9.HashedWheelTimer的总结

10.HashedWheelTimer的应用

8.HashedWheelTimer的完整源码

//Netty时间轮
public class HashedWheelTimer implements Timer {
    static final InternalLogger logger = InternalLoggerFactory.getInstance(HashedWheelTimer.class);
    private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
    private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
    private static final int INSTANCE_COUNT_LIMIT = 64;
    private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
    private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = 
        ResourceLeakDetectorFactory.instance().newResourceLeakDetector(HashedWheelTimer.class, 1);
    private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
    private final ResourceLeakTracker<HashedWheelTimer> leak;
    //指针转动和延时任务执行的线程
    private final Worker worker = new Worker();
    //worker任务封装的工作线程,用于指针转动和触发时间格里的延时任务的执行
    private final Thread workerThread;

    public static final int WORKER_STATE_INIT = 0;
    public static final int WORKER_STATE_STARTED = 1;
    public static final int WORKER_STATE_SHUTDOWN = 2;
    @SuppressWarnings({"unused", "FieldMayBeFinal"})
    private volatile int workerState;//0 - init, 1 - started, 2 - shut down
    //每个时间格的时间跨度,默认为100ms
    private final long tickDuration;
    //时间轮(环形数组),HashedWheelBucket为每个时间格的槽
    private final HashedWheelBucket[] wheel;
    private final int mask;
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    //延时任务队列,队列中为等待被添加到时间轮的延时任务
    private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
    //保存已经取消的延时任务的队列
    private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
    //记录当前的任务数
    private final AtomicLong pendingTimeouts = new AtomicLong(0);
    //最大的任务数
    private final long maxPendingTimeouts;
    //执行延时任务的线程池
    private final Executor taskExecutor;
    //工作线程启动时间
    private volatile long startTime;

    // 构造器 start //
    public HashedWheelTimer() {
        this(Executors.defaultThreadFactory());
    }

    public HashedWheelTimer(long tickDuration, TimeUnit unit) {
        this(Executors.defaultThreadFactory(), tickDuration, unit);
    }

    public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
        this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
    }

    //使用默认的tickDuration(时间格跨度默认为100ms)和默认的ticksPerWheel(时间格总数默认为512)创建一个新的计时器(时间轮)
    public HashedWheelTimer(ThreadFactory threadFactory) {
        this(threadFactory, 100, TimeUnit.MILLISECONDS);
    }

    public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
        this(threadFactory, tickDuration, unit, 512);
    }

    public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
        this(threadFactory, tickDuration, unit, ticksPerWheel, true);
    }

    public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
        this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
    }

    public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) { 
        this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, maxPendingTimeouts, ImmediateExecutor.INSTANCE);
    }

    //Creates a new timer.
    //@param threadFactory        创建线程的工厂
    //@param tickDuration         每格的时间间隔,默认100ms,0.1秒
    //@param unit                 时间单位,默认为毫秒
    //@param ticksPerWheel        时间轮的格子数,默认为512;如果传入的不是2的N次方,则会调整为大于等于该参数的第一个2的N次方,好处是可以优化hash值的计算 
    //@param leakDetection        如果false,那么只有工作线程不是后台线程时才会追踪资源泄露,这个参数可以忽略
    //@param maxPendingTimeouts   最大的pending数量(时间轮中任务的最大数量),超过这个值之后调用将抛出异常,0或者负数表示没有限制,默认为-1
    //@param taskExecutor         任务线程池,用于执行提交的任务,调用者负责在不需要时关闭它
    //@throws NullPointerException     if either of threadFactory and unit is null
    //@throws IllegalArgumentException if either of tickDuration and ticksPerWheel is <= 0
    public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) { 
        //1.构造参数校验及给实际执行延时任务的线程池taskExecutor赋值
        checkNotNull(threadFactory, "threadFactory");
        checkNotNull(unit, "unit");
        checkPositive(tickDuration, "tickDuration");
        checkPositive(ticksPerWheel, "ticksPerWheel");
        this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");

        //2.将ticksPerWheel(时间轮上的时间格数)向上取值为2的幂,方便进行求商和取余计算
        //3.初始化时间轮wheel
        wheel = createWheel(ticksPerWheel);
        //mask的设计和HashMap一样,通过限制数组的大小为2的幂,利用位运算来替代取模运算,提高性能
        mask = wheel.length - 1;
      
        //4.校验tickDuration和ticksPerWheel
        //Convert tickDuration to nanos.
        long duration = unit.toNanos(tickDuration);
        //防止溢出
        //tickDuration * ticksPerWheel必须小于Long.MAX_VALUE
        if (duration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length));
        }
        //tickDuration不能小于1ms
        if (duration < MILLISECOND_NANOS) {
            logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS);
            this.tickDuration = MILLISECOND_NANOS;
        } else {
            this.tickDuration = duration;
        }
        //5.创建工作线程,用于指针转动和触发时间格里的延时任务的执行
        workerThread = threadFactory.newThread(worker);

        leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
        //6.给时间轮中任务的最大数量maxPendingTimeouts赋值
        this.maxPendingTimeouts = maxPendingTimeouts;
        //7.检查HashedWheelTimer的实例数量,如果大于64则打印error日志
        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }
    // 构造器 end //
    
    @Override
    protected void finalize() throws Throwable {
        try {
            super.finalize();
        } finally {
            //This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. 
            //If we have not yet shutdown then we want to make sure we decrement the active instance count.
            if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
                INSTANCE_COUNTER.decrementAndGet();
            }
        }
    }

    //初始化时间轮环形数组
    //@param ticksPerWheel
    private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        //ticksPerWheel不能大于2^30
        checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
        //将ticksPerWheel(轮子上的时间格数)向上取值为2的次幂
        ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
        //创建时间轮环形数组
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i ++) {
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
    }

    //将ticksPerWheel(时间轮上的时间格数)向上取值为2的次幂
    private static int normalizeTicksPerWheel(int ticksPerWheel) {
        int normalizedTicksPerWheel = 1;
        while (normalizedTicksPerWheel < ticksPerWheel) {
            normalizedTicksPerWheel <<= 1;
        }
        return normalizedTicksPerWheel;
    }

    //显式启动后台线程
    //即使没有调用此方法,后台线程也会按需自动启动
    //Starts the background thread explicitly.  
    //The background thread will start automatically on demand even if you did not call this method.
    //@throws IllegalStateException if this timer has been #stop() stopped already
    public void start() {
        switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                //启动工作线程,即启动时间轮
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
        }

        //Wait until the startTime is initialized by the worker.
        while (startTime == 0) {
            try {
                //阻塞时间轮的工作线程
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                //Ignore - it will be ready very soon.
            }
        }
    }

    @Override
    public Set<Timeout> stop() {
        if (Thread.currentThread() == workerThread) {
            throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from " + TimerTask.class.getSimpleName());
        }

        if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
            //workerState can be 0 or 2 at this moment - let it always be 2.
            if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
                INSTANCE_COUNTER.decrementAndGet();
                if (leak != null) {
                    boolean closed = leak.close(this);
                    assert closed;
                }
            }
            return Collections.emptySet();
        }

        try {
            boolean interrupted = false;
            while (workerThread.isAlive()) {
                workerThread.interrupt();
                try {
                    workerThread.join(100);
                } catch (InterruptedException ignored) {
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        } finally {
            INSTANCE_COUNTER.decrementAndGet();
            if (leak != null) {
                boolean closed = leak.close(this);
                assert closed;
            }
        }
        return worker.unprocessedTimeouts();
    }

    //添加延时任务
    //@param task 任务
    //@param delay 延时时间
    //@param unit 延时时间单位
    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        checkNotNull(task, "task");
        checkNotNull(unit, "unit");
        //1.将需要执行的延时任务数pendingTimeouts + 1
        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
       
        //2.如果pendingTimeouts超过maxPendingTimeouts,则抛出异常
        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount 
                + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); 
        }
      
        //3.启动工作线程,即启动时间轮
        start();
     
        //将延时任务添加到延时任务队列timeouts中,该队列将在下一个滴答声中处理(指针的下一次转动)
        //在处理过程中,所有排队的HashedWheelTimeout将被添加到正确的HashedWheelBucket

        //4.计算任务的截止时间deadline = 当前时间 + 当前任务执行的延迟时间 - 时间轮启动的时间
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
       
        //5.创建延时任务实例HashedWheelTimeout
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
       
        //6.将延时任务实例添加到延时任务队列中
        timeouts.add(timeout);
        return timeout;
    }

    //Returns the number of pending timeouts of this Timer.
    public long pendingTimeouts() {
        return pendingTimeouts.get();
    }

    private static void reportTooManyInstances() {
        if (logger.isErrorEnabled()) {
            String resourceType = simpleClassName(HashedWheelTimer.class);
            logger.error("You are creating too many " + resourceType + " instances. " +
                resourceType + " is a shared resource that must be reused across the JVM, " +
                "so that only a few instances are created.");
        }
    }

    //指针转动和延时任务执行的线程
    private final class Worker implements Runnable {
        //用于记录未执行的延时任务
        private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
    
        //总的tick数(指针嘀嗒的次数)
        private long tick;
    
        @Override
        public void run() {
            //1.记录时间轮启动的时间startTime
            startTime = System.nanoTime();
            if (startTime == 0) {
                //我们在这里使用0作为未初始化值的指示符,所以要确保初始化时它不是0
                startTime = 1;
            }
    
            //2.唤醒被阻塞的start()方法,通知时间轮已经启动完毕
            startTimeInitialized.countDown();
        
            //一直执行do while循环,直到时间轮被关闭
            do {
                //3.阻塞等待下一次指针转动的时间
                //这里会休眠tick的时间,模拟指针走动
                final long deadline = waitForNextTick();
                if (deadline > 0) {
                    //4.计算当前指针指向的时间轮槽位idx
                    int idx = (int) (tick & mask);
                    //5.将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1
                    processCancelledTasks();
                    //6.获取当前指针指向的时间槽HashedWheelBucket
                    HashedWheelBucket bucket = wheel[idx];
                    //7.遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中
                    transferTimeoutsToBuckets();
                    //8.运行目前指针指向的槽中的链表的任务,交给taskExecutor线程池去执行到期的延时任务
                    //9.到期的和取消的延时任务从链表中移除并将pendingTimeouts--
                    bucket.expireTimeouts(deadline);
                    //10.时间轮指针的总转动次数tick++
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
    
            //Fill the unprocessedTimeouts so we can return them from stop() method.
            //11.清除时间轮中不需要处理的任务
            for (HashedWheelBucket bucket: wheel) {
                bucket.clearTimeouts(unprocessedTimeouts);
            }
            //12.将延时任务队列中还未添加到时间轮的延时任务保存到unprocessedTimeouts中
            //遍历任务队列,如果发现有任务被取消,则添加到unprocessedTimeouts,也就是不需要处理的队列中
            for (;;) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.isCancelled()) {
                    //如果延时任务没被取消,记录到未执行的任务Set集合中
                    unprocessedTimeouts.add(timeout);
                }
            }
            //13.处理被取消的任务
            processCancelledTasks();
        }
    
        //将延时任务队列timeouts中等待添加到时间轮中的延时任务,转移到时间轮的指定位置
        //也就是遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中
        private void transferTimeoutsToBuckets() {
            //每次转移10w个延时任务
            for (int i = 0; i < 100000; i++) {
                //从队列中出队一个延时任务
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    //all processed
                    break;
                }
                if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                    //Was cancelled in the meantime.
                    continue;
                }
                //到期一共需要走多少时间格(tick次数),deadline表示当前任务的延迟时间(从时间轮启动时计算),tickDuration表示时间格的时间间隔 
                long calculated = timeout.deadline / tickDuration;
                //tick已经走了的时间格,到期一共还需要需要走多少圈
                timeout.remainingRounds = (calculated - tick) / wheel.length;
                //如果延时任务在队列中等待太久已经过了执行时间,那么这个时候就使用当前tick,也就是放在当前的bucket,此方法调用完后就会被执行
                final long ticks = Math.max(calculated, tick);
                //槽的索引,stopIndex = tick 次数 & mask, mask = wheel.length - 1
                int stopIndex = (int) (ticks & mask);
                //根据索引该任务应该放到的槽
                HashedWheelBucket bucket = wheel[stopIndex];
                //将任务添加到槽中,链表末尾
                bucket.addTimeout(timeout);
            }
        }
    
        //处理取消掉的延时任务
        //将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1
        private void processCancelledTasks() {
            for (;;) {
                HashedWheelTimeout timeout = cancelledTimeouts.poll();
                if (timeout == null) {
                    //all processed
                    break;
                }
                try {
                    timeout.remove();
                } catch (Throwable t) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("An exception was thrown while process a cancellation task", t);
                    }
                }
            }
        }
    
        //从时间轮的启动时间startTime和当前的tick数(指针跳动次数)计算下一次指针跳动的时间,然后休眠等待下一次指针跳动时间到来
        private long waitForNextTick() {
            //deadline返回的是下一次时间轮指针跳动的时间与时间格启动的时间间隔
            long deadline = tickDuration * (tick + 1);
    
            for (;;) {
                //计算当前时间距离启动时间的时间间隔
                final long currentTime = System.nanoTime() - startTime;
                //距离下一次指针跳动还需休眠多长时间
                long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
                //到了指针调到下一个槽位的时间
                if (sleepTimeMs <= 0) {
                    if (currentTime == Long.MIN_VALUE) {
                        return -Long.MAX_VALUE;
                    } else {
                        return currentTime;
                    }
                }
    
                try {
                    //表示距离下一次指针跳动还需要一段时间,所以休眠等待时间的到来
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException ignored) {
                    if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                        return Long.MIN_VALUE;
                    }
                }
            }
        }
    
        //记录未执行的延时任务
        public Set<Timeout> unprocessedTimeouts() {
            return Collections.unmodifiableSet(unprocessedTimeouts);
        }
    }
    
    //延时任务
    private static final class HashedWheelTimeout implements Timeout, Runnable {
        private static final int ST_INIT = 0;
        private static final int ST_CANCELLED = 1;
        private static final int ST_EXPIRED = 2;
        private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");

        private final HashedWheelTimer timer;
        private final TimerTask task;
        //任务执行的截止时间 = 当前时间 + 延时任务延时时间 - 时间轮启动时间
        private final long deadline;

        @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
        private volatile int state = ST_INIT;

        //剩下的圈(轮)数
        //remainingRounds将由Worker.transferTimeoutsToBuckets()在HashedWheelTimeout被添加到正确的HashedWheelBucket之前计算和设置
        long remainingRounds;

        //HashedWheelTimerBucket槽中的延时任务列表是一个双向链表
        //因为只有workerThread会对它进行操作,所以不需要 synchronization / volatile
        HashedWheelTimeout next;
        HashedWheelTimeout prev;

        //当前延时任务所插入时间轮的哪个槽
        HashedWheelBucket bucket;

        HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
            this.timer = timer;
            this.task = task;
            this.deadline = deadline;
        }

        @Override
        public Timer timer() {
            return timer;
        }

        @Override
        public TimerTask task() {
            return task;
        }

        @Override
        public boolean cancel() {
            //only update the state it will be removed from HashedWheelBucket on next tick.
            if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
                return false;
            }
            //If a task should be canceled we put this to another queue which will be processed on each tick.
            //So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
            //we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
            timer.cancelledTimeouts.add(this);
            return true;
        }

        void remove() {
            HashedWheelBucket bucket = this.bucket;
            if (bucket != null) {
                bucket.remove(this);
            } else {
                timer.pendingTimeouts.decrementAndGet();
            }
        }

        public boolean compareAndSetState(int expected, int state) {
            return STATE_UPDATER.compareAndSet(this, expected, state);
        }

        public int state() {
            return state;
        }

        @Override
        public boolean isCancelled() {
            return state() == ST_CANCELLED;
        }

        @Override
        public boolean isExpired() {
            return state() == ST_EXPIRED;
        }

        public void expire() {
            if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
                return;
            }

            try {
                timer.taskExecutor.execute(this);
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName() + " for execution.", t);
                }
            }
        }

        @Override
        public void run() {
            try {
                task.run(this);
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
                }
            }
        }

        @Override
        public String toString() {
            final long currentTime = System.nanoTime();
            long remaining = deadline - currentTime + timer.startTime;

            StringBuilder buf = new StringBuilder(192).append(simpleClassName(this)).append('(').append("deadline: ");
            if (remaining > 0) {
                buf.append(remaining).append(" ns later");
            } else if (remaining < 0) {
                buf.append(-remaining).append(" ns ago");
            } else {
                buf.append("now");
            }
            if (isCancelled()) {
                buf.append(", cancelled");
            }
            return buf.append(", task: ").append(task()).append(')').toString();
        }
    }

    //存放HashedWheelTimeouts的桶
    //这些数据存储在一个类似于链表的数据结构中,允许轻松删除中间的hashedwheeltimeout
    //HashedWheelTimeout本身作为节点,因此不需要创建额外的对象
    //保存头结点和尾节点,方便于任务的提取和插入
    private static final class HashedWheelBucket {
        //头结点
        private HashedWheelTimeout head;
        //尾节点
        private HashedWheelTimeout tail;

        //Add HashedWheelTimeout to this bucket.
        public void addTimeout(HashedWheelTimeout timeout) {
            assert timeout.bucket == null;
            timeout.bucket = this;
            if (head == null) {
                head = tail = timeout;
            } else {
                tail.next = timeout;
                timeout.prev = tail;
                tail = timeout;
            }
        }

        //Expire all HashedWheelTimeouts for the given deadline.
        public void expireTimeouts(long deadline) {
            HashedWheelTimeout timeout = head;
            //遍历当前时间槽中的所有任务
            while (timeout != null) {
                HashedWheelTimeout next = timeout.next;
                if (timeout.remainingRounds <= 0) {
                    //从链表中移除
                    next = remove(timeout);
                    if (timeout.deadline <= deadline) {
                        //延时任务到期,执行延时任务
                        timeout.expire();
                    } else {
                        //The timeout was placed into a wrong slot. This should never happen.
                        throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                    //如果延时任务取消,从链表中移除
                } else if (timeout.isCancelled()) {
                    next = remove(timeout);
                } else {
                    //任务还没到期,剩余的轮数-1
                    timeout.remainingRounds --;
                }
                //将指针放置到下一个延时任务上
                timeout = next;
            }
        }

        //删除槽中链表中的延时任务
        public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
            HashedWheelTimeout next = timeout.next;
            //remove timeout that was either processed or cancelled by updating the linked-list
            if (timeout.prev != null) {
                timeout.prev.next = next;
            }
            if (timeout.next != null) {
                timeout.next.prev = timeout.prev;
            }
            if (timeout == head) {
                //if timeout is also the tail we need to adjust the entry too
                if (timeout == tail) {
                    tail = null;
                    head = null;
                } else {
                    head = next;
                }
            } else if (timeout == tail) {
                //if the timeout is the tail modify the tail to be the prev node.
                tail = timeout.prev;
            }
            //null out prev, next and bucket to allow for GC.
            timeout.prev = null;
            timeout.next = null;
            timeout.bucket = null;
            timeout.timer.pendingTimeouts.decrementAndGet();
            return next;
        }

        //Clear this bucket and return all not expired / cancelled Timeouts.
        public void clearTimeouts(Set<Timeout> set) {
            for (;;) {
                HashedWheelTimeout timeout = pollTimeout();
                if (timeout == null) {
                    return;
                }
                if (timeout.isExpired() || timeout.isCancelled()) {
                    continue;
                }
                set.add(timeout);
            }
        }

        //头结点移除
        private HashedWheelTimeout pollTimeout() {
            HashedWheelTimeout head = this.head;
            if (head == null) {
                return null;
            }
            HashedWheelTimeout next = head.next;
            if (next == null) {
                tail = this.head =  null;
            } else {
                this.head = next;
                next.prev = null;
            }

            //null out prev and next to allow for GC.
            head.next = null;
            head.prev = null;
            head.bucket = null;
            return head;
        }
    }
}

9.HashedWheelTimer的总结

一.时间轮的转动是单线程

但是时间轮中每个时间槽里的延时任务则是由线程池来执行的。

二.延时任务保存到JVM中没有做宕机备份

系统重启时延时任务将会丢失,无法恢复任务进行重新调度。

三.时间轮调度器的时间精度不是很高

对于精度要求特别高的调度任务可能不太适合,因为时间轮的精度取决于时间格的跨度大小。

四.时间轮指针的转动是使用Sleep来完成等待的

10.HashedWheelTimer的应用

(1)时间轮的应用场景

一.Dubbo、Netty、Kafka、Redission等中间件都用到了时间轮机制

二.订单关闭、确认收货、批量定时数据更新等都可以采用时间轮机制

(2)心跳检测

心跳机制会每隔固定的时间发送一个心跳包来检测客户端和服务端的连接状态,客户端发送心跳包用来告诉服务器其还正常运行。

比如在Dubbo中,需要有心跳机制来维持Consumer与Provider的长连接,默认的心跳间隔是60s。当Provider在3次心跳时间内没有收到心跳响应,会关闭连接通道。当Consumer在3次心跳时间内没有收到心跳响应,会进行重连。

在Dubbo的HeaderExchangeClient类中会向时间轮中提交该心跳任务:

一.发送心跳的时间轮

private static final HashedWheelTimer IDLE_CHECK_TIMER =
    new HashedWheelTimer(
        new NamedThreadFactory("dubbo-client-idleCheck", true), 
        1, 
        TimeUnit.SECONDS, 
        TICKS_PER_WHEEL
    );

二.向时间轮中提交心跳任务

private void startHeartBeatTask(URL url) {
    //Client的具体实现决定是否启动该心跳任务
    if (!client.canHandleIdle()) {
        AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
        //计算心跳间隔, 最小间隔不能低于1s
        int heartbeat = getHeartbeat(url);
        long heartbeatTick = calculateLeastDuration(heartbeat);
        //创建心跳任务
        this.heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
        //提交到IDLE_CHECK_TIMER这个时间轮中等待执行, 等时间到了时间轮就会去取出该任务进行调度执行
        IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
    }
}

(3)超时处理

在Dubbo中发起RPC调用时,通常会配置超时时间,当消费者调用服务提供者出现超时进行一定的逻辑处理。那么怎么检测任务调用超时了呢?我们可以利用定时任务。

每次发起RPC调用时创建一个Future,记录这个Future的创建时间与超时时间,后台有一个定时任务进行检测。当Future到达超时时间并且没有被处理时,就需要对这个Future执行超时逻辑处理。

(4)Redisson分布式锁续期

Redisson看门狗机制,通过时间轮定时给分布式锁续期。在获取锁成功后,Redisson会封装一个锁续期的延时任务放入到时间轮中。默认10秒检查一下,用于对获取到的锁进行续期,延长持有锁的时间。如果业务机器宕机了,那么续期的延时任务失效,也无法续期,锁会超时释放。

一.添加续期延时任务

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }

    //这边newTimeout点进去发现就是往时间轮中提交了一个任务
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }

            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                if (res) {
                    //续期成功后继续调度, 又往时间轮中放一个续期任务
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    ee.setTimeout(task);
}

二.lua续期代码

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    //通过lua脚本对锁进行续期
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "return 1; " +
        "end; " +
        "return 0;",
        Collections.singletonList(getName()),
        internalLockLeaseTime, getLockName(threadId)
    );
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2325235.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

算法学习记录:递归

递归算法的关键在于回复现场&#xff0c;dfs&#xff08;&#xff09;函数返回值、结束条件、它的作用。 目录 1.综合练习 2. 二叉树的深搜 1.综合练习 39. 组合总和 - 力扣&#xff08;LeetCode&#xff09; 关键在画出的决策树当中&#xff0c;前面使用过的2、3&#xff0c;…

可发1区的超级创新思路(python\matlab实现):MPTS+Lconv+注意力集成机制的Transformer时间序列模型

首先声明,该模型为原创!原创!原创!且该思路还未有成果发表,感兴趣的小伙伴可以借鉴! 应用场景 该模型主要用于时间序列数据预测问题,包含功率预测、电池寿命预测、电机故障检测等等。 一、模型整体架构(本文以光伏功率预测为例) 本模型由多尺度特征提取模块(MPTS)…

三、分类模块,通用组件顶部导航栏Navbar

1.封装通用组件顶部导航栏Navbar 不同效果 Component export struct MkNavbar {Prop title: string Prop leftIcon: ResourceStr $r("app.media.ic_public_left")ProprightIcon: ResourceStr $r("app.media.ic_public_more")PropshowLeftIcon: boolean…

PHY——LAN8720A 寄存器读写 (二)

文章目录 PHY——LAN8720A 寄存器读写 (二)工程配置引脚初始化代码以太网初始化代码PHY 接口实现LAN8720 接口实现PHY 接口测试 PHY——LAN8720A 寄存器读写 (二) 工程配置 这里以野火电子的 F429 开发板为例&#xff0c;配置以太网外设 这里有一点需要注意原理图 RMII_TXD0…

Flutter_学习记录_AppBar中取消leading的占位展示

将leading设置为null将automaticallyImplyLeading设置为false 看看automaticallyImplyLeading的说明&#xff1a; Controls whether we should try to imply the leading widget if null. If true and [AppBar.leading] is null, automatically try to deduce what the leading…

未来派几何风格包装徽标品牌海报标牌logo设计无衬线英文字体安装包 Myfonts – Trakya Sans Font Family

Trakya Sans 是一种具有几何风格的现代无衬线字体。Futura、Avant Garde 等。它具有现代条纹&#xff0c;这是宽度和高度协调的结果&#xff0c;尤其是在小写字母中&#xff0c;以支持易读性。 非常适合广告和包装、编辑和出版、徽标、品牌和创意产业、海报和广告牌、小文本、寻…

C语言深度解析:从零到系统级开发的完整指南

一、C语言的核心特性与优势 1. 高效性与直接硬件控制 C语言通过编译为机器码的特性&#xff0c;成为系统级开发的首选语言。例如&#xff0c;Linux内核通过C语言直接操作内存和硬件寄存器&#xff0c;实现高效进程调度。 关键点&#xff1a; malloc/free直接管理内存&#…

ctfshow WEB web8

首先确定注入点&#xff0c;输入以下payload使SQL恒成立 ?id-1/**/or/**/true 再输入一下payload 使SQL恒不成立 ?id-1/**/or/**/false 由于SQL恒不成立, 数据库查询不到任何数据, 从而导致页面空显示 由以上返回结果可知&#xff0c;该页面存在SQL注入&#xff0c;注入点…

【Linux】U-Boot 加载并启动 Linux 系统程序

U-Boot 加载并启动 Linux 系统程序 零、介绍 最近在玩一些嵌入式的开发板&#xff0c;在引导操作系统时需要用到U-Boot&#xff0c;故此研究一下。 U-Boot&#xff08;Universal Bootloader&#xff09;是一款开源的通用引导加载程序&#xff0c;专为嵌入式系统设计&#xff…

jarvisoj API调用 [JSON格式变XXE]

http://web.jarvisoj.com:9882/ 题目要求&#xff1a;请设法获得目标机器 /home/ctf/flag.txt 中的flag值 抓包得到&#xff1a; POST /api/v1.0/try HTTP/1.1 Host: web.jarvisoj.com:9882 Content-Length: 36 Accept-Language: zh-CN,zh;q0.9 User-Agent: Mozilla/5.0 (W…

机器学习的一百个概念(4)下采样

前言 本文隶属于专栏《机器学习的一百个概念》&#xff0c;该专栏为笔者原创&#xff0c;引用请注明来源&#xff0c;不足和错误之处请在评论区帮忙指出&#xff0c;谢谢&#xff01; 本专栏目录结构和参考文献请见[《机器学习的一百个概念》 ima 知识库 知识库广场搜索&…

NNI 适配 TensorRT10教程

引言 本文涉及两个框架及其版本分别为 NNI (Neural Network Intelligence) &#xff1a;3.0TensorRT&#xff1a;10.9.0.34 NNI 在文档 Speed Up Quantized Model with TensorRT里描述了如何使用 TensorRT 为NNI量化的模型实现加速&#xff0c;但是从NNI 的源代码https://gi…

多路径 TCP 调度的另一面

参考前面的文章 一个原教旨的多路径 TCP 和 MP-BBR 公平性推演&#xff0c;一直都破而不立&#xff0c;不能光说怎样不好&#xff0c;还得说说现状情况下&#xff0c;该如何是好。 如果 receiver 乱序重排的能力有限(拜 TCP 所赐)&#xff0c;如果非要在多路径上传输 TCP&…

vcpkg安装指定版本的库

一.vcpkg安装 使用git将vcpkg源码克隆到本地制定目录&#xff08;D:\vcpkg&#xff09;&#xff0c;并初始化 git clone https://github.com/microsoft/vcpkg.git cd vcpkg ./bootstrap-vcpkg.sh # Linux/macOS .\bootstrap-vcpkg.bat # Windows 如下图&#xff1a; 二.安…

【工具变量】上市公司供应链稳定性数据两个维度(2013-2023年)

供应链稳定性是指供应链在面对各种内外部因素的冲击和不确定性时&#xff0c;能够保持持续、顺畅运作的能力&#xff0c;而供应链稳定性指数是用于评估企业在其供应链管理中保持稳定性的一个重要指标。本分享数据参考钟涛&#xff08;2022&#xff09;、董浩和闫晴&#xff08;…

Redis场景问题2:缓存击穿

Redis 缓存击穿是指在缓存系统中&#xff0c;大量请求&#xff08;高并发访问&#xff09;同时访问一个不存在于缓存中&#xff08;一般是因为缓存过期或者数据未被加载到缓存&#xff09;但在数据库中存在的热点数据&#xff0c;从而导致这些请求直接穿透缓存层&#xff0c;涌…

RocketMQ - 从消息可靠传输谈高可用

先稍微介绍下RocketMQ架构。 主从架构 Broker 集群&#xff1a;每个 Broker 分为 Master 和 Slave 角色&#xff0c;Master 负责读写&#xff0c;Slave 作为热备。 同步复制&#xff08;SYNC_MASTER&#xff09;&#xff1a;消息写入 Master 后&#xff0c;需等待 Slave 同步完…

ES拼音分词自动补全实现

#测试拼音分词 POST /_analyze { "text":"如家酒店真不错", "analyzer": "pinyin" } #这里把拼音的首字母放到这里&#xff0c;也说明了这句话没有被分词&#xff0c;而是作为一个整体出现的 #还把每一个字都形成了一个拼音&#…

EFISH-SBC-RK3576 + 5G模组:无线工业相机与分布式AI质检‌

在智能制造与仓储物流场景中&#xff0c;传统有线工业相机存在部署成本高、灵活性差等痛点。‌eFish-SBC-RK3576‌ 通过 ‌5G无线传输 分布式NPU协同‌&#xff0c;实现跨产线、跨工厂的AI质检系统&#xff0c;检测效率提升300%&#xff0c;布线复杂度降低90%。 ‌1. 系统架构…

C/C++ 基础 - 回调函数

目录 前言 回调函数预备知识 函数指针 什么是函数指针 函数指针的语法 如何用函数指针调用函数 函数指针作为函数的参数 函数指针作为函数返回类型 函数指针数组 回调函数 什么是回调函数 为什么要用回调函数 怎么使用回调函数 总结 前言 在写项目的时候&#x…