【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)

news2025/2/6 9:09:32

DStream编程

批处理引擎Spark Core把输入的数据按照一定的时间片(如1s)分成一段一段的数据,每一段数据都会转换成RDD输入到Spark Core中,然后将DStream操作转换为RDD算子的相关操作,即转换操作、窗口操作以及输出操作。RDD算子操作产生的中间结果数据会保存在内存中,也可以将中间的结果数据输出到外部存储系统中进行保存。

转换操作

1:无状态转换操作

无状态转化操作每个批次的处理不依赖于之前批次的数据。常见的 RDD 转化操作,例如 Map()、filter()、ReduceByKey() 等,都是无状态转化操作。 

2:有状态转化操作:

有状态转化操作需要使用之前批次的数据或者是中间结果来计算当前批次的数据。有状态转化操作包括基于滑动窗口的转化操作和追踪状态变化的转化操作。

 DStream API提供的与窗口操作相关的方法

DStream API提供的与输出操作相关的方法

 编写Spark Streaming程序的基本步骤是:

1)通过创建输入DStream来定义输入源。

2)通过对DStream应用转换操作和输出操作来定义流计算。

3)用streamingContext.start()来开始接收数据和处理流程

 4)通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)。

5)可以通过streamingContext.stop()来手动结束流计算进程。

通过示例进行演示,此示例为监视一个文件夹的log日志,并计算每个单词出现的次数

cogroup和join算子需要两个并行数据流,对两个数据流直接关联,不同的是join算子是把两个RDD按照相同的key拼在一起,类似SQL中的等值连接,可以类似的使用其他算子进行RDD的左连接等,而cogroup算子是把两个RDD按照key拼起来,但是它会汇总得到的value,最后的结果的条数是根据key决定的,有多少key就汇总成多少条数据,然后把RDD的所有相同的key的value放到一个Iterable里面,类似于SQL里面的全连接

设置为本地运行模式,2个线程,一个监听,另一个处理数据
    val sparkConf = new SparkConf().setAppName("WordCountStreaming").setMaster("local[2]")
    // 时间间隔为20秒
    val stc = new StreamingContext(sparkConf, Seconds(20))
    //定义输入源,监听本地目录,也可以采用HDFS文件
    val lines = stc.textFileStream("E:/log")
    //应用转换操作flatMap流计算
    val words = lines.flatMap(_.split(" "))
    //应用转换操作Map和ReduceByKey计算
    val wordCounts = words.Map(x => (x, 1)).ReduceByKey(_ + _)
    wordCounts.print()
    //开始接收数据和处理流程
    stc.start()
    //等待处理结束
    stc.awaitTermination()
//创建两个可被并行操作的分布式数据集
    val idName = sc.parallelize(Array((1, "张三"), (2, "李四"), (3, "王五")))
    val idAge = sc.parallelize(Array((1, 30), (2, 29), (4, 21)))
    println("\ncogroup\n")
    //对两个并行数据集进行cogroup操作
    idName.cogroup(idAge).collect().foreach(println)
    println("\njoin\n")
    //对两个并行数据集进行join操作
     idName.join(idAge).collect().foreach(println)
 //3.获取StreamingContext对象,5秒一个批次
    val ssc = new StreamingContext(sparkContext,Seconds(5))
    //4.接收socket的数据
    val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("hhaonote",9999)
    //5.获取每一行的单词
    val words: DStream[String] = textStream.flatMap(_.split(" "))
    //6.为每一个单词置为1
    val wordAndOne: DStream[(String, Int)] = words.Map((_,1))
     //7.每隔10秒统计最近10秒的搜索词出现的次数
    val result: DStream[(String, Int)] = wordAndOne.ReduceByKeyAndWindow((x:Int,y:Int)=>x+y,Seconds(10),Seconds(10))
    //8.打印
    result.print()
    


创作不易 觉得有帮助请点赞关注收藏~~~ 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/102952.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

和冬天有关的歌

我写过关于四季《新朝花夕拾》,我也写过关于夏天的歌:《和夏天有关的歌》,所以我也想写写冬天。(1)北风当你想到冬天的时候,你会想起什么?我会想到:北风、北方。正好有这么一首歌&am…

神经网络架构搜索

神经网络架构搜索 定义内涵 神经网络架构搜索是为给定数据集自动找到一个或多个架构的任务,这些架构将为给定 的数据集生成具有良好结果的模型,其本质是在高维空间的最优参数搜索问题。 技术背景 深度学习模型的使用越来越大众化,在很多行…

免费内网穿透工具测评对比,谁更好用 3

文章目录1. 前言2. 对比内容2.1 TCP协议功能及操作对比2.1.1 网云穿的TCP设置2.1.2 cpolar的TCP设置1.2 使用感受对比3. 结语1. 前言 发布本地网页通常是内网穿透的主要作用之一,但并不是唯一作用,内网穿透还能将本地硬件发布到公共互联网上&#xff0c…

ASP.NET企业HR人力资源管理系统源码(带使用手册和操作说明)

中小型企业HR人力资源管理系统源码带使用手册和操作说明 【程序语言】:.NET 【数据库】:SQL SERVER 2008 【运行环境】:WINDOWSIIS 【其他】:前端bootstrap框架 了解更多HR源码事宜可私信我! 系统运行环境&#x…

微信小程序云开发博客系统源代码,让写博客像发朋友圈一样简单,含使用部署教程

博客就两种:一是随笔,记录自己的成长历程,二是有目的的发文,例如搬运各种网赚文,我想大部分朋友做博客的初衷都是有一块自己的心灵净土,于是催生了wxapp-blog这款小程序。 完整代码下载地址:微…

零基础如何系统地自学Python?Python应该怎么学?

这次疫情让很多人,都发现自己真的适合群居,也让部分人觉得独居是个不错的选择,但是大部分人都会更明白工作的重要性。 如果没有收入,没有朋友,没有同事,没有那些鸡毛蒜皮的小事,会发现&#xf…

C++ Primer 第六章 函数

C Primer 第六章 函数6.1. Function BasicsParameters and Arguments6.2. Argument Passing6.2.3. const Parameters and Arguments6.2.4. Array Parameters6.2.6. Functions with Varying Parametersinitializer_list ParametersEllipsis Parameters6.3. Return Types and the…

Java多线程-线程的创建(Thread类的基本使用)

文章目录一. 线程和Thread类1. 线程和Thread类1.1 Thread类的构造方法1.2 启用线程的相关方法2. 创建第一个Java多线程程序3. 使用Runnable对象创建线程4. 使用内部类创建线程5. 使用Lambada表达式创建线程6. 多线程并发执行简单演示7. 多线程并发执行的优势二. Thread类的属性…

【专业数据】六.2020~2022年北京交通大学【新一代电子信息技术】专业复试线/分数线差/计划招生数/复试数/录取数/复试比例/录取率

文章目录 1.专业介绍2.2020-2022年国家线/复试线/分数线差2.1.数据总览2.2.数据指标2.2.1.复试分数线2.2.2.分数线差3.2020-2022年计划招生数/复试数/录取数/复试比例/录取率3.1.数据总览3.2.数据指标3.2.1.复试比例3.2.2.录取率4.参考资料欢迎订阅本专栏:《北交计算机复试经验…

【数据结构】树和二叉树

半山腰很挤,你得去山顶看看 目录 1.树 1.1 树的概念 1.2 树的特征 1.3 树每个结点的关系 1.4 树的表示形式 2.二叉树 2.1 二叉树的概念 2.2 特殊的二叉树 2.3 二叉树的性质 2.4 二叉树的存储 2.5 二叉树的基本操作 2.5.1 判断二叉树是否为空 2.…

JAVA类的继承和多态基础笔记(二)

1.继承的基本概念 父类中某一个属性是私有的,通过子类对象就不能访问父类的私有变量。 继承完之后拥有父类全部的东西,但是可以根据实际情况进行重写。 一般所有的类都是继承Object的,实现所有他的方法 像这样是重写了Object类的tostring方法…

[附源码]计算机毕业设计Python共享汽车系统(程序+源码+LW文档)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程 项目运行 环境配置: Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术: django python Vue 等等组成,B/S模式 pychram管理等…

【Numpy基础知识】数组的创建

介绍 来源:Numpy官网:https://numpy.org/doc/stable/user/basics.html 文章目录介绍导包【1】将Python序列转换为Numpy数组【2】通过已有的Numpy数组创建函数创建数组【3】复制、连接或者改变现有数组【4】从磁盘读取数组【5】使用字符串或缓冲区从原始字…

[附源码]Nodejs计算机毕业设计久宠宠物店管理系统Express(程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程。欢迎交流 项目运行 环境配置: Node.js Vscode Mysql5.7 HBuilderXNavicat11VueExpress。 项目技术: Express框架 Node.js Vue 等等组成,B/S模式 Vscode管理前后端分…

载天冬酰胺酶磺丁基-β-/酶羟丙基-β-/天门冬酰胺酶环/EGF/维生素E乙酸酯糊精脂质体制备

下面为大家整理的科研内容是载天冬酰胺酶磺丁基-β-/酶羟丙基-β-/天门冬酰胺酶环/EGF/维生素E乙酸酯糊精脂质体制备,和小编一起来看! 载天冬酰胺酶磺丁基-β-环糊精制备:采用逆向蒸发法制备ASDL,通过酸碱稳定性,热稳定性,抗胰蛋白酶稳定性,血…

【专业数据】七.2020~2022年北京交通大学【计算机技术】专业复试线/分数线差/计划招生数/复试数/录取数/复试比例/录取率

文章目录 1.专业介绍2.2020-2022年国家线/复试线/分数线差2.1.数据总览2.2.数据指标2.2.1.复试分数线2.2.2.分数线差3.2020-2022年计划招生数/复试数/录取数/复试比例/录取率3.1.数据总览3.2.数据指标3.2.1.复试比例3.2.2.录取率4.参考资料欢迎订阅本专栏:《北交计算机复试经验…

写前端?Python有手就行...

前端除了用jscsshtml,还有没有其它办法?其实python也可以 爬它!(https://jq.qq.com/?_wv1027&keSp12WR5) 1. 安装与基本流程 安装 PyWebIO 和其他的第三方库一样使用pip install PyWebIO就行,没有…

云原生 | Kubernetes - 资源指标管道

目录 Metrics API 度量资源用量 CPU 内存 Metrics 服务器 对于 Kubernetes,Metrics API 提供了一组基本的指标,以支持自动伸缩和类似的用例。 该 API 提供有关节点和 Pod 的资源使用情况的信息, 包括 CPU 和内存的指标。如果将 Metrics …

【设计模式】适配器模式 (七)

文章目录5.2 适配器模式5.2.1 概述5.2.2 结构5.2.3 类适配器模式5.2.4 对象适配器模式5.2.5 应用场景5.2.6 JDK源码解析5.2 适配器模式 5.2.1 概述 如果去欧洲国家去旅游的话,他们的插座如下图最左边,是欧洲标准。而我们使用的插头如下图最右边的。因此…

机器学习笔记之玻尔兹曼机(一)基本介绍

机器学习笔记之玻尔兹曼机——基本介绍引言回顾:玻尔兹曼机的模型表示模型参数的对数似然梯度关于模型参数W\mathcal WW的对数似然梯度关于模型参数L,J\mathcal L,\mathcal JL,J的对数似然梯度引言 在受限玻尔兹曼机——模型表示(Representation)一节中以玻尔兹曼机…