简介
https://github.com/alibaba/canal
基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
原理是使用程序模拟一个mysql的从库,使主库发送同步日志给程序,程序再对数据进行处理,比如同步到其他数据库。
使用
下图是一个典型的将mysql增量数据同步到es的流程
这里要注意的是,canal client不是连接数据库,而是必须连接到canal server上的
canal server: canal 服务端,内部可包含多个canal instance
canal instance: 运行在canal server中, 可被canal client连接
canal client: 集成在业务代码中,可连接canal instacne读取bin-log事件,并进行相应处理
安装canal 服务端
直接使用docker来安装
https://github.com/alibaba/canal/wiki/Docker-QuickStart
下载官方镜像及run.sh文档
docker pull canal/canal-server:latest
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh
可以根据需要修改一下run.sh时面的一些端口和内存配置,然后编写自己的start.sh脚本
#!/bin/sh
run.sh -e canal.auto.scan=false \
-e canal.instance.mysql.slaveId=10001 \
-e canal.zkServers=192.168.1.10:1100 \
-e canal.instance.global.spring.xml=classpath:spring/default-instance.xml \
-e canal.destinations=test \
-e canal.instance.master.address=127.0.0.1:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.instance.filter.regex=.*\\..*
canal服务端配置说明:
https://github.com/alibaba/canal/wiki/AdminGuide
canal.destinations:实例列表
canal.instance.master.address: 源数据库地址
canal.instance.dbUsername:源数据库用户名
canal.instance.dbPassword: 源数据库密码
canal.instance.connectionCharset: 数据库连接字符集
canal.instance.filter.regex:需要处理的表(Perl正则表达式),多个规则使用’,'分隔
线上是使用阿云数据库,阿里云binlog是默认开启的,本地保留时间为18小时,一般来说够用了,不够的话就要去配置保存到oss了。https://github.com/alibaba/canal/wiki/aliyun-RDS-QuickStart
集成客户端
导入库
compile('com.alibaba.otter:canal.client:1.1.6') {
exclude group: 'org.slf4j'
exclude group: 'ch.qos.logback'
}
compile('com.alibaba.otter:canal.protocol:1.1.6') {
exclude group: 'org.slf4j'
exclude group: 'ch.qos.logback'
}
代码示例
//创建连接
CanalConnector connector = CanalConnectors.newClusterConnector("192.168.1.10:1100", "test", "", "");
//连接服务端
connector.connect();
connector.subscribe();
//循环获取事件
while (running) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
//无数据时睡眠一会
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
//开始处理数据
for (CanalEntry.Entry entry : message.getEntries()) {
switch (entry.getEntryType()) {
case TRANSACTIONBEGIN:
processTransactionBegin(entry);
break;
case ROWDATA:
processRawDataEntry(entry);
break;
case TRANSACTIONEND:
processTransactionEnd(entry);
break;
}
}
}
if (batchId != -1) {
connector.ack(batchId); // 提交确认
}
}
值得注意的是,canal事件包含事务开始,数据,事务结束等,一般来说一个事务会在一个message中返回,但也有可能在多个message中,这个和batchSize、服务端缓冲区大小有关系。
一个message中只会包含一个事务。
在没有提交事务之前,binlog不会同步给从服务器,所以canal收到的事件都是提交之后的。