Merge-On-Read

news2025/1/9 14:42:53

基本介绍

Iceberg的Merge-On-Read

Merge-On-Read,顾名思义,就是在读取的时候进行合并,是与Copy-On-Write相反的一种模式

在Iceberg中,Merge-On-Read同样用于行级更新,整体过程如下

当更新数据时,Iceberg不再Copy一份旧数据,而是直接将更新数据写入独立的一个文件用来标识需要删除的数据

这种模式减少了写入时的合并操作,但是加重了读取数据时的合并操作,因此适合写多读少的场景

Merge-On-Read类型

Iceberg的Merge-On-Read有两种类型,对应使用两种不同的方法来定位需要删除的数据,分别是:EqualityDelete和PositionDelete

EqualityDelete:等值删除。这种模式下,delete文件记录的内容跟数据文件一样,是行级的数据。进行读取合并时,使用指定的列做等值判断,进行数据的删除合并

PositionDelete:位置删除。这种模式下,delete文件记录的内容是需要删除的数据位置,即数据文件地址和数据行号。进行读取合并时,基于数据地址进行删除合并

目前Iceberg没有进行EqualityDelete和PositionDelete选择的配置,使用哪种模式就看API实现了哪个接口。比如配置Spark使用Merge-On-Read时,使用的就是PositionDelete

case MERGE_ON_READ:

    return new SparkPositionDeltaOperation(spark, table, branch, info, isolationLevel);

测试表现

执行如下测试

CREATE TABLE icedb.morTable (

id bigint COMMENT 'unique id',

data string)

USING iceberg TBLPROPERTIES ('format-version'='2', 'write.update.mode' = 'merge-on-read');

insert into icedb.morTable values (1, 'name1'),(99, 'name99');

insert into icedb.morTable values (2, 'name1'),(88, 'name88');

update icedb.morTable set data = 'update' WHERE id =1;
  1. snapshot:与Copy-On-Write表现一致
  2. manifest-list:这里存在差异,产生了一条content=1的数据,也就是被标记为deletes,其他三条数据都是content=0
  3. manifest:这里总共四个manifest(2条insert分别产生一个,一个update产生两个),这里四个manifest的status都为1(0=existing、1=added、2=deleted);注意的是data_file中的content字段,指向deletes文件的是1,其他为0(0=data, 1=position deletes, 2=equality deletes)
  4. datafile:主要关注update产生的两个文件,一个是正常的数据文件,存的是update后的数据(1, 'update' );另一个是position delete文件,存的是需要删除的文件地址和行号{"file_path": "hdfs://nameservice/spark/icedb.db/morTable/data/00000-0-52cbb7ca-6a2b-4f87-a2ba-fefa2470d5b8-00001.parquet", "pos": 0}

Spark Update更新流程

整体流程与Copy-On-Write是一样的,两者都是Iceberg用来做行级更新的,只是具体的实现过程不一样

扫描过程

第一步同样是做旧数据的扫描,根据update传入的where条件扫描数据

不同的是,构建BatchScan扫描器的时候,Merge-On-Read比Copy-On-Write少了一项设置:ignoreResiduals()

ignoreResiduals()的作用在Copy-On-Write中已经介绍过,是设置扫描过滤条件只应用到文件。也就是说,设置了此项的Copy-On-Write扫描的时候返回的是文件粒度的数据,会把不需要更新的数据也读出来重写一遍;而没有设置此项的Merge-On-Read只读取完全匹配的数据,并不会读取不需要更新的数据行来进行重写

这个的具体应用是在文件扫描任务BaseFileScanTask当中,构建的时候会设置这个参数

BaseFileScanTask baseFileScanTask =

    new BaseFileScanTask(

         dataFile, deleteFiles, ctx.schemaAsString(), ctx.specAsString(), ctx.residuals());

RewriteUpdateTable

Merge-On-Read走buildWriteDeltaPlan分支,生成的计划树如下

WriteIcebergDelta RelationV2[id#0L, data#1] spark_catalog.icedb.morTable spark_catalog.icedb.morTable

+- UpdateRows[__row_operation#8, id#9L, data#10, _file#11, _pos#12L, _spec_id#13, _partition#14]

   +- Filter (id#0L = cast(1 as bigint))

      +- RelationV2[id#0L, data#1, _file#4, _pos#5L, _spec_id#2, _partition#3] spark_catalog.icedb.morTable spark_catalog.icedb.morTable

RowLevelCommandScanRelationPushDown

这步走第一个分支,rewritePlan为WriteIcebergDelta的分支,最终的计划树为

WriteIcebergDelta RelationV2[id#105L, data#106] spark_catalog.icedb.morTable spark_catalog.icedb.morTable

+- UpdateRows[__row_operation#113, id#114L, data#115, _file#116, _pos#117L, _spec_id#118, _partition#119]

   +- Project [id#105L, data#106, _file#109, _pos#110L, _spec_id#107, _partition#108]

      +- Filter (isnotnull(id#105L) AND (id#105L = 1))

         +- RelationV2[id#105L, data#106, _file#109, _pos#110L, _spec_id#107, _partition#108] spark_catalog.icedb.morTable

ExtendedV2Writes

这一步走的是WriteIcebergDelta分支,最终的计划树为

UpdateIcebergTable [assignment(id#0L, id#0L), assignment(data#1, update2)], (id#0L = 1)

:- RelationV2[id#0L, data#1] spark_catalog.icedb.morTable spark_catalog.icedb.morTable

+- WriteIcebergDelta

   +- UpdateRows[__row_operation#8, id#9L, data#10, _file#11, _pos#12L, _spec_id#13, _partition#14]

      +- Project [id#0L, data#1, _file#4, _pos#5L, _spec_id#2, _partition#3]

         +- Filter (isnotnull(id#0L) AND (id#0L = 1))

            +- RelationV2[id#0L, data#1, _file#4, _pos#5L, _spec_id#2, _partition#3] spark_catalog.icedb.morTab

WriteDeltaExec

最后同样进入ExtendedDataSourceV2Strategy,此处走WriteIcebergDelta分支,生成WriteDeltaExec(3.4这个类Spark集成了,所以不在Iceberg里了

WriteDeltaExec与ReplaceDataExec一样是V2ExistingTableWriteExec的子类,不同的是WriteDeltaExec里对writingTask进行了特殊的实现,分别是DeltaWritingSparkTask和DeltaWithMetadataWritingSparkTask

两个writingTask的实现类基本相同,核心逻辑是根据不同的操作类型(insert、update、delete)调用DeltaWriter的对应接口

这里的DeltaWriter就是前面SparkPositionDeltaOperation创建的SparkPositionDeltaWrite,目前测试看update语句走的是insert+delete的流程

insert就是正常的流程,SparkPositionDeltaWrite的delete接口接收数据,这个数据是包含两个字段filePath和position,最终把这个字段写入delete类型的文件当中,写delete文件的时候在result的字段会记录delete列表,最终commit提交的时候就有对应关系

看前面的计划树,返回的数据schema当中包含了(_file#4, _pos#5L),这个就是delete这边数据的来源

定位信息

前面说过,最终写入delete文件的是file、pos两个字段,这两个字段在构建计划树的时候已经放在了schema上,来源是RewriteUpdateTable.buildWriteDeltaPlan,也就是RewriteUpdateTable的SupportsDelta分支

分支会重写计划,重写计划的时候有一步resolveRowIdAttrs,这一步会提取相应字段,让最终的数据源返回上加上file和position信息

读取数据

首先在创建ManifestGroup的时候,会去构建deleteIndexBuilder,deleteIndexBuilder用于查询标记为delete的文件,其输入是deleteManifests。

Iceberg元数据可以单独获取对应的类型的Manifest,具体操作在build的时候,build当中会单独对EqualityDelete和PositionDelete做操作,操作相同,只是找的文件类型不同

最后在createFileScanTasks构建扫描任务的时候,会把DeleteFile信息放入扫描任务

最终生效在读取文件的时候。以Spark为例,在BatchDataReader当中,基于上诉生成的扫描任务,首先获取要读取的文件,然后获取对应的delete文件封装SparkDeleteFilter,最终传入reader函数

真正生效的地方在ColumnarBatchReader当中,调用读取的时候会略过传入的delete对应的点,这样就只读取有效函数,忽略delete数据

具体的逻辑是在spark里,首先传入delete文件当中的position单独读取要删除的数据;然后在spark的逻辑里,从数据集当中把读到的这个数据给删除掉,在BufferedRowIterator的next当中,有删除逻辑

public InternalRow next() {

    return currentRows.remove();

  }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1938960.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

巴黎奥运会倒计时 一个非常不错的倒计时提醒

巴黎奥运会还有几天就要开幕了,大家应该到处都可以看到巴黎奥运会的倒计时,不管是电视上,还是网络里,一搜索奥运会,就会看到。倒计时其实是一个我们在生活中很常用的一个方法,用来做事情的提醒,…

【学习笔记】无人机系统(UAS)的连接、识别和跟踪(九)-无人机区域地面探测与避让(DAA)

引言 3GPP TS 23.256 技术规范,主要定义了3GPP系统对无人机(UAV)的连接性、身份识别、跟踪及A2X(Aircraft-to-Everything)服务的支持。 3GPP TS 23.256 技术规范: 【免费】3GPPTS23.256技术报告-无人机系…

【CSS in Depth 2 精译_020】3.3 元素的高度

当前内容所在位置(可进入专栏查看其他译好的章节内容) 第一章 层叠、优先级与继承(已完结) 1.1 层叠1.2 继承1.3 特殊值1.4 简写属性1.5 CSS 渐进式增强技术1.6 本章小结 第二章 相对单位(已完结) 2.1 相对…

Linux:Linux进程概念

目录 前言 1. 冯诺依曼体系结构 2. 操作系统 2.1 什么是操作系统 3. 进程 3.1 基本概念 3.2 描述进程——PCB 3.3 进程和程序的区别 3.4 task_struct-PCB的一种 3.5 task_struct的内容分类 4. 查看进程 4.1 通过系统文件查看进程 4.2 通过ps指令查看进程 4.3 …

Redis7(二)Redis持久化双雄

持久化之RDB RDB的持久化方式是在指定时间间隔,执行数据集的时间点快照。也就是在指定的时间间隔将内存中的数据集快照写入磁盘,也就是Snapshot内存快照,它恢复时再将硬盘快照文件直接读回到内存里面。 RDB保存的是dump.rdb文件。 自动触发…

记录些Spring+题集(3)

百万QPS下热点数据的收集方案 在高并发场景下,如京东、淘宝的秒杀活动开始时候,会有很多的用户同时抢购秒杀商品,由于同一个场次成百上千种商品参与秒杀活动,但是热点的商品往往就只有那么几十个左右,此时系统的90%的…

linux桌面运维---第八天

1、rm命令: 用法:删除一个文件或者目录。 语法:rm [选项] name... 选项: -f 即使原档案属性设为唯读,亦直接删除,无需逐一确认。-r 将目录及以下之档案亦逐一删除。需要进行一一确认 2、ln命令&#…

Lamp 小白菜鸟从入门到精通

前言 “LAMP包”的脚本组件中包括了CGIweb接口,它在90年代初期变得流行。这个技术允许网页浏览器的用户在服务器上执行一个程序,并且和接受静态的内容一样接受动态的内容。程序员使用脚本语言来创建这些程序因为它们能很容易有效的操作文本流&#xff0…

SpringBoot限制请求访问次数

本篇文章的主要内容是SpringBoot怎么限制请求访问次数。 当我们的服务端程序部署到服务器上后,就要考虑很多关于安全的问题。总会有坏人来攻击你的服务,比如说会窃取你的数据或者给你的服务器上强度。关于给服务器上强度,往往就有高强度给服务…

【ffmpeg入门】安装CUDA并使用gpu加速

文章目录 前言CUDACUDA是什么CUDA 的主要组成部分CUDA 的优点CUDA 的基本编程模型安装CUDA ffmpeg使用gpu加速为什么需要使用gpu加速1. 提高处理速度2. 减少 CPU 负载3. 提高实时处理能力4. 支持高分辨率和复杂编码格式5. 提供更好的可扩展性6. 提高能效 ffmpeg使用gpu加速常用…

在项目中加入 husky + lint-staged + eslint,代码检测格式化

背景 由于日常工作的多人协作中,会因为个人代码编写风格导致代码在不同人电脑上,会有异常代码格式的提示,为了解决这个小问题,我们可以使用 husky lint-staged 来对代码进行一定程度上的格式化,使格式风格统一&#x…

初谈Linux信号-=-信号的产生

文章目录 概述从生活角度理解信号Linux中信号信号常见的处理方式理解信号的发送与保存 信号的产生core、term区别 概述 从生活角度理解信号 你在网上买了很多件商品,再等待不同商品快递的到来。但即便快递没有到来,你也知道快递来临时, 你该…

【 DHT11 温湿度传感器】使用STC89C51读取发送到串口、通过时序图编写C语言

文章目录 DHT11 温湿度传感器概述接线数据传送通讯过程时序图检测模块是否存在 代码实现总结对tmp tmp << 1;的理解对sendByte(datas[0]/10 0x30);的理解 DHT11 温湿度传感器 使用80C51单片机通过读取HDT11温湿度传感的数据&#xff0c;发送到串口。 通过时序图编写相应…

Windows下lapack的编译

文章目录 LAPACK库LAPACK库的地址LAPACK库的安装和编译 LAPACK库 LAPACK&#xff08;Linear Algebra PACKage&#xff09;库&#xff0c;是用Fortran语言编写的线性代数计算库&#xff0c;包含线性方程组求解&#xff08;AXb&#xff09;、矩阵分解、矩阵求逆、求矩阵特征值、…

javac详解 idea maven内部编译原理 自制编译器

起因 不知道大家在开发中&#xff0c;有没有过下面这些疑问。有的话&#xff0c;今天就一次解答清楚。 如何使用javac命令编译一个项目&#xff1f;java或者javac的一些参数到底有什么用&#xff1f;idea或者maven是如何编译java项目的&#xff1f;&#xff08;你可能猜测底层…

4核16G服务器支持多少人?4C16G服务器性能测评

租赁4核16G服务器费用&#xff0c;目前4核16G服务器10M带宽配置70元1个月、210元3个月&#xff0c;那么能如何呢&#xff1f;配置为ECS经济型e实例4核16G、按固定带宽10Mbs、100GB ESSD Entry系统盘。 那么问题来了&#xff0c;4C16G10M带宽的云服务器可以支持多少人同时在线&…

【2024最新华为OD-C/D卷试题汇总】[支持在线评测] LYA的生日派对座位安排(200分) - 三语言AC题解(Python/Java/Cpp)

🍭 大家好这里是清隆学长 ,一枚热爱算法的程序员 ✨ 本系列打算持续跟新华为OD-C/D卷的三语言AC题解 💻 ACM银牌🥈| 多次AK大厂笔试 | 编程一对一辅导 👏 感谢大家的订阅➕ 和 喜欢💗 🍿 最新华为OD机试D卷目录,全、新、准,题目覆盖率达 95% 以上,支持题目在线…

原神运行闪退报错怎么办解决办法 crossover24运行exe游戏程序错误没反应

对于Mac用户来说&#xff0c;能够在Mac上体验Windows游戏是一种极大的便利。然而&#xff0c;当尝试运行热门游戏《原神》时&#xff0c;一些用户可能会遇到闪退或报错的问题。这不仅影响了游戏体验&#xff0c;也让人倍感挫败。幸运的是&#xff0c;有多种方法可以帮助解决这些…

优化德育评估流程:智慧职校的考核类型设置功能

智慧职校德育管理系统的引入标志着教育领域向着更加智能化、高效化方向迈进的重要一步。其中&#xff0c;考核类型设置功能作为系统的核心模块之一&#xff0c;扮演着不可或缺的角色。这一功能的创新之处在于它赋予了学校前所未有的灵活性和自主权&#xff0c;让教育者能够根据…

PyTorch的自动微分模块【含梯度基本数学原理详解】

文章目录 1、简介1.1、基本概念1.2、基本原理1.2.1、自动微分1.2.2、梯度1.2.3、梯度求导1.2.4、梯度下降法1.2.5、张量梯度举例 1.3、Autograd的高级功能 2、梯度基本计算2.1、单标量梯度2.2、单向量梯度的计算2.3、多标量梯度计算2.4、多向量梯度计算 3、控制梯度计算4、累计…