死磕sparkSQL源码之TreeNode

news2024/11/19 9:31:55

InternalRow体系

学习TreeNode之前,我们先了解下InternalRow。

对于我们一般接触到的数据库关系表来说,我们对于数据库中的数据操作都是按照“行”为单位的。在spark sql内部实现中,InternalRow是用来表示这一行行数据的类。看下源码中的解释,InternalRow作为一个抽象类,包numFields 和 update 方法,以及各列数据对应的 get 与 set 方法,但具体的实现逻辑体现在不同的子类中

/**
* An abstract class for row used internally in Spark SQL, which only contains the columns as
* internal types.
一个抽象类,用于表示spark SQL内部行,只包含内部类型的多个列(其实就是表示一行行数据的类)
*/

详细代码这里就不贴了,整理下一些重要接口的功能含义好了,注意InternalRow中都是根据下标来访问和操作列元素的 。

InternalRow实现类包括,BaseGenericinternalRow、UnsafeRow 和 JoinedRow 3 个直接子类

  • BaseGenericinternalRow:也是一个抽象类,实现了SpecializedGetters类中定义的所有GET方法,但是最终还是调用genericGet方法实现最终逻辑,genericGet方法在BaseGenericinternalRow内中只是定义了一个接口,最终实现在BaseGenericinternalRow的子类中。
  • JoinedRow:该类主要用于join操作,两个InternalRow放在一起形成新的InternalRow,在sparksql 聚合和join相关操作中,会用的比较多
  • UnsafeRow:不采用 Java 对象存储的方式,避免了 JVM 中垃圾回收( GC )的代价 。 此外,UnsafeRow 对行数据进行了特定的编码,使得存储更加高效 。

TreeNode体系

接下来正式开始进行TreeNode的学习

TreeNode是Spark SQL中所有树结构的基类,定义了一系列通用的集合操作和树遍历的操作接口。我们先看下TreeNode的代码

abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with TreePatternBits {


}

首先TreeNode是一个抽象类,一个泛型类;这里TreeNode[BaseType <: TreeNode[BaseType]]这种书写方式,不知道大家会不会很陌生,反正我一开始看的时候,觉得不知道咋回事,那么我们来一起理解写,这个具体是什么含义:

  • 首先,我们很明确这个TreeNode是个泛型,我们把[]中的看作一个T,其实就是TreeNode[T],这个没问题
  • 接下里,我们要理解下“<:”这个符号的含义,这属于scala泛型中的知识,上边界和下边界。上边界是“<:”,下边界是“>:”;上边界,拿代码中的定义的含义解释就是BaseType必须是TreeNode[BaseType]的子类。也就是说TreeNode的泛型类型用BaseType表示,泛型类型比如是TreeNode类的子类

另外,TreeNode还继承了Product接口,对于product接口相关使用介绍,请看这篇文章(scala之product特质理解_大家都叫我船长的博客-CSDN博客),看完应该就明白了。

接下来,开始详细看看TreeNode一些重要方法:

  • 返回子节点,只定义了接口,具体实现在之类中
 /**
   * Returns a Seq of the children of this node.
   * Children should not change. Immutability required for containsChild optimization
   */
  def children: Seq[BaseType]
  •  返回子节点的set集合
lazy val containsChild: Set[TreeNode[_]] = children.toSet
  • 比较两个TreeNode是否相等
def fastEquals(other: TreeNode[_]): Boolean = {
    this.eq(other) || this == other
  }
  • 查找第一个符合f条件的TreeNode
def find(f: BaseType => Boolean): Option[BaseType] = if (f(this)) {
    Some(this)
  } else {
    children.foldLeft(Option.empty[BaseType]) { (l, r) => l.orElse(r.find(f)) }
  }
  • 将函数f 递归 应用于TreeNode节点以及所有子节点(先应用于parent,后应用child)
def foreach(f: BaseType => Unit): Unit = {
    f(this)
    children.foreach(_.foreach(f))
  }
  • 函数f 递归 应用于TreeNode节点以及所有子节点(先应用于child,后应用parent)
def foreachUp(f: BaseType => Unit): Unit = {
    children.foreach(_.foreachUp(f))
    f(this)
  }
  • 通过前序遍历的方式,将函数f递归应用于当前节点以及所有子节点,返回seq
def map[A](f: BaseType => A): Seq[A] = {
    val ret = new collection.mutable.ArrayBuffer[A]()
    foreach(ret += f(_))
    ret.toSeq
  }
  • flatmap和上面的map整体一致,但是这里的函数f的返回值必须是集合类型,这里需要注意
def flatMap[A](f: BaseType => TraversableOnce[A]): Seq[A] = {
    val ret = new collection.mutable.ArrayBuffer[A]()
    foreach(ret ++= f(_))  //f返回的结果必须是一个集合
    ret.toSeq
  }
  • collect ,这里使用到了scala的偏函数的使用(可以参考scala之偏函数学习_大家都叫我船长的博客-CSDN博客),对pf函数作用的所有节点返回为Some(B)都add到ret集合中,最终以seq的形式返回
def collect[B](pf: PartialFunction[BaseType, B]): Seq[B] = {
    val ret = new collection.mutable.ArrayBuffer[B]()
    val lifted = pf.lift
    foreach(node => lifted(node).foreach(ret.+=))
    ret.toSeq
  }
  • 返回当前节点的所有子节点
def collectLeaves(): Seq[BaseType] = {
    this.collect { case p if p.children.isEmpty => p }
  }
  • 先序的方式访问所有节点,且返回第一个pf作用后结果不为None的节点
def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B] = {
    val lifted = pf.lift
    lifted(this).orElse {
      children.foldLeft(Option.empty[B]) { (l, r) => l.orElse(r.collectFirst(pf)) }
    }
  }
  • mapProductIterator其实功能和productIterator.map(f).toArray一致
protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {
    val arr = Array.ofDim[B](productArity)
    var i = 0
    while (i < arr.length) {
      arr(i) = f(productElement(i))
      i += 1
    }
    arr
  }
  • 将当前节点的子节点替换为新的子节点
inal def withNewChildren(newChildren: Seq[BaseType]): BaseType = {
    val childrenIndexedSeq = asIndexedSeq(children)
    val newChildrenIndexedSeq = asIndexedSeq(newChildren)
    assert(newChildrenIndexedSeq.size == childrenIndexedSeq.size, "Incorrect number of children")
    if (childrenIndexedSeq.isEmpty ||
        childrenFastEquals(newChildrenIndexedSeq, childrenIndexedSeq)) {
      this
    } else {
      CurrentOrigin.withOrigin(origin) {
        val res = withNewChildrenInternal(newChildrenIndexedSeq)
        res.copyTagsFrom(this)
        res
      }
    }
  }
  • transfrom,调用transformDown,传入一个rule偏函数
def transform(rule: PartialFunction[BaseType, BaseType]): BaseType = {
    transformDown(rule)
  }
  •  transformDown,调用transformDownWithPruning,先序的方式使用rule作用于每个子节点,使用新的节点替换之前的,对节点不影响的,保留原来的节点
def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
    transformDownWithPruning(AlwaysProcess.fn, UnknownRuleId)(rule)
  }

def transformDownWithPruning(cond: TreePatternBits => Boolean,
    ruleId: RuleId = UnknownRuleId)(rule: PartialFunction[BaseType, BaseType])
  : BaseType = {
    if (!cond.apply(this) || isRuleIneffective(ruleId)) {
      return this
    }
    val afterRule = CurrentOrigin.withOrigin(origin) {
      // 如果 this 是 BaseType 或其子类,则对 this 应用 rule 再返回应用 rule 后的结果,否则返回 this
      rule.applyOrElse(this, identity[BaseType])
    }

    // Check if unchanged and then possibly return old copy to avoid gc churn.
    if (this fastEquals afterRule) {
      // 如果应用了 rule 后节点无变化,则递归将 rule 应用于 children
      val rewritten_plan = mapChildren(_.transformDownWithPruning(cond, ruleId)(rule))
      if (this eq rewritten_plan) {
        markRuleAsIneffective(ruleId)
        this
      } else {
        rewritten_plan
      }
    } else {
      // If the transform function replaces this node with a new one, carry over the tags.
      // 如果应用了 rule 后节点有变化,则本节点换成变化后的节点(children 不变),再将 rule 递归应用于子节点。也就是从根节点往下来应用 rule 替换节点
      afterRule.copyTagsFrom(this)
      afterRule.mapChildren(_.transformDownWithPruning(cond, ruleId)(rule))
    }
  }
  • transformWithPruning,底层调用transformDownWithPruning(功能是返回此节点的副本,其中“规则”已递归应用于树。当“规则”不适用于给定节点时,它将保持不变)
def transformWithPruning(cond: TreePatternBits => Boolean,
    ruleId: RuleId = UnknownRuleId)(rule: PartialFunction[BaseType, BaseType])
  : BaseType = {
    transformDownWithPruning(cond, ruleId)(rule)
  }

def transformDownWithPruning(cond: TreePatternBits => Boolean,
    ruleId: RuleId = UnknownRuleId)(rule: PartialFunction[BaseType, BaseType])
  : BaseType = {
    if (!cond.apply(this) || isRuleIneffective(ruleId)) {
      return this
    }
    val afterRule = CurrentOrigin.withOrigin(origin) {
      // 如果 this 是 BaseType 或其子类,则对 this 应用 rule 再返回应用 rule 后的结果,否则返回 this
      rule.applyOrElse(this, identity[BaseType])
    }

    // Check if unchanged and then possibly return old copy to avoid gc churn.
    if (this fastEquals afterRule) {
      // 如果应用了 rule 后节点无变化,则递归将 rule 应用于 children
      val rewritten_plan = mapChildren(_.transformDownWithPruning(cond, ruleId)(rule))
      if (this eq rewritten_plan) {
        markRuleAsIneffective(ruleId)
        this
      } else {
        rewritten_plan
      }
    } else {
      // If the transform function replaces this node with a new one, carry over the tags.
      // 如果应用了 rule 后节点有变化,则本节点换成变化后的节点(children 不变),再将 rule 递归应用于子节点。也就是从根节点往下来应用 rule 替换节点
      afterRule.copyTagsFrom(this)
      afterRule.mapChildren(_.transformDownWithPruning(cond, ruleId)(rule))
    }
  }
  • transformUp 用后序遍历方式将规则作用于所有节点,调用transformUpWithPruning
def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType = {
    transformUpWithPruning(AlwaysProcess.fn, UnknownRuleId)(rule)
  }

def transformUpWithPruning(cond: TreePatternBits => Boolean,
    ruleId: RuleId = UnknownRuleId)(rule: PartialFunction[BaseType, BaseType])
  : BaseType = {
    if (!cond.apply(this) || isRuleIneffective(ruleId)) {
      return this
    }
    val afterRuleOnChildren = mapChildren(_.transformUpWithPruning(cond, ruleId)(rule))
    val newNode = if (this fastEquals afterRuleOnChildren) {
      CurrentOrigin.withOrigin(origin) {
        rule.applyOrElse(this, identity[BaseType])
      }
    } else {
      CurrentOrigin.withOrigin(origin) {
        rule.applyOrElse(afterRuleOnChildren, identity[BaseType])
      }
    }
    if (this eq newNode) {
      markRuleAsIneffective(ruleId)
      this
    } else {
      // If the transform function replaces this node with a new one, carry over the tags.
      newNode.copyTagsFrom(this)
      newNode
    }
  }
  • mapChildren 返回 f 应用于所有子节点后该节点的 copy。
def mapChildren(f: BaseType => BaseType): BaseType = {
    if (containsChild.nonEmpty) {
      withNewChildren(children.map(f))
    } else {
      this
    }
  }

上面罗列的方法,基本就是TreeNode常用的,还有一些不常用的非核心的,这里就不一一介绍了,大家有兴趣的可以自己去看看源码。

另外TreeNode有两个子类,分别是Expression和QueryPlan,这篇文章我们就先讲到这里,后面会对这两个子类也会进行一一介绍的。

有兴趣的可以关注我,后面一起学习sparkSql源码,另外文章中有错误的地方,感谢指出哈。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/29504.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring Cloud(十二):Spring Cloud Security

主要内容 Spring Security 模块使用设置用户名密码基于内存基于UserDetailsService 接口基于配置类WebSecurityConfigurerAdapter基于DB 用户-角色-权限自定义登录页面登录认证流程自定义成功、自定义失败会话管理&#xff08;Session)会话控制会话超时会话并发控制集群sessio…

【Webpack】webpack的基础使用详细总结 下(建议收藏)

1- 前言 昨天已经介绍了weback的基础使用了&#xff0c;详细总结看这边博客&#xff01;&#xff01;&#xff01; 【Webpack】webpack的基础使用详细总结 上&#xff08;建议收藏&#xff09; 今天来总结一下剩余的常用 &#xff01;&#xff01;&#xff01;&#xff01; …

微信抽奖活动有什么作用_分享微信抽奖小程序开发的好处

在H5游戏中&#xff0c;抽奖是最受消费者喜爱的模式之一。将H5微信抽奖活动结合到营销中&#xff0c;可以带来意想不到的效果&#xff0c;带流量和曝光率&#xff0c;所以许多企业也会在做活动时添加上不同类型的H5微信抽奖活动。 那么&#xff0c;新手怎么搭建微信抽奖活动&am…

01背包、完全背包、多重背包、分组背包总结

文章目录一、01背包问题二、完全背包问题三、多重背包问题四、分组背包一、01背包问题 n个物品&#xff0c;每个物品的重量是wiw_iwi​&#xff0c;价值是viv_ivi​&#xff0c;背包的容量是mmm 若每个物品最多只能装一个&#xff0c;且不能超过背包容量&#xff0c;则背包的最…

【ABAP】SAP发送消息至RabbitMQ

SAP发送消息至RabbitMQ ——以下关于RabbitMQ的内容大致转载于朱忠华老师的《RabbitMQ实战指南》一书 【基础知识】 消息队列中间件(Message Queue Middleware,即MQ)也可以称之为消息队列或者消息中间件,是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数…

面试官: B 树和 B+ 树有什么区别?

问各位小可爱一个问题&#xff1a;MySQL 中 B 树和 B 树的区别&#xff1f; 请自己先思考5秒钟&#xff0c;看看是否已经了然如胸&#xff1f; 好啦&#xff0c;时间到&#xff01; B 树和 B 树是两种数据结构&#xff0c;构建了磁盘中的高速索引结构&#xff0c;因此不仅 …

上海亚商投顾:沪指窄幅震荡 “中字头”概念股又暴涨

上海亚商投顾前言&#xff1a;无惧大盘大跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 市场情绪沪指今日窄幅震荡&#xff0c;深成指、创业板指盘中跌超1%&#xff0c;午后探底回升一度翻红。光伏、储能等赛道午后…

[Spring Cloud] GateWay自定义过滤器/结合Nacos服务注册中心

✨✨个人主页:沫洺的主页 &#x1f4da;&#x1f4da;系列专栏: &#x1f4d6; JavaWeb专栏&#x1f4d6; JavaSE专栏 &#x1f4d6; Java基础专栏&#x1f4d6;vue3专栏 &#x1f4d6;MyBatis专栏&#x1f4d6;Spring专栏&#x1f4d6;SpringMVC专栏&#x1f4d6;SpringBoot专…

DocuWare Workflow Manager(工作流管理器)

DocuWare Workflow Manager 公司是按流程运转的。销售、人力资源、财务等部门需要流畅、可靠的信息传输&#xff0c;以便在正确的时间做出正确的决策。订单管理、员工入职和发票审批等流程可以根据您的精确需求进行设计和自动化&#xff0c;避免时间浪费。 适用于复杂业务的简…

Mysql数据库相关面试题

1.关系型和非关系型数据库的区别是什么? 关系型和非关系型数据库的主要差异是数据存储的方式,关系型数据库天然就是表格存储,因此存储在数据表的行和列中,数据表可以彼此关联协作存储,很容易提取数据. 优点: 易于维护:都是使用表结构,格式一致,使用方便:sql语言通用,可以用于复…

MyBatis逆向工程和分页插件

1、分页插件 MyBatis 通过提供插件机制&#xff0c;让我们可以根据自己的需要去增强MyBatis 的功能。需要注意的是&#xff0c;如果没有完全理解MyBatis 的运行原理和插件的工作方式&#xff0c;最好不要使用插件&#xff0c; 因为它会改变系底层的工作逻辑&#xff0c;给系统带…

2022年全国职业院校技能大赛:网络系统管理项目-模块B--Windows样题7

初始化环境1.默认账号及默认密码 Username: Administrator Password: ChinaSkill22! Username: demo Password: ChinaSkill22! 注:若非特别指定,所有账号的密码均为 ChinaSkill22! 项目任务描述你作为技术工程师,被指派去构建一个公司的内部网络,要为员工提供便捷、安…

超算云平台在线功能Q-Flow、Q-Studio V2.1版本升级,web端在线建模+DFT计算

建模DFT计算还可以这么玩&#xff1f; Q-Flow&#xff08;在线可视化提交任务功能&#xff09;以及 Q-Studio&#xff08;在线建模功能&#xff09;依托Mcloud平台免费向用户开放使用。告别Linux编辑代码提交任务的模式&#xff0c;Q-Flow可在浏览器里通过拖拽图形化的第一性原…

【第06节】Selenium4 JavaScript 处理场景实战(Python Web自动化测试)

Selenium 4 【01-06节】主讲元素定位&#xff0c;处理一些特殊场景的方法与实战已经全部写完。文章所有素材来自互联网&#xff0c;如果文章有侵权处&#xff0c;请联系作者。 文章目录1、Selenium4 自动化 JavaScript 场景实战1.1 使用 JavaScript 处理富文本1.2 使用 JavaScr…

Linux——网络编程总结性学习

什么是ISP&#xff1f; 网络业务提供商_百度百科 计算机网络有哪些分类方式,计算机网络有哪些分类&#xff1f;_陈泽杜的博客-CSDN博客 路由器_百度百科 目前实际的网络分层是TCP/IP四层协议 当我们浏览⽹站找到想要下载的⽂件以后&#xff0c;找到相应的接⼝点击下载就好了。…

新形势下安全风险评估实践

​ 随着安全内涵的不断扩充和发展&#xff0c;风险评估作为安全管理的重点&#xff0c;内容以及方法都与时俱进的得到了发展和丰富&#xff0c;本文将介绍新形势下风险评估的特点和实践心得&#xff0c;以供参考。 一、新形势下安全风险评估特点 首先是内外部形势和要求的变…

Docker入门教程(详细)

目录 一、Docker概述 1.1 Docker 为什么出现&#xff1f; 1.2 Dorker历史 1.3 能做什么 虚拟机技术&#xff1a;&#xff08;通过 软件 模拟的具有完整 硬件 系统功能的、运行在一个完全 隔离 环境中的完整 计算机系统&#xff09; 容器化技术&#xff1a;&#xff08;容…

【JAVA高级】——玩转JDBC中的三层架构

✅作者简介&#xff1a;热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏&#xff1a;Java案例分…

Oracle SQL执行计划操作(8)——视图与集合相关操作

8. 视图相关操作 该类操作与包含视图的SQL语句相关。当SQL语句被硬解析时,如果SQL语句中的视图未被合并,则根据不同的具体场景,如下各操作可能会出现于相关SQL语句的执行计划。 1)VIEW 创建数据的一个中间视图,一般分为系统视图和用户视图。优化器在为SQL语句生成执行计…

iOS上架流程详细版本

苹果上架审核周期长一直是困扰用户的一大问题&#xff0c;这次把我自己上架的经历分享给大家&#xff0c;避免大家入坑。 上架总流程&#xff1a; 创建开发者账号 借助辅助工具appuploader创建证书&#xff0c;描述文件 iTunes connect创建App 打包IPA上传App Store等待审…