Delta数据湖upsert调优---1000多列表的调优

news2024/11/25 8:22:07

背景

本文基于
spark 3.1.1
delta 1.0.0
目前在我们公司遇到了一个任务写delta(主要是的upsert操作),写入的时间超过了6个小时,该spark主要的做的事情是:

  1. 一行数据变几百行
  2. 开窗函数去重
  3. 调用pivot函数 行列的转换,该转换以后会存在好多列存在null的情况,导致数据很稀疏

在通过对delta的upsert操作的分析,以及调优后,运行时间直接减少到1.2个小时

分析

上述deltaupsert的操作主要是通过Upsert into a table using merge
实现的,该操作的的具体实现,可以参考delta的MergeIntoCommandrun方法,该方法主要的运行计划如下:

SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, id), StringType), true, false) AS id#5450,
 +- MapPartitions org.apache.spark.sql.delta.commands.MergeIntoCommand$$Lambda$4784/570959825@527325c8, obj#5449: org.apache.spark.sql.Row                                                                                                                                                                                               
    +- DeserializeToObject createexternalrow(id#686.toString, a#716.toString, b#718.toString, c#720.toString, d#722.toString, e#724.toString, f#726.toString, g#728.toString, realti
       +- Join FullOuter, (id#686 = id#768)                                                                                                                                                                                                                                                                                              
          :- Project [id#686, a#716, b#718, c#720, d#722, e#724, f#726, g#728, realtime_finish_albums#730, h#732, i#734, today_hot_inter
          :  +- Aggregate [id#686], [id#686, first(if ((column#687 <=> a)) value#688 else null, true) AS a#716, first(if ((column#687 <=> b)) value#688 else null, true) AS b#718, first(if ((column#687 <=> c)) value#688 else null,
          :     +- Project [id#686, column#687, value#688]                                                                                                                                                                                                                                                                               
          :        +- Filter (isnotnull(rk#702) AND (rk#702 = 1))                                                                                                                                                                                                                                                                        
          :           +- Window [row_number() windowspecdefinition(id#686, column#687, ts#689L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#702], [id#686, column#687], [ts#689L DESC NULLS LAST]                                                                                        
          :              +- Project [id#686, column#687, value#688, ts#689L]                                                                                                                                                                                                                                                             
!         :                 +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, DeltaInfo, true])).id, true, false) AS id#686, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, k
          :                    +- ExternalRDD [obj#685]                                                                                                                                                                                                                                                                                  
          +- Project [id#768, baby_birthday#769, w#770, s#771, j#772, k#773, l#774, m#775, n#776, o#777, p#778, q#779, x#780, y#781, z#782, h_la
             +- Relation[id#768,baby_birthday#769,w#770,s#771,j#772,k#773,l#774,m#775,n#776,o#777,p#778,q#779,x#780,y#781,z#782,h_last
      

对应的物理执行图为:
在这里插入图片描述

可以看到SerializeFromObject中会有很多staticinvoke的计划,该计划的代码如下:

case class StaticInvoke(
    staticObject: Class[_],
    dataType: DataType,
    functionName: String,
    arguments: Seq[Expression] = Nil,
    propagateNull: Boolean = true,
    returnNullable: Boolean = true) extends InvokeLike {

  override def eval(input: InternalRow): Any = {
    invoke(null, method, arguments, input, dataType)
  }

  def invoke(
    obj: Any,
    method: Method,
    arguments: Seq[Expression],
    input: InternalRow,
    dataType: DataType): Any = {
  val args = arguments.map(e => e.eval(input).asInstanceOf[Object])
  if (needNullCheck && args.exists(_ == null)) {
    // return null if one of arguments is null
    null
  } else {
    val ret = method.invoke(obj, args: _*)
    val boxedClass = ScalaReflection.typeBoxedJavaMapping.get(dataType)
    if (boxedClass.isDefined) {
      boxedClass.get.cast(ret)
    } else {
      ret
    }
  }
  }

}

也就是说StaticInvoke最终会调用反射去获取字段,要知道反射是比较消耗时间的,要知道我们现在是有1000多个字段,如果每一行都会被反射1000次,再加上几十亿行的数据,这个计算速度肯定是比较慢的,而且为了达到更新的效果,我们还调用了coalease操作,这又增加了cpu的计算(1000多次)

优化

所以我们从以下两个方面进行优化:

  • merge into update的时候,会把所有字段(1000多个字段),做 coalease 操作,增加了cpu的消耗。
    改成只更新有更新的字段,主要是调用pivot的重载函数
      def pivot(pivotColumn: String): RelationalGroupedDataset = pivot(Column(pivotColumn))
    
    
    这里让spark自行推断schema,之后在update的时候只set该schema的字段,其他的字段不变
  • 写入delta的时候 SerializeFromObject 涉及到 staticinvoke 的操作(这里会用反射进行调用,比较耗时)
    分拆成多个表,这样每个表相对于原表只有很少的一部分的字段,这样每一行数据的反射调用就少了很多

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/719024.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux 串口工具minicom

Linux minicom Linux中的Minicom是一个串口通信工具&#xff0c;用于与外部设备进行串口通信。它可以用于与嵌入式设备、调试设备、网络设备等进行通信和配置。 调试和配置串口设备&#xff1a;minicom可以用于连接和调试各种串口设备&#xff0c;如调制解调器、路由器、交换…

软件DevOps云化发展的趋势 【课程限时免费】

你了解什么是DevOps吗&#xff1f; 它是怎么诞生的&#xff1f; DevOps能做些什么&#xff1f; 相信对于DevOps的实践者和关注者来说&#xff0c;对它已经不陌生了&#xff0c;但是对于刚刚进入开发者领域不久的小伙伴应该并不清楚&#xff0c;下面就让小智带你一起了解DevO…

node初识

一、什么是node node官网&#xff1a;https://nodejs.cn/ Node.js是一个开源的、跨平台的JavaScript运行环境。它基于Chrome V8 JavaScript引擎&#xff0c;使得JavaScript可以在服务器端运行。Node.js具有事件驱动、非阻塞式I/O的特性&#xff0c;适用于开发高性能的网络应…

ue4_Dota总结 GameMode篇

一&#xff1a;框架设计 新建地图M01&#xff1b; 创建gamemode&#xff1b; 创建gamestate&#xff1b; 创建playercontroller&#xff1b; 创建hud&#xff1b; 创建pawn&#xff1b; 将gamemode设置为M01地图中&#xff1b;将gamestate/playercontroller/hud/pawn添加…

SourceTree 切换分支时提示框 OpenSSH助手验证失败

问题描述&#xff1a; 这是我找的别的图&#xff0c;我自己的图忘记截了&#xff0c;大概意思差不多&#xff0c;就是服务器验证失败&#xff1a; 解决办法 &#xff1a;以下3步 1、命令行输入 ssh-keygen 然后一直下一步&#xff0c;直到结束&#xff0c;密钥和公钥会…

Dbeaver 往s4 HANA自建表 导入数据

今天有一份数据 13W行 需要导入S4 自建表。本来搞了一个通用的自建表导入程序&#xff0c;无奈13W行的数据就是无法读取&#xff0c;200行倒是可以。 那非常时期&#xff0c;用非常手段&#xff0c;尝试了一下刺激&#xff0c;dbeaver 导入到S4 HANA数据 后面试一下&#xff…

【运维工程师学习】磁盘相关知识——磁盘、柱面、磁道、磁头、扇区、格式化

【运维工程师学习】磁盘 1、DOS&#xff08;Disk Operating System&#xff09;2、硬盘坏道(1)逻辑坏道(2)物理坏道(3)检查坏道(4)原因(5)修复修复逻辑坏道用Scandisk检查用软件隐藏物理坏道低级格式化修复坏道 3、柱面4、扇区5、磁道6、数据区(1)数据区的内容(2)数据区根目录分…

logstash过滤器插件--translate

logstash过滤器之translate 官方手册&#xff1a;https://www.elastic.co/guide/en/logstash/current/plugins-filters-translate.html#plugins-filters-translate-target 功能描述 translate过滤器插件用于根据字典或查找文件过滤传入数据中的特定字段&#xff0c;如果输入…

【Layui】图标选择器 iconPicker 的使用

【Layui】图标选择器 iconPicker 的使用 1.项目前言2.项目目标3.项目实现3.1 图标读取3.2 图标擦除 4.效果展示4.1 简单使用4.2 参数配置4.3 使用 unicode 5.源码地址 系统&#xff1a;Win10 JDK&#xff1a;1.8.0_333 IDEA&#xff1a;2022.3.3 SpringBoot&#xff1a;2.7.6 L…

从零开始 Spring Boot 59:Hibernate 日志

从零开始 Spring Boot 59&#xff1a;Hibernate 日志 图源&#xff1a;简书 (jianshu.com) Hibernate 支持多种日志模块&#xff0c;本文介绍如何在 Spring Boot 中使用 Log4j2记录 Hibernate 日志。 实际上本文是我在写上篇文章时遇到的各种坑和最终解决的记录。 首先需要添加…

《黑马头条》 内容安全 feign 延迟任务精准发布

04自媒体文章-自动审核 1)自媒体文章自动审核流程 1 自媒体端发布文章后&#xff0c;开始审核文章 2 审核的主要是审核文章的 内容&#xff08;文本内容和图片&#xff09; 3 借助 第三方提供的接口审核文本 4 借助第三方提供的接口审核图片&#xff0c;由于图片存储到minIO中&…

C#核心知识回顾——8.ArryList、Stack栈、队列、哈希表

1.ArryList ArrayList array new ArrayList();//1.增array.Add(0);array.Add("1");array.Add(false);ArrayList arrayList new ArrayList();arrayList.Add(123);//范围增加(类似于拼接&#xff09;array.AddRange(arrayList);//插入(指定位置)array.Insert(1, &qu…

coxph-基准累积风险函数

右删失数据下的coxph拟合后&#xff0c;得到回归参数和基准累积风险函数&#xff0c;其中基准累积风险函数使用breslow估计得到&#xff1a; 代码&#xff0c;只是为了说明这个问题 res.cox <- survival::coxph(survival::Surv(time, status 2) ~ X1 X2,data auxData)bh…

LLM应用的技术栈与设计模式详解

大型语言模型是构建软件的强大新原语。 但由于它们是如此新&#xff0c;并且其行为与普通计算资源如此不同&#xff0c;因此如何使用它们并不总是显而易见的。 在这篇文章中&#xff0c;我们将分享新兴 LLM 应用程序的参考架构。 它展示了我们所见过的人工智能初创公司和先进科…

AOP简介

问题1&#xff1a;AOP的作用是什么&#xff1f; 问题2&#xff1a;连接点和切入点有什么区别&#xff0c;二者谁的范围大&#xff1f; 问题3&#xff1a;请描述什么是切面&#xff1f; 1.1 AOP简介和作用【理解】 AOP(Aspect Oriented Programming)面向切面编程&#xff0c;…

Vue3----吸顶导航

安装vueuse&#xff1a; npm i vueuse/core 1. 准备吸顶导航组 2.获取滚动距离 <script setup> // vueUse 中 useScroll import { useScroll } from vueuse/core const {y} useScroll(window) </script><template><div class"app-header-sticky&…

iostat命令和vmstat命令

1、iostat命令(磁盘) 1.1、介绍 iostat是I/O statistics&#xff08;输入/输出统计&#xff09;的缩写&#xff0c;iostat工具将对系统的磁盘操作活动进行监视。它的特点是汇报磁盘活动统计情况&#xff0c;同时也会汇报出 CPU使用情况。同vmstat一样&#xff0c;iostat也有一…

前端基础环境搭建

前端基础环境搭建 序nvm编辑器下载问题 PostMan接口测试工具 序 毕业了第一次写博客&#xff0c;因为入职啦。浅记录下今日工作内容。 刚入职必然是需要搭建好基础的环境。需了解并配置Node.js/NVM/NPM/Git/前端编辑器/Postman等等。 nvm nvm&#xff08;node.js version ma…

数据结构--树的定义与基本术语

数据结构–树的定义与基本术语 数的基本概念 树:从树根生长&#xff0c;逐级分支 非空树 \color{purple}非空树 非空树的特性: 有且仅有一个根节点 没有后继的结点称为“叶子结点”(或终端结点) 有后继的结点称为“分支结点”(或非终端结点) 除了根节点外&#xff0c;任何一个…