第1章 Spark SQL概述
1.1 什么是Spark SQL
1)Spark SQL是Spark用于结构化数据(Structured Data)处理的Spark模块。
1.2 为什么要有Spark SQL
1.3 Spark SQL原理
1.3.1 什么是DataFrame
(1)DataFrame是一种类似RDD的分布式数据集,类似于传统数据库中的二维表格。
(2)DataFrame与RDD的主要区别在于,DataFrame带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。
左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。
(3)Spark SQL性能上比RDD要高。因为Spark SQL了解数据内部结构,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在Stage层面进行简单、通用的流水线优化。
1.3.2 什么是DataSet
DataSet是分布式数据集。
- DataSet是强类型的。比如可以有DataSet[Car],DataSet[User]。具有类型安全检查
- DataFrame是DataSet的特例,type DataFrame = DataSet[Row] ,Row是一个类型,跟Car、User这些的类型一样,所有的表结构信息都用Row来表示。
1.3.3 RDD、DataFrame和DataSet之间关系
1)发展历史
RDD(Spark1.0)=》Dataframe(Spark1.3)=》Dataset(Spark1.6)
如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同的是他们的执行效率和执行方式。在后期的Spark版本中,DataSet有可能会逐步取代RDD和DataFrame成为唯一的API接口。
2)三者的共性
(1)RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集,为处理超大型数据提供便利
(2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子如foreach时,三者才会开始遍历运算
(3)三者有许多共同的函数,如filter,排序等
(4)三者都会根据Spark的内存情况自动缓存运算
(5)三者都有分区的概念
1.4 Spark SQL的特点
1)易整合
无缝的整合了SQL查询和Spark编程。
2)统一的数据访问方式
使用相同的方式连接不同的数据源。
3)兼容Hive
在已有的仓库上直接运行SQL或者HQL。
4)标准的数据连接
通过JDBC或者ODBC来连接
第2章 Spark SQL编程
在老的版本中,SparkSQL提供两种SQL查询起始点:
- 一个叫SQLContext,用于Spark自己提供的SQL查询;
- 一个叫HiveContext,用于连接Hive的查询。
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。
SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候,Spark框架会自动的创建一个名称叫做Spark的SparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext。
2.2 DataFrame
DataFrame是一种类似于RDD的分布式数据集,类似于传统数据库中的二维表格。
2.2.1 创建DataFrame
在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:
- 通过Spark的数据源进行创建;
- 从一个存在的RDD进行转换;
- 还可以从Hive Table进行查询返回。
* 1、通过toDF方法创建
* import spark.implicits._
* rdd.toDF() / rdd.toDF(列名,...)
* 集合.toDF() / 集合.toDF(列名,...)
*
* 2、读取文件创建: spark.read.csv/json/text/parquet/orc/jdbc
* 3、通过其他DataFrame衍生: val df2 = df.filter/select/where/...
* 4、通过createDataFrame方法创建
* val 列的信息 = StructType(Array(StructFiled(列名,列类型),...))
* val 数据:RDD[Row] = ...
* spark.createDataFrame(数据,列的信息)
注意:如果从内存中获取数据,Spark可以知道数据类型具体是什么,如果是数字,默认作为Int处理;但是从文件中读取的数字,不能确定是什么类型,所以用BigInt接收,可以和Long类型转换,但是和Int不能进行转换。
2.2.2 SQL风格语法
SQL语法风格是指我们查询数据的时候使用SQL语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助。
视图:对特定表的数据的查询结果重复使用。View只能查询,不能修改和插入。
select * from t_user where age > 30 的查询结果可以存储在临时表(视图)v_user_age中,方便在后面重复使用。例如:select * from v_user_age
1)临时视图
(1)创建一个DataFrame
scala> val df = spark.read.json("/opt/module/spark-local/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
(2)对DataFrame创建一个临时视图
scala> df.createOrReplaceTempView("user")
(3)通过SQL语句实现查询全表
scala> val sqlDF = spark.sql("SELECT * FROM user")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
(4)结果展示
scala> sqlDF.show
+---+--------+
|age| name|
+---+--------+
| 20|qiaofeng|
| 19| xuzhu|
| 18| duanyu|
+---+--------+
(5)求年龄的平均值
scala> val sqlDF = spark.sql("SELECT avg(age) from user")
sqlDF: org.apache.spark.sql.DataFrame = [avg(age): double]
(6)结果展示
scala> sqlDF.show
+--------+
|avg(age)|
+--------+
| 19.0|
+--------+
(7)创建一个新会话再执行,发现视图找不到
scala> spark.newSession().sql("SELECT avg(age) from user ").show()
org.apache.spark.sql.AnalysisException: Table or view not found: user; line 1 pos 14;
注意:普通临时视图是Session范围内的,如果想全局有效,可以创建全局临时视图。
2)全局视图
(1)对于DataFrame创建一个全局视图
scala> df.createOrReplaceGlobalTempView ("user2")
(2)通过SQL语句实现查询全表
scala> spark.sql("SELECT * FROM global_temp.user2").show()
+---+--------+
|age| name|
+---+--------+
| 20|qiaofeng|
| 19| xuzhu|
| 18| duanyu|
+---+--------+
(3)新建session,通过SQL语句实现查询全表
scala> spark.newSession().sql("SELECT * FROM global_temp.user2").show()
+---+--------+
|age| name|
+---+--------+
| 20|qiaofeng|
| 19| xuzhu|
| 18| duanyu|
+---+--------+
2.2.3 DSL风格语法
DataFrame提供一个特定领域语言(domain-specific language,DSL)去管理结构化的数据,可以在Scala,Java,Python和R中使用DSL,使用DSL语法风格不必去创建临时视图了。
1)创建一个DataFrame
scala> val df = spark.read.json("/opt/module/spark-local/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2)查看DataFrame的Schema信息
scala> df.printSchema
root
|-- age: Long (nullable = true)
|-- name: string (nullable = true)
3)只查看“name”列数据
注意:列名要用双引号引起来,如果是单引号的话,只能在前面加一个单引号。
scala> df.select("name").show()
+--------+
| name|
+--------+
|qiaofeng|
| xuzhu|
| duanyu|
+--------+
scala> df.select('name).show
+--------+
| name|
+--------+
|qiaofeng|
| xuzhu|
| duanyu|
+--------+
4)查看年龄和姓名,且年龄大于18
scala> df.select("age","name").where("age>18").show
+---+--------+
|age| name|
+---+--------+
| 20|qiaofeng|
| 19| xuzhu|
+---+--------+
5)查看所有列
scala> df.select("*").show
+---+--------+
|age| name|
+---+--------+
| 20| qiaofeng|
| 19| xuzhu|
| 18| duanyu|
+---+--------+
6)查看“name”列数据以及“age+1”数据
注意:涉及到运算的时候,每列都必须使用$,或者采用单引号表达式:单引号+字段名
scala> df.select($"name",$"age" + 1).show
scala> df.select('name, 'age + 1).show()
scala> df.select('name, 'age + 1 as "newage").show()
+--------+---------+
| name |(age + 1)|
+--------+---------+
|qiaofeng| 21|
| xuzhu| 20|
| duanyu| 19|
+--------+---------+
7)查看“age”大于“19”的数据
scala> df.filter("age>19").show
+---+--------+
|age | name|
+---+--------+
| 20|qiaofeng|
+---+--------+
8)按照“age”分组,查看数据条数
scala> df.groupBy("age").count.show
+---+-----+
|age|count|
+---+-----+
| 19| 1|
| 18| 1|
| 20| 1|
+---+-----+
9)求平均年龄avg(age)
scala> df.agg(avg("age")).show
+--------+
|avg(age)|
+--------+
| 19.0|
+--------+
10)求年龄总和sum(age)
scala> df.agg(max("age")).show
+--------+
|max(age)|
+--------+
| 20|
+--------+
2.3 DataSet
DataSet是具有强类型的数据集合,需要提供对应的类型信息。
2.3.1 创建DataSet(基本类型序列)
* DataSet的创建
1、通过toDS方法创建
2、读取文件创建: spark.read.textFile(..)
3、通过其他DataSet衍生: val ds2 = ds.map/flatMap/filter...
4、通过createDataSet方法创建
spark.createDataSet(集合/rdd)
2.3.2 创建DataSet(样例类序列)
使用样例类序列创建DataSet。
(1)创建一个User的样例类
scala> case class User(name: String, age: Long)
defined class User
(2)将集合转换为DataSet
scala> val caseClassDS = Seq(User("wangyuyan",18)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[User] = [name: string, age: bigint]
(3)查看DataSet的值
scala> caseClassDS.show
+---------+---+
| name|age|
+---------+---+
|wangyuyan| 18|
+---------+---+
注意:在实际开发的时候,很少会把序列转换成DataSet,更多是通过RDD和DataFrame转换来得到DataSet.
2.4 RDD、DataFrame、DataSet相互转换
2.5 用户自定义函数
2.5.1 UDF
1)UDF:一行进入,一行出
2)代码实现
package com.atguigu.sparksql
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
object SparkSQL05_UDF{
def main(args: Array[String]): Unit = {
// 1 创建上下文环境配置对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
// 2 创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
// 3 读取数据
val df: DataFrame = spark.read.json("input/user.json")
// 4 创建DataFrame临时视图
df.createOrReplaceTempView("user")
// 5 注册UDF函数。功能:在数据前添加字符串“Name:”
spark.udf.register("addName", (x:String) => "Name:"+ x)
// 6 调用自定义UDF函数
spark.sql("select addName(name), age from user").show()
// 7 释放资源
spark.stop()
}
}
2.5.2 UDAF
1)UDAF:输入多行,返回一行。
2)Spark3.x推荐使用extends Aggregator自定义UDAF,属于强类型的Dataset方式。
3)Spark2.x使用extends UserDefinedAggregateFunction,属于弱类型的DataFrame
4)案例实操
需求:实现求平均年龄,自定义UDAF,MyAvg(age)
(1)自定义聚合函数实现-强类型
package com.atguigu.sparksql
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession, functions}
object SparkSQL06_UDAF {
def main(args: Array[String]): Unit = {
// 1 创建上下文环境配置对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
// 2 创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
// 3 读取数据
val df: DataFrame = spark.read.json("input/user.json")
// 4 创建DataFrame临时视图
df.createOrReplaceTempView("user")
// 5 注册UDAF
spark.udf.register("myAvg", functions.udaf(new MyAvgUDAF()))
// 6 调用自定义UDAF函数
spark.sql("select myAvg(age) from user").show()
// 7 释放资源
spark.stop()
}
}
//输入数据类型
case class Buff(var sum: Long, var count: Long)
/**
* 1,20岁; 2,19岁; 3,18岁
* IN:聚合函数的输入类型:Long
* Buff : sum = (18+19+20) count = 1+1+1
* OUT:聚合函数的输出类型:Double (18+19+20) / 3
*/
class MyAvgUDAF extends Aggregator[Long, Buff, Double] {
// 初始化缓冲区
override def zero: Buff = Buff(0L, 0L)
// 将输入的年龄和缓冲区的数据进行聚合
override def reduce(buff: Buff, age: Long): Buff = {
buff.sum = buff.sum + age
buff.count = buff.count + 1
buff
}
// 多个缓冲区数据合并
override def merge(buff1: Buff, buff2: Buff): Buff = {
buff1.sum = buff1.sum + buff2.sum
buff1.count = buff1.count + buff2.count
buff1
}
// 完成聚合操作,获取最终结果
override def finish(buff: Buff): Double = {
buff.sum.toDouble / buff.count
}
// SparkSQL对传递的对象的序列化操作(编码)
// 自定义类型就是product 自带类型根据类型选择
override def bufferEncoder: Encoder[Buff] = Encoders.product
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
2.5.3 UDTF(没有)
输入一行,返回多行(Hive);
SparkSQL中没有UDTF,Spark中用flatMap即可实现该功能。
第3章 SparkSQL数据的加载与保存
3.1 加载数据
* sparksql 读取有两种方式
* 1. spark.read <不用>
* .format("csv/text/parquet/jdbc/json/orc") --- 指定读数据格式
* [.option(k,v)....] --- 指定读取数据需要的参数
* .load([path]) --- 加载数据
* 2. spark.read.[option(k,v)..].json/csv/parquet/orc/jdbc(...) <常用>
代码案例:
package com.atguigu.day007
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.junit.Test
import java.util.Properties
class $02_Reader {
val spark: SparkSession = SparkSession.builder().appName("appName").master("local[4]").getOrCreate()
@Test
def readFile(): Unit ={
//TODO 读取文件第一种方式
val df: DataFrame = spark.read.format("text").load("data/wc.txt")
df.show
//TODO 读取文件第二种方式
val df2: DataFrame = spark.read.text("data/wc.txt")
df2.show
//TODO 读取csv文件
// csv 常用的 option:
// seq:指定字段之间的分隔符
// header: 是否以第一行作为列名
// inferSchema: 自动推断列的类型
// val df3: DataFrame = spark.read.option("header", "ture").option("inferSchema", "true").csv("data/xxx.csv")
// df3.printSchema()
//TODO 读取 parquet
df2.write.mode(SaveMode.Overwrite).parquet("data/parquet")
spark.read.parquet("data/parquet").show
}
/**
* 读取 mysql
*/
@Test
def readMysql(): Unit ={
// 指定mysql url地址
val url = "jdbc:mysql://hadoop102:3306/gmall"
// TODO 拉取mysql整表数据
val tableName = "user_info"
//TODO 拉取mysql指定数据
val tableName1 = "(select * from user_info where id>50) t1"
// 指定读取mysql需要的参数封装
val props = new Properties()
props.setProperty("user","root")
props.setProperty("password","123456")
// TODO 读取mysql第一种方式 : 此种方式获取 mysql 生成的 DataFrame的分区数 = 1
// TODO 此种方式获一般只用于小数据量场景
val df: DataFrame = spark.read.jdbc(url, tableName, props)
println(df.rdd.getNumPartitions)
// TODO 读取mysql第二种方式: 此种方式读取mysql生成的DataFrame的分区数 = conditions数组中元素个数 <不用>
//每个分区拉取数据的 where 条件
val conditions: Array[String] = Array("id<=50", "id>50 and id <= 300", "id>300 and id<500", "id>=500")
val df2: DataFrame = spark.read.jdbc(url, tableName, conditions, props)
// TODO 读取mysql第三种方式: 此种式读取mysql 生成的DataFrame的分区数 = (upperBound - lowerBound) < numPartitions ? (upperBound - lowerBound) : nunPartitions
// TODO 此方式一般用于大数据量场景
val tableName2 = "(select min(id) min_id, max(id) max_id froam user_info) user"
val minMaxIdDF: DataFrame = spark.read.jdbc(url, tableName2, props)
val row: Row = minMaxIdDF.first()
val min_id: Long = row.getAs[Long]("min_id")
val max_id: Long = row.getAs[Long]("max_id")
// 用于分区的mysql字段名,必须是数字、日期、时间戳, 建议用主键
val columnName = "id"
// 用于决定分区数据间距的下限, 一般设置为 columnName字段的最小值
val lowerBound: Int = 1
// 用于决定分区数据间距的上限, 一般设置为 columnName字段的最大值
val upperBound: Int = 1
val df4: DataFrame = spark.read.jdbc(url, tableName, columnName, lowerBound, upperBound, 5, props)
println(df4.rdd.getNumPartitions)
}
}
3.2 保存数据
* sparksql write有两种方式
* 1. ds/df.write <不用>
* .mode(SaveMode.XXX) --- 指定写入模式
* .format("csv/text/parquet/jdbc/json/orc") --- 指定读数据格式
* [.option(k,v)....] --- 指定读取数据需要的参数
* .save([path]) --- 保存
* 2. ds/df.write.mode(SaveMode.XXX).[option(k,v)..].json/csv/parquet/orc/jdbc(...) <常用>
*SaveMode.Overwrite:如果保存数据的目录表存在,则覆盖数据[一般用于数据写入DFS]
SaveMode.Append : 如果保存数据的目录/表存在,则追加数据[一般用于将数据写入 没有主键 的mysql表中]
如果有主键, 如何写入mysql ==> 代码实现:
package com.atguigu.day007
import org.apache.spark.sql.SparkSession
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
class $03_Writer {
val spark: SparkSession = SparkSession.builder().appName("appName").master("local[4]").getOrCreate()
/**
* 如果有主键, 如何写入mysql
*/
def writeSql(): Unit ={
val df=spark.read.json(path ="datas/test.json")
val url="jdbc:mysql://hadoop102:3306/test"
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password","123456")
//TODO 当 主键 冲突时 会报错
// df.write.mode("append").jdbc(url,table ="test",props)
//TODO 如果想要向有主键的表中写入数据,则使用下面这种方式
df.rdd.foreachPartition(it => {
var connection: Connection = null
var statement:PreparedStatement = null
try{
connection = DriverManager.getConnection(url, "root", "123456")
statement = connection.prepareStatement(
"""
|insert into test(id,name,age,sex) values(?,?,?,?)
|on duplicate key update name=?, age=?, sex=?
|""".stripMargin)
var i = 0
it.foreach(row=>{
val id: Long = row.getAs[Long]("id")
val name: String = row.getAs[String]("name")
val age: Long = row.getAs[Long]("age")
val sex: String = row.getAs[String]("sex")
statement.setLong(1,id)
statement.setString(2,name)
statement.setLong(3,age)
statement.setString(4,sex)
statement.setString(5,name)
statement.setLong(6,age)
statement.setString(7,sex)
statement.addBatch()
if(i % 1000 == 0) {
statement.executeBatch()
statement.clearBatch()
}
i+=1
})
statement.executeBatch()
}catch {
case e: Exception => e.printStackTrace()
}finally {
if (statement!=null)
statement.close()
if (connection!=null)
connection.close()
}
})
}
}
3.4 与Hive交互
SparkSQL可以采用内嵌Hive,也可以采用外部Hive。企业开发中,通常采用外部Hive。
* idea 中 操作hive
* 1、引入 spark-hive, mysql jdbc 依赖
* 2. 将 hive-site.xml 放入 resource 目录
* 3. 在 创建 sparkSession 的时候开启hive 支持
* 4. 操作 hive: spark.sql(...)