大数据框架之Hadoop:MapReduce(一)MapReduce概述

news2024/9/22 19:36:15

1.1MapReduce定义

MapReduce是一个分布式计算框架,用于编写批处理应用程序,是用户开发“基于Hadoop的数据分析应用”的核心框架。

MapReduce核心功能是将用户编写的业务逻辑代码自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

这里以词频统计为例进行说明,MapReduce 处理的流程如下:

1MapReduce%E6%A6%82%E8%BF%B0%2027fb072a6b5c4c94b5093f462126f565/v2-8cd36043096252ac626adc292b5c9a7b_720w.png

  1. input : 读取文本文件;
  2. splitting : 将文件按照行进行拆分,此时得到的 K1 行数,V1 表示对应行的文本内容;
  3. mapping : 并行将每一行按照空格进行拆分,拆分得到的 List(K2,V2),其中 K2 代表每一个单词,由于是做词频统计,所以 V2 的值为 1,代表出现 1 次;
  4. shuffling:由于 Mapping 操作可能是在不同的机器上并行处理的,所以需要通过 shuffling 将相同 key 值的数据分发到同一个节点上去合并,这样才能统计出最终的结果,此时得到 K2 为每一个单词,List(V2) 为可迭代集合,V2 就是 Mapping 中的 V2;
  5. Reducing : 这里的案例是统计单词出现的总次数,所以 ReducingList(V2) 进行归约求和操作,最终输出。

MapReduce 编程模型中 splittingshuffing 操作都是由框架实现的,需要我们自己编程实现的只有 mappingreducing,这也就是 MapReduce 这个称呼的来源。

1.2MapReduce优缺点

1、优点

  • Mapr易于编程

它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布在大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。

  • 良好的扩展性

当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

  • 高容错性

MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它就有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。

  • 适合PB级以上海量数据的离线处理

可以实现上千台服务器集群并发工作,提供数据处理能力。

2、缺点

  • 不擅长实时计算

MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。

  • 不擅长流式计算

流失计算的输入数据时动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。

  • 不擅长DAG(有向图)计算

多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能,而是使用后,每个MapReduce作业的输出结果都会写入磁盘,会造成大量的磁盘IO,导致性能非常低下

1.3MapReduce核心思想

Untitled

1)分布式的运算程序往往需要分成至少2个阶段。

2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。

3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。

4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。

总结:分析WordCount数据流走向深入理解MapReduce核心思想。

1.4MapReduce进程

一个完整的MapR程序在分布式运行时有三类实例进程:

1)MrAppMaster:负责整个程序的过程调度及状态协调

2)MapTask:负责Map阶段的整个数据处理流程

3)ReduceTask:负责Reduce阶段的整个数据处理流程。

1.5常用数据序列化类型

Java类型Hadoop Writable类型
booleanBooleanWritable
byteByteWritable
intIntWritable
floatFloatWritable
longLongWritable
doubleDoubleWritable
StringText
mapMapWritable
arrayArrayWritable

1.6MapReduce编程规范

用户编写的程序分成三个部分:Mapper、Reducer和Driver。

1、Mapper阶段

(1)用户自定义的Mapper要继承自己的父类

(2)Mapper的输入数据是KV对的形式(KV的类型可自定义)

(3)Mapper中的为业务逻辑写在map()方法中

(4)Mapper的输出数据是KV对的形式(KV的类型可自定义)

(5)map()方法(MapTask进程)对每一个<K,V>调用一次

2、Reducer阶段

(1)用户自定义的Reducer要继承自己的父类

(2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV

(3)Reducer的业务逻辑写在reduce()方法中

(4)ReduceTask进程对每一组相同的K的<k,v>组调用一次reduce()方法

3、Driver阶段

相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象。

1.7WordCount词频统计案例实操

1、需求

在给定的文本文件中统计输出每个单词出现的总次数

(1)输入数据

atguigu atguigu
ss ss
cls cls
jiao
banzhang
xue
hadoop

(2)期望输出数据

atguigu    2
banzhang 1
cls   2
hadoop    1
jiao  1
ss    2
xue  1

2、需求分析

按照MapReduce编程规范,分别编写Mapper,Reducer,Driver。

需求:统计一堆文件中单词出现的个数(WordCount案例)

Untitled

3、环境准备

(1)创建maven工程

Untitled

Untitled

按照要求填写相应的名称即可

Untitled

(2)在pom.xml文件中添加如下依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cuiyf41</groupId>
    <artifactId>mr-quickstart-java</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven_compiler_plugin.version>3.6.1</maven_compiler_plugin.version>
        <java.version>1.8</java.version>
    </properties>
    <!-- 仓库 -->
    <repositories>
        <repository>
            <id>central</id>
            <name>Maven Repository</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven_compiler_plugin.version}</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12.4</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
            <!-- 打包工具 -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.cuiyf41.wordCount.WordcountDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

(3)在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

4、编写程序

(1)编写Mapper类

package com.cuiyf41.wordCount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text k = new Text();
    IntWritable v = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 获取一行
        String line = value.toString();

        // 2 切割
        String[] words = line.split(" ");

        // 3 输出
        for (String word: words){
            k.set(word);
            context.write(k, v);
        }
    }
}

WordCountMapper 继承自 Mapper 类,这是一个泛型类,定义如下:

WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
   ......
}
  • KEYIN : mapping 输入 key 的类型,即每行的偏移量 (每行第一个字符在整个文本中的位置),Long 类型,对应 Hadoop 中的 LongWritable 类型;
  • VALUEIN : mapping 输入 value 的类型,即每行数据;String 类型,对应 Hadoop 中 Text 类型;
  • KEYOUTmapping 输出的 key 的类型,即每个单词;String 类型,对应 Hadoop 中 Text 类型;
  • VALUEOUTmapping 输出 value 的类型,即每个单词出现的次数;这里用 int 类型,对应 IntWritable 类型。

(2)编写Reducer类

package com.cuiyf41.wordCount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    int sum;
    IntWritable v = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 1 累加求和
        sum = 0;
        for(IntWritable count:values){
            sum += count.get();
        }

        // 2 输出
        v.set(sum);
        context.write(key, v);
    }
}

(3)编写Driver驱动类

package com.cuiyf41.wordCount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordcountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[] { "/Users/cuiyufei/IdeaProjects/input", "/Users/cuiyufei/IdeaProjects/output" };

        // 1 获取配置信息以及封装任务
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 设置jar加载路径
        job.setJarByClass(WordcountDriver.class);
        // 3 设置map和reduce类
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // 4 设置map输出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 提交
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);

    }
}

5、本地测试

在Driver中添加本地测试文件和输出文件目录

// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[] { "/Users/cuiyufei/IdeaProjects/input", "/Users/cuiyufei/IdeaProjects/output" };

在Driver文件中右键运行即可。

Untitled

6、集群上测试

(0)用maven打jar包,需要添加的打包插件依赖

注意:标记红颜色的部分需要替换为自己工程主类

<build>
		<plugins>
			<plugin>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.3.2</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
			<plugin>
				<artifactId>maven-assembly-plugin </artifactId>
				<configuration>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
					<archive>
						<manifest>
							<mainClass>com.atguigu.mr.WordcountDriver</mainClass>
						</manifest>
					</archive>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>

注意:如果工程上显示红叉。在项目上右键->maven->update project即可。

(1)将程序打成jar包,然后拷贝到Hadoop集群中

步骤详情:右键->Run as->maven install。等待编译完成就会在项目的target文件夹中生成jar包。如果看不到。在项目上右键->Refresh,即可看到。修改不带依赖的jar包名称为wc.jar,并拷贝该jar包到Hadoop集群。

(2)启动Hadoop集群

(3)执行WordCount程序

[root@hdp101 software]# hadoop jar  wc.jar com.atguigu.wordcount.WordcountDriver /user/atguigu/input /user/atguigu/output

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/351625.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一文掌握如何对项目进行诊断?【步骤方法和工具】

作为项目经理和PMO&#xff0c;面对错综复杂的项目&#xff0c;需要对组织的项目运作情况进行精确的分析和诊断&#xff0c;找出组织项目管理中和项目运行中存在的问题和潜在隐患&#xff0c;分析其原因&#xff0c;预防风险&#xff0c;并且形成科学合理的决策建议和解决方案&…

day43【代码随想录】动态规划之一和零、完全背包理论基础

文章目录前言一、一和零&#xff08;力扣474&#xff09;二、完全背包前言 1、一和零 2、完全背包理论基础 一、一和零&#xff08;力扣474&#xff09; 求装满这个背包最多有多少个物品 给你一个二进制字符串数组 strs 和两个整数 m 和 n 。 请你找出并返回 strs 的最大子集…

Python实现查票以及自动购票....再也不用愁手速不够啦~

前言 嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 今天咱们来用Python做一个自动查票抢票的脚本&#xff0c;24小时抢票&#xff0c;谁抢的过你&#xff01; 准备工作 环境 Python 3.8 Pycharm 插件 谷歌浏览器驱动 模块 需要安装的第三方模块&#xff0c;直接pi…

极智AI | 算能SC5+智算卡驱动安装与升级

欢迎关注我的公众号 [极智视界]&#xff0c;获取我的更多经验分享 大家好&#xff0c;我是极智视界&#xff0c;本文介绍一下 算能SC5智算卡驱动安装与升级。 邀您加入我的知识星球「极智视界」&#xff0c;星球内有超多好玩的项目实战源码下载&#xff0c;链接&#xff1a;ht…

2023美赛问题E:光污染

背景 背景 光污染是指任何过多或不当使用人造光的表现。我们所称为光污染的一些现象包括光侵入、过亮、以及光混乱。这些现象最容易在大城市太阳落山后观察到天空中的发光&#xff1b;然而&#xff0c;它们也可能发生在更偏远的地区。 光污染改变了我们对夜空的看法&#xff…

递归与尾递归

你肯定知道递归&#xff0c;大概也知道尾递归。可是为什么要在递归中分出尾递归这样一个类&#xff1f;尾递归的本质又是什么&#xff1f;许多人未必清楚。 递归原是Lisp语言提出的概念&#xff0c;后来被许多语言借鉴。递归指自我重复的结构&#xff0c;在编程语言中体现为函…

共享模型之工具(二)

1.自定义线程池 1>.在实际开发过程中建议不要使用JDK提供的方式创建线程池,因为底层不方便优化,在请求量非常大的情况下可能会出现OOM,我们需要手动实现一个线程池; 2>.代码实现: Slf4j public class TestThreadPoolDemo1 {public static void main(String[] args) {/…

容器安全风险and容器逃逸漏洞实践

本文博客地址&#xff1a;https://security.blog.csdn.net/article/details/128966455 一、Docker存在的安全风险 1.1、Docker镜像存在的风险 不安全的第三方组件&#xff1a;用户自己的代码依赖若干开源组件&#xff0c;这些开源组件本身又有着复杂的依赖树&#xff0c;甚至…

在 Python 中只接受数字作为用户输入

只接受数字作为用户输入&#xff1a; 使用 while True 循环进行循环&#xff0c;直到用户输入一个数字。使用 float() 类尝试将值转换为浮点数。如果用户输入了一个数字&#xff0c;请使用 break 语句跳出循环。 while True:try:# &#x1f447;️ use int() instead of floa…

宝马项目化流程标准(BMW ABC flyer requirement)

ABC flyer/ BMWQMT build phase requirement宝马的项目流程标准叫做ABC flyer,也叫QMT build phase requirement.为什么叫这么名字&#xff0c;是因为宝马项目的产品零件分为几个阶段&#xff1a;A-samples, B-samples,C-samples, initial-samples.1、PVL/ VS0:100% ok parts i…

高通平台开发系列讲解(Android篇)AudioTrack音频流数据传输

文章目录 一、音频流数据传输通道创建1.1、流程描述1.2、流程图解二、音频数据传输2.1、流程描述2.2、流程图解沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇章主要图解AudioTrack音频流数据传输 。 一、音频流数据传输通道创建 1.1、流程描述 AudioTrack在set函…

项目自动化构建工具make/Makefile

目录 make/Makefile概念和关系 make/Makefie的使用 一个工程中的源文件不计数&#xff0c;其按类型、功能、模块分别放在若干个目录中&#xff0c;makefile定义了一系列的规则来指定&#xff0c;哪些文件需要先编译&#xff0c;哪些文件需要后编译&#xff0c;哪些文件需要重…

33岁测试宝妈,在家带娃一年半,入职新公司居然年薪30W+

疫情3年&#xff0c;每一个行业的危机&#xff0c;每一个企业的倒下&#xff0c;背后都是无数人的降薪、降职和失业。这也暴露了人生的残酷真相&#xff1a;人活一辈子&#xff0c;总有“丰年”和“荒年” 优秀的测试既过得了丰年&#xff0c;也受得住荒年。 我之前认识的一个…

零信任-Zscaler零信任介绍(7)

​Zscaler零信任介绍 Zscaler是一家专注于网络安全的公司&#xff0c;他们提供了一种名为Zscaler Zero Trust Exchange (ZTX)的零信任解决方案。这种解决方案旨在帮助企业提高网络安全&#xff0c;并确保只有授权的用户&#xff0c;设备和应用程序才能访问敏感信息。ZTX采用多…

畅购电商项目

1. 电商项目架构图技术框架/技术选型1.1 系统架构该项目是一个B2C的电商项目&#xff08;类似小米商城、京东商城、天猫商城&#xff09;允许客户通过网络购买商品该项目主要完成的是电商项目前台的开发。采用前后端分离的方式进行开发的前端&#xff1a;vue全家桶&#xff08;…

QT入门Containers之Widget、Frame

目录 一、QWidget界面相关 1、布局介绍 2、基本界面属性 3、特殊属性 二、QFrame 三、Demo展示 此文为作者原创&#xff0c;创作不易&#xff0c;转载请标明出处&#xff01; 一、QWidget界面相关 1、布局介绍 为什么将QWidget容器放在第一个&#xff0c;因为目前使用过…

前端缓存知识-强缓存与协商缓存

缓存的作用 减少了冗余的数据传输&#xff0c;节省了网费。减少了服务器的负担&#xff0c; 大大提高了网站的性能加快了客户端加载网页的速度 缓存分类 强制缓存如果生效&#xff0c;不需要再和服务器发生交互&#xff0c;而对比缓存不管是否生效&#xff0c;都需要与服务端…

查询蓝牙适配器版本

台式机不支持蓝牙&#xff0c;装了个蓝牙适配器&#xff0c;结果换来换去又忘记自己这个是啥版本了&#xff0c;适配器自己也不写。好在微软官方也给了说明如何查询我的电脑运行哪个蓝牙版本&#xff1f; - Microsoft 支持https://support.microsoft.com/zh-cn/windows/%E6%88%…

day44【代码随想录】动态规划之零钱兑换II、组合总和 Ⅳ、零钱兑换

文章目录前言一、零钱兑换II&#xff08;力扣518&#xff09;二、组合总和 Ⅳ&#xff08;力扣377&#xff09;三、零钱兑换&#xff08;力扣322&#xff09;总结前言 1、零钱兑换II 2、组合总和 Ⅳ 3、零钱兑换 一、零钱兑换II&#xff08;力扣518&#xff09; 给你一个整数…

教你如何实现一个页面自动打字效果

前言&#xff1a; 最近在写一个类似于 windows 启动页的项目&#xff0c;不知道大家是否还记的 windows 很经典的小光标加载页面&#xff0c;我又稍微改造了一下效果如下&#xff1a; 一. 光标闪烁效果的实现 tips&#xff1a; 在这里我们使用了 UnoCSS&#xff0c;如果你不清…