回顾
什么是线程,什么是进程?
- 进程:是一个应用程序,里面包含很多线程
- 线程:进程执行的基本单元
java实现线程的几种方式
- 继承Thread类
- 实现Runable接口
线程的生命周期
执行线程会出现的问题
- 一个线程只能执行一个任务
- 线程执行完后销毁,不能复用
- 线程过多,会导致jvm宕机(正常一台8核【同时并行的线程最多只有8个,cpu会在2000个线程中来回切】,12g的服务器线程数正常只有2000个线程)
线程池
JUC
- JUC是一个工具类,用户高并发、处理多线程的一个包。
线程池解决了哪些问题
- 降低资源消耗
- 方便线程数的管控
- 功能强大,提供延时定时线程池
线程池引发了什么问题
- jvm宕机,提交的任务会消失
- 使用不合理,导致内存溢出
- 参数多,引入数据结构和算法,增加了学习难度
线程池的设计思想
-
线程维护
-
执行任务
-
状态监控
线程池的原理
- 线程池的结构图
- 最常用的是ThreadPoolExecutor,调度用ScheduleExecutorService;
线程池的工作状态
- RUNNING:线程池一单被创建,就是RUNNING状态。
- SHUTDOWN:不接受新的任务,但能处理已添加的任务。调用shutdown接口后,线程池的状态由RUNNING变成SHUTDOWN。
- STOP:不接受新的任务,不会去处理已经添加的任务,并且会中断住在处理的任务;【调用shutdownnow()接口之后线程池会由running和shutdown变成stop】
- TIDYING(终止状态):所有任务终止,队列中任务数量会变成0。会执行钩子函数terminated()方法。
线程池的参数定义
线程池的结构说明
- 任务提交给线程池,如果有空闲线程,则会将任务分配给空闲线程,如果没有空闲线程会先放到队列中去。
- 如果核心线程都满了,并且队列也满了,才会去只用最大线程数中的线程。
- 达到maxSize,根据拒绝策略处理。
线程池的工具类
确定线程池的线程数
创建合适的线程数才能提高性能
- io密集型任务
io操作时间长,cpu利用率不高,这类任务CPU常处于空闲状态。
此类型任务可以开cpu核心的两倍线程。比如cpu是4核的,可以开8个线程。
- cpu密集型任务
执行计算任务,cpu一直运行,cpu的利用率高 。
取相等的线程数。比如cpu是4核的,可以开4个线程。
- 混合型任务
既要执行逻辑运算,又要进行大量的io操作。针对不同类型的任务,创建不同的线程池。
最佳线程数 = ((线程等待时间+线程cpu时间)/ 线程cpu时间)* cpu核数
线程池的源码分析
- ExecutorService executorService = Executors.newFixedThreadPool(5);
- ExecutorService executorService1 = Executors.newSingleThreadExecutor();
- ExecutorService executorService2 = Executors.newCachedThreadPool();
任务先放在阻塞队列中在进行处理
- ExecutorService executorService3 = Executors.newScheduledThreadPool(5);
为什么大厂里禁止使用Executors工具类,怕使用不规范造成宕机的风险。一般都是自己new一个线程池:ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.HOURS, new LinkedBlockingQueue<>(1000));
threadPoolExecutor.execute(); //提交一个普通的任务
threadPoolExecutor.submit(); //提交一个有返回值的任务
- execute()
//提交任务代码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//判断工作数,如果小于coreSize -> addWork,注意第二个参数 core=true
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//否则,如果线程池还在运行,offer到队列
if (isRunning(c) && workQueue.offer(command)) {
//在检查一下状态
int recheck = ctl.get();
//线程池终止,直接移除任务
if (! isRunning(recheck) && remove(command))
reject(command);
//如果没可用线程的话(coreSize=0),创建一个空work
//该work创建时指派给任务(为null)。会被放入works集合,从队列获取任务去执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//队列已满,继续调addWork,但是注意,core=false,开启到maxSize的大门
//超出max,addWork会返回false,进入reject
else if (!addWorker(command, false))
reject(command);
}
- addWorker()
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//线程大于 2^29次方^-1,或者线程超出了规定的范围,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
//创建work放入works集合(一个hashSet)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//符合条件,创建新的work包装成task
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//加锁,works是一个hashset,保证线程安全性
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//add成功新的work,work立即启动
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- 任务获取和执行
//执行runWorker()的时候,一直循环,如果携带task,就执行
while (task != null || (task = getTask()) != null)
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//判断是不是超时处理,决定了当前线程是不是要释放
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//线程数量超出max,并且上次循环poll等待超时,说明该线程已经终止,将线程数量原子性 减
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//计数器原子性递减,递减成功后,放回null,for终止
if (compareAndDecrementWorkerCount(c))
return null;
//递减失败,继续下一轮循环
continue;
}
try {
//如果线程可被释放,那就poll,释放时间为keepAliveTime
//否则,线程不会被释放,take一直被阻塞在这里,直到新的任务继续工作
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//到这说明可被释放的线程等待超时,已被销毁,设置标记,下次循环的线程减少
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
线程池的经典面试题
如果队列没有任务,核心线程一直阻塞获取任务的方法,直到返回任务。而任务执行完后,又会进入下一轮work.runWork()中循环。
//核心代码
//work.runWork
while (task != null || (task = getTask()) != null)
//work.getTask
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//最关键
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):workQueue.take();
没有,被销毁的线程和创建先后无关。即使是第一个被创建的核心线程,仍有可能被销毁。
**验证:**每个work在runwork()的时候去getTask(),在getTask内部,并没有针对性的区分当前work是否是核心线程。只要判断work是都大于core,就会调poll(),否则take()。
- 阅读代码,查看执行结果
结果只会执行 1 和 2,因为队列不会满,只会执行核心线程数,而核心线程在 while(true) 中一直在执行。
- 线程池7个参数作用和生效时机
- int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue workQueue,
- ThreadFactory threadFactory
- RejectedExecutionHandler handler
- 为什么线程池不用数组,而用队列?