Spark - 创建 _SUCCESS 文件与获取最新可用文件

news2025/2/22 13:22:38

目录

一.引言

二.增加 _SUCCESS 标识

1.SparkContext 生成

2.FileSystem 生成

3.Hadoop 生成

三.获取最新文件

1.获取 SparkContext

2.按照时间排序

3.遍历生成 Input

四.总结


一.引言

有任务需要每小时生成多个 split 文件分片,为了保证线上任务读取最新的 SUCCESS 文件,需要在文件生成后增加 _SUCCESS 标识供线上文件判断当前文件路径是否可用。

最终效果:

其中 split 为文件夹 (dir),_SUCCESS 为文件 (file)。

二.增加 _SUCCESS 标识

根据使用场景与文件类型与位置的不同,下面提供三种方案供大家使用。

1.SparkContext 生成

    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()
    val sc = spark.sparkContext   

    sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true")

直接设置 marksuccessfuljobs = true,即可在 saveAsTextFile 的对应路径生成 _SUCCESS 标识。

由于每个 split 都是调用 saveAsTextFile 生成的文件夹 (dir),所以每个 split 文件夹下都存在一个 _SUCCESS 标识当前 split 生成。

2.FileSystem 生成

上面的方法可以在 saveAsTextFile 时生成 _SUCCESS 标识,但是 split 的上级父目录无法标识 _SUCCESS,我们想要在 split 全部成功生成结束后,为父目录也增加一个 _SUCCESS 标识,此时需要 FileSystem 大显身手。

    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()
    val sc = spark.sparkContext       

  val fileSystem = new Path("/user/...").getFileSystem(sc.hadoopConfiguration)
  fileSystem.create(new Path(s"$output/_SUCCESS"))

通过 HDFS 的 "/user/xxx" 路径生成对应文件系统 FileSystem 类,随后调用 create 方法,其中 output 为对应父目录地址。

3.Hadoop 生成

如果不想在程序内生成 _SUCCESS 标识,或者希望获取到 Spark APP 正常退出后再生成子/父目录的标识,则可以直接使用 touchz 实现。

hadoop fs -touchz /user/xxx/tmp/dt=20230516/1130/_SUCCESS

直接调用 hadoop 生成空文件即可。判断程序是否正常运行可以调用下面的 Shell 代码:

if [ "$?" -ne 0 ];
    then echo "Application Failed";
    sh ./sendMail.sh '任务异常'
else
    echo "success"
    sh ./sendMail.sh '任务成功'
    hadoop fs -touchz /user/xxx/tmp/dt=20230516/1130/_SUCCESS
fi

将代码放到 Spark-Submit 脚本的后面,即可实现任务正常退出生成 _SUCCESS。"$?" 可以看作是检查任务是否 System.exit(0) 正常退出。

三.获取最新文件

1.获取 SparkContext

    val conf = new SparkConf().setAppName("GetInputPath")

    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()
    val sc = spark.sparkContext

    sc.setLogLevel("error")

    // 1.获取 FileSystem
    val baseDirPath = new Path("/user_ext/...")
    val fs = baseDirPath.getFileSystem(spark.sparkContext.hadoopConfiguration)

通过 HDFS 文件系统路径与 SC 获取 FileSystem。

2.按照时间排序

    // 2.按最新时间排序文件夹
    val satisfiedPath = fs.listStatus(baseDirPath).filter(dir => {
      dir.isDirectory && fs.listStatus(dir.getPath).length == 5
    }).sortWith { case (dir1, dir2) =>
      dir1.getModificationTime > dir2.getModificationTime
    }.iterator

dir.isDirectory 判断是否为文件,fs.listStatus 判断文件夹个数,因为生成了 1 个 _SUCCESS File + 5 个 split Dir,所以长度判断为 5,最后通过 getModificationTime 获取调整时间并排序。

3.遍历生成 Input

    // 3.遍历寻找合规文件夹
    var state = true
    var inputPath = ""

    while (state && satisfiedPath.hasNext) {
      // dir + file.getName 构成完整的 file path
      val path = new Path(baseDirPath + File.separator + satisfiedPath.next().getPath.getName)
      // 检查文件大小 1024-KB 1024x1024-MB 1024x1024x1024-GB
      val capacity = fs.getContentSummary(path).getLength / (1024 * 1024 * 1024.0)
      // 检查是否包含 SUCCESS
      val allFileName = fs.listStatus(path).map(_.getPath.getName)
      var isSuccess = false
      allFileName.foreach(fileName => {
        if (fileName.contains("_SUCCESS")) isSuccess = true
      })

      // 判断文件合规
      if (capacity >= 100 && isSuccess) {
        inputPath = path.toString
        state = false
      }
    }

getContentSummary.getLength 可以获取对应 Path 地址的 byte 大小,可以根据自己的场景进行单位转化,例如 K、M、G 等,而 listStatus.map(_.getPath.getName) 则是遍历我们的 File,判断是否有 _SUCCESS 标识。最后合规的输入路径需要容量达到指定要求且存在 _SUCCESS 才可以,否则继续 iterator 迭代,直到找到合规的文件路径。

四.总结

上述方法适用于在频繁生成的文件中添加 _SUCCESS 标识,并在对应读取的程序中获取最新的可用路径。除此之外,FileSystem 还有很多 API,除了 listStatus 方法外,大家也可以使用 fs.globStatus 方法获取全局的匹配路径文件,再依次处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/532712.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux实操篇---常用的基本命令5(进程管理类和crontab系统定时任务)

一、进程管理类 进程是正在执行的一个程序或命令,每一个进程都是一个运行的实体,都有自己的地址空间,并占用一定的系统资源。 守护进程和系统服务就是一一对应的关系。 有系统级别的进程和用户级别的进程。 进程管理:所有的进…

如何使用自定义知识库构建自定义ChatGPT机器人

目录 隐藏 使用自定义数据源为您的 ChatGPT 机器人提供数据 1. 通过Prompt提示工程提供数据 2. 使用 LlamaIndex(GPT 索引)扩展 ChatGPT 如何添加自定义数据源 先决条件 怎么运行的 最后的总结 使用自定义数据源为您的 ChatGPT 机器人提供数据…

rt-thread启动流程

资料下载 RT-Thread Simulator 例程 操作流程 将上面的仿真例程下载并解压,通过MDK打开,编译,调试,并打开串口点击运行,就可以看到如下输出了: 添加自己的 thread:在main()函数中添加即可&am…

[Java基础]基本概念(上)(标识符,关键字,基本数据类型)

hello 大家好,计算机语言各有不同,但本质上都是操作内存和计算。这章的内容是介绍Java中的基本概念展开,包括:标识符,关键字,Java基本数据类型,运算符,表达式和语句,分支…

前端架构师-week6-require源码解析

require 源码解析——彻底搞懂 npm 模块加载原理 require 的使用场景 加载模块类型 加载内置模块:require(fs)加载 node_modules 模块:require(ejs)加载本地模块:require(./utils)支持文件类型 加载 .js 文件加载 .mjs 文件加载 .json 文件…

AI女友同时和1000人谈恋爱,狂赚500万

AI女友,预计暴赚4亿 要说当下什么最火,AI首当其冲无可置疑。00后网络红人红卡琳玛乔丽(Caryn Marjorie)最近与Forever Voices公司合作,通过视频训练等方式打造出个人形象、声音和性格的AI虚拟女友,就像在和…

Redis高可用--持久化

在Web服务器中,高可用是指服务器可以正常访问的时间,衡量的标准实在多长时间内可以提供正常服务(99.9%、99.99%、99.999%等等)。 但是在Redis语境中,高可用的含义似乎要宽泛一些,除了保证提提供正常服务&a…

大疆无人机 MobileSDK(遥控器/手机端)开发 v4版<3>

导读 第三篇文章准备单独拿出来写,因为在大疆为人机的所有功能中,航线规划的功能最为复杂,也相当的繁琐,这里需要说仔细一点,可能会将代码进行多步分解。 航线规划 1)航线打点 点击 按钮进行打点,在地图中手动选择点位选择完成后点击**[完成]**按钮,即可完成航线打点…

新展预告 | YT U LOVE——许峰个展即将亮相!

深圳东方美术馆荣幸地宣布,将于5月20日呈现艺术家许峰在鹏城的首次个展“YT U LOVE”,展出艺术家从2020年至2023年创作的油画、纸本及雕塑40余件作品。此次展览以“YT U LOVE”为题,恰逢兔年,yutu在中国意指玉兔,前后两…

美创科技首家互联网医院数据安全建设案例实践

互联网医院作为医疗服务模式创新发展的新产物,在各项配套政策支持下快速发展。然而,蓬勃之势下,无数双“暗夜之手”也在蠢蠢欲动,试图从中渔利,关乎患者隐私、种类繁多的医疗数据迎来愈加严峻的安全挑战。 某市中心医院…

劳有所学|文献可视化分析工具CiteSpace、vosviewer使用指南

【基于Citespace和vosviewer文献计量学相关论文 】 专题一:文献计量学方法与应用 1 文献计量学方法基本介绍 2 与其他综述方法区别联系 3 各学科领域应用趋势近况 4 主流分析软件优缺点对比 5 经典高分10SCI思路复盘 6 软件安装与Java环境配置 专题二&#…

理解PMP的顺序

PMP,大量考的是“下一步”、“本应该”的顺序逻辑。在学习的时候,我们需要把整本书十个知识领域,穿起来形成一个线性的结构。 在整理的过程中,很多人都会认为,线性结构,应该是这样的: 每个过程…

安卓播放H264/H265实时流(安卓实时预览H264/H265 安卓实时预览AVC/HEVC)

实际项目中经常遇到两种场景,第一种从无人机拿H264/H265码流转GB28181等协议,转协议的同时可能还需要实时预览无人机画面; 第二种是安卓接USB外置摄像头, 由于USB2.0传输带宽有限,对于高分辨率图像, 带宽无法满足YUV图像的传输, 摄像头只好先…

数据的比较

前言 在学习Java过程中&#xff0c;数据的比较是必学的。 对于不同的数据有不同的比较方式。 目录 前言 一、算术比较器 二、equals() 三、Comparable接口 四、Comparator接口 结语 一、算术比较器 算数比较器有&#xff1a;、>、<、>、<、! 但是算数比较器…

win下C++部署深度学习模型之clion配置pytorch+opencv教程记录

win下clion配置pytorch和OpenCV 一、clion配置vs编译器以及测试二、clion配置pytorch2.1、下载libtorch2. 2、环境变量配置2.3、cmakelist.txt编写2.4、main函数测试运行 三、clion配置opencv3.1、源码下载3.2、编译3.3、环境变量配置3.4、cmakelist.txt编写3.5 main函数测试运…

揭 秘~月薪2-3万的程序员一天到底是怎么度过的?

程序员的高薪资&#xff0c;一直是大家热衷讨论的话题&#xff0c;几乎每隔一段时间就会在社交平台被网友们热议一番。 比如这条“月薪2万到3万的程序员的一天是怎么样度过的&#xff1f;”的帖子就一直排在知乎前列。 作为薪资可观的岗位&#xff0c;大家都非常好奇&#xff…

【Fiddler移动端抓包】~抓包不是偷窥,Fiddler教你看透移动应用背后的秘密~

目录 引言 抓包 什么是抓包 哪些场景下需要抓包 Fiddler Fiddler抓包原理 安装 Fiddler移动端抓包 第一步&#xff1a;允许远程计算机连接 第二步&#xff0c;设置手机网络代理 第三步&#xff0c;允许捕获HTTPS连接 第四步&#xff0c;手机安装证书 结语 引言 当…

探寻生机 | 数说故事助力微播易第七届风向大会,研判新风向,洞察新趋势

“过去一年&#xff0c;有的人用ChatGPT谁出具的北京烤鸭图片最准确搞怪&#xff0c;有的人却已经利用虚拟主播单场带货百万&#xff1b;有的人正在被AIGC淘汰&#xff0c;有的人却通过人机协作实现20秒制作100张创意图&#xff1b;有的百万粉丝接不到广告&#xff0c;有的仅靠…

使用python实现微博评论分词与关键词提取(从MySQL数据库中读取数据)

一、实验环境 &#xff08;1&#xff09;Windows 操作系统&#xff1b; &#xff08;2&#xff09;PyCharm 2019.1。 &#xff08;3&#xff09;数据库用户名为 root&#xff0c;密码为 123456. (4) 学校机房电脑&#xff0c;带有mysql 二、获取数据库信息 &#xff08;1&a…

【Linux从入门到精通】进程的基本概念

我们通过对上篇文章冯诺依曼体系结构对硬件进行讲解后&#xff0c; 本篇文章会对进程进行深入讲解。同时会讲解PCB&#xff08;进程控制块&#xff09;。希望本篇文章内容会对你有所帮助。 文章目录 一、再次理解操作系统 1、1 操作系统的作用 1、2 操作系统的管理 二、进程基本…