响应式编程-Project Reactor Mono 介绍

news2025/1/4 5:50:29

响应式编程-Project Reactor Mono 介绍

本文以Mono的角度来介绍Reactor编程,Flux的使用同理。

初体验

Web应用 controller 方法在Spring webmvc 和 Spring webFlux下Controller方法实现示例如下:

Spring webmvc:

    @GetMapping("/test1")

    @ResponseBody

    public String test1(){

        String result =  geterateTest();

        return result;

    }

Spring webFlux

    @GetMapping("/test2")

    @ResponseBody

    public Mono<String> test2(){

        Mono<String> result = Mono.fromSupplier(this:: geterateTest);

        return result;

    }

一个的响应是String对象, 另一个是Mono<String>对象。Reactor Mono表示一个产生0-1元素的异步序列,异步指Mono创建的时候并不会执行任何操作,当Mono发生订阅时才触发Mono序列的运行。非阻塞表示test2方法不会产生任何阻塞,即使genereateTest里面是一个阻塞的操作,因为此时不会执行实际的逻辑,所以不会发生任何阻塞。

NettyHttpServer.onStateChange方法中构建Mono并进行订阅。

HttpServerOperations ops = (HttpServerOperations)connection;

//Web Flux将按照Spring Web中的约定构建一个Publisher(执行过滤器、Controller方//法)

Publisher<Void> publisher = (Publisher)this.handler.apply(ops, ops);

Mono<Void> mono = Mono.deferContextual((ctx) -> {

      ops.currentContext = Context.of(ctx);

      return Mono.fromDirect(publisher);

});

……

//subscribe将触发前面Spring web中封装在Mono构建过程中的业务逻辑的真正执行。

//如果我们按照命令是编程去编写代码,业务逻辑在构建Mono的过程中就执行了。

mono.subscribe(ops.disposeSubscriber());

注: Spring web flux框架下也可以按照传统的命令式编程。

Mono的构建

Reactor编程可以分为 异步序列Mono/Flux的构建和和使用两部分。

Mono的基本构建

Mono类 提供了大量静态方法帮助构建Mono。

  • just(T):返回T类型对象的Mono序列
  • fromFuture(future):Mono序列的元素对象由future产生,订阅时Future产生T并推送至订阅者。其他from方法类似。
  • empty():返回一个订阅时直接完成的异步序列
  • error():返回一个订阅时直接推送错误信号的序列

其他方法详见Mono类API:

如:Mono<String> mono = Mono.just("TEST");

Mono装配

假设我们按照上面示例,将整个程序都以响应式编程的模式进行开发,方法都返回一个异步序列Mono/Flux。当调用者调用某一个方法时,面对返回的Mono/Flux对象有两种选择:1. 订阅(触发执行), 2.装配(Assembly):继续将获取到的异步序列封装到一个新的异步序列中,继续返回给外部调用者。如:Spring Web Flux 则是将Spring web 定义的包括WebFilter、Controller等逻辑组装成一个复合的Mono,最终进行订阅。

图1 Mono装配示例

OptimizableOperator 接口

       OptimizableOperator <IN, OUT>接口提供了指向下一个OptimizableOperator的指针,并且提供了从IN型订阅者获取OUT订阅者的方法,提供了一个Mono串行的组装方法。

图2 OptimizableOperator接口串行组装示意图

要实现一个串行化的Mono组装类通常实现抽象类InternalMonoOperator<I, O>,构造函数传入一个Mono<I>,得到一个新的O型序列。实现subscribeOrReturn方法将O型订阅转化为原I型订阅者,新的I型订阅者实现了基于O性订阅者之上的强化操作。Mono提供了大量InternalMonoOperator<I,O>的实现类。下面对MonoFilter进行分析,解释了如果创建基于InternalMonoOperator实现的装配类和使用方法。

MonoFilter

将原Mono上增加一个过滤Predicate函数,当原Mono产生元素时,只有Predicate测试通过的元素才会传递给最终的订阅者,测试失败将进行过滤,Mono元素直接完成。

final class MonoFilter<T> extends InternalMonoOperator<T, T> {

         final Predicate<? super T> predicate;

         //构造函数必须包含源Mono,和其他附加增加元素,这里是一个Predicate函数

         MonoFilter(Mono<? extends T> source, Predicate<? super T> predicate) {

                  super(source);

                  this.predicate = Objects.requireNonNull(predicate, "predicate");

         }

         /**

         * 实现subscribeOrReturn,接收新Mono类型的订阅者,返回原Mono类型的订阅者。

         * 新的订阅者实现订阅时装配的目的,这里只有通过Predicate函数测试的元素,才会

         * 调用actual.onNext(T)方法推送给最终的订阅者

         **/

         @Override

         @SuppressWarnings("unchecked")

         public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {

                  if (actual instanceof ConditionalSubscriber) {

                          return new FluxFilter.FilterConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, predicate);

                  }

                  return new FluxFilter.FilterSubscriber<>(actual, predicate);

         }

    ......

}

Mono内置了大量的InternalMonoOperator实现类,如MonoFilter,但Reactor框架并不对外暴露这些类,(这些实现类都是包内可见的),而是通过Mono方法的形式去方便获取各个可实现类的对象,并且统一以Mono类型的对外暴露。抽象统一的Mono使用范式比起暴露各种各样的实现细节显得简洁清晰。

我们可以使用Mono内置的InternalMonoOperator实现类,也可以实现自己的InternalMonoOperator类,但应和Reactor框架保持统一的用法, 在Mono的使用上统一以Mono类型和协议进行操作,不对外暴露具体的实现细节。

Mono 提供的装配方法

       Reactor框架并不暴露具体的装配类细节,而是提供了大量静态或实例方法来对Mono进行装配,返回装配后的新Mono。如上节所述的MonoFilter使用方法如下:

Mono.just(2).filter( (v -> v % 2 != 0)).subscribe(i -> System.out.println(i),

                error -> System.err.println("Error: " + error),

                ()-> System.out.println("complete"));

Mono filter方法返回了一个可以对原序列元素进行检测的增强Mono,上述例子因Mono.just(2) 中的元素值2 无法通过(v -> v % 2 != 0)的测试,将被过滤掉,无法传给最终的订阅者,而只能接受到原序列的结束信号, 因此只会打印“complete“。

Filter方法显示实际是返回的MonoFilter对象。

public final Mono<T> filter(final Predicate<? super T> tester) {

         ……

         return onAssembly(new MonoFilter<>(this, tester));

}

其他Mono装配方法:

  • Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2)

:将一个T1类型元素的Mono和一个T2类型元素的Mono中的元素组合成一个Tuple2<T1,T2>元素的Mono. Mono还提供了zip的多种版本,满足各种情况的Mono组合模式。

  • public final Mono<T> timeout(Duration timeout): 当原序列产生一个T类型元素后,如果没有在指定的时间内完成,则将触发一个错误。如果在限期内完成则没有任何影响,该实现使用了MonoTimeout<T, U, V> extends InternalMonoOperator<T, T>。
  • doOnXXXX系列方法,如doOnCancel,  doOnNext, doOnError等, 返回在特定事件上加入行为的增强Mono。

更多Mono的装配方法详见Mono API。

Mono的使用

Mono的使用其实只有一种就是对Mono进行订阅, 但是Mono类也提供了其他传统的接口来进行Mono的使用。

Mono的订阅

订阅Mono很简单,调用Mono对象的subscribe方法,传入一个CoreSubscriber的实现对象即可。

Mono.subscribe.源码中展示了对Mono装配后的复合Mono进行订阅的处理逻辑。

public final void subscribe(Subscriber<? super T> actual) {

    //获取最后一个装配的Mono corePublisher

         CorePublisher publisher = Operators.onLastAssembly(this);

         CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);

         ......

             //如果最后一个装配的publisher 实现了OptmizableOperator接口,一路组装

             //增强的Subscriber,按照循序后去下一个OptmizableOperator

                  if (publisher instanceof OptimizableOperator) {

                          OptimizableOperator operator = (OptimizableOperator) publisher;

                          while (true) {

                                   subscriber = operator.subscribeOrReturn(subscriber);

                                   if (subscriber == null) {

                                            return;

                                   }

                                   OptimizableOperator newSource = operator.nextOptimizableSource();

                                   if (newSource == null) {

                                            publisher = operator.source();

                                            break;

                                   }

                                   operator = newSource;

                          }

                  }

             //直到最底层的CorePublisher,使用最终转换所得的subscriber进行订阅,

             //原始序列产生的序号,将在一些列增强subscriber的增强下,或丢弃、或加工后传给

             //实际的订阅者

                  publisher.subscribe(subscriber);

}

Mono的简化使用

       Mono 提供了一些方法简化Mono的订阅操作,如block() 阻塞当前线程知道Mono序列返回元素或完成/异常信号

PublishOn和SubscribeOn

       publishOn 和 SubscribeOn 传入Scheduler对象,将Mono的行为交由Scheduler的现成执行。其中publishOn调用之后的序列行为在新的执行线程执行,而SubscribeOn则是整个序列的执行都在新的现成中执行。

final Flux<String> flux = Flux

    .range(1, 2)

    .map(i -> 10 + i) 

    .publishOn(s) 

.map(i -> "value " + i);

flux.subscribe(System.out::println)

final Flux<String> flux = Flux

    .range(1, 2)

    .map(i -> 10 + i) 

    .subscribeOn(s) 

    .map(i -> "value " + i);

flux.subscribe(System.out::println)

总结

       本文对Reactor的Mono编程进行了初步的介绍,体现了响应式编程的核心在于异步序列的构建(Mono/Flux)和订阅使用。 其中构建时对Mono/Flux的装配(Assembly)是整个编程模型的核心。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1181143.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【单链表】无头单项不循环(2)

目录 Test.c主函数 test5 test6 test7 test8 test9 Test.c总代码 SList.h头文件&函数声明 头文件 函数声明 SList.h总代码 SList.c函数实现 查询SLFind pos前面插入 pos后面插入 pos后面删除 pos删除 空间释放 SList.c总代码 今天链表。 Test.c主函…

力扣最热一百题——盛水最多的容器

终于又来了。我的算法记录的文章已经很久没有更新了。为什么呢&#xff1f; 这段时间都在更新有关python的文章&#xff0c;有对python感兴趣的朋友可以在主页找到。 但是这也并不是主要的原因 在10月5号我发布了我的第一篇博客&#xff0c;大家也可以看见我的每一篇算法博客…

『MySQL快速上手』-④-表的操作

文章目录 1.创建表2.查看表结构3.修改表4.删除表 1.创建表 语法格式如下&#xff1a; CREATE TABLE table_name ( field1 datatype, field2 datatype, field3 datatype ) character set 字符集 collate 校验规则 engine 存储引擎;说明&#xff1a; field 表示列名&#xff1…

javascript模块化之ESM

[[toc]] ESM是什么 个人理解是: EcmaScript Modules常说的 es modules常说的 es模块常说的 前端模块化demo1: 浏览器基本使用 <!-- 【1】 浏览器基本使用script 标签设置 type = module,浏览器就会以 ES modules 的标准去执行 JavaScript 代码。默认情况下,代码是以严格…

简化磁盘分区管理的 6 个分区管理器软件!

在计算机上存储和管理数据的方式对机器的性能起着至关重要的作用。对计算机硬盘进行分区是管理文件和确保系统高效运行的绝对必要步骤。 对硬盘进行分区涉及将其分成可用于存储数据的部分&#xff0c;使其更有条理和安全。但是&#xff0c;对硬盘进行分区可能是一个繁琐而复杂…

Redis中的Zset类型

目录 Zset的相关命令 zadd zrange zcard zcount zrevrange zrangebyscore zpopmax bzpopmax zpopmin和bzpopmin zrank zrevrank zscore zrem zremrangebyrank zremrangebyscore 操作集合间的命令 zinterstore和zunionstore 内部编码 Zset的应用场景 Zset表…

华为交换机镜像端口配置

目录 一、进入交换机&#xff08;进入web页面&#xff09; 1.配置vlan 二、配置镜像流量 1.配置观察口 2.配置镜像端口 3.结果展示 4.关闭接口后的效果 三、镜像流量的删除 1.删除镜像流量接口 2.删除镜像监听口 一、进入交换机&#xff08;进入web页面&#xff09; …

全网最牛,Python接口自动化测试实战干货-项目接口案例,看这篇足够...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、单元测试主要做…

pixhawk接深度计

环境 pixhawk 2.4.8 深度计MS5837 ardusub 4.1.1 mission planner 1.3.80 QGroundCpntrol 4.2.8 物理连接 pixhawk的IIC接口接个IIC扩展板&#xff0c;可扩展出4个接口&#xff0c;然后深度计的接头直接插入任意一个IIC扩展口即可 过程 在mission planner上往pixahwk中安装…

“舞”动金鸡 | 安全狗连续5年零失误守护金鸡奖颁奖典礼安全

11月4日&#xff0c;第36届中国电影金鸡奖颁奖典礼暨2023年中国金鸡百花电影节闭幕式在厦门圆满落幕。 作为国内云原生安全领导厂商&#xff0c;安全狗再一次收到客户委托&#xff0c;为其金鸡期间的相关宣传窗口、网页和系统的网络安全全程护航。 厦门服云信息科技有限公司&am…

threejs (二) 相机

正交相机 const camera new THREE.OrthographicCamera(-aspect,aspect,aspect,-aspect,0.1, //进平面1000 //远平面); // 透视相机创建相机辅助线 const cameraHelper new THREE.CameraHelper(this.camera);创建一个透视相机观察正交相机 // 创建透视相机const watchCamera …

什么是客户体验 (CX)?如何改善客户体验?

客户体验&#xff08;CX&#xff09;就是客户在与某个公司、产品或服务互动时所感受到的整体体验。这包括客户的感觉、情感和意见&#xff0c;无论是购物、与客服互动、使用产品还是与品牌互动。CX就像客户在与一家公司或产品互动时的"情感指南"&#xff0c;可以是积…

使用c++解压rar文件,基于UnRAR64,非命令行

最近项目需要解压缩rar文件&#xff0c;我们都知道rar是闭源收费软件&#xff0c;如果直接采用命令行可能会有限制&#xff0c;或者盗版问题&#xff0c;使用正版的winrar命令行解压rar文件是否有限制&#xff0c;这个我没来得及测试&#xff0c;但是从交互体验上来说&#xff…

深兰科技科研团队6篇论文被国际医学信息科学顶尖学术会议收录

近日&#xff0c;深兰科技科学院智能科学首席科学家黄智生教授及其所带领的科研团队与同济大学团队&#xff0c;北京工业大学团队等合作&#xff0c;在国际医学信息科学顶尖学术会议“HIS 2023”上接连发表了六篇论文(其中有两篇论文的第一作者是黄教授本人)。 10月下旬&#x…

太坑了,降低 代码可读性的 12 个技巧

工作六七年以来&#xff0c;接手过无数个烂摊子&#xff0c;屎山雕花、开关编程已经成为常态。 下面细数一下 降低代码可读性&#xff0c;增加维护难度的 12 个编码“技巧”。 假设一个叫”二狗“ 的程序员&#xff0c;喜欢做以下事情。 1. 二狗积极拆分微服务&#xff0c;一个…

软文推广如何提高点击率,媒介盒子告诉你

随着软文推广的广泛应用&#xff0c;对于广告主来说&#xff0c;提高点击率已经成为一项关键任务&#xff0c;点击率直接影响广告主的投资回报率&#xff0c;今天媒介盒子将从两大方面给出软文推广如何提高点击率的建议&#xff1a; 一、 优化广告文案 广告文案是影响点击率的…

[动态规划] (十二) 简单多状态 LeetCode 213.打家劫舍II

[动态规划] (十二) 简单多状态: LeetCode 213.打家劫舍II 文章目录 [动态规划] (十二) 简单多状态: LeetCode 213.打家劫舍II题目解析解题思路状态表示状态转移方程初始化和填表顺序返回值提醒 代码实现总结 213. 打家劫舍 II 题目解析 本题是对打家劫舍和按摩师的升级题型&am…