一、Java中常见的线程池
1.为什么使用线程池
- 重用线程池的线程,避免因为线程的创造和销毁所带来的性能开销。
- 有效控制线程池的最大并发数,避免大量的线程之间因抢占系统资源而阻塞。
- 能够对线程进行简单的管理,并提供一些特定的操作,如:定时、定期、单线程、并发数控制等功能。
2.线程池可能带来的风险
- 死锁
任何多线程应用程序都有死锁风险。当一组进程或线程中的每一个都在等待一个只有该组中另一个进程才能引起的事件时,我们就说这组进程或线程死锁了。 - 资源不足
一般都是由于线程池设置过大引起的。 - 并发错误
线程池和其他排队机制依靠使用wait()和notify()方法,这两个方法都难于使用。如果编码不正确,那么可能会丢失通知,导致线程保持空闲状态,尽管队列中还有任务正在运行。 - 线程泄露
任务抛出一个RuntimeException或Error时,如果池类没有捕捉到他们,那么线程将会退出而线程池的大小将会永久减少一个,当这种情况的次数发生的足够多时,线程池将会为空,系统将停止运行,因为没有可用的线程来处理任务。 - 请求过载
线程池任务队列中,待执行的任务积累过多,消耗太多的系统资源并引起资源缺乏。
二、Executor
在Java5
之后,并发编程引入了一堆启动、管理和调度线程的API
。Executor
框架
便是Java 5
中引入的,其内部使用了线程池机制,它在java.util.cocurrent
包下,通过
该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。因此,在Java 5
之
后,通过 Executor
来启动线程比使用 Thread
的 start
方法更好,除了更易管理,效率更
好(用线程池实现,节约开销)外,还有关键的一点:有助于避免this
逃逸问题——如果
我们在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始执行,此时
可能会访问到初始化了一半的对象用Executor
在构造器中。Eexecutor
作为灵活且强大的
异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提
交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,
执行任务的线程相当于消费者,并用Runnable
来表示任务,Executor
的实现还提供了对生
命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。
Executor框架包括:线程池、Executor,Executors,ExecutorService,CompletionService,Futurn,Callable等。
1.Executor和ExecutorService
Executor:一个接口,其定义了一个接收Runnable
对象的方法executor
,其方法签名
为 executor(Runnable command)
,该方法接收一个 Runable
实例,它用来执行一个任务,任务即一个实现了Runnable
接口的类,一般来说,Runnable
任务开辟在新线程中的使用方法为:new Thread(new RunnableTask()).start()
,但在 Executor
中,可以使用Executor
而不用显示地创建线程:executor.execute(new RunnableTask())
,Executor
接口并不严格要求执行是异步的。
ExecutorService:是一个比Executor
使用更广泛的子类接口,其提供了生命周期管理的方法,返回 Future
对象,以及可跟踪一个或多个异步任务执行状况返回Future
的方法;可以调用 ExecutorService
的 shutdown()
方法来平滑地关闭 ExecutorService
,调用该方法后,将导致ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭
ExecutorService `。因此我们一般用该接口来实现和管理多线程。
三、Executors
Executors类,是一个工具、工厂类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了 ExecutorService 接口。常见的线程池都可以通过此类创建。
1.CachedThreadPool
可缓存线程池。一种线程数量不定的线程池,并且其最大线程数为 Integer.MAX_VALUE
,
这个数是很大的,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则
新建线程。这些池通常会提高执行许多短暂异步任务的程序的性能。调用execute
将重用
以前构造的线程(如果可用)。如果没有可用的线程,将创建一个新的线程并将其添加到
该池中。但是线程池中的空闲线程都有超时限制,这个超时时长是60秒,超过60秒闲置线
程将被终止并从缓存中删除。因此,长时间保持闲置的池将不会消耗任何资源。
创建方法:
public static ExecutorService newCachedThreadPool();
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory);
package thread.day2;
import java.time.LocalTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolExample {
public static void main(String[] args) {
//可缓存线程池,空闲线程60s后回收
ExecutorService pool = Executors.newCachedThreadPool();
System.out.println(LocalTime.now());
for (int i = 0; i < 10; i++) {
int num=i;
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("当前时间:"+LocalTime.now()+",线程:"+Thread.currentThread().getName()
+":"+num);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
}
}
2.FixedThreadPool
创建一个可重用固定个数的线程池,当线程处于空闲状态时,他们并不会被回收,除非线程池被关闭了,如果工作线程数量达到线程池初始的最大容量,则将提交的任务存入池队列中。由于 newFixedThreadPool
只有核心线程并且这些核心线程不会被回收,这样它更加快速的响应外界的请求。
创建方式:
public static ExecutorService newFixedThreadPool(int nThreads);
public static ExecutorService newFixedThreadPool(int nThreads,
ThreadFactory threadFactory);
package thread.day2;
import java.time.LocalTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolExample {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2);
System.out.println("当前时间:"+LocalTime.now());
for (int i = 0; i < 10; i++) {
int num=i;
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("当前时间:"+LocalTime.now()+",线程:"+Thread.currentThread().getName()+":"+num);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
}
}
3.ScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。它的线程数量是固定的,它可安排
给定延迟后运行命令或者定期地执行,这类线程池主要用于执行定时任务和具有固定周期
的重复任务。
创建方式:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,
ThreadFactory
threadFactory)
package thread.day2;
import java.time.LocalTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Demo08 {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
System.out.println("当前时间:"+ LocalTime.now());
//周期执行
// scheduledExecutorService.scheduleAtFixedRate(()->{
// System.out.println("当前时间:"+LocalTime.now()+",线程:"+Thread.currentThread().getName());
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
//
// },2,5, TimeUnit.SECONDS);
scheduledExecutorService.scheduleWithFixedDelay(() -> {
System.out.println("当前时间:" + LocalTime.now() + ", 线程: " + Thread.currentThread().getName() + ", 序号:");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 0, 2, TimeUnit.SECONDS);
for (int i = 0; i < 10; i++) {
int num=i;
//正常线程池执行任务
// scheduledExecutorService.execute(()->{
// System.out.println("当前时间:"+LocalTime.now()+",线程:"+Thread.currentThread().getName()+":"+num);
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// });
//定时执行
// scheduledExecutorService.schedule(()->{
// System.out.println("当前时间:"+LocalTime.now()+",线程:"+Thread.currentThread().getName()+":"+num);
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
//
// },2, TimeUnit.SECONDS);
}
}
}
区别:
scheduleAtFixedRate方法,如果要执行的任务耗时比任务的间隔时间长,下次任务执行的时间就是上次任务执行完就执行。(排队)
scheduleWithFixedDelay方法,下次任务的执行时间就是上次任务执行所需的时间加上时间间隔。
4.SingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证任务按照指定顺序(FIFO,LIFO)执行。
public static ExecutorService newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
package thread.day2;
import java.time.LocalTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 使用单线程的线程池可以保证任务按照提交的顺序依次执行,每次只有一个任务在执行。
*/
public class Demo07 {
public static void main(String[] args) {
//创建一个单线程的线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
int num=i;
executorService.execute(()->{
System.out.println("当前时间:"+ LocalTime.now()+",线程:"+Thread.currentThread().getName()+":"+num);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
executorService.shutdown();
}
}
四、Executor VS ExecutorService VS Executors
这三者均是Executor
框架中的一部分。总结一下这三者间的区别:
ExecutorService
接口继承了Executor
接口,是Executor
的子接口。Executor
接口定义了一个execute()
方法用来接收一个Runnable
接口的对象,而ExecutorService
接口中的submit()
方法可以接收Runnable
和Callable
接口的对象。Executor
接口中的execute()
方法不返回任何结果,而ExecuteService
接口中的submit()
方法可以通过一个Future
对象返回运行结果。- 除了允许客户端提交一个任务,
ExecutorService
还提供用来控制线程池的方法。比如:调用shutDown()
方法终止线程池。 Executors
类提供工厂方法用来创建不同类型的线程池。比如:newSingleThreadExecutor()
创建一个只有一个线程的线程池,newFixedThreadPool(int numOfThreads)
来创建固定线程数的线程池,newCachedThreadPool()
可以根据需要创建新的线程,但如果已有线程是空闲的会重用已有线程。
阿里开发规范中强制要求禁用Executors创建线程池。
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors返回的线程池的弊端如下:
1) FixedThreadPool 和 SingleThreadPool :
允许的请求队列的长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
2) CachedThreadPool 和 ScheduledThreadPool :
允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
判断核心线程数是否已满,核心线程数大小和corePoolSize
参数有关,未满则创建线程执行任务,若核心线程池已满,判断队列是否满,队列是否满和workQueue
参数有关,若未满则加入队列中,若队列已满,判断线程池是否已满,线程池是否已满和maximumPoolSize
参数有关,若未满创建线程执行任务,若线程池已满,则采用拒绝策略处理无法执执行的任务,拒绝策略和handler
参数有关
五、ThreadPoolExecutor
使用 ThreadPoolExecutor
可以创建出符合自己的业务场景需要的线程池。
public class ThreadPoolExecutor extends AbstractExecutorService
1.构造方法
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
TimeUnit unit,BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,
RejectedExecutionHandler handler)
实际上最后都调用的是最后一个(参数最多的那个)构造方法:
- corePoolSize:线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程
执行任务,直到当前线程数等于corePoolSize
;如果当前线程数为corePoolSize
,
继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAll CoreThreads()
方法,线程池会提前创建并启动所有核心线程。当线程数小于等于core PoolSize
时,默认情况下线程会一直存活在线程池中,即使线程处于空闲状态。如果
allowCoreThreadTimeOut
被设置为true
时,无论线程数多少,那么线程处于空闲状
态超过一定时间就会被销毁掉。 - maximumPoolSize:线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提 交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize 。
- keepAliveTime: 线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时 间;默认情况下,该参数只在线程数大于corePoolSize 时才有用;如果 allowCoreThr eadTimeOut 被设置为 true时,无论线程数多少,线程处于空闲状态超过一定时间就会被销毁掉。
- unit: keepAliveTime 的单位。 TimeUnit 是一个枚举类型,其包括:
- NANOSECONDS :1微毫秒 = 1微秒 / 1000
- MICROSECONDS :1微秒 = 1毫秒 / 1000
- MILLISECONDS :1毫秒 = 1秒 /1000
- SECONDS :秒
- MINUTES :分
- HOURS :小时
- DAYS :天 - workQueue: 用来保存等待被执行的任务的阻塞队列,且任务必须实现 Runable 接口,
有如下阻塞队列:
- ArrayBlockingQueue :基于数组结构的有界阻塞队列,按FIFO排序任务;
- LinkedBlockingQuene :基于链表结构的无界阻塞队列,按FIFO排序任务,吞吐量
通常要高于 ArrayBlockingQuene ;
- SynchronousQuene :一个不存储元素的阻塞队列,每个插入操作必须等到另一个
线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBl
ockingQuene ; - threadFactory: 创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置
一个具有识别度的线程名 - handler: 线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提
交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
- AbortPolicy :直接抛出异常,默认策略;
- CallerRunsPolicy :用调用者所在的线程来执行任务;
- DiscardOldestPolicy :丢弃阻塞队列中靠最前的任务,并执行当前任务
- DiscardPolicy :直接丢弃任务;
ScheduledThreadPool 线程池是使用 ScheduledThreadPoolExecutor 类创建的。
ScheduledThreadPoolExecutor 类是 ThreadPoolExecutor 的子类。