目录
一. 阻塞队列 BlockingQue
二. 拒绝策略 RejectPolicy
三. 线程池 ThreadPool
四. 模拟运行
在 Java基础(二) 多线程编程 中,我们简单介绍了线程池 ThreadPoolExecutor 的核心概念与基本使用。在本文中,我们将基于前面学习的各种锁与同步工具来实现自定义的线程池,同时来探究和分析 Java 线程池的基本原理。
一. 阻塞队列 BlockingQue
在线程池的生态中,阻塞队列是至关重要的一环,其用于实现任务与工作线程之间的平衡(类似于生产者/消费者模式)。 在此处,我们实现了一个自定义的阻塞队列 BlockingQue,其代码如下:
// 阻塞队列实现
public class BlockingQue<T> {
// 1. 任务队列
private Deque<T> queue;
// 2. 锁
private ReentrantLock lock;
// 3. 生产者条件变量
private Condition fullWaitSet;
// 4. 消费者条件变量
private Condition emptyWaitSet;
// 5. 容量
private int capacity;
public BlockingQue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.capacity = capacity;
// ArrayDeque: 基于 Object[] 实现,可以自动扩容
this.queue = new ArrayDeque<>();
this.lock = new ReentrantLock(fair);
// 读写共用一把锁
this.fullWaitSet = lock.newCondition();
this.emptyWaitSet = lock.newCondition();
}
public BlockingQue(int capacity) {
this(capacity, false);
}
// 阻塞添加
public void put(T element) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
fullWaitSet.await();
}
queue.addLast(element);
// 唤醒消费线程
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
// 非阻塞添加
public boolean offer(T element) {
lock.lock();
try {
if (queue.size() == capacity)
return false;
queue.addLast(element);
// 唤醒消费线程
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
// 超时阻塞添加
public boolean offer(T element, long timeout, TimeUnit unit) throws InterruptedException {
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while (queue.size() == capacity) {
// 已经超时则返回 false
if (nanos <= 0)
return false;
nanos = fullWaitSet.awaitNanos(nanos); // awaitNanos 返回剩余等待时间(处理虚假唤醒)
}
queue.addLast(element);
// 唤醒消费线程
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
// 阻塞获取
public T take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
emptyWaitSet.await();
}
T element = queue.removeFirst();
// 唤醒生产线程
fullWaitSet.signal();
return element;
} finally {
lock.unlock();
}
}
// 超时阻塞获取
public T poll(long timeout, TimeUnit unit) throws InterruptedException {
lock.lock();
try {
// 将 timeout 统一转换为纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
// 已经超时则返回 null
if(nanos <= 0){
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos); // awaitNanos 返回剩余等待时间(处理虚假唤醒)
}
T element = queue.removeFirst();
// 唤醒生产线程
fullWaitSet.signal();
return element;
}finally {
lock.unlock();
}
}
//获取大小
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}
可以看出,上述代码使用了 Deque 作为元素存储容器,但若将 Deque 换成 Object[] 数组,则其基本就是 ArrayBlockingQueue 的实现源码。在实际工作中,若要实现自定义阻塞队列,我们只需要实现 BlockingQueue<E> 接口及其抽象方法即可。
package java.util.concurrent;
import java.util.Collection;
import java.util.Queue;
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
二. 拒绝策略 RejectPolicy
在线程数量已满且阻塞队列已满的情况下,主线程则会因为无法放置任务而一直阻塞等待,因此我们需要拒绝策略来处理这种溢出情况。拒绝策略一般定义为接口,并允许我们自定义策略,其代码如下:
// 拒绝策略
@FunctionalInterface
public interface RejectPolicy<T> {
void reject(BlockingQue<T> queue, T task);
}
一般接口方法需要提供阻塞队列以及当前任务两个参数,并支持函数式编程;常见的拒绝策略包括:阻塞等待、放弃执行、抛出异常、由调用线程执行等(后续会实现)。在实际工作中,Java已经为我们提供了拒绝策略的顶层设计,若想自定义拒绝策略,我们只需实现 RejectedExecutionHandler 接口并实现其 rejectedExecution 抽象方法即可。
package java.util.concurrent;
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
三. 线程池 ThreadPool
在本节,我们将实现一个简单的自定义线程池,其只包含核心线程数,并且规定线程池的运行规则如下:
1.若当前线程数 < corePoolSize,则新建线程处理任务;
2.若当前线程数 >= corePoolSize && 任务队列未满,则将任务放入任务队列等待;
3.若当前线程数 >= corePoolSize && 任务队列已满,则执行拒绝策略;
/**
* 自定义线程池实现:
* 1. 若当前线程数 < corePoolSize,则新建线程处理任务
* 2. 若当前线程数 >= corePoolSize && 任务队列未满,则将任务放入任务队列等待
* 3. 若当前线程数 >= corePoolSize && 任务队列已满,则执行拒绝策略
*/
public class ThreadPool {
// 任务队列
private BlockingQue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 锁
private ReentrantLock mainLock = new ReentrantLock();
// 核心线程数
private int coreSize;
// 获取任务的超时时间(allowThreadTimeOut=true时有效)
private long timeOut;
// 时间单位(allowThreadTimeOut=true时有效)
private TimeUnit timeUnit;
// 是否允许线程超时等待(默认允许)
private boolean allowThreadTimeOut = true;
// 拒绝策略
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeOut = timeOut;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
// 设置 allowThreadTimeOut 参数
public void setAllowThreadTimeOut(boolean allowThreadTimeOut) {
this.allowThreadTimeOut = allowThreadTimeOut;
}
// 执行任务 task
public void execute(Runnable task){
mainLock.lock();
try{
if(workers.size() < coreSize){
// 添加核心线程
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
}else if(!taskQueue.offer(task)){
// 执行拒绝策略
rejectPolicy.reject(taskQueue, task);
}
} finally {
mainLock.unlock();
}
}
// 工作线程类
private class Worker extends Thread{
// 执行任务
private Runnable task;
public Worker(Runnable task){
this.task = task;
}
@Override
public void run() {
// 获取任务
while(task != null || (task = getTask()) != null){
try{
task.run();
}catch (Exception e){
e.printStackTrace();
}finally {
task = null;
}
}
// worker 线程终止
synchronized (workers){
// 移除 worker
workers.remove(this);
}
}
}
// 从阻塞队列中获取等待任务(提供给Worker的钩子方法)
private Runnable getTask(){
for(;;){
try {
Runnable r = allowThreadTimeOut ? taskQueue.poll(timeOut, timeUnit) : taskQueue.take();
return r;
} catch (InterruptedException e) {
// 若被中断则重新等待
e.printStackTrace();
}
}
}
}
Java ThreadPoolExecutor 的实现相比我们自定义的线程池更加复杂和安全(增加了线程池状态的维护、最大线程数的逻辑、线程池终止方法等),但在核心思想的实现上基本一致,因此这段自定义代码的实现可以帮助我们更加方便的理解 ThreadPoolExecutor 的源码。
四. 模拟运行
public class Main {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(3,
10000, TimeUnit.MILLISECONDS, 5,
(queue, task) -> {
// 1. 死等
//try {
// queue.put(task);
//} catch (InterruptedException e) {
// e.printStackTrace();
//}
// 2. 放弃任务执行
// do nothing...
System.out.println("do discard policy...");
// 3. 抛出异常
//throw new RuntimeException("task run fail" + task);
// 4. 调用线程执行任务
//task.run();
});
for (int i = 0; i < 10; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j + "is running...");
});
}
}
}