Flink处理大型离线任务稳定性与性能调优探索

news2025/1/6 20:52:11

Apache Flink作为分布式处理引擎,用于对无界和有界数据流进行状态计算。其中实时任务用于处理无界数据流,离线任务用于处理有界数据。

通过本文你将掌握让大型离线任务运行稳定的能力,同时能够通过分析离线任务运行特点,降低任务运行资源消耗,减少任务成本。

下面我们进入正题:

01

离线任务情况说明

对于平台处理的离线任务,任务大都是处理:从HDFS到HIVE的数据清洗任务。这类任务的特点是数据来一条处理一条,所以任务大都是没有状态的。

看一个任务

source: 301个文件,每个文件9.6G(压缩后的大小),总共大约240亿条数据

trans:对于每条数据通过正则去获取目标数据。

资源配置:301并发、tm:<10core、10slots、15G>、jm: 10core、8G内存。那将会产生32个container(运行在yarn中)。

任务运行的速度大概在1.2亿/min,运行2小时50多分钟。但是任务会偶发的报hadoop集群的问题,如下报错

connection reset by peer
EOFException: End of File Exception

以至于后面这个任务少了几天数据,任务都跑不下去。但其他类似的任务运行的很稳定,“事出反常必有妖”:

本文尝试从内存、并发的角度分析任务的稳定性及任务运行速度等问题。

02


相关理论

1. Task Slots and Resources


cbcfdba53da46e4ee5130cdccb7c33c6.png

通过阅读官网,我们可以了解到:

1. taskmanager是一个jvm进程,一个taskmanager可能执行一个或多个subtask在各自的线程(slot)中。

2. taskmanager的(托管)内存资源会根据slot的数量而分开,但是tm中的 TCP(通过多路复用)连接、心跳消息、共享数据集、数据结构和cpu资源各slot会共享。

所以我们可以设置多个slot 在一个tm中,来实现资源共享,但是一个tm中设置几个slot合适呢?我们先带着疑问接着往下看。

2. 建议cpu和slot数关系


stack overflow 对于 Ideal Number of Task Slots,有一些建议:

As a rule-of-thumb, a good default number of task slots would be the number of CPU cores. With hyper-threading, each slot then takes 2 or more hardware thread contexts.

即:有超线程的机器可以建议设置:numOfslots = 2 * numOfcores ,没有超线程的机器建议设置:numOfslot = numOfcore。

3. tm的资源配置是否合适


目前配置的tm是:10core、10slot、15G,但是跑上述任务时,任务不稳定,这里在stack overflow 也找到了类似的问题:

a9c7e262608cc67cbfba4bba26de1818.png

    We’ve frequently run into problems where, with multiple hosts running one     large task manager a piece, all jobs get scheduled to one host, which can      cause load problems.

42583adf14f56dd68e43314a0eb927de.png

当多个主机(tm)同时运行一个大型任务管理器时,所有作业都被调度到一个主机上,这可能会导致负载问题。

也给出了相应解答:

5553d593626c02aa66c2950bf3321d89.png

    We ended up making multiple smaller task managers per host and jobs seem to be distributed better (although they still cluster on one node often).

a3c3d2b586df9752a0468705c0e1e0a0.png

在每个主机上创建了多个较小的任务管理器,并且作业似乎可以更好地分布(尽管它们仍然经常聚集在一个节点上)。

简单地总结上面的经验就是:调小tm的资源(cpu和memory),作业可以更好地分布。

4. 阿里对于TaskManager资源配置建议


TaskManager资源设置不宜过小,也不宜过大:

1. 如果单个TaskManager资源过小,则可能影响其上作业的稳定性,并且由于其Slot数目不多,无法有效平摊TaskManager的开销,降低了资源的利用效率。

2. 如果单个TaskManager资源过大,则TaskManager上运行的作业数会很多,一旦TaskManager发生单点故障,影响面会很大。

从阿里给出的建议我们可以得出:

当tm设置的资源过大时,遇到单点问题影响面很大。目前看在部署taskmanager <10core,15G> 时,tm资源设置的大了,造成的单点故障的概率提高。

03


问题分析与解决

总体的调整思路

1. 目前<10core,15G> 的设置导致当任务规模到达一定水平时任务运行的将变得不稳定,所以这里调小Tm的<cores、memory>。

2. 因为任务是IO密集型,所以可以考虑1个cpu对应多个slot个数,这里Flink建议是2倍,但需要测试。

3. 当减小每个Tm的资源时,Tm的个数将会增加。在相同任务下,这时需要考虑Jobmanager的调度压力和管理压力,是否对任务运行的稳定性和效率有所影响。

测试结果对比


任务1

73ac787552b29f6ad3761e0488712a38.png

        source: 301个文件,每个文件9.6G(压缩后的大小),总共大约240亿条数据

        trans:对于每条数据通过正则去获取目标数据。

39d020fecc5ac742d1eb41ea320c1ce2.png

a0947e3b3fcf95a838baccb96b3d9e1e.png

现在从3个方面讨论任务运行的情况:

速度:

  1. 从第2,3,5运行结果对比:可以看出yarn集群对于1core支持多并发的速度没有达到超线程效果、或对于hdfs到hive的io密集型任务没有收获很好的效果;

  2. 从第4运行结果看出:当1core 对应 4并发时,速度下降接近一半;

内存使用与资源共享:

  1. 从5、6运行结果对比看出:随着一个tm的slot数的增多,速度有所提升(提升不高),这里可以暂时认为是tm内的 TCP连接、心跳消息、共享数据集、数据结构和cpu资源等起到了共享的作用。

  2. 从7、8、9运行结果对比:可以看出运行速度基本都到了最高峰,此时tm的共享、内存的提高均没有提升Flink的运行速度。

稳定性:

  1. 从9运行结果:可以看出运行速度达到了最快,且多次运行后都能稳定运行完。

任务2

b5daf4eddec1ee74ebe83748dbd67a7a.png

        数据源: 有1000多个文件,每个文件1G(压缩后的大小)总共大约4.5亿条

        数据处理:对于每条数据通过正则去获取目标数据。

f28b971b85d9018e89b56463e7b28a4a.png

335969448d4cdec0d1c08e3c9c73c039.png

  1. 从1,2可以看出因为我们的任务是无状态的,内存的提升并没有提高运行效率,且增加了yarn部署container的负担。

  2. 1000并发下提高slots与core的比,速度也没有发生变化,可能是因为jm调度效率提升,资源共享状况提升,弥补了单线程cpu的运行效率。


任务3

8597ed3166495c2890a7417fc479aa0b.png

        数据源: 有1000多个文件,每个文件十几k,总共大约320万条数据

        数据处理:对于每条数据通过正则去获取目标数据。

ee4c97e62d3b6cb39f28e0a062981dbb.png

b33a539fed0ec48520ef841cb06dd63c.png

对于一个文件数特别多但文件都很小的情况下,我这里在相同cores下提高slot数(并发),那在相同任务并发的情况下,task的数量将减少:

b7d8b7e217c89806f780522dad7e4d54.png

      这样设置会增加tm资源共享的能力,一批数据运行完之后调度的时间变短。假设每次仅通过一次拉取数据就能处理完一个文件,即在调高slot数量之后,处理速度还高于Flink“单次”处理数据速度峰值,这样总的处理时间将会和原来差不多或者更短。

328fab48eb1633fbeafd89112df08b22.png

04


离线任务性能调优小结

1. 任务稳定性


Tm任务的配置不能太大<10core 10slots 15G>,为了保证稳定调小到<4core …> ,之前启动大于100个tm时,任务就开始运行不稳定易导致HDFS集群问题,或flink内部通讯问题,现在运行250个后还可以稳定运行。

2.(IO型任务)运行效率取舍


a. 当单个文件很大时(9.6G/1G:43万条),需要处理速度峰值,此时需要numOfCores = numOfslots,速度将达到1.2亿/min

b. 当单个文件很小时十几kb(不到100条一个文件),这时更在意调度的效率以及Tm中资源共享的能力,即:单位时间内有更多slot去处理任务。此时可以设置numOfcores = n * numOfslots,随着n的提高,处理速度有所下降,但因为一个tm中slot的增多,jm的调度能力提高。

 3. 内存瘦身


对于没有状态(来一条处理一条)的离线任务,内存的使用要求没有那么强,之前nCores=1.5nG 可以调整为 numOfcores=numOfMemory。

一方面节约资源,对于1000并发的任务,内存可节约0.5*1000=500G的内存;另一方面yarn在调度container时压力也会变小。

17987d7863662c1482f7ef2f5c608933.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/411181.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

150.网络安全渗透测试—[Cobalt Strike系列]—[DNS Beacon原理/实战测试]

我认为&#xff0c;无论是学习安全还是从事安全的人多多少少都会有些许的情怀和使命感&#xff01;&#xff01;&#xff01; 文章目录一、DNS Beacon原理1、DNS Beacon简介2、DSN Beacon工作原理二、DNS Beacon实战测试1、实战测试前提2、实战测试过程一、DNS Beacon原理 1、…

大数据分析工具Power BI(三):导入数据操作介绍

导入数据操作介绍进入PowBI&#xff0c;弹出的如下页面也可以直接关闭&#xff0c;在Power BI中想要导入数据需要通过Power Query 编辑器&#xff0c;Power Query 主要用来清洗和整理数据。文件资料下载&#xff1a;https://download.csdn.net/download/xiaoweite1/87587711一、…

Wijmo JavaScript UI 5.20222.877 Crack

Wijmo使用更快、更灵活的 JavaScript UI 组件构建更好的应用程序 使用 Wijmo&#xff0c;利用我们引人注目的 UI 组件库&#xff0c;将更多时间花在应用程序的核心功能上。要求零依赖&#xff0c;Wijmo sports弹性网格&#xff0c;业内最好的 JavaScript 数据网格&#xff0c;提…

JVM性能调优方法和模板

每天 100 万次登陆请求&#xff0c;8G 内存该如何设置 JVM 参数&#xff0c;大概可以分为以下 8 个步骤 。 第一步、新系统上线如何规划容量&#xff1f; 1. 套路总结 任何新的业务系统在上线以前都需要去估算服务器配置和 JVM 的内存参数&#xff0c;这个容量与资源规划并不…

关于 AI ,大家关心的问题

阅读本文大概需要 1.46 分钟。兄弟们&#xff0c;自从我跟曹老师准备合伙做一个 AI 生态的新社群之后&#xff0c;很多人问我最多的问题就是&#xff1a;AI 时代对我们普通人来说意味着什么&#xff1f;普通人又该如何去把握 AI 时代的机会&#xff1f;那么&#xff0c;今天&am…

nodejs+vue家庭菜谱食谱管理系统

目 录 摘 要 I ABSTRACT I 目 录 III 第1章 绪论 1 1.1开发背景 1 1.2开发意义 1 1.3研究内容 1 第2章 主要技术和工具介绍 5 第3章 系统分析 4 3.1可行性分析 4 3.1.1经济可行性 4 3.1.2技术可行性 4 3.1.3操作可行性 4 3.2需求分…

【云原生】k8s Ingress 实现流量路由规则控制

文章目录前言什么是 IngressIngress 的定义格式Ingress 的类型有哪几种&#xff1f;1. Simple fanout2. Name-based virtual hosting3. Path-based routing该如何实现更新 IngressIngress ControllerIngress Class总结前言 在 Kubernetes 中&#xff0c;Ingress 是一个非常重要…

【数据结构与算法分析inC-MarkAllen】1-数学基础

文章目录1. 第一章1.1 进行算法分析目的1.1.1 适应大量数据情况从 NNN 个数中选择第 kkk 大的数递减排序&#xff0c;取第K大的数插入排序思想1.1.2 边界条件正确1.2 数学知识复习1.2.1 指数1.2.2 对数1.2.3 级数几何级数算术级数1.2.4 模运算性质1.2.5 证明方法归纳法斐波那契…

CRM系统和ERP管理系统二者有何区别?

我们常提到的企业管理系统有CRM系统和ERP管理系统&#xff0c;那么二者有何区别呢&#xff1f; 一、目的。 CRM客户关系管理系统主要的目的是已客户关系的建立、发展以及维护。ERP系统主要强调的业务目的是未来提高整个业务的生产力。 二、重点。 …

回顾|伍鸣博士出席《华人之光-世界瞩目的华人 Web3 项目》圆桌论坛

*本文节选自Foresight News《华人之光——世界瞩目的华人 Web3 项目》的圆桌论坛由 Foresight Ventures 与 Foresight News 联合主办的「FORESIGHT 2023 」年度峰会上&#xff0c;在《华人之光——世界瞩目的华人 Web3 项目》的圆桌论坛中&#xff0c;Conflux Co-Founder 伍鸣、…

LiveData数据倒灌?你真的用对了吗?源码解析

文章目录livedata/lifecycleandroidx-lifecycle基本使用uml-关键角色关系观察者模式关键源码分析livedata基本使用源码分析还是从observe开始livedata变化通知观察者viewmodel源码分析FAQlifecycle用到的设计模式lifecycle是如何监听aty生命周期的一些废弃和原因为什么废弃注解…

理解RESTful架构

越来越多的人开始意识到&#xff0c;网站即软件&#xff0c;而且是一种新型的软件。 这种"互联网软件"采用客户端/服务器模式&#xff0c;建立在分布式体系上&#xff0c;通过互联网通信&#xff0c;具有高延时&#xff08;high latency&#xff09;、高并发等特点。…

【华为机试真题详解JAVA实现】—配置文件恢复

目录 一、题目描述 二、解题代码 一、题目描述 有6条配置命令,它们执行的结果分别是: 命 令执 行resetreset whatreset boardboard faultboard addwhere to add<

Flash Linux to eMMC

实验目的&#xff1a;从eMMC启动Linux系统 Step1:确定eMMC被挂在哪个设备 哪个设备含有boot0分区和boot1分区&#xff0c;就是eMMC。实验中是位于mmcblk1上。 rootam64xx-evm:~# ls -l /dev/mmcblk* brw-rw---- 1 root disk 179, 0 Feb 27 13:25 /dev/mmcblk0 brw-rw---- …

实验三、图像复原

1. 实验目的 (1) 理解退化模型。 (2) 掌握常用的图像复原方法。 2. 实验内容 (1) 模拟噪声的行为和影响的能力是图像复原的核心。 示例 1 &#xff1a;使用 imnoise 添加噪声。 J imnoise(I,gaussian) 将方差为 0.01 的零均值高斯白噪声添加到灰度图像 I。 J imnoise(I,g…

简单的做一个学校毕业啊项目

前言&#xff1a;相信看到这篇文章的小伙伴都或多或少有一些编程基础&#xff0c;懂得一些linux的基本命令了吧&#xff0c;本篇文章将带领大家服务器如何部署一个使用django框架开发的一个网站进行云服务器端的部署。 文章使用到的的工具 Python&#xff1a;一种编程语言&…

2023年14界蓝桥杯省赛题解

2023年14界蓝桥杯省赛题解 蒟蒻笔者大二&#xff0c;第一次省赛。总结一下&#xff1a;“300块没了&#xff0c;退钱&#xff01;” A、日期统计 问题描述 小蓝现在有一个长度为 100 的数组&#xff0c;数组中的每个元素的值都在 0 到 9 的范围之内。数组中的元素从左至右如…

【Spring专题】「技术原理」从源码角度去深入分析关于Spring的异常处理ExceptionHandler的实现原理

ExceptionHandler的作用 ExceptionHandler是Spring框架提供的一个注解&#xff0c;用于处理应用程序中的异常。当应用程序中发生异常时&#xff0c;ExceptionHandler将优先地拦截异常并处理它&#xff0c;然后将处理结果返回到前端。该注解可用于类级别和方法级别&#xff0c;…

ONNX转NCNN记录

【pytorch 转 onnx】pytorch-onnx 【onnx 转 ncnn】onnx-ncnn 【ncnn 加载模型】ncnn-load 一、python安装依赖项 pip install onnxruntime onnx opencv-python 二、创建模型并训练&#xff0c;加载模型参数并输出onnx #### pytorch 转 onnx import torch import torch.onnx…

小白学Pytorch系列- -torch.distributions API Distributions (1)

小白学Pytorch系列- -torch.distributions API Distributions (1) 分布包包含可参数化的概率分布和抽样函数。这允许构造用于优化的随机计算图和随机梯度估计器。这个包通常遵循TensorFlow分发包的设计。 不可能通过随机样本直接反向传播。但是&#xff0c;有两种主要方法可以…