该文章主要为完成实训任务,详细实现过程及结果见【http://t.csdn.cn/Twpwe】
文章目录
- 一、任务目标
- 二、准备工作
- 2.1 在本地创建用户文件
- 2.2 将用户文件上传到HDFS指定位置
- 三、完成任务
- 3.1 在Spark Shell里完成任务
- 3.1.1 读取文件,得到RDD
- 3.1.2 倒排,互换RDD中元组的元素顺序
- 3.1.3 倒排后的RDD按键分组
- 3.1.4 取分组后的日期集合最小值,计数为1
- 3.1.5 按键计数,得到每日新增用户数
- 3.1.6 让输出结果按日期升序
- 3.2 在IntelliJ IDEA里完成任务
- 3.2.1 打开RDD项目
- 3.2.2 创建统计新增用户对象
- 3.2.3 运行程序,查看结果
一、任务目标
- 已知有以下用户访问历史数据,第一列为用户访问网站的日期,第二列为用户名。
2023-05-01,mike
2023-05-01,alice
2023-05-01,brown
2023-05-02,mike
2023-05-02,alice
2023-05-02,green
2023-05-03,alice
2023-05-03,smith
2023-05-03,brian
- 现需要根据上述数据统计每日新增的用户数量,期望统计结果。
2023-05-01新增用户数:3
2023-05-02新增用户数:1
2023-05-03新增用户数:2
- 即2023-05-01新增了3个用户(分别为mike、alice、brown),2023-05-02新增了1个用户(green),2023-05-03新增了两个用户(分别为smith、brian)。
二、准备工作
2.1 在本地创建用户文件
- 在
/home
目录里创建users.txt
文件
2.2 将用户文件上传到HDFS指定位置
- 先创建
/newusers/input
目录,再将用户文件上传到该目录
三、完成任务
3.1 在Spark Shell里完成任务
3.1.1 读取文件,得到RDD
- 执行命令:
val rdd1 = sc.textFile("hdfs://master:9000/newusers/input/users.txt")
3.1.2 倒排,互换RDD中元组的元素顺序
val rdd2 = rdd1.map(
line => {
val fields = line.split(",")
(fields(1), fields(0))
}
)
rdd2.collect.foreach(println)
3.1.3 倒排后的RDD按键分组
- 执行命令:
val rdd3 = rdd2.groupByKey()
3.1.4 取分组后的日期集合最小值,计数为1
- 执行命令:
val rdd4 = rdd3.map(line => (line._2.min, 1))
3.1.5 按键计数,得到每日新增用户数
- 执行命令:
val result = rdd4.countByKey()
- 执行命令:
result.keys.foreach(key => println(key + "新增用户:" + result(key)))
3.1.6 让输出结果按日期升序
- 执行命令:
val keys = result.keys.toList.sorted
,让键集升序排列
- 按日期降序
3.2 在IntelliJ IDEA里完成任务
3.2.1 打开RDD项目
3.2.2 创建统计新增用户对象
- 在
cn.kox.day07
包里创建CountNewUsers
对象
package cn.kox.rdd.day07
import org.apache.spark.{SparkConf, SparkContext}
/**
* @ClassName: CountNewUsers
* @Author: Kox
* @Data: 2023/6/15
* @Sketch:
*/
object CountNewUsers {
def main(args: Array[String]): Unit = {
// 创建Spark配置对象
val conf = new SparkConf()
.setAppName("CountNewUsers") // 设置应用名称
.setMaster("local[*]") // 设置主节点位置(本地调试)
// 基于Spark配置对象创建Spark容器
val sc = new SparkContext(conf)
// 读取文件,得到RDD
val rdd1 = sc.textFile("hdfs://master:9000/newusers/input/users.txt")
// 倒排,互换RDD中元组的元素顺序
val rdd2 = rdd1.map(
line => {
val fields = line.split(",")
(fields(1), fields(0))
}
)
// 倒排后的RDD按键分组
val rdd3 = rdd2.groupByKey()
// 取分组后的日期集合最小值,计数为1
val rdd4 = rdd3.map(line => (line._2.min, 1))
// 按键计数,得到每日新增用户数
val result = rdd4.countByKey()
// 让统计结果按日期升序
val keys = result.keys.toList.sorted
keys.foreach(key => println(key + "新增用户:" + result(key)))
// 停止Spark容器
sc.stop()
}
}