flinkcdc 动态的增加新的同步表到同一个作业中

news2025/1/6 4:17:16

背景

flinkcdc 2.0版本上线了一个新功能–支持动态加表这个是很有用的feature,本文介绍在开发中如何使用。

设想下假如你一个 CDC pipeline 监控了 4 张表,突然有天业务需求需要再加几张表,你肯定不想另起作业 (浪费资源),那么这个
feature 可以让你在当前作业直接增加需要监控的表。新增表都是先做全量再优雅地切换到增量,遇到新增监控表时不用新起作业,极大地节约了资源

flinkcdc 使用 flink-sql 的方式每同步一张表都需要启动一个新的作业,因此不存在同一个作业中 新增表的问题。该feature主要针对 api 的方式。flinkcdc 只能作为 source 用于采集数据,常用的使用方法如下:

private static MySqlSource<String> createMysqlSource(ParameterTool param) {
        return MySqlSource.<String>builder()
                .hostname(param.get(ConfigConstant.SOURCE_MYSQL_HOST))  // mysql host, 127.0.0.1
                .port(param.getInt(ConfigConstant.SOURCE_MYSQL_PORT))  // mysql port,  3306
                .username(param.get(ConfigConstant.SOURCE_MYSQL_USERNAME)) // username
                .password(param.get(ConfigConstant.SOURCE_MYSQL_PASS)) // pass
                .databaseList("aa") //库名
                .tableList("aa.stu_1") // 表名
                .deserializer(new MysqlBinlogSerialize())
                .build();
    }

这段代码使用flinkcdc 创建了一个 采集mysql 的数据源,采集表名为"aa.stu_1"。

假设我的作业已经上线,这时候我期望在这个作业中新增采集表:“aa.stu_2” 我该如何做呢?

解决方案

1、第一步:修改代码,创建cdc 数据源的时候,需要指定 .scanNewlyAddedTableEnabled(true)
下面是修改后的代码:

private static MySqlSource<String> createMysqlSource(ParameterTool param) {
        return MySqlSource.<String>builder()
                .hostname(param.get(ConfigConstant.SOURCE_MYSQL_HOST))  // mysql host, 127.0.0.1
                .port(param.getInt(ConfigConstant.SOURCE_MYSQL_PORT))  // mysql port,  3306
                .username(param.get(ConfigConstant.SOURCE_MYSQL_USERNAME)) // username
                .password(param.get(ConfigConstant.SOURCE_MYSQL_PASS)) // pass
                .databaseList("aa") //库名
                .tableList("aa.stu_1", "aa.stu_2") // 表名
                .deserializer(new MysqlBinlogSerialize())
                .scanNewlyAddedTableEnabled(true)
                .build();
    }

注意:.startupOptions() 需要配置成 StartupOptions.initial(),因为默认就是 init,可以不写。

2、第二步:cancel job 取消作业,并记录checkpoint 路径
在这里插入图片描述
3、第三步:上传 新的 jar 包后,从刚刚记录的 checkpoint 启动。
作业启动后,会全量同步 aa.stu_2 表数据,然后增量同步 aa.stu_1、aa.stu_2。

至此,修改完成。

其他事项:

我最开始的时候重启了作业,发现新增的表 并没有同步,在日志中找到如下报错信息:

2023-04-20 16:49:01,778 ERROR io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Encountered change event 'Event{header=EventHeaderV4{timestamp=1681980541000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=35, nextPosition=16589, flags=0}, data=TableMapEventData{tableId=252, database='shou_xian', table='stu_2', columnTypes=3, 3, columnMetadata=0, 0, columnNullability={1}, eventMetadata=null}}' at offset {transaction_id=null, file=mysql_bin.000025, pos=16458, server_id=1, event=1} for table shou_xian.stu_2 whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.
Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position=16535 --stop-position=16589 --verbose mysql_bin.000025
2023-04-20 16:49:01,779 ERROR io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Error during binlog processing. Last offset stored = null, binlog reader near position = mysql_bin.000025/16535
2023-04-20 16:49:01,779 WARN  com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler [] - Schema for table shou_xian.stu_2 is null
2023-04-20 16:49:01,779 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/440997.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大数据——HDFS(分布式文件系统)

一&#xff0c;分布式系统概述 Hadoop的两大核心组件 HDFS&#xff08;Hadoop Distributed Filesystem&#xff09;&#xff1a;是一个易于扩展的分布式文件系统&#xff0c;运行在成百上千台低成本的机器上。HDFS具有高度容错能力&#xff0c;旨在部署在低成本机器上。HDFS主…

日撸 Java 三百行day34

文章目录 说明Day34 图的深度优先遍历1.思路2.代码3.总结1.在广度遍历中借助了队列2.在深度优先遍历借助了栈。 说明 闵老师的文章链接&#xff1a; 日撸 Java 三百行&#xff08;总述&#xff09;_minfanphd的博客-CSDN博客 自己也把手敲的代码放在了github上维护&#xff1a…

Android 开发之核心技术点——性能优化篇(带面试题)~

性能优化对于Android开发的重要性非常大。随着Android设备的不断升级&#xff0c;用户对应用的要求也越来越高&#xff0c;包括应用的运行速度、响应速度、流畅度等方面。如果应用的性能不能满足用户的需求&#xff0c;很可能会导致用户流失、差评以及应用被卸载等情况。 另外…

boot-admin整合flowable官方editor-app进行BPMN2.0建模

boot-admin整合flowable官方editor-app源码进行BPMN2.0建模 正所谓百家争鸣、见仁见智、众说纷纭、各有千秋&#xff01;在工作流bpmn2.0可视化建模工具实现的细分领域&#xff0c;网上扑面而来的是 bpmn.js 这个渲染工具包和web建模器&#xff0c;而笔者却认为使用flowable官…

2023零基础快速跟上人工智能第一梯队

写在前面&#xff1a;有关人工智能学什么&#xff0c;怎么学&#xff0c;什么路线等一系列问题。我决定整理一套可行的规划路线&#xff0c;希望帮助准备入门的朋友们少走些弯路。 下面我会推荐一个比较快速可行的学习模板&#xff0c;并附上我认为比较好的学习资料。 新手不建…

git使用规范文档

git使用规范文档 Git使用规范流程图 开发人员操作步骤&#xff1a; 第一步&#xff1a;clone代码 在你的本地代码库进行从远程仓库clone代码操作&#xff08;100%表示clone完成&#xff09; 进入项目文件&#xff0c;右键Git Bash Here 切换到你所进行开发的分支上 拉取该分…

JavaSE学习进阶day05_02 常见的数据结构和List接口

第三章 数据结构&#xff08;掌握&#xff09; 3.1 数据结构介绍 数据结构 : 数据用什么样的方式组合在一起。 科班出身的同学我想你对数据结构一点也不陌生&#xff0c;不知道你记不记得&#xff0c;当时学习数据结构的逻辑结构中的集合时&#xff0c;只是简单了解它&#…

hackathon 复盘:niche 海外软件工具正确的方法 6 个步骤

上周末&#xff0c;去参加了北京思否 hackathon&#xff0c;两天时间内从脑暴 & 挖掘软件 IDEA -> Demo 研发路演&#xff0c;这次经历让我难忘。这里我的看法是每个开发者圈友&#xff0c;都应该去参加一次 hackathon ~ 做 niche 软件正确的方法 这边先说结论&#xf…

vmware下Ubuntu系统中安装vscode

文章目录 前言&#xff1a;在线下载&#xff1a;离线下载包&#xff1a;配置C/C环境 前言&#xff1a; 这篇博客是为后面交叉编译程序放到树莓派上运行做的准备。同时也是自己在装过程中的一个记录。 在线与离线安装的唯一不同就是获取安装包是在线下载还是别的地方拷贝过来以…

【数据结构】- 链表之单链表(中)

文章目录 前言一、单链表(中)1.1 头删1.2尾删1.2.1第一种方法&#xff1a;1.2.2第二种方法&#xff1a;1.2.3多因素考虑 二、完整版代码2.1 SList.h2.2 SList.c2.3 Test.c 总结 前言 千万不要放弃 最好的东西 总是压轴出场 本章是关于数据结构中的链表之单链表(中) 提示&#…

数据结构与算法基础(王卓)(26)线性表的查找(2):顺序查找(二分查找、分块查找)

二、折半查找&#xff08;二分或对分查找) 前置条件和前面一样 最开始根据PPT示(实)例写出的程序框架&#xff1a; 一开始&#xff1a; low&#xff1a;第一位 high&#xff1a;最后一位 mid&#xff1a;正中间 查找数小于mid&#xff1a; 把high移动到mid前面一位&#xff08;…

从0搭建Vue3组件库(四): 如何开发一个组件

本篇文章将介绍如何在组件库中开发一个组件,其中包括 如何本地实时调试组件如何让组件库支持全局引入如何在 setup 语法糖下给组件命名如何开发一个组件 目录结构 在packages目录下新建components和utils两个包,其中components就是我们组件存放的位置,而utils包则是存放一些…

观看js编程范式笔记(函数式编程)

js为什么鼓励函数式编程&#xff1f; JavaScript&#xff08;简称 JS&#xff09;是一种面向对象和函数式编程语言&#xff0c;但它在语言层面上更加鼓励函数式编程。以下是几个原因&#xff1a; 函数是一等公民&#xff1a;在 JavaScript 中&#xff0c;函数被视为一等公民&a…

HANA SDA连接外部数据库到BW的步骤

咱都知道&#xff0c;我们不能直接从BW连接到外部数据库。第一步得从HANA database通过SDA去建一个到外部DB的连接。 数据库连接好了&#xff0c;那么接下来别忘了&#xff0c;还得建一个源系统。 也就是说第一步&#xff0c;我们要用HANA SDA通过Linux ODBC driver去连接外部…

Vue3表格(Table)

Vue2表格&#xff08;Table&#xff09; 可自定义设置以下属性&#xff1a; 表格列的配置项&#xff08;columns&#xff09;&#xff0c;类型&#xff1a;Array<{title?: string, width?: number, dataIndex?: string, slot?: string}>&#xff0c;默认 [] 表格数…

史上最全面的苹果公司PMO的运作模式详解

01 苹果公司PMO的发展历程 1. 初期阶段&#xff1a; 在苹果公司刚创立的早期&#xff0c;没有明确的PMO组织。项目经理直接向CEO Steve Jobs汇报&#xff0c;项目管理在公司内部较为分散。 2. 1997年-2001年&#xff1a; 在这段时间内&#xff0c;苹果公司开始成立项目管理…

PasteSpider之关于字符串模板占位字符等的说明

PasteSpider中&#xff0c;构建&#xff0c;部署等都是通过命令执行的&#xff0c;为了更加的灵活&#xff0c;引入了不同的变量&#xff0c;以便适合不同的需求使用。 命令占位符 注&#xff01;&#xff01;&#xff01;&#xff0c;占位符的格式为{{对象.属性}},他们之间没有…

【LeetCode: 1691. 堆叠长方体的最大高度 | 暴力递归=>记忆化搜索=>动态规划】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

vue2+vue3——42+

vue2vue3——42 vue2 v-cloak指令【14:14】调网速 &#xff1a; no throttling 不让慢 &#xff1b; offline 断网JS 阻塞红色 外部JS &#xff1b; 绿色 网页核心 &#xff1b; 粉色 JS 脚本红色 外部JS 我要走不了&#xff0c; 谁都别想走 &#xff1a; 绿色 不可以渲染到页面…

【安全与风险】互联网协议漏洞

互联网协议漏洞 互联网基础设施TCP协议栈因特网协议&#xff08;IP&#xff09;IP路由IP协议功能(概述)问题:没有src IP认证用户数据报协议&#xff08;UDP&#xff09;传输控制协议 (TCP)TCP报头TCP(三向)握手基本安全问题数据包嗅听TCP连接欺骗随机初始TCP SNs 路由的漏洞Arp…