用于大规模数据集(大于1TB)的并行运算的MapReduce是怎么实现的?

news2024/12/25 10:26:58

MapReduce 是一种编程模型,用于处理和生成大数据集。MapReduce 分为两个阶段:Map 阶段和 Reduce 阶段。

  1. Map 阶段:在这个阶段,输入数据被拆分成不同的数据块,这些数据块被分发到各个 Map 任务上。每个 Map 任务对输入的数据块进行处理并输出成一组键值对。具体的处理方式取决于编写 Map 任务的方式,这将由程序员决定。

  2. Shuffle 阶段:在 Map 阶段之后,系统会根据 Map 阶段输出的键值对进行排序和分区。这个过程是系统自动完成的,不需要程序员编写代码。

  3. Reduce 阶段:在这个阶段,一组键值对会送到同一个 Reduce 任务上。Reduce 任务会对这些键值对按照键来进行合并,并对每个键的所有值进行处理,输出处理结果。具体的处理方式也是由程序员编写的 Reduce 任务来决定。

通过 Map 和 Reduce 两个步骤,MapReduce 能够将大规模的数据计算工作分解成小规模的计算任务,这些小规模的计算任务可以在不同的计算节点上并行处理。这使得 MapReduce 可以处理非常大规模的数据。

例如,一个简单的MapReduce程序可能会将文本文件分成单词(map),然后计算每个单词的出现次数(reduce)。在这种情况下,Map任务将文本文件切分为单词,并输出每个单词的键值对(键是单词,值是1),然后Reduce任务将这些键值对按照键(单词)合并,并计算每个键(单词)的值(出现次数)的总和。

在真实的环境中,这个模型通常由分布式计算框架(如 Hadoop)来实现,框架会负责任务的调度、数据的分布和容错等工作,使得程序员可以专注于编写处理数据的 Map 和 Reduce 函数。

上demo:

// 导入 IOException 类,用于处理输入/输出错误
import java.io.IOException;

// 导入 StringTokenizer 类,用于解析字符串
import java.util.StringTokenizer;

// 导入 Hadoop 提供的 Configuration 类,用于 Hadoop 配置设置
import org.apache.hadoop.conf.Configuration;

// 导入 Hadoop 提供的 Path 类,处理 Hadoop 文件系统路径
import org.apache.hadoop.fs.Path;

// 导入 Hadoop 提供的 IntWritable 和 Text 类,数据类型转换
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// 导入 Hadoop MapReduce Job 类,用于定义和提交 MapReduce 任务
import org.apache.hadoop.mapreduce.Job;

// 从 Hadoop MapReduce 库导入 Mapper 和 Reducer 类
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// 导入 Hadoop IO 类库,用于 MapReduce 输入输出
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// 定义 WordCount 类
public class WordCount {

  // 定义公开的静态 TokenizerMapper 类,继承 Hadoop 的 Mapper 类
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{

    // 定义一个值为 1 的整型变量,作为每个单词计数的值
    private final static IntWritable one = new IntWritable(1);

    // 定义一个 Text 类型的变量,存储每个即将被计数的单词
    private Text word = new Text();

    // 重写 map 方法,这个方法会被 MapReduce 框架调用,每读入一行数据调用一次
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      
      // 使用 StringTokenizer 将每行的字符串拆解为单个的单词
      StringTokenizer itr = new StringTokenizer(value.toString());

      // 循环单词字符串
      while (itr.hasMoreTokens()) {

        // 为 word 对象设值
        word.set(itr.nextToken());

        // 将每个单词及其数量(默认为 1)输出
        context.write(word, one);
      }
    }
  }

  // 定义公开的静态 IntSumReducer 类,继承 Hadoop 的 Reducer 类
  public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {

    // 定义 IntWritable 类型的对象,存储每个单词的计数总和
    private IntWritable result = new IntWritable();

    // 重写 reduce 方法,这个方法用于汇总每个单词的数量
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      
      // 定义 sum 变量,用于累加每个单词的数量
      int sum = 0;

      // 使用 for 循环,遍历当前单词的所有数量值,进行累加
      for (IntWritable val : values) {
        sum += val.get();
      }

      // 为结果设值
      result.set(sum);

      // 将每个单词及其最终计数总和写出
      context.write(key, result);
    }
  }

  // 定义主方法,运行 MapReduce 任务的入口
  public static void main(String[] args) throws Exception {

    // 创建 Hadoop 配置对象
    Configuration conf = new Configuration();

    // 创建 Job 实例,设置 Job 名称
    Job job = Job.getInstance(conf, "word count");

    // 设置 Jar 文件 这行代码的作用是设置该作业的 jar 文件。在 Hadoop 中执行一个 MapReduce 作业,需要将其打包为一个 jar 文件并上传到 Hadoop 集群中。这里的 WordCount.class 即是你的作业驱动类, Hadoop 将会通过这个类查找包含该类的 jar 文件,然后使用这个 jar 文件执行整个作业。这样设置可以确保作业在 Hadoop 集群中的每个节点上都可以正确执行。
    job.setJarByClass(WordCount.class);

    // 设置 Mapper 类 这行代码的作用是设置 MapReduce 作业的 Mapper 类。在 MapReduce 模型中,Mapper 是负责处理输入数据,将输入数据转换为一系列键值对的组件。TokenizerMapper.class 就是你写的处理数据的类,你可以在这个类中定义如何处理和映射输入数据。设置 Mapper 类是进行 MapReduce 作业的关键步骤之一。
    job.setMapperClass(TokenizerMapper.class);

    // 设置 Combiner 类  当一个特定的 Mapper 结束处理之后,它会输出很多记录,这些记录中可能有很多键是相同的。如果没有 Combiner,这些记录就会直接传递给 Reducer,这样可能会产生大量的网络 I/O,显著地影响性能。
	//而有了 Combiner,我们可以在每个 Mapper 所在的节点上先进行一次局部的归约操作,将相同键的值合并后再传输到 Reducer,这样可以大大减少需要传输的数据量,提高任务的运行性能。
    job.setCombinerClass(IntSumReducer.class);

    // 设置 Reducer 类及其输出键-值对的类型
    // 这行代码是设定了该 MapReduce 任务的 Reducer 类是 IntSumReducer。Reducer 类是对所有 Mapper 的输出进行汇总的部分,每一个 MapReduce 任务都需要设定一个 Reducer 类。这里的 IntSumReducer 类定义了如何对数据进行汇总操作。
	job.setReducerClass(IntSumReducer.class); 
	//这行代码是设定了该 MapReduce 任务输出数据的键的类型是 Text。在 MapReduce 中,数据是以键值对的形式存储和传输的。这行代码指明了输出键的数据类型,确保处理结果的格式正确。
	job.setOutputKeyClass(Text.class);
	//这行代码是设定了该 MapReduce 任务输出数据的值的类型是 IntWritable。这是对job.setOutputKeyClass(Text.class)的补充,指明了输出值的数据类型。
    job.setOutputValueClass(IntWritable.class);

    // 设置输入数据的路径,取自程序参数
    FileInputFormat.addInputPath(job, new Path(args[0]));

    // 设置输出数据的路径,取自程序参数
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // 等待任务完成后退出程序 这行代码是 MapReduce 任务的最后一步,作用是让主线程等待这个 MapReduce 任务完成。
    //在这段代码中,waitForCompletion方法将主线程挂起,直到 MapReduce 任务完成为止。这里的参数 true 表示会在控制台上显示这个任务的进度,也就是说你运行这个程序的时候,会在控制台上看到任务的完成进度。
	//这段代码的含义就是:如果任务成功完成,那么程序正常退出;如果任务执行过程中出现了错误,那么程序异常退出。
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
/**
MapReduce 模型的并发性体现在其 Map 和 Reduce 操作的实现中,这两个操作对数据进行分布式处理。在以上代码片段中,没有一行明确地标示了并发操作,因为并发性质是 MapReduce 框架内部管理的,并非是在业务代码中直接表现出来的。
然而,job.setMapperClass(TokenizerMapper.class); 和 job.setReducerClass(IntSumReducer.class); 
这两行设置了 Mapper 类和 Reducer 类,这两类的实例在运行时会在多个节点上同时处理数据(一个节点处理输入数据的一部分),这便是 MapReduce 的并发性!!!
具体来说:
在 Mapper 阶段,输入的数据集会被划分成多个数据块,每一个数据块会被一个 Mapper 实例负责处理,这些 Mapper 实例是在集群的各个计算节点上同时运行的。
Reducer 阶段同样具有并发性,不同的 Reducer 实例会并发地处理 Mapper 输出的数据集中具有相同键的数据子集。
请注意,这些并发处理是由 Hadoop 框架自动管理的,并不需要在业务代码中明确地进行并发控制,这也是使用这种大数据处理框架的优点之一。
*/

这个程序把一个任务分为两个部分:

  1. Mapper (TokenizerMapper class):接受一行文本,拆分成单词,然后返回一系列的<单词,1>这样的键值对。
  2. Reducer (IntSumReducer class):对所有相同的单词计数总和,并返回<单词,总计数>这样的键值对。

执行这个程序,需要有一个 Hadoop 环境并且把待统计名词的文本文件放到应当的路径,并将结果输出的路径设置为期望的路径进行查看结果。

请注意,这些并发处理是由 Hadoop 框架自动管理的,并不需要在业务代码中明确地进行并发控制,这也是使用这种大数据处理框架的优点之一。

"Hadoop 框架自动管理" 的主要体现在哪些方便?

  1. 数据划分和任务分派:在 Map 阶段,Hadoop 按照一定的规则(默认为按文件大小)将输入数据划分成多个块(block),并为每个数据块创建一个 Map 任务,然后分派到集群的多个节点上并行执行。这个过程是 Hadoop 框架自动完成的,用户无需在代码中做任何处理。

  2. 任务调度和监控:Hadoop 采用 master-slave 架构,其中 master 节点(即 JobTracker)负责管理和调度整个集群,slave 节点(即 TaskTracker)负责执行具体的任务。JobTracker 根据集群的资源情况和任务需求,智能调度任务到各个 TaskTracker 执行,并实时监控任务的执行状态,一旦某个任务失败,会自动重新调度该任务到其他节点执行。

  3. 数据局部性优化:在划分数据和分派任务时,Hadoop 尽可能将数据存放在本地,或者距离执行任务的计算节点较近的其他节点上,以提高数据读取速度,减少网络传输开销,这称为数据局部性优化。

  4. 容错处理:如果在执行过程中某个节点出现故障,Hadoop 可以自动将该节点上的任务重新分派到其他正常的节点上执行,用户无需人工干预,这就是 Hadoop 的高容错性。

以上这些都是 Hadoop 框架自动进行的管理操作,用户只需要关注业务逻辑的编写,无需在代码中处理这些底层的细节问题。这就是 Hadoop 框架的强大之处,能够帮助用户简化分布式计算的复杂性,大大提高开发效率。

MapReduce模型的体系结构:
MapReduce体系结构主要由四个部分组成,分别是:Client、JobTracker、TaskTracker以及Task。

  1. Client:Client 是用户交互的入口,用户编写的MapReduce程序通过Client提交到JobTracker。同时,用户也可以通过Client提供接口查看作业运行状态。

  2. JobTracker:JobTracker 是集群的管理者,负责资源监控和作业调度。它监控TaskTracker与Job的状态,如有节点失败,将会将任务转移至其他节点。同时,JobTracker跟踪任务的执行进度、资源使用量等信息,并将这些信息提供给任务调度器(TaskScheduler),该调度器在资源出现空闲时选择合适的任务去使用这些资源。

  3. TaskTracker:TaskTracker 是具体任务执行的工作节点,它会周期性地通过心跳信号将本节点上的资源使用情况和任务的运行进度汇报给JobTracker。任务调度器根据这些信息为任务分配资源。TaskTracker还会执行JobTracker发送的命令(如启动新任务、杀死任务等)。它使用“slot”对本节点上的资源进行划分(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行。

  4. Task:Task 是具体的执行单位,它们分为Map Task和Reduce Task两种,由TaskTracker启动。这两种任务分别处理Map和Reduce操作。

MapReduce模型的总体过程可以概括为:首先由Client提交作业,然后由JobTracker接收并管理这个作业,作业被分解为多个Task,然后由TaskTracker在各计算节点上执行这些任务。这就是MapReduce体系结构的工作方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1590111.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DOTS Instancing合批:如何针对单个渲染实体修改材质参数

最近在做DOTS的教程,由于DOTS(版本1.0.16)目前不支持角色的骨骼动画&#xff0c;我们是将角色的所有动画数据Baker到一个纹理里面&#xff0c;通过修改材质中的参数AnimBegin,AnimEnd来决定动画播放的起点和终点&#xff0c;材质参数AnimTime记录当前过去的动画时间。但是在做大…

【opencv】示例-inpaint.cpp 图像修复是通过填充损坏图像部分从而修复这些损坏的过程...

原始图像 这段代码展示了一个使用OpenCV库进行图像修复的例子。它首先包含了处理图像编码、解码、显示、处理和照片处理所必要的OpenCV模块的头文件。然后利用cv和std命名空间下的类和方法。通过定义一个鼠标回调函数onMouse来处理图像上的绘图操作&#xff0c;并通过主函数mai…

节省30%成本,宝马使用 NVIDIA Omniverse 构造的数字孪生虚拟汽车工厂,实现降本增效

在数字化转型过程中&#xff0c;汽车制造商宝马集团将工业 AI 的力量运用到整个生产网络&#xff0c;与NVIDIA Omniverse平台共同构建并运行工业元宇宙应用。 宝马集团董事Milan Nedeljković在GTC主题演讲会中&#xff0c;与NVIDIA创始人兼首席执行官黄仁勋共同展示了Omniver…

基于mpc实现无人机轨迹跟踪ROS功能包:mav_control_rw

功能包简介 mav_control_rw 功能包是ETHZ ASL的利用mpc控制实现了旋翼式无人机的轨迹追踪算法。 mpc是模型预测控制的简称&#xff0c;全称是Model-based Predictive Control mpc利用一个已有的模型、系统当前的状态和未来的控制量&#xff0c;来预测系统未来的输出&#xf…

数据中心的网络架构设计,打造高效、安全的数字底座

数据中心的网络架构设计 一、数据中心网络架构设计原则 网络,作为数据中心的核心支柱,其结构精妙,由众多二层接入设备与少量三层设备共同编织而成。过去,数据中心网络规模有限,仅凭数十台设备的简单互连便能实现信息的畅通无阻。然而,随着技术与应用需求的飞速增长,数据…

FFmpeg: 简易ijkplayer播放器实现--05ijkplayer–连接UI界面和ffplay.c

文章目录 ijkplayer时序图消息循环--回调函数实现播放器播放时状态转换播放停止 ijkmediaPlay成员变量成员函数 ijkplayer时序图 stream_open: frame_queue_init packet_queue_init init_clock 创建read_thread线程 创建video_refresh_thread线程 消息循环–回调函数实现 ui …

SpringMVC--02--上下文工具类(RequestContextHolder)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 RequestContextHolder背景1.RequestContextHolder的使用2.request和response怎么和当前请求挂钩?3.request和response等是什么时候设置进去的? 案例应用---用户信…

【C++题解】1027 - 求任意三位数各个数位上数字的和

问题&#xff1a;1027 - 求任意三位数各个数位上数字的和 类型&#xff1a;基础问题 题目描述&#xff1a; 对于一个任意的三位自然数 x &#xff0c;编程计算其各个数位上的数字之和 S 。 输入&#xff1a; 输入一行&#xff0c;只有一个整数 x(100≤x≤999) 。 输出&…

微信小程序 uniapp+vue城市公交线路查询系统dtjl3

小程序Android端运行软件 微信开发者工具/hbuiderx uni-app框架&#xff1a;使用Vue.js开发跨平台应用的前端框架&#xff0c;编写一套代码&#xff0c;可编译到Android、小程序等平台。 前端&#xff1a;HTML5,CSS3 VUE 后端&#xff1a;java(springbootssm)/python(flaskdja…

STM32完成软件I2C通讯

今天的重点是利用STM32的软件方案和MPU60506轴姿态传感器建立通讯&#xff0c;今天只完成了简单的发送地址和接收应答的部分&#xff0c;特此记录一下过程&#xff0c;以后忘记可以随时翻出来看看。 先介绍最基本的I2C通讯的最基本的6个时序&#xff1a; 一&#xff1a;起始条…

goproxy 简单介绍 及一键安装脚本

goproxy 官网 https://goproxy.cn/ GoProxy 是一项用于 Go 模块的高性能代理服务&#xff0c;旨在为 Go 开发人员提供更快速、更可靠的模块下载体验。它提供以下主要功能&#xff1a; 全球分布式代理服务器: GoProxy 在全球多个地区部署了代理服务器&#xff0c;例如拉斯维加…

驱动开发:探索DRIVER_OBJECT驱动对象

本章将探索驱动程序开发的基础部分&#xff0c;了解驱动对象DRIVER_OBJECT结构体的定义&#xff0c;一般来说驱动程序DriverEntry入口处都会存在这样一个驱动对象&#xff0c;该对象内所包含的就是当前所加载驱动自身的一些详细参数&#xff0c;例如驱动大小&#xff0c;驱动标…

B站基于Apache Ranger的大数据权限服务的技术演进

01 背景 随着云计算、大数据技术的日趋成熟&#xff0c;复杂多元、规模庞大的数据所蕴含的经济价值和社会价值逐步凸显&#xff0c;数据安全也是企业面临的巨大挑战&#xff0c;B站一直致力于对用户隐私数据的保护。 02 Ranger概述 2.1 用户认证 提到安全&#xff0c;就不得不…

uni-app web端使用getUserMedia,摄像头拍照

<template><view><video id"video"></video></view> </template> 摄像头显示在video标签上 var opts {audio: false,video: true }navigator.mediaDevices.getUserMedia(opts).then((stream)> {video document.querySelec…

emmet语法--快速生成html标签

emmet语法介绍 可以直接把它理解为快捷键。 通过一定简略的缩写配合快捷键&#xff0c;直接生成我们想要的html代码。 vscode中已经内置了emmet语法&#xff0c;可以直接使用。 emmet的核心就是tab键&#xff0c;我们输入关键词然后按下tap就可以直接生成我们要的代码。 标…

高云FPGA的管脚约束文件的复制

问&#xff1a;Gowin里面能不能直接拷贝一个管脚约束文件进去用&#xff1f; 答&#xff1a; 可以直接拷贝&#xff0c;但是拷贝前后两个工程对应的芯片必须要是同一个芯片 拷贝方法: 第一步&#xff1a;按照被拷贝约束文件对应的芯片新建一个工程&#xff0c;然后将原工程文…

产品推荐 | 基于Lattice的Crosslink系列LIF-MD6000 FPGA开发板

1、产品概述 LATTICE是一家老牌的FPGA厂家。在CPLD和FPGA低成本&#xff0c;小封装独树一帜。特别在消费电子&#xff0c;小型化设备&#xff0c;控制领域用的比较多。 Crosslink开发板采用Lattice的Crosslink家族系列芯片&#xff0c;LIF-MD6000-6JM80。Crosslink芯片具有低…

取出/var/log/secure中一小时内登录失败超过三次的IP

取出/var/log/secure中一小时内登录失败超过三次的IP 前两个字段是日期&#xff0c;第三个字段是小时&#xff0c;第四个字段是IP cat /var/log/secure | sort -i | awk -F [ :] /Failed/{a[$1" "$2" "$3" "$4" "$(NF-3)]}END{for(i …

二百三十一、ClickHouse——DBeaver连接ClickHouse中时间戳字段的时区差了8小时

一、目的 在用kettle把MySQL中的数据同步到ClickHouse中&#xff0c;发现kettle里的数据显示正常、DataGrip查询ClickHouse中的数据显示正常&#xff0c;但是DBeaver中显示的ClickHosue中的时间字段晚8个小时 二、错误原因 DBeaver的数据库时区设置有问题 三、解决办法 右…

DP10RF001一款工作于200MHz~960MHz低功耗、高性能、单片集成的(G)FSK/OOK无线收发芯片

产品概述. DP10RF001是一款工作于200MHz~960MHz范围内的低功耗、高性能、单片集成的(G)FSK/OOK无线收发机芯片。内部集成完整的射频接收机、射频发射机、频率综合器、调制解调器&#xff0c;只需配备简单、低成本的外围器件就可以获得良好的收发性能。芯片支持灵活可设的数据包…