1.FutureTask的作用?
FutureTask 是 Java 并发编程中的一个类,用于异步执行任务并获取其结果。它实现了 Future 和 Runnable 接口,因此可以作为一个可运行的任务提交给 Executor 执行,也可以通过 Future 接口获取任务执行的结果。
FutureTask 出现的原因是因为在并发编程中,有时需要执行一些比较耗时的操作,但又不能阻塞当前线程的执行,否则会影响程序的性能和响应速度。因此需要一种机制来异步执行这些操作,并在需要时获取其结果。
FutureTask 提供了一种简单的方式来实现这种异步执行和结果获取的机制。它可以让我们提交一个任务到线程池中执行,并立即返回一个 Future 对象,以便在需要的时候获取任务执行的结果。这样可以让主线程继续执行其他任务,而不必等待当前任务的执行结果。
另外,FutureTask 还提供了一些其他的方法,例如 cancel() 方法用于取消任务的执行,isDone() 方法用于判断任务是否已经完成等等。这些方法使得 FutureTask 成为了一个非常实用的工具类,可以帮助我们更方便地实现异步执行和结果获取的功能。
2.FutureTask类结构关系?
可以看到,FutureTask实现了RunnableFuture接口,则RunnableFuture接口继承了Runnable接口和Future接口,所以FutureTask既能当做一个Runnable直接被Thread执行,也能作为Future用来得到Callable的计算结果。
3.FutureTask的线程安全是由什么保证的?
状态的可见性:FutureTask 中的状态是使用 volatile 修饰的,保证状态变化的可见性。多个线程可以看到 FutureTask 中状态的最新值。
原子性操作:FutureTask 内部使用了 CAS(Compare And Swap)操作来确保同步状态的正确性。在多个线程同时对 FutureTask 中的状态进行修改时,CAS 操作可以确保只有一个线程能够成功修改状态。
同步机制:FutureTask 通过 AQS(AbstractQueuedSynchronizer)类实现了同步机制。AQS 本质上是一个基于锁和等待队列实现的同步框架,它可以支持多线程之间的竞争。
线程池:FutureTask 内部使用了线程池来执行任务,线程池会根据实际情况动态调整线程池的大小和资源分配,保证多个任务之间的执行不会相互影响。
4.FutureTask通常会怎么用?
下面是一个使用 FutureTask 的示例代码,该代码模拟了一个耗时的计算任务,通过 FutureTask 实现异步执行并获取计算结果:
import java.util.concurrent.*;
public class FutureTaskExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建一个 Callable 对象,用于执行耗时的计算任务
Callable<Integer> task = () -> {
int sum = 0;
for (int i = 1; i <= 1000000000; i++) {
sum += i;
}
return sum;
};
// 创建一个 FutureTask 对象,并将其传递给线程池进行执行
FutureTask<Integer> futureTask = new FutureTask<>(task);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(futureTask);
// 等待异步任务执行完成,并获取计算结果
Integer result = futureTask.get();
System.out.println("计算结果为:" + result);
// 关闭线程池
executorService.shutdown();
}
}
5.为什么要有线程池?
节省资源:线程的创建和销毁都需要消耗一定的系统资源,如果每次都创建新线程,会导致系统资源的浪费。使用线程池可以避免频繁创建和销毁线程,节省系统资源。
提高执行效率:线程池中的线程可以重复利用,避免频繁创建和销毁线程的开销,提高程序的执行效率和响应速度。
控制并发线程的数量:线程池可以限制并发线程的数量,防止过多线程导致的系统负载过重,从而提高系统的稳定性和可靠性。
提高线程的复用率:线程池中的线程可以重复利用,避免频繁创建和销毁线程的开销,提高系统的性能和稳定性。
统一管理和监控:线程池可以对线程进行统一的管理和监控,例如可以设置线程的优先级、超时时间等参数,可以记录线程的执行状态和执行结果,方便问题排查和性能优化。
6.Java中实现和管理线程池的方式?
使用Java内置的线程池类库
Java中提供了java.util.concurrent包,其中包括ThreadPoolExecutor和ScheduledThreadPoolExecutor等类,可以方便地创建和管理线程池。下面是一个简单的示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2); // 创建一个大小为2的固定线程池
for (int i = 1; i <= 5; i++) {
final int count = i;
executor.execute(new Runnable() { // 提交5个任务给线程池执行
@Override
public void run() {
System.out.println("Task " + count + " is running.");
try {
Thread.sleep(1000); // 模拟任务执行过程
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + count + " is done.");
}
});
}
executor.shutdown(); // 关闭线程池
}
}
上述代码中,通过Executors.newFixedThreadPool(2)创建了一个大小为2的固定线程池,并通过execute()方法提交了5个任务给线程池执行。最后通过shutdown()方法关闭了线程池。
使用第三方库实现线程池
除了Java内置的线程池类库外,还可以使用第三方库,例如Apache的Commons Pool和Google的Guava等库,来实现线程池的功能。以Commons Pool为例,创建线程池的代码如下:
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
public class ObjectPoolExample {
public static void main(String[] args) throws Exception {
GenericObjectPoolConfig<MyThread> config = new GenericObjectPoolConfig<>(); // 创建一个线程池配置对象
config.setMaxTotal(2); // 设置线程池的最大大小为2
ObjectPool<MyThread> pool = new GenericObjectPool<>(new MyThreadFactory(), config); // 创建线程池
for (int i = 1; i <= 5; i++) {
MyThread thread = pool.borrowObject(); // 从线程池中借用一个线程
thread.setCount(i);
thread.start(); // 启动线程
}
pool.close(); // 关闭线程池
}
private static class MyThreadFactory extends BasePooledObjectFactory<MyThread> {
@Override
public MyThread create() throws Exception {
return new MyThread();
}
@Override
public PooledObject<MyThread> wrap(MyThread thread) {
return new DefaultPooledObject<>(thread);
}
}
private static class MyThread extends Thread {
private int count;
public void setCount(int count) {
this.count = count;
}
@Override
public void run() {
System.out.println("Task " + count" is running.");
try {
Thread.sleep(1000); // 模拟任务执行过程
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + count + " is done.");
}
}
}
上述代码中,使用了`Commons Pool`来创建了一个大小为2的线程池,并通过`borrowObject()`方法从线程池中借用了5个线程来执行任务。在`MyThreadFactory`类中实现了线程对象的创建和包装,`MyThread`类中实现了任务的执行过程。最后通过`close()`方法关闭了线程池。
7.ThreadPoolExecutor的原理?
当提交一个任务到线程池时,线程池会先尝试将任务放入任务队列中。
如果任务队列未满,那么任务将会被加入到任务队列中等待执行。
如果任务队列已满,那么线程池会尝试创建新的线程执行任务。
如果当前线程数未超过最大线程数限制,那么线程池会创建一个新线程执行任务。
如果当前线程数已经达到最大线程数限制,那么线程池会根据指定的拒绝策略来处理任务提交失败的情况。
当一个线程执行完一个任务后,它会从任务队列中取出下一个任务继续执行。
如果线程在等待任务时超过了指定的空闲时间,那么线程将会被终止并从线程池中移除。
当线程池关闭时,它将会等待所有任务执行完成并停止所有线程。
8.ThreadPoolExecutor有哪些核心的配置参数? 请简要说明
corePoolSize:核心线程池大小,即线程池中保留的线程数,即使它们处于空闲状态也不会被回收。
maximumPoolSize:线程池允许的最大线程数,包括核心线程数和非核心线程数。
keepAliveTime:非核心线程闲置超时时间,超过这个时间将被回收。默认情况下,只有在线程池中的线程数量超过corePoolSize时才会使用该参数。
unit:keepAliveTime的时间单位。
workQueue:工作队列,用于存储等待执行的任务。线程池会尝试将新任务放入工作队列,如果队列已满,则会根据线程池的配置创建新的线程来执行任务。
handler:拒绝策略,用于处理新任务无法被线程池接收执行的情况。常见的拒绝策略包括AbortPolicy(默认策略,抛出RejectedExecutionException异常)、CallerRunsPolicy(主线程直接执行任务)、DiscardPolicy(默默丢弃无法执行的任务)和DiscardOldestPolicy(丢弃队列中等待时间最长的任务)
以下是一个示例代码,演示如何使用ThreadPoolExecutor创建一个线程池:
// 创建一个核心线程池大小为5,最大线程池大小为10,线程闲置时间为1分钟,工作队列大小为100的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.AbortPolicy());
// 提交一个任务
executor.submit(() -> {
// 任务代码
});
// 关闭线程池
executor.shutdown();
9.ThreadPoolExecutor可以创建哪三种线程池呢?
ThreadPoolExecutor可以创建以下三种线程池:
FixedThreadPool:固定大小的线程池,核心线程池大小和最大线程池大小相同,且不会被回收。
CachedThreadPool:动态大小的线程池,核心线程池大小为0,最大线程池大小为Integer.MAX_VALUE,线程闲置时间为60秒。当有新任务提交时,线程池会创建新的线程执行任务,如果线程闲置超过60秒,则会被回收。
SingleThreadPool:大小为1的线程池,只有一个核心线程,可以保证任务按照顺序执行。
10.当队列满了并且worker的数量达到maxSize的时候,会怎么样?
当线程池中的工作队列已满,且已经创建的线程数量达到了 maximumPoolSize 时,新的任务将会被拒绝。此时,ThreadPoolExecutor 会调用 RejectedExecutionHandler 接口的实现类来处理这个被拒绝的任务。
RejectedExecutionHandler 接口有四种预定义的实现方式:
AbortPolicy:该策略直接抛出一个 RejectedExecutionException 异常,阻止系统正常运行。
CallerRunsPolicy:该策略将在调用者线程中直接执行被拒绝的任务。
DiscardOldestPolicy:该策略将会丢弃掉队列中最老的一个任务,然后执行被拒绝的任务。
DiscardPolicy:该策略将会默默地丢弃掉被拒绝的任务,不会有任何异常抛出。
默认情况下,如果没有提供 RejectedExecutionHandler 的实现,则使用 AbortPolicy 策略,即拒绝任务并抛出异常。
如果希望自定义拒绝策略,可以通过 ThreadPoolExecutor 构造函数的最后一个参数指定 RejectedExecutionHandler 的实现类,例如:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
上述代码中,使用了 DiscardOldestPolicy 策略来处理被拒绝的任务。当任务被拒绝时,线程池会丢弃队列中最老的任务,并执行被拒绝的任务。
11.简要说下线程池的任务执行机制?
execute –> addWorker –>runworker (getTask)
线程池的工作线程通过Woker类实现,在ReentrantLock锁的保证下,把Woker实例插入到HashSet后,并启动Woker中的线程。
从Woker类的构造方法实现可以发现: 线程工厂在创建线程thread时,将Woker实例本身this作为参数传入,当执行start方法启动线程thread时,本质是执行了Worker的runWorker方法。
firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;
12.线程池中任务是如何提交的?
线程池中任务的提交是通过调用 execute(Runnable task) 方法实现的。execute() 方法会将任务封装成一个 Worker 对象,然后将其放入任务队列中等待执行。
当一个线程被创建时,会先从任务队列中获取任务进行执行。如果队列中没有任务,则该线程会阻塞等待,直到有任务被加入到任务队列中。如果任务队列中的任务已经被取完,线程会等待新的任务加入队列中。直到线程池被关闭或超时,该线程才会终止。
13.线程池中任务是如何关闭的?
线程池的任务关闭通常有两种方式:一种是 shutdown() 方法,另一种是 shutdownNow() 方法。
shutdown() 方法是一个平缓的关闭过程,它将允许所有已经提交到线程池中的任务完成执行,但不再接受新的任务。在调用 shutdown() 方法后,线程池状态将转换为 SHUTDOWN,并且在所有任务执行完毕后,线程池将自动关闭。
shutdownNow() 方法是一个立即关闭过程,它将尝试终止所有正在执行的任务,并且不再处理等待队列中的任何任务。在调用 shutdownNow() 方法后,线程池状态将转换为 STOP,并且将尝试中断所有正在执行的任务。如果线程池中的任务中有一些不响应中断,则这些任务将继续运行。在这种情况下,可以通过在任务中使用 Thread.currentThread().isInterrupted() 来判断当前线程是否已经被中断,以及在任务中捕获 InterruptedException 异常并处理它。
14.在配置线程池的时候需要考虑哪些配置因素?
corePoolSize:核心线程数,一般设置为 CPU 核心数 + 1,例如在一个 8 核的机器上,可以设置为 9。
maximumPoolSize:最大线程数,一般根据具体的业务需求和机器性能设置,一般推荐设置为 corePoolSize 的 2 倍左右。
keepAliveTime:线程空闲时间,如果线程池中的线程数量超过 corePoolSize,则当线程空闲时间超过该参数设定的时间时,多余的线程会被销毁,直到线程池中的线程数不超过 corePoolSize。
workQueue:任务队列,可以使用有界队列或无界队列,有界队列可以避免线程池因任务数量过多而导致的内存不足问题,但是需要根据具体情况选择合适的队列大小。
rejectedExecutionHandler:拒绝策略,当任务队列已满且线程池中的线程数量已达到最大线程数时,新提交的任务会被拒绝执行,此时可以通过拒绝策略进行处理,例如丢弃任务、抛出异常等。
15.如何监控线程池的状态?
ThreadPoolExecutor类的方法:ThreadPoolExecutor类提供了一些方法,如getActiveCount()、getCompletedTaskCount()、getTaskCount()等来获取线程池中正在执行的任务数、已经完成的任务数和总的任务数等。
ExecutorService接口的方法:ExecutorService接口继承了Executor接口,提供了isShutdown()和isTerminated()等方法来判断线程池是否已经关闭或者已经终止。
Future接口的方法:如果使用submit()方法来提交任务,返回值是一个Future对象,可以使用Future接口提供的isDone()和isCancelled()方法来判断任务是否完成或者被取消。
16.为什么很多公司不允许使用Executors去创建线程池? 那么推荐怎么使用呢?
虽然Executors类提供了一些静态方法用于创建线程池,但是这些方法往往会使用一些默认的配置参数,可能不符合实际业务的需求,导致线程池出现一些问题。比如默认情况下,Executors.newCachedThreadPool()方法会创建一个无限大小的线程池,当任务数量过多时会导致内存溢出;而Executors.newFixedThreadPool()方法会创建一个固定大小的线程池,当任务数量超过线程池大小时,会将任务放入无界队列中,可能会导致队列过大,从而出现内存泄漏等问题。
因此,很多公司不允许使用Executors类去创建线程池,而是要求开发人员自己手动配置线程池的参数,比如corePoolSize、maximumPoolSize、keepAliveTime、workQueue、ThreadFactory等,以满足具体业务场景的需求。
推荐使用ThreadPoolExecutor类手动配置线程池参数,还可以使用第三方线程池工具类,比如Apache Commons Lang库中的BasicThreadFactory类,可以方便地创建线程池并设置线程工厂等参数,避免手动配置的繁琐。
17.ScheduledThreadPoolExecutor要解决什么样的问题?
ScheduledThreadPoolExecutor是Java中的一个线程池,用于解决需要定时执行任务或定期执行任务的场景。通常在需要周期性地执行某个任务,或在未来的某个时间点执行某个任务时使用。
例如,一个定时任务需要每天凌晨2点执行某个任务,那么可以使用ScheduledThreadPoolExecutor来实现。又例如,需要每隔10秒执行某个任务,也可以使用ScheduledThreadPoolExecutor来实现。
相较于普通的ThreadPoolExecutor,ScheduledThreadPoolExecutor多了一个调度器,可以使用预定的时间或延迟来安排任务的执行。
18.ScheduledThreadPoolExecutor相比ThreadPoolExecutor有哪些特性?
ScheduledFutureTask
ScheduledThreadPoolExecutor 内部维护了一个 ScheduledFutureTask 的队列,ScheduledFutureTask 是一个实现了 Future 接口和 RunnableScheduledFuture 接口的类,它表示一个可调度的任务。这个队列按照任务的执行时间进行排序,如果有新的任务加入,则会根据执行时间进行调整。
DelayedWorkQueue
ScheduledThreadPoolExecutor 内部使用 DelayedWorkQueue 来存储任务。DelayedWorkQueue 是一个实现了 BlockingQueue 接口的类,它是一个延迟队列,内部存储的是 ScheduledFutureTask,这些任务会按照执行时间进行排序,队头元素是最早要执行的任务。
run-after-shutdown
ScheduledThreadPoolExecutor 与 ThreadPoolExecutor 的一个重要区别是在关闭线程池时的行为不同。当调用线程池的 shutdown() 方法后,ThreadPoolExecutor 会等待队列中的任务都执行完成后再退出。但是 ScheduledThreadPoolExecutor 在调用 shutdown() 后,已经加入队列但还未执行的任务会被移除掉,已经执行的任务会继续执行。如果想要等待任务全部执行完成再关闭线程池,需要使用 awaitTermination() 方法来等待。
综上所述,ScheduledThreadPoolExecutor 相比于 ThreadPoolExecutor 的主要特性是支持延迟任务和周期性任务的执行。其内部维护了一个 ScheduledFutureTask 的队列,使用 DelayedWorkQueue 存储任务,且在关闭线程池时的行为与 ThreadPoolExecutor 不同。
19.ScheduledThreadPoolExecutor有什么样的数据结构,核心内部类和抽象类?
ScheduledThreadPoolExecutor内部维护了一个DelayedWorkQueue延迟队列,其中的元素都是ScheduledFutureTask类型的任务,而ScheduledFutureTask是继承自FutureTask,同时实现了ScheduledFuture接口,可以定时或延迟执行任务。
核心内部类:
ScheduledFutureTask:继承自FutureTask,实现了ScheduledFuture接口,封装了需要延迟执行的任务。
ScheduledThreadPoolExecutor.DelayedWorkQueue:继承自AbstractQueue,实现了Delayed接口,是一个延迟队列,其中存放着需要延迟执行的ScheduledFutureTask。
抽象类:
ScheduledExecutorService:是ExecutorService接口的子接口,定义了一些调度方法,例如schedule、scheduleAtFixedRate、scheduleWithFixedDelay等方法,用于按一定的时间间隔执行任务。ScheduledThreadPoolExecutor实现了ScheduledExecutorService接口,因此可以使用这些调度方法。
20.ScheduledThreadPoolExecutor有哪两个关闭策略? 区别是什么?
shutdown():调用该方法后,线程池进入关闭状态,已提交但未执行的任务会继续执行,直到执行完所有任务后停止。
shutdownNow():调用该方法后,线程池立即进入关闭状态,正在执行的任务会被中断,已提交但未执行的任务会被取消,方法会返回已提交但未执行的任务列表。
21.ScheduledThreadPoolExecutor中scheduleAtFixedRate 和 scheduleWithFixedDelay区别是什么?
调度周期的计算方式不同:
scheduleAtFixedRate是基于固定的周期来进行调度,无论上一个任务是否执行完成,都会按照固定的周期进行调度。
scheduleWithFixedDelay是基于上一个任务执行完成的时间,再加上固定的延迟时间,来进行下一次任务的调度。
调度任务的时间处理方式不同:
scheduleAtFixedRate是基于系统时间的绝对时间进行调度的,如果任务执行时间过长,则后续任务会被延迟,会导致任务重叠。
scheduleWithFixedDelay是基于上一个任务执行完成的时间,再加上固定的延迟时间,计算出下一个任务的执行时间,任务的执行时间不受前一个任务的影响。
22.为什么ThreadPoolExecutor 的调整策略却不适用于 ScheduledThreadPoolExecutor?
ThreadPoolExecutor 中的调整策略是通过 allowCoreThreadTimeOut 和 keepAliveTime 两个参数实现的,可以动态调整线程池中核心线程的数量。但是 ScheduledThreadPoolExecutor 中不支持动态调整核心线程数,因为它的调度器需要提前计算好所有任务的执行时间,所以无法随时动态调整核心线程数。
在 ScheduledThreadPoolExecutor 中,如果想要调整线程池大小,只能通过调整 corePoolSize 和 maximumPoolSize 这两个参数来实现。但这样做会有一些限制,比如如果当前线程数已经超过了 corePoolSize,那么只有在所有任务都执行完毕后,才能缩小线程池的大小,否则新的任务可能得不到执行。
23.Executors 提供了几种方法来构造 ScheduledThreadPoolExecutor?
newScheduledThreadPool: 可指定核心线程数的线程池。
newSingleThreadScheduledExecutor: 只有一个工作线程的线程池。如果内部工作线程由于执行周期任务异常而被终止,则会新建一个线程替代它的位置
24.Fork/Join主要用来解决什么样的问题?
Fork/Join主要用来解决分治任务(Divide and Conquer)的问题,Fork/Join 技术是分治算法(Divide-and-Conquer)的并行实现,即将一个大的任务拆分成若干个小任务并行执行,最后合并小任务的结果得到大任务的结果。这种问题通常需要递归地将任务拆分成更小的子任务,直到达到某个终止条件才停止递归。Fork/Join框架提供了高效的任务调度和数据共享机制,能够自动化地拆分和合并任务,充分利用多核处理器的计算能力,提高并发处理效率。常见的应用场景包括排序、搜索、归并、图形处理等。
25.Fork/Join框架主要包含哪三个模块? 模块之间的关系是怎么样的?
ForkJoinTask:代表Fork/Join框架中的任务,其中包括两种类型的任务:无返回值的任务(RecursiveAction)和有返回值的任务(RecursiveTask)。每个任务可以拆分成更小的子任务并将这些子任务提交给ForkJoinPool中的工作线程执行。
ForkJoinWorkerThread:代表Fork/Join框架中的工作线程,是一个扩展了Java线程的特殊线程。ForkJoinPool中维护了一个工作线程池,每个工作线程负责执行提交给ForkJoinPool的任务。
ForkJoinPool:代表Fork/Join框架中的线程池,它负责调度和执行提交给它的任务。ForkJoinPool是一种特殊的线程池,它可以自动分割任务,将一个大任务拆分成若干个小任务,然后将这些小任务分配给线程池中的工作线程执行。ForkJoinPool使用工作窃取算法来提高执行效率,它允许一个线程从其他线程的任务队列中偷取任务来执行,以此来减少线程之间的竞争和等待。
这三个模块之间的关系如下:
ForkJoinTask通过fork方法可以将一个任务拆分成若干个子任务并将这些子任务提交给线程池中的工作线程执行,或者通过join方法来等待子任务执行完成并得到子任务的执行结果。
ForkJoinWorkerThread是线程池中的工作线程,它执行线程池提交的任务,如果一个任务需要拆分成更小的子任务执行,则工作线程可以将这些子任务压入自己的任务队列中。
ForkJoinPool是线程池的核心,它维护了线程池中的工作线程集合和任务队列,负责将提交给它的任务分割成若干个子任务并分配给工作线程执行。ForkJoinPool还使用了一种分治算法来将任务拆分成更小的子任务,从而充分利用CPU资源。
26.ForkJoinPool类继承关系?
内部类介绍:
ForkJoinWorkerThreadFactory: 内部线程工厂接口,用于创建工作线程ForkJoinWorkerThread
DefaultForkJoinWorkerThreadFactory: ForkJoinWorkerThreadFactory 的默认实现类
InnocuousForkJoinWorkerThreadFactory: 实现了 ForkJoinWorkerThreadFactory,无许可线程工厂,当系统变量中有系统安全管理相关属性时,默认使用这个工厂创建工作线程。
EmptyTask: 内部占位类,用于替换队列中 join 的任务。
ManagedBlocker: 为 ForkJoinPool 中的任务提供扩展管理并行数的接口,一般用在可能会阻塞的任务(如在 Phaser 中用于等待 phase 到下一个generation)。
WorkQueue: ForkJoinPool 的核心数据结构,本质上是work-stealing 模式的双端任务队列,内部存放 ForkJoinTask 对象任务,使用 @Contented 注解修饰防止伪共享。
工作线程在运行中产生新的任务(通常是因为调用了 fork())时,此时可以把 WorkQueue 的数据结构视为一个栈,新的任务会放入栈顶(top 位);工作线程在处理自己工作队列的任务时,按照 LIFO 的顺序。
工作线程在处理自己的工作队列同时,会尝试窃取一个任务(可能是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的队列任务),此时可以把 WorkQueue 的数据结构视为一个 FIFO 的队列,窃取的任务位于其他线程的工作队列的队首(base位)。
伪共享状态: 缓存系统中是以缓存行(cache line)为单位存储的。缓存行是2的整数幂个连续字节,一般为32-256个字节。最常见的缓存行大小是64个字节。当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享。
ForkJoinTask抽象类继承关系?
ForkJoinTask 实现了 Future 接口,说明它也是一个可取消的异步运算任务,实际上ForkJoinTask 是 Future 的轻量级实现,主要用在纯粹是计算的函数式任务或者操作完全独立的对象计算任务。fork 是主运行方法,用于异步执行;而 join 方法在任务结果计算完毕之后才会运行,用来合并或返回计算结果。 其内部类都比较简单,ExceptionNode 是用于存储任务执行期间的异常信息的单向链表;其余四个类是为 Runnable/Callable 任务提供的适配器类,用于把 Runnable/Callable 转化为 ForkJoinTask 类型的任务(因为 ForkJoinPool 只可以运行 ForkJoinTask 类型的任务)。
27.整个Fork/Join 框架的执行流程/运行机制是怎么样的?
Fork/Join框架的执行流程/运行机制如下:
将大的任务分解成若干个子任务,每个子任务都可以独立执行。
将子任务加入到任务队列中等待执行。
启动多个线程(ForkJoinWorkerThread)从任务队列中取出任务执行,直到任务队列为空。
如果某个线程执行完了自己的任务队列中的任务,它会从其他线程的任务队列中窃取任务执行,以此来实现负载均衡和避免线程饥饿的问题。
子任务执行完成后,将会合并结果,然后返回给父任务,直到最终的结果合并完成。
Fork/Join框架的执行流程主要是由ForkJoinTask、ForkJoinPool和ForkJoinWorkerThread这三个核心模块共同实现的。ForkJoinTask表示一个任务,可以分解成若干个子任务,执行完毕后可以将结果合并,这是一个抽象类,实现该抽象类需要实现compute方法。ForkJoinPool是线程池,负责调度ForkJoinTask任务,内部包含多个ForkJoinWorkerThread线程。ForkJoinWorkerThread是线程池中的线程,执行ForkJoinTask任务,并将子任务加入到任务队列中。当一个ForkJoinTask任务被执行时,如果发现其compute方法分解了若干个子任务,这些子任务将被加入到任务队列中,供其他线程窃取执行。
通过这样的方式,Fork/Join框架可以实现任务的自动分解和合并,同时也能够避免线程饥饿和任务负载不均等问题。
28.具体阐述Fork/Join的分治思想和work-stealing 实现方式?
Fork/Join框架的核心思想是分治法,将大任务分割成小任务,通过多线程并发执行小任务,最后合并小任务的结果得到大任务的结果。为了提高任务的执行效率,Fork/Join框架使用了work-stealing算法,实现了任务的动态负载均衡。
具体地,Fork/Join框架将一个大任务分割成多个小任务,每个小任务独立地执行,小任务的执行结果被合并为大任务的最终结果。分割和合并的过程可以使用ForkJoinTask和它的子类来完成。ForkJoinTask是Fork/Join框架中表示任务的抽象类,其中包含两个核心方法:
fork():将任务拆分成子任务并异步执行。
join():等待子任务完成并返回其结果。
当一个任务被拆分成多个子任务后,子任务被放到ForkJoinTask的任务队列中,等待其他线程去执行。每个线程都会从自己的任务队列中获取任务执行,这个过程称为工作窃取(work-stealing)。如果一个线程执行完了自己的任务,但任务队列中没有任务可供执行,那么它就会去其他线程的任务队列中窃取任务并执行。
ForkJoinPool是Fork/Join框架的核心类,它继承自AbstractExecutorService类。ForkJoinPool包含多个ForkJoinWorkerThread线程和一个ForkJoinTask任务队列。当提交一个任务时,ForkJoinPool会将任务放到任务队列中,ForkJoinWorkerThread从任务队列中取出任务并执行。当一个任务调用fork()方法时,它会被拆分成多个子任务并放到任务队列中,ForkJoinWorkerThread会从队列中取出子任务并执行。当一个任务调用join()方法时,如果子任务还没有执行完,则当前线程会等待子任务执行完并返回结果。
29.有哪些JDK源码中使用了Fork/Join思想?
java.util.Arrays#parallelSort: 该方法使用了Fork/Join框架实现对数组的并行排序,利用分治思想将数组拆分为多个小数组进行排序,然后再通过归并排序的方式合并排序结果。
java.util.concurrent.ConcurrentHashMap: 该类是Java并发编程中常用的线程安全的哈希表,其内部使用了Fork/Join框架实现并行操作,例如在扩容时就会将哈希表分割为多个小段,然后并行扩容每个小段,提高了扩容的效率。
java.util.stream.Stream: 该类提供了一组函数式操作流的方法,包括map、reduce、filter等操作,内部使用Fork/Join框架实现并行流处理,将流分割为多个小段,然后并行处理每个小段,提高了处理效率。
java.util.concurrent.RecursiveTask: 该类是Fork/Join框架中的抽象类,用于实现可递归划分的任务,例如求解Fibonacci数列、矩阵乘法等问题,内部使用Fork/Join框架实现任务的分割和合并。
30.如何使用Executors工具类创建ForkJoinPool?
Executors 工具类提供了创建 ForkJoinPool 的方法,可以通过以下方式创建:
ForkJoinPool forkJoinPool = Executors.newWorkStealingPool();
这将返回一个基于当前系统的可用处理器数量创建的 ForkJoinPool,其并行度等于可用处理器数量,工作窃取队列采用默认设置。可以使用其他方法创建自定义的 ForkJoinPool。例如:
ForkJoinPool forkJoinPool = new ForkJoinPool(4); // 创建4个线程的ForkJoinPool
这将创建一个并行度为4的 ForkJoinPool。还可以使用 ForkJoinPool.ForkJoinWorkerThreadFactory 和 Thread.UncaughtExceptionHandler 创建自定义的 ForkJoinPool。
31.写一个例子: 用ForkJoin方式实现1+2+3+...+100000?
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class ForkJoinSum extends RecursiveTask<Long> {
private final int THRESHOLD = 10000; // 阈值,即分治任务的最小规模
private final int start;
private final int end;
public ForkJoinSum(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) { // 如果任务规模小于等于阈值,直接计算
long sum = 0L;
for (int i = start; i <= end; i++) {
sum += i;
}
return sum;
} else { // 否则,将任务拆分为两个子任务
int mid = start + (end - start) / 2;
ForkJoinSum leftTask = new ForkJoinSum(start, mid);
ForkJoinSum rightTask = new ForkJoinSum(mid + 1, end);
leftTask.fork();
rightTask.fork();
return leftTask.join() + rightTask.join();
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
ForkJoinSum task = new ForkJoinSum(1, 100000);
long sum = pool.invoke(task);
System.out.println("1+2+3+...+100000=" + sum);
}
}
在这个例子中,我们首先定义了一个ForkJoinSum类,它继承了RecursiveTask<Long>类,并重写了compute()方法来实现任务的分治计算。在compute()方法中,如果任务规模小于等于阈值THRESHOLD,我们就直接计算出结果并返回。否则,我们将任务拆分为两个子任务,并对子任务进行fork()操作,然后调用join()方法等待子任务执行完成并返回结果。在main()方法中,我们首先创建一个ForkJoinPool线程池,然后创建一个ForkJoinSum任务并调用invoke()方法提交任务。
32.Fork/Join在使用时有哪些注意事项?
任务粒度划分:Fork/Join框架对任务的划分需要细致且恰当,任务过小会导致线程创建和上下文切换开销,任务过大会导致线程饥饿和负载不均衡,进而影响程序性能。
任务依赖关系:Fork/Join框架默认是采用工作窃取的方式来调度任务的执行,所以任务之间需要满足无依赖关系或者依赖关系可以被解决,否则会出现死锁等问题。
线程池大小:Fork/Join框架的线程池大小需要适当,过多的线程会导致线程上下文切换,过少的线程会导致负载不均衡。