1. Canal简介
https://github.com/alibaba/canal
1.1 Canal工作原理
MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log
events,可以通过 show binlog events 进行查看) - MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump
协议 - MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
2. 启动Canal
前提是mysql、rabbitmq已经安装完成
2.1 打开mysql的binlog,修改my.cnf
[mysqld]
log-bin=mysql-bin #开启binlog
binlog-format=ROW #选择ROW模式
server_id=1 # 配置MySQL replaction需要定义,不和Canal的slaveId重复即可
查看是否生效:
show variables like 'log_bin';
2.2 下载canal.deployer
https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
下载完之后解压压缩文件,进入canal.deployer-1.1.6之后可以看到:
2.2.1 进入\canal.deployer-1.1.6\config目录:编辑canal.properties文件
配置 Canal 服务方式为 RabbitMQ 和连接配置(🏷 只列出需要修改的地方)
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rabbitMQ
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host = 192.168.0.181
rabbitmq.virtual.host = /test
rabbitmq.exchange = canal.exchange
rabbitmq.username = user123
rabbitmq.password = pass123
rabbitmq.deliveryMode =
填写自己RabbitMQ的地址以及用户名密码和虚拟机即可.
2.2.2 进入\canal.deployer-1.1.6\config\example目录:编辑instance.properties文件
(1)配置mysql的连接地址和正在使用的bin-log文件以及偏移量
# position info
canal.instance.master.address=192.168.0.80:3306
canal.instance.master.journal.name=mysql-bin.000988
canal.instance.master.position=446897591
canal.instance.master.timestamp=
canal.instance.master.gtid=
查看bin-log文件可使用以下命令查看
# 查看正在使用的二进制日志文件和对应的偏移量
SHOW MASTER STATUS;
# 查看所有的二进制日志文件和对应文件的大小
SHOW BINARY LOGS;
(2)配置要监听的数据库
Canal的订阅规则是通过正则表达式来进行匹配的。具体来说,订阅规则由三个部分组成:数据库名、表名和操作类型,使用“.”分隔。
例如,订阅规则“testdb.test_table.*”表示订阅testdb数据库中名为test_table的表的所有操作类型。
test\…*表示监听test数据库下所有的表的所有操作
# table regex
# 监听的
canal.instance.filter.regex=test\\..*
# table black regex
# 不监听的
canal.instance.filter.black.regex=xxl_job\\..*
当然这里配置完毕之后可能会不生效,所以我们要进入\canal.deployer-1.1.6\config目录:编辑canal.properties文件
# binlog filter config
canal.instance.filter.query.dml = true
2.3 启动
在启动之前将canal.deployer-1.1.6\plugin中的jar包拷贝到 canal.deployer-1.1.6\lib目录下;否则启动会报错,因为我们使用了mq,但是在lib目录下没有相关mq的jar。
进入\canal.deployer-1.1.6\bin目录双击startup.bat即可,
或者在此目录下cmd命令:startup.bat pause
不加pause也可,加上只是防止窗口闪退
3. 创建SpringBoot项目
3.1添加依赖:
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
canal版本使用Canal服务相同版本,即1.1.6
3.2编写yml配置
spring:
rabbitmq:
addresses: 192.168.0.181:5672
username: user123
password: pass123
virtual-host: /test
listener:
simple:
acknowledge-mode: auto
direct:
acknowledge-mode: auto
3.3编写RabbitMQ Listener:
@Component
public class CanalListener {
private final static Logger logger = LoggerFactory.getLogger(CanalListener.class);
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "canal.exchange"),
value = @Queue(value = "canal_queue"), key = "canal_routing_key"))
public void receiveMessage(Message message) {
logger.debug("receive canal message:{}", message);
MessageProperties messageProperties = message.getMessageProperties();
String body = new String(message.getBody());
logger.info("messageProperties:{}", messageProperties);
logger.info("body:{}", body);
}
}
其中的交换机、队列、路由key都是上面配置文件配置中一样的。
3.4 效果
执行如下sql之后:
UPDATE `student` SET `del_flag` = 0;
其中body结构大致如下:
表结构: