Java计划线程池ScheduledThreadPoolExecutor运行流程和源码分析

news2024/12/25 22:21:22

1. 计划线程池ScheduledThreadPoolExecutor简介

ScheduledThreadPoolExecutor继承自线程池ThreadPoolExecutor,并在其基础上增加了按时间调度执行任务的功能,如果对ThreadPoolExecutor还不是很熟悉,可以阅读一下这篇文章:
Java线程池ThreadPoolExecutor运行机制和源码解析。

首先来翻译一下这个类的前面的单词scheduled: 预先安排的,按时刻表的;定期的。那么ScheduledThreadPoolExecutor可翻译为计划线程池、定时线程池。

大多数时候,线程池已经使用了池化技术很好的满足了线程的重复使用需要,为什么还要另外搞一个ScheduledThreadPoolExecutor呢?

普通线程池无法满足以下痛点:

  1. 无法设置任务在指定时间点执行
  2. 无法周期性的执行任务。想要完成这点需要在自己实现,在任务里加一个while循环。

可能有人会问java里已经有了Timer类,能实现定时运行任务。但Timer类有个缺点就是内部只有一个线程,如果有多个任务,在同一时间只有一个任务在运行,无法做到同时运行,虽然可以使用多个Timer解决,但比较难以进行统一的管理。

2. 计划线程池ScheduledThreadPoolExecutor运行机制

ScheduledThreadPoolExecutor在调用提交任务的方法后(比如scheduleAtFixedRatescheduleWithFixedDelay)会将任务放入一个按执行时间升序排行的缓冲队列,工作线程会从任务缓冲队列取任务然后将它执行,在任务完成时会计算下次执行时间然后放回任务缓冲队列(这点是和ThreadPoolExecutor的区别),从而实现周期性执行。

在这里插入图片描述

3. ScheduledThreadPoolExecutor类图

ScheduledThreadPoolExecutor继承自线程池ThreadPoolExecutor,是从ThreadPoolExecutor扩展而来。

在这里插入图片描述

4. ScheduledThreadPoolExecutor构造函数

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}                                     
参数类型名称备注
corePoolSizeint核心线程数除非设置了allowCoreThreadTimeOut,不然即使空闲也不会回收
threadFactoryThreadFactory线程工厂可设置线程属性
handlerRejectedExecutionHandler拒绝策略当线程容量溢出时执行的策略

ScheduledThreadPoolExecutor的构造函数是调用父类ThreadPoolExecutor的构造函数,区别就是ThreadPoolExecutor可以自行根据功能需要使用不同类型的缓冲队列,而ScheduledThreadPoolExecutor的缓冲队列只能是DelayedWorkQueue

5. 提交计划任务

我们知道,ThreadPoolExecutor提交任务的唯一入口就是execute()方法,任务在execute()方法里进行分配和调度。与之不同的是ScheduledThreadPoolExecutor根据对任务处理需求不同,提交任务有几个方法。


/**
 * 安排一个任务在延迟一段时间后执行
 *
 * @param command 任务
 * @param delay 要延迟多少时间单位执行(从当前时间开始计时)
 * @param unit delay参数的时间单位
 */
public ScheduledFuture<?> schedule(Runnable command,
                            long delay, 
                            TimeUnit unit);

/**
 * 和上面方法的区别是入参任务一个是Runnable,一个Callable
 */                            
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay, 
                                       TimeUnit unit);


/**
 * 创建并执行一个周期性任务,该任务在指定的延迟时间后首次执行,
 * 然后以给定的周期重复执行;即任务将在 initialDelay 后开始执行,
 * 然后在 initialDelay+period,再然后是 initialDelay + 2 * period,以此类推。
 * 如果任务的任何执行遇到异常,则将禁止后续的执行。
 * 否则,任务只能通过取消或执行器的终止来终止。
 * 如果任务的任何执行时间超过其周期,则后续执行可能会延迟开始,但不会并发执行。
 *
 * @param command 要执行的任务
 * @param initialDelay 首次执行前的延迟时间
 * @param period 多次执行的时间周期间隔
 * @param unit 初始延迟和周期的时间单位
 */
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit);
                 
/**
 * 创建并执行一个周期性任务,该任务在指定的延迟时间后首次执行,
 * 然后在本次执行的终止与下一次执行的开始之间以给定的延迟重复执行。
 * 如果任务的任何执行遇到异常,则将禁止后续的执行。
 * 否则,任务只能通过取消或执行器的终止来终止。
 *
 * @param command 要执行的任务
 * @param initialDelay 首次执行的延迟时间
 * @param delay 一个执行的终止与下一个执行的开始之间的延迟
 * @param unit initialDelay 和 delay 参数的时间单位
 */                                                  
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit);
                                                 

提交计划的方法汇总至表格如下:

方法参数参数备注效果
scheduleRunnable command要执行的任务安排一个任务在延迟一段时间后执行
long delay要延迟多少时间单位执行(从当前时间开始计时)
TimeUnit unitdelay参数的时间单位
scheduleCallable<V> callable要执行的callable任务和上面方法的区别是入参任务一个是Runnable,一个Callable
long delay要延迟多少时间单位执行(从当前时间开始计时)
TimeUnit unitdelay参数的时间单位
scheduleAtFixedRateRunnable command要执行的任务创建并执行一个周期性任务,该任务在指定的延迟时间后首次执行;然后以给定的周期重复执行(有可能会连续不间隔的执行)
long initialDelay首次执行前的延迟时间
long period多次执行的时间周期间隔
TimeUnit unit初始延迟和周期的时间单位
scheduleWithFixedDelayRunnable command要执行的任务创建并执行一个周期性任务,该任务在指定的延迟时间后首次执行;然后在本次执行的终止与下一次执行的开始之间 固定以给定的延迟 重复执行。
long initialDelay首次执行前的延迟时间
long period多次执行的时间周期间隔
TimeUnit unit初始延迟和周期的时间单位
scheduleAtFixedRate和scheduleWithFixedDelay的区别
  • scheduleAtFixedRate

以本次执行的开始时间beginTime为基准,下一次的执行的时间为beginTime + period。如果执行任务的时间超过了period,会导致任务完成后无缝再执行一次。

  • scheduleWithFixedDelay

以本次执行的结束时间endTime为基准,下一次的执行的时间为endTime + delay。这个方法无论该次执行使用多少时间,都会在任务执行完成后会等待delay的时间再执行。

6. 阻塞队列DelayedWorkQueue

DelayedWorkQueue提供了一种方便的方式来管理延迟执行的任务,并且能够在高并发情况下安全地进行操作,是实现定时任务调度功能的重要组成部分,以下是DelayedWorkQueue的一些特性:

  1. 延迟执行任务:DelayedWorkQueue用于存储需要延迟执行的任务,这些任务会在指定的延迟时间之后被执行。这使得程序可以安排任务在将来的某个时间点执行,例如定时任务。
  2. 基于优先级队列:DelayedWorkQueue通常是基于优先级队列(PriorityQueue)实现的,其中任务根据其延迟时间被排序。因此,队列中的任务按照延迟的时间进行排列,最先到期的任务将被最先执行。
  3. 数据结构:DelayedWorkQueue通常是一个有序队列,内部使用了二叉堆数据结构来实现。这样可以快速找到下一个要执行的任务,并且在添加新任务时保持队列的有序性。
  4. 并发性:DelayedWorkQueue是线程安全的,内部有一个可重入锁ReentrantLock,通过锁来保证并发安全,它需要被多个线程同时访问,以便添加新任务或者移除已经过期的任务。这涉及到对队列的操作的同步或者使用线程安全的数据结构。
  5. 使用场景:DelayedWorkQueue适用于需要按照一定延迟顺序执行任务的场景,比如定时任务、周期性任务等。它可以作为ScheduledThreadPoolExecutor的内部组件使用,ScheduledThreadPoolExecutor通过DelayedWorkQueue实现了定时任务的调度功能。

DelayedWorkQueue从数据结构来说是一个用数组实现的二叉堆,排序方式为从小到大,每个父节点都比子节点要小的堆也称为最小堆, 堆顶元素是最小的,因此堆顶存储的是执行时间缀最小的任务。

堆(Heap)是一种特殊的树形数据结构,通常实现为数组,其具有以下性质:

  1. 堆序性质:在堆中,对于任意节点 i,如果 j 是 i 的子节点,那么堆中的节点 i 的值必须满足某种特定的顺序关系(比如最小堆中,父节点的值小于或等于子节点的值;最大堆中,父节点的值大于或等于子节点的值)。

  2. 完全二叉树性质:堆通常被实现为完全二叉树,即除了最后一层外,每一层都被完全填满,且最后一层的节点都尽可能地靠左排列。这样的特性使得堆可以利用数组来表示,而不需要使用额外的指针。

由于DelayedWorkQueue是最小堆结构,下面文章着重讲解的是最小堆

最小堆介绍

堆可以分为两种常见的类型:

最小堆(Min Heap):在最小堆中,父节点的值小于或等于其子节点的值,因此堆顶元素是最小的。

最大堆(Max Heap):在最大堆中,父节点的值大于或等于其子节点的值,因此堆顶元素是最大的。

最小堆通常可以用数组来表示,下面是一个列子

最小堆二叉树

在这里插入图片描述

最小堆数组

在这里插入图片描述

对于最小堆数组,有以下的下标对应关系:

节点效果
当前节点arr[i]
父节点arr[(i-1)/2]
左子节点arr[(2*i) + 1]
右子节点arr[(2*i )+ 2]
DelayedWorkQueue的定义
static class DelayedWorkQueue extends AbstractQueue<Runnable>
    implements BlockingQueue<Runnable> {
    
    private static final int INITIAL_CAPACITY = 16;
    private RunnableScheduledFuture<?>[] queue =
        new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    private final ReentrantLock lock = new ReentrantLock();
    private int size = 0;    
    
    private Thread leader = null;
    private final Condition available = lock.newCondition();
    
}

类图

在这里插入图片描述

DelayedWorkQueue实现了阻塞队列BlockingQueue,通过可重入锁ReentrantLock保证多线程并发安全,它的最小堆通过数组实现,这个数组初始容量为16,每次扩容增加一半。

核心方法:

  • offer(Runnable x) 向队列添加元素
  • take() 从队列取任务
  • poll(long timeout, TimeUnit unit) 从队列取元素,若在指定时间内未能取到则返回null

offer 向队列增加元素

offer方法其实就是向最小堆添加元素的过程,将元素从最小堆最底部开始上浮,直到比它的parent节点的值要大。上浮完毕后如果该元素在堆顶,则该元素为最先要执行的任务,唤醒等待的一个线程。

offer方法的流程图

在这里插入图片描述

offer方法的代码如下:

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
            grow();     //队列已满的时候进行扩容
        size = i + 1;
        if (i == 0) {   //队列为空的时候
            queue[0] = e;
            setIndex(e, 0);
        } else {
            siftUp(i, e); //元素从底部上浮
        }
        if (queue[0] == e) {
            leader = null;
            available.signal();   //唤醒一个等待的线程
        }
    } finally {
        lock.unlock();
    }
    return true;
}

siftUp 堆排序中的上浮操作

在offer的过程中因为堆添加了新的元素,假设它为e,为了维持最小堆的数据结构,需要将元e素调整到它数值的排序位置上,步骤如下:

  1. 首先将元素e放到堆数据的尾部
  2. 将元素e与父节点比较,若小于父节点则与其交换
  3. 重复步骤2直至元素e大于父子点或元素e已到堆顶
    /**
     * 堆排序中的上浮操作
     * 仅当持有锁的时候才会调用.
     */
    private void siftUp(int k, RunnableScheduledFuture<?> key) {
        //在二叉小堆中,把元素从底向上比较排序
        while (k > 0) {
            int parent = (k - 1) >>> 1;  //在二叉堆中元素的父节点位置
            RunnableScheduledFuture<?> e = queue[parent];
            if (key.compareTo(e) >= 0)
                /**
                 * 上浮直到找到比它的parent节点的值要大
                 */
                break;
            queue[k] = e;
            setIndex(e, k);
            k = parent;
        }
        queue[k] = key;
        setIndex(key, k);
    }

take 从队列取元素

take()方法是从堆顶取最小的任务,

  1. 如果队列为空,等待元素入列
  2. 取第一个堆顶最小的元素,计算该元素是否已经到执行时间
  3. 如果已到执行时间,返回元素
  4. 如果未到执行时间,

take()方法流程图:

在这里插入图片描述

源码

/**
 * 取堆顶元素
 */
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
siftDown 堆排序中的向下堆操作

由于堆顶元素由于在take()poll()时已经出列而不再存在,堆的结构已经不完整,需调整堆结构。

  1. 把堆顶作为当前节点向下堆化
  2. 取当前节点的较小子节点child与队列最后元素(最后元素为入参的参数key,调用siftDown()时传入)比较
  3. 如果子节点child小于key元素,子节点child与父节点交换,当前节点变为child,然后回到步骤2直至子节点child大于key元素
  4. key元素放在child节点处
/**
 * 堆排序中的下沉操作
 * 仅当持有锁的时候才会调用。
 */
private void siftDown(int k, RunnableScheduledFuture<?> key) {
    int half = size >>> 1;  //相当于size/2
    while (k < half) {   //half之后都是叶子节点,不需要遍历小堆的叶子
        int child = (k << 1) + 1;
        RunnableScheduledFuture<?> c = queue[child];
        int right = child + 1;
        //如果右节点小于左节点,取右节点作为比较和交换的节点
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right];
        if (key.compareTo(c) <= 0)
            break;
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;
    setIndex(key, k);
}

思考一个问题:为什么offer()使用的锁方法是lock(),而take()方法使用的锁是lockInterruptibly()

首先来理清一下这两者的区别:

  • lock(): 这是最基本的获取锁的方法。如果调用 lock() 时锁已经被其他线程持有,则当前线程会一直阻塞,直到获取到锁为止。如果在阻塞的过程中当前线程被中断(通过 Thread.interrupt() 方法),它将继续等待锁,不会响应中断,直到获取到锁为止。

  • lockInterruptibly:这个方法也是用来获取锁的,但是它允许在等待锁的过程中响应中断。如果调用 lockInterruptibly() 时锁已经被其他线程持有,则当前线程会被阻塞,但是如果在等待过程中被中断,则会抛出 InterruptedException 异常,此时可以根据实际需求进行相应的处理。

使用offer方法时为了保证提交的任务入列,肯定是要一直阻塞直到获得锁。

使用take()方法取数据时若线程处于竞争锁的时候(此时没有在执行任务,是空闲的工作线程),若调用线程池的shutdown()方法,线程池中断空闲线程时也能抛出 InterruptedException;但如果用的是lock()方法,会导致取得锁后一直在available.await(),除了使用kill -9强杀进程外永远无法回收线程。

leader线程用于

7. ScheduledFutureTask

计划线程池会把RunnableCallable的类型的任务包装成ScheduledFutureTask再放进任务队列,这个包装类是实现周期性重复运行任务的关键

ScheduledFutureTask类图

在这里插入图片描述

定义

private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

        private final long sequenceNumber;

        private long time; //设定的执行时间nanoTime

        //0:不重复执行
        //正值: 对应FixedRate
        //负值: 对应FixedDelay
        private final long period; 

        /** The actual task to be re-enqueued by reExecutePeriodic */
        RunnableScheduledFuture<V> outerTask = this;


        ScheduledFutureTask(Runnable r, V result, long ns) {
            super(r, result);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }


        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }
    }
    //省略代码……
}

run方法

要实现周期性重复运行任务的功能只需把任务重复放进等待队列, worker线程会从队列里面取任务来运行。
由以下代码可知计算任务线程池ScheduledThreadPoolExecutor能实现定时重复运行任务的核心就是当次任务运行完毕时把任务重新加入等待队列。

    private class ScheduledFutureTask<V>
                extends FutureTask<V> implements RunnableScheduledFuture<V> {
        //省略代码……
        
        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();  //如果不是定期任务,运行run方法后结束
            else if (ScheduledFutureTask.super.runAndReset()) { //如果是定期任务,运行run方法,重设置状态
                setNextRunTime();
                //把任务重新加入等待队列
                reExecutePeriodic(outerTask);
            }
        }
        
    private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }
    }

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task); //把任务重新加入等待队列
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

compareTo方法

compareTo()方法比较两个任务的私有变量time的大小,用于在队列中进行有序排序。

  • 当前任务执行时间 先于(小于) 要比较的任务的执行时间,返回-1
  • 当前任务执行时间 后于(大于) 要比较的任务的执行时间,返回1

从上面可以得出结论:

  • a.compareTo(b) < 0:a小于b,即a的执行时间在前
  • a.compareTo(b) > 0:a大于b,即a的执行时间在后
    public int compareTo(Delayed other) {
        if (other == this) // compare zero if same object
            return 0;
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;  //当前任务执行时间 先于(小于) 要比较的任务的执行时间
            else if (diff > 0)
                return 1;   //当前任务执行时间 后于大于) 要比较的任务的执行时间
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }

cancel方法

cancel方法中的参数mayInterruptIfRunning表示是否中断正在运行的线程,
在计划线程池ScheduledThreadPoolExecutor内部的使用中,mayInterruptIfRunning都为false,少于中断线程这个操作。

cancel(false)方法里,尝试将新任务的状态转为CANCELLED,转为CANCELLED成功后,如果removeOnCancel为true则把任务从队列中移除,如果不是新任务(状态为NEW)则表示任务已经开始运行,无法取消,在FutureTask类的run()方法里如果状态为非NEW的时候会停止执行任务。

removeOnCancel默认为false,也就是不会从队列出移除已取消的任务,但当已取消的任务在线程中运行时,由于状态为CANCELLED会马上返回,不会继续执行。

cancel方法的流程图如下

在这里插入图片描述

源码:

    public boolean cancel(boolean mayInterruptIfRunning) {
        
        boolean cancelled = super.cancel(mayInterruptIfRunning);
        if (cancelled && removeOnCancel && heapIndex >= 0)
            remove(this);
        return cancelled;
    }
       
    //继承自FutureTask    
    public boolean cancel(boolean mayInterruptIfRunning) {
        //尝试将状态为NEW的任务的转为CANCELLED状态
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            //能执行到这里的都是将任务状态成功转为CANCELLED的,
            //逐个通知等待结果的线程任务已经被取消
            finishCompletion(); 
        }
        return true;
    }

8. 任务在计划线程线执行过程

任务在提交后最终会在工作线程中执行,worker工作线程run()方法的调用栈比基础线程池的简单Runnable任务要嵌套了几层,理清后对整体流程有更好的理解。

worker工作线程启动后的调用栈

在这里插入图片描述

调用栈执行顺序

  1. Thread.run()
  2. ThreadPoolExecutor$Worker#run();
  3. ThreadPoolExecutor$Worker#runWorker()
  4. ScheduledThreadPoolExecutor$ScheduledFutureTask#run()
  5. FutureTask#runAndReset()
  6. FutureTask#call()
  7. Executors$RunnableAdapter#call()
  8. 提交的执行任务run()方法

可以看到工作线程的run()方法被包裹了几层,每个被包裹层增加了特定功能:

调用栈备注
Worker#run()工作线程
ScheduledFutureTask#run()实现 周期性重复 运行
FutureTask#runAndReset()重置FutureTask的结果状态以便不影响后续重复执行
FutureTask#call()实现了Callable
Executors$RunnableAdapter#call()适配器,将Runnable转为Callable
提交的任务run()方法真正提交的任务

任务从提交开始在计划线程线执行过程的时序图如下:

在这里插入图片描述

任务从提交开始在计划线程线执行过程的简要代码如下:

public class ScheduledThreadPoolExecutor {

	//工作线程类,初始化的时候会创建线程,线程的运行的Runnable为this(当前worker实例)
    private final class Worker implements Runnable {
        final Thread thread;
        Runnable firstTask;
        
       Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            //创建线程, 线程的运行的Runnable为this(当前worker实例)
            this.thread = getThreadFactory().newThread(this);
        }
        
        //线程运行的run方法
        public void run() {
            runWorker(this);  
        }
    }

    
    //1. 提交任务,由计划线程池进行任务调度
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        //省略代码……
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);  //调用2
        return t;
    }

    
    //2. 把任务放入队列
    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (//省略代码……)
            //省略代码……
            else
                ensurePrestart();  //调用步骤3确保线程已预热
        }
    }

    //3. 预热,确保工作线程已启动  
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

    
    //4. 增加并启动worker工作线程
    private boolean addWorker(Runnable firstTask, boolean core) {
        Worker w = null;
        try {
            //省略代码……
            w = new Worker(firstTask); //2.1 新建worker实例
            final Thread t = w.thread;
            if (t != null) {
                try {
                    //省略代码……
                    workers.add(w);   //2.2 放入集合用于管理
                }
                if (workerAdded) {
              		//2.3 启动worker线程,
              		//    线程启动后运行worker的run()方法,
              		//    worker的run()会运行runWorker(this)方法,
              		//    所以线程启动后实际运行的是runWorker(this)方法;
                    t.start();        
                }
            }
            //省略代码……
        }
        return workerStarted;
    }
    
    //5. 线程运行循环体
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        //省略代码……
        try {
            //核心循环体: 执行第一个任务,完成后循环地从队列取任务执行
            while (task != null || (task = getTask()) != null) {//5.1 通过getTask()取任务
                //省略代码……
                try {
                    try {
                        task.run();  //5.2 运行任务,任务的类型为ScheduledFutureTask
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    }
                    //省略代码……
                }
                //省略代码……
            }
        }
    }
    
    private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {
       
       //6. 运行任务,计算任务下个周期运行的时间缀,任务重新放回队列
        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) { //6.1 runAndReset()
                setNextRunTime();               //6.2 计算任务下个周期运行的时间缀
                reExecutePeriodic(outerTask);   //6.3 任务重新放回队列
            }
        }

        //6.1 该方法继承自父类FutureTask, 重置状态的是任务是否已完成
        protected boolean runAndReset() {
            //省略代码……
            boolean ran = false;
            int s = state;
            try {
                Callable<V> c = callable;
                if (c != null && s == NEW) {
                    try {
                        c.call(); // don't set result
                        ran = true;
                    } catch (Throwable ex) {
                        setException(ex);
                    }
                }
            } finally {
                //省略代码……
            }
            return ran && s == NEW;
        }
    
    }
 
    //6.2 计算下个周期运行的时间缀
    private void setNextRunTime() {
        long p = period;
        if (p > 0)
            time += p;
        else
            time = triggerTime(-p);
    }
 
    //6.3 任务重新放回队列, 线程池的线程会从队列取任执行(步骤5.1)
    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            //省略代码……
        }
    }
    
}

9. 计划线程池ScheduledThreadPoolExecutor的关闭

计划线程池的关闭继承自线程池ThreadPoolExecutor的shutdown()shutdownNow(),保留了这两个方法的全部功能。(如果对ThreadPoolExecutor还不是很熟悉,可以阅读一下这篇文章:
Java线程池ThreadPoolExecutor运行机制和源码解析。)

另外,ThreadPoolExecutor给计划线程池ScheduledThreadPoolExecutor预留的调用钩子onShutdown(),用于根据executeExistingDelayedTasksAfterShutdowncontinueExistingPeriodicTasksAfterShutdown这两个策略判断是否继续运行延迟和周期性任务。

和线程池不同的是计划线程池在onShutdown()根据条件对延迟任务和周期性任务进行取消操作。

  • executeExistingDelayedTasksAfterShutdown:是否在shutdown后继续运行延迟任务
  • continueExistingPeriodicTasksAfterShutdown:是否在shutdown后继续运行周期性任务

onShutdown()源码

@Override void onShutdown() {
        BlockingQueue<Runnable> q = super.getQueue();
        boolean keepDelayed =
            getExecuteExistingDelayedTasksAfterShutdownPolicy();
        boolean keepPeriodic =
            getContinueExistingPeriodicTasksAfterShutdownPolicy();
        
        if (!keepDelayed && !keepPeriodic) {
            //如果保留延迟任务和保留周期性任务都为false, 清空队列
            for (Object e : q.toArray())
                if (e instanceof RunnableScheduledFuture<?>)
                    ((RunnableScheduledFuture<?>) e).cancel(false);
            q.clear();
        }
        else {
            // Traverse snapshot to avoid iterator exceptions
            for (Object e : q.toArray()) {
                if (e instanceof RunnableScheduledFuture) {
                    RunnableScheduledFuture<?> t =
                        (RunnableScheduledFuture<?>)e;
                    if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                        //满足以下条件的一个则把任务从队列移除并取消任务
                        //1.是周期性任务但设置了不保留周期性任务
                        //2.一次性延迟任务但设置了不保留延迟任务
                        //3.任务已取消
                        t.isCancelled()) { // also remove if already cancelled
                        if (q.remove(t))
                            t.cancel(false);
                    }
                }
            }
        }
        tryTerminate();
    }
}

结语

本文对java计划线程池ScheduledThreadPoolExecutor的分析就到这里。

如有不正,欢迎指出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1472850.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

成都东部新区文化旅游体育局莅临国际数字影像产业园参观入驻企业,共促政产交流“零距离”

2月23日&#xff0c;成都东部新区文化旅游体育局投服处处长田东一行莅临国际数字影像产业园考察交流&#xff0c;树莓科技&#xff08;成都&#xff09;集团有限公司副总裁吴晓平、行政运营经理郭宇风、国际数字影像产业园项目负责人万里全程接待。 吴晓平副总带领田东处长一行…

开发知识点-.netC#图形用户界面开发之WPF

C#图形用户界面开发 框架简介WinForms(Windows Forms):WPF(Windows Presentation Foundation):UWP(Universal Windows Platform):MAUI(Multi-platform App UI):选择控件参考文章随笔分类 - WPF入门基础教程系列基于C#语言的GUI开发,主要介绍WPF框架

CAD怎么绘制建筑平面图纸?

CAD沪指图纸很简单&#xff0c;想要绘制一个简单的建筑图纸&#xff0c;该怎么绘制建筑平面图呢&#xff1f;下面我们就来看看详细的教程。 1、首先&#xff0c;运用绘图功能中的直线按照比例尺寸绘制出轴网。轴网绘制我们一般将轴网的颜色选择为红色&#xff0c;轴网的线型选择…

jdk21本地执行flink出现不兼容问题

环境说明&#xff1a;换电脑尝尝鲜&#xff0c;jdk&#xff0c;flink都是最新的&#xff0c;千辛万苦把之前的项目编译通过&#xff0c;跑一下之前的flink项目发现启动失败&#xff0c;啥都不说了上异常 Exception in thread "main" java.lang.IllegalAccessError: …

一次奇怪的事故:机器网络连接打满,导致服务不可用

业务背景 发生事故的业务系统是一个toB业务&#xff0c;业务是服务很多中小企业进行某项公共信息指标查询。系统特点:业务处理相对简单&#xff0c;但是流量大&#xff0c;且对请求响应要求较高&#xff1a; 业务请求峰值qps达50w&#xff0c;平时流量达20w左右。 请求响应时…

18 SpringMVC实战

18 SpringMVC实战 1. 课程介绍2. Spring与Spring MVC环境配置 1. 课程介绍 2. Spring与Spring MVC环境配置

Nginx之rewrite重写功能

一、rewrite概述 1、rewrite功能 访问重写 rewrite 是 Nginx HTTP 请求处理过程中的一个重要功能&#xff0c;它是以模块的形式存在于代码中的&#xff0c;其功能是对用户请求的 URI 进行 PCRE 正则重写&#xff0c;然后返回 30 重定向跳转或按条件执行相关配置。 Nginx服务…

JSON简介以及如何在Python中使用JSON

什么是JSON&#xff1f; JSON是"JavaScript Object Notation"的简称&#xff0c;是一种数据交换格式 JSON格式 假设我们有一个对象&#xff0c;这个对象有两个属性&#xff1a;“name”跟“age”。 在JSON中是这样表达的&#xff1a; { "name":"男孩…

51.仿简道云公式函数实战-文本函数-JOIN

1. JOIN函数 JOIN 函数可通过连接符将数组的值连成文本。 2. 函数用法 JOIN(数组,"连接符") 3. 函数示例 如需将复选框中勾选的选项通过”-“组合在一起&#xff0c;则可设置公式为JOIN(复选框组,"-") 4. 代码实战 首先我们在function包下创建text包…

用户态协议栈01-udp收发

文章目录 用户态协议栈01-udp收发前期准备DPDK初始化开始搓udp协议栈配置dpdk定义udp相关变量接受udp数据&&读取包内容接口层拼接udp数据包完整代码 如何启动实验如何编译使用效果 用户态协议栈01-udp收发 实现用户态协议栈最最简单的就是实现Udp的收发&#xff0c;下…

高性能API云原生网关 APISIX安装与配置指南

Apache APISIX是Apache软件基金会下的顶级项目&#xff0c;由API7.ai开发并捐赠。它是一个高性能的云原生API网关&#xff0c;具有动态、实时等特点。 APISIX网关可作为所有业务的流量入口&#xff0c;为用户提供了丰富的功能&#xff0c;包括动态路由、动态上游、动态证书、A…

将SU模型导入ARCGIS,并获取高度信息,多面体转SHP文件(ARCMAP)

问题:将Sketchup中导出的su模型,导入arcgis并得到面shp文件,进而获取各建筑的高度、面积等信息。 思路: (1)导入arcgis得到多面体 (2)转为面shp文件 (3)计算高度/面积等 1、【3D Analyst工具】【转换】【由文件转出】【导入3D文件】(在此步骤之间,建议先建立一个…

flink学习之旅(二)

目前flink中的资源管理主要是使用的hadoop圈里的yarn&#xff0c;故此需要先搭建hadoop环境并启动yarn和hdfs&#xff0c;由于看到的教程都是集群版&#xff0c;现实是只有1台机器&#xff0c;故此都是使用这台机器安装。 1.下载对应hadoop安装包 https://dlcdn.apache.org/h…

linux centos7.9改dns和ip

vi /etc/sysconfig/network-scripts/ifcfg-ens32 &#xff1a;wq后 重启网络服务 systemctl restart network —————————————————————————— 篇外话题 软件下载 xshell可以从腾讯软件中心下载

dpdk协议栈之udp架构优化

dpdk优势 传统网络架构与 DPDK&#xff08;Data Plane Development Kit&#xff09;网络架构之间存在许多区别&#xff0c;而 DPDK 的优势主要体现在以下几个方面&#xff1a; 数据包处理性能&#xff1a;传统网络架构中&#xff0c;网络数据包的处理通常由操作系统的网络协议…

探索便捷办公新选择:ONLYOFFICE 桌面编辑器

目录 引言 1. ONLYOFFICE 桌面编辑器简介 2. 功能特点 2.1 多格式支持 2.2 实时协作编辑 2.3 兼容性与格式保持 2.4 丰富的编辑功能 3. 使用方法 3.1 下载安装 3.2 打开文档 3.3 编辑文档 3.4 保存和共享 4. 注意事项 4.1 版本更新 4.2 网络连接 4.3 安全性 5.…

【电子书】移动开发

整理了一些互联网电子书&#xff0c;推荐给大家 移动开发 Android App开发入门与项目实战.epubAndroid Studio应用开发实战详解.epubAndroid Studio开发实战&#xff1a;从零基础到App上线.epubAndroid 游戏开发大全&#xff08;第二版&#xff09;.epubAndroid 源码设计模式…

k8s分布式图床(k8s,metricsapi,vue3+ts)

image-manage 文档 warning 注意⚠️ 1. 你需要至少一个mysql数据库 2. 你需要至少一个redis数据库 3. 你需要一个版本至少 kubernetes 1.29的集群(集群可选) ::: 单机部署(docker) # clone the project docker run -p 8080:8080 \-v 你的数据目录:/app\-e CONFIG_ISCLUST…

C语言函数递归

一、什么是递归 递归实际上就是函数自己调用自己。 递归在书写的时候&#xff0c;有2个必要条件&#xff1a; • 递归存在限制条件&#xff0c;当满足这个限制条件的时候&#xff0c;递归便不再继续。 • 每次递归调用之后越来越接近这个限制条件。 在下面的例子中&#xff0…

WPF 附加属性+控件模板,完成自定义控件。建议观看HandyControl源码

文章目录 相关连接前言需要实现的效果附加属性添加附加属性&#xff0c;以Test修改FontSize为例依赖属性使用触发器使用直接操控 结论 控件模板&#xff0c;在HandyControl的基础上面进行修改参考HandyControl的源码控件模板原型控件模板 结论 相关连接 WPF控件模板(6) WPF 附加…