全方位揭秘!大数据从0到1的完美落地之运行流程和分片机制

news2024/12/22 16:26:21


 

一个完整的MapReduce程序在分布式运行时有三类实例进程:

  • MRAppMaster: 负责整个程序的过程调度及状态协调
  • MapTask: 负责Map阶段的整个数据处理流程
  • ReduceTask: 负责Reduce阶段的整个数据处理流程

当一个作业提交后(mr程序启动),大概流程如下:

  1. 一个mr程序启动的时候,会先启动一个进程Application Master,它的主类是MRAppMaster
  2. ApplicationMaster启动之后会根据本次job的描述信息,计算出inputSplit的数据,也就是MapTask的数量
  3. ApplicationMaster然后向ResourceManager来申请对应数量的Container来执行MapTask进程。
  4. MapTask进程启动之后,根据对应的inputSplit来进行数据处理,处理流程如下
    1. 利用客户指定的inputformat来获取recordReader读取数据,形成kv键值对。
    2. 将kv传递给客户定义的Mapper类的map方法,做逻辑运算,并将map方法的输出kv收集到缓存。
    3. 将缓存中的kv数据按照k分区排序后不断的溢出到磁盘文件
  5. ApplicationMaster监控mapTask进程完成之后,会根据用户指定的参数来启动相应的reduceTask进程,并告知reduceTask需要处理的数据范围
  6. ReduceTask启动之后,根据ApplicationMaster告知的待处理的数据位置,从若干的已经存到磁盘的数据中拿到数据,并在本地进行一个归并排序,然后,再按照相同的key的kv为一组,调用客户自定义的reduce方法,并收集输出结果kv,然后按照用户指定的outputFormat将结果存储到外部设备。

MapReduce分片机制

分片的概念

​ MapReduce在进行作业提交时,会预先对将要分析的原始数据进行划分处理,形成一个个等长的逻辑数据对象,称之为输入分片(inputSplit),简称“分片”。MapReduce为每一个分片构建一个单独的MapTask,并由该任务来运行用户自定义的map方法,从而处理分片中的每一条记录。

分片是一个逻辑概念,分块是一个物理概念。

HDFS上数据是按照块为单位进行存储的,我们是能够实实在在的看到每一个数据块的。而分片则不然,是一个逻辑概念,用来描述一个MapTask处理的数据是属于哪个文件的,从什么字节位置开始处理,处理多少个字节的数据等等信息。

分片的大小选择

​ 每一个MapTask处理一个分片的数据,因此分片的数量就决定了MapTask的数量。拥有多个分片,就意味着会有多个MapTask并发执行处理数据集。那么一个MapTask处理多大的数据呢?这也是由分片的大小来决定的。

​ 如果分片设置的太小,那么管理分片的时间和构建MapTask的总时间将在整个作业的时间占比较大,影响程序的执行效率。例如: 一个分片设置为1KB的大小,计算分片、构建MapTask耗时10ms的时间,处理数据耗时10ms的时间,那这样的程序的效率是非常低下的。我们更加乐意让一个任务初始化的时间在整个任务中的时间占比尽可能低。

​ 如果分片设置的太大,那么分片所描述的数据可能会在两个数据块中存储,那就有可能会造成网络IO的产生,需要将数据移动到一个节点上进行处理,效率更低。

​ 因此,最佳分片大小应该和HDFS的块大小一致。

分片源码解读

FileSplit

public class FileSplit extends InputSplit implements Writable {
    private Path file;				// 描述文件的路径信息
    private long start;				// 描述这个分片需要处理的数据起点
    private long length;			// 描述这个分片需要处理的数据长度
    private String[] hosts;			// 描述这个分片对应的数据块在哪些节点
    private SplitLocationInfo[] hostInfos;

    public FileSplit() {
    }

    public FileSplit(Path file, long start, long length, String[] hosts) {
        this.file = file;
        this.start = start;
        this.length = length;
        this.hosts = hosts;
    }
    ...
}
复制代码

FileInputFormat

public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
    // ...
    // 定义了一个1.1倍的溢出值
    private static final double SPLIT_SLOP = 1.1D;
    
    // ...
    // 创建一个分片对象,设置这个分片需要处理的数据位置、起点、长度、hosts等信息
    protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
        return new FileSplit(file, start, length, hosts);
    }
    
    // ...
    // 最重要的方法: 获取文件所有的分片信息
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        StopWatch sw = (new StopWatch()).start();
        long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);
        List<InputSplit> splits = new ArrayList();
        List<FileStatus> files = this.listStatus(job);
        boolean ignoreDirs = !getInputDirRecursive(job) && job.getConfiguration().getBoolean("mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs", false);
        Iterator var10 = files.iterator();

        while(true) {
            while(true) {
                while(true) {
                    FileStatus file;
                    do {
                        if (!var10.hasNext()) {
                            job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size());
                            sw.stop();
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
                            }

                            return splits;
                        }

                        file = (FileStatus)var10.next();
                    } while(ignoreDirs && file.isDirectory());

                    // 重要逻辑在这里!!!
                    // 获取到文件的路径描述信息
                    Path path = file.getPath();
                    // 获取到文件的大小
                    long length = file.getLen();
                    // 如果文件的大小不等于0
                    if (length != 0L) {
                        // 获取数据块的分布信息
                        BlockLocation[] blkLocations;
                        if (file instanceof LocatedFileStatus) {
                            blkLocations = ((LocatedFileStatus)file).getBlockLocations();
                        } else {
                            FileSystem fs = path.getFileSystem(job.getConfiguration());
                            blkLocations = fs.getFileBlockLocations(file, 0L, length);
                        }
    
                        // 如果文件可以分片(有些文件是不可以分片的)
                        if (this.isSplitable(job, path)) {
                            // 获取一个Block的大小
                            long blockSize = file.getBlockSize();
                            // 计算分片的大小(块大小, 配置文件中设置的最小分片大小,最大分片大小的中间值)
                            long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);

                            // 用来记录来剩多少字节的数据没有分片
                            long bytesRemaining;
                            int blkIndex;
                            // 循环分片开始了!
                            // 注意: 循环的条件,并不是剩余数量不足分片大小! 有一个1.1倍的溢出的值的!
                            for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
                                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                                // 创建一个分片!添加到分片集合中!
                                splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                            }

                            // 循环走完后,创建一个分片来描述剩余的数据
                            if (bytesRemaining != 0L) {
                                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                                splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                            }
                        } else {
                            if (LOG.isDebugEnabled() && length > Math.min(file.getBlockSize(), minSize)) {
                                LOG.debug("File is not splittable so no parallelization is possible: " + file.getPath());
                            }

                            splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
                        }
                    } else {
                        splits.add(this.makeSplit(path, 0L, length, new String[0]));
                    }
                }
            }
        }
    }
    
    // 计算分片大小
    protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }
}
复制代码

分片总结

  1. 分片大小参数

    ​ 通过分析源码,在FileInputFormat中,计算分片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize)); 分片主要由这几个值来运算决定

    参数默认值属性
    minSize1mapreduce.input.fileinputformat.split.minsize
    maxSizeLong.MAX_VALUEmapreduce.input.fileinputformat.split.maxsize
    blockSize128Mdfs.blocksize

    通过计算的逻辑分析可以得出,分片大小的计算,是取这三个值的中间值的,因此:

    • 如果需要增大分片的大小: 调整minSize大于blockSize即可
    • 如果需要减小分片的大小: 调整maxSize小于blockSize即可
  2. 分片创建过程总结

    1. 获取文件大小及位置
    2. 判断文件是否可以分片(压缩格式有的可以进行分片,有的不可以)
    3. 获取分片的大小
    4. 剩余文件的大小/分片大小>1.1时,循环执行封装分片信息的方法,具体如下:
       封装一个分片信息(包含文件的路径,分片的起始偏移量,要处理的大小,分片包含的块的信息,分片中包含的块存在哪儿些机器上)
    5. 剩余文件的大小/分片大小<=1.1且不等于0时,封装一个分片信息(包含文件的路径,分片的起始偏移量,要处理
       的大小,分片包含的块的信息,分片中包含的块存在哪儿些机器上)
    复制代码

    注意事项: 1.1倍的冗余

    一个260M的文件,分几块?分几片?

    • 分块是物理概念: 128M + 128M + 4M,因此一共有3个分块。
    • 分片是逻辑概念:
      • 第一个分片: 260M/128M > 1.1,因此第一个分片大小128M,剩余132M数据未分片。
      • 第二个分片: 132M/128M < 1.1,因此第二个分片大小132M
      • 因此这个文件有2个分片。
  3. 多分片文件读取

    ​ 数据文件被分了多个分片,那么我们不能保证分片是正好按照行分开的,极大的可能性是一行的数据被分到了两个分片中。因此,我们在进行多个分片的数据读取的时候:

    - 第一个分片读到末尾再多读一行
    - 既不是第一个分片也不是最后一个分片第一行数据舍弃,末尾多读一行
    - 最后一个分片舍弃第一行,末尾多读一行
    复制代码

运行流程之MapTask

1. maptask调用FileInputFormat的getRecordReader读取分片数据
2. 每行数据读取一次,返回一个(K,V)对,K是offset,V是一行数据
3. 将k-v对交给MapTask处理
4. 每对k-v调用一次map(K,V,context)方法,然后context.write(k,v)
5. 写出的数据交给收集器OutputCollector.collector()处理
6. 将数据写入环形缓冲区,并记录写入的起始偏移量,终止偏移量,环形缓冲区默认大小100M
7. 默认写到80%的时候要溢写到磁盘,溢写磁盘的过程中数据继续写入剩余20%
8. 溢写磁盘之前要先进行分区然后分区内进行排序
9. 默认的分区规则是hashpatitioner,即key的hash%reduceNum
10. 默认的排序规则是key的字典顺序,使用的是快速排序
11. 溢写会形成多个文件,在maptask读取完一个分片数据后,先将环形缓冲区数据刷写到磁盘
12. 将数据多个溢写文件进行合并,分区内排序(外部排序 => 归并排序)
复制代码

​ MapTask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度.那么,MapTask并行实例是否越多越好呢?其并行度又是如何决定呢?

1. 如果硬件配置为2*12core + 64G,恰当的map并行度是大约每个节点20-100个map,最好每个map的执行时间至少一分钟。

2. 如果job的每个map或者 reduce task的运行时间都只有30-40秒钟,那么就减少该job的map或者reduce数,每一个task(map|reduce)的setup和加入到调度器中进行调度,这个中间的过程可能都要花费几秒钟,所以如果每个task都非常快就跑完了,就会在task的开始和结束的时候浪费太多的时间。

3. 配置task的JVM重用可以改善该问题:
   (mapred.job.reuse.jvm.num.tasks,默认是1,表示一个JVM上最多可以顺序执行的task数目(属于同一个Job)是1。也就是说一个task启一个JVM)

4. 如果input的文件非常的大,比如1TB,可以考虑将hdfs上的每个block size设大,比如设成256MB或者512MB
复制代码

运行流程之ReduceTask

1. 数据按照分区规则发送到reducetask
2. reducetask将来自多个maptask的数据进行合并,排序(外部排序===》归并排序)
3. 按照key相同分组()
4. 一组数据调用一次reduce(k,iterable<v>values,context)
5. 处理后的数据交由reducetask
6. reducetask调用FileOutputFormat组件
7. FileOutputFormat组件中的write方法将数据写出
复制代码

Reduce Task的并行度同样影响整个job的执行并发度和执行效率,但与Map Task的并发数由切片数决定不同,Reduc Task数量的决定是可以直接手动设置:默认值是1,手动设置为4

设置方法:job.setNumReduceTasks(4);
复制代码

如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜

注意: Reduce Task数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个Reduce Task。尽量不要运行太多的Reduce Task。对大多数job来说,最好reduce的个数最多和集群中的reduce持平,或者比集群的 reduce slots小。这个对于小集群而言,尤其重要。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/489373.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TouchGFX开发(2)----触摸屏幕组件点亮LED

TouchGFX开发.1----安装软件 概述创建 TouchGFX 项目添加图片组件添加按钮interactions 设置生成代码打开文件配置LED触摸点亮LED演示效果 概述 了解如何使用 TouchGFX 配置屏幕&#xff0c;添加触摸按钮&#xff0c;并通过按钮控制板载 LED 的状态。 创建 TouchGFX 项目 打…

详解map、set、multimap、multiset的使用

✍作者&#xff1a;阿润菜菜 &#x1f4d6;专栏&#xff1a;C 目录 前言set、multiset的使用1. set2. multiset3. 什么时候应该使用multiset而不是set map、multimap的使用1.map2.multimap3.什么时候应该使用multimap而不是map 前言 map、set、multimap、multiset是C STL中的四…

如何把握未来增长话语权,全链路数字化运营有解

近年来&#xff0c;良品铺子、元气森林、蔚来等迅速成为市场中现象级的品牌&#xff0c;它们往往在很短时间内就发展成市场的生力军和消费者青睐的对象。 仔细研究背后&#xff0c;这些新生品牌的崛起&#xff0c;核心商业逻辑跟以往品牌大为不同&#xff0c;明显更“懂”新生…

基于微信小程序的酒店预定管理系统设计与实现

第1章 绪论 1 1.1开发背景与意义 1 1.2开发方法 1 1.3论文结构 1 2系统开发技术与环境 3 2.1 系统开发语言 3 2.2 系统开发工具 3 2.3 系统页面技术 3 2.4 系统数据库的选择 4 2.5 系统的运行环境 4 2.5.1 硬件环境 4 2.5.2 软件环境 4 3系统分析 5 3.1可行性分析 5 3.1.1 经济…

Java——和为S的连续正数序列

题目链接 牛客网在线oj题——和为S的连续正数序列 题目描述 小明很喜欢数学,有一天他在做数学作业时,要求计算出9~16的和,他马上就写出了正确答案是100。但是他并不满足于此,他在想究竟有多少种连续的正数序列的和为100(至少包括两个数)。没多久,他就得到另一组连续正数和为…

用Jmeter进行接口自动化测试的工作流程你知道吗?

目录 测试流程 接口测试相关文档管理规范 接口测试要点 测试流程 在测试负责人接受到测试任务后&#xff0c;应该按照以下流程规范完成测试工作。 2.1 测试需求分析 产品开发负责人在完成某产品功能的接口文档编写后&#xff0c;在核对无误后下发给对应的接口测试负责人…

word@论文后期优化和完善工作@页眉页脚页码@配置并导出pdf

文章目录 论文结构例 目录操作页眉页脚页眉样式检查所有页面的页眉添加横线 页码从第二页(封面后的一页)开始用罗马数字标页码 word导出pdf等其他格式额外配置带独立书签和目录打印pdf 最后的优化 论文结构 一篇规范的论文应该大致包括以下部分&#xff1a; 标题页&#xff1…

JavaScript全解析——canvas 入门(下)

canvas 线段两端的样式 ●canvas 中, 是可以设置线段两端的样子的 ●我们先来画三个平行线 // 0. 获取到页面上的 canvas 标签元素节点 const canvasEle document.querySelector(#canvas)// 1. 获取当前这个画布的工具箱 const ctx canvasEle.getContext(2d)// 2. 开始绘制第…

webpack 5 实战(2)

二十一、babel-loader 使用 使用babel-loader对js文件进行处理&#xff0c;在lg.Webpack.js配置文件中配置js文件规则。 使用单独的插件进行转换 使用预设进行转换 使用babel.config.js配置文件进行babel配置 const path require(path) const CopyWebpackPlugin require(…

day12 IP协议与ethernet协议

目录 IP包头 IP网的意义 IP数据报的格式 IP数据报分片 以太网包头&#xff08;链路层协议&#xff09; IP包头 IP网的意义 当互联网上的主机进行通信时&#xff0c;就好像在一个网络上通信一样&#xff0c;看不见互联的各具体的网络异构细节&#xff1b; 如果在这种覆盖…

RabbitMQ 死信队列实现

// consumer处理成功后&#xff0c;通知broker删除队列中的消息&#xff0c;如果设置multipletrue&#xff0c;表示支持批量确认机制以减少网络流量 channel.basicAck(deliveryTag, multiple);// 拒绝deliveryTag对应的消息&#xff0c;第二个参数是否requeue&#xff0c;true则…

Inception模型实现孤立手语词的识别

实现孤立手语词的识别流程如下&#xff0c;在实际研究中&#xff0c;本章将着重研究第三阶段内容&#xff0c;也就是模型的设计与实现过程&#xff0c;目的是提高手语图像的识别准确率。 Inception模型实现 Inception模型是谷歌研究人员在2014年提出的一个深度卷…

网工Python:如何使用Netmiko的SCP函数进行文件传输?

在网络设备管理中&#xff0c;传输配置文件、镜像文件等是经常需要进行的操作。Netmiko是一个Python库&#xff0c;可用于与各种网络设备进行交互&#xff0c;提供了一些用于传输文件的函数&#xff0c;其中包括SCP&#xff08;Secure Copy Protocol&#xff09;函数。本文将介…

【软考备战·希赛网每日一练】2023年5月4日

文章目录 一、今日成绩二、错题总结第一题第二题第三题第四题三、知识查缺 题目及解析来源&#xff1a;2023年05月04日软件设计师每日一练 一、今日成绩 二、错题总结 第一题 解析&#xff1a; 修改Linux文件权限命令&#xff1a;chmod。 第二题 解析&#xff1a; 第三题 解析…

欧拉奔赴品牌2.0时代,女性汽车真实用户需求被定义?

每年的上海国际汽车工业展览会&#xff0c;不仅是各大汽车品牌的技术“秀场”&#xff0c;也是品牌的营销“修罗场”。今年上海车展出圈的营销事件特别多&#xff0c;热度甚至一再蔓延到汽车行业外&#xff0c;其中欧拉也贡献了不少流量。 据了解&#xff0c;在2023上海车展欧…

mount disk space from SAN

mount disk from FC-SAN 配置硬盘域、存储池、LUN、主机及LUN与与主机之间的映射。 fc-san多路径范例1 fc-san多路径2 mount disk from iSCSI [rootqionghai11g ~]# iscsiadm -m discovery -t sendtargets -p 192.16.10.188:3260 Starting iscsid: [ OK ] 192.16.10.188:32…

Yolov1 源码讲解 voc.py

先看结构 1.mean_rgb是voc2007专用的均值 voc2007分别是这样的 坐标格式&#xff08;X0&#xff0c;Y0&#xff0c;X1&#xff0c;Y1&#xff09;其中X0,Y0是左上角的坐标,X1,Y1是右下角的坐标。 coco,voc ,yolo数据集中的bbox的坐标格式_coco bbox格式_十二耳环的博客-CSDN…

Jmeter之BeanShell取出需要参数,传递给下个请求

一、事件背景&#xff1a; 上周同事用Jmeter录制脚本&#xff0c;录制成功回放后&#xff0c;并没有达到自己想要的结果。 他的真实需求是&#xff0c;想从数据库取出某个字段值&#xff0c;然后对数据库做操作。 也就是想实现做参数传递的效果&#xff0c;我心痒痒的&#…

ConcurrentHashMap底层源码解析

ConcurrentHashMap线程安全&#xff0c;底层数组链表红黑树。 思想&#xff0c;分而治之。 JDK7有Segment对象&#xff0c;分段锁 JDK8没有了这个对象 总结&#xff0c; 首先计算hash&#xff0c; 如果集合没有元素&#xff0c;开始initTable方法初始化&#xff0c;这里扩容讲…

有人说ChatGPT信息不新?

Hello ,我是小索奇&#xff0c;今天给大家分享一个插件&#xff0c;这个插件可以通过抓取网页获得最新内容&#xff0c;它可以有效的避免ChatGPT信息过时&#xff0c;获取不到最新的信息等等 演示-这里问它一些问题&#xff1a; 现在几点了呀 可以看到时间也是很准确的&#x…