持续学习&持续更新中…
守破离
【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【15】异步_线程池
- 初始化线程的 4 种方式
- 开发中为什么使用线程池
- 线程池七大参数
- 线程池工作原理
- 常见的 4 种线程池
- 生产中如何使用线程池?
- CompletableFuture 异步编排—简介
- 业务场景
- 简介
- CompletableFuture—创建(run/supply)
- CompletableFuture—计算完成(whenComplete)
- CompletableFuture—处理(handle)
- CompletableFuture—线程串行化方法
- CompletableFuture—两任务组合—都要完成(Both/Combine)
- CompletableFuture—两任务组合—一个完成(Either)
- CompletableFuture—多任务组合
- 参考
初始化线程的 4 种方式
1)、继承 Thread
2)、实现 Runnable 接口
3)、实现 Callable 接口 + FutureTask (可以拿到返回结果,可以处理异常)
4)、线程池
-
方式 1 和方式 2:主进程无法获取线程的运算结果。不适合当前场景,也会导致资源耗尽
-
方式 3:主进程可以获取线程的运算结果,但是不利于控制服务器中的线程资源。极大可能导致服务器资源耗尽。
-
我们以后在业务代码里面,方式123,这三种启动线程的方式都不用。可能会导致资源耗尽【应该将所有的多线程异步任务都交给线程池执行】
-
方式 4:通过如下两种方式初始化线程池(当前系统中线程池最好只有一两个,每个异步任务,提交给线程池让他自己去执行就行)
-
1、Executors.newFiexedThreadPool(3);
-
2、new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit unit, workQueue, threadFactory, handler);
-
开发中为什么使用线程池
-
降低资源的消耗:通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
-
提高响应速度:因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于 等待分配任务 的状态,当任务来时无需创建新的线程就能执行
-
提高线程的可管理性:线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main....start....");
/**
* 1)、继承Thread
* Thread01 thread = new Thread01();
* thread.start();//启动线程
*
* 2)、实现Runnable接口
* Runable01 runable01 = new Runable01();
* new Thread(runable01).start();
* 3)、实现Callable接口 + FutureTask (可以拿到返回结果,可以处理异常)
* FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
* new Thread(futureTask).start();
* //阻塞等待整个线程执行完成,获取返回结果
* Integer integer = futureTask.get();
* 4)、线程池[ExecutorService]
* 给线程池直接提交任务。
* service.execute(new Runable01());
* 创建:
* 1)、Executors
* 2)、new ThreadPoolExecutor
*
* Future:可以获取到异步结果
*
* 区别;
* 1、2不能得到返回值。3可以获取返回值
* 1、2、3都不能控制资源
* 4可以控制资源,性能稳定。
*/
// new Thread(()-> System.out.println("hello")).start();
//我们以后再业务代码里面,以上三种启动线程的方式都不用。可能会导致资源耗尽【应该将所有的多线程异步任务都交给线程池执行】
// executor.execute(new Runable01());
// Future<?> submit = executor.submit(new Thread01());
// Object o = submit.get();
// System.out.println(o);
//当前系统中线程池最好只有一两个,每个异步任务,提交给线程池让他自己去执行就行
/**
* 七大参数
* corePoolSize:核心线程数[一直存在除非设置allowCoreThreadTimeOut]; 线程池创建好以后就会准备就绪这些数量的线程,它们等待接受异步任务去执行。
* 5个 Thread thread = new Thread(); thread.start();
* maximumPoolSize:[200] 最大线程数量; 控制资源
* keepAliveTime:存活时间。如果当前的线程池中的线程数量大于core线程数量。
* 并且只要线程空闲时间大于指定的keepAliveTime,也就是线程在最大多长时间没有接到新任务
* 就会释放(maximumPoolSize-corePoolSize)数量空闲的线程,最终线程池维持在 corePoolSize大小
*
* unit:时间单位
* BlockingQueue<Runnable> workQueue:阻塞队列。如果任务有很多,就会将目前多的任务放在队列里面。
* 只要有线程空闲,就会去队列里面取出新的任务继续执行。
* threadFactory:线程的创建工厂。
* RejectedExecutionHandler handler:如果队列满了,按照我们指定的拒绝策略拒绝执行任务
*
*
*
* 工作顺序:
* 1)、线程池创建,准备好core数量的核心线程,准备接受任务
* 1.1、core满了,就将再进来的任务放入阻塞队列中。空闲的core就会自己去阻塞队列获取任务执行
* 1.2、阻塞队列满了,就直接开新线程执行,最大只能开到max指定的数量
* 1.3、max满了就用RejectedExecutionHandler拒绝任务
* 1.4、max都执行完成,有很多空闲.在指定的时间keepAliveTime以后,释放max-core这些线程
*
* new LinkedBlockingDeque<>():默认是Integer的最大值。内存不够
*
* 一个线程池 core 7; max 20 ,queue:50,100并发进来怎么分配的;
* 7个会立即得到执行,50个会进入队列,再开13个进行执行。剩下的30个就使用拒绝策略。
* 如果不想抛弃还要执行。CallerRunsPolicy;
*
*/
ExecutorService executor = new ThreadPoolExecutor(5,
200,
10,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
// Executors.newCachedThreadPool() core是0,所有都可回收
// Executors.newFixedThreadPool() 固定大小,core=max;都不可回收
// Executors.newScheduledThreadPool() 定时任务的线程池
// Executors.newSingleThreadExecutor() 单线程的线程池,后台从队列里面获取任务,挨个执行
//
System.out.println("main....end....");
}
线程池七大参数
corePoolSize:
- the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
- 核心线程数[一直存在除非设置allowCoreThreadTimeOut];
- 线程池创建好以后就会将这些数量的线程准备就绪,它们等待接受异步任务去执行。
maximumPoolSize:
- the maximum number of threads to allow in the pool
- 最大线程数量;
- 可以控制资源
keepAliveTime:
- when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
- 存活时间。
- 如果当前的线程池中的线程数量大于core线程数量。
- 并且只要这些多余的空闲线程空闲时间大于指定的keepAliveTime,也就是线程在最大多长时间没有接到新任务
- 就会释放(maximumPoolSize-corePoolSize)数量空闲的线程,最终使得线程池维持在 corePoolSize 大小
unit:
- the time unit for the keepAliveTime argument
- 时间单位
workQueue(阻塞队列)
- the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.
- 如果任务有很多,就会将目前多的任务放在队列里面。
- 队列用来存储等待执行的任务,只要有线程空闲,就会去队列里面取出新的任务继续执行。
threadFactory:
- the factory to use when the executor creates a new thread
- 线程的创建工厂。
handler:(RejectedExecutionHandler)
- the handler to use when execution is blocked because the thread bounds and queue capacities are reached
- 如果队列满了,按照我们指定的拒绝策略拒绝执行任务:
- AbortPolicy【默认】:直接拒绝策略,也就是不会执行任务,直接抛出RejectedExecutionException,这是默认的拒绝策略。
- DiscardPolicy:抛弃策略,也就是直接忽略提交的任务(通俗来说就是空实现)。
- DiscardOldestPolicy:抛弃最老任务策略,也就是通过poll()方法取出任务队列队头的任务抛弃,然后执行当前提交的任务。
- CallerRunsPolicy:调用者执行策略,也就是当前调用Executor#execute()的线程直接调用任务Runnable.run(),一般不希望任务丢失会选用这种策略,但从实际角度来看,原来的异步调用意图会退化为同步调用。
线程池工作原理
文字描述:
-
在创建了线程池后,开始等待请求。
-
当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
- 如果正在运行的线程数量小于corePoolSize,那么马上创建核心线程运行这个任务;
- 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
- 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
- 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
-
当一个线程完成任务时,它会从队列中取下一个任务来执行。
-
当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
- 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
- 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
常见的 4 种线程池
点进源码可以去看一看:
Executors.newCachedThreadPool() core是0,所有都可回收
Executors.newFixedThreadPool() 固定大小,core=max;都不可回收
Executors.newScheduledThreadPool() 定时任务的线程池
Executors.newSingleThreadExecutor() 单线程的线程池,后台从队列里面获取任务,挨个执行
生产中如何使用线程池?
在工作中 单一的/固定数的/可变的 三种创建线程池的方法哪个用的多?
答案是一个都不用,我们工作中只能使用自定义的
Executors中JDK已经给你提供了,为什么不用?
一般不要把最大线程数写死:
final int availableProcessors = Runtime.getRuntime().availableProcessors();
int maximumPoolSize = availableProcessors + 1;
ExecutorService threadPool = new ThreadPoolExecutor(
2,
maximumPoolSize,
200L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
// new ThreadPoolExecutor.CallerRunsPolicy()
// new ThreadPoolExecutor.DiscardPolicy()
// new ThreadPoolExecutor.DiscardOldestPolicy()
);
CompletableFuture 异步编排—简介
业务场景
通过线程池性能稳定,也可以获取执行结果,并捕获异常。
但是,在业务复杂情况下,一 个异步调用可能会依赖于另一个异步调用的执行结果。
业务场景: 查询商品详情页的逻辑比较复杂(第4/5/6步需要获取到第1步的数据才能去查询),有些数据还需要远程调用,必然需要花费更多的时间。
假如商品详情页的每个查询,需要以上标注的时间之和才能完成,那么,用户需要 5.5s 后才能看到商品详情页的内容。很显然是不能接受的。
如果有多个线程同时完成这 6 步操作,也许只需要 1.5s(某个最大耗时) 即可完成响应。
简介
-
Future 是 Java 5 添加的类,用来描述一个异步计算的结果。你可以使用
isDone
方法检查计 算是否完成,或者使用get
阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel
方法停止任务的执行。 -
虽然
Future
以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不 方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的 初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为 什么不能用观察者设计模式当计算结果完成及时通知监听者呢? -
很多语言,比如 Node.js,采用回调的方式实现异步编程。Java 的一些框架,比如 Netty,自 己扩展了 Java 的
Future
接口,提供了addListener
等多个扩展方法;Google guava 也提供了 通用的扩展 Future;Scala 也提供了简单易用且功能强大的 Future/Promise 异步编程模式。
作为正统的 Java 类库,是不是应该做点什么,加强一下自身库的功能呢? -
在 Java 8 中, 新增加了一个包含 50 个方法左右的类: CompletableFuture,提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以 通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。
-
CompletableFuture 类实现了 Future 接口,所以你还是可以像以前一样通过
get
方法阻塞或 者轮询的方式获得结果,但是这种方式不推荐使用。 -
CompletableFuture 和 FutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果。
CompletableFuture—创建(run/supply)
-
runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的
-
可以传入自定义的线程池,否则就用默认的线程池;
CompletableFuture—计算完成(whenComplete)
-
whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。
-
whenComplete 和 whenCompleteAsync 的区别:
- whenComplete:是执行当前任务的线程(创建该CompletableFuture的线程)继续执行 whenComplete的任务。
- whenCompleteAsync:是把 whenCompleteAsync 这个任务继续提交给线程池来执行。
-
方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程 执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).whenComplete((res,excption)->{
//虽然能得到异常信息,但是没法修改返回数据。
System.out.println("异步任务成功完成了...结果是:"+res+";异常是:"+excption);
}).exceptionally(throwable -> {
//可以感知异常,同时返回默认值
return 10;
});
public static void main(String[] args) throws Exception {
System.out.println(Thread.currentThread().getName());
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\tfuture");
return 1024;
});
completableFuture
.whenComplete((t, u) -> {
System.out.println(Thread.currentThread().getName() + "\twhenComplete");
})
.exceptionally(f -> {
return 4444;
}).get();
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName());
new Thread(() -> {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\tfuture");
return 1024;
});
try {
future
.whenComplete((t, u) -> {
System.out.println(Thread.currentThread().getName() + "\twhenComplete");
})
.exceptionally(f -> {
return 4444;
}).get();
} catch (Exception e) {
e.printStackTrace();
}
}, "线程a").start();
//
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName());
new Thread(() -> {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\tfuture");
return 1024;
});
try {
future
.whenCompleteAsync((t, u) -> {
System.out.println(Thread.currentThread().getName() + "\twhenCompleteAsync");
})
.exceptionally(f -> {
return 4444;
}).get();
} catch (Exception e) {
e.printStackTrace();
}
}, "线程b").start();
}
CompletableFuture—处理(handle)
和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。
/**
* 方法执行完成后的处理【无论成功完成还是失败完成】
*/
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 4;
i = i / 0;
System.out.println("运行结果:" + i);
return i;
}, executor)
.handle(
(res, thr) -> {
if (res != null) {
return res * 2;
}
if (thr != null) {
return 0;
}
return 0;
}
);
// R apply(T t, U u);
System.out.println(future.get());
Thread.sleep(50000);
if(1==1) throw new RuntimeException();
CompletableFuture—线程串行化方法
-
带有 Async 默认是异步执行的。同之前。
-
thenRun
方法:【不接收,不返回】只要上面的任务执行完成,就开始执行thenRun。 -
thenAccept
方法:【只接收,无返回】能接收消费上一步处理的结果,无返回结果。 -
thenApply
方法:【既接收,又返回】当一个线程依赖另一个线程时,获取上一个任务返回的结果,并且当前任务也有返回值可以传递给下一个异步任务。
/**
* 线程串行化
* 1)、thenRun:不能获取到上一步的执行结果,无返回值
* .thenRunAsync(() -> {
* System.out.println("任务2启动了...");
* }, executor);
* 2)、thenAcceptAsync;能接受上一步结果,但是无返回值
* 3)、thenApplyAsync:;能接受上一步结果,有返回值
*/
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("运行结果:" + i);
return i;
}, executor).thenApplyAsync(res -> {
System.out.println("任务2启动了..." + res);
return "Hello " + res;
}, executor);
future.get() //阻塞方法
CompletableFuture—两任务组合—都要完成(Both/Combine)
-
两个任务必须都完成,触发该任务。
-
thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值
-
thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有 返回值。
-
runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后, 处理该任务。
/**
* 两个都完成
*/
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程:" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("任务1结束:");
return i;
}, executor);
CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2线程:" + Thread.currentThread().getId());
try {
Thread.sleep(3000);
System.out.println("任务2结束:");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello";
}, executor);
// future01.runAfterBothAsync(future02, () -> {
// System.out.println("任务3开始...runAfterBothAsync");
// }, executor);
//
// future01.thenAcceptBothAsync(future02, (f1, f2) -> {
// System.out.println("任务3开始...之前的结果thenAcceptBothAsync:" + f1 + "--》" + f2);
// }, executor);
CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {
return f1 + " :" + f2 + " -> Haha thenCombineAsync";
}, executor);
System.out.println(future.get());
CompletableFuture—两任务组合—一个完成(Either)
-
当两个任务中,任意一个 future 任务完成的时候,执行任务。
-
applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
-
acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
-
runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返 回值。
/**
* 两个任务,只要有一个完成,我们就执行任务3
* runAfterEitherAsync:不感知结果,自己没有返回值
* acceptEitherAsync:感知结果,自己没有返回值
* applyToEitherAsync:感知结果,自己有返回值
*/
// future01.runAfterEitherAsync(future02, () -> {
// System.out.println("任务3开始...没有之前的结果");
// }, executor);
// future01.acceptEitherAsync(future02, (res) -> {
// System.out.println("任务3开始...之前的结果:" + res);
// }, executor);
CompletableFuture<String> future = future01.applyToEitherAsync(future02, res -> {
System.out.println("任务3开始...之前的结果:" + res);
return res.toString() + "->哈哈";
}, executor);
System.out.println(future.get());
CompletableFuture—多任务组合
-
allOf:等待所有任务完成
-
anyOf:只要有一个任务完成
CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品的图片信息" + Thread.currentThread().getName());
return "hello.jpg";
}, executor);
CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("查询商品的属性");
return "黑色+256G";
}, executor);
CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
System.out.println("查询商品介绍");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "华为";
}, executor);
CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);
allOf.get();//等待所有结果完成
System.out.println("main....end....");
System.out.println(futureImg.get() + "=>" + futureAttr.get() + "=>" + futureDesc.get()); //直接拿来future的结果来用
// CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc);
// anyOf.get();//等待任意一个结果完成
// System.out.println("main....end....");
// System.out.println(anyOf.get());
参考
雷丰阳: Java项目《谷粒商城》Java架构师 | 微服务 | 大型电商项目.
Throwable: 硬核干货:4W字从源码上分析JUC线程池ThreadPoolExecutor的实现原理.
话唠扇贝: 线程池 ThreadPoolExecutor 详解.
本文完,感谢您的关注支持!