【Spark分布式内存计算框架——Spark Streaming】11. 应用案例:百度搜索风云榜(下)实时窗口统计

news2024/9/21 21:42:10

5.5 实时窗口统计

SparkStreaming中提供一些列窗口函数,方便对窗口数据进行分析,文档:
http://spark.apache.org/docs/2.4.5/streaming-programming-guide.html#window-operations

在实际项目中,很多时候需求:每隔一段时间统计最近数据状态,并不是对所有数据进行统计,称为趋势统计或者窗口统计,SparkStreaming中提供相关函数实现功能,业务逻辑如下:
在这里插入图片描述
针对用户百度搜索日志数据,实现【近期时间内热搜Top10】,统计最近一段时间范围(比如,最近半个小时或最近2个小时)内用户搜索词次数,获取Top10搜索词及次数。
窗口函数【window】声明如下,包含两个参数:窗口大小(WindowInterval,每次统计数据范围)和滑动大小(每隔多久统计一次),都必须是批处理时间间隔BatchInterval整数倍。
在这里插入图片描述
案例完整实现代码如下,为了演示方便,假设BatchInterval为2秒,WindowInterval为4秒,SlideInterval为2秒。

package cn.itcast.spark.app.window
import cn.itcast.spark.app.StreamingContextUtils
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 实时消费Kafka Topic数据,每隔一段时间统计最近搜索日志中搜索词次数
* 批处理时间间隔:BatchInterval = 2s
* 窗口大小间隔:WindowInterval = 4s
* 滑动大小间隔:SliderInterval = 2s
*/
object StreamingWindow {
def main(args: Array[String]): Unit = {
// Streaming应用BatchInterval
val BATCH_INTERVAL: Int = 2
// Streaming应用窗口大小
val WINDOW_INTERVAL: Int = BATCH_INTERVAL * 2
val SLIDER_INTERVAL: Int = BATCH_INTERVAL * 1
// 1. 获取StreamingContext实例对象
val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, BATCH_INTERVAL)
// 2. 从Kafka消费数据,使用Kafka New Consumer API
val kafkaDStream: DStream[String] = StreamingContextUtils
.consumerKafka(ssc, "search-log-topic")
.map(record => record.value())
// TODO: 添加窗口,设置对应参数
/*
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
警告信息:
ERROR KafkaRDD: Kafka ConsumerRecord is not serializable.
Use .map to extract fields before calling .persist or .window
*/
val windowDStream: DStream[String] = kafkaDStream.window(
Seconds(WINDOW_INTERVAL), Seconds(SLIDER_INTERVAL)
)
// 4. 对每批次的数据进行搜索词进行次数统计
val countDStream: DStream[(String, Int)] = windowDStream.transform{ rdd =>
val resultRDD = rdd
// 过滤不合格的数据
.filter( message => null != message && message.trim.split(",").length == 4)
// 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
.map{message =>
val keyword: String = message.trim.split(",").last
keyword -> 1
}
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item)
// 返回
resultRDD
}
// 5. 将结果数据输出 -> 将每批次的数据处理以后输出
countDStream.print()
// 6.启动流式应用,一直运行,直到程序手动关闭或异常终止
ssc.start()
ssc.awaitTermination()
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

SparkStreaming中同时提供将窗口Window设置与聚合reduceByKey合在一起的函数,为了更加方便编程。
在这里插入图片描述
使用【reduceByKeyAndWindow】函数,修改上述代码,实现窗口统计,具体代码如下:

package cn.itcast.spark.app.window
import cn.itcast.spark.app.StreamingContextUtils
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 实时消费Kafka Topic数据,每隔一段时间统计最近搜索日志中搜索词次数
* 批处理时间间隔:BatchInterval = 2s
* 窗口大小间隔:WindowInterval = 4s
* 滑动大小间隔:SliderInterval = 2s
*/
object StreamingReduceWindow {
def main(args: Array[String]): Unit = {
// Streaming应用BatchInterval
val BATCH_INTERVAL: Int = 2
// Streaming应用窗口大小
val WINDOW_INTERVAL: Int = BATCH_INTERVAL * 2
val SLIDER_INTERVAL: Int = BATCH_INTERVAL * 1
// 1. 获取StreamingContext实例对象
val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, BATCH_INTERVAL)
// 2. 从Kafka消费数据,使用Kafka New Consumer API
val kafkaDStream: DStream[String] = StreamingContextUtils
.consumerKafka(ssc, "search-log-topic")
.map(recored => recored.value())
// 3. 对每批次的数据进行搜索词进行次数统计
val etlDStream: DStream[(String, Int)] = kafkaDStream.transform{ rdd =>
val etlRDD = rdd
// 过滤不合格的数据
.filter( message => null != message && message.trim.split(",").length == 4)
// 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
.map{message =>
val keyword: String = message.trim.split(",").last
keyword -> 1
}
etlRDD // 返回
}
// 4. 对获取流式数据进行ETL后,使用窗口聚合函数统计计算
/*
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V, // 聚合函数
windowDuration: Duration, // 窗口大小
slideDuration: Duration // 滑动大小
): DStream[(K, V)]
*/
val resultDStream: DStream[(String, Int)] = etlDStream.reduceByKeyAndWindow(
(tmp: Int, value: Int) => tmp + value, //
Seconds(WINDOW_INTERVAL), //
Seconds(SLIDER_INTERVAL) //
)
// 5. 将结果数据输出 -> 将每批次的数据处理以后输出
resultDStream.print()
// 6.启动流式应用,一直运行,直到程序手动关闭或异常终止
ssc.start()
ssc.awaitTermination()
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

6. 偏移量管理

针对前面实现【百度热搜排行榜Top10】实时状态统计应用来说,当应用关闭以后,再次启动(Restart)执行,并没有继续从上次消费偏移量读取数据和获取以前状态信息,而是从最新偏移量(Latest Offset)开始的消费,肯定不符合实际需求,有两种解决方式:

方式一:Checkpoint 恢复

  • 当流式应用再次启动时,从Checkpoint 检查点目录恢复,可以读取上次消费偏移量信息和状态相关数据,继续实时处理数据。
  • 文档:http://spark.apache.org/docs/2.4.5/streaming-programming-guide.html#checkpointing

方式二:手动管理偏移量

  • 用户编程管理每批次消费数据的偏移量,当再次启动应用时,读取上次消费偏移量信息,继续实时处理数据。
  • 文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-10-integration.html#storing-offsets

在实际生产项目中,常常使用第二种方式【手动管理偏移量】,将偏移量存储到MySQL、Redis或Zookeeper中,接下来讲解两种方式实现,都需要掌握。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/383605.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数智未来,AI赋能——第四届OpenI/O 启智开发者大会昇腾人工智能应用专场圆满举行!

为提升启智社区与鹏城实验室在人工智能开源领域的影响力,促进社区成员与开源生态圈伙伴的合作。2月25日上午,第四届OpenI/O 启智开发者大会昇腾人工智能应用专场分论坛在深圳人才研修院举办,进一步促进与发挥企业间资源共通的优势&#xff0c…

【ROS2知识】关于colcon编译和ament指定

一、说明 这里说说编译和包生成的操作要点,以python包为例。对于初学者来说,colcon和ament需要概念上搞清楚,与此同时,工作空间、包、节点在一个工程中需要熟练掌握。本文以humble版的ROS2,进行python编程的实现。 二、…

【408之计算机组成原理】计算机系统概述

目录前言一、计算机的发展历程1. 计算机发展的四代变化2. 计算机元件的更新换代3. 计算机软件的发展二、计算机系统层次结构1. 计算机系统的组成2. 冯诺依曼体系结构3. 计算机的功能部件1. 输入设备2. 输出设备3. 存储器4. 运算器5. 控制器三、 分析计算机各个部件在执行代码中…

【算法】阿里面试题-编码实现20亿个整数,找出某个数X是否存在其中

1.海量数据去重-BitMap位图解决方案 需求(面试题) 一个32位4G内存的操作系统,在20亿个整数,找出某个数X是否存在其中 假如是java语言,int占4字节,1字节8位(1 byte 8 bit) 方式一&…

Mockito 入门

目录1.什么是 Mock 测试?2.Mockito简介3.在 SpringBoot 单元测试中使用 Mockito3.1 Maven依赖:3.2 UserService.java3.3 User.java3.4 thenReturn系列方法(测试桩)3.5 thenThrow系列方法3.6 verify 系列方法4.Spring中mock任何容器…

计算机组成原理-动态链接库-笔记

Linux 下的 ELF 文件格式 Windows 的可执行文件格式是一种叫作PE(Portable Executable Format)的文件格式 动态链接库 这些机器码必须是“地址无关”的。也就是说,我们编译出来的共享库文件的指令代码,是地址无关码(…

10个可以实现高效工作与在线赚钱的 AI 工具网站

自 2020 年以来,内容开发领域已经感受到人工智能工具的存在。 目前,营销人员和内容创作者正在利用这些工具来加快他们的工作流程。 如果您拥有最流行的 AI 工具之一,例如 CopyAI、Jasper AI 或 Content at Scale,您可能正在考虑…

申论套卷 | 要点杂、乱、碎的材料如何快速分类整理?

试卷来源:2020年全国联考上半年材料1A省C市Y区文化馆(非物质文化遗产保护中心)作为政府设立的公益性公共文化事业单位,始终坚持公益文化发展方向,面向基层、贴近百姓、服务大众,积极组织各种大型活动&#…

Ubuntu中安装matelab2020a

Ubuntu中安装matelab2020a1 matelab下载2 安装步骤3 激活matelab4 创建快捷方式我的Ubuntu版本是20.041 matelab下载 matelab官网https://www.mathworks.com/ 点击右上角的get matelab,进入下载页面 没有账号的同学可以先去注册一个,推荐使用教育邮箱&…

错误: tensorflow.python.framework.errors_impl.OutOfRangeError的解决方案

近日,在使用CascadeRCNN完成目标检测任务时,我在使用这个模型训练自己的数据集时出现了如下错误: tensorflow.python.framework.errors_impl.OutOfRangeError: PaddingFIFOQueue _1_get_batch/batch/padding_fifo_queue is closed and has in…

前端JS内存管理

JS内存管理 内存原理: 任何变成语言在执行的时候都需要操作系统来分配内存,只是有些语言需要手动管理分配的内存有些语言有专门来管理内存的方式 如 JVM 了解以上的概念之后,我们再来了解一下大致的内存周期 分配需要的内存使用内存在不使用…

Linux- 系统随你玩之--好用到炸裂的系统级监控、诊断工具

文章目录1、前言2、lsof介绍2.1、问题来了: 所有用户都可以采用该命令吗?3、 服务器安装lsof3.1、安装3.2、检查安装是否正常。4、lsof 命令4.1、常用功能选项4.2、输出内容4.2.1 、FD和 TYPE列5、 lsof 命令实操常见用法6 、常用组合命令7、 结语1、前言…

OpenHarmony 3.2 Beta Audio——音频渲染

一、简介Audio是多媒体子系统中的一个重要模块,其涉及的内容比较多,有音频的渲染、音频的采集、音频的策略管理等。本文主要针对音频渲染功能进行详细地分析,并通过源码中提供的例子,对音频渲染进行流程的梳理。二、目录foundatio…

无线WiFi安全渗透与攻防(一)之无线安全环境搭建

无线安全环境搭建 1.802.11标准 (1).概念 802.11标准是1997年IEEE最初制定的一个WLAN标准,工作在2.4GHz开放频段,支持1Mbit/s和2Mbit/s的数据传输速率,定义了物理层和MAC层规范,允许无线局域网及无线设备…

Crack:LightningChart .NE​​T 10.4.1中的新功能

数据游标 Lightningchart .NET UWP 展示应用 在以前的版本中,LightningChart .NET 提供了不同的工具来实现数据跟踪功能,但都需要额外的用户编码。 现在可以使用 DataCursor 浏览 ViewXY 系列。系列数据值由这个新类/对象显示在鼠标位置或鼠标位置附近。…

【matplotlib】可视化解决方案——如何解决matplotlib中文乱码问题

问题概述 Matplotlib 默认不支持中文字体,这是因为 matplotlib 只支持 ASCII 字符,但是国人使用 matplotlib 肯定需要中文标注。如下图所示,当不对 Matplotlib 进行设置,而直接使用中文时,绘制的图像会出现中文乱码。…

为什么我选择收费的AdsPower指纹浏览器?

在决定开始用指纹浏览器之前,东哥我们团队找了市面上很多产品去测试。最后,还是决定用AdsPower。每个人的使用感受都不一样,我就说几个东哥和我们团队用得顺手的点,大家在选择指纹浏览器的时候也可以做一个参考。 一、指纹环境强大…

3月5日,加入线上对话,点燃科技行业女性影响力!

对话升级,点燃科技行业女性影响力! 👋 2022 年,Jina AI 联合 14 家合作伙伴,首次举办了「Impact Tech, She Can」线上对话,11 位嘉宾与 200 多位参会者分享了如何在科技行业内打造自身影响力。 &#x1f38…

html基础(h、p、br、hr、文本加粗倾斜下划线删除线、资源路径、音频、视频、超链接)

1标题<h><h1>1级标题</h1><h2>2级标题</h2><h3>3级标题</h3><h4>4级标题</h4><h5>5级标题</h5><h6>6级标题</h6>2段落<p>和换行<br><p>段落标签</p><p> fgghikg…

大数据技术——面向对象编程基础

类类的定义字段定义:用val或var关键字进行定义方法定义:使用new关键字创建一个类的实例类成员可见性Scala类中所有成员的默认可见性为公有&#xff0c;任何作用域内都能直接访问公有成员除了默认的公有可见性&#xff0c;Scala也提供private和protected其中&#xff0c;private…