SPARKSQL3.0-Catalog源码剖析
一、前言
阅读本节需要先掌握Analyzer阶段的相关知识
在Spark SQL 系统中,Catalog 主要用于各种函数资源信息和元数据信息 (数据库、数据表数据视图、数据分区等)的统一管理。
初次看这种解释还是比较模糊,一会我们看源码就很清晰了
二、源码
首先Catalog是在SessionState类中,看过SessionState一节的话应该知道SessionState是在BaseSessionStateBuilder类中的build函数构建的:
def build(): SessionState = {
    new SessionState(
      session.sharedState,
      conf,
      experimentalMethods,
      functionRegistry,
      udfRegistration,
      () => catalog,		// 构建catalog
      sqlParser,
      () => analyzer,
      () => optimizer,
      planner,
      () => streamingQueryManager,
      listenerManager,
      () => resourceLoader,
      createQueryExecution,
      createClone,
      columnarRules)
  }
// 构建catalog
protected lazy val catalog: SessionCatalog = {
    val catalog = new SessionCatalog(
      () => session.sharedState.externalCatalog,
      () => session.sharedState.globalTempViewManager,
      functionRegistry,
      conf,
      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
      sqlParser,
      resourceLoader)
    parentState.foreach(_.catalog.copyStateTo(catalog))
    catalog
  }
回顾一下:由于BaseSessionStateBuilder有两个子类分别是:HiveSessionStateBuilder、SessionStateBuilder;
所以决定构建哪个builder关键在于构建SparkSession时是否使用了enableHiveSupport函数,这里我们主要讲有关hive的HiveSessionStateBuilder,SessionStateBuilder不做介绍;

父类BaseSessionStateBuilder的catalog函数在子类HiveSessionStateBuilder中被覆盖,故看子类HiveSessionStateBuilder的catalog函数:

可以看到此处创建的catalog是HiveSessionCatalog【是SessionCatalog的子类】,这里放一张catalog的类图:
如果使用hive则创建HiveSessionCatalog

这里简单介绍一下HiveSessionCatalog的属性:
class HiveSessionCatalog(
    externalCatalogBuilder: () => ExternalCatalog,
    globalTempViewManagerBuilder: () => GlobalTempViewManager,
    val metastoreCatalog: HiveMetastoreCatalog,
    functionRegistry: FunctionRegistry,
    conf: SQLConf,
    hadoopConf: Configuration,
    parser: ParserInterface,
    functionResourceLoader: FunctionResourceLoader)
1. externalCatalogBuilder【重要】:(外部系统的Catalog ):用来管理外部数据库(Databases )、数据 (Tables )、数据分区( Partitions )和函数(Functions )的接口;顾名思义其目标是与外部系统交互。
 2. globalTempViewManagerBuilder:全局的临时视图管理,对应 DataFrame 中常用的 createGlobalTempView 方法,进行跨 Session 的视图管理
 3. metastoreCatalog:用于与配置单元元存储交互的旧目录,将来将取消此类,统一使用externalCatalogBuilder对外交互
 4. functionRegistry:函数注册接口,用来实现对函数的注册 Register 、查找( Lookup )和删除Drop 等功能。
 5. xxxconf:相关配置类
 6. functionResourceLoader:函数资源加载器:在 SparkSQL 中除内置实现的各种函数外,还支持用户自定义的函数和 Hive 中的各种函数
通过以上属性的介绍,再看关于catalog的定义:Catalog 主要用子各种元数资源信息和元数据信息 (数据库、数据表数据视图、数据分区与函数等)的统一管理;感觉是不是清晰了一些
在上面众多属性中最重要的是:externalCatalogBuilder,下面将着重介绍它;
先来看一下它的构建过程:可以看到是通过session.sharedState.externalCatalog构建出来的

这里进入到ShareState类中,贴一下源码:可以看到是根据hive的属性值来确定最终会构建HiveExternalCatalog
lazy val externalCatalog: ExternalCatalogWithListener = {
  	// 这里是通过externalCatalogClassName函数获取到要反射的类名,然后通过reflect函数反射获取到externalCatalog
    val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
      SharedState.externalCatalogClassName(conf), conf, hadoopConf)
    // 默认数据库default相关配置
    val defaultDbDefinition = CatalogDatabase(
      SessionCatalog.DEFAULT_DATABASE,
      "default database",
      CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
      Map())
	  // 这里就是如果连接不上hive,会在默认目录下构建default数据库
    if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
      externalCatalog.createDatabase(defaultDbDefinition, ignoreIfExists = true)
    }
    // Wrap to provide catalog events
    val wrapped = new ExternalCatalogWithListener(externalCatalog)
    // spark的事件总线,暂不关注
    wrapped.addListener((event: ExternalCatalogEvent) => sparkContext.listenerBus.post(event))
    wrapped
  }
private val HIVE_EXTERNAL_CATALOG_CLASS_NAME = "org.apache.spark.sql.hive.HiveExternalCatalog"
// 可以看到和SessionState构建的方式一样,关键在于构建SparkSession时是否使用了enableHiveSupport函数
// 如果使用了enableHiveSupport,则CATALOG_IMPLEMENTATION = hive ,即此处会构建HiveExternalCatalog
private def externalCatalogClassName(conf: SparkConf): String = {
    conf.get(CATALOG_IMPLEMENTATION) match {
      case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME
      case "in-memory" => classOf[InMemoryCatalog].getCanonicalName
    }
  }
这里放一张HiveExternalCatalog相关类图:可以看到统一父类的接口是ExternalCatalog,如果非hive,则会生成InMemoryCatalog

这里我们再看一下HiveExternalCatalog实现的函数:可以看出对hive的DDL、DML操作都在这个类中

在该类中大部分函数的操作都是使用内部的HiveClient【重要】:这是HiveClient接口,其实现类为HiveClientImpl


而在HiveClientImpl类中同样有一个shim变量,该变量便是spark兼容不同版本hive的关键所在:其内部实现较为巧妙,是将各个函数名称作为抽象函数,再由不同的版本的子类实现函数反射参数名称和入参类型,感兴趣的小伙伴可以跟进去看看

HiveClientImpl内部提供了各种DML/DDL的实现,我们随便看一个dropTable函数:都是使用shim引用进行操作
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EnCWa1o3-1669097347665)(/Users/hzxt/Library/Application Support/typora-user-images/image-20221103123942870.png)]
再来看一下该类中的import引用:都是hive的原生包引用,至此可以很清楚的知道spark中对hive元数据的操作几乎都是通过hive原生包所提供的API

至此catalog中的externalCatalog介绍完毕,其余属性感兴趣的小伙伴可以对照源码观看
**这里再介绍一个Analyzer阶段中hive表解析规则ResolveTables,如果没看过Analyzer阶段,建议先看完Analyzer阶段:https://blog.csdn.net/qq_35128600/article/details/127970299 **
当我们在执行一个sql = select 字段 from table时,Analyzer阶段会通过catalog来确定表是否存在以及获取表字段等相关属性,这个步骤是由解析表数据规则:ResolveTables实现的

点进去看到apply函数有模式匹配,这里我们以UnresolvedTable【未解析表】为节点类型,随后执行CatalogV2Util.loadTable(catalog, ident)函数

这里又调用了catalog.asTableCatalog.loadTable(ident)

这里来到了TableCatalog.loadTable,根据下面的结构知道这里会有几个子类,具体执行的其实是V2SessionCatalog

我们可以简单做个debug测试验证一下,如下我们写一个sql然后执行


在V2SessionCatalog中看到调用catalog.getTableMetadata,此处走到了SessionCatalog,如下图
在catalog.getTableMetadata函数中执行了externalCatalog.getTable(db, table)

这里看到externalCatalog就比较亲切了,假设我们是连接hive的session,则此时会走HiveExternalCatalog的getTable函数:


HiveExternalCatalog的getTable调用了restoreTableMetadata函数,此函数将会访问hive的metastore获取此表的元数据信息并构建出CatalogTable实体类返回

我们再回到ResolveTables规则中,如下图

至此spark的catalog的介绍以及Analyzer阶段中的简单使用结束



















