Spark2.x 入门:DStream 转换操作

news2025/1/2 0:11:00

DStream转换操作包括无状态转换和有状态转换。
无状态转换:每个批次的处理不依赖于之前批次的数据。
有状态转换:当前批次的处理需要使用之前批次的数据或者中间结果。有状态转换包括基于滑动窗口的转换和追踪状态变化的转换(updateStateByKey)。

DStream无状态转换操作

下面给出一些无状态转换操作的含义:

  • map(func) :对源DStream的每个元素,采用func函数进行转换,得到一个新的DStream;
  • flatMap(func): 与map相似,但是每个输入项可用被映射为0个或者多个输出项;
  • filter(func): 返回一个新的DStream,仅包含源DStream中满足函数func的项;
  • repartition(numPartitions): 通过创建更多或者更少的分区改变DStream的并行程度;
  • union(otherStream): 返回一个新的DStream,包含源DStream和其他DStream的元素;
  • count():统计源DStream中每个RDD的元素数量;
  • reduce(func):利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素RDDs的新DStream;
  • countByValue():应用于元素类型为K的DStream上,返回一个(K,V)键值对类型的新DStream,每个键的值是在原DStream的每个RDD中的出现次数;
  • reduceByKey(func, [numTasks]):当在一个由(K,V)键值对组成的DStream上执行该操作时,返回一个新的由(K,V)键值对组成的DStream,每一个key的值均由给定的recuce函数(func)聚集起来;
  • join(otherStream, [numTasks]):当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, (V, W))键值对的新DStream;
  • cogroup(otherStream, [numTasks]):当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, Seq[V], Seq[W])的元组;
  • transform(func):通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。支持在新的DStream中做任何RDD操作。

无状态转换操作实例:我们之前《Spark2.1.0入门:套接字流(DStream)》部分介绍的词频统计,就是采用无状态转换,每次统计,都是只统计当前批次到达的单词的词频,和之前批次无关,不会进行累计。

DStream有状态转换操作

对于DStream有状态转换操作而言,当前批次的处理需要使用之前批次的数据或者中间结果。有状态转换包括基于滑动窗口的转换和追踪状态变化(updateStateByKey)的转换。

滑动窗口转换操作

滑动窗口转换操作的计算过程如下图所示,我们可以事先设定一个滑动窗口的长度(也就是窗口的持续时间),并且设定滑动窗口的时间间隔(每隔多长时间执行一次计算),然后,就可以让窗口按照指定时间间隔在源DStream上滑动,每次窗口停放的位置上,都会有一部分DStream被框入窗口内,形成一个小段的DStream,这时,就可以启动对这个小段DStream的计算。

滑动窗口转换操作是对每个滑动窗口内的数据进行计算。

这里写图片描述

下面给给出一些窗口转换操作的含义:

  • window(windowLength, slideInterval) 基于源DStream产生的窗口化的批数据,计算得到一个新的DStream;
  • countByWindow(windowLength, slideInterval) 返回流中元素的一个滑动窗口数;
  • reduceByWindow(func, windowLength, slideInterval) 返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律,从而可以支持并行计算;
  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数;
  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 更加高效的reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce”操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入);
  • countByValueAndWindow(windowLength, slideInterval, [numTasks]) 当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream。每个key的值都是它们在滑动窗口中出现的频率。

在《Spark2.1.0入门:Apache Kafka作为DStream数据源》内容中,已经使用了窗口转换操作,也就是,在KafkaWordCount.scala代码中,你可以找到下面这一行:

val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2)

这行代码中就是一个窗口转换操作reduceByKeyAndWindow,其中,Minutes(2)是滑动窗口长度,Seconds(10)是滑动窗口时间间隔(每隔多长时间滑动一次窗口)。reduceByKeyAndWindow中就使用了加法和减法这两个reduce函数,加法和减法这两种reduce函数都是“可逆的reduce函数”,也就是说,当滑动窗口到达一个新的位置时,原来之前被窗口框住的部分数据离开了窗口,又有新的数据被窗口框住,但是,这时计算窗口内单词的词频时,不需要对当前窗口内的所有单词全部重新执行统计,而是只要把窗口内新增进来的元素,增量加入到统计结果中,把离开窗口的元素从统计结果中减去,这样,就大大提高了统计的效率。尤其对于窗口长度较大时,这种“逆函数”带来的效率的提高是很明显的。

updateStateByKey操作

当我们需要在跨批次之间维护状态时,就必须使用updateStateByKey操作。
下面我们就给出一个具体实例。我们还是以前面在《Spark2.1.0入门:套接字流(DStream)》讲过的NetworkWordCount为例子来介绍,在之前的套接字流的介绍中,我们统计单词词频采用的是无状态转换操作,也就是说,每个批次的单词发送给NetworkWordCount程序处理时,NetworkWordCount只对本批次内的单词进行词频统计,不会考虑之前到达的批次的单词,所以,不同批次的单词词频都是独立统计的。
对于有状态转换操作而言,本批次的词频统计,会在之前批次的词频统计结果的基础上进行不断累加,所以,最终统计得到的词频,是所有批次的单词的总的词频统计结果。
下面,我们来改造一下在套接字流介绍过的NetworkWordCount程序。

新建NetworkWordCountStateful.scala代码文件,在里面输入以下代码:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel

object NetworkWordCountStateful {
  def main(args: Array[String]) {
    //定义状态更新函数
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
        
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new StreamingContext(conf, Seconds(5))
    sc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/")    //设置检查点,检查点具有容错机制
    val lines = sc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    sc.start()
    sc.awaitTermination()
  }
}

Spark Streaming的updateStateByKey可以把DStream中的数据按key做reduce操作,然后对各个批次的数据进行累加。注意,wordDstream.updateStateByKey[Int]每次传递给updateFunc函数两个参数,其中,第一个参数是某个key(即某个单词)的当前批次的一系列值的列表(Seq[Int]形式),updateFunc函数中 val currentCount = values.foldLeft(0)(_ + _) 的作用(请参考之前章节“fold操作”的介绍),就是计算这个被传递进来的与某个key对应的当前批次的所有值的总和,也就是当前批次某个单词的出现次数,保存在变量currentCount中。传递给updateFunc函数的第二个参数是某个key的历史状态信息,也就是某个单词历史批次的词频汇总结果。实际上,某个单词的历史词频应该是一个Int类型,这里为什么要采用Option[Int]呢?
Option[Int]是类型 Int的容器(请参考之前章节“模式匹配”了解Option类的使用方法),更确切地说,你可以把它看作是某种集合,这个特殊的集合要么只包含一个元素(即单词的历史词频),要么就什么元素都没有(这个单词历史上没有出现过,所以没有历史词频信息)。之所以采用 Option[Int]保存历史词频信息,这是因为,历史词频可能不存在,很多时候,在值不存在时,需要进行回退,或者提供一个默认值,Scala 为Option类型提供了getOrElse方法,以应对这种情况。 state.getOrElse(0)的含义是,如果该单词没有历史词频统计汇总结果,那么,就取值为0,如果有历史词频统计结果,就取历史结果,然后赋值给变量previousCount。最后,当前值和历史值进行求和,并包装在Some中返回。

build.sbt

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0"

启动以上程序,新打开一个窗口作为nc窗口,启动nc程序:

nc -lk 9999
//在这个窗口中手动输入一些单词
hadoop
spark
hadoop
spark
hadoop
spark

然后,你切换到刚才的监听窗口,会发现,已经输出了词频统计信息:

-------------------------------------------
Time: 1479890485000 ms
-------------------------------------------
(spark,1)
(hadoop,1)

-------------------------------------------
Time: 1479890490000 ms
-------------------------------------------
(spark,2)
(hadoop,3)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2055722.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ThreeJs学习笔记--GUI(可视化三维改变场景)

引入gui import { GUI } from "three/examples/jsm/libs/lil-gui.module.min.js";//具体的看自己本地threejs目录创建gui(实例化gui) // 实例化一个gui对象 const gui new GUI(); //改变/设置gui操作界面style属性 gui.domElement.style.ri…

【Protobuf】Protobuf 语法介绍

Protobuf 语法介绍 一、 字段规则二、消息类型的定义与使用1、练习——序列化后并写入文件2、练习——从文件中反序列化后打印输出 三、enum 类型1、 定义规则2、 定义时注意事项3、查看枚举类的操作方法4、 实际使用 四、Any 类型1、 介绍2、查看Any类的操作方法3、 实际使用 …

网络安全审计技术原理与应用

网络安全审计概述 概念 定义:对网络信息系统的安全相关活动信息进行获取、记录、存储、分析和利用的工作 作用:建立“事后”安全保障措施,保存网络安全事件及行为信息,为网络安全事件分析提供线索及证据,以便发现潜在网络安全威胁行为,开展网络安全风险分析及管理 常…

热搜第一!网易云音乐“崩了”

昨日(19日),网易云音乐发生故障,迅速登上热搜,引发了广泛关注。 当天下午,有用户反映网易云音乐官网无法访问。同时,网易云音乐App中的会员中心、创作者中心、商城等多个功能均出现异常&#x…

陕西文无文化与韩国RB娱乐签订2000亿韩元合作备忘录

近日,韩国知名制作人、延世大学教授、(株)RB Entertainment 公司社长尹钟豪先生在辰海资本(亚州)投资有限公司董事、国标舞世界冠军获得者王煜明先生的陪同下,对中国文化市场进行考察调研,先后在北京、西安等地与当地影…

一起学习LeetCode热题100道(47/100)

47.从前序与中序遍历序列构造二叉树(学习) 给定两个整数数组 preorder 和 inorder ,其中 preorder 是二叉树的先序遍历, inorder 是同一棵树的中序遍历,请构造二叉树并返回其根节点。 示例 1: 输入: preorder [3,9,20,15,7], inorder [9…

nginx变量+rewrite相关功能+反向代理+openresty

目录 1、nginx变量 (1)内置变量 示例 (2)自定义变量 示例 2、nginx中的rewrite模块功能 (1)ngx_http_rewrite_module 模块指令 1)if指令 示例 2)set指令 示例​编辑 3&am…

代码随想录算法训练营day48:单调栈

目录 739. 每日温度 503.下一个更大元素II 分析: 42. 接雨水 本质: 暴力解法 分析: 双指针优化 单调栈 84.柱状图中最大的矩形 分析: 双指针: 单调栈 739. 每日温度 力扣题目链接(opens new window) 请根…

这个大佬一年连中五篇顶会!

1、SLAM/3DGS/三维点云/医疗图像/扩散模型/结构光/Transformer/CNN/Mamba/位姿估计 顶会论文指导 2、基于扩散模型的跨域鲁棒自动驾驶场景理解 3、基于环境信息的定位,重建与场景理解 4、轻量级高保真Gaussian Splatting 5、基于大模型与GS的 6D pose estimatio…

【Redis】Redis线程与IO模型—(三)

Redis线程与IO模型 一、Redis 单线程二、多路复用机制三、Redis 6.0 多线程特性四、IO 多线程配置 一、Redis 单线程 通常说 Redis 是单线程,主要是指 Redis 的网络 IO 和键值对读写是由一个线程来完成的,其他功能,比如持久化、异步删除、集…

软件测试之全面质量管理

一.什么是TQM? 全面质量管理 英文:Total Quality Management TQM可以被定义为一种管理技术,用于改进与产品相关的过程、产品、服务和其他方法。 它关注的是整个业务,而不仅仅是一个特定的项目或过程。 二.TQM原则 以顾客为关注…

软考作弊率下降了78.68%!官方为了防止作弊做出了哪些努力?

01\软考违纪违规人数对比 2024年上半年软考(机考)共有52名考生被判违纪违规行为,其中浙江考区有9人,山东枣庄考区有10人,江苏考区有33人。 2023年下半年软考(机考)共有7名考生被判违纪违规行为…

磁盘格式化文件恢复:一文看懂数据恢复操作

当你意识到关键的硬盘已经被格式化,而且你不能获取里面的内容时,这会是非常令人沮丧的。这种情况可能是因为硬盘被不小心格式化,或者是你在试图修正一些问题、调整文件系统或者释放存储空间时,有意进行的格式化。无论具体情况是什…

【论文学习与撰写】论文中公式的编辑,Mathtype的使用,全文编号排版,智能截图识别公式,公式编号自动更新

1、准备工作 在word中安装mathtype插件, 2、插入公式 在想要插入公式的地方,点击右编号,在里面输入公式,保存,关闭,就会得到插入的公式。 3、公式编号的编辑,公式编号自动更新 要想插入的公式…

笔试练习day5

目录 游游的you题目解析解法方法一贪心方法二 腐烂的苹果题目解析例子1例子2解法多源BFS最短路径代码代码解析 JZ62 孩子们的游戏(圆圈中最后剩下的数)题目解析解法方法一模拟环形链表模拟数组模拟 方法二递推/递归/动态规划状态表示状态转移方程代码 感谢各位大佬对我的支持,如…

CORS error 302 Found

CORS error && 302 Found 场景 单点登录认证:访问A系统,在B系统登录认证 此处代码为A系统 controller ResponseBodyGetMapping("/idp/loginCheck")public void loginCheck(HttpServletRequest request, HttpServletResponse httpR…

基于vue框架的爱学习分享平台ud317(程序+源码+数据库+调试部署+开发环境)系统界面在最后面。

系统程序文件列表 项目功能:用户,学科分类,交流答疑,论坛交流,学习资料 开题报告内容 基于Vue框架的爱学习分享平台 开题报告 一、项目背景与意义 随着互联网技术的飞速发展,知识的获取与传播方式正经历着前所未有的变革。在线教育平台逐渐成为满足…

【独立站搭建经验分享】B2C独立站如何搭建?怎么推广?

如果你的产品有C端属性,可能你就需要考虑建一个B2C独立站,用来满足访客的浏览和在线下单。那么B2C独立站应该怎么搭建,选择什么推广方式,这个前期最好有一些基本了解,本篇内容可以针对你最关心的问题进行讲解&#xff…

【文献】3D Gaussian Splatting for Real-Time Radiance Field Rendering

论文地址:https://arxiv.org/abs/2308.04079 项目: https://repo-sam.inria.fr/fungraph/3d-gaussian-splatting/ 代码: git clone https://github.com/graphdeco-inria/gaussian-splatting —recursive 一、文章概述 1.问题导向 辐射场…

《笑谈设计模式》 — 23种尝鲜版(未完待续......

引子:无论在平时开发过程中,还是深夜研读源码亦或者面试时都对遇到——设计模式。比如说Spring中的单例模式(bean单例)、工厂模式(bean创建)、代理模式(动态代理)、策略模式等。我们…