Contents
- Word Count: Implementation Approach
- Word Count: Implementation Steps
- 1. Prepare the Data File
- 1.1 Create a Text File on the Virtual Machine
- 1.2 Upload the File to the Target HDFS Directory
- 2. Create a Maven Project
- 3. Add the Required Dependencies
- 4. Create the Logging Properties File
- 5. Create the Word Count Mapper Class
- 6. Create the Word Count Driver Class
- 7. Run the Word Count Driver Class and Check the Results
- 8. Modify the Word Count Mapper Class
- 9. Modify the Word Count Driver Class
- 10. Run the Word Count Driver Class and Check the Results
- 11. Create the Word Count Reducer Class
- 12. Modify the Word Count Driver Class
- 13. Run the Word Count Driver and Check the Results
- 14. Use Multiple Reduce Tasks for Aggregation
- 15. Combine the Three Classes into One to Complete the Word Count
Word Count: Implementation Approach
Word Count: Implementation Steps
1. Prepare the Data File
1.1 Create a Text File on the Virtual Machine
- Create a wordcount directory and create a words.txt file inside it, as sketched below
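A minimal shell sketch; the file content here is only illustrative sample data:
mkdir wordcount
cd wordcount
echo "hello hadoop world" > words.txt
echo "hello world hello hadoop" >> words.txt
cat words.txt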
1.2 Upload the File to the Target HDFS Directory
- Create the /wordcount/input directory by running:
hadoop fs -mkdir -p /wordcount/input
- Upload the text file words.txt to the /wordcount/input directory on HDFS
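One way to do this is with the standard HDFS put command:
hadoop fs -put words.txt /wordcount/input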
- Check the uploaded file in the Hadoop WebUI
2. Create a Maven Project
- Create a Maven project named MRWordCount
3. Add the Required Dependencies
- Add the hadoop and junit dependencies to the pom.xml file:
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.4</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
    </dependency>
</dependencies>
4. Create the Logging Properties File
- Create a log4j.properties file in the resources directory:
log4j.rootLogger=INFO, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/wordcount.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
5. Create the Word Count Mapper Class
- Create the net.kox.mr package and create a WordCountMapper class in it
- To get a better feel for what the Mapper class does, the map() function does not yet split each line into words; it simply uses the context to emit the key and value it receives.
package net.kox.mr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit the input key (line offset) and input value (line content) unchanged
        context.write(key, value);
    }
}
- Mapper type parameters: the generic signature is Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>. Here the input key type is LongWritable (the starting byte offset of the line) and the input value type is Text (the line content); since this mapper echoes its input, the output key and value types are the same as the input types.
6. Create the Word Count Driver Class
- Create a WordCountDriver class
- Write the program:
package net.kox.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Have the HDFS client address datanodes by hostname
        conf.set("dfs.client.use.datanode.hostname", "true");

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        String uri = "hdfs://master:9000";
        Path inputPath = new Path(uri + "/wordcount/input");
        Path outputPath = new Path(uri + "/wordcount/output");

        // Delete the output directory if it already exists, otherwise the job fails
        FileSystem fs = FileSystem.get(new URI(uri), conf);
        fs.delete(outputPath, true);

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        job.waitForCompletion(true);

        System.out.println("=== Results ===");
        FileStatus[] fileStatuses = fs.listStatus(outputPath);
        // Start at index 1 to skip the _SUCCESS marker file
        for (int i = 1; i < fileStatuses.length; i++) {
            System.out.println(fileStatuses[i].getPath());
            FSDataInputStream in = fs.open(fileStatuses[i].getPath());
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
    }
}
7. Run the Word Count Driver Class and Check the Results
- A large amount of log output appears before the actual results
- If you don't want to wade through all of that output before the results, edit log4j.properties and change INFO to ERROR
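The change is the single rootLogger line at the top of the file:
log4j.rootLogger=ERROR, stdout, logfile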
- Run the program again and check the results
- View the result files in the Hadoop WebUI
8. Modify the Word Count Mapper Class
- The line offset in the key is of no use for word counting. We only need each line's content, split into words on spaces, with each word counted as 1. WordCountMapper should therefore output a word and a count, so the output key type becomes Text and the output value type becomes IntWritable.
- Split each line into an array of words on spaces and emit a <word, 1> key-value pair for each word
- WordCountMapper.java:
package net.kox.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line into words on spaces and emit <word, 1> for each word
        String line = value.toString();
        String[] words = line.split(" ");
        for (int i = 0; i < words.length; i++) {
            context.write(new Text(words[i]), new IntWritable(1));
        }
    }
}
9. Modify the Word Count Driver Class
- Change the map output key/value types to match the new mapper, as shown in the sketch below
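The affected driver lines (the same calls appear in the combined class in step 15):
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);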
10. Run the Word Count Driver Class and Check the Results
- Note that the output comes out sorted by key: the framework sorts map output by key during the shuffle phase, before the reduce phase
11. Create the Word Count Reducer Class
- A class that extends Reducer becomes a Reducer component class
- The Reducer component receives the output of the Mapper component
- Its first type parameter corresponds to the Mapper's output key type
- Its second type parameter corresponds to the Mapper's output value type
- Its third and fourth type parameters are the Reducer's output key and output value types
- A Reducer component cannot exist on its own, but a Mapper component can
- Once a Reducer component is added, the content of the output files is the Reducer's output keys and values
- Create a WordCountReducer class
- Write the program:
package net.kox.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts for this word and emit <word, total>
        int count = 0;
        for (IntWritable value : values) {
            count = count + value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
12. Modify the Word Count Driver Class
- Set WordCountReducer as the reducer class and set the reduce task's output key/value types (these are the setOutputKeyClass/setOutputValueClass calls, which configure the reduce output; the setMapOutput* calls from step 9 stay as they are):
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
13. Run the Word Count Driver and Check the Results
- The number of occurrences of each word is now reported
(1) The MR framework has two core components: the Mapper component and the Reducer component
(2) Writing a class that extends Mapper turns it into a Mapper component class
(3) LongWritable, Text (String), IntWritable, and NullWritable are all Hadoop serialization types
(4) The Mapper component passes each line's starting offset to the programmer as the input key via map()
(5) The Mapper component passes each line's content to the programmer as the input value via map(); getting the input value is what matters
(6) The Mapper's first type parameter corresponds to the input key type and its second to the input value type (at the beginner stage, treat the first two as fixed)
(7) The files processed by the MR framework must live on HDFS
(8) How many times map() is called is determined by the number of lines in the file
(9) Results are emitted through the context, in the form of an output key and an output value
(10) The output key type is determined by the third type parameter, the output value type by the fourth
(11) The data and the number of lines in the output files are determined by the calls to context.write
(12) Text => String: value.toString()
(13) String => Text: new Text(stringVar)
(14) LongWritable => long: key.get()
(15) long => LongWritable: new LongWritable(longVar)
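A minimal self-contained sketch of the conversions in (12) through (15); the class name is just illustrative:
package net.kox.mr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableConversions {
    public static void main(String[] args) {
        Text text = new Text("hello");               // (13) String => Text
        String s = text.toString();                  // (12) Text => String
        LongWritable offset = new LongWritable(42L); // (15) long => LongWritable
        long l = offset.get();                       // (14) LongWritable => long
        System.out.println(s + " " + l);
    }
}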
14. Use Multiple Reduce Tasks for Aggregation
- Modify the word count driver class to set the number of reduce tasks, as shown below
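Each reduce task handles one partition of the map output and writes one result file; the relevant line (also used in the combined class in step 15) is:
job.setNumReduceTasks(3);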
- Run the program and check the results
- As expected, three result files are produced (part-r-00000 through part-r-00002), one per reduce task
15. Combine the Three Classes into One to Complete the Word Count
- Create a WordCount class
- Write the program:
package net.kox.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.net.URI;

public class WordCount extends Configured implements Tool {

    // Mapper: split each line into words and emit <word, 1>
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            for (int i = 0; i < words.length; i++) {
                // Strip punctuation characters before counting
                String word = words[i].replaceAll("[\\pP]", "");
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

    // Reducer: sum the counts for each word
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count = count + value.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCount.class);
        job.setMapperClass(Map.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        String uri = "hdfs://master:9000";
        Path inputPath = new Path(uri + "/wordcount/input");
        Path outputPath = new Path(uri + "/wordcount/result");

        // Three reduce tasks => three partitions => three output files
        job.setNumReduceTasks(3);

        // Delete the output directory if it already exists
        FileSystem fs = FileSystem.get(new URI(uri), conf);
        fs.delete(outputPath, true);

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        boolean res = job.waitForCompletion(true);

        System.out.println("Results:");
        FileStatus[] fileStatuses = fs.listStatus(outputPath);
        // Start at index 1 to skip the _SUCCESS marker file
        for (int i = 1; i < fileStatuses.length; i++) {
            System.out.println(fileStatuses[i].getPath());
            FSDataInputStream in = fs.open(fileStatuses[i].getPath());
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
        return res ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new WordCount(), args);
        System.exit(res);
    }
}
- Run the program and check the results