1. 任务启动报错Trying to access closed classloader.
Exception in thread "Thread-5" java.lang.IllegalStateException:
Trying to access closed classloader. Please check if you store
classloaders directly or indirectly in static fields.
If the stacktrace suggests that the leak occurs in a third party
library and cannot be fixed immediately, you can disable this check
with the configuration 'classloader.check-leaked-classloader'.
此错误虽不影响任务的正常启动,但可以通过在flink-conf.yaml
文件中添加classloader.check-leaked-classloader: false
选项后,后续提交任务不会再提示。
2. 资源(memory和vcores)充足,但提交任务阻塞后报错
此错误表象为yarn集群资源充足,在提交任务时也提示“ACCEPT“,但会阻塞一段时间后报错。
问题:可能是AM资源超出了限制
在Yarn集群界面上"Scheduler"里可以看到,Max Application Master Resources
和Used Application Master Resources
两个指标中,已使用的内存已经超过最大限制。
解决方法:在capacity-scheduler.xml
中修改yarn.scheduler.capacity.maximun-am-resource-percent
选项,默认是0.2,可以调大一点。
为什么不看vcores,只看内存限制呢。
3. Yarn集群有多个节点,但任务只集中分布在其中几个节点
通过指令yarn node -all -list
拉取集群当前节点状态,发现有一些节点containers的数量很大,但有一些节点依然是0。虽然那些节点状态都是RUNNING(健康)状态,但就是不接收任务。
通过排查发现,这些节点不是不接收任务,而是在执行任务时报错,导致yarn集群会重新把任务分配给其他正常节点,最终形成只有部分节点有任务的现象。这些不正常节点在接收任务时报错如下(可以在yarn界面查看,点开具体的applicationID,中间有个Diagnostics):
去到正常和异常的节点下比较,确实异常节点缺失这个文件夹(用于存储运行时nodemanager和taskmanager的日志),怀疑是部署中间件框架时遗漏,通过手动增加文件夹的方式,问题解决。
4. Flink任务失败后,log找不到
flink任务执行过程中,日志可以通过flink的ui界面上可以查看(从yarn的application master跳转)。但任务一旦因为未知错误死亡时,在flink history service里并不能查到所有的日志,有时上面写的exception并不是root cause。
这时可以配置yarn集群的日志聚集(log aggregation)功能,在yarn-site.xml
文件里配置yran.log-aggregation-enable=true
功能打开后flink任务结束(不管是否正常结束)时,任务执行的日志会被统一收集。可以在yarn界面查看,点开具体的applicationID,在最下面有个logs
可以进行查看。但是这里只能查看到jobManager
相关的日志,taskManager
需要自己手动拼接地址。例如jobManager
日志地址为:
http://qyfwzx-dev-esbmon4:19888/jobhistory/logs/qyfwzx-dev-esbmon4:45454/container_e18_1634207619484_0505_01_000001/container_e18_1634207619484_0505_01_000001/flink
其中
-
http://qyfwzx-dev-esbmon4:19888/jobhistory/logs
为jobhistoryserver
的地址,不用变。 -
qyfwzx-dev-esbmon4:45454
是jobManager
当时运行时的宿主yarn node地址 -
container_e18_1634207619484_0505_01_000001
则是yarn当时运行时容器id -
/flink
则是提交任务时使用的用户名
所以我们只需要知道taskmanager
的容器id和节点地址就能找到它的日志。这里在jobManager.log
里搜索关键字“Registering TaskManager
”,可以找到当时任务执行时taskManger
的信息。containerId
后面跟的就是当时运行container
的节点地址。
按照连接的拼装方式,可以得到taskManger
日志的地址:
http://qyfwzx-dev-esbmon4:19888/jobhistory/logs/qyfwzx-dev-esbmon1:45454/container_e18_1719387982584_0082_01_000002/container_e18_1719387982584_0082_01_000002/flink
5. Flink任务执行过程中,checkpoint太大导致失败
问题:Flink任务执行一段时间,会自动Restart,重启几次后任务失败。查看log里有以下错误提示。
Size of the state is larger than the maximum permitted memory-backed
state. Size=5244975 , maxSize=5242880 . Consider using a different
state backend, like the File System State backend.
这个是checkpoint写入文件过大导致的,可以通过设置flink-conf.yaml
文件中的选项:
-
state.backend.incremental:true
,此选项可以通过比对只保留增量变化的checkpoint内容,开启后,checkpoint的size大大缩小。 -
state.backend: filesystem
,此选项是将checkpoint写入文件系统,值默认是HashMapStateBackend
,即以java对象的形式放入内存 -
state.checkpoints.dir
,还可以指定外部hdfs地址作为存储
6. Flink任务并行度该如何设置
并行度的设置需要通过压测来决定,测试时主要观察单并行度的处理上限。即先从源头(比如kafka)积压数据,之后开启 Flink 任务,出现反压(就是处理瓶颈)时,从flink ui上查看单个任务的输出量:numRecordsOutPerSecond。然后通过 总QPS / 单并行度的处理能力 = 并行度,最终设置为并行度*1.2 倍,富余一些资源。
以下是一些常用准则:
source端
-
数据源端是kafka,source的并行度设置为kafka对应topic的分区数。
-
如果已经等于kafka的分区数,消费速度仍更不上数据生产速度,考虑下kafka要扩大分区,同时调大并行度等于分区数。
-
flink的一个并行度可以处理一至多个分区的数据,如果并行度多于kafka的分区数,那么就会造成有的并行度空闲,浪费资源
Process端
-
keyby之前的算子,比如map、fliter、flatmap等处理较快的算子,并行度和source保持一致即可。
-
keyby之后的算子,视具体情况而定,可以通过测试反压的方法,得到keyby算子上游的数据发送量和该算子的处理能力来得到合理的并行度(在无倾斜情况下)
sink端
-
sink端是数据流向下游的地方,可以根据sink端的数据量及下游的服务抗压能力进行评估。
-
如果sink端是kafka,可以设为kafka对应topic的分区数。
-
sink端的数据量若比较小,比如一些高度聚合或者过滤比较大的数据(比如监控告警),可以将并行度设置的小一些。
-
如果source端的数据量最小,拿到source端流过来的数据后做了细粒度的拆分,数据量不断的增加,到sink端的数据量非常大的这种情况,就需要提高并行度。
7. Flink任务报错超出内存
任务在执行一段时间后报错
java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.FutrueTask.report(FutureTask.java:122)
这个问题其实要先了解flink的内存模型,参考文章 Flink TaskManager内存管理机制
其实就是Task Heap
设置的比较小,然后用户自己写的flink任务逻辑比较复杂或是数据量比较大,存储的数据较多超出了内存。
按照上述说明,再根据当前的flink的配置,发现托管内存默认是占用了40%的内存。但在我的任务里这块内存基本上没有使用的,可以调低。通过设置比例值taskmanager.memory.managed.fraction=0.1
,然后flink会自动调整Task Heap
的大小。
除此之外,还可以给taskManager
增加JVM
启动参数,在flink-conf
文件下增加:
env.java.opts.taskmanager: -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/flink_taskmanager_oom_%p_%t.hprof
当任务进程发生oom时,会自动生成堆转储(heap dump)文件,后续可以通过jdk
自带的jvisualvm
工具解析查看堆中各类数据占比,辅助分析问题。