大数据技术-Hadoop(三)Mapreduce的介绍与使用

news2025/1/2 22:52:49

目录

一、概念和定义

二、WordCount案例

1、WordCountMapper

2、WordCountReducer

3、WordCountDriver

三、序列化

1、为什么序列化

2、为什么不用Java的序列化

3、Hadoop序列化特点:

4、自定义bean对象实现序列化接口(Writable)

4.1、bean

4.2、FlowBeanMapper

4.3、FlowReducer

4.4、FlowDriver

四、MapReduce框架原理

1、mapreduce流程

 2、Shuffle机制

3、Partion分区

3.1、 默认分区方法

3.2、自定义分区

4、WritableComparable

5、Combiner合并

6、自定义FileOutputFormat

7、Reduce Join

8、数据清洗 ETL

五、数据压缩

1、参数说明

2、代码示例

六、完整代码

七、参考


一、概念和定义

        请看 https://blog.csdn.net/weixin_48935611/article/details/137856999,这个文章概括的很全面,本文主要展示MapReduce的使用。

二、WordCount案例

1、WordCountMapper

package com.xiaojie.hadoop.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: TODO
 * @date 2024/12/27 9:00
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text kOut = new Text();
    IntWritable vOut = new IntWritable(1);

    /**
     * @param key     偏移量
     * @param value   文本值
     * @param context 上下文
     * @description:
     * @return: void
     * @author 熟透的蜗牛
     * @date: 2024/12/27 9:01
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
//        hello world
//        hello mapreduce
//        hello haddop
//        hadoop
//        java
//        mysql
//        mysql orcale
        /**
         这里输出的结果为(hello,1)(world,1)(hello,1) (mapreduce,1)(hello,1)......
         */


        //获取一行,输入的内容
        String line = value.toString();
        //分隔
        String[] words = line.split(" ");
        for (String word : words) {
            kOut.set(word);
            //kout 即为单词 vout 单词出现的次数
            context.write(kOut, vOut);
        }

    }
}

2、WordCountReducer

package com.xiaojie.hadoop.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: reduce把map的输出当作输入
 * @date 2024/12/27 9:17
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    int sum;
    IntWritable v = new IntWritable();

    /**
     * @param key     map 输出的key kOut
     * @param values  map输出的value Vout
     * @param context
     * @description:
     * @return: void
     * @author 熟透的蜗牛
     * @date: 2024/12/27 9:22
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        //累加求和,合并map传递过来的值
        sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        //输出结果
        v.set(sum);
        context.write(key, v);
    }
}

3、WordCountDriver

package com.xiaojie.hadoop.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: TODO
 * @date 2024/12/27 9:23
 */
public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        // 1 获取配置信息以及获取job对象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 关联本Driver程序的jar
        job.setJarByClass(WordCountDriver.class);
        // 3 关联Mapper和Reducer的jar
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4 设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5 设置最终输出kv类型

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\hello.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\wordcount"));

        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }
}

三、序列化

1、为什么序列化

一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。

2、为什么不用Java的序列化

Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。

3、Hadoop序列化特点:

  • 1)紧凑高效使用存储空间。
  • 2)快速:读写数据的额外开销小。
  • (3)互操作:支持多语言的交互

4、自定义bean对象实现序列化接口(Writable)

4.1、bean

package com.xiaojie.hadoop.mapreduce.flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: 定义一个bean 实现 writable接口
 * @date 2024/12/27 10:25
 */
public class FlowBean implements Writable {

    private long upFlow; //上行流量
    private long downFlow; //下行流量
    private long sumFlow; //总流量

    //创建无参构造函数
    public FlowBean() {
    }

    //创建gettter setter 方法
    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    //重写setSumFlow 方法
    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    //重写序列化方法,输出和输入的顺序要保持一致
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);

    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();

    }

    //结果显示在文本中,重写tostring 方法,
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;

    }
}

4.2、FlowBeanMapper

package com.xiaojie.hadoop.mapreduce.flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: 流量mapper
 * @date 2024/12/27 10:32
 */
public class FlowBeanMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    //定义一个输出的key
    private Text outKey = new Text();
    //定义输出的value 即 FlowBean
    private FlowBean outValue = new FlowBean();

    /**
     * @param key     map的输入值偏移量
     * @param value   map 的输入value
     * @param context
     * @description:
     * @return: void
     * @author 熟透的蜗牛
     * @date: 2024/12/27 10:35
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {

        //获取一行数据
        String line = value.toString();
        //切割数据
        String[] split = line.split("\t");

        //抓取我们需要的数据:手机号,上行流量,下行流量
        String phone = split[1];  //手机号
        //上行流量 ,由于有的数据没有,这里从后面取值
        Long upFlow = Long.parseLong(split[split.length - 3]);
        Long downFlow = Long.parseLong(split[split.length - 2]);

        //封装输出结果
        //设置输出的key
        outKey.set(phone);
        //设置输出的value
        outValue.setUpFlow(upFlow);
        outValue.setDownFlow(downFlow);
        outValue.setSumFlow();
        //写出outK outV
        context.write(outKey, outValue);
    }
}

4.3、FlowReducer

package com.xiaojie.hadoop.mapreduce.flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: 定义流量输出的reduce
 * @date 2024/12/27 10:46
 */
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean finalOutV = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {

        long totalUp = 0;
        long totalDown = 0;
        //遍历values,将其中的上行流量,下行流量分别累加
        for (FlowBean bean : values) {
            totalUp += bean.getUpFlow();
            totalUp += bean.getDownFlow();
        }
        //封装输出结果
        finalOutV.setUpFlow(totalUp);
        finalOutV.setDownFlow(totalDown);
        finalOutV.setSumFlow();
        //输出结果
        context.write(key, finalOutV);

    }
}

4.4、FlowDriver

package com.xiaojie.hadoop.mapreduce.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: 驱动
 * @date 2024/12/27 10:55
 */
public class FlowDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        //获取job对象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        //设置jar
        job.setJarByClass(FlowDriver.class);

        //设置manpper 和reducer
        job.setMapperClass(FlowBeanMapper.class);
        job.setReducerClass(FlowReducer.class);

        //设置map输出kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        //设置最终输出结果kv
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path("d://hadoop//phone.txt"));
        FileOutputFormat.setOutputPath(job, new Path("d://hadoop//phone"));

        //提交任务
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }
}

四、MapReduce框架原理

1、mapreduce流程

直观的效果,图片来自 https://blog.csdn.net/weixin_48935611/article/details/137856999

 2、Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle

(1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中

(2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

(3)多个溢出文件会被合并成大的溢出文件

(4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序

(5)ReduceTask根据自己的分区号,去各个MapTask机器上拉取相应的结果分区数据

(6)ReduceTask会抓取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)

(7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)

注意:

(1)Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。

(2)缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb默认100M。

3、Partion分区

3.1、 默认分区方法

public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

分区个数小于1的时候,就不会再执行上面的分区计算

3.2、自定义分区

package com.xiaojie.hadoop.mapreduce.partitioner;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: 自定义分区
 * @date 2024/12/29 15:52
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    /**
     * @param text          键值
     * @param flowBean      值
     * @param numPartitions 返回的分区数
     * @description: 分区逻辑, 手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中
     * @return: int
     * @author 熟透的蜗牛
     * @date: 2024/12/29 15:54
     */
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        int partition;
        if (StringUtils.isNotBlank(text.toString())) {
            if (text.toString().startsWith("136")) {
                partition = 0;
            } else if (text.toString().startsWith("137")) {
                partition = 1;
            } else if (text.toString().startsWith("138")) {
                partition = 2;
            } else if (text.toString().startsWith("139")) {
                partition = 3;
            } else {
                partition = 4;
            }
        } else {
            partition = 4;
        }
        return partition;
    }
}
package com.xiaojie.hadoop.mapreduce.partitioner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: 驱动
 * @date 2024/12/27 10:55
 */
public class FlowDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        //获取job对象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        //设置jar
        job.setJarByClass(FlowDriver.class);

        //设置manpper 和reducer
        job.setMapperClass(FlowBeanMapper.class);
        job.setReducerClass(FlowReducer.class);

        //设置map输出kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        //设置最终输出结果kv
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //设施任务数 ,这里设置的要和分区个数一致,如果任务数>分区数则输出文件会有多个为空的文件,如果任务数>1并且<分区数,会有数据无法处理发生异常,
        // 如果任务数为1 ,只会产生一个文件,分区号必须从0开始,逐渐累加
        job.setNumReduceTasks(5);

        //指定自定义分区类
        job.setPartitionerClass(ProvincePartitioner.class);
        //设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path("d://hadoop//phone.txt"));
        FileOutputFormat.setOutputPath(job, new Path("d://hadoop//phone33"));

        //提交任务
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }
}

4、WritableComparable

  @Override
    public int compareTo(FlowBean o) {
        //按照总流量比较,倒序排列
        if (this.sumFlow > o.sumFlow) {
            return -1;
        } else if (this.sumFlow < o.sumFlow) {
            return 1;
        } else {
            //如果总流量一样,按照上行流量排
            if (this.upFlow > o.upFlow) {
                return -1;
            } else if (this.upFlow < o.upFlow) {
                return 1;
            }
            return 0;
        }
    }

5、Combiner合并

package com.xiaojie.hadoop.mapreduce.combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: TODO
 * @date 2024/12/29 18:50
 */
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

   IntWritable outV= new IntWritable(0);
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
       int sum = 0;
        for (IntWritable val : values) {
            sum+=val.get();
        }
        outV.set(sum);
        context.write(key, outV);
    }
}
package com.xiaojie.hadoop.mapreduce.combiner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: TODO
 * @date 2024/12/27 9:23
 */
public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        // 1 获取配置信息以及获取job对象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 关联本Driver程序的jar
        job.setJarByClass(WordCountDriver.class);
        // 3 关联Mapper和Reducer的jar
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4 设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5 设置最终输出kv类型

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //设置Combiner
        job.setCombinerClass(WordCountCombiner.class);

        // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\hello.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\wordcount13"));

        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }
}

6、自定义FileOutputFormat

package com.xiaojie.hadoop.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: TODO
 * @date 2024/12/29 20:29
 */
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {


    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        //创建一个自定义的RecordWriter返回
        LogRecordWriter logRecordWriter = new LogRecordWriter(job);
        return logRecordWriter;

    }
}
package com.xiaojie.hadoop.mapreduce.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: TODO
 * @date 2024/12/29 20:31
 */
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {

    private FSDataOutputStream fileOut;
    private FSDataOutputStream otherOut;

    public LogRecordWriter(TaskAttemptContext job) {
        try {
            //获取文件系统对象
            FileSystem fs = FileSystem.get(job.getConfiguration());
            //用文件系统对象创建两个输出流对应不同的目录
            fileOut = fs.create(new Path("d:/hadoop/file.log"));
            otherOut = fs.create(new Path("d:/hadoop/other.log"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String log = key.toString();
        //根据一行的log数据是否包含atguigu,判断两条输出流输出的内容
        if (log.contains("atguigu")) {
            fileOut.writeBytes(log + "\n");
        } else {
            otherOut.writeBytes(log + "\n");
        }

    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        //关流
        IOUtils.closeStream(fileOut);
        IOUtils.closeStream(otherOut);
    }
}

7、Reduce Join

package com.xiaojie.hadoop.mapreduce.join2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;


public class MapJoinDriver {

    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {

        // 1 获取job信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 设置加载jar包路径
        job.setJarByClass(MapJoinDriver.class);
        // 3 关联mapper
        job.setMapperClass(MapJoinMapper.class);
        // 4 设置Map输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5 设置最终输出KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 加载缓存数据
        job.addCacheFile(new URI("file:///D:/hadoop/pd.txt"));
        // Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
        job.setNumReduceTasks(0);

        // 6 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\order"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output2222"));
        // 7 提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
package com.xiaojie.hadoop.mapreduce.join2;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Map<String, String> pdMap = new HashMap<>();
    private Text text = new Text();

    //任务开始前将pd数据缓存进pdMap
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {

        //通过缓存文件得到小表数据pd.txt
        URI[] cacheFiles = context.getCacheFiles();
        Path path = new Path(cacheFiles[0]);

        //获取文件系统对象,并开流
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(path);

        //通过包装流转换为reader,方便按行读取
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));

        //逐行读取,按行处理
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            //切割一行    
        //01	小米
            String[] split = line.split("\t");
            pdMap.put(split[0], split[1]);
        }

        //关流
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        //读取大表数据
        //1001	01	1
        String[] fields = value.toString().split("\t");
        //通过大表每行数据的pid,去pdMap里面取出pname
        String pname = pdMap.get(fields[1]);
        //将大表每行数据的pid替换为pname
        text.set(fields[0] + "\t" + pname + "\t" + fields[2]);
        //写出
        context.write(text,NullWritable.get());
    }
}

8、数据清洗 ETL

package com.xiaojie.hadoop.mapreduce.etl;

import com.xiaojie.hadoop.mapreduce.outputformat.LogDriver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WebLogDriver {
    public static void main(String[] args) throws Exception {

// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[]{"D:\\hadoop\\weblog", "D:\\hadoop\\outlog"};

        // 1 获取job信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 加载jar包
        job.setJarByClass(LogDriver.class);

        // 3 关联map
        job.setMapperClass(WebLogMapper.class);

        // 4 设置最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 设置reducetask个数为0
        job.setNumReduceTasks(0);

        // 5 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6 提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
package com.xiaojie.hadoop.mapreduce.etl;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: 数据清洗,清洗掉不符合格式的数据
 * @date 2024/12/29 21:37
 */
public class WebLogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 获取1行数据
        String line = value.toString();

        // 2 解析日志
        boolean result = parseLog(line, context);

        // 3 日志不合法退出
        if (!result) {
            return;
        }

        // 4 日志合法就直接写出
        context.write(value, NullWritable.get());
    }

    // 2 封装解析日志的方法
    private boolean parseLog(String line, Context context) {

        // 1 截取
        String[] fields = line.split(" ");

        // 2 日志长度大于11的为合法
        if (fields.length > 11) {
            return true;
        } else {
            return false;
        }
    }
}

五、数据压缩

1、参数说明

参数

默认值

阶段

建议

io.compression.codecs

(在core-site.xml中配置)

无,这个需要在命令行输入hadoop checknative查看

输入压缩

Hadoop使用文件扩展名判断是否支持某种编解码器

mapreduce.map.output.compress(在mapred-site.xml中配置)

false

mapper输出

这个参数设为true启用压缩

mapreduce.map.output.compress.codec(在mapred-site.xml中配置)

org.apache.hadoop.io.compress.DefaultCodec

mapper输出

企业多使用LZO或Snappy编解码器在此阶段压缩数据

mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置)

false

reducer输出

这个参数设为true启用压缩

mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置)

org.apache.hadoop.io.compress.DefaultCodec

reducer输出

使用标准工具或者编解码器,如gzip和bzip2

2、代码示例

package com.xiaojie.hadoop.mapreduce.zip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: TODO
 * @date 2024/12/27 9:23
 */
public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        // 1 获取配置信息以及获取job对象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 关联本Driver程序的jar
        job.setJarByClass(WordCountDriver.class);
        // 3 关联Mapper和Reducer的jar
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4 设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5 设置最终输出kv类型

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);


        //设置压缩格式
        FileOutputFormat.setCompressOutput(job, true);

        // 设置压缩的方式
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);

        // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\hello.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\wordcount111"));

        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }
}

六、完整代码

spring-boot: Springboot整合redis、消息中间件等相关代码 - Gitee.com

七、参考

https://blog.csdn.net/weixin_48935611/article/details/137856999

参考内容来自尚硅谷大数据学习

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2267833.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Echarts+vue电商平台数据可视化——webSocket改造项目

websocket的基本使用&#xff0c;用于测试前端能否正常获取到后台数据 后台代码编写&#xff1a; const path require("path"); const fileUtils require("../utils/file_utils"); const WebSocket require("ws"); // 创建WebSocket服务端的…

神经网络-Inception

Inception网络是由Google开发的一种深度卷积神经网络架构&#xff0c;旨在解决计算机视觉领域中的图像分类和物体识别任务。 Inception网络最初在2014年被提出&#xff0c;并在ImageNet图像分类挑战赛上取得了很好的结果。其设计灵感来自于模块化的思想&#xff0c;将不同尺度…

js给dom分页

减少js操作dom js引擎与渲染引擎相互独立, js操作dom过程开销大操作到了dom层面会触发渲染树的变化,触发回流与重绘开销大 让js给dom分压 js处理完操作后,最后给dom 缓存变量 let container document.getElementById(container) let content for(let count0;count<…

深度学习——神经网络中前向传播、反向传播与梯度计算原理

一、前向传播 1.1 概念 神经网络的前向传播&#xff08;Forward Propagation&#xff09;就像是一个数据处理的流水线。从输入层开始&#xff0c;按照网络的层次结构&#xff0c;每一层的神经元接收上一层神经元的输出作为自己的输入&#xff0c;经过线性变换&#xff08;加权…

秒鲨后端之MyBatis【2】默认的类型别名、MyBatis的增删改查、idea中设置文件的配置模板、MyBatis获取参数值的两种方式、特殊SQL的执行

别忘了请点个赞收藏关注支持一下博主喵&#xff01;&#xff01;&#xff01;! ! ! 下篇更新&#xff1a; 秒鲨后端之MyBatis【3】自定义映射resultMap、动态SQL、MyBatis的缓存、MyBatis的逆向工程、分页插件。 默认的类型别名 MyBatis的增删改查 添加 <!--int insertUs…

瑞芯微全新芯片平台RK3506优势详解,高集成低功耗,为工业而生 触觉智能测评

RK3506是瑞芯微Rockchip在2024年第四季度全新推出的Arm嵌入式芯片平台&#xff0c;三核Cortex-A7单核Cortex-M0多核异构设计&#xff0c;CPU频率达1.5Ghz, M0 MCU为200Mhz。 而RK3506芯片平台下的工业级芯片型号RK3506J&#xff0c;具备-40-85℃的工业宽温性能、发热量小&#…

AIOps平台的功能对比:如何选择适合的解决方案?

定义与概念 AIOps&#xff0c;即人工智能运维&#xff08;Artificial Intelligence for IT Operations&#xff09;&#xff0c;是将人工智能技术应用于 IT 运维领域&#xff0c;以实现自动化、智能化的运维管理。它通过整合大数据、机器学习等先进技术&#xff0c;对海量运维数…

Python + 深度学习从 0 到 1(03 / 99)

希望对你有帮助呀&#xff01;&#xff01;&#x1f49c;&#x1f49c; 如有更好理解的思路&#xff0c;欢迎大家留言补充 ~ 一起加油叭 &#x1f4a6; 欢迎关注、订阅专栏 【深度学习从 0 到 1】谢谢你的支持&#xff01; ⭐ 神经网络的数据表示 – 张量 你可能对矩阵很熟悉&a…

Lumos学习王佩丰Excel第二十三讲:饼图美化与PPT图表

一、双坐标柱形图的补充知识 1、主次坐标设置 2、主次坐标柱形避让&#xff08;通过增加两个系列&#xff0c;挤压使得两个柱形挨在一起&#xff09; 增加两个系列 将一个系列设置成主坐标轴&#xff0c;另一个设成次坐标轴 调整系列位置 二、饼图美化 1、饼图美化常见设置 …

基于Vue+SSM+SpringCloudAlibaba书籍管理系统

功能要求 一、登录功能&#xff08;http://localhost:8080/#/login&#xff09; 输入账号和密码(admin/admin)进行登录&#xff1a; 如果密码错误&#xff0c;给出提示信息 如果密码正确&#xff0c;跳转到主页 账号或密码错误&#xff1a; 账号密码正确&#xff1a;跳转到…

【优先算法】滑动窗口 --(结合例题讲解解题思路)(C++)

目录 ​编辑 1.什么是滑动窗口&#xff1f; 2. 滑动窗口例题 2.1 例题1&#xff1a;长度最小的子数组 2.1.1 解题思路 2.1.2 方法一&#xff1a;暴力枚举出所有的子数组的和 2.1.3 方法二&#xff1a;使用 “同向双指针” 也就是滑动窗口来进行优化 2.2 例题2&#xff1a;无重…

VS Code 从命令行启动

在 VS Code 中&#xff0c;code 命令允许你在命令行中快速打开文件、文件夹或新窗口。 安装 原本地址&#xff1a;https://code.visualstudio.com/docs/setup/mac 使用 使用 code 命令 打开文件&#xff1a;你可以通过在命令行输入 code 文件名 来直接打开一个文件。 打开文…

微服务-配置管理

文章目录 1.什么是配置管理2.配置共享添加共享配置拉取共享配置 3.配置热更新添加配置到Nacos配置热更新 4.动态路由监听Nacos配置变更 1.什么是配置管理 到目前为止我们已经解决了微服务相关的几个问题&#xff1a; 微服务远程调用微服务注册、发现微服务请求路由、负载均衡…

ArrayList 和LinkedList的区别比较

前言 ‌ArrayList和LinkedList的主要区别在于它们的底层数据结构、性能特点以及适用场景。‌ArrayList和LinkedList从名字分析&#xff0c;他们一个是Array&#xff08;动态数组&#xff09;的数据结构&#xff0c;一个是Linked&#xff08;链表&#xff09;的数据结构&#x…

MySQL--》如何在SQL中巧妙运用函数与约束,优化数据处理与验证?

目录 函数使用 字符串函数 数值函数 日期函数 流程函数 约束 外键约束 约束规则 函数使用 函数是指一段可以直接被另一段程序调用的程序或代码&#xff0c;在mysql当中有许多常见的内置函数&#xff0c;接下来开始对这些内置函数及其作用进行简单的讲解和使用&#xf…

一文大白话讲清楚CSS盒子模型和块级格式化上下文(BFC)

一文大白话讲清楚CSS盒子模型和块级格式化上下文&#xff08;BFC&#xff09; 1.啥是个CSS盒子 鞋盒你家总有吧&#xff0c;方方正正&#xff0c;有长度有高度。css盒子跟这个八九不离十当我们编写html页面时&#xff0c;写了很多的元素&#xff0c;比如"div",&quo…

Docker 快速搭建 GBase 8s数据库服务

1.查看Gbase 8s镜像版本 可以去到docker hub网站搜索&#xff1a;gbase8s liaosnet/gbase8s如果无法访问到该网站&#xff0c;可以通过docker search搜索 docker search gbase8s2.拉取Gbase 8s镜像 以下演示的版本是目前官网最新版本Gbase8sV8.8_3.5.1 docker pull liaosn…

密钥登录服务器

1. 生成 SSH 密钥对 如果您还没有生成密钥对&#xff0c;可以使用以下命令生成&#xff1a; ssh-keygen 在 root 用户的家目录中生成了一个 .ssh 的隐藏目录&#xff0c;内含两个密钥文件&#xff1a;id_rsa 为私钥&#xff0c;id_rsa.pub 为公钥。 在提示时&#xff0c;您可…

王佩丰24节Excel学习笔记——第二十讲:图表基础

【以 Excel2010 系列学习&#xff0c;用 Office LTSC 专业增强版 2021 实践】 【本章技巧】 课件图片有问题&#xff0c;不能随隐藏熟悉各个图表小部件的功能&#xff0c;需要修改都是选中右键进行更改。 一、认识图表中的元素 图表标题&#xff1a;主坐标&#xff08;横坐标&…

华为交换机配置本地端口流量镜像

端口镜像&#xff08;Port Mirroring&#xff09;是网络监控的一种重要技术&#xff0c;通过复制流经特定端口的报文&#xff0c;并将其传送到指定的观察端口&#xff0c;以便对网络流量进行分析和监控。下面将详细介绍如何在华为交换机上配置本地端口镜像&#xff0c;以N:1镜像…