作者:来自 Elastic Essodjolo Kahanam
本博客介绍如何使用语义搜索以自然语言表达形式从 Elasticsearch 索引中获取信息。我们将创建一个无服务器 Elasticsearch 项目,将之前的奥运会数据集加载到索引中,使用推理处理器和 ELSER 模型生成推理数据(在稀疏向量场中),最后借助文本扩展(text expansion)查询以自然语言表达形式搜索历史奥运会比赛信息。
工具和数据集
对于这个项目,我们将使用 Elasticsearch serverless 项目和无服务器 Python 客户端 (elasticsearch_serverless) 与 Elasticsearch 交互。要创建 serverless 项目,只需按照 serverless 入门指南操作即可。有关 serverless 的更多信息(包括定价),可在此处找到。
设置 serverless 项目时,请务必选择 Elasticsearch 选项和用于本教程的通用选项。
使用的数据集是从 Kaggle (Athletes_summer_games.csv) 获得的 1896 年至 2020 年夏季奥运会参赛者的数据集。它包含有关比赛年份、比赛类型、参赛者姓名、他们是否赢得奖牌以及最终获得哪枚奖牌以及其他信息的信息。
对于数据集操作,我们将使用 Eland,这是一个用于 Elasticsearch 中的 DataFrames 和机器学习的 Python 客户端和工具包。
最后使用的自然语言处理 (NLP) 模型是 Elastic Learned Sparse EncodeR (ELSER),这是一个由 Elastic 训练的检索模型,允许通过语义搜索检索更相关的搜索结果。
在执行以下步骤之前,请确保你已安装 severless Python 客户端和 Eland。
pip install elasticsearch_serverless
pip install eland
请注意下面我使用的版本。如果你使用的不是同一个版本,则可能需要根据你使用的版本中的任何最终语法更改来调整代码。
➜ ~ python3 --version
Python 3.9.6
➜ ~ pip3 list | grep -E 'elasticsearch-serverless|eland'
eland 8.14.0
elasticsearch-serverless 0.3.0.20231031
下载并部署 ELSER 模型
我们将使用 Python 客户端下载并部署 ELSER 模型。在此之前,让我们先确认我们可以连接到我们的 serverless 项目。下面的 URL 和 API 密钥是从环境变量中读取的;你需要根据自己的情况使用适当的值,或者使用你喜欢的任何方法来读取凭据。
from elasticsearch_serverless import Elasticsearch
from os import environ
serverless_endpoint = environ.get("SERVERLESS_ENDPOINT_URL")
serverless_api_key = environ.get("SERVERLESS_API_KEY")
client = Elasticsearch(
serverless_endpoint,
api_key=serverless_api_key
)
client.info()
如果一切配置正确,你应该得到如下输出:
ObjectApiResponse({'name': 'serverless', 'cluster_name': 'd6c6698e28c34e58b6f858df9442abac', 'cluster_uuid': 'hOuAhMUPQkumEM-PxW_r-Q', 'version': {'number': '8.11.0', 'build_flavor': 'serverless', 'build_type': 'docker', 'build_hash': '00000000', 'build_date': '2023-10-31', 'build_snapshot': False, 'lucene_version': '9.7.0', 'minimum_wire_compatibility_version': '8.11.0', 'minimum_index_compatibility_version': '8.11.0'}, 'tagline': 'You Know, for Search'})
现在我们已经确认 Python 客户端已成功连接到无服务器 Elasticsearch 项目,让我们下载并部署 ELSER 模型。我们将检查该模型是否之前已部署,并将其删除以执行全新安装。此外,由于部署阶段可能需要几分钟,我们将不断检查模型配置信息,以确保在进入下一阶段之前模型定义存在。有关更多信息,请查看Get trained models API。
from elasticsearch_serverless import Elasticsearch, exceptions
import time
# delete model if already downloaded and deployed
try:
client.ml.delete_trained_model(model_id=".elser_model_2", force=True)
print("Model deleted successfully, We will proceed with creating one")
except exceptions.NotFoundError:
print("Model doesn't exist, but We will proceed with creating one")
# Creates the ELSER model configuration. Automatically downloads the model if it doesn't exist.
client.ml.put_trained_model(
model_id=".elser_model_2",
input={
"field_names": [
"concatenated_textl"
]
}
)
# Check the download and deploy progress
while True:
status = client.ml.get_trained_models(
model_id=".elser_model_2", include="definition_status"
)
if status["trained_model_configs"][0]["fully_defined"]:
print("ELSER Model is downloaded and ready to be deployed.")
break
else:
print("ELSER Model is downloaded but not ready to be deployed.")
time.sleep(5)
一旦我们确认模型已下载并准备部署,我们就可以继续启动 ELSER。完全准备好部署可能需要一点时间。
# A function to check the model's routing state
# https://www.elastic.co/guide/en/elasticsearch/reference/current/get-trained-models-stats.html
def get_model_routing_state(model_id=".elser_model_2"):
try:
status = client.ml.get_trained_models_stats(
model_id=".elser_model_2",
)
return status["trained_model_stats"][0]["deployment_stats"]["nodes"][0]["routing_state"]["routing_state"]
except:
return None
# If ELSER is already started, then we are fine.
if get_model_routing_state(".elser_model_2") == "started":
print("ELSER Model has been already deployed and is currently started.")
# Otherwise, we will deploy it, and monitor the routing state to make sure it is started.
else:
print("ELSER Model will be deployed.")
# Start trained model deployment
client.ml.start_trained_model_deployment(
model_id=".elser_model_2",
number_of_allocations=16,
threads_per_allocation=4,
wait_for="starting"
)
while True:
if get_model_routing_state(".elser_model_2") == "started":
print("ELSER Model has been successfully deployed.")
break
else:
print("ELSER Model is currently being deployed.")
time.sleep(5)
使用 Eland 将数据集加载到 Elasticsearch
eland.csv_to_eland 允许将逗号分隔值 (csv) 文件读入存储在 Elasticsearch 索引中的数据框中。我们将使用它将奥运会数据 (Athletes_summer_games.csv) 加载到 Elasticsearch 中。es_type_overrides 允许覆盖默认映射。
import eland as ed
index="elser-olympic-games"
csv_file="Athletes_summer_games.csv"
ed.csv_to_eland(
csv_file,
es_client=client,
es_dest_index=index,
es_if_exists='replace',
es_dropna=True,
es_refresh=True,
index_col=0,
es_type_overrides={
"City": "text",
"Event": "text",
"Games": "text",
"Medal": "text",
"NOC": "text",
"Name": "text",
"Season": "text",
"Sport": "text",
"Team": "text"
}
)
执行上述代码后,数据将写入索引 elser-olympic-games。你还可以将生成的数据框 (eland.DataFrame) 检索到变量中,以供进一步操作。
基于 ELSER 创建用于推理的摄取管道
我们使用语义搜索探索过去奥运会比赛数据的下一步是创建一个包含运行 ELSER 模型的 inference processor 的摄取管道。已选择一组字段并将其串联成推理处理器将在其上工作的单个字段。根据你的用例,你可能需要使用另一种策略。
串联是使用 script processor 完成的。推理处理器使用先前部署的 ELSER 模型,将串联字段作为输入,并将输出存储在稀疏向量类型字段中(参见以下要点)。
client.ingest.put_pipeline(
id="elser-ingest-pipeline",
description="Ingest pipeline for ELSER",
processors=[
{
"script": {
"description": "Concatenate some selected fields value into `concatenated_text` field",
"lang": "painless",
"source": """
ctx['concatenated_text'] = ctx['Name'] + ' ' + ctx['Team'] + ' ' + ctx['Games'] + ' ' + ctx['City'] + ' ' + ctx['Event'];
"""
}
},
{
"inference": {
"model_id": ".elser_model_2",
"ignore_missing": True,
"input_output": [
{
"input_field": "concatenated_text",
"output_field": "concatenated_text_embedding"
}
]
}
}
]
)
准备索引
这是使用自然语言表达查询过去奥运会比赛数据之前的最后一个阶段。我们将更新之前创建的索引的映射,添加一个 sparse vector 类型字段。
更新映射:添加稀疏向量字段
我们将通过添加一个用于保存串联数据(concatenated data)的字段和一个用于保存推理处理器使用 ELSER 模型计算出的推断信息的稀疏向量字段来更新索引映射。
index="elser-olympic-games"
mappings_properties={
"concatenated_text": {
"type": "text"
},
"concatenated_text_embedding": {
"type": "sparse_vector"
}
}
client.indices.put_mapping(
index=index,
properties=mappings_properties
)
填充稀疏向量字段
我们将通过运行 update by query 来调用之前创建的摄取管道,以便填充每个文档中的稀疏向量字段。
client.update_by_query(
index="elser-olympic-games",
pipeline="elser-ingest-pipeline",
wait_for_completion=False
)
该请求将需要一些时间,具体取决于文档数量以及用于部署 ELSER 的分配数量和每个分配的线程数。完成此步骤后,我们现在可以开始使用语义搜索探索过去的奥运会数据集。
让我们使用语义搜索探索奥运会数据集
现在我们将使用 text expansion 查询,使用自然语言表达来检索有关过去奥运会比赛的信息。在进行演示之前,让我们创建一个函数来检索和格式化搜索结果。
def semantic_search(search_text):
response = client.search(
index="elser-olympic-games",
size=3,
query={
"bool": {
"must": [
{
"text_expansion": {
"concatenated_text_embedding": {
"model_id": ".elser_model_2",
"model_text": search_text
}
}
},
{
"exists": {
"field": "Medal"
}
}
]
}
},
source_excludes="*_embedding, concatenated_text"
)
for hit in response["hits"]["hits"]:
doc_id = hit["_id"]
score = hit["_score"]
year = hit["_source"]["Year"]
event = hit["_source"]["Event"]
games = hit["_source"]["Games"]
sport = hit["_source"]["Sport"]
city = hit["_source"]["City"]
team = hit["_source"]["Team"]
name = hit["_source"]["Name"]
medal = hit["_source"]["Medal"]
print(f"Score: {score}\nDocument ID: {doc_id}\nYear: {year}\nEvent: {event}\nName: {name}\nCity: {city}\nTeam: {team}\nMedal: {medal}\n")
上述函数将接收有关往届奥运会比赛获胜者的问题,并使用 Elastic 的 text expansion 查询执行语义搜索。检索到的结果将被格式化并打印出来。请注意,我们强制查询中存在奖牌,因为我们只对获胜者感兴趣。我们还将结果的大小限制为 3,因为我们预计会有三名获胜者(金牌、银牌、铜牌)。同样,根据你的用例,你可能不一定会做同样的事情。
🏌️♂️ “Who won the Golf competition in 1900?”
请求:
semantic_search("Who won the Golf competition in 1900?")
响应:
Score: 18.184263
Document ID: 206566
Year: 1900
Event: Golf Men's Individual
Name: Walter Mathers Rutherford
City: Paris
Team: Great Britain
Medal: Silver
Score: 17.443663
Document ID: 209892
Year: 1900
Event: Golf Men's Individual
Name: Charles Edward Sands
City: Paris
Team: United States
Medal: Gold
Score: 16.939331
Document ID: 192747
Year: 1900
Event: Golf Women's Individual
Name: Myra Abigail "Abbie" Pratt (Pankhurst-, Wright-, -Karageorgevich)
City: Paris
Team: United States
Medal: Bronze
🏹 “Women archery winners of 1908”
请求:
semantic_search("Women archery winners of 1908")
响应:
Score: 21.876282
Document ID: 96010
Year: 1908
Event: Archery Women's Double National Round
Name: Beatrice Geraldine Hill-Lowe (Ruxton-, -Thompson)
City: London
Team: Great Britain
Medal: Bronze
Score: 21.0998
Document ID: 170250
Year: 1908
Event: Archery Women's Double National Round
Name: Sybil Fenton Newall
City: London
Team: Great Britain
Medal: Gold
Score: 21.079535
Document ID: 56686
Year: 1908
Event: Archery Women's Double National Round
Name: Charlotte "Lottie" Dod
City: London
Team: Great Britain
Medal: Silver
🚴♂️ “Who won the individual cycling competition in 1972?”
请求:
semantic_search("Who won the cycling competition in 1972?")
响应:
Score: 20.554308
Document ID: 215559
Year: 1972
Event: Cycling Men's Road Race, Individual
Name: Kevin "Clyde" Sefton
City: Munich
Team: Australia
Medal: Silver
Score: 20.267525
Document ID: 128598
Year: 1972
Event: Cycling Men's Road Race, Individual
Name: Hendrikus Andreas "Hennie" Kuiper
City: Munich
Team: Netherlands
Medal: Gold
Score: 19.108923
Document ID: 19225
Year: 1972
Event: Cycling Men's Team Pursuit, 4,000 metres
Name: Michael John "Mick" Bennett
City: Munich
Team: Great Britain
Medal: Bronze
结论
本博客展示了如何使用 serverlss 的 Python 编程语言,通过 Elastic Learned Sparse EncodeR (ELSER) NLP 模型执行语义搜索。运行本教程后,你需要确保关闭 serverless,以避免任何额外费用。要进一步了解,请随时查看我们的 Elasticsearch 相关性引擎 (ESRE) 工程师课程,你可以在其中学习如何利用 Elasticsearch 相关性引擎 (ESRE) 和大型语言模型 (LLMs) 构建高级 RAG(检索增强生成)应用程序,将 Elasticsearch 的存储、处理和搜索功能与 LLM 的生成能力相结合。
本文中描述的任何特性或功能的发布和时间均由 Elastic 自行决定。任何当前不可用的特性或功能可能无法按时交付或根本无法交付。
准备好自己尝试一下了吗?开始免费试用。
想要获得 Elastic 认证?了解下一期 Elasticsearch 工程师培训何时开始!
更多关于奥运的数据分析,请阅读文章 :
-
使用 Elastic Stack 来分析奥运数据(一)(二)(三)
原文:Serverless semantic search with ELSER in Python — Search Labs