spark Structured Streaming checkpoint参数优化

news2024/12/28 21:22:03

目录

  • 1 问题描述
  • 2 分析 checkpointLocation 配置
    • 2.1 checkpointLocation 在源码调用链
    • 2.2 MetadataLog(元数据日志接口)
  • 3 分析 checkpointLocation 目录内容
    • 3.1 offsets 目录
    • 3.2 commitLog 目录
    • 3.3 metadata 目录
    • 3.4 sources 目录
    • 3.5 sinks 目录
  • 4 解决方案
    • 4.1 File 作为接收端
    • 4.2 Elasticsearch 作为接收端

内容可能持续性修改完善,最新专栏内容与 spark-docs 同步,源码与 spark-advanced 同步。

1 问题描述

Spark StructuredStreaming 实时任务 kafka -> elasticsearchkafka -> hdfs(parquet格式文件) 任务运行过程中每隔固定时间后某个出现耗时较长。  
本内容以kafka -> elasticsearch为例说明,生产环境版本号 Spark-2.4.0,下图为 SQL-UI Job 运行耗时情况:

问题定位 分析耗时较长任务出现时间,发现出现该问题间隔时间点固定,怀疑是spark某种机制导致,与任务逻辑无关性较大。

查看指定的 checkpointPath 目录发现,在 $checkpointPath/sinks/elasticsearch 下与SQL-UI Job 长时间耗时的时间点一致。初步判断控制生成大文件的方式或者策略即可解决问题。 

2 分析 checkpointLocation 配置

2.1 checkpointLocation 在源码调用链

分析源码查看 StructuredStreaming 启动流程发现,DataStreamWriter#start 方法启动一个 StreamingQuery。 同时将 checkpointLocation 配置参数传递给StreamingQuery管理。

StreamingQuery 接口实现关系如下: 

  • StreamingQueryWrapper 仅包装了一个不可序列化的StreamExecution
  • StreamExecution 管理Spark SQL查询的执行器
    • MicroBatchExecution 微批处理执行器
    • ContinuousExecution 连续处理(流式)执行器

因此我们仅需要分析 checkpointLocation 在 StreamExecution中调用即可。

StreamExecution 中 protected def checkpointFile(name: String): String 方法为所有与 checkpointLocation 有关逻辑,该方法返回 $checkpointFile/name 路径

2.2 MetadataLog(元数据日志接口)

spark 提供了org.apache.spark.sql.execution.streaming.MetadataLog接口用于统一处理元数据日志信息。
checkpointLocation 文件内容均使用 MetadataLog进行维护。

分析 MetadataLog 接口实现关系如下: 

各类作用说明

  • NullMetadataLog 空日志,即不输出日志直接丢弃
  • HDFSMetadataLog 使用 HDFS 作为元数据日志输出
    • CommitLog 提交日志
    • OffsetSeqLog 偏移量日志
    • CompactibleFileStreamLog 封装了支持按大小合并、删除历史记录的 MetadataLog
      • FileStreamSourceLog 文件类型作为数据源时日志记录
      • FileStreamSinkLog 文件类型作为数据接收端时日志记录
      • EsSinkMetadataLog Es作为数据接收端时日志记录  

分析 CompactibleFileStreamLog#compact 合并逻辑简单描述为:

假设有 0,1,2,3,4,5,6,7,8,9,10 个批次依次到达,合并大小为3
当前合并结果为   `0,1,2.compact,3,4`
下一次合并结果为 `0,1,2.compact,3,4,5.compact` , **说明:5.compact 文件内容 = 2.compact + 3 + 4**

last.compact 文件大小会随着批次运行无限增大
...

分析 CompactibleFileStreamLog 删除过期文件逻辑:

// CompactibleFileStreamLog#add 方法被调用时,默认会判断是否支持删除操作
  override def add(batchId: Long, logs: Array[T]): Boolean = {
    val batchAdded =
      if (isCompactionBatch(batchId, compactInterval)) { // 是否合并
        compact(batchId, logs)
      } else {
        super.add(batchId, logs)
      }
    if (batchAdded && isDeletingExpiredLog) { // 添加成功且支持删除过期文件
      // 删除时判断当前批次是否在 spark.sql.streaming.minBatchesToRetain 配置以外且在文件保留时间内
      // 配置项参考 第4节 解决方案配置说明
      deleteExpiredLog(batchId) 
    }
    batchAdded
  }

3 分析 checkpointLocation 目录内容

目前 checkpointLocation 内容主要包含以下几个目录(子小节中逐个介绍目录数据来源及功能性)

  • offsets
  • commits
  • metadata
  • sources
  • sinks

3.1 offsets 目录

记录每个批次中的偏移量。为了保证给定的批次始终包含相同的数据,在处理数据前将其写入此日志记录。 此日志中的第 N 条记录表示当前正在已处理,第 N-1 个条目指示哪些偏移已处理完成。

// StreamExecution 类中声明了 OffsetSeqLog 变量进行操作
val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))

// 该日志示例内容如下,文件路径=checkpointLocation/offsets/560504
v1
{"batchWatermarkMs":0,"batchTimestampMs":1574315160001,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"game_dc_real_normal":{"17":279843310,"8":318732102,"11":290676804,"2":292352132,"5":337789356,"14":277147358,"13":334833752,"4":319279439,"16":314038811,"7":361740056,"1":281418138,"10":276872234,"9":244398684,"3":334708621,"12":290208334,"15":267180971,"6":296588360,"0":350011707}}

3.2 commitLog 目录

记录已完成的批次,重启任务检查完成的批次与 offsets 批次记录比对,确定接下来运行的批次

// StreamExecution 类中声明了 CommitLog 变量进行操作
val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))

// 该日志示例内容如下,文件路径=checkpointLocation/commits/560504
v1
{"nextBatchWatermarkMs":0}

3.3 metadata 目录

metadata 与整个查询关联的元数据,目前仅保留当前job id

// StreamExecution 类中声明了 StreamMetadata 变量进行操作,策略如下:

/** Metadata associated with the whole query */
  protected val streamMetadata: StreamMetadata = {
    val metadataPath = new Path(checkpointFile("metadata"))
    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    StreamMetadata.read(metadataPath, hadoopConf).getOrElse {
      val newMetadata = new StreamMetadata(UUID.randomUUID.toString)
      StreamMetadata.write(newMetadata, metadataPath, hadoopConf)
      newMetadata
    }
  }
  
// 该日志示例内容如下,文件路径=checkpointLocation/metadata
{"id":"5314beeb-6026-485b-947a-cb088a9c9bac"}

3.4 sources 目录

sources 目录为数据源(Source)时各个批次读取详情

3.5 sinks 目录

sinks 目录为数据接收端(Sink)时批次的写出详情  
例如: es 作为 sink 时,内容如下

目前 Es 支持配置自定义写出目录,如果未配置写入 checkpointLocation/sinks/ 目录,参考SparkSqlStreamingConfigs

文件路径=checkpointLocation/sinks/elasticsearch/560504
v1
{"taskId":0,"execTimeMillis":1574302020143,"resource":"rs_real_{app}.{dt}","records":220}
{"taskId":1,"execTimeMillis":1574302020151,"resource":"rs_real_{app}.{dt}","records":221}
{"taskId":2,"execTimeMillis":1574302020154,"resource":"rs_real_{app}.{dt}","records":219}
{"taskId":3,"execTimeMillis":1574302020151,"resource":"rs_real_{app}.{dt}","records":221}
{"taskId":4,"execTimeMillis":1574302020154,"resource":"rs_real_{app}.{dt}","records":220} 


例如: 文件类型作为 sink,默认写出到各个 $path/_spark_metadata 目录下 ,参考 FileStreamSink

hdfs 写出时内容为,文件路径=$path/_spark_metadata/560504
v1
{"path":"hdfs://xx:8020/$path/1.c000.snappy.parquet","size":8937,"isDir":false,"modificationTime":1574321763584,"blockReplication":2,"blockSize":134217728,"action":"add"}
{"path":"hdfs://xx:8020/$path/2.c000.snappy.parquet","size":11786,"isDir":false,"modificationTime":1574321763596,"blockReplication":2,"blockSize":134217728,"action":"add"}

4 解决方案

根据实际业务情况合理调整日志输出参数(配置见4.1/4.2说明):

  • 关闭日志输出
  • 控制保留并可以恢复的最小批次数量且减小日志文件保留时间
  • 调整日志文件合并阈值

无论如何调整参数,.compact(合并的文件)大小会一直增长,目前关闭可以解决。调整其他阈值可减小任务出现耗时情况次数。 针对该问题已提交给官方 SPARK-29995尝试解决

4.1 File 作为数据源或者数据接收端

  • spark.sql.streaming.minBatchesToRetain (默认100) 保留并可以恢复的最小批次数量
  • spark.sql.streaming.commitProtocolClass 默认:org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol 合并实现类,其余支持实现参考FileCommitProtocol实现类

fileSource 数据源端:配置在 FileStreamSourceLog 引用

  • spark.sql.streaming.fileSource.log.deletion (默认true),删除过期日志文件
  • spark.sql.streaming.fileSource.log.compactInterval (默认10),日志文件合并阈值
  • spark.sql.streaming.fileSource.log.cleanupDelay (默认10m),日志文件保留时间

fileSink 接收端:配置在 FileStreamSinkLog 引用

  • spark.sql.streaming.fileSink.log.deletion (默认true),删除过期日志文件CompactibleFileStreamLog
  • spark.sql.streaming.fileSink.log.compactInterval (默认10),日志文件合并阈值
  • spark.sql.streaming.fileSink.log.cleanupDelay (默认10m),日志文件保留时间

4.2 Elasticsearch 作为接收端

elasticsearch-spark 官方文档,es 官方重写变量命名及赋值方式,参考EsSinkMetadataLog

  • es.spark.sql.streaming.sink.log.enabled(默认true) 启用或禁用流作业的提交日志。默认情况下,该日志处于启用状态,并且具有相同批次ID的输出批次将被跳过,以避免重复写入。设置false为时,将禁用提交日志,并且所有输出都将发送到Elasticsearch,无论它们是否在先前的执行中已发送。
  • es.spark.sql.streaming.sink.log.path 设置存储此流查询的日志数据的位置。如果未设置此值,那么Elasticsearch接收器会将其提交日志存储在中给定的路径下checkpointLocation。任何与HDFS客户端兼容的URI都是可以接受的。
  • es.spark.sql.streaming.sink.log.cleanupDelay(默认10m) 提交日志通过Spark的HDFS客户端进行管理。一些与HDFS兼容的文件系统(例如Amazon的S3)以异步方式传播文件更改。为了解决这个问题,在压缩了一组日志文件之后,客户端将等待此时间,然后再清理旧文件。
  • es.spark.sql.streaming.sink.log.deletion(默认true) 确定日志是否应删除不再需要的旧日志。提交每个批次后,客户端将检查是否有已压缩且可以安全删除的提交日志。如果设置为false,日志将跳过此清理步骤,为每个批次保留一个提交文件。
  • es.spark.sql.streaming.sink.log.compactInterval(默认10) 设置压缩日志文件之前要处理的批次数。默认情况下,每10批提交日志将被压缩为一个包含所有以前提交的批ID的文件。

问题:_spark_metadata/0 doesn't exist while compacting batch error

Sample code of application:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", <server list>)
  .option("subscribe", <topic>)
  .load()

[...] // do some processing

dfProcessed.writeStream
  .format("csv")
  .option("format", "append")
  .option("path",hdfsPath)
  .option("checkpointlocation","")
  .outputmode(append)
  .start

Example :

df.writeStream
      .format("parquet")
      .outputMode("append")
      .option("checkpointLocation", "D:/path/dir/checkpointLocation")
      .option("path", "D:/path/dir/output")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()
      .awaitTermination()

The error message

_spark_metadata/n.compact doesn't exist when compacting batch n+10

解决方案一: How to recover from a deleted _spark_metadata folder in Spark Structured Streaming - DEV Community 

解决方案二:

增加:option('retention', retention_value)

 df.writeStream
      .format("parquet")
      .outputMode("append")
      .option("checkpointLocation", "D:/path/dir/checkpointLocation")
      .option("path", "D:/path/dir/output")

       .option('retention', retention_value)
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()
      .awaitTermination()

 

https://github.com/apache/spark/blob/master/docs/structured-streaming-programming-guide.md 

 https://github.com/apache/spark/pull/24128

参照:https://github.com/GourdErwa/awesome-spark/blob/master/docs/structured-streaming/Spark-StructuredStreaming%20checkpointLocation%E4%BC%98%E5%8C%96%E7%82%B9%E5%88%86%E6%9E%90.md 

Configuration Properties · The Internals of Spark Structured Streaming

FileStreamSinkLog - The Internals of Spark Structured Streaming 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/747951.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

跨站脚本攻击XSS

​​​​​​1、漏洞原理及防御 XSS又叫CSS (CrossSiteScript),因为与层叠样式表(css)重名,所以叫Xss&#xff0c;中文名叫跨站脚本攻击。 xss攻击&#xff0c;主要就是攻击者通过“html注入”篡改了网页&#xff0c;插入了恶意的脚本&#xff0c;从而在用户浏览网页时&#…

迅为龙芯3A5000_7A2000COMe_模块和主板

龙芯 3A50007A2000 COME 采用全国产龙芯 3A5000 处理器&#xff0c;基于龙芯自主指令系统 (LoongArche)的 LA464 微结构&#xff0c;并进一步提升频率&#xff0c;降低功耗&#xff0c;优化性能。桥片采用龙芯 7A2000&#xff0c;支持 PCIE 3.0、USB 3.0 和 SATA 3.0、显示接口…

单独编译 Android 固件-打包 update.img-iTOP-RK3588开发板

进入到 3588-android12 文件夹&#xff0c;输入以下命令设置 java 版本为 1.8 版本&#xff0c;如下图所示: source javaenv.sh java -version 输入以下命令使能编译环境: source build/envsetup.sh lunch rk3588_s-userdebug 执行完上述命令&#xff0c;如果需要编译 uboo…

如何破解滑动验证码?

本文通过自动化查询域名或公司的备案信息&#xff0c;来演示其中图片滑动验证码的破解方式&#xff0c;以此来思考验证码的安全性问题&#xff0c;思考如何设计出安全性更高的验证码。 注意&#xff1a;破解验证码进行网络内容抓取可能是一种违规行为&#xff0c;可以以此进行验…

西门子PLC硬件编程需要注意的几个要点

往往一个好的编程程序习惯可以让事情事半功倍。用正确的逻辑思维和方法去写程序&#xff0c;一方面可以减少出错&#xff0c;另一方面就是方便检查程序里出现的bug。下面就为大家盘点一下&#xff0c;西门子PLC的一些硬件编程的好习惯。 1、关于选型 项目开始需要统计出IO点表…

Linux嵌入式项目-智能家居

一、资料下载 二、框架知识 三、MQTT通信协议 1、上位机APP主要工作 1.wait for msg / while(1)订阅等待消息 2.处理消息 客户端创建了两个线程&#xff0c;一个线程用于发布消息&#xff0c;一个线程用于监听订阅消息 &#xff08;那我的仿真系统也可以啊&#xff0c;一个…

《AutoSar实战》DIO配置

文章目录 前言一、配置过程1&#xff0c;选择引脚2&#xff0c;DIO模块配置1&#xff09;新建DioChannel 3&#xff0c;PORT模块配置4&#xff0c;保存并生成DIO&#xff0c;PORT模块 二、实现并验证1&#xff0c;调用函数接口2&#xff0c;示波器测量周期 总结 ->返回总目录…

富士施乐/Fuji Xerox SC2022 CPS DA 彩色激光复印机不能扫描的解决方法

一台富士施乐/Fuji Xerox SC2022 CPS DA 彩色激光复印机用网线连接的&#xff0c;有分配的IP地址&#xff0c;有三台电脑连接&#xff0c;可打印&#xff0c;但是不能扫描。 驱动也没问题&#xff0c;找了一台电脑先删除了打印机&#xff0c;在官网下载了驱动重新安装&#xff…

【Spring学习一】简单认识Spring是什么?——框架

目录 1、为什么要学习Spring&#xff1f; 2、Spring是什么&#xff1f; 1、IoC是什么&#xff1f; 2、进一步通过代码演示理解IoC 3、怎么理解容器&#xff1f; 4、知道DI与IoC的区别&#xff1f; 1、为什么要学习Spring&#xff1f; 我们常说的Spring 指的是 Spring Fra…

Microsoft 已经发布了7月份的产品安全问题修复报告。

&#x1f525;Microsoft 已经发布了7月份的产品安全问题修复报告。我们建议您关注趋势性漏洞&#xff0c;即那些已经或即将被攻击者积极利用的漏洞。 7月份报告中的两个危险漏洞&#xff1a; CVE-2023-32049和CVE-2023-35311。 CVE-2023-32049漏洞允许网络犯罪分子绕过Window…

卫星图片的Classification_model

Tensorflow版本&#xff1a;2.6.0 使用的是CNN神经网络&#xff0c;网络结构在最后给出 飞机和湖泊的卫星图片二分类网络 数据集请点击链接&#xff1a;https://www.kaggle.com/datasets/yo7oyo/lake-plane-binaryclass 数据集的构成&#xff1a;airplane: 700 张&#xff0c; …

着眼未来砥砺前行,知了汇智携西南交大学生走进企业参观学习

随着数字化转型推进的深入&#xff0c;企业对数字化人才的需求量大幅增长&#xff0c;人才需求结构也发生显著在变化。为加强学生与企业的接触&#xff0c;拓展专业视野&#xff0c;对接行业需求&#xff0c;激发学生对所学专业的兴趣&#xff0c;明确自己学习的目标&#xff0…

NC19 连续子数组的最大和

import java.util.*; public class Solution {public int FindGreatestSumOfSubArray(int[] array) {//记录到下标i为止的最大连续子数组和int[] dp new int[array.length]; dp[0] array[0];int maxsum dp[0];for(int i 1; i < array.length; i){//状态转移&#xff1a;…

优雅实现垂直SeekBar:不继承Seekbar、不自定义View

目录 0 前言 关于自定义View 1 实现竖直SeekBar 1.1 XML布局解析 1.1.1 套一层FrameLayout 1.1.2 SeekBar去除左右间距 1.1.3 SeekBar高度无法设置 1.1.4 SeekBar背景设置 1.1.5 底部View尺寸和距底部距离不硬编码 1.2 自定义样式属性与主题 1.2.1 自定义样式属性 …

应急管理大屏助力暴雨天气下的水灾防范

随着气候变化和城市化进程的加剧&#xff0c;暴雨天气引发的水灾风险日益凸显。在面对这种自然灾害时&#xff0c;如何高效、及时地应对、减轻损失成为了当务之急。水灾应急管理平台的可视化大屏为相关部门和决策者提供了实时、全面的信息展示和决策支持&#xff0c;大大提升了…

每天5个好用的实用工具链接分享(第1弹)

每天5个好用的实用工具链接分享&#xff08;第1弹&#xff09; 1、免费PPT模板网站2、科研狗租用GPU跑模型网站3、在线正则测试网站4、免费数据集下载网站5、在线curl命令转代码网站6、号外 1、免费PPT模板网站 【链接】&#xff1a;https://www.ypppt.com/ 【网站名】&#x…

性能测试工具 Jmeter 做 Http 接口测试 :编写自定义函数

目录 一、 前言 二、 编写自定义函数的步骤 1. 新建一个工程&#xff0c;导入 jmeter jar 包。 2. 新建 package&#xff1a;stressTest.functions 3. 新建一个类继承 AbstractFunction&#xff0c;重写以下方法&#xff1a; 4. 打包 5. 将打出来的 jar 包拷贝至 jmeter…

学习记录——BiSeNetV1、BiSeNetV2、BiSeNetV3、PIDNet、CMNeXt

BiSeNetV1 BiSeNetV1为了在不影响速度的情况下&#xff0c;同时收集到空间信息和语义信息&#xff0c;设计了两条路&#xff1a; Spatial Path: 用了三层stride为 2 的卷积&#xff0c;卷积BNRELU模块。最后提取了相当于原图像 1/8 的输出特征图。由于它利用了较大尺度的特征图…

怎样把手机录音转换成文字免费?分享3个免费方法给给大家!

将手机录音转换为文字可以提高工作和学习效率&#xff0c;但很多人不知道如何实现。在本文中&#xff0c;我将分享三个免费的方法来帮助您将手机录音转换为文字&#xff0c;分别是使用记灵在线工具&#xff08;网页&#xff09;、微信和剪映。无论您是需要转录会议记录、课堂笔…

界面控件DevExtreme UI组件——增强的API功能

虽然DevExtreme刚刚发布了v23.1&#xff0c;但今天我们仍然要继续总结一下之前的主要更新&#xff08;v22.2&#xff09;中发布的一些与DevExtreme API相关的重要特性。 DevExtreme拥有高性能的HTML5 / JavaScript小部件集合&#xff0c;使您可以利用现代Web开发堆栈&#xff…