Hadoop MapReduce基本概念与详细流程

news2024/11/16 9:25:10

Hadoop MapReduce是Hadoop 中一个批量计算的框架,在整个mapreduce作业的过程中,包括从数据的输入,数据的处理,数据的数据输入这些部分,而其中数据的处理部分就要map,reduce,combiner等操作组成。这篇文章,分享了MapReduce基本概念与详细流程,enjoy~~

一、概念

MapReduce是一种基于java的分布式计算的处理技术和程序模型。 MapReduce算法包含两个重要任务,即Map和Reduce。Map采用一组数据并将其转换为另一组数据,其中各个元素被分解为元组(键/值对)。其次,reduce任务,它将map的输出作为输入,并将这些数据元组合并成一组较小的元组。作为MapReduce名称的顺序,reduce任务总是在map作业之后执行。

MapReduce的主要优点是易于在多个计算节点上扩展数据处理。在MapReduce模型下,数据处理原语称为映射器和缩减器。将数据处理应用程序分解为映射器和简化器有时并不重要。但是,一旦我们以MapReduce形式编写应用程序,扩展应用程序以在集群中运行数百,数千甚至数万台机器只是一种配置更改。这种简单的可扩展性是吸引许多程序员使用MapReduce模型的原因。

二、MapReduce的基本流程

MapReduce主要是先读取文件数据,然后进行Map处理,接着Reduce处理,最后把处理结果写到文件中。
在这里插入图片描述

三、MapReduce的主要流程

在这里插入图片描述

1,Record reader

记录阅读器会翻译由输入格式生成的记录,记录阅读器用于将数据解析给记录,并不分析记录自身。记录读取器的目的是将数据解析成记录,但不分析记录本身。它将数据以键值对的形式传输给mapper。通常键是位置信息,值是构成记录的数据存储块.自定义记录不在本文讨论范围之内.

2,Map

在映射器中用户提供的代码称为中间对。对于键值的具体定义是慎重的,因为定义对于分布式任务的完成具有重要意义.键决定了数据分类的依据,而值决定了处理器中的分析信息.本书的设计模式将会展示大量细节来解释特定键值如何选择.

3,Shuffle and Sort

ruduce任务以随机和排序步骤开始。此步骤写入输出文件并下载到本地计算机。这些数据采用键进行排序以把等价密钥组合到一起。

4,Reduce

reduce采用分组数据作为输入。该功能传递键和此键相关值的迭代器。可以采用多种方式来汇总、过滤或者合并数据。当reduce功能完成,就会发送0个或多个键值对。

5,输出格式

输出格式会转换最终的键值对并写入文件。默认情况下键和值以tab分割,各记录以换行符分割。因此可以自定义更多输出格式,最终数据会写入HDFS。

四、MapReduce读取数据

通过InputFormat决定读取的数据的类型,然后拆分成一个个InputSplit,每个InputSplit对应一个Map处理,RecordReader读取InputSplit的内容给Map。

1,InputFormat

决定读取数据的格式,可以是文件或数据库等。

功能
(1)验证作业输入的正确性,如格式等
(2) 将输入文件切割成逻辑分片(InputSplit),一个InputSplit将会被分配给一个独立的Map任务
(3)提供RecordReader实现,读取InputSplit中的"K-V对"供Mapper使用
方法
List getSplits(): 获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题
RecordReader createRecordReader(): 创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题。
类结构
在这里插入图片描述

TextInputFormat: 输入文件中的每一行就是一个记录,Key是这一行的byte offset,而value是这一行的内容
KeyValueTextInputFormat: 输入文件中每一行就是一个记录,第一个分隔符字符切分每行。在分隔符字符之前的内容为Key,在之后的为Value。分隔符变量通过key.value.separator.in.input.line变量设置,默认为(\t)字符。
NLineInputFormat: 与TextInputFormat一样,但每个数据块必须保证有且只有N行,mapred.line.input.format.linespermap属性,默认为1
SequenceFileInputFormat: 一个用来读取字符流数据的InputFormat,<key,value>为用户自定义的。字符流数据是Hadoop自定义的压缩的二进制数据格式。它用来优化从一个MapReduce任务的输出到另一个MapReduce任务的输入之间的数据传输过程。</key,value>

2,InputSplit

代表一个个逻辑分片,并没有真正存储数据,只是提供了一个如何将数据分片的方法。
Split内有Location信息,利于数据局部化一个InputSplit给一个单独的Map处理。

public abstract class InputSplit {
      /**
       * 获取Split的大小,支持根据size对InputSplit排序.
       */
      public abstract long getLength() throws IOException, InterruptedException;

      /**
       * 获取存储该分片的数据所在的节点位置.
       */
      public abstract String[] getLocations() throws IOException, InterruptedException;
}

3,RecordReader

将InputSplit拆分成一个个<key,value>对给Map处理,也是实际的文件读取分隔对象</key,value>

4,问题

大量小文件如何处理
CombineFileInputFormat可以将若干个Split打包成一个,目的是避免过多的Map任务(因为Split的数目决定了Map的数目,大量的Mapper Task创建销毁开销将是巨大的)

怎么计算split的
通常一个split就是一个block(FileInputFormat仅仅拆分比block大的文件),这样做的好处是使得Map可以在存储有当前数据的节点上运行本地的任务,而不需要通过网络进行跨节点的任务调度
通过mapred.min.split.size, mapred.max.split.size, block.size来控制拆分的大小
如果mapred.min.split.size大于block size,则会将两个block合成到一个split,这样有部分block数据需要通过网络读取
如果mapred.max.split.size小于block size,则会将一个block拆成多个split,增加了Map任务数(Map对split进行计算并且上报结果,关闭当前计算打开新的split均需要耗费资源)
先获取文件在HDFS上的路径和Block信息,然后根据splitSize对文件进行切分( splitSize = computeSplitSize(blockSize, minSize, maxSize) ),默认splitSize 就等于blockSize的默认值(64m)

public List<InputSplit> getSplits(JobContext job) throws IOException {
    // 首先计算分片的最大和最小值。这两个值将会用来计算分片的大小
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
        Path path = file.getPath();
        long length = file.getLen();
        if (length != 0) {
              FileSystem fs = path.getFileSystem(job.getConfiguration());
            // 获取该文件所有的block信息列表[hostname, offset, length]
              BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
            // 判断文件是否可分割,通常是可分割的,但如果文件是压缩的,将不可分割
              if (isSplitable(job, path)) {
                long blockSize = file.getBlockSize();
                // 计算分片大小
                // 即 Math.max(minSize, Math.min(maxSize, blockSize));
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);

                long bytesRemaining = length;
                // 循环分片。
                // 当剩余数据与分片大小比值大于Split_Slop时,继续分片, 小于等于时,停止分片
                while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                      splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
                      bytesRemaining -= splitSize;
                }
                // 处理余下的数据
                if (bytesRemaining != 0) {
                    splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts()));
                }
            } else {
                // 不可split,整块返回
                splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
            }
        } else {
            // 对于长度为0的文件,创建空Hosts列表,返回
            splits.add(makeSplit(path, 0, length, new String[0]));
        }
    }

    // 设置输入文件数量
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    LOG.debug("Total # of splits: " + splits.size());
    return splits;
}

分片间的数据如何处理
split是根据文件大小分割的,而一般处理是根据分隔符进行分割的,这样势必存在一条记录横跨两个split
在这里插入图片描述
解决办法是只要不是第一个split,都会远程读取一条记录。不是第一个split的都忽略到第一条记录

public class LineRecordReader extends RecordReader<LongWritable, Text> {
    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private int maxLineLength;
    private LongWritable key = null;
    private Text value = null;

    // initialize函数即对LineRecordReader的一个初始化
    // 主要是计算分片的始末位置,打开输入流以供读取K-V对,处理分片经过压缩的情况等
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        // 打开文件,并定位到分片读取的起始位置
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());

        boolean skipFirstLine = false;
        if (codec != null) {
            // 文件是压缩文件的话,直接打开文件
            in = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            // 只要不是第一个split,则忽略本split的第一行数据
            if (start != 0) {
                skipFirstLine = true;
                --start;
                // 定位到偏移位置,下次读取就会从偏移位置开始
                fileIn.seek(start);
            }
            in = new LineReader(fileIn, job);
        }

        if (skipFirstLine) {
            // 忽略第一行数据,重新定位start
            start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    public boolean nextKeyValue() throws IOException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);// key即为偏移量
        if (value == null) {
            value = new Text();
        }
        int newSize = 0;
        while (pos < end) {
            newSize = in.readLine(value, maxLineLength,    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            // 读取的数据长度为0,则说明已读完
            if (newSize == 0) {
                break;
            }
            pos += newSize;
            // 读取的数据长度小于最大行长度,也说明已读取完毕
            if (newSize < maxLineLength) {
                break;
            }
            // 执行到此处,说明该行数据没读完,继续读入
        }
        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        } else {
            return true;
        }
    }
}

以上的内容来源网络,仅供学习交流,如有侵犯,联系删除哦!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/366768.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

刚接手的APP项目需要优化,需要从哪些方向入手?

对于每个Android 开发团队来说产品上线&#xff0c;是让人喜忧参半的一件事。**喜指的是&#xff1a;付出了大量的时间&#xff0c;产品终于上线了&#xff1b;而忧指的是&#xff1a;担心中间会不会出现一些性能相关的问题&#xff0c;比如卡顿、内存泄漏、崩溃……等&#xf…

干翻 nio ,王炸 io_uring 来了 !!(图解+史上最全)

大趋势&#xff1a;全链路异步化&#xff0c;性能提升10倍 随着业务的发展&#xff0c;微服务应用的流量越来越大&#xff0c;使用到的资源也越来越多。 在微服务架构下&#xff0c;大量的应用都是 SpringCloud 分布式架构&#xff0c;这种架构总体上是全链路同步模式。 全链…

java 抽象类 详解

目录 一、抽象类概述&#xff1a; 二、抽象方法 : 1.概述 : 2.应用 : 3.特点 : 三、抽象类特点 : 1.关于abstract关键字 : 2.抽象类不能被实例化&#xff0c;只能创建其子类对象 : 3.抽象类子类的两个选择 &#xff1a; 四、抽象类的成员 : 1.成员变量 : 2.成员方…

趣味三角——第12章——tanx

第12章节 tanx In his very numerous memoires, and especially in his great work, Introductio in analysin infinitorum (1748), Euler displayed the most wonderful skill in obtaining a rich harvest of results of great interest. . . . Hardly any other work …

业务单据堆积如山?如何提升会计做账效率?

某集团以“创建现代能源体系、提高人民生活品质”为使命&#xff0c;形成了贯通下游分销、中游贸易储运、上游生产的清洁能源产业链和涵盖健康、文化、旅游、置业的生命健康产品链。目前&#xff0c;某集团在全国21个省&#xff0c;为超过2681万个家庭用户、21万家企业提供能源…

Android:同步屏障的简单理解和使用

同步屏障的简单理解和使用1、背景2、何为同步屏障&#xff1f;2.1、 发送屏障消息——postSyncBarrier2.2、发送异步消息2.3、处理消息2.4、移除屏障消息——removeSyncBarrier2、系统什么时候添加同步屏障&#xff1f;参考1、背景 这里我们假设一个场景&#xff1a;我们向主线…

网狐服务端C++引入http功能剖析

旗舰版本的网狐服务端及以前的版本都是没有http解析功能的&#xff0c;导致就是web后台改了配置不能及时通知到游戏里面去&#xff0c;以至于很多小公司拿这种框架来开发的变通的方案就是用定时器不定时去刷数据库&#xff0c;导致多少个功能就有多少个定时去刷新&#xff0c;代…

0基础学习软件测试难么

0基础开始学习任何一样事情都是有一定难度的&#xff0c;但是也要有对比&#xff0c;软件测试相比于IT行业其他学科已经算是容易入门的了&#xff0c;就看你个人的学习方法&#xff0c;找的学习资源以及你的自制力。 正确学习方法路径 “我一听就懂&#xff0c;一敲就废&…

【Spring的事务传播行为有哪些呢?Spring事务的隔离级别?讲下嵌套事务?】

如果你想寻求一份与后端相关的开发工作&#xff0c;那么关于Spring事务相关的面试题你就不能说不会并且不能不知道&#xff1f; 人生如棋&#xff0c;我愿为卒&#xff0c;行动虽慢&#xff0c;可谁曾见我后退一步&#xff1f; 一.Spring中声明事务的方式 1.1 编程式事务 编程…

Android Gradle脚本打包

1、背景资料 1.1 Android-Gradle-Groovy-Java-JVM 之间的关系 1.2 Android Gradle Plugin Version版本 与 Gradle Version版本的对应关系 Android Gradle Plugin Version版本Gradle Version版本1.0.0 - 1.1.32.2.1 - 2.31.2.0 - 1.3.12.2.1 - 2.91.5.02.2.1 - 2.132.0.0 -…

WFP网络过滤驱动-限制网站访问

文章目录前言WFP入门介绍WFP基本架构名词解释代码基本结构代码示例前言 WFP Architecture - Win32 apps | Microsoft Learn是一个网络流量处理平台。WFP 由一组连接到网络堆栈的钩子和一个协调网络堆栈交互的过滤引擎组成。 本文使用WFP&#xff0c;实现了一个网络阻断的demo…

Guna UI WinForms 2.0.4.4 Crack

Guna.UI2 WinForms is the suite for creating groundbreaking desktop app UI. It is for developers targeting the .NET Windows Forms platform. 50 多个 UI 控件 具有广泛功能的综合组件可帮助您开发任何东西。 无尽的定制 只需拖放即可创建视觉效果命令和体验。 出色的…

服务端开发之Java备战秋招面试篇2-HashMap底层原理篇

现在Java应届生和实习生就业基本上必问HashMap的底层原理和扩容机制等&#xff0c;可以说是十分常见的面试题了&#xff0c;今天我们来好好整理一下这些知识&#xff0c;为后面的秋招做足准备&#xff0c;加油吧&#xff0c;少年。 目录 1、HashMap集合介绍 2、HashMap的存储…

S2-001漏洞分析

首发于个人博客&#xff1a;https://bthoughts.top/posts/S2-001漏洞分析/ 一、简介 1.1 Struts2 Struts2是流行和成熟的基于MVC设计模式的Web应用程序框架。 Struts2不只是Struts1下一个版本&#xff0c;它是一个完全重写的Struts架构。 1.2 S2-001 Remote code exploit o…

【保姆级】手把手捋动态代理流程(JDK+Cglib超详细源码分析)

简介动态代理&#xff0c;通俗点说就是&#xff1a;无需声明式的创建java代理类&#xff0c;而是在运行过程中生成"虚拟"的代理类&#xff0c;被ClassLoader加载。 从而避免了静态代理那样需要声明大量的代理类。上面的简介中提到了两个关键的名词&#xff1a;“静态…

C语言进阶(九)—— 函数指针和回调函数、预处理、动态库和静态库的使用、递归函数

1. 函数指针1.1 函数类型通过什么来区分两个不同的函数&#xff1f;一个函数在编译时被分配一个入口地址&#xff0c;这个地址就称为函数的指针&#xff0c;函数名代表函数的入口地址。函数三要素&#xff1a; 名称、参数、返回值。C语言中的函数有自己特定的类型。c语言中通过…

多元回归分析 | ELM极限学习机多输入单输出预测(Matlab完整程序)

多元回归分析 | ELM极限学习机多输入单输出预测(Matlab完整程序) 目录 多元回归分析 | ELM极限学习机多输入单输出预测(Matlab完整程序)预测结果评价指标基本介绍程序设计参考资料预测结果 评价指标 -----------ELM结果分析-------------- 均方根误差(RMSE):0.55438 测试集…

webgis高德地图

webgis高德地图 首先准备工作,注册一个高德地图账号,然后在创建一个新应用生一个key跟appId 高德开放平台 接着创建一个html页面 高德配置手册 <style>* {margin: 0;padding: 0;}#

如何维护固态继电器?

固态继电器是SSR的简称&#xff0c;是由微电子电路、分立电子器件和电力电子功率器件组成的非接触式开关。隔离装置用于实现控制端子与负载终端之间的隔离。固态继电器的输入端使用微小的控制信号直接驱动大电流负载。那么&#xff0c;如何保养固态继电器呢&#xff1f; 在为小…

Editor工具开发基础一:顶部菜单栏拓展

一.创建脚本路径 不管是那种工具 都需要在工程里创建一个Editor文件夹 来存放工具.cs文件 二.特性 MenuItem 特性 修饰静态方法 三个构造函数 public MenuItem(string itemName); public MenuItem(string itemName, bool isValidateFunction); public MenuItem(string itemN…