【Spark精讲】Spark与MapReduce对比

news2024/11/17 7:23:36

目录

对比总结

MapReduce流程

​编辑

MapTask流程

ReduceTask流程

MapReduce原理

阶段划分

Map shuffle

Partition

Collector

Sort

Spill

Merge

Reduce shuffle

Copy

Merge Sort


对比总结

  1. Map端读取文件:都是需要通过split概念来进行逻辑切片,概念相同,底层具体实现和参数略有差异;
  2. 业务逻辑实现方式:MapReduce引擎是通过用户自定义实现Mapper和Reducer类来实现业务逻辑的;而Spark提供了丰富的算子以及上层DataFrame、DataSet的抽象;
  3. 计算引擎:MapReduce是基于标准的map reduce计算理念的实现,map任务和reduce任务数据交换通过读写文件的方式进行的,运行效率低;Spark是基于内存的(RDD),同时Spark的计算引擎是基于DAG方式实现的(类似Tez),除了shuffle会通过文件进行数据交换外,每个阶段的计算是基于内存的,运行效率高,基于内存有利于进行性能优化,同时提供缓存和广播机制有利于计算结果复用,适合算法迭代计算,此外不但可以使用堆内内存还可以使用堆外内存;
  4. 部署方式:都可以基于YARN,MapReduce基于YARN实现了MRAppMaster,Spark基于YARN实现了ApplicationMaster;
  5. Shuffle方式:MapReduce采用了基于排序的数据聚集策略(按Key排序),而该策略是不可定制的,不可以使用其他数据聚集算法(如Hash聚集);而Spark默认也需要排序,但一定条件下也可以不进行排序(ByPass和Tungsten方式)。
  6. 运行方式:MapReduce的Job是基于JVM进程的;而Spark的Driver和Executor是JVM进程,Task运行是基于线程的。

MapReduce流程

MapTask流程

  1. Read阶段:Map Task通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
  2. Map阶段:该阶段主要是讲解析的key/value交给用户编写的map()函数处理,并产生一些列新的key/value。
  3. Collect阶段:在用户编写的map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分片(通过调用Partitioner),并写入一个环形内存缓冲区中。
  4. Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要主要的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
  5. Combine阶段:当所有数据处理完成后,Map Task对所有临时文件进行一次合并,以确保最终只是生成一个数据文件。

ReduceTask流程

  1. Shuffle阶段:也称之为Copy阶段。Reduce Task从各个Map Task远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
  2. Merge阶段:在远程拷贝数据的同时,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
  3. Sort阶段:按照MapReduce语义,用户编写的reduce()函数输入数据是按Key进行聚集的一组数据。为了将Key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个Map Task已经实现对自己的处理结果进行了局部排序。因此Reduce Task只需对所有数据进行一次归并排序即可。
  4. Reduce阶段:在该阶段中,Reduce Task将每组数据依次交给用户编写的reduce()函数处理。
  5. Write阶段:reduce()函数将计算结果写到HDFS上。

MapReduce原理

阶段划分

我们知道MapReduce计算模型主要由三个阶段构成:Map、shuffle、Reduce。

  1. Map是映射,负责数据的过滤分法,将原始数据转化为键值对;
  2. Reduce是合并,将具有相同key值的value进行处理后再输出新的键值对作为最终结果。
  3. Shuffle:为了让Reduce可以并行处理Map的结果,必须对Map的输出进行一定的排序与分割,然后再交给对应的Reduce,而这个将Map输出进行进一步整理并交给Reduce的过程就是Shuffle。

整个MR的大致过程如下:

Map和Reduce操作需要我们自己定义相应Map类和Reduce类,以完成我们所需要的化简、合并操作,而shuffle则是系统自动帮我们实现的,了解shuffle的具体流程能帮助我们编写出更加高效的Mapreduce程序。

Shuffle过程包含在Map和Reduce两端,即Map shuffle和Reduce shuffle。

Map shuffle

在Map端的shuffle过程是对Map的结果进行分区、排序、分割,然后将属于同一划分(分区)的输出合并在一起并写在磁盘上,最终得到一个分区有序的文件,分区有序的含义是map输出的键值对按分区进行排列,具有相同partition值的键值对存储在一起,每个分区里面的键值对又按key值进行升序排列(默认),其流程大致如下:

Partition

对于map输出的每一个键值对,系统都会给定一个partition,partition值默认是通过计算key的hash值后对Reduce task的数量取模获得。如果一个键值对的partition值为1,意味着这个键值对会交给第一个Reducer处理。

我们知道每一个Reduce的输出都是有序的,但是将所有Reduce的输出合并到一起却并非是全局有序的,如果要做到全局有序,我们该怎么做呢?最简单的方式,只设置一个Reduce task,但是这样完全发挥不出集群的优势,而且能应对的数据量也很受限。最佳的方式是自己定义一个Partitioner,用输入数据的最大值除以系统Reduce task数量的商作为分割边界,也就是说分割数据的边界为此商的1倍、2倍至numPartitions-1倍,这样就能保证执行partition后的数据是整体有序的。

另一种需要我们自己定义一个Partitioner的情况是各个Reduce task处理的键值对数量极不平衡。对于某些数据集,由于很多不同的key的hash值都一样,导致这些键值对都被分给同一个Reducer处理,而其他的Reducer处理的键值对很少,从而拖延整个任务的进度。当然,编写自己的Partitioner必须要保证具有相同key值的键值对分发到同一个Reducer。

Collector

Map的输出结果是由collector处理的,每个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。

这个数据结构其实就是个字节数组,叫Kvbuffer,名如其义,但是这里面不光放置了数据,还放置了一些索引数据,给放置索引数据的区域起了一个Kvmeta的别名,在Kvbuffer的一块区域上穿了一个IntBuffer(字节序采用的是平台自身的字节序)的马甲。数据区域和索引数据区域在Kvbuffer中是相邻不重叠的两个区域,用一个分界点来划分两者,分界点不是亘古不变的,而是每次Spill之后都会更新一次。初始的分界点是0,数据的存储方向是向上增长,索引数据的存储方向是向下增长,如图所示:

Kvbuffer的存放指针bufindex是一直闷着头地向上增长,比如bufindex初始值为0,一个Int型的key写完之后,bufindex增长为4,一个Int型的value写完之后,bufindex增长为8。

索引是对在kvbuffer中的键值对的索引,是个四元组,包括:value的起始位置、key的起始位置、partition值、value的长度,占用四个Int长度,Kvmeta的存放指针Kvindex每次都是向下跳四个“格子”,然后再向上一个格子一个格子地填充四元组的数据。比如Kvindex初始位置是-4,当第一个键值对写完之后,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的长度,然后Kvindex跳到-8位置,等第二个键值对和索引写完之后,Kvindex跳到-12位置。

Kvbuffer的大小可以通过io.sort.mb设置,默认大小为100M。但不管怎么设置,Kvbuffer的容量都是有限的,键值对和索引不断地增加,加着加着,Kvbuffer总有不够用的那天,那怎么办?把数据从内存刷到磁盘上再接着往内存写数据,把Kvbuffer中的数据刷到磁盘上的过程就叫Spill,多么明了的叫法,内存中的数据满了就自动地spill到具有更大空间的磁盘。

关于Spill触发的条件,也就是Kvbuffer用到什么程度开始Spill,还是要讲究一下的。如果把Kvbuffer用得死死得,一点缝都不剩的时候再开始Spill,那Map任务就需要等Spill完成腾出空间之后才能继续写数据;如果Kvbuffer只是满到一定程度,比如80%的时候就开始Spill,那在Spill的同时,Map任务还能继续写数据,如果Spill够快,Map可能都不需要为空闲空间而发愁。两利相衡取其大,一般选择后者。Spill的门限可以通过io.sort.spill.percent,默认是0.8。

Spill这个重要的过程是由Spill线程承担,Spill线程从Map任务接到“命令”之后就开始正式干活,干的活叫SortAndSpill,原来不仅仅是Spill,在Spill之前还有个颇具争议性的Sort。

Sort

当Spill触发后,SortAndSpill先把Kvbuffer中的数据按照partition值和key两个关键字升序排序,移动的只是索引数据,排序结果是Kvmeta中数据按照partition为单位聚集在一起,同一partition内的按照key有序。

Spill

Spill线程为这次Spill过程创建一个磁盘文件:从所有的本地目录中轮训查找能存储这么大空间的目录,找到之后在其中创建一个类似于“spill12.out”的文件。Spill线程根据排过序的Kvmeta挨个partition的把数据吐到这个文件中,一个partition对应的数据吐完之后顺序地吐下个partition,直到把所有的partition遍历完。一个partition在文件中对应的数据也叫段(segment)。在这个过程中如果用户配置了combiner类,那么在写之前会先调用combineAndSpill(),对结果进行进一步合并后再写出。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。那哪些场景才能使用Combiner呢?Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。

所有的partition对应的数据都放在这个文件里,虽然是顺序存放的,但是怎么直接知道某个partition在这个文件中存放的起始位置呢?强大的索引又出场了。有一个三元组记录某个partition对应的数据在这个文件中的索引:起始位置、原始数据长度、压缩之后的数据长度,一个partition对应一个三元组。然后把这些索引信息存放在内存中,如果内存中放不下了,后续的索引信息就需要写到磁盘文件中了:从所有的本地目录中轮训查找能存储这么大空间的目录,找到之后在其中创建一个类似于“spill12.out.index”的文件,文件中不光存储了索引数据,还存储了crc32的校验数据。spill12.out.index不一定在磁盘上创建,如果内存(默认1M空间)中能放得下就放在内存中,即使在磁盘上创建了,和spill12.out文件也不一定在同一个目录下。每一次Spill过程就会最少生成一个out文件,有时还会生成index文件,Spill的次数也烙印在文件名中。索引文件和数据文件的对应关系如下图所示:

在Spill线程如火如荼的进行SortAndSpill工作的同时,Map任务不会因此而停歇,而是一无既往地进行着数据输出。Map还是把数据写到kvbuffer中,那问题就来了:只顾着闷头按照bufindex指针向上增长,kvmeta只顾着按照Kvindex向下增长,是保持指针起始位置不变继续跑呢,还是另谋它路?如果保持指针起始位置不变,很快bufindex和Kvindex就碰头了,碰头之后再重新开始或者移动内存都比较麻烦,不可取。Map取kvbuffer中剩余空间的中间位置,用这个位置设置为新的分界点,bufindex指针移动到这个分界点,Kvindex移动到这个分界点的-16位置,然后两者就可以和谐地按照自己既定的轨迹放置数据了,当Spill完成,空间腾出之后,不需要做任何改动继续前进。分界点的转换如下图所示:

Map任务总要把输出的数据写到磁盘上,即使输出数据量很小在内存中全部能装得下,在最后也会把数据刷到磁盘上。

Merge

Map任务如果输出数据量很大,可能会进行好几次Spill,out文件和Index文件会产生很多,分布在不同的磁盘上。最后把这些文件进行合并的merge过程闪亮登场。

Merge过程怎么知道产生的Spill文件都在哪了呢?从所有的本地目录上扫描得到产生的Spill文件,然后把路径存储在一个数组里。Merge过程又怎么知道Spill的索引信息呢?没错,也是从所有的本地目录上扫描得到Index文件,然后把索引信息存储在一个列表里。到这里,又遇到了一个值得纳闷的地方。在之前Spill过程中的时候为什么不直接把这些信息存储在内存中呢,何必又多了这步扫描的操作?特别是Spill的索引数据,之前当内存超限之后就把数据写到磁盘,现在又要从磁盘把这些数据读出来,还是需要装到更多的内存中。之所以多此一举,是因为这时kvbuffer这个内存大户已经不再使用可以回收,有内存空间来装这些数据了。(对于内存空间较大的土豪来说,用内存来省却这两个io步骤还是值得考虑的。)

然后为merge过程创建一个叫file.out的文件和一个叫file.out.Index的文件用来存储最终的输出和索引,一个partition一个partition的进行合并输出。对于某个partition来说,从索引列表中查询这个partition对应的所有索引信息,每个对应一个段插入到段列表中。也就是这个partition对应一个段列表,记录所有的Spill文件中对应的这个partition那段数据的文件名、起始位置、长度等等。

然后对这个partition对应的所有的segment进行合并,目标是合并成一个segment。当这个partition对应很多个segment时,会分批地进行合并:先从segment列表中把第一批取出来,以key为关键字放置成最小堆,然后从最小堆中每次取出最小的输出到一个临时文件中,这样就把这一批段合并成一个临时的段,把它加回到segment列表中;再从segment列表中把第二批取出来合并输出到一个临时segment,把其加入到列表中;这样往复执行,直到剩下的段是一批,输出到最终的文件中。最终的索引数据仍然输出到Index文件中。

Reduce shuffle

在Reduce端,shuffle主要分为复制Map输出、排序合并两个阶段。

Copy

Reduce任务通过HTTP向各个Map任务拖取它所需要的数据。Map任务成功完成后,会通知父TaskTracker状态已经更新,TaskTracker进而通知JobTracker(这些通知在心跳机制中进行)。所以,对于指定作业来说,JobTracker能记录Map输出和TaskTracker的映射关系。Reduce会定期向JobTracker获取Map的输出位置,一旦拿到输出位置,Reduce任务就会从此输出对应的TaskTracker上复制输出到本地,而不会等到所有的Map任务结束。

Merge Sort

Copy过来的数据会先放入内存缓冲区中,如果内存缓冲区中能放得下这次数据的话就直接把数据写到内存中,即内存到内存merge。Reduce要向每个Map去拖取数据,在内存中每个Map对应一块数据,当内存缓存区中存储的Map数据占用空间达到一定程度的时候,开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中,即内存到磁盘merge。在将buffer中多个map输出合并写入磁盘之前,如果设置了Combiner,则会化简压缩合并的map输出。Reduce的内存缓冲区可通过mapred.job.shuffle.input.buffer.percent配置,默认是JVM的heap size的70%。内存到磁盘merge的启动门限可以通过mapred.job.shuffle.merge.percent配置,默认是66%。

当属于该reducer的map输出全部拷贝完成,则会在reducer上生成多个文件(如果拖取的所有map数据总量都没有内存缓冲区,则数据就只存在于内存中),这时开始执行合并操作,即磁盘到磁盘merge,Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。最终Reduce shuffle过程会输出一个整体有序的数据块。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1314289.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CSS新手入门笔记整理:CSS常用属性表

字体样式 属性 属性值 说明 font-family 微软雅黑/苹方 字体类型 font-size 数值 字体大小 font-weight 数值/关键字 字体粗细(字重) font-style normal(正常(默认值))italic(斜体)oblique&am…

湖南财信:灰盒测试筑牢安全防线,保障差异化金融服务体系建设

湖南财信是国有独资公司,湖南省唯一的省级地方金融控股公司、省属国有大型骨干企业。湖南财信坚持金融科技战略,以数字化建设为驱动,创新融资服务手段,逐步打造出差异化产品与数字化服务体系,全方位、多维度推动金融服…

大语言模型:开启自然语言处理新纪元

导言 大语言模型,如GPT-3(Generative Pre-trained Transformer 3),标志着自然语言处理领域取得的一项重大突破。本文将深入研究大语言模型的基本原理、应用领域以及对未来的影响。 1. 简介 大语言模型是基于深度学习和变压器&…

WPF 基于TableControl的页面切换

文章目录 前言其它项目的UserControl切换TableControl添加按钮,隐去TableItem的Header 结论 前言 我想用WPF简单实现一个按钮视图切换的效果,但是我发现别人的实现效果非常的麻烦。 其它项目的UserControl切换 我网上找了个开源的项目,他是…

网络协议 - HTTP 协议详解

网络协议 - HTTP 协议详解 一 、基础概念URL请求和响应报文1. 请求报文2. 响应报文 二、HTTP 方法GETHEADPOSTPUTPATCHDELETEOPTIONSCONNECTTRACE 三、HTTP 状态码1XX 信息2XX 成功3XX 重定向4XX 客户端错误5XX 服务器错误 四、HTTP 首部通用首部字段请求首部字段响应首部字段实…

如何提升数据结构方面的算法能力?

谈及为什么需要花时间学算法,我至少可以列举出三个很好的理由。 (1)性能:选择正确的算法可以显著提升应用程序的速度。仅就搜索来说,用二分查找替 换线性搜索就能为我们帶来巨大的收益。 (2)安全性:如果你选用了错误的算法&…

软件供应链投毒 — NPM 恶意组件分析

聚焦源代码安全,网罗国内外最新资讯! 专栏供应链安全 数字化时代,软件无处不在。软件如同社会中的“虚拟人”,已经成为支撑社会正常运转的最基本元素之一,软件的安全性问题也正在成为当今社会的根本性、基础性问题。 随…

最新Redis7主从复制(保姆级教程)

前提准备:三台云服务器(吐血消费,点赞回血)也可以使用虚拟机创建三台,但是我搞了一天也连接不上,要是又可以连接上的大家可以教我一下,也可以参考一下或者大家可以参考一下这个大佬的配置&#…

【产品经理】产品增效项目落地,项目反哺产品成长

产品和项目是相辅相成的关系,产品的规范、成熟,为项目的快速落地提供支撑,项目的落地反哺产品,促进产品的成长成熟。 软件工程的初期是,我们需要什么,就立项项目,通过项目实现需要。 随着项目的…

hive的分区表和分桶表详解

分区表 Hive中的分区就是把一张大表的数据按照业务需要分散的存储到多个目录,每个目录就称为该表的一个分区。在查询时通过where子句中的表达式选择查询所需要的分区,这样的查询效率会提高很多。 静态分区表基本语法 创建分区表 create table dept_p…

k8s中pod监控数据在grafana中展示

实现目标:将kubesphere[K8S]中运行的pod监控数据在grafana平台进行展示。 前提说明:需要在k8s每个集群中内置的prometheus配置中将pod指标数据远程写入到victoriametrics持久化数据库中。 实现效果如下: CPU使用量: round(sum by (namespace, pod) (irate(container_cpu…

ControlNet Adding Conditional Control to Text-to-Image Diffusion Models

ControlNet: Adding Conditional Control to Text-to-Image Diffusion Models TL; DR:ControlNet 使得我们能通过输入额外的条件图(如 Canny 边缘、人体姿态、深度图等),对 SD 生成结果的空间位置有更准确的控制。它拷贝 SD 部分…

网络协议 - UDP 协议详解

网络协议 - UDP 协议详解 UDP概述UDP特点UDP的首部格式UDP校验 參考文章 基于TCP和UDP的协议非常广泛,所以也有必要对UDP协议进行详解。 UDP概述 UDP(User Datagram Protocol)即用户数据报协议,在网络中它与TCP协议一样用于处理数据包,是一种…

在前端开发中,什么是前端数据缓存(caching)?它有哪些应用场景?

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 欢迎来到前端入门之旅!感兴趣的可以订阅本专栏哦!这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

C# WPF上位机开发(属性页面的设计)

【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing 163.com】 在软件开发中,属性或者参数设置是很重要的一个部分。这个时候如果不想通过动态添加控件的方法来处理的话,那么可以通过tab控…

解决Chrome同一账号在不同设备无法自动同步书签的问题

文章目录 一、问题与原因?2. 解决办法 一、问题与原因? 1.问题 使用谷歌Chrome浏览器比较头疼的问题就是:使用同一个Google账号,办公电脑与家用电脑的数据无法同步。比如:办公电脑中的书签、浏览记录等数据&#xff0…

Renyi散度:Renyi divergence

有关Renyi散度的基本介绍挺多博客已经写了。本文章主要介绍最基础的概念,以及近些年论文中为啥老喜欢引用这个概念。 一.基础概念 Renyi散度主要是描述两个分布之间的关系。对一个离散的概率分布X,其定义域记作,其实就是概率不为零的点的集…

Idea执行bat使用maven打包springboot项目成docker镜像并push到Harbor

如果执行以下命令失败,先把mvn的-q参数去掉,让错误输出到控制台。 《idea配置优化、Maven配置镜像、并行构建加速打包、解决maven打包时偶尔几个文件没权限的问题》下面的使用company-repo私有仓库和阿里云镜像仓库同时使用的配置参考。 bat echo off …

seleniumwire获取页面接口数据

selenium并不支持获取响应的数据,我们可以使用selenium-wire库,selenium-wire扩展了 Selenium 的 Python 绑定,可以访问浏览器发出的底层请求。 编写的代码与 Selenium 的方式相同。 1. 先安装seleniumwire的插件 pip install selenium-wir…

RS®SMM100A 矢量信号发生器具备毫米波测试功能的中档矢量信号发生器

R&SSMM100A 矢量信号发生器 具备毫米波测试功能的中档矢量信号发生器 R&SSMM100A 矢量信号发生器在 100 kHz 至 44 GHz 的频率范围内提供优越的射频特性。这款仪器覆盖现有无线标准所使用的 6 GHz 以下的频段、新定义的最高 7.125 GHz 的 5G NR FR1 和 Wi-Fi 6E 频段以…