1总体简介
1 mysql需要开启binlog
binlog分类
1.1)statement:
语句级别,binlog 会记录每一次执行写操作的语句。相对于row模式节省空间,但是会产生数据不一致性,例如:update aa set create_time=new(); 如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。
优点:节省空间
缺点:可能造成数据不一致。(因为只记录sql语句,万一 id>5的个数不一样呢? 随机数函数等等情况)
1.2)row:
行级 binlog会记录每次操作后每行记录的变化。
优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果。
缺点:占用较大空间
1.3)mixed:
statement升级版,在一定程度上解决了statement模式数据不一致问题。例如:函数中包含UUID(),包含AUTO_INCREMENT字段被更新时,执行 insert ,delayed语句时,会按照row方式,进行处理。
优点:节省空间,同时兼顾一定的一致性
缺点:还是会造成数据不一致情况
2 Canal介绍
Canal:伪装成Slave,假装从Master复制数据
2 安装开始
2.1 mysql配置文件编辑
编辑mysql配置文件(看你安装方式)
vim /etc/my.cnf
[mysqld]
#开启日志
log-bin = mysql-bin
#binlog级别 (statement:只记操作命令,有可能导致主从数据不一致,row:数据一致 mixed:)
binlog_format=row
#设置服务id,主从不能一致 ,一般设置为ip最后一段
server-id = 19
#设置需要同步的数据库
binlog-do-db=aa_db
#屏蔽系统库同步
binlog-ignore-db=mysql
binlog-ignore-db=bb_db
3 重启
systemctl restart mysqld
4登录主库
mysql -uroot -p
5 创建用户
降低密码强度(不建议使用)
SELECT @@VALIDATE_PASSWORD_POLICY;
set global validate_password_policy = 0;
set global validate_password_length=1;
6 授权主从复制专用账号(给从库复制数据使用的)
GRANT REPLICATION SLAVE ON *.* TO canal'@'%' IDENTIFIED BY 'canal';
7 刷新权限
flush privileges;
2.2 Canal安装
下载安装包
Releases · alibaba/canal · GitHub
1解压
tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal/
2编辑配置文件
example:一个mysql实例,可以有多个
cd usr/local/canal/conf
vim canal.properties
我们不需要直接发送mq所以选择tcp。异步同步数据系列(canal+mq)
canal.properties文件
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = tcp
# 默认的实例,可以配置多个,同时可以监控多个mysql服务。多个用逗号隔开
canal.destinations = example
instance.properties 编辑
vim conf/example/instance.properties
#默认注释的 放开要 不要和主库重复了 slaveId号
canal.instance.mysql.slaveId=0
# mysql主库的连接地址
canal.instance.master.address=192.168.135.128:3306
# mysql主库自己创建的同步数据的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
开通端口号
firewall-cmd --permanent --add-port=11111/tcp
firewall-cmd --reload
启动
./startup.sh
查看是否连接数据成功
cd /usr/local/canal/logs/example
tail -n 1000 -f example.log
3 代码使用
3.1 代码
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
canal:
client:
instances:
example:
host: 192.168.135.128
port: 11111
batchSize: 100
@EnableCanalClient
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;
/**
* @创建人 赵伟
* @创建时间 2022/12/10
* @描述
*/
@CanalEventListener
public class BusinessListener {
/**
* destination = "example" mysql实例(默认:example)可以不配置
* schema:库名
* table:表名
* eventType:CanalEntry.EventType.INSERT,DELETE,UPDATE。 配置了哪个他只接收哪个类型
* @param eventType 当前操作数据库的类型
* @param rowData 当前操作数据库的数据
*/
@ListenPoint(destination = "example",schema = "db1", table = "t_order")
public void listenerOrder(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
//1新增数据: eventType:INSERT , BeforeColumnsList 将会是空
//2 删除数据 eventType:DELETE , AfterColumnsList 将会是空
//3 更新数据 eventType:UPDATE, BeforeColumnsList 将是旧数据 AfterColumnsList 将是新数据
// 没有查询类型
System.out.println("订单表数据发生改变");
System.out.println("eventType:"+eventType);
//获取改变之前的数据
rowData.getBeforeColumnsList().forEach((c) ->
System.out.println("改变前的数据:" + c.getName() + "::" + c.getValue())
);
//获取改变之后的数据
rowData.getAfterColumnsList().forEach((c) ->
System.out.println("改变之后的数据:" + c.getName() + "::" + c.getValue())
);
}
}
3.2 坐标安装
https://github.com/chenqian56131/spring-boot-starter-cana
mvn install
直接就打到了你配置好的仓库位置