ThreadPoolExecutor 是JDK中的线程池实现,这个类实现了一个线程池需要的各个方法,它提供了任务提交、线程管理、监控等方法
下面是 ThreadPoolExecutor 类的构造方法源码,其他创建线程池的方法最终都会导向这个构造方法,共有7个参数:
corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
这些参数大部分都是通过 volatile 修饰
public class ThreadPoolExecutor extends AbstractExecutorService {
private volatile int corePoolSize; // 1.核心线程数
private volatile int maximumPoolSize; // 2.最大线程数
private volatile long keepAliveTime; // 3.空闲线程存活时间
TimeUnit unit; // 4. 存活时间单位
private final BlockingQueue<Runnable> workQueue; // 5.工作队列
private volatile ThreadFactory threadFactory; // 6.线程工厂
private volatile RejectedExecutionHandler handler; // 7.拒绝策略
private volatile boolean allowCoreThreadTimeOut; // 是否允许核心线程被回收
}
一、corePoolSize:核心线程数
线程池维护的最小线程数量,核心线程创建后不会被回收(注意:设置 allowCoreThreadTimeout = true 后,空闲的核心线程超过存活时间也会被回收)
大于核心线程数的线程,在空闲时间超过keepAliveTime后会被回收
线程池刚创建时,里面没有一个线程,当调用 execute() 方法添加一个任务时,如果正在运行的线程数量小于corePoolSize,则马上创建新线程并运行这个任务
二、maximumPoolSize:最大线程数
线程池允许创建的最大线程数量
当添加一个任务时,核心线程数已满,工作队列已满的情况下,线程池还没达到最大线程数,并且没有空闲线程,创建一个新线程并执行
三、keepAliveTime:空闲线程存活时间
当一个可被回收的线程的空闲时间大于keepAliveTime,就会被回收
可被回收的线程:
设置allowCoreThreadTimeout = true的核心线程
大于核心线程数的线程(非核心线程)
四、unit:时间单位
keepAliveTime 的时间单位:
TimeUnit.NANOSECONDS // 纳秒
TimeUnit.MICROSECONDS // 微秒
TimeUnit.MILLISECONDS // 毫秒
TimeUnit.SECONDS // 秒
TimeUnit.MINUTES // 分钟
TimeUnit.HOURS // 小时
TimeUnit.DAYS // 天
五、workQueue:工作队列
作用:存放待执行任务的队列。当提交的任务数超过核心线程数大小后,再提交的任务就存放在工作队列,任务调度时再从队列中取出任务。它仅仅用来存放被 execute() 方法提交的 Runnable 任务。工作队列实现了 BlockingQueue 接口
JDK默认的工作队列有五种
5.1 ArrayBlockingQueue:数组型阻塞队列。
数组结构,初始化时传入大小(有界),FIFO(先进先出策略)。使用一个重入锁(ReentrantLock),默认使用非公平锁,入队和出队共用一个锁,互斥
final ReentrantLock lock = this.lock;
5.2 LinkedBlockingQueue:链表型阻塞队列。
链表结构,默认初始化大小为Integer.MAX_VALUE,有界(近似无界),FIFO(先进先出策略)。使用两个重入锁分别控制元素的入队和出队,用 Condition 进行线程间的唤醒和等待
// 初始化构造,也有自定义大小capacity参数
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 两把锁
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
5.3 SynchronousQueue:同步移交队列。
容量为0,添加任务必须等待取出任务,这个队列相当于通道,不存储元素
5.4 PriorityBlockingQueue:优先级阻塞队列。
无界,在 put 的时候会tryGrow,要说它有界也没问题,因为界是 Integer.MAX_VALUE,但其实上这个队列应该是无界的。默认采用元素自然顺序升序排列(可以自定义Comparator)。使用一个重入锁分别控制元素的入队和出队
/**
* Default array capacity. 默认初始化大小 11
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11;
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
/**
* Lock used for all public operations
*/
private final ReentrantLock lock;
/**
* Condition for blocking when empty
*/
private final Condition notEmpty;
5.5 DelayQueue:延时队列。
无界,队列中的元素有过期时间,过期的元素才能被取出。使用一个重入锁分别控制元素的入队和出队,用 Condition 进行线程间的唤醒和等待。任务调度时候可以使用
private final transient ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();
队列分类
无界队列
队列大小无限制,常用的无界的队列为 LinkedBlockingQueue,使用该队列做为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM
有界队列
常用的有两类,一类是遵循FIFO原则的队列如 ArrayBlockingQueue,另一类是优先级队列如 PriorityBlockingQueue, PriorityBlockingQueue 中的优先级由任务的Comparator决定
使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低cpu使用率和上下文切换,但是可能会限制系统吞吐量
同步移交队列
如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用 SynchronousQueue 作为等待队列。SynchronousQueue 不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入 SynchronousQueue 中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列
六、threadFactory:线程工厂
创建线程的工厂,可以设定线程名、线程编号等
默认创建的线程工厂,通过Executors.defaultThreadFactory()获取
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
七、handler:拒绝策略
当线程池线程数已满,并且工作队列达到限制,新提交的任务使用拒绝策略处理。可以自定义拒绝策略,拒绝策略需要实现 RejectedExecutionHandler 接口
JDK默认的拒绝策有四种
AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常
DiscardPolicy:丢弃任务,但是不抛出异常。可能导致无法发现系统的异常状态
DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务
CallerRunsPolicy:由调用线程处理该任务
默认拒绝策略是 AbortPolicy
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
八、拓展
线程池的执行流程
自定义线程池工具
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPoolFactory {
public static void main(String[] args) {
// 测试线程池方法
ExecutorService threadPool = createFixedThreadPool("vinjcent");
for (int i = 0; i <= 20; i++) {
int number = i;
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "===>" + number);
}
});
}
// 使用完之后记得关闭
threadPool.shutdown();
}
public static ExecutorService createFixedThreadPool(String threadName) {
// 用于创建线程时,每个线程名称不同的增值
AtomicInteger threadNumber = new AtomicInteger(0);
return new ThreadPoolExecutor(
// 核心线程数
defaultThreadNum(),
// 最大线程数(这里我们使用I/O密集型)
defaultThreadNum(),
// 空闲线程存活时间 15s
15L,
// 空闲线程存活时间单位
TimeUnit.SECONDS,
// 工作队列(使用数组型阻塞队列)
new ArrayBlockingQueue<>(1024),
// 创建线程工厂
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
// 为工厂创建的每一个线程的格式化名字 threadName-No.[0,1,2,3...]
return new Thread(r, threadName + "-No." + threadNumber.getAndIncrement());
}
},
// 拒绝策略,自定义
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 如果线程池没有关闭
if (!executor.isShutdown()) {
// 尝试将当前任务添加到任务队列中
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
// 保持线程的中断状态
Thread.currentThread().interrupt();
}
}
}
}
);
}
/**
* 默认的线程数,使用2倍的cpu核心数
*
* 核心线程数分配
* CPU密集型: 核心线程数 = CPU核心数(或核心线程数 = CPU核心数 + 1)
* I/O密集型: 核心线程数 = 2 * CPU核心数(或核心线程数 = CPU核心数 / (1 - 阻塞系数))
* 混合型: 核心线程数 = (线程等待时间 / 线程CPU时间 + 1) * CPU核心数
*
* 最大线程数分配
* IO密集型经验应用,最大线程设置为 2N+1 (N为CPU数量,下同)
* CPU密集型应用,最大线程设置为 N+1
*
* 线程数 = CPU 核心数 * (1 + IO 耗时/ CPU 耗时)
*/
public static int defaultThreadNum() {
return Runtime.getRuntime().availableProcessors() * 2;
}
}
合理设计线程池大小
CPU 密集型任务
比如像加解密,压缩、计算等一系列需要大量耗费 CPU 资源的任务,大部分场景下都是纯 CPU 计算。IO 密集型任务:比如像 MySQL 数据库、文件的读写、网络通信等任务,这类任务不会特别消耗 CPU 资源,但是 IO 操作比较耗时,会占用比较多时间。在知道如何判断任务的类别后,让我们分两个场景进行讨论:
对于 CPU 密集型计算,多线程本质上是提升多核 CPU 的利用率,所以对于一个 8 核的 CPU,每个核一个线程,理论上创建 8 个线程就可以了
如果设置过多的线程数,实际上并不会起到很好的效果。此时假设我们设置的线程数量是 CPU 核心数的 2 倍,因为计算任务非常重,会占用大量的 CPU 资源,所以这时 CPU 的每个核心工作基本都是满负荷的,而我们又设置了过多的线程,每个线程都想去利用 CPU 资源来执行自己的任务,这就会造成不必要的上下文切换,此时线程数的增多并没有让性能提升,反而由于线程数量过多会导致性能下降
因此,对于 CPU 密集型的计算场景,理论上线程的数量 = CPU 核数就是最合适的,不过通常把线程的数量设置为CPU 核数 +1,会实现最优的利用率
即使当密集型的线程由于偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费,从而保证 CPU 的利用率
IO 密集型任务
对于 IO 密集型任务最大线程数一般会大于 CPU 核心数很多倍,因为 IO 读写速度相比于 CPU 的速度而言是比较慢的,如果我们设置过少的线程数,就可能导致 CPU 资源的浪费。而如果我们设置更多的线程数,那么当一部分线程正在等待 IO 的时候,它们此时并不需要 CPU 来计算,那么另外的线程便可以利用 CPU 去执行其他的任务,互不影响,这样的话在任务队列中等待的任务就会减少,可以更好地利用资源
对于 IO 密集型计算场景,最佳的线程数是与程序中 CPU 计算和 IO 操作的耗时比相关的,《Java并发编程实战》的作者 Brain Goetz 推荐的计算方法如下:
线程数 = CPU 核心数 * (1 + IO 耗时/ CPU 耗时)
通过这个公式,我们可以计算出一个合理的线程数量,如果任务的平均等待时间长,线程数就随之增加,而如果平均工作时间长,也就是对于我们上面的 CPU 密集型任务,线程数就随之减少