05-流式操作:使用 Flux 和 Mono 构建响应式数据流

news2025/1/21 10:18:36

1 通过 Flux 对象创建响应式流

  • 基于各种工厂模式的静态创建方法
  • 编程的方式动态创建 Flux

相对而言,静态方法在使用上都比较简单,但不如动态方法来得灵活。我们来一起看一下。

2 通过静态方法创建 Flux

Reactor 中静态创建 Flux 的方法常见的包括 just()、range()、interval() 以及各种以 from- 为前缀的方法组等。因为 Flux 可以代表 0 个数据,所以也有一些专门用于创建空序列的工具方法。

2.1 just() 方法

我已经在上一讲为你演示过 just() 方法,它可以指定序列中包含的全部元素,创建出来的 Flux 序列在发布这些元素之后会自动结束。一般情况下,在已知元素数量和内容时,使用 just() 方法是创建 Flux 的最简单直接的做法。

示例:

Flux.just("Hello", "World").subscribe(System.out::println);
Hello
World

这里我们对 Flux 执行了用于订阅的 subscribe() 方法,并通过使用 Lambda 表达式调用了 System.out.println() 方法,这意味着将结果打印到系统控制台。关于 subscribe() 方法以及对响应式流的订阅过程,我会在本讲后续内容中进一步说明。

fromXXX() 方法组

如果我们已经有了一个数组、一个 Iterable 对象或 Stream 对象,那么就可以通过 Flux 提供的 fromXXX() 方法组来从这些对象中自动创建 Flux,包括 fromArray()、fromIterable() 和 fromStream() 方法。

示例:

Flux.fromArray(new Integer[] {1, 2, 3})
	.subscribe(System.out::println);

执行结果

1
2
3

range() 方法

如果你快速生成一个整数数据流,那么可以采用 range() 方法,该方法允许我们指定目标整数数据流的起始元素以及所包含的个数,序列中的所有对象类型都是 Integer,这在创建连续的年份信息或序号信息等场景下非常有用。使用 range() 方法创建 Flux 对象的示例代码如下所示。

Flux.range(2020, 5).subscribe(System.out::println);

显然,这段代码会在控制台中打印出 5 行记录,从 2020 开始,到 2024 结束。

interval() 方法

在 Reactor 框架中,interval() 方法可以用来生成从 0 开始递增的 Long 对象的数据序列。通过 interval() 所具备的一组重载方法,我们可以分别指定这个数据序列中第一个元素发布之前的延迟时间,以及每个元素之间的时间间隔。interval() 方法相对复杂,我们先附上它的弹珠图,如下所示。

使用 interval() 方法创建 Flux 示意图(来自 Reactor 官网)

可以看到,上图中每个元素发布时相当于添加了一个定时器的效果。使用 interval() 方法的示例代码如下所示。

Flux.interval(Duration.ofSeconds(2), Duration.ofMillis(200)).subscribe(System.out::println);

这段代码的执行效果相当于在等待 2 秒钟之后,生成一个从 0 开始逐一递增的无界数据序列,每 200 毫秒推送一次数据。

empty()、error() 和 never()

根据上一讲介绍的 Reactor 异步序列的语义,我们可以分别使用 empty()、error() 和 never() 这三个方法类创建一些特殊的数据序列。其中,如果你希望创建一个只包含结束消息的空序列,那么可以使用 empty() 方法,使用示例如下所示。显然,这时候控制台应该没有任何的输出结果。

Flux.empty().subscribe(System.out::println);

然后,通过 error() 方法可以创建一个只包含错误消息的序列。如果你不希望所创建的序列不发出任何类似的消息通知,也可以使用 never() 方法实现这一目标。当然,这几个方法都比较少用,通常只用于调试和测试。

小结

不难看出,静态创建 Flux 的方法简单直接,一般用于生成那些事先已经定义好的数据序列。

而如果:

  • 数据序列事先无法确定
  • 或生成过程中包含复杂的业务逻辑

就需要用到动态创建方法。

3 通过动态方法创建 Flux

动态创建 Flux 所采用的就是以编程的方式创建数据序列,最常用的就是 generate() 方法和 create() 方法。

generate() 方法

generate() 方法生成 Flux 序列依赖于 Reactor 所提供的 SynchronousSink 组件,定义如下。

public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)

SynchronousSink 组件包括 next()、complete() 和 error() 这三个核心方法。从 SynchronousSink 组件的命名上就能知道它是一个同步的 Sink 组件,也就是说元素的生成过程是同步执行的。

next() 方法只能最多被调用一次。使用 generate() 方法创建 Flux 的示例代码如下。

Flux.generate(sink -> {
    sink.next("javaedge");
    sink.complete();
}).subscribe(System.out::println);

运行该段代码,会在系统控制台上得到“javaedge”。我们在这里调用了一次 next() 方法,并通过 complete() 方法结束了这个数据流。如果不调用 complete() 方法,那么就会生成一个所有元素均为“javaedge”的无界数据流。

这个示例非常简单,但已经具备了动态创建一个 Flux 序列的能力。如果想要在序列生成过程中引入状态,那么可以使用如下所示的 generate() 方法重载。

Flux.generate(() -> 1, (i, sink) -> {
            sink.next(i);
            if (i == 5) {
                sink.complete();
            }
            return ++i;
}).subscribe(System.out::println);

引入一个代表中间状态的变量 i,然后根据 i 的值来判断是否终止序列。显然,以上代码的执行效果会在控制台中输入 1 到 5 这 5 个数字。

create()

create() 方法与 generate() 方法比较类似,但它使用的是一个 FluxSink 组件,定义如下。

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)

FluxSink 除了 next()、complete() 和 error() 这三个核心方法外,还定义了背压策略,并且可以在一次调用中产生多个元素。使用 create() 方法创建 Flux 的示例代码如下。

Flux.create(sink -> {
            for (int i = 0; i < 5; i++) {
                sink.next("javaedge" + i);
            }
            sink.complete();
}).subscribe(System.out::println);

运行该程序,我们会在系统控制台上得到从“javaedge0”到“javaedge4”的 5 个数据。通过 create() 方法创建 Flux 对象的方式非常灵活,在本专栏中会有多种场景用到这个方法。

以上就是通过Flux 对象创建响应式流的方法,此外,还可以通过 Mono 对象来创建响应式流,我们一起来看一下。

4 通过 Mono 对象创建响应式流

可认为它是 Flux 的一种特例,所以很多创建 Flux 的方法同样适用。针对静态创建 Mono 的场景,前面给出的 just()、empty()、error() 和 never() 等方法同样适用。除了这些方法之外,比较常用的还有 justOrEmpty() 等方法。

justOrEmpty() 方法会先判断所传入的对象中是否包含值,只有在传入对象不为空时,Mono 序列才生成对应的元素,该方法示例代码如下。

Mono.justOrEmpty(Optional.of("javaedge"))
	.subscribe(System.out::println);

另一方面,如果要想动态创建 Mono,我们同样也可以通过 create() 方法并使用 MonoSink 组件,示例代码如下。

Mono.create(sink ->
sink.success("javaedge")).subscribe(System.out::println);

5 订阅响应式流

可通过 subscribe() 添加相应的订阅逻辑。调用 subscribe() 方法时可指定需要处理的消息通知类型。

Flux 和 Mono 提供了一批非常有用的 subscribe() 方法重载方法,大大简化订阅的开发例程。这些重载方法包括:

//订阅流的最简单方法,忽略所有消息通知
subscribe();
 
//对每个来自 onNext 通知的值调用 dataConsumer,但不处理 onError 和 onComplete 通知
subscribe(Consumer<T> dataConsumer);
 
//在前一个重载方法的基础上添加对 onError 通知的处理
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer);
 
//在前一个重载方法的基础上添加对 onComplete 通知的处理
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer,
Runnable completeConsumer);
 
//这种重载方法允许通过请求足够数量的数据来控制订阅过程
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer,
Runnable completeConsumer, Consumer<Subscription> subscriptionConsumer);
 
//订阅序列的最通用方式,可以为我们的 Subscriber 实现提供所需的任意行为
subscribe(Subscriber<T> subscriber);

在“Spring 为什么选择 Reactor 作为响应式编程框架”提到 Reactor 中的消息通知类型有三种,即:

  • 正常消息
  • 错误消息
  • 完成消息

通过上述 subscribe() 重载方法,可以:

  • 只处理其中包含的正常消息
  • 也可同时处理错误消息和完成消息

如下代码示例展示同时处理正常和错误消息的实现方法。

Mono.just(“javaedge”)
         .concatWith(Mono.error(new IllegalStateException()))
         .subscribe(System.out::println, System.err::println);

以上代码的执行结果如下所示,我们得到了一个“javaedge”,同时也获取了 IllegalStateException。

javaedge 
java.lang.IllegalStateException

有时候我们不想直接抛出异常,而是希望采用一种

容错策略

返回一个默认值

就可以采用如下方式。

Mono.just(“javaedge”)
          .concatWith(Mono.error(new IllegalStateException()))
          .onErrorReturn(default)
          .subscribe(System.out::println);

以上代码的执行结果如下所示,当产生异常时我们使用 onErrorReturn() 方法返回一个默认值“default”。

javaedge 
default

另外一种容错策略

通过 switchOnError() 方法使用另外的流来产生元素,以下代码演示了这种策略,执行结果与上面的示例一致。

Mono.just(“javaedge”)
         .concatWith(Mono.error(new IllegalStateException()))
         .switchOnError(Mono.just(“default”))
         .subscribe(System.out::println);

我们可以充分利用 Lambda 表达式来使用 subscribe() 方法,例如下面这段代码。

Flux.just("javaedge1", "javaedge2", "javaedge3").subscribe(data -> System.out.println("onNext:" + data), err -> {
        }, () -> System.out.println("onComplete"));

这段代码的执行效果如下所示,可以看到,我们分别对 onNext 通知和 onComplete 通知进行了处理。

onNext:javaedge1
onNext:javaedge2
onNext:javaedge3
onComplete

总结

本文介绍了如何创建 Flux 和 Mono 对象,以及如何订阅响应式流的系统方法。想要创建响应式流,可以利用 Reactor 框架所提供的各种工厂方法来达到静态创建的效果,同时也可以使用更加灵活的编程式方式来实现动态创建。而针对订阅过程,Reactor 框架也提供了一组面向不同场景的 subscribe 方法。

FAQ

在 Reactor 中,通过编程的方式动态创建 Flux 和 Mono 有哪些方法?

一旦我们创建了 Flux 和 Mono 对象,就可以使用操作符来操作这些对象从而实现复杂的数据流处理。下一讲,我们就要引入 Reactor 框架所提供的各种操作符来达成这一目标。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/737990.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习、深度学习、人工智能三者之间究竟是什么关系?

1 什么是人工智能 人工智能&#xff08;Artificial Intelligence&#xff09;&#xff1a;人工智能是一个广泛的概念&#xff0c;指的是使计算机系统具备像人类一样的智能和能力。人工智能涵盖了包括机器学习和深度学习在内的各种方法和技术&#xff0c;旨在让计算机能够感知、…

IPO观察丨黑芝麻智能递表港交所,车芯行业已迈向量产决战期?

虽然我国汽车芯片行业还处在发展初期&#xff0c;但已经迸发出无限潜力。 一方面&#xff0c;智能汽车的发展不断带动需求增长&#xff1b;另一方面&#xff0c;政策引导下&#xff0c;汽车芯片企业持续抢占高地。对此&#xff0c;在7月5日-7日举办的2023年中国汽车论坛上&…

【数据结构二叉树OJ系列】7、构建二叉树并中序遍历

目录 题述&#xff1a; 思路&#xff1a; 正确代码&#xff1a; 题述&#xff1a; 编写一程序&#xff0c;读入用户输出的一串先序遍历字符串&#xff0c;根据此字符串建立一个二叉树&#xff08;以指针方式存储&#xff09;。例如如下的先序遍历字符串&#xff1a;ABC##DE#…

C++通过回车结束循环输入

试想一个案例&#xff0c;假设需要你输入n行数字&#xff0c;而每一行输入的数字数量都未知&#xff08;不定&#xff09;&#xff0c;如何通过C来实现这一操作&#xff1f; 本贴笔者给出一个具体案例&#xff1a;首先规定输入的行数&#xff0c;而后在每一行输入不定量的数字&…

Scratch 足球打蛤蟆

Scratch 足球打蛤蟆 本程序转换为HTML后运行。“足球”角色和角色“麦克斯”跟随鼠标&#xff0c;点击鼠标时“足球”角色复制并向鼠标方向开始移动&#xff0c;碰到边缘反弹&#xff0c;移动一定步数后删除。“蛤蟆”角色每0.5秒在随机位置和方向生成&#xff0c;碰到足球角色…

精彩回顾 | 模型与数据驱动工业数智化——Modelica暨装备数字化研讨会(2023)圆满召开

2023年6月30日&#xff0c;以“模型与数据驱动工业数智化”为主题的Modelica暨装备数字化研讨会&#xff08;2023&#xff09;在江苏省苏州市隆重召开。会议由苏州同元软控信息技术有限公司&#xff08;简称“同元软控”&#xff09;与哈尔滨工业大学计算学部等单位联合主办&am…

信息文档管理与配置管理

目录 ​编辑 一、软件文档的分类 1.1 开发文档 1.2 产品文档 1.3 管理文档 二、文档质量等级划分 2.1 1级文档 2.2 内部文档(2级) 2.3 工作文档&#xff08;3级&#xff09; 2.4 正式文档&#xff08;4级&#xff09; 三、配置管理 3.1 配置管理的定义 3.2 配置管理的6个主要…

F5是什么意思?聊聊你所不知道的F5

5月底&#xff0c;有幸参加了F5 Forum 科技趋势峰会&#xff0c;这让我不仅关注数字化企业和应用服务的技术趋势&#xff0c;也对华丽转型后的F5有了更深入的了解。如果你对F5是什么意思尚且存在疑问&#xff0c;那这篇文章我们就聊聊我眼中的F5&#xff0c;这个应用及API交付和…

编程(44)----------MySQL索引存储

MySQL的索引存储使用的并非像其展示出的那样以类似表格的方式. 而是以B数的方式存储. 在此之前先了解一下作为前身的B树 首先明确, 无论如何数据库中的存储方式都是树型. 而B树其实就是多叉树. 其结构大概如图所示. 假设根节点中存储了key1为5 33 这两个数据. 那其子节点就以这…

CYCLO(GLY-L-PHE),10125-07-2,环(甘氨酰-L-苯丙氨酰),具有生物活性的光学异构体

资料编辑|陕西新研博美生物科技有限公司小编MISSwu​ cyclo(Gly-Phe)&#xff0c;CYCLO(-GLY-PHE)&#xff0c;CYCLO(GLY-L-PHE)&#xff0c;Cyclo(-Gly-L-Phe)≥ 95% (HPLC) 环(甘氨酰-L-苯丙氨酰)Product structure&#xff1a; Product specifications&#xff1a; 1.CAS N…

软件测试工程师必备的SQL语句基础

文末有惊喜 为一个软件测试工程师&#xff0c;我们在测试过程中往往需要对数据库数据进行操作&#xff0c;但是我们的操作大多以查询居多&#xff0c;有时会涉及到新增&#xff0c;修改&#xff0c;删除等操作&#xff0c;所以我们其实并不需要对数据库的操作有特别深入的了解&…

vscode使用Eslint+Prettier格式化代码

1、安装Eslint插件和Prettier插件 2、 安装eslint npm install eslint -g1&#xff09;、初始化项目 npm init -y 2&#xff09;、生成eslint配置文件 npx eslint --init 完之后生成一个.eslintrc.json的文件 二、vscode配置 1、vscode需要配置保存自动化格式 设置 ->…

QWebEngine应用---基于QWebChannel实现网页与qt层交互

Qt提供了QWebChannel实现和网页的通信&#xff0c;我们直接拿github上一个能直接运行的demo来做说明&#xff0c;demo是基于Widget&#xff0c;且页面是自己实现的页面&#xff0c;接着会介绍基于QML实现且页面是第三方网站如何使用的。 QWebChannel用法 我们先看看demo的运行…

RocketMQ5.0消息消费<二> _ 消息队列负载均衡机制

RocketMQ5.0消息消费&#xff1c;二&#xff1e; _ 消息队列负载均衡机制 一、消费队列负载均衡概览 RocketMQ默认一个主题下有4个消费队列&#xff0c;集群模式下同一消费组内要求每个消费队列在同一时刻只能被一个消费者消费。那么集群模式下多个消费者是如何负载主题的多个…

第四十四章Java访问对象的属性和行为以及销毁

Java访问对象的属性和行为 每个对象都有自己的属性和行为&#xff0c;这些属性和行为在类中体现为成员变量和成员方法&#xff0c;其中成员变量对应对象的属性&#xff0c;成员方法对应对象的行为。 在Java中&#xff0c;要引用对象的属性和行为&#xff0c;需要使用点…

【MySQL 】MySQL 创建数据库, MySQL 删除数据库,MySQL 选择数据库

作者简介&#xff1a; 辭七七&#xff0c;目前大一&#xff0c;正在学习C/C&#xff0c;Java&#xff0c;Python等 作者主页&#xff1a; 七七的个人主页 文章收录专栏&#xff1a; 七七的闲谈 欢迎大家点赞 &#x1f44d; 收藏 ⭐ 加关注哦&#xff01;&#x1f496;&#x1f…

iPhone苹果手机桌面上快速记录笔记的步骤

现在越来越多的人喜欢上记笔记&#xff0c;因为记笔记是一种提升效率和组织思维的重要方式。随着移动设备的普及&#xff0c;手机逐渐成为我们生活中不可或缺的工具之一。在手机上记笔记就成为一种很快捷的记录方式&#xff0c;可以让我们随时随地记录灵感和重要信息。在众多记…

计算机体系结构基础知识介绍之动态调度Tomasulo 算法(二)

Tomasulo方法是一种计算机硬件架构的算法&#xff0c;用于动态调度指令的执行&#xff0c;允许乱序执行以及更有效率的使用多个执行单元。它由IBM公司在1967年提出&#xff0c;首次应用是在IBM System/360 Model 91的浮点单元上。Tomasulo方法的主要创新包括在硬件中进行寄存器…

轻松学习阿里云原生内存数据库Tair

&#x1f4d6;轻松学习阿里云原生内存数据库Tair &#x1f680;前言☁️什么是Redis&#xff1f;☁️什么是云原生内存数据库&#xff1f;✨特点 &#x1f680;阿里云原生内存数据库Tair&#x1f47b;简介✨功能特性&#x1f5fa;️应用场景 ✍️上手案例&#x1f3af; 基于Red…

PCB笔记(PCB设计流程)

双层PCB设计流程&#xff08;以AD10为例&#xff09; 1. Preferences常规设置2. 画好原理图后3.编译工程&#xff0c;看是否有错4.然后执行更新到PCB5. 布线前常规规则设置6. 布局之后开始布线7.布线结束之后&#xff0c;开始铺铜8. 创建铜皮之前调丝印&#xff0c;将所有丝印调…