canal使用了Rocketmq来接收mysql采集的binlog的事件,做到采集和处理的解耦。同时满足一次采集多方消费的需求。那么既然使用到Rocketmq就一定会存在MQ消费超时或是处理失败MQ重发的问题。
那么canal是如何处理MQ重复消费幂等性问题的呢
一般,在业务上我们都会为每个消息生成一个uuid来标记这条消息的唯一性。在消费时业务表增加uuid字段或是MQ唯一表来判断是否已经处理过这条消息,如果消费过了就直接回给MQ ack。
但我们定义的t_user表中并没有用于检查唯一性的uuid字段。那canal是如何做的呢。
首先从canal接收RocketMQ的代码开始分析。
canal正对消费方做了不同的adapter实现,例:RdbAdapter、ESAdapter、HbaseAdapter
我们使用的mysql数据库,直接分析RdbAdapter
/**
* 同步方法
*
* @param dmls 数据包
*/
@Override
public void sync(List<Dml> dmls) {
if (dmls == null || dmls.isEmpty()) {
return;
}
try {
//rdb同步服务
rdbSyncService.sync(mappingConfigCache, dmls, envProperties);
rdbMirrorDbSyncService.sync(dmls);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
RdbSyncService
//dmlsPartition这里不展开分析,下文做分析
futures.add(executorThreads[i].submit(() -> {
try {
//通过多线程并行执行dmlsPartition的里dml
dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j],
syncItem.config,
syncItem.singleDml));
dmlsPartition[j].clear();
batchExecutors[j].commit();
return true;
} catch (Throwable e) {
dmlsPartition[j].clear();
batchExecutors[j].rollback();
throw new RuntimeException(e);
}
}));
/**
* 单条 dml 同步
*
* @param batchExecutor 批量事务执行器
* @param config 对应配置对象
* @param dml DML
*/
public void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
if (config != null) {
try {
String type = dml.getType();
if (type != null && type.equalsIgnoreCase("INSERT")) {
//直接分析insert
insert(batchExecutor, config, dml);
} else if (type != null && type.equalsIgnoreCase("UPDATE")) {
update(batchExecutor, config, dml);
} else if (type != null && type.equalsIgnoreCase("DELETE")) {
delete(batchExecutor, config, dml);
} else if (type != null && type.equalsIgnoreCase("TRUNCATE")) {
truncate(batchExecutor, config);
}
if (logger.isDebugEnabled()) {
logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
canal在insert时,出现主键冲突时走了SQLException。skipDupException默认是=true,直接忽略了这个异常
/**
* 插入操作
*
* @param config 配置项
* @param dml DML数据
*/
private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
Map<String, Object> data = dml.getData();
if (data == null || data.isEmpty()) {
return;
}
DbMapping dbMapping = config.getDbMapping();
Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
StringBuilder insertSql = new StringBuilder();
insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append("`")
.append(targetColumnName)
.append("`")
.append(","));
int len = insertSql.length();
insertSql.delete(len - 1, len).append(") VALUES (");
int mapLen = columnsMap.size();
for (int i = 0; i < mapLen; i++) {
insertSql.append("?,");
}
len = insertSql.length();
insertSql.delete(len - 1, len).append(")");
Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
List<Map<String, ?>> values = new ArrayList<>();
for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
String targetColumnName = entry.getKey();
String srcColumnName = entry.getValue();
if (srcColumnName == null) {
srcColumnName = Util.cleanColumn(targetColumnName);
}
Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
if (type == null) {
throw new RuntimeException("Target column: " + targetColumnName + " not matched");
}
Object value = data.get(srcColumnName);
BatchExecutor.setValue(values, type, value);
}
try {
batchExecutor.execute(insertSql.toString(), values);
} catch (SQLException e) {
if (skipDupException
&& (e.getMessage().contains("Duplicate entry") || e.getMessage().startsWith("ORA-00001:"))) {
// ignore
// TODO 增加更多关系数据库的主键冲突的错误码
} else {
throw e;
}
}
if (logger.isTraceEnabled()) {
logger.trace("Insert into target table, sql: {}", insertSql);
}
}
结论
canal在处理MQ重复消费insert事件时,使用的是忽略的方式。当数据库存在这条数据时,数据库会返回Duplicate entry告诉canal这条数据已经在数据库里了。canal直接回复MQ ack就行了。
扩展: insert的批量插入
在源数据库中执行一条批量插入的sql,canal是怎么进行同步的。
insert into t_user (username,password,create_time,sex)
values ('1','1','2020-10-10',1) , ('1','1','2020-10-10',1);
回到canal的RdbAdapter的批量同步方法
/**
* 批量同步
*
* @param mappingConfig 配置集合
* @param dmls 批量 DML
*/
public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls, Properties envProperties) {
sync(dmls, dml -> {
if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
// DDL
columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
return false;
} else {
// DML
......
for (MappingConfig config : configMap.values()) {
boolean caseInsensitive = config.getDbMapping().isCaseInsensitive();
if (config.getConcurrent()) {
//将批量的多个values数据转换成了一条条单个的insert
List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
singleDmls.forEach(singleDml -> {
int hash = pkHash(config.getDbMapping(), singleDml.getData());
SyncItem syncItem = new SyncItem(config, singleDml);
dmlsPartition[hash].add(syncItem);
});
} else {
int hash = 0;
List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
singleDmls.forEach(singleDml -> {
SyncItem syncItem = new SyncItem(config, singleDml);
dmlsPartition[hash].add(syncItem);
});
}
}
return true;
}
} );
}
继续分析dmlsPartition的作用
关键代码
List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
singleDmls.forEach(singleDml -> {
int hash = pkHash(config.getDbMapping(), singleDml.getData());
SyncItem syncItem = new SyncItem(config, singleDml);
dmlsPartition[hash].add(syncItem);
});
canal将单条批量insert的sql,转换成了多条单个的insert。并将每条的主键pk和处理线程数threads做hash(pk % threads)放入不通的分区,多线程执行提高canal的处理能力。