Flink 优化 (三) --------- 反压处理

news2025/1/24 17:33:46

目录

  • 一、概述
    • 1. 反压的理解
    • 2. 反压的危害
  • 二、定位反压节点
    • 1. 利用 Flink Web UI 定位
    • 2. 利用 Metrics 定位
  • 三、反压的原因及处理
    • 1. 查看是否数据倾斜
    • 2. 使用火焰图分析
    • 3. 分析 GC 情况
    • 4. 外部组件交互


一、概述

Flink 网络流控及反压的介绍:https://flink-learning.org.cn/article/detail/138316d1556f8f9d34e517d04d670626

1. 反压的理解

简单来说,Flink 拓扑中每个节点(Task)间的数据都以阻塞队列的方式传输,下游来不及消费导致队列被占满后,上游的生产也会被阻塞,最终导致数据源的摄入被阻塞。

反压 (BackPressure) 通常产生于这样的场景:短时间的负载高峰导致系统接收数据的速率远高于它处理数据的速率。许多日常问题都会导致反压,例如,垃圾回收停顿可能会导致流入的数据快速堆积,或遇到大促、秒杀活动导致流量陡增。

2. 反压的危害

反压如果不能得到正确的处理,可能会影响到 checkpoint 时长和 state 大小,甚至可能会导致资源耗尽甚至系统崩溃。

(1) 影响 checkpoint 时长:barrier 不会越过普通数据,数据处理被阻塞也会导致 checkpoint barrier 流经整个数据管道的时长变长,导致 checkpoint 总体时间(End to End Duration)变长。

(2) 影响 state 大小:barrier 对齐时,接受到较快的输入管道的 barrier 后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的 barrier 也到达,这些被缓存的数据会被放到 state 里面,导致 checkpoint 变大。

这两个影响对于生产环境的作业来说是十分危险的,因为 checkpoint 是保证数据一致性的关键,checkpoint 时间变长有可能导致 checkpoint 超时失败,而 state 大小同样可能拖慢 checkpoint 甚至导致 OOM (使用 Heap-based StateBackend)或者物理内存使用超出容器资源(使用 RocksDBStateBackend)的稳定性问题。因此,我们在生产中要尽量避免出现反压的情况。

二、定位反压节点

解决反压首先要做的是定位到造成反压的节点,排查的时候,先把 operator chain 禁用,方便定位到具体算子。

提交 UvDemo:

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

1. 利用 Flink Web UI 定位

Flink Web UI 的反压监控提供了 SubTask 级别的反压监控,1.13 版本以前是通过周期性对 Task 线程的栈信息采样,得到线程被阻塞在请求 Buffer(意味着被下游队列阻塞)的频率来判断该节点是否处于反压状态。默认配置下,这个频率在 0.1 以下则为 OK,0.1 至 0.5 为 LOW,而超过 0.5 则为 HIGH。

Flink 1.13 优化了反压检测的逻辑(使用基于任务 Mailbox 计时,而不在再于堆栈采样),并且重新实现了作业图的 UI 展示:Flink 现在在 UI 上通过颜色和数值来展示繁忙和反压的程度。

在这里插入图片描述
1)通过 WebUI 看到 Map 算子处于反压:

在这里插入图片描述
2)分析瓶颈算子

如果处于反压状态,那么有两种可能性:

(1)该节点的发送速率跟不上它的产生数据速率。这一般会发生在一条输入多条输出的Operator(比如 flatmap)。这种情况,该节点是反压的根源节点,它是从 Source Task 到 Sink Task 的第一个出现反压的节点。

(2)下游的节点接受速率较慢,通过反压机制限制了该节点的发送速率。这种情况,需要继续排查下游节点,一直找到第一个为 OK 的一般就是根源节点。总体来看,如果我们找到第一个出现反压的节点,反压根源要么是就这个节点,要么是它紧接着的下游节点。

通常来讲,第二种情况更常见。如果无法确定,还需要结合 Metrics 进一步判断。

2. 利用 Metrics 定位

监控反压时会用到的 Metrics 主要和 Channel 接受端的 Buffer 使用率有关,最为有用的是以下几个 Metrics:

Metris描述
outPoolUsage发送端 Buffer 的使用率
inPoolUsage接收端 Buffer 的使用率
floatingBuffersUsage(1.9 以上)接收端 Floating Buffer 的使用率
exclusiveBuffersUsage(1.9 以上)接收端 Exclusive Buffer 的使用率

其中 inPoolUsage = floatingBuffersUsage + exclusiveBuffersUsage。

1)根据指标分析反压

分析反压的大致思路是:如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它被下游反压限速了;如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游。反压情况可以根据以下表格进行对号入座(1.9 以上):

outPoolUsage 低outPoolUsage 高
inPoolUsage 低正常① 被下游反压,处于临时情况(还没传递到上游)②可能是反压的根源,一条输入多条输出的场景
inPoolUsage 高① 如果上游所有 outPoolUsage 都是低,有可能最终可能导致反压 (还没传递到上游) ②如果上游的 outPoolUsage 是高,则为反压根源被下游反压

2)可以进一步分析数据传输

Flink 1.9及以上版本,还可以根据 floatingBuffersUsage/exclusiveBuffersUsage 以及其上游 Task 的 outPoolUsage 来进行进一步的分析一个 Subtask 和其上游Subtask 的数据传输。

在流量较大时,Channel 的 Exclusive Buffer 可能会被写满,此时 Flink 会向 BufferPool 申请剩余的 Floating Buffer。这些 Floating Buffer 属于备用 Buffer。

exclusiveBuffersUsage 低exclusiveBuffersUsage 高
① floatingBuffersUsage 低 ② 所有上游 outPoolUsage 低正常
① floatingBuffersUsage 低 ② 上游某个 outPoolUsage 高潜在的网络瓶颈
① floatingBuffersUsage 高 ② 所有上游 outPoolUsage 低最终对部分 inputChannel 反压 (正在传递)最终对大多数或所有 inputChannel反压 (正在传递)
① floatingBuffersUsage 高 ②上游某个 outPoolUsage 高只对部分 inputChannel 反压对大多数或所有inputChannel 反压

总结:

  • floatingBuffersUsage 为高,则表明反压正在传导至上游
  • 同时 exclusiveBuffersUsage 为低,则表明可能有倾斜

比如,floatingBuffersUsage 高、exclusiveBuffersUsage 低为有倾斜,因为少数channel 占用了大部分的 Floating Buffer。

三、反压的原因及处理

注意:反压可能是暂时的,可能是由于负载高峰、CheckPoint 或作业重启引起的数据积压而导致反压。如果反压是暂时的,应该忽略它。另外,请记住,断断续续的反压会影响我们分析和解决问题。

定位到反压节点后,分析造成原因的办法主要是观察 Task Thread。按照下面的顺序,一步一步去排查。

1. 查看是否数据倾斜

在实践中,很多情况下的反压是由于数据倾斜造成的,这点我们可以通过 Web UI 各个 SubTask 的 Records Sent 和 Record Received 来确认,另外 Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。

在这里插入图片描述

2. 使用火焰图分析

如果不是数据倾斜,最常见的问题可能是用户代码的执行效率问题 (频繁被阻塞或者性能问题),需要找到瓶颈算子中的哪部分计算逻辑消耗巨大。

最有用的办法就是对 TaskManager 进行 CPU profile,从中我们可以分析到 Task Thread 是否跑满一个 CPU 核:如果是的话要分析 CPU 主要花费在哪些函数里面;如果不是的话要看 Task Thread 阻塞在哪里,可能是用户函数本身有些同步的调用,可能是checkpoint 或者 GC 等系统活动导致的暂时系统暂停。

1)开启火焰图功能

Flink 1.13 直接在 WebUI 提供 JVM 的 CPU 火焰图,这将大大简化性能瓶颈的分析,默认是不开启的,需要修改参数:

rest.flamegraph.enabled: true #默认 false

也可以在提交时指定:

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Drest.flamegraph.enabled=true \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.fancy.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

2)WebUI 查看火焰图

在这里插入图片描述
火焰图是通过对堆栈跟踪进行多次采样来构建的。每个方法调用都由一个条形表示,其中条形的长度与其在样本中出现的次数成正比。

  • On-CPU: 处于 [RUNNABLE, NEW]状态的线程
  • Off-CPU: 处于 [TIMED_WAITING, WAITING, BLOCKED]的线程,用于查看在样本中发现的阻塞调用。

3)分析火焰图

颜色没有特殊含义,具体查看:
➢ 纵向是调用链,从下往上,顶部就是正在执行的函数
➢ 横向是样本出现次数,可以理解为执行时长。

看顶层的哪个函数占据的宽度最大。只要有"平顶"(plateaus),就表示该函数可能存在性能问题。

如果是 Flink 1.13 以前的版本,可以手动做火焰图:
如何生成火焰图:http://www.54tianzhisheng.cn/2020/10/05/flink-jvm-profiler/

3. 分析 GC 情况

TaskManager 的内存以及 GC 问题也可能会导致反压,包括 TaskManager JVM 各区内存不合理导致的频繁 Full GC 甚至失联。通常建议使用默认的 G1 垃圾回收器。

可以通过打印 GC 日志(-XX:+PrintGCDetails),使用 GC 分析器(GCViewer 工具)来验证是否处于这种情况。

➢ 在 Flink 提交脚本中,设置 JVM 参数,打印 GC 日志:

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Denv.java.opts="-XX:+PrintGCDetails -XX:+PrintGCDateStamps" \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

➢ 下载 GC 日志的方式:

因为是 on yarn 模式,运行的节点一个一个找比较麻烦。可以打开 WebUI,选择JobManager 或者 TaskManager,点击 Stdout,即可看到 GC 日志,点击下载按钮即可将 GC 日志通过 HTTP 的方式下载下来。

在这里插入图片描述
➢ 分析 GC 日志:

通过 GC 日志分析出单个 Flink Taskmanager 堆总大小、年轻代、老年代分配的内存空间、Full GC 后老年代剩余大小等,相关指标定义可以去 Github 具体查看。
GCViewer 地址:https://github.com/chewiebug/GCViewer

Linux 下分析:

java -jar gcviewer_1.3.4.jar gc.log

Windows 下分析:

直接双击 gcviewer_1.3.4.jar,打开 GUI 界面,选择 gc 的 log 打开

扩展:最重要的指标是 Full GC 后,老年代剩余大小这个指标,按照《Java 性能优化权威指南》这本书 Java 堆大小计算法则,设 Full GC 后老年代剩余大小空间为 M,那么堆的大小建议 3 ~ 4 倍 M,新生代为 1 ~ 1.5 倍 M,老年代应为 2 ~ 3 倍 M。

4. 外部组件交互

如果发现我们的 Source 端数据读取性能比较低或者 Sink 端写入性能较差,需要检查第三方组件是否遇到瓶颈,还有就是做维表 join 时的性能问题。

例如:
Kafka 集群是否需要扩容,Kafka 连接器是否并行度较低
HBase 的 rowkey 是否遇到热点问题,是否请求处理不过来
ClickHouse 并发能力较弱,是否达到瓶颈
……

关于第三方组件的性能问题,需要结合具体的组件来分析,最常用的思路:
1)异步 io+热缓存来优化读写性能
2)先攒批再读写

维表 join 参考:
https://flink-learning.org.cn/article/detail/b8df32fbc6542257a5b449114e137cc3
https://www.jianshu.com/p/a62fa483ff54

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/413673.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

threejs-后期通道效果汇总

文章目录前言后期处理通道汇总简单通道效果FilmPassDotScreenPassBloomPassUnrealBloomPassOutlinePassGlitchPassHalftonePass高级通道效果掩码效果MaskPass景深效果 BokehPass景自定义效果 ShaderPass总结前言 Threejs提供了很多后期处理通道,配合 THREE.EffectC…

【并发编程Python】一文了解Python并发编程,协程、线程、进程

并发编程简介和一些前缀知识 并发编程是使得程序大幅度提速的。在并发编程中,程序可以同一时间执行多个任务,这有助于提高程序的吞吐量和响应时间。并发编程设计的主要概念包括线程、锁、同步、信号量、进程间通信等。 前缀知识: IO&#x…

信息系统项目管理师第四版知识摘编:第22章 组织通用治理​

第22章 组织通用治理​ 组织治理是协调组织利益相关者之间关系的一种制度安排,目标是为了确保组织的高效决策,实现利益相关者之间的利益均衡,提高组织的绩效,确保组织运行的可持续发展。​ 22.1组织战略​ 组织战略是组织高质量…

一文读懂:低代码开发平台对企业效益有什么作用?

一文读懂:低代码开发平台对企业效益有什么作用? 近年来,企业数字化转型的需求越来越迫切,但面临着IT人才不足、成本高昂等痛点问题,于是零代码平台应运而生,成为企业数字化转型的重要工具。 市面上的零代…

基于支持向量机SVM的脑部肿瘤识别,脑电波样本熵提取

目录 支持向量机SVM的详细原理 SVM的定义 SVM理论 Libsvm工具箱详解 简介 参数说明 易错及常见问题 SVM应用实例,基于SVM的的脑部肿瘤识别分类预测 代码 结果分析 展望 支持向量机SVM的详细原理 SVM的定义 支持向量机(support vector machines, SVM)是一种二分类模型,它…

Spring boot+Vue博客平台:文章列表展示、文章分类与标签管理模块实现

本文将详细介绍如何实现博客平台中的文章列表展示、文章分类与标签管理功能,包括前端的Vue组件设计和后端的Spring Boot接口实现。在阅读本文后,您将了解如何设计和实现高效、易用的文章列表展示、文章分类与标签管理功能。 一、文章列表展示 1.设计思…

电脑蓝屏错误MACHINE-CHECK-EXCEPTION重装系统教程

电脑蓝屏错误MACHINE-CHECK-EXCEPTION重装系统教程分享。最近有用户电脑遇到了蓝屏问题,正常使用电脑的时候常常会出现了蓝屏错误代码“MACHINE-CHECK-EXCEPTION”。那么遇到这个问题要怎么去进行系统的重装呢?来看看以下的具体操作方法教学吧。 准备工作…

JVM/GC/CMS

CMS (Concurrent Mark Sweep) jdk1.4后期版本开始引入的新gc算法ParNew(新生代) CMS(老年代)组合使用使用标记-清除算法目标:适合于B/S等对响应时间要求高的场景缺点:运行结束产生大量空间碎片缺点:由于分配给用户使用的老年代空间不足造成…

一文快速回顾 Servlet、Filter、Listener

什么是Servlet? 前置知识: Web 服务器:可以指硬件上的,也可以指软件上的。从硬件的角度来说, Web 服务器指的就是一台存储了网络服务软件的计算机;从软件的角度来说, Web 服务器指的是一种软件…

使用codon加速你的python程序

使用codon加速你的python程序 作为高性能 Python 编译器,Codon 可将 Python 代码编译为本机机器代码,而无需任何运行时开销。在单线程上,Python 的典型加速大约为 10-100 倍或更多。Codon 的性能通常与 C/C 的性能相当。与 Python 不同&#…

Three.js教程:第一个3D场景

推荐:将NSDT场景编辑器加入你3D工具链其他工具系列:NSDT简石数字孪生下面的代码完整展示了通过three.js引擎创建的一个三维场景,在场景中绘制并渲染了一个立方体的效果,为了大家更好的宏观了解three.js引擎, 尽量使用了…

【linux】进程和线程的几种状态及状态切换

文章目录一、进程的状态1.1 进程的三种状态1.2 三种状态转换图1.3 三种状态之间的转换1.4 linux下的进程进程状态二、线程的状态三、总结一、进程的状态 1.1 进程的三种状态 进程状态:一个进程的生命周期可以划分为一组状态,这些状态刻画了整个进程。进…

安装spacy+zh_core_web_sm避坑指南

目录 一、spacy简介 二、安装spacy 三、安装zh_core_web_sm 四、安装en_core_web_sm 五、效果测试 5.1 英文测试 5.2 中文测试 一、spacy简介 spacy是Python自然语言处理(NLP)软件包,可以对自然语言文本做词性分析、命名实体识别、依赖…

Java数组的四种拷贝方式

🎉🎉🎉点进来你就是我的人了 博主主页:🙈🙈🙈戳一戳,欢迎大佬指点!人生格言:当你的才华撑不起你的野心的时候,你就应该静下心来学习! 欢迎志同道合的朋友一起加油喔🦾&am…

ERTEC200P-2 PROFINET设备完全开发手册(3-2)

周期数据分为两大类,输出数据OutputData和输入数据InputData,输出数据是PLC发送给设备的;输入数据是设备发送给PLC。如果采用标准接口(SI),读取输出数据和写入输入数据都是一次初始化数据读写调用和一次/多…

【ChatGPT】多国“围堵”,万人抵制,AI发展的红线到底在哪?

个人主页:【😊个人主页】 文章目录前言Chatgpt💻💻💻多国拟发ChatGPT禁令🈲🈲🈲开端发展高潮联名抵制自我辩解🎛️🎛️🎛️名家争言比尔盖茨&…

TiDB进阶篇-TiKV架构

简介 简要的介绍下TiKV的架构。 底层存储RocksDB RocksDB的写操作 在写入WAL的时候为了防止操作系统写入的时候有缓存,要设置操作系统的参数sync_logtrue,也就是说只要有数据就执行刷写到磁盘,就不会存储到操作系统的缓存了。MemTable的数据…

【Python】无限逼近求积分

✨博文作者 wangzirui32 💖 喜欢的可以 点赞 收藏 关注哦~~ 👉本文首发于CSDN,未经许可禁止转载 Hello,大家好,我是wangzirui32,今天我们来学习如何用Python无限逼近求积分,开始学习吧&#xff…

krita源码提供了Tarball 和KDE Repository两套源码的区别

krita系列文章目录 文章目录krita系列文章目录前言一、Tarball 和KDE Repository区别是什么?二、使用步骤前言 krita官方主页 krita官方下载界面 krita源码提供了Tarball 和KDE Repository两套源码,我一下就懵圈了,不知道两者的区别 一…

第二章 自然语言处理与单词的分布式表示

目录2.1 自然语言处理(Natural Language Processing,NLP)2.2 同义词词典2.2.1 WordNet2.2.2 同义词词典的问题2.3 基于计数的方法2.3.1 基于 Python的语料库的预处理2.3.2 单词的分布式表示2.3.3 分布式假设2.3.4 共现矩阵2.3.5 向量间的相似…