线程池源码解析项目中如何配置线程池

news2025/1/9 15:04:17

目录

基础回顾

线程池执行任务流程

简单使用

构造函数

execute方法

execute中出现的ctl属性

execute中出现的addWorker方法

addWorker中出现的addWorkerFailed方法

addWorker中出现的Worker类

Worker类中run方法出现的runWorker方法

runWorker中出现的getTask

runWorker中出现的processWorkerExit

项目中如何配置使用线程池

参考


基础回顾

线程池基础不好的还是要先了解线程池大体知识,不要眼高手低❌

ThreadPoolExecutor线程池有关_明天一定.的博客-CSDN博客

我都忘记了我要写线程池源码相关文章了,填个多年前的坑➿

线程池执行任务流程

线程池新增线程过程

简单使用

看源码,当然要先会简单使用:

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        executor.execute(() -> {
            System.out.println("线程池执行");
        });
    }

我们创建了一个ThreadPoolExecutor对象,然后调用execute方法

构造函数

源码中共有4中构造方法

 最终都是调用最后一个构造方法,其他构造都是给出了默认配置,比如默认线程工厂,默认拒绝策略。所以我们只看参数最长的构造

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

 可以看出来有两步,第一步判断参数是否合理,第二步给属性赋值。重要字段具体含义不再赘述,不懂的看文章开头那篇文章。比较生疏的是acc这个属性,他是一个三元表达式去赋值,判断检查操作是否有权限执行,如果有权限则拿到权限控制的上下文,源码中属性注释也说了,该属性用来执行finalizer。

execute方法

这个方法分三步:

  1. 如果运行线程比核心线程数少
  2. 如果任务可以成功的放入队列
  3. 如果任务放入队列失败,我们就新增线程,如果失败则执行拒绝策略或是因为线程池关闭了
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();// 获取当前工作线程数和线程池运行状态
        if (workerCountOf(c) < corePoolSize) { // 判断工作线程是否比核心线程少
            if (addWorker(command, true)) // 新增worker
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { // 线程池状态是否是running,是的话往阻塞队列加任务
            int recheck = ctl.get(); // 双重检查,因为从上次检查到进入此方法,线程池可能改变状态
            if (! isRunning(recheck) && remove(command)) // 如果当前线程池状态不是RUNNING则从队列删除任务
                reject(command); // 拒绝策略触发
            else if (workerCountOf(recheck) == 0) // wc如果是0时,则新增worker执行queue的任务
                addWorker(null, false);
        }
        else if (!addWorker(command, false))// 阻塞队列已满才会走的逻辑
            reject(command);
    }

execute中出现的ctl属性

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

 可以看出来ctl的类型是原子整数,初始值是状态或上工作线程数。cti共同记录了运行状态和工作线程数。ctl的组成前三位是状态,后29位是表示线程数。

善用位运算可以节省空间(复合值),而在这里我认为是想可以保证状态和数量的统一变化。

顺便补一下线程池的状态和生命周期的转换:

 

execute中出现的addWorker方法

该方法主要分为两步:

  1. cas自旋新增线程
  2. 创建线程实例并且执行任务
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 线程池状态不是running;线程池状态为SHUTDOWN,且要执行的任务不为空;线程池状态为SHUTDOWN,且任务队列为空;都返回失败
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 工作线程数>=线程池容量 || 工作线程数>=(核心线程数||最大线程数)
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c)) // 线程新增自旋成功
                    break retry; // 退出外层循环
                c = ctl.get();  // Re-read ctl
                // 重新获取状态和之前状态对比,若一样则内层循坏,否则外循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false; // 工作线程调用start()方法标志
        boolean workerAdded = false; // 工作线程被添加标志
        Worker w = null;
        try {
            w = new Worker(firstTask); // 创建工作线程实例
            final Thread t = w.thread; // 获取工作线程持有的线程实例
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    // 线程池状态为RUNNING或者(线程池状态为SHUTDOWN并且没有新任务时)
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // 预检查该线程状态
                            throw new IllegalThreadStateException();
                        workers.add(w); // 线程加入到存放工作线程的HashSet容器,workers全局唯一并被mainLock持有。方便索引出线程
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;// 工作线程被添加标志置为true
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start(); // 调用该线程执行任务 
                    workerStarted = true; // 工作线程调用start()方法标志置为true
                }
            }
        } finally {
            if (! workerStarted)  // 如果线程启动失败
                addWorkerFailed(w);
        }
        return workerStarted;
    }

 流程辅助查看

 

addWorker中出现的addWorkerFailed方法

回滚工作线程的创建

private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);// 从hash表移除
            decrementWorkerCount(); // ctl减1
            tryTerminate(); // 尝试把线程池状态变为Terminate
        } finally {
            mainLock.unlock();
        }
    }

addWorker中出现的Worker类

它是ThreadPoolExecutor的内部类,继承了AQS实现了Runnable接口,我们主要关注它的run方法

public void run() {
    runWorker(this);
}

Worker类中run方法出现的runWorker方法

真正执行任务的方法,通过getTask从队列拿任务

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask; // 获取工作线程中用来执行任务的线程实例
        w.firstTask = null;
        w.unlock(); // 允许中断
        boolean completedAbruptly = true; // 线程意外终止标志
        try {
            // 如果当前任务不为空,则直接执行;否则调用getTask()从任务队列中取出一个任务执行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 如果状态值大于等于STOP且当前线程还没有被中断,则主动中断线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task); // 任务执行前的回调,空实现,可以在子类中自定义
                    Throwable thrown = null;
                    try {
                        task.run(); // 执行线程任务
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown); // 任务执行后的回调,空实现,可以在子类中自定义
                    }
                } finally {
                    task = null; // 将循环变量task设置为null,表示已处理完成
                    w.completedTasks++; // 当前已完成的任务数+1
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly); // 工作线程退出
        }
    }

runWorker流程图如下

runWorker中出现的getTask

根据配置,对任务进行阻塞或超时等待。

    private Runnable getTask() {
        boolean timedOut = false; // // 通过timeOut变量表示拿出的线程是否超时了

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 检查线程池状态以及阻塞队列的大小
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 当前线程是否允许超时销毁的标志
            // 允许超时销毁:当线程池允许核心线程超时 或 工作线程数>核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 如果(当前线程数大于最大线程数 或 (允许超时销毁 且 当前发生了空闲时间超时))
            // 且(当前线程数大于1 或 阻塞队列为空)
            // 则减少worker计数并返回null
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
               // 根据线程是否允许超时判断用poll还是take(会阻塞)方法从任务队列头部取出一个任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

runWorker中出现的processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // 如果中断,调整减少worker的数量
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w); // 从工作线程集合中移除该工作线程
        } finally {
            mainLock.unlock();
        }

        tryTerminate(); // 尝试把线程状态变为terminate

        int c = ctl.get();
        // 如果是RUNNING 或 SHUTDOWN状态
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                // 如果允许核心线程超时则最小线程数是0,否则最小线程数等于核心线程数
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果阻塞队列非空,则至少要有一个线程继续执行剩下的任务
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false); // 重新创建一个worker来代替被销毁的线程
        }
    }

项目中如何配置使用线程池

用过线程池的都知道,那些核心参数是真的不好确定,需要做大量修改、发布、验证这套工作。市面上常说的是分io密集型和cpu密集型,但是具体参数不是那么好确定的,比如线程池设置为 2*CPU 核心数,有点像是把任务都当做 IO 密集型去处理了。而且一个项目里面一般来说不止一个自定义线程池吧?比如有专门处理数据上送的线程池,有专门处理查询请求的线程池,这样去做一个简单的线程隔离。但是如果都用这样的参数配置的话,显然是不合理的。

所以我们需要一个可动态配置的线程池,可以自己写一个模块,也可以用已经开源的dynamic-tp或者hippo4j。具体实现还是利用ThreadPoolExecutor中的一些set方法

关于队列的动态调整:美团他们有一个名字为 ResizableCapacityLinkedBlockIngQueue 的队列:很明显,这是一个自定义队列了。我们也可以按照这个思路自定义一个队列,让其可以对 Capacity 参数进行修改即可。把 LinkedBlockingQueue 粘贴一份出来,修改个名字,然后把 Capacity 参数的 final 修饰符去掉,并提供其对应的 get/set 方法。然后在程序里面把原来的队列换掉即可。

不过比较好的是开源项目自带监控告警和配置文件配置会比较全面一点。具体配置还是得做大量修改、发布、验证。不过也可以从网上常说的理论值入手去尝试。

参考

【超详细】Java线程池源码解析 - 掘金 (juejin.cn) 

Java线程池实现原理及其在美团业务中的实践 - 美团技术团队 (meituan.com)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/352438.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Websocket详细介绍

需求背景 在某个资产平台&#xff0c;在不了解需求的情况下&#xff0c;我突然接到了一个任务&#xff0c;让我做某个页面窗口的即时通讯&#xff0c;想到了用websocket技术&#xff0c;我从来没用过&#xff0c;被迫接受了这个任务&#xff0c;我带着浓烈的兴趣&#xff0c;就…

MinIO

概述MinIO 是一个基于Apache License v2.0开源协议的对象存储服务。它兼容亚马逊S3云存储服务接口&#xff0c;非常适合于存储大容量非结构化的数据&#xff0c;例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等&#xff0c;而一个对象文件可以是任意大小&#xff0c;从…

Redis学习【7】之发布_订阅命令和事务

文章目录一 发布/订阅命令1.1 消息系统1.2 subscribe1.3 psubscribe1.4 publish1.5 unsubscribe1.6 punsubscribe1.7 pubsub1.7.1 pubsub channels1.7.2 pubsub numsub1.7.3 pubsub numpat二 Redis 事务2.1 Redis 事务特性Redis 事务实现2.1.1 三个命令2.1.2 基本使用2.2. Redi…

家用洗地机哪款质量好?洗地机排行榜

伴随着人们消费水平和生活品质的提升&#xff0c;人们对家庭中的需求也随之提高&#xff0c;洗地机凭借着吸拖洗为一体的功能深受大众喜爱&#xff0c;但是市面上洗地机产品越来越多&#xff0c;清洁效果也参差不齐&#xff0c;到底哪款洗地机质量好呢&#xff0c;跟随笔者脚步…

FILE文件操作

文件指针 每个被使用的文件都在内存中开辟了一个相应的文件信息区&#xff0c;用来存放文件的相关信息&#xff08;如文件的名 字&#xff0c;文件状态及文件当前的位置等&#xff09;。这些信息是保存在一个结构体变量中的。该结构体类型是有系统 声明的&#xff0c;取名FILE…

ecaozzz

2. 图形报表ECharts 2.1 ECharts简介 ECharts缩写来自Enterprise Charts&#xff0c;商业级数据图表&#xff0c;是百度的一个开源的使用JavaScript实现的数据可视化工具&#xff0c;可以流畅的运行在 PC 和移动设备上&#xff0c;兼容当前绝大部分浏览器&#xff08;IE8/9/10/…

面试完阿里,字节,腾讯的测试岗,复盘以及面试总结

前段时间由于某些原因辞职了&#xff0c;最近一直在面试。面试这段时间&#xff0c;经历过不同业务类型的公司&#xff08;电商、酒店出行、金融、新能源、银行&#xff09;&#xff0c;也遇到了很多不同类型的面试官。 参加完三家大厂的面试聊聊我对面试的一些看法&#xff0…

AWS攻略——子网

文章目录分配子网给Public子网分配互联网网关创建互联网网关附加到VPC给Public子网创建路由表关联子网打通Public子网和互联网网关创建Public子网下的EC2进行测试配置Private子网路由给Private子网创建路由表附加在Private子网创建Private子网下的EC2进行测试创建实例在跳板机上…

Mybatis 之useGeneratedKeys注意点

一.例子 Order.javapublic class Order {private Long id;private String serial; }orderMapper.xml<?xml version"1.0" encoding"UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org/DTD Mapper 3.0" "http://mybatis.org/dtd…

java学习--多线程

多线程 了解多线程 ​ 多线程是指从软件或者硬件上实现多个线程并发执行的技术。 ​ 具有多线程能力的计算机因有硬件支持而能够在同一时间执行多个线程&#xff0c;提升性能。 并发和并行 并行&#xff1a;在同一时刻&#xff0c;有多个指令在CPU上同时执行并发&#xff1…

20230217使AIO-3399J开发板上跑通Android11系统

20230217使AIO-3399J开发板上跑通Android11系统 2023/2/17 15:45 1、解压缩SDK&#xff1a;rk3399-android-11-r20211216.tar.xzrootrootrootroot-X99-Turbo:~$ tar xvf rk3399-android-11-r20211216.tar.xz 2、编译U-boot&#xff1a; rootrootrootroot-X99-Turbo:~/rk3399-a…

没有接口文档的怎样进行接口测试

前言&#xff1a; 在进行接口测试之前&#xff0c;一般开发会提供接口文档&#xff0c;给出一些接口参数和必要熟悉&#xff0c;便于我们编写接口脚本。但如果没有提供接口开发文档的请求下&#xff0c;我们该如何编写接口测试脚本呢&#xff1f;在编写测试脚本前要做哪些必要…

一台电脑安装26个操作系统(windows,macos,linux)

首先看看安装了哪些操作系统1-4: windows系统 四个5.Ubuntu6.deepin7.UOS家庭版8.fydeOS9.macOS10.银河麒麟11.红旗OS12.openSUSE Leap13.openAnolis14.openEuler(未安装桌面UI)15.中标麒麟&#xff08;NeoKylin&#xff09;16.centos17.debian Edu18.fedora19.oraclelinux20.R…

CCNP350-401学习笔记(1-50题)

1、Which function does a fabric edge node perform in an SD-Access deployment?A. Connects endpoints to the fabric and forwards their traffic. B. Encapsulates end-user data traffic into LISP. C. Connects the SD-Access fabric to another fabric or external La…

YOLOv5:GitHub两万八Star项目

来源&#xff1a;投稿 作者&#xff1a;王同学 编辑&#xff1a;学姐 Yolov5详解 官方源码仓库&#xff1a;https://github.com/ultralytics/yolov5 相关论文&#xff1a;未发表&#xff08;改进点都被你们抢先发了&#xff09; 0 前言 截止到2022年7月&#xff0c;Yolov5项…

docker目录迁移流程

概述 在安装测试最新版本的HOMER7的过程中&#xff0c;docker作为基础工具碰到一些问题&#xff0c;针对问题进行总结。 docker的默认工作目录在/var目录&#xff0c;而在我们的环境中&#xff0c;/var目录空间预留不足&#xff0c;随着docker的运行&#xff0c;/var目录空间…

WiFi网络带宽、流量监控管理

当您的组织拥有越来越多的有线和无线设备时&#xff0c;有必要在预算、性能和安全性之间取得准确的平衡。尽管无线设备可以为用户提供灵活性&#xff0c;但鉴于其动态性质&#xff0c;发现和管理这些设备可能极具挑战性。 为用户提供功能性无线网络性能&#xff0c;同时确保没…

Spire.Office 8.2.2 for NET 开年之喜

Spire.Office for .NET对文档的操作包括打开&#xff0c;创建&#xff0c;修改&#xff0c;转换&#xff0c;打印&#xff0c;浏览 Word、Excel、PowerPoint 和 PDF 文档&#xff0c;以及将数据从数据源导出为常用的文档格式&#xff0c;如&#xff1a;Word&#xff0c;Excel&a…

每天10个前端小知识 【Day 16】

&#x1f469; 个人主页&#xff1a;不爱吃糖的程序媛 &#x1f64b;‍♂️ 作者简介&#xff1a;前端领域新星创作者、CSDN内容合伙人&#xff0c;专注于前端各领域技术&#xff0c;成长的路上共同学习共同进步&#xff0c;一起加油呀&#xff01; ✨系列专栏&#xff1a;前端…

【老王读Spring Transaction-6】spring-tx与DataSource连接池整合的原理

06spring-tx与DataSource连接池的原理前言Spring 版本正文事物连接: Spring-managed transactional Connectionsspring-tx 与 DataSource 的整合SpringBoot 默认使用的 DataSource 连接池是 Hikari事物连接的关闭/释放: 并不是真正的关闭小结前言 javax.sql.DataSource 是 jav…