1、前言
通过前面的文章我们了解到,spark sql通过catalyst框架解析sql,而在将sql语句转变为可执行的任务过程中会将大的sql解析流程划分为未解析的逻辑计划、解析后的逻辑计划、优化后的逻辑计划、物理计划、可执行物理计划等阶段。大概的解析流程如下所述:
1)SQL语句经过SqlParser解析成Unresolved LogicalPlan;
2)使用analyzer结合数据字典(catalog)进行绑定,生成Resolved LogicalPlan;
3)使用optimizer对Resolved LogicalPlan进行优化,生成Optimized LogicalPlan;
4)使用SparkPlan将LogicalPlan转换成PhysiclPlan;
5)使用prepareForException将PhysicalPlan转换成可执行物理计划。
spark sql将解析流程划分为多个阶段,每个阶段都有其特有的功能和解析工具,这种清晰的划分使得我们更容易理解catalyst框架的功能作用。除此之外,该框架在每个解析阶段都提供了自定义扩展点,即在每个解析阶段都可以引入我们的自定义处理逻辑。
2、自定义扩展点
spark sql大体在四个阶段提供了扩展能力,分别是未解析的逻辑计划、解析后的逻辑计划、优化后的逻辑计划、物理计划。在这四个阶段后面我们都可以引入自己的处理逻辑,具体的引入方法如下:
1)Parser 阶段:
injectParser – 添加parser自定义规则,parser负责SQL解析。
2)Analyzer阶段(都是analysis解析后才执行):
injectCheckRule – 添加Analyzer自定义Check规则。
injectResolutionRule – 添加Analyzer自定义规则到Resolution阶段,analyzer负责逻辑执行计划生成。
injectPostHocResolutionRule – 添加Analyzer自定义规则到Post Resolution阶段。
3)optimizer阶段:
injectOptimizerRule – 添加optimizer自定义规则,optimizer负责逻辑执行计划的优化,我们例子中就是扩展了逻辑优化规则。
4)planner strategy阶段(物理计划阶段):
injectPlannerStrategy – 添加planner strategy自定义规则,planner负责物理执行计划的生成。
3、扩展demo
首先是通用的main方法,因为所有示例都用的同一个查询,所以这里只展示一次:
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
.appName("test")
.master("local")
.withExtensions(e => e.injectParser(ExtendParser))
.withExtensions(e => e.injectResolutionRule(ExtendAnalyzerResolutionRule))
.withExtensions(e => e.injectPostHocResolutionRule(ExtendAnalyzerPostHocResolutionRule))
.withExtensions(e => e.injectCheckRule(ExtendAnalyzerCheckRule))
.withExtensions(e => e.injectOptimizerRule(ExtendOptimizerRule))
.withExtensions(e => e.injectPlannerStrategy(ExtendSparkStrategy))
.getOrCreate
import org.apache.spark.sql.types._
val schema = StructType(
List(StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false))
)
val rdd = sparkSession.sparkContext.parallelize(Seq(
Row(1, "xiaohong", 34),
Row(2, "xiaoli", 42),
Row(3, "xiaoming", 28),
))
val df = sparkSession.sqlContext.createDataFrame(rdd, schema)
df.createOrReplaceTempView("person")
val df2 = sparkSession.sql("select id,name from person where age > 30")
df2.show()
}
另外这里有个小小的语法知识,就是case classs 不用我们主动new创建对象,因为样例类默认提供了apply方法,因此我们可以直接写类名,下面所有的扩展对象我们都用case class进行定义。
3.1 Parser 阶段
这个阶段是将sql语句初始转换为语法树。通常这个阶段扩展的场景比较少,难度也较大。
case class ExtendParser(sparkSession: SparkSession, delegate: ParserInterface) extends ParserInterface with Logging{
override def parsePlan(sqlText: String): LogicalPlan = {
logInfo("parser: extend ourselves parser ")
delegate.parsePlan(sqlText)
}
override def parseExpression(sqlText: String): Expression = delegate.parseExpression(sqlText)
override def parseTableIdentifier(sqlText: String): TableIdentifier = delegate.parseTableIdentifier(sqlText)
override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = delegate.parseFunctionIdentifier(sqlText)
override def parseMultipartIdentifier(sqlText: String): Seq[String] = delegate.parseMultipartIdentifier(sqlText)
override def parseTableSchema(sqlText: String): StructType = delegate.parseTableSchema(sqlText)
override def parseDataType(sqlText: String): DataType = delegate.parseDataType(sqlText)
override def parseRawDataType(sqlText: String): DataType = delegate.parseRawDataType(sqlText)
}
运行后可以看到如下一行日志:
3.2 Analyzer阶段
这个阶段是将语法树和数据源关联上,将语法树上的表、字段等信息转换为解析后的状态。
case class ExtendAnalyzerResolutionRule(sparkSession: SparkSession) extends Rule[LogicalPlan] with Logging {
override def apply(plan: LogicalPlan): LogicalPlan = {
logInfo("Analyzer: extend ourselves analyzer - ResolutionRule")
plan
}
}
case class ExtendAnalyzerPostHocResolutionRule(sparkSession: SparkSession) extends Rule[LogicalPlan] with Logging {
override def apply(plan: LogicalPlan): LogicalPlan = {
logInfo("Analyzer: extend ourselves analyzer - PostHocResolutionRule")
plan
}
}
case class ExtendAnalyzerCheckRule(sparkSession: SparkSession) extends (LogicalPlan => Unit) with Logging {
override def apply(v1: LogicalPlan): Unit = {
logInfo("Analyzer: extend ourselves analyzer - CheckRule")
}
}
运行后的日志效果如下:
可以看到有很多重复的解析日志,这是因为catalyst框架解析语法树时是以访问者的模式将规则应用到语法树所有节点上。而不是将语法树传入解析规则中。
3.3 optimizer阶段
这个阶段主要是针对解析的逻辑计划做各种优化处理,目前spark中提供了很多针对固定场景的优化规则。但是有些特定业务场景下的sql优化,spark sql中并没有提供,此时就需要我们通过手动扩展的方式引入我们的优化逻辑。这个阶段的扩展也是最常用的扩展阶段。
case class ExtendOptimizerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] with Logging {
override def apply(plan: LogicalPlan): LogicalPlan = {
logInfo("Optimizer: extend ourselves optimizer - OptimizerRule")
plan
}
}
运行后的日志效果如下:
3.4 planner strategy阶段
这个阶段主要跟物理计划的生成有关,比如你想更改查询源的方式,从jdbc方式改为odbc或者直接查询文件。
case class ExtendSparkStrategy(sparkSession: SparkSession) extends SparkStrategy with Logging {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
logInfo("Planner strategy: extend ourselves strategy - SparkStrategy")
Seq.empty
}
}
运行后日志效果如下:
4、总结
本文章主要讲解的是spark sql解析中的扩展方式,这些内容更偏向于实战。具体的逻辑计划、物理计划等的细节会在后续系列文章中进行讲解。因为Parser 阶段主要是将sql语句转换为基础的语法树;Analyzer阶段主要是将语法树和数据源关联上,将语法树上的表、字段等信息转换为解析后的状态。这两个阶段扩展的需求很少,因此工作中常用的扩展方式是在optimizer阶段和planner strategy阶段。