线程池图解
线程池与主线程之间通过一个阻塞队列来平衡任务分配,阻塞队列中既可以满足线程等待,又要接收主线程的任务。
线程池实现
使用一个双向链表实现任务队列
创建任务队列
//阻塞队列
public class BlockingQueue<T> {
//双线链表
private Deque<T> queue = new ArrayDeque();
//锁
private ReentrantLock lock =new ReentrantLock();
//生产者条件变量
private Condition fullWaitSet = lock.newCondition();
//消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
//容器容量大小
private int capacity;
//阻塞获取
public T pull(long timeOut, TimeUnit unit){
lock.lock();
//判断链表中是否存在任务待处理
try {
//将尝试时间转化为纳秒
long nanos = unit.toNanos(timeOut);
while (queue.isEmpty()){
try {
if (nanos<0){
return null;
}
//awaitNanos返回结果是最大等待时间减去睡眠时间的剩余时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
//阻塞添加
public void put(T element){
lock.lock();
try{
while(queue.size()==capacity){
//说明满了,暂时无法添加新的任务
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(element);
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
//获取队列任务数量
public int size(){
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}
}
创建线程池
public class ThreadPool {
//任务队列
private BlockingQueue<Runnable> blockingQueue;
//线程集合
private HashSet<Worker> workers = new HashSet();
//核心线程数
private int coreNum;
//超时时间
private long timeOut;
private TimeUnit unit;
public ThreadPool(int coreNum, long timeOut, TimeUnit unit, int queueCapacity) {
System.out.println("初始化线程池");
this.coreNum = coreNum;
this.timeOut = timeOut;
this.unit = unit;
this.blockingQueue = new BlockingQueue<>(queueCapacity);
}
//线程执行任务
public void execute(Runnable task) {
//当线程数没有超过coreNum时,直接交给Worker对象执行,如果超过了coreNum数,则加入BlockingQueue
synchronized (workers) {
if (workers.size() < coreNum) {
Worker worker = new Worker(task);
System.out.println("新增worker"+worker);
workers.add(worker);
worker.start();
} else {
System.out.println("从消息队列中获取task");
blockingQueue.put(task);
}
}
}
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
while (task != null || (task = blockingQueue.pull(timeOut, unit)) != null) {
try {
System.out.println("Worker执行任务");
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers){
System.out.println("Worker执行完毕"+this);
workers.remove(this);
}
}
}
}
测试
public class Test {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(3,3000, TimeUnit.MILLISECONDS,5);
for (int i = 0; i < 10; i++) {
int j = i;
threadPool.execute(()->{
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("生产任务:"+j);
});
}
}
}
初始化线程池
新增workerThread[Thread-0,5,main]
新增workerThread[Thread-1,5,main]
新增workerThread[Thread-2,5,main]
Worker执行任务
Worker执行任务
加入任务队列TheadPool.Test$$Lambda$1/1078694789@7ba4f24f
Worker执行任务
加入任务队列TheadPool.Test$$Lambda$1/1078694789@3b9a45b3
加入任务队列TheadPool.Test$$Lambda$1/1078694789@7699a589
加入任务队列TheadPool.Test$$Lambda$1/1078694789@58372a00
加入任务队列TheadPool.Test$$Lambda$1/1078694789@4dd8dc3
等待加入任务队列TheadPool.Test$$Lambda$1/1078694789@6d03e736
生产任务:2
生产任务:1
生产任务:0
Worker执行任务
Worker执行任务
加入任务队列TheadPool.Test$$Lambda$1/1078694789@6d03e736
Worker执行任务
加入任务队列TheadPool.Test$$Lambda$1/1078694789@378bf509
生产任务:3
生产任务:4
生产任务:5
Worker执行任务
Worker执行任务
Worker执行任务
生产任务:6
生产任务:8
生产任务:7
Worker执行任务
Worker执行完毕Thread[Thread-1,5,main]
Worker执行完毕Thread[Thread-0,5,main]
生产任务:9
Worker执行完毕Thread[Thread-2,5,main]
添加拒绝策略
上面测试中,有一点不友好的是,当任务队列满了之后,再向其中添加任务时,主线程会死等任务添加成功。
对此我们可以选择多种解决方案
- 死等
- 添加超时时间
- 让调用者方式执行
- 让调用者抛出异常
- 让调用者自己执行
创建拒绝策略
@FunctionalInterface
public interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue,T task);
}
修改线程池的执行方法
//添加属性
private RejectPolicy rejectPolicy;
//构造方法
public ThreadPool(int coreNum, long timeOut, TimeUnit unit, int queueCapacity, RejectPolicy rejectPolicy) {
System.out.println("初始化线程池");
this.coreNum = coreNum;
this.timeOut = timeOut;
this.unit = unit;
this.blockingQueue = new BlockingQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
//线程执行任务
public void execute(Runnable task) {
//当线程数没有超过coreNum时,直接交给Worker对象执行,如果超过了coreNum数,则加入BlockingQueue
synchronized (workers) {
if (workers.size() < coreNum) {
Worker worker = new Worker(task);
System.out.println("新增worker" + worker);
workers.add(worker);
worker.start();
} else {
// System.out.println("从消息队列中获取task");
// blockingQueue.put(task);
blockingQueue.tryPut(rejectPolicy,task);
}
}
}
任务队列添加方法
public void tryPut(RejectPolicy rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() == capacity) {
//如果满了,需要调用拒绝策略
rejectPolicy.reject(this,task);
} else {
queue.addLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
测试
public class Test {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(3,3000,
TimeUnit.MILLISECONDS,5,
(queue,task)->{
//由调用者决定任务队列满了之后如何处理后续任务
queue.put(task);//死等
queue.offer(task,1000,TimeUnit.MILLISECONDS);//超时返回
//啥也不干,直接丢弃任务
task.run();//调用者自己执行
throw new RuntimeException("任务秩序异常");//抛出异常
});
for (int i = 0; i < 10; i++) {
int j = i;
threadPool.execute(()->{
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("生产任务:"+j);
});
}
}
}