Flink SQL Join Optimization -- MiniBatch + Local-Global

2024/11/18 21:25:00

Background

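  • Problem 1. While developing a Flink SQL job recently, the job kept retrying its tasks after startup. After running for a while, the container heartbeat timed out, the TaskManager ran out of memory, and the job could no longer work normally.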
2023-10-07 14:53:30,408 | INFO  | [flink-akka.actor.default-dispatcher-29] | Stopping worker container_e03_1678102291469_2749_01_000002(node-group-1jPmk0002.mrs-qrmc.com:8041). | org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.internalStopWorker(ActiveResourceManager.java:461)
2023-10-07 14:53:30,408 | INFO  | [flink-akka.actor.default-dispatcher-29] | Stopping container container_e03_1678102291469_2749_01_000002(node-group-1jPmk0002.mrs-qrmc.com:8041). | org.apache.flink.yarn.YarnResourceManagerDriver.releaseResource(YarnResourceManagerDriver.java:298)
2023-10-07 14:53:30,409 | INFO  | [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #1] | Processing Event EventType: STOP_CONTAINER for Container container_e03_1678102291469_2749_01_000002 | org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl$ContainerEventProcessor.run(NMClientAsyncImpl.java:955)
2023-10-07 14:53:30,824 | WARN  | [flink-akka.actor.default-dispatcher-29] | Remote connection to [/10.155.0.9:42366] failed with java.io.IOException: Connection reset by peer | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 14:53:30,825 | WARN  | [flink-akka.actor.default-dispatcher-29] | Association with remote system [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331] has failed, address is now gated for [50] ms. Reason: [Disassociated]  | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 14:53:30,825 | WARN  | [flink-metrics-6] | Association with remote system [akka.tcp://flink-metrics@node-group-1jpmk0002.mrs-qrmc.com:28852] has failed, address is now gated for [50] ms. Reason: [Disassociated]  | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 14:53:32,171 | INFO  | [flink-akka.actor.default-dispatcher-29] | Received 1 containers. | org.apache.flink.yarn.YarnResourceManagerDriver$YarnContainerEventHandler.lambda$onContainersAllocated$1(YarnResourceManagerDriver.java:620)
2023-10-07 14:53:32,172 | INFO  | [flink-akka.actor.default-dispatcher-29] | Received 1 containers with priority 1, 1 pending container requests. | org.apache.flink.yarn.YarnResourceManagerDriver.onContainersOfPriorityAllocated(YarnResourceManagerDriver.java:352)
2023-10-07 14:53:32,172 | INFO  | [flink-akka.actor.default-dispatcher-29] | Removing container request Capability[<memory:4096, vCores:3>]Priority[1]AllocationRequestId[0]ExecutionTypeRequest[{Execution Type: GUARANTEED, Enforce Execution Type: false}]Resource Profile[null]. | org.apache.flink.yarn.YarnResourceManagerDriver.removeContainerRequest(YarnResourceManagerDriver.java:405)
2023-10-07 14:53:32,172 | INFO  | [flink-akka.actor.default-dispatcher-29] | Accepted 1 requested containers, returned 0 excess containers, 0 pending container requests of resource <memory:4096, vCores:3>. | org.apache.flink.yarn.YarnResourceManagerDriver.onContainersOfPriorityAllocated(YarnResourceManagerDriver.java:392)
2023-10-07 14:53:32,172 | INFO  | [cluster-io-thread-4] | TaskExecutor container_e03_1678102291469_2749_01_000003(node-group-1jPmk0002.mrs-qrmc.com:8041) will be started on node-group-1jPmk0002.mrs-qrmc.com with TaskExecutorProcessSpec {cpuCores=3.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb (359703515 bytes), managedMemorySize=1.340gb (1438814063 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=409.600mb (429496736 bytes), numSlots=6}. | org.apache.flink.yarn.YarnResourceManagerDriver.createTaskExecutorLaunchContext(YarnResourceManagerDriver.java:472)
2023-10-07 14:53:32,178 | INFO  | [cluster-io-thread-4] | Creating container launch context for TaskManagers | org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:500)
2023-10-07 14:53:32,179 | INFO  | [cluster-io-thread-4] | Starting TaskManagers | org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:520)
2023-10-07 14:53:32,182 | INFO  | [flink-akka.actor.default-dispatcher-29] | Requested worker container_e03_1678102291469_2749_01_000003(node-group-1jPmk0002.mrs-qrmc.com:8041) with resource spec WorkerResourceSpec {cpuCores=3.0, taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb (1438814063 bytes), numSlots=6}. | org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$1(ActiveResourceManager.java:419)
2023-10-07 14:53:32,182 | INFO  | [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #2] | Processing Event EventType: START_CONTAINER for Container container_e03_1678102291469_2749_01_000003 | org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl$ContainerEventProcessor.run(NMClientAsyncImpl.java:955)
2023-10-07 14:53:32,554 | INFO  | [flink-akka.actor.default-dispatcher-29] | Heartbeat of TaskManager with id container_e03_1678102291469_2749_01_000002(node-group-1jPmk0002.mrs-qrmc.com:8041) timed out. | org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1372)
2023-10-07 14:53:32,558 | INFO  | [flink-akka.actor.default-dispatcher-29] | ConstraintEnforcer[20] (1/6) (1a65cb345c504de7b9704270217856a7) switched from RUNNING to FAILED on container_e03_1678102291469_2749_01_000002 @ node-group-1jPmk0002.mrs-qrmc.com (dataPort=32396). | org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1424)
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e03_1678102291469_2749_01_000002(node-group-1jPmk0002.mrs-qrmc.com:8041) timed out.
        at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1373) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:155) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_332]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_332]
2023-10-07 14:53:32,567 | INFO  | [flink-akka.actor.default-dispatcher-29] | Call stack:
    at java.lang.Thread.getStackTrace(Thread.java:1564)
    at org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1432)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1124)
    at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:908)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.updateStateInternal(DefaultExecutionGraph.java:1318)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.updateState(DefaultExecutionGraph.java:1277)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:733)
    at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1543)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1119)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1059)
    at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:760)
    at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
    at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
    at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
    at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
    at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
    at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
    at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
    at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
    at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:482)
    at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:474)
    at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:445)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
    at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:505)
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1378)
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1373)

  • Problem 2. Jobs that did not hit the container heartbeat timeout ran slowly; after more than a day they were still under backpressure.
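As a rough check on the memory side of Problem 1, the TaskExecutorProcessSpec line in the log above can be added up. The sketch below is only an illustration: the byte values are copied from that log line, while the variable names and the even split across slots are my own assumptions. Flink shares the JVM heap among all slots of a TaskManager rather than partitioning it, so the per-slot figure is an average, not an enforced limit.

```python
# Sum the memory components reported in the TaskExecutorProcessSpec log line
# and estimate how much task heap each slot gets on average.
MIB = 1024 * 1024

# Byte values copied from the log line; the dict itself is illustrative.
spec_bytes = {
    "frameworkHeapSize": 134217728,
    "frameworkOffHeapSize": 134217728,
    "taskHeapSize": 1530082070,
    "taskOffHeapSize": 0,
    "networkMemSize": 359703515,
    "managedMemorySize": 1438814063,
    "jvmMetaspaceSize": 268435456,
    "jvmOverheadSize": 429496736,
}
NUM_SLOTS = 6  # numSlots=6 in the same log line

total_bytes = sum(spec_bytes.values())
total_mib = total_bytes / MIB

# Even-split estimate only: the heap is shared by all slots in one JVM.
task_heap_per_slot_mib = spec_bytes["taskHeapSize"] / NUM_SLOTS / MIB

if __name__ == "__main__":
    print(f"total process memory: {total_mib:.0f} MiB")
    print(f"task heap per slot (average): {task_heap_per_slot_mib:.1f} MiB")
```

The components add up to the 4096 MB container that YARN allocated (`<memory:4096, vCores:3>`), and only about 1.4 GB of it is task heap, shared by 6 slots. Any operator that keeps a large data set on the heap has little headroom in this layout.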

Problem analysis

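  • The logs show an out-of-memory error (GC overhead limit exceeded) in the LookupJoin task, raised while FileSystemLookupFunction.checkCacheReload was copying rows into its cache: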
2023-10-07 17:58:27,526 | INFO  | [Checkpoint Timer] | Triggering checkpoint 11 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1696672707523 for job f80d756ccb03ed8d6c19d114e0ba9e63. | org.apache.flink.runtime.checkpoint.CheckpointCoordinator.createPendingCheckpoint(CheckpointCoordinator.java:832)
2023-10-07 17:59:31,393 | INFO  | [flink-metrics-4] | No response from remote for outbound association. Handshake timed out after [60000 ms]. | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$3(Slf4jLogger.scala:96)
2023-10-07 17:59:31,396 | WARN  | [flink-metrics-4] | Association with remote system [akka.tcp://flink-metrics@node-group-1jpmk0002.mrs-qrmc.com:28852] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@node-group-1jpmk0002.mrs-qrmc.com:28852]] Caused by: [No response from remote for outbound association. Handshake timed out after [60000 ms].] | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
**2023-10-07 18:00:15,005 | INFO  | [flink-akka.actor.default-dispatcher-22] | LookupJoin[29] (3/3) (803c659fe1f2ef1c5886f73bab51a914) switched from RUNNING to FAILED on container_e03_1678102291469_2752_01_000006 @ node-group-1jPmk0002.mrs-qrmc.com (dataPort=32396). | org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1424)
java.lang.OutOfMemoryError: GC overhead limit exceeded**
	at org.apache.flink.core.memory.MemorySegmentFactory.wrap(MemorySegmentFactory.java:48) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.binary.BinaryStringData.fromBytes(BinaryStringData.java:95) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.StringData.fromBytes(StringData.java:59) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.columnar.ColumnarRowData.getString(ColumnarRowData.java:123) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$1(RowData.java:221) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.RowData$$Lambda$794/901417218.getFieldOrNull(Unknown Source) ~[?:?]
	at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.RowData$$Lambda$788/1820263644.getFieldOrNull(Unknown Source) ~[?:?]
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connectors.hive.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:138) ~[flink-connector-hive_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connectors.hive.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:107) ~[flink-connector-hive_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at LookupFunction$606.flatMap(Unknown Source) ~[?:?]
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.fetchElements(LookupJoinRunner.java:90) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.lambda$processElement$0(LookupJoinRunner.java:76) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner$$Lambda$1805/947327052.run(Unknown Source) ~[?:?]
	at org.apache.flink.table.runtime.operators.join.lookup.AbstractLookupJoinRunner.withRetry(AbstractLookupJoinRunner.java:104) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:71) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:31) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:40) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:524) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$743/1473507981.runDefaultAction(Unknown Source) ~[?:?]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:216) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:812) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.runtime.taskmanager.Task$$Lambda$1317/775704149.run(Unknown Source) ~[?:?]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2023-10-07 18:00:15,071 | INFO  | [flink-akka.actor.default-dispatcher-22] | Call stack:
    at java.lang.Thread.getStackTrace(Thread.java:1564)
  • Checkpoints fail:
2023-10-07 15:03:42,678 | WARN  | [flink-akka.actor.default-dispatcher-36] | **Failed to trigger or complete checkpoint 6 for job 0595b0727d9241894b541fd6e82af814. (0 consecutive failed attempts so far) |** org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:114)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1909) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1512) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1113) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1082) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.SchedulerBase.transitionExecutionGraphState(SchedulerBase.java:590) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.addVerticesToRestartPending(DefaultScheduler.java:369) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:345) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:328) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:303) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1543) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1119) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1059) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:760) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010) ~[?:1.8.0_332]
        at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:482) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:474) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:445) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:505) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1378) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1373) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:155) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_332]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_332]
2023-10-07 15:03:42,681 | WARN  | [flink-akka.actor.default-dispatcher-19] | Remote connection to [null] failed with java.net.ConnectException: Connection refused: node-group-1jPmk0002.mrs-qrmc.com/10.155.0.9:32331 | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 15:03:42,681 | WARN  | [flink-akka.actor.default-dispatcher-19] | Association with remote system [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331]] Caused by: [java.net.ConnectException: Connection refused: node-group-1jPmk0002.mrs-qrmc.com/10.155.0.9:32331] | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 15:03:42,681 | INFO  | [SourceCoordinator-Source: dws_hljy_logistics_ykt_rytb_m_rec_consume[11]] | Removing registered reader after failure for subtask 0 of source Source: dws_hljy_logistics_ykt_rytb_m_rec_consume[11]. | org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$subtaskFailed$3(SourceCoordinator.java:267)
2023-10-07 15:03:42,681 | INFO  | [flink-akka.actor.default-dispatcher-36] | Source: dws_hljy_logistics_ykt_rytb_m_rec_consume[11]
  • Akka timeout
2023-10-07 15:02:01,864 | INFO  | [flink-scheduler-1] | Triggering Checkpoint 5 for job 0595b0727d9241894b541fd6e82af814 failed due to java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.triggerCheckpoint(ExecutionAttemptID, long, long, CheckpointOptions))] at recipient [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout. | org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpointRequest$10(CheckpointCoordinator.java:681)
2023-10-07 15:02:01,865 | WARN  | [Checkpoint Timer] | Failed to trigger or complete checkpoint 5 for job 0595b0727d9241894b541fd6e82af814. (0 consecutive failed attempts so far) | org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:114)
org.apache.flink.runtime.checkpoint.CheckpointException: Trigger checkpoint failure.
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpointRequest$10(CheckpointCoordinator.java:691) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_332]
        at org.apache.flink.util.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:914) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_332]
        at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:252) ~[?:?]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_332]
        at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1387) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) ~[?:?]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[?:?]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) ~[?:?]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_332]
        at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45) ~[?:?]
        at akka.dispatch.OnComplete.internal(Future.scala:299) ~[?:?]
        at akka.dispatch.OnComplete.internal(Future.scala:297) ~[?:?]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) ~[?:?]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) ~[?:?]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) ~[?:?]
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:729) ~[?:?]
        at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:479) ~[?:?]
        at akka.dispatch.internal.SameThreadExecutionContext$$anon$1.unbatchedExecute(SameThreadExecutionContext.scala:21) ~[?:?]
        at akka.dispatch.BatchingExecutor.execute(BatchingExecutor.scala:133) ~[?:?]
        at akka.dispatch.BatchingExecutor.execute$(BatchingExecutor.scala:124) ~[?:?]
        at akka.dispatch.internal.SameThreadExecutionContext$$anon$1.execute(SameThreadExecutionContext.scala:20) ~[?:?]
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:365) ~[?:?]
        at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:314) ~[?:?]
        at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:318) ~[?:?]
        at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270) ~[?:?]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.triggerCheckpoint(ExecutionAttemptID, long, long, CheckpointOptions))] at recipient [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
        at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.triggerCheckpoint(RpcTaskManagerGateway.java:128) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.Execution.triggerCheckpointHelper(Execution.java:854) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.Execution.triggerCheckpoint(Execution.java:830) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerTasks(CheckpointCoordinator.java:745) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerCheckpointRequest(CheckpointCoordinator.java:678) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:645) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_332]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_332]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_332]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_332]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_332]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_332]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_332]

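Job tuning

First round of tuning

  • Given the three causes above, and since there was no data skew, we increased the memory, the checkpoint timeout and interval, and the Akka ask timeout.

-- Increase memory (taskmanager.heap.size is a legacy key; since Flink 1.10 the documented options are taskmanager.memory.process.size / taskmanager.memory.flink.size)
set 'taskmanager.heap.size' = '5092MB';

-- Increase the checkpoint interval and timeout
set 'execution.checkpointing.timeout' = '60min';
set 'execution.checkpointing.interval' = '10 s';

-- Increase the Akka ask (RPC) timeout
set 'akka.ask.timeout' = '30 s';
  • These changes did not solve the problem: looking at the flink-sql job again, the tasks still kept starting new attempts. Further analysis showed that the join between the flink-sql data stream and the external dimension table was slow. When we removed the dimension table as a test, the backpressure disappeared quickly. With the direction of the problem confirmed, we moved on to targeted tuning.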

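Second round of tuning

  • First, to address the out-of-memory problem, reduce the dimension table's data volume as much as possible. Based on business requirements and data-modeling needs, build a new dimension table that contains only the required fields, which reduces the amount of data cached during the join.
For example, if dimension table dim_a has col1, col2, col3, col4, col5, col6 and only col2 and col6 are needed, build a dimension table that contains just col2 and col6.
  • Second, for the metric computation, reduce the amount of computation by combining two-phase aggregation (first aggregate by bucket and group key, then aggregate by group key) with MiniBatch (batched computation). At the cost of a small amount of extra latency, computing the metrics in batches improved efficiency substantially: before tuning, backpressure was still present after the job had run for 4 d 40 min; after tuning, backpressure disappeared completely within 35 min.
set 'table.exec.mini-batch.enabled' = 'true';
set 'table.exec.mini-batch.allow-latency' = '10 s';
set 'table.exec.mini-batch.size' = '5000';
set 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';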
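
To see why batching helps, the following is a minimal Python sketch, not Flink code, of the idea behind MiniBatch with pre-aggregation: records are buffered, combined per key inside the batch, and the keyed state is touched once per distinct key per batch instead of once per record. The function names, the synthetic records, and the "state access" counter are all assumptions made for illustration. The sketch models only the size trigger (5000, matching the setting above) and ignores the latency trigger and the network shuffle between the local and global phases.

```python
from collections import defaultdict


def per_record_sum(records):
    """Baseline: update the keyed state once for every input record."""
    state = defaultdict(int)
    state_accesses = 0
    for key, value in records:
        state[key] += value
        state_accesses += 1
    return dict(state), state_accesses


def mini_batch_sum(records, batch_size):
    """Buffer records into batches, pre-aggregate each batch by key,
    then update the keyed state once per distinct key in the batch."""
    state = defaultdict(int)
    state_accesses = 0
    for start in range(0, len(records), batch_size):
        # Local phase: combine the buffered records per key in memory.
        local = defaultdict(int)
        for key, value in records[start:start + batch_size]:
            local[key] += value
        # Global phase: merge one partial result per key into the state.
        for key, partial in local.items():
            state[key] += partial
            state_accesses += 1
    return dict(state), state_accesses


# Hypothetical input: 20000 records spread evenly over 10 keys.
records = [(f"k{i % 10}", 1) for i in range(20000)]

baseline, baseline_accesses = per_record_sum(records)
batched, batched_accesses = mini_batch_sum(records, batch_size=5000)

print(baseline == batched)   # same aggregation result
print(baseline_accesses)     # one state access per record
print(batched_accesses)      # one state access per key per batch
```

With few distinct keys per batch the number of state accesses drops sharply while the result is unchanged. The benefit shrinks as the number of distinct keys per batch approaches the batch size.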

Before/after comparison

  • Before tuning

[Screenshot: job before tuning]

  • After tuning

[Screenshot: job after tuning]

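Summary

  • When locating a problem, analyze the specific log messages, and combine them with an understanding of how Flink runs and with the monitoring page to narrow down, step by step, the operator that is causing the problem.
  • Reference: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/tuning/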
