Canal实时同步Mysql
Binlog至
Elasticsearch
文章目录
-
Canal实时同步Mysql **Binlog**至**Elasticsearch** - 一. 环境准备
- 1.环境检查
- 检查`Mysql`是否开启`BinLog`
- 开启Mysql Binlog
- Java环境检查
- 2.新建测试库和表
- 3.新建Es索引
- 二.**部署 Canal Server**
- **2.1 解压安装包**
- **2.2 配置 Canal Server**
- **2.3 启动 Canal Server**
- **三. 部署 Canal Adapter(同步到 Elasticsearch)**
- **3.1 配置 Adapter**
- **3.2 配置数据映射**
- **3.3 启动 Adapter**
- **4. 验证同步**
- **4.1 插入测试数据到 MySQL**
- **4.2 查询 Elasticsearch**
一. 环境准备
- 操作系统:Linux(Ubuntu 20.04)
- Java 环境:JDK 8+(建议 OpenJDK 11)
- MySQL:已启用 Binlog(ROW 模式),并创建 Canal 用户
- Elasticsearch:已部署(版本 7.x 或 8.x)
- Canal 二进制包:从 Canal Release 下载
canal.deployer-1.1.8.tar.gz
和canal.adapter-1.1.8.tar.gz
1.环境检查
#root账号执行
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
输出如下证明已经打开:
创建 Canal 用户并授权:
#创建用户
CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'Password@123';
# 给新创建账户赋予从库权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# 刷新权限
FLUSH PRIVILEGES;
如果没打开BinLog
可以通过如下方法打开:
修改my.cnf
文件,加入如下内容:
log_bin=mysql-bin
binlog_format=ROW
binlog_expire_logs_seconds=172800
expire_logs_days=2
log_bin
:启用二进制日志,日志文件会以mysql-bin
为前缀,并依次生成日志文件(例如:mysql-bin.000001
,mysql-bin.000002
等)。
binlog_format
:设置使用的二进制日志格式,在 MySQL 8.0 版本中,binlog_format
的默认值已经变为ROW
。所以,即使你在配置文件中没有明确设置binlog_format
,MySQL 会默认使用ROW
作为二进制日志格式。在较早的 MySQL 版本中默认值是STATEMENT
。
binlog_expire_logs_seconds=172800
和expire_logs_days=2
:这些配置设置了二进制日志的过期时间(默认情况下,MySQL 会保留二进制日志,直到它们过期或达到日志文件数的限制)。在这种情况下,日志会在 2 天后过期。
配置好后重启Mysql
:
systemctl restart mysqld.service
echo $JAVA_HOME
![image-20250211111637904](https://i-blog.csdnimg.cn/img_convert/1f93ac205597669078346a90fa95fa8b.png)
2.新建测试库和表
CREATE DATABASE IF NOT EXISTS canal default charset utf8 COLLATE utf8_general_ci;
CREATE TABLE `test_user` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '姓名',
`sex` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '性别',
`tel` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '电话',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
3.新建Es索引
curl -X PUT "http://<your es IP>:9200/test_user" -H 'Content-Type: application/json' -u <es账号>:<es 密码> -d'
{
"mappings": {
"properties": {
"id": {
"type": "long"
},
"title": {
"type": "text"
},
"sex": {
"type": "text"
},
"tel": {
"type": "text"
}
}
}
}
'
二.部署 Canal Server
2.1 解压安装包
# 创建目录
mkdir -p /opt/canal/server /opt/canal/adapter
# 解压 Server
tar -zxvf canal.deployer-1.1.8.tar.gz -C /opt/canal/server
# 解压 Adapter
tar -zxvf canal.adapter-1.1.8.tar.gz -C /opt/canal/adapter
2.2 配置 Canal Server
修改配置文件 /opt/canal/server/conf/canal.properties
:
# tcp bind ip
canal.ip =127.0.0.1
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# 目标实例名称(默认 example)
canal.destinations = example
# 持久化模式(默认内存,可选 H2/MySQL)
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
这里主要修改canal.ip
其他保持默认即可。
修改实例配置 /opt/canal/server/conf/example/instance.properties
:
#被同步的mysql地址,填写自己的IP地址
canal.instance.master.address=127.0.0.1:3306
#第一步创建的数据库从库权限账号/密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Password@123
#数据库连接编码
canal.instance.connectionCharset = UTF-8
#Binlog 过滤规则(监控所有库表)
canal.instance.filter.regex=.*\\..*
#指定了 Canal 消费者(比如 MQ 客户端)读取和写入消息的目标主题,保持默认即可
canal.mq.topic=example
2.3 启动 Canal Server
cd /opt/canal/server/bin
./startup.sh
# 查看日志
tail -f /opt/canal/server/logs/canal/canal.log
tail -f /opt/canal/server/logs/example/example.log
可以看到日志没有明显报错,且进程已经启动,则表示Canal Server
已经启动成功。
三. 部署 Canal Adapter(同步到 Elasticsearch)
3.1 配置 Adapter
修改配置文件 /opt/canal/adapter/conf/application.yml
:
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp # 客户端的模式,可选tcp kafka rocketMQ
flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
zookeeperHosts: # 对应集群模式下的zk地址
syncBatchSize: 1000 # 每次同步的批数量
retries: 0 # 重试次数, -1为无限重试
timeout: # 同步超时时间, 单位毫秒
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111 #配置canal-server的地址
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
srcDataSources: # 源数据库配置
defaultDScanal是测试数据库
url: jdbc:mysql://<yourIP>:3306/canal?useUnicode=true&useSSL=true #数据库连接,canal是测试用的数据库
username: root #数据库账号
password: Pass@1234 #数据库密码
canalAdapters: # 适配器列表
- instance: example # canal实例名,和上述Server的配置一样
groups: # 分组列表
- groupId: g1 # 分组id, 如果是MQ模式将用到该值
outerAdapters:
- name: logger # 日志打印适配器
- name: es8 # ES同步适配器根据自己的es版本来
hosts: <your IP>:9200 # ES连接地址
properties:
mode: rest # 模式可选transport端口(9300) 或者 rest端口(9200)
security.auth: elastic:123456 # 连接es的用户和密码,仅rest模式使用
cluster.name: elasticsearch # ES集群名称
如何获取es集群名称,命令输出的cluster_name
就是上面需要配置的集群名字:
curl -u elastic:<esPass> -X GET "http://<es IP>:9200/_cluster/health?pretty"
3.2 配置数据映射
创建 Elasticsearch 映射文件 /opt/canal/adapter/conf/es8/mytest_user.yml
:
dataSourceKey: defaultDS # 源数据源的key, 对应上面application配置的srcDataSources中的值
destination: example # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
_index: test_user # es 的索引名称
_id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
sql: "SELECT
tb.id AS _id,
tb.name,
tb.sex,
tb.tel
FROM
test_user us" # sql映射
etlCondition: "where p.id>={}" #etl的条件参数
commitBatch: 3000 # 提交批大小
3.3 启动 Adapter
cd /opt/canal/adapter/bin
./startup.sh
#查看日志
tail -f /opt/canal/adapter/logs/adapter/adapter.log
会输出很多数据库变更的日志:
4. 验证同步
4.1 插入测试数据到 MySQL
#执行sql
INSERT INTO test_user (name, sex, tel) VALUES ('Paco', '男', '123456789');
4.2 查询 Elasticsearch
curl -u elastic:<esPass> -X GET "http://<esIP>:9200/test_user/_search?pretty"
也可以在工具上查看,这边是Eage
插件:
至此,即可验证可同步成功。我们可以修改数据测试,看是否能同步。
然后我们测试修改Es
的数据:
可以发现数据库并没有变,至此Canal
单向实时同步Mysql Binlog
至Elasticsearch
就配置完成了。