线程池及源码分析

news2025/4/8 5:53:52

目录

1 java构建线程的方式

2 线程池的7个参数

3 线程池属性标识&线程池的状态

3.1 核心属性

3.2 线程池的状态

4 线程池的执行流程

5 添加工作线程的流程

6 Worker的封装&后续任务的处理


1 java构建线程的方式

一般就3~4种:

  • 继承Thread:发现Thread类已经实现了Runnable,继承Thread类时,本质也是间接的实现了Runnable
  • 实现Runnable:(本质只有Runnable这种方式)
  • 实现Callable:手动去实现话,将Callable的实现传入到FutureTask,讲FutureTask传入Thread的有参构造。发现FutureTask本身也间接的实现的Runnable接口。
  • 线程池:线程池构建工作线程时,构建的Worker对象,Worker也实现了Runnabl接口。

本质只有1种:实现Runnable接口

2 线程池的7个参数

为什么要用线程池?(玩多线程的目的是为了充分发挥硬件资源的性能。)

  • 多线程场景:接口优化(让之前串行处理的任务,现在可以并行处理加快速度。)
  • 管理线程(控制线程数量):线程也要占用内存资源,线程需要被CPU调度,如果线程数量不好好控制,反而会造成业务处理速度变慢。
  • 重复利用(重复创建问题):每次线程的构建都需要分配内存资源,用完之后,还要归还内存资源,成本很高,利用连接池的思想,构件好反复用就ok。

JDK其实提供了Executors的工具类,可以去很方便的构建一些常用的线程池。BUT,因为Executors封装好之后,很多参数无法主动去设置,不能更好的去管理线程池。

使用线程池,强烈推荐手动构建ThreadPoolExecutor。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // corePoolSize:核心线程数(核心线程默认情况下不会被干掉,一直在线程池里)
    // maximumPoolSize:最大线程数(核心线程数 + 非核心线程数)
    // keepAliveTime:非核心线程的最大空闲时间。
    // unit:最大空闲时间的时间单位
    // workQueue:首先有活来了,先让核心线程处理,核心线程处理不过来了,先堆workQueue里。实在处理不过来。临时
    构建非核心线程,来处理workQueue中的任务。
    // threadFactory:构建thread对象的一个工厂。
    // handler:拒绝策略,线程没有空闲,阻塞队列也放满了,这时,再来任务,就拒绝滴干活~
}

3 线程池属性标识&线程池的状态

3.1 核心属性

线程池中核心属性是AtomicInteger 类型的ctl(可以理解为线程安全的int类型)。ctl的高3位维护了线程池状态,低29位维护了工作线程个数

// 线程池的核心属性,ctl,
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//可以看做:private int ctl = 0;
// ctl属性标识了线程池的两个信息
// int是一个32个bit位的数值
// 其中高3位,标识线程池状态。  低29位,标识工作线程个数(工作线程的最大值:2^29-1)

// Integer.SIZE ,是获取int类型占用的位数。 Integer.SIZE = 32
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_BITS = 29;

// CAPACITY 线程池中最大的工作线程个数
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

00000000 00000000 00000000 00000001(1)
00100000 00000000 00000000 00000000(1 << 29)
00011111 11111111 11111111 11111111(1 << 29) - 1

3.2 线程池的状态

RUNNING:一切正常,任务正常处理

SHUTDOWN:不接收新任务,但是正在处理的任务要完成,阻塞队列的任务要处理完毕。

STOP:不接收新任务,中断正在处理的任务,阻塞队列的任务全部丢掉。

// 线程池的5个状态
111
private static final int RUNNING    = -1 << COUNT_BITS;
000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
001
private static final int STOP       =  1 << COUNT_BITS;
010
private static final int TIDYING    =  2 << COUNT_BITS;
011
private static final int TERMINATED =  3 << COUNT_BITS;

4 线程池的执行流程

分析执行流程,要查看execute方法。

// 提交任务到线程池的方法
public void execute(Runnable command) {
	// 非空判断,没啥说的。
    if (command == null)
        throw new NullPointerException();
    // 获取核心属性。
    int c = ctl.get();
    // workerCountOf:获取工作线程个数
    // 当前工作线程个数 < 核心线程数
    if (workerCountOf(c) < corePoolSize) {
    	// 添加一个工作线程    true:核心线程  false:非核心线程
    	// 添加工作线程会有并发问题,如果添加成功返回true,失败返回false
        if (addWorker(command, true))
        	// 如果添加成功,告辞~~~
            return;
        // 添加失败,重新获取ctl,看下什么情况
        c = ctl.get();
    }
    // isRunning:线程池状态还是RUNNING嘛?
    // 如果是RUNNING,就把任务扔到阻塞队列里,先排队等着。
    if (isRunning(c) && workQueue.offer(command)) {
    	// 再次获取ctl属性
        int recheck = ctl.get();
        // 再次确认线程池状态是否是RUNNING,如果不是RUNNING,将任务从阻塞队列移除
        if (!isRunning(recheck) && remove(command))
        	// 执行拒绝策略
            reject(command);
        // 现在阻塞队列有任务,但是没有工作线程
        else if (workerCountOf(recheck) == 0)
        	// 创建一个非核心线程,去处理阻塞队列任务
            addWorker(null, false);
    }

    // 如果任务没有扔到阻塞队列,添加非核心线程去处理任务
    else if (!addWorker(command, false))
    	// 如果添加非核心线程失败,执行拒绝策略
        reject(command);
}

5 添加工作线程的流程

添加工作线程本质就是addWorker方法的流程

private boolean addWorker(Runnable firstTask, boolean core) {

	//=====================线程池状态&工作线程个数的判断========================================
    retry:
    for (;;) {
    	// 拿ctl并且拿到高三位的值
        int c = ctl.get();
        int rs = runStateOf(c);

        // 状态大于等于SHUTDOWN(如果状态不是RUNNING,不需要添加工作线程)
        if (rs >= SHUTDOWN &&
        	// 阻塞队列有任务,没有工作线程,会addWorker(null,false);
        	// 线程池状态是SHUTDOWN,并且阻塞队列任务,没有工作线程处理
        	// 针对addWorker(null,false)的放行
            !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))

        	// 进来就无法处理,因为状态不对,直接告辞~
            return false;

        // 判断工作线程个数
        for (;;) {
        	// 获取工作线程个数
            int wc = workerCountOf(c);
            // 如果大于低29位,直接告辞。
            if (wc >= CAPACITY ||
            	// 根据core决定判断核心线程个数还是最大线程个数
                wc >= (core ? corePoolSize : maximumPoolSize))
            	// 个数不够~
                return false;
            // 基于CAS对ctl + 1  
            if (compareAndIncrementWorkerCount(c))
            	// 如果成功,直接跳出外层for循环
                break retry;
            // 到这,说明出现了并发情况,重新获取一次ctl
            c = ctl.get(); 
            // 如果状态改变了。重新走外层for循环
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    //=====================创建工作线程&启动工作线程========================================
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
    	// new一个工作线程,将任务扔进去。
        w = new Worker(firstTask);
        // 拿到工作线程的Thread
        final Thread t = w.thread;
        // Thread是ThreadFactory构建出来的,如果构建的是null,是程序员提供的ThreadFactory有问题
        if (t != null) {
        	// 加锁,因为后面要操作hashSet还有一个int类型,保证线程安全
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                // 状态是RUNNING
                if (rs < SHUTDOWN ||
                	// 这个是为了addWorker(null,false)情况的判断
                    (rs == SHUTDOWN && firstTask == null)) {
                	// 线程启动了么?避免程序员提供的ThreadFactory有问题
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    // 工作线程扔Set集合,用HashSet维护的工作线程
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize是记录线程池中工作线程数的最大值记录
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 添加成功!
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
            	// 如果添加成功,启动工作线程,并且设置workerStarted为true
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

6 Worker的封装&后续任务的处理

调用Worker的有参构造时,会基于ThreadFactory构建线程,并且将Worker对象本身作为Runnable 传入到Thread对象中,当执行Thread的start方法时,会执行Worker的run方法,最终执行是run方法内部的runWorker方法。

后续处理任务的方式~

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    boolean completedAbruptly = true;
    try {
    	// 任务的获取方式有2种
    	// 1:直接基于addWorker携带过来的任务,优先处理。
    	// 2:如果addWorker携带的任务处理完毕,或者没携带任务,直接从阻塞队列中获取
        while (task != null || (task = getTask()) != null) {
            // 处理任务!!!! 省略部分代码
            task.run();
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
private Runnable getTask() {
    for (;;) {
        try {
            Runnable r = timed ?
            	// 从阻塞队列获取任务
            	// 如果执行poll,代表是非核心线程,等待一会,没任务,直接告辞
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                // 如果执行take,代表是核心线程,死等。
                workQueue.take();
            if (r != null)
                return r;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/145678.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

迎接新年,暂且用Python绘制几个中国结吧

前言 今天就来分享几个用python绘制的图案吧 马上就要迎来新年了 就绘制了几个中国结&#xff0c;嘿嘿 话不多说&#xff0c;直接展示一下代码和效果图吧 更多学习资料与源码点击文章末尾名片领取 1. 效果图&#xff1a; 代码展示 import turtle turtle.screensize(600,…

GPDB插件安装工具之gppkg

gppkg命令gppkg是一个python3编写的打包脚本&#xff0c;在整个集群中安装.gppkg格式的Greenplum数据库扩展&#xff08;例如PL/Java、PL/R和MADlib&#xff09;及其依赖项&#xff0c;位于/usr/local/cloudberry-db/bin/gppkg(自己安装的gpdb目录)&#xff0c;安装到$GPHOME里…

1个寒假能学多少网络安全知识?

现在可以看到很多标题都声称三个月内就可以转行网络安全领域&#xff0c;并且成为月入15K的网络工程师。那么&#xff0c;这个寒假的时间能学多少网络安全知识&#xff1f;是否能入门网络安全工程师呢&#xff1f; 答案是肯定的。 虽然网络完全知识是一门广泛的学科&#xff…

ccc-sklearn-13-朴素贝叶斯(1)

朴素贝叶斯 一种直接衡量标签和特征之间概率关系的有监督学习算法&#xff0c;专注分类的算法&#xff0c;基于概率论和数理统计的贝叶斯理论。在计算的过程中&#xff0c;假设特征之间条件独立&#xff0c;不进行建模&#xff0c;采用后验估计。 sklearn中的朴素贝叶斯 类含…

1-选择题练手

1.采用递归方式对顺序表进行快速排序&#xff0c;下列关于递归次数的叙述中&#xff0c;正确的是 A.每次划分后&#xff0c;先处理较长的分区可以减少递归次数 B.递归次数与初始数据的排列次序无关 C.每次划分后&#xff0c;先处理较短的分区可以减少递归次数 D.递归次数与…

DaVinci:键 - 外部蒙版

调色页面&#xff1a;键Color&#xff1a;Key在调色页面&#xff0c;可以轻松地从媒体池将某个片段拖至节点面板中&#xff0c;以作为外部蒙版。或者&#xff0c;在节点上右击选择“添加蒙版” Add Matte。若无附加&#xff0c;则可以选择本节点片段的明度信息作为外部蒙版。当…

hbase2.x orphan regions on filesystem(region丢失)问题修复

问题描述&#xff1a;orphan regions on filesystem 可以通过主master web页面的HBCK Report查看 也可以通过hbck2工具查看 # 查看指定表 hbase hbck -j $HBASE_HOME/lib/hbase-hbck2-1.3.0-SNAPSHOT.jar addFsRegionsMissingInMeta default:tableName # 查看命名空间下所有…

Yolov5+TensorRT-生成dll-python/c++调用dll

YOlov5-6.0TensorRTdllpython/c调用简介1.项目环境2.TensorRT验证1.在tensorrtx-yolov5-v6.0\yolov5目录下新建build目录2.编写CMake.txt,根据自己目录更改2&#xff08;OpenCV_DIR&#xff09;、3&#xff08;TRT_DIR&#xff09;、10&#xff08;Dirent_INCLUDE_DIRS&#xf…

LabVIEW网络服务器何使用,有哪些不同

LabVIEW网络服务器何使用&#xff0c;有哪些不同NI有几款不同的Web服务器&#xff0c;可使用不同的产品并覆盖不同的用例。它们具有非常相似的名称&#xff0c;可以互换使用&#xff0c;但每个都提供不同的功能。应用程序Web服务器描述&#xff1a;NI应用Web服务器加载使用LabV…

企业微信商户号是什么?如何开通?

企业微信作为一款优秀的移动办公工具&#xff0c;与微信全方位打通&#xff0c;既可以与客户沟通交流&#xff0c;也可以在达成交易后直接进行对公收款&#xff0c;但是前提是要开通企业微信商户号。前言企业微信和微信都出自腾讯&#xff0c;而且企业微信全方位连接微信&#…

C#,图像二值化(16)——全局阈值的力矩保持算法(Moment-proserving Thresholding)及其源代码

1、力矩保持法 提出了一种基于矩保持原理的自动阈值选择方法。以这样的方式确定地计算阈值&#xff0c;即在输出画面中保留输入画面的时刻。实验结果表明&#xff0c;该方法可以将给定的图像阈值化为有意义的灰度级。该方法描述了全局阈值&#xff0c;但也适用于局部阈值。 A…

企业微信开发——企业内部自建应用开发(第二篇)---JS_SDK配置

企业微信如果想要使用企业微信的JS_SDK来实现拍照、定位等等功能&#xff0c;就需要预先在使用到的页面进行配置&#xff0c;当然你可以做全局配置。对于JS_SDK的配置设计前端和后端的统一配置。下面我来说明下具体的步骤。特别说明&#xff1a;1、企业微信有的接口需要配置wx.…

shader基础入门(1)

本文基于unity免费公开课“Hi Shader以及网络公开资料等书写”遵循开源协议。 MeshFilter网格过滤器 从海量资源中挑选适合的Mesh将他交给MeshRender MeshRenderer 网格渲染器 负责把MeshFilter丢过来的Mesh&#xff0c;绘制显示到我们的场景中 Material 材质球 Material…

多线程之死锁

目录&#xff1a; 1.什么是死锁&#xff1f; 2.可重入与不可重入 3.发生死锁的三个典型情况 4.发生死锁的四个必要条件 5.如何破除死锁&#xff1f; 1.什么是死锁&#xff1f; 谈到死锁&#xff0c;程序猿们都心存忌惮&#xff0c;因为程序一旦出现死锁&#xff0c;就会导…

深度学习训练营之鸟类识别

深度学习训练营之鸟类识别原文链接环境介绍前置工作设置GPU导入数据并进行查找数据处理可视化数据配置数据集残差网络的介绍构建残差网络模型训练开始编译结果可视化训练样本和测试样本预测原文链接 &#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&am…

机器学习:如何解决类别不平衡问题

类别不平衡是一个常见问题&#xff0c;其中数据集中示例的分布是倾斜的或有偏差的。 1. 简介 类别不平衡是机器学习中的一个常见问题&#xff0c;尤其是在二元分类领域。当训练数据集的类分布不均时会发生这种情况&#xff0c;从而导致训练模型存在潜在偏差。不平衡分类问题的示…

【Unity云消散】理论基础:实现SDF的8SSEDT算法

距离元旦假期已经过去5天了&#xff08;从31号算起&#xff01;&#xff09;&#xff0c;接着开始学习&#xff01; 游戏中的很多渲染效果都离不开SDF&#xff0c;那么SDF究竟是什么呢&#xff1f;到底是个怎么样的技术&#xff1f;为什么能解决那么多问题&#xff1f; 1 SD…

git介绍及环境搭建

git介绍及环境搭建Git介绍Git安装流程配置用户信息git工作流程与常用命令问题点总结主要工作流程git工作流程与原理总结Git介绍 1.Git是什么&#xff1f; Git版本控制系统是一个分布式的系统,是用来保存工程源代码历史状态(游戏存档)的命令行工具 GIT是一个命令行工具,用于版…

基于Java+Spring+vue+element社区疫情服务平台设计和实现

基于JavaSpringvueelement社区疫情服务平台设计和实现 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 超级帅帅吴 Java毕设项目精品实战案例《500套》 欢迎点赞 收藏 ⭐留言 文末获取源…

Django+channels -> websocket

Django+channels -> websocket 学习视频: https://www.bilibili.com/video/BV1J44y1p7NX/?p=10 workon # 查看虚拟环境 mkvirtualenv web -p python3.10 # 创建虚拟环境 workon web # 进入虚拟环境pip insatll django channelsdjango-admin startproject ws_demo python …