目录
编辑
先上完整代码:
解析:
任务队列:
线程池类:
拒绝策略:
先上完整代码:
public class MyThreadPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 1000,TimeUnit.MICROSECONDS, 10, (queue, task) -> {
// 1.死等
queue.put(task);
// 2.带超时时间等待加入等待队列
// queue.offer(task, 500, TimeUnit.MICROSECONDS);
// 3.放弃任务
// 队列满了,没做人任何事情
// 4.抛出异常
// throw new RuntimeException("任务执行失败" + task);
// 5.让调用者自己执行
// task.run();
});
for (int i = 0; i < 15; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
});
}
}
}
// 拒接策略
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockQueue queue, T task) ;
}
class ThreadPool {
// 任务队列
private BlockQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet();
// 线程数
private int coreSize;
private long timeout;
private TimeUnit timeUnit;
private RejectPolicy<Runnable> rejectPolicy;
// 构造方法
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueSize, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockQueue<>(queueSize);
this.rejectPolicy = rejectPolicy;
}
public void execute(Runnable task) {
// 当任务数没有超过核心数时,直接交给woker对象执行
// 如果超过,放入任务队列中存起来
synchronized (workers) { // workers不安全,把他锁起来
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
System.out.println("新增worker");
workers.add(worker); // 加入线程集合
worker.start();
} else {
// taskQueue.put(task); // 任务添加进入
// 1.死等
// 2.带超时时间等待
// 3.放弃任务
// 4.抛出异常
// 5.让调用者自己执行
taskQueue.tryPut(rejectPolicy, task);
}
}
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 当task任务不为空,执行
// 当任务为空,去任务队列中去取
// while (task != null || (task = taskQueue.take()) != null) 一直等待获取
while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
System.out.println("正在执行" + task);
task.run();
} catch (Exception e) {
} finally {
task = null;
}
}
synchronized (workers) {
System.out.println("worker被移除" + this);
workers.remove(this); // 移除当前集合对象
}
}
}
}
// 阻塞队列
class BlockQueue<T> {
// 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 锁
private ReentrantLock lock = new ReentrantLock();
// 满了等待,生产者
private Condition fullWaitSet = lock.newCondition();
// 空的等待,消费者
private Condition emptyWaitSet = lock.newCondition();
// 容量
private int capacity;
public BlockQueue(int capacity) {
this.capacity = capacity;
}
// 阻塞队列中获取任务
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await(); // 进入等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal(); // 唤醒
return t;
} finally {
lock.unlock();
}
}
// 阻塞队列中添加任务
public void put(T t) {
lock.lock();
try {
while (queue.size() == capacity) { // 如果满了,进入等待
try {
System.out.println("等待加入任务队列" + t);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("加入任务队列" + t);
queue.addLast(t);
emptyWaitSet.signal(); // 唤醒
}finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
}finally {
lock.unlock(); // 就算return也会执行
}
}
// 带超时时间的获取,无需永久的等待了
public T poll (long timeout, TimeUnit unit) {
lock.lock();
try {
long nanos = unit.toNanos(timeout); // 时间转换为ns
while (queue.isEmpty()) {
try {
if (nanos <= 0) return null; // 超时了,直接返回吧
nanos = emptyWaitSet.awaitNanos(nanos);// 进入等待,超时不再等待,返回结果为剩余等待时间
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal(); // 唤醒
return t;
} finally {
lock.unlock();
}
}
// 带超时时间的添加, return 添加成功 or 失败
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) { // 如果满了,进入等待
try {
System.out.println("等待加入任务队列" + task);
if (nanos <= 0) return false;
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("加入任务队列" + task);
queue.addLast(task);
emptyWaitSet.signal(); // 唤醒
return true;
}finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
// 判断队列是否已满
if (queue.size() == capacity) { // 有空闲
rejectPolicy.reject(this, task); // 拒绝策略
} else { // 有空闲
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}
解析:
任务队列:
// 阻塞队列
class BlockQueue<T> {
// 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 锁
private ReentrantLock lock = new ReentrantLock();
// 满了等待,生产者
private Condition fullWaitSet = lock.newCondition();
// 空的等待,消费者
private Condition emptyWaitSet = lock.newCondition();
// 容量
private int capacity;
public BlockQueue(int capacity) {
this.capacity = capacity;
}
// 阻塞队列中获取任务
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await(); // 进入等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal(); // 唤醒
return t;
} finally {
lock.unlock();
}
}
// 阻塞队列中添加任务
public void put(T t) {
lock.lock();
try {
while (queue.size() == capacity) { // 如果满了,进入等待
try {
System.out.println("等待加入任务队列" + t);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("加入任务队列" + t);
queue.addLast(t);
emptyWaitSet.signal(); // 唤醒
}finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
}finally {
lock.unlock(); // 就算return也会执行
}
}
// 带超时时间的获取,无需永久的等待了
public T poll (long timeout, TimeUnit unit) {
lock.lock();
try {
long nanos = unit.toNanos(timeout); // 时间转换为ns
while (queue.isEmpty()) {
try {
if (nanos <= 0) return null; // 超时了,直接返回吧
nanos = emptyWaitSet.awaitNanos(nanos);// 进入等待,超时不再等待,返回结果为剩余等待时间
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal(); // 唤醒
return t;
} finally {
lock.unlock();
}
}
// 带超时时间的添加, return 添加成功 or 失败
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) { // 如果满了,进入等待
try {
System.out.println("等待加入任务队列" + task);
if (nanos <= 0) return false;
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("加入任务队列" + task);
queue.addLast(task);
emptyWaitSet.signal(); // 唤醒
return true;
}finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
// 判断队列是否已满
if (queue.size() == capacity) { // 有空闲
rejectPolicy.reject(this, task); // 拒绝策略
} else { // 有空闲
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}
-
ArrayDeque
作为底层数据结构存储队列元素。 -
ReentrantLock
实现了线程安全。 Condition
来实现阻塞等待机制,当队列为空时,消费者线程等待;当队列满时,生产者线程等待。- 常规的入队
put()
、出队take()
操作。 - 带有超时的入队
offer()
和出队poll()
操作。 tryPut()
方法,该方法接受一个RejectPolicy
接口,用于指定当队列已满时的拒绝策略
方法:
take()
: 当队列为空时,消费者线程调用该方法将进入等待状态,直到队列中有元素可取。put(T t)
: 当队列已满时,生产者线程调用该方法将进入等待状态,直到队列有空位可添加元素。poll(long timeout, TimeUnit unit)
: 带有超时的出队操作,当队列为空时,会等待一段时间,如果在指定时间内仍未有元素可取,则返回 null。offer(T task, long timeout, TimeUnit timeUnit)
: 带有超时的入队操作,当队列已满时,会等待一段时间,如果在指定时间内仍未有空位可添加元素,则返回 false。tryPut(RejectPolicy<T> rejectPolicy, T task)
: 尝试添加元素,当队列已满时,根据拒绝策略RejectPolicy
进行处理。
单看其实就是一个生产者消费者模式而已。
线程池类:
class ThreadPool {
// 任务队列
private BlockQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet();
// 线程数
private int coreSize;
private long timeout;
private TimeUnit timeUnit;
private RejectPolicy<Runnable> rejectPolicy;
// 构造方法
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueSize, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockQueue<>(queueSize);
this.rejectPolicy = rejectPolicy;
}
public void execute(Runnable task) {
// 当任务数没有超过核心数时,直接交给woker对象执行
// 如果超过,放入任务队列中存起来
synchronized (workers) { // workers不安全,把他锁起来
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
System.out.println("新增worker");
workers.add(worker); // 加入线程集合
worker.start();
} else {
// taskQueue.put(task); // 任务添加进入
// 1.死等
// 2.带超时时间等待
// 3.放弃任务
// 4.抛出异常
// 5.让调用者自己执行
taskQueue.tryPut(rejectPolicy, task);
}
}
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 当task任务不为空,执行
// 当任务为空,去任务队列中去取
// while (task != null || (task = taskQueue.take()) != null) 一直等待获取
while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
System.out.println("正在执行" + task);
task.run();
} catch (Exception e) {
} finally {
task = null;
}
}
synchronized (workers) {
System.out.println("worker被移除" + this);
workers.remove(this); // 移除当前集合对象
}
}
}
}
BlockQueue<Runnable>
来存储待执行的任务。HashSet<Worker>
来存储线程集合。- 提供构造方法来初始化线程池的核心线程数、超时时间、任务队列大小和拒绝策略。
execute(Runnable task)
方法来提交任务到线程池中执行。- 内部定义了
Worker
内部类,用于执行任务的线程。
方法:
execute(Runnable task)
: 提交任务到线程池中执行。如果当前线程数小于核心线程数,则直接创建新的Worker
线程执行任务;如果当前线程数已达到核心线程数,则尝试将任务放入任务队列中,根据拒绝策略rejectPolicy
进行处理。Worker
: 内部类实现了线程执行任务的逻辑。在run()
方法中,线程会不断从任务队列中取出任务执行,如果队列为空则会等待一段时间,超时时间由timeout
和timeUnit
决定。
拒绝策略:
函数式接口,由使用者提供实现。
// 拒接策略
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockQueue queue, T task) ;
}
```java
public class MyThreadPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 1000,TimeUnit.MICROSECONDS, 10, (queue, task) -> {
// 1.死等
queue.put(task);
// 2.带超时时间等待加入等待队列
// queue.offer(task, 500, TimeUnit.MICROSECONDS);
// 3.放弃任务
// 队列满了,没做人任何事情
// 4.抛出异常
// throw new RuntimeException("任务执行失败" + task);
// 5.让调用者自己执行
// task.run();
});
for (int i = 0; i < 15; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
});
}
}
}
几种拒绝策略实现:
-
死等(Blocking): 当任务队列已满时,线程池会一直等待直到有空位。这里使用了
queue.put(task)
,该方法会阻塞当前线程直到队列有空位可用。 -
带超时时间等待(Timeout Blocking): 当任务队列已满时,线程池会等待一段时间,如果在指定时间内仍未有空位可用,则放弃当前任务。这里使用了
queue.offer(task, 500, TimeUnit.MICROSECONDS)
,该方法会在指定时间内等待,如果超时则返回 false。 -
放弃任务(Discard): 当任务队列已满时,线程池会放弃当前任务,不做任何处理。
-
抛出异常(Throw Exception): 当任务队列已满时,线程池会抛出异常,通知调用者任务执行失败。
-
让调用者自己执行(Caller Runs): 当任务队列已满时,不在线程池内执行任务,而是由调用者自己执行任务。