实训笔记7.21

news2025/4/8 23:53:14

实训笔记7.21

  • 7.21
    • 一、MapReduce编程代码的打包问题与大数据集群环境中运行问题
      • 1.1 MR程序在运行的时候,job提交作业的时候会自动识别我们的运行环境,如果我们是在windows本地运行的话,MR程序识别的环境未LocalRunner这么一个环境,这个环境是windows的模拟分布式的环境,因此我们MR程序基本上都是在windows上测试没有问题之后,打成jar包,提交给Hadoop集群的YARN进行运行。
      • 1.2 如果将代码打成JAR包,部署到大数据集群上运行,也不一定是分布式运行,这个得看我们的配置
        • 1.2.1 本地安装模式
        • 1.2.2 伪分布式安装模式
        • 1.2.3 完全分布式安装模式
        • 1.2.4 HA高可用安装模式
      • 1.3 MR程序运行中报错问题
        • 1.3.1 运行MR程序报错HDFS的权限问题
        • 1.3.2 当MR程序打成JAR包以后,在Hadoop集群的YARN上运行的时候,报错ClassNotFoundException: xxxxx.xxMapper
        • 1.3.3 当MR程序打成JAR包以后,在Hadoop集群的YARN上运行的时候,报错资源不足的问题
    • 二、InputFormat的常见实现类的知识点
      • 2.1 cInputFortmat在MR程序中作用:
      • 2.2 TextInputFormat
      • 2.3 KeyValueTextInputFormat
      • 2.4 NLineInputFormat
      • 2.5 CombineTextInputFormat
      • 2.6 SequenceFileInputFormat
    • 三、MR程序Job提交的流程的源码分析
    • 四、Mapper阶段的核心知识点
    • 五、MR程序运行的Shuffle知识点
      • 5.1 Shuffle阶段的执行逻辑
        • 5.1.1 Map方法执行之后的逻辑
        • 5.1.2 Reduce方法执行之前的逻辑
      • 5.2 Shuffle中map输出开始执行源码解读
      • 5.3 Shuffle阶段中通过自定义分区实现数据的分区规则定义
        • 5.3.1 定义Java类继承Partitioner类
        • 5.3.2 重写Partitioner类中getPartition 方法自定义分区规则即可
        • 5.3.3 分区的数量必须和ReduceTask的数量保持一致,如果两者不一致,出现以下三种情况
      • 5.4 Shuffle阶段中通过自定义排序规则保证输出结果有序
      • 5.5 Shuffle阶段中的Combiner操作(MR程序的可选组件)
        • 5.5.1 Combiner的使用规则
        • 5.5.2 Combiner的执行时机
      • 5.6 shuffle阶段reduce聚合数据的时候,哪些数据为相同的key值?
    • 六、代码示例

7.21

一、MapReduce编程代码的打包问题与大数据集群环境中运行问题

1.1 MR程序在运行的时候,job提交作业的时候会自动识别我们的运行环境,如果我们是在windows本地运行的话,MR程序识别的环境未LocalRunner这么一个环境,这个环境是windows的模拟分布式的环境,因此我们MR程序基本上都是在windows上测试没有问题之后,打成jar包,提交给Hadoop集群的YARN进行运行。

1.2 如果将代码打成JAR包,部署到大数据集群上运行,也不一定是分布式运行,这个得看我们的配置

1.2.1 本地安装模式

有一个特点,如果是在本地安装模式下运行,MR程序也不是分布式运行,采用的也是模拟的运行环境,而非YARN

1.2.2 伪分布式安装模式

1.2.3 完全分布式安装模式

1.2.4 HA高可用安装模式

1.2.2~1.2.4 需要修改配置文件,其中在mapred-site,xml文件中专门配置了MR的运行环境在YARN上运行的
mapreduce.framework.name yarn模式
如果在三种安装模式当中,如果没有配置上述的选项,那么就算YARN启动成功了,MR程序也不会在YARN上运行,还是使用local本地模拟环境

1.3 MR程序运行中报错问题

1.3.1 运行MR程序报错HDFS的权限问题

  1. [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-e7FS0Rms-1689935697335)(F:/Typora/%E5%AE%9E%E8%AE%ADmd/%E7%AC%AC%E4%B9%9D%E5%91%A8/7.21/86fb0414522836b52f7127dbbe9f9a739133d646628a6e74f0852e2c187ad7fd.png)]

  2. 问题的原因:MR程序运行过程中需要在HDFS创建目录,并且向目录中写入MR程序运行结果,但是如果我们是在windows本地运行代码,MR程序在运行中,会使用windows上的用户名当作HDFS用户进行写操作权限,但是默认情况下,HDFS上除了root用户以外,其他用户基本上都是无权限写入的

  3. 报错解决方案

    1. 简单粗暴,但是不安全:给HDFS的根目录赋予一个777最高权限,不安全----禁止大家操作

    2. MR程序在运行的时候,指定HDFS的用户为root用户而非windows本地的用户(建议大家使用) 在MR程序的 vm options中增加一个配置项:-DHADOOP_USER_NAME=root

    3. 在HDFS集群中配置忽略权限检查,这个效果等同于第一种设置的方式hdfs-site.xml 必须在hdfs集群中配置,而非MR代码中

       <property>          
       	<name>dfs.permissions.enabled</name>          
       	<value>false</value>      
       </property>
      

1.3.2 当MR程序打成JAR包以后,在Hadoop集群的YARN上运行的时候,报错ClassNotFoundException: xxxxx.xxMapper

  1. 报错原因不是因为类的class文件没有打包到jar包当中,而是因为hadoop运行jar包的时候,不知道如何在JAR包中寻找这个类
  2. 解决方案:只需要让Hadoop运行jar包能找到类即可,在Driver驱动程序当中配置一行代码即可 job.setJarByClass(xxxDriver.class);

1.3.3 当MR程序打成JAR包以后,在Hadoop集群的YARN上运行的时候,报错资源不足的问题

  1. [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7bxIAKUh-1689935697337)(F:/Typora/%E5%AE%9E%E8%AE%ADmd/%E7%AC%AC%E4%B9%9D%E5%91%A8/7.21/c17f9e7b9859401b1bbd9a04dd7d29956aed6b698837591dce8b21d09b42345e.png)]

  2. 报错原因

    1. 虚拟机的资源太少,MR程序运行的时候,每一个map任务默认需要1024MB的内存mapred-site.xml

       <property>  
       	<name>mapreduce.map.memory.mb</name>  
       	<value>250</value> 
       </property> 
       <property>  
       	<name>mapreduce.map.java.opts</name>  
       	<value>-Xmx250M</value> 
       </property> 
       <property>  
       	<name>mapreduce.reduce.memory.mb</name>  
       	<value>300</value> 
       </property> 
       <property>  
       	<name>mapreduce.reduce.java.opts</name>  
       	<value>-Xmx300M</value> 
       </property>
      
    2. 资源不足之后,YARN会把一些已经分配了资源的MapTask强制杀死,之所以会杀死,是因为YARN会进行资源的检查,如果不想报这个错,还有一种方案,关闭YARN的资源检测yarn-site.xml

      <property>    
      	<name>yarn.nodemanager.vmem-check-enabled</name>   
      	<value>false</value> 
      </property>
      
  3. 【注意】:MR程序的jar包的运行命令如下:

    hadoop jar jar包的路径 jar包中的Driver驱动程序的全限定类名 参数1 参数2 .......

二、InputFormat的常见实现类的知识点

2.1 cInputFortmat在MR程序中作用:

  1. 如何对输入的文件进行切片,切片很重要,切片直接关系到Mapper阶段的MapTask的任务个数 getSplits()
  2. Mapper阶段的map方法在读取切片数据,如何读取,以及如何将读取的数据转换成为key-value格式的数据 createRecordReader()

2.2 TextInputFormat

  1. 切片机制:每一个文件单独进行切片,根据splitSize来确定
  2. kv读取机制:一行一行读取,每一行以偏移量为key 以当前行的数据为value

2.3 KeyValueTextInputFormat

  1. 切片机制和TextInputFormat机制一样的
  2. kv读取机制:一行一行读取,每一行以指定的分隔符进行切割,将切割之后的第一个字符串当作key值,剩余的字符串当作value

2.4 NLineInputFormat

  1. 切片机制:每一文件单独进行切片,根据行数来确定
  2. kv读取机制:和TextInputFormat机制一样的

2.5 CombineTextInputFormat

  1. 切片机制:所有的输入文件整体进行切片,基于设置的虚拟切片容量进行虚拟切片和物理切片两部分来完成
  2. kv读取机制:和TextInputFormat机制是一样的

2.6 SequenceFileInputFormat

三、MR程序Job提交的流程的源码分析

  1. 底层会先识别我们的运行环境
  2. 生成一个资源提交目录,如果是本地运行模式,那么资源提交到本地的某个路径下,如果是YARN运行模式,那么资源提交给HDFS的某个路径,生成一个JobID
  3. 基于InputFormat的切片机制生成切片规划文件job.split文件,并且把文件写入到资源提交目录
  4. 将MR程序中所有的配置项写入到一个job.xml文件,文件也写入到资源提交目录
  5. 程序开始申请运行资源,运行Map任务和reduce任务

四、Mapper阶段的核心知识点

  1. Mapper阶段需要启动多个MapTask任务,多个mapTask是并行运行,互不干扰的。
  2. 多个MapTask可能会在多个节点上运行,那么到底这些MapTask要在哪些节点上启动运行,MR程序也是有规矩的,移动数据不如移动计算。 启动MapTask的时候,一般要求MapTask最好在该MapTask负责的切片节点启动。这样的话我们MapTask计算数据的时候就不需要移动数据了。如果切片节点没有资源能启动MapTask了,那我们也会在距离这个数据最近的节点启动MapTask(网络拓扑原则)
  3. MapTask的任务个数是和切片的个数是对应,默认情况切片机制下,一个切片就是一个block块

【注意】我们之所以可以在切片所在节点启动计算任务,是因为当初我们配置Hadoop集群的时候,DataNode和NodeManager是同时配置的

五、MR程序运行的Shuffle知识点

shuffle指的是重新洗牌,逻辑上指的是将数据全部重新通过网络进行分发和洗牌,Shuffle指的是从map方法输出开始到reudce方法执行之前的这一段计算逻辑称之为Shuffle阶段

5.1 Shuffle阶段的执行逻辑

5.1.1 Map方法执行之后的逻辑

  1. map方法输出kv数据时,先根据指定的Partitioner计算kv数据的分区,计算成功之后,将kv数据的分区编号、kv数据本身、key、value分别在内存的其实地址,key、value数据的长度等信息写入到一个内存的环形缓冲区中(100M),
  2. 当环形缓冲区到达设定的阈值(80%),将环形缓冲区的数据溢写到磁盘文件,溢写数据之前,环形缓冲区的数据会根据不同分区进行一次分区排序(根据key值进行排序,默认使用快速排序算法),将排好序的分区数据溢写到磁盘文件中
  3. 可能Map阶段进行多次溢写,每一次溢写都需要先在环形缓冲区进行分区排序,然后再溢写文件,每一次溢写都会产生一个新的溢写文件
  4. 如果溢写文件的数量的超过3个,那么就会触发自己设置的combiner操作,对已经溢写完成的数据先进行一次map端的聚合操作。Combiner操作可选的。
  5. 当map阶段执行完成,会将产生的多个溢写文件,以及环形缓冲区剩余的还没有溢写的数据进行一次合并操作,合并成为一个大文件,只不过再合并的时候也需要进行一次排序(排序也是基于每一个分区进行,基于key值大小,使用的排序算法是归并排序算法)。
  6. 归并排序生成大文件之后,还会进行一次自定义的Combiner操作,对map阶段输出的数据进行一次局部汇总

【注意】Combiner操作可选的组件,如果加上的操作,第4和第6步就会执行,如果没有加,第4步和第6步一定不会执行 Combiner就算你指定了,可能一次也不执行,当map任务的计算负担很重,如果map任务的计算压力很大,那么combiner操作就算设置了,MR程序也不会执行的

5.1.2 Reduce方法执行之前的逻辑

  1. Copy阶段:Reduce任务根据负责的分区,从不同的MapTask上把对应的分区数据拉取到ReduceTask的内存中,如果ReduceTask内存放不下这些数据,把数据写到文件
  2. merge阶段:会把我们从不同maptask拉去回来的数据进行一次整体的合并
  3. sort阶段:合并拉取的不同mapTask分区的数据得时候,还需要对数据进行一次排序,排序可以单独指定规则,如果没有指定,默认还是使用key值得大小规则,排序算法也是归并排序

5.2 Shuffle中map输出开始执行源码解读

  1. collecotr收集器往环形缓冲区写出数据,只不过写出数据的时候先根据Partitioner计算数据的分区,partitioner分区计算默认情况下有两种计算方式

    1. 如果reduceTask的数量等于1的时候,采用一个内部类的分区器进行分区,分区器是吧所有的数据都分配到0号分区
    2. 如果reduceTask的数量大于1的时候,采用一个HashPartitioner分区机制,按照key的hashcode值和Integer.MAX_VALUE进行一次&位运算,然后和reduceTask取%余数得到一个分区编号。 分区编号看ReduceTask数量,[0,reduceTask-1]
    3. 如果你想自己控制分区的数据,那么就得需要自定义Partitioner来完成
  2. collector将数据写入环形缓冲区,环形缓冲区代码的体现就是一个字节数组,字节数组默认100M,超过80M,需要把缓冲区的数据写入到一个文件中

    缓冲区可以设置大小,阈值可以设置

    mapreduce.task.io.sort.mb 100 指定MR程序运行中环形缓冲区的默认大小 100M mapreduce.map.sort.spill.percent 0.80 指定MR程序运行中缓冲区的阈值 默认是0.8

    也可以再mapred-site.xml配置,如果在这个文件配置了,以后所有在Hadoop集群上运行的MR程序的缓冲区和阈值都是配置文件的值了。但是这样的配置我们不建议

    因为不同的计算程序环形缓冲区和阈值配置不同的参数,因此一般在MR的驱动程序使用Configuration配置,虽然这个配置只是对当前的MR生效。但是这是最常用的。

    配置有个规则:缓冲区越大,溢写的次数越小,计算的速度越高。

5.3 Shuffle阶段中通过自定义分区实现数据的分区规则定义

5.3.1 定义Java类继承Partitioner类

5.3.2 重写Partitioner类中getPartition 方法自定义分区规则即可

5.3.3 分区的数量必须和ReduceTask的数量保持一致,如果两者不一致,出现以下三种情况

  1. reduceTask的数量大于分区数,那么会产生多个结果文件,只不过有些结果文件就是一个空白文件,多余的reduceTask没有分区数据处理才会产生空白文件
  2. reduceTask的数量小于分区数,而且大于1的,报错
  3. reduceTask的数量小于分区数,但是等于1 正常执行,只不过分区不执行了

5.4 Shuffle阶段中通过自定义排序规则保证输出结果有序

整体Shuffle阶段,一共对数据进行三次排序,而且最终输出结果文件里面的数据其实是有顺序的。三次排序分别发生在:

  1. 当环形缓冲区超过阈值之后溢写磁盘的时候,会先在环形缓冲区进行第一次排序操作,排序基于key值的比较器进行排序,底层采用的快速排序的算法
  2. 当map阶段产生了多个溢写文件之后,合并多个溢写文件以及缓冲区中的数据的之后会进行第二次排序操作,排序基于key值得比较器进行排序的,底层采用是归并排序的算法
  3. 当ReduceTask把它所负责的分区数据拉去到ReduceTask节点之后,也需要对拉取的多个MapTask上的数据在进行一次归并排序,默认情况下我们排序也是基于key值的比较器进行排序,但是reduce比较特殊,也可以单独指定另外一种排序规则。

5.5 Shuffle阶段中的Combiner操作(MR程序的可选组件)

Combiner其实也是一个Reducer,只不过和Reducer不一样的地方在于,Reducer是对所有的MapTask计算的结果进行聚合操作,Combiner只对当前的MapTask计算的结果进行一次局部汇总,目的是为了减少了Map阶段向Reduce阶段传输的数据量,从而提升MR程序的计算效率。

5.5.1 Combiner的使用规则

  1. 一般默认情况下,Combiner就是Reducer,Reducer可以当作Combiner来使用
  2. 如果你不想用Reducer充当Combiner,也可以自定义Combiner,如果自定义Combiner,那么必须满足以下要求
    1. 自定义的Combiner的类必须继承Reducer
    2. Combiner的输入的KV是map阶段输出的kv类型 Combiner输出的kv类型必须是Reducer阶段输入的key value类型
  3. 使用在Map阶段给Reduce阶段传输的数据量过大的情况下,可以使用Combine进行一次map的局部汇总,减少数据的传输量

5.5.2 Combiner的执行时机

  1. 当Map阶段的的溢写文件超过三个,自动触发Combiner操作
  2. 当map阶段执行完成之后,把所有的溢写合并之后也会触发一次Combiner操作
  3. Combiner在有些极端的情况下,就算我们设置了,它也可能不会执行,如果map端的计算压力过大,那么Combiner就不会执行了,而是直接执行Reducer

5.6 shuffle阶段reduce聚合数据的时候,哪些数据为相同的key值?

除了需要借助自定义类型的hashCode和equals方法以外,还需要通过比较器判断。

六、代码示例

package com.sxuek.wc02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * 现在想使用mapreduce去实现单词技术案例,案例需求:
 *   1、要求可以统计出输入文件中每一个单词出现的总次数
 *   2、要求输出文件有两个,其中如果单词的首字母是大写,那么单词的统计结果写出到part-r-00000文件
 *      如果单词的首字母是小写,那么单词的统计结果写出到part-r-00001文件中
 *
 * 逻辑实现:
 *   因为结果需要两个文件,因此我们需要两个ReduceTask(因为MR程序中一个reduceTask默认只输出一个文件)
 *   而且现在我们还指定了分区的数据规则,MR程序的默认分区机制无法满足我们的需求,因此我们还得需要自定义分区机制
 *   剩余的操作就是基本的求单词计数案例的代码
 */
public class WCDriver02 {
    public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","hdfs://192.168.68.101:9000");

        Job job = Job.getInstance(conf);
        //这一行代码用来指定程序打成JAR包之后在集群中运行时避免ClassNotFound异常问题
        job.setJarByClass(WCDriver02.class);

        //封装InputFormat  默认使用是TextInputFormat
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(job,new Path("/wordcount.txt"));

        //封装Mapper阶段
        job.setMapperClass(WCMapper02.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        //封装分区Partitioner
//        job.setPartitionerClass(WCPartitioner.class);
        job.setPartitionerClass(WCPartitioner01.class);

        //封装reducer阶段
        job.setReducerClass(WCReducer02.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        /**
         * 设置reduceTask的数量,默认情况下 我们规定ReduceTask的数量必须和自定义的分区数保持一致
         * 但是规定是规定 是可以打破的,但是打破规则是要接受代价的
         * 代价:如果reduceTask的数量和分区返回的数量不一致,会出现以下三种情况:
         * 1、reduceTask的数量大于分区数,那么会产生多个结果文件,只不过有些结果文件就是一个空白文件,
         *    多余的reduceTask没有分区数据处理才会产生空白文件
         * 2、reduceTask的数量小于分区数,而且大于1的,报错
         * 3、reduceTask的数量小于分区数,但是等于1  正常执行,只不过分区不执行了
         */
        job.setNumReduceTasks(3);

        //封装OutputFormat
//        job.setOutputFormatClass();
        Path path = new Path("/output");
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.35.101:9000"), conf, "root");
        if (fs.exists(path)){
            fs.delete(path,true);
        }
        FileOutputFormat.setOutputPath(job,path);

        //提交程序运行
        boolean flag = job.waitForCompletion(true);
        System.exit(flag?0:1);

    }
}

class WCMapper02 extends Mapper<LongWritable, Text,Text,LongWritable>{
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            context.write(new Text(word),new LongWritable(1L));
        }
    }
}

class WCReducer02 extends Reducer<Text,LongWritable,Text,LongWritable>{
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        long sum = 0L;
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key,new LongWritable(sum));
    }
}

/**
 * 自定义Partitioner实现数据分区机制
 * 1、自定义的Partitioner需要传递两个泛型,两个泛型就是map阶段输出的key-value的类型,
 *    因为partitioner分区是map阶段输出数据的时候触发的
 * 2、重写getPartition方法
 */
class WCPartitioner extends Partitioner<Text,LongWritable>{

    /**
     * 方法传递了三个参数
     * @param key   map阶段输出的key值
     * @param value map阶段输出的value值
     * @param numPartitions 设置的reduceTask的数量
     * @return 返回值整数类型 整数代表的是数据的分区编号 分区编号从0开始  而且分区编号必须是连贯的
     */
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        String word = key.toString();
        /**
         * 分区逻辑是 如果单词的首字母是大写 那么把数据分配给0号分区处理
         * 如果单词的首字母是小写 那么把数据分配给1号分区处理
         * 分区的编号从0开始
         */
        char first = word.charAt(0);
        if (first>=65 && first <= 90){
            return 0;
        }else{
            return 1;
        }
    }
}

class WCPartitioner01 extends Partitioner<Text,LongWritable>{
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        String word = key.toString();
        //首字母是全部转成了小写形式  这样的话我们就可以实现类似于忽略大小写判断的规则
        char first = word.toLowerCase().charAt(0);
        if(first == 'h'){
            return 0;
        }else if(first == 's'){
            return 1;
        }else{
            return 2;
        }
    }
}

package com.sxuek.wordcount;

import com.google.common.base.Charsets;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

import java.io.IOException;
import java.util.List;

/**
 * 自定义InputFormat 需要指定两个泛型,两个泛型就是读取kv的类型
 */
public class MyInputFormat extends InputFormat<LongWritable, Text> {
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {

        return null;
    }

    /**
     * 也是一行一行读取的   以每一行的偏移量为key  以每一行的数据为value进行读取
     * @param split the split to be read
     * @param context the information about the task
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter){
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        }
        return new LineRecordReader(recordDelimiterBytes);
    }
}

package com.sxuek.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class WCDriver {
    public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
        //1、准备一个配置文件对象
        Configuration configuration = new Configuration();
//        configuration.set("mapreduce.task.io.sort.mb","80");
//        configuration.set("mapreduce.map.sort.spill.percent","0.90");

        //在配置文件指定HDFS的地址,因此MR处理的数据一般都是HDFS的,但是我这里不指定了,因此在resources目录下已经通过core-site.xml文件指定了
//        configuration.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR," ");

        //2、创建一个封装MR程序使用Job对象
        Job job = Job.getInstance(configuration);
        job.setJarByClass(WCDriver.class);

        /**
         * 3、封装指定的InputFormat类 如果没有指定,默认使用TextInputFormat
         */
//        job.setInputFormatClass(KeyValueTextInputFormat.class);
//        job.setInputFormatClass(NLineInputFormat.class);
//        NLineInputFormat.setNumLinesPerSplit(job,3);//指定3行做一个切片

//        job.setInputFormatClass(CombineTextInputFormat.class);
//        CombineTextInputFormat.setMaxInputSplitSize(job,8);
        //指定输入文件路径  输入路径默认是本地的,如果你想要是HDFS上的 那么必须配置fs.defaultFS  指定HDFS的路径
        FileInputFormat.setInputPaths(job,new Path("/wordcount.txt"));

        /**
         * 4、封装Mapper阶段
         */
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        /**
         * 5、封装Partitioner分区类,分区类可以不用指定,默认分区机制
         */
//        job.setPartitionerClass();

        job.setCombinerClass(WCCombiner.class);

        /**
         * 6、封装Reducer阶段
         */
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setNumReduceTasks(1);

        /**
         * 7、封装指定的OutputFormat,如果没有指定OutputFormat  默认使用TextOutputFormat
         */
//        job.setOutputFormatClass();
        //封装输出路径  输出路径不能提前存在,因此代码在中先判断是否存在,如果存在删除了
        Path path = new Path("/output2");
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.68.101:9000"), configuration, "root");
        if (fs.exists(path)){
            fs.delete(path,true);
        }
        FileOutputFormat.setOutputPath(job,path);

        /**
         * 8、提交程序运行
         *    提交的时候先进行切片规划,然后将配置和代码提交给资源调度器
         */
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}


class WCMapper extends Mapper<LongWritable, Text,Text,LongWritable>{
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        System.out.println("map通过inputFormat机制读取的key值为"+key.get()+",读取的value值为"+line);
        String[] words = line.split(" ");
        for (String word : words) {
             context.write(new Text(word),new LongWritable(1L));
        }
    }
}
 class WCReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        long sum =0l;
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key,new LongWritable(sum));
    }
}

class WCCombiner extends Reducer<Text,LongWritable,Text,LongWritable>{
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        long sum =0l;
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key,new LongWritable(sum));
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/789492.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

王道考研数据结构--4.2循环队列

目录 前言 1.循环队列的定义 2.循环队列的结构 3.循环队列的操作 3.1定义循环队列 3.2初始化 3.3入队 3.4出队 3.5遍历&#xff0c;求表长 3.6清空销毁 4.完整代码 前言 日期&#xff1a;2023.7.25 书籍&#xff1a;2024年数据结构考研复习指导&#xff08;王道考研…

MySQL 中一条 SQL 的查询与更新

MySQL 中一条 SQL 的查询与更新 1 SQL 的查询1.1 MySQL 的逻辑架构图1.2 连接器1.3 查询缓存1.4 分析器1.5 优化器1.6 执行器 2 SQL 的更新2.1 redo log&#xff08;重做日志&#xff09;2.2 binlog&#xff08;归档日志&#xff09;2.3 redo log 和 binlog 日志的差异2.4 示例…

【Java SE】类和对象

目录 【1】面向对象的初步认识 【1.1】什么是面向对象 【1.2】面向对象与面向过程 【2】类定义和使用 【2.1】简单认识类 【2.2】类的定义格式 【2.3】练习 【2.3.1】定义一个狗类 【2.3.2】定义一个学生类 【3】类的实例化 【3.1】什么是实例化 【3.2】类和对象的…

Acwing.282 石子合并(动态规划)

题目 设有N堆沙子排成一排&#xff0c;其编号为1&#xff0c;2&#xff0c;3&#xff0c;…&#xff0c;N。 每堆沙子有一定的质量&#xff0c;可以用一个整数来描述&#xff0c;现在要将这N堆沙子合并成为一堆。 每次只能合并相邻的两堆&#xff0c;合并的代价为这两堆沙子的…

应用层协议——http

文章目录 1. HTTP协议1.1 认识URL1.2 urlencode和urldecode1.3 HTTP协议格式1.3.1 HTTP请求1.3.2 HTTP响应1.3.3 外网测试1.3.4 添加html文件1.3.5 HTTP常见Header1.3.6 GET和POST 1.4 HTTP的状态码1.4.1 301和3021.4.2 代码实现 1.5 Cookie1.5.1 代码验证1.5.2 Cookiesession …

【Nodejs】Puppeteer\爬虫实践

puppeteer 文档:puppeteer.js中文文档|puppeteerjs中文网|puppeteer爬虫教程 Puppeteer本身依赖6.4以上的Node&#xff0c;但是为了异步超级好用的async/await&#xff0c;推荐使用7.6版本以上的Node。另外headless Chrome本身对服务器依赖的库的版本要求比较高&#xff0c;c…

海外网红营销合作指南:详解海外合同与协议要点

随着互联网的发展和社交媒体的普及&#xff0c;海外网红营销成为了品牌推广和营销的重要力量。然而&#xff0c;这种跨国合作需要谨慎考虑&#xff0c;签订合适的合同与协议显得尤为重要&#xff0c;以确保各方权益得到保障并促进合作的顺利进行。本文Nox聚星将详细介绍与海外网…

Stable Diffusion - 扩展 Segment Anything 和 GroundingDINO 实例分割算法 插件的配置与使用

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://blog.csdn.net/caroline_wendy/article/details/131918652 Paper and GitHub&#xff1a; Segment Anything: SAM - Segment Anything GitHub: https://github.com/facebookresearch/s…

学习机器视觉要点:

图像采集&#xff1a;了解镜头、光源、相机选型&#xff0c;打光对图像质量的重要性。 图像处理&#xff1a;掌握压缩、增强、匹配、识别等图像处理技术&#xff0c;包括滤波、连通域、腐蚀膨胀等基本操作。 矩视智能低代码平台&#xff1a;使用该平台进行快速开发&#xff0…

mac电脑强大的解压缩软件BetterZip 5.3.4 for Mac中文版及betterzip怎么压缩

BetterZip 5.3.4 for Mac 是Mac系统平台上一款功能强大的文件解压缩软件&#xff0c;不必解压就能快速地检查压缩文档。它能执行文件之间的合并并提供密码。使用它&#xff0c;用户可以更快捷的向压缩文件中添加和删除文件。它支持包括zip、gz、bz、bz2、tar、tgz、tbz、rar、7…

华为数通HCIA-数通网络基础

基础概念 通信&#xff1a;两个实体之间进行信息交流 数据通信&#xff1a;网络设备之间进行的通信 计算机网络&#xff1a;实现网络设备之间进行数据通信的媒介 园区网络&#xff08;企业网络&#xff09;/私网/内网&#xff1a;用于实现园区内部互通&#xff0c;并且需要部…

Python实现人脸识别功能

Python实现人脸识别功能 闲来没事&#xff0c;记录一下前几天学习的人脸识别小项目。 要想实现人脸识别&#xff0c;我们首先要搞明白&#xff0c;人脸识别主要分为哪些步骤&#xff1f;为了提高人脸识别的准确性&#xff0c;我们首先要把图像或视频中的人脸检测出来&#xf…

Linux入门 系统编程三 嵌入式开发 使用gcc制作静态库动态库,及调用库头文件应用

一、静态库的制作与使用 生成静态的主要是有5个步骤 1、编写源代码 2、将要编译成库的源文件编译成.o文件 3、使用ar命令创建静态库 4、调用库 5、测试 静态库的命名规则&#xff1a;lib开头。.a结尾。lib和.a中间的称作库名。lib库名称作库文件名 1.1 先写两个测试程序&…

《论文阅读》具有特殊Token和轮级注意力的层级对话理解 ICLR 2023

《论文阅读》具有特殊Token和轮级注意力的层级对话理解 前言简介问题定义模型构建知识点Intra-turn ModelingInter-turn Modeling分类前言 你是否也对于理解论文存在困惑? 你是否也像我之前搜索论文解读,得到只是中文翻译的解读后感到失望? 小白如何从零读懂论文?和我一…

用OpenCV图像处理技巧之巧用直方图

1. 引言 欢迎回到我的Python图像处理系列&#xff01;在这一节中&#xff0c;我们将更深入地研究图像分析领域中图像直方图的应用&#xff0c;事实上通过对直方图进行相应操作&#xff0c;我们可以来调整图像的对比度和亮度&#xff0c;这可以极大地改善图像的视觉效果。 闲话…

vue脚手架文件说明

vue脚手架文件说明 1、文件介绍2、脚手架里面主要文件和作用 1、文件介绍 2、脚手架里面主要文件和作用 node_modules 都是下载的第三方包public/index.html 浏览器运行的网页src/main.js webpack打包的入口src/APP.vue Vue页面入口package.json 依赖包列表文件

Tangible Software Solutions Crack

Tangible Software Solutions Crack 有形软件解决方案-最准确可靠的源代码转换器&#xff0c;在VB.NET、C#、Java、C和Python之间进行转换&#xff0c;同时节省了无数小时的艰苦工作和宝贵的时间。 主要优点&#xff1a; 节省宝贵时间 准确全面 安全-您的代码永远不会离开您的机…

阿里云容器镜像仓库(ACR)的创建和使用

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

(一)RabbitMQ概念-优势、劣势、应用场景 、AMQP、工作原理

Lison <dreamlison163.com>, v1.0.0, 2023.06.22 RabbitMQ概念-优势、劣势、应用场景 、AMQP、工作原理 文章目录 RabbitMQ概念-优势、劣势、应用场景 、AMQP、工作原理RabbitMQ概念RabbitMQ的优势RabbitMQ劣势RabbitMQ应用的场景RabbitMQ_AMQPRabbitMQ工作原理 RabbitM…

如何在3ds max中创建可用于真人场景的巨型机器人:第 1部分

推荐&#xff1a; NSDT场景编辑器助你快速搭建可二次开发的3D应用场景 1. 创建主体 步骤 1 打开 3ds Max。 打开 3ds Max 步骤 2 在左侧视口中&#xff0c;按键盘上的 Alt-B 键。它 打开视口配置窗口。 打开“锁定缩放/平移”和“匹配位图”选项。单击“文件”并转到参考 …