Java高并发应用开发过程中会频繁的创建和销毁线程,为了节约成本和提升性能,往往会使用线程池来统一管理线程,使用线程池主要有以下几点优势
降低资源消耗:重复利用已创建的线程降低线程创建和销毁造成的消耗
提高响应速度:任务可以不需要等待线程创建就能立即执行
提高线程可管理性:线程池会保持一些基本的线程统计信息,以便对线程进行有效管理,使得能对所接收到的异步任务进行高效调度
1. 使用Executors创建线程
Executors工厂类提供了四种快捷方式创建线程池
方法名称 | 说明 |
---|---|
newSingleThreadExecutor() | 创建只有一个线程的线程池 |
newFixedThreadPool(int nThreads) | 创建固定大小的线程池 |
newCachedThreadPool | 创建一个不限制线程数量的线程池,提交的任务都将立即执行,但空闲线程会得到及时回收 |
newScheduledThreadPool | 创建一个可定期或延时执行任务的线程池 |
1.1. newSingleThreadExecutor
只有一个线程的线程池,用唯一的工作线程来执行任务,可以保证所有任务按照指定顺序执行
该线程池特点如下:
- 单线程化的线程池中的任务是按照提交的次序顺序执行
- 池中的唯一线程的存活时间是无限的
- 当池中的唯一线程正繁忙时,新提交的任务实例会进入内部的阻塞队列中,并且其阻塞队列是无界的
使用场景:任务按照提交次序,一个任务一个任务逐个执行
1.2. newFixedThreadPool
用于创建固定数量的线程池,唯一的参数用于设置池中线程的固定数量
该线程池特点如下:
- 线程池没有达到固定数量时,每次提交一个任务线程池内就创建一个新线程,直到线程达到线程池固定数量
- 线程池的大小一旦达到固定数量会保持不变,如果某个线程因执行异常而结束,那么线程池会补充一个新线程
- 在接收异步任务的执行目标实例时,如果池中的所有线程均在繁忙状态,新任务会进入阻塞队列
使用场景:需要任务长期执行,适用于处理CPU密集型任务
1.3. newCachedThreadPool
创建一个可缓存线程池,如线程池内某些线程无事可做成为空闲线程,可缓存线程池可灵活回收这些空闲线程
该线程池特点如下:
- 在接收新的异步任务target执行目标实例时,如果池内所有线程繁忙,此线程池就会添加新线程来处理任务
- 此线程池不会对线程池大小进行限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小
- 如果部分线程空闲,也就是存量线程的数量超过了处理任务数量,就会回收空闲(60秒不执行任务)线程
使用场景:快速处理突发性强、耗时较短
1.4. newScheduledThreadPool
创建一个可调度线程池,提供延时和周期性任务调度功能
当被调任务的执行时间大于指定的间隔时间时,并不会创建一个新的线程去并发执行这个任务,而是等待前一次调度执行完毕
使用场景:周期性执行任务
1.5. Executors创建线程池存在的问题
FixedThreadPool和SingleThreadPool
创建的线程池工作队列的长度都为Integer.MAX_VALUE,可能会堆积大量的任务,从而导致OOM
CachedThreadPool和ScheduledThreadPool
创建的线程池允许创建的线程数量为Integer.MAX_VALUE,可能会导致创建大量的线程,从而导致OOM
2. 使用ThreadPoolExecutor创建线程池
在实际开发过程中创建线程池都是使用构造器ThreadPoolExecutor去构造工作线程池,源码如下:
public ThreadPoolExecutor(
int corePoolSize,//核心线程数,即使线程空闲也不会回收
int maximumPoolSize,//最大线程数
long keepAliveTime, TimeUnit unit,//线程最大空闲时长
BlockingQueue<Runnable> workQueue,//工作队列
ThreadFactory threadFactory,//新线程产生方式(线程工厂)
RejectedExecutionHandler handler)//拒绝策略
2.1. 核心和最大线程数
线程池执行器将会根据corePoolSize和maximumPoolSize自动维护线程池中的工作线程
当在线程池接收到新任务,并且当前工作线程数少于corePoolSize时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理请求,直到线程数达到corePoolSize
如果当前工作线程数多于corePoolSize数,但小于maximumPoolSize数,那么仅当任务队列已满时才会创建新线程,设置corePoolSize和maximumPoolSize相同,可以创建一个固定大小的线程池
当maximumPoolSize被设置为无界值(Integer.MAX_VALUE)时,线程池可以接收任意数的并发任务
corePoolSize和maximumPoolSize不仅能在线程池构造时设置,也可以调用setCorePoolSize()和setMaximumPoolSize()两个方法进行动态更改
2.2. BlockingQueue
实例用于暂时接收到的异步任务,当线程池核心线程都不可用,那么接收到的目标任务会缓存在该队列中
2.3. keepAliveTime
用于设置池内线程最大空闲时长,超过该时长非核心线程会被回收。如果池在使用过程中提交任务频率变高,也可以调用方法setKeepAliveTime(long, TimeUnit)进行线程存活时长的动态变更,如需防止空闲线程被终止,可以将空闲时长设置为无限大。默认情况下,空闲超时策略仅适用于存在超时corePoolSize线程的情况,若调用了allowCoreThreadTimeOut(boolean)方法,且传入了参数true,则keepAliveTime参数所设置的Idle超时策略也将被应用于核心线程
2.4. 向线程池提交任务的两种方式
调用execute()方法
void execute(Runnable command);
调用submit()方法
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
<T> Future<T> submit(Callable<T> task);
两个方法的区别如下:
- 接收的参数不一样
execute()方法只能接收Runnable类型的参数,而submit()方法可以接收Callable、Runnable两种类型的参数。Callable类型的任务是可以返回执行结果的,而Runnable类型的任务不可以返回执行结果
Runnable和Callable的主要区别为:Callable允许有返回值,Runnable不允许有返回值;Runnable不允许抛出异常,Callable允许抛出异常 - submit()提交任务后会有返回值,而execute()没有
execute()方法主要用于启动任务的执行,submit()方法也用于启动任务的执行,启动之后会返回Future对象,表示一个异步执行实例,通过该异步执行实例去获取结果 - submit()方便Exception处理
execute()方法在启动任务执行后,不关心任务执行过程中可能发生的异常。而通过submit()方法返回的Future对象(异步执行实例),可以进行异步执行过程中的异常捕获
2.5. 线程池任务调度流程
- 当前工作线程数小于核心线程数,优先创建任务线程,而不是从线程队列中获取一个空闲线程
- 当池中任务数大于核心线程池数,新任务加入阻塞队列。核心线程数用完、阻塞队列没满时线程池不会创建新线程
- 当完成任务时,优先从阻塞队列获取下一个任务,直到阻塞队列为空,所有缓存任务执行完成
- 当核心线程数用完,阻塞队列已满,线程池接收到新任务,会创建一个非核心线程,并立即执行新任务
- 当核心线程数用完,阻塞队列已满,线程总数超过maximumPoolSize,线程池会拒绝接收新任务,执行拒绝策略
2.6. ThreadFactory
ThreadFactory线程工厂只要一个方法
Thread newThread(Runnable r);
创建新线程时,可以更改所创建的新线程的名称、线程组、优先级、守护进程状态。使用Executors创建新的线程池时,也可以基于ThreadFactory创建,只需要指定ThreadFactory实例,如果没有指定会使用Executors.defaultThreadFactory默认实例,这样所创建的线程位于同一个ThreadGroup中,具有相同的NORM_PRIORITY(优先级为5),且都为非守护进程状态
2.7. 阻塞队列
阻塞队列为空时会阻塞当前线程的元素获取操作,在一个线程从一个空的阻塞队列中获取元素时线程会被阻塞,直到阻塞队列中有了元素;当队列中有元素后,被阻塞的线程会自动被唤醒。
BlockingQueue是JUC包中的一个接口,常用的实现类有以下几种:
ArrayBlockingQueue:数组实现的有界阻塞队列,队列中的元素按FIFO排序,创建时必须设置大小,线程池任务超过corePoolSize时,任务缓存的数量只能为创建时设置的大小,若该阻塞队列已满,则会为新的任务创建线程,直到线程池中的线程总数大于maximumPoolSize
LinkedBlockingQueue:基于链表实现的阻塞队列,按FIFO排序任务,可以设置容量,默认为Integer.Max_VALUE,当接收的任务数超过corePoolSize时,新任务无限地缓存进该队列,直到资源耗尽,快捷创建线程池有两个方法Executors.newSingleThreadExecutor和Executors.newFixedThreadPool使用了这个队列,队列吞吐量高于ArrayBlockingQueue
PriorityBlockingQueue:具有优先级的无界队列
DelayQueue:无界阻塞延迟队列,基于PriorityBlockingQueue实现,队列中每个元素都有过期时间,当从队列获取元素时,只有已经过期的元素才会出队,队列头部的元素是过期最快的元素,Executors.newScheduledThreadPool所创建的线程池使用此队列
SynchronousQueue:不存储元素的阻塞队列,每个插入操作必须等到另一个线程的调用移除操作,否则插入操作一直处于阻塞状态,吞吐量高于LinkedBlockingQueue,Executors.newCachedThreadPool所创建的线程池使用此队列,不会保存提交的任务,而是直接新建一个线程来执行新来的任务
2.8. 拒绝策略
线程池的缓存队列为有界队列时,如果队列满了,提交新任务到线程池时会被拒绝,任务被拒绝通常有两种情况
- 线程池已经被关闭
- 工作队列已满且maximumPoolSize已满
常见的拒绝策略有如下几种:
AbortPolicy:拒绝策略
如果线程池队列满了,新任务就会被拒绝,且抛出RejectedExecutionException异常,线程池默认拒绝策略
DiscardPolicy:抛弃策略
如果线程池队列满了,新任务就会直接被丢弃,且不会抛出异常
DiscardOldestPolicy:抛弃最早任务策略
如果线程池队列满了,将最早进入队列(队列头)的任务抛弃,从队列中腾出空间,再尝试加入队列
CallerRunsPolicy:调用者执行策略
在新任务被添加进线程池时,如果添加失败,那么提交任务线程会自己去执行该任务,不会使用线程池中的线程去执行新任务
自定义策略
也可以自定义一个拒绝策略,只需要实现RejectedExecutionHandler接口的rejectedExecution方法即可
2.9. 线程池的五种状态
RUNNING:线程池创建之后的初始状态,此状态下可以执行任务
SHUTDOWN:该状态线程池不接受新任务,会将工作队列中的任务执行完毕
STOP:该状态线程池不接受新任务,也不处理工作队列中剩余任务,且将会中断所有工作线程
TIDYING:该状态所有任务都已终止或处理完成,将会执行terminated()钩子方法
TERMINATED:执行完terminated()钩子方法之后的状态
线程池状态转换规则
- 线程池创建之后状态为RUNNING
- 执行shutdown()实例方法,线程池状态从RUNNING转变为SHUTDOWN
- 执行shutdownNow()实例方法,线程池状态从RUNNING转变为STOP
- 线程池状态为SHUTDOWN时,执行hutdownNow()方法会将其状态转变为STOP
- 线程池所有工作线程停止,工作队列清空后,状态从STOP转为TIDYING
- 执行完terminated()钩子方法之后,线程池状态从TIDYING转变为TERMINATED
关闭线程池步骤如下:
- 执行shutdown()方法,拒绝新任务的提交,并等待所有任务有序执行完毕
- 执行awaitTermination(long timeout,TimeUnit unit)方法,指定超时时间,判断是否已经关闭所有任务,线程池关闭完成
- 如果awaitTermination()方法返回false,或被中断,就调用shutDownNow()方法立即关闭线程池所有任务
- 补充执行awaitTermination(long timeout,TimeUnit unit)方法,判断线程池是否关闭完成,如果超时,可以进入循环关闭,循环一定的次数,不断关闭线程池,直到其关闭或循环结束
2.10. 确定线程池线程数
按照任务类型对线程池可以分为三类
IO密集型任务
主要是执行IO操作,由于执行IO操作时间较长,导致CPU利用率不高,CPU常处于空闲状态,Netty的IO读写操作为此类任务
核心线程池数 = CPU核数 * 2
CPU密集型任务
主要执行计算任务,由于响应时间快,CPU一直在运行,CPU利用率高
核心线程数 = CPU核心数 + 1
多出一个线程是为了防止线程偶发的缺页中断,或其他原因导致的任务暂停而带来的影响
混合型任务
既要执行逻辑计算,又要进行IO操作,由于执行IO操作耗时较长,CPU利用率也不高,Web服务器的HTTP请求处理操作为此类任务
核心线程数 = (线程等待时长与线程CPU时长之比 + 1) * CPU核数