文章目录
基于Flink+Hologres搭建实时数仓
一、使用示例
二、方案架构
1、架构优势
2、Hologres核心优势
三、实践场景
四、项目准备
1、创建阿里云账号AccessKey
2、准备MySQL数据源
五、构建实时数仓编辑
1、管理元数据
2、构建ODS层
2.1、创建CDAS同步作业ODS
2.2、查看MySQL同步到Hologres的3张表数据
3、构建DWD层
3.1、创建DWD层宽表
3.2、实现实时消费ODS层orders、orders_pay表的binlog,写入DWD层
3.3、查看宽表dwd_orders数据
4、构建DWS层
4.1、创建DWS层聚合表
4.2、数据写入DWS层表
4.3、查看DWS层数据
5、数据探查
5.1、流模式探查
5.2、批模式探查
5.3、应用1:Key-Value服务
5.4、应用2:明细查询
5.5、应用3:实时报表
基于Flink+Hologres搭建实时数仓
一、使用示例
随着社会数字化发展,企业对数据时效性的需求越来越强烈。除传统的面向海量数据加工场景设计的离线场景外,大量业务需要解决面向实时加工、实时存储、实时分析的实时场景问题。
如何搭建实时数仓?
二、方案架构
实时计算Flink版是强大的流式计算引擎,支持对海量实时数据高效处理。Hologres是一站式实时数仓,支持数据实时写入与更新,实时数据写入即可查。Hologres与Flink深度集成,能够提供一体化的实时数仓联合解决方案。
基于 Flink+Hologres 的 Streaming Warehouse 方案
1、架构优势
- 支持高效更新、 修正与查询
Hologres的每一层数据都支持高效更新与修正、写入即可查,解决了传统实时数仓解决方案的中间层数据不易查、不易更新、不易修正的问题。
- 支持高效复用
Hologres的每一层数据都可单独对外提供服务,数据的高效复用,真正实现数仓分层复用的目标。
- 架构简单
模型统一,架构简化。实时ETL链路的逻辑是基于Flink SQL实现的;ODS层、DWD层和DWS层的数据统一存储在Hologres中,可以降低架构复杂度,提高数据处理效率。
2、Hologres核心优势
三、实践场景
四、项目准备
1、创建阿里云账号AccessKey
- 使用阿里云账号登录控制台
- 将鼠标悬浮在右上方的账号图标上,单击 AccessKey 管理
- 在安全提示对话框,阅读安全提示信息,然后单击继续使用
- AccessKey 在 AccessKey 页面,单击创建 AccessKey
- 根据界面提示完成安全验证
- 在创建AccessKey对话框,查看 AccessKey ID 和 AccessKey Secret。可以单击下载 CSV 文件,下载 AccessKey 信息。单击复制,复制AccessKey 信息。
注意:一定要保存好AccessKey ID和AccessKey Secret
- 选中我已保存好AccessKey Secret
- 单击确定
2、准备MySQL数据源
通过DMS登录RDS MySQL。 在已登录的SQLConsole窗口,输入如下命令后单击执行。创建order_dw数据库。
create database order_dw;
创建后,点击左侧实例中的order_dw数据库,在order_dw的SQL窗口输入建表和插入数据的代码,详细代码见代码文档 点击执行后生成对应的表和数据,如下:
用型Hologres实例。开通实例后,需要在Hologres开发平台创建order_dw数据库: 进入Hologres管理控制台,单击左侧实例列表。
在实例列表页面,单击实例名称,进入实例详情页。
在实例详情页左侧导航栏,单击数据库管理。
在DB授权页面,单击右上角新增数据库。
在新增数据库对话框,选择实例名并填写数据库名称,简单权限策略选择SPM。
五、构建实时数仓
1、管理元数据
创建 Hologres Catalog:
- 在Flink开发平台,点击左侧SQL开发,点击作业草稿右侧的加号,新建作业草稿。
- 模板选择空白的流作业,作业名称为test,引擎版本选择 vvr-6.0.7-flink-1.15
创建作业后,将如下代码拷贝到test作业的SQL编辑器上,修改目标参数取值后,选中代码片段后单击左侧代码行上的运行。
CREATE CATALOG dw WITH
('type' = 'hologres',
'endpoint' = '<ENDPOINT>',
'username' = '<USERNAME>',
'password' = '<PASSWORD>',
'dbname' = 'order_dw',
'binlog' = 'true', -- 创建catalog时可以设置源表、维表和结果表支持的with参数,之后在使用此catalog下的表时会默认添加这些默认参数。
'sdkMode' = 'jdbc', -- 推荐使用jdbc模式。
'cdcmode' = 'true',
'connectionpoolname' = 'the_conn_pool',
'ignoredelete' = 'true', -- 宽表merge需要开启,防止回撤。
'partial-insert.enabled' = 'true', -- 宽表merge需要开启此参数,实现部分列更新。
'mutateType' = 'insertOrUpdate', -- 宽表merge需要开启此参数,实现部分列更新。
'table_property.binlog.level' = 'replica', -- 也可以在创建catalog时传入持久化的hologres表属性,之后创建表时,默认都开启binlog。
'table_property.binlog.ttl' = '259200'
);
需要修改以下参数取值为我们实际Hologres服务信息。
其中username和password是前面创建的阿里云账号的AccessKey ID和AccessKey Secret;endpoint是hologres实例的指定vpc地址;在hologres实例详情中的网络信息下可以看到。
说明:
创建Catalog时可以设置默认的源表、维表和结果表的WITH参数,也可以设置创建Hologres物理表的默认属性,例如上方table_property开头的参数。
验证1:创建成功后,可以在元数据栏看到对应的catalog 及其信息,以及hologres中的数据库。
创建MySQL Catalog:
- 将如下代码拷贝到test作业的SQL编辑器上,修改目标参数取值后,选中代码片段后单击左侧代码行上的运行。
CREATE CATALOG mysqlcatalog WITH(
'type' = 'mysql',
'hostname' = '<hostname>',
'port' = '3306',
'username' = '<username>',
'password' = '<password>',
'default-database' = 'order_dw'
);
- 需要修改以下参数取值为我们实际MySQL服务信息。
- 其中用户名和密码是自己创建的MySQL 高权限账号。MySQL ip地址可以在RDS 实例详情中数据库连接看到。
验证2:创建完成后,同样可以在元数据栏看到此catalog的信息以及mysql的数据库
2、构建ODS层
2.1、创建CDAS同步作业ODS
a) 在Flink开发平台,新建名为ODS的SQL流作业(步骤与test作业相同,引擎一致),并将如下代码拷贝到SQL编辑器。
CREATE DATABASE IF NOT EXISTS dw.order_dw -- 创建catalog 时设置了table_property.binlog.level参数,因此通过CDAS创建的所有表都开启了binlog。
AS DATABASE mysqlcatalog.order_dw INCLUDING all tables -- 可以根据需要选择上游数据库需要入仓的表。
/*+ OPTIONS('server-id'='8001-8004') */ ; -- 指定mysql-cdc源表。
b) 单击右上方的部署,进行作业部署。
基于Catalog的CREATE DATABASE AS(CDAS)语句功能,详细可见文档,可以一次性把ODS层建出来。 ODS层一般不直接做OLAP,主要作为流式作业的事件驱动,开启binlog即可满足需求。
c) 单击左侧导航栏的作业运维,单击刚刚部署的ODS作业操作列的启动,选择无状态启动启动作业
点击启动,稍等一会,作业状态变成运行中
2.2、查看MySQL同步到Hologres的3张表数据
- 点击上方SQL编辑器,数据库选择order_dw,在SQL编辑器上执行如下命令
---查orders中的数据。
SELECT * FROM orders;
---查orders_pay中的数据。
SELECT * FROM orders_pay;
---查product_catalog中的数据。
SELECT * FROM product_catalog;
- 点击运行后,可以看到三张表中的结果。可以与mysql中数据进行比较
3、构建DWD层
3.1、创建DWD层宽表
- 通过Flink Catalog功能在Hologres中建DWD层的宽表dwd_orders。
- 在Flink开发平台,将如下代码拷贝到test作业的SQL编辑器后,选中目标片段后单击左侧代码行上的运行。
-- 宽表字段要nullable,因为不同的流写入到同一张结果表,每一列都可能出现null的情况。
CREATE TABLE dw.order_dw.dwd_orders (
order_id bigint not null,
order_user_id string,
order_shop_id bigint,
order_product_id bigint,
order_product_catalog_name string,
order_fee numeric(20,2),
order_create_time timestamp,
order_update_time timestamp,
order_state int,
pay_id bigint,
pay_platform int comment 'platform 0: phone, 1: pc',
pay_create_time timestamp,
PRIMARY KEY(order_id) NOT ENFORCED
);
-- 支持通过catalog修改Hologres物理表属性。
ALTER TABLE dw.order_dw.dwd_orders SET (
'table_property.binlog.ttl' = '604800' --修改binlog的超时时间为一周。
);
3.2、实现实时消费ODS层orders、orders_pay表的binlog,写入DWD层
在Flink开发平台,新建名为DWD的SQL流作业,并将如下代码拷贝到SQL编辑器后,部署并启动作业。通过如下SQL作业,orders表会与product_catalog表进行维表关联,将最终结果写入dwd_orders表中,实现数据的实时打宽。
BEGIN STATEMENT SET;
INSERT INTO dw.order_dw.dwd_orders (
order_id,
order_user_id,
order_shop_id,
order_product_id,
order_fee,
order_create_time,
order_update_time,
order_state,
order_product_catalog_name
)
SELECT o.*, dim.catalog_name
FROM dw.order_dw.orders as o
LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
ON o.product_id = dim.product_id;
INSERT INTO dw.order_dw.dwd_orders (pay_id, order_id, pay_platform, pay_create_time)
SELECT * FROM dw.order_dw.orders_pay;
END;
3.3、查看宽表dwd_orders数据
在HoloWeb开发页面连接Hologres实例并登录目标数据库(order_dw)后,在SQL编辑器上执行如下命令
4、构建DWS层
4.1、创建DWS层聚合表
这里通过Flink Catalog功能,在Hologres中创建dws层的聚合dws_users以及dws_shops:
在Flink开发平台,将如下代码拷贝到test作业的SQL编辑器,选中目标片段后单击左侧代码行上的运行
-- 用户维度聚合指标表。
CREATE TABLE dw.order_dw.dws_users (
user_id string not null,
ds string not null,
paied_buy_fee_sum numeric(20,2) not null, -- '当日完成支付的总金额'
primary key(user_id,ds) NOT ENFORCED
);
-- 商户维度聚合指标表。
CREATE TABLE dw.order_dw.dws_shops (
shop_id bigint not null,
ds string not null,
paied_buy_fee_sum numeric(20,2) not null, -- '当日完成支付总金额'
primary key(shop_id,ds) NOT ENFORCED
);
4.2、数据写入DWS层表
这里实时消费DWD层的宽表dw.order_dw.dwd_orders,在Flink中做聚合计算,最终写入Hologres中的DWS表:
在Flink开发平台,新建名为DWS的SQL流作业,并将如下代码拷贝到SQL编辑器后,部署并启动作业。
BEGIN STATEMENT SET;
INSERT INTO dw.order_dw.dws_users
SELECT
order_user_id,
DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
SUM (order_fee) --order_fee订单费用,来自于mysql的buy_fee
FROM dw.order_dw.dwd_orders c
WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 订单流和支付流数据都已写入宽表。
GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
INSERT INTO dw.order_dw.dws_shops
SELECT
order_shop_id,
DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
SUM (order_fee)
FROM dw.order_dw.dwd_orders c
WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 订单流和支付流数据都已写入宽表。
GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
END;
4.3、查看DWS层数据
查看DWS层的聚合结果,其结果会根据上游数据的变更实时更新:
在HoloWeb开发页面连接Hologres实例并登录目标数据库后,在SQL编辑器上执行如下命令
- 查询dws_users表结果
SELECT * FROM dws_users;
- 查询dws_shops表结果
SELECT * FROM dws_shops;
5、数据探查
5.1、流模式探查
- 新建并启动数据探查流作业。
新建名为Data-exploration的SQL流作业,并将如下代码拷贝到SQL编辑器后,部署并启动作业。
-- 流模式探查,打印到print可以看到数据的变化情况。
CREATE TEMPORARY TABLE print_sink(
order_id bigint not null,
order_user_id string,
order_shop_id bigint,
order_product_id bigint,
order_product_catalog_name string,
order_fee numeric(20,2),
order_create_time timestamp,
order_update_time timestamp,
order_state int,
pay_id bigint,
pay_platform int,
pay_create_time timestamp,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'print'
);
INSERT INTO print_sink SELECT *
FROM dw.order_dw.dwd_orders /*+ OPTIONS('startTime'='2023-02-15 12:00:00') */ --这里的startTime是binlog生成的时间
WHERE order_user_id = 'user_001';
- 查看数据探查结果
在作业运维详情页面,单击目标作业名称,在作业探查页签下左侧运行日志页签,单击运行Task Managers页签下的Path, ID。在Stdout页面搜索(按ctrl+f)user_001相关的日志信息。
5.2、批模式探查
- 接下来要用到调试功能,所以需要创建Session集群。
点击左侧Session管理,点击创建Session集群。
名称自定义,状态选择RUNNING,引擎版本选择vvr-6.0.7-flink-1.15,Task Managers数量为2,其余参数默认即可。
- 创建完成后,Session集群会自动启动。等待一会,状态变成运行中。
- 开始批模式探查
将如下代码拷贝到test作业中,选中这段代码,单击调试。选择刚刚创建的Session集群。
SELECT *
FROM dw.order_dw.dwd_orders /*+ OPTIONS('binlog'='false') */
WHERE order_user_id = 'user_001' and order_create_time > '2023-02-15 12:00:00’;
--批量模式支持filter下推,提升批作业执行效率。
批模式探查是获取当前时刻的终态数据,在Flink作业开发界面调试结果如图所示:
5.3、应用1:Key-Value服务
根据主键查询DWS层的聚合指标表,支持百万级RPS
在HoloWeb开发页面order_dw库下查询指定用户指定日期的消费额的代码示例如下
SELECT * FROM dws_users WHERE user_id ='user_001' AND ds = '20230215';
5.4、应用2:明细查询
对DWD层宽表进行OLAP分析
在HoloWeb开发页面查询某个客户23年2月特定支付平台支付的订单明细的代码示例如下
SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00'
AND order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'AND pay_platform = 0
ORDER BY order_create_time LIMIT 100;
5.5、应用3:实时报表
基于DWD层宽表数据展示实时报表,支持秒级响应
在HoloWeb开发页面order_dw数据库下查询23年2月内每个品类的订单总量和订单总金额的代码示例如下
SELECT
TO_CHAR(order_create_time, 'YYYYMMDD') AS order_create_date, --订单创建时间
order_product_catalog_name, --订单类别名称
COUNT(*), --订单总量
SUM(order_fee) --订单总金额
FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00'
and order_create_time < '2023-03-01 00:00:00'
GROUP BY
order_create_date,
order_product_catalog_name
ORDER BY
order_create_date,
order_product_catalog_name
;
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨