Spark实时(四):Strctured Streaming简单应用

news2024/9/20 0:30:14

文章目录

Strctured Streaming简单应用

一、Output Modes输出模式

二、Streaming Table API

三、​​​​​​​​​​​​​​Triggers

1、​​​​​​​unspecified(默认模式)

2、​​​​​​​​​​​​​​Fixed interval micro-batches(固定间隔批次)

3、 ​​​​​​​​​​​​​​One-time micro-batch (仅一次触发)

4、​​​​​​​​​​​​​​Continuous with fixed checkpoint interval(连续处理)


Strctured Streaming简单应用

一、Output Modes输出模式

Structured Streaming中结果输出时outputMode可以设置三种模式,三种默认区别如下:

  • Append Mode(默认模式):追加模式,只有自上次触发后追加到结果表中的新行才会被输出。只有select、where、map、flatmap、filter、join查询支持追加模式。
  • Complete Mode(完整模式):将整个更新的结果输出。仅可用于代码中有聚合查询情况,代码中没有聚合查询不能使用。
  • Update Mode(更新模式):自Spark2.1.1版本后可用,只有自上次触发后更新的行才会被输出。这种模式仅仅输出自上次触发以来发生更改的行。如果结果数据没有聚合操作那么相当于Append Mode。

二、​​​​​​​​​​​​​​Streaming Table API

在Spark3.1版本之后,我们可以通过DataStreamReader.table()方式实时读取流式表中的数据,使用DataStreamWriter.toTable()向表中实时写数据。

案例:读取Socket数据实时写入到Spark流表中,然后读取流表数据展示数据。

代码示例如下:

package com.lanson.structuredStreaming

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}

object StreamTableAPI {
  def main(args: Array[String]): Unit = {
    //1.创建对象
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("StreamTableAPI")
      .config("spark.sql.shuffle.partitions", 1)
      .config("spark.sql.warehouse.dir", "./my-spark-warehouse")
      .getOrCreate()

    spark.sparkContext.setLogLevel("Error");
    import spark.implicits._

    //2.读取socket数据,注册流表
    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node3")
      .option("port", 9999)
      .load()

    //3.对df进行转换
    val personinfo: DataFrame = df.as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0).toInt, arr(1), arr(2).toInt)
      }).toDF("id", "name", "age")

    //4.将以上personinfo 写入到流表中
    personinfo.writeStream
      .option("checkpointLocation","./checkpoint/dir1")
      .toTable("mytbl")

    import org.apache.spark.sql.functions._

    //5.读取mytbl 流表中的数据
    val query: StreamingQuery = spark.readStream
      .table("mytbl")
      .withColumn("new_age", col("age").plus(6))
      .select("id", "name", "age", "new_age")
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()

  }
}

以上代码编写完成后启动,向控制台输入以下数据:

1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22

结果输入如下:

注意:以上代码执行时Spark中写出的表由Spark 参数”spark.sql.warehouse.dir”指定的路径临时维护数据,每次执行时,需要将该路径下的表数据清空。

三、​​​​​​​​​​​​​​Triggers

Structured Streaming Triggers 决定了流式数据被处理时是微批处理还是连续实时处理,以下是支持的Triggers:

实时处理,以下是支持的Triggers:

Trigger Type

描述

Unspecified(默认)

  • 代码使用:Trigger.ProcessingTime(0L)。
  • 代码中没有明确指定触发类型则查询默认以微批模式执行,表示尽可能快的执行查询。

Fixed interval micro-batches(固定间隔批次)

  • 代码使用:Trigger.ProcessingTime(long interval,TimeUnit timeUnit)
  • 查询将以微批模式处理,批次间隔根据用户指定的时间间隔决定
  1. 如果前一个微批处理时间在时间间隔内完成,则会等待间隔时间完成后再开始下一个微批处理
  2. 如果前一个微批处理时间超过了时间间隔,那么下一个微批处理将在前一个微批处理完成后立即开始。
  3. 如果没有新数据可用,则不会启动微批处理。

One-time micro-batch(仅一次性触发)

  • 代码使用:Trigger.Once()
  • 只执行一个微批次查询所有可用数据,然后自动停止,适用于一次性作业。

Continuous with fixed checkpoint interval(以固定checkpoint interval连续处理(实验阶段))

  • 代码使用:Trigger.Continuous(long interval,TimeUnit timeUnit)
  • 以固定的Checkpoint间隔(interval)连续处理。在这种模式下,连续处理引擎将每隔一定的间隔(interval)做一次checkpoint,可获得低至1ms的延迟。

下面以读取Socket数据为例,Scala代码演示各个模式

1、​​​​​​​unspecified(默认模式)

代码如下:

//3.默认微批模式执行查询,尽快将结果写出到控制台
val query: StreamingQuery = frame.writeStream
  .format("console")
  .start()

query.awaitTermination()

2、​​​​​​​​​​​​​​Fixed interval micro-batches(固定间隔批次)

代码如下:

//3.用户指定固定间隔批次触发查询
    val query: StreamingQuery = frame.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("5 seconds"))
//      .trigger(Trigger.ProcessingTime(5,TimeUnit.SECONDS)
      .start()
    query.awaitTermination()

注意:这种固定间隔批次指的是第一批次处理完成,等待间隔时间,然后处理第二批次数据,依次类推。

3、 ​​​​​​​​​​​​​​One-time micro-batch (仅一次触发)

代码如下:

//4.仅一次触发执行
val query: StreamingQuery = frame.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()
query.awaitTermination()

4、​​​​​​​​​​​​​​Continuous with fixed checkpoint interval(连续处理)

Continuous不再是周期性启动task的批量执行数,而是启动长期运行的task,而是不断一个一个数据进行处理,周期性的通过指定checkpoint来记录状态(如果不指定checkpoint目录,会将状态记录在Temp目录下),保证exactly-once语义,这样就可以实现低延迟。详细内容可以参照后续“Continuous处理”章节。

代码如下:

//3.Continuous 连续触发执行
val query: StreamingQuery = frame.writeStream
  .format("console")
  //每10ms 记录一次状态,而不是执行一次
  .trigger(Trigger.Continuous(10,TimeUnit.MILLISECONDS))
  .option("checkpointLocation","./checkpint/dir4")
  .start()
query.awaitTermination()

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1947673.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

总结20个Python接单赚钱的平台,兼职月入6000+_让你早日实现财富自由

今天就给大家盘点几个基本入门接私活的资源,让你轻松学python,实现经济独立。 一、Python兼职种类: 接私活刚学会python那会,就有认识的朋友介绍做一个网站的私活,当时接单赚了4K,后又自己接过开发网站后…

vue3+element-plus 实现动态菜单和动态路由的渲染

在 Vue.js 中,使用 Vue Router 管理路由数据,并将其用于渲染 el-menu(Element UI 的菜单组件)通常涉及以下几个步骤: 定义路由元数据: 在你的路由配置中,为每个路由项添加 meta 字段&#xff0c…

SQL labs-SQL注入(五,使用sqlmap进行cookie注入)

本文仅作为学习参考使用,本文作者对任何使用本文进行渗透攻击破坏不负任何责任。 引言: Cookie 是一些数据, 存储于你电脑上的文本文件中。当 web 服务器向浏览器发送 web 页面时,在连接关闭后,服务端不会记录用户的信息。Cookie…

新形势下职业教育大数据人才培养策略

一、引言 随着信息技术的飞速发展,大数据已成为驱动经济社会变革的关键力量。在新形势下,职业教育作为技术技能人才培养的重要阵地,面临着如何适应大数据时代要求、提升人才培养质量的紧迫任务。当前,职业教育在大数据人才培养方…

【C语言】指针大小知多少 ?一场探寻C语言深处的冒险 !

目录 C语言中指针的大小1. 指针大小的基本概念1.1 32位系统1.2 64位系统 2. 指针大小示例2.1 32位系统输出2.2 64位系统输出 3. 指针大小与数据类型无关示例输出示例 4. 跨平台的指针大小示例输出示例 5. 关键点总结5.1 指针大小与平台关系5.2 跨平台编程注意事项 6. 指针大小示…

PySide(PyQt)的小部件通过伪状态以及自定义特性改变外观

1、通过伪状态来改变外观 伪状态是一种特殊的状态,通常用于描述控件在特定条件下的外观变化。这些状态不是控件的实际属性,而是用于在样式表中应用不同样式的标记。 以QPushButton为例。在 PySide6 中,QPushButton 具有多种伪状态&#xff0c…

卷积神经网络(二)-AlexNet

前言: AlexNet是2012年ImageNet竞赛冠军(以领先第二名10%的准确率夺得冠军)获得者Hinton和他的学生Alex Krizhevsky设计的,在ILSVRC-2010测试集上取得了top-1错误率37.5%,top-5错误率17.0%(优于第二名的16.4%),明显优…

科技快讯丨智驱未来,校企共融:浪潮海岳携手山东大学软件学院开展低代码开发实训活动

近日,山东大学软件学院暑期实训活动圆满落幕。作为领先的企业数字化转型优秀服务商,浪潮海岳主导的低代码开发课题吸引了众多师生参训,取得了良好成效。 当前,低代码开发已成为软件行业降本增效、提升用户体验的必然选择&#xff…

labview实现两台电脑共享变量传输及同步

因为工作需要,需要实现多台主机间进行数据传输, 有两个备选方案, 1:建立tcp,然后自己解包 2:就是通过共享变量传输 虽然共享变量也是建立在TCP/IP上面的,但是不用自己解包呀 关于共享变量网络上…

vivo手机恢复出厂设置在哪里?清除数据后如何找回?2个技巧

随着使用时间的增长,手机可能会因为累积的缓存文件、不必要的数据或软件问题而出现性能下降或系统运行缓慢。为了解决这些问题,执行恢复出厂设置成为了一种流行的解决方案。那么,vivo手机恢复出厂设置在哪里?数据清除后该如何找回…

CCRC-DSO数据安全官:打造数据“冷链”,做强做大数据产业

在7月22日国新办举办的“推动高质量发展”系列新闻发布会上,国家数据局局长刘烈宏宣布,为响应党的二十届三中全会的决策,将加速推进数字经济发展机制的构建和完善数据要素市场制度。 他强调了对地方试点探索的支持,目标是建立强大…

基础复习(数组)

数组 一维数组 1.静态初始化 数据类型[] 数组名 new 数据类型[]{元素1,元素2,元素3,...}; 数据类型[] 数组名 {元素1,元素2,元素3...}; 2.动态初始化 数组存储的元素的数据类型[] 数组名字 new 数组存储的元素的数据类型[长度]; 3.执行原理 变量存储的是数组的地址值。…

Pyqt5新手教程

PyQt界面开发的两种方式:可视化UI 编程式UI (1)可视化UI:基于Qt Designer可视化编辑工具进行组件拖放、属性设置、布局管理等操作创建界面。 一是将其保存为.ui文件,然后在PyQt应用程序中加载和使用.ui文件。 二是使用…

接口自动化测试框架实战-3-文件读写封装

上一小节我们详细介绍了项目中所使用的接口文档,本小节我们将进入到接口测试框架第一个部分通用函数commons的开发,本小节我们重点完成文件读写方法的封装。 首先为什么要封装文件读写的方法,原因有如下几点: 读接口配置&#x…

B站音视频分开 大小问题

音频是33331 kb,视频是374661 kb 合并之后却是2561363 kb 这可能是B站音频和视频分开的原因吧

html实现酷炫美观的可视化大屏(十种风格示例,附源码)

文章目录 完整效果演示1.蓝色流线风的可视化大屏1.1 大屏效果1.2 大屏代码1.3 大屏下载 2.地图模块风的可视化大屏2.1 大屏效果2.2 大屏代码2.3 大屏下载 3.科技轮动风的可视化大屏3.1 大屏效果3.2 大屏代码3.3 大屏下载 4.蓝色海洋风的可视化大屏4.1 大屏效果4.2 大屏代码4.3 …

深入指南:VitePress 如何自定义样式

💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

边缘计算网关项目(含上报进程、32Modbus采集进程、设备搜索响应进程源码)

目录 边缘层 架构说明 包含知识点 数据上报进程 功能描述 功能开发 上报线程 数据存储线程 指令处理线程 项目源码 上报模块.c代码: 上报模块Makefile代码: STM32采集模块.c代码 设备搜索响应模块Linux部分.c代码 设备搜索响应模块Qt端代码.h …

计算机毕业设计-程序论文-高校智能排课系统

本系统开发采用技术为JSP、Bootstrap、Ajax、SSM、Java、Tomcat、Maven 此文章为本人亲自指导加编写,禁止任何人抄袭以及各类盈利性传播, 相关的代码部署论文ppt代码讲解答辩指导文件都有可私要 项目源码,请关注❥点赞收藏并私信博主&#xf…

UML通信图建模技术及应用例

新书速览|《UML 2.5基础、建模与设计实践》 在对系统的动态行为进行建模时,通信图常被用于按组织结构对控制流进行建模。与顺序图一样,一个单独的通信图只能显示一个控制流。 使用通信图建模时可以遵循如下策略: (1&#xff09…