在上一篇博客中有说到线程的基本原理和用法,其实Java中开辟出了一种管理线程的概念,这个概念叫做线程池,线程池的好处,就是可以方便的管理线程,也可以减少内存的消耗。那么,我们应该如何创建一个线程池?线程池的原理是怎么样的?常见的线程池有哪些呢?
一. 线程池简介
1、线程池的概念:
线程池就是首先创建一些线程,它们的集合称为线程池。使用线程池可以很好地提高性能,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。
2、线程池的工作机制
(1) 在线程池的编程模式下,任务是提交给整个线程池,而不是直接提交给某个线程,线程池在拿到任务后,就在内部寻找是否有空闲的线程,如果有,则将任务交给某个空闲的线程。
(2)一个线程同时只能执行一个任务,但可以同时向一个线程池提交多个任务。
3、使用线程池的原因:
多线程运行时间,系统不断的启动和关闭新线程,成本非常高,会过渡消耗系统资源,以及过渡切换线程的危险,从而可能导致系统资源的崩溃。这时,线程池就是最好的选择了。
二、常用的线程池
1、 ExecutorService
(1)ExecutorService简介
ExecutorService是Java提供的用于管理线程池的接口,这个接口表述了异步执行的机制,并且可以让任务在后台执行。
package com.it520.threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExecutorServiceDemo {
public static void main(String[] args) {
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
newFixedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println("ExecutorServiceDemo.main(...).new Runnable() {...}.run()");
}});
newFixedThreadPool.shutdown();
}
}
首先使用 newFixedThreadPool() 工厂方法创建一个 ExecutorService ,上述代码创建了一个可以容纳10個线程任务的线程池。其次,向 execute() 方法中传递一个异步的 Runnable 接口的实现,这样做会让 ExecutorService 中的某个线程执行这个Runnable 线程。
(2)任务委托(Task Delegation)
下方展示了一个线程的把任务委托异步执行的ExecutorService的示意图。
线程把任务委托给 ExecutorService,该线程就会继续执行与运行任务无关的其它任务;
(3)ExecutorService的实现
由于ExecutorService是一个接口,他的实现是需要提供这个接口的实现,ExecutorService 接口在 java.util.concurrent 包中有如下实现类:
- ThreadPoolExecutor
- ScheduledThreadPoolExecutor
可以根据自己的需要来创建一个 ExecutorService ,也可以使用 Executors 工厂方法来创建一个 ExecutorService 实例。如下:
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(10);
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
这也是用也是用常见的四种线程池来创建ExecutorService.
(4)ExecutorService的使用方法
这里有以下几种不同的方式让你将任务委托给ExecutorService:
下面就对这些方法进行一一讲解
a、execute(Runnable)
方法 execute(Runnable) 接收一个 java.lang.Runnable 对象作为参数,并且以异步的方式执行它。
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
newFixedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println("ExecutorServiceDemo.main(...).new Runnable() {...}.run()");
}});
newFixedThreadPool.shutdown();
b、submit(Runnable)
方法 submit(Runnable) 同样接收一个 Runnable 的实现作为参数,但是会返回一个Future 对象。这个 Future 对象可以用于判断 Runnable 是否结束执行
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
Future<?> future = newCachedThreadPool.submit(new Runnable() {
@Override
public void run() {
System.out.println("future run");
}
});//如果任务结束执行则返回 null
System.out.println(future.get());
c、submit(Callable)
方法 submit(Callable) 和方法 submit(Runnable) 比较类似,但是区别则在于它们接收不同的参数类型。Callable 的实例与 Runnable 的实例很类似,但是 Callable 的 call() 方法可以返回一个结果。方法 Runnable.run() 则不能返回结果。
ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(10);
Future future2 = newScheduledThreadPool.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
System.out.println("submit Callable");
return "Callable Result";
}
});
System.out.println(future2.get());
}
}
d、inVokeAny()
方法 invokeAny() 接收一个包含 Callable 对象的集合作为参数。调用该方法不会返回 Future 对象,而是返回集合中某一个 Callable 对象的结果,而且无法保证调用之后返回的结果是哪一个Callable,只知道它是这些 Callable 中一个执行结束的 Callable 对象。
如果一个任务运行完毕或者抛出异常,方法会取消其它的 Callable 的执行。
ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> callables = new HashSet<Callable<String>>();
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 2";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 3";
}
});
String result = executorService.invokeAny(callables);
System.out.println("result = " + result);
executorService.shutdown();
e、invokeAll()
方法 invokeAll() 会调用存在于参数集合中的所有 Callable 对象,并且返回一个包含 Future 对象的集合,你可以通过这個返回的集合来管理每個个Callable 的执行结果。
需要注意的是,任务有可能因为异常而导致运行结束,所以它可能并不是真的成功运行了。但是我们没有办法通过 Future 对象来了解到这个差异。
ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> callables = new HashSet<Callable<String>>();
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 2";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 3";
}
});
List<Future<String>> futures = executorService.invokeAll(callables);
for(Future<String> future : futures){
System.out.println("future.get = " + future.get());
}
executorService.shutdown();
}
}
(5)ExecuteService 服务的关闭
当使用 ExecutorService 完毕之后,我们应该关闭它,这样才能保证线程不会继续保持运行状态。
举例来说,如果你的程序通过 main() 方法启动,并且主线程退出了你的程序,如果你还有一个活动的 ExecutorService 存在于你的程序中,那么程序将会继续保持运行状态。存在于 ExecutorService 中的活动线程会阻止Java虚拟机关闭。
为了关闭在 ExecutorService 中的线程,你需要调用 shutdown() 方法。ExecutorService 并不会马上关闭,而是不再接收新的任务,一但所有的线程结束执行当前任务,ExecutorServie 才会真的关闭。所有在调用 shutdown() 方法之前提交到 ExecutorService 的任务都会执行。
如果你希望立即关闭 ExecutorService,你可以调用 shutdownNow() 方法。这個方法会尝试马上关闭所有正在执行的任务,并且跳过所有已经提交但是还没有运行的任务。但是对于正在执行的任务,是否能够成功关闭它是无法保证的,有可能他们真的被关闭掉了,也有可能它会一直执行到任务结束。这是一个最好的尝试。
关于ExecuteService 的部分知识点转自英文原文链接:http://tutorials.jenkov.com/java-util-concurrent/executorservice.html#executorservice-example ,中文译文首发开源中国社区 http://my.oschina.net/bairrfhoinn/blog/177639
2、ThreadPoolExecutor+BlockingQueue
(1)ThreadPoolExecutor参数
ThreadPoolExecutor构造方法参数如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
a、corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
b、workQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
- LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列
- SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量要高于LinkedBlockingQueue。
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
c、maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
d、ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程做些更有意义的事情,比如设置daemon和优先级等等
e、RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。
- AbortPolicy:直接抛出异常。
- CallerRunsPolicy:只用调用者所在线程来运行任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理,丢弃掉。
也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
f、keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
g、TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
根据上面的描述,我相信我们能够在熟悉参数的情况下自定义自己的线程池,但是我们发现在jdk帮助文档里面有这样一句话
强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。
(2)ThreadPoolExecutor执行顺序
a、当currentSize<corePoolSize时,创建线程。
b、当currentSize>=corePoolSize、并且workQueue未满时,将任务放入任务队列。
c、当currentSize>=corePoolSize、workQueue已满
-
若currentSize<maximumPoolSize,创建线程
-
若currentSize>maximumPoolSize,抛出RejectExecutionExpection异常,拒绝任务
(3)BlockingQueue
BlockingQueue是一个特殊的队列,当我们从BlockingQueue中取数据时,如果BlockingQueue是空的,
则取数据的操作会进入到阻塞状态,当 BlockingQueue 中有了新数据时,这个取数据的操作又会被重新唤醒。
同理,如果 BlockingQueue 中的数据已经满了,往 BlockingQueue 中存数据的操作又会进入阻塞状态,直到 BlockingQueue 中又有新的空间,存数据的操作又会被重新唤醒。它的泛型限定它是用来存放 Runnable 对象的。
a、几种常用的BlockingQueue
-
ArrayBlockingQueue:
这个表示一个规定了大小的 BlockingQueue,ArrayBlockingQueue 的构造函数接受一个 int 类型的数据,该数据表示BlockingQueue 的大小,存储在 ArrayBlockingQueue 中的元素按照 FIFO(先进先出)的方式来进行存取。
-
LinkedBlockingQueue:
这个表示一个大小不确定的 BlockingQueue,在LinkedBlockingQueue 的构造方法中可以传一个 int 类型的数据,这样创建出来的 LinkedBlockingQueue是有大小的,默认LinkedBlockingQueue的大小就为 Integer.MAX_VALUE。 -
PriorityBlockingQueue:
这个队列和 LinkedBlockingQueue 类似,不同的是PriorityBlockingQueue 中的元素不是按照 FIFO 来排序的,而是按照元素的Comparator 来决定存取顺序的(这个功能也反映了存入 PriorityBlockingQueue 中的数据必须实现了 Comparator 接口)。 -
SynchronousQueue:
这个是同步 Queue,属于线程安全的 BlockingQueue的一种,在 SynchronousQueue 中,生产者线程的插入操作必须要等待消费者线程的移除操作,Synchronous 内部没有数据缓存空间,因此我们无法对 SynchronousQueue进行读取或者遍历其中的数据,元素只有在你试图取走的时候才有可能存在。我们可以理解为生产者和消费者互相等待,等到对方之后然后再一起离开。
b、常用线程池对应的BlockingQueue
- newFixedThreadPool()—>LinkedBlockingQueue
- newSingleThreadExecutor()—>LinkedBlockingQueue
- newCachedThreadPool()—>SynchronousQueue
- newScheduledThreadPool()—>DelayedWorkQueue
3、常用线程池
具体的4种常用的线程池实现如下:(返回值都是ExecutorService)
(1)Executors.newFixedThreadPool()
Executors.newFixedThreadPool(int n):创建一个可重用固定个数的线程池,以共享的无界队列方式来运行这些线程。
package com.it520;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolTest {
public static void main(String[] args) {
//创建一个可重用固定个数的线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 6; i++)
fixedThreadPool.execute(new Runnable() {
public void run() {
try {
//打印正在执行的缓存线程信息
System.out.println(Thread.currentThread().getName()+"正在被执行");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
执行结果:
pool-1-thread-2正在被执行
pool-1-thread-3正在被执行
pool-1-thread-1正在被执行
pool-1-thread-2正在被执行
pool-1-thread-1正在被执行
pool-1-thread-3正在被执行
线程池的担心为3,每个线程执行后sleep2秒,这时其他线程进入执行,所以每2秒有三个线程执行输出结果。
(2) Executors.newSingleThreadExecutor()
Executors.newSingleThreadExecutor():创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolTest {
public static void main(String[] args) {
//创建一个单线程化的线程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
public void run() {
try {
//结果依次输出,相当于顺序执行各个任务
System.out.println(Thread.currentThread().getName()+"正在被执行,打印的值是:"+index);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
执行结果:
pool-1-thread-1正在被执行,打印的值是:0
pool-1-thread-1正在被执行,打印的值是:1
pool-1-thread-1正在被执行,打印的值是:2
pool-1-thread-1正在被执行,打印的值是:3
pool-1-thread-1正在被执行,打印的值是:4
(3)Executors.newCachedThreadPool()
Executors.newCacheThreadPool():可缓存线程池,先查看池中有没有以前建立的线程,如果有,就直接使用。如果没有,就建一个新的线程加入池中,缓存型池子通常用于执行一些生存期很短的异步型任务。
package com.it520;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolTest {
public static void main(String[] args) {
//创建一个可缓存线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
try {
//sleep可明显看到使用的是线程池里面以前的线程,没有创建新的线程
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable() {
public void run() {
//打印正在执行的缓存线程信息
System.out.println(Thread.currentThread().getName()+"正在被执行");
}
});
}
}
}
线程池为无限大,当执行当前任务时上一个任务已经完成,会复用执行上一个任务的线程,而不用每次新建线程.
(4)Executors.newScheduledThreadPool()
Executors.newScheduledThreadPool(int n):创建一个定长线程池,支持定时及周期性任务执行。
延迟执行代码如下:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolTest {
public static void main(String[] args) {
System.out.println("=======");
//创建一个定长线程池,支持延迟执行
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
//延迟1秒执行
scheduledThreadPool.schedule(new Runnable() {
public void run() {
System.out.println("延迟1秒执行");
}
}, 1, TimeUnit.SECONDS);
}
}
执行结果:
=======
延迟1秒执行
周期性任务执行代码如下:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolTest {
public static void main(String[] args) {
System.out.println("=============");
//创建一个定长线程池,支持定时及周期性任务执行——定期执行
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
//延迟1秒后每3秒执行一次
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println("延迟1秒后每2秒执行一次");
}
}, 1, 2, TimeUnit.SECONDS);
}
}
执行结果如下:
=============
延迟1秒后每2秒执行一次
延迟1秒后每2秒执行一次
延迟1秒后每2秒执行一次
以上就是关于线程池的一些总结,希望对大家有所帮助。
本文参考一下文档:
https://blog.csdn.net/hnd978142833/article/details/80253784
https://blog.csdn.net/u013541140/article/details/95225769
「DeMonnnnnn」的原创文章。
https://blog.csdn.net/DeMonliuhui/article/details/86477888