火山引擎 LAS Spark 升级:揭秘 Bucket 优化技术

news2024/9/25 7:14:21

更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群

文章介绍了 Bucket 优化技术及其在实际业务中的应用,包括 Spark Bucket 的基本原理,重点阐述了火山引擎湖仓一体分析服务 LAS(下文以 LAS 指代)Spark 对 Bucket 优化的功能增强, 实现了 Bucket 易用性的巨大提升,优化的覆盖范围得以扩大,且在字节内部已有成功应用案例。文中提及的能力增强在 LAS 上均可直接使用,欢迎大家到火山引擎官网进行体验和选购(点击前往),文末更有专属彩蛋,新人优惠购福利,等着你来解锁!

本篇文章提纲如下:

  • Bucket 优化技术简介
  • LAS bucket 增强
  • Bucket 优化在字节内部的应用
  • 总结

Bucket 优化技术简介

Bucket 优化是通过将数据进行分桶、排序来优化查询速度的一种技术。

分桶是组织数据的一种方式,需要指定分桶字段、分桶数量;它对分桶字段的值进行哈希并取余,将余数相同的数据存在同一个分桶中。Bucket 表通过指定分桶字段、分桶数量、排序列,将写入的数据利用 Shuffle 分桶、桶内排序后再写入文件中。

Bucket 表创建语法如下,clustered by (id) 指定分桶列,sorted by (id)指定排序列,into 4 buckets ****指定分桶数量。

create table user(id Int, info String) clustered by (id) sorted by (id) into  4 buckets; 

读、写 Bucket 表与非 Bucket 表的 SQL 语法一样,无需用户修改。

insert overwrite table user select id, info from ... where ...

前述 SQL 写出 bucket 表的执行计划如下,如果原本作业输出数据的分布不满足 Bucket 分桶要求的话,会引入一次额外的 Shuffle、Sort 开销。

1.PNG
产出非 Bucket 表和 Bucket 表的示意图对比如下。

2.PNG
下游任务按照分桶列来 Join、Group By 或 Window 时可以省去一次 Shuffle 、Sort 的开销。下图分别展示了有、无 Bucket 优化时的 SortMerge Join 的执行计划;在有 Bucket 优化时,能够省掉 A、B 两表的 Shuffle 和 Sort。

3.png

LAS bucket 增强

结合实际业务场景,LAS Spark 团队进一步增强了 Spark 的 Bucket 优化:

  • 兼容 Hive Bucket 优化,支持跨引擎读取
  • 读、写 Bucket 表时,支持更多场景下的 Shuffle 消除
  • 兼容历史非 Bucket 分区
  • 支持分区级别设置分桶数

兼容 Hive Bucket 优化

数仓中的表可能会被多个计算引擎读取,目前字节内部同时支持 SparkSQL、Presto 两大 OLAP 引擎。为了让不同的计算引擎都能利用表的 Bucket 信息优化查询,需要对齐各个引擎的 Bucket 实现。下图展示了 Hive/Presto 写 Bucket 表与原生 Spark 写 Bucket 表对比。

4.png
通过上图可以发现,Hive 在写 Bucket 表之前会将相同分桶的数据通过 reduce 操作写到一个文件中,而 Spark 原生 Bucket 优化并没有这一步,因此存在如下问题:

问题1 —— 过多小文件:Spark 写出 Bucket 表的原生实现是,在 mapper 端将数据写到文件当中,而每个 map task 中可能包含多个分桶的数据,最坏情况下会产生 MB 个文件,M 是 map task 数目,B 是分桶数。按照这个逻辑,每个分桶内的数据都被分成了 M 份,因此可能大部分都是小文件。当任务并发度为 1000、分桶数目为128 时,最坏情况下会产生 MB = 128000 个文件,如此多的文件数目会大大增加 HDFS NameNode 的压力,增加 HDFS 读取的延时。

问题2 —— 无法保证单个分桶内数据有序性:Spark 原生的 Bucket 表中,每个分桶下有多个文件,无法保证桶内数据有序,因此,在做 SortMerge Join 前仍然需要排序。

由于 Presto 支持兼容 Hive Bucket 优化、Spark 原生的 Bucket 优化存在前述两个问题,LAS Spark 团队最终选择让 Spark 兼容 Hive Bucket 优化。实现步骤如下

  • 动态增加一次以分桶列为 reduce Key 并且并行度与分桶数目相同的 Shuffle,在数据写出前排序,保证同一个分桶内的数据只写一个文件、且桶内数据有序。
  • 为 Spark 支持 HiveHash 的哈希函数,使其既能写出与 Hive 兼容的 Bucket 表,也能读取 Hive Bucket 表、并利用分桶信息消除 Shuffle。

支持更多场景下的 Shuffle 消除

  1. 分桶数成倍数关系

Spark 要求只有分桶数目相同的 Bucket 表才能消除 ShuffledJoin 之前的 Shuffle。对于两张大小相差很大的表,比如几百 GB 的维度表与几十 TB (单分区)的事实表,它们的分桶个数往往不同,并且个数相差很多,默认无法消除 Join 前的 Shuffle。为了尽可能多地减少 Shuffle 带来的开销,LAS Spark 通过两种方式支持了消除分桶数成倍数关系的 Bucket 表在 Join 前的 Shuffle。

第一种方式,Task 个数与小表分桶个数相同。如下图所示,表 A 包含 3 个分桶,表 B 包含 6 个分桶。此时表 B 的 bucket 0 与 bucket 3 的数据合集应该与表 A 的 bucket 0 进行 Join。这种情况下,可以启动 3 个 Task。在 Task 0 中 Join 表 A 的 bucket 0 与表 B 的 bucket 0 + bucket 3 。在这里,需要对表 B 的 bucket 0 与 bucket 3 的数据再做一次 merge sort 从而保证合集有序。

5.png

如果表 A 与表 B 的分桶个数相差不大,可以使用上述方式。如果表 B 的分桶个数是表 A 分桶个数的 10 倍,那上述方式虽然避免了 Shuffle,但可能因为并行度不够反而比包含 Shuffle 的 SortMerge Join 速度慢。此时可以使用另外一种方式,即 Task 个数与大表 Bucket 个数相等,如下图所示:

6.png

在该方案下,可将表 A 的 3 个分桶读多次。在上图中,直接将表 A 与表 A 进行 BucketUnion (新的算子,与 Union 类似,但保留了 Bucket 特性),结果相当于将表 A 设置成 6 个分桶,与表 B 的分桶个数相同。

  1. Shuffle Key 是分桶列超集

对于一张常用表,可能会与另外一张表按 User 字段做 Join,也可能会与另外一张表按 User 和 App 字段做 Join,与其它表按 User 与 Item 字段进行 Join。而 Spark 原生的 Bucket 优化要求 Join Key Set 与表的分桶列完全相同才能消除 ShuffledJoin 之前的 Shuffle。在该场景中,不同 Join 的 Key Set 不同,因此无法同时使用 Bucket 优化。这极大的限制了 Bucket 优化的适用场景。

针对此问题,LAS Spark 团队支持了超集场景下的 Bucket 优化。只要 Join Key Set 包含了分桶列,即可进行消除 Shuffled Join 之前的 Shuffle。

如下图所示,表 X 与表 Y,都按字段 A 分桶。而查询需要对表 X 与表 Y 进行 Join,且 Join Key Set 为 A 与 B。此时,由于 A 相等的数据,在两表中的 Bucket ID 相同,那 A 与 B 各自相等的数据在两表中的 Bucket ID 肯定也相同,所以数据分布是满足 Join 要求的,不需要 Shuffle。同时,Bucket 优化还需要保证两表按 Join Key Set 即 A 和 B 排序,此时只需要对表 X 与表 Y 进行分区内排序即可。由于两边已经按字段 A 排序了,此时再按 A 与 B 排序,代价相对较低。

7.png

  1. Bucket 表写时 Shuffle 消除

兼容 Hive Bucket 优化后,写入 Bucket 表前需要多一次 Shuffle。但是如果写出的数据分布满足分桶要求,则无需额外 Shuffle。LAS Spark 识别了两类这样的场景

  • Case 1:当写入 Bucket 表的前一个算子为 Group By/Window/Inner Join/Left Join/Right Join 时,支持消除 Shuffle;在部分场景下,写入 Bucket 表的前一个算子为 Full Out Join 时也支持消除 Shuffle。前述说的 Join 都是指 Shuffled Join。
  • Case 2:当且仅当写入 Bucket 表的前一个算子是 Group By时,支持写入超集 Shuffle 消除。

8.png

兼容历史非 Bucket 分区

在推进 Bucket 优化覆盖更多任务过程中,用户希望存量表也能通过改造利用到 Bucket 优化。如果直接向表的元信息中写入 Bucket 信息,在查询历史分区数据时会因为它们并未按 Bucket 表要求分布而报错。而存量表中历史数据量级比较大,按照 Bucket 分布重新生成一遍的开销过大,难以推进落地。为了解决以上问题,LAS Spark 团队扩展 了 SparkSQL 的语法,允许通过 DDL 修改表的属性,让表从某一刻开始支持变成 Bucket 表。

alter table tbl clustered by (a) sorted by (a) into 8 buckets;

前述 ddl 执行时会在表的元信息中记录 ddl 执行的日期、bucket 信息。后续写入数据时,会将数据进行 Shuffle 以符合 Bucket 分布;在进行 ShuffledJoin 时,先检查所查询的数据是否只包含 ddl 执行日期之后的分区。如果是,则当成 Bucket 表处理,支持 Bucket 优化;否则当成普通的表。

支持分区级别设置分桶数

由于数据量随业务增长而变多,这会造成单 Task 需要处理的数据量过大。因此,LAS Spark 支持了按分区级别来设置分桶数目,对于业务数据量大的分区,可以将分桶数目设置地更大,在减少 Shuffle 开销的同时提高并发度。

假设有一个被同一部门多条业务线使用的 Bucket 表,表级别定义分桶数目为 1024。

create table department (
    user_id int ,
    user_info string
) partitioned by ( date string, business string)
clustered by(user_id) sorted by(user_id) into 1024 buckets;

其中 business = ’ hot 分区存储热门业务的数据,数据量大约是其他业务的 2 倍 LAS Spark 通过扩展 SparkSQL 的语法,支持为该分区设置更大的分桶数目,如下:

ALTER TABLE department PARTITION(business='hot') SET NUM_BUCKETS = 2048;

数据写入:在将数据写出到表中时,如果当前分区有分区级别的分桶数目信息,按照分区级别的分桶数写入数据;否则,按照表级别的分桶数目写出数据。

数据读取:

  • 读取单个分区的情况比较好理解,读取时优先选择分区级别的分桶信息,如果没有的话再选择表级别分桶信息。
  • 读取多个分区时,如果所有分区的分桶数一样,当成普通 Bucket 表;否则,最大分桶数目的分区会保留分桶信息,其余分区的数据会当成普通分区来读取。以下图为例,同时读取 department 表的 hot、cold 两个分区,前者是 2048 个分桶,后者是 1024 个分桶,读取分区时会保留 hot 分区的分桶信息、对 cold 分区数据进行 Shuffle,然后对两部分数据进行 BucketUnion (保留 Bucket 信息的 Union 算子),让 department 表看起来是分桶数为 2048 的 Bucket 表,后续能利用到 Bucket 优化。

9.png

Bucket 优化在字节内部的应用

业务背景

数据生产链路的稳定性尤为重要,一旦链路中的任务失败或延迟产出都会给业务带来诸多问题。以视频数据为例,视频数据的生产链路如果无法保障稳定性则会带来如下风险:

  • SLA 破线风险高:数据量级大、消耗资源多、单次执行时间较长,一旦失败重跑很容易造成 SLA 破线;
  • 运维成本高:容易造成夜间报警起夜,同时任务就绪时间严重延迟时,还需人工介入调度下游,以免队列阻塞。整个过程需消耗较多人力。

在字节内部的某条链路任务上,耗时长、风险高的过程主要是 Shuffle 环节。目前,任务的 Shuffle 数据量较大,因此命中各种问题节点的概率增大,经常遇到坏节点、慢节点等问题,导致 Shuffle失败。同时也易将 Shuffle Service 的节点打满,进而导致 Task 出现 Fetch Fail 情况重试,当重试超过 3 次时,整个 Spark Application 会被 Kill 掉。

Bucket 改造

通过对链路任务的逻辑分析,结合 Bucket 特性,对全链路应用了 Bucket 优化。这里的全链路是指,从 ODS 层一直到 DM 层。链路中只要能利用到 Bucket 特性,则均进行了改造;共将链路中超过半数的表改造成 Bucket 表。改造时,通过如下方法利用尽可能多的 Bucket 优化:

  1. 通过对链路中任务的分析,发现大部分 Join、Group By 操作的 key 中都共同包含了作品 ID,因此选择作品 ID 作为分桶列,使得 Join/Group By/Window 引入的 Shuffle 的 Key 是分桶列的超集,从而消除这些 Shuffle。
  2. 保证链路中上下游之间的分桶个数成倍数关系,从而消除所有可能的 Shuffle。具体来说,要求设置的分桶数目必须为 2 的次幂,这样就保证了不同 Bucket 表的分桶数目成倍数关系。

改造收益

经过全链路 Bucket 改造后,全链路层面收益如下

  • 减少大任务 Shuffle 消除了大表 Shuffle 问题,减少大量 Shuffle 数据。
  • 链路运行稳定性提升:夜间生产链路无 Shuffle 失败问题。

单个任务层面的收益如下:

  • 长尾 任务减轻: 单 Task 时间可平均缩短 20%。
  • 资源节约: CPU 和内存资源可节约 10%-20%。
  • 运行时长: 任务运行时长无额外增加,其中部分任务因长尾任务减轻,时长可缩短20%。

总结

在消除 Exchange、优化 Shuffle 量级上,Bucket 优化是一个很有效的技术手段,但是原生的 Spark Bucket 不支持存量表迁移、无法在数据量增长后扩容 Bucket 数量、优化覆盖场景也比较有限,针对以上场景 LAS Spark 团队做了一系列的改进,使得 Bucket 的易用性得到了很大的提升、并扩大了优化的覆盖范围,以上能力的增强在 LAS 上均可直接使用,欢迎大家积极试用并提出宝贵的意见和建议。

湖仓一体分析服务 LAS(Lakehouse Analytics Service)是面向湖仓一体架构的 Serverless 数据处理分析服务,提供字节跳动最佳实践的一站式 EB 级海量数据存储计算和交互分析能力,兼容 Spark、Presto 生态,帮助企业轻松构建智能实时湖仓。新人优惠来袭!赠送给所有新人用户的专属福利来啦, LAS 数据中台新人特惠 1 元秒杀活动最新上线!更有超多叠加优惠等你来抢! 感谢大家一直以来对我们的支持与厚爱,我们会一如既往地为您带来更好的内容。 (点击链接,可顺滑体验)

链接:zjsms.com/jVCr5bp/

10.jpg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1131143.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

记录--vue3 + mark.js | 实现文字标注功能

这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助 页面效果 具体实现 新增 1、监听鼠标抬起事件,通过window.getSelection()方法获取鼠标用户选择的文本范围或光标的当前位置。2、通过 选中的文字长度是否大于0或window.getSelection().isC…

drf-过滤、排序、异常处理、自封装Response

过滤 过滤就是根据路由url?后的信息过滤出符合?后条件的数据而非全部,比如…/?nameweer就是只查name是weer的数据,其余不返回。 1、安装:pip3 install django-filter2、注册:在settings.py中的app中注册django-filt…

MYSQL(事务+锁+MVCC+SQL执行流程)理解(2)

一)MYSQL中的锁(知识补充) 可以通过In_use字段来进行判断是否针对于表进行加了锁 1)对于undo log日志来说:新增类型的,在事务提交之后就可以清除掉了,修改类型的,事务提交之后不能立即清除掉这些日志会用于mvcc只有当没有事务用到该版本信息时…

列表推导式、集合推导式、字典推导式、生成器

列表推导式 可以与三目运算符搭配使用 dict1 {name: "by", "age": 20} dict2 {name: "ss", "age": 25} dict3 {name: "sa", "age": 24} dict4 {name: "xs", "age": 27} list1 [dict1, …

Python 算法高级篇:分治算法的原理与应用

Python 算法高级篇:分治算法的原理与应用 1. 什么是分治算法?2. 分治算法的应用2.1 归并排序2.2 快速排序2.3 最大子数组问题2.4 汉诺塔问题 3. 代码示例3.1 分治算法求幂 4. 总结 分治算法是一种重要的算法设计技巧,它将一个大问题分解为多个…

(R900567512)4WE6D6X/OFEG24N9K4液压电磁换向阀

(R900567512)4WE6D6X/OFEG24N9K4液压电磁换向阀特点: ▶▶ 具有三位四通,二位四通或二位三通的方向设计 ▶▶ 高功率线圈 ▶▶ 油口安装面符合 DIN 24340 形式 A ▶▶ 油口安装面符合 ISO 4401-03-02-0-05和 NFPA T3.5.1 R2-2002 D03 ▶▶ 带…

虹科 | 解决方案 | 汽车示波器 索赔管理方案

索赔管理 Pico汽车示波器应用于主机厂/供应商与服务店/4S店的协作,实现产品索赔工作的高效管理;同时收集的故障波形数据,便于日后的产品优化和改进 故障记录 在索赔申请过程中,Pico汽车示波器的数据记录功能可以用于捕捉故障时的…

搭建zlmediakit和wvp_pro

zlmediakit使用zlmediakit/zlmediakit:master镜像 wvp_pro使用648540858/wvp_pro,可参照https://github.com/648540858/wvp-GB28181-pro wvp_pro官方https://doc.wvp-pro.cn/#/ 刚开始我找了个docker镜像运行,后来播放页面一直加载,最后就用了…

软件设计包括了四个既独立又相互联系的活动

软件设计包括了四个既独立又相互联系的活动

广告联盟是什么?app开发者如何选择广告联盟?

在广告变现过程中,广告联盟是必不可少的因素,广告联盟是一种在线广告服务模式,它将广告主和网站主(或博主、应用开发者等)联系在一起,通过广告联盟平台的中介服务,将广告主的广告投放到网站主的…

sigmoid和softmax函数的区别;神经网路常用的损失函数以及对应的应用场景;softmax的作用

一、Sigmoid函数和Softmax函数是常用的激活函数,它们在神经网络中经常用于实现非线性映射。 Sigmoid函数:Sigmoid函数常用于二分类问题,它将输入的取值范围映射到一个介于0和1之间的输出。Sigmoid函数的公式是 f(x) 1 / (1 exp(-x))&#x…

Python的web自动化学习(一)Selenium库的工作原理

Python的web自动化学习(一)Selenium库的工作原理 首发2023-07-31 11:01云中随心而记 后续会根据持续学习来更新,大家一起学习 引言: Selenium是一个流行的自动化测试工具,用于模拟和控制浏览器行为,常用于…

美创科技列为IDC中国数据安全市场代表厂商

近日,国际权威IT咨询机构IDC发布《中国数据安全市场发展趋势,2023》报告,报告针对中国数据安全市场的发展现状进行调研,明确了最终用户数据安全建设的痛点、难点,阐述了市场中各技术服务提供商的服务方案和优势。 美创…

24 个常见的 Docker 疑难杂症处理技巧!!!记得点赞收藏

1. Docker 迁移存储目录 默认情况系统会将 Docker 容器存放在 /var/lib/docker 目录下 [问题起因] 今天通过监控系统,发现公司其中一台服务器的磁盘快满了,随即上去看了下,发现 /var/lib/docker 这个目录特别大。由上述原因,我们都…

【openGauss】一主一备实现主备节点切换实验(switchover、failover)

openGauss在运行过程中,数据库管理员可能需要手工对数据库节点做主备切换。例如发现数据库节点主备failover后需要恢复原有的主备角色,或怀疑硬件故障需要手动进行主备切换。 级联备机不能直接转换为主机,只能先通过switchover或者failover成…

抖音上的影视解说是怎么配音的?

抖音影视解说的配音是从各种渠道配的。其中,最常见的方法是使用 AI 配音。AI 配音可以通过在线配音平台或语音合成软件实现。这些软件可以生成各种声音,包括电影解说、主播、卡通、游戏等。此外,一些抖音影视解说也使用了真人配音。真人配音通…

代码训练营第49天:leetcode139单词划分|多重背包|背包讲解

leetcode139&#xff1a;单词划分 文章讲解&#xff1a;leetcode139 多重背包&#xff1a;文章讲解 背包讲解&#xff1a;文章讲解 1&#xff0c;leetcode139 单词划分 class Solution { public:bool wordBreak(string s, vector<string>& wordDict) {unordered_set…

Redis 主从

目录 ​编辑一、构建主从架构 1、集群结构 2、准备实例和配置 &#xff08;1&#xff09;创建目录 &#xff08;2&#xff09;修改原始配置 &#xff08;3&#xff09;拷贝配置文件到每个实例目录 &#xff08;4&#xff09;修改每个实例的端口&#xff0c;工作目录 &a…

SpringBoot-集成Minio

官方文档&#xff1a;Kubernetes 的 MinIO 对象存储 — MinIO Object Storage for Kubernetes 一、简介 Minio 是一个基于Apache License v2.0开源协议的对象存储服务。它兼容亚马逊S3云存储服务接口&#xff0c;非常适合于存储大容量非结构化的数据&#xff0c;例如图片、视频…