spark-sql写入对象存储路径不存在问题(异常路径自动消失)

news2024/11/25 13:26:49

一、问题分析 

1, 环境
        spark3.2  hadoop3.2.2

2, 问题现象
        insert overwrite table到hive表时,出现路径不存在的报错,导致任务失败。

        当表的路径在hdfs上时,没有问题。 表的路径在对象存储上时会有问题。

        insert overwrite table带上分区路径也是没问题的。    

        具体报错:

org.apache.spark.sql.AnalysisException: Path does not exist: s3a://xxxx/hive/warehouse/tablename
                 at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:978)
                 at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:780)
                 at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:777)


3,问题原因分析
  分析全量任务日志后发现,spark am失败了两次。

第一次失败是因为task中数据转换异常导致,属于业务异常。 但是,yarn的容错机制将spark am第二次启动起来的时候,报了 Path does not exist的异常。

        在spark inseroverwrite的执行逻辑中,是先将表的路径删除,然后再写入。并且删除路径的动作,是在driver侧完成的。 表路径的再次创建是在task执行的时候做的。 所以该问题就是为什么task没有再次创建对象存储的表路径。首先梳理下task的写入逻辑(默认使用fileoutcommitter的1.0版本的算法),task写入数据前,会建临时目录,将数据写入到临时目录。

        当一个task成功执行完毕的时候,通过commiter的commit task逻辑,将数据从临时目录转移到中间目录。  当所有的task都成功完成后,driver会调用commiter的commit job逻辑,将中间目录的数据转移到最终路径,也就是表路径下。需要注意的是,上述流程中涉及到的目录或者路径,如果不存在的话,会自动创建。

        对于表路径在hdfs上的表,临时目录,中间目录,都在表路径之下。所以就算task有业务异常,临时目录在异常发生前已经创建了,表的路径同时也被创建了。 当am第二次执行的时候,是不会报 Path does not exist错误的。

        对于表路径在对象存储的表,临时目录为本地目录,类似于/xx/xxx这种的。所以task有业务异常,没有创建表路径。 为什么临时目录为本地目录,因为该任务使用s3a协议去commit task和commit job。

        所以问题就在于s3a协议上。

4,问题解决方案
        不使用s3a协议去执行insert overwrite table。

set spark.sql.sources.commitProtocolClass=org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol;
set spark.sql.sources.outputCommitterClass=;


        注意:不实用s3a协议的话,会严重影响commit性能,因为对象存储的mv或rename,都是先copy再delete。

3.parquet文件写入优化

set spark.sql.parquet.fs.optimized.committer.optimization-enabled=true;

FileOutputCommitter的实际commitTask细节和参数 mapreduce.fileoutputcommitter.algorithm.version 有关(默认值是1)。

mapreduce.fileoutputcommitter.algorithm.version=1时:

commit的操作是将 ${output.dir}/_temporary/${appAttemptId}/_temporary/${taskAttemptId} 重命名为 ${output.dir}/_temporary/${appAttemptId}/${taskId}

mapreduce.fileoutputcommitter.algorithm.version=2时:

commit的操作是将 ${output.dir}/_temporary/${appAttemptId}/_temporary/${taskAttemptId} 下的文件移动到 ${output.dir} 目录下 (也就是最终的输出目录)

spark任务可以通过设置spark配置 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2来开启版本2的commit逻辑
在hadoop 2.7.0之前,FileOutputCommitter的实现没有区分版本,统一都是使用version=1的commit逻辑。因此如果spark的hadoop依赖包版本如果低于2.7.0,设置mapreduce.fileoutputcommitter.algorithm.version=2是没有用的

二、commit 原理详细介绍

Spark 输出数据到 HDFS 时,需要解决如下问题:

  • 由于多个 Task 同时写数据到 HDFS,如何保证要么所有 Task 写的所有文件要么同时对外可见,要么同时对外不可见,即保证数据一致性
  • 同一 Task 可能因为 Speculation 而存在两个完全相同的 Task 实例写相同的数据到 HDFS中,如何保证只有一个 commit 成功
  • 对于大 Job(如具有几万甚至几十万 Task),如何高效管理所有文件

commit 原理

本文通过 Local mode 执行如下 Spark 程序详解 commit 原理

sparkContext.textFile("/json/input.zstd")
  .map(_.split(","))
  .saveAsTextFile("/jason/test/tmp")

在详述 commit 原理前,需要说明几个述语

  • Task,即某个 Application 的某个 Job 内的某个 Stage 的一个 Task
  • TaskAttempt,Task 每次执行都视为一个 TaskAttempt。对于同一个 Task,可能同时存在多个 TaskAttemp
  • Application Attempt,即 Application 的一次执行

在本文中,会使用如下缩写

  • ${output.dir.root} 即输出目录根路径
  • ${appAttempt} 即 Application Attempt ID,为整型,从 0 开始
  • ${taskAttemp} 即 Task Attetmp ID,为整型,从 0 开始

检查 Job 输出目录

在启动 Job 之前,Driver 首先通过 FileOutputFormat 的 checkOutputSpecs 方法检查输出目录是否已经存在。若已存在,则直接抛出 FileAlreadyExistsException

Driver执行setupJob

Job 开始前,由 Driver(本例使用 local mode,因此由 main 线程执行)调用 FileOuputCommitter.setupJob 创建 Application Attempt 目录,即 output.dir.root/temporary/xxxx/{appAttempt}

Task执行setupTask

由各 Task 执行 FileOutputCommitter.setupTask 方法(本例使用 local mode,因此由 task 线程执行)。该方法不做任何事情,因为 Task 临时目录由 Task 按需创建。

按需创建 Task 目录

本例中,Task 写数据需要通过 TextOutputFormat 的 getRecordWriter 方法创建 LineRecordWriter。而创建前需要通过 FileOutputFormat.getTaskOutputPath设置 Task 输出路径,即 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName}。该 Task Attempt 所有数据均写在该目录下的文件内

检查是否需要 commit

Task 执行数据写完后,通过 FileOutputCommitter.needsTaskCommit 方法检查是否需要 commit 它写在 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 下的数据。

检查依据是 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 目录是否存在

如果需要 commit,并且开启了 Output commit coordination,还需要通过 RPC 由 Driver 侧的 OutputCommitCoordinator 判断该 Task Attempt 是否可以 commit

之所以需要由 Driver 侧的 CommitCoordinator 判断是否可以 commit,是因为可能由于 speculation 或者其它原因(如之前的 TaskAttemp 未被 Kill 成功)存在同一 Task 的多个 Attemp 同时写数据且都申请 commit 的情况。

CommitCoordinator

当申请 commitTask 的 TaskAttempt 为失败的 Attempt,则直接拒绝

若该 TaskAttempt 成功,并且 CommitCoordinator 未允许过该 Task 的其它 Attempt 的 commit 请求,则允许该 TaskAttempt 的 commit 请求

若 CommitCoordinator 之前已允许过该 TaskAttempt 的 commit 请求,则继续同意该 TaskAttempt 的 commit 请求,即 CommitCoordinator 对该申请的处理是幂等的。

若该 TaskAttempt 成功,且 CommitCoordinator 之前已允许该 Task 的其它 Attempt 的 commit 请求,则直接拒绝当前 TaskAttempt 的 commit 请求

OutputCommitCoordinator 为了实现上述功能,为每个 ActiveStage 维护一个如下 StageState

private case class StageState(numPartitions: Int) {
  val authorizedCommitters = Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER)
  val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]()
  }

该数据结构中,保存了每个 Task 被允许 commit 的 TaskAttempt。默认值均为 NO_AUTHORIZED_COMMITTER

同时,保存了每个 Task 的所有失败的 Attempt

commitTask

当 TaskAttempt 被允许 commit 后,Task (本例由于使用 local model,因此由 task 线程执行)会通过如下方式 commitTask。

当 mapreduce.fileoutputcommitter.algorithm.version 的值为 1 (默认值)时,Task 将 taskAttemptPath 即 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 重命令为 committedTaskPath 即 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}

若 mapreduce.fileoutputcommitter.algorithm.version 的值为 2,直接将taskAttemptPath 即 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 内的所有文件移动到 outputPath 即 ${output.dir.root}/

commitJob

当所有 Task 都执行成功后,由 Driver (本例由于使用 local model,故由 main 线程执行)执行 FileOutputCommitter.commitJob

若 mapreduce.fileoutputcommitter.algorithm.version 的值为 1,则由 Driver 单线程遍历所有 committedTaskPath 即 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt},并将其下所有文件移动到 finalOutput 即 ${output.dir.root} 下

若 mapreduce.fileoutputcommitter.algorithm.version 的值为 2,则无须移动任何文件。因为所有 Task 的输出文件已在 commitTask 内被移动到 finalOutput 即 ${output.dir.root} 内

所有 commit 过的 Task 输出文件移动到 finalOutput 即 ${output.dir.root} 后,Driver 通过 cleanupJob 删除 ${output.dir.root}/_temporary/ 下所有内容

recoverTask

上文所述的 commitTask 与 commitJob 机制,保证了一次 Application Attemp 中不同 Task 的不同 Attemp 在 commit 时的数据一致性

而当整个 Application retry 时,在之前的 Application Attemp 中已经成功 commit 的 Task 无须重新执行,其数据可直接恢复

恢复 Task 时,先获取上一次的 Application Attempt,以及对应的 committedTaskPath,即 ${output.dir.root}/_temporary/${preAppAttempt}/${taskAttempt}

若 mapreduce.fileoutputcommitter.algorithm.version 的值为 1,并且 preCommittedTaskPath 存在(说明在之前的 Application Attempt 中该 Task 已被 commit 过),则直接将 preCommittedTaskPath 重命名为 committedTaskPath

若 mapreduce.fileoutputcommitter.algorithm.version 的值为 2,无须恢复任何数据,因为在之前 Application Attempt 中 commit 过的 Task 的数据已经在 commitTask 中被移动到 ${output.dir.root} 中

abortTask

中止 Task 时,由 Task 调用 FileOutputCommitter.abortTask 方法删除 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}

abortJob

中止 Job 由 Driver 调用 FileOutputCommitter.abortJob 方法完成。该方法通过 FileOutputCommitter.cleanupJob 方法删除 ${output.dir.root}/_temporary

总结

V1 vs. V2 committer 过程

V1 committer(即 mapreduce.fileoutputcommitter.algorithm.version 的值为 1),commit 过程如下

  • Task 线程将 TaskAttempt 数据写入 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
  • commitTask 由 Task 线程将 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 移动到 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}
  • commitJob 由 Driver 单线程依次将所有 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt} 移动到 ${output.dir.root},然后创建 _SUCCESS 标记文件
  • recoverTask 由 Task 线程将 ${output.dir.root}/_temporary/${preAppAttempt}/${preTaskAttempt} 移动到 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}

V2 committer(即 mapreduce.fileoutputcommitter.algorithm.version 的值为 2),commit 过程如下

  • Task 线程将 TaskAttempt 数据写入 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
  • commitTask 由 Task 线程将 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 移动到 ${output.dir.root}
  • commitJob 创建 _SUCCESS 标记文件
  • recoverTask 无需任何操作

V1 vs. V2 committer 性能对比

V1 在 Job 执行结束后,在 Driver 端通过 commitJob 方法,单线程串行将所有 Task 的输出文件移动到输出根目录。移动以文件为单位,当 Task 个数较多(大 Job,或者小文件引起的大量小 Task),Name Node RPC 较慢时,该过程耗时较久。在实践中,可能因此发生所有 Task 均执行结束,但 Job 不结束的问题。甚至 commitJob 耗时比 所有 Task 执行时间还要长

而 V2 在 Task 结束后,由 Task 在 commitTask 方法内,将自己的数据文件移动到输出根目录。一方面,Task 结束时即移动文件,不需等待 Job 结束才移动文件,即文件移动更早发起,也更早结束。另一方面,不同 Task 间并行移动文件,极大缩短了整个 Job 内所有 Task 的文件移动耗时

V1 vs. V2 committer 一致性对比

V1 只有 Job 结束,才会将数据文件移动到输出根目录,才会对外可见。在此之前,所有文件均在 ${output.dir.root}/_temporary/${appAttempt} 及其子文件内,对外不可见。

当 commitJob 过程耗时较短时,其失败的可能性较小,可认为 V1 的 commit 过程是两阶段提交,要么所有 Task 都 commit 成功,要么都失败。

而由于上文提到的问题, commitJob 过程可能耗时较久,如果在此过程中,Driver 失败,则可能发生部分 Task 数据被移动到 ${output.dir.root} 对外可见,部分 Task 的数据未及时移动,对外不可见的问题。此时发生了数据不一致性的问题

V2 当 Task 结束时,立即将数据移动到 ${output.dir.root},立即对外可见。如果 Application 执行过程中失败了,已 commit 的 Task 数据仍然对外可见,而失败的 Task 数据或未被 commit 的 Task 数据对外不可见。也即 V2 更易发生数据一致性问题

三、V1和V2 commiter版本比较

mapreduce.fileoutputcommitter.algorithm.version 参数对文件输出有很大的影响,下面总结一下两种版本在各方面的优缺点。

1、性能方面

v1在task结束后只是将输出文件拷到临时目录,然后在job结束后才由Driver把这些文件再拷到输出目录。如果文件数量很多,Driver就需要不断的和NameNode做交互,而且这个过程是单线程的,因此势必会增加耗时。如果我们碰到有spark任务所有task结束了但是任务还没结束,很可能就是Driver还在不断的拷文件。

v2在task结束后立马将输出文件拷贝到输出目录,后面Job结束后Driver就不用再去拷贝了。

因此,在性能方面,v2完胜v1。

2、数据一致性方面

v1在Job结束后才批量拷文件,其实就是两阶段提交,它可以保证数据要么全部展示给用户,要么都没展示(当然,在拷贝过程中也无法保证完全的数据一致性,但是这个时间一般来说不会太长)。如果任务失败,也可以直接删了_temporary目录,可以较好的保证数据一致性。

v2在task结束后就拷文件,就会造成spark任务还未完成就让用户看到一部分输出,这样就完全没办法保证数据一致性了。另外,如果任务在输出过程中失败,就会有一部分数据成功输出,一部分没输出的情况。

因此在数据一致性方面,v1完胜v2

3、总结

很明显,如果我们执着于性能,不在乎数据输出时的一致性,完全可以将mapreduce.fileoutputcommitter.algorithm.version设置为2来提高性能。

但是如果我们对输出要求很高的数据一致性,那么最好不要为了性能将mapreduce.fileoutputcommitter.algorithm.version设置为2。


 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/559611.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python如何连接mysql数据库

python链接mysql数据库要用到pymysql模块中的connect ,connect函数是pymysql模块中 用于连接MySQL数据库的一个函数。 所以连接mysql之前需要先导入pymysql模块。 第一步,mysql模块下载 方法1(使用pip命令安装): 因…

测试岗位是巨坑,7年测试告诉你千万别入行.....

每次都有人问我软件测试的前景是什么样的,每年也会有人很多人纷纷涌入测试的岗位上,希望自己能够进入阿里、华为等大厂 但是测试岗位真的那么吃香吗?今天我结合从零基础小白到测试开发的成长经历,来说下这个行业的发展前景&#…

微信小程序仿网易音乐播放器项目

一、项目展示 主页: 播放页: 搜索页: 排行榜页: 小控件: 二、项目结构 三、项目功能点 后端接口,使用node写的,使用了网易云音乐API: 封装的api文件 //env是基础地址js文件 im…

05 JDBC基础

什么是持久化 将内存中的数据永久保存在磁盘中,方便以后使用 JDBC是什么 java数据库连接 用于执行sql语句的java API java官方提供接口,各大厂商提供实现类,程序员使用实现类的jar包 JDBC的开发流程 添加包: mysql-connector-java-5.1.48.jar lombok.jar 口诀:贾连欲…

Linux文件内管理命令

目录 Linux文件内管理命令 创建文件 目录 普通文件 链接文件 删除文件 删除文件 删除目录 查看文件 目录 普通文件 编辑普通文件 在命令行进行文本内容处理 查找内容 复制文件 移动文件 命令详解 mkdir 作用 语法格式 touch 作用 语法格式 选项 ​编辑…

STM32——SDIO的学习(驱动SD卡)(理论篇)

目录 一、SD卡简介 1.1历史 1.2 tf卡和SD卡的区别 1.3 mmc,emmc,nand,flash的关系 1.4 SD卡的规格等级 1.4.1按容量分 1.4.2 class等级 1.4.3 UHS总线模式 1.4.4 UHS速度等级 1.4.5 VSC视频速度等级 二、SD卡的内部结构 三、SDIO…

MapReduce【Shuffle-Combiner】

概述 Conbiner在MapReduce的Shuffle阶段起作用,它负责局部数据的聚合,我们可以看到,对于大数据量,如果没有Combiner,将会在磁盘上写入多个文件等待ReduceTask来拉取,但是如果有Combiner组件,我们…

什么是Reactive服务架构

介绍: 在java web开发领域,区别于传统的的同步服务架构(底层实现基于同步阻塞IO模型),异步服务这个“新词”(bushi)在不断被提及和重视,不少公司的研发部门也开始在尝试对自己的业务…

【JMM】并发编程Bug的源头——可见性/有序性/原子性问题

本文目录( ̄∇ ̄)/ 可见性问题 有序性问题 为什么会进行指令重排序/乱序执行? 乱序存在的条件 this对象的溢出 原子性问题 如何保证原子性? synchronized 原理简介 加锁的方式 那么问题来了,JVM是如何知道当前…

2022年中国标准创新贡献奖获奖名单公示,海尔再添两项标准创新奖

01 2022年中国标准创新贡献奖 获奖名单公示 海尔再添两项标准创新奖 近日,2022年中国标准创新贡献奖获奖名单公示。其中,海尔GB/T 28219—2018《智能家用电器通用技术要求》、T/CAS 311.1—2018《电器电子产品绿色供应链管理第1部分:通则》…

前端实现拖拽效果改变元素顺序

文章目录 前言一、实现效果二、拖拽API1.代码2.遇见问题 总结 前言 在一次工作中,前端要实现通过鼠标实现拖拽改变顺序的功能,之前没有接触过拖拽这一块所以刚开始一筹莫展,幸运的是在查阅学习中实现了前端拖拽功能。 一、实现效果 二、拖拽…

月薪从10k到30k,一个普通测试工程师的3年涨薪之路...

“要涨薪,先跳槽”各个行业都存在这一共识,但是任何行业也都没有像程序员这样更为适用且好用的了。 前不久,就有网友分享了自己作为一个普通的自动化测试工程师的三年真实涨薪经历。但看看这个三年涨薪之路,好像并不普通啊&#…

2022年深圳杯数学建模D题复杂水平井三维轨道设计解题全过程文档及程序

2022年深圳杯数学建模 D题 复杂水平井三维轨道设计 原题再现: 在油气田开采过程中,井眼轨迹直接影响着整个钻井整体效率。对于复杂水平井,较差的井眼轨迹很可能会造成卡钻或施加钻压困难等重大事故的发生。因而,在施工之前分析影…

python爬虫-获取某某在线翻译的查询结果,爬取json文件并解析

文章目录 从基础步骤下手正确获取response数据关于url获取方式关于post方法的参数关于payload参数填入运行效果解析json数据到文件中完整代码运行结果 从基础步骤下手 # 指定url # 发出请求,get或post # 获取响应 # 把目标文件转存为字符串形式 # 持久性保存正确获…

从零开始的机械臂yolov5抓取gazebo仿真(六)

项目构造简述 前段时间博主装20.04系统不小心把efi启动给删了,导致18.04系统崩了,所以只能简单讲一下这个项目的设计思路以及以grasp.py代码为例进行简单解析。 yolov5_ros功能包 首先,说一下yolov5_ros功能包,该功能包的作用就…

使用 CameraX 在 Jetpack Compose 中构建相机 Android 应用程序

使用 CameraX 在 Jetpack Compose 中构建相机 Android 应用程序 CameraX 是一个 Jetpack 库,旨在帮助简化相机应用程序的开发。 [camerax官方文档] https://developer.android.com/training/camerax CameraX的几个用例: Image CaptureVideo CapturePrev…

【多线程】什么是线程死锁?形成条件是什么?如何避免?

文章目录 一、什么是线程死锁二、线程死锁三、形成死锁的四个必要条件是什么四、如何避免线程死锁 一、什么是线程死锁 死锁是指两个或两个以上的进程(线程)在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若…

Unity 天空盒

在 Unity 中,天空盒是使用天空盒着色器的一种材质。 创建天空盒材质 1.从菜单栏中,单击 Assets > Create > Material。 2.在 Shader 下拉选单中,单击 Skybox,然后单击要使用的天空盒着色器。 有Skybox/6 Sided、Skybox/…

人民大学与加拿大女王金融硕士项目——在现在憧憬美好的未来

未来是一个虚无缥缈的词汇,抓不住也看不到。未来里有着我们无限的希望,也有着美好的憧憬。未来究竟是怎样的呢,有人说现在的样子里藏着未来的模样。在职的你有没有为未来编织一副美丽的画卷呢?未来很远,远到只能靠想象…

MySQL小记——约束、多表查询

目录 约束 常见约束 主键约束 非空约束 唯一约束 自增长约束 非负约束 外键约束之一对多 外键约束之多对多 多表查询 内连接 外连接 左外连接 右外连接 子查询 自查询 case when语句 约束 在MySQL中,约束是对字段规则的一种限制。 常见约束 1.主…