文章目录
- 一、实战概述
- 二、提出任务
- 三、完成任务
- (一)准备数据
- 1、在虚拟机上创建文本文件
- 2、上传文件到HDFS指定目录
- (二)实现步骤
- 1、创建Maven项目
- 2、添加相关依赖
- 3、创建日志属性文件
- 4、创建网址去重映射器类
- 5、创建网址去重归并器类
- 6、创建网址去重统计驱动器类
- 7、启动应用,查看结果
- 四、实战总结
一、实战概述
-
本实战项目主要利用Hadoop MapReduce框架对多个文本文件中的IP地址进行整合并去除重复项。首先,在虚拟机上创建了三个包含IP地址列表的文本文件(ips01.txt、ips02.txt、ips03.txt),并将这些文件上传至HDFS上的/deduplicate/input目录作为原始数据。
-
接着,通过IntelliJ IDEA创建了一个Maven项目MRDeduplicateIPs,并添加了hadoop-client和junit相关依赖。在项目中定义了三个关键类:DeduplicateIPsMapper、DeduplicateIPsReducer和DeduplicateIPsDriver。
-
DeduplicateIPsMapper类作为Map阶段的处理单元,读取每行输入文本数据(表示一个IP地址),将IP地址作为新的键输出,并使用NullWritable类型的空值,以准备后续去重操作。
-
DeduplicateIPsReducer类则负责Reduce阶段的逻辑,它接收Mapper阶段输出的所有具有相同IP地址的键值对,并通过不遍历值迭代器的方式实现键(即IP地址)的去重,确保每个唯一IP地址仅被写入一次。
-
最后,DeduplicateIPsDriver类作为整个任务的驱动程序,负责配置和启动MapReduce作业。它设置了作业的输入与输出路径、Mapper和Reducer类,以及它们的键值类型。作业完成后,该类会遍历输出目录下的文件,读取并打印去重后的IP地址列表到控制台。
-
通过运行DeduplicateIPsDriver类启动应用,最终实现了从多个文本文件中提取并整合出一份仅包含唯一IP地址的结果集。
二、提出任务
- 三个包含IP地址列表的文本文件(ips01.txt、ips02.txt、ips03.txt)
- ips01.txt
192.168.1.1
172.16.0.1
10.0.0.1
192.168.1.2
192.168.1.3
172.16.0.2
10.0.0.2
192.168.1.1
172.16.0.1
10.0.0.3
- ips02.txt
192.168.1.4
172.16.0.3
10.0.0.4
192.168.1.5
192.168.2.1
172.16.0.4
10.0.1.1
192.168.1.1
172.16.0.1
10.0.0.1
- ips03.txt
192.168.1.6
172.16.1.1
10.0.2.1
192.168.1.7
192.168.3.1
172.16.0.5
10.0.0.5
192.168.1.1
172.16.0.1
10.0.0.3
- 使用MR框架,实现网址去重
三、完成任务
(一)准备数据
1、在虚拟机上创建文本文件
- 在master虚拟机上使用文本编辑器创建三个文件:
ips01.txt
,ips02.txt
,ips03.txt
,并确保每个文件内存储的是纯文本格式的IP地址列表。
2、上传文件到HDFS指定目录
- 在master虚拟机上创建HDFS上的
/deduplicate/input
目录,用于存放待处理的原始数据文件。 - 执行命令:
hdfs dfs -mkdir -p /deduplicate/input
- 将本地创建的三个文本文件上传至HDFS的
/deduplicate/input
目录hdfs dfs -put ips01.txt /deduplicate/input/ hdfs dfs -put ips02.txt /deduplicate/input/ hdfs dfs -put ips03.txt /deduplicate/input/
- 执行上述命令
(二)实现步骤
- 说明:集成开发环境IntelliJ IDEA版本 -
2022.3
1、创建Maven项目
-
Maven项目 -
MRDeduplicateIPs
,设置了JDK版本 -1.8
,组标识 -net.huawei.mr
-
单击【Create】按钮,得到初始化项目
2、添加相关依赖
- 在
pom.xml
文件里添加hadoop-client
和junit
依赖
<dependencies>
<!--hadoop客户端-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.4</version>
</dependency>
<!--单元测试框架-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
</dependencies>
- 刷新项目依赖
3、创建日志属性文件
- 在
resources
目录里创建log4j.properties
文件
log4j.rootLogger=ERROR, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/deduplicateips.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
4、创建网址去重映射器类
- 创建
net.huawei.mr
包,在包里创建DeduplicateIPsMapper
类
package net.huawei.mr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 功能:网址去重映射器类
* 作者:华卫
* 日期:2024年01月05日
*/
public class DeduplicateIPsMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 获取行内容
String ip = value.toString();
// 将<ip,null>键值对写入中间结果
context.write(new Text(ip), NullWritable.get());
}
}
-
这段代码是Hadoop MapReduce编程框架中的一个Mapper类实现,名为
DeduplicateIPsMapper
,用于处理URL去重问题。虽然注释中提到的是“网址去重”,但实际代码逻辑仅针对IP地址进行操作。 -
在Map阶段,该类继承自
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, NullWritable>
-
输入键类型为
LongWritable
,通常表示文本行号; -
输入值类型为
Text
,存储一行原始数据(在这里应是IP地址); -
输出键类型为
Text
,用于输出去重后的IP地址; -
输出值类型为
NullWritable
,由于此处仅需去重并不需要具体值,所以使用空值。 -
map()
方法是Mapper的主体逻辑部分,在每次调用时接收一行输入数据(键和值)。它首先将输入值(即每行文本内容)转换成字符串类型的IP地址,然后将这个IP地址作为新的键输出,并与NullWritable类型的空值一起写入到中间结果中。通过这种方式,Map阶段结束后,相同的IP地址会被归并到一起,以便后续Reducer阶段进一步处理以达到去重的目的。
5、创建网址去重归并器类
- 在
net.huawei.mr
包里创建DeduplicateIPsReducer
package net.huawei.mr;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* 功能:网址去重归并器类
* 作者:华卫
* 日期:2024年01月05日
*/
public class DeduplicateIPsReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context)
throws IOException, InterruptedException {
// 不遍历值迭代器,就可以实现键去重
context.write(key, NullWritable.get());
}
}
-
这段代码是Hadoop MapReduce编程框架中的一个Reducer类实现,名为
DeduplicateIPsReducer
,用于处理URL去重问题。尽管注释中提到的是“网址去重”,但实际代码逻辑只针对IP地址进行操作。 -
在Reduce阶段,该类继承自
org.apache.hadoop.mapreduce.Reducer<Text, NullWritable, Text, NullWritable>
-
输入键类型为
Text
,存储Map阶段输出的去重后的IP地址; -
输入值类型为
Iterable<NullWritable>
,由于Mapper阶段输出的值为NullWritable,因此这里接收一组空值; -
输出键类型仍为
Text
,保持与Mapper阶段一致,输出去重后的唯一IP地址; -
输出值类型也仍为
NullWritable
,表示在这个任务中我们仅关注IP地址的去重,不需要额外信息。 -
reduce()
方法是Reducer的核心逻辑部分,在此场景下,当多个相同的IP地址(键)被归并到一起时,无需遍历值迭代器(因为所有值都是NullWritable的空值),只需将接收到的每个唯一的IP地址作为键输出即可,从而达到去除重复IP的目的。通过这种方式,Reduce阶段结束后,输出结果中每个IP地址都只出现一次。
6、创建网址去重统计驱动器类
- 在
net.huawei.mr
包里,创建DeduplicateIPsDriver
类
package net.huawei.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.net.URI;
/**
* 功能:网址去重驱动器类
* 作者:华卫
* 日期:2024年01月05日
*/
public class DeduplicateIPsDriver {
public static void main(String[] args) throws Exception {
// 创建配置对象
Configuration conf = new Configuration();
// 设置客户端使用数据节点主机名属性
conf.set("dfs.client.use.datanode.hostname", "true");
// 获取作业实例
Job job = Job.getInstance(conf);
// 设置作业启动类
job.setJarByClass(DeduplicateIPsDriver.class);
// 设置Mapper类
job.setMapperClass(DeduplicateIPsMapper.class);
// 设置map任务输出键类型
job.setMapOutputKeyClass(Text.class);
// 设置map任务输出值类型
job.setMapOutputValueClass(NullWritable.class);
// 设置Reducer类
job.setReducerClass(DeduplicateIPsReducer.class);
// 设置reduce任务输出键类型
job.setOutputKeyClass(Text.class);
// 设置reduce任务输出值类型
job.setOutputValueClass(NullWritable.class);
// 定义uri字符串
String uri = "hdfs://master:9000";
// 创建输入目录
Path inputPath = new Path(uri + "/deduplicate/input");
// 创建输出目录
Path outputPath = new Path(uri + "/deduplicate/output");
// 获取文件系统
FileSystem fs = FileSystem.get(new URI(uri), conf);
// 删除输出目录(第二个参数设置是否递归)
fs.delete(outputPath, true);
// 给作业添加输入目录(允许多个)
FileInputFormat.addInputPath(job, inputPath);
// 给作业设置输出目录(只能一个)
FileOutputFormat.setOutputPath(job, outputPath);
// 等待作业完成
job.waitForCompletion(true);
// 输出统计结果
System.out.println("======统计结果======");
FileStatus[] fileStatuses = fs.listStatus(outputPath);
for (int i = 1; i < fileStatuses.length; i++) {
// 输出结果文件路径
System.out.println(fileStatuses[i].getPath());
// 获取文件系统数据字节输入流
FSDataInputStream in = fs.open(fileStatuses[i].getPath());
// 将结果文件显示在控制台
IOUtils.copyBytes(in, System.out, 4096, false);
}
}
}
- 这段代码是Hadoop MapReduce框架下的一个驱动器类(Driver)实现,名为
DeduplicateIPsDriver
,用于处理URL去重问题。它主要负责设置MapReduce作业的相关配置信息,并启动整个作业流程。
-
首先创建一个Hadoop Configuration对象并设置相关属性,如“dfs.client.use.datanode.hostname”,以便正确连接到HDFS数据节点。
-
初始化Job实例,并通过
job.setJarByClass()
方法指定作业的主类(即该驱动器类),使得Hadoop能够找到运行作业所需的JAR包。 -
设置作业的Mapper和Reducer类分别为
DeduplicateIPsMapper
和DeduplicateIPsReducer
,同时设定它们的输入输出键值类型。 -
定义HDFS上输入与输出目录的URI路径,并使用FileSystem API获取文件系统实例,删除预先存在的输出目录以确保每次运行时结果都是新的。
-
将输入目录添加到作业中,设置唯一的输出目录。
-
调用
job.waitForCompletion(true)
方法启动并等待作业完成。 -
作业完成后,遍历输出目录下的所有文件(除成功标志文件外),打开每个文件并将其内容读取并打印到控制台,从而展示去重后的结果。
- 总之,此驱动器类将配置、初始化及执行一个完整的MapReduce作业,该作业的主要功能是对存储在HDFS上的IP地址进行去重处理。
7、启动应用,查看结果
- 运行
DeduplicateIPsDriver
类,查看结果
四、实战总结
- 本实战项目利用Hadoop MapReduce框架,通过自定义的DeduplicateIPsMapper和DeduplicateIPsReducer类处理三个文本文件中的IP地址数据。Mapper阶段读取每行IP并作为键输出,Reducer阶段对相同键(IP)进行归并去重。在DeduplicateIPsDriver驱动类中配置了作业属性、输入输出路径以及Map和Reduce阶段所使用的类,并成功执行了任务。最终,从原始文本数据中提取出一份不重复的IP地址集合。整个过程展示了MapReduce框架高效处理大规模数据集及实现特定业务逻辑的能力。