记 Rxjava zip操作符遇到的问题

news2024/12/28 5:03:44

在项目中遇到了类似下面这样的代码
本意是希望当zip操作符中三个Observable执行完毕之后,将他们返回的数据统一进行处理

        Observable.zip(startFirst(), startSecond(), startThird(),
                (first, second, third) -> {
                    Log.i("Rxjava", "handle all data");
                    return 1;
                }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object o) {

            }

            @Override
            public void onError(Throwable e) {
                Log.e("Rxjava", e.getMessage());
            }

            @Override
            public void onComplete() {

            }
        });


    private Observable<Boolean> startFirst() {
        return new Observable<Boolean>() {
            @Override
            protected void subscribeActual(Observer<? super Boolean> observer) {
                Log.i("Rxjava", "startFirst");
            }
        };
    }


    private Observable<Boolean> startSecond() {
        return Observable
                .timer(300, TimeUnit.MILLISECONDS)
                .map(aLong -> {
                    Log.i("Rxjava", "startSecond");
                    return true;
                });
    }


       private Observable<Boolean> startThird() {
        return Observable
                .timer(360, TimeUnit.MILLISECONDS)
                .map(aLong -> {
                    Log.i("Rxjava", "startThird");
                    return true;
                });
    }

但打印日志后,实际结果如下:
在这里插入图片描述
发现他只走了三个Observable流, 并没有去处理这些Observable返回的数据
带着这个问题我去看了zip实现原理‘
下面我把自定义传入的类称为zipper 就是框中的这一段
在这里插入图片描述

来看zip方法实现

    public static <T1, T2, T3, R> Observable<R> zip(
            ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, ObservableSource<? extends T3> source3,
            Function3<? super T1, ? super T2, ? super T3, ? extends R> zipper) {
        ObjectHelper.requireNonNull(source1, "source1 is null");
        ObjectHelper.requireNonNull(source2, "source2 is null");
        ObjectHelper.requireNonNull(source3, "source3 is null");
        
        return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2, source3);
    }
    
    //上面将我们传入zipper封装 Functions.toFunction(zipper),封装成了一个Array3Func类
        public static <T1, T2, T3, R> Function<Object[], R> toFunction(final Function3<T1, T2, T3, R> f) {
        ObjectHelper.requireNonNull(f, "f is null");
        return new Array3Func<T1, T2, T3, R>(f);
    }

   //最终zipper被封装在这个类里面
   static final class Array3Func<T1, T2, T3, R> implements Function<Object[], R> {
        final Function3<T1, T2, T3, R> f;

        Array3Func(Function3<T1, T2, T3, R> f) {
            this.f = f;  //这个f就是我们传的zipper
        }

        @SuppressWarnings("unchecked")
        @Override
        public R apply(Object[] a) throws Exception {
            if (a.length != 3) {
                throw new IllegalArgumentException("Array of size 3 expected but got " + a.length);
            }
            return f.apply((T1)a[0], (T2)a[1], (T3)a[2]);  //这里调用zipper的apply方法 参数对应的就是三个Observable发送的值  至于这个值怎么得来的  继续往下看
        }
    }
    

zip调用了zipArray,而zipArray果然不出意外返回了一个ObservableZip ,参数sources就是我们传递的那些Observables

    public static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper,
            boolean delayError, int bufferSize, ObservableSource<? extends T>... sources) {
        if (sources.length == 0) {
            return empty();
        }
        ObjectHelper.requireNonNull(zipper, "zipper is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableZip<T, R>(sources, null, zipper, bufferSize, delayError));
    }

现在需要去看ObservableZip的subscribeActual(),了解Rxjava原理的应该都知道 当调用ObservableZip的subscribe()时,他一定会去调subscribeActual()

    public ObservableZip(ObservableSource<? extends T>[] sources,
            Iterable<? extends ObservableSource<? extends T>> sourcesIterable,
            Function<? super Object[], ? extends R> zipper,
            int bufferSize,
            boolean delayError) {
        this.sources = sources;  //Observable列表
        this.sourcesIterable = sourcesIterable;
        this.zipper = zipper;  //自定义的zipper
        this.bufferSize = bufferSize;
        this.delayError = delayError;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void subscribeActual(Observer<? super R> observer) {
        ObservableSource<? extends T>[] sources = this.sources;
        int count = 0;
        if (sources == null) {
            sources = new ObservableSource[8];
            for (ObservableSource<? extends T> p : sourcesIterable) {
                if (count == sources.length) {
                    ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
                    System.arraycopy(sources, 0, b, 0, count);
                    sources = b;
                }
                sources[count++] = p;
            }
        } else {
            count = sources.length;  //获取到Observable的数量
        }

        if (count == 0) {
            EmptyDisposable.complete(observer);
            return;
        } 

       //将observer zipper count 等参数封装
        ZipCoordinator<T, R> zc = new ZipCoordinator<T, R>(observer, zipper, count, delayError);
        zc.subscribe(sources, bufferSize);
    }

当调用subscribe时,会调用subscribeActual,在subscribeActual中 获取到我们传入的Observable数量 ,将zipper ,observer (这个observer是subscribe zip操作符时传入的 ),Observable数量等封装成一个ZipCoordinator类 然后调用ZipCoordinator类的subscribe

ZipCoordinator类:

 ZipCoordinator(Observer<? super R> actual,
                Function<? super Object[], ? extends R> zipper,
                int count, boolean delayError) {
            this.downstream = actual;  //observer
            this.zipper = zipper;
            this.observers = new ZipObserver[count];  //创建了一个zipObserver数组
            this.row = (T[])new Object[count];  //这个数组用来存储那三个Observable发射的数据
            this.delayError = delayError;
        }

        public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
            ZipObserver<T, R>[] s = observers;
            int len = s.length;
            for (int i = 0; i < len; i++) {
                s[i] = new ZipObserver<T, R>(this, bufferSize);  
            }
            // this makes sure the contents of the observers array is visible
            this.lazySet(0);
            downstream.onSubscribe(this);
            for (int i = 0; i < len; i++) {
                if (cancelled) {
                    return;
                }
                sources[i].subscribe(s[i]);  //给传入的那几个Observable注册观察者
            }
        }

ZipCoordinator的subscribe()主要做的事情就是实例化了count个ZipObserver,count代表zip中传入的Observable数量,然后用这个zipObserver分别作为观察者注册给我们传入的那几个Observable,这样的话我们传入的那几个Observable操作流就可以执行了
针对本例就是startFirst() startSecond() startThird()这几个方法开始执行,因为Rxjava流是从下到上,由上而下调用 所以他最终去调用Observer的onNext onError或者onComplete

所以现在去看ZipObserver里面这几个方法的实现


        ZipObserver(ZipCoordinator<T, R> parent, int bufferSize) {
            this.parent = parent;  //parent指的是上面的ZipCoordinator
            this.queue = new SpscLinkedArrayQueue<T>(bufferSize);
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this.upstream, d);
        }

        @Override
        public void onNext(T t) {
            queue.offer(t); 
            parent.drain();
        }

        @Override
        public void onError(Throwable t) {
            error = t;
            done = true;
            parent.drain();
        }

        @Override
        public void onComplete() {
            done = true;
            parent.drain();
        }

ZipObserver的onNext onError或者onComplete都是将Observable发射的数据存在了一个queue里面 然后调用ZipCoordinator的drain()

 public void drain() {
            if (getAndIncrement() != 0) {
                return;
            }

            int missing = 1;

            final ZipObserver<T, R>[] zs = observers;
            final Observer<? super R> a = downstream;
            final T[] os = row; //这个row上面提到过  是一个Object[]  长度为count
            final boolean delayError = this.delayError;

            for (;;) {

                for (;;) {
                    int i = 0;
                    int emptyCount = 0;
                    for (ZipObserver<T, R> z : zs) {
                        if (os[i] == null) {  
                            boolean d = z.done;
                            T v = z.queue.poll();  //从queue中取出Observable发射的数据
                            boolean empty = v == null; 

                            if (checkTerminated(d, empty, a, delayError, z)) {
                                return;
                            }
                            if (!empty) {  //如果不为空 将数据保存在os[]中
                                os[i] = v;
                            } else {
                                emptyCount++;  // 记录空值数量
                            }
                        } else {
                            if (z.done && !delayError) {
                                Throwable ex = z.error;
                                if (ex != null) {
                                    cancelled = true;
                                    cancel();
                                    a.onError(ex);
                                    return;
                                }
                            }
                        }
                        i++;
                    }

                    // 在这里判断  如果Observale没有发射数据  就不会再继续往下了
                    if (emptyCount != 0) {
                        break;
                    }

                    R v;
                    try {
                      //调用zipper的apply  这个os就是发射的数据集合
                        v = ObjectHelper.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        cancel();
                        a.onError(ex);
                        return;
                    }

                    a.onNext(v);

                    Arrays.fill(os, null);
                }

                missing = addAndGet(-missing);
                if (missing == 0) {
                    return;
                }
            }
        }

好了 这下明白了
zip实现就是给传入的那些Observablef分别注册了一个zipObserver,然后在zipObserver里面是将发射的数据存在了一个queue里面,接着for循环去取queue里面的数据 如果不为空 将把数据存在了一个object[]里面,只要有一个Observable没有发射数据 就不会去分发这个object[] 如果都不为空 才去调用zipper的apply,参数就是这个object[]
还记得吗 之前zipper是被封装成了一个Array3Func

  public R apply(Object[] a) throws Exception {
            if (a.length != 3) {
                throw new IllegalArgumentException("Array of size 3 expected but got " + a.length);
            }
            return f.apply((T1)a[0], (T2)a[1], (T3)a[2]);  //
        }

三个Observable发射的数据调用f.apply去处理 f就是自己传的那个zipper啦
在这里插入图片描述
至于我遇到的问题再来看,发现在startFirst()里面我没有发送任何数据
在这里插入图片描述
修改一下startFirst
在这里插入图片描述
果然就可以走到处理数据的地方了 log如下
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1413061.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

鸿蒙开发(八)添加常用控件(下)

添加控件的文章分成了上下两篇&#xff0c;上篇介绍了文本显示、文本输入、按钮、图片、单选框、切换按钮这六种常用控件&#xff0c;本篇继续介绍其他几种很重要但略微复杂的控件。 鸿蒙系列上一篇&#xff1a; 鸿蒙开发&#xff08;七&#xff09;添加常用控件&#xff08;…

网络安全防御保护实验(二)

一、登录进防火墙的web控制页面进行配置安全策略 登录到Web控制页面&#xff1a; 打开Web浏览器&#xff0c;输入防火墙的IP地址或主机名&#xff0c;然后使用正确的用户名和密码登录到防火墙的Web管理界面。通常&#xff0c;这些信息在防火墙设备的文档或设备上会有说明。 导…

[晓理紫]每日论文分享(有中文摘要,源码或项目地址)-机器人、强化学习

专属领域论文订阅 关注{晓理紫}&#xff0c;每日更新论文&#xff0c;如感兴趣&#xff0c;请转发给有需要的同学&#xff0c;谢谢支持 如果你感觉对你有所帮助&#xff0c;请关注我&#xff0c;每日准时为你推送最新论文。 分类: 具身智能&#xff0c;机器人强化学习开放词汇&…

[C++开发 03_2/2 _ STL(185)]

知识点1&#xff1a;STL初始 概述&#xff1a; STL是标准模板库的意思&#xff0c;STL从广义上来讲分为&#xff1a;容器&#xff0c;算法&#xff0c;迭代器。 容器算法之间通过迭代器进行无缝连接。 知识点2&#xff1a;STL初始 2.1 STL诞生 C中面向对象的三大特性&#xff1…

九、Kotlin 注解

1. 什么是注解 注解是对程序的附件信息说明。 注解可以作用在类、函数、函数参数、属性等上面。 注解的信息可用于源码级、编译期、运行时。 2. 注解类的定义 使用元注解 Retention 声明注解类的作用时期。 使用元注解 Target 声明注解类的作用对象。 定义注解类时可以声…

8.6 代理设计模式

文章目录 一、代理模式&#xff08;Proxy Pattern&#xff09;概述二、代理模式和观察者设计模式三、模式结构四、协作角色五、实现策略六、相关模式七、示例八、应用 一、代理模式&#xff08;Proxy Pattern&#xff09;概述 代理模式是一种设计模式&#xff0c;它通过引入一个…

Windows Defender存在威胁执行操作无反应且一直存在红叉(已解决)

文章目录 前言问题如图一、原因二、解决办法&#xff08;亲试有效&#xff09;总结 前言 Windows安全中心&#xff08;Windows Defender&#xff09;执行快速扫描/完全扫描后一直存在威胁&#xff0c;执行隔离或者删除操作后下次扫描还会扫出该威胁&#xff0c;但看威胁文件位置…

38、Flink 的CDC 格式:canal部署以及示例

Flink 系列文章 一、Flink 专栏 Flink 专栏系统介绍某一知识点&#xff0c;并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分&#xff0c;比如术语、架构、编程模型、编程指南、基本的…

leetcode1237. 找出给定方程的正整数解

1237. 找出给定方程的正整数解https://leetcode.cn/problems/find-positive-integer-solution-for-a-given-equation/ 难度中等 101 给你一个函数 f(x, y) 和一个目标结果 z&#xff0c;函数公式未知&#xff0c;请你计算方程 f(x,y) z 所有可能的正整数 数对 x 和 y。满…

java生成验证码工具类,java生成图片验证码

java生成验证码工具类&#xff0c;java生成图片验证码 java生成验证码工具类&#xff0c;java生成图片验证码&#xff0c;java生成彩色图片验证码&#xff0c;带干扰线验证码。 调用结果&#xff1a; 工具类调用&#xff1a; GetMapping("/validateCode")public vo…

ubuntu设置右键打开terminator、code

前言&#xff1a; 这里介绍一种直接右键打开本地目录下的terminator和vscode的方法。 一&#xff1a;右键打开terminator 1.安装terminator sudo apt install terminator 2.安装nautilus-actions filemanager-actions sudo apt-get install nautilus-actions filemanager…

【大数据】Flink 中的事件时间处理

Flink 中的事件时间处理 1.时间戳2.水位线3.水位线传播和事件时间4.时间戳分配和水位线生成 在之前的博客中&#xff0c;我们强调了时间语义对于流处理应用的重要性并解释了 处理时间 和 事件时间 的差异。虽然处理时间是基于处理机器的本地时间&#xff0c;相对容易理解&#…

可视化智慧水电站EasyCVR视频智能监控系统方案设计与技术应用介绍

一、背景需求 水电站作为国家重要的能源基地&#xff0c;其安全运行对于保障能源供应和社会稳定具有重要意义。然而&#xff0c;传统的人工监控方式存在着诸多问题&#xff0c;如人力成本高、监控范围有限、反应不及时等。因此&#xff0c;水电站急需引进一种先进的视频智能监…

《Q年文峰》GPT应用的交互式非线性体验

Phoncent博客创始人庄泽峰把自己的小说《Q年文峰》做成GPT应用&#xff0c;显然这是一件值得探索且具有创新意义的事情。 因为传统的阅读体验是线性的&#xff0c;读者只能按照固定的情节顺序进行阅读&#xff0c;而把小说制作成GPT应用后&#xff0c;读者阅读小说的方式是非线…

安卓程序开发——搭建主页框架

一、实验目的 搭建项目框架掌握Android Activity组件使用和Intent机制&#xff0c;加强对Activity生命周期的理解&#xff0c;掌握Fragment的使用。 二、实验设备及器件 Android Studio 三、实验内容 1.创建一个Android应用&#xff0c;设置工程名MobileShop&#xff0c;包…

鸿蒙ArkUI开发-应用添加弹窗

在我们日常使用应用的时候&#xff0c;可能会进行一些敏感的操作&#xff0c;比如删除联系人&#xff0c;这时候我们给应用添加弹窗来提示用户是否需要执行该操作&#xff0c;如下图所示&#xff1a; 弹窗是一种模态窗口&#xff0c;通常用来展示用户当前需要的或用户必须关注的…

智能体AI Agent的极速入门:从ReAct到AutoGPT、QwenAgent、XAgent

前言 如这两天在微博上所说&#xff0c;除了已经在七月官网上线的AIGC模特生成系统外&#xff0c;我正在并行带多个项目组 第二项目组&#xff0c;论文审稿GPT第2版的效果已经超过了GPT4&#xff0c;详见《七月论文审稿GPT第2版&#xff1a;用一万多条paper-review数据集微调…

PBM模型学习(二)Discrete离散方法

1.Discreate离散方法主要涉及Bins的分区、粒径设置、颗粒粒径、Phenomena,如下图所示: ## 2.重要参数设置 Bins分区 Bins表示将颗粒分成了几份 也就是说在PBM模型中,颗粒粒径是离散的,比如实际的颗粒粒径可能是0.001m-0.01m范围内,那么颗粒在这个范围内粒径应该是比较连续…

幻兽帕鲁游戏服务器搭建by阿里云服务器4核16G和32G配置价格表

如何自建幻兽帕鲁服务器&#xff1f;基于阿里云服务器搭建幻兽帕鲁palworld服务器教程来了&#xff0c;一看就懂系列。本文是利用OOS中幻兽帕鲁扩展程序来一键部署幻兽帕鲁服务器&#xff0c;阿里云百科aliyunbaike.com分享官方基于阿里云服务器快速创建幻兽帕鲁服务器教程&…

C++初阶--list

list C的list是标准模板库中的一个容器&#xff0c;用于存储和管理元素的双向链表。提供了一系列访问和修改数据的函数&#xff1b; 使用时需要包含头文件 #include< list > 下面演示下它的一些基础功能 使用list list的遍历 int main() {list<int> lt;lt.push_…