【Spark分布式内存计算框架——Spark Streaming】10. 应用案例:百度搜索风云榜(中)实时数据ETL存储

news2024/11/16 6:03:48

5.3 实时数据ETL存储

实时从Kafka Topic消费数据,提取ip地址字段,调用【ip2Region】库解析为省份和城市,存储到HDFS文件中,设置批处理时间间隔BatchInterval为10秒,完整代码如下:

package cn.itcast.spark.app.etl
import cn.itcast.spark.app.StreamingContextUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.lionsoul.ip2region.{DataBlock, DbConfig, DbSearcher}
/**
* 实时消费Kafka Topic数据,经过ETL(过滤、转换)后,保存至HDFS文件系统中,BatchInterval为:10s
*/
object StreamingETLHdfs {
def main(args: Array[String]): Unit = {
// 1. 获取StreamingContext实例对象
val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, 10)
// 2. 从Kafka消费数据,使用Kafka New Consumer API
val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils
.consumerKafka(ssc, "search-log-topic")
// 3. 数据ETL:过滤不合格数据及转换IP地址为省份和城市,并存储HDFS上
kafkaDStream.foreachRDD{(rdd, time) =>
// i. message不为null,且分割为4个字段
val kafkaRDD: RDD[ConsumerRecord[String, String]] = rdd.filter{ record =>
val message: String = record.value()
null != message && message.trim.split(",").length == 4
}
// ii. 解析IP地址
val etlRDD: RDD[String] = kafkaRDD.mapPartitions{ iter =>
// 创建DbSearcher对象,针对每个分区创建一个,并不是每条数据创建一个
val dbSearcher = new DbSearcher(new DbConfig(), "dataset/ip2region.db")
iter.map{record =>
val Array(_, ip, _, _) = record.value().split(",")
// 依据IP地址解析
val dataBlock: DataBlock = dbSearcher.btreeSearch(ip)
val region: String = dataBlock.getRegion
val Array(_, _, province, city, _) = region.split("\\|")
// 组合字符串
s"${record.value()},$province,$city"
}
}
// iii. 保存至文件
val savePath = s"datas/streaming/etl/search-log-${time.milliseconds}"
if(!etlRDD.isEmpty()){
etlRDD.coalesce(1).saveAsTextFile(savePath)
}
}
// 4.启动流式应用,一直运行,直到程序手动关闭或异常终止
ssc.start()
ssc.awaitTermination()
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

运行模拟日志数据程序和ETL应用程序,查看实时数据ETL后保存文件,截图如下:
在这里插入图片描述

5.4 实时状态更新统计

实 时 累 加 统 计 用 户 各 个 搜 索 词 出 现 的 次 数 , 在 SparkStreaming 中 提 供 函 数【updateStateByKey】实现累加统计,Spark 1.6提供【mapWithState】函数状态统计,性能更好,实际应用中也推荐使用。

updateStateByKey 函数
状态更新函数【updateStateByKey】表示依据Key更新状态,要求DStream中数据类型为【Key/Value】对二元组,函数声明如下:
在这里插入图片描述
将每批次数据状态,按照Key与以前状态,使用定义函数【updateFunc】进行更新,示意图如下:
在这里插入图片描述
文档:http://spark.apache.org/docs/2.4.5/streaming-programming-guide.html#updatestatebykey-operation
针对搜索词词频统计WordCount,状态更新逻辑示意图如下:
在这里插入图片描述
以前的状态数据,保存到Checkpoint检查点目录中,所以在代码中需要设置Checkpoint检查点目录:
在这里插入图片描述
完整演示代码如下:

package cn.itcast.spark.app.state
import cn.itcast.spark.app.StreamingContextUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
/**
* 实时消费Kafka Topic数据,累加统计各个搜索词的搜索次数,实现百度搜索风云榜
*/
object StreamingUpdateState {
def main(args: Array[String]): Unit = {
// 1. 获取StreamingContext实例对象
val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, 5)
// TODO: 设置检查点目录
ssc.checkpoint(s"datas/streaming/state-${System.nanoTime()}")
// 2. 从Kafka消费数据,使用Kafka New Consumer API
val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils
.consumerKafka(ssc, "search-log-topic")
// 3. 对每批次的数据进行搜索词次数统计
val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform{ rdd =>
val reduceRDD = rdd
// 过滤不合格的数据
.filter{ record =>
val message: String = record.value()
null != message && message.trim.split(",").length == 4
}
// 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
.map{record =>
val keyword: String = record.value().trim.split(",").last
keyword -> 1
}
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化
reduceRDD // 返回
}
/*
def updateStateByKey[S: ClassTag](
// 状态更新函数
updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)]
第一个参数:Seq[V]
表示的是相同Key的所有Value值
第二个参数:Option[S]
表示的是Key的以前状态,可能有值Some,可能没值None,使用Option封装
S泛型,具体类型有业务具体,此处是词频:Int类型
*/
val stateDStream: DStream[(String, Int)] = reduceDStream.updateStateByKey(
(values: Seq[Int], state: Option[Int]) => {
// a. 获取以前状态信息
val previousState = state.getOrElse(0)
// b. 获取当前批次中Key对应状态
val currentState = values.sum
// c. 合并状态
val latestState = previousState + currentState
// d. 返回最新状态
Some(latestState)
}
)
// 5. 将结果数据输出 -> 将每批次的数据处理以后输出
stateDStream.print()
// 6.启动流式应用,一直运行,直到程序手动关闭或异常终止
ssc.start()
ssc.awaitTermination()
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

运行应用程序,通过WEB UI界面可以发现,将以前状态保存到Checkpoint检查点目录中,更新时在读取。
在这里插入图片描述
此外,updateStateByKey函数有很多重载方法,依据不同业务需求选择合适的方式使用。

mapWithState 函数
Spark 1.6提供新的状态更新函数【mapWithState】,mapWithState函数也会统计全局的key的状态,但是如果没有数据输入,便不会返回之前的key的状态,只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。
在这里插入图片描述
这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储,效率比较高;
在这里插入图片描述
需要构建StateSpec对象,对状态State进行封装,可以进行相关操作,类的声明定义如下:
在这里插入图片描述
状态函数【mapWithState】参数相关说明:
在这里插入图片描述
完整演示代码如下:

package cn.itcast.spark.app.state
import cn.itcast.spark.app.StreamingContextUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{State, StateSpec, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
/**
* 实时消费Kafka Topic数据,累加统计各个搜索词的搜索次数,实现百度搜索风云榜
*/
object StreamingMapWithState {
def main(args: Array[String]): Unit = {
// 1. 获取StreamingContext实例对象
val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, 5)
// TODO: 设置检查点目录
ssc.checkpoint(s"datas/streaming/state-${System.nanoTime()}")
// 2. 从Kafka消费数据,使用Kafka New Consumer API
val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils
.consumerKafka(ssc, "search-log-topic")
// 3. 对每批次的数据进行搜索词进行次数统计
val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform{ rdd =>
val reduceRDD: RDD[(String, Int)] = rdd
// 过滤不合格的数据
.filter{ record =>
val message: String = record.value()
null != message && message.trim.split(",").length == 4
}
// 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
.map{record =>
val keyword: String = record.value().trim.split(",").last
keyword -> 1
}
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化
// 返回
reduceRDD
}
// TODO: 4、实时累加统计搜索词搜索次数,使用mapWithState函数
/*
按照Key来更新状态的,一条一条数据的更新状态
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType]
a. 通过函数源码发现参数使用对象
StateSpec 实例对象
b. StateSpec
表示对状态封装,里面涉及到相关数据类型
c. 如何构建StateSpec对象实例呢??
StateSpec 伴生对象中function函数构建对象
def function[KeyType, ValueType, StateType, MappedType](
// 从函数名称可知,针对每条数据更新Key的转态信息
mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
): StateSpec[KeyType, ValueType, StateType, MappedType]
*/
// 状态更新函数,针对每条数据进行更新状态
val spec: StateSpec[String, Int, Int, (String, Int)] = StateSpec.function(
// (KeyType, Option[ValueType], State[StateType]) => MappedType
(keyword: String, countOption: Option[Int], state: State[Int]) => {
// a. 获取当前批次中搜索词搜索次数
val currentState: Int = countOption.getOrElse(0)
// b. 从以前状态中获取搜索词搜索次数
val previousState = state.getOption().getOrElse(0)
// c. 搜索词总的搜索次数
val latestState = currentState + previousState
// d. 更行状态
state.update(latestState)
// e. 返回最新省份销售订单额
(keyword, latestState)
}
)
// 调用mapWithState函数进行实时累加状态统计
val stateDStream: DStream[(String, Int)] = reduceDStream.mapWithState(spec)
// 5. 将结果数据输出 -> 将每批次的数据处理以后输出
stateDStream.print()
// 6.启动流式应用,一直运行,直到程序手动关闭或异常终止
ssc.start()
ssc.awaitTermination()
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

运行程序可以发现,当Key(搜索单词)没有出现时,不会更新状态,仅仅更新当前批次中出现的Key的状态。
mapWithState 实现有状态管理主要是通过两点:a)、历史状态需要在内存中维护,这里必需的了,updateStateBykey也是一样;b)、自定义更新状态的mappingFunction,这些就是具体的业务功能实现逻辑了(什么时候需要更新状态)

在这里插入图片描述
首先数据像水流一样从左侧的箭头流入,把 mapWithState看成一个转换器的话,mappingFunc就是转换的规则,流入的新数据(key-value)结合历史状态(通过key从内存中获取的历史状态)进行一些自定义逻辑的更新等操作,最终从红色箭头中流出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/381468.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

华为机试题:HJ94 记票统计(python)

文章目录(1)题目描述(2)Python3实现(3)知识点详解1、input():获取控制台(任意形式)的输入。输出均为字符串类型。1.1、input() 与 list(input()) 的区别、及其相互转换方…

linux和windows中安装emqx消息服务器

大家好,我是雄雄,欢迎关注微信公众号雄雄的小课堂 现在是:2023年3月1日21:53:55 前言 最近几天看了下mqtt,通过不断的搜索资料,也将mqtt集成到项目中,跑了个demo运行,和预想中的差不多&#x…

Altair 宣布将于3月举办 Future.Industry 2023 全球虚拟大会

Altair(纳斯达克股票代码:ALTR)近日宣布将于 2023 年 3 月 8 - 9 日 举办年度全球虚拟大会 Future.Industry 2023。旨在探索影响全球未来的新趋势,并深入探讨仿真、高性能计算 (HPC)、人工智能(AI)和数据分…

使用jenkins+nginx自动化部署前后端项目并打包备份

前言:因为之前使用docker拉取的jenkins无法检测到本地服务器安装的jdk和maven,所以我在本地服务器直接拉取了jenkins,使用8090端口与docker拉取的jenkins进行对比,可以检测到本地服务器安装的jdk和maven,前端和后端分开…

ETHDenver 2023

ETHDenver是全球最大、持续时间最长的以太坊活动之一,今年的活动定于2月24日至3月5日在美国科罗拉多州丹佛市盛大举行。这次活动将面向以太坊和其他区块链协议爱好者、设计者和开发人员。Moonbeam作为ETHDenver 2023的Meta赞助商,将在本次活动中展示令人…

如何使用crAPI学习保护API的安全

关于crAPI crAPI是一个针对API安全的学习和研究平台,在该工具的帮助下,广大研究人员可以轻松学习和了解排名前十的关键API安全风险。因此,crAPI在设计上故意遗留了大量安全漏洞,我们可以通过 crAPI学习和研究API安全。 crAPI采用…

如何把照片的底色修改为想要的颜色

如何给照片更换底色?其实有可以一键给照片更换底色的 APP ,但是几乎都要收费。如果想要免费的给照片更换底色的话,分享两种简单便捷的方法给你。掌握了这项技能,以后就不用店花钱处理啦!1、免费!线上快速 给…

【SpringBoot教程】SpringBoot入门

SpringBoot介绍 SpringBoot简介 在您第1次接触和学习Spring框架的时候,是否因为其繁杂的配置而退却了?在你第n次使用Spring框架的时候,是否觉得一堆反复黏贴的配置有一些厌烦?那么您就不妨来试试使用Spring Boot来让你更易上手&…

Rancher Prime 为平台工程提供面向 K8s 的弹性能力

作者简介 张应罗,SUSE 资深架构师,拥有 16 年架构咨询工作经验,专注于 SUSE Enterprise Container Management 相关的产品落地方案及咨询方案设计。 平台工程 “DevOps 已死,平台工程才是未来!” 去年,知名软件工程师…

原来JS可以这么实现继承

当我们在编写代码的时候,有一些对象内部会有一些方法(函数),如果将这些函数在构造函数内部声明会导致内存的浪费,因为实例化构造函数得到不同的实例对象,其内部都有同一个方法,但是占据了不同的…

【unity】开发rts 4 放置建筑进地图;创建单位;单位移动;单位生产者

一 放置建筑进地图 1 GameManage,slot faction阵营插槽, 新建筑拖进secne,便于管理,在实例栏将建筑放在factions下 inital faction entitys,(也可带上cam look at) 添加新建筑上 此时测试&a…

大专生学云计算,工作好找吗?

当然可以找到工作。云计算行业整体是处理高速成长期,市场规模以每年30%的速度增长,市场的扩张意味着需要更多的技术人才支撑,据统计云计算未来五年人才缺口将达到150万。 目前企业对于学历的要求并不高,基本大专及以上都可以的。…

基于神经网络(RBF)补偿的双关节机械手臂自适应控制

目录 前言 1.双关节机械手臂模型 1.1 实际模型 1.2 名义模型 2. 控制律设计 3. 神经网络补偿自适应律设计 3.1自适应律① 3.2自适应律② 4. 仿真分析 4.1仿真模型 4.2 仿真结果 4.3 小结 5学习问题 前言 所谓的补偿可以简单的理解为:将扰动的模型估计出…

【C++】开散列实现unordered_map与unordered_set的封装

本文主要介绍unordered_map与unordered_set的封装,此次封装主要用上文所说到的开散列,通过开散列的一些改造来实现unordered_map与unordered_set的封装 文章目录一、模板参数二、string的特化三、正向迭代器四、构造与析构五、[]的实现六、unordered_map的实现七、u…

MySQL安装教程

目录 一、认识MySQL 二、为什么要选用MySQL 三、关于MySQL8.0 四、安装步骤 一、认识MySQL 1、MySQL是一个开放源码的关系型数据库管理系统,将数据保存再不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。 …

【java基础】接口(interface)

文章目录基础介绍接口的定义关于接口字段和方法的说明使用接口抽象类和接口接口方法冲突的一些说明方法相同名称和参数,返回值相同方法名称相同,参数不同,返回值相同方法返回值不同,名称参数相同方法完全相同,一个有默…

中文预训练大模型—文心Ernie技术原理

文心Ernie技术原理 一、背景技术 Ernie是基于Bert模型进行改进,基本模型是Transformer,Bert完成的预训练任务是:完形填空(通过基本语言单元掩码);上下句预测。 Bert模型的缺陷是:只能捕获局部…

关于Charles抓包

目录 抓包的原理 抓包的步骤 1. 下载Charles 2. PC抓HTTPS协议的包 3. 移动端抓包步骤 证书的原理 抓包的原理 抓包的软件非常多,其实底层逻辑充当了一个中间人代理的角色来对HTTPS进行抓包,结合日常自己用的Charles做一个记录。首先先了解抓包的原…

RT-Thread内核学习笔记

文章目录RT-Thread一、线程1. 线程定义2. 线程栈3. 线程函数 rt_thread_entry()4. 线程控制块 struct rt_thread5. 线程初始化 rt_thread_init()6. 就绪列表7. 调度器二、对象容器1. 对象:所有的数据结构都是对象2. 容器:每当创建一个对象,就…

腾讯云GPU游戏服务器/云主机租用配置价格表出炉!

用于游戏业务的服务器和普通云服务器和主机空间是不同的,游戏服务器对于硬件的配置、网络带宽有更大的要求,一般游戏服务器根据不同的配置和适用场景会有十几元一小时到几十元一小时,而且可以根据不同的按量计费。而普通的云服务器可能需要几…