一.前言
-
本篇文章学习的目标:
-
Rxjava体系的知识相当庞大,我们仅针对以上内容进行学习;其次,源码的细节是比较复杂的,我们以弄清楚原理为主,不会深入细节;
-
需要弄清楚的概念:被观察者;观察者;订阅;上游/下游的被观察者;上游/下游的观察者;这几个概念会在下方的具体场景中对其进行解释;
-
验证一些结论:
- 验证一:下游的被观察者是持有上游被观察者的引用;
- 验证二:上游观察者是持有下游观察者的引用;
- 验证三:下游的被观察者触发上游被观察者的订阅;
- 验证四:下游的观察者会通过“持有”下游观察者的引用从而去调用下游观察者的方法;
二.场景一:无操作符,无线程切换
- 代码如下(为了便于学习理解,使用了部分中文作为变量,选取的是Single)
//1.创建被观察者
var 被观察者 = Single.just("123") //分析1
//2.创建观察者
var 观察者 = object : SingleObserver<String> {
override fun onSuccess(t: String) {
println("test ---> $t")
}
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
}
}
//3.被观察者订阅观察者
被观察者.subscribe(观察者) //分析2
- 上面这段代码,会将just方法中的参数传递到给SingleObserver匿名内部类的onSuccess方法中;我们逐步分析其是如何实现的;
- 看一下分析1的源码:被观察者的创建
//Single.java
public static <T> Single<T> just(final T item) {
ObjectHelper.requireNonNull(item, "item is null");//校验,忽略
return RxJavaPlugins.onAssembly(new SingleJust<T>(item));//RxJavaPlugins.onAssembly,钩子函数,忽略
}
//SingleJust.java
public final class SingleJust<T> extends Single<T> {
final T value;
public SingleJust(T value) {
this.value = value;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
}
}
- Single.just(“123”)实际上就是创建了一个SingleJust对象,也就是被观察者。而观察者的创建是通过一个SingleObserver的匿名内部类来构建,订阅对应的是subscribe方法;
- 看一下分析2的源码:
//Single.java
public final void subscribe(SingleObserver<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
try {
subscribeActual(observer);//分析3
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
}
}
- 在分析2位置,变量“被观察者”是指向SingleJust对象的(多态),那么对应分析3可以知道会执行SingleJust的subscribeActual方法,看一下源码
//其中,方法中的参数为分析2位置中传递的观察者
protected void subscribeActual(SingleObserver<? super T> observer) {
//调用观察者的onSubscribe和onSuccess方法
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
}
- 通过场景一,我们对被观察者/观察者/订阅有个认识。
- 上面三步的源码分析,实际就是<被观察者>会“持有”(并非真正的持有,而是通过方法传递观察者)<观察者>的引用,当<被观察者>调用了订阅(subscribe)方法,就会通过<观察者>的引用去调用<观察者>自己的方法;如此,从<被观察者>那里产生的事件<我们统一先理解为数据吧,后续也这么理解,虽然有些不规范,但利于学习>会传递到给<观察者>的方法中;经过上面的分析,<数据>形成了流动。我们再来复杂一点的,加上操作符map。
三.场景二:在场景一的基础上增加map操作符
- 代码如下
//1.创建被观察者
var 被观察者2 = Single.just("123")
//2.创建观察者
var 观察者2 = object : SingleObserver<String> {
override fun onSuccess(t: String) {
println("test ---> $t")
}
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
}
}
//3.调用map操作符(创建新的被观察者)
var 被观察者3 = 被观察者2
.map { it -> "$it ---> 操作符原理" } //分析4
//4.<被观察者3>调用订阅方法
被观察者3.subscribe(观察者2) //位置5
- 看一下分析4的源码
public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper)); //分析5
}
- 分析5的位置实际上就是创建SingleMap对象(同样的前面的钩子函数忽略),通过场景一,我们可以分析到,“被观察者2”指向的是SingleJust,也就是分析5位置SingleMap的第一个参数是SingleJust,第二个参数是分析4位置传递过来的,那么,位置5的“被观察者3”实际上就是指向SingleMap,当调用位置5的代码时会执行SingleMap的subscribeActual方法(根据场景一可知);
- 分析SingleMap的subscribeActual方法的源码
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
//分析6
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
final SingleObserver<? super R> t;
final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);//位置7
}
@Override
public void onSuccess(T value) {
R v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
t.onSuccess(v);//位置8
}
@Override
public void onError(Throwable e) {
t.onError(e);
}
}
}
-
看一下分析6的代码
- 通过分析5我们知道,分析6位置的source表示SingleJust,分析6的位置其逻辑是,通过SingleJust的引用执行订阅操作,订阅方法的参数是一个新的观察者(MapSingleObserver),新观察者持有“观察者2”的引用以及通过map操作符传递过来的mapper参数。
- 当SingleJust执行订阅方法时,会触发SingleJust的subscribeActual方法,结合前面的分析,会执行MapSingleObserver的onSubscribe和onSuccess方法。于是位置7和位置8就会执行“观察者2”的onSubscribe和onSuccess方法。
-
到此,“被观察者2”位置传递的参数就传递到了“观察者2”对应的回调方法中,也就是增加了map操作符的情况下,订阅逻辑以及<数据>传递逻辑就理顺了。
-
我们通过一张图来总结一下:
-
Api从上往下的调用所创建的被观察者我们分别成为上游被观察者和下游被观察者,我们自己主动创建的观察者我们称为下游观察者,该图解释了,在Api调用过程中对应的被观察者的创建以及上游观察者的创建。同时,被观察者与被观察者,被观察者与观察者之间的持有关系;
-
至此,前言中所提及的需要清楚的概念和需要验证的结论已经明确了。下面总结一下Rxjava的核心原理:最下游的被观察者订阅最下游的观察者时(对应上图Api最下方的代码),会创建新的观察者作为下游观察者的上游并持有下游观察者,同时会触发其上游被观察者订阅刚创建的上游观察者。当最上游的被观察者执行了订阅方法,会去回调自身的subscribeActual方法,在该方法内部会调用最上游观察者的api,由于上游的观察者是持有下游观察者的,如此,会触发调用下游的观察者的方法,直到触发我们自己定义的最下游的观察者中的回调。即订阅是从下游逐步往上游触发,数据是从上游逐步往下游传递。
四.场景三:在场景二的基础上增加线程切换
- 场景三是为了分析线程切换的原理
4.1.subscribeOn切换线程的原理
Single.just("123").map { it -> "$it ---> 操作符原理" }
.subscribeOn(Schedulers.io())//分析9
.observeOn(AndroidSchedulers.mainThread())//分析10
.subscribe(object : SingleObserver<String> {
override fun onSuccess(t: String) {
}
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
}
})
- 查看分析9的源码
//Single.java
public final Single<T> subscribeOn(final Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleSubscribeOn<T>(this, scheduler)); //分析11
}
- 查看分析11的源码
//SingleSubscribeOn.java
public final class SingleSubscribeOn<T> extends Single<T> {
final SingleSource<? extends T> source;
final Scheduler scheduler;
public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer, source);
observer.onSubscribe(parent);
Disposable f = scheduler.scheduleDirect(parent);//分析12
parent.task.replace(f);
}
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
//...
public void run() {
source.subscribe(this);
}
}
}
- 查看分析12的源码
//抽象类 Scheduler.java
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);//分析13
}
//分析13的源码
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();//分析14
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);//钩子函数,忽略
DisposeTask task = new DisposeTask(decoratedRun, w);//位置15
w.schedule(task, delay, unit);//分析16
return task;
}
- 查看分析14的源码
public abstract Worker createWorker();
//会执行其子类的createWorker,具体的是IoScheduler类的createWorker方法,因为在调用subscribeOn(Schedulers.io())时传入了Schedulers.io(),而通过分析Schedulers.io()就可以知道。
- 分析Schedulers.io()
//Schedulers.java
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);//忽略钩子函数,查看IO即可
}
static {
//...
IO = RxJavaPlugins.initIoScheduler(new IOTask());//忽略钩子函数,查看IOTask
//...
}
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
//查看IoHolder.DEFAULT
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
- 现在我们知道分析14的位置,createWorker方法会调用IoScheduler类的createWorker方法,进入查看,可以知晓Worker指向的是EventLoopWorker。
//IoScheduler.java
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
//...
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
//...
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);//分析17
}
}
- 位置15将Work和run(传递过来的新观察者)做了封装,分析16最终会执行EventLoopWorker的scheduleActual方法,最终会执行到分析17位置
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);//钩子函数,忽略
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);//位置18
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);//分析19
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
- 位置18的decoratedRun实际上还是scheduleActual方法的第一个参数run,也就是前面的DisposeTask,由于delayTime是等于0的,会执行到分析19,这里使用到了线程池,最终会执行sr(ScheduledRunnable)的call()方法,查看ScheduledRunnable的call()方法
public final class ScheduledRunnable extends AtomicReferenceArray<Object>
implements Runnable, Callable<Object>, Disposable {
//...
public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
super(3);
this.actual = actual;
this.lazySet(0, parent);
}
//...
public Object call() {
// Being Callable saves an allocation in ThreadPoolExecutor
run();
return null;
}
@Override
public void run() {
lazySet(THREAD_INDEX, Thread.currentThread());
try {
try {
actual.run();//分析20
} catch (Throwable e) {
//...
}
} finally {
//...
}
}
//...
}
- 后面就执行到了分析20位置,而actual参数为DisposeTask,最终会执行DisposeTask的run方法
//Scheduler.java
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();//分析21
} finally {
dispose();
runner = null;
}
}
- 分析21,通过前面的分析可以知道,decoratedRun是SubscribeOnObserver,也就是会执行SubscribeOnObserver的run方法,查看其方法
//SingleSubscribeOn.java的内部类SubscribeOnObserver
public void run() {
source.subscribe(this);//执行上游的订阅操作
}
- 到此,我们清楚了subscribeOn切换线程的原理:先做切换线程然后上游的被观察者再做订阅。
- 当多次调用subscribeOn时,只有第一次调用的subscribeOn才生效(因为只有最上游的才会生效);
4.2.observeOn切换线程的原理
- 清楚了subscribeOn切换线程的原理,observeOn切换线程的原理实际上会容易一些,大家可以按照4.1中的分析进行源码梳理,这里就直接写出结论。
- observeOn切换线程的原理:先使上游的被观察者进行订阅,然后在上游的观察者通知下游的观察者时再进行线程切换。切换的是下游的线程,即:上游被观察者订阅的时候不做线程切换,收到上游传递过来数据时再做线程切换。
五.(了解)disposable的工作原理
- 由于disposable很复杂,个人研究的不多,这里只是简单的做一下提及。
- 可以简单的将其分为两类:有后续与无后续。
- 有后续的取消会找上游,无后续的若取消就直接不做事。
六.手写Rxjava
- 通过手写简易版的Rxjava框架来温习其原理
6.1.Uml类图
6.2.运行效果
6.3.代码如下
- MainActivity类
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
SingleU
.just(111)
.map(object : FunctionU<Int, String> {
override fun apply(t: Int): String {
return "操作符$t"
}
})
.subscribeOn()
.observeOn()
.subscribe(object : ObserverU<String> {
override fun onSubscribe() {
println("触发时机 onSubscribe")
}
override fun onError() {
println("触发时机 onError")
}
override fun onSuccess(t: String) {
println("触发时机 onSuccess $t")
}
})
}
}
- ObservableU接口
//被观察者的顶层接口
interface ObservableU<T> {
fun subscribe(observer: ObserverU<T>)
}
- ObserverU接口
//观察者的顶层接口
interface ObserverU<T> {
fun onSubscribe()
fun onSuccess(t: T)
fun onError()
}
- FunctionU转换接口
interface FunctionU<T, R> {
fun apply(t: T): R
}
- SingleU抽象类
//该类继承自ObservableU,定义了具体被观察者的实现类的公共方法,该类是供外界使用的
abstract class SingleU<T> : ObservableU<T> {
override fun subscribe(observer: ObserverU<T>) {
subscribeActual(observer)
}
protected abstract fun subscribeActual(observer: ObserverU<T>)
fun <R> map(mapper: FunctionU<T, R>): SingleU<R> {
return SingleMapU(this, mapper)
}
fun subscribeOn(): SingleU<T> {
return SingleSubscribeOnU(this)
}
fun observeOn(): SingleU<T> {
return SingleObserveOnU(this)
}
companion object {
fun <T> just(value: T): SingleU<T> {
return SingleJustU(value)
}
}
}
- SingleJustU类
class SingleJustU<T>(private val value: T) : SingleU<T>() {
override fun subscribeActual(observer: ObserverU<T>) {
observer.onSubscribe()
observer.onSuccess(value)
}
}
- SingleMapU类
class SingleMapU<T, R>(private val source: ObservableU<T>, private val mapper: FunctionU<T, R>) :
SingleU<R>() {
override fun subscribeActual(observer: ObserverU<R>) {
source.subscribe(MapSingleObserverU1(observer, mapper))
}
internal class MapSingleObserverU1<T, R>(
private val t: ObserverU<R>,
private val mapper: FunctionU<T, R>
) : ObserverU<T> {
override fun onSubscribe() {
t.onSubscribe()
}
override fun onSuccess(value: T) {
t.onSuccess(mapper.apply(value))
}
override fun onError() {
t.onError()
}
}
}
- SingleObserveOnU类
class SingleObserveOnU<T>(private val sourceU: ObservableU<T>) : SingleU<T>() {
override fun subscribeActual(observer: ObserverU<T>) {
sourceU.subscribe(ObserveOnSingleObserver1(observer))
}
internal class ObserveOnSingleObserver1<T>(private val actual: ObserverU<T>) : ObserverU<T> {
private val handler = Handler(Looper.getMainLooper())
override fun onSubscribe() {
actual.onSubscribe()
}
override fun onSuccess(t: T) {
handler.post { actual.onSuccess(t) }
}
override fun onError() {
handler.post { actual.onError() }
}
}
}
- SingleSubscribeOnU类
class SingleSubscribeOnU<T>(private val sourceU: ObservableU<T>) : SingleU<T>() {
override fun subscribeActual(observer: ObserverU<T>) {
executorService.submit(SubscribeOnObserverU1(observer, sourceU))
}
internal class SubscribeOnObserverU1<T>(
private val actual: ObserverU<T>,
private val source: ObservableU<T>
) : ObserverU<T>, Runnable {
override fun onSubscribe() {
actual.onSubscribe()
}
override fun onSuccess(t: T) {
actual.onSuccess(t)
}
override fun onError() {
actual.onError()
}
override fun run() {
source.subscribe(this)
}
}
companion object {
private val executorService = Executors.newCachedThreadPool()
}
}
七.Rxjava结合Retrofit封装网络框架
- 项目地址
- 常用操作符
- map:接收上游发送过来的数据,可以对数据进行一定的处理,然后交给下游。应用场景:可以用来做数据剥壳,根据状态值判断是否需要抛出异常;
- compose:利用其特性减少重复代码,如线程切换。其函数参数传递的是一个具体的Transformer;
- zip:合并数据源;
- flatMap:嵌套网络请求;
- flatMap跟compose的区别,两者都用户变换被观察者。compose操作的是整个被观察者流,可以将其看作接收一个被观察者并返回一个新的观察者的函数。flatMap用来对被观察者发送的数据做变换的,可以看作是接收一个数据项并返回一个被观察者的函数。compose适用于改变数据线程,添加错误处理。
八.总结
- 本文选取了Single的使用来分析了Rxjava的核心原理,同时在最后使用了Rxjava3和Retrofit2封装成网络模块,提供了简易的Api提供给外界使用(新项目中网络部分使用的是协程对Jetpack的支持库结合Retrofit以及Flow来进行封装的,这部分知识会逐步在上方项目中进行实现)。