并发编程-线程池

news2024/11/23 23:52:28

并发编程-线程池

本篇我们主要围绕线程池的原理以及源码进行分析,事实上线程池本身并不是什么新的技术,而是在池化技术的思想上把一些工具类整合起来。话不多说,我们开始进入正题。我们先来认识一下什么是线程池

概念

线程池(Thread Pool)是一种基于池化技术设计,用于管理一组工作线程的技术。它允许开发者以较小的开销来创建、管理和销毁线程,通过复用线程来执行多个并发任务,从而提高应用程序的响应速度和吞吐量。线程池通过维护一个运行中的线程集合来管理任务,这些线程可以在多个任务之间共享,从而减少创建和销毁线程的开销。如何更浅显易懂的理解线程池的概念,我们通过一个生活中的例子来描述一下,大家都去过肯德基(KFC) ,我们不想去或者不方便去店内就餐时都会点宅急送,想象一下如果你是一家KFC的老板,你的餐厅提供送餐服务。每当有顾客下单时,你都需要派人去送餐。如果每次有订单就临时招聘一个送餐员,送完餐后再解雇他,这样不仅效率低下(招聘和解雇都需要时间),而且成本也很高(每次都要支付招聘和解雇的费用)。为了解决这个问题,KFC会招募一些固定的骑手,这个骑手的整体相当于一个“送餐员池”。这个池子里有几个固定的骑手(相当于线程),他们平时在餐厅待命,等待送餐的任务。当有新的订单时,就从池子里挑一个空闲的送餐员去送餐,送完后再让他回到池子里等待下一个任务。如果所有的送餐员都在忙,新的订单就会进入一个等待队列,直到有送餐员完成任务并回到池子中。基本理解概念之后,我们来看看线程池如何使用的

使用

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        // 提交任务到线程池
        for (int i = 0; i < 10; i++) {
            final int index = i;
            //此处也可使用fixedThreadPool.execute()
            fixedThreadPool.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " is processing " + index);
            });
        }
        // 关闭线程池
        fixedThreadPool.shutdown();
    }
}

执行结果如下图所示

在这里插入图片描述

注:这里需要简单说明一下 execute() 方法和 submit() 方法的区别

  1. execute(Runnable command): execute 方法没有返回值。它接收一个实现了 Runnable 接口的任务,该任务不返回结果。
  2. submit(Callable task) / submit(Runnable task, T result): submit 方法返回一个 Future对象,该对象代表了异步计算的结果。如果任务通过 Callable 提交,Future 将能够返回任务执行的结果。如果任务通过 Runnable 和结果值提交,则 Future 的 get() 方法将返回指定的结果值,而不是任务执行的结果。

类型

我们在写线程池使用示例代码时是以newFixedThreadPool()为例子,它还存在其他类型的线程池,如下图所示

在这里插入图片描述

下面我们来看一下这几种线程池

  1. newFixedThreadPool()

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
    }
    

    newFixedThreadPool(int nThreads) 是指创建一个固定线程数量的线程池,线程池中线程的数量就是传入的参数 nThreads。在需要固定线程数量的场景可使用 newFixedThreadPool()。

  2. newSingleThreadExecutor()

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
    }
    

    newSingleThreadExecutor() 是指创建一个只有一个线程的线程池,这样所有的任务都由这一个线程来执行,从而保证了执行的顺序。

  3. newCachedThreadPool()

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
    }
    

    是指创建一个可缓存的线程池,这个线程池会尽量重用已有的线程,当线程池中当前没有可用的线程时,会创建一个新线程来处理任务。如果线程池中的线程空闲时间超过了指定的时间(默认情况下,这个时间是 60 秒),那么这些空闲线程将被终止并从缓存中移除。如果之后再次需要新的线程来执行任务,线程池会重新创建线程。需要注意的是,由于 newCachedThreadPool() 允许线程池的大小无限制地增长(直到内存耗尽),所以在某些情况下可能会导致大量的线程被创建,进而可能耗尽系统的资源。因此,在使用 newCachedThreadPool() 时,需要仔细考虑任务的特点和系统的资源状况。

  4. newScheduledThreadPool()

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
    }
    
    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
    }
    

    newScheduledThreadPool() 是指创建一个可调度线程池。这个线程池可以在给定的延迟后执行命令,或者定期地执行命令。常用于定时轮询、心跳检测等场景。下面我们通过代码来演示一下这种线程池是如何使用的。

    public class ScheduledThreadPoolPeriodicExample {
        public static void main(String[] args) {
            // 创建一个有2个线程的ScheduledExecutorService
           ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
           System.out.println("开始时间: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            // 安排一个任务在5秒后执行
           Runnable task = () -> {
            System.out.println("执行任务: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
           };
    
           // 提交任务,延迟5秒执行 (演示1)
           executor.schedule(task, 5, TimeUnit.SECONDS);
           //提交任务,初始延迟为0秒,之后每3秒执行一次(演示2)
           //executor.scheduleAtFixedRate(task, 0, 3, TimeUnit.SECONDS);
    
           // 为了体现演示效果,延迟10秒关闭线程池
           try {
               // 等待10秒,确保任务有足够的时间执行
               Thread.sleep(10000);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
    
           // 关闭线程池
           executor.shutdown();
           System.out.println("线程池执行完时间: "+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        }
    }
    

    演示1的执行结果如下图所示

    在这里插入图片描述

    演示2的执行结果如下图所示

    在这里插入图片描述

  5. newWorkStealingPool()

    public static ExecutorService newWorkStealingPool() {
            return new ForkJoinPool
                (Runtime.getRuntime().availableProcessors(),
                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                 null, true);
    }
    

    newWorkStealingPool() 是指创建并返回一个工作窃取线程池(Work-Stealing Pool)。这个线程池是 Java 7 引入的,旨在提高多线程应用程序的性能,特别是在执行大量小任务时。工作窃取算法是这种线程池的核心,它允许线程从其他线程的工作队列中“窃取”任务来执行,从而保持线程忙碌,减少线程空闲时间,提高整体的吞吐量。简单来说更适用于任务拆解汇总的场景,这里涉及到了Fork/Join 在前面的文章中也没有讲述过,后续争取补充关于Fork/Join 的文章,这里先大致了解含义。

细心的小伙伴不难发现除了 newWorkStealingPool() 以外,其余类型的线程池的核心都是 ThreadPoolExecutor() 这就引出了提到线程池就绕不开的话题——线程池的核心参数。

核心参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
}

我们来逐个看一下它的这些参数

  1. corePoolSize 核心线程数,即使它们处于空闲状态,线程池也会保留这些线程。
  2. maximumPoolSize 最大线程数 ,线程池中允许最大线程数量
  3. keepAliveTime 存活时间,当线程数大于核心时,这是多余空闲线程在终止前等待新任务的最长时间。
  4. unit:keepAliveTime 参数的时间单位。
  5. workQueue:用于存放待执行任务的阻塞队列。
  6. threadFactory:用于创建新线程的工厂。
  7. handler:拒绝策略,当线程池无法接受新任务时(因为它已经关闭或已达到容量),用于处理这种情况的饱和策略。

在一些开发手册或者代码规范中都提倡显式的创建线程池即采用 new ThreadPoolExecutor() 来创建线程池。这是因为我们可以根据系统的负载情况动态调整线程的数量,从而优化应用程序的性能。并且我们还可以集中管理线程池的核心参数(配置文件),使代码容易维护。

整个线程池的核心参数不难理解,这里我们要重点看一下最后一个参数——拒绝策略

拒绝策略

拒绝策略(RejectedExecutionHandler)是在线程池无法接受新任务时,决定如何处理这些被拒绝任务的一种机制。当线程池中的任务队列满且线程数量达到最大值时,如果继续提交任务,就会触发拒绝策略。线程池提供了四种预定义的拒绝策略,还可以实现RejectedExecutionHandler 接口来自定义拒绝策略。

  1. AbortPolicy(中断策略)

    线程池默认的拒绝策略。当任务无法被提交给线程池时,会直接抛出 RejectedExecutionException 异常。

    适用于对任务提交失败要求敏感的场景,需要明确知道任务是否被接受并执行。

    我们通过代码示例来演示一下 这个拒绝策略

    public class AbortPolicyExample {
        public static void main(String[] args) {
            // 使用AbortPolicy(中断策略,默认)
            ThreadPoolExecutor executorWithAbortPolicy = new ThreadPoolExecutor(
                    5, 10, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(5),
                    //这里用默认的 ThreadFactory
                    Executors.defaultThreadFactory(),
                    // 显式指定中断策略
                    new ThreadPoolExecutor.AbortPolicy()
            );
    
            // 示例:提交任务
            // 提交超过线程池处理能力的任务,以触发拒绝策略
            for (int i = 0; i < 20; i++) {
                int taskId = i;
                executorWithAbortPolicy.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " 正在执行任务 " + taskId);
                    try {
                        // 模拟任务执行时间
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                // 注意:这里不会捕获RejectedExecutionException,因为它是由线程池内部处理的
            }
    
            // 关闭线程池
            executorWithAbortPolicy.shutdown();
        }
    }
    

    执行结果如下图所示

    在这里插入图片描述

  2. DiscardPolicy(直接丢弃策略)

    直接丢弃策略,当任务无法被提交给线程池时,直接丢弃该任务,不会有任何提示或处理。

    适用于一些无关紧要的业务,即使任务被丢弃也不会对系统造成太大影响。

    我们通过代码来演示一下直接丢弃策略

    public class DiscardPolicyExample {
        public static void main(String[] args) {
            // 使用DiscardPolicy(直接丢弃策略)
            ThreadPoolExecutor executorWithDiscardPolicy = new ThreadPoolExecutor(
                    5, 10, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(5),
                    Executors.defaultThreadFactory(),
                    // 直接丢弃策略
                    new ThreadPoolExecutor.DiscardPolicy()
            );
            // 提交超过线程池处理能力的任务,以触发拒绝策略
            for (int i = 0; i < 20; i++) {
                int taskId = i;
                executorWithDiscardPolicy.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " 正在执行任务 " + taskId);
                    try {
                        // 模拟任务执行时间
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // 关闭线程池
            executorWithDiscardPolicy.shutdown();
        }
    }
    

    执行结果如下所示

    在这里插入图片描述

    在上述代码中我们模拟了提交20个任务,而线程池最大线程数是10,也就代表最多10个线程执行20个任务,这过程中会有线程被重复利用,最终我们看到的结果中最后的任务执行到14,而后续任务15-19的直接放弃。这就是直接丢弃策略。

  3. CallerRunsPolicy(调用者运行策略)

    当任务无法被提交给线程池时,会由提交任务的线程自己执行该任务。

    适用于对任务提交失败要求较低的场景,通过调用线程来执行任务,避免任务丢失。

    我们还是通过代码示例来感受一下

    public class CallerRunsPolicyExample {
        public static void main(String[] args) {
            // 使用CallerRunsPolicy(调用者运行策略)
            ThreadPoolExecutor executorWithCallerRunsPolicy = new ThreadPoolExecutor(
                    5, 10, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(5),
                    Executors.defaultThreadFactory(),
                    // 调用者运行策略
                    new ThreadPoolExecutor.CallerRunsPolicy()
            );
            // 提交超过线程池处理能力的任务,以触发拒绝策略
            for (int i = 0; i < 20; i++) {
                int taskId = i;
                executorWithCallerRunsPolicy.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " 正在执行任务 " + taskId);
                    try {
                        // 模拟任务执行时间
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // 关闭线程池
            executorWithCallerRunsPolicy.shutdown();
        }
    }
    

    执行结果如下

    在这里插入图片描述

    它的实现原理是 在 rejectedExecution 方法中,检查线程池是否已关闭,如果没有关闭,则直接调用任务的 run 方法。我们从它的源码来看一下

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
           
            public CallerRunsPolicy() { }
    
            /**
             * Executes task r in the caller's thread, unless the executor
             * has been shut down, in which case the task is discarded.
             * 简单翻译上面的注释
             * 在调用方的线程中执行任务r,除非执行器
             * 已关闭,在这种情况下将放弃任务。
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
    }
    
  4. DiscardOldestPolicy(丢弃最旧任务策略)

    当任务无法被提交给线程池时,会丢弃队列中最早的一个任务(即最早提交的任务),然后尝试重新执行当前任务。

    适用于允许丢弃老任务以接纳新任务的场景。

    我们通过代码示例来演示一下 丢弃最旧任务策略

    public class DiscardOldestPolicyExample {
        public static void main(String[] args) {
            // 使用DiscardOldestPolicy(丢弃最旧任务策略)
            ThreadPoolExecutor executorWithDiscardOldestPolicy = new ThreadPoolExecutor(
                    5, 10, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(5),
                    Executors.defaultThreadFactory(),
                    // 丢弃最旧任务策略
                    new ThreadPoolExecutor.DiscardOldestPolicy() 
            );
    
            // 提交超过线程池处理能力的任务,以触发拒绝策略
            for (int i = 0; i < 20; i++) {
                int taskId = i;
                executorWithDiscardOldestPolicy.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " 正在执行任务 " + taskId);
                    try {
                        // 模拟任务执行时间
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // 关闭线程池
            executorWithDiscardOldestPolicy.shutdown();
        }
    }
    

    执行结果如下
    在这里插入图片描述

    它的实现原理是 在 rejectedExecution 方法中,首先移除队列中的头部任务(最旧的任务),然后尝试再次提交当前任务。我们看源码是如何做的

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
            public DiscardOldestPolicy() { }
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    //这里很明显就是调用poll()方法取队列第一个元素
                    e.getQueue().poll();
                    //再次提交当前任务
                    e.execute(r);
                }
            }
    }
    
  5. 自定义拒绝策略

    如果前面的四种拒绝策略仍不满足我们的要求,可以通过实现RejectedExecutionHandler接口来定义自己的拒绝策略。我们用代码来演示一下

    public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("任务被拒绝,将执行自定义拒绝策略");
        }
        public static void main(String[] args) {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    5, 10, 0L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(5),
                    Executors.defaultThreadFactory(),
                    new CustomRejectedExecutionHandler());
    
            for (int i = 0; i < 20; i++) {
                int taskId = i;
                executor.execute(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName() + ": 执行任务 "+taskId);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
    
            executor.shutdown();
        }
    
    }
    

    执行结果如下
    在这里插入图片描述

    在这个上述示例中,我们创建了一个自定义的拒绝策略CustomRejectedExecutionHandler,并在其rejectedExecution 方法中打印了一条日志信息。随后,我们创建了一个线程池,并将自定义的拒绝策略传递给了线程池的构造函数。最后,我们提交了20个任务给线程池,由于线程池的配置(核心线程数为5,最大线程数为10),所以会有任务被拒绝,并触发自定义的拒绝策略。

类关系图

在上述各种代码示例中我们既用了 ExecutorService 又用了 ThreadPoolExecutor 这时就有小伙伴比较茫然,这俩存在一定的联系,但整体感觉又不是太清晰。我们来看一下它们的类关系图
在这里插入图片描述

从类关系图上可以看出 ThreadPoolExecutor 类继承了AbstractExecutorService 类,而 AbstractExecutorService类又实现了 ExecutorService 和 Executor 接口。也就是说 Executor 提供了操作线程池的 API简化线程池操作的调用。

有了以上这些内容的铺垫后,接下来我们进入整篇文章最核心的部分——原理分析与源码分析

原理分析

要搞清楚线程池的原理,首先我们要知道核心需求是什么,核心需求是线程的复用,复用即重复使用就不存在重新创建线程(new Thread())。到底如何才能实现线程的复用,假如我们自己实现一个线程池要怎么做?我们一步步的来分析

  1. 复用的前提是线程一直从存在不会被 JVM 回收。众所周知,线程的调度和执行都是 CPU 来控制的。在 run() 方法执行结束后线程自动销毁并被 JVM 回收,假设现在有这么一个类

    public class Recovery extends Thread{
        @Override
        public void run() {
            System.out.println("线程执行");
        }
    }
    

    试想一下,如果 run() 方法不结束,那么线程是不是就会一直存在,我们给上面的代码加个 while 循环不就可以了

    public class Recovery extends Thread{
        @Override
        public void run() {
            while (true){
                System.out.println("线程能一直执行");
            }
        }
    }
    

    这时候问题来了,如果线程一直执行会耗尽 CPU 的资源,进而影响到我们的系统。结合我们在文章开头的实际案例,假设线程就是骑手,线程一直不停的执行就相当于骑手一直在送餐的路上。这显然是不合理的,骑手不能送餐之后就一直停不下来。而正常的逻辑是在有订单的情况下,骑手去送餐,没有订单的时候,骑手处于等待订单的这么一个状态。同理,我们在调用execute(Task) 方法时,是线程池中的线程去取任务来执行,没有任务的时候就等待。让线程进行等待的操作我们就很熟悉了——阻塞。这样就不会一直占用 CPU 的资源了。到这里我们先梳理下以上的逻辑,当有任务时,线程池中的线程去取任务执行,当没有任务时线程在线程池中阻塞等待。这就是线程复用的核心思想,中心思想有了,接下来就需要围绕这个思想去完善线程池。

  2. 客户下单,骑手接单。小伙伴们可以感受一下,这个过程是不是似曾相识。这就是典型的生产者/消费着模型

    在这里插入图片描述

    试想一下,在上图中的 “ 平台 ” 这一角色应该具备什么样的功能。首先,“ 平台 ”需要有承接客户的下单(提交任务) 的功能,也就是“ 平台 ”能够存储任务。其次,当“ 平台 ”中的任务为空时就不让骑手(线程)接单(取任务)即能够阻塞消费者。那么在下单到接单的过程中谁能代替“ 平台 ”这个角色呢?相信读到这里的小伙伴心中已然有了答案,我们再回看线程池的核心参数,参数中传参 BlockingQueue ——阻塞队列。

    我们发现阻塞队列刚好满足“ 平台 ” 这个角色的功能。 调用BlockingQueue.take() 方法能够从阻塞队列中获取任务,当阻塞队列中没有任务时,会阻塞取任务的线程。直到队列中有了新的任务才会被唤醒。调用BlockingQueue.offer() 方法能够往阻塞队列中添加任务而不阻塞添加任务的线程(这里可以理解为在现实场景中我们不可能阻止用户下单的行为)。通过以上的分析我们对线程池的原理的理解有了一个初步的雏形,如下图所示
    在这里插入图片描述

  3. 假设你是餐厅的老板,那你是不是希望生意越来越火爆,订单越来越多。这时候问题又来了——人手(线程)不够了,原来四五个人的送餐队伍现在订单多的送不过来。就算你变成一个黑心老板,让骑手们连轴转,你的餐厅的物资储备也不足以承载所有的订单。这又衍生出另外一个问题——已经无法接收订单(任务)了。根据刚才的这两种情况,我们把它类比到线程池中来:

    (1)人手(线程)不够了怎么办,那餐厅是不是得增加人手(线程)代表在核心线程的基础上还要增加线程。

    (2)无法再接收订单(任务)了,那餐厅只能拒绝接单了这不就对标了我们之前说的拒绝策略。

    细心的小伙伴会发现,出现问题的这种情况跟我们处理的消息中间件的问题基本差不多(因为问题的本质还是围绕的生产者和消费者),所以处理问题的方案也都一样。阻塞队列满了无非两种处理方式,一是降低生产者提交任务的频率 ,二是提高消费者的消费能力,也就是增加消费者的数量。基于这个推论我们再来看线程池的参数,corePoolSize 就不用多说了,核心线程数这是必要条件。我们关注它的另一个关于线程数量的参数 maximumPoolSize 最大线程数,假设餐厅的订单只是在某个时间段特别多,作为老板的你需要招更多的骑手才能满足需求,而你为了省钱又不想给新骑手交社保。所以你就招了些临时工来应急,忙完之后还可以把他们裁掉。同理类比到线程中 maximumPoolSize 更多充当临时工的这么一个角色。我们对线程池的理解又加深了,如下图所示
    在这里插入图片描述

    加了人手(线程)订单(任务)还是太多了,超过了餐厅的承载。没办法了只能停止接单了,这就跟线程池的参数RejectedExecutionHandler 建立起了联系,当线程池中的线程数已经达到了最大线程且阻塞队列已满则触发拒绝策略。根据这个条件我们的线程池原理图又进一步升级。
    在这里插入图片描述

  4. 线程池中所有的任务总有执行完的时候,那增加的临时线程要怎么办?就好比餐厅的生意总会有淡季。此时作为餐厅黑心老板的你肯定要把那些临时工清退了。这就与线程池中的参数keepAliveTime 有关系了。当线程数大于 corePoolSize 时,这是多余空闲线程在终止前等待新任务的最长时间。既然有存活时间的限制那如何到时间后自动清除多余的线程,结合我们以上那些推论不难发现,在阻塞队列中提供了 poll(timeout,unit) 这个方法。该方法的作用是在阻塞队列为空时,会阻塞一段时间,当阻塞一段时间后仍没有任务来,则直接返回 null。至此我们的线程池原理图补上了最后一块。
    在这里插入图片描述

源码分析

梳理了线程池的原理之后我们再来看源码思路就比较清晰了。不过再看源码之前我们要先了解一下 ThreadPoolExecutor 的成员变量

成员变量

//有些成员变量一眼就能看懂含义,有些则借助英文注释能大致看懂
//不需要全部都看懂,我们只需要了解主要的成员变量的含义即可
public class ThreadPoolExecutor extends AbstractExecutorService { 
    //存储线程池的状态和线程数量的原子变量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //表示ctl中用于存储线程数量的比特位数 Integer.SIZE - 3 = 29 
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //CAPACITY 是根据COUNT_BITS计算得到的,表示ctl中能够表示的最大线程数量
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // 英文注释翻译过来就是以下这些线程池的状态存储在高位
    // RUNNING 表示线程池处于运行状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    // SHUTDOWN 表示线程不再接收任务,但已经提交的仍然能够执行
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // STOP 表示线程池不接受新任务,并且会尝试立即停止正在执行的任务
    private static final int STOP       =  1 << COUNT_BITS;
    // TIDYING 表示所有任务都已终止
    private static final int TIDYING    =  2 << COUNT_BITS;
    //TERMINATED 表示线程池已完全终止
    private static final int TERMINATED =  3 << COUNT_BITS;

    //从ctl的值中获取线程池的状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //从ctl的值中获取线程池中的线程数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //通过rs | wc操作(或运算)整合成一个完整的ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
    //判断线程池是否是运行状态
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    
    //通过cas操作原子的增加ctl里的线程数量
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    //通过cas操作原子的减少ctl里的线程数量
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    //这里是循环调用原子减少线程数量的方法,因为cas操作存在失败的可能所以通过自旋的形式直到成功为止
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

    //这个我们就比较熟悉了——阻塞队列
    private final BlockingQueue<Runnable> workQueue;

    //重入锁
    private final ReentrantLock mainLock = new ReentrantLock();

    //存储工作线程的集合
    private final HashSet<Worker> workers = new HashSet<Worker>();

    //支持等待终止的等待条件 跟awaitTermination()方法有关
    private final Condition termination = mainLock.newCondition();
    
    //线程池在其生命周期中曾经达到过的最大线程数
    private int largestPoolSize;

    private long completedTaskCount;
    //创建线程的工厂
    private volatile ThreadFactory threadFactory;
    //拒绝策略
    private volatile RejectedExecutionHandler handler;
    //存活时间
    private volatile long keepAliveTime;
    //是否允许核心线程超时
    private volatile boolean allowCoreThreadTimeOut;
    //核心线程数
    private volatile int corePoolSize;
    //最大线程数
    private volatile int maximumPoolSize;
    //默认拒绝策略
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

    private final AccessControlContext acc;
}

在以上这些成员变量中存在不少位运算,这里还是有必要说明一下,方便后面源码部分的理解。

AtomicInteger ctl 这个变量存储的是线程池的状态和线程数量,它是采用的是高低位存储。这个小伙伴们应该不陌生之前我们在 Lock 那一章节中也设计到过高低位存储。那 ctl 存储的结构如何,我们用图形的方式来解释一下
在这里插入图片描述

结合图形和变量的位运算,我们能够得出 CAPACITY = (1 << COUNT_BITS) - 1 = 536870911 这就是低位存储所能存的最大线程数量。

线程池状态

线程池的状态是通过高3位表示的且都是通过位运算所得,所以我们来看一下它是如何表示与计算的

/**这里的运算设计到了原码、反码、补码的过程
* 1 的原码是 0000 0000 0000 0000 0000 0000 0000 0001
* -1原码则是符号位改为1
* -1的原码是 1000 0000 0000 0000 0000 0000 0000 0001 
* 接下来-1进行反码除符号位外0变为1 1变为0 
* -1的反码是 1111 1111 1111 1111 1111 1111 1111 1110
* 最后进行补码,除符号位外各位加1
* -1补码后 1111 1111 1111 1111 1111 1111 1111 1111
* 补码后再左移29位得到
* 1110 0000 0000 0000 0000 0000 0000 0000
*/
private static final int RUNNING    = -1 << COUNT_BITS;
//0的二进制还是零所以左移29位后仍然是
//0000 0000 0000 0000 0000 0000 0000 0000 0000 
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 1左移29位得到
// 0010 0000 0000 0000 0000 0000 0000 0000 0000
private static final int STOP       =  1 << COUNT_BITS;
// 2左移29位得到
// 0100 0000 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING    =  2 << COUNT_BITS;
//3左移29位得到
//0110 0000 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED =  3 << COUNT_BITS;

这里的位运算看起来比较懵,为了便于理解我们将 runStateOf(int c) 方法和 workerCountOf(int c) 方法套用具体数据模拟计算

  1. runStateOf(int c)

    这个方法是用来获取线程池的状态,假设 c= 536870947(这个数被特意选择为具有非零状态位和非零工作线程数量位的十进制数) 通过 c & ~CAPACITY 计算得到线程池状态。整个计算过程如下

    //所有操作都是 32位
    //CAPACITY 转为二进制
    0001 1111 1111 1111 1111 1111 1111 1111
    //进行非运算
    1110 0000 0000 0000 0000 0000 0000 0000
    // c的二进制为
    0010 0000 0000 0000 0000 0000 0010 0011 
    //进行 & 运算(与运算的特点是 两个操作数中位都为1,结果才为1,否则结果为0)得到
    0010 0000 0000 0000 0000 0000 0000 0000
    

    得出的计算结果就跟 STOP状态相等,所以线程池处于 STOP状态

  2. workerCountOf(int c)

    还是假设 c= 536870947 通过c & CAPACITY 计算得到线程池中线程的数量,整个计算过程如下

    //所有操作都是 32位
    //CAPACITY 转为二进制
    0001 1111 1111 1111 1111 1111 1111 1111
    // c的二进制为
    0010 0000 0000 0000 0000 0000 0010 0011 
    //进行 & 运算(与运算的特点是 两个操作数中位都为1,结果才为1,否则结果为0)得到
    0000 0000 0000 0000 0000 0000 0010 0011     
    

    得出的计算结果是二进制 100011 转化为十进制是35,所以线程池有35个线程。

现在我们已经对线程池的成员变量和线程池的状态有了初步的了解而且对它的原理也进行了一番推导。那么在分析源码之前我们不妨在猜测一下它的源码大概是怎么做的

  1. 线程池中的核心线程是延迟加载(类似懒加载,有任务就创建线程,无任务时不创建)
  2. 添加任务时调用阻塞队列的添加方法,很显然是调用的offer()方法把任务加入到阻塞队列。通过返回 true/false 来判断阻塞队列是否满了,如果返回 true 则证明阻塞队列未满当前的核心线程就能满足需求。如果返回 false 则证明阻塞队列满了需要增加工作线程(非核心线程)。
  3. 如果增加工作线程已经达到了最大线程数无法增加新的工作线程,则调用拒绝策略

接下来就正式进入源码的分析,我们就以execute()为入口开始分析

execute()

public void execute(Runnable command) {
        //不能执行空的任务,所以任务为空直接抛出异常
        if (command == null)
            throw new NullPointerException();
        //获取线程池的状态和线程数量
        int c = ctl.get();
        //workerCountOf(c)计算出线程数量如果小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            //创建一个线程并执行当前任务,创建成功直接返回
            if (addWorker(command, true))
                return;
            //如果创建失败则重新获取线程池的状态和线程数量
            c = ctl.get();
        }
        //这里根据 workerCountOf(c) < corePoolSize == false 证明已经存在核心线程了
        //判断线程池处于运行状态并且阻塞队列添加任务成功
        if (isRunning(c) && workQueue.offer(command)) {
            //再次获取线程池的信息,用于判断放入任务的过程中线程池的状态发生了变化(比如被关闭)
            int recheck = ctl.get();
            //如果线程池不是运行状态并且任务从阻塞队列中成功移除则调用拒绝策略
            if (! isRunning(recheck) && remove(command))
                //调用拒绝策略
                reject(command);
            //如果线程数量为0,则增加一个非核心线程
            //这么做是因为存在所有线程因长时间空闲而被终止的可能,所以添加一个非核心线程,即使没有任务也启动,以便于响应新任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //根据上下文判断执行到这里时证明队列已满,则尝试增加新的非核心线程
        else if (!addWorker(command, false))
            //如果失败则执行拒绝策略
            reject(command);
}

通过上述 execute() 源码的分析基本符合我们的之前的猜想,我们也能够看出核心的方法是 addWorker() 。我们继续往下分析。

addWorker()

addWorker(Runnable firstTask, boolean core) 有两个参数 firstTask 表示任务,core 则表示是否是核心线程

//addWorker 代码比较长,我们还是常规操作先看主体再拆开分析
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //part1 第一部分 判断是否需要创建工作线程
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //part2 第二部分 更新线程池中的线程数量
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
        
        //part3 创建一个工作线程并执行任务
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
}

我们先来看part1 第一部分的代码

  if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

在看这段代码时,我们要先假设全部条件成立。

  1. rs >= SHUTDOWN == true

    我们在前面的内容中解释了线程池的几个状态,满足大于等于 SHUTDOWN 的状态有 SHUTDOWN、STOP、TIDYING、TERMINATED。他们有一个共同的特点就是不会接收新任务

  2. (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) == true

    如果条件判断为 true 证明此时线程池处于 SHUTDOWN 状态,不再接收新任务 firstTask == null,阻塞队列中还有任务 ! workQueue.isEmpty() == true,说明此时线程池调用了shutdown() 方法,但队列中的任务还没有执行,需要传递一个空的任务来创建工作线程来把任务执行完。

  3. (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) == false

    如果条件判断为 false 三个条件都同时不满足,不创建工作线程。

我们再来看part2 第二部分的代码

//涉及cas操作有失败的可能,所以通过for(;;){}不断尝试
for (;;) {
    //获取线程数量
    int wc = workerCountOf(c);
    //判断 如果当线程数量 >= 最大线程数
    if (wc >= CAPACITY ||
        //或者 当前线程数量达到最大线程数或超越了核心线程数
        wc >= (core ? corePoolSize : maximumPoolSize))
        //返回 false 证明不需要创建工作线程
        return false;
    //根据前面的代码判断得出,此处表明当前需要创建工作线程
    //通过 cas 操作修改线程数量,成功则跳出循环
    if (compareAndIncrementWorkerCount(c))
        break retry;
    c = ctl.get();  // Re-read ctl
    //判断线程池状态是否发生变化如果为true跳出循环
    if (runStateOf(c) != rs)
        continue retry;
}

最后来看 part3 第三部分的代码,这段代码的作用就是去创建一个工作线程并执行任务

//用于记录工作线程是否已启动
boolean workerStarted = false;
//用于记录工作线程是否已经加入到 HashSet中
boolean workerAdded = false;
Worker w = null;
try {
    //定义一个 Worker 对象并将任务传入
    w = new Worker(firstTask);
    //从 Worker中获取线程
    final Thread t = w.thread;
    if (t != null) {
        //加锁保证线程安全性
        //这里并不是因为线程竞争而加锁,而是因为避免创建工作线程时其他线程把线程池关闭
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //获取线程池的状态
            int rs = runStateOf(ctl.get());
            //两个条件允许创建线程
            //1是线程池处于运行状态 rs < SHUTDOWN == true
            //2是当前线程是SHUTDOWN 状态并且没有新任务允许传递一个空任务保证从而创建工作线程使当前所有任务都执行完
            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {        
                //判断线程是不是存活状态,若是则抛出异常
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                //将 Worker 加入到 HashSet中
                workers.add(w);
                //获取 HashSet 的大小
                int s = workers.size();
                //记录线程池在其生命周期中曾经达到过的最大线程数
                if (s > largestPoolSize)
                    largestPoolSize = s;
                //将workerAdded改为ture表示已经加入到工作线程集合中
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            //这里就是启动线程
            t.start();
            //记录工作线程已经启动了
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        //若启动失败则把在HashSet中的当前Worker删除
        //修改当前线程池的线程数量
        addWorkerFailed(w);
}

有的小伙伴这时会有疑问,一直再强调 addWorker() 是创建一个线程并执行任务。但目前只看见了创建线程并没有见具体任务的执行。答案就在内部类 Worker 中。

Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    private static final long serialVersionUID = 6138294804551838833L;

    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); 
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    
    public void run() {
        runWorker(this);
    }
    //........代码省略...........
}

从上面的代码中我们明显的可以看出 Worker 继承了AbstractQueuedSynchronizer 并且实现了 Runnable 接口,在它的构造方法中通过getThreadFactory().newThread() 创建了一个线程。其次 AbstractQueuedSynchronizer 我们很熟悉它涉及到了独占锁和共享锁的实现。而有 Runnable 接口就必然会执行run() 方法。所以具体的执行任务的逻辑就在 runWorker() 中。

runWorker()

runWorker() 的主要作用是不断从阻塞队列中取出任务并执行

final void runWorker(Worker w) {
    //获取当前线程
    Thread wt = Thread.currentThread();
    //从Worker对象的firstTask属性中获取初始任务,并将其置为null,表示该任务已被取出
    Runnable task = w.firstTask;
    w.firstTask = null;
    //允许被中断
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //如果 firstTask 不为空则执行firstTask,为空则从阻塞队列中取任务
        //while循环就代表了不停的取任务
        while (task != null || (task = getTask()) != null) {
            //加锁,防止其他线程调用 shutdown()
            w.lock();
            //获取线程池状态和当前线程的中断状态,如果线程池状态>=stop 或者当前线程被中断但尚未标记为中断则中断 wt(当前线程)
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //钩子方法,用于任务执行的前置操作
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //任务执行
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                //任务完成赋值为null
                task = null;
                //完成任务数加累加
                w.completedTasks++;
                //释放锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
beforeExecute()

这里补充一下 beforeExecute() 和 afterExecute() ,他俩是用于执行任务的前置处理和后置处理,这个就有点像 Spring AOP里面的前置处理器和后置处理器。直接说可能不太明白。我们通过代码来演示一下(这里只演示 beforeExecute())。

public class BeforeThreadPoolExecutor extends ThreadPoolExecutor {

    public BeforeThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        System.out.println("执行前置操作");
    }
}

//使用
public class BeforeExample {
    public static void main(String[] args) {
        BeforeThreadPoolExecutor executor = new BeforeThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        for (int i = 0; i < 10; i++) {
            int taskId = i;
            executor.execute(() -> {
                System.out.println("执行任务的id: " + taskId + " 线程名称是: " + Thread.currentThread().getName());
            });
        }
        executor.shutdown();
    }
}

执行结果
在这里插入图片描述

在上述代码中,BeforeThreadPoolExecutor 类扩展了 ThreadPoolExecutor 并覆盖了 beforeExecute 方法。在每次线程准备执行任务时,beforeExecute 方法都会被调用,我们通过演示结果也能够看出。beforeExecute 可自行扩展前置操作比如日志记录、任务统计等。

再回到源码分析上来,之前我们已经分析了 runWorker() 方法并且知道了任务是如何执行的。这时候问题来了,我们在分析原理的时候提到了非核心线程通过 poll(timeout,unit) 取得任务,那具体是如何进行取任务的呢?我们来看 getTask() 方法。

getTask()

getTask() 的作用是从阻塞队列中取任务,如果取到任务则返回一个 Runnable 反之直接返回null。

private Runnable getTask() {
    //记录上一次取任务是否超时
    boolean timedOut = false; 

    for (;;) {
        int c = ctl.get();
        //获取线程池状态
        int rs = runStateOf(c);
        //如果线程池不是running状态 且如果大于等于STOP状态或阻塞队列为空则把线程数量递减并返回null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //线程池中的线程数量递减
            decrementWorkerCount();
            return null;
        }
        //获取线程池中的线程数量
        int wc = workerCountOf(c);

        //allowCoreThreadTimeOut 是对应核心线程是否超时
        //如果为false(默认),则核心线程即使在空闲时也会保持活动状态。
        //如果为true,核心线程将使用keepAliveTime超时等待
        //也就是说 timed 代表了核心线程或工作线程是否会被回收
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        // 下面这个条件判断比较多,我们先从整体上看这个判断是什么意思
        //总体来说就是如果工作线程数超过最大线程数并且队列为空则需要回收工作线程同时线程数量进行递减
        //如果 wc > maximumPoolSize == true,则当前线程数已经超过了最大线程数
        //如果 timed && timedOut== true 则 timed == true, timedOut== true
        //timed == true表示核心线程会被回收
        //timedOut== true 表示非核心线程已经超过了keepAliveTime 时间
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            //如果线程数量递减成功则直接返回null
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //这里就验证了我们之前原理的分析
            //timed就不重复解释了,如果timed == true则调用阻塞队列的poll(keepAliveTime, TimeUnit.NANOSECONDS)方法
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            //如果 r!= null则返回 r
            if (r != null)
                return r;
            //如果 r==null 则timedOut改为true,进入下一次循环
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

如果 getTask() == null,那么 runWorker() 方法中的 while(task != null || (task = getTask()) != null){} 条件不成立,则跳出循环最终会进入 finally,我们来看 processWorkerExit() 方法做了什么

processWorkerExit()
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // completedAbruptly 为true 表示线程是突然完成的可能是异常或中断,则需要修正线程池中的线程数量
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    //加锁
    mainLock.lock();
    try {
        //已完成任务数的累加
        completedTaskCount += w.completedTasks;
        //从 HashSet中移除当前退出的工作线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    //尝试终止线程池
    tryTerminate();
     
    int c = ctl.get();
    //判断线程池是否还处于RUNNING或者SHOUTDOWN状态,如果是则根据下面的条件判断是否需要增加一个线程来处理任务
    if (runStateLessThan(c, STOP)) {
        //如果completedAbruptly == false则证明没有发生异常或中断
        if (!completedAbruptly) {
            //获取最小线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //如果最小线程数为0,队列不为空设置最小线程数为1
            //这是为了确保当工作队列中有待处理的任务时,至少有一个线程来处理这些任务 
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            //检查当前线程池中的工作线程数是否已经达到或超过了最小线程数
            //如果工作线程数已经达到或超过最小线程数,那么不需要添加新的工作线程,直接返回
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        /**如果上述条件都不满足 即线程池状态允许、没有发生异常或中断、且当前工作线程数少于最小线程数,
         *则调用addWorker()来尝试添加一个新的工作线程
        */
        addWorker(null, false);
    }
}

我们再简单看一下 shutDown() 方法,shutdown() 方法这里就不详细展开了,我们主要看大体逻辑

shutDown()

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        //加锁保证在调用shutDown() 时没有其他线程修改线程池状态
        mainLock.lock();
        try {
            //检查调用线程是否有权限关闭线程池
            checkShutdownAccess();
            //将线程池的运行状态更新为SHUTDOWN。这意味着线程池不再接受新任务,但会完成已提交的任务
            advanceRunState(SHUTDOWN);
            //用于中断那些当前没有执行任务的工作线程。这些线程可能正在等待新任务或者因为其他原因处于空闲状态
            interruptIdleWorkers();
            //钩子方法,用于关闭线程池时的扩展
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        //尝试终止线程池
        tryTerminate();
}

至此线程池的主要源码就分析完了。最后我们还有一项没有固定答案的关于线程池线程数量计算的分享

线程池线程数量计算

关于线程池线程数量的计算其实并没有一个严格准确的答案只能说理论上存在,参考其他资料中有这样一个公式

公式:线程数量 = CPU的数量 * CPU期望利用率 * (1 + 等待时间 / 服务时间)

  • CPU的数量:可以通过Runtime.getRuntime().availableProcessors()获取。
  • CPU期望利用率:期望的CPU使用率,一般在50%左右。
  • 等待时间:任务等待IO完成的时间。
  • 服务时间:CPU处理任务的时间(不包含阻塞等待的时间)。

假设 CPU期望利用率 = 100% ,那么 线程数量 = CPU的数量 * (1 + 等待时间 / 服务时间)

线程池的计算公式根据要执行的任务性质分为 CPU密集型或IO密集型

  1. IO 密集型

    对于IO密集型任务,如MySQL数据库操作、文件读写、网络通信等,这类任务不会特别消耗CPU资源,但IO操作比较耗时,会占用较多时间。对于这类任务,线程数量设置多一点影响不大。建议是 线程数量 = CPU的数量 *2+1

  2. CPU 密集型

    对于CPU密集型任务,如加解密、压缩、计算等需要大量耗费CPU资源的任务,理论上线程的数量等于CPU核数就是最合适的。不过,为了实现更优的利用率,通常将线程的数量设置为CPU核数加1。即使当密集型的线程由于特殊原因导致阻塞时,这个额外的线程也能确保 CPU 周期不会被浪费,从而保证CPU的利用率。建议是 线程数量 = CPU的数量 +1

以上关于线程池线程数量的计算理论上是正确的,但并不能完全按照公式来计算,在实际项目中还是需要具体情况具体分析。

总结

本篇我们对线程池的实现和原理做了一个正向的分析,只能说对主要的代码逻辑进行了分析还有很多细节没有涉及到,感兴趣的小伙伴可自行研究。本篇我们主要关注的重点是它的设计思想,并根据它的这个设计思想代入代码中是如何实现的。介于作者水平有限,文章中若要错误之处还望批评指正

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2212689.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux进程间通信(一)——管道通信

目录 前言 1.管道实现进程间通信 ①管道的所属问题 ②匿名管道通信 ③命名管道通信 2.使用管道通信实现一个进程池 ①进程池类图 ②Channel类实现 ③ProcessPoll类实现 ④代码一览 前言 在学习Linux中的进程时&#xff0c;曾提到过进程的独立性。进程独立性的是进程与进程之间…

SpringMVC后台控制端校验-表单验证深度分析与实战优化

前言 在实战开发中&#xff0c;数据校验也是十分重要的环节之一&#xff0c;数据校验大体分为三部分&#xff1a; 前端校验后端校验数据库校验 本文讲解如何在后端控制端进行表单校验的工作 案例实现 在进行项目开发的时候,前端(jquery-validate),后端,数据库都要进行相关的数据…

【数据结构】图的最短路径

快乐的流畅&#xff1a;个人主页 个人专栏&#xff1a;《C游记》《进击的C》《Linux迷航》 远方有一堆篝火&#xff0c;在为久候之人燃烧&#xff01; 文章目录 引言一、最短路径的概念二、Dijkstra算法2.1 思想2.2 实现 三、Bellman-Ford算法3.1 思想3.2 实现 四、Floyd-Warsh…

操作教程|基于DataEase用RFM分析法分析零售交易数据

DataEase开源BI工具可以在店铺运营的数据分析及可视化方面提供非常大的帮助。同样&#xff0c;在用于客户评估的RFM&#xff08;即Recency、Frequency和Monetary的简称&#xff09;分析中&#xff0c;DataEase也可以发挥出积极的价值&#xff0c;通过数据可视化大屏的方式实时展…

液态神经网络 LNN

神经网络 (NN) 是 机器学习 模仿人脑结构和运算能力以从训练数据中识别模式的算法。 通过处理和传输信息的互连人工神经元网络&#xff0c;神经网络可以执行复杂的任务&#xff0c;例如 人脸识别, 自然语言理解&#xff0c;以及无需人工协助的预测分析。 尽管神经网络是一种强…

Mac电脑SourceTree git账号密码更改提示再次输入密码

前言&#xff1a; 最近小编git账号密码修改了&#xff0c;之前在sourceTree的git仓库在拉代码提交代码就会报错&#xff0c;提示因为密码导致的仓库连接失败。 解决方案 1.在mac电脑应用程序中搜索“钥匙串” 点击钥匙串访问 在钥匙串中选登录&#xff0c;在在右侧列表中找…

key形式和key/value形式二叉树

首先模拟一下key形式类 使用的结构是搜索二叉树 结点中有左孩子和右孩子 还有一个存储的值 template <class K>struct BSTnode//搜索二叉树不支持修改 中序遍历是有序的{K _key;BSTnode<K>* _left;BSTnode<K>* _right;BSTnode(const K& key):_key(key…

【C++】12.string类的使用

文章目录 1. 为什么学习string类&#xff1f;1.1 C语言中的字符串1.2 两个面试题(暂不做讲解) 2. 标准库中的string类2.1 string类(了解)2.2 auto和范围for 3. 查看技术文档4. string的访问5. 如何读取每个字符呢&#xff1f;6. auto语法糖&#xff08;C11&#xff09;7. 范围f…

spring boot 2.7整合Elasticsearch Java client + ingest attachment实现文档解析

一、软件环境 软件版本号备注Spring boot2.7.23.x版本建议使用ElasticSearch8.xElasticSearch7.17.4ElasticSearch 7.x 可使用JDK 8 ElasticSearch 8.x 要求使用JDK 11 二、安装ElasticSearch 下载地址&#xff1a;https://artifacts.elastic.co/downloads/elasticsearch/el…

网站建设中,虚拟主机的各项指标和参数

虚拟主机的各项指标和参数主要包括空间大小、并发连接数、带宽限制、流量限制、CPU限制、内存以及IO速度等。以下是对这些指标和参数的详细介绍&#xff1a; 空间大小&#xff1a;空间大小通常以MB或GB为单位&#xff0c;表示虚拟主机可以容纳的数据量。例如&#xff0c;一个1…

地级市-城市创业活跃度(每百人新创企业数)(2000-2021年)

城市创业活跃度通常指一个城市在一定时期内新创企业的数量和质量&#xff0c;它反映了城市的创业环境、创业者的积极性和创造力。根据中的研究&#xff0c;创业活跃度&#xff08;Entre_Activation&#xff09;作为反映区域层面创业活动积极程度的核心指标&#xff0c;被广泛用…

【Vue】Vue扫盲(二)指令:v-for 、v-if、v-else-if、v-else、v-show

【Vue】Vue扫盲&#xff08;一&#xff09;事件标签、事件修饰符&#xff1a;click.prevent click.stop click.stop.prevent、按键修饰符、及常用指令 文章目录 一、v-for遍历数组数组角标遍历对象&#xff1a;Key作用介绍 二、v-if、v-show基本用法&#xff1a;案例&#xff1…

【unity框架开发12】从零手搓unity存档存储数据持久化系统,实现对存档的创建,获取,保存,加载,删除,缓存,加密,支持多存档

文章目录 前言一、Unity对Json数据的操作方法一、JsonUtility方法二、Newtonsoft 二、持久化的数据路径三、数据加密/解密加密方法解密方法 四、条件编译指令限制仅在编辑器模式下进行加密/解密四、数据持久化管理器1、存档工具类2、一个存档数据3、存档系统数据类4、数据存档存…

【Photoshop——肤色变白——曲线】

1. 三通道曲线原理 在使用RGB曲线调整肤色时&#xff0c;你可以通过调整红、绿、蓝三个通道的曲线来实现黄皮肤到白皮肤的转变。 黄皮肤通常含有较多的红色和黄色。通过减少这些颜色的量&#xff0c;可以使肤色看起来更白。 具体步骤如下&#xff1a; 打开图像并创建曲线调…

几何完备的3D分子生成/优化扩散模型 GCDM-SBDD - 评测

GCDM 是一个新的 3D 分子生成扩散模型&#xff0c;与之前的 EDM 相比&#xff0c;GCDM 优化了其中的图神神经网络部分&#xff0c;使用手性敏感的 SE3 等变神经网络 GCPNET 代替了 EDM 中的 EGNN&#xff0c;让节点间消息传递、聚合根据手性不同而进行。本文对 GCDM-SBDD&#…

DMN决策引擎入门知识点

本文主要讲解Camunda是如何使用Dmn决策引擎&#xff0c;体验地址:www.jeecgflow.com Dmn决策表定义 Dmn在线设计 命中策略(Hit Policy) 策略名称策略描述Unique只有一个或者没有规则可以满足。决策表的结果包含满足规则的输出条目。如果超过一个规则满足&#xff0c;那么就违…

电脑知识:适用于 Windows 10 的 PDF 编辑器列表

PDF 是一种流行的、多功能且安全的文件格式&#xff0c;用于在线共享文档。但是&#xff0c;如果没有合适的应用程序&#xff0c;查看和编辑 PDF 文件可能会变得复杂。 幸运的是&#xff0c;有很多 PDF 编辑器可以帮助您更正重要文档上的错误、填写表格、为合同添加签名、更改…