spark shuffle·读写流程 和 rdd持久化

news2024/11/15 23:41:50

1.对比mapreduce和spark

 mapreduce里 map的第3条就是说,比如我存了很多条数据,如果一条一条写进磁盘,肯定有很多次IO,我先归并到一个缓存里面再溢写磁盘。

spark与其的差别就是用map代替了buffer,因为map存的key唯一,用map的话可以直接完成combiner操作,且map的key里也有分区的key,避免了一定的排序操作。

除此之外,spark溢写也是必须触发磁盘的

但是多次利用的数据可以放入内存

=======================================================================

spark bypasshandle那个 就是map的时候直接根据最终分区号,直接将记录存入磁盘对应分区文件,中间什么都不干,也不对记录暂存排序,等到数据都map完得到一个个分区内的文件,最后线性IO拼接成一个大文件,并用一个索引文件记录每个分区对应的偏移量

而mapreduce是内存中开辟一个哈希环,每次记录存入环中,等到达到80%阈值,就用一个线程对环内数据进行排序,并溢写到一个磁盘小文件里面,这个小文件包含了各种分区,但内部有序。最终通过归并算法,再把各种小文件读到内存进行归并,IO拼接

java object有浪费空间的嫌疑,哈希表是用数组加链表形成,比如数组里面存的都是一个个元素,给他们指定了类型其实存的也就是一个个object对象,所以数组里面不仅仅是元素,还有各种对象头 链表等元数据,还有指针要指向堆里具体数据。这些加在一起都很浪费空间。 想做哈希表,可以直接数组里面直接指向堆里数据

前者速度快,浪费空间,后者速度慢需要cpu但剩空间,spark会用到后者,因为要保留内存

缺点就是 当分区数过多,每个分区内的文件大小就会很小,且文件数量会很多,在读取的时候,由于文件在磁盘的位置随机,就会有一个随机读写的情况,性能就会特别慢

所以 想走这种handle他还有一个限制,就是分区数小于两百,大于两百的话就只能走开辟内存的sortshuffle

===========================================================================

在sortshuffle里,如果是上述不开启合并但是分区数又特别多的情况会把数据存入内存缓冲区,内存缓冲区是一个数组,数组里存的都是对象引用,真正数据都在jvm堆里,相当于存了引用。 每次用insert方法像内存缓冲区去插入,且key为p和k,如果溢出了会调用growarry方法,扩大array至两倍

mapreduce和这个区别就是,它存放的内存是一个字节数组,或者说字节哈希表,也就是他是环形的,前面存具体的序列化后的数据kv,后面存其对应的索引位置,kv由于有大有小,所以每次占用空间不同,如果溢出会占用索引的位置来存放,在排序的时候,也是比较真实的key,而移动索引!

相同点就是,这两种情况在溢写时由于小文件内存的都是不同分区的数据,最终一定会发生排序,造成分区有序,然后再来一个归并排序IO,并生成相应的分区位置索引文件

如果是正常的比如reducebykey用的必须进行合并,他用的就是map,但是本质上还是一个数组,里面存的还是引用,不过在存的时候会根据key进行哈希运算确定未来最终要在数组存的位置,存的方式就是一个线性再探测法,比如先看你这个位置有没有存过,如果没存过直接放进去,存过就线性再探测。 如果已经存过相同key的,会有一个自定义的update函数来进行更新操作。 和mapreduce的区别就是,他直接在内存进行了聚合,而且存的全是引用不是具体值

=========================================================================

unsafesortshuffle

之所以有unsafesortshuffle,就是因为有以下一个常识,我们之前的放内存缓冲区是一个数组里面放了对象引用,jvm堆里放具体数据,但是这些都没有序列化(字节数组),真实的对象体积是大于序列化后体积,放jvm堆里也不如放java堆里(堆外)方便,因为最终要放磁盘如果是jvm堆是要用内核IO的,有个状态切换的过程浪费效率。注意,堆外只能是字节数组

unsafesortshuffle就是在以上优化下进行,但也有图中的限制条件

 补充知识,内存的管理有一个page这个单位,主要原因是,数据存在内存中,还要存储其对应的索引、偏移量元数据等信息,也就是其在内存中的位置,这个时候,数据存储的单位越小,比如bit或byte,那么索引就会用大量空间记录其文件的位置,比如一个文件有4KB,用bit存储的话索引就是 第1bit第二个bit第三个bit。。。。什么什么是我的1号文件,这个时候如果数据存储的单位越大,那么其索引的占用空间就会越小,所以有了page概念,他的大小不是固定的。可以说他类似于虚拟映射

ShuffleExternalSorter这部分这个地方就是,我把读取的数据他本来是对象,我给他序列化成字节数组(方便存在缓冲区里面),然后我可以得到page的真实地址以及他的索引位置,(真实位置需要得到page的基地址和偏移量),然后我有一个inMemSorter来排序这些索引!

上图是executor的具体构造 

上图是在分配page时候的一系列方法,主要是分为使用jvm堆内空间分配或者是堆wai空间 

之所以有unsafeSortShuffle,就是为了节省内存利用,顺而减少磁盘溢写次数,尽量减少 序列化反序列化这种过程,比如你从磁盘拿到数据它是一个序列化后的字节数组,你拿到内存去给他进行操作,有些操作它是不需要你反序列化成对象去操作的(比如filter),这个时候你如果直接给他走一个unsafe放到堆外,他就不用反序列化,又替内存节省空间(因为序列化比反序列化占用空间小),又能最快速度(堆外是内核读到地址就能走,另一种还要涉及jvm里面代码一些流程来换算地址)

它是为了追求内存最高利用率

unsafeShuffleWriter里也有写在堆上和堆外的,主要是多了一个堆上有对象头这种信息,每次放数据的时候,还有一个对象头的偏移量要算进去,详细看图片

========================================================================

shuffle读

 要读肯定要先去拉取数据,这个时候就分为远端数据和本地数据来拉取,本地的不用说,主要是远端的,他会先定义一个targetRequestSize来指定一次拉取多少,一般他会指定1/48M。上图就是他在拉取远端数据,在一个address中(一台机器上)会有多个块需要拉,但是由于它限制了这个1/48M,就是他不会一口气把一台机器的全部块都拉取过来,而是每台机器都拉个1/48M,这样就不会造成单台机器阻塞。类似于计算机cpu轮询

shuffle的使用场景:

1. 下游分区数量改变   这种情况对应于你上游task很多,但是每个里面的数据很小,开那么多task就很没必要,毕竟浪费进程线程生成和消亡等,可以在shuffle的时候让他们分区数变少来改善

2. 需要聚合 就比如reduce groupByKey

3. 需要全排序,这个时候肯定是shuffle拉取所有主机数据

 在拉取完全部数据后就是开始读取,但是他会根据几个条件走不同分支,首先是根据map的时候是否发生aggregator 我这边读也给他聚合,比如map端已经聚合过了,后面就不用聚合,map端如果没有聚合,后面就要聚合。有这种设置是因为比如reduceByKey和groupByKey。(聚合就是combineByKey那三条算子,第一条怎么搞,后续的怎么搞,最终怎么溢写)

reduceByKey是map端发生聚合,shuffle读的时候就不用聚合了,因为你一条一条数据在map端直接聚合的话,可以让数据量变少,IO也就快,计算和内存占用量也会少。这种map直接聚合 shuffle拉取过来不用聚合就会很方便

groupByKey如果也是map端聚合的话,他就显得很没必要,首先 就算你map的时候也开启聚合了,让相同key的放在一起,这个时候你shuffle读的话,IO传输和内存占用一点没少,因为还是一条一条被传过来,数据量没变,白白浪费map端的cpu空转时间。这种情况还不如直接在map端直接传过来,我shuffle读的时候去进行聚合。

=========================================================================

RDD持久化

调优:

rdd执行过程中,如果有重复调用的地方可以用rdd.cache()或rdd.persist(级别)来暂时存储到内存或磁盘。

但是 上述情况只能在task中间去执行调优,如果是shuffle read之后的rdd,就不用去暂时保存,spark内部会自动帮你跳过shuffle之前的东西,自动调优

另一个调优是为了系统的可靠性,比如你用persist存储在本地磁盘,但是如果这台机器挂了,那么来之不易的查询结果也就丢了,可以用setCheckPointDir()    和 checkpoint()来存储到hdfs系统中,让他更加可靠

=========================================

广播变量 broadcast

 比如上述的list 这个东西是我们driver端编写好逻辑 推送给executor执行,结果再拿给driver

下面的闭包是发生在driver的,每次执行再发送给executor,之间的来回发送就浪费效率 

可以使用广播包(像那种闭包必须序列化才能发,也就是你写代码定义的东西是在driver端生成的,最终的执行还是在executor,executor想要用你的东西,必须让driver序列化一下,然后发送给executor那里反序列化,那边用完就直接销毁了,driver用的东西和executor根本不是一个东西)

广播变量就是driver 发出去之后存在executor的blockManager

上述是下面方法的一种调优 

场景 : 一个数据量大 一个数据量小 用来做关联 不想用shuffle(IO是最大的瓶颈)

low方法: 就是list是一个rdd 我的data也是一个rdd,两个rdd去做一个join操作,但是中间会涉及到shuffle,这样就会导致效率低下,我们的广播变量可以让数据量小的直接发送到数据量多的那个主机,然后直接本地进行操作更方便 没有shuffle

广播变量的方式一可以减少IO,二他不同于于taskbinary,taskbinary虽然也是发送到blockManager,但是他只能作用于单个任务,换了别的任务还是需要driver重新发送相同的数据,而使用broadcast的话,只发送一次给所以executor,不管是谁的任务都可以直接使用变量

所以如果只是单个job,直接taskbinary就行,多个job就把他变广播变量

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/57748.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[附源码]JAVA毕业设计教学辅助系统(系统+LW)

[附源码]JAVA毕业设计教学辅助系统(系统LW) 目运行 环境项配置: Jdk1.8 Tomcat8.5 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术&…

[附源码]计算机毕业设计springboo酒店客房管理系统

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: SSM mybatis Maven Vue 等等组成,B/S模式 M…

【小5聊】使用div+css布局绘制32支球队比赛对阵图,拭目以待冠军花落谁家

虽然不是狂爱足球爱好者,但多少会有关注下,像梅西和C罗是经常听到的 可能是没有我们队的原因,关注会比较少,只看个结果,所以 趁着这次机会,通过js前端技术divcss布局方式绘制本次世界杯足球比赛对阵图 2022…

Android 腾讯位置服务地图简单使用

文章目录概述腾讯位置服务地图SDK兼容性创建工程获取Appkey配置AppKey配置工程代码混淆权限配置地图基础地图地图类型个性化地图3D建筑行政区划出现的问题及解决源码概述 ​ 本文参考腾讯位置服务官方文档:Android地图SDK | 腾讯位置服务 (qq.com) ​ 腾讯位置服…

【数据结构与算法】一套链表 OJ 带你轻松玩转链表

✨个人主页:bit me ✨当前专栏:数据结构 ✨刷题专栏:基础算法 链 表 OJ🏳️一. 移除链表元素🏴二.反转链表🏁三.链表的中间结点🚩四.链表中倒数第k个结点🏳️‍🌈五.合并…

华为阿里等技术专家15年开发经验总结:SSM整合开发实战文档

前言 都说程序员工资高、待遇好, 2022 金九银十到了,你的小目标是 30K、40K,还是 16薪的 20K?作为一名 Java 开发工程师,当能力可以满足公司业务需求时,拿到超预期的 Offer 并不算难。然而,提升…

Spring基础篇:事务开发

Spring整合持久层 Spring技术为什么要与持久层技术进行整合? 1、JavaEE开发过程中我们需要持久才能进行数据库的访问操作。 2、JDBC Hibernate MyBatis进行持久层过程中存在大量的代码冗余。 3、Spring基于模板设计模式对这些持久层技术呢作出了封装。 ps注释&…

[附源码]计算机毕业设计疫情期间小学生作业线上管理系统Springboot程序

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: SSM mybatis Maven Vue 等等组成,B/S模式 M…

【Android App】在线直播之搭建WebRTC的服务端(图文解释 简单易懂)

有问题或需要源码请点赞关注收藏后评论区留言私信~~~ 一、WebRTC的系统架构 WebRTC(网页即时通信)是一个支持浏览器之间实时音视频对话的新型技术,WebRTC体系由应用于实时通信的编程接口和一组通信协议组成,已成为互联网流媒体通…

(八) 共享模型之管程【ReentrantLock】

相对于 synchronized 具备如下特定: (1)可中断 (2)可以设置超市时间 (3)可以设置为公平锁 (4)支持多个条件变量 与 synchronized 一样,都支持可重入 基本语法…

回归预测 | MATLAB实现基于RF随机森林的用水量预测(多因素、多指标)

回归预测 | MATLAB实现基于RF随机森林的用水量预测(多因素、多指标) 目录 回归预测 | MATLAB实现基于RF随机森林的用水量预测(多因素、多指标)预测效果基本介绍模型原理程序设计参考资料预测效果 基本介绍 将随机森林回归原理应用到了预测领域,构建了基于随机森林的预测模型,…

Spring-Cloud-Zipkin-05

前言 1、链路追踪由来:在微服务框架中,一个由客户端发起的请求在后端系统中会经过多个不同的服务节点调用来协同产生最后的请求结果,每一个请求都会开成一条复杂的分布式服务调用链路,链路中的任何一环出现高延时或错误都会引导起…

一文读懂vue3和vue2的API风格对比

Vue3 组合式 API(Composition API) 主要用于在大型组件中提高代码逻辑的可复用性。 传统的组件随着业务复杂度越来越高,代码量会不断的加大,整个代码逻辑都不易阅读和理解。 Vue3 使用组合式 API 的地方为 setup。 在 setup 中…

JavaEE-多线程初阶1

✏️作者:银河罐头 📋系列专栏:JavaEE 🌲“种一棵树最好的时间是十年前,其次是现在” 目录1.认识线程1.1概念1.2多线程程序1.3创建线程2.Thread类及常见方法2.1Thread 的常见构造方法2.2 Thread 的几个常见属性2.3中断…

在本地利用服务器显卡跑代码

除了使用xshell等连接服务器以外,pycharm也可以连接服务器,在服务器上运行代码,上传下载文件等操作。 参考:https://cloud.tencent.com/developer/article/1738482 步骤如下: 1、pycharm工具栏:Tools– D…

基于51单片机的压力监测仪(MPX4115)(Proteus仿真+程序)

编号:28 基于51单片机的压力监测仪(MPX4115) 功能描述: 本设计由51单片机最小系统MPX4115压力传感器ADC0832模块液晶1602模块 1、主控制器是AT89C82单片机 2、MPX4115压力传感器采集气压力,通过ADC0832模数转换器进行A/D转换,读…

Java语言与系统设计课程实验报告

做个课设做的我人间失格,写了一晚上没保存,真是哭死 一、目的与要求 (一)、实验目的 掌握Java语言与系统设计的基本思路和方法。 利用所学的基本知识和技能,解决简单的Java语言与系统设计问题。 (二&…

挂耳式蓝牙耳机性价比推荐,几款高性能的耳机分享

无论是在日常还是运动的场景下,我们通常都会选择佩戴着耳机,让我们能够顺利过渡掉枯燥的生活,之前人们会选择入耳式的耳机,在长期佩戴过后会有不小的疾病诞生,在近些年迅速火起的骨传导耳机成为了焦点,其保…

Java线程池理解与学习

线程过多就容易引发内存溢出,因此我们有必要使用线程池的技术 线程池的好处 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的消耗 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行 提高线…

GEE:关系、条件和布尔运算

ee.Image对象具有一组用于构建决策表达式的关系、条件和布尔运算方法。这些方法可以用来掩膜、绘制分类地图和重新赋值。 本文记录了在GEE(Google Earth Engine)平台上的关系运算符和布尔运算符,分别应用到了三个不用的场景(筛选低…