响应式流的核心机制——背压机制

news2024/11/25 20:42:29

一、响应式流是什么?

Reactive Streams 是 2013 年底由 Netflix、Lightbend 和 Pivotal(Spring 背后的公司)的工程师发起的一项计划,响应式流旨在为无阻塞异步流处理提供一个标准。它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃。

响应式流模型存在两种基本的实现机制。一种就是传统开发模式下的“拉”模式,即消费者主动从生产者拉取元素;而另一种就是“推”模式,在这种模式下,生产者将元素推送给消费者。相较于“拉”模式,“推”模式下的数据处理的资源利用率更好,下图所示的就是一种典型的推模式处理流程。

在这里插入图片描述
上图中,数据流的生产者会持续地生成数据并推送给消费者。这里就引出了流量控制问题,即如果数据的生产者和消费者处理数据的速度是不一致的,我们应该如何确保系统的稳定性呢?

二、流量控制

2.1 生产者生产数据的速率小于消费者的场景

这种场景对于消费者来说没啥压力,正常消费就好了,这里也就不需要所谓的流量控制了。

2.2 生产者生产数据的速率大于消费者的场景

生产者生产数据的速率大于消费者的场景,应该是我们业务中经常遇到的场景了,这种场景由于消费者处理不过来导致崩溃,业界通常的做法是在生产者与消费者之间加一个队列做缓冲。我们知道队列具有存储与转发的功能,所以可以用它来进行一定的流量控制。

在这里插入图片描述
如何对于流量进行很好的控制?这就转变到了如何设计好一个队列了,目前 Java 业界主流的队列有以下三种:

2.2.1 无界队列

见名知意,无界队列在原则上是拥有无线大小容量的队列,可以存放生产者产生的所有消息。

在这里插入图片描述

  • 优势:确保消费者消费到所有的数据
  • 劣势:系统的回弹性降低,任何一个系统不可能拥有无限的资源,一旦内存等资源耗尽,系统就可能会有崩溃的风险。

2.2.2 有界丢弃队列

为了避免上面无界队列的弊端,有界丢弃队列采用的是如果队列满了,就会采用丢弃后面传入的值,这里可以设置一些丢弃策略,比如说按照优先级或先进先出等。

在这里插入图片描述

  • 优势:考虑到资源的限制,适合允许丢消息的业务场景。
  • 劣势:消息重要性很高的场景不建议采取这种队列

2.2.3 有界阻塞队列

像一些支付金融级别的场景,是不允许丢数据的,所以我们引出有界阻塞队列,我们会在队列消息数量达到上限后阻塞生产者,而不是直接丢弃消息。

在这里插入图片描述

  • 优势:解决了不允许丢数据的业务场景
  • 劣势:当队列满了的时候,会阻塞生产者停止生产数据,这种场景不可能实现异步操作的。

所以,无论从回弹性、弹性还是即时响应性出发,上述的队列都不是响应式流的上佳解决办法。

三、背压机制

上面说的那几种队列纯“推”模式下的数据流量会有很多不可控制的因素,并不能直接应用,而是需要在“推”模式和“拉”模式之间考虑一定的平衡性,从而优雅地实现流量控制。这就需要引出响应式系统中非常重要的一个概念——背压机制(Backpressure)。

什么是背压?简单来说就是下游能够向上游反馈流量请求的机制。通过前面的分析,我们知道如果消费者消费数据的速度赶不上生产者生产数据的速度时,它就会持续消耗系统的资源,直到这些资源被消耗殆尽。

这个时候,就需要有一种机制使得消费者可以根据自身当前的处理能力通知生产者来调整生产数据的速度,这种机制就是背压。采用背压机制,消费者会根据自身的处理能力来请求数据,而生产者也会根据消费者的能力来生产数据,从而在两者之间达成一种动态的平衡,确保系统的即时响应性。

四、响应式流规范

有了背压机制,我们再来看下响应式流是如何基于这种机制去设计的一套规范,规范详情请参考:Reactive Streams

Java API 的响应式流只定义了四个核心接口:

  • Publisher<T>
  • Subscriber<T>
  • Subscription
  • Processor<T,R>

4.1 Publisher<T>

Publisher 代表的就是一种可以生产无限数据的发布者,接口如下:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

可以看到,Publisher 里的 subscribe 方法传入的是 Subscriber 接口,其实这里用的是回调,Publisher 根据收到的请求向当前订阅者 Subscriber 发送元素。

4.2 Subscriber<T>

Subscriber 代表的是一种可以从发布者那里订阅并接收元素的订阅者,接口如下:

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

Subscriber 接口定义的这组方法构成了数据流请求和处理的基本流程,其中,onSubscribe() 从命名上看就是一个回调方法,当发布者的 subscribe() 方法被调用时就会触发这个回调。而在该方法中有一个参数 Subscription,可以把这个 Subscription 看作是一种用于订阅的上下文对象。Subscription 对象中包含了这次回调中订阅者想要向发布者请求的数据个数。

当订阅关系已经建立,那么发布者就可以调用订阅者的 onNext() 方法向订阅者发送一个数据。这个过程是持续不断的,直到所发送的数据已经达到 Subscription 对象中所请求的数据个数。这时候 onComplete() 方法就会被触发,代表这个数据流已经全部发送结束。而一旦在这个过程中出现了异常,那么就会触发 onError() 方法,我们可以通过这个方法捕获到具体的异常信息进行处理,而数据流也就自动终止了。

4.3 Subscription

Subscription 代表的就是一种订阅上下文对象,它在订阅者和发布者之间进行传输,从而在两者之间形成一种契约关系,接口如下:

public interface Subscription {
    public void request(long n);
    public void cancel();
}

这里的 request() 方法用于请求 n 个元素,订阅者可以通过不断调用该方法来向发布者请求数据;而 cancel() 方法显然是用来取消这次订阅。请注意,Subscription 对象是确保生产者和消费者针对数据处理速度达成一种动态平衡的基础,也是流量控制中实现背压机制的关键所在

在这里插入图片描述

4.4 Processor<T,R>

Processor 代表的就是订阅者和发布者的处理阶段,Processor 接口继承了 Publisher 和 Subscriber 接口。 它用于转换发布者——订阅者管道中的元素。 Processor<T,R> 订阅类型 T 的数据元素,接收并转换为类型 R 的数据,并发布变换后的数据。接口如下:

public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

下图显示了处理者在发布者——订阅和管道中作为转换器的作用,可以拥有多个处理者。

在这里插入图片描述

五、总结

  • 响应式流规范定义的很简洁,但实现起来并不简单,发布者和订阅者之间的所有交互的异步性质以及背压机制使得实现变得复杂。
  • 响应式流规范非常灵活,还可以提供独立的“推”模型和“拉”模型。如果为了实现纯“推”模型,我们可以考虑一次请求足够多的元素;而对于纯“拉”模型,相当于就是在每次调用 Subscriber 的 onNext() 方法时只请求一个新元素。
  • JDK 9 中提供了 Flow 响应式流接口,与响应式流兼容的接口,可以看得出,JDK 团队后续的发展趋势也是想往响应式流这块靠近。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/167530.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【BP靶场portswigger-客户端14】点击劫持-5个实验(全)

前言&#xff1a; 介绍&#xff1a; 博主&#xff1a;网络安全领域狂热爱好者&#xff08;承诺在CSDN永久无偿分享文章&#xff09;。 殊荣&#xff1a;CSDN网络安全领域优质创作者&#xff0c;2022年双十一业务安全保卫战-某厂第一名&#xff0c;某厂特邀数字业务安全研究员&…

Fastdfs分布式文件系统原理浅析

文章目录1、fastdfs文件系统原理简述2、storage server状态2.1 组内新增加一台storage server A时&#xff0c;由系统自动完成已有数据同步&#xff0c;处理逻辑如下&#xff1a;第一步&#xff1a;第二步&#xff1a;第三步&#xff1a;第四步&#xff1a;3、同步时间管理4、B…

[有人@你]请查收你的年终总结报告

嗨&#xff0c;兄dei&#xff0c;我是建模助手。 新年伊始&#xff0c;最近大家想必已经被各大平台的2022年度报告刷屏了。 听歌软件伴你度过的失眠夜&#xff0c;外卖软件拯救你的饥饿时刻&#xff0c;还有某俩宝账单告诉你&#xff0c;其实你是有钱的&#xff0c;只是你看不到…

基于有向图的邻接矩阵计算其割点、割边、压缩图,并用networkx可视化绘制

基于有向图的邻接矩阵计算其割点、割边、压缩图&#xff0c;并用networkx可视化绘制为什么基于邻接矩阵计算图的割点、割边、压缩图实现python代码代码运行效果结论&#xff1a;为什么基于邻接矩阵计算图的割点、割边、压缩图 由于矩阵计算过程&#xff0c;被广泛优化&#xf…

Linux关于 gdb 调试器的使用

坚持看完&#xff0c;结尾有思维导图总结 这里写目录标题debug 和 release 版本gdb 常见命令断点逐行调试和观察变量总结debug 和 release 版本 首先要说的是 &#xff0c;在 Linux 中 gcc 直接编译是不能进行调试的 而是要在加上 -g 选项才能得到可调试的文件 以下程序用一个…

算法第十二期——BFS-双向广搜

双向广搜 应用场景&#xff1a;有确定的起点s和终点t&#xff1b;把从起点到终点的单向搜索&#xff0c;变换为分别从起点出发和从终点出发的“相遇”问题。操作&#xff1a;从起点s(正向搜索&#xff09;和终点t(逆向搜索&#xff09;同时开始搜索&#xff0c;当两个搜索产生…

Spring入门-Spring事务管理

文章目录1&#xff0c;Spring事务管理1.1 Spring事务简介1.1.1 相关概念介绍1.1.2 转账案例-需求分析1.1.3 转账案例-环境搭建步骤1:准备数据库表步骤2:创建项目导入jar包步骤3:根据表创建模型类步骤4:创建Dao接口步骤5:创建Service接口和实现类步骤6:添加jdbc.properties文件步…

数据治理与档案信息资源体系建设

如果要评选大数据或者数字化转型领域中哪个词最让人费解、最讲不清楚&#xff0c;“数据治理&#xff08;Data Governance&#xff09;”绝对是候选之一。说实话&#xff0c;笔者到现在也没有完全整明白&#xff0c;因为数据治理包含的范围太广了&#xff0c;可以说是包罗万象&…

高潜人才的自我要求

前言&#xff0c;上次写了个《潜力出众的你有这样的特质吗&#xff1f;》&#xff0c;地址如下&#xff1a;点我查看&#xff0c;这次在写个高潜人才的自我要求。本次以6个纬度来进行分析&#xff1b;3是基本要求&#xff0c;4是追求卓越&#xff0c;看你目前做到了哪个级别&am…

跨平台API对接(Python)的使用

Jenkins 是一个开源的、提供友好操作界面的持续集成(CI)工具&#xff0c;起源于 Hudson&#xff08;Hudson 是商用的&#xff09;&#xff0c;主要用于持续、自动的构建/测试软件项目、监控外部任务的运行。后端可以利用 Jenkins 对任务进行调度运行&#xff1a;后端可利用 HTT…

【进阶】Spring更简单的读取和存储对象

努力经营当下&#xff0c;直至未来明朗&#xff01; 文章目录一、存储Bean对象一&#xff09;前置工作&#xff1a;配置扫描路径&#xff08;重要&#xff09;二&#xff09;添加注解存储Bean对象3. 五大类注解&#xff1a;4. 方法注解&#xff1a;6. 相关问题7. 补充【结论、查…

ROS2机器人编程简述humble-第二章-DEVELOPING THE FIRST NODE .2

0.1ROS2机器人编程简述新书推荐-A Concise Introduction to Robot Programming with ROS21.1ROS2机器人编程简述humble-第一章-Introduction2.1ROS2机器人编程简述humble-第二章-First Steps with ROS2 .12.2主要内容是全手工创建一个最简单的自定义节点&#xff0c;其实没啥具…

IB学生必看的时间表(二)

上期谈到在IB预科课程的第一个学年下学期&#xff0c;便要开始作报读大学的准备&#xff0c;到底为什么&#xff1f; 暑假不容松懈 现在来到放暑假了。虽说不用上课&#xff0c;学生没有了学习压力&#xff0c;但就以下三方面来看&#xff0c;学生还是要继续投放心力。 首先&am…

Unity 之 Addressable可寻址系统 -- 代码加载介绍 -- 进阶(一)

Unity 之 可寻址系统 -- 代码加载介绍 -- 进阶&#xff08;一&#xff09;一&#xff0c;可寻址系统代码加载1.1 回调形式1.2 异步等待1.3 面板赋值1.4 同步加载二&#xff0c;可寻址系统分标签加载2.1 场景搭建2.2 代码示例2.3 效果展示三&#xff0c;代码加载可寻址的解释概述…

Cadence OrCAD: 跨页符和电源符号命名优先级的一个小问题

Cadence OrCAD: 跨页符和电源符号命名优先级的一个小问题 遇到的问题 最近项目中&#xff0c;有个电源需要做负载端的反馈&#xff0c;类似下图的signal1和signal1N&#xff0c;反馈使用类似伪差分线&#xff0c;把电压信号和负载端的GND都连到DC-DC控制器。DC-DC对应的反馈引…

字节跳动青训营--前端day1

文章目录前言一、 前端1 前端的技术栈2. 前端的边界3. 前端的关注点二、 HTML1. HTML常用标签及语义化2. HTML 语法3. 谁在使用我们写的HTML前言 仅以此文章记录学习历程 一、 前端 解决GUI人机交互问题 1 前端的技术栈 2. 前端的边界 nodejs–服务器端应用 electron… --客…

【数据结构】6.1 图的基本概念和术语

文章目录前言6.1 图的定义和术语前言 图是一种比线性表和树更为复杂的数据结构。 在线性结构中&#xff0c;结点之间的关系属于一个对一个&#xff1b;数据元素之间有着线性关系&#xff0c;每个数据元素只有一个直接前趋和一个直接后继&#xff0c; 在树形结构中&#xff0c;…

算法设计与分析课程

算法的由来 算法的定义 算法的定义&#xff1a;给定计算问题&#xff0c;算法是一系列良定义的计算步骤&#xff0c;逐一执行计算步骤可得到预期的输出。 良定义&#xff1a;定义明确无歧义 计算步骤&#xff1a;计算机可以实现的指令 有了良定义的计算步骤&#xff0c;计算机就…

Java基础篇01-运算符的使用

01| Java中的数据类型 ) 1. 数值型&#xff1a; 序号类型空间占用说明最小值最大值默认值优缺点对比举例1byte8位有符号整数-128(-2^7)127 (2^7-1)0byte 类型用在大型数组中节约空间&#xff0c;主要代替整数&#xff0c;因为 byte 变量占用的空间只有 int 类型的四分之一by…

6、Denoising Diffusion Probabilistic Models(扩散模型)

简介 主页&#xff1a;https://hojonathanho.github.io/diffusion/ 扩散模型 &#xff08;diffusion models&#xff09;是深度生成模型中新的SOTA。 扩散模型在图片生成任务中超越了原SOTA&#xff1a;GAN&#xff0c;并且在诸多应用领域都有出色的表现&#xff0c;如计算机…