基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步

news2024/11/12 18:42:50

这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 到 Databend 的实时数据同步。本教程的演示都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。

假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。

接下来的内容将介绍如何使用 Flink Mysql/Databend CDC 来实现这个需求,系统的整体架构如下图所示:

准备阶段

准备一台已经安装了 Docker 和 docker-compose 的 Linux 或者 MacOS 。

准备教程所需要的组件

接下来的教程将以 docker-compose 的方式准备所需要的组件。

debezium-MySQL

docker-compose.yaml

version: '2.1'
services:
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw

Databend

docker-compose.yaml

version: '3'
services:
  databend:
    image: datafuselabs/databend
    volumes:
      - /Users/hanshanjie/databend/local-test/databend/databend-query.toml:/etc/databend/query.toml
    environment:
      QUERY_DEFAULT_USER: databend
      QUERY_DEFAULT_PASSWORD: databend
      MINIO_ENABLED: 'true'
    ports:
      - '8000:8000'
      - '9000:9000'
      - '3307:3307'
      - '8124:8124'

在 docker-compose.yml 所在目录下执行下面的命令来启动本教程需要的组件:

ocker-compose up -d

该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动。

  1. 下载 Flink 1.16.0 并将其解压至目录 flink-1.16.0

  2. 下载下面列出的依赖包,并将它们放到目录 flink-1.16.0/lib/ 下:

  3. 下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译

  • flink-sql-connector-mysql-cdc-2.3.0.jar
git clone https://github.com/databendcloud/flink-connector-databend
cd flink-connector-databend
mvn clean install -DskipTests

将 target/flink-connector-databend-1.16.0-SNAPSHOT.jar 拷贝到目录 flink-1.16.0/lib/ 下。

准备数据

 MySQL 数据库中准备数据

进入 MySQL 容器

docker-compose exec mysql mysql -uroot -p123456

创建数据库 mydb 和表 products,并插入数据:

CREATE DATABASE mydb;
USE mydb;

CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
ALTER TABLE products AUTO_INCREMENT = 10;

INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"cloud","test for databend"),
(default,"spare tire","24 inch spare tire");

Databend 中建表

CREATE TABLE bend_products (id INT NOT NULL, name VARCHAR(255) NOT NULL, description VARCHAR(512) );

使用下面的命令跳转至 Flink 目录下

cd flink-16.0

使用下面的命令启动 Flink 集群

./bin/start-cluster.sh

启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:

使用下面的命令启动 Flink SQL CLI

./bin/sql-client.sh

首先,开启 checkpoint,每隔3秒做一次 checkpoint

-- Flink SQL              
Flink SQL> SET execution.checkpointing.interval = 3s;

然后, 对于数据库中的表 products 使用 Flink SQL CLI 创建对应的表,用于同步底层数据库表的数据

-- Flink SQL
Flink SQL> CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) 
WITH ('connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'products',
'server-time-zone' = 'UTC'
);

最后,创建 d_products 表, 用来订单数据写入 Databend 中

-- Flink SQL
create table d_products (id INT,name String,description String, PRIMARY KEY (`id`) NOT ENFORCED) 
with ('connector' = 'databend',
'url'='databend://localhost:8000',
'username'='databend',
'password'='databend',
'database-name'='default',
'table-name'='bend_products',
'sink.batch-size' = '5',
'sink.flush-interval' = '1000',
'sink.max-retries' = '3');

使用 Flink SQL 将 products 表中的数据同步到 Databend 的 d_products 表中:

insert into d_products select * from products;

此时 flink job 就会提交成功,打开 flink UI 可以看到:

同时在 databend 中可以看到 MySQL 中的数据已经同步过来了:

同步 Insert/Update 数据

此时我们在 MySQL 中再插入 10 条数据:

INSERT INTO products VALUES 
(default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),        
(default,"hammer","12oz carpenter's hammer"),        
(default,"hammer","14oz carpenter's hammer"),        
(default,"hammer","16oz carpenter's hammer"),        
(default,"rocks","box of assorted rocks"),        
(default,"jacket","water resistent black wind breaker"),
(default,"cloud","test for databend"),        
(default,"spare tire","24 inch spare tire");

这些数据会立即同步到 Databend 当中。

假如此时 MySQL 中更新了一条数据:

那么 id=10 的数据在 databend 中也会被立即更新:

环境清理

操作结束后,在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器:

docker-compose down

在 Flink 所在目录 flink-1.16.0 下执行如下命令停止 Flink 集群:

./bin/stop-cluster.sh

结论

以上就是基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步的全部过程,通过 Flink CDC connectors 可以替换 Debezium+Kafka 的数据采集模块,实现 Flink SQL 采集+计算+传输一体化,减少维护的组件,简化实时链路,减轻部署成本的同时也能达到 Exactly Once 的语义效果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/672189.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【深度学习】5-5 与学习相关的技巧 - 超参数的验证

超参数指的是,比如各层的神经元数量、batch大小、参数更新时的学习率或权值衰减等。如果这些超参数没有设置合适的值,模型的性能就会很差。 那么如何能够高效地寻找超参数的值的方法 验证数据 之前我们使用的数据集分成了训练数据和测试数据&#xff0c…

WorkPlus AI助理正式上线!为企业打造定制化的AI私有助理

毋庸置疑,ChatGPT的应用充满无限的想象空间。但对于企业来说,使用时面临的最核心的问题就是“存在回答准确性不足”的弊端。那企业都想要通过GPT构建内容生态,在数字化时代保持行业领先地位。 企业都想要结合行业属性、业务需求等自身特点打…

【Flutter】Flutter 数据存储 Hive 的简要使用说明

文章目录 一、前言二、Hive 包的版本号三、Hive 简介1. Hive 是什么?2. Hive 的特点 四、Hive 的基本使用1. Hive 的安装2. Hive 的初始化3. 创建和打开 Hive 数据库4. 数据的存储和读取5. 数据的删除 五、总结 一、前言 🎉想要精通 Flutter&#xff0c…

是时候扔掉cmder, 换上Windows Terminal

作为一个Windows的长期用户,一直没有给款好用的终端,知道遇到了 cmder,它拯救一个习惯用Windows敲shell命令的人。 不用跟我安利macOS真香!公司上班一直用macOS,一方面确实更加习惯windows下面学习, 另一方面是上课需要…

Phantomjs实现后端将URL转换为图片

PhantomJS简介 PhantomJS is a command-line tool. – 其实就是一个命令行工具 PhantomJS的下载地址: Windows:phantomjs-2.1.1-windows.zip Linux:phantomjs-2.1.1-linux-x86_64.tar.bz2;phantomjs-2.1.1-linux-i686.tar.bz2 MacOS:phantomjs-2.1.1-macosx.zip…

西门子Mendix 入门 2

今天还是一直下载失败,就算成功了,速度也只有几K,于是使用翻墙软件,最终下载成功 下载成功后重新点击edit in studio pro 出现如下页面 首先先关闭安全性 进行添加任务和管理任务 点击上方绿色箭头后点击View App 出现如下页面…

ESP32-WROOM-32 UDP单播透传AT指令例程

ESP32-WROOM-32 AT指令配置TCP通讯 ESP32-WROOM-32前言固件烧录测试AT指令UDP单播通讯\透传ESP32配置SoftAPESP32与手机间的UDP通讯与透传普通传输模式演示UDP透传演示 ESP32-WROOM-32 前言 上次演示了ESP32与手机的三种TCP连接与数据传输方法,现在接着上一篇“ESP…

第二章 数据结构(一)——链表,栈和队列与kmp

文章目录 链表栈和队列表达式运算 单调栈单调队列kmp链表练习题826. 单链表827. 双链表 栈和队列练习题828. 模拟栈3302. 表达式求值829. 模拟队列830. 单调栈154. 滑动窗口 kmp练习题831. KMP字符串 kmp虐我一下午 链表 若用链式结构实现链表,效率低,因…

软件开发流程

目录 软件软件开发流程的演变 瀑布模型敏捷模型 XPSCRUMDevOps 1.软件 与计算机系统操作有关的计算机程序、可能有的文件、文档及数据。 软件可以分为两种主要类型: 独立软件:独立软件是一种完整的应用程序,可以直接在计算机或移动设备上…

Android系统安全 — 6.2 Ethernet安卓架构

1. Android Ethernet架构介绍 整个Ethernet系统架构如下图所示: 以太网服务(EthernetService)的启动与注册流程;应用层调用使能ethernet功能的方法流程来分析,从应用层如何将指令一步一步传到底层kernel;…

SAAS-HRM系统概述与搭建环境

SAAS-HRM系统概述与搭建环境 学习目标: 理解SaaS的基本概念 了解SAAS-HRM的基本需求和开发方式掌握Power Designer的用例图 完成SAAS-HRM父模块及公共模块的环境搭建完成企业微服务中企业CRUD功能 初识SaaS 云服务的三种模式 IaaS(基础设施即服务…

使用Windows To Go工具制作你的U盘系统【含下载Windows10系统镜像】亲测已成功23.06.21

WinToGo是一款辅助工具:专为能够让你将系统装进U盘,移动硬盘里,让你在任意电脑都能运行U盘里装的系统! 一、下载,安装“Windows To Go”工具 1、下载Windows To Go工具 口袋系统WinToGo: 安装Win 10到U盘 2、双击Wi…

从0到1精通自动化测试,pytest自动化测试框架,assert断言(七)

目录 一、前言 二、assert 三、异常信息 四、异常断言 五、常用断言 一、前言 断言是写自动化测试基本最重要的一步,一个用例没有断言,就失去了自动化测试的意义了。什么是断言呢? 简单来讲就是实际结果和期望结果去对比,符…

三分钟学习一个python小知识2-----------我的对python的类(Class)和对象(Object)的理解

文章目录 一、类(Class)和对象(Object)是什么?二、Python类和对象的实现1.定义类2.创建对象3.调用类的属性和方法 三、利用python实现了一个动物的类(Animal)和其两个子类(Cat和Dog&…

年轻人存款难,要攒够多少存款才可以体面的养老,结论亮了

这个情况确实值得我们思考。年轻人的经济压力比较大,所以他们普遍存款比较少。而10万元确实是一个比较大的数目,对于一些年轻人来说可能确实很难达到。 然而,我认为这并不是一个“坎”。我们应该鼓励年轻人理财,增加存款,以便应对未来可能出现的各种经济问题。同时,我们…

定义一个一维数组存放10个整数,要求从键盘输入10个数,对其进行求和、求平均、求最大值/最小值及其位置的下标

目录 题目 分析思路 法一:在主函数直接编程 法二:用 调用函数 实现 代码 法一:在主函数直接编程 法二:用 调用函数 实现 题目 定义一个一维数组存放10个整数,要求从键盘输入10个数,对其进行求和、求…

新华三H3C无线控制器AC对接网络准入实现定制化Portal短信认证

随着企业办公信息化的不断发展,企业内网安全也面临着诸多挑战。在包含了无线 WiFi、有线网络的混合网络环境中,员工或访客、外包人员、合作伙伴等用户在接入网络时,如果无需进行身份验证及访问权限的管理,则很可能给不法分子可乘之…

一起Talk Android吧(第五百四十八回:如何创建垂直版SeekBar)

文章目录 概念介绍创建方法示例程序 各位看官们大家好,上一回中咱们说的例子是"蓝牙广播中的厂商数据",本章回中介绍的例子是" 如何创建垂直版SeekBar"。闲话休提,言归正转,让我们一起Talk Android吧! 概念介…

基于深度学习的高精度绵羊检测识别系统(PyTorch+Pyside6+YOLOv5模型)

摘要:基于深度学习的高精度绵羊检测识别系统可用于日常生活中或野外来检测与定位绵羊目标,利用深度学习算法可实现图片、视频、摄像头等方式的绵羊目标检测识别,另外支持结果可视化与图片或视频检测结果的导出。本系统采用YOLOv5目标检测模型…

Java基础知识之异常处理

目录 1.Java 异常处理 2.Exception 类的层次 3.Java 内置异常类 4.异常方法 5.捕获异常 6.多重捕获块 7.throws/throw 关键字 7.1 throw 关键字 7.2 throws 关键字 8.finally关键字 8.1 实例--ExcepTest.java 文件代码: 9.try-with-resources 9.1 try-…