RxJava-操作符-mergeDelayError使用

news2025/1/9 19:20:17

说明

  • 合并多个源Observable的事件,事件不是按照顺序被发射(如需顺序使用concat操作符)。
  • Error事件被延迟发射,针对的是源Observable中的Error事件,多个源Observable都有Error时,会合并Error事件。
  • 执行结束
    • 正常执行所有事件,onComplete()代表执行结束。
    • 有Error事件时,onError()代表执行结束。

Error事件位置以及类型

Error事件需要在源Observable中才会被延迟发射。

一个源Observable 有Error事件,onError()中接收到的是实际的错误类型,不是CompositeException类型。

多个源Observable 有Error事件时,onError()中接收到的CompositeException类型,此类型是个数组。

案例

错误不在源Observable中,不会被延迟发射

Observable<Integer> odds = Observable.just(1, 3, 5);
Observable<Integer> evens = Observable.just(2, 4, 6);

Observable.mergeDelayError(odds, evens)
        .map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                //此处会产生错误,错误在mergeDelayError后面,没有在源Observable中(odds,evens)中,
                //Error不会被延迟
                if (integer == 3) return null;
                return integer;
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "接收到数据:" + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e(TAG, "接收到异常");
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, "接收完成" + " Thread" + Thread.currentThread());
            }
        }, new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.e(TAG, "doOnSubscribe" + " Thread" + Thread.currentThread());
            }
        });


错误在源Observable中,会被延迟发射

Observable<Integer> odds = Observable.just(1, 3, 5)
        .map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                //此处会产生错误,错误在源Observable中,会被延迟
                if (integer == 3) return null;
                return integer;
            }
        });
Observable<Integer> evens = Observable.just(2, 4, 6)
        .map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                return integer;
            }
        });

Observable.mergeDelayError(odds, evens)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "接收到数据:" + integer + " Thread" + Thread.currentThread());
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e(TAG, "接收到异常" + " Thread" + Thread.currentThread());
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, "接收完成" + " Thread" + Thread.currentThread());
            }
        }, new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.e(TAG, "doOnSubscribe" + " Thread" + Thread.currentThread());
            }
        });

多个源Observable中有Error,onError()中的类型是CompositeException

Observable<Integer> odds = Observable.just(1, 3, 5)
        .map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                //此处会产生错误,错误在源Observable中,会被延迟
                if (integer == 3) return null;
                return integer;
            }
        });
Observable<Integer> evens = Observable.just(2, 4, 6)
        .map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                //此处会产生错误,错误在源Observable中,会被延迟
                if (integer == 4) return null;
                return integer;
            }
        });

Observable.mergeDelayError(odds, evens)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "接收到数据:" + integer + " Thread" + Thread.currentThread());
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable e) throws Exception {
                Log.e(TAG, "接收到异常" + " Thread" + Thread.currentThread());
                if (e instanceof CompositeException) { //源Observable 异常
                    CompositeException compositeException = (CompositeException) e;
                    List<Throwable> execptionList = compositeException.getExceptions();
                    Log.e(TAG, "源Observable 异常数量" +  execptionList.size());
                }
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, "接收完成" + " Thread" + Thread.currentThread());
            }
        }, new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.e(TAG, "doOnSubscribe" + " Thread" + Thread.currentThread());
            }
        });


线程切换位置

线程切换需要在源Observable中完成。

线程切换不在源Observable中执行,源Observable中没有Error事件时正常,如果放在操作符后,会影响Error事件的延迟。

案例

源Observable中没有Error事件,线程切换在mergeDelayError操作符后。(正常)

Observable<Integer> odds = Observable.just(1, 3, 5)
        .subscribeOn(Schedulers.io()) //子线程执行 源Observable
        .map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                Log.e(TAG, "改变:" + Thread.currentThread());
                return integer+1;
            }
        });
Observable<Integer> evens = Observable.just(2, 4, 6)
        .subscribeOn(Schedulers.io()) //子线程执行 源Observable
        .map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                Log.e(TAG, "改变:" + Thread.currentThread());
                return integer+1;
            }
        });

Observable.mergeDelayError(odds, evens)
        .observeOn(AndroidSchedulers.mainThread()) // 非缘Observable中切换主线程
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "接收到数据:" + integer + " Thread" + Thread.currentThread());
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e(TAG, "接收到异常" + " Thread" + Thread.currentThread());
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, "接收完成" + " Thread" + Thread.currentThread());
            }
        }, new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.e(TAG, "doOnSubscribe" + " Thread" + Thread.currentThread());
            }
        });


源Observable中有Error事件,线程切换在mergeDelayError操作符后。(异常)

只能接受到一个异常数据,接收不到其他事件的数据

Observable<Integer> odds = Observable.just(1, 3, 5)
        .subscribeOn(Schedulers.io()) //子线程执行 源Observable
        .map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                Log.e(TAG, "改变:" + Thread.currentThread());
                if (integer == 3) return null; //此处发生错误
                return integer+1;
            }
        });
Observable<Integer> evens = Observable.just(2, 4, 6)
        .subscribeOn(Schedulers.io()) //子线程执行 源Observable
        .map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                Log.e(TAG, "改变:" + Thread.currentThread());
                if (integer == 2) return null; //此处发生错误
                return integer+1;
            }
        });

Observable.mergeDelayError(odds, evens)
        .observeOn(AndroidSchedulers.mainThread()) //切换主线程
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "接收到数据:" + integer + " Thread" + Thread.currentThread());
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e(TAG, "接收到异常" + " Thread" + Thread.currentThread());
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, "接收完成" + " Thread" + Thread.currentThread());
            }
        }, new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.e(TAG, "doOnSubscribe" + " Thread" + Thread.currentThread());
            }
        });


线程切换在源Observable执行,源Observable中无Error事件(正常)

Observable<Integer> odds = Observable.just(1, 3, 5)
                .subscribeOn(Schedulers.io()) //源Observable 切换线程
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
//                        Log.e(TAG, "改变:" + Thread.currentThread());
                        return integer+1;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread());//源Observable 切换线程
        Observable<Integer> evens = Observable.just(2, 4, 6)
                .subscribeOn(Schedulers.io())//源Observable 切换线程
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
//                        Log.e(TAG, "改变:" + Thread.currentThread());
                        return integer+1;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread()); //源Observable 切换线程

        Observable.mergeDelayError(odds, evens)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "接收到数据:" + integer + " Thread" + Thread.currentThread());
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e(TAG, "接收到异常" + " Thread" + Thread.currentThread());
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG, "接收完成" + " Thread" + Thread.currentThread());
                    }
                }, new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        Log.e(TAG, "doOnSubscribe" + " Thread" + Thread.currentThread());
                    }
                });

线程切换在源Observable执行,源Observable中有Error事件(正常)

 Observable<Integer> odds = Observable.just(1, 3, 5)
                .subscribeOn(Schedulers.io()) //源Observable 切换线程
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
//                        Log.e(TAG, "改变:" + Thread.currentThread());
                        if (integer == 3) return null;//此处发生错误
                        return integer;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread()); //源Observable 切换线程
        Observable<Integer> evens = Observable.just(2, 4, 6)
                .subscribeOn(Schedulers.io())//源Observable 切换线程
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
//                        Log.e(TAG, "改变:" + Thread.currentThread());
                        return integer;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread());//源Observable 切换线程

        Observable.mergeDelayError(odds, evens)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "接收到数据:" + integer + " Thread" + Thread.currentThread());
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e(TAG, "接收到异常" + " Thread" + Thread.currentThread());
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG, "接收完成" + " Thread" + Thread.currentThread());
                    }
                }, new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        Log.e(TAG, "doOnSubscribe" + " Thread" + Thread.currentThread());
                    }
                });


异常处理

源Observable的异常,单独处理,使用源Observable的doOnError(),合并处理使用订阅者的doOnError()。

Observable<Integer> odds = Observable.just(1, 3, 5)
        .subscribeOn(Schedulers.io())
        .map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                if (integer == 3) return null;//此处有错误
                return integer;
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .doOnError(new Consumer<Throwable>() { //异常被捕获,该Observable其他事件不会被接收
            @Override
            public void accept(Throwable throwable) throws Exception {
                isError1 = true;
                Log.e(TAG, "接收到异常1");
            }
        });
Observable<Integer> evens = Observable.just(2, 4, 6)
        .subscribeOn(Schedulers.io())
        .map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                return integer;
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .doOnError(new Consumer<Throwable>() { //此处不会执行,没有异常,所有事件会被正常接收
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e(TAG, "接收到异常2");
            }
        });

Observable.mergeDelayError(odds, evens)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "接收到数据:" + integer + " Thread" + Thread.currentThread());
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable e) throws Exception {
                //只要源Observable有异常,不管有没有在源Observable中被捕获,此处都有会执行
                Log.e(TAG, "接收到所有异常" + " Thread" + Thread.currentThread());
                Log.e(TAG, "isError1 = " + isError1);
                if (e instanceof CompositeException) { //源Observable 异常
                    CompositeException compositeException = (CompositeException) e;
                    List<Throwable> execptionList = compositeException.getExceptions();
                         Log.e(TAG, "源Observable 异常数量" +  execptionList.size());
                }
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, "接收完成" + " Thread" + Thread.currentThread());
            }
        }, new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.e(TAG, "doOnSubscribe" + " Thread" + Thread.currentThread());
            }
        });

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2058735.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vba自动发送邮件的基础步骤?有哪些流程?

vba自动发送邮件如何设置&#xff1f;vba自动发送邮件的技巧&#xff1f; 如果你想节省时间&#xff0c;提高工作效率&#xff0c;学会如何使用VBA自动发送邮件是一个非常有用的技能。AokSend将为你介绍VBA自动发送邮件的基础步骤&#xff0c;并通过简单的分段来详细讲解。 v…

《黑神话:悟空》的发布是否能打开元宇宙游戏世界的门

四年漫长等待&#xff0c;8月20日&#xff0c;国产3A游戏巨制《黑神话&#xff1a;悟空》正式上线并彻底引爆全球市场。这背后不仅是中国游戏史的里程碑&#xff0c;也将为元宇宙的未来夯实地基&#xff01; 游戏上线后&#xff0c;热度持续飙升&#xff0c;成为了社交媒体和游…

【数据结构与算法】并行搜索

并行搜索目录 一.并行的基础知识1.进程2.线程 二.正常遍历搜索三.线程并发搜索1.线程身份证和句柄2.创建线程3.搜索结构体4.处理函数实现 四.完整代码 一.并行的基础知识 1.进程 说的简单点,进程就是计算机中的多个程序,就相当于多个软件. 比如我同时打开QQ和WX,那么这个就叫…

基于yolov5 人体行为检测 对 跌倒 站立 蹲下 坐下 跑 五种行为检测目标检测

该项目使用YOLOv5深度学习框架来检测图像或视频中人体的五种基本行为&#xff1a;跌倒、站立、蹲下、坐下和跑步。YOLOv5&#xff08;You Only Look Once v5&#xff09;是一种高效的物体检测模型&#xff0c;能够快速准确地识别出图像中的目标。本项目具有以下特点&#xff1a…

秋招力扣Hot100刷题总结——动态规划

一、01背包 01背包问题中&#xff0c;遍历顺序可以是先物品后背包&#xff0c;也可以是先背包后物品&#xff0c;但是背包要倒序遍历。 1. 等和子集 题目要求&#xff1a;给你一个 只包含正整数 的 非空 数组 nums 。请你判断是否可以将这个数组分割成两个子集&#xff0c;使…

【芯智雲城】星宸科技Sigmastar SSD101芯片_高清显示解决方案

一、方案描述&#xff1a; Sigmastar SSD101是高度集成高性能的display controller,支持模拟的NTSC/PAL/SECAM CVBS和S-video信号输入,可自动检测视频信号源和模式&#xff0c;可自如地切换不同的信号源,内置2D梳状滤波器和AGC,支持6-bit/8-bit TTL/TCON显示屏。芯片LQFP 64pi…

基于大型语言模型的应急人机协同救援关键技术

源自&#xff1a;指挥与控制学报 作者&#xff1a;石响 王天乐 夏乾臣 陈善广 注&#xff1a;若出现显示不完全的情况&#xff0c;可 V 搜索“人工智能技术与咨询”查看完整文章 人工智能、大数据、多模态大模型、计算机视觉、自然语言处理、数字孪生、深度强化学习 课程…

大屏畅玩游戏,小米SU7迎来大升级,可连接蓝牙游戏手柄

嘿&#xff0c;朋友们&#xff01;今天我要和你们分享一个令人兴奋的消息。 小米SU7汽车迎来了一次重大升级&#xff0c;这不仅仅是技术上的飞跃&#xff0c;更是为驾驶体验带来了革命性的变化。想象一下&#xff0c;在宽敞的车内&#xff0c;通过连接蓝牙游戏手柄&#xff0c…

R语言绘图系列专栏 | 更新中

关于**《R语言绘图专栏》**&#xff0c;此专栏基于R语言绘制图形。每个图形我们会提供对应的R代码、数据和文本文档。此系列将会是一个长期更新的系列。 本系列教程&#xff0c;我们计划发表及收录使用R语言绘制50科研中常用图形。这是个长期的过程&#xff0c;计划花费3-4个的…

graphrag论文

目录 摘要1 介绍2 图 RAG 方法与流程2.1 源文档 → 文本块2.2 文本块 → 元素实例2.3 元素实例 → 元素摘要2.4 元素摘要 → 图社区2.5 图社区 → 社区摘要2.6 社区摘要 → 社区答案 → 全局答案 3 评估3.1 数据集3.2 查询3.3 条件3.4 指标3.5 配置3.6 结果 4 相关工作4.1 RAG …

深度解析DeepMind乒乓球AI:从AlphaGo到AlphaPingPong的进化之路

引言 谷歌DeepMind在AI研究领域再次取得重大突破&#xff0c;最新推出的乒乓球AI机器人已经能够击败人类选手。这一成就标志着AI从单纯的智力游戏&#xff08;如AlphaGo&#xff09;向物理运动竞技场的转移&#xff0c;充分展示了AI在体育竞技中的潜力。这款AI机器人不仅在击败…

【无标题】Image-to-Image Translation 图像风格迁移中的成对图像拼接代码

引 言 在图像风格迁移任务中&#xff0c;近几年比较火热的Generative Adversarial Nets (GAN)模型以及各种变体深受视觉研究团体的青睐&#xff0c;在具体任务中取得比较不错的实验表现。在有监督图像风格迁移任务迁移中&#xff0c;需要输入给模型成对的图片&#xff08;一个来…

中国气膜技术蓬勃发展:专业公司脱颖而出

气膜技术最早起源于美国&#xff0c;凭借其低造价、快速安装、可移动、可重复使用等独特优势&#xff0c;迅速成为体育、仓储、展览、会议、传媒等行业的首选临时或简易室内空间解决方案。21世纪以来&#xff0c;中国逐步引进并发展了膜结构技术&#xff0c;包括骨架膜、张拉膜…

Spire.PDF for .NET【文档操作】演示:创建 PDF 组合

PDF 作品集是一组文件&#xff0c;其中可以包含文本文档、电子表格、电子邮件、图像、PowerPoint 演示文稿和绘图。尽管 PDF 作品集将不同类型的文件组合成一个单元&#xff0c;但其中的每个文件都保留了其原始格式、分辨率和大小。在本文中&#xff0c;您将学习如何使用Spire.…

音频分割软件有什么?最方便的音频分割软件分享给你

一段长音频就像是一本厚重的百科全书&#xff0c;而音频剪辑师的任务&#xff0c;就是要将这本书拆分成数个章节&#xff0c;每章都有其独立的主题和内容&#xff0c;这非常考验剪辑师们的音频分割技巧。 幸运的是&#xff0c;随着技术的发展&#xff0c;市面上出现了许多优秀…

Dell Precision3591 自带RTX2000 Ada + 雷索坞外界3090显卡

问题&#xff1a; 插上雷索坞之后会自动安装驱动&#xff0c;但是自带的驱动可能跟当前的操作系统不兼容&#xff0c;所以安装失败&#xff0c;同时把自带的独立显卡的驱动给搞坏 解决办法&#xff1a; 先把自带的Ada 显卡驱动disable 然后找到跟系统匹配的驱动 GeForce Ga…

考研交流平台设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图详细视频演示技术栈系统测试为什么选择我官方认证玩家&#xff0c;服务很多代码文档&#xff0c;百分百好评&#xff0c;战绩可查&#xff01;&#xff01;入职于互联网大厂&#xff0c;可以交流&#xff0c;共同进步。有保障的售后 代码参考数据库参…

上交2024最新-《动手学大模型》实战教程及ppt分享!

本课介绍 今天分享一个上海交大的免费的大模型课程&#xff0c;有相关教程文档和Slides&#xff0c;目前是2.2K星标&#xff0c;还是挺火的&#xff01; 《动手学大模型》系列编程实践教程&#xff0c;由上海交通大学2024年春季《人工智能安全技术》课程&#xff08;NIS3353&…

使用IDEA开发Java Web项目

下载Tomcat 首先&#xff0c;下载Apache Tomcat并解压到本地计算机&#xff0c;可存放于任何位置。 另外&#xff0c;需要在系统中环境JRE_HOME环境变量&#xff0c;以保证Tomcat可以正常启动&#xff0c;具体配置方式请参考其它教程。 创建Java Web项目 在IntelliJ IDEA的欢…

Swift 数据类型之可选值类型(Optional)详解

&#x1f604;作者简介&#xff1a; 小曾同学.com,一个致力于测试开发的博主⛽️&#xff0c;主要职责&#xff1a;测试开发、CI/CD 如果文章知识点有错误的地方&#xff0c;还请大家指正&#xff0c;让我们一起学习&#xff0c;一起进步。 &#x1f60a; 座右铭&#xff1a;不…