📝个人主页:五敷有你
🔥系列专栏:并发编程
⛺️稳重求进,晒太阳
示意图
步骤1:自定义任务队列
变量定义
- 用Deque双端队列来承接任务
- 用ReentrantLock 来做锁
- 并声明两个条件变量 Condition fullWaitSet emptyWaitSet
- 最后定义容量 capcity
方法:
- 添加任务
- 注意点:
- 任务容量慢了 用await
- 每个添加都进行一个emptyWaitSet.signalAll 唤醒沉睡的线程
- 考虑万一死等的情况,加入时间的判断
- 注意点:
- 取出任务
- 注意点:
- 任务空了 用await
- 每个任务取出来都进行一个fullWaitSet.signAll来唤醒沉睡的线程
- 考虑超时的情况,加入时间的判断
- 注意点:
public class MyBlockQueue<T> {
//1.任务队列
private Deque<T> deque=new ArrayDeque();
//2.锁
private ReentrantLock lock=new ReentrantLock();
//3.生产者条件变量
private Condition fullWaitSet=lock.newCondition();
//4.消费者条件变量
private Condition emptyWaitSet=lock.newCondition();
//5.容量
private int capcity;
public MyBlockQueue(int capcity) {
this.capcity = capcity;
}
//带超时的阻塞获取
public T poll(long timeOut, TimeUnit unit){
lock.lock();
try {
//将timeOUt转换成统一转换为ns
long nanos = unit.toNanos(timeOut);
while (deque.isEmpty()) {
try {
//返回值=等待时间-经过的时间
if(nanos<=0){
return null;
}
nanos= emptyWaitSet.awaitNanos(nanos);
}catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
T t = deque.removeFirst();
fullWaitSet.signalAll();
return t;
}finally {
lock.unlock();
}
}
//6. 阻塞获取
public T take() {
lock.lock();
try {
while (deque.isEmpty()) {
try {
emptyWaitSet.await();
}catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
T t = deque.removeFirst();
fullWaitSet.signalAll();
return t;
}finally {
lock.unlock();
}
}
//阻塞添加
public void put(T element){
lock.lock();
try {
while (deque.size()==capcity){
try {
fullWaitSet.await();
}catch (Exception e){
}
}
deque.addLast(element);
emptyWaitSet.signalAll();
} finally {
lock.unlock();
}
}
public int size(){
lock.lock();
try {
return deque.size();
}finally {
lock.unlock();
}
}
}
步骤2:自定义线程池
- 定义变量:
- 任务队列 taskQueue
- 队列的容量
- 线程的集合
- 核心线程数
- 获取任务的超时时间
- 时间单位
- 方法
- 构造方法 初始化一些核心的参数
- 执行方法 execute(task) 里面处理任务
- 每执行一个任务就放入一个worker中,并开启线程执行 同时放入workers集合中
- 当任务数量>核心数量时,就加入到阻塞队列中
- 自定义的类worker
- 继承Thread 重写Run方法
- 执行传递的任务,每次任务执行完毕,不回收,
- 去队列中拿任务 当队列也空了之后 workers集合中移除线程,线程停止。
- 继承Thread 重写Run方法
package com.aqiuo.juc;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
public class ThreadPool {
//任务队列
private MyBlockQueue<Runnable> taskQueue;
//队列容量
int queueCapcity;
//线程集合
private HashSet<Worker> workers=new HashSet();
//线程池的核心线程
private int coreSize;
//获取任务的超时时间
private long timeOut;
//时间单位
private TimeUnit timeUnit;
public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit,int queueCapcity) {
this.coreSize = coreSize;
this.timeOut = timeOut;
this.timeUnit = timeUnit;
taskQueue=new MyBlockQueue<>(queueCapcity);
}
public void exectue(Runnable task){
//当任务数没有超过coreSize时,直接交给work对象执行
//如果任务超过coreSize时,加入任务队列
synchronized (workers){
if(workers.size()<coreSize){
Worker worker=new Worker(task);
System.out.println("新增worker");
workers.add(worker);
worker.start();
//任务数超过了核心数
}else{
System.out.println(task+"加入任务队列");
taskQueue.put(task);
}
}
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task){
this.task=task;
}
@Override
public void run() {
//执行任务
//1)当task不为空,执行任务
//2)当task执行完毕,再接着从任务队列中获取任务
while (task!=null||(task=taskQueue.take())!=null){
try {
System.out.println("正在执行worker"+this);
sleep(10000);
task.run();
} catch (Exception e) {
}finally {
task=null;
}
}
//执行完任务后销毁线程
synchronized (workers){
workers.remove(this);
}
}
}
}
测试
开启15个线程测试
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);
for (int i=0;i<15;i++){
int j=i;
threadPool.exectue(()->{
System.out.println(j);
});
}
}
执行过程中,超过了队列容量之后,就会发生fullWaitSet阻塞。这个阻塞的线程就开始等待,当有队列不满之后,唤醒fullWaitSet阻塞的队列,
同理,当队列为空,emptyWaitSet小黑屋阻塞,当有任务被放入,EmptyWaitSet唤醒所有的线程。
这就有一个执行完毕之后,线程不会停止,他会一定等待拿去任务,线程阻塞了EmptyWaitSet
改进
获取任务的超时结束
获取任务take的增强 超时
//带超时的阻塞获取
public T poll(long timeOut, TimeUnit unit){
lock.lock();
try {
//将timeOUt转换成统一转换为ns
long nanos = unit.toNanos(timeOut);
while (deque.isEmpty()) {
try {
//返回值=等待时间-经过的时间
if(nanos<=0){
return null;
}
nanos= emptyWaitSet.awaitNanos(nanos);
}catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
T t = deque.removeFirst();
fullWaitSet.signalAll();
return t;
}finally {
lock.unlock();
}
}
修改worker的run函数
public void run() {
//执行任务
//1)当task不为空,执行任务
//2)当task执行完毕,再接着从任务队列中获取任务
// while (task!=null||(task=taskQueue.take())!=null){
//修改如下
while (task!=null||(task=taskQueue.poll(timeOut,timeUnit))!=null){
try {
System.out.println("正在执行worker"+this);
sleep(1000);
task.run();
} catch (Exception e) {
}finally {
task=null;
}
}
正常结束了
放入任务的超时结束offer()
那么有装入任务 的增强 ,就再提供一个超时装入入offer()吧 ,当放入一个满的队列时,超时后返回false不再放入
//带有超时的队列添加
public Boolean offer(T element,long timeOut, TimeUnit unit){
lock.lock();
long nanos = unit.toNanos(timeOut);
try {
while (deque.size()==capcity){
try {
long l = fullWaitSet.awaitNanos(nanos);
if(l<=0){
return false;
}
}catch (Exception e){
}
}
deque.addLast(element);
emptyWaitSet.signalAll();
return true;
} finally {
lock.unlock();
}
}
拒绝策略
函数式接口
@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {
void reject(MyBlockQueue<T> queue, T task);
}
代码改进
如下部分代码是存入任务的部分
public void exectue(Runnable task){
//当任务数没有超过coreSize时,直接交给work对象执行
//如果任务超过coreSize时,加入任务队列
synchronized (workers){
if(workers.size()<coreSize){
Worker worker=new Worker(task);
System.out.println("新增worker");
workers.add(worker);
worker.start();
//任务数超过了核心数
}else{
//存入任务
//taskQueue.put(task);
//当队列满了之后 执行的策略
//1) 死等
//2)带有超时的等待
//3)当调用者放弃任务执行
//4)让调用者抛出异常
//5)让调用者自己执行任务...
//为了增加灵活性,这里不写死,交给调用者
//重新写了一个放入任务的方法
taskQueue.tryPut(rejectPolicy,task);
}
}
}
阻塞队列里的tryPut
public void tryPut(ThreadPool.RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
//如果队列容量满了,就开始执行拒绝策略
if(capcity>= deque.size()){
rejectPolicy.reject(this,task);
}else{
//不满就正常加入到队列中
System.out.println(task+"正常加入到队列");
deque.addLast(task);
}
}finally {
lock.unlock();
}
}
//1) 死等
//2)带有超时的等待
//3)当调用者放弃任务执行
//4)让调用者抛出异常
//5)让调用者自己执行任务...
谁调用方法,谁写拒绝策略
为了传入策略,就再构造函数里面加入一个方法的参数传入
//部分代码...
//拒绝策略
RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeOut = timeOut;
this.timeUnit = timeUnit;
taskQueue=new MyBlockQueue<>(queueCapcity);
this.rejectPolicy=rejectPolicy;
}
主函数编写拒绝的策略,就lamda表达式会把...
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1,(queue,task)->{
//死等
// queue.put(task);
//超时添加
// System.out.println(queue.offer(task, 100, TimeUnit.NANOSECONDS));
//放弃执行
// System.out.print("我放弃");
//调用者抛出异常
// throw new RuntimeException("任务执行失败");
//调用者执行
// task.run();
});
for (int i=0;i<5;i++){
int j=i;
threadPool.exectue(()->{
System.out.println(j);
});
}
}
五种拒绝策略的结果(我不会用slog4j)
1.死等的结果
2.超时拒绝的结果(每个false都是时间到了,每加进去)
3.不作为,调用者放弃任务
4.抛出异常,停止
5.调用者线程执行了