步入响应式编程篇(二)之Reactor API

news2025/1/26 14:48:06

步入响应式编程篇(二)之Reactor API

  • 前言
  • 回顾响应式编程
  • Reactor API的使用
    • Stream
    • 引入依赖
    • Reactor API的使用
      • 流源头的创建
    • reactor api的背压模式
    • 发布者与订阅者使用的线程
      • 查看弹珠图
      • 查看形成新流的日志

前言

对于响应式编程的基于概念,以及JDK自带的落地实现,可以查看步入响应式编程篇(一)
本篇将介绍Reactor API的使用:

reactor官网介绍,响应式编程是一种与数据流和变化传播相关的异步编程范式。这意味着可以通过所采用的编程语言轻松表达静态(例如数组)或动态(例如时间发射器)数据流;

对比Flow api以及completableFuture,前者编写代码比较麻烦,编写处理器还要自定义一个类,后者还不能满足响应式编程,两者都有其局限性,而Reactor API是基于Stream流操作的,无论是编写还是响应式编程都能满足;

回顾响应式编程

①在面向对象语言中,反应式编程范式通常作为 观察者设计模式的扩展。还可以比较主要的反应流模式与熟悉的迭代器设计模式,因为 Iterable- 所有这些库中的迭代器对。一个主要的区别是,虽然Iterator是基于拉的,但反应流是基于推的。

②使用迭代器是一种命令式编程模式,即使访问值的方法完全由Iterable负责。实际上,由开发人员来选择何时访问序列中的下一个()项。在反应式流中,上述对的等价物是发布者-订阅者。但它是 发布者在新的可用值到来时通知订阅者,这种推送方面是响应的关键。此外,应用于推送值的操作是以声明方式而不是命令方式表达的:程序员表达计算的逻辑,而不是描述其确切的控制流。

③除了推送值之外,还以定义良好的方式涵盖了错误处理和完成方面。发布者可以将新值推送到其订阅者(通过调用onNext),但也可以发出错误信号(通过调用onError)或完成信号(通过调用onComplete)。错误和完成都会终止序列。这可以归纳如下:

onNext x 0…N [onError | onComplete]

这种方法非常灵活。该模式支持没有值、只有一个值或有n个值的用例(包括无限序列的值,例如时钟的连续滴答声)。

Reactor API的使用

Stream

用的就是Java 8引用的Stream API,使用Stream api时也会结合lambda表达式和函数式接口,所以Stream api是包括它们的;
虽然是众所周知,但笔者在此还是提一嘴,Stream API的链式调用,是每个元素完成整个流所有步骤的处理后,再遍历下一个元素的。而不是所有元素完成后,再让流中的下一个步骤处理全部元素,这是一不小心就会陷入的理解误区;

还有诸如flatMap()、map()等参数为函数式接口的种类,总共有四类
//Predicate 有入参且返回值固定为boolean
//Consumer 有入参无返回值
//Function 有一个入参且一个返回值
//Supplier 无入参且有返回值

引入依赖

需引入Reactor API的依赖

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-bom</artifactId>
    <version>2023.0.0</version>
</dependency>

Reactor API的使用

Reactor API的使用是定义流源头,然后往下流,使用链式处理多个步骤,最后流向最后,就是异常或响应信号。

流源头的创建

创建流的方式有Flux和Mono两种,前者是用于创建多个元素的流,后者是创建只有一个元素的流;
在这里插入图片描述

这就分别创建了Flux流和Mono流,很轻易实现了一个全异步的系统。相比Flow流的一顿操作,是不是简便了很多;

而创建流,需指定流的源头,有四种方式
①just(),如上例,穷举出所有元素;
②rang();

//表示流有1到100个元素
Flux.range(1,100);

③generate();
这适用于同步和逐个发射,这意味着接收器是一个SynchronousSink,并且它的next()方法在每次回调调用中最多只能被调用一次。

//入参是泛型为SynchronousSink的Consumer接口
Flux.generate(sink->{
   //查库存
   sink.next(queryStock("1"));
   sink.complete();
}).subscribe(System.out::println);


//但同一次回调中循环调用next()会报错,所以先初始化state值为0
Flux.generate(()->0,(state,sink)->{
            if (state<=10) {
            	//小于10,则继续调用,里面的逻辑是每次将state+1
                sink.next(state);
            }else{
                sink.complete();
            }
            return state+1;
        }).subscribe(System.out::println);
       

④create方式,create是一种更高级的Flux程序化创建形式,它适合于每轮多个发射,甚至来自多个线程。create无需指定初始值而且可以多次执行sink#next():

Flux.create(sink->{
    for (int i=0;i<2;i++) {
        sink.next(queryStock("1"));
    }
    sink.complete();
}).subscribe(System.out::println);

在业务上通常使用后两种,例如可以从缓存或DB查到数据,然后类似于观察者模式,往后推送处理;更多使用create,因为generate需设置初始值,而且每次回调只能调用一次sink#next()

reactor api的背压模式

使用背压模式的原因——响应式编程的思想,主要在于推,推送到每个处理器操作,如果不管当前处理能力就很容易处理出问题,类似于MQ消费者控制从队列中获取数据量,要控制消费能力,于是引出背压模式,接收时通过request告诉上游,所以在Reactor中实现背压时,消费者压力传播回源的方式是向上游操作员发送请求。当前请求的总和有时被称为当前“需求”或“待决请求”。需求的上限为Long.MAX_VALUE,表示一个无限的请求(意思是“尽可能快地生产”-基本上禁用反压)。
如下,与Flow API类似,也是在自定义订阅者中指定背压请求request()

Flux.range(1, 10)
    .doOnRequest(r -> System.out.println("request of " + r))
    .subscribe(new BaseSubscriber<Integer>() {

      @Override
      public void hookOnSubscribe(Subscription subscription) {
        request(1);
      }

      @Override
      public void hookOnNext(Integer integer) {
        System.out.println("Cancelling after having received " + integer);
        cancel();
      }
    });

发布者与订阅者使用的线程

查看弹珠图

Flux.create(sink->{
    for (int i=0;i<2;i++) {
        sink.next(i*queryStock("1"));
    }
    //代表推送给下一个操作形成新流
    sink.complete();
    //过滤不等于0的数
}).filter(Predicate.not(Predicate.isEqual(0))).subscribe(System.out::println);

鼠标点到filter操作,就可以看到元素(这些不同形状的方块就是了)经过filter后,变少了,点到每个操作map、flatmap等都可以看到对应的弹珠图,可根据弹珠图快速理解形成新流的操作
在这里插入图片描述

查看形成新流的日志

在这里插入图片描述
在每个流后面使用log(),查看该流形成每个元素的日志,如都是request无限元素,然后create后,就调onNext(0),形成元素0,后面filter就过滤了0,就没有再调onNext(0),继续create元素1时,就调 onNext(1),形成元素1,后面filter后,形成的新流中仍然有元素1,所以还会调用onNext(1),以此类推。

默认情况下,发布者使用的线程就是订阅者的线程,那就证明一下:
如上用log()打印出,订阅者和发布者使用的线程都是main,那如果订阅者开启新的线程,下图也能看到发布者回调onNext操作也是使用订阅者的线程:

在这里插入图片描述

下图,使用publishOn()里指定新线程Schedulers.single(),代表过滤后,发布者使用新线程会回调onNext()
在这里插入图片描述
图中①是filter之前,发布者还是用的订阅者的线程回调onNext,②是filter之后,发布者使用新的线程回调onNext()。

总之,上面只是介绍reactor api操作的冰山一角,至于更多操作可以查看官网哈

如有需要收藏的看官,顺便也用发财的小手点点赞哈,如有错漏,也欢迎各位在评论区评论!

参考官网:https://projectreactor.io/docs/core/release/reference/aboutDoc.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2283491.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

利用Redis实现数据缓存

目录 1 为啥要缓存捏&#xff1f; 2 基本流程&#xff08;以查询商铺信息为例&#xff09; 3 实现数据库与缓存双写一致 3.1 内存淘汰 3.2 超时剔除&#xff08;半自动&#xff09; 3.3 主动更新&#xff08;手动&#xff09; 3.3.1 双写方案 3.3.2 读写穿透方案 3.3.…

【动态规划】--- 斐波那契数模型

Welcome to 9ilks Code World (๑•́ ₃ •̀๑) 个人主页: 9ilk (๑•́ ₃ •̀๑) 文章专栏&#xff1a; 算法Journey &#x1f3e0; 第N个泰波那契数模型 &#x1f4cc; 题目解析 第N个泰波那契数 题目要求的是泰波那契数&#xff0c;并非斐波那契数。 &…

php-phar打包避坑指南2025

有很多php脚本工具都是打包成phar形式&#xff0c;使用起来就很方便&#xff0c;那么如何自己做一个呢&#xff1f;也找了很多文档&#xff0c;也遇到很多坑&#xff0c;这里就来总结一下 phar安装 现在直接装yum php-cli包就有phar文件&#xff0c;很方便 可通过phar help查看…

java提取系统应用的日志中的sql获取表之间的关系

为了获取到对应的sql数据&#xff0c;分了三步骤 第一步&#xff0c;获取日志文件&#xff0c;解析日志文件中的查询sql&#xff0c;递归解析sql&#xff0c;获取表关系集合 递归解析sql&#xff0c;获取表与表之间的关系 输出得到的对应关联关系数据 第二步&#xff0c;根据获…

PyQt6医疗多模态大语言模型(MLLM)实用系统框架构建初探(下.代码部分)

医疗 MLLM 框架编程实现 本医疗 MLLM 框架结合 Python 与 PyQt6 构建,旨在实现多模态医疗数据融合分析并提供可视化界面。下面从数据预处理、模型构建与训练、可视化界面开发、模型 - 界面通信与部署这几个关键部分详细介绍编程实现。 6.1 数据预处理 在医疗 MLLM 框架中,多…

IMX6ull项目环境配置

文件解压缩&#xff1a; .tar.gz 格式解压为 tar -zxvf .tar.bz2 格式解压为 tar -jxvf 2.4版本后的U-boot.bin移植进SD卡后&#xff0c;通过串口启动配置开发板和虚拟机网络。 setenv ipaddr 192.168.2.230 setenv ethaddr 00:04:9f:…

Gradle buildSrc模块详解:集中管理构建逻辑的利器

文章目录 buildSrc模块二 buildSrc的使命三 如何使用buildSrc1. 创建目录结构2. 配置buildSrc的构建脚本3. 编写共享逻辑4. 在模块中引用 四 典型使用场景1. 统一依赖版本管理2. 自定义Gradle任务 3. 封装通用插件4. 扩展Gradle API 五 注意事项六 与复合构建&#xff08;Compo…

六、深入了解DI

依赖注入是⼀个过程&#xff0c;是指IoC容器在创建Bean时,去提供运⾏时所依赖的资源&#xff0c;⽽资源指的就是对象. 在上⾯程序案例中&#xff0c;我们使⽤了 Autowired 这个注解&#xff0c;完成了依赖注⼊的操作. 简单来说,就是把对象取出来放到某个类的属性中。 关于依赖注…

【论文阅读】HumanPlus: Humanoid Shadowing and Imitation from Humans

作者&#xff1a;Zipeng Fu、Qingqing Zhao、Qi Wu、Gordon Wetstein、Chelsea Finn 项目共同负责人&#xff0c;斯坦福大学 项目网址&#xff1a;https://humanoid-ai.github.io 摘要 制造外形与人类相似的机器人的一个关键理由是&#xff0c;我们可以利用大量的人类数据进行…

第25篇 基于ARM A9处理器用C语言实现中断<一>

Q&#xff1a;怎样理解基于ARM A9处理器用C语言实现中断的过程呢&#xff1f; A&#xff1a;同样以一段使用C语言实现中断的主程序为例介绍&#xff0c;和汇编语言实现中断一样这段代码也使用了定时器中断和按键中断。执行该主程序会在DE1-SoC的红色LED上显示流水灯&#xf…

Spring WebSocket 与 STOMP 协议结合实现私聊私信功能

目录 后端pom.xmlConfig配置类Controller类DTO 前端安装相关依赖websocketService.js接口javascripthtmlCSS 效果展示简单测试连接&#xff1a; 报错解决方法1、vue3 使用SockJS报错 ReferenceError: global is not defined 功能补充拓展1. 安全性和身份验证2. 异常处理3. 消息…

RabbitMQ5-死信队列

目录 死信的概念 死信的来源 死信实战 死信之TTl 死信之最大长度 死信之消息被拒 死信的概念 死信&#xff0c;顾名思义就是无法被消费的消息&#xff0c;一般来说&#xff0c;producer 将消息投递到 broker 或直接到queue 里了&#xff0c;consumer 从 queue 取出消息进…

[JavaScript] 面向对象编程

JavaScript 是一种多范式语言&#xff0c;既支持函数式编程&#xff0c;也支持面向对象编程。在 ES6 引入 class 语法后&#xff0c;面向对象编程在 JavaScript 中变得更加易于理解和使用。以下将详细讲解 JavaScript 中的类&#xff08;class&#xff09;、构造函数&#xff0…

Windows上通过Git Bash激活Anaconda

在Windows上配置完Anaconda后&#xff0c;普遍通过Anaconda Prompt激活虚拟环境并执行Python&#xff0c;如下图所示&#xff1a; 有时需要连续执行多个python脚本时&#xff0c;直接在Anaconda Prompt下可以通过在以下方式&#xff0c;即命令间通过&&连接&#xff0c;…

主机监控软件WGCLOUD使用指南 - 如何设置主题背景色

WGCLOUD运维监控系统&#xff0c;从v3.5.7版本开始支持设置不同的主题背景色&#xff0c;如下 更多主题查看说明 如何设置主题背景色 - WGCLOUD

C语言教程——文件处理(2)

目录 前言 一、顺序读写函数&#xff08;续&#xff09; 1.1fprintf 1.2fscanf 1.3fwrite 1.4fread 二、流和标准流 2.1流 2.2标准流 2.3示例 三、sscanf和sprintf 3.1sprintf 3.2sscanf 四、文件的随机读写 4.1fseek 4.2ftell 4.3rewind 五、文件读取结束的…

ios打包:uuid与udid

ios的uuid与udid混乱的网上信息 新人开发ios&#xff0c;发现uuid和udid在网上有很多帖子里是混淆的&#xff0c;比如百度下&#xff0c;就会说&#xff1a; 在iOS中使用UUID&#xff08;通用唯一识别码&#xff09;作为永久签名&#xff0c;通常是指生成一个唯一标识&#xf…

.NET9增强OpenAPI规范,不再内置swagger

ASP.NETCore in .NET 9.0 OpenAPI官方文档ASP.NET Core API 应用中的 OpenAPI 支持概述 | Microsoft Learnhttps://learn.microsoft.com/zh-cn/aspnet/core/fundamentals/openapi/overview?viewaspnetcore-9.0https://learn.microsoft.com/zh-cn/aspnet/core/fundamentals/ope…

hot100_234. 回文链表

给你一个单链表的头节点 head &#xff0c;请你判断该链表是否为回文链表。如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,2,1] 输出&#xff1a;true 示例 2&#xff1a; 输入&#xff1a;head …

【超详细】ELK实现日志采集(日志文件、springboot服务项目)进行实时日志采集上报

本文章介绍&#xff0c;Logstash进行自动采集服务器日志文件&#xff0c;并手把手教你如何在springboot项目中配置logstash进行日志自动上报与日志自定义格式输出给logstash。kibana如何进行配置索引模式&#xff0c;可以在kibana中看到采集到的日志 日志流程 logfile-> l…