[实时计算flink]基于Paimon的数据库实时入湖快速入门

news2025/1/15 23:36:11

Apache Paimon是一种流批统一的湖存储格式,支持高吞吐的写入和低延迟的查询。本文通过Paimon Catalog和MySQL连接器,将云数据库RDS中的订单数据和表结构变更导入Paimon表中,并使用Flink对Paimon表进行简单分析。

背景信息

Apache Paimon是一种流批统一的湖存储格式,支持高吞吐的写入和低延迟的查询。目前阿里云实时计算Flink版,以及开源大数据平台E-MapReduce上常见的计算引擎(例如Spark、Hive或Trino)都与Paimon有着较为完善的集成度。您可以借助Apache Paimon快速地在HDFS或者OSS上构建自己的数据湖存储服务,并接入计算引擎实现数据湖的分析。

前提条件

  • 如果您使用RAM用户或RAM角色等身份访问,需要确认已具有Flink控制台相关权限,详情请参见权限管理。

  • 已创建Flink工作空间,详情请参见开通实时计算Flink版。

步骤一:准备数据源

  1. 快速创建RDS MySQL实例。

    说明

    RDS MySQL版实例需要与Flink工作空间处于同一VPC。不在同一VPC下时请参见网络连通性。

  2. 创建数据库和账号。

    创建名称为orders的数据库,并创建高权限账号或具有数据库orders读写权限的普通账号。

  3. 通过DMS登录RDS MySQL,在orders数据库中创建表orders_1和orders_2。

    CREATE TABLE `orders_1` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
    
    CREATE TABLE `orders_2` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
  4. 插入如下测试数据。

    INSERT INTO `orders_1` VALUES (1, 1, 'O', 131251.81, '1996-01-02', '5-LOW', 'Clerk#000000951', 0, 'nstructions sleep furiously among ');
    INSERT INTO `orders_1` VALUES (2, 3, 'O', 40183.29, '1996-12-01', '1-URGENT', 'Clerk#000000880', 0, ' foxes. pending accounts at the pending, silent asymptot');
    INSERT INTO `orders_1` VALUES (3, 6, 'F', 160882.76, '1993-10-14', '5-LOW', 'Clerk#000000955', 0, 'sly final accounts boost. carefully regular ideas cajole carefully. depos');
    INSERT INTO `orders_1` VALUES (4, 6, 'O', 31084.79, '1995-10-11', '5-LOW', 'Clerk#000000124', 0, 'sits. slyly regular warthogs cajole. regular, regular theodolites acro');
    INSERT INTO `orders_1` VALUES (5, 2, 'F', 86615.25, '1994-07-30', '5-LOW', 'Clerk#000000925', 0, 'quickly. bold deposits sleep slyly. packages use slyly');
    INSERT INTO `orders_1` VALUES (6, 2, 'F', 36468.55, '1992-02-21', '4-NOT SPECIFIED', 'Clerk#000000058', 0, 'ggle. special, final requests are against the furiously specia');
    INSERT INTO `orders_1` VALUES (7, 2, 'O', 171488.73, '1996-01-10', '2-HIGH', 'Clerk#000000470', 0, 'ly special requests ');
    INSERT INTO `orders_1` VALUES (8, 6, 'O', 116923.00, '1995-07-16', '2-HIGH', 'Clerk#000000616', 0, 'ise blithely bold, regular requests. quickly unusual dep');
    INSERT INTO `orders_1` VALUES (9, 3, 'F', 99798.76, '1993-10-27', '3-MEDIUM', 'Clerk#000000409', 0, 'uriously. furiously final request');
    INSERT INTO `orders_1` VALUES (10, 3, 'O', 41670.02, '1998-07-21', '3-MEDIUM', 'Clerk#000000223', 0, 'ly final packages. fluffily final deposits wake blithely ideas. spe');
    INSERT INTO `orders_2` VALUES (11, 6, 'O', 148789.52, '1995-10-23', '4-NOT SPECIFIED', 'Clerk#000000259', 0, 'zzle. carefully enticing deposits nag furio');
    INSERT INTO `orders_2` VALUES (12, 5, 'O', 38988.98, '1995-11-03', '1-URGENT', 'Clerk#000000358', 0, ' quick packages are blithely. slyly silent accounts wake qu');
    INSERT INTO `orders_2` VALUES (13, 4, 'F', 113701.89, '1992-06-03', '3-MEDIUM', 'Clerk#000000456', 0, 'kly regular pinto beans. carefully unusual waters cajole never');
    INSERT INTO `orders_2` VALUES (14, 6, 'O', 46366.56, '1996-08-21', '4-NOT SPECIFIED', 'Clerk#000000604', 0, 'haggle blithely. furiously express ideas haggle blithely furiously regular re');
    INSERT INTO `orders_2` VALUES (15, 4, 'O', 219707.84, '1996-09-20', '3-MEDIUM', 'Clerk#000000659', 0, 'ole express, ironic requests: ir');
    INSERT INTO `orders_2` VALUES (16, 1, 'F', 20065.73, '1994-07-16', '3-MEDIUM', 'Clerk#000000661', 0, 'wake fluffily. sometimes ironic pinto beans about the dolphin');
    INSERT INTO `orders_2` VALUES (17, 0, 'P', 65883.92, '1995-03-18', '1-URGENT', 'Clerk#000000632', 0, 'ular requests are blithely pending orbits-- even requests against the deposit');
    INSERT INTO `orders_2` VALUES (18, 6, 'F', 79258.24, '1994-01-20', '5-LOW', 'Clerk#000000743', 0, 'y pending requests integrate');
    INSERT INTO `orders_2` VALUES (19, 2, 'O', 116227.05, '1996-12-19', '4-NOT SPECIFIED', 'Clerk#000000547', 0, 'symptotes haggle slyly around the furiously iron');
    INSERT INTO `orders_2` VALUES (20, 1, 'O', 215135.72, '1998-04-18', '3-MEDIUM', 'Clerk#000000440', 0, ' pinto beans sleep carefully. blithely ironic deposits haggle furiously acro');

步骤二:创建Catalog

  1. 进入元数据管理页面。

    1. 登录实时计算控制台。

    2. 单击目标工作空间操作列下的控制台

    3. 单击左侧的元数据管理

  2. 创建Paimon Catalog。

    1. 单击创建Catalog内置Catalog页签,选择Apache Paimon后,单击下一步。

    2. 填写配置信息。

      image.png

      配置项

      说明

      备注

      catalog name

      您自定义的Paimon Catalog名称。

      填写为自定义的英文名。

      metastore

      Paimon表的元数据存储类型:

      • filesystem:仅将元数据存储在OSS中。

      • dlf:除了将元数据存储在OSS上外,还会将元数据同步到阿里云数据湖构建服务DLF中。

      本文选择filesystem。

      warehouse

      Paimon Catalog的存储根目录,是一个OSS目录。可以选择创建实时计算Flink版时使用的OSS Bucket,也可以使用同一账号同一地域下的其他OSS Bucket。

      格式为oss://<bucket>/<object>。其中:

      • bucket:表示您创建的OSS Bucket名称。

      • object:表示您存放数据的路径。

      您可以在OSS管理控制台上查看您的bucket和object名称。

    3. 单击确定

步骤三:创建Flink作业

  1. 数据开发 > ETL页面,单击新建

  2. 选择空白的流作业草稿,单击下一步

  3. 新建作业草稿对话框,填写作业配置信息。

    作业参数

    说明

    文件名称

    作业的名称。

    说明

    作业名称在当前项目中必须保持唯一。

    存储位置

    指定该作业的存储位置。

    您还可以在现有文件夹右侧,单击

    新建文件夹

    图标,新建子文件夹。

    引擎版本

    当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍。

  4. 单击创建

  5. 输入以下语句,实时捕获orders数据库中相关表的变化,并同步到Paimon表中。

    -- 使用刚刚创建的Paimon Catalog
    USE CATALOG `test`;
    
    -- 创建一张MySQL临时表,捕获表名符合正则表达式orders_\d+的MySQL表的变化
    CREATE TEMPORARY TABLE mysql_orders (
        orderkey BIGINT,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        `comment` VARCHAR(100),
        PRIMARY KEY (orderkey) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql',
        'hostname' = 'rm-bp1s1xgll21ey****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'your_username',
        'password' = '${secret_values.mysql_pw}',
        'database-name' = 'orders',
        'table-name' = 'orders_\d+',
        'server-time-zone' = 'Asia/Shanghai'
    );
    
    -- 将MySQL表的变化同步到Paimon表中
    CREATE TABLE IF NOT EXISTS orders AS TABLE mysql_orders;

    ​参数说明如下,您可以根据实际情况进行修改。MySQL连接器更多参数详情请参见MySQL。

    参数

    说明

    备注

    connector

    连接器类型。

    本示例固定值为mysql

    hostname

    MySQL数据库的IP地址或者Hostname。

    本文填写为RDS实例的内网地址。

    username

    MySQL数据库服务的用户名。

    无。

    password

    MySQL数据库服务的密码。

    本示例通过使用名为mysql_pw密钥的方式填写密码值,避免信息泄露,详情请参见变量管理。

    database-name

    MySQL数据库名称。

    本示例填写为步骤一:准备数据源中创建的数据库。

    table-name

    MySQL表名。

    作为源表时,表名支持正则表达式以读取多个表的数据。

    port

    MySQL数据库服务的端口号。

    无。

  6. (可选)单击右上方的深度检查,确认作业Flink SQL语句中是否存在语法错误。

  7. 单击右上方的部署,单击确定

  8. 在左侧导航栏,单击运维中心 > 作业运维,单击目标作业名称,进入作业部署详情页面。

  9. 单击运行参数配置区域右侧的编辑

    本文为了更快观察到任务运行的结果,将系统检查点间隔两次系统检查点之间的最短时间间隔均改为10s,单击保存

    image

  10. 在目标作业部署详情页顶部,单击启动,选择无状态启动

    image.png

  11. 查询Paimon数据。

    1. 数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行

      select custkey, sum(total_price) from `test`.`default`.`orders` group by custkey;
    2. 结果浏览完成后,单击左侧的

      image.png

      停止调试。

      image.png

步骤四:更新MySQL表结构

本部分将演示MySQL表结构变更同步到Paimon表的功能。

  1. 登录云数据库RDS控制台。

  2. 在orders数据库,输入如下SQL语句,然后单击执行,为两张数据表添加一列,并填充一些数据。

    ALTER TABLE `orders_1` ADD COLUMN quantity BIGINT; 
    ALTER TABLE `orders_2` ADD COLUMN quantity BIGINT; 
    UPDATE `orders_1` SET `quantity` = 100 WHERE `orderkey` < 5;
    UPDATE `orders_2` SET `quantity` = 100 WHERE `orderkey` > 15;
  3. 在实时计算控制台数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行

    select * from `test`.`default`.`orders` where `quantity` is not null;

    结果如下,浏览完成后,可单击左侧的

    image.png

    停止调试。

    Image 32

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2222514.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Elasticsearch如何搜索日志并存储

Elasticsearch 是一个分布式搜索引擎&#xff0c;擅长对大量数据进行实时的搜索、分析和存储。它常被用于处理日志数据&#xff0c;配合工具如 Logstash 或 Filebeat 来收集和存储日志&#xff0c;并提供强大的搜索和分析能力。接下来&#xff0c;我将解释 Elasticsearch 如何处…

8年经验之谈 —— 如何使用自动化工具编写测试用例?

以下为作者观点&#xff0c;仅供参考&#xff1a; 在快速变化的软件开发领域&#xff0c;保证应用程序的可靠性和质量至关重要。随着应用程序复杂性和规模的不断增加&#xff0c;仅手动测试 无法满足行业需求。 这就是测试自动化发挥作用的地方&#xff0c;它使软件测试人员…

NVR小程序接入平台/设备EasyNVR多个NVR同时管理的高效解决方案

在当今的数字化安防时代&#xff0c;视频监控系统的需求日益复杂和多样化。为了满足不同场景下的监控需求&#xff0c;一种高效、灵活且兼容性强的安防视频监控平台——NVR批量管理软件/平台EasyNVR应运而生。本篇探讨这一融合所带来的创新与发展。 一、NVR监测软件/设备EasyNV…

【设计模式】MyBatis 与经典设计模式:从ORM到设计的智慧

作者&#xff1a;后端小肥肠 &#x1f347; 我写过的文章中的相关代码放到了gitee&#xff0c;地址&#xff1a;xfc-fdw-cloud: 公共解决方案 &#x1f34a; 有疑问可私信或评论区联系我。 &#x1f951; 创作不易未经允许严禁转载。 姊妹篇&#xff1a; 【设计模式】揭秘Spri…

vue2 el-select赋值无效(无法选中)

背景&#xff1a;点击添加明细时&#xff0c;el-table会新增一条数据&#xff0c;其中&#xff0c;存货原申购用途 会根据 费用承担事业部 下拉框的值改变而改变&#xff0c;所以每次费用承担事业部发生变化时&#xff0c;都需要清空存货原申购用途的值 最开始是直接这样写的&a…

D. Deleting Divisors

传送门&#xff1a;Problem - D - Codeforces 题意&#xff1a; 思路&#xff1a;博弈论 打表找规律&#xff08; 递推 &#xff09; 如果 ans[i] 为 true &#xff0c;则 Alice 能赢 ans[i] 为 false&#xff0c;则 Bob 会赢 数字 n 的一个因子 为 x &#xff0c; 如果 …

【简历】25届浙江某211大学JAVA简历:明明项目有货,但是长篇大论减分!!

注&#xff1a;为保证用户信息安全&#xff0c;姓名和学校等信息已经进行同层次变更&#xff0c;内容部分细节也进行了部分隐藏 另外&#xff1a;我们出这一系列校招简历指导的原因&#xff0c;就是看很多学生被忽悠&#xff0c;没有先定位大厂、中厂还是小公司&#xff0c;导…

【日志】力扣刷题——买卖股票的最佳时机 // Unity——添加数据表文件、EPPlus插件以及编辑器开发生成Excel转Json代码文件复习

2024.10.17 【力扣刷题】 两题连一起&#xff0c;思路很像 121. 买卖股票的最佳时机 - 力扣&#xff08;LeetCode&#xff09; 122. 买卖股票的最佳时机 II - 力扣&#xff08;LeetCode&#xff09; 121. 买卖股票的最佳时机 按照顺序查找&#xff0c;找到最大的差值时&#x…

Prompt-Tuning方法学习

文章目录 一、背景1.1 Pre-training1.2 Fine-Tuning1.3 高效微调&#xff08;SOTA PEFT&#xff09;1.4 基于强化学习的进阶微调方法&#xff08;RLHF&#xff09; 二、Prompt-Tuning技术2.1 发展历程2.2 Prompt模板构建方式 三、基于连续提示的Prompt Tuning四、Q&A 一、背…

【升华】一文从0到1到实际性应用大语言模型(LLM)

一、前言 相信网已经很多LLM大模型 的介绍 &#xff0c;概念&#xff0c;发展历史&#xff0c;应用场景的很多文章&#xff0c;但是很多文章都是缺少细节的描述&#xff0c;到底怎么用&#xff0c;需要些什么东西怎么层显出来。所以虽然看了很多大模型的介绍&#xff0c;也仅仅…

【Linux篇】初学Linux,如何快速搭建Linux开发环境

文章目录 前言1. Linux背景介绍1.1 UNIX的发展历史1.2 Linux的发展历史 2. 企业应用现状3. 开源3.1 探索Linux源代码3.2 开源 VS 闭源 4. Linux的版本4.1 技术线4.2 商业产品线 5. os概念&#xff0c;定位6. 搭建Linux环境6.1 Linux环境的搭建方式6.2 购买云服务器 7. 使用XShe…

从一个简单的计算问题,看国内几个大语言模型推理逻辑能力

引言 首先&#xff0c;来看问题&#xff1a; 123456*987654等于多少&#xff0c;给出你计算的过程。 从openai推出chatgpt以来&#xff0c;大模型发展的很快&#xff0c;笔者也经常使用免费的大语言模型辅助进行文档编写和编码工作。大模型推出时间也好久了&#xff0c;笔者想…

红队-安全见闻篇(上)

声明 学习视频来自B站UP主 泷羽sec的个人空间-泷羽sec个人主页-哔哩哔哩视频,如涉及侵权马上删除文章 笔记的只是方便各位师傅学习知识,以下网站只涉及学习内容,其他的都与本人无关,切莫逾越法律红线,否则后果自负 一.编程与开发 1.后端语言学习 C语⾔&#xff1a;⼀种通⽤的…

[解决]在Arduino IDE 打开 ino 类型文件处于read only editor模式

今天打开一个ino类型文件发现这个问题&#xff0c;无法编辑…… 解决方法&#xff1a;右键点击ino类型文件&#xff08;你打开的那个&#xff09;进入属性栏 发现只读被勾上&#xff0c;取消打勾并点击最下方的确认 现在就可以编辑啦

Unity目录居然这么写就不会被引入到项目内

只要加一个小符号~ 这是一个约定俗成的符号么~~~~ 当然&#xff0c;代码管理器还是识别的 也&#xff0c;只要稍微加一些规则&#xff0c;去避免代码入库 只要一天不死&#xff0c;还是能在程序员这个座位上看到新的东西 什么时候才到尽头&#xff1f;&#xff1f;&#xff1f…

服务器数据恢复—EXT3文件系统下邮件数据被误删的数据恢复案例

服务器数据恢复环境&#xff1a; 邮件服务器中有一组由8块盘组成的RAID5阵列, 上层是Linux操作系统EXT3文件系统。 服务器故障&#xff1a; 由于误删除导致文件系统中的邮件数据丢失。 服务器数据恢复过程&#xff1a; 1、将故障服务器中所有硬盘做好标记后取出&#xff0c;硬…

面试必备:RabbitMQ与Kafka核心知识点总结

写在前面 &#x1f525;我把后端Java面试题做了一个汇总&#xff0c;有兴趣大家可以看看&#xff01;这里&#x1f449; ⭐️在无数次的复习巩固中&#xff0c;我逐渐意识到一个问题&#xff1a;面对同样的面试题目&#xff0c;不同的资料来源往往给出了五花八门的解释&#…

Windows电脑怎么设置局域网内共享磁盘?

一、设置局域网磁盘共享 1、假设这是电脑A&#xff0c;先启动公用文件夹共享&#xff1a; &#xff08;1&#xff09;点击【控制面板】 &#xff08;2&#xff09;点击【网络和Internet】 &#xff08;3&#xff09;点击【网络和共享中心】 &#xff08;4&#xff09;点击【…

【linux网络编程】 | 网络基础Ⅰ| 认识网络

前言: 在本节几乎不讲任何网络协议的系结&#xff0c; 只是将网络的概念搭建起来。本节将会讲到网络的一些专有名词&#xff0c; 概念&#xff0c; 以及网络的结构划分等等。 主要是带大家做一下前期知识的铺垫。 下面&#xff0c; 开始我们的学习吧&#xff01; ps:本节内容因…

成功解决pycharm软件中按住Ctrl+点击指定函数却不能跳转到对应库中的源代码

成功解决pycharm软件中按住Ctrl点击指定函数却不能跳转到对应库中的源代码 目录 解决问题 解决方法 解决问题 在pycharm软件中按住Ctrl点击指定函数却不能跳转到对应库中的源代码 解决方法