flink反压

news2025/1/9 2:24:46

flink反压(backpressure),简单来说就是当接收方的接收速率低于发送方的发送速率,这时如果不做处理就会导致接收方的数据积压越来越多直到内存溢出,所以此时需要一个机制来根据接收方的状态反过来限制发送方的发送速率,来达到一个两者速率匹配的状态。

TCP-based 反压

在flink1.5之前的版本,使用的是TCP-based 的反压机制,不过这种机制缺点明显。

(1)在一个 TaskManager 中可能要执行多个 Task,如果多个 Task 的数据最终都要传输到下游的同一个 TaskManager 就会复用同一个 Socket 进行传输,这个时候如果单个 Task 产生反压,就会导致复用的 Socket 阻塞,其余的 Task 也无法使用传输,checkpoint barrier 也无法发出导致下游执行 checkpoint 的延迟增大。(2)依赖最底层的 TCP 去做流控,会导致反压传播路径太长,导致生效的延迟比较大。

所以这里不再介绍TCP-based 的反压机制。

在flink1.5以后引入了Credit-based 反压,可以理解为就是在 Flink 层面实现类似 TCP 流控的反压机制来解决上述的弊端,Credit 可以类比为 TCP 的 Window 机制。

Credit-based 反压

在 Flink 层面实现反压机制,通过 ResultPartition 和 InputGate 传输 feedback 。Credit-base 的 feedback 步骤:(1)每一次 ResultPartition 向 InputGate 发送数据的时候,都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息。(backlog 的作用是为了让消费端感知到我们生产端的情况)(2)如果下游有充足的 Buffer ,就会返还给上游 Credit (表示剩余 buffer 数量),告知发送消息(图上两个虚线是还是采用 Netty 和 Socket 进行通信)。下面用图来解释:

如图所示在 Flink 层面实现反压机制,就是每一次 ResultSubPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息(图上两个 ResultSubPartition 和 InputChannel 之间是虚线是因为最终还是要通过 Netty 和 Socket 去通信),下面我们看一个具体示例。

假设我们上下游的速度不匹配,上游发送速率为 2,下游接收速率为 1,可以看到图上在 ResultSubPartition 中累积了两条消息,10 和 11, backlog 就为 2,这时就会将发送的数据 <8,9> 和 backlog = 2 一同发送给下游。下游收到了之后就会去计算是否有 2 个 Buffer 去接收,可以看到 InputChannel 中已经不足了这时就会从 Local BufferPool 和 Network BufferPool 申请,好在这个时候 Buffer 还是可以申请到的。

过了一段时间后由于上游的发送速率要大于下游的接受速率,下游的 TaskManager 的 Buffer 已经到达了申请上限,这时候下游就会向上游返回 Credit = 0,ResultSubPartition 接收到之后就不会向 Netty 去传输数据,上游 TaskManager 的 Buffer 也很快耗尽,达到反压的效果,这样在 ResultSubPartition 层就能感知到反压,不用通过 Socket 和 Netty 一层层地向上反馈,降低了反压生效的延迟。同时也不会将 Socket 去阻塞,解决了由于一个 Task 反压导致 TaskManager 和 TaskManager 之间的 Socket 阻塞的问题。

总结:

1. 网络流控是为了在上下游速度不匹配的情况下,防止下游出现过载。

2. 网络流控有静态限速和动态反压两种手段

3. Flink 1.5 之前是基于 TCP 流控 + bounded buffer 实现反压

4. Flink 1.5 之后实现了自己托管的 credit – based 流控机制,在应用层模拟 TCP 的流控机制

是否有了动态反压,静态限速就没用了?不是的。实际上动态反压不是万能的,我们流计算的结果最终是要输出到一个外部的存储(Storage),外部数据存储到 Sink 端的反压是不一定会触发的,这要取决于外部存储的实现,像 Kafka 这样是实现了限流限速的消息中间件可以通过协议将反压反馈给 Sink 端,但是像 ES 无法将反压进行传播反馈给 Sink 端,这种情况下为了防止外部存储在大的数据量下被打爆,我们就可以通过静态限速的方式在 Source 端去做限流。所以说动态反压并不能完全替代静态限速的,需要根据合适的场景去选择处理方案。

反压影响

反压并不会直接影响作业的可用性,它表明作业处于亚健康的状态,有潜在的性能瓶颈并可能导致更大的数据处理延迟。通常来说,对于一些对延迟要求不高或者数据量较少的应用,反压的影响可能并不明显。然而对于规模比较大的 Flink 作业,反压可能会导致严重的问题。

反压会影响checkpoint
(1)checkpoint时长:checkpoint barrier跟随普通数据流动,如果数据处理被阻塞,使得checkpoint barrier流经整个数据管道的时长变长,导致checkpoint 总体时间变长。
(2)state大小:为保证Exactly-Once准确一次,对于有两个以上输入管道的 Operator,checkpoint barrier需要对齐,即接受到较快的输入管道的barrier后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的barrier也到达。这些被缓存的数据会被放到state 里面,导致checkpoint变大。
checkpoint是保证准确一次的关键,checkpoint时间变长有可能导致checkpoint超时失败,而state大小可能拖慢checkpoint甚至导致OOM。

反压监控

1. flink web ui自带反压监控

该页面提供了 SubTask 级别的反压监控,1.13 版本以前是通过周期性对 Task 线程的栈信息采样,得到线程被阻塞在请求 Buffer(意味着被下游队列阻塞)的频率来判断该节点是否处于反压状态。Flink 1.13 优化了反压检测的逻辑(使用基于任务 Mailbox 计时,而不在再于堆栈采 样),并且重新实现了作业图的 UI 展示:Flink 现在在 UI 上通过颜色和数值来展示繁忙和反压的程度。黑色表示反压严重,红色表示非常繁忙,蓝色表示比较空闲。Backpressure Status值有OK/LOW/HIGH。

OK: 0% <= back pressured <= 10%
LOW: 10% < back pressured <= 50%
HIGH: 50% < back pressured <= 100%

假如通过web ui 查看到某个算子处于反压状态,可以分析该算子瓶颈:

如果处于反压状态,那么有两种可能性:
(1)该节点的发送速率跟不上它的产生数据速率。这一般会发生在一条输入多条输出的 Operator(比如 flatmap)。这种情况,该节点是反压的根源节点,它是从 Source Task 到 Sink Task 的第一个出现反压的节点。
(2)下游的节点接受速率较慢,通过反压机制限制了该节点的发送速率。这种情况, 需要继续排查下游节点,一直找到第一个为 OK 的一般就是根源节点。 总体来看,如果我们找到第一个出现反压的节点,反压根源要么是就这个节点,要么是 它紧接着的下游节点。 通常来讲,第二种情况更常见。如果无法确定,还需要结合 Metrics 进一步判断。

2. 利用 Metrics 定位

监控反压时会用到的 Metrics 主要和 Channel 接受端的 Buffer 使用率有关,最为有用的是以下几个 Metrics:

inPoolUsage = floatingBuffersUsage + exclusiveBuffersUsage。

(1)采用 Metrics 分析反压的思路

如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它 被下游反压限速了;如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游。

(2)将inPoolUsage分为floatingBuffersUsage + exclusiveBuffersUsage进一步分析

Flink 1.9及以上版本,还可以根据 floatingBuffersUsage/exclusiveBuffersUsage 以 及其上游 Task 的 outPoolUsage 来进行进一步的分析一个 Subtask 和其上游 Subtask 的数据传输。
在流量较大时,Channel 的 Exclusive Buffer 可能会被写满,此时 Flink 会向 Buffer Pool 申请剩余的 Floating Buffer。这些 Floating Buffer 属于备用 Buffer。

解析:
1)floatingBuffersUsage 为高则表明反压正在传导至上游。
2)exclusiveBuffersUsage 则表明了反压可能存在倾斜。如果floatingBuffersUsage 高、exclusiveBuffersUsage 低,则存在倾斜。因为少数 channel 占用了大部分的 floating Buffer(channel 有自己的 exclusive buffer,当 exclusive buffer 消耗完,就会使用floating Buffer)。

如何分析反压

(1)数据倾斜
通过 Web UI 各个 SubTask 的 Records Sent 和 Record Received 来确认,另外 Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。解决方式把数据分组的 key 进行本地/预聚合来消除/减少数据倾斜。
(2)用户代码的执行效率
对 TaskManager 进行 CPU profile,分析 TaskThread 是否跑满一个 CPU 核:如果没有跑满,需要分析 CPU 主要花费在哪些函数里面,比如生产环境中偶尔会卡在 Regex 的用户函数(ReDoS);如果没有跑满,需要看 Task Thread 阻塞在哪里,可能是用户函数本身有些同步的调用,可能是 checkpoint 或者 GC 等系统活动。
(3)TaskManager 的内存以及 GC
TaskManager JVM 各区内存不合理导致的频繁 Full GC 甚至失联。可以加上 -XX:+PrintGCDetails 来打印 GC 日志的方式来观察 GC 的问题。推荐TaskManager 启用 G1 垃圾回收器来优化 GC。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1460145.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

10、内网安全-横向移动域控提权NetLogonADCSPACKDC永恒之蓝

用途&#xff1a;个人学习笔记&#xff0c;有所借鉴&#xff0c;欢迎指正&#xff01; 背景&#xff1a; 主要针对内网主机中的 域控提权漏洞&#xff0c;包含漏洞探针和漏洞复现利用。 1、横向移动-系统漏洞-CVE-2017-0146&#xff08;ms17-010&#xff0c;永恒之蓝&#xff0…

ts环境下如何解决第三方npm包类型报错的问题

在 TypeScript 环境下&#xff0c;当引入第三方 npm 包时出现类型报错的问题&#xff0c;可以尝试以下几种解决方法&#xff1a; 1、安装 types 包&#xff1a;许多流行的第三方 npm 包都有对应的 TypeScript 类型声明文件&#xff0c;这些声明文件通常以 types/包名 的形式发…

odoo16-API(Controller)带有验证访问的接口

odoo16-API&#xff08;Controller&#xff09;带有验证访问的接口 目前我使用odoo原生的登录token来验证登陆的有效性 废话不多说直接上代码 # 测试获取session_id import requests class GetOdooData(http.Controller):def getOdooToken(self):# http://localhost:8123访问…

蓝桥杯:C++素数、进制转换

素数 定义&#xff1a; 质数又称素数。一个大于1的自然数&#xff0c;除了1和它自身外&#xff0c;不能被其他自然数整除的数叫做质数&#xff1b;否则称为合数&#xff08;规定1既不是质数也不是合数&#xff09;。 代码&#xff1a; #include<bits/stdc.h> using n…

【flutter】环境安装

安装flutter sdk 下载sdk flutter sdk就包含dart&#xff0c;所以我们只用安装flutter sdk就可以了。 我们去清华大学开源软件镜像站下载&#xff0c;flutter开发中&#xff0c;版本对不上基本项目就跑步起来&#xff0c;如果是团队协同开发的话&#xff0c;建议统一下载指定版…

ubuntu22.04@Jetson OpenCV安装

ubuntu22.04Jetson OpenCV安装 1. 源由2. 分析3. 证实3.1 jtop安装3.2 jtop指令3.3 GPU支持情况 4. 安装OpenCV4.1 修改内容4.2 Python2环境【不需要】4.3 ubuntu22.04环境4.4 国内/本地环境问题4.5 cudnn版本问题 5. 总结6. 参考资料 1. 源由 昨天用Jetson跑demo程序发现帧率…

算法——排序算法

目录 1、冒泡排序 2、插入排序 3、选择排序 4、归并排序 5、快速排序 6、堆排序 7、计数排序 8、桶排序 9、基数排序 常见的排序算法包括&#xff1a; 冒泡排序&#xff08;Bubble Sort&#xff09;插入排序&#xff08;Insertion Sort&#xff09;选择排序&#xff08;Se…

Element UI 组件的安装及使用

Element UI 组件的安装及使用 Element UI 是一套基于 Vue.js 的桌面端 UI 组件库&#xff0c;提供了丰富的、高质量的 UI 组件&#xff0c;可以帮助开发者快速构建用户界面。 1、安装 Element UI 使用 npm 安装 npm install element-ui -S2、使用 CDN 安装 在 HTML 页面中引…

祖龙娱乐 x Incredibuild

关于祖龙娱乐 祖龙娱乐有限公司&#xff08;下文简称“祖龙娱乐”&#xff09;是一家总部位于北京的移动游戏开发公司&#xff0c;成立于 2014 年&#xff0c;拥有成功的大型多人在线角色扮演游戏移动游戏组合&#xff0c;如《六龙争霸》、《梦幻诛仙》和《万王之王 3D》。公司…

安宝特AR汽车行业解决方案系列1-远程培训

在汽车行业中&#xff0c;AR技术的应用正悄然改变着整个产业链的运作方式&#xff0c;应用涵盖培训、汽修、汽车售后、PDI交付、质检以及汽车装配等&#xff0c;AR技术为多个环节都带来了前所未有的便利与效率提升。 安宝特AR将以系列推文的形式为读者逐一介绍在汽车行业中安宝…

Kotlin基本语法 4 类

1.定义类 package classStudyclass Player {var name:String "jack"get() field.capitalize()set(value) {field value.trim()} }fun main() {val player Player()println(player.name)player.name " asdas "println(player.name)} 2.计算属性与防范…

网络爬虫基础(上)

1. 爬虫的基本原理 爬虫就是在网页上爬行的蜘蛛&#xff0c;每爬到一个节点就能够访问该网页的信息&#xff0c;所以又称为网络蜘蛛&#xff1b; 网络爬虫就是自动化从网页上获取信息、提取信息和保存信息的过程&#xff1b; 2. URL的组成部分 URL全称为Uniform Resource L…

Docker Compose映射卷的作用是什么,dockerfile这个文件有什么区别和联系?

Docker Compose中映射卷&#xff08;Volumes&#xff09;的作用和Dockerfile之间既有区别也有联系。下面详细解释两者的作用、区别和联系&#xff1a; Docker Compose映射卷的作用 在Docker Compose中&#xff0c;卷&#xff08;Volumes&#xff09;用于数据持久化和数据共享…

ajax 如何从服务器上获取数据?

在Web开发中&#xff0c;AJAX&#xff08;Asynchronous JavaScript and XML&#xff09;是一种常用的技术&#xff0c;用于在不重新加载整个页面的情况下&#xff0c;从服务器获取数据并更新网页的某一部分。使用AJAX&#xff0c;你可以创建异步请求&#xff0c;从而提供更快的…

大话设计模式——2.简单工厂模式(Simple Factory Pattern)

定义&#xff1a;又称静态工厂方法&#xff0c;可以根据参数的不同返回不同类的实例&#xff0c;专门定义一个类&#xff08;工厂类&#xff09;来负责创建其他类的实例可通过类名直接调用&#xff0c;被创建的实例通常具有共同的父类。 UML图&#xff1a; 例子&#xff1a; 计…

数据仓库选型建议

1 数仓分层 1.1 数仓分层的意义 **数据复用&#xff0c;减少重复开发&#xff1a;**规范数据分层&#xff0c;开发一些通用的中间层数据&#xff0c;能够减少极大的重复计算。数据的逐层加工原则&#xff0c;下层包含了上层数据加工所需要的全量数据&#xff0c;这样的加工方…

SQL Developer 小贴士:显示RAC配置

前提&#xff1a; 已建立2节点RAC已在SQL Developer中建立了2个连接&#xff0c;分别到RAC的两个节点 然后单击菜单View>DBA&#xff0c;分别连接RAC节点1和节点2&#xff0c;并组织成目录&#xff08;不必须&#xff0c;但建议&#xff09;。 在两处可以体现为RAC配置。第…

python基础 | 模块与异常

1、模块 Python 模块(Module)&#xff0c;是一个 Python 文件&#xff0c;以 .py 结尾&#xff0c;包含了 Python 对象定义和Python语句 模块让你能够有逻辑地组织你的 Python 代码段&#xff0c;不可能把代码写在一起 把相关的代码分配到一个模块里能让你的代码更好用&#…

Backtrader 量化回测实践(1)—— 架构理解和MACD/KDJ混合指标

Backtrader 量化回测实践&#xff08;1&#xff09;—— 架构理解和MACD/KDJ混合指标 按Backtrader的架构组织&#xff0c;整理了一个代码&#xff0c;包括了Backtrader所有的功能点&#xff0c;原来总是使用SMA最简单的指标&#xff0c;现在稍微增加了复杂性&#xff0c;用MA…

读写锁学习笔记

1、数据结构 读锁是共享模式&#xff0c;写锁是独占模式&#xff0c;两个锁也公用一个AQS 两者共用一个state来表示&#xff0c;state前16位表示读锁&#xff0c;后16位表示写锁 读锁操作 通过向右位移16位&#xff0c;然后进行操作 写锁操作 通过和0000 0000 0000 0000 111…