RxJava操作符变换过程

news2025/1/23 4:09:27

要使用Rxjava首先要导入两个包,其中rxandroid是rxjava在android中的扩展

    implementation 'io.reactivex:rxandroid:1.2.1'
    implementation 'io.reactivex:rxjava:1.2.0'

我们在使用rxjava的操作符时都觉得很方便,但是rxjava是怎么实现操作符的转换呢,以下面的代码进行分析

String host = "https://baidu.com/";
        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("message");
                subscriber.onCompleted();

            }
        }).map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                return host+s;
            }
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                ILog.LogDebug("subscriber onCompleted is come in");

            }

            @Override
            public void onError(Throwable e) {
                ILog.LogDebug("subscriber onError is come in");

            }

            @Override
            public void onNext(String s) {
                ILog.LogDebug(s);

            }
        });

上面的代码是链式调用,为了方便理解,我把上面的代码拆分成了下面样式

        String host = "https://baidu.com/";
        Observable<String> obs1 = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("message");
                subscriber.onCompleted();

            }
        });
        Observable<String> obs2 = obs1.map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                return host+s;
            }
        });
      obs2.subscribe(new Subscriber<String>() {
         @Override
         public void onCompleted() {
             ILog.LogDebug("subscriberLast onCompleted is come in");

         }

         @Override
         public void onError(Throwable e) {
             ILog.LogDebug("subscriberLast onError is come in");

         }

         @Override
         public void onNext(String s) {
             ILog.LogDebug(s);

         }
     });
        

上面代码会打印,我们将一步一步分析,打印是怎么来的

https://www.baidu.com/message
subscriberLast onCompleted is come in

obs1是我们的原始Observable, obs2是我们变换过的Observable
首先从obs1的创建开始,就是Observable的创建过程,有不理解的可以先看RXJava的创建订阅过程
osb1的创建过程create函数需要一个OnSubscribe对象,为了方便理解,就暂不拆开来写,代码虽然用的是匿名对象,我们暂且叫它onSubscribe1

       Observable<String> obs1 = Observable.create(new Observable.OnSubscribe<String>() { //onSubscribe1
            @Override
            public void call(Subscriber<? super String> subscriber) { 
                subscriber.onNext("observable call onNext0");

                subscriber.onStart();
                subscriber.onNext("observable call onNext");
                subscriber.onCompleted();
                subscriber.onNext("observable call onNext1");

            }
        });

obs1创建成功后调用了map方法,map方法又返回了一个Observable,就是我们的obs2,同理,map方法的参数我们叫它func1

        Observable<String> obs2 = obs1.map(new Func1<String, String>() { //func1
            @Override
            public String call(String s) {
                return host+s;
            }
        });

随后我们又使用了osb2进行了订阅,同理subscribe方法的参数我们叫它subscriberLast

         obs2.subscribe(new Subscriber<String>() { //subscriberLast
            @Override
            public void onCompleted() {
                ILog.LogDebug("subscriber onCompleted is come in");

            }

            @Override
            public void onError(Throwable e) {
                ILog.LogDebug("subscriber onError is come in");

            }

            @Override
            public void onNext(String s) {
                ILog.LogDebug("subscriber onNext is come in s = "+s);

            }
        });

于是有了下面的关系,到这里还都很好理解
在这里插入图片描述然后我们来分析obs1.map调用了map方法,rxjava做了什么

public class Observable<T> {
....
	    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return create(new OnSubscribeMap<T, R>(this, func));
    }
    
   public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }
....
}

map方法又调用了create方法,create方法的调用我们已经在RxJava的调用过程中讲过,传入的参数是一个OnSubscribe对象,OnSubscribeMap实现了OnSubscribe接口,我们把new出来的OnSubscribeMap对象暂时叫做onSubscribeMap1构造方法传入的参数是obs1func1, 那我们再来看OnSubscribeMap类

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

    final Observable<T> source;

    final Func1<? super T, ? extends R> transformer;

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }

    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }

    static final class MapSubscriber<T, R> extends Subscriber<T> {

        final Subscriber<? super R> actual;

        final Func1<? super T, ? extends R> mapper;

        boolean done;

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result);
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }

}

我们知道在Observable创建好后,调用了subscribe方法就可以进行订阅了,最后调用的也是Observable创建时传入的OnSubscribe对象的call方法,以obs1的创建举例,也就是我们这里的onSubscribe1,不懂得去看RxJava的调用过程
因为我们最后执行subscribe订阅方法的是obs2那么也就会调用obs2的OnSubscribe对象,那么obs2的OnSubscribe对象是谁呢,就是onSubscribeMap1那么执行完订阅就会调用onSubscribeMap1的call方法

    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }

这里的source是OnSubscribeMap构造方法调用时初始化的也就是obs1,transformer 是我们的func1

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }

现在又有了下面的关系
在这里插入图片描述在onSubscribeMap1的call方法中,一共有三行代码,这三行代码很重要,我们一行一行分析

        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);

实例化一个MapSubscriber对象parent ,MapSubscriber继承了Subscriber,所以MapSubscriber也是一个观察者;parent 持有了o,o是也就是subscriberLast,还持有了transformer,也就是func1。这里一会要详细分析

        o.add(parent);

o是一个Subscriber,也是subscriberLast,上面的代码直接把parent添加到了subscriberLast的SubscriptionList列表。这里一会还要说一下。在看下一条代码

        source.unsafeSubscribe(parent);

我们知道source也就是我们的obs1obs1的订阅操作就是在这里发生的。至于为什么用unsafeSubscribe方法, 我们一会在分析

我们重新梳理一下,
1、obs2的订阅方法subscribe执行导致了obs2的onSubscribeMap1实例的call方法被执行;
2、onSubscribeMap1的call方法中又执行了obs1的订阅;obs1的观察者就是parent;
3、obs1的订阅必然会导致obs1 的onSubscribe实例onSubscribe1的call方法被执行。
4、在onSubscribe1的call方法中我们又调用了

               subscriber.onNext("message");
                subscriber.onCompleted();

这里的subscriber也就是parent,必然会调用parent的next方法并传入message,和onCompleted。
5、继续分析parent

 static final class MapSubscriber<T, R> extends Subscriber<T> {

        final Subscriber<? super R> actual;

        final Func1<? super T, ? extends R> mapper;

        boolean done;

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result);
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }

parent是MapSubscriber,MapSubscriber继承了Subscriber,MapSubscriber的构造方法需要两个参数Subscriber和Func1,通过之前的分析,知道actual就是subscriberLast,mapper就是fun1

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

用一张图来说明下
在这里插入图片描述

 static final class MapSubscriber<T, R> extends Subscriber<T> {
 .....
        @Override
        public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t);  //这里调用,mapper就是fun1,的call方法,这里的t就是message,result就是转换后的字符串
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result);
        }
	.....
}

在parent的onnext方法中调用了func1的call方法,还记得我们在func1的call方法中写的什么,没错就是转换字符串,call方法的返回值就是转换后的字符串

        final String host = "https://www.baidu.com/";
        Observable<String> obs2 = obs1.map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                return host+s;  //func1的call方法 进行了字符串转换,这里的s就是message
            }
        });

继续调用了

actual.onNext(result);

我们知道actual就是subscriberLast,所以会调用subscriberLast的onNext方法,

       obs2.subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {
            ILog.LogDebug("subscriberLast onCompleted is come in");

        }

        @Override
        public void onError(Throwable e) {
            ILog.LogDebug("subscriber onError is come in");

        }

        @Override
        public void onNext(String s) { //subscriberLast的onNext方法
            ILog.LogDebug(s);//最后会打印https://www.baidu.com/message

        }

        @Override
        public void onStart() {
            super.onStart();
        }
    });

在obs1中我们还调用了onCompleted方法,先调用了actual.onCompleted();
actual也就是subscriberLast实例。

        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

在这添加一张图方便更好的理解
在这里插入图片描述简单的说就是先调用obs1的subscriber的onNext()方法,在onNext()方法中调用func1的call方法,处理数据源数据,然后再把处理过的数据源发射给obs2的subscriber

还记不记得上面的第二条代码 o.add(parent);为什么要把parent添加到o中呢,o也就是subscriberLast。

    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);//第二条代码
        source.unsafeSubscribe(parent);//第三条代码
    }

subscriberLast的add方法最终会把parent添加到subscriberLast的SubscriptionList中,关于Subscriber请看RxJava中的Subscriber。再执行SubscriptionList的解绑方法unsubscribe会把subscriptions中的Subscription一并解绑,也就是会把parentobs1的绑定关系解除。
那上面第三条代码为什么调用的是unsafeSubscribe方法呢,记得我们之前分析RxJava的订阅过程中看到的是最后包装了一个SafeSubscriber,再SafeSubscriber中会进行一些多线程的处理操作。

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

observable中unsafeSubscribe方法也很简单,也没并由对传入的subscriber进行包装而是直接调用。

public class Observable<T> {
....
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            // new Subscriber so onStart it
            subscriber.onStart();
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(RxJavaHooks.onObservableError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                RxJavaHooks.onObservableError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r; // NOPMD
            }
            return Subscriptions.unsubscribed();
        }
    }
....

我们回过来再看MapSubscriber中其实也已经进行了简单的处理工作,所以也就不需要使用SafeSubscriber了。
至此RxJava操作符变换过程就分析完了,欢迎大家补充和纠正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/404196.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

不会编程也能搭建聊天机器人?ChatGPT + Notion AI助你一臂之力!

体验链接&#xff1a;Aitrend ChatBot (无需环境&#xff0c;无需账号&#xff0c;打开即用&#xff0c;完全免费&#xff0c;回答能力同ChatGPT) 背景介绍 本文主要谈关于如何使用ChatGPT&#xff08;实际使用主力工具为Notion AI&#xff09;&#xff0c;应用官方API接口&…

Seay代码审计工具

一、简介Seay是基于C#语言开发的一款针对PHP代码安全性审计的系统&#xff0c;主要运行于Windows系统上。这款软件能够发现SQL注入、代码执行、命令执行、文件包含、文件上传、绕过转义防护、拒绝服务、XSS跨站、信息泄露、任意URL跳转等漏洞&#xff0c;基本上覆盖常见PHP漏洞…

0109二分图-无向图-数据结构和算法(Java)

文章目录1 概念2 API3 分析和实现4 测试5 总结后记1 概念 二分图是一种能将所有结点分为两部分的图&#xff0c;其中图的每条边所连接的两个顶点都分别属于不同的部分。 2 API public classBipartiteBipartite(Graph G)预处理函数public booleanisBipartitle()是否是二分图pub…

【opensea】opensea-js 升级 Seaport v1.4 导致的问题及解决笔记

一、opensea 协议升级导致旧包不能使用了 我使用的是 “opensea-js”: "^4.0.12” 版本当SDK。于2023年3月9日之后&#xff0c;不能使用了&#xff0c;需要升级到 Seaport v1.4 协议的包。 报错如下: Error: API Error 400: Please provide an OPEN order type when us…

可逆神经网络的研究及其在图像中应用

一、摘要 可逆神经网络(INN)自被提出以来&#xff0c;就受到了广泛关注。由于其双射构造和高效可逆性&#xff0c;INN被用于各种推理任务&#xff0c;如图像隐藏、图像重缩放、图像着色、图像压缩和视频超分辨率等等。本文针对最新关于INN在图像方面应用的文献进行介绍&#x…

day30_JS

今日内容 上课同步视频:CuteN饕餮的个人空间_哔哩哔哩_bilibili 同步笔记沐沐霸的博客_CSDN博客-Java2301 零、 复习昨日 一、作业 二、BOM 三、定时器 四、正则表达式 零、 复习昨日 事件 事件绑定方式鼠标事件 onmouseoveronmouseoutonmousemove 键盘事件 onkeydownonkeyupon…

一文带你深入理解【Java基础】· Java反射机制(下)

写在前面 Hello大家好&#xff0c; 我是【麟-小白】&#xff0c;一位软件工程专业的学生&#xff0c;喜好计算机知识。希望大家能够一起学习进步呀&#xff01;本人是一名在读大学生&#xff0c;专业水平有限&#xff0c;如发现错误或不足之处&#xff0c;请多多指正&#xff0…

aws dynamodb 使用awsapi和PartiQL掌握dynamodb的CRUD操作

总结一下 dynamodb通常和java等后端sdk结合使用使用的形式可以是api或partiql语法调用dynamodb的用法不难&#xff0c;更重要的是维护成本&#xff0c;所需的服务集成&#xff0c;技术选型等和大数据结合场景下有独特优势 之后可能再看看java sdk中DynamoDBMapper的写法&…

登入vCenter显示503,证书过期解决办法

登入vCenter显示503 原因&#xff1a;当安全令牌服务 &#xff08;STS&#xff09; 证书已过期时&#xff0c;会出现这些问题。这会导致内部服务和解决方案用户无法获取有效令牌&#xff0c;从而导致无法按预期运行&#xff08;证书两年后就会过期&#xff09;。 解决办法&…

Yocto系列讲解[技巧篇]90 - toolchain交叉编译器SDK中安装的软件

By: fulinux E-mail: fulinux@sina.com Blog: https://blog.csdn.net/fulinus 喜欢的盆友欢迎点赞和订阅! 你的喜欢就是我写作的动力! 目录 问题背景toolchain生成回顾toolchain sdk安装方法1:安装libmyapi到SDK方法2:安装libmyapi到SDK演示过程返回总目录:Yocto开发讲解系…

Linux 学习笔记——二、主机规划与磁盘分区

一、Linux 与硬件的搭配 Linux 中所有设备均被视为文件&#xff0c;其命名规则如下&#xff1a; 设备文件名SCSI/SATA/USB 硬盘机/dev/sd[a-p]USB 闪存盘/dev/sd[a-p]&#xff08;与 SATA 相同&#xff09;Virtl/O 界面/dev/vd[a-p]&#xff08;用于虚拟机内&#xff09;软盘…

RabbitMQ高级特性

RabbitMQ高级特性 消息可靠性投递 Consumer ACK 消费端限流 TTL 死信队列 延迟队列 日志与监控 消息可靠性分析与追踪 管理 消息可靠性投递 在使用 RabbitMQ 的时候&#xff0c;作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制…

docker基本命令-容器

容器 基本概念 镜像&#xff08;Image&#xff09;和容器&#xff08;Container&#xff09;的关系&#xff0c;就像是面向对象程序设计中的 类 和 实例 一样&#xff0c;镜像是静态的定义&#xff0c;容器是镜像运行时的实体。容器可以被创建、启动、停止、删除、暂停等。 容…

.NET Framework .NET Core与 .NET 的区别

我们在创建C#程序时,经常会看到目标框架以下的选项,那么究竟有什么区别? 首先 .NET是一种用于构建多种应用的免费开源开发平台,可以使用多种语言,编辑器和库开发Web应用、Web API和微服务、云中的无服务器函数、云原生应用、移动应用、桌面应用、Windows WPF、Windows窗体…

搭建一个中心化的定时服务

1. 背景 在物联网络&#xff0c;很多设备之间都在进行交互&#xff0c;其中云端在远程交流中起到了很重要的作用。比如&#xff0c;一台设备想进行调温&#xff0c;但是需要知道此时房间的温度&#xff0c;那就需要定时去查询传感器测出来的房间温度&#xff0c;如果温度过高&a…

【C++学习】【STL】list容器

list 容器&#xff0c;又称双向链表容器&#xff0c;即该容器的底层是以双向链表的形式实现的。这意味着&#xff0c;list 容器中的元素可以分散存储在内存空间里&#xff0c;而不是必须存储在一整块连续的内存空间中。可以看到&#xff0c;list 容器中各个元素的前后顺序是靠指…

【NodeJs】使用ffmpeg将视频webm转换为mp4

使用Chrome浏览器录制视频文件是webm格式&#xff0c;但是很多媒体播放器是不支持的&#xff0c;不利于分享&#xff0c;需要转换为mp4格式才行&#xff0c;接下来给大家讲 ffmpeg ffmpeg是什么呢&#xff0c; 一个免费开源的视频转换工具&#xff0c;一款音视频编解码工具&…

日志与可视化方案:从ELK到EFK,再到ClickHouse

EFK方案 从ELK谈起 ELK是三个开源软件的缩写&#xff0c;分别表示&#xff1a;Elasticsearch&#xff0c;Logstash&#xff0c;Kibana。新增了一个FlieBeat&#xff0c;它是一个轻量级的日志收集处理工具&#xff0c;FlieBeat占用资源少&#xff0c;适用于在各个服务器上搜集…

JS语法(扫盲)

文章目录一、初识JavaScript二、第一个JS程序JS代码的引入JS程序的输出三、语法变量使用动态类型内置类型运算符强类型语言&弱类型语言条件语句循环语句数组创建数组获取数组元素新增数组元素删除数组元素函数语法格式形参实参个数的问题匿名函数&函数表达式作用域作用…

PHP 的运行方式有哪些?

PHP本质上的运行方式可以分为两种&#xff1a; 基于命令行的基于PHP-FPM的 但实际上&#xff0c;PHP能做的事很多&#xff0c;很多场景下&#xff0c;不同的运行方式能让开发更方便&#xff0c;减轻各种工作。 测试开发 PHP内置了一个HTTP 的server。这意味着&#xff0c;很…