Canal安装部署与测试

news2025/1/9 2:13:42

文章目录

  • 第一章 Canal概述
    • 1.1 简介
    • 1.2 工作原理
      • 1.2.1 MySQL主备复制原理
      • 1.2.2 canal 工作原理
    • 1.3 重要版本更新说明
    • 1.4 多语言
  • 第二章 Canal安装部署
    • 2.1 准备
    • 2.2 canal安装
  • 第三章 Canal和Kafka整合测试
  • 注意事项

第一章 Canal概述

Github地址:https://github.com/alibaba/canal

1.1 简介

image-20201118180854772

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

1.2 工作原理

1.2.1 MySQL主备复制原理

img

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

1.2.2 canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流),再回放中继日志

1.3 重要版本更新说明

  1. canal 1.1.x 版本(release_note),性能与功能层面有较大的突破,重要提升包括:
    • 整体性能测试&优化,提升了150%. #726 参考: Performance
    • 原生支持prometheus监控 #765 Prometheus QuickStart
    • 原生支持kafka消息投递 #695 Canal Kafka/RocketMQ QuickStart
    • 原生支持aliyun rds的binlog订阅 (解决自动主备切换/oss binlog离线解析) 参考: Aliyun RDS QuickStart
    • 原生支持docker镜像 #801 参考: Docker QuickStart
  2. canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力,具体文档:Canal Admin Guide

1.4 多语言

canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑,欢迎大家提交 pull request

  • canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample
  • canal c# 客户端: https://github.com/dotnetcore/CanalSharp
  • canal go客户端: https://github.com/CanalClient/canal-go
  • canal Python客户端: https://github.com/haozi3156666/canal-python

canal 作为 MySQL binlog 增量获取和解析工具,可将变更记录投递到 MQ 系统中,比如 Kafka/RocketMQ,可以借助于 MQ 的多语言能力

  • 参考文档: Canal Kafka/RocketMQ QuickStart

第二章 Canal安装部署

2.1 准备

  • 查看mysql是否开启bin-log,查询语句如下:

    SHOW VARIABLES LIKE '%log_bin%'
    

    如查询出来log_bin的Value是OFF状态,则需要执行如下的开启bin-log步骤。

  • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    
    #注意,配置后需要重启msyql
    [root@hadoop01 ~]# systemctl restart mysqld
    
    • 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant(如有授权用户可不执行如下命令)。

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    

2.2 canal安装

https://github.com/alibaba/canal/releases 中查找 canal 最新deploy 版本的下载链接
1.下载压缩包:

[root@hadoop01 ~]# wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

2.解压

[root@hadoop01 ~]# mkdir /usr/local/canal
[root@hadoop01 ~]# tar -zxvf /home/canal.deployer-1.1.4.tar.gz -C /usr/local/canal

3.创建测试配置直接使用自带的样例进行更改,并复制出默认配置以备后期增加库时使用,这里的每一个文件夹就代表一个数据源,多个数据源就复制出多个文件夹后在 canal.properties 的 canal.destinations 里逗号分隔进行配置:

[root@hadoop01 canal]# cp -r ./conf/example/ ./conf/test

4.修改单个数据源的配置

[root@hadoop01 canal]# vi ./conf/test/instance.properties
#修改如下
canal.instance.master.address=127.0.0.1:3306 	# 数据库服务器地址
canal.instance.dbUsername=root 	# 数据库用户名
canal.instance.dbPassword=root 	# 数据库密码
#canal.instance.filter.regex=.*\\..*  # 数据表过滤正则,dbName.tbName,默认的是所有库的所有表
canal.instance.filter.regex=travel.dim_product1 	# 数据表过滤正则,dbName.tbName,默认的是所有库的所有表
canal.mq.topic=travel_dim_product1 		# 这个数据库存储进 Kafka 时使用的 topic

5.修改 Canal 全局设置

[root@hadoop01 canal]# vi conf/canal.properties

#修改如下
canal.destinations = test 	# 这里配置开启的 instance,具体方法上面步骤有说明

canal.serverMode = kafka 	# 更改模式,直接把数据扔进 Kafka
#canal.mq.servers = 127.0.0.1:6667
canal.mq.servers = 192.168.216.111:9092,192.168.216.112:9092,192.168.216.113:9092 	# Kafka 的地址
canal.mq.batchSize = 16384 # 这里没有更改,值应该小于 Kafka 的 config/producer.properties 中 batch.size,但是 Kafka 里没设置,这里也就不更改了
#canal.mq.flatMessage = false
canal.mq.flatMessage = true 	# 使用文本格式(JSON)进行传输,否则 Kafka 里扔进去的是二进制数据,虽然不影响,但是看起来不方便
配置文件详解:
参数名字 参数说明 默认值
canal.destinations 当前server上部署的instance列表
canal.id 每个canal server实例的唯一标识,暂无实际意义 1
canal.ip canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务 无
canal.port canal server提供socket服务的端口 11111
canal.zkServers canal server链接zookeeper集群的链接信息
例子:127.0.0.1:2181,127.0.0.1:2182 无

canal.zookeeper.flush.period canal持久化数据到zookeeper上的更新频率,单位毫秒 1000
canal.file.data.dir canal持久化数据到file上的目录 …/conf (默认和instance.properties为同一目录,方便运维和备份)
canal.file.flush.period canal持久化数据到file上的更新频率,单位毫秒 1000
canal.instance.mysql.slaveId mysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一 1234
canal.instance.master.address mysql主库链接地址 127.0.0.1:3306
canal.instance.master.journal.name mysql主库链接时起始的binlog文件 无
canal.instance.master.position mysql主库链接时起始的binlog偏移量 无
canal.instance.master.timestamp mysql主库链接时起始的binlog的时间戳 无
canal.instance.dbUsername mysql数据库帐号 canal
canal.instance.dbPassword mysql数据库密码 canal
canal.instance.defaultDatabaseName mysql链接时默认schema 
canal.instance.connectionCharset mysql 数据解析编码 UTF-8
canal.instance.filter.regex mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)	.*\\..*

6.开启 Canal
bin/startup.sh

#因为 Canal 的脚本名称都太普通,所以没有添加到 PATH 里

[root@hadoop01 canal]# ./bin/startup.sh

7.查看日志是否有异常:

[root@hadoop01 canal]# tail -f ./logs/canal/canal.log   #server启动日志
[root@hadoop01 canal]# tail -f ./logs/test/test.log   #instance 的日志

第三章 Canal和Kafka整合测试

kafka的安装在这儿省略,大家自行安装即可…

  1. 测试,连上数据库尝试执行更改 SQL
CREATE TABLE `yq` (
  `dt` varchar(25) NOT NULL,
  `province` varchar(255) NOT NULL,
  `adds` int(11) DEFAULT '0',
  `possibles` int(11) DEFAULT '0',
  PRIMARY KEY (`dt`,`province`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
**查看 Kafka 中的数据,列出所有 topic:
kafka-topics.sh --list --zookeeper localhost:2181消费其中的数据:kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning此时应该已经可以列出创建表时的语句后期 Bug 修复:消费端 Kafka 不进数据,Canal 日志报错 org.apache.kafka.common.errors.RecordTooLargeException,认为是 Kafka 消息体大小限制造成的,需要同时修改 Kafka 与 Canal 消息体的最大限制修改 Kafka 配置,server.properties 中修改或添加配置项 message.max.bytes=100000000,producer.properties 中修改或添加配置项 max.request.size=100000000,consumer.properties 中修改或添加配置项 max.partition.fetch.bytes=100000000,重启 Kafka修改 Canal 配置,canal.properties 修改 canal.mq.maxRequestSize 参数值为 90000000,重启 Canal查看 Canal 日志是否报错 Could not find first log file name in binary log index file at… 如果报错则停止 Canal ,再删除实例配置下的 meta.dat 文件,再启动 Canal 即可**
  1. 对yq表进行update、insert、delete操作
{"data":[{"dt":"2020-7-1","province":"beijing","adds":"3","possibles":"12"}],"database":"test","es":1603647506000,"id":1,"isDdl":false,"mysqlType":{"dt":"varchar(25)","province":"varchar(255)","adds":"int(11)","possibles":"int(11)"},"old":[{"possibles":"6"}],"pkNames":["dt","province"],"sql":"","sqlType":{"dt":12,"province":12,"adds":4,"possibles":4},"table":"yq","ts":1603647506282,"type":"UPDATE"}
{"data":[{"dt":"2020-7-2","province":"beijing","adds":"10","possibles":"6"}],"database":"test","es":1603647607000,"id":2,"isDdl":false,"mysqlType":{"dt":"varchar(25)","province":"varchar(255)","adds":"int(11)","possibles":"int(11)"},"old":[{"possibles":"5"}],"pkNames":["dt","province"],"sql":"","sqlType":{"dt":12,"province":12,"adds":4,"possibles":4},"table":"yq","ts":1603647607348,"type":"UPDATE"}
{"data":[{"dt":"2020-10-26","province":"qingdao","adds":"2","possibles":"3"}],"database":"test","es":1603647656000,"id":3,"isDdl":false,"mysqlType":{"dt":"varchar(25)","province":"varchar(255)","adds":"int(11)","possibles":"int(11)"},"old":null,"pkNames":["dt","province"],"sql":"","sqlType":{"dt":12,"province":12,"adds":4,"possibles":4},"table":"yq","ts":1603647656168,"type":"INSERT"}
{"data":[{"dt":"2020-10-26","province":"qingdao","adds":"2","possibles":"3"}],"database":"test","es":1603647707000,"id":4,"isDdl":false,"mysqlType":{"dt":"varchar(25)","province":"varchar(255)","adds":"int(11)","possibles":"int(11)"},"old":null,"pkNames":["dt","province"],"sql":"","sqlType":{"dt":12,"province":12,"adds":4,"possibles":4},"table":"yq","ts":1603647707172,"type":"DELETE"}
  1. 关闭Canal
[root@hadoop01 canal]# ./bin/stop.sh

关闭之后,如果mysql的表数据有修改,当重启canal以后,将会把变更数据投递到kafka中!!!

注意事项

1、bin-log的存储三种Format:

mysql复制主要有三种方式:基于SQL语句的复制(statement-based replication, SBR),基于行的复制(row-based replication, RBR),混合模式复制(mixed-based replication, MBR)。对应的,binlog的格式也有三种:STATEMENT,ROW,MIXED。

① STATEMENT模式(SBR)

每一条会修改数据的sql语句会记录到binlog中。优点是并不需要记录每一条sql语句和每一行的数据变化,减少了binlog日志量,节约IO,提高性能。缺点是在某些情况下会导致master-slave中的数据不一致(如sleep()函数, last_insert_id(),以及user-defined functions(udf)等会出现问题)

② ROW模式(RBR)

不记录每条sql语句的上下文信息,仅需记录哪条数据被修改了,修改成什么样了。而且不会出现某些特定情况下的存储过程、或function、或trigger的调用和触发无法被正确复制的问题。缺点是会产生大量的日志,尤其是alter table的时候会让日志暴涨。

③ MIXED模式(MBR)

以上两种模式的混合使用,一般的复制使用STATEMENT模式保存binlog,对于STATEMENT模式无法复制的操作使用ROW模式保存binlog,MySQL会根据执行的SQL语句选择日志保存方式。

2、canal会丢失数据吗?

建议将增量数据投递到MQ中,如果canal有异常,当恢复时候会重读中继日志

3、canal的下游怎么处理?

存储可以进行实时读写处理中---流式计算 ---> 追加到流表中、选择支持upsert 模式流表中、或者选择自己维护状态
离线,可以使用canal周期落地 --- 离线

4、canal对接kafka,kafka多分区怎么办

1、单分区能保证增量数据顺序---但是丢失并行度
2、按照维度得主键进行分区  --- 增加并行处理能力

kafka是3个分区
k-0
k-1
k-2

mysql
111
112
113

111 zs k-0
111 ls k-1
113 k-2

flink消费:
3个并行度消费
k-1 先消费
k-0 消费

5、mysql多少节点、canal多少节点

如果维度表数量在50个一下,则构建3-5个canal集群即可。
50-100个维度表,则构建7-11个左右canal即可

6、canal保留原始数据和增量数据?

ES或者HBASE:
eid
主键_version 其它字段 是否有效 起始时间 生效时间

redis:
主键 [{},{},{}] 索引+1就是版本信息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/786360.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

脑电信号处理与特征提取——4.脑电信号的预处理及数据分析要点(彭微微)

目录 四、脑电信号的预处理及数据分析要点 4.1 脑电基础知识回顾 4.2 伪迹 4.3 EEG预处理 4.3.1 滤波 4.3.2 重参考 4.3.3 分段和基线校正 4.3.4 坏段剔除 4.3.5 坏导剔除/插值 4.3.6 独立成分分析ICA 4.4 事件相关电位(ERPs) 4.4.1 如何获…

Java子类可以继承父类的所有属性吗

子类可以继承父类的所有属性吗 答案: 不可以,当然是不可以全部属性和方法都继承,那么哪些不可以继承? 最起码私有的就不可以,私有的属性和方法都不可以,其他的那就需要继续去测试了。 我用的是jdk1.8来…

Docker 的数据管理、镜像的创建

Docker 的数据管理、镜像的创建 Docker 的数据管理1.数据卷2.数据卷容器端口映射容器互联(使用centos镜像) Docker 镜像的创建1.基于现有镜像创建(1)首先启动一个镜像,在容器里做修改…

使用docker 部署自己的chatgpt

直接docker部署 docker run --name chatgpt-web -d -p 3002:3002 --env OPENAI_API_KEYyour_api_key chenzhaoyu94/chatgpt-web:latestDocker compose部署 version: 3services:app:image: chenzhaoyu94/chatgpt-web # 总是使用 latest ,更新时重新 pull 该 tag 镜像即可ports…

拆解雪花算法生成规则 | 京东物流技术团队

1 介绍 雪花算法(Snowflake)是一种生成分布式全局唯一 ID 的算法,生成的 ID 称为 Snowflake IDs 或 snowflakes。这种算法由 Twitter 创建,并用于推文的 ID。目前仓储平台生成 ID 是用的雪花算法修改后的版本。 雪花算法几个特性…

体验百度大模型文心千帆有感

大家好,我是雄雄,微信公众号:雄雄的小课堂,欢迎关注。 前言 近段时间来,各种大厂都推出了自己的大模型平台,有讯飞星火大模型、百度文心大模型、阿里通义千问大模型、claude、还有openai公司出产的chatgpt…

基于so-token的前后端分离项目 + uni-app微信小程序

PC端效果: 移动端效果: Sa-Token 是一个轻量级 Java 权限认证框架,主要解决:登录认证、权限认证、单点登录、OAuth2.0、分布式Session会话、微服务网关鉴权 等一系列权限相关问题。 uni-app 是一个使用 Vue.js 开发所有前端应用的…

合并二叉树

给你两棵二叉树: root1 和 root2 。 想象一下,当你将其中一棵覆盖到另一棵之上时,两棵树上的一些节点将会重叠(而另一些不会)。你需要将这两棵树合并成一棵新二叉树。合并的规则是:如果两个节点重叠&#…

荧光粉的发光效率是多少?--光致发光量子效率检测系统

稀土荧光粉广泛应用于生态照明、动态显示、通讯卫星、光学计算机及生物分子探针等高科技领域。三基色荧光粉是目前最有研究价值的荧光粉,其在可见光区具有丰富的谱线,发光谱带狭窄,发光能量更为集中;具有较强的抗紫外辐照能力&…

用户认证模式Cookie-Session、JWT-Token(goland实现)

用户认证 Cookie-Session认证模式简介代码示例优缺点 Token认证模式简介JWT介绍JWT结构标头(Header)负载(Payload)签名(Signature) 代码示例JWT优缺点Access Token和Refresh Token认证模式代码示例 在计算机…

解决QT 编译qmake 无法找到问题

问题: Command qmake not found, but can be installed with: sudo apt install qtchooser 原因: 这个错误提示指出在当前环境中找不到 qmake 命令 解决方法: 其实ubuntu已经给提示了就是要安装qtchooser 安装命令为: sudo…

Springboot中 AOP实现日志信息的记录到数据库

1、导入相关的依赖 <!--spring切面aop依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency> 注意&#xff1a;在application.properties文件里加这样一…

串2:云计算架构思考

开始之前&#xff0c;先给出串1&#xff1a;一文将大数据、云计算、物联网、5G&#xff08;移动网&#xff09;、人工智能等最新技术串起来_龙赤子的博客-CSDN博客 承上 事物的复杂性一般有两个方面&#xff0c;一个是本身结构的复杂&#xff0c;一个是运行机制的复杂。因为这…

如何判断某个视频是深度伪造的?

目录 一、前言 二、仔细检查面部动作 三、声音可以提供线索 四、观察视频中人物的身体姿势 五、小心无意义的词语 深造伪造危险吗&#xff1f; 一、前言 制作深度伪造视频就像在Word文档中编辑文本一样简单。换句话说&#xff0c;您可以拍下任何人的视频&#xff0c;让他…

Java 使用 Google Guava 实现接口限流

一、引入依赖 <dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>30.0-jre</version> </dependency>二、自定义注解及限流拦截器 自定义注解&#xff1a;Limiter package com.haita…

更省更快更安全的云服务器,一站式集中管理,随时随地远程——站斧云桌面

随着全球化和数字化经济的发展&#xff0c;越来越多的企业开始海外扩张和拓展国际市场。而云服务器作为一种高效、灵活且可靠的IT基础设施方案&#xff0c;已成为出海企业不可或缺的重要工具。这里就为大家介绍云服务器在出海企业中的几个使用场景。 1.全球范围内协同办公 对…

Unity 资源与源码

Unity场景资源 Unity场景资源

USB TO TTL

什么是USB TO TTL&#xff1f; 常用的单片机&#xff0c;引出来的串口&#xff0c;如果不加其他的接口电路&#xff0c;出来的信号就是TTL电平。 如果需要看串口的打印信息&#xff0c;一般是需要接一个上位机的&#xff0c;常规的就是电脑&#xff0c;而现在的电脑一般的通信…

transformer理解

transformer的理解 Q、K、V的理解 核心是自注意力机制。即每个位置的结果为所有位置的加权平均和。为了得到每个位置的权重,需要Q*K得到。 整个多头的self-attention过程 单个encoder encoder-decoder encoder中的K和V会传到decoder中的encoder-decoder attention中。 …

Linux实训笔记~操作系统概述

1、操作系统 操作系统作为接口的示意图: 没有安装操作系统的计算机, 通常被称为裸机。 2、不同应用利于的主流操作系统 桌面操作系统 服务器操作系统 嵌入式操作系统 移动设备操作系统