MapReduce【小文件的优化-Sequence文件】

news2025/1/23 6:12:30

在实际开发中,我们肯定希望提高MapReduce的工作效率,其实提高MapReduce的效率,无非就是提高Map阶段和Reduce阶段的效率。

Map阶段优化之小文件问题

我们知道Map阶段中的MapTask个数是与InputSplit的个数有关的,一般一个InputSplit切片对应一个,而且InputSplit的个数我们一般也无法控制,应为默认就是128MB,但是往往我们的文件并不是这样,而是大小不一,有的可能300MB,一个可能只有10KB,尤其是为一群几十KB的小文件一个划分一个InputSplit切片,实在浪费资源。

而且针对HDFS而言,每一个小文件在namenode中都会占用150字节的内存空间,最终会导致集群中虽然存储了很多个文件,但是文件的体积并不大,这样就没有意义了。

        这就想到我之前的文章【自定义InputFormat】,我们可以这样来将多个小文件都合并到一个文件当中,进入map()方法后仍然需要我们进行处理才能使用,但这并不是标准的Sequence文件,它的输出键是一个文件名,值是我们文件的字节码,所以如果我们要实现一个WordCount,还需要进一步需要使用的话还需要针对每一行的值将字节码转为Stering,在做分词处理。而下面是真正生成一个Sequence文件的代码:

生成Sequence文件

我们需要将三个小文件合并成一个Sequence文件。

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;

import java.io.File;

/**
 * 小文件解决方案之SequenceFile
 */
public class SmallFileSeq {

    public static void main(String[] args) throws Exception{
        //在hdfs中生成SequenceFile文件到临时目录下
//        write("D:\\smallFile","/tmp/seqFile");
        //读取SequenceFile文件
//        read("/tmp/seqFile");

        //在windows端生成SequenceFile文件
//        write("D:\\MapReduce_Data_Test\\myinputformat\\input","D:\\MapReduce_Data_Test\\myinputformat\\outputSeq");
        write("D:\\MapReduce_Data_Test\\sequence\\word\\","D:\\MapReduce_Data_Test\\sequence\\wordSeq");
        //读取SequenceFile文件 注意是文件地址 不是目录地址
//        read("D:\\MapReduce_Data_Test\\myinputformat\\outputSeq");
    }

    /**
     * 生成SequenceFile文件
     * @param inputDir 输入目录-windows目录
     * @param outputFile 输出文件-hdfs文件
     * @throws Exception
     */
    private static void write(String inputDir,String outputFile)
            throws Exception{
        //创建一个配置对象
        Configuration conf = new Configuration();
        //指定HDFS的地址
//        conf.set("fs.defaultFS","hdfs://hadoop102:9000");
        //获取操作文件系统对象
        FileSystem fileSystem = FileSystem.get(conf);

        //删除输出文件
        fileSystem.delete(new Path(outputFile),true);

        //构造opts数组,有三个元素
        /*
        第一个是输出路径
        第二个是key类型
        第三个是value类型
         */
        SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
                SequenceFile.Writer.file(new Path(outputFile)),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class)};

        //创建一个writer实例
        SequenceFile.Writer writer = SequenceFile.createWriter(conf, opts);
        //指定要压缩的文件的目录
        File inputDirPath = new File(inputDir);
        if(inputDirPath.isDirectory()){
            File[] files = inputDirPath.listFiles();
            for (File file : files) {
                //获取文件全部内容
                String content = FileUtils.readFileToString(file, "UTF-8");
                //文件名作为key
                Text key = new Text(file.getName());
                //文件内容作为value
                Text value = new Text(content);
                writer.append(key,value);
            }
        }
        writer.close();
    }

    /**
     * 读取SequenceFile文件
     * @param inputFile SequenceFile文件路径 注意是文件地址不是目录地址
     * @throws Exception
     */
    private static void read(String inputFile)
            throws Exception{
        //创建一个配置对象
        Configuration conf = new Configuration();
        //指定HDFS的地址
//        conf.set("fs.defaultFS","hdfs://hadoop102:9000");

//        指定windows端的文件路径,注意在Windows系统中,文件系统没有端口号的概念。因此,只需要使用file://协议指定文件路径即可连接本地文件系统。
        conf.set("fs.defaultFS","file:///");
        //创建阅读器
        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(inputFile)));
        Text key = new Text();
        Text value = new Text();
        //循环读取数据
        while(reader.next(key,value)){
            //输出文件名称
            System.out.print("文件名:"+key.toString()+",");
            //输出文件的内容
            System.out.println("文件内容:\n"+value.toString());
        }
        reader.close();
    }


}

输入:

输出:  

 

        我们可以看到一共会输出两个文件,下面的 outputSeq是真正的Sequence文件 ,文件名是我们自己指定的,而上面的.outputSeq.crc则是一个校验码文件。

接下来要做的就是如何写一个MapReduce程序将我们的小文件内容从这个合并后的Sequence文件中读取出来

Mapper类

我们需要在Job中指定输入格式InputFormat为SequenceFileInputFormat,我们读取Sequence文件只需要指定键和值都为Text即可

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

public class SeqMapper extends Mapper<Text, Text,Text,LongWritable> {

    private Text OUT_KEY = new Text();
    private LongWritable OUT_VALUE = new LongWritable(1);
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        System.out.println(line);
        String[] words = StringUtils.split(line,'\t');
        for (String word : words) {
            System.out.println("key:" + word);
            OUT_KEY.set(word);
            context.write(OUT_KEY,OUT_VALUE);
        }
    }

}

Reducer类

依旧和之前的WordCount没什么两样。

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SeqReducer extends Reducer<Text, LongWritable,Text,LongWritable> {

    private LongWritable OUT_VALUE = new LongWritable();
    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {

        long sum = 0;
        for (LongWritable val : values) {
            sum += val.get();
        }
        OUT_VALUE.set(sum);
        context.write(key,OUT_VALUE);
    }

}

Runner类 

与之前不同的是修改输入格式为SequenceFileInputFormat,而且我们读取Sequence文件时,需要指定键和值的类型都为Text类型。

这个值并不是BytesWritable类型,如果设置为BytesWritable会报错:org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.io.BytesWritable。这就说明Sequence文件读取时的值类型为Text类型。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SeqRunner extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(),new SeqRunner(),args);
    }



    @Override
    public int run(String[] args) throws Exception {
        //1.获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "sequence file word count");

        //2.配置jar包路径
        job.setJarByClass(SeqRunner.class);

        //3.关联mapper和reducer
        job.setMapperClass(SeqMapper.class);
        job.setReducerClass(SeqReducer.class);

        //设置输入格式为 Sequence文件格式
        job.setInputFormatClass(SequenceFileInputFormat.class);

        //4.设置map、reduce输出的k、v类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //5.设置统计文件输入的路径,将命令行的第一个参数作为输入文件的路径
        FileInputFormat.setInputPaths(job,new Path("D:\\MapReduce_Data_Test\\sequence\\input"));
        //6.设置结果数据存放路径,将命令行的第二个参数作为数据的输出路径
        FileOutputFormat.setOutputPath(job,new Path("D:\\MapReduce_Data_Test\\sequence\\output"));
        return job.waitForCompletion(true) ? 0 : 1;//verbose:是否监控并打印job的信息
    }
}

输出:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/628450.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《微服务实战》 第二十八章 分布式锁框架-Redisson

前言 Redisson 在基于 NIO 的 Netty 框架上&#xff0c;充分的利⽤了 Redis 键值数据库提供的⼀系列优势&#xff0c;在Java 实⽤⼯具包中常⽤接⼝的基础上&#xff0c;为使⽤者提供了⼀系列具有分布式特性的常⽤⼯具类。使得原本作为协调单机多线程并发程序的⼯具包获得了协调…

VR全景营销颠覆传统营销模式,让商企博“出圈”

在激烈的市场竞争中&#xff0c;营销成为了商企博“出圈”的重要课题&#xff0c;随着5G的到来&#xff0c;VR全景迈入了快速发展时期&#xff0c;随着VR全景的普及应用&#xff0c;商业领域也逐渐引入了VR全景营销。 时下&#xff0c;商企的营销是越发困难&#xff0c;传统的营…

币圈下半年重点之一:以太坊坎昆升级,将带来哪些实质性利好?

近期BRC-20大火&#xff0c;主打价值存储的比特币竟然生态比以太坊还热&#xff0c;但要论生态&#xff0c;以太坊才是真正的王者&#xff0c;因为其正在悄悄酝酿下一个重大升级——坎昆&#xff08;Dencun&#xff09;升级。 最新消息&#xff0c;以太坊开发者已经就Dencun升级…

【MySQL高级篇笔记-数据库的设计规范(中) 】

此笔记为尚硅谷MySQL高级篇部分内容 目录 一、为什么要数据库设计 二、范式 1、范式简介 2、范式都包括哪些 3、键和相关属性的概念 4、第一范式(1st NF) 5、第二范式(2nd NF) 6、第三范式(3rd NF) 7、小结 三、反范式化 1、概述 2、 应用举例 3、反范式的新问…

逆向分析高薪就业:学习Android逆向开发,拥抱行业机会!

简述 Android 逆向开发是指利用各种技术手段对安卓应用程序进行逆向分析和研究&#xff0c;以了解应用程序的内部机制&#xff0c;发现应用程序中的漏洞、脆弱性或者安全问题&#xff0c;并提供相关的解决方案。逆向开发技术可以帮助开发人员更好地了解应用程序的构成、运行机…

Django实现接口自动化平台(六)httprunner(2.x)基本使用【持续更新中】

上一章&#xff1a; Django实现接口自动化平台&#xff08;五&#xff09;httprunner&#xff08;2.x&#xff09;基本使用【持续更新中】_做测试的喵酱的博客-CSDN博客 下一章&#xff1a; 一、 api 文件夹&#xff08;没有任何数据依赖的场景&#xff09; api 文件夹&…

一键生成代码

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

(五)CSharp-进一步理解接口

一、什么是接口 接口是指定一组函数成员而不实现它们的引用类型。 实现接口&#xff1a; 只能由类和结构来实现接口。 二、声明接口 接口声明不能包含以下成员&#xff1a; 数据成员静态成员 接口声明只能包含如下类型的非静态成员函数的声明&#xff1a; 方法属性事件索…

25张python代码速查表,让你python能力突飞猛进的秘诀!

在学习函数时&#xff0c;总是会有很多东西学得很快&#xff0c;遗忘得也很快。但其实在学习中&#xff0c;只需要知道相关参数&#xff0c;加以调整就够了。所以你可能需要这本秘籍&#xff01; 即整理了Python科学速查表&#xff0c;就可以帮你解决以上的问题。当你在练习的时…

怎样正确做 Web 应用的压力测试?字节8年测试5个步骤给我看师了

Web应用&#xff0c;通俗来讲就是一个网站&#xff0c;主要依托于浏览器来访问其功能。 那怎么正确做网站的压力测试呢&#xff1f; 提到压力测试&#xff0c;我们想到的是服务端压力测试&#xff0c;其实这是片面的&#xff0c;完整的压力测试包含服务端压力测试和前端压力测…

高可用系统架构总结

文章目录 系统设计的一些原则海恩法则墨菲定律 软件架构中的高可用设计什么是高可用故障的度量与考核解决高可用问题具体方案 集群化部署负载均衡负载均衡实现内部服务外部服务数据库 负载均衡算法round-robinip_hashhash key 失败重试健康检查TCPHTTP 隔离线程隔离进程隔离集群…

华秋观察 | 通讯产品 PCB 面临的挑战,一文告诉你

印制电路板是电子产品的关键电子互联件&#xff0c;被誉为“电子产品之母”。随着电子产品相关技术应用更快发展、迭代、融合&#xff0c;PCB作为承载电子元器件并连接电路的桥梁&#xff0c;为满足电子信息领域的新技术、新应用的需求&#xff0c;行业将迎来巨大的挑战和发展机…

rocky9脚本py格式

在linux7上编写/root/CreateFile.py的python3脚本&#xff0c;创建20个文件/root/test/File01至/root/test/File20&#xff0c;如果文件存在&#xff0c;则先删除再创建&#xff1b;每个文件的内容同文件名&#xff0c;如File01文件的内容为”File01” 先在root目录下建立所需…

使用单片机遇到的几个问题及解决方案1

1.为什么我跟着视频学习的过程中&#xff0c;我没有找到“端口"的选项呢&#xff1f;我甚至没有出现“其他插口”。 想要找到设备管理器最快的方法就是&#xff1a; 首先如果把输入法调为大写形式&#xff0c;然后按下“WINX”&#xff0c;再按“M”就会出现一个设备管理…

python制作炸弹人游戏,一起来爆破消灭敌人吧

前言 嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 《炸弹人》是HUDSON出品的一款ACT类型游戏&#xff0c;经典的第一作登陆在FC版本&#xff0c;游戏于1983年发行。 游戏具体操作是一个机器人放置炸弹来炸死敌人&#xff0c;但也可以炸死自己&#xff0c;还有些增强威力…

K8S之服务Service(十三)

1、Service概念&#xff1a; Kubernetes中的 Pod是有生命周期的&#xff0c;它们可以被创建&#xff0c;也可以被销毁&#xff0c;然而一旦被销毁pod生命就永远结束&#xff0c;这个pod就不存在了&#xff0c;通过ReplicaSets能够动态地创建和销毁Pod&#xff08;例如&#xff…

【计算思维题】少儿编程 蓝桥杯青少组计算思维 数学逻辑思维真题详细解析第8套

少儿编程 蓝桥杯青少组计算思维题真题及解析第8套 1、下列哪个选项填到填到下图空缺处最合适 A、 B、 C、 D、 答案:D 考点分析:主要考查小朋友们的观察能力,从给定的图中可以看到,图中的线条都是有实现和虚

【C++ 学习 ⑨】- 万字详解 string 类(上)

目录 一、为什么学习 string 类&#xff1f; 二、标准库中的 string 类 三、C STL容器是什么&#xff1f; 四、string 类的成员函数 4.1 - 构造函数 4.2 - 赋值运算符重载 4.3 - 容量操作 4.4 - 遍历及访问操作 4.4.1 - operator[] 和 at 4.4.2 - 迭代器 4.5 - 修改…

Node.js 使用踩坑

重装电脑后&#xff0c;重装node.js 出现一个问题&#xff1a; npm install 会报错 按提示操作后 而npm run serve 会报xlsx和echart的错误&#xff0c;提示引用不对之类的&#xff0c;但是公司项目固定的&#xff0c;不可以随便改&#xff0c;而且之前是没问题的。 此时需要找…

华为OD机试真题B卷 Java 实现【数组拼接】,附详细解题思路

一、题目描述 现在有多组整数数组&#xff0c;需要将它们合并成一个新的数组。 合并规则&#xff0c;从每个数组里按顺序取出固定长度的内容合并到新的数组中&#xff0c;取完的内容会删除掉&#xff0c;如果该行不足固定长度或者已经为空&#xff0c;则直接取出剩余部分的内…