一、原始消费数据 buy.txt(每行依次为:顾客姓名、京东消费金额、淘宝消费金额、多点消费金额,字段之间以空格分隔)
zhangsan 5676 2765 887
lisi 6754 3234 1232
wangwu 3214 6654 388
lisi 1123 4534 2121
zhangsan 982 3421 5566
zhangsan 1219 36 45
二、实现思路:先通过第一个 MapReduce 作业按顾客汇总其在各平台的消费金额;再通过第二个 MapReduce 作业以汇总结果(Buy 对象)作为 key,利用 shuffle 阶段对 key 的排序,实现按金额排序。
三、定义实体类 Buy,用于保存顾客在京东、淘宝、多点三个平台的消费金额。它实现了 WritableComparable 接口,其中 compareTo 方法定义了排序规则:
package cn.edu.tju;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Purchase amounts of one customer on three platforms: jingdong, taobao and duodian.
 *
 * <p>Used as the map-output value of the first (summing) job and as the map-output key of the
 * second (sorting) job, so it is both serializable ({@link #write}/{@link #readFields}) and
 * ordered ({@link #compareTo}).
 */
public class Buy implements WritableComparable<Buy> {
    private double jingdong;
    private double taobao;
    private double duodian;

    /** No-arg constructor, needed so Hadoop can instantiate the object before {@link #readFields}. */
    public Buy() {
    }

    /**
     * Creates a record with the given amounts.
     *
     * @param jingdong amount spent on jingdong
     * @param taobao amount spent on taobao
     * @param duodian amount spent on duodian
     */
    public Buy(double jingdong, double taobao, double duodian) {
        this.jingdong = jingdong;
        this.taobao = taobao;
        this.duodian = duodian;
    }

    public double getJingdong() {
        return jingdong;
    }

    public void setJingdong(double jingdong) {
        this.jingdong = jingdong;
    }

    public double getTaobao() {
        return taobao;
    }

    public void setTaobao(double taobao) {
        this.taobao = taobao;
    }

    public double getDuodian() {
        return duodian;
    }

    public void setDuodian(double duodian) {
        this.duodian = duodian;
    }

    /**
     * Returns the three amounts separated by tabs.
     *
     * <p>This exact format is what MyBuyMapper2 parses back with {@code split("\t")}, so keep
     * them in sync.
     */
    @Override
    public String toString() {
        return jingdong + "\t" + taobao + "\t" + duodian;
    }

    /** Serializes the fields in the order jingdong, taobao, duodian. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeDouble(jingdong);
        out.writeDouble(taobao);
        out.writeDouble(duodian);
    }

    /** Deserializes the fields in the same order {@link #write} wrote them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.jingdong = in.readDouble();
        this.taobao = in.readDouble();
        this.duodian = in.readDouble();
    }

    /**
     * Orders ascending by jingdong, then taobao, then duodian.
     *
     * <p>All three fields take part. Otherwise, records differing only in duodian would compare
     * as equal and be merged into one reduce group. {@link Double#compare} gives a total order,
     * including NaN.
     */
    @Override
    public int compareTo(Buy o) {
        int cmp = Double.compare(this.jingdong, o.jingdong);
        if (cmp != 0) {
            return cmp;
        }
        cmp = Double.compare(this.taobao, o.taobao);
        if (cmp != 0) {
            return cmp;
        }
        return Double.compare(this.duodian, o.duodian);
    }

    /** Equal exactly when {@link #compareTo} returns 0, keeping equals consistent with the ordering. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof Buy)) {
            return false;
        }
        return compareTo((Buy) obj) == 0;
    }

    /**
     * Value-based hash consistent with {@link #equals}.
     *
     * <p>Hadoop's default hash partitioner routes map-output keys by {@code hashCode()}. With
     * identity hashes, equal keys could land on different reducers.
     */
    @Override
    public int hashCode() {
        int h = Double.hashCode(jingdong);
        h = 31 * h + Double.hashCode(taobao);
        h = 31 * h + Double.hashCode(duodian);
        return h;
    }
}
四、定义第一对Mapper和Reducer
package cn.edu.tju;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * First-stage mapper: parses one line of raw purchase data and emits {@code (name, Buy)} so the
 * reducer can total each customer's spending.
 *
 * <p>Expected line format: {@code name jingdong taobao duodian}, separated by whitespace.
 */
public class MyBuyMapper1 extends Mapper<LongWritable, Text, Text, Buy> {
    /** Number of whitespace-separated fields every data line must contain. */
    private static final int FIELD_COUNT = 4;

    /**
     * Parses a single input line and writes the customer's purchase record.
     *
     * @param key presumably the line's byte offset from the default TextInputFormat; only used in
     *     error messages
     * @param value the raw text line
     * @param context receives the {@code (name, Buy)} pair
     * @throws IOException if a non-blank line has fewer than {@value #FIELD_COUNT} fields
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // A UTF-8 byte-order mark (which some Windows editors prepend) would otherwise become part
        // of the first customer's name.
        if (line.startsWith("\uFEFF")) {
            line = line.substring(1);
        }
        line = line.trim();
        if (line.isEmpty()) {
            // Tolerate blank lines instead of crashing on a missing column.
            return;
        }
        // Split on any run of whitespace so doubled spaces or tabs don't shift the columns.
        String[] fieldList = line.split("\\s+");
        if (fieldList.length < FIELD_COUNT) {
            throw new IOException("Malformed purchase record at offset " + key.get() + ": \"" + line + "\"");
        }
        String person = fieldList[0];
        double jingdong = Double.parseDouble(fieldList[1]);
        double taobao = Double.parseDouble(fieldList[2]);
        double duodian = Double.parseDouble(fieldList[3]);
        context.write(new Text(person), new Buy(jingdong, taobao, duodian));
    }
}
package cn.edu.tju;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
/**
 * First-stage reducer: adds up all purchase records of one customer, platform by platform, and
 * emits a single {@code (name, totals)} record.
 */
public class MyBuyReducer1 extends Reducer<Text, Buy, Text, Buy> {
    /**
     * Sums the jingdong, taobao and duodian amounts of every record for {@code key}.
     *
     * @param key the customer name
     * @param values all purchase records of that customer
     * @param context receives one {@code (name, Buy)} pair holding the totals
     */
    @Override
    protected void reduce(Text key, Iterable<Buy> values, Reducer<Text, Buy, Text, Buy>.Context context) throws IOException, InterruptedException {
        double jingdongTotal = 0;
        double taobaoTotal = 0;
        double duodianTotal = 0;
        // Only read the fields of each record here. The framework may reuse the same Buy instance
        // across iterations, so references should not be kept.
        for (Buy purchase : values) {
            jingdongTotal += purchase.getJingdong();
            taobaoTotal += purchase.getTaobao();
            duodianTotal += purchase.getDuodian();
        }
        Buy totals = new Buy(jingdongTotal, taobaoTotal, duodianTotal);
        context.write(key, totals);
    }
}
五、定义第二对Mapper和Reducer
package cn.edu.tju;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Second-stage mapper: reads the first job's {@code name \t jingdong \t taobao \t duodian} lines
 * and swaps them into {@code (Buy, name)}. This lets the shuffle sort customers by their totals.
 */
public class MyBuyMapper2 extends Mapper<LongWritable, Text, Buy, Text> {
    /**
     * Parses one tab-separated summary line and emits the totals as the key.
     *
     * @param key position of the line in the input (unused)
     * @param value a summary line written by the first job
     * @param context receives the {@code (Buy, name)} pair
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] columns = value.toString().split("\t");
        Buy totals = new Buy(
                Double.parseDouble(columns[1]),
                Double.parseDouble(columns[2]),
                Double.parseDouble(columns[3]));
        Text customer = new Text(columns[0]);
        context.write(totals, customer);
    }
}
package cn.edu.tju;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
/**
 * Second-stage reducer: swaps key and value back so each output line reads
 * {@code name \t totals}. Lines arrive in the order defined by {@code Buy.compareTo}.
 */
public class MyBuyReducer2 extends Reducer<Buy, Text, Text, Buy> {
    /**
     * Writes one {@code (name, totals)} line per customer that shares these totals.
     *
     * @param key the totals shared by this group
     * @param values the names of the customers with those totals
     * @param context receives the swapped pairs
     */
    @Override
    protected void reduce(Buy key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text customer : values) {
            context.write(customer, key);
        }
    }
}
六、定义主类,其中包含两个 Job:等第一个 Job 运行结束之后,第二个 Job 再开始运行
package cn.edu.tju;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver that chains two MapReduce jobs.
 *
 * <ol>
 *   <li>Sum job: totals each customer's spending per platform (MyBuyMapper1/MyBuyReducer1).
 *   <li>Sort job: re-keys the totals by {@code Buy} so the shuffle sorts them
 *       (MyBuyMapper2/MyBuyReducer2).
 * </ol>
 *
 * <p>Optional arguments: {@code [input [summaryDir [sortedDir]]]}. Missing arguments fall back to
 * the defaults below. Both output directories must not exist yet.
 */
public class MyBuyMain2 {
    private static final String DEFAULT_INPUT = "D:\\tool\\TestHadoop3\\buy.txt";
    private static final String DEFAULT_SUMMARY_DIR = "count_1";
    private static final String DEFAULT_SORTED_DIR = "count_2";

    /**
     * Runs the sum job and then, only if it succeeded, the sort job.
     *
     * <p>Exits with status 0 when both jobs succeed and 1 otherwise.
     *
     * @param args optional input path, intermediate directory and final output directory
     * @throws Exception if a job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        Path input = new Path(args.length > 0 ? args[0] : DEFAULT_INPUT);
        Path summaryDir = new Path(args.length > 1 ? args[1] : DEFAULT_SUMMARY_DIR);
        Path sortedDir = new Path(args.length > 2 ? args[2] : DEFAULT_SORTED_DIR);

        Configuration configuration = new Configuration(true);
        configuration.set("mapreduce.framework.name", "local");

        // Job 1: per-customer totals.
        Job sumJob = Job.getInstance(configuration);
        sumJob.setJarByClass(MyBuyMain2.class);
        sumJob.setJobName("buy-" + System.currentTimeMillis());
        FileInputFormat.setInputPaths(sumJob, input);
        FileOutputFormat.setOutputPath(sumJob, summaryDir);
        sumJob.setMapperClass(MyBuyMapper1.class);
        sumJob.setReducerClass(MyBuyReducer1.class);
        sumJob.setMapOutputKeyClass(Text.class);
        sumJob.setMapOutputValueClass(Buy.class);
        sumJob.setOutputKeyClass(Text.class);
        sumJob.setOutputValueClass(Buy.class);
        if (!sumJob.waitForCompletion(true)) {
            // Don't start the sort job on missing or partial intermediate data.
            System.exit(1);
        }

        // Job 2: sort the totals.
        Job sortJob = Job.getInstance(configuration);
        sortJob.setJarByClass(MyBuyMain2.class);
        sortJob.setJobName("buy2-" + System.currentTimeMillis());
        // Read the whole directory job 1 wrote (every part-r-* file), not one hard-coded file.
        FileInputFormat.setInputPaths(sortJob, summaryDir);
        FileOutputFormat.setOutputPath(sortJob, sortedDir);
        sortJob.setMapperClass(MyBuyMapper2.class);
        sortJob.setReducerClass(MyBuyReducer2.class);
        sortJob.setMapOutputKeyClass(Buy.class);
        sortJob.setMapOutputValueClass(Text.class);
        sortJob.setOutputKeyClass(Text.class);
        sortJob.setOutputValueClass(Buy.class);
        // A single reducer yields one globally sorted output file; with several reducers each
        // partition would only be sorted on its own.
        sortJob.setNumReduceTasks(1);
        System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
    }
}
七、运行结果: