1、响应式编程概述
1.1、响应式编程介绍
1.1.1、为什么需要响应式
传统的命令式编程在面对当前的需求时的一些限制。在应用负载较高时,要求应用需要有更高的可用性,并提供低的延迟时间。
1、Thread per Request 模型
比如使用Servlet开发的单体应用,部署到tomcat。tomcat有线程池,每个请求交给线程池中的一个线程来执行,如果执行过程中包括访问数据库,或者包括读取文件,则在调用数据库时或读取文件时,请求线程是阻塞的,即使是阻塞线程也是占用资源的,典型的每个线程要使用1MB的内存。
如果有并发请求,则会同时有多个线程处于阻塞状态,每个线程占据一份资源。同时,Tomcat的线程池大小决定了可以同时处理多少个请求。
Servlet容器有专门的线程池用于管理HTTP请求,每个请求对应一个线程,该线程负责该请求的整个生命周期(Thread per Request模型)。意味着应用仅能处理并发数为线程池大小的请求。可以配置更大的线程池,但是线程占用内存(一般一个线程1MB的样子),线程数越多,占用的内存越大。
如果应用基于微服务架构,我们可以做横向扩展,但是也有内存高占用的问题。因此,当并发数很大的时候,Thread per Request模型很消耗资源。
微服务架构一个特性是分布式,运行很多分立的进程(很多服务器)。传统的命令式编程使用同步的请求/响应模式在服务之间通信,线程需要频繁的在服务调用的时候阻塞。浪费了资源。
2、等待I/O操作
在I/O操作中也存在大量的资源浪费:如调用数据库,读取文件等。
此时,发出I/O请求的线程会阻塞等待I/O操作的完成,即阻塞式I/O。这些线程的阻塞仅仅是为了等待一个响应,浪费了线程,浪费了内存。
3、响应延迟
传统命令式编程另一个问题是:当一个服务需要做很多操作而不仅仅是I/O请求的时候,响应延迟相应的增大。如服务A需要调用服务B和C,比如查询数据库,聚合结果并返回。意味着服务A的响应时间包括:
- 服务B的响应时间(网络延迟时间+处理时间)
- 服务C的响应时间(网络延迟时间+处理时间)
- 数据库请求响应时间(网络延迟时间+处理时间)
如果服务调用没有前后依赖关系,则可以并行调用服务。如果使用java的CompletableFuture异步调用并注册回调,开发会复杂很多,而且阅读和维护也会复杂很多。
4、压垮客户端
微服务的另一个问题是:服务A请求服务B的数据,如果数据量很大,超过了服务A能处理的程度,则导致服务OOM。
5、响应式编程的优势
- 不用Thread per Request模型,使用少量线程即可处理大量的请求。
- 在执行I/O操作时不让线程等待。
- 简化并行调用。
- 支持背压,让客户端告诉服务端它可以处理多少负载。
1.1.2、消息驱动通信
1、定义
响应式编程是使用异步、事件驱动构建非阻塞式应用的,此类应用仅需要少量的线程用于横向扩展。该定义的关键一点是:借助背压技术,防止生产者压垮消费者。
如服务A需要从服务B获取数据。对于响应式编程,服务A向服务B发起请求,并立即返回(非阻塞、异步)。
之后,请求的数据以数据流的方式返回给服务A,服务B对每个数据项发布onNext事件。当所有的数据都发布了onNext事件,就发布onComplete事件结束。如果发生异常,就服务B就发布onError事件,之后不再发布onNext事件。
2、响应式系统
- 响应性(以时序的方式响应)
- 健壮(即使发生错误也可以保证响应性)
- 弹性(在不同的工作负载下保持响应性)
- 消息驱动(依赖异步消息传递机制)
响应式编程可以确保单个服务的异步非阻塞,整个系统的响应式需要整体考虑。
1.1.3、响应性应用案例
1、相关术语
流是一个时序事件序列,可以发射三种不同的事件:(某种类型的)值、错误或者一个完成信号。监听一个流称为订阅,定义的函数是观察者,流是被观察者,即观察者模式。每个流都会有多个方法,如 map, filter, scan, 等等。
当调用其中一个方法时,它会基于原来的流返回一个新的流。它不会对原来的点击流作任何修改。这个特性称为不可变性,也可以对方法进行链式调用。
clickStream: ---c----c--c----c------c-->
vvvvv map(c becomes 1) vvvv
---1----1--1----1------1-->
vvvvvvvvv scan(+) vvvvvvvvv
counterStream: ---1----2--3----4------5-->
- map(f) 会根据你提供的 f 函数把原 Stream 中的 Value 分别映射到新的 Stream 中。
- scan(g) 会根据你提供的 g 函数把 Stream 中的所有 Value 聚合成一个 Value x = g(accumulated,current)
每 Click 一次, counterStream 就会把点击的总次数发给它的观察者。
2、请求和响应
在 RX 中,创建只有一个值的流非常简单。官方把一个 流称作“Observable” :
var requestStream = Rx.Observable.just('https://api.github.com/users');
创建了包含一个字符串的流,并没有其他操作,需要以某种方式使那个值被映射。就是通过subscribing 这个流。
requestStream.subscribe(function(requestUrl) {
// 处理请求
jQuery.getJSON(requestUrl,function(responseData) {
// ...
})
});
Rx 可以用来处理异步数据流。 请求的响应也可以当作一个包含了将会到达的数据的流。
requestStream.subscribe(function(requestUrl) {
// 处理请求
var responseStream = Rx.Observable.create(function(observer) {
jQuery.getJSON(requestUrl).done(function(response) {
observer.onNext(response);
}).fail(function(jqXHR, status, error) {
observer.onError(error);
}).always(function() {
observer.onCompleted();
});
});
responseStream.subscribe(function(response) {
// 对响应进行一些其他操作
});
});
Rx.Observable.create() 创建了响应式流。该流通过显式的通知每一个 Observer (或者说是 “Subscriber”) 数据事件( onNext() )或者错误事件 ( onError() )。
在 subscribe() 内又调用了另外一个 subscribe() ,这类似于 Callback hell。responseStream 是建立在 requestStream 之上的。
var responseMetastream = requestStream.map(function(requestUrl) {
// 从请求的URL获取结果,封装为响应流
return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});
函数 map(f) ,分别把 f() 应用到 流 A 中的每一个值,并把返回的值放进流 B 中。
map表示转换,将一个类型的数据转换为另一个类型的。上述map转换是将流元素转换为流元素,流元素又可以订阅。但是我们只需要它输出一个流,流中的元素是结果数据即可。
因此需要使用flatMap,flatMap会打破元素之间的结构,将数据拉平:
var responseStream = requestStream.flatMap(function(requestUrl) {
// 将从指定URL获取的结果转换为响应式流
return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});
此时:
requestStream: --a-----b--c------------|->
responseStream: -----A--------B-----C---|->
(小写是请求,大写是响应)
渲染数据:
responseStream.subscribe(function(response) {
// 渲染数据的代码
});
目前为止所有的代码:
// 从指定URL创建请求流
var requestStream = Rx.Observable.just('https://api.github.com/users');
// 对请求流中的元素执行创建响应流
var responseStream = requestStream.flatMap(function(requestUrl) {
return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});
// 响应流的订阅方法,在该方法中提供函数,对响应的结果进行渲染
responseStream.subscribe(function(response) {
// 渲染结果数据
});
3、应用案例
页面有个刷新按钮,每点击一次刷新按钮,请求 流就会映射一个新的 URL,同时我们也能得到一个新的响应。
// 获取刷新按钮对象
var refreshButton = document.querySelector('.refresh');
// 对刷新按钮的点击事件封装为刷新点击事件流
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');
把刷新点击事件流改为新的请求 流,其中每一个 点击事件 都分别映射为带有随机偏移量的 API 端点。
// 刷新点击事件
var requestStream = refreshClickStream.map(function() {
// 将刷新点击事件流中的每个元素按照如下方式进行转换
// 生成随机偏移量
var randomOffset = Math.floor(Math.random() * 500);
// 生成新的URL,该URL包含了随机偏移量
return 'https://api.github.com/users?since=' + randomOffset;
});
// 启动事件请求流
var startupRequestStream = Rx.Observable.just('https://api.github.com/users');
通过merge函数将上述两个流合并到一个流中:
stream A: ---a--------e-----o----->
stream B: -----B---C-----D-------->
vvvvvvvvv merge vvvvvvvvv
---a-B---C--e--D--o----->
// 刷新点击事件请求流
var requestOnRefreshStream = refreshClickStream.map(function() {
var randomOffset = Math.floor(Math.random() * 500);
return 'https://api.github.com/users?since=' + randomOffset;
});
// 启动请求事件流
var startupRequestStream = Rx.Observable.just('https://api.github.com/users');
// 将两个流进行合并
var requestStream = Rx.Observable.merge(requestOnRefreshStream, startupRequestStream);
简洁的编写方式:
var requestStream = refreshClickStream.map(function() {
var randomOffset = Math.floor(Math.random() * 500);
return 'https://api.github.com/users?since=' + randomOffset;
})
// 调用merge操作符,合并另一个流
.merge(Rx.Observable.just('https://api.github.com/users'));
甚至更简短的方式:
var requestStream = refreshClickStream.map(function() {
var randomOffset = Math.floor(Math.random() * 500);
return 'https://api.github.com/users?since=' + randomOffset;
})
// 调用startWith操作符,当启动的时候发送该事件
.startWith('https://api.github.com/users');
1.1.4、响应式的现状
2011年,微软发布了.NET的响应式扩展(Reactive Extensions,即ReactiveX或Rx),以方便异 步、事件驱动的程序。ReactiveX混合了迭代器模式和观察者模式。不同之处在于一个是推模式,一个是基于迭代器的拉模式。除了对变化事件的观察,完成事件和异常事件也会发布给订阅者。
ReactiveX的基本思想是事件是数据,数据是事件。
响应式扩展被移植到几种语言和平台上,包括 JavaScript、Python、C++、Swift和java。ReactiveX 很快成为一种跨语言的标准,将响应式编程引入到行业中。
RxJava,是 Java 的 ReactiveX 实现,很大程度上由 Netflix 的 bench ristensen 和 david karnok创建的。RxJava 1.0于2014年11月发布。RxJava是其他reactivex jvm平台技术的主要技术,其他的如如RxScala、RxKotlin、RxGroovy。RxJava已经成为Android开发的核心技术,并且已经进入Java后端开发。
许多RxJava adapter库,例如RxAndroid,RxJava JDBC,RxNetty, 和RxJavaFX调整了几个Java框架,使之成为响应式的,并且可以开箱即用地使用RxJava。这表明RxJava不仅仅是一个库。它是更大的ReactiveX生态系统的一部分,代表了整个编程方法。
1.1.5、为什么采用响应式 Spring
1、前言
响应式系统非常复杂,在构建这类系统时困难比较多。要轻松创建响应式系统,就必须首先分析能够构建这类系统的框架,然后选择其中一个。选择框架最常用的方法之一是分析其可用功能、相关性以及社区。
在 JVM 领域,构建响应式系统的最知名框架是 Akka 和 Vert.x 生态系统。
一方面,Akka 是一个受欢迎的框架,具有大量功能和大型社区。然而,Akka 最初是作为 Scala生态系统的一部分构建的,在很长一段时间内,它仅在基于 Scala 编写的解决方案中展示了它的强大功 能。尽管 Scala 是一种基于 JVM 的语言,但它与 Java 明显不同。几年前,Akka 直接开始支持 Java,但出于某些原因,它在 Java 世界中不像在 Scala 世界中那么受欢迎。
另一方面,Vert.x 框架也是构建高效响应式系统的强大解决方案。Vert.x 的设计初衷是作为Node.js 在 Java 虚拟机上的替代方法,它支持非阻塞和事件驱动。
2、服务级别的响应性
2.1、Consumer 接口
现在大型系统由多个小系统组成,系统整体的响应性依赖于这些小系统的响应性。也就是说,响应式系统的设计原则适用于各个级别、各种规模的系统,有助于它们很好地组合在一起。
因此,在服务级别提供响应式设计和实现很重要。在使用 Java 编写代码的过程中,最流行的传统技术是命令式编程(imperative programming)。为了理解命令式编程是否遵循响应式系统设计原则,见下图:
在这里,Web 商店应用程序中有两个组件。在这种场景下,OrdersService 在处理用户请求时调用ShoppingCardService。假设 在 ShoppingCardService 内部执行长时间运行的 I/O操作,例如 HTTP 请求或数据库查询,
import com.blnp.net.webflux.demo.entity.Input;
import com.blnp.net.webflux.demo.entity.Output;
/**
* 接口
* 只有一个calculate方法接收一个参数,处理完之后返回结果
*/
public interface ShoppingCardService {
Output calculate(Input value);
}
package com.blnp.net.webflux.demo.service;
import com.blnp.net.webflux.demo.entity.Input;
import com.blnp.net.webflux.demo.entity.Output;
public class OrderService {
private ShoppingCardService service;
void process() {
Input input = ...;
// 同步调用 ShoppingCardService 并在执行后立即接收结果
Output output = service.calculate(input);
}
}
在这种场景下,OrdersService的执行过程与 ShoppingCardService 的执行过程紧密耦合。当ShoppingCardService 处于处理阶段时,无法继续执行任何其他操作。
service.calculate(input)的执行过程阻塞了处理 OrdersService 逻辑的线程。想要在OrdersService中运行单独的独立处理,必须分配一个额外的线程。额外线程的分配可能是一种浪费。从响应式系统的角度来看,这种系统行为是不可接受的。上述逻辑也可以通过应用回调(callback)技术来解决,以实现跨组件通信。
package com.blnp.net.webflux.demo.service;
import com.blnp.net.webflux.demo.entity.Input;
import com.blnp.net.webflux.demo.entity.Output;
import java.util.function.Consumer;
public interface ShoppingCardService {
void calculate(Input value, Consumer <Output> c);
}
package com.blnp.net.webflux.demo.service;
import com.blnp.net.webflux.demo.entity.Input;
public class OrderService {
private final ShoppingCardService service;
public OrderService(ShoppingCardService service) {
this.service = service;
}
void process() {
Input input = null;
service.calculate(input, output - > {
// 需要执行的回调逻辑
});
}
}
package com.blnp.net.webflux.demo.entity;
public class Input {}
package com.blnp.net.webflux.demo.entity;
public class Output {}
OrdersService 传递回调函数以便在操作结束时做出响应。即OrdersService 现在与ShoppingCardService 实现了解耦,OrdersService 可以通过ShoppingCardService#calculate 方法所实现(调用给定函数)的回调获取通知,这个过程可以是同步的也可以是异步的。
package com.blnp.net.webflux.demo.service.impl;
import com.blnp.net.webflux.demo.entity.Input;
import com.blnp.net.webflux.demo.entity.Output;
import com.blnp.net.webflux.demo.service.ShoppingCardService;
import java.util.function.Consumer;
/**
* 该实现假定没有阻塞操作。由于我们没有执行 I/O 操作,将结果传递给回调函数来立即返回结
果。
*/
public class SyncShoppingCardService implements ShoppingCardService {
@Override
public void calculate(Input value, Consumer < Output > c) {
Output output = new Output();
c.accept(output);
}
}
package com.blnp.net.webflux.demo.service.impl;
import com.blnp.net.webflux.demo.entity.Input;
import com.blnp.net.webflux.demo.entity.Output;
import com.blnp.net.webflux.demo.service.ShoppingCardService;
import org.springframework.web.client.RestTemplate;
import java.util.function.Consumer;
/**
* 当发生阻塞I/O 时,我们可以将它包装在单独的 Thread 中。
* 获取结果之后,处理并传递给回调函数。
*/
public class AsyncShoppingCardService implements ShoppingCardService {
private final RestTemplate template = new RestTemplate();
@Override
public void calculate(Input value, Consumer < Output > c) {
new Thread(() - > {
Output result = template.getForObject("", null, "");
c.accept(result);
}).start();
}
}
ShoppingCardService 的同步实现中,从 API 角度讲并不提供任何好处。在异步的情况下,而请求将在单独的线程中执行。OrdersService 与执行过程进行解耦,并通过执行回调获取完成的通知。
组件通过回调函数实时解耦,意味着在调用 service.calculate方法之后,能够立即继续执行其他操作,无须等待 ShoppingCardService 的阻塞式响应。该技术的缺点是,回调要求开发人员对多线程有深入的理解,以避免共享数据修改陷阱和回调地狱(callback hell)。
package blnp.net.cn.jvm.demos.callback;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/9/21 14:42
*/
public class OrderService {
private static ShopCardService service;
static {
OrderService.service = new ShopCardService() {
@Override
public Output calculate(Input val, Consumer<Output> out) {
Output result = new Output();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("============================ calculate 方法执行中…… ============================");
result.setMessage("my world!");
System.out.println("这里是验证测试1………………");
out.accept(result);
System.out.println("这里是验证测试2………………");
return result;
}
};
}
static void process() {
Input input = null;
System.out.println("============================ start Process 方法 ============================");
service.calculate(input,output -> {
System.out.println("进入阻塞方法执行…………");
// 需要执行的回调逻辑
String message = output.getMessage();
System.out.println("message = " + message);
});
System.out.println("执行123……");
System.out.println("执行1234……");
System.out.println("============================ end Process 方法执行 ============================");
List<Input> info = new ArrayList<>();
Consumer<Input> printFun = detail -> {
detail.setGradeId(20L);
};
info.forEach(printFun);
}
public static void main(String[] args) {
process();
}
}
根据示例程序的执行结果可以明显看到,实现了解耦且不需要等待。但是时机是在 accept 方法的前后。
执行orderService类的 process 方法时,调用了 ShopCardService 的 calculate 方法。在 calculate 方法没有把结果 accept 之前都是同步执行的,acept 后 process 方法与 calculate 方法的部分执行是并行的。
2.2、Future 接口
回调技术不是唯一的选择。另一个选择是 java.util.concurrent.Future,它在某种程度上隐藏了执行行为并解耦了组件。
import java.util.concurrent.Future;
public interface ShoppingCardService {
// 返回值是可以阻塞方式获取结果的Future对象
Future < Output > calculate(Input value);
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class OrderService {
private final ShoppingCardService service;
public OrderService(ShoppingCardService service) {
this.service = service;
}
void process() throws ExecutionException, InterruptedException {
Input input = null;
// 异步调用
Future < Output > result = service.calculate(input);
// 同步获取结果,也可以随后获取结果
Output output = result.get();
}
}
public class Input {
}
public class Output {
}
通过 Future 类,实现了对结果的延迟获取。 在 Future 类的支持下,避免了回调地狱,并将实现多线程的复杂性隐藏在了特定 Future 实现的背后。无论如何,为了获得需要的结果,必须阻塞当前的线程并与外部执行进行同步,这显著降低了可伸缩性。
作为改进,Java 8 提供了 CompletionStage 以及它的直接实现 CompletableFuture。同样,这些类提供了类似 promise 的 API,使构建如下代码成为可能。
public class OrderService {
private final ShoppingCardService service;
public OrderService(ShoppingCardService service) {
this.service = service;
}
void process() {
Input input = null;
// CompletionStage 是一个类似于 Future 的类包装器,但能以函数声明的方式处理返回的结果。
// CompletionStage 的整体行为类似于Future,
// 但 CompletionStage 提供了一种流式 API,可以编写 thenAccept 和 thenCombine等方法。
// 这些方法定义了对结果的转换操作,然后 thenAccept 方法定义了最终消费者,用于处理转换后的结果。
service.calculate(input).thenApply(output1 - > {
return output1;
}).thenCombine(otherStage, (output2, otherOutput) - > {
return
output2;
}).thenAccept(output3 - > {});
}
}
在 CompletionStage 的支持下,可以编写函数式和声明式的代码,这些代码看起来很整洁,并且能够异步处理结果。还可以省略等待结果的过程,并提供在结果可用时对其进行处理的功能。
尽管 CompletionStage 为编写高效且可读性强的代码提供了更好的可能性,但遗憾的是,其中存在一些遗漏点。例如,Spring 4 MVC 在很长时间内不支持 CompletionStage。 为了弥补这一点,Spring提供了自己的 ListenableFuture。之所以发生这种情况,是因为 Spring 4 旨在与旧的 Java 版本兼容。以下代码展示了如何结合使用 ListenableFuture 与AsyncRestTemplate。
public class WebController {
@Autowired
private AsyncRestTemplate asyncRestTemplate;
@RequestMapping("/demo")
public String demo() {
try {
Thread.sleep(30000);
} catch (Exception e) {
e.printStackTrace();
}
return new Date()+"--->>>30秒。。。。";
}
@RequestMapping("/async")
public String async() {
String url = "http://127.0.0.1:8080/demo";
ListenableFuture<ResponseEntity<String>> forEntity = asyncRestTemplate.getForEntity(url, String.class);
//异步调用后的回调函数
forEntity.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
//调用失败
@Override
public void onFailure(Throwable ex) {
System.err.println("-----调用接口失败-------");
}
//调用成功
@Override
public void onSuccess(ResponseEntity<String> result) {
System.out.println("--->异步调用成功, result = "+result.getBody());
}
});
return new Date()+"--->>>异步调用结束";
}
}
2.3、线程单次请求模型
这个模型非常不理想,效率低下。一方面,多线程本质上是一种复杂的技术。当我们使用多线程时,必须考虑很多事情,例如从不同线程访问共享内存、线程同步、错误处理等。
另一方面,Java 中的多线程设计假设一些线程可以共享一个 CPU,并同时运行它们的任务。CPU 时间能在多个线程之间共享的事实引入了上下文切换(context switching)的概念。这意味着后续要恢复线程,就需要保存和加载寄存器、存储器映射以及其他通常属于计算密集型操作的相关元素。因此,具有大量活动线程和少量 CPU 的应用方式将会效率低下。
同时,典型的 Java 线程在内存消耗方面有一定开销。在 64 位 Java 虚拟机上线程的典型栈大小为1024 KB。一方面,尝试在单连接线程(thread per connection)模型中处理 64 000 个并发请求可能使用大约 64 GB 的内存。从业务角度来看,这可能代价高昂;从应用程序的角度来讲,这也是很严重的问题。另外,如果切换到具有有限大小的传统线程池和响应请求的预配置队列,那么客户端等待响应的时间会太长,不是太可靠,因为这增加了平均响应超时的现象,最后可能导致应用程序丧失即时响应性。
为此,响应式规范建议使用非阻塞操作,但这是 Spring 生态系统缺少的。此外,Spring 也没有与Netty 等响应式服务器进行良好集成,而这些响应式服务器解决了上下文切换的问题。
异步处理不仅限于简单的请求−响应模式,对于包含无限元素的数据流,可以具有背压支持的对齐转换流的方式处理它,如下图所示:
1.2、响应式相关问题
1.2.1、API 不一致问题
由于存在大量的同类型响应式库可供选择(如RxJava,CompletableStage,Vert.x,AKKA等)。
例如,我们可能依赖于使用 RxJava 的 API 来编写正在处理的数据项的流程。要构建一个简单的异步请求−响应交互,依赖 CompletableStage 就足够了。
我们也可以使用特定于框架的类(如 org.springframework.util.concurrent.ListenableFuture )来构建组件之间的异步交互,并基于该框架简化开发工作。
另一方面,丰富的选择很容易使系统过于复杂。例如,若存在两个依赖于同一个异步非阻塞通信概念但具有不同 API 的库,会导致我们需要提供额外的工具类,以便将一个回调转换为另一个回调;反之亦然。代码如下:
// async 数据库客户端的接口声明,它是支持异步数据库访问的客户端接口的代表性示例
interface AsyncDatabaseClient {
< T > CompletionStage < T > store(CompletionStage < T > stage);
}
final class AsyncAdapters {
public static < T > CompletionStage < T > toCompletion(ListenableFuture < T > future) {
CompletableFuture < T > completableFuture = new CompletableFuture < > ();
future.addCallback(completableFuture::complete, completableFuture::completeExceptionally);
return completableFuture;
}
public static < T > ListenableFuture < T > toListenable(CompletionStage < T > stage) {
SettableListenableFuture < T > future = new SettableListenableFuture < > ();
stage.whenComplete((v, t) - > {
if(t == null) {
future.set(v);
}
else {
future.setException(t);
}
});
return future;
}
}
@RestController
public class MyController {
// ...
@RequestMapping
public ListenableFuture < ? > requestData() {
AsyncRestTemplate httpClient = ...;
AsyncDatabaseClient databaseClient = ...;
CompletionStage < String > completionStage = toCompletion(httpClient.execute(...));
return toListenable(databaseClient.store(completionStage));
}
}
Spring 5.x 框架扩展了 ListenableFuture 的 API 并且提供了一个 Completable 的方法来解决不兼容的问题。这里的核心问题在于没有标准。
-
Spring 4. x 框架中的 ListenableFuture 和 CompletionStage 之间没有直接集成。
-
许多库和框架为组件之间的异步通信提供了自己的接口和类,包括简单请求 −响应通信以及流处理框架。
-
此外,自己写的适配可能有 bug ,需要额外的维护。
-
在许多情况下,为了解决这个问题并使几个独立的库兼容,必须提供自己的适配并在几个地方重用。
-
该示例并没有脱离响应式编程的常见用法。
1.2.2、推拉模式
在整个响应式环境演变的早期阶段,所有库的设计思想都是把数据从源头推送到订阅者。因为纯粹的拉模型在某些场景下效率不够高。
假设我们要过滤一大堆数据,但只取前 10 个元素。可以采用拉模型来解决这个问题:
// AsyncDatabaseClient 字段声明。在这里,我们使用该客户端将异步、非阻塞通信与外部数据库连接起来。
final AsyncDatabaseClient dbClient = ...;
/*
这是 list 方法声明。这里我们通过返回 CompletionStage 声明一个异步契约,并将其作为调用
list 方法的结果。
同时,为了聚合拉取结果并将其异步发送给调用者,我们声明Queue 和 CompletableFuture 来存储
接收的值,并在稍后手动发送所收集的 Queue。
*/
public CompletionStage < Queue < Item >> list(int count) {
BlockingQueue < Item > storage = new ArrayBlockingQueue < > (count);
CompletableFuture < Queue < Item >> result = new CompletableFuture < > ();
pull("1", storage, result, count);
return result;
}
/*
这是 pull 方法声明。在该方法中,我们调用 AsyncDatabaseClient#getNextAfterId来执行查
询并异步接收结果。然后,当收到结果时,我们在点(3.1)处对其进行过滤。如果是有效项,我们就将其
聚合到队列中。另外,在点(3.2),我们检查是否已经收集了足够的元素,将它们发送给调用者,然后退
出拉操作。如果任何一个所涉及的 if 分支被绕过,就再次递归调用 pull方法(3.3)。
*/
void pull(String elementId, Queue < Item > queue, CompletableFuture resultFuture, int count) {
dbClient.getNextAfterId(elementId).thenAccept(item - > {
if(isValid(item)) {
queue.offer(item);
if(queue.size() == count) {
returnFuture.complete(queue);
return;
}
}
pull(item.getId(), queue, resultFuture, count);
});
}
从上述代码可以看出,在服务和数据库之间使用了异步、非阻塞交互。但是,如果查看下图,就会看到其缺陷所在。
逐个请求下一个元素会导致在从服务传递请求到数据库上花费额外的时间。
从服务的角度来看,整体处理时间大部分浪费在空闲状态上。即使没有使用资源,由于额外的网络活动,整体处理时间也会是原来的两倍甚至三倍。
此外,数据库不知道未来请求的数量,这意味着数据库不能提前生成数据,并因此处于空闲状态。这意味着数据库正在等待新请求。
在响应被传递给服务、服务处理传入响应然后请求新数据的过程中,其效率会比较低下。为了优化整体执行过程并将模型维持为一等公民,可以将拉取操作与批处理结合起来,示例代码的修改如下所示:
// 与上一个例子中的 pull 方法声明相同。
void pull(String elementId, Queue < Item > queue, CompletableFuture resultFuture, int count) {
/*
这是 getNextBatchAfterId 执行过程。可以注意到,AsyncDatabaseClient 方法可用于查
询特定数量的元素,这些元素作为 List <Item>返回。反过来,当数据可用时,除了要创建额外的
for 循环以分别处理该批的每个元素,我们对它们的处理方式几乎相同。
*/
dbClient.getNextBatchAfterId(elementId, count).thenAccept(items - > {
for(Item item: items) {
if(isValid(item)) {
queue.offer(item);
if(queue.size() == count) {
resultFuture.complete(queue);
return;
}
}
}
// 这是递归 pull 方法的执行过程,在缺少来自上次拉取的数据项的情况下,这个设计会被用于获取另外一批数据项。
pull(items.get(items.size() - 1).getId(), queue, resultFuture, count);
});
}
一方面,通过查询一批元素,可以显著提高 list 方法执行的性能并显著减少整体处理时间;另一方面,交互模型中仍然存在一些缺陷,而该缺陷可以通过分析下图来检测。
该流程在处理时间方面仍然存在一些效率低下的情况。例如,当数据库查询数据时,客户端仍处于空闲状态。同时,发送一批元素比发送一个元素需要更多的时间。
最后,对整批元素的额外请求实际上可能是多余的。例如,如果只剩下一个元素就能完成处理,并且下一批中的第一个元素就满足验证条件,那么其余的数据项就完全是多余的且会被跳过。
为了提供最终的优化,只会请求一次数据,当数据变为可用时,该数据源异步推送数据。以下代码修改展示了如何实现这一过程:
// 这是 list 方法声明。这里的 Observable <Item>会返回一个类型,该类型标识正在被推送的元素。
public Observable < Item > list(int count) {
/*
这是流查询阶段。通过调用 AsyncDatabaseClient#getStreamOfItems 方法,我们会对数据
库完成一次订阅。在点(2.1),我们会过滤元素,并且通过使用.take()操作符根据调用者的请求获取特
定数量的数据(2.2)。
*/
return dbClient.getStreamOfItems().filter(item - > isValid(item)).take(count);
}
在这里,我们使用 RxJava 1.x 类作为一等公民来接收所推送的元素。反过来,一旦满足所有要求,就发送取消信号并关闭与数据库的连接。当前的交互流程如下图所示。
如上图所示,再次对整体处理时间做了优化。在交互过程中,只有当服务等待第一个响应时会有一大段空闲时间。当第一个元素到达后,数据库会在数据到来时开始发送后续元素。
反过来,即使处理一个元素的过程可能比查询下一个元素快一点,服务的整体空闲时间也会很短。但是,在服务已经收集到所需数量的元素后,数据库仍可能生成多余元素,此时数据库会忽略它们。
1.2.3、流量控制问题
采用推模型主要是因为可以通过将请求量减少到最小以优化整体处理时间。
这就是为什么RxJava 1.x及类似的开发库以推送数据为目的进行设计,这也是为什么流技术能成为分布式系统中组件之间重要的通信技术。
如果仅仅与推模型进行组合,那么该技术有其局限性。消息驱动通信的本质是假设每个请求都会有一个响应,因此服务可能收到异步的、潜在的无限消息流。而这里存在陷阱,因为如果生产者不关注消费者的吞吐能力,它可能会以下面两节中描述的方式影响系统的整体稳定性。
-
慢生产者和快消费者
-
快生产者和慢消费者
使用队列处理所推送数据的关键要素之一是选择具有合适特性的队列。通常,有 3 种常见的队列类型:无界队列、有界丢弃队列、有界阻塞队列。
1、无界队列
最明显的解决方案是提供一个具有无限大小特性的队列,简单地说就是一个无界队列。在这种情况下,所有生成的元素首先存储在队列中,然后由实际订阅者进行消费。如下图所示:
一方面,使用无界队列处理消息带来的核心好处是消息的可传递性,这意味着消费者将在某个时间点及时处理所有存储的元素。
另一方面,只要成功实现消息的可传递性,因为没有无限制的资源,应用程序的回弹性就会降低。例如,一旦内存达到上限,整个系统很容易崩溃。
2、有界丢弃队列
为了避免内存溢出,我们还可以使用在自身已满的情况下可以忽略传入的消息的队列。下图描绘了一个容量为 2 个元素的队列,其特性是在溢出时丢弃元素。
该技术考虑了资源的限制,并且可以根据资源的能力配置队列的容量。当消息的重要性很低时,采用这种队列是一种常见的做法。
一个业务场景的示例可以是一种代表数据集变更的事件流。同时,每个事件都会触发一些重新进行统计计算的操作,该操作使用整个数据集聚合,与传入事件数量相比会花费大量时间。在这种情况下,唯一重要的是数据集已发生变化这一事实,而哪些数据受到影响并不重要。
3、有界阻塞队列
然而,在每个消息都很重要的情况下,上述技术是不可接受的。例如,支付系统必须处理每个用户提交的支付请求,绝不允许丢弃一部分请求。因此,我们可以在达到上限后阻塞生产者,却不能丢弃消息并保留有界队列以处理被推送的数据。具备阻塞生产者能力的队列通常被称为阻塞队列。下图描述了使用阻塞队列进行交互的示例,该队列的容量为 3 个元素。
遗憾的是,这种技术否定了系统的所有异步行为。通常,一旦生产者达到队列的限制,它就会开始被阻塞并将一直处于该状态,直到消费者消费了一个元素,从而使队列中出现可用空间为止。由此我们可以得出结论,最慢的消费者的吞吐量限制了系统的总吞吐量。继而,除了否定异步行为,该技术还否定了有效的资源利用率。因此,如果我们想要实现回弹性、弹性和即时响应性所有这 3 个方面,那么这些场景全部不可接受。
此外,队列的存在不仅可能使系统的整体设计复杂化,还会增加在上述解决方案之间找到平衡的额外责任。
通常,纯推模型中不受控制的语义可能导致许多我们不希望出现的情况。这就是为什么响应式规范要提到使系统巧妙地响应负载的机制的重要性,即背压控制机制的重要性。
遗憾的是,类似 RxJava 1.x 这样的响应式库并没有提供这样的标准化功能。没有明确的 API能用于控制开箱即用的背压机制。
1.3、响应式规范
1.3.1、响应式流规范
响应式流规范:http://www.reactive-streams.org/
响应式规范发布了一组接口,用于实现。对应maven仓库地址:
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck-flow</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-examples</artifactId>
<version>1.0.3</version>
</dependency>
响应式流的规范文档:reactive-streams-jvm/README.md at v1.0.3 · reactive-streams/reactive-streams-jvm · GitHub
响应式流(Reactive Streams)规范,规定了异步组件之间使用背压进行交互。
响应式流在Java 9中使用Flow API适配。Flow API是互操作的规范,而不是具体的实现,它的语义跟响应式流规范一致。响应式流规范包括如下接口:
Publisher :
表示数据流的生产者或数据源,包含一个方法让订阅者注册到发布者,Publisher 代表了发布者和订阅者直接连接的标准化入口点。
public interface Publisher<T> {
public void subscribe(Subscriber<?super T> s);
}
Subscriber:
表示消费者,onSubscribe 方法为我们提供了一种标准化的方式来通知 Subscriber 订阅成功。
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
-
onSubscribe: 发布者在开始处理之前调用,并向订阅者传递一个订阅票据对象( Subscription )。
-
onNext: 用于通知订阅者发布者发布了新的数据项。
-
onError: 用于通知订阅者,发布者遇到了异常,不再发布数据事件。
-
onComplete: 用于通知订阅者所有的数据事件都已发布完。
Subscription:
同时,onSubscribe 方法的传入参数引入一个名为 Subscription(订阅)的订阅票据。Subscription 为控制元素的生产提供了基础。有如下方法:
public interface Subscription {
public void request(long n);
public void cancel();
}
-
request: 用于让订阅者通知发布者随后需要发布的元素数量
-
cancel: 用于让订阅者取消发布者随后的事件流。
Processor:
如果实体需要转换进来的项目,并将转换后的项目传递给另一个订阅者,此时需要Processor接口。该接口既是订阅者,又是发布者:
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Publisher 保证只有在 Subscriber 要求时才发送元素中新的部分。Publisher 的整体实现既可以采用纯粹的阻塞等待,也可以采用仅在 Subscriber 请求下才生成数据的复杂机制。
该规范为我们提供了混合推拉模型,此模型可以对背压进行合理控制。另外,在某些情况下,可以优先考虑纯推模型。响应式流非常灵活,除动态推拉模型外,该规范还提供了独立的推模型和拉模型。根据文档,为了实现纯推模型,我们可以考虑请求 2^63−1 (java.lang.Long.MAX_VALUE)个元素的需求。
这个数字可以被认为是无界的,因为对于当前或可预见的硬件而言,在合理的时间内满足2^ 63 −1 个元素的需求是不可能的(即使每纳秒 1 个元素也需要 292 年)。因此,发布者可以停止跟踪超出此要求的需求。
相反,要切换到纯拉模型,可以在 Subscriber.onNext 方法中请求新元素。
1.3.2、规范实战
具体规范说明详见:响应式流规范-CSDN博客
<dependencies>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>1.0.3</version>
<scope>test</scope>
</dependency>
</dependencies>
异步发布者:
package blnp.net.cn.jvm.demos.webflux;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.Iterator;
import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* <p>
* Publisher的实现,使用指定的Executor异步执行,并为指定的Iterable生成元素。以unicast的形式为Subscriber指定的Iterable生成元素。
* 代码中使用了很多try-catch,用于展示什么时候可以抛异常,什么时候不能抛异常。
* </p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/9/23 16:33
*/
public class AsyncIterablePublisher<T> implements Publisher<T> {
/**
* 默认的批次大小
**/
private final static int DEFAULT_BATCH_SIZE = 1024;
/**
* 用于生成数据的数据源或生成器
**/
private final Iterable<T> elements;
/**
* 线程池,Publisher使用线程池为它的订阅者异步执行
**/
private final Executor executor;
/**
* 既然使用了线程池,就不要在一个线程中执行太多的任务。此处使用批次大小调节单个线程的执行时长
**/
private final int batchSize;
/**
* 用途:
* @author liaoyibin
* @since 16:41 2024/9/23
* @params [elements, executor]
* @param elements 元素生成器
* @param executor 线程池
* @return
**/
public AsyncIterablePublisher(final Iterable<T> elements, final Executor executor) {
// 调用重载的构造器,使用默认的批次大小,指定的数据源和指定的线程池
this(elements, DEFAULT_BATCH_SIZE, executor);
}
/**
* 用途:构造器,构造Publisher实例
* @author liaoyibin
* @since 16:45 2024/9/23
* @params [elements, batchSize, executor]
* @param elements 元素发生器
* @param batchSize 批次大小
* @param executor 线程池
* @return
**/
public AsyncIterablePublisher(final Iterable<T> elements, final int batchSize, final Executor executor) {
// 如果不指定元素发生器则抛异常
if (elements == null) {
throw null;
}
// 如果不指定线程池,抛异常
if (executor == null) {
throw null;
}
// 如果批次大小小于1抛异常:批次大小必须是大于等于1的值
if (batchSize < 1) {
throw new IllegalArgumentException("batchSize must be greater than zero!");
}
// 赋值元素发生器
this.elements = elements;
// 赋值线程池
this.executor = executor;
// 赋值批次大小
this.batchSize = batchSize;
}
/**
* 用途:订阅者订阅的方法
* <p>
* 规范2.13指出,该方法必须正常返回,不能抛异常等。。。
* 在当前实现中,我们使用unicast的方式支持多个订阅者
* </p>
* @author liaoyibin
* @since 17:06 2024/9/23
* @params [s]
* @param s 订阅者
* @return void
**/
@Override
public void subscribe(final Subscriber<? super T> s) {
new SubscriptionImpl(s).init();
}
/**
* 静态接口:信号
*
**/
static interface Signal {
}
/**
* 取消订阅的信号
**/
enum Cancel implements Signal {Instance;}
/**
* 订阅的信号
**/
enum Subscribe implements Signal {Instance;}
/**
* 发送的信号
**/
enum Send implements Signal {Instance;}
/**
* 静态类,表示请求信号
**/
static final class Request implements Signal {
final long n;
Request(final long n) {
this.n = n;
}
}
/**
* 订阅票据,实现了Subscription接口和Runnable接口
**/
final class SubscriptionImpl implements Subscription, Runnable {
// 需要保有Subscriber的引用,以用于通信
final Subscriber<? super T> subscriber;
// 该订阅票据是否失效的标志
private boolean cancelled = false;
// 跟踪当前请求
// 记录订阅者的请求,这些请求还没有对订阅者回复
private long demand = 0;
// 需要发送给订阅者(Subscriber)的数据流指针
private Iterator<T> iterator;
SubscriptionImpl(final Subscriber<? super T> subscriber) {
// 根据规范,如果Subscriber为null,需要抛空指针异常,此处抛null。
if (subscriber == null) {
throw null;
}
this.subscriber = subscriber;
}
// 该队列记录发送给票据的信号(入站信号),如"request","cancel"等。
// 通过该Queue,可以在Publisher端使用多线程异步处理。
private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue<Signal>();
// 确保当前票据不会并发的标志
// 防止在调用订阅者的onXxx方法的时候并发调用。规范1.3规定的不能并发。
private final AtomicBoolean on = new AtomicBoolean(false);
/**
* 注册订阅者发送来的请求
**/
private void doRequest(final long n) {
// 规范规定,如果请求的元素个数小于1,则抛异常
// 并在异常信息中指明错误的原因:n必须是正整数。
if (n < 1) {
terminateDueTo(new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));
}
//demaind + n < 1表示long型数字越界,表示订阅者请求的元素数量大于Long.MAX_VALUE
else if (demand + n < 1) {
// 根据规范 3.17,当请求的元素数大于Long.MAX_VALUE的时候,将请求数设置为Long.MAX_VALUE即可。
// 此时数据流认为是无界流。
demand = Long.MAX_VALUE;
// 开始向下游发送数据元素。
doSend();
} else {
// 记录下游请求的元素个数
demand += n;
// 开始向下游发送数据元素。
doSend();
}
}
/**
* 规范3.5指明,Subscription.cancel方法必须及时的返回,保持调用者的响应性,还必须是幂等的,必须是线程安全的。因此该方法不能执行密集的计算。
**/
private void doCancel() {
cancelled = true;
}
/**
* 不是在`Publisher.subscribe`方法中同步地调用`subscriber.onSubscribe`方法,而是异步地执行subscriber.onSubscribe方法.
* 这样可以避免在调用线程执行用户的代码。因为在订阅者的onSubscribe方法中要执行Iterable.iterator方法。异步处理也无形中遵循了规范的1.9。
**/
private void doSubscribe() {
try {
// 获取数据源的迭代器
iterator = elements.iterator();
// 如果iterator是null,就重置为空集合的迭代器。我们假设iterator永远不是null值。
if (iterator == null) {
iterator = Collections.<T>emptyList().iterator();
}
} catch (final Throwable t) {
/**
* Publisher发生了异常,此时需要通知订阅者onError信号。
* 但是规范1.9指定了在通知订阅者其他信号之前,必须先通知订阅者onSubscribe信号。
* 因此,此处通知订阅者onSubscribe信号,发送空的订阅票据
**/
subscriber.onSubscribe(new Subscription() {
@Override
public void cancel() {
// 空的
}
@Override
public void request(long n) {
// 空的
}
});
// 根据规范1.9,通知订阅者onError信号
terminateDueTo(t);
}
if (!cancelled) {
// 为订阅者设置订阅票据。
try {
// 此处的this就是Subscription的实现类SubscriptionImpl的对象。
subscriber.onSubscribe(this);
} catch (final Throwable t) {
/**
* Publisher方法抛异常,此时需要通知订阅者onError信号。
* 但是根据规范2.13,通知订阅者onError信号之前必须先取消该订阅者的订阅票据。
* Publisher记录下异常信息。
**/
terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t));
}
// 立即处理已经完成的迭代器
boolean hasElements = false;
try {
// 判断是否还有未发送的数据,如果没有,则向订阅者发送onComplete信号
hasElements = iterator.hasNext();
} catch (final Throwable t) {
/**
* 规范的1.4规定:
* 如果hasNext发生异常,必须向订阅者发送onError信号,发送信号之前先取消订阅,发送少于订阅者请求的onNext信号。
**/
terminateDueTo(t);
}
/**
* 如果没有数据发送了,表示已经完成,直接发送onComplete信号终止订阅票据。
* 规范1.3规定,通知订阅者onXxx信号,必须串行,不能并发。
**/
if (!hasElements) {
try {
/**
* 规范1.6指明,在通知订阅者onError或onComplete信号之前,必须先取消订阅者的订阅票据。
* 在发送onComplete信号之前,考虑一下,有可能是Subscription取消了订阅。
**/
doCancel();
subscriber.onComplete();
} catch (final Throwable t) {
// 规范2.13指出,onComplete信号不允许抛异常,因此此处只能记录下来日志
(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t)).printStackTrace(System.err);
}
}
}
}
/**
* 向下游发送元素的方法
**/
private void doSend() {
try {
// 为了充分利用Executor,我们最多发送batchSize个元素,然后放弃当前线程,重新调度,通知订阅者onNext信号。
int leftInBatch = batchSize;
do {
T next;
boolean hasNext;
try {
// 在订阅的时候已经调用过hasNext方法了,直接获取元素
next = iterator.next();
// 检查还有没有数据,如果没有,表示流结束了
hasNext = iterator.hasNext();
} catch (final Throwable t) {
// 1.4规定:如果next方法或hasNext方法抛异常(用户提供的),认为流抛异常了发送onError信号
terminateDueTo(t);
return;
}
// 向下游的订阅者发送onNext信号
subscriber.onNext(next);
// 如果已经到达流的结尾
if (!hasNext) {
// 1.6规定:首先考虑是票据取消了订阅
doCancel();
// 发送onComplete信号给订阅者
subscriber.onComplete();
}
} while (!cancelled // 如果没有取消订阅。
&& --leftInBatch > 0 // 如果还有剩余批次的元素。
&& --demand > 0); // 如果还有订阅者的请求。
// 如果订阅票据没有取消,还有请求,通知自己发送更多的数据
if (!cancelled && demand > 0) {
signal(Send.Instance);
}
} catch (final Throwable t) {
// 2.13规定:如果到这里,只能是onNext或onComplete抛异常,只能取消。
// 确保已取消,因为是Subscriber的问题
doCancel();
// 记录错误信息
(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext or onComplete.", t)).printStackTrace(System.err);
}
}
/**
* 用途:必须 先取消订阅者的订阅票据(`Subscription`)。
* <p>
* 规范1.6指出,`Publisher`在通知订阅者`onError`或者`onComplete`信号之前,必须先取消订阅
* </p>
* @author liaoyibin
* @since 10:50 2024/9/24
* @params [t]
* @param t
* @return void
**/
private void terminateDueTo(final Throwable t) {
// 当发送onError之前,先取消订阅票据
cancelled = true;
try {
// 给下游Subscriber发送onError信号
subscriber.onError(t);
} catch (final Throwable t2) {
// 规范1.9指出,onError不能抛异常。
// 如果onError抛异常,只能记录信息。
(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
}
}
/**
* 该方法异步地给订阅票据发送指定信号
**/
private void signal(final Signal signal) {
// 入站信号的队列,不需要检查是否为null,因为已经实例化过ConcurrentLinkedQueue了
// 将信号添加到入站信号队列中
if (inboundSignals.offer(signal)) {
// 信号入站成功,调度线程处理
tryScheduleToExecute();
}
}
// 主事件循环
@Override
public final void run() {
// 与上次线程执行建立happens-before关系,防止并发执行
// 如果on.get()为false,则不执行,线程退出
// 如果on.get()为 true,则表示没有线程在执行,当前线程可以执行
if (on.get()) {
try {
// 从队列取出一个入站信号
final Signal s = inboundSignals.poll();
// 规范1.8:如果`Subscription`被取消了,则必须最终停止向`Subscriber`发送通知。
// 规范3.6:如果取消了`Subscription`,则随后调用`Subscription.request(long n)`必须是无效的(NOPs)。
// 如果订阅票据没有取消
if (!cancelled) {
// 根据信号的类型调用对应的方法进行处理
if (s instanceof Request) {
// 请求
doRequest(((Request) s).n);
} else if (s == Send.Instance) {
// 发送
doSend();
} else if (s == Cancel.Instance) {
// 取消
doCancel();
} else if (s == Subscribe.Instance) {
// 订阅
doSubscribe();
}
}
} finally {
// 保证与下一个线程调度的happens-before关系
on.set(false);
// 如果还有信号要处理
if (!inboundSignals.isEmpty()) {
// 调度当前线程进行处理
tryScheduleToExecute();
}
}
}
}
/**
* 该方法确保订阅票据同一个时间在同一个线程运行
* 规范1.3规定,调用`Subscriber`的`onSubscribe`,`onNext`,`onError`和`onComplete`方法必须串行,不允许并发。
**/
private final void tryScheduleToExecute() {
// CAS原子性地设置on的值为true,表示已经有一个线程正在处理了
if (on.compareAndSet(false, true)) {
try {
// 向线程池提交任务运行
executor.execute(this);
// 如果不能使用Executor执行,则优雅退出
} catch (Throwable t) {
if (!cancelled) {
// 首先,错误不可恢复,先取消订阅
doCancel();
try {
// 停止
terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t));
} finally {
// 后续的入站信号不需要处理了,清空信号
inboundSignals.clear();
// 取消当前订阅票据,但是让该票据处于可调度状态,以防清空入站信号之后又有入站信号加入。
on.set(false);
}
}
}
}
}
/**
* 用途:Subscription.request的实现,接收订阅者的请求给Subscription,等待处理。
* @author liaoyibin
* @since 11:02 2024/9/24
* @params [n]
* @param n 订阅者请求的元素数量
* @return void
**/
@Override
public void request(final long n) {
signal(new Request(n));
}
/**
* 用途:订阅者取消订阅。
* <p>
* Subscription.cancel的实现,用于通知Subscription,Subscriber不需要更多元素了。
* </p>
* @author liaoyibin
* @since 11:05 2024/9/24
* @params []
* @param
* @return void
**/
@Override
public void cancel() {
signal(Cancel.Instance);
}
/**
* 用途:init方法的设置,用于确保SubscriptionImpl实例在暴露给线程池之前已经构造完成
* 因此,在构造器一完成,就调用该方法,仅调用一次。先发个信号试一下
* @author liaoyibin
* @since 11:06 2024/9/24
* @params []
* @param
* @return void
**/
void init() {
signal(Subscribe.Instance);
}
}
}
异步订阅者:
package blnp.net.cn.jvm.demos.webflux;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* <p>
* 基于Executor的异步运行的订阅者实现,一次请求一个元素,然后对每个元素调用用户定义的方法进行处理。
* 注意:该类中使用了很多try-catch用于说明什么时候可以抛异常,什么时候不可以抛异常
* </p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/9/24 11:27
*/
public abstract class AsyncSubscriber<T> implements Subscriber<T>, Runnable {
/**
* Signal表示发布者和订阅者之间的异步协议
**/
private static interface Signal {
}
/**
* 表示数据流发送完成,完成信号
**/
private enum OnComplete implements Signal {Instance;}
/**
* 表示发布者给订阅者的异常信号
**/
private static class OnError implements Signal {
public final Throwable error;
public OnError(final Throwable error) {
this.error = error;
}
}
/**
* 表示下一个数据项信号
**/
private static class OnNext<T> implements Signal {
public final T next;
public OnNext(final T next) {
this.next = next;
}
}
/**
* 表示订阅者的订阅成功信号
**/
private static class OnSubscribe implements Signal {
public final Subscription subscription;
public OnSubscribe(final Subscription subscription) {
this.subscription = subscription;
}
}
/**
* 订阅单据,根据规范3.1,该引用是私有的
**/
private Subscription subscription;
/**
* 用于表示当前的订阅者是否处理完成
**/
private boolean done;
/**
* 根据规范的2.2条款,使用该线程池异步处理各个信号
**/
private final Executor executor;
/**
* 用途:仅有这一个构造器,只能被子类调用。传递一个线程池即可
* @author liaoyibin
* @since 14:21 2024/9/24
* @params [executor]
* @param executor 线程池对象
* @return
**/
protected AsyncSubscriber(Executor executor) {
if (executor == null) {
throw null;
}
this.executor = executor;
}
/**
* 用途:幂等地标记当前订阅者已完成处理,不再处理更多的元素。因此,需要取消订阅票据(Subscription)
* @author liaoyibin
* @since 14:22 2024/9/24
* @params []
* @param
* @return void
**/
private final void done() {
/**
* 在此处,可以添加done,对订阅者的完成状态进行设置;
* 虽然规范3.7规定Subscription.cancel()是幂等的,我们不需要这么做
* 当whenNext方法抛异常,认为订阅者已经处理完成(不再接收更多元素)
**/
done = true;
if (subscription != null) {
try {
// 取消订阅票据
subscription.cancel();
} catch (final Throwable t) {
// 根据规范条款3.15,此处不能抛异常,因此只是记录下来。
(new IllegalStateException(subscription + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err);
}
}
}
protected abstract boolean whenNext(final T element);
/**
* 当OnComplete信号到达时,会调用此方法并重写此方法以实现您自己的自定义onComplete逻辑。
**/
protected void whenComplete() {
}
/**
* 如果OnError信号到达,则会调用此方法并重写此方法以实现您自己的自定义onError逻辑。
**/
protected void whenError(Throwable error) {
}
/**
* 订阅逻辑处理
**/
private final void handleOnSubscribe(final Subscription s) {
if (s == null) {
//在这里获得空“订阅”无效,因此忽略它。
} else if (subscription != null) {
//如果发现有重复添加当前订阅的,那么此时进行优雅的处理操作
try {
//取消额外订阅以遵循规则2.5
s.cancel();
} catch (final Throwable t) {
//根据规则3.15,订阅.取消不允许引发异常,因此只做记录异常信息
(new IllegalStateException(s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err);
}
} else {
//如果我们想成为同步“Subscriber”,我们必须在使用它之前将其分配给本地,因为根据规则3.10,订阅可以从“request”内同步调用“onNext”
subscription = s;
try {
/**
* 我们的订阅者是无缓冲且适度的,它一次请求一个元素
* 如果我们想要元素,根据规则2.1,我们需要调用“request”,并且根据规则3.2,我们可以从“onSubscribe”方法内同步调用该元素
**/
s.request(1);
} catch (final Throwable t) {
//根据规则3.16,不允许抛出订阅.请求
(new IllegalStateException(s + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)).printStackTrace(System.err);
}
}
}
/**
* 处理下游数据流
**/
private final void handleOnNext(final T element) {
//如果发现当前订阅尚未完成则继续处理
if (!done) {
//从技术上讲,不需要此检查,因为我们希望出版商符合规范
if (subscription == null) {
//检查是否违反2.1和1.09的规范
(new IllegalStateException("Someone violated the Reactive Streams rule 1.09 and 2.1 by signalling OnNext before `Subscription.request`. (no Subscription)")).printStackTrace(System.err);
} else {
try {
if (whenNext(element)) {
try {
//我们的订阅者是无缓冲且适度的,它一次请求一个元素
subscription.request(1);
} catch (final Throwable t) {
//根据规则3.16,不允许抛出订阅.请求
(new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)).printStackTrace(System.err);
}
} else {
//根据规则2.6,这是合法的
done();
}
} catch (final Throwable t) {
done();
try {
onError(t);
} catch (final Throwable t2) {
//根据规则2.13,Subscriber.onMessage 不允许引发异常
(new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
}
}
}
}
}
private void handleOnComplete() {
//从技术上讲,不需要此检查,因为我们希望出版商符合规范
if (subscription == null) {
//根据规则1.09,发布商不允许在 onSubscribe 之前发送onComplete信号
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err);
} else {
//遵守规则2.4
done = true;
whenComplete();
}
}
private void handleOnError(final Throwable error) {
//规范性检查
if (subscription == null) {
//根据规则1.09,发布商不允许在 onSubscribe 之前发送onComplete信号
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err);
} else {
//遵守规则2.4
done = true;
whenError(error);
}
}
@Override
public final void onSubscribe(final Subscription s) {
//根据规则2.13,如果“Subscription”为“空”,我们需要抛出“java.lang.NullPointerResponse”
if (s == null) {
throw null;
}
signal(new OnSubscribe(s));
}
@Override
public final void onNext(final T element) {
//同上
if (element == null) {
throw null;
}
signal(new OnNext<T>(element));
}
@Override
public final void onError(final Throwable t) {
//同上
if (t == null) {
throw null;
}
signal(new OnError(t));
}
@Override
public final void onComplete() {
signal(OnComplete.Instance);
}
/**
* 此 “ConcurrentLinkedQueue”将跟踪发送到此 “Subscriber” 的信号,例如“OnComplete”和“OnNext”,并遵守规则2.11
*/
private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue<Signal>();
/**
* 根据规范2.7和2.11,使用原子变量确保不会有多个订阅者线程并发执行。
*/
private final AtomicBoolean on = new AtomicBoolean(false);
@SuppressWarnings("unchecked")
@Override
public final void run() {
// 跟上次线程执行建立happens-before关系,防止多个线程并发执行
if (on.get()) {
try {
// 从入站队列取出信号
final Signal s = inboundSignals.poll();
// 根据规范条款2.8,如果当前订阅者已完成,就不需要处理了。
if (!done) {
// 根据信号类型调用对应的方法来处理
if (s instanceof OnNext<?>) {
handleOnNext(((OnNext<T>) s).next);
} else if (s instanceof OnSubscribe) {
handleOnSubscribe(((OnSubscribe) s).subscription);
}
else if (s instanceof OnError) {
// 根据规范2.10,必须处理onError信号,不管有没有调用过Subscription.request(long n)方法
handleOnError(((OnError) s).error);
}
else if (s == OnComplete.Instance) {
// 根据规范2.9,必须处理onComplete信号,不管有没有调用过Subscription.request(long n)方法
handleOnComplete();
}
}
} finally {
// 保持happens-before关系,然后开始下一个线程调度执行
on.set(false);
// 如果入站信号不是空的,调度线程处理入站信号
if (!inboundSignals.isEmpty()) {
// 调度处理入站信号
tryScheduleToExecute();
}
}
}
}
/**
* 该方法异步地向订阅票据发送信号
**/
private void signal(final Signal signal) {
// 信号入站,线程池调度处理
// 不需要检查是否为null,因为已经实例化了。
if (inboundSignals.offer(signal)) {
// 线程调度处理
tryScheduleToExecute();
}
}
/**
* 确保订阅者一次仅在一个线程执行
* 调度执行
*/
private final void tryScheduleToExecute() {
// 使用CAS原子性地修改变量on的值改为true。
if (on.compareAndSet(false, true)) {
try {
// 提交任务,多线程执行
executor.execute(this);
} catch (Throwable t) {
// 根据规范条款2.13,如果不能执行线程池的提交方法,需要优雅退出
if (!done) {
try {
// 由于错误不可恢复,因此取消订阅票据
done();
} finally {
// 不再需要处理入站信号,清空之
inboundSignals.clear();
// 由于订阅票据已经取消,但是此处依然让订阅者处于可调度的状态,以防在清空入站信号之后又有信号发送过来
// 因为信号的发送是异步的
on.set(false);
}
}
}
}
}
}
测试类:
package blnp.net.cn.jvm.demos.webflux;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/9/24 14:52
*/
public class ReactiveTest {
public static void main(String[] args) {
Set<Integer> elements = new HashSet<>();
for (int i = 0; i < 10; i++) {
elements.add(i);
}
final ExecutorService executorService = Executors.newFixedThreadPool(5);
AsyncIterablePublisher<Integer> publisher = new AsyncIterablePublisher<>(elements, executorService);
final AsyncSubscriber<Integer> subscriber = new AsyncSubscriber<Integer>(Executors.newFixedThreadPool(2)) {
@Override
protected boolean whenNext(Integer element) {
System.out.println("接收到的流元素:" + element);
return true;
}
};
publisher.subscribe(subscriber);
}
}
1.3.3、技术兼容套件
1、TCK
响应式流看着比较简单,实际上包含许多隐藏的陷阱。 除 Java 接口之外,该规范还包含许多针对实现的文档化规则。 这些规则严格限制每个接口,同时,保留规范中提到的所有行为至关重要。
开发人员需要一个可以验证所有行为并确保响应式库标准化且相互兼容的通用工具。Konrad Malawski 已经为此实现了一个工具包,其名称为响应式流技术兼容套件(Reactive Streams Technology Compatibility Kit),简称为 TCK。
TCK 是一组 TestNG 测试用例,需要对其进行扩展,并为相应的 Publisher 或 Subscriber 准备验证。所有测试的命名都对应指定的规则。例如,可以在 org.reactivestreams.tck.PublisherVerification 中找到如下所示的一个测试用例示例:
@Override
@Test
public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable {
activePublisherTest(5, false, new PublisherTestRun < T > () {
@Override
public void run(Publisher < T > pub) throws InterruptedException {
// 对被测发布者的手动订阅。响应式流的 TCK 提供了自己的测试类,该测试类可以验证特定的行为。
ManualSubscriber < T > sub = env.newManualSubscriber(pub);
try {
// 根据规则1.01对给定Publisher的行为进行了特定验证。在这种情况下,我们证实Publisher 发出的元素不能比 Subscriber请求的更多。
sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub));
sub.request(1);
sub.nextElement(String.format("Publisher %s produced no element after first `request` ", pub));
sub.expectNone(String.format("Publisher %s produced unrequested:", pub));
sub.request(1);
sub.request(2);
sub.nextElements(3, env.defaultTimeoutMillis(),
String.format("Publisher %s produced less than 3 elements after two respective `request` calls ", pub));
sub.expectNone(String.format("Publisher %sproduced unrequested ", pub));
} finally {
// Subscription 的取消阶段。一旦测试通过或失败,我们将关闭打开的资源并完成交互, 并使用 ManualSubscriber API 取消对 Publisher 的订阅。
sub.cancel();
}
}
});
}
2、发布者验证
Maven依赖:
<dependencies>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>1.0.3</version>
<!-- <scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.9.10</version>
<!-- <scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
</dependencies>
验证程序:
package blnp.net.cn.jvm.demos.webflux;
import org.reactivestreams.Publisher;
import org.reactivestreams.tck.PublisherVerification;
import org.reactivestreams.tck.TestEnvironment;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* <p></p>
*
* @author lyb 2045165565@qq.com
* @version 1.0
* @since 2024/9/30 11:51
*/
public class TckTest extends PublisherVerification<String> {
public TckTest() {
super(new TestEnvironment());
}
@Override
public Publisher<String> createPublisher(long elements) {
Set<String> set = new HashSet<>();
for (long i = 0; i < elements; i++) {
set.add("hello-" + i);
}
ExecutorService executorService = Executors.newFixedThreadPool(5);
AsyncIterablePublisher publisher = new AsyncIterablePublisher(set,executorService);
return publisher;
}
@Override
public Publisher<String> createFailedPublisher() {
Set set = new HashSet<>();
set.add(new RuntimeException("手动异常"));
ExecutorService executorService = Executors.newFixedThreadPool(5);
return new AsyncIterablePublisher(set, executorService);
}
@Override
public long maxElementsFromPublisher() {
return 10;
}
}
只遵循上述测试配置,无法检查该 Publisher 的准确性,因为许多测试用例假设流中存在多个元素。响应式流 TCK 考虑到了这种极端情况,并支持设置一个名为 maxElementsFromPublisher()的附加方法,该方法返回一个值,用于指示生成元素的最大数量:
@Override
public long maxElementsFromPublisher() {
// 默认值为Long.MAX_VALUE - 1
// return super.maxElementsFromPublisher();
return 10;
}
一方面,重写该方法可以跳过需要多个元素的测试;另一方面,响应式流规则的覆盖范围将减小,可能需要实现自定义测试用例。
3、订阅者验证
上述配置是启动测试生产者行为所需的最小配置。但是,除了 Publisher 的实例,还需要测试 Subscriber 实例。响应式流规范中针对 Subscriber 的那组规则没有针对Publisher 的规则那么复杂,但仍然需要满足所有需求。有两种不同的测试套件可以测试AsyncSubscriber。
第一个名为 org.reactivestreams.tck.SubscriberBlackboxVerification,它可以用于在不知道内部的细节和修改的情况下验证 Subscriber。
当 Subscriber 来自外部代码库并且我们没有合法的途径来扩展行为时,黑盒(Blackbox)验证是一个有用的测试工具包。另外,黑盒验证仅涵盖一部分规则,并不能确保实现完全正确。要了解如何检查 AsyncSubscriber,首先就要实现黑盒验证测试:
import org.reactivestreams.Subscriber;
import org.reactivestreams.tck.SubscriberBlackboxVerification;
import org.reactivestreams.tck.TestEnvironment;
import java.util.concurrent.Executors;
public class TCKBlackBoxTest extends SubscriberBlackboxVerification <Integer> {
protected TCKBlackBoxTest() {
super(new TestEnvironment());
}
@Override
public Subscriber <Integer> createSubscriber() {
AsyncSubscriber subscriber = new AsyncSubscriber(Executors.newFixedThreadPool(2)) {
@Override
protected boolean whenNext(Object element) {
System.out.println("接收到的元素:" + element);
// 返回true,接着请求下个元素,false表示不再请求了
return true;
}
};
return subscriber;
}
@Override
public Integer createElement(int element) {
return element;
}
// @Override
// public void triggerRequest(Subscriber<? super Integer> s) {
// // 在该方法直接向订阅者发射信号。默认该方法什么都不做。
// AsyncSubscriber<Integer> subscriber = (AsyncSubscriber<Integer>) s;
// subscriber.onNext(10000);
// }
}
上述示例展示了可用于 Subscriber 验证的 API。除了两个必需的方法 createSubscriber和createElement,还有一个额外的方法可以从外部处理 Subscription#request 方法。在该例中,能模拟真实的用户活动,是一个有用的补充。
第二个测试工具包被称为 org.reactivestreams.tck.SubscriberWhiteboxVerification。为了通过验证,Subscriber 需要提供与 WhiteboxSubscriberProbe 的额外交互:
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
import org.reactivestreams.tck.TestEnvironment;
import java.util.concurrent.Executors;
public class TCKWhiteBoxTest extends SubscriberWhiteboxVerification <Integer> {
protected TCKWhiteBoxTest() {
super(new TestEnvironment());
}
@Override
public Subscriber <Integer> createSubscriber(WhiteboxSubscriberProbe <Integer> probe) {
AsyncSubscriber subscriber = new AsyncSubscriber(Executors.newFixedThreadPool(2)) {
@Override
protected boolean whenNext(Object element) {
System.out.println("接收到的元素:" + element);
// 返回true,接着请求下个元素,false表示不再请求了
return true;
}
@Override
public void onSubscribe(Subscription subscription) {
super.onSubscribe(subscription);
probe.registerOnSubscribe(new SubscriberPuppet() {
@Override
public void triggerRequest(long elements) {
subscription.request(elements);
}
@Override
public void signalCancel() {
subscription.cancel();
}
});
}
@Override
public void onNext(Object element) {
super.onNext(element);
// 注册钩子
probe.registerOnNext((Integer) element);
}
@Override
public void onError(Throwable t) {
super.onError(t);
probe.registerOnError(t);
}
@Override
public void onComplete() {
super.onComplete();
probe.registerOnComplete();
}
};
return subscriber;
}
@Override
public Integer createElement(int element) {
return element;
}
}
createSubscriber 方法实现与黑盒验证的工作方式相同,并返回Subscriber 实例,但此处还有一个名为 WhiteboxSubscriberProbe 的附加参数。在这种情况下,WhiteboxSubscriberProbe 代表了一种机制,该机制可以实现对需求的嵌入式控制和输入信号的捕获。
与黑盒验证相比,通过正确注册探针钩子,测试套件不仅能够发送需求,还能验证需求是否被满足以及所有元素是否被接受。同时,需求监管机制比以前更加透明。我们实现了SubscriberPuppet,它会为直接访问收到的 Subscription 进行适配。
正如上例所述,与黑盒验证相反,白盒(whitebox)需要扩展 Subscriber,以在内部提供额外的钩子。虽然白盒测试覆盖了更多的规则,且这些规则确保了所测试 Subscriber 的行为正确,但是当我们想要避免一个类被扩展时,这可能是不可接受的。
1.4、响应式流的异步和并行
一方面,响应式流 API 中的规则 2.2 和 3.4 规定,对由 Publisher 生成并由 Subscriber消费的所有信号的处理过程应该是非阻塞和非干扰的。因此,基于具体的执行环境,可以高效地利用处理器的一个节点或一个内核。
另一方面,所有处理器或内核的高效利用需要并行化。对响应式流规范中的并行化概念的通常理解可以解释为对 Subscriber#onNext 方法的并行调用。
遗憾的是,规范中的规则 1.3 规定必须以线程安全的方式触发 onXxx 方法的调用,并且如果由多个线程执行,则使用外部同步。这一点假定对所有 onXxx 方法的串行化或简单顺序调用。反过来,这意味着无法创建类似ParallelPublisher 的组件并在流中对元素执行并行处理。因此,问题是如何高效地利用资源。
通常的管道处理(涉及数据源和最终目的地)包括一些处理或转换阶段。同时,每个处理阶段可能花费大量处理时间并延迟其他执行。一种解决方案是在阶段之间传递异步消息。对基于内存的流处理而言,这意味着执行过程的一部分被绑定到一个线程而另一部分被绑定到另一个线程。例如,最终元素消费可能是 CPU 密集型任务,而它将在单独的线程上进行合理处理,见下图:
通常的做法是:在两个独立的线程之间拆分处理过程,在阶段之间放置异步边界。又因为两个线程可以彼此独立地工作,所以通过这样做,将元素的整体处理过程并行化。为了实现并行化,必须应用一种数据结构(例如 Queue)来正确地解耦处理过程。这样,线程 A 内的处理过程独立地提供数据项给 Queue,而在线程 B 内的 Subscriber 则独立地消费来自相同Queue 的数据项。
拆分线程之间的处理过程会导致数据结构中的额外开销。当然,由于响应式流规范,这样的数据结构是有界的。数据结构中的数据项数量通常等于 Subscriber 从其 Publisher 请求的批处理的大小,而这取决于系统的一般容量。
除此之外,流处理部分应该连接到哪个异步边界?
- 将处理流附加到数据源资源,并且使所有操作都在与数据源相同的边界内执行。数据源这一侧的边界内,数据通过管道流式处理。
- 处理过程连接到目的地或消费者线程,可以在元素生产过程为 CPU 密集型任务的场景下使用。
- 发生在生产和消费是 CPU 密集型任务时。因此,在单独的线程对象上运行它,见下图
每个处理阶段可以绑定到一个单独的线程。需要注意的是,在不同的线程之间对处理过程进行拆分不仅不是自由的,还应该在合理的资源消费之间进行平衡,以实现边界(线程和附加数据结构)和高效的元素处理。
1.5、响应式环境的转变
1.5.1、RxJava 的转变
RxJava 提供了一个额外的模块,可以轻松地将一种响应式类型转换为另一种。比如如何将 Observable<T> 转换为 Publisher<T> 并将 rx.Subscriber<T> 转换为 org.reactivestreams.Subscriber<T> ?
假设有一个应用程序把 RxJava 1.x 和 Observable 作为组件之间的核心通信类型,如下例所示:
interface LogService {
Observable <String> stream();
}
遵循响应式流规范并从以下特定依赖项中抽象出我们的接口:
interface LogService {
Publisher<String> stream();
}
用 Publisher 替换了 Observable。但是,实现所需的重构可能比仅更换返回类型花费的时间要多。不过也可以很容易地将现有的 Observable 调整为 Publisher,如下例所示:
// RxLogService 类声明。该类表示旧的基于 Rx 的实现
class RxLogService implements LogService {
// 使用 RxNettyHttpClient,它能使用封装在基于 RxJava API 中的 Netty Client 以异步、非阻塞方式与外部服务进行交互。
final HttpClient<...> rxClient = HttpClient.newClient(...);
@Override
public Publisher<String> stream() {
// 通过使用创建的 HttpClient 实例,从外部服务请求日志流,并将传入的元素转换为String 实例。
Observable<String> rxStream = rxClient.createGet("/logs")
.flatMap(...)
.map(Utils::toString);
// 使用 RxReactiveStreams 库对 Publisher 进行的 rxStream 适配
return RxReactiveStreams.toPublisher(rxStream);
}
}
可以注意到,RxJava 的开发人员关心我们并提供了一个额外的 RxReactiveStreams 类,使我们可以将 Observable 转换为响应式流中的 Publisher。此外,随着响应式流规范的出现,RxJava 开发人员还提供了非标准化的背压支持,以使转换后的 Observable 兼容响应式流规范。
除将 Observable 转换为 Publisher 之外,我们还可以将 rx.Subscriber 转换为org.reactivestreams.Subscriber。例如,日志流先前是存储在文件中。为此,我们实现了自定义Subscriber 以负责 I/O 交互。如此,迁移到响应式流规范的代码变换如下所示:
class RxFileService implements FileService {
@Override
public void writeTo(String file, Publisher<String> content) {
AsyncFileSubscriber rxSubscriber = new AsyncFileSubscriber(file);
content.subscribe(RxReactiveStreams.toSubscriber(rxSubscriber));
}
}
- 这是 RxFileService 类声明。
- 这是 writeTo 方法实现,它接受 Publisher 作为组件之间交互的核心类型。
- 这是基于 RxJava 的 AsyncFileSubscriber 实例声明。
- 这是 content 订阅。要重用基于 RxJava 的 Subscriber,我们需要使用相同的 RxReactiveStreams 实用工具类对其进行适配。
从前面的示例中可以看出,RxReactiveStreams 提供了一个丰富的转换器列表,使我们可以将RxJava API 转换为响应式流 API。同样,任何 Publisher<T>都可以转换回 RxJava Observable:
Publisher<String> publisher = ...;
RxReactiveStreams.toObservable(publisher).subscribe();
总体上讲,RxJava 会以某种方式开始遵循响应式流规范。遗憾的是,由于向后兼容性,实现该规范是不可能的,并且将来为 RxJava 1.x 实现响应式流规范扩展的计划也不存在。此外,从2018 年 3 月 31 日开始,对 RxJava 1.x 的支持停止。
Dávid Karnok 是RxJava的第 2 版之父,他显著改进了整个库的设计,并引入了符合响应式流规范的其他类型。虽然由于向后兼容性,Observable继续维持不变,但同时,RxJava 2 提供了名为 Flowable 的新响应式类型。
Flowable 响应式类型虽然提供与 Observable 相同的 API,但会从头开始扩展org.reactive.streams.Publisher。如下一个示例所示,Flowable 中嵌入了流式 API,可以转换为任何常见的 RxJava 类型并反向转化为响应式流兼容类型:
Flowable.just(1, 2, 3)
.map(String::valueOf)
.toObservable()
.toFlowable(BackpressureStrategy.ERROR)
.subscribe();
从 Flowable 到 Observable 的转换只是一个操作符的简单应用。但是,要将 Observable 转换回Flowable,就必须提供一些可用的背压策略。在 RxJava 2 中,Observable 被设计为仅推送流。因此,保证转换后的 Observable 符合响应式流规范至关重要。
1.5.2、Vert.x 的转变
随着 RxJava 的转变,其他响应式库和框架提供商也开始采用响应式流规范。为了遵循规范,Vert.x 包含一个额外的模块,该模块为响应式流 API 提供支持。以下示例演示了该模块:
// ...
.requestHandler(request -> {
ReactiveReadStream<Buffer> rrs = ReactiveReadStream.readStream();
HttpServerResponse response = request.response();
Flowable<Buffer> logs = Flowable.fromPublisher(logsService.stream())
.map(Buffer::buffer)
.doOnTerminate(response::end);
logs.subscribe(rrs);
response.setStatusCode(200);
response.setChunked(true);
response.putHeader("Content-Type", "text/plain");
response.putHeader("Connection", "keep-alive");
Pump.pump(rrs, response).start();
}
);
// ...
关键点解释说明:
- 这是请求处理程序声明。这是一个通用请求处理程序,能处理发送到服务器的任何请求。
- 这是 Subscriber 和 HTTP 响应声明。这里的 ReactiveReadStream 同时实现了 org.reactivestreams.Subscriber 和 ReadStream,而二者能将任何 Publisher 转换为与 Vert.xAPI 兼容的数据源。
- 这是处理流程声明。在该示例中,我们引用新的基于响应式流的 LogsService 接口,并编写对流中元素的函数式转换,我们使用 RxJava 2.x 中的 Flowable API。
- 这是订阅阶段。一旦声明了处理流程,我们就可以将 ReactiveReadStream 订阅到Flowable。
- 这是一个响应准备阶段。
- 这是发送给客户端的最终响应。这里,Pump 类在一个复杂的背压控制机制中起着重要作用,以防止底层的 WriteStream 缓冲区过满。
我们可以看到,Vert.x 没有提供用于编写元素处理流的流式 API。但是,它提供了一个 API,能将任何 Publisher 转换为 Vert.x API,从而维持响应式流的复杂背压管理。
1.5.3、Ratpack 的改进
除了 Vert.x,另一个名为 Ratpack 的著名 Web 框架也提供对响应式流的支持。与 Vert.x 相比,Ratpack 提供了对响应式流的直接支持。例如,在使用 Ratpack 的场景下,发送日志流的代码如下所示:
RatpackServer.start(server -> server.handlers(chain -> chain.all(ctx -> {
Publisher < String > logs = logsService.stream();
ServerSentEvents events = serverSentEvents(
logs, event ->
event.id(Objects::toString).event("log").data(Function.identity());
);
ctx.render(events);
}
)));
关键点说明:
- 这是服务器启动的操作和请求处理程序声明。
- 这是日志流声明。
- 这是 ServerSentEvents 的准备工作。这里,上述类在映射阶段起作用,该阶段将 Publisher中的元素转换为服务器端发送事件的表示方式。如我们所见,ServerSentEvents 强制要求映射函数声明,而该声明描述了如何将元素映射到特定的 Event 字段。
- 这是流到 I/O 的呈现。
如示例所示,Ratpack 在内核中提供对响应式流的支持。现在,我们可以重用相同的LogService#stream 方法,而无须提供额外的类型转换或要求其他模块添加对特定响应式库的支持。
此外,与只提供对响应式流规范的简单支持的 Vert.x 相比,Ratpack 还提供了对规范接口的自身实现。此功能包含在 ratpack.stream.Streams 类中,类似于 RxJava API:
Publisher<String> logs = logsService.stream();
TransformablePublisher publisher = Streams.transformable(logs)
.filter(this::filterUserSensitiveLogs)
.map(this::escape);
在这里,Ratpack 提供了一个静态工厂,它可以将任何 Publisher 转换为TransformablePublisher,使我们可以使用熟悉的操作符和转换阶段灵活地处理事件流。