1.线程池
1.1.线程池产生背景
1>.线程是一种系统资源,每创建一个新的线程都需要占用一定的内存(分配栈内存),在高并发场景下,某一时刻有大量请求访问系统,如果针对每个请求(任务)都创建一个新的线程,那么对内存的占用是相当大的,有可能还出现OOM(内存溢出),甚至会导致整个系统崩溃;
2>.线程的执行需要消耗CPU资源,由于计算机CPU核心数的有限的,如果在系统中创建了大量线程,那么就会有一部分线程在执行过程中无法获取到CPU执行权(/CPU时间片)而处于阻塞状态,从而引起线程上下文切换问题(保存线程的运行状态,下次运行时再恢复到之前的状态).线程上下文的频繁切换对系统性能也有很大的影响,尤其是在高并发环境下,频繁切换线程上下文反而会导致系统性能降低;
针对上述问题可以知道,系统中线程并不是创建的越多越好,而是需要一个容器将数量有限的线程管理起来,对这些线程进行复用(享元模式),以此来减少系统资源的占用.基于此,线程池应运而生;
1.2.线程池概述
线程池是指在初始化一个多线程应用程序过程中提供一个线程集合,在需要执行新的任务时重用这些线程而不是每次都新建一个线程,避免了创建和销毁线程的额外开销,提高响应速度.
线程池中线程的数量通常完全取决于可用内存数量和应用程序的需求.然而,增加可用线程数量是可能的.线程池中的每个线程都有被分配一个任务,一旦任务已经完成了,线程回到线程池中并等待下一次分配任务;
***注意: 线程池中的线程都是非守护线程,不会随着主线程的结束而结束;
1.3.线程池特点
1.3.1.主要特点
1>.线程复用
2>.控制最大并发数
3>.管理线程
…
1.3.2. 优势
1>.降低资源消耗.
通过重复利用已经创建好的线程降低线程创建和销毁造成的资源消耗;
2>.提高响应速度.
当任务到达时,任务可以不需要等待线程创建就可以立即执行;
3>.提高线程的可管理性.
线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控;
1.4.JDK提供的线程池
1.4.1.ThreadPoolExecutor对象
1.4.2.线程池的状态
ThreadPoolExecutor对象使用"int的高3位"来表示线程池状态,"低29位"表示线程数量;
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING!
***注意:RUNNING状态对应的数字"111",最左侧是数字是符号位,"1"表示负("-"),因此它代表的数字是最小的!
说明:
这些信息存储在一个
原子变量ctl中
,目的是将线程池状态与线程个数合二为一
,这样就可以用一次cas原子操作进行赋值
;// c为旧值(期望值),ctlOf返回结果为新值 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))); // rs为高3位代表线程池状态, wc为低29位代表线程个数,ctl是合并它们 private static int ctlOf(int rs, int wc) { return rs | wc; }
1.4.3.线程池构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数说明:
①,corePoolSize: 核心线程数目(最多保留的线程数)
②.maximumPoolSize: 最大线程数目,它等于corePoolSize+额外创建的线程数
③.keepAliveTime: 生存时间-针对救急线程(额外创建线程)
④.unit: 时间单位-针对救急线程(额外的线程)
⑤.workQueue: 阻塞队列
⑥.threadFactory: 线程工厂-可以为线程创建时起个好名字
⑦.handler: 拒绝策略
根据这个构造方法,JDK Executors类中提供了众多工厂方法来创建各种用途的线程池;
1.4.4.线程池工作方式
***注意: 救急线程有生存时间(执行完任务,一段时间内没有执行其他任务,就会被销毁),而核心线程没有生存时间(会一直运行)!
说明:
①.线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务;
(线程的创建是懒加载的!)
②.当线程池中的线程数达到corePoolSize并且没有线程空闲(所有的线程都在执行任务),这时再加入任务,新加的任务会被加入workQueue队列排队,直到有空闲的线程;
③.如果队列选择了
有界队列
,那么任务超过了队列大小时,会创建(maximumPoolSize - corePoolSize)数目的线程来救急;
④.如果线程数到达maximumPoolSize仍然有新任务,这时会执行拒绝策略.拒绝策略jdk提供了4 种实现,其它著名框架也提供了实现:
AbortPolicy:让调用者抛出RejectedExecutionException异常,这是默认策略;
- CallerRunsPolicy:让调用者运行任务(在调用者线程中执行任务);
- DiscardPolicy:放弃本次任务;
- DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之;
⑤.当(流量)高峰过去后,超出corePoolSize的救急线程如果一段时间没有任务可做,需要结束它们以节省资源,这个时间由keepAliveTime和unit来控制;
扩展: 其他框架提供的拒绝策略实现
①.Dubbo的实现: 在抛出RejectedExecutionException异常之前会记录日志,并dump 线程栈信息,方便定位问题;
②.Netty的实现: 创建一个新线程来执行任务;
③.ActiveMQ的实现:带超时等待(60s),之后尝试放入队列,类似我们之前自定义的拒绝策略;
④.PinPoint的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略;
1.4.5.newFixedThreadPool: 固定线程数的线程池
1>.底层源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2>.特点:
①.核心线程数 = 最大线程池数,没有救急线程被创建,因此也无需超时时间;
②.阻塞队列是无界的,可以存放任意数量的任务;
3>.使用场景
适用于任务量已知,相对耗时的任务;
1.4.6.newCachedThreadPool: 带缓存功能的线程池
1>.底层源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
2>.特点
①.核心线程数是0,最大线程数是Integer.MAX_VALUE,救急线程的空闲生存时间(默认)是60s,意味着:
- 全部都是救急线程(60s后可以回收);
- 救急线程可以无限创建(Integer.MAX_VALUE);
②.队列采用了SynchronousQueue实现,特点是它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
例子:SynchronousQueue队列
@Slf4j
public class TestNewCacheThreadPool {
public static void main(String[] args) throws InterruptedException {
SynchronousQueue<Integer> integers = new SynchronousQueue<Integer>();
new Thread(() -> {
try {
log.info("putting{}",1);
integers.put(1);
log.info("putted");
log.info("putting{}",2);
integers.put(2);
log.info("putted");
}catch (Exception ex){
ex.printStackTrace();
}
},"线程t1").start();
Thread.sleep(1000);
new Thread(() -> {
try {
log.info("taking {}", 1);
integers.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
Thread.sleep(1);
new Thread(() -> {
try {
log.info("taking {}", 2);
integers.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t3").start();
}
}
3>.使用场景:
整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲分钟后释放线程.
适合任务数比较密集,但每个任务执行时间较短的情况;
1.4.7.newSingleThreadExecutor : 单线程的线程池
1>.底层源码:
public static ExecutorService newSingleThreadExecutor() {
//装饰器模式
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
2>.特点
①.如果是自己创建一个单线程串行执行任务,一旦任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作;
②.Executors.newSingleThreadExecutor()线程个数始终为1,不能修改;
- FinalizableDelegatedExecutorService应用的是装饰器模式,只对外暴露了ExecutorService接口,因此不能调用ThreadPoolExecutor中特有的方法;
③.Executors.newFixedThreadPool(1)初始时为1,以后还可以修改;
- 对外暴露的是ThreadPoolExecutor对象,可以强转后调用setCorePoolSize等方法进行修改;
3>.使用场景
希望多个任务排队执行
.线程数固定为1,任务数多于1时,会放入无界队列排队.任务执行完毕,这唯一的线程也不会被释放;
1.4.8.newScheduledThreadPool
1>.在"任务调度线程池"功能加入之前,可以使用java.util.Timer来实现定时功能,Timer的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务;
2>.整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队.任务执行完毕,这些线程也不会被释放.用来执行延迟或反复执行的任务;
3>.例子:
@Slf4j
public class TestScheduledExecutorService {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 添加两个任务,希望它们都在 1s 后执行
executor.schedule(() -> {
log.info("任务1,执行时间:" + new Date());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
log.info("任务2,执行时间:" + new Date());
}, 1000, TimeUnit.MILLISECONDS);
}
}
扩展:
1>.scheduleAtFixedRate: 定时执行任务,反复不停的执行
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
log.info("start...");
executor.scheduleAtFixedRate(() -> {
log.info("running...");
}, 1, 1, TimeUnit.SECONDS);
2>.scheduleAtFixedRate: 延迟不停地执行
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.info("start...");
pool.scheduleWithFixedDelay(()-> {
log.info("running...");
try {
//延迟时间=sleep时间+设置的间隔时间
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, 1, TimeUnit.SECONDS);
scheduleWithFixedDelay的间隔是:上一个任务结束<-> 延时 <-> 下一个任务开始,所以间隔都是3s!
1.4.9.提交任务
// 执行任务
void execute(Runnable command);
// 提交任务task,用返回值Future获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交tasks中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 提交tasks中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException;
// 提交tasks中所有任务,但是不会执行所有的任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
// 提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
1.4.10.关闭/停止线程池
1>.shutdown: 优雅的停止
/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行
*/
void shutdown();
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(SHUTDOWN);
// 仅会打断空闲线程
interruptIdleWorkers();
onShutdown(); // 扩展点ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
tryTerminate();
}
2>.shutdownNow: 强制停止
/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(STOP);
// 打断所有线程
interruptWorkers();
// 获取队列中剩余任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终结
tryTerminate();
return tasks;
}
3>.其他方法
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用shutdown后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池TERMINATED后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
1.4.11.正确处理执行任务异常
1>.方法1:主动捕捉异常
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.submit(() -> {
try {
log.debug("task1");
int i = 1 / 0;
} catch (Exception e) {
log.error("error:", e);
}
});
2>.方法2:使用Future收集异常
ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Boolean> f = pool.submit(() -> {
log.debug("task1");
int i = 1 / 0;
return true;
});
log.info("result:{}", f.get());
1.4.12.线程池应用–定时任务
1>.需求
让每周四 18:00:00 定时执行任务
2>.代码实现:
@Slf4j
public class TestScheduleDemo {
public static void main(String[] args) {
//当前时间
LocalDateTime now = LocalDateTime.now();
//以当前时间为基础找到周四的时间
LocalDateTime timer = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);
//一周的时间间隔,这也是任务的执行周期/间隔
long period = 1000 * 60 * 60 * 24 * 7;
//如果当前时间大于周四,例如今天周五,那么就要到下周四去执行,而不是会退到本周四
if (now.compareTo(timer) >= 0) {
timer = timer.plusWeeks(1);
}
//任务延迟执行时间=周四时间执行任务的时间-当前时间
long delay = Duration.between(now, timer).toMillis();
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
pool.scheduleAtFixedRate(() -> {
log.info("running");
}, delay, period, TimeUnit.MILLISECONDS);
}
}