Flink实时数仓同步:切片表实战详解

news2025/1/15 6:43:27

一、背景

在大数据领域,初始阶段业务数据通常被存储于关系型数据库,如MySQL。然而,为满足日常分析和报表等需求,大数据平台采用多种同步方式,以适应这些业务数据的不同存储需求。

一项常见需求是,业务使用人员需要大数据分析平台中实时查看业务表中某一维度的相应数据数据,示例如下:

  1. [Mysql] 业务数据 - 假设我们有一个订单表(也称为事实表),记录了公司的销售订单信息。该表包含以下字段:订单ID、客户ID、产品ID、销售日期、销售数量和销售额等。:
订单ID客户ID产品ID销售日期销售数量销售额
1100120012022-01-013150
2100220022022-01-02280
3100320012022-01-03150
4100120032022-01-045250
5100220022022-01-054160
  1. [大数据平台] - 业务人员希望按照客户ID维度聚合销售数量和销售额,以便实时分析每个客户的销售情况,如下:
客户ID销售数量总计销售额总计
10018400
10026240
1003150
  1. [Mysql] 业务数据 - 新增了两条订单数据,如下:
订单ID客户ID产品ID销售日期销售数量销售额
1100120012022-01-013150
2100220022022-01-02280
3100320012022-01-03150
4100120032022-01-045250
5100220022022-01-054160
6100320012022-01-06150
7100420012022-01-06150

加粗为更新/新增数据

  1. [大数据平台] - 此时每个客户的销售情况,如下:
客户ID销售数量总计销售额总计
10018400
10026240
10032100
1004150

加粗为更新/新增数据

根据上述需求,我们可以得出需要构建实时切片表以满足业务数据的实时分析需求。

切片表也叫维度表,是根据基础表(事实表)某个维度或多个维度对事实数据进行汇总计算,并展示为一个交叉分析的表格。与事实表相比,切片表的数据更加聚合,只包含某些维度或者满足某些特定条件的数据。

二、技术架构

为了实现上述需求,我们可以利用实时同步任务将业务数据实时同步至下游的 MPP(Massively Parallel Processing)库,从而构建切片表。结合市场上常见的技术组件,本文选择了实时引擎 FlinkCDC 和 Doris(MPP)库作为实时同步技术架构。整体架构如下:

在这里插入图片描述

三、设计方案

从背景需求不难看出只需实现切片表即可满足需求,但是在flink + Mpp库中却可以有多种方案,可分为三种,具体如下:

3.1、FlinkCDC + FlinkSQL状态计算

该方案利用了FlinkCDC实时捕获业务数据,并在Flink内部进行有状态的计算,例如聚合查询等操作。这种方法依赖于Checkpoint分布式快照,确保精确一次性的处理。最终,计算得到的聚合结果会实时地下沉到下游MPP库中,使业务人员能够直接查询切片表数据。示例如下:

-- flink cdc 读取订单表
create table mysql_order( 
# ...
) WITH ( 
# ...
);

-- flink sql doris 
create table doris_order( 
# ...
) WITH ( 
# ...
);

-- flink sql 
insert into doris_order select 客户ID, sum(销售数量), sum(销售额) from mysql_order group by 客户ID;
  • 优点:
    • 实现了实时捕获和处理业务数据,保证了数据的准确性和实时性。
    • 利用了Flink的状态计算能力,使得处理逻辑更加灵活和高效。
  • 缺点:
    • 下游Doris表结构固定,无法灵活满足用户对不同维度的查询需求。

3.2、FlinkCDC + Doris Aggregate 模型

这种方法利用了Doris Aggregate聚合模型,实现了切片表的功能。在Doris Aggregate聚合模型中,数据会在每批次导入时进行内部聚合,从而无需上游有状态计算。只需将聚合后的数据下沉至Doris数据库即可。

以下是一个示例的Doris建表语句:

-- Doris aggregate 建表语句
CREATE TABLE IF NOT EXISTS example_db.example_order_agg
(
    `客户ID` LARGEINT NOT NULL COMMENT "客户ID",
    `销售数量总计` BIGINT SUM DEFAULT "0" COMMENT "销售数量总计",
    `销售额总计` BIGINT SUM DEFAULT "0" COMMENT "销售额总计"
)
AGGREGATE KEY(`客户ID`)
DISTRIBUTED BY HASH(`客户ID`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

更多信息:Doris Aggregate 模型

  • 优点:
    • 通过FlinkCDC实现了实时捕获和处理业务数据,确保了数据的准确性和实时性。
    • 利用了Doris aggregate模型进行聚合查询,将聚合压力下沉至下游。
  • 缺点:
    • 下游Doris表结构固定,无法灵活满足用户对不同维度的查询需求。

3.3、FlinkCDC + 实时表 + OLAP查询

这种方案充分利用了Doris的OLAP能力,只需建立一个实时表,业务人员便可根据需要自定义查询语句进行查询。

以下是一个示例的实现:

-- flink cdc 读取订单表
create table mysql_order( 
# ...
) WITH ( 
# ...
);

-- flink sql doris 
create table doris_order( 
# ...
) WITH ( 
# ...
);

-- flink sql 实时同步
insert into doris_order select * from mysql_order;

-- 业务人员查询
select 客户ID, sum(销售数量), sum(销售额) from doris_order group by 客户ID;

对于实时表的具体实现,可参考笔者另一篇文章:Flink实时数仓同步:实时表实战详解

  • 优点:
    • 利用 FlinkCDC 实现了实时捕获和处理业务数据,确保了数据的准确性和实时性。
    • 借助 Doris 的 OLAP查询能力,将聚合压力下沉至下游,提高了系统的性能和稳定性。
    • 无需固定 Doris 表结构,可以灵活满足用户对不同维度的查询需求。
  • 缺点:
    • 当数据量巨大时可能存在一定查询延迟问题。
    • 可能存在并发查询效率降低问题,需要合理规划和调整查询策略。

3.4、总结

针对不同的需求场景,我们需要选择最合适的实现方案。通常情况下,对于固定的聚合查询需求,比如定期汇总统计,FlinkCDC + Doris Aggregate 模型FlinkCDC + FlinkSQL状态计算 是更为合适的选择。而对于需要更灵活查询的情况,FlinkCDC + 实时表 则更加适用。

然而,最终的选择取决于具体的业务需求和场景特点。结合以上几种实现设计,笔者更倾向于 FlinkCDC + 实时表 这种方式。我已经在另一篇博客中详细描述了该实现方式:Flink实时数仓同步:实时表实战详解。

故本文将采用FlinkCDC + FlinkSQL有状态计算实现设计,旨在给读者带来不同的体验。

四、实现方式

设计方案确定后我们还需要考虑实现方式,FlinkCDC 提供了三种实现方式,具体如下:

  1. Flink run jar 模式: 这种模式适用于处理复杂的流数据。当使用简单的 Flink SQL 无法满足复杂业务需求时(例如拉链表等),可以通过编写自定义逻辑的方式,将其打包成 Jar 包并运行。以下是一个示例:
// 示例代码
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    // 配置数据源和处理逻辑
    ...
    
    // 实时任务启动
    env.execute("Print MySQL Snapshot + Binlog");
  }
}

更多信息:MysqlCDC connector

  1. sql脚本模式: bin/sql-client -f file ,这种模式适用于简单的流水任务,例如实时表同步等简单的 ETL 任务。你可以通过编写 SQL 文件并使用 Flink SQL 客户端执行,而无需编写额外的 Java 代码。以下是一个示例:
-- 示例 mysql2doris SQL 文件
set 'execution.checkpointing.interval'='30000';

create table mysql_order(
# ...
) WITH ( 
# ...
);

create table doris_order( 
# ...
) WITH ( 
# ...
);

insert into doris_order select 客户ID, sum(销售数量), sum(销售额) from mysql_order group by 客户ID;

执行如下:

$> bin/sql-client.sh --file /usr/local/flinksql/mysql2doris

更多信息:FlinkSQL 客户端

  1. FlinkCDC Pipeline: 这是 FlinkCDC 3.0 版本引入的全新功能,旨在通过简单的配置即可实现数据同步,无需编写复杂的 Flink SQL。缺点是需要使用 Flink 版本 1.16 或更高版本。以下是一个示例:
# 示例配置文件
source:
   type: mysql
   name: MySQL Source
   hostname: 127.0.0.1
   port: 3306
   username: admin
   password: pass
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: doris
  name: Doris Sink
  fenodes: 127.0.0.1:8030
  username: root
  password: pass

pipeline:
   name: MySQL to Doris Pipeline
   parallelism: 4

执行如下:

$> bin/flink-cdc.sh mysql-to-doris.yaml

更多信息:FlinkCDC Pipeline

这三种方式各有优劣,可以根据具体需求和场景选择合适的实现方式。考虑到前几篇 Flink 实时数仓同步相关博客都采用了 Jar 包形式,为了给读者带来不同的体验,本文采用 sql脚本模式 模式来实现背景需求。

五、FlinkCDC + FlinkSQL状态计算实现

5.1、Doris切片表设计

由于FlinkSQL完成聚合计算,因此在Doris中设计表结构时采用了Unique数据模型。建表语句如下:

CREATE TABLE `example_order_slice`
(
    `user_id` INT NOT NULL COMMENT '客户id',
    `sale_count` BIGINT NULL COMMENT '销售数量总计',
    `sale_total` BIGINT NULL COMMENT '销售金额总计'
) ENGINE=OLAP
UNIQUE KEY(`user_id`)
COMMENT '订单切片表'
DISTRIBUTED BY HASH(user_id) BUCKETS AUTO;

关于mysql type 转换 doris type 可参考 Doris 源码内置转换工具

5.2、实时同步逻辑

  1. 首先,由于实时流水表同步使用Flink-cdc读取关系型数据库,flink-cdc提供了四种模式: “initial”,“earliest-offset”,“latest-offset”,“specific-offset” 和 “timestamp”。本文使用的Flink-connector-mysq是2.3版本,这里简单介绍一下这四种模式:

    • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
    • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
    • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
    • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
    • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
  2. 本文采用initial模式同步任务

  3. 编写mysql2doris SQL文件,这里需要注意的是类型转换:由于 mysql2doris 是 Flink SQL 文件,故需要将 mysql type -> flink type 以及 doris type -> flink type,示例如下:

set 'execution.checkpointing.interval'='30000';
set 'state.checkpoints.dir'='file:///home/finloan/flink-1.16.1/checkpoint/mysql2doris';

create table mysql_order(
                            `id` INT,
                            `user_id` INT,
                            `sale_id` INT,
                            `sale_time` TIMESTAMP(0),
                            `sale_quantity` BIGINT,
                            `sales_volume` BIGINT,
                            PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector'='mysql-cdc',
    'hostname'='10.185.163.177',
    'port' = '80',
    'username'='rouser',
    'password'='123456',
    'database-name' = 'database',
    'table-name'='table'
);

create table doris_order(
        `user_id` INT,
        `sale_count` BIGINT,
        `sale_total` BIGINT
) WITH (
    'password'='password',
    'connector'='doris',
    'fenodes'='11.113.208.103:8030',
    'table.identifier'='database.table',
    'sink.label-prefix'='任务唯一标识,每次启动都要更换',
    'username'='username'
);

insert into doris_order select user_id, sum(sale_quantity), sum(sales_volume) from mysql_order group by user_id;

类型转换参考:

Doris & Flink Column Type Mapping

Mysql CDC Data Type Mapping

  1. 执行命令如下:此时任务已经提交到flink 集群,本文中使用的是Flink-Cluster 模式而非yarn模式
$> ./sql-client.sh -f  ~/mysql2doris

Flink SQL> [INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 5c683fba8567e65509870a6db4e99fa5
  1. 登录flinkUi界面查看任务,如下所示:

在这里插入图片描述

  1. 此时Doris 切片表数据如下:
user_idsale_countsale_total
10018400
10026240
1003150
  1. [Mysql]-业务数据新增了两条订单数据,如下:
订单ID客户ID产品ID销售日期销售数量销售额
1100120012022-01-013150
2100220022022-01-02280
3100320012022-01-03150
4100120032022-01-045250
5100220022022-01-054160
6100320012022-01-06150
7100420012022-01-06150
  1. 此时Doris 切片表数据如下:
user_idsale_countsale_total
10018400
10026240
10032100
1004150

六、总结

本文详细介绍了实时数仓同步中切片表的设计与实现。首先,分析了业务背景和需求,说明了切片表的作用和必要性。然后,介绍了基于 FlinkCDC 和 Doris 的技术架构,并比较了不同的设计方案。针对不同的需求场景,提出了三种具体的实现方案:FlinkCDC + FlinkSQL状态计算、FlinkCDC + Doris Aggregate 模型以及 FlinkCDC + 实时表,并分析了它们的优缺点。最后,为了给读者带来不同体验选择了 FlinkCDC + FlinkSQL状态计算 方案进行实现,并详细介绍了实时同步逻辑和相关的技术细节。

通过本文的阅读,读者可以了解到实时数仓同步中切片表的设计与实现方法,以及不同方案的选择和比较。同时,本文还提供了相关资料和参考链接,方便读者进一步深入学习和研究。

七、相关资料

  • Flink实时数仓同步:实时表实战详解
  • Doris Aggregate 模型
  • Flink Doris Connector
  • FlinkCDC Pipeline
  • FlinkSQL 客户端
  • Flink Run jar 模式
  • Doris 源码内置转换工具
  • Doris & Flink Column Type Mapping
  • Mysql CDC Data Type Mapping

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1497137.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

潜水耳机哪个牌子好?认准这几个游泳耳机品牌就对了!

在科技日益发达的今天,人们对于运动设备的需求也在不断提升。作为一项独特的水上运动,潜水爱好者们对耳机的要求也越来越高。一款优秀的潜水耳机不仅能够提供卓越的防水性能和舒适度,还必须具备出色的音质。那么,在众多品牌中&…

C语言进阶—表达式求值

隐式类型转换&#xff1a; C 的整型算术运算总是至少以缺省(默认)整型类型的精度来进行的。 为了获得这个精度&#xff0c;表达式中的字符和短整型操作数在使用之前被转换为普通整型&#xff0c;这种转换称为整型提升。 #include <stdio.h>int main() {char c 1;printf(…

鸿蒙Harmony应用开发—ArkTS声明式开发(自定义手势判定)

为组件提供自定义手势判定能力。开发者可根据需要&#xff0c;在手势识别期间&#xff0c;决定是否响应手势。 说明&#xff1a; 从API Version 11开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 onGestureJudgeBegin onGestureJudgeBegi…

C#与python交互(flask发送Get/Post请求)

先运行python&#xff0c;再运行C# **ps: 注意修改端口号**python发送Get/Post请求 # -*- coding: utf-8 -*- # Time : 2024/1/25 15:52 # Author : YY # File : post_test.py # Content&#xff1a;提交数据给客户端 from flask import Flask, request, jsonify, redirect…

Vue系列-环境快速搭建

vue环境快速搭建 演示视频 快速搭建Vue开发环境pnpm和yarn 1. 基本信息 作者: GMCY系列: Vue仓库: GitHub | Gitee话题(GitHub): tools \ vue创建时间: 2024/03/02 2. 介绍 功能 批处理文件vue 环境的快速搭建nodejs, npm, pnpm, yarn 自动 下载安装npm, pnpm, yarn 自动 …

计网:HTTPS协议详解

1、HTTP 与 HTTPS 有哪些区别&#xff1f;​​​ HTTP以明文方式传输数据&#xff0c;不提供任何加密。如果攻击者截取了传输报文&#xff0c;就可以直接读取其中的信息。HTTPS利用SSL/TLS加密数据包&#xff0c;报文以密文方式传输。 HTTP 连接建立相对简单&#xff0c; TCP …

【办公类-22-08】周计划系列(3-3)“信息窗+主题知识(上传+打印)” (2024年调整版本)

作品展示 背景需求&#xff1a; 前文将信息窗主题知识的内容提取并优化结构 【办公类-22-07】周计划系列&#xff08;3-1&#xff09;“信息窗主题知识&#xff08;提取&#xff09;” &#xff08;2024年调整版本&#xff09;-CSDN博客文章浏览阅读803次&#xff0c;点赞7次…

新一代WLAN解决方案与WLAN的配置实现

案例背景为二层旁挂式组网&#xff0c;转发方式为直接转发&#xff0c;管理Vlan为100&#xff0c;业务Vlan为101。 基本配置&#xff1a; SW1&#xff1a; [SW1]VLAN batch 100 101 [SW1-GigabitEthernet0/0/1]port link-type trunk [SW1-GigabitEthernet0/0/1]port trunk a…

SQL 初级

SQL 初级 SQL 简介 SQL (Structured Query Language:结构化查询语言) 是用于管理关系数据库管理系统&#xff08;RDBMS&#xff09;。 SQL 的范围包括数据插入、查询、更新和删除&#xff0c;数据库模式创建和修改&#xff0c;以及数据访问控制。 SQL 是什么&#xff1f; SQL…

【Linux C | 网络编程】多播的概念、多播地址、UDP实现多播的C语言例子

&#x1f601;博客主页&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客内容&#x1f911;&#xff1a;&#x1f36d;嵌入式开发、Linux、C语言、C、数据结构、音视频&#x1f36d; &#x1f923;本文内容&#x1f923;&a…

提高生产效率!虹科MSR165快速检测机器故障,实现精准优化

如今&#xff0c;各种生产机器、机床和运输机都采用了复杂的驱动技术&#xff0c;以便在三个轴上准确生成线性运动或者高效旋转运动。所有机械运动中都会出现特征性的振动或震动模式&#xff0c;可以在该背景下利用这些模式来监测和优化整个驱动技术的机电参数。在这个过程中&a…

太阳能光伏电池的simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 4.1 光伏电池的基本结构 4.2 光伏电池的工作原理 5.完整工程文件 1.课题概述 太阳能光伏电池的simulink建模与仿真.分析不同光照温度&#xff0c;光照强度下的光伏电池的U-I特性曲线以及P-V特性曲线。 …

备战蓝桥(模板篇)

扩展欧德里几算法 质数筛 分解质因数 LCA BFS floyd Dijkstra prime 日期是否合法 Tire异或 模拟散列表 字符哈希 Tire字符串统计

官方教程 | 在 OpenBayes 平台进行组织协作

想和好 homie 共享账户余额、存储、数据集、模型、容器等资源&#xff0c;又不想共享自己的账户密码&#xff1f; 想跟团队成员分工协作、高效 Coding、加速炼丹&#xff0c;又想隔离权限、差异化管理&#xff1f; 经过为期半年的内测和完善&#xff0c;OpenBayes贝式计算的组织…

平面纯弯梁单元Matlab有限元编程 |欧拉梁单元| 简支梁|悬臂梁|弯矩图 |变形图| Matlab源码 | 视频教程

专栏导读 作者简介&#xff1a;工学博士&#xff0c;高级工程师&#xff0c;专注于工业软件算法研究本文已收录于专栏&#xff1a;《有限元编程从入门到精通》本专栏旨在提供 1.以案例的形式讲解各类有限元问题的程序实现&#xff0c;并提供所有案例完整源码&#xff1b;2.单元…

Ajax、Axios、Vue、Element与其案例

目录 一.Ajax 二.Axios 三.Vue 四.Element 五.增删改查案例 一.依赖&#xff1a;数据库&#xff0c;mybatis&#xff0c;servlet&#xff0c;json-对象转换器 二.资源&#xff1a;elementvueaxios 三.pojo 四.mapper.xml与mapper接口 五.service 六.servlet 七.html页…

目前研一,是选 FPGA 还是 Linux 嵌入式?

目前研一&#xff0c;是选 FPGA 还是 Linux 嵌入式? 在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「Linux 的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&a…

Sora:AI视频生成的新机遇与挑战

随着科技的飞速进步&#xff0c;人工智能&#xff08;AI&#xff09;和机器学习&#xff08;ML&#xff09;技术已经深入渗透到社会的各个领域。其中&#xff0c;Sora这类基于AI的视频生成工具因其高度逼真的生成能力而备受瞩目。然而&#xff0c;正如一枚硬币有两面&#xff0…

力扣刷题Days11第二题--141. 环形链表(js)

目录 1,题目 2&#xff0c;代码 2.1快慢指针 2.2&#xff0c;哈希表 3&#xff0c;学习与总结 3.1自己尝试写快慢指针 反思 1,题目 给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&…

蓝牙APP开发实现汽车遥控钥匙解锁汽车智能时代

在现代社会&#xff0c;随着科技的不断发展&#xff0c;汽车已经不再是简单的交通工具&#xff0c;而是与智能科技紧密相连的载体。其中&#xff0c;通过开发APP蓝牙程序实现汽车遥控钥匙成为了一种趋势&#xff0c;为车主带来了便捷与安全的体验。虎克技术公司作为行业领先者&…