sparkstreaming
1.批处理与流处理
spark本身作为引擎时是批处理,从信息源全部读取数据,然后一批一批处理数据。处理sparkSQL等之后再存入hdfs。
sparkstreaming是实时引擎,在一个窗口时间内(比如1s)积攒数据,然后处理。更像伪实时处理。
DStream 就是streaming在一段十时间内收集数据的抽象,类似于RDD。
DStream内部是由一系列连续的RDD组成的.
2.DEMO
package org
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @author noor9
* @date 2021-02-01-19:55
*/
object StreamingWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[6]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.sparkContext.setLogLevel("WARN")
val lines: ReceiverInputDStream[String] = ssc.socketTextStream(
hostname = "xxx.xxx.xxx.xxxx",
port = 9999,
storageLevel = StorageLevel.MEMORY_AND_DISK_SER
)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>scala_config</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- 在
Spark
中, 一般使用XXContext
来作为入口,Streaming
也不例外, 所以创建StreamingContext
就是创建入口 - 开启
Socket
的Receiver
, 连接到某个TCP
端口, 作为Socket client
, 去获取数据 - 选择
Receiver
获取到数据后的保存方式, 此处是内存和磁盘都有, 并且序列化后保存 - 类似
RDD
中的Action
, 执行最后的数据输出和收集 - 启动流和
JobGenerator
, 开始流式处理数据 - 阻塞主线程, 后台线程开始不断获取数据并处理
注意点
-
Spark Streaming
并不是真正的来一条数据处理一条Spark Streaming
的处理机制叫做小批量, 英文叫做mini-batch
, 是收集了一定时间的数据后生成RDD
, 后针对RDD
进行各种转换操作, 这个原理提现在如下两个地方- 控制台中打印的结果是一个批次一个批次的, 统计单词数量也是按照一个批次一个批次的统计
- 多长时间生成一个
RDD
去统计呢? 由new StreamingContext(sparkConf, Seconds(1))
这段代码中的第二个参数指定批次生成的时间
-
Spark Streaming
中至少要有两个线程在使用
spark-submit
启动程序的时候, 不能指定一个线程- 主线程被阻塞了, 等待程序运行
- 需要开启后台线程获取数据
创建 StreamingContext
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
-
StreamingContext
是Spark Streaming
程序的入口 -
在创建
StreamingContext
的时候, 必须要指定两个参数, 一个是SparkConf
, 一个是流中生成RDD
的时间间隔 -
StreamingContext
提供了如下功能
- 创建
DStream
, 可以通过读取Kafka
, 读取Socket
消息, 读取本地文件等创建一个流, 并且作为整个DAG
中的InputDStream
RDD
遇到Action
才会执行, 但是DStream
不是,DStream
只有在StreamingContext.start()
后才会开始接收数据并处理数据- 使用
StreamingContext.awaitTermination()
等待处理被终止 - 使用
StreamingContext.stop()
来手动的停止处理
- 创建
-
在使用的时候有如下注意点
- 同一个
Streaming
程序中, 只能有一个StreamingContext
- 一旦一个
Context
已经启动 (start
), 则不能添加新的数据源 **
- 同一个
各种算子
- 这些算子类似
RDD
, 也会生成新的DStream
- 这些算子操作最终会落到每一个
DStream
生成的RDD
中
算子 | 释义 |
---|---|
flatMap | lines.flatMap(_.split(" ")) 将一个数据一对多的转换为另外的形式, 规则通过传入函数指定 |
map | words.map(x => (x, 1)) 一对一的转换数据 |
reduceByKey | words.reduceByKey(_ + _) 这个算子需要特别注意, 这个聚合并不是针对于整个流, 而是针对于某个批次的数据 |
SparkStreaming原理
- 静态
DAG
- 动态切分
- 数据流入
- 容错机制
关于receiver的一些知识
- receiver是分片的
- receiver可以在每一个executer中运行
- receiver是专门用于接受数据的一个组件
3.MovieDemo
package org
import org.apache.spark.sql.SparkSession
object MoviesDemo {
def main(args: Array[String]): Unit = {
// 创建一个SparkContext来初始化Spark
// 2.0 以前的用法
// val conf = new SparkConf().setMaster("local").setAppName("movie demo")
// val sc = new SparkContext(conf)
// 2.0 以后的用法
val spark = SparkSession.builder().master("local[*]").appName("movie demo").getOrCreate()
val sc = spark.sparkContext
// 加载数据,构造RDD(注:这里数据集放在项目的src/data/movielens/目录下)
val ratingsFile = "src/data/movielens/rating.csv" // 评分数据集
val moviesFile = "src/data/movielens/movie.csv" // 电影数据集
val ratingsRDD = sc.textFile(ratingsFile)
val moviesRDD = sc.textFile(moviesFile)
// 从评分数据集中抽取每部电影的评分,以(movieid, rating)的形式返回
// 因为第一行是标题行,所以过滤掉
val movieAvgScore = ratingsRDD
.filter(line => !line.startsWith("\"userId\""))
.map(line => {val fields = line.split(","); (fields(1).trim.toInt, fields(2).trim.toDouble)})
.groupByKey()
.map(t => (t._1, t._2.sum/t._2.size))
.filter(t => t._2 > 4.0)
// 从电影数据集中抽取电影名称,以(movieId, movieName)的形式返回
// 因为第一行是标题行,所以过滤掉
val moviesInfo = moviesRDD
.filter(line => !line.startsWith("\"movieId\""))
.map(line => {val fields = line.split(","); (fields(0).toInt, fields(1))})
// 将两个数据集连接起来,得到(movieId, movieName, avgScore)
val result = movieAvgScore.join(moviesInfo)
.map(f => (f._2._1,(f._1, f._2._2, f._2._1)))
.sortByKey(ascending = false)
.map(t => t._2)
// 列表显示
result.collect.foreach(println)
// 将查询结果保存到HDFS文件系统中
//result.saveAsTextFile("src/data/movielens/result")
result.repartition(1).saveAsTextFile("file:///C://Users//esvtek//IdeaProjects//scala_config//src//data//movielens//result")
}
}
4.开发依赖项
还有,对于从Kafka、Flume以及Kinesis这类数据源提取数据的流式应用来说,还需要额外增加相应的依赖项,下表列出了各种数据源对应的额外依赖项:
数据源 | Maven工件 |
---|---|
Kafka | spark-streaming-kafka_2.10 |
Flume | spark-streaming-flume_2.10 |
Kinesis | spark-streaming-kinesis-asl_2.10 [Amazon Software License] |
spark-streaming-twitter_2.10 | |
ZeroMQ | spark-streaming-zeromq_2.10 |
MQTT | spark-streaming-mqtt_2.10 |
5.初始化
要初始化任何一个Spark Streaming程序,都需要在入口代码中创建一个StreamingContext对象。
而StreamingContext对象需要一个SparkConf对象作为其构造参数。
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
上面代码中的 appName 是你给该应用起的名字,这个名字会展示在Spark集群的web UI上。而 master 是Spark, Mesos or YARN cluster URL,如果支持本地测试,你也可以用”local[]”为其赋值。通常在实际工作中,你不应该将master参数硬编码到代码里,而是应用通过spark-submit的参数来传递master的值(launch the application with spark-submit
)。不过对本地测试来说,”local[]”足够了(该值传给master后,Spark Streaming将在本地进程中,启动n个线程运行,n与本地系统CPU core数相同)。注意,StreamingContext在内部会创建一个 SparkContext 对象(SparkContext是所有Spark应用的入口,在StreamingContext对象中可以这样访问:ssc.sparkContext)。
StreamingContext还有另一个构造参数,即:批次间隔,这个值的大小需要根据应用的具体需求和可用的集群资源来确定。详见Spark性能调优( Performance Tuning)。
StreamingContext对象也可以通过已有的SparkContext对象来创建,示例如下:
val sc = ... // 已有的SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
context对象创建后,你还需要如下步骤:
- 创建DStream对象,并定义好输入数据源。
- 基于数据源DStream定义好计算逻辑和输出。
- 调用streamingContext.start() 启动接收并处理数据。
- 调用streamingContext.awaitTermination() 等待流式处理结束(不管是手动结束,还是发生异常错误)
- 你可以主动调用 streamingContext.stop() 来手动停止处理流程。
需要关注的重点:
- 一旦streamingContext启动,就不能再对其计算逻辑进行添加或修改。
- 一旦streamingContext被stop掉,就不能restart。
- 单个JVM虚机同一时间只能包含一个active的StreamingContext。
- StreamingContext.stop() 也会把关联的SparkContext对象stop掉,如果不想把SparkContext对象也stop掉,可以将StreamingContext.stop的可选参数 stopSparkContext 设为false。
- 一个SparkContext对象可以和多个StreamingContext对象关联,只要先对前一个StreamingContext.stop(sparkContext=false),然后再创建新的StreamingContext对象即可。
- 如果本地运行Spark Streaming应用,记得不能将master设为”local” 或 “local[1]”。这两个值都只会在本地启动一个线程。而如果此时你使用一个包含接收器(如:套接字、Kafka、Flume等)的输入DStream,那么这一个线程只能用于运行这个接收器,而处理数据的逻辑就没有线程来执行了。因此,本地运行时,一定要将master设为”local[n]”,其中 n > 接收器的个数。
- 将Spark Streaming应用置于集群中运行时,同样,分配给该应用的CPU core数必须大于接收器的总数。否则,该应用就只会接收数据,而不会处理数据。
6.可靠性
从可靠性角度来划分,大致有两种数据源。其中,像Kafka、Flume这样的数据源,它们支持对所传输的数据进行确认。系统收到这类可靠数据源过来的数据,然后发出确认信息,这样就能够确保任何失败情况下,都不会丢数据。因此我们可以将接收器也相应地分为两类:
- 可靠接收器(Reliable Receiver) – 可靠接收器会在成功接收并保存好Spark数据副本后,向可靠数据源发送确认信息。
- *不可靠接收器(*Unreliable Receiver) – 不可靠接收器不会发送任何确认信息。不过这种接收器常用语于不支持确认的数据源,或者不想引入数据确认的复杂性的数据源。
7.缓存/持久化
和RDD类似,DStream也支持将数据持久化到内存中。只需要调用 DStream的persist() 方法,该方法内部会自动调用DStream中每个RDD的persist方法进而将数据持久化到内存中。这对于可能需要计算很多次的DStream非常有用(例如:对于同一个批数据调用多个算子)。对于基于滑动窗口的算子,如:reduceByWindow和reduceByKeyAndWindow,或者有状态的算子,如:updateStateByKey,数据持久化就更重要了。因此,滑动窗口算子产生的DStream对象默认会自动持久化到内存中(不需要开发者调用persist)。
对于从网络接收数据的输入数据流(如:Kafka、Flume、socket等),默认的持久化级别会将数据持久化到两个不同的节点上互为备份副本,以便支持容错。
注意,与RDD不同的是,DStream的默认持久化级别是将数据序列化到内存中。进一步的讨论见性能调优这一小节。
8.检查点/checkpoint
streaming一般长时间运行,所以需要有一个备份机制,防止意外宕机时数据重头计算。
检查点一般保存以下两种数据。
元数据检查点(Metadata checkpointing)
– 保存流式计算逻辑的定义信息到外部可容错存储系统(如:HDFS)。主要用途是用于在故障后回复应用程序本身(后续详谈)。元数包括:
- Configuration – 创建Streaming应用程序的配置信息。
- DStream operations – 定义流式处理逻辑的DStream操作信息。
- Incomplete batches – 已经排队但未处理完的批次信息。
数据检查点(Data checkpointing)
– 将生成的RDD保存到可靠的存储中。这对一些需要跨批次组合数据或者有状态的算子来说很有必要。在这种转换算子中,往往新生成的RDD是依赖于前几个批次的RDD,因此随着时间的推移,有可能产生很长的依赖链条。为了避免在恢复数据的时候需要恢复整个依赖链条上所有的数据,检查点需要周期性地保存一些中间RDD状态信息,以斩断无限制增长的依赖链条和恢复时间。
检查点的启用时间
- 使用了有状态的转换算子(Usage of stateful transformations) – 不管是用了 updateStateByKey 还是用了 reduceByKeyAndWindow(有”反归约”函数的那个版本),你都必须配置检查点目录来周期性地保存RDD检查点。
- 支持驱动器故障中恢复(Recovering from failures of the driver running the application) – 这时候需要元数据检查点以便恢复流式处理的进度信息。
注意,一些简单的流式应用,如果没有用到前面所说的有状态转换算子,则完全可以不开启检查点。不过这样的话,驱动器(driver)故障恢复后,有可能会丢失部分数据(有些已经接收但还未处理的数据可能会丢失)。不过通常这点丢失时可接受的,很多Spark Streaming应用也是这样运行的。对非Hadoop环境的支持未来还会继续改进。