Android多线程学习:线程池(二)

news2025/1/16 1:12:52

一、线程池运行流程

具体执行流程如下:

1、首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务;

2、如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;

3、如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中,然后空闲的核心线程会依次去缓存队列中取任务来执行;

4、如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个非核心线程(临时工)来执行新提交的任务;

5、如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

线程池运行.png

二、线程池状态计算

上一节讲了线程池状态的含义及转化,这节讲状态变量计算方式:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
                                
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS; 
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }  //计算当前运行状态
    private static int workerCountOf(int c)  { return c & CAPACITY; }  //计算当前线程数量
    private static int ctlOf(int rs, int wc) { return rs | wc; }  //通过状态和线程数生成ctl

如上面代码所示:
ThreadPoolExecutor中使用一个AtomicInteger 类来表示线程池状态及该状态下的线程数量。将一个32位的int值分为了两部分,前3位表示线程状态,后29位表示线程个数,最大可表示2^29 - 1个线程,运行线程池根本达不到这个数量。

位数计算:
COUNT_BITS = Integer.SIZE - 3 = 32 - 3 = 29
容量计算:
CAPACITY = (1 << COUNT_BITS) - 1 = 100000000000000000000000000000(30位) - 1 = 11111111111111111111111111111(29位)

之所以这样设计是因为线程池天生处于一个并发的环下,如果分开用两个变量进行存储,就必须要通过锁进行线程安全处理,从而保证两个变量的修改具备原子性,但是这种做法对性能影响非常严重,因此将两个变量分别包装在一个变量中,最后的并发操作发生在AtomicInteger 上,而AtomicInteger 恰恰就是一个无锁原子操作类,这样既可以解决线程安全问题,又避免锁的使用,从而保证了并发性能。并且这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。

三、线程池源码阅读

1、任务执行入口execute:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
   
    int c = ctl.get();
    int num = workerCountOf(c);
    //第一步
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //第二步
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }//第三步
    else if (!addWorker(command, false))
        reject(command);
}

// firstTask 待执行的任务 core 是否要启动核心线程
private boolean addWorker(Runnable firstTask, boolean core) 
  • 第一步,判断工作线程数如果小于corePoolSize核心线程数,直接创先新的线程addWorker,并将任务作为该线程的第一个任务(firstTask),添加成功直接返回。如果运行线程大于等于核心线程数执行第二步;

  • 第二步,若当前线程池处于运行状态,则向阻塞队列中插入一个任务。若插入成功,则进行进行二次校验,防止一次校验通过后线程池关闭。二次校验如果线程池不是运行状态,移除任务并执行拒绝策略。如果运行状态没变,但工作线程个数是0,则新建一个非核心线程,添加个null任务,线程会去队列中获取执行任务。

    若一开始判断线程池处于非运行状态或者插入队列失败(队列任务已经满了),执行第三步;

  • 第三步,如果队列已经满了,新建一个非核心线程执行该任务,如果新建失败,可能是线程池关闭了,或者线程数量达到了maxPoolSize

2、新建工作线程addWorker:

private boolean addWorker(Runnable firstTask, boolean core) {
         //第一步
        retry: //双层for循环 retry外层循环终止标识
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {         
                int wc = workerCountOf(c);
                if (wc >= CAPACITY || 
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                 //CAS:workerCount +1 
                if (compareAndIncrementWorkerCount(c))
                    //跳出外层循环 ,循环结束
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    //继续最外层循环
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
  • 第一步,这一步主要是检查是否能新建工作线程,如果可以执行CAS操作,workCount1。工作线程创建失败的主要原因是:
    (1)当前线程池处于关闭状态rs >= SHUTDOWN
    (2)线程数量超过线程池设置最大值;
    (3)CAS操作失败,说明其他线程也有该操作,会在循环内重试。
        //第二步
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                       
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                   
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //workerAdded = true; 
                if (workerAdded) {
                    // 启动线程 
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
  • 第二步,经过第一步说明当前条件下可以创建工作线程,这一步主要是线程创建逻辑。中间修改变量workerslargestPoolSize 前需要加锁,设置成功后将锁放开,启动工作线程工作。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable  {
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
}

Worker类,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTaskthread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;

firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。

3、工作线程执行

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {            
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
               //任务执行前执行,子类可实现自己的逻辑
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {             
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //执行任务后调用,子类可实现自己的逻辑
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //获取不到任务时,主动回收自己
        processWorkerExit(w, completedAbruptly);
    }
}

工作线程创建成功后,t.start()会启动线程执行任务,最终会进入到runWorker方法中,主要步骤如下:

  • 第一步,看下当前线程创建的时候有没有自带的任务,有的话先执行自带任务,没有的话通过getTask()去阻塞队列获取,如果获取到的tasknull,则执行processWorkerExit(w, completedAbruptly)回收自己。

  • 第二步,在执行task的时候的需要加锁,如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。之所以要加锁是因为在工作线程回收的时候,会通过锁来判断线程是否在执行任务,如果加锁了说明在执行task则不回收。

  • 第三步, task.run()任务执行。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

判断流程如下:

  • 第一步,getTask()返回 null 说明工作线程需要被回收,如下几个情况会返回null
    (1)线程状态处于STOP,或者处于SHUTDOWN且队列中没有需要执行的task
    (2)线程最大个数maximumPoolSize中间被修改过,导致当前线程数超过maximumPoolSize或者获取任务超时,此时如果队列中没有任务或者线程数大于1,返回null

  • 第二步,获取timed变量值,如果核心线程设置过超时时长,或者当前线程是非核心线程,该变量会置为true,说明获取任务的时候需要加入超时判断,进行线程回收。

  • 第三步,如果线程需要限时回收执行poll()函数,如果没设置过超时,并且是核心线程,执行take()函数。poll()在指定的超时时间内没有获取到线程的task()自动超时回收,take()会一直阻塞,直到有新的task出现。

总结,代码看多了会有点晕,前面的源码分析抽离出来大体流程图如下:
线程池任务执行流程图.png

4、工作线程回收

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly)
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {       
       completedTaskCount += w.completedTasks;       
       workers.remove(w);
    } finally {
       mainLock.unlock();
    }

    tryTerminate();
    ....
}

在执行processWorkerExit函数后,加锁修改completedTaskCountworkers变量值,最后走到 tryTerminate()中,这个方法在任何可能导致线程池终止的动作后执行,比如减少wokerCountSHUTDOWN状态下从队列中移除任务。

final void tryTerminate() {
    for (;;) {
    int c = ctl.get();
    if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN &&           
               !workQueue.isEmpty()))
        return;

   if (workerCountOf(c) != 0) { // Eligible to terminate
       interruptIdleWorkers(ONLY_ONE);
       return;
   }
   final ReentrantLock mainLock = this.mainLock;
   mainLock.lock();
   try {
      if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
            try {
                // 子类重载:一些资源清理工作
                terminated();
            } finally {
              ctl.set(ctlOf(TERMINATED, 0));
              termination.signalAll();
            }
            return;
       }
   } finally {
        mainLock.unlock();
   }
// else retry on failed CAS
}

tryTerminate()主要分为以下几步:

  • 第一步,判断当前线程池状态,如果还处于RUNNING状态,或SHUTDOWN状态但是任务队列非空,或runState >= TIDYING 线程池已经停止了或在停止了,则直接返回;

  • 第二步,如果当前状态不属于第一步的判断,则可以将空闲的工作线程进行回收,如果工作线程个数大于0,则执行interruptIdleWorkers()

private void interruptIdleWorkers(boolean onlyOne) {
   final ReentrantLock mainLock = this.mainLock;
   mainLock.lock();
   try {
      for (Worker w : workers) {
         Thread t = w.thread;
         if (!t.isInterrupted() && w.tryLock()) {
              try {
                  t.interrupt();
              } catch (SecurityException ignore) {

              } finally {
                  w.unlock();
              }
       }
       if (onlyOne)
        break;
     }
  }  finally {
     mainLock.unlock();
  }
}

从上面代码可以看到,在回收的时候会通过w.tryLock验证是否能获取到锁,如果可以获取到说明该线程没有在运行,因为runWorker中执行任务会先lock,因此保证了中断的肯定是空闲的线程。

参考文章:
Java线程池实现原理及其在美团业务中的实践
全方位解析-Android中的线程池

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1074269.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

12. Java异常及异常处理处理

Java —— 异常及处理 1. 异常2. 异常体系3. 常见Exception4. 异常处理4.1 try finally catch关键字4.2 throws和throw 自定义异常4.3 finally&#xff0c;final&#xff0c;finalize三者的区别 1. 异常 异常&#xff1a;在程序执行过程中发生的意外状况&#xff0c;可能导致程…

使用Pritunl OpenVPN远程连接,实现安全高效的远程访问

文章目录 前言1.环境安装2.开始安装3.访问测试4.创建连接5.局域网测试连接6.安装cpolar7.配置固定公网访问地址8.远程连接测试 前言 Pritunl是一款免费开源的 VPN 平台软件&#xff08;但使用的不是标准的开源许可证&#xff0c;用户受到很多限制&#xff09;。这是一种简单有…

印度网络安全:威胁与应对

随着今年过半&#xff0c;我们需要评估并了解不断崛起的网络威胁复杂性&#xff0c;这些威胁正在改变我们的数字景观。 从破坏性的网络钓鱼攻击到利用人工智能的威胁&#xff0c;印度的网络犯罪正在升级。然而&#xff0c;在高调的数据泄露事件风暴中&#xff0c;我们看到了政…

禁用Chrome自动更新

chrome浏览器会强制用户自动更新&#xff0c;每次点击关于google时&#xff0c;会自动检测更新并下载&#xff0c;非常不好 1. 进入%userprofile%\AppData\Local\Google文件夹 2. 找到其中的Update文件夹&#xff0c;右键属性-安全&#xff0c;将所有组/用户的权限设置为拒绝…

8年测试老鸟,性能测试-数据库连接池问题定位/分析,一篇打通...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、环境准备 1&a…

JProfiler14.0(Java开发分析)

JProfiler是一款专业的Java应用程序性能分析工具&#xff0c;可帮助开发人员识别和解决Java应用程序中的性能问题。JProfiler支持Java SE、Java EE和Android平台&#xff0c;提供了多种分析选项&#xff0c;包括CPU分析、内存分析和线程分析等。 使用JProfiler&#xff0c;开发…

【手写数字识别】数据挖掘实验二

文章目录 Ⅰ、项目任务要求任务描述&#xff1a;主要任务要求(必须完成以下内容但不限于这些内容)&#xff1a; II、方法思想及实现原理陈述&#xff08;20分&#xff09;算法思想和实现原理数据集描述实验运行环境描述不同方法对MNIST数据集分类识别结果分析(不同方法识别对比…

拍摄的照片怎么做二维码?一分钟在线生成二维码

​手机拍摄的照片怎么做成二维码呢&#xff1f;用二维码来查看图片的方式现在很多人都在使用&#xff0c;其优点在于不占用自身空间&#xff0c;还可以拥有更快速度让他人查看图片内容&#xff0c;常见的图片二维码类型一般有信息展示、照片展示、商品海报、表情包等等。图片二…

2023性能测试入门,其实很简单,看看这篇,好好学习

注&#xff1a;性能测试&#xff0c;入门简单&#xff0c;深入难。经常有同学问&#xff0c;建议看看这篇 。 为了帮助大家快速的入门性能测试&#xff0c;接下来文章将从以下几个方面进行展开&#xff1a; 一、赶鸭子上架要我搞性能测试&#xff0c;怎么办&#xff1f; 二、想…

最新外卖点餐小程序开源源码 支持单店+多店双模式 含完整前后端代码包和搭建教程

随着移动互联网的普及&#xff0c;外卖点餐已成为人们日常生活中不可或缺的一部分。给大家分享一个全新的外卖点餐小程序开源源码&#xff01;该程序支持单店及多店模式&#xff0c;含完整的前后端代码和详细的搭建教程&#xff0c;让您轻松开启外卖点餐业务&#xff01; 一、…

企业数字化之库存管理篇

一、前言 接上一篇 《企业数字化之采购篇》&#xff0c;这一篇我们来了解一下如何做好库存管理&#xff0c;主要还是讲销售型企业成品库存的管理&#xff0c;对于生产制造型企业库存因涉及到物料、半成品、各种消耗品、成品&#xff0c;其存在一定依赖的相关性&#xff0c;会复…

简单强大的时序图绘制工具

今天分享一个简单强大的时序图绘制工具——WaveDrom。 WaveDrom Digital Timing Diagram everywhere WaveDrom draws your Timing Diagram or Waveform from simple textual description. It comes with description language, rendering engine and the editor. WaveDrom edi…

vue-7-vuex

一、Vuex 概述 目标&#xff1a;明确Vuex是什么&#xff0c;应用场景以及优势 1.是什么 Vuex 是一个 Vue 的 状态管理工具&#xff0c;状态就是数据。 大白话&#xff1a;Vuex 是一个插件&#xff0c;可以帮我们管理 Vue 通用的数据 (多组件共享的数据)。例如&#xff1a;购…

浅谈内存函数以及模拟实现

1.memcpy void * memcpy ( void * destination, const void * source, size_t num ); 函数memcpy从source的位置开始向后复制num个字节的数据到destination的内存位置。 这个函数在遇到 \0 的时候并不会停下来。 如果source和destination有任何的重叠&#xff0c;复制的结果都…

小白学习笔记—网络安全/黑客技术

作为一个合格的网络安全工程师&#xff0c;应该做到攻守兼备&#xff0c;毕竟知己知彼&#xff0c;才能百战百胜。 谈起黑客&#xff0c;可能各位都会想到&#xff1a;盗号&#xff0c;其实不尽然&#xff1b;黑客是一群喜爱研究技术的群体&#xff0c;在黑客圈中&#xff0c;一…

【HomeKit】HAT User Manual教程

前言&#xff1a;这篇文章是对于苹果协议文件《HomeKit Accessory Tester (HAT) User Manual》的学习&#xff0c;即 HomeKit配件测试仪(HAT) 用户手册&#xff0c;该版本是第11次修订 第一章 概述 本文档介绍了Apple HomeKit配件测试仪(HAT)的配置和使用方法。HAT是一个Mac应…

家政系统开发,家政保洁维修预约小程序开发;

家政系统是家政行业的专业管理系统软件&#xff0c;功能涉及到家政公司运营的方方面面&#xff0c;包括&#xff1a;推广、营销、管理、培训、周边服务等等&#xff1b; 家政系统功能介绍&#xff1a; 系统集成分销客户裂变、微信推广、团购引流、热文海报推广、短视频引流、搜…

Camera metadata

目录 背景 CameraMetadata基本概念 Google Metadata Google—Metadata结构 官方注释 Aandroid API cameraMetadata头部 : struct camera_metadata camera_metadata_buffer_entry struct camera_metadata_entry data区为什么是一个联合体&#xff1f; camera metadat…

什么是Fetch API?与传统的AJAX相比,有什么优势?

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

3分钟学会批量查询快递秘籍

随着网购的普及&#xff0c;我们经常需要查询快递来了解自己的包裹状态。然而&#xff0c;如果一个个手动查询&#xff0c;不仅费时而且麻烦。这时候&#xff0c;一款能够批量查询快递的软件就变得尤为重要。今天&#xff0c;我将向大家介绍一款名为“固乔快递查询助手”的软件…