Flink 优化 (一) --------- 资源配置调优

news2024/11/24 14:11:54

目录

  • 一、内存设置
    • 1. TaskManager 内存模型
    • 2. 生产资源配置示例
  • 二、合理利用 cpu 资源
    • 1. 使用 DefaultResourceCalculator 策略
    • 2. 使用 DominantResourceCalculator 策略
    • 3 使用 DominantResourceCalculator 策略并指定容器 vcore 数
  • 三、并行度设置
    • 1. 全局并行度计算
    • 2. Source 端并行度的配置
    • 3. Transform 端并行度的配置
    • 4. Sink 端并行度的配置


Flink 性能调优的第一步,就是为任务分配合适的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。

提交方式主要是 yarn-per-job,资源的分配在使用脚本提交 Flink 任务时进行指定。

➢ 标准的 Flink 任务提交脚本 (Generic CLI 模式)

从 1.11 开始,增加了通用客户端模式,参数使用 -D <property=value> 指定

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定 yarn 队列
-Djobmanager.memory.process.size=1024mb \ 指定 JM 的总进程大小
-Dtaskmanager.memory.process.size=1024mb \ 指定每个 TM 的总进程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每个 TM 的 slot 数
-c com.fancy.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

参数列表:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/deployment/config.html


一、内存设置

1. TaskManager 内存模型

在这里插入图片描述

① 内存模型详解

  • JVM 特定内存:JVM 本身使用的内存,包含 JVM 的 metaspace 和 over-head
    1)JVM metaspace:JVM 元空间
    taskmanager.memory.jvm-metaspace.size,默认 256mb
    2)JVM over-head 执行开销:JVM 执行时自身所需要的内容,包括线程堆栈、IO、编译缓存等所使用的内存。
    taskmanager.memory.jvm-overhead.fraction,默认 0.1
    taskmanager.memory.jvm-overhead.min,默认 192mb
    taskmanager.memory.jvm-overhead.max,默认 1gb
    总进程内存*fraction,如果小于配置的 min(或大于配置的 max)大小,则使用 min/max 大小

  • 框架内存:Flink 框架,即 TaskManager 本身所占用的内存,不计入 Slot 的资源中。
    堆内:taskmanager.memory.framework.heap.size,默认 128MB
    堆外:taskmanager.memory.framework.off-heap.size,默认 128MB

  • Task 内存:Task 执行用户代码时所使用的内存
    堆内:taskmanager.memory.task.heap.size,默认 none,由 Flink 内存扣除掉其他部分的内存得到。
    堆外:taskmanager.memory.task.off-heap.size,默认 0,表示不使用堆外内存

  • 网络内存:网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区
    堆外:taskmanager.memory.network.fraction,默认 0.1
    taskmanager.memory.network.min,默认 64mb
    taskmanager.memory.network.max,默认 1gb
    Flink 内存*fraction,如果小于配置的 min(或大于配置的 max)大小,则使用 min/max大小

  • 托管内存:用于 RocksDB State Backend 的本地内存和批的排序、哈希表、缓存中间结果
    堆外:taskmanager.memory.managed.fraction,默认 0.4
    taskmanager.memory.managed.size,默认 none
    如果 size 没指定,则等于 Flink 内存*fraction

② 案例分析
基于Yarn模式,一般参数指定的是总进程内存,taskmanager.memory.process.size,比如指定为 4G,每一块内存得到大小如下:

(1) 计算 Flink 内存
JVM 元空间 256mJVM 执行开销: 4g*0.1=409.6m,在[192m,1g]之间,最终结果 409.6m
Flink 内存=4g-256m-409.6m=3430.4m

(2) 网络内存=3430.4m*0.1=343.04m,在[64m,1g]之间,最终结果 343.04m

(3) 托管内存=3430.4m*0.4=1372.16m

(4) 框架内存,堆内和堆外都是 128m

(5) Task 堆内内存=3430.4m-128m-128m-343.04m-1372.16m=1459.2m

在这里插入图片描述
在这里插入图片描述
所以进程内存给多大,每一部分内存需不需要调整,可以看内存的使用率来调整。

2. 生产资源配置示例

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定 yarn 队列
-Djobmanager.memory.process.size=2048mb \ JM2~4G 足够
-Dtaskmanager.memory.process.size=4096mb \ 单个 TM2~8G 足够
-Dtaskmanager.numberOfTaskSlots=2 \ 与容器核数 1core:1slot 或 2core:1slot
-c com.fancy.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

Flink 是实时流处理,关键在于资源情况能不能抗住高峰时期每秒的数据量,通常用QPS/TPS 来描述数据情况。

二、合理利用 cpu 资源

Yarn 的容量调度器默认情况下是使用“DefaultResourceCalculator”分配策略,只根据内存调度资源,所以在 Yarn 的资源管理页面上看到每个容器的 vcore 个数还是 1。

可以修改策略为 DominantResourceCalculator,该资源计算器在计算资源的时候会综合考虑 cpu 和内存的情况。在 capacity-scheduler.xml 中修改属性:

<property>
	<name>yarn.scheduler.capacity.resource-calculator</name>
	<!-- <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value>-->
	<value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>

1. 使用 DefaultResourceCalculator 策略

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.fancy.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

可以看到一个容器只有一个 vcore:

在这里插入图片描述

2. 使用 DominantResourceCalculator 策略

修改后 yarn 配置后,分发配置并重启 yarn,再次提交 flink 作业:

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.fancy.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

看到容器的 vcore 数变了:

JobManager 1 个,占用 1 个容器,vcore=1
TaskManager 3 个,占用 3 个容器,每个容器 vcore=2,总 vcore=2*3=6,因为默认单个容器的 vcore 数=单 TM 的 slot 数

3 使用 DominantResourceCalculator 策略并指定容器 vcore 数

指定 yarn 容器的 vcore 数,提交:

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Dyarn.containers.vcores=3 \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.fancy.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

在这里插入图片描述

JobManager1 个,占用 1 个容器,vcore=1
TaskManager3 个,占用 3 个容器,每个容器 vcore =3,总 vcore=3*3=9

三、并行度设置

1. 全局并行度计算

开发完成后,先进行压测。任务并行度给 10 以下,测试单个并行度的处理上限。然后总 QPS/单并行度的处理能力 = 并行度。开发完 Flink 作业,压测的方式很简单,先在 kafka 中积压数据,之后开启 Flink 任务,出现反压,就是处理瓶颈。相当于水库先积水,一下子泄洪。

不能只从 QPS 去得出并行度,因为有些字段少、逻辑简单的任务,单并行度一秒处理几万条数据。而有些数据字段多,处理逻辑复杂,单并行度一秒只能处理 1000 条数据。

最好根据高峰期的 QPS 压测,并行度*1.2 倍,富余一些资源。

2. Source 端并行度的配置

数据源端是 Kafka,Source 的并行度设置为 Kafka 对应 Topic 的分区数。如果已经等于 Kafka 的分区数,消费速度仍跟不上数据生产速度,考虑下 Kafka 要扩大分区,同时调大并行度等于分区数。

Flink 的一个并行度可以处理一至多个分区的数据,如果并行度多于 Kafka 的分区数,那么就会造成有的并行度空闲,浪费资源。

3. Transform 端并行度的配置

➢ Keyby 之前的算子
一般不会做太重的操作,都是比如 map、filter、flatmap 等处理较快的算子,并行度可以和 source 保持一致。

➢ Keyby 之后的算子
如果并发较大,建议设置并行度为 2 的整数次幂,例如:128、256、512;小并发任务的并行度不一定需要设置成 2 的整数次幂;大并发任务如果没有 KeyBy,并行度也无需设置为 2 的整数次幂;

4. Sink 端并行度的配置

Sink 端是数据流向下游的地方,可以根据 Sink 端的数据量及下游的服务抗压能力进行评估。如果 Sink 端是 Kafka,可以设为 Kafka 对应 Topic 的分区数。Sink 端的数据量小,比较常见的就是监控告警的场景,并行度可以设置的小一些。Source 端的数据量是最小的,拿到 Source 端流过来的数据后做了细粒度的拆分,数据量不断的增加,到 Sink 端的数据量就非常大。那么在 Sink 到下游的存储中间件的时候就需要提高并行度。

另外 Sink 端要与下游的服务进行交互,并行度还得根据下游的服务抗压能力来设置,如果在 Flink Sink 这端的数据量过大的话,且 Sink 处并行度也设置的很大,但下游的服务完全撑不住这么大的并发写入,可能会造成下游服务直接被写挂,所以最终还是要在 Sink处的并行度做一定的权衡。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/425313.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

和猿辅导国奖选手的妈妈聊聊:数学新生代的成长之路

2023年第64届IMO中国国家队名单公布&#xff0c;来自猿辅导的学员王淳稷、孙启傲在此次国家队选拔赛中总成绩排名分列第一、第二&#xff0c;将于今年7月代表中国奔赴日本参加IMO竞赛。 值得一提的是&#xff0c;孙启傲同学继入选2022年IMO国家集训队、获阿里巴巴全球数学竞赛…

ubuntu(20.04)-shell脚本(2)echo-date-awk-sed-iptables-shell变量数组

1.echo 语法:echo [-ne][字符串]补充说明: 1、echo会将输入的字符串送往标准输出。 2、输出的字符串间以空白字符隔开,并在最后加上换行号。OPTIONS&#xff1a; -n 不要在最后自动换行 -e 若字符串中出现以下字符&#xff0c;则特别加以处理&#xff0c;而不会将它当成一般文…

【学习时序论文】

目录【2021 NeurIPS】Autoformer: Decomposition Transformers with Auto-Correlation for Long-Term Series Forecasting【2022 ICML】FEDformer: Frequency Enhanced Decomposed Transformer for Long-term Series Forecasting【2023 ICLR】TIMESNET: TEMPORAL 2D-VARIATION …

deque,stack,quque容器

一、deque 1.基本概念 功能: 双端数组&#xff0c;可以对头端进行插入删除操作 deque与vector区别: vector对于头部的插入删除效率低&#xff0c;数据量越大&#xff0c;效率越低. deque相对而言&#xff0c;对头部的插入删除速度会比vector快 vector访问元素时的速度会比de…

NDK编译脚本

一、如何通过NDK进行编译。 1、新建jni文件夹&#xff0c;并将Android.mk、Applicatio n.mk、源文件都放入其中。 2、编写Android.mk文件。 LOCAL_PATH : $(call my-dir) include $(CLEAR_VARS) LOCAL_MODULE: test LOCAL_C_ALL_FILES : test.c LOCAL_SRC_FILES : $(LOCAL_C_…

centos7虚拟机在集群zookeeper上面配置hbase的具体操作步骤

系列文章目录 centos7配置静态网络常见问题归纳_centos7网络问题 centos7克隆虚拟机完成后的的一些配置介绍 虚拟机centos7配置Hadoop单节点伪分布配置教程 卸载centos7自带的jdk的操作步骤 centos7配置zookeeper本地模式与集群模式的详细教程 centos7虚拟机配置集群时间…

HTML引入Typescript编译JS文件 :Uncaught ReferenceError: exports is not defined

初学TypeScript&#xff0c;尝试在html引入ts编译出来的js文件: 报错&#xff1a;Uncaught ReferenceError: exports is not defined 以下是代码&#xff1a; 创建了TS:加入export {}形成独立的作用域&#xff0c;其他ts文件重复声明相同名称的变量。 export {} let str &…

Python和Java二选一该学啥?

首先我们需要了解Python和 Java分别是什么 根据IEEE Spectrum 2022年编程语言排名前十的分别是&#xff1a;Python&#xff0c;C&#xff0c;C&#xff0c;C#&#xff0c;Java&#xff0c;SQL&#xff0c;JavaScript&#xff0c;R&#xff0c;HTML&#xff0c;TypeScript。从该…

专访丨AWS量子网络中心科学家Antía Lamas谈量子计算

​ Anta Lamas Linares&#xff08;图片来源&#xff1a;网络&#xff09; 47岁的Anta Lamas Linares出生于西班牙西北部的圣地亚哥德孔波斯特拉。她在当地学习物理学&#xff0c;然后在牛津大学和加利福尼亚继续深造。后来&#xff0c;她在新加坡领导了亚马逊网络服务&#xf…

Java中线程的常用操作-后台线程、自定义线程工厂ThreadFactpry、join加入一个线程、线程异常捕获

场景 Java中Thread类的常用API以及使用示例&#xff1a; Java中Thread类的常用API以及使用示例_霸道流氓气质的博客-CSDN博客 上面讲了Thread的常用API&#xff0c;下面记录下线程的一些常用操作。 注&#xff1a; 博客&#xff1a;霸道流氓气质的博客_CSDN博客-C#,架构之…

Doris(4):建表

可以通过在mysql-client中执行以下 help 命令获得更多帮助&#xff1a; help create table 1 基本概念 在 Doris 中&#xff0c;数据都以表&#xff08;Table&#xff09;的形式进行逻辑上的描述。 1.1 Row & Column 一张表包括行&#xff08;Row&#xff09;和列&#…

从零开始:如何集成美颜SDK到你的应用中

现在&#xff0c;随着人们对于美的追求不断提升&#xff0c;美颜应用已经成为了人们生活中不可或缺的一部分。在应用中&#xff0c;美颜功能的实现离不开美颜SDK的支持。那么&#xff0c;如何集成美颜SDK到你的应用中呢&#xff1f;下面&#xff0c;我们就来一步步了解。 第一…

Linux复习 / 线程相关----线程互斥 QA梳理

文章目录前言线程互斥Q&#xff1a;什么是临界资源&#xff1f;临界区呢&#xff1f;Q&#xff1a;什么是互斥&#xff1f;Q&#xff1a;数据不一致的本质是什么&#xff1f;Q&#xff1a;用锁对共享资源进行保护的前提是&#xff1a;锁也要作为共享资源被其他线程使用。那么用…

独家 | 招商银行:玩转校园招聘新方式 挖掘金融科技新人才

数字经济时代&#xff0c;金融科技人才队伍的引进与培养是招商银行人才体系建设的关键任务。 01.金融科技校招2大核心课题 招商银行数字化转型过程中&#xff0c;线上化、生态化、平台化、智能化、数据化全面加速发展&#xff0c;对人才队伍能力提出新要求。 2大核心课题&am…

Git的一些使用

虽然说这也不是啥重要的内容&#xff0c;但是作为计算机人也得学学&#xff0c;了解了解。 一些预备内容 首先得下载git&#xff0c;这个就不多说了。 安装完了之后&#xff0c;首先要做的就是设置用户名称和邮箱地址&#xff0c;因为每次Git提交都会使用该信息&#xff0c;…

I.MX6ULL_Linux_驱动篇(33) pinctrl与gpio子系统

上一章我们编写了基于设备树的 LED 驱动&#xff0c;但是驱动的本质还是没变&#xff0c;都是配置 LED 灯所使用的 GPIO 寄存器&#xff0c;驱动开发方式和裸机基本没啥区别。 Linux 是一个庞大而完善的系统&#xff0c;尤其是驱动框架&#xff0c;像 GPIO 这种最基本的驱动不可…

Linux实战学习

文章目录一、Linux权限信息权限控制信息chmodifconfigpingnmap netstatps killzip unzip常用快捷键二、搭建Java环境yumJDKTomcatMysql三、部署Web项目到服务器一、Linux权限信息 Linux中&#xff0c;拥有最大权限的账户为: root(超级管理员)&#xff0c;而普通用户在很多地方…

UWB成为智慧工厂时代的代表技术

UWB成为智慧工厂时代的代表技术 随着智慧工厂的到来&#xff0c;在人员安全问题较为重要的行业中&#xff0c;为了避免人员安全事故的出现&#xff0c;各家企业都逐步装备了UWB定位系统。UWB信号的辐射非常低&#xff0c;通常只有手机辐射的千分之一&#xff0c;因此在工业上应…

【 Spring MVC 核心功能(二) - 获取参数(上)】

文章目录一、获取单个参数二、获取多个参数三、获取对象四、后端参数重命名4.1 使用 RequestParam 重命名参数4.2 RequestParam 中参数必传4.3 设置非必传参数五、使用 PathVariable 获取URL中参数一、获取单个参数 在 Spring MVC 中可以直接⽤⽅法中的参数来实现传单个参&…

uni-app:登录与支付-- 三秒后自动跳转

三秒后自动跳转 三秒后自动跳转到登录页面 需求描述&#xff1a;在购物车页面&#xff0c;当用户点击 “结算” 按钮时&#xff0c;如果用户没有登录&#xff0c;则 3 秒后自动跳转到登录页面 在 my-settle 组件的 methods 节点中&#xff0c;声明一个叫做 showTips 的方法&am…