spark-sql数据重复之File Output Committer问题

news2025/1/19 10:39:27

前言

 


我们先来回顾下之前介绍过的三种Committer:FileOutputCommitter V1、FileOutputCommitter V2、S3A Committer,其基本代表了整体的演进趋势。

核心代码讲解详细参照:Spark CommitCoordinator 保证数据一致性 OutputCommitter commitTask commitJob mapreduce.fileoutputcommitter.algorithm.version | 技术世界 | committask,commitjob,spark 一致性,mapreduce.fileoutputcommitter.algorithm.version,spark,大数据,集群,消息系统,郭俊 Jason,spark 优化,大数据架构,技术世界

FileOutputCommitter V1,采用两次Commit的方式来保证较强的一致性,每次Commit都对应一次文件的Rename。每个Task先将数据写入到Task的临时目录下,写完后将其Rename到Job的临时目录下;所有Task都完成后,由Job负责将其临时目录下的所有文件Rename到正式目录下,此时文件对外可见。对于HDFS而已,Rename是一个十分高效的操作,然而对于S3这样的对象存储来说,则有着很大的代价。原因在于,S3本身并不是文件系统,不存在Rename操作,一个Rename操作需要分解为List + Copy + Delete操作。因此,对于S3而言,两次Rename有着非常大的性能开销。我们经常发现,Spark UI上看到各个Task已经结束了,但是Job就是迟迟不结束,有点像hang住了,其实就是在做第二次Rename。

Rename机制


  首先来看看Spark写文件的执行方式和可能存在的问题。通常情况下,我们在单机上写文件时,都会生成一个指定文件名的文件,而调用Spark DataFrame的write接口来写文件时,所得到的结果却与此不同。如下图右侧所示,其写入了3个数据文件在指定的路径下。为什么会这样呢?这与Spark的执行方式有关。Spark是分布式计算系统,其RDD中的数据是分散在多个Partition中的,而每个Partiton对应一个Task来执行,这些Task会根据vcores个数来并行执行。在下图的示例中,笔者分配了3个Partition,所以生成了part-00000、part-00001、part-00002三个文件(文件名中间的一段UUID是在job中生成的)。按照这样的执行方式,假设我们直接把数据写入到指定的路径下,会出现哪些问题?

由于是多个Task并行写文件,如何保证所有Task写的所有文件要么同时对外可见,要么同时不可见?在下图示例中,三个Task的写入速度是不同的,那就会导致不同时刻看到的文件个数是不一样的。另外,如果有一个Task执行失败了,就会导致有2个文件残留在这个路径下。
同一个Task可能因为Speculation或者其他极端原因导致某一时刻有多个Task Attempt同时执行,即同一个Task有多个实例同时写相同的数据到相同的文件中,势必会造成冲突。如何保证最终只有一个是成功的并且数据是正确的?

 

为了应对这些问题,尽可能保证数据一致性,Hadoop FileOutputCommitter设计了Rename机制(Spark写文件还是调用Hadoop的相关库来完成的)。Rename机制先后有两个版本:v1和v2,二者在性能和保证数据一致性的粒度上有所区别。
  下图所示为v1的思想,其需要经历两次Rename。每个Task首先将数据写入如下临时路径:

${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName}


示例:
hdfs:///data/_temporary/0/_temporary/attempt_20190219101232_0003_m_000005_0/part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f.c000.snappy.parquet

  Task写入完成后,执行commitTask做第一次Rename,将文件从Task Attempt的临时目录中移动到Task的临时目录中。

${output.dir.root}/_temporary/${appAttempt}/${task}/${fileName}
示例:
hdfs:///data/_temporary/0/task_20190219101232_0003_m_000005/part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f.c000.snappy.parquet

  最后,当所有Task都完成上述操作后,由Driver负责执行commitJob做第二次Rename,依次将文件从每个Task的临时目录中移动到真实目录中,并写入_SUCCESS标识。

${output.dir.root}/${fileName}
示例:
hdfs:///data/part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f.c000.snappy.parquet

v1的思想较好的解决了前面提到的问题,基于此,只有在Rename的过程中出问题才可能导致数据一致性问题,然而这种概率相比之前提到的情况要低很多。但是,两次Rename也带来了性能问题,主要表现在:当有大量Task写入时,即使所有Task都完成了,还需要等待很长一段时间Job才能结束,这个时间主要花在Driver端做第二次Rename。例如,在笔者的系统中,每次需要写入1200个文件到S3,平均每个需要花费0.5~1.5秒的时间来做第二次Rename,整体需要花费10-30分钟。

FileOutputCommitter V2,在V1的基础上减去了第二次Rename,即每个Task先将数据写入到Task的临时目录下,写完后直接将其Rename到正式目录中;所有Task都完成后,Job只是写入一个_SUCCESS文件来标识已完成。显然,V2是牺牲一定的一致性来换取性能。因为,如果Spark Job在执行过程中失败了,就会出现部分成功的Task写入的文件对外可见,成为脏数据。S3A Committer,最初由Netflix贡献给社区,采用S3 Multipart Upload机制替换了Rename机制。原因在于,对于S3而言,Rename不仅仅会带来性能问题,还可能因为S3的“最终一致性”特性而失败。社区版的这个Committer,会在每个Task中将数据先写入到本地磁盘,然后采用Multipart Upload方式上传到S3;所有Task都完成后,由Job统一向S3发送Complete信号,此时文件对外可见。

综合来看,使用Spark往S3写入文件时,应该尽量选择基于S3 Multipart Upload机制的Committer。在我们的系统中,主要采用AWS EMR来构建Spark集群,数据写入S3存储。EMR在5.19.0之后引入了EMRFS S3-optimized Committer,同样采用S3 Multipart Upload机制,因此我们会优先使用这个Committer。

V1和V2 commiter版本比较


mapreduce.fileoutputcommitter.algorithm.version 参数对文件输出有很大的影响,下面总结一下两种版本在各方面的优缺点。

1、性能方面
v1在task结束后只是将输出文件拷到临时目录,然后在job结束后才由Driver把这些文件再拷到输出目录。如果文件数量很多,Driver就需要不断的和NameNode做交互,而且这个过程是单线程的,因此势必会增加耗时。如果我们碰到有spark任务所有task结束了但是任务还没结束,很可能就是Driver还在不断的拷文件。

v2在task结束后立马将输出文件拷贝到输出目录,后面Job结束后Driver就不用再去拷贝了。

因此,在性能方面,v2完胜v1。

2、数据一致性方面
v1在Job结束后才批量拷文件,其实就是两阶段提交,它可以保证数据要么全部展示给用户,要么都没展示(当然,在拷贝过程中也无法保证完全的数据一致性,但是这个时间一般来说不会太长)。如果任务失败,也可以直接删了_temporary目录,可以较好的保证数据一致性。

v2在task结束后就拷文件,就会造成spark任务还未完成就让用户看到一部分输出,这样就完全没办法保证数据一致性了。另外,如果任务在输出过程中失败,就会有一部分数据成功输出,一部分没输出的情况。

因此在数据一致性方面,v1完胜v2。

Spark任务写数据到s3,执行时间特别长

场景

目前使用s3替代hdfs作为hive表数据存储,使用spark sql insert数据到hive表,发现一个简单的查询+插入任务,查询+insert的动作显示已经执行完,任务还在跑,直到跑了两个小时后才执行结束。

原因

s3对spark默认的commit操作兼容性不强,spark有两种commit操作,一种是commit task,在executor上执行,一种是commit job,在driver上执行。默认commit策略下,spark在输出数据的时,会先输出到临时目录上,临时目录分task临时目录和job临时目录,默认的commit task操作是将执行成功的task的输出数据从task的临时目录rename到job的临时目录task目录,commit job操作则是driver单线程遍历所有job临时目录下所有task目录并rename到用户指定的输出目录下。driver运行时间长在于单线程rename所有task目录,最后在最终输出的目录加上SUCCESS文件,而s3的rename操作是mv=cp+rm,和hdfs的rename操作不同,效率低下。

解决

一般情况下,我们使用的committer是FileOutputCommitter,在hadoop2.7后,支持新的commit算法,将mapreduce.fileoutputcommitter.algorithm.version设置为2,默认是1,新的commit算法对commit task做了一下改动,不再将task临时目录mv到job的临时目录下,而是直接移动到最终目录下,不需要driver最后再单线程移动一次,commit job操作是在最终目录下直接加上SUCCESS文件即可。简单概括就是单线程mv变多线程mv,新的commit算法提高了性能,但是降低了数据一致性。

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 


 AWS EMRFS S3-optimized Committer


EMRFS S3-optimized Committer,如AWS官方博客所言,其思想来自于S3A Committer,但是就目前的实现来看,坦白说,个人感觉比较鸡肋。一方面,这是官方出品,虽然不开源,但是相信其内部做了很多跟EMR集成的东西,因此我们通常会默认选择使用;另一方面,它有两个比较大的缺陷。

目前只能对部分符合条件的语法代码生效,比如只能是写Parquet文件,具体可以参考官方文档
它只是在FileOutputCommitter V2的基础上进行了改进,即将V2中的Rename机制替换为了S3 Multipart Upload机制,因此V2存在的数据一致性问题它也存在。

S3 Multipart Upload机制,原本是S3用于支持大文件上传的方法。其将一次文件上传分解为三个动作:

第一步,初始化,向S3申请用于本次上传的Upload ID;
第二步,将大文件分解为多个Part进行上传,此过程中文件对外是不可见的;
第三步,全部Part上传完成后,向S3发送完成信号,S3内部会将多个Part的文件进行合并,之后文件对外可见;又或者发送取消信号,S3会将已上传的文件删除。

基于S3 Multipart Upload机制的Committer便是充分利用这个特性来保证数据的一致性。每个Task都利用Multipart Upload来上传文件,但是有两点不同:第一,只做前两步,即初始化和多Part上传;第二,通常只有1个Part文件。所有Task都成功后,由Driver在Job中统一做第三步,即发送完成信号,之后所有文件对外可见。 

对于第一个缺陷,我们在使用中经常战战兢兢,需要确认是否有效触发了这个Committer。以下面代码为例,我们在Spark Streaming中,每10分钟一个Batch,对数据进行加工处理后写入到S3中。该代码是否触发到了这个Committer呢?通过INFO Log分析来看,是有触发到的。下图上半部分是一个Executor的Log,下半部分是Driver的Log,读者可以参考这个来判断是否有效触发。

df.write.mode("append")\
        .partitionBy("ts_interval", "schema_version") \
        .option("path", s3_path)\
        .saveAsTable(table)
 


3. Job失败带来的数据不一致问题
这里重点探讨一下第二个缺陷,即Job失败带来的数据不一致问题,也是FileOutputCommitter V2存在的问题。如下图,假设一个Job有12个Task,执行过程中Task 0~2成功了,而其他失败了,进而导致Job失败了。此时,在S3上就可以看到前面三个Task写入的文件,当这个Job重做一次时,已经写入的文件的数据就会成为重复的数据。

 


针对这个问题,有哪些解法呢?目前,就我们接触到的而言,主要有三种方法,可以分别应用在不同的业务场景。

第一种,每次写入的目录都带有一个UUID,在整体文件写入成功后,将这个目录分发出去,给下游的Reader使用。比如下面的代码中,seq就扮演着这样的角色。但是,这样做还是会有数据残留在S3中的,只是暂时不会被下游Reader读到而已。

path = "s3://{bucket}/type={type}/ts_interval={ts_interval}/seq={uuid}" \
                    .format(bucket=args["bucket"],
                            type=log_type,
                            ts_interval=ts_interval,
                            uuid=uuid.uuid1())
df.write.parquet(path)

第二种,采用overwrite写入的方式,该方式需要有一个UUID来标识某一批数据,保证该批数据在多次写入时UUID是不变的,而不同批次的数据的UUID是不同的。比如,在Spark Streaming中,每个Batch的数据,就可以使用Batch Timestamp来作为这个UUID。在下面的代码中,就可以达到这个效果,然而遗憾的是,AWS官方文档明确提出了目前这种写法无法触发到EMRFS S3-optimized Committer。

df.write.mode("overwrite")\
        .partitionBy("ts_interval", "schema_version", "ts_batch") \
        .option("path", s3_path)\
        .option("partitionOverwriteMode", "dynamic")
        .saveAsTable(table)

第三种,保持写文件的方式不变,但是在Job失败后,捕获其异常,然后进行一次补偿,即删除掉多余的文件。我们知道,每次commitJob成功后,都会写入一个_SUCCESS文件来标识整体写入成功。如果在这个文件的Last Modified Time之后又有一些新的文件残留在S3上,我们就认为其是脏数据,将其删除。当然,随着数据量的积累,我们不可能检测所有的数据,不过对于数据实时上传的业务而言,只要检测最近一段时间内的数据文件就好了。

df.write.mode("append")\
        .partitionBy("ts_interval", "schema_version") \
        .option("path", s3_path)\
        .saveAsTable(table)
 

4. 残留数据问题
除了上述的问题之外,采用S3 Multipart Upload机制实现的Committer还会存在一些共性的数据残留问题,需要在实践中有所注意。残留的数据主要来自两方面:

每个Task的数据会先写入到本地磁盘,比如上面的“/mnt/s3/emrfs-4425809305170904769/0000000000”,如果Task中断,有可能会有数据残留在本地
S3 Multipart Upload会先将上传的多个Part的文件放在S3的cache隐藏目录,如果Task中断,有可能会有数据残留在S3
对于第一方面,通过脚本监控相应的本地目录的磁盘大小并定期清理掉历史悠久的数据即可,避免磁盘被用爆了。

对于第二方面,残留在S3上的数据虽然对外不可见,但是会被收取存储费用的,因此需要进行相关清理,目前有两种方式:

设置Spark参数fs.s3.multipart.clean.enabled,该方式会启动一个异步进程来定期清理,会有一定的负载压力
在S3中配置相关Policy Lifecyle的属性即可,我们更倾向于这种方式,由S3来负责解决,没有额外开销
<LifecycleConfiguration>
    <Rule>
        <ID>sample-rule</ID>
        <Prefix></Prefix>
        <Status>Enabled</Status>
        <AbortIncompleteMultipartUpload>
          <DaysAfterInitiation>7</DaysAfterInitiation>
        </AbortIncompleteMultipartUpload>
    </Rule>
</LifecycleConfiguration>
 

以上便是当前我们在AWS EMRFS S3-optimized Committer,虽然存在诸多问题,我们还是尽量优先选择使用,毕竟是官方出品的。

结束语


  以上便是Spark写文件的机制,更多细节可以参考阅读源码。至于文章开头提到的失败情况,因为笔者公司的Spark相关业务都部署在AWS EMR上,目前计划通过升级AWS EMR版本来解决。EMR在其5.20.0之后的版本中,默认采用EMRFS S3-optimized Committer,其采用了上述的S3 Multipart Upload机制,当然目前仅支持从DataFrame、SQL中写Parquet文件,不过应该能满足需求了。

参考文献

Spark任务输出文件过程详解_疯狂哈丘的博客-CSDN博客
[1] Hadoop FileOutputCommitter Source Code
[2] Committing work to S3 with the “S3A Committers”
[3] Introducing the S3A Committers
[4] Introduction to S3Guard
[5] Spark 2.0.0 Cluster Takes a Longer Time to Append Data

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/821566.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

集群部署dolphinscheduler踩坑

本文主要总结一下最新版dolphinscheduler3.1.5的安装过程中遇到的坑。 dolphinscheduler启动报错 Exception in thread "Master-Server" org.springframework.beans.factory.BeanCreationException: Error creating bean with name masterServer: Invocation of in…

先进先出法与加权平均法的比较

加权平均法 加权平均的成本核算方法在计算销货成本和期末库存价值时使用每个库存物料的平均成本。企业将使用以下公式计算每个库存单位&#xff08;在特定会计期间内&#xff09;的平均成本&#xff1a; 平均库存成本 &#xff08;所有采购商品的总成本&#xff09;/&#xff…

Matlab Optimization Toolbox中的遗传算法工具包(GA)

matlab optimization 中使用了GA求解器 默认的是小于等于 找到GA 工具包 找到 APP选择 Optimization Tool 选择Solver ga - Genetic Algorithm 应用GA solver 定义适应度函数(Fitness function)与问题约束(Constraints) example one 优化函数 sin(x) 2 * cos(x)极其重要的…

【原创】IPTVC2实现方案(文末有demo)

前言: 名词解释: IPTVC2, 全称: 央视国际节目定价发布接口规范,标准版本当前最新为2.7.12 附赠资源链接&#xff0c;侵删:规范 规范中提供的样例&#xff0c;实现基于axis1.4(2006的时代宠物) 基于axis1版本的实现参考: Spring boot 集成Axis1.4 &#xff0c;使用wsdd文件发…

C语言每日一题:12《数据结构》相交链表。

题目&#xff1a; 题目链接 思路一&#xff1a; 1.如果最后一个节点相同说明一定有交点。 2.使用两个循环获取一下长度&#xff0c;同时可以获取到尾节点。 3。注意初始化lenA和lenB为1&#xff0c;判断下一个节点是空是可以保留尾节点的。长度会少一个&#xff0c;尾节点没有…

【C++修炼之路】多态

&#x1f451;作者主页&#xff1a;安 度 因 &#x1f3e0;学习社区&#xff1a;StackFrame &#x1f4d6;专栏链接&#xff1a;C修炼之路 文章目录 一、概念二、定义和实现1、虚函数2、虚函数的重写3、多态的构成条件4、重写的例外5、C11 override 和 final6、不能被继承的类7…

RxJava异步编程初探

RxJava 其实就是提供一套异步编程的 API&#xff0c;这套 API 是基于观察者模式的&#xff0c;而且是链式调用的&#xff0c;所以使用 RxJava 编写的代码的逻辑会非常简洁。 RxJava 有以下三个基本的元素&#xff1a; 被观察者&#xff08;Observable&#xff09;观察者&…

prometheus+grafana进行服务器资源监控

在性能测试中&#xff0c;服务器资源是值得关注一项内容&#xff0c;目前&#xff0c;市面上已经有很多的服务器资 源监控方法和各种不同的监控工具&#xff0c;方便在各个项目中使用。 但是&#xff0c;在性能测试中&#xff0c;究竟哪些指标值得被关注呢&#xff1f; 监控有…

SqlSugar、Freesql、Dos.ORM、EF、四种ORM框架的对比

SqlSugar、Freesql、Dos.ORM、EF、四种ORM框架的对比 一、默认情况下,导航属性是延迟查询; 答:ORM(Object-relational mapping)即对象关系映射,是一种为了解决面向对象与关系数据库存在的互不匹配的现象的技术。也就是说,ORM是通过使用描述对象和数据库之间映射的元数据…

线程状态

从卖包子的案例学习进程间的通信 public class Test {public static void main(String[] args) {Object objnew Object();Thread th1new Thread(){Overridepublic void run() {synchronized (obj){System.out.println("来三个包子&#xff01;");try {obj.wait(); /…

IDEA删除本地git仓库、创建本地git仓库、关联其他仓库并上传

IDEA删除本地git仓库、创建本地git仓库、关联其他仓库并上传 删除本地Git仓库 创建本地Git仓库 关联其他仓库并上传 要在IntelliJ IDEA中删除本地Git仓库并创建新的本地Git仓库&#xff0c;以及关联其他仓库并上传&#xff0c;请按照以下步骤进行操作&#xff1a; 删除本地G…

【笔记】数据结构与算法 python-03-列表查找

列表查找 在一个数据结构中&#xff0c;通过一定的方法找出与给定关键字相同的数据元素的过程。 列表查找&#xff08;线性表查找&#xff09;&#xff1a;从列表&#xff08;一种线性数据结构&#xff0c;元素按照一定的顺序存储&#xff0c;每个元素都有一个唯一的位置索引…

网络出口技术中的单一出口网络结构,你会用吗?

我们在设计一个园区网络的时候&#xff0c;园区网络的出口需要和运营商的网络进行对接&#xff0c;从而提供internet服务。 在和运营商网络对接的时候&#xff0c;一般采用如下3终方式&#xff1a; 单一出口网络结构 1、网络拓扑 终端用户接入到交换机&#xff0c;交换机直…

干货 ,ChatGPT 4.0插件Review Reader,秒杀一切选品神器

Hi! 大家好&#xff0c;我是专注于AI项目实战的赤辰&#xff0c;今天继续跟大家介绍另外一款GPT4.0插件Review Reader&#xff08;评论阅读器&#xff09;。 做电商领域的小伙伴们&#xff0c;都知道选品分析至关重要&#xff0c;可以说选品决定成败&#xff0c;它直接关系到产…

MySQL用通配符过滤数据

简单的不使用通配符过滤数据的方式使用的值都是已知的&#xff0c;但是当搜索产品名中包含ashui的所有产品时&#xff0c;用简单的比较操作符肯定不行&#xff0c;必须使用通配符。利用通配符可以创建比较特定数据的搜索模式。 通配符&#xff1a;用来匹配值的一部分的特殊字符…

【数据分享】1999—2021年地级市市政公用事业和邮政、电信业发展情况相关指标(Excel/Shp格式)

在之前的文章中&#xff0c;我们分享过基于2000-2022年《中国城市统计年鉴》整理的1999-2021年地级市的人口相关数据、各类用地面积数据、污染物排放和环境治理相关数据、房地产投资情况和商品房销售面积、社会消费品零售总额和年末金融机构存贷款余额、地方一般公共预算收支状…

减轻 PWM 的滤波要求

经典脉宽调制器 (PWM) 发出 H 个连续逻辑高电平&#xff08;1&#xff09;&#xff0c;后跟 L 个连续逻辑低电平&#xff08;0&#xff09;的重复序列。每个高电平和低电平持续一个时钟周期 T 1/F (Hz)。结果的占空比可定义为 H/N&#xff0c;其中 N HL 时钟周期。N 通常是 2…

keil固件库的安装 库函数的配置

文章目录&#xff1a; 第一步&#xff1a;下载固件库文件 第二步&#xff1a;创建一个新的文件夹&#xff0c;用来保存固件库文件。在该文件夹下新建文件夹&#xff1a;CMSIS、Lib、Startup、User 第三步&#xff1a;把库文件中文件放入我们建立对应的文件中 1.CMSIS模块 …

【MIPI协议 C-PHY详解】

MIPI协议 C-PHY详解 前言一、C-PHY介绍1.1 C-PHY 与 D-PHY wire的区别1.2 3 wire的状态&#xff08;states&#xff09;变化1.3 C-PHY Data Mapping Between 7 Symbols and a 16-Bit Data1.3 C-PHY Lane States and Line Levels ~ LP Mode 二、C-PHY LP Package Format2.1 C-PH…

SQLI_LABS攻击

目录 Less1 首先来爆字段 联合注入 判断注入点 爆数据库名 爆破表名 information_schema information_schmea.tables group_concat() 爆破列名 information_schema.columns 爆值 SQLMAP Less-2 -4 Less -5 布尔 数据库 表名 字段名 爆破值 SQLMAP Less-6 …