初识线程池
什么是“池”
---- 软件中的“池”,可以理解为计划经济。
我们的资源是有限的,比如只有十个线程,我们创造十个线程的线程池,可能我们的任务非常多,如1000个任务,我们就把1000个任务放到我们十个线程中去,慢慢的去执行,最终会执行完所有任务。但是资源总量十个线程是被控制住的,还有就是不需要再创建更多的线程了,创建线程是有一定的开销的。所以我们复用线程池的这十个线程,每个线程的执行效率也会提高。
如果不适用线程池,每个任务都新开一个线程处理
- 一个线程
- for循环创建线程
- 当任务数量上升到1000
这样开销太大,我们希望有固定数量的线程,来执行这1000个任务,这样就避免了反复创建并销毁线程所带来的开销问题。
为什么要使用线程池
- 问题一:反复创建线程开销大
- 问题二:过多的线程会占用太多内存
- 解决以上两个问题的思路:
- 用少量的线程 ----避免内存占用过多
- 让这部分线程都保持工作,且可以反复执行任务 ---- 避免生命周期的损耗。
线程池的好处
- 加快响应速度
- 合理利用CPU和内存
- 统一管理
线程池适合应用的场合
- 服务器接收到大量请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率。
- 实际上,在开发中,如果需要创建5个以上的线程,那么就可以使用线程池来管理。
创建和停止线程池
线程池构造方法的参数
- corePoolSize指的是核心线程数
-线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务。 - 最大量maxPoolSize
-在核心线程数的基础上,额外增加的线程数的上限。
添加线程规则 - 如果线程数小于corePoolSize,创建一个新线程来运行新任务。
- 如果线程数等于(或大于)corePoolSize但少于workQueue上限,则将任务放入队列。
- 如果队列已满,并且线程数小于maxPoolSize,则创建一个新线程。
- 如果队列已满,并且线程数大于或等于maxPoolSize,则拒绝。
增减线程的特点 - 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池。
- 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加它。
- 通过设置maximumPoolSize为很高的值,可以允许线程池容纳任意数量的并发任务。
- 只有在队列填满时才创建多于corePoolSize的线程,如果使用的是无界队列,那么线程数就不会超过corePoolSize。
keepAliveTime
如果线程池当前的线程数多于corePoolSize,那么如果多余的线程空闲时间超过keepAliveTime,它们就会被终止。
ThreadFactory用来创建线程
- 默认使用Executors.defaultThreadFactory()
- 创建出来的线程都在同一个线程组
- 如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否是守护线程等。
工作队列
有3种最常见的队列类型:
- 直接交换:SynchronousQueue
- 无界队列:LinkedBlockingQueue
- 有界的队列:ArrayBlockingQueue
线程池该手动创建还是自动创建
手动创建更好,因为这样可以更加明确线程池的运行规则,避免资源耗尽的风险。
自动创建线程池(即直接调用JDK封装好的构造方法)可能带来哪些问题?
FixedThreadPool
源码为
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可看出corePoolSize和maximumPoolSize相同,且keepAliveTime为0,工作队列为LinkedBlockingQueue无界队列。
容易造成大量内存占用,可能会导致OOM。
SingleThreadExecutor
源码为
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
可看出corePoolSize和maximumPoolSize相同且为1,且keepAliveTime为0,工作队列为LinkedBlockingQueue无界队列。
当请求堆积的时候,可能会占用大量的内存。
CachedThreadPool
可缓存线程池
特点:具有自动回收多余线程的功能
源码为
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
弊端在于第二个参数maximumPoolSize被设置为了Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至导致OOM
ScheduledThreadPool
支持定时及周期性任务执行的线程池
以上4种线程池的构造方法的参数
正确的创建线程池的方法
- 根据不同的业务场景,设置线程池参数。
- 比如:内存有多大,给线程取什么名字等等
线程池里的线程数量设定为多少比较合适?
- CPU密级型(加密、计算hash等):最佳线程数为CPU核心数的1-2倍左右。
- 耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于CPU核心数很多倍。
参考Brain Goetz推荐的计算方法:
workStealingPool是JDK1.8加入的
子任务:比如二叉树的遍历
窃取:子任务会被放到每个线程独有的任务队列,而不是公共队列。比如有个线程产生了很多子任务,其他线程如果是空闲,会帮助第一个线程,窃取它的子任务去执行。这个任务最好不能加锁,也不能保证执行顺序。
停止线程池的正确方法
- shutdown :运行了这个方法,线程池有新的任务会拒绝并抛出拒绝的异常,会把正在执行的任务和队列里等待的任务都执行完毕后关闭线程池。
- isShutdown:返回线程池是否有停止标记(是否执行过shutdown)
- isTerminated:返回线程池是否完全终止
- awaitTermination:等待一段时间检测线程是否终止
- shutdownNow:立刻关闭线程池,并返回未执行的任务列表。
暂停和恢复线程池
任务太多,怎么拒绝?
- 拒绝时机
- 当Executor关闭是,提交新任务会被拒绝。
- 以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时。
4种拒绝策略
- AbortPolicy 直接抛出异常
- DiscardPolicy 默默地把任务丢弃,不会通知
- DiscardOldestPolicy 丢弃最老的任务
- CallerRunsPolicy 谁提交的任务,让谁执行
钩子方法
给线程池加点料
每个任务执行前后、日志、统计等场景使用
package com.ql;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 演示每个任务执行前后放钩子函数
*/
public class PauseableThreadPool extends ThreadPoolExecutor {
private boolean isPaused;
private final ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
/**
* 默认重写,不用管
*/
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
/**
* 默认重写,不用管
*/
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
/**
* 默认重写,不用管
*/
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
/**
* 默认重写,不用管
*/
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
* 重写该方法,在任务执行前放钩子函数
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPaused){
condition.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
private void pause(){
lock.lock();
try{
isPaused = true;
}finally {
lock.unlock();
}
}
public void resume(){
lock.lock();
try{
isPaused = false;
condition.signalAll();
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被执行");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 200; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(1500);
pauseableThreadPool.pause();
System.out.println("线程池被暂停了~~");
Thread.sleep(1500);
pauseableThreadPool.resume();
System.out.println("线程池被恢复了~~");
}
}
/**执行结果
...
我被执行
我被执行
我被执行
我被执行
线程池被暂停了~~
线程池被恢复了~~
我被执行
我被执行
我被执行
我被执行
...
*/
线程池实现原理
线程池组成部分
- 线程池管理器:创建停止线程池等
- 工作线程:创建出来执行任务的线程
- 任务队列:存放未执行任务的队列,必须支持并发
- 任务接口(Task)
线程池相关类和接口关系
Executors是Java提供的工具类,包含了很多自带的线程池。
线程池实现任务复用的原理
- 相同线程执行不同任务
线程池状态
使用线程池的注意点
- 避免任务堆积
- 避免线程数过度增加
- 排查线程泄漏