给定一个名为 friend.txt 的文件,
其中每一行中给出两个名字,中间用空格分开。(下图为文件内容)
题目:《查找出可能认识的人 》
代码如下:
RelationMapper:
package com.fesco.friend;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * First-stage mapper: turns each line of the friend list into a (person, friend) pair.
 *
 * <p>Each input line is expected to contain two names separated by whitespace. The first
 * name is written as the output key and the second name as the output value.
 */
public class RelationMapper extends Mapper<LongWritable, Text, Text, Text> {

    /**
     * Emits the (first name, second name) pair found on one input line.
     *
     * <p>Empty lines are ignored. Lines that carry fewer than two names are skipped and
     * counted under the {@code RelationMapper/MALFORMED_LINES} counter instead of failing
     * the task with an {@link ArrayIndexOutOfBoundsException}.
     *
     * @param key     input key supplied by the framework; not used
     * @param value   one raw line of the input file
     * @param context sink for the emitted pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            // Blank line (for example a trailing newline at the end of the file): nothing to emit.
            return;
        }
        // Split on any run of whitespace so repeated spaces or tabs never yield an empty name.
        String[] names = line.split("\\s+");
        if (names.length < 2) {
            // Only one name on the line: there is no relation to record, but make the skip visible.
            context.getCounter("RelationMapper", "MALFORMED_LINES").increment(1);
            return;
        }
        context.write(new Text(names[0]), new Text(names[1]));
    }
}
RelationReducer:
package com.fesco.friend;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
/**
 * First-stage reducer: receives one person together with the names listed as that person's
 * friends, and emits
 * <ul>
 *   <li>every (person, friend) pair with flag 1 (the two are directly listed as friends), and</li>
 *   <li>every pair of two different friends of that person with flag 0 (the two share this
 *       person as a common friend, so they might know each other).</li>
 * </ul>
 *
 * <p>Pair keys are written as {@code "a-b"} with the two names in {@link String#compareTo}
 * order, so the same pair always yields the same key no matter which side it was seen from.
 */
public class RelationReducer extends Reducer<Text, Text, Text, IntWritable> {
    // Flag for a pair that is directly listed as friends.
    private static final IntWritable TRUE_FRIEND = new IntWritable(1);
    // Flag for a pair that is only connected through a common friend.
    private static final IntWritable FAKE_FRIEND = new IntWritable(0);

    /**
     * Emits the confirmed and the inferred pairs for one person.
     *
     * @param key     the person, e.g. {@code tom}
     * @param values  the names listed as friends of that person, e.g. {@code rose jim smith lucy}
     * @param context sink for the emitted (pair, flag) records
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        String name = key.toString();
        // The values iterable can be traversed only once, so the friends are copied into a
        // list for the pairwise pass below. ArrayList keeps the indexed get() calls O(1).
        List<String> friends = new ArrayList<>();
        // Confirmed relations: the person and each listed friend.
        for (Text value : values) {
            String friend = value.toString();
            friends.add(friend);
            context.write(pairKey(name, friend), TRUE_FRIEND);
        }
        // Inferred relations: any two friends of the same person.
        for (int i = 0; i < friends.size() - 1; i++) {
            String first = friends.get(i);
            for (int j = i + 1; j < friends.size(); j++) {
                String second = friends.get(j);
                if (first.equals(second)) {
                    // The same friend listed twice must not be paired with themselves.
                    continue;
                }
                context.write(pairKey(first, second), FAKE_FRIEND);
            }
        }
    }

    /** Builds the {@code "a-b"} key for two names, smaller name (by compareTo) first. */
    private static Text pairKey(String a, String b) {
        if (a.compareTo(b) <= 0) {
            return new Text(a + "-" + b);
        }
        return new Text(b + "-" + a);
    }
}
RelationDriver:
package com.fesco.friend;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver for the first job: wires {@link RelationMapper} and {@link RelationReducer} together
 * and runs them over the friend list.
 *
 * <p>Usage: {@code RelationDriver [inputPath [outputPath]]}. When an argument is omitted the
 * corresponding built-in HDFS path is used.
 */
public class RelationDriver {
    // Paths used when no command-line arguments are supplied.
    private static final String DEFAULT_INPUT = "hdfs://10.16.3.181:9000/txt/friend.txt";
    private static final String DEFAULT_OUTPUT = "hdfs://10.16.3.181:9000/result/relation";

    /**
     * Configures and submits the job, then waits for it to finish.
     *
     * @param args optional input path and output path, in that order
     * @throws IOException            if the job cannot be set up or submitted
     * @throws InterruptedException   if the wait for completion is interrupted
     * @throws ClassNotFoundException if a job class cannot be resolved
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(RelationDriver.class);
        job.setMapperClass(RelationMapper.class);
        job.setReducerClass(RelationReducer.class);
        // Map output is (Text, Text); final output is (Text, IntWritable).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        // Surface a failed job through the process exit status instead of ignoring the result.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}
FriendMapper:
package com.fesco.friend;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Second-stage mapper: reads the tab-separated (pair, flag) lines and re-emits them as
 * (pair, numeric flag) so that all flags of one pair meet in the same reduce call.
 */
public class FriendMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /**
     * Splits one line at the tab character and writes the first field as key and the second
     * field, parsed as a long, as value.
     *
     * @param key     input key supplied by the framework; not used
     * @param value   one line of the form {@code pair<TAB>flag}
     * @param context sink for the emitted record
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        final String line = value.toString();
        // Field 0 is the pair key, field 1 is the relation flag.
        final String[] fields = line.split("\t");
        final Text pair = new Text(fields[0]);
        final LongWritable flag = new LongWritable(Long.parseLong(fields[1]));
        context.write(pair, flag);
    }
}
FriendReducer:
package com.fesco.friend;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Second-stage reducer: keeps only the pairs that were never flagged as direct friends.
 *
 * <p>A pair whose flags include a 1 is already acquainted and is dropped. A pair whose flags
 * are all 0 is not directly connected but shares at least one common friend, so it is
 * written out as a "might know each other" result.
 */
public class FriendReducer extends Reducer<Text, LongWritable, Text, Text> {

    /**
     * Writes the two names of the pair when none of its flags equals 1.
     *
     * @param key     the pair in {@code "a-b"} form
     * @param values  all flags collected for that pair
     * @param context sink for the emitted (name, name) record
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, Text>.Context context) throws IOException, InterruptedException {
        // A single 1 among the flags is enough to know the two are real friends.
        boolean alreadyFriends = false;
        for (LongWritable flag : values) {
            if (flag.get() == 1) {
                alreadyFriends = true;
                break;
            }
        }
        if (!alreadyFriends) {
            // Every flag was 0: split the pair key back into its two names and emit them.
            String[] names = key.toString().split("-");
            Text left = new Text(names[0]);
            Text right = new Text(names[1]);
            context.write(left, right);
        }
    }
}
FriendDriver:
package com.fesco.friend;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver for the second job: wires {@link FriendMapper} and {@link FriendReducer} together
 * and runs them over the output of the first job.
 *
 * <p>Usage: {@code FriendDriver [inputPath [outputPath]]}. When an argument is omitted the
 * corresponding built-in HDFS path is used.
 */
public class FriendDriver {
    // Paths used when no command-line arguments are supplied.
    private static final String DEFAULT_INPUT = "hdfs://10.16.3.181:9000/result/relation";
    private static final String DEFAULT_OUTPUT = "hdfs://10.16.3.181:9000/result/friend";

    /**
     * Configures and submits the job, then waits for it to finish.
     *
     * @param args optional input path and output path, in that order
     * @throws IOException            if the job cannot be set up or submitted
     * @throws InterruptedException   if the wait for completion is interrupted
     * @throws ClassNotFoundException if a job class cannot be resolved
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FriendDriver.class);
        job.setMapperClass(FriendMapper.class);
        job.setReducerClass(FriendReducer.class);
        // Map output is (Text, LongWritable); final output is (Text, Text).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        // Surface a failed job through the process exit status instead of ignoring the result.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}