【MapReduce】对员工数据按照部门分区并对每个分区排序

news2024/11/28 23:14:40

        员工信息全部存储在emp.csv文件中,员工的属性有:员工id、名称、职位、领导id、雇佣时间、工资、奖金、部门号。

        在MapReduce中想要使用员工的信息,需要对员工进行序列化处理。因为MapReduce是一个分布式框架数据会在不同节点之间进行传输,所以需要将对象转换成字节序列以便于存储或传输。并且如果对象不序列化程序会出错。

一、主类

主类作用:在主类中设置MapReduce中的map类和reduce类,指定分区规则类、设置启动reduce的数量,设置map阶段和reduce阶段的输入输出类型。上传文件。


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmployeeMain {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        //设置主类
        job.setJarByClass(EmployeeMain.class);
        //设置Map类
        job.setMapperClass(EmployeeMapper.class);
        //设置Reduce类
        job.setReducerClass(SalaryTotalReducer.class);

        //指定分区规则
        job.setPartitionerClass(DeptnoPartitioner.class);
        //设置启动reduce数量
        job.setNumReduceTasks(3);

        job.setMapOutputKeyClass(IntWritable.class);// map阶段的输出的key
        job.setMapOutputValueClass(Employee.class);// map阶段的输出的value

        job.setOutputKeyClass(IntWritable.class);// reduce阶段的输出的key
        job.setOutputValueClass(Employee.class);// reduce阶段的输出的value
        //Windows本地路径
        FileInputFormat.setInputPaths(job, new Path("./src/main/java/serialSortPartitioner/emp.csv"));
        FileOutputFormat.setOutputPath(job, new Path("./src/main/java/serialSortPartitioner/output"));
        System.out.println("计算开始---------------");
        boolean res = job.waitForCompletion(true);
        System.out.println("计算结束---------------");
    }

}

二、员工类


import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class Employee implements Writable{
    //员工id
    private int empno;
    //员工名称
    private String ename;
    //员工职位
    private String job;
    //直接领导的员工id
    private int mgr;
    //雇佣时间
    private String hiredate;
    //工资
    private int sal;
    //奖金
    private int comm;
    //部门号
    private int deptno;

    public Employee(){}


    //序列化
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.empno);
        out.writeUTF(this.ename);
        out.writeUTF(this.job);
        out.writeInt(this.mgr);
        out.writeUTF(this.hiredate);
        out.writeInt(this.sal);
        out.writeInt(this.comm);
        out.writeInt(this.deptno);
    }

    //反序列化
    public void readFields(DataInput in) throws IOException {
        this.empno = in.readInt();
        this.ename = in.readUTF();
        this.job = in.readUTF();
        this.mgr = in.readInt();
        this.hiredate = in.readUTF();
        this.sal = in.readInt();
        this.comm = in.readInt();
        this.deptno = in.readInt();

    }

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getMgr() {
        return mgr;
    }

    public void setMgr(int mgr) {
        this.mgr = mgr;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public Integer getSal() {
        return sal;
    }

    public void setSal(int sal) {
        this.sal = sal;
    }

    public int getComm() {
        return comm;
    }

    public void setComm(int comm) {
        this.comm = comm;
    }

    public int getDeptno() {
        return deptno;
    }

    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }

    @Override
    public String toString() {
        return "Employee{" +
                "empno=" + empno +
                ", ename='" + ename + '\'' +
                ", job='" + job + '\'' +
                ", mgr=" + mgr +
                ", hiredate='" + hiredate + '\'' +
                ", sal=" + sal +
                ", comm=" + comm +
                ", deptno=" + deptno +
                '}';
    }
}

三、map类

map类主要作用是输入员工的数据到MapReduce中。


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class EmployeeMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        //get values string
        String v1string = v1.toString();
        //spile string
        String words[] = v1string.split(",");
        //map out key/value
        //System.out.println("display this turn <key,1> ");
        Employee e = new Employee();
        //员工号
        e.setEmpno(Integer.parseInt(words[0]));
        //姓名
        e.setEname(words[1]);
        //职位
        e.setJob(words[2]);
        //老板号
        try {
            e.setMgr(Integer.parseInt(words[3]));
        } catch (Exception e1) {
            //没有老板号
            e.setMgr(-1);
        }
        //入职日期
        e.setHiredate(words[4]);
        //工资
        e.setSal(Integer.parseInt(words[5]));
        //奖金
        try {
            e.setComm(Integer.parseInt(words[6]));

        } catch (Exception e2) {
            e.setComm(0);
        }
        //部门号
        e.setDeptno(Integer.parseInt(words[7]));
//        System.out.println("map   " + e.toString());
        //根据部门号作为关键字,进行默认排序,也可以设置为空
        context.write(new IntWritable(e.getDeptno()), e);
    }

    @Override
    public void run(Context context) throws IOException, InterruptedException {
        super.run(context);
    }
}

三、分区类

根据部门号进行分区


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class DeptnoPartitioner extends Partitioner<IntWritable, Employee> {

    //根据部门号设置分区
    @Override
    public int getPartition(IntWritable k2, Employee v2, int numPartitions) {
        // TODO Auto-generated method stub
        if (v2.getDeptno() <= 10) {
            return 0;
        } else if (v2.getDeptno() <= 20) {
            return 1;
        } else return 2;
    }
}

四、reduce类

设置的分区数量和启动的reduce数量相同(在主类中设置启动数量),在reduce类中进行排序就可以实现每个分区进行自定义排序。


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.*;

public class SalaryTotalReducer extends Reducer<IntWritable, Employee, NullWritable, Employee> {
    Comparator comparetor = new compareclass();//TreeMap对象可以使用降序的比较器
    private TreeMap<Integer, Employee> repToRecordMap =
            new TreeMap<Integer, Employee>(comparetor); //如果参数为空是默认升序比较器

    protected void reduce(IntWritable k3, Iterable<Employee> v3,
                          Context context)
            throws IOException, InterruptedException {

        //在这里自定义排序
        for (Employee e : v3) {
            repToRecordMap.put(e.getSal(),e);
        }
        //在这里获取排序后的结果
        for (Integer e : repToRecordMap.keySet()) {
            //在这里工资数据会改变(原因未知),需要重新设置为原来的工资
            repToRecordMap.get(e).setSal(e);
            context.write(NullWritable.get(),repToRecordMap.get(e));
        }
    }
}


class compareclass implements Comparator<Integer> {
    //返回一个基本类型的整型,谁大谁排后面(升序).
    //返回负数表示:o1 小于o2
    //返回0表示:表示:o1和o2相等
    //返回正数表示:o1大于o2。
    //默认用o1-o2,创建TreeMap对象时可以不用这个继承类,但是要降序,必须修改compare里面的逻辑o2-o1
    //谁大谁排在前面(降序)用o2-o1
    @Override
    //排序
    public int compare(Integer o1, Integer o2) {
        // TODO Auto-generated method stub
        return o1 - o2;
    }
}

运行结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1392397.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用原型实现Class的各项语法

本人之前对Class一直不够重视。平时对原型的使用&#xff0c;也仅限于在构造函数的prototype上挂属性。原型尚且用不着&#xff0c;更何况你Class只是原型的一颗语法糖&#xff1f; 直到公司开始了一个webgis项目&#xff0c;使用openlayers。看了下openlayers的代码&#xff0…

Recv设置MSG_DONTWAIT依然阻塞

服务器上有如下代码&#xff1a; bool recv_handler(connection_t &connection){int fd connection.get_fd();char temp_buffer[2048];while (true){// 清空缓冲区bzero(temp_buffer, 2048);// 设置非阻塞标志MSG_DONTWAITssize_t recv_ret recv(fd, temp_buffer, 2048, …

RabbitMQ常见问题之消息堆积

文章目录 一、介绍二、使用惰性队列1. 基于Bean2. 基于RabbitListener 一、介绍 当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最 早接收到的消息&#xff0c;可能就会成为死信&#xff0c;会被丢弃&#xff0c;这就…

CSS 超可爱的眼睛转动效果

<template><view class="content"><view class="loader"></view></view> </template><script></script><style>body {background-color: #212121;/* 设置背景颜色为深灰色 */}.content {display: f…

2024中国光伏展

2024年中国光伏展预计将是一个规模庞大的展览&#xff0c;吸引了全球光伏行业的专业人士和企业参与。光伏展将为各个光伏领域的企业提供一个展示最新技术、产品和解决方案的平台。 在2024年的中国光伏展上&#xff0c;参展企业将能够展示他们的光伏组件、太阳能电池板、逆变器、…

MyBatisPlus学习笔记三-核心功能

接上篇&#xff1a; MyBatisPlus学习笔记二-CSDN博客 1、核心功能-IService开发基础业务接口 1.1、介绍 1.2、引用依赖 1.3、配置文件 1.4、用例-新增 1.5、用例-删除 1.6、用例-根据id查询 1.7、用例-根据ids查询 2、核心功能-IService开发复杂业务接口 2.1、实例-更新 3、…

寒假已开启,你的毕业论文写到哪了?

先来看1分钟的视频&#xff0c;对于要写论文的你来说&#xff0c;绝对有所值&#xff01; 还在为写论文焦虑&#xff1f;免费AI写作大师来帮你三步搞定 体验免费智元兔AI写作&#xff1a;智元兔AI 第一步&#xff1a;输入关键信息 第二步&#xff1a;生成大纲 稍等片刻后&…

还不会装箱与拆箱?!看这篇,你正在变强!!

定义 装箱与拆箱允许程序员在基本数据类型和相应的包装类之间自动转换。 装箱指的是基本类型的值包装在包装类的对象中。例如&#xff0c;将int类型的值包装在一个Integer对象中。 拆箱则是相反的过程&#xff0c;将包装类的对象转换为基本类型的值。 手动和自动的装拆箱 装…

B端产品经理学习-版本规划管理

首先我们回顾一下用户故事&#xff0c;用户故事有如下特点&#xff1a; PRD文档的特点则如下&#xff1a; B端产品中用户角色不同&#xff0c;需求侧重也不同 决策人——公司战略需求&#xff1a;转型升级、降本增效、品牌提升等 管理负责人——公司管理需求&#xff1a;提升…

❤ Uniapp使用三( 打包和发布上线)

❤ Uniapp使用三( 打包和发布上线) 一、介绍 什么是 uniapp&#xff1f; uniapp 是一种基于 Vue.js 的多平台开发框架&#xff0c;它可以同时用于开发安卓、iOS、H5 等多个平台。因此&#xff0c;只需要写一次代码就可以在多个平台上运行&#xff0c;提高了开发效率。 打包…

球幕影院气膜:未来娱乐的奇妙之旅

球幕影院气膜&#xff1a;未来娱乐的奇妙之旅 在科技日新月异的时代&#xff0c;娱乐体验的创新与演变从未停歇。气膜球幕影院&#xff0c;作为一项领航未来的前沿科技&#xff0c;正以其沉浸感和颠覆性的观影体验&#xff0c;吸引着人们驻足体验。 创新科技的巅峰之作 气膜球幕…

轻松识别Midjourney等AI生成图片,开源GenImage

AIGC时代&#xff0c;人人都可以使用Midjourney、Stable Diffusion等AI产品生成高质量图片&#xff0c;其逼真程度肉眼难以区分真假。这种虚假照片有时会对社会产生不良影响&#xff0c;例如&#xff0c;生成公众人物不雅图片用于散播谣言&#xff1b;合成虚假图片用于金融欺诈…

好消息,Linux Kernel 6.7正式发布!

据有关资料显示&#xff0c;该版本是有史以来合并数最多的版本之一&#xff0c;包含 17k 个非合并 commit&#xff0c;实际合并的超过1K个。 那么该版本主要有哪边变化呢&#xff1f;下面我来一一列举一下&#xff1a; Bcachefs文件系统已被合并到主线内核&#xff0c;这是一款…

springboot第49集:【思维导图】多线程,常用类与基础API,集合框架,泛型,数据结构源码...

多线程创建方式一&#xff1a;继承Thread类多线程创建方式二&#xff1a;实现Runnable接口jdk5.0新增两种创建多线程的方式 image.png image.png image.png image.png image.png new Thread(new Runnable() {public void run() {for (int i 1; i < 100; i) {if (i % 2 0) …

蓝桥杯练习题-穷举模拟

&#x1f4d1;前言 本文主要是【穷举模拟】——蓝桥杯练习题-穷举模拟的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是听风与他&#x1f947; ☁️博客首页&#xff1a;CSDN主页听风与他 &#x1f304;…

Apollo添加新的lidar检测算法

lidar检测算法 概述架构体系添加lidar检测算法定义一个继承基类 base_lidar_detector 的类实现新类 NewLidarDetector为新类 NewLidarDetector 配置config和param的proto文件更新 lidar_obstacle_detection.conf 主页传送门 &#xff1a; &#x1f4c0; 传送 概述 lidar&#…

使用WAF防御网络上的隐蔽威胁之扫描器

在网络安全领域&#xff0c;扫描器是用于侦察和识别网络系统漏洞的工具。 它们可以帮助网络管理员识别安全漏洞&#xff0c;也可能被攻击者用来寻找攻击目标。 扫描器的基本概念 定义&#xff1a;扫描器是一种自动化工具&#xff0c;用于探测网络和服务器中的漏洞、开放端口、…

软件测试面试题整理

软件测试的几个阶段 在进行Beta测试之前和之后&#xff0c;通常会进行以下几种测试&#xff1a; 内部测试&#xff08;Internal Testing&#xff09; 在Beta测试之前&#xff0c;开发团队会进行内部测试&#xff0c;对软件进行全面的测试。这个阶段包括单元测试、集成测试和系…

使用原生input模拟器样式正常,但是真机上 input框溢出

目录 一、问题 二、解决方法 三、总结 tiips:如嫌繁琐&#xff0c;直接移步总结即可&#xff01; 一、问题 1.使用原生input写了一个搜索框&#xff0c;在模拟器和pc上一切正常。但是打包放到手机上&#xff0c;样式就有问题&#xff1a;这个搜索框的布局是正常的&#xf…

Python 布尔类型:了解真假之间的探索

Python 是一种备受欢迎的编程语言&#xff0c;以其简洁、灵活和易学易用而闻名。其中一个重要的数据类型就是布尔类型&#xff08;bool&#xff09;&#xff0c;它代表了逻辑上的真&#xff08;True&#xff09;和假&#xff08;False&#xff09;。在 Python 中&#xff0c;布…