4.4 Shuffle 分区数目
运行上述程序时,查看WEB UI监控页面发现,某个Stage中有200个Task任务,也就是说RDD有200分区Partition。
原因:在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions )为200,在实际项目中要合理的设置。在构建SparkSession实例对象时,设置参数的值:
// 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.master("local[4]")
.appName(this.getClass.getSimpleName.stripSuffix("$"))
// TODO: 设置shuffle时分区数目
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
第五章 Dataset
Dataset是在Spark1.6中添加的新的接口,是DataFrame API的一个扩展,是Spark最新的数据抽象,结合了RDD和DataFrame的优点。
- 与RDD相比:保存了更多的描述信息,概念上等同于关系型数据库中的二维表;
- 与DataFrame相比:保存了类型信息,是强类型的,提供了编译时类型检查,调用Dataset的方法先会生成逻辑计划,然后被Spark的优化器进行优化,最终生成物理计划,然后提交到集群中运行;
5.1 Dataset 是什么
Dataset是一个强类型的特定领域的对象,这种对象可以函数式或者关系操作并行地转换。
从Spark 2.0开始,DataFrame与Dataset合并,每个Dataset也有一个被称为一个DataFrame的类型化视图,这种DataFrame是Row类型的Dataset,即Dataset[Row]。
Dataset API是DataFrames的扩展,它提供了一种类型安全的,面向对象的编程接口。它是一个强类型,不可变的对象集合,映射到关系模式。在数据集的核心 API是一个称为编码器的新概念,它负责在JVM对象和表格表示之间进行转换。表格表示使用Spark内部Tungsten二进制格式存储,允许对序列化数据进行操作并提高内存利用率。Spark 1.6支持自动生成各种类型的编码器,包括基本类型(例如String,Integer,Long),Scala案例类和Java Bean。
针对Dataset数据结构来说,可以简单的从如下四个要点记忆与理解:
Spark 框架从最初的数据结构RDD、到SparkSQL中针对结构化数据封装的数据结构DataFrame,最终使用Dataset数据集进行封装,发展流程如下。
所以在实际项目中建议使用Dataset进行数据封装,数据分析性能和数据存储更加好。
5.2 对比DataFrame
Spark在Spark 1.3版本中引入了Dataframe,DataFrame是组织到命名列中的分布式数据集合,但是有如下几点限制:
编译时类型安全:
- Dataframe API不支持编译时安全性,这限制了在结构不知道时操纵数据。
- 以下示例在编译期间有效。但是,执行此代码时将出现运行时异常。
无法对域对象(丢失域对象)进行操作: - 将域对象转换为DataFrame后,无法从中重新生成它;
- 下面的示例中,一旦我们从personRDD创建personDF,将不会恢复Person类的原始RDD(RDD [Person]);
基于上述的两点,从Spark 1.6开始出现Dataset,至Spark 2.0中将DataFrame与Dataset合并,其中DataFrame为Dataset特殊类型,类型为Row。
针对RDD、DataFrame与Dataset三者编程比较来说,Dataset API无论语法错误和分析错误在编译时都能发现,然而RDD和DataFrame有的需要在运行时才能发现。
此外RDD与Dataset相比较而言,由于Dataset数据使用特殊编码,所以在存储数据时更加节省内存。