并发编程之Executor线程池原理与源码解读

news2024/12/24 22:14:46

线程

线程是调度CPU资源的最小单位,线程模型分为KLT模型与ULT模型,JVM使用的KLT模

型,Java线程与OS线程保持1:1的映射关系,也就是说有一个java线程也会在操作系统里有一个对应的线程。Java线程有多种生命状态

NEW,新建

RUNNABLE,运行

BLOCKED,阻塞

WAITING,等待

TIMED_WAITING,超时等待

TERMINATED,终结

状态切换如下图所示:

 

协程

协程 (纤程,用户级线程),目的是为了追求最大力度的发挥硬件性能和提升软件的速度,协程基本原理是:在某个点挂起当前的任务,并且保存栈信息,去执行另一个任

务;等完成或达到某个条件时,再还原原来的栈信息并继续执行(整个过程线程不需要    上下文切换)。

Java原生不支持协程,在纯java代码里需要使用协程的话需要引入第三方包,如:quasar

线程池

“线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配、调优和监控

线程池介绍

在web开发中,服务器需要接受并处理请求,所以会为一个请求来分配一个线程来进行处理。如果每次请求都新创建一个线程的话实现起来非常简便,但是存在一个问题:

如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。

那么有没有一种办法使执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?

这就是线程池的目的了。线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。

什么时候使用线程池?

单个任务处理时间比较短需要处理的任务数量很大

线程池优势

1、重用存在的线程,减少线程创建,消亡的开销,提高性能

2、提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。

3、提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程的实现方式

  1. // 实现Runnable接口的类将被Thread执行,表示一个基本的任务
  2. public interface Runnable {
  3. // run方法就是它所有的内容,就是实际执行的任务
  4. public abstract void run();

5 }

  1. //Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容
  2. public interface Callable<V> {
  3. // 相对于run方法的带有返回值的call方法
  4. V call() throws Exception;

10 }

Runnable,Thread,Callable

Executor框架

Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。

下图为它的继承与实现

 

从图中可以看出Executor下有一个重要子接口ExecutorService,其中定义了线程池的具体行为

1,execute(Runnable command):履行Ruannable类型的任务,

2,submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future 对象

3,shutdown():在完成已提交的任务后封闭办事,不再接管新任务,

4,shutdownNow():停止所有正在履行的任务并封闭办事。

5,isTerminated():测试是否所有任务都履行完毕了。

6,isShutdown():测试是否该ExecutorService已被关闭。

线程池重点属性

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE ­ 3; private static final int CAPACITY           = (1 << COUNT_BITS) ­ 1;

ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两

部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存

workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。

ctl相关方法

private static int runStateOf(int c)     { return c & ~CAPACITY; } private static int workerCountOf(int c)  { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }

runStateOf:获取运行状态;

workerCountOf:获取活动线程数;

ctlOf:获取运行状态和活动线程数的值。

线程池存在5种状态

  1. RUNNING = ‐1 << COUNT_BITS; //3位为111
  2. SHUTDOWN = 0 << COUNT_BITS; //3位为000
  3. STOP = 1 << COUNT_BITS; //3位为001
  4. TIDYING = 2 << COUNT_BITS; //3位为010
  5. TERMINATED = 3 << COUNT_BITS; //3位为011

1、RUNNING

(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。

(02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!

2、 SHUTDOWN

  1. 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
  2. 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

3、STOP

  1. 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
  2. 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

4、TIDYING

  1. 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING 状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在

ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理; 可以通过重载terminated()函数来实现。

  1. 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

5、 TERMINATED

  1. 状态说明:线程池彻底终止,就变成TERMINATED状态。
  2. 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -

> TERMINATED。

进入TERMINATED的条件如下:

线程池不是RUNNING状态;

线程池状态不是TIDYING状态或TERMINATED状态;

如果线程池状态是SHUTDOWN并且workerQueue为空;

workerCount为0;

设置TIDYING状态成功。

 

线程池的具体实现

ThreadPoolExecutor 默认线程池

ScheduledThreadPoolExecutor 定时线程池

ThreadPoolExecutor

    1. ThreadPoolExecutor(int corePoolSize,
    1. int maximumPoolSize,
    2. long keepAliveTime,
    3. TimeUnit unit,
    4. BlockingQueue<Runnable> workQueue,
    5. ThreadFactory threadFactory,
    6. RejectedExecutionHandler handler)

    线程池的创建

任务提交

1public void execute() //提交任务无返回值

2public Future<?> submit() //任务执行完成后有返回值

参数解释

corePoolSize

线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到 阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

maximumPoolSize

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;

keepAliveTime

线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时

候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;

unit

keepAliveTime的单位;

workQueue

用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:

1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;

2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;

3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;

4、priorityBlockingQuene:具有优先级的无界阻塞队列;

threadFactory

它是ThreadFactory类型的变量,用来创建新线程。默认使用

Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。

handler

线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:

1、AbortPolicy:直接抛出异常,默认策略;

2、CallerRunsPolicy:用调用者所在的线程来执行任务;

3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

4、DiscardPolicy:直接丢弃任务;

上面的4种策略都是ThreadPoolExecutor的内部类。

当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

线程池监控

  1. public long getTaskCount() //线程池已执行与未执行的任务总数
  2. public long getCompletedTaskCount() //已完成的任务数
  3. public int getPoolSize() //线程池当前的线程数
  4. public int getActiveCount() //线程池中正在执行任务的线程数量
线程池原理

源码分析

execute方法

  1. public void execute(Runnable command) {
  2. if (command == null

3   throw new NullPointerException();

4 /*

5   * clt记录着runStateworkerCount

6   */

7   int c = ctl.get();

8 /*

  1. * workerCountOf方法取出低29位的值,表示当前活动的线程数;
  2. * 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;
  3. * 并把任务添加到该线程中。

12   */

13   if (workerCountOf(c) < corePoolSize) {

14 /*

  1. * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize

来判断还是maximumPoolSize来判断;

  1. * 如果为true,根据corePoolSize来判断;
  2. * 如果为false,则根据maximumPoolSize来判断

18   */

  1. if (addWorker(command, true))
  2. return;

21 /*

22   * 如果添加失败,则重新获取ctl

23   */

24   c = ctl.get();

25   }

26 /*

27   * 如果当前线程池是运行状态并且任务添加到队列成功

28   */

  1. if (isRunning(c) && workQueue.offer(command)) {
  2. // 重新获取ctl
  3. int recheck = ctl.get();
  4. // 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把comman 添加到workQueue中了,
  5. // 这时需要移除该command
  6. // 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
  7. if (! isRunning(recheck) && remove(command))
  8. reject(command);

37 /*

  1. * 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
  2. * 这里传入的参数表示:
  3. * 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
  4. * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumP

olSize,添加线程时根据maximumPoolSize来判断;

  1. * 如果判断workerCount大于0,则直接返回,在workQueue中新增的comman 会在将来的某个时刻被执行。

43   */

  1. else if (workerCountOf(recheck) == 0)
  2. addWorker(null, false);

46   }

47 /*

  1. * 如果执行到这里,有两种情况:
  2. * 1. 线程池已经不是RUNNING状态;
  3. * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQ

ueue已满。

  1. * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize
  2. * 如果失败则拒绝该任务

53   */

  1. else if (!addWorker(command, false))
  2. reject(command);

56 }

简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:

    1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
    2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
    3. 如    果    workerCount     >=     corePoolSize     &&     workerCount     <

maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;

    1. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

这里要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。

addWorker方法

addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于

maximumPoolSize,代码如下:

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:

3   for (;;) {

  1. int c = ctl.get();
  2. // 获取运行状态
  3. int rs = runStateOf(c);

7   /*

  1. * 这个if判断
  2. * 如果rs >= SHUTDOWN,则表示此时不再接收新任务;
  3. * 接着判断以下3个条件,只要有1个不满足,则返回false
  4. * 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
  5. * 2. firsTask为空
  6. * 3. 阻塞队列不为空

14   *

  1. * 首先考虑rs == SHUTDOWN的情况
  2. * 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回

alse

  1. * 然后,如果firstTask为空,并且workQueue也为空,则返回false
  2. * 因为队列中已经没有任务了,不需要再添加线程了

19   */

  1. // Check if queue empty only if necessary.
  2. if (rs >= SHUTDOWN &&
  3. ! (rs == SHUTDOWN &&
  4. firstTask == null &&
  5. ! workQueue.isEmpty()))
  6. return false;

26   for (;;) {

  1. // 获取线程数
  2. int wc = workerCountOf(c);
  3. // 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是291), 返回false
  4. // 这里的coreaddWorker方法的第二个参数,如果为true表示根据corePo

olSize来比较,

  1. // 如果为false则根据maximumPoolSize来比较。

32   //

  1. if (wc >= CAPACITY ||
  2. wc >= (core ? corePoolSize : maximumPoolSize))
  3. return false;
  1. // 尝试增加workerCount,如果成功,则跳出第一个for循环
  2. if (compareAndIncrementWorkerCount(c))
  3. break retry;
  4. // 如果增加workerCount失败,则重新获取ctl的值
  5. c = ctl.get(); // Re‐read ctl
  6. // 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
  7. if (runStateOf(c) != rs)
  8. continue retry;
  9. // else CAS failed due to workerCount change; retry inner loop

45   }

46   }

  1. boolean workerStarted = false;
  2. boolean workerAdded = false;
  3. Worker w = null;
  4. try {
  5. // 根据firstTask来创建Worker对象
  6. w = new Worker(firstTask);
  7. // 每一个Worker对象都会创建一个线程
  8. final Thread t = w.thread;
  9. if (t != null) {
  10. final ReentrantLock mainLock = this.mainLock;
  11. mainLock.lock();
  12. try {
  13. int rs = runStateOf(ctl.get());
  14. // rs < SHUTDOWN表示是RUNNING状态;
  15. // 如果rsRUNNING状态或者rsSHUTDOWN状态并且firstTasknull,向线程池中添加线程。
  16. // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任
  17. if (rs < SHUTDOWN ||
  18. (rs == SHUTDOWN && firstTask == null)) {
  19. if (t.isAlive()) // precheck that t is startable
  20. throw new IllegalThreadStateException();
  21. // workers是一个HashSet
  22. workers.add(w);

  1. int s = workers.size();
  2. // largestPoolSize记录着线程池中出现过的最大线程数量
  3. if (s > largestPoolSize)
  4. largestPoolSize = s;
  5. workerAdded = true;

74   }

  1. } finally {
  2. mainLock.unlock();

77   }

  1. if (workerAdded) {
  2. // 启动线程
  3. t.start();
  4. workerStarted = true;

82   }

83   }

  1. } finally {
  2. if (! workerStarted)
  3. addWorkerFailed(w);

87   }

88   return workerStarted;

89 }

Worker类

线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组

Worker对象,请参见JDK源码。

Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。

在   调   用   构   造   方   法   时   ,   需   要   把   任   务   传   入   ,   这   里   通   过

getThreadFactory().newThread(this); 来新建一个线程, newThread方法传入的参数是

this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。

Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:

    1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
    2. 如果正在执行任务,则不应该中断线程;
    3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务, 这时可以对该线程进行中断;
    4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
    5. 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。

所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。

此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢? 是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,看一下tryAquire方法:

  1. protected boolean tryAcquire(int unused) {
  2. //cas修改state,不可重入
  3. if (compareAndSetState(0, 1)) {
  4. setExclusiveOwnerThread(Thread.currentThread());
  5. return true;

6   }

7   return false;

8 }

tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);将state设置为-1是

为了禁止在执行任务前对线程进行中断。

正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为

0。

runWorker方法

在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的代码如下:

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. // 获取第一个任务

  1. Runnable task = w.firstTask;
  2. w.firstTask = null;
  3. // 允许中断
  4. w.unlock(); // allow interrupts
  5. // 是否因为异常退出循环
  6. boolean completedAbruptly = true;
  7. try {
  8. // 如果task为空,则通过getTask来获取任务
  9. while (task != null || (task = getTask()) != null) {
  10. w.lock();
  11. if ((runStateAtLeast(ctl.get(), STOP) ||
  12. (Thread.interrupted() &&
  13. runStateAtLeast(ctl.get(), STOP))) &&
  14. !wt.isInterrupted())
  15. wt.interrupt();
  16. try {
  17. beforeExecute(wt, task);
  18. Throwable thrown = null;
  19. try {
  20. task.run();
  21. } catch (RuntimeException x) {
  22. thrown = x; throw x;
  23. } catch (Error x) {
  24. thrown = x; throw x;
  25. } catch (Throwable x) {
  26. thrown = x; throw new Error(x);
  27. } finally {
  28. afterExecute(task, thrown);

32   }

  1. } finally {
  2. task = null;
  3. w.completedTasks++;
  4. w.unlock();

37   }

38   }

  1. completedAbruptly = false;
  2. } finally {
  3. processWorkerExit(w, completedAbruptly);

42   }

43 }

这里说明一下第一个if判断,目的是:

如果线程池正在停止,那么要保证当前线程是中断状态; 如果不是的话,则要保证当前线程不是中断状态;

这里要考虑在执行该if语句期间可能也执行了shutdownNow方法,shutdownNow方法会把状态设置为STOP,回顾一下STOP状态:

不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于

RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态。

STOP状态要中断线程池中的所有线程,而这里使用Thread.interrupted()来判断是否中断是 为 了 确 保 在 RUNNING 或 者 SHUTDOWN 状 态 时 线 程 是 非 中 断 状 态 的 , 因 为

Thread.interrupted()方法会复位中断的状态。总结一下runWorker方法的执行过程:

    1. while循环不断地通过getTask()方法获取任务;
    2. getTask()方法从阻塞队列中取任务;
    3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
    4. 调用task.run()执行任务;
    5. 如果task为null则跳出循环,执行processWorkerExit()方法;
    6. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。

completedAbruptly 变 量 来 表 示 在 执 行 任 务 过 程 中 是 否 出 现 了 异 常 , 在

processWorkerExit方法中会对该变量的值进行判断。

getTask方法

1 private Runnable getTask() {


getTask方法用来从阻塞队列中取任务,代码如下:

  1. // timeOut变量的值表示上次从阻塞队列中取任务时是否超时
  2. boolean timedOut = false; // Did the last poll() time out?

4   for (;;) {

  1. int c = ctl.get();
  2. int rs = runStateOf(c);
  3. // Check if queue empty only if necessary.

8   /*

  1. * 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
  2. * 1. rs >= STOP,线程池是否正在stop
  3. * 2. 阻塞队列是否为空。
  4. * 如果以上条件满足,则将workerCount1并返回null
  5. * 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。

14   */

  1. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  2. decrementWorkerCount();
  3. return null;

18   }

  1. int wc = workerCountOf(c);
  2. // Are workers subject to culling?
  3. // timed变量用于判断是否需要进行超时控制。
  4. // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
  5. // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
  6. // 对于超过核心线程数量的这些线程,需要进行超时控制
  7. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

26

27   /*

  1. * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了se tMaximumPoolSize方法;
  2. * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
  3. * 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将w

orkerCount1

  1. * 如果减1失败,则返回重试。
  2. * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。

33   */

  1. if ((wc > maximumPoolSize || (timed && timedOut))
  2. && (wc > 1 || workQueue.isEmpty())) {
  3. if (compareAndDecrementWorkerCount(c))
  4. return null;
  5. continue;

39   }

40   try {

41   /*

  1. * 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null
  2. * 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。

44   *

45   */

  1. Runnable r = timed ?
  2. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  3. workQueue.take();
  4. if (r != null)
  5. return r;
  6. // 如果 r == null,说明已经超时,timedOut设置为true
  7. timedOut = true;
  8. } catch (InterruptedException retry) {
  9. // 如果获取任务时当前线程发生了中断,则设置timedOutfalse并返回循环重试
  10. timedOut = false;

56   }

57   }

58 }

这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。由上文中的分析

可以知道,在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于

maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。

什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。

getTask 方法返回null 时, 在runWorker 方法中会跳出while 循环, 然后会执行

processWorkerExit方法。

processWorkerExit方法

  1. private void processWorkerExit(Worker w, boolean completedAbrupt ly) {
  2. // 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要workerCount1
  3. // 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对worke

Count进行了减1操作,这里就不必再减了。

  1. if (completedAbruptly) // If abrupt, then workerCount wasn't ad justed
  2. decrementWorkerCount();
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. //统计完成的任务数
  7. completedTaskCount += w.completedTasks;
  8. // workers中移除,也就表示着从线程池中移除了一个工作线程
  9. workers.remove(w);
  10. } finally {
  11. mainLock.unlock();

15   }

  1. // 根据线程池状态进行判断是否结束线程池
  2. tryTerminate();
  3. int c = ctl.get();

19 /*

  1. * 当线程池是RUNNINGSHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker
  2. * 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一worker
  3. * 如果allowCoreThreadTimeOut=falseworkerCount不少于corePoolSi

e

23   */

  1. if (runStateLessThan(c, STOP)) {
  2. if (!completedAbruptly) {

  1. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  2. if (min == 0 && ! workQueue.isEmpty())
  3. min = 1;
  4. if (workerCountOf(c) >= min)
  5. return; // replacement not needed

31   }

32   addWorker(null, false);

33   }

34 }


至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生

命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,

runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入

processWorkerExit方法,整个线程结束,如图所示:

课程总结

分析了线程的创建,任务的提交,状态的转换以及线程池的关闭;

这里通过execute 方法来展开线程池的工作流程, execute 方法通过

corePoolSize,maximumPoolSize以及阻塞队列的大小来判断决定传入的任务应该

被立即执行,还是应该添加到阻塞队列中,还是应该拒绝任务。

介绍了线程池关闭时的过程,也分析了shutdown方法与getTask方法存在竞态条件;

在获取任务时,要通过线程池的状态来判断应该结束工作线程还是阻塞线程等待新的任务,也解释了为什么关闭线程池时要中断工作线程以及为什么每一个worker 都需要lock。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/22157.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分布式架构演进过程

分布式的前提&#xff0c;我们得有多台服务器&#xff0c;那么我们需要知道世界上第一台计算机的由来&#xff0c;而第一台计算机的参考模型就是冯诺依曼模型&#xff0c;为此奠定了所有的分布式都在围绕着这个模型里面的某一块或者相互之间模块进行打交道。 搞分布式又有什么意…

【计算机毕业设计】7.健身俱乐部会籍管理系统+vue

一、系统截图&#xff08;需要演示视频可以私聊&#xff09; 摘 要 随着信息技术和网络技术的飞速发展&#xff0c;人类已进入全新信息化时代&#xff0c;传统管理技术已无法高效&#xff0c;便捷地管理信息。为了迎合时代需求&#xff0c;优化管理效率&#xff0c;各种各样的…

关于clickhouse单节点部署

因为公司网络问题&#xff0c;下载clickhouse无法使用命令下载&#xff0c;所以用压缩包进行下载安装。 首先在其官网下载下载安装包 https://packages.clickhouse.com/tgz/stable/ 一共要下载四个包 clickhouse-client clickhouse-common-static clickhouse-common-static-d…

miRNA 在基因调控中的作用

MicroRNA (miRNA) 是什么&#xff1f;“micro”“mi”是微小的意思&#xff0c;顾名思义&#xff0c;miRNA 就是小的非编码 RNA&#xff0c;长度约 23 个核苷酸 (nt)&#xff0c;它在转录后的基因调控中发挥关键作用&#xff0c;包括疾病的发生、细胞分化与组织发育&#xff0c…

山东大学线性代数-1-矩阵-2

目录 1.9 初等矩阵 1.9.1 初等矩阵的定义 1.9.2 初等矩阵的性质 1.9.3 初等矩阵与初等变换的关系 1.9.4 满秩矩阵的四种等价表述 1.10 逆矩阵的定义及可逆条件 1.10.1 逆矩阵的定义 1.10.2 矩阵可逆的条件 1.10.3 二阶可逆矩阵的逆矩阵求法 1.11 逆矩阵的求法&#x…

Apollo 应用与源码分析:Apollo工程概述与AUTOSAR架构

目录 Apollo 工程概述 目录结构 软件架构 硬件架构 AUTOSAR架构 Apollo 概述 目录结构 |-cyber 消息中间件&#xff0c;替换ros作为消息层 |-docker 容器相关 |-docs 文档相关 |-modules 自动驾驶模块&#xff0c;主要的定位&#xff0c;预测&#xff0c;感知&#xf…

【Java进阶篇】第四章 异常处理

文章目录1、异常2、类Throwable3、运行时异常和编译时异常4、异常的处理5、异常导致某些代码不能被执行6、try...catch总结7、异常对象的常用方法8、try...catch和finally9、final、finally和finalize的整理区分10、如何自定义异常11、异常在实际开发中的作用12、异常与方法覆盖…

带联网功能的RFID宿舍门禁(六)-两年后的再次总结

文章首发及后续更新&#xff1a;https://mwhls.top/4066.html&#xff0c;无图/无目录/格式错误/更多相关请至首发页查看。 新的更新内容请到mwhls.top查看。 欢迎提出任何疑问及批评&#xff0c;非常感谢&#xff01; 带联网功能的RFID宿舍门禁 两年后又来了次总结&#xff0c…

聚观早报|中国制造成世界杯交通主力;特斯拉拟召回32万辆车

今日要闻&#xff1a;中国制造成世界杯交通主力&#xff1b;特斯拉拟召回32万辆车&#xff1b;iPhone14pro发货或延期至2023年&#xff1b;Tik Tok逆势宣布招聘&#xff1b;世界杯部署2.2万电子眼中国制造成世界杯交通主力 据消息&#xff0c;来自中国的新能源客车成了服务本届…

美国、欧洲、中国关于工业互联网的比较分析

工业革命的发展历史 1.0•机械化•大规模 2.0•电气化•自动化 3.0•信息化•产品标准 4.0•网络化•定制 3.0相当于肯德基麦当劳门店全部机械化&#xff0c;全程过程控制&#xff0c;任何一批不合格的产品&#xff0c;都可以追溯到上游任何一个环节 4.0全部设备通过中央控…

【C++笔试强训】第二十九天

&#x1f387;C笔试强训 博客主页&#xff1a;一起去看日落吗分享博主的C刷题日常&#xff0c;大家一起学习博主的能力有限&#xff0c;出现错误希望大家不吝赐教分享给大家一句我很喜欢的话&#xff1a;夜色难免微凉&#xff0c;前方必有曙光 &#x1f31e;。 &#x1f4a6;&a…

【linux】coredump问题排查

序言 记录coredump问题的一些定位技巧 1. coredump简介 coredump称为核心转储&#xff0c;就是在进程异常时的一个快照&#xff0c;保存了异常时的内存、寄存器、堆栈等数据当进程接收到某些 信号 而导致异常退出时&#xff0c;就会生成 coredump 文件core文件是ELF文件格式…

docker-compose 安装 Prometheus + Grafana 配置监控页面

安装 Prometheus Grafana docker 编排 prometheus:image: prom/prometheus:v2.40.1container_name: prometheusports:- "9090:9090"volumes:- /docker/prometheus/prometheus.yml:/etc/prometheus/prometheus.ymlnetwork_mode: "host"grafana:image: gra…

【Linux】基础:进程控制

【Linux】基础&#xff1a;进程控制 摘要&#xff1a;本文主要介绍关于Linux进程控制内容&#xff0c;分为创建、退出、等待与替换四个板块&#xff0c;希望读者可以掌握每个板块的主要概念以及使用原因和调用方法。 文章目录【Linux】基础&#xff1a;进程控制一、进程创建1.1…

Nginx重定向

Rewrite简介 Rewrite是Nginx服务器提供的一个重要基本功能&#xff0c;是Web服务器产品中几乎必备的功能。主要的作用是用来实现URL的重写。 注意:Nginx服务器的Rewrite功能的实现依赖于PCRE的支持&#xff0c;因此在编译安装Nginx服务器之前&#xff0c;需要安装PCRE库。Ngin…

(续)SSM整合之springmvc笔记(SpringMVC处理ajax请求)(P154-158)

目录 SpringMVC处理ajax请求 一 准备工作 1 新建spring_mvc_ajax com.atguigu 2 .导入依赖 3 添加web模块 4 .配置web.xml 5 . springmvc.xml 6 .创建控制层 7 . index.html 8 静态 9 部暑到tomcat上 10 启动tomcat 二 . 测试SpringMVC处理ajax 1 . ind…

TCP三次握手与四次挥手详解

TCP三次握手(建立TCP连接): 建立TCP连接&#xff0c;就是指建立一个TCP连接时&#xff0c;需要客户端和服务总共发送3个包以确认连接的建立。在socket编程中&#xff0c;这一过程由客户端执行connect来触发。 在TCP/IP协议中,TCP协议提供可靠的连接服务,采用三次握手建立一个连…

GSN前瞻预处理

在数控加工等应用中&#xff0c;要求数控系统对机床进行平滑的控制&#xff0c;以防止较大的冲击影响零件的加工质量。 运动控制器的前瞻预处理功能可以根据用户的运动路径计算出平滑的速度规划&#xff0c;减少机床的冲击&#xff0c;从而提高加工精度。 下面用一个实例来说明…

一文了解Spring MVC(上)

目录 什么是Spring MVC 什么是MVC Spring MVC 和MVC的区别 怎么学Spring MVC Spring MVC的创建和连接 RequestMapping注解 获取参数 传递基础数据类型/包装类型&#xff08;无注解&#xff09; 添加RequestParam注解&#xff08;使用在方法参数上&#xff09; 传递自…

python——GIL锁详解

文章目录一、GIL全局解释器锁二、为什么会有GIL锁&#xff1f;三、多线程无法利用多核优势&#xff1f;计算密集型和IO密集型计算密集型——采用多进程计算密集型——采用多线程IO密集型——采用多进程IO密集型——采用多线程四、总结一、GIL全局解释器锁 1、GIL锁不是python的…