从源码剖析Java线程池的工作机制

news2024/11/16 7:45:09

文章目录

  • 从源码剖析Java线程池的工作机制
    • 一、序言
    • 二、基础概念
      • 1、线程调度模型
      • 2、线程池创建方式
        • (1)Executors工具类
        • (2)ThreadPoolExecutor构造方法
          • 2.1 核心参数
          • 2.3 拒绝策略实现
    • 三、源码剖析
      • 1、状态控制变量ctl
      • 2、线程执行execute方法
      • 3、新增线程addWorker方法
      • 4、工作线程Worker内部类
      • 5、启动线程runWorker方法
      • 6、获取阻塞队列getTask方法
      • 7、退出线程processWorkerExit方法
      • 8、线程池关闭
        • (1)shutdown方法
        • (2)shutdownNow方法
    • 四、流程图
    • 五、后记


从源码剖析Java线程池的工作机制

一、序言

在多核处理器环境下,多线程与并发编程已经成为提升程序响应速度和吞吐量的关键手段,对于Java工程师而言,深入理解多线程与并发编程内部机制,是构建高性能、高可用系统的基石。

本文小豪将带大家深入解读Java线程池ThreadPoolExecutor的工作原理,从源码分析其任务处理流程,掌握线程池内部机制,帮助我们更好地应用Java线程池。

文章最后附有流程图,进一步帮我们梳理业务逻辑

二、基础概念

在上一篇【从JDK源码探究Java线程与操作系统的交互】一文中,介绍到Java中线程的底层源码实现,而我们在开发中,通常不太会直接创建线程去使用,大多数仍是通过采用线程池来统一创建、管理线程。

线程池是一种用于管理和复用线程的机制,通过预创建一定数量的线程,来减少线程创建和销毁的开销,提高程序的性能和响应速度。

1、线程调度模型

在Java中,线程池的实现建立在Executor框架之上的,JDK为我们提供了一系列Executor框架的接口、实现类等,便于我们创建使用线程池。

同时,在上一篇我们了解到Java线程对应着操作系统内核级线程,且为一对一映射关系,由操作系统调度所有线程并分配给CPU去执行,这里我们可以认为,Java线程由两层线程调度模型组成:

  • 上层:Executor框架调度Java线程
  • 下层:操作系统调度内核级线程

这里的Executor框架是java.util.concurrent包的一部分,它是实现线程池的顶层接口,Executor接口只有一个方法execute(Runnable command),用于执行一个Runnable任务,其主要目的就是提供一个提交任务的机制,不需要我们直接管理线程的创建和生命周期。

Executor接口下层实现类结构关系如下:

在这里插入图片描述

2、线程池创建方式

基于Executor框架,我们一般创建线程池常用两种方式:通过Executors工具类ThreadPoolExecutor构造方法

(1)Executors工具类

Executors是JDK提供的创建线程池工具类,利于简化线程池的创建,内部提供了大量静态方法,这里列举几个常见的,源码如下:

public class Executors {

    // 创建一个固定大小的线程池
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    // 创建一个单线程的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO)执行
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>()));
    }

    // 创建一个可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    // 创建一个可以执行延迟任务的线程池,支持定时及周期性任务执行
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
}

但现在大家早已不用Executors工具类来创建线程池了,太死板而且有内存溢出OOM风险,相信没有小伙伴还不知道这一点,具体原因为何大家可阅读阿里的Java开发手册。

内存溢出及内存泄漏小豪在之前有写过一篇【深入JVM:线上内存泄漏问题诊断与处理】,里面详细介绍了内存泄漏的检测方法和解决策略。

(2)ThreadPoolExecutor构造方法

根据Executor接口的类结构关系,以及Executors工具类中静态方法的源码,不难发现底层都是通过ThreadPoolExecutor对象的去实现的。

一般开发中,也基本都是通过实例化ThreadPoolExecutor类的方式,通过其构造方法设置线程池参数,自定义创建线程池,这种方法相对来说更灵活,也规避资源耗尽的风险。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 参数校验
    if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    // 入参赋值给成员变量
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
2.1 核心参数

ThreadPoolExecutor构造方法一共有7个参数,分别是:

  • ① 核心线程数(corePoolSize)

    线程池中的核心线程数量,用于定义最少的线程个数,即使空闲默认也不会被销毁(除非设置allowCoreThreadTimeOut属性,允许核心线程超时销毁)。

    创建线程池后,线程池中默认也没有核心线程的,等提交任务之后才会优先创建核心线程,另外,也可以设置创建线程池之后立即初始化核心线程

  • ② 最大线程数(maximumPoolSize)

    线程池中的允许的最大线程数量,当核心线程全部在繁忙且任务队列存满之后,线程池会临时追加非核心线程,直到总线程数达到该值上限

  • ③ 线程空闲等待时间(keepAliveTime)

    线程的空闲等待时间,当线程空闲时间超过配置的等待时间,线程就会被销毁,默认销毁的是非核心线程,如果设置核心线程也可被销毁,则核心线程空闲时间超过等待时间也会被销毁

  • ④ 空闲等待时间单位(unit)

    线程空闲等待时间的单位,秒、分、小时等

  • ⑤ 任务队列(workQueue)

    存放待执行的任务的队列(采用阻塞队列),当核心线程均繁忙时,新提交的任务会存放到该任务队列中,等待被线程执行

  • ⑥ 线程工厂(threadFactory)

    线程工厂,用来创建线程,通常用它来自定义线程名称

  • ⑦ 拒绝策略(handler)

    线程池中的拒绝(饱和)策略,当任务太多无法处理时(任务队列workQueue已满,正在执行的线程数达到maximumPoolSize),采用哪种拒绝策略去处理新任务。

2.3 拒绝策略实现

ThreadPoolExecutor构造方法中的拒绝策略参数,我们需要额外注意一下。

JDK提供了四种内置的拒绝策略:

  • AbortPolicy:丢弃任务并抛出RejectedExecutionException异常,线程池默认的拒绝策略

    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() {
        }
        // 直接抛出异常
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
        }
    }
    
  • DiscardPolicy:直接丢弃任务

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() {
        }
        // 什么都没有处理
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    
  • DiscardOldestPolicy:丢弃阻塞队列中最靠前的任务

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() {
        }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                // 弹出最先进入阻塞队列中的任务(即时间最长的)
                e.getQueue().poll();
                // 执行当前新的任务
                e.execute(r);
            }
        }
    }
    
  • CallerRunsPolicy:由提交任务的线程去执行当前任务

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() {
        }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                // 由调度线程执行该任务
                r.run();
            }
        }
    }
    

同时也支持我们定制化拒绝策略,实现RejectedExecutionHandler接口,自定义拒绝策略:

public class CustomRejectedExecution implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 自定义拒绝策略
        // 存放至缓存...
    }
}

自定义拒绝策略一般使用较多,当任务量较大时,我们可以先将拒绝的任务放入缓存或数据库,待压力较小时再依次消费处理。

三、源码剖析

熟悉过线程池的基础概念之后,我们开始分析ThreadPoolExecutor线程池处理任务流程的源码实现。

在上文我们了解到,顶层Executor接口的execute()方法用来执行Runnable任务,在ThreadPoolExecutor中,也是通过execute()方法提交任务并执行,因此,我们从execute()方法作为入口,分析其源码。

在此之前,还有一个概念需要了解,一个成员变量ctl

1、状态控制变量ctl

ThreadPoolExecutor中,将线程池状态和工作线程数采用一个原子整数成员变量ctl进行存储:

public class ThreadPoolExecutor extends AbstractExecutorService {

    // (重要)线程池状态控制变量
    // 两个含义:高3位代表当前线程池状态、低29位代表当前工作线程数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 1字节等于8位,int类型是占用4字节,COUNT_BITS = 32 - 4 = 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 线程池容量,1左移29位再减1,CAPACITY约等于5亿多
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

    // 线程池状态
    // RUNNING状态,可以处理新的任务和阻塞队列中的任务
    // 位图111...
    private static final int RUNNING = -1 << COUNT_BITS;
    // SHUTDOWN状态,不再处理新任务,但会处理阻塞队列中的任务
    // 位图000...
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    // STOP状态,不再处理新任务,且不处理阻塞队列中的任务
    // 位图001...
    private static final int STOP = 1 << COUNT_BITS;
    // TIDYING状态,过度状态,所有任务都已经执行完毕,即将关闭线程池
    // 位图010...
    private static final int TIDYING = 2 << COUNT_BITS;
    // TERMINATED状态,线程终止
    // 位图011...
    private static final int TERMINATED = 3 << COUNT_BITS;

    // 获取线程状态,根据&运算特点,始终获取ctl高3位,即当前线程池状态
    private static int runStateOf(int c) {
        return c & ~CAPACITY;
    }

    // 计算工作线程数,根据&运算特点,始终获取ctl低29位,即当前工作线程数
    private static int workerCountOf(int c) {
        return c & CAPACITY;
    }

    private static int ctlOf(int rs, int wc) {
        return rs | wc;
    }
    
    // 其它代码省略......
}

这里的状态控制变量ctl设计的十分巧妙,通过一个AtomicInteger类型存储了两个变量值,AtomicInteger类型占用4字节32位,操作时都是基于位运算,高3位代表当前线程池状态、低29位代表当前工作线程数

同时这里也确保了共享变量的线程安全,利用AtomicInteger的原子操作,确保线程池状态和工作线程数始终是保持统一的。

不懂位运算以及这里为何可以通过runStateOf()workerCountOf()方法获取到线程状态和工作线程数的小伙伴,可以自己写个Demo验证一下这个结论,本篇着重分析线程池工作流程的源码,这里不做过多赘述。

小豪读到这里时也不得不感叹Java并发编程大师Doug Lea确实流弊!

2、线程执行execute方法

通过状态控制变量ctl,我们得到两个方法:

  1. runStateOf(int c):获取当前线程状态
  2. workerCountOf(int c):获取当前工作线程数

接着我们正式进入ThreadPoolExecutorexecute方法,源码如下:

public void execute(Runnable command) {
    // 参数验证,判断任务是否为空,为空则抛出空指针异常
    if (command == null)
        throw new NullPointerException();

    // 获取状态控制变量clt的值,用于后续获取线程池状态和工作线程数量
    int c = ctl.get();
    // 阶段一:启动核心线程 -----------------------------------------
    // 判断当前工作线程数小于核心线程数corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // 创建一个核心线程,并把任务添加到该线程中运行
        // addWorker()方法:参数二为true,代表添加核心线程;参数二为false,代表添加非核心线程
        if (addWorker(command, true))
            return;
        // 若添加失败,则重新获取控制变量clt的值;
        c = ctl.get();
    }
    // 走到这里,说明workerCountOf(c) >= corePoolSize,核心线程数已满

    // 阶段二:核心线程数已满,任务添加至阻塞队列 -----------------------
    // 判断当前线程池是否为运行状态 且 新任务添加到阻塞队列成功(即阻塞队列未满)
    if (isRunning(c) && workQueue.offer(command)) {
        // 重新获取ctl的值
        int recheck = ctl.get();
        // 再次检验线程池是否是运行状态
        // 如果不是运行状态,由于已经把任务添加到workQueue阻塞队列中了,此时需移除该任务
        if (!isRunning(recheck) && remove(command))
            // 线程池不是运行状态 且 移除任务成功,触发拒绝策略
            reject(command);
        // 线程池是运行状态
        // 获取线程池中的有效线程数,如果是0,则新建一个非核心线程(确保线程池在RUNNING状态下至少有一个线程执行任务)
        else if (workerCountOf(recheck) == 0)
            // addWorker()方法:参数一为null,代表在线程池中新创建一个线程,但没有传入任务
            // 任务已经被添加到workQueue阻塞队列中,新建的非核心线程会从workQueue中获取任务来执行
            addWorker(null, false);
    }
    // 走到这里,说明两种情况:
    // 1.线程池已经不是RUNNING状态
    // 2.线程池是RUNNING状态,但workQueue阻塞队列已满(核心线程数在之前也以及满了)

    // 阶段三:阻塞队列已满,启动非核心线程 -----------------------------
    // 再次调用addWorker()方法,参数二为false,代表创建一个非核心线程
    else if (!addWorker(command, false))
        // 创建非核心线程失败,执行拒绝策略(工作线程数已达到maximumPoolSize最大线程数的限制)
        reject(command);
}

在源码中,execute方法大致包含三个阶段:

  • 阶段一:当前工作线程数小于核心线程数,创建核心线程
  • 阶段二:核心线程数已满(当前工作线程数大于等于核心线程数),任务添加至阻塞队列
  • 阶段三:阻塞队列已满(且核心线程数已满),创建非核心线程,若非核心线程已满,则执行拒绝策略

execute方法的整体逻辑比较清晰,其中多次出现addWorker()方法,我们继续往下走。

3、新增线程addWorker方法

addWorker()方法主要用于新增工作线程,其主要参数有两个:

  1. 参数一:Runnable firstTask 任务实例
    • 不为null:创建线程并启动传入的任务实例
    • 为null:创建线程,但不启动任务,后续从阻塞队列中获取任务
  2. 参数二:boolean core 是否核心线程
    • true:创建核心线程
    • false:创建非核心线程

具体源码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    // 标志位
    retry:
    // 外层死循环
    for (; ; ) {
        // 获取状态控制变量clt的值
        int c = ctl.get();
        // 获取当前线程池状态
        int rs = runStateOf(c);

        // 判断是否需要新创建工作线程
        // 如果当前线程池非运行状态(rs >= SHUTDOWN,代表状态为SHUTDOWN、STOP、TIDYING、TERMINATED其中之一)
        // 并且
        // 当前线程池状态>SHUTDOWN 或 新任务不为空 或 阻塞队列为空
        // 均不需要创建新的工作线程,直接返回false
        if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                        firstTask == null &&
                        !workQueue.isEmpty()))
            return false;
        // !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())
        // 这段代码逻辑有点绕,其实就是线程池为SHUTDOWN时,会停止接收新任务,但还是会处理完阻塞队列中的任务,如过是这种情况,这里就不会返回false

        // 内层死循环
        // 采用CAS,将工作线程个数+1
        for (; ; ) {
            // 获取当前线程池工作线程数
            int wc = workerCountOf(c);
            // 判断当前工作线程数是否符合要求
            // 当前工作线程数是否>=线程池容量 或 根据参数二core判断当前工作线程数是否>=核心线程数/最大线程数
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 尝试将线程池工作线程数量+1
            if (compareAndIncrementWorkerCount(c))
                // 成功则退回标志位,即结束外层循环
                break retry;
            // workerCount线程数量添加失败,重新获取控制变量ctl的值
            c = ctl.get();
            // 如果当前线程池的运行状态不等于上面rs,代表在此期间线程池运行状态已经发生改变
            if (runStateOf(c) != rs)
                //返回外层for循环继续执行
                continue retry;
            // 若仍未添加数量成功,则CAS自旋重新尝试添加
        }
    }

    // 创建两个状态标记
    // 标记线程是否启动
    boolean workerStarted = false;
    // 标记线程是否添加
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 将firstTask当前任务封装为Worker对象
        w = new Worker(firstTask);
        // 拿到worker对象创建的线程
        final Thread t = w.thread;
        if (t != null) {
            // 加锁ReentrantLock,确保线程安全
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 获取当前线程池状态
                int rs = runStateOf(ctl.get());

                // 校验当前线程池状态
                // rs < SHUTDOWN 代表当前线程是否为RUNNING运行状态
                // 或
                // 线程池状态为SHUTDOWN状态 且 firstTask新任务为null(线程池状态SHUTDOWN时,仍会执行workQueue中的任务)
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    // 判断当前线程已经还存活
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // 添加到工作线程集合workers中
                    // workers是一个HashSet集合,线程不安全,因此上面需要加锁
                    workers.add(w);
                    int s = workers.size();
                    // 统计largestPoolSize线程池中的最大线程数量
                    if (s > largestPoolSize)
                        // 更新最大值
                        largestPoolSize = s;
                    // worker线程添加成功,标记线程已添加
                    workerAdded = true;
                }
            } finally {
                // 解锁
                mainLock.unlock();
            }
            // 判断线程是否添加成功
            if (workerAdded) {
                // 成功则启动线程
                t.start();
                // 标记worker线程已启动
                workerStarted = true;
            }
        }
    } finally {
        // 判断线程是否未启动成功
        if (!workerStarted)
            // 在workers集合中移除该worker
            addWorkerFailed(w);
    }
    // 返回worker线程是否启动成功
    return workerStarted;
}

观察源码发现,addWorker()方法主要分为两步:

  • 校验线程池状态和工作线程数

    • 首先判断当前线程池状态是否非运行状态,并且当前线程池状态不为SHUTDOWN或新任务不为空或阻塞队列为空,这两种情况均不需要创建新的工作线程,直接返回false
    • 紧接着判断当前工作线程数是否符合要求,是否超过线程池容量或核心/最大线程数(根据参数二core判断),超过则直接返回false
    • 否则通过CAS不断尝试将工作线程数量+1,成功则跳出循环,执行下一步
  • 创建worker线程对象并启动线程

    • 创建worker对象,封装firstTask任务,采用独占锁将worker工作线程对象放入工作队列workers集合,同时更新最大线程数的值,最后启动线程。

      // 将firstTask当前任务封装为Worker对象
      w = new Worker(firstTask)
      
    • 若线程启动失败则调用addWorkerFailed()方法在workers集合中移除该worker对象

4、工作线程Worker内部类

ThreadPoolExecutor中,线程统一被封装为Worker内部类,即具体的工作线程类,Worker继承自AQS并实现了Runnable接口:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

    private static final long serialVersionUID = 6138294804551838833L;

    // 当前的线程实例
    final Thread thread;

    // 当前的任务实例
    Runnable firstTask;

    // 记录线程完成的任务数
    volatile long completedTasks;

    // 构造方法,入参为firstTask任务实例
    Worker(Runnable firstTask) {
        // (重要)设置worker线程状态为-1,表示禁止线程中断
        setState(-1);
        // 赋值成员变量,保存firstTask任务实例
        this.firstTask = firstTask;
        // 通过线程工厂创建新线程(传入当前对象),真正的线程
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        // (重要)线程启动时执行的任务
        runWorker(this);
    }

    // 是否持有独占锁
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    // 尝试获取独占锁
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    // 尝试释放独占锁
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock() {acquire(1);}

    public boolean tryLock() {return tryAcquire(1);}

    public void unlock() {release(1);}

    public boolean isLocked() {return isHeldExclusively();}

    // 线程启动后,进行线程中断时执行方法
    void interruptIfStarted() {
        Thread t;
        // worker中状态 >= 0 且 线程不为空 且 当前线程未中断
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                // 中断线程
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

Worker类的构造方法首先会通过setState(-1)设置线程状态state-1,当state-1时,代表线程此时不能被中断,这里采用的是**AQS的独占锁模式**,只有当state不为-1(独占锁被释放),线程才被允许中断,具体中断逻辑在interruptIfStarted()方法。

在创建完worker工作线程对象后,会启动该工作线程,调用wokerrun()方法,run()方法内部又调用了ThreadPoolExecutor类的runWorker()方法。

5、启动线程runWorker方法

runWorker()方法会执行具体的任务,源码如下:

final void runWorker(Worker w) {
    // 获取当前线程
    Thread wt = Thread.currentThread();
    // 获取worker构造方法传入的任务实例
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 解锁,设置worker状态state为0,表示允许线程中断
    w.unlock();
    // 标识线程退出原因,true代表线程异常退出,false代表线程正常结束
    boolean completedAbruptly = true;
    try {
        // 判断当前任务是否不为空 或 从阻塞队列获取成功
        // getTask()方法从阻塞队列获取一个任务
        while (task != null || (task = getTask()) != null) {
            // 加锁处理并发问题,防止shutdown()时终止正在执行的worker
            w.lock();
            // 判断如果线程池状态 >= STOP 且线程未被中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                // 中断线程
                wt.interrupt();
            try {
                // 可自定义实现的扩展点,执行前的扩展方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务,对应任务的run()方法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    // 扩展点,执行后的扩展方法
                    afterExecute(task, thrown);
                }
            } finally {
                // 任务对象置空
                task = null;
                // 完成的工作任务数+1
                w.completedTasks++;
                // 解锁
                w.unlock();
            }
        }
        // 标识线程为正常结束退出
        completedAbruptly = false;
    } finally {
        // 处理worker线程退出
        processWorkerExit(w, completedAbruptly);
    }
}

其内部流程主要如下:

  1. 首先解锁worker,允许线程中断
  2. 接着通过while循环判断当前任务是否为空,若为空则调用getTask()方法从阻塞队列中获取待处理的任务
  3. 获取到任务后工作线程直接加锁,同时根据线程池状态判断是否中断当前线程
  4. 执行对象任务的run()方法,在任务执行前后均提供扩展点可自定义扩展
  5. 重复第2步的逻辑,直到阻塞队列中的任务全部执行完毕,最后调用processWorkerExit()方法处理worker线程退出

这里我们也发现,线程池中的线程复用实际上就是通过runWorker()方法中while循环实现的,worker工作线程不断的从阻塞队列中获取新的任务,直接调用任务的run()方法,避免去创建新线程,这一过程实现了线程复用

6、获取阻塞队列getTask方法

getTask()方法用来获取阻塞队列中的任务,源码如下:

private Runnable getTask() {
    // 标记上一次从阻塞队列获取任务是否超时
    boolean timedOut = false;

    // 死循环
    for (; ; ) {
        // 获取ctl值
        int c = ctl.get();
        // 获取当前线程池状态
        int rs = runStateOf(c);

        // 判断是否需获取任务
        // 若线程池状态 >= STOP 或 (线程池状态为SHUTDOWN 且 阻塞队列为空),则不需要再处理任务
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 工作线程数-1
            decrementWorkerCount();
            // 直接返回
            return null;
        }

        // 获取当前工作线程数
        int wc = workerCountOf(c);

        // timed标记用于判断是否需进行超时控制
        // allowCoreThreadTimeOut是否运行核心线程超时销毁(默认为false,核心线程不允许超时销毁)
        // wc > corePoolSize当前工作线程数大于核心线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 工作线程数 > maximumPoolSize
        // 或
        // timed && timedOut,timedOut首次进入默认为false
        if ((wc > maximumPoolSize || (timed && timedOut))
                // 工作线程数大于1
                // 或
                // 阻塞队列为空
                && (wc > 1 || workQueue.isEmpty())) {
            // 采用CAS,工作线程数-1,并且返回null
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        // 执行到这,说明前面线程池状态和工作线程数校验已通过,开始从阻塞队列获取任务
        try {
            // timed为true代表核心线程允许超时销毁 或 当前工作线程数大于核心线程数(存在非核心线程)
            // timed = true:超时等待获取任务poll(),超时未获取到则返回null
            // timed = false:阻塞获取任务take(),一直等着直到获取到任务
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            // 判断是否获取到任务
            if (r != null)
                // 获取到直接返回
                return r;
            // 未获取到,代表等待超时也没有拿到任务,设置timedOut值为true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 即使异常也循环重试
            timedOut = false;
        }
    }
}

大致过程其实就是从workQueue阻塞队列中获取任务,在获取之前也会校验线程池状态和工作线程数是否合规,最后根据条件选择超时等待获取还是阻塞获取。

7、退出线程processWorkerExit方法

runWorker()方法执行结束之后在finally代码块中会调用processWorkerExit()方法,主要用来销毁空闲线程,具体源码如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // completedAbruptly:true代表线程异常退出,false代表线程正常结束
    if (completedAbruptly)
        // 异常退出,工作线程数-1
        decrementWorkerCount();

    // 加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 统计当前worker完成的任务数,累加到线程池总完成任务数
        completedTaskCount += w.completedTasks;
        // 从线程池HastSet集合中移除当前工作线程
        workers.remove(w);
    } finally {
        // 解锁
        mainLock.unlock();
    }

    // 判断是否终止线程池
    tryTerminate();

    // 获取ctl值
    int c = ctl.get();
    // 当前线程池状态 < STOP,即RUNNING或SHUTDOWN状态时
    if (runStateLessThan(c, STOP)) {
        // 判断线程是否为正常结束
        if (!completedAbruptly) {
            // 正常结束退出
            // allowCoreThreadTimeOut是否运行核心线程超时销毁
            // 若允许,min最小线程数为0,不允许则为corePoolSize核心线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 判断workQueue阻塞队列是否不为空
            if (min == 0 && !workQueue.isEmpty())
                // 确保至少有一个worker线程
                min = 1;
            //判断工作线程数 >= min最小线程数
            if (workerCountOf(c) >= min)
                // 符合条件直接返回
                return;
        }
        // 若线程为异常退出或不符合条件(工作线程数 < min最小线程数)
        // 创建一个非核心线程执行任务
        addWorker(null, false);
    }
}

源码中,会将worker工作线程从线程池HastSet集合中移除,最后也会确保当前工作线程数不小于最小线程数,通过processWorkerExit()方法,控制了线程池中线程数量的大小变化

8、线程池关闭

线程池关闭有两种方法,分别是shutdown()shutdownNow()

(1)shutdown方法

shutdown()方法首先会将线程池状态设置为SHUTDOWN,不再接收新的任务。

内部正在执行的任务和阻塞队列中的任务都会继续执行,待均执行完毕之后会关闭线程池。

具体源码如下:

public void shutdown() {
    // 加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 安全检查
        checkShutdownAccess();
        // 设置线程池状态为SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中断线程池中空闲的线程
        interruptIdleWorkers();
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    // 判断是否终止线程池
    tryTerminate();
}
(2)shutdownNow方法

shutdownNow()方法则是将线程池状态设置为STOP,不再接收新的任务。

但所有线程都会被中断(已经拿到任务正在执行的不会被中断),且阻塞队列中的任务会被丢弃,不再执行了,并且返回未执行的任务队列。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 设置线程池状态为STOP
        advanceRunState(STOP);
        // 中断线程池中所有线程
        interruptWorkers();
        // 清空阻塞队列,赋值给tasks返回
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // 判断是否终止线程池
    tryTerminate();
    // 返回阻塞队列中未被执行的任务
    return tasks;
}

四、流程图

在这里插入图片描述

五、后记

本文着重带大家分析了ThreadPoolExecutor线程池处理任务流程的源码实现,过程中额外介绍了线程池的创建方式以及核心参数,详细通过本文,大家更深入的理解了线程池的原理和工作机制。

但在我们实际开发中,使用线程池主要关注线程池大小的设置,合理配置线程池大小能够更高的提升程序性能和稳定性,国内通用的共识即根据CPU密集型和IO密集型来设定线程数,这里面其实是基于任务的阻塞系数来商榷增减线程数,一般阻塞时间占比越高,设置越多线程,CPU计算时间所占越高,设置越少线程

网上有很多配置线程池大小的公式,但这些公式无法适用于所有业务场景,我们只能根据实际场景,通过压测及性能监控等手段,不断的调整线程池大小,确定出合适的线程数量,实现最高的任务处理效率。

下一篇,小豪将会继续更新Java多线程与并发编程相关内容,创作不易,如果大家觉得内容对你有收获,不妨考虑关注关注小豪~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1989929.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MindSearch:AI 时代的“思考型”搜索引擎

随着AI技术的飞速发展&#xff0c;搜索引擎领域也迎来了新的变革。继 OpenAI 发布 SearchGPT 之后&#xff0c;国内也涌现出一批优秀的AI搜索引擎&#xff0c;其中&#xff0c;由中科大和上海人工智能实验室联合研发的 MindSearch&#xff08;思索&#xff09;尤为引人注目。这…

php收银系统源码-线上下单,门店接单

1.收银系统开发语言 核心开发语言: PHP、HTML5、Dart后台接口: PHP7.3后合管理网站: HTML5vue2.0element-uicssjs线下收银台&#xff08;安卓/PC收银、安卓自助收银&#xff09;: Dart3框架&#xff1a;Flutter 3.19.6助手: uniapp商城: uniapp 2.线上商城下单&#xff0c;门…

theaterjs使用

import theaterJS from "theaterjs"; interface ITheaterOptions {autoplay?: boolean;minSpeed?: {type: number;erase: number;};maxSpeed?: {type: number;erase: number;}; } export default function useTheater(id: string, options: ITheaterOptions, addS…

变压器耦合放大器(低频应用+高频应用)

2024-8-7&#xff0c;星期三&#xff0c;22:49&#xff0c;天气&#xff1a;晴&#xff0c;心情&#xff1a;晴。下班抽出点时间看看书&#xff0c;话不多说&#xff0c;学习开始啦。 今日继续学习模电自选教材的第六章&#xff0c;多级放大器、RF放大器和功率放大器。主要学习…

【Linux】进程概念—环境变量

目录 一、冯诺依曼体系结构 二、操作系统(Operator System) 1 .概念 2 .设计OS的目的 3 . 定位 4 . 系统调用和库函数概念 三、进程 1 .基本概念 2 .描述进程-PCB&#xff08;process control block&#xff09;进程控制块 3 . 组织进程 4 . 查看进程 5 .通过系统调用获取进程…

【iMSTK】第一期 imstk配置过程

很高兴在雪易的CSDN遇见你 VTK技术爱好者 QQ&#xff1a;870202403 公众号&#xff1a;VTK忠粉 前言 本文分享imstk的配置和使用过程&#xff0c;希望对各位小伙伴有所帮助&#xff01; 感谢各位小伙伴的点赞关注&#xff0c;小易会继续努力分享&#xff0c;一起进步&…

man手册安装

1.什么是man手册 &#xff08;输入man man) linux系统提供的有关函数或指令介绍的相关帮助手册&#xff0c;可以在该手册也中查看函数、指令功能&#xff0c;说白了就是相关操作说明书&#xff0c;一共有七章&#xff0c;主要使用前三章&#xff0c;第一章是shell指令相关说明…

TPS和QPS达到多少才算高并发?

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐&#xff1a;「storm…

操作系统 IO 相关知识

操作系统 IO 相关知识 阻塞与非阻塞同步与异步IO 和系统调用传统的 IODMAmmap 内存映射sendfilesplice 常用的 IO 模型BIO&#xff1a;同步阻塞 IONIO&#xff1a;同步非阻塞 IOIO 多路复用信号驱动 IOAIO&#xff1a;异步 IO 模型 IO 就是计算机内部与外部进行数据传输的过程&…

【Python系列】pathlib模块

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

C语言项目——贪吃蛇,为什么用curses,定义上下左右

在Linux系统中&#xff0c;使用ncurses在程序编译时还要加上 -lcurses 即&#xff1a;gcc cursedemo.c -lcurses #include<curses.h> int main() {initscr(); //ncurse界面的初始化函数printw("This is a curses window.\n");//再ncurse模式下的printfgetc…

【代码随想录训练营第42期 Day22打卡 回溯Part1 - LeetCode 77. 组合 216.组合总和III 17.电话号码的字母组合

目录 一、做题心得 二、回溯基础知识 1.定义 2.适用问题 3.一个思想 4.代码实现 三、题目与题解 题目一&#xff1a;77. 组合 题目链接 题解&#xff1a;回溯 题目二&#xff1a;216.组合总和III 题目链接 题解&#xff1a;回溯 题目三&#xff1a;17.电话号码的字…

企元数智百年营销史的精粹:借鉴历史创造未来商机

随着时代的发展和科技的进步&#xff0c;传统营销方式正在经历前所未有的颠覆和改变。在这个数字化时代&#xff0c;企业需要不断创新&#xff0c;同时借鉴百年营销史的精粹&#xff0c;汲取历史经验&#xff0c;创造未来商机。而"企元数智"作为现代营销的代表&#…

骑行激情,燃动巴黎——维乐Angel Revo坐垫,赋能你的奥运梦想!

当奥林匹克圣火在塞纳河畔熊熊燃烧&#xff0c;巴黎的街头巷尾都弥漫着骑行的激情。2024年的夏天&#xff0c;自行车赛道上&#xff0c;每一圈轮毂的转动都凝聚着运动员的汗水与荣耀。金牌赛程已定&#xff0c;女子公路自行车决赛于7月27日20:30鸣枪&#xff0c;男子紧随其后&a…

【Nuxt】编写接口和全局状态共享

编写接口 ~/server/api/homeInfo.get.ts export default defineEventHandler((event) > {return {code: 200,data: {name: hello world}} })服务端有一些方法可以快速获取请求常见字段&#xff1a; getQuery(event)getMethod(event)await readBody(event)await readRawBo…

配置Cuttlefish 虚拟 Android 设备

google 参考资料&#xff1a; https://source.android.com/docs/setup/start?hlzh-cn https://source.android.com/docs/devices/cuttlefish/get-started?hlzh-cn Cuttlefish 开始 验证 KVM 可用性 Cuttlefish 是一种虚拟设备&#xff0c;依赖于宿主机上可用的虚拟化。 …

鸿蒙Harmony开发:通用焦点样式事件规范

基础概念 焦点、焦点链和走焦 焦点&#xff1a;指向当前应用界面上唯一的一个可交互元素&#xff0c;当用户使用键盘、电视遥控器、车机摇杆/旋钮等非指向性输入设备与应用程序进行间接交互时&#xff0c;基于焦点的导航和交互是重要的输入手段。焦点链&#xff1a;在应用的组…

Docker安装teslamate

要求 Docker&#xff08;如果不熟悉 Docker&#xff0c;请参阅安装 Docker 和 Docker Compose&#xff09;一台始终开启的机器&#xff0c;因此 TeslaMate 可以持续获取数据计算机上至少有 1 GB 的 RAM 才能成功安装。外部互联网访问&#xff0c;与 tesla.com 交谈 创建一个名…

【数据结构】队列篇

文章目录 1.队列1.1 队列的概念及结构 2. 队列的实现2.1 准备工作2.2 队列的初始化2.3 队尾入队列2.4 队头出队列2.5 获取队列头部元素2.6 获取队列队尾元素2.7 获取队列有效元素个数2.8 检测队列是否为空2.9 销毁队列 3. 代码整合 1.队列 1.1 队列的概念及结构 队列&#xff…

黑马Java零基础视频教程精华部分_15_基本查找/顺序查找、二分查找/折半查找、插值查找、斐波那契查找、分块查找、哈希查找

系列文章目录 文章目录 系列文章目录一、基本查找/顺序查找核心思想&#xff1a;从0索引开始挨个往后查找代码&#xff1a;练习&#xff1a;定义一个方法利用基本查找&#xff0c;查询某个元素在数组中的索引&#xff0c;数组包含重复数据。 二、二分查找/折半查找核心思想:属于…