Canal+Kafka实现Mysql数据同步

news2025/1/17 0:05:10

Canal介绍

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。

canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。

阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。

canal主要用途是基于 MySQL 数据库增量日志解析,并能提供增量数据订阅和消费,应用场景十分丰富。

目前canal主要支持mysql数据库。

github地址:https://github.com/alibaba/canal

版本下载地址:https://github.com/alibaba/canal/releases

文档地址:https://github.com/alibaba/canal/wiki/Docker-QuickStart

Canal应用场景

1)、电商场景下商品、用户实时更新同步到至Elasticsearch、solr等搜索引擎;
2)、价格、库存发生变更实时同步到redis;
3)、数据库异地备份、数据同步;
4)、代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。

image.png

MySQL主从复制原理

1)、MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
2)、MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
3)、MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

Canal工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

image.pngCanal安装

参考文档:https://github.com/alibaba/canal/wiki/QuickStart

Canal配置

mq相关参数说明 (>=1.1.5版本)

在1.1.5版本开始,引入了MQ Connector设计,参数配置做了部分调整

参数名

参数说明

默认值

canal.aliyun.accessKey

阿里云ak

canal.aliyun.secretKey

阿里云sk

canal.aliyun.uid

阿里云uid

canal.mq.flatMessage

是否为json格式 如果设置为false,对应MQ收到的消息为protobuf格式 需要通过CanalMessageDeserializer进行解码

false

canal.mq.canalBatchSize

获取canal数据的批次大小

50

canal.mq.canalGetTimeout

获取canal数据的超时时间

100

canal.mq.accessChannel = local

是否为阿里云模式,可选值local/cloud

local

canal.mq.database.hash

是否开启database混淆hash,确保不同库的数据可以均匀分散,如果关闭可以确保只按照业务字段做MQ分区计算

true

canal.mq.send.thread.size

MQ消息发送并行度

30

canal.mq.build.thread.size

MQ消息构建并行度

8

kafka.bootstrap.servers

kafka服务端地址

127.0.0.1:9092

kafka.acks

kafka为ProducerConfig.ACKS_CONFIG

all

kafka.compression.type

压缩类型

none

kafka.batch.size

kafka为ProducerConfig.BATCH_SIZE_CONFIG

16384

kafka.linger.ms

kafka为ProducerConfig.LINGER_MS_CONFIG , 如果是flatMessage格式建议将该值调大, 如: 200

1

kafka.max.request.size

kafka为ProducerConfig.MAX_REQUEST_SIZE_CONFIG

1048576

kafka.buffer.memory

kafka为ProducerConfig.BUFFER_MEMORY_CONFIG

33554432

kafka.max.in.flight.requests.per.connection

kafka为ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION

1

kafka.retries

发送失败重试次数

0

kafka.kerberos.enable

kerberos认证

false

kafka.kerberos.krb5.file

kerberos认证

../conf/kerberos/krb5.conf

kafka.kerberos.jaas.file

kerberos认证

../conf/kerberos/jaas.conf

rocketmq.producer.group

rocketMQ为ProducerGroup名

test

rocketmq.enable.message.trace

是否开启message trace

false

rocketmq.customized.trace.topic

message trace的topic

rocketmq.namespace

rocketmq的namespace

rocketmq.namesrv.addr

rocketmq的namesrv地址

127.0.0.1:9876

rocketmq.retry.times.when.send.failed

重试次数

0

rocketmq.vip.channel.enabled

rocketmq是否开启vip channel

false

rocketmq.tag

rocketmq的tag配置

空值

rabbitmq.host

rabbitMQ配置

rabbitmq.virtual.host

rabbitMQ配置

rabbitmq.exchange

rabbitMQ配置

rabbitmq.username

rabbitMQ配置

rabbitmq.password

rabbitMQ配置

rabbitmq.deliveryMode

rabbitMQ配置

pulsarmq.serverUrl

pulsarmq配置

pulsarmq.roleToken

pulsarmq配置

pulsarmq.topicTenantPrefix

pulsarmq配置

canal.mq.topic

mq里的topic名

canal.mq.dynamicTopic

mq里的动态topic规则, 1.1.3版本支持

canal.mq.partition

单队列模式的分区下标,

1

canal.mq.enableDynamicQueuePartition

动态获取MQ服务端的分区数,如果设置为true之后会自动根据topic获取分区数替换canal.mq.partitionsNum的定义,目前主要适用于RocketMQ

false

canal.mq.partitionsNum

散列模式的分区数

canal.mq.dynamicTopicPartitionNum

mq里的动态队列分区数,比如针对不同topic配置不同partitionsNum

canal.mq.partitionHash

散列规则定义 库名.表名 : 唯一主键,比如mytest.person: id 1.1.3版本支持新语法,见下文

canal.mq.dynamicTopic 表达式说明

canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔

  • 例子1:test\\.test 指定匹配的单表,发送到以test_test为名字的topic上
  • 例子2:.*\\..* 匹配所有表,则每个表都会发送到各自表名的topic上
  • 例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
  • 例子4:test\\..* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
  • 例子5:test,test1\\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值

为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table

  • 例子1: test:test\\.test 指定匹配的单表,发送到以test为名字的topic上
  • 例子2: test:.*\\..* 匹配所有表,因为有指定topic,则每个表都会发送到test的topic下
  • 例子3: test:test 指定匹配对应的库,一个库的所有表都会发送到test的topic下
  • 例子4:testA:test\\..* 指定匹配的表达式,针对匹配的表会发送到testA的topic下
  • 例子5:test0:test,test1:test1\\.test1,指定多个表达式,会将test库的表都发送到test0的topic下,test1\\.test1的表发送到对应的test1的topic下,其余的表发送到默认的canal.mq.topic值

大家可以结合自己的业务需求,设置匹配规则,建议MQ开启自动创建topic的能力

canal.mq.partitionHash 表达式说明

canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔

  • 例子1:test\\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
  • 例子2:.*\\..*:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
  • 例子3:.*\\..*:$pk$ 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
  • 例子4: 匹配规则啥都不写,则默认发到0这个partition上
  • 例子5:.*\\..* ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
    • 按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)

  • 例子6: test\\.test:id,.\\..* , 针对test的表按照id散列,其余的表按照table散列

注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)

其他详细参数可参考Canal AdminGuide

mq顺序性问题

binlog本身是有序的,写入到mq之后如何保障顺序是很多人会比较关注,在issue里也有非常多人咨询了类似的问题,这里做一个统一的解答

  1. 1.

    canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择

  2. 2.

    canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区

  • canal.mq.dynamicTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默认topic name
  • canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做到按表级做分区、pk级做分区等
  1. 1.

    canal的消费顺序性,主要取决于描述2中的路由选择,举例说明:

  • 单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢,单分区的性能写入大概在2~3k的TPS
  • 多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题
  • 单topic、多topic的多分区,如果用户选择的是指定table的方式,那和第二部分一样,保障的是表级别的顺序性(存在热点表写入分区的性能问题),如果用户选择的是指定pk hash的方式,那只能保障的是一个pk的多次binlog顺序性 ** pk hash的方式需要业务权衡,这里性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产生业务处理错乱的情况. 如果有pk变更,pk变更前和变更后的值会落在不同的分区里,业务消费就会有先后顺序的问题,需要注意

性能表现

Kafka + 混合DML场景测试

场景

1个topic + 单分区

1个topic+3分区

2个topic+1分区

2个topic+3分区

不开启flatMessage

29.6k rps (9.71k tps)

17.54k rps (6.53k tps)

21.6k rps (7.9k tps)

16.8k rps (5.71k tps)

开启flatMessage

11.79k rps (4.36k tps)

15.97 rps (5.94k tps)

11.91k rps (4.45k tps)

16.96k rps (6.26k tps)

Kafka + 单表的batch insert场景测试

场景

1个topic + 单分区

1个topic+3分区

不开启flatMessage

59.6k rps

45.1k rps

开启flatMessage

51.3k rps

49.6k rps


RocketMQ + 混合DML场景测试

场景

1个topic + 单分区

1个topic+3分区

2个topic+1分区

2个topic+3分区

不开启flatMessage

29.6k rps (10.71k tps)

23.3k rps (8.59k tps)

26.7k rps (9.46k tps)

21.7k rps (7.66k tps)

开启flatMessage

16.75k rps (6.17k tps)

14.96k rps (5.55k tps)

17.83k rps (6.63k tps)

16.93k rps (6.26k tps)

RocketMQ + 单表的batch insert场景测试

场景

1个topic + 单分区

1个topic+3分区

不开启flatMessage

81.2k rps

51.3k rps

开启flatMessage

62.6k rps

57.9k rps

附录:

canal官方文档:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

Canal+MQ性能表现:https://github.com/alibaba/canal/wiki/Canal-MQ-Performance

参考文档:https://www.cnblogs.com/zwh0910/p/17043265.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/857954.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

什么是NVME

1. 概念 NVM Express(NVMe),或称非易失性内存主机控制器接口规范(Non-Volatile Memory express),,是一个逻辑设备接口规范。他是与AHCI类似的、基于设备逻辑接口的总线传输协议规范(相当于通讯协议中的应用层&#xf…

全开源跨境电商一键铺货货源平台--后台数据采集功能

数据库设计在设计数据库时,需要考虑到以下信息: 货源信息:包括货源标题、价格、描述、图片等信息。 用户信息:包括用户名、密码、邮箱、电话等信息。 订单信息:包括订单号、用户信息、货源信息、支付信息等。 支付信息…

磁盘的管理

一、磁盘的分区 查看磁盘 lsblk fdisk -l 2、分区 没有e扩展,则都是主分区,已经有三个主分区了,剩下的全设置为扩展 查看分区结果: 二、格式化 三、挂载

Nginx1.24源码安装与部署

环境依赖包 zlib zlib-devel pcre pcre-devel 下载nginx安装包 1、下载nginx [rootnode01 ~]# wget http://nginx.org/download/nginx-1.24.0.tar.gz --2023-07-18 16:49:00-- http://nginx.org/download/nginx-1.24.0.tar.gz Resolving nginx.org (nginx.org)... 3.125.19…

Games101学习笔记1

2023-08-10开始接触图形学 Lecture 01--Overview of Computer Graphics Lecture 02 Review of Linear Algebra

UBIFS文件系统

https://blog.csdn.net/shichaog/article/details/45932339 引言 什么是UBIFS文件系统 UBIFS是UBI file system的简称,用于裸的flash设备,作为jffs2的后继文件系统之一。UBIFS通过UBI子系统处理与MTD设备之间动作。UBIFS文件系统更适合MLCNAND FLASH。…

第三章 图论 No.9有向图的强连通与半连通分量

文章目录 定义Tarjan求SCC1174. 受欢迎的牛367. 学校网络1175. 最大半连通子图368. 银河 定义 连通分量是无向图的概念,yxc说错了,不要被误导 强连通分量:在一个有向图中,对于分量中的任意两点u,v,一定能从…

ASP.NET Core中间件记录管道图和内置中间件

管道记录 下图显示了 ASP.NET Core MVC 和 Razor Pages 应用程序的完整请求处理管道 中间件组件在文件中添加的顺序Program.cs定义了请求时调用中间件组件的顺序以及响应的相反顺序。该顺序对于安全性、性能和功能至关重要。 内置中间件记录 内置中间件原文翻译MiddlewareDe…

生产排查org.apache.http.NoHttpResponseException: 127.0.0.1:9000 failed to respond

生产环境,请求方调用我方地址,发生异常NoHttpResponseException,错误详情: org.apache.http.NoHttpResponseException: 127.0.0.1:9000 failed to respondat org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(Def…

C++核心编程:函数提高

函数默认参数 在C中&#xff0c;函数的形参列表是可以有默认值的。 语法&#xff1a; 返回值类型 函数名 (参数 默认值){}示例&#xff1a; #include<iostream> using namespace std;//函数默认参数//如果我们传入了自己的数据就使用自己的数据&#xff0c;否则就是…

联合体union

结构体中的成员彼此是独立存在的&#xff0c;分布在不同的内存单元中 共用体的成员是“一体的”&#xff0c;使用同一个内存单元 #include<stdio.h> int main() {union u {int n;char c[4];};union u u1;u1.n 0x11223344;printf("%x\n", u1.n); …

软工导论知识框架(六)面向对象分析

前言&#xff1a;绘制各种类型的图是重点&#xff0c;对于面向对象建模中需要绘制的图总结在第五期中&#xff1a; ​软工导论知识框架&#xff08;五&#xff09;面向对象方法学 一.分析过程 1.获取需求 与用户交谈&#xff0c;向用户提问题&#xff1b; 参观用户的工作流…

YOLOv5可视化界面

Pyside6可视化界面 安装Pyside6 激活之前的虚拟环境yolov5 在该环境的终端输入以下命令 pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyside6输入where python找到当前使用的Python的路径 找到该路径下的designer.exe文件&#xff08;/Lib/site-packages/PySi…

IP地址定位技术在电商行业的应用

IP地址定位技术在电商行业的广泛应用引起了行业内外的广泛关注。该技术能够提供精确的位置信息&#xff0c;对于电商运营商而言有助于提供更加个性化和便捷的购物体验。 IP地址定位是一种通过Internet协议&#xff08;IP&#xff09;地址识别用户地理位置的技术。在电商行业中&…

windows录屏指南,最详细的教程来了!

在现代科技时代&#xff0c;录屏成为了一种非常常见的技术&#xff0c;它可以让我们轻松记录电脑屏幕上的内容&#xff0c;包括教学、演示、游戏体验等。windows作为全球最流行的操作系统之一&#xff0c;自然也提供了多种方式来录制屏幕。本文将为您介绍三种windows录屏方法&a…

B2B2C多用户手机购物商城快速搭建(java开源)

要快速搭建一个B2B2C多用户手机购物商城&#xff0c;需要使用Java语言和开源框架进行开发。以下是一个基本的搭建步骤&#xff1a; 选择合适的开发框架 首先需要选择一个适合开发B2B2C多用户手机购物商城的Java开源框架&#xff0c;它提供了丰富的功能模块和灵活的扩展性&…

cesium学习记录05-加载数据

1. 矢量数据&#xff1a; 1.1. GeoJSON 定义&#xff1a; 一个基于JSON的地理数据格式&#xff0c;Cesium支持GeoJSON的直接加载。 例子&#xff1a; 加载一个简易故宫建筑的GeoJSON数据。 代码&#xff1a; /*** 添加故宫geojson数据*/AddGuGong() {var viewer this.v…

基于百度语音识别API智能语音识别和字幕推荐系统——深度学习算法应用(含全部工程源码)+测试数据集

目录 前言总体设计系统整体结构图系统流程图 运行环境模块实现1. 数据预处理2. 翻译3. 格式转换4. 音频切割5. 语音识别6. 文本切割7. main函数 系统测试工程源代码下载其它资料下载 前言 本项目基于百度语音识别API&#xff0c;结合了语音识别、视频转换音频识别以及语句停顿…

从数据仓库到数据结构:数据架构的演变之路

在上个世纪&#xff0c;从电子商务巨头到医疗服务机构和政府部门&#xff0c;数据已成为每家组织的生命线。有效地收集和管理这些数据可以为组织提供宝贵的洞察力&#xff0c;以帮助决策&#xff0c;然而这是一项艰巨的任务。 尽管数据很重要&#xff0c;但CIOinsight声称&…

【C++】AVL树模拟实现插入功能

AVL树模拟实现插入 前言正式开始树节点树insert旋转左单旋右单旋左右双旋右左双旋 用旋转来平衡树测试 前言 本篇主要介绍AVL树的插入功能。其中就包含了最重要的旋转。 通过旋转来使得树平衡&#xff0c;是学习AVL树的一个重点&#xff0c;也是也是一个难点。 正式开始 先…