[实时计算flink]CREATE DATABASE AS(CDAS)语句

news2025/1/15 6:36:33

CDAS支持整库级别的表结构和数据的实时同步,还支持表结构变更的同步。本文为您介绍CREATE DATABASE AS(CDAS)的使用方法,并提供了多种使用场景下的示例。

背景信息

CDAS是CTAS语法的一个语法糖,用于实现整库同步、多表同步的功能。阿里云Flink引擎会将CDAS语句中每个需要同步的表翻译成一个对应的CTAS语句。因此,CDAS还拥有CTAS的数据同步和表结构变更同步的能力,常用于全自动化的数据集成场景。此外,阿里云Flink还能对源表进行优化,复用一个源表节点读取多业务表的数据。这对于MySQL CDC数据源场景尤为适用,因为不仅可以减少数据库的连接数,还能避免重复拉取Binlog数据,以降低数据库的读取压力。

使用限制

  • 仅Flink计算引擎vvr-4.0.11-flink-1.13及以上版本支持CDAS语法。

    重要

    CDAS语法不支持进行调试。

  • 仅Flink计算引擎vvr-4.0.13-flink-1.13及以上版本支持分库合并同步。

  • CDAS支持的上下游存储列表如下。

    连接器名称

    源表

    结果表

    备注

    MySQL

    ×

    不支持同步MySQL视图。

    消息队列Kafka

    ×

    无。

    MongoDB

    ×

    • 暂不支持分库合并同步。

    • 暂不支持同步MongoDB元信息。

    • 支持通过CDAS语句将MongoDB中的数据及表结构变更同步至目标表。具体的配置要求请参见使用MongoDB Catalog。

    Upsert Kafka

    ×

    无。

    实时数仓Hologres

    ×

    如果下游是Hologres,CDAS在默认情况下会为每个表创建相应数量(connectionSize参数值)个连接。此时您可以使用connectionPoolName参数,让配置相同名称连接池的表可以共享连接池。

    说明

    • 在将数据同步到Hologres时,如果您的上游源表包含了Fixed Plan不支持类型的数据,建议通过INSERT INTO语句的方式,在Flink内部做类型转换后将数据同步到Hologres。不要用CDAS方式创建Sink结果表进行数据同步,因为这种方式会无法走Fixed Plan,写入性能较差。

    • 实时计算Flink版仅支持读写Hologres内表,因此Hologres实例必须是独占实例,不支持Hologres共享集群实例。

    StarRocks

    ×

    仅支持EMR的StarRocks。

    流式数据湖仓Paimon

    ×

    • 仅Flink计算引擎vvr-6.0.7-flink-1.15及以上版本支持Paimon结果表。

    • 暂不支持同步到Paimon DLF 2.0结果表。

前提条件

  • 执行CDAS语法前,确保工作空间中已注册目标端的Catalog,详情请参见管理元数据。

  • 执行CDAS语法前,如果您需要访问不同账号下的上下游资源、以及使用RAM用户或RAM角色等身份访问时,请确保登录Flink全托管的账号具有读写上下游资源的权限,否则会因为权限不足导致读写操作失败。

注意事项

  • 使用VVR 8.0.6及以上版本时,CDAS作业启动后,支持添加新表后从作业快照重启,从而捕获到新的表。详情请参见示例三:源库新增表加入数据同步。

  • 使用VVR 8.0.5及以下版本时,CDAS作业启动后,作业同步的表已经确定,数据库中新增的表不会自动捕捉,也无法通过重启作业的方式捕获到。如果需要同步新增的表,您可以选择以下任一种方案:

    • 原有CDAS作业不变,启动一个新的作业同步新增的表。例如

      // 新建CTAS作业同步新增加的表new_table
      CREATE TABLE IF NOT EXISTS new_table
      AS TABLE mysql.tpcds.new_table 
      /*+ OPTIONS('server-id'='8008-8010') */;
    • 停止现有CDAS作业,清理已同步的数据后,以全新状态重启CDAS作业来重新同步数据。

功能特性

功能

详情

整库同步

支持实时同步整库(或者多张表)的全量和增量数据到每张对应的结果表中。

表结构变更同步

在实时同步整库数据的同时,还支持将每张源表的表结构变更(加列等)实时同步到结果表中。

分库合并同步

支持使用正则表达式定义库名,匹配数据源的多个分库下的源表,合并后同步到下游每张对应表名的结果表中。

源库新增表加入数据同步

CDAS作业启动后,如果源库新增表,支持从作业快照重启,从而捕获到新的表,对新增表进行数据同步。

多CDAS&CTAS语句

支持使用STATEMENT SET语法将多个CDAS和CTAS语句作为一个作业一起提交,并支持对源表节点的合并复用,降低对数据源的压力。

启动流程

当执行CDAS语句时,阿里云Flink将会按照以下流程执行:

  1. 检查目标存储中是否存在目标库和结果表。

    • 如果不存在目标库,则通过目标端Catalog去目标存储中创建相应的目标库。

    • 如果存在目标库,则跳过建库,并检查目标库是否存在该结果表。

      • 如果不存在,则在目标库中创建相应的结果表,该结果表具有和源库中表相同的表名和Schema。

      • 如果存在,则跳过建表。

  2. 提交和启动相应的数据同步作业。将源库中的数据以及Schema变更同步到目标库下的表中。

例如,从MySQL到Hologres的CDAS数据同步流程如下图所示。

CDAS示意图

表结构变更同步策略

因为CDAS是CTAS语法的一个语法糖,所以表结构变更能力与CTAS一致,详情请参见CREATE TABLE AS(CTAS)语句。

基本语法

CREATE DATABASE IF NOT EXISTS <target_database>
[COMMENT database_comment]
[WITH (key1=val1, key2=val2, ...)]
AS DATABASE <source_database>
INCLUDING { ALL TABLES | TABLE 'table_name' }
[EXCLUDING TABLE 'table_name']
[/*+ OPTIONS(key1=val1, key2=val2, ... ) */]

<target_database>:
  [catalog_name.]db_name

<source_database>:
  [catalog_name.]db_name

CDAS语法复用了CREATE DATABASE语法的基本结构,其中的参数解释如下表所示。

参数

说明

target_database

数据同步的目标数据库名,可以指定具体的Catalog名称。

COMMENT

目标库的描述,默认使用source_database的描述。

WITH

目标库的参数,详情请参见管理元数据中对应的Catalog文档。

说明

key和value都需要为字符串类型,例如'sink.parallelism' = '4'。

source_database

数据同步的源库名称,可以指定具体的Catalog名称。

INCLUDING ALL TABLES

同步源库中的所有表。

INCLUDING TABLE

同步源库中指定的表。支持使用竖线(|)分隔指定多个表,也可以使用正则表达式指定符合某一规则的表。例如INCLUDING TABLE 'web.*'表示要同步源库中所有web开头的表。

EXCLUDING TABLE

用于指定不需要同步的表,支持使用竖线(|)分隔指定多个表,也可以使用正则表达式指定符合某一规则的表,例如INCLUDING ALL TABLES EXCLUDING TABLE 'web.*'表示同步源库中所有不是web开头的表。

OPTIONS

源表的参数,详情请参见对应连接器支持的源表WITH参数。

说明

key和value都需要为字符串类型,例如'server-id' = '65500'。

说明

因为IF NOT EXISTS关键字为必填,所以如果目标库或结果表在目标存储中并不存在,则会先创建该目标库和结果表,否则跳过创建步骤。创建的结果表Schema会使用源表的Schema,包括主键以及物理字段的字段名和字段类型,不包括计算列、meta字段、Watermark。其中源表到结果表的字段类型会经过类型映射,详见对应连接器文档中的类型映射。

示例

示例一:整库同步

CDAS通常会配合数据源的Catalog和目标的Catalog一起使用。例如,MySQL Catalog和Hologres Catalog结合CDAS语法,完成MySQL到Hologres的全量和增量数据同步。使用MySQL Catalog可以自动解析源表的Schema及相应的参数,而不用手动编写DDL。

假设已在工作空间中注册了名为holo的Hologres Catalog和名为mysql的MySQL Catalog,MySQL中有一个名为tpcds的库。您可以使用以下语句将tpcds库下的24张表全部同步到Hologres中,包括未来的数据变更和表结构变更,无需提前在Hologres中创建表。

USE CATALOG holo;

CREATE DATABASE IF NOT EXISTS holo_tpcds  -- 在hologres中创建holo_tpcds库。
WITH ('sink.parallelism' = '4') -- 可选,指定目标库的参数,每个holo sink默认使用4并发。
AS DATABASE mysql.tpcds INCLUDING ALL TABLES  -- 同步mysql中tpcds库下所有表。
/*+ OPTIONS('server-id'='8001-8004') */ ; -- 可选,指定mysql-cdc源表的额外参数。

说明

Hologres支持在创建目标Database时指定WITH参数,这些参数仅对当前作业生效,用于控制写入结果表时的行为,不会持久化到Hologres中。支持的WITH参数详情请参见实时数仓Hologres。

示例二:分库合并同步

对于分库合并同步的场景,需要利用正则表达式的库名来匹配所要同步的多个分库。使用CDAS可以将上游多个分库下相同表名的数据合并同步到Hologres目标库对应表名的同一张表中,库名和表名会作为额外的两个字段写入到每张结果表中。为保证主键唯一性,库名、表名和原主键一起作为对应Hologres表的新联合主键。

假设MySQL实例中有order_db01~order_db99多个分库,每个分库下都有order、order_detail等多张表。您可以使用以下语句将99个分库下的order、order_detail等表全部同步到Hologres中,包括未来的数据变更和表结构变更,无需提前在Hologres中创建表。

order1

USE CATALOG holo;

CREATE DATABASE IF NOT EXISTS holo_order--在Hologres中创建holo_order库,包括mysql中order分库的所有表。
WITH('sink.parallelism'='4')        --可选,指定目标库的参数,每个HologresSink默认并发为4。
AS DATABASE mysql.`order_db[0-9]+` INCLUDING ALL TABLES --同步mysql中order_db分库下所有表。
/*+OPTIONS('server-id'='8001-8004')*/;  --可选,指定mysql-cdc源表的额外参数。

示例三:源库新增表加入数据同步

使用VVR 8.0.6及以上版本时,CDAS作业启动后,如果源库新增表,支持从作业快照重启,从而捕获到新的表,对新增表进行数据同步。

  1. SQL作业开发时需要增加以下语句,开启CDAS新增表读取功能。

    SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
  2. 当出现新增的表需要同步时,停止作业并勾选停止前创建一次快照

  3. SQL开发中,重新部署这个SQL作业。

  4. 作业运维页面单击目标作业名称,状态集管理页签,单击历史

  5. 作业快照列表中,找到停止作业时创建的快照。

  6. 单击目标快照操作列,选择更多 > 从该快照恢复作业

  7. 作业启动配置对话框,配置作业启动信息,详情请参见作业启动。

重要

新增表功能只能用于默认的initial启动模式。

示例四:多CDAS&CTAS语句

实时计算Flink版支持使用STATEMENT SET语法将多个CTAS语句作为一个作业一起提交,并且可以对Source进行优化,复用一个Source节点读取多业务表的数据。这对于MySQL CDC数据源场景尤为适用,因为这可以减少server-id的使用,减少对数据库的连接数和读取压力。

说明

对于Source复用优化,需要这些Source表的options保持完全一致,才能合并成功进行复用。

假设MySQL实例中有tpcds、tpch、user_db01~user_db99(分库分表)多个库。您可以通过组合多条CDAS和CTAS语句,将MySQL实例下的所有库和表都同步到Hologres,只需一个Flink作业便能完成所有表的同步,只需一个Source便能读取所有表的数据,代码示例如下。

USE CATALOG holo;

BEGIN STATEMENT SET;

-- 同步user分库分表。
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;

-- 同步TPCDS库。
CREATE DATABASE IF NOT EXISTS holo_tpcds
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
-- 同步TPCH库。
CREATE DATABASE IF NOT EXISTS holo_tpch
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;

END;

示例五:多CDAS语句整库同步到Kafka

在使用多个CDAS语句整库同步到Kafka时,由于不同的数据库中可能存在相同的表,为了防止topic冲突,需要使用cdas.topic.pattern配置。cdas.topic.pattern定义了创建topic的名称的格式,其中可通过{table-name}占位符来替换为表名。如:当设置'cdas.topic.pattern'='db1-{table-name}',对于上游表名为table1的表,在Kafka中对应的topic名称为db1-table1

假设MySQL实例中有tpcds、tpch多个库。您可以通过如下方式将MySQL实例下的所有库和表都同步到Kafka,避免topic冲突,代码示例如下。

USE CATALOG kafkaCatalog;

BEGIN STATEMENT SET;

-- 同步TPCDS库。
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpcds-{table-name}')
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;

-- 同步TPCH库。
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpch-{table-name}')
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;

END;

实时计算Flink版提供MySQL整库同步到Kafka的能力,通过引入Kafka作为中间层,并使用CDAS整库同步或CTAS整表同步到Kafka来解决

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2214989.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在 Unity 中创建模型动画的探索之旅

在 Unity 游戏开发或 3D 场景构建中&#xff0c;模型动画是赋予虚拟对象生命和个性的关键元素。它能够极大地增强用户体验&#xff0c;使场景更加生动和吸引人。本文将带您深入了解在 Unity 中创建模型动画的基本流程和方法。 一、准备工作 在开始创建动画之前&#xff0c;您…

空间大数据的数据变换与价值提炼

在数字化时代&#xff0c;空间大数据正成为推动社会经济发展的关键因素。空间大数据不仅体量巨大&#xff0c;而且具有高速流转、多样类型和真实性等特点&#xff0c;它们在获取、存储、管理、分析方面超出了传统数据库软件工具的能力范围。地理信息系统&#xff08;GIS&#x…

淘宝详情API接口有什么用处?

淘宝详情API接口有什么用处&#xff1f;主要体现在以下几个方面&#xff1a; 电商数据分析&#xff1a;通过调用API接口获取商品详情数据&#xff0c;可以对商品的销售情况、价格变化、属性分布等进行深入分析。这些数据为电商运营提供了决策支持&#xff0c;帮助商家更好地了…

Redis哨兵模式部署(超详细)

哨兵模式特点 主从模式的弊端就是不具备高可用性&#xff0c;当master挂掉以后&#xff0c;Redis将不能再对外提供写入操作&#xff0c;因此sentinel模式应运而生。sentinel中文含义为哨兵&#xff0c;顾名思义&#xff0c;它的作用就是监控redis集群的运行状况&#xff0c;此…

使用gradle将java项目推送至maven中央仓库(最新版)

前言 maven中央仓库于2024年3月进行改版&#xff0c;下面介绍新的推送方式 一、将项目推送到github 过程略 二、注册sonatype账号 仓库地址&#xff1a;https://central.sonatype.com/ 这里选择使用github账号登录&#xff0c;不注册新的了 三、创建命名空间 这里会自动…

2012年国赛高教杯数学建模D题机器人避障问题解题全过程文档及程序

2012年国赛高教杯数学建模 D题 机器人避障问题 图1是一个800800的平面场景图&#xff0c;在原点O(0, 0)点处有一个机器人&#xff0c;它只能在该平面场景范围内活动。图中有12个不同形状的区域是机器人不能与之发生碰撞的障碍物&#xff0c;障碍物的数学描述如下表&#xff1a…

uniapp 整合 OpenLayers - 加载Geojson数据(在线、离线)

Geojson数据是矢量数据&#xff0c;主要是点、线、面数据集合 Geojson数据获取&#xff1a;DataV.GeoAtlas地理小工具系列 实现代码如下&#xff1a; <template><!-- 监听变量 operation 的变化&#xff0c;operation 发生改变时&#xff0c;调用 openlayers 模块的…

牛只行为及种类识别数据集18g牛只数据,适用于多种图像识别,目标检测,区域入侵检测等算法作为数据集。数据集中包括牛只行走,站立,进食,饮水等不同类型的数据

18g牛只数据&#xff0c;适用于多种图像识别&#xff0c;目标检测&#xff0c;区域入侵检测等算法作为数据集。 数据集中包括牛只行走&#xff0c;站立&#xff0c;进食&#xff0c;饮水等不同类型的数据&#xff0c;可以用于行为检测 数据集中包含多种不同种类的牛只&#xff…

Eking管理易 Html5Upload 前台任意文件上传漏洞复现

0x01 产品描述&#xff1a; ‌Eking管理易是一款专为广告制品制作企业量身定制的管理软件产品&#xff0c;旨在帮助企业实现规范化、科学化管理&#xff0c;提升运营效率和降低运营成本。‌ 该软件由广州易凯软件技术有限公司开发&#xff0c;基于JAVA企业版技术研发&#xff0…

CSS 入门

1. CSS 1.1 概念 CSS&#xff08;Cascading Style Sheet&#xff09;&#xff0c;层叠样式表&#xff0c;用于控制页面的样式 CSS 能够对网页中元素位置的排版进行像素级精确控制&#xff0c;实现美化页面的效果&#xff0c;能够做到页面的样式和结构分离&#xff08;类似于…

Wi-Fi数据帧类别

网络中传送的业务数据对服务质量&#xff08;QualityofService&#xff0c;QoS&#xff09;有不同的要求&#xff0c;例如语音业务需要实时被传送&#xff0c;它对时延的大小很敏感。当W-Fi MAC层在同时传输语音业务和普通业务的数据时&#xff0c;语音业务就需要被高优先级发送…

当下的时代?

我这两天刚接触一个人,错误之皇,每做一件小事的时候他都像救命稻草一样抓着,有一天我一看,嚯,好家伙,他抱着的是已经让我仰望的参天大树了! 这个时代需要我们从无限思维的视角和做法去努力&#xff1b;它不取决于我们现在有多少&#xff0c;而取决于我们未来的成长幅度是多少&a…

动态规划算法专题(七):两个数组的dp问题

目录 1、最长公共子序列 1.1 算法原理 1.2 算法代码 2、不相交的线 2.1 算法原理 2.2 算法代码 3、不同的子序列 3.1 算法原理 3.2 算法代码 4、通配符匹配&#xff08;hard ★★★&#xff09; 4.1 算法原理 4.2 算法代码 5、正则表达式匹配&#xff08;hard ★…

Vue-admin-box后台管理框架

文章目录 1、项目概述2、技术栈3、 特色功能4、基础模板5、 项目演示6、 源码地址7、 演示地址8、小结Vue-Admin-Box,一款精心打造的Vue.js后台管理模板,旨在为开发者提供高效、美观且易于扩展的后台解决方案。它集成了现代Web开发的最佳实践,包括响应式设计、模块化开发、丰…

服务器数据恢复—服务器宕机导致挂载的V7000存储文件系统损坏的数据恢复案例

服务器存储数据恢复环境&#xff1a; 一台挂载在Windows server服务器上的v7000存储。存储空间划分了一个分区&#xff0c;采用NTFS文件系统&#xff0c;存放oracle数据库。 服务器存储故障&#xff1a; 服务器在运行过程中宕机&#xff0c;于是管理员重启服务器。服务器进入系…

针对珠宝,手表等配饰的高保真虚拟试穿解决方案IDM-VTON

今天给大家介绍一种针对配饰的高保真虚拟试穿解决方案IDM-VTON&#xff0c;该方案旨在填补当前虚拟试穿技术在高端配饰&#xff08;如珠宝和手表&#xff09;领域的空白。现有的虚拟试穿模型主要集中在服装上&#xff0c;IDM-VTON结合了2D虚拟试穿技术与其他计算机视觉模型&…

N1安装grafana

apt-get install -y adduser libfontconfig1 musl wget https://dl.grafana.com/enterprise/release/grafana-enterprise_10.0.10_arm64.debdpkg -i grafana-enterprise_10.0.10_arm64.debsystemctl daemon-reload systemctl enable grafana.service --now ip:3000 默认密码…

[Linux] 层层深入理解文件系统——(3)磁盘组织存储的文件

标题&#xff1a;[Linux] 层层深入理解文件系统——&#xff08;3&#xff09;磁盘组织组织存储的文件 个人主页水墨不写bug 目录 一、磁盘中的文件 1&#xff09;磁盘的物理结构 2&#xff09;磁盘的CHS寻址法 3&#xff09;磁盘的空间管理 二、磁盘如何组织存储文件 三…

止步阿里一面。。。

时间过的真快&#xff0c;转眼间国庆已经过去一周了&#xff0c;又到了新的一周&#xff0c;继续分享最新的面经。 今天分享的是粉丝在阿里巴巴的一面&#xff0c;考察了数据库、redis、kafka、ES和项目&#xff0c;数据库和redis不用多说&#xff0c;项目必用面试必考&#x…

【隐私计算篇】替换半同态使用全同态加速计算联邦机器学习算法的实证分析

1. 背景介绍 联邦学习&#xff08;Federated Learning&#xff0c;FL&#xff09;是隐私计算中常见的一种技术范式&#xff0c;其本质是一种面向可信数据流通的分布式机器学习框架&#xff0c;允许多个参与方在不共享其本地数据的前提下&#xff0c;协同训练机器学习模型。与传…