StructuredStreaming Sink

news2024/9/27 10:55:39

StructuredStreaming Sink

Output Modes

  • append

    默认追加模式, 将新的数据输出,只支持简单查询

  • complete

    完整模式,支持聚合和排序

  • update

    更新模式,支持聚合不支持排序,没有聚合和append一样

下面这段操作,有聚合,有排序,只能用complete

val ds = df.as[String]
val wordDs = ds.flatMap(_.split(" "))
val result = wordDs
  .groupBy('value)
  .count()
  .orderBy('count)

Sink位置

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zcv9N6Yb-1674275476338)(F:\study\大数据\spark\spark3.0\代码-笔记\Spark-day05\笔记\Spark-day05.assets\1610097909181.png)]

Memory sink

输出内存表存储在内存中

支持append和complete

应用于测试环境

// 输出
val query = result.writeStream
  .format("memory")
  .queryName("result")
  .outputMode("complete")
  .start()

while (true) {

  spark.sql("select * from result").show()
  Thread.sleep(3000)
}

完整代码

object Sink {

  def main(args: Array[String]): Unit = {

    // 创建环境
    val spark = SparkSession.builder().appName("Operation").master("local[*]")
      .config("spark.sql.shuffle.partitions", "4") // 设置分区数
      .getOrCreate()

    val sc = spark.sparkContext

    import spark.implicits._

    // 加载数据
    val df = spark.readStream
      .format("socket")
      .option("host", "hadoop102")
      .option("port", 9999)
      .load()

    df.printSchema()

    // 处理数据

    // DSL
    val ds = df.as[String]
    val wordDs = ds.flatMap(_.split(" "))
    val result = wordDs
      .groupBy('value)
      .count()
      .orderBy('count)


    // 输出
    val query = result.writeStream
      .format("memory")
      .queryName("result")
      .outputMode("complete")
      .start()

    while (true) {

      spark.sql("select * from result").show()
      Thread.sleep(3000)
    }

//    query.awaitTermination()

    spark.stop()


  }

}

ForeachBatch Sink

ForeachSink可以对输出记录进行任意计算,针对每一条数据

ForeachBatch Sink 针对每一批数据

object ForeachBatchSink {

  def main(args: Array[String]): Unit = {
    //TODO 0.创建环境
    val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    //TODO 1.加载数据
    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()

    df.printSchema()

    //TODO 2.处理数据
    val ds: Dataset[String] = df.as[String]
    val result: Dataset[Row] = ds.flatMap(_.split(" "))
      .groupBy('value)
      .count()
      .orderBy('count.desc)

    //TODO 3.输出结果
    result.writeStream
      .foreachBatch((ds: Dataset[Row], batchId:Long) => {
        //自定义输出到控制台
        println("-------------")
        println(s"batchId:${batchId}")
        println("-------------")
        ds.show()
        //自定义输出到MySQL
        ds.coalesce(1)
          .write
          .mode(SaveMode.Overwrite)
          .format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8")
          .option("user", "root")
          .option("password", "root")
          .option("dbtable", "bigdata.words")
          .save()
      })
      .outputMode("complete")
      //TODO 4.启动并等待结束
      .start()
      .awaitTermination()


    //TODO 5.关闭资源
    spark.stop()
  }

}

触发间隔

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cwImfSh6-1674275476340)(F:\study\大数据\spark\spark3.0\代码-笔记\Spark-day05\笔记\Spark-day05.assets\1610099456264.png)]

微批处理的时候,每隔批次都可以做checkpoint

连续处理时需要指定时间做checkpoint

val ds: Dataset[String] = df.as[String]
    val result: Dataset[Row] = ds.coalesce(1).flatMap(_.split(" "))
      .groupBy('value)
      .count()

result.writeStream
  .format("console")
  .outputMode("complete")
  //触发间隔:
  //1.默认的不写就是:尽可能快的运行微批,Default trigger (runs micro-batch as soon as it can)
  //2.指定0也是尽可能快的运行
  // .trigger(Trigger.ProcessingTime("0 seconds"))
  //3.指定时间间隔
  //.trigger(Trigger.ProcessingTime("5 seconds"))
  //4.触发1次
  //.trigger(Trigger.Once())
  //5.连续处理并指定Checkpoint时间间隔,实验的
  .trigger(Trigger.Continuous("1 second"))
  .option("checkpointLocation", "./ckp"+System.currentTimeMillis())
  //TODO 4.启动并等待结束
  .start()
  .awaitTermination()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/175075.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python机器学习数据建模与分析——Numpy和Pandas综合应用案例:空气质量监测数据的预处理和基本分析

本篇文章主要以北京市空气质量监测数据为例子,聚集数据建模中的数据预处理和基本分析环节,说明Numpy和Pandas的数据读取、数据分组、数据重编码、分类汇总等数据加工处理功能。同时在实现案例的过程中对用到的Numpy和Pandas相关函数进行讲解。 文章目录数…

新的一年即将到来,分享2023年火爆的行业和值得做的副业兼职项目

明天就是除夕啦,小编还在努力工作着,分享完这一篇文章后,小编也要和家人朋友们一起好好休息下,过一个乐此不疲的春节。今天分享的主要是对明年行业的憧憬以及一些值得做的副业兼职项目,文章比较长,如果觉得…

Spring源码学习:setter循环依赖

1.案例:Component public class A {private B b;Autowiredpublic void setB(B b) {this.b b;}public B getB() {return b;}public void f(){System.out.println(b);} }Component public class B {private A a;Autowiredpublic void setA(A a) {this.a a;}public v…

Ubuntu 终端美化(oh-my-zsh)

文章目录Ubuntu 终端美化(oh-my-zsh)一、 环境准备二、 配置文件1、 主题2、 修改插件2.1 官方插件2.2 第三方插件Ubuntu 终端美化(oh-my-zsh) 一、 环境准备 这个美化教程适合于大多数的 Linux 系统,其实可以通用的。…

递归和分治(基础)

目录 一、递归的定义 1、什么时候会用到递归的方法 1. 定义是递归的 2. 数据结构是递归的 3. 问题的解法是递归的 2、应用递归的原则 3、递归调用顺序问题 1. 首先递归的过程可以总结为以下几点: 2. 递归工作栈 二、 递归和非递归的转化 1. 单向递归可…

【JavaGuide面试总结】计算机网络·下

【JavaGuide面试总结】计算机网络下1.HTTP 和 HTTPS 有什么区别?2.HTTP 1.0 和 HTTP 1.1 有什么区别?连接方式状态响应码缓存处理Host头处理带宽优化3.HTTP 是不保存状态的协议, 如何保存用户状态?4.简单说说 ARP 协议的工作原理同一局域网内的 MAC 寻址…

【Java寒假打卡】JavaWeb-ServletContext

【Java寒假打卡】JavaWeb-ServletContext概述域对象ServletContext的配置方式ServletContext的常用方法ServletContext共享数据的方法概述 ServletContext是应用上下文对象(应用域对象)。每一个应用中只有一个ServletContext对象作用:可以配…

kubernetes集群搭建问题记录

centos7 系统 内核需要升级 centos7内核升级文章 init-config.yaml文件初始化master的时候 advertiseAddress 是主机的ip地址 kubeadm kubelet kubectl 安装 1.19.0 版本,版本高了有问题 yum install -y kubeadm-1.19.0 kubelet-1.19.0 kubectl-1.19.0 master 和…

<Python的字典>——《Python》

目录 1. 字典 1.1 字典是什么 1.2 创建字典 1.3 查找 key 1.4 新增/修改元素 1.5 删除元素 1.6 遍历字典元素 1.7 取出所有 key 和 value 1.8 合法的 key 类型 1. 字典 1.1 字典是什么 字典是一种存储 键值对 的结构. 键值对是计算机/生活中一个非常广泛使用的概念…

【C++算法图解专栏】一篇文章带你掌握前缀和算法(一维+二维)

✍个人博客:https://blog.csdn.net/Newin2020?spm1011.2415.3001.5343 📣专栏定位:为 0 基础刚入门数据结构与算法的小伙伴提供详细的讲解,也欢迎大佬们一起交流~ 📚专栏地址:https://blog.csdn.net/Newin…

Leetcode.1824 最少侧跳次数

题目链接 Leetcode.1824 最少侧跳次数 题目描述 给你一个长度为 n的 3 跑道道路 ,它总共包含 n 1个 点 ,编号为 0 到 n 。一只青蛙从 0 号点第二条跑道 出发 ,它想要跳到点 n处。然而道路上可能有一些障碍。 给你一个长度为 n 1的数组 ob…

ESP32设备驱动-DHT22数字温度湿度传感器驱动

DHT22数字温度湿度传感器驱动 1、DHT22介绍 DHT22电容式湿度传感数字温湿度模块是一款包含复合已校准数字信号输出的温湿度传感器。 应用了专用的数字模块采集技术和温湿度传感技术,确保产品具有高可靠性和优异的长期稳定性。 该传感器包括一个电容式传感器湿元件和一个高精…

Word2Vec与文章相似度

2.7 Word2Vec与文章相似度 学习目标 目标 知道文章向量计算方式了解Word2Vec模型原理知道文章相似度计算方式应用 应用Spark完成文章相似度计算 2.7.1 文章相似度 在我们的某项目推荐中有很多地方需要推荐相似文章,包括首页频道可以推荐相似的文章,详情…

详解Map和Set

目录 一、二叉搜索树 1、概述 2、模拟实现搜索二叉树 a、向搜索二叉树中插入数据 b、查找二叉搜索树的指定值的结点 c、删除二叉树的指定值的结点 3、对二叉搜索树进行性能分析 二、Map的使用 1、Map简介 2、Map常用方法 ​编辑三、Set的使用 1、Set简介 2、S…

零基础学习笔记 - ADF4159

目录1.准备工作1.1.前言1.2.资料1.3.介绍1.4.应用1.5.应用电路2.ADF41592.1.功能框图2.2.通信协议时序2.2.寄存器2.2.0.注意2.2.1.延迟寄存器(R7)映射2.2.2.步进寄存器(R6)映射2.2.3.偏差寄存器(R5)映射2.2.4.时钟寄存器(R4)映射2.2.5.功能寄存器(R3)映射2.2.6.R分频器寄存器(R…

Batchsize的大小怎样设置?Batchsize过大和过小有什么影响

一、Batchsize基本介绍 1. Batchsize是什么 batch_size:表示单次传递给程序用以训练的数据(样本)个数。如果我们的数据集钟含有的样本总数为12800个样本,batch_size=128,那么就需要10个batch才能够训练完一个epoch。 batch_size一般取值为2的N次幂的形式,这是因为CPU或…

高级性能测试系列《38.Arrivals Thread Group、ConcurrencyThread Group、终极线程组》

一、面向目标:Arrivals Thread Group需求:要做一个秒杀, 能支持1000个人同时秒杀,我们的系统不能崩溃。错误案例示范1秒内的人数的运行是有先后的,1000个人在1秒钟内启动,运行完毕一次就停掉了。由图可以看…

Cadence PCB仿真使用Allegro PCB SI查看仿真波形的方法图文教程

🏡《Cadence 开发合集目录》   🏡《Cadence PCB 仿真宝典目录》 目录 1,概述2,拓扑提取阶段仿真方法3,图纸设计阶段仿真方法4,总结1,概述 本文简单介绍使用Alegro PCB SI执行仿真查看仿真波形的两种方法。 2,拓扑提取阶段仿真方法 如下图在拓扑提取阶段,添加完激励…

走进后端开发流程 | 青训营笔记

目录 一、走进后端开发流程 1、传统流程 2、敏捷开发 3、SAFe简介 4、字节团队的开发流程 二、开发流程详解 1、需求阶段 2、开发阶段 云原生开发 团队的分支策略 自测 3、测试阶段 4、发布阶段 简单发布 金丝雀发布 滚动发布(推荐) 蓝…

记录每日LeetCode 160.相交链表 Java实现

题目描述: 给你两个单链表的头节点 headA 和 headB ,请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点,返回 null 。 图示两个链表在节点 c1 开始相交: 题目数据 保证 整个链式结构中不存在环。 注意&…