一、SparkSQL编程模型的创建与转化
1、DataFrame的构建
people.txt数据:
1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40
people.json数据:在SparkSQL—简介及RDD V.S DataFrame V.S Dataset编程模型详解里
1、从Spark数据源进行创建
//创建程序入口
val spark = SparkSession.builder().appName("dataFrame").master("local[*]").getOrCreate()
val sc = spark.sparkContext
//设置日志级别
sc.setLogLevel("WARN")
//加载数据
val dataFrame = spark.read.format("json").load("F:\\test\\people.json")
//展示数据
dataFrame.show()
2、从RDD进行转换
//创建程序入口
val spark = SparkSession.builder().appName("dataFrame").master("local[*]").getOrCreate()
val sc = spark.sparkContext
//设置日志级别
sc.setLogLevel("WARN")
//导包
import spark.implicits._
//加载文件
val file :RDD[String] = sc.textFile("F:\\test\\person.txt")
//按照分隔符进行切分
val filemap :RDD[Array[String]] = file.map(_.split(" "))
//指定数据类型
val tran :RDD[(Int,String,Int)] = filemap.map(x=>(x(0).toInt,x(1),x(2).toInt))
//带参数的是指定表头名字
val dataFrame2=tran.toDF("id","name","age")
3、通过反射创建DataFrame
case class Person(id:Int,name:String,age:Int)
//样例类反射获取列名创建DataFrame
//加载文件
val file :RDD[String] = sc.textFile("F:\\test\\person.txt")
//按照分隔符进行切分
val filemap :RDD[Array[String]] = file.map(_.split(" "))
//指定数据类型
val tran= filemap.map(x1=>Person(x1(0).toInt,x1(1),x1(2).toInt))
//将rdd转换为DataFrame
val dataFrame1 = tran.toDF()
4、动态编程
//数据和结构分离加载的方式动态创建dataFrame
//加载数据
val row:RDD[Row] = sc.parallelize(List(
Row(1, "李伟", 1, 180.0),
Row(2, "汪松伟", 2, 179.0),
Row(3, "常洪浩", 1, 183.0),
Row(4, "麻宁娜", 0, 168.0)))
//指定schema
/*val structType = StructType(List(
StructField("id", DataTypes.IntegerType, false),
StructField("name", DataTypes.StringType, false),
StructField("sex", DataTypes.IntegerType, false),
StructField("height", DataTypes.DoubleType, false)
))*/
val structType = new StructType()
.add("id","Int")
.add("name","string")
.add("sex","Int")
.add("height","Double")
//创建DataFrame
val dataFrame3 = spark.createDataFrame(row,structType)
//Row:代表的是二维表中的一行记录,或者就是一个Java对象
//StructType:是该二维表的元数据信息,是StructField的集合
//StructField:是该二维表中某一个字段/列的元数据信息(主要包括,列名,类型,是否可以为null)
2、Dataset的构建
case class Student(id: Int, name: String, sex: Int, age: Int)
object Create_DataSet {
def main(args: Array[String]): Unit = {
//创建程序入口
val spark = SparkSession.builder().appName("dataSet").master("local[*]").getOrCreate()
//设置日志级别
val sc = spark.sparkContext
sc.setLogLevel("WARN")
//导包
import spark.implicits._
//加载数据
val list = List(
new Student(1, "王盛芃", 1, 19),
new Student(2, "李金宝", 1, 49),
new Student(3, "张海波", 1, 39),
new Student(4, "张文悦", 0, 29)
)
//创建DataSet
val ds = spark.createDataset[Student](list)
//展示输出
ds.show()
}
注:在创建Dataset的时候,需要注意数据的格式,必须使用case class,或者基本数据类型,同时需要通过import spark.implicts._来完成数据类型的编码,而抽取出对应的元数据信息,否则编译无法通过。
3、RDD和DataFrame以及DataSet的互相转换
//创建程序入口
val spark = SparkSession.builder().appName("transform").master("local[*]").getOrCreate()
//调用sparkContext
val sc = spark.sparkContext
//设置日志级别
sc.setLogLevel("WARN")
//导包
import spark.implicits._
//加载数据
val file = sc.textFile("F:\\test\\person.txt")
//切分
val fileMap = file.map(_.split(" "))
//指定数据类型
val tran = fileMap.map(x=>(x(0).toInt,x(1),x(2).toInt))
//----------三者之间的转换--------
//rdd=>DF
val dataFrame = tran.toDF("id","name","age")
//rdd=>DS
val dataSet = tran.toDS()
//DS=>rdd
dataSet.rdd
//DF=>rdd
dataFrame.rdd
//DF=>DS
val ds = dataFrame.as[(Int,String,Int)]
//DS=>DF
val df = ds.toDF()
二、SparkSQL统一数据加载与落地
1、数据加载
//创建程序入口
val spark = SparkSession.builder().appName("load").master("local[*]").getOrCreate()
//调用sparkContext
val sc = spark.sparkContext
//设置日志级别
sc.setLogLevel("WARN")
//加载数据
//第一种方式:spark.read.format(数据文件格式).load(path),默认加载的文件格式为parquet
spark.read.format("parquet").load("F:\\test\\parquet").show()
spark.read.format("json").load("F:\\test\\json").show()
spark.read.format("csv").load("F:\\test\\csv").show()
//加载数据库中的表的数据
spark.read.format("jdbc")
.option("url","jdbc:mysql://localhost:3306/mydata")
.option("user","root")
.option("password","root")
.option("dbtable","person")
.load()
.show()
//第二种方式
spark.read.parquet("F:\\test\\parquet").show()
spark.read.json("F:\\test\\json").show()
spark.read.csv("F:\\test\\csv").show()
//加载数据库中的表
val pro =new Properties()
pro.put("user","root")
pro.put("password","root")
spark.read.jdbc("jdbc:mysql://localhost:3306/mydata","person",pro).show()
2、数据落地
//创建程序入口
val spark = SparkSession.builder().appName("save").master("local[*]").getOrCreate()
//调用sparkContext
val sc = spark.sparkContext
//设置日志级别
sc.setLogLevel("WARN")
//加载数据
val dataFrame = spark.read.json("F:\\test\\people.json")
//数据落地
//第一种方式,save的默认格式也是parquet
dataFrame.write.format("parquet").save("F:\\test\\parquet")
dataFrame.write.format("json").save("F:\\test\\json")
dataFrame.write.format("csv").save("F:\\test\\csv")*/
//将数据保存到数据库
dataFrame.write.format("jdbc")
.option("url","jdbc:mysql://localhost:3306/mydata")
.option("user","root")
.option("password","root")
.option("dbtable","person")
.save()
//第二种方式
dataFrame.write.parquet("F:\\test\\parquet")
dataFrame.write.json("F:\\test\\json")
dataFrame.write.csv("F:\\test\\csv")
//保存到数据库
val pro = new Properties()
pro.setProperty("user","root")
pro.setProperty("password","root")
dataFrame.write.jdbc("jdbc:mysql://localhost:3306/mydata","person",pro)
三、自定义函数的使用
val spark = SparkSession.builder().appName("UDF").master("local[*]").getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("WARN")
//案例
//加载文件
val dataFrame = spark.read.json("file:\\F:\\test\\people.json")
//sql查询风格
//首先将数据注册为一张表
dataFrame.createOrReplaceTempView("people")
//赋予函数功能
val fun=(x:String)=>{
x.toUpperCase()
}
//注册函数
spark.udf.register("upper",fun)
//使用sql风格查询
spark.sql("select name, upper(name) from people").show()