目录
一、线程池基础概念
常见的线程池类型:
创建线程池的例子:
注意事项:
二、线程池使用场景
三、JDK自带的构建线程池的方式
1 newFixedThreadPool
2 newSingleThreadExecutor
3 newCachedThreadPool
4 newScheduleThreadPool
5 newWorkStealingPool
一、线程池基础概念
Java 中的线程池是一种管理执行线程的方式。使用线程池可以有效地管理和控制活动线程的数量,提高系统的响应性和可用性。Java 提供了几个类和接口来帮助创建和管理线程池,这些主要位于 java.util.concurrent
包中。
常见的线程池类型:
-
FixedThreadPool:固定大小的线程池,线程数量固定,可以复用线程。如果一个任务等待的时间超过了最大队列长度,那么这个线程池将会阻塞,直到有线程可用或者拒绝该任务。
-
CachedThreadPool:可缓存的线程池,线程数量不定,但空闲的线程会被终止以减少内存消耗。当有新的任务到达时,已终止的线程会被重新创建并执行任务。
-
SingleThreadExecutor:单个后台线程执行器,只包含一个线程的线程池。所有的任务会在单个线程中顺序执行。
-
ScheduledThreadPool:定时的线程池,支持调度执行任务。它允许延迟初始化线程,并且可以设置固定的周期来重复执行任务。
创建线程池的例子:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExample {
public static void main(String[] args) {
// 创建一个固定大小为5的线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
final int taskId = i;
// 提交任务给线程池
fixedThreadPool.submit(new Runnable() {
@Override
public void run() {
System.out.println("正在处理任务 " + taskId);
}
});
}
// 关闭线程池
fixedThreadPool.shutdown();
// 等待所有任务完成
while (!fixedThreadPool.isTerminated()) {
}
System.out.println("所有任务已完成");
}
}
注意事项:
-
当不再需要线程池时,应该调用
shutdown()
或shutdownNow()
方法来关闭线程池。shutdown()
允许已经提交的任务被执行完毕后关闭线程池,而shutdownNow()
尝试立即停止所有正在执行的任务,并返回尚未执行的任务列表。 -
要合理设置线程池中的线程数,过多的线程可能导致系统资源耗尽,过少则可能无法充分利用系统资源。
-
在设计任务执行策略时,要考虑线程池的饱和度和任务排队机制,以避免因为任务积压而导致性能下降或服务不可用。
二、线程池使用场景
为什么要使用线程池
在开发中,为了提升效率的操作,我们需要将一些业务采用多线程的方式去执行。
比如有一个比较大的任务,可以将任务分成几块,分别交给几个线程去执行,最终做一个汇总就可以了。
比如做业务操作时,需要发送短信或者是发送邮件,这种操作也可以基于异步的方式完成,这种异步的方式,其实就是再构建一个线程去执行。
但是,如果每次异步操作或者多线程操作都需要新创建一个线程,使用完毕后,线程再被销毁,这样的话,对系统造成一些额外的开销。在处理过程中到底由多线程处理了多少个任务,以及每个线程的开销无法统计和管理。
所以咱们需要一个线程池机制来管理这些内容。线程池的概念和连接池类似,都是在一个Java的集合中存储大量的线程对象,每次需要执行异步操作或者多线程操作时,不需要重新创建线程,直接从集合中拿到线程对象直接执行方法就可以了。
JDK中就提供了线程池的类。
在线程池构建初期,可以将任务提交到线程池中。会根据一定的机制来异步执行这个任务。
-
可能任务直接被执行
-
任务可以暂时被存储起来了。等到有空闲线程再来处理。
-
任务也可能被拒绝,无法被执行。
JDK提供的线程池中记录了每个线程处理了多少个任务,以及整个线程池处理了多少个任务。同时还可以针对任务执行前后做一些勾子函数的实现。可以在任务执行前后做一些日志信息,这样可以多记录信息方便后面统计线程池执行任务时的一些内容参数等等……
三、JDK自带的构建线程池的方式
JDK中基于Executors提供了很多种线程池
1 newFixedThreadPool
这个线程池的特别是线程数是固定的。
在Executors中第一个方法就是构建newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
构建时,需要给newFixedThreadPool方法提供一个nThreads的属性,而这个属性其实就是当前线程池中线程的个数。当前线程池的本质其实就是使用ThreadPoolExecutor。
构建好当前线程池后,线程个数已经固定好**(线程是懒加载,在构建之初,线程并没有构建出来,而是随着任务的提交才会将线程在线程池中构建出来)**。如果线程没构建,线程会待着任务执行被创建和执行。如果线程都已经构建好了,此时任务会被放到LinkedBlockingQueue无界队列中存放,等待线程从LinkedBlockingQueue中去take出任务,然后执行。
测试功能效果
public static void main(String[] args) throws Exception {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
threadPool.execute(() -> {
System.out.println("1号任务:" + Thread.currentThread().getName() + System.currentTimeMillis());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
threadPool.execute(() -> {
System.out.println("2号任务:" + Thread.currentThread().getName() + System.currentTimeMillis());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
threadPool.execute(() -> {
System.out.println("3号任务:" + Thread.currentThread().getName() + System.currentTimeMillis());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
2 newSingleThreadExecutor
这个线程池看名字就知道是单例线程池,线程池中只有一个工作线程在处理任务
如果业务涉及到顺序消费,可以采用newSingleThreadExecutor
// 当前这里就是构建单例线程池的方式
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
// 在内部依然是构建了ThreadPoolExecutor,设置的线程个数为1
// 当任务投递过来后,第一个任务会被工作线程处理,后续的任务会被扔到阻塞队列中
// 投递到阻塞队列中任务的顺序,就是工作线程处理的顺序
// 当前这种线程池可以用作顺序处理的一些业务中
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
// 线程池的使用没有区别,跟正常的ThreadPoolExecutor没区别
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
// finalize是当前对象被GC干掉之前要执行的方法
// 当前FinalizableDelegatedExecutorService的目的是为了在当前线程池被GC回收之前
// 可以执行shutdown,shutdown方法是将当前线程池停止,并且干掉工作线程
// 但是不能基于这种方式保证线程池一定会执行shutdown
// finalize在执行时,是守护线程,这种线程无法保证一定可以执行完毕。
// 在使用线程池时,如果线程池是基于一个业务构建的,在使用完毕之后,一定要手动执行shutdown,
// 否则会造成JVM中一堆线程protected void finalize() {
super.shutdown();
}
}
测试单例线程池效果:
public static void main(String[] args) throws Exception {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "," + "111");
});
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "," + "222");
});
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "," + "333");
});
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "," + "444");
});
}
测试线程池使用完毕后,不执行shutdown的后果:
如果是局部变量仅限当前线程池使用的线程池,在使用完毕之后要记得执行shutdown,避免线程无法结束
如果是全局的线程池,很多业务都会到,使用完毕后不要shutdown,因为其他业务也要执行当前线程池
static ExecutorService threadPool = Executors.newFixedThreadPool(200);
public static void main(String[] args) throws Exception {
newThreadPool();
System.gc();
Thread.sleep(5000);
System.out.println("线程池被回收了!!");
System.in.read();
}
private static void newThreadPool(){
for (int i = 0; i < 200; i++) {
final int a = i;
threadPool.execute(() -> {
System.out.println(a);
});
}
threadPool.shutdown();
for (int i = 0; i < 200; i++) {
final int a = i;
threadPool.execute(() -> {
System.out.println(a);
});
}
}
3 newCachedThreadPool
看名字好像是一个缓存的线程池,查看一下构建的方式
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
当第一次提交任务到线程池时,会直接构建一个工作线程
这个工作线程带执行完后,60秒没有任务可以执行后,会结束
如果在等待60秒期间有任务进来,他会再次拿到这个任务去执行
如果后续提升任务时,没有线程是空闲的,那么就构建工作线程去执行。
最大的一个特点,任务只要提交到当前的newCachedThreadPool中,就必然有工作线程可以处理
代码测试效果
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 1; i <= 200; i++) {
final int j = i;
executorService.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":" + j);
});
}
}
4 newScheduleThreadPool
首先看到名字就可以猜到当前线程池是一个定时任务的线程池,而这个线程池就是可以以一定周期去执行一个任务,或者是延迟多久执行一个任务一次
查看一下如何构建的。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
基于这个方法可以看到,构建的是ScheduledThreadPoolExecutor线程池
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor{
//....
}
所以本质上还是正常线程池,只不过在原来的线程池基础上实现了定时任务的功能
原理是基于DelayQueue实现的延迟执行。周期性执行是任务执行完毕后,再次扔回到阻塞队列。
代码查看使用的方式和效果
public static void main(String[] args) throws Exception {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
// 正常执行
// pool.execute(() -> {
// System.out.println(Thread.currentThread().getName() + ":1");
// });
// 延迟执行,执行当前任务延迟5s后再执行
// pool.schedule(() -> {
// System.out.println(Thread.currentThread().getName() + ":2");
// },5,TimeUnit.SECONDS);
// 周期执行,当前任务第一次延迟5s执行,然后每3s执行一次
// 这个方法在计算下次执行时间时,是从任务刚刚开始时就计算。
// pool.scheduleAtFixedRate(() -> {
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println(System.currentTimeMillis() + ":3");
// },2,1,TimeUnit.SECONDS);
// 周期执行,当前任务第一次延迟5s执行,然后每3s执行一次
// 这个方法在计算下次执行时间时,会等待任务结束后,再计算时间
pool.scheduleWithFixedDelay(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + ":3");
},2,1,TimeUnit.SECONDS);
}
至于Executors提供的newSingleThreadScheduledExecutor单例的定时任务线程池就不说了。
一个线程的线程池可以延迟或者以一定的周期执行一个任务。
5 newWorkStealingPool
当前JDK提供构建线程池的方式newWorkStealingPool和之前的线程池很非常大的区别
之前定长,单例,缓存,定时任务都基于ThreadPoolExecutor去实现的。
newWorkStealingPool是基于ForkJoinPool构建出来的
ThreadPoolExecutor的核心点:
在ThreadPoolExecutor中只有一个阻塞队列存放当前任务
ForkJoinPool的核心特点:
ForkJoinPool从名字上就能看出一些东西。当有一个特别大的任务时,如果采用上述方式,这个大任务只能会某一个线程去执行。ForkJoin第一个特点是可以将一个大任务拆分成多个小任务,放到当前线程的阻塞队列中。其他的空闲线程就可以去处理有任务的线程的阻塞队列中的任务
来一个比较大的数组,里面存满值,计算总和
单线程处理一个任务:
/** 非常大的数组 */static int[] nums = new int[1_000_000_000];
// 填充值static{
for (int i = 0; i < nums.length; i++) {
nums[i] = (int) ((Math.random()) * 1000);
}
}
public static void main(String[] args) {
// ===================单线程累加10亿数据================================
System.out.println("单线程计算数组总和!");
long start = System.nanoTime();
int sum = 0;
for (int num : nums) {
sum += num;
}
long end = System.nanoTime();
System.out.println("单线程运算结果为:" + sum + ",计算时间为:" + (end - start));
}
多线程分而治之的方式处理:
/** 非常大的数组 */
static int[] nums = new int[1_000_000_000];
// 填充值
static{
for (int i = 0; i < nums.length; i++) {
nums[i] = (int) ((Math.random()) * 1000);
}
}
public static void main(String[] args) {
// ===================单线程累加10亿数据================================
System.out.println("单线程计算数组总和!");
long start = System.nanoTime();
int sum = 0;
for (int num : nums) {
sum += num;
}
long end = System.nanoTime();
System.out.println("单线程运算结果为:" + sum + ",计算时间为:" + (end - start));
// ===================多线程分而治之累加10亿数据================================
// 在使用forkJoinPool时,不推荐使用Runnable和Callable
// 可以使用提供的另外两种任务的描述方式
// Runnable(没有返回结果) -> RecursiveAction
// Callable(有返回结果) -> RecursiveTask
ForkJoinPool forkJoinPool = (ForkJoinPool) Executors.newWorkStealingPool();
System.out.println("分而治之计算数组总和!");
long forkJoinStart = System.nanoTime();
ForkJoinTask<Integer> task = forkJoinPool.submit(new SumRecursiveTask(0, nums.length - 1));
Integer result = task.join();
long forkJoinEnd = System.nanoTime();
System.out.println("分而治之运算结果为:" + result + ",计算时间为:" + (forkJoinEnd - forkJoinStart));
}
private static class SumRecursiveTask extends RecursiveTask<Integer>{
/** 指定一个线程处理哪个位置的数据 */
private int start,end;
private final int MAX_STRIDE = 100_000_000;
// 200_000_000: 147964900
// 100_000_000: 145942100
public SumRecursiveTask(int start, int end) {
this.start = start;
this.end = end;
}
@Overrideprotected Integer compute() {
// 在这个方法中,需要设置好任务拆分的逻辑以及聚合的逻辑
int sum = 0;
int stride = end - start;
if(stride <= MAX_STRIDE){
// 可以处理任务
for (int i = start; i <= end; i++) {
sum += nums[i];
}
}else{
// 将任务拆分,分而治之。
int middle = (start + end) / 2;
// 声明为2个任务
SumRecursiveTask left = new SumRecursiveTask(start, middle);
SumRecursiveTask right = new SumRecursiveTask(middle + 1, end);
// 分别执行两个任务
left.fork();
right.fork();
// 等待结果,并且获取sum
sum = left.join() + right.join();
}
return sum;
}
}
最终可以发现,这种累加的操作中,采用分而治之的方式效率提升了2倍多。
但是也不是所有任务都能拆分提升效率,首先任务得大,耗时要长。