Flink SQL增量查询Hudi表

news2025/1/11 13:57:11

前言

前面总结了Spark SQL增量查询Hudi表和Hive增量查询Hudi表。最近项目上也有Flink SQL增量查询Hudi表的需求,正好学习总结一下。

官网文档

地址:https://hudi.apache.org/cn/docs/querying_data#incremental-query

参数

  • read.start-commit 增量查询开始时间 对于流读,如果不指定该值,默认取最新的instantTime,也就是流读默认从最新的instantTime开始读(包含最新的)。对于批读,如果不指定该参数,只指定read.end-commit,则实现时间旅行的功能,可查询历史记录
  • read.end-commit 增量查询结束时间 不指定该参数则默认读取到最新的记录,该参数一般只适用于批读,因为流读一般的需求是查询所有的增量数据
  • read.streaming.enabled 是否流读 默认false
  • read.streaming.check-interval 流读的检查时间间隔,单位秒(s),默认值60,也就是一分钟
    查询范围 [BEGIN_INSTANTTIME,END_INSTANTTIME],既包含开始时间又包含结束时间,对于默认值可参考上面的参数说明

版本

建表造数:

  • Hudi 0.9.0
  • Spark 2.4.5

我这里建表造数使用Hudi Spark SQL 0.9.0,目的是为了模拟项目上用Java Client和Spark SQL创建的Hudi表,以验证Hudi Flink SQL增量查询时是否兼容旧版本的Hudi表(大家没有这种需求的,可以使用任何方式正常造数)

查询

  • Hudi 0.13.0-SNAPSHOT
  • Flink 1.14.3 (增量查询)
  • Spark 3.1.2 (主要是为了使用Call Procedures命令查看commit信息)

建表造数

-- Spark SQL Hudi 0.9.0
create table hudi.test_flink_incremental (
  id int,
  name string,
  price double,
  ts long,
  dt string
) using hudi
 partitioned by (dt)
 options (
  primaryKey = 'id',
  preCombineField = 'ts',
  type = 'cow'
);

insert into hudi.test_flink_incremental values (1,'a1', 10, 1000, '2022-11-25');
insert into hudi.test_flink_incremental values (2,'a2', 20, 2000, '2022-11-25');
update hudi.test_flink_incremental set name='hudi2_update' where id = 2;
insert into hudi.test_flink_incremental values (3,'a3', 30, 3000, '2022-11-26');
insert into hudi.test_flink_incremental values (4,'a4', 40, 4000, '2022-12-26');

用show_commits看一下有哪些commits(这里查询用的是Hudi的master,因为show_commits是在0.11.0版本开始支持的,也可以通过使用hadoop命令查看.hoodie文件夹下的.commit文件)

call show_commits(table => 'hudi.test_flink_incremental');
20221205152736
20221205152723
20221205152712
20221205152702
20221205152650

Flink SQL创建Hudi内存表

CREATE TABLE test_flink_incremental (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price double,
  ts bigint,
  dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://cluster1/warehouse/tablespace/managed/hive/hudi.db/test_flink_incremental'
); 

建表时不指定增量查询相关的参数,我们在查询时动态指定,这样比较灵活。
动态指定参数方法,在查询语句后面加上如下形式的语句

/*+ 
options(
  'read.start-commit' = '20221205152723',
  'read.end-commit'='20221205152736'
) 
*/

批读

Flink SQL读Hudi有两种模式:批读和流读。默认批读,先看一下批读的增量查询

验证是否包含起始时间和默认结束时间

select * from test_flink_incremental 
/*+ 
options(
	'read.start-commit' = '20221205152723' --起始时间对应id=3的记录
) 
*/

结果包含起始时间,不指定结束时间默认读到最新的数据

id   name     price        ts                 dt
 4     a4      40.0      4000      dt=2022-12-26
 3     a3      30.0      3000      dt=2022-11-26

验证是否包含结束时间

select * from test_flink_incremental 
/*+ 
options(
	'read.start-commit' = '20221205152712',  --起始时间对应id=2的记录
    'read.end-commit'='20221205152723'       --结束时间对应id=3的记录
) 
*/

结果包含结束时间

id           name        price       ts                 dt
 3             a3        30.0      3000      dt=2022-11-26
 2   hudi2_update        20.0      2000      dt=2022-11-25

验证默认开始时间

这种情况是指定结束时间,但不指定开始时间,如果都不指定,则读表所有的最新版本的记录。

select * from test_flink_incremental 
/*+ 
options(
    'read.end-commit'='20221205152712'       --结束时间对应id=2的更新记录
) 
*/

结果:只查询end-commit对应的记录

id           name        price       ts                 dt
 2   hudi2_update        20.0      2000      dt=2022-11-25

时间旅行(查询历史记录)

验证是否可以查询历史记录,我们更新id为2的name,更新前name为a2,更新后为hudi2_update,我们验证一下,是否可以通过Flink SQL查询Hudi历史记录,逾期结果查出id=2,name=a2

select * from test_flink_incremental 
/*+ 
options(
    'read.end-commit'='20221205152702'       --结束时间对应id=2的历史记录
) 
*/

结果:可以正确查询历史记录

id           name        price       ts                 dt
 2             a2        20.0      2000      dt=2022-11-25

流读

开启流读的参数:

read.streaming.enabled = true

流读不需要设置结束时间,因为一般的需求是读所有的增量数据,我们只需要验证开始时间就好了

验证默认开始时间

select * from test_flink_incremental 
/*+ 
options(
    'read.streaming.enabled'='true',
    'read.streaming.check-interval' = '4'
) 
*/

结果:从最新的instantTime开始增量读取,也就是默认的read.start-commit为最新的instantTime

id   name     price        ts                 dt
 4     a4      40.0      4000      dt=2022-12-26

验证指定开始时间

select * from test_flink_incremental 
/*+ 
options(
    'read.streaming.enabled'='true',
    'read.streaming.check-interval' = '4',
    'read.start-commit' = '20221205152712'
) 
*/

结果:

id           name        price       ts                 dt
 2   hudi2_update        20.0      2000      dt=2022-11-25
 3             a3        30.0      3000      dt=2022-11-26
 4             a4        40.0      4000      dt=2022-11-26

如果想第一次查询全部的历史数据,可以将start-commit设置的早一点,比如设置到去年:‘read.start-commit’ = ‘20211205152712’

select * from test_flink_incremental 
/*+ 
options(
    'read.streaming.enabled'='true',
    'read.streaming.check-interval' = '4',
    'read.start-commit' = '20211205152712'
) 
*/
id           name        price       ts                 dt
 1             a1        10.0      1000      dt=2022-11-25
 2   hudi2_update        20.0      2000      dt=2022-11-25
 3             a3        30.0      3000      dt=2022-11-26
 4             a4        40.0      4000      dt=2022-11-26

验证流读的连续性

验证新的增量数据进来,是否可以持续消费Hudi增量数据,验证数据的准确一致性,为了方便验证,我可以使用Flink SQL增量流读Hudi表然后Sink到MySQL表中,最后通过读取MySQL表中的数据验证数据的准确性

Flink SQL读写MySQL需要配置jar包,将flink-connector-jdbc_2.12-1.14.3.jar放到lib下即可,下载地址:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.3/flink-connector-jdbc_2.12-1.14.3.jar

先在MySQL中创建一张Sink表

-- MySQL
CREATE TABLE `test_sink` (
  `id` int(11),
  `name` text DEFAULT NULL,
  `price` int(11),
  `ts` int(11),
  `dt`  text DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Flink中创建对应的sink表

create table test_sink (
  id int,
  name string,
  price double,
  ts bigint,
  dt string
) with (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8',
 'username' = 'root',
 'password' = 'root-123',
 'table-name' = 'test_sink',
 'sink.buffer-flush.max-rows' = '1'
);

然后流式增量读取Hudi表Sink Mysql

insert into test_sink
select * from test_flink_incremental 
/*+ 
options(
    'read.streaming.enabled'='true',
    'read.streaming.check-interval' = '4',
    'read.start-commit' = '20221205152712'
) 
*/

这样会起一个长任务,一直处于running状态,我们可以在yarn-session界面上验证这一点

然后先在MySQL中验证一下历史数据的准确性

再利用Spark SQL往source表插入两条数据

-- Spark SQL
insert into hudi.test_flink_incremental values (5,'a5', 50, 5000, '2022-12-07');
insert into hudi.test_flink_incremental values (6,'a6', 60, 6000, '2022-12-07');

我们增量读取的间隔设置的4s,成功插入数据等待4s后,再在MySQL表中验证一下数据

发现新增的数据已经成功Sink到MySQL中了,并且数据没有重复

最后验证一下更新的增量数据,Spark SQL更新Hudi source表

-- Spark SQL
update hudi.test_flink_incremental set name='hudi5_update' where id = 5;

继续验证结果

结果是更新的增量数据也会insert到MySQL中的sink表,但是不会更新原来的数据

那如果想实现更新的效果呢?我们需要在MySQL和Flink的sink表中加上主键字段,两者缺一不可,如下:

-- MySQL
CREATE TABLE `test_sink` (
  `id` int(11),
  `name` text DEFAULT NULL,
  `price` int(11),
  `ts` int(11),
  `dt`  text DEFAULT NULL,
   PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Flink SQL
create table test_sink (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8',
 'username' = 'root',
 'password' = 'root-123',
 'table-name' = 'test_sink',
 'sink.buffer-flush.max-rows' = '1'
);

将刚才起的长任务关掉,重新执行刚才的insert语句,先跑一下历史数据,最后再验证一下增量效果

-- Spark SQL
update hudi.test_flink_incremental set name='hudi6_update' where id = 6;
insert into hudi.test_flink_incremental values (7,'a7', 70, 7000, '2022-12-07');

可以看到,达到了预期效果,对于id=6的执行更新操作,对于id=7的执行插入操作。

相关阅读

  • Apache Hudi 入门学习总结
  • Flink SQL 客户端查询Hive配置及问题解决
  • Flink SQL操作Hudi并同步Hive使用总结
  • Flink SQL通过Hudi HMS Catalog读写Hudi并同步Hive表(强烈推荐这种方式)
  • Hudi Spark SQL总结
  • Spark SQL增量查询Hudi表
  • Hudi Spark SQL Call Procedures学习总结(一)(查询统计表文件信息)
  • Hive增量查询Hudi表

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/71008.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

WWW2022 | 基于领域增强的图对比协同过滤方法+代码实践

嘿,记得给“机器学习与推荐算法”添加星标今天跟大家分享一篇将对比学习应用于图协同过滤方法的文章,该论文发表于WWW2022会议上。其主要思想是在图神经网络协同过滤方法上应用了两种领域类型的对比学习方法,分别是显式的结构领域和隐式的语义…

TGK-Planner-前后端路径规划(基于梯度的后端无约束优化)

高速移动无人机的在线路径规划一直是学界当前研究的难点,引起了大量机器人行业的研究人员与工程师的关注。然而无人机的计算资源有限,要在短时间内规划出一条安全可执行的路径,这就要求无人机的运动规划算法必须轻型而有效。本文将介绍一种无…

electron-vue中报错 Cannot use import statement outside a module解决方案(亲测有效!!!)

错误: Cannot use import statement outside a module(不能在模块之外使用导入语句)。 原因: 安装的某个依赖包里使用了import语法,因为我们打包输出的是commonjs规范,所以不识别import语法而导致报错。 可以从 .electron-vue/w…

PrimoBurnerSDK蓝光刻录工具开发工具包

PrimoBurnerSDK蓝光刻录工具开发工具包 PrimoBurnerSDK是一个CD、DVD和蓝光刻录工具开发工具包。它还提供了一个全面灵活的API,用于快速轻松地实现各种燃烧/翻录替代方案。 PrimoBurner SDK for.NET的强大功能: 自2003年以来一直在发展的广泛使用的老式发…

比机器人还智能的数字孪生地下停车场监管系统!

现在的停车场管理大多采用人工或智能收费系统,两种方式都有一个弊端就是无法直接知晓停车场内部信息。 车驶入停车场只能自行寻找停车位,工作人员也只有走进停车场才能知晓停车场内部情况,无可避免造成很多麻烦。 停车场智慧监管系统结合数…

期货开户交易操作技巧

期货交易的时候需要有一些操作技巧,以及要注意一些操作上常见的错误。 个人建议刚刚开始交易的投资者期货交易的投资者,一定要多看慢做,首先要摒弃做这个会一夜暴富的想法。抱着个想法来的往往都会折戟沉沙,一去不复返了。所以我…

基于springboot+mybatis+mysql+vue中学生成绩管理系统

基于springbootmybatismysqlvue中学生成绩管理系统一、系统介绍二、功能展示1.登陆2.用户管理(管理员)3.班主任信息管理(管理员)4.教师信息管理(管理员、班主任)5.学生信息管理(管理员)6.成绩信息管理(管理员、班主任、…

一个人,仅30天!开发一款3D竞技足球游戏!他究竟经历了些什么?

今天,晓衡向大家推荐一款Coco Store 优质 3D足球竞技游戏 资源《足球快斗》玩法介绍:游戏为 7V7 足球竞技类玩法。玩家控制本队的一个球员(脚下高亮圆圈显示的是玩家),其他球员和守门员为电脑AI控制,期间可…

Jvm上如何运行其他语言?JSR223规范最详细讲解

一 在Java的平台里,其实是可以执行其他的语言的。包括且不仅限于jvm发展出来的语言。 有的同学可能会说,在java项目里执行其他语言,这不吃饱了撑着么,java体系那么庞大,各种工具一应俱全,放着好好的java不…

责任链模式在复杂数据处理场景中的实战

相信大家在日常的开发中都遇到过复杂数据处理和复杂数据校验的场景,本文从一线开发者的角度,分享了责任链模式在这种复杂数据处理场景下的实战案例,此外,作者在普通责任链模式的基础上进行了升级改造,可以适配更加复杂…

34_DAC原理及数模转换实验

目录 数模转换原理 DAC模块框图 事件选择控制数字模拟转换 DAC转换 DAC数据格式 选择DAC触发 DAC输出电压计算 硬件连接 DAC配置步骤 实验源码 数模转换原理 STM32的DAC模块(数字/模拟转换模块)是12位数字输入,电压输出型的DAC。DAC可以配置为8位或12位模式,也可以与…

linux安装nginx

1.nginx官网 http://nginx.org/en/download.html 下载安装包,如图所示下载nginx-1.23.2,并上传到指定目录:/usr/local/src/nginx 2.解压 tar -zxvf nginx-1.23.2.tar.gz3.安装nginx, cd /usr/local/src/nginx/nginx-1.23.2 该目录…

Titanic 泰坦尼克数据集 特诊工程 机器学习建模

以下内容为讲课时使用到的泰坦尼克数据集分析、建模过程,整体比较完整,分享出来,希望能帮助大家。部分内容由于版本问题,可能无法顺利运行。 Table of Contents 1 经典又有趣的Titanic问题1.1 目标1.2 解决方法1.3 项目目的2…

Vector-常用CAN工具 - CANoe入门到精通_03

NetWork Node 前面已经介绍了CANoe的基本情况、硬件环境搭建、CANoe软件环境配置,今天我们就来聊一下NetWork Node,在我们的测试工作中,大部分情况我们默认CANoe作为一个Client端,但是有些情况,我们需要实时监测被测件…

Akka 学习(四)Remote Actor

目录一 介绍1.1 Remote Actor1.2 适用场景1.3 踩坑点二 实战2.1 需求2.2 Java 版本2.2.1 效果图2.2.2 实体类2.2.3 服务端Actor 处理2.2.4 服务端配置文件2.2.5 客服端Actor处理2.2.6 客服端配置文件2.2.7 测试2.3 Scala 版本2.3.1 效果2.2.3 服务端Actor处理2.3.4 客户端Actor…

使用 Excel 数据透视表深入研究数据分析

问题 1(文章数据在底部) 为美国选民案例研究创建一个数据透视表,并用它来回答以下问题: A) 有多少个州的选民人口百分比低于 55%?哪些州? 答:有5个州的选民人数低于55%,分别是得克萨斯州、阿肯色州、俄克拉荷马州、夏威夷州和西弗吉尼亚州。 步骤:根据以下结果,创建…

基于jsp+java+ssm的社会保险信息管理系统-计算机毕业设计

项目介绍 课题研究的基本内容及预期目标或成果 用户注册与登录功能,在单位注册功能中有申请管理功能,填写具体信息。 系统管理员: 1)个人密码修改:实现了管理员用户密码信息的修改。 2)参保人员管理&a…

ORACE dbca创建报错Oracle system identifier(SID) “orcl“

最近项目需要通过备份恢复oracle实例,必须使用orcl,通过dbca创建实例是提示如下报错: 查看日志,$ORACLE_HOME/cfgtoollogs/dbca/dbcaui.log EVERE: [FATAL] A database instance with Oracle system identifier(SID) "orcl&…

零基础入门推荐系统 - 新闻推荐 - 实操2

内容导航: 零基础入门推荐系统 - 新闻推荐 - 实操2比赛数据分析:用户属性分析:训练集和测试集中分别有多少用户?用户城市分布有什么规律?平均每个用户会点击多少个文章?点击来源与文章点击次数是否存在关联?用户行为分析:零基础入…