通过之前的一篇文章,我们总结了Executor框架。而在Executor框架中,ThreadPoolExecutor 是最核心的类。
ThreadPoolExecutor 看字面意思,是线程池的执行器。我们本篇文章就基于ThreadPoolExecutor 这个类来展开总结线程池。
下篇文章会从源码的角度解析ThreadPoolExecutor原理。
一、ThreadPoolExecutor类分析
1、构造方法
构造方法源码如下:
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数,可同时运行的最大线程数
long keepAliveTime,//除核心线程数之外的,空闲下来的线程的存活时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//装任务的队列,存储等待执行的任务
ThreadFactory threadFactory,//线程工厂,可自定义一个产生线程的类
RejectedExecutionHandler handler) {//拒绝策略
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
我们做个详解:
corePoolSize : 核心线程数,就是可同时运行的最小数量,不会被销毁的线程数,空闲时不还给操作系统。
maximumPoolSize:最大线程数,任务有一个等待队列,当这个队列满了,会启动除核心线程之外的线程,而当前可启动的最大线程数就是这个参数值了。
workQueue:任务队列,如果新来了任务,先判断当前运行的线程数量是否达到核心线程数,没达到就执行,如果达到的话,新任务就会被存放在队列中。
keepAliveTime :当线程池中的线程数量大于核心线程数的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了这个时间才会被回收销毁;
unit:keepAliveTime的时间单位;
threadFactory:生产线程的类;
handler:拒绝策略,如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolExecutor会员拒绝策略。
拒绝策略有以下四种:
AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。(抛异常拒绝)
DiscardPolicy :不处理新任务,直接丢弃掉。(不处理)
DiscardOldestPolicy:丢掉最早的未处理的任务。(丢队列最前端的任务)
CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。(执行此任务)当最大池被填满时,此策略为我们提供可伸缩队列。
2、ThreadPoolExecutor使用
示例代码如下:
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
//创建任务
Callable<String> callable = () -> Thread.currentThread().getName();
//线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,
10,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy());
//提交并执行任务
for (int i = 0; i < 10; i++) {
Future<String> submit = executor.submit(callable);
String s = submit.get(3, TimeUnit.SECONDS);
System.out.println(s);
}
//关闭线程池
executor.shutdown();
}
运行结果如下:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
二、常见线程池
1、FixedThreadPool(指定线程数)
其构造函数源码如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
从源码可以看出,FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被设置为我们自己传递的nThreads参数,所以,除核心线程外没有多余的线程。
《Java 并发编程的艺术》图片如下:
线程数小于nThreads时,来新任务就创建新线程,等于nThreads时,就加入队列等待,然后线程闲下来就立刻从队列中取任务执行。
为什么不推荐使用FixedThreadPool?
因为maximumPoolSize无效,而LinkedBlockingQueue队列的最大值是 Integer.MAX_VALUE,运行中的线程池会一直接受任务,直到队列满了还会接受,极端情况下会造成OOM。
2、SingleThreadExecutor (1个线程)
构造函数源码如下:
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
就核心线程和最大线程就是1。
《Java 并发编程的艺术》图片如下:
和FixedThreadPool一样,但是就1个线程,任务来了就加入队列等待。
为什么不推荐使用SingleThreadExecutor ?
和FixedThreadPool一样,而LinkedBlockingQueue队列的最大值是 Integer.MAX_VALUE,运行中的线程池会一直接受任务,直到队列满了还会接受,极端情况下会造成OOM。
3、CachedThreadPool (线程数最大化)
构造函数代码如下:
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
从代码可以看出,其没有核心线程,来一个任务就创建一个线程(如果之前的线程不闲下来的话),一直创建到Integer.MAX_VALUE为止。极端情况下,这样会导致耗尽 cpu 和内存资源。
《Java 并发编程的艺术》图片如下:
1.首先执行 SynchronousQueue.offer(Runnable task) 提交任务到任务队列。如果当前 maximumPool 中有闲线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则执行下面的步骤 2;(有空闲线程就执行任务)
2.当初始 maximumPool 为空,或者 maximumPool 中没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤 1 将失败,此时 CachedThreadPool 会创建新线程执行任务,execute 方法执行完成;(没有空闲线程就创建线程执行任务)
为什么不推荐使用CachedThreadPool?
允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
4、ScheduledThreadPoolExecutor (定时执行)
主要用来在给定的延迟后运行任务,或者定期执行任务。
构造函数代码如下:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
ScheduledThreadPoolExecutor 使用的任务队列 DelayQueue 封装了一个 PriorityQueue,PriorityQueue会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask 的 time变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask 的 squenceNumber 变量小的先执行)。
运行机制图片如下:
1、当调用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者 scheduleWithFixedDelay() 方法时,会向 ScheduledThreadPoolExecutor 的 DelayQueue 添加一个实现了 RunnableScheduledFuture 接口的 ScheduledFutureTask 。
2、线程池中的线程从 DelayQueue 中获取 ScheduledFutureTask,然后执行任务。
为什么不推荐使用ScheduledThreadPoolExecutor ?
允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。而且在实际项目中应用较少,了解即可。
三、阿里开发手册
《阿里开发手册》华山版的并发编程这一节,有如下规定,参考:
四、线程池大小
1、上下文切换
多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。
概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。
任务从保存到再加载的过程就是一次上下文切换。
简单的公式:
CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。