Reactor响应式流的核心机制——背压机制

news2025/1/12 13:35:43

响应式流是什么?

       响应式流旨在为无阻塞异步流处理提供一个标准。它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃。 

       响应式流模型存在两种基本的实现机制。一种就是传统开发模式下的“拉”模式,即消费者主动从生产者拉取元素;而另一种就是“推”模式,在这种模式下,生产者将元素推送给消费者。相较于“拉”模式,“推”模式下的数据处理的资源利用率更好,下图所示的就是一种典型的推模式处理流程。

       数据流的生产者会持续地生成数据并推送给消费者。这里就引出了流量控制问题,即如果数据的生产者和消费者处理数据的速度是不一致的,我们应该如何确保系统的稳定性呢?

流量控制

生产者生产数据的速率小于消费者的场景

     这种场景对于消费者来说没啥压力,正常消费就好了,这里也就不需要所谓的流量控制了。

生产者生产数据的速率大于消费者的场景

     生产者生产数据的速率大于消费者的场景,应该是我们业务中经常遇到的场景了,这种场景由于消费者处理不过来导致崩溃,业界通常的做法是在生产者与消费者之间加一个队列做缓冲。我们知道队列具有存储与转发的功能,所以可以用它来进行一定的流量控制。

如何对于流量进行很好的控制?这就转变到了如何设计好一个队列了,目前 Java 业界主流的队列有以下三种:

无界队列 :

界队列在原则上是拥有无线大小容量的队列,可以存放生产者产生的所有消息。

  • 优势:确保消费者消费到所有的数据

  • 劣势:系统的回弹性降低,任何一个系统不可能拥有无限的资源,一旦内存等资源耗尽,系统就可能会有崩溃的风险。

有界丢弃队列 :

       为了避免上面无界队列的弊端,有界丢弃队列采用的是如果队列满了,就会采用丢弃后面传入的值,这里可以设置一些丢弃策略,比如说按照优先级或先进先出等。

  • 优势:考虑到资源的限制,适合允许丢消息的业务场景。

  • 劣势:消息重要性很高的场景不建议采取这种队列

有界阻塞队列: 

        像一些支付金融级别的场景,是不允许丢数据的,所以我们引出有界阻塞队列,我们会在队列消息数量达到上限后阻塞生产者,而不是直接丢弃消息。

  • 优势:解决了不允许丢数据的业务场景

  • 劣势:当队列满了的时候,会阻塞生产者停止生产数据,这种场景不可能实现异步操作的。

所以,无论从回弹性、弹性还是即时响应性出发,上述的队列都不是响应式流的上佳解决办法。

背压机制 

       上面说的那几种队列纯“推”模式下的数据流量会有很多不可控制的因素,并不能直接应用,而是需要在“推”模式和“拉”模式之间考虑一定的平衡性,从而优雅地实现流量控制。这就需要引出响应式系统中非常重要的一个概念——背压机制(Backpressure)。 

       什么是背压?简单来说就是下游能够向上游反馈流量请求的机制。通过前面的分析,我们知道如果消费者消费数据的速度赶不上生产者生产数据的速度时,它就会持续消耗系统的资源,直到这些资源被消耗殆尽。

      这个时候,就需要有一种机制使得消费者可以根据自身当前的处理能力通知生产者来调整生产数据的速度,这种机制就是背压。采用背压机制,消费者会根据自身的处理能力来请求数据,而生产者也会根据消费者的能力来生产数据,从而在两者之间达成一种动态的平衡,确保系统的即时响应性。

响应式流规范

   有了背压机制,我们再来看下响应式流是如何基于这种机制去设计的一套规范,规范详情请参考:Reactive Streams

Java API 的响应式流只定义了四个核心接口:

  • Publisher<T>

  • Subscriber<T>

  • Subscription

  • Processor<T,R>

publisher<`T`>

Publisher 代表的就是一种可以生产无限数据的发布者,接口如下:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

 可以看到,Publisher 里的 subscribe 方法传入的是 Subscriber 接口,其实这里用的是回调,Publisher 根据收到的请求向当前订阅者 Subscriber 发送元素。

Subscriber<`T`>

Subscriber 代表的是一种可以从发布者那里订阅并接收元素的订阅者,接口如下:

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

        Subscriber 接口定义的这组方法构成了数据流请求和处理的基本流程,其中,onSubscribe() 从命名上看就是一个回调方法,当发布者的 subscribe() 方法被调用时就会触发这个回调。而在该方法中有一个参数 Subscription,可以把这个 Subscription 看作是一种用于订阅的上下文对象。Subscription 对象中包含了这次回调中订阅者想要向发布者请求的数据个数。

       当订阅关系已经建立,那么发布者就可以调用订阅者的 onNext() 方法向订阅者发送一个数据。这个过程是持续不断的,直到所发送的数据已经达到 Subscription 对象中所请求的数据个数。这时候 onComplete() 方法就会被触发,代表这个数据流已经全部发送结束。而一旦在这个过程中出现了异常,那么就会触发 onError() 方法,我们可以通过这个方法捕获到具体的异常信息进行处理,而数据流也就自动终止了。 

Subscription

Subscription 代表的就是一种订阅上下文对象,它在订阅者和发布者之间进行传输,从而在两者之间形成一种契约关系,接口如下:

public interface Subscription {
    public void request(long n);
    public void cancel();
}

这里的 request() 方法用于请求 n 个元素,订阅者可以通过不断调用该方法来向发布者请求数据;而 cancel() 方法显然是用来取消这次订阅。请注意,Subscription 对象是确保生产者和消费者针对数据处理速度达成一种动态平衡的基础,也是流量控制中实现背压机制的关键所在

 Processor<`T,R`> 

Processor 代表的就是订阅者和发布者的处理阶段,Processor 接口继承了 Publisher 和 Subscriber 接口。它用于转换发布者——订阅者管道中的元素。Processor订阅类型 T 的数据元素,接收并转换为类型 R 的数据,并发布变换后的数据。接口如下:

public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}

图显示了处理者在发布者——订阅和管道中作为转换器的作用,可以拥有多个处理者。

总结 

  • 响应式流规范定义的很简洁,但实现起来并不简单,发布者和订阅者之间的所有交互的异步性质以及背压机制使得实现变得复杂。

  • 响应式流规范非常灵活,还可以提供独立的“推”模型和“拉”模型。如果为了实现纯“推”模型,我们可以考虑一次请求足够多的元素;而对于纯“拉”模型,相当于就是在每次调用 Subscriber 的 onNext() 方法时只请求一个新元素。

  • JDK 9 中提供了 Flow 响应式流接口,与响应式流兼容的接口,可以看得出,JDK 团队后续的发展趋势也是想往响应式流这块靠近。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/402877.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【OpenAI 多模态预训练】VideoGPT?微软透露GPT-4或将在下周发布

【多模态预训练】VideoGPT&#xff1f;微软透露GPT-4或将在下周发布 先让我猜个名字&#xff0c;VideoGPT&#xff1f; 太绝了&#xff01;看完ChatGPT之后就感觉OpenAI正在做多模态的预训练语言模型。万万没想到来的这么快。据介绍&#xff0c;GPT-4或将为多模态大模型&#…

redis经典五种数据类型及底层实现

目录一、Redis源代码的核心部分1.redis源码在哪里2.src源码包下面该如何看&#xff1f;二、我们平时说redis是字典数据库KV键值对到底是什么1.6大类型说明(粗分)2.6大类型说明3.上帝视角4.Redis定义了redisObject结构体4.1 C语言struct结构体语法简介4.2 字典、KV是什么4.3 red…

jsp毕业答辩管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 JSP 毕业答辩管理系统是一套完善的java web信息管理系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为Mysql5.0&…

Spring Security基础入门

基础概念 什么是认证 认证&#xff1a;用户认证就是判断一个用户的身份身份合法的过程&#xff0c;用户去访问系统资源的时候系统要求验证用户的身份信息&#xff0c;身份合法方可继续访问&#xff0c;不合法则拒绝访问。常见的用户身份认证方式有&#xff1a;用户密码登录&am…

基于java的网络选课商城项目部署

前言&#xff1a;相信看到这篇文章的小伙伴都或多或少有一些编程基础&#xff0c;懂得一些linux的基本命令了吧&#xff0c;本篇文章将带领大家服务器如何部署一个使用django框架开发的一个网站进行云服务器端的部署。 文章使用到的的工具 Python&#xff1a;一种编程语言&…

SAP 生产订单收货入库Goods Receipt

Goods Receipt: 收货这块比较简单&#xff0c;当我们做完报工之后&#xff0c;成品就可以入库了。 那么收货完了&#xff0c;到底会有什么样的影响呢&#xff1f; 会产生物料凭证以及会计凭证&#xff0c;但是若订单中勾选”GR非股价的“&#xff0c;则不会有价值。 这里我们需…

E1. Unforgivable Curse (easy version) #855 div3

ps&#xff1a;很久没有更新啦&#xff0c;之前一直在复习准备期末考试&#xff0c;也没怎么写题。现在考完要恢复训练啦 Problem - E1 - Codeforces 题意&#xff1a; 两个字符串s和t&#xff0c;在s中任意两个间隔为k或者k1的字母可以进行任意次的交换&#xff0c;问你可不…

STL源码剖析(1) - 空间配置器与内存操作详解

文章首发于&#xff1a;My Blog 欢迎大佬们前来逛逛1. SGI空间配置器SGI STL的空间配置器是 alloc而非allocator&#xff0c;并且不接受任何参数&#xff1a;vector<int,std::alloc> vec我们通常使用缺省的空间配置器&#xff1a;template <typename T,typename Alloc…

mac 安装python、pip、weditor

问题现象&#xff1a;执行 python3 -m weditor 报错 ➜ ~ python3 -m weditor dyld[42143]: dyld cache (null) not loaded: syscall to map cache into shared region failed dyld[42143]: Library not loaded: /System/Library/Frameworks/CoreFoundation.framework/Versio…

【前端vue2面试题】2023前端最新版vue2模块,高频24问

​ &#x1f973;博 主&#xff1a;初映CY的前说(前端领域) &#x1f31e;个人信条&#xff1a;想要变成得到&#xff0c;中间还有做到&#xff01; &#x1f918;本文核心&#xff1a;博主收集的关于vue2面试题 目录 vue2面试题 1、$route 和 $router的区别 2、一个.v…

Redis高频面试题汇总(上)

目录 1.什么是Redis? 2.为什么Redis这么快 3.分布式缓存常见的技术选型方案有哪些&#xff1f; 4.你知道 Redis 和 Memcached 的区别吗&#xff1f; 5.Redis使用场景有哪些 6.Redis 常用的数据结构有哪些&#xff1f; 7.Redis 数据类型有哪些底层数据结构&#xff1f; …

sonarqube指标详解

最近公司引入了sonar&#xff0c;作为代码质量检测工具&#xff0c;以期提高研发同学的代码质量&#xff0c;但是结果出来后&#xff0c;有些同学不清楚相应的指标内容&#xff0c;不知道应该重点关注哪些指标&#xff0c;于是查询了一下相关的资料&#xff0c;加以总结同时也分…

【数据结构】堆排序

堆是一种叫做完全二叉树的数据结构&#xff0c;可以分为大根堆&#xff0c;小根堆&#xff0c;而堆排序就是基于这种结构而产生的一种程序算法。大堆&#xff1a;每个节点的值都大于或者等于他的左右孩子节点的值小堆&#xff1a;每个结点的值都小于或等于其左孩子和右孩子结点…

扬帆优配|业务量大突破,这个行业发展明显向好

近期上市的新股&#xff0c;大都在招股阐明书里公布了本年第一季度成绩预告。 我国快递事务量本年已达200亿件 国家邮政局监测数据显现&#xff0c;到3月8日&#xff0c;本年我国快递事务量已到达200.9亿件&#xff0c;比2019年到达200亿件提前了72天&#xff0c;比2022年提前…

goland开发环境搭建及运行第一个go程序HelloWorld

1、下载和安装golang 点击进入下载页面 下载好安装包&#xff0c;点击安装。 我之前安装过低版本的安装包&#xff0c;所以这里提示要先卸载已经安装过的低版本的。 同意协议&#xff0c;继续安装。 默认安装的文件夹为C盘&#xff0c;建议更改&#xff0c;我这里更改为D盘…

YOLOv5训练大规模的遥感实例分割数据集 iSAID从切图到数据集制作及训练

最近想训练遥感实例分割&#xff0c;纵观博客发现较少相关 iSAID数据集的切分及数据集转换内容&#xff0c;思来想去应该在繁忙之中抽出时间写个详细的教程。 iSAID数据集下载 iSAID数据集链接 下载上述数据集。 百度网盘中的train和val中包含了实例和语义分割标签。 上述…

哪些职业适合创业?学习哪些技能可以自己创业?

创意行业&#xff1a;创意行业包括广告、设计、影视等领域&#xff0c;需要创新思维和创意能力&#xff0c;适合创业。学习创意思维、平面设计、影视制作等技能可以自己创业。 科技行业&#xff1a;科技行业包括互联网、人工智能、物联网等领域&#xff0c;需要技术能力和创新思…

基于JavaEE开发博客系统项目开发与设计(附源码)

文章目录1.项目介绍2.项目模块3.项目效果1.项目介绍 这是一个基于JavaEE开发的一个博客系统。实现了博客的基本功能&#xff0c;前台页面可以进行文章浏览&#xff0c;关键词搜索&#xff0c;登录注册&#xff1b;登陆后支持对文章进行感谢、评论&#xff1b;然后还可以对评论…

[网络工程师]-网络规划与设计-逻辑网络设计(二)

3、广域网技术选择 3.1广域网互连技术 3.1.1 数字数据网络 数字数据网络(Digital Data Network,DDN)是一种利用数字信道提供数据信号传输的数据传输网,是一个半永久性连接电路的公共数字数据传输网络,为用户提供了一个高质量、高带宽的数字传输通道。 利用DDN网络实现局…

【C++】7.string

1.标准库的string类 string是表示字符串的字符串类在使用string类时&#xff0c;必须包含#include头文件以及using namespace std;string类是使用char(即作为它的字符类型&#xff0c;使用它的默认char_traits和分配器类型(关于模板的更多信息&#xff0c;请参阅basic_string)…