项目实战-我们应该如何正确得创建线程池?
对于现在而言多线程编程已经成为程序员必备的职业技能了,在开发实践过程中,你是否也遇到过相关多线程问题,比如创建多少线程才是合适的?线程池该如何创建?今天我们就来盘一盘“线程池”这个多线程编程中最重要的话题,让你知道我们在项目中应该如何正确创建使用线程池。
1、什么是线程池?
线程池(ThreadPool)是一种基于池化思想管理和使用线程的机制。它是将多个线程预先存储在一个“池子”内,当有任务出现时可以避免重新创建和销毁线程所带来性能开销,只需要从“池子”内取出相应的线程执行对应的任务即可。
线程池的优势主要体现在以下 4 点:
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
同时阿里巴巴在其《Java开发手册》中也强制规定:线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
2、如何创建线程池?
线程池的创建方法总共有 7 种,但总体来说可分为 2 类:
- 一类是通过
ThreadPoolExecutor
创建的线程池; - 另一个类是通过
Executors
创建的线程池。
阿里巴巴《Java开发手册》【强制要求】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。因此本文主要介绍 ThreadPoolExecutor 创建方式。
说明:Executors 返回的线程池对象的弊端如下:
- FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
- CachedThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
ThreadPoolExecutor 最多可以设置 7 个参数,如下代码所示:
ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
7 个参数代表的含义如下:
-
corePoolSize :表示线程池保有的最小线程数。有些项目很闲,但是也不能把人都撤了,至少要留 corePoolSize 个人坚守阵地。
-
maximumPoolSize:表示线程池创建的最大线程数。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到 maximumPoolSize 个人。当项目闲下来时,就要撤人了,最多能撤到 corePoolSize 个人。
-
keepAliveTime & unit:上面提到项目根据忙闲来增减人员,那在编程世界里,如何定义忙和闲呢?很简单,一个线程如果在一段时间内,都没有执行任务,说明很闲,keepAliveTime 和 unit 就是用来定义这个“一段时间”的参数。也就是说,如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。
-
workQueue:一个阻塞队列,用来存储线程池等待执行的任务,均为线程安全。
较常用的是 LinkedBlockingQueue: 一个由链表结构组成的有界阻塞队列;
SynchronousQueue:一个不存储元素的阻塞队列,即直接提交给线程不保持它们 。
-
threadFactory:通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。
-
handler:通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。
ThreadPoolExecutor 已经提供了以下 4 种策略。
- CallerRunsPolicy:提交任务的线程自己去执行该任务。
- AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException 。
- DiscardPolicy:直接丢弃任务,没有任何异常抛出。
- DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
3、线程池的执行流程
ThreadPoolExecutor 关键节点的执行流程如下:
- 当线程数小于核心线程数时,创建线程。
- 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
- 当线程数大于等于核心线程数,且任务队列已满:若线程数小于最大线程数,创建线程;若线程数等于最大线程数,抛出异常,拒绝任务。
线程池的执行流程如下图所示:
4、创建多少线程合适?
创建多少线程合适,要看多线程具体的应用场景。
对于 CPU 密集型计算,多线程本质上是提升多核 CPU 的利用率,所以对于一个 4 核的 CPU,每个核一个线程,理论上创建 4 个线程就可以了,再多创建线程也只是增加线程切换的成本。所以,对于 CPU 密集型的计算场景,理论上“线程的数量 =CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率**。
对于 I/O 密集型的计算场景,比如,如果 CPU 计算和 I/O 操作的耗时是 1:1,那么 2 个线程是最合适的。如果 CPU 计算和 I/O 操作的耗时是 1:2,那多少个线程合适呢?是 3 个线程,CPU 在 A、B、C 三个线程之间切换,对于线程 A,当 CPU 从 B、C 切换回来时,线程 A 正好执行完 I/O 操作。这样 CPU 和 I/O 设备的利用率都达到了 100%。
对于 I/O 密集型计算场景,最佳的线程数是与程序中 CPU 计算和 I/O 操作的耗时比相关,总结公式如下:
最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]
对于 I/O 密集型计算场景,I/O 耗时和 CPU 耗时的比值是一个关键参数,不幸的是这个参数是未知的,而且是动态变化的,所以工程上,我们要估算这个参数,然后做各种不同场景下的压测来验证我们的估计。
5、线程池实践应用示例
Spring boot 项目实践中,使用多个线程池实现实现任务的线程池隔离,统一创建及管理。
ThreadPoolExecutor 这个类是JDK中的线程池类,继承自Executor,Spring默认的线程池simpleAsyncTaskExecutor,但是Spring更加推荐我们开发者使用ThreadPoolTaskExecutor类来创建线程池,其本质是对 ThreadPoolExecutor 的包装。
5.1 为什么要使用多个线程池?
使用多个线程池,把相同的任务放到同一个线程池中,可以起到隔离的作用,避免有线程出错时影响到其他线程池。例如只有一个线程池时,有两种任务,下单,处理图片,如果线程池被处理图片的任务占满,影响下单任务的进行。
5.2 Java代码
示例一:配置要使用的线程池,按业务类型区分开,
// ThreadPoolConfig.java
@Configuration
@EnableAsync
public class ThreadPoolConfig {
//用来生成缩略图的线程池
@Bean(name = "imageThreadPool")
public ThreadPoolTaskExecutor imageThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数,它是可以同时被执行的线程数量
executor.setCorePoolSize(2);
// 设置最大线程数,缓冲队列满了之后会申请超过核心线程数的线程
executor.setMaxPoolSize(10);
// 设置缓冲队列容量,在执行任务之前用于保存任务
executor.setQueueCapacity(50);
// 设置线程生存时间(秒),当超过了核心线程出之外的线程在生存时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
// 设置线程名称前缀
executor.setThreadNamePrefix("imagePool-");
// 设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
//初始化
executor.initialize();
return executor;
}
//用来发邮件的线程池
@Bean(name = "emailThreadPool")
public ThreadPoolTaskExecutor emailThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数,它是可以同时被执行的线程数量
executor.setCorePoolSize(2);
// 设置最大线程数,缓冲队列满了之后会申请超过核心线程数的线程
executor.setMaxPoolSize(10);
// 设置缓冲队列容量,在执行任务之前用于保存任务
executor.setQueueCapacity(50);
// 设置线程生存时间(秒),当超过了核心线程出之外的线程在生存时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
// 设置线程名称前缀
executor.setThreadNamePrefix("emailPool-");
// 设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
//初始化
executor.initialize();
return executor;
}
}
// 使用示例:说明:Async注解指定线程池的名字是:emailThreadPool
@Service
public class MailServiceImpl implements MailService {
private Logger logger= LoggerFactory.getLogger(MailServiceImpl.class);
@Resource
private MailUtil mailUtil;
//异步发送html格式的邮件,演示时只是sleep1秒
@Async(value="emailThreadPool")
@Override
public void sendHtmlMail() {
logger.info("sendHtmlMail begin");
try {
Thread.sleep(2000);
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}
示例二:
# 1. yml配置
# 线程池配置参数
task:
pool:
corePoolSize: 10 # 设置核心线程数
maxPoolSize: 20 # 设置最大线程数
keepAliveTime: 300 # 设置空闲线程存活时间(秒)
queueCapacity: 100 # 设置队列容量
threadNamePrefix: "-asynnotify-" # 设置线程名称前缀
awaitTerminationSeconds: 60 # 设置线程池等待终止时间(秒)
// 2. 线程配置属性类
@Data
@ConfigurationProperties(prefix = "task.pool")
public class TaskThreadPoolConfig {
private int corePoolSize;
private int maxPoolSize;
private int keepAliveSeconds;
private int queueCapacity;
private int awaitTerminationSeconds;
private String threadNamePrefix;
}
// 3. 开启异步线程支持
//开启异步线程支持
@EnableAsync
@SpringBootApplication
public class AsyncThreadApplication {
public static void main(String[] args) {
SpringApplication.run(AsyncThreadApplication.class, args);
}
}
// 4. 创建自定义线程池配置类
@Configuration
public class AsyncScheduledTaskConfig {
@Autowired
private TaskThreadPoolConfig config;
/**
* 1.这种形式的线程池配置是需要在使用的方法上面添加@Async("customAsyncThreadPool")注解的
* 2.如果在使用的方法上不添加该注解,那么spring就会使用默认的线程池
* 3.如果添加@Async注解但是不指定使用的线程池,又想自己自定义线程池,那么就可以重写spring默认的线程池
*/
@Bean("customAsyncThreadPool")
public Executor customAsyncThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//最大线程数
executor.setMaxPoolSize(config.getMaxPoolSize());
//核心线程数
executor.setCorePoolSize(config.getCorePoolSize());
//任务队列的大小
executor.setQueueCapacity(config.getQueueCapacity());
//线程池名的前缀
executor.setThreadNamePrefix(config.getThreadNamePrefix());
//允许线程的空闲时间30秒
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
//设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
executor.setWaitForTasksToCompleteOnShutdown(true);
//设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
executor.setAwaitTerminationSeconds(config.getAwaitTerminationSeconds());
/**
* 特殊说明:
* 1.这里是示例,拒绝策略咱们采用抛出异常
* 2.真实业务场景会把缓存队列的大小会设置大一些,
* 如果,提交的任务数量超过最大线程数量或将任务环缓存到本地、redis、mysql中,保证消息不丢失
* 3.如果项目比较大的话,异步通知种类很多的话,建议采用MQ做异步通知方案
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//线程初始化
executor.initialize();
return executor;
}
}