快捷导航
- 一、提供了什么功能?
- 源码中的定义:
- 此类支持以下几种方法:
- 二、源码中是怎么实现的?
- 1、创建并返回一个配置了常用设置的`ExecutorService`
- newFixedThreadPool()
- newSingleThreadExecutor()
- newCachedThreadPool()
- newWorkStealingPool()
- 2、创建并返回一个`ScheduledExecutorService`
- newScheduledThreadPool()
- newSingleThreadScheduledExecutor()
- 3、创建并返回一个禁用重新配置的`ExecutorService`
- unconfigurableExecutorService()
- unconfigurableScheduledExecutorService()
- 4、创建并返回一个`ThreadFactory`
- defaultThreadFactory()
- privilegedThreadFactory()
- 5、创建并返回一个`Callable`
- callable()
- 三、总结:
一、提供了什么功能?
源码中的定义:
该类为
Executor
、ExecutorService
、ScheduledExecutorService
、ThreadFactory
和Callable
类定义了工厂和工具方法。
此类支持以下几种方法:
- 创建并返回一个配置了常用设置的
ExecutorService
的方法。 - 创建并返回一个配置了常用设置的
ScheduledExecutorService
的方法。 - 创建并返回一个“包装”的
ExecutorService
的方法,该方法通过使实现特定的方法不可访问来禁用重新配置。 - 创建并返回一个将新创建线程设置为已知状态的
ThreadFactory
的方法。 - 创建并返回一个
Callable
的方法,该方法基于其他闭包形式,以便可以在需要Callable
的执行方法中使用。
说了这么多的名词,那么这些名词之间的关系是什么?请看下图:
二、源码中是怎么实现的?
我们先来看看Executors
类的结构:
1、创建并返回一个配置了常用设置的ExecutorService
newFixedThreadPool()
/**
* 创建一个线程池,该线程池重用固定数量的线程,并使用共享的无界队列进行操作。
* 在任何时候,最多将有 nThreads 个线程处于活动状态以处理任务。
* 如果在所有线程都处于活动状态时提交了额外的任务,这些任务将等待在队列中,直到有线程可用。
* 如果在执行过程中任何线程因故障而终止,在关闭之前,如果需要执行后续任务,将会有一个新线程替代它。
* 线程池中的线程将一直存在,直到显式关闭。
*
* 参数:
* nThreads – 线程池中的线程数量
*
* 抛出:
* IllegalArgumentException – 如果 nThreads <= 0
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* 使用指定线程工厂创建新线程的重载方法
*
* 参数:
* nThreads – 线程池中的线程数量
* threadFactory – 创建新线程时使用的工厂
*
* 抛出:
* NullPointerException – 如果threadFactory为null
* IllegalArgumentException – 如果nThreads <= 0
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
newSingleThreadExecutor()
/**
* 从代码上可以看出,该方法和newFixedThreadPool()大同小异,只有两点不同:
* 1. 线程池核心线程数不同,newFixedThreadPool的核心线程数是方法入参nThreads,而本方法的核心线程数是1。
* 2. 本方法在new ThreadPoolExecutor()之外加了一层封装new FinalizableDelegatedExecutorService()。
* 只提供了本类对接口ExecutorService实现的方法的访问接口,目的是防止ThreadPoolExecutor实例在某些情况下 对线程池配置的修改,
* 例如:使用setCorePoolSize()重新设置线程池核心线程数
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// 使用指定线程工厂创建新线程的重载方法
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
newCachedThreadPool()
/**
* 创建一个线程池,该线程池根据需要创建新线程,但会在可用时重用之前构造的线程。
* 这些线程池通常会提高执行许多短暂异步任务的程序的性能。
* 调用execute时,如果有可用的线程,将重用之前构造的线程。
* 如果没有现有线程可用,将创建一个新线程并添加到池中。
* 未使用超过六十秒的线程将被终止并从缓存中移除。因此,长时间保持空闲的池不会消耗任何资源。
* 请注意,可以使用ThreadPoolExecutor构造函数创建具有类似属性但不同细节(例如超时参数)的池。
*
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// 使用指定线程工厂创建新线程的重载方法
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
newWorkStealingPool()
/**
* 创建一个线程池,该线程池维护足够的线程以支持给定的并行级别,并可能使用多个队列来减少竞争。
* 并行级别对应于主动参与或可用于任务处理的最大线程数。
* 实际线程数量可能会动态增长和缩小。
* 工作窃取池不保证提交任务的执行顺序。
*
* 参数:
* parallelism – 目标并行级别
*
* 抛出:
* IllegalArgumentException – 如果 parallelism <= 0
*/
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
// 创建一个工作窃取线程池,使用所有可用处理器作为其目标并行级别。
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
2、创建并返回一个ScheduledExecutorService
newScheduledThreadPool()
/**
* 创建一个可以在给定延迟后运行或定期执行的线程池。
*
* 参数:
* corePoolSize – 即使线程处于空闲状态,也要保持在池中的线程数量
*
* 抛出:
* IllegalArgumentException – 如果 corePoolSize < 0
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// 使用指定线程工厂创建新线程的重载方法
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
newSingleThreadScheduledExecutor()
// 返回一个类似newScheduledThreadPool(1)的单线程执行器,在任何情况下线程数量都不可更改
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
// 使用指定线程工厂创建新线程的重载方法
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
3、创建并返回一个禁用重新配置的ExecutorService
unconfigurableExecutorService()
// 和newSingleThreadExecutor()、newSingleThreadScheduledExecutor()类似的套路
// 添加一个包装类,屏蔽返回的线程池对象对配置的修改
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedExecutorService(executor);
}
unconfigurableScheduledExecutorService()
// 和unconfigurableExecutorService同样的套路
public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedScheduledExecutorService(executor);
}
4、创建并返回一个ThreadFactory
defaultThreadFactory()
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
// 静态内部类DefaultThreadFactory
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
privilegedThreadFactory()
public static ThreadFactory privilegedThreadFactory() {
return new PrivilegedThreadFactory();
}
/**
* Thread factory capturing access control context and class loader
*/
static class PrivilegedThreadFactory extends DefaultThreadFactory {
private final AccessControlContext acc;
private final ClassLoader ccl;
PrivilegedThreadFactory() {
super();
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// Calls to getContextClassLoader from this class
// never trigger a security check, but we check
// whether our callers have this permission anyways.
sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
// Fail fast
sm.checkPermission(new RuntimePermission("setContextClassLoader"));
}
this.acc = AccessController.getContext();
this.ccl = Thread.currentThread().getContextClassLoader();
}
public Thread newThread(final Runnable r) {
return super.newThread(new Runnable() {
public void run() {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
Thread.currentThread().setContextClassLoader(ccl);
r.run();
return null;
}
}, acc);
}
});
}
}
5、创建并返回一个Callable
callable()
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}
public static Callable<Object> callable(final PrivilegedAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() { return action.run(); }};
}
public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() throws Exception { return action.run(); }};
}
public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallable<T>(callable);
}
public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
}
三、总结:
- 通过对以上源码的分析,可以发现虽然Executors工具类提供了很多快速创建线程池的方法,但归根结底还是对ThreadPoolExecutor、ScheduledThreadPoolExecutor的封装。我们可以很容易的根据自己的项目情况去自定义,这样的创建的线程池才是最符合业务场景的。
- 在Executors提供的快速创建线程池的方法中,若有贴合使用业务场景的,可以直接使用,这样可以提高开发效率,避免重复造轮子。
- Executors提供了从Runable到Callable的转换,这可能在需要线程提供返回值的时候是有用的。