【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码

news2025/4/16 19:53:07

👋hi,我不是一名外包公司的员工,也不会偷吃茶水间的零食,我的梦想是能写高端CRUD
🔥 2025本人正在沉淀中… 博客更新速度++
👍 欢迎点赞、收藏、关注,跟上我的更新节奏
📚欢迎订阅专栏,专栏别名《在2B工作中寻求并发是否搞错了什么》

文章目录

  • 前言
  • ThreadPoolExecutor构造方法
  • 提交任务(execute方法)
  • ctl
    • 线程池状态
    • 工作线程数量
  • worker类
  • 执行Worker(runWorker方法)
  • 获取任务(getTask方法)
  • 线程退出(processWorkerExit方法)
  • 线程池的关闭
    • shutdown方法
    • shutdownNow方法
    • tryTerminate方法
  • 后话

前言

当我们创建一个ThreadPoolExecutor的时候,你是否会好奇🤔,它到底发生了什么?比如:

  • 我传的拒绝策略、线程工厂是啥时候被使用的?
  • 核心线程数是个啥?最大线程数和它又有什么关系?
  • 线程池,它是怎么调度,我们传入的线程?

不要着急,小手手点上关注、点赞、收藏。主播马上从源码的角度带你们探索神秘线程池的世界。

public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
                          int maximumPoolSize,//线程池的最大线程数
                          long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                          TimeUnit unit,//时间单位
                          BlockingQueue<Runnable> workQueue,//阻塞队列,用来储存等待执行任务的队列
                          ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
                          RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
                           ) {

ThreadPoolExecutor构造方法

所有伟大的开始,源于构造方法

我们可以看到,构造方法里,只是对它做了数量的校验和赋值:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||					// 核心线程数是可以为0,但不能小于0
        maximumPoolSize <= 0 ||				// 最大线程数不能小于0
        maximumPoolSize < corePoolSize ||	// 核心线程数小于最大线程数
        keepAliveTime < 0)					// 存活时间大于0
        throw new IllegalArgumentException();
    // 工作队列、线程工厂、拒绝策略不能为null
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

有没有好奇的uu?构造线程池,有哪些参数可以不传?
下面的构造方法,早已告诉了我们答案 — 线程工厂拒绝策略

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

如果我们不传线程工厂和拒绝策略,那么就会有默认的线程工厂拒绝策略

  • 默认的线程工厂:DefaultThreadFactory
// 默认的线程工厂
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
  • 默认的拒绝策略:AbortPolicy
// ThreadPoolExecutor默认的拒绝策略
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

// AbortPolicy拒绝策略
public static class AbortPolicy implements RejectedExecutionHandler {
    
    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

提交任务(execute方法)

🤔提交任务的流程是什么勒?源码里,Doug Lea注释里写的很清楚了。
在这里插入图片描述
第一步

* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task.  The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
==========================我是分割线===========================
1、如果运行中的线程数少于corePoolSize,尝试用当前任务作为首个任务启动新线程。
addWorker方法会原子性地检查线程池状态和worker数量,避免非法创建线程。

第二步

* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
==========================我是分割线===========================
2、 如果任务成功入队,仍需二次检查:
   - 是否应补充新线程(例如之前检查后有线程死亡)
   - 线程池是否已关闭
   根据检查结果决定回滚入队操作或创建新线程

第三步

* 3. If we cannot queue task, then we try to add a new
* thread.  If it fails, we know we are shut down or saturated
* and so reject the task.
==========================我是分割线===========================
3、如果无法入队(队列已满),尝试创建新线程。
  若失败(线程池关闭或已达最大线程数),执行拒绝策略。

是不是长长的文字不想看,没有关系,一图顶千言👇:
在这里插入图片描述
从源码的角度的来看:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 获取ctl,ctl是什么?等下会详细的说说,它表示线程池的状态和工作线程数
        int c = ctl.get();
        // 工作线程数小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))		// 尝试添加核心线程
                return;
            c = ctl.get();
        }
        // 线程池状态为正在运行 且 成功放入阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 二次检查的线程状态不为正在运行 且 移除线程成功
            if (! isRunning(recheck) && remove(command))
                reject(command);				    // 触发拒绝策略
            else if (workerCountOf(recheck) == 0)   // 工作线程数为0
                addWorker(null, false);			    // 尝试添加非核心线程
        }
        else if (!addWorker(command, false)) 	    // 尝试添加非核心线程
            reject(command);  						// 触发拒绝策略
    }

二次检查状态是为了处理并发场景下的竞态条件,具体检查两个维度:

  • 状态检查:用isRunning(recheck)验证线程池是否被关闭(SHUTDOWN/STOP),若关闭则回滚已入队任务并触发拒绝策略。
  • 线程检查:当workerCount==0时(核心线程被回收/异常终止),需创建非核心线程保证队列任务能被消费。

竞态条件(race condition)指的是两个或者以上进程或者线程并发执行时,其最终的结果依赖于进程或者线程执行的精确时序。竞争条件会产生超出预期的情况。

ctl

刚刚看提交任务源码中埋下的伏笔 — 什么是ctl?我们下面具体来说说。

Doug Lea用一个变量,来表示2种状态:

  1. 线程池当前状态(高3位)
  2. 当前线程池工作线程个数(低29位)
// 初始化ctl,线程池状态为正在运行,工作线程数为0
// RUNNING  = 111_00000_00000000_00000000_00000000 (-536870912)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// rs: 运行状态   wc:工作线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; }


// 用来算线程池状态,左移27位 
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY = 000_11111_11111111_11111111_11111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

线程池状态

private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

相当于

// 状态值定义(高3位)
RUNNING    = 111_00000_00000000_00000000_00000000 (-536870912)
SHUTDOWN   = 000_00000_00000000_00000000_00000000 (0)
STOP       = 001_00000_00000000_00000000_00000000 (536870912)
TIDYING    = 010_00000_00000000_00000000_00000000 (1073741824)
TERMINATED = 011_00000_00000000_00000000_00000000 (1610612736)

计算出当前线程池的状态

CAPACITY = 000_11111_11111111_11111111_11111111

// 取高3位(屏蔽低29位)
private static int runStateOf(int c) { return c & ~CAPACITY; }

// 状态判断
private static boolean isRunning(int c) { return c < SHUTDOWN; }

线程池状态的转变
在这里插入图片描述

工作线程数量

CAPACITY = 000_11111_11111111_11111111_11111111

// 取低29位(屏蔽高3位)
private static int workerCountOf(int c) { return c & CAPACITY; }

// CAS更新线程数
private boolean compareAndIncrementWorkerCount(int expect) {
   return ctl.compareAndSet(expect, expect + 1);
}

尝试向线程池添加线程(addWorker方法)

  1. 状态检查与CAS计数更新:通过循环检查线程池状态(是否可接受新任务)和当前线程数(是否超过核心或最大线程数限制),使用CAS原子操作增加workerCount
// 入参:
// 1、firstTask: 新线程要执行的第一个任务(可能为null)
// 2、core:决定用corePoolSize(true)还是maximumPoolSize(false)作为线程数上限
private boolean addWorker(Runnable firstTask, boolean core:) {
    retry:
    for (;;) {
        int c = ctl.get();
        // 获取线程池状态
        int rs = runStateOf(c);

        // 通过ctl原子变量(高位存状态,低位存线程数)检查线程池状态,防止在非运行状态下创建新线程
        // 1、rs >= SHUTDOWN 
        //    判断线程池状态是否处于SHUTDOWN或更高状态(STOP, TIDYING, TERMINATED)
        // 2、!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())
        // 只有当同时满足以下3个条件时,才会取反为false:
        //   - 状态正好是SHUTDOWN(不是更高状态)
        //   - firstTask为空(表示不是新提交的任务)
        //   - 工作队列非空(还有未处理的任务)
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 获取工作线程数
            int wc = workerCountOf(c);
            // 校验是否超过容量限制
            // 1、wc >= CAPACITY:判断工作线程数量是否大等于最大工作线程数量
            // 2、wc >= (core ? corePoolSize : maximumPoolSize)):
            // 判断工作线程数量是否大等于 核心线程数/最大线程数
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c)) // 通过CAS保证线程数增减的原子性,避免并发问题
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    ...
  1. 创建工作线程:创建Worker对象并加锁将其加入线程集合。若线程启动失败,则回滚计数和集合状态。
private boolean addWorker(Runnable firstTask, boolean core) {
    ...

    boolean workerStarted = false;	// worker线程是否被启动
    boolean workerAdded = false;	// worker是否被添加到workers集合中  private final HashSet<Worker> workers = new HashSet<Worker>();
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {

                int rs = runStateOf(ctl.get());

                // 1、rs < SHUTDOWN:线程池处于RUNNING状态
                // 目的:确保只有在线程池可用时才能创建新线程
                // 2、(rs == SHUTDOWN && firstTask == null):线程池处于SHUTDOWN状态且没有初始任务(firstTask为null)
                // SHUTDOWN 状态下不允许添加新任务(firstTask != null 会直接拒绝),但允许创建 无初始任务 的线程来消费队列中的残留任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())  // 检查新创建的线程是否已处于活跃状态
                        throw new IllegalThreadStateException();
                    workers.add(w);		// 添加到Worker集合
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();		// 启动线程
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);  // 失败时的回滚逻辑
    }
    return workerStarted;
}

这里简单说下,失败时候的回滚逻辑

  • 从集合中移除Worker
  • CAS递减线程数
  • 尝试终止线程池(后面会详细说说🤗)
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)				
            workers.remove(w);		// 从集合中移除Worker
        decrementWorkerCount();		// CAS递减线程数
        tryTerminate();				// 尝试终止线程池,后面会详细说说
    } finally {
        mainLock.unlock();
    }
}

worker类

刚刚在addWorker方法里,我们创建了Worker类,🤔聪明的你,一定很想知道这个类到底是干什么的吧!
Worker类是ThreadPoolExecutor的核心内部类,它直接负责 封装工作线程 和 管理任务执行。其设计巧妙地将线程、任务和锁机制结合在一起。

private final class Worker
    extends AbstractQueuedSynchronizer // 继承AQS实现锁机制
    implements Runnable {             // 自身作为线程的Runnable目标
    final Thread thread;              // 实际的工作线程
    Runnable firstTask;               // 初始任务(可能为null)
    volatile long completedTasks;     // 完成的任务计数器
}

构造方法,在addWorker的时候,就会调用到这个构造方法。
怎么样?你是否看到了,自己构造线程池时传入的线程工厂被调用😄?

Worker(Runnable firstTask) {
  this.firstTask = firstTask;
  this.thread = getThreadFactory().newThread(this); // this即Worker自身
}

worker类的锁状态
通过继承 AQS(AbstractQueuedSynchronizer) 实现了一个不可重入的独占锁,用于精确控制线程的中断行为和状态标识。

注释说了,0是未被锁定,1是被锁定了
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.

work类的加锁/解锁

// 最终调用AQS的acquire方法。将state从0改为1
public void lock()        { acquire(1); }  
public boolean tryLock()  { return tryAcquire(1); }

// AQS的tryAcquire实现(Worker内部)
protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) { // CAS操作保证原子性
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

解锁

// 最终调用AQS的tryRelease实现。将state从1改为0
public void unlock()      { release(1); }   

// AQS的tryRelease实现
protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0); // 无需CAS,因为只有持有锁的线程能调用此方法
    return true;
}

执行Worker(runWorker方法)

worker类,调用外部类ThreadPoolExecutorrunWorker方法

public void run() {
    runWorker(this);   // 调用外部类ThreadPoolExecutor的runWorker方法
}

让我们康康,线程池的runWorker方法吧!

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;		// 获取初始任务(可能为null)
    w.firstTask = null;					// 清空初始任务引用
    w.unlock(); 						// 允许中断(设置state=0,标记为初始可中断状态)
    boolean completedAbruptly = true;	// 是否因异常退出
    try {
        // ---- 核心循环:不断获取任务 ----
        // 执行初始任务,或者获取到的任务,getTask是怎么获取的,下文会介绍
        while (task != null || (task = getTask()) != null) {
            w.lock();   // 加锁标记线程为"工作中"
            // 检查线程池状态(若处于STOP需确保线程被中断)
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 如果我们写了新类继承了ThreadPoolExector重写了beforeExecute方法,就执行
                // ThreadPoolExector这里没有任何实现: protected void beforeExecute(Thread t, Runnable r) { }
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run(); 	// 实际执行任务(注意不是启动新线程!)
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 如果我们写了新类继承了ThreadPoolExector重写了afterExecute方法,就执行
                    // ThreadPoolExector这里没有任何实现:protected void afterExecute(Runnable r, Throwable t) { }
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;				// 清空任务引用
                w.completedTasks++;			// 统计完成的任务数
                w.unlock();					// 解锁标记线程为“空闲”
            }
        }
        completedAbruptly = false;			// 没有因为异常导致退出
    } finally {
        processWorkerExit(w, completedAbruptly);  // 线程退出处理
    }
}

获取任务(getTask方法)

你是否会好奇🤔, 刚刚我们在runWork中的task是从哪里获取的?while (task != null || (task = getTask()) != null) 。

private Runnable getTask() {
    boolean timedOut = false; 		// 标记是否发生poll超时
    
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);		// 解析线程池状态

        // ===== 1. 状态检查:是否需要停止工作 =====
        // 条件1: 线程池状态 >= STOP(立刻停止)
        // 条件2: 线程池状态 >= SHUTDOWN 且 队列为空
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();		// 减少工作线程数
            return null;				// 触发线程回收
        }

        int wc = workerCountOf(c);		// 当前工作线程数

        // ===== 2. 判断是否允许超时回收 =====
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        // ===== 3. 线程数溢出检查 =====
        // 情况1: 线程数超过maximumPoolSize(可能因配置修改导致)
        // 情况2: 允许超时回收 且 已发生超时
        // 				且
        // 情况1:工作线程数量大于1 或 工作队列为空
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))	// CAS减少线程数
                return null;
            continue;	// 竞争失败则重试
        }
        // ===== 4. 从队列获取任务 =====
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :  // 限时阻塞等待,超时后返回 null
                workQueue.take();									   // 永久阻塞(核心线程默认行为)
            if (r != null)
                return r;
            timedOut = true;	// 标记超时
        } catch (InterruptedException retry) {
            timedOut = false;	// 中断重试(SHUTDOWN状态可能触发)
        }
    }
}

线程退出(processWorkerExit方法)

我们在runWork方法的时候,调用了processWorkerExit方法,好奇的你一定想看看他是怎么实现的吧!

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;		// 获取初始任务(可能为null)
    w.firstTask = null;					// 清空初始任务引用
    w.unlock(); 						// 允许中断(设置state=0,标记为初始可中断状态)
    boolean completedAbruptly = true;	// 是否因异常退出
    try {
        // ---- 核心循环:不断获取任务 ----
        while (task != null || (task = getTask()) != null) {...}
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);  // 线程退出处理
    }
}

processWorkerExit方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // ===== 1. 参数校验与状态记录 =====
    if (completedAbruptly) // 若因异常退出,需手动减少线程数
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // ===== 2. 统计完成任务数 =====
        completedTaskCount += w.completedTasks;
        workers.remove(w); // 从Worker集合中移除
    } finally {
        mainLock.unlock();
    }

    // ===== 3. 尝试终止线程池 =====
    tryTerminate();

    // ===== 4. 判断是否需要补充线程 =====
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) { // 线程池仍处于RUNNING/SHUTDOWN状态
        if (!completedAbruptly) { // 正常退出
            // 计算最小应保持的线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1; // 队列非空时至少保留1个线程处理任务
            if (workerCountOf(c) >= min)
                return; // 当前线程数足够,无需补充
        }
        addWorker(null, false); // 补充新线程(无初始任务)
    }
}

线程池的关闭

之前说了这么多线程池是怎么运行,到这里也该说说线程池是怎么关闭的了🤭之前看源码的时候,经常出现的tryTerminate方法,也会在这里展开说说

线程池对我们开放了下面的3个方法,让我们来关闭线程池。

方法行为特性适用场景
shutdown()温和关闭:停止接受新任务,执行完已提交任务需要优雅关闭,确保任务不丢失
shutdownNow()强制关闭:停止接受新任务,尝试中断所有线程,并返回未执行任务列表需要立即释放资源,容忍任务丢弃
awaitTermination(long timeout, TimeUnit unit)阻塞等待线程池完全终止(结合shutdown使用)需要同步等待关闭完成

shutdown方法

shutdown方法的作用

  1. 停止接受新任务:调用 shutdown() 后,线程池不再接受新提交的任务。
  2. 执行已提交的任务:线程池会继续执行已经提交但尚未完成的任务。
  3. 关闭线程池:在所有任务执行完毕后,线程池会逐步关闭。
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN); // CAS更新状态为SHUTDOWN
        interruptIdleWorkers();    // 仅中断空闲线程
    } finally {
        mainLock.unlock();
    }
    tryTerminate(); // 尝试推进终止流程
}

interruptIdleWorkers方法
中断那些当前没有执行任务(即空闲)的工作线程。具体来说:

  • 中断空闲线程:它会遍历所有工作线程,并尝试中断那些正在等待任务的线程(即处于空闲状态的线程)。
  • 加速关闭过程:通过中断空闲线程,可以更快地释放资源,从而加速线程池的关闭过程。
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // 关键点:尝试获取Worker锁(判断是否空闲)
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt(); // 仅中断空闲线程
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

shutdownNow方法

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP); // 更新为STOP状态
        interruptWorkers();    // 强制中断所有线程
        tasks = drainQueue();  // 排出未执行任务
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

interruptWorkers方法,我们可以看到里面调用了workerinterruptIfStarted

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted(); // 无论是否空闲都尝试中断
    } finally {
        mainLock.unlock();
    }
}

// worker类interruptIfStarted方法
void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

tryTerminate方法

tryTerminate() 的核心逻辑是检查线程池的状态和条件,决定是否可以将线程池的状态转换为 TERMINATED,并在适当的时候中断空闲的工作线程。

final void tryTerminate() {
    // 检查是否满足终止条件
    if ((runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()) ||
        runStateAtLeast(c, TIDYING) || 
        isRunning(c)) 
        return;

    // 中断最后一个空闲线程(如有)
    if (workerCountOf(c) != 0) {
        interruptIdleWorkers(ONLY_ONE);
        return;
    }

    // 推进到TIDYING状态
    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
        try {
            terminated(); // 空实现的钩子方法
        } finally {
            ctl.set(ctlOf(TERMINATED, 0));
            termination.signalAll(); // 唤醒awaitTermination()
        }
    }
}

后话

👍 线程池源码学习,只是刚刚开始,友友们,点上关注,跟上主播的学习节奏。
话说最近温度回暖了,感觉看源码的速度都变快了!哈哈哈哈哈,开玩笑的。

往期文章推荐

【Java并发】【线程池】带你从0-1入门线程池

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2308396.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【开源免费】基于SpringBoot+Vue.JS网络海鲜市场系统(JAVA毕业设计)

本文项目编号 T 222 &#xff0c;文末自助获取源码 \color{red}{T222&#xff0c;文末自助获取源码} T222&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

Go中slice和map引用传递误区

背景 关于slice和map是指传递还是引用传递&#xff0c;很多文章都分析得模棱两可&#xff0c;其实在Go中只有值传递&#xff0c;但是很多情况下是因为分不清slice和map的底层实现&#xff0c;所以导致很多人在这一块产生疑惑&#xff0c;下面通过代码案例分析slice和map到底是…

C++ ++++++++++

初始C 注释 变量 常量 关键字 标识符命名规则 数据类型 C规定在创建一个变量或者常量时&#xff0c;必须要指定出相应的数据类型&#xff0c;否则无法给变量分配内存 整型 sizeof关键字 浮点型&#xff08;实型&#xff09; 有效位数保留七位&#xff0c;带小数点。 这个是保…

【北京迅为】iTOP-RK3568OpenHarmony系统南向驱动开发-第1章 GPIO基础知识

瑞芯微RK3568芯片是一款定位中高端的通用型SOC&#xff0c;采用22nm制程工艺&#xff0c;搭载一颗四核Cortex-A55处理器和Mali G52 2EE 图形处理器。RK3568 支持4K 解码和 1080P 编码&#xff0c;支持SATA/PCIE/USB3.0 外围接口。RK3568内置独立NPU&#xff0c;可用于轻量级人工…

探秘《矩阵之美》:解锁矩阵的无限魅力

在这个数据驱动的时代&#xff0c;矩阵作为数学中的瑰宝&#xff0c;不仅在理论研究中占据核心地位&#xff0c;更在工程技术、计算机科学、物理学、经济学等众多领域发挥着不可替代的作用。今天&#xff0c;让我们通过中科院大学耿修瑞老师&#xff08;中科院空天信息研究院研…

进行性核上性麻痹患者的生活护理指南

进行性核上性麻痹是一种神经系统退行性疾病&#xff0c;合理的生活护理能有效改善症状&#xff0c;提高生活质量。 居家环境要安全。移除地面杂物&#xff0c;铺设防滑垫&#xff0c;安装扶手&#xff0c;降低跌倒风险。在浴室、厨房等湿滑区域要特别加强防护措施。建议在床边、…

pyside6学习专栏(八):在PySide6中使用matplotlib库绘制三维图形

本代码原来是PySide6官网的一个示例程序&#xff0c;我对其进行的详细的注释&#xff0c;同时增加了一个功能&#xff1a;加载显示cass的地形图坐标数据示例&#xff0c;示例可显示以下几种三维图形 程序运行界面如下&#xff1a; 代码如下&#xff1a; # -*- coding: utf-8 -…

松灵机器人地盘 安装 ros 驱动 并且 发布ros 指令进行控制

安装驱动 $ cd ~/catkin_ws/src $ git clone https://github.com/agilexrobotics/ugv_sdk.git $ git clone https://github.com/agilexrobotics/scout_ros.git $ cd .. $ catkin_make安装 ● 使能 gs_usb 内核模块 ● 设置 500k 波特率和使能 can-to-usb 适配器 sudo modp…

python力扣2:两数相加

给你两个 非空 的链表&#xff0c;表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的&#xff0c;并且每个节点只能存储 一位 数字。 请你将两个数相加&#xff0c;并以相同形式返回一个表示和的链表。 你可以假设除了数字 0 之外&#xff0c;这两个数都不会以 0 开…

服务器间迁移conda环境

注意&#xff1a;可使用迁移miniconda文件 or 迁移yaml文件两种方式&#xff0c;推荐前者&#xff0c;基本无bug&#xff01; 一、迁移miniconda文件&#xff1a; 拷贝旧机器的miniconda文件文件到新机器: 内网拷贝&#xff1a;scp -r mazhf192.168.1.233:~/miniconda3 ~/ 外…

Python 绘制迷宫游戏,自带最优解路线

1、需要安装pygame 2、上下左右移动&#xff0c;空格实现物体所在位置到终点的路线&#xff0c;会有虚线绘制。 import pygame import random import math# 迷宫单元格类 class Cell:def __init__(self, x, y):self.x xself.y yself.walls {top: True, right: True, botto…

恶意 SSP 注入收集密码

SSP 安全服务提供者&#xff0c;是微软提供的与安全有关的函数接口&#xff0c;用户可根据自己的需求调用 SSP 接口实现高度自定义的身份验证等安全功能。攻击者注入恶意的 SSP 接口覆盖微软默认的某些安全功能&#xff0c;导致用户一旦进行身份验证&#xff0c;恶意的 SSP 将保…

Python----数据分析(Numpy:安装,数组创建,切片和索引,数组的属性,数据类型,数组形状,数组的运算,基本函数)

一、 Numpy库简介 1.1、概念 NumPy(Numerical Python)是一个开源的Python科学计算库&#xff0c;旨在为Python提供 高性能的多维数组对象和一系列工具。NumPy数组是Python数据分析的基础&#xff0c;许多 其他的数据处理库&#xff08;如Pandas、SciPy&#xff09;都依赖于Num…

Pytest之fixture的常见用法

文章目录 1.前言2.使用fixture执行前置操作3.使用conftest共享fixture4.使用yield执行后置操作 1.前言 在pytest中&#xff0c;fixture是一个非常强大和灵活的功能&#xff0c;用于为测试函数提供固定的测试数据、测试环境或执行一些前置和后置操作等&#xff0c; 与setup和te…

如何把网络ip改为动态:全面指南

在数字化时代&#xff0c;网络IP地址作为设备在网络中的唯一标识&#xff0c;扮演着至关重要的角色。随着网络环境的不断变化&#xff0c;静态IP地址的局限性逐渐显现&#xff0c;而动态IP地址则因其灵活性和安全性受到越来越多用户的青睐。那么&#xff0c;如何把网络IP改为动…

anythingLLM和deepseek4j和milvus组合建立RAG知识库

1、deepseek本地化部署使用 ollama 下载模型 Tags bge-m3 bge-m3:latest deepseek-r1:32b deepseek-r1:8b 2、安装好向量数据库 milvus docker安装milvus单机版-CSDN博客 3、安装 anythingLLM AnythingLLM | The all-in-one AI application for everyone …

和鲸科技推出人工智能通识课程解决方案,助力AI人才培养

2025年2月&#xff0c;教育部副部长吴岩应港澳特区政府邀请&#xff0c;率团赴港澳宣讲《教育强国建设规划纲要 (2024—2035 年)》。在港澳期间&#xff0c;吴岩阐释了教育强国目标的任务&#xff0c;并与特区政府官员交流推进人工智能人才培养的办法。这一系列行动体现出人工智…

当我删除word文件时无法删除,提示:操作无法完成,因为已在Microsoft Word中打开

现象&#xff1a; 查看电脑桌面下方的任务栏&#xff0c;明明已经关闭了WPS和WORD软件&#xff0c;但是打开word文档时还是提示&#xff1a; 解决方法步骤&#xff1a; 1、按一下键盘上的ctrl Shift Esc 键打开任务管理器 2、在进程中找到如下&#xff1a; 快速找到的方法…

高频面试题(含笔试高频算法整理)基本总结回顾3

目录 一、基本面试流程回顾 二、基本高频算法题展示 三、基本面试题总结回顾 &#xff08;一&#xff09;Java高频面试题整理 &#xff08;二&#xff09;JVM相关面试问题整理 &#xff08;三&#xff09;MySQL相关面试问题整理 &#xff08;四&#xff09;Redis相关面试…

Python中字符串的常用操作

一、r原样输出 在 Python 中&#xff0c;字符串前加 r&#xff08;即 r"string" 或 rstring&#xff09;表示创建一个原始字符串&#xff08;raw string&#xff09;。下面详细介绍原始字符串的特点、使用场景及与普通字符串的对比。 特点 忽略转义字符&#xff1…