Spark实时(六):Output Sinks案例演示

news2025/1/6 17:59:26

文章目录

Output Sinks案例演示

一、​​​​​​​File sink

二、​​​​​​​​​​​​​​Memory Sink

三、​​​​​​​​​​​​​​Foreach Sink

1、​​​​​​​foreachBatch

2、​​​​​​​​​​​​​​foreach


Output Sinks案例演示

当我们对流式数据处理完成之后,可以将数据写出到Flie、Kafka、console控制台、memory内存,或者直接使用foreach做个性化处理。关于将数据结果写出到Kafka在StructuredStreaming与Kafka整合部分再详细描述。

对于一些可以保证端到端容错的sink输出,需要指定checkpoint目录来写入数据信息,指定的checkpoint目录可以是HDFS中的某个路径,设置checkpoint可以通过SparkSession设置也可以通过DataStreamWriter设置,设置方式如下:

//通过SparkSession设置checkpoint
spark.conf.set("spark.sql.streaming.checkpointLocation","hdfs://mycluster/checkpintdir")

或者

//通过DataStreamWriter设置checkpoint
df.writeStream
  .format("xxx")
  .option("checkpointLocation","./checkpointdir")
  .start()

checkpoint目录中会有以下目录及数据:

  • offsets:记录偏移量目录,记录了每个批次的偏移量。
  • commits:记录已经完成的批次,方便重启任务检查完成的批次与offset批次做对比,继续offset消费数据,运行批次。
  • metadata:metadata元数据保存jobid信息。
  • sources:数据源各个批次读取详情。
  • sinks:数据sink写出批次情况。
  • state:记录状态值,例如:聚合、去重等场景会记录相应状态,会周期性的生成snapshot文件记录状态。

下面对File、memoery、foreach output Sink进行演示。

一、​​​​​​​​​​​​​​File sink

Flie Sink就是数据结果实时写入到执行目录下的文件中,每次写出都会形成一个新的文件,文件格式可以是parquet、orc、json、csv格式。

Scala代码如下:

package com.lanson.structuredStreaming.sink

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  *  读取Socket数据,将数据写入到csv文件
  */
object FileSink {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("File Sink")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val result: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node3")
      .option("port", 9999)
      .load()

    val query: StreamingQuery = result.writeStream
      .format("csv")
      .option("path", "./dataresult/csvdir")
      .option("checkpointLocation","./checkpint/dir3")
      .start()
    query.awaitTermination()

  }
}

 ​​​​​​​

在socket中输入数据之后,每批次数据写入到一个csv文件中。 

二、​​​​​​​​​​​​​​Memory Sink

memory Sink是将结果作为内存表存储在内存中,支持Append和Complete输出模式,这种结果写出到内存表方式多用于测试,如果数据量大要慎用。另外查询结果表中数据时需要写一个循环每隔一段时间读取内存中的数据。

Scala代码如下:

package com.lanson.structuredStreaming.sink

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

/**
  *  读取scoket 数据写入memory 内存,再读取
  */
object MemorySink {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("Memory Sink")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    spark.sparkContext.setLogLevel("Error")

    val result: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node3")
      .option("port", 9999)
      .load()


    val query: StreamingQuery = result.writeStream
      .format("memory")
      .queryName("mytable")
      .start()

    //查询内存中表数据
    while(true){
      Thread.sleep(2000)
      spark.sql(
        """
          |select * from mytable
        """.stripMargin).show()
    }

    query.awaitTermination()
  }

}

三、​​​​​​​​​​​​​​Foreach Sink

foreach 可以对输出的结果数据进行自定义处理逻辑,针对结果数据自定义处理逻辑数据除了有foreach之外还有foreachbatch,两者区别是foreach是针对一条条的数据进行自定义处理,foreachbatch是针对当前小批次数据进行自定义处理。

1、​​​​​​​foreachBatch

foreachBatch可以针对每个批次数据进行自定义处理,该方法需要传入一个函数,函数有2个参数,分别为当前批次数据对应的DataFrame和当前batchId。

案例:实时读取socket数据,将结果批量写入到mysql中。

Scala代码如下:

package com.lanson.structuredStreaming.sink

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
  *  读取Socket 数据,将数据写出到mysql中
  */
object ForeachBatchTest {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("ForeachBatch Sink")
      .master("local")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()


    import spark.implicits._

    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node2")
      .option("port", 9999)
      .load()

    val personDF: DataFrame = df.as[String].map(line => {
      val arr: Array[String] = line.split(",")
      (arr(0).toInt, arr(1), arr(2).toInt)
    }).toDF("id", "name", "age")

    val query: StreamingQuery = personDF.writeStream
      .foreachBatch((batchDF: DataFrame, batchId: Long) => {
        println("batchID : " + batchId)
        batchDF.write.mode(SaveMode.Append).format("jdbc")
          .option("url","jdbc:mysql://node3:3306/testdb?useSSL=false")
          .option("user","root")
          .option("password","123456")
          .option("dbtable","person")
          .save()
      }).start()
    query.awaitTermination();


  }

}

运行结果: 

Java代码如下:

 

package com.lanson.structuredStreaming.sink;

import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple3;

public class ForeachBatchTest01 {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        SparkSession spark = SparkSession.builder().master("local")
                .appName("ForeachBatchTest01")
                .config("spark.sql.shuffle.partitions", 1)
                .getOrCreate();
        spark.sparkContext().setLogLevel("Error");

        Dataset<Row> result = spark.readStream()
                .format("socket")
                .option("host", "node2")
                .option("port", 9999)
                .load()
                .as(Encoders.STRING())
                .map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {
                    @Override
                    public Tuple3<Integer, String, Integer> call(String line) throws Exception {
                        String[] arr = line.split(",");
                        return new Tuple3<>(Integer.valueOf(arr[0]), arr[1], Integer.valueOf(arr[2]));
                    }
                }, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT()))
                .toDF("id", "name", "age");

        result.writeStream()
                .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
                    @Override
                    public void call(Dataset<Row> df, Long batchId) throws Exception {
                        System.out.println("batchID : "+batchId);
                        //将df 保存到mysql
                        df.write().format("jdbc")
                                .mode(SaveMode.Append)
                                .option("url","jdbc:mysql://node3:3306/testdb?useSSL=false" )
                                .option("user","root" )
                                .option("password","123456" )
                                .option("dbtable","person" )
                                .save();
                    }
                }).start()
                .awaitTermination();

    }
}

运行结果:

 

在mysql中创建testdb库,并创建person表,这里也可以不创建表:

create database testdb;
create table person(id int(10),name varchar(255),age int(2));
1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22
6,ll,29

mysql结果如下:

2、​​​​​​​​​​​​​​foreach

foreach可以针对数据结果每条数据进行处理。

案例:实时读取socket数据,将结果一条条写入到mysql中。

Scala代码如下:

package com.lanson.structuredStreaming.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.execution.streaming.sources.ForeachWrite
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}

object ForeachSinkTest {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("ForeachBatch Sink")
      .master("local")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    spark.sparkContext.setLogLevel("Error")

    import spark.implicits._

    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node2")
      .option("port", 9999)
      .load()

    val personDF: DataFrame = df.as[String].map(line => {
      val arr: Array[String] = line.split(",")
      (arr(0).toInt, arr(1), arr(2).toInt)
    }).toDF("id", "name", "age")

    personDF.writeStream
      .foreach(new ForeachWriter[Row]() {
        var  conn: Connection  = _
        var pst: PreparedStatement = _
        //打开资源
        override def open(partitionId: Long, epochId: Long): Boolean = {
          conn = DriverManager.getConnection("jdbc:mysql://node3:3306/testdb?useSSL=false","root","123456")
          pst = conn.prepareStatement("insert into person values (?,?,?)")
          true
        }

        //一条条处理数据
        override def process(row: Row): Unit = {
          val id: Int = row.getInt(0)
          val name: String = row.getString(1)
          val age: Int = row.getInt(2)
          pst.setInt(1,id)
          pst.setString(2,name)
          pst.setInt(3,age)
          pst.executeUpdate()
        }

        //关闭释放资源
        override def close(errorOrNull: Throwable): Unit = {
          pst.close()
          conn.close()
        }
      }).start()
      .awaitTermination()
  }

}

运行结果:

Java代码如下:

package com.lanson.structuredStreaming.sink;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple3;

public class ForeachSinkTest01 {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        SparkSession spark = SparkSession.builder().master("local")
                .appName("SSReadSocketData")
                .config("spark.sql.shuffle.partitions", 1)
                .getOrCreate();
        spark.sparkContext().setLogLevel("Error");

        Dataset<Row> result = spark.readStream()
                .format("socket")
                .option("host", "node2")
                .option("port", 9999)
                .load()
                .as(Encoders.STRING())
                .map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {
                    @Override
                    public Tuple3<Integer, String, Integer> call(String line) throws Exception {
                        String[] arr = line.split(",");
                        return new Tuple3<>(Integer.valueOf(arr[0]), arr[1], Integer.valueOf(arr[2]));
                    }
                }, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT()))
                .toDF("id", "name", "age");

        result.writeStream()
                .foreach(new ForeachWriter<Row>() {
                    Connection conn;
                    PreparedStatement pst ;
                    @Override
                    public boolean open(long partitionId, long epochId) {
                        try {
                            conn = DriverManager.getConnection("jdbc:mysql://node3:3306/testdb?useSSL=false", "root", "123456");
                            pst = conn.prepareStatement("insert into person values (?,?,?)");
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }
                        return true;
                    }
                    @Override
                    public void process(Row row) {
                        int id = row.getInt(0);
                        String name = row.getString(1);
                        int age = row.getInt(2);
                        try {
                            pst.setInt(1,id );
                            pst.setString(2,name );
                            pst.setInt(3,age );
                            pst.executeUpdate();
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }


                    }

                    @Override
                    public void close(Throwable errorOrNull) {
                        try {
                            pst.close();
                            conn.close();
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }
                    }


                }).start().awaitTermination();
    }
}

运行

以上代码编写完成后,清空mysql person表数据,然后输入以下数据:

1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22
6,ll,29
1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22
6,ll,29

mysql结果如下:


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1953097.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HarmonyOS NEXT星河版零基础入门到实战

文章目录 一、HarmonyOS NEXT介绍学习内容1、鸿蒙APP开发2、能力套件开发3、全场景开发适合人群 持续更新中✒️总结 一、HarmonyOS NEXT介绍 放弃安卓框架之后&#xff0c;HarmonyOS NEXT成为真正独立于安卓、iOS的操作系统&#xff0c;堪称是一场史无前例的脱胎换骨。在其众多…

模拟依赖关系和 AI 是Vue.js测试的下一个前沿领域

Vue.js 是一个流行的 JavaScript 框架&#xff0c;因此&#xff0c;确保其组件按预期工作至关重要&#xff1a;有效&#xff0c;更重要的是&#xff0c;可靠。模拟依赖项是最有效的测试方法之一&#xff0c;我们将在本文中发现。 模拟依赖项的必要性 模拟依赖项是一种对测试施加…

大数据-49 Redis 缓存问题中 穿透、雪崩、击穿、数据不一致、HotKey、BigKey

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…

亚信安慧AntDB-M负载均衡

负载均衡是分布式系统中常用的技术&#xff0c;主要是将工作任务均衡分布到系统的各个资源点上&#xff0c;可以充分利用系统资源。 AntDB-M分布式内存数据库节点角色可以分为管理节点(MN)、计算节点(CN)和数据节点(DN)三种。管理节点收到客户端连接请求后&#xff0c;会经由负…

【学习记录】锚框

主要解释程序代码&#xff0c;具体解析在代码中进行标注 锚框&#xff0c;具体看见网址https://zh-v2.d2l.ai/chapter_computer-vision/anchor.html#iou 对应程序解析&#xff1a;https://fkjkkll.github.io/2021/11/23/%E7%9B%AE%E6%A0%87%E6%A3%80%E6%B5%8BSSD/#more 目录…

数据结构:(1)线性表

一、基本概念 概念&#xff1a;零个或多个数据元素的有限序列 元素之间是有顺序了。如果存在多个元素&#xff0c;第一个元素无前驱&#xff0c;最后一个没有后继&#xff0c;其他的元素只有一个前驱和一个后继。 当线性表元素的个数n&#xff08;n>0&am…

UE4 UnrealPak加密功能(配置AES encrypt key)

本文的重点在于如何使用UnrealPak的加密功能&#xff0c;以及相关的UE4源代码学习。本文参考了&#xff1a;https://www.cnblogs.com/shiroe/p/14803859.html 。 设置密钥 在编辑、项目设置中找到下面栏目&#xff0c;并点击“生成新的加密密钥”&#xff0c;就可以为Unreal P…

K210视觉识别模块学习笔记7:多线程多模型编程识别

今日开始学习K210视觉识别模块: 图形化操作函数 亚博智能 K210视觉识别模块...... 固件库: canmv_yahboom_v2.1.1.bin 训练网站: 嘉楠开发者社区 今日学习使用多线程、多模型来识别各种物体 这里先提前说一下本文这次测试实验的结果吧&#xff1a;结果是不太成…

ERROR: Cannot find command ‘git’- do you have ‘git’ installed and in your PATH?

ERROR: Cannot find command ‘git’- do you have ‘git’ installed and in your PATH? 目录 ERROR: Cannot find command ‘git’- do you have ‘git’ installed and in your PATH? 【常见模块错误】 【解决方案】 欢迎来到英杰社区https://bbs.csdn.net/topics/61780…

详解程序的预处理与编译与连接

文章目录 1.程序的翻译环境和执行环境2.详解编译链接2.1 翻译环境2.2 编译本身也分为几个阶段2.3 运行环境 3.预处理详解3.1 预处理符号3.2 \#define3.2.1 \#define 定义标识符3.2.2 \#define定义宏3.2.3 \#define 替换规则 3.2.4 \#和\#\#3.2.5 带副作用的宏参数3.2.6 宏和函数…

数据结构之判断二叉树是否为搜索树(C/C++实现)

文章目录 判断二叉树是否为搜索树方法一&#xff1a;递归法方法二&#xff1a;中序遍历法总结 二叉树是一种非常常见的数据结构&#xff0c;它在计算机科学中有着广泛的应用。二叉搜索树&#xff08;Binary Search Tree&#xff0c;简称BST&#xff09;是二叉树的一种特殊形式&…

【VUE】封装一个追随鼠标的漂浮组件框架

红色箭头代表鼠标位置&#xff0c;蓝色区域跟随鼠标出现&#xff0c;鼠标进行其他操作的时候&#xff0c;蓝色区域隐藏。 vue全码 <template><divmousemove"updatePosition"mouseleave"hideDiv"class"container":style"{ positi…

AXI总线

目录 一、AXI接口特点二、AXI接口的握手机制2.1 握手原理2.2 握手机制的三种情形 三、AXI接口的通道3.1 AXI4-Stream3.1.1 通道信号3.1.2 数据字节类型3.1.3 流格式 3.2 AXI4-Lite和AXI4-Full3.1.1 读地址通道3.1.2 读数据通道3.1.3 写地址通道3.1.4 写数据通道3.1.5 写响应通道…

MybatisPlus分页插件

分页查询是一个很常见的需求&#xff0c;故Mybatis-Plus提供了一个分页插件&#xff0c;使用它可以十分方便的完成分页查询。下面介绍Mybatis-Plus分页插件的用法&#xff0c;详细信息可参考官方链接。 配置分页插件 创建com.atguigu.hellomp.config.MPConfiguration配置类&a…

奇异值分解(SVD)时间复杂度分析与优化

奇异值分解是一种矩阵分解的方法&#xff0c;大学线性代数里面也讲过奇异值分解的方法&#xff0c;因此这是一个为大家所熟知的算法。 1 SVD 时间复杂度分析 给定一个 m n m \times n mn 的矩阵 a \boldsymbol{a} a&#xff0c;按照下面公式做分解&#xff0c;其中 Σ \S…

python语言利用Tkinter实现GUI计算器|(二)优化计算器:过滤用户不合理的输入

python语言利用Tkinter实现GUI计算器 python语言利用Tkinter实现GUI计算器|&#xff08;一&#xff09;计算器基本功能设计 python语言利用Tkinter实现GUI计算器|&#xff08;二&#xff09;优化计算器 python语言利用Tkinter实现GUI计算器|&#xff08;三&#xff09;pyinstal…

python在类中手动定义标准化输出函数

在深度学习等训练框架中&#xff0c;有时候需要对模型的名称、参数量、训练进度、中间结果等进行标准化输出&#xff0c;从而方便实时查看代码运行情况&#xff0c;这时&#xff0c;可以在类中手动定义如下标准化的输出函数&#xff0c;然后在需要输出的地方进行调用即可。 首…

免费【2024】springboot 超市货品信息管理系统

博主介绍&#xff1a;✌CSDN新星计划导师、Java领域优质创作者、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行前辈交流✌ 技术范围&#xff1a;SpringBoot、Vue、SSM、HTML、Jsp、PHP、Nodejs、Python、爬虫、数据可视化…

修复SteamUI.dll加载失败的指南,快速修复failed to load steamui.dll

在使用Steam平台进行游戏下载、安装和运行时&#xff0c;可能会遇到一些系统错误&#xff0c;比如“failed to load steamui.dll”。这个错误通常意味着Steam的用户界面库文件steamui.dll出现了问题。本文将详细介绍steamui.dll文件的相关信息以及如何修复这一问题。 一.什么是…

vue 开发环境配置

1. nvm 安装 在 github上下载 最新的 nvm 包 https://github.com/coreybutler/nvm-windows/releases或者在 csdn 上下载&#xff08;从github上迁移&#xff0c;方便下载&#xff09;https://download.csdn.net/download/u011171506/89585197 下载后不用修改任何配置&#x…