canal部署

news2025/1/19 7:55:54

定义

  1. canal组件是一个基于mysql数据库增量日志解析,提供增量数据订阅和消费,支持将增量数据投递到下游消费者(kafka,rocketmq等)或者存储(elasticearch,hbase等)
  2. canal感知到mysql数据变动,然后解析变动数据,将变动数据发送到mq或者同步到其他数据库,等待进一步业务逻辑处理

canal的工作原理

  1. mysql master将数据变更写入到二进制日志binary long,简称binlog
  2. mysql slave将master的bin log拷贝到它的中继日志relay log
  3. mysql slave重放relay log操作,将变更数据同步到最新

image.png

mysql binlog日志

介绍

  1. mysql的binlog可以说mysql的最重要的日志,它记录了所有的DDL(表的创建)和DML(数据发生变化)语句,以事件形式记录
  2. mysql默认情况下是不开启binlog,因为记录binlog日志需要消耗时间,官方给出的数据是有1%的性能损耗
    1. 一般来说,在下面两场场景下会开启binlog日志
      1. mysql主从集群部署时,需要将在master端开启binlog,方便将数据同步到slaves中
      2. 数据恢复了,通过使用mysql binlog工具来使数据恢复

binlog的分类

分类介绍优点缺点
STATEMENT语句级别,记录每一次执行写操作的语句相当于row模式节省了空间,但是可能产生的数据不一致,有疑执行时间不同,导致产生的数据不同节省空间可能造成数据不一致
ROW行级,记录每次操作后每行记录的变化,假如一个update的sql执行结果是1万行statement只存一条,如果是row的话会把这1万行的结果存着数据绝对一致性,因为不管sql是什么,引用了什么函数,他只记录执行后的效果占用较大空间
MIXED是对STATEMENT的升级,如当函数中包含UUID()时,包含AUTO_INCREMENT字段的表被更新时,执行INSERT DELAYED语句时,用UDF时,会按照ROW的方式进行处理节省空间,同时兼顾了一定的一致性还有些极个别情况依旧会造成不一致,另外STATEMENT和mixed对于需要对binlog的监控的情况都不方便

canal工作原理

  1. canal将自己伪装为mysql slave,向mysql master发送bump协议
  2. mysql master 收到dump请求,开始推送binary log给slave(canal)
  3. canal接收并解析binlog日志,得到变更的数据,执行后续逻辑

image.png

canal应用场景

数据同步

  1. canal可以帮助用户进行多种数据同步操作,如实时同步mysql数据到elasticsearch,redis等数据存储介质中
    1. image.png

数据库实时监控

  1. canal可以实时监控mysql的更新操作,对于敏感数据的修改可以及时通知相关人员
  2. image.png

数据分析和挖掘

  1. canal可以将mysql增量数据投递到kafka等消息队列中,为数据分析和挖掘提供数据来源
  2. image.png

数据库备份

  1. canal可以将mysql主库上的数据增量日志复制到备库上,实现数据库的备份
  2. image.png

数据集成

  1. canal可以将多个mysql数据库中的数据进行集成,为数据处理提供更加高效可靠的解决方案
  2. image.png

数据库迁移

  1. canal可以协助完成mysql数据库的辨别升级及数据迁移任务
  2. image.png

canal中mysql配置

vim /etc/mysql/my.cnf
[mysqld]
# 设置主服务器唯一ID
server-id=1
# 启用二进制日志
log-bin=mysql-bin
# 设置不要复制的数据库(可以设置多个)
binlog-ignore-db=mysql
# 设置需要监听的数据库,不配置表示所有数据库均开启
#binlog-do-db=canal-demo
# 设置logbin格式
binlog_format=row
bind-address = 0.0.0.0

canal下载配置文件

官网下载
tar -zvxf canal.deployer-1.1.7.tar.gz
cd canal
cd conf
#################################################
#########               common argument         #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip = 192.168.7.129    # canal的ip
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =192.168.5.6:2181   # kafka的端口的和ip
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = kafka     # 选择同步方式为kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
#########               destinations            #############
#################################################
canal.destinations = example,script   # 需要进行同步的任务,在conf/example文件下,和script下
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########             MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
#########                    Kafka                   #############
##################################################
kafka.bootstrap.servers = 192.168.5.6:9092   # kafka的IP地址和端口
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = ../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file = ../conf/kerberos/jaas.conf

# sasl demo
# kafka.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \\n username=\"alice\" \\npassword="alice-secret\";
# kafka.sasl.mechanism = SCRAM-SHA-512
# kafka.security.protocol = SASL_PLAINTEXT

##################################################
#########                   RocketMQ         #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =

##################################################
#########                   RabbitMQ         #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =


##################################################
#########                     Pulsar         #############
##################################################
pulsarmq.serverUrl =
pulsarmq.roleToken =
pulsarmq.topicTenantPrefix =

配置任务

vim canal/conf/example
vim instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=192.168.7.129:3306  # 数据库的ip和端口
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal              # 数据库的用户名
canal.instance.dbPassword=docker211102       # 数据库的密码
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=script\\..*    #监控script这个数据库下面的所有的表
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example             # 将数据发送到example这个主题里面
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#
# multi stream for polardbx
canal.instance.multi.stream.on=false
#################################################

vim canal/conf/script
vim instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=192.168.7.129:3306   # 数据库的ip和端口
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal                # 数据库的用户名
canal.instance.dbPassword=docker211102         # 数据库的密码
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=canal-demo\\..*   # 监控cancl-demo这个库下面的所有表
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=script          # 将数据发送到script这个主题里面
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#
# multi stream for polardbx
canal.instance.multi.stream.on=false
#################################################

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1566873.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI学习-线性回归推导

线性回归 1.简单线性回归2.多元线性回归3.相关概念熟悉4.损失函数推导5.MSE损失函数 1.简单线性回归 ​ 线性回归:有监督机器学习下一种算法思想。用于预测一个或多个连续型目标变量y与数值型自变量x之间的关系,自变量x可以是连续、离散,但是目标变量y必…

IpcRenderer.invoke Error: An object could not be cloned.

这个错误信息提示“Uncaught (in promise) Error: An object could not be cloned.”通常发生在使用 Electron 的 IPC 通信过程中,尝试通过 ipcRenderer.invoke 或 ipcMain.handle 发送不能被克隆的对象时。JavaScript 中一些特殊对象或包含循环引用的对象无法通过 …

SQL server 查询数据库中所有的表名及行数

SQL server 查询数据库中所有的表名及行数 select a.name,b.rows from sysobjects as ainner join sysindexes as bon a.id b.id where (a.type u)and (b.indid in (0, 1)) and b.rows<50 and b.rows>20 order by a.name, b.rows desc;

虚拟机 ubuntu 20.04 git 设置代理的方法

前言 ubuntu 20.04 虚拟机中 Git 访问 github 或者其他的 git 仓库&#xff0c;大部分情况下速度很慢&#xff0c;并且容易掉线 如果 主机上使用了代理软件&#xff0c;但是虚拟机 ubuntu 中 Git 访问 git 仓库依旧是很慢 【问题】如何设置 虚拟机 ubuntu 的 Git 代理&#x…

C# 批量删除Excel重复项

当从不同来源导入Excel数据时&#xff0c;可能存在重复的记录。为了确保数据的准确性&#xff0c;通常需要删除这些重复的行。 手动查找并删除可能会非常耗费时间&#xff0c;而通过编程脚本则可以实现在短时间内处理大量数据。本文将提供一个使用C# 快速查找并删除Excel重复项…

Beaver Builder Pro v2.8.0.6:最佳的WordPress页面构建器插件

如果你正在寻找一个能帮助你轻松创建具有专业外观的网站的工具&#xff0c;那么Beaver Builder Pro v2.8.0.6就是你的最佳选择。这个高级WordPress插件提供了一个直观的前端可视化页面构建器&#xff0c;让你可以通过拖放元素来快速构建无限的自定义帖子和页面。 Beaver Buil…

JAVAEE之IoCDI

Spring 是⼀个 IoC&#xff08;控制反转&#xff09;容器&#xff0c;作为容器, 那么它就具备两个最基础的功能&#xff1a; • 存 • 取 Spring 容器管理的主要是对象, 这些对象, 我们称之为"Bean". 我们把这些对象交由Spring管理, 由 Spring来负责对象的创建…

Spring Boot--文件上传和下载

文件上传和下载 前言文件上传1、以MultipartFile 接口流文件&#xff0c;流的名称需要和前台传过来的名称对应上2、获取到文件名称截取后缀3、为了放置文件名重复使用uuid来随机生成id后缀4、判断转存路径中是否有这个文件夹如果没有就创建5、将文件存储到转存的目录中 文件下载…

非小米电脑下载小米电脑管家

由于 小米电脑管家 现在新增了机型验证&#xff0c;本篇将分享非小米电脑用户如何绕过机型验证安装 小米电脑管家 首先到小米跨端智联官网 https://hyperos.mi.com/continuity 中下载小米电脑管家 打开官网链接后&#xff0c;直接滑动到底部&#xff0c;点击下载 下载完成后…

C语言编写Linux的Shell外壳

目录 一、输出命令行 1.1 了解环境变量 1.2 获取用户名、主机名、当前路径 1.3 缓冲区改进MakeCommandLine 二、获取用户命令 2.1 读取函数的选择 2.2 细节优化 2.3 返回值 三、指令和选项分割 3.1 strtok 函数 3.2 分割实现 四、执行命令 4.1 fork 方法 4.2 进…

iPhone GPU性能评估:优化移动应用开发

摘要 了解你的显卡对于在电脑上玩现代图形要求高的游戏非常重要。本文介绍了如何轻松查看你的显卡型号以及为什么显卡在玩电脑游戏时如此关键。 引言 随着电脑游戏的发展&#xff0c;现代游戏对硬件性能的要求越来越高。十年前发布的显卡已经无法满足当前游戏的需求。因此&…

【前端】CSS(引入方式+选择器+常用元素属性+盒模型)

文章目录 CSS一、什么是CSS二、语法规范三、引入方式1.内部样式表2.行内样式表3.外部样式 四、选择器1.选择器的种类1.基础选择器&#xff1a;单个选择器构成的1.标签选择器2.类选择器3.id 选择器4.通配符选择器 2.复合选择器1.后代选择器2.子选择器3.并集选择器4.伪类选择器 五…

深挖苹果Find My技术,伦茨科技ST17H6x芯片赋予产品功能

苹果发布AirTag发布以来&#xff0c;大家都更加注重物品的防丢&#xff0c;苹果的 Find My 就可以查找 iPhone、Mac、AirPods、Apple Watch&#xff0c;如今的Find My已经不单单可以查找苹果的设备&#xff0c;随着第三方设备的加入&#xff0c;将丰富Find My Network的版图。产…

Redis主从复制、哨兵模式、Cluster集群

目录 一、Redis主从复制 1、主从复制介绍 2、主从复制原理 ​编辑 3、主从复制的作用 4.Redis主从复制实验搭建 1. 关闭防火墙和安装依赖环境 2. 解压安装包 3. 编译并安装到指定目录 4. 执行脚本文件 5. 做软连接 6. 启动redis并查看端口 7. 重启redis 8. 修改主…

机器学习每周挑战——信用卡申请用户数据分析

数据集的截图 # 字段 说明 # Ind_ID 客户ID # Gender 性别信息 # Car_owner 是否有车 # Propert_owner 是否有房产 # Children 子女数量 # Annual_income 年收入 # Type_Income 收入类型 # Education 教育程度 # Marital_status 婚姻状况 # Housing_type 居住…

Win11 绕过 TPM 或 CPU 检测

方法 1&#xff1a;修改注册表绕过 TPM 或 CPU 检测&#xff08;升级安装&#xff09; 如果你的硬件不完全符合安装 Windows 11 的基本硬件要求&#xff0c;可以通过修改注册表&#xff0c;在至少拥有 TPM 1.2 和不支持的 CPU 上升级安装 Windows 11 系统. 适用场景&#xff…

【Layui】------ layui实现table表格拖拽行、列位置的示例代码

一、完整的示例代码&#xff1a;&#xff08;请使用layui v2.8.3的版本&#xff09;看懂就能用、不要照搬、照搬会出错误、拷贝重要代码改改符合你自己的需求。 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><…

【图论】知识点集合

边的类型 neighbors(邻居)&#xff1a;两个顶点有一条共同边 loop&#xff1a;链接自身 link&#xff1a;两个顶点有一条边 parallel edges&#xff1a;两个顶点有两条及以上条边 无向图 必要条件&#xff1a;删掉顶点数一定大于等于剩下的顶点数 设无向图G<V,E>是…

scRNA+bulk+MR:动脉粥样硬化五个GEO数据集+GWAS,工作量十分到位

今天给大家分享一篇JCR一区&#xff0c;单细胞bulkMR的文章&#xff1a;An integrative analysis of single-cell and bulk transcriptome and bidirectional mendelian randomization analysis identified C1Q as a novel stimulated risk gene for Atherosclerosis 标题&…

基于ZooKeeper的Kafka分布式集群搭建与集群启动停止Shell脚本

下载Kafka压缩包 下方是Kafka官网下载地址&#xff0c;本文使用Kafka 3.0.0在虚拟机环境中搭建分布式集群。 Apache Kafka Downloads link 虽然在Kafka 2.8.0之后可以使用KRaft模式搭建高可用的集群以提高数据处理效率&#xff0c;但是目前还有许多企业依然使用ZooKeeper搭建K…