Flink开发过程中遇到的问题

news2024/11/23 13:00:21

1. 任务启动报错Trying to access closed classloader.

Exception in thread "Thread-5" java.lang.IllegalStateException: 
Trying to access closed classloader. Please check if you store 
classloaders directly or indirectly in static fields. 
If the stacktrace suggests that the leak occurs in a third party 
library and cannot be fixed immediately, you can disable this check 
with the configuration 'classloader.check-leaked-classloader'.

此错误虽不影响任务的正常启动,但可以通过在flink-conf.yaml文件中添加classloader.check-leaked-classloader: false选项后,后续提交任务不会再提示。

2. 资源(memory和vcores)充足,但提交任务阻塞后报错

此错误表象为yarn集群资源充足,在提交任务时也提示“ACCEPT“,但会阻塞一段时间后报错。

问题:可能是AM资源超出了限制
请添加图片描述
在Yarn集群界面上"Scheduler"里可以看到,Max Application Master ResourcesUsed Application Master Resources 两个指标中,已使用的内存已经超过最大限制。

为什么不看vcores,只看内存限制呢。
在这里插入图片描述
解决方法:在capacity-scheduler.xml中修改yarn.scheduler.capacity.maximun-am-resource-percent选项,默认是0.2,可以调大一点。

3. Yarn集群有多个节点,但任务只集中分布在其中几个节点

通过指令yarn node -all -list拉取集群当前节点状态,发现有一些节点containers的数量很大,但有一些节点依然是0。虽然那些节点状态都是RUNNING(健康)状态,但就是不接收任务。

通过排查发现,这些节点不是不接收任务,而是在执行任务时报错,导致yarn集群会重新把任务分配给其他正常节点,最终形成只有部分节点有任务的现象。这些不正常节点在接收任务时报错如下(可以在yarn界面查看,点开具体的applicationID,中间有个Diagnostics):
在这里插入图片描述
去到正常和异常的节点下比较,确实异常节点缺失这个文件夹(用于存储运行时nodemanager和taskmanager的日志),怀疑是部署中间件框架时遗漏,通过手动增加文件夹的方式,问题解决。

4. Flink任务失败后,log找不到

flink任务执行过程中,日志可以通过flink的ui界面上可以查看(从yarn的application master跳转)。但任务一旦因为未知错误死亡时,在flink history service里并不能查到所有的日志,有时上面写的exception并不是root cause。

这时可以配置yarn集群的日志聚集(log aggregation)功能,在yarn-site.xml文件里配置yran.log-aggregation-enable=true
在这里插入图片描述
功能打开后flink任务结束(不管是否正常结束)时,任务执行的日志会被统一收集。可以在yarn界面查看,点开具体的applicationID,在最下面有个logs可以进行查看。但是这里只能查看到jobManager相关的日志,taskManager需要自己手动拼接地址。例如jobManager日志地址为:

http://qyfwzx-dev-esbmon4:19888/jobhistory/logs/qyfwzx-dev-esbmon4:45454/container_e18_1634207619484_0505_01_000001/container_e18_1634207619484_0505_01_000001/flink

其中

  • http://qyfwzx-dev-esbmon4:19888/jobhistory/logsjobhistoryserver的地址,不用变。

  • qyfwzx-dev-esbmon4:45454jobManager当时运行时的宿主yarn node地址

  • container_e18_1634207619484_0505_01_000001则是yarn当时运行时容器id

  • /flink则是提交任务时使用的用户名

所以我们只需要知道taskmanager的容器id和节点地址就能找到它的日志。这里在jobManager.log里搜索关键字“Registering TaskManager”,可以找到当时任务执行时taskManger的信息。containerId后面跟的就是当时运行container的节点地址。

按照连接的拼装方式,可以得到taskManger日志的地址:

http://qyfwzx-dev-esbmon4:19888/jobhistory/logs/qyfwzx-dev-esbmon1:45454/container_e18_1719387982584_0082_01_000002/container_e18_1719387982584_0082_01_000002/flink

5. Flink任务执行过程中,checkpoint太大导致失败

问题:Flink任务执行一段时间,会自动Restart,重启几次后任务失败。查看log里有以下错误提示。

Size of the state is larger than the maximum permitted memory-backed
 state. Size=5244975 , maxSize=5242880 . Consider using a different 
state backend, like the File System State backend.

这个是checkpoint写入文件过大导致的,可以通过设置flink-conf.yaml文件中的选项:

  • state.backend.incremental:true,此选项可以通过比对只保留增量变化的checkpoint内容,开启后,checkpoint的size大大缩小。

  • state.backend: filesystem,此选项是将checkpoint写入文件系统,值默认是HashMapStateBackend,即以java对象的形式放入内存

  • state.checkpoints.dir,还可以指定外部hdfs地址作为存储

6. Flink任务并行度该如何设置

并行度的设置需要通过压测来决定,测试时主要观察单并行度的处理上限。即先从源头(比如kafka)积压数据,之后开启 Flink 任务,出现反压(就是处理瓶颈)时,从flink ui上查看单个任务的输出量:numRecordsOutPerSecond。然后通过 总QPS / 单并行度的处理能力 = 并行度,最终设置为并行度*1.2 倍,富余一些资源。

以下是一些常用准则:

source端

  • 数据源端是kafka,source的并行度设置为kafka对应topic的分区数。

  • 如果已经等于kafka的分区数,消费速度仍更不上数据生产速度,考虑下kafka要扩大分区,同时调大并行度等于分区数。

  • flink的一个并行度可以处理一至多个分区的数据,如果并行度多于kafka的分区数,那么就会造成有的并行度空闲,浪费资源

Process端

  • keyby之前的算子,比如map、fliter、flatmap等处理较快的算子,并行度和source保持一致即可。

  • keyby之后的算子,视具体情况而定,可以通过测试反压的方法,得到keyby算子上游的数据发送量和该算子的处理能力来得到合理的并行度(在无倾斜情况下)

sink端

  • sink端是数据流向下游的地方,可以根据sink端的数据量及下游的服务抗压能力进行评估。

  • 如果sink端是kafka,可以设为kafka对应topic的分区数。

  • sink端的数据量若比较小,比如一些高度聚合或者过滤比较大的数据(比如监控告警),可以将并行度设置的小一些。

  • 如果source端的数据量最小,拿到source端流过来的数据后做了细粒度的拆分,数据量不断的增加,到sink端的数据量非常大的这种情况,就需要提高并行度。

7. Flink任务报错超出内存

任务在执行一段时间后报错

java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.FutrueTask.report(FutureTask.java:122)

这个问题其实要先了解flink的内存模型,参考文章 Flink TaskManager内存管理机制

其实就是Task Heap设置的比较小,然后用户自己写的flink任务逻辑比较复杂或是数据量比较大,存储的数据较多超出了内存。

按照上述说明,再根据当前的flink的配置,发现托管内存默认是占用了40%的内存。但在我的任务里这块内存基本上没有使用的,可以调低。通过设置比例值taskmanager.memory.managed.fraction=0.1,然后flink会自动调整Task Heap的大小。

除此之外,还可以给taskManager增加JVM启动参数,在flink-conf文件下增加:

env.java.opts.taskmanager: -XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp/flink_taskmanager_oom_%p_%t.hprof

当任务进程发生oom时,会自动生成堆转储(heap dump)文件,后续可以通过jdk自带的jvisualvm工具解析查看堆中各类数据占比,辅助分析问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2036541.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于PSO-BP+BP多特征分类预测对比(多输入单输出) Matlab代码

基于PSO-BPBP多特征分类预测对比(多输入单输出) Matlab代码 1、和市面上的不同,运行一个main一键出对比图,非常方便 2、可以根据需要定制其他算法优化模型对比 程序已经调试好,无需更改代码替换数据集即可运行!!&…

Python | Leetcode Python题解之第334题递增的三元子序列

题目&#xff1a; 题解&#xff1a; class Solution:def increasingTriplet(self, nums: List[int]) -> bool:n len(nums)if n < 3:return Falsefirst, second nums[0], float(inf)for i in range(1, n):num nums[i]if num > second:return Trueif num > first…

C++字体库开发之EM长度单位转换九

freetype 设置EM // if (m_face) // FT_Set_Pixel_Sizes(*m_face, 0, pixelSize); // 动态宽&#xff0c;固定高 px // error FT_Set_Char_Size(face, /* face 对象的句柄 */ // 0, /* 以 …

Unity Audio

这章练习将介绍在unity中创建 audio&#xff08;音频&#xff09;的工具&#xff0c;培养的技能将帮助创建引人入胜的音频音景。完成本次学习后&#xff0c;能够使用 Unity 中的所有主要音频组件&#xff0c;为各种不同体验创建音频效果。 音频处理工具&#xff1a; Audacity…

Mintegral出海系列:解锁全球应用商店新增长路径

在全球化竞争的浪潮中&#xff0c;面对打法各异的应用和游戏品类&#xff0c;以及全球数百个环境不同的国家和地区&#xff0c;开发者们正面临着前所未有的挑战。Mintegral「出海ing」系列专题内容&#xff0c;助力出海开发者选准赛道探索新的增长路径。 据近期数据显示&#x…

LLM微调(精讲)-以高考选择题生成模型为例(DataWhale AI夏令营)

前言 你好&#xff0c;我是GISer Liu&#x1f601;&#xff0c;一名热爱AI技术的GIS开发者&#xff0c;上一篇文章中&#xff0c;作者介绍了基于讯飞开放平台进行大模型微调的完整流程&#xff1b;而在本文中&#xff0c;作者将对大模型微调的数据准备部分进行深入&#xff1b;…

凤凰端子音频矩阵应用领域

凤凰端子音频矩阵&#xff0c;作为一种集成了凤凰端子接口的音频矩阵设备&#xff0c;具有广泛的应用领域。以下是其主要应用领域&#xff1a; 一、专业音响系统 会议系统&#xff1a;在会议室中&#xff0c;凤凰端子音频矩阵能够处理多个话筒和音频源的信号&#xff0c;实现…

Luminar Neo for Mac/Win:创新AI图像编辑软件的强大功能

Luminar Neo&#xff0c;这款由Skylum公司倾力打造的图像编辑软件&#xff0c;为Mac和Windows用户带来了前所未有的创作体验与编辑便利。作为一款融合了先进AI技术的图像处理工具&#xff0c;Luminar Neo以其独特的功能和高效的操作流程&#xff0c;成为了摄影师、设计师及摄影…

使用Sanic和SSE实现实时股票行情推送

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐&#xff1a;「storm…

【Next】全局样式和局部样式

不同于 nuxt &#xff0c;next 的样式绝大部分都需要手动导入。 全局样式 使用 sass 先安装 npm i sass -D 。 我们可以定义一个 styles 文件&#xff0c;存放全局样式。 variables.scss $fs30: 30px;mixin border() {border: 1px solid red; }main.scss use ./variables …

业界首个OpenTelemetry结合eBPF的向导式可观测性平台APO正式开源

AutoPilot Observability (简称APO&#xff09;是什么&#xff1f; 开箱即用的可观测性平台&#xff1a;APO 致力于提供一键安装、开箱即用的可观测性平台。APO 的 OneAgent 支持一键免配置安装 Tracing 探针&#xff0c;支持采集应用的故障现场日志、基础设施指标、应用和下游…

主机防火墙IPV6 域名 测试环境搭建及测试方法

由于国内当前网站支持ipv6的很少,部分支持ipv6 的网站由于路由器的限制,也无法直接访通过ipv6进行访问,因此进行主机防火墙ipv6域名测试时,需要自己搭建环境进行测试,以下为搭建环境的步骤。 1 . 搭建DNS服务器 环境:安装有python,系统为Windows Server 2016 DNS服务…

【Vue3】vue模板中如何使用enum枚举类型

简言 有的时候&#xff0c;我们想在vue模板中直接使用枚举类型的值&#xff0c;来做一些判断。 ts枚举 枚举允许开发人员定义一组命名常量。使用枚举可以更容易地记录意图&#xff0c;或创建一组不同的情况。TypeScript 提供了基于数字和字符串的枚举。 枚举的定义这里不说了…

haproxy最强攻略

1、负载均衡 负载均衡&#xff08;Load Balance&#xff0c;简称 LB&#xff09;是高并发、高可用系统必不可少的关键组件&#xff0c;目标是 尽力将网络流量平均分发到多个服务器上&#xff0c;以提高系统整体的响应速度和可用性。 负载均衡的主要作用如下&#xff1a; 高并发…

接入谷歌支付配置

1.谷歌云创建项目 网址&#xff1a;https://console.cloud.google.com/ 按照步骤创建即可 创建好后选择项目&#xff0c;转到项目设置 选择服务账户&#xff0c;选择创建新的服务账户 名称输入好后访问权限吗账号权限都可以不用填写&#xff0c;默认就好了 然后点击电子邮…

爵士编曲:Bass编写,Walking Bass,SwingBass 爵士鼓 Swing Jazz律动 Moonkits

Walking Bass Line是乐曲构造中的基垫&#xff0c;“Walking”是在BassLine中的一种重要的感觉构成&#xff0c;等同于我们对于“行走”的理解&#xff0c;意义就是“一步接着一步”&#xff0c;先从每一步&#xff08;每一小节&#xff09;建立&#xff0c;并持续构建成一个完…

Android 10.0 SystemUI下拉状态栏QSTileView去掉着色效果显示彩色图标功能实现

1.前言 在10.0的系统rom定制化开发中,在关于SystemUI的下拉状态栏中QSTileView的背景颜色设置过程中,在由于 系统原生有着色效果,导致现在某些彩色背景显示不是很清楚效果不好,所以需要去掉QSTileView的默认着色 背景显示原生的彩色背景,接下来就来实现相关功能 如图: 2.…

【微信小程序】实现中英文切换

1、组织语言资源 创建两个文件夹&#xff0c;分别用于存放中文和英文的语言资源。例如&#xff0c;可以在 utils 文件夹下创建 lang 文件夹&#xff0c;然后在其中创建 zh.js 和 en.js 文件&#xff0c;分别存放中文和英文的文本内容。 zh.js: const zh {home: {title: 这里…

【人工智能】全景解析:【机器学习】【深度学习】从基础理论到应用前景的【深度探索】

目录 1. 人工智能的基本概念 1.1 人工智能的定义与发展 1.1.1 人工智能的定义 1.1.2 人工智能的发展历史 1.2 人工智能的分类 1.2.1 弱人工智能 1.2.2 强人工智能 1.2.3 超人工智能 1.3 人工智能的关键组成部分 1.3.1 数据 1.3.2 算法 1.3.3 计算能力 2. 机器学习…

大模型系列6--神经网络(WIP)

神经网络 1. 背景2. 理论知识2.1. 单个神经元2.1.1. 基础2.1.2. 神经元激活代码 2.2. 多个神经元2.2.1. 基础2.2.2. 神经元激活代码2.2.3. 反向传播 3. 神经网络编程基础3.1. 基本概念3.2. 逻辑回归3.3. 梯度下降法(Gradient Descent)3.3.1. 基础知识3.3.2. 梯度下降的形式化说…