目录
0. 相关文章链接
1. DStream转换概述
2. 无状态转化操作
2.1. Transform
2.2. join
3. 有状态转化操作
3.1. UpdateStateByKey
3.2. WindowOperations
0. 相关文章链接
Spark文章汇总
1. DStream转换概述
DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种 Window 相关的原语。
2. 无状态转化操作
无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。部分无状态转化操作列在了下表中。注意,针对键值对的 DStream 转化操作(比如reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用。
需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个 DStream 在内部是由许多 RDD(批次)组成,且无状态转化操作是分别应用到每个 RDD 上的。 例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。
2.1. Transform
Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream 的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamTest {
def main(args: Array[String]): Unit = {
//初始化Spark配置信息
val sparkConf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("StreamTest")
//初始化SparkStreamingContext,并设置CK
val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(3))
ssc.checkpoint("./checkpoint")
//创建DStream
val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
//转换为RDD操作
val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(
(rdd: RDD[String]) => {
val words: RDD[String] = rdd.flatMap(_.split(" "))
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
value
}
)
//打印
wordAndCountDStream.print
//启动
ssc.start()
ssc.awaitTermination()
}
}
2.2. join
两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。
代码如下:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamTest {
def main(args: Array[String]): Unit = {
//1.初始化Spark配置信息
val sparkConf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("StreamTest")
//2.初始化SparkStreamingContext,并设置CK
val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(30))
ssc.checkpoint("./checkpoint")
//3.从端口获取数据创建流
val lineDStream1: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val lineDStream2: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 8888)
//4.将两个流转换为KV类型
val wordToOneDStream: DStream[(String, Int)] = lineDStream1.flatMap(_.split(" ")).map((_, 1))
val wordToTwoDStream: DStream[(String, String)] = lineDStream2.flatMap(_.split(" ")).map((_, "a"))
//5.流的JOIN
val joinDStream: DStream[(String, (Int, String))] = wordToOneDStream.join(wordToTwoDStream)
//6.打印
wordToOneDStream.print()
wordToTwoDStream.print()
joinDStream.print()
//7.启动任务
ssc.start()
ssc.awaitTermination()
}
}
数据输入如下:
数据输出如下:
-------------------------------------------
Time: 1689237570000 ms
-------------------------------------------
(abc,1)
(hello,1)
(b,1)
-------------------------------------------
Time: 1689237570000 ms
-------------------------------------------
(abc,a)
(hello,a)
(a,a)
-------------------------------------------
Time: 1689237570000 ms
-------------------------------------------
(abc,(1,a))
(hello,(1,a))
-------------------------------------------
Time: 1689237600000 ms
-------------------------------------------
-------------------------------------------
Time: 1689237600000 ms
-------------------------------------------
-------------------------------------------
Time: 1689237600000 ms
-------------------------------------------
3. 有状态转化操作
3.1. UpdateStateByKey
UpdateStateByKey 原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。 updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。
updateStateByKey 操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步:
- 定义状态,状态可以是一个任意的数据类型。
- 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态,更新版的wordCount如下:
- 编写代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamTest {
def main(args: Array[String]): Unit = {
// 创建执行环境,并设置checkpoint
val conf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("StreamTest")
val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("./checkpoint")
// 获取数据,并进行解析
val dataDStream: DStream[(String, Int)] = ssc
.socketTextStream("localhost", 9999)
.flatMap((_: String).split(" "))
.map((word: String) => (word, 1))
// 使用updateStateByKey来更新状态,统计从运行开始以来单词总的次数
// 会根据传入的key进行划分,将相同的key划分到同一个渠道中
// 其中values参数是当前批次的数据,比如当前批次传入的数据是 abc hello abc,那就是 (abc,1) (hello,1) (abc,1)
// 其中state为以往批次单词频度,比如之前传入的2个批次的数据是 abc hello abc和abc hello abc,那就是 (abc,4) (hello,2)
// 注意:上述2个参数中只包含对应的值,不包括key,key在算子中就已经进行划分了
// 此时在方法中,就可以对取出的value进行运算了(比如sum,avg等运算)
val stateDstream: DStream[(String, Int)] = dataDStream
.updateStateByKey[Int](
(values: Seq[Int], state: Option[Int]) => {
println("values开始")
values.foreach((data: Int) => {
println("values == " + data)
})
println("values结束")
println("state开始")
state.foreach((data: Int) => {
println("state == " + data)
})
println("state结束")
val currentCount: Int = values.sum
val previousCount: Int = state.getOrElse(0)
Some(currentCount + previousCount)
}
)
// 打印数据
stateDstream.print()
// 启动环境
ssc.start()
ssc.awaitTermination()
}
}
- 启动程序并向 9999 端口发送数据:
- 结果展示:
/*
第一个批次说明解析:
因为之前没有state,所以state开始就结束了,没有打印日志出来
而values的话,是当前批次的数据,每条数据就会打印一条日志,所以有3条打印,并且因为是2个key,所以有2个values开始和结束
*/
-------------------------------------------
Time: 1689240910000 ms
-------------------------------------------
values开始
values == 1
values == 1
values结束
state开始
state结束
values开始
values == 1
values结束
state开始
state结束
/*
第二个批次说明解析:
在此批次中,有state数据了,并且还是2个key,所以打印了2个stats开始和结束;而且state存储的是历史状态累积,所以这是这2个key在之前批次的数据累积
而values的话,是当前批次的数据,每条数据就会打印一条日志,所以有6条打印,并且因为是2个key,所以有2个values开始和结束
*/
-------------------------------------------
Time: 1689240920000 ms
-------------------------------------------
(abc,2)
(hello,1)
values开始
values == 1
values == 1
values结束
state开始
state == 1
state结束
values开始
values == 1
values == 1
values == 1
values == 1
values结束
state开始
state == 2
state结束
/*
第三个批次说明解析:
在此批次中,有state数据了,并且还是2个key,所以打印了2个stats开始和结束;而且state存储的是历史状态累积,所以这是这2个key在之前批次的数据累积(比第2个批次的数据值增加了)
而values的话,是当前批次的数据,在这个批次中没有数据输入,所以没有values的值,但是还是那2个key,所以打印了2个values开始和结束
*/
-------------------------------------------
Time: 1689240930000 ms
-------------------------------------------
(abc,6)
(hello,3)
values开始
values结束
state开始
state == 6
state结束
values开始
values结束
state开始
state == 3
state结束
3.2. WindowOperations
Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
- 窗口时长:计算内容的时间范围;
- 滑动步长:隔多久触发一次计算;
注意:这两者都必须为采集周期大小的整数倍。
WordCount窗口计算版本,如下所示:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamTest {
def main(args: Array[String]): Unit = {
// 创建执行环境,并设置checkpoint
val conf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("StreamTest")
val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("./checkpoint")
// 接收数据,并开窗统计
// 在如下的reduceByKeyAndWindow方法中,第一个参数是数据处理方法,第二个参数窗口的大小,第三个是滑动的步长
// 如果没有第三个参数,那就是滚动窗口
val wordCounts: DStream[(String, Int)] = ssc
.socketTextStream("localhost", 9999)
.flatMap((_: String).split(" "))
.map((word: String) => (word, 1))
.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(20), Seconds(10))
// 结果打印
wordCounts.print()
// 启动环境
ssc.start()
ssc.awaitTermination()
}
}
关于 Window 的操作还有如下方法:
- window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream;
- countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
- reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;
- reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数据使用 reduce 函数来整合每个 key 的 value 值。
- reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的 reduce 值都是通过用前一个窗的 reduce 值来递增计算。通过 reduce 进入到滑动窗口数据并”反向 reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对 keys 的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于” 可逆的 reduce 函数”,也就是这些 reduce 函数有相应的”反 reduce”函数(以参数 invFunc 形式传入)。如前述函数,reduce 任务的数量通过可选参数来配置。
val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
val ipCountDStream = ipDStream
.reduceByKeyAndWindow(
{(x, y) => x + y},
{(x, y) => x - y},
Seconds(30),
Seconds(10)
)
//加上新进入窗口的批次中的元素
//移除离开窗口的老批次中的元素
//窗口时长
//滑动步长
countByWindow()和 countByValueAndWindow()作为对数据进行计数操作的简写。countByWindow()返回一个表示每个窗口中元素个数的 DStream,而 countByValueAndWindow()返回的 DStream 则包含窗口中每个值的个数。
val ipDStream = accessLogsDStream.map{
entry => entry.getIpAddress()
}
val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10))
val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
注:其他Spark相关系列文章链接由此进 -> Spark文章汇总