Fink CDC数据同步(六)数据入湖Hudi

news2025/1/10 16:38:13

数据入湖Hudi

Apache Hudi(简称:Hudi)使得您能在hadoop兼容的存储之上存储大量数据,同时它还提供两种原语,使得除了经典的批处理之外,还可以在数据湖上进行流处理。这两种原语分别是:

  • Update/Delete记录:Hudi使用细粒度的文件/记录级别索引来支持Update/Delete记录,同时还提供写操作的事务保证。查询会处理最后一个提交的快照,并基于此输出结果。
  • 变更流:Hudi对获取数据变更提供了一流的支持:可以从给定的时间点获取给定表中已updated/inserted/deleted的所有记录的增量流,并解锁新的查询姿势(类别)。

配置

将hudi相关jar包放在flink安装目录的lib下

hudi-flink1.16-bundle-0.13.0.jar

hudi-hadoop-mr-0.13.0.jar

hudi-hive-sync-0.13.0.jar

确保/etc/profile配置了hadoop和hive的环境变量

#HADOOP_HOME
export HADOOP_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_CONF_DIR=/usr/hdp/3.1.5.0-152/hadoop/etc/hadoop
export HADOOP_COMMON_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_HDFS_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_YARN_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_MAPRED_HOME=/usr/hdp/3.1.5.0-152/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`

#HIVE HOME
export HIVE_HOME=/usr/hdp/3.1.5.0-152/hive
export PATH=$PATH:$HIVE_HOME/bin:$HIVE_HOME/sbin

测试插入hudi表

set sql-client.execution.result-mode = tableau;
set execution.checkpointing.interval=30sec;
SET table.sql-dialect=default;

CREATE TABLE hudi_test(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',  -- 连接器指定hudi
  'path' = 'hdfs://bigdata101:8020/hudi/hudi_test',  -- 数据存储地址
  'table.type' = 'MERGE_ON_READ' -- 表类型,默认COPY_ON_WRITE,可选MERGE_ON_READ
);

INSERT INTO hudi_test VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

MySql数据写入Hudi表

建hudi表

create table hudi_user(
    id string not null,
    name string,
    birth string,
    gender string,
    primary key (id) not enforced
)
with (
    'connector' = 'hudi',
    'path' = 'hdfs://bigdata101:8020/hudi/hudi_user',
    'table.type' = 'MERGE_ON_READ',
    'write.option' = 'bulk_insert',
    'write.precombine.field' = 'id'
);

将MySql映射表的数据插入hudi表,此时会生成一个flink任务

insert into ods.hudi_user select * from mysql_user;

流式查询

上面的查询方式是非流式查询,流式查询会生成一个flink作业,并且实时显示数据源变更的数据。

流式查询(Streaming Query)需要设置read.streaming.enabled = true。再设置read.start-commit,如果想消费所有数据,设置值为earliest。

使用参数如下:

参数名称

是否必填

默认值

备注

read.streaming.enabled

FALSE

FALSE

设置为true,开启stream query

read.start-commit

FALSE

the latest commit

Instant time的格式为:’yyyyMMddHHmmss’

read.streaming_skip_compaction

FALSE

FALSE

是否不消费compaction commit,消费compaction commit会出现重复数据

clean.retain_commits

FALSE

10

当开启change log mode,保留的最大commit数量。如果checkpoint interval为5分钟,则保留50分钟的change log

建表:

create table hudi_user_read_streaming(
    id int not null ,
    name string,
    birth string,
    gender string,
    primary key (id) not enforced
)
with (
    'connector' = 'hudi',
    'path' = 'hdfs://bigdata101:8020/hudi/hudi_user',
    'table.type' = 'MERGE_ON_READ',
    'write.option' = 'bulk_insert',
    'write.precombine.field' = 'id',
    'read.streaming.enabled' = 'true',  -- 默认值false,设置为true,开启stream query
    'read.start-commit' = '20231008134557', -- start-commit之前提交的数据不显示,
    'read.streaming.check-interval' = '4'  -- 检查间隔,默认60s

);


insert into hudi_user_read_streaming select * from mysql_user;

select * from hudi_user_read_streaming;

此时,执行select 语句就会生成一个flink 作业

源端变更数据会实时展示出来


 

 系列文章

Fink CDC数据同步(一)环境部署icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017355?spm=1001.2014.3001.5502​​​​​​​
Fink CDC数据同步(二)MySQL数据同步icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017472?spm=1001.2014.3001.5501
Fink CDC数据同步(三)Flink集成Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017571?spm=1001.2014.3001.5501
Fink CDC数据同步(四)Mysql数据同步到Kafkaicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023747?spm=1001.2014.3001.5501
Fink CDC数据同步(五)Kafka数据同步Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023837?spm=1001.2014.3001.5501

Fink CDC数据同步(六)数据入湖Hudiicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023939?spm=1001.2014.3001.5502

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1433767.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

什么?AI居然能拜年!!!

一、AI拜年的意义和背景 AI拜年作为新年祝福的一种形式,是利用人工智能技术为传统的拜年文化注入现代元素。在当今移动互联网时代,AI拜年的出现具有重要意义,它既延续了传统的节日习俗,又利用了先进的科技手段,为人们…

洛谷问题美国血统 American Heritage、新二叉树题解(关于二叉树的遍历问题)

目录 一.美国血统 American Heritage 二.新二叉树 一.美国血统 American Heritage P1827 [USACO3.4] 美国血统 American Heritage - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 题目描述 农夫约翰非常认真地对待他的奶牛们的血统。然而他不是一个真正优秀的记帐员。他把他…

MVC、MVP、MVVM三种模式的介绍及区别(代码例子说明)

目录 一、MVC(Model-View-Controller) 二、MVP(Model-View-Presenter) 三、MVVM(Model-View-ViewModel) 四、几种架构模式的比较 五、例子演示 前言 ”软件系统的架构将系统描述为计算机组件及组件之间…

基于A-Star搜索算法的迷宫小游戏的设计

这篇文章是作者人工智能导论课的大作业,发出来供大家学习参考(有完整代码)。想要论文WORD文件的可以在本文资源处下载(可能还在审核)。 摘要: 本文章聚焦于基于A-Star搜索算法的迷宫小游戏设计,…

蓝桥杯Web应用开发-CSS 基础语法3(文本属性)

CSS 基础语法-文本属性 专栏持续更新中 文本属性 文本属性用于定义文本的样式,通过文本属性,可以改变文本的颜色、字间距、对齐方式、文本修饰和文本缩进等。常用文本属性如下表所示: 属 性可取值描述line-heightnormal、number、length、…

vulhub中Apache APISIX Dashboard API权限绕过导致RCE(CVE-2021-45232)

Apache APISIX是一个动态、实时、高性能API网关,而Apache APISIX Dashboard是一个配套的前端面板。 Apache APISIX Dashboard 2.10.1版本前存在两个API/apisix/admin/migrate/export和/apisix/admin/migrate/import,他们没有经过droplet框架的权限验证&…

【Java报错】显示错误“Error:java: 程序包org.springframework.boot不存在“

使用idea运行项目,显示错误信息如下: 原因是:idea配置的maven加载不到autoconfigure。 解决方案一: 第6步绕过证书语句如下: -Dmaven.wagon.http.ssl.insecuretrue -Dmaven.wagon.http.ssl.allowalltrue 打开终端&am…

C++一维数组

个人主页:PingdiGuo_guo 收录专栏:C干货专栏 铁汁们大家好呀,我是PingdiGuo_guo,今天我们来学习一下数组(一维)。 文章目录 1.数组的概念与思想 2.为什么要使用数组 3.数组的特性 4.数组的操作 1.定义…

浅谈QT的几种线程的使用和区别。

简介: 线程是操作系统中的基本执行单元,是一个独立的执行路径。每个线程都有自己的栈空间,用于存储本地变量和函数调用的上下文。多个线程可以在同一进程中并发执行,从而实现并发处理,提高程序的性能和响应能力。 与进…

【Docker】入门到精通(常用命令解读)

一、准备工作 1.配置Docker的yum库 首先要安装一个yum工具 yum install -y yum-utils安装成功后,执行命令,配置Docker的yum源: yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo2.安装Docker 执…

Go语言深度解析:探索 crypto/md5 标准库的强大功能

Go语言深度解析:探索 crypto/md5 标准库的强大功能 引言Go语言和MD5的基础知识MD5算法简介Go语言概述Go中的MD5实现 crypto/md5 库的使用方法基本用法处理大型数据安全注意事项 实际案例分析示例1:文件的MD5校验示例2:网络数据的MD5哈希示例3…

第9章 安全漏洞、威胁和对策(9.11-9.16)

9.11 专用设备 专用设备王国疆域辽阔,而且仍在不断扩张。 专用设备是指为某一特定目的而设计,供某一特定类型机构使用或执行某一特定功能的任何设备。 它们可被看作DCS、物联网、智能设备、端点设备或边缘计算系统的一个类型。 医疗设备、智能汽车、…

154基于matlab的二维元胞自动机模拟森林火灾(生命游戏 )和模拟收费站交通流

基于matlab的二维元胞自动机模拟森林火灾(生命游戏 )和模拟收费站交通流。全国大学生美国建模竞赛,程序已调通,可直接运行。 154 元细胞自动机 森林起火 收费站交通 (xiaohongshu.com)

大人不华,君子务实|复旦大学-华盛顿大学EMBA C18班优秀学生代表周靖毕业典礼演讲

周靖      Ecarx集团首席财务官      亲爱的Mazzeo院长、殷院长、尊敬的各位老师们、家人们以及各位同学们和毕业生们:      今天,当我们站在开启人生新篇章的分岔路,真是五味杂陈,感慨万千。对我们而言,能…

Day 35 | 贪心 860.柠檬水找零 、 406.根据身高重建队列 、 452. 用最少数量的箭引爆气球

860.柠檬水找零 题目 文章讲解 视频讲解 思路&#xff1a;分别列出三种支付方式对应的找零情况 class Solution {public boolean lemonadeChange(int[] bills) {int five 0, ten 0, twenty 0;for (int i 0; i < bills.length; i) {if (bills[i] 5) {five;} else if…

undefined symbol: avio_protocol_get_class, version LIBAVFORMAT_58

rv1126上进行编译和在虚拟机里面进行交叉编译ffmpeg都不行 解决办法查看 查看安装的ffmpeg链接的文件 ldd ./ffmpeg rootEASY-EAI-NANO:/home/nano/ffmpeg-4.3.6# ldd ffmpeg linux-vdso.so.1 (0xaeebd000)libavdevice.so.58 > /lib/arm-linux-gnueabihf/libavde…

防火墙是什么?聊聊如何轻松缓解应用漏洞

数字经济时代&#xff0c;也是应用爆炸的时代。企业越来越多地使用分布式应用架构构建现代微服务&#xff0c;以适应日益增长的应用使用量并提供更高的性能。与此同时却出现许多热点**&#xff0c;如供应链安全、零日漏洞、数据泄露等。忽视安全防护的企业会面临丢失业务的风险…

npm install express -g报错或一直卡着,亲测可解决

问题描述&#xff1a; 最近学习vue3前端框架&#xff0c;安装Node.js之后&#xff0c;在测试是否可行时&#xff0c;cmd窗口执行了&#xff1a;npm install express -g&#xff0c;发现如下图所示一直卡着不动&#xff0c;最后还报错了&#xff0c;网上找了好久&#xff0c;各…

操作系统基础:文件系统基础【下】

&#x1f308;个人主页&#xff1a;godspeed_lucip &#x1f525; 系列专栏&#xff1a;OS从基础到进阶 ⚔️1 文件的基本操作⚖️1.1 总览⚖️1.2 几种基本操作&#x1f52d;1.2.1 创建文件&#x1f52d;1.2.2 删除文件&#x1f52d;1.2.3 打开文件&#x1f52d;1.2.4 关闭文件…

python算法与数据结构---动态规划

动态规划 记不住过去的人&#xff0c;注定要重蹈覆辙。 定义 对于一个模型为n的问题&#xff0c;将其分解为k个规模较小的子问题&#xff08;阶段&#xff09;&#xff0c;按顺序求解子问题&#xff0c;前一子问题的解&#xff0c;为后一子问题提供有用的信息。在求解任一子…