【Spark分布式内存计算框架——Spark Streaming】5. DStream(上)

news2024/10/2 6:30:38

3. DStream

SparkStreaming模块将流式数据封装的数据结构:DStream(Discretized Stream,离散化数据流,连续不断的数据流),代表持续性的数据流和经过各种Spark算子操作后的结果数据流。

3.1 DStream 是什么

离散数据流(DStream)是Spark Streaming最基本的抽象。它代表了一种连续的数据流,要么从某种数据源提取数据,要么从其他数据流映射转换而来。DStream内部是由一系列连续的RDD组成的,每个RDD都包含了特定时间间隔内的一批数据,如下图所示:
在这里插入图片描述
DStream本质上是一个:一系列时间上连续的RDD(Seq[RDD]),DStream = Seq[RDD]。

DStream = Seq[RDD]
DStream相当于一个序列(集合),里面存储的数据类型为RDD(Streaming按照时间间隔划分流式数据)

对DStream的数据进行操作也是按照RDD为单位进行的。
在这里插入图片描述
通过WEB UI界面可知,对DStream调用函数操作,底层就是对RDD进行操作,发现很多时候DStream中函数与RDD中函数一样的。
在这里插入图片描述
DStream中每批次数据RDD在处理时,各个RDD之间存在依赖关系,DStream直接也有依赖关系,RDD具有容错性,那么DStream也具有容错性。
在这里插入图片描述
上图相关说明:
1)、每一个椭圆形表示一个RDD
2)、椭圆形中的每个圆形代表一个RDD中的一个Partition分区
3)、每一列的多个RDD表示一个DStream(图中有三列所以有三个DStream)
4)、每一行最后一个RDD则表示每一个Batch Size所产生的中间结果RDD

Spark Streaming将流式计算分解成多个Spark Job,对于每一时间段数据的处理都会经过Spark DAG图分解以及Spark的任务集的调度过程。

3.2 DStream Operations

DStream类似RDD,里面包含很多函数,进行数据处理和输出操作,主要分为两大类:

  • DStream#Transformations:将一个DStream转换为另一个DStream
    http://spark.apache.org/docs/2.4.5/streaming-programming-guide.html#transformations-on-dstreams
  • DStream#Output Operations:将DStream中每批次RDD处理结果resultRDD输出
    http://spark.apache.org/docs/2.4.5/streaming-programming-guide.html#output-operations-on-dstreams

函数概述
DStream中包含很多函数,大多数与RDD中函数类似,主要分为两种类型:

其一:转换函数【Transformation函数】
在这里插入图片描述
DStream中还有一些特殊函数,针对特定类型应用使用的函数,比如updateStateByKey状态函数、window窗口函数等,后续具体结合案例讲解。

其二:输出函数【Output函数】
在这里插入图片描述

DStream中每批次结果RDD输出使用foreachRDD函数,前面使用的print函数底层也是调用foreachRDD函数,截图如下所示:
在这里插入图片描述
在DStream中有两个重要的函数,都是针对每批次数据RDD进行操作的,更加接近底层,性能更好,强烈推荐使用:

  • 转换函数transform:将一个DStream转换为另外一个DStream;
  • 输出函数foreachRDD:将一个DStream输出到外部存储系统;

在SparkStreaming企业实际开发中,建议:能对RDD操作的就不要对DStream操作,当调用DStream中某个函数在RDD中也存在,使用针对RDD操作。

转换函数:transform
通过源码认识transform函数,有两个方法重载,声明如下:
在这里插入图片描述
接下来使用transform函数,修改词频统计程序,具体代码如下:

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 基于IDEA集成开发环境,编程实现从TCP Socket实时读取流式数据,对每批次中数据进行词频统计。
*/
object StreamingTransformRDD {
def main(args: Array[String]): Unit = {
// 1. 构建StreamingContext流式上下文实例对象
val ssc: StreamingContext = {
// a. 创建SparkConf对象,设置应用配置信息
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[3]")
// b.创建流式上下文对象, 传递SparkConf对象,TODO: 时间间隔 -> 用于划分流式数据为很多批次Batch
val context = new StreamingContext(sparkConf, Seconds(5))
// c. 返回
context
}
// 2. 从数据源端读取数据,此处是TCP Socket读取数据
/*
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
*/
val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream(
"node1.itcast.cn", //
9999, //
// TODO: 设置Block存储级别为先内存,不足磁盘,副本为1
storageLevel = StorageLevel.MEMORY_AND_DISK
)
// TODO: 3. 对每批次的数据进行词频统计
/*
transform表示对DStream中每批次数据RDD进行操作
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
*/
// TODO: 在DStream中,能对RDD操作的不要对DStream操作。
val resultDStream: DStream[(String, Int)] = inputDStream.transform(rdd => {
val resultRDD: RDD[(String, Int)] = rdd
// 过滤不合格的数据
.filter(line => null != line && line.trim.length > 0)
// 按照分隔符划分单词
.flatMap(line => line.trim.split("\\s+"))
// 转换数据为二元组,表示每个单词出现一次
.map(word => (word, 1))
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item)
resultRDD
})
// 4. 将结果数据输出 -> 将每批次的数据处理以后输出
resultDStream.print(10)
// 5. 对于流式应用来说,需要启动应用
ssc.start()
// 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
ssc.awaitTermination()
// 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

查看WEB UI监控中每批次Batch数据执行Job的DAG图,直接显示针对RDD进行操作。
在这里插入图片描述

输出函数:foreachRDD
foreachRDD函数属于将DStream中结果数据RDD输出的操作,类似transform函数,针对每批次RDD数据操作,源码声明如下:
在这里插入图片描述
继续修改词频统计代码,自定义输出数据,具体代码如下:

import java.util.Date
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
/**
* 基于IDEA集成开发环境,编程实现从TCP Socket实时读取流式数据,对每批次中数据进行词频统计。
*/
object StreamingOutputRDD {
def main(args: Array[String]): Unit = {
// 1. 构建StreamingContext流式上下文实例对象
val ssc: StreamingContext = {
// a. 创建SparkConf对象,设置应用配置信息
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[3]")
// TODO:设置数据输出文件系统的算法版本为2
.set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
// b.创建流式上下文对象, 传递SparkConf对象,TODO: 时间间隔 -> 用于划分流式数据为很多批次Batch
val context = new StreamingContext(sparkConf, Seconds(5))
// c. 返回
context
}
// 2. 从数据源端读取数据,此处是TCP Socket读取数据
/*
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
*/
val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream(
"node1.itcast.cn", //
9999, //
// TODO: 设置Block存储级别为先内存,不足磁盘,副本为1
storageLevel = StorageLevel.MEMORY_AND_DISK
)
// 3. 对每批次的数据进行词频统计
/*
transform表示对DStream中每批次数据RDD进行操作
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
*/
// TODO: 在DStream中,能对RDD操作的不要对DStream操作。
val resultDStream: DStream[(String, Int)] = inputDStream.transform(rdd => {
val resultRDD: RDD[(String, Int)] = rdd
// 过滤不合格的数据
.filter(line => null != line && line.trim.length > 0)
// 按照分隔符划分单词
.flatMap(line => line.trim.split("\\s+"))
// 转换数据为二元组,表示每个单词出现一次
.map(word => (word, 1))
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item)
resultRDD
})
// TODO: 4. 将结果数据输出 -> 将每批次的数据处理以后输出
/*
对DStream中每批次结果RDD数据进行输出操作
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
其中Time就是每批次BatchTime,Long类型数据, 转换格式:2020/05/10 16:53:25
*/
resultDStream.foreachRDD{ (rdd, time) =>
// 使用lang3包下FastDateFormat日期格式类,属于线程安全的
val batchTime: String = FastDateFormat.getInstance("yyyyMMddHHmmss")
.format(new Date(time.milliseconds))
println("-------------------------------------------")
println(s"Time: $batchTime")
println("-------------------------------------------")
// TODO: 先判断RDD是否有数据,有数据在输出
if(!rdd.isEmpty()){
// 对于结果RDD输出,需要考虑降低分区数目
val resultRDD = rdd.coalesce(1)
// 对分区数据操作
resultRDD.foreachPartition{iter =>iter.foreach(item => println(item))}
// 保存数据至HDFS文件
resultRDD.saveAsTextFile(s"datas/streaming/wc-output-${batchTime}")
}
}
// 5. 对于流式应用来说,需要启动应用
ssc.start()
// 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
ssc.awaitTermination()
// 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

将SparkStreaming处理结果RDD数据保存到MySQL数据库或者HBase表中,代码该如何编写呢?
http://spark.apache.org/docs/2.4.5/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
伪代码如下所示:

// 数据输出,将分析处理结果数据输出到MySQL表
resultDStream.foreachRDD{(rdd, time) =>
// 将BatchTime转换:2019/10/10 14:59:35
val batchTime = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss").format(time.milliseconds)
println("-------------------------------------------")
println(s"Time: $batchTime")
println("-------------------------------------------")
// TODO:首先判断每批次结果RDD是否有值,有值才输出, 必须判断,提升性能
if(!rdd.isEmpty()){
rdd.foreachPartition{iter =>
// 第一步、获取连接:从数据库连接池中获取连接
val conn: Connection = null
// 第二步、保存分区数据到MySQL表
iter.foreach{item =>
 // TODO: 使用conn将数据保存到MySQL表中
}
// 第三步、关闭连接:将连接放入到连接池中
if(null != conn) conn.close()
}
}
}

将每批次数据统计结果RDD保存到HDFS文件中,代码如下:

resultDStream.foreachRDD{(rdd, time) =>
// 将BatchTime转换:2019/10/10 14:59:35
val batchTime = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss").format(time.milliseconds)
println("-------------------------------------------")
println(s"Time: $batchTime")
println("-------------------------------------------")
// TODO:首先判断每批次结果RDD是否有值,有值才输出, 必须判断,提升性能
if(!rdd.isEmpty()){
// 注意:将Streaming结果数据RDD保存文件中时,最好考虑降低分区数目
rdd.coalesce(1).saveAsTextFile(s"datas/spark/streaming/wc-${time.milliseconds}")
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/374429.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

镜像、复制和直线绘制CAD图形

这个CAD图形可以分成两个部分:上面一部分和下面一部分,都是有多个相同的对象,所以只需要绘制出其中一个再使用复制和旋转命令将其它的绘制出就可了,用到的主要CAD命令有CAD直线、CAD旋转、CAD镜像和CAD直线 目标对象 操作步骤 1…

利用递归实现括号匹配

案例引入以下则是各个字符串经过括号处理之后的结果:12((21))(12-->12(21)1232((((2121)212(21)-->32(2121)212(21)ABDF((SA)SA)SA(SA)SA(((-->ABDF((SA)SA)SA(SA)SA算法思路:这个问题的解决方法就是将字符按顺序逐一加入到新的string容器store…

python自学之《21天学通Python》(14)——第17章 Web网站编程

Web编程是程序设计应用之一,随着动态网站不断发展,Web编程已经成为程序设计的重要应用领域。目前Web编程主要有ASP.NET、PHP、Java等编程语言,Python语言也可以像其他语言一样应用于Web服务。 17.1 Web网站编程概述 Web是一个由许多互相链接…

mysql索引分析之二

mysql索引分析之一 mysql索引分析之二 mysql索引分析之二1 mysql的索引类型2 Explain执行计划2.1 执行计划之 id 属性2.1.1 id 的属性相同表示加载表的顺序是从上到下2.1.2 id 值越大,优先级越高2.1.3 id 有相同,也有不同,同时存在2.2 执行计…

浅析 Redis 主从同步与故障转移原理

我们在生产中使用 Redis,如果只部署一个 Redis 实例,当该实例宕机,到恢复之前都不可用;虽说 Redis 一般都用来做缓存,但不可用给业务系统带来的影响也是不小的,流量大时甚至会导致整个服务宕机。所以 Redis…

6.0.4:GrapeCity Documents for Excel GcExcel Crack

在更短的时间内生成 Excel 电子表格,不依赖于 Excel! 在任何应用程序中转换、计算、格式化和解析电子表格。 快速高效:其轻巧的尺寸意味着 Documents for Excel 针对快速处理大型 Excel 文档进行了优化 使用适用于 Windows、Linux 和 Mac 的…

Spring Cloud Sentinel实战(一)- Sentinel介绍

Sentinel介绍 什么是Sentinel 分布式系统的流量防卫兵:随着微服务的普及,服务调用的稳定性变得越来越重要。Sentinel以“流量”为切入点,在流量控制、断路、负载保护等多个领域开展工作,保障服务可靠性。 特点: 1. 2…

【尚硅谷MySQL入门到高级-宋红康】数据库概述

1、为什么要使用数据库 数据的持久化 2、数据库与数据库管理系统 2.1 数据库的相关概念 2.2 数据库与数据库管理系统的关系 3、 MySQL介绍 MySQL从5.7版本直接跳跃发布了8.0版本 ,可见这是一个令人兴奋的里程碑版本。MySQL 8版本在功能上做了显著的改进与增强&a…

c++提高篇——STL常用算法

STL常用算法一、常用遍历算法一、for_each 遍历容器二、transform 搬运容器到另一个容器中二、常用查找算法一、find二、find_if三、adjacent_find四、binary_search五、count六、count_if三、常用排序算法一、sort二、random_shuffle三、 merage四、reverse四、常用拷贝和替换…

推荐系统遇上深度学习(一四三)-[快手]一致性终身用户行为建模方法TWIN

标题:《TWIN: TWo-stage Interest Network for Lifelong User Behavior Modeling in CTR Prediction at Kuaishou》链接:https://arxiv.org/pdf/2302.02352.pdf今天给大家分享的是快手近期发表的终身行为序列建模上的工作,当前工业界主流的方…

一文打通计算机字符编码

有关编码的基础知识 1. 位 bit 最小的单元 字节 byte 机器语言的单位 1byte8bits 1KB1024byte 1MB1024KB 1GB1024MB 2.进制 二进制 binary 八进制 octal 十进制 decimal 十六进制 hex 3.字符 字符:是各种文字和符号的总称&#x…

Linux 自带的 LED 灯驱动实验

目录 一、配置内核 二、设备树节点编写 1、确定compatible 属性值 2、编写节点 三、验证 测试 其实 Linux 内核已经自带了 LED 灯驱动,要使用 Linux 内核自带的 LED 灯驱动首先得先配置 Linux 内核,使能自带的 LED 灯驱动。 一、配置内核 在Linux内…

【LeetCode】剑指 Offer(10)

目录 题目:剑指 Offer 27. 二叉树的镜像 - 力扣(Leetcode) 题目的接口: 解题思路: 代码: 过啦!!! 题目:剑指 Offer 28. 对称的二叉树 - 力扣&#xff0…

【人脸识别】FROM:提升遮挡状态下的人脸识别效果

论文题目:《End2End Occluded Face Recognition by Masking Corrupted Features》 论文地址:https://arxiv.org/pdf/2108.09468v3.pdf 代码地址:https://github.com/haibo-qiu/from 1.前言 人脸识别技术已经取得了显著的进展,主要…

几个开源 RUST 安全算法库

这段时间把 RUST 语法过了一遍,写一些简单的 Demo 程序没啥问题了,但离掌握这门语言还差的远,需要项目实战才行。我决定从之前研究过的国密算法入手,使用 RUST 实现国密算法。从头编写算法不太现实,上网搜了一下&#…

pandas数据分析36——快速独热和反独热处理

做数据预处理的时候,很多文本分类变量需要变为数值型。 下面提供一些方法,就以最经典的泰但尼克号数据集作为例子。 先导包读取数据 import numpy as np import pandas as pd datapd.read_csv(train.csv) datadata.drop(columns[Name,Ticket,Cabin],…

jmeter报错: java.io.EOFException: Unexpected end of ZLIB input stream

一、背景: 1.1 报错信息 java.io.EOFException: Unexpected end of ZLIB input stream at java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:240) at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:158) at java.util.zip…

分割线-----

各位好,之前的文章就不再回复和更新了,都是大学的时候随便玩玩的。真的不能算学术研究。致敬过去的自己。努力真的可以幸福!虽然还是一样在搬砖!

Allegro如何快速锁定整板测试点操作指导

Allegro如何快速锁定整板测试点操作指导 在做PCB设计的时候,会需要给整板添加测试点,用于飞针测试,如下图 在测试点添加好之后,文件输出之前需要把测试点全部锁定,避免因为测试点模具开好,测试点被移动的情况出现 如果逐个锁定Via,容易遗漏 Allegro支持快速锁定整板测…

计算机网络高频知识点(一)

目录 一、http状态码 二、浏览器怎么数据缓存 三、强缓存与协商缓存 1、强缓存 2、协商缓存 四、简单请求与复杂请求 五、PUT 请求类型 六、GET请求类型 七、GET 和 POST 的区别 八、跨域 1、什么时候会跨域 2、解决方式 九、计算机网络的七层协议与五层协议分别指…