Flink JobManager 内存占用大 问题

news2024/11/25 20:34:16

Flink JobManager 内存占用大问题

问题描述

当在 本地启动一个 flink 简单的 job 时候,发现出现了 heap outMemeory 问题,
然后就不假思索的 调整了 jvm 的 heap -Xms1000m -Xmx16000m 参数,就可以正常的启动了。
通过 jvisualvm 连接上 这个 jvm process,参看 堆大小 竟然达到了 4、5G。
flink jobManager 大内存 jvm 图1

解决过程

直到最近才有时间,来探究一下 到底 为什么 要占用 这么大的内存?

我们下 去掉 jvm 配置 的 heap -Xms1000m -Xmx16000m 参数,看看程序哪里报的错。

Exception in thread "main" com.yyb.flink.core.exception.StreamBasicException: Context submit error
	at com.yyb.flink.core.context.AbstractContextProxy.submit(AbstractContextProxy.java:72)
	at com.yyb.flink.core.context.AbstractContextProxy.submit(AbstractContextProxy.java:101)
	at com.yyb.flink.app.table.dim.dataGen.JoinWithDataGenTable.main(JoinWithDataGenTable.java:39)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'JoinWithDataGenTable'.
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1969)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1847)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1833)
	at com.yyb.flink.core.context.AbstractContextProxy.IfPresentSinkExecute(AbstractContextProxy.java:94)
	at com.yyb.flink.core.context.AbstractContextProxy.submit(AbstractContextProxy.java:69)
	... 2 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.OutOfMemoryError: Java heap space
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
	... 7 more
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.util.ArrayDeque.allocateElements(ArrayDeque.java:147)
	at java.util.ArrayDeque.<init>(ArrayDeque.java:203)
	at org.apache.flink.runtime.executiongraph.failover.flip1.FailureRateRestartBackoffTimeStrategy.<init>(FailureRateRestartBackoffTimeStrategy.java:59)
	at org.apache.flink.runtime.executiongraph.failover.flip1.FailureRateRestartBackoffTimeStrategy$FailureRateRestartBackoffTimeStrategyFactory.create(FailureRateRestartBackoffTimeStrategy.java:153)
	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:97)
	at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory$$Lambda$1246/1142234774.get(Unknown Source)
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
	at org.apache.flink.util.function.FunctionUtils$$Lambda$1247/405573242.get(Unknown Source)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
	... 7 more

然后我们找到 代码所在的位置:
FailureRateRestartBackoffTimeStrategy.class

FailureRateRestartBackoffTimeStrategy(
            Clock clock, int maxFailuresPerInterval, long failuresIntervalMS, long backoffTimeMS) {

        checkArgument(
                maxFailuresPerInterval > 0,
                "Maximum number of restart attempts per time unit must be greater than 0.");
        checkArgument(failuresIntervalMS > 0, "Failures interval must be greater than 0 ms.");
        checkArgument(backoffTimeMS >= 0, "Backoff time must be at least 0 ms.");

        this.failuresIntervalMS = failuresIntervalMS;
        this.backoffTimeMS = backoffTimeMS;
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.failureTimestamps = new ArrayDeque<>(maxFailuresPerInterval);	//这里
        this.strategyString = generateStrategyString();
        this.clock = checkNotNull(clock);
    }

ArrayDeque.class

public ArrayDeque(int numElements) {
        allocateElements(numElements);
    }
private void allocateElements(int numElements) {
        elements = new Object[calculateSize(numElements)]; //这里
}

可以知道,如果这个 numElements、maxFailuresPerInterval 设置的 比较大的话,那么这里就会直接 申请 这么大 的 object数组,就有可能 heap OutOfMemoryError。
回想到 我们曾经 设置 flink FailureRateRestartStrategyConfiguration 的 次数 为 Integer.MAX_VALUE,那么就 将通了。
为什么要设置这么大的失败重启次数,当时是因为 下载 s3文件,时不时会出现 timeOut 问题,所以 flink 的 FailureRateRestartStrategyConfiguration 设置为 Integer.MAX_VALUE,没有想到 致使 jobManager 的 内存占用 变得这么大了。

解决效果

设置 FailureRateRestartStrategyConfiguration 的 次数 为 3
flink jobManager 大内存 jvm 图2
设置 FailureRateRestartStrategyConfiguration 的 次数 为 10000.
flink jobManager 大内存 jvm 图3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/52264.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

实验七 循环神经网络(3)LSTM的记忆能力实验

目录6.3 LSTM的记忆能力实验6.3.1 模型构建6.3.1.1 LSTM层6.3.1.2 模型汇总6.3.2 模型训练6.3.2.1 训练指定长度的数字预测模型6.3.2.2 多组训练6.3.2.3 损失曲线展示6.3.3 模型评价6.3.3.1 在测试集上进行模型评价6.3.3.2 模型在不同长度的数据集上的准确率变化图6.3.3.3 LSTM…

robfig/cron-go cron定时任务库架构剖析

Cron深度解析 思想 对于cron 这个三方库来说&#xff0c;他可以说是做两件事&#xff0c;其一是&#xff1a;解析cron string&#xff0c;生成一个定时器&#xff0c;达到循环时间发送信号。其二是核心&#xff08;引擎&#xff09;&#xff1a;用以执行&#xff0c;判断&…

Spring基础篇:Spring简介

第一章&#xff1a;Spring简介 SpringIOC工厂是Spring所有特性的基础&#xff0c;Spring所有的特性都是基于IOC控制反转特性而来的。 当今微服务已经成为主流&#xff0c;微服务依赖于SpringBoot和SpringCloud&#xff0c;而SpringBoot和SpringCloud是衍生于Spring&#xff0c…

贺利坚汇编课程笔记2 访问寄存器和内存

贺利坚汇编课程笔记2 访问寄存器和内存 文章目录贺利坚汇编课程笔记2 访问寄存器和内存0201 寄存器及数据存储CPU的组成寄存器是CPU内部的信息存储单元通用寄存器--以AX为例“字”在寄存器中的存储0202 mov 和 add指令0203 确定物理地址的方法物理地址8086CPU给出物理地址的方法…

pytorch模型网页部署——Flask

一、Flask用法 Flask是python的轻量级web框架&#xff0c;可用来做简单的模型部署。Flask的基本用法如下&#xff1a; step1&#xff1a;定义Flask类的对象&#xff0c;即创建一个基于Flask的服务器 step2&#xff1a;定义公开的路由及路由对应的调用函数 step3&#xff1a…

分享新零售系统商城小程序开发制作功能介绍_商城小程序开发好处

小编主要专注于新零售系统开发商城的领域&#xff0c;新零售系统开发商业模式有哪些&#xff1a; ① 多种销售模式&#xff1a;邀请有奖、销售业绩奖、团队业绩奖、区域分红&#xff0c;分销模式等。 ② 团队协作功能&#xff1a;立即邀约分销模式&#xff0c;清楚搜索直属代…

大型ERP生产制造管理系统源码

&#x1f353;&#x1f353;【淘源码】&#xff1a;一个专业提供高品质源码免费下载的资源共享平台&#x1f353;&#x1f353; &#x1f447;&#x1f447;&#x1f447;以下是博主整理的淘源码网站内大家都比较感兴趣的一些源码&#xff0c;需要源码学习的朋友可以私信博主哦…

Exception | ShardingSphere | ShardingSphere引发的IndexOutOfBoundsException

ShardingSphere引发的IndexOutOfBoundsException一、异常二、 原因三、解决方法四、总结一、异常 ### Error querying database. Cause: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 ### The error may exist in file [D:\JetBrains\Idea\workspace\zohe\bjxz\ru…

N-gram和NNLM语言模型

背景&#xff1a; one-hot:缺点&#xff1a;1.高维稀疏&#xff0c;2.不能体现句子中词的重要性&#xff0c;3.不能体现词与词之间的关系。 embedding:1.解决了高维稀疏 tf-idf&#xff1a;2.解决了one-hot中不能体现句子中词的重要性这一特点。 语言模型&#xff1a;3.解决不能…

【20221201】【每日一题】划分字母区间

给你一个字符串 s 。我们要把这个字符串划分为尽可能多的片段&#xff0c;同一字母最多出现在一个片段中。 注意&#xff0c;划分结果需要满足&#xff1a;将所有划分结果按顺序连接&#xff0c;得到的字符串仍然是 s 。 返回一个表示每个字符串片段的长度的列表。 思路&…

协程Part1-boost.Coroutine.md

首先&#xff0c;在计算机科学中 routine 被定义为一系列的操作&#xff0c;多个 routine 的执行形成一个父子关系&#xff0c;并且子 routine 一定会在父 routine 结束前结束&#xff0c;也就是一个个的函数执行和嵌套执行形成了父子关系。 coroutine 也是广义上的 routine&a…

网页JS自动化脚本(五)修改文字元素的内容和大小

今天的网页打开全是灰色的,顺便缅怀一下伟人,那么我我们今天定位换成按钮文字 window.onloadfunction(){var theElementdocument.querySelector("input[typesubmit]");theElement.value"爱我中华";theElement.style"font-size:25px"; }这一次的…

提分必练!中创教育PMP全真模拟题分享来喽

湖南中创教育每日五题分享来啦&#xff0c;“日日行&#xff0c;不怕千万里&#xff1b;常常做&#xff0c;不怕千万事。”&#xff0c;每日五题我们练起来&#xff01; 1、一个项目正在实行敏捷方法&#xff0c;在迭代过程中&#xff0c;团队成员互相合作&#xff0c;解决了一…

【机器学习】核函数

核方法 核技巧 非线性分类问题是指通过利用非线性模型才能很好地进行分类的问题。如图 111 所示&#xff0c;“●”表示正样本&#xff0c;“”表示负样本&#xff0c;显然无法用直线&#xff08;线性模型&#xff09;将正负样本正确分开&#xff0c;但是可以用一条椭圆曲线&…

记一次大事务优化历程(短信发送)

问题背景 短信服务数据库连接数告警&#xff0c;grafana查看数据库连接池被打满。 问题分析 在这段时间内&#xff0c;通过链路分析&#xff0c;发现最终调用第三方短信发送服务偶然耗时过长&#xff0c;分析了原有发送逻辑的代码&#xff0c;该实现在入口send处加了事务&am…

leetcode4. 寻找两个正序数组的中位数python_二分查找和递归(困难)

题目 给定两个大小分别为 m 和 n 的正序&#xff08;从小到大&#xff09;数组 nums1 和 nums2。请你找出并返回这两个正序数组的 中位数 。算法的时间复杂度应该为 O(log (mn)) 。 示例 1&#xff1a; 输入&#xff1a;nums1 [1,3], nums2 [2] 输出&#xff1a;2.00000 解释…

第二证券|疫情扰动叠加需求不足,11月制造业PMI回落至48%

国家统计局周三称&#xff0c;11月&#xff0c;受国内疫情点多面广频发&#xff0c;世界环境更趋复杂严峻等多重要素影响&#xff0c;我国制造业收购经理人指数&#xff08;PMI&#xff09;较上月回落1.2个百分点至48.0%。制造业PMI接连两个月低于临界点&#xff0c;制造业下行…

第4季2:并口、MIPI、LVDS的简介

以下内容源于朱有鹏嵌入式课程的学习与整理&#xff0c;如有侵权请告知删除。 一、并口的简介 1、并口的含义 并口的含义&#xff0c;可以从AR0130或OV9712的原理图中形象地理解。 如下图所示&#xff0c;AR0130采用12bit的并口向SoC传输图像数据信息&#xff0c;而SoC和AR0130…

b站黑马JavaScript的Ajax案例代码——评论列表案例

目标效果&#xff1a; 1.在表单界面输入评论人和内容&#xff0c;点击发表评论按钮&#xff0c;可以在页面下面看到自己刚刚输入的内容 2.发表评论成功之后&#xff0c;用DOM对象的reset方法&#xff1a;重置表单为其默认值 e.g.1初始状态&#xff1a;【下面的评论内容会因为…

STC 51单片机48——数码管显示外部中断次数

#include<reg52.h> #include<intrins.h> #include "math.h" #define uchar unsigned char #define uint unsigned int #define ulong unsigned long //共阴字形码表【实验】数码管实验时&#xff0c;一定要将点阵模块跳针放到VCC上&#xff01;&…