ThreadPoolExecutor 线程池源码学习

news2024/9/23 21:19:02

ThreadPoolExecutor 线程池源码学习

在这里插入图片描述

1.阅读源码

1.ThreadPoolExecutor.execute

   public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // ctl 高三位记录线程状态。低29位记录线程池中线程数
        int c = ctl.get();
        //位运算获取工作线程数 如果少于核心线程数,继续创建核心线程
        if (workerCountOf(c) < corePoolSize) {
        		//当前线程池添加线程任务  两个参数的意义放后面在看
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果当前线程池状态为 RUNNING 状态,将当前线程任务添加到等待队列中。调用的offer方法,队满返回false
        if (isRunning(c) && workQueue.offer(command)) {
        		//再次获取ctl
            int recheck = ctl.get();
            //如果当前线程池状态已经发送变更 不在是 RUNNING 状态 ,则调用remove方法删除刚刚添加到队列中的线程任务
            if (! isRunning(recheck) && remove(command))
            		//调用拒绝策略
                reject(command);
            //如果当前活跃线程数等于0
            else if (workerCountOf(recheck) == 0)
            		//当前线程池添加线程任务
                addWorker(null, false);
        }
        //添加非核心线程
        else if (!addWorker(command, false))
            reject(command);
    }

2. ThreadPoolExecutor.addWorker

 private boolean addWorker(Runnable firstTask, boolean core) {
 				//标记外部循环
        retry:
        for (;;) {
            int c = ctl.get();
            //获取线程池状态
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            //线程池状态检查
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
						
						//内部循环
            for (;;) {
            		//获取活跃线程数
                int wc = workerCountOf(c);
                //如果大于最大上限 或 大于 核心线程数
                if (wc >= CAPACITY ||
                		// core -> addworker方法的第二个参数
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    // 添加失败返回
                    return false;
                //将当前线程池活跃线程数加一
                if (compareAndIncrementWorkerCount(c))
                		//跳出外部循环
                    break retry;
                //如果活跃线程数修改失败  再次检查线程池状态
                c = ctl.get();  // Re-read ctl
                //发生变化
                if (runStateOf(c) != rs)
                //结束本次外部循环,进入下一次循环,直到修改成功或返回false
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

				//结束循环到此
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
        		//创建新的线程任务
            w = new Worker(firstTask);
            //从worker中获取当前线程
            final Thread t = w.thread;
            //如果不为null
            if (t != null) {
            		//上锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    //获取线程池状态
                    int rs = runStateOf(ctl.get());
										//如果线程池状态是Running 或 (SHUTDOWN,但是任务为null)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //当前线程是活跃的(新建线程不会被启动)
                        if (t.isAlive()) // precheck that t is startable
                        		//抛出异常
                            throw new IllegalThreadStateException();
                        //添加到workers容器中
                        workers.add(w);
                        //获取当前线程池中所有的worker数量
                        int s = workers.size();
                        //记录历史最大线程数	
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                		//解锁
                    mainLock.unlock();
                }
                if (workerAdded) {
                		//启动当前线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

3.Worker.run

public void run() {
            runWorker(this);
        }	
 final void runWorker(Worker w) {
 				//当前线程
        Thread wt = Thread.currentThread();
        //线程任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        		//如果当前worker包装的task不为null,执行当前task 如果为null,则调用getTask()从阻塞队列中获取线程任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                		//线程任务执行的前置方法  默认空方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    		//真正执行线程任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
        		//线程任务的后置处理方法
            processWorkerExit(w, completedAbruptly);
        }
    }

4.ThreadPoolExecutor.getTask

private Runnable getTask() {
				//是否超时
        boolean timedOut = false; 

        for (;;) {
            int c = ctl.get();
            //当前线程池状态
            int rs = runStateOf(c);

           
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

						//当前线程池中活跃的线程数量
            int wc = workerCountOf(c);

            // allowCoreThreadTimeOut 是否需要进行超时控制 默认为false
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
						
						//compareAndDecrementWorkerCount 是将活跃线程数减一
						//满足以下情况将执行 :
						// 1.活跃线程大于最大线程数 或者 上次从队列中获取超时
						// 2.活跃线程大于1,但等待队列是空的
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                //将活跃线程数减一
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
            		//上次从等待队列中获取是否超时? poll 队列为空时超时等待 : take 队列为空时一直等待
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                		//获取成功返回线程任务
                    return r;
                //获取失败标记超时
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

5.ThreadPoolExecutor.processWorkerExit

 private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            //从Set集合中移除已执行run方法的 worker,让jvm进行回收
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
        		//是否抛出异常
            if (!completedAbruptly) {
            		//线程替换逻辑判断
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //woker异常补偿
            addWorker(null, false);
        }
    }

2. 源码分析

1.核心线程和非核心线程的区别

//添加核心线程任务 command 为线程任务
addWorker(command, true)

//添加非核心线程任务 command 为线程任务
addWorker(command, false)

创建时机

 //线程池中活跃线程数少于核心线程数则调用addWorker(command, true)创建核心线程
 if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
        			....
        }
if (isRunning(c) && workQueue.offer(command)) {

// workqueue调用offer方法返回false则会调用addWorker(command, false),尝试创建非核心线程
}else if (!addWorker(command, false))


}

消亡时机 java.util.concurrent.ThreadPoolExecutor#getTask

  for (;;) {
  	boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  	
  	 if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            
    
      try {
          Runnable r = timed ?
              workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
              workQueue.take();
          if (r != null)
              return r;
          timedOut = true;
      } catch (InterruptedException retry) {
          timedOut = false;
      }
  	
  	}

当创建的是核心线程时,如果没有设置allowCoreThreadTimeOut的值true(默认值为false),则将调用queue.take方法,一直阻塞,直到队列中有任务。如果是临时线程,则调用queue.poll方法,在规定时间内未从队列中获取到线程任务,在将活跃线程数量减去一后直接返回null。只要不返回null,将会在java.util.concurrent.ThreadPoolExecutor#runWorker 一直循环获取任务并执行,而一旦超时返回null,将跳出runWorker方法,调用processWorkerExit 对临时线程所属的worker进行回收。

runWorker:

 while (task != null || (task = getTask()) != null){

		getTask:
			for (;;) {
			
			
			}

}

2.线程任务抛出异常怎么办

阅读源码部分我们看的是调用线程的execute方法,java.util.concurrent.ThreadPoolExecutor#runWorker 抛出异常后,completedAbruptly为true,java.util.concurrent.ThreadPoolExecutor#processWorkerExit

boolean completedAbruptly = true;
try {

	  try {
	  		task.run();
	  } catch (RuntimeException x) {
              thrown = x; throw x;
          } catch (Error x) {
              thrown = x; throw x;
          } catch (Throwable x) {
              thrown = x; throw new Error(x);
          } finally {
              afterExecute(task, thrown);
          }


		 completedAbruptly = false;

} finally {
            processWorkerExit(w, completedAbruptly);
        }
  try {
            completedTaskCount += w.completedTasks;
       				//移除老的worker
       				workers.remove(w);
        } finally {
            mainLock.unlock();
        }
 
 if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //添加一个新的worker。注意参数一task为null,将直接去队列中获取线程任务
            addWorker(null, false);
        }

代码演示

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                10,
                10,3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(100),
                new DefaultThreadFactory("main"),
                new ThreadPoolExecutor.DiscardPolicy()
        );

        for (int i = 0; i < 2; i++) {
            threadPoolExecutor.execute(()->{
                System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
                throw new RuntimeException();
            });
        }
        
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }


        for (int i1 = 0; i1 < 2; i1++) {
            threadPoolExecutor.execute(()-> {
                System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
            });
        }

输出结果. 3,4线程由于1,2抛出异常,已经被创建

Thread.currentThread().getName() = main-1-2
Thread.currentThread().getName() = main-1-1
Exception in thread "main-1-1" Exception in thread "main-1-2" java.lang.RuntimeException
	at org.example.Main.lambda$main$0(Main.java:36)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
	at java.lang.Thread.run(Thread.java:750)
java.lang.RuntimeException
	at org.example.Main.lambda$main$0(Main.java:36)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
	at java.lang.Thread.run(Thread.java:750)
Thread.currentThread().getName() = main-1-5
Thread.currentThread().getName() = main-1-6

所以当使用executor执行线程任务时,要避免抛出异常,对可能出现的异常try{}catch{}包裹处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/735322.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

wireshark抓包实践

目录 ifconfig ( network interfaces configuring )tcpdump 命令tcpdump&wireshark例子 ifconfig ( network interfaces configuring ) eth0表示网卡UP代表网卡开启状态RUNNING代表网卡的网线被接上mtu1500: MTU&#xff08;最大传输单元&#xff09;是指在网络中传输数据时…

【javaEE面试题(五)在JMM(Java Memory Model (Java 内存模型))下谈volatile的作用】

volatile的作用 JMM下volatile作用 volatile 能保证内存可见性 volatile 修饰的变量, 能够保证 “内存可见性”. 代码在写入 volatile 修饰的变量的时候 改变线程工作内存中volatile变量副本的值将改变后的副本的值从工作内存刷新到主内存 代码在读取 volatile 修饰的变量的时…

B067-基础环境-抽取Basegit

目录 抽取base抽取domain和querymapper接口抽取service抽取 Git优点&#xff1a;Git安装及操作Git Bash命令行操作图形化客户端TortoiseGit操作Git集成Idea操作idea会把workspace作为本地仓库gitee操作idea解决代码冲突 抽取base 抽取domain和query domain&#xff1a;所有实体…

Nodejs 依赖包的存放路径设置(按其他博客修改路径后,安装路径仍在C盘的解决办法)

Nodejs 依赖包的存放路径设置 使用命令npm root -g 查看依赖包的安装位置 默认依赖包的安装位置是在C盘。为了防止C盘存太多东西&#xff0c;我这里已经将安装位置改到了D盘&#xff0c;下面就记录下修改的步骤。 1. 创建新的依赖包安装目录 在 nodejs 的安装目录下创建两个新…

8年资深测试总结,性能测试+性能优化(详细)进军高级测试...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 性能优化常见概念…

LabVIEW FPGA利用响应式数字电子板快速开发空间应用程序

LabVIEW FPGA利用响应式数字电子板快速开发空间应用程序 与传统的基于文本的语言相比&#xff0c;LabVIEW的编程和设计已被证明可以缩短开发时间。各种研究表明&#xff0c;生产率的提高在3到10倍之间。LabVIEW通过图形语言、集成开发环境和多个编译器的组合来实现这一点。 图…

qt对话框

完善文本编辑器 #include "second.h" #include "ui_second.h"second::second(QWidget *parent) :QWidget(parent),ui(new Ui::second) {ui->setupUi(this);this->setWindowTitle("聊天室界面");//设置标题this->setWindowIcon(QIcon(&…

边缘检测

目录 1、边缘检测原理 2、Sobel算子边缘检测 3、Scharr算子边缘检测​编辑 4、算子生成函数 5、Scharr、Sobel的使用 6、Laplacian算子边缘检测 7、Canny算子边缘检测 8、Laplacian、Canny的使用 1、边缘检测原理 2、Sobel算子边缘检测 3、Scharr算子边缘检测 4、算子生成函…

MySQL数据库 - 表的操作

目录 一、创建表 二、创建表案例 1、显示当前使用的数据库名 2、创建表 2.1 MyISAM存储引擎表 2.2 InnoDB存储引擎表 三、查看表结构 四、修改表 1、新增列 2、修改列类型 3、修改列名 4、修改表名 5、删除列 五、删除表 表的操作至少会涉及如下两类SQL语句&…

adb日常使用命令

重启电脑adb服务 adb start-server和adb kill-server mac中uiautoviewer的位置 android-sdk→tools→bin→uiautomatorviewer.bat adb查看本机abi类型 adb shell getprop ro.product.cpu.abi github 比较好的adb教程&#xff1a; https://github.com/mzlogin/awesome-adb a…

[VUE学习]权限管理系统前端vue实现9-动态路由,动态标签页,动态面包屑

1.动态路由 1.因为我们左侧权限菜单是根据不同用户显示不一样的 所以我们需要使用动态路由 来动态生成右侧路由信息 在总体布局页面添加router <router-view> 是 Vue Router 提供的组件&#xff0c;用于动态展示匹配到的路由组件内容。通过在合适的位置放置 <router-v…

将word中超链接的字体颜色更换成白色

文章目录 1、问题描述2、解决方法&#xff08;两种&#xff09;2.1 临时修改2.2 永久修改 1、问题描述 超链接是蓝色&#xff0c;需要将其换成正常颜色的字体 2、解决方法&#xff08;两种&#xff09; 2.1 临时修改 直接选中该字体&#xff0c;从字体的颜色那里选主题颜色…

zabbix安装监控客户端应用

添加 zabbix 客户端主机 服务端和客户端都配置时间同步 服务端和客户端都设置 hosts 解析 设置 zabbix 的下载源&#xff0c;安装 zabbix-agent2 在服务端验证 zabbix-agent2 的连通性 ​编辑 在 Web 页面中添加 agent 主机 自定义监控内容 在客户端创建自定义 key 1.明确…

XSS学习

目录 什么是XSS 概念 理解 XSS分类 存储型XSS 反射型XSS 原理 攻击过程 DOM型 攻击过程 DOM行XSS与反射型XSS区别 存储型XSS与反射型XSS区别 DVWA实验 反射型XSS low等级 JavaScript弹窗函数 攻击思路 攻击者web设计 medium等级 high等级 impissible等级 …

【ES6】中构造函数的语法糖 —— Class(类)

在现代前端开发中&#xff0c;JavaScript的面向对象编程成为了主流。ES6引入了class关键字&#xff0c;使得开发者可以更方便地使用面向对象的方式编写代码&#xff0c;更接近传统语言的写法。ES6的class可以看作是一个语法糖&#xff0c;它的绝大部分功能ES5都可以做到&#x…

Java基础---动态代理

目录 典型回答 静态代理和动态代理的区别 动态代理的用途 Spring AOP的实现方式 JDK 动态代理的代码段 Cglib动态代理的代码段 典型回答 动态代理就是&#xff0c;在程序运行期&#xff0c;创建目标对象的代理对象&#xff0c;并对目标对象中的方法进行功能性增强的一种技…

electron+vue3全家桶+vite项目搭建【22】vite定义编译时全局变量,用于渲染进程判断当前是否为打包环境

引入 demo项目地址 我们在本地运行时往往显示的是一些方便调试的页面&#xff0c;如下所示: 通过页面路由选择&#xff0c;快速打开不同的窗口 而当我们打包运行时&#xff0c;往往希望直接进入软件的主页&#xff0c;而不显示这些调试页面&#xff0c;也许你会觉得&#xf…

设计模式之三:装饰者模式

装饰者模式可以在不修改任何底层代码的情况下&#xff0c;给对象赋予新的职责&#xff08;使用对象组合的方式&#xff0c;在运行时装饰类&#xff09;。 假定星巴兹咖啡需要更新订单系统&#xff0c;而他们原先类的设计如图&#xff1a; 现在他们考虑客户可以选择添加调料&am…

day62_ssm事务

今日内容 零、 复习昨日 零、 复习昨日 excel导入导出,cv配置和方法 aop: 面向切面编程 抽取与业务无关的代码,比如日志记录,事务控制,权限校验等,形成一个切面 利用动态代理的技术将切面中的增强方法,作用到目标方法上 aop日志 日志注解切面类 切入注解获得时间,ip,session中的…

跟我一起从零开始学python(四)数据库编程:MySQL数据库

前言 回顾之前讲了python语法编程 &#xff0c;必修入门基础和网络编程&#xff0c;多线程/多进程/协程等方面的内容&#xff0c;今天到了数据库编程篇&#xff0c;前面没看的也不用往前翻&#xff0c;系列文已经整理好了&#xff1a; 1.跟我一起从零开始学python&#xff08…