目录
0. 相关文章链接
1. 什么是Structured Streaming
2. Structure Streaming 快速入门
2.1. 导入依赖
2.2. 代码实现
2.3. 程序测试
2.4. 代码说明
0. 相关文章链接
Spark文章汇总
1. 什么是Structured Streaming
从 spark2.0 开始, spark 引入了一套新的流式计算模型: Structured Streaming。该组件进一步降低了处理数据的延迟时间, 它实现了“有且仅有一次(Exectly Once)” 语义, 可以保证数据被精准消费。Structured Streaming 基于 Spark SQl 引擎, 是一个具有弹性和容错的流式处理引擎。 使用 Structure Streaming 处理流式计算的方式和使用批处理计算静态数据(表中的数据)的方式是一样的。随着流数据的持续到达, Spark SQL 引擎持续不断的运行并得到最终的结果。 我们可以使用 Dataset/DataFrame API 来表达流的聚合, 事件-时间窗口(event-time windows), 流-批处理连接(stream-to-batch joins)等等。 这些计算都是运行在被优化过的 Spark SQL 引擎上。 最终, 通过 chekcpoin 和 WALs(Write-Ahead Logs), 系统保证end-to-end exactly-once。
总之, Structured Streaming 提供了快速, 弹性, 容错, end-to-end exactly-once 的流处理, 而用户不需要对流进行推理(比如 spark streaming 中的流的各种转换)。默认情况下, 在内部, Structured Streaming 查询使用微批处理引擎(micro-batch processing engine)处理, 微批处理引擎把流数据当做一系列的小批job(small batch jobs ) 来处理。 所以, 延迟低至 100 毫秒, 从 Spark2.3, 引入了一个新的低延迟处理模型:Continuous Processing, 延迟低至 1 毫秒。
2. Structure Streaming 快速入门
2.1. 导入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>
2.2. 代码实现
object StreamTest {
def main(args: Array[String]): Unit = {
// 1. 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("StreamTest")
.getOrCreate()
import spark.implicits._
// 2. 从数据源(socket)中加载数据.
val lines: DataFrame = spark.readStream
.format("socket") // 设置数据源
.option("host", "localhost")
.option("port", 9999)
.load
// 3. 把每行数据切割成单词
val words: Dataset[String] = lines.as[String].flatMap((_: String).split("\\W"))
// 4. 计算 word count
val wordCounts: DataFrame = words.groupBy("value").count()
// 5. 启动查询, 把结果打印到控制台
val query: StreamingQuery = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start
query.awaitTermination()
spark.stop()
}
}
2.3. 程序测试
步骤一:在windows上启动socket并输入数据
步骤二:查看程序输出结果
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|abcabc| 2|
| hello| 1|
+------+-----+
-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|abcabc| 2|
| hello| 1|
| spark| 2|
| abc| 1|
+------+-----+
-------------------------------------------
Batch: 3
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|abcabc| 2|
| hello| 1|
| spark| 2|
| abc| 3|
+------+-----+
2.4. 代码说明
- DataFrame lines 表示一个“无界表(unbounded table)”, 存储着流中所有的文本数据。 这个无界表包含列名为value的一列数据, 数据的类型为String, 而且在流式文本数据中的每一行(line)就变成了无界表中的的一行(row)。 注意, 这时我们仅仅设置了转换操作, 还没有启动它, 所以现在还没有收到任何数据
- 紧接着我们把 DateFrame 通过 as[String] 变成了 DataSet, 所以我们可以切割每行为多个单词。得到的 words DataSet包含了所有的单词。
- 最后, 我们通过value(每个唯一的单词)进行分组得到wordCounts DataFrame, 并且统计每个单词的个数。 注意, wordCounts是一个流式DataFrame, 它表示流中正在运行的单词数(the running word counts of the stream)。
- 我们必须在流式数据(streaming data)上启动查询。 剩下的实际就是开始接收数据和计算个数。 为此, 当数据更新的时候, 我们通过outputMode("complete")来打印完整的计数集到控制台, 然后通过。start来启动流式计算。
- 代码执行之后, 流式计算将会在后台启动。 查询对象(query: StreamingQuery)可以激活流式查询(streaming query), 然后通过awaitTermination()来等待查询的终止,从而阻止查询激活之后进程退出。
注:其他Spark相关系列文章链接由此进 -> Spark文章汇总