【Spark编程基础】第7章 Structured Streaming

news2025/1/12 23:02:23

系列文章目录


文章目录

  • 系列文章目录
  • 前言
  • 第7章 Structured Streaming
    • 7.1 概述
      • 7.1.1 基本概念
      • 7.1.2 两种处理模型
      • 7.1.3 Structured Streaming 和 Spark SQL、Spark Streaming 关系
    • 7.2 编写Structured Streaming程序的基本步骤
    • 7.3 输入源
      • 7.3.1 File源
      • 7.3.2 Kafka源
      • 7.3.3 Socket源
      • 7.3.4 Rate源
      • 7.3.1 File源
      • 7.3.2 Kafka源
      • 7.3.3 Socket源
      • 7.3.4 Rate源
    • 7.4 输出操作
    • 7.5 容错处理(自学)
    • 7.6 迟到数据处理(自学)
    • 7.7 查询的管理和监控(自学)
  • 总结


前言


第7章 Structured Streaming

7.1 概述

7.1.1 基本概念

  • Structured Streaming的关键思想是将实时数据流视为一张正在不断添加数据的表
  • 可以把流计算等同于在一个静态表上的批处理查询,Spark会在不断添加数据的无界输入表上运行计算,并进行增量查询
    在这里插入图片描述
  • 在无界表上对输入的查询将生成结果表,系统每隔一定的周期会触发对无界表的计算并更新结果表
    在这里插入图片描述

7.1.2 两种处理模型

(1)微批处理

  • Structured Streaming默认使用微批处理执行模型,这意味着Spark流计算引擎会定期检查流数据源,并对自上一批次结束后到达的新数据执行批量查询
  • 数据到达和得到处理并输出结果之间的延时超过100毫秒
    在这里插入图片描述

(2)持续处理

  • Spark从2.3.0版本开始引入了持续处理的试验性功能,可以实现流计算的毫秒级延迟
  • 在持续处理模式下,Spark不再根据触发器来周期性启动任务,而是启动一系列的连续读取、处理和写入结果的长时间运行的任务
    在这里插入图片描述

7.1.3 Structured Streaming 和 Spark SQL、Spark Streaming 关系

  • Structured Streaming处理的数据跟Spark Streaming一样,也是源源不断的数据流,区别在于,Spark Streaming采用的数据抽象是DStream(本质上就是一系列RDD),而Structured Streaming采用的数据抽象是DataFrame。
  • Structured Streaming可以使用Spark SQL的DataFrame/Dataset来处理数据流。虽然Spark SQL也是采用DataFrame作为数据抽象,但是,Spark SQL只能处理静态的数据,而Structured Streaming可以处理结构化的数据流。这样,Structured Streaming就将Spark SQL和Spark Streaming二者的特性结合了起来。
  • Structured Streaming可以对DataFrame/Dataset应用前面章节提到的各种操作,包括select、where、groupBy、map、filter、flatMap等。
  • Spark Streaming只能实现秒级的实时响应,而Structured Streaming由于采用了全新的设计方式,采用微批处理模型时可以实现100毫秒级别的实时响应,采用持续处理模型时可以支持毫秒级的实时响应。

7.2 编写Structured Streaming程序的基本步骤

  • 编写Structured Streaming程序的基本步骤包括:
    • 导入pyspark模块
    • 创建SparkSession对象
    • 创建输入数据源
    • 定义流计算过程
    • 启动流计算并输出结果
  • 实例任务:
    • 一个包含很多行英文语句的数据流源源不断到达,
    • Structured Streaming程序对每行英文语句进行拆分,并统计每个单词出现的频率

1.步骤1:导入pyspark模块

  • 导入PySpark模块,代码如下:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import explode

由于程序中需要用到拆分字符串和展开数组内的所有单词的功能,所以引用了来自pyspark.sql.functions里面的split和explode函数。

2.步骤2:创建SparkSession对象

  • 创建一个SparkSession对象,代码如下:
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()
 
    spark.sparkContext.setLogLevel('WARN')

3.步骤3:创建输入数据源

  • 创建一个输入数据源,从“监听在本机(localhost)的9999端口上的服务”那里接收文本数据,具体语句如下:
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

4.步骤4:定义流计算过程

  • 有了输入数据源以后,接着需要定义相关的查询语句,具体如下:
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)
wordCounts = words.groupBy("word").count()

5.步骤5:启动流计算并输出结果

  • 定义完查询语句后,下面就可以开始真正执行流计算,具体语句如下:
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="8 seconds") \
    .start()
 
query.awaitTermination()

在这里插入图片描述
在这里插入图片描述

7.3 输入源

  • File源(或称为“文件源”)以文件流的形式读取某个目录中的文件,支持的文件格式为csv、json、orc、parquet、text等。
  • 需要注意的是,文件放置到给定目录的操作应当是原子性的,即不能长时间在给定目录内打开文件写入内容,而是应当采取大部分操作系统都支持的、通过写入到临时文件后移动文件到给定目录的方式来完成。

7.3.1 File源

  • File源(或称为“文件源”)以文件流的形式读取某个目录中的文件,支持的文件格式为csv、json、orc、parquet、text等。
  • 需要注意的是,文件放置到给定目录的操作应当是原子性的,即不能长时间在给定目录内打开文件写入内容,而是应当采取大部分操作系统都支持的、通过写入到临时文件后移动文件到给定目录的方式来完成。
  • 一个实例:
    • 这里以一个JSON格式文件的处理来演示File源的使用方法,主要包括以下两个步骤:
      • 创建程序生成JSON格式的File源测试数据
      • 创建程序对数据进行统计

(1)创建程序生成JSON格式的File源测试数据

  • 为了演示JSON格式文件的处理,这里随机生成一些JSON格式的文件来进行测试。
  • 代码文件spark_ss_filesource_generate.py内容如下:
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
  • 这段程序首先建立测试环境,清空测试数据所在的目录,接着使用for循环一千次来生成一千个文件,
  • 文件名为“e-mall-数字.json”,
  • 文件内容是不超过100行的随机JSON行,行的格式是类似如下:
    • {“eventTime”: 1546939167, “action”: “logout”, “district”: “fujian”}\n

(2)创建程序对数据进行统计

  • spark_ss_filesource.py”,其代码内容如下:
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

(3)测试运行程序
在这里插入图片描述

7.3.2 Kafka源

7.3.3 Socket源

7.3.4 Rate源

7.3.1 File源

7.3.2 Kafka源

7.3.3 Socket源

7.3.4 Rate源

7.4 输出操作

7.5 容错处理(自学)

7.6 迟到数据处理(自学)

7.7 查询的管理和监控(自学)


总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/535758.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

开发框架Furion之WebApi+SqlSugar (一)

目录 1.开发环境 2.项目创建 2.1创建WebApi主项目 2.2 创建Start类库 2.3创建Model实体类库 2.4创建Application仓储业务类库 2.5创建Unility通用方法类库 3.基础功能配置 3.1 Model实体对象与数据表映射 3.2 基类仓储及动态Api接口配置 3.3 数据库IOC注册 3.4 Star…

Java字符串知多少:String、StringBuffer、StringBuilder

一、String 1、简介 String 是 Java 中使用得最频繁的一个类了,不管是作为开发者的业务使用,还是一些系统级别的字符使用, String 都发挥着重要的作用。String 是不可变的、final的,不能被继承,且 Java 在运行时也保…

【C++】哈希表-开散列闭散列

文章目录 哈希概念例子: 哈希冲突哈希函数哈希冲突解决方法1:闭散列 哈希表的闭散列实现闭散列结构设计**哈希表的插入过程:****哈希表的查找过程:**哈希表的删除过程:只能存储key为整形的元素 那其他类型怎么解决 CloseHash.h哈希表的开散列实现开散列概念开散列的最坏情况及解…

车机CarLauncher的Activity多屏模式WindowingMode为WINDOWING_MODE_MULTI_WINDOW疑问解析

hi,粉丝朋友们! IntDef(prefix { "WINDOWING_MODE_" }, value {WINDOWING_MODE_UNDEFINED,WINDOWING_MODE_FULLSCREEN,WINDOWING_MODE_MULTI_WINDOW,WINDOWING_MODE_PINNED,WINDOWING_MODE_SPLIT_SCREEN_PRIMARY,WINDOWING_MODE_SPLIT_SCREE…

nacos注册中心源码分析一之服务注册、服务心跳

源码分析 nacos客户端注册分析 依赖包 <dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency>Nacos的客户端是基于SpringBoot的自动装配实现的 看下依…

算法性能分析

一、时间复杂度分析 1.什么是时间复杂度 时间复杂度是一个函数&#xff0c;它定性描述该算法的运行时间。我们在软件开发中&#xff0c;时间复杂度就是用来方便开发者估算出程序运行的答题时间。 那么该如何估计程序运行时间呢&#xff0c;通常会估算算法的操作单元数量来代表…

10个顶级AI艺术生成器

人工智能 (AI) 不仅影响商业和医疗保健等行业。 通过开创人工智能生成艺术的新时代&#xff0c;它还在创意产业中发挥着越来越重要的作用。 人工智能技术和工具通常可供任何人广泛使用&#xff0c;这有助于创造全新一代的艺术家。 我们经常听说人工智能将自动化或接管所有人类…

Java中的正则表达式详解

文章和代码已经归档至【Github仓库&#xff1a;https://github.com/timerring/java-tutorial 】或者【AIShareLab】回复 java 也可获取。 文章目录 正则表达式为什么要学习正则表达式再提出几个问题解决之道-正则表达式正则表达式基本介绍介绍 正则表达式底层实现实例分析 正则…

Word控件Aspose.Words教程:设置图表数据标签的默认选项

Aspose.Words是一种高级Word文档处理API&#xff0c;用于执行各种文档管理和操作任务。API支持生成&#xff0c;修改&#xff0c;转换&#xff0c;呈现和打印文档&#xff0c;而无需在跨平台应用程序中直接使用Microsoft Word。 Aspose API支持流行文件格式处理&#xff0c;并…

新星计划2023【网络应用领域基础】————————Day4

常见的网络基础介绍 前言 我们学习了一些基础的网络协议&#xff0c;以及子网掩码和vlan&#xff0c;同时也做了个简单的单臂路由实验 这篇文章我将仔细的讲解单臂路由的应用和交换机二层接口类型&#xff0c;以及wireshark的教程。 一&#xff0c;交换机二层接口 交换机的二…

Everypixel: AI图片搜索引擎

【产品介绍】 Everypixel是一个基于人工智能的图片搜索引擎。可以搜索超过 50 个图片来源的优质的授权图库版权素材图片&#xff0c;还可以使用免费图案功能&#xff0c;找到适合自己需求的可定制无缝图案。 Everypixel利用深度学习和计算机视觉技术&#xff0c;为客户提供先进…

Taro小程序配置网络请求

目录 1. 创建目录结构2. 全局通用的config的变量配置3. 配置http网络请求4. 使用 1. 创建目录结构 在 src 目录下新建 service 目录&#xff0c;目录下创建 api 和 http 子目录在 src 目录下新建 config 配置文件 2. 全局通用的config的变量配置 在 config 文件中添加一下代…

日本进口Hioki IM3536 LCR测试仪

Hioki IM3536 LCR测试仪 测量频率DC&#xff0c;4Hz~8MHz 测量时间&#xff1a;最快1ms 基本精度&#xff1a;0.05% rdg 1mΩ以上的精度保证范围&#xff0c;也可安心进行低阻测量 可内部发生DC偏压测量 从研发到生产线活跃在各种领域中 测量频率4Hz~8MHz&#xff0c;精度…

【pyq文案】可可爱爱、脑回路清奇の朋友圈文案

1.人每一个身体器官都是无价之宝&#xff0c;全部加起来1个月3000 2.别人出门&#xff1a;辣妹风、复古风、学院风&#xff1b;我出门&#xff1a;打工的勤劳小蜜蜂 3.看见自己就烦&#xff0c;50出&#xff1b;和今天星期四没关系 4.上学时拿钱混日子&#xff0c;上班后拿日…

种子轮、天使轮等相关知识

我们可以通过查询企业的相关工具网站&#xff0c;查看企业是否上市、独角兽、瞪羚企业、上市企业等情况。 转载&#xff1a; https://zhuanlan.zhihu.com/p/565389690 科创板挂牌不属于上市&#xff0c;企业在挂牌之后要经过协会核准后可以进行股份登记挂牌&#xff0c;大概需要…

【Redis】Redis 命令之 Hash

文章目录 ⛄介绍⛄命令⛄RedisTemplate API⛄应用场景 ⛄介绍 Hash类型&#xff0c;也叫散列&#xff0c;其value是一个无序字典&#xff0c;类似于Java中的 HashMap 结构。 String结构是将对象序列化为JSON字符串后存储&#xff0c;当需要修改对象某个字段时很不方便&#xf…

java服务-常用技术-生僻函数、方法、技巧

一、字符串操作 1. 需要转义的字符 java字符串中需要转义的特殊字符1. \n 表示换行&#xff1b;2. \t 表示制表符&#xff0c;相当于Table键&#xff1b;3. \ 表示单引号&#xff1b;4. \" 表示双引号&#xff1b;5. \\ 表示一个斜杠“\”。 2. split第二个参数limit的用…

Dynamics 365 DevOps CI/CD之WebResource

对于D365自身的发布&#xff0c;简单点来说就是Solution的发布&#xff0c;复杂一些会涉及周边集成接口等一系列的发布。如果是单纯的Solution的发布的Azure DevOps商店里有很多工具&#xff0c;比如Power DevOps Tools&#xff0c;这个我之前也有博文转载过相关文章&#xff0…

史上最通俗易懂的EWMA(指数加权移动平均)的参数解释以及程序代码

文章目录 一、EWMA&#xff08;指数加权移动平均&#xff09;是什么&#xff1f;二、详细的参数解释3、使用Python pandas库中的ewm()函数实现指数加权移动平均&#xff08;EWMA&#xff09;的示例代码总结 一、EWMA&#xff08;指数加权移动平均&#xff09;是什么&#xff1f…

抢跑智驾AI芯片「新路径」

“胆量”这个词&#xff0c;被后摩智能创始人兼CEO吴强着重提及。 5月10日&#xff0c;后摩智能发布首款存算一体智驾芯片鸿途™H30&#xff0c;以12nm制程实现最高物理算力 256TOPS&#xff0c;典型功耗 35W&#xff0c;成为国内率先落地存算一体大算力 AI 芯片的公司。即&am…