目录
- AsyncSubject
- BehaviorSubject
- PublishSubject
- ReplaySubject
- SerializedSubject
- UnicastSubject
在Rxjava中, Subject
可以同时表示Observer
和Observable
, 允许从单个源到多个子观察者multiple child Observers
。
除了 onSubscribe(io.reactivex.disposables.Disposable), onNext(Object), onError(Throwable) and onComplete()
, 其他方法都是线程安全的,可以使用toSerialized()
使它们线程安全。
Subject
有6个继承类,分别是
ReplaySubject
: 释放接收到的所有数据
BehaviorSubject
:释放订阅前最后一个数据和订阅后接收到的所有数据
PublishSubject
:释放订阅后接收到的数据
AsyncSubject
:仅释放接收到的最后一个数据
SerializedSubject
:串行Subject
UnicastSubject
:仅支持订阅一次的Subject
AsyncSubject
AsyncSubject
仅发送Observable
释放的最后一个数据,并且仅在Observable
完成之后。然而如果当Observable
因为异常而终止,AsyncSubject
将不会发送任何数据,但是会向Observer
传递一个异常通知。
A Subject that emits the very last value followed by a completion event or the received error to Observers.
public static void asyncSubject() {
asyncSubject.onNext(1);
asyncSubject.onNext(2);
asyncSubject.onComplete();
asyncSubject.onNext(3);
asyncSubject.subscribe(
integer -> {out.println("number is " + integer);},
throwable -> {out.println("error");}
);
}
// onComplete之后发送的数据无效,只有在onComplete之后才会发送最后一个数据·
// 如果不发送onComplete事件也不会释放最后一个数据。
// output number is 2
// 如果因为异常而终止,不会发送任何数据,只会传递onError事件
BehaviorSubject
BehaviorSubject
发送订阅之前的最后一个数据和订阅之后的所有数据。如果Observable
因异常终止,BehaviorSubject
将不会向后续的Observer
发送数据,但是会向Observer
传递一个异常通知。
Subject that emits the most recent item it has observed and all subsequent observed items to each subscribed Observer
// observer will receive all 4 events (including "default").
BehaviorSubject<Object> subject = BehaviorSubject.createDefault("default");
subject.subscribe(observer);
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
// observer will receive the "one", "two" and "three" events, but not "zero"
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.subscribe(observer);
subject.onNext("two");
subject.onNext("three");
// observer will receive only onComplete
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onComplete();
subject.subscribe(observer);
// observer will receive only onError
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onError(new RuntimeException("error"));
subject.subscribe(observer);
}
PublishSubject
PublishSubject
仅会向Observer
发送在订阅之后Observable
释放的数据。
A Subject that emits (multicasts) items to currently subscribed Observers and terminal events to current or late Observers.
PublishSubject<Object> subject = PublishSubject.create();
// observer1 will receive all onNext and onComplete events
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
// observer2 will only receive "three" and onComplete
subject.subscribe(observer2);
subject.onNext("three");
subject.onComplete();
ReplaySubject
该Subject会接收数据,当被订阅时,将所有接收到的数据全部发送给订阅者。
Replays events (in a configurable bounded or unbounded manner) to current and late Observers.
This subject does not have a public constructor by design; a new empty instance of this ReplaySubject can be created via the following create methods that allow specifying the retention policy for items:
ReplaySubject<Object> subject = ReplaySubject.create();
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onComplete();
// both of the following will get the onNext/onComplete calls from above
subject.subscribe(observer1);
subject.subscribe(observer2);
SerializedSubject
调用Subject的toSerialized
保证观察者和被观察者的数据安全。主要是加锁
/**
* Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
* onComplete methods, making them thread-safe.
* <p>The method is thread-safe.
* @return the wrapped and serialized subject
*/
@NonNull
public final Subject<T> toSerialized() {
if (this instanceof SerializedSubject) {
return this;
}
return new SerializedSubject<T>(this);
}
UnicastSubject
仅支持订阅一次的Subject
,如果多个订阅者试图订阅这个Subject
,若该subject
未terminate
,将会受到IllegalStateException
,若已经terminate
,那么只会执行onError
或者onComplete
方法。
A Subject that queues up events until a single Observer subscribes to it, replays those events to it until the Observer catches up and then switches to relaying events live to this single Observer until this UnicastSubject terminates or the Observer unsubscribes.
unicastSubject.onNext(1);
unicastSubject.onNext(2);
unicastSubject.onComplete();
unicastSubject.onNext(3);
unicastSubject.subscribe(
integer -> {out.println("number is " + integer);},
throwable -> {out.println("error");}
);
unicastSubject.subscribe(
integer -> {out.println("number is " + integer);},
throwable -> {out.println("error");}
);
number is 1
number is 2
error