ScheduledThreadPoolExecutor源码分析-延时线程池是如何实现延时执行的

news2024/12/29 0:11:29

ScheduledThreadPoolExecutor 线程池可以实现任务延时执行,那么它是怎么实现的呢?下面笔者进行详细分析

先看看它是怎么使用的

目录

1、延时执行使用

2、源码分析

2.1、ScheduledThreadPoolExecutor 初始化分析

2.2、ScheduledThreadPoolExecutor 执行延时任务分析

3、总结


1、延时执行使用

创建 ScheduledThreadPoolExecutor 可以直接new ScheduledThreadPoolExecutor;也可以使用 Executors.newScheduledThreadPool 得到 ScheduledExecutorService,ScheduledThreadPoolExecutor实现了 ScheduledExecutorService 接口

Executors.newScheduledThreadPool

package com.wsjzzcbq.java.thread.pool.scheduled;

import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * ScheduledThreadPoolLearn
 *
 * @author wsjz
 * @date 2023/09/13
 */
public class ScheduledThreadPoolLearn {

    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        System.out.println(LocalDateTime.now());
        service.schedule(()->{
            System.out.println("被酒莫惊春睡重,赌书消得泼茶香,当时只道是寻常");
            System.out.println(LocalDateTime.now());
        }, 8, TimeUnit.SECONDS);

        service.shutdown();
    }
}

ScheduledThreadPoolExecutor

package com.wsjzzcbq.java.thread.pool.scheduled;

import java.time.LocalDateTime;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * ScheduledThreadPoolLearn
 *
 * @author wsjz
 * @date 2023/09/13
 */
public class ScheduledThreadPoolLearn {

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        System.out.println(LocalDateTime.now());
        executor.schedule(()->{
            System.out.println("被酒莫惊春睡重,赌书消得泼茶香,当时只道是寻常");
            System.out.println(LocalDateTime.now());
        }, 8, TimeUnit.SECONDS);

        executor.shutdown();
    }
}

运行效果

延时8秒执行

2、源码分析

2.1、ScheduledThreadPoolExecutor 初始化分析

下面分析 ScheduledThreadPoolExecutor 是如何实现延时执行的

我们先看 new ScheduledThreadPoolExecutor 对象时做了哪些事情,因为在构造函数中初始化的东西后面会用到

通过构造函数点进去,可以看到下面的代码

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

熟悉线程池原理的朋友应该知道阻塞队列在线程池中的作用,这里不做展开。ScheduledThreadPoolExecutor 使用的是 DelayedWorkQueue,看名字应该猜到它是延时工作队列,这个队列是在 ScheduledThreadPoolExecutor 内部自己实现的,是它的内部类

点击 super,我们看 ScheduledThreadPoolExecutor 它的父类做了哪些事情

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

Executors.defaultThreadFactory() 使用默认的线程工厂,这里留个印象,因为后面会用

点击 defaultThreadFactory(),看看默认线程工厂方法的内容

    /**
     * Returns a default thread factory used to create new threads.
     * This factory creates all new threads used by an Executor in the
     * same {@link ThreadGroup}. If there is a {@link
     * java.lang.SecurityManager}, it uses the group of {@link
     * System#getSecurityManager}, else the group of the thread
     * invoking this {@code defaultThreadFactory} method. Each new
     * thread is created as a non-daemon thread with priority set to
     * the smaller of {@code Thread.NORM_PRIORITY} and the maximum
     * priority permitted in the thread group.  New threads have names
     * accessible via {@link Thread#getName} of
     * <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
     * number of this factory, and <em>M</em> is the sequence number
     * of the thread created by this factory.
     * @return a thread factory
     */
    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

这里new 一个 DefaultThreadFactory

我们点进去看看 DefaultThreadFactory

/**
     * The default thread factory
     */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

它是 Executors 的内部类,newThread 方法要记一下,因为后面会用到

接下来我们回到 ScheduledThreadPoolExecutor 的父类 ThreadPoolExecutor

点击 this 看看详细的内容

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

这是它真正开始初始化的地方,初始化线程池需要的参数

到现在为止我们只需要记住2个参数,在 ScheduledThreadPoolExecutor 构造阶段,一个是它使用了DelayedWorkQueue,另一个是它使用了默认的线程工厂

2.2、ScheduledThreadPoolExecutor 执行延时任务分析

下面看 schedule 方法做了哪些事情

还是点进去看源码

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

这里先判断参数是不是null,不是null 的话构建一个 RunnableScheduledFuture 对象

我们先看 triggerTime 方法

    /**
     * Returns the trigger time of a delayed action.
     */
    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }

    /**
     * Returns the trigger time of a delayed action.
     */
    long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

这里会将延时时间转换成纳秒并加上现在时间进行返回

然后我们看 ScheduledFutureTask 构造函数

        /**
         * Creates a one-shot action with given nanoTime-based trigger time.
         */
        ScheduledFutureTask(Runnable r, V result, long ns) {
            super(r, result);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

ScheduledFutureTask 的父类

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

这里就是将线程池执行的 Runnable 和延时时间赋值给 ScheduledFutureTask 对应的属性,然后将ScheduledFutureTask 返回

我们回到 schedule 方法

下面我们看关键的 delayedExecute(t) 方法

    /**
     * Main execution method for delayed or periodic tasks.  If pool
     * is shut down, rejects the task. Otherwise adds task to queue
     * and starts a thread, if necessary, to run it.  (We cannot
     * prestart the thread to run the task because the task (probably)
     * shouldn't be run yet.)  If the pool is shut down while the task
     * is being added, cancel and remove it if required by state and
     * run-after-shutdown parameters.
     *
     * @param task the task
     */
    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

这里先判断线程池是不是 Shutdown,如果没有的话会将 RunnableScheduledFuture 对象添加到队列,什么队列呢?当然是上面初始化时候的 DelayedWorkQueue,我们先不看DelayedWorkQueue 的 add 方法逻辑,先看线程池的代码

添加到队列后,又进行Shutdown判断,我们直接看 ensurePrestart() 方法

    /**
     * Same as prestartCoreThread except arranges that at least one
     * thread is started even if corePoolSize is 0.
     */
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

workerCountOf(c) 返回当前线程数量,初始返回 0;corePoolSize 是我们构造 ScheduledThreadPoolExecutor 对象时传的 1,因此会进入 addWorker(null, true) 方法

    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //检查线程池状态
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //判断线程数是够超出容量
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //将线程数加 1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker 方法前面 2个for 死循环先判断线程池状态,然后第2个for循环先判断worker线程数有没有超出容量,然后通过 compareAndIncrementWorkerCount(c) 将线程数加 1,然后结束2重循环

下面的代码先 new Worker(firstTask),然后将 worker 添加到 workers,最后调用 t.start() 方法结束

下面我们分析 Worker

t.start() 是 Worker 中的线程,由  final Thread t = w.thread 得到,我们先看 new Worker(firstTask)做了什么

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

由上面的代码知道传入的 firstTask = null

这里的 thread 由线程工厂 getThreadFactory().newThread(this) 得到

而线程工厂是前面初始化时候的默认线程工厂,注意这里 getThreadFactory().newThread(this) 时传入了 this,这里是关键代码,这个 this 是什么,是当前 new 出来的 Worker对象,把new 出来的 Worker 对象作为线程的 Runnable 传进去,所以当 Worker 内的 thread 调用start 方法运行后,会执行 Worker 类的 run 方法。这里有些绕!

下面是默认线程工厂,注意看 newThread 方法

所以 t.start() 运行后,会进入 Worker 类的 run 方法

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

Worker 类的 run 方法调用外面的 runWorker 方法,并传入当前 Worker 对象

下面看 runWorker 方法

    /**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     *
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread's
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     *
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

这里先判断 Worker 的 firstTask 是不是 null,firstTask 是 null,程序会从 getTask() 中获取 task

我们先往下看流程,如果从 getTask() 中拿到了 task,下面会执行 task.run(),就是我们线程池要延时执行的 Runnable 的 run 方法。看到这没发现有延时的内容,也就是说延时和线程池执行流程没关系,那么延时是怎么实现的呢?我们就要看 getTask() 是怎么获取 task 的

    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

还是一个死循环,先判断线程池状态,再判断线程数

注意 timed,allowCoreThreadTimeOut 默认是 false,wc 不大于 corePoolSize,因此 timed 等于 false,下面 timed 的三目运算符会执行 workQueue.take(),workQueue 是前面的 DelayedWorkQueue

下面看 DelayedWorkQueue 的 take 方法

        public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

这里是真正实现延时的地方

首先还是一个死循环,DelayedWorkQueue 是基于数组实现的,先拿出队列最上方的元素,

RunnableScheduledFuture<?> first = queue[0],看它是否等于 null,等于 null 就等待,这是阻塞队列一贯的做法,如果不是null,看它的延时时间是否小于等于 0,小于等于 0 则出队;如果不小于等于 0,说明没到执行时间,available.awaitNanos(delay) 让线程等待需要延时的时长,等待时长结束后,开始新一轮循环,此时延时时间小于等于 0 满足,将任务出队返回。从而达到延时执行的效果

读者可能会有个疑问,DelayedWorkQueue 数组首下标的元素一定是延时时间最小的吗?

下面看 DelayedWorkQueue 的 add 方法实现,就是前面将 RunnableScheduledFuture 对象添加到队列的 add 方法

public boolean add(Runnable e) {
    return offer(e);
}

offer 方法

        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

如果是首次添加会把元素放在数组索引是0的位置,如果不是首次添加会调用 siftUp 方法

        /**
         * Sifts element added at bottom up to its heap-ordered spot.
         * Call only when holding lock.
         */
        private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture<?> e = queue[parent];
                if (key.compareTo(e) >= 0)
                    break;
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }

这里我们假设是第2次添加,k=1,key 是新添加的元素,此时 k 大于 0,>>> 表示无符号右移位运算,得到的 parent 是 0,得到的 e 就是首位的元素,然后用新添加的 key 和 e 进行比较,如果 key 的延时时间比 e 长,结束循环,给数组索引是1的位置赋值 key;如果 key 的延时时间比 e 小的话,把索引是 1 的位置赋值 e,然后让 k = 0,结束循环,再把索引是0的位置赋值 key,这样就实现了索引是 0 的位置的元素总是延时时间最小的。再详细的细节笔者就不分析了,本文重点分析延时是如何实现的

3、总结

ScheduledThreadPoolExecutor 线程池是如何实现延时执行的?和线程池的执行流程无关,延时是通过延时队列实现的,当我们提交一个任务到线程池时,它不是先执行任务,而是先把任务添加到延时队列,然后启动 worker 线程执行,worker 线程以阻塞的方法从延时队列中获取任务,如果任务没到执行时间,worker 线程会等待需要延时的时间,等待延时时间结束后,进入新一轮循环,此时延时时间小于等于0,worker 线程可以从延时队列中获取任务,获取到任务后,走正常线程池的流程,执行任务,从而达到延时的效果

当然还有一些更详细的细节,比如延时队列中有一个任务等待执行,此时添加进来一个延时时间更短的任务,这时应该唤醒出于等待的线程,让它重新获取等待时间,进行等待。否则会导致新添加的更短延时时间的任务实际执行时间比应该执行的时间晚

至此完

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1022874.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java项目之咖啡馆管理系统ssm+jsp

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的咖啡馆管理系统。技术交流和部署相关看文章末尾&#xff01; 开发环境&#xff1a; 后端&#xff1a; 开发语言&#xff1a;Java 框架&am…

Netty2

文章目录 Netty2Netty入站与出站机制Netty的handler链的调用机制 Netty2 Netty入站与出站机制 基本说明&#xff1a; 1&#xff09;netty的组件设计&#xff1a;Netty的主要组件有Channel&#xff0c;EventLoop&#xff0c;ChannelFuture&#xff0c;ChannelHandler&#xff…

[golang gui]fyne框架代码示例

1、下载GO Go语言中文网 golang安装包 - 阿里镜像站(镜像站使用方法&#xff1a;查找最新非rc版本的golang安装包) golang安装包 - 中科大镜像站 go二进制文件下载 - 南京大学开源镜像站 Go语言官网(Google中国) Go语言官网(Go团队) 截至目前&#xff08;2023年9月17日&#x…

中秋猜灯谜小游戏

中秋猜灯谜小游戏是一个基于HTML制作的互动游戏&#xff0c;旨在增添中秋节的欢乐氛围&#xff0c;通过猜灯谜来娱乐和挑战玩家。 目录 前言简介游戏规则 制作过程HTML 结构CSS 样式JavaScript 交互 功能实现题目和答案的存储游戏逻辑设计 前言 简介 游戏开始时&#xff0c;玩…

SpringBoot Admin监控平台《二》基础报警设置

一、前置准备 首先搭建监控一个平台和连个客户端&#xff0c;搭建流程见SpringBoot Admin监控平台《一》平台搭建及基础介绍 &#xff0c;搭建完毕之后&#xff0c;启动各个项目&#xff0c;监控平台的界面如下所示&#xff1a; 二、邮件报警 2.1.邮箱授权码获取 授权码主要…

5.5V-65V Vin同步降压控制器,具有线路前馈SCT82630DHKR

描述&#xff1a; SCT82630是一款65V电压模式控制同步降压控制器&#xff0c;具有线路前馈。40ns受控高压侧MOSFET的最小导通时间支持高转换比&#xff0c;实现从48V输入到低压轨的直接降压转换&#xff0c;降低了系统复杂性和解决方案成本。如果需要&#xff0c;在低至6V的输…

天猫全店商品采集教程,天猫店铺所有商品接口(详解天猫店铺所有商品数据采集步骤方法和代码示例)

随着电商行业的快速发展&#xff0c;天猫已成为国内的电商平台之一&#xff0c;拥有着海量的商品资源。对于一些需要大量商品数据的商家或者需求方来说&#xff0c;天猫全店采集是非常必要的。本文将详细介绍天猫全店采集的步骤和技巧&#xff0c;帮助大家更好地完成数据采集任…

使用Visual Leak Detector排查内存泄漏问题

目录 1、VLD工具概述 2、下载并安装VLD 2.1、下载VLD 2.2、安装VLD 3、VLD安装目录及文件说明 3.1、安装目录及文件说明 3.2、关于32位和64位版本的详细说明 4、在工程中引入VLD 5、内存泄漏检测实例讲解 5.1、程序启动报错 5.2、启动调试&#xff0c;查看内存泄漏报…

二维码生成器

二维码生成器 二维码生成器_二维码在线制作_应用方案提供商_互联二维码 使用方式 先知道自己电脑端口 然后运行你要生成页面 拼接自己的端口和页面路径

四川天蝶电子商务有限公司正规吗?

近年来&#xff0c;随着短视频平台的兴起&#xff0c;抖音成为了中国最受欢迎的社交媒体之一。许多企业看到了抖音带货的巨大商机&#xff0c;纷纷涌入这个领域。然而&#xff0c;一些不法分子也乘机滋生&#xff0c;伪装成合法的商家&#xff0c;进行各种欺诈行为。所以&#…

这些提高摸鱼效率的自动化测试技巧,提高打工人幸福感~

最近有许多小伙伴都在吐槽打工好难。 每天都是执行许多重复的任务 例如阅读新闻、发邮件、查看天气、打开书签、清理文件夹等等&#xff0c; 使用自动化脚本&#xff0c;就无需手动一次又一次地完成这些任务&#xff0c; 非常方便啊有木有&#xff1f;&#xff01; 今天就…

rv1126-rv1109-瑞芯微的 IPC 程序

关闭瑞芯微的 IPC 程序 例程源码中,第一次下载之后会进入一个类似摄像头demo预览的界面 我想要关掉它,找了很久,终于发现 \rv1126_rv1109\buildroot\board\rockchip\rv1126_rv1109\fs-overlay-sysv\etc\init.d\S98_lunch_init 这个文件注解掉全部 就可以看到注解掉就只有l…

软文推广在医疗行业中的优势有哪些?媒介盒子告诉你

随互联网的快速发展&#xff0c;越来越多的企业开始利用网络宣传&#xff0c;医疗行业也参与其中&#xff0c;相比于传统广告的高成本和不明显的效果&#xff0c;软文推广的效果更明显&#xff0c;对医疗行业的宣传帮助也更大&#xff0c;现在就由媒介盒子告诉大家&#xff0c;…

springboot整合mybatis(详解)

springboot整合mybatis 1. 整体架构展示&#xff1a; 2. pom.xml-需要的依赖&#xff1a; <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency&g…

ICS TRIPLEX T8461 PLC系统备件模块

ICS TRIPLEX T8461 是一款用于 PLC&#xff08;可编程逻辑控制器&#xff09;系统备件的模块&#xff0c;通常用于工业自动化和控制系统中。这种类型的备件模块在多个应用领域都有广泛的用途&#xff0c;包括但不限于以下几个领域&#xff1a; 制造业&#xff1a; T8461 模块可…

FastAdmin开发七牛云上传插件

一看官网一个上传插件60大洋&#xff0c;对我这个穷鬼来说还是太贵了&#xff0c;于是乎自己写一个&#xff0c;后面随时用 直接开干 创建插件 php think addon -a aliupload -c create创建配置 <?phpreturn [[name > region,title > 获取存储区域,type > sel…

Linux内核中断(内核中断实现过程、注册三个按键中断实例、中断底半部实例、工作队列)

一、linux内核中断 1.目的&#xff1a; 用于对设备不用进行轮询访问&#xff0c;而是当设备事件发生后主动通知内核&#xff0c;内核再去访问设备。 2.linux内核中断实现过程框图 3.中断子系统API 1.解析中断相关的设备树节点 struct device_node *of_find_compatible_node…

【笔试强训选择题】Day44.习题(错题)解析

作者简介&#xff1a;大家好&#xff0c;我是未央&#xff1b; 博客首页&#xff1a;未央.303 系列专栏&#xff1a;笔试强训选择题 每日一句&#xff1a;人的一生&#xff0c;可以有所作为的时机只有一次&#xff0c;那就是现在&#xff01;&#xff01;&#xff01;&#xff…

CSS 滚动驱动动画 scroll-timeline ( scroll-timeline-name ❤️ scroll-timeline-axis )

scroll-timelinescroll-timeline-name❤️scroll-timeline-axis 解决问题语法 animation-timeline-nameanimation-timeline-axis scroll-timeline ( scroll-timeline-name ❤️ scroll-timeline-axis ) 在 scroll() 的最后我们遇到了因为定位问题导致滚动效果失效的情况, 当…

选择渲染农场的几个标准

随着电影、电视剧等影视作品的制作越来越依赖于计算机特效&#xff0c;渲染农场的使用也变得越来越普遍。渲染农场是一种利用大量计算机图形处理器&#xff08;GPU&#xff09;来加速渲染过程的服务。在选择渲染农场时&#xff0c;有几个标准可以帮助您确定哪个农场是适合您的项…