【Spark分布式内存计算框架——Spark Streaming】13. 偏移量管理(下)MySQL 存储偏移量

news2024/12/28 20:37:22

6.3 MySQL 存储偏移量

此处将偏移量数据存储到MySQL表中,数据库及表的DDL和DML语句如下:

-- 1. 创建数据库的语句
CREATE DATABASE IF NOT EXISTS db_spark DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
USE db_spark ;
-- 2. 创建表的语句
CREATE TABLE `tb_offset` (
`topic` varchar(255) NOT NULL,
`partition` int NOT NULL,
`groupid` varchar(255) NOT NULL,
`offset` bigint NOT NULL,
PRIMARY KEY (`topic`,`partition`,`groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ;
-- 3. 插入数据语句replace
replace into tb_offset (`topic`, `partition`, `groupid`, `offset`) values(?, ?, ?, ?)
--/*
-- replace语句执行时,分以下两种情况:
-- - 情况1:insert
-- 当不存在主键冲突或唯一索引冲突,相当于insert操作
-- - 情况2:delete and insert
-- 当存在主键冲突或唯一索引冲突,相当于delete操作,加insert操作
--*/
-- 4. 查询数据语句select
select * from tb_offset where topic in ('xx', 'yy') AND groupid = 'gid001' ;
select * from tb_offset where topic in (?) and groupid = ? ;

编写工具类
工具类OffsetsUtils从MySQL数据库表中读取消费的偏移量信息和保存最近消费的偏移量值,示意图如下所示:
在这里插入图片描述
工 具 类 中 包 含 如 何 保 存 偏 移 量 【 saveOffsetsToTable 】 和 读 取 偏 移 量【getOffsetsToMap】两个函数,相关声明如下:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
/**
* 将消费Kafka Topic偏移量数据存储MySQL数据库,工具类用于读取和保存偏移量数据
*/
object OffsetsUtils {
/**
* 依据Topic名称和消费组GroupId获取各个分区的偏移量
*
* @param topicNames Topics名称
* @param groupId 消费组ID
*/
def getOffsetsToMap(topicNames: Array[String], groupId: String): Map[TopicPartition, Long] ={
 null
}
/**
* 保存Streaming每次消费Kafka数据后最新偏移量到MySQL表中
*
* @param offsetRanges Topic中各个分区消费偏移量范围
* @param groupId 消费组ID
*/
def saveOffsetsToTable(offsetRanges: Array[OffsetRange], groupId: String): Unit = {
}
}

依据业务实现工具类中方法,主要考察就是对MySQL数据库表数据的操作:从表中读取数据和向表写入数据,完整代码如下:

package cn.itcast.spark.offset
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import scala.collection.mutable
/**
* 将消费Kafka Topic偏移量数据存储MySQL数据库,工具类用于读取和保存偏移量数据
* 表的创建语句:
CREATE TABLE `tb_offset` (
`topic` varchar(255) NOT NULL,
`partition` int NOT NULL,
`groupid` varchar(255) NOT NULL,
`offset` bigint NOT NULL,
PRIMARY KEY (`topic`,`partition`,`groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
*/
object OffsetsUtils {
/**
* 依据Topic名称和消费组GroupId获取各个分区的偏移量
*
* @param topicNames Topics名称
* @param groupId 消费组ID
*/
def getOffsetsToMap(topicNames: Array[String], groupId: String): Map[TopicPartition, Long] ={
// 构建集合
val map: mutable.Map[TopicPartition, Long] = scala.collection.mutable.Map[TopicPartition, Lon
g]()
// 声明变量
var conn: Connection = null
var pstmt: PreparedStatement = null
var result: ResultSet = null
try{
// a. 加载驱动类
Class.forName("com.mysql.cj.jdbc.Driver")
// b. 获取连接
conn = DriverManager.getConnection(
"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnic
ode=true", //
"root", //
"123456" //
)
// c. 编写SQL,获取PreparedStatement对象
val topicNamesStr = topicNames.map(topicName => s"\'$topicName\'").mkString(", ")
val querySQL =
s"""
|SELECT
| `topic`, `partition`, `groupid`, `offset`
|FROM
| db_spark.tb_offset
|WHERE
| groupid = ? AND topic in ($topicNamesStr)
|""".stripMargin
pstmt = conn.prepareStatement(querySQL)
pstmt.setString(1, groupId)
// d. 查询数据
result = pstmt.executeQuery()
// e. 遍历获取值
while (result.next()){
val topicName = result.getString("topic")
val partitionId = result.getInt("partition")
val offset = result.getLong("offset")
// 加入集合中
map += new TopicPartition(topicName, partitionId) -> offset
}
}catch {
case e: Exception => e.printStackTrace()
}finally {
if(null != result) result.close()
if(null != pstmt) pstmt.close()
if(null != conn) conn.close()
}
// 返回集合,转换为不可变的
map.toMap
}
/**
* 保存Streaming每次消费Kafka数据后最新偏移量到MySQL表中
*
* @param offsetRanges Topic中各个分区消费偏移量范围
* @param groupId 消费组ID
*/
def saveOffsetsToTable(offsetRanges: Array[OffsetRange], groupId: String): Unit = {
// 声明变量
var conn: Connection = null
var pstmt: PreparedStatement = null
try{
// a. 加载驱动类
Class.forName("com.mysql.cj.jdbc.Driver")
// b. 获取连接
conn = DriverManager.getConnection(
"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnic
ode=true", //
"root", //
"123456" //
)
// c. 编写SQL,获取PreparedStatement对象
val insertSQL = "replace into db_spark.tb_offset (`topic`, `partition`, `groupid`, `offse
t`) values (?, ?, ?, ?)"
pstmt = conn.prepareStatement(insertSQL)
// d. 设置参数
offsetRanges.foreach{offsetRange =>
pstmt.setString(1, offsetRange.topic)
pstmt.setInt(2, offsetRange.partition)
pstmt.setString(3, groupId)
pstmt.setLong(4, offsetRange.untilOffset)
// 加入批次
pstmt.addBatch()
}
// e. 批量插入
pstmt.executeBatch()
}catch {
case e: Exception => e.printStackTrace()
}finally {
if(null != pstmt) pstmt.close()
if(null != conn) conn.close()
}
}
def main(args: Array[String]): Unit = {
/*
saveOffsetsToTable(
Array(
OffsetRange("xx-tp", 0, 11L, 100L),
OffsetRange("xx-tp", 1, 11L, 100L),
OffsetRange("yy-tp", 0, 10L, 500L),
OffsetRange("yy-tp", 1, 10L, 500L)
),
"group_id_00001"
)
*/
//getOffsetsToMap(Array("xx-tp"), "group_id_00001").foreach(println)
}
}

加载和保存偏移量
从Kafka Topic消费数据时,首先从MySQL数据库加载偏移量,如果有值,使用如下函数:
在这里插入图片描述
从Kafka Topic消费数据时,直接获取的DStream中每批次RDD都是KafkaRDD,可以获取数据偏移量范围信息OffsetRanges。
在这里插入图片描述
修改前面实时订单消费额统计代码,自己管理消费偏移量,存储到MySQL表中,代码如下:

import java.util.Date
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
/**
* SparkStreaming从Kafka Topic实时消费数据,手动管理消费偏移量,保存至MySQL数据库表
*/
object StreamingManagerOffsets {
/**
* 抽象一个函数:专门从数据源读取流式数据,经过状态操作分析数据,最终将数据输出
*/
def processData(ssc: StreamingContext): Unit ={
// 1. 从Kafka Topic实时消费数据
val groupId: String = "group_id_10001" // 消费者GroupID
val kafkaDStream: DStream[ConsumerRecord[String, String]] = {
// i.位置策略
val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
// ii.读取哪些Topic数据
val topics = Array("search-log-topic")
// iii.消费Kafka 数据配置参数
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node1.itcast.cn:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// iv.消费数据策略
// TODO: 从MySQL数据库表中获取偏移量信息
val offsetsMap: Map[TopicPartition, Long] = OffsetsUtils.getOffsetsToMap(topics, groupId)
val consumerStrategy: ConsumerStrategy[String, String] = if(offsetsMap.isEmpty){
// TODO: 如果第一次消费topic数据,此时MySQL数据库表中没有偏移量信息, 从最大偏移量消费数据
ConsumerStrategies.Subscribe(topics, kafkaParams)
}else{
// TODO: 如果不为空,指定消费偏移量
ConsumerStrategies.Subscribe(topics, kafkaParams, offsetsMap)
}
// v.采用消费者新API获取数据
KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy)
}
// TODO: 其一、创建空Array数组,指定类型
var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
// 2. 词频统计,实时累加统计
// 2.1 对数据进行ETL和聚合操作
val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform{ rdd =>
// TODO:其二、从KafkaRDD中获取每个分区数据对应的偏移量信息
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val reduceRDD: RDD[(String, Int)] = rdd
// 过滤不合格的数据
.filter{ record =>
val message: String = record.value()
null != message && message.trim.split(",").length == 4
}
// 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
.map{record =>
val keyword: String = record.value().trim.split(",").last
keyword -> 1
}
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化
// 返回
reduceRDD
}
// 2.2 使用mapWithState函数状态更新, 针对每条数据进行更新状态
val spec: StateSpec[String, Int, Int, (String, Int)] = StateSpec.function(
// (KeyType, Option[ValueType], State[StateType]) => MappedType
(keyword: String, countOption: Option[Int], state: State[Int]) => {
// a. 获取当前批次中搜索词搜索次数
val currentState: Int = countOption.getOrElse(0)
// b. 从以前状态中获取搜索词搜索次数
val previousState = state.getOption().getOrElse(0)
// c. 搜索词总的搜索次数
val latestState = currentState + previousState
// d. 更行状态
state.update(latestState)
// e. 返回最新省份销售订单额
(keyword, latestState)
}
)
// 设置状态的初始值,比如状态数据保存Redis中,此时可以从Redis中读取
//spec.initialState(ssc.sparkContext.parallelize(List("罗志祥" -> 123, "裸海蝶" -> 342)))
// 调用mapWithState函数进行实时累加状态统计
val stateDStream: DStream[(String, Int)] = reduceDStream.mapWithState(spec)
// 3. 统计结果打印至控制台
stateDStream.foreachRDD{(rdd, time) =>
val batchTime: String = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
.format(new Date(time.milliseconds))
println("-------------------------------------------")
println(s"BatchTime: $batchTime")
println("-------------------------------------------")
if(!rdd.isEmpty()){
rdd.coalesce(1).foreachPartition{_.foreach(println)}
}
// TODO: 其三、保存每批次数据偏移量到MySQL数据库表中
OffsetsUtils.saveOffsetsToTable(offsetRanges, groupId)
}
}
// 应用程序入口
def main(args: Array[String]): Unit = {
// 1). 构建流式上下文StreamingContext实例对象
val ssc: StreamingContext = StreamingContext.getActiveOrCreate(
() => {
// a. 创建SparkConf对象
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[3]")
// b. 创建StreamingContext对象
val context = new StreamingContext(sparkConf, Seconds(5))
// c. 返回
context
}
)
ssc.checkpoint(s"datas/streaming/offsets-${System.nanoTime()}")
// 2). 实时消费Kafka数据,统计分析
processData(ssc)
// 3). 启动流式应用,等待终止(人为或程序异常)
ssc.start()
ssc.awaitTermination() // 流式应用启动以后,一直等待终止,否则一直运行
// 无论是否异常最终关闭流式应用(优雅的关闭)
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

经过测试发现完全没有问题的,代码可以进一步优化,提高性能:由于每批次数据结果RDD输出以后,都需要向MySQL数据库表更新偏移量数据,频繁连接数据库,建议构建数据库连接池,每次从池子中获取连接。

实际项目中将偏移量保存至Zookeeper上或者Redis中,原因如下:

  • 1)、保存Zookeeper上:方便使用Kafka 监控工具管理Kafka 各个Topic被消费信息;
  • 2)、保存Redis上:从Redis读取数据和保存数据很快,基于内存数据库;
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/386244.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

蓝牙资讯|2022 年 Q4 全球 TWS 耳机出货量 7900 万部

Canalys 最新数据显示,2022 年第四季度,全球个人智能音频设备出货量下降 26%,跌至 1.1 亿部。所有品类的出货量都面临不一的下滑趋势,甚至是一直支撑市场的 TWS 品类也遭遇 23% 两位数的下降至 7900 万部。 全球市场方面&#x…

MySQL中varchar(M)存储字符串过长

最近写项目&#xff0c;数据库报了一个错&#xff0c;错误原因是MySQL中存储的字符串过长最近在学MySQL的基础&#xff0c;刚好学到了关于varchar类型要存储的字符串是 “<p>12121212121212</p>\n<p><img src\"https://zzjzzjzzjbucket.oss-cn-hangz…

附录5-大事件项目前端

目录 1 前言 2 用到的插件 2.1 截取图像 cropper 2.2 富文本编辑器 tinymce 3 项目结构 4 config.js 5 主页 5.1 iframe 5.2 页面的宽高 5.3 修改文章 6 个人中心-基本资料 7 个人中心-更换头像 8 个人中心-更换密码 9 文章管理-文章分类 10 文章…

Springboot集成kafka(环境搭建+演示)|超级详细,建议收藏

Springboot集成kafka一、前言&#x1f525;二、环境说明&#x1f525;三、概念&#x1f525;四、CentOS7安装kafka&#x1f525;1.下载kafka安装包2.下载好后&#xff0c;进行解压六、kafka项目集成&#x1f525;1️⃣pom引入2️⃣配置kafka3️⃣一个kafka消息发送端4️⃣定义一…

MySQL45讲笔记04深入浅出索引上

索引的目的: 索引的出现其实就是为了提高数据查询的效率&#xff0c;就像书的目录一样。常见索引模型&#xff1a; hash表&#xff0c;以K-V键值对的形式的一种数据结构&#xff0c;底层是数组加链表形式。通过一定的hash运算找到数据合适的位置放入&#xff0c;如果放入的位置…

[jetson]paddlepaddle2.4.0在jetpack5.0.2源码编译流程

由于官方暂时没有提供jetson对应的jetson jetpack5.0.2预编译包&#xff0c;因此只有源码编译&#xff0c;本次编译不带Tensorrt,编译已经顺利成功&#xff0c;注意本次使用的设备是jetson NX 测试环境&#xff1a; ubuntu20.04 jetpack5.0.2 GCC-8.4 Software part of jet…

Centos7搭建NFS

1.NFS简介Network File System(网络文件系统&#xff0c;通过网络让不同的机器系统之间可以彼此共享文件和目录&#xff0c;类似Samba服务。2.NFS挂载原理 在网络中服务器和客户端进行连接都是通过端口进行数据传输&#xff0c;而NFS服务端的端口是随机的&#xff0c;从而导致N…

Linux----网络基础(2)--应用层的序列化与反序列化--守护进程--0226

文章中有使用封装好的头文件&#xff0c;可以在下面连接处查询。 Linux相关博文中使用的头文件_Gosolo&#xff01;的博客-CSDN博客 1. 应用层 我们程序员写的一个个解决我们实际问题, 满足我们日常需求的网络程序, 都是在应用层 1.2 协议 我们在之前的套接字编程中使用的是…

最适合你的团队云协作工具

团队云协作工具哪个好&#xff1f;使用Zoho Projects的团队云协作软件套件&#xff0c;在一个平台上无缝协作&#xff0c;激励您的团队在任何地方以最好的状态完成他们的工作。 使您的团队能够使用团队云协作软件在任何地方进行协作和沟通。Zoho Projects提供了一套强大…

三天吃透计算机网络八股文

本文已经收录到Github仓库&#xff0c;该仓库包含计算机基础、Java基础、多线程、JVM、数据库、Redis、Spring、Mybatis、SpringMVC、SpringBoot、分布式、微服务、设计模式、架构、校招社招分享等核心知识点&#xff0c;欢迎star~ Github地址&#xff1a;https://github.com/…

一文读懂光学天线

天线&#xff0c;按维基百科的定义&#xff0c;"是一种用来发射或接收无线电波—或更广泛来讲—电磁波的器件"。例如&#xff0c;在无线通信系统中&#xff0c;天线被用于发射与接收射频与微波波段的电磁波。而在我们的智能手机中&#xff0c;就有内置的平面倒F天线(…

01-认识产品经理

文章目录引入1.1 合格的产品经理1.2 产品经理的分类按服务对象不同划分按产品平台不同划分按公司所属行业不同按工作内容划分按职级高低划分1.3 产品经理的岗位职责产品的开发流程核心团队成员及其职责产品经理工作中常见误区1.4 产品经理的能力素质专业技能&#xff08;干得了…

Unity Lighting -- 配置平行光源和天空盒

识别不同种类的光源 在游戏或实时应用程序中&#xff0c;我们可能会创建多种不同种类的场景&#xff0c;比如室内场景、室外场景、真实的场景或完全想象的场景。即便项目是一个完全的想象的或是科幻的故事&#xff0c;灯光也是非常重要的一环&#xff0c;它能极大提升沉浸感。 …

Python3-条件控制

Python3 条件控制 Python 条件语句是通过一条或多条语句的执行结果&#xff08;True 或者 False&#xff09;来决定执行的代码块。 可以通过下图来简单了解条件语句的执行过程: 代码执行过程&#xff1a; if 语句 Python中if语句的一般形式如下所示&#xff1a; if condi…

Atlassian Server用户新选择 | 数据中心产品是否适合您的企业(3)?

2024年2月&#xff0c;也就是一年不到&#xff0c;Atlassian将终止对Server产品及插件的所有支持。 此公告发布后&#xff0c;许多用户需要了解怎样的前进方向才是最适合企业的。为此&#xff0c;Atlassian不仅提供云版&#xff0c;还提供了本地部署的数据中心&#xff08;Data…

jupyter lab安装和配置

jupyter lab 安装和配置 一、jupyter lab安装并配置 安装jupyterlab pip install jupyterlab启动 Jupyter lab默认会打开实验环境的&#xff0c;也可以自己在浏览器地址栏输入127.0.0.1:8888/lab 汉化 pip install jupyterlab-language-pack-zh-CN刷新一下网页&#xff0…

ChatGPT解答:PYQT5 组件化实例,Python代码实现,给出100个代码实例

ChatGPT解答&#xff1a; PYQT5 组件化实例&#xff0c;Python代码实现&#xff0c;给出100个代码实例 PYQT5 组件化实例&#xff0c;Python代码实现&#xff0c;给出100个代码实例 实现一个简单的窗口 import sys from PyQt5.QtWidgets import QApplication, QWidgetapp QA…

我90后,零基础成功转行python工程师,从月薪5K到现在月入2W+改变真的难吗?

我是25岁转行学python的。说实在&#xff0c;转行就是奔着挣钱去的。希望我的经历可以给想转行的朋友带来一点启发和借鉴。 先简单介绍下个人背景&#xff0c;三流大学毕业&#xff0c;物流专业&#xff0c;学习能力一般&#xff0c;没啥特别技能&#xff0c;反正就很普通的一…

CSS3新特性-变量

2017年三月&#xff0c;微软宣布 Edge 浏览器将支持 CSS 变量。 这个重要的 CSS 新功能&#xff0c;所有主要浏览器已经都支持了。本文全面介绍如何使用它&#xff0c;你会发现原生 CSS 从此变得异常强大。 一、变量的声明 声明变量的时候&#xff0c;变量名前面要加两根连词…

python入门应该怎么学习

国外Python的使用率非常高&#xff0c;但在国内Python是近几年才火起来&#xff0c;Python正处于高速上升期市场对于Python开发人才的需求量急剧增加&#xff0c;学习Python的前景比较好。 Python应用领域广泛&#xff0c;意味着选择Python的同学在学成之后可选择的就业领域有…