Previous post: Big Data - MapReduce - Working with JSON-formatted Data, with Exercises (CSDN Blog)
16.7 Using JSON in Spark
As in the previous post, we use the same data to compute the average rating of each movie:
{"mid":1,"rate":6,"uid":"u001","ts":15632433243}
{"mid":1,"rate":4,"uid":"u002","ts":15632433263}
{"mid":1,"rate":5,"uid":"u003","ts":15632403263}
{"mid":1,"rate":3,"uid":"u004","ts":15632403963}
{"mid":1,"rate":4,"uid":"u004","ts":15632403963}
{"mid":2,"rate":5,"uid":"u001","ts":15632433243}
{"mid":2,"rate":4,"uid":"u002","ts":15632433263}
{"mid":2,"rate":5,"uid":"u003","ts":15632403263}
{"mid":2,"rate":3,"uid":"u005","ts":15632403963}
{"mid":2,"rate":7,"uid":"u005","ts":15632403963}
{"mid":2,"rate":6,"uid":"u005","ts":15632403963}
{"mid":3,"rate":2,"uid":"u001","ts":15632433243}
{"mid":3,"rate":1,"uid":"u002","ts":15632433263}
{"mid":3,"rate":3,"uid":"u005","ts":15632403963}
{"mid":3,"rate":8,"uid":"u005","ts":15632403963}
{"mid":3,"rate":7,"uid":"u005","ts":15632403963}
Spark code
/**
 * Test02.scala
 *
 * Scala code for calculating the average rating of each movie.
 */
package com.doit.day0130

import com.doit.day0126.Movie
import com.alibaba.fastjson.JSON
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test02 {
  def main(args: Array[String]): Unit = {
    // Create a SparkConf object, setting the application name and run mode
    val conf = new SparkConf()
      .setAppName("Starting...")
      .setMaster("local[*]")
    // Create a SparkContext from the SparkConf
    val sc = new SparkContext(conf)
    // Read the data file "movie.json" as an RDD of lines
    val rdd1 = sc.textFile("data/movie.json")
    // Parse each line into a Movie object, producing a new RDD
    val rdd2: RDD[Movie] = rdd1.map(line => {
      // Use the fastjson parser to turn the JSON line into a Movie object
      val mv = JSON.parseObject(line, classOf[Movie])
      mv
    })
    // Group the RDD by movie ID
    val rdd3: RDD[(Int, Iterable[Movie])] = rdd2.groupBy(_.mid)
    // For each movie, sum the ratings, count them, and compute the average
    val rdd4 = rdd3.map(tp => {
      // Movie ID
      val mid = tp._1
      // Sum of this movie's ratings
      val sum = tp._2.map(_.rate).sum
      // Number of ratings for this movie
      val size = tp._2.size
      // Average rating
      (mid, 1.0 * sum / size)
    })
    // Print each movie's average rating (foreach runs on executor
    // threads, which is fine in local mode)
    rdd4.foreach(println)
  }
}
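With the sample data above, the job should print one (mid, average) pair per movie; since foreach runs on executor threads, the output order may vary:

(1,4.4)
(2,5.0)
(3,4.2)

Note that groupBy pulls every rating for a movie into one Iterable before summing and counting, which works here but shuffles full Movie objects. A common alternative is to pre-aggregate with reduceByKey, which combines partial (sum, count) pairs on the map side. A sketch reusing the rdd2 defined above:

    // Pre-aggregate (sum, count) per movie ID, then divide:
    // reduceByKey merges partial pairs within each partition before
    // shuffling, so only small tuples cross the network.
    val avg = rdd2
      .map(m => (m.mid, (m.rate, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues { case (sum, cnt) => 1.0 * sum / cnt }
    avg.foreach(println)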