多线程编程
文章目录
- 多线程编程
- @[toc]
- 引言
- 创建多线程的方式
- 继承Thread类
- 实现Runnable接口
- 实现Callable接口
- Callable和Runnable的区别
- Lambda表达式
- 线程的实现原理
- Future&FutureTask
- 具体使用
- submit方法
- Future到FutureTask类
- Future
- 注意事项
- 局限性
- CompletionService
- 引言
- 使用
- 使用场景
- CompletableFuture
- 引言
- 继承结构
- 任务的异步回调
- 多个任务组合处理
- 注意点
- Future需要获取返回值,才能获取异常信息
- CompletableFuture的get()方法是阻塞的。
- 默认线程池的注意点
- 自定义线程池时,注意饱和策略
文章目录
- 多线程编程
- @[toc]
- 引言
- 创建多线程的方式
- 继承Thread类
- 实现Runnable接口
- 实现Callable接口
- Callable和Runnable的区别
- Lambda表达式
- 线程的实现原理
- Future&FutureTask
- 具体使用
- submit方法
- Future到FutureTask类
- Future
- 注意事项
- 局限性
- CompletionService
- 引言
- 使用
- 使用场景
- CompletableFuture
- 引言
- 继承结构
- 任务的异步回调
- 多个任务组合处理
- 注意点
- Future需要获取返回值,才能获取异常信息
- CompletableFuture的get()方法是阻塞的。
- 默认线程池的注意点
- 自定义线程池时,注意饱和策略
引言
为什么使用多线程?
- 最直接的就是提升程序性能,使用多线程可以充分利用硬件资源,同时执行多个任务,从而提高程序的整体性能。通过并行执行任务,可以将工作负载分布到多个线程上,从而更有效地利用 CPU 资源。
- 提高响应性:可以将长时间处理的请求放在后台另一个线程进行处理,不妨碍主线程的用户执行其他的请求
- 实现并发编程:现在的工作中,多线程是并发编程的一种重要方式。利用好多线程机制可以大大提高系统整体的并发能力以及性能。
创建多线程的方式
从实现上来说,Java提供了三种创建线程的方式,但从原理上来看,其实只有一种方式,我们先从实现上来简单介绍一下这三种方式
继承Thread类
直接创建一个ThreadTest的实例,调用它的start()方法就可以创建一个线程了
class ThreadTest extends Thread{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
实现Runnable接口
如果只是简单的实现了Runnable接口,它与线程并没有任何关系,只是相当于创建了一个线程执行的任务类而已,要想真正的创建线程,还是需要创建一个Thread对象,把RunnableTest实例作为构造方法的入参
1.2 实现Runnable接口
如果只是简单的实现了Runnable接口,它与线程并没有任何关系,只是相当于创建了一个线程执行的任务类而已,要想真正的创建线程,还是需要创建一个Thread对象,把RunnableTest实例作为构造方法的入参
class RunnableTest implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
public class CreateThreadTest {
public static void main(String[] args) {
RunnableTest runnableTest = new RunnableTest();
Thread thread = new Thread(runnableTest);
thread.start();
}
}
实现Callable接口
与Runnable很相似,它相当于也是也个任务的实现类,需要结合线程池的submit()方法才能使用,但与Runnable最本质的区别是,Callable的call()方法可以有返回值
class CallableTest implements Callable<Integer>{
@Override
public Integer call() throws Exception {
return ThreadLocalRandom.current().nextInt();
}
}
public class CreateThreadTest {
public static void main(String[] args) {
CallableTest callableTest = new CallableTest();
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<Integer> future = executorService.submit(callableTest);
}
}
Callable和Runnable的区别
Callable的call方法可以有返回值,可以声明抛出异常。和 Callable
配合的有一个Future
类,通过Future
可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是Runnable
做不到的,Callable
的功能要比Runnable
强大。
@FunctionalInterface
public interface Runnable {
// 没有返回值
public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {
// 有返回值
V call() throws Exception;
}
Lambda表达式
这种方式与第二种方式其实是一样的,只是写法比较简洁明了
Thread thread = new Thread(() -> System.out.println(Thread.currentThread().getName()
线程的实现原理
这三种创建线程的方式,第一种是直接通过继承Thread
进行实现,第二种是通过实现Runnable
接口,然后将类作为创建Thread
类的入参,其实也是通过实现创建Thread
类进行实现。
现在看第三种Callable
的方式到底是怎么实现的,他是通过实现Callable
接口,然后使用submit
进行执行,这里我们通过debug
这个方法,最后发现其实也是通过创建的Thread
进行实现。
**总结:**所以最后我们发现三种方式其实都是创建Thread
类进行实现
Future&FutureTask
我们一共有三种创建线程的方式,继承Thread
和实现Runnable
接口都是没有返回值的,所以我们不知道线程的执行状态,不能获取执行完成的一个结果。所以这时就需要Callable
来解决上面的问题,通过Callable
和Future
能够获得执行的结果。
具体使用
public class CompletableFutureTest {
private static ThreadPoolExecutor executor;
static {
executor = new ThreadPoolExecutor(10, 10, 100, TimeUnit.HOURS, new ArrayBlockingQueue<>(100), new ThreadFactory() {
private int count = 0;
@Override
public Thread newThread(Runnable r) {
count++;
System.out.printf("CustomerThread- %d :", count);
return new Thread(r, "CustomerThread-" + count);
}
});
}
static class CallableTest implements Callable<String>{
@Override
public String call() throws Exception {
Thread.sleep(1000);
System.out.println("线程开始运行");
return "返回值";
}
}
public static void main(String[] args) throws Exception{
CallableTest test = new CallableTest();
FutureTask<String> futureTask = new FutureTask<>(test);
executor.submit(futureTask);
System.out.println(futureTask.get());
executor.shutdown();
}
}
submit方法
在该方法中,我们传入的是FutureTask
类型的,结果把参数转成了RunnableFuture
,任务执行依然是execute()方法
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
这里的Runnable
其实是FutureTask的父类
当我们使用Callable
类的子类作为参数时候,其实也是转换为RunnableFuture
,然后使用excute
进行执行。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
Future到FutureTask类
Future
其实就是定义了一组接口,Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
FutureTask实现了这个接口,同时还实现了Runnalbe接口,这样FutureTask就相当于是消费者和生产者的桥梁了,消费者可以通过FutureTask存储任务的执行结果,跟新任务的状态:未开始、处理中、已完成、已取消等等。而任务的生产者可以拿到FutureTask被转型为Future接口,可以阻塞式的获取处理结果,非阻塞式获取任务处理状态
**总结:**FutureTask既可以被当做Runnable放入Excutor
来执行,也可以被当做Future来获取Callable的返回结果。
Future
注意事项
- 当 for 循环批量获取Future的结果时容易 block,get 方法调用时应使用 timeout 限制
- 因为我们可能会有耗时的任务,后面的任务只能等前面耗时的任务完成以后才能获取结果,所以有时会卡住
- Future 的生命周期不能后退。一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来
局限性
从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:
- 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
- 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
- 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
- 没有异常处理:Future接口中没有关于异常处理的方法;
而这些局限性CompletionService和CompletableFuture都解决了。
CompletionService
引言
CompletionService
是一个为了解决我们并发执行多个线程的任务的时候,能够及时获取已经完成任务的结果而创建的一个抽象的接口类,我们一般是使用的他的实现类ExecutorCompletionService
使用
具体的使用规则还有方法的作用博客链接
使用场景
- 当需要批量提交异步任务的时候建议使用CompletionService。CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。
- CompletionService能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如Forking Cluster这样的需求。
- 线程池隔离。CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
CompletableFuture
引言
我们使用CompletionService
能够解决多个线程并发执行时,获取执行结果的返回值时的阻塞问题。但是假如我们并发执行的多线程任务需要遵循一定的规则,或者执行的顺序时候,CompletionService
就不能满足我们的需求了。
所以CompletableFuture
其实是对Future
进行扩展,弥补了Future
的局限性,同时CompletableFuture
实现了对任务编排的能力。
在以往,虽然通过**
CountDownLatch
**等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。
更加详细介绍博客
继承结构
CompletionStage
接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。异步执行的,默认线程池是ForkJoinPool.commonPool()
,但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池。
任务的异步回调
多个任务组合处理
注意点
Future需要获取返回值,才能获取异常信息
ExecutorService executorService = new ThreadPoolExecutor(5, 10, 5L,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
int a = 0;
int b = 666;
int c = b / a;
return true;
},executorService).thenAccept(System.out::println);
//如果不加 get()方法这一行,看不到异常信息
//future.get();
Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。小伙伴们使用的时候,注意一下哈,考虑是否加try…catch…或者使用exceptionally方法。
CompletableFuture的get()方法是阻塞的。
CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间~
csharp复制代码//反例
CompletableFuture.get();
//正例
CompletableFuture.get(5, TimeUnit.SECONDS);
默认线程池的注意点
CompletableFuture代码中又使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。
自定义线程池时,注意饱和策略
CompletableFuture的get()方法是阻塞的,我们一般建议使用future.get(3, TimeUnit.SECONDS)
。并且一般建议使用自定义线程池。
但是如果线程池拒绝策略是DiscardPolicy
或者DiscardOldestPolicy
,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离哈。