[实时计算flink]数据摄入YAML作业快速入门

news2025/1/16 21:06:57

实时计算Flink版基于Flink CDC,通过开发YAML作业的方式有效地实现了将数据从源端同步到目标端的数据摄入工作。本文介绍如何快速构建一个YAML作业将MySQL库中的所有数据同步到StarRocks中。

前提条件

  • 已创建Flink工作空间,详情请参见开通实时计算Flink版。

  • 上下游存储

    • 已创建RDS MySQL实例,详情请参见快速创建RDS MySQL实例。

    • 已创建StarRocks实例,详情请参见步骤一:创建存算一体版StarRocks实例。

    说明

    RDS MySQL和StarRocks需要与Flink工作空间在相同VPC下,否则需要打通网络和配置RDS MySQL的IP白名单,详情请参见如何访问跨VPC的其他服务?、实时计算Flink版如何访问公网?和操作指导。

背景信息

假设MySQL实例中有一个order_dw_mysql库,里面有名称为orders、orders_pay和product_catalog的3张业务表。此时,如果您希望开发一个数据摄入YAML作业,将这些表和数据都同步到StarRocks的order_dw_sr数据库中,则可以按照以下步骤进行:

  1. 步骤一:准备RDS MySQL测试数据

  2. 步骤二:开发数据摄入YAML作业

  3. 步骤三:启动数据摄入YAML作业

  4. 步骤四:在StarRocks上查看同步结果

步骤一:准备RDS MySQL测试数据

  1. 创建数据库和账号。

    为目标实例创建名称为order_dw_mysql数据库和具有对应数据库读写权限的普通账号。具体操作请参见创建数据库和账号和管理数据库。

  2. 通过DMS登录RDS MySQL。

    详情请参见通过DMS登录RDS MySQL。

  3. 在已登录的SQL Console窗口,输入如下命令后单击执行,创建数据库和三张业务表,并插入数据。

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee numeric(20,2) not null,   
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null 
    );
    
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null, 
      create_time timestamp not null
    );
    
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- 准备数据
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');

步骤二:开发数据摄入YAML作业

  1. 登录实时计算管理控制台。

  2. 在左侧导航栏选择数据开发 > 数据摄入

  3. 单击新建,选择MySQL到Starrocks数据同步,单击下一步

  4. 填写作业名称存储位置和选择引擎版本后,单击确定

  5. 配置YAML作业代码信息。

    将MySQL中order_dw_mysql数据库下的所有表同步到starrocks的order_dw_sr数据库中,代码示例如下。

    source:
      type: mysql
      hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: order_dw_mysql.\.*
      server-id: 5405-5415
    
    sink:
      type: starrocks
      name: StarRocks Sink
      jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
      load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
      username: ${secret_values.starrocksusername}
      password: ${secret_values.starrockspassword}
      table.create.properties.replication_num: 1
      
    route:
      - source-table: order_dw_mysql.\.*
        sink-table: order_dw_sr.<>
        replace-symbol: <>
        description: route all tables in source_db to sink_db
    
    pipeline:
      name: Sync MySQL Database to StarRocks

    关于MySQL和Starrocks的本示例需要的配置信息说明如下表所示,数据摄入更多参数详情请参见MySQL和StarRocks。

    类别

    参数

    说明

    示例值

    source

    hostname

    MySQL数据库的IP地址或者Hostname。

    建议填写专有网络VPC地址。

    rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com

    port

    MySQL数据库服务的端口号。

    3306

    username

    MySQL数据库服务的用户名和密码。填写您步骤一:准备RDS MySQL测试数据中创建的账号和密码信息。

    说明

    本示例使用变量,可以避免明文展示密码等信息,详情请参见变量管理。

    ${secret_values.mysqlusername}

    password

    ${secret_values.mysqlpassword}

    tables

    MySQL表名。支持正则表达式以读取多个表的数据。

    本文将同步order_dw_mysql数据库所有表及数据。

    order_dw_mysql.\.*

    server-id

    数据库客户端的一个数字ID。

    5405-5415

    sink

    jdbc-url

    JDBC连接的URL。

    指定FE(Front End)的IP和查询端口,格式为jdbc:mysql://ip:port

    您可以在E-MapReduce控制台实例详情页签,查看目标实例的FE内网地址查询端口

     jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030

    load-url

    连接到FE节点的HTTP服务URL。

    您可以在E-MapReduce控制台实例详情页签,查看目标实例的FE内网地址HTTP端口

    fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030

    username

    StarRocks连接用户名和密码。

    此处需要填写为您开通StarRocks时填写的用户名和密码信息。

    说明

    本示例使用变量,可以避免明文展示密码等信息,详情请参见变量管理。

    ${secret_values.starrocksusername}

    password

    ${secret_values.starrockspassword}

    route

    source-table

    指定生效上游表。

    order_dw_mysql.\.*

    sink-table

    指定数据路由的目标位置。

    order_dw_sr.<>

    replace-symbol

    在使用模式匹配功能时,用于指代上游表名的字符串。

    <>

  6. 单击部署

步骤三:启动数据摄入YAML作业

  1. 数据摄入页面,单击部署后,在弹出的对话框中,单击确定

  2. 运维中心 > 作业运维页面,单击目标YAML作业操作中的启动

  3. 单击启动

    本示例选择为无状态启动,参数配置详情请参见作业启动。作业启动后,您可以在作业运维页面观察作业的运行信息和状态。

步骤四:在StarRocks上查看同步结果

当YAML作业处于运行中后,您就可以在StarRocks查看数据同步情况。

  1. 通过EMR StarRocks Manager连接StarRocks实例。

  2. 在左侧导航栏,单击SQL Editor,在数据库页签,单击

    image

    按钮。

    您会看到default_catalog下出现名称为order_dw_sr的数据库。

  3. 查询列表页签,单击+文件,新建查询脚本后,输入以下SQL语句,单击运行

    SELECT * FROM default_catalog.order_dw_sr.orders order by order_id;
    SELECT * FROM default_catalog.order_dw_sr.orders_pay order by pay_id;
    SELECT * FROM default_catalog.order_dw_sr.product_catalog order by product_id;
  4. 在命令下方查看同步结果。

    您会看到StarRocks中已存在和MySQL数据库中相同名称的表及数据。

    image

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2222104.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于SSM+小程序的就业管理系统(就业1)

&#x1f449;文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1、项目介绍 学生实习与就业管理系统的设计与实现管理员、辅导员管理、企业管理、工作管理人、用户管理5个角色。 1、管理员实现了基础数据管理、辅导员管理、企业管理、工作管理人管理、公告信息管理…

2024.10.23 软考学习笔记(知识点)

刷题网站&#xff1a; 软考中级软件设计师在线试题、软考解析及答案-51CTO题库-软考在线做题备考工具

RTDETR 引入 MogaBlock | 多阶门控聚合网络 | ICLR 2024

本改进已集成到 YOLOv8-Magic 框架。 通过尽可能将卷积核的上下文扩展为全局,现代卷积神经网络(ConvNets)在计算机视觉任务中展现出了巨大的潜力。然而,最近在深度神经网络(DNN)中进行的多阶博弈论交互研究揭示了现代卷积神经网络的表示瓶颈,即随着卷积核大小的增加,复…

048_python基于Python的广东旅游数据分析

目录 系统展示 开发背景 代码实现 项目案例 获取源码 博主介绍&#xff1a;CodeMentor毕业设计领航者、全网关注者30W群落&#xff0c;InfoQ特邀专栏作家、技术博客领航者、InfoQ新星培育计划导师、Web开发领域杰出贡献者&#xff0c;博客领航之星、开发者头条/腾讯云/AW…

SQLite 3.47.0 发布,大量新功能来袭

SQLite 开发团队于 2024 年 10 月 21 日发布了 SQLite 3.47.0 版本&#xff0c;我们来了解一下新版本的改进功能。 触发器增强 SQLite 3.47.0 版本开始&#xff0c;触发器函数 RAISE() 的 error-message 参数可以支持任意 SQL 表达式。在此之前&#xff0c;该参数只能是字符串…

go 语言 Gin Web 框架的实现原理探究

Gin 是一个用 Go (Golang) 编写的 Web 框架&#xff0c;性能极优&#xff0c;具有快速、支持中间件、crash处理、json验证、路由组、错误管理、内存渲染、可扩展性等特点。 官网地址&#xff1a;https://gin-gonic.com/ 源码地址&#xff1a;https://github.com/gin-gonic/gi…

CMOS 图像传感器:像素寻址与信号处理

CMOS image sensor : pixel addressing and signal processing CMOS image sensor 对于寻址和信号处理有三种架构 pixel serial readout and processingcolumn parallel readout and processingpixel parallel readout and processing 其中&#xff0c;图 (b) column paralle…

从 Web2 到 Web3:区块链技术的演进与未来趋势

在互联网的发展历程中&#xff0c;我们正经历着从 Web2 向 Web3 的重大转变。这个转变的核心驱动力之一&#xff0c;便是区块链技术的不断演进。 Web2 时代&#xff0c;互联网上的社交媒体、在线购物、视频分享等平台蓬勃发展。用户可以便捷地获取信息、与他人交流互动&#x…

css模糊遮罩效果

原图&#xff1a; 模糊后的图片&#xff1a; html: <div class"bj"><div class"mengban"></div> </div> css: .bj {width: 750rpx;height: 643rpx;background-image:url(https://onlinekc.a.hlidc.cn/uploads/20241023/9e552fc…

如何快速解决游戏提示系统中的emp.dll缺失问题

emp.dll是一个动态链接库&#xff08;Dynamic Link Library, DLL&#xff09;文件&#xff0c;这类文件在Windows操作系统中扮演着至关重要的角色。它们包含了可由多个程序同时使用的代码和数据&#xff0c;其主要目的是实现模块化&#xff0c;以便于程序的更新和动态链接。emp…

ECharts饼图-饼图34,附视频讲解与代码下载

引言&#xff1a; 在数据可视化的世界里&#xff0c;ECharts凭借其丰富的图表类型和强大的配置能力&#xff0c;成为了众多开发者的首选。今天&#xff0c;我将带大家一起实现一个饼图图表&#xff0c;通过该图表我们可以直观地展示和分析数据。此外&#xff0c;我还将提供详…

【毕业设计】基于SpringBoot的网上商城系统

前言 &#x1f525;本系统可以选作为毕业设计&#xff0c;运用了现在主流的SSM框架&#xff0c;采用Maven来帮助我们管理依赖&#xff0c;所选结构非常合适大学生所学的技术&#xff0c;非常合适作为大学的毕业设计&#xff0c;难以适中。 &#x1f525;采用技术&#xff1a;Sp…

Java项目-基于springboot框架的疫苗接种管理系统项目实战(附源码+文档)

作者&#xff1a;计算机学长阿伟 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、ElementUI等&#xff0c;“文末源码”。 开发运行环境 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringBoot、Vue、Mybaits Plus、ELementUI工具&#xff1a;IDEA/…

huggingface的数据集下载(linux下clone)

1. 安装lfs sudo apt-get install git-lfs 或者 apt-get install git-lfs 2. git lfs install git lfs install 3. git clone dataset包 第2&#xff0c;3步骤的截图如下&#xff1a;

CentOS7 上安装GitLab的经历

一、安装必要的基础环境 1.安装依赖包 [rootgitlab-server ~]#yum install curl policycoreutils openssh-server openssh-clients postfix wget git patch -y [rootgitlab-server ~]# systemctl start postfix 2.配置yum源(由于网络问题&#xff0c;国内用户请使用清华大学…

架构设计笔记-21-案例分析

1.遗留系统策略 / 数据迁移 / REST和RPC风格 2.分布式系统 / 分布式对象调用 3.开放式架构 / GOA 4.ESB 5.FMEA故障分析 6. 加密 / 公钥体系机制 / 加解密API和透明加密 7.嵌入式系统故障 / 故障滤波算法 / 容错算法 8.开源框架struts / spring / Hibenate 9.企业应用集成 10.T…

Python 应用可观测重磅上线:解决 LLM 应用落地的“最后一公里”问题

作者&#xff1a;彦鸿 背景 随着 LLM&#xff08;大语言模型&#xff09;技术的不断成熟和应用场景的不断拓展&#xff0c;越来越多的企业开始将 LLM 技术纳入自己的产品和服务中。LLM 在自然语言处理方面表现出令人印象深刻的能力。然而&#xff0c;其内部机制仍然不明确&am…

2023 WMCTF pwn【blindless jit】

文章目录 blindlessIDA结构体命名逆向漏洞方法1方法2 exp jitstrtol(v9, &endptr, 16)__errno_location和__throw_out_of_range详细解释&#xff1a; __errno_location相关具体操作详细分析为什么要执行上述代码&#xff1f;示例代码段的解释 _acrt_iob_funcSetProcessMiti…

Vue.js 学习总结(9)—— Vue 3 组件封装技巧

1、需求说明 需求背景&#xff1a;日常开发中&#xff0c;我们经常会使用一些UI组件库诸如and design vue、element plus等辅助开发&#xff0c;提升效率。有时我们需要进行个性化封装&#xff0c;以满足在项目中大量使用的需求。错误示范&#xff1a;基于a-modal封装一个自定…

MinIO安装教程

MinIO简介 Minio是一个开源的、云原生的分布式对象存储系统&#xff0c;是一个基于Apache License v2.0开源协议的对象存储服务。它兼容亚马逊S3云存储服务接口&#xff0c;非常适合于存储大容量非结构化的数据。 它一大特点就是轻量&#xff0c;虽然轻量&#xff0c;却拥有着不…