Executors快速创建线程池的方法
Java通过Executors 工厂提供了5种创建线程池的方法,具体方法如下
方法名 | 描述 |
---|---|
newSingleThreadExecutor() | 创建一个单线程的线程池,该线程池中只有一个工作线程。所有任务按照提交的顺序依次执行,保证任务的顺序性。当工作线程意外终止时,会创建一个新的线程来替代它。适用于需要顺序执行任务且保证任务安全性的场景。 |
newFixedThreadPool(int nThreads) | 创建一个固定大小的线程池,该线程池中的线程数量固定为指定的数量。当有新任务提交时,如果线程池中有空闲线程,则立即使用空闲线程执行任务;如果没有空闲线程,则任务将被放入任务队列等待执行。 |
newCachedThreadPool() | 创建一个缓存线程池,该线程池中的线程数量不固定,可以根据任务的需求动态调整线程数量。空闲线程会被保留一段时间,如果在保留时间内没有任务执行,则这些线程将被终止并从线程池中删除。适用于执行大量短期任务的场景 |
newScheduledThreadPool(int corePoolSize) | 创建一个可调度的线程池,该线程池能够按照一定的调度策略执行任务。除了执行任务外,还可以按照指定的延迟时间或周期性地执行任务。适用于需要按照计划执行任务、定时任务或周期性任务的场景。 |
newWorkStealingPool(int parallelism) | newWorkStealingPool(int parallelism) 方法用于创建一个工作窃取线程池。工作窃取线程池是一种特殊的线程池,它根据一定的调度策略执行任务。除了执行任务外,工作窃取线程池还可以按照指定的延迟时间或周期性地执行任务。 |
ThreadFactory
在学习Executor创建线程池之前,我们先来学习一下
ThreadFactory
是一个接口,用于创建线程对象的工厂。它定义了一个方法newThread
,用于创建新的线程。
在Java中,线程的创建通常通过Thread
类的构造函数进行,但是使用ThreadFactory
可以将线程的创建过程与线程的执行逻辑分离开来。通过自定义的ThreadFactory
,我们可以对线程进行更加灵活的配置和管理,例如指定线程名称、设置线程优先级、设置线程是否为守护线程等。
ThreadFactory
接口只有一个方法:
Thread newThread(Runnable runnable);
该方法接受一个Runnable
对象作为参数,并返回一个新的Thread
对象。
一般情况下,我们可以通过实现ThreadFactory
接口来自定义线程的创建。以下是一个示例的自定义ThreadFactory
实现:
public class MyThreadFactory implements ThreadFactory {
// 自定义线程的名称
private final String namePrefix = "test-async-thread";
private final AtomicInteger threadNumber = new AtomicInteger(1);
public MyThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setName(namePrefix + "-" + threadNumber.getAndIncrement());
thread.setPriority(Thread.NORM_PRIORITY);
thread.setDaemon(false);
return thread;
}
}
在上面的示例中,MyThreadFactory
实现了ThreadFactory
接口,并通过构造函数传入一个namePrefix
参数,用于指定线程的名称前缀。
在newThread
方法中,首先创建一个新的Thread
对象,并设置线程的名称为namePrefix
加上一个递增的数字。然后,可以根据需要设置线程的优先级、是否为守护线程等属性。
newSingleThreadExecutor() 创建单线程化线程池
该方法用于创建一个单线程化的线程池
,也就是只有一个线程的线程池
。该线程池中只有一个工作线程,它负责按照任务的提交顺序依次执行任务。当有任务提交时,会创建一个新的线程来执行任务。如果工作线程意外终止,线程池会创建一个新的线程来替代它,确保线程池中始终有一个可用的线程。
newSingleThreadExecutor()
方法返回的线程池实例实现了ExecutorService
接口,因此可以使用submit()
方法提交任务并获取Future
对象,或使用execute()
方法提交任务。
该线程池适用于需要顺序执行任务且保证任务之间不会发生并发冲突的场景。由于只有一个工作线程,所以不存在线程间的竞争
和并发问题
,可以确保任务的安全性。
此外,newSingleThreadExecutor()
方法创建的线程池还可以用于任务的异常处理。当任务抛出异常时,线程池会捕获异常并记录或处理异常,避免异常导致整个应用程序崩溃。
需要注意的是,由于该线程池只有一个线程,如果任务执行时间过长
或任务量过大
,可能会导致任务队列堆积
,造成应用程序的性能问题。所以在使用该线程池时,需要根据任务的特性和需求进行适当的评估和调优。
下面我们使用Executors中newSingleThreadExecutor()方法创建一个单线程线程池
/**
* 这里创建的线程是 是Executors.newSingleThreadExecutor() 一样 保证只有一个线程来进行执行 并且按照提交的顺序进行执行
* <pre>
* {@code
* public static ExecutorService newSingleThreadExecutor() {
* return new Executors.FinalizableDelegatedExecutorService (
* new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
* }
* }
* </pre>
*/
@Test
public void test10() {
// 如果有多个任务提交到线程池中,那么这个线程池中的线程会依次执行任务 和
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 创建线程 1
Thread t1 = new Thread(() -> {
logger.error("t1 ----> 开始执行了~");
}, "t1");
// 创建线程 2
Thread t2 = new Thread(() -> {
logger.error("t2 ----> 开始执行了~");
}, "t2");
// 创建线程 3
Thread t3 = new Thread(() -> {
logger.error("t3 ----> 开始执行了~");
}, "t3");
// Executor 这个接口定义的功能很有限,同时也只支持 Runnale 形式的异步任务
// 向线程池提交任务
executorService.submit(t1);
executorService.submit(t2);
executorService.submit(t3);
// 关闭线程池
executorService.shutdown();
}
首先,通过Executors.newSingleThreadExecutor();
创建了一个单线程化的线程池。
然后,创建了三个线程t1、t2和t3,分别用于执行不同的任务。
接着,通过executorService.submit(t1)
将t1线程提交到线程池中进行执行。同样地,也将t2和t3线程提交到线程池中。
最后,通过executorService.shutdown()
关闭线程池。
执行时,可以观察到日志输出的顺序。由于线程池中只有一个线程,所以任务会依次按照提交的顺序进行执行。
需要注意的是,通过线程池执行任务后,线程的名称不再是我们自定义的线程名称,而是线程池的名称(如pool-2-thread-1
)。这是因为具体的执行任务是交给线程池来管理和执行的。
从输出中我们可以看出,该线程池有以下特点:
-
单线程化的线程池中的任务,都是按照提交的顺序来进行执行的。
-
该线程池中的唯一线程存活时间是无限的
-
当线程池中唯一的线程正在繁忙时,新提交的任务会进入到其内部的阻塞队列中,而且阻塞队列的容量是无限的
-
// 这是 newSingleThreadExecutor 一个无参的构造方法 public static ExecutorService newSingleThreadExecutor() { // 创建一个FinalizableDelegatedExecutorService实例,该实例是ExecutorService接口的一个包装类 // 将上面创建的ThreadPoolExecutor实例作为参数传入 // 这样就得到了一个单线程化的线程池 return new FinalizableDelegatedExecutorService ( // 创建一个ThreadPoolExecutor实例,指定参数如下: // corePoolSize: 1,线程池中核心线程的数量为1 // maximumPoolSize: 1,线程池中最大线程的数量为1 // keepAliveTime: 0L,空闲线程的存活时间为0毫秒,即空闲线程立即被回收 // unit: TimeUnit.MILLISECONDS,存活时间的时间单位是毫秒 // workQueue: new LinkedBlockingQueue<Runnable>(),使用无界阻塞队列作为任务队列 new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } // 其中 阻塞队列大小,如果不传容量,默认是整形的最大值 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
-
总体来说:
ewSingleThreadExecutor();
适用于按照任务提交次序,一个接一个的执行场景
newFixedThreadPool 创建固定数量的线程池
该方法用于创建一个
固定数量的线程池
,其中唯一的参数是用于设置线程池中线程的数量
newFixedThreadPool
是Executors
类提供的一个静态方法,用于创建一个固定大小的线程池。方法具体如下
public static ExecutorService newFixedThreadPool(int nThreads)
参数
nThreads
表示线程池中的线程数量,即固定的线程数量。线程池中的线程数不会根据任务的多少进行动态调整,即使有空闲线程也不会销毁,除非调用了线程池的shutdown
方法。
newFixedThreadPool
方法返回一个ExecutorService
对象,它是Executor
接口的子接口,提供了更加丰富的任务提交和管理方法。通过创建固定大小的线程池,可以在任务并发量较高且预期的任务数量固定的情况下,提供一定程度的线程复用和线程调度控制。线程池会根据固定的线程数量来创建对应数量的线程,并将任务分配给这些线程进行执行。
线程池的工作原理如下:
- 当有任务提交到线程池时,线程池中的某个线程会被唤醒来执行任务。
- 如果所有线程都在执行任务,新的任务会被放入一个任务队列中等待执行。
- 当任务队列已满时,线程池会根据配置的拒绝策略来处理无法执行的任务。
需要注意的是,由于线程池的大小是固定的,如果任务数量超过线程池的容量,任务会在任务队列中等待执行。这可能会导致任务等待时间增加或任务堆积,进而影响系统的响应性能。因此,在选择线程池大小时,需要根据系统的负载情况和任务特点进行合理的配置。
下面我们通过代码来了解一下 newFixedThreadPool
@Test
public void test18() {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(new Runnable() {
@Override
public void run() {
// 随机睡觉
timeSleep();
printPoolInfo(executor, "*[耗时线程-1]");
}
});
executor.submit(new Runnable() {
@Override
public void run() {
// 随机睡觉
timeSleep();
printPoolInfo(executor, "*[耗时线程-2]");
}
});
executor.submit(new Runnable() {
@Override
public void run() {
timeSleep();
printPoolInfo(executor, "*[耗时线程-3]");
}
});
for (int i = 0; i < 5; i++) {
int finalI = i;
executor.submit(new Runnable() {
@Override
public void run() {
printPoolInfo(executor, "普通线程");
}
});
}
// 优雅关闭线程池
executor.shutdown();
waitPoolExecutedEnd(executor);
}
// 这个方法用于等待线程全部执行结束
public void waitPoolExecutedEnd(ThreadPoolExecutor executor) {
// 确保主线程完全等待子线程执行完毕
try {
// 等待线程池中的任务执行完毕,最多等待1天
if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
logger.error("线程池中的任务还未全部执行完毕~");
}
} catch (InterruptedException e) {
logger.error("等待线程池中的任务被中断~");
}
}
这段代码创建了一个固定大小为2的线程池 executor
,并向其中提交了多个任务。其中:
- 通过
Executors.newFixedThreadPool(2)
创建了一个固定大小为2的线程池。 - 向线程池中提交了多个任务,包括两个耗时任务和五个普通任务。
- 每个耗时任务都会执行
timeSleep()
方法进行一段随机时间的睡眠,然后执行printPoolInfo()
方法输出当前线程池的信息。 - 普通任务直接执行
printPoolInfo()
方法输出当前线程池的信息。 - 在所有任务提交完毕后,调用
executor.shutdown()
方法优雅地关闭线程池,并调用waitPoolExecutedEnd(executor)
方法等待线程池中的任务执行完毕。
根据输出的结果 我们可以观察到
- 当第一个耗时任务执行时,线程池中有2个核心线程在工作。
- 当第三个任务执行时,线程池中的任务数量已经达到5个,但是活跃线程数量仍然为2,说明任务正在等待空闲线程来执行。
- 所有任务执行完成后,线程池中的线程数量仍然保持为2,这是因为线程池是一个固定大小的线程池。
- 在任务执行的过程中,线程池的核心线程数量一直为2,线程数量也一直保持在2个,因为线程池的大小是固定的。
固定大小线程池适用于以下场景:
- 资源受限场景:当系统资源受限,无法创建过多线程时,固定大小线程池能够限制线程数量,防止系统资源被耗尽。
- 稳定的任务处理:适用于稳定的任务处理场景,例如批量数据处理、定时任务等,因为固定大小线程池能够保持一定数量的线程长时间运行,避免线程的频繁创建和销毁。
- 控制并发数量:在需要限制并发数量的场景下,固定大小线程池能够控制同时执行的任务数量,防止系统过载。
- 避免资源竞争:固定大小线程池可以避免多个任务争夺系统资源而导致的竞争和性能下降。
- 稳定的性能预测:在需要稳定的性能预测下,固定大小线程池能够提供一致的性能表现,因为线程数量是固定的,可以更好地进行性能测试和预测。
newCachedThreadPool 创建可以缓存的线程池
newCachedThreadPool 用于创建一个
可以缓存的线程池
,如果线程内的某些线程无事可干,那么就会成为空线程,可缓存线程池
,可以灵活回收这些空闲线程
newCachedThreadPool
是Executors
类提供的一个静态方法,用于创建一个缓存型线程池。方法定义如下
public static ExecutorService newCachedThreadPool()
newCachedThreadPool
方法返回一个ExecutorService
对象,它是Executor
接口的子接口,提供了更加丰富的任务提交和管理方法。缓存型线程池会根据需要自动创建和回收线程,线程池的大小可以根据任务的数量自动调整。如果当前没有可用的空闲线程,会创建新的线程来执行任务;如果有空闲线程并且它们在指定的时间内没有执行任务,那么这些空闲线程将会被回收。
使用缓存型线程池的优点是可以根据任务的数量动态调整线程池的大小,以适应不同的负载情况。当任务数量较少时,线程池会减少线程的数量以节省资源;当任务数量增加时,线程池会增加线程的数量以提高并发性。
需要注意的是,由于缓存型线程池的大小是不限制的,它可能会创建大量的线程,如果任务的提交速度超过了线程执行任务的速度,可能会导致系统资源消耗过多,甚至造成系统崩溃。因此,在使用缓存型线程池时,需要根据任务特点和系统资源情况进行合理的配置。
下面我们通过一个案例,来了解一下newCachedThreadPool,但是了解newCachedThreadPool之前,我们先来熟悉一个阻塞队列,这个会在后面的阻塞队列专题中详细介绍,这里只是作为了解
SynchronousQueue
- 无内部存储容量:与其他阻塞队列不同,SynchronousQueue 不存储元素,其容量为零。换句话说,它是一个零容量的队列,用于在线程之间同步传输数据。
- 阻塞队列:作为 BlockingQueue 接口的一个实现,SynchronousQueue 提供了阻塞操作,允许线程在队列的插入和移除操作上进行阻塞等待。
- 匹配插入和移除操作:在 SynchronousQueue 中,每个插入操作必须等待另一个线程的移除操作,反之亦然。换句话说,发送线程必须等待接收线程,而接收线程也必须等待发送线程,才能够完成操作。这样的特性保证了数据的可靠传输,只有在有线程与之匹配时,才会进行数据传输。
- 不支持 peek 操作:由于 SynchronousQueue 内部没有存储元素,因此不能调用 peek 操作。只有在移除元素时才会有元素可供操作。
- 支持公平和非公平模式:SynchronousQueue 可以在构造时指定为公平或非公平模式。在公平模式下,队列会按照线程的到达顺序进行操作;而在非公平模式下,则不保证操作的顺序。
SynchronousQueue 在多线程并发编程中常用于一些特定场景,例如生产者-消费者模式中,用于传输数据的场景,以及一些任务执行器中用于任务的传递等。其特殊的同步机制保证了线程之间数据的可靠传输和同步操作。
基于SynchronousQueue 一个小案例
/**
* 理解SynchronousQueue
* SynchronousQueue,实际上它不是一个真正的队列,因为SynchronousQueue没有容量。与其他BlockingQueue(阻塞队列)不同,
* SynchronousQueue是一个不存储元素的BlockingQueue。只是它维护一组线程,这些线程在等待着把元素加入或移出队列。
* 我们简单分为以下几种特点:
* 内部没有存储(容量为0)
* 阻塞队列(也是blockingqueue的一个实现)
* 发送或者消费线程会阻塞,只有有一对消费和发送线程匹配上,才同时退出。
* (其中每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。因此此队列内部其 实没有任何一个元素,因此不能调用peek操作,因为只有移除元素时才有元素。)
* 配对有公平模式和非公平模式(默认)
*/
@Test
public void test19() throws InterruptedException {
SynchronousQueue<String> queue = new SynchronousQueue<>();
// 我们通过线程内 入队 和 出队 了解下 SynchronousQueue的特性
new Thread(new Runnable() {
@Override
public void run() {
try {
// 入队
logger.error("---- 喜羊羊进锅,沐浴~ ----");
queue.put("喜羊羊!");
logger.error("---- 懒羊羊进锅,沐浴~ ----");
queue.put("懒羊羊!");
logger.error("---- 美羊羊进锅,沐浴~ ----");
queue.put("美羊羊!");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "t1").start();
new Thread(new Runnable() {
@Override
public void run() {
try {
// 入队
Thread.sleep(5000);
String node1 = queue.take();
logger.error("---- {} 出锅~ ----", node1);
Thread.sleep(10000);
String node2 = queue.take();
logger.error("---- {} 出锅~ ----", node2);
Thread.sleep(5000);
String node3 = queue.take();
logger.error("---- {} 出锅~ ----", node3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "t2").start();
Thread.sleep(Integer.MAX_VALUE);
}
// 根据 结果可以发现, 三只羊同时进锅,但是一个锅只能容纳一只羊,所以只有一只羊能进锅,其他的羊只能等待,直到锅里的羊出锅,才能进锅
// 也就是说,SynchronousQueue是一个不存储元素的BlockingQueue。只是它维护一组线程,这些线程在等待着把元素加入或移出队列。
// 喜羊羊进锅,然后等待 5s后 喜羊羊出锅,此时美羊羊开始进锅
了解这个阻塞队列后,我们再来了解一下newCachedThreadPool这个线程池,还是通过一个案例来进行了解一下具体用法
/**
* newCachedThreadPool 创建可以缓存的线程池
*/
@Test
public void test17() {
// 创建可以缓存的线程池
// 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
// keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死(60L)
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
// 这里使用两个线程,异步执行
// 此处 羊羊线程是不休眠的,直接放入线程池
new Thread(() -> {
for (int i = 0; i < 2; i++) {
int finalI = i;
threadPoolExecutor.submit(() -> {
printPoolInfo(threadPoolExecutor, "羊羊线程" + finalI + "-正在执行");
});
}
}, "任务线程-1").start();
// 狼狼线程 此时第一个线程也是不会进行阻塞,此时应该是 三个线程 (两个羊羊+一个狼狼 同时进线程池)
new Thread(() -> {
for (int i = 0; i < 5; i++) {
if (i == 1 || i == 2) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
int finalI = i;
threadPoolExecutor.submit(() -> {
printPoolInfo(threadPoolExecutor, "狼狼线程" + finalI + "-正在执行");
});
}
}, "任务线程-2").start();
// 等待全部线程都执行完毕
waitPoolExecutedEnd(threadPoolExecutor);
threadPoolExecutor.shutdown();
}
这段代码的主要逻辑如下
- 使用
Executors.newCachedThreadPool()
创建了一个可缓存的线程池threadPoolExecutor
。这种线程池的特点是,如果线程池长度超过当前任务需求,它会灵活地回收空闲线程;若没有可回收的线程,则会新建线程来处理任务。线程的空闲时间超过60秒后,就会被回收。 - 创建了两个新的线程,分别用于提交任务到线程池中执行。其中,一个线程负责提交羊羊线程,另一个线程负责提交狼狼线程。
- 羊羊线程的任务不会进行阻塞,直接提交到线程池中执行。
- 狼狼线程的任务中,当
i
等于1或2时,会进行5秒的睡眠,模拟任务的耗时操作,然后再提交到线程池中执行。 - 使用
waitPoolExecutedEnd(threadPoolExecutor)
方法等待线程池中的任务执行完毕。 - 在所有任务执行完毕后,调用
threadPoolExecutor.shutdown()
方法关闭线程池。
- 通过结果可以看到每个任务的执行都是间隔5秒执行一次。
- 线程池信息中的线程数量始终为3,这是因为
Executors.newCachedThreadPool()
创建的是一个可缓存的线程池,其最大线程数量为 Integer.MAX_VALUE,因此当任务提交到线程池时,如果没有空闲线程可用,则会新建线程来处理任务,线程数量会一直增加,直到达到设定的最大值。 - 在任务提交时,活跃线程数量始终为3,这是因为每次提交任务时,都有空闲线程可用,所以不需要新建线程,而是直接使用已存在的线程执行任务。
- 狼狼线程的任务会进行5秒的睡眠操作,模拟耗时操作,因此在执行任务期间,线程池中的活跃线程数量会减少,直到任务执行完毕后,线程池会继续维持3个活跃线程数量。
- 在执行完所有任务后,线程池并不会立即关闭,因为线程池是可缓存的,会等待一段时间后空闲线程自动被回收。
应用场景:
- 短期任务处理:适用于处理大量短期任务的场景,因为它能够根据需要动态地创建线程,处理任务,处理完毕后又自动回收线程,避免了线程过多占用资源的问题。
- 任务处理时间不确定:适用于任务处理时间不确定的场景,因为它能够根据实际情况动态调整线程数量,保证任务能够及时得到处理,提高系统的响应速度。
- 需要快速响应的任务:适用于需要快速响应的任务,因为它能够快速地创建线程来处理任务,缩短任务等待的时间,提高任务的处理效率。
- 任务负载波动大:适用于任务负载波动大的场景,因为它能够根据负载情况动态调整线程数量,使系统能够更好地适应负载的变化。
缺点:
- 线程数量不受限制:由于
newCachedThreadPool
的最大线程数量为 Integer.MAX_VALUE,因此在大量任务提交的情况下,可能会导致线程数量过多,占用大量系统资源,导致系统负载过高,甚至引发系统崩溃。 - 不适用于长时间任务:由于它的线程数量不受限制,适用于处理短期任务,但不适用于长时间任务,因为长时间任务可能会导致线程数量过多,占用大量系统资源。
- 可能导致频繁创建和销毁线程:由于
newCachedThreadPool
是一个动态的线程池类型,可能会频繁地创建和销毁线程,这种线程的创建和销毁操作会带来一定的性能开销。
综上所述,newCachedThreadPool
适用于任务处理时间不确定、负载波动大、需要快速响应的场景,但在大量长时间任务的情况下,需要慎重选择以避免占用过多系统资源。
newScheduledThreadPool 创建可调度的线程池
项目中经常会遇到一些非分布式的调度任务,需要在未来的某个时刻周期性执行。实现这样的功能,我们有多种方式可以选择:
- Timer类, jdk1.3引入,不推荐
- 它所有任务都是串行执行的,同一时间只能有一个任务在执行,而且前一个任务的延迟或异常都将会影响到之后的任务。
- Spring的@Scheduled注解,不是很推荐
- 这种方式底层虽然是用线程池实现,但是有个最大的问题,所有的任务都使用的同一个线程池,可能会导致长周期的任务运行影响短周期任务运行,造成线程池"饥饿",更加推荐的做法是同种类型的任务使用同一个线程池。
- 自定义ScheduledThreadPoolExecutor实现调度任务
这也是下面重点讲解的方式,通过自定义ScheduledThreadPoolExecutor调度线程池,提交调度任务才是最优解。
newScheduledThreadPool
用于创建一个可调度的线程
,newScheduledThreadPool
是Executors
类提供的一个静态方法方法如下
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
newScheduledThreadPool
方法返回一个ScheduledExecutorService
对象,它是ExecutorService
接口的子接口,提供了任务调度的能力。可调度的线程池可以用于延时执行任务和周期性执行任务。它可以根据需要自动创建和回收线程,并且可以在指定的延时时间后执行任务,或在指定的时间间隔内重复执行任务。
使用
newScheduledThreadPool
创建的可调度线程池有以下特点:
corePoolSize
参数指定了线程池的核心线程数,即线程池中同时执行任务的最大线程数。- 当任务的延时时间到达时,线程池会创建新的线程来执行任务。
- 如果线程池中的线程数量超过核心线程数,空闲的线程会在指定的时间内被回收。
- 可以使用
schedule
方法来延时执行任务,也可以使用scheduleAtFixedRate
方法或scheduleWithFixedDelay
方法来周期性执行任务。使用示例:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(3); executor.schedule(() -> { // 延时执行的任务逻辑 }, 5, TimeUnit.SECONDS); executor.scheduleAtFixedRate(() -> { // 周期性执行的任务逻辑 }, 1, 3, TimeUnit.SECONDS); executor.scheduleWithFixedDelay(() -> { // 周期性执行的任务逻辑 }, 2, 4, TimeUnit.SECONDS);
在示例中,
schedule
方法用于延时执行任务,它接受一个任务和延时时间,表示在指定的延时时间后执行任务。
scheduleAtFixedRate
方法用于周期性执行任务,它接受一个任务、初始延时时间和周期时间,表示在初始延时时间后开始执行任务,并以指定的周期时间重复执行任务。
scheduleWithFixedDelay
方法也用于周期性执行任务,它接受一个任务、初始延时时间和周期时间,表示在初始延时时间后开始执行任务,并在任务执行完成后等待指定的周期时间,然后再执行下一个任务。总之,
newScheduledThreadPool
方法用于创建一个可调度的线程池,可以用于延时执行任务和周期性执行任务。通过合理配置延时时间和周期时间,可以满足不同场景下的任务调度需求。
下面我们通过一个案例来了解如何创建延时线程 和 定时线程
@Test
public void test20() {
// 使用Executors.newScheduledThreadPool 创建线程池
ScheduledThreadPoolExecutor scheduledThreadPool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(5, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
// 对线程名称进行自定义
return new Thread(r, "my-scheduled-job-" + r.hashCode());
}
});
// 准备工作
DelayedThread delayedThread = new DelayedThread();
LoopThread loopThread = new LoopThread();
// 执行延时线程 (延时 10s开始执行 )
logger.error("延时线程工作准备结束!");
scheduledThreadPool.schedule(delayedThread, 10, TimeUnit.SECONDS);
// 执行循环线程
// scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
// command: 执行的任务
// initialDelay: 初始延迟的时间
// delay: 上次执行结束,延迟多久执行
// unit:单位
logger.error("循环线程工作准备结束!");
scheduledThreadPool.scheduleAtFixedRate(loopThread, 10, 5, TimeUnit.SECONDS);
waitPoolExecutedEnd(scheduledThreadPool);
scheduledThreadPool.shutdown();
}
static class DelayedThread implements Runnable {
@Override
public void run() {
logger.error("延时线程正在开始执行~~");
}
}
static class LoopThread implements Runnable {
@Override
public void run() {
logger.error("循环线程开始执行~~");
}
}
首先,通过Executors.newScheduledThreadPool
方法创建了一个可调度的线程池ScheduledThreadPoolExecutor
,并指定了线程池的核心线程数为5。同时,通过自定义的ThreadFactory
来创建线程,并给线程指定了自定义的名称。
接下来,定义了两个任务类DelayedThread
和LoopThread
,分别实现Runnable
接口。
在测试方法中,首先创建了DelayedThread
和LoopThread
的实例。
然后,在测试方法中,首先创建了DelayedThread
和LoopThread
的实例。
然后,通过调用scheduledThreadPool.schedule
方法,将DelayedThread
任务提交给线程池,并指定延时时间为10秒。这意味着DelayedThread
任务将在10秒后执行。
接着,通过调用scheduledThreadPool.scheduleAtFixedRate
方法,将LoopThread
任务提交给线程池,并指定初始延时时间为10秒、周期时间为5秒。这意味着LoopThread
任务将在初始延时时间后开始执行,并且每隔5秒重复执行一次。
最后,调用waitPoolExecutedEnd
方法等待线程池中的任务执行完毕,并调用线程池的shutdown
方法关闭线程池。
总结起来,这段代码演示了使用Executors.newScheduledThreadPool
创建可调度的线程池,并展示了延时执行和周期性执行的例子。通过合理配置延时时间和周期时间,可以实现在指定的时间点或时间间隔内执行任务。
newWorkStealingPool 创建一个可窃取任务的线程池
newWorkStealingPool(int parallelism)
方法用于创建一个工作窃取线程池。工作窃取线程池是一种特殊的线程池,它根据一定的调度策略执行任务。除了执行任务外,工作窃取线程池还可以按照指定的延迟时间或周期性地执行任务。工作窃取线程池最早由美国计算机科学家 Charles E. Leiserson 和 John C. Bains 发明,他们在 1994 年的论文《Scheduling Multithreaded Computations by Work Stealing》中首次提出了这一概念。
工作窃取线程池的设计初衷是为了解决并行计算中独立任务的负载均衡问题。在并行计算中,通常存在大量的独立任务需要并行执行,而这些独立任务的执行时间往往不一致。如果简单地将任务平均分配给每个线程,那些执行时间较短的任务将会导致线程空闲,而执行时间较长的任务则可能导致线程被阻塞,从而降低整体的执行效率。
为了解决这个问题,工作窃取线程池引入了工作窃取算法。该算法允许空闲线程从其他线程的任务队列末尾窃取任务来执行,以实现负载均衡。每个线程都维护一个自己的任务队列,当线程自己的任务执行完毕后,它会尝试从其他线程的任务队列末尾窃取任务执行。这样,任务的分配和执行可以更加均衡,避免线程之间出现明显的负载不均衡。
我们了解 newWorkStealingPool 先来了解一下Fork/Join
Fork/Join框架是Java提供的一种并行执行任务的框架,它基于工作窃取算法实现任务的自动调度和负载均衡。在Fork/Join框架中,工作窃取线程池是其中的核心组件。
Fork/Join框架的工作原理如下:
- 每个任务被划分为更小的子任务,这个过程通常被称为"fork"。
- 当一个线程执行"fork"操作时,它会将子任务放入自己的工作队列中。
- 当一个线程完成自己的任务后,它会从其他线程的工作队列中"steal"(窃取)任务来执行。
- 窃取的任务通常是其他线程工作队列的末尾的任务,这样可以减少线程之间的竞争。
工作窃取线程池在Fork/Join框架中的应用主要体现在以下几个方面:
- 任务分割: 在Fork/Join框架中,任务被递归地分割成更小的子任务,直到达到某个终止条件。工作窃取线程池中的线程负责执行这些任务。
每个线程都有自己的任务队列
,当一个线程执行完自己的任务后,会从自己的队列中获取新的任务来执行。 - 负载均衡: 工作窃取线程池通过
工作窃取算法实现负载均衡
。当一个线程的任务队列为空时,它会从其他线程的任务队列中窃取任务来执行,以保持各个线程的工作量相对均衡。这种负载均衡策略可以避免线程之间出现明显的负载不均衡,提高整体的执行效率。 - 递归任务执行: Fork/Join框架中的任务通常是
递归执行的
。当一个任务被分割成多个子任务时,每个子任务会被提交到工作窃取线程池中执行
。如果子任务还可以进一步分割,线程会继续执行这个过程,直到任务不能再分割为止。这种递归的任务执行方式能够充分利用线程池中的线程资源,提高并行任务的执行效率。 - Join操作: 在Fork/Join框架中,
一个任务可以等待其子任务执行完成后再继续执行
,这个操作被称为"join"。工作窃取线程池在执行任务时会自动进行join操作,确保任务的执行顺序满足依赖关系。这样可以避免线程之间的竞争和冲突,保证任务的正确性。
总的来说,工作窃取线程池在Fork/Join框架中扮演着重要的角色。它通过工作窃取算法和负载均衡策略,实现了并行任务的自动调度和执行。通过递归任务执行和join操作,工作窃取线程池能够高效地处理大量的并行任务,并充分利用系统的并行计算能力。
newWorkStealingPool
是一个创建工作窃取线程池的方法,它使用了ForkJoinPool,并根据CPU核心数动态调整线程数量。这种线程池适用于CPU密集型的任务。
与其他四种线程池不同,newWorkStealingPool
使用了ForkJoinPool。它的优势在于将一个任务拆分成多个小任务,并将这些小任务分发给多个线程并行执行。当所有小任务都执行完成后,再将它们的结果合并。
相较于之前的线程池,newWorkStealingPool
中的每个线程都拥有自己的任务队列,而不是多个线程共享一个阻塞队列。
当一个线程发现自己的任务队列为空时,它会去其他线程的队列中窃取任务来执行。可以将这个过程简单理解为"窃取"。为了降低冲突,一般情况下,自己的本地队列采用后进先出(LIFO)的顺序,而窃取时则采用先进先出(FIFO)的顺序。由于窃取的动作非常快速,这种冲突会大大降低,从而提高了性能。这也是一种优化方式。
下面我们通过一个案例 来了解一下 newWorkStealingPool
@Test
public void test21() {
// 返回可用的计算资源
int core = Runtime.getRuntime().availableProcessors();
logger.error("cpu 可以计算机资源 :{}", core);
// 无参数的话,会根据cpu当前核心数据 动态分配
// ExecutorService executorService = Executors.newWorkStealingPool();
// 当传入参数,就可以指定cpu的一个并行数量
ForkJoinPool forkJoinPool = (ForkJoinPool) Executors.newWorkStealingPool(8);
for (int i = 1; i <= core * 2; i++) {
WorkStealThread workStealThread = new WorkStealThread(i, forkJoinPool);
forkJoinPool.submit(workStealThread);
}
// 优雅关闭
forkJoinPool.shutdown();
// 为了防止 主线程 没有完全执行结束
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
static class WorkStealThread implements Runnable {
private int i;
private ForkJoinPool forkJoinPool;
WorkStealThread() {
}
WorkStealThread(int i, ForkJoinPool forkJoinPool) {
this.i = i;
this.forkJoinPool = forkJoinPool;
}
@Override
public void run() {
try {
// 随机休眠
Thread.sleep(TimeUnit.SECONDS.toMillis(new Random().nextInt(10)));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.error("工作窃取线程-{},线程池的大小:[{}],活动线程数:[{}],总任务窃取次数:[{}]",
i, forkJoinPool.getPoolSize(), forkJoinPool.getActiveThreadCount(), forkJoinPool.getStealCount());
}
}
当代码运行时,首先通过Runtime.getRuntime().availableProcessors()
获取可用的计算资源,即CPU核心数,并将其记录在日志中。
接下来,使用Executors.newWorkStealingPool(8)
创建一个工作窃取线程池,其中参数8表示线程池的并行度,即同时执行的线程数。这个线程池是基于ForkJoinPool
实现的,具有任务窃取的特性。
然后,通过一个循环将一些工作窃取线程提交到线程池中进行并行执行。每个工作窃取线程都有一个编号,从1到核心数的两倍。这些线程使用WorkStealThread
类实现了Runnable
接口。
WorkStealThread
类中的run
方法定义了线程的执行逻辑。首先,线程会随机休眠一段时间,模拟执行一些耗时的任务。然后,它会输出一些关于线程池状态的信息,包括线程池的大小、活动线程数和总任务窃取次数。这些信息会记录在日志中。
最后,线程池会被优雅地关闭,确保所有任务都能执行完毕。为了防止主线程提前结束,使用Thread.sleep(Integer.MAX_VALUE)
使主线程休眠,直到被中断或抛出异常。
通过以上代码,我们可以了解到如何使用newWorkStealingPool
方法创建工作窃取线程池,并通过工作窃取线程实现并行执行。同时,通过获取线程池的状态信息,我们可以了解线程池的工作情况,包括活动线程数和任务窃取次数。
通过观察结果,我们可以发现,当前线程执行完毕,如果空闲的话,会去执行别的线程任务