Reactive Steams
一、Reactive Steams介绍
在聊Reactive Steams之前,先了解一下Reactive Programming(反应式/响应式编程)。为了解决异步编程中出现的各种问题,程序员们提出了各种的思路去解决这些问题,这些解决问题的方式,方法,手段就可以叫做Reactive Programming。
Reactive Programming是一种编程思想,类似面向对象,函数式编程。
本质上是对数据流或某种变化做出的反应,这个变化什么时候触发是未知的,所以他是一种基于异步、回调的方式在处理问题。
当越来越多的程序员,开始使用这种编程思想时,需要一些大佬来统一一个思想规范。是国外的几个大佬公司启动的了Reactive Steams项目。Netflix、Pivotal、Lightbend联合来为异步流处理提供标准,规范,他提供了对运行时环境以及网络的工作。
Reactive Steams翻译过来就是响应式/反应式流。其实是一种基于异步流处理的标准化规范,目的是在使用流处理更加可靠,高效和响应式。
二、Java层面的Reactive Steams
基于这个规范的实现很多,比如三方库中比较出名的RxJava,Reactor等等。
但是JDK8版本中,Java已经有了CompletableFuture的支撑,我们可以将大量的异步任务做好编排。但是在JDK8版本中的CompletableFuture依然有很多特性无法支撑。所以在JDK9,CompletableFuture做了很多的更新,比如支持延迟,超时,子类化之类功能。
这时,咱们会发现,其实CompletableFuture已经可以去支撑做一些异步编程的操作了。但是为什么很多大公司依然还是使用RxJava,Reactor这种三方依赖库呢?
问题在于,大多数的时候,咱们的采用异步编程处理的任务并不是非常复杂的。这个时候,咱们确实不需要去使用Reactive Steams反应流的框架。如果系统越来越复杂,或者你处理的业务本身就是及其复杂的那种,你就要去写一个让人头皮发麻的代码了。随着时间的推移,这种代码会变成非常难以维护。
其次CompletableFuture并不是真正的基于Reactive Steams去实现。CompletableFuture描述的是单次执行的结果。尽管可以通过各种方法将异步任务之间构建成一串任务组成的流程图,本质上依然是单词结果。
反应式流,面向的是Stream。 咱们Java中的Stream API更类似Reactive Steams的思想。Stream API是同步阻塞的。
最经典的就是CompletableFuture无法处理Reactive Steams中的一个核心概念,back pressure(背压,反压,回压),比如在上下游承载能力不同时,比如下游玩不转了,需要告知上游采取一些策略去解决。CompletableFuture明显无法处理这种。
其次还有Java中提供的回调,Future机制在实现响应式编程中,问题和缺点都比较难处理。有个比较出名的概念叫做callback hell(回调地狱)。简单来说就是回调里面套回调,虽然将子过程做到解耦,但是随着业务的负责,回调代码的可读性、复杂性就大大的增加,这个就是快乐的回调地狱。
所以,咱们需要一套框架或者说类库来实现真正响应式流,大概需要几个特性:
- 支持将异步任务做封装以及组装,需要API对异步任务进行包装,并且需要很多子任务来对异步操作进行链式组装,过程中包括过滤,异常处理,超时等等操作。
- 减少异步任务的嵌套,减少代码的复杂性,增加可读性,避免callback hell这种及其复杂恶心的代码。
- 支持背压back pressure,也就需要有上游和下游的概念,可以做到协商处理数据流的速度。
三、Java层面Reactive Steams的API
首先Reactive Steams响应流实现方式其实是基于观察者模式的扩展,同时也能看到发布订阅模式,责任链模式等等。
整个Reactive Steams流程大致如下。
直接在JDK9版本之上查看Doug Lee提供的Flow类。(我的是JDK11)。
在Flow类中,提供了核心的四个接口。
Publisher,Subscriber,Subscription,Processor
Publisher:Publisher是函数式接口,负责发布数据的。 Publisher内部有一个方法subscribe方法,去和具体的订阅者绑定关系。
Subscriber:Subscriber是订阅者,负责订阅,消费数据。
四个方法:
- onSubscribe:定于成功后触发,并且表明可以开始接收发布者的数据元素了。
- onNext:每次获取到发布者的数据元素都会执行onNext。
- onError:接收数据元素时,出现异常等问题时,走onError。
- onComplete:当指定接收的元素个数搞定后,触发onComplete。
Subscription:发布者和订阅者是基于Subscription关联的。当建立了订阅的关系后,发布者会将Subscription传递给订阅者。订阅者指定获取元素的数量和取消订阅操作,都要基于Subscription去操作。
提供了两个方法:
- request:订阅者要获取的元素个数。
- cancel:取消订阅,当前的订阅者不接收当前发布者的元素。
Processor:Processor继承了Publisher和Subscriber,即是发布者也是订阅者。Processor一般作为数据的中转,订阅者处理完数据元素,可以再次发给下一个订阅者。
这四个接口很重要,是Reactive Streams的规范,但是可以明显的看到,内部没有具体的内容实现。
这里就类似JDBC这种规范,规范在JDK9中提出来了,想实现,可以基于当前的这个四个接口再做具体的逻辑处理以及实现的细节。
四、Java层面Reactive Steams基本操作
咱们测试Java中的Flow里提供的API时,就是走最基本的操作。
其中Processor不需要重写,玩最基本的操作,不去做订阅者和发布者的转换。
其次Subscription也不需要重写,这东西就是提供了订阅者指定订阅的消息个数,以及取消的操作。
然后Publisher需要重写,但是JDK中已经提供了一个Publisher的实现,SubmissionPublisher,可以直接使用。
最后,Subscriber需要咱们自己重写,指定好订阅消息的个数,已经消费的一些逻辑
public class MySubscriber implements Flow.Subscriber<Integer> {
private Flow.Subscription subscription;
@Override
// 绑定好订阅关系后,就会触发这个方法
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(10);
}
@Override
public void onNext(Integer item) {
System.out.println(Thread.currentThread().getName() + ":接收到数据流:" + item);
}
@Override
public void onError(Throwable throwable) {
System.out.println(Thread.currentThread().getName() + ":接收消息出现异常:" + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName() + ":当前订阅者要求接收的消息全部处理完毕。");
}
}
直接使用SubmissionPublisher测试整体效果
public static void main(String[] args) {
// 只有一个工作线程的线程池
ExecutorService executor = Executors.newFixedThreadPool(1);
// 指定缓冲区的大小
int maxBufferCapacity = 5;
// 需要指定两个参数
// 第一个参数需要传递一个线程池,指定订阅者使用的线程
// 第二个参数,需要指定一个缓冲区,发布者发布消息后,消息会扔到缓冲区里。
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor,maxBufferCapacity);
// 绑定订阅者
MySubscriber subscriber = new MySubscriber();
publisher.subscribe(subscriber);
// 发布消息
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + ":发布消息:" + i);
publisher.submit(i);
}
// 释放资源
publisher.close();
executor.shutdown();
}
最终效果:
缓冲区: 缓冲区就是发布者和订阅者之间的一块内存,类似线程池中的阻塞队列,可以将消息扔到这个缓存区里。其次咱们设置的缓冲区大小是5,但是发现get出来的时候,5被替换为了8。这边也是SubmissionPublisher为了更有效的使用内存,默认会基于roundCapacity方法将传递的缓冲区大小替换为2的n次幂。
缓冲区大小为4,触发可以发送5条,1条被订阅者拿走执行onNext,剩下的4个会存储在缓冲区内部
背压效果: 当订阅者指定的消息已经全部处理完毕后,发布者最多只能发布缓冲区大小个数的消息,剩下的内容会基于背压的效果直接暂时不发送。
订阅者触发onComplete: 需要你发布者做了close操作,确认了发布者已经将消息全部发送,并且订阅者也已经将全部的消息处理完毕后,才会触发onComplete
订阅者的subscription的使用: 订阅者可以在onNext或者其他方法中动态的使用subscription去指定后续需要几个消息订阅,已经是否需要取消订阅消息等操作。
五、Reactive Steams落地体验
5.1、回调地狱问题
前面的方式大致了解啦JDK9中更新的Reactive Streams的规范,咱们实现也仅仅是看到了发布订阅和回压的效果。并没有看到如何解决回调地狱的问题。咱们可以通过Spring5官网提供的一个例子,来体验一下CallBack Hell回调地狱带来的问题。后面再根据三方的实现来看一下基于Reactive Streams实现后效果如何。这里基本是根据伪代码走的。
例子:在用户的UI页面上,展示当前用户最喜欢的Top5的商品详情。这里会根据用户的ID去查询当前用户Top5商品的ID,如果ID可以查询到之后再根据商品的ID去查询商品的详情。如果当前用户ID查询的结果不存在喜欢的Top商品,没有的话,通过推荐服务查询Top5的商品信息。展示给用户。
当前例子需要三个服务的支撑:
- 根据用户ID查询用户的Top5商品ID
- 根据Top5商品ID查询商品详情
- 调用推荐服务,获取5个商品详情
基于Java最原生的异步编程方式,实现上述操作,查看到底什么是回调地狱。。。
准备了商品详情的实体类。
@Data
public class Fav {
private String itemId;
private String itemName;
private String itemDetail;
}
准备了回调方法,拿到结果后触发
public interface Callback<T> {
void onSuccess(T t);
void onError(Throwable throwable);
}
准备了访问三个服务的Service接口
public interface UserService {
/**
* 根据用户Id查询用户的Top5商品Id
* @param userId
* @param list
*/
void getFav(String userId, Callback<List<String>> list);
}
public interface ItemService {
/**
* 根据商品Id查询商品的详情
* @param itemId
* @param callback
*/
void getDetail(String itemId,Callback<Fav> callback);
}
public interface SuggestionService {
/**
* 调用推荐服务,获取推荐商品
* @param favs
*/
void getSuggestion(Callback<List<Fav>> favs);
}
准备了响应数据的UI线程工具以及响应方法
public class UiUtils {
public static void submitOnUiThread(Runnable runnable){
// 线程池中的线程做响应的操作………………
}
public static void show(Object obj){
// 利用UI线程展示具体数据
}
public static void error(Object obj){
// 出现错误响应的内容
}
}
完成了Controller中的异步编程效果
@RestController
public class CallBackHellController {
@Autowired
private UserService userService;
@Autowired
private ItemService itemService;
@Autowired
private SuggestionService suggestionService;
@GetMapping("/callbackhell")
public void callbackHell(String userId){
//1、调用用户服务,查询Top5商品Id
userService.getFav(userId, new Callback<List<String>>() {
@Override
public void onSuccess(List<String> list) {
// 已经查询到商品Id,但是不知道是否有值
if (list.isEmpty()){
// 3、用户没有Top5商品Id,通过推荐服务查询推荐商品详情
suggestionService.getSuggestion(new Callback<List<Fav>>(){
@Override
public void onSuccess(List<Fav> favs) {
// 推荐服务查询到了商品详情,响应即可
UiUtils.submitOnUiThread(() -> {
favs.stream().limit(5).forEach(UiUtils::show);
});
}
@Override
public void onError(Throwable throwable) {
UiUtils.error(throwable);
}
});
}
else{
// 2、通过用户查询到了Top5商品Id,通过商品Id查询商品详情
list.stream().limit(5).forEach(itemId -> itemService.getDetail(itemId,new Callback<Fav>(){
@Override
public void onSuccess(Fav fav) {
// 查询到了商品详情,利用UI线程,给客户端响应数据
UiUtils.submitOnUiThread(() -> UiUtils.show(fav));
}
@Override
public void onError(Throwable throwable) {
// 出现异常了。
UiUtils.error(throwable);
}
}));
}
}
@Override
public void onError(Throwable throwable) {
// 出现异常了。
UiUtils.error(throwable);
}
});
}
}
5.2、解决回调地狱问题
这里为了解决回调地狱问题,需要一个Reactor的依赖来帮助咱们实现异步编程。
需要导入依赖
<!-- reactor的核心依赖-->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.17.RELEASE</version>
</dependency>
不能再使用之前的Callback方式了。需要使用reactor提供的Flux,并且这种链式操作会更直观,也更好维护。就需要修改三个服务对应的Service。
public interface UserService {
/**
* 根据用户ID查询Top5商品ID
* @param userId
* @return
*/
Flux<List<String>> getFav(String userId);
}
public interface ItemService {
/**
* 根据商品ID查询商品详情
* @param itemId
* @return
*/
Flux<Fav> getDetail(String itemId);
}
public interface SuggestionService {
/**
* 获取推荐的商品详情
* @return
*/
Flux<List<Fav>> getSuggestion();
}
就可以利用Flux提供的API来解决之前回调地狱的问题。
@RestController
public class ReactorCallbackController {
@Autowired
private UserService userService;
@Autowired
private ItemService itemService;
@Autowired
private SuggestionService suggestionService;
@GetMapping("reactorcallback")
public void reactorCallback(String userId){
userService
.getFav(userId) // 根据用户Id查询Top5商品Id
.flatMap(itemService::getDetail) // 根据商品ID查询商品详情
.switchIfEmpty(suggestionService.getSuggestion()) // 如果前面为null,这里通过推荐服务查询商品详情
.take(5) // 获取前5个数据
.publishOn(UiUtils.reactorOnUiThread()) // 使用Ui线程
.subscribe(UiUtils::show,UiUtils::error); // 成功走show,失败走error
}
}
5.3、CompletableFuture的异步编程
Future的形式相比Callbacl效果要好一些,虽然JDK8和9都对CompletableFuture做了各种优化,但是他的表现还是不太好。多个Future在嵌套时,可读性还是比较差的。并且CompletableFuture不存在什么回压,或者是延迟调用的功能。
现在借助CompletableFuture来实现一个场景。
1、获取一个用户ID的列表。
2、通过用户ID分别获取他的名字以及统计信息。(希望这两个操作是并行执行的)
3、当两个信息都获取到之后,封装成一个普通字符串即可。
4、响应数据,最后拿到结果(输出一下)
实现的代码
public class GetNameAndStatTestByCF {
public static void main(String[] args) {
// 1、获取一组用户ID列表
CompletableFuture<List<String>> idList = getID();
CompletableFuture<List<String>> dataCompletableFuture = idList.thenComposeAsync(ids -> {
Stream<CompletableFuture<String>> resultStream = ids.stream().map(id -> {
// 2、并行基于ID查询名称信息
CompletableFuture<String> nameTask = getName(id);
// 2、并行基于ID查询统计信息
CompletableFuture<Integer> statTask = getStat(id);
// 让两个查询名称信息和查询统计信息操作并行执行
return nameTask.thenCombineAsync(statTask,(name,stat) -> {
// 3、拿到信息组装
return "Name:" + name + ",Stat:" + stat;
});
});
// 将resultStream转换成一个数组
List<CompletableFuture<String>> resultList = resultStream.collect(Collectors.toList());
// 全部的任务封装起来
CompletableFuture<Void> allDone = CompletableFuture.allOf(resultList.toArray(new CompletableFuture[]{}));
// 执行全部任务
return allDone.thenApplyAsync(v -> resultList.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
});
// 4、获取全部的组件信息后响应客户端(输出)
List<String> data = dataCompletableFuture.join();
System.out.println(data);
}
// 模拟zz服务获取统计信息
private static CompletableFuture<Integer> getStat(String id) {
return CompletableFuture.supplyAsync(() -> {
return 666;
});
}
// 模拟yy服务获取名称信息
private static CompletableFuture<String> getName(String id) {
return CompletableFuture.supplyAsync(() -> {
return "张三";
});
}
// 模拟xx服务,获取一组用户ID
private static CompletableFuture<List<String>> getID() {
return CompletableFuture.supplyAsync(() -> {
// 模拟查询三方服务
List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
return list;
});
}
}
5.4、解决CompletableFuture的问题
CompletableFuture可以实现一些简单的异步编程,但是可看性和维护性以后后期的扩展都需要对整体代码做比较大成本的维护。依然采用Reactor来实现一个一模一样的逻辑,再看代码效果。
public class GetNameAndStatByReactor {
public static void main(String[] args) {
// 1、获取一组用户ID列表
Flux<String> idFlux = getId();
Flux<String> result = idFlux.flatMap(id -> {
// 2、并行基于ID查询名称信息
Flux<String> nameFlux = getName(id);
// 2、并行基于ID查询统计信息
Flux<Integer> statFlux = getStat(id);
// 俩任务并行处理完毕,触发3
return nameFlux.zipWith(statFlux, (name, stat) -> {
// 3、拿到信息组装
return "Name:" + name + ",Stat:" + stat;
});
});
Mono<List<String>> listMono = result.collectList();
List<String> info = listMono.block();
// 4、获取全部的组件信息后响应客户端(输出)
System.out.println(info);
}
private static Flux<Integer> getStat(String id) {
// 会查询三方服务,然后封装结果
return Flux.just(888);
}
private static Flux<String> getName(String id) {
// 会查询三方服务,然后封装结果
return Flux.just("张三");
}
private static Flux<String> getId() {
// 会查询三方服务,然后封装结果
return Flux.just("1","2","3");
}
}
六、RxJava2实现异步编程
RxJava是一个小框架,或者是依赖库。在RxJava的1.x版本中,他并不基于ReactiveStreams去实现的。没有关系,因为RxJava的2版本,就是基于ReactiveStreams实现的了。
使用RxJava巨简单,因为作者想将RxJava尽量做到轻量级,就一个依赖。
<!-- RxJava2的依赖-->
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.21</version>
</dependency>
6.1 RxJava2的入门操作
获取一个Person对象的集合,将Person集合中的所有年龄大于10岁的Person对象删选出来,并输出他的名字。
采用RxJava来实现一波。
public class Demo1 {
public static void main(String[] args) {
//1、获取Person对象集合
List<Person> personList = getPersonList();
//2、完成上面要求的操作
//2.1、将person集合转换为RxJava的流
Flowable.fromArray(personList.toArray(new Person[]{}))
//2.2 过滤年龄大于10岁的
.filter(person -> person.getAge() > 10)
//2.3 获取筛选后的Person名称
.map(person -> person.getName())
//2.4 输出Name
.subscribe(System.out::println);
}
private static List<Person> getPersonList() {
List<Person> personList = new ArrayList<>();
personList.add(new Person("大娃",5));
personList.add(new Person("二娃",7));
personList.add(new Person("三娃",9));
personList.add(new Person("四娃",11));
personList.add(new Person("五娃",13));
return personList;
}
}
6.2 RxJava2的基础处理流程
在RxJava中有三个核心的角色
- 被观察者(Observable)
- 观察者(Observer)
- 订阅(Subscribe)
/**
* 采用RxJava2 ,来个最基本的操作
* @author zjw
* @description
*/
public class Demo2 {
public static void main(String[] args) {
//1、构建Observable
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello");
emitter.onNext("World");
emitter.onComplete();
}
});
//2、构建Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("开始订阅");
}
@Override
public void onNext(String s) {
System.out.println("观察者:" + s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("订阅结束");
}
};
//3、订阅
observable.subscribe(observer);
}
}
6.3 创建操作符
创建操作符的API还是比较多的,大致都介绍一下。
6.3.1 create
创建被观察者对象,传入ObservableOnSubscribe实现,指定数据流
具体实现在6.2中有。
6.3.2 just
可以在创建被观察者的同时,发送数据流内容
public static void main(String[] args) {
//1、构建被观察者,同时订阅观察者
Observable.just("大娃", "二娃", "三娃", "四娃")
.subscribe(new Observer<String>(){
@Override
public void onSubscribe(Disposable d) {
System.out.println("开始订阅");
}
@Override
public void onNext(String s) {
System.out.println("观察者:" + s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("订阅结束");
}
});
}
另外,也将Observer的构建简化一下,可以直接使用consumer只编写onNext的逻辑
/**
* 前面的Demo3中指定observer太麻烦。
* @author zjw
* @description
*/
public class Demo4 {
public static void main(String[] args) {
//1、构建被观察者,同时订阅观察者
Observable.just("大娃", "二娃", "三娃", "四娃")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("观察者:" + s);
}
});
}
}
6.3.3 fromArray
前面聊的just,最多可以传递10个事件/流。 而fromArray入参直接是一个可变参。
public class Demo5 {
public static void main(String[] args) {
Observable.fromArray("大娃","二娃","三娃")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("观察者:" + s);
}
});
}
}
6.3.4 fromCallable
可以基于fromCallable传递一个函数,基于函数计算出来的返回结果,就是咱们被观察者要传递的事件/流
public class Demo6 {
public static void main(String[] args) {
Observable.fromCallable(() -> {
return "Done!";
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("观察者:" + s);
}
});
}
}
6.3.5 timer
timer方法的做到指定之间之后,发送一个 0L 的值,给观察者。这个0L不能动,他的目的就是触发观察者去执行一下他的函数,做操作。
F
public class Demo7 {
public static void main(String[] args) throws IOException {
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// 在调用timer时,默认会传递一个Schedulers.computation()的线程池,这个线程池中的线程,是守护线程
// 所以在最开始,没有查看到观察者的输出。
System.out.println(Thread.currentThread().getName());
System.out.println("观察者:" + aLong);
}
});
// 让main函数的用户线程不要退出!
System.in.read();
}
}
6.3.6 interval
每隔一段时间,就会发送一个事件/流,发送的时间是从0开始递增,1,2,3,4。
public class Demo8 {
public static void main(String[] args) throws IOException {
Observable.interval(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(Thread.currentThread().getName() + ":" + aLong);
}
});
System.in.read();
}
}
6.3.7 intervalRange
和interval类似,但是Range的意思是指定发送的事件/流,从什么数值开始,以及具体的数量
public class Demo9 {
public static void main(String[] args) throws IOException {
Observable.intervalRange(100,4,0,2, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(Thread.currentThread().getName() + ":" + aLong);
}
});
System.in.read();
}
}
6.3.8 range&rangeLong
同一时间,发送事件的序列,指定开始和结束,包左不包右 [)
rangeLong就是传递的序列是Long类型的,而普通的range传递的是Integer
public class Demo10 {
public static void main(String[] args) throws IOException {
Observable.range(0,10)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(Thread.currentThread().getName() + ":" + integer);
}
});
System.in.read();
}
}
6.3.9 never、error、empty
- never:不发送任何事件
- error:触发onError事件
- empty:触发onComplete事件
public class Demo11 {
public static void main(String[] args) {
// Observable.never()
// Observable.error(new RuntimeException("error事件"))
Observable.empty()
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("开始订阅");
}
@Override
public void onNext(Object s) {
System.out.println("观察者:" + s);
}
@Override
public void onError(Throwable e) {
System.out.println("出现异常:" + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("订阅结束");
}
});
}
}