1、ThreadPoolTaskExecutor 创建线程池
从它的创建和使用说起,创建和使用的代码如下:
创建:
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("ExecutorPool-");
executor.initialize();
return executor;
使用:
threadPoolTaskExecutor.execute(() -> {
//do something...
});
上面创建线程池使用的是ThreadPoolTaskExecutor ,点击进入executor.initialize()
方法:
ExecutorConfigurationSupport.java
/**
* Set up the ExecutorService.
*/
public void initialize() {
if (logger.isInfoEnabled()) {
logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (!this.threadNamePrefixSet && this.beanName != null) {
setThreadNamePrefix(this.beanName + "-");
}
this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}
再进入initializeExecutor方法:
ThreadPoolTaskExecutor.java
/**
* Note: This method exposes an {@link ExecutorService} to its base class
* but stores the actual {@link ThreadPoolExecutor} handle internally.
* Do not override this method for replacing the executor, rather just for
* decorating its {@code ExecutorService} handle or storing custom state.
*/
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
Runnable decorated = taskDecorator.decorate(command);
if (decorated != command) {
decoratedTaskMap.put(decorated, command);
}
super.execute(decorated);
}
};
}
else {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
}
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
this.threadPoolExecutor = executor;
return executor;
}
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,
maxPoolSize,
keepAliveSeconds,
TimeUnit.SECONDS,
queue,
threadFactory,
rejectedExecutionHandler);
可以看到new ThreadPoolExecutor()
,ThreadPoolTaskExecutor 本质是使用ThreadPoolExecutor,那为何要存在ThreadPoolTaskExecutor?
ThreadPoolTaskExecutor的方法基本都是围绕如何创建ThreadPoolExecutor,安全有效设置各种参数,添一些行为等。
如下面设置MaxPoolSize(最大线程池数),它考虑到并发同步、threadPoolExecutor 是否为null的问题。
ThreadPoolTaskExecutor.java
/**
* Set the ThreadPoolExecutor's maximum pool size.
* Default is {@code Integer.MAX_VALUE}.
* <p><b>This setting can be modified at runtime, for example through JMX.</b>
*/
public void setMaxPoolSize(int maxPoolSize) {
synchronized (this.poolSizeMonitor) {
this.maxPoolSize = maxPoolSize;
if (this.threadPoolExecutor != null) {
this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
}
}
}
再如:
设置队列时,它会创建一个安全的队列,有合适大小的LinkedBlockingQueue或者没有大小的SynchronousQueue。从而避免使用newSingleThreadExecutor这类创建一个不安全的等待队列。
ThreadPoolTaskExecutor.java
/**
* Create the BlockingQueue to use for the ThreadPoolExecutor.
* <p>A LinkedBlockingQueue instance will be created for a positive
* capacity value; a SynchronousQueue else.
* @param queueCapacity the specified queue capacity
* @return the BlockingQueue instance
* @see java.util.concurrent.LinkedBlockingQueue
* @see java.util.concurrent.SynchronousQueue
*/
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
if (queueCapacity > 0) {
return new LinkedBlockingQueue<>(queueCapacity);
}
else {
return new SynchronousQueue<>();
}
}
newSingleThreadExecutor方式创建的等待队列中的LinkedBlockingQueue容量是Integer.MAX_VALUE
Executors.java
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
LinkedBlockingQueue.java
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
ThreadPoolTaskExecutor的父类ExecutorConfigurationSupport的方法可以设置BeanName、ThreadNamePrefix等。
总结来说在spring项目中使用ThreadPoolTaskExecutor创建线程池是首推使用的。
2、ThreadPoolExecutor解读
2.1 基本使用介绍
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,
maxPoolSize,
keepAliveSeconds,
TimeUnit.SECONDS,
queue,
threadFactory,
rejectedExecutionHandler);
-
corePoolSize:线程池的核心线程数,定义了最小可以同时运行的线程数量。
-
maximumPoolSize:线程池的最大线程数。队列中存放的任务达到队列容量时,可以同时运行的线程数量变为最大线程数。
-
keepAliveTime:当线程池中的线程数量大于corePoolSize时,如果没有新任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了KeepAliveTime才会被回收销毁。
-
unit:keepAliveTime参数的时间单位,包括DAYS、HOURS、MINUTES、MILLISECONDS等。
-
workQueue:用于保存等待执行任务的阻塞队列。可以选择以下集个阻塞队列:
- ArrayBlockingQueue:是一个基于数组结构的阻塞队列,此队列按FIFO原则对元素进行排序;
- LinkedBlockingQueue:是一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列;
- SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量常高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()使用了这个队列;
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列;
-
threadFactory:用于设置创建线程的工厂,可以通过工厂给每个创造出来的线程设置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字:new ThreadFactoryBuilder().setNameFormat(“XX-task-%d”).build();7)handler:饱和策略。若当前同时运行的线程数量达到最大线程数量并且队列已经被放满,ThreadPoolExecutor定义了一些饱和策略:
- ThreadPoolExecutor.AbortPolicy:直接抛出RejectedExecutionException异常来拒绝处理新任务;
- ThreadPoolExecutor.CallerRunsPolicy:只用调用者所在的线程来运行任务,会降低新任务的提交速度,影响程序的整体性能;
- ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉;
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列中最近的一个任务,执行当前任务;
处理流程:
1.在创建了线程池之后,等待提交过来的任务请求
2.当调用execute()方法添加一个请求任务的时候,线程池会做出如下判断:
2.1 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个程序
2.2 如果正在运行的线程数量大于或者等于corePoolSize,那么将这个任务放入队列
2.3 如果这个时候队列满了并且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻执行这个任务
2.4 如果队列满了并且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行
3. 当一个线程完成任务的时候,它会从队列中取下一个任务来执行
4. 当一个线程无事可做超过一定时间(keepAliveTime)时:线程会判断如果当前运行的线程数大于corePoolSize,那么这个线程就会被停掉。
最经典的比喻是银行办理业务,可以很形象的去理解线程池七个核心参数之间的关系:
银行刚开始上班,然后又一位需要办理业务的顾客1进来,柜员1就来一个帮这位顾客1办理,其他在吃早餐摸鱼。这位顾客1办理完了业务就走了,柜员1坐在柜台上静等。
这时又有一位顾客2过来,这时不是柜员1去接待,而是叫上柜员2,因为这天安排上班的两位柜员(corePoolSize=2)。
顾客3来办理业务,因为上班的柜员已经全部就位了,柜员1有空,柜员1就去接待顾客3。
顾客4来办理业务,因为柜员1和柜员2都在忙,所以顾客4在凳子上静等。柜员2帮顾客2办理完了,就去给顾客4办理。这时凳子又空出3张(Queue容量为3)。
顾客5、顾客6、顾客7同时过来,柜员1和柜员2都在忙,顾客5、顾客6、顾客7就坐满了凳子。
过了一会,顾客8来了,柜员1和柜员2在忙同时没有凳子坐了,本着顾客就是上帝的原则,打电话叫来一个在休息的柜员3帮顾客8办理业务。
生意兴隆,顾客9来了,打电话叫来一个在休息的柜员4帮顾客9办理业务。
这是顾客10来了,一进门,柜员没有了,连休息中的也没有了(maximumPoolSize=4),银行直接赶走他。
过了一段时间,银行的客户都办完业务走了,那两位本来应该休息的柜员说:我们再等一下(keepAliveTime),不忙了我们就撤了。
看上面的场景中,搬救兵请休息中的柜员回来干活是很麻烦的,要先坐满凳子再说。线程池中也是,要等等待队列满了以后才会去创建核心线程数之外的线程,因为创建线程的花销是很大的。