Canal简单介绍
贴个官方网址:阿里巴巴MySQL binlog 增量订阅&消费组件
架构图:
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
以上资料来源官网
Canal的简单使用
MySQL配置
首先需要安装MySQL数据库,目前笔者使用的MySQL版本是最新的8.0.33,这个过程就不在赘述了
在Windows下,一般在ProgramData/MySQL文件夹中就能找到配置文件"mysql.ini",打开,搜索修改或新建以下选项:
# 指定服务的id,这个要与canal中的区分开,因为每个服务节点的id都要不一样
server-id=1
# 生成的binlog文件的前缀名称,Windows下binlog文件一般存在ProgramData/MySQL/Data文件夹中
log-bin="NS9052929-bin"
# binlog日志的记录方式:row、statement、mixed
binlog_format=row
# 需要记录binlog的数据库,使用逗号分割可以指定多个,如不配置则是所有
binlog-do-db=canal-demo
然后重启MySQL数据库服务,Windows在服务窗口中就可以重启
新建数据库
新建一个架构(数据库):canal-demo
CREATE DATABASE `canal-demo`
/* DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */
向其中添加一张表
-- auto-generated definition
create table user
(
id int auto_increment comment '用户id'
primary key,
username varchar(100) not null comment '用户名',
age int default -1 not null comment '用户年龄'
);
Canal的配置
在github中的release中下载压缩包:Releases-alibaba/canal
我下载了最新版本
把它解压到文件中,修改两个配置文件:
修改cana.properties
,将其中的配置项修改:
# 这个要与上面图片中的文件夹名对应起来,其实是对应多个实例
# 如果要新建实例,复制一个example,改名字,并修改其中的配置文件即可
canal.destinations = example
# 设置服务端口,默认为11111
canal.port = 11111
# 设置服务模式,因为下面是对接Java,因此使用TCP
canal.serverMode = tcp
# 设置数据库的连接账号以及密码
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
修改实例中的配置文件:instance.properties
# 设置数据库路径
canal.instance.master.address=127.0.0.1:3306
# 设置数据库的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 服务id,需要与mysql中的区分开
canal.instance.mysql.slaveId=20
修改完成后,双击bin/startup.bat
,启动canal,看到下面页面则说明启动成功:
结合Java使用
新建一个maven项目, 向pom.xml
文件中加入以下依赖:
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.6</version>
</dependency>
编写一个客户端程序,连接canal:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
public class CanalClient {
public static void main(String[] args) {
// hostname, port, destination, username, password,username和password默认为空
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
try {
connector.connect();
// 监听的表,格式为数据库.表名,数据库.表名
connector.subscribe("canal-demo.*");
// 不断循环获取
while (true) {
Message message = connector.getWithoutAck(100); // 获取指定数量的数据
List<Entry> entries = message.getEntries();
// 如果没有数据的话就等待1秒
if (entries.isEmpty()) {
log.info("没有数据,休息一下");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
// 有数据的话循环打印出数据
for (Entry entry : entries) {
String tableName = entry.getHeader().getTableName();
log.info("表名:{}", tableName);
// 判断Entry类型是否为ROW变换
EntryType entryType = entry.getEntryType();
if (EntryType.ROWDATA.equals(entryType)) {
log.info("ROW变换");
// 序列化数据
ByteString storeValue = entry.getStoreValue();
// 反序列化数据
RowChange rowChange = RowChange.parseFrom(storeValue);
// 获取事件类型
log.info("事件类型:{}", rowChange.getEventType());
// 获取具体数据
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> beforeColumnsList = rowData.getBeforeColumnsList();
Map<String, Object> beforeMap = new HashMap<>();
for (Column column : beforeColumnsList) {
beforeMap.put(column.getName(), column.getValue());
}
log.info("变化前的数据:{}", beforeMap);
List<Column> afterColumnsList = rowData.getAfterColumnsList();
Map<String, Object> afterMap = new HashMap<>();
for (Column column : afterColumnsList) {
afterMap.put(column.getName(), column.getValue());
}
log.info("变化后的数据:{}", afterMap);
}
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 最后关闭连接
connector.disconnect();
}
}
}
运行程序,insert、update、delete监控的数据库中的数据,就会看到控制台中有打印消息
结合Spring Boot使用
新建一个Spring Boot
应用,向其中添加以下依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>4.0.0-rc-2</version>
</dependency>
</dependencies>
Spring Booot
版本是2.7.11
,使用的JDK8
依赖中的canal-spring-boot-starter
是其他开源者对canal在Spring Boot中的集成,测试不支持JDK17的版本,因此Spring Boot版本只能为3.0以下。
如果需要使用JDK17的话,也就是Spring Boot 3.0以上或者Spring 6的话,可以用另外一个开发者的包:behappy-canal,当然还有其他开发者也做了canal的集成,大家自己尝试下吧~
<dependency>
<groupId>io.github.behappy-project</groupId>
<artifactId>behappy-canal-spring-boot-starter</artifactId>
<version>3.0.2</version>
</dependency>
配置文件:
# 数据库连接信息
spring.datasource.url=jdbc:mysql://localhost:3306/canal-demo?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&useSSL=false
spring.datasource.username=root
spring.datasource.password=root
# Spring 服务名称
spring.application.name=canal-spring-boot-demo
# canal的服务地址
canal.server=127.0.0.1:11111
# 需要监控的实例
canal.destination=example
# 关闭日志,不然一秒打印一个日志,浪费空间
logging.level.top.javatool.canal.client=OFF
首先新建一个实体类User
对应数据库中的字段
@Data
public class User {
private Integer id;
private String username;
private Integer age;
}
然后书写一个CanalHandler
实现接口EntryHandler<T>
,在里面对insert、update、delete这几种操作加入自己的处理
import com.example.springdemo.model.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
@Component
@CanalTable("user")
@Slf4j
public class CanalHandler implements EntryHandler<User> {
@Override
public void insert(User user) {
log.info("插入用户:{}", user.toString());
}
@Override
public void update(User before, User after) {
log.info("用户修改前:{}", before.toString());
log.info("用户修改后:{}", after.toString());
}
@Override
public void delete(User user) {
log.info("删除用户:{}", user.toString());
}
}
运行结果
总结
通过Canal可以没有侵入的,即时的将数据库的改动同步到Redis、ElasticSearch或者其他数据存储库中,如果是在大数据方面需要数据聚合的话,推荐使用Flink CDC。目前Canal还有一个问题就是似乎不再维护了,但还是为我们提供了一个轻量化的数据迁移、同步工具。