响应式流和reactor框架进阶

news2025/1/23 3:21:39

响应式流和reactor框架进阶

响应式流创建、转换、处理

本文档主要介绍在响应式编程中如何从流中获取数据并处理。

前提条件

假设您已经能掌握Java基础、Maven使用、Lamda表达式、响应式编程等基础。

如何获取流中数据

🌏 说明
1、不要试图从流中获取数据出来,而是先思考需要对流中元素做什么,响应式代码需要使用响应式> 方法(如subscribe())来订阅数据流并触发异步处理。
2、需要对流中的数据进行操作时,都应该使用对应操作符来处理,根据Mono/Flux等提供的操作符> API进行组合操作。
3、如下图reactor官方marble diagrams图示意,我们需要做的就是编写operator部分,而> operator即为Mono/Flux等提供的各类操作符如.map()、.flatMap()等方法。
4、关于操作符的API如果不明白含义时可以看marble diagrams示意图,鼠标放在操作符上即可。
5、响应式里面可以操作非响应式的方法,但非响应式方法内无法返回响应式结果。

在这里插入图片描述
在这里插入图片描述

流数据产生的时机

官方释义:
在 Reactor 中,当你创建了一条 Publisher 处理链,数据还不会开始生成。事实上,你是创建了 一种抽象的对于异步处理流程的描述(从而方便重用和组装)。
当真正“订阅(subscribe)”的时候,你需要将 Publisher 关联到一个 Subscriber 上,然后 才会触发整个链的流动。这时候,Subscriber 会向上游发送一个 request 信号,一直到达源头 的 Publisher。

🌏 说明
1后端代码直接声明响应式流时需要显示声明.subscribe()才能订阅到数据。
2前端接口调用则无需显示声明.subscribe()(前端本身即为订阅者)。

注意事项

‼️ 警告
在响应式编程中,在任何时候执行业务代码时都不要使用block()、blockFirst()、blockXX()方法,
为了避免使用block(),我们应该尽可能地使用响应式操作符(如map、flatMap、filter等)对数> 据流进行转换和处理。
使用block()方法可能引发以下问题:
1、阻塞线程:调用block()方法会阻塞当前线程,导致无法处理其他并发请求。这会降低系统的吞吐> 量和响应性能。
2、死锁风险:如果在处理响应式流时使用了block()方法,而其中某些操作也依赖于同一个线程的结> 果,则可能导致死锁。
3、内存资源浪费:阻塞调用将持续占用线程,而每个线程都需要额外的内存资源。如果应用程序中 同时有大量的阻塞操作,可能导致线程池耗尽和内存资源浪费。

示例代码

创建 Flux 或 Mono 并订阅它的简单方法

创建Mono流并订阅

Mono<String> noData = Mono.empty(); 

Mono<String> data = Mono.just("foo"); //1. 

noData.subscribe(); //2.
data.subscribe();	//3.

代码说明:

1、将String对象转成Mono流

2、订阅Mono流,empty()什么也不会返回,控制台不会打印任何数据。

3、订阅Mono流,打印foo。

Mono更多说明:

●发出一个 T,我已经有了:just

​ ○基于一个 Optional:Mono#justOrEmpty(Optional)

​ ○基于一个可能为 null 的 T:Mono#justOrEmpty(T)

●发出一个 T,且还是由 just 方法返回

​ ○但是“懒”创建的:使用 Mono#fromSupplier 或用 Mono#defer 包装 just

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar"); //
Flux<String> seq2 = Flux.fromIterable(iterable); //1.

seq1.subscribe(); //2.
seq2.subscribe();

代码说明:

1、将List对象转成Flux流

2、订阅Flux流,使Flux流开始产生数据,并依次打印foo、bar、foobar。

Flux更多说明:

●发出许多 T,这些元素我可以明确列举出来:Flux#just(T…)

●基于迭代数据结构:

​ ○一个数组:Flux#fromArray

​ ○一个集合或 Iterable类型数据:Flux#fromIterable

​ ○一个Stream类型:Flux#fromStream(Supplier)

​ ○一个连续的数字区间:Flux#range

对序列进行转换

1-1的转换

​ ●1对1地转化(比如字符串转化为它的长度):map()

​ ○类型转换:cast()

​ ○获取流中每个元素的序号:Flux#index

Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
                          .map(String::toUpperCase);
source.subscribe(d -> System.out.println("Subscriber: "+d));

1-n的转换

​ ●丢弃流中一些数据empty

 Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple", "gray"))
		.flatMap(item -> Mono.justOrEmpty(item.toUpperCase()))
		.flatMap(item -> {
			if (item.startsWith("g")) {
				return Mono.empty();
			}
			return Mono.just(item);
		})
		.subscribe();

​ ●对每一个元素执行一个异步操作flatMap

Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
	.flatMap(item -> Mono.justOrEmpty(item.toUpperCase()))
	.subscribe();

​ ●基于Mono结果流返回多个元素的序列flatMapMany

Mono.just("foo")
	.flatMapMany(foo -> Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
							.flatMap(item -> Mono.justOrEmpty(item.toUpperCase()))
							.map(upperItem -> new StringBuilder(upperItem)
									.append(" ")
									.append("with")
									.append(" ")
									.append(foo)
									.toString()
							)
	)
	.subscribe();

​ ●自定义转化方法和/或状态:handle

💡 注意
响应式方法里面不能返回null,如果上一操作符返回的数据内包含null值,响应流会抛出异常,如果一定要处理null值,可以使用handle方法,Mono/Flux均含有handle方法.
1、映射到字母。
2、如果返回的是 null …
3、就不会调用 sink.next 从而过滤掉

public String alphabet(int letterNumber) {
        if (letterNumber < 1 || letterNumber > 26) {
                return null;
        }
        int letterIndexAscii = 'A' + letterNumber - 1;
        return "" + (char) letterIndexAscii;
}

Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
    .handle((i, sink) -> {
        String letter = alphabet(i); //1.
        if (letter != null) 	//2.
            sink.next(letter); //3.
    });

alphabet.subscribe(System.out::println);

创建Flux流并订阅

仅对数据做打印或赋值

不对序列造成改变的情况下,得到通知或执行一些操作

●发出元素:doOnNext

Map<String, Object> map = new HashMap<>();
map.put("demo","value");
Mono.just(map)
	.doOnNext(time -> map.put("time", LocalDateTime.now()))
	.subscribe(time -> System.out.println("map:" + map));

●因错误终止:doOnError

Flux.<String>error(new IllegalArgumentException())
	.doOnError(System.out::println)
	.subscribe();

​ ●取消:doOnCancel

​ ●订阅时:doOnSubscribe

​ ●请求时:doOnRequest

​ ●序列完成:Flux#doOnComplete,Mono#doOnSuccess

​ ●所有类型的信号(Signal):Flux#doOnEach

​ ●所有结束的情况(完成complete、错误error、取消cancel):doFinally

​ ●记录日志:log

将Flux转成Mono流

​ ●只取第一个元素放到 Mono 中返回:Flux#next()

​ ●最多只取 1 个元素:

​ ○给定序号:Flux#elementAt

​ ○最后一个:.takeLast(1)

​ ■如果为序列空则发出错误信号:Flux#last()

​ ■如果序列为空则返回默认值:Flux#last(T)

​ ●我只想要一个元素(如果多于一个就返回错误)…

​ ○如果序列为空,发出错误信号:Flux#single()

​ ○如果序列为空,发出一个缺省值:Flux#single(T)

​ ○如果序列为空就返回一个空序列:Flux#singleOrEmpty

响应式流中空序列处理

defaultIfEmpty:空流时想要一个默认值来代替

myMethod.emptySequenceForKey("a") // 这个方法返回一个空的 Mono<String>
        .defaultIfEmpty("") // 将空序列转换为包含字符串 "" 的序列
        .zipWhen(aString -> myMethod.process("b")) // 当 "" 发出时被调用
        .subscribe();

switchIfEmpty:空流时响应一个默认流来代替

userService.getFavorites(userId) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions()) 
		   .subscribe();

响应式流中异常处理

创建一个错误序列

创建Flux.error()流

//Flux.error(Throwable):创建一个发出指定错误的 Flux。
Flux<Object> fluxWithError = Flux.error(new RuntimeException("Something went wrong"));

创建Mono.error()流

//Mono.error(Throwable):创建一个发出指定错误的 Mono。
Mono<Object> monoWithError = Mono.error(new RuntimeException("Something went wrong"));

如果元素超时未发出

模拟一个Flux流中的信号延迟5s才发送,但监听整个流超过3s时会抛出超时异常TimeoutException

Flux<Object> fluxWithTimeout = Flux.just("value")
        .delayElements(Duration.ofSeconds(5))
        .timeout(Duration.ofSeconds(3)); //如果元素在指定时间内未发出,则会超时。

try/catch的表达方式

抛出异常

//Flux.error(Throwable):创建一个发出指定错误的 Flux。
Flux<Object> fluxWithError = Flux.error(new RuntimeException("Something went wrong"));

捕获异常并返回默认值

//onErrorReturn(T):捕获异常并返回一个默认值。
Mono<Object> monoWithDefault = Mono.error(new RuntimeException("Something went wrong"))
        .onErrorReturn("Default value");

获异常并返回另一个Flux或Mono

//onErrorResume(Function):捕获异常并返回另一个 Flux 或 Mono。
Flux<Object> fluxWithFallback = Flux.error(new RuntimeException("Something went wrong"))
        .onErrorResume(e -> Flux.just("Fallback value"));

包装异常后再抛出:

//onErrorMap(Function):捕获异常并对异常进行包装后再抛出。
Flux<Object> fluxWithErrorMapping = Flux.just("value")
        .flatMap(value -> {
            try {
                //某些操作可能引发异常
                return Mono.just(value.toUpperCase());
            } catch (Exception e) {
                return Mono.error(new RuntimeException("Error occurred", e));
            }
        });

finally代码块

//doFinally(Consumer):在序列完成时执行 finally 代码块,可以根据 SignalType 进行不同的处理。
Mono<Object> monoWithFinally = Mono.just("value")
        .doFinally(signalType -> {
            if (signalType == SignalType.ON_COMPLETE) {
                System.out.println("Finally block: Completed");
            } else if (signalType == SignalType.ON_ERROR) {
                System.out.println("Finally block: Error occurred");
            }
        });

处理错误

返回一个默认的值:onErrorReturn

//使用 onErrorReturn 操作符来捕获异常并返回一个默认值。
Mono<Object> monoWithDefault = Mono.error(new RuntimeException("Something went wrong"))
        .onErrorReturn("Default value");

返回另一个Publisher:onErrorResume

//使用 onErrorResume 操作符来捕获异常并返回另一个 Flux 或 Mono。
Flux<Object> fluxWithFallback = Flux.error(new RuntimeException("Something went wrong"))
        .onErrorResume(e -> Flux.just("Fallback value"));

重试:retry

//使用 retry 操作符来在遇到错误时重试。
Flux<Object> fluxWithRetry = Flux.just("value")
        .concatWith(Flux.error(new RuntimeException("Something went wrong")))
        .retry(3); // 重试三次

伴随触发:retryWhen

//使用 retryWhen 操作符来在遇到错误时根据自定义的重试策略重试。
import reactor.util.retry.Retry;
import java.time.Duration;

Flux<Object> fluxWithRetryWhen = Flux.just("value")
        .concatWith(Flux.error(new RuntimeException("Something went wrong")))
        .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))); // 指数退避重试,最多重试三次,间隔一秒

处理背压错误

🌏 说明

处理回压错误是确保上游和下游之间的流量控制,以避免潜在的资源耗尽或系统不稳定。Reactor 提供了几种处理回压错误的策略,包括抛出异常、丢弃元素、保留最新元素、以及缓存元素等。以下是示例:

抛出 IllegalStateException:Flux#onBackpressureError

//onBackpressureError():抛出 IllegalStateException。
Flux.range(1, 1000)
    .onBackpressureError()
    .subscribe(System.out::println);

丢弃策略:Flux#onBackpressureDrop

//onBackpressureDrop():丢弃元素。
Flux.range(1, 1000)
    .onBackpressureDrop()
    .subscribe(System.out::println);

不丢弃最后一个元素:Flux#onBackpressureLatest

//onBackpressureLatest():保留最新元素。
Flux.range(1, 1000)
    .onBackpressureLatest()
    .subscribe(System.out::println);

缓存策略(有限或无限):Flux#onBackpressureBuffer

//onBackpressureBuffer():缓存元素。
Flux.range(1, 1000)
    .onBackpressureBuffer()
    .subscribe(System.out::println);

有限缓存空间并应用给定策略:Flux#onBackpressureBuffer 带有策略 BufferOverflowStrategy

//onBackpressureBuffer(int capacity, BufferOverflowStrategy strategy):指定缓存空间大小和溢出策略。
import reactor.core.publisher.BufferOverflowStrategy;

Flux.range(1, 1000)
    .onBackpressureBuffer(10, BufferOverflowStrategy.DROP_OLDEST)
    .subscribe(System.out::println);

常见响应式操作符

基本概念

Publisher:发布者(数据流),发布者负责向订阅者发送数据项,并管理数据流的发布。

Subscriber:订阅者,通过订阅者,可以实现响应式编程中的数据流控制和处理逻辑。

Mono: 包含0-1个数据的发布者,实现了Publisher

Flux: 包含0-n个数据的发布者,实现了Publisher

Operator: 操作符,表示对数据流中的数据的操作描述。用于改变发布者的行为。

🌏 说明

当发布者被订阅时,发布者才开始生产消息:编写代码实际上是使用操作符来一个描述数据处理逻辑,当发布者被订阅时才会执行这些处理逻辑。

常用操作符

map:转换上游数据

map操作符用于对流中的每个元素进行转换,并返回一个新的流。它可以将一个类型的流转换为另一个类型的流,或者对原始流中的元素进行修改。

功能描述

  • 对流中的每个元素应用指定的转换函数。
  • 返回一个包含转换后的元素的新流。
  • 保持了原来流的顺序,但元素的类型可能不同。

使用场景

  • **数据的转换和映射:**通过定义转换函数,将流中的元素从一种类型转换为另一种类型。
  • **数据的处理和加工:**对流中的元素进行处理,例如提取特定字段、计算属性等。
  • **数据的规范化和标准化:**对流中的元素进行规范化、标准化或格式化操作。

返回值

map操作符返回一个新的流,其中包含经过转换函数转换后的元素。

使用示例

假设有一个数据流source,包含整数数据,我们使用map方法对数据流中的每个整数进行平方操作

import reactor.core.publisher.Flux;

public class MapOperatorExample {

    public static void main(String[] args) {
        Flux<Integer> source = Flux.just(1, 2, 3, 4);
        Flux<Integer> squaredStream = source.map(num -> num * num);

        squaredStream.subscribe(
                squaredNum -> System.out.println("Squared number: " + squaredNum)
        );
    }
}

mapNotNull:转换上游数据,并忽略null

mapNotNull操作符是用于转换上游数据并忽略其中的null值的操作。它类似于map操作符,但会自动过滤掉转换后的结果为null的元素。

功能描述

  • 对流中的每个元素应用指定的转换函数。
  • 自动过滤后转换后的结果为null的元素。
  • 返回一个包含非null转换结果的新流。

使用场景

  • 数据的转换和映射:通过定义转换函数,将流中的元素从一种类型转换为另一种类型,同时过滤掉转换结果为null的元素。

返回值

mapNotNull方法返回一个新的流,其中包含经过转换函数转换后的非null元素。

使用示例

假设有一个数据流source,包含字符串数据,我们使用mapNotNull操作符对数据流中的每个字符串进行转换操作,并过滤掉映射结果为null的元素。

import reactor.core.publisher.Flux;

public class MapNotNullOperatorExample {

    public static void main(String[] args) {
        Flux<String> source = Flux.just("apple", "", "banana", null, "cherry");
        Flux<String> mappedStream = source.mapNotNull(str -> {
            if (str != null && !str.isEmpty()) {
                return str.toUpperCase();
            } else {
                return null;
            }
        });

        mappedStream.subscribe(
                mappedStr -> System.out.println("Mapped string: " + mappedStr)
        );
    }
}

flatMap:转换上游数据,但是结果是一个数据流,并将这个数据流平铺

flatMap操作符用于将上游数据进行转换,并将结果作为一个新的数据流展开(平铺)。它可以将一个元素转换为多个元素的流,并将这些流合并成一个单一的流。

功能描述

  • 对流中的每个元素应用指定的转换函数。
  • 转换函数返回一个流作为结果。
  • 将所有转换后的流合并成一个单一的流。

使用场景

  • **数据的转换和拆解:**通过定义转换函数,将一个元素转换为多个元素的流,并将这些流合并成单一的流。
  • **扁平化数据结构:**将嵌套的数据结构展开为扁平的数据流。

返回值

flatMap操作符返回一个新的流,其中包含转换后的展开(平铺)元素。

使用示例

假设有一个数据流source,包含字符串数据,我们使用flatMap操作符将数据流中的每个字符串拆分为单个字符,并将这些字符作为新的数据流发送给下游。

import reactor.core.publisher.Flux;

public class FlatMapOperatorExample {

    public static void main(String[] args) {
        Flux<String> source = Flux.just("hello", "world", "reactive");
        Flux<Character> flatMappedStream = source.flatMap(str ->
                Flux.fromArray(str.split(""))
        );

        flatMappedStream.subscribe(
                ch -> System.out.println("Character: " + ch)
        );
    }
}

flatMapMany:转换Mono中的元素为Flux(1个转多个)

flatMapMany操作符用于将Mono中的元素转换为一个包含多个元素的Flux。它是对flatMap方法的特化,适用于将单个元素转换为多个元素的场景。

功能描述

  • Mono中的元素应用指定的转换函数。
  • 转换函数返回一个Flux作为结果。
  • 将转换后的Flux中的所有元素合并成一个单一的Flux

使用场景

  • **单个元素转多个元素:**当需要将Mono中的单个元素转换为多个元素时,可以使用flatMapMany操作符。
  • **扩展和拆解数据:**将一个包含单个元素的Mono扩展为包含多个元素的Flux,并进行进一步处理。

返回值

flatMapMany操作符返回一个新的Flux,其中包含转换后的多个元素。

使用示例

假设有一个Mono对象sourceMono,包含一个字符串数据,我们使用flatMapMany操作符将Mono中的字符串拆分为单个字符,并作为一个新的Flux数据流发送给下游。

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapManyOperatorExample {

    public static void main(String[] args) {
        Mono<String> sourceMono = Mono.just("hello");
        Flux<Character> flatMappedStream = sourceMono.flatMapMany(str ->
                Flux.fromArray(str.split(""))
        );

        flatMappedStream.subscribe(
                ch -> System.out.println("Character: " + ch)
        );
    }
}

filter:过滤元素

filter操作符用于根据给定的条件过滤流中的元素。它允许只保留符合条件的元素,而过滤掉不符合条件的元素。

功能描述

  • 根据给定的条件过滤流中的元素。
  • 只保留符合条件的元素,过滤掉不符合条件的元素。

使用场景

  • **数据筛选:**当需要从数据流中筛选出符合特定条件的元素时,可以使用filter操作符。
  • **数据过滤:**用于过滤掉不需要的数据,只保留满足条件的数据。

返回值

  • filter操作符返回一个包含符合条件的元素的新的Flux流。

使用示例

假设有一个数据流source,包含整数数据,我们使用filter操作符过滤出数据流中大于5的元素。

import reactor.core.publisher.Flux;

public class FilterOperatorExample {

    public static void main(String[] args) {
        Flux<Integer> source = Flux.just(3, 7, 2, 9, 5);
        Flux<Integer> filteredStream = source.filter(num -> num > 5);

        filteredStream.subscribe(
                filteredNum -> System.out.println("Filtered number: " + filteredNum)
        );
    }
}

filterWhen:异步过滤**

filterWhen操作符是用于异步过滤数据流中元素的操作符,在进行过滤时可以异步地根据给定条件来判断是否保留元素。

功能描述

  • 异步过滤操作:filterWhen操作符允许使用异步的条件来过滤数据流中的元素。
  • **条件判断:**每个元素都可以通过异步条件判断来确定是否应该保留在数据流中。
  • **保留规则:**当条件为true时,保留元素;当条件为false时,丢弃元素。

使用场景

  • **异步条件过滤:**当需要根据异步条件来过滤数据流中的元素时,可以使用filterWhen操作符。
  • **延迟判断:**适用于需要等待异步操作完成后才能确定是否保留元素的场景。

返回值

  • filterWhen操作符返回与原始数据流中符合条件的元素类型相同。

使用示例

假设有一个数据流source,包含整数数据,我们使用filterWhen操作符根据异步条件来过滤出数据流中大于5的元素。

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FilterWhenOperatorExample {

    public static void main(String[] args) {
        Flux<Integer> source = Flux.just(3, 7, 2, 9, 5);
        Flux<Integer> filteredStream = source.filterWhen(num ->
                Mono.fromCallable(() -> num > 5)
        );

        filteredStream.subscribe(
                filteredNum -> System.out.println("Filtered number: " + filteredNum)
        );
    }
}

concat:将多个流连接在一起组成一个流(按顺序订阅)

concat操作符用于将多个流按照顺序连接在一起,形成一个新的流。它会先订阅并处理第一个流的元素,然后再处理下一个流的元素,以此类推,保持了流的顺序。

功能描述

  • 将多个流按照顺序连接在一起。
  • 按照连接的顺序依次订阅和处理每个流的元素。

使用场景

  • **合并多个流:**当需要将多个流合并成一个单一的流,并保持原始流的顺序时,可以使用concat操作符。
  • **依赖前后顺序:**当需要确保流的订阅和处理按照特定的顺序进行时,可以使用concat操作符。

返回值

concat操作符返回一个新的流,该流是多个流按照顺序连接后的结果。

使用示例

创建两个整数数据流,使用concat操作符按顺序连接它们,并通过订阅处理输出连接后的全部元素的过程。

import reactor.core.publisher.Flux;

public class ConcatOperatorExample {

    public static void main(String[] args) {
        Flux<Integer> flux1 = Flux.just(1, 2, 3);
        Flux<Integer> flux2 = Flux.just(4, 5, 6);

        Flux.concat(flux1, flux2)
                .subscribe(System.out::println);
    }
}

// 输出结果为:1, 2, 3, 4, 5, 6

concatWith:组合流。(原始流追加另一个流后的结果。)

concatWith操作符用于连接两个流,将它们合并为一个新的流。它允许按照顺序将一个流的元素追加到另一个流的末尾。

功能描述

  • 连接两个流,将它们合并为一个新的流。
  • 将一个流的元素追加到另一个流的末尾。

使用场景

  • 当需要将两个流按照顺序合并为一个流时,可以使用concatWith操作符。
  • 适用于需要按照特定的顺序连接多个流的情况。

返回值

concatWith操作符返回一个新的流,包含合并后的元素,该流是原始流追加另一个流后的结果。

使用示例

通过concatWith操作符将flux2追加到flux1的末尾,形成一个新的Flux。最后通过订阅这个新的Flux,输出的结果为按顺序连接后的所有元素:1, 2, 3, 4, 5, 6。

import reactor.core.publisher.Flux;

public class ConcatWithOperatorExample {

    public static void main(String[] args) {
        Flux<Integer> flux1 = Flux.just(1, 2, 3);
        Flux<Integer> flux2 = Flux.just(4, 5, 6);

        flux1.concatWith(flux2)
                .subscribe(System.out::println);
    }
}
// 输出结果为:1, 2, 3, 4, 5, 6

merge:将多个流合并在一起,同时订阅流

merge操作符用于将多个流合并在一起,同时订阅这些流。它会同时订阅所有的流,并按照元素的到达顺序进行处理。

功能描述

  • 将多个流合并在一起。
  • 同时订阅所有的流。
  • 按照元素的到达顺序依次处理这些流的元素。

使用场景

  • **合并多个流:**当需要将多个流合并成一个单一的流时,可以使用merge操作符。
  • **并发处理多个流:**当需要并发地处理多个流中的元素时,可以使用merge操作符。

返回值

merge操作符返回一个新的流,其中包含合并后的多个原始流中的所有元素。

使用示例

通过merge操作符将这两个Flux合并为一个新的Flux,并通过订阅这个新的Flux,同时处理flux1flux2的元素并输出。由于merge是并行合并多个Flux,所以输出的结果可能是乱序的。

import reactor.core.publisher.Flux;

public class MergeOperatorExample {

    public static void main(String[] args) {
        Flux<Integer> flux1 = Flux.just(1, 2, 3);
        Flux<Integer> flux2 = Flux.just(4, 5, 6);

        Flux.merge(flux1, flux2)
                .subscribe(System.out::println);
    }
}

zip:压缩多个流中的元素

zip操作符用将多个流中的元素进行压缩,即按照索引位置一对一地组合这些流的元素。它会从每个流中取出相同索引位置的元素,并将它们合并成一个新的元素。

功能描述

  • 将多个流中的元素按照索引位置进行压缩。
  • 从每个流中取出相同索引位置的元素,并将他们合并成一个新的元素。
  • 新的元素按照原始流的顺序排列。

使用场景

  • **元素一对一组合:**当需要将多个流中的元素按照索引位置进行一对一的组合时,可以使用zip操作符。
  • **数据聚合:**当需要将多个相关流的元素聚合到一个新的数据结构中时,可以使用zip操作符。

返回值

zip操作符返回一个新的流,其中包含原始流中按照索引位置压缩后的元素。

使用示例

通过zip操作符将flux1flux2中对应位置的元素进行乘法操作,生成一个新的Flux,并通过订阅这个新的Flux,输出的结果为对应位置元素相乘的结果:10, 40, 90。

import reactor.core.publisher.Flux;

public class ZipOperatorExample {

    public static void main(String[] args) {
        Flux<Integer> flux1 = Flux.just(1, 2, 3);
        Flux<Integer> flux2 = Flux.just(10, 20, 30);

        Flux.zip(flux1, flux2, (num1, num2) -> num1 * num2)
                .subscribe(System.out::println);
    }
}

then:上游流完成后执行其他的操作.

then操作符用于在上游流完成后执行其他的操作。它允许在流完成时触发一些额外的逻辑,而不是处理流中的元素。

功能描述

  • 在上游流完成后执行其他的操作。
  • 不关心上游流中的具体元素,只关注流的完成事件。
  • 返回一个新的Publisher,在上游流完成后触发指定的操作。

使用场景

  • **执行清理操作:**当需要在上游流完成后执行一些清理逻辑,如关闭资源或释放锁等,可以使用then操作符。
  • **触发异步操作:**当需要在上游流完成后触发一些异步操作,如发送通知或触发其他的流,可以使用then操作符。

返回值

then操作符返回一个新的Publisher,它会在上游流完成后触发指定的操作。

使用示例

创建了一个包含字符串"Hello"的Mono,当流成功完成时会打印一条消息。然后使用then操作符,在流完成后添加了一个新的Mono,包含字符串"World"。在then操作符执行后,会打印一条消息表示执行了then操作,并输出"World"。最终通过订阅执行整个流程。

import reactor.core.publisher.Mono;

public class ThenOperatorExample {

    public static void main(String[] args) {
        Mono.just("Hello")
                .doOnSuccess(value -> System.out.println("Source stream completed with value: " + value))
                .then(Mono.just("World"))
                .doOnSuccess(value -> System.out.println("Then operator executed with value: " + value))
                .subscribe();
    }
}

doOnNext:流中产生数据时执行.

doOnNext操作符用于在流中产生数据时执行一些额外的操作。它允许我们观察到流中每个元素的生成,并在每个元素到达时执行指定的逻辑。

功能描述

  • 在流中产生数据时执行额外的操作。
  • 对每个元素进行观察和处理,而不会改变流的内容。
  • 不影响流的传递,只是在每个数据项上附加附加操作。

使用场景

  • **调试和日志记录:**当需要跟踪流中每个元素的值,或者在特定条件下记录日志时,可以使用doOnNext操作符。
  • **副作用触发:**当需要在流中产生数据时触发其他副作用操作,如发送通知、更新状态等,可以使用doOnNext操作符。

返回方法

doOnNext操作符返回原始的数据流,不会改变流中的数据元素。

使用示例

通过doOnNext操作符,在每次处理元素时输出一条日志记录。然后通过map操作符对每个元素进行乘以10的操作。最终通过订阅输出经过处理后的结果。

import reactor.core.publisher.Flux;

public class DoOnNextOperatorExample {

    public static void main(String[] args) {
        Flux.just(1, 2, 3)
                .doOnNext(num -> System.out.println("Processing element: " + num))
                .map(num -> num * 10)
                .subscribe(System.out::println);
    }
}

doOnError:发送错误时执行.

doOnError操作符用于在流发送 错误时执行一些额外的操作。它允许观察到流中发生的错误,并在错误发生时执行指定的逻辑。

功能描述

  • 当流遇到错误时,执行指定的逻辑。
  • 对每个错误进行观察和处理,而不会改变流的内容。
  • 可以用于调试、记录日志或执行其他副作用操作。

使用场景

  • 错误处理和记录:当需要捕获和处理流中发生的错误,并记录相关信息时,可以使用doOnError操作符。
  • 调试和故障排除:当需要观察和诊断流中发生的错误,以便进行调试和故障排查时,可以使用doOnError操作符。

返回值

doOnError操作符返回与原始流相同类型的新流,其中包含了原始流中的所有元素(包括错误)。

使用示例

通过map操作符对每个元素进行除法操作,当遇到除以0的情况时会产生错误。通过doOnError操作符,在遇到错误时输出错误信息。然后使用onErrorResume操作符来处理错误情况,将错误替换为-1。最终通过订阅输出处理后的结果。

import reactor.core.publisher.Flux;

public class ErrorHandlingExample {

    public static void main(String[] args) {
        Flux.just(1, 2, 0, 4)
                .map(num -> 10 / num)
                .doOnError(error -> System.err.println("Error occurred: " + error.getMessage()))
                .onErrorResume(e -> Flux.just(-1))
                .subscribe(System.out::println);
    }
}

doOnCancel:流被取消时执行

doOnCancel操作符用于在流被取消时执行一些额外的操作。它允许观察到流被取消的事件,并在取消发生时执行指定的逻辑。

功能描述

  • 当流被取消时,执行指定的逻辑。
  • 可以用于资源释放、清理操作或记录相关信息。
  • 对取消事件进行观察和处理,而不会改变流的内容。
  • 不影响流的传递,只是在取消发生时附加额外的操作。

使用场景

  • **资源释放和清理:**当需要再流被取消时执行一些资源释放或者清理的操作,如关闭数据库连接、停止定时任务等,可以使用doOnCancel操作符。
  • **日志记录和统计:**当需要观察和记录流被取消的次数,时间等信息时,可以使用doOnCancel操作符。

返回值

doOnCancel操作符返回与原始流相同类型的新流,其中包含了原始流中的所有元素(如果有)。

使用示例

import reactor.core.publisher.Mono;
import java.time.Duration;

public class DoOnCancelOperatorExample {
    public static void main(String[] args) throws InterruptedException {
        Mono<String> requestMono = Mono.delay(Duration.ofSeconds(5))
                .doOnCancel(() -> System.out.println("Client disconnected"))
                .map(i -> "Response");

        // 模拟客户端发送请求
        System.out.println("Sending HTTP request");
        requestMono.subscribe(response -> {
            System.out.println("Received response: " + response);
        });

        // 模拟客户端在未收到响应之前断开连接
        Thread.sleep(2000);
        System.out.println("Client disconnected");
    }
}

doOnComplete:用于在流完成时执行指定的操作。

doOnComplete操作符用于在流完成时执行指定的操作。它允许在流正常终止时执行一些附加的逻辑。

功能描述

  • doOnComplete操作符订阅一个流,并在该流正常终止时执行给定的操作。
  • 通常用于流完成后执行一些清理操作、记录日志或者发通知等。

使用场景

  • 当需要在流完成时执行一些额外的操作时,可以使用doOnComplete操作符。
  • 适用于需要在流结束后进行附加处理的情况。

返回值

doOnComplete操作符返回一个新的流,该流与原始流相同。

使用示例

创建了一个 Flux 数据流 source,包含了整数 1 到 5。然后使用 map 操作符对每个整数进行乘以 2 的操作。接下来,使用 doOnComplete() 操作符在流完成时打印一条消息。最后,订阅处理过的数据流,并打印输出每个经过处理的数值。

import reactor.core.publisher.Flux;

public class DoOnCompleteOperatorExample {

    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 5);
        Flux<Integer> processedStream = source
                .map(num -> num * 2)
                .doOnComplete(() -> System.out.println("Stream completed"));

        processedStream.subscribe(
                processedNum -> System.out.println("Processed number: " + processedNum)
        );
    }
}

onErrorContinue:流发生错误时,继续处理数据而不是终止整个流.

onErrorContinue操作符的作用是在流发生错误时,继续处理数据而不是立即终止整个流。它允许在遇到错误时进行特定的处理,并继续处理后续的数据。

功能描述

  • 在流发生错误时,继续处理数据而不是终止整个流。
  • 允许我们对错误进行特定的处理,并尝试继续处理后续的数据项。
  • 在错误处理期间可以选择忽略、替换或转换错误的数据项。

使用场景

  • 容错和错误处理:当需要对流中的错误进行特定的处理,并继续处理后续的数据项时,可以使用onErrorContinue操作符。
  • 部分失败处理:当流中的某些数据项可能会导致错误,但希望继续处理其他数据项时,可以使用onErrorContinue操作符。

返回值

onErrorContinue操作符返回与原始流相同类型的新流,其中包含了原始流中的所有元素,包括经过特定处理的错误数据项。

使用示例

创建了一个 Flux 数据流 source,使用 map 操作符对每个整数进行除法操作,当遇到除以 0 的情况时会抛出异常。接着使用 onErrorContinue 方法来处理遇到的异常,输出错误信息并继续使用默认值。最后订阅处理过的数据流,分别输出每个经过处理的数值和最终的错误信息。

import reactor.core.publisher.Flux;

public class OnErrorContinueOperatorExample {

    public static void main(String[] args) {
        Flux<Integer> source = Flux.just(1, 2, 0, 4, 5);
        Flux<Integer> processedStream = source
                .map(num -> 10 / num)
                .onErrorContinue((error, value) -> {
                    System.out.println("Error occurred: " + error.getMessage() + ". Using default value instead.");
                });

        processedStream.subscribe(
                processedNum -> System.out.println("Processed number: " + processedNum),
                error -> System.out.println("Final error: " + error.getMessage())
        );
    }
}

onErrorResume 用于处理流中发生的错误,并返回一个备用的流来继续处理。

功能描述

  • onErrorResume 操作符用于捕获流中的错误,并根据需要返回一个备用的流来替代原始的错误流。当源流发生错误时,onErrorResume 可以将控制流转到备用流上,从而避免流终止并提供容错机制。

使用场景

  • 当我们希望在流中出现错误时进行容错处理,例如返回默认值、重试请求、切换到备用数据源等情况时,可以使用 onErrorResume 操作符。

返回值

  • onErrorResume 操作符返回一个新的流,该流可能是原始流的修改版本或者备用流。

使用示例

创建了一个 Flux 数据流 source,使用 map 操作符对每个整数进行除法操作,当遇到除以 0 的情况时会抛出异常。接着使用 onErrorResume 操作符来处理遇到的异常,输出错误信息并切换到备用的数据流 。最后订阅处理过的数据流,分别输出每个经过处理的数值和最终的错误信息。

import reactor.core.publisher.Flux;

public class OnErrorResumeOperatorExample {

    public static void main(String[] args) {
        Flux<Integer> source = Flux.just(1, 2, 0, 4, 5);
        Flux<Integer> processedStream = source
                .map(num -> 10 / num)
                .onErrorResume(error -> {
                    System.out.println("Error occurred: " + error.getMessage() + ". Switching to default stream.");
                    return Flux.just(-1, -2, -3);
                });

        processedStream.subscribe(
                processedNum -> System.out.println("Processed number: " + processedNum),
                error -> System.out.println("Final error: " + error.getMessage())
        );
    }
}

defaultIfEmpty:当流为空时,使用默认值.

defaultIfEmpty操作符的作用是在流为空时使用默认值。它允许定义一个默认值,在流为空的情况下返回该默认值。

功能描述

  • 当流为空时,使用指定的默认值替代。
  • 允许在流为空时返回一个默认值,以避免特殊处理空流的情况。
  • 可以用于设置默认结果、避免空指针异常等场景。

使用场景

  • **默认结果设置:**当流可能为空,但需要返回一个默认结果时,可以使用defaultIfEmpty操作符。
  • **空值处理:**当需要处理可能为空的流,并避免出现空指针异常时,可以使用defaultIfEmpty操作符。

返回值

defaultIfEmpty操作符返回与原始流相同类型的新流,其中包含了原始流中的所有元素,或者是指定的默认值。

使用示例

创建了一个空的 Flux 数据流 source。然后使用 defaultIfEmpty 操作符来处理空数据流的情况,返回一个包含默认值-1的新数据流。最后订阅处理过的数据流,如果原始数据流为空,则输出默认值 -1,否则不会有任何输出。

import reactor.core.publisher.Flux;

public class DefaultIfEmptyOperatorExample {

    public static void main(String[] args) {
        Flux<Integer> source = Flux.empty();
        Flux<Integer> processedStream = source
                .defaultIfEmpty(-1);

        processedStream.subscribe(
                processedNum -> System.out.println("Processed number: " + processedNum),
                error -> System.out.println("Final error: " + error.getMessage())
        );
    }
}

switchIfEmpty:当流为空时,切换为另外一个流.

switchIfEmpty操作符的作用是在流为空时切换为另一个流。它允许定义一个备选流,在原始流为空的情况下使用备选流作为替代。

功能描述

  • 当流为空时,切换到备选流。
  • 允许在流为空的情况下,使用备选流替代原始流的处理逻辑。
  • 可以用于设置备选数据源、提供默认值等场景。

使用场景

  • 备选数据源:当需要再原始流为空时切换到备选数据源时,可以使用switchIfEmpty操作符。
  • 提供默认值:当需要为空流提供默认值或备选结果时,可以使用switchIfEmpty操作符。

返回值

switchIfEmpty操作符返回一个新的流,根据原始流是否为空进行切换后的结果。

使用示例

创建了一个空的 Flux 数据流 source,以及一个备用的 Flux 数据流 backupStream ,使用 switchIfEmpty 操作符来处理空数据流的情况,切换到备用数据流 backupStream。最后订阅处理过的数据流,如果原始数据流为空,则输出备用数据流中的数据;如果原始数据流不为空,则会输出原始数据流中的数据。

import reactor.core.publisher.Flux;

public class SwitchIfEmptyOperatorExample {

    public static void main(String[] args) {
        Flux<Integer> source = Flux.empty();
        Flux<Integer> backupStream = Flux.just(-1, -2, -3);
        Flux<Integer> processedStream = source
                .switchIfEmpty(backupStream);

        processedStream.subscribe(
                processedNum -> System.out.println("Processed number: " + processedNum),
                error -> System.out.println("Final error: " + error.getMessage())
        );
    }
}

as:将流作为参数,转为另外一个结果

as操作符用于将流作为参数,转换为另外一个结果。它允许对流进行转换操作,并返回一个新的结果对象。

功能描述

  • 将流作为参数,转换为另外一个结果。
  • 允许对流进行转换操作,并返回一个新的结果对象。

使用场景

  • 当需要对流进行转换操作,并将结果用于其他用途时,可以使用as操作符。
  • 适用于需要将流转换为其他类型或结果的情况。

返回值

as操作符返回一个新的结果对象,根据具体使用情况而定。

使用示例

假设有一个从数据库中获取用户信息的流,我们可以使用 as 操作符将流中的数据转换为 User 对象的示例。

import reactor.core.publisher.Flux;

public class UserProcessor {
    public static void main(String[] args) {
        Flux<Integer> userIds = Flux.just(1, 2, 3);

        Flux<User> users = userIds
            .flatMap(userId -> getUserInfoFromDatabase(userId))
            .map(userInfo -> mapToUserObject(userInfo))
            .as(userType -> userType);

        users.subscribe(user -> System.out.println(user.toString()));
    }
}

collectList:收集元素转换为list集合

功能描述

  • 收集流中的所有元素,并将它们组成一个List集合。

使用场景

  • 当需要将流中的所有元素收集到一个List集合中时,可以使用collectList操作符。
  • 适用于需要对流进行聚合操作的场景。

返回值

collectList操作符返回一个Mono<List<T>>对象,该对象表示一个包含所有收集到的元素的List集合。

使用示例

collectList操作符将源流中的所有整数元素收集到一个列表中。在订阅后,将打印出收集到的列表内容。

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;

public class CollectListExample {
    public static void main(String[] args) {
        Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
        Mono<List<Integer>> listMono = numbers.collectList();
        listMono.subscribe(list -> System.out.println("Collected List: " + list));
    }
}

cast:用于将流中的元素类型转换为指定的类型。(父类转子类)

cast操作符用于将流中的元素类型转换为指定的类型。它允许我们对流中的元素进行类型转换操作。

功能描述

  • cast操作符将流中的元素强制转换为指定的类型。
  • 它可以用于处理具有父子类关系的元素类型之间的转换。即将父类的元素转换为子类的元素。

使用场景

  • 当需要将流中的元素类型转换为目标类型时,可以使用cast操作符。
  • 适用于处理具有父子类关系的元素类型之间的转换。

返回值

cast操作符返回一个新的流,其中的元素类型已经被转换为指定的类型。

使用示例

cast操作符将包含不同类型数字的Flux<Number>流转换为Flux<Integer>流。订阅后,将打印出转换后的整数元素。

import reactor.core.publisher.Flux;

public class CastExample {
    public static void main(String[] args) {
        Flux<Number> numbers = Flux.just(1, 2.5, 3, 4.7, 5);

        Flux<Integer> integerFlux = numbers.cast(Integer.class);

        integerFlux.subscribe(number -> System.out.println("Number as Integer: " + number));
    }
}

响应式流调试

在命令式世界,调试通常都是非常直观的:直接看 stack trace 就可以找到问题出现的位置, 以及:是否问题责任全部出在你自己的代码?问题是不是发生在某些库代码?如果是, 那你的哪部分代码调用了库,是不是传参不合适导致的问题?

开启流的日志记录

在响应式流上添加 log()操作符。

将其加到操作链上之后,它会读(只读,peek)流中的事件(包括 onNext、onError、 onComplete, 以及 订阅(subscribe)、 取消)(cancel)、和 请求(request))。

🌍 说明
log操作符通过 SLF4J 使用类似 Log4J 和 Logback 这样的公共的日志工具, 如果 SLF4J 不存在的话,则直接将日志输出到控制台。
控制台使用 System.err 记录 WARN 和 ERROR 级别的日志,使用 System.out 记录其他级别的日志。

编程方式记录

1、doOnNext:通常用于在数据流中的每个元素被处理之前执行一些操作,例如日志记录或调试信息的打印。

Observable.just(1, 2, 3)
    .doOnNext(data -> log.info("Processing data:,{}",data))
    .subscribe();

2、doOnError:通常用于在发生错误时执行一些清理操作或记录错误信息。

Observable.error(new RuntimeException("Something went wrong!"))
    .doOnError(error -> log.error("Error occurred:,{}",error.getMessage())
    .subscribe();

3、doOnEach:可以用于在任何事件发生时执行一些通用的操作。

Observable.just(1, 2, 3)
    .doOnEach(notification -> {
        if (notification.isOnNext()) {
			log.info("Received data:,{}",notification.getValue());
        } else if (notification.isOnError()) {
			log.info("Error occurred:,{}",notification.getError().getMessage());
        } else if (notification.isOnComplete()) {
			log.info("Stream completed successfully");
        }
    })
    .subscribe();

响应式流统计、聚合

统计

count:统计元素个数

  • 用于计算一个 Flux 中包含的元素数量。
  public void countExample() {
        Flux<Integer> num = Flux.just(1, 2, 3, 4);
        num.count()
           .subscribe(count -> System.out.println("count:" + count));
    }
// 输出结果为:count:4

reduce:累积操作

  • 用于将流中的元素按照给定的逻辑进行累积计算,最终返回一个单一的结果。
   //输出流中最大值
    public void reduceExample(){
        Flux<Integer> num = Flux.just(5, 6, 8, 4, 62, 3);
        num.reduce((max,next)->
            Math.max(max,next))
                .subscribe(max-> System.out.println("最大值:"+max));
    }
// 输出结果为:最大值:62


  //对流中元素到进行累乘
    public void reduceExample2(){
        Flux<Integer> range = Flux.range(1, 10);
        range.reduce((now,next)->now*next)
                .subscribe(mul-> System.out.println("1-10的累乘积为:"+mul));
    }
//输出结果为:3628800

Flux 转化为集合

collect:转化为自定义集合

  • 用于将Flux转换为自定义集合。例如ListSetMap等。
        //使用collect将流转成List
        public void collectToListExample(){
            Flux<String> flux = Flux.just("abc", "张三", "apple", "123", "_");
            flux.collect(Collectors.toList())
                    .subscribe(list-> System.out.println("Flux流转List:"+list));
        }
//输出结果为:Flux流转List:[abc, 张三, apple, 123, _]

        //使用collect将流转成Map
        public void collectToMapExample(){
            Flux<String> flux = Flux.just("abc", "张三", "apple", "1234", "_");
            flux.collect(Collectors.toMap(k->k.length(),v->v))//使用元素长度为key,值为value
                    .subscribe(map-> System.out.println("Flux流转Map:"+map));
        }
//输出结果为:Flux流转Map:{1=_, 2=张三, 3=abc, 4=1234, 5=apple}

collectList:转化为一个List集合。

  • collectList会遍历流中的每个元素,并将它们转为List集合。在收集元素到List时,会保留它们在流中的顺序。
 //流转化为 List
    public void collectListExample(){
        Flux<String> flux = Flux.just("f", "a", "b", "c", "d", "e");
        flux.collectList()
                .subscribe(list-> System.out.println("Flux转换为List集合:"+list));
    }
//输出结果为:Flux转换为List集合:[f, a, b, c, d, e]

collectSortList:转化为一个List集合并排序。

  //流转化为 List(顺序)
    public void collectSortListExample(){
        Flux<String> flux = Flux.just("f", "a", "b", "c", "d", "e");
        flux.collectSortedList()
                .subscribe(list-> System.out.println("Flux转换为排序List集合(顺序):"+list));
    }
//输出结果为:Flux转换为排序List集合(顺序):[a, b, c, d, e, f]
    //流转化为 List(逆序)
   public void collectSortListExample2(){
        Flux<String> flux = Flux.just("f", "a", "b", "c", "d", "e");
        flux.collectSortedList(Comparator.reverseOrder())
            .subscribe(list-> System.out.println("Flux转换为排序List集合(逆序):"+list));
    }
//输出结果为:Flux转换为排序List集合(顺序):[f, e, d, c, b, a]

collectMap:转换为一个Map集合

  • 可以通过提供两个 Function 对象来指定如何从流中的元素中提取键和值。第一个 Function 用于提取键,第二个 Function 用于提取值。如果不提供第二个 Function,则默认值为流中的元素本身。
  //流转化为 Map
  public void collectMapExample(){
        Flux<String> fruit = Flux.just("banana", "perry","apple");
        fruit.collectMap(k->k)//map的key就元素本身
                .subscribe(map-> System.out.println("Flux转换为Map集合:"+map));
    }
//输出结果为:Flux转换为Map集合:{banana=banana, apple=apple, perry=perry}

collectMultimap:转换为一个Map集合并分组

  • collectMultimap 会遍历流中的每个元素,并将它们按照指定的键进行分组,将每个键对应的元素收集到一个集合中。collectMultimap 是用于将流中的元素收集到一个 Map 中,一个key有多个value
  • 可以通过提供两个 Function 对象来指定如何从流中的元素中提取键和值。第一个 Function 用于提取键,第二个 Function 用于提取值。如果不提供第二个 Function,则默认值为流中的元素本身。
//流转化为 Map
public void collectMultiMapExample(){
            Flux<String> fruit = Flux.just("123", "12","1342","123456","321","35");
            fruit.collectMultimap(k->k.length())//以元素长度为key
                 .subscribe(map-> System.out.println("Flux转换为Map集合:"+map));

        }
//输出结果为:Flux转换为Map集合:{2=[12, 35], 3=[123, 321], 4=[1342], 6=[123456]}

聚合

merge:合并流

  • 用于将多个流合并在一起,同时订阅这些流。并按照元素的到达顺序依次处理这些流的元素,先到先合并。
public void mergeExample(){
            Flux<Integer> flux1 = Flux.just(1, 2, 5);
            Flux<Integer> flux2 = Flux.just(5, 6, 3);
            Flux.merge(flux1,flux2)
                    .subscribe(System.out::println);
        }
// 输出结果:输出新流 1 2 5 5 6 3

concat:按序连接

  • 用于将多个流合并在一起,形成一个新的流。它会先订阅并处理第一个流的元素,然后再处理下一个流的元素,以此类推,保持了流的顺序。
   public void concatExample(){
            Flux<Integer> flux1 = Flux.just(1, 2, 5);
            Flux<Integer> flux2 = Flux.just(5, 6, 3);
            Flux.concat(flux2,flux1)
                    .subscribe(System.out::println);
        }
// 输出结果:输出新流  5 6 3 1 2 5

concatWith:连接两个流

  • 用于将当前的 Flux 流与指定的 Flux 流连接起来,形成一个新的 Flux 流。
  • 连接后的新 Flux 流中,当前 Flux 流的元素将排在前面,指定的 Flux 流的元素将排在后面,保持它们的顺序不变。
  public void concatWithExample(){
        Flux<Integer> flux1 = Flux.just(1, 2, 5);
        Flux<Integer> flux2 = Flux.just(5, 6, 3);
        flux2.concatWith(flux1)
             .subscribe(System.out::println);
    }//将flux连到flux2后面
//输出结果: 5 6 3 1 2 5 

zip:照索引位置一对一地组合这些流的元素

  • zip会从每个输入的 Flux流中取出相同位置的元素,并将它们按照指定的组合函数进行组合,生成一个新的元素。
  • 可以组合任意数量的 Flux 流,但要求它们的长度必须相同,否则在长度最短的流结束后,zip 操作将停止组合。
// 把两个流中元素组合
public void zipExample(){
            Flux<Integer> flux1 = Flux.just(1, 2, 5,5);
            Flux<Integer> flux2 = Flux.just(5, 6, 3);
            Flux.zip(flux1,flux2,(x,y)->x+y)
                    .subscribe(System.out::println);
        }//流的数量必须相等,否则短的流结束后,zip停止操作
//输出结果6 8 8

// 把三个流中元素组成
public void zipExample2(){
            Flux<Integer> flux1 = Flux.just(1, 2, 5);
            Flux<Integer> flux2 = Flux.just(5, 6, 3);
            Flux<Integer> flux3 = Flux.just(5, 6, 4);
            Flux.zip(flux1,flux2,flux3)
                    .map(tuple->tuple.getT1()+tuple.getT2()+tuple.getT3())
                    .subscribe(System.out::println);
        }
// 输出结果:11 14 12

zipWith:将两个数据流中的元素一一配对,并将它们组合成新的元素

 public void zipWithExample(){
        Flux<Integer> flux1 = Flux.just(1, 2, 5);
        Flux<Integer> flux2 = Flux.just(5, 6, 3);
        flux1.zipWith(flux2,(x,y)->x+y)
                .subscribe(System.out::println);
    }
// 输出结果:6 8 8

combineLatest:合并新元素

  • combineLatest 会持续跟踪多个 Flux 流中最新的元素,并且在任何一个 Flux 流产生新元素时,会取所有 Flux 流中最新的元素进行组合。
    public void combineLatestExample(){
        Flux<Integer> flux1 = Flux.just(1, 2, 3);
        Flux<Integer> flux2 = Flux.just(4, 5, 6);
        Flux.combineLatest(flux1, flux2, (a, b) -> a + b)
                    .subscribe(System.out::println);
    }//flux1中最新的元素3去和flux2里的元素组合
// 输出结果 7 8 9

响应式定时操作

添加时间操作

elapsed:添加间隔时间

  • 用于为每个元素添加自订阅开始以来到元素发出的时间间隔信息。
public void elapsedExample() {
    Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5)
                             .delayElements(Duration.ofSeconds(1)); // 延迟每个元素发出 1 秒
    Flux<Tuple2<Long, Integer>> elapsedFlux = flux.elapsed();
    elapsedFlux.subscribe(tuple -> {
        Long interval = tuple.getT1(); // 获取时间间隔
        Integer value = tuple.getT2(); // 获取原始值
        System.out.println("Interval: " + interval + ", Value: " + value);
    });
    // 为了让程序持续运行,让主线程休眠一段时间
    try {
        Thread.sleep(10000); // 让程序运行 10 秒
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

timestamp:添加时间戳

  • 用于为每个元素添加时间戳信息。
public void timesTampExample() {
    Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);

    Flux<Tuple2<Long, Integer>> timestampFlux = flux.timestamp(); // 使用timestamp操作符添加时间信息

    timestampFlux.subscribe(tuple -> {
        Long timestamp = tuple.getT1(); // 获取时间戳
        Integer value = tuple.getT2(); // 获取原始值
        System.out.println("Timestamp: " + timestamp + ", Value: " + value);
    });
}

超时操作

timeout

  • 用于设置一个超时时间,如果在指定时间内没有收到新的数据项或者完成信号,就会触发超时错误。
  • 这个操作符对于处理需要及时响应的场景非常有用,比如网络请求超时、等待用户输入超时等。

假设我们有一个需求:从一个数据源获取数据,并在一定时间内如果没有新的数据到达,则认为超时。我们可以使用 timeout 操作符来实现这个功能。

public void timeoutExample() {
    // 模拟数据源,每隔一段时间发送一个数据项
    Flux<Integer> dataSource = Flux.just(1, 2, 3, 4)
                                   .delayElements(Duration.ofSeconds(2)); // 每隔 2 秒发送一个数据项
    // 在数据源上应用 timeout 操作符,设置超时时间为 1 秒
    Flux<Integer> timeoutFlux = dataSource.timeout(Duration.ofSeconds(1));
    // 订阅数据流,并处理超时事件
    timeoutFlux.subscribe(
        // 处理正常数据项
        item -> System.out.println("Received item: " + item),
        // 处理超时事件
        error -> System.out.println("Timeout error occurred: " + error)
    );
    // 为了让程序持续运行,让主线程休眠一段时间
    try {
        Thread.sleep(10000); // 让程序运行 10 秒
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

定时操作

interval :定时任务或者周期性操作

  • 用于创建一个周期性地发射递增的 Long 类型数据序列的 Flux
  • 在指定的时间间隔内生成一个递增的序列,从 0 开始,每次递增 1,并且发送给订阅者。
public void intervalExample() {
        // 创建一个每隔一秒发射一个递增的 Long 类型数据序列的 Flux
        Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1));

        // 订阅数据流,并处理数据项
        intervalFlux.subscribe(
            // 处理数据项
            item -> System.out.println("Received item: " + item)
        );

        // 为了让程序持续运行,让主线程休眠一段时间
        try {
            Thread.sleep(10000); // 让程序运行 10 秒
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
}

延迟操作

delay:整个流的延迟发射

  • 用于创建一个延迟发射单个值(或者在一段时间后发射错误或完成信号)的操作符。
  • 指定的延迟时间后,发射一个值或者完成信号。
  • 这个操作符通常用于创建一个在未来某个时间点触发的事件,比如在执行某个异步操作后的一段时间内。
public void delayExample() {
    // 创建一个延迟1秒后发射值的Mono
    Mono<String> delayedMono = Mono.delay(Duration.ofSeconds(1))
                                   .map(ignore -> "延迟之后");

    // 订阅Mono
    delayedMono.subscribe(
        result -> System.out.println("Received value: " + result),
        error -> System.err.println("Error occurred: " + error),
        () -> System.out.println("Completed")
    );

    // 阻塞主线程,以便观察输出
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Mono#delayElement :单个元素的延迟

  • 用于在 Mono 发射的单个元素上添加延迟。
  • Mono 发射一个元素时,延迟一段时间后再将该元素发射出去。
public void monoDelayElementExample() {
    Mono.just("Hello")
        .delayElement(Duration.ofSeconds(3)) // 在发射元素 "Hello" 后延迟 3 秒再发射
        .doOnNext(System.out::println)
        .subscribe();
}

Flux#delayElements :多个元素的延迟

  • 用于在 Flux 发射的每个元素上添加延迟。
  • 当 Flux 发射一个元素时,延迟一段时间后再将该元素发射出去。

delaySubscription:延迟订阅

  • 用于在调用 subscribe 方法时,延迟一段时间后再开始实际的订阅操作。
  • 比如在订阅前执行一些预处理操作,或者在特定条件下延迟订阅等。
public void delaySubscriptionExample() {
    Flux.range(1, 5)
        .delaySubscription(Duration.ofSeconds(2)) // 延迟 2 秒后开始订阅
        .subscribe(System.out::println);

    // 主线程休眠 3 秒以确保延迟订阅生效
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

注意事项
  • delaySubscription 方法只会影响订阅操作的时机,不会影响数据流中元素的发射时间。
  • 如果在调用 subscribe 方法前已经开始了数据流的发射,则延迟订阅可能会错过一部分数据。

响应式数据分组

分组grouping

分组能够根据 key 将源Flux<T>拆分为多个批次。对应的操作符是 groupBy

每一组用GroupedFlux<T>类型表示,使用它的 key() 方法可以得到该组的 key

public class GroupingDemo {
    public static void main(String[] args)  {
        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .groupBy(num -> (num % 2) == 1 ? "odd" : "even")
            .concatMap(g -> g.defaultIfEmpty(-1)
                .map(String::valueOf)
                .startWith(g.key()))
            .subscribe(System.out::println);
    }
}

窗口windowing

window 操作是 根据个数、时间等条件,或能够定义边界的发布者(boundary-defining Publisher), 把源 Flux<T> 拆分为 windows。对应的操作符有 windowwindowTimeoutwindowUntilwindowWhile,以及 windowWhen

以个数为界:window(int)

会出现重叠或丢弃的情况:window(int,int)

StepVerifier.create(
        Flux.range(1, 10)
                .window(5, 3) //overlapping windows
                .concatMap(g -> g.defaultIfEmpty(-1))) //将 windows 显示为 -1
                .expectNext(1, 2, 3, 4, 5)
                .expectNext(4, 5, 6, 7, 8)
                .expectNext(7, 8, 9, 10)
                .expectNext(10)
                .verifyComplete();

🌏 说明

如果将两个参数的配置反过来(maxSize < skip),序列中的一些元素就会被丢弃掉, 而不属于任何 window

以时间为界:window(Duration)

会出现重丢弃或丢弃的情况:window(Duration,Duration)

StepVerifier.create(
        Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
                .windowWhile(i -> i % 2 == 0)
                .concatMap(g -> g.defaultIfEmpty(-1)))
                .expectNext(-1, -1, -1) //分别被奇数 1 3 5 触发
                .expectNext(2, 4, 6) // 被 11 触发
                .expectNext(12) // 被 13 触发
                .expectNext(-1) // 空的 completion window,如果 onComplete 前的元素能够匹配上的话就没有这个了
                .verifyComplete();

缓存buffering

缓存操作之后会发出 buffer(类型为Collection<T>, 默认是 List)。缓存的操作符与窗口的操作符是对应的:bufferbufferTimeoutbufferUntilbufferWhile, 以及bufferWhen

缓存操作也会有丢弃元素或内容重叠的情况

StepVerifier.create(
        Flux.range(1, 10)
                .buffer(5, 3) // 缓存重叠
        )
                .expectNext(Arrays.asList(1, 2, 3, 4, 5))
                .expectNext(Arrays.asList(4, 5, 6, 7, 8))
                .expectNext(Arrays.asList(7, 8, 9, 10))
                .expectNext(Collections.singletonList(10))
                .verifyComplete();

bufferUntilbufferWhile不会发出空的buffer

StepVerifier.create(
        Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
                .bufferWhile(i -> i % 2 == 0)
        )
        .expectNext(Arrays.asList(2, 4, 6)) // 被 11 触发
        .expectNext(Collections.singletonList(12)) // 被 13 触发
        .verifyComplete();

bufferTimeout

public class BufferingDemo {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个每隔一秒发射一个元素的 Flux
        Flux<Integer> sourceFlux = Flux.range(1, 10).delayElements(Duration.ofSeconds(1));

        // 使用 bufferTimeout 操作符,在每个 3 秒的时间间隔内收集元素
        Flux<List<Integer>> bufferedFlux = sourceFlux.bufferTimeout(3, Duration.ofSeconds(3));

        // 订阅并输出收集到的缓冲区
        bufferedFlux.subscribe(System.out::println);

    }
}

响应式重试动作

retry:重试

对出现错误的序列进行重试,对于上游 Flux 是基于重订阅(re-subscribing)的方式。但实际上已经是一个不同的序列了, 发出错误信号的序列仍然是终止了的。

Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .elapsed() 
    .retry(1)
    .subscribe(System.out::println, System.err::println); 

Thread.sleep(2100); 

retryWhen:条件重试

条件重试是一个包含Flux<Throwable>作为 retryWhen 的唯一参数被传递给一个 Function,由开发者自行处理 Flux<Throwable>流,并声明一段处理函数传递给 Function 并返回一个新的 Publisher<?>, 从而实现对重试操作的配置。

retry的区别

  • retryWhen返回的是Flux.empty()
  • retry返回的是error信号.

使用retryWhen实现一个retry(3)

        Flux.<String>error(new IllegalArgumentException())
            .retryWhen(Retry.from(companion -> companion
                    .zipWith(Flux.range(1, 4),
                             (retrySignal, index) -> {
                                 if (index < 4) return index;
                                 else throw Exceptions.propagate(retrySignal.failure());
                             }))
            ).subscribe(i -> System.out.println(System.currentTimeMillis()), System.out::println);

使用retryWhen进行延迟重试

Flux.<String>error(new IllegalArgumentException())
    .retryWhen(Retry.from(companion -> companion
            .doOnNext(s -> System.out.println(s + " at " + LocalTime.now()))
            .zipWith(Flux.range(1, 4), (retrySignal, index) -> {
                if (index < 4) return index;
                else throw Exceptions.propagate(retrySignal.failure());
            })
            .flatMap(index -> Mono.delay(Duration.ofMillis(index * 100)))
            .doOnNext(s -> System.out.println("retried at " + LocalTime.now()))))
    .subscribe();
  • 第一次重试延迟大约 100ms
  • 第二次重试延迟大约 200ms
  • 第三次重试延迟大约 300ms

响应式过滤操作

替换if-else为filter

命令式编程

int[] ints = new int[10]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
for(int i=0;i<ints.length;i++){
    if(ints[i]%2==0){
        System.out.println("当前数据:"+ints[i])
    }
}

响应式编程

int[] ints = new int[10]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
Flux.fromXX(ints)
.filter(i->i%2==2)
.doOnNext(System.out::println)

取集合中第N个元素

过滤一个序列

filter:过滤元素

  • 筛选出符合指定条件的元素,生成一个新的数据流。
  • 接收一个谓词函数(Predicate),用于判断元素是否符合条件。
  //filter过滤掉奇数
    public void filterExample() {
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        flux.filter(i -> i % 2 == 0)//留下偶数,去掉奇数
            .subscribe(System.out::print);
    }
    //输出结果:2 4 6 8 10

filterWhen:过滤元素

  • filterWhen 操作符和filter类似,都用于筛选出符合特定条件的元素。区别在于filterWhen是异步。
  //filterWhen过滤掉奇数
   public void filterWhenExample() {
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        flux.filterWhen(num -> {
            return Flux.just(num % 2 == 0); // 这里可以是任何异步操作,返回的是一个 Mono<Boolean>
        }).subscribe(System.out::println);
    }
    //输出结果:2 4 6 8 10

ofType:选择需要的类型

  • 过滤源数据流中的元素,只保留指定类型的元素。

      //ofType只留Integer类型数据
      public void ofTypeExample() {
            Flux<Object> flux = Flux.just(1, 5,12, "abc", "jetlinks", 2.0, 4);
            flux.ofType(Integer.class)//只保留Integer类型
                .filter(i -> i % 2 == 0)//将过滤出来的Interger类型数据再过滤,只留偶数
                .subscribe(System.out::println);
        }
        //输出结果:12 4
    

ignoreElements:忽略所有元素

  • 用于忽略源数据流中的所有元素,不需要处理元素本身,只关心数据流的结束状态的情况。
    //  忽略所有元素,只保留序列的完成信号
    public void ignoreElementsExample() {
        Flux<Integer> flux = Flux.range(1, 10);
        flux.ignoreElements()
            .doOnTerminate(() -> System.out.println("成功")) //flux在成功完成时执行
            .subscribe();
    }
//输出结果: 成功

distinct:元素去重

    //去重并排序
       Flux<Integer> flux = Flux.just(1, 2, 3, 3, 5, 6, 7, 6, 9, 10, 1, 2, 5, 4, 5, 6, 11, 8, 9, 10);
        flux.distinct()
                .sort()
            .subscribe(num-> System.out.print(num+"\t"));
    }

//输出结果: 1 2 3 4 1 5 6 3 6 7 8 9 10 11 

distinctUntilChanged:过滤连续重复的元素

  //    去掉连续重复的元素
    public void distinctUntilChangedExample() {
        Flux<Integer> flux = Flux.just(1, 1, 2, 3, 4, 1,5, 5, 6,3, 6, 7, 8, 9, 9, 10, 11);
        flux.distinctUntilChanged()
            .subscribe(num -> System.out.print(num + "\t"));
    }
//输出结果: 1	2	3	4	1	5	6	3	6	7	8	9	10	11	

只要一部分序列

take:取指定数量元素

  //    只取前面3个元素
     public void takeExample1() {
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        flux.take(3)
            .subscribe(num -> System.out.print(num + "\t"));
    }
//输出结果: 2	3	4
//  取一段时间内发出的元素
 public void takeExample2() {
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        flux.take(Duration.ofSeconds(1))
            .subscribe(num -> System.out.println(num + "\t"));
    }

next:取第一个元素

  //只取第一个元素
    public void nextExample() {
        Flux<Integer> flux = Flux.range(1, 10);
        flux.next()
            .subscribe(num -> System.out.println(num + "\t"));
    }
//输出结果:1

limitRequest :限制请求数量

  • limitRequest 方法用于限制在处理 FluxMono 流时的请求数量。它允许你指定每次订阅时可以处理的元素数量。
//只取前面3个
public void limitRequestExample() {
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        flux.limitRequest(3)
            .subscribe(num -> System.out.print(num + "\t"));
    }
// 输出结果: 1 2 3 

takeLast:从末尾取指定数量元素

  //    只取前面3个元素
     public void takeExample1() {
        Flux<Integer> flux = Flux.just( 2, 3, 4, 5, 6, 7, 8, 9, 10);
        flux.take(3)
            .subscribe(num -> System.out.print(num + "\t"));
    }
//输出结果: 8	9	10	
//  取一段时间内发出的元素
 public void takeExample2() {
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        flux.take(Duration.ofSeconds(1))
            .subscribe(num -> System.out.println(num + "\t"));
    }

takeUtil:取元素直到满足某个条件(包含)

 //直到满足某个条件(包含)
    public void takeUtilExample(){
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        flux.takeUntil(num->num==5)
                .subscribe(num-> System.out.print(num+"\t"));
    }
//输出结果:1	2	3	4	5

takeWhile:取元素直到满足某个条件(不包含)

 //直到满足某个条件(不包含)
    public void takeWhileExample(){
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
         flux.takeWhile(num -> num <5)
             .subscribe(System.out::print); 
    }
//输出结果:1	2	3	4	

最多只取 1 个元素

elementAt:通过下标取元素

   //通过下标只取一个值
    public void elementAtExample(){
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        flux.elementAt(3)
                .subscribe(System.out::print);
    }
//输出结果:4

last:序列为空时

//如果为序列空则发出错误信号
    public void lastExample1(){
        Flux.empty()
                .last()
            .subscribe(System.out::print);
    }
//输出结果:报NoSuchElementException异常

//如果序列为空则返回默认值
  public void lastExample2() {
        Flux.empty()
                .last("序列为空")
                .subscribe(System.out::println);
    }
//输出结果:序列为空

跳过一些元素

skip:跳过开始n个元素

   //跳过前三个元素
 public void skipExample(){
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        flux.skip(3)
                .subscribe(System.out::print);
    }
//输出结果:4 5 6 7 8 9 10


skipLast:跳过最后的 n 个元素

   //跳过最后三个元素
  public void skipLastExample(){
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        flux.skipLast(3)
                .subscribe(System.out::print);

    }
//输出结果:1 2 3 4 5 6 7

skipUntil:跳过元素直到满足条件(包含)

   //跳过元素直到值等于5
     public void skipUntilExample() {
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        flux.skipUntil(num -> num == 5)
                .subscribe(System.out::print);
    }
//输出结果:5 6 7 8 9 10

skipWhile:跳过元素直到满足条件(不包含)

   //跳过元素直到值等于5
     public void skipWhileExample() {
        Flux<Integer> flux = Flux.range(1,10);
        flux.skipWhile(num -> num <=5)
            .subscribe(System.out::print);
    }
//输出结果: 6 7 8 9 10

采样

sample:给定采样周期来采样

   //每2s采集一个元素
     public void sampleExample()   {
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9);
        flux.delayElements(Duration.ofSeconds(1)) // 延迟每个元素的发射时间
            .sample(Duration.ofSeconds(2))
            .subscribe(System.out::print);

        try {
            Thread.sleep(15000);//为了能看到结果,等待一段时间
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
//输出结果: 1 3 5 7 9

sampleFirst:取采样周期里的第一个元素

   //取采样周期里的第一个元素
   public void sampleFirstExample() {
      Flux<String> flux = Flux.just("A", "B", "C", "D", "E", "F", "G", "H", "I", "J");

      flux.sampleFirst(Duration.ofSeconds(2))
          .subscribe(System.out::println);

  }
//输出结果:A

single:只取一个元素

//我只想要一个元素(如果多于一个就返回错误)
   public void singleExample1(){
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9);
        flux.single()
                .subscribe(System.out::print);
  }
//输出结果:返回错误

 public void singleExample2(){
        Flux<Integer> flux = Flux.just(1);
        flux.single()
            .subscribe(System.out::print);
    }
//输出结果:1

//如果序列为空,发出错误信号
   public void singleExample3(){
        Flux.empty()
            .single()
            .subscribe(System.out::print);
    }
//输出结果:报NoSuchElementException异常

//如果序列为空,发出一个缺省值
public void singleExample4(){

        Flux.empty()
            .single("序列为空")
            .subscribe(System.out::print);
    }
//输出结果:序列为空

singleOrEmpty:只取一个元素,序列为空就返回空序列

   //只取一个元素,序列为空就返回空序列
    public void singleOrEmptyExample2(){
        Flux.just("A")
            .singleOrEmpty()
            .subscribe(System.out::print);
    }
//输出结果:A
   public void singleOrEmptyExample(){
        Flux.empty()
            .singleOrEmpty()
            .subscribe(System.out::print);
    }
//输出结果:空序列

响应式编程常见问题

我写的操作看上去是正确的,但是没有执行.

有以下几种可能:上游流为空,多个流未组合在一起,在不支持响应式的地方使用了响应式

1. 没有使用return关键字返回

错误

public Mono<Response> handleRequest(Request request){
// 没有return
  	this
      .findOldData(request);
}

正确

public Mono<Response> handleRequest(Request request){
  return this
      .findOldData(request);
}

2. 上游流为空

public Mono<Response> handleRequest(Request request){
   
 return this
      .findOldData(request)
      .flatMap(old -> {
            //这里为什么不执行? 
            return ....
      })
}

🌏 说明

findOldData返回的流为空时,下游的flatMap等操作符需要操作流中元素的操作符是不会执行的。 可以通过switchIfEmpty操作符来处理空流的情况。

3. 多个流未组合在一起

  1. 只要方法返回值是Mono或者Flux,都不能单独行动。
  2. 只要方法中调用了任何响应式操作,那这个方法也应该是响应式。(返回Mono或者Flux)
class Service{
    Mono<Void> handleRequest(request);
}

//错误示例,handleRequest是响应式的,但是此方法没有使用响应式操作。
public Result handleRequest(Request request){
 
 service.handleRequest(request);

 return ok;
}

//正确示例
public Mono<Result> handleRequest(Request request){
 return service
         //处理请求
         .handleRequest(request)
         //返回结果
         .thenReturn(ok);
}

4. 在不支持响应式的操作符中使用响应式

public Mono<Void> saveLog(Request req,Response resp){
    ...
}

public Mono<Result> handleRequest(Request request){

return service
         //处理请求
         .handleRequest(request)
         //记录日志 此为错误的用法,saveLog是响应式的,但是doOnNext并不支持响应式操作
         .doOnNext(response-> saveLog(request,response) )
         //返回结果
         .thenReturn(ok);
}

doOnNext方法的语义以及参数Consumer<T>可知,此方法是不支持响应式的(Consumer<T>只有参数没有返回值)。因此不能在此方法中使用响应式操作。

return service
         //处理请求
         .handleRequest(request)
         //记录日志
         .flatMap(response-> saveLog(request,response) )
         //返回结果
         .thenReturn(ok);

5. 在流内部订阅终止了整个流

public Mono<Void> saveLog(Request req,Response resp){
	...
}
//错误
public Mono<Response> handleRequest(Request request){
	
	return service
	//处理请求
	.handleRequest(request)
	//记录日志 此为错误的用法
	.flatMap(response-> {
		saveLog(request,response)
		.subscribe();
		return Mono.emtpy();
	})
	//返回结果
	.thenReturn(ok);
}

//正确
public Mono<Response> handleRequest(Request request){
	
	return service
	//处理请求
	.handleRequest(request)
	//记录日志
	.flatMap(response-> {
		return saveLog(request,response);
	})
	//返回结果
	.thenReturn(ok);
}

6. 订阅时机不对

7. 用在 Flux 上的操作符好像没起作用,为啥?

Flux<String> flux = Flux.just("foo", "chain");
flux.map(secret -> secret.replaceAll(".", "*")); //①
flux.subscribe(next -> System.out.println("Received: " + next));

错误原因:问题在①, flux 变量并没有改变。

Flux<String> flux = Flux.just("foo", "chain");
flux = flux.map(secret -> secret.replaceAll(".", "*"));//分开写得 这里记得返回!!
flux.subscribe(next -> System.out.println("Received: " + next));

//或者您可以尝试下面的写法
Flux<String> flux = Flux.just("foo", "chain")
                     .map(secret -> secret.replaceAll(".", "*"));
flux.subscribe(next -> System.out.println("Received: " + next));

//如果不存在别的地方需要订阅flux 下面的方法可以尝试
   Flux
  .just("foo", "chain")
  .map(secret -> secret.replaceAll(".", "*"))
  .subscribe(next -> System.out.println("Received: " + next));

我想获取流中的元素怎么办

不要试图从流中获取数据出来,而是先思考需要对流中元素做什么。

需要对流中的数据进行操作时,都应该使用对应操作符来处理,根据Flux或者Mono提供的操作符API进行组合操作。

public List<Book> getAllBooks(){
    List<BookEntity> bookEntities = repository.findAll();

    List<Book> books = new ArrayList(bookEntities.size());

    for(BookEntity entity : bookEntities){
        Book book = entity.copyTo(new Book());
        books.add(book);
    }

    return books;
}

错误示例:

public Book getAllBooks(){

 return getRepository()
		.createQuery()
		.where("id",1)
		.fetchOne()
		.block();
}

‼️ 警告

在响应式编程中,在任何时候执行业务代码时都不要使用**block()**方法,使用block()方法可能引发以下问题:

  1. 阻塞线程:调用block()方法会阻塞当前线程,导致无法处理其他并发请求。这会降低系统的吞吐量和响应性能。
  2. 死锁风险:如果在处理响应式流时使用了block()方法,而其中某些操作也依赖于同一个线程的结果,则可能导致死锁。
  3. 内存资源浪费:阻塞调用将持续占用线程,而每个线程都需要额外的内存资源。如果应用程序中同时有大量的阻塞操作,可能导致线程池耗尽和内存资源浪费。

正确示例:

🌏 说明

为了避免使用block(),我们应该尽可能地使用响应式操作符(如map、flatMap、filter等)对数据流进行转换和处理,并使用其他响应式方法(如subscribe())来订阅数据流并触发异步处理。

public Flux<Book> getAllBooks(){
	return repository
          .findAll()
          .map(entity-> entity.copyTo(new Book()))
}

在非响应式方法中如何使用响应式

public void handleRequest(Request request){

    //不到万不得已请勿使用block方法
    //logService.saveLog(request).block()

    //
  logService
    .saveLog(request)
    .subscribe(
        result->log.debug("保存成功 {}",request),
        error->log.warn("保存失败 {}",request,error)
    )
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1709907.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大模型部署_书生浦语大模型 _作业2

本节课可以让同学们实践 4 个主要内容&#xff0c;分别是&#xff1a; 1、部署 InternLM2-Chat-1.8B 模型进行智能对话 1.1安装依赖库&#xff1a; pip install huggingface-hub0.17.3 pip install transformers4.34 pip install psutil5.9.8 pip install accelerate0.24.1…

在线教程丨与 Sora 技术路线相似!全球首个开源文生视频 DiT 模型 Latte 一键部署

自OpenAI推出 Sora 以来&#xff0c;「文生视频」概念及相关应用备受瞩目。而伴随 Sora 的大热&#xff0c;其背后的关键技术&#xff0c;DiT(Diffusion Transformers) 也被「考古挖掘」了出来。 事实上&#xff0c;DiT 是一个文生图模型&#xff0c;该模型于两年前开源&#x…

linux 定时执行shell、python脚本

在linux里设置定时执行一般是用crontab&#xff0c;如果没有的话&#xff0c;可以先安装&#xff1a; 安装 查看是否安装 cron -v # 对于基于Debian的系统&#xff08;如Ubuntu&#xff09; sudo apt-get install cron# 对于基于RedHat的系统&#xff08;如CentOS&#xff…

基于Java实现震中附近风景区预警可视化分析实践

目录 前言 一、空间数据说明 1、表结构信息展示 2、空间范围查询 二、Java后台开发实现 1、模型层设计与实现 2、控制层设计与实现 三、Leaflet地图开发 1、地震震中位置展示 2、百公里风景区列表展示 3、风景区列表展示 4、附近风景区展示 四、总结 前言 地震这类…

打印机手动双面打印技巧

一、WORD和PDF &#xff08;1&#xff09;首先选择要打印的页面范围&#xff0c;然后选择仅奇数页打印 &#xff08;2&#xff09;将打印完的纸张翻过来&#xff0c;白纸朝上&#xff0c;纸张的头部先放入打印机 &#xff08;3&#xff09;选择要打印的页面范围&#xff0c;然…

【problem】解决EasyExcel导出日期数据显示为#####问题

前言 在使用EasyExcel进行数据导出时&#xff0c;你可能遇到日期或其他数据在Excel中显示为“#######”的情况&#xff0c;这通常是因为列宽不足以展示单元格内的全部内容。本文将指导你如何通过简单的步骤解决这一问题&#xff0c;并确保导出的Excel文件自动调整列宽或直接指…

成都蓝蛙科技引领AIGC创新,亮相中国AIGC开发者大会

2024年5月25日&#xff0c;第三届AIGC中国开发者大会在北京举行&#xff0c;蓝蛙科技公司CEO兼创始人李辰受邀出席并发表主题演讲。作为开源框架GeneralAgent的作者&#xff0c;发表了题为“Agent框架的挑战和解决方案”的精彩演讲。李辰先生深入探讨了在构建和部署基于大型语言…

git冲突

git冲突的产生&#xff1a; 首先用户A新建一个文件conflict&#xff0c;并在里面添加内容 然后通过add,commit,push将该文件上传到远端仓库 然后用户B通过pull将程序拉下来之后&#xff0c;也在这个文档里面进行编辑&#xff0c;并且内容不一样 如果这个时候其中一个人push&…

Redis面试题深度解析

1、我看你做的项目中&#xff0c;都用到了redis&#xff0c;你在最近的项目中哪些场景使用了redis呢? 2、缓存穿透 布隆过滤器的误判现象 Redisson和Guava都对布隆过滤器进行了实现 3、缓存击穿 互斥锁&#xff0c;就是一个线程来修改&#xff0c;并占据了锁&#xff0c;另外其…

C#--Mapster(高性能映射)用法

1.Nuget安装Mapster包引用 2.界面XAML部分 <Window x:Class"WpfApp35.MainWindow"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml"xmlns:d"http://schemas.m…

实战指南:Vue 2基座 + Vue 3 + Vite + TypeScript微前端架构实现动态菜单与登录共享

实战指南&#xff1a;Vue 2基座 Vue 3 Vite TypeScript子应用vue2微前端架构实现动态菜单与登录共享 导读&#xff1a; 在当今的前端开发中&#xff0c;微前端架构已经成为了一种流行的架构模式。本文将介绍如何结合Vue 2基座、Vue 3子应用、Vite构建工具和TypeScript语言…

华为机考入门python3--(32)牛客32-密码截取

分类&#xff1a;最长对称子串、动态规划 知识点&#xff1a; 生成二维数组 dp [[0] * n for _ in range(n)] 求最大值 max(value1, value2) 动态规划的步骤 a. 定义问题 长度为n下最长的对称子串的长度 b. 确定状态 dp[i][j]表示字符串从索引i到j的子串是否为对称…

2024.5.28晚训题解

提前预告&#xff0c;市赛初中组会考算法题&#xff0c;应该会有两道模板题 比如DFS BFS 二分 简单动态规划&#xff0c;虽然我们没学多久&#xff0c;但是模板题你还是要会写的 A题 编辑距离 动态规划 注意多组输入 #include<iostream> using namespace std; int dp[1…

unity3D获取某天的0点和23点59分59秒

系列文章目录 unity工具 文章目录 系列文章目录unity工具 &#x1f449;一、前言&#x1f449;二、获取某一天的0点和23点59分59秒1-1.代码如下1-2.调用方法如下1-2-1.获取当天的时间1-2-2.获取某一天的时间 &#x1f449;三、当月第一天0时0分0秒&#x1f449;四、当月最后一…

SHELL编程(三)网络基础命令 Makefile

目标 一、网络基础及相关命令&#xff08;一&#xff09;网络相关命令&#xff08;二&#xff09;重启网络服务 二、Makefile&#xff08;一&#xff09;标签式语法&#xff08;二&#xff09;目标:依赖 式语法1. 格式2. 编译流程&#xff1a;预处理 编译 汇编 链接3. 目标和伪…

TiDB-从0到1-体系结构

TiDB从0到1系列 TiDB-从0到1-体系结构TiDB-从0到1-分布式存储TiDB-从0到1-分布式事务 一、TiDB体系结构图 TiDB基础的体系架构中有4大组件 TiDB Server&#xff1a;用于处理客户端的请求PD&#xff1a;体系的大脑&#xff0c;存储元数据信息TiKV&#xff1a;存储数据TiFlash…

Stable Diffusion 模型演进:LDM、SD 1.0, 1.5, 2.0、SDXL、SDXL-Turbo 等

节前&#xff0c;我们星球组织了一场算法岗技术&面试讨论会&#xff0c;邀请了一些互联网大厂朋友、参加社招和校招面试的同学。 针对算法岗技术趋势、大模型落地项目经验分享、新手如何入门算法岗、该如何准备、面试常考点分享等热门话题进行了深入的讨论。 合集&#x…

Vue3+Ant design 实现Select下拉框一键全选/清空

最近在做后台管理系统项目的时候&#xff0c;产品增加了一个让人非常苦恼的需求&#xff0c;让在Select选择器中添加一键全选和清空的功能&#xff0c;刚开始听到的时候真是很懵&#xff0c;他又不让在外部增加按钮&#xff0c;其实如果说在外部增加按钮实现全选或者清空的话&a…

触摸屏是输入设备还是输出设备?

从功能上讲&#xff0c;触摸屏理应属于输入设备&#xff0c;之所以有很多用户会误会它是输出设备&#xff0c;是因为将其与“触摸显示屏”搞混了&#xff0c;以手机屏幕为例&#xff0c;它并不是单层屏幕&#xff0c;而是有多个不同功能和作用组成的集成屏&#xff0c;这类带有…

ubuntu-24.04系统静态Mac和IP配置

操作系统版本&#xff08;桌面版&#xff09;&#xff1a;ubuntu-24.04-desktop-amd64.iso 原因说明&#xff1a;因网络的IP地址和Mac是预分配的&#xff0c;所以ubuntu系统需要修改网卡的mac地址和IP才能访问&#xff0c;网络查了半天资料都没成功&#xff0c;后再界面提示&a…