前言:在编程中我们为什么要使用线程池,线程池中的线程是怎么执行任务的,线程池中的线程是如何复用和销毁的;
1 什么是线程池:
提前创建一些线程放到一个地方,使用的时候直接获取,避免频繁的创建和销毁线程,节省内存和CPU资源;
2 Java 中已有的线程池:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
ExecutorService cashedThreadPool = Executors.newCachedThreadPool();
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
3 一个线程池构建需要的参数:
我们的任务是由某一个线程具体去执行的,所以我们就要定义好池中线程的数量,并且当任务数量增加时,可以开辟一些临时线程进行任务处理,当池中的线程已经是多余的时候再回收掉一些线程来节约资源;当池中的线程都有任务执行,这个时候来了新的任务,需要有个地方能把这些任务先行储存,以便当有空余的线程时在执行存储下来的任务,而且考虑到资源的情况这个存储任务的队列也最好是有限量的,如果超出了程序的处理能力,使用者可以自己决定拒绝策略;
所以在创建线程池的时候有必要以下参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
corePoolSize: 正式存在的线程数;
maximumPoolSize:允许存在的最大线程,扩容的线程数= maximumPoolSize-corePoolSize
keepAliveTime:临时线程存活的时间
Unit:临时线程存活的时间单位
workQueue:阻塞队列
threadFactory:线程工厂
Handler:拒绝策略;
4 线程池任务的执行和线程销毁:
demo:
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new Thread( () -> testGetFutureMap("param")));
private static Map<String, Object> testGetFutureMap(String param) {
// 处理业务逻辑
Map<String, Object> mapData = new HashMap<>();
/**
* do some thing
*/
System.out.printf("do some thing");
return mapData;
}
ThreadPoolExecutor:execute 线程任务的执行
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 当前工作的线程小于核心线程,直接新建线程,并且进行任务的执行
if (addWorker(command, true))
return;
c = ctl.get();
}
// 当前工作的线程大于核心线程,或者直接添加任务失败
if (isRunning(c) && workQueue.offer(command)) {
// 添加到队列中,等待后续空闲线程执行任务
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 当前工作的线程大于核心线程,或者直接添加任务失败,并且添加队列失败
// 开启临时线程进行任务处理
else if (!addWorker(command, false))
// 如果失败执行拒绝策略
reject(command);
}
addWorker:线程任务的执行
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 判断当前线程池是否可用
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 判断当前工作线程数和核心线程数或者最大线程数
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 增加工作线程数,增加成功,直接跳出for 循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 增加线程数失败,进行重试
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建线程,传入任务
// 执行任务 new Thread(new Worker()).start();
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加任务
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程执行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 添加任务失败,工作任务数量-1,移除任务
addWorkerFailed(w);
}
return workerStarted;
}
compareAndIncrementWorkerCount:增加线程池中的线程数量
private boolean compareAndIncrementWorkerCount(int expect) {
// 工作线程数量+1
return ctl.compareAndSet(expect, expect + 1);
}
构建Worker用于具体任务的执行:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
执行任务 t.start() ,调用Worker run():
public void run() {
runWorker(this);
}
finalvoid runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// 任务执行,没有任务的时候不进入while 循环
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 线程不可用
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 在任务执行之前,此方法可以被重写
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任务执行以后
afterExecute(task, thrown);
}
} finally {
// 执行任务后
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 线程的退出,减少工作线程数量
processWorkerExit(w, completedAbruptly);
}
}
获取任务 getTask():
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 线程池不可用
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 当前工作线程大于核心线程,并且没有了任务,则将线程池中线程数量-1
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 当前工作线程大于核心线程数 则进行规定时间内获取任务(规定时间内没有获取到则说明当前没有需要执行的任务)
// 否则直接获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;// 返回任务
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
任务执行完毕后退出线程,processWorkerExit:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 增加任务完成的数量
completedTaskCount += w.completedTasks;
// 移除任务
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
以上为线程池中线程创建,执行任务,以及销毁线程的过程,流程图如下:
过程:
(1)当提交任务后,如果当前工作的线程没有超过核心线程,则创建线程然后进行任务的执行;
(2)当工作的现车超过核心线程数,则尝试添加到阻塞队列中,添加成功后,有空闲线程时从队列中获取任务并执行;
(3)如果阻塞队列已满,则判断是否要增加临时线程处理任务,如果已经达到最大线程数,则执行拒绝策略;否则创建临时线程,执行任务;
(4)当任务执行完毕后,如果没有了任务,并且当前工作的线程大于核心小程,则执行线程的销毁;
5 总结:
5.1 线程池的创建是为了避免线程频繁的创建和销毁,是为了线程的复用,增加线程池中的线程可以提高任务执行的效率,但是线程池中线程过多会造成频繁的上下文切换,所以线程数量并不是越多越好;
5.2 我们可以自定义线程池,通过重写方法的方式,更好的监控线程的执行:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceMonitor extends ThreadPoolExecutor {
public ExecutorServiceMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
// 线程执行任务之前
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
}
// 线程执行任务之后
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
}
}