flinkcdc 3.0 尝鲜

news2025/2/4 3:58:31

本文会将从环境搭建到demo来全流程体验flinkcdc 3.0
包含了如下内容

  1. flink1.18 standalone搭建
  2. doris 1fe1be 搭建
  3. 整库数据同步
  4. 测试各同步场景
  5. 从检查点重启同步任务

环境搭建

flink环境(Standalone模式)

下载flink 1.18.0 链接 : https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz

解压 :

tar -zxvf flink-1.18.0-bin-scala_2.12.tgz

修改checkpoint 时间间隔 为3秒

vim conf/flink-conf.yaml 
# 94 行(set nu 显示行)
taskmanager.numberOfTaskSlots: 2
# 148 行
execution.checkpointing.interval: 3000

启动

./bin/start-cluster.sh

访问页面 : http://127.0.0.1:8081
image.png

doris环境(1fe1be)

修改环境宿主机的内存映射

# 因为mac内部实现容器的方式不同,直接修改max_map_count值可能无法成功,所以在容器中进行修改
docker run -it --privileged --pid=host --name=change_count debian nsenter -t 1 -m -u -n -i sh
# 修改内存映射值(这个值通常用于限制一个进程打开的文件数量,默认是65530)
sysctl -w vm.max_map_count=2000000
# 退出容器
exit

使用docker compose 搭建doris 1fe1be集群

version: '3'
services:
  docker-fe-01:
    image: "apache/doris:1.2.2-fe-arm"
    container_name: "doris-fe-01"
    hostname: "fe-01"
    environment:
      - FE_SERVERS=fe1:172.20.80.2:9010
      - FE_ID=1
    ports:
      - 8031:8030
      - 9031:9030
    volumes:
      - /Users/antg/docker/doris_1fe_1be/data/fe-01/doris-meta:/opt/apache-doris/fe/doris-meta
      - /Users/antg/docker/doris_1fe_1be/data/fe-01/conf:/opt/apache-doris/fe/conf
      - /Users/antg/docker/doris_1fe_1be/data/fe-01/log:/opt/apache-doris/fe/log
    networks:
      doris_net:
        ipv4_address: 172.20.80.2
  docker-be-01:
    image: "apache/doris:1.2.2-be-arm"
    container_name: "doris-be-01"
    hostname: "be-01"
    depends_on:
      - docker-fe-01
    environment:
      - FE_SERVERS=fe1:172.20.80.2:9010
      - BE_ADDR=172.20.80.5:9050
    ports:
      - 8041:8040
    volumes:
      - /Users/antg/docker/doris_1fe_1be/data/be-01/storage:/opt/apache-doris/be/storage
      - /Users/antg/docker/doris_1fe_1be/data/be-01/conf:/opt/apache-doris/be/conf
      - /Users/antg/docker/doris_1fe_1be/data/be-01/script:/docker-entrypoint-initdb.d
      - /Users/antg/docker/doris_1fe_1be/data/be-01/log:/opt/apache-doris/be/log
    networks:
      doris_net:
        ipv4_address: 172.20.80.5
networks:
  doris_net:
    ipam:
      config:
        - subnet: 172.20.80.0/24

启动并验证是否启动成功

# 启动
docker-compose -f 1fe_1be.yaml up -d
# 连接doris
mysql -h127.0.0.1 -P9031 -uroot -p
# 创建数据库 doris_sync
> create database doris_sync;

mysql环境及测试数据准备

使用本机之前安装的mysql

建测试库测试表

create database doris_sync;
CREATE TABLE `a_0` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `a_1` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `abc` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `table_0` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `table_1` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=101 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

其中 a_0,a_1 是分表,table_0,table_1是另外一个分表,abc是一个单独的表

初始化插入一些测试数据

INSERT INTO `a_0` (`id`, `name`) VALUES (1, 'a');
INSERT INTO `a_1` (`id`, `name`) VALUES (2, 'b');
BEGIN;
INSERT INTO `abc` (`id`, `name`) VALUES (1, 'Luo Rui');
INSERT INTO `abc` (`id`, `name`) VALUES (2, 'Yung Wing Kuen');
INSERT INTO `abc` (`id`, `name`) VALUES (3, 'Chiang Chun Yu');
INSERT INTO `abc` (`id`, `name`) VALUES (4, 'Tang Ming');
INSERT INTO `abc` (`id`, `name`) VALUES (5, 'Man Wai Lam');
INSERT INTO `abc` (`id`, `name`) VALUES (6, 'Tin Tsz Ching');
INSERT INTO `abc` (`id`, `name`) VALUES (7, 'Doris Moore');
INSERT INTO `abc` (`id`, `name`) VALUES (8, 'Abe Mitsuki');
INSERT INTO `abc` (`id`, `name`) VALUES (9, 'Du Shihan');
INSERT INTO `abc` (`id`, `name`) VALUES (10, 'Chiang Chi Yuen');
COMMIT;
BEGIN;
INSERT INTO `table_0` (`id`, `name`) VALUES (1, 'Luo Rui');
INSERT INTO `table_0` (`id`, `name`) VALUES (2, 'Yung Wing Kuen');
INSERT INTO `table_0` (`id`, `name`) VALUES (3, 'Chiang Chun Yu');
INSERT INTO `table_0` (`id`, `name`) VALUES (4, 'Tang Ming');
INSERT INTO `table_0` (`id`, `name`) VALUES (5, 'Man Wai Lam');
INSERT INTO `table_0` (`id`, `name`) VALUES (6, 'Tin Tsz Ching');
INSERT INTO `table_0` (`id`, `name`) VALUES (7, 'Doris Moore');
INSERT INTO `table_0` (`id`, `name`) VALUES (8, 'Abe Mitsuki');
INSERT INTO `table_0` (`id`, `name`) VALUES (9, 'Du Shihan');
INSERT INTO `table_0` (`id`, `name`) VALUES (10, 'Chiang Chi Yuen');
COMMIT;
INSERT INTO `table_1` (`id`, `name`) VALUES (100, 'tom');

配置容器路由转发

我们在代码中开发过程中可能会用到容器的ip地址,例如上面的172.20.80.0/24这个网段,但是你会发现你是ping不通的,这里设计到了一些docker网络的一些知识,可以在网上看一下资料,这里只给出解决方法
安装路由转发镜像

# 现在连接器
brew install wenjunxiao/brew/docker-connector
# 加入路由
docker network ls --filter driver=bridge --format "{{.ID}}" | xargs docker network inspect --format "route {{range .IPAM.Config}}{{.Subnet}}{{end}}" >> /opt/homebrew/etc/docker-connector.conf
# 启动路由器
sudo /opt/homebrew/opt/docker-connector/bin/docker-connector -config /opt/homebrew/etc/docker-connector.conf
# 启动镜像
docker run -it -d --restart always --net host --cap-add NET_ADMIN --name connector wenjunxiao/mac-docker-connector

如果还是ping不通就重启一下上面的转发容器
这一步很重要,想要通过访问容器的ip就要完成这一步

依赖包准备

下载flinkcdc 的依赖包放到flink目录下并解压
flinkcdc 依赖 : flink-cdc-3.0.0-bin.tar.gz
下载连接器 的依赖包放到flinkcdc的lib目录下
connector 依赖 :

  • MySQL pipeline connector 3.0.0
  • Apache Doris pipeline connector 3.0.0

配置FLINK_HOME环境变量

pwd
/Users/antg/software/flink-1.18.0/
export FLINK_HOME=/Users/antg/software/flink-1.18.0/

数据同步

整库同步

编写yaml文件 mysql-to-doris.yaml

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 12345678
  tables: doris_sync.\.*
  server-id: 5400-5404
  server-time-zone: Asia/Shanghai

sink:
  type: doris
  fenodes: 127.0.0.1:8031
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2

启动任务

bash bin/flink-cdc.sh mysql-to-doris.yaml

查看页面效果image.png
这里可以看到同步的数据条数及大小

查看doris的数据及建表情况
image.png
可以看到表被自动创建并且数据也同步过来了

新增数据

INSERT INTO `a_0` (`id`, `name`) VALUES (3, 'jack');

Kapture 2024-01-21 at 15.04.36.gif

更新数据

update a_0 set name='tom' where id=3;

Kapture 2024-01-21 at 15.09.41.gif

删除数据

delete from a_0 where id=1;

没成功同步(已咨询社区是1.2.2的bug,在1.2.3修复了,正常来说会同步)

新增字段

alter table a_0 add column age int;

在这里插入图片描述

修改字段

# 修改名称
alter table a_0 change age age_range int;
# 修改字段类型
alter table a_0 modify column age_range varchar(100);
# 字段字段长度
alter table a_0 modify column age_range varchar(1200);

以上语句不会被同步

删除字段

alter table a_0 drop column age_range;

以上语句不会被同步

删除表

drop table a_0;

不会被同步

结论 :
1.新增数据,新增字段,修改数据会被实时同步到doris
2.delete数据不会被同步(已咨询社区是1.2.2的bug,在1.2.3修复了,正常来说会同步)
3.修改字段名称,类型,长度不会被同步(可能有参数可以开启)
4.删除字段不会被同步
5.删除表不会被同步

路由变更

这里将使用flinkcdc3.0 新增的路由功能来实现分表合一的效果,而且也可以做到同步到doris的库名和表名换成自己想要的名称
将之前的mysql端数据清理,表重新建立

需求 :
将mysql端doris_sync同步到doris的ods库中
a_0,a_1 合并到ods_a表
abc 同步到 ods_abc表
table_0,table_1同步到 ods_table表

任务配置 route.yaml

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 12345678
  tables: doris_sync.\.*
  server-id: 5400-5404
  server-time-zone: Asia/Shanghai

sink:
  type: doris
  fenodes: 127.0.0.1:8031
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

route:
  - source-table: doris_sync.a_\.*
    sink-table: ods.ods_a
  - source-table: doris_sync.abc
    sink-table: ods.ods_abc
  - source-table: doris_sync.table_\.*
    sink-table: ods.ods_table

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2

创建doris端ods库(不会自动创建库,必须手动创建)

create database ods;

将之前的任务停掉,启动这个任务
image.png
在这里插入图片描述

可以看到
1.多个分表在doris只创建了一个目标表
2.多个分表的数据都同步到了一个表中
非常棒的功能 👍👍👍

测试一下新增一个分表是否会自动同步到目标表

CREATE TABLE `a_2` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

INSERT INTO `a_2` (`id`, `name`) VALUES (1000, 'a');

image.png
新增分表后,分表不会被自动同步

重启任务
image.png
重启后数据可以被正常同步

从checkpoint恢复任务并新增分表

先修改一下flink-conf.yaml,否则任务cancel的时候ck不会被保留,还需要修改一下ck存储的路径

# 在flink目录下创建一个路径存储ck
mkdir ckdata

image.png
image.png

启动任务

bash bin/flink-cdc.sh route.yaml

image.png
看一下ck是否正常存储
image.png
新增表,cancel任务,然后从ck处重启

CREATE TABLE `a_4` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

INSERT INTO `a_4` (`id`, `name`) VALUES (1000000, 'a');
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 12345678
  tables: doris_sync.\.*
  server-id: 5400-5404
  server-time-zone: Asia/Shanghai

sink:
  type: doris
  fenodes: 127.0.0.1:8031
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

route:
  - source-table: doris_sync.a_\.*
    sink-table: ods.ods_a
  - source-table: doris_sync.abc
    sink-table: ods.ods_abc
  - source-table: doris_sync.table_\.*
    sink-table: ods.ods_table

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2

在flink-conf最后加上ck的重启路径

# 查看当前路径
pwd
/Users/antg/software/flink-1.18.0/flink-cdc-3.0.0

# 找到最新的ck存储路径
ll -rth ../ckdata
drwxr-xr-x@ 5 antg  staff   160B Jan 21 16:27 436dfeb839b2c877d6e49023e3e099b5
drwxr-xr-x@ 5 antg  staff   160B Jan 21 17:12 d519a3f930d9f410e048f63a883e1dce
drwxr-xr-x@ 5 antg  staff   160B Jan 21 18:59 b0ed22a804ad34336ab3e9b328d13257
drwxr-xr-x@ 5 antg  staff   160B Jan 21 19:01 394d7a89885bbd319e8ab92043283de9
drwxr-xr-x@ 5 antg  staff   160B Jan 21 19:05 1547d3cf60ed278ccd3787025bb4b5f6
drwxr-xr-x@ 5 antg  staff   160B Jan 21 19:07 51ff313e98fb9882f20f57bc697a8ae6
drwxr-xr-x@ 5 antg  staff   160B Jan 21 19:08 f10623b642135002499775274c078b9e
drwxr-xr-x@ 5 antg  staff   160B Jan 21 19:09 73b47091ca00547a5d8121474b3dbd79

ll ../ckdata/73b47091ca00547a5d8121474b3dbd79
drwxr-xr-x@ 3 antg  staff    96B Jan 21 19:09 chk-172
drwxr-xr-x@ 2 antg  staff    64B Jan 21 19:09 shared
drwxr-xr-x@ 2 antg  staff    64B Jan 21 19:09 taskowned

# 将ck路径加到flink-conf的最后一行
vim ../conf/flink-conf.yaml
execution.savepoint.path: file:///Users/antg/software/flink-1.18.0/ckdata/73b47091ca00547a5d8121474b3dbd79/chk-172

# 启动任务
bin/flink-cdc.sh route.yaml

image.png
可以看到任务从检查点重启了
image.png
数据也正常同步

这里从ck重启是修改了flink-conf,但是感觉这样很不方便,尝试过在yaml的pipeline下加上这个属性,但是不起作用,其他位置也没找到加ck路径的地方,如果各位大神有其他好的方法欢迎评论区留言,也欢迎加我的个人微信一起交流各种技术.

参考

[基于 Flink CDC 3.0 构建 MySQL 到 Doris 的 Streaming ELT] : https://ververica.github.io/flink-cdc-connectors/release-3.0/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-doris-pipeline-tutorial-zh.html
[vm.max_map_count参数详解] : https://blog.csdn.net/a772304419/article/details/132585239

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1427371.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【大数据】专业融合型人才迎来发展良机-国家数据局正式揭牌

⭐简单说两句⭐ 作者:后端小知识 CSDN个人主页:后端小知识 🔎GZH:后端小知识 🎉欢迎关注🔎点赞👍收藏⭐️留言📝 摘要: 新华社北京10月26日电 《中国证券报》26日刊发文章…

shell - 正则表达式和grep命令和sed命令

一.正则表达式概述 1.正则表达式定义 1.1 定义 使用字符串描述、匹配一系列符合某个规则的字符串 1.2 了解 普通字符: 大小写字母、数字、标点符号及一些其它符号元字符: 在正则表达式中具有特殊意义的专用字符 1.3 层次分类 基础正则表达式扩展正…

git修改密码后mac使用sourceTree出现Authentication failed错误

1、退出sourceTree 2、在钥匙串中删除git对应站点Access Key 3、执行命令:git config --system --unset credential.helper 4、重新启动sourceTree,这时会弹出输入密码框,重新输入密码即可

react 之 UseMemo

useMemo 看个场景 下面我们的本来的用意是想基于count的变化计算斐波那契数列之和,但是当我们修改num状态的时候,斐波那契求和函数也会被执行,显然是一种浪费 // useMemo // 作用:在组件渲染时缓存计算的结果import { useState …

树——二叉搜索树

二叉搜索树 概述 随着计算机算力的提升和对数据结构的深入研究,二叉搜索树也不断被优化和扩展,例如AVL树、红黑树等。 特性 二叉搜索树(也称二叉排序树)是符合下面特征的二叉树: 树节点增加 key 属性,用来…

Git介绍与常用命令总结

Git介绍与其常用命令总结 1、Git介绍2、Git的使用3、Git常用命令3.1 初始化仓库3.2 克隆仓库3.3 配置用户信息3.4 提交代码(Commit)3.5 推送代码(Push)3.6 拉取代码(Pull)3.7 分支(Branch)3.8 远程仓库(Remote)3.9 撤销回退本地改动3.10 更新本地仓库与远程仓库 1、Git介绍 Gi…

编程流程图

对于复杂流程,我做开发之前一般会 先画一下流程图。特别是多个部门有交叉的情况下: processOn: 这个是我之前 一直的选择,他可以画上面的这些,流程图,网页操作,但是他不是免费的,查过…

【LVGL环境搭建】

LVGL环境搭建 win模拟器环境搭建一.二.三.四.五. Ubuntu模拟器环境搭建一. 前置准备二. 下载LVGL Source code:三. 安装sdl2:四. 开启VScode执行五. 安装扩展套件六. 按F5执行七. 执行结果 win模拟器环境搭建 一. 二. 三. 四. 五. Ubuntu模拟器环境…

同城上门预约软件开发:改变生活服务模式

随着互联网技术的飞速发展,人们的生活方式也在发生着深刻的变化。特别是在生活服务领域,新的需求和模式不断涌现。其中,同城上门预约服务正逐渐成为一种新的趋势。本文将探讨开发同城上门预约软件的意义、市场需求、功能设计以及面临的挑战。…

C++类和对象入门(一)

顾得泉:个人主页 个人专栏:《Linux操作系统》 《C从入门到精通》 《LeedCode刷题》 键盘敲烂,年薪百万! 一、面相过程和面向对象的初步认识 C语言是面向过程的,关注的是过程,分析出求解问题的步骤&#…

大语言模型之LlaMA系列-LlaMA 2及LlaMA_chat(下)

多转一致性的系统消息 - System Message for Multi-Turn Consistency 在对话设置中,某些指示应适用于所有对话轮次。 例如,简洁地响应,或"充当"某个公众人物。当我们向Llama 2-Chat提供此类指示时,后续应响应始终遵守约…

专业139总分400+南昌大学811信号与系统考研经验电子信息与通信工程集成电路

今年专业课811信号与系统139分,总分400,顺利上岸南昌大学,回首这一年的复习,有很多经验想和大家分享,希望对大家复习会有一些帮助。专业课:139分,811信号与系统 主要参考书:《信号与…

2024年人工智能应用与先进制造科学国际学术会议(ICAIAAMS 2024)

2024年人工智能应用与先进制造科学国际学术会议(ICAIAAMS 2024) 2024 International Conference on Artificial Intelligence Applications and Advanced Manufacturing Science (ICAIAAMS 2024) 会议简介: 2024年人工智能应用与先进制造科学国际学术会议&#xff…

C语言基础:头歌练习数组练习

(字符串插入) 任务描述 题目描述:输入两个字符串a和b,将b串中的最大字符插入到a串中最小字符后面。 样例输入: MynameisAmy MynameisJane 样例输出: MynameisAymy 题目分析:a字符串中最小的字符是A…

黑马程序员——html css基础——day06——Flex布局

目录: 小米登录 第一步搭建大盒子logo设置标题和input设置密码框和登录按钮完整写法:爱宠案例 大盒子dog搭建h2标题的做法ul布局修改li和a链接的样式给li添加背景图片完整的写法:标准流浮动 基本使用产品区域布局 左右布局区域小li布局清除浮…

天拓四方:边缘计算网关功能、特点与应用举例

传统的数据处理方式面临网络延迟、带宽限制和安全风险等问题。为了解决这些问题,边缘计算技术应运而生,而边缘计算网关作为其核心组件,正发挥着越来越重要的作用。边缘计算网关位于数据源和云数据中心之间。它具备数据采集、协议转换、数据处…

极限挑战:使用 Go 打造百亿级文件系统的实践之旅

JuiceFS 企业版是一款为云环境设计的分布式文件系统,单命名空间内可稳定管理高达百亿级数量的文件。 构建这个大规模、高性能的文件系统面临众多复杂性挑战,其中最为关键的环节之一就是元数据引擎的设计。JuiceFS 企业版于 2017 年上线,经过…

Istio-解决Zipkin对项目的侵入性问题

Istio采用SideCar模式注入的Enovy代理在某些情况下不能完全解决对项目的无侵入性,比如需要用到Istio的链路追踪功能的时候。需要在代码中手动注入链路追踪需要的header,这样就出现了Istio对业务功能的侵入性。 istio服务网格的调用链跟踪需要依赖在服务之…

鸿蒙ArkUI日期选择组件

鸿蒙ArkUI日期选择组件,基于基础组件进行的二次封装的日期选择组件,快速实现日期选择。 /*** 日期*/ Component export default struct DiygwDate{//绑定的值Link Watch(onValue) value:string;// 隐藏值State valueField: string value;// 显示值Sta…

探索Gin框架:Golang使用Gin完成文件上传

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站https://www.captainbed.cn/kitie。 前言 在之前的文章中,我们讲解了Gin框架的快速入门使用,今天我们来聊聊如何使用…