大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例

news2025/1/12 21:57:05

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Spark Streaming DStream 转换函数
  • DStream 无状态转换
  • DStream 无状态转换 案例
    在这里插入图片描述

转换方式

有两个类型:

  • 无状态转换(已经完成)
  • 有状态转换

接下来开始有状态转换。

有状态转换

有状态转换主要有两种:

  • 窗口操作
  • 状态跟踪操作

窗口操作

Window Operations 可以设置窗口大小和滑动窗口间隔来动态获取当前Streaming的状态
基于窗口的操作会在一个比 StreamingContext 的 batchDuration(批次间隔)更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。

在这里插入图片描述

基于窗口的操作需要两个参数:

  • 窗口长度(Window Duration):控制每次计算最近的多少个批次的数据
  • 滑动间隔(Slide Duration):用来控制对新的 DStream 进行计算的间隔

两者都必须是StreamingContext中批次间隔(batchDuration)的整数倍

准备编码

我们先编写一个每秒发送一个数字:

package icu.wzk

import java.io.PrintWriter
import java.net.{ServerSocket, Socket}

object SocketWithWindow {

  def main(args: Array[String]): Unit = {
    val port = 9999
    val ss = new ServerSocket(port)
    val socket: Socket = ss.accept()
    var i = 0
    while (true) {
      i += 1
      val out = new PrintWriter(socket.getOutputStream)
      out.println(i)
      out.flush()
      Thread.sleep(1000)
    }
  }
}

[窗口操作] 案例2观察窗口数据

  • 观察窗口的数据
  • 观察 batchDuration、windowDuration、slideDuration 三者之间的关系
  • 使用窗口相关的操作

编写代码

package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowDemo {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WindowDemo")
      .setMaster("local[*]")

    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD {
      (rdd, time) => {
        println(s"rdd = ${rdd.id}; time = $time")
      }
        rdd.foreach(value => println(value))
    }

    // 20秒窗口长度(DS包含窗口长度范围内的数据)
    // 10秒滑动间隔(多次时间处理一次数据)
    val res1: DStream[String] = lines
      .reduceByWindow(_ + " " + _, Seconds(20), Seconds(10))
    res1.print()

    val res2: DStream[String] = lines
      .reduceByWindow(_ + _, Seconds(20), Seconds(10))
    res2.print()

    // 求窗口元素的和
    val res3: DStream[Int] = lines
      .map(_.toInt)
      .reduceByWindow(_ + _, Seconds(20), Seconds(10))
    res3.print()

    // 请窗口元素和
    val res4 = res2.map(_.toInt).reduce(_ + _)
    res4.print()

    // 程序启动
    ssc.start()
    ssc.awaitTermination()

  }
}

运行结果

-------------------------------------------
Time: 1721628860000 ms
-------------------------------------------

rdd = 39; time = 1721628865000 ms
rdd = 40; time = 1721628870000 ms
-------------------------------------------
Time: 1721628870000 ms
-------------------------------------------

-------------------------------------------
Time: 1721628870000 ms
-------------------------------------------

-------------------------------------------
Time: 1721628870000 ms
-------------------------------------------

运行之后控制截图如下:
在这里插入图片描述

[窗口操作] 案例3 热点搜索词实时统计

编写代码

package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HotWordStats {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("HotWordStats")
      .setMaster("local[*]")

    val ssc = new StreamingContext(conf, Seconds(2))
    // 检查点设置 也可以设置到 HDFS
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("checkpoint")

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words: DStream[String] = lines.flatMap(_.split("\\s+"))
    val pairs: DStream[(String, Int)] = words.map(x => (x, 1))

    // 通过 reduceByKeyAndWindow算子 每隔10秒统计最近20秒的词出现的的次数
    val wordCounts1: DStream[(String, Int)] = pairs
      .reduceByKeyAndWindow(
        (a: Int, b: Int) => a + b, Seconds(20), Seconds(10), 2
      )
    wordCounts1.print()

    // 需要CheckPoint的支持
    val wordCounts2: DStream[(String, Int)] = pairs
      .reduceByKeyAndWindow(
        _ + _, _ - _, Seconds(20), Seconds(10), 2
      )
    wordCounts2.print()

    // 运行程序
    ssc.start()
    ssc.awaitTermination()
  }

}

运行结果

-------------------------------------------
Time: 1721629842000 ms
-------------------------------------------
(4,1)
(8,1)
(6,1)
(2,1)
(7,1)
(5,1)
(3,1)
(1,1)

-------------------------------------------
Time: 1721629842000 ms
--------------------

运行结果如下图:
在这里插入图片描述

[状态追踪操作] updateStateByKey

UpdateStateByKey的主要功能:

  • 为Streaming中每一个Key维护一份State状态,state类型可以是任意类型的,可以是自定义对象,更新函数也可以是自定义的
  • 通过更新函数对该Key的状态不断更新,对于每个新的batch而言,Spark Streaming会在使用updateStateByKey的时候已经存在的key进行state状态更新
  • 使用updateStateByKey时要开启 CheckPoint 功能

编写代码1

流式程序启动后计算wordcount的累计值,将每个批次的结果保存到文件

package icu.wzk


import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StateTracker1 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StateTracker1")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("checkpoint")

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words: DStream[String] = lines.flatMap(_.split("\\s+"))
    val wordDStream: DStream[(String, Int)] = words.map(x => (x, 1))

    // 定义状态更新函数
    // 函数常量定义 返回类型是 Some(Int),表示的含义是最新状态
    // 函数的功能是将当前时间间隔内产生的Key的Value集合,加到上一个状态中,得到最新状态
    val updateFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
      // 通过Spark内部的reduceByKey按Key规约,然后这里传入某Key当前批次的Seq,再计算当前批次的总和
      val currentCount = currValues.sum
      // 已累加的值
      val previousCount = prevValueState.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val stateDStream: DStream[(String, Int)] = wordDStream.updateStateByKey[Int](updateFunc)
    stateDStream.print()

    // 把DStream保存到文本文件中 会生成很多的小文件 一个批次生成一个目录
    val outputDir = "output1"
    stateDStream
      .repartition(1)
      .saveAsTextFiles(outputDir)

    // 开始运行
    ssc.start()
    ssc.awaitTermination()
  }
}

运行结果1

-------------------------------------------
Time: 1721631080000 ms
-------------------------------------------
(1,1)
(2,1)
(3,1)

-------------------------------------------
Time: 1721631085000 ms
-------------------------------------------
(8,1)
(1,1)
(2,1)
(3,1)
(4,1)
(5,1)
(6,1)
(7,1)

运行结果是:
在这里插入图片描述
统计全局的Key的状态,但是就算没有数据输入,也会在每一个批次的时候返回之前的Key的状态。

这样的缺点:

  • 如果数据量很大的话,CheckPoint数据会占用较大存储,而且效率也不高

编写代码2

mapWithState:也是用于全局统计Key的状态,如果没有数据输入,便不会返回之前的Key的状态,有一点增量的感觉。
这样做的好处是,只关心那些已经发生的变化的Key,对于没有数据输入,则不会返回那些没有变化的Key的数据,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储。

package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object StateTracker2 {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("StateTracker2")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("checkpoint")

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words: DStream[String] = lines.flatMap(_.split("\\s+"))
    val wordDStream: DStream[(String, Int)] = words.map(x => (x, 1))

    def mappingFunction(key: String, one: Option[Int], state: State[Int]): (String, Int) = {
      val sum: Int = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (key, sum)
    }

    val spec = StateSpec.function(mappingFunction _)
    val resultDStream: DStream[(String, Int)] = wordDStream.mapWithState(spec)

    resultDStream.cache()

    // 把DStream保存到文本文件中,会生成很多的小文件。一个批次生成一个目录
    val outputDir = "output2"
    resultDStream.repartition(1).saveAsTextFiles(outputDir)

    ssc.start()
    ssc.awaitTermination()

  }
}

运行代码

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2075603.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

windows安装pytorch精简版(英伟达GPU)

1 下载anaconda 官网:Index of /anaconda/archive/ | 清华大学开源软件镜像站 | Tsinghua Open Source Mirror 选择下载Anaconda-1.4.0-Windows-x86.exe 2 创建虚拟环境 以管理员身份打开Anaconda Prompt conda env list conda creat -n yolov8 python3.8 创建过程中有提示,填…

【图文详解】idea码云环境搭建

公众号:墨轩学习网-----B站:墨轩大楼 欢迎关注!!! 一、码云简介 目前开源中国的四大框架,即四条产品线:开源中国社区、众包、码云和招聘。 码云是开源中国推出的基于Git的代码托管服务&#…

鸿蒙HarmonyOS开发:创建新的Lite工程

当开始开发一个应用/服务时,首先需要根据工程创建向导,创建一个新的工程,工具会自动生成对应的代码和资源模板。 说明 在运行DevEco Studio工程时,建议每一个运行窗口有2GB以上的可用内存空间。 创建和配置新工程 DevEco Studio提…

如何应对市场变革的战略利器之敏捷企业架构实践全景指南

敏捷与企业架构融合的必然性 在全球化和数字化的双重推动下,市场竞争的激烈程度前所未有。企业必须迅速适应市场的变化,以在激烈的竞争中脱颖而出。然而,传统的企业架构往往侧重于长期战略规划,尽管它在维持企业的稳定性方面功不…

阿贝云评测:免费虚拟主机与免费云服务器的优势对比

阿贝云作为一家知名云服务提供商,以其稳定可靠的服务质量在业界享有盛誉。其中,其免费虚拟主机和免费云服务器备受用户喜爱。在这篇评测中,我们将对这两种服务进行详细对比。 首先,就免费虚拟主机而言,阿贝云提供的免费…

图片工具箱:一键批量加水印,守护创意,提升效率!

前言 你是否曾在处理海量图片时,被繁琐的步骤和漫长的等待时间折磨得苦不堪言?是否梦想过拥有一款神器,能让你的图片处理工作变得轻松愉快,从此告别加班的烦恼,迎接升职加薪的曙光?那么,让我向…

我主编的电子技术实验手册(18)——认识电感

本专栏是笔者主编教材(图0所示)的电子版,依托简易的元器件和仪表安排了30多个实验,主要面向经费不太充足的中高职院校。每个实验都安排了必不可少的【预习知识】,精心设计的【实验步骤】,全面丰富的【思考习…

Golang学习笔记-Golang中的锁

同步原语和锁 Golang作为一个原生支持用户态的语言,当提到并发进程,多线程的时候,是离不开锁的,锁是一种并发编程中的同步原语(Synchronization Primitives),它能保证多个 Goroutine 在访问同一…

数据库的范式

作用是减小表的冗余。 防止插入删除更新异常。 第一、第二、第三、第四、BC范式。并且不是越高越好。 第一范式:1NF的定义为:符合1NF的关系中的每个属性都不可再分。表1所示的情况,就不符合1NF的要求。 …

终端Tabby介绍和使用

介绍一款开源的跨平台终端模拟器,支持系统:Windows、macOS、Linux 下载安装 下载链接:Release Alpha 164 Eugeny/tabby GitHub 下载适合自己环境的版本: 选项说明 【1】Enable analytics选项的作用主要是允许Tabby收集和分析…

【数据集】遥感影像建筑物变化检测对比实验常用数据集分享

整理了几个变化检测的对比试验中常用的变化检测数据集(建筑物) LEVIR-CD 下载链接: https://justchenhao.github.io/LEVIR/ 数据介绍: 用于建筑物变化检测数据集 分辨率:0.5m 尺寸:1024*1024 数量:637组&…

python-求和again(赛氪OJ)

[题目描述] 最近小理遇到了麻烦的问题,你能帮帮他吗? 题目是这样的:计算 SUM(n)123...10^n 。输入格式: 输入包含多组数据,每组数据一行,包括一个整数 n 。当 n−1 时输入终止。输出格式: 对于每…

经济学有哪些分支,分别研究什么?

经济学的分支众多,每个分支都专注于研究经济领域的不同方面。 以下是一些主要经济学分支及其研究内容的概述: 微观经济学: 研究个体经济单位(如家庭、企业)的决策过程以及这些决策如何影响资源分配、市场结构和价格形…

viewBinding的使用(android studio)

引入 在开发安卓软件的时候,我们会大量的使用点击事件。通常情况下,我们是这样做的:将在xml文件里把目标组件添加id属性,如下: 然后在activity里面通过findViewById(R.id.back) 得到一个对象,通过对象调用…

BAT32G137国产项目通用第十节:FreeRTOS 计数信号量

主题:计数信号量可以用于资源管理,允许多个任务获取信号量访问共享资源,但会限制任 务的最大数目。访问的任务数达到可支持的最大数目时,会阻塞其他试图获取该信号量的 任务,直到有任务释放了信号量。 1.常用信号量函数接口 ①创建计数信号量 xSemaphoreCreateCounting()…

AMEYA360:ROHM发售4款非常适用于工业电源的SOP封装通用AC-DC控制器IC

全球知名半导体制造商ROHM(总部位于日本京都市)开发出PWM控制方式*1FET外置型通用控制器IC,非常适用于工业设备的AC-DC电源。目前已有支持各种功率晶体管的4款新产品投入量产,包括低耐压MOSFET驱动用的“BD28C55FJ-LB”、中高耐压MOSFET驱动用的“BD28C5…

树莓派制成的 — 带运动检测和摄像头的安防系统

自动布防/撤防、运动检测、带图片的移动通知 项目所用物品 硬件组件 Raspberry Pi 1 Model A 一个:任何支持摄像头模块的 Raspberry Pi 均可 Raspberry Pi 摄像头模块一个 USB WLAN/WiFi适配器一个:必须支持监控模式,推荐使用RT5370 M…

NGINX高性能web服务器

1.web服务器介绍 Nginx (engine x) 是一个高性能的HTTP和反向代理web服务器,同时也提供了IMAP/POP3/SMTP服务。Nginx是由伊戈尔赛索耶夫为俄罗斯访问量第二的Rambler.ru站点(俄文:Рамблер)开发的,第一个公开版…

使用一台电脑监控全体员工电脑,怎么实现?安全高效两不误,透视全公司电脑使用情况!

传统的管理模式无法监管员工上班时间内的所有行为,如聊天、浏览与工作无关的网站、玩游戏等等,总不能让企业管理者一直盯着员工办公吧? 员工电脑作为企业运营的神经末梢,其安全与使用效率直接关系到企业的稳定发展。那么&#xf…

Jenkins docker容器时区修改

背景 用docker搭建的Jenkins环境时间显示和我们本地时间相差8个小时,可能是由于docker run的时候没有加上/etc/localtime:/etc/localtime去同步时区,所以需修改容器内部的系统时间 查看时间 先查看宿主机的系统时间 date -R 进docker容器查看时间 d…