一、利用RDD计算总分与平均分
(一)准备工作
1、启动HDFS服务
2、启动Spark服务
3、在本地创建成绩文件
4、将成绩文件上传到HDFS
(二)完成任务
1、在Spark Shell里完成任务
(1)读取成绩文件,生成RDD
(2)定义二元组成绩列表
(3)利用RDD填充二元组成绩列表
(4)基于二元组成绩列表创建RDD
(5)对rdd按键归约得到rdd1,计算总分
(6)将rdd1映射成rdd2,计算总分与平均分
2、在IntelliJ IDEA里完成任务
(1)打开RDD项目 创建计算总分平均分对象
package net.cxf.rdd.day07
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
/**
* 功能:统计总分与平均分
* 作者:cxf
* 日期:2023年05月11日
*/
object CalculateSumAvg {
def main(args: Array[String]): Unit = {
// 创建Spark配置对象
val conf = new SparkConf()
.setAppName("CalculateSumAvg ") // 设置应用名称
.setMaster("local[*]") // 设置主节点位置(本地调试)
// 基于Spark配置对象创建Spark容器
val sc = new SparkContext(conf)
// 读取成绩文件,生成RDD
val lines = sc.textFile("hdfs://master:9000/scoresumavg/input/scores.txt")
// 定义二元组成绩列表
val scores = new ListBuffer[(String, Int)]()
// 利用RDD填充二元组成绩列表
lines.collect.foreach(line => {
val fields = line.split(" ")
scores.append((fields(0), fields(1).toInt))
scores.append((fields(0), fields(2).toInt))
scores.append((fields(0), fields(3).toInt))
})
// 基于二元组成绩列表创建RDD
val rdd = sc.makeRDD(scores);
// 对rdd按键归约得到rdd1,计算总分
val rdd1 = rdd.reduceByKey(_ + _)
// 将rdd1映射成rdd2,计算总分与平均分
val rdd2 = rdd1.map(score => (score._1, score._2, (score._2 / 3.0).formatted("%.2f")))
// 在控制台输出rdd2的内容
rdd2.collect.foreach(println)
// 将rdd2内容保存到HDFS指定位置
rdd2.saveAsTextFile("hdfs://master:9000/scoresumavg/output")
// 关闭Spark容器
sc.stop()
}
}
运行程序,查看结果
查看HDFS的结果文件
二、利用RDD统计每日新增用户
(一)准备工作
1、在本地创建用户文件
2、将用户文件上传到HDFS指定位置
(二)完成任务
1、在Spark Shell里完成任务
(1)读取文件,得到RDD
(2)倒排,互换RDD中元组的元素顺序
(3)倒排后的RDD按键分组
(4)取分组后的日期集合最小值,计数为1
(5)按键计数,得到每日新增用户数
(6)让输出结果按日期升序
2、在IntelliJ IDEA里完成任务
(1)打开RDD项目 创建统计新增用户对象
package net.cxf.rdd.day07
import org.apache.spark.{SparkConf, SparkContext}
/**
* 功能:统计新增用户
* 作者:cxf
* 日期:2023年05月24日
*/
object CountNewUsers {
def main(args: Array[String]): Unit = {
// 创建Spark配置对象
val conf = new SparkConf()
.setAppName("CountNewUsers") // 设置应用名称
.setMaster("local[*]") // 设置主节点位置(本地调试)
// 基于Spark配置对象创建Spark容器
val sc = new SparkContext(conf)
// 读取文件,得到RDD
val rdd1 = sc.textFile("hdfs://master:9000/newusers/input/users.txt")
// 倒排,互换RDD中元组的元素顺序
val rdd2 = rdd1.map(
line => {
val fields = line.split(",")
(fields(1), fields(0))
}
)
// 倒排后的RDD按键分组
val rdd3 = rdd2.groupByKey()
// 取分组后的日期集合最小值,计数为1
val rdd4 = rdd3.map(line => (line._2.min, 1))
// 按键计数,得到每日新增用户数
val result = rdd4.countByKey()
// 让统计结果按日期升序
val keys = result.keys.toList.sorted
keys.foreach(key => println(key + "新增用户:" + result(key)))
// 停止Spark容器
sc.stop()
}
}
运行程序,查看结果
三、利用RDD实现分组排行榜
(一)准备工作
1、在本地创建成绩文件
2、将成绩文件上传到HDFS上指定目录
(二)完成任务
(1)读取成绩文件得到RDD
(2)利用映射算子生成二元组构成的RDD
(3)按键分组得到新的二元组构成的RDD
(4)按值排序,取前三
(5)按指定格式输出结果
2、在IntelliJ IDEA里完成任务
(1)打开RDD项目 创建分组排行榜单例对象
package net.cxf.rdd.day07
import org.apache.spark.{SparkConf, SparkContext}
/**
* 功能:成绩分组排行榜
* 作者:cxf
* 日期:2023年05月24日
*/
object GradeTopN {
def main(args: Array[String]): Unit = {
// 创建Spark配置对象
val conf = new SparkConf()
.setAppName("GradeTopN") // 设置应用名称
.setMaster("local[*]") // 设置主节点位置(本地调试)
// 基于Spark配置对象创建Spark容器
val sc = new SparkContext(conf)
// 实现分组排行榜
val top3 = sc.textFile("hdfs://master:9000/topn/input/grades.txt")
.map(line => {
val fields = line.split(" ")
(fields(0), fields(1))
}) // 将每行成绩映射成二元组(name, grade)
.groupByKey() // 按键分组
.map(item => {
val name = item._1
val top3 = item._2.toList.sortWith(_ > _).take(3)
(name, top3)
}) // 值排序,取前三
// 输出分组排行榜结果
top3.collect.foreach(line => {
val name = line._1
val scores = line._2.mkString(" ")
println(name + ": " + scores)
})
// 停止Spark容器,结束任务
sc.stop()
}
}
运行程序,查看结果