文章目录
- 一,案例分析
- (一)倒排索引介绍
- (二)案例需求
- 二,案例实施
- (一)准备数据文件
- (1)启动hadoop服务
- (2)虚拟机上创建文本文件
- (3)上传文件到HDFS指定目录
- (二)Map阶段实现
- (1)创建Maven项目:InvertedIndex
- (2)添加相关依赖
- (3)创建日志属性文件
- (4)创建倒排索引映射器类:InvertedIndexMapper
- (三)Combine阶段实现
- (1)创建倒排索引合并器类:InvertedIndexCombiner
- (四)Reduce阶段实现
- (1)创建倒排索引归并器类:InvertedIndexReducer
- (五)Driver主类实现
- (1)创建倒排索引驱动器类:InvertedIndexDriver
- (六)运行倒排索引驱动器类,查看结果
- (1)运行InvertedIndexDriver类
- (2)下载文件查看结果
一,案例分析
(一)倒排索引介绍
倒排索引是文档检索系统中最常用的数据结构,被广泛应用于全文搜索引擎。倒排索引主要用来存储某个单词(或词组)在一组文档中的存储位置的映射,提供了可以根据内容来查找文档的方式,而不是根据文档来确定内容,因此称为倒排索引(Inverted Index)。带有倒排索引的文件我们称为倒排索引文件,简称倒排文件(Inverted File)。
(二)案例需求
现假设有三个源文件file1.txt、file2.txt和file3.txt,需要使用倒排索引的方式对这三个源文件内容实现倒排索引,并将最后的倒排索引文件输出。
首先,使用默认的TextInputFormat类对每个输入文件进行处理,得到文本中每行的偏移量及其内容。Map过程首先分析输入的<key,value>键值对,经过处理可以得到倒排索引中需要的三个信息:单词、文档名称和词频。
经过Map阶段数据转换后,同一个文档中相同的单词会出现多个的情况,而单纯依靠后续Reduce阶段无法同时完成词频统计和生成文档列表,所以必须增加一个Combine阶段,先完成每一个文档的词频统计。
经过上述两个阶段的处理后,Reduce阶段只需将所有文件中相同key值的value值进行统计,并组合成倒排索引文件所需的格式即可。
二,案例实施
(一)准备数据文件
(1)启动hadoop服务
输入命令:start-all.sh
(2)虚拟机上创建文本文件
创建/export/mrtxt
目录,在里面创建三个文本文件 - file1.txt
、file2.txt
和file3.txt
(3)上传文件到HDFS指定目录
在hdfs上创建目录/mrtxt/input
,将三个文本文件 file1.txt
、file2.txt
、file3.txt
,上传到HDFS的/mrtxt/input
目录
在hadoop webui界面查看是否上传成功
(二)Map阶段实现
首先,使用IntelliJ开发工具创建Maven项目InvertedIndex
,并且新创建net.army.mr
包,在该路径下编写自定义Mapper类InvertedIndexMapper
,主要用于将文本中的单词按照空格进行切割,并以冒号拼接,“单词:文档名称”作为key
,单词次数作为value
,都以文本方式输出至Combine
阶段。
(1)创建Maven项目:InvertedIndex
1.设置如下图所示,单击【Create】按钮
(2)添加相关依赖
1.在pom.xml文件里添加hadoop和junit依赖,内容如下
<dependencies>
<!--hadoop客户端-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.4</version>
</dependency>
<!--单元测试框架-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
</dependencies>
2.添加好依赖后,单击刷新按钮下载相关依赖
3.下载好的本地依赖
(3)创建日志属性文件
1.右击【resources】,选择【New】,单击【Resource Bundle】
2.在弹出的对话框输入log4j,单击【OK】按钮
3.成功创建
4.向文件添加如下内容
log4j.rootLogger=INFO, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/invertedindex.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
(4)创建倒排索引映射器类:InvertedIndexMapper
1.右击【net.army.mr】包,选择【New】,单击【Java Class】
2.在弹出的对话框中输入:InvertedIndexMapper,按下回车键,成功创建
3.编写代码
package net.army.mr;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
/**
* 作者:梁辰兴
* 日期:2022/12/14
* 功能:倒排索引映射器类
*/
public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
private static Text keyInfo = new Text(); // 存储单词和URL组合
private static final Text valueInfo = new Text("1"); // 存储词频,初始化为1
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 获取文件行数据
String line = value.toString();
// 拆分得到单词数组
String[] words = StringUtils.split(line, " ");
// 得到这行数据所在的文件切片
FileSplit fileSplit = (FileSplit) context.getInputSplit();
// 根据文件切片得到文件名
String fileName = fileSplit.getPath().getName();
for (String word : words) {
// key值由单词和URL组成,如“MapReduce:file1.txt”
keyInfo.set(word + ":" + fileName);
// 将键值对数据传入下一个阶段
context.write(keyInfo, valueInfo);
}
}
}
(三)Combine阶段实现
根据Map阶段的输出结果形式,在net.army.mr包下,自定义实现Combine阶段的类InvertedIndexCombiner,对每个文档的单词进行词频统计。
(1)创建倒排索引合并器类:InvertedIndexCombiner
1.右击【net.army.mr】包,选择【New】,单击【Java Class】
2.在弹出的对话框中输入:InvertedIndexCombiner,按下回车键,成功创建
3.编写代码
package net.army.mr;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* 作者:梁辰兴
* 日期:2022/12/14
* 功能:倒排索引合并器类
*/
public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
private static Text info = new Text();
// 输入: <MapReduce:file3.txt {1,1,...}>
// 输出: <MapReduce file3.txt:2>
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 统计词频
int sum = 0;
for (Text value : values) {
sum += Integer.parseInt(value.toString());
}
// 获取分隔符冒号的位置
int splitIndex = key.toString().indexOf(":");
// 重新设置value值由URL和词频组成
info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
// 重新设置key值为单词
key.set(key.toString().substring(0, splitIndex));
// 将键值对数据传入下一个阶段
context.write(key, info);
}
}
(四)Reduce阶段实现
根据Combine阶段的输出结果形式,同样在net.army.mr包下,自定义Reducer类InvertedIndexMapper,主要用于接收Combine阶段输出的数据,并最终案例倒排索引文件需求的样式,将单词作为key,多个文档名称和词频连接作为value,输出到目标目录。
(1)创建倒排索引归并器类:InvertedIndexReducer
1.右击【net.army.mr】包,选择【New】,单击【Java Class】
2.在弹出的对话框中输入:InvertedIndexReducer,按下回车键,成功创建
3.编写代码
package net.army.mr;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* 作者:梁辰兴
* 日期:2022/12/14
* 功能:倒排索引归并器类
*/
public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
private static Text result = new Text();
// 输入:<MapReduce file3.txt:2>
// 输出:<MapReduce file1.txt:1;file2.txt:1;file3.txt:2;>
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 生成文档列表
String fileList = new String();
for (Text value : values) {
fileList += value.toString() + ";";
}
// 设置结果数据
result.set(fileList);
// 将键值对数据输出
context.write(key, result);
}
}
(五)Driver主类实现
编写MapReduce程序运行主类InvertedIndexDriver,主要用于设置MapReduce工作任务的相关参数,对HDFS上/mrtxt/input目录下的源文件实现倒排索引,并将结果输入到HDFS的/mrtxt/output目录下。
(1)创建倒排索引驱动器类:InvertedIndexDriver
1.右击【net.army.mr】包,选择【New】,单击【Java Class】
2.在弹出的对话框中输入:InvertedIndexDriver,按下回车键,成功创建
3.编写代码
package net.army.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.net.URI;
/**
* 作者:梁辰兴
* 日期:2022/12/14
* 功能:倒排索引驱动器类
*/
public class InvertedIndexDriver {
public static void main(String[] args) throws Exception {
// 创建配置对象
Configuration conf = new Configuration();
// 设置数据节点主机名属性
conf.set("dfs.client.use.datanode.hostname", "true");
// 获取作业实例
Job job = Job.getInstance(conf);
// 设置作业启动类
job.setJarByClass(InvertedIndexDriver.class);
// 设置Mapper类
job.setMapperClass(InvertedIndexMapper.class);
// 设置map任务输出键类型
job.setMapOutputKeyClass(Text.class);
// 设置map任务输出值类型
job.setMapOutputValueClass(Text.class);
// 设置Combiner类
job.setCombinerClass(InvertedIndexCombiner.class);
// 设置Reducer类
job.setReducerClass(InvertedIndexReducer.class);
// 设置reduce任务输出键类型
job.setOutputKeyClass(Text.class);
// 设置reduce任务输出值类型
job.setOutputValueClass(Text.class);
// 定义uri字符串
String uri = "hdfs://master:9000";
// 创建输入目录
Path inputPath = new Path(uri + "/mrtxt/input");
// 创建输出目录
Path outputPath = new Path(uri + "/mrtxt/output");
// 获取文件系统
FileSystem fs = FileSystem.get(new URI(uri), conf);
// 删除输出目录
fs.delete(outputPath, true);
// 给作业添加输入目录
FileInputFormat.addInputPath(job, inputPath);
// 给作业设置输出目录
FileOutputFormat.setOutputPath(job, outputPath);
// 等待作业完成
job.waitForCompletion(true);
// 输出统计结果
System.out.println("======统计结果======");
FileStatus[] fileStatuses = fs.listStatus(outputPath);
for (int i = 1; i < fileStatuses.length; i++) {
// 输出结果文件路径
System.out.println(fileStatuses[i].getPath());
// 获取文件输入流
FSDataInputStream in = fs.open(fileStatuses[i].getPath());
// 将结果文件显示在控制台
IOUtils.copyBytes(in, System.out, 4096, false);
}
}
}
(六)运行倒排索引驱动器类,查看结果
(1)运行InvertedIndexDriver类
(2)下载文件查看结果
注:事先进入mrtxt目录下,输入命令:
cd /export/mrtxt
1.下载文件,输入命令:hdfs dfs -get /mrtxt/output/part-r-00000
2.查看文件,输入命令:cat part-r-00000