MapReduce实战案例(2)

news2024/11/24 19:01:42

案例二: MR实战之数据分类输出(自定义outputFormat)

2.1 项目准备

  1. 需求

现有一些原始日志需要做增强解析处理,流程:

  • a) 从原始日志文件中读取数据
  • b) 根据日志中的一个URL字段到外部知识库中获取信息增强到原始日志
  • c) 如果成功增强,则输出到增强结果目录;如果增强失败,则抽取原始数据中URL字段输出到待爬清单目录
  1. 测试数据

  2. 分析

程序的关键点是要在一个MapReduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义outputformat来实现

2.2 项目实现

  1. 实现要点

    a) 在MapReduce中访问外部资源

    b) 自定义outputformat,改写其中的recordwriter,改写具体输出数据的方法write()

  2. 代码实现如下

    a) 数据库获取数据的工具

/**
 * @Author 千锋大数据教学团队
 * @Company 千锋好程序员大数据
 * @Description 
 */
public class DBLoader {

    public static void dbLoader(HashMap<String, String> ruleMap) {
        Connection conn = null;
        Statement st = null;
        ResultSet res = null;

        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://hdp-node01:3306/urlknowledge", "root", "root");
            st = conn.createStatement();
            res = st.executeQuery("select url,content from urlcontent");
            while (res.next()) {
                ruleMap.put(res.getString(1), res.getString(2));
            }
        } catch (Exception e) {
            e.printStackTrace();

        } finally {
            try{
                if(res!=null){
                    res.close();
                }
                if(st!=null){
                    st.close();
                }
                if(conn!=null){
                    conn.close();
                }

            }catch(Exception e){
                e.printStackTrace();
            }
        }
    }


    public static void main(String[] args) {
        DBLoader db = new DBLoader();
        HashMap<String, String> map = new HashMap<String,String>();
        db.dbLoader(map);
        System.out.println(map.size());
    }
}
复制代码

文末扫码领取福利! 

b) 自定义一个outputformat

/**
 * @Author 千锋大数据教学团队
 * @Company 千锋好程序员大数据
 * @Description 自定义一个outputformat 
 */
public class LogEnhancerOutputFormat extends FileOutputFormat<Text, NullWritable>{

    
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {


        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path enhancePath = new Path("hdfs://hdp-node01:9000/flow/enhancelog/enhanced.log");
        Path toCrawlPath = new Path("hdfs://hdp-node01:9000/flow/tocrawl/tocrawl.log");
        
        FSDataOutputStream enhanceOut = fs.create(enhancePath);
        FSDataOutputStream toCrawlOut = fs.create(toCrawlPath);
        
        
        return new MyRecordWriter(enhanceOut,toCrawlOut);
    }
    
    
    
    static class MyRecordWriter extends RecordWriter<Text, NullWritable>{
        
        FSDataOutputStream enhanceOut = null;
        FSDataOutputStream toCrawlOut = null;
        
        public MyRecordWriter(FSDataOutputStream enhanceOut, FSDataOutputStream toCrawlOut) {
            this.enhanceOut = enhanceOut;
            this.toCrawlOut = toCrawlOut;
        }

        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
             
            //有了数据,你来负责写到目的地  —— hdfs
            //判断,进来内容如果是带tocrawl的,就往待爬清单输出流中写 toCrawlOut
            if(key.toString().contains("tocrawl")){
                toCrawlOut.write(key.toString().getBytes());
            }else{
                enhanceOut.write(key.toString().getBytes());
            }
                
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
             
            if(toCrawlOut!=null){
                toCrawlOut.close();
            }
            if(enhanceOut!=null){
                enhanceOut.close();
            }
            
        }
        
        
    }
}
复制代码

c) 开发MapReduce处理流程

/**
 * @Author 千锋大数据教学团队
 * @Company 千锋好程序员大数据
 * @Description 这个程序是对每个小时不断产生的用户上网记录日志进行增强(将日志中的url所指向的网页内容分析结果信息追加到每一行原始日志后面)
 */
public class LogEnhancer {

    static class LogEnhancerMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        HashMap<String, String> knowledgeMap = new HashMap<String, String>();

        /**
         * maptask在初始化时会先调用setup方法一次 利用这个机制,将外部的知识库加载到maptask执行的机器内存中
         */
        @Override
        protected void setup(org.apache.hadoop.MapReduce.Mapper.Context context) throws IOException, InterruptedException {

            DBLoader.dbLoader(knowledgeMap);

        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();

            String[] fields = StringUtils.split(line, "\t");

            try {
                String url = fields[26];

                // 对这一行日志中的url去知识库中查找内容分析信息
                String content = knowledgeMap.get(url);

                // 根据内容信息匹配的结果,来构造两种输出结果
                String result = "";
                if (null == content) {
                    // 输往待爬清单的内容
                    result = url + "\t" + "tocrawl\n";
                } else {
                    // 输往增强日志的内容
                    result = line + "\t" + content + "\n";
                }

                context.write(new Text(result), NullWritable.get());
            } catch (Exception e) {

            }
        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        job.setJarByClass(LogEnhancer.class);

        job.setMapperClass(LogEnhancerMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 要将自定义的输出格式组件设置到job中
        job.setOutputFormatClass(LogEnhancerOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat
        // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
        System.exit(0);
    }
}

 

 可以观看视频:

千锋大数据Hadoop全新增强版-先导片

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/588143.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

提升日期处理效率:day.js 实战经验分享

theme: smartblue 本文简介 点赞 关注 收藏 学会了 本文主要介绍我在工作中使用 day.js 较多的方法。本文并不能代替 day.js 官方文档&#xff0c;日常工作中该查文档的还是要查文档。本文是写给刚接触 day.js 的工友&#xff0c;让这部分工友能更顺利上手 day.js。本文不涉…

SMESwin Unet:融合CNN和Transformer进行医学图像分割

文章目录 SMESwin Unet: Merging CNN and Transformer for Medical Image Segmentation摘要本文方法SuperpixelMCCT SMESwin Unet: Merging CNN and Transformer for Medical Image Segmentation 摘要 视觉Transformer 是自去年以来医学图像分割领域最受欢迎的新范式&#xf…

ts报错“this“ 隐式具有类型 “any“,因为它没有类型注释。解决方案

序 1、参考博文》①严格模式 - 知乎&#xff0c;②ts的tsconfig.son中文说明③TypeScript Number | 菜鸟教程 2、解决&#xff08;ts报错“this“ 隐式具有类型 “any“&#xff0c;因为它没有类型注释。&#xff09; 3、解决&#xff08;函数内this是undefined 的问题&#xf…

汽车EDI:如何与Stellantis建立EDI连接?

Stellantis 是一家实力雄厚的汽车制造公司&#xff0c;由法国标致雪铁龙集团&#xff08;PSA集团&#xff09;和意大利菲亚特克莱斯勒汽车集团&#xff08;FCA集团&#xff09;合并而成&#xff0c;是世界上第四大汽车制造商&#xff0c;拥有包括标致、雪铁龙、菲亚特、克莱斯勒…

Hive学习---1、Hive入门、Hive 安装

1、Hive入门 1.1 什么是Hive 1、Hive简介 Hive是由Facebook开源&#xff0c;基于Hadoop的一个数据仓库工具&#xff0c;可以将结构化的数据文件映射为一张表&#xff0c;并提供类SQL查询功能。 2、Hive本质 Hive是一个Hadoop客户端&#xff0c;用于将HQL&#xff08;Hive SQL…

【六一为孩子建模吧】沐风老师3DMAX建模雕刻插件SculptTool使用教程

3DMAX建模雕刻插件&#xff0c;该工具旨在使对角色和地形等有机模型进行小型编辑成为可能&#xff0c;而无需离开3dMax并启动如ZBrush等专用雕刻应用程序&#xff0c;就可以在3DMAX中直接对小型模型进行简单的雕刻建模处理&#xff0c;这样会方便很多。 【适用版本】 3dMax202…

【C++初阶】:string类

string 一string的基本用法二.迭代器1.基本使用2.语法糖3.反向迭代器4.const迭代器 三.容量四.插入和删除五.一个例题&#xff1a;解析协议六.读取空格七.一些其他函数 一string的基本用法 文档里 一般使用 二.迭代器 1.基本使用 string里重载了一种非常厉害的运算符[ ] 这个运…

【web安全】文件包含漏洞

目录 1.什么是文件包含漏洞 2.产生原因 3.文件包含的类型 3.1本地文件包含 3.2远程文件包含 4.攻击利用手法 4.1 file:协议 4.2 php://协议 ​4.3 zip://,bzip2://,zlib://协议 4.4 data://协议 4.5 PHP伪协议总结 5.如何防御&#xff1f; 6.常见系统的默认路径…

Tcl-11. 列表操作

Tcl 中的列表操作&#xff1a; 列表则是具有特殊解释的字符串。Tcl 中的列表操作和其它 Tcl 命令 一样具有相同的结构。 列表可应1用在诸如 foreach 这样的以列表为变元的循环命令中&#xff0c;也应于构建 eval 命令的延迟命令字符串。 一、list 命令 list 命令用来创建列表…

城市内涝的解决措施,城市内涝积水监测预警系统解决方案

随着城市化进程的加快&#xff0c;城市土地面积不断扩大&#xff0c;原本吸收雨水的土地被水泥、沥青等硬质材料所取代&#xff0c;导致雨水无法迅速渗透和排泄&#xff0c;增加了城市内涝的风险。同时&#xff0c;气候变化带来的极端降雨事件频率增加&#xff0c;更加加剧了内…

mysql触发器监听数据投递中间件

目前市面上有许多的 CDC&#xff08;Change Data Capture&#xff09; 框架用于监听数据库的数据变动&#xff0c;例如&#xff1a;canal、Debezium、Maxwell等都是用来解析 binlog 日志实现事件的监听。但是有一个情况就是如果公司对 binlog 日志文件的权限管控的很严格&#…

传统ERP和SaaS ERP区别在哪里?

一、ERP和SaaS ERP概念 企业资源计划 (ERP) 系统是许多企业的主干&#xff0c;助力管理整个企业内的会计、采购流程、项目等。对于许多 IT 部门而言&#xff0c;ERP 系统通常意味着大型、昂贵且耗时的部署&#xff0c;并可能需要进行大量硬件或基础设施投资。然而&#xff0c;…

基于标准库函数的STM32的freertos的移植(三)——MDK工程搭建、配置与修改

1.打开MDK5软件&#xff0c;新建MDK工程&#xff0c;将新建工程文件保存在Project_Stm32f407/mdk文件夹下&#xff0c;并将工程命名为freertos_M4&#xff0c;选择MCU型号为STM32F407ZG&#xff0c;新建工程文件的步骤如下图所示&#xff1a; 图1 新建工程 图2 保存工程路径和工…

ChatGPT-AI地图

ChatGPT-AI地图 1、AI-对话 应用名称应用地址ChatGPThttps://chat.openai.com/NotionAINotion AIA.I. Data Sidekick&#xff1a;AI工具编写 SQL、文档等的速度提高10倍[AirOpsWritesonic&#xff1a;人工智能写作辅助工具Writesonic - Best AI Writer, Copywriting & Par…

《深入理解计算机系统(CSAPP)》第3章 程序的机器级表示 - 学习笔记

写在前面的话&#xff1a;此系列文章为笔者学习CSAPP时的个人笔记&#xff0c;分享出来与大家学习交流&#xff0c;目录大体与《深入理解计算机系统》书本一致。因是初次预习时写的笔记&#xff0c;在复习回看时发现部分内容存在一些小问题&#xff0c;因时间紧张来不及再次整理…

MySQL 数值函数

文章目录 数值函数1. abs(num)2. ceil(num)3. floor(num)4. mod(num1,num2)5. rand()6. round(num,n)7. truncate(num,n)8. sqrt(num) 数值函数 数值函数用来处理数值方面的运算&#xff0c;能够提高用户的工作效率。常用的数值函数如下表所示&#xff0c;函数括号内为输入的参…

牛客网基础语法11~20题

前言&#xff1a;今天是咱们第二期刷牛客网上的题目。 目标&#xff1a;对输入输出的格式&#xff0c;方法&#xff0c;类型掌握熟练&#xff0c;对double&#xff0c;float理解深入&#xff0c;编程思想更进一步。 鸡汤&#xff1a;人活着&#xff0c;再苦再累&#xff0c;都别…

16-Vue技术栈之常用的 Composition API

目录 1、什么是组合式 API&#xff1f;2、拉开序幕的setup3、ref函数4、reactive函数5、Vue3.0中的响应式原理5.1 vue2.x的响应式5.2 Vue3.0的响应式 6、reactive对比ref7、setup的两个注意点8、计算属性与监视8.1 computed函数8.2 watch函数8.3 watchEffect函数 9、 生命周期1…

防雷接地的施工工艺与防雷施工方案

雷电是自然界的一种强大而危险的自然现象&#xff0c;经常造成重大财产损失和人员伤亡。为了保护建筑物和人员免受雷电的危害&#xff0c;防雷接地系统的设计和施工至关重要。本文将介绍防雷接地的施工工艺和防雷施工方案&#xff0c;强调专业和符合国家标准的方法&#xff0c;…

chatgpt赋能python:Python中乘方运算符号:用于数学计算和科学计算

Python中乘方运算符号&#xff1a;用于数学计算和科学计算 简介 乘方运算是Python中常用的数学运算符&#xff0c;通常在数学计算和科学计算中使用。在Python中&#xff0c;乘方运算符用**表示。该运算符用于计算数字的指数幂。 用法 乘方运算符可以用于两个数字之间的计算…