[Performance | Optimization] Analyzing a TB-Scale Flink Job Error: Could not compute the container Resource

2024/11/24 8:31:47

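Table of Contents

    • I. The Problem
      • 1. Scenario
      • 2. Brief Log Analysis
    • II. Analyzing and Fixing the Initial Problem
      • 1. Analysis
        • 1.1. YARN Scheduler Settings
        • 1.2. Program Settings
      • 2. Fix
    • III. A New (Performance) Problem
      • 1. Description
      • 2. The Idealized Optimal Plan
      • 3. The "Plan B" Solution
    • IV. Reflection and Iteration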

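I. The Problem

1. Scenario

We use the Flink engine to run a job that moves data from HDFS to Hive. The HDFS input consists of 4,000 files; we set the parallelism to 20 and submit the job.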

2. Brief Log Analysis

The job failed after submission. Let's briefly walk through the YARN logs:

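1. The requested resources exceed YARN's maximum container resource limit; in other words, a single TaskExecutor needs more resources than one container may have.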

Could not compute the container Resource from the given TaskExecutorProcessSpec TaskExecutorProcessSpec {cpuCores=10.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemorySize=4.300gb (4617089912 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=1024.000mb (1073741824 bytes)}. This usually indicates the requested resource is larger than Yarn's max container resource limit.




2. Flink then requests a new worker (on YARN, a worker is a TaskManager running in its own container), and the pending count becomes 1.

Requesting new worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 1.



3. But this container again exceeds YARN's limit, so the allocation is abandoned outright.

Requested container resource (<memory:12288, vCores:10>) exceeds the max limitation of the Yarn cluster (<memory:12288, vCores:4>). Will not allocate resource.



4. This repeats over and over, apparently stuck in an endless loop.

Eventually the job deployment times out and fails.

The logs above make the cause clear: the requested resources exceed YARN's maximum container resource limit.
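A quick cross-check of those numbers, as a shell sketch that does not talk to Flink or YARN: the byte values are copied from the TaskExecutorProcessSpec in the log and the limits from the "exceeds the max limitation" line. The components add up to exactly 12288 MB, so memory sits right at YARN's limit and it is the 10 vcores that cannot be satisfied.

```shell
#!/bin/sh
# Max container size YARN reported in the log: <memory:12288, vCores:4>
yarn_max_mb=12288
yarn_max_vcores=4
requested_vcores=10   # cpuCores=10.0 in the TaskExecutorProcessSpec

# Sum the TaskExecutorProcessSpec components (bytes) and convert to MB.
total=0
while read -r name bytes; do
  total=$(( total + bytes ))
done <<EOF
frameworkHeapSize    134217728
frameworkOffHeapSize 134217728
taskHeapSize         5583457416
taskOffHeapSize      0
networkMemSize       1073741824
managedMemorySize    4617089912
jvmMetaspaceSize     268435456
jvmOverheadSize      1073741824
EOF
requested_mb=$(( total / 1024 / 1024 ))

# check NAME REQUESTED LIMIT -> prints "NAME REQUESTED/LIMIT ok|exceeds"
check() {
  if [ "$2" -le "$3" ]; then echo "$1 $2/$3 ok"; else echo "$1 $2/$3 exceeds"; fi
}

check memory_mb "$requested_mb" "$yarn_max_mb"        # memory_mb 12288/12288 ok
check vcores "$requested_vcores" "$yarn_max_vcores"   # vcores 10/4 exceeds
```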
 
 
Here is more of the stack trace, for future reference:

2022-11-18 10:12:12,938 WARN  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Failed requesting worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 0
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not compute the container Resource from the given TaskExecutorProcessSpec TaskExecutorProcessSpec {cpuCores=10.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemorySize=4.300gb (4617089912 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=1024.000mb (1073741824 bytes)}. This usually indicates the requested resource is larger than Yarn's max container resource limit.
	...
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.7.jar:1.12.7]
2022-11-18 10:12:12,939 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 1.
2022-11-18 10:12:12,940 WARN  org.apache.flink.yarn.TaskExecutorProcessSpecContainerResourcePriorityAdapter [] - Requested container resource (<memory:12288, vCores:10>) exceeds the max limitation of the Yarn cluster (<memory:12288, vCores:4>). Will not allocate resource.
2022-11-18 10:12:12,940 WARN  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Failed requesting worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 0
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not compute the container Resource from the given TaskExecutorProcessSpec TaskExecutorProcessSpec {cpuCores=10.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemorySize=4.300gb (4617089912 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=1024.000mb (1073741824 bytes)}. This usually indicates the requested resource is larger than Yarn's max container resource limit.
	...
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.7.jar:1.12.7]
2022-11-18 10:12:12,941 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 1.
2022-11-18 10:12:12,941 WARN  org.apache.flink.yarn.TaskExecutorProcessSpecContainerResourcePriorityAdapter [] - Requested container resource (<memory:12288, vCores:10>) exceeds the max limitation of the Yarn cluster (<memory:12288, vCores:4>). Will not allocate resource.
2022-11-18 10:12:12,941 WARN  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Failed requesting worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 0
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not compute the container Resource from the given TaskExecutorProcessSpec TaskExecutorProcessSpec {cpuCores=10.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemorySize=4.300gb (4617089912 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=1024.000mb (1073741824 bytes)}. This usually indicates the requested resource is larger than Yarn's max container resource limit.
	at org.apache.flink.yarn.YarnResourceManagerDriver.requestResource(YarnResourceManagerDriver.java:254) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:249) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]
	at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.startNewWorker(ActiveResourceManager.java:160) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.ResourceManager$ResourceActionsImpl.allocateResource(ResourceManager.java:1382) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.allocateResource(SlotManagerImpl.java:1058) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot(SlotManagerImpl.java:954) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$internalRequestSlot$9(SlotManagerImpl.java:943) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:51) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:941) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:410) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.resourcemanager.ResourceManager.requestSlot(ResourceManager.java:529) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_152]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_152]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_152]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_152]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.7.jar:1.12.7]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.7.jar:1.12.7]

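II. Analyzing and Fixing the Initial Problem

1. Analysis

The error is unambiguous: we asked YARN for more resources than it allows. So we look at two things: the YARN scheduler settings (which define the queues and the limit on what each request may ask for) and how the program derives its resource settings from the parallelism.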

1.1. YARN Scheduler Settings

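Opening the YARN UI, we see that Maximum Allocation is <memory:12288, vCores:4>. In other words, a single container can request at most 12 GB (12288 MB) of memory and at most 4 vcores.
[Figure: YARN scheduler page showing the Maximum Allocation]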

1.2. Program Settings

Here is pseudocode from the shell script that launches the job:

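# tmp_value is 1 when parallelism > 10, otherwise 0
tmp_value=$(echo "$parallelism" 10 | awk '{ if ($1 > $2) print 1; else print 0 }')
if [ "$tmp_value" -eq 1 ]; then
  tm=12288              # TaskManager memory in MB (the 12288 MB container in the log)
  vcores=10             # vcores requested per YARN container
  numberOfTaskSlots=10  # slots per TaskManager
fi
...

# these values are then written into the job's Flink configuration
"yarn.containers.vcores":${vcores}
"taskmanager.numberOfTaskSlots":${numberOfTaskSlots}
...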

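This exposes the conflict: when the parallelism exceeds 10, the script sets vcores to 10, but YARN allows at most 4 vcores per container, so the request fails.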

2. Fix

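The fix is simple: cap vcores at 4 and run the job again.

We keep the job's overall parallelism at 20 and request vcores=4 per container. Following the rule of one thread (slot) per parallel subtask, each TaskManager (container) has 4 cores and therefore 4 slots, giving 20 / 4 = 5 TaskManagers.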
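The sizing rule just described can be sketched as a small shell function. This is an illustration rather than the author's script: `YARN_MAX_VCORES` and `size_job` are hypothetical names, the desired 10 vcores is the value the script picks for parallelism above 10, and one slot per vcore follows the 4-core, 4-slot TaskManagers described here.

```shell
#!/bin/sh
# Hypothetical variable holding the scheduler's per-container vcore limit (4 on this cluster).
YARN_MAX_VCORES=${YARN_MAX_VCORES:-4}

# size_job PARALLELISM DESIRED_VCORES -> "vcores=N slots=N taskmanagers=N"
size_job() {
  local parallelism=$1
  local vcores=$2
  if [ "$vcores" -gt "$YARN_MAX_VCORES" ]; then
    vcores=$YARN_MAX_VCORES                        # clamp to YARN's Maximum Allocation
  fi
  local slots=$vcores                              # one slot per vcore, as in this article
  local tms=$(( (parallelism + slots - 1) / slots ))   # ceil(parallelism / slots)
  echo "vcores=$vcores slots=$slots taskmanagers=$tms"
}

size_job 20 10   # vcores=4 slots=4 taskmanagers=5
```

With the same rule, `size_job 500 10` gives 125 TaskManagers and `size_job 4000 10` gives 1,000, the two figures that come up in the next section.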

 
 

III. A New (Performance) Problem

1. Description

The job now runs, but it has a performance problem.

First, here is how the job ran:

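Consumption rate:

  • At parallelism 20, with 12 GB of memory per TaskManager, the job consumes about 112,800 records per second, i.e. about 6,768,900 records per minute, or about 406 million records per hour

  • 2032.766839793899649 GB per hour on average

  • 5793.385493412613869 GB in total over 2.85 hours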

Final result:

  • 5.314 billion records
  • Data volume: 3.03 TB
  • Average size per consumed record: 626 bytes
  • 1 GB = 1,073,741,824 bytes
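The figures above are linked by simple unit conversions. A small awk-based sketch reproduces them up to rounding of the 112,800 records/s figure; `rates` and `avg_record_bytes` are hypothetical helpers, the inputs are the numbers from the text, and 1 TB is taken as 1024^4 bytes, consistent with the 1 GB = 1,073,741,824 bytes note.

```shell
#!/bin/sh
# rates RECORDS_PER_SEC -> "per_min=N per_hour=N"
rates() {
  awk -v r="$1" 'BEGIN { printf "per_min=%d per_hour=%d\n", r * 60, r * 3600 }'
}

# avg_record_bytes TB RECORDS -> average bytes per record (truncated),
# treating 1 TB as 1024^4 bytes
avg_record_bytes() {
  awk -v tb="$1" -v n="$2" 'BEGIN { printf "%d\n", tb * 1024 ^ 4 / n }'
}

rates 112800                       # per_min=6768000 per_hour=406080000
avg_record_bytes 3.03 5314000000   # 626
```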

The whole job took 11 hours to finish:
[Figure: job run summary]
The customer requires the job to finish within 1 hour, so this is far too slow.

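2. The Idealized Optimal Plan

The customer's cluster has enough resources, so we set resource concerns aside. Since there are 4,000 HDFS files in total and YARN allows at most 4 cores per container, a job deployed with a parallelism of 4,000 should run fastest!

So with the parallelism set to 4,000, 1,000 TaskManagers will start. Wait: 1,000?! Spare a thought for the poor JobManager.

Sure enough, the job failed before scheduling had even finished.

So the ideal was too simple; there is no silver bullet.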

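3. The "Plan B" Solution

Since one JobManager cannot manage that many TaskManagers, reduce the number of TaskManagers. After a number of trials, we settled on the following Plan B settings:

Parallelism set to 500, which started 125 TaskManagers.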
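Plan B effectively works backwards from a TaskManager budget. A hedged sketch: `plan` is a hypothetical helper, the budget of 125 TaskManagers is the value this article found by trial (not a Flink limit), and capping the parallelism at the file count follows the article's own reasoning that the 4,000 input files bound the useful parallelism.

```shell
#!/bin/sh
# plan FILES SLOTS_PER_TM MAX_TMS -> "parallelism=N taskmanagers=N"
plan() {
  local files=$1 slots=$2 max_tms=$3
  local p=$(( slots * max_tms ))      # most subtasks the TaskManager budget allows
  if [ "$p" -gt "$files" ]; then
    p=$files                          # article's assumption: no gain beyond one subtask per file
  fi
  local tms=$(( (p + slots - 1) / slots ))
  echo "parallelism=$p taskmanagers=$tms"
}

plan 4000 4 125    # parallelism=500 taskmanagers=125 (Plan B)
plan 4000 4 2000   # budget not binding: parallelism=4000 taskmanagers=1000
```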

The job's processing time dropped to 23 minutes:

*********************************************
nErrors              |  0
nullErrors           |  0
duplicateErrors      |  0
conversionErrors     |  0
otherErrors          |  0
numWrite             |  3164002700
byteWrite            |  17123302298224
numRead              |  3164002700
writeDuration        |  545313434
byteRead             |  1983716362408
readDuration         |  539805057
snapshotWrite        |  6328005400
*********************************************

2022-11-21 11:48:56 Start to stop flink job(3734d90205a13eff7cfa61b5c0c9686b)
2022-11-21 11:50:11 Success to stop flink job(3734d90205a13eff7cfa61b5c0c9686b)
2022-11-21 11:50:11 Flink process exit code is :0

 
 

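IV. Reflection and Iteration

Looking back at the process above, several points are worth thinking about and iterating on:

  1. Have our program communicate with YARN periodically to pick up the scheduler settings promptly, then set the maximum core count dynamically. This makes full use of the resources while keeping the program stable.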
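  2. When setting the parallelism, we need to account for how many TaskManagers the JobManager can coordinate, instead of relying on trial and error.

    The lesson from this exercise is that the JobManager can withstand 125 TaskManagers. A next step is to study Flink's communication-related source code so we can make much fuller use of the cluster's resources.

    Also, since the JobManager manages so many nodes, we could consider enabling Flink high availability to make the job run more reliably.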
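For the first point, fully dynamic tracking of the scheduler would mean querying YARN; a simpler static approximation is to read the limit from the client's yarn-site.xml before submitting. The sketch below rests on several assumptions: `get_yarn_prop` and `max_vcores` are hypothetical helpers, the `/etc/hadoop/conf` fallback path is a guess, the parser expects the usual one-tag-per-line layout, the client-side file may differ from what the ResourceManager actually uses, and per-queue maximum allocations in the scheduler configuration are not covered. The fallback of 4 is YARN's shipped default for `yarn.scheduler.maximum-allocation-vcores`.

```shell
#!/bin/sh
# get_yarn_prop FILE KEY -> prints the <value> of property KEY, or nothing.
# Expects <value> on the same line as <name> or on a following line.
get_yarn_prop() {
  awk -v k="$2" '
    index($0, "<name>" k "</name>") { found = 1 }
    found && match($0, /<value>[^<]*<\/value>/) {
      print substr($0, RSTART + 7, RLENGTH - 15)
      exit
    }' "$1"
}

# max_vcores [FILE] -> per-container vcore limit, or 4 (the YARN default)
# when the file is missing or does not set it.
max_vcores() {
  local file="${1:-${HADOOP_CONF_DIR:-/etc/hadoop/conf}/yarn-site.xml}"
  local v=""
  if [ -r "$file" ]; then
    v=$(get_yarn_prop "$file" yarn.scheduler.maximum-allocation-vcores)
  fi
  echo "${v:-4}"
}

vcores=$(max_vcores)   # the launch script would then cap its vcores at this value
```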

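There is one more small detail:
[Figure: TaskManager memory usage]

  3. TaskManager memory is somewhat wasted: as the figure above shows, the memory resources are not fully used. We also need to think about how to optimize this.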

Looking more closely, we can also see that:

  4. When the container's core count is capped at 4, the numberOfTaskSlots setting effectively has no effect.
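Regarding the under-used TaskManager memory mentioned above, one place to look is Flink's managed memory. The TaskExecutorProcessSpec in the log is consistent (to the 0.1 MB shown) with Flink 1.12's documented default memory rules applied to a 12288 MB process size; the awk sketch below re-derives it. `derive_tm_memory` is a hypothetical helper, and the default fractions and bounds are written into it as assumptions rather than read from the cluster. By default 40% of total Flink memory (about 4.3 GB here) becomes managed memory, which Flink uses for batch operators, the RocksDB state backend and Python UDFs; if the job uses none of these, that share may sit idle. Whether this explains the figure was not measured in this article.

```shell
#!/bin/sh
# derive_tm_memory PROCESS_MB [MANAGED_FRACTION]
# Assumed defaults: metaspace 256 MB; JVM overhead = 10% of process memory within
# [192 MB, 1 GB]; framework heap and off-heap 128 MB each; task off-heap 0;
# network = 10% of total Flink memory within [64 MB, 1 GB];
# managed = MANAGED_FRACTION (default 0.4) of total Flink memory.
derive_tm_memory() {
  awk -v process="$1" -v managed_fraction="${2:-0.4}" '
    function clamp(x, lo, hi) { return x < lo ? lo : (x > hi ? hi : x) }
    BEGIN {
      overhead  = clamp(0.1 * process, 192, 1024)
      flink     = process - 256 - overhead              # total Flink memory
      network   = clamp(0.1 * flink, 64, 1024)
      managed   = managed_fraction * flink
      task_heap = flink - 128 - 128 - network - managed
      printf "overhead=%.1f network=%.1f managed=%.1f task_heap=%.1f\n",
             overhead, network, managed, task_heap
    }'
}

derive_tm_memory 12288       # overhead=1024.0 network=1024.0 managed=4403.2 task_heap=5324.8
derive_tm_memory 12288 0.1   # overhead=1024.0 network=1024.0 managed=1100.8 task_heap=8627.2
```

The second call shows how a smaller `taskmanager.memory.managed.fraction` would shift that memory to the task heap; whether that helps depends on what the job actually uses.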

This article was submitted by an internet user; the views expressed are the author's own. Source: http://www.coloradmin.cn/o/45956.html
