Spark Structured Streaming 分流或双写多表 / 多数据源(Multi Sinks / Writes)

news2025/1/16 18:46:19
《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

在 Spark Structured Streaming 中,我们有时候需要将最后的处理结果分流或双写到多张表或多个数据源(Multi Sinks / Writes),一个典型的例子是:在 CDC 数据入湖场景里,一个 Kafka Topic 上存放着整库或多张表的 CDC 消息,使用 Spark 从 Kafka 中摄取这些消息后,需要根据消息中提供的数据库名和数据表名对 CDC 消息分流,然后写到数据湖上对应的 ODS 表中,这就是一种典型的“数据分流”场景。在 Spark Structured Streaming 中,实现多表 / 多数据源的分流或双写主要依赖 foreachBatchforeach 这两个方法,本文就围绕它们介绍一下分流或双写多表 / 多数据源的具体实现。

首先,要明确 foreachforeachBatch 都是 action,也就意味着使用它们时已经到了流的末端,绝大数情况下,就是要将记录写入目标数据源了,这也是foreachforeachBatch 这两个方法绝大多数的应用场景。通常,在 Spark 中将数据写入一个数据源是这样做的(以写 parquet 文件为例):

writeStream
    .format("parquet")
    .option("path", "path/to/destination/dir")
    .start()

由于 Spark 内置了 parquet 格式的 data writer, 所以我们只需填写一些相应的配置,就可以直接把 DF 按对应的格式写到目标位置了,那什么情况下我们要使用 foreachforeachBatch 呢?下面展开来介绍一下。

1. foreachBatch 的应用场景


大多数情况下,一条的流处理的 pipeline 都是从一个 Source 开始,中间经历各种处理后,最终写入了一个 Sink,但是,在某些场景下,我们流的重点可能需要写入的并不是一个 Sink,而是多个,典型的情形有:

  • 数据分流:需要将数据“分流”写入不同的数据源或数据表( 简单说就是 dispatch )

  • 数据多写:需要同时向多个下游数据源相同相同数据( 简单说就是 duplicate )

虽然我们可以非常“粗暴”地通过 for 循环构建多个 writer 实现上述两种典型的写入需求,但是这种做法会让每一个 sink 变成独立的 streaming query(作业),是代价很高的应对方法,并不实用。最好的做法就是通过 foreachBatch 来实现,实际上上面两种需求正是 foreachBatch 的典型应用场景。我们看一下 foreachBatch 的接口声明:

def foreachBatch(function: (Dataset[T], Long) ⇒ Unit): DataStreamWriter[T]

我们需要为 foreachBatch 传入一个函数字面量,它有两参数,第一个对应一个 micro-batch 的 DataFrame, 第二个是这个 micro-batch 的 ID,拿到 micro-batch 的 DataFrame 后,我们可以在这个 DataFrame 上作相应的转换处理,最后调用现成的 writer 写入目标端。这里涉及到 Spark Streaming 的 Micro-Batch,也就是上述参数列表中的 Dataset[T] 类型的那个 DataFrame ,关于 Micro-Batch 在流上运行方式,下图给出了非常形象的描绘:

在这里插入图片描述

简单地说,Micro-Batch 模式下需要收集齐一定量(或一小段时间范围内)的数据,整理成一个 DataFrame 去处理,它的延迟是在秒级。上图下方时间轴上的每一小撮数据就是 foreachBatch 中传入的那个 DataFrame。

这里,我们特别澄清一个容易误解的地方: foreachBatch 是没有“循环”语义的,这里的 foreach 其实是意在针对每一个 micro-batch 的,不是空间维度上迭代多个 micro-batch, 而是时间维度上针对每一个流经的 micro-batch 进行处理。这里也能提现从 source 构建出的 DF 和这个方法里的 micro-batch 的 DF 的差异,前者是一个无界的 DF,本质上是一个流,更加“实体”的 DF 其实是 foreachBatch 中的这个 DF,它是较短时间内聚齐的“一小撮”数据,边界是确定的!

下面,我们针对分流和双写两种典型场景给出详细的示例代码。

1.1. 通过 foreachBatch 实现数据“分流”


我们以向两种不同的 Hudi 表写入数据为例,先将数据过滤,得到分流后的 DF,然后向对应的 Hudi 表中写入:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // 分流 table_1 的数据并写入
  filteredDF1 = batchDF.filter(...)
  filteredDF1.write.format("hudi").
      option(TABLE_NAME, "table_1").
      mode(SaveMode.Append).
      save("/path/1")
    
  // 分流 table_2 的数据并写入
  filteredDF2 = batchDF.filter(...)
  filteredDF2.write.format("hudi").
      option(TABLE_NAME, "table_2").
      mode(SaveMode.Append).
      save("/path/2")
}

1.2. 通过 foreachBatch 实现数据“双写”


我们以向两种不同的数据源写入数据为例,可以调用多次 write 操作,但是,由于每次写入都会导致数据被 recomputed,流本身可能不再存在或状态发生了改变,所以,必须要在写入前使用 persist, 保证向下游多次写入的数据是完全一样,最后记得再执行一遍 unpersist 即可。

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format("csv").save(...)  // location 1, 对应数据源的 data writer 是已存在的
  batchDF.write.format("hudi").save(...)  // location 2, 对应数据源的 data writer 是已存在的
  batchDF.unpersist()
}

2. foreach 的应用场景


foreachBatch 很实用,但是在如下两种场景下无法工作的:

  • 没有现成的支持目标数据源的 data writer;
  • 当前流运行于 continuous processing 模式,不支持 micro-batch

如果是上述情形,我们就得使用 foreach 了,因为 foreach 要自行实现对目标数据源的链接和读写,同时,它的自定义处理逻辑又是作用到每一行上的,所以它能解决上述两种场景的问题。某种程度上,foreach 相比 foreachBatch 是一种更底层的 API。使用 foreach 需要提供一个 ForeachWriter,实现 open, process, 和close 三个方法,不过要注意的是这三个方案的调用时机是不同的,open / close 显然是 per-partition 要调用一次的, proess 则是要针对每条记录进行处理的。以下是 一个自行实现 foreach 的代码模板:

streamingDatasetOfString.writeStream.foreach(
  // 没有现成的 DataStreamWriter,需要自行实现行级别的存储逻辑。
  new ForeachWriter[String] {
	// 在 partition 这个粒度上创建针对目标数据源的连接,这比较符合常规
    def open(partitionId: Long, version: Long): Boolean = {
      // Open connection
    }
	// 数据梳理逻辑会作用到记录级别,而不是 miro-batch 的 df 级别。
    def process(record: String): Unit = {
      // Write string to connection
    }
	// 关闭连接,释放资源
    def close(errorOrNull: Throwable): Unit = {
      // Close the connection
    }
  }
).start()

关于 foreach 更多信息可以参考官方文档,这里就不再深究了,大多数情况,我们更多使用的还是 foreachBatch

3. 小结


foreachforeachBatch 都能在向目标数据源写入数据时实现定制化的逻辑,它们之间的差别在于:

  • foreachBatch多应用于数据分流或双写场景,目标数据源往往是已经有线程的 data writer 了
  • foreach 则要自行实现对目标数据的连接和读写处理
  • 两者操纵数据的颗粒度不同,foreach 对数据的梳理逻辑(process 方法)作用到 DF 中的每一行上,而 foreachBatch 则直接操纵的是每一个 micro-batch 对应的 DF。

参考资料

  • Spark 关于 foreach 和 foreachBatch 的官方文档
  • Introducing Low-latency Continuous Processing Mode in Structured Streaming in Apache Spark 2.3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1636935.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据库管理-第179期 分库分表vs分布式(20240430

数据库管理179期 2024-04-30 数据库管理-第179期 分库分表vs分布式(20240430)1 分库分表1.1 分库1.2 分表1.3 组合1.4 问题 2 分布式3 常见分布式数据库4 期望总结 数据库管理-第179期 分库分表vs分布式(20240430) 作者&#xff1…

git 第一次安装设置用户名密码

git config --global user.name ljq git config --global user.email 15137659164qq.com创建公钥命令 输入后一直回车 ssh-keygen -t rsa下面这样代表成功 这里是公钥的 信息输入gitee 中 输入下面命令看是否和本机绑定成功 ssh -T gitgitee.com如何是这样,恭喜…

spring的高阶使用技巧1——ApplicationListener注册监听器的使用

Spring中的监听器,高阶开发工作者应该都耳熟能详。在 Spring 框架中,这个接口允许开发者注册监听器来监听应用程序中发布的事件。Spring的事件处理机制提供了一种观察者模式的实现,允许应用程序组件之间进行松耦合的通信。 更详细的介绍和使…

Flask简介

Flask简介 安装概述使用PyCharm创建一个Flask程序 Flask程序的基本结构初始化路由和视图函数启动服务器请求-响应循环 安装 概述 Flask算是小型框架,小到可以称为“微框架”。Flask 非常小,因此你一旦能够熟练使用它,很可能就能读懂它所有的…

element的el-table 解决表格多页选择数据时,数据被清空

问题:切换页码时,勾选的数据会被清空 重点看我圈出来的,直接复制,注意,我这里 return row.productId;一般大家的是 return row.id,根据接口定的唯一变量 :row-key"getRowKeys"​​​​​​​:reserve-sele…

【八大排序(三)】快速排序

❣博主主页: 33的博客❣ ▶️文章专栏分类:八大排序◀️ 🚚我的代码仓库: 33的代码仓库🚚 🫵🫵🫵关注我带你了解更多排序知识 目录 1.前言2.快速排序2.1概念2.2画图理解2.3递归代码实现2.3.1Hoare法2.3.2挖坑法2.3.3前…

vue3 引用虚拟键盘simple-keyboard

simple-keyboard官网地址&#xff1a;https://virtual-keyboard.js.org 目前实现效果图是&#xff08;实现数字、大小写字母键盘&#xff09;&#xff1a; 1.需要先安装simple-keyboard npm install simple-keyboard --save2.封装sinpleKeyboard 组件 <!-- keyboard-bo…

24深圳杯数学建模挑战赛A题6页初步思路+参考论文+保姆级答疑!!!

问题1:单个残骸的精确位置定位 建立数学模型&#xff0c;分析如果要精准确定空中单个残骸发生音爆时的位置坐标&#xff08;经度、纬度、高程&#xff09;和时间&#xff0c;至少需要布置几台监测设备&#xff1f;假设某火箭一级残骸分离后&#xff0c;在落点附近布置了7台监测…

聊聊 ASP.NET Core 中间件(一):一个简单的中间件例子

前言&#xff1a;什么是中间件 服务器在收到 HTTP 请求后会对用户的请求进行一系列的处理&#xff0c;比如检查请求的身份验证信息、处理请求报文头、检查是否存在对应的服务器端响应缓存、找到和请求对应的控制器类中的操作方法等&#xff0c;当控制器类中的操作方法执行完成…

02 spring-boot+mybatis+elementui 的登录,文件上传,增删改查的入门级项目

前言 主要是来自于 朋友的需求 项目概况 就是一个 学生信息的增删改查 然后 具体到业务这边 使用 mybatis xml 来配置的增删改查 后端这边 springboot mybatis mysql fastjson hutool 的一个基础的增删改查的学习项目, 简单容易上手 前端这边 node14 vue element…

【C++】初识string类

一、熟悉string类 1.1 string类的由来&#xff1a; C语音中的字符串需要我们自己管理底层空间&#xff0c;容易内存泄露。而C是面向对象语音&#xff0c;所以它把字符串封装成一个string类。 C中对于string的定义为&#xff1a;typedef basic_string string; 也就是说C中的str…

Microsoft Universal Print 与 SAP 集成教程

引言 从 SAP 环境打印是许多客户的要求。例如数据列表打印、批量打印或标签打印。此类生产和批量打印方案通常使用专用硬件、驱动程序和打印解决方案来解决。 Microsoft Universal Print 是一种基于云的打印解决方案&#xff0c;它允许组织以集中化的方式管理打印机和打印机驱…

【网站项目】个性化商铺系统

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…

int类型的取值范围(为什么负数比正数表示的范围多一位)

&#x1f381;个人主页&#xff1a;我们的五年 &#x1f50d;系列专栏&#xff1a;C语言基本概念 &#x1f337;追光的人&#xff0c;终会万丈光芒 目录 &#x1f3dd;1.int的基本概念&#xff1a; 空间大小&#xff1a; 有符号类型的表示形式&#xff1a; &#x1f3dd;2.…

【数据结构与算法】力扣 239. 滑动窗口最大值

题干描述 给你一个整数数组 nums&#xff0c;有一个大小为 k **的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。 返回 滑动窗口中的最大值 。 示例 1&#xff1a; 输入&#xff1a; nums [1,3,-1,-3,5,3…

PPO 学习笔记

用PPO算法求解整个神经网络在迭代过程中的梯度问题 每走一步就会得到一个新的状态&#xff0c;把这个状态传到网络里面&#xff0c;会得到一个 action&#xff0c;执行这个 action 又会到达一个新状态 policy 中由状态 st 生成动作 at&#xff0c;生成的这个 at 是由整个网络的…

【MySQL】——用户和权限管理(二)

&#x1f4bb;博主现有专栏&#xff1a; C51单片机&#xff08;STC89C516&#xff09;&#xff0c;c语言&#xff0c;c&#xff0c;离散数学&#xff0c;算法设计与分析&#xff0c;数据结构&#xff0c;Python&#xff0c;Java基础&#xff0c;MySQL&#xff0c;linux&#xf…

PotatoPie 4.0 实验教程(33) —— FPGA实现摄像头视频图像叠加

链接直达 https://item.taobao.com/item.htm?ftt&id776516984361 什么是视频水印&#xff1f; 视频水印就是图像叠加&#xff0c;跟画中画&#xff0c;或者是OSD是一样的原理&#xff0c;都是在视频的行场数据流上进行替换操作&#xff0c;比如叠加可以直接用水印图的数…

【Python小练】求斐波那契数列第n个数

题目 输出斐波那契数列第n个数。 分析 首先我们要知道&#xff0c;斐波那契数列&#xff0c;这个数列从第三位开始等于前两个数的和&#xff0c;要知道数列第n个数&#xff08;n>2&#xff09;&#xff0c;就要知道其前两相的值&#xff0c;着就需要用到递归了。来看一下吧…

Java面试重点之反射机制

一、 反射是什么&#xff1f; 允许程序在运行时查询和操作对象的类型信息。通过反射&#xff0c;程序能够在运行时获取对象的类定义信息&#xff0c;如类的名称、方法、字段、注解等&#xff0c;并且可以动态地调用对象的方法或访问其字段&#xff0c;而无需在编译时具体知道对…