Flink实时同步MySQL与Doris数据

news2024/11/16 8:40:05

参考:

技术解析|Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once 精准接入-阿里云开发者社区

逻辑图:

1. Flink环境:

https://flink.apache.org/zh/

  • 下载flink-1.15.1
wget https://dlcdn.apache.org/flink/flink-1.15.1/flink-1.15.1-bin-scala_2.12.tgz

  • 解压,修改配置
tar -zxvf flink-1.15.1-bin-scala_2.12.tgz cd flink-1.15.1

  • 修改配置
修改rest.bind-address为 0.0.0.0
vi conf/flink-conf.yaml

  • 下载依赖jar包 至 flink安装目录lib下
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar 

wget https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.14_2.12/1.0.3/flink-doris-connector-1.14_2.12-1.0.3.jar

  • 启动flink
./bin/start-cluster.sh

  • 访问WebUI

http://192.168.0.158:8081

2、MySQL数据表及数据

  1. 开启Binlog,进入容器修改/etc/mysql/mysql.cnf,然后重启mysql
[mysqld] 
log_bin=mysql_bin 
binlog-format=Row 
server-id=1

  1. 进入MySQL命令行:创建数据库emp,数据表employee:
CREATE DATABASE emp; 

USE emp; 

CREATE TABLE employee ( 
emp_no INT NOT NULL, 
birth_date DATE NOT NULL, 
first_name VARCHAR(14) NOT NULL, 
last_name VARCHAR(16) NOT NULL, 
gender ENUM ('M','F') NOT NULL, 
hire_date DATE NOT NULL, PRIMARY KEY (emp_no) 
); ​ 

INSERT INTO `employee` VALUES 
(10001,'1953-09-02','Georgi','Facello','M','1986-06-26'), 
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'), 
(10003,'1959-12-03','Parto','Bamford','M','1986-08-28'), 
(10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01'), 
(10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12'), 
(10006,'1953-04-20','Anneke','Preusig','F','1989-06-02'), 
(10007,'1957-05-23','Tzvetan','Zielinski','F','1989-02-10'), 
(10008,'1958-02-19','Saniya','Kalloufi','M','1994-09-15'), 
(10009,'1952-04-19','Sumant','Peac','F','1985-02-18'), 
(10010,'1963-06-01','Duangkaew','Piveteau','F','1989-08-24'), 
(10011,'1953-11-07','Mary','Sluis','F','1990-01-22'), 
(10012,'1960-10-04','Patricio','Bridgland','M','1992-12-18'), 
(10013,'1963-06-07','Eberhardt','Terkki','M','1985-10-20'), 
(10014,'1956-02-12','Berni','Genin','M','1987-03-11'), 
(10015,'1959-08-19','Guoxiang','Nooteboom','M','1987-07-02'), 
(10016,'1961-05-02','Kazuhito','Cappelletti','M','1995-01-27'), 
(10017,'1958-07-06','Cristinel','Bouloucos','F','1993-08-03'), 
(10018,'1954-06-19','Kazuhide','Peha','F','1987-04-03'), 
(10019,'1953-01-23','Lillian','Haddadi','M','1999-04-30'), 
(10020,'1952-12-24','Mayuko','Warwick','M','1991-01-26');

3. Doris数据表

  1. 进入MySQL命令行:创建Doris数据库demo,数据表employee_info
CREATE DATABASE demo; 

USE demo; 

CREATE TABLE employee_info ( 
emp_no int NOT NULL, 
birth_date date, 
first_name varchar(20), 
last_name varchar(20), 
gender char(2), 
hire_date date, 
database_name varchar(50), 
table_name varchar(200) 
) 
UNIQUE KEY(`emp_no`, `birth_date`) 
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1 
PROPERTIES ( "replication_allocation" = "tag.location.default: 1" );

4. Flink数据表及数据

  • 启动fink-sql-client
./bin/sql-client.sh embedded

  • 开启Checkpoint
Flink作业周期性执行checkpoint,记录Binlog位点,当作业发生Failover时,便会从之前记录的Binlog位点继续处理。
生产环境建议设置为60秒。
Flink SQL> SET execution.checkpointing.interval = 10s

  • 创建MySQL CDC表
Flink SQL> CREATE TABLE employee_source ( 
database_name STRING METADATA VIRTUAL, 
table_name STRING METADATA VIRTUAL, 
emp_no int NOT NULL, 
birth_date date, 
first_name STRING, 
last_name STRING, 
gender STRING, 
hire_date date, 
PRIMARY KEY (`emp_no`) NOT ENFORCED 
) 
WITH ( 
'connector' = 'mysql-cdc', 
'hostname' = 'localhost', 
'port' = '3336', 
'username' = 'root', 
'password' = '1234.abcd', 
'database-name' = 'emp', 
'table-name' = 'employee' 
);

查询数据:

Flink SQL> select * from employee_source limit 10;

  • 创建Doris Sink表
Flink SQL> CREATE TABLE cdc_doris_sink ( 
emp_no int , 
birth_date STRING, 
first_name STRING, 
last_name STRING, 
gender STRING, 
hire_date STRING, 
database_name STRING, 
table_name STRING 
) 
WITH ( 
'connector' = 'doris', 
'fenodes' = 'localhost:8030', 
'table.identifier' = 'demo.employee_info', 
'username' = 'root', 
'password' = '1234.abcd' 
);
参数说明:
connector : 指定连接器是doris
fenodes:doris FE节点IP地址及http port
table.identifier : Doris对应的数据库及表名
username:doris用户名
password:doris用户密码

查询数据:

Flink SQL> select * from cdc_doris_sink;

  • 添加数据同步任务
Flink SQL> insert into cdc_doris_sink (emp_no,birth_date,first_name,last_name,gender,hire_date,database_name,table_name) 
select emp_no,cast(birth_date as string) as birth_date ,first_name,last_name,gender,cast(hire_date as string) as hire_date ,database_name,table_name from employee_source;

WebUI可以看到正在执行中的任务,说明添加完成

查看Doris数据表中数据

mysql> select * from employee_info;

5. 问题说明:

NoResourceAvailableException: Could not acquire the minimum required resources

进入flink目录,修改conf/conf/flink-conf.yaml:taskmanager.numberOfTaskSlots: 4 , 一般配置为cpu的个数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/349631.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

并发编程之synchronized详解

目录 设计同步器的意义 如何解决线程并发安全问题? synchronized原理详解 synchronized底层原理 Monitor监视器锁 什么是monitor? 对象的内存布局 对象头 对象头分析工具 锁的膨胀升级过程 偏向锁 轻量级锁 自旋锁 锁消除 逃逸分析 设…

RabbitMQ学习(七):交换器

〇、前言在之前的内容中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消 费者(工作进程)。在今天的内容中,我们将做一些完全不同的事情——我们将消息传达给多个消费者。这种模式 称为 “发布/订阅”。为了说…

横板格斗类游戏实战:核心玩法介绍(一)

第一章讲解了横板格斗类游戏框架主要涉及到的一些模块设计与技术原理,本章节开始讲解横板格斗类游戏的玩法,美术资源与游戏的数值策划。我们以主要的截图为示意图,来把整个横板格斗类游戏的核心玩法和要实现的功能大致列一遍。对啦&#xff0…

QGIS中进行批量坡向计算

QGIS中进行坡向计算1. 坡向计算中的Z因子(垂直单位与水平单位的比值)2. 坡向计算步骤坡度计算的姊妹篇–坡向计算来了 1. 坡向计算中的Z因子(垂直单位与水平单位的比值) z 因子是一个转换因子,当输入表面的垂直坐标&…

BFC到底是什么?如何理解

BFC到底是什么? BFC全称:Block Formatting Context, 名为“块级格式化上下文”。 W3C官方解释:BFC 它决定了元素如何对其内容进行定位,以及与其他元素的关系和相互作用,当涉及到可视化布局时,B…

IMU调试方案

1.IMU 型号 QMI8658C IMU英文数据手册 参照连线与数据手册使用 类似的惯导模块开发https://www.cnblogs.com/rockyching2009/p/15071669.html 微雪 https://www.waveshare.net/wiki/RP2040-LCD-1.28 micro python :https://docs.micropython.org/en/latest/esp32/t…

ArcGIS API for JavaScript 4.15系列(7)——Dojo中的Ajax请求操作

1、前言 作为重要的前后端交互技术,Ajax被广泛应用于Web项目中。无论是jQuery时代的$.ajax还是Vue时代下的axios,它们都对Ajax做了良好的封装处理。而Dojo也不例外,开发者使用dojo/request模块可以轻松实现Ajax相关操作,下面开始…

UNP 简介

目录 从一个简单的时间获取客户端开始 socket 指定服务器 IP 地址与端口 与服务器建立连接并读取数据 简单的时间获取服务端 Unix 标准 从一个简单的时间获取客户端开始 接下来,将从一个使用 TCP 连接的获取时间的客户端开始。 // 以下代码与 UNP intro/dayt…

后台管理项目重构为vue3.0

目录前言:为什么要重构项目?重构的目的具体案例下载项目一. 为什么要重构后台管理项目二. 安装项目所需的vue3.0 插件三. 具体代码重构四. 在更改中遇到的bug总结前言: 我们平常玩的游戏有时需要更新出新的内容,我们的项目也需要…

组件化、模块化、集中式、分布式、服务化、面向服务的架构、微服务架构

目录 1.组件化与模块化 1.1.组件化 2.模块化 2.1.模块化和组件化的区别 3.集中式与分布式 3.1.集中式 3.2.分布式 4.服务化 5.面向服务的架构 5.1.什么是SOA 5.2.实现SOA 5.3.面向对象和面向服务的对比 6.微服务架构 6.1.SOA和微服务 7.总结 最近最火的词是什么…

1月份 GameFi 行业报告

Jan. 2023, DanielData Source: January Monthly GameFi Report在经历了艰难的一年之后,1 月是对加密货币市场最有利的月份。虽然可以说的大部分内容适用于其他看涨周期,但有几个统计数据令 1 月在区块链领域非常有趣。例如&#…

花3个月面过京东测开岗,拿个20K不过分吧?

背景介绍 计算机专业,代码能力一般,之前有过两段实习以及一个学校项目经历。第一份实习是大二暑期在深圳的一家互联网公司做前端开发,第二份实习由于大三暑假回国的时间比较短(小于两个月),于是找的实习是在…

GPU并行效率问题——通过MPS提升GPU计算收益

现象描述使用V100_32G型号的GPU运行计算程序时,发现程序每5秒能够完成一次任务,耗费显存6G。鉴于V100 GPU拥有32G的显存,还有很多空闲,决定同时运行多个计算程序,来提升GPU计算收益。然而,这一切都是想当然…

Kotlin新手教程一(Kotlin简介及环境搭建)

目录一、 什么是Kotlin?二、为什么要使用Kotlin?三、使用IntelliJ IDEA搭建Kotlin四、Kotlin使用命令行编译一、 什么是Kotlin? Kotlin 是一种在 Java 虚拟机上运行的静态类型编程语言,它也可以被编译成为 JavaScript 源代码&…

IDEA入门安装使用教程

一、背景 作为一个Java开发者,有非常多编辑工具供我们选择,比如Eclipse、IntelliJ IDEA、NetBeans、Visual Studio Code、Sublime Text等等,这些有免费也有收费的,但是就目前市场占比来说普遍使用Eclipse和IntelliJ IDEA这两款主…

字节软件测试岗:惨不忍睹的三面,幸好做足了准备,月薪19k,已拿offer

我今年25岁,专业是电子信息工程本科,19年年末的时候去面试,统一投了测试的岗位,软件硬件都有,那时候面试的两家公司都是做培训的,当初没啥钱,他们以面试为谎言再推荐去培训这点让我特别难受。后…

让huggingface/transformers的AutoTokenizer从本地读词表

https://stackoverflow.com/questions/62472238/autotokenizer-from-pretrained-fails-to-load-locally-saved-pretrained-tokenizer

ArcGIS网络分析之构建网络分析数据集(一)

说明: 1. 本文主要用于演示网络分析服务的搭建过程。所以在此不会深入讨论网络分析服务的每一个细节,本文的目的就是让初学者学会使用网络分析服务进行基本的分析(主要针对后续的WEB开发):路径分析,最近设施点分析,以及服务区分析。 2.关于OD成本矩阵分析,多路径配送,…

如何解决 Python 错误 NameError: name ‘X‘ is not defined

Python“NameError: name is not defined”发生在我们试图访问一个未定义的变量或函数时,或者在它被定义之前。 要解决该错误,需要确保我们没有拼错变量名并在声明后访问它。 确保你没有拼错变量或函数 下面是产生上述错误的示例代码。 employee {na…

【MySQL进阶】SQL优化

😊😊作者简介😊😊 : 大家好,我是南瓜籽,一个在校大二学生,我将会持续分享Java相关知识。 🎉🎉个人主页🎉🎉 : 南瓜籽的主页…