Spark(35):Structured Streaming 概述

news2024/12/30 3:10:46

目录

0. 相关文章链接

1. 什么是Structured Streaming

2. Structure Streaming 快速入门

2.1. 导入依赖

2.2. 代码实现

2.3. 程序测试

2.4. 代码说明


0. 相关文章链接

 Spark文章汇总 

1. 什么是Structured Streaming

        从 spark2.0 开始, spark 引入了一套新的流式计算模型: Structured Streaming。该组件进一步降低了处理数据的延迟时间, 它实现了“有且仅有一次(Exectly Once)” 语义, 可以保证数据被精准消费。Structured Streaming 基于 Spark SQl 引擎, 是一个具有弹性和容错的流式处理引擎。 使用 Structure Streaming 处理流式计算的方式和使用批处理计算静态数据(表中的数据)的方式是一样的。随着流数据的持续到达, Spark SQL 引擎持续不断的运行并得到最终的结果。 我们可以使用 Dataset/DataFrame API 来表达流的聚合, 事件-时间窗口(event-time windows), 流-批处理连接(stream-to-batch joins)等等。 这些计算都是运行在被优化过的 Spark SQL 引擎上。 最终, 通过 chekcpoin 和 WALs(Write-Ahead Logs), 系统保证end-to-end exactly-once。

        总之, Structured Streaming 提供了快速, 弹性, 容错, end-to-end exactly-once 的流处理, 而用户不需要对流进行推理(比如 spark streaming 中的流的各种转换)。默认情况下, 在内部, Structured Streaming 查询使用微批处理引擎(micro-batch processing engine)处理, 微批处理引擎把流数据当做一系列的小批job(small batch jobs ) 来处理。 所以, 延迟低至 100 毫秒, 从 Spark2.3, 引入了一个新的低延迟处理模型:Continuous Processing, 延迟低至 1 毫秒。

2. Structure Streaming 快速入门

2.1. 导入依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.3</version>
</dependency>

2.2. 代码实现

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 1. 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 2. 从数据源(socket)中加载数据.
        val lines: DataFrame = spark.readStream
            .format("socket") // 设置数据源
            .option("host", "localhost")
            .option("port", 9999)
            .load

        // 3. 把每行数据切割成单词
        val words: Dataset[String] = lines.as[String].flatMap((_: String).split("\\W"))

        // 4. 计算 word count
        val wordCounts: DataFrame = words.groupBy("value").count()

        // 5. 启动查询, 把结果打印到控制台
        val query: StreamingQuery = wordCounts.writeStream
            .outputMode("complete")
            .format("console")
            .start

        query.awaitTermination()
        spark.stop()

    }
}

2.3. 程序测试

步骤一:在windows上启动socket并输入数据

步骤二:查看程序输出结果

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|abcabc|    2|
| hello|    1|
+------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|abcabc|    2|
| hello|    1|
| spark|    2|
|   abc|    1|
+------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|abcabc|    2|
| hello|    1|
| spark|    2|
|   abc|    3|
+------+-----+

2.4. 代码说明

  1. DataFrame lines 表示一个“无界表(unbounded table)”, 存储着流中所有的文本数据。 这个无界表包含列名为value的一列数据, 数据的类型为String, 而且在流式文本数据中的每一行(line)就变成了无界表中的的一行(row)。 注意, 这时我们仅仅设置了转换操作, 还没有启动它, 所以现在还没有收到任何数据
  2. 紧接着我们把 DateFrame 通过 as[String] 变成了 DataSet, 所以我们可以切割每行为多个单词。得到的 words DataSet包含了所有的单词。
  3. 最后, 我们通过value(每个唯一的单词)进行分组得到wordCounts DataFrame, 并且统计每个单词的个数。 注意, wordCounts是一个流式DataFrame, 它表示流中正在运行的单词数(the running word counts of the stream)。
  4. 我们必须在流式数据(streaming data)上启动查询。 剩下的实际就是开始接收数据和计算个数。 为此, 当数据更新的时候, 我们通过outputMode("complete")来打印完整的计数集到控制台, 然后通过。start来启动流式计算。
  5. 代码执行之后, 流式计算将会在后台启动。 查询对象(query: StreamingQuery)可以激活流式查询(streaming query), 然后通过awaitTermination()来等待查询的终止,从而阻止查询激活之后进程退出。

注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/782889.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

新Viewport单位

本文为翻译本文译者为 360 奇舞团前端开发工程师原文标题&#xff1a;New Viewport Units原文作者&#xff1a;Ahmad Shadeed原文地址&#xff1a;https://ishadeed.com/article/new-viewport-units/ 自 2012 年以来&#xff0c;我们一直在使用 CSS viewport 单位。它们对于帮助…

RGB简单人脸活体检测(Liveness Detection)

参考&#xff1a; https://github.com/minivision-ai/Silent-Face-Anti-Spoofing&#xff08;主要这个库&#xff09; https://github.com/computervisioneng/face-attendance-system&#xff08;使用案例&#xff09; ##概念&#xff1a; 活体检测是指针对人脸识别过程中的人脸…

TSDB - VictoriaMetrics 技术原理浅析

一、前言 在监控领域&#xff0c;通常需要指标存储组件TSDB&#xff0c;目前开源的TSDB组件比较多&#xff0c;各个组件性能、高可用性、维护成本等等各有差异。本文不分析选型问题&#xff0c;重点讲解VictoriaMetrics&#xff08;后面简称为vm&#xff09;。 有兴趣的朋友建议…

Linux中常用的一些shell命令

很多的时候我们知道有一个命令&#xff0c;但不知道它的详细用法&#xff0c;可以来搜索下。但有些时候压根不知道有这个命令&#xff0c;比如vimdiff和diff这两个命令&#xff0c;知道人就比较少。 本节内容主要汇总一下Linux中常用的一些shell命令。 1. 文件和目录操作 ls …

win11安装MySQL5.7.43的问题清单

文章目录 1、win11查看自己电脑有没有安装mysql法1法2 2、完全清除之前安装的mysql3、 mysql的安装法1法2 4、遇到的一些问题1) ‘mysql‘不是内部或外部命令&#xff0c;也不是可运行的程序或批处理文件2) 忘记mysql的密码3)mysql启动不了:本地计算机上的MySQL服务启动后停止4…

机器学习深度学习——torch.nn模块

机器学习&&深度学习——torch.nn模块 卷积层池化层激活函数循环层全连接层 torch.nn模块包含着torch已经准备好的层&#xff0c;方便使用者调用构建网络。 卷积层 卷积就是输入和卷积核之间的内积运算&#xff0c;如下图&#xff1a; 容易发现&#xff0c;卷积神经网…

汽车养护店服务难题,看帕凝怎样解决?

中国汽车市场庞大&#xff0c;入户已然成为标配&#xff0c;加之新能源汽车近些年高增量&#xff0c;更促进了行业增长。而汽车后市场也迎来了一系列变化&#xff0c;客户服务前后路径需完善&#xff0c;商家们应该如何数字化经营呢&#xff1f; 接下来让我们看看【帕凝汽车养…

提升内功之模拟实现库函数atoi

本文包含知识点&#xff1a; 库函数atoi的使用和模拟实现枚举常量的运用fgets代替gets函数读取字符串isspace isdigit库函数的使用 一、库函数atoi的介绍与使用atoi的介绍atoi的使用细节 二、库函数atoi的模拟实现 一、库函数atoi的介绍与使用 atoi的介绍 函数介绍 头文件——…

密码学学习笔记(十七 ):Edwards曲线数字签名算法 - edDSA

Edwards曲线数字签名算法(Edwards-curve Digital Signature Alogorithm, edDSA)由Daniel J. Bernstein等人在2011年提出&#xff0c;它是一种使用基于扭曲爱德华兹曲线的Schnorr签名变体的数字签名方案。 EdDSA的一个特殊之处在于&#xff0c;该方案不要求每次签名都是用全新的…

Spring项目如何创建?Bean对象是如何存储的?

博主简介&#xff1a;想进大厂的打工人博主主页&#xff1a;xyk:所属专栏: JavaEE进阶 目录 文章目录 一、创建Spring项目 1.1 创建Maven项目 2.2 配置国内源 二、Bean对象的存储和读取 2.1 添加spring配置文件 2.2 创建Bean对象 2.3 读取Bean对象 2.3.1 得到spring上下文对象…

前端技术Vue学习笔记--001

前端技术Vue学习笔记 文章目录 前端技术Vue学习笔记1、Vue2和Vue3比较2、Vue简介3、Vue快速上手4、插值表达式{{}}5、Vue响应式特性6、Vue指令6.1、v-html指令6.2、v-show指令和v-if指令6.3、v-else指令和v-else-if指令6.4、v-on指令6.4.1、v-on指令基础6.4.2、v-on调用传参 6.…

生命的样子

bbc纪录片《王朝》第一季就让我颇为震撼&#xff0c;第二季拖到现在才看&#xff0c;不过好在看了《晚酌de流派》之后&#xff0c;现在对待上好的游戏和视频都要颇有仪式感的情况下食用&#xff0c;夜深人静&#xff0c;配着暖灯&#xff0c;一杯茶&#xff0c;伴随大卫爱登堡的…

Lesson2——时间复杂度与空间复杂度

前言&#xff1a; 一个物品的好坏&#xff0c;我们可以通过体验、口碑、销量等因素判断。那一个算法的好坏我们怎么判断呢&#xff1f; 目录&#xff1a; 1. 算法的效率 2. 时间复杂度 3. 空间复杂度 4. 常见时间复杂度以及复杂度oj练习 一、算法的效率 1、如何衡量一个算…

react-draft-wysiwyg富文本编辑器

在React项目中使用 yarn add react-draft-wysiwyg draft-js or npm i react-draft-wysiwyg draft-js推荐在项目中单独创建一个富文本编辑器组件 import { Editor } from "react-draft-wysiwyg"; import { EditorState, convertToRaw, ContentState } from draft-js…

12、动手学深度学习——循环神经网络从零实现+Pytorch内置函数实现:代码详解

1、基础知识 参考文章&#xff1a;8.4. 循环神经网络 2、从零开始实现 本节将上述描述&#xff0c; 从头开始基于循环神经网络实现字符级语言模型。 这样的模型将在H.G.Wells的时光机器数据集上训练。 首先&#xff0c; 我们先读取数据集。 %matplotlib inline import math…

陆拾柒- 如何通过数据影响决策(二)

是否曾感觉自己已经很努力了&#xff0c;但却一直被人说表现的比以前差了。 虽然古语有云“眼见为实”&#xff0c;但着眼之处很有可能是错的。 一、某咖啡店近期销量 7月17日准备要开大会时&#xff0c;负责小程序渠道的同事看到7月17日趋势下跌之后&#xff0c;就开始想办法…

fatal: unable to connect to github.com:github.com[0:20.205.243.166]: errno=Unknown error

git&#xff1a;fatal: unable to connect to github.com:github.com[0:20.205.243.166]: errnoUnknown error 在 bash 执行命令 git clone 时 报 &#xff1a; fatal: unable to connect to github.com:github.com[0: 20.205.243.166]: errnoUnknown error 发生此错误是因为 g…

【C++】string类的模拟实现(增删查改,比大小,运算符重载)

文章目录 1.1大框架1.2基本函数&#xff1a;2.成员函数2.0构造函数2.05析构函数2.09拷贝构造函数补充&#xff1a;预留存储空间&#xff08;reserve&#xff09; 2.1增加字符&#xff08;push_back&#xff0c;append&#xff0c;s&#xff09;push_backappends 2. 删除字符&am…

SpringCloud-Alibaba之Seata处理分布式事务

一ID三组件模型 Transaction ID XID 全局唯一的事务ID Transaction Coordinator(TC) 事务协调器&#xff0c;维护全局事务的运行状态&#xff0c;负责协调并驱动全局事务的提交或回滚 Transaction Manager™ 控制全局事务的边界&#xff0c;负责开启一个全局事务&#xff0c;…

Java当中的深拷贝和浅拷贝

文章目录 一、前提二、浅拷贝1. BeanUtils实现浅拷贝 三、深拷贝1. 实现Cloneable接口并重写clone()方法&#xff1a;2. 使用序列化与反序列化&#xff1a; 一、前提 在计算机的内存中&#xff0c;每个对象都被赋予一个地址&#xff0c;该地址指向对象在内存中存储的位置。当我…