Flink学习笔记(二):Flink内存模型

news2025/1/19 23:20:42

文章目录

  • 1、配置总内存
  • 2、JobManager 内存模型
  • 3、TaskManager 内存模型
  • 4、图形化展示
  • 5、实际案例计算内存分配

1、配置总内存

Flink JVM 进程的进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。 Flink 总内存(Total Flink Memory)包括 JVM 堆内存(Heap Memory)和堆外内存(Off-Heap Memory)。 其中堆外内存包括直接内存(Direct Memory)和本地内存(Native Memory)。详细的配置参数:https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/deployment/config.html

配置 Flink 进程内存最简单的方法是指定以下两个配置项中的任意一个:

配置项TaskManager 配置参数JobManager 配置参数
Flink 总内存taskmanager.memory.flink.sizejobmanager.memory.flink.size
进程总内存taskmanager.memory.process.sizejobmanager.memory.process.size

Flink 启动需要明确配置:

TaskManagerJobManager
taskmanager.memory.flink.sizejobmanager.memory.flink.size
taskmanager.memory.process.sizejobmanager.memory.process.size
taskmanager.memory.task.heap.size 和
taskmanager.memory.managed.sizejobmanager.memory.heap.size

不建议同时设置进程总内存和 Flink 总内存。 这可能会造成内存配置冲突,从而导致部署失败。 额外配置其他内存部分时,同样需要注意可能产生的配置冲突。
JVM参数

2、JobManager 内存模型

JobManager内存模型
如上图所示,下表中列出了 Flink JobManager 内存模型的所有组成部分,以及影响其大小的相关配置参数。

组成部分配置参数描述
JVM 堆内存jobmanager.memory.heap.sizeJobManager 的 JVM 堆内存。
堆外内存jobmanager.memory.off-heap.sizeJobManager 的堆外内存(直接内存或本地内存)。
JVM Metaspacejobmanager.memory.jvm-metaspace.sizeFlink JVM 进程的 Metaspace。
JVM 开销jobmanager.memory.jvm-overhead.min、jobmanager.memory.jvm-overhead.max、jobmanager.memory.jvm-overhead.fraction用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。该内存部分为基于进程总内存的受限的等比内存部分。

如配置总内存中所述,另一种配置 JobManager 内存的方式是明确指定 JVM 堆内存的大小(jobmanager.memory.heap.size)。 通过这种方式,用户可以更好地掌控用于以下用途的 JVM 堆内存大小。

3、TaskManager 内存模型

内存模型详解
如上图所示,下表中列出了 Flink TaskManager 内存模型的所有组成部分,以及影响其大小的相关配置参数。

组成部分配置参数描述
框架堆内存(Framework Heap Memory)taskmanager.memory.framework.heap.size用于 Flink 框架的 JVM 堆内存(进阶配置)。
任务堆内存(Task Heap Memory)taskmanager.memory.task.heap.size用于 Flink 应用的算子及用户代码的 JVM 堆内存。
托管内存(Managed memory)taskmanager.memory.managed.size、taskmanager.memory.managed.fraction由 Flink 管理的用于排序、哈希表、缓存中间结果及 RocksDB State Backend 的本地内存。
框架堆外内存(Framework Off-heap Memory)taskmanager.memory.framework.off-heap.size用于 Flink 框架的堆外内存(直接内存或本地内存)(进阶配置)。
任务堆外内存(Task Off-heap Memory)taskmanager.memory.task.off-heap.size用于 Flink 应用的算子及用户代码的堆外内存(直接内存或本地内存)。
网络内存(Network Memory)taskmanager.memory.network.min、taskmanager.memory.network.max、taskmanager.memory.network.fraction用于任务之间数据传输的直接内存(例如网络传输缓冲)。该内存部分为基于 Flink 总内存的受限的等比内存部分。
JVM Metaspacetaskmanager.memory.jvm-metaspace.sizeFlink JVM 进程的 Metaspace。
JVM 开销taskmanager.memory.jvm-overhead.min、taskmanager.memory.jvm-overhead.max、taskmanager.memory.jvm-overhead.fraction用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。该内存部分为基于进程总内存的受限的等比内存部分。

我们可以看到,有些内存部分的大小可以直接通过一个配置参数进行设置,有些则需要根据多个参数进行调整。通常情况下,不建议对框架堆内存和框架堆外内存进行调整。 除非你非常肯定 Flink 的内部数据结构及操作需要更多的内存。 这可能与具体的部署环境及作业结构有关,例如非常高的并发度。 此外,Flink 的部分依赖(例如 Hadoop)在某些特定的情况下也可能会需要更多的直接内存或本地内存。

4、图形化展示

JobManager 内存直观展示
Job
TaskManager 内存直观展示
task
树状图表示:
树状图

5、实际案例计算内存分配

如果是 Flink On YARN 模式下:

taskmanager.memory.process.size = 4096 MB = 4G
taskmanager.memory.network.fraction = 0.15
taskmanager.memory.managed.fraction = 0.45

然后根据以上参数,就可以计算得到各部分的内存大小:

taskmanager.memory.jvm-overhead = 4096 * 0.1 = 409.6 MB
taskmanager.memory.flink.size = 4096 - 409.6 - 256 = 3430.4 MB
taskmanager.memory.network = 3430.4 * 0.15 = 514.56 MB
taskmanager.memory.managed = 3430.4 * 0.45 = 1543.68 MB
taskmanager.memory.task.heap.size = 3430.4 - 128 * 2 - 1543.68 - 514.56 = 1116.16 MB

实际案例

Flink 启动参考配置参数:

/home/dev/soft/flink/bin/flink run \
	-m yarn-cluster \
	-yD akka.ask.timeout='360 s' \
	-yD akka.framesize=20485760b \
	-yD blob.fetch.backlog=1000 \
	-yD blob.fetch.num-concurrent=500 \
	-yD blob.fetch.retries=50 \
	-yD blob.storage.directory=/data1/flinkdir \
	-yD env.java.opts.jobmanager='-XX:ErrorFile=/tmp/java_error_%p.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=512m -XX:+UseG1GC -XX:MaxGCPauseMillis=300 -XX:InitiatingHeapOccupancyPercent=50 -XX:+ExplicitGCInvokesConcurrent -XX:+AlwaysPreTouch -XX:AutoBoxCacheMax=20000 -XX:G1HeapWastePercent=5 -XX:G1ReservePercent=25 -Dfile.encoding=UTF-8' \
	-yD env.java.opts.taskmanager='-XX:ErrorFile=/tmp/java_error_%p.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:MetaspaceSize=256m -XX:MaxMetaspaceSize=1024m -XX:+UseG1GC -XX:MaxGCPauseMillis=300 -XX:InitiatingHeapOccupancyPercent=50 -XX:+ExplicitGCInvokesConcurrent -XX:+AlwaysPreTouch -XX:AutoBoxCacheMax=20000 -Dsun.security.krb5.debug=false -Dfile.encoding=UTF-8' \
	-yD env.java.opts='-XX:ErrorFile=/tmp/java_error_%p.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:MetaspaceSize=256m -XX:MaxMetaspaceSize=1024m -Dfile.encoding=UTF-8' \
	-yD execution.attached=false \
	-yD execution.buffer-timeout='1000 ms' \
	-yD execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
	-yD execution.checkpointing.interval='30 min' \
	-yD execution.checkpointing.max-concurrent-checkpoints=1 \
	-yD execution.checkpointing.min-pause='2 min' \
	-yD execution.checkpointing.mode=EXACTLY_ONCE \
	-yD execution.checkpointing.timeout='28 min' \
	-yD execution.checkpointing.tolerable-failed-checkpoints=8 \
	-yD execution.checkpointing.unaligned=true \
	-yD execution.checkpointing.unaligned.forced=true \
	-yD heartbeat.interval=60000 \
	-yD heartbeat.rpc-failure-threshold=5 \
	-yD heartbeat.timeout=340000 \
	-yD io.tmp.dirs=/data1/flinkdir \
	-yD jobmanager.heap.size=1024m \
	-yD jobmanager.memory.jvm-metaspace.size=268435456b \
	-yD jobmanager.memory.jvm-overhead.max=1073741824b \
	-yD jobmanager.memory.jvm-overhead.min=1073741824b \
	-yD jobmanager.memory.network.fraction=0.2 \
	-yD jobmanager.memory.network.max=6GB \
	-yD jobmanager.memory.off-heap.size=134217728b \
	-yD jobmanager.memory.process.size='18360 mb' \
	-yD metrics.reporter.promgateway.deleteOnShutdown=true \
	-yD metrics.reporter.promgateway.factory.class=org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory \
	-yD metrics.reporter.promgateway.filter.includes=\*:dqc\*,uptime,taskSlotsTotal,numRegisteredTaskManagers,taskSlotsAvailable,numberOfFailedCheckpoints,numRestarts,lastCheckpointDuration,Used,Max,Total,Count,Time:gauge,meter,counter,histogram \
	-yD metrics.reporter.promgateway.groupingKey="yarn=${yarn};hdfs=${hdfs};job_name=TEST-broadcast-${jobName//./-}-${provId}" \
	-yD metrics.reporter.promgateway.host=172.17.xxxx.xxxx \
	-yD metrics.reporter.promgateway.interval='60 SECONDS' \
	-yD metrics.reporter.promgateway.jobName="TEST-broadcast-${jobName//./-}-${provId}" \
	-yD metrics.reporter.promgateway.port=10080 \
	-yD metrics.reporter.promgateway.randomJobNameSuffix=true \
	-yD pipeline.name="TEST-broadcast-${jobName//./-}-${provId}" \
	-yD pipeline.object-reuse=true \
	-yD rest.flamegraph.enabled=true \
	-yD rest.server.numThreads=20 \
	-yD restart-strategy.failure-rate.delay='60 s' \
	-yD restart-strategy.failure-rate.failure-rate-interval='3 min' \
	-yD restart-strategy.failure-rate.max-failures-per-interval=3 \
	-yD restart-strategy=failure-rate \
	-yD security.kerberos.krb5-conf.path=/home/dev/kerberos/krb5.conf \
	-yD security.kerberos.login.contexts=Client,KafkaClient \
	-yD security.kerberos.login.keytab=/home/dev/kerberos/xxxx.keytab \
	-yD security.kerberos.login.principal=xxxx \
	-yD security.kerberos.login.use-ticket-cache=false \
	-yD state.backend.async=true \
	-yD state.backend=hashmap \
	-yD state.checkpoints.dir=hdfs://xxxx/flink/checkpoint/${jobName//.//}/$provId \
	-yD state.checkpoint-storage=filesystem \
	-yD state.checkpoints.num-retained=3 \
	-yD state.savepoints.dir=hdfs://xxxx/flink/savepoint/${jobName//.//}/$provId \
	-yD table.exec.hive.fallback-mapred-writer=false \
	-yD task.manager.memory.segment-size=4mb \
	-yD taskmanager.memory.framework.off-heap.size=1GB \
	-yD taskmanager.memory.managed.fraction=0.2 \
	-yD taskmanager.memory.network.fraction=0.075 \
	-yD taskmanager.memory.network.max=16GB \
    -yD taskmanager.memory.process.size='50 gb' \
	-yD taskmanager.network.netty.client.connectTimeoutSec=600 \
	-yD taskmanager.network.request-backoff.max=120000 \
	-yD taskmanager.network.retries=100 \
    -yD taskmanager.numberOfTaskSlots=10 \
	-yD web.timeout=900000 \
	-yD web.upload.dir=/data1/flinkdir \
	-yD yarn.application.name="TEST-broadcast-${jobName//./-}-${provId}" \
	-yD yarn.application.queue=$yarnQueue \
	-yD yarn.application-attempts=10 \

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1069831.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何把pdf转换成word?4个简单方法效果好

如何把pdf转换成word?大家对于pdf和word两种电脑文件都不陌生吧,毕竟我们平时几乎每天都会使用到它们。将PDF转换为Word文档的一个主要原因是为了方便编辑和修改文档,尽管PDF是一个非常流行的电子文档格式,但如果想要编辑或修改其…

Java数组:没错,不装了我就是书架。

👑专栏内容:Java⛪个人主页:子夜的星的主页💕座右铭:前路未远,步履不停 目录 一、数组的概念1、什么是数组?2、数组的创建3、数组的初始化Ⅰ、动态初始化Ⅱ、静态初始化 二、数组的使用1、数组中…

OLED透明拼接屏的完美融合,唐山的历史遗迹与现代科技

引言:作为河北省的一个重要城市,唐山以其丰富的历史文化和独特的自然风光吸引着众多游客。 近年来,随着科技的不断进步,OLED透明拼接屏的应用为唐山增添了一道新的城市风景线。 作为一名资深工程师,我将介绍唐山的历…

kafka的请求处理机制

目录 前言: kafak是如何处理请求的? 控制请求与数据类请求 参考资料 前言: 无论是 Kafka 客户端还是 Broker 端,它们之间的交互都是通过“请求 / 响应”的方式完成的。比如,客户端会通过网络发送消息生产请求给 B…

Linux 守护进程

一 何为守护进程 守护进程( Daemon )也称为精灵进程,是运行在后台的一种特殊进程,它独立于控制终端并且周期性 地执行某种任务或等待处理某些事情的发生,主要表现为以下两个特点: 长期运行。守护进程是一…

【DRAM存储器十二】SDRAM介绍-各类写操作详解

👉个人主页:highman110 👉作者简介:一名硬件工程师,持续学习,不断记录,保持思考,输出干货内容 参考资料:《镁光SDRAM数据手册》、《PC SDRAM specification》 目录 写操…

【ElasticSearch】基于Docker 部署 ElasticSearch 和 Kibana,使用 Kibana 操作索引库,以及实现对文档的增删改查

文章目录 前言一、使用 Docker 部署 ElasticSearch 和 Kibana1.1 部署 ElasticSearch1.2 部署 Kibana1.3 利用 Kibana 演示 Elasticsearch 分词效果 二、解决中文分词的问题2.1 默认分词器对中文分词的问题2.2 引入 IK 分词器2.3 IK 分词器的两种分词模式2.4 IK 分词器存在的问…

HTTP爬虫IP:流量or数量计费模式那个更适合爬虫?

在使用HTTP爬虫IP时,我们常常需要考虑计费方式:按流量计费还是按数量计费。这两种计费方式各有优势,但是哪种更加划算呢?本文将为您深入探讨HTTP爬虫IP的流量计费和数量计费的特点、适用场景以及选择适合自己的计费方式的实用技巧…

MySQL学习笔记(快速入门)

一、数据库相关概念 名称全称简称数据库存储数据的仓库,数据是有组织的进行存储DataBase(DB)数据库管理系统操纵和管理数据库的大型软件DataBase Management System (DBMS)SQL操作关系型数据库的编程语言,定义了一套操作关系型数…

[PyTorch][chapter 57][WGAN-GP 代码实现]

前言: 下图为WGAN 的效果图: 绿色为真实数据的分布: 8个高斯分布 红色: 为随机产生的数据分布,跟真实分布基本一致 WGAN-GP: 1 判别器D: 最后一层去掉sigmoid 2 生成器G 和判别器D: loss不取log 3 损失函数…

Spring AOP 详解及@Trasactional

Spring AOP 详解 AOP基础 AOP: Aspect Oriented Program, 面向切面编程。解耦(组织结构调整)、增强(扩展)。 AOP术语 术语 说明 Aspect(切面) 横切于系统的连接点实现特定功能的类 JoinPoint&#xf…

Axios、SASS学习笔记

目录 前言 一、Axios基础认识 1、简介 2、相关文档 3、基本配置 4、基础快捷使用 二、Axios封装 1、公共配置文件 2、细化每个接口的配置 3、使用并发送请求 三、SASS 1、简介 2、相关文档 3、使用前奏 4、使用变量 5、嵌套规则 6、父级选择器标识 & 前言…

小谈设计模式(10)—原型模式

小谈设计模式(10)—原型模式 专栏介绍专栏地址专栏介绍 原型模式角色分类抽象原型(Prototype)具体原型(Concrete Prototype)客户端(Client)原型管理器(Prototype Manager…

《C++ Primer》第4章 表达式(二)

参考资料: 《C Primer》第5版《C Primer 习题集》第5版 4.6 成员访问运算符(P133) 点运算符和箭头运算符都可用于访问成员,ptr->mem 等价于 (*ptr).mem 。箭头作用于指针类型对象,结果为左值;点运算符…

【Mysql】 blob 转text

有个数据表字段存储的字段类型是blob,想查看字段内容。 blob是二进制的无法直接查看怎么办? 写sql,blob 转text SELECT CONVERT(content USING utf8) FROM article_content ; 我想把原来content字段完全转成text 新建 text 类型字段conten…

uniapp 在uni.scss 根据@mixin定义方法 、通过@include全局使用

在官方文档中提及到uni.scss中变量的使用,而我想定义方法,这样写css样式更方便 一、官方文档的介绍 根据官方文档我知道,在这面定义的变量全局都可使用。接下来我要在这里定义方法。 二、在uni.scss文件中定义方法 我在uni.scss文件中定义了…

三、浏览器缓存动如何使用(Expires、 cache-control、Etag、last-modified)----哪些文件需要强缓存,哪些文件需要协商缓存

参考链接1:彻底弄懂强缓存与协商缓存 参考链接2:浏览器缓存 参考链接3:扼杀 304,Cache-Control: immutable 如何搭建 express,或者node服务 ### 如何搭建 express,npm install express --save### expre…

[C++基础]-多态

前言 作者:小蜗牛向前冲 名言:我可以接受失败,但我不能接受放弃 如果觉的博主的文章还不错的话,还请点赞,收藏,关注👀支持博主。如果发现有问题的地方欢迎❀大家在评论区指正。 本期学习目标&am…

智能AI系统源码ChatGPT系统源码+详细搭建部署教程+AI绘画系统+已支持OpenAI GPT全模型+国内AI全模型

一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统,支持OpenAI GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美,可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作Chat…

NFT Insider#110:The Sandbox与TB Media Global合作,YGG Web3游戏峰会阵容揭晓

引言:NFT Insider由NFT收藏组织WHALE Members、BeepCrypto出品,浓缩每周NFT新闻,为大家带来关于NFT最全面、最新鲜、最有价值的讯息。每期周报将从NFT市场数据,艺术新闻类,游戏新闻类,虚拟世界类&#xff0…