1.概叙
场景一:数据增量实时同步
项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表,每张表数据控制在300W以下,但是效率还是达不到要求,为了提高查询效率,打算使用ES进行数据查询。
那么这个时候问题来了,怎么把MySQL中增量数据同步到ES?
场景二:缓存一致性问题
Java web应用性能分析之【高并发之缓存-多级缓存】_java多级缓存-CSDN博客
在前面文章中有提到这个场景,如何保证redis、EhCache、MySQL数据一致?
我们都知道作为数据库写操作,是不通过缓存的。假设商品服务实例 1 将 1 号商品价格调整为 80 元,这会衍生一个新问题:如何主动向应用程序推送数据变更的消息来保证它们也能同步更新缓存呢?
针对上面两种场景,可以看看canal的解决方案。
什么是Canal?
canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。项目起源于阿里巴巴内部对于跨机房数据同步的需求,通过解析MySQL的二进制日志(Binary Log),Canal能够捕获并推送数据库的变更事件,满足了诸如数据库镜像、实时备份、索引实时维护等多种业务场景的需求。
GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件
Home · alibaba/canal Wiki · GitHub
支持范围
Canal当前支持MySQL数据库的多个版本,包括但不限于5.1.x、5.5.x、5.6.x、5.7.x及8.0.x,同时也兼容阿里云RDS等云数据库服务,为用户提供了广泛的数据库兼容性保障。
部分支持MySQL体系数据库:Mariadb 10.x、PolarDB-X
主要特性
高性能与低延迟:Canal 1.1.x版本进行了深度优化,性能提升高达150%。
Prometheus监控:原生集成Prometheus监控,便于系统健康状况的跟踪。
消息系统集成:直接支持Kafka、RocketMQ消息投递,便于与大数据平台对接。
云数据库支持:无缝对接阿里云RDS,解决了自动主备切换及离线Binlog解析问题。
Docker部署:提供Docker镜像,简化部署流程。
WebUI管理:Canal-Admin工程引入WebUI,实现动态配置、任务管理与日志查看等功能。
2.Canal原理
MySQL主备复制原理
-
MySQL master 将数据变更写入二进制日志(binary log), 日志中的记录叫做二进制日志事件(binary log events,可以通过 show binlog events 进行查看)
-
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
-
MySQL slave 重放 relay log 中事件,将数据变更反映到它自己的数据
Canal工作原理
Canal巧妙地模拟了MySQL主从复制的机制。具体而言:
- 伪装为MySQL Slave:canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
- 获取Binary Log:MySQL Master接收到请求后,开始推送Binary Log给Canal。
- 解析日志事件:Canal解析接收到的Binary Log,将数据变更信息转换为易于处理的结构化数据。canal 解析 binary log 对象(原始为 byte 流),转换为json格式。
- 数据同步:Canal 客户端通过 TCP 协议或 MQ 形式监听 Canal 服务端,同步数据到 ES、KFK、HBase、RocketMQ、Pulsar等。
优点: 可以完全和业务代码解耦,增量日志订阅。
缺点:实时性不高,订阅mysql日志,DB中数据事务成功后,开始同步至canal。
Canal总体架构
对应这些软件包:
- deployer包:即canal server,负责从master库同步binlog,将数据通过tcp的方式同步给Adapter适配器,经过Adapter适配同步给目标库;通过MQ将数据同步给canal client或目标库,canal client也可以再同步给目标库。
- server包官方介绍:https://github.com/alibaba/canal/wiki/DevGuide
- adapter包:给目标库同步数据。目前支持:Hbase、RDB、ES。
- adapter包官方介绍:https://github.com/alibaba/canal/wiki/ClientAdapter
- admin包:canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力,具体文档:Canal Admin Guide
- admin包官方介绍:https://github.com/alibaba/canal/wiki/Canal-Admin-Guide
- canal.client:消费server数据
- Canal设计了client-server架构,支持多种语言客户端通过protobuf 3.0协议与之交互,官方及社区提供了以下客户端:
- Java客户端:ClientExample
- C#客户端:CanalSharp
- Go客户端:canal-go
- Python客户端:canal-python
- PHP客户端:canal-php
- Rust客户端:canal-rs
- Node.js客户端:canal-nodejs
-
除了基础功能,Canal还支持丰富的进阶特性和周边生态工具,如:
Canal-Admin:提供Web界面管理Canal实例,实现配置、监控和运维的可视化操作。
canal2sql:一个工具项目,能根据Binlog生成SQL,便于数据迁移或备份。
Otter:Canal的消费端开源项目,用于数据同步与数据集成。
Canal高可用架构
整个 HA 机制的控制主要是依赖了zookeeper的两个特性:watcher、EPHEMERAL节点。canal的 HA 机制实现分为两部分,canal server 和 canal client分别有对应的实现。
canal server实现流程如下:
- 1、canal server 要启动某个 canal instance 时都先向 zookeeper 进行一次尝试启动判断 (实现:创建 EPHEMERAL 节点,谁创建成功就允许谁启动)。
- 2、创建 zookeeper 节点成功后,对应的 canal server 就启动对应的 canal instance,没有创建成功的 canal instance 就会处于 standby 状态。
- 3、一旦 zookeeper 发现 canal server A 创建的节点消失后,立即通知其他的 canal server 再次进行步骤1的操作,重新选出一个 canal server 启动instance。
- 4、canal client 每次进行connect时,会首先向 zookeeper 询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect。
为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态。
依赖:
- JDK1.8
- MySQL:用于canal-admin存储配置和节点等相关数据
- Zookeeper
3.Canal实战
准备环境
名称 | 版本 |
---|---|
MySQL | 5.7 |
elasticsearch | 7.17.9 |
canal | 1.1.7 |
jdk | 1.8 |
-
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
- 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
-
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
下载 canal
本次操作只需要下载admin、deployer两个包。
下载后解压即可用,当然,要运行起来的话,需要配置jdk1.8,这里就不再多说。
创建admin的数据库:conf/canal_manager.sql
启动admin前,先完成建库和见表(包括admin的默认登录账号密码 admin/123456),否则启动报错。主要是如下六张表。
修改配置文件
修改server的配置文件:conf/canal.properties 和conf/example/instance.properties
切记数据库和配置文件中的密码要一致。
修改admin的配置文件:conf/application.yml
分别启动admin和server
启动server:bin/startup.bat
启动admin:bin/startup.bat
登录admin:http://192.168.1.4:8089/#/login?redirect=%2Fdashboard
默认账号密码:admin/123456
配置java客户端
引入依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
客户端代码
package com.zxx.study.base.canal;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.SneakyThrows;
import java.net.InetSocketAddress;
import java.util.List;
/**
* @author zhouxx
* @create 2024-08-01 21:59
*/
public class CanalClient {
@SneakyThrows
public static void main(String[] args) {
try {
// 创建canal客户端,单链接模式
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.4",
11111), "example", "", "");
// 创建连接
canalConnector.connect();
while (true) {
// 订阅数据库
// canalConnector.subscribe("mall");
// 获取数据
Message message = canalConnector.get(100);
// 获取Entry集合
List<CanalEntry.Entry> entries = message.getEntries();
// 判断集合是否为空,如果为空,则等待一会继续拉取数据
if (entries.size() <= 0) {
// System.out.println("当次抓取没有数据,休息一会。。。。。。");
Thread.sleep(1000);
} else {
// 遍历entries,单条解析
for (CanalEntry.Entry entry : entries) {
//1.获取表名
String tableName = entry.getHeader().getTableName();
//2.获取类型
CanalEntry.EntryType entryType = entry.getEntryType();
//3.获取序列化后的数据
ByteString storeValue = entry.getStoreValue();
//4.判断当前entryType类型是否为ROWDATA
if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
//5.反序列化数据
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
//6.获取当前事件的操作类型
CanalEntry.EventType eventType = rowChange.getEventType();
//7.获取数据集
List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
//8.遍历rowDataList,并打印数据集
for (CanalEntry.RowData rowData : rowDataList) {
JSONObject beforeData = new JSONObject();
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
beforeData.put(column.getName(), column.getValue());
}
JSONObject afterData = new JSONObject();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
afterData.put(column.getName(), column.getValue());
}
//数据打印
System.out.println("Table:" + tableName +
",EventType:" + eventType +
",Before:" + beforeData +
",After:" + afterData);
/**
* Table:test_user,EventType:UPDATE,Before:{"name":"1111221","id":"1"},After:{"name":"11tom","id":"1"}
* Table:test_user,EventType:INSERT,Before:{},After:{"name":"zhouxx","id":"17"}
* Table:test_user,EventType:INSERT,Before:{},After:{"name":"zhouxx1","id":"18"}
* */
}
}
}
}
}
}catch (Exception e){
e.printStackTrace();
}
}
}
运行效果:在数据库里面忝删改查,均可以在客户端中打印出来
cancel框架同步mysql数据到kafka
参考:cancel框架同步mysql数据到kafka_mysql cancel-CSDN博客
利用canal进行MySQL到ES的数据实时同步
参考:利用canal进行MySQL到ES的数据实时同步_canal es-CSDN博客
结合上图,至此,场景一和场景二中的问题,均可以解决。