All employee records are stored in the file emp.csv. Each employee has the following attributes: employee id, name, job title, manager id, hire date, salary, bonus, and department number.
To use the employee records in MapReduce, the employee object must be made serializable. MapReduce is a distributed framework, so data is shipped between nodes; an object has to be converted into a byte sequence before it can be stored or transmitted. If the value type does not implement Hadoop's serialization interface (Writable, implemented by the Employee class in section 2), the job fails.
1. Main class
The main class wires the job together: it registers the map and reduce classes, specifies the partitioner, sets the number of reduce tasks to start, declares the input/output types of the map and reduce phases, and sets the input and output paths.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmployeeMain {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // main class of the job
        job.setJarByClass(EmployeeMain.class);
        // map class
        job.setMapperClass(EmployeeMapper.class);
        // reduce class
        job.setReducerClass(SalaryTotalReducer.class);
        // partitioning rule
        job.setPartitionerClass(DeptnoPartitioner.class);
        // number of reduce tasks to start, one per partition
        job.setNumReduceTasks(3);
        job.setMapOutputKeyClass(IntWritable.class);  // key emitted by the map phase
        job.setMapOutputValueClass(Employee.class);   // value emitted by the map phase
        job.setOutputKeyClass(NullWritable.class);    // key emitted by the reduce phase (the reducer writes NullWritable keys)
        job.setOutputValueClass(Employee.class);      // value emitted by the reduce phase
        // local Windows paths
        FileInputFormat.setInputPaths(job, new Path("./src/main/java/serialSortPartitioner/emp.csv"));
        FileOutputFormat.setOutputPath(job, new Path("./src/main/java/serialSortPartitioner/output"));
        System.out.println("Job starting---------------");
        boolean res = job.waitForCompletion(true);
        System.out.println("Job finished---------------");
        System.exit(res ? 0 : 1);
    }
}
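One practical note: Hadoop refuses to start a job whose output directory already exists. A minimal sketch of deleting it before the job runs, using the standard FileSystem API; this fragment would sit in main() in place of the setOutputPath call above, and the path simply mirrors the one already used:
import org.apache.hadoop.fs.FileSystem;
Path outputPath = new Path("./src/main/java/serialSortPartitioner/output");
FileSystem fs = FileSystem.get(conf);
// remove output from a previous run so waitForCompletion() can start cleanly
if (fs.exists(outputPath)) {
    fs.delete(outputPath, true); // true = delete recursively
}
FileOutputFormat.setOutputPath(job, outputPath);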
2. Employee class
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Employee implements Writable {
    // employee id
    private int empno;
    // employee name
    private String ename;
    // job title
    private String job;
    // employee id of the direct manager
    private int mgr;
    // hire date
    private String hiredate;
    // salary
    private int sal;
    // bonus
    private int comm;
    // department number
    private int deptno;

    // no-arg constructor is required: Hadoop instantiates Writables via reflection
    public Employee() {}

    // serialization: write the fields out as a byte sequence
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.empno);
        out.writeUTF(this.ename);
        out.writeUTF(this.job);
        out.writeInt(this.mgr);
        out.writeUTF(this.hiredate);
        out.writeInt(this.sal);
        out.writeInt(this.comm);
        out.writeInt(this.deptno);
    }

    // deserialization: read the fields back in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.empno = in.readInt();
        this.ename = in.readUTF();
        this.job = in.readUTF();
        this.mgr = in.readInt();
        this.hiredate = in.readUTF();
        this.sal = in.readInt();
        this.comm = in.readInt();
        this.deptno = in.readInt();
    }
public int getEmpno() {
return empno;
}
public void setEmpno(int empno) {
this.empno = empno;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public String getJob() {
return job;
}
public void setJob(String job) {
this.job = job;
}
public int getMgr() {
return mgr;
}
public void setMgr(int mgr) {
this.mgr = mgr;
}
public String getHiredate() {
return hiredate;
}
public void setHiredate(String hiredate) {
this.hiredate = hiredate;
}
public int getSal() {
return sal;
}
public void setSal(int sal) {
this.sal = sal;
}
public int getComm() {
return comm;
}
public void setComm(int comm) {
this.comm = comm;
}
public int getDeptno() {
return deptno;
}
public void setDeptno(int deptno) {
this.deptno = deptno;
}
@Override
public String toString() {
return "Employee{" +
"empno=" + empno +
", ename='" + ename + '\'' +
", job='" + job + '\'' +
", mgr=" + mgr +
", hiredate='" + hiredate + '\'' +
", sal=" + sal +
", comm=" + comm +
", deptno=" + deptno +
'}';
}
}
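To see the write/readFields pair in action outside of a running job, here is a minimal round-trip sketch. The class name EmployeeRoundTrip and the field values are made up for illustration and are not part of the project:
import java.io.*;

public class EmployeeRoundTrip {
    public static void main(String[] args) throws IOException {
        Employee original = new Employee();
        original.setEmpno(1);
        original.setEname("TEST");
        original.setJob("CLERK");
        original.setMgr(-1);
        original.setHiredate("1980/12/17");
        original.setSal(800);
        original.setComm(0);
        original.setDeptno(20);
        // serialize to a byte array, as Hadoop would when shipping the value between nodes
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));
        // deserialize back into a fresh instance
        Employee restored = new Employee();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(restored); // prints the same fields as original
    }
}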
3. Map class
The map class parses each input line into an Employee object and feeds it into MapReduce, keyed by department number. A sample input row is shown below.
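For reference, a row of emp.csv in the classic EMP-table layout would look like the line below. The values are illustrative, not taken from the actual file; note that the mgr and comm columns may be empty, which is why the mapper guards those two fields, and the date format in your file may differ:
7369,SMITH,CLERK,7902,1980/12/17,800,,20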
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class EmployeeMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {
    @Override
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        // split the CSV line into its fields
        String[] words = v1.toString().split(",");
        Employee e = new Employee();
        // employee id
        e.setEmpno(Integer.parseInt(words[0]));
        // name
        e.setEname(words[1]);
        // job title
        e.setJob(words[2]);
        // manager id; the column is empty for employees with no manager
        try {
            e.setMgr(Integer.parseInt(words[3]));
        } catch (NumberFormatException e1) {
            e.setMgr(-1);
        }
        // hire date
        e.setHiredate(words[4]);
        // salary
        e.setSal(Integer.parseInt(words[5]));
        // bonus; the column is empty for employees with no bonus
        try {
            e.setComm(Integer.parseInt(words[6]));
        } catch (NumberFormatException e2) {
            e.setComm(0);
        }
        // department number
        e.setDeptno(Integer.parseInt(words[7]));
        // emit keyed by department number: the shuffle sorts on this key by default,
        // and the partitioner routes records by it
        context.write(new IntWritable(e.getDeptno()), e);
    }
}
4. Partitioner class
Partitions the records by department number.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class DeptnoPartitioner extends Partitioner<IntWritable, Employee> {
    // route each record to a partition based on its department number
    @Override
    public int getPartition(IntWritable k2, Employee v2, int numPartitions) {
        if (v2.getDeptno() <= 10) {
            return 0;
        } else if (v2.getDeptno() <= 20) {
            return 1;
        } else {
            return 2;
        }
    }
}
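A quick way to sanity-check the routing is to call getPartition directly; a throwaway snippet, not part of the project:
DeptnoPartitioner p = new DeptnoPartitioner();
Employee e = new Employee();
e.setDeptno(20);
// 3 matches the reduce-task count set in the main class; expected output: partition 1
System.out.println(p.getPartition(new IntWritable(e.getDeptno()), e, 3));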
5. Reduce class
The number of partitions must match the number of reduce tasks started (set in the main class); sorting inside the reduce class then gives every partition its own custom ordering.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.*;

public class SalaryTotalReducer extends Reducer<IntWritable, Employee, NullWritable, Employee> {
    // TreeMap accepts a custom comparator; constructed with no comparator it sorts keys in ascending order
    private final Comparator<Integer> comparator = new SalaryComparator();

    @Override
    protected void reduce(IntWritable k3, Iterable<Employee> v3, Context context)
            throws IOException, InterruptedException {
        // the custom sort happens here; a fresh map per key so departments do not accumulate across calls
        // (note: keying on salary keeps only one employee per distinct salary value)
        TreeMap<Integer, Employee> repToRecordMap = new TreeMap<Integer, Employee>(comparator);
        for (Employee e : v3) {
            // Hadoop reuses the same Employee instance on every iteration of v3, so a copy
            // must be stored; storing e directly would leave every map entry pointing at one
            // object holding the last record's fields (this is why the salary appeared to
            // "change for unknown reasons" in the naive version of this loop)
            Employee copy = new Employee();
            copy.setEmpno(e.getEmpno());
            copy.setEname(e.getEname());
            copy.setJob(e.getJob());
            copy.setMgr(e.getMgr());
            copy.setHiredate(e.getHiredate());
            copy.setSal(e.getSal());
            copy.setComm(e.getComm());
            copy.setDeptno(e.getDeptno());
            repToRecordMap.put(copy.getSal(), copy);
        }
        // emit the employees in the order imposed by the comparator
        for (Integer sal : repToRecordMap.keySet()) {
            context.write(NullWritable.get(), repToRecordMap.get(sal));
        }
    }
}
class SalaryComparator implements Comparator<Integer> {
    // compare returns a plain int:
    //   negative: o1 sorts before o2
    //   zero:     o1 and o2 are equal
    //   positive: o1 sorts after o2
    // o1 - o2 gives ascending order, which is the TreeMap default, so this class could be
    // omitted entirely; to sort descending, flip the subtraction to o2 - o1
    @Override
    public int compare(Integer o1, Integer o2) {
        return o1 - o2;
    }
}
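If a descending listing is wanted instead, the custom comparator class can be dropped in favor of the one the JDK already provides (Java 8+); a sketch of the one-line change inside SalaryTotalReducer, which also avoids the integer-overflow risk that raw subtraction carries when the two operands have opposite signs and large magnitudes:
TreeMap<Integer, Employee> repToRecordMap =
        new TreeMap<Integer, Employee>(Comparator.reverseOrder());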