ThreadPoolExecutor源码阅读流程图

news2024/11/25 18:30:08

1.创建线程池

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

10–核心线程数
20–最大线程数
1 TimeUnit.MINUTES 非核心线程数存活时间 1分钟
new ArrayBlockingQueue(100) 阻塞队列类型 数组类型传入队列长度,需要无限长度可以使用链表类型LinkedBlockingDeque
线程工厂ThreadFactory和超出队列的策略ThreadPoolExecutor.AbortPolicy(抛出异常)暂时先按默认来。

ThreadPoolExecutor executor = new ThreadPoolExecutor
(10,20,1, TimeUnit.MINUTES,new ArrayBlockingQueue<Runnable>(100));

创建线程池的时候是不会启动线程的,需要在执行具体业务逻辑时候才会执行

2.ThreadPoolExecutor重要参数及方法介绍

//ctl Int原子操作类,32位,前三位代表线程池状态,后28位记录线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

//线程池5种状态
//RUNNING状态【可提交新任务】和【可执行阻塞队列中的任务】
private static final int RUNNING    = -1 << COUNT_BITS;
//SHUTDOWN状态【不可提交新任务】提交新任务会抛出异常和【可执行阻塞队列中的任务】
private static final int SHUTDOWN   =  0 << COUNT_BITS;//执行shutDown()方法
//STOP状态【不可提交新任务】和【不可执行阻塞队列中的任务】
private static final int STOP       =  1 << COUNT_BITS;//执行shutDownNow()方法
//TIDYING状态 所有任务都终止了,线程池中也没有线程了,这样线程池的状态就会转为TIDYING,一旦达到此状态,就会调用线程池的terminated()
private static final int TIDYING    =  2 << COUNT_BITS;
//TERMINATED状态 terminated()执行完之后就会转变为TERMINATED
private static final int TERMINATED =  3 << COUNT_BITS;

//获取线程池状态    
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
//获取当前工作线程数
private static int workerCountOf(int c)  { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

2.1线程的五种状态

  • RUNNING状态【可提交新任务】和【可执行阻塞队列中的任务】 11100000 00000000 00000000 00000000
  • SHUTDOWN状态【不可提交新任务】提交新任务会抛出异常和【可执行阻塞队列中的任务】
    00000000 00000000 00000000 00000000
  • STOP状态【不可提交新任务】和【不可执行阻塞队列中的任务】 00100000 00000000 00000000 00000000
  • TIDYING状态 所有任务都终止了,线程池中也没有线程了,这样线程池的状态就会转为TIDYING,一旦达到此状态,就会调用线程池的terminated()
    00100000 00000000 00000000 00000000
  • TERMINATED状态 terminated()执行完之后就会转变为TERMINATED 01100000 00000000 00000000 00000000
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

/**
 * Attempts to CAS-increment the workerCount field of ctl.
 * 通过CAS来对当前工作线程数增加
 */
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

/**
 * Attempts to CAS-decrement the workerCount field of ctl.
 * 通过CAS来对当前工作线程数减少
 */
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}

任务执行流程图

在这里插入图片描述

3.提交任务execute

executor.execute(new Runnable() {
    @Override
    public void run() {
        //业务代码
    }
});
   public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //获取ctl 初始值为ctlOf(RUNNING, 0) 运行状态,工作线程数0
        int c = ctl.get();
        //计算获取工作线程数<核心线程数
        if (workerCountOf(c) < corePoolSize) {
        	//当前command增加为核心工作线程,添加失败下面会进行入队操作
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //判断线程池状态(判断是因为防止别的线程把状态进行修改)
        //workQueue.offer(command) 加入队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //再对线程池状态二次检查,如果不是running则移除队列
            if (! isRunning(recheck) && remove(command))
            	//拒绝策略,默认抛出异常
                reject(command);
            else if (workerCountOf(recheck) == 0)
            	//这里就是执行队列中的任务,下面addWorker里面有体现和讲解
                addWorker(null, false);
        }
        //线程池达到最大了的maxPool,添加失败执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

3.1 submit

Future<?> submit = executor.submit(new Runnable() {
    @Override
    public void run() {
        //业务代码
    }
});

这个里面执行了execute,多了一个返回Future

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

4.addWorker

这里不同版本jdk有差异

private boolean addWorker(Runnable firstTask, boolean core) {
    //类似于goto
    retry:
    for (int c = ctl.get();;) {
        // 线程池状态>=SHUTDOWN 并且 线程池状态>=STOP或者传入的任务!=null或者阻塞队列为空则返回
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
        	//
        	//判断工作的线程是否超过核心线程数或者最大线程数,addWork时候会传入core
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
                //如果没有超过核心线程数或者最大线程数,这里通过cas对工作线程数量增加,多个竞争失败的话循环cas操作
            if (compareAndIncrementWorkerCount(c))
                break retry;//跳出外层循环
            c = ctl.get();  // Re-read ctl
            //如果线程池状态>=SHUTDOWN 跳到外层循环继续执行
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
    	//新建任务Worker,会利用线程工厂去创建一个线程默认的是
    	/**
    	  *  Worker(Runnable firstTask) {
    	  *  //这个状态有0 -1 1 创建时候为-1,运行时候改为1,运行结束改为0
    	  *  // 正常应该是acquire时候+1  release时候-1 这里重写过方法
          *  setState(-1); 
          *  this.firstTask = firstTask;
          *  this.thread = getThreadFactory().newThread(this); }
    	**/
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
        	//这里的mainLock是对Workers进行操作的,防止出现并发问题
        	//用锁是因为private final HashSet<Worker> workers = new HashSet<>(); 这个不是线程安全的
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int c = ctl.get();
				//线程池如果是RUNNING状态
				// 或者状态<STOP并且传入的任务为空 这个是从阻塞队列里面拿任务执行
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.isAlive()) // 如果线程已经在运行,就抛出异常
                        throw new IllegalThreadStateException();
                    //添加任务到工作线程的容器里
                    workers.add(w);
                    int s = workers.size();
                    //largestPoolSize 这个是记录工作线程数,没看到具体作用,但既然有肯定是有用的
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //这里才到线程运行
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
    	//这里类似于一个回滚操作,异常情况会对worker进行移除,修改ctl
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

5.Worker相关

Worker类

5.1 构造器

   Worker(Runnable firstTask) {
    //这个状态有0 -1 1 创建时候为-1,运行时候改为1,运行结束改为0
   // 正常应该是acquire时候+1  release时候-1 这里重写过方法
   setState(-1); 
   this.firstTask = firstTask;
   this.thread = getThreadFactory().newThread(this); 
  }

5.2 tryAcquire和tryRelease

重写过从+1,-1变成cas为1和设置为0,0代表执行完任务空闲,1代表在执行任务,里面有个

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}
protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}

5.3 runWork

public void run() {runWorker(this);}
    
    final void runWorker(Worker w) {
    	//获取当前工作线程
        Thread wt = Thread.currentThread();
        //获取需要执行的任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); 
        boolean completedAbruptly = true;
        try {
        	//执行任务不为空 或者 队列中获取到了需要执行的任务
        	//如果没有获取到getTask是会阻塞的
            while (task != null || (task = getTask()) != null) {
                w.lock();
				//如果线程池状态>=STOP 并且当前线程没有被打断
				//线程池被打断并且线程池状态>=STOP 并且当前线程没有被打断
				//这里是对线程池状态作验证,如果状态发生了变更则要去尝试中断线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                //执行前切面 可以用来记录工作中线程和计算空闲线程,Tomcat线程池有这个行为
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        //执行后或异常切面
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    //执行任务数
                    w.completedTasks++;
                    w.unlock();
                }
            }
            //正常执行才会为false,表示正常退出
            completedAbruptly = false;
        } finally {
        	//执行失败completedAbruptly为true
            processWorkerExit(w, completedAbruptly);
        }
    }    

5.4 getTask()

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            // 线程池状态不为RUNNING,队列为空就不需要处理任务了,直接返回空,上层runWorker也会正常退出循环
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //工作中的线程数量
            int wc = workerCountOf(c);
            // 核心线程是否超时回收标志,可以通过executor.allowCoreThreadTimeOut(true);设置
            //工作线程数量>核心线程数量
            //用来判断是否是无限阻塞
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //大于最大线程数或者超时 并且 工作线程数量>1或者队列为空 则ctl减少
   		    // && (wc > 1 || workQueue.isEmpty()) 这个判断就是要留下至少一个线程去处理队列中的任务
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
            //超时阻塞和无限阻塞
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                //超时,循环时会去处理返回null
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

6.shutdown()

shutdown会把线程池状态修改为SHUTDOWN,提交新任务会抛出异常,但会继续执行队列中的任务。

  public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //修改状态为SHOUTDOWN,并修改ctl
        advanceRunState(SHUTDOWN);
        //这里会中断工作中的线程
        interruptIdleWorkers();
        onShutdown(); // 空方法
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
//中间还有个方法,传入的onlyOne为false
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    	//遍历Workers,遍历前加锁
        for (Worker w : workers) {
            Thread t = w.thread;
            //把没有被打断并且没有在工作中的线程打断
            //获取到锁说明线程是空闲的,没有获取到锁说明在执行任务
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

7.shutdownNow()

shutdownNow会把线程池状态修改为STOP,提交新任务会抛出异常,也不执行队列中的任务。但会返回队列中的任务。

List<Runnable> runnables = executor.shutdownNow();
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
		//修改状态为STOP,并修改ctl
        advanceRunState(STOP);
        //中断线程
        interruptWorkers();
        //返回队列中的任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    //最后一个线程结束时候会把线程池状态改为TERMINATED
    tryTerminate();
    return tasks;
}
private void interruptWorkers() {
    //中断所有工作线程
    for (Worker w : workers)
        w.interruptIfStarted();
}
      void interruptIfStarted() {
        Thread t;
        //getState() >= 0 代表空闲线程和正常执行中的线程,不为空并且没有被打断的就执行打断
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/459919.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

shell脚本编程规范与变量

目录 一.shell脚本的概述2.1 shell的作用 三. shell脚本的作用3.1 编写第一个shell脚本3.1.1 Shell 脚本的构成&#xff1a;3.1.2 脚本的执行方式 三. 重定向与管道符操作3.2 重定向操作3.2 管道操作符号 四. shell的变量的作用&#xff0c;类型4.1 定义变量4.2 命名的规则4.3 …

辛弃疾最有代表性的十首词

辛弃疾的词&#xff0c;风格多样&#xff0c;题材广阔&#xff0c;几乎涉及到生活中的各个方面&#xff0c;从爱国情怀到日常生活&#xff0c;甚至连戒酒这种事都能写入词中。辛弃疾也是两宋词人中&#xff0c;存词最多的作家之一&#xff0c;现存的六百多首作品。 辛弃疾的词…

【数据结构:线性表】单链表

在学习了顺序表&#xff0c;我们可能会对其有一些思考&#xff1a; 中间/头部的插入删除&#xff0c;时间复杂度为O(N)增容需要申请新空间&#xff0c;拷贝数据&#xff0c;释放旧空间。会有不小的消耗。增容一般是呈2倍的增长&#xff0c;势必会有一定的空间浪费。例如当前容…

第四次工业革命的里程碑-chatgpt

文章目录 一、 介绍二、 训练数据方法、数据来源三、 能帮你做什么做表格论文降重写文案、周报写代码改bug写注释写作业制作游戏策划方案 四、 搭建自己的chatgpt方法五、 安全、安全试用chatgpt的方法六、 几款类似chatgpt的工具七、 优点八、 缺点九、下一步的期待十、 总结 …

vue中vue-cli项目各种报错

目录 sockjs.js报错 [WDS] Disconnected报错 假如有以上报错&#xff0c;首先看下index.html有没有这句 <meta http-equiv"Content-Security-Policy" content"upgrade-insecure-requests"> 是限制资源获取&#xff1a;限制网页当中一系列的资源获…

OkHttp3源码解析 - 拦截器

系列文章目录 第一章 OkHttp3源码解析 - 请求流程 第二章 OkHttp3源码解析 - 拦截器 第三章 OkHttp3源码解析 - 连接机制和缓存机制 文章目录 系列文章目录前言一、五大内置拦截器二、拦截器分发流程1.RetryAndFollowUpInterceptor-重试重定向拦截器2.BridgeInterceptor-桥接拦…

用友BIP助力中国领先企业数智化国产替代

随着数字经济的快速发展&#xff0c;软件的重要性日益凸显。软件是新一代信息技术的灵魂&#xff0c;已经成为数字中国、制造强国、网络强国建设的关键支撑。面对全球竞争新格局&#xff0c;关键软件自主创新与国产化替代已迫在眉睫。 助力华为成功替换国外ERP系统 在此背景下…

android studio Switch按钮

1.添加按钮 <LinearLayoutandroid:layout_width"match_parent"android:layout_height"wrap_content"android:orientation"horizontal"><TextViewandroid:id"id/tv1"android:layout_width"0dp"android:layout_weig…

JavaScript如何实现继承?

&#x1f4dd;个人主页&#xff1a;爱吃炫迈 &#x1f48c;系列专栏&#xff1a;JavaScript &#x1f9d1;‍&#x1f4bb;座右铭&#xff1a;道阻且长&#xff0c;行则将至&#x1f497; 文章目录 继承JavaScript如何实现继承&#xff1f;原型链继承构造函数继承组合继承原型式…

纽扣电池出口欧盟ce认证EN62133测试项目

纽扣电池CE证办理&#xff0c;锂电CE证旨在提高环境性能的2006/66/EC入了电池和 蓄电池中0.0005%汞和便携式电池和蓄电池中0.002%镉的限值。自2013/56/EU 修订了2006/66/EC&#xff0c;2013/56/EU(修订2006/66/)规定&#xff0c;2015年10月1日 起&#xff0c;纽扣电池中汞的…

从零基础到条码高手:傻瓜式操作,告别excel、AI和PS的烦恼

条形码是一种用于商品识别、库存管理等方面的编码标识系统&#xff0c;它是通过将数字和字符以特定的图案排列组合起来&#xff0c;从而形成一组能被机器扫描和识别的条纹图案。 通常情况下&#xff0c;条形码的生成可以分为如下几个步骤&#xff1a; 1、编号&#xff1a;首先…

【神秘题 整数溢出】牛客小白月赛71 C-猫猫与数列

被教育了 学到了一些只有我不知道的常识 C-猫猫与数列_牛客小白月赛71(重现赛) (nowcoder.com) 题意&#xff1a; 思路&#xff1a; 直接模拟即可 值得注意的是&#xff0c;他在算数列的过程中可能会爆long long&#xff0c;因此在算的时候注意开__int128&#xff0c;这样…

微信小程序 开发中的问题(simba_wx)

目录 一、[将 proto 文件转成 json 文件](https://blog.csdn.net/wzxzRoad/article/details/129300513)二、[使用 test.json 文件](https://blog.csdn.net/wzxzRoad/article/details/129300513)三、[微信小程序插件网址](https://ext.dcloud.net.cn/)四、[vant-weapp网址](http…

为什么停更ROS2机器人课程-2023-

机器人工匠阿杰肺腑之言&#xff1a; 我放弃了ROS2课程 真正的危机不是同行竞争&#xff0c;比如教育从业者相互竞争不会催生ChatGPT…… 技术变革的突破式发展通常是新势力带来的而非传统行业的升级改革。 2013年也就是10年前在当时主流视频网站开启分享&#xff1a; 比如 …

电脑开机后出现哭脸错误无法启动解决方法

电脑开机后出现哭脸错误无法启动解决方法。有用户安装好电脑系统之后&#xff0c;遇到了哭脸错误的情况。出现这样的错误原因有很多。如果你无法找到问题的根源的话&#xff0c;其实都是可以通过U盘重装系统的方法来解决的&#xff1f;接下来我们一起来看看以下的操作教学吧。 …

BM39-序列化二叉树

题目 请实现两个函数&#xff0c;分别用来序列化和反序列化二叉树&#xff0c;不对序列化之后的字符串进行约束&#xff0c;但要求能够根据序列化之后的字符串重新构造出一棵与原二叉树相同的树。 二叉树的序列化(Serialize)是指&#xff1a;把一棵二叉树按照某种遍历方式的结…

flv怎么转换成mp4?这3种方法总有适合你的

flv怎么转换成mp4&#xff1f;首先我们得知道flv为什么转换成mp4&#xff1f;FLV和MP4都是常用的视频格式&#xff0c;其中FLV格式在以前的一些互联网应用中得到了广泛使用。但随着技术的发展和设备的普及&#xff0c;MP4格式逐渐成为了主流的视频格式。因此&#xff0c;将FLV格…

jQuery 在图片和文字中插入内容(多种情况考虑)

昨天接到一个新的需要&#xff0c;在后台文章编辑器中&#xff0c;每一个文章的正文前面&#xff0c;可以单独添加一个电头字段&#xff0c;但是如果在富文本编辑器中最上面就添加图片的话&#xff0c;图片就会把电头和正文中的文字给隔开。需要做的是获取到电头字段&#xff0…

一款纯Web化免费SQL工具,重新定义数据库管理

SQL Studio是一款由麦聪软件研发的多数据库管理工具&#xff0c;提供Windows、Linux 和 MacOS三种版本的软件包&#xff0c;支持中英文两种语言。SQL Studio是用Java编写的&#xff0c;默认使用 JDK 8进行编译。 下载看这里: [SQLStudio] (http://www.maicongs.com/#/home/web)…

shell脚本function传参的使用

这里直接上操作来说明function 的传参 新建一个脚本 vi 1.sh #!/bin/bash function check_a {echo $2echo $3echo "this is check_a" } function check_b {echo $2echo "this is check_b" } $1 #$1作为选择执行哪个function的参数 执行以下这个脚本传…