MapReduce's default output format is TextOutputFormat, whose parent class is FileOutputFormat: it writes each record as a line of text into a plain text file. That does not cover every need we run into in real development, so sometimes we have to define our own OutputFormat.
Custom OutputFormat
A custom OutputFormat lets a job write its results to storage systems such as MySQL, HBase, or Elasticsearch.
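As a rough sketch of that idea (separate from the log example below; the JDBC URL, the table name weblog, and the column site are made up purely for illustration), a RecordWriter could push each record into MySQL:

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Hypothetical sketch only: inserts each (Text, NullWritable) record into a MySQL table
public class JdbcRecordWriter extends RecordWriter<Text, NullWritable> {
    private final Connection conn;
    private final PreparedStatement stmt;

    public JdbcRecordWriter(String jdbcUrl, String user, String password) throws SQLException {
        conn = DriverManager.getConnection(jdbcUrl, user, password);
        // Table and column names are placeholders
        stmt = conn.prepareStatement("INSERT INTO weblog (site) VALUES (?)");
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException {
        try {
            stmt.setString(1, key.toString());
            stmt.executeUpdate();
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
        try {
            stmt.close();
            conn.close();
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }
}

A matching OutputFormat would build this writer inside getRecordWriter, exactly as MyOutputFormat does for the file-based writer later in this post.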
Steps
Extend FileOutputFormat and override getRecordWriter to return a custom RecordWriter
Extend the RecordWriter class
Override its write (and close) methods; a minimal skeleton is shown below
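A minimal skeleton of those pieces, using the same class names as the full example that follows (each class goes in its own source file):

public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) {
        // Hand back the custom writer
        return new MyRecordWriter(job);
    }
}

public class MyRecordWriter extends RecordWriter<Text, NullWritable> {
    @Override
    public void write(Text key, NullWritable value) {
        // Decide where and how each record is written
    }

    @Override
    public void close(TaskAttemptContext context) {
        // Release any streams or connections the writer opened
    }
}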
Example
Requirement
Filter a log file: write websites whose URL contains sxau to sxau.log, and all others to other.log.
Input data
www.baidu.com
www.google.com
www.sxau.com
www.jd.com
www.bing.com
www.sina.com
www.csdn.com
www.github.com
LogMapper class
Passes each input line straight through as the output key.
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Emit the whole line as the key; nothing needs to be processed here
        context.write(value, NullWritable.get());
    }
}
LogReducer class
Loops over the values so that duplicate lines are not lost.
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // The whole line is the key, so identical lines arrive as multiple values;
        // write the key once per value so duplicates are not silently collapsed
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}
MyOutputFormat class
Overrides getRecordWriter to return our custom RecordWriter.
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        // Pass the job context in so the writer can reach the configuration and open its streams
        return new MyRecordWriter(job);
    }
}
MyRecordWriter class
The core of the custom format: its write method decides which file each record goes to.
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MyRecordWriter extends RecordWriter<Text, NullWritable> {
    private FSDataOutputStream sxauOut;
    private FSDataOutputStream otherOut;

    public MyRecordWriter(TaskAttemptContext job) {
        // Open one output stream per target file
        try {
            FileSystem fs = FileSystem.get(job.getConfiguration());
            // Output paths for the two log files
            sxauOut = fs.create(new Path("D:\\MapReduce_Data_Test\\myoutputformat\\sxau.log"));
            otherOut = fs.create(new Path("D:\\MapReduce_Data_Test\\myoutputformat\\other.log"));
        } catch (IOException e) {
            // Fail fast: if the streams cannot be opened, later writes would only throw an NPE
            throw new RuntimeException(e);
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String log = key.toString();
        System.out.println(log); // debug output
        // Route each record to the matching stream
        if (log.contains("sxau")) {
            sxauOut.writeBytes(log + "\n");
        } else {
            otherOut.writeBytes(log + "\n");
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Close both streams
        IOUtils.closeStream(sxauOut);
        IOUtils.closeStream(otherOut);
    }
}
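One caveat: FSDataOutputStream.writeBytes keeps only the low byte of each character, which is fine for the ASCII URLs in this example but would mangle non-ASCII lines. If that matters for your data, a variant of write that encodes explicitly (a sketch, not required here) could look like this:

@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
    String log = key.toString();
    // Encode explicitly instead of relying on writeBytes' low-byte truncation
    byte[] bytes = (log + "\n").getBytes(java.nio.charset.StandardCharsets.UTF_8);
    if (log.contains("sxau")) {
        sxauOut.write(bytes);
    } else {
        otherOut.write(bytes);
    }
}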
LogRunner class
The driver: wires up the job and registers the custom output format.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LogRunner extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new LogRunner(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. Get the job, reusing the configuration injected by ToolRunner
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "my outputFormat");
        // 2. Set the jar, located via the driver class
        job.setJarByClass(LogRunner.class);
        // 3. Attach the mapper and reducer
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        // 4. Set the map and reduce output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Register the custom output format
        job.setOutputFormatClass(MyOutputFormat.class);
        // 5. Input path (hard-coded here; it could just as well come from args)
        FileInputFormat.setInputPaths(job, new Path("D:\\MapReduce_Data_Test\\myoutputformat\\input"));
        // 6. Output path: still required even though MyRecordWriter writes sxau.log and other.log itself,
        //    because FileOutputFormat checks for and uses this directory (e.g. for the _SUCCESS marker)
        FileOutputFormat.setOutputPath(job, new Path("D:\\MapReduce_Data_Test\\myoutputformat\\output1"));
        // verbose = true: print the job's progress while waiting for completion
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
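With the input above, and remembering that the shuffle delivers keys to the reducer in sorted order, the two files written by MyRecordWriter should come out as follows, while the output1 directory itself ends up holding only the _SUCCESS marker:

sxau.log:
www.sxau.com

other.log:
www.baidu.com
www.bing.com
www.csdn.com
www.github.com
www.google.com
www.jd.com
www.sina.com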