要使用Rxjava首先要导入两个包,其中rxandroid是rxjava在android中的扩展
implementation 'io.reactivex:rxandroid:1.2.1'
implementation 'io.reactivex:rxjava:1.2.0'
我们在使用rxjava的操作符时都觉得很方便,但是rxjava是怎么实现操作符的转换呢,以下面的代码进行分析
String host = "https://baidu.com/";
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("message");
subscriber.onCompleted();
}
}).map(new Func1<String, String>() {
@Override
public String call(String s) {
return host+s;
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
ILog.LogDebug("subscriber onCompleted is come in");
}
@Override
public void onError(Throwable e) {
ILog.LogDebug("subscriber onError is come in");
}
@Override
public void onNext(String s) {
ILog.LogDebug(s);
}
});
上面的代码是链式调用,为了方便理解,我把上面的代码拆分成了下面样式
String host = "https://baidu.com/";
Observable<String> obs1 = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("message");
subscriber.onCompleted();
}
});
Observable<String> obs2 = obs1.map(new Func1<String, String>() {
@Override
public String call(String s) {
return host+s;
}
});
obs2.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
ILog.LogDebug("subscriberLast onCompleted is come in");
}
@Override
public void onError(Throwable e) {
ILog.LogDebug("subscriberLast onError is come in");
}
@Override
public void onNext(String s) {
ILog.LogDebug(s);
}
});
上面代码会打印,我们将一步一步分析,打印是怎么来的
https://www.baidu.com/message
subscriberLast onCompleted is come in
obs1是我们的原始Observable, obs2是我们变换过的Observable
首先从obs1的创建开始,就是Observable的创建过程,有不理解的可以先看RXJava的创建订阅过程
在osb1的创建过程create函数需要一个OnSubscribe对象,为了方便理解,就暂不拆开来写,代码虽然用的是匿名对象,我们暂且叫它onSubscribe1
Observable<String> obs1 = Observable.create(new Observable.OnSubscribe<String>() { //onSubscribe1
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("observable call onNext0");
subscriber.onStart();
subscriber.onNext("observable call onNext");
subscriber.onCompleted();
subscriber.onNext("observable call onNext1");
}
});
obs1创建成功后调用了map方法,map方法又返回了一个Observable,就是我们的obs2,同理,map方法的参数我们叫它func1
Observable<String> obs2 = obs1.map(new Func1<String, String>() { //func1
@Override
public String call(String s) {
return host+s;
}
});
随后我们又使用了osb2进行了订阅,同理subscribe方法的参数我们叫它subscriberLast
obs2.subscribe(new Subscriber<String>() { //subscriberLast
@Override
public void onCompleted() {
ILog.LogDebug("subscriber onCompleted is come in");
}
@Override
public void onError(Throwable e) {
ILog.LogDebug("subscriber onError is come in");
}
@Override
public void onNext(String s) {
ILog.LogDebug("subscriber onNext is come in s = "+s);
}
});
于是有了下面的关系,到这里还都很好理解
然后我们来分析obs1.map调用了map方法,rxjava做了什么
public class Observable<T> {
....
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return create(new OnSubscribeMap<T, R>(this, func));
}
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
....
}
map方法又调用了create方法,create方法的调用我们已经在RxJava的调用过程中讲过,传入的参数是一个OnSubscribe对象,OnSubscribeMap实现了OnSubscribe接口,我们把new出来的OnSubscribeMap对象暂时叫做onSubscribeMap1构造方法传入的参数是obs1和func1, 那我们再来看OnSubscribeMap类
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
final Observable<T> source;
final Func1<? super T, ? extends R> transformer;
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
}
我们知道在Observable创建好后,调用了subscribe方法就可以进行订阅了,最后调用的也是Observable创建时传入的OnSubscribe对象的call方法,以obs1的创建举例,也就是我们这里的onSubscribe1,不懂得去看RxJava的调用过程
因为我们最后执行subscribe订阅方法的是obs2那么也就会调用obs2的OnSubscribe对象,那么obs2的OnSubscribe对象是谁呢,就是onSubscribeMap1那么执行完订阅就会调用onSubscribeMap1的call方法
@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}
这里的source是OnSubscribeMap构造方法调用时初始化的也就是obs1,transformer 是我们的func1
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
现在又有了下面的关系
在onSubscribeMap1的call方法中,一共有三行代码,这三行代码很重要,我们一行一行分析
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
实例化一个MapSubscriber对象parent ,MapSubscriber继承了Subscriber,所以MapSubscriber也是一个观察者;parent 持有了o,o是也就是subscriberLast,还持有了transformer,也就是func1。这里一会要详细分析
o.add(parent);
o是一个Subscriber,也是subscriberLast,上面的代码直接把parent添加到了subscriberLast的SubscriptionList列表。这里一会还要说一下。在看下一条代码
source.unsafeSubscribe(parent);
我们知道source也就是我们的obs1,obs1的订阅操作就是在这里发生的。至于为什么用unsafeSubscribe方法, 我们一会在分析
我们重新梳理一下,
1、obs2的订阅方法subscribe执行导致了obs2的onSubscribeMap1实例的call方法被执行;
2、onSubscribeMap1的call方法中又执行了obs1的订阅;obs1的观察者就是parent;
3、obs1的订阅必然会导致obs1 的onSubscribe实例onSubscribe1的call方法被执行。
4、在onSubscribe1的call方法中我们又调用了
subscriber.onNext("message");
subscriber.onCompleted();
这里的subscriber也就是parent,必然会调用parent的next方法并传入message,和onCompleted。
5、继续分析parent
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
parent是MapSubscriber,MapSubscriber继承了Subscriber,MapSubscriber的构造方法需要两个参数Subscriber和Func1,通过之前的分析,知道actual就是subscriberLast,mapper就是fun1
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
用一张图来说明下
static final class MapSubscriber<T, R> extends Subscriber<T> {
.....
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t); //这里调用,mapper就是fun1,的call方法,这里的t就是message,result就是转换后的字符串
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
.....
}
在parent的onnext方法中调用了func1的call方法,还记得我们在func1的call方法中写的什么,没错就是转换字符串,call方法的返回值就是转换后的字符串
final String host = "https://www.baidu.com/";
Observable<String> obs2 = obs1.map(new Func1<String, String>() {
@Override
public String call(String s) {
return host+s; //func1的call方法 进行了字符串转换,这里的s就是message
}
});
继续调用了
actual.onNext(result);
我们知道actual就是subscriberLast,所以会调用subscriberLast的onNext方法,
obs2.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
ILog.LogDebug("subscriberLast onCompleted is come in");
}
@Override
public void onError(Throwable e) {
ILog.LogDebug("subscriber onError is come in");
}
@Override
public void onNext(String s) { //subscriberLast的onNext方法
ILog.LogDebug(s);//最后会打印https://www.baidu.com/message
}
@Override
public void onStart() {
super.onStart();
}
});
在obs1中我们还调用了onCompleted方法,先调用了actual.onCompleted();
actual也就是subscriberLast实例。
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
在这添加一张图方便更好的理解
简单的说就是先调用obs1的subscriber的onNext()方法,在onNext()方法中调用func1的call方法,处理数据源数据,然后再把处理过的数据源发射给obs2的subscriber
还记不记得上面的第二条代码 o.add(parent);为什么要把parent添加到o中呢,o也就是subscriberLast。
@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);//第二条代码
source.unsafeSubscribe(parent);//第三条代码
}
subscriberLast的add方法最终会把parent添加到subscriberLast的SubscriptionList中,关于Subscriber请看RxJava中的Subscriber。再执行SubscriptionList的解绑方法unsubscribe会把subscriptions中的Subscription一并解绑,也就是会把parent和obs1的绑定关系解除。
那上面第三条代码为什么调用的是unsafeSubscribe方法呢,记得我们之前分析RxJava的订阅过程中看到的是最后包装了一个SafeSubscriber,再SafeSubscriber中会进行一些多线程的处理操作。
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
observable中unsafeSubscribe方法也很简单,也没并由对传入的subscriber进行包装而是直接调用。
public class Observable<T> {
....
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
RxJavaHooks.onObservableError(r);
// TODO why aren't we throwing the hook's return value.
throw r; // NOPMD
}
return Subscriptions.unsubscribed();
}
}
....
我们回过来再看MapSubscriber中其实也已经进行了简单的处理工作,所以也就不需要使用SafeSubscriber了。
至此RxJava操作符变换过程就分析完了,欢迎大家补充和纠正。