Flink 第3章 反压策略

news2025/1/15 6:39:05

概述

Flink 中文网站的讲解

https://flink-learning.org.cn/article/detail/138316d1556f8f9d34e517d04d670626

涉及内容:

  • 网络流控的概念与背景

  • TCP的流控机制

  • Flink TCP-based 反压机制 1.5之前

  • Flink Credit-based 反压机制 1.5及以后

  • 总结与思考

网络流控的概念与背景

为什么需要网络流控

当我们 Producer 的速率是2MB/S,而 Consumer 的速率是 1MB/S,这个时候我们就会发现在网络通信的时候,我们的 Producer 速度是比 Consumer 要快的,有 1MB/S 的差距。那么对应到 Send Buffer 和 Receiver Buffer 来说,速率差就回导致 Receive Buffer 越积越多,最终支撑不住。

当 Receive buffer 是固定大小,那么就会造成 Consumer 丢弃数据。

当 Receive Buffer 是无限大小,那么 Receive Buffer 会持续扩张,最终导致 OOM。

网络流控的实现:静态限速

那么我就需要一个限速的功能来避免上述的后果:

我们要解决上下游速度差的问题,就需要在 Producer 端实现一个类似 Rate Limiter 的静态限流。将 Rroducer 端的速率慢慢降到 1MB/S 即可,那么需要解决的问题:

  • 事先无法预估 Consumer 到底能够承受多大的速率。

  • Consumer 的承受能力通常是变化波动的。

网络流控的实现:动态反馈 / 自动反压

针对静态限速的问题,我们演进到了动态反馈(自动反压)的机制,我们需要 Consumer 能够及时的给 Producer 做一个 feedback ,即告知 Producer 能够承受的速率是多少,动态反馈分为两种:

负反馈:接受速率不够的时候,告知 Producer 降低发送速率。

正反馈:Consumer 速率大于 Producer 的时候,告知 Producer 可以把发送速率提上来。

Storm 反压实现

在 Storm 的反压机制中,每一个 Bolt 都会有一个线程用来监测反压 Backpressure Thread,这个线程一旦检测到 Bolt 里的接收队列 recv queue 出现了严重的阻塞,就会把这个结果写入到 zookeeper 中,而 zookeeper 会一直被 Spout 监听,监听到有反压的情况就回停止发送,通过这样的方式匹配上下游的发送接收速率。

Spark Steaming 反压实现

Spark Streaming 里面也有类似的 FeedBack 机制,上图 Fecher 会实时的从 Buffer、Processing 这样的节点,收集指标,然后通过 Controller 把速度接收的情况再反馈到 Receiver,实现速率的匹配。

Flink before 1.5 为什么没有类似的方式实现feedback 机制?

Flink的网络传输架构:

可以看到 Flink 在做网络传输的时候,基本的数据流向。发送端在发送网络数据前要经历自己内部的一个流程,会有一个自己的 Network Buffer,在底层使用 Netty 去做通信,Netty 这一层又有属于自己的 ChannelOutBound Buffer。因为最终使用过 Socket 进行网络请求发送,那么在 Socket 也有自己的 Send Buffer。所以在发送端和接收端都是这样的三级 Buffer。

TCP 本身是自带流量控制的,所以 Flink before 1.5 就是通过 TCP 来实现 feedback 操作的。

TCP 流控机制

回顾一下简单 TCP 包的格式结构。首先会有 Sequence number这样一个机制给每个数据包做一个编号,还有 ACK number 用来确保 TCP 的数据传输是可靠的,还有 Window Size,接收端再回复消息的时候通过 Window Size 来告诉发送端还可以发送多少条数据。

TCP 流控: 滑动窗口

TCP 的流控制就是基于滑动窗口的机制,现在我们有一个 Socket 的发送端一个 Socket 接收端,如果发送端是接收端的3倍速率:

假定发送端的 window 大小为3,接收端的 window 大小固定为5(当然可能是不固定的)。

上游发送3个数据给下游,下游存储在 window 中。

下游消费1个 packet,那么下游窗口会往右滑动一格,而数据 2/3还在队列中,4/5/6空着,这个时候下游会发送 ACK=4 给上游(代表请求从4处进行数据的发送,同时将反馈 window 设置为3,因为还空着3个格子)。

如果这个时候下游继续消费完毕2,那么继续往右滑动1个格子,同时反馈 ACK=7(代表请求从7处发送数据,同时反馈上游请将 window 设为1,因为只剩一个格子)。这样达成限流的作用:

Flink TCP-based 反压机制 before v1.5

知道了 TCP 是通过滑动窗口进行数据的限流,以免数据丢失。那现在看看 Flink 的 TCP 反压示例:

WindowWordCount 反压

使用 Socket 接收数据后,每5秒进行一次 WordCount 操作。

编译为 JobGraph

在 Client 端,根据 StreamGraph 生成 JobGraph,进行一些适量的优化,比如没有 shuffle 的机制节点进行合并,然后进行提交。

调度ExecutionGraph

当JobGraph提交到集群后,会生成ExecutionGraph,这个时候具备执行任务的雏形,把每个任务拆解成不同的SubTask:

最后将ExecutionGraph物理化执行图,可以看到每个Task在接收数据的时候都会通过InputChannel来接收数据,ResultPartition来发送数据,在ResultPartition中去做分区与下游的Task数量保持一致,就形成了两者对应关系:

问题拆解:反压传播的两个阶段

反压的传播实际上是分为两个阶段的,对应着执行图。

一共涉及3个TaskManager,在每个TaskManager中都有相应的Task在执行,还有负责接收数据的InputGate,发送数据的ResultPartition。

假设在这个时候,下游Sink出现了问题,那么处理速度降速的信号是怎么传播回去的呢?

一般分为两种:

  1. 跨TaskManager

  1. TaskManager内部

跨TaskManager数据推送过程

前面我们知道了发送数据需要ResultPartition,在其内部会有跟去ResultSubPartition,中间还会有内存管理的Buffer。

对于一个TaskManager来说会有一个统一的Network BufferPool被所有的Task共享,在初始化时从Off-heap Memory中申请内存,申请到内存的后续内存管理就是同步Network BufferPool来进行的,不需要依赖JVM GC的机制去释放空间(堆外内存的直接内存 网络内存部分?)。有了NetworkBufferPool之后,可以为每一个ResultSubPartition创建Local BufferPool。

注意InputChannel的个数是上游有多少个ResultSubPartition给自己发数据而决定的。

如上图左边的TaskManager 的Record Writer写了 <1,2>数据进入,因为ResultSubPartition初始化的时候为空,没有Buffer用来接收,就回直接向Local BufferPool申请内存,这时 LocalBufferPool 也没有足够的内存,于是将请求传递给Network BufferPool,最终将申请到的Buffer按原链路返回给ResultSubPartition,这个时候ResultSubPartition写入<1,2>。

之后会将ResultSubPartition的Buffer拷贝到Netty的Buffer,继续拷贝到Socket的Buffer中,将数据发送出去。

而下游也是一样的申请规则进行资源申请,接收数据。

跨TaskManager反压过程

那么当上游速率大于下游的时候,整体反压是怎么样的呢?

因为下游的速度慢,会导致InputChannel的Buffer被用尽,于是他会向Local BufferPool申请新的Buffer。

这个时候可以看到Local BufferPool中的一个Buffer被标记为Used,当Local BufferPool中全部被标记为Used后,只能继续向Network BufferPool中进行申请,当然每个LocalBufferPool都有最大使用Buffer(防止一个Local BufferPool耗尽 NetworkBufferPool)。

当都申请达到最大值后,那么Local BufferPool可以使用的Buffer到达了上限,无法继续向Network BufferPool进行申请。那么意味着没办法去读取新的数据,这个时候Netty AutoRead就会被禁止,Netty也不会从Socket的Buffer中读取数据了。

显然,我们底层的Socket过不了多久也会将Buffer用尽,这时候TCP连接中就会将Window=0发送给客户端,上游Socket停止发送数据。

同样的,上游也会不断的堵塞,将能够申请的Buffer申请完毕,最终整个Operator停止写数据。

TaskManager内部反压过程

跨TaskManager的反压其实就是多个Buffer资源的申请,最终依赖Socket TCP之间的传输完成。

而内部反压的过程就简单很多:

下游的TaskManager反压导致本TaskManager的ResultSubpartition无法继续写入数据,类似于跨TaskManager,会一直申请Buffer,知道Network BufferPool用完,导致RecordWriter不能继续写入数据,Record Reader也不会读入数据。

因为Operator需要有输入才能有计算后的输出,输入输出都是同一个线程执行,那么上游的TaskManager不断发送数据导致接收Buffer也用完,直到TCP 发送window=0的信息给上游。

Flink Credit-based 反压机制(since V1.5)

TCP-based 反压弊端

上面我们介绍了Flink 基于TCP的反压机制,但这个机制存在很多的弊端:

  • 在一个TaskManager中可能要执行多个Task,如果多个Task的数据需要传送到下游的同一个TaskManager中,就会复用同一个Socket进行传输。这样会导致单个Task反压,复用的Socket阻塞,其余的Task也无法传输,Checkpoint barrier无法传输,执行checkpoint的延迟增大。

  • 最终还是使用的TCP 窗口滑动去做流控传输,导致反压的传播途径太长,生效延迟大。

Credit-based 反压

放弃使用TCP的滑动窗口去做流传输,我们需要自己定义一种Socket传输去实现TCP一样的流控。

这样Socket不可以复用得以解决,TCP滑动窗口也被替代:

Flink层面实现反压机制,就是每一次ResultSubPartition向InputChannel发送消息的时候都会发送一个backlog size 告诉下游我将准备发送多少消息,下游依据这个消息去计算有多少的Buffer可以去接受数据,如果有充足的空间,那么会给上游发送 credit 标示可以发送消息。

(最终还是通过 Netty 和 Socket 去通信)

假设上下游的速度不匹配(上游大于下游),可以看到在ResultSubPartition中积累了两条消息,backlog 就为2,这时就会将发送的数据<8,9> 和backlog=2 一起发送给下游,下游之后就回计算是否有2个buffer去接收数据。

如果这个时候InputChannel 的buffer不够用了还是一样的会走 local bufferpool 和 NetworkBufferPool。

如果这个时候我们的InputChannel无法进行Buffer的申请,这时候下游就会向上游返回一个Credit=0,ResultSubPartition接收到之后就不会向Netty传输数据。这样上游的ResultSubPartition也开始进行buffer数据的装填,直到反压。

这整个流程就减少了从TCP最底层和Socket层取阻塞,解决了一个由于Task反压导致TaskManager和TaskManager之间的Socket阻塞问题。

总结和思考

有了动态反压,静态限速是不是完全没有作用了?

实际上动态反压不是万能的,我们流计算的结果最终是要输出到一个外部的存储(Storage),外部数据存储到 Sink 端的反压是不一定会触发的,这要取决于外部存储的实现,像 Kafka 这样是实现了限流限速的消息中间件可以通过协议将反压反馈给 Sink 端,但是像 ES 无法将反压进行传播反馈给 Sink 端,这种情况下为了防止外部存储在大的数据量下被打爆,我们就可以通过静态限速的方式在 Source 端去做限流。所以说动态反压并不能完全替代静态限速的,需要根据合适的场景去选择处理方案。

本期学习

我们理解了 Flink 的背压机制,从 TCP 的反压,到 credit 机制。

整个流程很简单,但是对于真实来说远远没有那么简单,其实还是有很多内容需要考虑。

(反压比率是怎么计算的,各类 queue 大小怎么配置,barrier 遇到背压怎么办……)

结尾

Hadi Flink 中文社区学习之路。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/145498.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AtCoder Beginner Contest 283 E - Don‘t Isolate Elements

E - Dont Isolate Elements (atcoder.jp)题意&#xff1a;题意&#xff1a;定义孤独的数为&#xff0c;该数上下左右的数都和它相反给定一个01矩阵&#xff0c;每次操作可以把某一行的数取反&#xff0c;问你把该矩阵变成没有孤独的数的最少操作次数是多少思路&#xff1a;一开…

AI降噪的N种数据扩增方法

数据和特征决定了机器学习的上限&#xff0c;而模型和算法只是逼近这个上限而已 基于统计信号处理的传统噪声抑制方法是通过检测持续的背景声&#xff0c;来估计背景噪声&#xff0c;然后通过估计到的背景噪声计算增益因子对带噪语音进行抑制。但这种方式针对规律的稳态噪声比较…

【算法笔记】最近公共祖先(LCA)算法详解

0. 前言 最近公共祖先简称 LCA&#xff08;Lowest Common Ancestor&#xff09;。两个节点的最近公共祖先&#xff0c;就是这两个点的公共祖先里面&#xff0c;离根最远的那个。 这种算法应用很广泛&#xff0c;可以很容易解决树上最短路等问题。 为了方便&#xff0c;我们记…

企业内训方案|领导力与执行力/TTT内训师/管理者情商修炼

企业内训方案|领导力与执行力/TTT内训师/管理者情商修炼 》》领导力与执行力 从精兵到强将 高绩效团队协作与跨部门沟通 核心人才的管理与激励 卓越管理者的胜任力提升 MTP中层管理技能提升训练 打造高绩效团队 高效沟通技巧 高绩效团队管理&#xff08;中高层/中基层&#xf…

CRM帮助企业实现销售自动化

随着互联网技术的发展&#xff0c;各家企业都善用互联网优势发布各种信息&#xff0c;导致潜在客户被各种推销信息所淹没&#xff0c;销售周期延长&#xff0c;企业可以借助CRM有效规范销售流程&#xff0c;帮助企业实现销售自动化。 前言 各行各业的业务流程中似乎都少不了销…

OSPF综合实验(1.5)

目标&#xff1a; 1、首先进行基于172.16.0.0/16的ip地址规划 首先题中有5个区域和一个RIP共需要5个网段 可以借3位划分为8个网段 172.16.0.0/19 area 0 然后将172.16.0.0/19再借6位分为172.16.0.0/25---172.16.31.128 25作为其中前一个骨干ip网段 172.16.0.0/25在用于只…

TCP滑动窗口机制(附图例)

文章目录前言一、滑动窗口的引出二、流量控制2.1 16位窗口大小2.2 发送缓冲区2.3 逐步解析滑动窗口运作三、快重传机制四、拥塞控制&#xff08;仅供参考&#xff09;五、延迟应答与捎带应答&#xff08;略&#xff09;总结前言 博主个人社区&#xff1a;开发与算法学习社区 博…

测开-刷笔试题时的知识点

圈复杂度&#xff08;暂缓&#xff09;复杂度越大&#xff0c;程序越复杂计算公式&#xff1a;V(G) E - N 2E代表控制流边的数量&#xff0c;n代表节点数量V (G) P 1p为判定节点数几种常见的控制流图&#xff1a;Linux文件权限具有四种访问权限&#xff1a;r&#xff08;可…

进程信号理解3

进程信号理解3 1.什么叫做信号递达 实际执行信号的处理动作叫做信号递达&#xff0c;比如默认&#xff0c;忽略&#xff0c;自定义动作 2.什么叫做信号未决&#xff1f; 信号产生到信号递达的状态叫做信号未决 3.进程被阻塞和进程被忽略有什么区别&#xff1f; 进程被阻塞属…

iPhone更换字体教程,无需越狱,支持所有苹果设备!

上周开始&#xff0c;技术大神zhuowei 发现了一个iOS系统更换字体的漏洞&#xff0c;经过不断修正&#xff0c;现在已经可利用上了&#xff01; 先来看看更换字体后的效果&#xff0c;更换之后&#xff0c;所有App上的字体都得到更改&#xff0c;下图是打开文章的效果 下图是聊…

excel查重技巧:如何用组合函数快速统计重复数据(上)

统计不重复数据的个数&#xff0c;相信不少小伙伴在工作中都遇到过这样的问题。通常的做法都是先把不重复的数据提取出来&#xff0c;再去统计个数。而提取不重复数据的方法之前也分享过&#xff0c;基本有三种方法&#xff1a;高级筛选、数据透视表和删除重复项。其实使用公式…

Ngnix 实现访问黑名单功能

前言 有时候在配置的时候我们会禁用到一些IP&#xff0c;使用nginx 禁用到ip但是需要重启nginx&#xff0c;这样当我们要是实现动态的这种就比较麻烦&#xff0c;当然你可以使用网关来实现相对于nginx实现的这种方式要好很多&#xff0c;但是今天咱们说到这里&#xff0c;那就…

数据可视化系列-05数据分析报告

文章目录数据可视化系列-05数据分析报告1、了解初识数据分析报告数据分析报告简介数据分析报告的作用报告的能力体现报告编写的原则报告种类2、掌握数据分析报告结构标题页目录前言正文结论与建议附录3、了解报告的描述规范报告注意事项报告表达的维度数据结论可用指标数据可视…

代码随想录算法训练营第3天| 203. 移除链表元素、206. 反转链表

代码随想录算法训练营第3天| 203. 移除链表元素、206. 反转链表 移除链表元素 力扣题目链接 删除链表中等于给定值 val 的所有节点。 这里以链表 1 4 2 4 来举例&#xff0c;移除元素4。 那么因为单链表的特殊性&#xff0c;只能指向下一个节点&#xff0c;刚刚删除的是链表…

RS485通信----基本原理+电路图

一、RS485 通信----简介 RS485 是美国电子工业协会&#xff08;Electronic Industries Association&#xff0c;EIA&#xff09;于1983年发布的串行通信接口标准&#xff0c;经通讯工业协会&#xff08;TIA&#xff09;修订后命名为 TIA/EIA-485-A。 RS485 是一种工业控制环境…

获取Java集合中泛型的Class对象

直接获取时获取不到的&#xff0c;类型被虚拟机擦除了 泛型的正常工作是依赖编译器在编译源码的时候&#xff0c;先进行类型检查&#xff0c;然后进行类型擦除并且在类型参数出现的地方插入强制转换的相关指令实现的。编译器在编译时擦除了所有类型相关的信息&#xff0c;所以…

【36张图,一次性补全网络基础知识】

OSI和TCP/IP是很基础但又非常重要的知识&#xff0c;很多知识点都是以它们为基础去串联的&#xff0c;作为底层&#xff0c;掌握得越透彻&#xff0c;理解上层时会越顺畅。今天这篇网络基础科普&#xff0c;就是根据OSI层级去逐一展开的。 01 计算机网络基础 01 计算机网络的…

让阿里再次伟大--钉钉如何长成独角兽的?

文章目录引子开端发展历程&#xff1a;从2014到2022钉钉和阿里云的全面融合钉钉体系架构技术挑战ToB与ToC的差异安全要求高稳定性要求高业务多样性钉钉的创新存储创新单元化平台开放这些年&#xff0c;钉钉做了哪些优化?钉钉的技术栈钉钉的竞争对手们飞书微信华为welink未来参…

ArcGIS基础实验操作100例--实验58二维点、线转三维

本实验专栏参考自汤国安教授《地理信息系统基础实验操作100例》一书 实验平台&#xff1a;ArcGIS 10.6 实验数据&#xff1a;请访问实验1&#xff08;传送门&#xff09; 高级编辑篇--实验58 二维点、线转三维 目录 一、实验背景 二、实验数据 三、实验步骤 &#xff08;1&…

二、kubernetes集群环境搭建

文章目录1.前置知识点2.kubeadm 部署方式介绍3.安装要求4.最终目标5.集群环境6 初始化环境6.1 检查操作系统版本6.2 主机名解析6.3 时间同步6.4 禁用selinux6.5 禁用swap分区6.6 修改linux的内核参数6.7 配置ipvs功能6.8 安装docker6.9 安装Kubernetes组件6.10 准备集群镜像6.1…