文章目录
- 线程池的基本概念
- 创建线程池的注意事项
- 实例1: `newFixedThreadPool` 使用无界队列,可能因任务积压导致 OOM
- 实例2: `newCachedThreadPool` 会创建大量线程,可能因线程数量过多导致无法创建新线程。
- 线程池参数设置的最佳实践
- 线程池默认的工作行为
- 预先启动核心线程
- 监控线程池状态的方法
- 线程池的混用策略
- 线程池的基本概念和使用场景
- 线程池创建时的注意事项,包括手动创建与使用
Executors
类的区别 - 案例分析
newFixedThreadPool
和newCachedThreadPool
可能引发的问题 - 线程池参数设置的最佳实践
- 监控线程池状态
- 线程池的混用策略
线程池的基本概念
在程序中,我们会用各种池化技术来缓存创建昂贵的对象,比如线程池、连接池、内存池。一般是预先创建一些对象放入池中,使用的时候直接取出使用,用完归还以便复用,还会通过一定的策略调整池中缓存对象的数量,实现池的动态伸缩
线程池是一种管理线程的机制,通过重用线程来减少创建和销毁线程的开销,适用于处理短平快的任务。线程池的核心组成部分包括核心线程数、最大线程数、工作队列及拒绝策略。
创建线程池的注意事项
在 Java 中,Executors
类提供了快速创建线程池的方法,但在生产环境中,建议手动使用 ThreadPoolExecutor
来创建线程池。这是因为 Executors
提供的某些方法可能导致内存溢出(OOM)等问题。
实例1: newFixedThreadPool
使用无界队列,可能因任务积压导致 OOM
/**
* 触发OOM(OutOfMemoryError)的测试方法
* 通过向固定大小的线程池中提交大量任务,每个任务在执行时生成大量的字符串并保持在内存中
* 直到线程池被关闭,以此来模拟和测试OOM的情况
*
* @throws InterruptedException 如果在等待线程池终止时被中断
*/
@GetMapping("oom1")
public void oom1() throws InterruptedException {
// 创建一个固定大小为1的线程池,以控制并发任务的数量
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
// 打印线程池的统计信息,监控线程池的状态
printStats(threadPool);
// 提交给线程池大量的任务,以模拟高并发的场景
for (int i = 0; i < 100000000; i++) {
// 每个任务在执行时会生成一个很大的字符串,并尝试将其保持在内存中
threadPool.execute(() -> {
// 生成一个由大量字符组成的字符串,以占用大量内存
String payload = IntStream.rangeClosed(1, 1000000)
.mapToObj(__ -> "a")
.collect(Collectors.joining("")) + UUID.randomUUID().toString();
// 使当前任务暂停1小时,模拟长时间运行的任务
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
// 捕获中断异常,但不执行任何操作
}
// 记录生成的字符串,进一步增加内存的使用
log.info(payload);
});
}
// 关闭线程池,不再接受新的任务
threadPool.shutdown();
// 等待线程池中的所有任务完成,或直到指定的超时时间结束
threadPool.awaitTermination(1, TimeUnit.HOURS);
}
/**
* 定期打印线程池的运行统计信息
* 此方法内部创建了一个新的单线程调度器,用于定期执行打印线程池统计信息的任务
* 它提供了线程池大小、活动线程数、已完成任务数和队列中任务数的信息
* 这些信息有助于监控线程池的性能和工作负载
*
* @param threadPool 线程池对象,其统计信息将被打印
*/
private void printStats(ThreadPoolExecutor threadPool) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
// 打印分割线,用于区分不同的统计时间点
log.info("=========================");
// 打印线程池当前的线程数量
log.info("Pool Size: {}", threadPool.getPoolSize());
// 打印当前活动线程的数量
log.info("Active Threads: {}", threadPool.getActiveCount());
// 打印已完成任务的总数
log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount());
// 打印队列中等待执行的任务数量
log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
// 再次打印分割线,结束本次统计信息的打印
log.info("=========================");
}, 0, 1, TimeUnit.SECONDS);
}
newFixedThreadPool
方法线程池的工作队列直接 new 了一个
LinkedBlockingQueue
,而默认构造方法的 LinkedBlockingQueue
是一个Integer.MAX_VALUE
长度的队列,可以认为是无界的
虽然使用 newFixedThreadPool
可以把工作线程控制在固定的数量上,但任务队列是无界的。如果任务较多并且执行较慢的话,队列可能会快速积压,撑爆内存导致 OOM
实例2: newCachedThreadPool
会创建大量线程,可能因线程数量过多导致无法创建新线程。
/**
* 触发OOM(OutOfMemoryError)的测试方法
* 通过创建大量的线程和字符串对象,最终导致内存溢出
* 此方法主要用于演示和测试目的,实际应用中应避免此类设计
*/
@GetMapping("oom2")
public void oom2() throws InterruptedException {
// 创建一个可缓存的线程池,按需(每个任务)创建新线程
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
// 打印线程池的统计信息
printStats(threadPool);
// 循环提交大量任务到线程池
for (int i = 0; i < 100000000; i++) {
// 每个任务生成一个随机UUID字符串,并尝试休眠1小时
threadPool.execute(() -> {
String payload = UUID.randomUUID().toString();
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
// 捕获中断异常,但不执行任何操作
}
// 日志记录生成的UUID字符串
log.info(payload);
});
}
// 关闭线程池,不再接受新任务
threadPool.shutdown();
// 等待线程池中的所有任务完成,最多等待1小时
threadPool.awaitTermination(1, TimeUnit.HOURS);
}
查看newCachedThreadPool
的源码可以看到,这种线程池的最大线程数是Integer.MAX_VALUE
,可以认为是没有上限的,而其工作队列 SynchronousQueue
是一个没有存储空间的阻塞队列。这意味着,只要有请求到来,就必须找到一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的。
由于我们的任务需要 1 小时才能执行完成,大量的任务进来后会创建大量的线程。我们知道线程是需要分配一定的内存空间作为线程栈的,比如 1MB,因此无限制创建线程必然会导致 OOM。
不建议使用 Executors 提供的两种快捷的线程池
-
需要根据自己的场景、并发情况来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合需求,一般都需要设置有界的工作队列和可控的线程数
-
任何时候,都应该为自定义线程池指定有意义的名称,以方便排查问题。当出现线程数量暴增、线程死锁、线程占用大量 CPU、线程执行出现异常等问题时,我们往往会抓取线程栈。此时,有意义的线程名称,就可以方便我们定位问题
除了建议手动声明线程池以外,还建议用一些监控手段来观察线程池的状态。线程池这个组件往往会表现得任劳任怨、默默无闻,除非是出现了拒绝策略,否则压力再大都不会抛出一个异常。如果我们能提前观察到线程池队列的积压,或者线程数量的快速膨胀,往往可以提早发现并解决问题
线程池参数设置的最佳实践
根据应用场景,合理设置以下参数:
- 核心线程数:应根据任务的并发性和执行时间进行调整。
- 最大线程数:应限制线程数量以防止资源耗尽。
- 工作队列:应使用有界队列来防止无穷的任务积压。
- 拒绝策略:根据应用需求选择合适的拒绝策略,例如
AbortPolicy
或CallerRunsPolicy
。
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* 处理 "good" GET 请求的控制器方法
* 该方法演示了如何在Spring MVC环境中使用线程池执行异步任务
* 它创建了一个固定大小的线程池,并提交了多个任务去执行
*
* @return 返回 AtomicInteger 的值,用于跟踪任务的完成数量
* @throws InterruptedException 如果在等待过程中线程被中断
*/
@GetMapping("good")
public int good() throws InterruptedException {
// 使用 AtomicInteger 来跟踪任务的唯一标识
AtomicInteger atomicInteger = new AtomicInteger();
// 创建一个 ThreadPoolExecutor,配置核心线程数、最大线程数、空闲线程存活时间、工作队列等
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2, 5,
5, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").build(),
new ThreadPoolExecutor.AbortPolicy());
// 打印线程池的统计信息,监控线程池状态
printStats(threadPool);
// 生成并提交 20 个任务到线程池
IntStream.rangeClosed(1, 20).forEach(i -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 获取并递增任务ID
int id = atomicInteger.incrementAndGet();
try {
// 提交任务到线程池执行
threadPool.submit(() -> {
log.info("{} started", id);
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
}
log.info("{} finished", id);
});
} catch (Exception ex) {
// 如果任务提交失败,记录错误信息并递减任务ID
log.error("error submitting task {}", id, ex);
atomicInteger.decrementAndGet();
}
});
// 主线程休眠60秒,等待任务完成
TimeUnit.SECONDS.sleep(60);
// 返回完成的任务数量
return atomicInteger.intValue();
}
初始化线程池:
创建一个 AtomicInteger 对象 atomicInteger,用于记录任务ID。
创建一个 ThreadPoolExecutor 对象 threadPool,配置核心线程数为2,最大线程数为5,空闲线程存活时间为5秒,任务队列容量为10,线程工厂设置名称格式,拒绝策略为AbortPolicy。
打印线程池状态:
调用 printStats 方法,每隔1秒打印线程池的当前状态。
提交任务:
- 使用 IntStream.rangeClosed(1, 20) 生成1到20的整数流。
- 每次迭代中,休眠1秒,获取任务ID,尝试提交任务到线程池。
- 任务内容为记录开始日志,休眠10秒,记录结束日志。
- 如果提交失败,记录错误并减少任务计数。
等待和返回结果:
等待60秒后,返回提交任务的总数。
线程池默认的工作行为
-
核心线程数 (
corePoolSize
):- 线程池在没有任务时,会保持
corePoolSize
个线程活跃。 - 当有任务提交时,线程池会首先尝试复用这些核心线程。
- 线程池在没有任务时,会保持
-
任务堆积:
- 如果所有核心线程都在忙碌,后续的任务会被添加到工作队列(如
LinkedBlockingQueue
)中等待处理。 - 这个工作队列的大小是有限的,默认情况下是无界的。
- 如果所有核心线程都在忙碌,后续的任务会被添加到工作队列(如
-
扩容:
- 一旦工作队列满了,线程池会尝试扩容,创建新的工作线程,直到达到
maximumPoolSize
。 maximumPoolSize
是线程池允许的最大线程数。
- 一旦工作队列满了,线程池会尝试扩容,创建新的工作线程,直到达到
-
拒绝策略:
- 如果队列和线程池都已满,线程池会根据预设的拒绝策略(如
AbortPolicy
、CallerRunsPolicy
等)处理新提交的任务。
- 如果队列和线程池都已满,线程池会根据预设的拒绝策略(如
-
线程回收:
- 当线程数大于
corePoolSize
时,如果线程在keepAliveTime
内没有处理新任务,它们会被终止,回收至核心线程数。
- 当线程数大于
以下是ThreadPoolExecutor
构造函数的参数及其影响:
corePoolSize
:核心线程数,保持活跃的最小线程数。maximumPoolSize
:最大线程数,能够创建的最大线程数量。keepAliveTime
:非核心线程闲置时间,超过此时间后将被回收。unit
:时间单位,与keepAliveTime
一起使用。workQueue
:用于存储等待执行任务的队列。handler
:拒绝策略,用于处理无法被执行的任务。
预先启动核心线程
在Java的ThreadPoolExecutor
中,预先启动核心线程意味着在创建线程池时,线程池会立即启动并激活corePoolSize
个核心线程。这可以减少任务到达时的响应延迟,尤其是在预计会有大量任务同时提交的场景下。
相关参数和方法
- 核心线程数 (
corePoolSize
):指定要保持活跃的最小线程数。 allowCoreThreadTimeOut
:如果设置为true
,当核心线程在keepAliveTime
内没有执行任务时,它们会被回收。
预先启动核心线程的方法
可以使用prestartAllCoreThreads()
方法来预先启动所有核心线程:
prestartCoreThread()
:启动单个核心线程(如果核心线程未被创建)。prestartAllCoreThreads()
:启动所有核心线程。
演示如何预先启动核心线程:
import java.util.concurrent.*;
public class PrestartCoreThreadsExample {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize
4, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS, // unit
new LinkedBlockingQueue<>() // workQueue
);
// 预先启动所有核心线程
executor.prestartAllCoreThreads();
// 提交任务
for (int i = 0; i < 5; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("Executing task " + taskId);
try {
Thread.sleep(1000); // 模拟任务执行
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown(); // 关闭线程池
}
}
监控线程池状态的方法
建议在生产环境中添加监控,定期输出线程池的状态信息。可以使用定时任务定期打印线程池的基本信息,如线程数、活跃线程数、完成任务数量等。
/**
* 定期打印线程池的运行统计信息
* 此方法内部创建了一个新的单线程调度器,用于定期执行打印线程池统计信息的任务
* 它提供了线程池大小、活动线程数、已完成任务数和队列中任务数的信息
* 这些信息有助于监控线程池的性能和工作负载
*
* @param threadPool 线程池对象,其统计信息将被打印
*/
private void printStats(ThreadPoolExecutor threadPool) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
// 打印分割线,用于区分不同的统计时间点
log.info("=========================");
// 打印线程池当前的线程数量
log.info("Pool Size: {}", threadPool.getPoolSize());
// 打印当前活动的线程数量
log.info("Active Threads: {}", threadPool.getActiveCount());
// 打印已完成任务的总数
log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount());
// 打印队列中等待执行的任务数量
log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
// 再次打印分割线,结束本次统计信息的打印
log.info("=========================");
}, 0, 1, TimeUnit.SECONDS);
}
线程池的混用策略
要根据任务的“轻重缓急”来指定线程池的核心参数,包括线程数、回收策略和任务队列
-
IO密集型任务:
- 特点:执行时间较长、数量较少,通常涉及网络请求、文件操作等。
- 配置建议:
- 可以增加核心线程数,因为这些任务通常会在等待IO时处于阻塞状态。
- 不需要太大的任务队列,以避免内存消耗过大。
-
计算密集型任务:
- 特点:执行时间短、数量较多,主要涉及CPU计算。
- 配置建议:
- 线程数量应接近CPU核心数或核心数的两倍,以优化CPU资源利用。
- 需要较长的任务队列来处理任务高峰,防止因线程不足导致任务拒绝。
优化策略
-
根据任务特性选择线程池:
- 对于IO密集型任务,使用较大的线程池以处理多任务并发。
- 对于计算密集型任务,使用较小的线程池,避免线程切换的开销。
-
合理设置核心参数:
corePoolSize
:根据任务特性选择适当的线程数。maximumPoolSize
:根据系统资源设置合理的最大线程数。keepAliveTime
:调节非核心线程的存活时间,以便更好地应对任务波动。
-
使用不同类型的线程池:
- 对于短期任务,可以考虑使用
CachedThreadPool
,它会动态创建和回收线程。 - 对于长期任务,可以使用
FixedThreadPool
,确保线程数不变,适合处理稳定负载的任务。
- 对于短期任务,可以考虑使用
CachedThreadPool
与FixedThreadPool
的特性
-
CachedThreadPool
:- 特点:可以动态创建线程,根据需要创建新的线程,空闲的线程会被回收,适合短时间内大量并发任务。
- 优点:灵活性高,能够适应突发的任务需求。
- 缺点:在任务量大且长时间运行时,可能导致资源耗尽或系统负载过高。
-
FixedThreadPool
:- 特点:线程池中线程数固定,适合长期稳定的负载。
- 优点:可控性强,避免了因动态创建线程导致的资源问题。
- 缺点:在任务量大时可能会出现任务被阻塞的情况,因为线程数不够。
适用场景与局限性
-
短期任务:
- 适合使用:
CachedThreadPool
。当任务数量不确定且突发性强时,CachedThreadPool
能够快速响应并执行任务。 - 注意事项:应监控系统资源,以防因过多线程创建导致内存或CPU负担过重。
- 适合使用:
-
长期任务:
- 适合使用:
FixedThreadPool
。对于稳定负载的情况,FixedThreadPool
可以确保线程数不变,提升执行效率。 - 注意事项:线程数过少可能会导致任务排队,线程数过多可能会增加上下文切换的开销。
- 适合使用: