Java多线程之:详解ThreadPoolExecutor执行源码分析

news2024/11/25 12:57:02

文章目录

  • 线程池的实现原理
  • 详解ThreadPoolExecutor
    • 核心数据结构
    • 核心配置参数解释
    • 线程池的优雅关闭
      • 线程池的生命周期
      • 正确关闭线程池的步骤
      • shutdown()与shutdownNow()的区别
  • 任务的提交过程分析
  • 任务的执行过程分析
    • shutdown()与任务执行过程综合分析
    • shutdownNow() 与任务执行过程综合分析
  • 线程池的4种拒绝策略
  • 线程池使用案例Case

线程池的实现原理

下图所示为线程池的实现原理:调用方不断地向线程池中提交任务,线程池中有⼀组线程,不断地从队列中取任务,这是⼀个典型的⽣产者—消费者模型。

在这里插入图片描述

要实现这样⼀个线程池,有几个问题需要考虑:

  1. 队列设置多长?如果是无界的,调用方不断地往队列中放任务,可能导致内存耗尽。如果是有界的,当队列满了之后,调用方如何处理?
  2. 线程池中的线程个数是固定的,还是动态变化的?
  3. 每次提交新任务,是放⼊队列?还是开新线程?
  4. 当没有任务的时候,线程是睡眠⼀小段时间?还是进入阻塞?如果进入阻塞,如何唤醒?

针对问题4,有3种做法:

  1. 不使用阻塞队列,只使用⼀般的线程安全的队列,也无阻塞/唤醒机制。当队列为空时,线程池中的线程只能睡眠⼀会儿,然后醒来去看队列中有没有新任务到来,如此不断轮询。
  2. 不使用阻塞队列,但在队列外部、线程池内部实现了阻塞/唤醒机制。
  3. 使用阻塞队列。

很显然,做法3最完善,既避免了线程池内部自己实现阻塞/唤醒机制的麻烦,也避免了做法1的睡眠/轮询带来的资源消耗和延迟。正因为如此,接下来要讲的ThreadPoolExector/ScheduledThreadPoolExecutor都是基于阻塞队列来实现的,而不是⼀般的队列,至此,各式各样的阻塞队列就要派上用场了。

详解ThreadPoolExecutor

核心数据结构

基于线程池的实现原理,下面看⼀下ThreadPoolExector的核心数据结构。

public class ThreadPoolExecutor extends AbstractExecutorService {
	//...
	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	// 存放任务的阻塞队列
	private final BlockingQueue<Runnable> workQueue;
	// 对线程池内部各种变量进⾏互斥访问控制
	private final ReentrantLock mainLock = new ReentrantLock();
	// 线程集合
	private final HashSet<Worker> workers = new HashSet<Worker>();
	//...
}

每⼀个线程是⼀个Worker对象。 Worker是ThreadPoolExector的内部类,核心数据结构如下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
	// ...
	final Thread thread; // Worker封装的线程
	Runnable firstTask; // Worker接收到的第1个任务
	volatile long completedTasks; // Worker执⾏完毕的任务个数
	// ...
}

由定义会发现, Worker继承于AQS,也就是说Worker本身就是⼀把锁。这把锁有什么⽤处呢?用于线程池的关闭、线程执行任务的过程中。

核心配置参数解释

ThreadPoolExecutor在其构造方法中提供了几个核心配置参数,来配置不同策略的线程池。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

上面的各个参数,解释如下:

  1. corePoolSize:在线程池中始终维护的线程个数。
  2. maxPoolSize:在corePooSize已满、队列也满的情况下,扩充线程至此值。
  3. keepAliveTime/TimeUnit: maxPoolSize 中的空闲线程,销毁所需要的时间,总线程数收缩回corePoolSize。
  4. blockingQueue:线程池所用的队列类型。
  5. threadFactory:线程创建工厂,可以自定义,有默认值Executors.defaultThreadFactory() 。
  6. RejectedExecutionHandler: corePoolSize已满,队列已满, maxPoolSize 已满,最后的拒绝策略。

下面来看这6个配置参数在任务的提交过程中是怎么运作的。在每次往线程池中提交任务的时候,有如下的处理流程:

  • 步骤⼀:判断当前线程数是否大于或等于corePoolSize。如果小于,则新建线程执行;如果大于,则进⼊步骤⼆。
  • 步骤⼆:判断队列是否已满。如未满,则放入;如已满,则进⼊步骤三。
  • 步骤三:判断当前线程数是否大于或等于maxPoolSize。如果小于,则新建线程执行;如果大于,则进⼊步骤四。
  • 步骤四:根据拒绝策略,拒绝任务。

总结⼀下:首先判断corePoolSize,其次判断blockingQueue是否已满,接着判断maxPoolSize,最后使用拒绝策略

很显然,基于这种流程,如果队列是无界的,将永远没有机会走到步骤三,也即maxPoolSize没有使用,也⼀定不会走到步骤四。

线程池的优雅关闭

线程池的关闭,较之线程的关闭更加复杂。当关闭⼀个线程池的时候,有的线程还正在执行某个任务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。因此,关闭过程不可能是瞬时的,而是需要⼀个平滑的过渡,这就涉及线程池的完整生命周期管理。

线程池的生命周期

在JDK 7中,把线程数量(workerCount)和线程池状态(runState)这两个变量打包存储在⼀个字段里面,即ctl变量。如下图所示,最高的3位存储线程池状态,其余29位存储线程个数。而在JDK 6中,这两个变量是分开存储的。

在这里插入图片描述

	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

由上面的代码可以看到, ctl变量被拆成两半,最高的3位用来表示线程池的状态,低的29位表示线程的个数。线程池的状态有五种,分别是RUNNING、 SHUTDOWN、 STOP、 TIDYING和TERMINATED。

下面分析状态之间的迁移过程,如图所示:

在这里插入图片描述

线程池有两个关闭方法, shutdown()和shutdownNow(),这两个方法会让线程池切换到不同的状态。在队列为空,线程池也为空之后,进入TIDYING 状态;最后执行⼀个钩子方法terminated(),进⼊TERMINATED状态,线程池才真正关闭。

这里的状态迁移有⼀个非常关键的特征:从小到大迁移, -1, 0, 1, 2, 3,只会从小的状态值往大的状态值迁移,不会逆向迁移。例如,当线程池的状态在TIDYING=2时,接下来只可能迁移到TERMINATED=3,不可能迁移回STOP=1或者其他状态。

除 terminated()之外,线程池还提供了其他几个钩子方法,这些方法的实现都是空的。如果想实现自己的线程池,可以重写这几个方法:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

正确关闭线程池的步骤

关闭线程池的过程为:在调用 shutdown()或者shutdownNow()之后,线程池并不会立即关闭,接下来需要调用awaitTermination() 来等待线程池关闭。关闭线程池的正确步骤如下:

// executor.shutdownNow();
executor.shutdown();
try {
	boolean flag = true;
	do {
		flag = ! executor.awaitTermination(500, TimeUnit.MILLISECONDS);
	} while (flag);
} catch (InterruptedException e) {
	// ...
}

awaitTermination(…)方法的内部实现很简单,如下所示。不断循环判断线程池是否到达了最终状态
TERMINATED,如果是,就返回。如果不是,则通过termination条件变量阻塞⼀段时间,之后继续判断。

public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                // 判断线程池状态,是否为Termination
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

shutdown()与shutdownNow()的区别

  1. shutdown()不会清空任务队列,会等所有任务执行完成, shutdownNow()清空任务队列。
  2. shutdown()只会中断空闲的线程, shutdownNow()会中断所有线程。
public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
    	// 加锁,确保线程安全
        mainLock.lock();
        try {
            // 检查是否有关闭线程池的权限
            checkShutdownAccess();
            // 将线程池状态修改为ShutDown
            advanceRunState(SHUTDOWN);
            // 中断空闲线程
            interruptIdleWorkers();
            // 具体空方法体的钩子方法
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
    	// 加锁,确保线程安全
        mainLock.lock();
        try {
            // 检查是否有关闭线程池的权限
            checkShutdownAccess();
            // 将线程池状态设置为STOP
            advanceRunState(STOP);
            // 中断所有线程
            interruptWorkers();
            // 任务队列清空
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

下面看⼀下在上面的代码里中断空闲线程和中断所有线程的区别。

shutdown()方法中的interruptIdleWorkers()方法的实现:

    /**
     * Common form of interruptIdleWorkers, to avoid having to
     * remember what the boolean argument means.
     */
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

	private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 如果tryLock成功,表示线程处于空闲状态
                // 如果不成功,表示线程持有锁,正在执行某个任务
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

关键区别点在tryLock()⼀个线程在执行⼀个任务之前,会先加锁,这意味着通过是否持有锁,可以判断出线程是否处于空闲状态。 tryLock()如果调用成功,说明线程处于空闲状态,向其发送中断信号;否则不发送。

shutdownNow()调用了 interruptWorkers()方法:

 	private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

 	void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    // 只要线程启动了,并且没有被中断过,则一律中断
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

通过源码可以发现,shutdownNow()源码内部,通过对工作线程调用interrupt方法强制使工作线程中断。

在上面的代码中, shutdown() 和shutdownNow()都调用了tryTerminate()方法,如下所示:

	final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            
            // 当workQueue为空, wordCount为0时,执⾏下述代码。
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 将状态切换到到TIDYING状态
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 调⽤钩⼦函数
                        terminated();
                    } finally {
                        // 将状态由TIDYING改为TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 通知awaitTermination(...)
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

tryTerminate()不会强制终止线程池,只是做了⼀下检测:当workerCount为0, workerQueue为空时,先把状态切换到TIDYING,然后调用钩子方法terminated()。当钩子方法执行完成时,把状态从TIDYING 改为 TERMINATED,接着调用termination.sinaglAll(),通知前面阻塞在awaitTermination的所有调用者线程。

所以, TIDYING和TREMINATED的区别是在⼆者之间执⾏了⼀个钩⼦⽅法terminated(),⽬前是⼀个空实现。

任务的提交过程分析

提交任务的方法如下:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
		// 如果当前线程数⼩于corePoolSize,则启动新线程
        if (workerCountOf(c) < corePoolSize) {
			// 添加Worker,并将command设置为Worker线程的第⼀个任务开始执⾏。
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
		// 如果当前的线程数⼤于或等于corePoolSize,则调⽤workQueue.offer放⼊队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
			// 如果线程池正在停⽌,则将command任务从队列移除,并拒绝command任务请求。
            if (! isRunning(recheck) && remove(command))
                reject(command);
			// 放⼊队列中后发现没有线程执⾏任务,开启新线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 线程数⼤于maxPoolSize,并且队列已满,调⽤拒绝策略
		else if (!addWorker(command, false))
        	reject(command);
        }
	
// 该方法⽤于启动新线程。如果第⼆个参数为true,则使⽤corePoolSize作为上限,否则使⽤maxPoolSize作为上限。
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
			// 如果线程池状态值起码是SHUTDOWN和STOP,或则第⼀个任务不是null,或者⼯作队列为空
			// 则添加worker失败,返回false
        	if (runStateAtLeast(c, SHUTDOWN)
        		&& (runStateAtLeast(c, STOP)
        			|| firstTask != null
        			|| workQueue.isEmpty()))
        		return false;
        for (;;) {
			// 工作线程数达到上限,要么是corePoolSize要么是maximumPoolSize,启动线程失败
        	if (workerCountOf(c)
       			 >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
        		return false;
			// 增加worker数量成功,返回到retry语句
        	if (compareAndIncrementWorkerCount(c))
        		break retry;
        	c = ctl.get(); // Re-read ctl
			// 如果线程池运⾏状态起码是SHUTDOWN,则重试retry标签语句, CAS
        	if (runStateAtLeast(c, SHUTDOWN))
        		continue retry;
			// else CAS failed due to workerCount change; retry inner loop
        	}
        }
		// worker数量加1成功后,接着运⾏:
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
			// 新建worker对象
        	w = new Worker(firstTask);
			// 获取线程对象
			final Thread t = w.thread;
        	if (t != null) {
				final ReentrantLock mainLock = this.mainLock;
				// 加锁
        		mainLock.lock();
        		try {
					// Recheck while holding lock.
					// Back out on ThreadFactory failure or if
					// shut down before lock acquired.
        			int c = ctl.get();
                    if (isRunning(c) ||
        					(runStateLessThan(c, STOP) && firstTask == null)) {
						// 由于线程已经在运⾏中,⽆法启动,抛异常
        				if (t.isAlive()) // precheck that t is startable
        					throw new IllegalThreadStateException();
						// 将线程对应的worker加⼊worker集合
        				workers.add(w);
        				int s = workers.size();
        				if (s > largestPoolSize)
        					largestPoolSize = s;
        				workerAdded = true;
        			}
        		} finally {
					// 释放锁
        			mainLock.unlock();
        		}
				// 如果添加worker成功,则启动该worker对应的线程
        		if (workerAdded) {
        			t.start();
        			workerStarted = true;
        		}
        	}
        } finally {
			// 如果启动新线程失败
        	if (! workerStarted)
				// workCount - 1
        		addWorkerFailed(w);
        	}
        return workerStarted;
    }

任务的执行过程分析

在上面的任务提交过程中,可能会开启⼀个新的Worker,并把任务本身作为firstTask赋给该Worker。但对于⼀个Worker来说,不是只执行⼀个任务,而是源源不断地从队列中取任务执行,这是⼀个不断循环的过程。

下面来看Woker的run()方法的实现过程

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    // 当前Worker对象封装的线程
    final Thread thread;
    // 线程需要运⾏的第⼀个任务。可以是null,如果是null,则线程从队列获取任务
    Runnable firstTask;
    // 记录线程执⾏完成的任务数量,每个线程⼀个计数器
    volatile long completedTasks;
    
    /**
     * 使⽤给定的第⼀个任务并利⽤线程⼯⼚创建Worker实例
     * @param firstTask 线程的第⼀个任务,如果没有,就设置为null,此时线程会从队列获取任务。
     */
    Worker(Runnable firstTask) {
        setState(-1); // 线程处于阻塞状态,调⽤runWorker的时候中断
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    
    // 调⽤ThreadPoolExecutor的runWorker⽅法执⾏线程的运⾏
    public void run() {
        runWorker(this);
    }
}


final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
		// 中断Worker封装的线程
        w.unlock();
        boolean completedAbruptly = true;
        try {
			// 如果线程初始任务不是null,或者从队列获取的任务不是null,表示该线程应该执⾏任务。
        	while (task != null || (task = getTask()) != null) {
				// 获取线程锁
        		w.lock();
				// 如果线程池停⽌了,确保线程被中断
				// 如果线程池正在运⾏,确保线程不被中断
        		if ((runStateAtLeast(ctl.get(), STOP) ||
       				 (Thread.interrupted() &&
        				runStateAtLeast(ctl.get(), STOP))) &&
        			!wt.isInterrupted())
					// 获取到任务后,再次检查线程池状态,如果发现线程池已经停⽌,则给⾃⼰发中断信号
        			wt.interrupt();
        			try {
						// 任务执⾏之前的钩⼦⽅法,实现为空
        				beforeExecute(wt, task);
        				try {
        					task.run();
							// 任务执⾏结束后的钩⼦⽅法,实现为空
        					afterExecute(task, null);
        				} catch (Throwable ex) {
        					afterExecute(task, ex);
        					throw ex;
        				}
        			} finally {
						// 任务执⾏完成,将task设置为null
        				task = null;
						// 线程已完成的任务数加1
                        w.completedTasks++;
						// 释放线程锁
        				w.unlock();
        			}
        		}
				// 判断线程是否是正常退出
        		completedAbruptly = false;
       } finally {
			// Worker退出
        	processWorkerExit(w, completedAbruptly);
        }
    }

shutdown()与任务执行过程综合分析

把任务的执行过程和上面的线程池的关闭过程结合起来进行分析,当调⽤ shutdown()的时候,可能出现以下几种场景:

  1. 当调用shutdown()的时候,所有线程都处于空闲状态。
    这意味着任务队列⼀定是空的。此时,所有线程都会阻塞在 getTask()方法的地方。然后,所有线程都会收到interruptIdleWorkers()发来的中断信号, getTask()返回null,所有Worker都会退出while循环,之后执行processWorkerExit。

  2. 当调用shutdown()的时候,所有线程都处于忙碌状态。

    此时,队列可能是空的,也可能是非空的。 interruptIdleWorkers()内部的tryLock调用失败,什么都不会做,所有线程会继续执行⾃⼰当前的任务。之后所有线程会执行完队列中的任务,直到队列为空, getTask()才会返回null。之后,就和场景1⼀样了,退出while循环。

  3. 当调用shutdown()的时候,部分线程忙碌,部分线程空闲。

    有部分线程空闲,说明队列⼀定是空的,这些线程肯定阻塞在 getTask()方法的地方。空闲的这些线程会和场景1⼀样处理,不空闲的线程会和场景2⼀样处理。

下面看⼀下getTask()方法的内部细节:

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
			// 如果线程池调⽤了shutdownNow(),返回null
			// 如果线程池调⽤了shutdown(),并且任务队列为空,也返回null
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
			// 工作线程数减一
        	decrementWorkerCount();
        	return null;
    	}
            
        int wc = workerCountOf(c);
            
		// Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
        	&& (wc > 1 || workQueue.isEmpty())) {
        	if (compareAndDecrementWorkerCount(c))
        		return null;
        	continue;
        }
            
        try {
			// 如果队列为空,就会阻塞pool或者take,前者有超时时间,后者没有超时时间
			// ⼀旦中断,此处抛异常,对应上⽂场景1。
        	Runnable r = timed ?
        		workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        		workQueue.take();
        	if (r != null)
        		return r;
        	timedOut = true;
        } catch (InterruptedException retry) {
        	timedOut = false;
        }
    }
}

shutdownNow() 与任务执行过程综合分析

和上面的 shutdown()类似,只是多了⼀个环节,即清空任务队列。如果⼀个线程正在执行某个业务代码,即使向它发送中断信号,也没有用,只能等它把代码执行完成。因此,中断空闲线程和中断所有线程的区别并不是很大,除非线程当前刚好阻塞在某个地方

当⼀个Worker最终退出的时候,会执行清理工作:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
		// 如果线程正常退出,不会执⾏if的语句,这⾥⼀般是⾮正常退出,需要将worker数量减⼀
        if (completedAbruptly)
            decrementWorkerCount();
        final ReentrantLock mainLock = this.mainLock;
        
        mainLock.lock();
        try {
        	completedTaskCount += w.completedTasks;
			// 将自己的worker从集合移除
        	workers.remove(w);
        } finally {
        	mainLock.unlock();
        }
        
		// 每个线程在结束的时候都会调⽤该⽅法,看是否可以停⽌线程池
        tryTerminate();
        int c = ctl.get();
		// 如果在线程退出前,发现线程池还没有关闭
        if (runStateLessThan(c, STOP)) {
        	if (!completedAbruptly) {
        		int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
				// 如果线程池中没有其他线程了,并且任务队列⾮空
        		if (min == 0 && ! workQueue.isEmpty())
        			min = 1;
				// 如果⼯作线程数⼤于min,表示队列中的任务可以由其他线程执⾏,退出当前线程
        		if (workerCountOf(c) >= min)
        			return; // replacement not needed
        		}
				// 如果当前线程退出前发现线程池没有结束,任务队列不是空的,也没有其他线程来执⾏
				// 就再启动⼀个线程来处理。
        		addWorker(null, false);
        }
    }

线程池的4种拒绝策略

在execute(Runnable command)的最后,调用了reject(command)执行拒绝策略,代码如下所示:

public void execute(Runnable command) {
    //...
	else if (!addWorker(command, false))
    	reject(command);


final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

其中,handler就是我们可以设置的拒绝策略管理器

/**
 * Handler called when saturated or shutdown in execute.
 */
private volatile RejectedExecutionHandler handler;

RejectedExecutionHandler 是⼀个接口,定义了四种实现,分别对应四种不同的拒绝策略,默认是
AbortPolicy

ThreadPoolExecutor类中默认的实现是defaultHandler

    /**
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

四种策略的实现代码如下:

策略1:调用者直接在自己的线程里执行,线程池不处理。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

策略2:线程池抛异常:

/**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 直接throw RejectExecutionException异常
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

策略3:线程池直接丢掉任务,神不知鬼不觉:

/**
 * A handler for rejected tasks that silently discards the
 * rejected task.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

策略4:删除队列中最早的任务,将当前任务入队列:

/**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

线程池使用案例Case

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                3,
                5,
                1,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
				// new ThreadPoolExecutor.AbortPolicy()
				// new ThreadPoolExecutor.CallerRunsPolicy()
				// new ThreadPoolExecutor.DiscardOldestPolicy()
                new ThreadPoolExecutor.DiscardPolicy()
        );
        for (int i = 0; i < 20; i++) {
            int finalI = i;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getId() + "[" + finalI
                            + "] -- 开始");
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getId() + "[" + finalI
                            + "] -- 结束");
                }
            });
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        executor.shutdown();
        boolean flag = true;
        try {
            do {
                flag = !executor.awaitTermination(1, TimeUnit.SECONDS);
        		System.out.println(flag);
        	} while (flag);
        } catch (InterruptedException e) {
        	e.printStackTrace();
        }
        
        System.out.println("线程池关闭成功。。。 ");
        System.out.println(Thread.currentThread().getId());
        }
    }

执行结果如下所示:

12[0] -- 开始
13[1] -- 开始
14[2] -- 开始
15[6] -- 开始
16[7] -- 开始
12[0] -- 结束
12[3] -- 开始
true
13[1] -- 结束
13[4] -- 开始
14[2] -- 结束
14[5] -- 开始
true
15[6] -- 结束
16[7] -- 结束
true
true
true
12[3] -- 结束
13[4] -- 结束
true
14[5] -- 结束
false
线程池关闭成功。。。 
1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/89243.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【大一大二必看】计算机专业的同学应该参加哪些比赛?

文章目录1. 前言2. ICPC3. CCPC4. 蓝桥杯5. 天梯赛6. CCF CSP7. PAT8. 全国高校计算机能力挑战赛9. 其他&#x1f351; 天池大赛&#x1f351; 华为软件精英挑战赛&#x1f351; LeetCode 周赛 / 双周赛&#x1f351; CSDN 编程竞赛总结1. 前言 2022 年已经过半&#xff0c;对…

java版商城 b2b2c o2o 多商家入驻商城 直播带货商城 电子商务

一个好的SpringCloudSpringBoot b2b2c 电子商务平台涉及哪些技术、运营方案&#xff1f;以下是我结合公司的产品做的总结&#xff0c;希望可以帮助到大家&#xff01; 搜索体验小程序&#xff1a;海哇 1. 涉及平台 平台管理、商家端&#xff08;PC端、手机端&#xff09;、买…

巡检过程中有哪些注意事项?智能巡检了解一下

智能巡检系统是现场过程管理的生产力革命&#xff0c;由人工记录蝶化为掌上电脑运作&#xff0c;适用于设备运行值班记录、仓库/资产管理、设备巡检保养、安全巡更、机房值守、基站维护等一切重复性的工作管理。 安全巡检的目的在于识别信息系统存在的安全脆弱性、分析信息系统…

2022-12-14 移植Qt Creator helloworld 应用到ARM平台运行过程,我这里用buildroot里面的编译器。

一、在ubuntu上运行可执行文件。 1、ubuntu里面安装qt creator 建立helloworld 工程&#xff0c;点击run就可以运行&#xff0c;运行如下。 2、在ubuntu上运行方法二&#xff1a;同级目录下有build-helloworld-Desktop_Qt_5_12_12_GCC_64bit-Debug&#xff0c;用file hellowor…

VS使用技巧汇总

总目录 文章目录总目录前言一、快捷技巧1.代码片段快捷方式2.选择性粘贴3.快速停靠窗口4.多行同步快速编辑5.引用命名空间6.整行上下移动7.规整代码格式二、其他技巧1.其他总结前言 本文会持续收录一些VS的使用技巧&#xff0c;掌握VS一些常用的使用技巧对于提高我们编程效率很…

MA-Net:用于肝脏和肿瘤分割的多尺度注意力网络

摘要 近年来为了提高医学图像分割的性能&#xff0c;提出了大量基于多尺度特征融合的UNet变体。与以往通过多尺度特征融合提取医学图像上下文信息的方法不同&#xff0c;本文提出了一种新的多尺度注意力网格&#xff08;MA-Net&#xff09;在这个网络方法中引入了自注意力机制…

Netty使用篇:自定义编解码器

我们今天还是继续Netty&#xff0c;Netty的编码器和解码器就是Netty对Handler这个组件的一种使用场景而已&#xff0c;SpringWebFlex就是基于这个Netty来做的&#xff0c;在往上引深一层GateWay服务网关就是SpringWebFlex的实现&#xff0c;所以SpringCloud当中明确说明了&…

DPDK源码分析之DPDK基础概览

本文主要介绍一下DPDK这项技术的基础概览&#xff0c;包括什么是DPDK&#xff0c;为什么有它存在的必要&#xff0c;它的框架是怎样的&#xff0c;使用了哪些技术实现&#xff0c;DPDK的应用场景有哪些&#xff0c;最后在centos7服务器上实装一个dpdk环境做一个简单的数据包收发…

C++ VTK鼠标网格表面绘制曲线

程序示例精选 C VTK鼠标表面绘制曲线 如需安装运行环境或远程调试&#xff0c;见文章底部微信名片&#xff0c;由专业技术人员远程协助&#xff01; 前言 C VTK鼠标表面绘制曲线&#xff0c;功能完善&#xff0c;代码整洁&#xff0c;规则&#xff0c;易读。 文章目录 一、所需…

基于Android的招聘求职网站的设计与实现

毕业设计 基于Android的招聘求职网站的设计与实现 1&#xff0e;课题意义及目标 在二十一世纪求职方式跟以前是不同的&#xff0c;与在各个用人单位和招聘会上寻找理想的工作&#xff0c;基于安卓的招聘系统能够提供最好的最丰富及时的招聘信息。。 通过对该系统的研究设计…

【人工智能与机器学习】——决策树与集成学习(学习笔记)

&#x1f4d6; 前言&#xff1a;决策树&#xff08;Decision Tree&#xff09;是一种通过对历史数据进行测算&#xff0c;实现对新数据进行分类和预测的算法。机器学习中&#xff0c;决策树是一个预测模型&#xff0c;代表的是对象属性与对象值之间的一种映射关系。该算法由于逻…

django计算机毕业设计基于安卓Android的移动电商平台系统APP-商品购物商城app

项目介绍 网络的广泛应用给生活带来了十分的便利。所以把移动电商平台与现在网络相结合,利用python技术建设移动电商平台APP,实现移动电商平台的信息化。则对于进一步提高移动电商平台发展,丰富移动电商平台经验能起到不少的促进作用。 移动电商平台APP能够通过互联网得到广泛的…

如何向gitlab发布的附件里上传文件

gitlab 发布后在附件里会有打包好的源码&#xff0c;类似下图 笔者想把构建好的文件也打包放在这个附件里&#xff0c;经过研究可行&#xff0c;步骤分享如下 注&#xff1a;笔者用的gitlab版本为12.10.3 创建Access Token 登录gitlab,点击右上角图像&#xff0c;点击Settin…

Linux基本命令(3)

Linux基本命令(3) &#x1f4df;作者主页&#xff1a;慢热的陕西人 &#x1f334;专栏链接&#xff1a;Linux &#x1f4e3;欢迎各位大佬&#x1f44d;点赞&#x1f525;关注&#x1f693;收藏&#xff0c;&#x1f349;留言 本博客主要讲解了最后一部分常用的Linux指令和一些热…

1年时间,从小公司到美团测试开发,我做对了这些事情....

&#x1f4cc; 博客主页&#xff1a; 程序员二黑 &#x1f4cc; 专注于软件测试领域相关技术实践和思考&#xff0c;持续分享自动化软件测试开发干货知识&#xff01; &#x1f4cc; 公号同名&#xff0c;欢迎加入我的测试交流群&#xff0c;我们一起交流学习&#xff01; 我的…

优化cv2.findContours()函数提取的目标边界点,使语义分割进行远监督辅助标注

优化cv2.findContours()函数提取的目标边界点 假设我们想要提取的目标边界长这样&#xff1a; 我们先使用以下代码查看效果 import cv2 import numpy as np import osif __name__ __main__:# 图像可以选择自己的image_filepath ./landslide/image/20221129112713.png# 读取…

Java并发和多线程编程学习(二) Java内存模型

并发编程中需要处理的两个重要问题是线程之间如何通信以及线程之间如何同步&#xff0c;Java的并发采用的是共享内存模型&#xff0c;且线程之间的通信总是隐式执行&#xff0c;所以需要我们深入学习从而避免复杂的内存可见性问题 内存模型的抽象结构 在Java中&#xff0c;所…

java计算机毕业设计ssm基于H5的音乐播放管理系统

项目介绍 该系统是基于H5,使用Vue、JavaScript、CSS技术开发而成。系统服务器使用Tomcat,利用MySQL存储数据、用JDBC实现数据的访问。管理员在系统部署阶段将所有用户对应权限进行分配。正式投入使用时,用户通过登录模块进入系统。根据权限控制管理,每个用户角色的操作界面也有…

【轻量级开源ROS 的机器人设备(4)】--(2)通信实现

前文链接 【轻量级开源ROS 的机器人设备&#xff08;4&#xff09;】--&#xff08;1&#xff09;通信模块_无水先生的博客-CSDN博客 三、 通信概要 概述 ROS 的通信层是 ros_comm 堆栈的一部分&#xff0c;遵循发布/订阅范式&#xff0c;如图 2.2 所示。网络&#xff0c;也称…

(设计模式) (李建忠 C++) 23种设计模式

文章目录前言组件协作模板方法 Template Method动机模式定义结构代码情景版本1版本2变化原理要点总结个人小结策略模式 Strategy动机模式定义结构代码情景版本1版本2要点总结个人小结观察者模式 Observer动机模式定义结构代码场景版本1版本2版本2要点总结个人小结单一职责装饰模…