1、Canal实现数据同步
canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。
1.1 Canal工作原理
原理相对比较简单:
1、canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议。
2、mysql master收到dump请求,开始推送binary log给slave(也就是canal)。
3、canal解析binary log对象(原始为byte流)。
canal需要使用到mysql,我们需要先安装mysql,canal是基于mysql的主从模式实现的,所以必须先开启
binlog。
1.2 安装MySQL
# 搜索MySQL镜像
$ docker search mysql
# 拉取MySQL镜像
$ docker pull mysql:5.5
# 启动MySQL
$ docker run -d -p 3306:3306 -v /home/zhangshixing/linuxmysql/conf:/etc/mysql/conf.d -v /home/zhangshixing/linuxmysql/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 --name zsxmysq1 mysql:5.5
$ docker exec -it fc566f7d1857 /bin/bash
$ mysql -h 192.168.94.186 -u root -p
1.3 开启binlog模式
(1) 、连接到mysql中,并修改 /etc/mysql/my.cnf
需要开启主从模式,开启binlog模式。
执行如下命令,编辑 mysql 配置文件:
$ docker exec -it fc566f7d1857 /bin/bash
# 该目录一定要存在,否则log日志无法写入
$ cd /etc/mysql/
$ vim my.cnf
没有vim,需要安装vim:
$ apt-get update
$ apt-get install vim
修改 my.cnf
配置文件,添加如下配置:
# 开启logbin
log-bin=/var/lib/mysql/mysql-bin
# binlog日志格式
binlog-format=ROW
# mysql主从备份serverId,canal中不能与此相同
server-id=12345
(2) 、创建账号,用于测试使用,使用root账号创建用户并授予权限
# CREATE USER <用户名> [ IDENTIFIED ] BY [ PASSWORD ] <口令>
# <用户名>格式为 'user_name'@'host_name',这里user_name是用户名,host_name为主机名
# "%" 表示一组主机
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, INSERT, DELETE, UPDATE, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
(3)、重启mysql容器
$ docker restart fc566f7d1857
(4)、查看配置是否生效
$ docker exec -it fc566f7d1857 /bin/bash
$ mysql -h 192.168.94.186 -u canal -p
# binlog日志文件
$ show master status;
# 查看是否配置成功
$ show variables like 'binlog_format';
查看日志文件:
使用Navicat新建canal_test
数据库和t_user_1
表。
# 查看日志信息
$ mysqlbinlog -vv mysql-bin.000001
1.4 canal容器安装
下载镜像:
$ docker pull docker.io/canal/canal-server
容器安装:
$ docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server
进入容器,修改核心配置canal.properties
和instance.properties
,canal.properties
是canal自身的
配置,instance.properties
是需要同步数据的数据库连接配置。
执行代码如下:
$ docker exec -it canal /bin/bash
$ cd canal-server/conf/
$ vim canal.properties
$ cd example/
$ vim instance.properties
修改canal.properties
的id,不能和mysql的server-id重复,如下图:
修改instance.properties
,配置数据库连接地址:
这里的canal.instance.filter.regex
有多种配置,如下:
可以参考地址如下:
https://github.com/alibaba/canal/wiki/AdminGuide
mysql 数据解析关注的表,Perl正则表达式。
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)
常见例子:
1、所有表:.* or .*\\..*
2、canal schema下所有表:canal\\..*
3、canal下的以canal打头的表:canal\\.canal.*
4、canal schema下的一张表:canal.test1
5、多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(mixed/statement因为不解析sql,所以无法准确提取
tableName进行过滤)。
配置完成后,设置开机启动,并记得重启canal。
$ docker update --restart=always canal
$ docker restart canal
验证配置是否成功:
$ docker exec -it canal /bin/bash
$ cd canal-server/logs/example/
# 查看日志
$ tail -100f example.log
1.5 canal服务搭建
当用户执行数据库的操作的时候,binlog 日志会被canal捕获到,并解析出数据。
1.5.1 安装辅助jar包
在spring-boot-starter-canal-master
中有一个工程starter-canal
,它主要提供了SpringBoot环境下
canal
的支持,我们需要先安装该工程,在starter-canal
目录下执行mvn install
,如下图:
1.5.2 canal服务工程搭建
在spring-boot-starter-canal-master
中创建一个工程canal-test
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>cana-test</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>cana-test</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml配置
# canal配置
canal.client.instances.example.host=192.168.94.186
canal.client.instances.example.port=11111
监听创建
创建一个CanalDataEventListener
类,实现对表增删改操作的监听,代码如下:
package com.example.canatest.config;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.*;
/**
* @author zhangshixing
*/
@CanalEventListener
public class MyEventListener {
/**
* 增加数据监听
*
* @param eventType
* @param rowData
*/
@InsertListenPoint
public void onEvent1(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
System.out.println("InsertListenPoint");
rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " :: " + c.getValue()));
}
/**
* 修改数据监听
*
* @param rowData
*/
@UpdateListenPoint
public void onEvent2(CanalEntry.RowData rowData) {
System.out.println("UpdateListenPoint");
rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " :: " + c.getValue()));
}
/**
* 刪除数据监听
*
* @param eventType
*/
@DeleteListenPoint
public void onEvent3(CanalEntry.EventType eventType) {
System.out.println("DeleteListenPoint");
}
/**
* 自定义数据修改监听
*
* @param eventType
* @param rowData
*/
@ListenPoint(destination = "example", schema = "canal_test", table = {"t_user_1", "t_user_2"}, eventType = CanalEntry.EventType.INSERT)
public void onEvent4(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
System.out.println("CustomListenPoint");
rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " :: " + c.getValue()));
}
}
启动类创建
创建启动类,代码如下:
package com.example.canatest;
import com.xpand.starter.canal.annotation.EnableCanalClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableCanalClient
public class CanaTestApplication {
public static void main(String[] args) {
SpringApplication.run(CanaTestApplication.class, args);
}
}
测试
启动canal微服务,然后修改任意数据库的表数据,canal微服务后台输出信息。
1、插入数据
2、修改数据
3、删除数据
4、自定义插入