公司企微聊天数据存储在 ES 中,虽然按照企业分储在不同的ES 索引中,但某些常用的企微主体使用量还是很大。4年中一个索引存储数据已经达到46多亿条数据,占用存储3.1tb,
ES 配置
由于多一个副本,存储得翻倍,成本考虑,所以没有设置副本分片(不建议你们这么做)
索引拆分后,只需要修改插入时的代码逻辑,设置好别名后,查询代码是不需要改动的
进入主题,拆分索引 ,数据按年进行拆分
1.设置索引模板
PUT _template/chat_message_template
{
"index_patterns": ["chat-message-*"],
"settings": {
"number_of_shards": 15, // 这里和集群的节点对应,需要是节点的整数倍
"number_of_replicas": 0, // 分片副本,生产环境最好设置>=1
"refresh_interval": "20s",
"codec": "best_compression",
"max_result_window": "100000000"
},
"mappings": {
"properties": {
"msg_id": {
"type": "keyword"
},
"msg_time": {
"type": "long"
},
"msg_type": {
"type": "keyword"
},
"text": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"term_vector": "with_positions_offsets"
},
"image": {
"type": "text"
},
...
...
...
}
}
}
2.根据模板新建索引
PUT chat-message-2021
3.reindex 到新索引
!!重要!!
# reindex 前查看磁盘是否够用,索引切分后,占用磁盘大小比一个索引大了一些
# 尽量多留一些空间给新索引 , 扩容前磁盘占用40%左右是个不错的选择,请提前进行扩容
# 我们遇到了空间不够用的情况,后扩容,虽然说是滚动扩容,客服说任务可能会取消,但扩容后任务还在,所以尽量提前把空间扩容好
# 查看各节点的分片数量及磁盘使用
GET /_cat/nodes?v&h=name,shards,disk.used_percent
# 结果
name disk.used_percent
es-cn-**-data-g4-3 86.12
es-cn-**-data-g4-2 81.75
es-cn-**-data-g4-0 85.94
es-cn-**-data-g4-4 85.73
es-cn-**-data-g4-1 85.67
reindex命令
# 限流保护:添加?requests_per_second=1000参数避免集群过载
# 异步执行,会直接返回 taskId: wait_for_completion
# 执行完后,修改以下 msg_time 再执行,可以并行迁移每年的数据
POST _reindex?requests_per_second=1000&wait_for_completion=false
{
"conflicts": "proceed", // 默认情况下,当发生version conflict的时候,_reindex会被abort。解决方案设置为“proceed”:
"source": {
"index": "chat-message-0613",
"size":5000,
"query": {
"range": {
"msg_time": {
"gte": 1609430400000,
"lt": 1610380800111
}
}
}
},
"dest": {
"index": "chat-message-2021",
"op_type": "create" // 把op_type设置为create,_reindex API,只在dest index中添加不不存在的doucments。如果相同的documents已经存在,则会报version confilct的错误。
}
}
4.查看进度
GET _tasks?detailed=true&actions=*reindex
# 返回结果
{
"nodes" : {
"z5VL_HJ2Qn****AMQ" : {
"name" : "es-cn-**-data-g4-3",
"transport_address" : "121.**.114.80:9300",
"host" : "121.**.114.80",
"ip" : "121.**.114.80:9300",
"roles" : [
"data",
"ingest",
"master",
"ml",
"remote_cluster_client",
"transform"
],
"attributes" : {
"zone_id" : "cn-shanghai-g",
"ml.machine_memory" : "64887980032",
"ml.max_open_jobs" : "20",
"xpack.installed" : "true",
"zone" : "cn-shanghai-g",
"transform.node" : "true"
},
"tasks" : {
"z5VL_HJ2Qn****YhoAMQ:6597302" : {
"node" : "z5VL_HJ2Qn****YhoAMQ",
"id" : 6597302,
"type" : "transport",
"action" : "indices:data/write/reindex",
"status" : {
"total" : 484234,
"updated" : 0,
"created" : 0,
"deleted" : 0,
"batches" : 11,
"version_conflicts" : 55000,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 99999,
"requests_per_second" : 1000.0,
"throttled_until_millis" : 2410
},
"description" : "reindex from [chat-message-0613] to [chat-message-2021][_doc]",
"start_time_in_millis" : 1742189556123,
"running_time_in_nanos" : 109710051531,
"cancellable" : true,
"headers" : {
"trace_id" : "ywiWopUBbHoVT9Iz8TeX"
}
}
}
}
}
}
5.查看文档数量是否相同
把数据和 kibana 中索引文档数据比对即可
例:查看2021年的数据量
GET chat-message-0613/_count
{
"query": {
"range": {
"msg_time": {
"gte": 1609459200000,
"lt": 1640966400000
}
}
}
}
# 数量对的上的话,可以把老索引给删除了,释放磁盘,在 kibana 中操作即可
6.修改别名
先删除之前的别名,把老的别名放到新建的索引上
POST _aliases
{
"actions" : [
{"remove" : {"index" : "chat-message-0613" , "alias" : "chat-message"}},
{"add" : {"index" : "chat-message-2021" , "alias" : "chat-message"}},
{"add" : {"index" : "chat-message-2022" , "alias" : "chat-message"}},
{"add" : {"index" : "chat-message-2023" , "alias" : "chat-message"}},
{"add" : {"index" : "chat-message-2024" , "alias" : "chat-message"}},
{"add" : {"index" : "chat-message-2025" , "alias" : "chat-message"}}
]
}
修改别名后,ES 查询部分不用修改
取消reindex命令
POST _tasks/z5VL_HJ2Qn****YhoAMQ:6597302/_cancel
插入文档数据的部分逻辑代码
func (s *ElasticService) CreateIndex(index, alias, mapping string) (err error) {
// 判断索引是否存在
exists, err := sys.Elastic().IndexExists(index).Do(context.Background())
if err != nil {
sys.Log().Error(sys.NewProjectErr(1000401).Error() + index)
return
}
if exists {
// 加锁,说明此索引已经存在
_, _ = model.Factory.Lock.AddLock(model.LockSync, "mapping:ext:"+index, 370*24*3600)
return
}
defer func() {
model.Factory.Lock.UnLocK(model.LockSync, "mapping:create:"+index)
}()
// 多线程创建防止出错,加锁
ok, err := model.Factory.Lock.AddLock(model.LockSync, "mapping:create:"+index, 120)
if err != nil {
return
}
if !ok {
err = errors.New("未抢占到锁")
return
}
// 创建index
createIndex, err := sys.Elastic().CreateIndex(index).Body(mapping).Do(context.Background())
if err != nil {
sys.Log().Error(sys.NewProjectErr(1000401).Error() + index)
return
}
if !createIndex.Acknowledged {
// Not acknowledged
sys.Log().Error(sys.NewProjectErr(1000401).Error() + index)
return
}
// 创建alias
putAlias, err := sys.Elastic().Alias().Add(index, alias).Do(context.Background())
if err != nil {
sys.Log().Error(sys.NewProjectErr(1000402).Error() + alias)
return
}
// 可选:检查别名操作是否被集群确认(按需添加)
if !putAlias.Acknowledged {
sys.Log().Error("Alias creation not acknowledged: " + alias)
return
}
return
}
// 数据按年切分后请求此方法
func (s *ElasticService) Bulk(index, alias string, chatMsg []model.ChatMessage) (bulkResponseItem []*elastic.BulkResponseItem, err error) {
tryTime := 0
CreateIndex:
// 分布锁
ok, err := model.Factory.Lock.ExistLock(model.LockSync, "mapping:ext:"+index)
if err != nil {
return
}
if !ok {
errCreate := s.CreateIndex(index, alias, ChatMessageMapping)
if errCreate != nil {
if tryTime < 10 {
tryTime++
time.Sleep(time.Second)
goto CreateIndex
} else {
err = errCreate
return
}
}
}
bulkRequest := sys.Elastic().Bulk()
...
...
...
}
最后
整个迁移非常费时间。 让grok 帮我算了下(ps. 同样的提问,国内大模型都没算出来 ,还是MASK的强啊)