【Java 】从源码全面解析Java 线程池

news2025/1/15 23:37:11

文章目录

  • 一、引言
  • 二、使用
  • 三、源码
    • 1、初始化
      • 1.1 拒绝策略
      • 1.1.1 AbortPolicy
      • 1.1.2 CallerRunsPolicy
      • 1.1.3 DiscardOldestPolicy
      • 1.1.4 DiscardPolicy
      • 1.1.5 自定义拒绝策略
      • 1.2 其余变量
    • 2、线程池的execute方法
    • 3、线程池的addWorker方法
      • 3.1 校验
      • 3.2 添加线程
    • 4、线程池的 worker 源码
    • 5、线程池的 runWorker 方法
    • 6、线程池的 getTask 方法
    • 7、线程池的 processWorkerExit 方法
    • 8、线程池的关闭方法
      • 8.1 shutdownNow 方法
      • 8.2 shutdown 方法
  • 四、流程图
  • 五、总结

一、引言

线程池技术在互联网技术使用如此广泛,几乎所有的后端技术面试官都要在线程池技术的使用和原理方面对小伙伴们进行 360° 的刁难。

作为一个在互联网公司面一次拿一次 Offer 的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我使用一下夸张的修辞手法)。

希望能帮助各位读者以后面试势如破竹,对面试官进行 360° 的反击,吊打问你的面试官,让一同面试的同僚瞠目结舌,疯狂收割大厂 Offer!

虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马。

二、使用

我想大部分人应该都使用过线程池,我们的 JDK 中也提供了一些包装好的线程池使用,比如:

  • newFixedThreadPool:返回一个核心线程数为 nThreads 的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
  • newSingleThreadExecutor:返回一个核心线程数为 1 的线程池
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
  • newCachedThreadPool:大同小异
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

通过上面 JDK 提供的我们可以发现一个共识,他们其实都是调用了 ThreadPoolExecutor 的构造方法来进行线程池的创建。

这时候,我们不免有疑问,我们难道不可以直接使用 ThreadPoolExecutor 的构造方法去进行创建嘛

是的,阿里巴巴Java开发手册中明确指出,『不允许』使用Executors创建线程池

所以,我们在生产中,一般使用 ThreadPoolExecutor 的构造方法自定义去创建线程池,比如:

public class ThreadPoolTest {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,      // 核心线程数
                5,  // 最大线程数
                200,   // 非核心工作线程在阻塞队列位置等待的时间
                TimeUnit.SECONDS,  // 非核心工作线程在阻塞队列位置等待的单位
                new LinkedBlockingQueue<>(), // 阻塞队列,存放任务的地方
                new ThreadPoolExecutor.AbortPolicy() // 拒绝策略:这里有四种
        );

        for (int i = 0; i < 10; i++) {
            MyTask task = new MyTask();
            executor.execute(task);
        }

        // 关闭线程
        executor.shutdown();

    }
}

class MyTask implements Runnable {
    @Override
    public void run() {
        System.out.println("我被执行了....");
    }
}

三、源码

整体的流程如下:
在这里插入图片描述

1、初始化

聊源码不从初始化聊的,都是不讲道理的

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

// 执行ThreadPoolExecutor的构造方法进行初始化
// corePoolSize: 核心线程数
// maximumPoolSize: 最大线程数
// keepAliveTime: 非核心工作线程在阻塞队列位置等待的时间
// unit: 非核心工作线程在阻塞队列位置等待的时间单位
// workQueue: 存放任务的阻塞队列
// threadFactory: 线程工厂(生产线程的地方)
// RejectedExecutionHandler: 拒绝策略
public ThreadPoolExecutor(int corePoolSize, 
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 核心线程数可以为0
    // 最大线程数不为0
    // 最大线程数 大于 核心线程数
    // 等待时间大于等于0
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    // 将当前的入参赋值给成员变量
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

我们上面初始化的过程主要对入参做了一些校验,然后将方法的入参赋予给成员变量

1.1 拒绝策略

1.1.1 AbortPolicy

简单粗暴,直接抛出异常

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
    }
}

1.1.2 CallerRunsPolicy

当前拒绝策略会在线程池无法处理任务时,将任务交给调用者处理

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 如果当前的
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

1.1.3 DiscardOldestPolicy

如果当前的阻塞队列满了,弹出时间最久的

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            // 获取阻塞队列,弹出一个时间最久的
            e.getQueue().poll();
            // 执行当前的
            e.execute(r);
        }
    }
}

1.1.4 DiscardPolicy

简单粗暴,不做任何操作

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

1.1.5 自定义拒绝策略

自己写的业务逻辑,可以将拒绝的任务放至数据库等存储,等后续在执行
public static class MyRejectedExecution implements RejectedExecutionHandler{

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("这是我自己的拒绝策略");
    }
}

1.2 其余变量

// 该数值代表两个意思:
// 高3位表示当前线程池的状态
// 低29位表示当前线程池工作线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//  COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY就是当前工作线程能记录的工作线程的最大个数
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 111:代表RUNNING状态,RUNNING可以处理任务,并且处理阻塞队列中的任务。
private static final int RUNNING    = -1 << COUNT_BITS;
// 000:代表SHUTDOWN状态,不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完。
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001:代表STOP状态,不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管。
private static final int STOP       =  1 << COUNT_BITS;
// 010:代表TIDYING状态,这个状态是否SHUTDOWN或者STOP转换过来的,代表当前线程池马上关闭,就是过渡状态。
private static final int TIDYING    =  2 << COUNT_BITS;
// 011:代表TERMINATED状态,这个状态是TIDYING状态转换过来的,转换过来只需要执行一个terminated方法。
private static final int TERMINATED =  3 << COUNT_BITS;

// 基于&运算的特点,保证只会拿到ctl高三位的值
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 基于&运算的特点,保证只会拿到ctl低29位的值
private static int workerCountOf(int c)  { return c & CAPACITY; }

线程池的状态变化流程图:

在这里插入图片描述

2、线程池的execute方法

  • Step1:当前的线程池个数低于核心线程数,直接添加核心线程即可
  • Step2:当前的线程池个数大于核心线程数,将任务添加至阻塞队列中
  • Step3:如果添加阻塞队列失败,则需要添加非核心线程数处理任务
  • Step4:如果添加非核心线程数失败(满了),执行拒绝策略
public void execute(Runnable command) {
    // 如果当前传过来的任务是null,直接抛出异常即可
    if (command == null)
        throw new NullPointerException();
    // 获取当前的数据值
    int c = ctl.get();
    
//==========================线程池第一阶段:启动核心线程数开始==================================================
    // Step1:获取ctl低29位的数值,与我们的核心线程数相比
    if (workerCountOf(c) < corePoolSize) {
        // Step2:添加一个核心线程
        if (addWorker(command, true)){
            return;
        }
        // 更新一下当前值
        c = ctl.get();
    }
//==========================线程池第一阶段:启动核心线程数结束==================================================
    
    // 如果走到下面会有两种情况:
    // 1、核心线程数满了,需要往阻塞队列里面扔任务
    // 2、核心线程数满了,阻塞队列也满了,执行拒绝策略
    
//==========================线程池第二阶段:任务放至阻塞队列开始==================================================
    // 判断当前的状态是不是Running的状态(RUNNING可以处理任务,并且处理阻塞队列中的任务)
    // 如果是Running的状态,则可以将任务放至阻塞队列中
    // 这里如果放阻塞队列失败了,证明阻塞队列满了
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次更新数值
        int recheck = ctl.get();
        // 再次校验当前的线程池状态是不是Running
        // 如果线程池状态不是Running的话,需要删除掉刚刚放的任务
        if (!isRunning(recheck) && remove(command)){
            // 执行拒绝策略
            reject(command);
        } 
        // 如果到这里,说明上面阻塞队列中已经有数据了
        // 如果线程池的个数为0的话,需要创建一个非核心工作线程去执行该任务
        // 不能让人家堵塞着
        else if (workerCountOf(recheck) == 0){
            addWorker(null, false);
        }
    }
//==========================线程池第二阶段:任务放至阻塞队列结束==================================================

    // 如果走到这里的逻辑,证明上面的逻辑没走通,有以下两种情况:
    // 1、线程池的状态不是Running
    //    1.1 如果是这种情况,下面的添加非核心工作线程失败执行拒绝策略,但这个并不是这个逻辑的重点
    // 2、阻塞队列添加任务失败(阻塞队列满了)
    //    2.1 这种情况才是我们需要关心的
    //    2.2 阻塞队列满了,添加非核心工作线程
    //    2.3 若添加非核心工作线程失败,证明已经到达maximumPoolSize的限制,执行拒绝策略
//==========================线程池第三阶段:启动非核心线程数开始==================================================
    // 添加一个非核心工作线程
    else if (!addWorker(command, false))
        // 工作队列中添加任务失败,执行拒绝策略
        reject(command);
//==========================线程池第三阶段:启动非核心线程数结束==================================================
}

流程图如下:
在这里插入图片描述

3、线程池的addWorker方法

3.1 校验

  • 校验当前线程池的状态
  • 校验当前线程池工作线程的个数(核心线程数、最大工作线程数)
private boolean addWorker(Runnable firstTask, boolean core) {
    // 这里主要是为了结束整个循环
    retry:
    for (;;) {
        // 获取当前线程池的数值(ctl)
        int c = ctl.get();
        
        // runStateOf:基于&运算的特点,保证只会拿到ctl高三位的值
        int rs = runStateOf(c);

//==========================线程池状态判断=============================================================
        // rs >= SHUTDOWN:代表当前线程池状态为:SHUTDOWN、STOP、TIDYING、TERMINATED,线程池状态异常
        // 但这里SHUTDOWN状态稍许不同(不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完)
        // 如果当前的状态是SHUTDOWN状态并且阻塞队列任务不为空且新任务为空
        // 需要新起一个非核心工作线程去执行任务
        // 如果不是前面的,直接返回false即可
        if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())){
            return false;
        }

//==========================工作线程个数判断==========================================================
        for (;;) {
            // 获取当前线程池中线程的个数
            int wc = workerCountOf(c);
            // 1、如果线程池线程的个数是否超过了工作线程的最大个数
            // 2、core=true(核心线程)=false(工作线程)
            // 2.1 根据当前core判断创建的是核心线程数(corePoolSize)还是非核心线程数(maximumPoolSize)
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)){
                return false;
            }
            // 尝试将线程池线程加一
            if (compareAndIncrementWorkerCount(c)){
                // CAS成功后,直接退出外层循环,代表可以执行添加工作线程操作了。
                break retry;
            }
            
            // 获取当前线程池的数值(ctl)
            c = ctl.get(); 
            // 获取当前线程池的状态
            // 判断当前线程池的状态等不等于我们上面的rs
            // 我们线程池的状态被人更改了,需要重新跑整个for循环判断逻辑
            if (runStateOf(c) != rs){
                continue retry;
            }
        }
    }
    // 省略下面的代码
}

3.2 添加线程

{
    // 省略校验的步骤
    
    // 两个标记
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 将当前的任务封装成Worker
        w = new Worker(firstTask);
        // 拿到当前Worker的线程
        final Thread t = w.thread;
        // 线程不为空
        if (t != null) {
            // 上锁,保证线程安全(workers、largestPoolSize)
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                
                // 获取线程池的状态
                int rs = runStateOf(ctl.get());

                // 1、rs < SHUTDOWN:保证当前线程池的状态一定是RUNNING状态
                // 2、rs == SHUTDOWN && firstTask == null:如果当前线程池是SHUTDOWN状态且新任务为空
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    // 判断线程是否还活着
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // private final HashSet<Worker> workers = new HashSet<Worker>();
                    // 添加到我们的work队列中
                    workers.add(w);
                    // 获取works的大小
                    int s = workers.size();
                    // largestPoolSize在记录最大线程个数的记录
                    // 如果当前的大小比最大的还要打,替换即可
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // worker添加成功
                    workerAdded = true;
                }
            } finally {
                // 解锁
                mainLock.unlock();
            }
            // 如果添加成功
            if (workerAdded) {
                // 启动线程
                t.start();
                // 线程启动标志位
                workerStarted = true;
            }
        }
    } finally {
        // 如果线程没有启动成功,从workers集合中删除掉该worker
        if (!workerStarted)
            addWorkerFailed(w);
    }
    // 返回线程是否启动成功
    return workerStarted;
}

// Worker的初始化
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // 从线程工厂里面拿一个线程出来
    this.thread = getThreadFactory().newThread(this);
}

// 从workers集合中删除掉该worker
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

4、线程池的 worker 源码

// Worker继承了AQS,目的就是为了控制工作线程的中断。
// Worker实现了Runnable,内部的Thread对象,在执行start时,必然要执行Worker中断额一些操作
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    
    private static final long serialVersionUID = 6138294804551838833L;
    // 当前线程工厂创建的线程(也是执行任务使用的线程)
    final Thread thread;
    
    // 当前的第一个任务
    Runnable firstTask;
  
    // 记录执行了多少个任务
    volatile long completedTasks;

    // 构造方法
    Worker(Runnable firstTask) {
        // 将State设置为-1,代表当前不允许中断线程
        setState(-1); 
        // 设置任务
        this.firstTask = firstTask;
        // 设置线程
        this.thread = getThreadFactory().newThread(this);
    }

    // 线程启动执行的方法
    public void run() {
        runWorker(this);
    }
    
    // =======================Worker管理中断================================   
    // 当前方法是中断工作线程时,执行的方法
    void interruptIfStarted() {
        Thread t;
        // 只有Worker中的state >= 0的时候,可以中断工作线程
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                // 如果状态正常,并且线程未中断,这边就中断线程
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    } 
}

5、线程池的 runWorker 方法

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    // 拿到当前的线程
    Thread wt = Thread.currentThread();
    // 拿到当前Worker的第一个任务(如果携带的话)
    Runnable task = w.firstTask;
    // 置空
    w.firstTask = null;
    // 解锁
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
        // 如果任务不等于空 或者 从阻塞队列中拿到的任务不等于空
        while (task != null || (task = getTask()) != null) {
            // 加锁
            w.lock();
            // 如果线程池状态 >= STOP,确保线程中断了
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 在任务执行前做一些操作,自己实现的钩子
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 任务执行
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 任务执行后一些操作:自己实现的钩子
                    afterExecute(task, thrown);
                }
            } finally {
                // 任务置空
                task = null;
                // 执行任务+1
                w.completedTasks++;
                // 解锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 删除线程的方法
        processWorkerExit(w, completedAbruptly);
    }
}

6、线程池的 getTask 方法

private Runnable getTask() {
    // 超时的标记
    boolean timedOut = false; 

    // 死循环拿数据
    for (;;) {
        // 拿到当前的ctl
        int c = ctl.get();
        // 获取其线程池状态
        int rs = runStateOf(c);

        // 如果线程池状态是STOP,没有必要处理阻塞队列任务,直接返回null
        // 如果线程池状态是SHUTDOWN,并且阻塞队列是空的,直接返回null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 线程池中的线程个数减一
            decrementWorkerCount();
            return null;
        }

        // 当前线程池中线程个数
        int wc = workerCountOf(c);

        // 这里是个重点
        // allowCoreThreadTimeOut:是否允许核心线程数超时(开启这个之后),核心线程数也会执行下面超时的逻辑
        // wc > corePoolSize:当前线程池中的线程个数大于核心线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // wc > maximumPoolSize:基本不存在
        // timed && timedOut:第一次肯定是失败的(超时标记为false)
        if ((wc > maximumPoolSize || (timed && timedOut))
            // 1、线程个数为1
            // 2、阻塞队列是空的
            && (wc > 1 || workQueue.isEmpty())) {
            // 线程池的线程个数减一
            if (compareAndDecrementWorkerCount(c)){
                return null;
            }
            continue;
        }

        try {
            // 根据我们前面的timed的值(当前线程池中的线程个数是否大于核心线程数)
            // 如果大于,执行workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)带有时间的等待,超过时间无任务,会返回null
            // 如果小于,执行workQueue.take(),死等任务,不会返回null
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null){
                return r;
            }
            // 到这里,说明上面的等待超时了
            // 这里要注意一下,如果这里超时后,我们上面 if ((wc > maximumPoolSize || (timed && timedOut)) 这个判断要起作用了
        	// (timed && timedOut) true
            // wc > 1 || workQueue.isEmpty():当线程大于1或者阻塞队列无数据,直接返回null,让外部循环删除
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

7、线程池的 processWorkerExit 方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 正常退出的:completedAbruptly=false
    // 不是正常退出的:completedAbruptly=true
    if (completedAbruptly) 
        decrementWorkerCount();

    // 加锁——上锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 将当前worker的执行任务数累加到线程池中
        completedTaskCount += w.completedTasks;
        // 线程池删除该工作线程
        workers.remove(w);
    } finally {
        // 解锁
        mainLock.unlock();
    }

    tryTerminate();

    // 获取ctl的数据
    int c = ctl.get();
    // 这里只有SHUTDOWN、RUNNING会进入判断
    if (runStateLessThan(c, STOP)) {
        // 正常退出的
        if (!completedAbruptly) {
            // 是否允许超时
            // 允许:0
            // 不允许:核心线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果min=0并且阻塞队列不为空
            // 将min设置成1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 当前线程池的大小大于最小值,直接返回即可
            if (workerCountOf(c) >= min){
                return;
            }
        }
        // 如果没有的话,说明线程池中没有线程了,并且还有阻塞任务
        // 只能添加一个非核心线程去处理这些任务
        addWorker(null, false);
    }
}

8、线程池的关闭方法

8.1 shutdownNow 方法

  • 将线程池状态修改为Stop(不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管)
  • 将线程池中的线程全部中断
  • 删除当前线程池所有的工作线程
  • 将线程池的状态从:Stop --> TIDYING --> TERMINATED,正式标记线程池的结束(唤醒一下等待的主线程)
public List<Runnable> shutdownNow() {
    // 声明返回结果
    List<Runnable> tasks;
    // 加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 将线程池状态修改为STOP
        advanceRunState(STOP);
        // 将线程池中的线程全部中断
        interruptWorkers();
        // 删除当前所有的工作线程
        tasks = drainQueue();
    } finally {
        // 解锁
        mainLock.unlock();
    }
    // 查看当前线程池是否可以变为TERMINATED状态
    // 从 Stop 状态修改为 TIDYING,在修改为 TERMINATED
    tryTerminate();
    return tasks;
}

// targetState = STOP
// 作用:将当前线程池的状态修改为Stop
private void advanceRunState(int targetState) {
    // 进来直接死循环
    for (;;) {
        // 拿到当前的ctl
        int c = ctl.get();
        // runStateAtLeast(c, targetState):当前的c是不是大于STOP(如果大于Stop的话,说明线程池状态已经G了
        // 基于CAS,将ctl从c修改为Stop状态,不修改工作线程个数,仅仅将状态修改为Stop
        // 如果可以修改成功,直接退出即可
        if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

// 将线程池中的线程全部中断
private void interruptWorkers() {
    // 加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 循环遍历线程组
        for (Worker w : workers)
            // 中断线程
           	// 这里会给线程打一个中断的标记,具体什么时候中断线程,需要我们自己去控制
            w.interruptIfStarted();
    } finally {
        // 解锁
        mainLock.unlock();
    }
}

// 删除当前所有的工作线程
private List<Runnable> drainQueue() {
    // 存放工作线程的队列
    BlockingQueue<Runnable> q = workQueue;
    // 返回的结果
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    // 清空阻塞队列并将数据放入taskList中
    q.drainTo(taskList);
    // 校验当前的数据是够真的清空
    if (!q.isEmpty()) {
        // 如果确实有遗漏的,毕竟这哥们也没上锁
        // 手动的将线程从workQueue删除掉并且放到taskList中
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    // 最终返回即可
    return taskList;
}

final void tryTerminate() {
    for (;;) {
        // 拿到ctl
        int c = ctl.get();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // CAS将当前的ctl设置成TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 该方法是一个钩子函数,我们自己定义,在线程池销毁之前做最后的处理
                    terminated();
                } finally {
                    // 将ctl设置成TERMINATED标志着线程池的正式结束
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 线程池提供了一个方法,主线程在提交任务到线程池后,是可以继续做其他操作的。
                    // 咱们也可以让主线程提交任务后,等待线程池处理完毕,再做后续操作
                    // 这里线程池凉凉后,要唤醒哪些调用了awaitTermination方法的线程
                    // 简单来说,当时等待线程池返回的主线程,由于线程池已经销毁了,他们也必须要唤醒
                    termination.signalAll();
                }
                return;
            }
        } finally {
            // 解锁
            mainLock.unlock();
        }
    }
}

8.2 shutdown 方法

public void shutdown() {
    // 加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 将线程池状态修改为SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 将线程池中的线程全部中断
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 查看当前线程池是否可以变为TERMINATED状态
    // 从 SHUTDOWN 状态修改为 TIDYING,在修改为 TERMINATED
    tryTerminate();
}

四、流程图

在这里插入图片描述

五、总结

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/501422.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PostgreSQL 基础知识:psql 提示和技巧

对于积极使用和连接到 PostgreSQL 数据库的任何开发人员或 DBA 来说&#xff0c;能够访问psql命令行工具是必不可少的。在我们的第一篇文章中&#xff0c;我们讨论了 psql的简要历史&#xff0c;并演示了如何在您选择的平台上安装它并连接到 PostgreSQL 数据库。 在本文中&…

使用腾讯云快速完成网站备案的详细过程

最近总是被备案弄得血压飙升&#xff0c;明明是一件很简单的事情&#xff0c;不知道大家为什么搞得那么复杂&#xff0c;首先了解下为什么要备案&#xff0c;根据国务院令第292号《互联网信息服务管理办法》和 《非经营性互联网信息服务备案管理办法》规定&#xff0c;国家对经…

【TCP四次挥手】

文章目录 TCP 四次挥手过程是怎样的&#xff1f;为什么挥手需要四次&#xff1f;第一次挥手丢失了&#xff0c;会发生什么&#xff1f;第二次挥手丢失了&#xff0c;会发生什么&#xff1f;第三次挥手丢失了&#xff0c;会发生什么&#xff1f;第四次挥手丢失了&#xff0c;会发…

Lecture 13(Extra Material):Q-Learning

目录 Introduction of Q-Learning Tips of Q-Learning Double DQN Dueling DQN Prioritized Reply Multi-step Noisy Net Distributional Q-function Rainbow Q-Learning for Continuous Actions Introduction of Q-Learning Critic: The output values of a critic…

为生信写的Python简明教程 | 视频3

开源生信 Python教程 生信专用简明 Python 文字和视频教程 源码在&#xff1a;https://github.com/Tong-Chen/Bioinfo_course_python 目录 背景介绍 编程开篇为什么学习Python如何安装Python如何运行Python命令和脚本使用什么编辑器写Python脚本Python程序事例Python基本语法 数…

PySpark基础入门(7):Spark SQL

概述 SparkSQL和Hive的异同 Hive和Spark 均是&#xff1a;“分布式SQL计算引擎”SparkSQL使用内存计算&#xff0c;而Hive使用磁盘迭代&#xff0c;所以SparkSQL性能较好二者都可以运行在YARN之上SparkSQL无元数据管理&#xff0c;但可以和hive集成&#xff0c;集成之后可以借…

极光笔记 | 极光推出“运营增长”解决方案,开启企业增长新引擎

摘要&#xff1a; 移动互联网流量红利见底&#xff0c;营销获客面临更多挑战 随着移动互联网流量红利见顶&#xff0c;越来越多的企业客户发现获取新客户的难度直线上升&#xff0c;获客成本持续攀高。 传统的移动互联网营销以PUSH为代表&#xff0c;采用简单粗暴的方式给用户…

PaddleVideo 简介以及文件目录详解

简介特性许可证书 PaddleVideo 文件目录总述applications 文件夹详述configs 文件夹详述docs 文件夹详述paddlevideo 文件夹详述utils 文件夹tasks 文件夹loader 文件夹modeling 文件夹solver 文件夹metrics 文件夹 简介 PaddleVideo 旨在打造一套丰富、领先且实用的 Video 工…

【阿里云】秒懂云通信

目录 一、秒懂云通信-第一回听什么? 二、短信的使用场景 1. 短信的三种类型&#xff1a;短信通知、验证、会员营销 三、短信平台的选择 1、看成功率 2、看价格 3、看体验 四、秒懂云通信 五、如何使用 Step 1&#xff1a;业务入口 Step 2&#xff1a;注册账号 Step…

云安全技术——Snort安装与配置

目录 一、Snort简介 二、安装Centos7 Minimal系统 三、基本环境配置 四、安装Snort 五、下载规则 六、配置Snort 七、测试Snort 一、Snort简介 Snort是一个开源的网络入侵检测系统&#xff0c;主要用于监控网络数据包并检测可能的攻击行为。它可以实时分析网络流量&…

HJ37 统计每个月兔子的总数

HJ37 统计每个月兔子的总数 描述示例解题思路以及代码分析解法1解法2 描述 描述 有一种兔子&#xff0c;从出生后第3个月起每个月都生一只兔子&#xff0c;小兔子长到第三个月后每个月又生一只兔子。 例子&#xff1a;假设一只兔子第3个月出生&#xff0c;那么它第5个月开始会…

ASEMI代理ADUM3211TRZ-RL7原装ADI车规级ADUM3211TRZ-RL7

编辑&#xff1a;ll ASEMI代理ADUM3211TRZ-RL7原装ADI车规级ADUM3211TRZ-RL7 型号&#xff1a;ADUM3211TRZ-RL7 品牌&#xff1a;ADI/亚德诺 封装&#xff1a;SOIC-8 批号&#xff1a;2023 引脚数量&#xff1a;8 工作温度&#xff1a;-40C~125C 安装类型&#xff1a;表…

操作系统原理 —— 操作系统什么时候会发生进程的调度(十二)

操作系统什么时候需要进程调度&#xff1f; 进程调度的层次中&#xff0c;有一个低级调度&#xff0c;就是按照某种算法从就绪队列中选择一个进程为其分配 CPU。 那操作系统会在什么时候触发进程调度呢&#xff1f; 在这里一共可以分为两大类&#xff1a; 当前运行的进程主动…

04-微服务部署2023系列-centos安装gitlab

目的:为了将来的devops快速部署搭建自己的代码库,保证速度和私密性 前面01-03小节: 完成基本的服务器环境 centos_nginx_java_docker; 这个基础环境是将来集群中每台服务器的基本, 可以先打一个镜像备份。 阿里云的镜像备份比较简单。以后搭建新服务器时,以这个为基础,安…

JUC并发包详解AQS同步队列

一、AQS介绍 在JUC并发包中&#xff0c;AQS为其最关键的作用&#xff0c;全称为abstractQueuedSynchroinzed同步器&#xff0c;为信号量semaphore、同步锁的基础抽象类。 其中内部主要有二大块 state 共享资源&#xff0c;通过并发操作state修改改值为1&#xff0c;如果修改成…

《Linux 内核设计与实现》09. 内核同步介绍

共享资源之所以要防止并发访问&#xff0c;是因为如果多个执行线程同时访问和操作数据&#xff0c;就有可能发生各线程之间相互覆盖共享数据的情况&#xff0c;从而造成被访问的数据不一致状态。 临界区和竞争条件 临界区&#xff1a;访问和操作共享数据的代码段。原子操作&a…

键控流水灯

项目文件 文件 关于项目的内容知识点可以见专栏单片机原理及应用 的第四章 IO口编写 在电路图的基础上&#xff0c;编写可键控的流水灯程序。要求实现的功能为&#xff0c;K1是总开关,当K1首次按下时&#xff0c;流水灯由下往上流动;当K2按下时停止流动&#xff0c;且全部灯灭…

ASK,FSK和PSK

一、ASK&#xff0c;FSK和PSK 数字信号只有有限个离散值&#xff0c;使用数字信号对载波进行调制的方式称为键控(Keying),分为幅度键控&#xff08;ASK)、频移键控&#xff08;FSK)和相移键控&#xff08;PSK)。 幅度键控可以通过乘法器和开关电路来实现&#xff0c;在数字信…

SpringBoot【开发实用篇】---- 配置高级

SpringBoot【开发实用篇】---- 配置高级 1. ConfigurationProperties2. 宽松绑定/松散绑定3. 常用计量单位绑定4. 校验5. 数据类型转换 进入开发实用篇第二章内容&#xff0c;配置高级&#xff0c;其实配置在基础篇讲了一部分&#xff0c;在运维实用篇讲了一部分&#xff0c;这…

离线安装Percona

前言 安装还是比较简单&#xff0c;这边简单进行记录一下。 版本差异 一、离线安装Percona 下载percona官网 去下载你需要对应的版本 jemalloc-3.6.0-1.el7.x86_64.rpm 需要单独下载 安装Percona 进入RPM安装文件目录&#xff0c;执行下面的脚本 yum localinstall *.rpm修改…