mysql数据同步到elasticsearch数据解决方案
问题场景
1.分库分表后多关联或者多条件查找效率低下,例如2b场景的查询,导出等需要多条件查询,继续用分库分表话效率低下。
2.数据量太多需要转移非关系型数据库elasticsearch存储
3.其他数据转移场景等
这两种场景都涉及到mysql数据同步到es数据解决方案,解决起来分总体两步走,一是存量数据的同步,二是增量数据的同步。这里利用的是canal的方案去同步数据,方案如下图所示
这个是不停机的方案,首先同时开启存量的数据的导入和增量数据的监听,待存量数据导入完成,开启java服务消费mq消息,对数据进行更新或者插入,若数据存在则进行更新,若数据不存在,是新插入则插入,是更新则保存到定时任务重试。这里只是理想方案,实际过程中和存量数据的大小,数据的增长率等有关系,具体实施肯定较为复杂。
若要执行停机方案,则比较简单,数据不再更新后,将存量数据插入到es后,再开启增量数据监听服务以及消费服务,这样es就能实时同步数据了,下面实践下canal adapter的mysql存量数据导入elasticsearch中。
增量数据导入elasticsearch
实践版本:
elasticsearch &kibana:7.12.1
canal.client-adapter:1.1.7-SNAPSHOT
mysql :8.0 主从
以这个分表的数据为例
CREATE TABLE `pay_parent_1` (
`id` bigint NOT NULL AUTO_INCREMENT,
`user_id` int NOT NULL,
`status` varchar(50) COLLATE utf8mb4_bin DEFAULT NULL,
`creator` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '创建者',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updater` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '更新者',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT '是否删除',
`tenant_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '租户id',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1571152425171070978 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
在kibana中建立索引
PUT /pay_parent_0
{
"mappings":{
"properties":{
"id": {
"type": "long"
},
"user_id": {
"type": "long"
},
"status": {
"type": "text"
},
"creator": {
"type": "text"
},
"create_time": {
"type": "date",
"formats" : ["yyyy-MM-dd HH:mm:ss"],
"timezone" : "Asia/Shanghai"
},
"updater": {
"type": "text"
},
"update_time": {
"type": "date",
"formats" : ["yyyy-MM-dd HH:mm:ss"],
"timezone" : "Asia/Shanghai"
},
"deleted": {
"type": "long"
},
"tenant_id": {
"type": "text"
}
}
}
}
client-adapter 配置 表和es索引的映射
dataSourceKey: defaultDS #数据源
destination: pay_parent_0 #也可以从监听的数据源取数据 mq或者canal
outerAdapterKey: es #对应的适配器的key
groupId: g1 #对应适配器的分组
esMapping:
index: pay_parent_0 #es索引名称
id: id #数据库主键对应es文档id 插入数据时一定要填写
# upsert: true #是否更新 以主键id作为更新条件
sql: "select id, user_id, status, creator, create_time, updater, update_time, deleted, tenant_id
from pay_parent_0 as a"
etlCondition: "where a.id={}" # etl 的条件参数 接口请求
commitBatch: 3000 # 提交批大小
下载canal源码,启动lanucher 模块,client-adapter 启动配置
srcDataSources:
defaultDS:
url: jdbc:mysql://xxx:3306/demo0?useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=CTT
username: root
password: xxx
canalAdapters:
- instance: canal.test.queue # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es7
key: es
hosts: http://xxx:39200 # 127.0.0.1:9200 for rest mode
properties:
mode: rest # or rest
cluster.name: elasticsearch
启动后执行,先测试第一条数据插入
http://127.0.0.1:8081/etl/es7/es/payParent0.yml?params=1571052632021184514
插入后查看kibana,以创建时间搜索,如图所示,值得注意的是elasti存储时默认将时间转化为UTC时间存储,即0时区,内部默认是个长整型。所以这里的date格式的时间都时零时区的,但是搜索的时候kibana会进行时区转换。搜出来的结果是准确的。
另外,重复请求时,指定了id插入的数据会覆盖原来的数据,这个是es内部api的功能。
简单测试完后,测试下10w的数据插入,这里只需要把请求中的参数去掉就行,执行http://127.0.0.1:8081/etl/es7/es/payParent0.yml
可以看到性能还是不错,10w的数据准确无误的插入,花了41ms,外网和硬件条件一般的情况,内网的花更快。
总结
利用canal adpter的es插件可以实现mysql 同步的数据的功能,存量数据批量更新或者批量插入,非常方便。里面的源码插件的实现,配置文件分离,插入实例的实现以及mysql数据的批量插入都可以借鉴。若后续业务中有设计数据迁移到es中,参考实现是非常有帮助的。