Spark(39):Streaming DataFrame 和 Streaming DataSet 输出

news2025/2/14 3:21:24

目录

0. 相关文章链接

1. 输出的选项

2. 输出模式(output mode)

2.1. Append 模式(默认)

2.2. Complete 模式

2.3. Update 模式

2.4. 输出模式总结

3. 输出接收器(output sink)

3.1. file sink

3.2. kafka sink

3.2.1. 以 Streaming 方式输出数据

3.2.2. 以 batch 方式输出数据

3.3. console sink

3.4. memory sink

3.5. foreach sink

3.6. ForeachBatch Sink


0. 相关文章链接

 Spark文章汇总 

1. 输出的选项

一旦定义了最终结果DataFrame / Dataset,剩下的就是开始流式计算。为此,必须使用返回的 DataStreamWriter Dataset.writeStream()。

需要指定一下选项:

  • 输出接收器的详细信息:数据格式,位置等。
  • 输出模式:指定写入输出接收器的内容。
  • 查询名称:可选,指定查询的唯一名称以进行标识。
  • 触发间隔:可选择指定触发间隔。如果未指定,则系统将在前一处理完成后立即检查新数据的可用性。如果由于先前的处理尚未完成而错过了触发时间,则系统将立即触发处理。
  • 检查点位置:对于可以保证端到端容错的某些输出接收器,请指定系统写入所有检查点信息的位置。这应该是与HDFS兼容的容错文件系统中的目录。

2. 输出模式(output mode)

2.1. Append 模式(默认)

        默认输出模式, 仅仅添加到结果表的新行才会输出。采用这种输出模式, 可以保证每行数据仅输出一次。在查询过程中, 如果没有使用 watermask 机制, 则不能使用聚合操作。 如果使用了 watermask 机制, 则只能使用基于 event-time 的聚合操作。watermask 用于高速 append 模式如何输出不会再发生变动的数据。 即只有过期的聚合结果才会在 Append 模式中被“有且仅有一次”的输出。

2.2. Complete 模式

每次触发, 整个结果表的数据都会被输出。 仅仅聚合操作才支持。同时该模式使用 watermask 无效。

2.3. Update 模式

        该模式在 从 spark 2.1.1 可用. 在处理完数据之后, 该模式只输出相比上个批次变动的内容(新增或修改)。如果没有聚合操作, 则该模式与 append 模式一样。如果有聚合操作, 则可以基于 watermast 清理过期的状态。

2.4. 输出模式总结

不同的查询支持不同的输出模式

3. 输出接收器(output sink)

spark 提供了几个内置的 output-sink,不同 output sink 所适用的 output mode 不尽相同:

SinkSupported Output ModesOptionsFault-tolerantNotes
File SinkAppendpath: path to the output directory, must be specified. For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R). E.g. for “parquet” format options see DataFrameWriter.parquet()Yes (exactly-once)Supports writes to partitioned tables. Partitioning by time may be useful.
Kafka SinkAppend, Update, CompleteSee the Kafka Integration GuideYes (at-least-once)More details in the Kafka Integration Guide
Foreach SinkAppend, Update, CompleteNoneDepends on ForeachWriter implementationMore details in the next section
ForeachBatch SinkAppend, Update, CompleteNoneDepends on the implementationMore details in the next section
Console SinkAppend, Update, CompletenumRows: Number of rows to print every trigger (default: 20) truncate: Whether to truncate the output if too long (default: true)No
Memory SinkAppend, CompleteNoneNo. But in Complete Mode, restarted query will recreate the full table.Table name is the query name.

3.1. file sink

存储输出到目录中 仅仅支持 append 模式

需求: 把单词和单词的反转组成 json 格式写入到目录中。

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import java.sql.Timestamp

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 设置数据源,并接收数据
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load

        // 数据计算
        val words: DataFrame = lines
            .as[String]
            .flatMap((line: String) => {
                line
                    .split("\\W+")
                    .map((word: String) => {
                        (word, word.reverse)
                    })
            })
            .toDF("原单词", "反转单词")

        // 结果输出
        words
            .writeStream
            .outputMode("append")
            .format("json") // 支持 "orc", "json", "csv"
            .option("path", "./filesink") // 输出目录
            .option("checkpointLocation", "./ck1") // 必须指定 checkpoint 目录
            .start
            .awaitTermination()

        // 关闭执行环境
        spark.stop()

    }
}

输出的数据:

{"原单词":"abc","反转单词":"cba"}

3.2. kafka sink

将 wordcount 结果写入到 kafka

写入到 kafka 的时候应该包含如下列:

ColumnType
key (optional)string or binary
value (required)string or binary
topic (optional)string

注意:

  • 如果没有添加 topic option 则 topic 列必须有.
  • kafka sink 三种输出模式都支持

3.2.1. 以 Streaming 方式输出数据

这种方式使用流的方式源源不断的向 kafka 写入数据:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import java.sql.Timestamp

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 设置数据源,并接收数据
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load

        // 数据计算
        val words = lines
            .as[String]
            .flatMap((_: String).split("\\W+"))
            .groupBy("value")
            .count()
            .map((row: Row) => row.getString(0) + "," + row.getLong(1))
            .toDF("value") // 写入数据时候, 必须有一列 "value"

        words.writeStream
            .outputMode("update")
            .format("kafka")
            .trigger(Trigger.ProcessingTime(0))
            .option("kafka.bootstrap.servers", "bigdata1:9092,bigdata2:9092,bigdata3:9092") // kafka 配置
            .option("topic", "update") // kafka 主题
            .option("checkpointLocation", "./ck1") // 必须指定 checkpoint 目录
            .start
            .awaitTermination()

        // 关闭执行环境
        spark.stop()

    }
}

3.2.2. 以 batch 方式输出数据

这种方式输出离线处理的结果, 将已存在的数据分为若干批次进行处理. 处理完毕后程序退出:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import java.sql.Timestamp

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        val wordCount: DataFrame = spark
            .sparkContext
            .parallelize(Array("hello hello abc", "abc, hello"))
            .toDF("word")
            .groupBy("word")
            .count()
            .map(row => row.getString(0) + "," + row.getLong(1))
            .toDF("value") // 写入数据时候, 必须有一列 "value"

        wordCount.write // batch 方式
            .format("kafka")
            .option("kafka.bootstrap.servers", "bigdata1:9092,bigdata2:9092,bigdata3:9092") // kafka 配置
            .option("topic", "update") // kafka 主题
            .save()

        // 关闭执行环境
        spark.stop()

    }
}

3.3. console sink

主要用于测试数据输出

3.4. memory sink

该 sink 也是用于测试, 将其统计结果全部输入内中指定的表中, 然后可以通过 sql 与从表中查询数据。

如果数据量非常大, 可能会发送内存溢出:

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import java.sql.Timestamp
import java.util.{Timer, TimerTask}

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 设置数据源,并接收数据
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load

        val words: DataFrame = lines
            .as[String]
            .flatMap((_: String).split("\\W+"))
            .groupBy("value")
            .count()

        val query: StreamingQuery = words
            .writeStream
            .outputMode("complete")
            .format("memory") // memory sink
            .queryName("word_count") // 内存临时表名
            .start

        // 测试使用定时器执行查询表
        val timer: Timer = new Timer(true)
        val task: TimerTask = new TimerTask {
            override def run(): Unit = spark.sql("select * from word_count").show
        }
        timer.scheduleAtFixedRate(task, 0, 2000)

        query.awaitTermination()

        // 关闭执行环境
        spark.stop()

    }
}

3.5. foreach sink

foreach sink 会遍历表中的每一行, 允许将流查询结果按开发者指定的逻辑输出;把 wordcount 数据写入到 mysql。

注意(需要在依赖中添加MySQL的驱动依赖):

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

建表语句如下所示:

create database ss;
use ss;
create table word_count
(
    word  varchar(255) primary key not null,
    count bigint                   not null
);

代码示例如下:

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row, SparkSession}

import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}
import java.util.{Timer, TimerTask}

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 设置数据源,并接收数据
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load

        val wordCount: DataFrame = lines
            .as[String]
            .flatMap((_: String).split("\\W+"))
            .groupBy("value")
            .count()

        val query: StreamingQuery = wordCount
            .writeStream
            .outputMode("update")
            // 使用 foreach 的时候, 需要传递ForeachWriter实例, 三个抽象方法需要实现. 每个批次的所有分区都会创建 ForeeachWriter 实例
            .foreach(new ForeachWriter[Row] {

                var conn: Connection = _
                var ps: PreparedStatement = _
                var batchCount = 0

                // 一般用于 打开链接. 返回 false 表示跳过该分区的数据,
                override def open(partitionId: Long, epochId: Long): Boolean = {
                    println("open ..." + partitionId + "  " + epochId)
                    Class.forName("com.mysql.jdbc.Driver")
                    conn = DriverManager.getConnection("jdbc:mysql://hadoop201:3306/ss", "root", "aaa")
                    // 插入数据, 当有重复的 key 的时候更新
                    val sql = "insert into word_count values(?, ?) on duplicate key update word=?, count=?"
                    ps = conn.prepareStatement(sql)

                    conn != null && !conn.isClosed && ps != null
                }

                // 把数据写入到连接
                override def process(value: Row): Unit = {
                    println("process ...." + value)
                    val word: String = value.getString(0)
                    val count: Long = value.getLong(1)
                    ps.setString(1, word)
                    ps.setLong(2, count)
                    ps.setString(3, word)
                    ps.setLong(4, count)
                    ps.execute()
                }

                // 用户关闭连接
                override def close(errorOrNull: Throwable): Unit = {
                    println("close...")
                    ps.close()
                    conn.close()
                }
            })
            .start

        query.awaitTermination()

        // 关闭执行环境
        spark.stop()

    }
}

3.6. ForeachBatch Sink

ForeachBatch Sink 是 spark 2.4 才新增的功能, 该功能只能用于输出批处理的数据。将统计结果同时输出到本地文件和 mysql 中。

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row, SparkSession}

import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}
import java.util.{Properties, Timer, TimerTask}

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 设置数据源,并接收数据
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load

        val wordCount: DataFrame = lines
            .as[String]
            .flatMap(_.split("\\W+"))
            .groupBy("value")
            .count()

        val props: Properties = new Properties()
        props.setProperty("user", "root")
        props.setProperty("password", "aaa")
        val query: StreamingQuery = wordCount
            .writeStream
            .outputMode("complete")
            .foreachBatch((df: Dataset[Row], batchId: Long) => { // 当前分区id, 当前批次id
                if (df.count() != 0) {
                    df.cache()
                    df.write.json(s"./$batchId")
                    df.write.mode("overwrite").jdbc("jdbc:mysql://hadoop201:3306/ss", "word_count", props)
                }
            })
            .start()

        query.awaitTermination()

        // 关闭执行环境
        spark.stop()

    }
}

注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/863820.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

gin的占位符:和通配符*

1、用法 在 Gin 路由中&#xff0c;可以使用一个通配符&#xff08;*&#xff09;或一个占位符&#xff08;:&#xff09;来捕获 URL 的一部分。 r.GET("/royal/:id", func(c *gin.Context) {id : c.Param("id")//fmt.Println("into :id")c.Str…

为什么商业基础软件需要开源

Bytebase 本身是一家商业软件公司&#xff0c;而作为最核心资产的代码从 Day 0 却是开源的。同时我们还是 star-history.com 的运营者&#xff0c;大家在各种开源渠道会看到它生成的图&#xff1a; 一直以来&#xff0c;常会被别人问起的一个问题&#xff0c;就是为什么 Byteba…

软件研发的道德情操

作者&#xff1a;许晓斌(晓斌) 两百多年前苏格兰出了一位大哲学家&#xff0c;他的名字叫做亚当斯密。今天人们对他的了解更多是在经济学家这个身份&#xff0c;都认为是他发现了“看不见的手”这一神奇的经济规律&#xff0c;以及他那本著名的《国富论》。然而除了这本书之外&…

一文详解自然语言处理两大任务与代码实战:NLU与NLG

目录 1. 自然语言处理定义NLP的定义和重要性NLP的主要挑战 2. 基础模型语言模型统计语言模型n-gram模型 连续词袋模型 3. 基础概念词向量Word2VecSkip-Gram GloVeFastText1. 子词表示2. 词向量训练3. 文本分类4. 预训练模型 4. 文本预处理分词去除停用词词干提取和词形还原文本…

R语言的物种气候生态位动态量化与分布特征模拟实践技术

在全球气候快速变化的背景下&#xff0c;理解并预测生物种群如何应对气候变化&#xff0c;特别是它们的地理分布如何变化&#xff0c;已经变得至关重要。利用R语言进行物种气候生态位动态量化与分布特征模拟&#xff0c;不仅可以量化描述物种对环境的需求和适应性&#xff0c;预…

2021年09月 C/C++(一级)真题解析#中国电子学会#全国青少年软件编程等级考试

第1题&#xff1a;数字判断 输入一个字符&#xff0c;如何输入的字符是数字&#xff0c;输出yes&#xff0c;否则输出no 输入 一个字符 输出 如何输入的字符是数字&#xff0c;输出yes&#xff0c;否则输出no 样例1输入 样例1输入 5 样例1输出 yes 样例2输入 A 样例2输出 no 下…

工程优化问题之三杆桁架设计研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

【UniApp开发小程序】小程序首页(展示商品、商品搜索、商品分类搜索)【后端基于若依管理系统开发】

文章目录 界面效果界面实现工具js页面首页让文字只显示两行路由跳转传递对象将商品分为两列显示使用中划线划掉原价 后端商品controllerservicemappersql 界面效果 【说明】 界面中商品的图片来源于闲鱼&#xff0c;若侵权请联系删除关于商品分类页面的实现&#xff0c;请在我…

JVM——Java 内存区域详解

文章目录 写在前面 (常见面试题)基本问题拓展问题 一 概述二 运行时数据区域2.1 程序计数器2.2 Java 虚拟机栈2.3 本地方法栈2.4 堆2.5 方法区2.5.1 方法区和永久代的关系2.5.2 常用参数2.5.3 为什么要将永久代 (PermGen) 替换为元空间 (MetaSpace) 呢? 2.6 运行时常量池2.7 直…

DP(背包模型)

求方案数初始化总结 二维 1.体积至多是 j f[0][i]1 (0<i<m) 其余为 0 &#xff08;因为体积是至多是j 它可以小于j f[0][0]1 是肯定的 f[0][i] 这当中 i也可以包括0 一定要get到至多 即便不选物品 f[0][i]也是 1 它可以包含f[0][0]) 2.体积恰好是 j f[0][0]1 其余都…

MATLAB | 绘图复刻(十一) | 截断的含散点及误差棒的分组柱状图

hey大家好久不见&#xff0c;本期带来一篇绘图复刻&#xff0c;居然已经出到第11篇了&#xff0c;不知道有朝一日有没有机会破百&#xff0c;本期绘制的是《PNAS》期刊中pnas.2200057120文章的figure03&#xff0c;文章题目为Intranasal delivery of full-length anti-Nogo-A a…

SaaS BI数据可视化工具:免下载安装,登录即分析

之前有人问我&#xff0c;说&#xff1a;“BI数据可视化工具总是要下载安装&#xff0c;过程繁琐&#xff0c;没点IT基础的人也不太搞得定&#xff0c;有没有不用下载安装就能用的数据可视化工具&#xff1f;”答案当然是有的&#xff0c;那就是SaaS BI数据可视化工具。 SaaS …

并发——AQS原理以及AQS同步组件总结

文章目录 1 AQS 简单介绍2 AQS 原理2.1 AQS 原理概览2.2 AQS 对资源的共享方式2.3 AQS 底层使用了模板方法模式 3 Semaphore(信号量)-允许多个线程同时访问4 CountDownLatch &#xff08;倒计时器&#xff09;4.1 CountDownLatch 的两种典型用法4.2 CountDownLatch 的使用示例4…

生信豆芽菜-箱线图使用说明

网站&#xff1a;http://www.sxdyc.com/visualsBoxplot 一、使用方法 1.打开网址&#xff08;http://www.sxdyc.com/singleCollectionTool?href-diff&#xff09;&#xff0c;选择“箱线图”。 准备数据&#xff1a; 第一个文件为特征的表达文件&#xff0c;第一列为样本&am…

机器人CPP编程基础-05完结The End

非常不可思议……之前四篇博文竟然有超过100的阅读量…… 此文此部分终结&#xff0c;没有继续写下去的必要了。 插入一个分享&#xff1a; 编程基础不重要了&#xff0c;只要明确需求&#xff0c;借助AI工具就能完成一个项目。 当然也不是一次成功&#xff0c;工具使用也需要…

PP模块生产过程检验(工序检验)

质量检验是在一些情形下对物料进行检查的一个操作,用于检验物料是否符合标准。 1.常见检验类型: (1)进货检验(Incoming material Quality Check,IQC) (2) 生产过程检验(In Process Quality Check,IQPC) (3) 最终检验/产品检验(Final Quality Check,FQC) (4)出…

【产品应用】一体化步进伺服电机在自动液体处理工作站中的应用

随着工业自动化的快速发展&#xff0c;自动液体处理工作站在医药、食品、化工等领域中发挥着越来越重要的作用。一体化步进伺服电机作为一种高集成的驱动元件&#xff0c;具有高精度、高速度和高稳定性的特点&#xff0c;被广泛应用于自动液体处理工作站中。本文将详细介绍一体…

同源建模-build loop

对于有残基缺失的晶体结构往往采用同源建模的方式补全&#xff0c;但常规的同源建模方法往往造成非缺失区域残基的挪动&#xff0c;有时我们并不想出现这种状况&#xff0c;尤其是涉及到多个截短体拼合的情况&#xff0c;这时就需要用到约束性同源建模的方法&#xff0c;只对缺…

对一线大厂游戏测试员的访谈实录,带你了解游戏测试

今天采访了一个在游戏行业做测试的同学&#xff0c;他所在的游戏公司是做大型多人在线角色扮演类的游戏&#xff0c;类似传奇游戏。他所在的公司目前有1200多人&#xff0c;是上市公司&#xff0c;目前游戏产品在国内海外都有市场。 因为我是一个对游戏无感的人&#xff0c;所…

户外组网摆脱布线困扰,工业5G网关实现无人值守、远程实时监控

在物联网通信技术发达的2023&#xff0c;网络覆盖对所及之处的全面覆盖&#xff0c;科技发展的促使下很多高危户外场景也在思考如何利用无线技术提高人员安全及现场无人化管理。 煤矿是我们国家不可缺少的重要能源&#xff0c;其开采过程的危险系数也是众所皆知的&#xff0c;…