lift()
变换原理
这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在RxJava
的内部,它们是基于同一个基础的变换方法:lift()
。
首先看一下lift()
的内部实现(仅显示了部分主要逻辑代码):
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
方法用于将当前的 Observable 对象转换成另一种类型的 Observable 对象。它接受一个 Operator
参数,用于定义转换的规则。返回的是一个新的 Observable 对象。
它创建了一个新的 Observable 对象,并且将 operator
对象作用于当前 Observable 对象的订阅过程中。
在 Observable.create
方法中,通过创建一个匿名内部类实现了 OnSubscribe
接口的 call
方法。在 call
方法中,首先通过调用 operator.call(subscriber)
,将原始的 Subscriber
对象转换成一个新的 Subscriber
对象 newSubscriber
。然后调用 newSubscriber.onStart()
方法进行一些初始化操作。最后调用 onSubscribe.call(newSubscriber)
,将转换后的 newSubscriber
对象传递给原始的 onSubscribe
对象进行订阅操作。
类似于这个图(别的地方扒下来的)
RxJava
不建议开发者自定义Operator
来直接使用lift()
,而是建议尽量使用已有的lift()
包装方法(如map()、flatMap()
等)进行组合来实现需求,因为直接使用lift()
非常容易发生一些难以发现的错误。
线程控制Scheduler
在不指定线程的情况下,RxJava
遵循的是线程不变的原则,即在哪个线程调用subscribe()
方法就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。也就是说事件的发出和消费都是在同一个线程的。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于RxJava
是至关重要的。而要实现异步,则需要用到RxJava
的另一个概念:Scheduler
。
Scheduler
简介
在RxJava
中,Scheduler
相当于线程控制器,通过使用 Scheduler
可以实现事件的异步处理和线程切换。Scheduler
可以指定事件发送和处理所在的线程,从而实现异步的操作,RxJava 提供了多种类型的 Scheduler
:
Schedulers.immediate()
: 直接在当前线程运行,相当于不指定线程。这是默认的Scheduler
。Schedulers.newThread()
: 总是启用新线程,并在新线程执行操作。Schedulers.io()
: I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler
。行为模式和newThread()
差不多,区别在于io()
的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()
比newThread()
更有效率。不要把计算工作放在io()
中,可以避免创建不必要的线程。Schedulers.computation()
: 计算所使用的Scheduler
。这个计算指的是CPU
密集型计算,即不会被I/O
等操作限制性能的操作,例如图形的计算。这个Scheduler
使用的固定的线程池,大小为CPU
核数。不要把I/O
操作放在computation()
中,否则I/O
操作的等待时间会浪费CPU
。- 另外,
Android
还有一个专用的AndroidSchedulers.mainThread()
,它指定的操作将在Android
主线程运行。
有了这几个Scheduler
,就可以使用subscribeOn()
和observeOn()
两个方法来对线程进行控制了。subscribeOn()
指定subscribe()
所发生的线程,即Observable.OnSubscribe()
被激活时所处的线程或者叫做事件产生的线程。observeOn()
指定Subscriber
所运行在的线程或者叫做事件消费的线程。
Observable.just("Hello")
.subscribeOn(Schedulers.io()) // 在 IO 线程发送事件
.map(str -> str + " World")
.observeOn(AndroidSchedulers.mainThread()) // 在主线程中处理事件
.subscribe(str -> {
// 更新 UI
textView.setText(str);
}, throwable -> {
// 处理错误
Log.e(TAG, "Error: " + throwable.getMessage());
});
上面这段代码中,subscribeOn(Schedulers.io())
的指定会让创建的事件的内容Hello
、World !
将会在IO
线程发出;而由于observeOn(AndroidScheculers.mainThread())
的指定,因此subscriber()
方法设置后的回调中内容的打印将发生在主线程中。事实上,这种在subscribe()
之前写上两句subscribeOn(Scheduler.io())
和observeOn(AndroidSchedulers.mainThread())
的使用方式非常常见,它适用于多数的***后台线程取数据,主线程显示***的程序策略。
Scheduler
的原理
我们可以多切换几次线程,因为observeOn()
指定的是Subscriber
的线程,而这个Subscriber
并不是subscribe()
参数中的Subscriber
,而是observeOn()
执行时的当前Observable
所对应的Subscriber
,即它直接对应的Subscriber
。换句话说observeOn()
指定的是它之后的操作所在的线程。所以想要多次切换线程,只要在每个想要切换线程的位置调用一次observeOn()
即可。
Observable.just("Hello")
.subscribeOn(Schedulers.io()) // 在 IO 线程执行
.observeOn(Schedulers.computation()) // 切换到计算线程执行
.map(s -> s + " World")
.observeOn(AndroidSchedulers.mainThread()) // 切换到主线程执行
.subscribe(s -> {
// 更新 UI
textView.setText(s);
});
如上,通过observeOn()
的多次调用,程序实现了线程的多次切换。 不过,不同于observeOn()
,subscribeOn()
的位置放在哪里都可以,但它是只能调用一次的。
subscribeOn()
和observeOn()
的内部实现,也是用的lift()
。
具体看图(不同颜色的箭头表示不同的线程,subscribeOn()
原理图:
observeOn()
原理图:
从图中可以看出,subscribeOn()
和observeOn()
都做了线程切换的工作(图中的schedule...
部位)。不同的是,subscribeOn()
的线程切换发生在OnSubscribe
中,即在它通知上一级 OnSubscribe
时,这时事件还没有开始发送,因此subscribeOn()
的线程控制可以从事件发出的开端就造成影响;而observeOn()
的线程切换则发生在它内建的Subscriber
中,即发生在它即将给下一级Subscriber
发送事件时,因此observeOn()
控制的是它后面的线程。
用一张图来(扒的)解释当多个subscribeOn()
和observeOn()
混合使用时,线程调度是怎么发生的
图中共有5处含有对事件的操作。由图中可以看出,①和②两处受第一个subscribeOn()
影响,运行在红色线程;③和④处受第一个observeOn()
的影响,运行在绿色线程;⑤处受第二个 onserveOn()
影响,运行在紫色线程;而第二个subscribeOn()
,由于在通知过程中线程就被第一个subscribeOn()
截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个subscribeOn()
的时候,只有第一个subscribeOn()
起作用。
在前面讲Subscriber
的时候,提到过Subscriber
的onStart()
可以用作流程开始前的初始化。然而onStart()
由于在subscribe()
发生时就被调用了,因此不能指定线程,而是只能执行在subscribe()
被调用时的线程。这就导致如果onStart()
中含有对线程有要求的代码(例如在界面上显示一个ProgressBar
,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测subscribe()
将会在什么线程执行。
而与Subscriber.onStart()
相对应的,有一个方法Observable.doOnSubscribe()
。它和Subscriber.onStart()
同样是在subscribe()
调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下,doOnSubscribe()
执行在subscribe()
发生的线程;而如果在doOnSubscribe()
之后有subscribeOn()
的话,它将执行在离它最近的subscribeOn()
所指定的线程。
示例代码:
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
Agera
之前Google
发布agera
,它在Github
上的介绍是:Reactive Programming for Android
,可以进行了解。它为 Android 应用程序提供了一种简单且灵活的方式来处理数据流和事件驱动的编程模型。很轻量化,很适合安卓。
但是缺点也很明显:与 RxJava 相比,Agera 的功能相对较为有限,操作符和功能较少。对于一些复杂的数据流操作和并发处理,可能需要额外的工作量来实现