Java基础系列(八)——线程池详解

news2024/11/24 22:28:18

目录

线程池详解

什么是线程池?为什么要用线程池?

如何创建线程池

ThreadPoolExecutor类分析

ThreadPoolExecutor 饱和策略

简单的线程池Demo

ThreadPoolExecutor线程池新增线程流程


线程池详解

什么是线程池?为什么要用线程池?

处理高并发场景或者是经常连接数据库的小伙伴应该经常听到或者用过“池”,那么什么是线程池呢?为什么要用线程池呢?

举个简单的例子:

从前,有个店铺,每天的客户都很少,所以,不需要一个常驻的收银员,都是来一个客户找个临时工充当收银员,结束后临时工就被辞退,每次都是招聘与解雇循环往复。当客户量比较少的时候还可以处理的过来(单线程模式),过几年之后,店铺越做越大,客户越来越多,这种模式就已经处理不过来增长的用户量了(高并发场景)。此时,常驻收银员就出现了,而且可能一个常驻还不够,需要好几个。这个时候我们就可以将这一堆的常驻收银员看成是一个"池",每来一个客户就处理业务,没有客户就进行等待,省去了重复招聘与解雇的时间了。

池化技术的思想主要是为了减少每次获取资源的消耗提升对资源的利用率。线程池提供了一种限制和管理资源(包括执行一个任务)的方式。每一个线程池还维护一些基本统计信息,例如已完成任务的数量。

在《Java并发编程的艺术》提到的使用线程池的好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

如何创建线程池

在《阿里巴巴Java开发手册》中提到强制线程池中不允许使用Executor去创建,而是通过ThreadPoolExecutor的方式,这样可以更加灵活去使用线程池,规避资源耗尽的风险。

Executors 返回线程池对象的弊端如下:

  • FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM。
  • CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

方法1:使用ThreadPoolExecutor的构造方法实现

 后面会对于这个类进行详细分析。

方法2:通过Executor框架的工具类Executors来实现

我们可以创建三种类型的 ThreadPoolExecutor:

  • FixedThreadPool : 该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor: 方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool: 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

直接看看源码。

 可以看源码发现,Executor框架底层都是使用ThreadPoolExecutor进行实现,并且确实如《阿里巴巴Java开发手册》所说,默认的值都是给到了integer的最大值,这样相当于埋下雷,指不定哪天就OOM了……

ThreadPoolExecutor类分析

ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么)。

 参数分析:

ThreadPoolExecutor 3 个最重要的参数:

  • corePoolSize : 核心线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
    • ArrayBlockingQueue:基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。

    • LinkedBlockingQuene:基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。

    • SynchronousQuene:一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。

    • PriorityBlockingQueue:具有优先级的无界阻塞队列,优先级通过参数Comparator实现。

ThreadPoolExecutor其他常见参数:

  1. keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
  2. unit : keepAliveTime 参数的时间单位。
  3. threadFactory :创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等。
  4. handler :饱和策略。关于饱和策略下面单独介绍一下。

ThreadPoolExecutor 饱和策略

ThreadPoolExecutor 饱和策略定义:

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor 定义一些策略:

  • ThreadPoolExecutor.AbortPolicy 抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy 调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy 不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy 此策略将丢弃最早的未处理的任务请求。

举个例子: Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 ThreadPoolExecutor 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了)

简单的线程池Demo

创建一个Runnable接口的实现类

import java.util.Date;

/**
 * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。
 */
public class MyRunnable implements Runnable {

    private String command;

    public MyRunnable(String s) {
        this.command = s;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
        processCommand();
        System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
    }

    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return this.command;
    }
}

使用推荐的ThreadPoolExecutor构造函数自定义参数的方式创建线程池。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;
    public static void main(String[] args) {

        //使用阿里巴巴推荐的创建线程池的方式
        //通过ThreadPoolExecutor构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            //创建 MyRunnable 对象(MyRunnable 类实现了Runnable 接口)
            Runnable worker = new MyRunnable("" + i);
            //执行Runnable
            executor.execute(worker);
        }
        //终止线程池
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}

参数:

  1. corePoolSize: 核心线程数为 5。
  2. maximumPoolSize :最大线程数 10
  3. keepAliveTime : 等待时间为 1L。
  4. unit: 等待时间的单位为 TimeUnit.SECONDS。
  5. workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;
  6. handler:饱和策略为 CallerRunsPolicy

output:

pool-1-thread-3 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-3 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:47 CST 2020

分析

承接 4.6 节,我们通过代码输出结果可以看出:线程池首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。

现在,我们就分析上面的输出内容来简单分析一下线程池原理。

为了搞懂线程池的原理,我们需要首先分析一下 execute方法。 在Demo 中我们使用 executor.execute(worker)来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码:

// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int workerCountOf(int c) {
    return c & CAPACITY;
}

private final BlockingQueue<Runnable> workQueue;

public void execute(Runnable command) {
    // 如果任务为null,则抛出异常。
    if (command == null)
        throw new NullPointerException();
    // ctl 中保存的线程池当前的一些状态信息
    int c = ctl.get();

    //  下面会涉及到 3 步 操作
    // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
    // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
    // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
        if (!isRunning(recheck) && remove(command))
            reject(command);
            // 如果当前线程池为空就新创建一个线程并执行。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
    //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
    else if (!addWorker(command, false))
        reject(command);
}

流程如图:

流程:

我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的5个任务中如果有任务被执行完了,线程池就会去拿新的任务执行。

ThreadPoolExecutor线程池新增线程流程

从注释可以看到这个HashSet是存放所有的工作线程的容器,也就是线程池最核心的容器。我们看到ThreadPoolExecutor的构造函数中并没有对workers进行添加操作。只是对于变量进行了一个赋值操作,也就是说在ThreadPoolExecutor被new出来后workers容器里面是空的,所以线程池创建时内部可用线程数为0。那什么时候才创建了线程放在线程池中?我们知道提交任务无非两种方式execute和submit。

这里简单说下执行execute和submit的区别:

  1. execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
  2. submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
  3. 线程池针对不同的提交方式会抛出堆栈异常,execute方法会抛出异常而submit则不会。出现异常不会影响其他线程任务的执行,最后该异常线程会被清理,线程池会重新添加一个新线程作为补充。

源码展示下:

execute在ThreadPoolExecutor中

其中三个if的判断逻辑:

  • 如果运行的线程少于corePoolSize,尝试以给定的命令作为第一个启动新线程

  • 如果一个任务可以成功地排队,那么我们仍然需要再次检查是否应该添加线程,还是应该在进入此方法后关闭池。

  • 如果不能对任务进行排队,则尝试添加一个新线程。

submit在抽象类AbstractExecutorService中

 其实可以看到,submit的底层还是调用的是execute方法。

 回归正题,新增线程的过程。

可以看看debug的过程

其实,就是调用了addWorker方法的时候,如果成功就会在之前的hashSet中插入一个work,可以看看下面addWorker源码。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/4729.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Word2Vec原理以及实战详解

文章目录前言0、序言(词嵌入介绍)一、Word2vec详解。二、CBOW 和 Skip-Gram详解。2-1、CBOW模型:&#xff08;已知周围词预测中心词&#xff09;2-2、Skip-Gram模型&#xff08;已知中心词预测周围词&#xff09;2-3、词嵌入的缺点三、Word2vec实战&#xff08;使用Gensim包&am…

[SQL]视图和权限

有时让用户看到整个基本表是不合适的&#xff0c;出于安全考虑&#xff0c;可能需要向用户隐藏特定的数据。如在instructor关系中隐藏工资值&#xff1a; select ID,name,dept_name from instructor;创建更符合特定用户直觉的个人化的关系集合&#xff0c;如学生成绩表由学生关…

一文学会JavaScript计时事件

文章目录JavaScript 计时事件setInterval() 方法clearInterval() 方法setTimeout() 方法clearTimeout() 方法JavaScript 计时事件 通过使用 JavaScript&#xff0c;我们有能力做到在一个设定的时间间隔之后来执行代码&#xff0c;而不是在函数被调用后立即执行。我们称之为计时…

Linux--基础命令

一、Linux指令 mkdir&#xff08;Make Directory&#xff09;&#xff1a;在当前路径下新建一个目录 -p --parents 可以是一个路径名称。此时若路径中的某些目录尚不存在,加上此选项后,系统将自动建立好那些尚不存在的目录,即一次可以建立多个目录; ls&#xff08;List…

Day796.什么是线程数据的强、弱一致性 -Java 性能调优实战

什么是线程数据的强、弱一致性 Hi&#xff0c;我是阿昌&#xff0c;今天学习记录的是关于什么是线程数据的强、弱一致性。 一致性&#xff0c;其实在系统的很多地方都存在数据一致性的相关问题。 除了在并发编程中保证共享变量数据的一致性之外&#xff0c;还有数据库的 ACI…

Autosar基本概念详细介绍

Autosar的由来 在汽车创新应用不断涌现的推动下&#xff0c;当代汽车电子电气&#xff08;E/E—Electronic/Electrical&#xff09;架构已经非常复杂&#xff0c;需要有创新的技术突破才能有效地进行管理&#xff0c;满足日益增长的乘客需求和法律要求。这个需求对汽车制造商及…

记一次简单的HTTP绕WAF

0X01 基础知识 关于WAF的问题&#xff1f; 它是我们日常攻防演练必会遇见的&#xff0c;在IOS七层模型中&#xff0c;WAF分为网络层、应用层的&#xff0c;当然还有云 WAF&#xff08;CDNWAF&#xff09;这新型类场景的。不同环境下我们绕过WAF的思路也是有所区别的&#xff…

mybatis单框架通用mapper使用(二)

mybatis单框架通用mapper使用(二) 1 查询 1.1 简单查询 1.1.1 查多条 a 用法 接口引用.select(实体类对象引用); //里面实体类对象,里面不为null值的部分就会作为条件被查询,多个条件使用and进行拼接起来 //传入为null就是查询全部的值b 测试代码 Testpublic void t2(){Sq…

直播带货系统源码,居家“神器”不出门就能购物

如今&#xff0c;直播带货的火爆程度已经超出了人们的想象。线下销售行业的卖家也开启了直播带货模式&#xff0c;直播带货的的火爆归根到底还是消费者的购物方式发生的变化。从传统的线下购物到电商购物&#xff0c;再到今天的直接直播带货。从各大数据可以总结&#xff0c;消…

应用密码学期末速通复习

文章目录模运算分数求模负数求模gcd最大公约数逆元分组密码DES加密AES加密操作模式ECB电子密码本CBC分组链接CFB密码反馈OFB输出反馈序列密码A5-1算法RC4算法Hash函数Md5算法SHA-1算法消息认证数字信封公钥密码背包公钥算法RSA公钥算法Rabin公钥算法ElGamal公钥算法ECC公钥算法…

C/C++刷题DAY2

1.第一题 27. 移除元素 - 力扣&#xff08;LeetCode&#xff09; 分析&#xff1a;对于此题&#xff0c;我们使用双指针的方式去写它&#xff0c;需要注意空间复杂度是O&#xff08;1&#xff09;&#xff0c;时间复杂度也是尽量的越低越好&#xff0c;要去需要原地修改数组&a…

Matplotlib设置刻度和刻度标签

Matplotlib在我们之前的所有例子中都自动接管了轴上间隔点的任务。Matplotlib的默认刻度定位器和格式化器在很多常见情况下通常都足够了。可以明确提及刻度线的位置和标签以满足特定要求。 xticks()和yticks()函数将列表对象作为参数。列表中的元素表示将显示刻度的相应操作的位…

nginx学习:配置文件详解,负载均衡三种算法学习,上接nginx实操篇

文章目录前言一、对上一篇博文的分析二、配置文件分析1. nginx 官方网址&#xff08;很详细&#xff09;2、配置文件&#xff08;全&#xff09;3、配置文件&#xff08;去掉注释&#xff09;4、讲解a、nginx 配置文件有三部分组成b、全局块c、events块d、http块5、http块中loc…

无脚本自动化测试

在当今的企业环境中&#xff0c;软件测试不再被视为不必要的投资&#xff1b;相反&#xff0c;它已经上升到一种需要而不是奢侈品的水平。随着市场的不断变化和竞争的加剧&#xff0c;企业必须做一些让他们与竞争对手区分开来的事情。 为了使自己与众不同&#xff0c;公司必须…

Dockerfile

Dockerfile指令集 对于Dockerfiel而言&#xff0c;是在学习docker工具里面&#xff0c;最重点的内容&#xff0c;它可以帮助我们生成自己想要的基础镜像。部署一个容器最重要的就是镜像&#xff0c;指令都已经内置好了。 FROM 这个镜像的妈妈是谁&#xff1f;&a…

数据结构每日亿题(四)

复制带随机指针的链表 原题传送门&#xff1a;力扣 题目&#xff1a; 这题的大概意思就是&#xff1a; 有这样一个链表&#xff0c;他比普通的链表多一个成员变量&#xff1a;random指针&#xff0c;这个random指针指向的是这个链表中随机一个地方&#xff0c;这个地方是其它节…

NetSim网络仿真使用及静态路由配置

&#x1f370; 个人主页:__Aurora__ &#x1f35e;如果文章有什么需要改进的地方还请各位大佬指正。 &#x1f349;如果我的文章对你有帮助➡️ 关注&#x1f64f;&#x1f3fb; 点赞&#x1f44d; 收藏⭐️ NetSim网络仿真使用及静态路由配置。 实验要求及其步骤 使用Boson N…

Java面试笔记:Java线程安全的集合类有哪些?线程不安全的集合类有哪些?ArrayList为什么会发生并发修改异常?

一、Java的集合类有哪些&#xff1f; 二、如何定义集合是线程不安全的&#xff1f; 当多个并发同时对线程不安全的集合进行增删改的时候会破坏这些集合的数据完整性&#xff0c;例如&#xff1a;当多个线程访问同一个集合或Map时&#xff0c;如果有超过一个线程修改了A…

3A企业认定有哪些好处?

企业参与申报和认证有什么益处&#xff1f;这个问题可能应该是广大企业参与前最为关心的问题之一了 1、可快速有效提升企业资质、获得国家政府的认可&#xff1b;并将获得由商务部颁发的具有统一编号的牌匾和证书。 2、是企业履约能力、投标信誉、综合实力与竞争力的体现&…

数学建模--优化类模型

目录 一、根据目标函数约束条件类型分类 1、线性规划 ①线性规划模型的一般形式 ​②用MATLAB优化工具箱解线性规划 ③模型分析 2、非线性规划 ①非线性规划的基本概念 ②非线性规划的基本解法 ③二次规划 ④一般非线性规划 二、控制变量类型分类 1、整数规划 …