【Flink CDC(一)】实现mysql整表与增量读取

news2025/1/11 20:00:37

文章目录

  • 一. 运行前准备
    • 1. 依赖
      • 1.1. Maven dependency
      • 1.2. SQL Client JAR(推荐)
    • 2. 配置 MySQL 服务器(必须)
  • 二. 功能说明
    • 1. 启动模式
    • 2. 全量阶段支持 checkpoint
    • 3. 关于无主键表
    • Exactly-Once 处理
  • 三. 实战
    • 1. 实现mysql整表与增量表同步
  • FAQ

MySQL CDC 连接器允许从 MySQL 数据库读取快照数据(比如:flink任务消费时刻的整表数据)和增量数据。本文描述了如何设置 MySQL CDC 连接器来对 MySQL 数据库运行 SQL 查询。

本篇只关注mysql整表与增量读取的实现,对于并发读取等能力后续再探索。

 

一. 运行前准备

1. 依赖

1.1. Maven dependency

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <!-- 请使用已发布的版本依赖,snapshot版本的依赖需要本地自行编译。 -->
  <version>2.4.0</version>
</dependency>

 

1.2. SQL Client JAR(推荐)

下载 flink-sql-connector-mysql-cdc-2.4.0.jar 到 <FLINK_HOME>/lib/ 目录下。

 

2. 配置 MySQL 服务器(必须)

你必须定义一个 MySQL 用户,该用户对 MySQL CDC 连接器监视的所有数据库都应该具有所需的权限。

# 创建用户
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';

# 赋权
mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';

# 刷新权限
mysql> FLUSH PRIVILEGES;

注意:

scan.incremental.snapshot.enabled 参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。

 

二. 功能说明

1. 启动模式

配置选项scan.startup.mode指定 MySQL CDC 使用者的启动模式。有效枚举包括:

  • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog

  • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取

  • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog
    的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改

  • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。

  • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。



MySQLSource.builder()
    .startupOptions(StartupOptions.earliest()) // 从最早位点启动
    .startupOptions(StartupOptions.latest()) // 从最晚位点启动
    .startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L) // 从指定 binlog 文件名和位置启动
    .startupOptions(StartupOptions.specificOffset("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")) // 从 GTID 集合启动
    .startupOptions(StartupOptions.timestamp(1667232000000L) // 从时间戳启动
    ...
    .build()




CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    'scan.startup.mode' = 'earliest-offset', -- 从最早位点启动
    'scan.startup.mode' = 'latest-offset', -- 从最晚位点启动
    'scan.startup.mode' = 'specific-offset', -- 从特定位点启动
 
    'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- 在特定位点启动模式下指定 binlog 文件名
    'scan.startup.specific-offset.pos' = '4', -- 在特定位点启动模式下指定 binlog 位置
    'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- 在特定位点启动模式下指定 GTID 集合

    'scan.startup.mode' = 'timestamp', -- 从特定位点启动
    'scan.startup.timestamp-millis' = '1667232000000' -- 在时间戳启动模式下指定启动时间戳
    ...
)

 

2. 全量阶段支持 checkpoint

增量快照读取提供了在区块级别执行检查点的能力。它使用新的快照读取机制解决了以前版本中的检查点超时问题。

 

3. 关于无主键表

从2.4.0 版本开始支持无主键表,使用无主键表必须设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空类型的一个字段。

在使用无主键表时,需要注意以下两种情况。

  1. 配置 scan.incremental.snapshot.chunk.key-column 时,如果表中存在索引,请尽量使用索引中的列来加快 select 速度。

  2. 无主键表的处理语义由 scan.incremental.snapshot.chunk.key-column 指定的列的行为决定:

  • 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。
  • 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。

 

Exactly-Once 处理

MySQL CDC 连接器是一个 Flink Source 连接器,它将首先读取表快照块,然后继续读取 binlog, 无论是在快照阶段还是读取 binlog 阶段,MySQL CDC 连接器都会在处理时准确读取数据,即使任务出现了故障。

 

三. 实战

1. 实现mysql整表与增量表同步

-- 'scan.startup.mode'= 'initial' 
-- 
CREATE TABLE tjy_sql1  
(  
  `id` int,  
  `name` string,  
  `face` string  
 ,PRIMARY KEY(id) NOT ENFORCED  
) WITH (  
        'connector' = 'mysql-cdc',  
        'hostname' = 'xxx',  
        'port' = '3306',  
        'username' = 'middle_test',  
        'password' = '123456',  
        'database-name' = 'middle_test',  
        'table-name' = 'tjy_fortest1'  
       -- ,'scan.incremental.snapshot.enabled' = 'false'  
       --  initial: 默认值,全表同步,然后进行增量同步;
       --  'scan.startup.mode'= 'initial'  
       -- 'debezium.snapshot.mode' = 'initial'      );  
  
  
 CREATE TABLE tjy_sql1_sink  
 (  
  `id` int,  
  `name` string,  
  `face` string  
  ,PRIMARY KEY(id) NOT ENFORCED  
 ) WITH (  
           'connector' = 'mysql-x',  
           'url' = 'jdbc:mysql://xxx:3306/middle_test?useunicode=true&characterEncoding=utf8&useSSL=false&useCursorFetch=true',  
           'username' = 'middle_test',  
           'password' = '123456',  
           'table-name' = 'flink_type',  
           'table-name' = 'tjy_fortest2'  
       );  
  
  
insert into tjy_sql1_sink select * from tjy_sql1;

 

FAQ

相关问题:https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)

可能涉及到的问题

在这里插入图片描述

 

参考:
官网:https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/mysql-cdc%28ZH%29.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1471481.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Node.js】自动生成 API 文档

目录 1、直接使用swagger-ui-express 2、配合swagger-jsdoc 如何在Node.js项目中使用 Swagger 来自动生成 API接口文档&#xff0c;使用生成方式有很多种。本文基于swagger-jsdocswagger-ui-express快速实现 1、直接使用swagger-ui-express // 方便来浏览和测试api npm i sw…

数据可视化引领智慧工业新时代

在智慧工业的大潮中&#xff0c;数据可视化崭露头角&#xff0c;以其直观、清晰的方式赋能工业生产&#xff0c;为智慧工业的高效运转提供了强有力的支持。下面我就以可视化从业者的角度&#xff0c;简单聊聊这个话题。 数据可视化首先在智慧工业的生产监控中大显身手。通过将…

外贸干货|到底跟踪多少次才能拿下客户?

80&#xff05;的销售来源于第4至11次的跟踪&#xff01;这样的调查数据充分说明了深入顾客关系的重要性。而多数情况下&#xff0c;人们只做到了前3层&#xff0c;就放弃了。 作为一名销售&#xff0c;你会对客户坚持跟踪么&#xff1f;在跟踪过程中&#xff0c;该采取什么样…

免费SSL证书申请流程及地址

1&#xff0c;选择证书提供商&#xff1a;有许多机构提供免费的SSL证书&#xff0c;如JoySSL。选择一个可靠的提供商是第一步。 免费SSL证书申请地址https://www.joyssl.com/certificate/select/free.html?nid5 2&#xff0c;验证域名&#xff1a;根据提供商的要求&#xff…

二叉树的链式结构实现

二叉树的链式结构实现 1. 链式存储2. 二叉树的遍历前序遍历中序遍历后序遍历 3. 二叉树遍历的代码实现前序遍历中序遍历后序遍历 4. 二叉树各种相关函数的实现二叉树节点个数二叉树叶子节点个数二叉树的高度二叉树第k层节点个数二叉树查找值为x的节点 5. 代码验证 1. 链式存储 …

Linux系统---nginx(1)

目录 一.Nginx概述 1.定义 2.Nginx模块作用 &#xff08;1&#xff09;main模块 &#xff08;2&#xff09;stream服务模块 &#xff08;3&#xff09;邮件服务模块 &#xff08;4&#xff09;第三方模块 &#xff08;5&#xff09;events模块 &#xff08;6&#xff0…

Go的CSP并发模型实现M, P, G简介

GMP概念简介 G: goroutine&#xff08;协程&#xff0c;也叫用户态线程&#xff09; M: 工作线程(内核态线程) P: 上下文(也可以认为是cpu&#xff0c;逻辑cpu数量&#xff0c;可以在程序启动的时候设置这个数量&#xff0c;gomaxprocs函数设置) GMP 模型 在 Go 中&#xff…

力扣--动态规划1027.最长等差数列

思路分析&#xff1a; 使用动态规划的思想&#xff0c;定义二维数组dp&#xff0c;其中dp[i][j]表示以nums[i]为结尾&#xff0c;公差为(j-1000)的等差数列长度。为了适应负数的情况&#xff0c;将公差的范围设为[-1000, 1000]&#xff0c;并且加上1000作为数组索引。 初始化r…

115/200V 航空交流静变电源 115/200V机场直线加电设备

一、 115/200V 航空交流静变电源简介&#xff1a; 随着全球科技的快速发展和航空产业的不断进步&#xff0c;飞机的性能和功能要求日益提升&#xff0c;对电源设备的需求也更加严格。其中&#xff0c;“115/200V 航空交流静变电源”作为飞机的115/200V机场直线加电设备&#x…

【b站咸虾米】chapter5_uniapp-API_新课uniapp零基础入门到项目打包(微信小程序/H5/vue/安卓apk)全掌握

课程地址&#xff1a;【新课uniapp零基础入门到项目打包&#xff08;微信小程序/H5/vue/安卓apk&#xff09;全掌握】 https://www.bilibili.com/video/BV1mT411K7nW/?p12&share_sourcecopy_web&vd_sourceb1cb921b73fe3808550eaf2224d1c155 目录 5 API 5.1 页面和路…

pytest钩子函数-pytest_runtest_logreport提取测试用例相关信息

问题&#xff1a;想在每个日志中记录测试用例开始结束时间&#xff0c;获取到测试用例的名称。 解决办法&#xff1a;使用钩子pytest_runtest_logreport 在pytest中&#xff0c;想要在conftest.py文件中获取正在运行的测试用例的名称&#xff0c;可以使用pytest_runtest_logre…

云HIS支持连锁集团化管理,1+N模式,支撑运营,管理,决策多位一体

目录 云HIS系统特色 使用简易化 连锁集团化 可扩展化 系统描述 云HIS系统优势 &#xff08;1&#xff09;客户/用户角度 &#xff08;2&#xff09;开发/运维角度 &#xff08;3&#xff09;成功应用案例 HIS分系统&#xff08;HIS子系统&#xff09; 1、医疗业务子…

马思纯后悔未反击伤害,如今瘦身重现巅峰颜值。

♥ 为方便您进行讨论和分享&#xff0c;同时也为能带给您不一样的参与感。请您在阅读本文之前&#xff0c;点击一下“关注”&#xff0c;非常感谢您的支持&#xff01; 文 |猴哥聊娱乐 编 辑|徐 婷 校 对|侯欢庭 近期&#xff0c;电影《热辣滚烫》在各大院线火热上映&#x…

阿里开源的Java诊断利器Arthas

一.什么是Arthas 1.为什么需要Arthas 通常&#xff0c;本地开发环境无法访问生产环境。如果在生产环境中遇到问题&#xff0c;则无法使用 IDE 远程调试。更糟糕的是&#xff0c;在生产环境中调试是不可接受的&#xff0c;因为它会暂停所有线程&#xff0c;导致服务暂停。 开…

第3集《灵峰宗论导读》

《灵峰宗论》导读。诸位法师&#xff0c;诸位同学&#xff0c;阿弥陀佛&#xff01;&#xff08;阿弥陀佛&#xff01;&#xff09; 请大家打开讲义第5面&#xff0c;悟道。 这一科我们是说明论主略史&#xff0c;在这一科当中&#xff0c;我们根据弘一大师所编的《蕅益大师年…

解析ChatGPT Plus相比chatgpt3.5有哪些优势

「ChatGPT Plus」提供更出色的对话体验和更广泛的应用能力&#xff0c;学生可以用来写作、职场人也可以用来写计划书、策划书等等&#xff0c;并且问它一些问题比搜索引擎好用多了简直。但普通人使用起来有一点门槛&#xff0c;并且升级4.0也难住了许多爱好者。 ChatGPT主要功能…

python 基础知识点(蓝桥杯python科目个人复习计划52)

今日复习内容&#xff1a;还是做题 例题1&#xff1a;四元组问题 问题描述&#xff1a; 从小学开始&#xff0c;小明就是一个非常喜欢数学的孩子。他喜欢用数学的方式解决各种问题。在他的高中时期&#xff0c;他遇到了一个非常有趣的问题&#xff0c;那就算给定一个长度为n…

Webserver解决segmentation fault(core dump)段错问问题

前言 在完成了整个项目后&#xff0c;我用make命令编译了server&#xff0c;当我运行./server文件时&#xff0c;出现了段错误 在大量的代码中找出错因并不是一件容易的事&#xff0c;尤其是对新手程序员来说。而寻找bug的过程就像是侦探调查线索追查凶手一样&#xff0c;我们…

vite搭配vue2创建工程

一、安装vite npm init vite2.8.0 vite默认支持的是vue3&#xff0c; 这里选择框架和版本vanilla&#xff0c; 方便以后自己安装vue2. 二、修改package.json 默认生成的pacakage.json文件 {"name": "vite-project","private": true,"v…

AI对话系统app开源

支持对接gpt&#xff0c;阿里云&#xff0c;腾讯云 具体看截图 后端环境&#xff1a;PHP7.4MySQL5.6 软件&#xff1a;uniapp 废话不多说直接上抗揍云链接&#xff1a; https://mny.lanzout.com/iKFRY1o1zusf 部署教程请看源码内的【使用教程】文档 欢迎各位转载该帖/源码