RxJava/RxAndroid的操作符使用(二)

news2025/1/10 23:42:17

文章目录

  • 一、创建操作
    • 1、基本创建
    • 2、快速创建
      • 2.1 empty
      • 2.2 never
      • 2.3 error
      • 2.4 from
      • 2.5 just
    • 3、定时与延时创建操作
      • 3.1 defer
      • 3.2 timer
      • 3.3 interval
      • 3.4 intervalRange
      • 3.5 range
      • 3.6 repeat
  • 二、过滤操作
    • 1、skip/skipLast
    • 2、debounce
    • 3、distinct——去重
    • 4、elementAt——获取指定位置元素
    • 5、filter——过滤
    • 6、first——取第一个数据
    • 7、last——取最后一个
    • 8、ignoreElements & ignoreElement(忽略元素)
    • 9、ofType(过滤类型)
    • 10、sample
    • 11 、take & takeLast
  • 三、组合可观察对象操作符
    • 1、CombineLatest
    • 2、merge
    • 3、zip
    • 4、startWith
    • 5、join
  • 四、变化操作符
    • 1、map
    • 2、flatMap / concatMap
    • 3、scan
    • 4、buffer
    • 5、window
  • 关于RxJava/RxAndroid的全部文章

一、创建操作

1、基本创建

create.c

create创建一个基本的被观察者

在使用create()操作符时,最好在被观察者的回调函数subscribe()中加上isDisposed(),以便在观察者断开连接的时候不在执行subscribe()函数中的相关逻辑,避免意想不到的错误出现。

        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Object> emitter) throws Throwable {
                try {
                    if(!emitter.isDisposed()){
                        emitter.onNext("a");
                        emitter.onNext("b");
                    }
                } catch (Exception e) {
                    emitter.onError(e);
                }
            }
        }).subscribe(
                value -> Log.e(TAG, "onNext: " + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete"));

2、快速创建

完整&快速创建被观察者、数组、集合遍历

操作符作用
empty创建一个只发送 onComplete 事件的 Observable。
never创建一个不发送任何事件的 Observable。
error创建一个只发送 onError 事件的 Observable。
from操作符用于将其他对象或数据结构转换为 Observable,可发送不同类型的数据流
just操作符将对象或一组对象转换为 Observable,并立即发送这些对象,没有延迟。

2.1 empty

empty.c

创建一个不发射任何items但正常终止的 Observable——create an Observable that emits no items but terminates normally

        Observable.empty().subscribe(
                value -> Log.e(TAG, "onNext: "+value ),
                error -> Log.e(TAG, "Error: "+error),
                ()->Log.e(TAG,"onComplete")
        );

image-20231107093722826

2.2 never

never.c

创建一个不发射任何items且不会终止的 Observable——create an Observable that emits no items and does not terminate

        Observable.never().subscribe(
                value -> Log.e(TAG, "onNext: " + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete"));

不发送任何事件

2.3 error

throw.c

创建一个不发射任何items并以错误终止的 Observable——create an Observable that emits no items and terminates with an error

        Observable.error(new Exception("ERROR")).subscribe(
                value -> Log.e(TAG, "onNext: " + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete"));

image-20231107094348333

2.4 from

image-20231107095234857

from.c

以fromAray举例:

        Observable.fromArray(1,2,3,4,5).subscribe(
                value -> Log.e(TAG, "onNext: " + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete"));

image-20231107095642506

2.5 just

just.c

        Observable.just(1,2,3,4,5).subscribe(
                value -> Log.e(TAG, "onNext: " + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete"));

image-20231107094504061

通过just()创建传入Integer类型的参数构建Observable被观察者,相当于执行了onNext(1)~onNext(5),通过链式编程订阅观察者。注意just的数据一般不能超过10个

注意,如果将 null 传递给 Just,它将返回一个将 null 作为项目发出的 Observable。不要错误地认为这将返回一个空的 Observable(根本不发出任何项目)

3、定时与延时创建操作

定时操作、周期性操作

操作符作用
defer直到有Observer观察者订阅时,才会通过Observeable的工厂方法动态创建Observeable,并且发送事件
timer用于延时发送,在给定的延迟后发出单个项目
interval它按照指定时间间隔发出整数序列,通常用于定时操作。
intervalRange类似于interval(),快速创建一个被观察者对象,指定时间间隔就发送事件,可以执行发送事件的数量
range它发出一个连续的整数序列,可以指定发送的次数
repeat重复发送指定次数的某个事件流

3.1 defer

defer.c

直到有Observer观察者订阅时,才会通过Observeable的工厂方法动态创建Observeable,并且发送事件

defer不会立即创建 Observable,而是等到观察者订阅时才动态创建,每个观察者都会得到一个新的 Observable 实例。

defer确保了Observable代码在被订阅后才执行(而不是创建后立即执行)

        Observable<Integer> integerObservable = Observable.defer(new Supplier<ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> get() throws Throwable {
                int randomNumber = (int) (Math.random() * 100);
                return Observable.just(randomNumber);
            }
        });

        integerObservable.subscribe(
                new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Throwable {
                        Log.e(TAG, "第一次" + integer.toString());
                    }
                }
        );
        

        integerObservable.subscribe(
                integer -> Log.e(TAG, "第二次" + integer.toString())
        );

image-20231107123942326

3.2 timer

timer.c

构造方法如下:

image-20231107152036633

 timer(long delay, TimeUnit unit)
 timer(long delay, TimeUnit unit, Scheduler scheduler)
  • delay:延时的时间,类型为Long;
  • unit:表示时间单位,有TimeUnit.SECONDS等多种类型;
  • scheduler:表示调度器,用于指定线程。

用于延时发送

        final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Log.e(TAG, "timer:当前时间 ==" + dateFormat.format(System.currentTimeMillis()));

        Observable.timer(5, TimeUnit.SECONDS).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + dateFormat.format(System.currentTimeMillis())),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

表示延迟5s后发送数据

image-20231107154813890

3.3 interval

interval.c

用于定时发送数据,快速创建Observable被观察者对象,每隔指定的时间就发送相应的事件,事件序列从0开始,无限递增1;

//在指定延迟时间后,每个多少时间发送一次事件
interval(long initialDelay, long period, TimeUnit unit)
 
//在指定的延迟时间后,每隔多少时间发送一次事件,可以指定调度器
interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
 
//每间隔多少时间发送一次事件,使用默认的线程
Observable<Long> interval(long period, TimeUnit unit)
 
//每间隔多少时间发送一次事件,可以指定调度器
interval(long period, TimeUnit unit, Scheduler scheduler)
  • initialDelay: 表示延迟开始的时间,类型为Long
  • period:距离下一次发送事件的时间间隔,类型Long
  • unit:时间单位,有TimeUnit.SECONDS等多种类型;
  • scheduler:表示调度器,用于指定线程。

它会从0开始,然后每隔 1 秒发射一个递增的整数值

        Observable.interval(1,3,TimeUnit.SECONDS).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + dateFormat.format(System.currentTimeMillis())),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

image-20231107160116792

定时发射指定的结果

// 创建一个每秒发射一个递增整数的 Observable
        Observable<Long> intervalObservable = Observable.interval(1, TimeUnit.SECONDS);

        // 使用 map 操作符将递增的整数值映射为您想要的数据类型
        Observable<String> customObservable = intervalObservable
            .map(index -> "Data_" + index); // 映射为字符串 "Data_" + index

        // 订阅并输出结果
        customObservable.subscribe(
            data -> System.out.println("Received: " + data),
            error -> System.err.println("Error: " + error),
            () -> System.out.println("Completed")
        );

3.4 intervalRange

类似于interval(),快速创建一个被观察者对象,指定时间间隔就发送事件,可以执行发送事件的数量,数据依次递增1。

intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
 
intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
  • start:表示事件开始的数值大小,类型为Long
  • count:表示事件执行的次数,类型为long,不能为负数;
  • initialDelay:表示延迟开始的时间,类型为Long;
  • period:距离下一次发送事件的时间间隔,类型Long;
  • unit:时间单位,有TimeUnit.SECONDS等多种类型;
  • scheduler:表示调度器,用于指定线程。
        Observable.intervalRange(10, 3, 2, 1,
                TimeUnit.SECONDS,
                Schedulers.io()).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

image-20231107161150732

3.5 range

range.c

Range 运算符按顺序发出一系列连续整数,可以在其中选择范围的起点及其长度。

它发出一个连续的整数序列,通常不涉及延迟。类似于intervalRange。

 public static Observable<Integer> range(int start, int count)
 public static Observable<Long> rangeLong(long start, long count)
  • start:事件开始的大小
  • count:发送的事件次数
    Observable.range(10,5).subscribe(
            value -> Log.e(TAG, "timer:onNext ==" + value),
            error -> Log.e(TAG, "Error: " + error),
            () -> Log.e(TAG, "onComplete")
    );

3.6 repeat

repeat.c

repeat操作符可以重复发送指定次数的某个事件流,repeat操作符默认在trampoline调度器上执行,repeat默认重复次数为Long.MAX_VALUE,可使用重载方法指定次数以及使用repeatUntil指定条件。

        //一直重复
        Observable.fromArray(1, 2, 3, 4).repeat();
        //重复发送5次
        Observable.fromArray(1, 2, 3, 4).repeat(5);
        //重复发送直到符合条件时停止重复
        Observable.fromArray(1, 2, 3, 4).repeatUntil(new BooleanSupplier() {
            @Override
            public boolean getAsBoolean() throws Exception {
                //自定判断条件,为true即可停止,默认为false
                return false;
            }
        });

二、过滤操作

1、skip/skipLast

skip

可以在Flowable,Observable中使用,表示源发射数据前,跳过多少个。

  1. skip: skip 操作符用于跳过 Observable 开头的一定数量的事件,然后开始发射后续的事件。它忽略序列的头部事件。

    例如,observable.skip(3) 会跳过前面的 3 个事件,然后发射后续的事件。

  2. skipLast: skipLast 操作符用于跳过 Observable 末尾的一定数量的事件,然后发射前面的事件。它忽略序列的末尾事件。

    例如,observable.skipLast(3) 会发射从序列开头到倒数第 3 个事件之前的事件,忽略了最后 3 个事件。

        Observable<Integer> integerObservable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8);
        
        integerObservable.skipLast(3).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.e(TAG, "accept: " + integer);
            }
        });

image-20231107205656235

换成skip后结果如下:

image-20231107205732571

2、debounce

仅当特定时间跨度过去而没有发出另一个项目时,才从 Observable 发出一个项目

DM_20231107210151_001

Observable.create(emitter -> {
    emitter.onNext(1);
    Thread.sleep(1_500);
    emitter.onNext(2);
    Thread.sleep(500);
    emitter.onNext(3);
    Thread.sleep(2000);
    emitter.onNext(4);
    emitter.onComplete();
}).subscribeOn(Schedulers.io()).debounce(1,TimeUnit.SECONDS).blockingSubscribe(
    value -> Log.e(TAG, "timer:onNext ==" + value),
    error -> Log.e(TAG, "Error: " + error),
    () -> Log.e(TAG, "onComplete")
);

debounce(1, TimeUnit.SECONDS) 表示将事件流中的事件按照时间窗口的方式进行过滤。具体含义是,如果在连续的 1 秒内没有新的事件发射,那么才会将最后一个事件传递给观察者,否则会丢弃之前的事件。

image-20231108122709311

结合图像理解,红色线条为debounce监听的发射节点,也就是每隔一秒发送一次数据。

在0s时发送了1。

在1s时由于没有数据,就没有发送数据。

在1s—2s期间产生了两次数据,分别是2和3。但是debounce只会将距离2s最近一次的数据发送。因此2被不会发送出来。

image-20231108123140801

3、distinct——去重

DM_20231108124123_001

可作用于Flowable,Observable,去掉数据源重复的数据。

        Observable.just(1,2,3,1,2,3,4).distinct().subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

image-20231108123826742

DM_20231108124148_001

distinctUntilChanged()去掉相邻重复数据。

      Observable.just(1,3,3,2,2,3,4).distinctUntilChanged().subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

//还可以指定重复条件
        Observable.just(1,3,3,2,2,3,4).distinctUntilChanged(new Function<Integer, Boolean>() {
            @Override
            public Boolean apply(Integer integer) throws Throwable {
                return integer>3;
            }
        }).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

4、elementAt——获取指定位置元素

20200404170305941

//获取索引为1的元素,如果不存在返回Error
Observable.just("a","b","c","d","e").elementAt(1,"Error").subscribe(
        value -> Log.e(TAG, "timer:onNext ==" + value),
        error -> Log.e(TAG, "Error: " + error),
);

image-20231108124926798

5、filter——过滤

用于过滤指定的发射元素。

20200404170341923

        Observable.just(1, 2, 3, 4, 5, 6).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Throwable {
                return (integer % 2) != 0;
            }
        }).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

image-20231108125316182

6、first——取第一个数据

20200404170409304

      //不存在则返回100
Observable.just(1, 2, 3, 4, 5, 6).first(100).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error)
        );

image-20231108130922361

7、last——取最后一个

20200404170445807

last、lastElement、lastOrError与fist、firstElement、firstOrError相对应。

   Observable.just(1, 2, 3, 4, 5, 6).last(100).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error)
        );

image-20231108131530432

8、ignoreElements & ignoreElement(忽略元素)

20200404170613536

ignoreElements 作用于FlowableObservableignoreElement作用于MaybeSingle。两者都是忽略掉数据,不发射任何数据,返回完成或者错误时间。

9、ofType(过滤类型)

作用于Flowable、Observable、Maybe,过滤选择类型。

        Observable.just(1, 2, 3, 4.4, 5.5, 6.6).ofType(Integer.class).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error)
        );

image-20231108132125800

10、sample

  1. debounce:它等待一段时间,如果在这段时间内没有新事件到达,它会发射最后一个事件。它用于处理高频率事件流,例如用户输入,以确保只处理用户停止输入后的事件。debounce 等待事件流静止,然后发射最后一个事件。
  2. sample:它按照固定的时间间隔从事件流中抽样一个事件,并发射该事件。它用于定期采样事件流,例如从传感器数据中每隔一段时间获取一次数据。sample 定期获取事件,无论事件流是否活跃。
        Observable<Integer> observable = Observable.create(emitter -> {
            emitter.onNext(1);
            Thread.sleep(1_500);
            emitter.onNext(2);
            Thread.sleep(500);
            emitter.onNext(3);
            Thread.sleep(2000);
            emitter.onNext(4);
            emitter.onComplete();
        });

        observable.sample(1, TimeUnit.SECONDS).subscribe(
                        value -> Log.e(TAG, "timer:onNext ==" + value),
                        error -> Log.e(TAG, "Error: " + error)
                );

image-20231108133255137

产生的数据在红线处发送

1在第1s时被发送,2在第2s时被发送,3在第3s时被发送,由于4还未在第5s时就已经onComplete所以4无法被发送

image-20231108133014660

11 、take & takeLast

20200404170852608

作用于Flowable、Observable。take发射前n个元素。takeLast发射后n个元素。

        Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        source.take(4)
                .subscribe(value-> Log.e(TAG, "timer:onNext ==" + value));
        //打印:1 2 3 4

        source.takeLast(4)
                .subscribe(value-> Log.e(TAG, "timer:onNext ==" + value));
        //打印:7 8 9 10

三、组合可观察对象操作符

操作符作用
combineLatest用于将多个 Observable 中最新的事件进行组合,并生成一个新的事件。
merge用于将多个 Observable 合并成一个单一的 Observable,按照它们发射事件的顺序合并
zip用于一一配对多个 Observable 发射的事件,只有当所有 Observable 都有事件时才生成新事件
startWith用于在一个 Observable 发射的事件前插入一个或多个初始事件
join用于将两个 Observable 的事件按照时间窗口的方式进行组合。

1、CombineLatest

image-20231108134447676

通过指定的函数将每个 Observable 发出的最新项目组合在一起,并根据该函数的结果发出项目

  • combineLatest 用于将多个 Observable 中最新的事件进行组合,并生成一个新的事件
  • 当任何一个 Observable 发射新数据时,都会生成新的组合事件。
  • 适用于需要及时反应多个数据源最新值变化的情况。
Observable<Integer> source1 = Observable.just(1, 2, 3);
Observable<String> source2 = Observable.just("A", "B", "C");
Observable<Boolean> source3 = Observable.just(true, false, true);

Observable<String> combined = Observable.combineLatest(
        source1,
        source2,
        source3,
        (integer, string, aBoolean) -> integer + " " + string + " " + aBoolean
);

combined.subscribe(
        value -> Log.e(TAG, "timer:onNext ==" + value),
        error -> Log.e(TAG, "Error: " + error),
        () -> Log.e(TAG, "onComplete")
);

image-20231108134458384

2、merge

mergeDelayError.C

  • merge 用于将多个 Observable 合并成一个单一的 Observable,按照它们发射事件的顺序合并
  • merge 不会进行事件的组合,只是合并多个 Observable 的事件。
  • 适用于需要将多个 Observable 的事件合并成一个流的情况。

注意:merge只能合并相同类型的Observable

        Observable<Integer> source1 = Observable.just(1, 2, 3);
        Observable<Integer> source2 = Observable.just(4,5,6);
        Observable<Integer> source3 = Observable.just(7,8,9);

        Observable<Integer> combined = Observable.merge(source1,source2,source3);

        combined.subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

image-20231108135448811

3、zip

下载

  • zip 用于一一配对多个 Observable 发射的事件,只有当所有 Observable 都有事件时才生成新事件
  • zip 会等待所有 Observable 都有事件后,才会执行组合函数生成新事件。
  • 适用于需要将多个数据源的事件一一配对的情况。
        Observable<Integer> source1 = Observable.just(1, 2, 3);
        Observable<String> source2 = Observable.just("A", "B", "C");
        Observable<Boolean> source3 = Observable.just(true, false, true);

        Observable<String> combined = Observable.zip(
                source1,
                source2,
                source3,
                (integer, string, aBoolean) -> integer + " " + string + " " + aBoolean
        );

        combined.subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

image-20231108135842793

4、startWith

DM_20231108140443_001

  • startWith 用于在一个 Observable 发射的事件前插入一个或多个初始事件
  • 这些初始事件会作为 Observable 的开头。
  • 适用于需要在 Observable 发射事件前添加一些初始数据的情况。

image-20231108140316862

        Observable<Integer> source = Observable.just(1, 2, 3);
        Observable<Integer> withStart = source.startWithArray(100,200);

        withStart.subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

image-20231108140348057

5、join

20200404175817782

  • join 用于将两个 Observable 的事件按照时间窗口的方式进行组合
  • 可以为每个 Observable 设置时间窗口,然后在这些窗口内组合事件。
  • 适用于需要在时间窗口内组合两个 Observable 的事件的情况。

时间窗口:

  • 固定时间窗口:定义一个固定的时间段,将在该时间段内的事件分为一个时间窗口。
  • 延时时间窗口:定义一个时间段,但在事件发生后延迟一段时间后才分为时间窗口。
  • 动态时间窗口:根据事件的特定条件动态地定义时间窗口。
        Observable<Integer> left = Observable.just(1, 2, 3);
        Observable<Integer> right = Observable.just(10, 20, 30);

        left.join(
                        right,
                        leftDuration -> Observable.timer(1, TimeUnit.SECONDS),
                        rightDuration -> Observable.timer(1, TimeUnit.SECONDS),
                        (leftValue, rightValue) -> "Left: " + leftValue + ", Right: " + rightValue
                )
                .subscribe(value -> Log.e(TAG, "timer:onNext ==" + value));

在这个示例中,我们定义了以下时间窗口规则:

  • 左边的时间窗口规则:leftDuration -> Observable.timer(1, TimeUnit.SECONDS) 表示在左边的事件后等待 1 秒后生成一个时间窗口。
  • 右边的时间窗口规则:rightDuration -> Observable.timer(2, TimeUnit.SECONDS) 表示在右边的事件后等待 2 秒后生成一个时间窗口。

现在让我们看看时间窗口如何影响事件的组合:

  • 当左边的事件 1 发生时,它会进入左边的时间窗口,并等待 1 秒。在此期间,右边的事件没有机会进入左边的时间窗口。
  • 当右边的事件 10 发生时,它会进入右边的时间窗口,并等待 2 秒。在此期间,左边的事件也没有机会进入右边的时间窗口。

只有在左边和右边的事件都在各自的时间窗口内时,它们才会被组合。在这个示例中,左边的事件会在右边的时间窗口内被组合。所以,在 1 秒后,左边的事件 1 和右边的事件 10 被组合成 “Left: 1, Right: 10”。

image-20231108185229730

四、变化操作符

| 操作符 | 说明 |

map()对数据流的类型进行转换
flatMap()对数据流的类型进行包装成另一个数据流
scan()scan操作符会对发射的数据上一轮发射的数据进行函数处理,并返回的数据供下一轮使用。
buffer()缓存指定大小数据
window()缓存指定大小数据,返回新的integerObservable


​ 对上一轮处理过后的数据流进行函数处理
​ 对所有的数据流进行分组
​ 缓存发射的数据流到一定数量,随后发射出数据流集合
​ 缓存发射的数据流到一定数量,随后发射出新的事件流

1、map

20200404154637646

        Observable.just(1,2,3).map(new Function<Integer, Object>() {
            @Override
            public Object apply(Integer integer) throws Throwable {
                return integer * 100;
            }
        }).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

2、flatMap / concatMap

20200404155254319

        Observable observable = Observable.just(isLogin("12346")).flatMap(new Function<Boolean, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Boolean aBoolean) throws Throwable {
                String Login = "登陆失败,帐号秘密错误";
                if (aBoolean) Login = "登陆成功";
                return Observable.just(Login).delay(2, TimeUnit.SECONDS);
            }
        });

        observable.subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );
        
            private boolean isLogin(String passWord) {
        if (passWord.equals("123456")) {
            return true;
        }
        return false;
    }

image-20231108191254259

  1. Observable.just(isLogin("12346")) 创建一个 Observable,它会发射一个布尔值,表示登录是否成功。
  2. .flatMap(new Function<Boolean, ObservableSource<?>>() { ... }:使用 flatMap 操作符将上一步的布尔值结果转换成一个新的 Observable,其中包含登录的结果消息。flatMap 中的 apply 方法根据登录结果 aBoolean 决定返回不同的消息。如果登录成功,返回 “登陆成功” 消息,否则返回 “登陆失败,帐号秘密错误” 消息,并使用 delay 延迟 2 秒发送消息。
  3. observable.subscribe(...):最后,订阅 observable,并设置了三个回调函数,分别处理 onNext、onError、onComplete 事件。

concatMap与flatMap的区别: concatMap是有序的,flatMap是无序的

  1. flatMap():
    • 不保证内部 Observable 的发射顺序,它会尽可能并行地处理内部 Observable,并将它们的发射结果合并到一个单一的 Observable 中。
    • 内部 Observable 可以乱序发射数据,最终结果也可能是乱序的。
  2. concatMap():
    • 保证内部 Observable 的发射顺序,它会按照原始数据的顺序依次处理每个内部 Observable,等待一个内部 Observable 完成后再处理下一个。
    • 内部 Observable 的发射顺序和最终结果的顺序都与原始数据的顺序一致。
        Observable<Integer> source = Observable.just(1, 2, 3);

        // 使用 flatMap
        source.flatMap(num -> Observable.just(num * 2).delay(num, TimeUnit.MILLISECONDS))
                .subscribe(value -> Log.e(TAG, "timer:flatMapOnNext ==" + value));

        // 使用 concatMap
        source.concatMap(num -> Observable.just(num * 2).delay(num, TimeUnit.MILLISECONDS))
                .subscribe(value -> Log.e(TAG, "timer:concatMapOnNext ==" + value));

image-20231108191544320

3、scan

20200404161402851

scan操作符会对发射的数据上一轮发射的数据进行函数处理,并返回的数据供下一轮使用。

        Observable<Integer> observable = Observable.just(1,2,3,4).scan(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Throwable {
                Log.e(TAG, "integer = " + integer +" integer2 = "+integer2);
                return integer2-integer;
            }
        });

        observable.subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

image-20231108192719260

  1. 初始情况下,前一个累积结果为空(因为没有前一个值),所以第一个数据项 1 直接发射出来,产生的结果是 1。
  2. 接下来,前一个累积结果是 1,当前数据项是 2,所以执行操作 2 - 1,产生的结果是 1。
  3. 再次执行,前一个累积结果是 1,当前数据项是 3,所以执行操作 3 - 1,产生的结果是 2。
  4. 最后,前一个累积结果是 2,当前数据项是 4,执行操作 4 - 2,产生的结果是 2。

4、buffer

20200404163816872

buffer操作符可以将发射出来的数据流,在给定的缓存池中进行缓存,当缓存池中的数据项溢满时,则将缓存池的数据项进行输出,重复上述过程,直到将发射出来的数据全部发射出去。

        Observable.just(1,2,3,4,5,6,7,8).buffer(3).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        )

image-20231108194136484

5、window

20200404164717644

window操作符和buffer操作符在功能上实现的效果是一样的,但window操作符最大区别在于同样是缓存一定数量的数据项,window操作符最终发射出来的是新的事件流integerObservable,而buffer操作符发射出来的是新的数据流。

也就是说,window操作符发射出来新的事件流中的数据项,还可以经过Rxjava其他操作符进行处理

window 操作符用于将一个 Observable 拆分为多个子 Observable,每个子 Observable 包含一定数量的连续数据项。window 操作符的两个参数的含义如下:

  1. 第一个参数(count):指定每个子 Observable 中包含的数据项的数量。
  2. 第二个参数(skip):指定何时启动新的窗口。它定义了窗口之间的重叠或间隔。如果 skip 等于 count,则窗口之间不重叠。如果 skip 小于 count,则窗口之间有重叠数据。

举个例子来说明:

假设有一个 Observable 发出的数据序列如下:1, 2, 3, 4, 5, 6, 7, 8, 9。

  • 如果你使用 window(3, 2),它的含义是每个窗口包含 3 个数据项,且窗口之间间隔 2 个数据项。那么分割后的子 Observable 将是:
    • 窗口1:1, 2, 3
    • 窗口2:3, 4, 5
    • 窗口3:5, 6, 7
    • 窗口4:7, 8, 9
  • 如果你使用 window(3, 3),窗口之间不重叠,每个窗口包含 3 个数据项。那么分割后的子 Observable 将是:
    • 窗口1:1, 2, 3
    • 窗口2:4, 5, 6
    • 窗口3:7, 8, 9

这里只使用了一个参数,用于指定窗口的大小。然后更具window发射新的事件流integerObservable的特性,对这个事件流进行了去重操作。

        Observable.just(1,1,3,4,6,6,7,8)
                .window(3).subscribe(new Consumer<Observable<Integer>>() {
                    @Override
                    public void accept(Observable<Integer> integerObservable) throws Throwable {
                        integerObservable.distinct().subscribe(value -> Log.e(TAG, "timer:onNext ==" + value.toString()));
                    }
                });

关于RxJava/RxAndroid的全部文章

RxJava/RxAndroid的基本使用方法(一)

RxJava的操作符使用(二)


参考文档:

官方文档:reactivex

RxJava3 Wiki:Home · ReactiveX/RxJava Wiki · GitHub

RxJava3官方github:What’s different in 3.0 · ReactiveX/RxJava Wiki · GitHub

RxJava2 只看这一篇文章就够了–玉刚说

RxJava2最全面、最详细的讲解–苏火火丶

关于背压(Backpressure)的介绍:关于RxJava最友好的文章——背压(Backpressure)

RXJava3+OKHTTP3+Retrofit2(观察者设计模式)讲解+实战

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1186300.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

xshell是什么软件,1000字让你完全了解xshell

很多从事开发或网络安全的人都或多或少知道xshell是什么软件&#xff0c;但是如果没有试用过的话可能对它的功能并不完全了解。今天小编就带你详细了解一下Xshell究竟是什么。 xshell是什么软件 一、xshell是什么软件 Xshell是一款功能强大的SSH&#xff08;Secure Shell&…

Jdk 1.8 for mac 详细安装教程(含版本切换)

Jdk 1.8 for mac 详细安装教程&#xff08;含版本切换&#xff09; 官网下载链接 https://www.oracle.com/cn/java/technologies/downloads/#java8-mac 一、选择我们需要安装的jdk版本&#xff0c;这里以jdk8为例&#xff0c;下载 macOS 版本&#xff0c;M芯片下载ARM64版本…

基于驾驶训练算法的无人机航迹规划-附代码

基于驾驶训练算法的无人机航迹规划 文章目录 基于驾驶训练算法的无人机航迹规划1.驾驶训练搜索算法2.无人机飞行环境建模3.无人机航迹规划建模4.实验结果4.1地图创建4.2 航迹规划 5.参考文献6.Matlab代码 摘要&#xff1a;本文主要介绍利用驾驶训练算法来优化无人机航迹规划。 …

07 # 手写 find 方法

find 的使用 find() 方法返回数组中满足提供的测试函数的第一个元素的值。否则返回 undefined。 ele&#xff1a;表示数组中的每一个元素index&#xff1a;表示数据中元素的索引array&#xff1a;表示数组 <script>var arr [1, 3, 5, 7, 9];var result arr.find(fun…

SpringBoot加载测试类属性和配置说明

一、项目准备 1.创建项目 2.配置yml文件 test:name: FOREVERlove: sing二、测试类属性 1.Value 说明&#xff1a;读取yml中的数据。 package com.forever;import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Value; import org.spr…

用中文编程工具编写的代码实现如图所示效果,请分享一下用你所学的编程语言写下这个代码,大家一起交流学习

用中文编程工具编写的代码实现如图所示效果&#xff0c;请分享一下用你所学的编程语言写下这个代码&#xff0c;大家一起交流学习 编程要求如图&#xff1a;在输入框里输入行数&#xff0c;随便输入多少&#xff0c;点击按钮&#xff0c;即刻显示如图所示效果&#xff0c;下一…

nacos注册中心/配置中心的使用

Nacos下载: https://github.com/alibaba/nacos/releases Nacos启动&#xff1a; 此处为了演示方便&#xff0c;下载的是 Windows版本 nacos-server-2.2.2.zip 。 进入 nacos-server-2.2.2\nacos\bin 文件夹&#xff0c;按shift和右键&#xff0c;选择"在此处打开PowerS…

【快速使用ShardingJDBC的哈希分片策略进行分库分表】

文章目录 &#x1f50a;博主介绍&#x1f964;本文内容&#x1f34a;1.引入maven依赖&#x1f34a;2.启动类上添加注解MapperScan&#x1f34a;3.添加application.properties配置&#x1f34a;4.普通的自定义实体类&#x1f34a;5.写个测试类验证一下&#x1f34a;6.控制台打印…

[动态规划] (十三) 简单多状态 LeetCode 740.删除并获得点数

[动态规划] (十三) 简单多状态: LeetCode 740.删除并获得点数 文章目录 [动态规划] (十三) 简单多状态: LeetCode 740.删除并获得点数题目解析解题思路状态表示状态转移方程初始化和填表顺序返回值 代码实现总结 740. 删除并获得点数 题目解析 (1) 给定一个整数数组。 (2) 选…

魔法导航菜单

效果展示 CSS 知识点 使用 box-shadow 属性实现不定项曲面 整体页面布局 <div class"navigation"><ul><li class"active"><a href"#"><span class"icon"><ion-icon name"home-outline"…

项目管理之项目时间箱内容补充

本章节内容为前述文章“项目管理之如何召开项目时间箱启动会议”的补充内容&#xff0c;请结合阅读。 时间箱管理 包括时间箱启动会、时间箱执行与控制、时间箱回顾会三个部分。 时间箱执行与控制包括探索、精进、巩固三个部分&#xff0c;每个部分使用迭代开发技术。 迭代开…

V90伺服 EPOS模式下回原(详细配置+SCL源代码)

TYPE "udtPID" VERSION : 0.1 STRUCT bRun : Bool; bDir : Bool; rSetPoint : Real; // 设置目标值工程量 rInput : Real; // 反馈工程量 Ts : DInt : 500; // 采样时间 Kp : Real : 1.0; // 比例系数 Ti : Re…

C++定位new(placement new)

C定位new(placement new) 普通new是在堆上申请一块内存空间&#xff0c;交由用户自己管理 定位new则是在用户已经申请好的一块空间重复使用&#xff0c;这个空间可以是栈上的也可以是堆上的 1、若是在堆上使用定位new&#xff0c;那么还是需要用户对内存自行管理&#xff0c;避…

希尔排序原理

目录&#xff1a; 一、希尔排序与插入排序 1&#xff09;希尔排序的概念 2&#xff09;插入排序实现 二、希尔排序实现 一、希尔排序与插入排序 1&#xff09;希尔排序的概念 希尔排序(Shells Sort)是插入排序的一种又称“缩小增量排序”&#xff08;Diminishing Incremen…

Python进阶教程:pandas数据分析实践示例总结

文章目录 前言一、分析数据文件二、数据预处理关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学习书籍四、Python工具包项目源码合集①Python工具包②Python实战案例③Python小游戏源码五、面试资料六、Python兼职渠道 前言 在近日的py…

RestTemplate配置和使用

在项目中&#xff0c;如果要调用第三方的http服务&#xff0c;就需要发起http请求&#xff0c;常用的请求方式&#xff1a;第一种&#xff0c;使用java原生发起http请求&#xff0c;这种方式不需要引入第三方库&#xff0c;但是连接不可复用&#xff0c;如果要实现连接复用&…

武汉凯迪正大—电能质量分析仪功能介绍

功能介绍&#xff1a; 测试功能&#xff1a; 波形实时显示&#xff08;4路电压/4路电流&#xff09;&#xff1b;电压和电流真有效值&#xff1b;电压直流成份&#xff1b;电流和电压峰值&#xff1b;电流和电压一段时间内的最大/最小值&#xff1b;相量图显示&#xff1b;各相…

深入浅出理解ResNet网络模型+PyTorch实现

温故而知新&#xff0c;可以为师矣&#xff01; 一、参考资料 原始论文&#xff1a;Identity Mappings in Deep Residual Networks 原论文地址&#xff1a;Deep Residual Learning for Image Recognition ResNet详解PyTorch实现 PyTorch官方实现ResNet 【pytorch】ResNet18、…

10道高频Vuex面试题快问快答

※其他的快问快答&#xff0c;看这里&#xff01; 10道高频Qiankun微前端面试题快问快答 10道高频webpack面试题快问快答 20道高频CSS面试题快问快答 20道高频JavaScript面试题快问快答 30道高频Vue面试题快问快答 面试中的快问快答 快问快答的情景在面试中非常常见。 在面试过…

凡泰极客亮相香港金融科技周,投身大湾区数字化建设

11月2-3日&#xff0c;作为全球性的金融科技盛会——香港金融科技周2023于香港会展中心隆重开幕。大会云集全球500多家金融机构及金融科技企业参展&#xff0c;吸引超过3万余人次相关人士参会。 凡泰极客作为中国领先的金融科技企业受邀参会&#xff0c;吸引了多领域专家、投资…