185-200-spark-核心编程-Streaming

news2025/4/14 20:25:40

185-spark-核心编程-Streaming:

数据处理延迟的长短分为:实时数据处理(毫秒级别),离线数据处理(小时,天)

数据处理的方式分为:流式数据处理(streaming,来一点处理一点),批量数据处理(batch,来一批数据处理一批)

sparkstreaming,准实时(秒,分钟),微批次(时间)的数据处理框架。 用于流式数据的处理

SparkStreaming 架构图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2gOOIJ30-1670772304076)(png/image-20211102225102730.png)]

Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。所以简单来将,DStream 就是对 RDD 在实时数据处理场景的一种封装。

在这里插入图片描述

背压机制

Spark 1.5 以前版本,如果要限制 Receiver 的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer 数据生产高于 maxRate,当前集群处理能力也高于 maxRate,这就会造成资源利用率下降等问题。

为更好协调数据接收速率与资源处理能力,1.5 版本开始 Spark Streaming 可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即 Spark Streaming Backpressure): 根据JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。通过属性“spark.streaming.backpressure.enabled”来控制是否启用 backpressure 机制,默认值false,即不启用。

SparkStream入门操作

WordCount 案例实操

➢ 需求:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并统计不同单词出现的次数

window-netcat的使用 https://eternallybored.org/misc/netcat/

建议直接使用linux nc -lk 9999

代码:SparkStreaming01_WordCount,SparkStreaming02_Queue

自定义数据采集器 SparkStreaming03_DIY

package spark.stream.com.zh
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
import scala.util.Random

/**
 * sparkstreaming wordcount的初体验
 */
object SparkStreaming03_DIY {
  def main(args: Array[String]): Unit = {
    //创建环境对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")

    //StreamingContext 参数1:环境配置  参数2:批量处理的周期时间(采集周期)
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val messageDs: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
    messageDs.print()
    //启动采集器
    ssc.start()
    //等待采集器的关闭
    ssc.awaitTermination()
  }
  /**
   * 自定义数据采集器
   * 1、继承Receiver,定义泛型
   * 2.重写方法
   */
  class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
    private var flag = true;
    //表示启动时候
    override def onStart(): Unit = {
      new Thread(new Runnable {
        override def run(): Unit = {
          while (flag) {
            val message = "采集的数据为:  " + new Random().nextInt(10).toString
            //存储
            store(message)
            Thread.sleep(500)
          }
        }
      }).start()
    }
    //关闭
    override def onStop(): Unit = {
      flag = false
    }
  }
}

Kafka数据源(重点)

Kafka 0-10 Direct模式,通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,打印到控制台。

依赖:

<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
 <version>3.0.0</version>
</dependency>
<dependency>
 <groupId>com.fasterxml.jackson.core</groupId>
 <artifactId>jackson-core</artifactId>
 <version>2.10.1</version>
 </dependency>

代码:spark.stream.com.zh.SparkStreaming04_Kafka,启动接收消息

启动zk和kafka

>kafkbin/kafka-topics.sh --zookeeper hadoop102:2181 --list
>bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --topic kafkasparkstreamtest --partitions 3 --replication-factor 2
>kafkbin/kafka-topics.sh --zookeeper hadoop102:2181 --list
>bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic kafkasparkstreamtest
发送消息

DStream 转换

DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种

无状态转化操作

Transform SparkStreaming05_State_Transform

Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。

**join **SparkStreaming06_State_Join

两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。

有状态转化操作

UpdateStateByKey SparkStreaming05_State

//采集5秒周期数据      采集5秒周期数据        采集5秒周期数据
//[a,b,a]  Data1       [a,b,b] Data2         [a,b,a] Data3
//读取data1时,记录[(a,2)(b,1)],读取data2时,[(a,3)(b,3)],读取data3时,[(a,5)(b,4)]

UpdateStateByKey 原语用于记录历史记录

  1. 定义状态,状态可以是一个任意的数据类型。

  2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态。

WindowOperations窗体操作 SparkStreaming06_State_Window

Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。 注意:这两者都必须为采集周期大小的整数倍。

➢ 窗口时长:计算内容的时间范围

➢ 滑动步长:隔多久触发一次计算。

//采集5秒周期数据      采集5秒周期数据        采集5秒周期数据
//[a,b,c]  Data1       [b,c,d] Data2         [a1,b1,c1] Data3
/** 窗体是10,步长为10,为单个采集周期的2倍。则,
* 一个窗体的数据为data1+data2->data3+data4(含指定步长时,不含则为data2+data3)
* */

关于 Window 的操作还有如下方法(有待进一步了解深入。入门操作和效果概念以及使用了解):

(1)window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream;

(2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;

(3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;

(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数据使用 reduce 函数来整合每个 key 的 value 值。

(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的 reduce 值都是通过用前一个窗的 reduce 值来递增计算。通过 reduce 进入到滑动窗口数据并”反向 reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对 keys 的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于可逆的 reduce 函数”,也就是这些 reduce 函数有相应的”反 reduce”函数(以参数 invFunc 形式传入)。如前述函数,reduce 任务的数量通过可选参数来配置。

DStream输出

输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream 就都不会被求值。如果 StreamingContext 中没有设定输出操作,整个 context 就都不会启动。

输出操作如下:

➢ print():在运行流程序的驱动结点上打印 DStream 中每一批次数据的最开始 10 个元素。这用于开发和调试。

➢ saveAsTextFiles(prefix, [suffix]):以 text 文件形式存储这个 DStream 的内容。每一批次的存储文件名基于参数中的 prefix 和 suffix。”prefix-Time_IN_MS[.suffix]”。

➢ saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 Stream 中的数据保存为SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]".

➢ saveAsHadoopFiles(prefix, [suffix]):将 Stream 中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。

➢ foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream 的每一个RDD。其中参数传入的函数 func 应该实现将每一个 RDD 中数据推送到外部系统,如将RDD 存入文件或者通过网络将其写入数据库。通用的输出操作 foreachRDD(),它用来对 DStream 中的 RDD 运行任意计算。这和 transform() 有些类似,都可以让我们访问任意 RDD。在 foreachRDD()中,可以重用我们在 Spark 中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如 MySQL 的外部数据库中。

注意(暂时了解,后续补充):

  1. 连接不能写在 driver 层面(序列化)

  2. 如果写在 foreach 则每个 RDD 中的每一条数据都创建,得不偿失;

  3. 增加 foreachPartition,在分区创建(获取)

优雅关闭 SparkStreaming08_Close

流式任务需要 7*24 小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了。使用外部文件系统来控制内部程序关闭。

数据恢复:SparkStreaming08_Resume

学习路径:https://space.bilibili.com/302417610/,如有侵权,请联系q进行删除:3623472230

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/89288.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ORACLE19c数据库随LINUX操作系统自动启动实现方式

1.建立目录 # su - oracle $ mkdir /home/oracle/scripts 2.建立启动脚本&#xff1a; $ cd /home/oracle/scripts $ vim startdb.sh #!/bin/bash export ORACLE_BASE/u01/app/oracle export ORACLE_HOME$ ORACLE_BASE/product/19.16.0/db_1 export ORACLE_SIDemrep export PAT…

【电脑使用】利用diskpart删除电脑的EFI分区

文章目录前言问题描述问题解决扩展&#xff1a;测量磁盘读写速度1 win10自带工具2 第三方工具前言 在Windows的磁盘管理中&#xff0c;往往会发现自己电脑的磁盘中莫名多了一些分区&#xff0c;有一些是系统分区&#xff08;一般不删&#xff09;&#xff0c;还有一些是还原分区…

m索引OFDM调制解调系统的性能仿真分析

目录 1.算法描述 2.仿真效果预览 3.MATLAB核心程序 4.完整MATLAB 1.算法描述 随着无线通信技术的不断发展,人们对下一代移动通信系统提出了越来越高的要求。在这样的时代背景下,具有低峰均比,强频偏对抗能力和高能量效率的索引调制OFDM系统(Orthogonal Frequency Division …

【跟学C++】C++STL三大主要组件——容器/迭代器/算法(Study19)

文章目录1、前言2、简介2.1、STL是什么&#xff1f;2.2、STL能干什么&#xff1f;2.3、STL组成3、容器3.1、顺序容器3.2、排序容器(关联式容器)3.3、哈希容器3.4、容器适配器3、迭代器3.1、迭代器介绍3.2、迭代器定义方式3.3、迭代器类别3.4、辅助函数4、算法5、总结 【说明】…

【MATLAB教程案例60】使用matlab实现基于GRU网络的数据分类预测功能与仿真分析

欢迎订阅《FPGA学习入门100例教程》、《MATLAB学习入门100例教程》 目录 1.软件版本 2.GRU网络理论概述

【云原生进阶之容器】第一章Docker核心技术1.5.4节——cgroups使用

4 CGroups使用 4.1 挂载cgroup树 开始使用cgroup前需要先挂载cgroup树,下面先看看如何挂载一颗cgroup树,然后再查看其根目录下生成的文件。 #准备需要的目录 #准备需要的目录 dev@ubuntu:~$ mkdir cgroup && cd cgroup dev@ubuntu:~/cgroup$ mkdir demo#由于name=…

[论文解析] Diffusion Guided Domain Adaptation of Image Generators

project link: https://styleganfusion.github.io/ 文章目录OverviewWhat problem is addressed in the paper?What is the key to the solution?What is the main contribution?IntroductionBackgroundLatent diffusion modelClassifier-free guidanceMethodModel Structur…

pytorch深度学习实战lesson36

第三十六课 锚框 因为我们在目标检测里面需要预测边缘框&#xff0c;所以给我们的预测带来了很大的问题。我们在卷积神经网络里面做图片分类的时候&#xff0c;整个代码写起来看上去非常简单&#xff0c;就是一个 soft Max 出去就完事了。但是因为有边框的加入&#xff0c;使得…

第十二期 | 万元的正版课程仅花9.9就可买到?

顶象防御云业务安全情报中心监测发现&#xff0c;某线上教育培训类平台课件遭遇大规模盗取。被盗取的课件&#xff0c;经加工处理后&#xff0c;进行低价转售&#xff0c;严重损害了平台的合法权益。 飞速发展的在线教育和看不见的风险 随着5G、视频编解码等技术融合&#xff…

DevExpress .Net Components 22.2.3 Crack

DevExpress .Net适用于 Windows、Internet 以及您的移动世界的用户界面组件 使用适用于 WinForms、WPF、ASP.NET&#xff08;WebForms、MVC 和 Core&#xff09;、Windows 10、VCL 和 JavaScript 的 DevExpress 组件&#xff0c;打造一流的用户体验并模拟最热门的企业生产力程…

产品负责人 VS 产品经理

概述 Scrum框架创造了对新角色的需求&#xff0c;其中就包括 “产品负责人” 。这不可避免额外地导致对产品负责人和产品经理角色的误解和误用&#xff0c;对团队产生不必要的压力。 角色混淆会带来噪音和摩擦&#xff0c;削弱团队对价值、质量、速度和满意度的关注。这种混乱…

让搜狗快速收录网站的方法,批量查询网站有没有被搜狗收录

让搜狗快速收录只需做到以下8点&#xff1a; 1、网页标题要与内容相关。 2、页面少用flash&#xff0c;图片等 3、将网站链接大量推送给搜狗。 4、网页尽量采用静态网页。 5、首页的外部链接不要过多。 6、搜狗更喜欢受用户欢迎的内容的网站。 7、网站不要欺骗用户。 8、网站不…

四道编程题(涉及最大公约数最小公倍数,子序列等)

tips 1. scanf当是读取整数%d的时候&#xff0c;这时候如果它读取到\n&#xff0c;它就会停止读取。并且碰到空格的时候也会跳过。 2. getchar不需要传入参数&#xff0c;读取失败的时候会返回EOF。那getchar或者scanf到底是怎么从键盘上读取我输入的字符呢&#xff1f;在getc…

VSCode入门

VSCode入门 零、文章目录 文章地址 个人博客-CSDN地址&#xff1a;https://blog.csdn.net/liyou123456789个人博客-GiteePages&#xff1a;https://bluecusliyou.gitee.io/techlearn 代码仓库地址 Gitee&#xff1a;https://gitee.com/bluecusliyou/TechLearnGithub&#…

[附源码]Node.js计算机毕业设计高校创新学分申报管理系统Express

项目运行 环境配置&#xff1a; Node.js最新版 Vscode Mysql5.7 HBuilderXNavicat11Vue。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分离等等。 环境需要 1.运行环境&#xff1a;最好是Nodejs最新版&#xff0c;我…

微服务实用篇4-消息队列MQ

今天主要来学习异步通讯技术MQ&#xff0c;主要包括初识MQ&#xff0c;RabbitMQ快速入门&#xff0c;SpringAMQP三大部分&#xff0c;下面就来一起学习吧。路漫漫其修远兮&#xff0c;吾将上下而求索&#xff0c;继续加油吧&#xff0c;少年。 目录 一、初识MQ 1.1、同步通讯…

文件历史记录无法识别此驱动器如何修复?

案例&#xff1a; 在电脑中尝试使用内置工具文件历史记录将文件备份到另一个硬盘时&#xff0c;发现如图所示的错误“文件历史记录无法识别此驱动器”&#xff0c;这可怎么办&#xff1f; 文件历史记录驱动器断开连接的原因 文件历史记录无法识别此驱动器的原因可能是启动类型…

四种排序(选择排序、冒泡排序、快速排序和插入排序)

四种排序&#xff08;选择排序、冒泡排序、快速排序和插入排序&#xff09;选择排序&#xff1a;完整代码&#xff1a;运行结果&#xff1a;冒泡排序&#xff1a;完整代码&#xff1a;运行结果&#xff1a;插入排序&#xff1a;完整代码&#xff1a;运行结果&#xff1a;快速排…

linux 环境异常登录的甄别方法

1、关于linux的登录记录 查看最近登录IP和历史命令执行日期 last 显示的最末尾的 使用last -10 看最新的 登录IP地址 时间 still仍在登录 选项&#xff1a; &#xff08;1&#xff09;-x&#xff1a;显示系统开关机以及执行等级信息 &#xff08;2&#xff09;-a&am…

SpringSecurity[3]-自定义登录逻辑,自定义登录页面,以及认证过程的其他配置

前一篇:SpringSecurity[2]-UserDetailsService详解以及PasswordEncoder密码解析器详解 链接:SpringSecurity[2]-UserDetailsService详解以及PasswordEncoder密码解析器详解_豆虫儿的博客-CSDN博客 五、自定义登录逻辑 当进行自定义登录逻辑时需要用到之前讲解的UserDetailsS…