一、阿里Java开发规范,为啥禁止直接使用Executors创建线程池
- newFixdThreadPool 及 singleThreadPool 中默认队列长度为 Integer.MAX_VALUE,如果线程执行比较耗时,执行任务的线程在队列中产生大量堆积,进而有导致虚拟机OOM 的风险。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//默认长度 Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
- newCachedThreadPool 允许创建最大线程数量为 Integer.MAX_VALUE,当大量任务需要执行时,可能会导致大量线程的创建,每个线程默认占1M,线程占据资源过多或队列中线程产生堆积,导致 CPU 过高或 java虚拟机 OOM 的问题 。
// 最大线程数 默认为:Integer.MAX_VALUE
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 使用ThreadPoolExecutor 来构造线程池, 建议明确指定线程池的核心参数:最小线程数、最大线程数,队列长度、拒绝策略等等,确保线程池运稳定运行。
二、如何合理配置线程池的大小
- CPU 密集型: 主要执行计算任务,响应时间较快,任务 cpu 的利用率很高。线程数的配置由服务器 CPU 核心数来决定, 建议CPU 核心数=同时最大执行线程数,如 CPU 核心数为 8,表明服务器最多能同时执行 8个线程。创建过多的线程反而会导致线程之间上下文切换,降低执行效率。推荐:线程池的最大线程数= cpu 核心数+1。
- IO 密集型: 主要进行 IO 操作,执行 IO 操作(文件读写、网络通信等)相对比较耗时,cpu相对处于空闲状态, 导致 cpu 的利用率不高,建议可以增加线程的数量。IO密集型操作建议结合线程的等待时长来做判断,等待时间越长,线程数相对配置的越多。一般建议配置 cpu 核心数的 2 倍。 基础公式:线程池设定最佳线程数目 = ((线程池配置的线程等待时间+线程 CPU 时间)/ 线程 CPU 时间 )* CPU 数目 。公式中线程 cpu等待时间是计算出程序中单个线程在 cpu 上运行的时间。
三、线程池中的线程的初始化
线程池创建后,线程池默认没有初始化,简而言之就是线程池中没有线程,在提交任务之后才会创建线程。 在并发系统中,为提高系统的吞吐量,建议在线程池创建之后立即创建线程。通过以下两个方法可以实现线程的初始化:
ExecutorService executorService = new ThreadPoolExecutor(10,20,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
ThreadPoolExecutor threadPoolExecutor=(ThreadPoolExecutor)executorService;
//方式1 初始化所有核心线程
threadPoolExecutor.prestartAllCoreThreads();
//方式2 初始化一个核心线程
threadPoolExecutor.prestartCoreThread();
四、线程池的关闭
- shutdown():不会立即终止线程池,等所有任务队列中的任务都执行完后才终止,同时也不会接受新的任务 。
- shutdownNow():立即终止线程池,并尝试打断正在执行的任务且清空任务队列,返回未执行的任务。
五、线程池核心参数支持动态调整及线程池监控
- setCorePoolSize:设置核心线程数
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
- setMaximumPoolSize:设置线程池最大线程数目
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
- beforeExecute: 获取执行之前的核心参数
- afterExecute: 获取执行之后的核心参数
protected void beforeExecute(Thread t, Runnable r) {
BlockingQueue<Runnable> blockingQueue = getQueue();
log.info("执行前! 线程池名称:{},初始线程数:{},核心线程数:{},活跃的任务数量:{},已经执行的任务数:{},任务总数:{},允许最大的线程数:{}," +
"线程允许空闲时间:{},队列的容量:{}",name,getPoolSize(),getCorePoolSize(),getActiveCount(),getCompletedTaskCount(),
getTaskCount(),getMaximumPoolSize(),getKeepAliveTime(TimeUnit.MILLISECONDS),(blockingQueue.size() + blockingQueue.remainingCapacity()));
}
/**
* @description: 线程监控使用
* @param: r
* @param: t
* @return: void
* @author:
* @date: 2022/1/17 16:58
*/
protected void afterExecute(Runnable r, Throwable t) {
BlockingQueue<Runnable> blockingQueue = getQueue();
log.info("执行后! 线程池名称:{},初始线程数:{},核心线程数:{},活跃的任务数量:{},已经执行的任务数:{},任务总数:{},允许最大的线程数:{}," +
"线程允许空闲时间:{},队列的容量:{}",name,getPoolSize(),getCorePoolSize(),getActiveCount(),getCompletedTaskCount(),
getTaskCount(),getMaximumPoolSize(),getKeepAliveTime(TimeUnit.MILLISECONDS),(blockingQueue.size() + blockingQueue.remainingCapacity()));
}
六、推荐开源线程池监控 Hippo4j
- 动态可观测线程池框架,为业务系统提高线上运行保障能力
官网:Hippo4j | Hippo4j