Spark Streaming DStream

news2025/1/12 8:52:47

Spark Streaming DStream

DStream

Discretized Stream,中文叫做离散流,Spark Streaming提供的一种高级抽象,代表了一个持续不断的数据流。

DStream可以通过输入数据源来创建,比如Kafka、Flume,也可以通过对其他DStream应用高阶函数来创建,比如map、reduce、join、window。

DStream的内部,其实是一系列持续不断产生的RDD,RDD是Spark Core的核心抽象,即不可变的,分布式的数据集。

DStream中的每个RDD都包含了一个时间段内的数据。
spark streaming 离散数据流

对DStream应用的算子,其实在底层会被翻译为对DStream中每个RDD的操作,比如对一个DStream执行一个map操作,会产生一个新的DStream,其底层原理为,对输入DStream中的每个时间段的RDD,都应用一遍map操作,然后生成的RDD,即作为新的DStream中的那个时间段的一个RDD。

底层的RDD的transformation操作,还是由Spark Core的计算引擎来实现的,Spark Streaming对Spark core进行了一层封装,隐藏了细节,然后对开发人员提供了方便易用的高层次API。
spark streaming DStream

操作DStream

对于从数据源得到的DStream,用户可以在其基础上进行各种操作。

与RDD类似,DStream也提供了自己的一系列操作方法,这些操作可以分成三类:普通的转换操作、窗口转换操作和输出操作。

普通的转换操作

转换描述
map(func)源 DStream的每个元素通过函数func返回一个新的DStream。
flatMap(func)类似与map操作,不同的是每个输入元素可以被映射出0或者更多的输出元素。
filter(func)在源DSTREAM上选择Func函数返回仅为true的元素,最终返回一个新的DSTREAM 。
repartition(numPartitions)通过输入的参数numPartitions的值来改变DStream的分区大小。
union(otherStream)返回一个包含源DStream与其他 DStream的元素合并后的新DSTREAM。
count()对源DStream内部的所含有的RDD的元素数量进行计数,返回一个内部的RDD只包含一个元素的DStreaam。
reduce(func)使用函数func(有两个参数并返回一个结果)将源DStream 中每个RDD的元素进行聚 合操作,返回一个内部所包含的RDD只有一个元素的新DStream。
countByValue()计算DStream中每个RDD内的元素出现的频次并返回新的DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素出现的频次。
reduceByKey(func, [numTasks])当一个类型为(K,V)键值对的DStream被调用的时候,返回类型为类型为(K,V)键值对的新 DStream,其中每个键的值V都是使用聚合函数func汇总。注意:默认情况下,使用 Spark的默认并行度提交任务(本地模式下并行度为2,集群模式下位8),可以通过配置numTasks设置不同的并行任务数。
join(otherStream, [numTasks])当被调用类型分别为(K,V)和(K,W)键值对的2个DStream 时,返回类型为(K,(V,W))键值对的一个新 DSTREAM。
cogroup(otherStream, [numTasks])当被调用的两个DStream分别含有(K, V) 和(K, W)键值对时,返回一个(K, Seq[V], Seq[W])类型的新的DStream。
transform(func)通过对源DStream的每RDD应用RDD-to-RDD函数返回一个新的DStream,这可以用来在DStream做任意RDD操作。
updateStateByKey(func)返回一个新状态的DStream,其中每个键的状态是根据键的前一个状态和键的新值应用给定函数func后的更新。这个方法可以被用来维持每个键的任何状态数据。
transform(func)

该transform操作(转换操作)及其类似的transformWith操作,允许在DStream上应用任意的RDD-to-RDD函数。它可以实现DStream API中未提供的操作,比如两个数据流的连接操作。

示例代码如下:

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}
updateStateByKey操作

我们使用的一般操作都是不记录历史数据的,也就说只记录当前定义时间段内的数据,跟前后时间段无关。如果要统计历史时间内的总共数据并且实时更新,如何解决呢?该updateStateByKey操作可以让你保持任意状态,同时不断有新的信息进行更新。要使用updateStateByKey操作,必须进行下面两个步骤 :

  • 定义状态: 状态可以是任意的数据类型。
  • 定义状态更新函数:用一个函数指定如何使用先前的状态和从输入流中获取的新值更新状态。

对DStream通过updateStateByKey(updateFunction)来实现实时更新。

更新函数有两个参数 :

  • newValues是当前新进入的数据。
  • runningCount 是历史数据,被封装到了Option中。

为什么历史数据要封装到Option中呢?有可能我们没有历史数据,这个时候就可以用None,有数据可以用Some(x)。当然我们的当前结果也要封装到Some()中,以便作为之后的历史数据。

我们并不用关心新进入的数据和历史数据,系统会自动帮我们产生和维护,我们只需要专心写处理方法就行。

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {//定义的更新函数
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}
val runningCounts = pairs.updateStateByKey[Int](updateFunction)//应用

示例:

  1. 首先我们需要了解数据的类型

  2. 编写处理方法

  3. 封装结果

    //定义更新函数
    //我们这里使用的Int类型的数据,因为要做统计个数
    def updateFunc(newValues : Seq[Int],state :Option[Int]) :Some[Int] = {
     //传入的newVaules将当前的时间段的数据全部保存到Seq中
     //调用foldLeft(0)(_+_) 从0位置开始累加到结束   
     val currentCount = newValues.foldLeft(0)(_+_) 
     //获取历史值,没有历史数据时为None,有数据的时候为Some
     //getOrElse(x)方法,如果获取值为None则用x代替
     val  previousCount = state.getOrElse(0)
     //计算结果,封装成Some返回
     Some(currentCount+previousCount) 
    }
    //使用
    val stateDStream = DStream.updateStateByKey[Int](updateFunc)
    

窗口转换函数

Spark Streaming 还提供了窗口的计算,它允许你通过滑动窗口对数据进行转换,窗口转换操作如下:

转换描述
window(windowLength, slideInterval)返回一个基于源DStream的窗口批次计算后得到新的DStream。
countByWindow(windowLength,slideInterval)返回基于滑动窗口的DStream中的元素的数量。
reduceByWindow(func, windowLength,slideInterval)基于滑动窗口对源DStream中的元素进行聚合操作,得到一个新的DStream。
reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])基于滑动窗口对(K,V)键值对类型的DStream中的值按K使用聚合函数func进行聚合操作,得到一个新的DStream。
reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])一个更高效的reduceByKkeyAndWindow()的实现版本,先对滑动窗口中新的时间间隔内数据增量聚合并移去最早的与新增数据量的时间间隔内的数据统计量。例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么我们可以将t+3时刻过去5秒的统计量加上[t+3,t+4]的统计量,在减去[t-2,t-1]的统计量,这种方法可以复用中间三秒的统计量,提高统计的效率。
countByValueAndWindow(windowLength,slideInterval, [numTasks])基于滑动窗口计算源DStream中每个RDD内每个元素出现的频次并返回DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素频次。与countByValue一样,reduce任务的数量可以通过一个可选参数进行配置。

在Spark Streaming中,数据处理是按批进行的,而数据采集是逐条进行的。因此在Spark Streaming中会先设置好批处理间隔(batch duration),当超过批处理间隔的时候就会把采集到的数据汇总起来成为一批数据交给系统去处理。

对于窗口操作而言,在其窗口内部会有N个批处理数据,批处理数据的大小由窗口间隔(window duration)决定,而窗口间隔指的就是窗口的持续时间,在窗口操作中,只有窗口的长度满足了才会触发批数据的处理。除了窗口的长度,窗口操作还有另一个重要的参数就是滑动间隔(slide duration),它指的是经过多长时间窗口滑动一次形成新的窗口,滑动窗口默认情况下和批次间隔的相同,而窗口间隔一般设置的要比它们两个大。

在这里必须注意的一点是滑动间隔和窗口间隔的大小一定得设置为批处理间隔的整数倍。
spark streaming 窗口转换函数

如图所示,批处理间隔是1个时间单位,窗口间隔是3个时间单位,滑动间隔是2个时间单位。对于初始的窗口time 1-time 3,只有窗口间隔满足了定义的长度也就是3才触发数据的处理,不够3继续等待。当间隔满足3之后进行计算后然后进行窗口滑动,滑动2个单位,会有新的数据流入窗口。然后重复等待满足窗口间隔执行计算。

输出操作

Spark Streaming允许DStream的数据被输出到外部系统,如数据库或文件系统。由于输出操作实际上使transformation操作后的数据可以通过外部系统被使用,同时输出操作触发所有DStream的transformation操作的实际执行(类似于RDD操作)。

以下表列出了目前主要的输出操作:

转换描述
print()在Driver中打印出DStream中数据的前10个元素。
saveAsTextFiles(prefix, [suffix])将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsObjectFiles(prefix, [suffix])将DStream中的内容按对象序列化并且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsHadoopFiles(prefix, [suffix])将DStream中的内容以文本的形式保存为Hadoop文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
foreachRDD(func)最基本的输出操作,将func函数应用于DStream中的RDD上,这个操作会输出数据到外部系统,比如保存RDD到文件或者网络数据库等。需要注意的是func函数是在运行该streaming应用的Driver进程里执行的。

DStream持久化

与RDD一样,DStream同样也能通过persist()方法将数据流存放在内存中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1542230.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Arduino智能家居

文章目录 一、接线框图1、下载fritzing 二、Arduino IDE 下载三、实现代码 一、接线框图 1、下载fritzing https://github.com/fritzing/fritzing-app/releases打开的软件界面如下: 二、Arduino IDE 下载 官网地址 P.S. 如果upload代码过程中出现cant open de…

球面数据的几何深度学习--球形 CNN

目录 一、说明二、球形 CNN概述三、球面数据的对称性四、标准(平面)CNN的局限性五、卷积并发症六、球面卷积七、球面卷积是不够的 一、说明 球面数据的几何深度学习–球形 CNN。通过对物理世界的平移对称性进行编码,卷积神经网络 &#xff0…

【Postman】工具使用介绍

一、postman工具介绍 1.什么是postman postman是谷歌开发的一款网页调试和接口测试工具,能够发送任何请求类型的http请求,支持GET/POST/PUT/DELETE等方法。postman简单易用,可以直接填写URL,header,body就可以发送一…

基于SSM非遗视域下喀什旅游网站

ssm非遗视域下喀什旅游网站的设计与实现 摘要 我们的生活水平正在不断的提高,然而提高的一个重要的侧面表现就是更加注重我们的娱乐生活。旅行是我们都喜欢的一种娱乐方式,各式各样的旅行经历给我们带来的喜悦也是大不相同的。带来快乐的同时也因为其复…

详细剖析多线程2----线程安全问题(面试高频考点)

文章目录 一、概念二、线程不安全的原因三、解决线程不安全问题--加锁(synchronized)synchronized的特性 四、死锁问题五、内存可见性导致的线程安全问题 一、概念 想给出⼀个线程安全的确切定义是复杂的,但我们可以这样认为: 在多…

STL标准模板库(C++

在C里面有已经写好的标准模板库〈Standard Template Library),就是我们常说的STL库,实现了集合、映射表、栈、队列等数据结构和排序、查找等算法。我们可以很方便地调用标准库来减少我们的代码量。 size/empty 所有的STL容器都支持这两个方法&#xff0c…

腾讯云GPU云服务器_GPU云计算_异构计算_弹性计算

腾讯云GPU服务器是提供GPU算力的弹性计算服务,腾讯云GPU服务器具有超强的并行计算能力,可用于深度学习训练、科学计算、图形图像处理、视频编解码等场景,腾讯云百科txybk.com整理腾讯云GPU服务器租用价格表、GPU实例优势、GPU解决方案、GPU软…

Python 全栈系列236 rabbit_agent搭建

说明 通过rabbit_agent, 以接口方式实现对队列的标准操作,将pika包在微服务内,而不必在太多地方重复的去写。至少在服务端发布消息时,不必再去考虑这些问题。 在分布式任务的情况下,客户端本身会启动一个持续监听队列的客户端服…

vscode使用Runner插件将.exe文件统一放到一个目录下

找到右下角管理,点击扩展。 找到Code Runner插件,打开扩展设置。 向下翻,找到Executor Map,点击在settings.json中编辑。 在c和c的配置命令栏中增加\\\output\\即可。(增加的目录不能自动创建,需要手动创建…

超高并发下Redis热点数据风险破解

1 介绍 作者是互联网一线研发负责人,所在业务也是业内核心流量来源,经常参与 业务预定、积分竞拍、商品秒杀等工作。 近期参与多场新员工的面试工作,经常就 『超高并发场景下热点数据』 可用性保障与候选人进行讨论。 本文聚焦一些关键点技术进行讨论,并总结一些热点场景…

pytorch 实现线性回归 softmax(Pytorch 04)

一 softmax 定义 softmax 是多分类问题,对决策结果不是多少,而是分类,哪一个。 为了估计所有可能类别的条件概率,我们需要一个有 多个输出的模型,每个类别对应一个输出。为了解决线 性模型的分类问题,我们…

Vscode按键占用问题解决

Vscode按键占用 在使用vscode的过程中,官方按键 Ctrl . 按键可以提示修复代码中的问题,但是发现按了没有反应。 解决问题 首先确认vscode中是否设置了这个按键,默认设置了的系统输入法中是否有按键冲突了,打开输入法设置检查 …

STM32 | Systick定时器(第四天源码解析)

STM32 | Systick定时器(第四天)STM32 | STM32F407ZE中断、按键、灯(续第三天)1、参考delay_us代码,完成delay_ms的程序 定时器频率换算单位:1GHZ=1000MHZ=1000 000KHZ = 1000 000 000HZ 定时器定时时间:计数个数/f(频率) 或者 (1/f(频率))*计数的个数 500/1MHZ = 500/1…

力扣3. 无重复字符的最长子串

Problem: 3. 无重复字符的最长子串 文章目录 题目描述思路及解法复杂度Code 题目描述 思路及解法 1.川建一个set集合存储最长的无重复的字符; 2.创建双指针p、q,每次当q指针指向的字符不在set集合中时将其添加到set集合中让q指针后移,并且更新…

IDEA, Pycharm, Goland控制台乱码

IDEA, Pycharm, Goland控制台乱码 问题描述: 控制台出现����等乱码 复现频率: 总是 解决方案: 以IDEA为例 添加 -Dfile.encodingUTF-8位置 idea64.exe.vmoptions 在安装idea的bin目录idea.vmoptions idea客户端 示意图

SpringBoot3+Vue3项目的阿里云部署--将后端以及前端项目打包

一、后端:在服务器上制作成镜像 1.准备Dockerfile文件 # 基础镜像 FROM openjdk:17-jdk-alpine # 作者 MAINTAINER lixuan # 工作目录 WORKDIR /usr/local/lixuan # 同步docker内部的时间 RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ…

为什么 Hashtable 不允许插入 null 键 和 null 值?

1、典型回答 浅层次的来回答这个问题的答案是,JDK 源码不支持 Hashtable 插入 value 值为 null,如以下JDK 源码所示: 也就是JDK 源码规定了,如果你给 Hashtable 插入 value 值为 null 就会抛出空指针异常 并目看上面的JDK 源码可…

SpringAOP+自定义注解实现限制接口访问频率,利用滑动窗口思想Redis的ZSet(附带整个Demo)

目录 1.创建切面 2.创建自定义注解 3.自定义异常类 4.全局异常捕获 5.Controller层 demo的地址,自行获取《《—————————————————————————— Spring Boot整合Aop面向切面编程实现权限校验,SpringAop自定义注解自定义异常全局…

Godot.NET C# 工程化开发(1):通用Nuget 导入+ 模板文件导出,包含随机数生成,日志管理,数据库连接等功能

文章目录 前言Github项目地址,包含模板文件后期思考补充项目设置编写失误环境visual studio 配置详细的配置看我这篇文章 Nuget 推荐NewtonSoft 成功Bogus 成功Github文档地址随机生成构造器生成构造器接口(推荐) 文件夹设置Nlog 成功!Nlog.configNlogHe…

AIPaperPass功能介绍

点击下方▼▼▼▼链接直达AIPaperPass ! AIPaperPass - AI论文写作指导平台 目录 1.AIPaperPass 插入代码功能上线! 体验方式 2.AIPaperPass介绍 1.高质量 2.免费大纲 3.参考文献 4.致谢模板 3.书籍介绍 AIPaperPass智能论文写作平台 1.AIPap…