个人博客地址:
http://xiaohe-blog.top/index.php/archives/14/
文章目录
- 1. 为什么要使用线程池
- 2. Executor
- 3. ThreadPoolExecutor
- 3.1 七个参数
- 3.2 任务队列
- 3.3 拒绝策略
- 4. 创建线程池
- 5. Executors
- 5.1 CachedThreadPool
- 5.2 FixedThreadPool
- 5.3 SingleThreadExecutor
- 5.4 ScheduledThreadPoolExecutor
- 6. ExecutorService
1. 为什么要使用线程池
复习一下创建线程的几种方式:
- 继承Thread
- 实现Runnable
- 实现Callable
但是如果频繁的创建/销毁线程,就会造成资源浪费。这时候就需要将线程创建好之后存起来,以后要用取出来,用完后再放回去。
注意 :线程池并不是一种新的线程创建方法,它也是靠上面的方式创建线程的,只不过它能保存线程,实现线程资源的重复利用。
为什么要学线程池?效率高?牛逼?错,面试喜欢问😁
2. Executor
创建之前首先要来认识一下它 :Executor
。
Executor
是线程池的根接口
但是使用时大部分使用ThreadPoolExecutor
。
3. ThreadPoolExecutor
3.1 七个参数
在创建线程池之前先来看看它的7
个参数:
new ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数类型 | 参数名 | 解释 |
---|---|---|
int | corePoolSize | 核心线程数量 |
int | maximumPoolSize | 最大线程数 |
long | keepAliveTime | 最大空闲时间 |
TimeUnit | unit | 时间单位 |
BlockingQueue | workQueue | 任务队列 |
ThreadFactory | threadFactory | 线程工厂 |
RejectedExecutionHandler | handler | 饱和处理机制 |
-
corePoolSize
线程池核心线程数。
默认情况下核心线程会一直存活,即使处于闲置状态也不会受存
keepAliveTime
限制,除非将allowCoreThreadTimeOut
设置为true
。 -
maximumPoolSize
线程池所能容纳的最大线程数。超过
maximumPoolSize
的线程将被阻塞。最大线程数
maximumPoolSize
不能小于corePoolSize
-
keepAliveTime
临时线程的闲置超时时间。超过这个时间临时线程就会被回收。
即 :核心线程被创建后一直存活,临时线程如果在闲置时间(keepAliveTime)内没有接到新任务会被销毁。
-
TimeUnit
keepAliveTime
的时间单位,如TimeUnit.SECONDS。表示:keepAliveTime
的单位是秒 -
workQueue
线程池中的任务队列。
没有获得线程资源的任务将会被放入
workQueue
,等待线程资源被释放。-
如果核心线程数满了,新来的任务会首先放入任务队列
-
如果任务队列满了,会额外创建临时线程完成任务
-
如果线程数到达最大线程数时任务队列还是满的,多余的任务将由
RejectedExecutionHandler
的拒绝策略进行处理。
常用的有三种队列:
SynchronousQueue
,LinkedBlockingDeque
,ArrayBlockingQueue
。 -
-
threadFactory
提供创建新线程功能的线程工厂。通过
threadFactory
程序员可以自定义线程池中线程的创建方法。ThreadFactory
是一个接口,只有一个newThread
方法:Thread newThread(Runnable r);
-
rejectedExecutionHandler
拒绝策略,无法被线程池处理的任务的处理器。
一般是因为任务数超出了
workQueue
的容量。
3.2 任务队列
常用的三个队列 :
SynchronousQueue
:无界缓冲等待队列。没有容量,是不储存元素的阻塞队列,会直接将任务交给线程,必须等队列中的添加元素表内消费后才能继续添加新的元素。即 :每次只能装一个任务,被线程拿走后才能继续接收任务。使用SynchronousQueue阻塞队列一般要求最大线程数为无界,避免线程拒绝执行操作。
LinkedBlockingDeque
:无界缓冲等待队列。当前执行的线程数量达到核心线程数时,剩余的元素会在阻塞队列里等待,即一次性拿走所有任务,有多少新来的也能装入,等待线程一个一个执行。(该队列默认大小为Integer.MAX_VALUE)
ArrayBlockingQueue
:有界缓存等待队列。有容量,可以指定缓存队列的大小。只能装指定容量的任务,被线程拿走后才能继续装。
3.3 拒绝策略
常用的四个拒绝策略 :(它们都是ThreadPoolExecutor的内部类)
AbortPolicy
:丢弃多余任务,抛出异常。默认策略。
CallerRunsPolicy
:不抛弃任务,创建线程池的线程帮忙执行任务(例如main)。
DiscardPolicy
:丢弃新任务,不抛出异常。假如新来5个任务想进入队列,丢弃他们。
DiscardOldestPolicy
:丢弃旧任务,不抛出异常。假如新来5个任务想进入队列,将任务队列中的前五个丢弃,新来的进入队列。
4. 创建线程池
现在就用刚才所学的知识创建一个线程池并让它执行一些任务。
首先创建一个任务类,处理响应任务
public class Task implements Runnable{
public String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("线程: " + name + " 执行任务");
}
}
紧接着就可以创建线程池,这里我们创建一个 :
核心线程数为2,最大线程数为5,存活时间为10s,队列使用ArrayBlockingQueue,拒绝策略使用CallerRunsPolicy(创建线程池的线程帮忙执行多余任务)
让它处理20个任务 :
public class Main {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
5,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(5),
new ThreadPoolExecutor.CallerRunsPolicy()
);
for (int i = 0; i < 20; i++) {
try {
executor.execute(new Task(i + ""));
} catch (Throwable e) {
System.out.println("丢弃任务: " + (i));
}
}
}
}
执行后可以看到: main确实帮忙处理任务了。
但是也可以注意到,任务已经执行结束,但是程序还未关闭,这就证明线程池并不会主动关闭,需要我们调用executor.shutdown()
关闭并回收线程池。
5. Executors
Java提供了Executors
工具类来创建线程池,注意它只是一个工具类,就像Strings是String的工具类、Objects是Object的工具类一样。Executors创建的线程池都是ExecutorService
的实现类。
但是Executors
这种方式创建的线程池是JDK提供的,有很多细节无法把控,所以阿里规范禁止使用Executors
来创建线程池。
【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:
Executors返回的线程池对象的弊端如下:
1)FixedThreadPool和SingleThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
2)CachedThreadPool和ScheduledThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
但是这里依旧要说一下这几种线程池,毕竟是官方封装的。而且后面自己写demo也可以直接用。
这几种线程池也是通过new ThreadPoolExecutor() 创建的,只不过其中的参数不同,所实现的功能也不同。
5.1 CachedThreadPool
Executors.newCachedThreadPool()
可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程数为0,任务队列只能装一个任务。也就是 只要来一个任务就创建一个线程处理它,来多少任务就创建多少线程。这些线程有60s的存活时间,如果接不到新任务就会被回收。
比起手动new线程,这个线程池可以实现线程复用、线程超时回收。但是可能创建太多线程造成资源浪费。
5.2 FixedThreadPool
Executors.newFixedThreadPool(int nThread)
定长线程池,可控制最大并发线程数量,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
核心线程数 = 最大线程数,核心线程数满了之后任务在队列中等待,队列满了之后直接执行拒绝策略,而不是创建临时线程,因为没有线程。
5.3 SingleThreadExecutor
Executors.newSingleThreadExecutor()
单例线程池 :只会用唯一的线程工作,保证所有任务按照FIFL、LIFO优先级执行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
核心线程数 = 最大线程数 = 1,只有一个线程,所有任务都由它处理,任务队列LinkedBlockingQueue可以一次性装很多任务,这些任务排着队等待唯一的线程执行。
5.4 ScheduledThreadPoolExecutor
调度线程池,定长,支持定时及周期性任务执行,延迟任务执行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
------
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
可以指定核心线程数,最大线程数是Integer.MAX_VALUE,可以实现延时执行、周期执行。
延时执行 :
delay 延时时间。
// 参数 : 任务 + 时间 + 时间单位
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
周期执行 :
initialDelay 第一次执行的延时时间
period 每一次执行的间隔时间。
// 参数 : 任务 + 时间 + 时间单位
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
6. ExecutorService
刚刚说了,Executors创建的都是ExecutorService的子类,那么现在来使用一下ExecutorService完成任务。
先来看看它拥有的方法:
- shutdown :任务执行完毕后销毁线程池。
- shutdownNow :立即销毁线程池。
- invokeAll :执行所有任务。
- submit :提交并执行任务。
public class TestExecutorService {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + " 执行");
});
}
System.out.println(executorService.isShutdown());
executorService.shutdown();
System.out.println(executorService.isShutdown());
}
}
执行结果为 :
pool-1-thread-2 执行
pool-1-thread-5 执行
pool-1-thread-6 执行
pool-1-thread-4 执行
pool-1-thread-1 执行
pool-1-thread-3 执行
false
pool-1-thread-8 执行
pool-1-thread-7 执行
pool-1-thread-9 执行
pool-1-thread-4 执行
true