系列文章目录
- 业务实现
3.1 创建catalog
3.1.1 vvp
3.1.2 mysqlcdc
3.1.2.1 使用限制
3.1.2.2 配置MySQL Catalog
3.1.3 xxxxpm
3.1.3.1 下载Paimon插件
3.1.3.2 在MaxCompute项目中上传Paimon插件
3.1.3.3 创建自定义Catalog类型
3.1.3.5 配置catalog
3.1.4 xxxxx
3.1.4.1 背景信息
3.1.4.2 授权
3.1.4.3 创建MaxCompute Catalog
3.1.4.4 基本使用
文章目录
- 系列文章目录
- 前言
- 3. 业务实现
- 3.1 创建catalog
- 3.1.1 vvp
- 3.1.2 mysqlcdc
- 3.1.2.1 使用限制
- 3.1.2.2 配置MySQL Catalog
- 3.1.3 xxxxpm
- 3.1.3.1 下载Paimon插件
- 3.1.3.2 在MaxCompute项目中上传Paimon插件
- 3.1.3.3 创建自定义Catalog类型
- 3.1.3.5 配置catalog
- 3.1.4 xxxxx
- 3.1.4.1 背景信息
- 3.1.4.2 授权
- 3.1.4.3 创建MaxCompute Catalog
- 3.1.4.4 基本使用
前言
本文为flink车联网项目:业务实现1,后续章节为:维表开发.
3. 业务实现
3.1 创建catalog
catalog可以将元数据进行持久化,这样后续的操作就可以反复使用这些表的元数据,而不用每次使用时都要重新注册。
根据上述分析,主要使用四种catalog。一种是阿里云flink自带的,叫vvp;另一种是mysql catalog,起名为mysqlcdc;还有一种是paimon catalog,起名为xxxxxpm;还有maxcompute catalog,起名为xxxxx。
3.1.1 vvp
阿里云flink自带的,直接使用即可。主要用来存储除mysql表和paimon表之外的其他信息。
3.1.2 mysqlcdc
配置MySQL Catalog后,就可以在Flink全托管控制台直接访问MySQL实例中的表,无需通过DDL语句手动注册MySQL表,提升开发效率和正确性。
3.1.2.1 使用限制
仅Flink计算引擎vvr-4.0.11-flink-1.13及以上版本支持配置MySQL Catalog。
不支持修改Catalog DDL。
仅支持查询数据库和表,不支持创建数据库和表。
作为源表仅支持流读、不支持批读,支持作为维表和结果表。
MySQL仅支持5.7和8.0.x版本。
3.1.2.2 配置MySQL Catalog
(1)语法:
CREATE CATALOG <catalogname> WITH(
'type' = 'mysql',
'hostname' = '<hostname>',
'port' = '<port>',
'username' = '<username>',
'password' = '<password>',
'default-database' = '<dbname>',
'catalog.table.metadata-columns' = '<metadata>'
);
参数 说明 是否必填
catalogname MySQL Catalog名称。 是
type 类型,固定值为mysql。 是
hostname MySQL数据库的IP地址或者Hostname。 是
port MySQL数据库服务的端口号,默认值为3306。 否
default-database 默认的MySQL数据库名称。 是
username MySQL数据库服务的用户名。 是
password MySQL数据库服务的密码。 是
catalog.table.metadata-columns 指定获取数据表时,表的Schema需要添加MySQL CDC源表的元数据列。多个元数据列使用英文分号(;)分隔,例如:op_ts;table_name;database_name。默认不添加元数据列。
说明
o 仅实时计算引擎VVR 6.0.5及以上版本支持该参数。
o 当配置该参数时,返回的表Schema会额外添加指定的元数据列,这些列只适用于MySQL CDC源表,所以该Catalog返回的表只能用作数据源表,不可以用作结果表或维表。 否
(2)选中创建Catalog的代码后,单击左侧代码行数上的运行
其中hostname为rds的内网ip。
CREATE CATALOG mysqlcdc WITH(
‘type’ = ‘mysql’,
‘hostname’ = ‘rm-cn-x0r3fp1lj000qa.rwlb.rds.aliyuncs.com’,
‘port’ = ‘3306’,
‘username’ = ‘XXXXX’,
‘password’ = ‘xxxx’,
‘default-database’ = ‘dim’
);
(3)测试
在流作业草稿中调试运行下面的SQL
select * from mysqlcdc.driver.comment_info;
即可看到相应的数据
3.1.3 xxxxpm
为了方便paimon表的创建和管理,使用单独的paimon catalog。这里不使用vvp的原因是:如果使用vvp则每个paimon表都需要指定type,warehouse等信息,比较冗余;另一方面,使用vvp则只能创建paimon的映射表,如果删除了映射表,数据依然存在,而使用paimon catalog创建的表如果删除表也会同步删除对应的数据。
另外,因为MaxCompute需要访问paimon表,所以需要paimon catalog需要整合MaxCompute。
3.1.3.1 下载Paimon插件
可以通过链接下载,也可以直接使用 资料/jar包中的
paimon_maxcompute_connector.jar
https://static-aliyun-doc.oss-cn-hangzhou.aliyuncs.com/file-manage-files/zh-CN/20230912/btmx/paimon_maxcompute_connector.jar
3.1.3.2 在MaxCompute项目中上传Paimon插件
1)登录DataWorks控制台,单击目标工作空间操作列中的快速进入 > 数据开发。
2)在数据开发页面,单击新建按钮,选择新建资源 > JAR。
在新建资源对话框,配置新建资源参数,引擎实例选择mchmcx,资源类型为JAR,路径为 业务流程/Workflow/MaxCompute,点击上传paimon_maxcompute_connector.jar,单击新建。
4)资源创建完成后,需在资源编辑页面,单击工具栏中的提交图标,提交资源至调度开发服务器端。
点击确定
5)提交成功后,点击发布
在发布包界面点击发布
3.1.3.3 创建自定义Catalog类型
1)下载Paimon自定义catalog插件
可以通过链接下载https://static-aliyun-doc.oss-cn-hangzhou.aliyuncs.com/file-manage-files/zh-CN/20230914/zqcx/paimon-ali-vvr-8.0-vvp-0.6-ali-1-SNAPSHOT.jar
也可以直接使用资料/jar包中的paimon-ali-vvr-8.0-vvp-0.6-ali-1-SNAPSHOT.jar
2)点击元数据管理,点击创建Catalog,点击自定义Catalog,点击创建自定义Catalog类型
3)点击选择文件,上传paimon-ali-vvr-8.0-vvp-0.6-ali-1-SNAPSHOT.jar,点击下一步,稍等一些时间。
4)上传完成后,点击确定。
3.1.3.4 创建catalog信息
1)模板:
CREATE CATALOG `<catalog name>` WITH (
'type' = 'paimon-06-1',
'metastore' = 'maxcompute',
'warehouse' = '<warehouse>',
'fs.oss.endpoint' = '<oss endpoint>',
'fs.oss.accessKeyId' = '<oss access key id>',
'fs.oss.accessKeySecret' = '<oss access key secret>',
'maxcompute.endpoint' = '<maxcompute endpoint>',
'maxcompute.accessid' = '<maxcompute access id>',
'maxcompute.accesskey' = '<maxcompute access key>',
'maxcompute.project' = '<maxcompute project>',
'maxcompute.oss.endpoint' = '<maxcompute oss endpoint>'
);
参数含义:
参数 是否必填 说明
catalog name 是 Paimon Catalog名称,自定义配置时,要求为英文字母。本文以catalogname为例。
type 是 Catalog类型,固定值为paimon-06-1。
metastore 是 元数据存储类型,取值为maxcompute。
warehouse 是 OSS服务中所指定的数仓目录,格式为oss:///,其中:
o bucket:表示您创建的OSS Bucket名称。
o object:表示您存放数据的路径。
fs.oss.endpoint 是 OSS服务的连接地址。
您需要根据创建OSS Bucket时选择的地域以及网络连接方式配置Endpoint。各地域及网络对应的Endpoint信息,请参见访问域名和数据中心。
fs.oss.accessKeyId 是 拥有读写OSS权限的阿里云账号或RAM账号的AccessKey ID。
您可以进入AccessKey管理页面获取AccessKey ID。
fs.oss.accessKeySecret 是 AccessKey ID对应的AccessKey Secret。
您可以进入AccessKey管理页面获取AccessKey Secret。
maxcompute.endpoint 是 MaxCompute服务的连接地址。
您需要根据创建MaxCompute项目时选择的地域以及网络连接方式配置Endpoint。各地域及网络对应的Endpoint信息,请参见Endpoint。
maxcompute.accessid 是 拥有MaxCompute权限的阿里云账号或RAM账号的AccessKey ID。
您可以进入AccessKey管理页面获取AccessKey ID。
maxcompute.accesskey 是 AccessKey ID对应的AccessKey Secret。
您可以进入AccessKey管理页面获取AccessKey Secret。
maxcompute.project 是 目标MaxCompute项目名称。
maxcompute.oss.endpoint 否 MaxCompute访问OSS服务的访问域名,如果未填写,将默认使用fs.oss.endpoint参数的值。
说明
由于OSS Bucket与MaxCompute项目处于同一地域,建议您将maxcompute.oss.endpoint配置为内网Endpoint,否则将产生OSS流量费用。关于OSS各地域及网络对应的Endpoint值,请参见访问域名和数据中心。
2)warehouse即使用在paimon创建好的bucket
所以,warehouse为 oss://xxxxx-paimon/
3)通过查询访问域名和数据中心 https://help.aliyun.com/zh/oss/user-guide/regions-and-endpoints 得到 北京的内网Endpoint为 oss-cn-beijing-internal.aliyuncs.com
4)配置Accesskey【如果在StarRocks章节已经配置了则直接使用即可】
登录AccessKey管理页面,https://ram.console.aliyun.com/manage/ak?spm=a2c4g.11186623.0.0.389247f8OlOaMw
首次进入需勾选并点击确定
因为安全原因,这里使用子用户的AccessKey
点击创建用户
登录名为xxxxxx,显示名称为xxxx,勾选两种访问方式
点击自定义密码,设置为xxxxx
然后是不需要重置密码,不需要MFA多因素认证
点击确定
然后通过手机号或者扫脸进行验证。
验证成功后,复制对应的AccessKey ID 和 Secret
5)配置用户权限
点击用户后侧的添加权限
授权范围为整个云账号,选择权限选择系统策略,
搜索OSS,选择AliyunOSSFullAccess
搜索MaxCompute,选择AliyunMaxcomputeFullAccess
然后点击确定
6)将xxxxxx加到dataworks里。
登录dataworks控制台,点击工作空间列表,点击管理
在工作空间中,点击空间成员,点击添加成员
选择用户和 aliyunservicerole,设置角色为空间管理员,点击确定。
7)通过查询 MaxCompute Endpoint得到阿里云经典网络连接方式对应的北京endpoint 为 http://service.cn-beijing.maxcompute.aliyun-inc.com/api
8)maxcompute.accessid 和 maxcompute.accesskey也使用itheima的。
9)maxcompute.project 为 需要绑定的已有的maxcompute的项目名称,这里根据集群搭建时创建名称,为dwxxxxx。可以通过 https://maxcompute.console.aliyun.com/cn-beijing/project-list 查询。
10)最终创建的catalog信息为:
CREATE CATALOG xxxxpm
WITH (
‘type’ = ‘paimon-06-1’,
‘metastore’ = ‘maxcompute’,
‘warehouse’ = ‘oss://xxxxx-paimon/’,
‘fs.oss.endpoint’ = ‘oss-cn-beijing-internal.aliyuncs.com’,
‘fs.oss.accessKeyId’ = ‘LTAI5tKM4wTyz5’,
‘fs.oss.accessKeySecret’ = ‘Hr7RNYz0H0Ze1uQDqou’,
‘maxcompute.endpoint’ = ‘http://service.cn-beijing.maxcompute.aliyun-inc.com/api’,
‘maxcompute.accessid’ = ‘LTAI5tKM4wTyz5’,
‘maxcompute.accesskey’ = ‘Hr7RNYz0H0Ze1uQDqou’,
‘maxcompute.project’ = ‘dwxxxx_dev’
);
3.1.3.5 配置catalog
点击创建Catalog,选择自定义Catalog,选择paimon-06-1,点击下一步
将上一步的配置粘进去,点击确定
创建完成后,可以看到之前创建的paimon库。
3.1.4 xxxxx
因为Flink要访问MaxCompute,配置MaxCompute Catalog后,就可以在Flink全托管作业开发中直接访问MaxCompute中存储的表,无需再定义Schema。
3.1.4.1 背景信息
MaxCompute Catalog通过查询MaxCompute服务来获取MaxCompute中已存储物理表的Schema信息,无需在Flink SQL中声明MaxCompute连接表的Schema便可以获取具体的字段信息。
MaxCompute Catalog具有以下功能特点:
(1)MaxCompute Catalog中的数据库名对应MaxCompute的项目名,可以通过切换数据库来使用不同MaxCompute项目中的表。
(2)MaxCompute Catalog中的表名对应MaxCompute中存储的物理表名,自动映射数据类型,无需再通过DDL语句手动注册MaxCompute表,提升开发效率和正确性。
(3)MaxCompute Catalog提供的表可以直接作为Flink SQL作业中的源表、维表和结果表使用。
(4)在MaxCompute Catalog中创建表能够自动在MaxCompute服务中创建对应的物理表,并自动映射数据类型,提升开发效率。
3.1.4.2 授权
在用 xxxxx 的accessid(即xxxx账号)访问maxcompute生产表的数据时,默认是没有权限的,所以需要开通相应的权限。
开通方式如下:
1)打开maxcompute控制台 https://maxcompute.console.aliyun.com/cn-beijing/project-list
2)点击管理
3)点击角色权限,点击成员管理
4)将itheima加到右侧,点击确定
3.1.4.3 创建MaxCompute Catalog
(1)模板
CREATE CATALOG <catalogName>
WITH (
‘type’ = ‘odps’,
‘endpoint’ = ‘’,
‘accessId’ = ‘’,
‘accessKey’ = ‘’,
‘project’ = ‘’,
‘userAccount’ = ‘’
);
参数 说明 类型 是否必填 备注
catalogName MaxCompute Catalog的名称。 String 是 请填写为自定义的英文名。
type Catalog类型。 String 是 固定值为odps。
endpoint MaxCompute服务连接站点。 String 是 具体站点请参见Endpoint。
accessId 访问MaxCompute服务所使用阿里云账号的AccessKey ID。 String 是 该账号需要对Catalog访问的项目有admin权限。
accessKey 访问MaxCompute服务所使用阿里云账号的AccessKey Secret。 String 是 无。
project Catalog中作为默认数据库的MaxCompute项目名。 String 否 若不设置该值,默认项目为default。
userAccount 阿里云账号或RAM用户名称。 String 否 若使用的AccessKey非主账号,仅对主账号下的部分项目有admin权限,则需要设置该参数为账号名称,例如RAM$[<account_name>:]<RAM_name>,MaxCompute Catalog将仅展示该账号有权限的项目列表。
MaxCompute用户权限管理参见用户规划与管理。
(2)新建空白流作业草稿,在文本编辑区域,输入配置MaxCompute Catalog的命令:
CREATE CATALOG xxxxxmc WITH (
‘type’ = ‘odps’,
‘endpoint’ = ‘http://service.cn-beijing.maxcompute.aliyun-inc.com/api’,
‘accessId’ = ‘LTAI5tKM4wTyz5’,
‘accessKey’ = ‘Hr7RNYz0H0Ze1uQDqou’,
‘project’ = ‘dwxxxx’
);
3.1.4.4 基本使用
1)通过Catalog创建MaxCompute物理表
通过Flink SQL DDL,在MaxCompute Catalog中创建表时,会自动在对应的MaxCompute项目中创建对应的物理表,并自动将Flink中的类型转换为MaxCompute中的类型,支持创建非分区表和分区表。
创建非分区表示例:
CREATE TABLE <catalogName>
.<projectName>
.<tableName>
(
f0 INT,
f1 BIGINT,
f2 DOUBLE,
f3 STRING
);
执行完成后,可以在MaxCompute中查看对应项目中的表,可以看到已创建对应名字的非分区表,其列名称、类型与Flink DDL中对应。
创建分区表示例:
CREATE TABLE <catalogName>
.<projectName>
.<tableName>
(
f0 INT,
f1 BIGINT,
f2 DOUBLE,
f3 STRING,
ds STRING
) PARTITIONED BY (ds);
2)从MaxCompute Catalog表中读取数据
MaxCompute Catalog能够从MaxCompute服务读取物理表的Schema,因此无需在Flink中声明对应Schema即可直接读取数据。例如:
SELECT * FROM <catalogName>
.<projectName>
.<tableName>
;
不声明任何参数的默认行为为全量读取所有分区,若需要读取特定分区,或使用增量源表模式,可以参考大数据计算服务MaxCompute中的参数设置,在SQL注释中声明,例如:
读取特定分区:
SELECT * FROM <catalogName>
.<projectName>
.<tableName>
/*+ OPTIONS(‘partition’ = ‘ds=230613’) /;
使用增量源表模式:
SELECT * FROM <catalogName>
.<projectName>
.<tableName>
/+ OPTIONS(‘startPartition’ = ‘ds=230613’) /;
使用维表模式:
SELECT * FROM <anotherTable>
AS l LEFT JOIN
<catalogName>
.<projectName>
.<tableName>
/+ OPTIONS(‘partition’ = ‘max_pt()’, ‘cache’ = ‘ALL’) /FOR SYSTEM_TIME AS OF l.proc_time AS rON l.id = r.id;
需要注意的是,MaxCompute Catalog中不保存Watermark信息,若需要在以源表读取数据时指定Watermark,可以使用CREATE TALE … LIKE …语句,例如:
CREATE TABLE <newTable>
( WATERMARK FOR ts AS ts ) LIKE <catalogName>
.<projectName>
.<tableName>
;
其中ts为MaxCompute物理表中类型为DATETIME的列,该类型可以在Flink中被设置为事件时间并添加Watermark信息,创建完成后,从newTable读取的数据均带有Watermark。
3)向MaxCompute Catalog表中写入数据
MaxCompute Catalog支持以固定分区或动态分区模式写入数据。例如有MaxCompute物理表有二级分区ds和hh,可以使用如下语句写入数据:
– 写入固定分区
INSERT INTO <catalogName>
.<projectName>
.<tableName>
/+ OPTIONS(‘partition’ = ‘ds=20231024,hh=09’) /SELECT , ‘20231024’, ‘09’ FROM <anotherTable>
;
– 写入动态分区
INSERT INTO <catalogName>
.<projectName>
.<tableName>
/+ OPTIONS(‘partition’ = ‘ds,hh’) */SELECT , ds, hh FROM <anotherTable>
;