Spark(23):SparkStreaming之DStream转换

news2025/1/11 3:42:23

目录

0. 相关文章链接

1. DStream转换概述

2. 无状态转化操作

2.1. Transform

2.2. join

3. 有状态转化操作

3.1. UpdateStateByKey

3.2. WindowOperations


0. 相关文章链接

 Spark文章汇总 

1. DStream转换概述

        DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种 Window 相关的原语。 

2. 无状态转化操作

        无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。部分无状态转化操作列在了下表中。注意,针对键值对的 DStream 转化操作(比如reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用。 

        需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个 DStream 在内部是由许多 RDD(批次)组成,且无状态转化操作是分别应用到每个 RDD 上的。 例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。

2.1. Transform

        Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream 的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamTest {

    def main(args: Array[String]): Unit = {

        //初始化Spark配置信息
        val sparkConf: SparkConf = new SparkConf()
            .setMaster("local[*]")
            .setAppName("StreamTest")

        //初始化SparkStreamingContext,并设置CK
        val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(3))
        ssc.checkpoint("./checkpoint")

        //创建DStream     
        val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

        //转换为RDD操作
        val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(
            (rdd: RDD[String]) => {
                val words: RDD[String] = rdd.flatMap(_.split(" "))
                val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
                val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
                value
            }
        )

        //打印
        wordAndCountDStream.print

        //启动
        ssc.start()
        ssc.awaitTermination()

    }

}

2.2. join

        两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。 

代码如下:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamTest {

    def main(args: Array[String]): Unit = {

        //1.初始化Spark配置信息
        val sparkConf: SparkConf = new SparkConf()
            .setMaster("local[*]")
            .setAppName("StreamTest")

        //2.初始化SparkStreamingContext,并设置CK
        val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(30))
        ssc.checkpoint("./checkpoint")

        //3.从端口获取数据创建流
        val lineDStream1: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
        val lineDStream2: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 8888)

        //4.将两个流转换为KV类型
        val wordToOneDStream: DStream[(String, Int)] = lineDStream1.flatMap(_.split(" ")).map((_, 1))
        val wordToTwoDStream: DStream[(String, String)] = lineDStream2.flatMap(_.split(" ")).map((_, "a"))

        //5.流的JOIN
        val joinDStream: DStream[(String, (Int, String))] = wordToOneDStream.join(wordToTwoDStream)

        //6.打印
        wordToOneDStream.print()
        wordToTwoDStream.print()
        joinDStream.print()

        //7.启动任务
        ssc.start()
        ssc.awaitTermination()

    }

}

数据输入如下:

数据输出如下:

-------------------------------------------
Time: 1689237570000 ms
-------------------------------------------
(abc,1)
(hello,1)
(b,1)

-------------------------------------------
Time: 1689237570000 ms
-------------------------------------------
(abc,a)
(hello,a)
(a,a)

-------------------------------------------
Time: 1689237570000 ms
-------------------------------------------
(abc,(1,a))
(hello,(1,a))

-------------------------------------------
Time: 1689237600000 ms
-------------------------------------------

-------------------------------------------
Time: 1689237600000 ms
-------------------------------------------

-------------------------------------------
Time: 1689237600000 ms
-------------------------------------------

3. 有状态转化操作

3.1. UpdateStateByKey

        UpdateStateByKey 原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。 updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。 

updateStateByKey 操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步: 

  • 定义状态,状态可以是一个任意的数据类型。 
  • 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。

使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态,更新版的wordCount如下:

  • 编写代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建执行环境,并设置checkpoint
        val conf: SparkConf = new SparkConf()
            .setMaster("local[*]")
            .setAppName("StreamTest")
        val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))
        ssc.checkpoint("./checkpoint")

        // 获取数据,并进行解析
        val dataDStream: DStream[(String, Int)] = ssc
            .socketTextStream("localhost", 9999)
            .flatMap((_: String).split(" "))
            .map((word: String) => (word, 1))

        // 使用updateStateByKey来更新状态,统计从运行开始以来单词总的次数
        // 会根据传入的key进行划分,将相同的key划分到同一个渠道中
        //      其中values参数是当前批次的数据,比如当前批次传入的数据是  abc hello abc,那就是 (abc,1) (hello,1) (abc,1)
        //      其中state为以往批次单词频度,比如之前传入的2个批次的数据是 abc hello abc和abc hello abc,那就是 (abc,4) (hello,2)
        //      注意:上述2个参数中只包含对应的值,不包括key,key在算子中就已经进行划分了
        //      此时在方法中,就可以对取出的value进行运算了(比如sum,avg等运算)
        val stateDstream: DStream[(String, Int)] = dataDStream
            .updateStateByKey[Int](
                (values: Seq[Int], state: Option[Int]) => {

                    println("values开始")
                    values.foreach((data: Int) => {
                        println("values == " + data)
                    })
                    println("values结束")

                    println("state开始")
                    state.foreach((data: Int) => {
                        println("state == " + data)
                    })
                    println("state结束")

                    val currentCount: Int = values.sum
                    val previousCount: Int = state.getOrElse(0)
                    Some(currentCount + previousCount)

                }
            )

        // 打印数据
        stateDstream.print()

        // 启动环境
        ssc.start()
        ssc.awaitTermination()

    }
}
  • 启动程序并向 9999 端口发送数据:

  • 结果展示:
/*
第一个批次说明解析:
因为之前没有state,所以state开始就结束了,没有打印日志出来
而values的话,是当前批次的数据,每条数据就会打印一条日志,所以有3条打印,并且因为是2个key,所以有2个values开始和结束
*/
-------------------------------------------
Time: 1689240910000 ms
-------------------------------------------

values开始
values == 1
values == 1
values结束
state开始
state结束
values开始
values == 1
values结束
state开始
state结束


/*
第二个批次说明解析:
在此批次中,有state数据了,并且还是2个key,所以打印了2个stats开始和结束;而且state存储的是历史状态累积,所以这是这2个key在之前批次的数据累积
而values的话,是当前批次的数据,每条数据就会打印一条日志,所以有6条打印,并且因为是2个key,所以有2个values开始和结束
*/
-------------------------------------------
Time: 1689240920000 ms
-------------------------------------------
(abc,2)
(hello,1)

values开始
values == 1
values == 1
values结束
state开始
state == 1
state结束
values开始
values == 1
values == 1
values == 1
values == 1
values结束
state开始
state == 2
state结束


/*
第三个批次说明解析:
在此批次中,有state数据了,并且还是2个key,所以打印了2个stats开始和结束;而且state存储的是历史状态累积,所以这是这2个key在之前批次的数据累积(比第2个批次的数据值增加了)
而values的话,是当前批次的数据,在这个批次中没有数据输入,所以没有values的值,但是还是那2个key,所以打印了2个values开始和结束
*/
-------------------------------------------
Time: 1689240930000 ms
-------------------------------------------
(abc,6)
(hello,3)

values开始
values结束
state开始
state == 6
state结束
values开始
values结束
state开始
state == 3
state结束

3.2. WindowOperations

        Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。

  • 窗口时长:计算内容的时间范围; 
  • 滑动步长:隔多久触发一次计算;

注意:这两者都必须为采集周期大小的整数倍。

WordCount窗口计算版本,如下所示:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建执行环境,并设置checkpoint
        val conf: SparkConf = new SparkConf()
            .setMaster("local[*]")
            .setAppName("StreamTest")
        val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))
        ssc.checkpoint("./checkpoint")

        // 接收数据,并开窗统计
        // 在如下的reduceByKeyAndWindow方法中,第一个参数是数据处理方法,第二个参数窗口的大小,第三个是滑动的步长
        // 如果没有第三个参数,那就是滚动窗口
        val wordCounts: DStream[(String, Int)] = ssc
            .socketTextStream("localhost", 9999)
            .flatMap((_: String).split(" "))
            .map((word: String) => (word, 1))
            .reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(20), Seconds(10))

        // 结果打印
        wordCounts.print()

        // 启动环境
        ssc.start()
        ssc.awaitTermination()

    }
}

关于 Window 的操作还有如下方法: 

  • window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream; 
  • countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数; 
  • reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流; 
  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数据使用 reduce 函数来整合每个 key 的 value 值。 
  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的 reduce 值都是通过用前一个窗的 reduce 值来递增计算。通过 reduce 进入到滑动窗口数据并”反向 reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对 keys 的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于” 可逆的 reduce 函数”,也就是这些 reduce 函数有相应的”反 reduce”函数(以参数 invFunc 形式传入)。如前述函数,reduce 任务的数量通过可选参数来配置。 

val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1)) 
val ipCountDStream = ipDStream
    .reduceByKeyAndWindow( 
        {(x, y) => x + y}, 
        {(x, y) => x - y}, 
        Seconds(30), 
        Seconds(10)
) 
//加上新进入窗口的批次中的元素 
//移除离开窗口的老批次中的元素 
//窗口时长
//滑动步长 

        countByWindow()和 countByValueAndWindow()作为对数据进行计数操作的简写。countByWindow()返回一个表示每个窗口中元素个数的 DStream,而 countByValueAndWindow()返回的 DStream 则包含窗口中每个值的个数。 

val ipDStream = accessLogsDStream.map{
    entry => entry.getIpAddress()
} 
val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10))  
val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10)) 

注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/749460.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

测试计划(详细版)

目录 简介 测试计划的目的 测试计划的作用 范围 编写条件 注意事项 评审总结 参考模版 测试策略 简介 数据和数据库完整性测试 接口测试 集成测试 功能测试 用户界面测试 性能评测 负载测试 强度测试 容量测试 安全性和访问控制测试 故障转移和恢复测试 …

【linux】“gdb“调试方法与技巧

"gdb"调试方法与技巧 一、什么是gdb?二、gdb的使用进入gdb调试list/l 查看源码l/list行号list/l函数名 run/r运行break(b)设置断点info break查看断点信息finishn/next单条执行s/step进入函数调用p 变量set var修改变量的值continue/cdisable breakpionts…

基于javaweb旅游景点线路预定系统设计与实现

1.引言 随着我国人们生活水平的不断提高,旅游逐渐成为人们工作之余,进行放松压力,调节情绪的首要选择。近几年,我国旅游游客规模不断扩大,使旅游业得到快速发展,但也带来了更激烈竞争。面对更复杂的旅游业…

一分钟让你学会如何判断文件是否结束

文章目录 前言ferror函数feof函数ferror函数与feof函数搭配使用 前言 在所有的文件输入输出函数中,我们介绍了一系列的与文件的顺序读写有关的函数,它们调用成功与失败时的返回值各不相同,为了能更好地记忆这些函数,我们将这些函…

react使用SVGA特效 常用api

下载插件 npm install svgaplayerweb --save react中代码 import React, { useEffect } from react; import SVGA from svgaplayerweb const Svga () > {const bofang () > {var player new SVGA.Player(#demoCanvas);//创建实例var parser new SVGA.Parser(#demo…

【经验贴】项目管理过程中最容易忽略的问题,你踩了几个?

“有没有一句话送给刚入行的项目经理?” 随着毕业季的到来,最近发现越来越多类似的话题,评论区成了众多项目经理自嘲的“据点”,部分新手项目经理看完可能要重新考虑下职业规划了。 “要重点关注客户的需求,而不是项…

Android蓝牙协议栈fluoride(一) - 概述

发展 Android 4.2之前的版本采用Linux官方的蓝牙协议栈BlueZ,Android 4.2开始使用google和Broadcom共同开发的Bluedroid来替代BlueZ,但早期的Bluedroid并不完善,存在较多问题,更新迭代后将其更名为fluoride。接下来的系列文章将逐…

接口测试之深入理解HTTPS

前言 随着网络安全问题越来越被重视,HTTPS协议的使用已经逐渐主流化。目前的主流站点均已使用了HTTPS协议;比如:百度、淘宝、京东等一二线主站都已经迁移到HTTPS服务之上。而作为测试人员来讲,也要需时俱进对HTTPS协议要有一定的…

分布式调用与高并发处理 Dubbo分布式调用

一、Dubbo概念 1.1 什么是分布式系统 单机架构 一个系统业务量很小的时候所有的代码都放在一个项目中就好了,然后这个项目部署在一台服务器上,整个项目所有的服务都由这台服务器提供。 缺点: 服务性能存在瓶颈代码量庞大,系统臃…

未跟踪的文件: (使用 “git add <文件>...“ 以包含要提交的内容)怎么移除这些内容

有时候我们常常修改一些内容 手动就是:rm -rf system/core/healthd/images/.png 怎么丢弃呢? git clean -f . 删除这种文件

微信小程序设置底部导航栏

微信小程序设置底部导航栏 1、前言2、图标准备3、小程序tabbar设置 1、前言 我们先来看下效果图: 注意: 导航栏数量最多5个,最少两个。 2、图标准备 阿里图标库 http://www.iconfont.cn/collections/show/29 我们进入该网站,选…

第九章(1):循环神经网络与pytorch示例(RNN实现股价预测)

第九章(1):循环神经网络与pytorch示例(RNN实现股价预测) 作者:安静到无声 个人主页 作者简介:人工智能和硬件设计博士生、CSDN与阿里云开发者博客专家,多项比赛获奖者,发…

1.4 MVP矩阵

MVP矩阵代表什么 MVP矩阵分别是模型(Model)、观察(View)、投影(Projection)三个矩阵。 我们的顶点坐标起始于局部空间(Local Space),在这里他成为局部坐标(L…

【PHP面试题35】什么是MVC,为什么要使用它

文章目录 一、前言二、MVC介绍2.1 模型(Model)2.2 视图(View)2.3 控制器(Controller) 三、MVC模式的优点四、总结 一、前言 本文已收录于PHP全栈系列专栏:PHP面试专区。 计划将全覆盖PHP开发领域…

『分割』 平面模型分割

PCL提供的几个常见模型: pcl::SACMODEL_PLANE:平面模型,用于拟合平面结构的点云数据。 pcl::SACMODEL_SPHERE:球体模型,适用于拟合球体结构的点云数据。 pcl::SACMODEL_CYLINDER:圆柱体模型,用…

一个四年Android程序猿的2023上半年总结

一晃就做了四年的Android开发了,时光飞逝啊~ 工作的时间飞快,感觉每一天都很充实,但是大多数都是重复的样子。 去年的目标达成: 去年的目标就是学习学习,涨薪涨薪。上家公司的同事氛围很不错&#xff0…

一篇文章了解Redis分布式锁

Redis分布式锁 什么是分布式锁? ​ redis分布式锁是一种基于redis实现的锁机制,它用于在多并发分布式环境下控制并发访问共享资源。在多个应用程序或是进程访问共享资源时,分布式锁可以确保只有一个进程可以访问该资源,不会发生…

采用555时基电路的简易/可调定时长延时电路设计

采用 555 时基电路的简易长延时电路 本电路和一般的定时电路相比是通过在 555 时基电路的 5 脚处加了一个二极管 VD1,使得定时时间延长的特点。 一、电路工作原理 电路原理如图 11 所示。 当按下按钮SB时,12V的电源通过电阻器Rt向电容器Ct充电&#…

弹性IP和公网IP有什么区别?哪个好

​  弹性IP和公网IP有什么区别?哪个好。IP是服务器重要的组成资源,一台云服务器实例一般分为公网IP和内网IP,公网IP指的是对外访问的IP地址,是针对公众用户的IP,这是网站绑定的服务器IP地址。而内网IP顾名思义就是内部的网络IP…

Android Monkey稳定性测试

l 命令样例: adb shell monkey -p packagename --ignore-timeouts --ignore-crashes -v -v --throttle 200 1000000 各个参数的意义如下: -p 用此参数指定一个或多个包(Package)。指定包之后,Monkey将只允许系统启…