【线程池】Java 线程池 ThreadPoolExecutor 类源码介绍

news2024/9/27 7:23:15

文章目录

  • 前言
    • 线程池是什么
    • 线程池解决了哪些问题
    • 本文主要讲述什么
    • 感谢读者
  • 线程池 UML 类图
  • ThreadPoolExecutor 内部设计
    • 核心参数
    • 内部类
    • 任务队列
    • 拒绝策略
  • ThreadPoolExecutor 源码
    • 线程池生命周期
    • 线程池构造函数
    • execute() 【提交任务】
    • addWorker() 方法 【添加工作线程并启动】
      • 了解 retry:
      • addWordker() 方法讲述
    • Worker 类
    • runWorker 【执行任务】
    • getTask 【获取任务】
    • shutdown() 【停止所有线程】
    • shutdownNow() 【停止所有任务,中断执行的任务】
    • tryTerminate() 【根据线程池状态进行判断是否结束线程池】
  • 总结
  • 参考与感谢

前言

线程池是什么

随着现在计算机的飞速发展,现在的计算机基本都是多核 CPU 的了。在目前的系统中创建和销毁线程都是十分昂贵的,为了不频繁的创建和销毁,以及能够更好的管理线程,就出现了池化的思想。

线程池解决了哪些问题

线程池的出现,解决了系统中频繁创建和销毁线程的问题。从系统的角度看还解决了一些问题:
1、降低了资源损耗:线程池中会有一些线程,利用这些线程达到重复使用线程执行任务的作用,避免了频繁的线程创建和销毁,这样就大大降低了系统的资源损耗;
2、提高了线程的管理手段:通过监控线程池中线程的数量和状态,来进行调优和控制;
3、提高的响应速度:当一个任务给到线程池后,一旦有空闲的线程会立即执行该任务;

本文主要讲述什么

本文主要讲述线程池主要实现类 ThreadPoolExecutor 类的源码,让大家能够更加清楚明白 ThreadPoolExecutor 线程池内部的核心设计与实现,以及内部机制有哪些。

感谢读者

读者同学们如果发现了哪些不对的地方请评论或私信我,我会及时更正!非常感谢!!!



线程池 UML 类图

在这里插入图片描述

  • Executor 接口:这是线程池顶级的接口,定义了一个 execute(Runnable command) 方法,用于提交任务。
  • ExecutorService 接口:继承自 Executor 接口,提供了更多管理线程生命周期的方法,如 shutdown()、shutdownNow()、submit()、invokeAll() 等。主要的作用是扩充执行任务的能力和提供了管控线程池的方法。
  • AbstractExecutorService 抽象类:这是 ExecutorService 的一个抽象实现类,提供了一些 ExecutorService 接口的默认实现,减少了实现 ExecutorService 接口的类的工作量。让下层实现类只需要关注执行任务的方法即可。
  • ThreadPoolExecutor 类:ThreadPoolExecutor 是线程池实现类。继承 AbstractExecutorService,主要用于维护线程池的生命周期、管理线程和任务。它提供了多种构造方法,允许我们对线程池的行为进行详细配置,包括核心线程数、最大线程数、线程空闲时间、任务队列等。


ThreadPoolExecutor 内部设计

核心参数

参数名称参数解释
corePoolSize(必填)核心线程数,即线程池一直存活的最线程数量。但是将allowCoreThreadTimeOut参数设置为true后,核心线程处于空闲一段时间以上,也会被回收。
maximumPoolSize(必填)线程池中最大线程数,当核心线程都忙起来并且任务队列满了以后,就开始创建新线程,知道创建的数量到达设置的 maximumPoolSize 数。
keepAliveTime(必填)线程空闲时间,即当线程数大于核心线程数时,非核心线程的空闲时间超过这个时间后,就会被回收。将allowCoreThreadTimeOut参数设置为true后,核心线程也会被回收。
unit(必填)keepAliveTime 的时间单位。有:TimeUnit.DAYS(天)、TimeUnit.HOURS(小时)、TimeUnit.MINUTES(分钟)、TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)、TimeUnit.MICROSECONDS(微秒)、TimeUnit.NANOSECONDS(纳秒)。
workQueue(必填)任务队列,用于保存等待执行的任务。
threadFactory线程工厂,用于指定创建新线程的方式。
handler拒绝策略,当任务过多而无法处理时,采取的处理策略。

内部类

在 ThreadPoolExecutor 中有五个核心的内部类,分别是 AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy 和 Worker。总的来说其实就两类:拒绝策略(Policy)和工作线程类(Worker)。

拒绝策略内部类(Policy)是当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢?这里的拒绝策略,就是解决这个问题的。拒绝策略表示当任务队列满了且线程数也达到最大了,这时候再新加任务,线程池已经无法承受了,这些新来的任务应该按什么逻辑来处理。

工作线程内部类(Worker)是每一个 Worker 类就会绑定一个线程(Worker 类有一个成员属性持有一个线程对象),可以将 Worker 理解为线程池中执行任务的线程。但是实际上它并不仅仅是一个线程,Worker 里面还会有一些统计信息,存储一些相关的数据。

任务队列

  • SynchronousQueue:SynchronousQueue 是一个没有容量的队列(不知道能不能称为一个队列)。它是 BlockingQueue 的一个实现类,与其他实现 BlockingQueue 的实现类对比,它没有容量,更轻量级;入队和出队要配对,也就是说当你入队一个元素后,必须先出队才能入队,否则插入的线程会一直等待。
  • LinkedBlockingQueue:LinkedBlockingQueue 是一个支持并发操作的有界阻塞队列,底层数据结构是由一个单链表维护。当使用它的时候,不设置长度限制,则默认 Integer.MAX_VALUE 做为最大容量。
  • ArrayBlockingQueue:ArrayBlockingQueue 也是一个支持并发操作的有界阻塞队列,与 LinkedBlockingQueue 不同的是,ArrayBlockingQueue 底层数据结构是由一个环形数组数据结构维护的。同样是当使用它的时候,不设置长度限制,则默认 Integer.MAX_VALUE 做为最大容量。

另外,还支持另外 4 种队列:

  • PriorityBlockingQueue:PriorityBlockingQueue 是一个支持优先级处理操作的队列,你可以按照某个规则自定义这个优先级,以保证优先级高的元素放在队列的前面,进行出队的时候能够先出优先级高的元素。
  • DelayQueue:DelayQueue 是一个延迟队列,队列中每一个元素都会有一个自己的过期时间,每当使用出队操作的时候,只有过期的元素才会被出队,没有过期的依然会留在队列中。
  • LinkedBlockingDeque:LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,可以头部和尾部进行操作。
  • LinkedTransferQueue:LinkedTransferQueue 由链表结构组成的无界阻塞队列。它的设计比较特别,是一种预约模式的设计。当出队时如果队列中有数据则直接取走,没有的话会在队列中占一个位子,这个位子的元素为 null,当有其他线程入队时,则会通知出队的这个线程,告诉它你可以拿走这个元素了。

拒绝策略

当添加任务时,如果线程池中的容量满了以后,线程池会做哪些策略?下面是线程池的一些策略:

  • AbortPolicy(默认):AbortPolicy 策略会将新的任务丢弃并抛出 RejectedExecutionException 异常。
  • CallerRunsPolicy:CallerRunsPolicy 策略是直接运行这个任务的run方法,但并非是由线程池的线程处理,而是交由任务的调用线程处理。
  • DiscardPolicy:DiscardPolicy 策略是直接丢弃任务,不抛出任何异常。
  • DiscardOldestPolicy:DiscardOldestPolicy 策略是将当前处于等待队列列头的等待任务强行取出,然后再试图将当前被拒绝的任务提交到线程池执行。


ThreadPoolExecutor 源码

线程池生命周期

// 管理线程池的状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 表示低 29 位的值,用于表示线程池中工作线程的数量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 用于表示线程池中工作线程的数量的最大值,等于 2^29-1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 线程池的五种状态,高 3 位表示,下面的状态值依次增高,是根据他们状态流程的顺序依次增高的
// RUNNING 状态
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN 状态
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// STOP 状态
private static final int STOP       =  1 << COUNT_BITS;
// TIDYING 状态
private static final int TIDYING    =  2 << COUNT_BITS;
// TERMINATED 状态
private static final int TERMINATED =  3 << COUNT_BITS;


// 通过与运算,计算返回线程池的状态,将高3位的值保留,低 29 位的值置为 0
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 通过与运算,计算返回线程池的状态,将高 3 位的值置为 0,低 29 位的值保留
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 通过或运算,计算返回线程池的状态,将高 3 位和低 29 位的值都保留,合并为 ctl 并返回
private static int ctlOf(int rs, int wc) { return rs | wc; }

// runStateLessThan 方法用于判断线程池的状态是否小于某个状态
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

// runStateAtLeast 方法用于判断线程池的状态是否大于等于某个状态
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

// isRunning 方法用于判断线程池的状态是否是 RUNNING 状态
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

上面这些源码主要是写了当前线程池的状态、线程池的五种状态值定义、计算线程池的状态和判断线程池的状态。
这里再补充一下线程池的五种状态:

  • RUNNING 状态:线程池创建后,初始化时会将线程池的状态设置为 RUNNING,表示可接受新任务,并且执行队列中的任务;
  • SHUTDOWN 状态:当调用了 shutdown() 方法,则线程池处于 SHUTDOWN 状态,表示不接受新任务,但会执行队列中的任务;
  • STOP 状态:当调用了 shutdownNow() 方法,则线程池处于 STOP 状态,表示不接受新任务,并且不再执行队列中的任务,并且中断正在执行的任务;
  • TIDYING 状态:所有任务已经中止,且工作线程数量为 0,最后变迁到这个状态的线程将要执行 terminated() 钩子方法,只会有一个线程执行这个方法;
  • TERMINATED 状态:中止状态,已经执行完 terminated() 钩子方法;

线程池状态执行流程:
在这里插入图片描述


线程池构造函数

当我们创建线程池时,不管是通过 Executor 工具类还是手动创建线程池,最终都会调用的下面这个构造方法来实现的。

/**
 * 构造方法,创建线程池。
 * 
 * @param corePoolSize    线程池的核心线程数量
 * @param maximumPoolSize 线程池的最大线程数
 * @param keepAliveTime   当线程数大于核心线程数时,多余的空闲线程存活的最长时间
 * @param unit            存活时间的时间单位
 * @param workQueue       任务队列,用来储存等待执行任务的队列
 * @param threadFactory   线程工厂,用来创建线程,一般默认即可
 * @param handler         拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
 */
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                          TimeUnit unit, BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

这里简单介绍一下线程池的内部结构设计:
在这里插入图片描述

在线程池中,如果执行的线程数超过了核心线程数,那么这些多余线程的最长存活时间,不会超过 keepAliveTime 参数。


execute() 【提交任务】

execute() 方法的主要目的就是将任务执行起来。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 1. 如果工作线程数量小于核心数量
    if (workerCountOf(c) < corePoolSize) {
        // 添加一个工作线程(核心)  并将该任务作为该工作线程的第一个任务,创建线程完成后直接返回
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. 如果达到了核心数量且线程池是运行状态,则将任务加入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次检查线程池状态,如果不是运行状态,就移除任务并执行拒绝策略
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 容错检查工作线程数量是否为0,如果为0就创建一个,此时就不用为工作线程绑定任务了,因为任务已经加入到队列中了
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 任务入队列失败(例如队列满了),尝试创建非核心工作线程,将该任务和非核心工作线程绑定在一起
    else if (!addWorker(command, false))
        // 尝试创建非核心工作线程失败,执行拒绝策略
        reject(command);
}

上面这些源码和注释,主要讲述 execute() 方法是怎么做的校验和做了哪些步骤。用流程图表达:
在这里插入图片描述


addWorker() 方法 【添加工作线程并启动】

了解 retry:

在了解 addWorker() 方法之前,先了解一下 retry:
下面是一个示例

public static void main(String[] args) {
    int i = 0;
    // todo retry: 标记在这里
    retry:
    for (;;) {
        i++;
        System.out.println("i=" + i);
        int j = 0;
        for (;;) {
            System.out.println("j=" + j);
            j++;
            if (j == 3) {
                break retry;
            }
        }
    }
}

在执行完下面代码后,通过控制台可以看到下面这个输出

i=1
j=0
j=1
j=2

接下来把示例代码改一下

public static void main(String[] args) {
    int i = 0;
    for (;;) {
        i++;
        System.out.println("i=" + i);
        int j = 0;
        // todo retry: 标记在这里
        retry:
        for (;;) {
            System.out.println("j=" + j);
            j++;
            if (j == 3) {
                break retry;
            }
        }
    }
}

输出的内容如下

0
j=1
j=2
i=232781
j=0
j=1
j=2
i=232782
j=0
j=1
j=2
i=232783
...

通过两个示例,能够发现,retry 相当于一个标记,只用在循环里面,break 的时候会到 retry 标记处。如果 retry 没有在循环(for,while)里面,在执行到 retry 时,就会跳出整个循环。如果 retry 在循环里面,可以理解为跳到了关键字处执行,不管几层循环。continue理解也是一样。
需要注意的是: retry: 需要放在for,whlie,do…while的前面声明,变量只跟在 break 和 continue 后面。


addWordker() 方法讲述

addWordker() 方法主要的目的是创建一个线程,然后启动执行,期间也会判断线程池状态和工作线程数量等各种检测。

/**
 * addWordker() 方法主要做的事情是创建一个线程,然后启动执行,期间也会判断线程池状态和工作线程数量等各种检测。
 *
 * @param firstTask 创建线程后执行的任务
 * @param core      该参数决定了线程池容量的约束条件,即当前线程数量以何值为极限值。参数为 true 则使用 corePollSize 作为约束值,否则使用 maximumPoolSize。
 * @return 是否创建线程并启动成功
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    // 外层循环主要是检查线程池状态
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池状态检查
        // 1、线程池是非 RUNNING 状态
        // 2、线程池是 SHUTDOWN 状态 并且 传入的任务是 null 并且 workQueue 不等于空
        // 两种情况都会返回 false
        if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;

        // 内层循环:线程池添加核心线程并返回是否添加成功的结果
        for (;;) {
            // 获取当前线程数
            int wc = workerCountOf(c);
            // 判断是否饱和容量了,如果已经达到最大线程数则不能再新建线程,直接返回 false
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 完成了上面的判断后,说明现在线程池可以创建新线程,则通过 CAS 将线程数量加 1,因为后面要创建线程了,并跳出外层循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 如果上面的 compareAndIncrementWorkerCount(c) 方法返回 false,则说明有其他线程在操作线程池的线程数量,所以需要重新获取 ctl
            c = ctl.get();
            // 如果当前的运行状态已经和最开始获取的状态不一样了,则重新进入外层循环
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    // 如果上面的条件满足,则会把工作线程数量加1,然后执行下面创建线程的动作
    // 标记是否启动了工作线程
    boolean workerStarted = false;
    // 标记是否成功添加了工作线程
    boolean workerAdded = false;
    // 要创建的 Worker 对象
    Worker w = null;
    try {
        // 创建工作线程
        // 增加一个 worker   这个 firstTask 就是一个线程(任务线程,每一个 Worker 都和一个任务线程绑定)
        w = new Worker(firstTask);
        // 这个是绑定在 Worker 上的工作线程,并不是任务线程,工作线程是用来执行任务的
        final Thread t = w.thread;
        // 判断 t 是否为 null
        if (t != null) {
            // 获取线程池的锁
            final ReentrantLock mainLock = this.mainLock;
            // 在读取线程池状态的时候就应该上锁,防止有并发操作破坏程序的一致性,上下不一致
            mainLock.lock();
            try {
                // 锁定后并重新检查线程池的状态,查看是否存在线程工厂的失败或者锁定前的关闭
                // 获取线程池运行状态
                int rs = runStateOf(ctl.get());
                // 检查线程池状态
                // rs < SHUTDOWN表示是 RUNNING 状态;
                // 如果 rs 是 RUNNING 状态或者 rs 是 SHUTDOWN 状态并且 firstTask 为 null,向线程池中添加线程。
                // 因为在 SHUTDOWN 时不会在添加新的任务,但还是会执行 workQueue 中的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    // 添加到工作线程队列。 workers 是线程池中存放工作线程的集合
                    // 增加 work  每一个线程池都持有一个 workers 集合,里面存储着线程池的所有 worker,也就是所有线程
                    workers.add(w);
                    // 还在池子中的线程数量(只能在mainLock中使用)
                    int s = workers.size();
                    // 不能超过线程池的最大线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 标记线程添加成功
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 线程添加成功之后启动线程
                // 启动绑定在 worker上的线程。启动了之后该工作线程就会开始从任务队列中拿任务去执行了
                t.start();
                // 标记工作线程启动成功
                workerStarted = true;
            }
        }
    } finally {
        // 线程启动失败,执行失败方法(线程数量减1,执行tryTerminate()方法等)
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回新创建的工作线程是否启动成功
    return workerStarted;
}

在 addWorker 方法中,如果添加 Worker 并且启动线程失败,则会做失败后的处理:

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 移除添加失败的 worker
        if (w != null)
            workers.remove(w);
        // 减少 worker 数量
        decrementWorkerCount();
        // 尝试终止线程池
        tryTerminate();
    } finally {
        mainLock.unlock();
}

通过上面的源码,可以知道所有的线程都是放在 Worker 这个类中的,到目前为止,任务还并没有执行, 真正的执行是在 Worker 内部类中执行的,在下面会聊到 Worker 内部类中是怎么执行任务的,注意一下这里的 t.start() 这个语句,启动时会调用 Worker 类中的run方法,Worker 本身实现了 Runnable 接口,所以一个 Worker 类型的对象也是一个线程。现在先来总结创建一个线程执行工作任务 addWorker() 这个方法。

流程图表示:
在这里插入图片描述


Worker 类

通过上面的分析,就可以知道每一个任务都会放在 Worker 类中,由 Worker 类中的线程去最终执行任务。
下面就来看一下 Worker 类的源码:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{

    private static final long serialVersionUID = 6138294804551838833L;

    // 执行任务的线程
    final Thread thread;
    // 要被执行的任务
    Runnable firstTask;
    // 记录线程完成的任务数
    volatile long completedTasks;


    // worker 类的构造方法
    // 传入一个需要执行的任务,并且通过 getThreadFactory() 创建一个线程来执行任务
    Worker(Runnable firstTask) {
        // 设置 worker 线程状态为-1,表示禁止线程中断
        // state:锁状态,-1为初始值,0为unlock状态,1为lock状态
        setState(-1); // 在调用runWorker前,禁止中断
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    // 执行任务
    public void run() {
        runWorker(this);
    }

    // 是否锁住了任务
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    // 尝试获取锁的方法
    protected boolean tryAcquire(int unused) {
        // 尝试修改线程状态
        if (compareAndSetState(0, 1)) {
            // 设置 exclusiveOwnerThread 为当前线程
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    // 尝试释放锁
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    
    // 加锁
    public void lock()        { acquire(1); }
    // 尝试获取锁
    public boolean tryLock()  { return tryAcquire(1); }
    // 解锁
    public void unlock()      { release(1); }
    // 是否锁住
    public boolean isLocked() { return isHeldExclusively(); }

    // 线程启动后,进行线程中断时执行方法
    void interruptIfStarted() {
        Thread t;
        // worker中状态 >= 0 且 线程不为空 且 当前线程未中断
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                // 中断线程
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

runWorker 【执行任务】

通过 Worker 类中的源码可以得知,在类中最终是调用了 runWorker 方法,执行的任务,那么接下来就来看一下 runWorker 方法。

final void runWorker(Worker w) {
    // 获取当前线程
    Thread wt = Thread.currentThread();
    // 获取要执行的任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 解锁,设置 worker 状态 state 为 0,表示允许线程中断
    w.unlock(); // allow interrupts
    // 标识线程退出原因,true 代表线程异常退出,false 代表线程正常退出
    boolean completedAbruptly = true;
    try {
        // 当前任务是为空 或 从任务队列获取的任务为空 则停止循环
        // getTask() 方法从任务队列获取一个任务
        while (task != null || (task = getTask()) != null) {
            // 加锁处理并发问题,防止 shutdown() 时终止正在执行的 worker
            w.lock();
            
            // 如果线程池状态 >= STOP(即 STOP 或 TERMINATED) 并且当前线程未被中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                // 中断当前线程
                wt.interrupt();
            
            try {
                // 调用自定义的任务启动前的方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 调用自定义的任务启动后的方法
                    afterExecute(task, thrown);
                }
            } finally {
                // 任务对象设置成 null
                task = null;
                // 完成的工作数 +1
                w.completedTasks++;
                // 解锁
                w.unlock();
            }
        }
        // 标识线程为正常退出
        completedAbruptly = false;
    } finally {
        // 处理 worker 线程退出,主要是用来销毁空闲线程,控制线程池中线程数量的大小变化
        processWorkerExit(w, completedAbruptly);
    }
}

通过上面的源码可以得知在 runWorker 方法内部的主要流程主要如下:

  1. 首先解锁 worker,允许线程中断;
  2. 通过 while 循环判断当前任务是否为空,若为空则调用 getTask() 方法从阻塞队列中获取待处理的任务;
  3. 获取到任务后工作线程直接加锁,同时根据线程池状态判断是否中断当前线程;
  4. 执行任务,在任务执行前后执行自定义扩展方法;
  5. 重复第 2 步的逻辑,直到阻塞队列中的任务全部执行完毕,最后调用 processWorkerExit() 方法处理 worker 线程退出、销毁空闲线程和控制线程池中线程数量的大小变化;
    下面使用比较直观详细的流程图来表示出来:

在这里插入图片描述


getTask 【获取任务】

getTask() 方法的主要作用是从任务队列中获取任务。

private Runnable getTask() {
    // 标记上一次从阻塞队列获取任务是否超时
    boolean timedOut = false; // Did the last poll() time out?

    // 自旋
    for (;;) {
        int c = ctl.get();
        // 获取当前线程池状态
        int rs = runStateOf(c);

        // 判断是否需获取任务
        // 若线程池状态 >= STOP 或 (线程池状态为 SHUTDOWN 且 阻塞队列为空),则不需要再处理任务
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 工作线程数-1
            decrementWorkerCount();
            // 返回
            return null;
        }
        
        // 获取当前工作线程数
        int wc = workerCountOf(c);

        // timed 标记用于判断是否需进行超时控制
        // allowCoreThreadTimeOut 是否运行核心线程超时销毁(默认为false,核心线程不允许超时销毁)
        // wc > corePoolSize 当前工作线程数大于核心线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        /**
         * 当前工作线程数 > maximumPoolSize
         * 或
         * timed && timedOut 为 true
         *
         * 并且
         *
         * 工作线程数大于1
         * 或
         * 阻塞队列为空
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 使用 CAS 对工作线程数 -1
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        // 开始从阻塞队列获取任务
        try {
            // timed 为 true 代表核心线程允许超时销毁 或 当前工作线程数大于核心线程数(存在非核心线程)
            // timed = true:超时等待获取任务 poll(),超时未获取到则返回 null
            // timed = false:阻塞获取任务 take(),一直等着直到获取到任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // 判断是否获取到任务
            if (r != null)
                // 获取到直接返回
                return r;
            // 未获取到,代表等待超时也没有拿到任务,设置timedOut值为true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 即使异常也循环重试
            timedOut = false;
        }
    }
}

通过上面的 getTask() 方法原发分析,大致的主要的过程就是:

  1. 校验线程池状态和工作线程数是否合规;
  2. 根据条件选择从任务队列中超时等待获取,还是从任务队列中阻塞获取;

下面用一个直观的流程图来表示:

在这里插入图片描述


shutdown() 【停止所有线程】

shutdown() 方法会把状态修改成 SHUTDOWN。并且这里肯定会成功,因为修改时是通过 自旋 的方式,不成功不退出。

public void shutdown() {
    // 获取线程池锁
    final ReentrantLock mainLock = this.mainLock;
    // 加锁,修改线程池状态
    mainLock.lock();
    try {
        // 检查是否有权限关闭线程池
        checkShutdownAccess();
        // 修改状态为SHUTDOWN,自旋操作,只有状态修改成功才会返回
        advanceRunState(SHUTDOWN);
        // 标记空闲线程为中断状态
        interruptIdleWorkers();
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
    
private void advanceRunState(int targetState) {
    // 自旋修改线程池状态
    for (;;) {
        // 获取ctl
        int c = ctl.get();
        // 如果状态大于 SHUTDOWN(因为只有 RUNNING 状态才能转化为 SHUTDOWN 状态,而只有 RUNNING 状态是小于 SHUTDOWN 状态的),
        // 或者修改为 SHUTDOWN 成功了,才会 break 跳出自旋
        if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

shutdownNow() 【停止所有任务,中断执行的任务】

shutdownNow() 方法会把线程池的状态改成 STOP,线程池不接受新任务,且不再执行队列中的任务,且中断正在执行的任务,同时标记所有线程为中断状态。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    // 获取线程池的锁
    final ReentrantLock mainLock = this.mainLock;
    // 加锁,进行状态修改操作
    mainLock.lock();
    try {
        // 检查是否有权限关闭线程池
        checkShutdownAccess();
        // 修改为 STOP 状态
        advanceRunState(STOP);
        // 标记所有线程为中断状态
        interruptWorkers();
        // 将队列中的任务全部移除,并返回
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // 尝试终止线程池
    tryTerminate();
    // 返回队列中的任务
    return tasks;
}

private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    // 将队列中的元素添加到一个集合中去,但是如果队列是 DelayQueue 或者 其他类型的 Queue 时使用 drainTo 就会失败,所以就需要逐个删除
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

tryTerminate() 【根据线程池状态进行判断是否结束线程池】

所有任务已经中止,且工作线程数量为0,就会进入这个状态。最后变迁到这个状态的线程将要执行 terminated() 钩子方法,只会有一个线程执行这个方法。

final void tryTerminate() {
    // 自旋修改状态
    for (;;) {
        int c = ctl.get();
        // 下面几种情况不会执行后续代码
        // 1. 运行中
        // 2. 状态的值比 TIDYING 还大,也就是 TERMINATED
        // 3. SHUTDOWN 状态且任务队列不为空
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        
        // 工作线程数量不为 0,也不会执行后续代码
        if (workerCountOf(c) != 0) {
            // 尝试中断空闲的线程
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        // 获取线程池的锁
        final ReentrantLock mainLock = this.mainLock;
        // 加锁
        mainLock.lock();
        try {
            // CAS 修改状态为 TIDYING 状态
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 更新成功,执行 terminated 钩子方法
                    terminated();
                } finally {
                    // 确保 terminated 钩子方法执行完毕后,再次修改状态为 TERMINATED(最终的终止状态),线程池彻底关闭
                    // 强制更新状态为 TERMINATED,这里不需要 CAS 了
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 通知所有等待线程
                    termination.signalAll();
                }
                return;
            }
        } finally {
            // 解锁
            mainLock.unlock();
        }
    }
}

更新成 TIDYING 或者 TERMINATED 状态的代码都在 tryTerminate() 方法中,而 tryTerminate() 方法在很多个地方被调用,比如 shutdown()、shutdownNow()、线程退出时,所以说几乎每个线程最后消亡的时候都会调用 tryTerminate() 方法,但最后只会有一个线程真正执行到修改状态为 TIDYING 的地方。
修改状态为 TIDYING 后执行 terminated() 方法,最后修改状态为 TERMINATED,标志着线程池真正消亡了。



总结

最后,使用流程图来呈现一下 ThreadPoolExecutor 运行机制的图:
在这里插入图片描述



参考与感谢

  • 美团线程池文章
  • Agodival 博主的线程池文章
  • 云深i不知处 博主的线程池文章

非常感谢以上的博主!!!





End


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2125845.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【微服务】⭐️华为云obs功能抽取到公共服务,供所有项目使用

目录 &#x1f378;前言 &#x1f37b;一、公共服务搭建 &#x1f37a;二、代码实现 1.工具类编写 2.项目引入使用 &#x1f379;三、章末 &#x1f378;前言 小伙伴们大家好&#xff0c;上次讲了如何本地对接华为云Obs对象存储服务&#xff0c;在本地项目中通过sdk引入调用…

【QT】常用控件-下

欢迎来到Cefler的博客&#x1f601; &#x1f54c;博客主页&#xff1a;折纸花满衣 &#x1f3e0;个人专栏&#xff1a;QT 目录 &#x1f449;&#x1f3fb;QComboBox&#x1f449;&#x1f3fb; QSpinBox&#x1f449;&#x1f3fb;QDateTimeEdit&#x1f449;&#x1f3fb;QD…

时序预测 | MATLAB实现BKA-XGBoost(黑翅鸢优化算法优化极限梯度提升树)时间序列预测

时序预测 | MATLAB实现BKA-XGBoost(黑翅鸢优化算法优化极限梯度提升树)时间序列预测 目录 时序预测 | MATLAB实现BKA-XGBoost(黑翅鸢优化算法优化极限梯度提升树)时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 Matlab实现BKA-XGBoost时间序列预测&a…

datasophon升级海豚调度dolphinscheduler为3.2.2

一、参考博主升级3.2.1文章&#xff1a;datasophon升级海豚调度为3.2.1_海豚调度3.2.2 mysql包找不到-CSDN博客 二、升级后woker-server启动报错如下&#xff1a; 原因是worker-server下conf/common.properties中的&#xff1a;resource.storage.typeNONE&#xff0c; 解决很简…

如何划分类/单一职权原则SRP

参考&#xff1a;单一职责 -- 每个类只负责一个功能_每个类应该只负责一个功能,遵循单一职责原则。-CSDN博客 类有且只有一个原因需要修改它&#xff0c;这样的才是一个结构简洁的类。 结合上面的例子&#xff0c;需要注意的点&#xff1a; 1.比如搜索数据库&#xff0c;需要…

Procdump抓ToDesk密码

目录 前言 1.工具教程 2.转储数据 3.密码获取 4.总结 前言 本文是因为在公众号上看到一篇文章随想着实战中利用ToDesk秀操作失败后&#xff0c;实验环境成功复现后写下。ProcDump[1] 是一个命令行实用工具&#xff0c;其主要用途是监视应用程序的 CPU 峰值&#xff0c;并在…

mybatis 查询Not Found TableInfoCache

近期在工程迁移中遇到一个mybatis查询的问题&#xff0c;检查代码没有问题&#xff0c;但是报Not Found TableInfoCache 解决过程 是不是数据库对应表错误或者实体类指定的表名错误 查看配置文件链接的数据源是否正确TableName中指定的表名然后去数据库看一下是否存在 如果…

象过河仓库管理软件,轻松实现无纸化录入,自动化记账

在如今快速发展的商业环境中&#xff0c;仓库管理面临着手工记账效率低下&#xff0c;容易引发数据不准确&#xff0c;滞后&#xff0c;错漏频发&#xff0c;盘点耗时费力等问题。为了解决这些问题&#xff0c;象过河仓库管理软件应运而生&#xff0c;轻松实现无纸化录入&#…

支付环节攻击方式与漏洞类型

支付环节攻击方式与漏洞类型 1.概述2.卡复制3.卡数据破解与篡改4.网络欺骗攻击5.线下欺骗攻击6.支付身份伪造7.支付逻辑绕过8.数据不同步9.支付数据篡改10.条件竞争漏洞&#xff08;并发&#xff09;11.拒绝服务 参考自&#xff1a;https://www.topsec.com.cn/uploads/2023-10-…

希捷,AI时代的存储“破壁者”

喜欢跑步或者经常看马拉松等比赛的读者知道&#xff0c;当选手经过专业训练成绩突飞猛进后&#xff0c;就会有一段时间停滞不前。这个阻碍可能是物理的、心理的或是技术的障碍&#xff0c;只有突破这个“壁垒”&#xff0c;才能成为更好的自己。 对于一家企业来说&#xff0c;…

100.WEB渗透测试-信息收集-网络空间搜索引擎shodan(2)

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a; 易锦网校会员专享课 上一个内容&#xff1a;99.WEB渗透测试-信息收集-网络空间搜索引擎shodan&#xff08;1&#xff09;-CSDN博客 Sh…

读论文-《基于计算机视觉的工业金属表面缺陷检测综述》

文章目录 1. 背景1.1 工业需求1.2 传统方法的局限1.3 计算机视觉技术的优势 2. 技术流程2.1 光学成像2.1.1照明方式2.1.2 缺陷和背景特性 2.2 图像预处理2.3 缺陷检测2.4 结果分析和决策 3. 关键算法3.1 光学成像技术相关算法3.2 图像预处理相关算法3.2.1 图像增强3.2.2特征提取…

wakenet尾迹

1、数据集介绍SWIM_Dataset_1.0.0 1.1标注文件介绍 标注文件介绍&#xff0c; 第一种&#xff1a;角度和框的坐标 <annotation><folder>Positive</folder><filename>00001</filename>文件名字<format>jpg</format>图片后缀<s…

太速科技-基于XC7Z100+AD9361的双收双发无线电射频板卡

基于XC7Z100AD9361的双收双发无线电射频板卡 一、板卡概述 基于XC7Z100AD9361的双收双发无线电射频板卡是基于Xilinx ZYNQ FPGA和ADI的无线收发芯片AD9361开发的专用功能板卡&#xff0c;用于4G小基站&#xff0c;无线图传&#xff0c;数据收发等领域。 二、板卡…

[产品管理-8]:NPDP新产品开发 - 6 - 商业画布、商业模式、创新模式

目录 一、商业画布&#xff1a;九个核心部件 二、商业模式 三、创新模式 3.1 什么是创新 1、传统与创新模式的对比 2、创新模式的具体类型 3、企业创新模式的分类 4、总结 3.2 创新模式 1. 已知领域 2. 未知领域 一、商业画布&#xff1a;九个核心部件 商业画布&…

【C语言】选择排序及优化、冒泡排序、计数排序的实现

目录 一、选择排序1.1 常规版&#xff08;一次排好一个数&#xff09;1.1.1 基本思想1.1.2 实现思路1.1.3 代码 1.2 优化版&#xff08;一次排好两个数&#xff09;1.2.1 实现思路1.2.2 代码 1.3 时间复杂度 二、冒泡排序2.1 实现思路2.2 代码2.3 时间复杂度 三、计数排序3.1 基…

DroidBot-GPT: GPT-powered UI Automation for Android论文学习

本文介绍了DroidBot GPT&#xff0c;这是一种利用类似GPT的大型语言模型&#xff08;LLM&#xff09;自动化与Android移动应用程序交互的工具。给定所需任务的自然语言描述&#xff0c;DroidBot GPT可以自动生成并执行导航应用程序以完成任务的操作。它的工作原理是将应用程序G…

99.游戏安全项目-可见数据的搜索与技巧

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a;易道云信息技术研究院 上一个内容&#xff1a;98.游戏的启动与多开-分析与实现多开器 下图中红框位置显示的数据&#xff0c;只有下图…

Avaloia 实现国产麒麟系统中文显示界面

最近在搞一个国产麒麟系统的接口对接&#xff0c;因为&#xff0c;接口内含复杂的签名验证&#xff0c;而且还是离线环境&#xff0c;所以&#xff0c;postman不是很好用。 就想着哪个方式好一些&#xff0c;主要是有选择图片的操作&#xff0c;所以&#xff0c;在Electron和A…

有了它,Python性能瓶颈消失!

声明&#xff1a;此篇为 ai123.cn 原创文章&#xff0c;转载请标明出处链接&#xff1a;https://ai123.cn/#1 作为一名互联网行业的Python工程师&#xff0c;你是否也遇到过解释型语言在处理大量数据时速度较慢、内存占用高、并行处理困难、调试复杂、扩展性差和性能瓶颈等各种…