目录
一:什么是线程池
二:线程池有什么好处
三:如何创建一个线程池
Executors
ThreadPoolExecutors
四:创建一个线程池为什么不推荐使用Executors
五:如何设置线程池的大小
六:线程池有哪些参数
七:线程池有哪些状态
八:线程池的执行流程
九:如何判断一个线程池的中的任务是否执行完毕
十:线程池的拒绝策略有哪些
十一:线程池的线程是如何复用的
十二:线程池中的阻塞队列有哪些
十三:线程池中的任务出现异常后,是复用还是销毁呢
十四:如何关闭一个线程池
一:什么是线程池
线程池:顾名思义就是一个管理线程的容器,当有任务需要处理的时候,放进任务队列里面,由线程池分配空闲的线程处理任务,处理完任务的线程不会被销毁,而是在线程池中等待下一个任务
二:线程池有什么好处
1.降低资源消耗: 频繁的创建与销毁线程,占用大量资源,线程池的出现避免了这种情况,减少了资源的消耗;
2.提高响应速度: 因为线程池中的线程处于待命状态,有任务进来无需等待线程的创建就能立即执行(前提是有空闲线程,任务量巨大,还是需要排队的哈);
3.更好的管理线程: 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
三:如何创建一个线程池
在Java中有两类方法可以创建一个线程池,如下:
Executors
1.Executors.newFixedThreadPool:创建一个固定大小的线程池,并且可以控制并发线程的数量,多余的线程会放在任务队列中等待执行
2.Executors.newCacheThreadPool:创建一个可缓存的线程池,若线程数量超过了线程池设置的参数,会缓存一段时间后回收
3.Executors.newSingleThreadPool:创建单个线程的线程池,他可以保证线程的顺序执行
4.Executors.newScheduledThreadPool:创建可以执行延迟任务的线程池
5.Executors.newSingleThreadScheduleExecutors:创建一个单线程可以执行延迟任务的线程池,是3和4的结合体
6.Executors.newWorkStealingPool:创建一个抢占式的线程池(是根据CPU的核数来确定的)
ThreadPoolExecutors
7.ThreadPoolExecutors:通过手动创建线程池,需要配置参数
四:创建一个线程池为什么不推荐使用Executors
如果大家跟入到Executors这些方法的底层实现中去看一眼的话,立马就知道原因了,像FixedThreadPool 和 SingleThreadExecutor这两个方法内使用的是无界的 LinkedBlockingQueue存储任务,任务队列最大长度为 Integer.MAX_VALUE,这样可能会堆积大量的请求,从而导致 OOM。 而CachedThreadPool使用的是同步队列 SynchronousQueue, 允许创建的线程数量也为 Integer.MAX_VALUE ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM,其他的方法所提供的均是这种无界任务队列
,在高并发场景下导致OOM的风险很大,故大部分的公司已经不建议采用Executors提供的方法创建线程池了。
五:如何设置线程池的大小
通常来说设置线程数是根据业务需求和实际情况来确定的,原因如下
1.如果当前业务需求的任务类型是多IO类型的(读和写操作比较多)话,线程数可以设置的多一些(2n-1),如果业务需求的任务类型是多cpu计算型的话,那么线程数可以设置为n-1
2.线程数还与cpu的核心数有关,因为我们创建的线程池中本质还是多个线程,而Java线程是与操作系统一一对应,因此也要考虑cpu的情况
六:线程池有哪些参数
在线程池中有七大参数
1.核心线程数:即当前线程池种常驻的线程数量,如果是0的话,则会销毁线程池
2.最大线程数:当有最多任务的时候,该线程池中所有的线程数量
3.临时线程存活时间:如果线程池空闲,除了核心线程以外的临时线程,在超过改时间后会消亡
4.临时线程存活时间单位:临时线程的存活时间单位,一般为毫秒
5.任务阻塞队列:当线程池中所有的线程都在执行任务时,又有新的任务来时,会放进任务阻塞i队列中。
6.线程工厂:是一个创建线程池的工厂,不设置该参数时,用的是默认的线程工厂
7.拒绝策略:当任务队列都已经满了的时候,此时就会触发拒绝策略。表示不在接收新的任务
七:线程池有哪些状态
通常线程池有以下5中状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
-
RUNNING: 线程池一旦被创建,就处于 RUNNING 状态,任务数为 0,能够接收新任务,对已排队的任务进行处理。
-
SHUTDOWN: 不接收新任务,但能处理已排队的任务。调用线程池的 shutdown() 方法,线程池由 RUNNING 转变为 SHUTDOWN 状态。
-
STOP: 不接收新任务,不处理已排队的任务,并且会中断正在处理的任务。调用线程池的 shutdownNow() 方法,线程池由(RUNNING 或 SHUTDOWN ) 转变为 STOP 状态。
-
TIDYING: 1)SHUTDOWN 状态下,任务数为 0, 其他所有任务已终止,线程池会变为 TIDYING 状态。
2)线程池在 SHUTDOWN 状态,任务队列为空且执行中任务为空,线程池就会由 SHUTDOWN 转变为 TIDYING 状态。
3)线程池在 STOP 状态,线程池中执行中任务为空时,就会由 STOP 转变为 TIDYING 状态。
-
TERMINATED: 线程池彻底终止。线程池在 TIDYING 状态执行完 terminated() 方法就会由 TIDYING 转变为 TERMINATED 状态。
5种状态转换如下:
八:线程池的执行流程
1、刚new出来的线程池里默认是没有线程的,只有一个传入的阻塞队列;
2、当我们执行execute提交一个方法后,会判断当前线程池中线程数是否小于核心线程数(corePoolSize),如果小于,那么就直接通过 ThreadFactory 创建一个线程来执行这个任务,当任务执行完之后,线程不会退出,而是会去阻塞队列中获取任务;
3、如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。
4、如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。
5、如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,调用RejectedExecutionHandler.rejectedExecution()方法。
源码如下:
public void execute(Runnable command) {
// 首先检查提交的任务是否为null,是的话则抛出NullPointerException。
if (command == null)
throw new NullPointerException();
// 获取线程池的当前状态(ctl是一个AtomicInteger,其中包含了线程池状态和工作线程数)
//private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
int c = ctl.get();
// 1. 检查当前运行的工作线程数是否少于核心线程数(corePoolSize)
if (workerCountOf(c) < corePoolSize) {
// 如果少于核心线程数,尝试添加一个新的工作线程来执行提交的任务
// addWorker方法会检查线程池状态和工作线程数,并决定是否真的添加新线程
if (addWorker(command, true))
return;
// 重新获取线程池的状态,因为在尝试添加线程的过程中线程池的状态可能已经发生变化
c = ctl.get();
}
// 2. 尝试将任务添加到任务队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 双重检查线程池的状态
if (! isRunning(recheck) && remove(command)) // 如果线程池已经停止,从队列中移除任务
reject(command);
// 如果线程池正在运行,但是工作线程数为0,尝试添加一个新的工作线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 如果任务队列满了,尝试添加一个新的非核心工作线程来执行任务
else if (!addWorker(command, false))
// 如果无法添加新的工作线程(可能因为线程池已经停止或者达到最大线程数限制),则拒绝任务
reject(command);
}
九:如何判断一个线程池的中的任务是否执行完毕
在Java中通常有两种方式判断线程池中线程是否已经执行完毕
1.使用getTakCount总任务数和getCompletedTaskCount已经执行完毕的线程数相比较来判断,该方法缺点是在实际开发中线程池是公用的,因此总任务数是在一直不断变化的,并且得到的也是一个近似值,因此不推荐使用
2.使用FutureTask中的get()方法是等待所有线程执行完毕后才返回结果。
十:线程池的拒绝策略有哪些
在Java中有两类拒绝策略:内置的拒绝策略,自定义拒绝策略
1.在内置的拒绝策略中有四种:
1.AbortPolicy:即默认的拒绝策略,只会抛出一个异常消息
2.CallerRunsPolicy:即哪个线程提交过来的任务,就返回给哪个线程,是不会用线程池中的线程处理
3.DiscardPolicy:即默默丢弃任务,不会做任何的提醒操作
4.DiscardOldestPolicy:即丢弃任务队列中最旧的一个任务,并尝试重新提交任务
自定义拒绝策略
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author: dlwlrma
* @data 2024年09月17日 19:47
* @Description: TODO:模拟自定义线程池拒绝策略
*/
public class ThreadDemo {
public static void main(String[] args) {
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("线程名称:"+Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
//创建线程,线程的任务队列长度为1
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//自定义线程池拒绝策略
System.out.println("执行自定义拒绝策略");
}
});
threadPool.execute(runnable);
threadPool.execute(runnable);
threadPool.execute(runnable);
threadPool.execute(runnable);
}
}
实际开发中,我们不会使用内置的拒绝策略,是因为当内置的拒绝策略过于简单,不利于维护,因此常用自定义的拒绝策略,例如在自定义的拒绝策略中加入mq,当触发了拒绝策略时,发布者会收集异常信息,然后订阅者会根据异常id去处理异常等等
十一:线程池的线程是如何复用的
线程池的核心功能就是实现线程的重复利用,那么线程池是如何实现线程的复用呢? 线程池通过addWorker()方法添加任务,而在这个方法的底层会将任务和线程一起封装到一个Worker对象中,Worker 继承了 AQS,也就是具有一定锁的特性。
然后Worker中有一个run方法,执行时会去调用runWorker()方法来执行任务,我们来看一下这个方法的源码:
final void runWorker(Worker w) {
// 获取当前工作线程
Thread wt = Thread.currentThread();
// 从 Worker 中取出第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
// 解锁 Worker(允许中断)
w.unlock();
boolean completedAbruptly = true;
try {
// 当有任务需要执行或者能够从任务队列中获取到任务时,工作线程就会持续运行
while (task != null || (task = getTask()) != null) {
// 锁定 Worker,确保在执行任务期间不会被其他线程干扰
w.lock();
// 如果线程池正在停止,并确保线程已经中断
// 如果线程没有中断并且线程池已经达到停止状态,中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 在执行任务之前,可以插入一些自定义的操作
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 实际执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务后,可以插入一些自定义的操作
afterExecute(task, thrown);
}
} finally {
// 清空任务,并更新完成任务的计数
task = null;
w.completedTasks++;
// 解锁 Worker
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 工作线程退出的后续处理
processWorkerExit(w, completedAbruptly);
}
}
在这段源码中我们看到了线程被复用的原因了,就是这个while的循环,当有任务需要执行或者能够从任务队列中获取到任务时,工作线程就会持续运行;如果从 getTask 获取不到方法的话,就会调用 finally 中的 processWorkerExit 方法,将线程退出。
十二:线程池中的阻塞队列有哪些
常见的阻塞队列有以下几种
// 1、无界队列 LinkedBlockingQueue,容量Integer.MAX_VALUE
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
// 1、无界队列 LinkedBlockingQueue
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
// 2、同步队列 SynchronousQueue,没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务,因此线程最多可创建Integer.MAX_VALUE个。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
// 3、DelayedWorkQueue(延迟阻塞队列),添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
十三:线程池中的任务出现异常后,是复用还是销毁呢
这个问题我们要分两种情况去分析,第一种是通过 execute() 提交任务时,在执行过程中抛出异常,且没有在任务内被捕获,当前线程会因此终止,异常信息会记录在日志或控制台中,并且线程池会移除异常线程,重新创建一个线程补上去。
第二种通过submit()提交任务时,如果在任务执行中发生异常,这个异常不会直接打印出来。相反,异常会被封装在由submit()返回的Future对象中。当调用Future.get()方法时,可以捕获到一个ExecutionException。在这种情况下,线程不会因为异常而终止,它会继续存在于线程池中,准备执行后续的任务。
通过submit()的底层源码发现,其实它的内部封装的是execute方法,只不过它的任务被放在了RunnableFuture对象里。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
execute方法会抛出异常终止线程的,为什么submit中不会呢,那么肯定在RunnableFuture里,经过一顿跟踪发现,这个Future中实现的run方法,对异常进行了捕获,所以并不会往上抛出,也就不会移除异常线程以及新建线程了。
十四:如何关闭一个线程池
在JDK 1.8 中,线程池的停止一般使用 shutdown()、shutdownNow()这两种方法。
1.shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock; // ThreadPoolExecutor的主锁
mainLock.lock(); // 加锁以确保独占访问
try {
checkShutdownAccess(); // 检查是否有关闭的权限
advanceRunState(SHUTDOWN); // 将执行器的状态更新为SHUTDOWN
interruptIdleWorkers(); // 中断所有闲置的工作线程
onShutdown(); // ScheduledThreadPoolExecutor中的挂钩方法,可供子类重写以进行额外操作
} finally {
mainLock.unlock(); // 无论try块如何退出都要释放锁
}
tryTerminate(); // 如果条件允许,尝试终止执行器
}
在shutdown的源码中,会启动一次顺序关闭,在这次关闭中,执行器不再接受新任务,但会继续处理队列中的已存在任务,当所有任务都完成后,线程池中的线程会逐渐退出。
2.shutdownNow
public List<Runnable> shutdownNow() {
List<Runnable> tasks; // 用于存储未执行的任务的列表
final ReentrantLock mainLock = this.mainLock; // ThreadPoolExecutor的主锁
mainLock.lock(); // 加锁以确保独占访问
try {
checkShutdownAccess(); // 检查是否有关闭的权限
advanceRunState(STOP); // 将执行器的状态更新为STOP
interruptWorkers(); // 中断所有工作线程
tasks = drainQueue(); // 清空队列并将结果放入任务列表中
} finally {
mainLock.unlock(); // 无论try块如何退出都要释放锁
}
tryTerminate(); // 如果条件允许,尝试终止执行器
return tasks; // 返回队列中未被执行的任务列表
}
与shutdown不同的是shutdownNow会尝试终止所有的正在执行的任务,清空队列,停止失败会抛出异常,并且返回未被执行的任务列表。