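In the previous article we covered what each thread pool parameter means, how a thread pool processes tasks, and the benefits of using a thread pool. In this article we look at how to create a thread pool that suits our business. To do that, we first need to know how to roughly determine the number of core threads, how to choose the type and size of the blocking queue, and which rejection policy to apply when the pool can no longer accept tasks.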
Choosing the Right Number of Threads
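Different kinds of tasks may perform best with different numbers of pool threads. Some tasks are CPU-heavy but short, while others access the network and take much longer to finish. If we simply define one fixed-size pool for everything, we often see the CPU sitting idle while every pool thread is busy waiting, so the pool cannot accept more tasks. Meanwhile, short CPU-bound tasks that could have run get no chance to execute, and CPU capacity is wasted. We therefore need to size each thread pool according to the characteristics of its workload.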
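CPU-bound tasks include encryption, decryption, compression, computation, and other work that consumes large amounts of CPU. For such tasks the best thread count is about 1 to 2 times the number of CPU cores, and setting more threads brings no real benefit. Extra threads consume more memory and scheduling resources. Because only as many threads as there are cores can run at once, the surplus threads only add context-switching overhead, so performance may drop instead of improving.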
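IO-bound tasks, such as database access, file reads and writes, and network communication, use little CPU. Their IO operations are slow, however, and account for most of the elapsed time, so such workloads should get more threads. A thread blocked on IO does not use the CPU, but it still occupies a pool thread. If there are no other threads, subsequent tasks cannot run and the CPU sits idle. With more threads, the extra ones can run other tasks while some are waiting, which keeps the CPU busy and improves overall throughput.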
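Brian Goetz, author of *Java Concurrency in Practice*, recommends the following formula:
Number of threads = number of CPU cores × (1 + average wait time / average compute time)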
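In other words, the thread count grows with the ratio of average wait time to average compute time. The longer tasks wait relative to the time they spend computing, the more threads are worthwhile. This is the simplified form of the formula: in the book the result is also multiplied by the target CPU utilization, which is taken as 100% here. The formula is a sound starting point. In practice, you should load-test on the actual server and adjust the value based on the results to get better performance.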
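As a rough illustration, the simplified formula can be turned into a small helper. This is a minimal sketch: the class name `PoolSizing` and the wait/compute figures are hypothetical. In practice the two averages would come from profiling or load tests of your own tasks.

```java
public class PoolSizing {

    // Simplified Goetz formula: threads = cores * (1 + wait / compute).
    // waitMillis and computeMillis are assumed to be measured per-task averages.
    static int threadCount(int cores, double waitMillis, double computeMillis) {
        if (cores <= 0 || computeMillis <= 0) {
            throw new IllegalArgumentException("cores and computeMillis must be positive");
        }
        return (int) Math.ceil(cores * (1 + waitMillis / computeMillis));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // CPU-bound task: almost no waiting, so about one thread per core.
        System.out.println("CPU-bound: " + threadCount(cores, 0, 10));
        // IO-bound task: hypothetical 90 ms of IO wait for every 10 ms of CPU work.
        System.out.println("IO-bound:  " + threadCount(cores, 90, 10));
    }
}
```

On an 8-core machine the IO-bound case yields 80 threads. That number is only a starting point for load testing, not a final setting.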
Thread Pool Blocking Queues
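The Executors utility class makes it easy to create thread pools. This is generally discouraged, because its defaults hide unbounded queues or unbounded thread counts. Internally it simply constructs a ThreadPoolExecutor and supplies default parameters so that developers don't have to think about them. We won't go through every pool type Executors offers. Instead, we focus on the blocking queue each one uses, as a reference for defining our own pools.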
| Thread pool | Blocking queue |
| --- | --- |
| FixedThreadPool | LinkedBlockingQueue |
| SingleThreadExecutor | LinkedBlockingQueue |
| CachedThreadPool | SynchronousQueue |
| SingleThreadScheduledExecutor | DelayedWorkQueue |
| ScheduledThreadPoolExecutor | DelayedWorkQueue |
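First, FixedThreadPool and SingleThreadExecutor both use a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE. Their core and maximum sizes are equal, so no non-core threads are ever created. Because the queue is practically unbounded, new tasks can only be queued once all core threads are busy, and this is what keeps the thread count fixed. The downside is that the backlog can grow until memory is exhausted.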
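The behavior can be observed with a minimal sketch. It builds a pool with the same arguments `Executors.newFixedThreadPool(n)` passes to ThreadPoolExecutor, keeps every worker blocked on a latch, and inspects the pool. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolDemo {

    // Submits `tasks` blocking tasks to a fixed-size pool and returns
    // {number of pool threads, number of queued tasks} while they are blocked.
    static int[] fillFixedPool(int threads, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()); // capacity Integer.MAX_VALUE
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep every worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] snapshot = {pool.getPoolSize(), pool.getQueue().size()};
        release.countDown();
        pool.shutdown();
        return snapshot;
    }

    public static void main(String[] args) {
        int[] s = fillFixedPool(2, 10);
        // Two threads run; the remaining eight tasks wait in the queue.
        System.out.println("threads=" + s[0] + ", queued=" + s[1]); // threads=2, queued=8
    }
}
```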
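Next is CachedThreadPool, which, as the name suggests, is a pool that caches threads. Its blocking queue is a SynchronousQueue, which stores no elements and serves only as a hand-off point. A thread that calls put() on it blocks until another thread takes the element, while the non-blocking offer() simply fails if no taker is waiting. CachedThreadPool relies on this property to create threads without bound. The parameters passed when a CachedThreadPool is created show how this works: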
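The hand-off semantics can be checked with a short sketch that uses only the standard `SynchronousQueue` API. The class name `HandOffDemo` is hypothetical.

```java
import java.util.concurrent.SynchronousQueue;

public class HandOffDemo {

    // offer() with no consumer waiting fails immediately.
    static boolean offerWithoutTaker() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        return q.offer("task");
    }

    // put() blocks until take() in another thread receives the element.
    static String handOff() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                q.put("task"); // returns only after the consumer takes it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String taken = q.take(); // pairs with the producer's put
            producer.join();
            return taken;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("offer without taker: " + offerWithoutTaker()); // false
        System.out.println("size: " + new SynchronousQueue<String>().size()); // always 0
        System.out.println("handed off: " + handOff()); // task
    }
}
```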
```java
/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available. These pools will typically improve the performance
 * of programs that execute many short-lived asynchronous tasks.
 * Calls to {@code execute} will reuse previously constructed
 * threads if available. If no existing thread is available, a new
 * thread will be created and added to the pool. Threads that have
 * not been used for sixty seconds are terminated and removed from
 * the cache. Thus, a pool that remains idle for long enough will
 * not consume any resources. Note that pools with similar
 * properties but different details (for example, timeout parameters)
 * may be created using {@link ThreadPoolExecutor} constructors.
 *
 * @return the newly created thread pool
 */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
```
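Note that the core pool size is 0, so there are no core threads, and the maximum pool size is Integer.MAX_VALUE. No real system runs that many threads, so in effect the pool can create threads without limit. A thread that stays idle for 60 seconds is reclaimed, which is why these are cached threads with a 60-second cache time. The SynchronousQueue serves as the hand-off point. Because the core size is 0, every new task goes through the queue step of execute(). The pool calls offer() on the queue, which succeeds only if an idle worker is already waiting in poll() on it, and that worker then runs the task. If no idle worker is waiting, offer() fails immediately and the pool creates a new thread with the task as its first task. The downside is that a burst of long-running tasks can create a very large number of threads.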
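The following minimal sketch reproduces this flow. It constructs a pool with the same arguments as `newCachedThreadPool()` and submits tasks that all block, so no worker is ever free to poll the queue. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CachedPoolDemo {

    // Submits `tasks` blocking tasks and returns {pool threads, queued tasks}.
    static int[] fillCachedPool(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // no worker becomes idle, so none polls the queue
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] snapshot = {pool.getPoolSize(), pool.getQueue().size()};
        release.countDown();
        pool.shutdown();
        return snapshot;
    }

    public static void main(String[] args) {
        int[] s = fillCachedPool(5);
        // Every offer() fails, so each task gets its own new thread.
        System.out.println("threads=" + s[0] + ", queued=" + s[1]); // threads=5, queued=0
    }
}
```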
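Finally, SingleThreadScheduledExecutor and ScheduledThreadPoolExecutor can run tasks after a delay or periodically. Their blocking queue is DelayedWorkQueue, and the name already hints at how scheduled execution works. DelayedWorkQueue does not order elements by insertion time but by remaining delay. It uses a heap internally, so the head is always the task that is due soonest. Roughly, it keeps tasks sorted by trigger time and uses the await and signal methods of a lock Condition to delay their execution. In take(), one waiting thread (the leader) sleeps for exactly the head task's remaining delay, while the others wait until they are signalled. offer() signals waiters whenever a new task becomes the head. Those interested can read the ScheduledThreadPoolExecutor source. The relevant parts of DelayedWorkQueue are: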
```java
// Excerpt from ScheduledThreadPoolExecutor.DelayedWorkQueue (JDK source);
// queue (the heap array), size, leader, grow(), siftUp(), setIndex()
// and finishPoll() are defined elsewhere in that class.
private final ReentrantLock lock = new ReentrantLock();

/**
 * Condition signalled when a newer task becomes available at the
 * head of the queue or a new thread may need to become leader.
 */
private final Condition available = lock.newCondition();

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
            grow();
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            siftUp(i, e);
        }
        if (queue[0] == e) {
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}
```
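To see the core idea in isolation, here is a deliberately simplified delay queue. It is a hypothetical sketch, not the JDK class. It combines a heap (`PriorityQueue`) ordered by trigger time, a `ReentrantLock`, and a `Condition` whose `awaitNanos` sleeps until the head expires. Unlike DelayedWorkQueue, it has no leader optimization, so every waiting thread re-checks the head on each signal.

```java
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class SimpleDelayQueue {

    private static final class Item implements Comparable<Item> {
        final long triggerNanos;
        final String value;

        Item(long triggerNanos, String value) {
            this.triggerNanos = triggerNanos;
            this.value = value;
        }

        // Compare via subtraction, as recommended for System.nanoTime() values.
        public int compareTo(Item o) {
            long d = triggerNanos - o.triggerNanos;
            return d < 0 ? -1 : (d > 0 ? 1 : 0);
        }
    }

    private final PriorityQueue<Item> heap = new PriorityQueue<>(); // earliest trigger at head
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();

    public void offer(String value, long delay, TimeUnit unit) {
        lock.lock();
        try {
            heap.add(new Item(System.nanoTime() + unit.toNanos(delay), value));
            available.signalAll(); // the head may have changed; waiters re-check it
        } finally {
            lock.unlock();
        }
    }

    public String take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            for (;;) {
                Item first = heap.peek();
                if (first == null) {
                    available.await(); // empty: wait for an offer
                } else {
                    long delay = first.triggerNanos - System.nanoTime();
                    if (delay <= 0) {
                        return heap.poll().value; // head has expired: hand it out
                    }
                    available.awaitNanos(delay); // sleep until expiry or a new head
                }
            }
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleDelayQueue q = new SimpleDelayQueue();
        q.offer("slow", 200, TimeUnit.MILLISECONDS);
        q.offer("fast", 50, TimeUnit.MILLISECONDS);
        System.out.println(q.take()); // fast
        System.out.println(q.take()); // slow
    }
}
```

Items leave the queue in order of trigger time, not insertion order. This is the same property that lets the scheduled pools delay tasks.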
Task Rejection Policies
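This part is relatively simple: when the water is full, it overflows. A thread pool has its limits too. Once the queue is full and the maximum number of threads has been reached, with no idle thread available, any further task must be rejected. A pool that has been shut down also rejects new tasks. A rejection can be handled in many ways: drop the task silently (rarely used), discard the oldest queued task, let the submitting thread run the task itself, and so on. Let's look at the rejection policies that the JUC package provides.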
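The ThreadPoolExecutor constructors show that a rejection policy is supplied as an implementation of the RejectedExecutionHandler interface.
RejectedExecutionHandler has four built-in implementations, all nested classes of ThreadPoolExecutor: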
- DiscardOldestPolicy
- AbortPolicy
- CallerRunsPolicy
- DiscardPolicy
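DiscardOldestPolicy discards the oldest task in the queue by removing the task at the head of the queue. It then retries execute() with the new task, unless the pool has been shut down, in which case the new task is dropped.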
```java
/**
 * A handler for rejected tasks that discards the oldest unhandled
 * request and then retries {@code execute}, unless the executor
 * is shut down, in which case the task is discarded.
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
```
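In FIFO blocking queues such as LinkedBlockingQueue and ArrayBlockingQueue, tasks are added at the tail and removed from the head, so the poll() call above removes the oldest waiting task. In a PriorityBlockingQueue, however, the head is the highest-priority task, so this policy would discard that task rather than the oldest one.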
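A minimal sketch makes the eviction visible. It uses one worker thread and an `ArrayBlockingQueue` of capacity 2, and task 0 holds the only thread until all five tasks have been submitted. The class name `DiscardOldestDemo` is hypothetical.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DiscardOldestDemo {

    // Submits tasks 0..4 and returns the ids of the tasks that actually ran.
    static List<Integer> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        List<Integer> ran = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            final int id = i;
            pool.execute(() -> {
                try {
                    if (id == 0) release.await(); // task 0 holds the only thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                ran.add(id);
            });
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran;
    }

    public static void main(String[] args) {
        // Tasks 1 and 2 are queued first, then evicted from the head when 3 and 4 arrive.
        System.out.println(run()); // [0, 3, 4]
    }
}
```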
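AbortPolicy simply refuses the task: it throws a RejectedExecutionException and does nothing else. It is also the default policy that ThreadPoolExecutor uses when no handler is passed to the constructor.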
```java
/**
 * A handler for rejected tasks that throws a
 * {@code RejectedExecutionException}.
 */
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
```
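Because AbortPolicy surfaces the rejection as an exception, the submitting code has to catch it. The following hypothetical sketch saturates a one-thread pool that has no queue capacity, relying on the constructor's default handler, and counts how many extra submissions are rejected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AbortDemo {

    static int countRejected(int extraTasks) {
        // No handler argument: the constructor falls back to AbortPolicy.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            try {
                release.await(); // occupy the only thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        int rejected = 0;
        for (int i = 0; i < extraTasks; i++) {
            try {
                pool.execute(() -> { });
            } catch (RejectedExecutionException e) {
                rejected++; // the caller decides what to do: log, retry, degrade
            }
        }
        release.countDown();
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejected(3)); // rejected: 3
    }
}
```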
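When the pool cannot run a task, CallerRunsPolicy makes the thread that submitted it run the task itself. This is a relatively safe policy and is recommended. While the pool is running, no task is lost. Because the submitting thread is busy running the task, submission also slows down naturally, which gives the pool time to catch up. Keep in mind that the caller is blocked for the duration of the task, and that once the pool is shut down the task is discarded silently.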
```java
/**
 * A handler for rejected tasks that runs the rejected task
 * directly in the calling thread of the {@code execute} method,
 * unless the executor has been shut down, in which case the task
 * is discarded.
 */
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
```
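The following hypothetical sketch shows which thread ends up running an overflow task under CallerRunsPolicy. The rejected task runs synchronously inside `execute()` on the submitting thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallerRunsDemo {

    // Saturates a one-thread pool, submits one more task, and returns
    // the name of the thread that actually ran it.
    static String whoRanOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            try {
                release.await(); // occupy the only pool thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        AtomicReference<String> runner = new AtomicReference<>();
        // Rejected, so CallerRunsPolicy runs it right here, before execute() returns.
        pool.execute(() -> runner.set(Thread.currentThread().getName()));
        release.countDown();
        pool.shutdown();
        return runner.get();
    }

    public static void main(String[] args) {
        Thread.currentThread().setName("submitter");
        System.out.println(whoRanOverflowTask()); // submitter
    }
}
```

Because `execute()` does not return until the overflow task finishes, a fast producer is throttled automatically. This is the back-pressure effect described above.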
DiscardPolicy silently drops the task without even throwing an exception. This is a rather crude policy and is not recommended.
```java
/**
 * A handler for rejected tasks that silently discards the
 * rejected task.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
```
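In fact, of the four policies above, only CallerRunsPolicy is recommended. The other three can lead to business errors, data loss, and similar problems. Dropping a task, throwing an exception, or silently ignoring it is rarely what we want. We can also implement the interface ourselves and handle rejections according to our actual needs.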
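As one possible shape for a custom handler, here is a sketch with assumptions: the `MeteredAbortPolicy` name and its counter are hypothetical, and the counter stands in for whatever metrics or alerting a real system would use. The handler records each rejection and then throws like AbortPolicy, so the submitter still learns that the task was not accepted. It is plugged into a bounded pool with one thread and a queue of capacity 1.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class MeteredRejectionDemo {

    // Hypothetical handler: count the rejection, then fail loudly.
    static final class MeteredAbortPolicy implements RejectedExecutionHandler {
        final AtomicLong rejected = new AtomicLong();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            long n = rejected.incrementAndGet();
            throw new RejectedExecutionException("rejection #" + n
                    + ": poolSize=" + e.getPoolSize() + ", queued=" + e.getQueue().size());
        }
    }

    // The first task runs, the second waits in the queue, the rest are rejected.
    static long submit(int tasks) {
        MeteredAbortPolicy handler = new MeteredAbortPolicy();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the single worker busy
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException ex) {
                System.out.println(ex.getMessage());
            }
        }
        release.countDown();
        pool.shutdown();
        return handler.rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("total rejected: " + submit(5)); // total rejected: 3
    }
}
```

Whether to throw, fall back to the caller, or persist the task for later is a business decision. The handler is simply the single place where that decision is made.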