探索Apache Hudi核心概念 (3) - Compaction

news2025/1/12 5:59:23

Compaction是MOR表的一项核心机制,Hudi利用Compaction将MOR表产生的Log File合并到新的Base File中。本文我们会通过Notebook介绍并演示Compaction的运行机制,帮助您理解其工作原理和相关配置。

1. 运行 Notebook

本文使用的Notebook是:《Apache Hudi Core Conceptions (4) - MOR: Compaction》,对应文件是:4-mor-compaction.ipynb,请先修改Notebook中的环境变量S3_BUCKET,将其设为您自己的S3桶,并确保用于数据准备的Notebook:《Apache Hudi Core Conceptions (1) - Data Preparation》已经至少执行过一次。Notebook使用的Hudi版本是0.12.1,Spark集群建议配置32 vCore / 128 GB及以上。

2. 核心概念

Compaction负责定期将一个File Slice里的Base File和从属于它的所有Log File一起合并写入到一个新的Base File中(产生新的File Slice),唯有如此,MOR表的日志文件才不至于无限膨胀下去。以下是与Compaction有关的几项重要配置,在后面的介绍中我们会逐一介绍它们的作用:

配置项默认值
hoodie.compact.inlinefalse
hoodie.compact.schedule.inlinefalse
hoodie.compact.inline.max.delta.commits5

2.1. 排期与执行

Compaction的运行机制包括:排期(Schedule)和执行(Execute)两个阶段。排期阶段的主要工作是划定哪些File Slices将参与Compaction,然后生成一个计划(Compaction Plan)保存到Timeline里,此时在Timeline里会出现一个名为compaction的Instant,状态是REQUESTED;执行阶段的主要工作是读取这个计划(Compaction Plan)并执行它,执行完毕后,Timeline中的compaction就会变成COMPLETED状态。

2.2. 同步与异步

从运行模式上看,Compaction又分同步、异步以及半异步三种模式(“半异步”模式是本文使用的一种叫法,为的是和同步、异步两种模式的称谓对齐,Hudi官方文档对这一模式有介绍,但没有给出命名),它们之间的差异主要体现在从(达到规定阈值的某次)提交(Commit)到排期(Schedule)再到执行(Execute)三个阶段的推进方式上。在Hudi的官方文档中,交替使用了Sync/Async和Inline/Offline两组词汇来描述推进方式,这两组词汇是有微妙差异的,为了表述严谨,我们使用同步/异步和立即/另行这两组中文术语与之对应。以下是Compaction三种运行模式的详细介绍:

  • 同步模式(Inline Schedule,Inline Execute)

请添加图片描述

同步模式可概括为:立即排期,立即执行(Inline Schedule,Inline Execute)。在该模式下,当累积的增量提交(deltacommit)次数到达一个阈值时,会立即触发Compaction的排期与执行(排期和执行是连在一起的),这个阈值是由配置项 hoodie.compact.inline.max.delta.commits 控制的,默认值是5,即:默认情况下,每5次增量提交就会触发并执行一次Compaction。锁定同步模式的配置是:

配置项设定值
hoodie.compact.inlinetrue
hoodie.compact.schedule.inlinefalse
  • 异步模式(Offline Schedule,Offline Execute)

请添加图片描述

异步模式可概括为:另行排期,另行执行(Offline Schedule,Offline Execute)。在该模式下,任何提交都不会直接触发和执行Compaction,除非使用了支持异步Compaction的Writer,否则用户需要自己保证有一个独立的进程或线程负责定期执行Compaction操作。Hudi提供了四种运行异步Compaction的方式:

  1. 通过hudi-cli或提交Spark作业驱动异步Compaction
  2. 提交Flink作业驱动异步Compaction
  3. 在HoodieDeltaStreamer中配置并运行异步Compaction
  4. 在Spark Structured Streaming中配置并运行异步Compaction

在后面的测试用例中,我们将使用第一种方式演示如何进行异步的Compaction排期与执行。和同步模式一样,在异步模式下,同样是当增量提交(deltacommit)次数达到一定的阈值时才会触发排期,这个阈值依然是hoodie.compact.inline.max.delta.commits

异步模式面临的场景要比同步模式复杂一些,同步模式下,每次提交时都会检查累积的提交次数是否已达规定阈值,所以在同步模式下,每次排期涵盖的增量提交数量基本是固定的,就是阈值设定的次数,但是在异步模式下,由于发起排期和增量提交之间没有必然的协同关系,所以在发起排期时,Timeline中可能尚未积累到足够数量的增量提交,或者增量提交数量已经超过了规定阈值,如果是前者,不会产生排期计划,如果是后者,排期计划会将所有累积的增量提交涵盖进来。锁定异步模式的配置是:

配置项设定值
hoodie.compact.inlinefalse
hoodie.compact.schedule.inlinefalse
  • 半异步模式(Inline Schedule,Offline Execute)

请添加图片描述

半异步模式可概括为:立即排期,另行执行(Inline Schedule,Offline Execute),即:排期会伴随增量提交(deltacommit)自动触发,但执行还是通过前面介绍的四种异步方式之一去完成。锁定半异步模式的配置是:

配置项设定值
hoodie.compact.inlinefalse
hoodie.compact.schedule.inlinetrue

3. 同步Compaction

3.1. 关键配置

《Apache Hudi Core Conceptions (4) - MOR: Compaction》的第1个测试用例演示了同步Compaction的运行机制。测试用的数据表有如下几项关键配置:

配置项默认值设定值
hoodie.compact.inlinefalsetrue
hoodie.compact.schedule.inlinefalsefalse
hoodie.compact.inline.max.delta.commits53
hoodie.copyonwrite.record.size.estimate1024175

这些配置项在介绍概念时都已提及,通过这个测试用例将会看到它们组合起来的整体效果。

3.2. 测试计划

该测试用例会先后插入或更新三批数据,然后进行同步的Compaction排期和执行,过程中将重点观察时间线和文件布局的变化,整体测试计划如下表所示:

步骤操作数据量(单分区)文件系统
1Insert96MB+1 Base File
2Update788KB+1 Log File
3Update1.2MB+1 Log File +1 Compacted Base File

提示:我们将使用色块标识当前批次的Instant和对应存储文件,每一种颜色代表一个独立的File Slice。

3.3. 第1批次

第1批次单分区写入96MB数据,Hudi将其写入到一个Parquet文件中,第一个File Group随之产生,它也是后续 Log File的Base File。需要注意的一个细节是:对于MOR表来说,只有进行Compaction的那次提交才会被称为“commit”,在Compaction之前的历次提交都被称作“deltacommit”,即使对于新建Base File写入数据的那次提交也是如此,就如同这里一样。

请添加图片描述

3.4. 第2批次

第2批次更新了一小部分数据,Hudi将更新数据写入到了Log文件中,大小788KB,fileVersion是1,它从属于上一步生成的Parquet文件,即Parquet文件是它的Base File ,这个Log文件的fileId和尾部的时间戳(baseCommitTime)与Parquet文件是一样的。当前的Parquet文件和Log文件组成了一个File Slice。

请添加图片描述

3.5. 第3批次

第3批次再次更新了一小部分数据,Hudi将更新数据又写入到一个Log文件中,大小1.2MB,fileVersion是2。与上一个Log文件一样,fileId和尾部的时间戳(baseCommitTime)与Parquet文件一致,所以它也是Parquet文件的Delta Log,且按Timeline排在上一个Log文件之后。当前的File Slice多了一个新的Log文件。但是,不同于第2批次,第3批次的故事到这里还没有结束,在该测试用例中,当前测试表的设置是:每三次deltacommit会触发一次Compaction,因此,第3次操作后就触发了第1次的Compaction操作:

请添加图片描述

于是,在Timeline上出现了一个commit(No.3),同时,在文件系统上,生成了一个新的96MB的Parquet文件,它是第一个Parquet文件连同它的两个Log文件重新压缩后得到的,这个新的Parquet文件fileId没变,但是instantTime变成了Compaction对应的commit时间,于是,在当前File Group里,第二个File Slice产生了,目前它还只有一个Base File,没有Log File。

请添加图片描述

3.6. 复盘

最后,让我们将此前的全部操作汇总在一起,重新看一下整体的时间线和最后的文件布局:

请添加图片描述

4. 异步Compaction

4.1. 关键配置

《Apache Hudi Core Conceptions (4) - MOR: Compaction》的第2个测试用例演示了异步Compaction的运行机制。测试用的数据表有如下几项关键配置:

配置项默认值设定值
hoodie.compact.inlinefalsefalse
hoodie.compact.schedule.inlinefalsefalse
hoodie.compact.inline.max.delta.commits53
hoodie.copyonwrite.record.size.estimate1024175

这些配置项在介绍概念时都已提及,通过这个测试用例将会看到它们组合起来的整体效果。

4.2. 测试计划

该测试用例会先后插入或更新三批数据,然后进行异步的Compaction排期和执行,过程中将重点观察时间线和文件布局的变化,整体测试计划如下表所示:

步骤操作数据量(单分区)文件系统
1Insert96MB+1 Base File
2Update788KB+1 Log File
3Update1.2MB+1 Log File
4Offline ScheduleN/AN/A
5Offline Execute96.15MB+1 Compacted Base File

由于该测试用例的前三步操作与第3节(第1个测试用例)完全一致,所以不再赘述,我们会从第4步操作(Notebook的3.8节)开始解读。

4.3. 异步排期

在完成了和第3节完全一样的前三批操作后,时间线和文件系统的情形如下:

请添加图片描述

这和3.5节执行后的状况非常不同,没有发生Compaction,连排期也没有看到,因为我们关闭了hoodie.compact.inline。于是,在接下来的第4步操作中(Notebook的3.8节),我们通过spark-submit手动发起了一个排期作业(--mode 'schedule'):

sudo -u hadoop spark-submit \
  --jars '/usr/lib/hudi/hudi-spark-bundle.jar' \
  --class 'org.apache.hudi.utilities.HoodieCompactor' \
  /usr/lib/hudi/hudi-utilities-bundle.jar \
  --spark-memory '4g' \
  --mode 'schedule' \
  --base-path "s3://${S3_BUCKET}/${TABLE_NAME}" \
  --table-name "$TABLE_NAME" \
  --hoodie-conf "hoodie.compact.inline.max.delta.commits=3"

执行后,文件布局没有变化,但是在时间线中出现了一个状态为REQUESTEDcompaction

请添加图片描述

4.4. 异步执行

第5步操作(Notebook的3.9节)通过spark-submit手动发起了一个执行作业(--mode 'execute'):

sudo -u hadoop spark-submit \
  --jars '/usr/lib/hudi/hudi-spark-bundle.jar' \
  --class "org.apache.hudi.utilities.HoodieCompactor" \
  /usr/lib/hudi/hudi-utilities-bundle.jar \
  --spark-memory '4g' \
  --mode 'execute' \
  --base-path "s3://${S3_BUCKET}/${TABLE_NAME}" \
  --table-name "$TABLE_NAME"

执行后,原compaction状态由REQUESTED变为COMPLETED,原Base File和两个Log File被合并打包成一个新的Base File文件,大小96MB:

请添加图片描述

4.5. 异步排期 + 异步执行

异步的排期和执行可以通过一个命令一步完成,《Apache Hudi Core Conceptions (4) - MOR: Compaction》的第3个测试用例演示了这一操作。它的前三步操作与第2个测试用例一样,在第四步时,使用了“Schedule + Execute”一起执行的方式(--mode 'scheduleAndExecute')一步完成了Compaction操作,命令如下:

sudo -u hadoop spark-submit \
  --jars '/usr/lib/hudi/hudi-spark-bundle.jar' \
  --class 'org.apache.hudi.utilities.HoodieCompactor' \
  /usr/lib/hudi/hudi-utilities-bundle.jar \
  --spark-memory '4g' \
  --mode 'scheduleAndExecute' \
  --base-path "s3://${S3_BUCKET}/${TABLE_NAME}" \
  --table-name "$TABLE_NAME" \
  --hoodie-conf "hoodie.compact.inline.max.delta.commits=3"

5. 半异步Compaction

5.1. 关键配置

《Apache Hudi Core Conceptions (4) - MOR: Compaction》的第4个测试用例演示了半异步Compaction的运行机制。测试用的数据表有如下几项关键配置:

配置项默认值设定值
hoodie.compact.inlinefalsefalse
hoodie.compact.schedule.inlinefalsetrue
hoodie.compact.inline.max.delta.commits53
hoodie.copyonwrite.record.size.estimate1024175

这些配置项在介绍概念时都已提及,通过这个测试用例将会看到它们组合起来的整体效果。

5.2. 测试计划

该测试用例会先后插入或更新三批数据,然后进行异步的Compaction Execute,过程中将重点观察时间线和文件布局的变化,整体测试计划如下表所示:

步骤操作数据量(单分区)文件系统
1Insert96MB+1 Base File
2Update788KB+1 Log File
3Update1.2MB+1 Log File
4Offline Execute96.15MB+1 Compacted Base File

由于该测试用例的前三步操作与第3节(第1个测试用例)完全一致,所以不再赘述,我们会从第3步操作(Notebook的5.7节)开始解读。

5.3. 同步排期

在完成了和第3节完全一样的前三批操作后,时间线和文件系统的情形如下:

请添加图片描述

在该模式下,第3次提交自动触发了Compaction排期,状态为REQUESTED

5.4. 异步执行

在接下来的第4步操作中,通过spark-submit手动发起了一个执行作业,排期计划被consume,原REQUESTED状态的Compaction变成了COMPLETED

请添加图片描述


关于作者:耿立超,架构师,著有 《大数据平台架构与原型实现:数据中台建设实战》一书,多年IT系统开发和架构经验,对大数据、企业级应用架构、SaaS、分布式存储和领域驱动设计有丰富的实践经验,个人技术博客:https://laurence.blog.csdn.net

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/419961.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

此战成硕,我成功上岸西南交通大学了~~~

友友们,好久不见,很长时间没有更一个正式点的文章了! 是因为我在去年年底忙着准备初试,今年年初在准备复试,直到3月底拟录取后,终于可以写下这篇上岸贴,和大家分享一下考研至上岸的一个过程 文章…

springboot+thymeleaf实现发Html邮件自由

2019年,我刚接触测试架构和测试开发类的工作时,经常会有自动化发邮件的功能,大都是从各个平台自动化统计一些数据出来,每周定时发一封邮件给领导交差,回过头来再看看我发的邮件,不美观,不专业。…

Android Jetpack:现代化Android开发的利器

Android Jetpack:现代化Android开发的利器 引言 随着移动应用的快速发展和用户体验的不断提升,现代化的Android应用开发变得愈发复杂和多样化。为了提高开发效率、简化代码、加速应用迭代,Google推出了Android Jetpack组件,成为现…

springboot零基础到项目实战

推荐教程: springboot零基础到项目实战 SpringBoot这门技术课程所包含的技术点其实并不是很多,但是围绕着SpringBoot的周边知识,也就是SpringBoot整合其他技术,这样的知识量很大,例如SpringBoot整合MyBatis等等。因此…

gitlab-ce升级方法

Centos7升级gitlab-ce: 1、记录当前版本 在升级前一定要做好备份,记录自己当前的gitlab-ce的版本: yum list | grep gitlab-ce 2、编辑/etc/gitlab/gitlab.rb文件: 1)将下面几行注释取消。 说明: 1&am…

pytorch进阶学习(二):使用DataLoader读取自己的数据集

上一节使用的是官方数据集fashionminist进行训练,这节课使用自己搜集的数据集来进行数据的获取和训练。所需资源教学视频:https://www.bilibili.com/video/BV1by4y1b7hX/?spm_id_from333.1007.top_right_bar_window_history.content.click&vd_sourc…

建立数据驱动,关键字驱动和混合Selenium框架这些你了解吗

一、什么是Selenium框架? Selenium框架是一种代码结构,用于简化代码维护和提高代码可读性。框架涉及将整个代码分成较小的代码段,以测试特定的功能。 该代码的结构使得“数据集”与实际的“测试用例”分开,后者将测试Web应用程序…

【PyTorch】第八节:数据的预处理

作者🕵️‍♂️:让机器理解语言か 专栏🎇:PyTorch 描述🎨:PyTorch 是一个基于 Torch 的 Python 开源机器学习库。 寄语💓:🐾没有白走的路,每一步都算数&#…

【NVIDIA GPU 入门】综述

系列文章目录 文章目录系列文章目录前言一、概述二、GPU架构基础2.1 GPU概述2.2 GPU的架构2.3 自主查询GPU相关信息三、CUDA编程概念3.1 CUDA线程模型3.1 线程层次结构1.引入库2.读入数据总结参考文献前言 GPU作为机器学习的基础运算设备,基本上是无人不知无人不晓。…

【bsauce读论文】PSPRAY-基于时序侧信道的Linux内核堆利用技术

会议:USENIX Security’23 作者:来自 Seoul National University 的 Yoochan Lee、Byoungyoung Lee 等人。 主要内容:由于Linux内核的堆分配器SLUB开启的freelist随机化保护,所以堆相关的内核漏洞利用成功率较低(平均…

BEV(一)---lift splat shoot

1. 算法原理 1.1 2D坐标与3D坐标的关系 如图,已知世界坐标系上的某点P(Xc, Yc, Zc)经过相机的内参矩阵可以获得唯一的图像坐标p(x, y),但是反过来已知图像上某点p&…

软考初级程序员--学习

1、十进制 转 二进制 1.1、整数十进制87 转换为 二进制为 1010111 1.2 、小数十进制0.125 转为 二进制 为 0.001 使用乘2取整法,一直乘到没有小数 2、二进制 转 十进制 2.1、二进制1010111 转换为 十进制 2.2、 二进制小数0.001 转 十进制 3、循环队列 计算长度通用…

周赛341(模拟、双指针、树上DP)

文章目录周赛341[6376. 一最多的行](https://leetcode.cn/problems/row-with-maximum-ones/)暴力模拟[6350. 找出可整除性得分最大的整数](https://leetcode.cn/problems/find-the-maximum-divisibility-score/)暴力模拟[6375. 构造有效字符串的最少插入数](https://leetcode.c…

JVM系统优化实践(15):GC可视化工具实践

您好,我是湘王,这是我的CSDN博客,欢迎您来,欢迎您再来~ 线上系统的JVM监测要么使用jstat、jmap、jhat等工具查看JVM状态,或者使用监控系统,如Zabbix、Prometheus、Open-FaIcon、Ganglia等。作为…

pyg的NeighborLoader和LinkNeighborLoader

NeighborLoader 1 数据格式要求 需要传入加载的属性值: class NeighborLoader(data: Union[Data, HeteroData, Tuple[FeatureStore, GraphStore]], num_neighbors: Union[List[int], Dict[Tuple[str, str, str], List[int]]], input_nodes: Union[Tensor, None…

进程调度的基本过程

进程调度的基本过程🔎 进程是什么🔎 进程管理🔎 进程中结构体的属性进程标识符(PID)内存指针文件描述符表结构体中与进程调度相关的属性进程的状态进程的优先级进程的上下文进程的记账信息🔎 总结🔎 结尾🔎…

(第十四届蓝桥真题) 整数删除(线段树+二分)

样例输入: 5 3 1 4 2 8 7 样例输出: 17 分析:这道题我想的比较复杂,不过复杂度还是够用的,我是用线段树二分来做的。 我们用线段树维护所有位置的最小值,那么我们每次删除一个数之前先求一遍最小值&a…

停车场管理系统文件录入(C++版)

❤️作者主页:微凉秋意 ✅作者简介:后端领域优质创作者🏆,CSDN内容合伙人🏆,阿里云专家博主🏆 文章目录一、案例需求描述1.1、汽车信息模块1.2、普通用户模块1.3、管理员用户模块二、案例分析三…

mysql:使用终端操作数据库

登录进入终端: mysql -u root -p 展示数据库 SHOW DATABASES; 创建数据库: CREATE DATABASE IF NOT EXISTS RUNOOB_TEST DEFAULT CHARSET utf8 COLLATE utf8_general_ci; 1. 如果数据库不存在则创建,存在则不创建。 2. 创建RUNOOB_TEST数据库…

ElasticSearch安装、启动、操作及概念简介

ElasticSearch快速入门 文件链接:https://pan.baidu.com/s/15kJtcHY-RAY3wzpJZIn4-w?pwd0k5a 提取码:0k5a 有些软件对于安装路径有一定的要求,例如:路径中不能有空格,不能有中文,不能有特殊符号&#xf…