文章目录
- 一、线程池是什么?
- 二、为什么要使用线程池?
- 三、jdk自带的四种线程池
- 1. 线程池参数
- 2.工作队列
- 3.拒绝策略
- 4.四种线程池一些示例
- 四、自定义线程池
一、线程池是什么?
一种线程使用模式,是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。
二、为什么要使用线程池?
(1) 降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
(2) 提高响应速度。 当任务到达时,任务可以不需要等到线程创建就能立即执行。
(3) 提高线程的可管理性。 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控。
三、jdk自带的四种线程池
- newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
- newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
- newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
- newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行。
简单使用,代码如下(示例):
public class UseExecutors {
public static void main(String[] args) {
Runnable taskOne = () -> {
ThreadUtils.sleep(1000);
System.out.println(Thread.currentThread().getName()+":taskOne");
};
// ExecutorService pools = Executors.newCachedThreadPool();
// ExecutorService pools = Executors.newSingleThreadExecutor();
// ExecutorService pools = Executors.newScheduledThreadPool(10);
ExecutorService pools = Executors.newFixedThreadPool(10);
for (int i = 0; i < 40; i++) {
pools.submit(taskOne);
}
}
}
无论是哪一个,都调用ThreadPoolExecutor
构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
1. 线程池参数
线程池有七个参数,意义如下:
corePoolSize | 指定了线程池里的线程数量,核心线程池大小 |
maximumPoolSize | 指定了线程池里的最大线程数量 |
keepAliveTime | 当线程池线程数量大于corePoolSize时候,多出来的空闲线程,多长时间会被销毁 |
unit | 时间单位,TimeUnit |
workQueue | 任务队列,用于存放提交但是尚未被执行的任务 |
threadFactory | 线程工厂,用于创建线程,线程工厂就是给我们new线程的 |
handler | 所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略 |
2.工作队列
常见的工作队列我们有如下选择,这些都是阻塞队列,阻塞队列的意思是,当队列中没有值的时候,取值操作会阻塞,一直等队列中产生值。
• ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。
• LinkedBlockingQueue:基于链表结构的无界阻塞队列,FIFO。
3.拒绝策略
线程池提供了四种拒绝策略:
• AbortPolicy:直接抛出异常,默认策略;
• CallerRunsPolicy:用调用者所在的线程来执行任务;
• DiscardOldestPolicy:丢弃阻塞队列中最靠前的任务,并执行当前任务;
• DiscardPolicy:直接丢弃任务;
线程池中,有三个重要的参数,决定影响了拒绝策略:
corePoolSize - 核心线程数,也即最小的线程数。
workQueue - 阻塞队列 。
maximumPoolSize - 最大线程数
当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到 maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。也就是说当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。
线程池执行任务流程如下:
4.四种线程池一些示例
- newCachedThreadPool
示例代码
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程池大小=0
最大线程池大小为Integer.MAX_VALUE
线程过期时间为60s
使用SynchronousQueue作为工作队列.
所以线程池为0-max个线程,并且会60s过期,实现了可以缓存的线程池。
- newFixedThreadPool
示例代码
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
核心线程池大小=传入参数
最大线程池大小为传入参数
线程过期时间为0ms
LinkedBlockingQueue作为工作队列.
通过最小与最大线程数量来控制实现定长线程池.
- newScheduledThreadPool
示例代码
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
核心线程池大小=传入参数
最大线程池大小为Integer.MAX_VALUE
线程过期时间为0ms
DelayedWorkQueue作为工作队列.
主要是通过DelayedWorkQueue来实现的定时线程。
- newSingleThreadExecutor
示例代码
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
核心线程池大小=1
最大线程池大小为1
线程过期时间为0ms
LinkedBlockingQueue作为工作队列.
四、自定义线程池
这里是针对JDK1.8版本,使用JDK自带的线程池会出现OOM问题,
中小型公司一般很难遇到,在阿里巴巴开发文档上面有明确的标识:
上边我们已经分析了线程池的几个参数,这几个参数核心线程数、最大线程数、活跃时间和单位根据服务器本身的性能和程序的特性设定。但是线程工厂、决绝策略、阻塞队列又该怎么搞呢?
拒绝策略其实很简单,ExecutorService构造时可以不传递拒绝策略,默认使用异常抛出的方式。
阻塞队列我们搞一个定长的队列就好了,ArrayBlockingQueue<>(DEFAULT_SIZE)
线程工厂的获取我们可以使用以下的方法:
第一种办法,看看原生的怎么搞一个线程工厂:
进入看他的源码:
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
@SuppressWarnings("removal")
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
线程工厂就是创建线程的,这里用到了一种设计模式,叫工厂设计模式。我们可以按照他的方式自己写一个.
public class MyThreadFactory implements ThreadFactory {
// @Override
// public Thread newThread(Runnable r) {
// return new Thread(r);
// }
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
@SuppressWarnings("removal")
MyThreadFactory(String name) {
// @SuppressWarnings("removal")
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = name + "-" +
poolNumber.getAndIncrement() +
"-thread-";
}
MyThreadFactory(){
this("default");
}
public Thread newThread(Runnable r) {
// 就是在创建线程
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
第二种:Google guava 工具类 提供的 ThreadFactoryBuilder 。
ThreadFactory guavaThreadFactory = new ThreadFactoryBuilder().setNameFormat("retryClient-pool-").build();
第三种:Apache commons-lang3 提供的 BasicThreadFactory。
ThreadFactory basicThreadFactory = new BasicThreadFactory.Builder()
.namingPattern("basicThreadFactory-").build();
因此,定义一下线程池示例:
public class AsyncProcessor {
/**
* 默认最大并发数<br>
*/
private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;
/**
* 线程池名称格式
*/
private static final String THREAD_POOL_NAME = "kf1-log-%d";
/**
* 线程工厂名称
*/
private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME)
.daemon(true).build();
/**
* 默认队列大小
*/
private static final int DEFAULT_SIZE = 500;
/**
* 默认线程存活时间
*/
private static final long DEFAULT_KEEP_ALIVE = 60L;
/**
* NewEntryServiceImpl.java:689
* Executor
*/
private static ExecutorService executor;
/**
* 执行队列
*/
private static BlockingQueue<Runnable> executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE);
static {
executor = new ThreadPoolExecutor(
DEFAULT_MAX_CONCURRENT,
DEFAULT_MAX_CONCURRENT * 4,
DEFAULT_KEEP_ALIVE,
TimeUnit.SECONDS,
executeQueue,
FACTORY);
}
/**
* 此类型无法实例化
*/
private AsyncProcessor() {
}
public static boolean executeTask(Runnable task) {
try {
executor.execute(task);
} catch (RejectedExecutionException e) {
System.out.println("Task executing was rejected.");
return false;
}
return true;
}
/**
* 提交任务,并可以在稍后获取其执行情况<br>
* 当提交失败时,会抛出 {@link }
* @param task
* @return
*/
public static <T> Future<T> submitTask(Callable<T> task) {
try {
return executor.submit(task);
} catch (RejectedExecutionException e) {
throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
}
}
}