3.3 Row
DataFrame中每条数据封装在Row中,Row表示每行数据,具体哪些字段位置,获取DataFrame中第一条数据。
如何构建Row对象:要么是传递value,要么传递Seq,官方实例代码:
import org.apache.spark.sql._
// Create a Row from values.
Row(value1, value2, value3, ...)
// Create a Row from a Seq of values.
Row.fromSeq(Seq(value1, value2, ...))
如何获取Row中每个字段的值呢????
-
方式一:下标获取,从0开始,类似数组下标获取
-
方式二:指定下标,知道类型
-
方式三:通过As转换类型, 此种方式开发中使用最多
3.4 RDD转换DataFrame
实际项目开发中,往往需要将RDD数据集转换为DataFrame,本质上就是给RDD加上Schema信息,官方提供两种方式:类型推断和自定义Schema。
官方文档:http://spark.apache.org/docs/2.4.5/sql-getting-started.html#interoperating-with-rdds
范例演示说明:使用经典数据集【电影评分数据u.data】,先读取为RDD,再转换为DataFrame。
字段信息:user id 、 item id、 rating 、 timestamp。
反射类型推断
当RDD中数据类型CaseClass样例类时,通过反射Reflecttion获取属性名称和类型,构建Schema,应用到RDD数据集,将其转换为DataFrame。
第一步、定义CaseClass样例类,封装电影评分数据
/**
* 封装电影评分数据
*
* @param userId 用户ID
* @param itemId 电影ID
* @param rating 用户对电影评分
* @param timestamp 评分时间戳
*/
case class MovieRating(
userId: String,
itemId: String,
rating: Double,
timestamp: Long
)
第二步、SparkContext读取电影评分数据封装到RDD中,转换数据类型
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 采用反射的方式将RDD转换为DataFrame和Dataset
*/
object SparkRDDInferring {
def main(args: Array[String]): Unit = {
// 构建SparkSession实例对象
val spark: SparkSession = SparkSession
.builder() // 使用建造者模式构建对象
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[3]")
.getOrCreate()
import spark.implicits._
// 读取电影评分数据u.data, 每行数据有四个字段,使用制表符分割
// user id | item id | rating | timestamp.
val rawRatingsRDD: RDD[String] = spark.sparkContext
.textFile("datas/ml-100k/u.data", minPartitions = 2)
// 转换数据
val ratingsRDD: RDD[MovieRating] = rawRatingsRDD
.filter(line => null != line && line.trim.split("\t").length == 4)
.mapPartitions{iter =>
iter.map{line =>
// 拆箱操作, Python中常用
val Array(userId, itemId, rating, timestamp) = line.trim.split("\t")
// 返回MovieRating实例对象
MovieRating(userId, itemId, rating.toDouble, timestamp.toLong)
}
}
// 将RDD转换为DataFrame和Dataset
val ratingsDF: DataFrame = ratingsRDD.toDF()
/*
root
|-- userId: string (nullable = true)
|-- itemId: string (nullable = true)
|-- rating: double (nullable = false)
|-- timestamp: long (nullable = false)
*/
ratingsDF.printSchema()
ratingsDF.show(10)
// 应用结束,关闭资源
spark.stop()
}
}
此种方式要求RDD数据类型必须为CaseClass,转换的DataFrame中字段名称就是CaseClass中属性名称。
自定义Schema
依据RDD中数据自定义Schema,类型为StructType,每个字段的约束使用StructField定义,具体步骤如下:
第一步、RDD中数据类型为Row:RDD[Row];
第二步、针对Row中数据定义Schema:StructType;
第三步、使用SparkSession中方法将定义的Schema应用到RDD[Row]上;
范例演示代码:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
* 自定义Schema方式转换RDD为DataFrame
*/
object SparkRDDSchema {
def main(args: Array[String]): Unit = {
// 构建SparkSession实例对象
val spark: SparkSession = SparkSession
.builder() // 使用建造者模式构建对象
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[3]")
.getOrCreate()
import spark.implicits._
// 读取电影评分数据u.data, 每行数据有四个字段,使用制表符分割
// user id | item id | rating | timestamp.
val ratingsRDD: RDD[String] = spark
.sparkContext.textFile("datas/ml-100k/u.data", minPartitions = 2)
// a. RDD[Row]
val rowsRDD: RDD[Row] = ratingsRDD.mapPartitions{ iter =>
iter.map{line =>
// 拆箱操作, Python中常用
val Array(userId, itemId, rating, timestamp) = line.trim.split("\t")
// 返回Row实例对象
Row(userId, itemId, rating.toDouble, timestamp.toLong)
}
}
// b. schema
val rowSchema: StructType = StructType(
Array(
StructField("userId", StringType, nullable = true),
StructField("itemId", StringType, nullable = true),
StructField("rating", DoubleType, nullable = true),
StructField("timestamp", LongType, nullable = true)
)
)
// c. 应用函数createDataFrame
val ratingDF: DataFrame = spark.createDataFrame(rowsRDD, rowSchema)
ratingDF.printSchema()
ratingDF.show(10, truncate = false)
// 应用结束,关闭资源
spark.stop()
}
}
此种方式可以更加体会到DataFrame = RDD[Row] + Schema组成,在实际项目开发中灵活的选择方式将RDD转换为DataFrame
3.5 toDF函数
除了上述两种方式将RDD转换为DataFrame以外,SparkSQL中提供一个函数:toDF,通过指定列名称,将数据类型为元组的RDD或Seq转换为DataFrame,实际开发中也常常使用。
范例演示:将数据类型为元组的RDD或Seq直接转换为DataFrame。
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 隐式调用toDF函数,将数据类型为元组的Seq和RDD集合转换为DataFrame
*/
object SparkSQLToDF {
def main(args: Array[String]): Unit = {
// 构建SparkSession实例对象,通过建造者模式创建
val spark: SparkSession = SparkSession
.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[3]")
.getOrCreate()
import spark.implicits._
// TODO: 1、构建RDD,数据类型为三元组形式
val usersRDD: RDD[(Int, String, Int)] = spark.sparkContext.parallelize(
Seq(
(10001, "zhangsan", 23),
(10002, "lisi", 22),
(10003, "wangwu", 23),
(10004, "zhaoliu", 24)
)
)
// 将RDD转换为DataFrame
val usersDF: DataFrame = usersRDD.toDF("id", "name", "age")
usersDF.printSchema()
usersDF.show(10, truncate = false)
println("========================================================")
val df: DataFrame = Seq(
(10001, "zhangsan", 23),
(10002, "lisi", 22),
(10003, "wangwu", 23),
(10004, "zhaoliu", 24)
).toDF("id", "name", "age")
df.printSchema()
df.show(10, truncate = false)
// TODO: 应用结束,关闭资源
spark.stop()
}
}
运行程序结果如下截图: