Spark Streaming DStream转换

news2024/10/6 22:24:20

        DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的算子,如:updateStateByKey()、transform()以及各种Window相关的算子。

无状态转化操作

        无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中

         需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的。

        例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。在之前的wordcount程序中,我们只会统计几秒内接收到的数据的单词个数,而不会累加。

Transform

Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。


object Spark06_Nostate_Transform {
  def main(args: Array[String]): Unit = {
    //创建SparkConf
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //创建StreamingContext
    val ssc = new StreamingContext(conf, Seconds(3))
    //创建DStream
    val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop202", 9999)
    //转换为RDD操作
    val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd => {
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
      value
    })
    //打印
    wordAndCountDStream.print
    //启动
    ssc.start()
    ssc.awaitTermination()
  }
}

有状态转化操作

UpdateStateByKey

        UpdateStateByKey算子用于将历史结果应用到当前批次,该操作允许在使用新信息不断更新状态的同时能够保留他的状态。

        有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。

UpdateStateByKey() 的结果会是一个新的DStream,其内部的RDD 序列是由每个时间区间对应的(键,状态)对组成的。

        为使用这个功能,需要做下面两步:

        1. 定义状态,状态可以是一个任意的数据类型。

        2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。

        使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。

        更新版的wordcount

 例如:每隔一段时间景点人新增流量(从程序启动开始,在原有递增)

  1. 编写代码

object Spark07_State_updateStateByKey {
  def main(args: Array[String]): Unit = {
    //创建SparkConf
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //创建StreamingContext
    val ssc = new StreamingContext(conf, Seconds(3))
    //设置检查点路径  用于保存状态
    ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")
    //创建DStream
    val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop202", 9999)
    //扁平映射
    val flatMapDS: DStream[String] = lineDStream.flatMap(_.split(" "))
    //结构转换
    val mapDS: DStream[(String, Int)] = flatMapDS.map((_,1))
    //聚合
    // 注意:DStreasm中reduceByKey只能对当前采集周期(窗口)进行聚合操作,没有状态
    //val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_+_)
    val stateDS: DStream[(String, Int)] = mapDS.updateStateByKey(
      (seq: Seq[Int], state: Option[Int]) => {
        Option(seq.sum + state.getOrElse(0))
      }
    )
//打印输出
    stateDS.print()
    //启动
    ssc.start()
    ssc.awaitTermination()
  }
}
  1. 启动程序并向9999端口发送数据

[atguigu@hadoop202 ~]$ nc -lk 9999

  1. 查看结果为累加

Window Operations(窗口操作)

Spark Streaming 也提供了窗口计算, 允许执行转换操作作用在一个窗口内的数据。默认情况下, 计算只对一个时间段内的RDD进行, 有了窗口之后, 可以把计算应用到一个指定的窗口内的所有 RDD 上。一个窗口可以包含多个时间段,基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长

  • 窗口时长:计算内容的时间范围;
  • 滑动步长:隔多久触发一次计算。

注意:这两者都必须为采集周期的整数倍。

如下图所示WordCount案例:窗口大小为批次的2倍,滑动步等于批次大小。

 例如:一小时人流量的变化,窗口(6秒)和间隔(3秒)不一致,不一定从程序启动开始

需求:WordCount统计 3秒一个批次,窗口6秒,滑步3秒。

object Spark08_State_window {
  def main(args: Array[String]): Unit = {
    //创建SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //创建StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    //设置检查点路径  用于保存状态
    ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")
    //创建DStream
    val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop202", 9999)
    //扁平映射
    val flatMapDS: DStream[String] = lineDStream.flatMap(_.split(" "))
    //设置窗口大小,滑动的步长
    val windowDS: DStream[String] = flatMapDS.window(Seconds(6),Seconds(3))
    //结构转换
    val mapDS: DStream[(String, Int)] = windowDS.map((_,1))
    //聚合
   val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_+_)
    reduceDS.print()
    //启动
    ssc.start()
    ssc.awaitTermination()
  }
}

关于Window的操作还有如下方法:

1) window(windowLength, slideInterval)

        基于对源DStream窗化的批次进行计算返回一个新的Dstream

2) countByWindow(windowLength, slideInterval)

         返回一个滑动窗口计数流中的元素个数

3) countByValueAndWindow()

        返回的DStream则包含窗口中每个值的个数

4) reduceByWindow(func, windowLength, slideInterval)

         通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流

5) reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

        当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值

6) reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

        这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。如果把3秒的时间窗口当成一个池塘,池塘每一秒都会有鱼游进或者游出,那么第一个函数表示每由进来一条鱼,就在该类鱼的数量上累加。而第二个函数是,每游出去一条鱼,就将该鱼的总数减去一。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/385496.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

打造优秀项目团队的3个核心原则

优秀的项目团队必须是高绩效的,而打造这样优秀团队需要3个核心原则:共同的目标、专业的技能和高效的协作。 1、共同的项目目标 项目团队的共同目标就是实现项目的交付成果。项目经理以远景宏大的方式将目标传递给团队成员,以激发团队成员的战…

jeesite多环境配置

jeesite多环境配置 参考网址: https://blog.csdn.net/shaoming314/article/details/129115912?spm1001.2014.3001.5501 开源项目地址: https://gitee.com/thinkgem/jeesite Spring Spring MVC mybatis Ehcache shiro mysql jsp (主要技术栈) 项目…

【大数据离线开发】8.3 Hive的数据模型

8.4 Hive的数据模型 Hive的数据存储 基于HDFS没有专门的数据存储格式存储结构主要包括:数据库、文件、表、视图可以直接加载文本文件(.txt文件)创建表时,指定Hive数据的列分隔符与行分隔符 8.4.1 内部表 hive 的内部表类似 My…

hexo静态网站部署到腾讯云cos

hexo支持很多部署方案,最直接的就是部署在GitHub Pages服务上,国内gitee、coding等代码托管平台也都支持静态网站服务,而且免费。 但是GitHub在国内访问不太稳定,国内的代码托管平台资源和服务也不太稳定,后来想了想&…

windows安装tomcat

这里写自定义目录标题tomcat官网下载安装包并解压环境变量配置启动tomcat访问http://localhost:8080/修复启动出现乱码问题tomcat官网下载安装包并解压 环境变量配置 系统环境变量新增: 变量名:CATALINA_HOME 变量值:tomcat的安装目录 编辑…

使用MAT进行内存分析,并找到OOM问题

前言 在处理一次现场问题时,发现服务还在运行,但是出现假死情况,后通过分析GC日志以及使用MAT分析确定问题是内存溢出OutOfMemery(OOM);这里只记录MAT分析学习过程,最近工作忙,补记录。 GC日志分析 首先,如…

EM@三角函数诱导公式

文章目录诱导公式单位圆坐标和三角函数记忆口诀符号看象限奇变偶不变例常用诱导公式🎈常用部分(5对)倒数关系六种三角函数间的转换关系小结ReflectionsShifts and periodicity诱导公式 诱导公式 - 维基百科,自由的百科全书 (wikipedia.org) 单位圆坐标…

推送投票制作微信推送里投票制作教程在线投票活动制作

近些年来,第三方的微信投票制作平台如雨后春笋般络绎不绝。随着手机的互联网的发展及微信开放平台各项基于手机能力的开放,更多人选择微信投票小程序平台,因为它有非常大的优势。1.它比起微信公众号自带的投票系统、传统的H5投票系统有可以图…

一文看懂网上下单的手机流量卡为什么归属都是随机的!

最近很多网上下单的小伙伴们心中似乎都有一个疑问。那就是网上很多手机卡、流量卡都不能自选号码和归属地,就算能自选号码,归属地也是随机的而且很多都不会跟你说具体的城市,这是为什么呢?莫非其中有什么不可告人的秘密吗?小伙伴…

JetBrains IntelliJ支持自动切换输入法,写代码如丝般顺滑

背景简介对于母语为中文的开发者,写代码过程中经常需要在中/英输入法之间进行切换,而且由于不清楚当前处于哪种输入状态,有时输入到一半发现输入法错了,删除重新输入,有时切换了好几次都没有成功,实在太影响…

【强化学习】强化学习数学基础:蒙特卡洛方法

强化学习数学方法:蒙特卡洛方法举个例子举个例子1:投掷硬币The simplest MC-based RL algorithm举个例子2:Episode lengthUse data more efficientlyMC without exploring starts总结内容来源将value iteration和policy iteration方法称为mod…

无线耳机哪个品牌好一点?2023四款好用的无线耳机排行

随着蓝牙耳机的普及,越来越多的耳机厂商加入蓝牙耳机这条竞争赛道。不同品牌的蓝牙耳机又有着不同的价位区间,不同的性能配置,不同的外观设计,可以说现在的蓝牙耳机多到让人在选择时眼花缭乱。那么,无线耳机哪个品牌好…

云端需求助力跑赢周期,金山办公有望借助ChatGPT加速腾飞

与微软在办公领域“搏杀”了三十年的金山办公,或许正在迎来自己的“第二春”。2月25日,金山办公(688111)发布2022年度业绩快报,全年营收38.85亿元人民币(单位下同),同比增加18.44%&a…

智慧工厂数字孪生可视化监测系统有效提升厂区安全管控效力

我国制造业正处于产业升级的关键时期,基于数据进行生产策略制定与管理是大势所趋,而数据可视化以更直观的方式成为数据分析传递信息的重要工具。 深圳华锐视点通过三维可视化手段对工厂各类设备进行三维建模,真实复现设备设施外观、结构、运转…

基于卷积神经网络CNN的分类研究,基于卷积神经网络的手写体识别

目录 背影 卷积神经网络CNN的原理 卷积神经网络CNN的定义 卷积神经网络CNN的神经元 卷积神经网络CNN的激活函数 卷积神经网络CNN的传递函数 卷积神经网络CNN手写体识别 基本结构 主要参数 MATALB代码 结果图 展望 背影 现在生活,各种人工智能都要求对图像拥有识别…

Linux内核4.14版本——drm框架分析(1)——drm简介

目录 1. DRM简介(Direct Rendering Manager) 1.1 DRM发展历史 1.2 DRM架构对比FB架构优势 1.3 DRM图形显示框架 1.4 DRM图形显示框架涉及元素 1.4.1 DRM Framebuffer 1.4.2 CRTC 1.4.3 Encoder 1.4.4 Connector 1.4.5 Bridge 1.4.6 Panel 1.4.…

双指针法将时间复杂度从 O(n^2) 优化到 O(n)

[1] 什么是双指针法 双指针法(Two Pointers)是一种常见的算法技巧,常用于数组和链表等数据结构中。 双指针法的基本思想是维护两个指针,分别指向不同的位置,通过它们的移动来解决问题。在某些情况下,使用双…

【Leetcode】移除链表元素 链表的中间节点 链表中倒数第k个节点

目录 一.【Leetcode203】移除链表元素 1.链接 2.题目再现 A.双指针法 B.类尾删法 C.哨兵位 二.【Leetcode876】链表的中间节点 1.链接:链表的中间节点 2.题目再现 3.解法:快慢指针 三.链表中倒数第k个节点 1.链接:链表中倒数第k个…

LiveGBS国标GB/T28181国标视频流媒体平台-功能报警订阅配置报警预案告警截图及录像

LiveGBS国标GB/T28181国标视频流媒体平台-功能报警订阅配置报警预案告警截图及录像1、报警信息1.1、报警查询1.2、配置开启报警订阅1.2.1、国标设备编辑1.2.2、选择开启报警订阅1.3、配置摄像头报警1.3.1、配置摄像头报警通道ID1.3.2、配置摄像头开启侦测1.3.3、尝试触发摄像头…

企业为什么需要做APP安全评估?

近几年新型信息基础设施建设和移动互联网技术的不断发展,移动APP数量也呈现爆发式增长,进而APP自身的“脆弱性”也日益彰显,这对移动用户的个人信息及财产安全带来巨大威胁和挑战。在此背景下,国家出台了多部法律法规,…