MapReduce【自定义InputFormat】

news2025/1/9 1:07:16

MapReduce在处理小文件时效率很低,但面对大量的小文件又不可避免,这个时候就需要相应的解决方案。

默认的输入格式为TextInputFormat,对于小文件,它是按照它的父类FileInputFormat的切片机制来切片的,也就是不管一个文件多小,独占一片!对于之前的wordcount案例来说,输入目录下一共有4个文件,这将开启4个reduceTask去执行!!!

我们可以自定义InputFormat来实现小文件的合并:

需求

我们需要将三个文本文件合并为一个序列化文件

 输入

三个文本文件

输出

一个二进制序列化文件

1.自定义Inputformat类

需要实现两个方法

  1. isSplitable():是否可以切片,我们修改返回值为false不可切割。
  2. createRecordReader:返回我们自定义的RecordReader对象。
public class MyInputFormat extends FileInputFormat<Text, BytesWritable> {

    //设置文件不可切片,使得一个文件最多作为1片
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    //设置读取文件的格式为自定义格式
    @Override
    public RecordReader<Text,BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new MyRecordReader();
    }
}

2. 自定义RecordReader类

我们主要修改两个地方:key和value。

因为我们Mapper类中的map方法需要有四个参数,其中的KEY_IN和VALUE_IN都是由我们的RecordReader类来设置的,这里我们需要设置一下。

  1. 默认的RecordReader类的key为LongWritable类型,也就是一行数据对应的字节偏移量,这里我们设置key为我们的文件名,也就是Text类型。
  2. 默认的RecordReader类的value为Text类型,也就是一行文本,这里我们设置value为文件名key对应的文件的二进制序列,也就是BytesWritable类型。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class MyRecordReader extends RecordReader<Text,BytesWritable> {
    private Text key;
    private BytesWritable value;
    private String filename;
    private int length;
    private FileSystem fs;
    private Path path;
    private FSDataInputStream is;
    private boolean flag=true;
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

        FileSplit fileSplit = (FileSplit) inputSplit;

        filename = fileSplit.getPath().getName();

        length = (int) fileSplit.getLength();

        path = fileSplit.getPath();

        //获取当前Job的配置对象
        Configuration conf = taskAttemptContext.getConfiguration();
        //获取当前Job使用的文件系统
        fs = FileSystem.get(conf);
        is = fs.open(path);
    }


    //文件的名称做为 key - 文件的内容分装为BytesWritable类型的 value, 返回true
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        //第一次调用nextKeyValue方法
        if (flag){
            //实例化对象
            if (key==null){
                key = new Text();
            }
            if (value==null){
                value = new BytesWritable();
            }

            //赋值
            //将文件名封装到key中
            key.set(filename);
            //将文件的内容读取封装到value中
            byte[] content = new byte[ length];
            IOUtils.readFully(is,content,0,length);
            value.set(content,0,length);
            flag = false;
            return true;
        }
        //第二次调用直接执行 return false
        return false;
    }

    //返回当前读取到的key
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    //返回当前读取到的value
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    //返回读取切片的进度
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    //关闭资源
    @Override
    public void close() throws IOException {
        if (is != null){
            IOUtils.closeStream(is);
        }
        if (fs != null){
            fs.close();
        }
    }
}

3. Mapper类

在我们的自定义RecordReader类中,我们已经设置输入的key为文件名,value设置为文件的二进制序列,所以这里直接将key和value写出即可,key的类型为Text,value的类型为BytesWritable。

public class SequenceFileMapper extends Mapper<Text, BytesWritable,Text,BytesWritable> {
    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(key,value);
    }
}

4.Reducer类

public class SequenceFileReducer extends Reducer<Text, BytesWritable,Text,Text> {
    private Text OUT_VALUE = new Text();
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {

        String value = values.toString();
        OUT_VALUE.set(value);
        context.write(key,OUT_VALUE);

    }
}

5.启动类

import com.lyh.mapreduce.MaxTemp.MaxTempMapper;
import com.lyh.mapreduce.MaxTemp.MaxTempReducer;
import com.lyh.mapreduce.MaxTemp.MaxTempRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MySequenceFileRunner extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(),new MySequenceFileRunner(),args);
    }

    @Override
    public int run(String[] args) throws Exception {
        //1.获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "my sequence file demo");

        //2.配置jar包路径
        job.setJarByClass(MySequenceFileRunner.class);

        //3.关联mapper和reducer
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

        //4.设置map、reduce输出的k、v类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        //设置切片机制为我们自定义的切片机制
        job.setInputFormatClass(MyInputFormat.class);

        //5.设置统计文件输入的路径,将命令行的第一个参数作为输入文件的路径
        FileInputFormat.setInputPaths(job,new Path("D:\\MapReduce_Data_Test\\myinputformat\\input"));
        //6.设置结果数据存放路径,将命令行的第二个参数作为数据的输出路径
        FileOutputFormat.setOutputPath(job,new Path("D:\\MapReduce_Data_Test\\myinputformat\\output1"));
        return job.waitForCompletion(true) ? 0 : 1;//verbose:是否监控并打印job的信息
    }
}

执行结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/541969.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java基础学习(17)网络编程

Java基础学习 一、 网络编程1.1 什么是网络编程1.2 常见的软件架构&#xff1a;1.3 网络编程的三要素1.4 IP1.4.1 InetAddress用法 1.5 端口号1.6 协议1.6.1 UDP协议1.6.1.1 UDP的三种通信方式 1.6.2 TCP协议1.6.2.1 TCP底层原理 一、 网络编程 1.1 什么是网络编程 解释&…

C++常用的支持中文的GUI库Qt 6之一:下载、安装与使用

C常用的支持中文的GUI库Qt 6之一&#xff1a;下载、安装与使用 因为Qt发展变化较快&#xff0c;网上许多介绍Qt的下载、安装与使用已过时&#xff0c;初学者常因行不通而受挫&#xff0c;故此发布本文&#xff0c;以Qt 6.2.4开源版在Windows 10安装与使用为例介绍。 C好用的GU…

DM8:达梦数据库宕机版本记录汇总(持续更新中)

DM8:达梦数据库宕机版本记录汇总&#xff08;持续更新中&#xff09; 环境介绍1 SQL引起的数据库故障1.1 SQL引起的数据库故障汇总表1.2 gdb dmserver core 调试得到数据库崩溃时的内存信息1.3 优化参数解决故障 环境介绍 在测试环境或生产环境&#xff0c;特别情况下会遇到数…

go tool pprof 参数 ‘-base‘ 和 ‘-diff_base‘ 之间的区别

go tool pprof 工具是用于分析由 runtime/pprof包 或 net/http/pprof包产生的profile数据&#xff0c;完整的帮助文档在 https://github.com/google/pprof/blob/main/doc/README.md &#xff0c;pprof 工具支持的参数很多&#xff0c;可以用命令 go tool pprof --help来查看全部…

DJ5-4 交换局域网(第一节课)

目录 一、局域网概述 1、LAN 的特点和分类 2、常见的网络拓扑结构 二、计算机与局域网的连接 三、局域网体系结构 四、链路层寻址地址 1、MAC 地址分配 2、MAC 地址识别 五、ARP 地址解析协议 1、ARP 地址解析协议 2、ARP&#xff1a;两个主机位于同一个局域网 3、…

网络计算模式复习(六)

什么是CDN CDN的全称是Content Delivery Network&#xff0c;即内容分发网络。 其目的通过在现有的Internet中增加一层新的网络架构&#xff0c;将网站的内容发布到最接近用户的网络“边缘”&#xff0c;使用户可以就近取得所需的内容&#xff0c;解决Internet网络拥挤的状况&…

socket套接字通信 TCP传输控制协议/IP网络协议 5.18

B/S :浏览器和服务器 C/S :客户机和服务器 网络的体系结构&#xff1a; 网络的层次结构和每层所使用协议的集合 网络采用分层管理的方法&#xff0c;将网络的功能划分为不同的模块 OSI模型&#xff1a; 共7种&#xff1a; 应用层&#xff1a;接收用户的数据&#xff0c;面…

Blender基础技巧小结(三)

本文续签一篇&#xff1a;Blender基础技巧小结&#xff08;二&#xff09;_皮尔斯巴巴罗的博客-CSDN博客 将物体显示为模板&#xff0c;类似Maya Template 四窗口视图 调整3d视图远截面 Blender并不直接支持放样&#xff0c;可以用第三方插件&#xff0c;但效果并不好 基于me…

usb摄像头驱动打印信息

usb摄像头驱动打印信息 文章目录 usb摄像头驱动打印信息 在ubuntu中接入罗技c920摄像头打印的信息如下&#xff1a; [ 100.873222] usb 3-2: new high-speed USB device number 5 using xhci_hcd [ 101.230728] usb 3-2: New USB device found, idVendor046d, idProduct08e5 …

SpringMVC的拦截器(Interceptor)

文章目录 1 拦截器概念2 拦截器入门案例2.1 环境准备2.2 拦截器开发步骤1:创建拦截器类步骤2:配置拦截器类步骤3:SpringMVC添加SpringMvcSupport包扫描步骤4:运行程序测试步骤5:修改拦截器拦截规则步骤6:简化SpringMvcSupport的编写 3 拦截器参数3.1 前置处理方法3.2 后置处理方…

使用frp进行内网穿透(远程连接内网服务器)

文章目录 云服务购买服务器端&#xff08;即我们购买的服务器&#xff09;配置客户端&#xff08;即我们自己的服务器&#xff09;配置使用xshell登录远程服务器在服务器端设置frp开机自启动在客户端设置frp开机自启动 这里主要介绍使用frp工具进行内网穿透&#xff0c;适合的场…

Spring6和SpringBoot3的新特性-你不得不了解的AOT原来这么简单

Spring6.0新特性 一、Spring的发展历史 二、AOT AOT是Spring6.0提供的一个新特性&#xff0c;Ahead of Time 提前编译。 1.AOT概述 1.1 JIT和AOT的关系 1.1.1 JIT JIT(Just-in-time) 动态编译&#xff0c;即时编译&#xff0c;也就是边运行边编译&#xff0c;也就是在程序运…

LangChain与大型语言模型(LLMs)应用基础教程:角色定义

如果您还没有看过我之前写的两篇博客&#xff0c;请先看一下&#xff0c;这样有助于对本文的理解&#xff1a; LangChain与大型语言模型(LLMs)应用基础教程:Prompt模板 LangChain与大型语言模型(LLMs)应用基础教程:信息抽取 LangChain是大型语言模型(LLM)的应用框架,LangCha…

RK3568平台开发系列讲解(LCD篇)快速了解RK LCD的使用

🚀返回专栏总目录 文章目录 一、内核Config配置二、MIPI配置2.1 引脚配置2.2 背光配置2.3 显示时序配置2.3.1 Power on/off sequence2.3.2 Display-Timings三、EDP配置3.1 引脚配置3.2 EDP背光配置沉淀、分享、成长,让自己和他人都能有所收获!😄 📢 本篇章带大家快速了…

数据结构_查找

目录 1. 查找的基本概念 2. 顺序查找和折半查找 2.1 顺序查找 2.1.1 一般线性表的顺序查找 2.1.2 有序表的顺序查找 2.2 折半查找 2.3 分块查找 2.4 相关练习 3. 树型查找 3.1 二叉排序树 3.1.1 二叉排序树的定义 3.1.2 二叉排序树的查找 3.1.3 二叉排序树…

想要一个本地部署的海洋实景三维展示系统吗?

最近几年实景三维非常火&#xff0c;很多人包括博主都想将自己平时干的海洋测绘项目进行实景三维化&#xff0c;这样做的好处就是无论是管理数据还是成果展示都非常方便。我们可能会使用谷歌地图、奥维地图、图新地球等地图服务软件&#xff0c;它们也提供了一些测量、画图功和…

使用Hexo在Github上搭建个人博客

使用Hexo在Github上搭建个人博客 1. 安装Node和git2. 安装Hexo3. Git与Github的准备工作4. 将Hexo部署到Github5. 开始写作 1. 安装Node和git 在Mac上安装Node.js可以使用Homebrew&#xff0c;使用以下命令安装&#xff1a; brew install node使用以下命令安装Git&#xff1a; …

解决 Uncaught TypeError: SpriteCanvasMaterial is not a constructor.

文章目录 前言一、替代语法总结 前言 上周买了本《Three.js开发指南》, 第三版, 里面的语法不太跟趟, 有点旧, 倒也不能全怪作者, three迭代的确很快. 一、替代语法 这几天没事做, 加上前面本来就接触过Three, 很快进展到了第六章. 在推进 利用Canvas贴图给精灵(Sprite)增加样…

研发工程师玩转Kubernetes——启用microk8s的监控面板(dashboard)

安装插件 microk8s enable dashboard 查看dashboard 地址 由于dashboard是在kube-system的namespace中&#xff0c;我们可以使用下面指令查看它服务的地址。 microk8s kubectl get service -n kube-system kubernetes-dashboard 可以得到地址是https://10.152.183.167。 登…

Android Jetpack-Databinding基本使用

文章目录 让你的项目支持Databinding基本使用布局和绑定表达式常用运算符判空null获取对象属性避免空指针异常其他控件引用资源引用 事件处理import,variables,and includesimportVariablesIncludes 数据更新->UI更新监听字段变化更新监听对象变化更新 UI更新->数据更新绑…