【Spark分布式内存计算框架——Spark Streaming】12. 偏移量管理(上)代码重构与Checkpoint 恢复

news2024/12/25 12:16:00

6. 偏移量管理

针对前面实现【百度热搜排行榜Top10】实时状态统计应用来说,当应用关闭以后,再次启动(Restart)执行,并没有继续从上次消费偏移量读取数据和获取以前状态信息,而是从最新偏移量(Latest Offset)开始的消费,肯定不符合实际需求,有两种解决方式:

方式一:Checkpoint 恢复

  • 当流式应用再次启动时,从Checkpoint 检查点目录恢复,可以读取上次消费偏移量信息和状态相关数据,继续实时处理数据。
  • 文档:http://spark.apache.org/docs/2.4.5/streaming-programming-guide.html#checkpointing

方式二:手动管理偏移量

  • 用户编程管理每批次消费数据的偏移量,当再次启动应用时,读取上次消费偏移量信息,继续实时处理数据。
  • 文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-10-integration.html#storing-offsets

在实际生产项目中,常常使用第二种方式【手动管理偏移量】,将偏移量存储到MySQL、Redis或Zookeeper中,接下来讲解两种方式实现,都需要掌握。

6.1 重构代码

实际项目开发中,为了代码重构复用和代码简洁性,将【从数据源读取数据、实时处理及结果输出】封装到方法【processData】中,类的结构如下:
在这里插入图片描述
Streaming流式应用模板完整代码:

import java.util.Date
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
/**
* SparkStreaming流式应用模板Template,将从数据源读取数据、实时处理及结果输出封装到方法中。
*/
object StreamingTemplate {
/**
* 抽象一个函数:专门从数据源读取流式数据,经过状态操作分析数据,最终将数据输出
* @param ssc 流式上下文StreamingContext实例对象
*/
def processData(ssc: StreamingContext): Unit ={
// TODO: 1. 从Kafka Topic实时消费数据
val kafkaDStream: DStream[ConsumerRecord[String, String]] = {
// i.位置策略
val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
// ii.读取哪些Topic数据
val topics = Array("search-log-topic")
// iii.消费Kafka 数据配置参数
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node1.itcast.cn:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group_id_streaming_0002",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// iv.消费数据策略
val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(
topics, kafkaParams
)
// v.采用消费者新API获取数据
KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy)
}
// TODO: 2. 词频统计,实时累加统计
// 2.1 对数据进行ETL和聚合操作
val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform{ rdd =>
val reduceRDD: RDD[(String, Int)] = rdd
// 过滤不合格的数据
.filter{ record =>
val message: String = record.value()
null != message && message.trim.split(",").length == 4
}
// 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
.map{record =>
val keyword: String = record.value().trim.split(",").last
keyword -> 1
}
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化
// 返回
reduceRDD
}
// 2.2 使用mapWithState函数状态更新, 针对每条数据进行更新状态
val spec: StateSpec[String, Int, Int, (String, Int)] = StateSpec.function(
// (KeyType, Option[ValueType], State[StateType]) => MappedType
(keyword: String, countOption: Option[Int], state: State[Int]) => {
// a. 获取当前批次中搜索词搜索次数
val currentState: Int = countOption.getOrElse(0)
// b. 从以前状态中获取搜索词搜索次数
val previousState = state.getOption().getOrElse(0)
// c. 搜索词总的搜索次数
val latestState = currentState + previousState
// d. 更行状态
state.update(latestState)
// e. 返回最新省份销售订单额
(keyword, latestState)
}
)
// 调用mapWithState函数进行实时累加状态统计
val stateDStream: DStream[(String, Int)] = reduceDStream.mapWithState(spec)
// TODO: 3. 统计结果打印至控制台
stateDStream.foreachRDD{(rdd, time) =>
val batchTime: String = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
.format(new Date(time.milliseconds))
println("-------------------------------------------")
println(s"BatchTime: $batchTime")
println("-------------------------------------------")
if(!rdd.isEmpty()){
rdd.coalesce(1).foreachPartition{_.foreach(println)}
}
}
}
// 应用程序入口
def main(args: Array[String]): Unit = {
// TODO: 构建流式上下文实例对象StreamingContext
val ssc: StreamingContext = {
// a. 创建SparkConf对象,设置应用配置信息
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[3]")
// TODO: 设置消费最大数量
.set("spark.streaming.kafka.maxRatePerPartition", "10000")
// b. 传递SparkConf和BatchInterval创建流式上下对象
val context = new StreamingContext(sparkConf, Seconds(5))
// c. 返回实例对象
context
}
// TODO: 从数据源端消费数据,实时处理分析及最后输出
processData(ssc)
// TODO: 启动流式应用,等待终止(人为或程序异常)
ssc.start()
ssc.awaitTermination() // 流式应用启动以后,一直等待终止,否则一直运行
// 无论是否异常最终关闭流式应用(优雅的关闭)
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

如果流式应用业务复杂,可以将其单独抽取方法,对DStream数据进行处理分析。

6.2 Checkpoint 恢复

针对Spark Streaming状态应用程序,设置Checkpoint检查点目录,其中存储两种类型数据:

第一类:元数据(Metadata checkpointing),保存定义了 Streaming 计算逻辑

  • 应用程序的配置(Configuration): The configuration that was used to create the streaming application;
  • DStream操作(DStream operations):The set of DStream operations that define the streaming application;
  • 没有完成批处理(Incomplete batches):Batches whose jobs are queued but have not completed yet;

第二类:数据(Data checkpointing),保存已生成的RDDs至可靠的存储

  • 通常有状态的数据横跨多个batch流的时候,需要做checkpoint

Metadata Checkpointing 用来恢复 Driver;Data Checkpointing用来容错stateful的数据处理失败的场景 。
当我们再次运行Streaming Application时,只要从Checkpoint 检查点目录恢复,构建StreamingContext应用,就可以继续从上次消费偏移量消费数据。
在这里插入图片描述
使用StreamingContext中【getActiveOrCreate】方法构建StreamingContext实例对象,方法声明如下:
在这里插入图片描述
若Application为首次重启,将创建一个新的StreamingContext实例;如果Application从失败中重启,从checkpoint目录导入checkpoint数据来重新创建StreamingContext实例。伪代码如下:
在这里插入图片描述
修改上述案例代码:

import java.util.Date
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
/**
* SparkStreaming实现状态累计实时统计:DStream#mapWithState,当流式应用停止以后,再次启动时:
* - 其一:继续上次消费Kafka数据偏移量消费数据:MetaData
* - 其二:继续上次应用停止的状态累加更新状态:State
*/
object StreamingStateCkpt {
// 检查点目录
val CKPT_DIR: String = s"datas/streaming/state-ckpt-10002"
/**
* 抽象一个函数:专门从数据源读取流式数据,经过状态操作分析数据,最终将数据输出
* @param ssc 流式上下文StreamingContext实例对象
*/
def processData(ssc: StreamingContext): Unit ={
// 1. 从Kafka Topic实时消费数据
val kafkaDStream: DStream[ConsumerRecord[String, String]] = {
// i.位置策略
val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
// ii.读取哪些Topic数据
val topics = Array("search-log-topic")
// iii.消费Kafka 数据配置参数
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node1.itcast.cn:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group_id_streaming_0002",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// iv.消费数据策略
val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(
topics, kafkaParams
)
// v.采用消费者新API获取数据
KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy)
}
// 2. 词频统计,实时累加统计
// 2.1 对数据进行ETL和聚合操作
val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform{ rdd =>
val reduceRDD: RDD[(String, Int)] = rdd
// 过滤不合格的数据
.filter{ record =>
val message: String = record.value()
null != message && message.trim.split(",").length == 4
}
// 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
.map{record =>
val keyword: String = record.value().trim.split(",").last
keyword -> 1
}
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化
// 返回
reduceRDD
}
// 2.2 使用mapWithState函数状态更新, 针对每条数据进行更新状态
val spec: StateSpec[String, Int, Int, (String, Int)] = StateSpec.function(
// (KeyType, Option[ValueType], State[StateType]) => MappedType
(keyword: String, countOption: Option[Int], state: State[Int]) => {
// a. 获取当前批次中搜索词搜索次数
val currentState: Int = countOption.getOrElse(0)
// b. 从以前状态中获取搜索词搜索次数
val previousState = state.getOption().getOrElse(0)
// c. 搜索词总的搜索次数
val latestState = currentState + previousState
// d. 更行状态
state.update(latestState)
// e. 返回最新省份销售订单额
(keyword, latestState)
}
)
// 调用mapWithState函数进行实时累加状态统计
val stateDStream: DStream[(String, Int)] = reduceDStream.mapWithState(spec)
// 3. 统计结果打印至控制台
stateDStream.foreachRDD{(rdd, time) =>
val batchTime: String = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
.format(new Date(time.milliseconds))
println("-------------------------------------------")
println(s"BatchTime: $batchTime")
println("-------------------------------------------")
if(!rdd.isEmpty()){
rdd.coalesce(1).foreachPartition{_.foreach(println)}
}
}
}
// 应用程序入口
def main(args: Array[String]): Unit = {
// TODO: 构建流式上下文实例对象StreamingContext
/*
def getActiveOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext
*/
val ssc: StreamingContext = StreamingContext.getActiveOrCreate(
CKPT_DIR, // 检查点目录,第一次运行时没有,构建新的,调用如下方法
// TODO: 第一次运行应用时,一切都是新的,需要创建和指定;非第一次一切都是检查点目录数据恢复
() => {
// a. 创建SparkConf对象,设置应用配置信息
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[3]")
// TODO: 设置消费最大数量
.set("spark.streaming.kafka.maxRatePerPartition", "10000")
// b. 传递SparkConf和BatchInterval创建流式上下对象
val context = new StreamingContext(sparkConf, Seconds(5))
// c. 设置检查点目录
context.checkpoint(CKPT_DIR)
// d. 读取数据、处理数据和输出数据
processData(context)
// e. 返回StreamingContext对象
context
}
)
// 启动流式应用,等待终止(人为或程序异常)
ssc.start()
ssc.awaitTermination() // 流式应用启动以后,一直等待终止,否则一直运行
// 无论是否异常最终关闭流式应用(优雅的关闭)
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

当Streaming Application再次运行时,从Checkpoint检查点目录恢复时,有时有问题,比如修改程序,再次从运行时,可能出现类型转换异常,如下所示:

ERROR Utils: Exception encountered
java.lang.ClassCastException: cannot assign instance of cn.itcast.spark.ckpt.StreamingCkptState$$anonfun$streamingProcess$1 to field 
org.apache.spark.streaming.dstream.ForEachDStream.org$apache$spark$streaming$dstream$ForEachDStream$$
foreachFunc of type scala.Function2 in instance of org.apache.spark.streaming.dstream.ForEachDStream
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)

原因在于修改DStream转换操作,在检查点目录中存储的数据没有此类的相关代码,所以保存ClassCastException异常。
此时无法从检查点读取偏移量信息和转态信息,所以SparkStreaming中Checkpoint功能,属于鸡肋,食之无味,弃之可惜。解决方案:

  • 1)、针对状态信息:当应用启动时,从外部存储系统读取最新状态,比如从MySQL表读取,或者从Redis读取;
  • 2)、针对偏移量数据:自己管理偏移量,将偏移量存储到MySQL表、Zookeeper、HBase或Redis;
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/384198.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JVM 不同垃圾回收器的日志格式分析

1、GC日志采集 在服务器上我们需要配置一些参数才能采集到历史的GC日志信息,这些参数通常在项目启动的时候就需要指定, 如果你项目是jar包,可以按照下面方式指定这些GC参数即可。 下面这些参数意思是把GC日志记录到/opt/app/abc-user/ard-…

蓝桥杯备赛——Echarts学习

文章目录前言学习 ECharts 的方法快速上手基础知识option 配置选项可选配置title 标题组件tooltip 提示框组件axisPointer 坐标轴指示器legend 图例组件toolbox 工具栏坐标轴xAxis和yAxisseries ([ ]用数组表示,数组里是一个个数据对象)饼状图散点图交互…

盘点代码情诗集合❤,程序员表白的巅峰之作,特此奉献

程序员怎么表白?写代码啊!每到情人节,程序员们就纷纷出动,各种别出心裁的表白代码倾囊相送。我曾被大批表白代码砸晕,沉浸在“虚拟的”幸福感中不能自拔。我在众多代码中精选了以下几十条,每一条都是文学素…

Python中的遍历字典的键和值

一、Python的字典在项目的开发过程中,如果遇到有映射关系的内容可以考虑使用Python中的字典进行存储数据,字典中冒号前的数据称为【键】、冒号后的数据称为【值】。二、Python字典的用法2.1、Python的定义#Python字典的定义 字典名称{键1:值1,键2:值2,键…

JavaScript Date 日期对象

文章目录JavaScript Date 日期对象Date 对象Date 对象属性Date 对象方法创建日期设置日期两个日期比较JavaScript Date 日期对象 日期对象用于处理日期和时间。 Date 对象 Date 对象用于处理日期与实际。 创建 Date 对象: new Date(). 以上四种方法同样可以创建…

Validate端口使用手册

知行之桥EDI系统从2020版本开始引入了Validate端口,用来实现对XML数据文件进行一些规则的验证,保证XML数据文件的有效性。本文将介绍如何使用Validate端口。 端口创建 同其他功能性端口一样,只需要将Validata端口从左侧的端口清单拖拽到右侧…

子数组达到规定累加和的最大长度系列问题

文章目录1、题目一:正整数数组中子数组累加和 KKK 最大长度1.1 题目描述1.2 思路分析1.3 代码实现2、题目二:整数数组中子数组累加和为 KKK 的最大长度2.1 题目描述2.2 思路分析2.3 代码实现2.4 引申变形2.5 技巧应用题2.5.1 剑指 Offer II 010. 和为 k …

关于HTTP/3的小知识点

客户端用 TCP 发送了三个包,但服务器所在的操作系统只收到了后两个包,第一个包丢了。那么内核里的 TCP 协议栈就只能把已经收到的包暂存起来,“停下”等着客户端重传那个丢失的包,这样就又出现了“队头阻塞”。由于这种“队头阻塞…

Kubernetes之存储管理(中)

NFS网络存储 emptyDir和hostPath存储,都仅仅是把数据存储在pod所在的节点上,并没有同步到其他节点,如果pod出现问题,通过deployment会产生一个新的pod,如果新的pod不在之前的节点,则会出现问题&#xff0c…

CV——day81(1) 读论文: 基于自监督一致性学习的驾驶场景交通事故检测(有源码)

Traffic Accident Detection via Self-Supervised Consistency Learning in Driving Scenarios 基于自监督一致性学习的驾驶场景交通事故检测I. INTRODUCTIONIII. OUR APPROACHA. 帧预测B. 物体位置预测C. 驾驶场景上下文表示(DSCR)D. 协作多任务一致性学习E.交通事故判定IV. E…

UART串口通信协议

一、协议 1.1 消息格式 串口协议是一种全双工、异步通信协议,不需要同步时钟,数据的发送是一位一位的发送,完整的一帧数据通常由起始位、数据、奇偶校验位和停止位组成 1.2 波特率 为确保正确的收发信息,双方必须设置相同的波…

火山引擎 DataLeap:揭秘字节跳动数据血缘架构演进之路

更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群 DataLeap 是火山引擎数智平台 VeDI 旗下的大数据研发治理套件产品,帮助用户快速完成数据集成、开发、运维、治理、资产、安全等全套数据中台建设&#x…

MySQL 中的锁有哪些类型,MySQL 中加锁的原则

锁的类型MySQL 找那个根据加锁的范围,大致可以分成全局锁,表级锁和行级锁。全局锁全局锁,就是对整个数据库加锁。加锁flush tables with read lock解锁unlock tables全局锁会让整个库处于只读状态,之后所有的更新操作都会被阻塞&a…

OB运维 | 连接 kill 中的 session_id

作者:姚嵩 外星人… 本文来源:原创投稿 *爱可生开源社区出品,原创内容未经授权不得随意使用,转载请联系小编并注明来源。 背景: 通过 obproxy 连接 OB 后,发现: kill 命令使⽤ show processli…

如何写出让人看不懂的MATLAB代码?

最近呢有不少好奇的伙伴私下问咱这是怎么实现大幅度降低matlab代码可读性。于是咱准备将相关的资源分享给大家,这个工具的根来源于大神thrynae (Rik)公开分享一款名叫minify的小工具图片。 咱也不藏着掖着,其实大家用minify作为关键词检索,不…

学生成绩管理系统/学生信息管理系统

文章目录项目介绍一、技术栈二、项目功能介绍三、功能页面展示四、获取代码项目介绍 一、技术栈 编程语言:Java 技术栈:ServletJspJdbcEasyUIjQueryAjax面向接口编程 二、项目功能介绍 用户角色:学生、教师、系统管理员。; 管…

Nuxt 3.0 全栈开发

Nuxt 3.0 全栈开发 - 杨村长 - 掘金小册核心知识 工程架构 全栈进阶 项目实战,快速精通 Nuxt3 开发!。「Nuxt 3.0 全栈开发」由杨村长撰写,299人购买https://s.juejin.cn/ds/S6p7MVo/ 这门课我会全面讲解 Nuxt3 核心知识,然后…

【uni-app教程】一、UniAPP 介绍

一、UniAPP 介绍 (1) 什么是 UniAPP? uni-app 是一个使用 Vue.js 开发所有前端应用的框架,开发者编写一套代码,可发布到 iOS,Android,HS,以及各种小程序(微信/支付宝/百度/头条/QQ/钉钉》等多个平台&#…

Docker之部署Canal

Canal 就是一个同步增量数据的一个工具。 目录概念Mysql开启binlog是否开启binlog开启binlog日志创建授权用户部署Canal拉取镜像挂载properties配置文件创建容器概念 canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数…

VTK例子--使用不同的vtkActor同时显示灰度图、体渲染、多边形

在实际项目中,常遇到不同类型的数据在同一个渲染窗口显示;如网格多边形与灰度图像的显示、体渲染与多边形的显示、体渲染与灰度图像的显示,如下面几张图的效果;多边形灰度图像体渲染多边形体渲染灰度图像如何实现这种混合显示的效…