线程池
- 1.线程池基本概念(了解)
- 1.1 什么是线程池
- 1.2 为什么使用线程池
- 1.3 线程池的优势
- 2.创建池的方式
- 3.线程池的工作原理(重点)
- 3.1 线程池的七大参数
- 3.2 线程池的四种拒绝策略
- AbortPolicy
- CallerRunsPolicy
- DiscardPolicy
- DiscardOldestPolicy
- 3.3 任务队列
- 4. 自定义线程池(代码实现)
1.线程池基本概念(了解)
1.1 什么是线程池
线程池其实就是一种多线程的处理方式,处理过程中可以将任务添加到队列里,然后在创建线程后自动启动这些任务,这里的线程就是之前介绍到的线程,这里所说的任务就是实现了Runnable
和Callable
接口的实例对象.
1.2 为什么使用线程池
使用线程池最大的原因就是可以根据系统的需求和硬件环境,灵活的控制线程的数量,且可以对所有线程进行统一的管理和控制,从而提高系统的运行效率,降低系统的运行压力…
1.3 线程池的优势
(1) 降低资源消耗: 通过重复利用已创建的线程降低创建和销毁的消耗.
(2) 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就可以立即执行.
举例:假如创建线程用的时间为T1,执行任务用的时间为T2,销毁线程用的时间为T3,那么使用线程池就免去了T1和T3的时间;
(3) 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建线程,不仅会消耗资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控.
(4)重用性: 线程和任务是分离的,提高线程的重用性.
2.创建池的方式
序号 | 创建方式 | 解析 |
---|---|---|
A | Executors.newSingleThreadExecutor(); | 创建只有一个线程的线程池 |
B | Executors.newFixedThreadPool(int nThreads); | 创建一个固定大小的线程池,容量为n |
C | Executors.newCachedThreadPool(); | 创建一个可伸缩的线程池,类似于链表上的结点,可以无限多 |
大家注意,像这种与并发有关系的API大多在java.util.concurrent
包里面
- 讲解A — Executors.newSingleThreadExecutor()
创建使用在无界队列上操作的单个工作线程的执行器。(但是请注意,如果这个单线程在关闭之前的执行过程中由于失败而终止,如果需要执行后续任务,将会有一个新的线程取代它。)任务保证按顺序执行,并且在任何给定时间活动的任务不超过一个。
return : 新创建的单线程执行器对象
源码:
public static ExecutorService newSingleThreadExecutor() {
//后面会讲到ThreadPoolExcutor
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, //核心线程数
1,//最大线程数
0L,//等待时间
TimeUnit.MILLISECONDS,//时间单位
new LinkedBlockingQueue<Runnable>()));//阻塞队列
}
- 讲解B — Executors.newFixedThreadPool(int nThreads)
创建一个线程池,该线程池重用在共享无界队列上操作的固定数量的线程。在任何时候,最多有nThreads线程处于活动状态,正在处理任务。如果在所有线程都处于活动状态时提交额外的任务,它们将在队列中等待,直到一个线程可用。如果任何线程在关闭之前的执行过程中由于失败而终止,如果需要执行后续任务,则会有一个新线程取代它的位置。在显式关闭池之前,池中的线程将一直存在。
参数:
nThreads——池中的线程数
return: 新创建的线程池
抛出异常的条件: IllegalArgumentException -如果nThreads <= 0
源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads,//核心线程数
nThreads,//最大线程数(后面讲)
0L,//(先不用管)
TimeUnit.MILLISECONDS,//(不用管)
new LinkedBlockingQueue<Runnable>());//(阻塞队列)
}
- 讲解C — Executors.newCachedThreadPool()
创建一个线程池,该线程池根据需要创建新线程,在以前构造的线程可用时重用它们。这些池通常会提高执行许多短期异步任务的程序的性能。执行调用将重用先前构造的线程(如果可用)。如果没有可用的现有线程,将创建一个新线程并将其添加到池中。60秒内未使用的线程将被终止并从缓存中删除。因此,长时间保持空闲状态的池不会消耗任何资源。
源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,//核心线程数
Integer.MAX_VALUE,//最大线程数
60L,//如果等待了该时间且还没有用到的线程就会释放
TimeUnit.SECONDS,//时间单位
new SynchronousQueue<Runnable>());//阻塞队列
}
大家先不要疑惑为什么返回值要用ExecutorService类接收,后面会讲.
3.线程池的工作原理(重点)
3.1 线程池的七大参数
类型 | 参数名 | 描述 |
---|---|---|
int | corePoolSize | 核心线程池大小 |
int | maximumPoolSize | 最核心大线程池大小 |
long | keepAliveTime | 超时时间 没有人使用会自动释放 |
TimeUnit | unit | 超时单位 |
BlockingQueue | workQueue | 阻塞队列 |
ThreadFactory | threadFactory | 线程工厂,创建线程的,一般不用动 |
RejectedExecutionHandler | handler | 拒绝策略 |
通过下面情景解析这些属性
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//等待时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//阻塞队列
ThreadFactory threadFactory,//
RejectedExecutionHandler handler)//线程池的拒绝策略
//围绕这些参数看下面的故事
ExecutorService pool = new ThreadPoolExecutor(2,//核心线程数
4,//最大线程数
30L,//时间,L表示long类型
TimeUnit.MINUTES,//单位是分钟
new ArrayBlockingQueue<>(2),//阻塞队列
Executors.defaultThreadFactory(),//默认的工厂模式(不重要)
new ThreadPoolExecutor.DiscardPolicy())//(锁拒绝策略,这里选择的是丢弃最新任务)
银行开门后,有一个用户A(任务)来办理业务,此时由于没有客服(线程)接待,老板就把客服A喊来上班,当客服A正在给用户A服务时,又进来一位用户B,由于客服A正在忙,所以就把客服B喊来了.现在客服A,B都在为用户办理业务
此时又来了两位用户,但是客服已经都在忙了,没办法,经理只能从隔壁银行借了两位客服应急,这样才能给这两位新来的用户办理服务.
借来的员工就相当于创建的新线程,但是只能创建两个,因为上面有最大线程数量限制,此时4个员工代表4个线程,正好达到这个界限.
不久又来了两位用户,但是银行经理看见客服都在帮别人办理业务,于是便说,现在办理业务人比较多,您需要坐在凳子(阻塞队列)上稍等会
两个用户坐在凳子上就想当于阻塞队列内put了两个任务,由于阻塞队列容量为2,所以后续也无法容纳更多的用户.
当又来一位用户时,经理一看,现在正在办理业务的加上等待办理业务的已经有6位了,银行已经无法再给更多的人进行业务的办理,此时银行经理只能拒绝给这位用户办理业务.
核心线程数-2;最大核心线程数 - 4;阻塞队列 - 2 可容纳最大请求数 - 6
正式员工两位, 最多员工数量四位,椅子只能放两个,所以这家银行最多可以容纳六个用户.
当处理好所有用户之后,此时又过去了30min,还是没有新来的用户,现在就可以让客服C,D下班了,但是A,B还不可以下班,因为他是这个银行的正式员工.
员工:太惨了QAQ
参数规定30分钟内如果没有新的任务就结束多创建的线程.
30分钟还没有客服,所以给C,D两位借来的员工下班.
3.2 线程池的四种拒绝策略
拒绝策略 | 描述 | 白话 |
---|---|---|
new ThreadPoolExecutor.AbortPolicy() | 线程池默认拒绝策略,如果元素添加到线程池失败,会抛出RejectedExecutionException异常 | 劳资已经被你安排的工作压的喘不过气了,你还要再给我别的工作是吧,行,都别过了!!!直接掀桌子(抛异常) |
new ThreadPoolExecutor.CallerRunsPolicy() | 如果添加失败,那么主线程会自己调用执行器中的execute方法来执行任务 | 依旧是自己有很多的任务没完成,此时主管再给我安排任务我就直接跟他说,要做你做,我不做.(谁安排谁做) |
new ThreadPoolExecutor.DiscardPolicy() | 如果添加失败,丢弃最新的任务,也就是刚刚添加的任务,不抛出异常 | 这么多任务我都忙不过来了,你还要给我任务,我直接拒绝!!!(丢弃新添加的任务) |
new ThreadPoolExecutor.DiscardOldestPolicy() | 如果添加到线程池失败,会将队列中最早添加的元素移除,再尝试添加,如果失败则按该策略不断重试不抛异常 | 行,给我安排任务是吧,我做,但是我不能吃亏,我要把在我手里等待时间最久的任务丢弃.(丢弃最老的任务) |
下面的举例采用自定义线程池: 核心线程数 – 2,最大线程数 – 8,阻塞队列 – 2 ,可容纳最大请求数量 – 10
AbortPolicy
测试用例 4,10,11
CallerRunsPolicy
请求数设置为11,不超过最大容量都相同
DiscardPolicy
DiscardOldestPolicy
翻译:Oldest = 最古老的
3.3 任务队列
任务队列是基于阻塞队列实现的,即采用生产者消费者模式,在 Java 中需要实现 BlockingQueue 接口。但 Java 已经为我们提供了 7 种阻塞队列的实现:
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列(数组结构可配合指针实现一个环形队列)。
- LinkedBlockingQueue: 一个由链表结构组成的有界阻塞队列,在未指明容量时,容量默认为 Integer.MAX_VALUE。
- PriorityBlockingQueue: 一个支持优先级排序的无界阻塞队列,对元素没有要求,可以实现 Comparable 接口也可以提供 Comparator 来对队列中的元素进行比较。跟时间没有任何关系,仅仅是按照优先级取任务。
- DelayQueue:类似于PriorityBlockingQueue,是二叉堆实现的无界优先级阻塞队列。要求元素都实现 Delayed 接口,通过执行时间从队列中提取任务,时间没到任务取不出来。
- SynchronousQueue: 一个不存储元素的阻塞队列,消费者线程调用 take() 方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回;生产者线程调用 put() 方法的时候也会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。
- LinkedBlockingDeque: 使用双向队列实现的有界双端阻塞队列。双端意味着可以像普通队列一样 FIFO(先进先出),也可以像栈一样 FILO(先进后出)。
- LinkedTransferQueue: 它是ConcurrentLinkedQueue、LinkedBlockingQueue 和 SynchronousQueue 的结合体,但是把它用在 ThreadPoolExecutor 中,和 LinkedBlockingQueue 行为一致,但是是无界的阻塞队列。
4. 自定义线程池(代码实现)
既然了解了线程池,那么我们自己实现一个精简版的线程池.
写一个for循环,循环的边界由用户指定,每循环一次创建一个线程,在每个线程中使用while(true)来一直读取任务,只要有任务就取,取到任务之后就调用run(),注意:在创建好线程后还需要通过t.start()来启动该线程.
class MyThreadPool{
//自己实现线程池
//阻塞队列
BlockingDeque<Runnable> queue = new LinkedBlockingDeque<>();
public void submit(Runnable runnable) throws InterruptedException {
//添加到阻塞队列
queue.put(runnable);
}
public MyThreadPool (int p){
for (int i = 0; i < p; i++) {
Thread t = new Thread(()->{
try {
//有任务就取
while (true){
Runnable runnable = queue.take();
runnable.run();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t.start();
}
}
}