Canal实战使用(集群部署)和原理解析

news2025/1/10 17:16:40

1.mysql数据同步工作原理

 

MySQL master将数据变更写入二进制日志(binary log,其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

MySQL slave将master的binary log events拷贝到它的中继日志(relay log)

MySQL slave重放relay log中事件,将数据变更反映它自己的数据

2.canal工作原理

canal模拟mysql数据同步的原理,伪装自己为mysql slave,向mysql master发送dump协议。

mysql master收到dump请求,开始推送binary log给slave(canal)

使用步骤

1.数据库需要开启binlog

show variables like 'binlog_format'
show variables like 'log_bin'

显示未on则开启。

查看binlog日志


show binlog events in 'DJT2TRG3-bin.000433';

可以查找执行操作的position

可以通过修改canal的meta.dat文件 修改位置 从之前的位置开始同步操作。

下载canal 服务端 canal deployer 和服务端 canal adapter。

下载完成解压后,修改配置文件,由于主要的内容就是修改配置文件,以下贴出我使用的配置:

服务端配置文件canal_local.properties:

# register ip
canal.register.ip = 127.0.0.1

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = canal-cluster-1
canal.admin.register.name = canal-server-2

canal.properties(可以在canal-admin中配置)

#################################################
######### 		common argument		#############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info 如果不配置的话 canal使用的数据存在内置数据库H2,重启等会丢失数据
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
######### 		destinations		#############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 	      MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### 		     Kafka 		     #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
######### 		    RocketMQ	     #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag = 

##################################################
######### 		    RabbitMQ	     #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =

Instance.properties

# MySQL 集群配置中的 serverId 概念,需要保证和当前 MySQL 集群中 id 唯一 (v1.0.26+版本之后 canal 会自动生成,不需要手工指定)
# canal.instance.mysql.slaveId=0
# 源库地址
canal.instance.master.address=11.17.6.185:4679

# 方式一:binlog + postion 如果用方式一 也可以不用填position 和binlog canal会自动获取
# mysql源库起始的binlog文件
# canal.instance.master.journal.name=mysql-bin.000008
# mysql主库链接时起始的binlog偏移量
# canal.instance.master.position=257890708 
# mysql主库链接时起始的binlog的时间戳
# canal.instance.master.timestamp=

# 方式二:gtid(推荐)
# 启用 gtid 方式同步
canal.instance.gtidon=true
# gtid
canal.instance.master.gtid=92572381-0b9c-11ec-9544-8cdcd4b157e0:1-1066141


# 源库账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# MySQL 数据解析编码
canal.instance.connectionCharset = UTF-8

# MySQL 数据解析关注的表,Perl 正则表达式
canal.instance.filter.regex=acpcanaldb.*

# MySQL 数据解析表的黑名单,过滤掉不同步的表
# canal.instance.filter.black.regex=mysql\\.slave_.*

客户端配置:

my_test.yml(adapter\conf\rdb目录下可以添加多个.yml,都会读取)

dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
#dbMapping:
#  database: account
#  table: demo
#  targetTable: demo
#  targetPk:
#    id: id
#  mapAll: true
#  targetColumns:
#    id:
#    name:
#    test:
#    role_id:
#    c_time:
#    test1:
#  etlCondition: "where c_time>={}"
#  commitBatch: 3000 # 批量提交的大小


## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
##使用mirror方式 整个库的改动都会同步 但是必须两个库保持一致,不同的已存在表不会处理,另外因为是
##两个同名库,所以必须两个服务器上(两个ip)搭建mysql
##如果不用mirror方式,只会同步一个表,新增表添加yml文件,比较灵活
dbMapping:
  mirrorDb: true
  database: account

配置完成后,先启动服务端(省略zk安装,较为简单可以自己百度),然后启动客户端,新建表添加数据进行测试。需要注意如果要同步DDL语句,必须用mirror方式,否则只能同步dml单表记录。

下边是运行截图:

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/492563.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java基础(十八)网络编程

1. 网络编程概述 Java是 Internet 上的语言,它从语言级上提供了对网络应用程序的支持,程序员能够很容易开发常见的网络应用程序。 Java提供的网络类库,可以实现无痛的网络连接,联网的底层细节被隐藏在 Java 的本机安装系统里&am…

软件测试培训了几个月,找到工作了,面试经验分享给各位

面试问的一些基本问题 功能方面:问的最多的就是测试流程,测试计划包含哪些内容,公司人员配置,有bug开发认为不是 bug怎么处理,怎样才算是好的用例,测试用例设计方法(等价类,边界值等…

无线之红外线技术的组网方式详解

红外线(Infrared rays)也是一种光线,由于它的波长比红色光750nm)还长,超出了人眼可以识别的可见光)范围,所以我们看不见它,又称为红外热辐射(Infrared radiation),通常把波长为0.75~1000μm的光…

腾讯高工手写13W字“Netty速成手册”,3天走向实战

前言 在java界,netty无疑是开发网络应用的拿手菜。你不需要太多关注复杂的nio模型和底层网络的细节,使用其丰富的接口,可以很容易的实现复杂的通讯功能。 作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游…

【Linux0.11代码分析】02 之 bootsect.s 启动流程

【Linux0.11代码分析】02 之 bootsect.s 启动流程 一、boot\bootsect.s1.1 将bootsect.s 从0x7c00 移动到 0x90000 (512byte)1.2 使用 int 0x13 中断加载 setup.s 程序到 0x902001.3 获取并解析磁盘驱动器的参数1.4 开始加载 System模块到 0x10000 地址1…

计算机网络之应用层

文章目录 应用层1. 应用层协议原理1.1 网络应用程序体系结构1.2 进程通信1.3 可供程序使用的运输服务1.4 因特网提供的运输服务1.5 应用层协议 2.Web应用和HTTP协议2.1 HTTP概况2.2 非持久连接和持久连接2.3 HTTP报文格式2.4 用户服务器的交互:cookie2.5 Web缓存器 …

数字医疗算法应用创新大赛-食品与疾病关系预测算法赛道top5方案分享

一、比赛地址和背景 1.1 比赛地址 比赛地址 1.2 数据说明及任务 本次算法赛将提供超过 23.5W 的食物、疾病对应关系及其量化得分,其中食物特征超过 200 个,疾病特征由 3 种不同的方式抽取,累积超过 4000 个特征信息。初赛为 0、1 二分类预…

递归递推练习题

1.用递归的方法123…N的值(in:5,out:15) 2.输出斐波那契数列的第N项,0,1,1,2,3,5,8,13…(in:3,out:1) 3.求n!(in:5,out:120)n!123……

STM32启动/下载模式

启动模式 主闪存存储器区存放用户设计的代码,代码烧录到这里,一般情况在正常工作时需要把STM32设置为从这里启动。 系统存储器区用来实现ISP功能的。 内嵌SRAM区用来实现调试器调试功能的。 ISP ISP一般由PC机通过串口把bin/hex文件直接烧写到单片机内…

java并发编程之美第五章读书笔记

java并发包中并发List源码剖析 介绍 CopyOnWriteArrayList 线程安全的ArrayList,对其进行的修改操作都是在底层的一个复制的数组(快照)进行的,也就是写时复制策略 类图 每一个对象里面有一个array数组进行存放具体的元素,ReentrantLock独占锁对象用来保证同时只有一个线程对…

Maven中optional标签详解

这一篇文章应该是全网讲解optional最细致的,因为我都是拿我自己的试验来证明,从试验当中也发现了很多出乎意外的知识!感兴趣的跟着小编一块来学习呀! 目录 一、前言二、依赖传递代码演示三、是否会影响父子工程之间的依赖继承呢&a…

精致女孩必备的6款APP,内外兼修,提升气质

自立自强、敢于追求、内外兼修,这样的女孩不管在哪个时代都是非常有魅力的! 在我看来,自律、上进、情商高、会打扮、会赚钱、独立、落落大方,这才是精致女孩该有的样子。 不墨迹,速速揭开6款宝藏APP的面纱&#xff0…

石头科技2022年营收实现双位数增长,以技术实力打响创新价值战

近日,石头科技披露了2022年度财务报告,报告显示,在在较大内外部压力下,石头科技2022年营收依然实现双位数增长,且境内外销售收入平稳增长。 该公司在近年来不断完善其产品矩阵,目前已推出手持无线吸尘、商…

数据结构 (入门必看)

一、 预备知识 (一)数据结构前言 1、学习C语言是如何写程序,学习数据结构如何简洁高效的写程序 2、遇到一个实际问题,需要写程序,需要解决两个方面的问题 1)如何表达数据之间的逻辑规律以及如何将数据存储到计算机中 数据结构…

多数据源 使用 flyWay 进行数据库管理

文章目录 前言多数据源 使用 flyWay 进行数据库管理1. 环境2. flyway版本 与 MySQL 版本 对应关系3. flyway 脚本文件命名方式4. flyway工作流程5. 知识点补充6. 集成的时候常见错误6.1. user_variables_by_thread没有访问权限6.2. MySQL不支持Flyway社区版,只支持F…

68元工业级双核A7,全新T113核心板震撼上市!

万象奥科全新T113核心板震撼上市,仅68元、工业级品质、双核A7处理器、外设丰富,诠释极致性价比! 参数速递 1.全志T113工业级处理器,主频1.2GHz 2.双核Cortex-A7玄铁C906 RISC-V HiFi4 DSP异构多核 3.内存128MB/256MB/512MB&a…

【Kafka】概述与集群部署

文章目录 Kafka概述定义应用场景缓冲/削峰解耦异步通信 应用模式点对点模式发布/订阅模式 基础架构 Kafka集群部署集群规划下载解压修改配置文件分发安装包hadoop103、hadoop104修改配置文件配置环境变量启动集群先启动Zookeeper集群然后启动Kafka 关闭集群 集群启停脚本脚本编…

QT自学笔记2:使用介绍(函数)

一、 setAttribute(按F1)—>void QWidget::setAttribute(Qt::WidgetAttribute attribute, bool on true) —>Qt::WidgetAttribute attribute(有一个属性) ----> p->setAttribute(Qt::WA_DeleteOnClose); MainWind…

眼见为实,来瞧瞧MySQL中的隐藏列!

在介绍mysql的多版本并发控制MVCC的过程中,我们提到过mysql中存在一些隐藏列,例如行标识、事务ID、回滚指针等,不知道大家是否和我一样好奇过,要怎样才能实际地看到这些隐藏列的值呢? 本文我们就来重点讨论一下诸多隐…

ThingsBoard集群部署之k8s

1、概述 今天终于有时间去搞这个啦,拖了很久了,一直没时间,因为我本地没有那么多机器资源,开虚拟机不够,如果租用阿里云服务器,需要有充值的时间,因为这个费用是按小时付费,需要有连贯的时间来搞才行,今天恰好有时间,就开始搞了,弄成功搞出来了,特地写博客记录下来…