从设计模式角度来看 RxJava 核心源码
从订阅者模式和装饰器模式的角度来看 RxJava 源码。
1. 普通订阅者模式与 RxJava 中的订阅者模式
订阅者模式又叫做观察者模式,主要用于实现响应式编程。其本质,就是接口回调。
普通订阅者模式:多个对象间存在一对多的依赖关系,当一个对象的状态发生改变的时候,所有依赖于它的对象都会得到通知并被自动更新。简单来说,就是发布者持有多个订阅者的引用,当其状态发生改变时调用这些订阅者执行更新的方法。(调用订阅者执行更新的方法,也被叫做进行接口回调)
具体订阅者,也叫做具体观察者,通过订阅,添加到发布者的 Observers 集合中,当发布者有消息要发布时,通过回调订阅到自己的所有订阅者的 response() 方法,即可完成消息发布。
在 RxJava 中,增加了一个事件发生器,RxJava 将事件与发布者解耦,事件发射器负责事件发布,发布者负责接收订阅。
可以说,只要有用到回调,就是用了观察者模式。RxJava 中,订阅者、发布者、事件发射器是 1:1:1 的关系。发布者的实现类,负责订阅以及订阅过程中的功能拓展。观察者由于订阅,将自己的引用交给了发布者,发布者又将订阅者的引用交给了事件发射器。真正产生消息、发射消息给订阅者的任务,由事件发射器完成。
源码:
Observable的创建操作符创建了事件发射器:
//Observable.java
//通过将方法设置为静态的设计,来让用户必须从创建操作符开始使用RxJava
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
传入的 ObservableOnSubscribe 是事件形成的内容,由用户自定义:
Observable.create(new ObservableOnSubscribe<String>(){
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception{
//发射某消息
emitter.onNext("room1");
}
})...
Observable构建出的发布者源头,将会在观察者订阅到它的时候,将观察者的引用,交给事件发射器。事件发射器的事件形成逻辑为上述代码。
//ObservableCreate.java的订阅方法
public final class ObservableCreate<T> extends Observable<T>{
@Override
protected void subscribeActual(Observer<? super T> observer) {
//创建事件发射器,并持有观察者的引用
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//订阅成功,回调订阅者的onSubscribe接口
observer.onSubscribe(parent);
//订阅逻辑,也叫事件形成逻辑(由用户自定义实现)
source.subscribe(parent);
}
}
事件发射器,发射事件的方式,其实就是回调观察者的 onNext(T t) 方法,将事件消息回调发送给观察者:
//ObservableCreate.CreateEmitter.java
public final class ObservableCreate<T> extends Observable<T>{
//...
static final class CreateEmitter<T> ...{
final Observer<? super T> observer;
//构建时,传入观察者的引用
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
//通过持有的观察者引用,回调其onNext方法,将消息回调回去
observer.onNext(t);
}
//...
}
}
2. RxJava 中独特设计的装饰器模式
RxJava 的作用不仅仅是生成事件直接就发给订阅者,中途可以对消息进行多次处理。例如将多个需要逐步完成的任务,通过 RxJava 操作符连接起来,只将最后的结果交给观察者。甚至还可以切换线程,例如将生成事件、处理事件切换到子线程进行,将观察者的回调切换回主线程,以便更新 UI 。
RxJava是如何实现这种效果的呢?答案是使用了装饰器模式。不过这个装饰器模式不仅运用在了发布者上,还运用在了监听者上。
- 对于发布者,根据调用的操作符,对原始发布者进行功能包装:
- 拓展
subscribeActual()
的功能 - 增加功能需要使用到的属性,例如:线程切换需要的 Scheduler 实例、类型转换需要的 Function 实例
- 被包装的发布者被称为上游、源:source
- 拓展
- 对于观察者,在订阅回流中,按订阅途径的发布者的要求,包装自己:
- 拓展
onNext()
、onComplete()
等回调方法的功能 - 增加功能需要使用到的属性,例如:线程切换需要用到的 Worker 实例、类型转换需要的 Function 实例
- 被包装的监听者被称为下游:downStream
- 拓展
我们来模拟一个调用链场景:
create()
->map()
->flatMap()
->subscribeOn()
->observeOn()
->subscribe()
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
//发射项目组房间号
emitter.onNext("room1");
}
}).map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
//通过房间号先加载本地缓存
//... get from network
String roomInfo = "get from local disk";
return roomInfo;
}
}).flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
//本地缓存加载好后,从网络获取当前在房人员
List<String> users = new ArrayList<>();//get from network
//逐个交给后续处理
return Observable.fromIterable(users);
}
})
//以上是事件处理流程
//接下来进行线程切换
.subscribeOn(Schedulers.io())//将上游设置为在线程池中执行
.observeOn(AndroidSchedulers.mainThread())//将下游设置在主线程运行
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(" onSubscribe ");
}
//最终观察者的回调
@Override
public void onNext(String s) {
System.out.println(" get one user "+s);
}
@Override
public void onError(Throwable e) {
System.out.println("get error "+e);
}
@Override
public void onComplete() {
System.out.println("get all users completed");
}
});
我们可以根据这个场景画出调用链的图,更好理解装饰器在 RxJava 中是如何做到增强功能的:
Observable 通过创建操作符,构建出了首个真正将事件发射器、订阅者、发布者关联起来的源头。该源头也被作为装饰器的具体部件,它可以被继承自 AbstractObservableWithUpStream 这个基础装饰类的具体装饰类如 ObservableSubscribeOn 类装饰,从而实现功能拓展。它的装饰顺序为从上游到下游。(最左边最靠近原始发布者的称为上游,最靠近原始订阅者的为下游)
RxJava装饰器模式的命名特点: Observable+XXX 、 XXX+Observer
map操作符装饰的发布者:ObservableMap 、订阅者 MapObserver
subscribeOn操作符装饰的发布者:ObservableSubscribeOn、订阅者 SubscribeOnObserver
Observer 由用户实现,并在从下游到上游的订阅过程中,由各层发布者 Observable 的规则进行装饰,对 onNext() 方法进行功能拓展。
下面这个图将订阅流、装饰流标注出来了:
我们从源码上看一下装饰器模式到底是如何进行功能拓展的。先看对发布者的装饰,是如何进行发布者的功能拓展的.
看到 subscribeOn() 操作符对应的发布者类:
- 通过装饰,增加了一个成员变量 Scheduler实例
- 通过装饰,增加线程切换功能,可以实现 把observer订阅到source的方法 切换到 Scheduler 对应的线程中。
- 通过装饰,增加了对订阅者 observer 的装饰。(这个增强在几乎所有发布者装饰类的 subscribeActual() 方法中都有实现)
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
//通过装饰,增加了一个成员变量 Scheduler实例
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
//装饰器增加的变量一般都由构造器传入。
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
//通过装饰,增加了对订阅者 observer 的装饰
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
//回调observer的完成订阅回调
observer.onSubscribe(parent);
//通过装饰,将 source.subscribe(observer) 整条订阅代码放到了 Scheduler 的线程中执行。实现了线程切换的功能拓展。
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
//...
}
装饰器模式再对发布者的功能拓展中,也都增加了对订阅者的装饰。发布者的功能增强主要体现在 subscribeActual() 方法中。订阅者的功能增强,则主要体现在 onNext() 方法中。比如我们看到 map 操作符中对订阅者的装饰类 MapObserver:
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
//在订阅者向发布者订阅的途中,发布者将实现功能拓展需要的Function实例由发布者传入该装饰类
this.mapper = mapper;
}
@Override
public void onNext(T t) {
//通过传入的function,对参数t进行类型处理,得到了类型为U的参数v,并回调给下游订阅者
U v = mapper.apply(t)
downstream.onNext(v);
}
}
3. 装饰器模式在 RxJava 线程切换中的优越表现
从 2 中,我们分析得知,发布者、订阅者的功能拓展都是在装饰的途中完成的,其中 subscribeOn() 操作符实现了对上游的线程切换,observeOn() 操作符实现了对下游的线程切换。
我们知道,当有多个 subscribeOn() 操作符时,只有最上游的 subscribeOn() 才决定发布者形成事件、处理事件所在的线程。
我们还知道,当有多个 observeOn() 操作符时,下游回调的方法所在线程由上游距离最近的 observeOn() 操作符所决定,换句话说,可能有多个 observeOn() 操作符生效。 例如操作符调用流程:
op -> observeOn(MAIN) -> op1 -> observeOn(IO) -> op2 -> observeOn(MAIN) -> op3 -> op4-> subscribe(new Observer)
op1受距离其最近的上游 observeOn() 操作符影响,所以 op1的订阅者 的 onNext() 执行所在线程为主线程。同理 op2的订阅者 执行在子线程,op3、op4、最后的observer 都执行在主线程。
3.1 subscribeOn() 装饰类 ObservableSubscribeOn.java 切换上游线程原理:
ObservableSubscribeOn 类在构建时,传入了 scheduler实例,scheduler实例中含有 worker:
- HandlerScheduler类实现的Worker将传入的 Runnable 通过 handler.post(runnable) 方法提交到主线程,实现线程切换到主线程的操作
- IoScheduler类实现的Worker则是将传入的 Runnable 交给线程池执行,实现线程切换到子线程的操作。
ObservableSubscribeOn 做了两件事:
- 将 source.subscribe(observer) 订阅语句包装在了 Runnable 中;
- 将包装好的 Runnable 交给 scheduler 放到目标线程执行。
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
public void subscribeActual(final Observer<? super T> observer) {
//包装observer、observer的订阅完成回调
//...
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
}
订阅语句被包装在了 SubscribeTask 类中,这是个内部类,它实现了 Runnable 接口:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
//传入的是被装饰的observer
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//订阅语句被包装到了 run() 方法中,等待被用户指定的线程调用
source.subscribe(parent);
}
}
由 scheduler 来决定如何执行该 Runnable 实例:
public abstract class Scheduler {
//...
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//根据scheduler子类的具体实现,创建Worker
//HandlerScheduler子类实现的是HandlerWorker
//IoScheduler子类实现的是 ThreadWorker、EventLoopWorker
final Worker w = createWorker();
//由用户指定的Worker来执行Runnable实例
w.schedule(task, delay, unit);
return task;
}
//...
}
3.2 IoScheduler 将 Runnable 放入子线程运行
我们来看一下 IoScheduler 如何将 Runnable 放入子线程运行的:
public final class IoScheduler extends Scheduler {
//...
@Override
public Worker createWorker() {
//pool.get()是从CachedWorkerPool中获取一个子线程
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker{
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
//实际是调用了构造方法传入的threadWorker.scheduleActual来执行runnable
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
}
ThreadWorker 继承自 NewThreadWorker,最后会调用到 NewThreadWorker 的 scheduleActual() 方法来执行传入的 runnable:
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
//...
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
//将传入的runnable进行第一次包装
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//将传入的runnable进行第二次包装
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
//根据是否有延时,决定调用线程池的submit方法还是schedule方法
f = executor.submit((Callable<Object>)sr);
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
return sr;
}
//...
}
3.3 HandlerScheduler 将 Runnable 放入主线程运行
HandlerScheduler 执行 Runnable 的逻辑比较简单,HandlerScheduler 类在构建的时候需要传入执行 Runnable 实例的 handler。如果传入的是处在主线程的Handler,那么 runnable 将会在主线程被执行。
private static final class HandlerWorker extends Worker {
private final Handler handler;
private final boolean async;
private volatile boolean disposed;
HandlerWorker(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
@SuppressLint({"NewApi"})
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
HandlerScheduler.ScheduledRunnable scheduled = new HandlerScheduler.ScheduledRunnable(this.handler, run);
//将 Runnable 包装到 Message 中发送
Message message = Message.obtain(this.handler, scheduled);
message.obj = this;
if (this.async) {
message.setAsynchronous(true);
}
//真正发送消息,即将 runnable 放到 mainLooper中 等待主线程执行。
this.handler.sendMessageDelayed(message, unit.toMillis(delay));
if (this.disposed) {
//内存泄漏优化
}
}
}
}
3.4 observeOn() 装饰类 ObservableObserveOn.java 切换下游线程原理
ObservableObserveOn 将订阅者 observer 包装成了一个 Runnble,切换线程的功能拓展放在了订阅者的 onNext() 方法中:
- observer 被包装成为了 Runnable
- onNext() :
- 将消息放到 queue中;
- 让 worker执行runnable;
- run()-> drainNormal():
- 从 queue 中获取消息;
- 在 worker 指定的线程环境下回调 downStream.onNext(t)
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
//在订阅的途中,将observer包装成了 ObserveOnObserver类
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
//observer在本层的装饰类
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
final SimpleQueue<T> queue;
//onNext()将消息放入了队列,并通过schedule让worker执行run()方法
@Override
public void onNext(T t) {
//将消息放入队列
queue.offer(t);
//通过schedule()让worker执行run方法
schedule();
}
//通过schedule()让worker执行run方法。
//Worker如何执行run在 3.2和3.3中已经探讨过
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
//run方法,执行的线程 由worker决定。
@Override
public void run() {
//下游的onNext()在run()所在线程执行,从而实现对直接下游的线程切换
drainNormal();
}
//drainNormal()方法,从queue中获取消息,回调给下一层
void drainNormal(){
while(true){
//从队列中获取消息
T t = queue.poll();
//...检查问题
//自旋空转等待数据获取到
//获取到数据后,回调给下一层
downStream.onNext(t);
}
}
}
}
总而言之,subscribeOn() 操作符对上游线程切换的功能拓展,实现在订阅过程中发布者的 subscribeActual() 方法中。
而 observeOn() 操作符对下游线程切换的功能拓展,是现在消息回调过程中订阅者的 onNext() 方法中。
不论如何线程切换,方式都是将 订阅 source.subscribe(observer)
或者 消息回调downStream.onNext(t)
的核心代码包装在 Runnable 的 run() 中,并交给 Scheduler 的Worker调度执行。
4. RxJava 通过 Disposal 来解决内存泄漏问题
由于 RxJava 的调用方式,最后会有一个匿名内部类:
Observable.。。。.subscribe(new Consumer(){...})
匿名内部类是持有外部类的引用的,如果 Activity 关闭后,仍然被长生命周期的对象持有引用,将不会被 GC 回收,这将会引发内存泄漏。在 RxJava 中,解决方法就是在 Activity 退出后,也将长生命周期的对象停止使用,从而使得 Activity 可以被回收。
方法一:
订阅成功后,拿到 Disposable 对象,在 Activity/Fragment 销毁时,调用 Disposable 对象的 dispose() 方法,将所有异步任务中断。
方法二:
如果我们开了很多个异步任务,需要在 Activity/Fragment 销毁的时候中断多个异步任务,为了方便起见,可以使用 CompositeDisposable类 进行异步任务的停止。
方法三:
AndroidX提供了 Lifecycle,可以监听 Activity/Fragment 的生命周期,当 Activity/Fragment 销毁的时候,中断异步任务。可以使用框架 RxLifecycle,完成生命周期的绑定,也可以自行实现。