什么是响应式编程
正如我们所知道的过程式编程、命令式编程、面向对象编程、事件驱动编程范式,响应式编程也是编程范式的一种。
基础介绍
reactor作为一种响应式编程具体实现,能够跟踪异步场景下的数据流和数据状态变更。在Java9中已经集成了响应式编程的接口,类名为Flow。
现在的硬件已经非常厉害了,但是随着并发量的增加,仍然会达到瓶颈。现在主要有两个方式解决程序的系统:
(1)并行:单机维度增加线程,集群维度扩机器;
(2)高效的资源利用:寻找更有效的方式利用当前已有的资源;
现在我们传统的做法是写阻塞式的代码,如果出现性能瓶颈,可能会想办法提升并发度,但是这种方式可能会导致竞争和并发等问题。更糟糕的是,可能由于等待IO资源,导致线程一直处于空闲等待,导致资源浪费。
目前在jvm中听过了两种异步编程的模式:Callbacks、Futures。
Callbacks模式很难实现组合,从而导致读和理解都较困难,而Futures模式比Callbacks模式好一些,但是也存在多种问题:不支持懒计算、缺失多值及错误处理等机制。
多种异步写法的比较
Callbacks写法
// 定义回调接口
interface Callback {
void callback(String res);
}
// 异步回调模拟调用
public void asyncWork(Callback callback) {
new Thread(()->{
// 先做某些事情
String s = doSomething();
// 等事情做完之后,执行回调函数
callback.callback(s);
}).start();
}
// 模拟做的事情
private String doSomething() {
try{
Thread.sleep(5000);
}catch (Exception ignored){}
return "test";
}
Futures写法
// 异步执行
CompletableFuture.runAsync(() ->{
doSomething();
});
// 模拟做的事情
private String doSomething() {
try{
Thread.sleep(5000);
}catch (Exception ignored){}
return "test";
}
Reactor写法
@Test
public void test10() {
// 创建一个流式数据
Flux<String> flux = Flux.just("a", "b", "c");
// 在一个异步调度器中执行数据打印
flux.subscribeOn(Schedulers.parallel())
.subscribe(System.out::println);
}
优缺点
优点
- 非阻塞式;
- 更好的资源利用;
- 可组合性;
- 错误处理;
缺点
- 学习曲线可能复杂些;
- 调试麻烦些;
- 过度使用可能导致性能问题;
使用方式
引入依赖包
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2023.0.11</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
进行Flux或者Mono封装
@Test
public void test3() {
Mono<String> future1 = Mono.fromCallable(() -> {
Thread.sleep(5000);
return "Result 1";
}).subscribeOn(Schedulers.elastic());
Mono<String> future2 = Mono.fromCallable(() -> {
Thread.sleep(5000);
return "Result 2";
}).subscribeOn(Schedulers.elastic());
long start = System.currentTimeMillis();
Mono<List<String>> mono = Flux.merge(future1, future2)
.collectList()
.timeout(Duration.ofSeconds(9));
List<String> res = mono.block();
System.out.println(res);
System.out.println("cost:" + (System.currentTimeMillis() - start));
}
@Test
public void test9() {
Mono<String> mono = Mono.fromSupplier(()-> {
try {
System.out.println("name:" + Thread.currentThread().getName());
Thread.sleep(5000);
}catch (Exception e){
}
return "hello";
});
long start = System.currentTimeMillis();
mono.subscribeOn(Schedulers.parallel())
.block();
System.out.println("cost:" + (System.currentTimeMillis() - start));
}
Q&A
Q:publishOn和subscribeOn的区别
A:参考下面文档:
https://blog.csdn.net/qq_33797928/article/details/105005629
https://juejin.cn/post/7147655449695748126
https://juejin.cn/post/7251894360722292792
个人思考
1、计算机涉及多种资源,比如:CPU资源、IO资源(网络设备、存储设备)等,如果很容易出现资源瓶颈,会导致计算机资源没法充分利用。比如当CPU资源使用率为30%时,IO资源使用率已达到100%,这时候会导致CPU资源出现严重的浪费。如果能通过合理的系统调度,保证CPU、IO等资源都能进行合理的利用,可能会导致整体的性能有很大的提升。
参考文档
1、reactor问题社区
https://stackoverflow.com/questions/tagged/reactor
2、官方文档
https://projectreactor.io/docs/core/release/reference/#_blocking_can_be_wasteful