什么是Canal
Canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
Canal目前没有独立的官网,可以在GitHub上下载和查看Canal文档,地址如下:https://github.com/alibaba/canal/wiki
Canal工作原理
-
MySQL主备复制原理
1)MySQL master 将数据变更写入二进制日志(binary log, 其中记录叫做二进制日志事件binary log events
,可以通过 show binlog events 进行查看)
2)MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
注意:中继日志是从服务器I/O线程将主服务器的二进制日志读取过来,记录到从服务器本地文件,然后从服务器SQL线程会读取relay-log日志的内容并应用到从服务器,从而使从服务器和主服务器的数据保持一致。
3)MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
(从节点获取主节点的binlog日志) -
canal 工作原理
1)canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
2)MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
3)canal 解析 binary log 对象(原始为 byte 流)
注意:mysql-binlog是MySQL数据库的二进制日志,记录了所有的DDL和DML(除了数据查询语句)语句信息。一般来说开启二进制日志大概会有1%的性能损耗。
(canal模拟为从节点,获取binlog日志并解析)
Canal1.1.4安装
安装前准备-开启MySQL binlog
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,开启Mysql binlog日志步骤如下:
1)登录mysql查看MySQL是否开启binlog日志
mysql -u root -p
mysql> show variables like 'log_%';
(ON表示已开启)
2)开启mysql binlog日志
在/etc/my.cnf文件中[mysqld]下写入以下内容(如果是OFF状态):
[mysqld]
# 随机指定一个不能和其他集群中机器重名的字符串,配置 MySQL replaction 需要定#义,不要和 canal 的 slaveId 重复
server-id=123
#配置binlog日志目录,配置后会自动开启binlog日志,并写入该目录
log-bin=/var/lib/mysql/mysql-bin
# 选择 ROW 模式
binlog-format=ROW
MySQL binlog-format有三种模式:Row、Statement 和 Mixed 。
- Row:不记录sql语句上下文相关信息,仅保存哪条记录被修改。
优点: binlog中可以不记录执行的sql语句的上下文相关的信息,仅需要记录那一条记录被修改成什么了。所以row level的日志内容会非常清楚的记录下每一行数据修改的细节。
缺点:所有的执行的语句当记录到日志中的时候,都将以每行记录的修改来记录,这样可能会产生大量的日志内容,比如一条update语句,修改多条记录,则binlog中每一条修改都会有记录,这样造成binlog日志量会很大,特别是当执行alter table之类的语句的时候,由于表结构修改,每条记录都发生改变,那么该表每一条记录都会记录到日志中。 - Statement(默认):每一条会修改数据的sql都会记录在binlog中。
这种模式下,slave在复制的时候sql进程会解析成和原来master端执行过的相同的sql来再次执行。
优点:不需要记录每一行的变化,减少了binlog日志量,节约了IO,提高性能。
缺点:由于只记录语句,所以,在statement level下 已经发现了有不少情况会造成MySQL的复制出现问题,主要是修改数据的时候使用了某些定的函数或者功能的时候会出现。 例如:update 语句中含有uuid() ,now() 这种函数时,Statement模式就会有问题(update t1 set xx = now() where xx = xx) - Mixed: 混合模式
在Mixed模式下,MySQL会根据执行的每一条具体的sql语句来区分对待记录的日志格式,也就是在Statement和Row之间选择一种。如果sql语句确实就是update或者delete等修改数据的语句,那么还是会记录所有行的变更。
3)重启mysql 服务,重新查看binlog日志情况
[root@node2 ~]# service mysqld restart
[root@node2 ~]# mysql -u root -p
mysql> show variables like 'log_%';
使用Canal同步MySQL数据
使用Canal同步MySQL的数据可以直接使用Canal客户端API方式消费Canal同步的数据,详细api参照:https://github.com/alibaba/canal/wiki/ClientAPI ,也可以直接通过Canal将数据写入Kafka
原理
EventParser在向mysql发送dump命令之前会先从Log Position中获取上次解析成功的位置(如果是第一次启动,则获取初始指定位置或者当前数据段binlog位点)。mysql接受到dump命令后,由EventParser从mysql上pull binlog数据进行解析并传递给EventSink(传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功),传送成功之后更新Log Position。流程图如下:
EventSink起到一个类似channel的功能,可以对数据进行过滤、分发/路由(1:n)、归并(n:1)和加工。EventSink是连接EventParser和EventStore的桥梁。
EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据存储和读取的位置。
MetaManager是增量订阅&消费信息管理器,增量订阅和消费之间的协议包括get/ack/rollback,分别为:
(1)Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:batch id[唯一标识]和entries[具体的数据对象]。
(2)void rollback(long batchId),顾名思义,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作。
(3)void ack(long batchId),顾名思义,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作。
关于同步MySQL数据配置信息
首先Canal可以是一个集群,这里以Canal单机为例解释Canal同步MySQL数据配置文件配置原理。
首先需要在Canal中配置CanalServer 对应的canal.properties,这个文件中主要配置Canal对应的同步数据实例(Canal Instance)位置信息及数据导出的模式,例如:我们需要将某个mysql中的数据同步到Kafka中,那么就可以创建一个“数据同步实例”,导出到Kafka就是一种模式。
其次,需要配置Canal Instance 实例中的instance.properties文件,指定同步到MySQL数据源及管道信息。
具体步骤
1)配置canal.properties”
#canal将数据写入Kafka,可配:tcp, kafka, RocketMQ,tcp就是使用canal代码接收
canal.serverMode = kafka
#配置canal写入Kafka地址
canal.mq.servers = node1:9092,node2:9092,node3:9092
关于canal.properties更多参数参照:https://github.com/alibaba/canal/wiki/AdminGuide
2)配置mysql slave的权限
Canal的原理是模拟自己为mysql slave,所以这里一定需要做为mysql slave的相关权限 ,授权Canal连接MySQL具有作为MySQL slave的权限:
mysql> CREATE USER canal IDENTIFIED BY 'canal';
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
mysql> FLUSH PRIVILEGES;
mysql> show grants for 'canal' ;
3)配置“instance.properties”
进入“/conf/example/”下,编辑“instance.properties”文件
#canal伪装为一个mysql的salve,配置其id,不要和真正mysql server-id冲突,这里也可以不配置,会自动生成
canal.instance.mysql.slaveId=123456
#配置mysql master 节点及端口
canal.instance.master.address=node2:3306
#配置连接mysql的用户名和密码,就是前面复制权限的用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#配置Canal将数据导入到Kafka topic
canal.mq.topic=canal_topic
配置参照:
https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
关于“instance.properties”更多参数介绍如下:https://github.com/alibaba/canal/wiki/AdminGuide#instanceproperties%E4%BB%8B%E7%BB%8D
4)启动Canal
进入“/canal/bin”,执行“startup.sh”脚本启动Canal。
#启动Canal
[root@node3 ~]# cd /software/canal/bin/
[root@node3 bin]# ./startup.sh
[root@node3 bin]# jps
68675 CanalLauncher #启动成功
5)启动zookeeper和Kafka,并监控Kafka中“canal_topic”的数据
[root@node2 bin]# ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic canal_topic
6)在MySQL中建表,插入语句
mysql> create database testdb;
mysql> use testdb;
mysql> create table person(id int ,name varchar(255),age int);
mysql> insert into person values (1,"zs",18),(2,"ls",19),(3,"ww",20);
#对应的在Kafka中有对应的数据日志写入
关于以上json字段解析如下:
- data:最新的数据,为JSON数组,如果是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据,如果是删除,则表示被删除的数据。
- database:数据库名称。
- es:事件时间,13位的时间戳。
- id:事件操作的序列号,1,2,3…
- isDdl:是否是DDL操作。
- mysqlType:字段类型。
- old:旧数据。
- pkNames:主键名称。
- sql:SQL语句。
- sqlType:是经过canal转换处理的,比如unsigned int会被转化为Long,unsigned long会被转换为BigDecimal。
- table:表名。
- ts:日志时间。
- type:操作类型,比如DELETE,UPDATE,INSERT。