Canal同步mysql binlog至pulsar

news2025/1/10 5:55:27

Canal

一、简介

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vMmnZjnN-1670828694955)(/Users/hanwang/Library/Application Support/typora-user-images/image-20221212143319290.png)]

canal [kə’næl],主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

二、工作原理

在这里插入图片描述

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

三、部署

1、准备

  • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    
    • 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    

2、配置并启动

2.1、下载 canal, 以最新版1.1.6 版本为例

wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

2.2、解压缩

mkdir /opt/canal/canal.deployer
tar zxvf canal.deployer-1.1.6.tar.gz  -C /opt/canal/canal.deployer

解压完成后,进入/opt/canal/canal.deployer 目录,可以看到如下结构

drwxr-xr-x 2 root root   93 12月 12 14:28 bin
drwxr-xr-x 6 root root  179 12月 12 14:28 conf
drwxr-xr-x 2 root root 4096 12月 12 11:41 lib
drwxrwxrwx 5 root root   62 12月 12 14:05 logs
drwxrwxrwx 2 root root  235 8月  11 10:52 plugin

2.3、Canal server配置修改

$ cat  canal.properties
#################################################
######### 		common argument		#############
#################################################
# tcp bind ip
这里修改为canal服务ip地址
canal.ip = 192.168.150.138
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
这里修改为pulsarMQ
canal.serverMode = pulsarMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
######### 		destinations		#############
#################################################
这里可以设置自定义目录(即instance名称),需要在该目录下添加配置文件instance.properties
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 	      MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### 		     Kafka 		     #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
######### 		    RocketMQ	     #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =

##################################################
######### 		    RabbitMQ	     #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =


##################################################
######### 		      Pulsar         #############
##################################################
这里修改为对应的pulsar地址
pulsarmq.serverUrl = pulsar://192.168.150.139:6650
pulsarmq.roleToken =
pulsarmq.topicTenantPrefix = public/default

2.4、Canal instance配置修改:

vi conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
需要改成自己的数据库信息
canal.instance.master.address=192.168.150.140:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
需要改成自己的数据库信息
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
需要同步的数据库或数据库表
canal.instance.filter.regex=canal\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
自定义pulsar中topic的名字
canal.mq.topic=sync-mysql-bin-test1
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
  • canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
  • 如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false

2.5、启动

sh bin/startup.sh

2.6、查看 server 日志

tail logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

2.7、查看 instance 的日志

vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

2.8、该数据库里有binlog变更后,查看队列日志

$ tail  /opt/canal/canal.deployer/logs/example/meta.log
2022-12-12 14:29:01.169 - clientId:1001 cursor:[mysql-bin.000002,40134,1670826065000,1,] address[/111.111.0.117:3306]
2022-12-12 14:29:20.169 - clientId:1001 cursor:[mysql-bin.000002,40401,1670826559000,1,] address[KD111111000117.ppp-bb.dion.ne.jp/192.168.150.140:3306]
2022-12-12 14:29:27.169 - clientId:1001 cursor:[mysql-bin.000002,40669,1670826566000,1,] address[KD111111000117.ppp-bb.dion.ne.jp/192.168.150.140:3306]
2022-12-12 14:29:35.169 - clientId:1001 cursor:[mysql-bin.000002,40937,1670826574000,1,] address[KD111111000117.ppp-bb.dion.ne.jp/192.168.150.140:3306]
2022-12-12 14:29:44.169 - clientId:1001 cursor:[mysql-bin.000002,41205,1670826583000,1,] address[KD111111000117.ppp-bb.dion.ne.jp/192.168.150.140:3306]
2022-12-12 14:30:25.169 - clientId:1001 cursor:[mysql-bin.000002,41473,1670826624000,1,] address[KD111111000117.ppp-bb.dion.ne.jp/192.168.150.140:3306]
2022-12-12 14:30:32.169 - clientId:1001 cursor:[mysql-bin.000002,41741,1670826631000,1,] address[KD111111000117.ppp-bb.dion.ne.jp/192.168.150.140:3306]
2022-12-12 14:30:44.169 - clientId:1001 cursor:[mysql-bin.000002,42008,1670826643000,1,] address[KD111111000117.ppp-bb.dion.ne.jp/192.168.150.140:3306]

2.9、查看puslar中topic是否创建或有数据进入

bin/pulsar-admin topics list public.default

2.10、关闭

sh bin/stop.sh

3、参考

  • https://github.com/alibaba/canal
  • https://github.com/alibaba/canal/wiki/QuickStart

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/82877.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构】队列(链式队列)

作者:一个喜欢猫咪的的程序员 专栏:《数据结构》 喜欢的话:世间因为少年的挺身而出,而更加瑰丽。 ——《人民日报》 目录 队列的结构和概念: Queue.h文件 Queue.c文件 Test.c文件&am…

ElasticSearch分布式架构原理

一个ES集群中有多个Server节点,每个Server节点中含有多个Index。 主节点(Master) 主资格节点的主要职责是和集群操作相关的内容,如创建或删除索引,跟踪哪些节点是集群的一部分,并决定哪些分片分配给相关的…

文件系统篇

目录 1.文件系统的基本组成 1.1.文件 1.1.1.目录项和目录是一个东西吗? 1.1.2.那文件数据是如何存储在磁盘的呢? 2.page cache 2.1.进程写文件时,进程发生了崩溃,已写入的数据会丢失吗 2.2.page cache是什么? …

Docker:网络配置

目录 一、网络模式简介 二、bridge模式以及host模式的命令演示 bridge模式 host模式 三、自定义网络 一、网络模式简介 Docker在创建容器时有四种网络模式:bridge/host/container/none,bridge为默认不需要用--net去指定,其他三种模式需要…

CAS机制和synchronize的原理及其优化机制(锁消除 偏向锁 自旋锁 膨胀锁 锁粗化)

乐观锁的问题:并不总是能处理所有问题,所以会引入一定的系统复杂度。 读写锁 把加锁操作分成了俩种 一是读锁二是写锁 也就是说在读和读之间是没有互斥的 但是在读写和写写之间就会存在互斥 如果一个场景是一写多度 那么使用这个效率就会很高 重量级锁…

【数据结构与算法】B_树

目录 前言: 一、B树 1、B树概念 2、B树查找 3、B树插入 4、B树前序遍历 5、B树性能 二、B、B*树 1、B树概念 2、B树的插入 2、B*树概念 3、总结 三、B系列树的应用 总结 前言: 我们已经有很多索引的数据结构了 例如: 顺序查找 …

CAD外部参照文件的分解

最近遇到一个编图要求: “图纸文件的内容主要由模型空间和布局空间内的信息组成,尽量减少外部参照的使用。” 我们的综合图分幅主要依照外部参照来的,图件的本体只有1个,分幅图中只有布局试图有点线面等实体存在,模型…

阿里二面:用过GC日志可视化工具进行JVM调优吗?

上周有个小伙伴面了阿里,在二面中被问到GC日志分析,感觉回答的不是很好,过来找我复盘,大致听了他的回答,虽然回答出了部分,但是没抓到重点。 GC日志分析算是JVM调优中比较难的部分,今天这篇文章…

0123 双指针 Day12

剑指 Offer 25. 合并两个排序的链表 输入两个递增排序的链表,合并这两个链表并使新链表中的节点仍然是递增排序的。 示例1: 输入:1->2->4, 1->3->4 输出:1->1->2->3->4->4 /*** Definition for si…

安科瑞嵌入式多功能计量表AEM96 精度0.5S级 2-31次分次谐波

安科瑞 王晶淼/刘芳 一、产品概述 AEM系列三相嵌入式电能计量表是一款主要针对电力系统、工矿企业、公用设施的电能统计、管理需求而设计的智能电能表,集成三相电力参数测量及电能计量与考核管理,提供上24时,上31日以及上12月的电能数据统计…

DNS 区域传送漏洞(dns-zone-tranfer)学习

DNS 区域传送漏洞(dns-zone-tranfer)学习 ———— 相关知识理解 DNS(域名系统)就像一个互联网电话簿。它负责将人类可读的主机名解析为机器可读的 IP 地址。 DNS服务器分为主服务器,备份服务器,缓存服务…

【Docker】多个容器和宿主机之间如何进行数据同步和数据共享?容器数据卷从介绍到常用操作图文教程

专栏往期文章 《Docker是什么?Docker从介绍到Linux安装图文详细教程》《30条Docker常用命令图文举例总结》《Docker如何构建自己的镜像?从镜像构建到推送远程镜像仓库图文教程》 前言 你是否担心 Docker 容器被删除后,容器内的重要数据就丢…

VFIDILDKVENAIHNAAQVGIGFAKPFEKLINPK,果蝇抗菌肽

果蝇抗菌肽是一种含有Lys的抗菌多肽,序列中包含34个氨基酸,是一种含有α-折叠的抗菌多肽。 编号: 223981中文名称: 果蝇抗菌肽,Andropin英文名: Antimicrobial Peptide Andropin单字母: H2N-VFIDILDKVENAIHNAAQVGIGFAKPFEKLINPK-OH三字母: H2…

自然语言处理(NLP)数据集汇总 3(附下载链接)

🎄🎄【自然语言处理NLP】简介 🎄🎄 自然语言处理(Natural Language Processing, NLP)是计算机科学领域与人工智能领域中的一个重要方向。它研究能实现人与计算机之间用自然语言进行有效通信的各种理论和方法。自然语言处理是一门…

Allure:根据step动态设置description

背景 使用pytestAllure进行自动化的时候,为了报告展示更明确会使用 with allure.step(xxx)和 allure.step(xxx)测试结束后就可以看到 测试步骤 Allure还支持配置Description 之前直接在case中编写,例如 """ [用例描述]: 专家问诊 [前置步骤]:1. 打开h5页面…

ScheduledExecutorService的使用及守护线程

只运行一次 private static ScheduledExecutorService scheduler; public static void main(String[] args) throws Exception { scheduler Executors.newScheduledThreadPool(5); // 循环任务,按照上一次任务的发起时间计算下一次任务的开始时间 scheduler.schedu…

解决mysql存储emoji表情唯一索引报错问题

问题发现: 1、正常上班的一天,突然间有运营同事反馈,我们在添加数据的时候,发现添加了🐸之后,对应的💩没有了,添加了💩然后🐸就没有了,需要研发帮…

【Linux】四、Linux 进程概念(四)|进程地址空间

目录 十、进程地址空间 10.1 回顾C/C 地址空间 10.2 测试 10.3 感性理解虚拟地址空间 10.4 如何画大饼? 10.5 如何理解区域划分和区域调整 10.6 虚拟地址空间、页表和物理地址 10.7 为什么存在地址空间 10.7.1 保证物理内存的安全性 10.7.2 保证进程的独立…

铁蛋白-海藻酸钠纳米包埋ACE抑制肽|海藻酸钠修饰碳纳米管(SAL-MWNTs)

铁蛋白-海藻酸钠纳米包埋ACE抑制肽|海藻酸钠修饰碳纳米管(SAL-MWNTs) 铁蛋白-海藻酸钠纳米包埋ACE抑制肽产品描述:利用铁蛋白在较酸条件下可逆组装特性和海藻酸钠(sodium alginate,SA)的控释作用,以马脾脱铁铁蛋白(horse spleen apoferritin,HSF)和SA作为纳米载体,…

Rust 基础(八)—— 高级特性

十九、高级特性 到目前为止,您已经学习了Rust编程语言最常用的部分。在我们开始第20章的下一个项目之前,我们先来看一下你可能偶尔会碰到,但不是每天都在使用的语言的一些方面。当你遇到任何未知的情况时,你可以使用这一章作为参…