文章目录
- 一、简介
- 1. 背景
- 2. Executor接口介绍
- 二、使用Executors工厂创建线程池
- 1. 简介
- 2. 使用newCachedThreadPool()方法创建无界线程池
- 3. 验证newCachedThreadPool()方法创建的线程池和线程复用特性
- 4. 使用newCachedThreadPool(ThreadFactory)定制线程工厂
- 5. 使用newCachedThreadPool()方法创建无界线程池的缺点
- 6. 使用newFixedThreadPool(int)方法创建有界线程池
- 7. 使用newSingleThreadExecutor()方法创建单一线程池
- 三、ThreadPoolExecutor使用(重点)
- 1. 简介
- 2. 队列LinkedBlockingQueue、ArrayBlockingQueue和SynchronousQueue的基本使用
- 3. ThreadPoolExecutor的构造方法详解
- (1)简介
- (2)使用阻塞队列
- (3)验证1-1
- (4)验证1-2
- (5)验证1-3
- (6)验证2-1
- (7)验证2-2
- (8)验证2-3
- (9)验证3-1
- (10)验证3-2
- (11)验证3-3
- (12)验证3-4
- (13)验证参数keepAliveTime非0的实验
- 4. 方法shutdown()和shutdownNow()
- 5. 其它方法
- (1) isShutdown()方法
- (2) 方法isTerminating()和isTerminated()
- (3) 方法awaitTermination(long timeout,TimeUnit unit)
- (4) 使用ThreadFactory+UncaughtExceptionHandler处理异常
- (5)方法set/getRejectedExceptionHandler()
- (5)方法allowsCoreThreadOut()和allowCoreThreadTimeOut(boolean value)
- (6)方法prestartCoreThread()和prestartAllcoreThreads()
- (7)方法getCompletedTaskCount()
- 6. 线程池ThreadPoolExecutor的拒绝策略
- (1)AbortPolicy策略
- (2)CallerRunsPolicy策略
- (3)DiscardOldestPolicy策略
- (3)DiscardPolicy策略
- 7. 方法afterExcute()和beforeExcute()
- 8. 方法Remove(Runnable)使用
- 9. 常见的get方法以及使用
一、简介
1. 背景
在开发服务端软件项目时,软件经常需要处理执行时间很短而数目巨大的请求,如果为每一个请求创建一个新的线程,则会导致性能上的瓶颈。因为JVM需要频繁的处理线程对象的创建与销毁,如果请求的执行时间很短,则有可能花在创建和销毁线程对象上的时间大于真正执行的时间,所以系统的性能会大幅度降低。JDK 5以上版本提供了对线程池的支持,主要用于支持高并发的访问处理,并且复用线程对象。线程池的核心原理是创建一个“线程池”,在池中对线程对象进行管理,包括创建与销毁,使用池时只需执行具体的任务即可,线程对象的处理都在池中封装了。线程池ThreadPoolExecutor实现类Executor接口,该接口是学习线程池的重点,因为掌握了该接口中的方法也就大致掌握了ThreadPoolExecutor类的主要功能了。
2. Executor接口介绍
在介绍线程池之前,要先了解一下接口java.util.concurrent.Executor,与线程池有关的大部分类都要实现这个接口。源码如下:
public interface Executor {
/**在将来的某个时间执行给定的命令。该命令可以在新线程、池线程或调用线程中执行,由实现自行 Executor 决定。
参数:
command – 可运行的任务
抛出:
RejectedExecutionException – 如果无法接受此任务执行
NullPointerException – 如果命令为空
**/
void execute(Runnable command);
}
ExecutorService 是 Java 中的一个接口,用于管理和控制任务的执行。它继承了 Executor 接口,并扩展了一些额外的方法,提供了更丰富的功能来操作任务执行。ExecutorService 的主要目的是将任务的提交和执行解耦,使得任务的提交者可以专注于任务的创建,而不需要关心任务的具体执行细节。它提供了一种线程池的方式来执行任务,以提高效率并控制资源的使用。通过 ExecutorService,可以将任务提交给线程池进行执行,并且可以获取任务执行的结果。它还提供了一些管理和监控线程池的方法,如关闭线程池、等待任务执行完成、判断线程池是否关闭等,但Executor是接口,并不能直接使用,所以还需要实现类,下图就是继承关系图:
ExecutorService接口是Executor的子接口,在内部添加了比较多的方法,其源代码结构如下:
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
虽然ExcutorService
接口添加了若干个方法的定义,但还是不能实例化,那么就要看它的唯一实现类AbstractExecutorService
,AbstractExecutorService
是 Java 中的一个抽象类,实现了部分 ExecutorService
接口的方法,并提供了一些模板方法,用于方便地创建自定义的 ExecutorService
实现。AbstractExecutorService
提供了默认的实现,可以用作自定义 ExecutorService
的基类。它实现了 shutdown()
、isShutdown()
、isTerminated()
、submit(Runnable)
、submit(Callable)
和 invokeAll()
等方法,并对部分方法提供了通用的实现逻辑。通过继承 AbstractExecutorService
,开发者可以更轻松地创建自定义的 ExecutorService
实现,只需要实现一些抽象方法即可。这些抽象方法包括 execute(Runnable)
、shutdownNow()
、isTerminated()
、newTaskFor(Callable)
、newTaskFor(Runnable, V)
等,开发者需要根据具体需求实现这些方法的逻辑。AbstractExecutorService
还提供了一些模板方法,可以在子类中重写以自定义特定的行为。例如,submit(Runnable, T
) 方法可以在子类中重写以实现特定的返回结果逻辑,invokeAny()
方法可以在子类中重写以自定义选择任务结果的策略。总的来说,AbstractExecutorService
提供了一个可扩展的框架,简化了自定义 ExecutorService
实现的过程,并提供了一些默认的方法实现和模板方法,减少了开发者的工作量。该类的部分源码如下所示:
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}
所以AbstractExecutorService类同样是不能实例化的,再看一下其子类ThreadPoolExcutor。ThreadPoolExecutor 是 Java 中用于创建和管理线程池的类,它提供了一个灵活和可定制的线程池实现。ThreadPoolExecutor 具有以下特点:
-
线程池的创建和管理:它可以通过构造函数创建一个线程池,指定线程池的核心线程数、最大线程数、空闲线程的存活时间等参数。同时,它提供了方法来动态地调整线程池的大小,以适应不同的工作负载。
-
任务调度和执行:ThreadPoolExecutor 可以接受任务并将其分配给线程池中的线程进行执行。任务可以是 Runnable 或 Callable 类型,可以根据需要提交多个任务。
-
线程池的饱和策略:当线程池的工作队列已满并且线程数达到最大线程数时,ThreadPoolExecutor 提供了几种饱和策略来处理新提交的任务。例如,可以拒绝任务、丢弃任务、或者在调用线程中执行任务等。
-
任务执行的状态和控制:ThreadPoolExecutor 提供了方法来查询线程池的状态,如是否关闭、是否在终止中,以及等待线程池的终止。它还提供了方法来控制线程池的行为,如关闭线程池、等待任务完成等。
-
通过使用 ThreadPoolExecutor,可以实现线程池的复用,避免频繁地创建和销毁线程,提高程序的性能和资源利用率。它能够管理线程的生命周期、调度任务的执行,并提供了灵活的配置选项和监控手段,以适应不同的并发场景。
总结起来,ThreadPoolExecutor 是一个功能强大的线程池实现,提供了线程池的创建、任务调度和执行、线程池状态的管理等功能。它是 Java 并发编程中常用的工具,能够有效地管理线程资源,提高程序的性能和可伸缩性。源代码结构如下:
二、使用Executors工厂创建线程池
1. 简介
Executors 是 Java 中的一个工具类,提供了用于创建各种类型的线程池的静态工厂方法。它简化了线程池的创建过程,并提供了一些常用的线程池配置选项。Executors 类中的静态工厂方法返回 ExecutorService 对象,可以用于提交和执行任务。以下是 Executors 类中一些常用的静态工厂方法:
-
newFixedThreadPool(int nThreads):创建一个固定大小的线程池,其中线程数量为指定的固定值。
-
newCachedThreadPool():创建一个可缓存的线程池,线程池的大小可以根据需要进行自动调整。
-
newSingleThreadExecutor():创建一个单线程的线程池,保证所有任务按照顺序执行。
-
newScheduledThreadPool(int corePoolSize):创建一个定时任务的线程池,可以执行定时任务和周期性任务。
-
newWorkStealingPool():创建一个工作窃取线程池,使用多个队列和工作窃取算法来提高并行性。
这些工厂方法返回的 ExecutorService 对象可以进行任务的提交和执行,提供了统一的接口来管理线程池。使用这些工厂方法可以避免手动配置线程池的繁琐过程,而是使用预定义的配置选项。需要注意的是,Executors 工厂方法返回的 ExecutorService 对象是基于 ThreadPoolExecutor 的具体实现。虽然提供了简化的创建方式,但在某些情况下可能不适用于特定的需求。对于更复杂的线程池配置,可能需要直接使用 ThreadPoolExecutor 类进行手动配置。总的来说,Executors 提供了方便的静态工厂方法来创建不同类型的线程池,简化了线程池的创建过程。它是 Java 并发编程中常用的工具,可以根据需要选择合适的线程池类型和配置选项。
2. 使用newCachedThreadPool()方法创建无界线程池
使用Executors类的newCachedThreadPool()方法创建无界线程池,可以进行线程自动回收,所谓的无界线程池,就是池中存放的线程个数是理论上的最大值,即Integer.MAX_VALUE
public class Main {
public static void main(String[] args) throws InterruptedException {
//创建线程池对象
ExecutorService executorService=Executors.newCachedThreadPool();
//下面创建了两个线程,执行了两个任务
executorService.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("Runnable1 begin"+System.currentTimeMillis());
Thread.sleep(1000);
System.out.println("A");
System.out.println("Runnable1 end"+System.currentTimeMillis());
}catch (InterruptedException e){
e.printStackTrace();
}
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("Runnable2 begin"+System.currentTimeMillis());
Thread.sleep(1000);
System.out.println("A");
System.out.println("Runnable2 end"+System.currentTimeMillis());
}catch (InterruptedException e){
e.printStackTrace();
}
}
});
}
}
由结果可以知道,A和B几乎是在相同的时间内开始打印的,也就是创建了2个线程,而且两个线程是异步运行的
3. 验证newCachedThreadPool()方法创建的线程池和线程复用特性
public class Main {
public static void main(String[] args) throws InterruptedException {
//创建线程池对象
ExecutorService executorService=Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
executorService.execute(new MyRunnable((" "+(i+1))));
}
Thread.sleep(1000);
System.out.println("");
System.out.println("");
for (int i = 0; i < 5; i++) {
executorService.execute(new MyRunnable((" "+(i+1))));
}
}
}
class MyRunnable implements Runnable{
private String username;
public MyRunnable(String username){
super();
this.username=username;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" username= "+username+" begin "+System.currentTimeMillis());
System.out.println(Thread.currentThread().getName()+" username= "+username+" end "+System.currentTimeMillis());
}
}
可见线程确实被复用了,但是线程池中的对象只有处于闲置状态时,才可以被复用
4. 使用newCachedThreadPool(ThreadFactory)定制线程工厂
无界线程池中创建线程类的过程是可以定制的,我们可以使用newCachedThreadPool方法来解决这个问题
public class Main {
public static void main(String[] args) throws InterruptedException {
MyThreadFactory myThreadFactory=new MyThreadFactory();
ExecutorService executorService=Executors.newCachedThreadPool(myThreadFactory);
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("我要运行"+System.currentTimeMillis()+" "+Thread.currentThread().getName());
}
});
}
}
//定制自己的线程工厂
class MyThreadFactory implements ThreadFactory{
@Override
public Thread newThread(Runnable r) {
Thread thread=new Thread(r);
thread.setName("定制池中的线程对象的名称:"+Math.random());
return thread;
}
}
上面通过自定义的ThreadFactory接口实现类,实现了线程对象的定制性
5. 使用newCachedThreadPool()方法创建无界线程池的缺点
如果在高并发的情况下,使用newCachedThreadPool方法创建无界线程池极容易造成内存占用率过高,导致内存溢出或者系统运行效率严重下降。原因是newCachedThreadPool可以无限制的创建线程,对线程数量并没有控制,所以容易导致内存过度消耗。这时可以使用有界线程池来限制线程池占用内存的最大空间。
6. 使用newFixedThreadPool(int)方法创建有界线程池
该方法就是用于创建有界线程池的,也就是池中线程的个数可以指定最大数量
public class Main {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService=Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
executorService.execute(new MyRunnable(" "+(i+1)));
}
for (int i = 0; i < 3; i++) {
executorService.execute(new MyRunnable(" "+(i+1)));
}
}
}
class MyRunnable implements Runnable{
private String username;
public MyRunnable(String username){
super();
this.username=username;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+" username=" +username+" begin "+System.currentTimeMillis());
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName()+" username=" +username+" end "+System.currentTimeMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
7. 使用newSingleThreadExecutor()方法创建单一线程池
使用该方法可以创建单一的先吃池,单一线程池可以实现以队列的方式来执行任务
public class Main {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService=Executors.newSingleThreadExecutor();
for (int i = 0; i < 3; i++) {
executorService.execute(new MyRunnable(" "+(i+1)));
}
}
}
class MyRunnable implements Runnable{
private String username;
public MyRunnable(String username){
super();
this.username=username;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+" username=" +username+" begin "+System.currentTimeMillis());
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName()+" username=" +username+" end "+System.currentTimeMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
三、ThreadPoolExecutor使用(重点)
1. 简介
ThreadPoolExecutor 是 Java 中用于创建和管理线程池的一个类,它是 ExecutorService 接口的具体实现。
ThreadPoolExecutor 提供了一个可定制的线程池实现,可以根据需求进行灵活的配置和管理。通过使用 ThreadPoolExecutor,可以有效地管理线程资源,提高程序的性能和可伸缩性。以下是 ThreadPoolExecutor 的一些重要特点和功能:
-
线程池的大小控制:ThreadPoolExecutor 允许指定线程池的核心线程数、最大线程数以及空闲线程的存活时间。线程池会根据任务的数量和状态自动调整线程的数量,避免创建过多的线程。
-
任务队列的管理:ThreadPoolExecutor 使用一个阻塞队列来保存待执行的任务。当线程池的线程数达到核心线程数时,新提交的任务会被放入任务队列中等待执行。可以选择不同类型的阻塞队列,如无界队列、有界队列或优先级队列。
-
饱和策略的处理:当线程池的工作队列已满并且线程数达到最大线程数时,ThreadPoolExecutor 提供了几种饱和策略来处理新提交的任务。可以选择抛出异常、丢弃任务、丢弃队列中最旧的任务或者在调用线程中执行任务。
-
线程池的状态和监控:ThreadPoolExecutor 提供了方法来获取线程池的状态,如是否关闭、是否在终止中。它还提供了一些监控方法,如获取活动线程数、已完成任务数等,用于监视线程池的性能和运行情况。
-
任务执行的异常处理:ThreadPoolExecutor 允许为任务执行过程中抛出的异常提供统一的处理策略。可以设置一个 UncaughtExceptionHandler 来处理未捕获的异常,或者覆盖 afterExecute 方法来自定义异常处理逻辑。
通过合理配置 ThreadPoolExecutor 的参数,可以根据不同的并发需求创建适合的线程池。它提供了灵活性和可定制性,能够适应不同的任务类型、负载情况和系统资源。总的来说,ThreadPoolExecutor 是一个强大而灵活的线程池实现,提供了对线程池的完全控制。它是 Java 并发编程中常用的工具,用于管理和执行多线程任务,并提供了丰富的配置选项和状态监控功能。
2. 队列LinkedBlockingQueue、ArrayBlockingQueue和SynchronousQueue的基本使用
上面这些并发集合不了解的可以看我这篇博客
3. ThreadPoolExecutor的构造方法详解
(1)简介
ThreadPoolExecutor的构造函数的源码如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
- corePoolSize:池中至少要保存的线程数,该属性就是定义corePool核心池的大小
- maximumPoolSize:池中允许的最大线程数,maximumPoolSize包括corePoolSize
- keepAliveTime:当线程数量大于corePoolSize时,在没有超过指定的时间内是不能从线程池中将空闲的线程删除的,如果超过此时间单位,则删除空闲线程,“能删除的空闲的线程”范围是maximumPoolSize~corePoolSize,也就是核心线程之外的线程
- unit:keepAliveTIme参数的时间单位
- workQueue:执行用于保持任务的队列。此队列仅保持由excute方法提交的runnable任务
(2)使用阻塞队列
使用LinkedBlockingQueue队列:特点是只会使用核心池中的线程执行任务
1-1
:如果任务数量小于等于核心线程数量:立即在和性池中创建线程并运行任务,这些任务不会放入LinkedBlockingQueue队列中,构造方法中的maximumPoolSize、keepAliveTime和unit将会失效(因为只有核心线程在工作,且核心线程是不能删除的)1-2
:如果任务数量大于核心线程数量:但小于最大线程数,此时maximumPoolSize、keepAliveTime和unit仍然会失效,并且超出核心线程数的这些线程会放入到LinkedBlockingQueue队列中阻塞,等待执行1-3
: 如果任务数量大于最大线程数:和2-1
一样
使用SynchronousQueue队列
2-1
:如果任务数量小于核心线程数量:立即在核心池中创建线程并运行任务,这些任务不会放入SynchronousQueue队列中,构造方法中的maximumPoolSize、keepAliveTime和unit将会失效2-2
:如果任务数量大于核心线程数,但小于最大线程数,构造方法maximumPoolSize、keepAliveTime和unit有效,并且马上创建最多maximumPoolSize个线程运行这些任务,任务不会放入SynchronousQueue队列,非核心线程的线程在执行完任务后在指定的keepAliveTime时间发生超时后会被清楚,如果在指定keepAliveTime时间内没有完成任务,则会在等这些线程完成任务后立即清除2-3
:如果任务数量大于最大线程数:则最多处理最大线程数的线程,其它任务不再处理,并抛出异常
使用LinkedBlockingQueue(xxxx)队列:其中参数xxxx代表队列的最大存储长度(相当于设置里边界),注意使用有参数的LinkedBlockingQueue队列的执行特点是核心池中的线程和maximumPoolSize-corePoolSize线程有可能一起执行任务,也就是最多执行任务的线程数量就是maximumPoolSize。另外在使用有参LinkedBlockingQueue队列时,执行的流程是先判断核心池大小够不够,如果不够则向队列中存放任务,如果队列放不下则在maximumPoolSize-corePoolSize中创建线程并执行。如果还放不下则抛出异常。
3-1
:如果任务的数量小于核心线程池大小,则任务不入队列,和上面情况一样3-2
:如果任务的数量大于核心线程池的大小,但小于xxxx,则超过核心线程数的线程进队列3-3
:如果任务的数量大于核心线程池的大小,但大于xxxx,且(任务数量-corePoolsize-xxxx)<(maximumPoolSize-corePoolSize),则核心线程池全部运行,且队列存储xxxx个任务,多余的任务在maximumPoolSize-corePoolSize创建并运行3-4
:如果任务的数量大于核心线程池的大小,但大于xxxx,且(任务数量-corePoolsize-xxxx)>(maximumPoolSize-corePoolSize),其它都按照3-4
,但大于maximumPoolSize-corePoolSize)的线程被拒绝执行并出现异常
(3)验证1-1
如果任务数量小于等于核心线程数量:立即在和性池中创建线程并运行任务,这些任务不会放入LinkedBlockingQueue队列中,构造方法中的maximumPoolSize、keepAliveTime和unit将会失效(因为只有核心线程在工作,且核心线程是不能删除的)
public class Main {
public static void main(String[] args) throws InterruptedException {
Runnable runnable=new Runnable() {
@Override
public void run() {
try{
System.out.println(Thread.currentThread().getName()+" run !"+System.currentTimeMillis());
Thread.sleep(1000);
}catch (InterruptedException e)
{
e.printStackTrace();
}
}
};
ThreadPoolExecutor executor=new ThreadPoolExecutor(7,8,9,TimeUnit.SECONDS,new LinkedBlockingDeque());
//6个任务,小于corePoolSize
executor.execute(runnable);
executor.execute(runnable);
executor.execute(runnable);
executor.execute(runnable);
executor.execute(runnable);
executor.execute(runnable);
Thread.sleep(300);
System.out.println("A executor.getPoolCoreSize()="+executor.getCorePoolSize());
System.out.println("A executor.getMaxPoolSize()="+executor.getMaximumPoolSize());
System.out.println("A executor.getPoolSize()="+executor.getPoolSize());
System.out.println("A executor.getQueueSize()="+executor.getQueue().size());
Thread.sleep(10000);
System.out.println("10s后打印结果,此时任务都处理完了");
System.out.println("B executor.getPoolCoreSize()="+executor.getCorePoolSize());
System.out.println("B executor.getMaxPoolSize()="+executor.getMaximumPoolSize());
System.out.println("B executor.getPoolSize()="+executor.getPoolSize());
System.out.println("B executor.getQueueSize()="+executor.getQueue().size());
}
可以发现,所有的输出都满足我们的预期结果
(4)验证1-2
更改1-1中的代码,再加2个线程即可
现在队列中已经有一个元素了
(5)验证1-3
更改1-2中的代码,再加1个线程即可
现在队列中有两个元素了
(6)验证2-1
public class Main {
public static void main(String[] args) throws InterruptedException {
Runnable runnable=new Runnable() {
@Override
public void run() {
try{
System.out.println(Thread.currentThread().getName()+" run !"+System.currentTimeMillis());
Thread.sleep(1000);
}catch (InterruptedException e)
{
e.printStackTrace();
}
}
};
ThreadPoolExecutor executor=new ThreadPoolExecutor(7,8,9,TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
//7个任务,小于corePoolSize
executor.execute(runnable);
executor.execute(runnable);
executor.execute(runnable);
executor.execute(runnable);
executor.execute(runnable);
executor.execute(runnable);
executor.execute(runnable);
Thread.sleep(300);
System.out.println("A executor.getPoolCoreSize()="+executor.getCorePoolSize());
System.out.println("A executor.getMaxPoolSize()="+executor.getMaximumPoolSize());
System.out.println("A executor.getPoolSize()="+executor.getPoolSize());
System.out.println("A executor.getQueueSize()="+executor.getQueue().size());
Thread.sleep(10000);
System.out.println("10s后打印结果,此时任务都处理完了");
System.out.println("B executor.getPoolCoreSize()="+executor.getCorePoolSize());
System.out.println("B executor.getMaxPoolSize()="+executor.getMaximumPoolSize());
System.out.println("B executor.getPoolSize()="+executor.getPoolSize());
System.out.println("B executor.getQueueSize()="+executor.getQueue().size());
}
}
所有输出均符合预期结果
(7)验证2-2
在2-1的基础上,加入一个线程
可以发现coresize是7,而poolsize为8,说明有1个线程加入了maximumPoolSize-corePoolSize中
(8)验证2-3
在2-2的代码基础上加入1个线程
线程数大于maximumPoolSize,多的抛出异常
(9)验证3-1
public class Main {
public static void main(String[] args) throws InterruptedException {
Runnable runnable=new Runnable() {
@Override
public void run() {
try{
System.out.println(Thread.currentThread().getName()+" run !"+System.currentTimeMillis());
Thread.sleep(1000);
}catch (InterruptedException e)
{
e.printStackTrace();
}
}
};
ThreadPoolExecutor executor=new ThreadPoolExecutor(3,6,5,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2));
//7个任务,小于corePoolSize
executor.execute(runnable);
executor.execute(runnable);
Thread.sleep(300);
System.out.println("A executor.getPoolCoreSize()="+executor.getCorePoolSize());
System.out.println("A executor.getMaxPoolSize()="+executor.getMaximumPoolSize());
System.out.println("A executor.getPoolSize()="+executor.getPoolSize());
System.out.println("A executor.getQueueSize()="+executor.getQueue().size());
Thread.sleep(10000);
System.out.println("10s后打印结果,此时任务都处理完了");
System.out.println("B executor.getPoolCoreSize()="+executor.getCorePoolSize());
System.out.println("B executor.getMaxPoolSize()="+executor.getMaximumPoolSize());
System.out.println("B executor.getPoolSize()="+executor.getPoolSize());
System.out.println("B executor.getQueueSize()="+executor.getQueue().size());
}
}
poolcoresize为3,开始只有两个任务核心线程可以全部执行
(10)验证3-2
在3-1的基础上,再加入2个任务
核心线程执行3个任务,还有1个线程放入队列中保存(队列的容量为2)
(11)验证3-3
在3-2的基础上,再加入2个任务
核心线程执行3个任务,队列放两个任务,还有一个任务放到maximumPoolSize-corePoolSize中执行
(12)验证3-4
在3-3的基础上,再加入3个任务
多出1个任务无法处理,系统报错
(13)验证参数keepAliveTime非0的实验
构造方法参数keepAliveTime的意义是使用SynchronousQueue队列,当线程数量大于corePoolSize值时,在没有超出指定时间内是不从线程池中将空闲线程删除的,如果超出此时间则删除,如果keepAliveTime的值为0则任务执行完毕后立即从队列中删除。
public class Main {
public static void main(String[] args) throws InterruptedException {
Runnable runnable=new Runnable() {
@Override
public void run() {
try{
System.out.println(Thread.currentThread().getName()+" run !"+System.currentTimeMillis());
Thread.sleep(1000);
}catch (InterruptedException e)
{
e.printStackTrace();
}
}
};
ThreadPoolExecutor executor=new ThreadPoolExecutor(3,6,5,TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
//7个任务,小于corePoolSize
executor.execute(runnable);
executor.execute(runnable);
executor.execute(runnable);
executor.execute(runnable);
executor.execute(runnable);
Thread.sleep(300);
System.out.println("A executor.getPoolCoreSize()="+executor.getCorePoolSize());
System.out.println("A executor.getMaxPoolSize()="+executor.getMaximumPoolSize());
System.out.println("A executor.getPoolSize()="+executor.getPoolSize());
System.out.println("A executor.getQueueSize()="+executor.getQueue().size());
Thread.sleep(2000);
System.out.println("2s后打印结果");
System.out.println("B executor.getPoolCoreSize()="+executor.getCorePoolSize());
System.out.println("B executor.getMaxPoolSize()="+executor.getMaximumPoolSize());
System.out.println("B executor.getPoolSize()="+executor.getPoolSize());
System.out.println("B executor.getQueueSize()="+executor.getQueue().size());
Thread.sleep(6000);
System.out.println("6s后打印结果");
System.out.println("C executor.getPoolCoreSize()="+executor.getCorePoolSize());
System.out.println("C executor.getMaxPoolSize()="+executor.getMaximumPoolSize());
System.out.println("C executor.getPoolSize()="+executor.getPoolSize());
System.out.println("C executor.getQueueSize()="+executor.getQueue().size());
}
keepAlived参数为5s,由于有5个任务,核心线程处理3个,还有两个在maximumPoolSize-corePoolSize中执行,可以发现poolsize开始为5,6s后变成来3,说明maximumPoolSize-corePoolSize,下面测试keepalive=0
可以发现2s后,Poolsize=0,说明处理完后立即删除了
4. 方法shutdown()和shutdownNow()
方法public void shutdown方法的作用是使当前未执行完的任务继续执行,而队列中未执行完的任务会继续执行,不删除队列的任务,不再运行添加新的任务,同时shutdown方法不会阻塞。public List <Runnable> shutdownNow()方法的作用是使当前未执行完的任务继续执行,而队列中的任务不再执行,删除队列中的任务,不再允许添加新的任务,同时该方法也不会阻塞。
public class Main {
public static void main(String[] args) throws InterruptedException {
Runnable runnable=new Runnable() {
@Override
public void run() {
try{
System.out.println(Thread.currentThread().getName()+" run !"+System.currentTimeMillis());
Thread.sleep(1000);
}catch (InterruptedException e)
{
e.printStackTrace();
}
}
};
ThreadPoolExecutor executor=new ThreadPoolExecutor(3,6,0,TimeUnit.SECONDS,new LinkedBlockingQueue<>());
//7个任务,小于corePoolSize
executor.execute(runnable);
executor.execute(runnable);
executor.execute(runnable);
executor.execute(runnable);
executor.shutdown();
executor.execute(runnable);
System.out.println("main end");
}
}
从运行结果可以知道,程序执行了4个任务。最后一个抛出异常,因为执行了shutdown()方法后不能继续添加任务
public class Main {
public static void main(String[] args) {
try {
MyRunnable myRunnable1=new MyRunnable("A1");
MyRunnable myRunnable2=new MyRunnable("A2");
MyRunnable myRunnable3=new MyRunnable("A3");
MyRunnable myRunnable4=new MyRunnable("A4");
ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2,10,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
poolExecutor.execute(myRunnable1);
poolExecutor.execute(myRunnable2);
poolExecutor.execute(myRunnable3);
poolExecutor.execute(myRunnable4);
Thread.sleep(10);
List<Runnable> list=poolExecutor.shutdownNow();
for (int i = 0; i < list.size(); i++) {
MyRunnable myRunnable=(MyRunnable) list.get(i);
System.out.println(myRunnable.getUsername()+"任务被取消");
}
System.out.println("main end");
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
class MyRunnable implements Runnable{
private String username;
public MyRunnable(String username){
super();
this.username=username;
}
public String getUsername(){
return this.username;
}
@Override
public void run(){
for (int i = 0; i < Integer.MAX_VALUE/500; i++) {
String newString1=new String();
String newString5=new String();
String newString6=new String();
String newString9=new String();
}
System.out.println(Thread.currentThread().getName()+" 任务完成");
}
}
两个任务被取消了,队列中的。注意如果正在执行的任务中使用if(Thread.currentThread.isInterrupted()==true)和throw new InterruptedException判断任务是否中断,那么在调用shutdown()后任务不会被中断而是继续运行,当调用shutdown方法后,任务会立即中断。
5. 其它方法
(1) isShutdown()方法
该方法是用来判断线程池是否关闭的
public class Main {
public static void main(String[] args) {
try {
MyRunnable myRunnable1=new MyRunnable("A1");
MyRunnable myRunnable2=new MyRunnable("A2");
MyRunnable myRunnable3=new MyRunnable("A3");
MyRunnable myRunnable4=new MyRunnable("A4");
ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2,10,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
poolExecutor.execute(myRunnable1);
poolExecutor.execute(myRunnable2);
poolExecutor.execute(myRunnable3);
poolExecutor.execute(myRunnable4);
Thread.sleep(1000);
List<Runnable> list=poolExecutor.shutdownNow();
System.out.println("main end");
System.out.println("线程池是否关闭:"+poolExecutor.isShutdown());
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
class MyRunnable implements Runnable{
private String username;
public MyRunnable(String username){
super();
this.username=username;
}
public String getUsername(){
return this.username;
}
@Override
public void run(){
for (int i = 0; i < Integer.MAX_VALUE/500; i++) {
String newString1=new String();
String newString5=new String();
String newString6=new String();
String newString9=new String();
}
System.out.println(Thread.currentThread().getName()+" 任务完成");
}
}
(2) 方法isTerminating()和isTerminated()
- public boolean isTerminating()方法:如果执行程序处于shutdown或shutdownNow之后且正在终止但尚未完全终止的过程中,也就是还有任务在执行则返回true,此方法比喻成门是否正在关闭。
- public boolean isTerminated()方法:如果关闭后所有的线程都已经执行完毕了,则返回true
这两个方法的功能是发出一个关闭线程池的命令,isShutdown方法用于判断关闭线程池的命令发出或未发出。isTerminating方法代表线程池是否正在关闭中,isTerminated则判断是否已经关闭了。
public static void main(String[] args) {
try {
MyRunnable myRunnable1=new MyRunnable("A1");
MyRunnable myRunnable2=new MyRunnable("A2");
MyRunnable myRunnable3=new MyRunnable("A3");
MyRunnable myRunnable4=new MyRunnable("A4");
ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2,10,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
poolExecutor.execute(myRunnable1);
poolExecutor.execute(myRunnable2);
poolExecutor.execute(myRunnable3);
poolExecutor.execute(myRunnable4);
Thread.sleep(1000);
List<Runnable> list=poolExecutor.shutdownNow();
System.out.println("线程池是否正在关闭中::"+poolExecutor.isTerminating()+" 线程池是否已经关闭成功:"+poolExecutor.isTerminated());
Thread.sleep(2000);
System.out.println("线程池是否正在关闭中::"+poolExecutor.isTerminating()+" 线程池是否已经关闭成功:"+poolExecutor.isTerminated());
System.out.println("main end");
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
class MyRunnable implements Runnable{
private String username;
public MyRunnable(String username){
super();
this.username=username;
}
public String getUsername(){
return this.username;
}
@Override
public void run(){
for (int i = 0; i < Integer.MAX_VALUE/500; i++) {
String newString1=new String();
String newString5=new String();
String newString6=new String();
String newString9=new String();
}
System.out.println(Thread.currentThread().getName()+" 任务完成");
}
}
(3) 方法awaitTermination(long timeout,TimeUnit unit)
该方法的作用是查看在指定的时间内,线程池是否已经终止工作,也就是“最多”等待多少时间后去判断线程是否已经终止工作。如果在指定的时间内,线程池没有调用shutdown方法,则该方法会阻塞timeout的时间,如果指定的时间调用了shutdown方法,则会返回true,如果超过指定时间,该方法会返回false。注意该方法要配置shutdown使用
public class Main {
public static void main(String[] args) {
try {
MyRunnable myRunnable1=new MyRunnable("A1");
MyRunnable myRunnable2=new MyRunnable("A2");
MyRunnable myRunnable3=new MyRunnable("A3");
MyRunnable myRunnable4=new MyRunnable("A4");
ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2,10,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
poolExecutor.execute(myRunnable1);
poolExecutor.execute(myRunnable2);
poolExecutor.execute(myRunnable3);
poolExecutor.execute(myRunnable4);
Thread.sleep(1000);
//List<Runnable> list=poolExecutor.shutdownNow();
System.out.println("开始时间:"+System.currentTimeMillis());
System.out.println("线程池是否关闭:"+poolExecutor.awaitTermination(10,TimeUnit.SECONDS));
System.out.println("结束时间:"+System.currentTimeMillis());
System.out.println("main end");
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
(4) 使用ThreadFactory+UncaughtExceptionHandler处理异常
有时候需要对线程中创建的线程进行定制化,这时就需要ThreadFactory线程工厂(前面介绍过),如果线程出现异常,可以结合UncaughtExceptionHandler处理
public class Main {
public static void main(String[] args) {
Myrunnable mythread=new Myrunnable();
ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2,9999,9999L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
poolExecutor.setThreadFactory(new Mythread());
poolExecutor.execute(mythread);
}
}
class Mythread implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread newThread = new Thread(r);
newThread.setName("jakie:" + new Date());
newThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("自定义异常处理:" + t.getName() + " " + e.getMessage());
e.printStackTrace();
}
});
return newThread;
}
}
class Myrunnable implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" "+System.currentTimeMillis());
String abc=null;
abc.indexOf(0);
System.out.println(Thread.currentThread().getName()+" "+System.currentTimeMillis());
}
}
(5)方法set/getRejectedExceptionHandler()
public void set/getRejectedExceptionHandler()方法用于处理任务被拒绝执行时的行为。
public static void main(String[] args) {
// 创建一个线程池,设置拒绝策略为抛出异常
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
// 设置拒绝执行处理器
executor.setRejectedExecutionHandler(getRejectedExceptionHandler());
// 提交任务到线程池
executor.submit(() -> {
try {
Thread.sleep(1000); // 模拟任务执行
System.out.println("Task completed successfully.");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 提交第二个任务,超过线程池容量会触发拒绝执行处理器
executor.submit(() -> {
try {
Thread.sleep(1000); // 模拟任务执行
System.out.println("This task should not be executed due to rejection policy.");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 关闭线程池
executor.shutdown();
}
private static RejectedExecutionHandler getRejectedExceptionHandler() {
return (r, executor) -> {
throw new RejectedExecutionException("Task rejected: " + r.toString());
};
}
}
(5)方法allowsCoreThreadOut()和allowCoreThreadTimeOut(boolean value)
allowsCoreThreadTimeOut ()方法和 allowCoreThreadTimeOut (boolean value)方法是 ThreadPoolExecutor 类中的方法,用于设置核心线程是否允许超时和允许核心线程超时的时间。。
- allowCoreThreadTimeOut(boolean value) 方法用于同时设置核心线程是否允许超时和允许核心线程超时的时间。如果设置为 true,则核心线程在超过指定的超时时间后,如果没有任务需要执行,也会被回收。如果设置为 false,核心线程将一直保持活动状态,不会被回收。
- allowCoreThreadTimeOut() :用于判断核心线程是否有超时销毁的特性
public class Main {
public static void main(String[] args) {
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
1, TimeUnit.SECONDS, // 线程空闲超时时间
new LinkedBlockingQueue<>()); // 任务队列
System.out.println(executor.allowsCoreThreadTimeOut());
// 提交任务到线程池
for (int i = 1; i <= 4; i++) {
final int taskId = i;
executor.execute(() -> {
try {
System.out.println("Executing task: " + taskId);
Thread.sleep(2000); // 模拟任务执行
System.out.println("Task completed: " + taskId);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 关闭线程池
executor.shutdown();
}
private static RejectedExecutionHandler getRejectedExceptionHandler() {
return (r, executor) -> {
throw new RejectedExecutionException("Task rejected: " + r.toString());
};
}
}
(6)方法prestartCoreThread()和prestartAllcoreThreads()
prestartCoreThread() 和 prestartAllCoreThreads() 是 ThreadPoolExecutor 类中的方法,用于预启动核心线程。
-
prestartCoreThread() 方法用于预启动一个核心线程。如果当前线程池中的线程数量少于核心线程数,该方法将启动一个核心线程。
-
prestartAllCoreThreads() 方法用于预启动所有核心线程。如果当前线程池中的线程数量少于核心线程数,该方法将启动所有缺少的核心线程。
这些方法的目的是在开始提交任务之前,提前创建并启动核心线程,以提高任务处理的响应性能。
public class Main {
public static void main(String[] args) {
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
1, TimeUnit.SECONDS, // 线程空闲超时时间
new LinkedBlockingQueue<>()); // 任务队列
System.out.println("当前线程数:"+executor.getPoolSize()+"——说明在提交任务之前,不会生成线程,属于懒加载");
// 预启动一个核心线程
executor.prestartCoreThread();
System.out.println("当前线程数:"+executor.getPoolSize()+"——生成一个线程");
// 预启动所有核心线程
executor.prestartAllCoreThreads();
System.out.println("当前线程数:"+executor.getPoolSize()+"——生成全部线程");
// 提交任务到线程池
for (int i = 1; i <= 2; i++) {
final int taskId = i;
executor.execute(() -> {
try {
System.out.println("Executing task: " + taskId);
Thread.sleep(2000); // 模拟任务执行
System.out.println("Task completed: " + taskId);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 关闭线程池
executor.shutdown();
}
}
当核心线程生成完后,再调用上面的方法是无效的
(7)方法getCompletedTaskCount()
获得已经完成的任务的总数
public class Main {
public static void main(String[] args) throws InterruptedException {
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
1, TimeUnit.SECONDS, // 线程空闲超时时间
new LinkedBlockingQueue<>()); // 任务队列
// 提交任务到线程池
for (int i = 1; i <= 2; i++) {
final int taskId = i;
executor.execute(() -> {
try {
System.out.println("Executing task: " + taskId);
Thread.sleep(2000); // 模拟任务执行
System.out.println("Task completed: " + taskId);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
Thread.sleep(3000);
System.out.println("当前完成了多少任务"+executor.getCompletedTaskCount());
// 关闭线程池
executor.shutdown();
}
}
6. 线程池ThreadPoolExecutor的拒绝策略
线程池中的资源全部被占用的时候,对新添加的任务有不同的处理策略,在默认情况下,有四种不同处理方式
(1)AbortPolicy策略
该策略是当任务添加到线程池中被拒绝时,将抛出RejectedExecutionException异常,这也是默认的策略
public class Main {
public static void main(String[] args) throws InterruptedException {
Myrunnable myrunnable1=new Myrunnable();
Myrunnable myrunnable2=new Myrunnable();
Myrunnable myrunnable3=new Myrunnable();
Myrunnable myrunnable4=new Myrunnable();
Myrunnable myrunnable5=new Myrunnable();
Myrunnable myrunnable6=new Myrunnable();
ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2,3,5,TimeUnit.SECONDS,new ArrayBlockingQueue<>(2),new ThreadPoolExecutor.AbortPolicy());
poolExecutor.execute(myrunnable1);
poolExecutor.execute(myrunnable2);
poolExecutor.execute(myrunnable3);
poolExecutor.execute(myrunnable4);
poolExecutor.execute(myrunnable5);
poolExecutor.execute(myrunnable6);
}
}
class Myrunnable implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" "+System.currentTimeMillis());
}
}
(2)CallerRunsPolicy策略
当任务添加到线程池中被拒绝时,会调用线程池的Thread线程对象处理被拒绝的任务,在上面代码的基础上更改策略
可以看到多的任务是Main线程处理的,所以验证会影响程序的效率
(3)DiscardOldestPolicy策略
当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的没有处理的任务,然后即将被拒绝的任务添加到等待队列中
public class Main {
public static void main(String[] args) throws InterruptedException {
Myrunnable myrunnable1=new Myrunnable("A1");
Myrunnable myrunnable2=new Myrunnable("A2");
Myrunnable myrunnable3=new Myrunnable("A3");
Myrunnable myrunnable4=new Myrunnable("A4");
Myrunnable myrunnable5=new Myrunnable("A5");
Myrunnable myrunnable6=new Myrunnable("A6");
ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2,3,5,TimeUnit.SECONDS,new ArrayBlockingQueue<>(2),new ThreadPoolExecutor.DiscardOldestPolicy());
poolExecutor.execute(myrunnable1);//A1
poolExecutor.execute(myrunnable2);//A2
poolExecutor.execute(myrunnable3);//A3
poolExecutor.execute(myrunnable4);//A4
poolExecutor.execute(myrunnable5);//A5
poolExecutor.execute(myrunnable6);//A6-被拒绝的线程
}
}
class Myrunnable implements Runnable{
private String name;
public Myrunnable(String name){
super();
this.name=name;
}
@Override
public void run() {
Thread.currentThread().setName(this.name);
System.out.println(Thread.currentThread().getName() + "被执行");
}
}
A3线程没有执行
(3)DiscardPolicy策略
当任务添加到线程池中被拒绝时,线程池将会丢弃被拒绝的线程
7. 方法afterExcute()和beforeExcute()
afterExecute() 和 beforeExecute() 是 Java 中的线程池(ExecutorService)接口中的两个方法。这两个方法允许您在任务执行前后执行自定义的操作。
- beforeExecute() 方法:beforeExecute() 方法在每个任务执行之前被调用。它接受一个 Runnable 参数,表示即将执行的任务。您可以使用这个方法来执行一些准备工作,例如初始化资源或记录任务执行的开始时间。
- afterExecute() 方法:afterExecute() 方法在每个任务执行完成后被调用。它接受三个参数:一个 Runnable 表示已完成的任务,一个 Throwable 表示任务执行过程中发生的异常(如果有),还有一个 Thread 表示执行任务的线程。您可以使用这个方法来进行一些收尾工作,例如释放资源、记录任务执行结果等。
public class Main {
public static void main(String[] args) {
int corePoolSize = 2;
int maxPoolSize = 5;
long keepAliveTime = 1;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
MyThreadPoolExecutor executor = new MyThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
// 提交任务到线程池
for (int i = 1; i <= 5; i++) {
executor.submit(new MyTask(i));
}
// 关闭线程池
executor.shutdown();
}
}
class MyThreadPoolExecutor extends ThreadPoolExecutor {
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
System.out.println("任务开始执行:" + r.toString());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
System.out.println("任务执行完成:" + r.toString());
if (t != null) {
System.out.println("任务执行过程中发生异常:" + t.getMessage());
}
}
}
class MyTask implements Runnable {
private final int taskId;
public MyTask(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
System.out.println("任务 " + taskId + " 开始执行");
try {
// 模拟任务执行耗时
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务 " + taskId + " 执行完成");
}
}
8. 方法Remove(Runnable)使用
remove(Runnable) 方法的作用是取消尚未开始执行的任务,并尝试从工作队列中移除该任务。如果任务已经在执行或已经完成,则无法移除。使用 remove(Runnable) 方法时,需要注意以下几点:
- 如果任务已经在执行或已经完成,remove(Runnable) 方法将返回 false,表示无法移除任务。
- 如果任务尚未开始执行且成功从工作队列中移除,remove(Runnable) 方法将返回 true。
- 如果任务在工作队列中多次出现,remove(Runnable) 方法只会移除第一个匹配的任务。
class MyTask implements Runnable {
private final int taskId;
public MyTask(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
System.out.println("任务 " + taskId + " 开始执行");
try {
// 模拟任务执行耗时
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务 " + taskId + " 执行完成");
}
}
public class Main {
public static void main(String[] args) {
int corePoolSize = 2;
int maxPoolSize = 5;
long keepAliveTime = 1;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
// 提交任务到线程池
for (int i = 1; i <= 5; i++) {
executor.submit(new MyTask(i));
}
// 等待一段时间
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 移除指定任务
boolean removed = executor.remove(new MyTask(3));
System.out.println("任务是否被成功移除:" + removed);
// 关闭线程池
executor.shutdown();
}
}
9. 常见的get方法以及使用
- getActiveCount():取得有多少个线程正在执行任务
- getTaskCount():获得有多少任务发送给了线程池
- getLargestPoolSize():返回线程池中曾经最多的线程
public class Main {
public static void main(String[] args) throws InterruptedException {
Myrunnable myrunnable1=new Myrunnable("A1");
Myrunnable myrunnable2=new Myrunnable("A2");
Myrunnable myrunnable3=new Myrunnable("A3");
Myrunnable myrunnable4=new Myrunnable("A4");
Myrunnable myrunnable5=new Myrunnable("A5");
Myrunnable myrunnable6=new Myrunnable("A6");
ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2,3,5,TimeUnit.SECONDS,new ArrayBlockingQueue<>(2),new ThreadPoolExecutor.DiscardOldestPolicy());
poolExecutor.execute(myrunnable1);//A1
System.out.println("获得多少线程正在执行任务:"+poolExecutor.getActiveCount());
poolExecutor.execute(myrunnable2);//A2
poolExecutor.execute(myrunnable3);//A3
poolExecutor.execute(myrunnable4);//A4
poolExecutor.execute(myrunnable5);//A5
poolExecutor.execute(myrunnable6);//A6
System.out.println("获得有多少任务发送给了线程池:"+poolExecutor.getTaskCount());
System.out.println("返回线程池中曾经最多的线程:"+poolExecutor.getLargestPoolSize());
}
}
class Myrunnable implements Runnable{
private String name;
public Myrunnable(String name){
super();
this.name=name;
}
@Override
public void run() {
}
}