【Spark分布式内存计算框架——Spark SQL】7. 数据处理分析案例

news2024/9/20 22:44:45

4.3 案例:电影评分数据分析

使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:
对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)。

数据集ratings.dat总共100万条数据,数据格式如下,每行数据各个字段之间使用双冒号分开:
在这里插入图片描述
数据处理分析步骤如下:

第一步、读取电影评分数据,从本地文件系统读取
第二步、转换数据,指定Schema信息,封装到DataFrame
第三步、基于SQL方式分析
第四步、基于DSL方式分析

数据 ETL
读取电影评分数据,将其转换为DataFrame,使用指定列名方式定义Schema信息,代码如下:

// 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.master("local[4]")
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 1. 读取电影评分数据,从本地文件系统读取
val rawRatingsDS: Dataset[String] = spark.read.textFile("datas/ml-1m/ratings.dat")
// 2. 转换数据
val ratingsDF: DataFrame = rawRatingsDS
// 过滤数据
.filter(line => null != line && line.trim.split("::").length == 4)
// 提取转换数据
.mapPartitions{iter =>
iter.map{line =>
// 按照分割符分割,拆箱到变量中
val Array(userId, movieId, rating, timestamp) = line.trim.split("::")
// 返回四元组
(userId, movieId, rating.toDouble, timestamp.toLong)
}
}
// 指定列名添加Schema
.toDF("userId", "movieId", "rating", "timestamp")
/*
root
|-- userId: string (nullable = true)
|-- movieId: string (nullable = true)
|-- rating: double (nullable = false)
|-- timestamp: long (nullable = false)
*/
//ratingsDF.printSchema()
/*
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
| 1| 1193| 5.0|978300760|
| 1| 661| 3.0|978302109|
| 1| 594| 4.0|978302268|
| 1| 919| 4.0|978301368|
+------+-------+------+---------+
*/
//ratingsDF.show(4)

使用 SQL 分析
首先将DataFrame注册为临时视图,再编写SQL语句,最后使用SparkSession执行,代码如下:

// TODO: 基于SQL方式分析
// 第一步、注册DataFrame为临时视图
ratingsDF.createOrReplaceTempView("view_temp_ratings")
// 第二步、编写SQL
val top10MovieDF: DataFrame = spark.sql(
"""
|SELECT
| movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating
|FROM
| view_temp_ratings
|GROUP BY
| movieId
|HAVING
| cnt_rating > 2000
|ORDER BY
| avg_rating DESC, cnt_rating DESC
|LIMIT
| 10
""".stripMargin)
//top10MovieDF.printSchema()
top10MovieDF.show(10, truncate = false)

运行程序结果如下:
在这里插入图片描述
使用 DSL 分析
调用Dataset中函数,采用链式编程分析数据,核心代码如下:

// TODO: 基于DSL=Domain Special Language(特定领域语言) 分析
import org.apache.spark.sql.functions._
val resultDF: DataFrame = ratingsDF
// 选取字段
.select($"movieId", $"rating")
// 分组:按照电影ID,获取平均评分和评分次数
.groupBy($"movieId")
.agg( //
round(avg($"rating"), 2).as("avg_rating"), //
count($"movieId").as("cnt_rating") //
)
// 过滤:评分次数大于2000
.filter($"cnt_rating" > 2000)
// 排序:先按照评分降序,再按照次数降序
.orderBy($"avg_rating".desc, $"cnt_rating".desc)
// 获取前10
.limit(10)
//resultDF.printSchema()
resultDF.show(10)

其中使用SparkSQL中自带函数库functions,在org.apache.spark.sql.functions中,包含常用函数,有些与Hive中函数库类似,但是名称不一样。
在这里插入图片描述
使用需要导入函数库:import org.apache.spark.sql.functions._

保存结果数据
将分析结果数据保存到外部存储系统中,比如保存到MySQL数据库表中或者CSV文件中。

// TODO: 将分析的结果数据保存MySQL数据库和CSV文件
// 结果DataFrame被使用多次,缓存
resultDF.persist(StorageLevel.MEMORY_AND_DISK)
// 1. 保存MySQL数据库表汇总
resultDF
.coalesce(1) // 考虑降低分区数目
.write
.mode("overwrite")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("user", "root")
.option("password", "123456")
.jdbc(
"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnic
ode=true",
"db_test.tb_top10_movies",
new Properties()
)
// 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开
resultDF
.coalesce(1)
.write.mode("overwrite")
.csv("datas/top10-movies")
// 释放缓存数据
resultDF.unpersist()

查看数据库中结果表的数据:
在这里插入图片描述
案例完整代码
电影评分数据分析,经过数据ETL、数据分析(SQL分析和DSL分析)及最终保存结果,整套数据处理分析流程,其中涉及到很多数据细节,完整代码如下:

import java.util.Properties
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel
/**
* 需求:对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)
*/
object SparkTop10Movie {
def main(args: Array[String]): Unit = {
// 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.master("local[4]")
.appName(this.getClass.getSimpleName.stripSuffix("$"))
// TODO: 设置shuffle时分区数目
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 1. 读取电影评分数据,从本地文件系统读取
val rawRatingsDS: Dataset[String] = spark.read.textFile("datas/ml-1m/ratings.dat")
// 2. 转换数据
val ratingsDF: DataFrame = rawRatingsDS
// 过滤数据
.filter(line => null != line && line.trim.split("::").length == 4)
// 提取转换数据
.mapPartitions{iter =>
iter.map{line =>
// 按照分割符分割,拆箱到变量中
val Array(userId, movieId, rating, timestamp) = line.trim.split("::")
// 返回四元组
(userId, movieId, rating.toDouble, timestamp.toLong)
}
}
// 指定列名添加Schema
.toDF("userId", "movieId", "rating", "timestamp")
/*
root
|-- userId: string (nullable = true)
|-- movieId: string (nullable = true)
|-- rating: double (nullable = false)
|-- timestamp: long (nullable = false)
*/
//ratingsDF.printSchema()
/*
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
| 1| 1193| 5.0|978300760|
| 1| 661| 3.0|978302109|
| 1| 594| 4.0|978302268|
| 1| 919| 4.0|978301368|
+------+-------+------+---------+
*/
//ratingsDF.show(4)
// TODO: 基于SQL方式分析
// 第一步、注册DataFrame为临时视图
ratingsDF.createOrReplaceTempView("view_temp_ratings")
// 第二步、编写SQL
val top10MovieDF: DataFrame = spark.sql(
"""
|SELECT
| movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating
|FROM
| view_temp_ratings
|GROUP BY
| movieId
|HAVING
| cnt_rating > 2000
|ORDER BY
| avg_rating DESC, cnt_rating DESC
|LIMIT
| 10
""".stripMargin)
//top10MovieDF.printSchema()
top10MovieDF.show(10, truncate = false)
println("===============================================================")
// TODO: 基于DSL=Domain Special Language(特定领域语言) 分析
import org.apache.spark.sql.functions._
val resultDF: DataFrame = ratingsDF
// 选取字段
.select($"movieId", $"rating")
// 分组:按照电影ID,获取平均评分和评分次数
.groupBy($"movieId")
.agg( //
round(avg($"rating"), 2).as("avg_rating"), //
count($"movieId").as("cnt_rating") //
)
// 过滤:评分次数大于2000
.filter($"cnt_rating" > 2000)
// 排序:先按照评分降序,再按照次数降序
.orderBy($"avg_rating".desc, $"cnt_rating".desc)
// 获取前10
.limit(10)
//resultDF.printSchema()
resultDF.show(10)
// TODO: 将分析的结果数据保存MySQL数据库和CSV文件
// 结果DataFrame被使用多次,缓存
resultDF.persist(StorageLevel.MEMORY_AND_DISK)
// 1. 保存MySQL数据库表汇总
resultDF
.coalesce(1) // 考虑降低分区数目
.write
.mode("overwrite")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("user", "root")
.option("password", "123456")
.jdbc(
"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnic
ode=true",
"db_test.tb_top10_movies",
new Properties()
)
// 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开
resultDF
.coalesce(1)
.write.mode("overwrite")
.csv("datas/top10-movies")
// 释放缓存数据
resultDF.unpersist()
// 应用结束,关闭资源
Thread.sleep(10000000)
spark.stop()
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/352226.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

4道数学题,求解极狐GitLab CI 流水线|第23题:父子流水线 + 多项目流水线

本文来自: 武让 极狐(GitLab) 高级解决方案架构师 💡 极狐GitLab CI 依靠其一体化、轻量化、声明式、开箱即用的特性,在开发者群体中的使用率越来越高,在国内企业中仅次于 Jenkins ,排在第二位。 极狐GitLab 流水线有…

如何解决错误“已超过了锁请求超时时段。 (Microsoft SQL Server,错误: 1222)“

解决 Microsoft SQL Server 的错误: 1222 使用存储过程 sp_who2设置 LOCK_TIMEOUT在Microsoft SQL Server Management Studio中,有时会在对象资源管理器中查看树、表或过程时收到错误。当查询等待的时间超过锁定超时设置时,通常会发生此错误。锁定超时以毫秒为单位,等待后端…

LeetCode 382. 链表随机节点

原题链接 难度:middle\color{orange}{middle}middle 题目描述 给你一个单链表,随机选择链表的一个节点,并返回相应的节点值。每个节点 被选中的概率一样 。 实现 SolutionSolutionSolution 类: Solution(ListNodehead)Solution…

数据库(第五次作业)

1.1 Redis概述 1.1.1 什么是Redis 2008年,意大利的一家创业公司Merzia推出了一款基于MySQL的网站实时统计系统LLOOGG,然而没过多久该公司的创始人 Salvatore Sanfilippo便开始对MySQL的性能感到失望,于是他决定亲自为LLOOGG量身定做一个数据…

Amazon S3简介

前言: 这段时间来到了某大数据平台,做平台技术底座封装和一些架构等等,有结构化数据也有非结构数据,涉及到很多技术,自己也私下花时间去研究了很多,有很多纯技术类的还是需要梳理并记录,巩固以及…

保姆级教程:Win10远程连接MACBook、MACBook远程连接Win10。

本篇给大家展示Windows远程连接连接MACBook、MACBook远程连接Windows的方法。需要明确的是,这种通过TCP远程连接的远程很稳定,基本比向日葵等软件流畅很多,但是缺点是需要保证在同一网域下,也就是同一局域网下,部分情况…

中国协同办公的终局是什么?

换言之,工具一体化是基本要求,但基于全链路集成的生态则是因地制宜、于当下的产业探索。 作者|斗斗 编辑|皮爷 出品|产业家 “计划裁员1300人,占到全球员工的15%。”美国东部时间2月7日,Zoom CEO袁征在官网博客发布公开信…

【SpringBoot】分布式日志跟踪—通过MDC实现全链路调用日志跟踪

一.MDC 1.MDC介绍 MDC(Mapped Diagnostic Context,映射调试上下文)是 log4j 和 logback 提供的一种方便在多线程场景下记录日志的功能。MDC 可以看成是一个与当前线程绑定的Map,可以往其中添加键值对。MDC 中包含的内容可以被同…

Python机器学习入门笔记(2)—— 分类算法

目录 转换器(transformer)和估计器(estimator) K-近邻(K-Nearest Neighbors,简称KNN)算法 模型选择与调优 交叉验证(Cross-validation) GridSearchCV API 朴素贝叶…

UE4 渲染学习笔记(未完)

原文链接:虚幻4渲染管线入门 - 知乎 从原文摘抄一下: 渲染框架 1,一套是传统的以RHICmdList为核心构建RenderPass,从RHICmdList.BeginRenderPass(...)开始,以RHICmdList.EndRenderPass()结束的框架。 2.一套是以新的Gr…

【面向对象语言三大特性之 “继承”】

目录 1.继承的概念及定义 1.1继承的概念 1.2 继承定义 1.2.1定义格式 1.2.2继承关系和访问限定符 1.2.3继承基类成员访问方式的变化 2.基类和派生类对象赋值转换 3.继承中的作用域 4.派生类的默认成员函数 5.继承与友元 6. 继承与静态成员 7.复杂的菱形继承及菱形虚拟…

linux(全志)初始环境到移植lvgl

一、 格式化TF卡 1. linux命令行格式化 1.1 找到U盘位置(已挂载) sudo fdisk -l 如图,我的在/dev/sdb 1.2 格式化U盘 sudo mkfs -t vfat /dev/sdb-t 后面是格式化为哪种文件系统格式,vfat就是fat32格式,最后…

Hashmap链表长度大于8真的会变成红黑树吗?

1、本人博客《HashMap、HashSet底层原理分析》 2、本人博客《若debug时显示的Hashmap没有table、size等元素时,查看第19条》 结论 1、链表长度大于8时(插入第9条时),会触发树化(treeifyBin)方法,但是不一定会树化,若数组大小小于…

Win11电脑速度慢、延迟高怎么办?

作为新版的系统,Windows 11还需要更多的时间完善。不少用户反映升级了Win11后反而感觉速度慢,还有延迟或死机现象。 如果你使用Win11系统时也有这种感觉,那这篇文章就是为你提供的。 问题可能出在系统存储容量低、驱动程序已过时&#xff0…

APP渗透抓包

APP渗透抓包1.APP渗透测试原理2.安装安卓模拟器抓包2.1.安装模拟器2.2.设置代理下载证书2.2.1.burp suite设置代理2.2.2.浏览器设置代理2.2.3.下载证书2.3.模拟器安装证书2.3.1.移动证书2.3.2.证书设置2.4.设置代理2.4.1.设置burp suite代理2.4.2.夜神模拟器代理2.5.抓包测试2.…

JVM调优及垃圾回收GC

一、说一说JVM的内存模型。JVM的运行时内存也叫做JVM堆,从GC的角度可以将JVM分为新生代、老年代和永久代。其中新生代默认占1/3堆内存空间,老年代默认占2/3堆内存空间,永久代占非常少的对内存空间。新生代又分为Eden区、SurvivorFrom区和Surv…

Node.js安装与配置

Node.js安装与配置 前言 本篇博文记录了Node.js安装与环境变量配置的详细步骤,旨在为将来再次配置Node.js时提供指导方法。 另外:Node.js版本请根据自身系统选择,安装位置、全局模块存放位置和环境变量应根据自身实际情况进行更改。 Node…

spring(四)——————从spring源码角度去解释前面的疑问

前面两篇文章,我们从mybatis-spring的插件包出发,探究如何将第三方框架集成到spring中,也知道了mybatis中使用了FactoryBeanImportBeanDefifinitionRegistrarImport对mapper进行注入。 不过我们在前两篇文章中仍然遗留很多疑点,例…

Revit导出PDF格式图纸流程及“批量导出图纸”

一、Revit导出PDF格式图纸流程 1、点击左上方“应用程序菜单”即“R”图标,进择“打印”选项。 2、在弹出的对话框中,需要设置图纸“打印范围”,选择“所选的视图/图纸选项”,点击“选择”,按钮,选择我们需…

LESS模型与随机森林

模型学习 1 随机森林 https://blog.csdn.net/weixin_35770067/article/details/107346591? 森林就是建立了很多决策树,把很多决策树组合到一起就是森林。 这些决策树都是为了解决同一任务建立的,最终的目标也都是一致的,最后将其结果来平均…