Java线程池的创建以及原理

news2025/1/11 21:44:52

一、为什么要使用线程池

在外面的日常开发中,也使用了不少池化技术,比如线程池、数据库连接池、HTTP连接池等等都是对这个思想的应用。
池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

线程池提供了一种限制和管理资源(包括执行一个任务)的方式。每个线程池还维护一些基本统计信息,例如已完成任务的数量,正常执行的任务等等。
下面说一下开发中使用线程池的好处:
1、降低资源的消耗。通过重复利用已经创建的线程降低创建和销毁造成的消耗。
2、提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
3、提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

二、如何创建线程池

一般情况下,创建线程池使用ThreadPoolExecutor的方式,而不是使用Executors的方式去创建,这样的处理方式可以让写线程池的时候明白线程池的运行规则,避免资源耗尽的风险。
Executors返回线程池对象的弊端如下:
1、FixedThreadPoolSingleThreadExecutor:允许请求的队列长度为Integer.MAX_VALUE,可能堆积大量的请求,从而导致OOM。
2、CachedThreadPoolScheduledThreadPool:允许创建的线程数量为Integer.MAX_VALUE,可能导致OOM。

方式一:通过构造方法实现

new ThreadPoolExecutor()

在这里插入图片描述
方式二:通过Executor框架的工具类Executors来实现
可以创建三种类型的ThreadPoolExecutor
FixedThreadPool:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在任务队列中,待有线程空闲时,便处理在任务队列中的任务。
SingleThreadExecutor:方法返回一个只要一个线程的线程池。若多余一个任务被提交到线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
CachedThreadPool:该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕之后,将会返回线程池进行复用。

三、核心线程数和最大线程数的关系

ThreadPoolExecutor的源代码如下:
在这里插入图片描述

	public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

ThreadPoolExecutor 3 个最重要的参数:
corePoolSize : 核心线程数定义了最小可以同时运行的线程数量。maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
ThreadPoolExecutor其他常见参数:
keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
unit : keepAliveTime 参数的时间单位。
threadFactory :executor 创建新线程的时候会用到。
handler :饱和策略(拒绝策略)。

四、线程池的饱和策略

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor 定义一些策略:
ThreadPoolExecutor.AbortPolicy: 抛出 RejectedExecutionException来拒绝新任务的处理。
ThreadPoolExecutor.CallerRunsPolicy: 调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。
ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。举个例子: Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 ThreadPoolExecutor 的构造函数源码就可以看出,比较简单的原因)

五、线程池的原理

为了搞懂线程池的原理,我们需要首先分析一下 execute方法。我们可以使用 executor.execute(worker)来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码:

// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    private static int workerCountOf(int c) {
        return c & CAPACITY;
    }
    //任务队列
    private final BlockingQueue<Runnable> workQueue;

    public void execute(Runnable command) {
        // 如果任务为null,则抛出异常。
        if (command == null)
            throw new NullPointerException();
        // ctl 中保存的线程池当前的一些状态信息
        int c = ctl.get();

        //  下面会涉及到 3 步 操作
        // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
        // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
        // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
            if (!isRunning(recheck) && remove(command))
                reject(command);
                // 如果当前线程池为空就新创建一个线程并执行。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
        else if (!addWorker(command, false))
            reject(command);
    }

addWorker 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。

  	// 全局锁,并发操作必备
    private final ReentrantLock mainLock = new ReentrantLock();
    // 跟踪线程池的最大大小,只有在持有全局锁mainLock的前提下才能访问此集合
    private int largestPoolSize;
    // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
    private final HashSet<Worker> workers = new HashSet<>();
    //获取线程池状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //判断线程池的状态是否为 Running
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }


    /**
     * 添加新的工作线程到线程池
     * @param firstTask 要执行
     * @param core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小
     * @return 添加成功就返回true否则返回false
     */
   private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //这两句用来获取线程池的状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
               //获取线程池中工作的线程的数量
                int wc = workerCountOf(c);
                // core参数为false的话表明队列也满了,线程池大小变为 maximumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               //原子操作将workcount的数量加1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 如果线程的状态改变了就再次执行上述操作
                c = ctl.get();
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 标记工作线程是否启动成功
        boolean workerStarted = false;
        // 标记工作线程是否创建成功
        boolean workerAdded = false;
        Worker w = null;
        try {

            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
              // 加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                   //获取线程池状态
                    int rs = runStateOf(ctl.get());
                   //rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中
                  //(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker
                   // firstTask == null证明只新建线程而不执行任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                       //更新当前工作线程的最大容量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                      // 工作线程是否启动成功
                        workerAdded = true;
                    }
                } finally {
                    // 释放锁
                    mainLock.unlock();
                }
                 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
                if (workerAdded) {
                    t.start();
                  /// 标记线程启动成功
                    workerStarted = true;
                }
            }
        } finally {
           // 线程启动失败,需要从工作线程中移除对应的Worker
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

六、设定线程池的大小

线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。
对于多线程这个场景来说主要是增加了上下文切换成本。不清楚什么是上下文切换的话,可以看我下面的介绍。
上下文切换:多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。
类比于现实世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的! CPU 根本没有得到充分利用。但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。
有一个简单并且适用面比较广的公式:
CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
如何判断是 CPU 密集任务还是 IO 密集任务:
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/357217.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

centos7系统-kubeadm安装k8s集群(v1.26版本)亲测有效,解决各种坑可供参考

文章目录硬件要求可省略的步骤配置虚拟机ip设置阿里镜像源各服务器初始化配置配置主节点的主机名称配置从节点的主机名称配置各节点的Host文件关闭各节点的防火墙关闭selinux永久禁用各节点的交换分区同步各节点的时间将桥接的IPv4流量传递到iptables的链&#xff08;三台都执行…

PHP面向对象01:面向对象基础

PHP面向对象01&#xff1a;面向对象基础一、关键字说明二、技术实现1. 定义类2. 类成员三、 访问修饰限定符1. public2. protected3. private4. 空修饰限定符四、类内部对象五、构造和析构1. 构造方法2. 析构方法六、范围解析操作符1. 访问类常量2. 静态成员3. self关键字七、类…

自动驾驶:时钟同步

文章目录 一、自动驾驶时间同步简介二、时间同步需要的服务1、PTP1.1 ptp4l三、UTC转换UNIX时间戳(timestamp)一、自动驾驶时间同步简介 二、时间同步需要的服务 1、PTP ptp4l -i mgbe3_0 -f /etc/automotive-slave.cfg & phc2sys -s mgbe3_0 -O 0

IDEA插件 RestfulTool插件——Restful服务开发辅助工具集

IDEA插件 RestfulTool插件——Restful服务开发辅助工具集 目录IDEA插件 RestfulTool插件——Restful服务开发辅助工具集1.插件介绍2.安装方式3.使用方法1.插件介绍 RestfulTool插件。一套 Restful 服务开发辅助工具集&#xff1a; 提供了一个 Services tree 的显示窗口 双击 …

Linux C/C++ 多线程TCP/UDP服务器 (监控系统状态)

Linux环境中实现并发TCP/IP服务器。多线程在解决方案中提供了并发性。由于并发性&#xff0c;它允许多个客户端同时连接到服务器并与服务器交互。 Linux多线程编程概述 许多应用程序同时处理多项杂务。服务器应用程序处理并发客户端&#xff1b;交互式应用程序通常在处理后台…

80211无线网络架构

无线网络架构物理组件BSS&#xff08;Basic Service Set&#xff09;基本服务集BSSID&#xff08;BSS Identification&#xff09;ssid&#xff08;Service Set Identification&#xff09;ESS&#xff08;Extended Service Set&#xff09;扩展服务集物理组件 无线网络包含四…

【C++学习】基础语法(三)

众所周知C语言是面向过程的编程语言&#xff0c;关注的是过程&#xff1b;解决问题前&#xff0c;需要分析求解的步骤&#xff0c;然后编辑函数逐步解决问题。C是基于面向对象的&#xff0c;关注的是对象&#xff0c;将一件事拆分成不同的对象&#xff0c;不同对象间交互解决问…

C++:类与对象

文章目录一.面向过程和面向对象的初步认识二.类1.类的初步认识2.类的定义3.类的访问限定符4.类的作用域5.类的实例化6.类对象模型三.this指针1.什么是this指针2.this指针的特性3.this指针的空指针问题四.浅谈封装五.类的默认成员函数1.构造函数1.1构造函数的概念1.2构造函数的用…

jenkins 安装 -适用于在线安装 后续写个离线安装的

jenkins安装1.下载jenkins2.安装启动3.附件卸载jdk的命令4.配置jenkins一、在jenkins配置文件中配置jdk环境变量二、修改jenkins默认的操作用户1.下载jenkins jenkins官网下载 https://www.jenkins.io/ 点击下载 我是centos系统所以选择centos&#xff0c;点击后按着官方提供…

golang简单实现chatgpt网页聊天

效果如图&#xff1a; 安装openai的sdk&#xff1a; go get github.com/sashabaranov/go-gpt3go代码&#xff1a; main.go package mainimport ("fmt""net/http""os"gogpt "github.com/sashabaranov/go-gpt3" )var client gogpt.N…

高可用 - 02 Keepalived_VRRP工作原理

文章目录Keepalived VS HeartbeatKeepalived的用途VRRP与工作原理物理路由器和虚拟路由器Keepalived VS Heartbeat Keepalived是Linux下一个轻量级的高可用解决方案&#xff0c;它与Heartbeat、RoseHA实现的功能类似&#xff0c;都可以实现服务或者网络的高可用&#xff0c;但…

SmS-Activate一款好用的短信验证码接收工具

前言 有些国外应用在使用应用上的功能时需要注册账号&#xff0c;由于某种不可抗因素&#xff0c;我们的手机号一般不支持注册&#xff0c;接收不到信息验证码&#xff0c;于是我们可以使用SmS-Activate提供的服务&#xff0c;使用$实现我们的需求&#xff08;大概一次验证1-5…

Python练习系统

用python给自己做个练习系统刷题吧&#xff01; #免费源码在文末公众号哈# 选择题 def xuanze():global flag2if flag21:def insert():numvar1.get()questionvar2.get()choicevar3.get()answervar4.get()with open(d:\\练习系统\\练习三3.1.pickle,rb) as file:lst1pickle.lo…

golang入门笔记——内存管理和编译器优化

静态分析 静态分析&#xff1a;不执行程序代码&#xff0c;推导程序的行为&#xff0c;分析程序的性质 控制流&#xff08;control flow&#xff09;&#xff1a;程序的执行流程 数据流&#xff08;data flow&#xff09;&#xff1a;数据在控制流上的传递 通过分析控制流和…

网络安全应急响应服务方案怎么写?包含哪些阶段?一文带你了解!

文章目录一、服务范围及流程1.1 服务范围1.2 服务流程及内容二、准备阶段2.1 负责人准备内容2.2 技术人员准备内容&#xff08;一&#xff09;服务需求界定&#xff08;二&#xff09;主机和网络设备安全初始化快照和备份2.3市场人员准备内容&#xff08;1&#xff09;预防和预…

全网最新的软件测试/自动化测试必问的面试题合集

1.你为什么选择软件测试行业因为之前有了解软件测试这个行业&#xff0c;觉得他的发展前景很好。2.根据你以前的工作经验描述一下软件开发、测试过程&#xff0c;由那些角色负责&#xff0c;你做什么要有架构师、开发经理、测试经理、程序员、测试员。我在里面主要是负责所分到…

曙光超算平台 如何使用 Tensorboard (乌镇中心)

在E-Shell中执行 1. salloc -p 队列名 -N 1 -n 32 --gresdcu:4 salloc -p wzhdtest -N 1 -n 32 --gresdcu:4 2. ssh 节点名 ssh g01r3n07 3. conda deactivate 4. 查看当前所有模块 module ava compiler 从上图中加载某个dtk模块 module load compiler/dtk/21.10 注&…

第九章 vue 进阶篇 Element Plus 基本使用

Element Plus 基本使用 element-ui 是基于vue 开发的一套ui组件库&#xff0c;提供丰富的网页开发组件&#xff0c;可用快速开发网站&#xff0c;降低前端开发成本 版本 element目前有两个版本 element-ui&#xff1a;基于vue2 element-plus: 基于vue3官网地址 https://elem…

JVM知识体系学习七:了解JVM常用命令行参数、GC日志详解、调优三大方面(JVM规划和预调优、优化JVM环境、JVM运行出现的各种问题)

文章目录前言一、了解JVM常用命令行参数1、命令行参数概述2、常用命令3、通过案例学命令行参数(Linux)4、区分概念二、GC日志详解1、打印详细日志2、日志描述3、解析案例三、调优前的基础概念四、调优是什么&#xff1f;1、调优1&#xff1a;JVM规划和预调优a、涨知识时刻b、概…

解決 torch 無法使用GPU

1.使用 import torch torch.cuda.is_available() ------> False print(torch.version) --> 查詢 torch 版本 2.命令行&#xff0c;輸入 nvidia-smi 3.直接去網站找出相對應版本 https://download.pytorch.org/whl/torch_stable.html cuda : 11.7 -> cu117 python3.…