1、canal是什么,可以用来作什么
canal是阿里开源的一个用于监听数据库binlog,从而实现数据同步的工具。
2、安装
我使用的是1.1.5版本,太高的版本需要的jdk版本和mysql的驱动版本会更高,可以根据自己的环境选择。
如果是自己玩的话安装
canal.deployer-1.1.5.tar.gz就可以了
地址:
Release v1.1.5 · alibaba/canal · GitHub
3、springboot+mysql+canal实现数据同步可以在网上找到很多博客,不在赘述
4、源码梳理
(1)、既然用到springboot,肯定有一个自动注入的autoconfigure的start。
可以看到spring.factories会自动注入几个client。
(2)、找到一个看着顺眼的client进去看看:
-
我选择的是SimpleClientAutoConfiguration
@Configuration
@EnableConfigurationProperties({CanalSimpleProperties.class})
@ConditionalOnBean({EntryHandler.class})
@ConditionalOnProperty(
value = {"canal.mode"},
havingValue = "simple",
matchIfMissing = true
)
@Import({ThreadPoolAutoConfiguration.class})
public class SimpleClientAutoConfiguration {
private CanalSimpleProperties canalSimpleProperties;
public SimpleClientAutoConfiguration(CanalSimpleProperties canalSimpleProperties) {
this.canalSimpleProperties = canalSimpleProperties;
}
@Bean
public RowDataHandler<RowData> rowDataHandler() {
return new RowDataHandlerImpl(new EntryColumnModelFactory());
}
@Bean
@ConditionalOnProperty(
value = {"canal.async"},
havingValue = "true",
matchIfMissing = true
)
public MessageHandler messageHandler(RowDataHandler<RowData> rowDataHandler, List<EntryHandler> entryHandlers, ExecutorService executorService) {
return new AsyncMessageHandlerImpl(entryHandlers, rowDataHandler, executorService);
}
@Bean
@ConditionalOnProperty(
value = {"canal.async"},
havingValue = "false"
)
public MessageHandler messageHandler(RowDataHandler<RowData> rowDataHandler, List<EntryHandler> entryHandlers) {
return new SyncMessageHandlerImpl(entryHandlers, rowDataHandler);
}
@Bean(
initMethod = "start",
destroyMethod = "stop"
)
public SimpleCanalClient simpleCanalClient(MessageHandler messageHandler) {
String server = this.canalSimpleProperties.getServer();
String[] array = server.split(":");
return SimpleCanalClient.builder().hostname(array[0]).port(Integer.parseInt(array[1])).destination(this.canalSimpleProperties.getDestination()).userName(this.canalSimpleProperties.getUserName()).password(this.canalSimpleProperties.getPassword()).messageHandler(messageHandler).batchSize(this.canalSimpleProperties.getBatchSize()).filter(this.canalSimpleProperties.getFilter()).timeout(this.canalSimpleProperties.getTimeout()).unit(this.canalSimpleProperties.getUnit()).build();
}
}
看到会注入SimpleCanalClient。并且指明了初始化方法和销毁的方法。进去看看。发现是继承了一个抽象的client,这个类是关键,内部有start和stop的具体实现。
很明显,start就是启动一个线程 while(true)的去循环执行binlog的获取和处理。
如何获取的代码没有跟进,但是可以猜到,应该是通过连接然后去获取数据。
-
这里着重看一下处理数据的代码:
public abstract class AbstractMessageHandler implements MessageHandler<Message> {
private Map<String, EntryHandler> tableHandlerMap;
private RowDataHandler<CanalEntry.RowData> rowDataHandler;
public AbstractMessageHandler(List<? extends EntryHandler> entryHandlers, RowDataHandler<CanalEntry.RowData> rowDataHandler) {
this.tableHandlerMap = HandlerUtil.getTableHandlerMap(entryHandlers);
this.rowDataHandler = rowDataHandler;
}
@Override
public void handleMessage(Message message) {
List<CanalEntry.Entry> entries = message.getEntries(); 第一步
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) { 第二步
try {
EntryHandler<?> entryHandler = HandlerUtil.getEntryHandler(tableHandlerMap, entry.getHeader().getTableName()); 第三步
if(entryHandler!=null){
CanalModel model = CanalModel.Builder.builder().id(message.getId()).table(entry.getHeader().getTableName())
.executeTime(entry.getHeader().getExecuteTime()).database(entry.getHeader().getSchemaName()).build();
CanalContext.setModel(model);
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList(); 第四步
CanalEntry.EventType eventType = rowChange.getEventType();
for (CanalEntry.RowData rowData : rowDataList) {
rowDataHandler.handlerRowData(rowData,entryHandler,eventType);
}
}
} catch (Exception e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}finally {
CanalContext.removeModel();
}
}
}
}
}
-
进入rowDataHandler.handlerRowData(maps, entryHandler, eventType);实现类选择的是RowDataHandlerImpl。
public class RowDataHandlerImpl implements RowDataHandler<CanalEntry.RowData> {
private IModelFactory<List<CanalEntry.Column>> modelFactory;
public RowDataHandlerImpl(IModelFactory modelFactory) {
this.modelFactory = modelFactory;
}
@Override
public <R> void handlerRowData(CanalEntry.RowData rowData, EntryHandler<R> entryHandler, CanalEntry.EventType eventType) throws Exception {
if (entryHandler != null) {
switch (eventType) {
case INSERT:
R object = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());
entryHandler.insert(object);
break;
case UPDATE:
Set<String> updateColumnSet = rowData.getAfterColumnsList().stream().filter(CanalEntry.Column::getUpdated)
.map(CanalEntry.Column::getName).collect(Collectors.toSet());
R before = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList(),updateColumnSet);
R after = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());
entryHandler.update(before, after);
break;
case DELETE:
R o = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList());
entryHandler.delete(o);
break;
default:
break;
}
}
}
}
回想一下springboot中使用canal的时候,会有一个注解@CanalTable和一个实现类EntryHandler。
这里的代码要做的就是(1)、匹配合适的语句类型(insert、delete、update)。(2)、insert和delete只需要记录一下操作的值;update需要记录一下修改前和修改后的值。也很好理解,insert和delete回滚只需要反向重放代码就行,而update需要知道之前的数据采集重新update。
-
进入newInstance方法,选择AbstractModelFactory:
public abstract class AbstractModelFactory<T> implements IModelFactory<T> {
@Override
public <R> R newInstance(EntryHandler entryHandler, T t) throws Exception {
String canalTableName = HandlerUtil.getCanalTableName(entryHandler);
if (TableNameEnum.ALL.name().toLowerCase().equals(canalTableName)) {
return (R) t;
}
Class<R> tableClass = GenericUtil.getTableClass(entryHandler);
if (tableClass != null) {
return newInstance(tableClass, t);
}
return null;
}
abstract <R> R newInstance(Class<R> c, T t) throws Exception;
}
重点来了,有两个HandlerUtil.getCanalTableName和GenericUtil.getTableClass。还记得咱们再springboot中的代码会指定 @CanalTable 处理的是那个表和EntryHandler泛型吗。
第一步判断这个EntryHandler实现类有没有指定要处理那个表,如果指定了All。那么就要就走自定义的返回值,这个返回值通常不是我们需要的。所以在使用中一定尽量指定要处理的表。
第二步需要匹配EntryHandler中的泛型类进行赋值操作了。
-
最后进入newInstance方法:
public class EntryColumnModelFactory extends AbstractModelFactory<List<CanalEntry.Column>> {
......
@Override
<R> R newInstance(Class<R> c, List<CanalEntry.Column> columns) throws Exception {
R object = c.newInstance();
Map<String, String> columnNames = EntryUtil.getFieldName(object.getClass());
for (CanalEntry.Column column : columns) {
String fieldName = columnNames.get(column.getName());
if (StringUtils.isNotEmpty(fieldName)) {
FieldUtil.setFieldValue(object, fieldName, column.getValue());
}
}
return object;
}
}
代码比较简单,通过反射给对象赋值。如果不太清楚这里是怎么把数据解析出来的,可以自己搭建起来服务执行一下看看canal返回的结构体,我下边也提出来我的返回,并且我也会将上边代码中和数据解析的地方标红。
获取消息 Message[id=14,entries=[header {
version: 1
logfileName: "mysql-bin.000004"
logfileOffset: 19806
serverId: 1
serverenCode: "UTF-8"
executeTime: 1706838103000
sourceType: MYSQL
schemaName: ""
tableName: ""
eventLength: 80
}
entryType: TRANSACTIONBEGIN
storeValue: " 9"
, header {
version: 1
logfileName: "mysql-bin.000004"
logfileOffset: 19939
serverId: 1
serverenCode: "UTF-8"
executeTime: 1706838103000
sourceType: MYSQL
schemaName: "test"
tableName: "first"
eventLength: 53
eventType: INSERT
props {
key: "rowsCount"
value: "1"
}
}
entryType: ROWDATA
storeValue: "\b\341\001\020\001P\000b\203\001\022\"\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0010\000B\0016R\006bigint\022%\b\001\020\f\032\aaddress \000(\0010\000B\003333R\vvarchar(10)\0226\b\002\020]\032\vcreate_time \000(\0010\000B\0232024-02-02 09:41:43R\bdatetime"
, header {
version: 1
logfileName: "mysql-bin.000004"
logfileOffset: 19992
serverId: 1
serverenCode: "UTF-8"
executeTime: 1706838103000
sourceType: MYSQL
schemaName: ""
tableName: ""
eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\006381841"
],raw=false,rawEntries=[]]
至此,在springboot中通过canal获取binlog的日志并且解析为自定义的entry对象的流程就已经分析、梳理完了。至于后续要怎么处理就有很多的方式了。
最后在分享一个idea跟踪源码的小技巧:
比如我们看到一个比较重要的注解,但是不知道这个注解具体实现在哪里,可以进入注解中,选中注解名称,然后选择Find Usages。就可以看到哪里使用了。