I. MapReduce Overview
MapReduce is a parallel computing framework that originated from Google's MapReduce paper. It is mainly used for offline computation over massive data sets.
- Pros: offline processing of massive data, simple development, easy deployment
- Cons: batch processing only; real-time computation is not supported
II. WordCount Example
1. Requirement
Count how many times each word appears in a file (the words on each line are separated by spaces). The data flow of a MapReduce WordCount is shown below:
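Before looking at the Hadoop version, the map/shuffle/reduce flow can be illustrated with a minimal plain-Java sketch (the class and method names here are made up for illustration and are not the Hadoop API):

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the WordCount data flow (illustrative only):
// the "map" step tokenizes each line, the "reduce" step sums per-word counts.
public class WordCountSketch {

    // map + shuffle + reduce collapsed into one in-memory pass
    public static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys, like reducer input
        for (String line : lines) {
            for (String word : line.split("\\s+")) {     // map: split the line into words
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum); // reduce: sum the counts per word
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] input = {"hello world", "name hello", "world"};
        System.out.println(count(input)); // prints {hello=2, name=1, world=2}
    }
}
```

The real job below performs the same steps, except that the map and reduce phases run as separate distributed tasks and the framework handles the shuffle and sort between them.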
2. Implementation
package mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountMR {

    // Mapper: emit (word, 1) for every word on the line
    static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // split on runs of whitespace so repeated spaces do not produce empty words
            String[] words = value.toString().split("\\s+");
            for (String word : words) {
                if (!word.isEmpty()) {
                    context.write(new Text(word), new IntWritable(1));
                }
            }
        }
    }

    // Reducer: sum the counts collected for each word
    static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountMR.class);
        job.setJobName("WordCount");
        // input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Reducer
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // number of reduce tasks (one output file per reducer)
        job.setNumReduceTasks(1);
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
Maven dependency:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.4</version>
</dependency>
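When the jar is submitted with `hadoop jar` on a cluster that already ships the Hadoop libraries on its classpath, the dependency can be marked `provided` so it is not bundled into the jar (whether this applies depends on your deployment; shown here as an assumption):

```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.4</version>
    <!-- assumption: the cluster supplies these classes at runtime -->
    <scope>provided</scope>
</dependency>
```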
3. Running the Job
A MapReduce program can run in either local mode or cluster mode.
Local mode: the job runs in a single JVM (Hadoop's LocalJobRunner), which is convenient for development and debugging.
Input file:
Result file:
Cluster mode: uses the compute resources of the cluster; this is the usual mode for production deployments.
Package the code into a jar and upload it to the cluster.
# Inspect the input file
[root@hadoop1 ~]# hdfs dfs -text /test/a.txt
hello world
name hello
world
# Submit the job
[root@hadoop1 ~]# hadoop jar learn-1.0-SNAPSHOT.jar mr.WordCountMR /test/a.txt /output
# Inspect the result file
[root@hadoop1 ~]# hdfs dfs -text /output/part-r-00000
hello 2
name 1
world 2