实验三 MapReduce编程

news2025/1/9 19:42:49

实验目的:

1.掌握MapReduce的基本编程流程;

2.掌握MapReduce序列化的使用;

实验内容:

一、在本地创建名为MapReduceTest的Maven工程,在pom.xml中引入相关依赖包,配置log4j.properties文件,搭建windwos开发环境。 编程实现以下内容:

(1)创建com.nefu.(xingming).maxcount包,编写wordcountMapper、Reducer、Driver三个类,实现统计每个学号的最高消费。

输入数据data.txt格式如下:

          序号 \t 学号 \t  日期  \t  消费总额

输出数据格式要求如下:

          学号  \t  最高消费 

ZnMapper.java

package com.nefu.zhangna.maxcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class ZnMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    private Text outk=new Text();
    private IntWritable outv=new IntWritable();
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line=value.toString();
        String[] content=line.split("\t");
        String schoolnumber=content[1];
        String totalFee=content[3];
        outk.set(schoolnumber);
        outv.set(Integer.parseInt(totalFee));
        context.write(outk,outv);
    }
}

ZnReducer.java

package com.nefu.zhangna.maxcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


import java.io.IOException;

public class ZnReducer extends Reducer<Text,IntWritable,Text, IntWritable> {
    private IntWritable outv=new IntWritable();
    @Override
    protected void  reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
        int total=0;
        for (IntWritable value:values){
            if(value.get()>total)
            total=value.get();
        }
        outv.set(total);
        context.write(key,outv);
    }
}

ZnDriver.java

package com.nefu.zhangna.maxcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;


public class ZnDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration configuration=new Configuration();
        Job job=Job.getInstance(configuration);
        //FileSystem fs=FileSystem.get(new URI("hdfs://hadoop101:8020"),configuration,"hadoop");
        //fs.copyFromLocalFile(new Path("D://mapreducetest//data.txt"),new Path("/zn/data.txt"));
        job.setJarByClass(ZnDriver.class);
        job.setMapperClass(ZnMapper.class);
        job.setReducerClass(ZnReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //job.setOutputKeyClass(Text.class);
        //job.setOutputValueClass(StudentBean.class);
       // job.setInputFormatClass(CombineTextInputFormat.class);   //否则默认是TextInputFormat.class
        //CombineTextInputFormat.setMaxInputSplitSize(job,4194304);   //设4M
        FileInputFormat.setInputPaths(job,new Path("D:\\mapreducetest\\data.txt"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\cluster\\shiyan3-1"));
        boolean result=job.waitForCompletion(true);
        System.exit(result?0:1);
    }
}

(2)测试上述程序,查看运行结果

原数据

mapreduce之后

(3)查看日志,共有几个切片,几个MapTask(截图)

Number of split表示有一个切片,Starting task: attempt_local649325949_0001_m_000000_0表示有一个Map Tast任务

(4)添加文件data1.txt,重新运行程序,共有几个切片,几个MapTask(截图)

可见我输入了两个文件,切片的数目为2,也就有两个Map Text任务

(5)使用CombinTextInputFormat,让data.txt,data1.txt两个文件在一个切片中

在驱动类中CombinTextInputFormat可见只有一个切片

(6)将data.txt上传至HDFS

(7)使用maven将程序打成jar包并上传至hadoop集群运行,观察是否能正确运行。

 用 maven jar 包,需要添加的打包插件依赖, pom.xml
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

将程序打成jar

上传 jar 包到 hadoop101 中的 /opt/module/hadoop-3.1.3/testcode 目录
确保 hadoop 集群已经正常启动,运行 jar 文件
java运行环境有问题

二、创建com.nefu.(xingming).serialize包,编写ScoreBean、Mapper、Reducer、Driver三个类,实现统计每个学号的平均成绩。并将结果按照年级分别写到三个文件中。

输入数据mydata.txt文件格式:

学号  \t  姓名  \t   成绩

输出数据格式(共3个文件):

学号   \t  姓名  \t   平均成绩

MyPartition

package com.nefu.zhangna.serialize;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;


public class MyPartition extends Partitioner<Text,ScoreBean > {
    @Override
    public int getPartition(Text text,ScoreBean studentBean,int numPartitions) {
        String snum = text.toString();
        int partition;
        if (snum.contains("2021")) {
            partition = 0;
        } else if (snum.contains("2022")) {
            partition = 1;
        } else{
            partition=2;
        }
        return partition;
    }
}

Scorebean

package com.nefu.zhangna.serialize;

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public  class ScoreBean implements Writable{
    private String name;
    private Double score;
    public ScoreBean(){
    }
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Double getScore() {
        return score;
    }

    public void setScore(Double score) {
        this.score = score;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeDouble(score);
    }
    @Override
    public  void readFields(DataInput in) throws IOException {
        this.name=in.readUTF();
        this.score=in.readDouble();
    }
    @Override
    public String toString(){
        return this.name+"\t"+this.score;
    }
}

ZnMapper1

package com.nefu.zhangna.serialize;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ZnMapper1 extends Mapper<LongWritable, Text, Text,ScoreBean> {
    private Text outk=new Text();
    private ScoreBean outv=new ScoreBean();
    @Override
    protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
        String line=value.toString();
        String[] content=line.split("\t");
        String  schoolnumber=content[0];
        String name=content[1];
        String score=content[2];
        outk.set(schoolnumber);
        outv.setName(name);
        outv.setScore(Double.parseDouble(score));
        context.write(outk,outv);
    }
}

ZnReducer1

package com.nefu.zhangna.serialize;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ZnReducer1 extends Reducer<Text, ScoreBean,Text,ScoreBean> {
    private ScoreBean outv=new ScoreBean();
    @Override
    protected void reduce(Text key,Iterable<ScoreBean> values,Context context) throws IOException, InterruptedException {
        double score=0;
        int sum=0;
        String name = null;
        for (ScoreBean value:values){
            sum=sum+1;
            score=score+value.getScore();
            name=value.getName();
        }
        outv.setName(name);
        outv.setScore(score/sum);
        context.write(key,outv);
    }
}

ZnDriver1

package com.nefu.zhangna.serialize;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


import java.io.IOException;

public class ZnDriver1 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration=new Configuration();
        Job job=Job.getInstance(configuration);
        job.setJarByClass(ZnDriver1.class);
        job.setMapperClass(ZnMapper1.class);
        job.setReducerClass(ZnReducer1.class);
        job.setMapOutputKeyClass(Text.class);
        job.setOutputValueClass(ScoreBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(ScoreBean.class);
        job.setPartitionerClass(MyPartition.class);
        job.setNumReduceTasks(3);
        FileInputFormat.setInputPaths(job,new Path("D:\\mapreducetest\\mydata.txt"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\cluster\\serialize"));
        boolean result=job.waitForCompletion(true);
        System.exit(result?0:1);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1323354.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

个人老师可直接使用的在线授课软件

大家好&#xff0c;我是 Java陈序员。 大学四年&#xff0c;疫情就占了三年&#xff01; 以前小时候曾经梦想着不用去学校上课&#xff0c;在家就能上课&#xff0c;这不前几年疫情的时候就成为了现实&#xff01; 随着互联网的兴起&#xff0c;各种线下的活动都可以搬到线上…

sql_lab靶场搭建以及存在的一些问题

sql_lab靶场搭建问题 首先检查小皮版本 把小皮改到5.3.29版本如果没有可以直接点击更多版本进行选择安装 当版本不对时则会暴出这种错误 SETTING UP THE DATABASE SCHEMA AND POPULATING DATA IN TABLES: Fatal error: Uncaught Error: Call to undefined function mysql_co…

SSH的交互原理(wireshark的分析)

SSH的交换原理&#xff08;wireshark篇&#xff09; 首先要想了解ssh的交换原理&#xff0c;必须要先了解他的加密方式&#xff0c;他的加密方式是对称加密&#xff0c;和公钥加密。什么意思呢&#xff1f; 首先我们向服务器发送一个请求&#xff0c;然后服务器会发给我们他的…

5个免费、跨平台的SQLite数据库可视化工具

前言 SQLite是一个轻量级的嵌入式关系型数据库&#xff0c;目前最新的版本是 SQLite3。今天推荐5个实用的SQLite数据库可视化工具(GUI)&#xff0c;帮助大家更好的管理SQLite数据库。 什么是SQLite&#xff1f; SQLite是一个轻量级的嵌入式关系型数据库&#xff0c;它以一个…

3d游戏公司选择云电脑进行云办公有哪些优势

随着游戏行业的不断发展&#xff0c;很多的游戏制作公司也遇到了很多的难题&#xff0c;比如硬件更换成本高、团队协同难以及效率低下等问题&#xff0c;那么如何解决游戏行业面临的这些行业痛点&#xff0c;以及游戏制作公司选择云电脑进行云办公有哪些优势&#xff1f;一起来…

Word的兼容性问题很常见,禁用兼容模式虽步不是最有效的,但可以解决兼容性问题

当你在较新版本的Word应用程序中打开用较旧版本的Word创建的文档时&#xff0c;会出现兼容性问题。错误通常发生在文件名附近&#xff08;兼容模式&#xff09;。兼容性模式问题&#xff08;暂时&#xff09;禁用Word功能&#xff0c;从而限制使用较新版本Word的用户编辑文档。…

查看git的帮助信息

说明 在cmd窗口、或者git Bash shell下执行git --help或者git -h命令&#xff0c;可以查看git的帮助信息。 执行git <command> --help命令可以查看某个命令的帮助信息&#xff0c;其中<command>表示某个具体的命令。 示例1&#xff1a;在git Bash shell下运行git…

JavaOOP篇----第六篇

系列文章目录 文章目录 系列文章目录前言一、String 是最基本的数据类型吗?二、float f=3.4;是否正确?三、short s1 = 1; s1 = s1 + 1;有错吗?short s1 = 1; s1 +=1; 有错吗?四、重载和重写的区别前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住…

嵌入式Linux学习(3)——中断(Interrupt)子系统概念

目录 一. 中断概念与分类 1.1 中断分类 1.2 中断事件的处理流程 1.3 中断号(IRQ number) 1.4 中断源(Interrupt Source) 1.5 中断触发方式 二. 中断子系统架构 2.1 GIC 2.2 中断子系统架构 2.3 GIC与IP 2.3.1 典型GIC IP PLC390 GIC 400 GIC 500 REF 一. 中断概念与…

Git初始

一)git的介绍: 1)假设现在有一个文档&#xff0c;你的老板要求你针对于这份文件进行修改&#xff0c;进行完成的修改的版本是版本1&#xff0c;接下来是文档2&#xff0c;修改完文档2以后&#xff0c;接下来老板还不同意&#xff0c;于是又有了文档三&#xff0c;文档四&#x…

海康rtsp拉流,rtmp推流,nginx部署转flv集成

海康rtsp拉流&#xff0c;rtmp推流&#xff0c;nginx部署转flv集成 项目实际使用并测试经正式使用无问题&#xff0c;有问题欢迎评论留言 核心后台java代码&#xff1a; try {// FFmpeg命令String command "ffmpeg -re -i my_video.mp4 -c copy -f flv rtmp://localho…

android11-开机自启脚本

1. 编写myshell脚本 diff --git a/device/rockchip/rk356x/ok3568_r/myshell.sh b/device/rockchip/rk356x/ok3568_r/myshell.sh new file mode 100644 index 0000000000..c78b6d93bd --- /dev/nullb/device/rockchip/rk356x/ok3568_r/myshell.sh-0,0 1,4 #!/vendor/bin/shec…

c# winform chart 单个柱形设置

目前实现到第三张图形,有可以实现四张图形的请大佬帮助。 实现到第三张图的设置如下 private void Form1_Load(object sender, EventArgs e) {// 隐藏标题//chart1.Titles.Clear();// 隐藏图例chart1.Legends.Clear();// 隐藏 Y 轴的网格线和标签chart1.ChartAreas[0].AxisY.…

论文笔记:Bilinear Attention Networks

更精简的论文学习笔记 1、摘要 多模态学习中的注意力网络提供了一种选择性地利用给定视觉信息的有效方法。然而&#xff0c;学习每一对多模态输入通道的注意力分布的计算成本是非常昂贵的。为了解决这个问题&#xff0c;共同注意力为每个模态建立了两个独立的注意分布&#x…

解决驱动模块Licence信息提示

一. 简介 上一篇文章挂载 驱动模块时&#xff0c;提示 驱动模块 Licence相关的信息。文章地址如下&#xff1a; 字符设备驱动的加载与卸载-CSDN博客 本文旨在 解决 "挂载字符驱动模块时&#xff0c;提示 Licence信息的提示" 这个问题。 二. 解决驱动模块Licence信…

[Toolschain cpp ros cmakelist python vscode] 记录写每次项目重复的设置和配置 不断更新

写在前面 用以前的设置&#xff0c;快速配置项目&#xff0c;以防长久不用忘记&#xff0c;部分资料在资源文件里还没有整理 outline cmakelist 复用vscode 找到头文件vscode debug现有代码直接关联远端gitros杂记repo 杂记glog杂记 cmakelist 复用 包含了根据系统路径找库…

0062-Java运算符

文章目录 1.运算符介绍2.算术运算符2.1 介绍2.2 细节说明 3.关系运算符(比较运算符)3.1 介绍3.2 细节说明 4.逻辑运算符4.1 介绍4.2 逻辑运算规则4.3 && 和 & 基本规则4.4 && 和 & 使用区别4.5 || 和 | 基本规则4.6 || 和 | 使用区别 5. ! 取反 基本规…

Jmeter实现性能测试--高并发

高并发场景 高并发场景是指系统在相对短时间内面对大量用户同时访问的情况。这种场景常见于在线服务、电商平台、社交网络、金融交易等需要处理大量并发请求的系统。以下是一些典型的高并发场景&#xff1a; 在线购物活动&#xff1a; 在特定促销或购物节期间&#xff0c;电商…

Databend 开源周报第 124 期

Databend 是一款现代云数仓。专为弹性和高效设计&#xff0c;为您的大规模分析需求保驾护航。自由且开源。即刻体验云服务&#xff1a;https://app.databend.cn 。 Whats On In Databend 探索 Databend 本周新进展&#xff0c;遇到更贴近你心意的 Databend 。 新增对 Delta 和…

智能优化算法应用:基于鼠群算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于鼠群算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于鼠群算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.鼠群算法4.实验参数设定5.算法结果6.参考文献7.MA…