RxJava 其实就是提供一套异步编程的 API,这套 API 是基于观察者模式的,而且是链式调用的,所以使用 RxJava 编写的代码的逻辑会非常简洁。
RxJava 有以下三个基本的元素:
- 被观察者(Observable)
- 观察者(Observer)
- 订阅(subscribe)
下面来说说以上三者是如何协作的:
被观察者发送的事件有以下几种,总结如下表:
移入Rx2Java2jar包,该jar包让我们异步编程变得更加简单 <dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.2.21</version> </dependency>
撸点代码运行看看概况:
package org.jd.data.netty.big.window.chat.reactor; import com.sun.istack.internal.NotNull; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.functions.Consumer; import io.reactivex.schedulers.Schedulers; import java.util.concurrent.TimeUnit; /** * Cold的生产者一个对应多个消费者: * 消息发送者与各个订阅者之间是独立的,并不是共享生产者发送的消息, * 而是单独给每个订阅者从新参数,所有说,订阅者收到的消息不是相同的 * 只有观察者订阅了,才开始发送数据 */ public class ColdObservableOperation { // 创建一个订阅者 static Consumer<Long> subscriber1 = new Consumer<Long>() { @Override public void accept(Long aLong) { System.out.println(" 订阅者:subscriber1 receive message : " + aLong); } }; // 创建一个订阅者 static Consumer<Long> subscriber2 = new Consumer<Long>() { @Override public void accept(Long message) throws Exception { System.out.println(" 订阅者:subscriber2 receive message : " + message); } }; public static void main(String args[]) { // 被观察者 Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() { // 被观察者发射器 @Override public void subscribe(@NotNull ObservableEmitter<Long> observableEmitter) throws Exception { Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation()) .take(Integer.MAX_VALUE) .subscribe(observableEmitter::onNext); // 每个10毫秒调用发送一次数据 } }).observeOn(Schedulers.newThread()); observable.subscribe(subscriber1); observable.subscribe(subscriber2); try { // 主线程等100毫秒 Thread.sleep(10000000L); } catch (InterruptedException e) { e.printStackTrace(); } } }
package org.jd.data.netty.big.window.chat.reactor; import io.reactivex.annotations.NonNull; import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; /** * 在RxJava中,被观察者,观察者,Subscribe()三者缺一不可 * 只有使用了subscribe(),被观察者才会开始发送数据,这一点比较重要 */ public class RxJava2ReactiveHelloWorld { /** * 观察输出的顺序 * @param args */ public static void main(String args[]) { // 被观察者 Observable.just("你好,杨哥欢迎来到RxJava2的异步编程世界,这里你将学到关于RxJava2的编程思想!") .subscribe(new Observer<String>() { // 该观察者有四个方法 @Override public void onSubscribe(@NonNull Disposable disposable) { System.out.println("onSubscribe"); } @Override public void onNext(@NonNull String s) { System.out.println(s); } @Override public void onError(@NonNull Throwable throwable) { System.out.println("onError"); } @Override public void onComplete() { System.out.println("onComplete"); } }); } }
package org.jd.data.netty.big.window.chat.reactor; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.functions.Consumer; /** * RxJava2入门HelloWord版本 */ public class RxJava2HelloWorld { public static void main(String args[]) { simpleLambdaObservable(); System.out.println("===========================Lambda版本==================================="); primitiveRxJava2Observable(); System.out.println("=============================简化版本================================="); simpleSimplifyRxJava2Observable(); } /** * Lambda表达式简化版本 */ private static void simpleLambdaObservable() { Observable.create((ObservableOnSubscribe<String>) observableEmitter -> observableEmitter.onNext("Hello World!")) .subscribe(System.out::println); // 订阅发射器发射的内容 } /** * 原始版本,便于理解 */ private static void primitiveRxJava2Observable() { Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception { observableEmitter.onNext("Hello World!"); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } }); } /** * 简化的版本: * RxJava是一种新的编程思想,为我们异步编程由此变得更加简单 */ public static void simpleSimplifyRxJava2Observable() { Observable.just("Hello World !").subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.err.println(s); } }); } }