Spark(37):Streaming DataFrame 和 Streaming DataSet 创建

news2024/9/28 10:06:26

目录

0. 相关文章链接

1. 概述

2. socket source

3. file source

3.1. 读取普通文件夹内的文件

3.2. 读取自动分区的文件夹内的文件

4. kafka source

4.1. 导入依赖

4.2. 以 Streaming 模式创建 Kafka 工作流

4.3. 通过 Batch 模式创建 Kafka 工作流

5. Rate Source


0. 相关文章链接

 Spark文章汇总 

1. 概述

        使用 Structured Streaming 最重要的就是对 Streaming DataFrame 和 Streaming DataSet 进行各种操作。从 Spark2。0 开始, DataFrame 和 DataSet 可以表示静态有界的表, 也可以表示流式无界表。与静态 Datasets/DataFrames 类似,我们可以使用公共入口点 SparkSession 从流数据源创建流式 Datasets/DataFrames,并对它们应用与静态 Datasets/DataFrames 相同的操作。通过spark.readStream()得到一个DataStreamReader对象, 然后通过这个对象加载流式数据源, 就得到一个流式的 DataFrame。

// 1. 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("StreamTest")
    .getOrCreate()
import spark.implicits._

// 2. 从数据源(socket)中加载数据.
val lines: DataFrame = spark.readStream
    .format("socket") // 设置数据源
    .option("host", "localhost")
    .option("port", 9999)
    .load

spark 内置了几个流式数据源, 基本可以满足我们的所有需求:

  • File source 读取文件夹中的文件作为流式数据。 支持的文件格式: text, csv, josn, orc, parquet。 注意, 文件必须放置的给定的目录中, 在大多数文件系统中, 可以通过移动操作来完成。
  • kafka source 从 kafka 读取数据。 目前兼容 kafka 0。10。0+ 版本
  • socket source 用于测试。 可以从 socket 连接中读取 UTF8 的文本数据。 侦听的 socket 位于驱动中。 注意, 这个数据源仅仅用于测试。
  • rate source 用于测试。 以每秒指定的行数生成数据,每个输出行包含一个 timestamp 和 value。其中 timestamp 是一个 Timestamp类型(信息产生的时间),并且 value 是 Long 包含消息的数量。 用于测试和基准测试。
SourceOptionsFault-tolerantNotes
File sourcepath: path to the input directory, and common to all file formats. maxFilesPerTrigger: maximum number of new files to be considered in every trigger (default: no max) latestFirst: whether to process the latest new files first, useful when there is a large backlog of files (default: false) fileNameOnly: whether to check new files based on only the filename instead of on the full path (default: false). With this set to true, the following files would be considered as the same file, because their filenames, “dataset.txt”, are the same: “file:///dataset.txt” “s3://a/dataset.txt” “s3n://a/b/dataset.txt” “s3a://a/b/c/dataset.txt” For file-format-specific options, see the related methods in DataStreamReader(Scala/Java/Python/R). E.g. for “parquet” format options see DataStreamReader.parquet(). In addition, there are session configurations that affect certain file-formats. See the SQL Programming Guide for more details. E.g., for “parquet”, see Parquet configuration section.YesSupports glob paths, but does not support multiple comma-separated paths/globs.
Socket Sourcehost: host to connect to, must be specified port: port to connect to, must be specifiedNo
Rate SourcerowsPerSecond (e.g. 100, default: 1): How many rows should be generated per second. rampUpTime (e.g. 5s, default: 0s): How long to ramp up before the generating speed becomes rowsPerSecond. Using finer granularities than seconds will be truncated to integer seconds. numPartitions (e.g. 10, default: Spark’s default parallelism): The partition number for the generated rows. The source will try its best to reach rowsPerSecond, but the query may be resource constrained, and numPartitions can be tweaked to help reach the desired speed.Yes
Kafka SourceSee the Kafka Integration Guide.Yes

2. socket source

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 1. 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 2. 从数据源(socket)中加载数据.
        val lines: DataFrame = spark.readStream
            .format("socket") // 设置数据源
            .option("host", "localhost")
            .option("port", 9999)
            .load

        // 3. 把每行数据切割成单词
        val words: Dataset[String] = lines.as[String].flatMap((_: String).split("\\W"))

        // 4. 计算 word count
        val wordCounts: DataFrame = words.groupBy("value").count()

        // 5. 启动查询, 把结果打印到控制台
        val query: StreamingQuery = wordCounts
            .writeStream
            .outputMode("complete")
            .format("console")
            .start

        query.awaitTermination()
        spark.stop()

    }
}

3. file source

3.1. 读取普通文件夹内的文件

代码示例:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 定义 Schema, 用于指定列名以及列中的数据类型
        val userSchema: StructType = new StructType()
            .add("name", StringType)
            .add("job", StringType)
            .add("age", LongType)

        // 使用SparkSession通过readStream方法读取文件(必须是目录, 不能是文件名)
        val user: DataFrame = spark.readStream
            .format("csv")
            .schema(userSchema)
            .load("/Project/Data/csv")
        // DataStreamReader中还有csv、json、text等方法,可以直接读取对应的文件
        val userCopy: DataFrame = spark.readStream
            .schema(userSchema)
            .csv("/Project/Data/csv")

        // 将对应的数据输出(trigger表示触发器:数字表示毫秒值. 0 表示立即处理)
        val query: StreamingQuery = user.writeStream
            .outputMode("append")
            .trigger(Trigger.ProcessingTime(0))
            .format("console")
            .start()

        // 启动执行器
        query.awaitTermination()
        spark.stop()

    }
}

模板数据:

lisi,male,18
zhiling,female,28

结果输出:

3.2. 读取自动分区的文件夹内的文件

        当文件夹被命名为 “key=value” 形式时, Structured Streaming 会自动递归遍历当前文件夹下的所有子文件夹, 并根据文件名实现自动分区。如果文件夹的命名规则不是“key=value”形式, 则不会触发自动分区。 另外, 同级目录下的文件夹的命名规则必须一致。

步骤一:创建如下目录结构

year=2023
	month=07
	month=08
year=2024
	month=07

步骤二:写入文件数据

lisi,male,18
zhiling,female,28

步骤三:编写代码(如上 读取普通文件夹内的文件 代码完全一致)

步骤四:启动运行打印日志

4. kafka source

4.1. 导入依赖

在其余Spark依赖的情况下,还需要导入如下SparkSQL的kafka依赖,参考文档: Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 3.4.1 Documentation

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>2.4.3</version>
</dependency>

4.2. 以 Streaming 模式创建 Kafka 工作流

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 使用spark通过readStream方法可以以流的方式读取kafka里面的数据
        // 通过format设置 kafka 数据源
        // 通过 kafka.bootstrap.servers 设置kafka的参数
        // 通过 subscribe 设置订阅的主题,也可以订阅多个主题:   "topic1,topic2"
        // load后会返回一个DataFrame类型, 其schema是固定的: key,value,topic,partition,offset,timestamp,timestampType
        val df: DataFrame = spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "bigdata1:9092,bigdata2:9092,bigdata3:9092")
            .option("subscribe", "topic1")
            .load

        // 通过 selectExpr 只获取其中的value字段
        // 通过as转换成 Dataset
        val lines: Dataset[String] = df
            .selectExpr("CAST(value AS string)")
            .as[String]

        // 可以对 Dataset 进行各种操作
        val query: DataFrame = lines
            .flatMap((_: String).split("\\W+"))
            .groupBy("value")
            .count()

        // 进行输出,并且可以通过checkpointLocation来设置checkpoint
        // 下次启动的时候, 可以从上次的位置开始读取
        query.writeStream
            .outputMode("complete")
            .format("console")
            .option("checkpointLocation", "./ck1") 
            .start
            .awaitTermination()

        // 关闭执行环境
        spark.stop()

    }
}

4.3. 通过 Batch 模式创建 Kafka 工作流

        这种模式一般需要设置消费的其实偏移量和结束偏移量, 如果不设置 checkpoint 的情况下, 默认起始偏移量 earliest, 结束偏移量为 latest。该模式为一次性作业(批处理), 而非持续性的处理数据。

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 使用 read 方法,而不是 readStream 方法
        val lines: Dataset[String] = spark.read
            .format("kafka")
            .option("kafka.bootstrap.servers", "bigdata1:9092,bigdata2:9092,bigdata3:9092")
            .option("subscribe", "topic1")
            .option("startingOffsets", "earliest")
            .option("endingOffsets", "latest")
            .load
            .selectExpr("CAST(value AS STRING)")
            .as[String]

        // 同样对 Dataset[String] 进行各种操作
        val query: DataFrame = lines
            .flatMap(_.split("\\W+"))
            .groupBy("value")
            .count()

        // 使用 write 而不是 writeStream
        query.write
            .format("console")
            .save()

        // 关闭执行环境
        spark.stop()

    }
}

5. Rate Source

以固定的速率生成固定格式的数据, 用来测试 Structured Streaming 的性能

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        val rows: DataFrame = spark.readStream
            .format("rate") // 设置数据源为 rate
            .option("rowsPerSecond", 10) // 设置每秒产生的数据的条数, 默认是 1
            .option("rampUpTime", 1) // 设置多少秒到达指定速率 默认为 0
            .option("numPartitions", 2) /// 设置分区数  默认是 spark 的默认并行度
            .load

        rows.writeStream
            .outputMode("append")
            .trigger(Trigger.Continuous(1000))
            .format("console")
            .start()
            .awaitTermination()


        // 关闭执行环境
        spark.stop()

    }
}

注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/803613.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

回归预测 | MATLAB实现PSO-GPR粒子群优化高斯过程回归多输入单输出回归预测

回归预测 | MATLAB实现PSO-GPR粒子群优化高斯过程回归多输入单输出回归预测 目录 回归预测 | MATLAB实现PSO-GPR粒子群优化高斯过程回归多输入单输出回归预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 Matlab基于PSO-GPR基于粒子群算法优化高斯过程回归的数据回归预…

Jmeter经过处理的变量设置全局变量

之前遇到一个问题&#xff1a;项目的某些接口是需要登录的而且登录不能多用户登录。模拟登录的时候传入请求头的其中一个参数比较复杂&#xff0c;需要登录后的某些参数拼接和加密后设置成全局变量&#xff0c;在请求头中调用这个变量&#xff0c;正常的设置全局变量的方法百度…

Java NIO 详解

Java 从1.4开始引入NIO&#xff08;New IO&#xff09;&#xff0c;是一种基于块&#xff08;Block&#xff09;的IO机制&#xff0c;也称为非阻塞IO。相比于传统的Java IO&#xff08;IO流&#xff09;方式&#xff0c;Java NIO提供了更快速、高效、灵活的IO操作。 Java NIO的…

一文让你彻底搞懂Mybatis之缓存机制

编译软件&#xff1a;IntelliJ IDEA 2019.2.4 x64 操作系统&#xff1a;win10 x64 位 家庭版 Maven版本&#xff1a;apache-maven-3.6.3 Mybatis版本&#xff1a;3.5.6 文章目录 一. 缓存是什么&#xff1f;二. 为什么要使用缓存&#xff1f;三. Mybatis中的缓存分哪几种&#…

自己实现MyBatis 底层机制--抽丝剥茧(上)

&#x1f600;前言 本篇博文是学习过程中的笔记和对于MyBatis底层机制的分析思路&#xff0c;希望能够给您带来帮助&#x1f60a; &#x1f3e0;个人主页&#xff1a;晨犀主页 &#x1f9d1;个人简介&#xff1a;大家好&#xff0c;我是晨犀&#xff0c;希望我的文章可以帮助到…

Base64之间的相互转化

使用org.apache.ommons.codec.binary.Base64实现字符串和Base64之间的相互转化 字符串转化为Base64之间的相互转化一 //转化为Base64字符串 String strOld "Welcome to the new world"; base64EncodeStr Base64.encodeBase64String(strOld.getBytes()); System.o…

黑马点评项目学习笔记(15w字详解,堪称史上最详细,欢迎收藏)

黑马点评项目学习笔记 文章目录 黑马点评项目学习笔记前言项目搭建导入数据库初始化项目启动项目启动前端项目启动后端项目 基于Session实现短信验证码登录短信验证码登录配置登录拦截器数据脱敏 Session集群共享问题基于Redis实现短信验证码登录短信验证登录配置登录拦截器 店…

漏洞分析|Metabase 代码执行漏洞(CVE-2023-38646):H2 JDBC 深入利用

0x01 概述 最近 Metabase 出了一个远程代码执行漏洞&#xff08;CVE-2023-38646&#xff09;&#xff0c;我们通过研究分析发现该漏洞是通过 JDBC 来利用的。在 Metabase 中兼容了多种数据库&#xff0c;本次漏洞中主要通过 H2 JDBC 连接信息触发漏洞。目前公开针对 H2 数据库…

国产内存强势崛起,光威神条有神价,无套路闭眼可入

今年的DIY电脑市场终于摆脱了前两年的阴霾&#xff0c;从CPU到内存都有着充足的货源&#xff0c;而且价格靠谱&#xff0c;特别是国产存储品牌超级厚道&#xff0c;内存、硬盘等配件的价格基本都是大跳水&#xff0c;相比于去年&#xff0c;同样的价格能够买到容量和性能翻倍的…

ERROR 1064 - You have an error in your SQL syntax;

ERROR 1064 - You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near (/, 少个逗号吧&#xff0c;以前开始写SQL&#xff0c;特别是修改SQL的时候容易出现这样错误。 而且自己也知道在附近…

SAP财务系统中的“复式记账法”

1. 前言 “复式记账法”是财务的基础知识&#xff0c;对于财务出身的小伙伴是so easy&#xff0c;但对于技术出身的同学&#xff0c;通常会被“借贷”关系弄的晕头转向。 本文会简明扼要的总结“复式记账法”的基本原理&#xff0c;并以采购和销售流程为例来介绍如何进行复式…

Java - 注解开发

注解开发定义bean Component的衍生注解 Service&#xff1a; 服务层的注解 Repository&#xff1a; 数据层的注解 Controller&#xff1a; 控制层的注解 纯注解开发 bean管理 bean作用范围 在类上面添加Scope(“singleton”) // prototype: 非单例 bean生命周期 PostCon…

PyTorch BatchNorm2d详解

通常和卷积层&#xff0c;激活函数一起使用

基于51单片机和proteus的加热洗手器系统设计

此系统是基于51单片机和proteus的仿真设计&#xff0c;功能如下&#xff1a; 1. 检测到人手后开启出水及加热。 2. LED指示加热出水及系统运行状态。 功能框图如下&#xff1a; Proteus仿真界面如下&#xff1a; 下面就各个模块逐一介绍&#xff0c; 模拟人手检测模块 通过…

redis 第三章

目录 1.主从复制 2.哨兵 3.集群 4.总结 1.主从复制 结果&#xff1a; 2.哨兵 3.集群 4.总结 通过集群&#xff0c;redis 解决了写操作无法负载均衡&#xff0c;以及存储能力受到单机限制的问题&#xff0c;实现了较为完善的高可用方案。

【设计模式】详解单例设计模式(包含并发、JVM)

文章目录 1、背景2、单例模式3、代码实现1、第一种实现&#xff08;饿汉式&#xff09;为什么属性都是static的&#xff1f;2、第二种实现&#xff08;懒汉式&#xff0c;线程不安全&#xff09;3、第三种实现&#xff08;懒汉式&#xff0c;线程安全&#xff09;4、第四种实现…

Android kotlin系列讲解之最佳的UI体验 - Material Design 实战

目录 一、什么是Material Design二、Toolbar三、滑动菜单1、DrawerLayout2、NavigationView 四、悬浮按钮和可交互提示1、FloatingActionButton2、Snackbar3、CoordinatorLayout 五、卡片式布局1、MaterialCardView2、AppBarLayout 六、可折叠式标题栏1、CollapsingToolbarLayo…

Python MySQL

pymysql 除了使用图形化工具以外&#xff0c;我们也可以使用编程语言来执行SQL从而操作数据库。 在Python中&#xff0c;使用第三方库&#xff1a;pymysql 来完成对MySQL数据库的操作。 安装&#xff1a; pip install pymysql 或在pycharm中搜索pymysql插件安装 创建到MySQ…

蓝桥杯单片机第十届国赛 真题+代码

iic.c /* # I2C代码片段说明1. 本文件夹中提供的驱动代码供参赛选手完成程序设计参考。2. 参赛选手可以自行编写相关代码或以该代码为基础&#xff0c;根据所选单片机类型、运行速度和试题中对单片机时钟频率的要求&#xff0c;进行代码调试和修改。 */ #include <STC1…

如何快速用Go获取短信验证码

要用Go获取短信验证码&#xff0c;通常需要连接到一个短信服务提供商的API&#xff0c;并通过该API发送请求来获取验证码。由于不同的短信服务提供商可能具有不同的API和授权方式&#xff0c;我将以一个简单的示例介绍如何使用Go语言来获取短信验证码。 在这个示例中&#xff0…