在很多的时候,由于一些需求,我们不得不修改索引的映射,也即 mapping,这个时候我们需要重新索引(reindex)来把之前的数据索引到新的索引中。槽糕的是,我们的这个索引还在不断地收集实时数据,那么我们该如何处理这种情况呢?
比如,我们有这样的一个案例。假设你有一个名为 production_logs 的索引处于活动状态,这意味着它不断接收新数据。现在假设你想要以需要重新索(reindex)引该索引的所有数据的方式更新其映射:通常,当你想要更改现有字段的数据类型(例如从keyword 到 integer)时,会发生这种情况。
好的,现在你拥有的选项将取决于你首先如何设置索引。
你需要一个 index template
如果你没有自己创建索引,Elasticsearch 能够创建索引,这意味着如果你尝试索引 foo 索引中的某些数据,Elasticsearch 将创建它(如果它尚不存在)。
通过这样做,它使用称为动态映射的功能为这个新索引创建默认映射。
这就是你需要索引模板的原因! 此功能允许你定义 Elasticsearch 自动创建的索引将获得的所有属性,包括其设置和映射。
因此,如果你的 production_logs 索引没有索引模板,仍然是时候创建它了。 我们称它为 production_logs_template 并将此模板的模式设置为 production_logs* ,这意味着每次 Elasticearch 将自动创建名称与模式 production_logs* 匹配的索引时,它将应用该模板(请参阅完整文档)。
以下是索引模板创建请求的示例:
PUT _index_template/production_logs_template
{
"index_patterns": [ "production_logs*"],
"template": {
"settings": {
...
},
"mappings": {
...
}
}
}
案例 1:你已经有一个 index alias
将真实索引隐藏在索引别名后面始终是一个好习惯。
它引入的抽象级别可帮助你在后台执行修改,而不会影响索引的用户(无论是人还是软件)。
因此,假设你获得了当前链接到要修改的 production_logs 索引的日志别名,并且你想要根据 production_logs_template 更新索引的映射。
这里的方法很简单:
- 更新索引模板以定义新映射
- 创建一个新索引,其名称与 production_logs* 匹配(例如 production_logs_1 )
- 切换日志别名以指向这个新索引
POST _aliases
{
"actions": [
{
"add": {
"index": "production_logs_1",
"alias": "logs"
}
},
{
"remove": {
"index": "production_logs",
"alias": "logs"
}
}
]
}
在上面我们的 add 及 remove 是一个原子操作,也即同时起作用。中间不会有其它的操作,这样保证在删除的同时,向我们的索引别名 logs 写入的数据能够正确地写入到新的索引中。
- 将数据从 production_logs 重新索引到 production_logs_1
POST _reindex?wait_for_completion=false
{
"source": {
"index": "production_logs"
},
"dest": {
"index": "production_logs_1"
}
}
此请求将返回一个 task_id,你可以使用它来监视重建索引过程的进度。
以下是你收到的响应示例:
{
"task": "<task_id>"
}
你可以发出请求以利用任务管理 API:
GET _tasks/<task_id>?human
上述命令可以让我们知道任务的进度。就是这么简单! 在重建索引过程结束时,你的 production_logs_1 索引将包含所有新旧数据,并具有正确的映射。 👏
案例2:你还没有一个 index alias
嗯,这会更难,但没有什么是不可能的,对吧?
从现在开始,我们仍然可以面临(至少)两种不同的情况:你是否使用摄入管道(ingest pipeline)。
在下文中,请记住我们已经创建了一个驱动 production_logs* 索引的索引模板。
案例 2.1:你正在使用一个 ingest pipeline
假设使用 production_logs_pipeline 将数据索引到 Elasticsearch,该管道处理任何传入事件,然后再将其索引到 production_logs 索引中。
以下是你想要更新实时索引的几个步骤。
首先,修改 production_logs_pipeline,在末尾添加 set processor。
{
"set": {
"field": "_index",
"value": "{{{_index}}}_1"
}
}
现在,所有通过此摄取管道的文档都将重定向到 production_logs_1 索引。
然后,你可以将所有数据从 production_logs 重新索引到名为 production_logs_orig 的新索引索引。
POST _reindex?wait_for_completion=false
{
"source": {
"index": "production_logs"
},
"dest": {
"index": "production_logs_orig"
}
}
这里请注意,索引 production_logs_1 和 production_logs_orig 都将根据索引模板 production_logs_template 创建!
重新索引任务完成后,你可以安全地删除 production_logs 索引(它的所有数据都已存储在 production_logs_orig 索引中)。
现在我们可以创建一个新的索引别名,它将被命名为 production_logs 并将同时针对 production_logs_orig 和 production_logs_1 索引,后者将是写入索引。
POST _aliases
{
"actions": [
{
"add": {
"index": "production_logs_orig",
"alias": "production_logs"
}
},
{
"add": {
"index": "production_logs_1",
"alias": "production_logs",
"is_write_index": true
}
}
]
}
最后,你需要返回到以前版本的 production_logs_pipeline,以便传入的数据从现在开始将使用新创建的 production_logs 别名。
作为最终结果,我们现在有一个索引别名指向两个具有预期映射的索引👏
💡 有可能再次从 production_logs_orig 重新索引到 production_logs_1 最后只有一个索引。
案例 2.2:索引是直接对索引进行的,没有摄取管道
在这种情况下,需要执行更多步骤,遗憾的是无法创建别名来替换原始索引,但你仍然可以将新映射应用于实时索引。
因此,首先你必须创建一个新的摄取管道,我们将其命名为 temp_pipeline:
PUT _ingest/pipeline/temp_pipeline
{
"processors": [
{
"set": {
"field": "_index",
"value": "{{{_index}}}_1"
}
}
]
}
它只有一个处理器,其作用是将传入的文档重定向到一个新索引,该索引的名称是原始索引并以 _1 为后缀。
现在我们要表明任何进入 production_logs 索引的文档现在都应该使用这个新的摄取管道。 我们可以在我们的索引上利用 index.default_pipeline 设置,这将允许我们应用管道,即使原始索引请求没有提到它。
PUT production_logs/_settings
{
"index": {
"default_pipeline": "temp_pipeline"
}
}
从现在开始,每个传入的文档都将被重定向到 production_logs_1 索引,该索引将使用 production_logs_template 索引模板自动创建。
production_logs 索引不会获得任何新文档,因此我们可以将数据从该索引重新索引到一个名为 production_logs_orig 的新文档。
POST _reindex?wait_for_completion=false
{
"source": {
"index": "production_logs"
},
"dest": {
"index": "production_logs_orig"
}
}
和以前一样,我们可以使用任务管理 API来监控重建索引过程。
完成后,我们删除 production_logs,下一个传入文档到达后将立即重新创建! 它将获得预期的映射(感谢索引模板)所以我们处于这种情况:
- production_logs_orig 索引包含在我们开始操作之前索引的所有数据
- production_logs_1 索引包含从迁移开始到删除之前的 production_logs 索引之间已编制索引的所有数据
- production_logs 索引已重新创建并收集所有新数据(在如下的步骤中进行操作)
要回到单索引状态,我们只需将数据从 production_logs_orig 和 production_logs_1 重新索引到 production_logs 。
POST _reindex?wait_for_completion=false
{
"source": {
"index": "production_logs_1, production_logs_orig"
},
"dest": {
"index": "production_logs"
}
}
作为最终结果,我们将所有以前的数据(以及所有新数据)与新映射一起存储在 production_logs 索引中 👏
结论
本文可以帮助实现一些数据操作,但请记住当前在 Elasticsearch 中处理数据时的最佳实践:
- 始终使用别名从你用来与之交互的资源中抽象出你的真实索引
- 如果处理时间序列,最好是使用数据流!