文章目录
- 线程池
- 什么是线程池
- 线程池优点:
- 线程复用技术
- 线程池的实现原理是什么
- 线程池执行任务的流程?
- 线程池如何知道一个线程的任务已经执行完成
- 线程池的核心参数
- 拒绝策略
- 线程池类型(常用线程池)
- 阻塞队列
- 执行execute()方法和submit()方法的区别
- 代码
- 进行优化
- 线程池代码描述
——————————————————————————————————
线程池
什么是线程池
- 首先,线程池本质上是一种池化技术,而池化技术是一种资源复用的思想,比较常见的有连接池、内存池、对象池。
- 而线程池里面复用的是线程资源。
- 线程池就是创建若干个可执行的线程放入一个池中,有任务需要处理时,会提交到线程池中的任务队列,处理完之后线程并不会被销毁,而是仍然在线程池中等待下一个任务。
- 自己实现比较麻烦,所以有线程池帮助完成这些功能,让线程处于一直存活状态。
- 有任务之后会交给线程池,线程池交给线程。
线程池优点:
①:减少线程的频繁创建和销毁带来的性能开销,因为线程创建会涉及到 CPU 上下文切换、内存分配等工作。
②:提升快速响应的能力,不需要创建线程,直接在线程池调用就行
③:线程池本身会有参数来控制线程创建的数量,这样就可以避免无休止的创建线程带来的资源利用率过高的问题,起到了资源保护的作用。
④:减少代码之间的耦合,能够实现异步操作(异步:直接发送下一个请求,不需要等待回复)
线程复用技术
线程复用技术,因为线程的生命周期时由任务运行的状态决定的,无法人为控制。所以为了实现线程的复用,线程池里面用到了阻塞队列,也就是说线程池里面的工作线程处于一直运行状态,它会从阻塞队列中去获取待执行的任务,一旦队列空了,那这个工作线程就会被阻塞,直到下次有新的任务进来。工作线程是根据任务的情况实现阻塞和唤醒,从而达到线程复用的目的。
最后,线程池里面的资源限制,是通过几个关键参数来控制的,分别是核心线程数、最大线程数。核心线程数表示默认长期存在的工作线程,而最大线程数是根据任务的情况动态创建的线程,主要是提高阻塞队列中任务的处理效率。
线程池的实现原理是什么
线程里面有个死循环
线程池里面存数据的集合是队列
线程池执行任务的流程?
- 线程池执行executelsubmit方法向线程池添加任务,当任务小于核心线程数corePoolSize,线程池中可以创建新的线程。
- 当任务大于核心线程数corePoolSize,就向阻塞队列添加任务。
- 如果阻塞队列已满,需要通过比较参数maximumPoolSize,在线程池创建新的线程,当线程数量大于maximumPoolSize,说明当前设置线程池中线程已经处理不了了,就会执行饱和策略。
线程池如何知道一个线程的任务已经执行完成
(1)在线程池内部,当我们把一个任务丢给线程池去执行,线程池会调度工作线程来执行这个任务的 run 方法,run 方法正常结束,也就意味着任务完成了。
(线程池中的工作线程是通过同步调用任务的 run()方法并且等待 run 方法返回后,再去统计任务的完成数量。)
(2)如果想在线程池外部去获得线程池内部任务的执行状态,有几种方法可以实现。线程池提供了一个 isTerminated()方法,可以判断线程池的运行状态,我们可以循环判断 isTerminated()方法的返回结果来了解线程池的运行状态,一旦线程池的运行状态是 Terminated,意味着线程池中的所有任务都已经执行完了。想要通过这个方法获取状态的前提是,程序中主动调用了线程池的 shutdown()方法。在实际业务中,一般不会主动去关闭线程池,因此这个方法在实用性和灵活性方面都不是很好。
(3)在线程池中,有一个 submit()方法,它提供了一个 Future 的返回值,我们通过Future.get()方法来获得任务的执行结果,当线程池中的任务没执行完之前future.get()方法会一直阻塞,直到任务执行结束。因此,只要 future.get()方法正常返回,也就意味着传入到线程池中的任务已经执行完成了!
- 也可以引入一个 CountDownLatch 计数器,它可以通过初始化指定一个计数器进行倒计时,其中有两个方法分别是 await()阻塞线程,以及 countDown()进行倒计时,一旦倒计时归零,所以被阻塞在 await()方法的线程都会被释放。基于这样的原理,我们可以定义一个 CountDownLatch 对象并且计数器为 1,接着在线程池代码块后面调用 await()方法阻塞主线程,然后,当传入到线程池中的任务执行完成后,调用countDown()方法表示任务执行结束。最后,计数器归零 0,唤醒阻塞在 await()方法的线程。
(4)总结:不管是线程池内部还是外部,要想知道线程是否执行结束,我们必须要获取线程执行结束后的状态,而线程本身没有返回值,所以只能通过阻塞-唤醒的方式来实现,future.get 和 CountDownLatch 都是这样一个原理。
线程池的核心参数
线程池的真正实现类是 ThreadPoolExecutor,其构造方法需要如下参数:
- corePoolSize(必需):核心线程数。默认情况下,核心线程会一直存活,但是当将allowCoreThreadTimeout 设置为 true 时,核心线程也会超时回收。
- maximumPoolSize(必需):线程池所能容纳的 最大线程数。当活跃线程数达到该数值后,后续的新任务将会阻塞。
- keepAliveTime(必需):线程闲置超时时长。如果超过该时长,非核心线程就会被回收。如果将 allowCoreThreadTimeout 设置为 true 时,核心线程也会超时回收。
- unit(必需):指定 keepAliveTime 参数的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。
- workQueue(必需):任务队列。通过线程池的 execute() 方法提交的 Runnable 对象将存储在该参数中。其采用阻塞队列实现。
- threadFactory(可选):线程工厂。用于指定为线程池创建新线程的方式。
- handler(可选):拒绝策略。当达到最大线程数时需要执行的饱和策略。
拒绝策略
AbortPolicy(默认):丢弃任务,并抛出 RejectedExecutionException 异常。
CallerRunsPolicy:由调用线程处理该任务。
DiscardPolicy:丢弃任务,但是不抛出异常。可以配合这种模式进行自定义的处理方式。
DiscardOldestPolicy:丢弃队列最早的未处理任务,然后重新尝试执行任务。
线程池类型(常用线程池)
- 定长线程池(FixedThreadPool)
- 特点:只有核心线程,线程数量固定,执行完立即回收,任务队列为链表结构的有界队列。
- 应用场景:控制线程最大并发数。
- 定时线程池(ScheduledThreadPool )
- 特点:核心线程数量固定,非核心线程数量无限,执行完闲置 10ms 后回收,任务队列为延时阻塞队列。
- 应用场景:执行定时或周期性的任务。
- 可缓存线程池(CachedThreadPool)
- 特点:无核心线程,非核心线程数量无限,执行完闲置 60s 后回收,任务队列为不存储元素的阻塞队列。
- 应用场景:执行大量、耗时少的任务。
- 单线程化线程池(SingleThreadExecutor)
- 特点:只有 1 个核心线程,无非核心线程,执行完立即回收,任务队列为链表结构的有界队列。
- 应用场景:不适合并发但可能引起 IO 阻塞性及影响 UI 线程响应的操作,如数据库操作、文件操作等。
阻塞队列
执行execute()方法和submit()方法的区别
- execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否:
- submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get (long timeout,Timeunit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
代码
public class DequeThread<T extends Runnable> {
//存储任务
private Deque<Worker> threads=new LinkedList<>();
private int defaultThreadCount=4;
public DequeThread(){
for (int i = 0; i < defaultThreadCount; i++) {
Worker worker=new Worker();
//当前类实例化的时候,不断在任务列表放任务
threads.addLast(worker);
//执行任务
worker.start();
}
}
private Deque<T> tasks=new LinkedList<>();
public int addTask(T task) {
synchronized (tasks){//添加的时候也应该锁住
tasks.addLast(task);//放任务
tasks.notifyAll();
//添加新任务时应该做一次唤醒,唤醒全部的来抢任务
}
return 0;
}
class Worker extends Thread{
@Override
public void run() {
super.run();
while (true){
T first=tasks.pollFirst();//取任务
synchronized (tasks){//synchronized执行完之后,锁就释放出来了
//拿到锁就判断是否为空
while(first==null){//防止空指针
try {
tasks.wait();//为空的时候等待在task上
//.wait():让当前线程阻塞在对象上,并把锁释放掉
} catch (InterruptedException e) {
e.printStackTrace();
}
//再判断是不是空,如果是空就进下一次循环
//应该是取出来再执行
first=tasks.pollFirst();
}
}
first.run();//执行任务
}
}
}
public static void main(String[] args) {
DequeThread<Runnable> thread=new DequeThread<>();
thread.addTask(new Thread(
new Runnable() {
@Override
public void run() {
System.out.println("1");
}
}));
thread.addTask(new Thread(
new Runnable() {
@Override
public void run() {
System.out.println("2");
}
}));
thread.addTask(new Thread(
new Runnable() {
@Override
public void run() {
System.out.println("3");
}
}));
}
}
进行优化
public class DequeThread<T extends Runnable> {
//存储任务
private Deque<Worker> threads=new LinkedList<>();
private int defaultThreadCount=4;
//限制个数,线程安全的计数
private AtomicInteger taskCount = new AtomicInteger();
private Object full = new Object();
public DequeThread(){
for (int i = 0; i < defaultThreadCount; i++) {
Worker worker=new Worker();
//当前类实例化的时候,不断在任务列表放任务
threads.addLast(worker);
//执行任务
worker.start();
}
}
private Deque<T> tasks=new LinkedList<>();
public int addTask(T task) throws InterruptedException {
synchronized (tasks){//添加的时候也应该锁住
synchronized (full){
int i = taskCount.incrementAndGet();//自增
//如果i加到10,就进行阻塞
while (i > 10){
full.wait();
}
}
tasks.addLast(task);//放任务
//添加新任务时应该做一次唤醒,唤醒全部的来抢任务
tasks.notifyAll();
}
return 0;
}
class Worker extends Thread{
@Override
public void run() {
super.run();
while (true){
synchronized (tasks){//synchronized执行完之后,锁就释放出来了
//优化——》tasks.pollFirst();放到锁里
T first=tasks.pollFirst();//取任务
//拿到锁就判断是否为空
while(first==null){//防止空指针
try {
tasks.wait();//为空的时候等待在task上
//.wait():让当前线程阻塞在对象上,并把锁释放掉
} catch (InterruptedException e) {
e.printStackTrace();
}
//再判断是不是空,如果是空就进下一次循环
//应该是取出来再执行
first=tasks.pollFirst();
}
//优化——》first.run();放到锁里
first.run();//执行任务
//执行完成后,做减一的操作
taskCount.decrementAndGet();
//阻塞到full上的线程就能被唤醒
//如果空间满了,让调用的线程阻塞住
synchronized (full){
full.notifyAll();
}
}
}
}
}
//main方法里面执行的代码结束了
public static void main(String[] args) throws InterruptedException {
DequeThread<Runnable> thread=new DequeThread<>();
thread.addTask(new Thread(
new Runnable() {
@Override
public void run() {
System.out.println("1");
}
}));
thread.addTask(new Thread(
new Runnable() {
@Override
public void run() {
System.out.println("2");
}
}));
thread.addTask(new Thread(
new Runnable() {
@Override
public void run() {
System.out.println("3");
}
}));
}
}
线程池代码描述
继承了Thread类,也可以实现Runnable接口,
如何让任务和线程关联起来?
选择了自己封装线程的run方法,让它和提交的任务关联起来;
又因为线程池的线程是优先于任务启动起来的,所以看不到任务,只能借助一个另外的集合private Deque tasks=new LinkedList<>(); tasks这个变量联系起来;
类似于生产者消费者的思想,没有任务的时候等着,有任务的时候执行,进而完成线程池的处理
而且线程池的初始化是在程序运行起来的时候就进行了
所以是线程池运行起来之后,才等着任务进来