一文吃透Java线程池——实现机制篇

news2024/11/14 16:47:20

前言

本篇博客是《一文吃透Java线程池》系列博客的下半部分。
上半部分链接:一文吃透Java线程池——基础篇

实现机制(源码解析)

根据前面的学习,我们知道,线程池是如下的运作机制
在这里插入图片描述

解析:

  • 一开始,核心线程数以内的任务。直接分配线程,然后线程执行任务。
  • 后来任务量越来越多,超出了核心线程处理能力,就被排到了任务队列里
  • 当当前任务量再大,线程池就会临时新增线程来处理,直到达到最大线程数。
  • 当当前任务量超出队列承载能力和最大线程数限制,任务就会被拒绝。
  • 给一开始的任务分配了线程。这些线程处理完他们各自的任务,并不会止步于此。(主次关系开始反转:线程一开始是随着任务生成的,但生成之后,任务结束了,但线程落地生根。完成了第一个任务,就开始找第二个任务,第三个任务。。。。)
  • 线程找第二个,第三个任务。去哪找的呢?就是第二句中说到的BlockingQueue。
  • 图中最右边的“线程回收”,指的就是一开始说的“临时新增”的线程。核心线程是不会被回收的,他们会一直尝试从BlockingQueue获取任务,否则就阻塞着。(BlockingQueue.take()的阻塞特性)

下面我看一下ThreadPoolExecutor中,几个关键的源码逻辑

线程池状态/worker数量记录:ctl

这是ThreadPoolExecutor中的一个成员变量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

  • 这是一个原子整数,可以保证多线程操作的原子性。
  • int有32位。这个ctl被拆成了两部分:3 + 29。 高3位存储的是线程池状态(runState),低29位存的是有效线程数(也叫WorkerCount。注意:这个值特别容易把人带沟里。这并不是前面我说的”当前任务量”,我说的当前任务量,指的是正在运行的加上队列等待的。而这个单指正在运行的,和线程数量一一对应,所以叫“有效线程数”)。
    这种存储方式在ReentrantReadWriteLock中的state也是这么处理的(它是对半分:16+16)。因为这两个数的操作都是原子同步的,但这两个数之间却未必同步。存在一起,免去了这样的麻烦。
  • 因为拆成了“二进制”,所以围绕着这个数的修改和查看就少不了一些位运算
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

private static final int RUNNING    = -1 << COUNT_BITS;//  11100000 00000000 00000000 00000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;//  00000000 00000000 00000000 00000000
private static final int STOP       =  1 << COUNT_BITS;//  00100000 00000000 00000000 00000000
private static final int TIDYING    =  2 << COUNT_BITS;//  01000000 00000000 00000000 00000000
private static final int TERMINATED =  3 << COUNT_BITS;//  01100000 00000000 00000000 00000000

private static int runStateOf(int c)     { return c & ~CAPACITY; }//获取线程池状态
private static int workerCountOf(int c)  { return c & CAPACITY; }//获取Worker数量
private static int ctlOf(int rs, int wc) { return rs | wc; }//把runStateOf和workerCount合成完整的ctl

解析

  • Integer.SIZE - 3即 32-3=29
  • CAPACITY为:00011111111111111111111111111111
  • '~'是一个位运算符。用于取反操作。结果为
    11100000000000000000000000000000
  • 00011111111111111111111111111111和11100000000000000000000000000000是两个位运算“工具数”,
    可以对c进行"与运算"。分别来求 低位的WorkerCount和高位的runState
  • c就是ctl,并不是一个普通的int值(所以不要当成1,2,3…来看)
  • ctlOf中的’|'是一个位运算符(或运算:0|1–>1, 1|1 -> 1, 0|0 -> 0)

‘|‘和’&‘是位运算符,’||‘和’&&‘是逻辑运算符。
true和false在使用’|‘和’&‘运算时,相当于1和0。从而让’|‘和’&‘表现出可以进行逻辑运算的现象。
因此,初学时可能会产生的印象:’|’ '&'和 ‘||’ '&&'都是逻辑运算符。前者的运算逻辑是运算符前后都要运算,后者可能只需要判断一个就能得出结果(这个结论是没错的,但理解的不够深刻)。

前面有可能会有疑问的二进制解释:
在这里插入图片描述

任务队列:workQueue

这是ThreadPoolExecutor中的一个成员变量。
private final BlockingQueue<Runnable> workQueue;
就是前面一直说的,当任务量超过核心线程数,就会去任务队列里等着被处理。

任务的封装类:Worker

前面总在说Worker。它就是ThreadPoolExecutor的一个内部类。
一开始当任务(Runnable)被提交进来,就会被封装成Worker。Worker里还有一个线程。
可以简单理解为:Worker = Runnable + Thread
这两个东西都是Worker内的两个成员变量,如下图Worker的内部结构:
在这里插入图片描述
这里的任务之所以被叫做firstTask。是因为thread处理完它,就会抛弃它(事实上是抛弃它所属的整个Worker),去处理下一个任务。因为前面打比喻时,就说过,Worker相当于给医生提供的门诊。医生有几个,门诊就有几个,是不会随便增加的。
所以:Thread和Worker都是随着Runnable的提交,而“被动”产生的。但并不会随着Runnable的运行结束而结束。而是反转为主人,开始主动为其他任务服务。(这是源码中反直觉的一个地方)

先不要关心Worker还实现了Runnable。这只是一个”编程技巧“,并非主要逻辑,不要被迷惑了。我们后面会讲为什么还要实现Runnable。

提交任务:executorService.execute

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 1. 核心线程数内:直接执行
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. 超出核心线程:进队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 进队列失败 && 超出核心最大线程:拒绝
    else if (!addWorker(command, false))
        reject(command);
}

这段代码和上面那张原理图很容易对应起来,提交任务之后的三条路,正好对应上面三段代码。
这里着重强调几个方法:

  • addWorker : 封装Worker并执行(后面会详细解析)
  • workQueue.offer:进队列(和add的区别是:如果添加失败,只是返回false,而add则会报错)
  • int recheck = ctl.get(); 这是在尝试进核心线程失败后,再次获取ctl,以防止在这短暂的时间内,线程状态和任务数量已经被其他线程给改了。
  • reject:这个很好理解,就是执行拒绝策略。

addWorker:封装并启动

private boolean addWorker(Runnable firstTask, boolean core) {
    //1. 修改ctl
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))//  <-------------CAS
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 2. 封装并启动Worker
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);//  <-------------封装Worker
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();//  <-------------这里开始启动线程,执行任务了
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

这段代码很长,但逻辑并不复杂,只有两段:1. 修改ctl 2.封装Worker并启动。
修改ctl占了一半的代码。原因是修改ctl是一个需要考虑多线程并发问题的事情,但这里又不想加锁 影响性能,所以弄了两层for循环+CAS,也就是无锁自旋的方式来修改ctl。
封装Worker的过程用了锁来保证线程安全。

在看到 t.start();,你是否会疑惑:线程启动,和任务有什么关系?他俩不是并列关系吗?
要说清楚这个问题,就得先看看Worker是怎么创建的,下面是Worker构造方法

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask; //赋值任务
    this.thread = getThreadFactory().newThread(this);//赋值线程
}

第二句赋值任务,第三句赋值线程。这都很符合我们的预期(给两个关键的成员变量赋值)。
而线程是从getThreadFactory()来的,也没什么好说的,就是一开始传入的线程工厂。
关键是:newThread(this)。把当前Worker对象作为任务,塞给了这个线程。这就是为什么Worker要实现Runnable。
Worker既然实现了Runnable,就需要一个run方法给Thread去执行。说到这里,是不是感觉就能串起来了: 在这个run方法里只要去调用我们提交的任务的run方法。下面展示了:Worker,Runnable,Thread之间更隐秘的关系(别被他们的位置所属关系迷惑了,这里我把他们三个拆成三个独立的个体)
在这里插入图片描述
线程调用Worker的run方法,Worker的run方法里又调用Runnable(用户提交的任务)的run方法。Worker的run相当于一个中转站。

runWorker

上图中,Worker里的run方法,并不是简简单单直接调用了一下下面Runnable的run方法。而是“做了一点文章”。
原因很好理解:如果只是简单调用,那么不就意味着:任务提交,核心线程数内的任务 分配线程,执行完,然后就没然后了。线程变成了“一次性”线程。任务队列里还有一堆嗷嗷待哺的任务没人管了。
所以Worker里的run方法调用了一个叫runWorker的“中转方法”,看一下这个方法内容:

 final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {//  <-----------------不断去获取新任务
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);//钩子方法1:执行任务前
                Throwable thrown = null;
                try {
                    task.run();//  <-------------------执行我们提交的Runnable的run方法
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);//钩子方法2:执行任务后
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);//  <----------------没任务了,清理线程
    }
}
  • 只要能通过getTask()获取到新任务,这个循环就会一直继续。
  • 获取不到新任务,才会跳出死循环,清理线程。
  • 这两个钩子方法(beforeExecute和afterExecute)AOP的作用对象是工作线程,而不是线程池。

getTask():获取任务

为什么核心线程可以永远不会被清除呢,看一下获取任务的方法就知道了:
在这里插入图片描述
可以看到在从任务队列里取任务,有两种结果:

  1. 在指定时间内拿不到,就放弃(poll)。然后继续走下去,后面就会走出死循环,最终清理掉线程。
  2. 拿不到就一直阻塞(take)。线程会一直存在。

一般情况都是第二种情况。

会产生第一种情况有两种,任意一个条件满足就可以:

  1. 给线程池配置了allowCoreThreadTimeOut参数为true(默认是false)
    executorService.allowCoreThreadTimeOut(true);
    这个参数的意思是:是否允许核心线程超时(被清理)
    【这个参数可以让核心线程“不那么核心”,也会随时被清理。之所以前面不说这个参数,我是希望逐步加深,而不要一开始搞得太复杂】。
  2. 当前的Worker数量超过了最大核心线程数。也就是“最大线程数很大,但队列却很短,不够排队,就直接运行,导致临时新增的线程”(医院排队用的椅子不够用,所以去其他医院借调医生)。

到这里,你可以先停一下。再仔细看看那个’wc’变量,稍微分析一下,看看逻辑是否说的通。判断一下自己的理解是否有偏差。

线程回收

通过前面的讲解,我们知道了什么样的线程会被清理。
有人或许会提到一个词叫“线程回收”。从字面意思来看,“回收”指的就是“回收再利用”。但看过源码我们就知道,并不存在一个“回收再利用”的机制。其实是“同一个线程不断索取新任务”的机制。
我们看看线程是如何清理的,也就是前面runWorker方法里,最后finally里的方法

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        ...
        workers.remove(w);
        ...
    }

我把其他代码都删了,只留了一句关键代码。

这个workers,是ThreadPoolExecutor里的一个成员属性HashSet,存储当前正在执行的Worker。
private final HashSet workers = new HashSet();

这里就是把当前这个Worker,从这个Set中删掉。
所以,准确的说是:清理Worker,顺带着把线程也清理了
因为当前这线程,只有当前这个Worker引用。一旦Worker被GC,那么线程也变的无依无靠,然后也被GC。

为什么Worker要继承AQS

关于AQS的详细解释,请看我的另一篇博客Java锁深入理解2——ReentrantLock

在这里插入图片描述
Worker类实现Runnable的原因,前面已经说过了。那它为啥要继承AQS呢。
我在网上查了好多文章。大家抄来抄去,也没一个人说清楚,如下是一个还不错的说法:

Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
  2. 如果正在执行任务,则不应该中断线程。
  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

对于这段话。实现不可重入的特性反应线程现在的状态,我不太理解。【有理解的,可以评论区留言】
但下面那段话,我大致是明白了,可以给大家通俗直观的讲一下。

锁要起作用,至少要两个线程出现并发。
这里出现并发的线程只有两个:主线程 和 某一个工作线程

而发生的场景,就是对线程池使用shutdown()的时候。

shutdown()

这是线程池的一个停止方法,目的是“优雅的把线程池”停下来(前面我们很多demo也都用了)
下面是代码

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();// <-------------中断空闲线程
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

由于前面我们也说了,线程和Worker一一对应,所以中断Worker,也就是为了中断线程。下面是具体的代码
在这里插入图片描述
基本逻辑就是:遍历所有Worker,然后给他们的线程发出中断信号。
但是呢,有条件。第一个条件没啥,就是已经中断的就不再中断。关键是第二个条件,继续往下看tryLock的代码

public boolean tryLock()  { return tryAcquire(1); }//试图抢锁
----
protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {// <-----只有当前state是0,才更新为1,否则返回false
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

也就是说Worker把自己包装成了一个独占锁。
主线程在试图给Worker发中断消息前,会先抢一下这个锁。抢成功了,才给这个Worker发中断,否就跳过这个Worker。
好,至此主线程这边就讲完了。

然后看看另一边,可能会受影响的工作线程那头的情况(也就是Worker):

在这里插入图片描述
这段代码的大致逻辑:

  • 如果能通过getTask()获取到任务,就一直循环。
  • 在循环体内会通过Worker的锁,开始lock(把state改为1),结尾unlock(把state改为0),围成一个“安全区”。

所以,对那头的主线程来说:只有Worker线程处在绿色框里的getTask()时,才会发中断。而当Worker线程在红框里时,就不会发(注意:是直接跳过不发,而不是等着工作线程跑到绿色框内再发)。

从业务角度讲,红框内属于正在执行程序。绿框里属于空闲或者获取下一个任务的间隙(也算是短暂的空闲阶段)

所以:Worker通过继承AQS,把自己包装成一个独占锁。来实现只中断空闲线程的需求

“中断空闲线程”方法(interruptIdleWorkers),并不只有shutdown()会调用,当修改最大核心线程数是也会调用。

为什么不使用同一个锁?
Worker之间执行应该是互不干扰的。用同一个锁,线程池就退化成“所有任务一个个顺序执行”的无意义框架了。

虽然上面说我不太理解Worker的可重入效果。但我也有自己的想法,只是没有找到佐证我想法的地方。
我觉得这里的“非重入效果”,大白话应该是:防止重复提交中断

可以这么推理:非重入。指的就是自己反复拿锁。
这里只有两个线程:主线程,工作线程。谁可能重复拿锁呢?
工作线程应该不可能,人家就是老老实实的一遍遍循环“lock-unlock”,这是一个自动的过程。
那么可能重复拿锁的就是主线程了,比如用户连着写了两个shutdown()。那么就会在短时间内,出现两次w.tryLock()。如果前一次还没执行完,后面个就会tryLock()失败,返回false,跳过这个Worker。如果Worker很少,执行的很快,大概率是第二遍的所有Worker都会跳过通知中断。就实现了“防止重复提交中断”的效果。

怎么实现的非重入锁效果?(其实关于锁介绍,我之前"锁"的文章都讲过,这里再着重重复一遍)
其实非重入是最简单的,反而是要实现重入锁,反而需要增加判断逻辑。
关键就是tryAcquire方法逻辑(为什么和tryLock一样?因为tryLock调用的就是tryAcquire),就是尝试获取锁的逻辑:
非重入:一视同仁,别说旁人了,连自己都不能重复获取。
可重入:需要对当前线程进行判断,看看当前线程是不是获取锁的线程,如果是,说明是当前线程自己又来了。如下所示
在这里插入图片描述

getTask()的出口

前面我知道,想要从runWorker方法里的死循环里出来,唯一的办法就是getTask()获取任务返回null。所以我们看一下getTask()如何返回null
在这里插入图片描述

getTask()只有两个出口条件(这里的“出口”指的是跳出外面的runWorker循环,而不仅仅是跳出getTask()的循环),就是:

rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())
和
(wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())

第一个条件语句的意思是:如果状态已经被修改成SHUTDOWN并且任务队列为空。或者状态为STOP(使用shutdownNow后的状态)【stop状态值比shutdown的大。所以满足rs>stop 就一定满足 rs>shutdown】。
所以这里也解释了shutdown()和shutdownNow()的业务逻辑的区别。

第二个条件语句比较复杂,大致意思是:

  • 如果确认已经没任务可处理了,那些增加的临时线程,就可以出去了(被清理)
  • 因为改配置(核心线程数,最大线程数),从而需要删减掉的线程,在确保还有其他线程可以干活的时候,也可以出去了
  • 就是处理“临时工” 和 “公司因为架构调整而裁员”(走之前都确保完成了手头工作,并完成了交接),具体怎么动态改配置,见基础篇的最后[如何配置线程池]

shutdown()发出的中断信号是如何起作用的
如果此时线程阻塞在workQueue.take();
因为收到中断信号,阻塞就会解除,循环继续。然后在第一个出口出去(SHUTDOWN状态 并且 任务队列为空)

shutdown()和shutdownNow()的区别

通过前面分析,我们得知两者最大的区别是:shutdown()需要等队列空了之后才会销毁线程。而shutdownNow()只需要等当前线程执行的任务结束,就会销毁线程。

那就产生一个问题:为什么shutdown()只对空闲线程发中断消息?
其实我们看runwork和getTask()的逻辑可知,就算shutdown()像shutdownNow()那样给所有状态的线程都发中断消息,也不影响结果(反正出口 只在getTask里)。那他们中断的条件 做出区别,意义是什么呢?

:我们要考虑工作线程里的任务内容。
假如用户写的任务里也写了一个阻塞语句(比如最简单的Thread.sleep)。此时,两者的差别就出来了:
shutdownNow()因为无视运行中的用户任务,直接发送中断消息。比如此时用户任务里刚好执行到了sleep,正在阻塞。那么线程收到中断消息之后,sleep将会响应这个中断消息,从而结束阻塞,然后抛出一个异常。虽然用户捕获了这个异常(中断异常都是检查异常,需要强制捕获),但还是起到了一个作用:加速用户任务的结束。虽然这样引起了用户任务的报错,一定程度上确实影响了任务的正常执行,但也好过使用Thread.stop(被明令禁止使用的终止线程的粗暴方式,因为可能会引起意想不到的结果)。
但是,用户的任务里未必有阻塞语句。即便有,也不一定刚好赶上,阻塞的时候收到中断消息(也许刚好阻塞结束了,此时收到了中断消息。那么中断消息就对用户任务不起任何作用)。
所以shutdownNow()的作用是通过中断消息让用户任务尽快结束,但是否真的起作用,全凭用户任务是否理睬这个中断消息。如果用户任务无视这个中断消息,那就没什么特殊的效果。

所以我们更进一步引申:添加到线程池里的任务。在将要执行长时间操作的之前,最好手动刻意判断一下当前线程中断状态。如果收到中断消息,就可以根据实际业务情况。如果能停,就优雅的提前结束任务,也算是帮助shutdownNow()加快停机的速度。毕竟线程池执行shutdownNow()并不常见(相比较而言,shutdown()还是更优雅,更常用)。如果出现了,那么执行人必然是希望线程都能尽快的停下来的。

【关于中断,详细请看Java锁深入理解4——ReentrantLock VS synchronized中的中断部分】

线程池状态变化

在这里插入图片描述

  • 对用户来说一般只关心正常运行的RUNNING状态,和操作shutdown()之后变成SHUTDOWN状态,操作shutdownNow()变成STOP状态。
  • 操作shutdown()或shutdownNow()之后,用户一般就不用管了,都会自动走到最终的TERMINATED(shutdown和shutdownNow方法都会通过调用tryTerminate()来完成后续收尾工作)。
  • TIDYING(字面意思是整理收拾残局)是一个中间状态,感知最不明显。
  • terminated()就是基础篇中讲到的ThreadPoolExecutor为子类提供的三个钩子方法之一,也是唯一的一个线程池本身状态变化的钩子方法,表示:线程池关闭后,你要做点什么

两种关闭前的钩子方法

在ThreadPoolExecutor中还发现有一个finalize()方法。

/**
 * Invokes {@code shutdown} when this executor is no longer
 * referenced and it has no threads.
 */
protected void finalize() {
    SecurityManager sm = System.getSecurityManager();
    if (sm == null || acc == null) {
        shutdown();
    } else {
        PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
        AccessController.doPrivileged(pa, acc);
    }
}

表示:在线程池中没有线程了,并且线程池本身这个引用也没人引用了(可能只是一个局部变量)。即将被GC处理掉前调用这个方法。

在初学Java时,会经常提到这个方法,是Object类为我们提供的钩子方法。在这个方法里,调用了一下shutdown()方法。

这让我想起了另外一个关闭前的钩子方法Runtime.getRuntime().addShutdownHook(线程)
它的意思JVM关闭前会调用这线程(这里的只能传入一个线程,而不是方法)。在SpringBoot里,创建完Tomcat对象,就把关闭Tomcat的代码写在了这个线程里。

所以,为了优雅而安全的关闭资源,我们可以重写finalize()方法或者使用Runtime.getRuntime().addShutdownHook(线程)来完成一些收尾操作。

submit + Future + Callable实现异步返回值

这里讲一下线程池实现异步返回运行结果的实现机制
在这里插入图片描述

  • 表面上(左侧),把Callable任务传入ThreadPoolExecutor的submit方法,返回Future。在Future里可以异步get出任务的返回值。
  • 实际内部机制(右侧):以FutureTask为核心。
  • FutureTask干了如下事情:
    • 把Callable封装进来
    • 实现Runnable接口,所以有一个run方法。
    • 在run方法里调用Callable的call方法。
    • 把call方法的返回值封装在自己的另一个成员属性outcome里(等人来取)。
    • 最后把FutureTask作为一个任务(Runnable)传入线程池的execute方法。
    • 线程(Thread)执行FutureTask中的run方法,然后前面的步骤就走通了。。。
    • submit的返回值Future.get(),获取到的就是FutureTask中的outcome(call方法的返回值)

小结

  • 线程池的实现机制基本都在ThreadPoolExecutor类中
  • Worker是ThreadPoolExecutor中的内部类。
  • 任务队列(workQueue)和ctl是ThreadPoolExecutor的成员变量。
  • ctl记录有效线程数和线程池状态
  • Worker封装了线程(Thread)和任务(Runnable)
  • 线程和Worker是公用的,一般都是固定数量(和核心线程数一致)
  • 初始任务(Runnable)的提交,伴随着Worker的创建和线程的创建。之后线程和Worker很快固定下来,不断从任务队列中获取并处理新的任务
  • 有效线程数达到核心线程数,新提交进来的任务都是进入任务队列,等待被线程处理。
  • 核心线程获取不到任务就一直阻塞着。临时新增的线程获取不到任务,等待一段时间确认没活干了,就会被清理掉。
  • Worker通过继承AQS把自己包装成一个不可重入的独占锁。并在运行过程中,用这个锁来保证线程安全(不被中断信号干扰)。同时避免了中断信号的重复发送。
  • shutdown()和shutdownNow()都是通过给线程发中断信号,来中断阻塞。但shutdown()只会让空闲的线程中断。总之,就是为了让线程池“优雅”的关闭。
  • 为了优雅的关闭,我们除了设置一些主动操作,还可以设置一些兜底的被动关闭方式:重写finalize()或者使用Runtime.getRuntime().addShutdownHook(线程)
  • 本质上,线程池只有一个接收任务的方法:execute(Runnable)。任务也只有Runnable。只不过在FutureTask的作用下,实现了第二个接受任务的方式submit。而且这个方法还有返回值,只不过接受的任务也变成了Callable。

总结

  • 为了合理利用线程资源,JDK参考“池化思想”,引入线程池来统一管理和使用线程。
  • 主要涉及到三个相关概念:线程池,任务,返回值(Future)
  • 线程池是一个生产者-消费者模型。通过阻塞队列来让少量的线程处理很多任务(类似于医院里,几个医生为很多患者服务的场景)
  • 配置合理的线程池关键参数(核心线程数,最大线程数,队列长度),在实际生产中并不容易。可以通过线程池提供的getxxx来监控,然后通过setxxx来动态配置参数。
  • 在线程池的实现中,“解决多线程并发问题”和“优雅的关闭线程池”是两个重点考量问题。主要应用了锁和CAS(无锁)来实现。【关于锁,可以参考另一篇博客Java锁深入理解】

参考

JDK1.8源码
Java线程池
JAVA Future类详解
Java线程池实现原理及其在美团业务中的实践
jdk线程池工作原理解析
线程的高效利用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/431732.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Flutter插件开发-(进阶篇)

一、概述 Flutter也有自己的Dart Packages仓库。插件的开发和复用能够提高开发效率&#xff0c;降低工程的耦合度&#xff0c;像网络请求(http)、用户授权(permission_handler)等客户端开发常用的功能模块&#xff0c;我们只需要引入对应插件就可以为项目快速集成相关能力&…

2023-04-15 学习记录--C/C++-mac vscode配置并运行C/C++

mac vscode配置并运行C/C 一、vscode安装 ⭐️ 去官网下载安装mac版的vscode。 二、vscode配置 ⭐️ &#xff08;一&#xff09;、安装C/C扩展插件及必装好用插件 1、点击左边的 图标(扩展: 商店)&#xff0c;如下图&#xff1a; 2、先安装 C/C、C/CExtension Pack插件&…

大话数据结构-C(2)

二&#xff1a;算法 解决特定问题求解步骤的描述&#xff0c;在计算机中表现为指令的有限序列&#xff0c;并且每条指令表示一个或多个操作。 2.1 算法的特性 算法具有五个基本特性&#xff1a;输入、输出、有穷性、确定性、可行性。 1&#xff09;输入输出&#xff1a; 算法具…

Python --- 文件操作

目录 前言 一、open()函数 1.只读模式 r 2.只写模式 w 3.追加模式 a 二、操作其他文件 1.Python 操作二进制 2.Python 操作 json 文件 三、关闭文件 四、上下文管理器 五、文件指针位置 前言 在实际操作中&#xff0c;通常需要将数据写入到本地文件或者从本地文件中…

南方猛将加盟西方手机完全是臆测,他不会希望落得兔死狗烹的结局

早前南方某科技企业因为命名的问题闹得沸沸扬扬&#xff0c;于是一些业界人士就猜测该猛将会加盟西方手机&#xff0c;对于这种猜测可以嗤之以鼻&#xff0c;从西方手机以往的作风就可以看出来它向来缺乏容纳猛将的气量。一、没有猛将的西方手机迅速沉沦曾几何时&#xff0c;西…

【项目】bxg基于SaaS的餐掌柜项目实战(2023)

基于SaaS的餐掌柜项目实战 餐掌柜是一款基于SaaS思想打造的餐饮系统&#xff0c;采用分布式系统架构进行多服务研发&#xff0c;共包含4个子系统&#xff0c;分别为平台运营端、管家端&#xff08;门店&#xff09;、收银端、小程序端&#xff0c;为餐饮商家打造一站式餐饮服务…

如何用ChatGPT翻译?ChatGPT提升翻译速度,亲测有效

作为翻译新手&#xff0c;你是否为翻译不准确不地道而烦恼&#xff1f; 随着ChatGPT的大火&#xff0c;很多聪明的翻译已经开始使用ChatGPT辅助自己提升翻译能力和速度了。 想用ChatGPT翻译&#xff0c;首先要知道在哪里可以使用ChatGPT&#xff01;在国内选择不用注册不用登录…

python实现批量生成带内容的文件夹

我工作的时候经常遇到这个问题&#xff1a;需要批量生成带内容的文件夹来辅助工作。 我有8种不同名字的文件夹 每个文件夹下面都有以日期命名的文件夹 日期文件夹里面会记录我当天需要记录的东西。 我需要实现的功能是&#xff1a; 1.输入一个天数N&#xff0c;生成N天以前…

机器学习 day05(多元线性回归,向量化)

单个特征&#xff08;变量&#xff09;的线性回归模型 房子的价格仅由房子的大小决定&#xff0c;如图&#xff1a; 多个特征&#xff08;变量&#xff09;的线性回归模型 房子的价格由房子的大小&#xff0c;房子有多少个卧室&#xff0c;房子有几层&#xff0c;房子住了多…

代码随想录|day44|动态规划part06● 完全背包● 518. 零钱兑换 II ● 377. 组合总和 Ⅳ

完全背包 理论基础 视频&#xff1a;带你学透完全背包问题&#xff01; 和 01背包有什么差别&#xff1f;遍历顺序上有什么讲究&#xff1f;_哔哩哔哩_bilibili 链接&#xff1a;代码随想录 //先遍历背包还是先遍历物品是没有影响的。可以和01背包保持一致&#xff0c;都先遍历…

vue-自定义指令

需求1&#xff1a;定义一个v-big指令&#xff0c;和v-text功能类似&#xff0c;但会把绑定的数值放大10倍。 需求2&#xff1a;定义一个v-fbind指令&#xff0c;和v-bind功能类似&#xff0c;但可以让其所绑定的input元素默认获取焦点。 语法&#xff1a; 局部使用&#xff…

【硬件外设使用】——I2C

【硬件外设使用】——I2CI2C基本概念I2C通信协议I2C使用方法pyb.i2cmachine.i2cI2C可用的传感器I2C基本概念 I2C是"Inter-Integrated Circuit"的缩写&#xff0c;也被称为TWI (Two Wire Interface)。 它是一种串行通信协议&#xff0c;用于连接多个设备或组件。 I2…

记一次idea+Dockerfile+docker部署

软件版本&#xff1a;idea:2021.3,docker:19.03.9,服务器&#xff1a;centos7.8 1.centos7服务器配置 在服务器上编辑docker文件 vi /usr/lib/systemd/system/docker.service修改以ExecStart开头的行 ExecStart/usr/bin/dockerd -H tcp://0.0.0.0:2375 -H unix://var/ru…

计算机视觉基础__图像特征

计算机视觉基础__图像特征 本篇目录&#xff1a; 一、前言 二、位图和矢量图概念 三、图像的颜色特征 四、RGB 颜色空间 五、HSV 颜色空间 六、HLS 颜色空间 七、实例代码 八、参考资料 一、前言 传统图像处理&#xff0c;需要找出图片中的关键特征&#xff0c;然后对这…

30天学会《Streamlit》(5)

30学会《Streamlit》是一项编码挑战&#xff0c;旨在帮助您开始构建Streamlit应用程序。特别是&#xff0c;您将能够&#xff1a; 为构建Streamlit应用程序设置编码环境 构建您的第一个Streamlit应用程序 了解用于Streamlit应用程序的所有很棒的输入/输出小部件 第6天 - 将…

GO变量的使用

Go变量的使用注意事项 &#xff08;1&#xff09;第一种&#xff1a;指定了变量类型&#xff0c;但是声明后若不赋值&#xff0c;则使用默认值 &#xff08;2&#xff09;第二种&#xff1a;根据值自行判断我们的变量类型**&#xff08;类型推导&#xff09;** var num10.00 …

Python ---> 衍生的数据技术

我的个人博客主页&#xff1a;如果’真能转义1️⃣说1️⃣的博客主页 关于Python基本语法学习---->可以参考我的这篇博客&#xff1a;《我在VScode学Python》 随着人工智能技术的发展&#xff0c;挖掘和分析商业运用大数据已经成为一种推动应用&#xff0c; 推动社会发展起着…

接口优化方案

前言 最近随着国产化热潮&#xff0c;公司的用于营业的电脑全部从windows更换成了某国产化电脑&#xff0c;换成国产化之后&#xff0c;我们系统的前台web界面也由之前的jsp页面重构成vue.所以之前的一体式架构也变成了前后端分离的架构。但是在更换过程后&#xff0c;发现一些…

蓝绿部署技术方案

文章目录 ngx_lua介绍Nginxluangx_lua模块的原理&#xff1a;ngx_lua 模块执行顺序与阶段ngx_lua应用场景 JWTnginx镜像构造lua-redis蓝绿部署特性注意&#xff1a;蓝绿部署架构图nginx配置服务脚本部署使用职责分工 ngx_lua介绍 Nginx Nginx是Web服务器、HTTP反向代理和TCP代…

apache+tomcat实现动静分离和负载均衡

文章目录 ApacheTomcat整合环境通过JK实现动静分离编译mod_jk.so创建测试页面配置jk模块启动apache和tomcat测试。 ApacheTomcat负载均衡配置测试页配置mod_jk文件配置worker.properties测试 ApacheTomcat整合 Tomcat作为一个Servlet容器&#xff0c;可以用于运行Java Web应用…