【并发编程】线程池的原理和源码分析

news2025/1/7 18:42:55

线程使用上可能的问题

我们一般通过new Thread().start();来创建和运行一个线程,如果在业务过程中有大量场景需要使用多线程来并发,那么就会有以下问题

  1. 需要频繁的创建和销毁线程 ,需要消耗CPU资源
  2. 如果创建和销毁的线程的数量过多(大于CPU核数),那么线程之间需要不断的进行上下文切换,会消耗CPU资源

所以我们需要对线程做一个复用

池化技术的核心: 复用
eg. 连接池、对象池、内存池、线程池 。

线程池的思想

提前创建一系列的线程,保存在这个线程池中。
有任务要执行的时候,从线程池中取出线程来执行。
没有任务的时候,将线程放回到线程池中去。

Java中提供的线程池

Executors

  • newFixedThreadPool 创建固定线程数量的线程池
 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • newSingleThreadExecutor 只有一个线程的线程池
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  • newCachedThreadPool 可以缓存的线程池 ->理论上来说,有多少请求,该线程池就可以创建多少个线程来处理。
 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • newScheduledThreadPool 提供了可以按照周期执行的线程池. ->Timer
    ThreadPoolExecutor
 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

线程池的基本参数

 public ThreadPoolExecutor(int corePoolSize, //核心线程数
                              int maximumPoolSize, //最大线程数
                              long keepAliveTime, //存活时间
                              TimeUnit unit, //存活单位
                              BlockingQueue<Runnable> workQueue, //阻塞队列
                              ThreadFactory threadFactory, //线程工厂,用来创建工作线程的。 默认实现DefaultThreadFactory,默认创建的都是非守护线程(会在newThread将守护线程也改为非守护线程),如果我们想自定义线程池中线程的名字,可以自己传参
                                      RejectedExecutionHandler handler) { //拒绝执行策略 。默认实现-报错
        if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }

守护线程区别于用户线程,用户线程即我们手动创建的线程,而守护线程是程序运行的时候在后台提供一种通用服务的线程

守护线程和非守护线程的区别在于当主线程销毁停止时,守护线程会一起销毁。
垃圾回收线程就是典型的守护线程。

守护线程并不属于程序中不可或缺的部分。我们可以理解守护线程的存在就是为了维护用户线程,当所有的非守护线程(用户线程) 结束时,守护线程也就没有存在的必要了,所以程序会终止,同时会杀死进程中的所有守护线程。

源码分析

ctl 设计

ctl用来记录线程池的状态和当前线程池的线程数,高3位用来记录线程池状态,低29为用来记录线程数;通过位运算,分别可以获取到线程池的状态和当前线程数

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;// == 29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    //初始值为 CAPACITY  000 1   1111 1111 1111 1111 1111 1111 1111
    


    // runState is stored in the high-order bits, 所以这里的status都会左移29位,就是为了存储在高位
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

在这里插入图片描述

execute

线程池中的核心线程是延迟初始化的。

  1. 如果当前工作线程数小于核心线程数,则先初始化核心线程并执行任务。
  2. 否则将task放入阻塞队列进去。(offer() -> true/false)
    1. 如果true ,说明当前的请求量不大(等待的线程不多), 核心线程就可以搞定。
    2. false,增加工作线程(非核心线程)
      1. 如果添加失败,说明当前的工作线程数量达到了最大线程数,直接调用拒绝策略。
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //c一个标记,可以获取线程数和当前标志位
        int c = ctl.get();
        //判断当前工作线程数是否小于核心线程数(延迟初始化)
        if (workerCountOf(c) < corePoolSize) {
            //小于核心线程数则再创建一个新的工作线程,并将commend作为新线程的第一个执行任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //执行到这里说明,工作线程>=核心线程数或者前面addWork失败
        //判断当前线程池是否处于Running状态并添加任务到任务队列,由于isRunning和offer非原子,所以后面还要在check一次状态
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //再次check ctl状态,非Running状态则从任务队列移除任务-->执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果当前工作线程数为0,则创建一个新的工作线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果队列已满不能添加,那么再创建一个工作线程(扩容的线程,非核心线程)
        else if (!addWorker(command, false))
            reject(command);
    }

addWork

addWorker 主要干两个事情:
1.通过CAS增加线程数,也就是更新ctl (自旋)
2.初始化新的工作线程并启动
3.启动失败则回滚

  private boolean addWorker(Runnable firstTask, boolean core) {
        //这里的retry是一个标志位,可以认为是给retry下一行的循环起的一个名字
        //若直接使用break那跳出的就是本次循环,使用break retry就是跳出retry标识下的循环
        retry:
        //自旋,增加线程数量, 通过CAS来保证增加线程操作在并发下的安全性
        for (;;) {
            //(1)
            int c = ctl.get();
            //获取线程池的当前状态
            int rs = runStateOf(c);
            // 如果线程池处于>=SHUTDOWN状态(SHUTDOWN,STOP,TIDYING或者TERMINATED)状态,如果线程池正处于SHUTDOWN状态,则不再接收新任务,但是会继续处理队列中的任务不会马上关闭
            // case1: firstTask != null 表示要添加新的任务 -> 失败 不能再增加新任务
            // case2: firstTask == null且workQueue为空 表示队列中的任务都已经处理完毕了 -> 返回false -> addWorker
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;
            //自旋
            for (;;) {
                int wc = workerCountOf(c);
                //如果当前线程已经超过核心线程数/最大线程数则返回false
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //通过CAS增加线程数量,成功则跳出整个循环,执行(2)之后的代码
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //判断运行状态是否发生了改变,如果变化了,则重新执行循环
                if (runStateOf(c) != rs)
                    //结束本次循环,从(1)开始重新执行循环
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //(2)
        boolean workerStarted = false;
        boolean workerAdded = false;
        ThreadPoolExecutor.Worker w = null;
        try {
            //Worker extends AbstractQueuedSynchronizer implements Runnable,  创建新的工作线程
            w = new ThreadPoolExecutor.Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                //加锁,保证后续操作的线程安全性
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    //如果线程池是否处于可以正常处理任务的状态 1)运行状态 2)没有新任务的SHUTDOWN状态(即SHUTDOWN以后还在消费工作队列里的任务)
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        //如果在启动前,新线程就处于alive状态,抛出异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        //当前线程数大于最大线程数,则更新
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //调用线程的start方法,启动线程,执行worker.run()
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //线程启动失败则回滚, 从work容器移除启动失败的线程并且ctl需要-1
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

runWoker

    final void runWorker(ThreadPoolExecutor.Worker w) {
        Thread wt = Thread.currentThread();
        //获取当前线程要执行的第一个任务(一般是触发新增这个工作线程的任务)
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //当task不为空,或者可以从任务队列里获取到非空任务,就会一直执行(while循环保证当前线程不结束,直到task为null)
            while (task != null || (task = getTask()) != null) {
                //加锁,防止线程池SHUNDOWN,导致正在运行中的任务中断。如果其他地方要shutdown().必须要等我执行完成才可以
                // Worker继承了AQS -> 实现了互斥锁
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 如果线程池状态为 STOP 及之上的状态,中断线程任务
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //空实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //调用task的run方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //空实现
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 根据completedAbruptly决定当前线程执行结束的时候是否需要补充新的Worker
            processWorkerExit(w, completedAbruptly);
        }
    }
  private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            //获取当前线程池的状态
            int rs = runStateOf(c);

            //如果线程池已经处于STOP状态或者处于SHUTDOWN状态且任务队列已空,直接返回null. 并且更新ctl的workCount(-1)
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                //这里返回null,Worker线程的run方法也就执行结束了
                return null;
            }
            //获取当前工作线程数
            int wc = workerCountOf(c);

            //如果超过时间没有获取到任务,是否允许回收当前线程
            // 1)allowCoreThreadTimeOut 为true
            // 2)当前的工作线程数量大于核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //同时满足以下两种情形,可以销毁当前线程
            //condition 1.当前线程数大于最大线程数或者在可以超时控制的情形下从阻塞队列里获取任务发生了超时
            //condition 2.当前线程数>1 或者 任务队列为空
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                //通过CAS减少工作线程数 -1
                if (compareAndDecrementWorkerCount(c))
                    //销毁当前线程
                    return null;
                continue;
            }

            try {
                //如果任务队列里没有任务,当前工作线程会阻塞在这里
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                //执行到这里表示超过keepAliveTime时间,没有获取到新任务
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
 private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
        //如果当前线程是由于意外突然结束的,那么需要更新ctl的workCount
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        //当前线程池处于STOP之前的状态:RUNNING/SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            //正常退出则判断当前是否需要补充一个新的线程
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //非正常退出则补充一个新的非核心线程的工作线程
            addWorker(null, false);
        }
    }

线程池数量

对于IO密集型的应用,因为CPU的利用率不高,
所以我们的线程数可以设置的大一点,为CPU的N倍(具体根据业务代码调试,N>=5都可以)

对于CPU密集型的应用,由于CPU的利用率已经很高了,
设置过大的线程数会导致 频繁进行上下文切换,消耗CPU,此时线程数不建议设置的太高
(N为2~3应该就足够了)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/436867.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CMOS图像传感器——从传感器冗余说起

在这先抛出一个概念,什么是成像圈?众所周知,相机的镜头近似于圆柱体,光线透过圆筒子投射出的大都是圆形。我们可以拿一个镜头演示一下,当这个圆圈投在传感器所在焦平面时,我们称之为像场。像场的边界我们称之为成像圈,成像圈是圆的,但是传感器是矩形,天圆地方的怎么放…

Lombok插件下载与离线安装

Lombok插件下载与离线安装 首先你既然搜要离线安装或下载&#xff0c;那么肯定也是在IDEA工具里面&#xff0c;无法搜索到&#xff0c;或者自动下载安装失败吧&#xff1f; 安装包下载地址 记得和 idea版本一样&#xff0c; 如果不知道啥版本看下面

CleanMyMac X4.15重大更新 新功能菜单发布

CleanMyMac&#xff0c;一款电脑清理软件&#xff0c;可以帮助你清理垃圾文件、优化系统性能、管理应用程序等。它就像你的电脑管家&#xff0c;让你的电脑始终保持最佳状态。无论是手机还是电脑&#xff0c;在使用一段时间之后都可能会发生卡顿的现象&#xff0c;很多小伙伴会…

C++ 高级数据结构————[ 单调栈 ]

每周一篇的算法文章来了 今天讲解的是高级数据结构中的——单调栈 单调栈&#xff0c;顾名思义&#xff0c;就是升级版的栈&#xff08;&#xff09; 先回顾一下栈把 栈&#xff0c;是一种线性表&#xff0c;它的特点是只能从一边进出&#xff0c;并且先进后出&#xff0c;后进…

Windows入门篇一之MSDN手册的使用和第一个窗口程序

Windows入门篇之MSDN手册的使用和第一个窗口程序 MSDN手册MSDN手册是什么MSDN手册的下载和安装MSDN手册的使用 第一个窗口程序项目的创建第一个简单的窗口程序 MSDN手册 MSDN手册是什么 MSDN手册是VS中的一个帮助手册&#xff0c;帮助初学者学习Windows编程&#xff0c;来查找…

opencv实现卡尔曼滤波

卡尔曼滤波是一种递归的估计&#xff0c;即只要获知上一时刻状态的估计值以及当前状态的观测值就可以计算出当前状态的估计值&#xff0c;因此不需要记录观测或者估计的历史信息。 卡尔曼滤波器分为两个阶段&#xff1a;预测与更新。在预测阶段&#xff0c;滤波器使用上一状态…

VScode配置远端服务器深度学习项目

前提准备已安装VScode。 1.安装插件Remote Development 安装完成后左侧就多了远程资源管理器图标&#xff1a; 1.点击远程资源管理器。 2.点击小齿轮&#xff08;配置&#xff09;。 3.选择config配置文件&#xff0c;如果没有自己按照相似路径新建config文件后重复1、2、3步骤…

组合总和III

组合总和III 题目 力扣题目链接:https://leetcode.cn/problems/combination-sum-iii/ 代码 class Solution {public:vector<vector<int

小航助学答题系统编程等级考试scratch一级真题2023年3月(含题库答题软件账号)

青少年编程等级考试scratch真题答题考试系统请点击 电子学会-全国青少年编程等级考试真题Scratch一级&#xff08;2019年3月&#xff09;在线答题_程序猿下山的博客-CSDN博客_小航答题助手 1.下列说法不正确的是&#xff1f;&#xff08; &#xff09; A.可以从声音库中随机…

【JSP学习笔记】7.JSP 过滤器

JSP 过滤器 JSP 和 Servlet 中的过滤器都是 Java 类。 过滤器可以动态地拦截请求和响应&#xff0c;以变换或使用包含在请求或响应中的信息。 可以将一个或多个过滤器附加到一个 Servlet 或一组 Servlet。过滤器也可以附加到 JavaServer Pages (JSP) 文件和 HTML 页面。 过…

20230419 | 704.二分查找、27.移除元素

1、数组基础理论 int a[m][n]; 数组长度表示&#xff1a;a[0].length 数组宽度表示&#xff1a;a.length 2、704.二分查找 特征&#xff1a;数组是升序的找某个数&#xff0c;那就使用二分法。时间复杂度O(log n)&#xff0c;空间复杂度O(1) 我使用左闭右闭区间 计算中点&…

22、原理解析

文章目录 1、Profile功能1、application-profile功能2、Profile条件装配功能3、profile分组 2、外部化配置1、外部配置源2、配置文件查找位置3、配置文件加载顺序&#xff1a;4、指定环境优先&#xff0c;外部优先&#xff0c;后面的可以覆盖前面的同名配置项 3、自定义starter…

P600旗舰视觉款正式发布,重新定义视觉追踪与精准定位!

P600旗舰视觉款无人机是一款准行业级无人机&#xff0c;搭载RTK定位系统&#xff0c;定位精度可达厘米级&#xff0c;飞行路径更精准、姿态更稳定&#xff1b;机身搭载Allspark机载计算机&#xff0c;算力可达21TOPS&#xff0c;可运行大部分主流算法&#xff1b;配置G1三轴吊舱…

共模电感是如何抑制共模信号的

这是一个共模电感&#xff0c;外观它和我们常用的电感最大的区别就是共模电感有四个引脚&#xff0c;共模电感的磁芯上绕着两组线圈&#xff0c;这两个线圈匝数和材料都是一样的。 共模电感最主要的作用就是能抑制共模信号&#xff0c;一般用在电源或信号的EMI电路中。 首先来…

【ROS实操3服务调用添加乌龟数量】

需求描述 编码实现向turtlesim 发送请求&#xff0c;在乌龟显示节点的窗体指定位置生成一乌龟&#xff0c;这是一个服务请求操作。 实现分析 1.首先&#xff0c;需要启动乌龟显示节点。 2.要通过ROS命令&#xff0c;来获取乌龟生成服务的服务名称以及服务消息类型。 3.编写服…

C++基础入门——语法详解篇(上)

文章目录 一、什么是 C 呢&#xff1f; 二、为什么要学 C 呢&#xff1f; 三、C 基础语法 3、1 C 关键字 3、2 命名空间 3、2、1 为什么要引入命名空间 3、2、2 命名空间的定义 3、2、3 命名空间的使用 3、3 C的输入和输出 3、4 函数重载 3、4、1 函数重载的概念 3、4、2 C支持…

【WAF】雷池安装及使用体验

文章目录 前言一、雷池介绍二、安装及使用1.下载链接2.下载3. 安装waf测试 前言 长亭一直是我比较喜欢的一家公司&#xff0c;像业界比较出名的扫描器xray还有rad等很多工具都是他们开发的&#xff0c;使用起来非常的nice&#xff0c;联动其他安全工具可以实现自动漏洞挖掘&am…

掌玩科技×OceanBase:HTAP实时数据分析,降低80%存储成本

欢迎访问 OceanBase 官网获取更多信息&#xff1a;https://www.oceanbase.com/ 近日&#xff0c;新兴游戏公司海南掌玩网络科技有限公司&#xff08;以下简称“掌玩科技”&#xff09;正式牵手原生分布式数据库 OceanBase&#xff0c;其投放系统、用户分析系统、数据系统、运营…

beef-xss浏览器劫持

beef-xss浏览器劫持 一&#xff0c;实验拓扑图二&#xff0c;租用一台阿里云&#xff0c;搭建docker环境和beef环境1.租一台阿里云服务器&#xff0c;系统选用ubuntu&#xff0c;计时收费的那种&#xff0c;一个小时几毛钱2.开启策略组3000端口&#xff0c;5000端口4.安装docke…

wait/notify使用详解

1. 使用注意事项 wait/notify(All)可用于线程间(线程数量>3)通信 永远在synchronized方法或对象里使用wait/notify(All),不然JVM报java.lang.IllegalMonitorStateException 永远在while循环里使用wait&#xff0c;防止其他原因改变先前判断条件 永远在线程间共享对象(生产…