【Rxjava详解】(二) 操作符的妙用

news2025/1/11 10:56:33

文章目录

    • 接口变化
    • 操作符
      • map
      • flatmap
      • debounce
      • throttleFirst()
      • take
      • concat

RxJava 是一个基于 观察者模式的异步编程库,它提供了丰富的操作符来处理和转换数据流。 操作符是 RxJava 的核心组成部分,它们提供了一种灵活、可组合的方式来处理数据流,使得开发者可以更加便捷地进行数据处理和流程控制。

接口变化

RxJava 2.x拥有了新的特性,其依赖于4个接口:

  • Publisher
  • Subscriber
  • Subscription
  • Processor
  1. SubscriberFunc1变为Function等等。此外,还引入了SingleMaybeCompletable等新的可观察类型。
  2. RxJava 2.x 中的背压支持:RxJava 2.x 引入了对背压的支持,新增了Flowable类型来处理背压场景。同时,对一些操作符的行为进行了一些修改以适应背压机制。
  3. 异常处理方式的变化:在RxJava 1.x中,异常处理是通过onError()方法来处理,而在RxJava 2.x中,引入了onError(Throwable)方法和onError(Throwable, boolean)方法,允许开发者控制是否中断流程。
  4. 取消订阅的方式变化:在RxJava 1.x中,使用unsubscribe()方法取消订阅,而在RxJava 2.x中,使用dispose()方法取消订阅

关于背压:

在RxJava中,背压(Backpressure)是一种处理生产者和消费者之间速度不匹配的机制。通过背压,可以使得消费者根据自身的处理能力告知生产者它们能够接受的数据量,从而避免生产者产生过多的数据导致消费者无法处理的情况。

而在RxJava 2.x ~ RxJava 3.x,发生以下变化:

  • 不再支持Backpressure:RxJava 3.x不再内置支持背压机制,而是采用基于Reactive-Streams的响应式规范,并提供了相应的Flowable类型。因此,在RxJava 3.x中,需要使用Flowable来处理背压场景。
  • Observer接口的变化:在RxJava 3.x中,Observer接口被拆分为两个接口:ObserverDisposableObserver接口用于处理事件的消费,而Disposable接口用于取消订阅。
  • SingleObserverCompletableObserver的变化:在RxJava 3.x中,SingleObserverCompletableObserver接口的方法签名有所变化,取消订阅的方法从dispose()改为了onDispose()

操作符

RxJava提供了对事件序列进行变换的支持,这是它的核心功能之一.所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
操作符是用来对Observable(或Flowable)流进行转换、过滤、组合和操作的方法。

RxJava提供了很多很有用的操作符。多的要死
在Rxjava 3.x 下,有以下常见的操作符:

  1. map:将Observable发射的数据项通过指定的函数进行转换,并发射转换后的数据项。
  2. filter:根据指定的条件过滤Observable发射的数据项,只发射满足条件的数据项。
  3. take:只发射Observable发射的前N个数据项,忽略后面的数据项。
  4. skip:跳过Observable发射的前N个数据项,只发射后面的数据项。
  5. merge:将多个Observable合并成一个Observable,按照时间顺序发射合并后的数据项。
  6. zip:将多个Observable按照顺序进行合并,每个数据项都是由对应位置的Observable发射的数据项组合而成。
  7. concat:按照顺序连接多个Observable,依次发射它们的数据项,等前一个Observable完成后才会订阅下一个Observable。
  8. onErrorResumeNext:在Observable发生错误时,使用备用的Observable继续发射数据项。
  9. retry:在Observable发生错误时,进行错误重试,重新订阅Observable。
  10. interval:创建一个按照固定时间间隔发射递增数值的Observable。
  11. debounce:只有在指定的时间间隔内没有发射新的数据项时,才发射最后一个数据项。
  12. distinct:过滤掉重复的数据项,只发射不重复的数据项。
  13. flatMap:将Observable发射的数据项转换为Observable集合,并按顺序发射这些Observable发射的数据项。
  14. reduce:对Observable发射的数据项进行累积操作,返回最终的累积结果。
  15. scan:对Observable发射的数据项进行累积操作,并按顺序发射每次累积的结果

在此简单介绍其中几个的用法:

map

示意图:

image.png

实际上,map操作符可以理解为对Observable发射的每个数据项都应用一个函数,将原始数据项转换为另一种形式的数据项,然后再发射出去。(感觉Kotlin里有)

假设我们有一个Observable发射的是整数序列,我们想将每个整数乘以2,并发射出去。

Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);

observable
    .map(number -> number * 2)
    .subscribe(result -> System.out.println(result));

输出:
2
4
6
8
10

flatmap

flatMap操作符会对Observable的每个数据项应用一个函数,这个函数返回一个新的Observable。然后,它会将这些新的Observable合并成一个Observable,并发射合并后的数据项。

假设我们有一个Observable发射的是字符串数组,我们想将每个字符串拆分为字符数组,并发射出去。

Observable<String> observable = Observable.just("Hello", "World", "RxJava");

observable
    .flatMap(str -> Observable.fromArray(str.split("")))
    .subscribe(character -> System.out.print(character + " "));

输出:

H e l l o W o r l d R x J a v a

flatMap操作符将每个字符串拆分为字符数组,并将所有的字符合并成了一个Observable,最终发射出去。

debounce

debounce操作符也是RxJava中常用的操作符之一,它用于在一定时间间隔内只发射最后一个数据项,忽略中间的数据项。debounce操作符主要用于处理需要在一定时间内连续发生的事件,但只关心最后一个事件的场景。

在安卓开发中,debounce操作符可以用于处理用户输入场景,比如搜索框输入关键词时,通常需要等待用户停止输入一段时间后再进行搜索,以减少不必要的网络请求。

Observable<String> observable = Observable.create(emitter -> {
    editText.addTextChangedListener(new TextWatcher() {
        @Override
        public void beforeTextChanged(CharSequence s, int start, int count, int after) {
        }

        @Override
        public void onTextChanged(CharSequence s, int start, int before, int count) {
        }

        @Override
        public void afterTextChanged(Editable s) {
            emitter.onNext(s.toString());
        }
    });
});

observable
    .debounce(500, TimeUnit.MILLISECONDS)
    .subscribe(keyword -> {
        // 进行搜索操作
        performSearch(keyword);
    });

我们首先创建了一个Observable对象,该Observable通过监听EditText的文本变化事件,将用户输入的关键词发射出去。

然后,我们使用debounce操作符,设置一个时间间隔(这里是500毫秒),它会在用户输入停止500毫秒后才发射最后一个关键词。

最后,通过subscribe方法订阅Observable,并在订阅中执行搜索操作。

这样做的好处是,用户在连续输入时,debounce操作符会忽略中间的输入,只关注最后一个输入,在用户停止输入一段时间后才执行搜索操作,避免不必要的网络请求。

throttleFirst()

throttleFirst()操作符也是RxJava中常用的操作符之一,它用于在指定时间间隔内只发射第一个数据项,忽略后续的数据项。throttleFirst操作符主要用于处理需要限制触发频率的事件,保证在指定时间间隔内只处理一次。

在安卓开发中,throttleFirst操作符可以用于处理按钮点击事件,防止用户重复点击按钮造成重复操作:

Observable<Object> observable = Observable.create(emitter -> {
    button.setOnClickListener(v -> {
        emitter.onNext(new Object());
    });
});

observable
    .throttleFirst(1000, TimeUnit.MILLISECONDS)
    .subscribe(event -> {
        // 执行按钮点击操作
        ClickAction();
    });

take

take()操作符也是RxJava中常用的操作符之一,用于从Observable中取出一定数量的数据项,并在达到指定数量后停止发射。它可以与Retrofit和RxJava的线程切换一起使用,来控制网络请求结果的数量和线程切换。

在安卓开发中,通常使用Retrofit进行网络请求,而结合RxJava可以实现异步操作和线程切换。下面是一个结合Retrofit和RxJava的实例,使用take操作符来限制结果数量,并配合线程切换:

首先,创建一个网络请求的接口:

public interface ApiInterface {
    @GET("data")
    Observable<List<Data>> getData();
}

然后,创建一个Retrofit实例,并结合RxJava的Observable进行网络请求:

ApiInterface apiInterface = RetrofitClient.getClient().create(ApiInterface.class);

apiInterface.getData()
    .subscribeOn(Schedulers.io()) // 在IO线程进行网络请求
    .observeOn(AndroidSchedulers.mainThread()) // 在主线程接收和处理结果
    .take(5) // 只接收前5个数据项
    .subscribe(dataList -> {
        // 处理获取到的数据
        for (Data data : dataList) {
            Log.d(TAG, "Received data: " + data.toString());
        }
    }, throwable -> {
        // 处理错误
        Log.e(TAG, "Error: " + throwable.getMessage());
    });

先建了一个ApiInterface的实例,用于定义网络接口。

然后用Retrofit和RxJava的Observable结合进行网络请求。通过subscribeOn()方法指定在IO线程进行网络请求,observeOn()方法指定在主线程接收和处理结果。使用take(5)操作符来限制只接收前5个数据项,即结果数量限制为5。

最后,通过subscribe方法订阅Observable,并在订阅中处理获取到的数据或错误。

concat

concat()操作符是RxJava中常用的操作符之一,用于将多个Observable按顺序连接在一起,并依次发射数据。它可以在安卓开发中用于实现多个下载任务的顺序执行。

在安卓开发中,有时需要进行多个文件的下载操作,可以使用concat操作符来依次执行下载任务:

创建一个下载任务的接口:

public interface DownloadService {
    @GET
    Observable<ResponseBody> downloadFile(@Url String fileUrl);
}

Retrofit结合RxJava的Observable进行下载任务:

DownloadService service = RetrofitClient.getClient().create(DownloadService.class);

Observable<ResponseBody> downloadTask1 = service.downloadFile("http://example.com/file1");
Observable<ResponseBody> downloadTask2 = service.downloadFile("http://example.com/file2");
Observable<ResponseBody> downloadTask3 = service.downloadFile("http://example.com/file3");

Observable.concat(downloadTask1, downloadTask2, downloadTask3)
    .subscribeOn(Schedulers.io()) // 在IO线程进行下载任务
    .observeOn(AndroidSchedulers.mainThread()) // 在主线程接收和处理结果
    .subscribe(responseBody -> {
        // 处理下载完成的文件
        saveFile(responseBody);
    }, throwable -> {
        // 处理错误
        Log.e(TAG, "Error: " + throwable.getMessage());
    });

操作符真的很多,其他的可以看详细文档进行转换,学习RxJava的操作符的关键是理解其原理和使用场景,以及熟悉常用的操作符和它们的功能。

操作符可以总结为以下几种:

  1. 转换操作符:用来对数据进行转换,比如将一个数据类型转换成另一个数据类型,或者对数据进行映射或扁平化处理。
  2. 过滤操作符:用来过滤数据流中的元素,比如只保留满足特定条件的元素,或者去除重复的元素。
  3. 组合操作符:用来将多个数据流进行组合,比如将多个流依次连接在一起,或者合并多个流的元素。
  4. 错误处理操作符:用来处理异常和错误情况,比如在遇到错误时返回一个默认值,或者在错误发生时切换到另一个数据流。
  5. 调度操作符:用来控制数据流在不同线程之间的切换,比如将数据流切换到IO线程执行耗时操作,或者将结果切换回主线程更新UI。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1240893.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

5-1 Java 网络编程

第1关&#xff1a;URL类与InetAddress类 任务描述 本关任务&#xff1a;了解网络编程基础知识。 相关知识 为了完成本关任务&#xff0c;你需要掌握&#xff1a;1.URL&#xff1b;2.InetAddress。 URL 统一资源定位符&#xff08;Uniform Resource Locator&#xff0c;缩…

新的centos7.9安装jenkins—(一)

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码&#xff1a; https://gitee.com/nbacheng/ruoyi-nbcio 演示地址&#xff1a;RuoYi-Nbcio后台管理系统 因为是用java8&#xff0c;所以还是要最后java8版本的jenkins&#xff0c;版本号是2.346.3&#xff0c;后…

HTTP四大参数类型及请求参数的方式和如何接收

HTTP 请求中4大参数类型和接收方法。 1、请求头参数head 请求头参数顾名思义&#xff0c;是存放在请求头中发送给服务器的参数&#xff0c;服务器通过解析请求头获取参数内容。通常会存放本次请求的基本设置&#xff0c;以帮助服务器理解并解析本次请求的body体。 参数形式如…

大模型变身双面人:虚假新闻制造机VS假新闻鉴别大师!

大家是怎样看待大型语言模型生成信息的可靠性呢&#xff1f; 尽管大语言模型生成的内容“像模像样”&#xff0c;但这些模型偶尔的失误揭示了一个关键问题&#xff1a;它们生成的内容并不总是真实可靠的。 那么&#xff0c;这种“不保真”特性能否被用来制造虚假信息呢&#x…

使用Python画一棵树

&#x1f38a;专栏【不单调的代码】 &#x1f354;喜欢的诗句&#xff1a;更喜岷山千里雪 三军过后尽开颜。 &#x1f386;音乐分享【如愿】 &#x1f970;欢迎并且感谢大家指出我的问题 文章目录 &#x1f339;Turtle模块&#x1f384;效果&#x1f33a;代码&#x1f6f8;代码…

在 vscode 中的json文件写注释,不报错的解决办法

打开 vscode 的「设置」&#xff0c;搜索&#xff1a;files: associations&#xff0c;然后添加 *.json jsonc最后

react中模块化样式中:global的作用

在react中如果是通过import styles from ./index.less这种方式模块化引入样式的话&#xff0c;那么编译后的less文件里的样式名都会自动添加后缀。而:global的作用就是不让类名添加后缀

IT 领域中的主要自动化趋势

48%的IT自动化流程属于IT服务管理&#xff0c;过去一年中&#xff0c;IT运维自动化增长了272%。 IT部门从交付者转变为战略伙伴 今年的《工作自动化指数》数据显示&#xff0c;自动化正在蔓延到组织的各个部门&#xff0c;越来越多的部门采用自动化&#xff0c;并且IT以外的员工…

城市管理实景三维:打造智慧城市的新引擎

城市管理实景三维&#xff1a;打造智慧城市的新引擎 在城市管理领域&#xff0c;实景三维技术正逐渐成为推动城市发展的新引擎。通过以精准的数字模型呈现城市真实场景&#xff0c;实景三维技术为城市决策提供了全新的思路和工具。从规划设计到交通管理&#xff0c;从环境保护到…

ETL-使用kettle批量复制sqlserver数据到mysql数据库

文章标题 1、安装sqlserver数据库2、下载kettle3、业务分析4、详细流程&#xff08;1&#xff09;转换1&#xff1a;获取sqlserver所有表格名字&#xff0c;将记录复制到结果&#xff08;2&#xff09;转换2&#xff1a;从结果设置变量&#xff08;3&#xff09;转换3&#xff…

STM32_5(中断)

中断系统 中断&#xff1a;在主程序运行过程中&#xff0c;出现了特定的中断触发条件&#xff08;中断源&#xff09;&#xff0c;使得CPU暂停当前正在运行的程序&#xff0c;转而去处理中断程序&#xff0c;处理完成后又返回原来被暂停的位置继续运行中断优先级&#xff1a;当…

Linux之进程替换

创建子进程的目的 创建子进程的第一个目的是让子进程执行父进程对应的磁盘代码中的一部分, 第二个目的是让子进程想办法加载磁盘上指定的程序,让子进程执行新的代码和程序 一是让子进程执行父进程代码的一部分, 比如&#xff1a; 1 #include<stdio.h> 2 #include<…

ubuntu编译sqlite3并使用

SQLite3是一种轻量级的关系型数据库管理系统&#xff0c;它是在C语言基础上实现的。SQLite3具有许多优点&#xff0c;例如&#xff1a; 1.灵活&#xff1a;它可以在多种操作系统上运行&#xff0c;并且可以将多个数据库文件合并成一个文件。 2.易于使用&#xff1a;SQLite3使用…

循环队列详解!!c 语言版本(两种方法)双向链表和数组法!!

目录 1.什么是循环队列 2.循环队列的实现&#xff08;两种方法&#xff09; 第一种方法 数组法 1.源代码 2.源代码详解&#xff01;&#xff01; 1.创造队列空间和struct变量 2.队列判空 3.队列判满&#xff08;重点&#xff09; 4.队列的元素插入 5.队列的元素删除 …

2023亚太杯数学建模竞赛(亚太赛)选题建议+初步分析

如下为C君的2023亚太杯数学建模竞赛&#xff08;亚太赛&#xff09;选题建议初步分析&#xff1a; 提示&#xff1a;DS C君认为的难度&#xff1a;C<A<B&#xff0c;开放度&#xff1a;A<B<C。 以下为ABC题选题建议及初步分析&#xff1a; A题&#xff1a;Image…

读像火箭科学家一样思考笔记06_初学者之心

1. 专业化是目前流行的趋势 1.1. 通才&#xff08;generalist&#xff09;是指博而不精之人 1.2. 懂得的手艺越多&#xff0c;反而会家徒四壁 1.2.1. 希腊谚语 1.3. 这种态度代价很大&#xff0c;它阻断了不同学科思想的交融 2. 组合游戏 2.1. 某个行业的变革可能始于另一…

《微信小程序开发从入门到实战》学习二十六

3.4 开发参与投票页面 参与投票页面同样需要收集用户提交的信息&#xff0c;哪个用户在哪个投票选择了什么选项&#xff0c;因此它也是一个表单页面 3.4.1 如何获取投票信息 假设用户A在投票创建页面后填了表单&#xff08;1.创建投票&#xff09;&#xff0c;用户A 点了提交…

Antd Design的inputNumber实现千位分隔符和小数点并存

代码来自文章: react中使用antDesign的Input/InputNumber最多保留两位小数&#xff0c;多的小数位禁止输入&#xff0c;且实现输入实时校验并添加千位分隔符, 正则忘了很多, 我主要做个笔记. //定义InputNumber的参数 const NumberProps {min: 0,//最小值max: …

SQLite3

数据库简介 常用的数据库 大型数据库&#xff1a;Oracle 中型数据库&#xff1a;Server 是微软开发的数据库产品&#xff0c;主要支持 windows 平台。 小型数据库&#xff1a;mySQL 是一个小型关系型数据库管理系统&#xff0c;开放源码 。(嵌入式不需要存储太多数据。) SQL…

spark shuffle 剖析

ShuffleExchangeExec private lazy val writeMetrics SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)private[sql] lazy val readMetrics SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)用在了两个地方&#xff0c;承接的是…