Android开发进阶——RxJava核心架构分析

news2025/1/15 22:41:36

简介

RxJava是对响应式扩展( Reactive Extensions,称之为 ReactiveX )规范的Java 实现,该规范还有其他语言实现:RxJS、Rx.Net、RxScala、RxSwift等等(也即,ReactiveX 定义了规范,其他语言实现规范即可,所以我们这里学习RxJava的架构和设计思维,只需研究ReactiveX 即可)。RxJava是一个通过使用可观察序列来组合异步操作(也即观察者模式,观察序列表示一组观察者,后面会详细介绍),并且基于事件驱动的Java库。它基于观察者模式并扩展了支持数据/事件序列的功能,添加了很多在数据转换时使用的操作符(比如:map、flat等等,像不像Java 8的Stream流式编程)。同时,RxJava 抽象了底层线程模型实现、线程安全操作的实现,让使用方不需要关心底层实现,专注于对业务的处理。

原理图解

Rxjava的核心思路被总结在了图中,本文分为两部分,第一部分讲图中的三条流和事件传递,第二部分讲线程切换的原理,下面进入正题。

响应式编程

响应式编程是一种基于异步数据流概念的编程模式;数据/事件就像一条河流,从源头一直往下流,在流动过程中,可以被观测、被过滤、被操作,或者与另一条流合并成一条新的流,最终流向大海被消费掉;

与响应式编程相对应的有同步式编程、异步式编程:

  • 同步式编程:比如我们在主线程上请求一个网络接口,一直等到返回结果才能继续执行下一步,这就是同步式的
  • 异步式编程:开启一个子线程去请求网络接口,主线程继续执行,然后定时去查询接口返回的结果
  • 响应式编程:开启一个子线程去请求网络接口,注册监听后主线程继续执行,网络接口返回数据后,主动回调注册的监听方法,从而达到响应的目的

RxJava可以简单理解为就是观察者模式+异步处理+链式调用(流的概念)

Rxjava需要达成的共识两种设计模式

观察者模式:实现响应式编程的基础

装饰器模式:各种操作符的具体实现类都通过装饰器模式类拓展完成

Rxjava核心框架核心部分

  • ObservableSource : 被观察者的顶层接口,提供订阅subsccribe()方法
  • Observable: 被观察者抽象类,实现ObservableSource的接口,并提供实际订阅的抽象方法。
  • Observer : 观察者接口,提供处理事件的回调方法。
  • ObservableOnSubscribe:被观察者与事件解耦的接口
  • Emitter : 事件发射的接口,提供发射事件的方法。
  • ObservableXXX: 具体的被观察者实现类,持有ObservableOnSubscribe接口的引用
  • XXXEmitter : 事件发射器具体实现,持有观察者引用。
  • XXXObserver : 具体观察者的实现类。
  • AbstractObservableWithUpStream: 被观察者的抽象装饰类,持有了顶层接口的引用,都是通过继承该抽象类来实现各种操作符的被观察者 。

源码分析create()操作符

private static void testCreate() {
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Throwable {
                for (int i = 0; i < 10; i++) {
                    System.out.println("emitter发射value数据:" + i);
                    emitter.onNext("value=" + i);
                }
                emitter.onComplete();
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Throwable {
                System.out.println(o);
            }
        });
}

Observable & ObservableSource & ObservableOnSubscribe等

  • ObservableSource : 被观察者的顶层接口,提供订阅subsccribe()方法
  • Observable: 被观察者抽象类,实现ObservableSource的接口,并提供实际订阅的抽象方法。
  • ObservableOnSubscribe :被观察者与事件发送解耦的接口
  • Observer: 观察者接口,提供处理事件的回调方法。可以在此接口的onSubscribe()函数来控制被观察者的事件发送后,观察者能否被消费
  • ObservableXXX: 具体的被观察者实现类,持有
  • ObservableOnSubscribe接口的引用
Observable.java
​
​
Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Throwable {
                for (int i = 0; i < 10; i++) {
                    System.out.println("步骤一 :emitter发射value数据:" + i);
                    emitter.onNext("value=" + i);
                }
                emitter.onComplete();
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Throwable {
                System.out.println("步骤二消费事件:" + o);
            }
});
​
​
//  ObservableOnSubscribe:被观察者与事件解耦的接口
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        Objects.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
​
​
ObservableCreate.java 
​
// 1: 构造函数保存 ObservableOnSubscribe
public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
}
​
​
@Override
protected void subscribeActual(Observer<? super T> observer) {
     CreateEmitter<T> parent = new CreateEmitter<>(observer);
​
     // 2: 先回调Observer的onSubscribe()函数
     observer.onSubscribe(parent);
     
     try {
     
         // 3:  ObservableOnSubscribe再 发射事件
         source.subscribe(parent);
     
     } catch (Throwable ex) {
         Exceptions.throwIfFatal(ex);
         parent.onError(ex);
     }
}
  • Emitter : 事件发射的接口,提供发射事件的方法。
  • ObservableXXX: 具体的被观察者实现类,持有ObservableOnSubscribe接口的引用
  • XXXEmitter : 事件发射器具体实现,持有观察者引用。
  • XXXObserver : 具体观察者的实现类。
ObservableCreate.java
​
static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
​
        private static final long serialVersionUID = -3434801548987643227L;
     
        final Observer<? super T> observer;
     
        // 1: CreateEmitter 持有下游 Observer的引用
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
     
        @Override
        public void onNext(T t) {
            if (t == null) {
     
                onError(ExceptionHelper.createNullPointerException("onNext called with a 
                      null value."));
     
                return;
            }
     
            // 2: 根据下游的 Observer 的onSubscribe()函数判断是否取消发射,决定Observer是 
              否调用 observer.onNext函数
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
     
        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }
​
 
​
​
 Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Throwable {
                for (int i = 0; i < 10; i++) {
                    System.out.println("步骤一 :emitter发射value数据:" + i);
​
                    // 0 : 上游被观察者传递过来的 emitter
                    emitter.onNext("value=" + i);
                }
                emitter.onComplete();
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Throwable {
                System.out.println("步骤二消费事件:" + o);
            }
        });

map()操作符源码分析

 /**
     *  直接对发射出来的事件进行处理并且产生新的事件,然后再次发射
     */
    private static void testMap() {
        Observable.just("aaa")
                .map(new Function<String, Object>() {
                    @Override
                    public Object apply(String s) throws Throwable {
                        System.out.println("步骤二: "+"事件转换之后再次发射");
                        return s+" + bbb";
                    }
                }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("步骤一:"+ "use Subscribe connect Observable and Observer");
            }
​
            @Override
            public void onNext(Object o) {
                System.out.println("步骤三: "+"Next event:" + o + " response");
            }
     
            @Override
            public void onError(Throwable e) {
     
            }
     
            @Override
            public void onComplete() {
     
            }
        });
    }

Observable.just() 生产:ObservableJust类,并在发射事件时调用subscribeAcutal函数

ObservableJust.java
​
public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {
​
    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }
     
    // Observer 是下游的最后一个 observer
     
    // subscribeActual 是 抽象类 Observable的具体实现类ObservableJust,它会在
   // Observable的 subscribe()中被回调
​
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }
     
    @Override
    public T get() {
        return value;
    }
}
​
 
​
Observable.java
​
public final void subscribe(@NonNull Observer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            
            // 利用java多态的特性,直接调用 ObservableJust.java中的 subscribeActual
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);
     
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

被观察者通过 ObservableScalarXMap.ScalarDisposable.run()发射事件

ObservableScalarXMap.ScalarDisposable.java

@Override
public void run() {
     if (get() == START && compareAndSet(START, ON_NEXT)) {
​
           // 持有下游的 Observer的引用,直接消费事件
           observer.onNext(value);
           if (get() == ON_NEXT) {
                 lazySet(ON_COMPLETE);
                 observer.onComplete();
           }
      }
}

在Android开发中rxjava部分是非常重要的;想要更深入学习或者更多Android核心技术,可以参考《Android核心技术手册》点击查看里面上千个技术知识。

文末

1)RxJava

有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

2)与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。

3)onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。

4)onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。

5)在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/273638.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

pip安装时报错 ascii‘ codec can‘t decode byte 0xe2 in position...

在使用pip安装包的时候报错ascii’ codec can’t decode byte 0xe2 in position… 报错信息 UnicodeDecodeError: ‘ascii’ codec can’t decode byte 0xe2 in position 1429: ordinal not in range(128) 以前好像见过类似的情况&#xff0c;所以搜了一下怎么修改python默认…

聊聊ChatGPT

最近ChatGPT火出圈。 在过去三个月里&#xff0c;眼见着它的热度火箭一般蹿升&#xff0c;据瑞银上周三发布的报告显示&#xff0c;GPT已经超过了Tiktok&#xff0c;成为人类有史以来最快突破1亿月活跃用户的互联网产品。Tiktok当初用了9个月&#xff0c;而GPT只用了2个月。 …

Spring使用了哪些设计模式?

目录Spring中涉及的设计模式总结1.简单工厂(非23种设计模式中的一种)2.工厂方法3.单例模式4.适配器模式5.装饰器模式6.代理模式7.观察者模式8.策略模式9.模版方法模式Spring中涉及的设计模式总结 1.简单工厂(非23种设计模式中的一种) 实现方式&#xff1a; BeanFactory。Spri…

go-grpc的使用和学习

文章目录基础知识&#xff1a;操作流程安装proto文件配置grpc&#xff1a; 正常客户端发送数据(以字节流的方式)&#xff0c;服务器接受并解析&#xff0c;根据约定知道要执行什么&#xff0c;然后把结果返回给客户 rpc将上述过程封装&#xff0c;使其操作更加优化&#xff0c;…

Vue 3 中的极致防抖/节流(含常见方式防抖/节流)

各位朋友你们好呀。今天是立春&#xff0c;明天就是正月十五元宵节了&#xff0c;这种立春 元宵相隔的时候&#xff0c;可是很难遇到的&#xff0c;百年中就只有几次。在这提前祝大家元宵快乐。 今天给大家带来的是Vue 3 中的极致防抖/节流&#xff08;含常见方式防抖/节流&a…

ChatGPT给程序员人手一个,这很朋克

目录ChatGPT、程序员、朋克为什么程序员需要ChatGPT&#xff0c;为什么这很朋克总结ChatGPT、程序员、朋克 本文由ChatGPT编写。 ChatGPT是由OpenAI开发的大型语言模型。它的核心功能是生成人类语言文本&#xff0c;因此有多种应用场景&#xff0c;如文本生成、对话生成、文本…

「VUE架构」Vue2与Vue3的区别

文章目录前言一、性能比Vue2快1.2~2倍1.1 diff算法优化1.2 事件侦听缓存1.3 减少创建组件实例的开销二、 按需编译&#xff0c;体积比Vue2更小三、 Compostion API四、 支持TS五、 自定义渲染API六、更先进的组件七、 更快的开发体验前言 VUE是一套用于构建用户界面的渐进式框…

Nginx常用功能举例解析

Nginx提供的基本功能服务从大体上归纳为"基本HTTP服务"、“高级HTTP服务”和"邮件服务"等三大类。基本HTTP服务Nginx可以提供基本HTTP服务&#xff0c;可以作为HTTP代理服务器和反向代理服务器&#xff0c;支持通过缓存加速访问&#xff0c;可以完成简单的…

【FPGA】Verilog:组合逻辑电路应用 | 数码管 | 8421BCD编码 | 转换七段数码管段码

前言&#xff1a;本章内容主要是演示Vivado下利用Verilog语言进行电路设计、仿真、综合和下载 示例&#xff1a;数码管的使用 功能特性&#xff1a; 采用 Xilinx Artix-7 XC7A35T芯片 配置方式&#xff1a;USB-JTAG/SPI Flash 高达100MHz 的内部时钟速度 存储器&#xff1a;2M…

Vue-VueRouter

前言 Vue Router 是 Vue.js (opens new window)官方的路由管理器。它和 Vue.js 的核心深度集成&#xff0c;让构建单页面应用变得易如反掌。包含的功能有&#xff1a; 嵌套的路由/视图表模块化的、基于组件的路由配置路由参数、查询、通配符基于 Vue.js 过渡系统的视图过渡效果…

SPSS聚类分析(含k-均值聚类,系统聚类和二阶聚类)

本篇博客主要是根据1、聚类的基本知识点_哔哩哔哩_bilibili系列视频进行的学习记录一、SPSS聚类分析的基本知识点1、什么是聚类分析?聚类分析(Cluster analysis)又叫做群集分析,通过一些属性将对象或变量分成不同的组别&#xff0c;在同一类下的对象或变量在这些属性上具有一些…

最全面的SpringBoot教程(四)——数据库连接

前言 本文为 最全面的SpringBoot教程&#xff08;四&#xff09;——数据库连接 相关知识&#xff0c;下边将对JDBC连接配置&#xff0c;与使用Druid数据源&#xff0c;从添加依赖到修改配置项再到测试进行详尽介绍~ &#x1f4cc;博主主页&#xff1a;小新要变强 的主页 &…

C语言深度解剖-关键字(5)

目录 if else 组合 if else 的基本用法 注释 深入理解bool float(double)与浮点数的比较 写在最后&#xff1a; if else 组合 if else 的基本用法 #include <stdio.h>int main() {int flag 1;if (flag 1){printf("flag %d\n", flag);}else if (flag…

crmeb pro多门店版二次开发

重启一下swool 新创建的数据库表 ALTER TABLE eb_store_cart ADD price DECIMAL(10,2) NOT NULL DEFAULT 0 COMMENT 单独改价 AFTER status;ALTER TABLE eb_store_order ADD car_no VARCHAR(255) NOT NULL DEFAULT COMMENT 车牌号 AFTER erp_order_id, ADD frame_no VARCHA…

2023软考系统集成项目管理工程师易混淆知识点~(7)

将2023上半年软考《系统集成项目管理工程师》易混淆知识点&#xff0c;分享给大家&#xff0c;快来跟着一起打卡学习吧&#xff01;概念辨析 1:项目、运营概念:(1)项目:项目是为达到特定目的&#xff0c;使用一定资源&#xff0c;在确定的期间内&#xff0c;为特定发起人而提供…

Linux perf的使用(一)

文章目录前言一、perf简介二、perf子命令简介三、perf工作模式3.1 计数3.2 采样参考资料前言 系统级性能优化通常包括两个阶段&#xff1a;性能剖析&#xff08;performance profiling&#xff09;和代码优化。 &#xff08;1&#xff09;性能剖析的目标是寻找性能瓶颈&#x…

python(13)--字典(Dict)

一、字典的基本操作 1.定义字典 字典也是一个列表型的数据结构&#xff0c;字典的数据是用“{ }”装的&#xff08;列表&#xff1a;[ ]&#xff0c;元组&#xff1a;( )&#xff09;&#xff0c;字典的元素是一一对应的关系“key-value”。 格式&#xff1a; Dictname{ key1…

vue中父子组件的传值

1. 父组件给子组件传值 主要两个步骤&#xff1a; 1. 在父组件调用子组件的时候绑定数据 2. 在子组件里面通过props接收父组件传过来的数据 2. 子组件给父组件传值 主要三个步骤&#xff1a; 1.在子组件中创建一个按钮&#xff0c;给按钮绑定一个点击事件 2.在响应该点击事件…

NSSRound#7

[NSSRound#7 Team]ec_RCE 源码: <?PHPif(!isset($_POST["action"]) && !isset($_POST["data"]))show_source(__FILE__);putenv(LANGzh_TW.utf8); $action $_POST["action"];$data "".$_POST["data"]."…

Java综合实验1题目: 猜心术---猜姓氏游戏

一、 实验内容及要求 假设游戏者共有十人&#xff0c;且有10个不同的姓&#xff1a;张、王、李、赵、刘、于、许、金、钱、孙&#xff0c;魔术师将十个姓写在四张纸牌上&#xff0c;游戏者只需指出那几张纸上有自己的姓&#xff0c;魔术师就能准确的说出游戏者的姓&#xff0c…