Flink实时数仓同步:实时表实战详解

news2025/1/16 5:14:12

一、背景

在大数据领域,初始阶段业务数据通常被存储于关系型数据库,如MySQL。然而,为满足日常分析和报表等需求,大数据平台采用多种同步方式,以适应这些业务数据的不同存储需求。这些同步存储方式包括离线仓库和实时仓库等,选择取决于业务需求和数据特性。

一项常见需求是,业务使用人员需要大数据分析平台中实时查看业务表数据,示例如下:

  1. [Mysql] 业务数据 - 用户表全量数据:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom3332023-06-01 13:00:002023-06-01 13:00:00
  1. [Mysql] 2023-06-02 业务数据新增了一名用户,且更改了tom的手机号,此时表数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00

加粗为更新/新增数据

  1. [大数据平台] 2023-06-02日业务人员在大数据平台中查看用户表实时数据,期望数据和Mysql业务数据一致,如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00

根据上述需求,我们可以得出需要构建实时表以满足业务数据的实时分析需求。

二、技术架构

为了实现上述需求,我们可以利用实时同步任务将业务数据实时同步至下游的 MPP(Massively Parallel Processing)库,从而构建实时表。结合市场上常见的技术组件,本文选择了实时引擎 FlinkCDC 和 Doris(MPP)库作为实时同步技术架构。整体架构如下:

在这里插入图片描述

三、实现方式

FlinkCDC 提供了三种实现方式,具体如下:

  1. Flink run jar 模式: 这种模式适用于处理复杂的流数据。当使用简单的 Flink SQL 无法满足复杂业务需求时(例如拉链表等),可以通过编写自定义逻辑的方式,将其打包成 Jar 包并运行。以下是一个示例:
// 示例代码
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    // 配置数据源和处理逻辑
    ...
    
    // 实时任务启动
    env.execute("Print MySQL Snapshot + Binlog");
  }
}

更多信息:MysqlCDC connector

  1. sql脚本模式: bin/sql-client -f file ,这种模式适用于简单的流水任务,例如实时表同步等简单的 ETL 任务。你可以通过编写 SQL 文件并使用 Flink SQL 客户端执行,而无需编写额外的 Java 代码。以下是一个示例:
# 示例 mysql2doris SQL 文件
set 'execution.checkpointing.interval'='30000';

create table mysql_user( 
# ...
) WITH ( 
# ...
);

create table doris_user( 
# ...
) WITH ( 
# ...
);

insert into doris_user select * from mysql_user;

执行如下:

$> bin/sql-client.sh --file /usr/local/flinksql/mysql2doris

更多信息:FlinkSQL 客户端

  1. FlinkCDC Pipeline: 这是 FlinkCDC 3.0 版本引入的全新功能,旨在通过简单的配置即可实现数据同步,无需编写复杂的 Flink SQL。缺点是需要使用 Flink 版本 1.16 或更高版本。以下是一个示例:
# 示例配置文件
source:
   type: mysql
   name: MySQL Source
   hostname: 127.0.0.1
   port: 3306
   username: admin
   password: pass
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: doris
  name: Doris Sink
  fenodes: 127.0.0.1:8030
  username: root
  password: pass

pipeline:
   name: MySQL to Doris Pipeline
   parallelism: 4

执行如下:

$> bin/flink-cdc.sh mysql-to-doris.yaml

更多信息:FlinkCDC Pipeline

这三种方式各有优劣,可以根据具体需求和场景选择合适的实现方式。考虑到前几篇 Flink 实时数仓同步相关博客都采用了 Jar 包形式,为了给读者带来不同的体验,本文采用 sql脚本模式 模式来实现背景需求。

四、sql脚本模式 + 实时表实现

4.1、实时表设计

背景需求需要实时查看业务表数据,因此在Doris中设计表结构时采用了Unique数据模型。建表语句如下:

CREATE TABLE `example_user_real`
(
    `id` INT NOT NULL COMMENT '用户id',
    `name` STRING NULL COMMENT '用户昵称',
    `phone` STRING NULL COMMENT '手机号',
    `gender` CHAR(5) NULL COMMENT '用户性别',
    `create_time` DATETIMEV2(0) NULL COMMENT '用户注册时间',
    `update_time` DATETIMEV2(0) NULL COMMENT '用户更新时间'
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT '用户实时表'
DISTRIBUTED BY HASH(id) BUCKETS AUTO;

关于mysql type 转换 doris type 可参考 Doris 源码内置转换工具

4.2、实时同步逻辑

  1. 首先,由于实时流水表同步使用Flink-cdc读取关系型数据库,flink-cdc提供了四种模式: “initial”,“earliest-offset”,“latest-offset”,“specific-offset” 和 “timestamp”。本文使用的Flink-connector-mysq是2.3版本,这里简单介绍一下这四种模式:

    • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
    • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
    • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
    • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
    • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
  2. 本文采用initial模式同步任务

  3. 编写mysql2doris SQL文件,这里需要注意的是类型转换:由于 mysql2doris 是 Flink SQL 文件,故需要将 mysql type -> flink type 以及 doris type -> flink type,示例如下:

set 'execution.checkpointing.interval'='30000';
set 'state.checkpoints.dir'='file:///home/finloan/flink-1.16.1/checkpoint/mysql2doris';

create table mysql_user(
`id` INT,
`name` STRING,
`phone` STRING,
`gender` CHAR(5),
`create_time` TIMESTAMP(0),
`update_time` TIMESTAMP(0),
PRIMARY KEY(id) NOT ENFORCED
) WITH ( 
'connector'='mysql-cdc',
'hostname'='10.185.163.177',
'port' = '80',
'username'='rouser',
'password'='123456',
'database-name' = 'database',
'table-name'='user'
);

create table doris_user(
`id` INT,
`name` STRING,
`phone` STRING,
`gender` STRING,
`create_time` TIMESTAMP(0),
`update_time` TIMESTAMP(0)
) WITH ( 
'password'='password',
'connector'='doris',
'fenodes'='11.113.208.103:8030',
'table.identifier'='database.user',
'sink.label-prefix'='唯一任务标识,每次启动都要唯一',
'username'='username' 
);

insert into doris_user select * from mysql_user;

类型转换参考:

Doris & Flink Column Type Mapping

Mysql CDC Data Type Mapping

  1. 执行命令如下:此时任务已经提交到flink 集群,本文中使用的是Flink-Cluster 模式而非yarn模式
$> ./sql-client.sh -f  ~/mysql2doris

Flink SQL> [INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 5c683fba8567e65509870a6db4e99fa5
  1. 登录flinkUi界面查看任务,如下所示:

在这里插入图片描述

  1. 此时Doris 数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom3332023-06-01 13:00:002023-06-01 13:00:00
  1. [Mysql]业务数据2023-06-02日新增了一名tony用户,且更改了tom的手机号,此时表数据如下:
idnamephonegendercreate_timeupdate_time备注
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00(手机号从333->444)
4tony5552023-06-02 10:00:002023-06-02 10:00:00(新增tony用户)
  1. 此时Doris 数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00

五、总结

本文介绍了实时数仓同步的实操案例,通过 FlinkCDC 和 Doris 实现了实时表的构建和数据同步。在实现过程中,尤其压迫注意数据类型的转换问题,以确保不同数据存储之间的兼容性。

此外要根据具体需求和场景,选择合适的实现方式,本文选择了 sql-client --f file 模式来实现实时表需求,旨在为读者提供了不同的实践体验。

六、相关资料

  • Doris 数据模型
  • Flink Doris Connector
  • FlinkCDC Pipeline
  • FlinkSQL 客户端
  • Flink Run jar 模式
  • Doris 源码内置转换工具
  • Doris & Flink Column Type Mapping
  • Mysql CDC Data Type Mapping

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1496935.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

鸿蒙NEXT实战开发:【截屏】

展示全屏截图和屏幕局部截图。通过[screenshot]模块实现屏幕截图 ,通过[window]模块实现隐私窗口切换,通过[display]模块查询当前隐私窗口。 效果预览 全屏截图局部截图选择区域局部截图 使用说明: 点击右上角图标打开弹窗,选…

vulhub中ThinkPHP 多语言本地文件包含漏洞复现

ThinkPHP是一个在中国使用较多的PHP框架。在其6.0.13版本及以前,存在一处本地文件包含漏洞。当多语言特性被开启时,攻击者可以使用lang参数来包含任意PHP文件。 虽然只能包含本地PHP文件,但在开启了register_argc_argv且安装了pcel/pear的环…

【C语言】glibc

一、获取源码 apt install glibc-source 在Debian系统中,通过apt install glibc-source命令安装的glibc源码通常会被放置在/usr/src/glibc目录下。安装完成后,可能需要解压缩该源码包。以下是解压缩源码包的步骤: 1. 打开终端。 2. 切换到源…

zipkin Access denied for user ‘xxx‘@‘xxx‘ (using password: NO)

Access denied : 拒绝访问 解决方案: 授权 登录mysql 之后执行命令 step 1 mysql -u username -p // username: 替换成你sql 用户名step2 授权 alter user usernameip identified with mysql_native_password by password // step 3 刷新 flush privileges;

每日一练 | 华为认证真题练习Day194

1、下面是路由器Huawei的部分输出配置,关于该部分配置描迷正确的是: [huawei] bgp 100 [huawei-bgp]peer 12.12.12.2 ip-prefix P1 export [huawei]ip-prefix P1 index 5 deny 10.0.0.0 0 greater-equal 8 less-equal 32 [huawei]ip-prefix P1 index 5 deny 172…

【wine】解决 0024:fixme:msctf:KeystrokeMgr_TestKeyUp STUB:(00A3D508)

故障日志 0024:fixme:msctf:KeystrokeMgr_TestKeyUp STUB:(00A3D508) AI分析 这些消息表示Wine对IE内核组件以及IME(Input Method Editor,输入法编辑器)的支持不完全。特别是涉及文本输入、拖放事件、属性变化通知等功能。 解决 winetrick…

redis最新版本在Windows系统上的安装

一、说明 这次安装操作主要是根据redis官网说明,一步步安装下来的,英语比较好的同学,可以直接看文章底部的超链接1,跳到官网按步操作即可。 目前redis的最新稳定版本为redis7.2。 二、Windows环境改造 Redis在Windows上不被官方…

MySQL基础-----函数

目录 前言 一、字符串函数 演示 案例 二、数值函数 演示 案例 三、日期函数 演示 案例 四、流程函数 演示 案例 前言 本期我们就开始MySQL中函数的学习。函数 是指一段可以直接被另一段程序调用的程序或代码。 也就意味着,这一段程序或代码在MySQL中 已经…

根据身高重建队列 用最少数量的箭引爆气球 无重叠区间

406.根据身高重建队列 力扣题目链接(opens new window) 假设有打乱顺序的一群人站成一个队列,数组 people 表示队列中一些人的属性(不一定按顺序)。每个 people[i] [hi, ki] 表示第 i 个人的身高为 hi ,前面 正好 有 ki 个身高…

【C++庖丁解牛】模版初阶

📙 作者简介 :RO-BERRY 📗 学习方向:致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 📒 日后方向 : 偏向于CPP开发以及大数据方向,欢迎各位关注,谢谢各位的支持 目录 1. 泛型编程2. 函数模…

教师必备的办公AI工具

在当今这个信息爆炸的时代,教师的工作已不仅仅局限于传统的课堂教学。为了更好地管理学生、与家长沟通以及提高工作效率,教师们急需一些高效的办公工具。其中,基于AI技术的办公工具成为了教师们的首选。本文将重点介绍群发成绩小程序这一AI工…

Pygame教程05:帧动画原理+边界值检测,让小球来回上下运动

------------★Pygame系列教程★------------ Pygame教程01:初识pygame游戏模块 Pygame教程02:图片的加载缩放旋转显示操作 Pygame教程03:文本显示字体加载transform方法 Pygame教程04:draw方法绘制矩形、多边形、圆、椭圆、弧…

2024.3.5每日一题

LeetCode 到达目的地的方案数 题目链接:1976. 到达目的地的方案数 - 力扣(LeetCode) 题目描述 你在一个城市里,城市由 n 个路口组成,路口编号为 0 到 n - 1 ,某些路口之间有 双向 道路。输入保证你可以…

TDengine 资深研发分享解决思路,长查询不再成为系统性能瓶颈!

长查询问题指的是在数据库写入和查询并存的日常应用场景中,存在处理数据量大且耗时很长的查询长时间占用系统资源,导致写入可能被阻塞的问题。有时,查询代码对于资源释放函数调用的遗忘也可能以长查询问题的形式表现出来。如何在数据写入不被…

SCCM部署时出现的问题(二):找不到数据库路径

场景还原: 在部署SCCM服务器时,客户采用的是分开部署,即一台SCCM服务器和一台SQL Server服务器。 在搭建SCCM服务器时,我们指定了数据库实例,跳转到指定SQL Server数据文件和事务日志文件的位置时,出现了…

大学机器人专业相关课程太难了怎么办

这个问题其实有个更合适的提问角度。 {大学机器人专业相关课程太难了一点兴趣都没有怎么办} 个性化、差异化发展才是主流。 人工智能时代,学生再卷再拼,也干不过机器人啊…… 这个问题反馈非常普遍。 常规解释 大学课程其实想要理解并应用起来&#xff…

Salesforce 2024财年爆发式增长!第一次现金分红

对于Salesforce来说,这是非凡的转型之年,所有的关键指标都表现强劲,现金流和利润增长创下了纪录。截至第四季度末,Salesforce的剩余履约价值(RPO)总额为569亿美元,同比增长17%。 Marc Benioff …

NLP自然语言——基础

一、介绍 1、概念 NLP(Natural Language Processing,自然语言处理)是计算机科学领域以及人工智能领域的一个重要的研究方向,它研究用计算机来处理、理解以及运用人类语言(如中文、英文等),达到…

Error:java:JDK isn‘t specified for module “模块名称“

可能是创建模块后不小心删掉了.idea.或.idea出错 只要删除.idea,close project出去,重新进让idea自动下载

Flyway 9.22.3 + springboot3 + MySQL8.0+,简单使用

文章目录 flyway的依赖配置ieda 启动!!! 关于这篇文章主要是自己在使用flyway时遇到的一些问题以及最终的解决方法 当然包括所有的配置,主要目的是记录一下防止下次使用的时候忘记 flyway的依赖 这里 springboot 3 具体版本不再描…