Hudi系列11:Flink CDC 将MySQL的数据写入Hudi

news2024/11/20 18:42:41

文章目录

  • 一. 下载依赖包
  • 二. 源端数据准备
  • 三. 使用Flink cdc mysql连接器创建flinkSQL映射表
  • 四. 创建FlinkSQL Hudi连接器创建hudi表
  • 五. 将数据从CDC表插入hudi表
  • 六. 测试增删改
  • 参考:

一. 下载依赖包

将 flink-sql-connector-mysql-cdc-2.2.1.jar 下载后,上传到$FLINK_HOME/lib目录

二. 源端数据准备

use test;
DROP TABLE IF EXISTS mysql_cdc;

create table mysql_cdc
(
id          int NOT NULL AUTO_INCREMENT ,
name    varchar(100),
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='mysql cdc 表';

insert into mysql_cdc(id, name) values (1,'test1');
insert into mysql_cdc(id, name) values (2,'test2');
insert into mysql_cdc(id, name) values (3,'test3');
insert into mysql_cdc(id, name) values (4,'test4');
insert into mysql_cdc(id, name) values (5,'test5');
insert into mysql_cdc(id, name) values (6,'test6');
insert into mysql_cdc(id, name) values (7,'test7');
insert into mysql_cdc(id, name) values (8,'test8');
insert into mysql_cdc(id, name) values (9,'test9');
insert into mysql_cdc(id, name) values (10,'test10');
insert into mysql_cdc(id, name) values (11,'test11');
insert into mysql_cdc(id, name) values (12,'test12');
insert into mysql_cdc(id, name) values (13,'test13');
insert into mysql_cdc(id, name) values (14,'test14');
insert into mysql_cdc(id, name) values (15,'test15');
insert into mysql_cdc(id, name) values (16,'test16');
insert into mysql_cdc(id, name) values (17,'test17');
insert into mysql_cdc(id, name) values (18,'test18');
insert into mysql_cdc(id, name) values (19,'test19');

三. 使用Flink cdc mysql连接器创建flinkSQL映射表

代码:
这个地方的server-id我给了一个动态的端口,之前给静态的端口,总是报错:

io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connect
set execution.checkpointing.interval=10sec;

CREATE TABLE flink_mysql_cdc5 (
    id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
    name varchar(100)
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'hp8',
    'port' = '3306',
    'username' = 'root',
    'password' = 'abc123',
    'database-name' = 'test',
    'table-name' = 'mysql_cdc',
    'server-id' = '5400-5408',
    'scan.incremental.snapshot.enabled'='true'
);

set sql-client.execution.result-mode=tableau;

select * from flink_mysql_cdc5;

测试记录:
image.png

四. 创建FlinkSQL Hudi连接器创建hudi表

代码:

CREATE TABLE flink_hudi_mysql_cdc5(
    id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
    name varchar(100)
  ) WITH (
   'connector' = 'hudi',
   'path' = 'hdfs://hp5:8020/tmp/hudi/flink_hudi_mysql_cdc5',
   'table.type' = 'MERGE_ON_READ',
   'changelog.enabled' = 'true',
   'hoodie.datasource.write.recordkey.field' = 'id',
   'write.precombine.field' = 'name',
   'compaction.async.enabled' = 'false'
);

五. 将数据从CDC表插入hudi表

insert into flink_hudi_mysql_cdc5 select * from flink_mysql_cdc5;

select * from flink_hudi_mysql_cdc5 ;

HDFS上也有数据:

六. 测试增删改

insert into mysql_cdc(id, name) values (20,'test20');
delete from mysql_cdc where id = 1;
update mysql_cdc set name ='test2-updated' where id = 2;
update mysql_cdc set name ='test3-updated' where id = 3;
delete from mysql_cdc where id = 4;

image.png

参考:

  1. https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html
  2. https://www.pudn.com/news/6228ca059ddf223e1ad0b87f.html
  3. https://zhuanlan.zhihu.com/p/479832928

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/191204.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

动手深度学习-欠拟合和过拟合

目录训练误差和泛化误差K-折交叉验证欠拟合和过拟合模型复杂性数据集大小权重衰减权重衰减简洁实现暂退法(Dropout)从零开始实现Dropout简洁实现参考教程:https://courses.d2l.ai/zh-v2/ 训练误差和泛化误差 训练误差(training …

5000字带你了解机房搬迁有哪些步骤?干货收藏!

机房搬迁不仅仅是把机房的设备迁移到新机房那么简单,而是要求网络系统的迁移和集中存储系统的迁移必须安全平稳,不能过长时间影响生产应用。表面上就是几个IT 民工的搬运,但实际是一项目高度集中的体力与脑力的综合项目。现将一般机房搬迁步骤…

基于纳什谈判理论的风–光–氢多主体能源系统合作运行方法(Matlab代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

Go语言进阶和依赖管理(二)——并发和依赖管理

文章目录一、本文重点内容:二、详细知识点介绍:1、并发和并行并发:并行:结论:2、Go的协程协程:线程:3、协程通信方式一:使用通道交换数据方式二:使用共享内存完成数据交换…

SpringBoot 与 SpringCloud 有什么区别?

🏆今日学习目标: 🍀SpringBoot 与 SpringCloud 有什么区别? ✅创作者:林在闪闪发光 ⏰预计时间:30分钟 🎉个人主页:林在闪闪发光的个人主页 🍁林在闪闪发光的个人社区&am…

深拷贝,浅拷贝,引用拷贝有什么区别?

目录 引用拷贝 浅拷贝 深拷贝 深拷贝,浅拷贝,引用拷贝有什么区别? 引用拷贝 当我们向复制一个对象的时候,自然想到的就是赋值,直接赋值给另外一个变量,这种做法只是赋值了对象的地址,即两个变量现在指向的是同一个对象,任意一个对象操作这个属性都会影响到另外一个变量,这…

成为一名网络安全工程师,你应该学习什么?

前言 这是我的建议如何成为网络安全工程师,你应该按照下面顺序学习。 简要说明 第一件事你应该学习如何编程,我建议首先学python,然后是java。 (非必须)接下来学习一些算法和数据结构是很有帮助的,它将帮…

jQuery select三级联动

功能描述: 1 实现三级联动,ajax请求数据。 根据选定级别,查询该级别下的项目类别;根据选择类别,查询该级别类别下所属项目列表; 前端涉及知识点: (1)(‘#app’).change…

【云原生kubernetes】k8s控制器Deployment使用详解

前言 在上一篇我们聊了k8s中各种控制器的使用,本篇将以控制器中比较常用的一种控制器Deployment 进行详细的说明。 一、Deployment 简介 为了更好解决服务编排的问题,kubernetes在V1.2版本开始,引入了Deployment控制器; 需要说明…

47 转置卷积【动手学深度学习v2】】

47 转置卷积【动手学深度学习v2】】 深度学习学习笔记 学习视频:https://www.bilibili.com/video/BV17o4y1X7Jn/?spm_id_from333.1007.top_right_bar_window_history.content.click&vd_source75dce036dc8244310435eaf03de4e330 转置卷积 卷积不会增大输入的高…

【Python小游戏】99%的人都不知道,“猜数字”游戏这么玩才能快速胜出,少年,要不要来猜猜看啊~(附源码)

前言 日子从不亏欠,每一个努力向上的人, 未来的走运, 都是过往尽力的积累。 人勤春来早,奋进正当时。新春伊始,我们迎来了2023年开工第一天。 栗子同学恭祝大家开工大吉,新年新气象,万事开门红&#xff…

Spring事务案例:模拟银行转账

Spring事务案例:模拟银行转账一. 概念二. 原程序2.1 表:2.2 service层接口:2.3 dao层接口:2.4 service实现类:2.5 测试用例:三.使用事务改进3.1 开启注解式事务驱动:3.2 开启事务:3.…

python使用pptx库-从一个ppt复制页面到另一个ppt里面

python使用pptx库-从一个ppt复制页面到另一个ppt里面 作者:虚坏叔叔 博客:https://xuhss.com 早餐店不会开到晚上,想吃的人早就来了!😄 一、原理 如题,我有一个模板课件.pptx: 其内容&#xf…

百趣代谢组学文献分享:大麦盐胁迫响应机制的组学分析

前言 百趣代谢组学文献分享,我国受盐碱化危害耕地面积超过1.4亿亩,严重危险粮食安全和三农问题的解决。因此开发耐盐农作物并研究其耐盐机制具有迫在眉睫的重要意义。 代谢组学文献分享,浙江大学吴德志教授研究组最近发表的研究成果比较了耐…

Jmeter之界面语言设置

一、临时性设置中文 临时性设置:设置后只对本次使用有效,重启Jmeter后恢复默认语言。 选择Options—>Choose Language—>选择其他语言(例如:Chinese(Simplified)简体中文)设置成功。重启…

32 基变换和图像压缩

一、知识概要 本节主题是线性变换与矩阵的关联,从图像压缩与信号处理的应用引入,介绍几种方便的基向量:傅里叶,小波。最后从代数角度大体上介绍了基变换与变换矩阵的关系。 二、图像处理 首先我们假设有一个 512 * 512 的黑白图…

StarRocks斩获「2022 掘金引力榜」年度技术品牌传播案例 Top 10!

近日,由稀土掘金技术社区打造的「掘金引力榜」正式公布,由StarRocks社区举办的StarRocks Summit Asia 2022荣获「掘金引力榜 2022 年度技术品牌传播案例 Top10」!掘金是面向全球中文开发者的技术社区。「掘金引力榜」是由稀土掘金技术社区打造…

【MyBatis持久层框架】配置文件实现增删改查实战案例(下)

前言 前面我们学习了 MyBatis 持久层框架的原生开发方式和 Mapper 代理开发两种方式,解决了使用 JDBC 基础性代码操作数据库时存在的硬编码和操作繁琐的问题。 在配置文件实现增删改查上篇中,我们详细讲解了常用的查询操作,例如查询所有数据…

Spring Boot 项目 - API 文档搜索引擎

在线体验 : http://43.139.1.94:9090/index.html项目 Gitee 链接 : API 文档搜索引擎1.认识搜索引擎我们平时查百度, 搜狗的时候, 结果页会显示若干条相关结果 , 每个结果几乎都包含图片, 标题, 描述, 展示 URL以及时间等等.1.1 搜索引擎的本质输入一个查询词, 得到若干个结果,…

stm32学习笔记-1 STM32简介

1 STM32简介 [toc] 注:笔记主要参考 江科大自化协 教学视频“STM32入门教程-2023持续更新中”。 注:工程及代码文件放在了本人的Github仓库。 1.1 套件简介 本教程使用STM32最小系统板(STM32F103C8T6)面包板硬件平台进行学习。…