简介
RxJava是对响应式扩展( Reactive Extensions,称之为 ReactiveX )规范的Java 实现,该规范还有其他语言实现:RxJS、Rx.Net、RxScala、RxSwift等等(也即,ReactiveX 定义了规范,其他语言实现规范即可,所以我们这里学习RxJava的架构和设计思维,只需研究ReactiveX 即可)。RxJava是一个通过使用可观察序列来组合异步操作(也即观察者模式,观察序列表示一组观察者,后面会详细介绍),并且基于事件驱动的Java库。它基于观察者模式并扩展了支持数据/事件序列的功能,添加了很多在数据转换时使用的操作符(比如:map、flat等等,像不像Java 8的Stream流式编程)。同时,RxJava 抽象了底层线程模型实现、线程安全操作的实现,让使用方不需要关心底层实现,专注于对业务的处理。
原理图解
Rxjava的核心思路被总结在了图中,本文分为两部分,第一部分讲图中的三条流和事件传递,第二部分讲线程切换的原理,下面进入正题。
响应式编程
响应式编程是一种基于异步数据流概念的编程模式;数据/事件就像一条河流,从源头一直往下流,在流动过程中,可以被观测、被过滤、被操作,或者与另一条流合并成一条新的流,最终流向大海被消费掉;
与响应式编程相对应的有同步式编程、异步式编程:
- 同步式编程:比如我们在主线程上请求一个网络接口,一直等到返回结果才能继续执行下一步,这就是同步式的
- 异步式编程:开启一个子线程去请求网络接口,主线程继续执行,然后定时去查询接口返回的结果
- 响应式编程:开启一个子线程去请求网络接口,注册监听后主线程继续执行,网络接口返回数据后,主动回调注册的监听方法,从而达到响应的目的
RxJava可以简单理解为就是观察者模式+异步处理+链式调用(流的概念)
Rxjava需要达成的共识两种设计模式
观察者模式:实现响应式编程的基础
装饰器模式:各种操作符的具体实现类都通过装饰器模式类拓展完成
Rxjava核心框架核心部分
- ObservableSource : 被观察者的顶层接口,提供订阅subsccribe()方法
- Observable: 被观察者抽象类,实现ObservableSource的接口,并提供实际订阅的抽象方法。
- Observer : 观察者接口,提供处理事件的回调方法。
- ObservableOnSubscribe:被观察者与事件解耦的接口
- Emitter : 事件发射的接口,提供发射事件的方法。
- ObservableXXX: 具体的被观察者实现类,持有ObservableOnSubscribe接口的引用
- XXXEmitter : 事件发射器具体实现,持有观察者引用。
- XXXObserver : 具体观察者的实现类。
- AbstractObservableWithUpStream: 被观察者的抽象装饰类,持有了顶层接口的引用,都是通过继承该抽象类来实现各种操作符的被观察者 。
源码分析create()操作符
private static void testCreate() {
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Throwable {
for (int i = 0; i < 10; i++) {
System.out.println("emitter发射value数据:" + i);
emitter.onNext("value=" + i);
}
emitter.onComplete();
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Throwable {
System.out.println(o);
}
});
}
Observable & ObservableSource & ObservableOnSubscribe等
- ObservableSource : 被观察者的顶层接口,提供订阅subsccribe()方法
- Observable: 被观察者抽象类,实现ObservableSource的接口,并提供实际订阅的抽象方法。
- ObservableOnSubscribe :被观察者与事件发送解耦的接口
- Observer: 观察者接口,提供处理事件的回调方法。可以在此接口的onSubscribe()函数来控制被观察者的事件发送后,观察者能否被消费
- ObservableXXX: 具体的被观察者实现类,持有
- ObservableOnSubscribe接口的引用
Observable.java
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Throwable {
for (int i = 0; i < 10; i++) {
System.out.println("步骤一 :emitter发射value数据:" + i);
emitter.onNext("value=" + i);
}
emitter.onComplete();
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Throwable {
System.out.println("步骤二消费事件:" + o);
}
});
// ObservableOnSubscribe:被观察者与事件解耦的接口
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
ObservableCreate.java
// 1: 构造函数保存 ObservableOnSubscribe
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<>(observer);
// 2: 先回调Observer的onSubscribe()函数
observer.onSubscribe(parent);
try {
// 3: ObservableOnSubscribe再 发射事件
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
- Emitter : 事件发射的接口,提供发射事件的方法。
- ObservableXXX: 具体的被观察者实现类,持有ObservableOnSubscribe接口的引用
- XXXEmitter : 事件发射器具体实现,持有观察者引用。
- XXXObserver : 具体观察者的实现类。
ObservableCreate.java
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
// 1: CreateEmitter 持有下游 Observer的引用
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(ExceptionHelper.createNullPointerException("onNext called with a
null value."));
return;
}
// 2: 根据下游的 Observer 的onSubscribe()函数判断是否取消发射,决定Observer是
否调用 observer.onNext函数
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Throwable {
for (int i = 0; i < 10; i++) {
System.out.println("步骤一 :emitter发射value数据:" + i);
// 0 : 上游被观察者传递过来的 emitter
emitter.onNext("value=" + i);
}
emitter.onComplete();
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Throwable {
System.out.println("步骤二消费事件:" + o);
}
});
map()操作符源码分析
/**
* 直接对发射出来的事件进行处理并且产生新的事件,然后再次发射
*/
private static void testMap() {
Observable.just("aaa")
.map(new Function<String, Object>() {
@Override
public Object apply(String s) throws Throwable {
System.out.println("步骤二: "+"事件转换之后再次发射");
return s+" + bbb";
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("步骤一:"+ "use Subscribe connect Observable and Observer");
}
@Override
public void onNext(Object o) {
System.out.println("步骤三: "+"Next event:" + o + " response");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
Observable.just() 生产:ObservableJust类,并在发射事件时调用subscribeAcutal函数
ObservableJust.java
public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
// Observer 是下游的最后一个 observer
// subscribeActual 是 抽象类 Observable的具体实现类ObservableJust,它会在
// Observable的 subscribe()中被回调
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
observer.onSubscribe(sd);
sd.run();
}
@Override
public T get() {
return value;
}
}
Observable.java
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
// 利用java多态的特性,直接调用 ObservableJust.java中的 subscribeActual
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
被观察者通过 ObservableScalarXMap.ScalarDisposable.run()发射事件
ObservableScalarXMap.ScalarDisposable.java
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
// 持有下游的 Observer的引用,直接消费事件
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
在Android开发中rxjava部分是非常重要的;想要更深入学习或者更多Android核心技术,可以参考《Android核心技术手册》点击查看里面上千个技术知识。
文末
1)RxJava
有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
2)与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
3)onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
4)onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
5)在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。