Java并发源码学习:线程池源码解析

ThreadPoolExecutor概述

线程池解决的优点

  1. 当执行大量异步任务时线程池能够提供较好的性能,因为线程池中的线程是可复用的,不需要每次执行异步任务时都创建和销毁线程。
  2. 提供资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等等。

线程池处理流程

ThreadPoolExecutor执行execute时,流程如下:



  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务,这里需要加全局锁。
  2. 如果运行的线程数>=corePoolSize,则将任务加入BlockingQueue。
  3. 如果此时BlockingQueue已满,则创建新的线程来处理任务,这里也需要加全局锁。
  4. 如果创建新线程将使当前运行的线程超出maximumPoolSize,则按照拒绝策略拒绝任务。

当然啦,这篇文章意在从源码角度学习线程池这些核心步骤的具体实现啦,线程池概念性的东西,可以参考一些其他的博客

创建线程池

创建线程池有几种方法,一种是使用Executors工具类快速创建内置的几种线程池,也可以自定义。

一、通过Executor框架的工具类Executors可以创建三种类型的ThreadPoolExecutor。

二、使用ThreadPoolExecutor的各种构造方法。

《阿里巴巴 Java 开发手册》中:强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

Executors 返回线程池对象的弊端如下: FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM。 CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

本篇的重点就是这个ThreadPoolExecutor。

重要常量及字段

public class ThreadPoolExecutor extends AbstractExecutorService {
	// 原子的Integer变量ctl,用于记录线程池状态【高3位】和线程池中线程个数【低29位】,这里假设Integer是32位的
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 其实并不是每个平台的Integer二进制都是32位的,实际上是,二进制位-3代表线程个数
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 线程最大个数【约5亿】 低COUNT_BITS都是1  000 11111111111111111111111111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 111 00000000000000000000000000000 高3位是 111
private static final int RUNNING    = -1 &lt;&lt; COUNT_BITS;
// 000 00000000000000000000000000000 高3位是 000
private static final int SHUTDOWN   =  0 &lt;&lt; COUNT_BITS;
// 001 00000000000000000000000000000 高3位是 001
private static final int STOP       =  1 &lt;&lt; COUNT_BITS;
// 010 00000000000000000000000000000 高3位是 110
private static final int TIDYING    =  2 &lt;&lt; COUNT_BITS;
// 011 00000000000000000000000000000 高3位是 011
private static final int TERMINATED =  3 &lt;&lt; COUNT_BITS;

// Packing and unpacking ctl
// 获取高3位的运行状态
private static int runStateOf(int c)     { return c &amp; ~CAPACITY; }
// 获取低29位的线程个数
private static int workerCountOf(int c)  { return c &amp; CAPACITY; }
// 通过RunState和WorkCount计算ctl的值
private static int ctlOf(int rs, int wc) { return rs | wc; }

// 线程池状态变换是单调递增的
private static boolean runStateLessThan(int c, int s) {
    return c &lt; s;
}

private static boolean runStateAtLeast(int c, int s) {
    return c &gt;= s;
}
// 只有RUNNING 是小于SHUTDOWN的
private static boolean isRunning(int c) {
    return c &lt; SHUTDOWN;
}

// ...

// 阻塞队列
private final BlockingQueue&lt;Runnable&gt; workQueue;
// 独占锁 同步保证
private final ReentrantLock mainLock = new ReentrantLock();
// 存放 线程池中的工作线程
private final HashSet&lt;Worker&gt; workers = new HashSet&lt;Worker&gt;();
// 条件队列,线程调用awaitTermination时存放阻塞的线程
private final Condition termination = mainLock.newCondition();

// ...

// 继承AQS和Runnable,任务线程
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{ /*.. */}