Hadoop3:MapReduce中Reduce阶段自定义OutputFormat逻辑

news2025/1/11 1:39:22

一、情景描述

我们知道,在MapTask阶段开始时,需要InputFormat来读取数据
而在ReduceTask阶段结束时,将处理完成的数据,输出到磁盘,此时就要用到OutputFormat

在之前的程序中,我们都没有设置过这部分配置
所以,采用的是默认输出格式:TextOutputFormat

在实际工作中,我们的输出不一定是到磁盘,可能是输出到MySQL、HBase

那么,如何实现自定义的OutputFormat
在这里插入图片描述

二、案例

1、源数据

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.atguigu.com
http://www.sohu.com
http://www.baidu.com
http://www.sina.com
http://www.sin2a.com
http://www.baidu.com
http://www.sin2desa.com
http://www.sindsafa.com

2、需求分析

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log

3、代码实现

LogMapper.java

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // http://www.baidu.com
        //http://www.google.com
        // (http://www.google.com, NullWritable)
        // 不做任何处理
        context.write(value, NullWritable.get());
    }
}

LogReducer.java

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

        // http://www.baidu.com
        // http://www.baidu.com
        // 防止有相同数据,丢数据
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}

LogRecordWriter.java

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class LogRecordWriter extends RecordWriter<Text, NullWritable> {

    private  FSDataOutputStream atguiguOut;
    private  FSDataOutputStream otherOut;

    public LogRecordWriter(TaskAttemptContext job) {
        // 创建两条流
        try {
            FileSystem fs = FileSystem.get(job.getConfiguration());

            atguiguOut = fs.create(new Path("D:\\hadoop\\atguigu.log"));

            otherOut = fs.create(new Path("D:\\hadoop\\other.log"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String log = key.toString();

        // 具体写
        if (log.contains("atguigu")){
            atguiguOut.writeBytes(log+"\n");
        }else {
            otherOut.writeBytes(log+"\n");
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        // 关流
        IOUtils.closeStream(atguiguOut);
        IOUtils.closeStream(otherOut);
    }
}

LogOutputFormat.java

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {

        LogRecordWriter lrw = new LogRecordWriter(job);

        return lrw;
    }
}

LogDriver.java

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        //设置自定义的outputformat
        job.setOutputFormatClass(LogOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputoutputformat"));
        //虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat
        //而fileoutputformat要输出一个_SUCCESS文件,所以在这还得指定一个输出目录
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output1111"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);

    }
}

3、测试

在这里插入图片描述
在这里插入图片描述

三、总结

关键文件:
LogRecordWriter.java
LogOutputFormat.java
LogDriver.java

        //设置自定义的outputformat
        job.setOutputFormatClass(LogOutputFormat.class);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1846743.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

高速公路声光预警定向广播助力安全出行

近年来&#xff0c;高速重大交通事故屡见不鲜&#xff0c;安全管控一直是高速运营的重中之重。如何利用现代化技术和信息化手段&#xff0c;创新、智能、高效的压降交通事故的发生概率&#xff0c;优化交通安全管控质量&#xff0c;是近年来交管部门的主要工作&#xff0c;也是…

在智星云租用算力时,如何选择适合的GPU?

智星云平台分配GPU、CPU、内存的机制为&#xff1a;按租用的GPU数量成比例分配CPU和内存&#xff0c;算力市场显示的CPU和内存均为每GPU分配的CPU和内存&#xff0c;如果租用两块GPU&#xff0c;那么CPU和内存就x2。此外GPU非共享&#xff0c;每个实例对GPU是独占的。 一. CPU…

A800显卡驱动安装(使用deb安装)

重新安装显卡驱动&#xff0c;查阅了资料将过程记录如下&#xff1a; 1.下载deb安装包 打开nvidia官网查找对应的驱动版本&#xff0c;A800所在的选项卡位置如图&#xff1a; 点击查找后下载得到的是nvidia-driver-local-repo-ubuntu2004-550.90.07_1.0-1_amd64.deb安装包 2.…

JMeter的基本概念

一、主流测试工具 1&#xff0c;Loadrunner HP Loadrunner是一种工业级标准性能测试负载工具&#xff0c;可以模拟上万用户实施测试&#xff0c;并在测试时可实时检测应用服务器及服务器硬件各种数据&#xff0c;来确认和查找存在的瓶颈 支持多协议:Web(HTTP/HTML)、Windows…

文件上传漏洞-上篇

一、概述 文件上传漏洞可以说是日常渗透测试中用得最多的一个漏洞&#xff0c;用它获得服务器权限最快最直接。在web程序中&#xff0c;经常需要用到文件上传的功能。如用户或者管理员上传图片&#xff0c;或者其它文件。如果没有限制上传类型或者限制不严格被绕过&#xff0c…

网络安全之Windows提权(上篇)(高级进阶)

目录 一&#xff0c;什么是提权&#xff1f; 二&#xff0c;提权的前提 三&#xff0c;如何提权&#xff1f; 1&#xff0c;第一步连接服务器 2&#xff0c;提升权限至iuser​编辑 3&#xff0c;利用补丁漏洞提权至最高级 四&#xff0c;总结 一&#xff0c;什么是提权&am…

php上传zip压缩包到服务器并解压,解析压缩包内excel表格数据导入到数据库

需求: 1.需要管理后台将excel表格中的每条单词数据导入到数据库中. 2.每条单词数据对应的图片和音频文件需要上传到服务器中. 为了让客户上传数据方便,考虑了一下决定通过后台上传压缩包的方式实现 测试压缩包: 压缩包的目录结构 管理后台导入教材 public function upload…

用了这么久的群晖NAS,它到底能干些什么?

从21年开始玩群晖也有几年了&#xff0c;除非面临断电或升级&#xff0c;这个小伙伴都任劳任怨的工作着 现在NAS也广泛应用于家庭和企业环境中了&#xff0c;今天盘点一下我用群晖NAS都干了些什么~ 1.文件存储与共享&#xff1a; 群晖NAS可以作为文件服务器&#xff0c;提供…

stable diffusion 模型和lora融合

炜哥的AI学习笔记——SuperMerger插件学习 - 哔哩哔哩接下来学习的插件名字叫做 SuperMerger,它的作用正如其名,可以融合大模型或者 LoRA,一般来说会结合之前的插件 LoRA Block Weight 使用,在调整完成 LoRA 模型的权重后使用改插件进行重新打包。除了 LoRA ,Checkpoint 也…

Redis-数据类型-Geospatial(地理空间索引)

文章目录 1、查看redis是否启动2、通过客户端连接redis3、切换到db5数据库4、将地理位置信息&#xff08;经度和纬度&#xff09;添加到 Redis 的键&#xff08;key&#xff09;中4.1、添加大江商厦4.2、添加西部硅谷 5、升序返回有序集key&#xff0c;让分数一起和值返回的结果…

Java宝藏实验资源库(3)类

一、实验目的 理解面向对象程序的基本概念。掌握类的继承的实现机制。熟悉类中成员的访问控制方法。熟悉ArrayList类的使用。 二、实验内容、过程及结果 *9.5Programming Exerc ise the GregorianCal endar class) Java API has the GregorianCalendar class in the java. uti…

Windows11 24H2网络功能全新升级:全面支持Wi-Fi 7!

Windows11 24H2版本不仅推出了很多新功能&#xff0c;也全面升级了网络功能&#xff0c;全面支持Wi-Fi 7&#xff0c;带给用户最快的网速体验&#xff0c;还支持用户通过二维码分享Wi-Fi密码&#xff0c;操作更加便捷&#xff0c;也更新了SMB、LAPS和NDR协议。接下来跟随小编去…

《C++ Primer》导学系列:第 7 章 - 类

7.1 定义抽象数据类型 7.1.1 类的基本概念 在C中&#xff0c;类是用户定义的类型&#xff0c;提供了一种将数据和操作这些数据的函数&#xff08;成员函数&#xff09;组合在一起的方法。类定义了对象的属性和行为&#xff0c;通过实例化类来创建对象。 7.1.2 定义类 定义类…

ru俄罗斯域名如何申请SSL证书?

我们日常看到的都是com这种国际域名比较普遍&#xff0c;尤其是主流网站&#xff0c;主要原因考虑的其通用性&#xff0c;那么对于地方性的域名大家很少看到&#xff0c;比如俄罗斯国家域名.ru大家还是有些陌生的&#xff0c;但要说中国.CN域名那你就很熟悉了。 有用户在申请过…

Anthropic AI模型Claude 3.5 Sonnet在Amazon Bedrock上正式可用

Claude 3.5 Sonnet是Anthropic最先进的Claude系列AI模型的新成员&#xff0c;比Claude 3 Opus更智能且价格只有其五分之一 北京——2024年6月21日 亚马逊云科技宣布&#xff0c;Anthropic最新、最强大的模型Claude 3.5 Sonnet现已在Amazon Bedrock上正式可用&#xff0c;该模型…

Syslog日志外发

Syslog是一种广泛应用于网络设备、操作系统和应用程序的日志通信协议&#xff0c;通过收集、监控和分析Syslog日志&#xff0c;企业可以有效维护网络安全、故障排除和运营管理。 除了内部监控&#xff0c;有时企业也需要将Syslog日志外发以实现更多的管理和合规需求。在实现Sy…

C#委托:事件驱动编程的基石

目录 了解委托 委托使用的基本步骤 声明委托(定义一个函数的原型&#xff1a;返回值 参数类型和个数&#xff09; 根据委托定义的函数原型编写需要的方法 创建委托对象&#xff0c;关联“具体方法” 通过委托调用方法&#xff0c;而不是直接使用方法 委托对象所关联的方…

YOLOv8改进 | FPN | 新型上采样算子CARAFE【全网独家】

&#x1f4a1;&#x1f4a1;&#x1f4a1;本专栏所有程序均经过测试&#xff0c;可成功执行&#x1f4a1;&#x1f4a1;&#x1f4a1; 专栏目录 &#xff1a;《YOLOv8改进有效涨点》专栏介绍 & 专栏目录 | 目前已有40篇内容&#xff0c;内含各种Head检测头、损失函数Loss、…

构建个人文件上传服务:Python Flask实现上传和下载完整指南

介绍 在本教程中&#xff0c;我们将学习如何使用Python Flask框架将文件上传到服务器&#xff0c;并使用SQLite数据库来跟踪上传的文件。我们将提供后端代码和一个示例项目的Git链接&#xff0c;以便您可以轻松地跟随本教程。 准备工作 首先&#xff0c;您需要安装Python和F…

揭秘无局放电源:定义、工作原理及其在工业设备中的重要性

当代社会对电源安全性&#xff0c;精密性要求越来也高&#xff0c;对电源设备的需求也越来越高。无局放电源作为电源设备中的一种&#xff0c;由于其独特的优点&#xff0c;越来越受到各行各业的关注。目前&#xff0c;无局放电源在全球范围内得到了广泛应用&#xff0c;尤其是…