Java线程池ThreadPoolExecutor源码阅读

news2024/11/17 10:35:19

文章目录

  • 概述
    • 线程池提交任务流程
    • 线程池提交任务源码阅读
  • 源码阅读
    • 属性字段
    • 工作线程worker
  • 线程池方法
    • runWorker(Worker w) 运行工作线程
    • getTask() 获取任务
    • tryTerminate() 尝试终止线程池
    • interruptWorkers、interruptIdleWorkers 中断工作线程
    • reject(Runnable command) 拒绝任务
    • processWorkerExit 工作线程完成任务后退出
    • 构造函数
    • shutdown shutdownNow 关闭线程池
    • allowCoreThreadTimeOut 允许核心线程超时
    • setMaximumPoolSize 设置最大线程池数量
    • setCorePoolSize 设置核心线程数量
    • setKeepAliveTime 设置空闲线程超时时间
    • remove 移除任务
    • getPoolSize 获取当前工作线程数量
    • RejectedExecutionHandler 拒绝策略

概述

线程池提交任务流程

回顾一下线程池提交任务流程:

    1. 先判断核心线程池是否已经已满,即工作线程数是否大于核心线程数,如果不是则创建核心工作线程执行该任务,否则下一步
    1. 判断工作队列是否已经满了,如果队列没有满,则将该任务入队,否则进入下一步
    1. 判断线程池中的线程是否已经满了,即是否大于最大线程数,如果不是则创建工作线程执行该任务,否则拒绝这个任务

image.png

线程池提交任务源码阅读

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 工作线程数量
    int c = ctl.get();
    // 1. 线程池先判断核心线程池是否已经已满,即工作线程数是否大于核心线程数,如果不是则创建核心工作线程执行该任务
    if (workerCountOf(c) < corePoolSize) {
        // 新增核心的工作线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. 判断工作队列是否已经满了,如果队列没有满,则将该任务入队
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 线程池不再运行 或 移除当前线程
        if (! isRunning(recheck) && remove(command))
            // 拒绝当前线程
            reject(command);
        else if (workerCountOf(recheck) == 0)
              // 没有正在工作的线程了 新增一个工作线程去执行任务
            addWorker(null, false);
    }
	// 3. 判断线程池中的线程是否已经满了,即是否大于最大线程数,如果不是则创建工作线程执行该任务,否则拒绝这个任务。
    else if (!addWorker(command, false))
        // 拒绝当前任务
        reject(command);
}

新增工作线程, core参数true新增核心工作线程, false非核心线程工作线程:

    // 新增工作线程
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 状态
            int rs = runStateOf(c);

            // 停止 或 没有需要处理的任务 + 队列为空
            // 无需新增
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 当前工作线程数量
                int wc = workerCountOf(c);
                // 工作线程大于最大容量失败
                // 根据请求参数判断 
				// 新增核心线程大于最大核心线程数失败
				// 新增非核心线程数,大于最大线程数量也失败
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // workerCount + 1, 成功进入下一步 
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 状态变更 重新开始
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 创建工作线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
				// 加锁
                mainLock.lock();
                try {
                    // 运行状态
                    int rs = runStateOf(ctl.get());

                    // 运行中状态 或 终止且无要执行的任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 添加工作线程
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 启动
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }


    // 新增工作线程失败,回滚,移除工作线程,数量-1, 尝试终止线程池
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

源码阅读

JUC包下的工具类,继承了AbstractExecutorService

package java.util.concurrent;

// 继承 AbstractExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService {

属性字段

    // 前3位线程池状态、后29位线程数量
    // 初始化运行中
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));  

    // ctl 后29位记录线程池容量, 32 - 3
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 最大容量 536870911
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

   // RUNNING(-536870912):接受新任务或者处理队列里的任务。 
    private static final int RUNNING    = -1 << COUNT_BITS;
    // SHUTDOWN(0):不接受新任务,但仍在处理已经在队列里面的任务。 
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // STOP(536870912):不接受新任务,也不处理队列中的任务,对正在执行的任务进行中断。 
    private static final int STOP       =  1 << COUNT_BITS;
    // TIDYING(1073741824): 所以任务都被中断,workerCount 是 0,整理状态 
    private static final int TIDYING    =  2 << COUNT_BITS;
    // TERMINATED(1610612736): terminated() 已经完成的时候 
    private static final int TERMINATED =  3 << COUNT_BITS;

    //runState 之间的转变过程: 
    //RUNNING -> SHUTDOWN:调用 shudown(), finalize() 
    //(RUNNING or SHUTDOWN) -> STOP:调用 shutdownNow() 
    //SHUTDOWN -> tryTerminate()  ->  TIDYING -> 此时Workers数量为0
    //STOP -> tryTerminate() -> TIDYING  -> 此时Workers数量为0
    //TIDYING -> TERMINATED -> terminated() 执行完成之后

    // 前3位线程池状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 后29位线程数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }

    // 阻塞队列
    private final BlockingQueue<Runnable> workQueue;

    // 主锁
    private final ReentrantLock mainLock = new ReentrantLock();

    // 工作线程列表
    private final HashSet<Worker> workers = new HashSet<Worker>();


     // 终止条件
    private final Condition termination = mainLock.newCondition();


    // 最大线程池数量
    private int largestPoolSize;

   // 完成任务的数量
    private long completedTaskCount;

    // 线程工厂
    private volatile ThreadFactory threadFactory;

   // 拒绝策略
    private volatile RejectedExecutionHandler handler;

    // 线程空闲保活时间
    private volatile long keepAliveTime;

    // 是否允许核心线程超时
    private volatile boolean allowCoreThreadTimeOut;

   // 核心线程数量
    private volatile int corePoolSize;

   // 最大线程数量
    private volatile int maximumPoolSize;

   // 默认拒绝策略,放弃,抛出异常
   private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
   
   private static final RuntimePermission shutdownPerm =   new RuntimePermission("modifyThread");

   private final AccessControlContext acc;

有关状态和工作线程数量的方法:

  // rs:runState 运行状态  wc:workerCount 线程数量
    private static int ctlOf(int rs, int wc) { return rs | wc; }
   
    // c:当前状态 , 是否小于 s 状态
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    // c:当前状态 , 是否大于等于 s 状态
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }


    // RUNNING:-1   SHUTDOWN:0
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    // 线程数量+1,成功返回 true、失败false
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    // 线程数量-1,成功返回 true、失败false
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
    // 线程数量-1
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

    /**
     * CAS 设置 执行状态
     * ctl = targetState + workerCountOf(c)
     */
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

    // 是运行状态 或者 停止,停止 且 shutdownOK 才返回 true
    final boolean isRunningOrShutdown(boolean shutdownOK) {
        int rs = runStateOf(ctl.get());
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }

工作线程worker

工作线程worker继承了 AQS, 实现了Runnable接口

   // 工作线程
    // 继承了 AQS
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * 序列化id
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Worker中执行的线程,ThreadFactory创建*/
        final Thread thread;

        /** 线程执行的任务,可能为空 */
        Runnable firstTask;

        /**  完成的任务数量 */
        volatile long completedTasks;

        /**
         * 使用来自ThreadFactory的给定第一个任务和线程创建
         */
        Worker(Runnable firstTask) {
            // 设置执行的任务
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            // 执行任务
            runWorker(this);
        }

        // 当前的线程是否已经获取到了同步状态
        // state 0未获取,1获取
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        // 获取信号量, 尝试state从0设置为1
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 释放信号量, state设置为0
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }


        // 中断线程
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

线程池方法

runWorker(Worker w) 运行工作线程

  /**
     * 运行Worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        // 异常完成
        boolean completedAbruptly = true;
        try {
             // 如果task为null,且尝试获取任务也为null,结束循环
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // (线程池开始停止 或者 (线程中断 且 线程池开始停止)) 且 wt 没有中断 
                // Thread.interrupted()是一个静态方法, 用于判断当前线程是否被中断, 并清除中断标志位
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 留给子类
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 留给子类
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }


getTask() 获取任务

    // 获取任务
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果当前状态不是运行中 或者 STOP、TIDYING、TERMINATED 阻塞队列为空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 工作线程-1
                decrementWorkerCount();
                return null;
            }


            // 工作线程
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            // 允许核心线程超时 或者 工作线程数量大于核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;


            // 1. 工作线程大于最大线程数 或 (timed 且 超时)
            // 2. 工作线程数量>1 或者 工作队列为空
            // 1 且 2
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 从阻塞队列取出任务,timed 则有时间限制的取出
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

tryTerminate() 尝试终止线程池

/**
  * 尝试终止线程池
  */
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 如果还在运行中 或 已经TERMINATED 或 SHUTDOWN 还有工作线程
        // 无法 treminate
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 设置为 TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    // terminated 方法之后 状态从 TIDYING 修改为 TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

interruptWorkers、interruptIdleWorkers 中断工作线程

interruptIdleWorkers 中断空闲的工作线程
interruptIdleWorkers 需要去获取锁, 如果任务正在执行中是获取不到锁的, 不会中断正在执行的任务

    /**
     * 中断所有的工作线程
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                // 中断已经运行的任务
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

    // 中断Workers
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    // 中断所有Workers
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    private static final boolean ONLY_ONE = true;

reject(Runnable command) 拒绝任务

    // 拒绝任务
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

processWorkerExit 工作线程完成任务后退出

    // 工作线程完成任务后处理
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 意外完成, 工作线程数量-1
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // 移除工作线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            // 如果不是意外退出
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

构造函数

   /**
     * 构造
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    /**
     * 构造
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
  
   /**
     * 构造
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    /**
     * 构造
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

shutdown shutdownNow 关闭线程池

    /**
     * 关闭
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

    /**
     * 现在关闭
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

    public boolean isShutdown() {
        return ! isRunning(ctl.get());
    }

    /*
     *是否正在终止
     */
    public boolean isTerminating() {
        int c = ctl.get();
        return ! isRunning(c) && runStateLessThan(c, TERMINATED);
    }


   // 线程池是否已经终止
    public boolean isTerminated() {
        return runStateAtLeast(ctl.get(), TERMINATED);
    }


    // 超时等待线程池终止
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * 结束
     */
    protected void finalize() {
        SecurityManager sm = System.getSecurityManager();
        if (sm == null || acc == null) {
            shutdown();
        } else {
            PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
            AccessController.doPrivileged(pa, acc);
        }
    }

allowCoreThreadTimeOut 允许核心线程超时

    /**
     * 允许核心线程超时
     */
    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        if (value != allowCoreThreadTimeOut) {
            allowCoreThreadTimeOut = value;
            if (value)
                interruptIdleWorkers();
        }
    }

setMaximumPoolSize 设置最大线程池数量

   /**
     * 设置最大线程池数量
     */
    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        this.maximumPoolSize = maximumPoolSize;
        if (workerCountOf(ctl.get()) > maximumPoolSize)
            interruptIdleWorkers();
    }

setCorePoolSize 设置核心线程数量

设置设置核心线程数量
如果工作线程大于新的核心线程数量, 中断空闲线程
如果核心线程数量变多了, 新增Math.min(delta, workQueue.size())个工作线程

    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();
        else if (delta > 0) {
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }

setKeepAliveTime 设置空闲线程超时时间

如果设置的keepAliveTime小于老的keepAliveTime, 中断空闲工作线程

    /**
     * 设置空闲线程超时时间,中断wokers
     */
    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        if (time == 0 && allowsCoreThreadTimeOut())
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        long keepAliveTime = unit.toNanos(time);
        long delta = keepAliveTime - this.keepAliveTime;
        this.keepAliveTime = keepAliveTime;
        if (delta < 0)
            interruptIdleWorkers();
    }

remove 移除任务

    /**
     * 移除任务
     */
    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
    }

getPoolSize 获取当前工作线程数量

    /**
     * 工作线程数量
     */
    public int getPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Remove rare and surprising possibility of
            // isTerminated() && getPoolSize() > 0
            return runStateAtLeast(ctl.get(), TIDYING) ? 0
                : workers.size();
        } finally {
            mainLock.unlock();
        }
    }

RejectedExecutionHandler 拒绝策略

 /**
     * 只用调用者所在线程来运行任务
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    /**
     * 抛出异常RejectedExecutionException
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    /**
     * 默默丢弃拒绝的任务
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    /**
     * 丢弃队列最近的一个任务,并执行当前任务
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1506476.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数组名的理解,看这一篇就够了!!!

&#xff01;&#xff01;&#xff01;以下是会涉及到的知识的讲解&#xff1a; 一&#xff1a;数组名的理解&#xff1a; 数组名是数组首元素的地址&#xff0c;但是有2个例外&#xff1a; 1. sizeof(数组名)&#xff0c;这里的数组名表示整个数组&#xff0c;计算的是整个…

LeetCode59:螺旋矩阵Ⅱ

题目描述 给你一个正整数 n &#xff0c;生成一个包含 1 到 n2 所有元素&#xff0c;且元素按顺时针顺序螺旋排列的 n x n 正方形矩阵 matrix 。 示例 1&#xff1a; 输入&#xff1a;n 3 输出&#xff1a;[[1,2,3],[8,9,4],[7,6,5]] 代码 class Solution { public:vector…

查看pip当前关联python版本及位置

好久没用python了&#xff0c;把各种pip指向的环境忘光光啦&#xff0c;这里记录一下查看pip当前关联的python版本及位置的方法&#xff1a; pip -V结果&#xff1a; 我一般不用这个版本的python&#xff0c;去环境变量看了一下&#xff0c;原来是anaconda的Scripts自带pip&a…

Vue class和style绑定:动态美化你的组件

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

EB tersos 24.0.1 添加MCU模块失败

1、问题&#xff1a; 新建工程&#xff0c;添加MCU模块总是失败&#xff0c;错误信息如下&#xff1a; 2、解决方案 创建工程时只保留Resource模块&#xff0c;直接点击Finish&#xff0c;其他模块之后再添加 在工程创建成功后再单独添加需要的模块

StableDiffusion3 官方blog论文研究

博客源地址&#xff1a;Stable Diffusion 3: Research Paper — Stability AI 论文源地址&#xff1a;https://arxiv.org/pdf/2403.03206.pdf Stability.AI 官方发布了Stable diffusion 3.0的论文研究&#xff0c;不过目前大家都沉浸在SORA带来的震撼中&#xff0c;所以这个水…

力扣530. 二叉搜索树的最小绝对差

思路1&#xff1a;中序遍历&#xff0c;递归排序成有序数组&#xff1b;因为是有序&#xff0c;只需要求相邻两个值的最小差值。 class Solution {ArrayList <Integer> list new ArrayList();int ans 100001;//题目最大 100000public int getMinimumDifference(TreeNo…

docker学习笔记——Dockerfile

Dockerfile是一个镜像描述文件&#xff0c;通过Dockerfile文件可以构建一个属于自己的镜像。 如何通过Dockerfile构建自己的镜像&#xff1a; 在指定位置创建一个Dockerfile文件&#xff0c;在文件中编写Dockerfile相关语法。 构建镜像&#xff0c;docker build -t aa:1.0 .(指…

异步编程实战:使用C#实现FTP文件下载及超时控制

博客标题: 异步编程实战&#xff1a;使用C#实现FTP文件下载及超时控制 如果你的函数不是async&#xff0c;你仍然可以实现相同的超时功能&#xff0c;但你将不得不依赖更多的同步代码或使用.Result或.GetAwaiter().GetResult()来阻塞等待任务完成&#xff0c;这可能导致死锁的风…

Breach-2.1

靶场环境说明 该靶场是静态IP地址&#xff0c;需要更改网络配置&#xff0c;攻击机kali做了两张网卡&#xff1b; 信息收集 # nmap -sT --min-rate 10000 -p- 192.168.110.151 -oN port.nmap Starting Nmap 7.94 ( https://nmap.org ) at 2024-02-09 10:47 CST Stats: 0:00:…

java通过poi-tl生成word

我看公司之前做电子合同&#xff0c;使用TIBCO jaspersoft做的报表模板&#xff0c;如果是给自己公司开发或者给客户做项目&#xff0c;这个也没有什么&#xff0c;因为反正模板是固定的&#xff0c;一次性开发&#xff0c;不用担心后续的问题。即使后期有调整&#xff0c;改一…

深入解读 Elasticsearch 磁盘水位设置

本文将带你通过查看 Elasticsearch 源码来了解磁盘使用阈值在达到每个阶段的处理情况。 跳转文章末尾获取答案 环境 本文使用 Macos 系统测试&#xff0c;512M 的磁盘&#xff0c;目前剩余空间还有 60G 左右&#xff0c;所以按照 Elasticsearch 的设定&#xff0c;ES 中分片应…

总结:Spring创建Bean循环依赖问题与@Lazy注解使用详解

总结&#xff1a;Spring创建Bean循环依赖问题与Lazy注解使用详解 一前提知识储备&#xff1a;1.Spring Bean生命周期机制&#xff08;IOC&#xff09;2.Spring依赖注入机制&#xff08;DI&#xff09;&#xff08;1&#xff09;Autowired注解标注属性set方法注入&#xff08;2&…

面具安装LSP模块时提示 Unzip error错误的解决办法

面具(Magisk Delta)安装LSP模块时提示 Unzip error错误的解决办法 ​​ 如果前面的配置都正常的话&#xff0c;可能是LSP版本有问题重新去Github下载一个最新版的吧&#xff1b;我是这么解决的。 我安装1.91那个版本的LSP就是死活安装不上&#xff0c;下载了1.92的版本一次就…

Golang-channel合集——源码阅读、工作流程、实现原理、已关闭channel收发操作、优雅的关闭等面试常见问题。

前言 面试被问到好几次“channel是如何实现的”&#xff0c;我只会说“啊&#xff0c;就一块内存空间传递数据呗”…所以这篇文章来深入学习一下Channel相关。从源码开始学习其组成、工作流程及一些常见考点。 NO&#xff01;共享内存 Golang的并发哲学是“要通过共享内存的…

⭐每天一道leetcode:83.删除排序链表中的重复元素(简单;链表遍历、删除经典题目)

⭐今日份题目 给定一个已排序的链表的头 head &#xff0c; 删除所有重复的元素&#xff0c;使每个元素只出现一次 。返回 已排序的链表 。 示例1 输入&#xff1a;head [1,1,2] 输出&#xff1a;[1,2] 示例2 输入&#xff1a;head [1,1,2,3,3] 输出&#xff1a;[1,2,3] …

Linux 进程程序替换

&#x1f493;博主CSDN主页:麻辣韭菜-CSDN博客&#x1f493;   ⏩专栏分类&#xff1a;http://t.csdnimg.cn/G90eI⏪   &#x1f69a;代码仓库:Linux: Linux日常代码练习&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习更多Linux知识   &#x1f51d;&#x1f5…

考研经验|如何从考研失败中走出来?

对我来说&#xff0c;太丢人了 其实我在本科的时候在同学眼中&#xff0c;一直很优秀&#xff0c;每年奖学金必有我的&#xff0c;国家励志奖学金&#xff0c;国家奖学金&#xff0c;这种非常难拿的奖学金&#xff0c;我也拿过&#xff0c;本科期间学校有一个公费去新西兰留学的…

美化console

console简介 控制台&#xff08;Console&#xff09;是JS开发里最重要的面板&#xff0c;主要作用是显示网页加载过程中产生各类信息,我们经常使用console.log()这个函数在控制台打印一些东西 但是,console这个对象不仅仅有log这个函数,还有很多其他的函数,如下 console.de…

vue学习笔记22-组件传递多种数据类型props效验

组件传递多种数据类型 通过props传递数据&#xff0c;不仅可以传递字符串类型的数据&#xff0c;还可以是其他类型&#xff0c;例如&#xff1a;数字、对象、数组等&#xff0c;但实际上任何类型的值都可以作为props的值被传递&#xff08;即组件与组件之间的传递是没有限制的…