1.概念
顾名思义,线程池就是管理一系列线程的资源池,其提供了一种限制和管理线程资源的方式。每个线程池还维护一些基本统计信息,例如已完成任务的数量。
这里借用《Java 并发编程的艺术》书中的部分内容来总结一下使用线程池的好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
线程池一般用于执行多个不相关联的耗时任务,没有多线程的情况下,任务顺序执行,使用了线程池的话可让多个不相关联的任务同时执行。
2.线程池的构造
ThreadPoolExecutor是线程池的核心实现类,在JDK1.5引入,位于java.util.concurrent包,由Doug Lea完成。
- Executors.newFixedThreadPool:创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待。
- Executors.newCachedThreadPool:创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程。
- Executors.newSingleThreadExecutor:创建单个线程数的线程池,它可以保证先进先出的执行顺序。
- Executors.newScheduledThreadPool:创建一个可以执行延迟任务的线程池。
- Executors.newSingleThreadScheduledExecutor:创建一个单线程的可以执行延迟任务的线程池。
- Executors.newWorkStealingPool:创建一个抢占式执行的线程池(任务执行顺序不确定)【JDK 1.8 添加】。
- ThreadPoolExecutor:手动创建线程池的方式,它创建时最多可以设置 7 个参数。
Executor接口
public interface Executor {
// 该接口中只定义了一个Runnable作为入参的execute方法
void execute(Runnable command);
}
查看Executor接口的实现类图
Executor
线程池相关顶级接口,它将任务的提交与任务的执行分离开来ExecutorService
继承并扩展了Executor接口,提供了Runnable、FutureTask等主要线程实现接口扩展ThreadPoolExecutor
是线程池的核心实现类,用来执行被提交的任务ScheduledExecutorService
继承ExecutorService
接口,并定义延迟或定期执行的方法ScheduledThreadPoolExecutor
继承ThreadPoolExecutor
并实现了ScheduledExecutorService
接口,是延时执行类任务的主要实现
1、newCachedThreadPool
创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolExample {
public static void main(String[] args) {
// 创建一个可缓存的线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 提交任务到线程池
for (int i = 0; i < 5; i++) {
final int taskNumber = i;
cachedThreadPool.submit(new Runnable() {
@Override
public void run() {
System.out.println("执行任务: " + taskNumber + " 由线程 " + Thread.currentThread().getName() + " 处理");
try {
// 模拟任务执行时间
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
// 关闭线程池,不接受新任务,已提交的任务继续执行
cachedThreadPool.shutdown();
// 等待所有任务完成
while (!cachedThreadPool.isTerminated()) {
// 可以在此处做其他工作,或者只是等待
}
System.out.println("所有任务已完成");
}
}
运行结果:
执行任务: 1 由线程 pool-1-thread-2 处理
执行任务: 3 由线程 pool-1-thread-4 处理
执行任务: 2 由线程 pool-1-thread-3 处理
执行任务: 4 由线程 pool-1-thread-5 处理
执行任务: 0 由线程 pool-1-thread-1 处理
所有任务已完成
在这个例子中,我们创建了一个可缓存的线程池,并提交了5个任务。每个任务简单地打印出自己的任务编号和执行它的线程名称,然后休眠1秒钟来模拟工作负载。
在提交所有任务后,我们调用`shutdown()`方法来关闭线程池。这将导致线程池不再接受新任务,但是已经提交的任务会继续执行直到完成。然后我们使用一个循环来等待所有任务完成。当所有任务都执行完毕,线程池中的线程如果没有新的任务将会被回收。
2、newFixedThreadPool
创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolExample {
public static void main(String[] args) {
// 创建一个固定大小为5的线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
// 提交任务到线程池
for (int i = 0; i < 10; i++) {
final int taskNumber = i;
fixedThreadPool.submit(() -> {
System.out.println("执行任务: " + taskNumber + " 由线程 " + Thread.currentThread().getName() + " 处理");
try {
// 模拟任务执行时间
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 关闭线程池,不接受新任务,已提交的任务继续执行
fixedThreadPool.shutdown();
// 等待所有任务完成
while (!fixedThreadPool.isTerminated()) {
// 可以在此处做其他工作,或者只是等待
}
System.out.println("所有任务已完成");
}
}
运行结果:
执行任务: 0 由线程 pool-1-thread-1 处理
执行任务: 4 由线程 pool-1-thread-5 处理
执行任务: 3 由线程 pool-1-thread-4 处理
执行任务: 2 由线程 pool-1-thread-3 处理
执行任务: 1 由线程 pool-1-thread-2 处理
执行任务: 5 由线程 pool-1-thread-1 处理
执行任务: 7 由线程 pool-1-thread-2 处理
执行任务: 6 由线程 pool-1-thread-3 处理
执行任务: 8 由线程 pool-1-thread-4 处理
执行任务: 9 由线程 pool-1-thread-5 处理
所有任务已完成
在这个例子中:
- 我们创建了一个固定大小为5的线程池,这意味着最多只有5个线程同时运行。
- 提交了10个任务到线程池,由于线程池的大小限制,最多只有5个任务会同时运行,其余的任务会等待直到有线程空闲出来。
- 使用 `shutdown()` 方法关闭线程池,这表示不再接受新任务,但已提交的任务会继续执行直到完成。
- 使用一个循环等待所有任务完成,这是通过检查 `isTerminated()` 方法实现的,该方法在所有任务执行完毕后返回 `true`。
3、newScheduledThreadPool
创建一个周期性的线程池,支持定时及周期性执行任务。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledThreadPoolExample {
public static void main(String[] args) {
// 创建一个具有固定线程数量的ScheduledThreadPoolExecutor
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(5);
// 定义一个Runnable任务
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("任务执行时间: " + System.currentTimeMillis());
}
};
// 定时执行任务:延迟2秒后开始执行,之后每隔1秒执行一次
scheduledExecutor.scheduleAtFixedRate(task, 2, 1, TimeUnit.SECONDS);
// 定时执行任务:在指定的延迟后执行一次
scheduledExecutor.schedule(new Runnable() {
@Override
public void run() {
System.out.println("一次性任务执行时间: " + System.currentTimeMillis());
}
}, 5, TimeUnit.SECONDS);
// 取消定时任务,5秒后执行的任务不会被执行
// scheduledExecutor.schedule(new Runnable() {...}, 5, TimeUnit.SECONDS).cancel(true);
// 程序运行一段时间后关闭线程池
try {
Thread.sleep(10 * 1000); // 让程序运行10秒
} catch (InterruptedException e) {
e.printStackTrace();
}
scheduledExecutor.shutdown();
}
}
运行结果:
任务执行时间: 1725289960007
任务执行时间: 1725289961008
任务执行时间: 1725289962006
一次性任务执行时间: 1725289963008
任务执行时间: 1725289963008
任务执行时间: 1725289964003
任务执行时间: 1725289965001
任务执行时间: 1725289966000
任务执行时间: 1725289966996
任务执行时间: 1725289967994
在这个例子中,我们创建了一个拥有5个线程的 `ScheduledThreadPoolExecutor`。我们定义了一个简单的 `Runnable` 任务,它会在每次执行时打印当前的时间戳。我们使用 `scheduleAtFixedRate` 方法来周期性地执行这个任务,每隔1秒执行一次,从延迟2秒后开始。我们还使用 `schedule` 方法来安排一个在5秒后只执行一次的任务。
最后,我们在程序运行10秒后关闭线程池,以确保程序能够正常结束。如果需要取消某个定时任务,可以使用 `cancel` 方法。
4、newSingleThreadExecutor
创建一个单线程的线程池,可保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
package Text;
import java.util.concurrent.*;
public class ScheduledThreadPoolExample {
public static void main(String[] args) {
ExecutorService exector =Executors.newSingleThreadExecutor();
//提交任务给线程执行
exector.execute(new Runnable() {
@Override
public void run() {
System.out.println("任务1开始执行");
try{
Thread.sleep(2000);//模拟任务执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务一执行完毕");
}
});
exector.execute(new Runnable() {
@Override
public void run() {
System.out.println("任务2开始执行");
try{
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2执行完毕");
}
});
exector.shutdown();
}
}
运行结果:
任务1开始执行
任务一执行完毕
任务2开始执行
任务2执行完毕
在这个例子中,两个任务将按照它们提交的顺序执行。由于是单线程,所以任务2会在任务1完成后才开始执行。
如果你想按照优先级来执行任务,你可以使用 `PriorityBlockingQueue` 作为线程池的队列,或者在提交任务时使用 `Callable` 接口和 `Future` 类来设置优先级。不过,`newSingleThreadExecutor` 默认不支持优先级队列,它只是简单地按照任务提交的顺序来执行。如果你需要优先级队列,你可能需要自定义线程池或者使用 `PriorityBlockingQueue` 作为队列来创建线程池。
二、通过ThreadPoolExecutor类自定义。
ThreadPoolExecutor类提供了4种构造方法,可根据需要来自定义一个线程池。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 省略...
}
1、共7个参数如下:
(1)corePoolSize:核心线程数,线程池中始终存活的线程数。
(2)maximumPoolSize: 最大线程数,线程池中允许的最大线程数。
(3)keepAliveTime: 存活时间,线程没有任务执行时最多保持多久时间会终止。
(4)unit: 单位,参数keepAliveTime的时间单位,7种可选。
(5)workQueue: 一个阻塞队列,用来存储等待执行的任务,均为线程安全,7种可选。
较常用的是LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
(6)threadFactory: 线程工厂,主要用来创建线程,默及正常优先级、非守护线程。
(7)handler:拒绝策略,拒绝处理任务时的策略,4种可选,默认为AbortPolicy。
3.线程池的运行原理
说完线程池的核心构造参数,接下来来讲解这些参数在线程池中是如何工作的。
线程池刚创建出来是什么样子呢,如下图:
没错,刚创建出来的线程池中只有一个构造时传入的阻塞队列,里面并没有线程,如果想要在执行之前创建好核心线程数,可以调用 prestartAllCoreThreads 方法来实现,默认是没有线程的。
当有线程通过 execute 方法提交了一个任务,会发生什么呢?
首先会去判断当前线程池的线程数是否小于核心线程数,也就是线程池构造时传入的参数 corePoolSize。
如果小于,那么就直接通过 ThreadFactory 创建一个线程来执行这个任务,如图
当任务执行完之后,线程不会退出,而是会去阻塞队列中获取任务,如下图
接下来如果又提交了一个任务,也会按照上述的步骤去判断是否小于核心线程数,如果小于,还是会创建线程来执行任务,执行完之后也会从阻塞队列中获取任务。
这里有个细节,就是提交任务的时候,就算有线程池里的线程从阻塞队列中获取不到任务,如果线程池里的线程数还是小于核心线程数,那么依然会继续创建线程,而不是复用已有的线程。
如果线程池里的线程数不再小于核心线程数呢?那么此时就会尝试将任务放入阻塞队列中,入队成功之后,如图
这样,阻塞的线程就可以获取到任务了。
但是,随着任务越来越多,队列已经满了,任务放入失败,怎么办呢?
此时会判断当前线程池里的线程数是否小于最大线程数,也就是入参时的 maximumPoolSize 参数
如果小于最大线程数,那么也会创建非核心线程来执行提交的任务,如图
所以,就算队列中有任务,新创建的线程还是会优先处理这个提交的任务,而不是从队列中获取已有的任务执行,从这可以看出,先提交的任务不一定先执行。
假如线程数已经达到最大线程数量,怎么办呢?
此时就会执行拒绝策略,也就是构造线程池的时候,传入的 RejectedExecutionHandler 对象,来处理这个任务。
JDK 自带的 RejectedExecutionHandler 实现有 4 种
- AbortPolicy:丢弃任务,抛出运行时异常
- CallerRunsPolicy:由提交任务的线程来执行任务
- DiscardPolicy:丢弃这个任务,但是不抛异常
- DiscardOldestPolicy:从队列中剔除最先进入队列的任务,然后再次提交任务
线程池创建的时候,如果不指定拒绝策略就默认是 AbortPolicy 策略。
当然,你也可以自己实现 RejectedExecutionHandler 接口,比如将任务存在数据库或者缓存中,这样就可以从数据库或者缓存中获取被拒绝掉的任务了。
到这里,我们发现,线程池构造的几个参数 corePoolSize、maximumPoolSize、workQueue、threadFactory、handler 我们都在上述的执行过程中讲到了,那么还差两个参数 keepAliveTime 和 unit(unit 是 keepAliveTime 的时间单位)没讲到,所以 keepAliveTime 是如何起作用的呢,这个问题留到后面分析。
说完整个执行的流程,接下来看看 execute 方法的代码是如何实现的。
public void execute(Runnable command) {
// 首先检查提交的任务是否为null,是的话则抛出NullPointerException。
if (command == null)
throw new NullPointerException();
// 获取线程池的当前状态(ctl是一个AtomicInteger,其中包含了线程池状态和工作线程数)
int c = ctl.get();
// 1. 检查当前运行的工作线程数是否少于核心线程数(corePoolSize)
if (workerCountOf(c) < corePoolSize) {
// 如果少于核心线程数,尝试添加一个新的工作线程来执行提交的任务
// addWorker方法会检查线程池状态和工作线程数,并决定是否真的添加新线程
if (addWorker(command, true))
return;
// 重新获取线程池的状态,因为在尝试添加线程的过程中线程池的状态可能已经发生变化
c = ctl.get();
}
// 2. 尝试将任务添加到任务队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 双重检查线程池的状态
if (! isRunning(recheck) && remove(command)) // 如果线程池已经停止,从队列中移除任务
reject(command);
// 如果线程池正在运行,但是工作线程数为0,尝试添加一个新的工作线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 如果任务队列满了,尝试添加一个新的非核心工作线程来执行任务
else if (!addWorker(command, false))
// 如果无法添加新的工作线程(可能因为线程池已经停止或者达到最大线程数限制),则拒绝任务
reject(command);
}