flink使用StatementSet降低资源浪费

news2024/10/7 13:23:36

背景

项目中有很多ods层(mysql 通过cannal)kafka,需要对这些ods kakfa做一些etl操作后写入下一层的kafka(dwd层)。

一开始采用的是executeSql方式来执行每个ods→dwd层操作,即类似:

 def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    val configuration: Configuration = tableEnv.getConfig.getConfiguration
 
    tableEnv.createTemporarySystemFunction("etl_handle", classOf[ETLFunction])
 
    // source/sink ddl
    tableEnv.executeSql(CREATE_DB_DDL)
    tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)
    tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)
    tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)
    tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)
    ....
 
    // insert dml,在insert语句中调用etl_handle进行预处理和写入
    tableEnv.executeSql(INSERT_DWD_TABLE1)
    tableEnv.executeSql(INSERT_DWD_TABLE2)
    ... 
}

当有多个ods->dwd操作放在同一个flink作业中时,发现这种方式会导致每次insert操作都是单独的DAG,非常消耗资源,特别是这些处理都是比较轻量级的,最好是能融合在同一个DAG中共享资源。

解决方案

查看flink文档:INSERT 语句 | Apache Flink

因此,可以采用statementset的方式,将不同insert sql进行分组执行,每组的insert sql会先被缓存到 StatementSet 中,并在StatementSet.execute() 方法被调用时,同一组的 insert sql(sink) 会被优化成一张DAG共用taskmanager,减少资源浪费,即类似:

def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    val configuration: Configuration = tableEnv.getConfig.getConfiguration
 
    tableEnv.createTemporarySystemFunction("etl_handle", classOf[ETLFunction])
 
    // source/sink ddl
    tableEnv.executeSql(CREATE_DB_DDL)
    tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)
    tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)
    tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)
    tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)
    ....
 

     // insert dml
    tableEnv.createStatementSet()
      .addInsertSql(INSERT_DWD_TABLE1)
      .addInsertSql(INSERT_DWD_TABLE2)
      .addInsertSql(INSERT_DWD_TABLE3)
      .execute()
 
 
    tableEnv.createStatementSet()
      .addInsertSql(INSERT_DWD_TABLE4)
      .addInsertSql(INSERT_DWD_TABLE5)
      .addInsertSql(INSERT_DWD_TABLE6)
      .execute()
}

其他

如果是纯flink sql而不用data stream api,也是可以达到同样的效果的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1868246.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java洗鞋小程序预约系统源码

💥洗鞋神器来袭!轻松预约,让你的鞋子焕然一新👟 🎉 告别洗鞋烦恼,洗鞋预约小程序来啦! 你是不是常常为洗鞋而烦恼?手洗太累,送去洗衣店又贵又麻烦。现在,好…

什么是一维正态分布?

正态分布,也以高斯分布而被人熟知。换句话说,正态分布也称为高斯分布。我们都清楚正态分布是用于处理连续型数据的好工具,尤其是当我们的研究对象符合正态分布时。 对于一维正态分布这个名字我其实比较不认可。在英文中,它叫“Un…

电脑怎么保存图片?4个方法,快速保存!

在数字时代的浪潮中,我们与图片的关系愈发密切。从社交媒体上的精美瞬间,到工作项目中的专业图表,再到个人收藏夹里的珍贵回忆,图片已然成为我们生活与工作中不可或缺的一部分。 然而,你是否曾想过,这些看…

拿3个点差价,这家骑手外包公司一年收入近10亿……

最近,有这么一家名不经传的公司突然走进大众视野,因为其冲击资本市场的一份招股书,戳到了不少吃瓜群众的神经…… 这家公司名为博尔捷,前不久,其正式向港交所递交了招股书。招股书数据显示,这家公司2021年至…

【java】【控制台】【javaSE】 初级java家教管理系统控制台命令行程序项目

更多项目点击👆👆👆完整项目成品专栏 【java】【控制台】【javaSE】 初级java家教管理系统控制台命令行程序项目 获取源码方式项目说明:功能点数据库涉及到: 项目文件包含:项目运行环境 :截图其…

零点到两点,我部署了一个es

一开始的准备 实在是水平有限,Clash虚拟机网出不去,研究了LAN方案,还在咸鱼买了一单,搞不定,没辙,那我老老实实下载tar包得了,就不docker了 下载安装 直接官网给它安个es https://www.elasti…

js异常处理方案

文章目录 异常处理方案同步代码的异常处理Promise 的异常处理async await 的异常处理 感谢阅读,觉得有帮助可以点点关注点点赞,谢谢! 异常处理方案 在JS开发中,处理异常包括两步:先抛出异常,然后捕获异常。…

理解MySQL核心技术:外键的概念、作用和应用实例

引言 在数据库管理系统(DBMS)中,外键(Foreign Key)是维持数据一致性和实现数据完整性的重要工具。本文将详细介绍MySQL外键的基本概念、作用,以及相关的操作指南和应用实例,帮助读者掌握并灵活…

MHA、MMM高可用方案及故障切换

目录 一、MHA高可用方案 1、MHA的组成 2、MHA的工作原理 3、部署MHA架构 第一部分:一主两从数据库架构部署 1、全部更改主机名、初始化操作、开启mysql服务、设置主机名管理、时间同步 2、MySQL服务器做主从复制 3、测试主从效果 第二部分:MHA架…

mysql岗位实习----教务系统管理

教务管理系统 一、DDL CREATE TABLE users (user_id int(11) NOT NULL AUTO_INCREMENT COMMENT 用户ID,username varchar(50) NOT NULL COMMENT 用户名,password varchar(255) NOT NULL COMMENT 密码,gender enum(男,女) NOT NULL COMMENT 性别,email varchar(100) DEFAULT N…

Flutter 小技巧之为什么推荐 Widget 使用 const

今天收到这个问题,本来想着简单回复下,但是感觉这个话题又可以稍微展开讲讲,干脆就整理成一篇简单的科普,这样也能更方便清晰地回答这个问题。 聊这个问题之前,我们需要把一个“老生常谈”的概念拿出来说,那…

推荐一个AI导航网站和一篇文章:精益开发

第49期 AI 驿站 一个超级全面AI、的导航网站 https://www.51mskd.com/ “精益开发”的精益是什么? 最流行的软件开发模式,现在是“敏捷开发”(agile development)。 但是,很多人不知道,敏捷只是一种价值…

企业数据治理的下一步是数据资产管理?

随着信息技术的飞速发展和数字化转型的深入推进,企业数据已成为驱动业务增长和创新的核心要素。当企业数据治理工作取得显著成效后,如何进一步发挥数据的价值,实现数据资产的有效管理,成为企业面临的重要课题。 数据治理的基石作用…

倒计时日期 桌面倒数日 重要日期倒计时提醒

在工作、学习、生活中,我们往往会有很多重要的日子需要我们去标记。在工作中的季度考核、学习中的关键时间点、生活中的各种纪念日……等等,都需要我们去对未来这些重要的时间节点做一个倒计时提醒。 日期倒计时让我们对未来的时间,有一个非…

Kafka入门到精通(一)-安装Scala

Scala 简介 Scala 是 Scalable Language 的简写,意味着这种语言设计上支持大规模软件开发,是一门多范式的编程语言,Scala 语言是由 Martin Odersky 等人在 2003 年开发的,并于 2004 年首次发布。Scala 运行于 Java 平台&#xff0…

应用案例 | Panorama SCADA:开创性的铁路电气控制系统

案例概况 客户:英国铁路网运营商Network Rail 合作伙伴:Telent Technology Services Ltd 应用:实现对铁路牵引电网的高效管理与精准控制 应用产品:宏集Panorama E2 SCADA系统 一、应用背景 英国铁路网运营商Network Rail计划…

QT中子工程的创建,以及如何在含有库的子工程项目中引用主项目中的qt资源

1、背景 在qt中创建多项目类型,如下: CustomDll表示其中的一个动态库子项目; CustomLib表示其中的一个静态库子项目; MyWidget表示主项目窗口(main函数所在项目); 2、qrc资源的共享 如何在CustomDll和CustomLib等子项目中也同样使用 MyWidget项目中的qrc资源呢??? 直…

基于STM32的简易智能家居设计

一、项目功能概述 1、OLED显示温湿度、空气质量,并可以设置报警阈值 2、设置4个继电器开关,分别控制灯、空调、开关、风扇 3、设计一个离线语音识别系统,可以语音控制打开指定开关、并且可以显示识别命令词到OLED屏上 4、OLED实时显示&#…

【02-02】SpringMVC基于注解的应用

一、请求处理 1、常用注解 RequestMapping 作用:用来匹配客户端发送的请求(用来处理URL映射,将请求映射到处理方法中),可以在类或者方法上使用。 用在类上,可以将请求模块化,避免请求方法中的…

机器学习 中数据是如何处理的?

数据处理是将数据从给定形式转换为更可用和更理想的形式的任务,即使其更有意义、信息更丰富。使用机器学习算法、数学建模和统计知识,整个过程可以自动化。这个完整过程的输出可以是任何所需的形式,如图形、视频、图表、表格、图像等等&#…