分析CanalRocketMQProducer.send
canal 发送消息到 RocketMQ 时用到了 partitionNum（分区数）和 partitionHash（分区哈希规则）两项配置。
通过 partitionHash 可以按哈希规则把消息发送到 RocketMQ 的不同分区（队列）上。由于同一个分区内的消息在消费时是有序的，哈希值相同（例如同一主键）的数据变更会按顺序被消费。
/**
 * Sends one canal message to RocketMQ.
 *
 * <p>For flat messages with a configured partitionHash, rows are split into one bucket per
 * partition. Each non-empty bucket is then sent as a batch to its partition on
 * {@code sendPartitionExecutor}, and the call waits for all partitions before returning.
 * Without partitionHash, all flat messages are sent as one batch to
 * {@code destination.getPartition()} (0 when unset).
 *
 * @param destination MQ destination (partition counts, partitionHash rules, fixed partition)
 * @param topicName   target topic name
 * @param message     canal message to send
 */
public void send(final MQDestination destination, String topicName, com.alibaba.otter.canal.protocol.Message message) {
    // Partition count for this topic: the dynamic per-topic setting takes precedence,
    // falling back to the destination's default partition count.
    Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
        destination.getDynamicTopicPartitionNum());
    if (partitionNum == null) {
        partitionNum = destination.getPartitionsNum();
    }
    if (!mqProperties.isFlatMessage()) {
        // (non-flat-message branch elided in this excerpt)
        ......
    } else {
        // Build the row data concurrently.
        MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
        // Convert to flat messages serially.
        List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
        if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
            // One bucket per partition. The buckets must be sized with the same partitionNum
            // that messagePartition hashes modulo. Sizing them with
            // destination.getPartitionsNum() overflows the list whenever a dynamic topic
            // partition count is larger than the default one.
            List<List<FlatMessage>> partitionFlatMessages = new ArrayList<>(partitionNum);
            for (int i = 0; i < partitionNum; i++) {
                partitionFlatMessages.add(new ArrayList<>());
            }
            for (FlatMessage flatMessage : flatMessages) {
                FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                    partitionNum,
                    destination.getPartitionHash(),
                    mqProperties.isDatabaseHash());
                int length = partitionFlatMessage.length;
                for (int i = 0; i < length; i++) {
                    // messagePartition may leave slots null; skip them (issue #3267).
                    if (partitionFlatMessage[i] != null) {
                        partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
                    }
                }
            }
            ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
            for (int i = 0; i < partitionFlatMessages.size(); i++) {
                final List<FlatMessage> flatMessagePart = partitionFlatMessages.get(i);
                if (flatMessagePart != null && flatMessagePart.size() > 0) {
                    final int index = i;
                    template.submit(() -> {
                        List<Message> messages = flatMessagePart.stream()
                            .map(flatMessage -> new Message(topicName,
                                ((RocketMQProducerConfig) this.mqProperties).getTag(),
                                JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
                            .collect(Collectors.toList());
                        // Send this partition's messages as one batch.
                        sendMessage(messages, index);
                    });
                }
            }
            // Wait for the results of all partitions.
            template.waitForResult();
        } else {
            // No partitionHash: everything goes to the single configured partition (default 0).
            final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
            List<Message> messages = flatMessages.stream()
                .map(flatMessage -> new Message(topicName,
                    ((RocketMQProducerConfig) this.mqProperties).getTag(),
                    JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
                .collect(Collectors.toList());
            // Send as one batch.
            sendMessage(messages, partition);
        }
    }
}
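partitionHash 计算公式（简化形式）：
`Math.abs(pk.hashCode()) % partitionsNum`
其中 `pk.hashCode()` 实际上是所有哈希列取值的 hashCode 依次异或的结果；开启 databaseHash 时，还会以 database 的 hashCode 作为初始值。取模结果会再取一次绝对值，详见下方实现。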
分析该公式的实现：MQMessageUtils.messagePartition（节选）
// Excerpt from MQMessageUtils.messagePartition: computes the partition index of each row.
// The excerpt ends inside the per-row loop.
// Columns to hash: the configured column list, or the table's primary keys when "$pk$" was configured.
List<String> pkNames = hashMode.pkNames;
if (hashMode.autoPkHash) {
pkNames = flatMessage.getPkNames();
}
// NOTE(review): idx is not used within this excerpt; presumably used further down the loop.
int idx = 0;
for (Map<String, String> row : flatMessage.getData()) {
int hashCode = 0;
// With databaseHash enabled, the hash is seeded with database's hashCode.
if (databaseHash) {
hashCode = database.hashCode();
}
if (pkNames != null) {
// XOR together the hashCodes of every hash column's value; a missing (null) value hashes as "".
for (String pkName : pkNames) {
String value = row.get(pkName);
if (value == null) {
value = "";
}
hashCode = hashCode ^ value.hashCode();
}
}
int pkHash = Math.abs(hashCode) % partitionsNum;
// Math.abs can return a negative value (for Integer.MIN_VALUE), so the remainder may be negative.
// Taking abs again still maps such rows to a fixed partition, so consumption order is preserved.
pkHash = Math.abs(pkHash);
RocketMQ client 发送消息时通过 MessageQueueSelector 显式指定分区（队列）：当 partition 大于等于队列数时，按 `partition % mqs.size()` 取模选择队列。
/**
 * Synchronously sends one message to a specific queue of its topic.
 *
 * <p>The selector picks {@code mqs.get(partition)}. When the requested partition is not
 * smaller than the number of queues, it wraps to {@code partition % mqs.size()}.
 *
 * @param message   RocketMQ message to send
 * @param partition target queue index
 * @throws RuntimeException wrapping any failure raised while sending
 */
private void sendMessage(Message message, int partition) {
    try {
        SendResult sendResult = this.defaultMQProducer.send(message, (mqs, msg, arg) -> {
            if (partition >= mqs.size()) {
                return mqs.get(partition % mqs.size());
            } else {
                return mqs.get(partition);
            }
        }, null);
        if (logger.isDebugEnabled()) {
            logger.debug("Send Message Result: {}", sendResult);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag before wrapping, so the calling thread / executor
        // can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (Throwable e) {
        throw new RuntimeException(e);
    }
}
partitionHash表达式如何配置
分析 MQMessageUtils.getPartitionHashColumns：它从 partitionDatas 中取出解析好的规则列表，并返回第一条与表名匹配的规则的 hashMode。
/**
 * Looks up the hash mode that applies to a table.
 *
 * @param name          table name to match (e.g. "schema.table")
 * @param pkHashConfigs raw partitionHash configuration string
 * @return the hash mode of the first rule matching {@code name}, or {@code null} when the
 *         configuration is empty or no rule matches
 */
public static HashMode getPartitionHashColumns(String name, String pkHashConfigs) {
    // Nothing configured: no hash mode applies.
    if (StringUtils.isEmpty(pkHashConfigs)) {
        return null;
    }
    // Rules with a plain name match case-insensitively; the others match through their regex filter.
    for (PartitionData rule : partitionDatas.get(pkHashConfigs)) {
        boolean matches = rule.simpleName != null
            ? rule.simpleName.equalsIgnoreCase(name)
            : rule.regexFilter.filter(name);
        if (matches) {
            return rule.hashMode;
        }
    }
    return null;
}
代码以规则中最后一个冒号为分隔，冒号后面的部分即 pkHash 表达式：`$pk$`（不区分大小写）表示自动取表的主键。也可以自定义表达式，用 `^` 连接多个列名，例如 `id^name`。没有冒号的规则按表名哈希（tableHash）。多条规则之间用 `,` 或 `;` 分隔。
// Parsed partitionHash rules, cached per raw configuration string (soft values).
// Rule syntax: "tablePattern[:columns]", where columns is "$pk$" or "col1^col2^...";
// rules are separated by ',' or ';', e.g. "schema.table:id^name".
Map<String, List<PartitionData>> partitionDatas = MigrateMap.makeComputingMap(CacheBuilder.newBuilder()
    .softValues(),
    pkHashConfigs -> {
        // Treat ',' and ';' alike as rule separators.
        String normalized = StringUtils.replace(pkHashConfigs, ",", ";");
        String[] rules = StringUtils.split(normalized, ";");
        List<PartitionData> parsed = Lists.newArrayList();
        for (String rule : rules) {
            PartitionData entry = new PartitionData();
            String tablePattern = rule;
            int colon = rule.lastIndexOf(":");
            if (colon <= 0) {
                // No column part: hash by table.
                entry.hashMode.tableHash = true;
            } else {
                String columnExpr = rule.substring(colon + 1);
                if ("$pk$".equalsIgnoreCase(columnExpr)) {
                    // Variable: use the table's primary keys.
                    entry.hashMode.autoPkHash = true;
                } else {
                    // Custom expression: col1 ^ col2 ^ col3
                    entry.hashMode.pkNames = Lists.newArrayList(StringUtils.split(columnExpr, '^'));
                }
                tablePattern = rule.substring(0, colon);
            }
            // Wildcard patterns become a regex filter; anything else is an exact table name.
            if (isWildCard(tablePattern)) {
                entry.regexFilter = new AviaterRegexFilter(tablePattern);
            } else {
                entry.simpleName = tablePattern;
            }
            parsed.add(entry);
        }
        return parsed;
    });
所以对所有库、所有表都按主键分区的通用表达式如下：
.*\\..*:$pk$
实践
修改rocketmq分区
修改instance配置
测试查询消息分区后的结构
查询rocketmq