前言:在搭建canal 服务之后,项目中就可以连接canal ,完成对感兴趣的数据库及其表中数据的变动完成监听,本文依赖于你已经完成了对canal 服务的搭建工作;
1 Cannal 特点:
Canal是阿里巴巴开源的一款基于MySQL数据库的增量数据订阅和消费组件,能够把MySQL的binlog日志解析,转换成多种类型的事件回调给应用消费或同步到数据存储或搜索引擎等其他数据终端。它提供了类似数据流的方式,对数据进行持续性同步和协作,大幅度提升了数据处理的效率。
Canal的主要特点包括:
- 支持多种数据存储形式,包括MySQL、Oracle等,其设计和性能都针对常见的数据存储场景进行了优化;
- Canal结构清晰、易于上手,并且提供了完善的API和文档,能够方便地实现对数据流的订阅、消费和处理;
- Canal采用微服务架构,支持高可用和集群部署,能够保证数据的持久性和一致性,同时能够支持分布式的数据处理和存储;
- . Canal具有良好的稳定性、可靠性和灵活性,在实现数据库变更的实时同步和数据处理等方面具有广泛的应用价值。
Canal是一款高效、可靠、易用的开源组件,在数据库的数据同步和处理方面具有广泛的应用价值,同时也为企业快速实现数据同步和大规模数据处理提供了有效的技术支持。
2 springBoot 集成:
2.1 先来看下 Canal 客户端的消费流程:
-
canal server作为一个程序独立运行,在启动时会创建一个或多个线程来获取Binlog数据,同时监听与下游客户端的连接请求。
-
canal server连接MySQL主库或从库,并通过MySQL的binlog dump协议实现对Binlog数据的实时抓取。通过数据库相关API,canal server订阅mysql的binlog,并将获取到的Binlog写入内存或磁盘文件中。
-
当canal server捕获到新的Binlog数据时,会检测对应的数据源是否有相应的订阅(即
canal.destinations
参数配置),并将Binlog数据发送到对应的MQ Topic中。RocketMQ或其他MQ中间件将会缓存这些数据,等待下游的消费者来消费。 -
canal客户端作为消费者,通过订阅对应的MQ topic来获取Binlog数据,以实现对应用层的实时数据消费和处理。canal客户端在特定时刻发起订阅请求后,RocketMQ或其他MQ中间件会将缓存的数据推送给该客户端。
-
canal客户端在获取到Binlog数据后,会进行相应的处理和解析,并将结果写入配置的目标数据存储中(如MySQL、Redis等)。
canal server通过监听MySQL binlog dump协议,实现对Binlog实时抓取,并将数据推送到指定的MQ topic中。canal客户端可以通过订阅MQ topic得到实时的Binlog数据,以完成数据消费和处理,从而实现了MySQL数据的实时同步和处理。
通过canal 的消费流程我们可以知道:
- canal server 端,会获取到mysql 的数据变化时,将数据解析后,封装成MQ 消息,放入到对应的topic 中;
- canal client 端,会通过订阅canal server 端 感兴趣的topic 从而消费数据;
canal 实际上就是一个生产者和消费者的模型;canal server在默认情况下使用了RocketMQ作为消息队列,用于将解析出的binlog数据发送到下游的消费端,实现数据的异步传输和消息的可靠投递。
2.2 客户端 pom 依赖引入:每项依赖都进行了标注
<!-- 提供http 访问-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 提供实体类的get ,set ,toString 简化代码使用 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- 单元测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- canal 客户端的集成 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.6</version>
</dependency>
<!-- 客户端通信 消息传递 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.6</version>
</dependency>
2.3 客户端配置文件:
application.properties
# canal配置
# Canal服务器的IP地址
canal.host=localhost
# Canal监听的端口
canal.port=11111
# 监听的库名,可以自定义
canal.destination=biglog
# Canal连接MySQL的用户名
canal.username=canal
# Canal连接MySQL的密码
canal.password=canal
# Canal 客户端每次向 服务端拉取消息的数量
canal.client.batch.size=10
# Canal 客户端 感兴趣的表
canal.client.subscribe.filter=biglog.user|biglog.student|biglog.about_us
logging.level.root=debug
这里对配置的参数进行简单说明:
- canal.host : canal server 的ip 地址;
- canal.port : canal server 的 端口 默认11111 可以通过 服务端的canal.properties 配置文件进行修改;
- canal.destination : 要监听的mysql 实例名称;这个参数用来区分客户端消费服务端哪个topic下的消息;
- canal.username:要监听的mysql 实例连接用户名
- canal.password: 要监听的mysql 实例连接密码
- canal.client.batch.size: 客户端每次向服务端拉取的消息数量;
- canal.client.subscribe.filter: 过滤客户端感兴趣的表,正则表达,多个正则之间以逗号(,)分隔,转义符需要双斜杠(\);
这里会发现:
(1)在服务端和客户端都进行了canal.destination 的配置,服务端canal.destination 配置了需要监听哪些mysql 的实例,并将mysql 实例的数据解析后放入 不同的topic 队列中;客户端的canal.destination 则定义了从哪个topic 消费数据;
(2)在服务端和客户端都进行了canal 连接mysql 实例用户名和密码的配置,服务端 配置的mysql用户是用来连接到mysql 的主/从 节点从而拿到binlog;客户端配置的mysql用户则是可以用来获取MySQL的表结构信息,比如列名、列类型、列约束等;
2.4 配置类装载:
- CanalClientConfig.java
package com.example.spring_canal.config;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;
@Configuration
public class CanalClientConfig {
@Value("${canal.host}")
private String host;
@Value("${canal.port}")
private int port;
@Value("${canal.destination}")
private String destination;
@Value("${canal.username}")
private String username;
@Value("${canal.password}")
private String password;
@Bean
public CanalConnector canalConnector() {
return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), destination, username, password);
}
}
2)定义消息消费接口 CanalListener:
package com.example.spring_canal.listener;
import com.alibaba.otter.canal.protocol.Message;
public interface CanalListener {
void onMessage(Message msg);
}
3)定义消息消费业务实现 MyCanalListener:
package com.example.spring_canal.listener;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
public class MyCanalListener implements CanalListener {
@Override
public void onMessage(Message msg) {
List<CanalEntry.Entry> entries = msg.getEntries();
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
CanalEntry.RowChange rowChange = null;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("parse error", e);
}
String tableName = entry.getHeader().getTableName();
CanalEntry.EventType eventType = rowChange.getEventType();
List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
String schemaName = entry.getHeader().getSchemaName();
// 处理数据变更事件
for (CanalEntry.RowData rowData : rowDataList) {
switch (eventType) {
case INSERT:
// 处理插入事件
dealInsert(schemaName, tableName, rowData.getAfterColumnsList());
break;
case UPDATE:
// 处理更新事件
dealUpdate(schemaName, tableName, rowData.getAfterColumnsList());
break;
case DELETE:
// 处理删除事件
dealDelate(schemaName, tableName, rowData.getBeforeColumnsList());
break;
default:
break;
}
}
}
}
}
private void dealDelate(String schemaName, String tableName, List<CanalEntry.Column> afterColumnsList) {
Map<String, Object> dataMap = new HashMap<>();
for (CanalEntry.Column column : afterColumnsList) {
dataMap.put(column.getName(), column.getValue());
}
// log.debug("delate data:{}", afterColumnsList);
log.debug("delate map data:{}", dataMap);
}
private void dealUpdate(String schemaName, String tableName, List<CanalEntry.Column> columns) {
Map<String, Object> dataMap = new HashMap<>();
for (CanalEntry.Column column : columns) {
dataMap.put(column.getName(), column.getValue());
}
// log.debug("update data:{}", columns);
log.debug("update map data:{}", dataMap);
}
private void dealInsert(String schemaName, String tableName, List<CanalEntry.Column> columns) {
Map<String, Object> dataMap = new HashMap<>();
for (CanalEntry.Column column : columns) {
dataMap.put(column.getName(), column.getValue());
}
// log.debug("insert data:{}", columns);
log.debug("insert map data:{}", dataMap);
}
}
4) 定义消息的消费CanalService:
package com.example.spring_canal.listener;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
@Slf4j
@Component
public class CanalService {
@Value("${canal.client.subscribe.filter}")
private String canalFilter;
@Value("${canal.client.batch.size}")
private int batchSize;
@Autowired
private CanalConnector canalConnector;
@Autowired
private CanalListener canalListener;
@PostConstruct
public void run() {
// 定义最后消费的位点
long lastOffset = fetchFromPosition();
while (true) {
Message message = canalConnector.getWithoutAck(batchSize);
long batchId = message.getId();
List<CanalEntry.Entry> entryList = message.getEntries();
int size = message.getEntries().size();
if (batchId == -1 || entryList.isEmpty()) {
try {
// 线程休眠2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
long nowOffset = entryList.get(0).getHeader().getLogfileOffset();
if (nowOffset <= lastOffset) {
continue;
}
try {
canalListener.onMessage(message);
canalConnector.ack(batchId);
// 保存最后消费的位点
lastOffset = message.getEntries().get(size - 1).getHeader().getLogfileOffset();
savePositionState(lastOffset);
} catch (Exception ex) {
log.error("consume error:{}", ex.getMessage());
}
}
}
// 获取并设置消费的起始位点
private long fetchFromPosition() {
// Canal 连接器连接
canalConnector.connect();
// 订阅数据变更
canalConnector.subscribe(canalFilter);
// 从存储中获取上次消费的位点
long position = getPositionState();
if (position != -1) {
// 回滚到上次保存的位点
canalConnector.rollback(position);
}
return position;
}
// 获取位点状态
private static long getPositionState() {
// TODO: 从存储中获取上次消费的位点
return -1;
}
// 保存位点状态
private static void savePositionState(long position) {
// TODO: 将 position 保存到存储中
}
}
至此消息消费的代码完成,从消费端可以看到,通过定义好的CanalConnector 一直向canal server 去拉取消息,完成消费,并且提交消息的ack,并将消费到的最新位点进行保存 ;
3 总结:
-
Canal 客户端通过不断向canal server 拉取对应topic 下的消息(pull 模型),从而获取到mysql 表数据修改的数据;并且当多个客户端,连接到同一个canal 服务端,如果此时客户端感兴趣的数据库和表是相同的,则只有一个客户端能够接收到具体的Binlog数据,从而避免重复消费;
-
如果canal客户端不提交ack,那么代表的是这些binlog事件没有被消费,并不会被标记为已处理,这样会导致重复消费数据和数据的延迟;当canal客户端从canal server拉取到binlog事件后,如果不手动提交ack确认信息,则canal server会认为这些事件未被消费,继续等待客户端发出确认信息。如果到了超时时间仍未收到确认信息,则canal server会将这些事件重新推送给客户端,从而导致重复消费数据。
此外,如果canal客户端不提交ack,那么可以会导致数据的延迟。因为canal server会认为客户端没有处理完当前批次的事件,不会主动推送新的事件给客户端,直到当前批次全部处理完毕并提交ack确认为止。如果客户端一直不提交ack,那么canal server就不能推送新的binlog事件,从而导致实时性受到影响。 -
即使拉取的数量没有达到canal.client.batch.size,客户端也会立即返回进行消费;
4 参考:
4.1 ClientExample 使用官网;