引出线程池:
假设小编是一个女生,小编处了一个对象,但是某一天小编不想和这个男生处对象了,但是小编还是想和别的男生处对象的,于是现在我就面临两个问题:一、怎么跟现任分手 二、开始物色新的对象,培养感情,这两个操作都非常耗时。那怎么可以提高效率一点呢?如果我既跟现任谈恋爱,又同时和其他的小哥哥暧mei,这样再我跟现任分手的同一时刻,就可以和其他的小哥哥在一起,如果这样的小哥哥多的话,就构成了一个“备胎池”。(上述纯属虚构,不适用于生活)
线程池最大的好处就是可以减小开启、销毁线程的损耗。
标准库中的线程池
标准库提供了类,ThreadPoolExecutor(构造方法,参数有很多)
这里主要讲解上图中的第四个构造方法
- corePoolSize : 线程池中最基本的线程数量(类似于公司的正式员工,一旦录用,不辞退)
- maximumPoolSize : 线程池中最大的线程数量(正式员工 + 临时员工,临时员工,一段时间不干活,会被辞退)
- keepAliveTime : 运行空闲时间(临时员工,允许空间的时间,一旦时间达到就会被辞退)
- unit :keepAliveTime 的时间单位,是秒,分钟,还是其他值
- workQueue:传递任务的阻塞队列
- threadFactory:创建线程的工厂,参与具体的创建线程工作。通过不同线程工厂创建出来的线程相当于对一些属性进行了不同的初始化设置。
- RejectedExecutionHandler:拒绝策略,如果任务量超出负荷接下来怎么处理
🍉. AbortPolicy():超过符合,直接抛出异常。
🍉. CallerRunsPolicy():调用者负责处理多出来的任务
🍉. DiscardOldestPolicy():丢弃队列中最老的任务
🍉. DiscardPolicy:丢弃新来的任务
小结:在 Java 标准库线程池中,把池中的线程分为两类
1)核心线程数量(线程池中最少的线程数量)
2)非核心线程数量(线程扩容过程中,新增的)
最大线程数 = 核心线程数 + 非核心线程数
❗:非核心线程数的创建是在 核心线程已全部跑起来,并且此时阻塞队列(workQueue)也被占满,此时会创建非核心线程。
实现线程池
Java标准库中实现 线程池 的用法,这里就不写了,本次主要是实现自己自定义的线程。
- 使用 MyBlockingQueue 组织所有的任务
- 使用 MyThreadPoolExecutor 来创建线程池
🍉 创建阻塞队列
class MyBlockingQueue {
public static Runnable[] arr=null;
public static volatile int date=0;//指向队列尾
public static volatile int head=0;//指向队列头
public static volatile int size=0;//队列长度设置
public static Object locker=new Object();
public MyBlockingQueue(int capacity) {
arr=new Runnable[capacity];
}
public void put(Runnable x) throws InterruptedException {
synchronized (locker){
while(size==arr.length){
locker.wait();
}
arr[date++]=x;
size++;
if(date>=arr.length){
date=0;//如果达到了队尾,指向队首
}
locker.notify();
}
}
public Runnable take() throws InterruptedException {
Runnable ret=null;
synchronized (locker){
while(size==0){
locker.wait();
}
ret=arr[head++];
size--;
if(head>=arr.length){
head=0;
}
locker.notify();
}
return ret;
}
public int getsize(){
return size;
}
}
🍉 使用 MyThreadPoolExecutor 来创建线程池
class MyThreadPoolExecutor{
private MyBlockingQueue queue=new MyBlockingQueue(10);
private Thread[] threads=null;
private volatile boolean isQuit=false;
public MyThreadPoolExecutor(int capacity) {
threads=new Thread[capacity];
//根据capacity的值来创建线程的个数
for (int i = 0; i < capacity; i++) {
threads[i]=new Thread(()->{
//每个线程不停的从队列中获取任务
while(!isQuit){
try {
Runnable run = queue.take();
run.run();
} catch (InterruptedException e) {
// 线程被中断,退出循环
break;
}
}
});
threads[i].start();
}
}
//递交方法
public void submit(Runnable run){
try {
queue.put(run);//任务在这里被接受
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 停止所有的 worker线程 这个只在线程池要关闭的时候才会调用
private void stopAllThread() {
for (Thread worker : threads) {
worker.stop(); // 调用 worker 的 stop 方法 让正在执行 worker 当中 run 方法的线程停止执行
}
}
public void shutDown() {
// 等待任务队列当中的任务执行完成
while (queue.getsize() != 0) {
// 如果队列当中还有任务 则让出 CPU 的使用权
Thread.yield();
}
// 在所有的任务都被执行完成之后 停止所有线程的执行
stopAllThread();
}
}
注意上述使线程池中线程停止执行的方法( shutDown() 方法 )的抒写~~
🍉 代码汇总
//创建线程池
class MyThreadPoolExecutor{
private MyBlockingQueue queue=new MyBlockingQueue(10);
private Thread[] threads=null;
private volatile boolean isQuit=false;
public MyThreadPoolExecutor(int capacity) {
threads=new Thread[capacity];
//根据capacity的值来创建线程的个数
for (int i = 0; i < capacity; i++) {
threads[i]=new Thread(()->{
//每个线程不停的从队列中获取任务
while(!isQuit){
try {
Runnable run = queue.take();
run.run();
} catch (InterruptedException e) {
// 线程被中断,退出循环
break;
}
}
});
threads[i].start();
}
}
//递交方法
public void submit(Runnable run){
try {
queue.put(run);//任务在这里被接受
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 停止所有的 worker线程 这个只在线程池要关闭的时候才会调用
private void stopAllThread() {
for (Thread worker : threads) {
worker.stop(); // 调用 worker 的 stop 方法 让正在执行 worker 当中 run 方法的线程停止执行
}
}
public void shutDown() {
// 等待任务队列当中的任务执行完成
while (queue.getsize() != 0) {
// 如果队列当中还有任务 则让出 CPU 的使用权
Thread.yield();
}
// 在所有的任务都被执行完成之后 停止所有线程的执行
stopAllThread();
}
}
//阻塞队列使用数组来实现
class MyBlockingQueue {
public static Runnable[] arr=null;
public static volatile int date=0;//指向队列尾
public static volatile int head=0;//指向队列头
public static volatile int size=0;//队列长度设置
public static Object locker=new Object();
public MyBlockingQueue(int capacity) {
arr=new Runnable[capacity];
}
public void put(Runnable x) throws InterruptedException {
synchronized (locker){
while(size==arr.length){
locker.wait();
}
arr[date++]=x;
size++;
if(date>=arr.length){
date=0;//如果达到了队尾,指向队首
}
locker.notify();
}
}
public Runnable take() throws InterruptedException {
Runnable ret=null;
synchronized (locker){
while(size==0){
locker.wait();
}
ret=arr[head++];
size--;
if(head>=arr.length){
head=0;
}
locker.notify();
}
return ret;
}
public int getsize(){
return size;
}
}
//入口点
public class Test {
public static void main(String[] args) throws InterruptedException {
MyThreadPoolExecutor pool=new MyThreadPoolExecutor(4);
for (int i = 0; i < 1000; i++) {
int id=i;
//传递的一个一个的任务
pool.submit(()->{
System.out.println(Thread.currentThread().getName() +"执行了"+ id);
});
}
pool.shutDown();
}
}
上述代码,核心操作为 submit 将任务加入到线程池中。
🚩文化篇:若汝志在辉煌,山不加阻,海不设限。