说明
以下内容仅供参考,提到不代表考到,请结合实际情况自己复习
目录
说明
一、题型及分值
二、综合案例题-部署Hadoop集群 或 部署Hadoop HA集群
案例 1:Hadoop 基础集群部署
案例 2:Hadoop HA 集群部署
案例 3:集群性能优化
案例 4:故障排查与恢复
案例 5:多集群协同
三、名称解释(8选5)
1.什么是大数据
2.大数据的5V特征
3.什么是SSH
4.HDFS(p32)
5.名称节点
6.数据节点
7.元数据
8.倒排索引
9.单点故障
10.高可用
11.数据仓库
四、简答题
1、简述Hadoop的优点及其含义
2.简述独立模式、伪分布式模式和完全分布式模式部署Hadoop的区别
3.简述HDFS的健壮性
4.简述YARN基本架构的组成部分及其作用
5.简述不同类型ZNode的区别
6.简述Hadoop高可用集群初次启动时的步骤
7.简述Hive中分区和桶的作用
五、Hive代码题
题目: 电商订单分析
答案
六、HDFS代码题
课本资料
相关题目
题目 1: 创建目录并上传文件
题目 2: 查看文件信息
题目 3: 数据移动与复制
题目 4: 删除操作
题目 5: 文件权限设置
题目 6: 文件备份与验证
七、MapReduce编程
题目 1: 单词计数 (Word Count)
题目 2: 最大值求解 (Max Value Finder)
题目 3: 平均值计算 (Average Calculation)
题目 4: Top K 单词统计
思路
题目 5: 日志分析
题目 6: 用户购买分析
题目 7: 倒排索引 (Inverted Index)
题目 8: 用户商品共现分析
题目 9: 数据去重
题目 10: 分组统计 (Group By)
一、题型及分值
1.综合案例题(35分)
2.名词解释(每个3分,共15分)
3.简答题(每题6分,共30分)
4.编程题(共3题,共20分)
二、综合案例题-部署Hadoop集群 或 部署Hadoop HA集群
这部分内容建议观看课本
tar -zxvf jdk-8u... -C /export/...
在Linux系统中,tar命令用于打包和解包文件。以下是tar命令中 -zxvf 和 -C 选项的含义:
-z:这个选项表示同时通过gzip进行压缩或解压缩。如果.tar文件实际上是一个.tar.gz或.tgz文件,这个选项是必要的。
-x:这个选项代表解包(extract)一个.tar文件。
-v:这个选项用于在处理文件时显示详细信息(verbose),它会列出正在处理的文件,这样用户可以看到解压的进度和具体内容。
-f:这个选项用于指定要处理的文件名。在tar命令中,-f通常是最后一个选项,并且后面直接跟着要操作的文件名。
所以,-zxvf组合起来就是告诉tar命令:解压缩一个用gzip压缩的.tar.gz文件,并在处理过程中显示详细信息。
-C:这个选项告诉tar命令在指定的目录中解包文件。在上面的命令中,-C /export/…表示将文件解压到/export/…这个目录下。
综上所述,tar -zxvf jdk-8u... -C /export/…命令的作用是:以详细方式解压名为jdk-8u...的gzip压缩的tar文件,并将其内容解压到/export/…目录中。注意,jdk-8u...是文件名的占位符,你需要替换为实际的文件名。
# 验证Hadoop是否安装成功
bin/hadoop version
# 启动Hadoop
start-dfs.sh
# 启动yarn
start-yarn.sh
# 查看hadoop运行状态
jps
# 查看jdk是否安装成功
java -version
案例 1:Hadoop 基础集群部署
背景: 某公司计划部署一个基本的 Hadoop 集群,要求包含一个 NameNode 和多个 DataNode。公司要求能够顺利存储和处理 10 TB 的数据。
问题:
-
设计集群架构,明确节点的数量和角色分配。
-
描述如何配置
core-site.xml
和hdfs-site.xml
,确保集群可以正常启动并运行。 -
为了实现 MapReduce 作业,如何配置
mapred-site.xml
和yarn-site.xml
? -
如果添加一个新的 DataNode,该如何操作?
答案:
-
设计集群架构:
-
1 个 NameNode,2-3 个 DataNode(视服务器性能与数据量调整)。
-
每个节点运行操作系统(推荐 Linux)。
-
Master 节点运行 NameNode 和 ResourceManager,Slave 节点运行 DataNode 和 NodeManager。
-
-
配置
core-site.xml
和hdfs-site.xml
:
配置 mapred-site.xml
和 yarn-site.xml
:
-
添加新的 DataNode 操作:
-
安装 Hadoop,配置
core-site.xml
和hdfs-site.xml
指向 NameNode 的地址。 -
格式化 DataNode 数据目录:
hdfs datanode -format
。 -
启动 DataNode 服务:
hadoop-daemon.sh start datanode
。
-
案例 2:Hadoop HA 集群部署
背景: 为了提高可靠性,公司计划将现有的单点 NameNode 改为高可用(HA)模式。集群中有两台主机分别作为 Active NameNode 和 Standby NameNode,并通过 ZooKeeper 进行管理。
问题:
-
描述如何配置 HDFS 的 HA 功能,包括必要的配置文件和关键参数。
-
配置 JournalNode,解释其作用及最低运行数量要求。
-
如果 Active NameNode 宕机,系统如何实现自动切换到 Standby NameNode?需要哪些关键组件?
-
模拟 NameNode 切换的过程,验证 HA 功能是否正常运行
案例 3:集群性能优化
背景: 集群在运行大规模作业时,经常出现作业延迟或失败。系统管理员需要优化集群性能。
问题:
-
分析可能导致作业延迟的原因(从硬件、网络、配置等角度)。
-
提出三种优化 HDFS 性能的策略(例如 Block 大小调整)。
-
YARN 中如何配置以确保资源分配更加均衡?
-
描述如何使用 Ganglia 或 Prometheus 监控 Hadoop 集群性能。
案例 4:故障排查与恢复
背景: 运行中的 Hadoop 集群出现以下问题:
-
某些 DataNode 状态为
Dead
。 -
Active NameNode 停止响应。
-
某些作业无法正常调度。
问题:
-
针对 DataNode 状态为
Dead
的问题,描述可能的原因和修复方法。 -
Active NameNode 停止响应时,如何手动切换到 Standby NameNode?
-
某些作业无法调度,如何排查和解决 YARN 资源不足的问题?
-
描述如何通过配置快照和备份机制实现 HDFS 数据恢复。
案例 5:多集群协同
背景: 公司计划在两个地理位置上分别部署 Hadoop 集群 A 和 B,并希望能够实现跨集群的数据同步。
问题:
-
描述如何配置 HDFS Federation 实现多集群协同。
-
如果需要在集群 A 和集群 B 之间同步数据,如何利用
DistCp
工具完成? -
为了提高同步效率,可以采取哪些优化措施?
-
讨论跨集群数据传输时的安全性考虑及配置(例如数据加密)。
三、名称解释(8选5)
1.什么是大数据
大数据是指海量、多样、快速流转且价值密度低的数据集合,其核心价值在于通过先进技术加工处理,实现数据增值。
2.大数据的5V特征
大数据的特征包括大量(Volume)、真实(Veracity)、多样(Variety)、低价值密度(Value)和高速(Velocity)
3.什么是SSH
SSH是一种网络协议,主要用于在不安全网络上提供安全的远程登录和其他安全网络服务。它能够加密网络连接,确保在客户端和服务器之间传输的数据不会轻易被窃取或篡改。
4.HDFS(p32)
HDFS是Hadoop Distributed File System的缩写,中文称为Hadoop分布式文件系统,专为大规模数据集的处理而设计,主要用于存储和管理海量数据文件。
5.名称节点
课本解释:NameNode是HDFS集群的名称节点,通常称为主节点。如果NameNode由于故障原因而无法使用,那么用户就无法访问HDFS。也就是说,NameNode作为HDFS的主节点,起着至关重要的部分
gpt解释:名称节点(NameNode)是Hadoop分布式文件系统(HDFS)中的核心组件之一,主要负责存储文件的元数据信息和处理客户端对文件的访问请求
6.数据节点
DataNode是HDFS集群中的数据节点,通常称为从节点,其主要功能如下:
-
存储Block
-
根据NameNode的指令对Block进行创建、复制、删除等操作
-
定期向NameNode汇报自身存储的Block列表及健康状态
-
负责为客户端发起的读写请求提供服务
7.元数据
MetaData用于记录HDFS文件系统的相关信息,这些信息称为元数据,元数据的内容包括文件系统的目录结构、文件名、文件路径、文件大小、文件副本数、文件与Block的映射关系,以及Block与DataNode的映射关系等信息
8.倒排索引
倒排索引是文档检索系统中最常用的数据结构,被广泛应用于全文搜索引擎。倒排索引主要用来存储某个单词或词组在一组文档中的存储位置的映射,提供了可以根据内容查找文档的方式,而不是根据文档确定内容,因此称为倒排索引。
9.单点故障
在HDFS集群中,NameNode是主节点,它的运行状态决定着HDFS集群是否可用。然而在Hadoop设计之初,HDFS集群只能存在一个NameNode节点,这种设计的缺点是NameNode节点一旦发生故障,就会导致HDFS集群不可用,这就是所谓的单点故障问题
10.高可用
Hadoop通过在HDFS集群中配置多个NameNode(一个Active,多个Standby)来确保系统连续运行,当Active NameNode故障时,自动选举新的Active NameNode,防止单点故障。
11.数据仓库
数据仓库是一个面向主题、集成的、相对稳定和反映历史变化的数据集合,用于企业或组织的决策分析。
四、简答题
有的来自于书本,有的来自于AI(因为书本内容过多)
1、简述Hadoop的优点及其含义
(1)低成本,可用多台廉价机组建集群,分布式处理大数据,降低成本。
(2)高可靠性,自动保存数据副本,避免数据丢失。
(3)高容错性,自动检测并应对故障,通过任务转移,防止任务失败。
(4)高效率,Hadoop可高效的执行并行计算,且在各个计算机中动态地移动计算。
(5)高扩展性,可随时添加更多的计算机,增加集群存储,计算能力。
2.简述独立模式、伪分布式模式和完全分布式模式部署Hadoop的区别
(1)独立模式:本地独立模式不进行任何配置,是Hadoop的默认工作模式,所有组件都在同一台机器运行,适用于学习和体验。
(2)伪分布模式:也是在一台单机上运行,通过单节点模拟分布式,但部署的Hadoop集群是一个伪分布式系统,适合本地开发和验证。
(3)完全分布模式:是一种在多台计算机JVM进程中运行Hadoop集群的工作模式,所有组件分布在多台机器上,部署的集群是完全分布式系统,适用于生产环境。
3.简述HDFS的健壮性
其健壮性可表现为:在HDFS出现故障的情况下可靠的存储数据,其运用了心跳机制、副本机制、数据完整性校验、安全模式和快照 5 种策略保证了数据存储的可靠性
4.简述YARN基本架构的组成部分及其作用
YARN 基本架构由 ResourceManager、ApplicationMaster、NodeManager 和 Container 组成,其中,ResourceManager 为全局资源管理器,负责整个系统的资源管理和分配;ApplicationMaster每个应用程序特有的,负责单个应用程序的管理;NodeManager 负责在节点上启动和管理;Container(容器);Container 封装了每个应用程序使用的资源。
5.简述不同类型ZNode的区别
ZooKeeper中的ZNode类型主要有以下区别:
-
持久节点:除非手动删除,否则一直存在。
-
临时节点:随客户端会话结束而自动删除,不能有子节点。
-
顺序节点:在持久或临时节点基础上,创建时带唯一递增序号,用于记录创建顺序。
6.简述Hadoop高可用集群初次启动时的步骤
1.启动JournalNode
hdfs -- daemon start journalnode
2.格式化HDFS文件系统
hdfs namenode -format
3.同步NameNode
scp -r /export/data/hadoop/namenode/ hadoop2:/export/data/hadoop/
4.格式化ZKFC
hdfs zkfc -formatZK
5.启动HDFS
start-dfs.sh
6.启动YARN
start-yarn.sh
7.简述Hive中分区和桶的作用
分区:将表数据按规则划分存储在不同目录,减少全表扫描,提高查询效率。
桶:按规则将数据均匀分布在不同文件中,避免数据倾斜,优化查询性能。
五、Hive代码题
Hive实践作业三
7.5 数据库操作
7.6 表操作
1.在hadoop1中执行start-dfs.sh和start-yarn.sh分别启动hdfs和yarn,
保证hadoop完全分布式集群正常启动,
hadoop1 jps NameNode ResourceManager
hadoop2 jps NodeManager DataNode SecondaryNameNode
hadoop3 jps NodeManager DataNode
2.在hadoop3中执行systemctl status mysqld验证mysql80正常启动
3.在hadoop3中启动MetaStore服务,hive --service metastore
4.hadoop3复制会话,启动HiveServer2服务,hive --service hiveserver2
5.hadoop3再次复制会话,jps 多了两个RunJar进程,元数据存储系统和HiveServer2正常启动
6.在hadoop2中,执行 hive 登录
7.hadoop2复制会话,执行 beeline -u jdbc:hive2://hadoop3:10000 -n root 登录
8.在登录的hive中,输入:show databases;
9.数据库操作
创建数据库 create database homework;
查看数据库 describe homework;
切换数据库 use homework;
10.表操作
complex表的创建
create table complex(
col1 array<int>,
col2 map<int,string>,
col3 struct<a:string,b:int,c:double>,
col4 string,
col5 int
);
complex表的查看
desc complex;
user_p分区表的创建
create table user_p (id int, name string)
partitioned by (gender string)
row format delimited fields terminated by ',';
user_p分区表的查看
desc user_p;
array_test内部表的创建
create table array_test(
name string,
score array<int>
)
row format delimited fields terminated by '@'
collection items terminated by ',';
array_test内部表导入数据
zhangshan@89,88,97,80
lisi@90,95,99,97
wangwu@90,77,88,79
zhaoliu@91,79,98,89
文本文件array_test.txt需要先创建,在hadoop3上哦
load data local inpath '/root/array_test.txt' into table homework.array_test;
array_test内部表查询数据
select * from homework.array_test;
map_test内部表的创建
create table map_test(
name string,
score map<string,int>
)
row format delimited fields terminated by '@'
collection items terminated by ','
map keys terminated by ':';
map_test内部表导入数据
zhangshan@math:90,english:89,java:88,hive:80
lisi@math:98,english:79,java:96,hive:92
wangwu@math:88,english:86,java:89,hive:88
zhaoliu@math:89,english:78,java:79,hive:77
文本文件map_test.txt需要先创建,在hadoop3上哦
load data local inpath '/root/map_test.txt' into table homework.map_test;
map_test内部表查询数据
select * from homework.map_test;
select name from homework.map_test;
select score from homework.map_test;
题目: 电商订单分析
某电商公司需要构建订单管理系统,并在 Hive 中完成以下任务:
数据描述
订单数据由以下信息组成:
-
order_id
(订单ID, INT) -
customer_id
(客户ID, INT) -
order_date
(订单日期, STRING,格式:yyyy-MM-dd) -
order_items
(订单商品,ARRAY<STRUCT<item_id: INT, item_name: STRING, quantity: INT, price: FLOAT>>) -
order_details
(订单详情,MAP<STRING, STRING>,包含键值对如payment_method -> credit_card
,delivery_status -> delivered
)
任务要求:
任务 1: 内部表创建
创建一个内部表 orders_internal
,用于存储上述订单数据。
任务 2: 外部表创建
创建一个外部表 orders_external
,数据存储在 HDFS 的 /data/orders_external/
目录下。
任务 3: 分区表创建
创建一个按 order_date
分区的表 orders_partitioned
,优化按日期范围查询的性能。
任务 4: 数据插入
为三个表分别插入以下样例数据:
订单1:
order_id: 101, customer_id: 1, order_date: '2024-12-01'
order_items: [
{item_id: 201, item_name: 'Laptop', quantity: 1, price: 1000.0},
{item_id: 202, item_name: 'Mouse', quantity: 2, price: 25.0}
]
order_details: {'payment_method': 'credit_card', 'delivery_status': 'delivered'}
订单2:
order_id: 102, customer_id: 2, order_date: '2024-12-02'
order_items: [
{item_id: 203, item_name: 'Keyboard', quantity: 1, price: 50.0}
]
order_details: {'payment_method': 'paypal', 'delivery_status': 'shipped'}
任务 5: 查询操作
-
查询所有已完成(
delivery_status = delivered
)订单的客户ID及商品明细。 -
查询每个客户的总消费金额。
答案
任务 1: 内部表创建
CREATE TABLE orders_internal (
order_id INT,
customer_id INT,
order_date STRING,
order_items ARRAY<STRUCT<item_id: INT, item_name: STRING, quantity: INT, price: FLOAT>>,
order_details MAP<STRING, STRING>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':';
任务 2: 外部表创建
CREATE EXTERNAL TABLE orders_external (
order_id INT,
customer_id INT,
order_date STRING,
order_items ARRAY<STRUCT<item_id: INT, item_name: STRING, quantity: INT, price: FLOAT>>,
order_details MAP<STRING, STRING>
)
STORED AS TEXTFILE
LOCATION '/data/orders_external/';
任务 3: 分区表创建
CREATE TABLE orders_partitioned (
order_id INT,
customer_id INT,
order_items ARRAY<STRUCT<item_id: INT, item_name: STRING, quantity: INT, price: FLOAT>>,
order_details MAP<STRING, STRING>
)
PARTITIONED BY (order_date STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':';
任务 4: 数据插入
插入内部表:
INSERT INTO TABLE orders_internal VALUES
(101, 1, '2024-12-01',
ARRAY(
NAMED_STRUCT('item_id', 201, 'item_name', 'Laptop', 'quantity', 1, 'price', 1000.0),
NAMED_STRUCT('item_id', 202, 'item_name', 'Mouse', 'quantity', 2, 'price', 25.0)
),
MAP('payment_method', 'credit_card', 'delivery_status', 'delivered')
),
(102, 2, '2024-12-02',
ARRAY(
NAMED_STRUCT('item_id', 203, 'item_name', 'Keyboard', 'quantity', 1, 'price', 50.0)
),
MAP('payment_method', 'paypal', 'delivery_status', 'shipped')
);
插入外部表: 将数据手动存储在 /data/orders_external/
,格式如下:
101 1 2024-12-01 [{"item_id":201,"item_name":"Laptop","quantity":1,"price":1000.0},{"item_id":202,"item_name":"Mouse","quantity":2,"price":25.0}] {"payment_method":"credit_card","delivery_status":"delivered"}
102 2 2024-12-02 [{"item_id":203,"item_name":"Keyboard","quantity":1,"price":50.0}] {"payment_method":"paypal","delivery_status":"shipped"}
加载数据后,直接查询外部表。
插入分区表:
INSERT INTO TABLE orders_partitioned PARTITION(order_date='2024-12-01') VALUES
(101, 1,
ARRAY(
NAMED_STRUCT('item_id', 201, 'item_name', 'Laptop', 'quantity', 1, 'price', 1000.0),
NAMED_STRUCT('item_id', 202, 'item_name', 'Mouse', 'quantity', 2, 'price', 25.0)
),
MAP('payment_method', 'credit_card', 'delivery_status', 'delivered')
);
INSERT INTO TABLE orders_partitioned PARTITION(order_date='2024-12-02') VALUES
(102, 2,
ARRAY(
NAMED_STRUCT('item_id', 203, 'item_name', 'Keyboard', 'quantity', 1, 'price', 50.0)
),
MAP('payment_method', 'paypal', 'delivery_status', 'shipped')
);
任务 5: 查询操作
1. 查询所有已完成(delivery_status = delivered
)订单的客户ID及商品明细:
SELECT
customer_id,
order_items
FROM
orders_internal
WHERE
order_details['delivery_status'] = 'delivered';
2. 查询每个客户的总消费金额:
SELECT
customer_id,
SUM(total_price) AS total_spent
FROM (
SELECT
customer_id,
item.quantity * item.price AS total_price
FROM
orders_internal
LATERAL VIEW EXPLODE(order_items) exploded_items AS item
) t
GROUP BY
customer_id;
六、HDFS代码题
课本资料
dfs常用的子命令选项
子命令选项 | 功能描述 |
---|---|
-ls | 查看指定目录信息 |
-du | 查看指定目录下每个文件和子目录的大小,子目录也可以看作单独的目录,因为它也可 以存在于目录 |
-mv | 移动到指定文件或目录 |
-cp | 复制指定文件或目录 |
-rm | 删除指定文件或目录 |
-put | 将本地文件系统中的指定文件传到 HDFS 指定目录 |
-cat | 查看指定文件的内容 |
-help | 查看帮助文档 |
-mkdir | 创建目录 |
-get | 将 HDFS 的指定文件下载到本地文件系统 |
1.查看目录 /data的信息
hdfs dfs -ls -S /data
-S 按照由大到小的顺序显示指定目录的内容
根据文件内容大小,按照由小到大的顺序显示目录 /data的内容,并将默认的文件大小格式化为便于查看的格式进行显示
hdfs dfs -ls -r -h /data
-r 根据文件大小按照由小到大的顺序显示目录
-h 将默认文件大小(字节数)格式化为便于查看的格式进行显示
递归显示目录/data及其子目录的信息,信息中仅显示文件和子目录的路径
hdfs dfs -ls -R -C /data
-R 递归显示目录/data及其子目录的信息
-C 信息中仅显示文件和子目录的路径
2.在HDFS的目录/data中创建子目录/dataChild1。并在子目录/dataChild1中创建子目录/dataChild2
hdfs dfs -mkdir -p /data/dataChild1/dataChild2
3.查看/data中每个文件和子目录的大小,并将默认的文件和子目录大小格式化为便于查看的格式进行显示
hdfs dfs -du -h /data
4.将目录/data中的子目录/dataChild1 移动到目录/data/dataChild中
hdfs dfs -mv /dataChild1 /data/dataChild
将目录/data中的文件dataA 重命名为dataA_New
hdfs dfs -mv /data/dataA /data/dataA_New
5.将目录/data下的文件dataA_New 和 dataB复制到目录/data/dataChild
hdfs dfs -cp /data/dataA_New /data/dataB_New /data/dataChild
将目录/data下的文件 dataA_New复制到子目录/dataChild,并将其重命名为dataA
hdfs dfs -cp /data/dataA_New /data/dataChild/dataA
6.删除目录/data的子目录/dataChild
hdfs dfs -rm -r /data/dataChild
7.将本地文件系统中/export/data目录下文件a.txt 和 b.txt上传到HDFS的目录/data
hdfs dfs -put /export/data/a.txt /export/data/b.txt /data
8.查看目录/data中的文件a.txt的内容
hdfs dfs -cat /data/a.txt
9.将HDFS中目录/data中的文件a.txt和b.txt 下载到本地文件系统/opt目录下
hdfs dfs -get /data/a.txt /data/b.txt /opt
相关题目
题目 1: 创建目录并上传文件
描述: 假设你在 HDFS 的根目录下,需要完成以下操作:
-
在 HDFS 中创建一个名为
/user/yourname/data
的目录。 -
将本地目录
/local/data/input.txt
中的文件上传到刚创建的 HDFS 目录中。
要求: 编写 Shell 命令完成以上任务。
题目 2: 查看文件信息
描述: 假设 HDFS 中已经存在目录 /user/yourname/data/input.txt
,需要完成以下操作:
-
查看该文件的详细信息(包括文件权限、大小等)。
-
显示该文件的内容。
要求: 写出相应的 HDFS Shell 命令。
题目 3: 数据移动与复制
描述:
-
将文件
/user/yourname/data/input.txt
移动到 HDFS 中的/user/yourname/archive/
目录下。 -
将文件
/user/yourname/archive/input.txt
复制回/user/yourname/data/
目录。
要求: 提供具体的 HDFS Shell 命令。
题目 4: 删除操作
描述: 删除 HDFS 中的 /user/yourname/data
目录及其内容,并验证该目录是否被成功删除。
要求: 写出执行以上操作的 Shell 命令。
题目 5: 文件权限设置
描述: 假设 /user/yourname/data/input.txt
文件需要满足以下权限要求:
-
文件所有者可以读写;
-
文件所在组成员只能读取;
-
其他用户无权限。
要求: 提供修改文件权限的 HDFS Shell 命令。
题目 6: 文件备份与验证
描述:
-
将 HDFS 中的
/user/yourname/data/input.txt
备份到/user/yourname/backup/input.txt
。 -
验证备份文件与原文件的内容是否一致。
要求: 写出完整的 Shell 命令。
七、MapReduce编程
题目 1: 单词计数 (Word Count)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] tokens = value.toString().split("\\s+");
for (String token : tokens) {
word.set(token);
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
题目 2: 最大值求解 (Max Value Finder)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MaxValue {
public static class MaxMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static Text keyOut = new Text("Max");
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
int num = Integer.parseInt(value.toString());
context.write(keyOut, new IntWritable(num));
}
}
public static class MaxReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int max = Integer.MIN_VALUE;
for (IntWritable val : values) {
max = Math.max(max, val.get());
}
context.write(key, new IntWritable(max));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "max value");
job.setJarByClass(MaxValue.class);
job.setMapperClass(MaxMapper.class);
job.setReducerClass(MaxReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
题目 3: 平均值计算 (Average Calculation)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class AverageCalculation {
public static class AvgMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static Text keyOut = new Text("Average");
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
int num = Integer.parseInt(value.toString());
context.write(keyOut, new IntWritable(num));
}
}
public static class AvgReducer extends Reducer<Text, IntWritable, Text, Text> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0, count = 0;
for (IntWritable val : values) {
sum += val.get();
count++;
}
double average = (double) sum / count;
context.write(key, new Text(String.format("%.2f", average)));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "average calculation");
job.setJarByClass(AverageCalculation.class);
job.setMapperClass(AvgMapper.class);
job.setReducerClass(AvgReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
题目 4: Top K 单词统计
答案:通过两阶段 MapReduce 实现:第一阶段统计单词频率,第二阶段从中找出频率最高的 K 个单词。
第一阶段:统计单词频率
这部分代码与常规单词计数类似。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordFrequency {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] tokens = value.toString().split("\\s+");
for (String token : tokens) {
word.set(token);
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word frequency");
job.setJarByClass(WordFrequency.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
第二阶段:提取 Top K
思路
第二阶段通过输入第一阶段的输出结果,将频率和单词对交换,按频率降序排序,选出频率最高的 K 个单词
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.TreeMap;
public class TopKWords {
public static class SwapMapper extends Mapper<Object, Text, IntWritable, Text> {
private IntWritable frequency = new IntWritable();
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] parts = value.toString().split("\\s+");
if (parts.length == 2) {
word.set(parts[0]);
frequency.set(Integer.parseInt(parts[1]));
context.write(frequency, word); // 倒置键值对,频率作为 key
}
}
}
public static class TopKReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
private TreeMap<Integer, String> topKMap = new TreeMap<>();
private int K = 10; // 设置需要的 Top K 值
public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text val : values) {
topKMap.put(key.get(), val.toString());
if (topKMap.size() > K) {
topKMap.remove(topKMap.firstKey()); // 保持大小为 K
}
}
}
protected void cleanup(Context context) throws IOException, InterruptedException {
for (Integer freq : topKMap.descendingKeySet()) {
context.write(new Text(topKMap.get(freq)), new IntWritable(freq));
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "top k words");
job.setJarByClass(TopKWords.class);
job.setMapperClass(SwapMapper.class);
job.setReducerClass(TopKReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
题目 5: 日志分析
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogAnalysis {
public static class LogMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] parts = value.toString().split(" ");
if (parts.length > 0) {
String ip = parts[0]; // 提取 IP
context.write(new Text(ip), one);
}
}
}
public static class LogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "log analysis");
job.setJarByClass(LogAnalysis.class);
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
题目 6: 用户购买分析
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class UserPurchaseAnalysis {
public static class PurchaseMapper extends Mapper<Object, Text, Text, DoubleWritable> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] parts = value.toString().split(" ");
if (parts.length == 2) {
String userId = parts[0];
double amount = Double.parseDouble(parts[1]);
context.write(new Text(userId), new DoubleWritable(amount));
}
}
}
public static class PurchaseReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
double sum = 0.0;
for (DoubleWritable val : values) {
sum += val.get();
}
context.write(key, new DoubleWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "user purchase analysis");
job.setJarByClass(UserPurchaseAnalysis.class);
job.setMapperClass(PurchaseMapper.class);
job.setReducerClass(PurchaseReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
题目 7: 倒排索引 (Inverted Index)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashSet;
public class InvertedIndex {
public static class IndexMapper extends Mapper<Object, Text, Text, Text> {
private Text word = new Text();
private Text documentId = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] line = value.toString().split("\t", 2); // 输入格式: 文档ID \t 文本内容
if (line.length < 2) return;
documentId.set(line[0]);
String[] words = line[1].split("\\s+");
for (String w : words) {
word.set(w);
context.write(word, documentId);
}
}
}
public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
HashSet<String> docIds = new HashSet<>();
for (Text docId : values) {
docIds.add(docId.toString());
}
context.write(key, new Text(String.join(", ", docIds)));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "inverted index");
job.setJarByClass(InvertedIndex.class);
job.setMapperClass(IndexMapper.class);
job.setReducerClass(IndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
题目 8: 用户商品共现分析
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
public class CoOccurrence {
public static class CoOccurrenceMapper extends Mapper<Object, Text, Text, Text> {
private Text user = new Text();
private Text itemList = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] parts = value.toString().split("\\s+");
if (parts.length < 2) return;
user.set(parts[0]);
itemList.set(String.join(",", parts, 1, parts.length));
context.write(user, itemList);
}
}
public static class CoOccurrenceReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
ArrayList<String> items = new ArrayList<>();
for (Text val : values) {
String[] parts = val.toString().split(",");
for (String item : parts) {
items.add(item);
}
}
for (int i = 0; i < items.size(); i++) {
for (int j = i + 1; j < items.size(); j++) {
String pair = items.get(i) + " " + items.get(j);
context.write(new Text(pair), new Text("1"));
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "co-occurrence");
job.setJarByClass(CoOccurrence.class);
job.setMapperClass(CoOccurrenceMapper.class);
job.setReducerClass(CoOccurrenceReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
题目 9: 数据去重
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class Deduplication {
public static class DedupMapper extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
context.write(value, new Text(""));
}
}
public static class DedupReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
context.write(key, new Text(""));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "data deduplication");
job.setJarByClass(Deduplication.class);
job.setMapperClass(DedupMapper.class);
job.setReducerClass(DedupReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
题目 10: 分组统计 (Group By)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class GroupBy {
public static class GroupMapper extends Mapper<Object, Text, Text, IntWritable> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] parts = value.toString().split("\\s+");
if (parts.length == 2) {
String group = parts[0];
int number = Integer.parseInt(parts[1]);
context.write(new Text(group), new IntWritable(number));
}
}
}
public static class GroupReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "group by");
job.setJarByClass(GroupBy.class);
job.setMapperClass(GroupMapper.class);
job.setReducerClass(GroupReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}