完整代码已上传gitee ,地址 :朱元杰的开源仓库 – ThreadPool核心源码仿写
完整文章栏目地址在:Fearless____的博客 - ThreadPool仿写
接下来将手动仿写一个线程池,第一步先仿写 阻塞队列
为什么需要阻塞队列 - 因为不能为每个任务都创建一个线程,当任务数量超过可用线程的数量,需要将任务放在阻塞队列中
阻塞队列属性
阻塞队列我们定义为一个类 MyBlockingQueue ,要有如下几个属性
- 任务队列
private Deque<T> queue = new ArrayDeque<>();
使用 ArrayDeque 因为性能好于 LinkList - 锁
private ReentrantLock lock = new ReentrantLock();
防止多个线程同时获取头部任务,也防止多个线程同时添加任务而发生线程安全问题 - 生产者(main)条件变量
private Condition fullWaitSet = lock.newCondition();
阻塞队列有容量限制,当任务过多,生产者线程需阻塞等待 - 消费者(线程池的线程)条件变量
private Condition emptyWaitSet = lock.newCondition();
当阻塞队列为空,消费者也需要阻塞等待 - 容量
private int capcity
; 阻塞队列的容量
任务添加逻辑
// 阻塞添加
public void put(T task) {
lock.lock();
try {
while (queue.size() == capcity) {
try {
log.debug("等待加入任务队列 {} ...", task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
上锁,while判断队列是否已满,满了则让当前 添加线程 进入 fullWaitSet.await() ,不满则执行 queue.addLast(task) 添加任务到队列,在此之前 线程池中的 可能有 消费线程 因为任务队列没有任务而进入 emptyWaitSet.await() ,因此添加完任务后有必要调用 emptyWaitSet.signal() 去唤他们
任务获取逻辑
// 阻塞获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
与任务添加的逻辑类似,不在赘述
阻塞优化 - 添加超时时间
只需将 await 方法替换为 awaitNanos 方法,就可以实现 带超时的阻塞添加和获取
带超时时间的阻塞获取如下
// 带超时阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
// 将 timeout 统一转换为 纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
// 返回值是剩余时间
if (nanos <= 0) {
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
给方法传入自定义的超时时间,toNanos 方法将时间统一转化为纳秒
emptyWaitSet.awaitNanos(nanos) 方法的返回值是 定义的等待时间nanos - 已等待的时间,因为存在虚假唤醒的可能
虚假唤醒 指 被唤醒的原因不是因为有新任务添加到阻塞队列中,也不是因为超时时间到,而是其他原因,因此唤醒后 阻塞队列中仍然可能为空,此时就要让他继续等待,不过只需等完剩余的超时时间,不能从头开始等待
如果超时时间耗尽,还没有新任务,就返回null,后面消费者线程获取null,就知道暂时没有任务需要执行,就会结束线程
带超时时间的阻塞添加如下
// 带超时时间阻塞添加
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capcity) {
try {
if (nanos <= 0) {
return false;
}
log.debug("等待加入任务队列 {} ...", task);
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
因为超时了就会失败,因此函数的返回值类型不能是void,应该设置为boolean,超时获取失败了就返回 false,其他逻辑与 超时阻塞获取 一致