文章目录
- 1.定义
- 2.作用
- 3.特点
- 4.使用
- 4.1创建被观察者(Observable)
- 4.2创建观察者(Observer)
- 4.3订阅(Subscribe)
- 4.4Dispose
- 5.操作符
- 5.1操作符类型
- 5.2just操作符
- 5.2链式调用
- 5.3 fromArray操作符
- 5.4 fromIterable操作符
- 5.5map操作符
- 5.6flatMap操作符
- 5.7concatMap操作符
- 5.8buffer操作符
- 5.9concat操作符
- 6.异步
- 7.subscribeOn
- 8.observeOn
- 9.背压
- 9.1Flowable
- 9.2背压策略
- 9.3另一种调用背压策略的方式
- 10.RxBus
- 11.RxBinding
- 12.内存泄露
1.定义
RxJava在GitHub的介绍
RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
// 翻译:RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
也就是说:RxJava是一个基于事件流,实现异步操作的库
2.作用
类似于Android中的AsyncTask,Handler作用用于实现异步操作
3.特点
由于RxJava的使用方式是:基于事件流的链式调用,所以使得RxJava:
- 逻辑简单
- 实现优雅
- 使用简单
RxJava原理:基于一种扩展的观察者模式
RxJava的扩展观察者模式中有4个角色:
角色 | 作用 |
---|---|
被观察者(Observable) | 产生事件 |
观察者(Observer) | 接收事件,并给出响应动作 |
订阅(Subscribe) | 连接被观察者&观察者 |
事件(Event) | 被观察者&观察者沟通的载体 |
可以总结为:被观察者(Observable)通过订阅(Subscribe)按顺序发送事件给观察者(Observer),观察者(Observer)按顺序接收事件&作出对应的响应动作
4.使用
添加依赖:
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
4.1创建被观察者(Observable)
val ohThisIsObservable = Observable.create<String>{
it.onNext("Hello") //发送"事件"
it.onNext("rx")
it.onNext("world")
it.onComplete() //发送完成"事件"
}
这里采用了create()创建被观察者,但并非只有create()能创建,其余操作符也可以达成此效果(后面介绍)。
4.2创建观察者(Observer)
val observer: Observer<String> = object : Observer<String> {
override fun onSubscribe(d: Disposable) { System.out.println(" onSubscribe ") }
override fun onNext(string: String) { System.out.println(" onNext : "+string) }
override fun onError(e: Throwable) { System.out.println(e) }
override fun onComplete() { System.out.println(" on Complete ") }
}
可看见这里响应事件分别有以下:
onSubscribe():准备监听,最先调用的方法;
onNext():用来发送数据,调用一次发送一条;
onError():发送异常通知,只发送一次,多次调用也只会发送第一条;
onComplete():发送完成通知,只发送一次,多次调用也只会发送第一条。
PS:onError()和onComplete()互斥,俩方法同时只能调用一个,要么发生异常onError()不会回调onComplete(),要么正常回调onComplete(),不回调onError()。
4.3订阅(Subscribe)
ohThisIsObservable.subscribe(observer)
运行代码,会发现如下结果
日志中可发现,当被观察者(ohThisIsObservable)通过调用onNext()发射数据的时候,观察者(observer)调用onNext()接收数据;当被观察者(ohThisIsObservable)调用onComplete()时,观察者(observer)调用onComplete(),其他事件将不会继续发送(onError同此理)。
RxJava中,观察者不仅仅只有observer才能实现,下面是个简单版示例:
val consumer: Consumer<String> =
Consumer { s ->
//创建观察者consumer
println(s)
}
val stringObservable = Observable.create { emitter ->
emitter.onNext("Hello")
emitter.onNext("~~~rx~~~")
emitter.onNext("world")
emitter.onComplete()
}
//被观察者发出一连串字符并指定consumer订阅被观察者
stringObservable.subscribe(consumer)
对应输出结果如图:
由以上代码可见,Observer相对于Consumer在接口方法上要多onSubscribe、onNext、onError、onComplete这些接口,在一次事件中,可操作程度更精细。
4.4Dispose
在onSubscribe()中会接收到一个Disposable对象,该对象相当于一个开关,如果开关关闭,则观察者不会收到任何事件和数据。例如:
val observer: Observer<String> = object : Observer<String> {
var mDisposeable: Disposable? = null
override fun onSubscribe(d: Disposable) {
println(" onSubscribe ")
mDisposeable = d
}
override fun onNext(s: String) {
println(" onNext : $s")
if (s == "stop") {
mDisposeable!!.dispose()
}
}
override fun onError(e: Throwable) {
println(" onError ")
}
override fun onComplete() {
println(" onComplete ")
}
}
Observable.just("Hello", "world", "stop", "coding").subscribe(observer)
在上述代码中我们使用一个变量来保存Disposable对象,在onNext方法中如果传过来的字符串是“stop”,则调用dispose关闭事件的接收,后续字符串不在发射,甚至onComplete()也不会执行了。结果如下图:
5.操作符
Rxjava提供大量操作符来完成对数据处理,这些操作符也可以理解成函数。如果把Rxjava比喻成一道数据流水线,那么一个操作符就是一道工序,数据通过这些工序加工变换、组装,最后生产出我们想要的数据。
5.1操作符类型
创建型
转换型
组合型
功能型
过滤型
条件型
5.2just操作符
用于创建一个被观察者,并发送事件,发送的事件不可以超过10个以上(从其构造函数就可以看出,如下图):
简单写个示例:
val justObservable = Observable.just("Hello", "rx", "world~!")
val observer: Observer<String> = object : Observer<String> {
override fun onSubscribe(d: Disposable) { System.out.println(" onSubscribe ") }
override fun onNext(string: String) { System.out.println(" onNext : "+string) }
override fun onError(e: Throwable) { System.out.println(e) }
override fun onComplete() { System.out.println(" on Complete ") }
}
justObservable.subscribe(observer)
对应输出结果为:
5.2链式调用
RxJava最方便的一个特征就是链式调用,上述代码可以修改为:
Observable.just("Hello", "rx", "world").subscribe(object : Observer<String> {
override fun onSubscribe(d: Disposable) { System.out.println(" onSubscribe ") }
override fun onNext(string: String) { System.out.println(" onNext : "+string) }
override fun onError(e: Throwable) { System.out.println(e) }
override fun onComplete() { System.out.println(" on Complete ") }
})
效果一样(Java代码在这里的表现形式则是lamba表达式),但跟之前看起来给人感觉完全不一样,如无特殊说明,后续例子都会如此调用。
5.3 fromArray操作符
类似于just,但是可以传入无限个参数,无数量限制
5.4 fromIterable操作符
可直接传一个List给观察者发射(List extends Collection接口,而Collection extends Iterable接口,所以可以直接传进去)。例如:
val arrayList = ArrayList<String>()
arrayList.add("111")
arrayList.add("222")
Observable.fromIterable(arrayList).subscribe(object : Observer<String> {
override fun onSubscribe(d: Disposable) { System.out.println(" onSubscribe ") }
override fun onNext(string: String) { System.out.println(" onNext : "+string) }
override fun onError(e: Throwable) { System.out.println(e) }
override fun onComplete() { System.out.println(" on Complete ") }
})
对应结果:
5.5map操作符
map操作符能直接对发射出来的事件进行处理并且产生新的事件,然后再次发射。例如下述例子:
Observable.just("Hello").map<Any> { "get it!" }
.subscribe(object : Observer<Any> {
override fun onSubscribe(d: Disposable) {
println(" onSubscribe ")
}
override fun onNext(o: Any) {
println(" onNext : "+o)
}
override fun onError(e: Throwable) {
println(" onError ")
}
override fun onComplete() {
println(" onComplete ")
}
})
这里我们本来传入参数是"Hello",通过map()拦截后发射出去的参数变成了"get it!",拦截修改成功。
5.6flatMap操作符
flat,英语翻译过来的意思是“使变平”的意思,跟map()一样,都能直接对发射出来的事件进行处理并且产生新的事件。但其内部方法参数不同。二者都是传参进Function()中并在apply()中进行数据修改,但二者传入参数不同。
map()是两个泛型,而flatMap()第二个参数填Observable被观察者,再将这个被观察者发射出去,这一下灵活度就增大了,这也是网络请求场景中最常用的操作符。下述简单示例:
Observable.just("注册").flatMap<Any> { s ->
println(s + "成功")
Observable.just("进行登陆")
}.subscribe(object : Observer<Any> {
override fun onSubscribe(d: Disposable) {
println(" onSubscribe ")
}
override fun onNext(o: Any) {
println(" onNext :"+o)
}
override fun onError(e: Throwable) {
println(" onError ")
}
override fun onComplete() {
println(" onComplete ")
}
})
对应的日志打印
5.7concatMap操作符
concatMap()与flatMap()使用方式完全一致,基本上是一样的。不过,concatMap()转发出来的数据是有序的,而flatMap()是无序的。
5.8buffer操作符
buffer()有多参数方法,这里介绍最常用的,单参数形式,buffer(x):根据个数来缓冲,每次缓冲x个数转换成数组,再发射出去,例如:
Observable.just("1","2","3","4","5","8","9","7","6","10")
.buffer(3)
.subscribe(object : Observer<Any> {
override fun onSubscribe(d: Disposable) {
println(" onSubscribe ")
}
override fun onNext(o: Any) {
println(" onNext :"+o)
}
override fun onError(e: Throwable) {
println(" onError ")
}
override fun onComplete() {
println(" onComplete ")
}
})
对应的输出结果为:
5.9concat操作符
可以将多个观察者组合在一起,然后按照之前发送顺序发送事件。需要注意的是,concat() 最多只可以发送4个事件。
示例如下:
Observable.concat(Observable.just("111"),Observable.just("222")).subscribe ( object : Observer<Any>{
override fun onSubscribe(d: Disposable) {
println(" onSubscribe ")
}
override fun onNext(t: Any) {
println(" onNext : "+t)
}
override fun onError(e: Throwable) {
println(" onError ")
}
override fun onComplete() {
println(" onComplete ")
}
} )
对应输出结果为:
concatArray()和concat()作用一样,不过concatArray()可以发送多于4个被观察者。
6.异步
RxJava提供了非常方便的API来完成线程的调度,内置的线程调度器有以下几个:
- Schedule.single():单线程调度器,线程可复用;
- Schedule.newThread():为每个任务创建新的线程;
- Schedule.io():处理I/O密集任务,内部线程池实现,可根据需求增长;
- Schedulers.computation():处理计算任务,如事件循环和回调任务; Schedulers.immediate():默认指定的线程,也就是当前线程; AndroidSchedulers.mainThread():Android主线程调度器,属于RxAndroid。
线程调度器实际上是指派事件在什么样的线程中处理,所需应用场景就不难想象了,如果该事件是耗时操作,比如网络请求,但相应结果会先是在UI中,这时候在主线程执行网络请求就不合适了,但在子线程执行,结果同样要刷新UI,也不太合适,这里就凸显自由切换线程的好处了。Rxjava可通过调度器来制定被观察者和观察者分别可以在什么线程中执行自己的代码,而指定调度器的API则是:subscribeOn和observeOn。
7.subscribeOn
首先,我们不用线程调度器,我们先看观察者和被观察者默认情况下在什么线程中执行自己代码,如下:
Observable.create(object : ObservableOnSubscribe<Any>{
override fun subscribe(emitter: ObservableEmitter<Any>) {
println(" subscribe : "+ Thread.currentThread())
emitter.onNext(" guess wich thread ")
emitter.onComplete()
}
}).subscribe ( object : Observer<Any>{
override fun onSubscribe(d: Disposable) {
println(" onSubscribe : "+ Thread.currentThread())
}
override fun onNext(t: Any) {
println(" onNext : "+t+" : "+Thread.currentThread())
}
override fun onError(e: Throwable) {
println(" onError : "+ Thread.currentThread())
}
override fun onComplete() {
println(" onComplete : "+ Thread.currentThread())
}
} )
对应的结果为:
可见默认情况下,观察者和被观察者都是在主线程中执行。假设这个时候要执行耗时操作,Android程序必定崩溃,所以我们这时要切换线程。
subscribeOn()实际上是指定被观察者的代码在哪个线程中执行。例如:
Observable.create(object : ObservableOnSubscribe<Any>{
override fun subscribe(emitter: ObservableEmitter<Any>) {
println(" subscribe : "+ Thread.currentThread())
emitter.onNext(" guess wich thread ")
emitter.onComplete()
}
}).subscribeOn(Schedulers.newThread()) //决定执行subscribe方法所处的线程,也就是产生事件或发射事件所处的线程
.subscribe ( object : Observer<Any>{
override fun onSubscribe(d: Disposable) {
println(" onSubscribe : "+ Thread.currentThread())
}
override fun onNext(t: Any) {
println(" onNext : "+t+" : "+Thread.currentThread())
}
override fun onError(e: Throwable) {
println(" onError : "+ Thread.currentThread())
}
override fun onComplete() {
println(" onComplete : "+ Thread.currentThread())
}
} )
这段代码中采用subscribeOn(Schedulers.newThread())来指定在新建线程中执行:
这时运行得到结果:
可见日志是不对的,onNext()、onComplete()都没有打印。原因很简单,我们在主线程创建观察者和被观察者之后,事件发送的执行转交给调度器Schedulers.newThread(),还没等来得及新线程发送出事件,主线程就直接退出了,所以后续日志看不到,鉴于此,我们使主线程休眠sleep2秒,在上述方法的后面调用如下代码:
try {
Thread.sleep(2000) //这里sleep延时主线程
} catch (e: InterruptedException) {
e.printStackTrace()
}
输出结果为:
8.observeOn
observeOn()指定后续的操作符以及观察者的代码在什么样的线程中执行。且observeOn()可以多次被调用,每次调用都生效。
Observable.create(object : ObservableOnSubscribe<Any> {
override fun subscribe(emitter: ObservableEmitter<Any>) {
Log.i("rxdemo", " subscribe : " + Thread.currentThread())
emitter.onNext(" guess wich thread ")
emitter.onComplete()
}
})
.subscribeOn(Schedulers.io()) //决定执行subscribe方法所处的线程,也就是产生事件或发射事件所处的线程
.observeOn(AndroidSchedulers.mainThread()) //决定下游事件被处理时所处的线程
.subscribe(object : Observer<Any> {
override fun onSubscribe(d: Disposable) {
Log.i("rxdemo", " onSubscribe : " + Thread.currentThread())
}
override fun onNext(t: Any) {
Log.i("rxdemo", " onNext : " + t + " : " + Thread.currentThread())
}
override fun onError(e: Throwable) {
Log.i("rxdemo", " onError : " + Thread.currentThread())
}
override fun onComplete() {
Log.i("rxdemo", " onComplete : " + Thread.currentThread())
}
})
对应输出结果为:
9.背压
这个词是从backpressure直译过来,背压即来自背部的压力,指当被观察者发出很多的数据或事件时,观察者来不及处理,都积压在那,压的观察者喘不过气,有时候还会导致OOM。
如下述代码:
Observable.create(object : ObservableOnSubscribe<Any> {
override fun subscribe(emitter: ObservableEmitter<Any>) {
while (true){
emitter.onNext(" subscribe : Hello ")
}
}
})
.subscribeOn(Schedulers.io()) //被观察者在I/O线程执行
.observeOn(Schedulers.newThread()) //观察者在新线程执行
.subscribe { //Consumer
Thread.sleep(9000);
Log.i("rxdemo"," accept ~");
}
观察者和被观察者在不同线程中执行,被观察者是个死循环不停发射,同时观察者处理数据的速度放缓一些,休眠9秒处理一次。这时我们可以在Profiler中可以看到:
内存随时间可见的上升,这种情况如果不处理,很大概率可能会出现OOM。究其原因是因为发送数据方和接收数据方不在一个线程内,两个线程步调不一致,发送数据太多处理不来就缓存起来,直到内存用完,这就是背压。针对背压,Rxjava提供了支持背压处理的观察者和被观察者,即Flowable和Subscriber。
9.1Flowable
Flowable是Observable(观察者)的一种新实现,但Flowable额外实现了非阻塞式背压策略。同时,用Flowable的时候观察者变为Subscriber。例如下面示例:
Flowable.create(
{ emitter ->
Log.d("rxdemo", "send 1")
emitter.onNext(1)
Log.d("rxdemo", "send 2")
emitter.onNext(2)
Log.d("rxdemo", "finish")
emitter.onComplete()
},
BackpressureStrategy.ERROR
).subscribe(object : Subscriber<Int> {
override fun onSubscribe(s: Subscription) {
Log.d("rxdemo", "onSubscribe")
s.request(2)
}
override fun onNext(integer: Int) {
Log.d("rxdemo", "get the $integer")
}
override fun onError(t: Throwable) {
Log.w("rxdemo", "onError: ", t)
}
override fun onComplete() {
Log.d("rxdemo", "onComplete")
}
})
对应输出结果为:
看到这里,你会对两个地方产生疑问,一个是onSubscribe()中的s.request(2),这里是向观察者请求处理2条数据的意思,如果没有这行代码,则我们不请求处理数据,程序则会触发这里的背压策略:BackpressureStrategy.ERROR,直接报错。当然,背压策略不仅这一个,还有其余几个:
9.2背压策略
·BackpressureStrategy.ERROR:直接抛出MissingBackpressureException异常;
·BackpressureStratery.MISSING:不使用背压,没有缓存,仅提示:缓存区满了
·BackpressureStratery.BUFFER:缓存所有数据,直到观察者处理,如果观察者处理不及时也会出现OOM,被观察者可无限发送事件,但实际上是放在缓存区。
·BackpressureStratery.DROP:丢弃超过缓存区大小(128)的数据
·BackpressureStratery.LATEST:只保存最新的最后的事件,超过缓存区大小(128)时用新数据覆盖老数据。
到此,我们可以总结下,背压的出现是为了解决两个方面主要问题:
· 当发送数据速度 > 接受数据速度,数据堆叠缓存会撑满;
· 当缓存区大小存满,被观察者继续发送下一个事件时(还是相当于撑爆了缓存区)
到这里你会发现,这还是个缓存区问题,那么这个缓存区是否就是128呢?我们可以通过Flowable.bufferSize()来获取缓存的大小,例如:
Flowable.create(
{ emitter ->
//发送128个Hello buffer
for (i in 0 until Flowable.bufferSize()) {
Log.d("rxdemo", "Hello buffer $i")
emitter.onNext("Hello buffer $i")
}
},
BackpressureStrategy.ERROR
).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(object : Subscriber<String> {
override fun onSubscribe(s: Subscription) {
Log.d("rxdemo", "onSubscribe")
}
override fun onNext(str: String) {
Log.d("rxdemo", "get the $str")
}
override fun onError(t: Throwable) {
Log.w("rxdemo", "onError: ", t)
}
override fun onComplete() {
Log.d("rxdemo", "onComplete")
}
})
对应的日志输出为:
由日志不难看出其发挥大小为128,也就是默认缓存数据为128个,上述代码发出了128个Hello buffer。如果这个时候我们多发出来一个会怎样?修改下for循环条件i in 0 until Flowable.bufferSize()+1。最后会得到结果:
毫无意外,Subscriber并没有请求处理数据,缓存已经爆满,外加配置的背压策略为BackpressureStrategy.ERROR,所以这里会在缓存撑爆的情况下通知Subscriber发生错误,调用ERROR,打印MissingBackpressureException。
9.3另一种调用背压策略的方式
看到这里你可能会想,如果不使用create方法创建Flowable,而是用range、interval这些操作符创建,那如何配置策略?对此,Rxjava提供了对应的方法来匹配相应的背压策略:onBackpressureBuffer()、onBackpressureDrop()、onBackpressureLatest()(看名字就知道对应的策略啦),例如:
Flowable.range(0,100)
.onBackpressureLatest()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(object : Subscriber<Int> {
override fun onSubscribe(s: Subscription) {
Log.d("rxdemo", "onSubscribe")
}
override fun onNext(num: Int) {
Log.d("rxdemo", "get the $num")
}
override fun onError(t: Throwable) {
Log.w("rxdemo", "onError: ", t)
}
override fun onComplete() {
Log.d("rxdemo", "onComplete")
}
})
其实,到这里你会发现Rxjava的强大之处,能随意切换线程,跟retrofit结合做网络请求框架,能用timer做定时操作,用interval做周期性操作,甚至进行数组、list的遍历等。
10.RxBus
一种基于RxJava实现事件总线的一种思想,可完美替代EventBus,相关代码参考
11.RxBinding
主要与RxJava结合用于一些View的事件绑定,相关代码参考
12.内存泄露
Rxjava使用不当会造成内存泄露,在页面销毁后,Observable仍然还有事件等待发送和处理(比如interval做周期性操作而没有停下来),这个时候会导致Activity回收失败,从而致使内存泄露。
解决办法:
·使用Disposable,关闭页面时调用dispose()取消订阅;
·使用CompositeDisposable,添加一组Disposable,在关闭页面时同时取消订阅。
也可以将其与Activity基类生命周期进行绑定,在销毁时取消订阅。