从源码全面解析Java 线程池的来龙去脉

news2024/11/24 14:23:58
  • 👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主
  • 📕系列专栏:Java设计模式、Spring源码系列、Netty源码系列、Kafka源码系列、JUC源码系列
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2023计划中:以梦为马,扬帆起航,2023追梦人
  • 📝联系方式:hls1793929520,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

在这里插入图片描述

文章目录

  • 线程池源码剖析
    • 一、引言
    • 二、使用
    • 三、源码
      • 1、初始化
        • 1.1 拒绝策略
          • 1.1.1 AbortPolicy
          • 1.1.2 CallerRunsPolicy
          • 1.1.3 DiscardOldestPolicy
          • 1.1.4 DiscardPolicy
          • 1.1.5 自定义拒绝策略
        • 1.2 其余变量
      • 2、线程池的execute方法
      • 3、线程池的addWorker方法
        • 3.1 校验
        • 3.2 添加线程
      • 4、线程池的 worker 源码
      • 5、线程池的 runWorker 方法
      • 6、线程池的 getTask 方法
      • 7、线程池的 processWorkerExit 方法
      • 8、线程池的关闭方法
        • 8.1 shutdownNow 方法
        • 8.2 shutdown 方法
    • 四、流程图
    • 五、总结

线程池源码剖析

一、引言

线程池技术在互联网技术使用如此广泛,几乎所有的后端技术面试官都要在线程池技术的使用和原理方面对小伙伴们进行 360° 的刁难。

作为一个在互联网公司面一次拿一次 Offer 的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我使用一下夸张的修辞手法)。

于是在一个寂寞难耐的夜晚,暖男我痛定思痛,决定开始写 《吊打面试官》 系列,希望能帮助各位读者以后面试势如破竹,对面试官进行 360° 的反击,吊打问你的面试官,让一同面试的同僚瞠目结舌,疯狂收割大厂 Offer!

虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马

二、使用

我想大部分人应该都使用过线程池,我们的 JDK 中也提供了一些包装好的线程池使用,比如:

  • newFixedThreadPool:返回一个核心线程数为 nThreads 的线程池

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }
    
  • newSingleThreadExecutor:返回一个核心线程数为 1 的线程池

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
    }
    
  • newCachedThreadPool:大同小异

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }
    

通过上面 JDK 提供的我们可以发现一个共识,他们其实都是调用了 ThreadPoolExecutor 的构造方法来进行线程池的创建

这时候,我们不免有疑问,我们难道不可以直接使用 ThreadPoolExecutor 的构造方法去进行创建嘛

是的,阿里巴巴Java开发手册中明确指出,『不允许』使用Executors创建线程池

所以,我们在生产中,一般使用 ThreadPoolExecutor 的构造方法自定义去创建线程池,比如:

public class ThreadPoolTest {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,      // 核心线程数
                5,  // 最大线程数
                200,   // 非核心工作线程在阻塞队列位置等待的时间
                TimeUnit.SECONDS,  // 非核心工作线程在阻塞队列位置等待的单位
                new LinkedBlockingQueue<>(), // 阻塞队列,存放任务的地方
                new ThreadPoolExecutor.AbortPolicy() // 拒绝策略:这里有四种
        );

        for (int i = 0; i < 10; i++) {
            MyTask task = new MyTask();
            executor.execute(task);
        }

        // 关闭线程
        executor.shutdown();

    }
}

class MyTask implements Runnable {
    @Override
    public void run() {
        System.out.println("我被执行了....");
    }
}

三、源码

整体的流程如下:

image-20230505235845777

1、初始化

聊源码不从初始化聊的,都是不讲道理的

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

// 执行ThreadPoolExecutor的构造方法进行初始化
// corePoolSize: 核心线程数
// maximumPoolSize: 最大线程数
// keepAliveTime: 非核心工作线程在阻塞队列位置等待的时间
// unit: 非核心工作线程在阻塞队列位置等待的时间单位
// workQueue: 存放任务的阻塞队列
// threadFactory: 线程工厂(生产线程的地方)
// RejectedExecutionHandler: 拒绝策略
public ThreadPoolExecutor(int corePoolSize, 
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 核心线程数可以为0
    // 最大线程数不为0
    // 最大线程数 大于 核心线程数
    // 等待时间大于等于0
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    // 将当前的入参赋值给成员变量
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

我们上面初始化的过程主要对入参做了一些校验,然后将方法的入参赋予给成员变量

1.1 拒绝策略

1.1.1 AbortPolicy

简单粗暴,直接抛出异常

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
    }
}
1.1.2 CallerRunsPolicy

当前拒绝策略会在线程池无法处理任务时,将任务交给调用者处理

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 如果当前的
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
1.1.3 DiscardOldestPolicy

如果当前的阻塞队列满了,弹出时间最久的

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            // 获取阻塞队列,弹出一个时间最久的
            e.getQueue().poll();
            // 执行当前的
            e.execute(r);
        }
    }
}
1.1.4 DiscardPolicy

简单粗暴,不做任何操作

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
1.1.5 自定义拒绝策略

自己写的业务逻辑,可以将拒绝的任务放至数据库等存储,等后续在执行

public static class MyRejectedExecution implements RejectedExecutionHandler{

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("这是我自己的拒绝策略");
    }
}

1.2 其余变量

// 该数值代表两个意思:
// 高3位表示当前线程池的状态
// 低29位表示当前线程池工作线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//  COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY就是当前工作线程能记录的工作线程的最大个数
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 111:代表RUNNING状态,RUNNING可以处理任务,并且处理阻塞队列中的任务。
private static final int RUNNING    = -1 << COUNT_BITS;
// 000:代表SHUTDOWN状态,不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完。
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001:代表STOP状态,不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管。
private static final int STOP       =  1 << COUNT_BITS;
// 010:代表TIDYING状态,这个状态是否SHUTDOWN或者STOP转换过来的,代表当前线程池马上关闭,就是过渡状态。
private static final int TIDYING    =  2 << COUNT_BITS;
// 011:代表TERMINATED状态,这个状态是TIDYING状态转换过来的,转换过来只需要执行一个terminated方法。
private static final int TERMINATED =  3 << COUNT_BITS;

// 基于&运算的特点,保证只会拿到ctl高三位的值
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 基于&运算的特点,保证只会拿到ctl低29位的值
private static int workerCountOf(int c)  { return c & CAPACITY; }

线程池的状态变化流程图:

在这里插入图片描述

2、线程池的execute方法

  • Step1:当前的线程池个数低于核心线程数,直接添加核心线程即可
  • Step2:当前的线程池个数大于核心线程数,将任务添加至阻塞队列中
  • Step3:如果添加阻塞队列失败,则需要添加非核心线程数处理任务
  • Step4:如果添加非核心线程数失败(满了),执行拒绝策略
public void execute(Runnable command) {
    // 如果当前传过来的任务是null,直接抛出异常即可
    if (command == null)
        throw new NullPointerException();
    // 获取当前的数据值
    int c = ctl.get();
    
//==========================线程池第一阶段:启动核心线程数开始==================================================
    // Step1:获取ctl低29位的数值,与我们的核心线程数相比
    if (workerCountOf(c) < corePoolSize) {
        // Step2:添加一个核心线程
        if (addWorker(command, true)){
            return;
        }
        // 更新一下当前值
        c = ctl.get();
    }
//==========================线程池第一阶段:启动核心线程数结束==================================================
    
    // 如果走到下面会有两种情况:
    // 1、核心线程数满了,需要往阻塞队列里面扔任务
    // 2、核心线程数满了,阻塞队列也满了,执行拒绝策略
    
//==========================线程池第二阶段:任务放至阻塞队列开始==================================================
    // 判断当前的状态是不是Running的状态(RUNNING可以处理任务,并且处理阻塞队列中的任务)
    // 如果是Running的状态,则可以将任务放至阻塞队列中
    // 这里如果放阻塞队列失败了,证明阻塞队列满了
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次更新数值
        int recheck = ctl.get();
        // 再次校验当前的线程池状态是不是Running
        // 如果线程池状态不是Running的话,需要删除掉刚刚放的任务
        if (!isRunning(recheck) && remove(command)){
            // 执行拒绝策略
            reject(command);
        } 
        // 如果到这里,说明上面阻塞队列中已经有数据了
        // 如果线程池的个数为0的话,需要创建一个非核心工作线程去执行该任务
        // 不能让人家堵塞着
        else if (workerCountOf(recheck) == 0){
            addWorker(null, false);
        }
    }
//==========================线程池第二阶段:任务放至阻塞队列结束==================================================

    // 如果走到这里的逻辑,证明上面的逻辑没走通,有以下两种情况:
    // 1、线程池的状态不是Running
    //    1.1 如果是这种情况,下面的添加非核心工作线程失败执行拒绝策略,但这个并不是这个逻辑的重点
    // 2、阻塞队列添加任务失败(阻塞队列满了)
    //    2.1 这种情况才是我们需要关心的
    //    2.2 阻塞队列满了,添加非核心工作线程
    //    2.3 若添加非核心工作线程失败,证明已经到达maximumPoolSize的限制,执行拒绝策略
//==========================线程池第三阶段:启动非核心线程数开始==================================================
    // 添加一个非核心工作线程
    else if (!addWorker(command, false))
        // 工作队列中添加任务失败,执行拒绝策略
        reject(command);
//==========================线程池第三阶段:启动非核心线程数结束==================================================
}

流程图如下:

在这里插入图片描述

3、线程池的addWorker方法

3.1 校验

  • 校验当前线程池的状态
  • 校验当前线程池工作线程的个数(核心线程数、最大工作线程数)
private boolean addWorker(Runnable firstTask, boolean core) {
    // 这里主要是为了结束整个循环
    retry:
    for (;;) {
        // 获取当前线程池的数值(ctl)
        int c = ctl.get();
        
        // runStateOf:基于&运算的特点,保证只会拿到ctl高三位的值
        int rs = runStateOf(c);

//==========================线程池状态判断=============================================================
        // rs >= SHUTDOWN:代表当前线程池状态为:SHUTDOWN、STOP、TIDYING、TERMINATED,线程池状态异常
        // 但这里SHUTDOWN状态稍许不同(不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完)
        // 如果当前的状态是SHUTDOWN状态并且阻塞队列任务不为空且新任务为空
        // 需要新起一个非核心工作线程去执行任务
        // 如果不是前面的,直接返回false即可
        if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())){
            return false;
        }

//==========================工作线程个数判断==========================================================
        for (;;) {
            // 获取当前线程池中线程的个数
            int wc = workerCountOf(c);
            // 1、如果线程池线程的个数是否超过了工作线程的最大个数
            // 2、core=true(核心线程)=false(工作线程)
            // 2.1 根据当前core判断创建的是核心线程数(corePoolSize)还是非核心线程数(maximumPoolSize)
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)){
                return false;
            }
            // 尝试将线程池线程加一
            if (compareAndIncrementWorkerCount(c)){
                // CAS成功后,直接退出外层循环,代表可以执行添加工作线程操作了。
                break retry;
            }
            
            // 获取当前线程池的数值(ctl)
            c = ctl.get(); 
            // 获取当前线程池的状态
            // 判断当前线程池的状态等不等于我们上面的rs
            // 我们线程池的状态被人更改了,需要重新跑整个for循环判断逻辑
            if (runStateOf(c) != rs){
                continue retry;
            }
        }
    }
    // 省略下面的代码
}

3.2 添加线程

{
    // 省略校验的步骤
    
    // 两个标记
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 将当前的任务封装成Worker
        w = new Worker(firstTask);
        // 拿到当前Worker的线程
        final Thread t = w.thread;
        // 线程不为空
        if (t != null) {
            // 上锁,保证线程安全(workers、largestPoolSize)
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                
                // 获取线程池的状态
                int rs = runStateOf(ctl.get());

                // 1、rs < SHUTDOWN:保证当前线程池的状态一定是RUNNING状态
                // 2、rs == SHUTDOWN && firstTask == null:如果当前线程池是SHUTDOWN状态且新任务为空
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    // 判断线程是否还活着
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // private final HashSet<Worker> workers = new HashSet<Worker>();
                    // 添加到我们的work队列中
                    workers.add(w);
                    // 获取works的大小
                    int s = workers.size();
                    // largestPoolSize在记录最大线程个数的记录
                    // 如果当前的大小比最大的还要打,替换即可
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // worker添加成功
                    workerAdded = true;
                }
            } finally {
                // 解锁
                mainLock.unlock();
            }
            // 如果添加成功
            if (workerAdded) {
                // 启动线程
                t.start();
                // 线程启动标志位
                workerStarted = true;
            }
        }
    } finally {
        // 如果线程没有启动成功,从workers集合中删除掉该worker
        if (!workerStarted)
            addWorkerFailed(w);
    }
    // 返回线程是否启动成功
    return workerStarted;
}

// Worker的初始化
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // 从线程工厂里面拿一个线程出来
    this.thread = getThreadFactory().newThread(this);
}

// 从workers集合中删除掉该worker
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

4、线程池的 worker 源码

// Worker继承了AQS,目的就是为了控制工作线程的中断。
// Worker实现了Runnable,内部的Thread对象,在执行start时,必然要执行Worker中断额一些操作
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    
    private static final long serialVersionUID = 6138294804551838833L;
    // 当前线程工厂创建的线程(也是执行任务使用的线程)
    final Thread thread;
    
    // 当前的第一个任务
    Runnable firstTask;
  
    // 记录执行了多少个任务
    volatile long completedTasks;

    // 构造方法
    Worker(Runnable firstTask) {
        // 将State设置为-1,代表当前不允许中断线程
        setState(-1); 
        // 设置任务
        this.firstTask = firstTask;
        // 设置线程
        this.thread = getThreadFactory().newThread(this);
    }

    // 线程启动执行的方法
    public void run() {
        runWorker(this);
    }
    
    // =======================Worker管理中断================================   
    // 当前方法是中断工作线程时,执行的方法
    void interruptIfStarted() {
        Thread t;
        // 只有Worker中的state >= 0的时候,可以中断工作线程
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                // 如果状态正常,并且线程未中断,这边就中断线程
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    } 
}

5、线程池的 runWorker 方法

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    // 拿到当前的线程
    Thread wt = Thread.currentThread();
    // 拿到当前Worker的第一个任务(如果携带的话)
    Runnable task = w.firstTask;
    // 置空
    w.firstTask = null;
    // 解锁
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
        // 如果任务不等于空 或者 从阻塞队列中拿到的任务不等于空
        while (task != null || (task = getTask()) != null) {
            // 加锁
            w.lock();
            // 如果线程池状态 >= STOP,确保线程中断了
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 在任务执行前做一些操作,自己实现的钩子
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 任务执行
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 任务执行后一些操作:自己实现的钩子
                    afterExecute(task, thrown);
                }
            } finally {
                // 任务置空
                task = null;
                // 执行任务+1
                w.completedTasks++;
                // 解锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 删除线程的方法
        processWorkerExit(w, completedAbruptly);
    }
}

6、线程池的 getTask 方法

private Runnable getTask() {
    // 超时的标记
    boolean timedOut = false; 

    // 死循环拿数据
    for (;;) {
        // 拿到当前的ctl
        int c = ctl.get();
        // 获取其线程池状态
        int rs = runStateOf(c);

        // 如果线程池状态是STOP,没有必要处理阻塞队列任务,直接返回null
        // 如果线程池状态是SHUTDOWN,并且阻塞队列是空的,直接返回null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 线程池中的线程个数减一
            decrementWorkerCount();
            return null;
        }

        // 当前线程池中线程个数
        int wc = workerCountOf(c);

        // 这里是个重点
        // allowCoreThreadTimeOut:是否允许核心线程数超时(开启这个之后),核心线程数也会执行下面超时的逻辑
        // wc > corePoolSize:当前线程池中的线程个数大于核心线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // wc > maximumPoolSize:基本不存在
        // timed && timedOut:第一次肯定是失败的(超时标记为false)
        if ((wc > maximumPoolSize || (timed && timedOut))
            // 1、线程个数为1
            // 2、阻塞队列是空的
            && (wc > 1 || workQueue.isEmpty())) {
            // 线程池的线程个数减一
            if (compareAndDecrementWorkerCount(c)){
                return null;
            }
            continue;
        }

        try {
            // 根据我们前面的timed的值(当前线程池中的线程个数是否大于核心线程数)
            // 如果大于,执行workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)带有时间的等待,超过时间无任务,会返回null
            // 如果小于,执行workQueue.take(),死等任务,不会返回null
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null){
                return r;
            }
            // 到这里,说明上面的等待超时了
            // 这里要注意一下,如果这里超时后,我们上面 if ((wc > maximumPoolSize || (timed && timedOut)) 这个判断要起作用了
        	// (timed && timedOut) true
            // wc > 1 || workQueue.isEmpty():当线程大于1或者阻塞队列无数据,直接返回null,让外部循环删除
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

7、线程池的 processWorkerExit 方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 正常退出的:completedAbruptly=false
    // 不是正常退出的:completedAbruptly=true
    if (completedAbruptly) 
        decrementWorkerCount();

    // 加锁——上锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 将当前worker的执行任务数累加到线程池中
        completedTaskCount += w.completedTasks;
        // 线程池删除该工作线程
        workers.remove(w);
    } finally {
        // 解锁
        mainLock.unlock();
    }

    tryTerminate();

    // 获取ctl的数据
    int c = ctl.get();
    // 这里只有SHUTDOWN、RUNNING会进入判断
    if (runStateLessThan(c, STOP)) {
        // 正常退出的
        if (!completedAbruptly) {
            // 是否允许超时
            // 允许:0
            // 不允许:核心线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果min=0并且阻塞队列不为空
            // 将min设置成1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 当前线程池的大小大于最小值,直接返回即可
            if (workerCountOf(c) >= min){
                return;
            }
        }
        // 如果没有的话,说明线程池中没有线程了,并且还有阻塞任务
        // 只能添加一个非核心线程去处理这些任务
        addWorker(null, false);
    }
}

8、线程池的关闭方法

8.1 shutdownNow 方法

  • 将线程池状态修改为Stop(不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管)
  • 将线程池中的线程全部中断
  • 删除当前线程池所有的工作线程
  • 将线程池的状态从:Stop --> TIDYING --> TERMINATED,正式标记线程池的结束(唤醒一下等待的主线程)
public List<Runnable> shutdownNow() {
    // 声明返回结果
    List<Runnable> tasks;
    // 加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 将线程池状态修改为STOP
        advanceRunState(STOP);
        // 将线程池中的线程全部中断
        interruptWorkers();
        // 删除当前所有的工作线程
        tasks = drainQueue();
    } finally {
        // 解锁
        mainLock.unlock();
    }
    // 查看当前线程池是否可以变为TERMINATED状态
    // 从 Stop 状态修改为 TIDYING,在修改为 TERMINATED
    tryTerminate();
    return tasks;
}

// targetState = STOP
// 作用:将当前线程池的状态修改为Stop
private void advanceRunState(int targetState) {
    // 进来直接死循环
    for (;;) {
        // 拿到当前的ctl
        int c = ctl.get();
        // runStateAtLeast(c, targetState):当前的c是不是大于STOP(如果大于Stop的话,说明线程池状态已经G了
        // 基于CAS,将ctl从c修改为Stop状态,不修改工作线程个数,仅仅将状态修改为Stop
        // 如果可以修改成功,直接退出即可
        if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

// 将线程池中的线程全部中断
private void interruptWorkers() {
    // 加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 循环遍历线程组
        for (Worker w : workers)
            // 中断线程
           	// 这里会给线程打一个中断的标记,具体什么时候中断线程,需要我们自己去控制
            w.interruptIfStarted();
    } finally {
        // 解锁
        mainLock.unlock();
    }
}

// 删除当前所有的工作线程
private List<Runnable> drainQueue() {
    // 存放工作线程的队列
    BlockingQueue<Runnable> q = workQueue;
    // 返回的结果
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    // 清空阻塞队列并将数据放入taskList中
    q.drainTo(taskList);
    // 校验当前的数据是够真的清空
    if (!q.isEmpty()) {
        // 如果确实有遗漏的,毕竟这哥们也没上锁
        // 手动的将线程从workQueue删除掉并且放到taskList中
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    // 最终返回即可
    return taskList;
}

final void tryTerminate() {
    for (;;) {
        // 拿到ctl
        int c = ctl.get();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // CAS将当前的ctl设置成TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 该方法是一个钩子函数,我们自己定义,在线程池销毁之前做最后的处理
                    terminated();
                } finally {
                    // 将ctl设置成TERMINATED标志着线程池的正式结束
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 线程池提供了一个方法,主线程在提交任务到线程池后,是可以继续做其他操作的。
                    // 咱们也可以让主线程提交任务后,等待线程池处理完毕,再做后续操作
                    // 这里线程池凉凉后,要唤醒哪些调用了awaitTermination方法的线程
                    // 简单来说,当时等待线程池返回的主线程,由于线程池已经销毁了,他们也必须要唤醒
                    termination.signalAll();
                }
                return;
            }
        } finally {
            // 解锁
            mainLock.unlock();
        }
    }
}

8.2 shutdown 方法

public void shutdown() {
    // 加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 将线程池状态修改为SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 将线程池中的线程全部中断
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 查看当前线程池是否可以变为TERMINATED状态
    // 从 SHUTDOWN 状态修改为 TIDYING,在修改为 TERMINATED
    tryTerminate();
}

四、流程图

在这里插入图片描述

五、总结

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。

如果你也对 后端架构和中间件源码 有兴趣,欢迎添加博主微信:hls1793929520,一起学习,一起成长

我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,喜欢后端架构和中间件源码。

我们下期再见。

我从清晨走过,也拥抱夜晚的星辰,人生没有捷径,你我皆平凡,你好,陌生人,一起共勉。

往期文章推荐:

  • 从源码全面解析LinkedBlockingQueue的来龙去脉
  • 从源码全面解析 ArrayBlockingQueue 的来龙去脉
  • 从源码全面解析ReentrantLock的来龙去脉
  • 阅读完synchronized和ReentrantLock的源码后,我竟发现其完全相似
  • 从源码全面解析 ThreadLocal 关键字的来龙去脉
  • 从源码全面解析 synchronized 关键字的来龙去脉
  • 阿里面试官让我讲讲volatile,我直接从HotSpot开始讲起,一套组合拳拿下面试

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/499721.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

二叉树专题

⭐️前言⭐️ 本文主要总结一些常见的二叉树题目&#xff0c;希望读者能够通过这篇文章&#xff0c;来对二叉树有一个更深一步的了解。 &#x1f349;欢迎点赞 &#x1f44d; 收藏 ⭐留言评论 &#x1f4dd;私信必回哟&#x1f601; &#x1f349;博主将持续更新学习记录收获&…

【移动端网页布局】flex 弹性布局 ⑥ ( 设置侧轴多行子元素排列方式 | align-content 样式说明 | 代码示例 )

文章目录 一、设置侧轴多行子元素排列方式 : align-content 样式说明1、align-content 样式引入2、align-content 样式属性值 二、代码示例1、代码示例 - 侧轴多行元素从上到下排列2、代码示例 - 侧轴多行元素垂直居中3、代码示例 - 侧轴多行元素平分剩余空间4、代码示例 - 侧轴…

1960-2014年各国二氧化碳排放量(人均公吨数)

1960&#xff0d;2014年各国二氧化碳排放量&#xff08;人均公吨数&#xff09;&#xff08;世界发展指标, 2019年12月更新&#xff09; 1、来源&#xff1a;世界发展指标 2、时间&#xff1a;1960&#xff0d;2014年 3、范围&#xff1a;世界各国 4、指标&#xff1a; 二氧…

C++ STL:set和map的结构及接口使用

目录 一. set和map的简介 1.1 set的简介 1.2 map的简介 二. set的主要接口函数及使用方法 2.1 构造及赋值相关接口函数 2.2 通过迭代器遍历set 2.3 结构修改相关接口函数 2.4 其他主要接口函数 三. map的主要接口函数及使用方法 3.1 构造和赋值相关接口函数 3.2 通…

Midjourney放大招,什么好看唯美高清,统统都是我的

你知道吗&#xff1f;超过99%的人都没有足够出色的肖像照来展现自己的魅力&#xff0c;除非你是那种在网上拥有成千上万张自己肖像照的大明星。但是&#xff0c;好消息来了&#xff01;现在有一个InsightFaceSwap Discord bot&#xff0c;可以帮助你实现这个“不可能完成”的想…

【JavaEE初阶】多线程进阶(五)常见锁策略 CAS synchronized优化原理

文章目录 常见锁策略乐观锁 & 悲观锁轻量级锁 & 重量级锁自旋锁 & 挂起等待锁互斥锁 & 读写锁公平锁 & 非公平锁可重入锁 & 不可重入锁synchronized对应以上的锁策略锁策略中的面试题&#xff1a; CASCAS的介绍CAS如何实现CAS的应用场景CAS的典型问题&…

Excel公式:将日期转换为月份年

Excel公式&#xff1a;将日期转换为月份年 在Excel中&#xff0c;您可以将日期单元格格式化为多种类型的日期格式&#xff0c;但是在某些情况下&#xff0c;您希望将日期转换为文本格式&#xff0c;仅转换为月&#xff0c;年&#xff0c;日或月年&#xff0c;日月或年日。 在本…

Java全栈学习路线总结,科班程序员搬砖逆袭

&#x1f307;文章目录 前言一、前置知识二、 Web前端基础示例&#xff1a;1.文本域2.密码字段 三、后端基础一. Java基础二. 数据库技术三. Web开发技术四. 框架技术五. 服务器部署 四、其他技术五、全栈开发六、综合实践七、学习教程一、前端开发二、后端开发三、数据库开发四…

VUE 学习笔记(三) Vue 渲染流程详解

在 Vue 里渲染一块内容&#xff0c;会有以下步骤及流程&#xff1a; 第一步&#xff0c;解析语法&#xff0c;生成AST 第二步&#xff0c;根据AST结果&#xff0c;完成data数据初始化 第三步&#xff0c;根据AST结果和DATA数据绑定情况&#xff0c;生成虚拟DOM 第四步&…

ESP32设备驱动-Si1145红外接近-紫外 (UV) 指数和环境光传感器驱动

Si1145红外接近-紫外 (UV) 指数和环境光传感器驱动 文章目录 Si1145红外接近-紫外 (UV) 指数和环境光传感器驱动1、Si1145介绍2、硬件准备3、软件准备4、驱动实现1、Si1145介绍 Si1145/46/47 是一款低功耗、基于反射的红外接近、紫外 (UV) 指数和环境光传感器,具有 I2C 数字接…

电脑百度网盘打不开怎么办 电脑百度网盘双击没反应处理方法

有时候我们想要在电脑浏览器上下载一些文件时&#xff0c;打开的文件下载链接有些需要通过百度网盘来存储下载&#xff0c;然而当用户在电脑中安装完百度网盘工具之后&#xff0c;双击想要打开时却总是没反应&#xff0c;对此电脑百度网盘打不开怎么办呢&#xff1f;接下来小编…

Java反射和动态代理

反射 反射允许对封装类的成员变量、成员方法和构造方法的信息进行编程访问 成员变量&#xff1a;修饰符、名字、类型、get/set值 构造方法&#xff1a;修饰符、名字、形参、创建对象 成员方法&#xff1a;修饰符、名字、形参、返回值、抛出的异常、获取注解、运行方法 获取…

【云原生进阶之PaaS中间件】第一章Redis-1.1简介

1 Redis概述 1.1 Redis 简介 Redis&#xff08;Remote Dictionary Server )&#xff0c;即远程字典服务&#xff0c;是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库&#xff0c;并提供多种语言的API&#xff0c;可用作数据库&#…

高质量毕业答辩PPT模板+PPT网站

文章目录 前言一、iSlide二、office plus三、优品PPT总结 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 又是一年毕业季&#xff0c;又到了快要答辩的时候&#xff0c;最近有一些小伙伴找我要毕业答辩PPT模板&#xff0c;本着这不能拒绝啊的心态&…

20230507使用python3批量转换DOCX文档为TXT

20230507使用python3批量转换DOCX文档为TXT 2023/5/7 20:22 WIN10使用python3.11 # – coding: gbk – import os from pdf2docx import Converter from win32com import client as wc """这里需要安转包pywin32com""" # 读取pdf文件文本内容 …

探秘二分查找中的数学奇迹:如何手动求解整数x的平方根

本篇博客会讲解力扣“69. x 的平方根”这道题的解题思路。这是题目链接。 大家先来审下题&#xff1a; 以及示例&#xff1a; 还有提示&#xff1a; 本题常规的思路有&#xff1a;暴力查找、转换成指数和对数、二分查找、牛顿迭代。 转换成指数和对数的方法非常简单&#…

接口自动化测试框架9项必备功能有哪些?你一定不知道

当你准备使用一个接口测试框架或者自造轮子的时候&#xff0c;或许你需要先了解下一个接口自动化测试框架必须具备什么功能。 一、校验   这个很好了解&#xff0c;如果没有校验&#xff0c;单纯的执行接口的话&#xff0c;那就谈不上测试了。所以支持对返回值校验是一个必须…

[Golang] 爬虫实战-获取动态页面数据-获取校招信息

&#x1f61a;一个不甘平凡的普通人&#xff0c;致力于为Golang社区和算法学习做出贡献&#xff0c;期待您的关注和认可&#xff0c;陪您一起学习打卡&#xff01;&#xff01;&#xff01;&#x1f618;&#x1f618;&#x1f618; &#x1f917;专栏&#xff1a;算法学习 &am…

Solr(1):Solr概述

1 概述 Solr 是一个基于 Apache Lucene 之上的搜索服务器&#xff0c;它是一个开源的、基于 Java 的信息检索库。它旨在驱动功能强大的文档检索应用程序 - 无论您需要根据用户的查询将数据服务到何处&#xff0c;Solr 都可以为您服务。Solr与应用程序的集成以为您服务。 下面…

es 7.x 通过DSL语句添加doc数据

一 在es中doc数据的crud操作 1.1 说明 本案例操作 接上一篇的基础上进行操作。 1.2 添加doc 方式为post http://localhost:9200/order_item/_doc 添加文档数据 必须是post提交&#xff0c;不能是put 1.3 查看文档数据 http://localhost:9200/order_item/_doc/_searc…