Paimon 与 Spark 的集成(二):查询优化

news2024/10/3 10:46:01

Paimon

Apache Paimon (incubating) 是一项流式数据湖存储技术,可以为用户提供高吞吐、低延迟的数据摄入、流式订阅以及实时查询能力。Paimon 采用开放的数据格式和技术理念,可以与 Flink / Spark / Trino 等诸多业界主流计算引擎进行对接,共同推进 Streaming Lakehouse 架构的普及和发展。

Paimon x Spark

‍‍Apache Spark,作为大数据处理的统一计算分析引擎,不仅支持多种语言的高级API使用,也支持了丰富的大数据场景应用,包括结构化数据处理的Spark SQL、用于机器学习的MLlib,用于图形处理的GraphX,以及用于增量计算和流处理的Structured Streaming。Spark已经成为了大数据领域软件栈中必不可少的组成部分。对 Paimon 来说,为了在准实时和离线湖仓场景更加便利的落地,与 Spark 深度、全面的集成势在必行。

在之前的Paimon Release版本,我们着重丰富Paimon在功能上和Spark SQL生态的集成,包括Schema Evolution,Structured Streaming Read/Write,Dynamic Insert Overwrite Partition,Update/Merge Into等等。在最近发布的0.6和0.7版本,我们开始在Paimon基于Spark SQL查询性能上做一些工作。在初期我们会结合Spark SQL已有的优化规则和框架,让Paimon充分利用到这些。通过一系列优化,我们将 Paimon x Spark 在 TpcDS 上的性能提高了37+%,已基本和 Parquet x Spark 持平。下文将对其中的关键优化点进行详细介绍。‍‍

动态分区裁剪

动态分区裁剪(Dynamic Partition Prunning,DPP)在SQL优化中是常见的优化点,本质上是谓词下推(Predicate PushDown)的一种拓展,其目的是最小化从数据源中读取数据的IO成本,也进而减少了计算成本。

在数仓中,常常将较大的事实表和很小的维度表关联查询,且事实表需要根据维表中的字段信息来进行过滤,如下面TpcDS Q14中的SQL片段:

select ss_quantity quantity ,ss_list_price list_price
from store_sales, date_dim
where ss_sold_date_sk = d_date_sk and d_year between 1999 and 1999 + 2
order by quantity limit 10;

在不支持DPP的情况下的执行计划简化如下:

706d66e215d0cc267cc305eb9fa0ca70.png

Paimon应用的是Spark DataSource V2的查询框架,该框架在Spark3.2后提供了 SupportsRuntimeFiltering 接口用于V2表实现运行时的动态过滤。理论上,任何字段(包括普通数据字段和分区字段)的过滤条件都能被应用,但一般而言仅分区字段的过滤条件能够被完全应用,即无需上层的Filter的节点再使用该过滤条件去选择数据。Paimon表通过该接口实现了动态分区裁剪的能力。在支持DPP后执行计划如下所示:

5cbc3998d432422f3cb80273bbe295f3.png

在1T的TpcDS数据集下,应用DPP后 store_sales 表参与join的数据量从27亿 减少到16亿。仅应用到该优化后,Q14运行时间减少到原来的~55%,1T TpcDS数据集的查询性能整体提升20+%;

相关代码:

https://github.com/apache/incubator-paimon/pull/2411

https://github.com/apache/incubator-paimon/pull/2421

Exchange复用

Exchange是Spark中物理计划中一个关键的操作,对应逻辑计划中的Shuffle。在执行阶段,Exchange可以代表某个SQL中部分Plan输出的数据。在复杂的SQL中,我们可以通过公共表表达式(Common Table Expression,CTE)语法定义一个SQL片段,用于简化整个SQL或者被多次使用。以下面简化的TpcDS Q23为例,定义的其中一个CTE frequent_ss_items 在整个SQL中被两次使用。

with frequent_ss_items as (
  select substr(i_item_desc,1,30) itemdesc, i_item_sk item_sk, d_date solddate, count(*) cnt
  from store_sales, date_dim, item
  where ss_sold_date_sk = d_date_sk and ss_item_sk = i_item_sk and d_year in (2000,2000+1,2000+2,2000+3)
  group by substr(i_item_desc,1,30),i_item_sk,d_date
  having count(*) >4
),
max_store_sales as (...),
best_ss_customer as (...)
select sum(sales)
from (
  select cs_quantity*cs_list_price sales
       from catalog_sales
           ,date_dim
       where d_year = 2000
         and d_moy = 2
         and cs_sold_date_sk = d_date_sk
         and cs_item_sk in (select item_sk from frequent_ss_items)
         and cs_bill_customer_sk in (select c_customer_sk from best_ss_customer)
      union all
      select ws_quantity*ws_list_price sales
       from web_sales
           ,date_dim
       where d_year = 2000
         and d_moy = 2
         and ws_sold_date_sk = d_date_sk
         and ws_item_sk in (select item_sk from frequent_ss_items)
         and ws_bill_customer_sk in (select c_customer_sk from best_ss_customer)
) y
limit 100;

显然在执行阶段,我们希望 frequent_ss_items 仅被执行一次,执行后的数据可以缓存,然后分别执行后续和 catalog_sales 以及 web_sales 表的Join操作。针对这个场景,Spark提供了Exchange复用的优化,期待的执行计划简化如下所示:

b3492c3d21325832ef6e72d1f4163b02.png

但该优化依赖算子Plan中各个物理操作的 hashCode 来确定实际运行时是否可以复用。我们定位并解决了Paimon中存在的实现问题,使得Paimon可以使用到Spark提供的Exchange复用的优化,从而减少不必要的冗余计算,也降低了IO和网络的开销。仅应用到该优化后,Q23运行时间减少到原来的~50%,1T TpcDS数据集的查询性能整体提升13+%;

相关代码:

https://github.com/apache/incubator-paimon/pull/2488

动态调整Scan并发

任务实际执行时的并发度是影响作业运行性能的关键之一。Spark提供了 spark.sql.shuffle.partitions 参数来调整Join或者Agg等算子的并发,也提供了自适应查询执行(Adative Query Execution,AQE)框架动态调整并发,但这些都无法影响到读取数据源Scan阶段的并发。

在DataSource V2的框架下,数据源的Scan方式包括并发完全由DataSource自己决定。我们以TpcDS Q19为例:

select  i_brand_id brand_id, i_brand brand, i_manufact_id, i_manufact,
   sum(ss_ext_sales_price) ext_price
 from date_dim, store_sales, item,customer,customer_address,store
 where d_date_sk = ss_sold_date_sk
   and ss_item_sk = i_item_sk
   and i_manager_id=8
   and d_moy=11
   and d_year=1998
   and ss_customer_sk = c_customer_sk 
   and c_current_addr_sk = ca_address_sk
   and substr(ca_zip,1,5) <> substr(s_zip,1,5) 
   and ss_store_sk = s_store_sk 
 group by i_brand
      ,i_brand_id
      ,i_manufact_id
      ,i_manufact
 order by ext_price desc
         ,i_brand
         ,i_brand_id
         ,i_manufact_id
         ,i_manufact
limit 100 ;

其中 customer_addressstore 基于 substr(ca_zip,1,5) <> substr(s_zip,1,5) 条件Join。

在未引入CBO对join重排序的情况下,这两张表通过BroadcastNestedLoopJoin来实现,没有引入Exchange调整Join的并发。执行计划如下图所示:

86a8587a6b3b99180bce893548cffcc0.png

在未引入优化之前,由于 customer_address 表的数据分片较小,但任务计算负载较高(数据Join后严重膨胀),整体执行性能很差。

a276a6813767dd352c705d4dc5a3fb52.png

Paimon根据这种问题提供了基于当前作业的可用core数来动态调整数据源的数据分片的能力,也进而调整并发,从而提升查询效率。

b48dbdcff79fc724618f0497f9655ecd.png

仅应用该优化后,Q19运行时间减少到原来的~25%,1T TpcDS数据集的查询性能整体提升14+%;

相关代码:

https://github.com/apache/incubator-paimon/pull/2482

合并标量子查询

类似于Exchange复用,合并标量子查询优化会遍历整个SQL逻辑执行计划,提取出标量子查询(ScalarSubQuery),尝试将多个标量子查询合并起来,使得仅执行一次子查询得到多个标量值。

我们以TpcDS Q9的片段为例,整个Q9由5个case-when语句构成。

select case when (select count(*) 
                  from store_sales 
                  where ss_quantity between 1 and 20) > 74129
            then (select avg(ss_ext_discount_amt) 
                  from store_sales 
                  where ss_quantity between 1 and 20) 
            else (select avg(ss_net_paid)
                  from store_sales
                  where ss_quantity between 1 and 20) end bucket1
from reason
where r_reason_sk = 1;

在该SQL中,case when 的条件,thenelse 语句三个部分使用同样的过滤条件读取同一张表,仅聚合表达式不同。在没有应用到这个优化的情况下,执行计划如下所示:

003ce994bc84f889b4951b45cd8674a5.png

Spark本身提供了 MergeScalarSubQueries 的优化规则,但从实现上没法更好的对接到Paimon这样的DataSource V2表,因此我们在Paimon侧单独实现,并通过Spark提供的Extensions的接口将Paimon自实现的优化注入到了Spark优化器中。在应用该优化后,执行计划如下所示:

4ad8eb47d043d3a69a9034e379efac37.png

由此可见,合并标量子查询优化有效的减少了冗余的计算,提升了Paimon在该场景下的查询性能。仅应用该优化后,Q9运行时间减少到原来的~57%。

相关代码:

https://github.com/apache/incubator-paimon/pull/2657

Cost-Based优化

Spark SQL允许使用基于成本的优化(Cost-Based Optimizer,CBO)来提升查询性能,主要用于多路Join的场景,使用动态规划算法来选择Cost最低的Join顺序。要想使得这个优化能更有效,依赖于计算Cost的模型,以及表的表级和列级统计信息的收集,而其中列级统计信息在评估Plan算子节点的运行时统计信息中尤为重要。

新版本的Paimon在元数据中增加了statistics的信息,可以通过原生的Spark Analyze命令完成收集,并对接到了Spark SQL,使得Spark SQL可以利用Paimon的表级/列级信息进行查询优化。我们以TpcDS Q24a为例:

with ssales as
(select c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color, i_current_price, i_manager_id, i_units, i_size, sum(ss_net_paid) netpaid
from store_sales, store_returns, store, item, customer, customer_address
where ss_ticket_number = sr_ticket_number
  and ss_item_sk = sr_item_sk
  and ss_customer_sk = c_customer_sk
  and ss_item_sk = i_item_sk
  and ss_store_sk = s_store_sk
  and c_current_addr_sk = ca_address_sk
  and c_birth_country <> upper(ca_country)
  and s_zip = ca_zip
and s_market_id=8
group by c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color, i_current_price, i_manager_id, i_units, i_size)
select c_last_name, c_first_name, s_store_name, sum(netpaid) paid
from ssales
where i_color = 'peach'
group by c_last_name, c_first_name, s_store_name
having sum(netpaid) > (select 0.05 * avg(netpaid) from ssales)
order by c_last_name, c_first_name, s_store_name;

其中CTE ssales 的部分,仅提供表级统计信息的情况下的执行计划大致如下所示,包括两个SortMergeJoin,其中左侧虚线框更是大数据量间的Join操作,严重影响性能。

ad78e4ff3db958cf2d7350591101cfa2.png

而执行Analyze提供了列级统计信息后执行计划大致如下所示,对参与Join的表进行了重排序,且所有Join都是BroadcastHashJoin的方式执行。

3daa983ed07518a37aeb6a3152a029fb.png

Paimon提供了完整的statistics,借助于CBO框架,不仅可以提升相应的查询性能,也可以使得在正常资源配置下无法跑通的SQL能够正常运行,比如TpcDS Q72。在之前优化项基础上叠加应用该优化后,Q24运行时间减少到原来的~23%,1T TpcDS数据集的查询性能整体提升~30%;

相关代码:

https://github.com/apache/incubator-paimon/pull/2677

https://github.com/apache/incubator-paimon/pull/2752

https://github.com/apache/incubator-paimon/pull/2798

优 化 效 果

本文使用阿里云 EMR 5.16.0版本,集群节点的属性如下:

  • master: 1 * ecs.g7.8xlarge 32 vCPU 128 GiB

  • core: 6 * ecs.g7.8xlarge 32 vCPU 128 GiB

使用的组件及版本如下:

  • Paimon: 0.8-SNAPSHOT (对应到commit:193df7345aa520f8b45125cdd85588a91a3fc3a9)

  • Spark: 3.3.1 (额外cherry-pick SPARK-41378,以支持DataSource V2下的stats相关功能)

启用的Spark相关配置:

spark.executor.cores

4

spark.executor.memory

14g

spark.executor.memoryOverhead

2g

spark.dynamicAllocation.enabled

true

spark.sql.cbo.enabled

true

spark.sql.cbo.joinReorder.enabled

true

spark.sql.autoBroadcastJoinThreshold

128m

Paimon表选用append表(无主键表),使用parquet作为文件格式,设置bucket=-1(最新代码已经默认设置:PAIMON-2829),这样便于和Spark parquet表进行对比。

24f65bc3f2521ab66af8dced57f63f29.png

上图为我们使用parquet表(带有表级统计信息,即rowCount和sizeInByte两个指标)作为基准,以此向右分别为优化前和应用这些优化后的Paimon表(仅带表统计信息),以及Parquet表和Paimon表在收集到Column级别统计信息时的查询较基准的性能对比。

对比可见,在一般情况下(无column级统计信息)优化后的Paimon和Parquet已经基本持平。开启column级统计信息后,Paimon较Parquet慢~8%,这中间的差距也将是性能优化继续跟进的方向之一。

后 续 规 划 

在湖仓体系下,我们认为读写查询优化一直是一项任重而道远的事情。当前的优化主要集中在让Paimon充分利用到Spark SQL现有的优化规则或者优化框架。在继续推进的同时,我们也会利用Paimon自身的特性,比如Index或者Clustering等,以及优化Scan等进一步提升Paimon性能。

另外,在当前湖仓场景下,依然有很多无主键表的使用,后续对append表支持Upsert能力也是重要的规划之一。

< 往 期 精 彩 推 荐 >

625e8a1345e152acaff075b371c2e9c7.png


▼ 关注「Apache Spark 技术交流社区」,获取更多技术干货 ▼

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1504954.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

视频远程监控平台EasyCVR集成后播放只有一帧画面的原因排查与解决

智慧安防视频监控平台EasyCVR能在复杂的网络环境中&#xff08;专网、局域网、广域网、VPN、公网等&#xff09;将前端海量的设备进行统一集中接入与视频汇聚管理&#xff0c;平台可支持的接入协议包括&#xff1a;国标GB28181、RTSP/Onvif、RTMP&#xff0c;以及厂家的私有协议…

增量式编码器与绝对值编码器基础详解

文章目录 1 使用什么样的电信号来表示旋转和角度信息?1.1 表示相对角度的增量法1.2 表示绝对角度的绝对方法1.3 用脉冲信号表示绝对角度的伪绝对法2 相对角和绝对角的优缺点3 总结1 使用什么样的电信号来表示旋转和角度信息? 在第二部分中,我们解释了旋转和角度信息大致分为…

C++程序设计-第六/七/八章 运算符重载/包含与继承/虚函数和多态性【期末复习|考研复习】

前言 总结整理不易&#xff0c;希望大家点赞收藏。 给大家整理了一下C程序设计中的重点概念&#xff0c;以供大家期末复习和考研复习的时候使用。 C程序设计系列文章传送门&#xff1a; 第一章 面向对象基础 第四/五章 函数和类和对象 第六/七/八章 运算符重载/包含与继承/虚函…

Qt之输入框带自动补全提示功能

这个功能主要是提升人机交互的体验,在输入信息时,自动读取历史信息,协助用户自动补全信息,帮助用户快速输入。 一、使用的控件 使用QComboBox代替传统文本输入框,同时将其属性改为可编辑。 二、使用方式 可以不输入信息,下拉选择项:代码中使用QStringList作为提示信息…

汽车协议学习

ⅠOBD 1.OBD接口 OBD有16个引脚&#xff0c;每个引脚的电压不同&#xff08;可以对应不同的协议&#xff09; 车端&#xff1a; 16- 9 (短一点点的) 8-1 &#xff08;长一点的&#xff09; 2.基于OBDⅡ的通信协议 CAN &#xff08;ISO-15765&am…

如何基于 esp-at 固件测试 TCP (UART 转 WiFi 透传)吞吐?

测试工具&#xff1a; windows/Ubuntu/Android&#xff08;电脑或手机与 ESP 开发板连接相同路由器&#xff09;iperf2 工具ESP 系列的开发板USB-TTL 串口调试工具路由器 测试固件&#xff1a; AT 固件 AT 固件硬件接线说明 不同环境下的 Iperf 工具安装说明 Iperf 工具用于…

用C语言执行SQLite3的gcc编译细节

错误信息&#xff1a; /tmp/cc3joSwp.o: In function main: execSqlite.c:(.text0x100): undefined reference to sqlite3_open execSqlite.c:(.text0x16c): undefined reference to sqlite3_exec execSqlite.c:(.text0x174): undefined reference to sqlite3_close execSqlit…

部署LVS负载均衡集群架构

目录 一、ipvsadm 工具 二、NAT模式下部署LVS负载均衡 1、部署NFS共享存储服务器 1.1 安装NFS软件 1.2 新建共享目录和站点文件 1.3 设置共享策略 2、部署节点服务器1 2.1 安装并启动nginx软件 2.2 挂载共享目录到网页站点目录 2.3 修改网关 3、部署节点服务器2 3.…

植物病害识别:YOLO甘蔗叶片病害识别分类数据集

YOLO甘蔗叶片病害识别数据集, 包含尾孢菌叶斑病&#xff0c;眼斑病&#xff0c;健康&#xff0c;红腐病&#xff0c;锈病&#xff0c;黄叶病6个常见病类别&#xff0c;3300多张图像&#xff0c;yolo标注完整&#xff0c;全部原始图像&#xff0c;未应用增强。 适用于CV项目&…

GEE错误——Landsat9数据集进行去云操作后显示白板

问题 我遇到了一些有关 Landsat9 图像中的云遮蔽和图像处理的问题。我正在分享我所使用的代码以及我感兴趣的区域(资产)。请帮我解决这个问题。我是一名 GEE 学习者。问题:最终图像在大面积上有云状覆盖。 这里我们查看了搜索出的代码发现并不是没有数据集导致的,该区域有…

【Java探索之旅】数据类型与变量,字面常量,整型变量

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; Java入门到精通 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;前言一、字面常量二、数据类型三、变量3.1 变量概念3.2 语法格式 四、整型变量4.1 整型变…

【C++ vector 类】

1. 标准库中的vector类 vector 类 的介绍&#xff1a; 注意&#xff1a; 1. vector是表示可变大小数组的序列容器。 2. 就像数组一样&#xff0c;vector 也采用的连续存储空间来存储元素。也就是意味着可以采用下标对vector的元素进行访问&#xff0c;和数组一样高效。但是…

seo蜘蛛池的概念!蚂蚁SEO

蜘蛛池是一种特殊的网络营销技术&#xff0c;它的主要作用是吸引搜索引擎爬虫&#xff0c;提高网站的收录和排名&#xff0c;从而增加网站的流量和曝光度。 蚂蚁SEO是一个SEO工具&#xff0c;可以帮助您提高网站权重&#xff0c;吸引更多的搜索引擎爬虫&#xff0c;提高网站的…

物联网云原生云边协同

文章目录 一、物联网平台设计1.物联网平台设计2.物联网平台实现 二、部署环境1.节点配置2.版本信息 三、物联网平台部署1.部署 Kubernetes 集群2.部署 KubeEdge3.部署 ThingsBoard 集群4.部署 ThingsBoard Edge4.1.创建 Edge 实例4.2.部署 PostgreSQL4.3.创建数据库4.4.部署 Th…

Linux之生产消费者模型

(&#xff61;&#xff65;∀&#xff65;)&#xff89;&#xff9e;嗨&#xff01;你好这里是ky233的主页&#xff1a;这里是ky233的主页&#xff0c;欢迎光临~https://blog.csdn.net/ky233?typeblog 点个关注不迷路⌯▾⌯ 我们在条件满足的时候&#xff0c;唤醒指定的线程&a…

unity学习(53)——选择角色界面--分配服务器返回的信息

好久没写客户端了&#xff0c;一上手还不太适应 1.经过测试&#xff0c;成功登陆后&#xff0c;客户端请求list_request&#xff0c;成功返回&#xff0c;如下图&#xff1a; 可见此时model第三个位置的参数是1.也成功返回了所有已注册角色的信息。 2.之前已知创建的角色信息…

计算机服务器中了locked勒索病毒怎么解密,locked勒索病毒解密流程

科技的发展带动了企业生产&#xff0c;越来越多的企业开始利用计算机服务器办公&#xff0c;为企业的生产运营提供了极大便利&#xff0c;但随之而来的网络安全威胁也引起了众多企业的关注。近日&#xff0c;云天数据恢复中心接到许多企业的求助&#xff0c;企业的计算机服务器…

(每日持续更新)信息系统项目管理(第四版)(高级项目管理)考试重点整理 第13章 项目资源管理(七)

项目建议与立项申请、初步可行性研究、详细可行性研究、评估与决策是项目投资前使其的四个阶段。在实际工作中&#xff0c;初步可行性研究和详细可行性研究可以依据项目的规模和繁简程度合二为一&#xff0c;但详细可行性研究是不可缺少的。升级改造项目制作初步和详细研究&…

【01背包与完全背包】Aswing

01 https://www.acwing.com/problem/content/description/2/ #include<bits/stdc.h>using namespace std;const int MAXN 1005; int v[MAXN]; // 体积 int w[MAXN]; // 价值 int f[MAXN][MAXN]; // f[i][j], j体积下前i个物品的最大价值 int main() {int n,…

大模型产业落地,安全运营能否迎来“自动驾驶”时刻?

科技云报道原创。 通过一段文字描述&#xff0c;就能生成60秒堪比大片的视频&#xff0c;来自大模型Sora的出色表现&#xff0c;让全球都为之震撼。 无论是ChatGPT还是Sora&#xff0c;都只是大模型走出实验室的第一步&#xff0c;大模型如何在产业中落地&#xff0c;为具体的…