Java并发 - 线程池

news2025/1/12 6:46:28

文章目录

  • 总体设计
  • 常见线程池
    • FixedThreadPool
    • CachedThreadPool
    • SingleThreadPool
    • ThreadPoolExecutor
  • 核心参数
  • 工作原理
    • 生产者消费者模型
    • 创建线程池
    • 提交任务
      • 任务提交方式
      • 任务提交流程
        • execute
        • addWorker
    • Worker队列
    • 线程运行 runWoker
    • 获取任务
    • 销毁工作线程
    • 线程池关闭
      • shutdown/shutdownNow
      • interruptIdleWorkers 和 interruptWorkers
  • 问题总结

线程的创建是消耗资源的, 而线程间的调度需要频繁依赖CPU的切换,不能频繁创建线程是站在线程池的角度来看的, 如果说创建线程所消耗的资源,在每次使用完后就销毁, 那么下一次使用的时候又得创建,这样造成资源频繁浪费

总体设计

下面是线程池相关类的继承体系

在这里插入图片描述

主要介绍以下几个类:

  1. Executor接口只提供了一个任务执行的抽象。
  2. ExecutorService才是线程池的一个抽象,提供了一些线程池相关的操作
  3. AbstractExecutorService 抽象类,提供一些可供子类复用的方法
  4. ScheduledExecutorService 定义任务相关的实现

常见线程池

FixedThreadPool

该线程池的最大线程数等于核心线程数,所以在默认情况下,该线程池的线程不会因为闲置状态超时而被销毁。如果当前线程数小于核心线程数,并且也有闲置线程的时候提交了任务,这时也不会去复用之前的闲置线程,会创建新的线程去执行任务。如果当前执行任务数大于了核心线程数,大于的部分就会进入队列等待。等着有闲置的线程来执行这个任务。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

CachedThreadPool

这种线程池内部没有核心线程,线程的数量是有没限制的。在创建任务时,若有空闲的线程时则复用空闲的线程,若没有则新建线程。没有工作的线程闲置状态超过了60s就会销毁。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

SingleThreadPool

有且仅有一个工作线程执行任务,所有任务按照加入的顺序执行,即遵循队列的入队出队规则

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

ThreadPoolExecutor

最常用的线程池,支持参数配置,提供了四个构造函数,可通过构造函数来进行线程池的配置,下面来解释下各个参数:

核心参数

int corePoolSize 该线程池中核心线程数最大值

核心线程:线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,则新建的是非核心线程核心线程默认情况下会一直存活在线程池中,即使这个核心线程啥也不干(闲置状态)。如果指定ThreadPoolExecutor的allowCoreThreadTimeOut这个属性为true,那么核心线程如果不干活(处于闲置状态)的话,并且不干活超过一定时间(时长由下面参数决定),就会被销毁掉。

int maximumPoolSize

该线程池中线程总数最大值,线程总数 = 核心线程数 + 非核心线程数。

核心线程数 VS 最大线程数
核心线程数定义了线程池中最小线程数量,即使这些线程处于空闲状态,也不会被销毁。
最大线程数定义了线程池中允许的最大线程数量,最大线程数等于核心线程数 + 临时线程数,最大线程数主要是提供了一种机制来应对突发的高并发请求,当有大量任务的时候,可以创建线程数量的上限。

long keepAliveTime

该线程池中非核心线程闲置超时时长。一个非核心线程,如果不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉,如果设置allowCoreThreadTimeOut = true,则会作用于核心线程

TimeUnit unit

TimeUnit是一个枚举类型,表示参数keepAliveTime的单位,包括:
NANOSECONDS : 1微毫秒 = 1微秒 / 1000
MICROSECONDS : 1微秒 = 1毫秒 / 1000
MILLISECONDS : 1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小时
DAYS : 天

BlockingQueue workQueue

该线程池中的任务队列,维护着等待执行的Runnable对象。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。

ThreadFactory threadFactory

这是一个线程创建工厂接口,new它的时候需要实现Thread newThread(Runnable r)方法

RejectedExecutionHandler handler 拒绝策略

当提交的任务无法被执行时,线程池会采取拒绝策略。拒绝策略用于处理任务的拒绝情况,以确保系统的稳定性和可靠性。以下是一些常见的拒绝策略:

  1. AbortPolicy(默认策略)
    该策略会直接抛出 RejectedExecutionException,终止当前的任务提交。
    适用场景: 当希望立即知道任务被拒绝时可以使用此策略。
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    corePoolSize,
    maximumPoolSize,
    keepAliveTime,
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(queueCapacity),
    new ThreadPoolExecutor.AbortPolicy() // 默认策略
);
  1. CallerRunsPolicy
    描述: 该策略会使调用 execute 方法的线程自己执行被拒绝的任务。这意味着任务将会在调用者的线程中执行,而不是在池中的线程中执行。
    适用场景: 适用于希望减轻任务提交压力的场景。
new ThreadPoolExecutor.CallerRunsPolicy()
  1. DiscardPolicy
    描述: 该策略会默默地丢弃被拒绝的任务。这意味着该任务将被忽略,且不会引发任何异常。
    适用场景: 当你不关心被拒绝的任务时,可以使用此策略。
new ThreadPoolExecutor.DiscardPolicy()
  1. DiscardOldestPolicy
    描述: 该策略会丢弃任务队列中最旧的任务,然后尝试提交当前任务。如果队列中没有任务,提交将成功。
    适用场景: 适用于希望优先保留最新任务的场景。
new ThreadPoolExecutor.DiscardOldestPolicy()
  1. 自定义拒绝策略
    描述: 你可以实现 RejectedExecutionHandler 接口来自定义拒绝策略,以满足特定需求。
public class CustomRejectHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 自定义逻辑,例如记录日志、重试任务等
        System.out.println("Task rejected: " + r.toString());
    }
}

// 使用自定义拒绝策略
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
                       new ArrayBlockingQueue<>(queueCapacity),
                       new CustomRejectHandler());

工作原理

生产者消费者模型

线程池的工作过程是经典的生产者消费者模型,生产者提交任务,消费者线程获取任务进行消费
在这里插入图片描述

创建线程池

使用下面的代码创建线程池

ThreadPoolExecutor executor = new ThreadPoolExecutor(
		5, 5, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

刚创建出来的线程池中只有一个构造时传入的阻塞队列,里面并没有线程

在这里插入图片描述

如果想要在执行任务之前创建好核心线程数,可以调用ThreadPoolExecutor#prestartCoreThread或者ThreadPoolExecutor#prestartAllCoreThreads方法来预先创建线程,默认是没有线程的。

// 创建单个核心线程
public boolean prestartCoreThread() {
    return workerCountOf(ctl.get()) < corePoolSize &&
        addWorker(null, true);
}
// 启动所有核心线程
public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))
        ++n;
    return n;
}

提交任务

任务提交方式

ThreadPoolExecutor提供了两个主要的方法来提交任务到线程池执行:

  1. void execute(Runnable command) :Executor接口的方法,没有返回值,也不抛出任何已检查的异常
  2. <T> Future<T> submit(Callable<T> task):返回一个结果,并可以抛出已检查的异常
  3. Future<?> submit(Runnable task)
  4. <T> Future<T> submit(Runnable task, T result)

这两个方法的主要区别体现在它们如何处理任务和返回结果上。所有的submit方法内部都是调用execute方法进行任务提交

public Future<?> submit(Runnable task) { // Callable的重载方法也一样
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
     if (task == null) throw new NullPointerException();
     RunnableFuture<T> ftask = newTaskFor(task);
     execute(ftask);
     return ftask;
 }

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    // FutureTask
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

submit和execute的区别

异常处理:

  1. 对于 execute() 方法,如果任务在执行过程中抛出了异常,并且你没有为任务设置特定的异常处理器(通过 Thread.UncaughtExceptionHandler),那么这个异常将传递给线程池的默认未捕获异常处理器(如果有的话),否则它将被忽略。
  2. 对于 submit() 方法,如果 Callable 任务在执行过程中抛出了异常,这个异常将被封装到一个 ExecutionException 中,并在调用Future.get()时才抛出。

任务取消

  1. 对于通过execute()提交的任务,你可以使用Future.cancel(boolean mayInterruptIfRunning)方法来尝试取消任务,但这个方法并不保证能够成功取消任务。此外由于execute()不返回 Future 对象,无法直接获取该 Future 对象来调用 cancel() 方法。
  2. 对于通过 submit() 提交的任务,你可以获取返回的 Future 对象并调用其 cancel() 方法来尝试取消任务。如果任务已经开始执行,并且 mayInterruptIfRunning 参数为 true,那么线程将尝试中断任务。

任务提交流程

线程通过execute方法提交了一个任务的流程如下:

  1. 首先会去判断当前线程池的线程数是否小于核心线程数,如果小于,那么就直接通过 ThreadFactory 创建一个线程来执行这个任务,当任务执行完之后,创建的这个线程由于是核心线程不会退出,而是会去阻塞队列中获取任务。接下来如果又提交了一个任务,也会按照上述的步骤去判断是否小于核心线程数,如果小于,还是会创建线程来执行任务,执行完之后也会从阻塞队列中获取任务。这里有个细节,就是提交任务的时候,就算有线程池里的线程从阻塞队列中获取不到任务,如果线程池里的线程数还是小于核心线程数,那么依然会继续创建线程,而不是复用已有的线程。
  2. 如果线程池里的线程数不再小于核心线程数,那么此时就会尝试将任务放入阻塞队列中,入队成功之后,核心线程中阻塞的线程就可以获取到任务了。
  3. 随着任务越来越多,队列已经满了,此时会判断当前线程池里的线程数是否小于最大线程数,也就是入参时的 maximumPoolSize 参数。如果小于最大线程数,那么也会创建非核心线程来执行提交的任务,所以就算队列中有任务,新创建的线程还是会优先处理这个提交的任务,而不是从队列中获取已有的任务执行,所以先提交的任务不一定先执行。
  4. 假如线程数已经达到最大线程数量,此时就会执行拒绝策略,也就是构造线程池的时候传入的RejectedExecutionHandler对象来处理这个任务。

在这里插入图片描述

execute
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // 工作线程数量是否小于核心线程
    if (workerCountOf(c) < corePoolSize) {
    	// 添加核心线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 判断线程池是否在运行,并且工作队列是否已满
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
        	// 执行拒绝策略
            reject(command);
        else if (workerCountOf(recheck) == 0)
        	// 如果队列为空,添加非核心线程执行
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
    	// 执行拒绝策略
        reject(command);
}
addWorker

firstTask 表示要添加的任务执行逻辑,第二个参数表示是否是核心线程

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:  // 这个是Java的 goto 语法
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry; // 退出循环
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask); // 创建Worker
        final Thread t = w.thread;
        if (t != null) {
        	// 加锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get()); // 重新检查,保证可见性

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
            	// 启动线程,执行run方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker队列

ThreadPoolExecutor中存有所有Worker的集合,这里使用的是非线程安全的HashSet,需要配合ThreadPoolExecutor中的mainLock锁使用

private final HashSet<Worker> workers = new HashSet<Worker>();

Worker类是对线程和任务的一个封装,实现了 Runnable, 它基于AQS队列来实现同步

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
}

创建Worker时就会调用ThreadFactory创建线程

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

这里可以看到 Worker 才是作为 Thread 的 Runnable 来使用的,也就是说 Thread#start 执行的是 Worker#run 方法

线程运行 runWoker

创建Worker时的Runable会作为第一个任务被线程执行

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
	// 运行Worker的这个线程
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
    	// 创建worker时如果任务为null,则调用getTask()获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
    	// 执行到这里表示任务队列中没了任务,或者线程池关闭了
    	// 此时需要将worker从缓存冲清除
        processWorkerExit(w, completedAbruptly);
    }
}

为什么 runWoker 要加锁?

Worker和线程之间都是一对一的关系,除了下面这里,runWorker访问的都是Woker自身的数据,而且下图这里也都是读操作,为什么要加锁呢?

在这里插入图片描述

原因就得从这个条件入手了,它是判断当前线程池是否停止或者当前线程是否被中断

(runStateAtLeast(ctl.get(), STOP) ||
       (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) 
		&& !wt.isInterrupted()

如果条件满足,就会中断当前Worker这个线程,并且这里是Worker这个线程自己中断自己,中断后下面还是会继续执行的

在这里插入图片描述

这里主要是针对线程池关闭的场景进行并发控制,后面再讨论

中断后线程还是会继续执行

可以根据下面的代码试一下:

public class Test2 {
    static final AtomicReference<Thread> ar = new AtomicReference<>();
    public static void main(String[] args) {
        Thread t = new Thread(new Runnable() {
            
            public void run() {
                System.out.println(ar.get() == Thread.currentThread());
                Thread.currentThread().interrupt();
                System.out.println("after Thread.currentThread().interrupt()");
            }
        });
        ar.set(t);
        t.start();
		while (true) {}
    }
}

获取任务

线程需要从任务队列中不断地取任务执行,实现生产者和消费者模型。这部分由getTask方法实现,其执行流程如下图所示:

在这里插入图片描述

代码实现如下:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // wc > corePoolSize 表示是非核心线程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

销毁工作线程

工作线程Worker会不断接收新任务去执行,当接收不到任务的时候,就会开始被回收。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

此方法做了以下几件事

  1. 将Worker从保存的Worker集合中移除
  2. 切换线程池的状态至TERMINATED,一是SHUTDOWN并且线程池和任务队列为空,二是STOP并且线程池为空

线程池关闭

线程池关闭有以下2种方式:

  1. shutdown:停止接收新任务,但会继续执行已经提交的任务(即正在运行和排队的任务)。当所有任务完成后,线程池会完全关闭。
  2. shutdownNow:该方法会立即停止接受新任务,并尝试中断正在执行的任务。会返回一个列表,包含所有未开始执行的任务。对于正在运行的任务,线程池会尝试中断这些任务,但是否成功取决于任务的实现(任务必须检查中断状态并处理)。

shutdown/shutdownNow

两者具体实现如下,主要区别在于interruptIdleWorkers和interruptWorkers方法

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    	// 校验是否有关闭线程池的权限,这里主要通过 SecurityManager 校验
    	// 当前线程与每个 Worker 线程的 “modifyThread” 权限
        checkShutdownAccess();
        // 修改状态为 SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 关闭所有空闲线程
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

interruptIdleWorkers 和 interruptWorkers

interruptIdleWorkers 意为中断所有空闲的线程,interruptWorkers 就是直接中断所有Worker,不管空闲与否了

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

// 中断可能正在等待任务的线程
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

// shutdownNow 调用: 直接调用Worker的interruptIfStarted方法
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

为什么 shutdown 调用 interruptIdleWorkers 过程加 3 次锁?

从 interruptIdleWorkers 方法可以看到有两次嵌套的锁操作

从外面shutdown可以看到还有一层嵌套的锁

在这里插入图片描述

首先是最外面加锁,是操作 mainLock,这点其实在源码中有说明

在这里插入图片描述

We also hold mainLock on shutdown and shutdownNow, for the sake of ensuring workers set is stable while separately checking permission to interrupt and actually interrupting.

就是保证在执行shutdown时没有其它线程来操作workers这个集合,也就是关闭时不允许继续添加任务了

那么为什么interruptIdleWorkers方法里面还要操作 mainLock 加锁呢?因为 interruptIdleWorkers 这个方法不只在 shutdown 方法里调用,由于 ReentrantLock 可重入,因此在shutdown这个调用里影响不大

在这里插入图片描述

而 tryTerminate 方法被调用的地方就多了,不仅是线程池关闭,添加任务,删除任务时都会调用

在这里插入图片描述

最后就是为什么要在Worker这个AQS队列上继续加锁

先看一下Worker的AQS相关的方法

protected boolean isHeldExclusively() {
    return getState() != 0;
}

// 参数为unused,从命名也可以知道该参数未被使用
protected boolean tryAcquire(int unused) {
	// 通过CAS改变将状态由0改变为1
    if (compareAndSetState(0, 1)) {
    	// 设置当前线程独占
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}

public void lock()        { acquire(1); }
public boolean tryLock()  { return tryAcquire(1); }
public void unlock()      { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

tryLock 在源码中只有在 interruptIdleWorkers 方法里才有调用,而 lock 方法只有在 runWorker 方法里才调用

在这里插入图片描述

tryLock方法的作用在于判断Worker的state是不是0,如果是0,说明这个Worker还没有调用lock方法,那么此时它是一个IdleWorker,也就是可以中断。如果tryLock失败,说明Worker已经执行了lock()方法。此时,Worker在While循环中不断获取阻塞队列的任务执行,并且不能在shutdown()方法中中断。

因此,Worker的状态管理实际上通过状态值(0或1)判断Worker是否处于空闲状态。如果Worker处于空闲状态,则可以在线程池关闭时中断它,否则它必须保持在while循环中才能进入阻塞队列,此时任务将被执行,并且在队列中的任务为空之前不会被释放。

如果Worker正在执行任务,因为是个 while 循环,等到任务执行完成后,会再次调用 getTask 方法,getTask 方法里面会先判断线程池状态,这个时候就能感知到线程池关闭了,返回 null,这个 worker 也就默默的退出了。

在这里插入图片描述

这里我们也可以看到shutdown和shutdownNow的区别,如下图所示:

在这里插入图片描述

shutdownNow 是怎么直接中断Worker,并返回未执行的任务的

void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

Worker在创建时候状态为 -1,此时调用interruptIfStarted是中断不了的,只能中断 0 或 1 的状态

在这里插入图片描述

那么什么时候不是-1呢?如果不是 -1,也就是lock了一次,或者调用了 unlock。但是前面说过 lock 方法只在 runWorker 里调用,并且调用了lock说明该Worker正在执行,那不是说明 shutdownNow 中断的都是正在执行的任务了?正在执行任务的线程是不应该被中断的

其实runWorker方法提前调用了一次unlock方法,目的就是将状态从-1改为0,可以理解为线程的就绪态,此时是可以被中断的

在这里插入图片描述

问题总结

为什么 Worker 要实现AQS?

这点在Woker类的注释上有说明:
Class Worker mainly maintains interrupt control state for threads running tasks, along with other minor bookkeeping. This class opportunistically extends AbstractQueuedSynchronizer to simplify acquiring and releasing a lock surrounding each task execution. This protects against interrupts that are intended to wake up a worker thread waiting for a task from instead interrupting a task being run. We implement a simple non-reentrant mutual exclusion lock rather than use ReentrantLock because we do not want worker tasks to be able to reacquire the lock when they invoke pool control methods like setCorePoolSize. Additionally, to suppress interrupts until the thread actually starts running tasks, we initialize lock state to a negative value, and clear it upon start (in runWorker).

因为是要一个不能重入的互斥锁,setCorePoolSize 时通过 interruptIdleWorkers 实现减少当前的Worker功能,如果锁是可以重入的,那么可能会中断当前正在执行的Worker

参考资料:

  1. https://mp.weixin.qq.com/s/baYuX8aCwQ9PP6k7TDl2Ww
  2. https://www.cnblogs.com/thisiswhy/p/15493027.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2215464.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

维修数据屏:重塑热力公司运维管理新格局

在热力公司的运维管理中&#xff0c;高效的报修和维修流程是确保系统稳定运行的关键。随着科技的发展&#xff0c;维修数据屏的出现为热力公司的运维工作带来了重大变革。 一、传统热力运维面临的挑战 过去&#xff0c;热力公司在报修和维修方面存在诸多问题&#xff0c;给运维…

基于Java的超市管理系统(源码+定制+解答)

博主介绍&#xff1a; ✌我是阿龙&#xff0c;一名专注于Java技术领域的程序员&#xff0c;全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师&#xff0c;我在计算机毕业设计开发方面积累了丰富的经验。同时&#xff0c;我也是掘金、华为云、阿里云、InfoQ等平台…

创建包含可导入浏览器信任的SSL自签名证书

问题&#xff1a;现在的三大浏览器&#xff0c;chrome、edge、firefox 一般都默认启用https检查&#xff0c;这就要求我们自建的局域网内的网址和其他诸如nextcloud、photoprism、tiddlywiki等应用也必须要有证书。解决方法是使用openssl自己生成一个。由此则会再衍生出一个问题…

哪款宠物空净运行吸毛好、噪音小?希喂、霍尼韦尔、安德迈测评!

作为宠物领域目前最火热的产品&#xff0c;宠物空气净化器的讨论度一直很高。身为铲屎官的我在产品刚出的时候就购入了一台&#xff0c;结果让我非常失望&#xff01; 抛开产品效果不提&#xff0c;它运行起来的声音实在太大了&#xff01;我家猫根本不愿意靠近&#xff0c;每…

定焦镜头可以改变焦距吗?

1、问题背景 焦距是镜头的一个固有光学特性&#xff0c;和镜头设计相关&#xff0c;所谓定焦镜头&#xff0c;焦距肯定是固定不变的。 但有个问题一直有点疑惑&#xff0c;焦距是镜头中心到焦点的距离&#xff0c;当我们拧动镜头调焦的过程&#xff0c;就是为了使得焦点成像在传…

信息安全保障人员认证(CISAW)全攻略

由中国网络安全审查认证和市场监管大数据中心耗时六年&#xff0c;汇聚业界专家、企业翘楚、高校及研究机构学者共同精心打磨而成的针对信息安全保障的不同专业技术方向、应用领域和保障岗位&#xff0c;依循国际标准 ISO/IEC 17024《人员认证机构通用要求》所构建的、多层次的…

LabVIEW提高开发效率技巧----减少UI更新频率

在LabVIEW开发中&#xff0c;图形化用户界面&#xff08;UI&#xff09;的更新频率对程序的响应速度有着显著影响。频繁的UI更新会占用大量资源&#xff0c;导致系统性能下降。本文将详细介绍如何通过减少UI更新频率来提升LabVIEW程序的运行效率&#xff0c;从多个角度进行分析…

TCP/UDP通信协议

TCP通讯时序 下图是一次TCP通讯的时序图。TCP连接建立断开。包含大家熟知的三次握手和四次挥手。 在这个例子中&#xff0c;首先客户端主动发起连接&#xff08;connet&#xff09;、发送请求&#xff0c;然后服务器端响应请求&#xff0c;然后客户端主动关闭连接。两条竖线表…

多功能校准仪怎么进行计量校准?

多功能校准仪计量校准是计量行业常会进行的一种校准&#xff0c;因为其多功能校准仪的普遍适用性&#xff0c;以及其计量校准技术也是在行业内比较通用&#xff0c;那么具体多功能校准仪计量校准怎么进行呢&#xff1f; 校准方法 一、在多功能校准仪的输出范围内&#xff0c;布…

观察者模式的思考

观察者模式由来 观察者模式&#xff08;Observer Pattern&#xff09;是一种行为型设计模式&#xff0c;它的起源可以追溯到20世纪90年代初&#xff0c;由设计模式四人帮&#xff08;Erich Gamma, Richard Helm, Ralph Johnson 和 John Vlissides&#xff09;在其著作《设计模…

反走样算法(MSAA、TAA、FXAA、DLSS)

光栅化的采样过程会导致图形走样,走样有很多种形式: 锯齿 摩尔纹 走样的本质原因是采样速度跟不上信号变化的速度 采样频率低,使得我们将连续变化的信号离散化. 反走样方法 anti-alisaing MSAA 多重采样反走样 超采样 优点&#xff1a; 对几何反走样效果良好 缺点…

razor TagHelper 汇总、HtmlHelper 汇总

Tag Helper Tag Helpers 的范围由 addTagHelper 和 removeTagHelper 进行控制&#xff0c;并且 “!” 为退出字符。 addTagHelper *, Microsoft.AspNetCore.Mvc.TagHelpers // 手动高亮 asp-for 》》 Label <label asp-for"userName"></label>》》生…

你的炼丹炉选对GPU卡了吗?

现在抢GPU卡搞智算、搞AI模型训练的都太火了。 无论你是一个游戏爱好者还是一个赛博炼丹师&#xff08;大模型训练&#xff09;&#xff0c;英伟达GPU卡选型都将是绕不过的一道命题。 那么重点来了&#xff0c;如何在琳琅满目的各种型号GPU卡中选取一款合适且性价比高的呢&…

zookeeper实现RMI服务,高可用,HA

这可不是目录 1.RMI原理与说明1.1含义1.2流程1.3rmi的简单实现1.4RMI的局限性 2.zookeeper实现RMI服务&#xff08;高可用、HA&#xff09;2.1实现原理2.2高可用分析2.3zookeeper实现2.3.1代码分析2.3.2公共部分2.3.3服务端2.3.4客户端2.3.5运行与部署2.3.6效果展示与说明 1.RM…

Spring Boot: 构建高效中小型医院网站

1 绪论 1.1研究背景 随着计算机技术的成熟、普及&#xff0c;现代信息技术革命的迅猛发展,正冲击并进而改变着经济和社会结构。信息化的程度已经成为一个国家&#xff0c;一个企业&#xff0c;一个组织仍至一个人发展的基础和竞争成败的关键。 在实际的生活中&#xff0c;用户都…

软件评测CNAS资质获取流程

软件评测实验室如有意向申请 CNAS 检验机构认可&#xff0c;首先需要依据 CNAS 的认可准则建立管理体系&#xff0c;正式运行6个月以上&#xff0c;自我评估满足 CNAS 认可条件后可向 CNAS 提交申请。软件评测实验室CNAS认可的整体流程如图所示&#xff0c;后面的内容针对每个环…

数据结构之单链表详解:从原理到C语言实现

一、 什么是单链表&#xff1f; 单链表&#xff08;Singly Linked List&#xff09;是一种线性数据结构&#xff0c;它的特点是每个节点通过指针链接到下一个节点。不同于顺序表&#xff08;数组&#xff09;&#xff0c;链表的每个元素&#xff08;节点&#xff09;并不存储在…

【简单版】通过 Window.performance 实现前端页面(性能)监控

1 背景 前端监控系统告警xx接口fetchError 问题&#xff1a;前端监控系统没有更多的错误信息&#xff0c;查询该fetch请求对应的接口日志返回200状态码、无请求异常记录&#xff0c;且后台能查到通过该fetch请求成功发送的数据。那是前端页面的错误还是前端监控系统的问题&…

yjs机器学习常见算法01——KNN(1)(K—近邻算法)

1.K—近邻算法 的含义&#xff1a; 简单来说就是通过你的邻居的“类别”&#xff0c;来推测你的“类别” 定义&#xff1a;如果一个样本在特征空间中的k个最相似&#xff08;即特征空间中最临近&#xff09;的样本中大多数属于某一类别&#xff0c;则该样本也属于这个类别。 2.…

【Python爬虫系列】_028.Python玩Redis

课 程 推 荐我 的 个 人 主 页:👉👉 失心疯的个人主页 👈👈入 门 教 程 推 荐 :👉👉 Python零基础入门教程合集 👈👈虚 拟 环 境 搭 建 :👉👉 Python项目虚拟环境(超详细讲解) 👈👈PyQt5 系 列 教 程:👉👉 Python GUI(PyQt5)教程合集 👈👈