文章目录
- 1 问题背景
- 2 前言
- 3 思考
- 4 解决思路
- 5 交互
- 6 工作原理
- 7 伪代码实现
- 7.1 安装并配置Canal Server
- 7.2 Canal客户端拉取MQ消息
- 7.3 Canal数据的转换
- 7.4 定制自己的业务逻辑
1 问题背景
有时候客户做了某些操作却不认账,咱们又拿不出证据;有时候客户将账号授权给了别人,客户想要知道期间别人做了什么操作;有些重要操作需要记录起来,等等。以上这些情况都需要我们实现一个操作日志的功能,使用技术的手段获取控制权,使自己处于一个可进可退的处境,证据完全掌握在自己手中。
2 前言
- 本文阐述的是企业级电商解决方案,非自学项目或者八股文或者小demo,解决方案中的任何细节都是站在企业级开发的角度去思考的。解决方案都会尽量从可读性、易用性、高扩展性、统一管理性去考虑。
- 操作日志讲述的是在B端的任何操作都需要持久化起来,非C端
3 思考
- 怎么实现?每个业务逻辑都写一个持久化操作吗?这样不可行,不够统一,容易忘记写,企业级开发是有很多开发者协同开发的。而且业务也复杂,在业务逻辑处写会降低可读性和扩展性。
- 有没有什么组件可以自主感知到每一次数据库操作,然后就可以做自己任意想要做的持久化了。
4 解决思路
使用阿里的开源组件Canal,Canal充当一个MySQL的Slave,订阅binlog。canal有很多种模式,笔者此处使用RocketMQ模式。详情可以了解Canal在Github上介绍。
5 交互
从Canal的github上拿的图,这个交互是最清晰明了的:
6 工作原理
先了解MySQL主备复制原理:
- Master将数据变更写入二进制日志(binary log,其中记录叫做二进制日志事件binary log events),可以通过
show binlog events
查看 - Slave将Master的binary log拷贝到它的中继日志(relay log)
- Slave重放relay log中事件,将数据变更反映它自己的数据
Canal工作原理
- Canal模拟MySQL Slave协议,将自己伪装成Slave,向MySQL Master发送dump协议
- MySQL Master收到dump请求,开始推送binary log给Slave(即Canal)
- Canal解析binary log(原始是byte数据流)
7 伪代码实现
- 此处简单阐述如何用SpringBoot整合Canal,Canal模式使用RocketMQ。
- canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ
7.1 安装并配置Canal Server
详情见官方操作文档
instance配置表中,要配置你想做操作日志的数据库表以及过滤字段的名单,如下所示:
canal.instance.filter.regex=product\\..*,user\\..*,order\\..*
canal.instance.filter.black.field=product.t_product:product_short_desc/product_long_desc/special_return_desc,user.t_user:description,order.t_order:order_desc/order_remark
canal.mq.topic=canal_shoplus_log
canal.instance.filter.regex
表示要对哪些库的哪些表做操作日志的实现;canal.instance.filter.black.field
表示无需做操作日志实现的字段
canal.mq.topic
表示Canal将binary log数据发到MQ的哪个topic
7.2 Canal客户端拉取MQ消息
笔者采用了Canal的MQ模式,因此Canal Server会将MySQL的binary log丢给MQ,我们需要做的是拉取MQ消息来消费,做一个持久化就能实现操作日志了。
canal.client下有对应的MQ数据消费的样例工程,包含数据编解码的功能
- kafka模式: com.alibaba.otter.canal.example.kafka.CanalKafkaClientExample
- rocketMQ模式: com.alibaba.otter.canal.example.rocketmq.CanalRocketMQClientExample
由于代码篇幅过长,详情可以看上面贴出的地址,下面给出关键的代码:
List<Message> messages = connector.getListWithoutAck(1000L, TimeUnit.MILLISECONDS);
解释:Message保存的数据变更是二进制格式,需要使用工具类转换成对象才能更方便写业务逻辑,接下来的小节会给出Canal官方的工具类方法。
7.3 Canal数据的转换
由于Canal传输的数据都是二进制的,我们在代码里面一般都是面向对象来写业务逻辑,因此需要将二进制数据转成一个对象结构,Canal有提供工具类,该工具方法是
com.alibaba.otter.canal.connector.core.producer.MQMessageUtils#messageConverter
,需要下载Canal源码才能找到。下面贴出该工具方法:
/**
* 将Message转换为FlatMessage
*
* @return FlatMessage列表
* @author agapple 2018年12月11日 下午1:28:32
*/
public static List<FlatMessage> messageConverter(EntryRowData[] datas, long id) {
List<FlatMessage> flatMessages = new ArrayList<>();
for (EntryRowData entryRowData : datas) {
CanalEntry.Entry entry = entryRowData.entry;
CanalEntry.RowChange rowChange = entryRowData.rowChange;
// 如果有分区路由,则忽略begin/end事件
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
// build flatMessage
CanalEntry.EventType eventType = rowChange.getEventType();
FlatMessage flatMessage = new FlatMessage(id);
flatMessages.add(flatMessage);
flatMessage.setDatabase(entry.getHeader().getSchemaName());
flatMessage.setTable(entry.getHeader().getTableName());
flatMessage.setIsDdl(rowChange.getIsDdl());
flatMessage.setType(eventType.toString());
flatMessage.setEs(entry.getHeader().getExecuteTime());
flatMessage.setTs(System.currentTimeMillis());
flatMessage.setSql(rowChange.getSql());
flatMessage.setGtid(entry.getHeader().getGtid());
if (!rowChange.getIsDdl()) {
Map<String, Integer> sqlType = new LinkedHashMap<>();
Map<String, String> mysqlType = new LinkedHashMap<>();
List<Map<String, String>> data = new ArrayList<>();
List<Map<String, String>> old = new ArrayList<>();
Set<String> updateSet = new HashSet<>();
boolean hasInitPkNames = false;
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
&& eventType != CanalEntry.EventType.DELETE) {
continue;
}
Map<String, String> row = new LinkedHashMap<>();
List<CanalEntry.Column> columns;
if (eventType == CanalEntry.EventType.DELETE) {
columns = rowData.getBeforeColumnsList();
} else {
columns = rowData.getAfterColumnsList();
}
for (CanalEntry.Column column : columns) {
if (!hasInitPkNames && column.getIsKey()) {
flatMessage.addPkName(column.getName());
}
sqlType.put(column.getName(), column.getSqlType());
mysqlType.put(column.getName(), column.getMysqlType());
if (column.getIsNull()) {
row.put(column.getName(), null);
} else {
row.put(column.getName(), column.getValue());
}
// 获取update为true的字段
if (column.getUpdated()) {
updateSet.add(column.getName());
}
}
hasInitPkNames = true;
if (!row.isEmpty()) {
data.add(row);
}
if (eventType == CanalEntry.EventType.UPDATE) {
Map<String, String> rowOld = new LinkedHashMap<>();
for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
if (updateSet.contains(column.getName())) {
if (column.getIsNull()) {
rowOld.put(column.getName(), null);
} else {
rowOld.put(column.getName(), column.getValue());
}
}
}
// update操作将记录修改前的值
old.add(rowOld);
}
}
if (!sqlType.isEmpty()) {
flatMessage.setSqlType(sqlType);
}
if (!mysqlType.isEmpty()) {
flatMessage.setMysqlType(mysqlType);
}
if (!data.isEmpty()) {
flatMessage.setData(data);
}
if (!old.isEmpty()) {
flatMessage.setOld(old);
}
}
}
return flatMessages;
}
7.4 定制自己的业务逻辑
根据前面用工具类方法拿到的List<FlatMessage>
就可以写自己的业务逻辑了。