Flink on yarn 开发过程中遇到的问题

news2025/1/9 22:41:17

1. 任务启动报错Trying to access closed classloader.

Exception in thread "Thread-5" java.lang.IllegalStateException: 
Trying to access closed classloader. Please check if you store 
classloaders directly or indirectly in static fields. 
If the stacktrace suggests that the leak occurs in a third party 
library and cannot be fixed immediately, you can disable this check 
with the configuration 'classloader.check-leaked-classloader'.

此错误虽不影响任务的正常启动,但可以通过在flink-conf.yaml文件中添加classloader.check-leaked-classloader: false选项后,后续提交任务不会再提示。

2. 资源(memory和vcores)充足,但提交任务阻塞后报错

此错误表象为yarn集群资源充足,在提交任务时也提示“ACCEPT“,但会阻塞一段时间后报错。

问题:可能是AM资源超出了限制
请添加图片描述
在Yarn集群界面上"Scheduler"里可以看到,Max Application Master ResourcesUsed Application Master Resources 两个指标中,已使用的内存已经超过最大限制。
解决方法:在capacity-scheduler.xml中修改yarn.scheduler.capacity.maximun-am-resource-percent选项,默认是0.2,可以调大一点。

为什么不看vcores,只看内存限制呢。
在这里插入图片描述

3. Yarn集群有多个节点,但任务只集中分布在其中几个节点

通过指令yarn node -all -list拉取集群当前节点状态,发现有一些节点containers的数量很大,但有一些节点依然是0。虽然那些节点状态都是RUNNING(健康)状态,但就是不接收任务。

通过排查发现,这些节点不是不接收任务,而是在执行任务时报错,导致yarn集群会重新把任务分配给其他正常节点,最终形成只有部分节点有任务的现象。这些不正常节点在接收任务时报错如下(可以在yarn界面查看,点开具体的applicationID,中间有个Diagnostics):
在这里插入图片描述
去到正常和异常的节点下比较,确实异常节点缺失这个文件夹(用于存储运行时nodemanager和taskmanager的日志),怀疑是部署中间件框架时遗漏,通过手动增加文件夹的方式,问题解决。

4. Flink任务失败后,log找不到

flink任务执行过程中,日志可以通过flink的ui界面上可以查看(从yarn的application master跳转)。但任务一旦因为未知错误死亡时,在flink history service里并不能查到所有的日志,有时上面写的exception并不是root cause。

这时可以配置yarn集群的日志聚集(log aggregation)功能,在yarn-site.xml文件里配置yran.log-aggregation-enable=true
在这里插入图片描述
功能打开后flink任务结束(不管是否正常结束)时,任务执行的日志会被统一收集。可以在yarn界面查看,点开具体的applicationID,在最下面有个logs可以进行查看。但是这里只能查看到jobManager相关的日志,taskManager需要自己手动拼接地址。例如jobManager日志地址为:

http://qyfwzx-dev-esbmon4:19888/jobhistory/logs/qyfwzx-dev-esbmon4:45454/container_e18_1634207619484_0505_01_000001/container_e18_1634207619484_0505_01_000001/flink

其中

  • http://qyfwzx-dev-esbmon4:19888/jobhistory/logsjobhistoryserver的地址,不用变。

  • qyfwzx-dev-esbmon4:45454jobManager当时运行时的宿主yarn node地址

  • container_e18_1634207619484_0505_01_000001则是yarn当时运行时容器id

  • /flink则是提交任务时使用的用户名

所以我们只需要知道taskmanager的容器id和节点地址就能找到它的日志。这里在jobManager.log里搜索关键字“Registering TaskManager”,可以找到当时任务执行时taskManger的信息。containerId后面跟的就是当时运行container的节点地址。

按照连接的拼装方式,可以得到taskManger日志的地址:

http://qyfwzx-dev-esbmon4:19888/jobhistory/logs/qyfwzx-dev-esbmon1:45454/container_e18_1719387982584_0082_01_000002/container_e18_1719387982584_0082_01_000002/flink

5. Flink任务执行过程中,checkpoint太大导致失败

问题:Flink任务执行一段时间,会自动Restart,重启几次后任务失败。查看log里有以下错误提示。

Size of the state is larger than the maximum permitted memory-backed
 state. Size=5244975 , maxSize=5242880 . Consider using a different 
state backend, like the File System State backend.

这个是checkpoint写入文件过大导致的,可以通过设置flink-conf.yaml文件中的选项:

  • state.backend.incremental:true,此选项可以通过比对只保留增量变化的checkpoint内容,开启后,checkpoint的size大大缩小。

  • state.backend: filesystem,此选项是将checkpoint写入文件系统,值默认是HashMapStateBackend,即以java对象的形式放入内存

  • state.checkpoints.dir,还可以指定外部hdfs地址作为存储

6. Flink任务并行度该如何设置

并行度的设置需要通过压测来决定,测试时主要观察单并行度的处理上限。即先从源头(比如kafka)积压数据,之后开启 Flink 任务,出现反压(就是处理瓶颈)时,从flink ui上查看单个任务的输出量:numRecordsOutPerSecond。然后通过 总QPS / 单并行度的处理能力 = 并行度,最终设置为并行度*1.2 倍,富余一些资源。

以下是一些常用准则:

source端

  • 数据源端是kafka,source的并行度设置为kafka对应topic的分区数。

  • 如果已经等于kafka的分区数,消费速度仍更不上数据生产速度,考虑下kafka要扩大分区,同时调大并行度等于分区数。

  • flink的一个并行度可以处理一至多个分区的数据,如果并行度多于kafka的分区数,那么就会造成有的并行度空闲,浪费资源

Process端

  • keyby之前的算子,比如map、fliter、flatmap等处理较快的算子,并行度和source保持一致即可。

  • keyby之后的算子,视具体情况而定,可以通过测试反压的方法,得到keyby算子上游的数据发送量和该算子的处理能力来得到合理的并行度(在无倾斜情况下)

sink端

  • sink端是数据流向下游的地方,可以根据sink端的数据量及下游的服务抗压能力进行评估。

  • 如果sink端是kafka,可以设为kafka对应topic的分区数。

  • sink端的数据量若比较小,比如一些高度聚合或者过滤比较大的数据(比如监控告警),可以将并行度设置的小一些。

  • 如果source端的数据量最小,拿到source端流过来的数据后做了细粒度的拆分,数据量不断的增加,到sink端的数据量非常大的这种情况,就需要提高并行度。

7. Flink任务报错超出内存

任务在执行一段时间后报错

java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.FutrueTask.report(FutureTask.java:122)

这个问题其实要先了解flink的内存模型,参考文章 Flink TaskManager内存管理机制

其实就是Task Heap设置的比较小,然后用户自己写的flink任务逻辑比较复杂或是数据量比较大,存储的数据较多超出了内存。

按照上述说明,再根据当前的flink的配置,发现托管内存默认是占用了40%的内存。但在我的任务里这块内存基本上没有使用的,可以调低。通过设置比例值taskmanager.memory.managed.fraction=0.1,然后flink会自动调整Task Heap的大小。

除此之外,还可以给taskManager增加JVM启动参数,在flink-conf文件下增加:

env.java.opts.taskmanager: -XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp/flink_taskmanager_oom_%p_%t.hprof

当任务进程发生oom时,会自动生成堆转储(heap dump)文件,后续可以通过jdk自带的jvisualvm工具解析查看堆中各类数据占比,辅助分析问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2047326.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Qt QLabel标签制作弹框效果,3s后缓慢自动消失

效果图 初始化说明 void InitStatusTips() {if (NULL statusTips_) {return;}statusTips_->setFixedSize(300, 80);//固定大小statusTips_->move((width() - statusTips_->width()) / 2, height() - 30 - statusTips_->height());//移动位置statusTips_->setA…

汽车IVI中控OS Linux driver开发实操(二十四):I2C设备驱动的编写

在Linux驱动中I2C系统中主要包含以下几个成员: I2C adapter(即I2C适配器,用来控制各种I2C从设备,其驱动需要完成对适配器的完整描述,最主要的工作是需要完成i2c_algorithm结构体。这个结构体包含了此I2C控制器的数据传输具体实现,以及对外上报此设备所支持的功能类型。具…

钉钉虚拟位置打卡神器2024免费试用版下载-钉钉虚拟位置打卡神器

钉钉虚拟位置打卡神器是一款能够快速帮助用户修改定位的辅助,钉钉虚拟位置打卡免费版能够一键切换手机上班的打开地点,帮助打工人更好的应对公司,收获奖金!软件不需要root就可以安装使用,并且体积也比较小,…

仿RabbitMq实现简易消息队列基础篇(future操作实现异步线程池)

TOC 介绍 std::future 是C11标准库中的一个模板类,他表示一个异步操作的结果,当我们在多线程编程中使用异步任务时,std::future可以帮助我们在需要的时候,获取任务的执行结果,std::future 的一个重要特性是能…

【Java学习】Stream流详解

所属专栏:Java学习 Stream流是JDK 8引入的一个概念,它提供了一种高效且表达力强的方式来处理数据集合(如List、Set等)或数组。Stream API可以以声明性方式(指定做什么)来处理数据序列。流操作可以被分为两大…

GD32 ADC配置跳坑

GD32 ADC配置跳坑 :时钟使能配置需在ADC前面 放在后面读取ADC值失败。 DMA配置放在ADC配置后面可以正常读取ADC的值 不同的模式选择可能会导致ADC存在读取失败的问题,红色部分是常用的模式,一般可以读取到相应的ADC的值 adc_software_trigge…

优雅谈大模型:Python编程篇

Python在机器学习领域的地位十分关键,虽然后面有Julia,Mojo等其他对手的挑战,然而Python拥有庞大的机器学习库和框架,尤其是生态系统比以往任何时候又强大了不少。从另外维度它和Java,Scala,Go,…

游戏安全入门-扫雷分析远程线程注入

前言 无论学习什么,首先,我们应该有个目标,那么入门windows游戏安全,脑海中浮现出来的一个游戏 – 扫雷,一款家喻户晓的游戏,虽然已经被大家分析的不能再透了,但是我觉得自己去分析一下还是极好…

适配器模式, 修饰器模式 与 代理模式

这三种模式, 感觉非常类似, 都是把核心类包一层, 在外部做一些额外的事情, 我还没发现他们之间具体的区别, 有想法的同学, 可以评论或者私聊我 适配器模式 简介: 就是在目标类外面包一层, 用以适配其他的模块,兼容整个程序框架 举个例子: 比如运动员, 中国运动员参加法国奥运…

市域社会治理平台规划建设方案

1. 建设背景与市域治理定义 市域社会治理作为国家治理体系的重要组成部分,具有承上启下的枢纽作用。2019年,全国市域社会治理现代化工作会议提出了推进市域社会治理现代化的总体思路,强调以城带乡、以点带面,明确了市域治理的方向…

[项目]文海泛舟测试报告

目录 一、项目背景 二、项目功能 三、功能测试 1. 测试用例: 2. 实际测试的部分(含截图) 1. 正常登录 2. 文章列表页显示/登录用户信息显示 3. 文章详情页内容显示/文章作者信息显示 4. 编辑功能 1. 点击“更新博客”按钮前 2. 点击…

前端开发攻略---Vue实现图像裁剪功能,支持用户通过图形界面进行裁剪区域的调整,最终生成裁剪后的图像。

目录 1、演示 2、实现原理 3、实现功能 4、代码 1、演示 2、实现原理 这里有详细介绍: 前端开发攻略---图片裁剪上传的原理-CSDN博客 3、实现功能 上传图像: 用户选择文件后,changeFile 方法读取文件内容并将其转换为 Data URL&#xff0c…

Amesim中动力电池建模方法与原则简介

引言 新能源动力电池一维仿真与三维仿真的主要区别在与,一维仿真中无法在仿真中精准的得到各个点的温度变化,其仅为质量块的平均温度。而在新能源动力电池一维仿真中,旨在对动力电池的策略、充放电时间等进行验证。而无论是策略还是充放电时…

jmreport测试数据库出现 权限不足,此功能需要分配角色 解决方法

目录 前言1. 问题所示2. 原理分析3. 解决方法前言 关于jmreport的补充可看官网:jmreport上线安全配置 1. 问题所示 jmreport测试数据库出现,出现如下所示的问题:权限不足,此功能需要分配角色! 截图如下所示: 2. 原理分析 对于原理分析的Bug,代表当前用户没有足够的…

HDFS的编程

一、HDFS原理 HDFS(Hadoop Distributed File System)是hadoop生态系统的一个重要组成部分,是hadoop中的的存储组件,在整个Hadoop中的地位非同一般,是最基础的一部分,因为它涉及到数据存储,MapReduce等计算模型都要依赖于存储在HDFS中的数据。HDFS是一个分布式文件系统,…

20款必试AI工具:轻松搞定设计到协作

随着人工智能技术的发展,各种AI工具如雨后春笋般涌现,给我们的工作和生活带来了极大便利。 在AI工具的海洋中,哪一款才是你的真命天子? 众所周知,AI工具如雨后春笋般涌现,让人目不暇接。面对琳琅满目的选…

Oracle 字符串转多行(REGEXP_SUBSTR)

方案一: SQL 1.一个数据表(TABLE1_ZK)中存在一个字段(STRS)(存储格式是以【,】隔开的字符串) 2.现需要将其查分为多行数据(每行为其中一个字符串) 3.sql SELECT t.id,REGEXP_SUBSTR(t.STRS, [^,], 1, LEVEL) AS ma…

招聘|头部云厂商招 PG 核心骨干 DBA【上海】

我们的招聘专区又回来了!🏃 Bytebase 作为先进的数据库 DevOps 团队协同工具 🔧,用户群里汇聚了 💗 业界优秀的 DBA,SRE,运维的同学们 🌟。 上周用户群里有小伙伴发招聘信息 &…

【观察者模式】设计模式系列: 实现与最佳实践案例分析

文章目录 观察者模式深入解析:在Java中的实现与应用1. 引言1.1 观察者模式简介1.2 模式的重要性及其在现实世界的应用示例1.3 本文的目标和读者定位 2. 观察者模式的基本概念2.1 定义与原理2.2 UML类图和时序图2.3 核心原则2.4 使用场景 3. 观察者模式与其他模式的关…

【数据结构】Java实现链表

目录 链表的概念 链表的实现 链表的功能 框架搭建 功能实现 打印链表 获取数据数量 查询数据 插入数据 头插法 尾插法 指定位置插入 删除数据 删除一个数据 删除多个相同数据 删除链表 完整代码 链表的概念 链表是一种物理存储结构上非连续存储结构&#xff0…