TiDB数据库从入门到精通系列之六:使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka

news2024/10/6 8:27:54

TiDB数据库从入门到精通系列之六:使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka

  • 一、技术流程
  • 二、搭建环境
  • 三、创建Kafka changefeed
  • 四、写入数据以产生变更日志
  • 五、配置 Flink 消费 Kafka 数据

一、技术流程

  • 快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群
  • 创建 changefeed,将 TiDB 增量数据输出至 Kafka
  • 使用 go-tpc 写入数据到上游 TiDB
  • 使用 Kafka console consumer 观察数据被写入到指定的 Topic
  • (可选)配置 Flink 集群消费 Kafka 内数据

二、搭建环境

部署包含 TiCDC 的 TiDB 集群

在实验或测试环境中,可以使用 TiUP Playground 功能,快速部署 TiCDC,命令如下:

tiup playground --host 0.0.0.0 --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1
# 查看集群状态
tiup status

三、创建Kafka changefeed

1.创建 changefeed 配置文件

根据 Flink 的要求和规范,每张表的增量数据需要发送到独立的 Topic 中,并且每个事件需要按照主键值分发 Partition。因此,需要创建一个名为 changefeed.conf 的配置文件,填写如下内容:

[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}", partition="index-value"},
]

2.创建一个 changefeed,将增量数据输出到 Kafka

tiup ctl:v<CLUSTER_VERSION> cdc changefeed 
create --server="http://127.0.0.1:8300" 
--sink-uri="kafka://127.0.0.1:9092/kafka-topic-name?protocol=canal-json" 
--changefeed-id="kafka-changefeed" 
--config="changefeed.conf"

如果命令执行成功,将会返回被创建的 changefeed 的相关信息,包含被创建的 changefeed 的 ID 以及相关信息,内容如下:

Create changefeed successfully!
ID: kafka-changefeed
Info: {... changfeed info json struct ...}

如果命令长时间没有返回,你需要检查当前执行命令所在服务器到 sink-uri 中指定的 Kafka 机器的网络可达性,保证二者之间的网络连接正常。

生产环境下 Kafka 集群通常有多个 broker 节点,你可以在 sink-uri 中配置多个 broker 的访问地址,这有助于提升 changefeed 到 Kafka 集群访问的稳定性,当部分被配置的 Kafka 节点故障的时候,changefeed 依旧可以正常工作。假设 Kafka 集群中有 3 个 broker 节点,地址分别为 127.0.0.1:9092 / 127.0.0.2:9092 / 127.0.0.3:9092,可以参考如下 sink-uri 创建 changefeed:

tiup ctl:v<CLUSTER_VERSION> cdc changefeed create 
--server="http://127.0.0.1:8300" 
--sink-uri="kafka://127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092/kafka-topic-name?protocol=canal-json&partition-num=3&replication-factor=1&max-message-bytes=1048576" 
--config="changefeed.conf"

3.Changefeed 创建成功后,执行如下命令,查看 changefeed 的状态

tiup ctl:v<CLUSTER_VERSION> cdc changefeed list --server="http://127.0.0.1:8300"

四、写入数据以产生变更日志

完成以上步骤后,TiCDC 会将上游 TiDB 的增量数据变更日志发送到 Kafka,下面对 TiDB 写入数据,以产生增量数据变更日志。

1.模拟业务负载

在测试实验环境下,可以使用 go-tpc 向上游 TiDB 集群写入数据,以让 TiDB 产生事件变更数据。如下命令,首先在上游 TiDB 创建名为 tpcc 的数据库,然后使用 TiUP bench 写入数据到这个数据库中。

tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 prepare
tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 run --time 300s

2.消费 Kafka Topic 中的数据

changefeed 正常运行时,会向 Kafka Topic 写入数据,你可以通过由 Kafka 提供的 kafka-console-consumer.sh,观测到数据成功被写入到 Kafka Topic 中:

./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic `${topic-name}`

至此,TiDB 的增量数据变更日志就实时地复制到了 Kafka。下一步,你可以使用 Flink 消费 Kafka 数据。当然,你也可以自行开发适用于业务场景的 Kafka 消费端。

五、配置 Flink 消费 Kafka 数据

1.安装 Flink Kafka Connector

在 Flink 生态中,Flink Kafka Connector 用于消费 Kafka 中的数据并输出到 Flink 中。Flink Kafka Connector 并不是内建的,因此在 Flink 安装完毕后,还需要将 Flink Kafka Connector 及其依赖项添加到 Flink 安装目录中。下载下列 jar 文件至 Flink 安装目录下的 lib 目录中,如果你已经运行了 Flink 集群,请重启集群以加载新的插件。

  • flink-connector-kafka-1.17.1.jar
  • flink-sql-connector-kafka-1.17.1.jar
  • kafka-clients-3.5.1.jar

2.创建一个表

可以在 Flink 的安装目录执行如下命令,启动 Flink SQL 交互式客户端:

[root@flink flink-1.15.0]# ./bin/sql-client.sh

随后,执行如下语句创建一个名为 tpcc_orders 的表:

CREATE TABLE tpcc_orders (
    o_id INTEGER,
    o_d_id INTEGER,
    o_w_id INTEGER,
    o_c_id INTEGER,
    o_entry_d STRING,
    o_carrier_id INTEGER,
    o_ol_cnt INTEGER,
    o_all_local INTEGER
) WITH (
'connector' = 'kafka',
'topic' = 'tidb_tpcc_orders',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json',
'scan.startup.mode' = 'earliest-offset',
'properties.auto.offset.reset' = 'earliest'
)

请将 topic 和 properties.bootstrap.servers 参数替换为环境中的实际值。

3.查询表内容

执行如下命令,查询 tpcc_orders 表中的数据:

SELECT * FROM tpcc_orders;

执行成功后,可以观察到有数据输出,如下图

在这里插入图片描述
至此,就完成了 TiDB 与 Flink 的数据集成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/887150.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

离谱的Bug

离谱的 Bug Bug 情况发现 Bug修改 Bug其他感受历史 Bug火星Spirit号Mars Global Surveyor任务 Bug 情况 有一次&#xff0c;我在开发一个网页应用程序时&#xff0c;遇到了一个令人目瞪口呆的Bug。这个Bug出现在一个特定的页面上&#xff0c;当用户点击某个按钮时&#xff0c;…

07微服务的事务管理机制

一句话导读 在单体应用程序中&#xff0c;事务通常是在单个数据库或单个操作系统中管理的&#xff0c;而在微服务架构中&#xff0c;事务需要跨越多个服务和数据库&#xff0c;这就使得事务管理变得更加复杂和困难。 目录 一句话导读 一、微服务事务管理的定义和意义 二、微…

ORCA优化器浅析——CDXLScalar Base class for representing scalar DXL operators

CDXLScalar类作为Base class for representing scalar DXL operators&#xff0c;该类只是定义一些接口&#xff0c;其中实现了GetDXLOperatorType函数&#xff0c;其返回EdxloptypeScalar&#xff0c;代表scalar DXL operators。 class CDXLScalar : public CDXLOperator{ pr…

迅捷视频工具箱:多功能音视频处理软件

这是一款以视频剪辑、视频转换、屏幕录像等特色功能为主&#xff0c;同时附带有视频压缩、视频分割、视频合并等常用视频处理功能为主的视频编辑软件。该软件操作简单易用&#xff0c;即使没有视频处理经验的用户也可以轻松上手。将视频添加到工具箱对应功能后&#xff0c;简单…

01- vdom 和模板编译源码

组件渲染的过程 template --> ast --> render --> vDom --> 真实的Dom --> 页面 Runtime-Compiler和Runtime-Only的区别 - 简书 编译步骤 模板编译是Vue中比较核心的一部分。关于 Vue 编译原理这块的整体逻辑主要分三个部分&#xff0c;也可以说是分三步&am…

sklearn机器学习库(二)sklearn中的随机森林

sklearn机器学习库(二)sklearn中的随机森林 集成算法会考虑多个评估器的建模结果&#xff0c;汇总之后得到一个综合的结果&#xff0c;以此来获取比单个模型更好的回归或分类表现。 多个模型集成成为的模型叫做集成评估器&#xff08;ensemble estimator&#xff09;&#xf…

RabbitMq-1基础概念

RabbitMq-----分布式中的一种通信手段 1. MQ的基本概念&#xff08;message queue,消息队列&#xff09; mq:消息队列&#xff0c;存储消息的中间件 分布式系统通信的两种方式&#xff1a;直接远程调用&#xff0c;借助第三方完成间接通信 消息的发送方是生产者&#xff0c…

爬虫逆向实战(九)--猿人学第十三题

一、数据接口分析 主页地址&#xff1a;猿人学第十三题 1、抓包 通过抓包可以发现数据接口是api/match/13 2、判断是否有加密参数 请求参数是否加密&#xff1f; 无请求头是否加密&#xff1f; 无响应是否加密&#xff1f; 无cookie是否加密&#xff1f; 在“cookie”模块…

python数据分析需要学哪些,python数据分析要学多久

大家好&#xff0c;小编为大家解答python数据分析应该学什么软件的问题。很多人还不知道python数据分析需要什么基础&#xff0c;现在让我们一起来看看吧&#xff01; 根据调查结果&#xff0c;十大最常用的数据工具中有八个来自或利用Python。Python广泛应用于所有数据科学领域…

STM32 FLASH 读写数据

1. 《STM32 中文参考手册》&#xff0c;需要查看芯片数据手册&#xff0c;代码起始地址一般都是0x8000 0000&#xff0c;这是存放整个项目代码的起始地址 2. 编译信息查看代码大小&#xff0c;修改代码后第一次编译后会有这个提示信息 2.1 修改代码后编译&#xff0c;会有提示…

物联网在制造业中的应用

制造业目前正在经历第四次工业革命&#xff0c;物联网、人工智能和机器人等技术进步正在推动行业的发展。研究表明&#xff0c;到2024年&#xff0c;全球制造商将在物联网解决方案上投资700亿美元&#xff0c;许多制造商正在实施物联网设备&#xff0c;以利用预测性维护和复杂的…

优化GitHub网站访问慢的问题

方法一、修改host文件解决 大型网站服务器都不会是只有一台服务器,而是多台服务器组成的集群一起对外提供服务。 使用站长工具测速&#xff0c;找一个速度比较快的服务器。 图中可以看到140.82.121.4这个ip比较快&#xff0c; 下面修改hosts: Mac 在 /etc/hosts 中&#x…

.net连接mysql,提示找不到请求的 .Net Framework Data Provider。可能没有安装

开发完成的.net程序需要连接mysql数据库&#xff0c;在个人电脑上运行没问题&#xff0c;别人运行时提示“提示找不到请求的 .Net Framework Data Provider。可能没有安装”。经过查询&#xff0c;安装Connector/NET 8.1.0&#xff0c;下载地址如下所示&#xff1a; https://d…

Mac OS minicom 无法设置921600问题

MacOS minicom 无法设置921600问题 介绍过程解决方案参考资料 介绍 minicom是Mac上一款非常好用的串口工具。本文假设你已经安装minicom&#xff0c;并且知道minicom的一般配置和使用方法。这是“MacOS minicom 无法设置921600”的解决问题记录。它在以下环境中设置成功&#…

【制作npm包1】申请npm账号、认识个人包和组织包

概述 在开发当中经常有一种现象&#xff0c;重复代码写了N多遍&#xff0c;再次写同样的逻辑就再次翻查以前的代码逻辑。效率低下且容易出错&#xff0c;封装一个npm包的价值也不仅仅是给别人用&#xff0c;封装一套属于自己或者本部门的npm包也是相当有必要。 也许经常看到一…

fastadmin 自定义按钮弹窗不是异步xhr提交

遇到一个奇怪的问题 按官方文档要求&#xff0c;js中也重新绑定事件了 但弹窗出来的表单还不是xhr提交&#xff0c;这为什么&#xff1f; 经过我不断测试发现&#xff0c;如上述的方法名不能带有下划线&#xff0c;蛇形大小写&#xff0c;否则一律不生效。 浪费了我一天半时…

小程序CSS button按钮自定义高度之后不居中

问题&#xff1a; 按钮设置高度后不居中 <view><button class"btn1" size"">Save</button> </view> page {font-size: 30rpx; }.btn1 {margin-top: 100rpx;height: 190rpx;background: linear-gradient(90deg, #FF8A06, #FF571…

webSocket 聊天室 node.js 版

全局安装vue脚手架 npm install vue/cli -g 创建 vue3 ts 脚手架 vue create vue3-chatroom 后端代码 src 同级目录下建 server: const express require(express); const app express(); const http require(http); const server http.createServer(app);const io req…

微信小程序:模板使用

目录 模板的优点&#xff1a; 一、静态模板创建 二、静态模板使用 1.*.wxml引入模板 2.模板使用 3.*.wxss引入模板的样式 三、动态模板创建 四、动态模板使用 1.*.wxml引入模板 2.模板使用 3.*.js定义动态数据 五、结果展示 总结 模板的优点&#xff1a; 有利于保持网…

第二章 搜索 No.1BFS之Flood Fill与最短路模型

文章目录 Flood Fill算法1097. 池塘计数1098. 城堡问题1106. 山峰和山谷 最短路模型1076. 迷宫问题188. 武士风度的牛1100. 抓住那头牛 Flood Fill算法 可以在线性时间复杂度内&#xff0c;找到某个点所在的连通块 想象一个矩阵&#xff0c;有洼地和高地&#xff0c;选择一个洼…