三.海量数据实时分析-FlinkCDC实现Mysql数据同步到Doris

news2025/1/16 13:41:56

FlinkCDC 同步Mysql到Doris

参考:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/get-started/quickstart/mysql-to-doris/

1.安装Flink

下载 Flink 1.18.0,下载后把压缩包上传到服务器,使用tar -zxvf flink-xxx-bin-scala_2.12.tgz 解压后得到 flink-1.18.0 目录

cd flink-1.18.1

然后需要配置FLINK_HOME ,执行vi /etc/profile,增加如下内容

export FLINK_HOME=/root/flink/flink-1.18.1 #你的安装目录
export PATH=$PATH:$FLINK_HOME/bin

执行:source /etc/profile 让其生效,然后通过在 conf/flink-conf.yaml 配置文件追加下列参数开启 checkpoint,每隔 3 秒做一次 checkpoint。

execution.checkpointing.interval: 3000

使用下面的命令启动 Flink 集群,

./bin/start-cluster.sh

启动成功的话,可以在 http://localhost:8081/访问到 Flink Web UI,多次执行 start-cluster.sh 可以拉起多个 TaskManager。如下所示:

在这里插入图片描述
访问之前记得开放防火墙端口

firewall-cmd --zone=public --add-port=8081/tcp --permanent;
firewall-cmd --zone=public --add-port=8030/tcp --permanent;
firewall-cmd --zone=public --add-port=8040/tcp --permanent;
firewall-cmd --zone=public --add-port=9030/tcp --permanent;
firewall-cmd --reload ;

2.准备同步的数据库

准备好Mysql数据库,创建数据库 app_db 和表 orders,products,shipments,并插入数据

-- 创建数据库
CREATE DATABASE app_db;

USE app_db;

-- 创建 orders 表
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);

-- 插入数据
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);

-- 创建 shipments 表
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

-- 插入数据
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');

-- 创建 products 表
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

-- 插入数据
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

给doris创建数据库,通过 Web UI 创建 app_db 数据库 :create database app_db;

在这里插入图片描述

3.安装FlinkCDC

下载 flink cdc-3.0.0 的二进制压缩包 flink-cdc-3.0.0-bin.tar.gz,并解压得到目录 flink cdc-3.0.0 ':. flink-cdc-3.0.0 下会包含 bin、lib、log、conf 四个目录

在这里插入图片描述

然后把下面2个 connector 包,并且移动到 lib 目录下

  • MySQL pipeline connector 3.0.0 : mysql的驱动
  • Apache Doris pipeline connector 3.0.0 : doris的驱动

在这里插入图片描述
编写任务配置 yaml 文件 文件可以放到config目录下。 下面给出了一个整库同步的示例文件 mysql-to-doris.yaml,

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
  type: mysql
  hostname: 192.168.220.253
  port: 3307
  username: root
  password: 123456
  tables: app_db.\.*
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: 123456
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 1

其中: source 中的 tables: app_db..* 通过正则匹配同步 app_db 下的所有表。 sink 添加table.create.properties.replication_num :1 参数是由于 只有一个 Doris BE 节点。

最后,进入到 flink-cdc-3.0.0 目录,通过命令行提交任务到 Flink Standalone cluster :bash bin/flink-cdc.sh mysql-to-doris.yaml

[root@localhost flink-cdc-3.0.0]# bash bin/flink-cdc.sh conf/mysql-to-doris.yaml 
Pipeline has been submitted to cluster.
Job ID: 13e2925fd46e5840243c9523cd093e11
Job Description: Sync MySQL Database to Doris

执行之后查看flink的控制台界面 : 访问 8081端口
在这里插入图片描述
点击 Job Name 进入任务,可以看到同步的情况,还可以查看任务日志如下
在这里插入图片描述
登录doris的控制台,查看数据是否同步进去,访问:8030端口
在这里插入图片描述
当我们修改了Mysql中的数据后就会自动同步到Doris

4.表结构同步

Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置,借助这种能力,我们能够实现表名库名替换,整库同步等功能。 下面提供一个配置文件说明:

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
   type: mysql
   hostname: localhost
   port: 3306
   username: root
   password: 123456
   tables: app_db.\.*
   server-id: 5400-5404
   server-time-zone: UTC

sink:
   type: doris
   fenodes: 127.0.0.1:8030
   benodes: 127.0.0.1:8040
   username: root
   password: ""
   table.create.properties.light_schema_change: true
   table.create.properties.replication_num: 1

route:
   - source-table: app_db.orders
     sink-table: ods_db.ods_orders
   - source-table: app_db.shipments
     sink-table: ods_db.ods_shipments
   - source-table: app_db.products
     sink-table: ods_db.ods_products

pipeline:
   name: Sync MySQL Database to Doris
   parallelism: 1

通过上面的 route 配置,会将 app_db.orders 表的结构和数据同步到 ods_db.ods_orders 中。从而实现数据库迁移的功能。 特别地,source-table 支持正则表达式匹配多表,从而实现分库分表同步的功能,例如下面的配置:

route:
  - source-table: app_db.order\.*
    sink-table: ods_db.ods_orders

这样,就可以将诸如 app_db.order01、app_db.order02、app_db.order03 的表汇总到 ods_db.ods_orders 中。注意,目前还不支持多表中存在相同主键数据的场景,将在后续版本支持。

文章到这就结束了 ,如果对你有帮助请给个好评

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2121411.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

华为认证 vs 红帽认证 怎么选?有什么区别?

随着技术的日新月异,IT认证成为衡量个人技能和专业知识的重要标准。在众多认证中,华为认证和红帽认证以其权威性和实用性,成为业界颇具含金量的标杆。华为认证,作为华为推出的认证,是网络技术领域的权威认证之一&#…

vscode 中使用 yarn 出错

问题 vscode 中使用 yarn 爆红,类似下图的错误: 原因 由于vscode中的集成终端使用的是powershell,所以需要设置下该权限才能正常使用yarn 解决 找到 powershell,以管理身份运行 输入:set-ExecutionPolicy Remot…

MySQL系列—8.物理结构

目录 1.系统表空间 ibdata 2.通用表空间 .ibd 3.独立表空间 4.Undo 表空间 5.临时表空间 6.Redo Log File 1.系统表空间 ibdata 系统表空间由参数innodb_data_file_path定义路径、初始化大小、自动扩展策略 如: innodb_data_file_path/dayta/mysql/ibdata1:…

感恩 各位老师们!和滋养你的人在一起,确实很重要——早读(逆天打工人爬取热门微信文章解读)

感恩 各位老师们 引言Python 代码第一篇 洞见 和滋养你的人在一起,确实很重要第二篇 一天 风云突变结尾 (不是 现在网上在呢么各种图都有 哈哈哈) 引言 今天是什么特殊的日子吗? 没错 教师节 说起这个教师节 我觉得大家更要记住…

【北京迅为】《STM32MP157开发板使用手册》-第十九章 Yocto系统开发

iTOP-STM32MP157开发板采用ST推出的双核cortex-A7单核cortex-M4异构处理器,既可用Linux、又可以用于STM32单片机开发。开发板采用核心板底板结构,主频650M、1G内存、8G存储,核心板采用工业级板对板连接器,高可靠,牢固耐…

C++——list的实现

目录 0.前言 1.节点类 2.迭代器类 ①普通迭代器 ②const迭代器 ③模板迭代器 3.list类 3.1 clear、析构函数、swap ①clear ② 析构函数 ③ swap 3.2构造函数 ①无参构造 ②赋值构造 3.3 迭代器 3.4插入函数 ①insert插入 ②头插 ③尾插 3.5 删除函数…

【分治】归并排序

【分治】归并排序 1. 排序数组1. 1题目来源1.2 题目描述1.3 题目解析 2. LCR 170. 交易逆序对的总数2. 1题目来源2.2 题目描述2.3 题目解析 3. 计算右侧小于当前元素的个数3. 1题目来源3.2 题目描述3.3 题目解析 1. 排序数组 1. 1题目来源 912. 排序数组 1.2 题目描述 给你…

JAVA毕业设计171—基于Java+Springboot+vue3+小程序的宠物店小程序系统(源代码+数据库)

毕设所有选题: https://blog.csdn.net/2303_76227485/article/details/131104075 基于JavaSpringbootvue3小程序的宠物店小程序系统(源代码数据库)171 一、系统介绍 本项目前后端分离(可以改为ssm版本),分为用户、店员、管理员三种角色 1、用户&…

存储课程学习笔记1_访问scsi磁盘读写测试(struct sg_io_hdr,ioctl,mmap)

创建虚拟机时,可以选择SCSI,STAT,NVME不同类型的磁盘。 0:总结 》了解内核提供的访问scsi的结构和方法 (主要是sg_io_hdr_t 结构体和ioctl函数)。 》需要读scsi协议文档,了解相关指令,只演示了16字节固定…

华为eNSP :WLAN的配置

一、WLAN的知识点: VLAN配置: VLAN:可以想象成一个大房子(网络)里划分的不同房间(VLAN)。每个房间可以有自己的功能,比如一个用于睡觉(管理),另一…

软件工程知识点总结(5):详细设计

面向对象详细设计举例:接口描述、算法描述、数据描述 类的详细描述,内含数据、 方法及方法的参数返回值 public class User { private String userId; private String userName; private String password; private int type; public User(String userId…

基于APISIX实现API网关案例分享

一、APISIX介绍 1、定义 Apache APISIX 是一个动态、实时、高性能的云原生 API 网关。它构建于 NGINX + ngx_lua 的技术基础之上,充分利用了 LuaJIT 所提供的强大性能。 2、软件架构 2.1、架构图 APISIX 主要分为两个部分: APISIX 核心:包括 Lua 插件、多语言插件运行时…

ICM20948 DMP代码详解(12)

接前一篇文章:ICM20948 DMP代码详解(11) 上一回开始解析icm20948_sensor_setup函数的第2段代码也即inv_icm20948_init_matrix函数: /* Setup accel and gyro mounting matrix and associated angle for current board */inv_icm20…

前端技术(七)——less 教程

一、less简介 1. less是什么? less是一种动态样式语言,属于css预处理器的范畴,它扩展了CSS语言,增加了变量、Mixin、函数等特性,使CSS 更易维护和扩展LESS 既可以在 客户端 上运行 ,也可以借助Node.js在服…

关于武汉芯景科技有限公司的IIC缓冲器芯片XJ4307开发指南(兼容LTC4307)

一、芯片引脚介绍 1.芯片引脚 2.引脚描述 二、系统结构图 三、功能描述 1.总线超时,自动断开连接 当 SDAOUT 或 SCLOUT 为低电平时,将启动内部定时器。定时器仅在相应输入变为高电平时重置。如果在 30ms (典型值) 内没有变为高…

社交媒体的未来:Facebook如何通过AI技术引领潮流

在数字化时代的浪潮中,社交媒体平台不断演变,以适应用户需求和技术发展的变化。作为全球领先的社交媒体平台,Facebook在这一进程中扮演了重要角色。尤其是人工智能(AI)技术的应用,正在深刻地改变Facebook的…

Docker零基础入门

参考课程https://www.bilibili.com/video/BV1VC4y177re/?vd_source=b15169a302bee35f484245aecc69d4dd 参考书籍Docker 实践 - 面向 AI 开发人员的 Docker 实践 (dockerpractice.readthedocs.io) 1. 什么是Docker 1.1. Docker起源 随着计算机的发展,计算机上已经可以运行多…

Stable Diffusion绘画 | ControlNet应用-Inpaint(局部重绘):更完美的重绘

Inpaint(局部重绘) 相当于小号的AI版PS,不但可以进行局部画面的修改,还可以去除背景中多余的内容,或者是四周画面内容的扩充。 预处理器说明 Inpaint_Global_Harmonious:重绘-全局融合算法,会对整个图片的画面和色调…

【无标题】SAM(Segment Anything Model)

1.SAM是什么? SAM是基于NLP的一个基础模型,专注于提示分割任务,使用提升工程来适应不同的下游分割任务。 2.SAM有什么用? 1)SAM 可以通过简单地单击或交互选择要包含或排除在对象中的点来分割对象。还可以通过使用多边…