【Flink系列二】如何计算Job并行度及slots数量

news2025/1/20 1:58:35

接上文的问题

  1. 并行的任务,需要占用多少slot ?
  2. 一个流处理程序,需要包含多少个任务

首先明确一下概念

slot:TM上分配资源的最小单元,它代表的是资源(比如1G内存,而非线程的概念,好多人把slot类比成线程,是不恰当的)

任务(task):线程调度的最小单元,和java中的类似。

---------------------------------------------------------------------------

为更好的去理解后面如何计算并行度及需要的slots数量,先介绍一下几个概念

并行度(Parallelism)

图1

  •  一个特定算子的子任务(subtask)的个数被称之为并行度(parallelism)一般情况下,一个stream的并行度,可以认为就是其所有算子中最大的并行度。
  • 图中source算子的并行度=2,map算子的并行度=2,keyby算子的并行度=2,sink算子的并行度=1

ps:并行度的设置有3个地方,1=代码中指定,2=提交Job时指定-p参数,3=Flink配置文件conf中执行,其优先级1>2>3, 不详细展开,有问题可以评论区

由图1,我们可以算出stream的任务数=7(两个source + 两个map + 两个keyby + 一个sink)

TaskManager和Slots

图2

  • Flink中每个TaskManager都是一个JVM进程,它可能会在独立的线程上执行一个或多个任务
  • 为了控制一个TM(TaskManager缩写)能接受多少哥task,TM通过task slot来进行控制(一个TM至少有1个slot)
  • 建议TM中slot数量设置为cpu核心数,因为一个TM中slot内存的独享的,但是cpu是共享的,为避免不同slot执行任务时争抢cpu资源,建议slot数量设置和cpu核心数一致
  • 图中slot数量决定了TM上的最大线程并行能力,一个slot可以执行一个线程,也可以串行执行多个线程。

图2中我们看到

  1. source和map算子合并到一块了,那为什么可以合并呢?
  2. 合并后每个任务都占用一个slot,一共是占用了5个slot,现实真的是这样的吗?

带着问题,再看一个例子

source和map算子及keyby算子的并行都调整为6,sink算子的并行度还是1,排列方式如图

图3

按照我们上面的理解,我们应该需要的slot数量=6+6+1=13,但是这样会造成slot资源的浪费(流处理任务第一个算子处理完了之后需要等后面的算子都执行完,再开始下一批次的任务处理),为此,Flink允许任务共享slot

  • 默认情况下,Flink允许子任务共享slot(必须是前后执行的不同的任务),及时他们是不同任务的子任务。这样的结果是,一个slot可以保存作业的整个管道。
  • Task slot是静态的概念,是指TM具有的并发的并行执行能力

所以,Flink优化后一共占用6个slot。

slot共享组

  • 任务槽共享的好处:

1.Flink 集群所需的 task slot 和作业中使用的最大并行度恰好一样。无需计算程序总共包含多少个 task(具有不同并行度)。
2.资源 容易获得更好的资源利用。如果没有 slot 共享,非密集 subtask(source/map())将阻塞和密集型 subtask(window)
一样多的资源
 

默认情况下会设置一个默认的共享组, slotSharingGroup("default"),这样所有的算子都可以共享slot;如果想让两个算子任务不共享slot,通过调整共享组来实现。 不同的共享组一定在不同的slot上

// 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        //设置并行度,所有算子都默认这个并行度
        env.setParallelism(1);


        DataStreamSource<String> ds = env.socketTextStream("hadoop102", 8888);

        ds.flatmap(new WordCount.MyFlatMapper()).name("f1").setParallelism(2).slotSharingGroup("a")
          .keyBy(0)
          .sum(1).setParallelism(2).slotSharingGroup("c");
          .print().setParallelism(1)

        // 5. 启动执行
        env.execute();

show plan后我们可以看到slot没有共享,执行stream需要4个slot

图4

如果不单独设置slot共享组,那么该任务的slot个数=2,

并行子任务的分配

图5

图5中有两条不同的流,每个字母右下角的下标代表并行度,A并行度=4,B并行度=4,C并行度=2,D并行度=4,E并行度=2;

整个任务开启slot共享后,一个会有4+4+4+2+2=16个任务,一共需要申请4个slot;

C->D过程涉及数据的合并,需要将数据copy到D的每个子任务中。

总结

下图中在flink中配置文件flink-conf设置的并行度是3,flink集群中TM数量=3,每个TM中slot数量=3

Example1中代码中设置的paeallelism=1,并且允许slot共享,所以会占用1个slot,3个算子任务

Example1中代码中设置的paeallelism=2,并且允许slot共享,所以会占用2个slot,6个算子任务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1293463.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

设备制造行业CRM:提升客户满意度,驱动业务增长

设备制造行业客户需求多样化、服务链路长&#xff0c;企业在关注APS、EMS等工业软件之余还要以客户为中心&#xff0c;做好客户服务。设备制造行业CRM管理系统是企业管理客户关系的利器&#xff0c;设备制造行业CRM的作用有哪些&#xff1f;一文带您看懂。 设备制造行业需要解…

【深度学习】强化学习(一)强化学习定义

文章目录 一、强化学习问题1、交互的对象1. 智能体&#xff08;Agent&#xff09;2. 环境&#xff08;Environment&#xff09; 2、强化学习的基本要素1. 状态 &#x1d460;2. 动作 &#x1d44e;3. 策略 &#x1d70b;(&#x1d44e;|&#x1d460;)4. 状态转移概率 &#x1…

elk(filebeat)日志收集工具

elk&#xff08;filebeat&#xff09;日志收集工具 elk&#xff1a;filebeat日志收集工具 和logstash相同 filebeat是一个轻量级的日志收集工具&#xff0c;所使用的系统资源比logstash部署和启动时使用的资源要小得多 filebeat可以运行在非Java环境。他可以代理logstash在…

ArcGIS Pro中怎么设置标注换行

在ArcGIS Pro中进行文字标注的时候&#xff0c;如果标注的字段内容太长&#xff0c;直接标注的话会不美观&#xff0c;而且还会影响旁边的标注显示&#xff0c;这里为大家介绍一下在ArcGIS Pro中设置文字换行的方法&#xff0c;希望能对你有所帮助。 数据来源 本教程所使用的…

【UE5】瞬移+马赛克过渡效果

效果 步骤 1. 新建一个工程&#xff0c;创建一个Basic关卡 2. 添加第三人称游戏资源到内容浏览器 3. 新建一个材质&#xff0c;这里命名为“M_Pixel” 打开“M_Pixel”&#xff0c;设置材质域为“后期处理” 在材质图表中添加如下节点 此时效果如下&#xff0c;已经有马赛克的…

JVM 命令行监控及诊断工具

面试题 你使用过Java虚拟机性能监控和故障处理工具吗&#xff1f;&#xff08;美图&#xff09; 怎么打出线程栈信息。&#xff08;字节跳动&#xff09; JVM诊断调优工具用过哪些&#xff1f; (京东) 怎么获取 Java 程序使用的内存&#xff1f;堆使用…

Django模板,Django中间件,ORM操作(pymysql + SQL语句),连接池,session和cookie, 缓存

day04 django进阶-知识点 今日概要&#xff1a; 模板中间件ORM操作&#xff08;pymysql SQL语句&#xff09;session和cookie缓存&#xff08;很多种方式&#xff09; 内容回顾 请求周期 路由系统 最基本路由关系动态路由&#xff08;含正则&#xff09;路由分发不同的app中…

ssh安装和Gitee(码云)源码拉取

文章目录 安装ssh服务注册码云公钥设置码云账户SSH公钥安装git客户端和git-lfs源码获取 安装ssh服务 更新软件源&#xff1a; sudo apt-get update安装ssh服务 sudo apt-get install openssh-server检查ssh是否安装成功 which ssh输出&#xff1a; /usr/bin/ssh启动ssh 服…

『亚马逊云科技产品测评』活动征文|基于亚马逊云EC2搭建PG开源数据库

授权声明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 Developer Centre, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方渠道 亚马逊EC2云服务器&#xff08;Elastic Compute Cloud&#xff09;是亚马…

conda配置不同版本的python及依赖库--conda conda conda

一、conda介绍 Conda 是一个开源的软件包管理系统和环境管理系统&#xff0c;用于安装多个不同版本的软件包及其依赖关系&#xff0c;并在它们之间轻松切换。 Conda 是为 Python 程序创建的&#xff0c;适用于 Linux&#xff0c;OS X 和Windows Conda可以构建不同的环境&#…

时间序列预测实战(二十四)PyTorch实现RNN进行多元和单元预测(附代码+数据集+完整解析)

一、本文介绍 本篇文章给大家带来的是利用我个人编写的架构进行RNN时间序列卷积进行时间序列建模&#xff08;专门为了时间序列领域新人编写的架构&#xff0c;简单且不同于市面上大家用GPT写的代码&#xff09;&#xff0c;包括结果可视化、支持单元预测、多元预测、模型拟合…

Ubuntu安装nvidia GPU显卡驱动教程

Ubuntu安装nvidia显卡驱动 1.安装前安装必要的依赖 sudo apt-get install build-essential sudo apt-get install g sudo apt-get install make2.到官网下载对应驱动 https://www.nvidia.cn/Download/index.aspx?langcn 3.卸载原有驱动 sudo apt-get remove --purge nvidi…

Attack Lab 【深入理解计算机组成与系统实验】(更新ing)

前情提要&#xff1a;因为图片都很长的原因&#xff0c;up不会调&#xff0c;所以可能看的不舒服&#xff0c;请原谅up&#xff0c;电脑可以缩小至80或者90看会舒服一点&#xff0c;实在抱歉了 实验原理&#xff1a; 本次实验用到的基本上是缓冲区越界&#xff0c;学过c语言的可…

AR + 通信,虚实结合让工作协同从线上到「现场」

在数字经济无所不在的当下&#xff0c;千行百业都与数智化办公接轨并因其实现转型升级。关注【融云 RongCloud】&#xff0c;了解协同办公平台更多干货。 升级的背后&#xff0c;是利用技术把工作用更自然的方式连接起来&#xff0c;让整个工作流协同更顺、体验更好。 而其中…

龙良曲PyTorch入门到实战 深度学习

文章目录 笔记激活函数与Loss的梯度lesson5 手写数字识别问题lesson6 基本数据类型lesson7 创建tensorlesson8 索引和切片lesson9 维度变换lesson10 broadcastinglesson11 分割和合并lesson12 数学运算lesson13 Tensor统计lesson14 Tensor高阶lesson16 什么是梯度lesson17 常见…

postcss-pxtorem实现页面自适应的原理

先声明一点这玩意本身不能实现哈&#xff0c;他只是一个工具&#xff0c;更是一个postcss的插件 帮助我们从px转化成为rem比如我们的代码 div {height: 100px;width: 100px; }经过这个插件转化之后变成 假设变成下面这样哈 div {height: 1rem;width: 1rem; }其他没啥子太大作…

WMMSE方法的使用笔记

标题很帅 原论文的描述WMMSE的简单应用 无线蜂窝通信系统的预编码设计问题中&#xff0c;经常提到用WMMSE方法设计多用户和速率最大化的预编码&#xff0c;其中最为关键的一步是将原和速率最大化问题转化为均方误差最小化问题&#xff0c;从而将问题由非凸变为关于三个新变量的…

【计算机组成体系结构】SRAM和DRAM

RAM — Random Access Memory 随机访问存储器 —指定某一存储单元地址的时候&#xff0c;存储单元的读取速度并不会因为存储单元的物理位置改变 SRAM即为 Static RAM 静态随机访问存储器 — 用于主存DRAM即为 Dynamic RAM 动态随机访问存储器 — 用于Cache 一、SRAM和DRAM的特…

Elasticsearch 8.9 flush刷新缓存中的数据到磁盘源码

一、相关API的handler1、接收HTTP请求的hander2、每一个数据节点(node)执行分片刷新的action是TransportShardFlushAction 二、对indexShard执行刷新请求1、首先获取读锁&#xff0c;再获取刷新锁&#xff0c;如果获取不到根据参数决定是否直接返回还是等待2、在刷新之后transl…

Jquery easyui异步提交表单的两种方式

这篇文章分享一下easyui常用的两种表单异步提交的方式。 目录 第一种&#xff1a;利用ajax提交 $.post() $.ajax() 第二种&#xff1a;使用easyui提供的表单提交方式 首先&#xff0c;准备一个简单的表单&#xff0c;包含三个输入框&#xff0c;在页面引入easyui的js文件。…