响应式编程
- 为什么要有响应式编程?
- 响应式编程的用法
- Flow api的用法
- 处理器
为什么要有响应式编程?
传统编码,操作流程常见的是命令式编程范式,如对于一个请求或操作来说,都是串行执行,直到异常或执行结束;
这种方式的优点是简洁明了,逻辑清晰,但缺点也很明显,性能问题严重;
如果一个接口里的逻辑是先查DB,再请求第三方接口,这两步操作都是阻塞操作,需要耗费大量IO时间。于是可以使用CompletableFuture编排异步线程来优化这两步操作,这样主线程就可以释放CPU时间片了(实现了主线程的异步),这种方式叫做部分响应式编程。而且在查DB和访问第三方接口,各自维护一个异步线程,都有IO阻塞,为了解决这个问题,就要使用非阻塞的方式,而NIO就叩开了非阻塞的大门,其连接、读、写都是非阻塞的,只需一个线程就可以处理以上两步骤,节省JVM维护多个线程的成本。所以编排异步线程+非阻塞的方式就是完全响应式编程;
命令式编程和响应式编程都是一种编程范式;
编程范式是指编程语言的设计和使用方式;
响应式编程的用法
接口操作一般就是对方法的调用,
而命令式编程的步骤,一般就是定义入参,处理业务,再异常或返回;
而响应式编程,是基于Stream流,可看做水流,简单来说,是定义流源头,然后往下流,使用链式处理多个步骤,最后流向最后,就是异常或响应信号。
响应式编程的落地实现有Flow api、Reactor api,Spring WebFlux;
Flow api的用法
从Java 9开始就提供了响应式编程的API,叫做Flow。它实际上是拷贝了Reactive Stream的四个接口定义,然后放在juc包的Flow类中。具体实现是通过SubmissionPublisher和ConsumerSubscriber两个默认实现;
package org.example.flux;
import java.io.IOException;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
/**
* @date: 2025/1/16 18:29
* @author: lvan
* @description: 使用Flow的api
*/
public class FlowApiDemo {
public static void main(String[] args) throws IOException, InterruptedException {
//发布者,就是流的源头
SubmissionPublisher<String> publisher =new SubmissionPublisher();
//定义订阅者
Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
Flow.Subscription subscription = null;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println(Thread.currentThread().getName()+"开始订阅时触发onSubscribe……");
this.subscription = subscription;
//向发布者发送一个信号,代表订阅者已经准备好
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println(Thread.currentThread().getName()+"接收从发布者的下一个元素时触发的onNext……");
System.out.println("订阅者接收"+item);
//背压模式,每次告诉发布者只发送一个
/*try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}*/
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("接收发布者的错误时触发onError……"+throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("流正常结束的信号onComplete……");
}
};
//订阅者订阅发布者
publisher.subscribe(subscriber);
//发布者发布
for (int i=0;i<10;i++) {
publisher.submit("test"+i);
System.out.println("发布者发送了一个后开始休眠一秒");
/* Thread.sleep(1000);
if (i==7){
throw new RuntimeException("报异常……");
}*/
}
publisher.close();
//因为发布订阅是异步的,所以不要让主线程停止,避免程序结束
System.in.read();
}
}
运行效果:
其实注释得很清晰了,定义发布者和订阅者后,注意:
①要订阅者订阅发布者后,且要request()发送一个信号给发布者,告诉它流程已准备好,开启异步线程接收到发布者发布的数据,否则发布者只是发送到一个缓冲区数组中;
②注意订阅者每次onNext接收发布者的消息时,可以通过背压模式控制接收能力,也就是调request(),如果不调该方法,就代表不再接收消息了;
背压模式的思想是大量请求过来,是发送到缓冲区,然后线程根据自己的消费能力往缓冲区中获取,这就是背压模式
③注意发布者和订阅者定义的泛型,都是String,也就是说要发送和接收的类型要一致才能成功接收;
处理器
还可在发布者和订阅者之间定义处理器,因为前面也说过,响应式编程就像是流,定义了数据源头后,中间可被多个步骤进行处理;可看做处理器作为发布者的订阅者,作为订阅者的发布者:
package org.example.flux;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
/**
* @date: 2025/1/16 18:38
* @author: lvan
* @description: TODO
*/
public class FlowApiProcessor extends SubmissionPublisher<String> implements Flow.Processor<String,String> {
Flow.Subscription subscription = null;
@Override
public void subscribe(Flow.Subscriber subscriber) {
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("处理器订阅触发onSubscribe……");
this.subscription =subscription;
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("作为中间操作,处理数据触发onNext……");
item="hello,Flow Api_"+item;
//作为发布者发送数据
submit(item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
}
还需在订阅关系那里改一下:
//处理器订阅发布者
FlowApiProcessor flowApiProcessor = new FlowApiProcessor();
publisher.subscribe(flowApiProcessor);
//订阅者订阅处理器
publisher.subscribe(subscriber);
如有需要收藏的看官,顺便也用发财的小手点点赞哈,如有错漏,也欢迎各位在评论区评论!