环境参数:flink使用的版本是1.13.5、CentOS Linux 8
一,默认环境引起本地与集群的jar包冲突
遇到的情况是在idea执行的时候是没有问题的,然后打成jar包用集群执行的时候就会遇到问题。报错的时候会不太一,总之顺着错误去找的话会找到是代码中的哪个jar包产生了冲突发生的问题。比如有遇到如下错的
处理方法: 通过在启动脚本中增加如下命令:
-yD yarn.per-job-cluster.include-user-jar="FIRST"
将程序环境使用包的时候优先使用本项目包自己的,如果不设置的话是默认使用集群自带的,这也是为什么在本地能跑通但是上的集群就报jar包依赖冲突的原因。
该命令的功能如下所示:
二,提交脚步不同参数应对的需求
首先并行度是通过文件$config_path作为参数传递进去,然后在实际的代码中去设置的,用法如:
SingleOutputStreamOperator<String> FilterStream = SendStream
.process(new SubGroupMemberProcessFunction())
.setParallelism(MemberParallel)
;
在代码执行层面是通过process来执行类以及setParallelism来执行该类时的并行度。
1,最初的提交脚本是这么设置的
run_flink(){
/data0/flink/flink-1.13.5/bin/flink run -m yarn-cluster \
-ynm Flink_Rt_global_pushClick \
-ys 2 \
-yd \
-ytm 3072 \
-yjm 2048 \
-yD taskmanager.memory.jvm-metaspace.size=128mb \
-yD taskmanager.memory.managed.size=0mb \
-yD taskmanager.memory.jvm-overhead.min=512mb \
-yD yarn.containers.vcores=1 \
-c com.push.FlinkGlobalPushClickMain \
flink-1.0-SNAPSHOT.jar \
--config_path $config_path
}
后来发现集群中资源吃紧,因此就将ys的数值增加以此来减少cpu的使用:
run_flink(){
/data0/flink/flink-1.13.5/bin/flink run -m yarn-cluster \
-ynm Flink_Rt_newimpression \
-ys 4 \
-yd \
-ytm 3072 \
-yjm 2048 \
-yD taskmanager.memory.jvm-metaspace.size=128mb \
-yD taskmanager.memory.managed.size=0mb \
-yD taskmanager.memory.jvm-overhead.min=512mb \
-yD yarn.containers.vcores=1 \
-c com.push.FlinkGlobalPushClickMain \
flink-1.0-SNAPSHOT.jar \
--config_path $config_path
}
通过这种设置只有cpu的消耗从266核下降到了134核了,内存是有增加的。
(其实是牺牲了内存来换cpu的)
再接着是有的程序数据量比较大需要的资源更多因此并行度设置的就很大,这导致默认的taskmanager.memory.network是不够用的,报错如下:
java.io.IOException: Insufficient number of network buffers: required 2, but only 0 available. The total number of network buffers is currently set to 7782 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
因此增加了taskmanager.memory.network.min和taskmanager.memory.network.max以此来保证程序在执行过程中即使并行度很大也能有足够的内存用于网络交互:
run_flink(){
/data0/flink/flink-1.13.5/bin/flink run -m yarn-cluster \
-ynm Flink_Rt_SPAndFR \
-ys 4 \
-yd \
-ytm 3072 \
-yjm 2048 \
-yD taskmanager.memory.jvm-metaspace.size=128mb \
-yD taskmanager.memory.managed.size=0mb \
-yD taskmanager.memory.jvm-overhead.min=512mb \
-yD yarn.containers.vcores=1 \
-yD taskmanager.memory.network.min=650mb \
-yD taskmanager.memory.network.max=3500mb \
-c com.push.FlinkGlobalPushClickMain \
flink-1.0-SNAPSHOT.jar \
--config_path $config_path
}
这样程序就你能正确执行了。