1.需求
统计每个手机号消费总金额,按照消费金额降序排序,最终联通、电信、移动分别写入不同的文件。
号段与运营商的对应关系:130、131、132 为联通;133 为电信;135、136、137、138、139 为移动。
手机号,消费记录
13512345678,50
13512345678,90
13122345678,10
13122345678,110
13212345678,10
13212345678,90
13912345378,10
13912345378,90
13612345678,50
13612345678,55
13312345378,65
13312345378,90
2.将数据上传到hdfs
3.IDEA 代码
MyPartition
package demo8;
import demo5.DescIntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Routes each map output record to a reducer based on the three-digit prefix
 * of the phone number carried in the record's value.
 *
 * <ul>
 *   <li>130, 131, 132 (China Unicom) -> partition 0</li>
 *   <li>133 (China Telecom) -> partition 1</li>
 *   <li>every other prefix (treated as China Mobile) -> partition 2</li>
 * </ul>
 *
 * The key (the amount) is not consulted; only the phone number decides the
 * partition.
 */
public class MyPartition extends Partitioner<DescIntWritable, Text> {

    private static final int UNICOM_PARTITION = 0;
    private static final int TELECOM_PARTITION = 1;
    private static final int DEFAULT_PARTITION = 2;

    /**
     * Picks the partition for one record.
     *
     * @param descIntWritable the map output key (unused here)
     * @param text            the map output value, expected to hold the phone number
     * @param numPartitions   the number of partitions configured for the job
     * @return a partition index; the carrier index is folded with
     *         {@code % numPartitions} so it never exceeds the configured
     *         partition count
     */
    @Override
    public int getPartition(DescIntWritable descIntWritable, Text text, int numPartitions) {
        // Trim so stray whitespace around the number cannot defeat the prefix match.
        String phone = text.toString().trim();

        int carrierPartition;
        if (phone.startsWith("130") || phone.startsWith("131") || phone.startsWith("132")) {
            carrierPartition = UNICOM_PARTITION;
        } else if (phone.startsWith("133")) {
            carrierPartition = TELECOM_PARTITION;
        } else {
            carrierPartition = DEFAULT_PARTITION;
        }

        // Returning a fixed 2 when fewer than three partitions are configured
        // yields an out-of-range index; fold it back into the valid range.
        // With three or more partitions this leaves the index unchanged.
        return carrierPartition % numPartitions;
    }
}
PhoneBillJob
package demo8;
import demo5.DescIntWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
public class PhoneBillJob {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://hadoop10:8020");
Job job = Job.getInstance(conf);
job.setJarByClass(PhoneBillJob.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextInputFormat.addInputPath(job,new Path("/phtest/phone.txt"));
TextOutputFormat.setOutputPath(job,new Path("/phtest/out"));
job.setMapperClass(PhoneBillMapper.class);
job.setReducerClass(PhoneBillReducer.class);
//map输出的键与值类型
job.setMapOutputKeyClass(DescIntWritable.class);
job.setMapOutputValueClass(Text.class);
//reducer输出的键与值类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DescIntWritable.class);
//设置reduceTask的个数
job.setNumReduceTasks(3);
//设置自定义分区
job.setPartitionerClass(MyPartition.class);
boolean b = job.waitForCompletion(true);
System.out.println(b);
}
static class PhoneBillMapper extends Mapper<LongWritable, Text,DescIntWritable,Text> {
@Override
protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
String[] arr = value.toString().split(",");
context.write(new DescIntWritable(Integer.parseInt(arr[1])),new Text(arr[0]));
}
}
static class PhoneBillReducer extends Reducer<DescIntWritable,Text,Text,DescIntWritable> {
@Override
protected void reduce(DescIntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value, key);
}
}
}}
4.在hdfs上查看结果
就一直往前走吧,别回头~