1.手撕线程池原理图
2.代码实现
// 手撕线程池
/**
 * Demo driver for the hand-written thread pool.
 *
 * <p>Builds a pool with one core worker and a task queue of capacity one, then submits
 * four slow tasks so that the queueing and rejection paths are exercised.
 */
public class Main {
    /** Number of demo tasks submitted to the pool. */
    private static final int TASK_COUNT = 4;

    public static void main(String[] args) {
        // Pool arguments: 1 core worker, 1000 ms wait for the next task, queue capacity 1.
        // Rejection policy: when the queue is full, wait up to 1500 ms for a free slot;
        // the boolean result of putByTime is ignored, so a task that still does not fit
        // is silently dropped.
        ThreadPool pool = new ThreadPool(
                1, 1000, TimeUnit.MILLISECONDS, 1,
                (queue, task) -> queue.putByTime(task, 1500, TimeUnit.MILLISECONDS));

        int submitted = 0;
        while (submitted < TASK_COUNT) {
            pool.execute(delayedPrinter(submitted));
            submitted++;
        }
    }

    /**
     * Creates a task that sleeps for three seconds and then prints its id.
     *
     * @param id the number printed once the sleep is over
     * @return the runnable to hand to the pool
     */
    private static Runnable delayedPrinter(int id) {
        return () -> {
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(id);
        };
    }
}
// 自定义任务队列
/**
 * A bounded FIFO queue guarded by a single {@link ReentrantLock} with two conditions:
 * one for producers waiting for free space and one for consumers waiting for elements.
 *
 * <p>Interrupts never abort a wait. An interrupt that arrives while a thread is blocked
 * in this class is remembered and the thread's interrupt status is set again before the
 * method returns, so callers can still observe it.
 *
 * @param <T> the element type
 */
class BlockQueue<T> {
    // Backing storage for queued elements (head = oldest).
    private final Deque<T> queue = new ArrayDeque<>();
    // Guards every access to the deque.
    private final ReentrantLock lock = new ReentrantLock();
    // Producers wait here while the queue is full.
    private final Condition fullWaitSet = lock.newCondition();
    // Consumers wait here while the queue is empty.
    private final Condition emptyWaitSet = lock.newCondition();
    // Maximum number of elements held at once.
    private final int capacity;

    /**
     * @param capacity the maximum number of elements the queue holds
     */
    public BlockQueue(int capacity) {
        this.capacity = capacity;
    }

    /**
     * Removes and returns the head of the queue, blocking until an element is available.
     *
     * @return the oldest element
     */
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                // Keeps waiting across interrupts and leaves the interrupt status set on return.
                emptyWaitSet.awaitUninterruptibly();
            }
            T head = queue.removeFirst();
            fullWaitSet.signal();
            return head;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends {@code task} to the tail of the queue, blocking while the queue is full.
     *
     * @param task the element to add
     */
    public void put(T task) {
        lock.lock();
        try {
            while (queue.size() >= capacity) {
                System.out.println("等待加入任务队列:" + task);
                // Keeps waiting across interrupts and leaves the interrupt status set on return.
                fullWaitSet.awaitUninterruptibly();
            }
            System.out.println("加入任务队列:" + task);
            queue.addLast(task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue, waiting at most the given time for an
     * element to become available.
     *
     * @param timeout how long to wait
     * @param unit    the unit of {@code timeout}
     * @return the oldest element, or {@code null} if the wait timed out
     */
    public T takeByTime(long timeout, TimeUnit unit) {
        boolean interrupted = false;
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            // Fixed deadline so that time spent before an interrupt still counts.
            final long deadline = System.nanoTime() + nanos;
            while (queue.isEmpty()) {
                if (nanos <= 0L) {
                    return null;
                }
                try {
                    // awaitNanos returns an estimate of the remaining wait time.
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    interrupted = true;
                    nanos = deadline - System.nanoTime();
                }
            }
            T head = queue.removeFirst();
            fullWaitSet.signal();
            return head;
        } finally {
            lock.unlock();
            if (interrupted) {
                // Hand the interrupt back to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Appends {@code task} to the tail of the queue, waiting at most the given time for
     * free space.
     *
     * @param task    the element to add
     * @param timeout how long to wait
     * @param unit    the unit of {@code timeout}
     * @return {@code true} if the element was added, {@code false} if the wait timed out
     */
    public boolean putByTime(T task, long timeout, TimeUnit unit) {
        boolean interrupted = false;
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            // Fixed deadline so that time spent before an interrupt still counts.
            final long deadline = System.nanoTime() + nanos;
            while (queue.size() >= capacity) {
                if (nanos <= 0L) {
                    return false;
                }
                System.out.println("等待加入任务队列:" + task);
                try {
                    // awaitNanos returns an estimate of the remaining wait time.
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    interrupted = true;
                    nanos = deadline - System.nanoTime();
                }
            }
            System.out.println("加入任务队列:" + task);
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        } finally {
            lock.unlock();
            if (interrupted) {
                // Hand the interrupt back to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return the number of elements currently queued
     */
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds {@code task} if there is room; otherwise delegates the decision to
     * {@code rejectPolicy}.
     *
     * <p>The policy is invoked while this queue's lock is held. Because the lock is
     * reentrant, the policy may call back into this queue (for example
     * {@link #putByTime}) from the same thread.
     *
     * @param rejectPolicy the strategy to run when the queue is full
     * @param task         the element to add
     */
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            if (queue.size() >= capacity) {
                // Queue is full: let the caller-supplied strategy decide.
                rejectPolicy.reject(this, task);
            } else {
                System.out.println("加入任务队列:" + task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }
}
// Rejection policy.
// Implemented with the strategy pattern.
/**
 * Strategy invoked by {@code BlockQueue.tryPut} when a task cannot be queued because
 * the queue is already at capacity.
 *
 * @param <T> the element type of the queue
 */
@FunctionalInterface
interface RejectPolicy<T>{
/**
 * Decides what to do with a task that did not fit into the queue.
 *
 * @param queue the full queue; {@code BlockQueue.tryPut} passes itself here
 * @param task  the task that could not be added
 */
void reject(BlockQueue<T> queue,T task);
}
// 自定义线程池
/**
 * A minimal thread pool: up to {@code coreSize} worker threads, a bounded task queue,
 * and a pluggable rejection policy for tasks that do not fit into the queue.
 *
 * <p>A worker retires once it has waited {@code timeout} for a new task and the queue is
 * empty. Worker creation, task queueing and worker retirement all happen under the
 * {@code workers} monitor, so a task can never be queued behind a worker that is about
 * to retire.
 */
class ThreadPool {
    // Queue holding tasks that are waiting for a free worker.
    private final BlockQueue<Runnable> taskQueue;
    // Live workers; also serves as the monitor for pool-level decisions.
    private final HashSet<Worker> workers = new HashSet<Worker>();
    // Maximum number of worker threads.
    private final int coreSize;
    // How long an idle worker waits for a task before retiring.
    private final long timeout;
    private final TimeUnit timeUnit;
    // Strategy applied when the task queue is full.
    private final RejectPolicy<Runnable> policy;

    /**
     * @param coreSize      maximum number of worker threads
     * @param timeout       how long an idle worker waits for a task
     * @param timeUnit      the unit of {@code timeout}
     * @param queueCapacity capacity of the task queue
     * @param policy        strategy applied when the task queue is full
     */
    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> policy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.policy = policy;
        this.taskQueue = new BlockQueue<>(queueCapacity);
    }

    /**
     * Submits a task. While fewer than {@code coreSize} workers exist, a new worker is
     * started with the task; otherwise the task is offered to the queue, falling back to
     * the rejection policy when the queue is full.
     *
     * @param task the task to run
     * @throws NullPointerException if {@code task} is {@code null}
     */
    public void execute(Runnable task) {
        if (task == null) {
            // Fail fast instead of starting an idle worker or failing later inside the queue.
            throw new NullPointerException("task must not be null");
        }
        synchronized (workers) {
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                System.out.println("新增worker:" + worker + " ,task:" + task);
                workers.add(worker);
                worker.start();
            } else {
                taskQueue.tryPut(policy, task);
            }
        }
    }

    /**
     * A pool thread that runs its initial task and then keeps pulling tasks from the queue.
     */
    class Worker extends Thread {
        // Task currently assigned to this worker; null while it is looking for work.
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            boolean retired = false;
            try {
                while (!retired) {
                    if (task == null) {
                        task = taskQueue.takeByTime(timeout, timeUnit);
                    }
                    if (task == null) {
                        // Timed out: retire only if nothing was queued in the meantime.
                        retired = retireIfQueueEmpty();
                    } else {
                        runCurrentTask();
                    }
                }
            } finally {
                if (!retired) {
                    // A non-Exception throwable escaped a task; unregister so the pool
                    // can start a replacement worker on the next submission.
                    synchronized (workers) {
                        System.out.println("worker 被移除:" + this);
                        workers.remove(this);
                    }
                }
            }
        }

        // Runs the assigned task, logging (not propagating) exceptions, then clears it.
        private void runCurrentTask() {
            try {
                System.out.println("正在执行:" + task);
                task.run();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                task = null;
            }
        }

        // Removes this worker from the pool unless a task is waiting; returns true if removed.
        private boolean retireIfQueueEmpty() {
            synchronized (workers) {
                // execute() queues tasks under this same monitor, so this check cannot
                // race with a concurrent submission.
                if (taskQueue.size() > 0) {
                    return false;
                }
                System.out.println("worker 被移除:" + this);
                workers.remove(this);
                return true;
            }
        }
    }
}
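// NOTE: The lines below are a duplicated tail of ThreadPool.Worker#run() left over from the
// original paste (it contained typographic quotes). They lie outside every class and are
// kept only as a comment for reference.
// {
// task=null;
// }
// }
// synchronized (workers){
// System.out.println("worker 被移除:"+this);
// workers.remove(this);
// }
// }
// }
// }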