Elasticsearch:让你的 Elasticsearch 索引与 Python 和 Google Cloud Platform 功能保持同步

news2025/1/23 17:32:13

作者:来自 Elastic Garson

Elasticsearch 内的索引 (index) 是你可以将数据存储在文档中的位置。 在使用索引时,如果你使用的是动态数据集,数据可能会很快变旧。 为了避免此问题,你可以创建一个 Python 脚本来更新索引,并使用 Google Cloud Platform (GCP) 的 Cloud Functions 和 Cloud Scheduler 进行部署,以便自动保持索引最新。

为了使索引保持最新,你可以首先设置一个 Jupyter Notebook 在本地进行测试,并创建一个脚本框架,该脚本框架将在出现新信息时更新你的索引。 你可以调整脚本以使其更具可重用性并将其作为云函数(Cloud function)运行。 使用 Cloud Scheduler,你可以将 Cloud Function 中的代码设置为使用 cron 类型格式按计划运行。

先决条件

  • 本示例使用 Elasticsearch 版本 8.12; 如果你是新手,请查看我们的 Elasticsearch 快速入门。
  • 如果你的计算机上尚未安装 Python,请下载最新版本。 此示例使用 Python 3.12.1。
  • NASA API 的 API 密钥。
  • 你将使用 Requests 包连接到 NASA API,使用 Pandas 操作数据,使用 Elasticsearch Python 客户端将数据加载到索引中并使其保持最新,并使用 Jupyter Notebooks 在测试时以交互方式处理数据。 你可以运行以下行来安装这些必需的软件包:
pip3 install requests pandas elasticsearch notebook

加载和更新你的数据集

在 GCP 内运行更新脚本之前,你需要上传数据并测试用于保持脚本更新的流程。 你将首先从 API 连接到数据,将其保存为 Pandas DataFrame,连接到 Elasticsearch,将 DataFrame 上传到索引中,检查索引上次更新的时间,并在有新数据可用时更新索引。 你可以在此搜索实验室笔记本中找到本节的完整代码。

加载你的数据

让我们开始使用 Jupyter Notebook 进行本地测试,以交互方式处理你的数据。 为此,你可以在终端中运行以下命令。

jupyter notebook

在右上角,你可以选择 “New” 来创建新的 Jupyter Notebook。

首先,你需要导入将要使用的包。 你将导入之前安装的所有软件包,以及用于处理 API 密钥等机密的 getpass 和用于处理日期对象的 datetime。

import requests
from getpass import getpass
import pandas as pd
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch, helpers

你将使用的数据集是近地天体 Web 服务 (NeoWs),这是一种提供近地小行星信息的 RESTful Web 服务。 该数据集可让你根据小行星最接近地球的日期搜索小行星、查找特定小行星并浏览整个数据集。

通过以下函数,你可以连接到 NASA 的 NeoWs API,获取过去一周的数据,并将响应转换为 JSON 对象。

def connect_to_nasa():
    url = "https://api.nasa.gov/neo/rest/v1/feed"
    nasa_api_key = getpass("NASA API Key: ")
    today = datetime.now()
    params = {
        "api_key": nasa_api_key,
        "start_date": today - timedelta(days=7),
        "end_date": datetime.now(),
    }
    return requests.get(url, params).json()

现在,你可以将 API 调用的结果保存到名为 “response” 的变量中。

要将 JSON 对象转换为 pandas DataFrame,你必须将嵌套对象规范化为一个 DataFrame,并删除包含嵌套 JSON 的列。

def create_df(response):
    all_objects = []
    for date, objects in response["near_earth_objects"].items():
        for obj in objects:
            obj["close_approach_date"] = date
            all_objects.append(obj)
    df = pd.json_normalize(all_objects)
    return df.drop("close_approach_data", axis=1)

要调用此函数并查看数据集的前五行,你可以运行以下命令:

df = create_df(response)
df.head()

连接到 Elasticsearch

你可以通过提供 Elastic Cloud ID 和 API 密钥进行身份验证,从 Python 客户端访问 Elasticsearch。

def connect_to_elastic():
    elastic_cloud_id = getpass("Elastic Cloud ID: ")
    elastic_api_key = getpass("Elastic API Key: ")
    return Elasticsearch(cloud_id=elastic_cloud_id, api_key=elastic_api_key)

现在,你可以将连接函数的结果保存到名为 es 的变量中。

es = connect_to_elastic()

Elasticsearch 中的索引是数据的主要容器。 你可以将索引命名为 asteroid_data_set。

index_name = "asteroid_data_set"
es.indices.create(index=index_name)

你返回的结果将如下所示:

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'asteroids_data'})

现在,你可以创建一个辅助函数,该函数允许你将 DataFrame 转换为正确的格式以上传到索引中。

def doc_generator(df, index_name):
    for index, document in df.iterrows():
        yield {
            "_index": index_name,
            "_id": f"{document['id']}",
            "_source": document.to_dict(),
        }

接下来,你可以将 DataFrame 的内容批量上传到 Elastic,调用您刚刚创建的辅助函数。

helpers.bulk(es, doc_generator(df, index_name))

你应该得到类似于以下内容的结果,它告诉你已上传的行数:

(146, [])

你最后一次更新数据是什么时候?

将数据上传到 Elasticsearch 后,你可以检查上次更新索引的时间并设置日期格式,以便它可以与 NASA API 配合使用。

def updated_last(es, index_name):
    query = {
        "size": 0,
        "aggs": {"last_date": {"max": {"field": "close_approach_date"}}},
    }
    response = es.search(index=index_name, body=query)
    last_updated_date_string = response["aggregations"]["last_date"]["value_as_string"]
    datetime_obj = datetime.strptime(last_updated_date_string, "%Y-%m-%dT%H:%M:%S.%fZ")
    return datetime_obj.strftime("%Y-%m-%d")

你可以将索引上次更新的日期保存到变量中并打印出该日期。

last_update_date = updated_last(es, index_name)
print(last_update_date)

更新你的数据

现在,你可以创建一个函数来检查自上次更新索引和当前日期以来是否有任何新数据。 如果对象有效并且数据不为空,它将更新索引并让你知道是否没有新数据要更新,或者 DataFrame 是否返回 None 类型,表明可能存在问题。

def update_new_data(df, es, last_update_date, index_name):
    if isinstance(last_update_date, str):
        last_update_date = datetime.strptime(last_update_date, "%Y-%m-%d")

    last_update_date = pd.Timestamp(last_update_date).normalize()

    if not df.empty and "close_approach_date" in df.columns:
        df["close_approach_date"] = pd.to_datetime(df["close_approach_date"])

    today = pd.Timestamp(datetime.now().date()).normalize()

    if df is not None and not df.empty:
        update_range = df.loc[
            (df["close_approach_date"] > last_update_date)
            & (df["close_approach_date"] < today)
        ]
        if not update_range.empty:
            helpers.bulk(es, doc_generator(update_range, index_name))
        else:
            print("No new data to update.")
    else:
        print("The DataFrame is None.")

如果 DataFrame 是有效对象,它将调用你编写的函数并更新索引(如果适用)。 它还会打印出索引上次更新的日期,以帮助你在需要时进行调试。 如果没有,它会告诉你可能有问题。

try:
    if df is None:
        raise ValueError("DataFrame is None. There may be a problem.")
    update_new_data(df, es, last_update_date, index_name)
    print(updated_last(es, index_name))
except Exception as e:
    print(f"An error occurred: {e}")

保持索引最新

现在你已经创建了用于本地测试的框架,你可以设置一个环境,你可以每天运行脚本来检查是否有任何新数据可用并相应地更新索引。

创建云函数

你现在已准备好部署云功能。 为此,你需要选择环境作为第二代函数,命名你的函数,然后选择云区域。 你还可以将其绑定到 Cloud Pub/Sub 触发器,并选择创建新主题(如果你尚未创建)。 你可以在 GitHub 上查看本节的完整代码。

创建 Pub/Sub 主题

创建新主题时,你可以命名主题 ID 并选择使用 Google 管理的加密密钥进行加密。

设置 Cloud Function 的环境变量

在 “Runtime environment variables” 下方,你可以添加 NASA_API_KEY、ELASTIC_CLOUD_ID 和 ELASTIC_API_KEY 的环境变量。 你需要将它们保存为原始值,并且不带单引号。 因此,如果你之前在终端中输入了 “xxxlsdgzxxxxx” 值,你会希望它是 xxxlsdgzxxxxx。

调整你的代码并将其添加到你的云函数

输入环境变量后,你可以按 “下一步(next)” 按钮,这将带你进入代码编辑器。 你需要选择 Python 3.12.1 的运行时或匹配你正在使用的 Python 版本。 之后,将入口点更新为 update_index。 入口点的作用与 Python 中的 main 函数类似。

你将希望使用 os 来执行更自动化的过程,而不是使用 getpass 来获得环境变量(账号信息)。 示例如下所示:

elastic_cloud_id = os.getenv("ELASTIC_CLOUD_ID")
elastic_api_key = os.getenv("ELASTIC_API_KEY")

你需要调整脚本的顺序,以使函数首先连接到 Elasticsearch。 之后,你将想知道索引上次更新的时间,连接到你正在使用的 NASA API,将其保存到 DataFrame,并加载可能可用的任何新数据。

你可能会注意到底部有一个名为 update_index 的新函数,它将你的代码联系在一起。 在此函数中,你定义索引的名称,连接到 Elasticsearch,计算出索引的最后更新日期,连接到 NASA API,将结果保存到数据框中,并在需要时更新索引。 要指示入口点函数是云事件,你可以使用装饰器 @functions_framework.cloud_event 来表示它。

@functions_framework.cloud_event
def update_index(cloud_event):
    index_name = "asteroid_data_set"
    es = connect_to_elastic()
    last_update_date = updated_last(es, index_name)
    print(last_update_date)
    response = connect_to_nasa(last_update_date)
    df = create_df(response)
    if df is not None:
      update_new_data(df, es, last_update_date, index_name)
      print(updated_last(es, index_name)) 
    else:
      print("No new data was retrieved.")

这是完整更新的代码示例:

import functions_framework
import requests
import os
import pandas as pd
from datetime import datetime
from elasticsearch import Elasticsearch, helpers


def connect_to_elastic():
    elastic_cloud_id = os.getenv("ELASTIC_CLOUD_ID")
    elastic_api_key = os.getenv("ELASTIC_API_KEY")
    return Elasticsearch(cloud_id=elastic_cloud_id, api_key=elastic_api_key)


def connect_to_nasa(last_update_date):
    url = "https://api.nasa.gov/neo/rest/v1/feed"
    nasa_api_key = os.getenv("NASA_API_KEY")
    params = {
        "api_key": nasa_api_key,
        "start_date": last_update_date,
        "end_date": datetime.now(),
    }
    return requests.get(url, params).json()


def create_df(response):
    all_objects = []
    for date, objects in response["near_earth_objects"].items():
        for obj in objects:
            obj["close_approach_date"] = date
            all_objects.append(obj)
    df = pd.json_normalize(all_objects)
    return df.drop("close_approach_data", axis=1)


def doc_generator(df, index_name):
    for index, document in df.iterrows():
        yield {
            "_index": index_name,
            "_id": f"{document['close_approach_date']}",
            "_source": document.to_dict(),
        }


def updated_last(es, index_name):
    query = {
        "size": 0,
        "aggs": {"last_date": {"max": {"field": "close_approach_date"}}},
    }
    response = es.search(index=index_name, body=query)
    last_updated_date_string = response["aggregations"]["last_date"]["value_as_string"]
    datetime_obj = datetime.strptime(last_updated_date_string, "%Y-%m-%dT%H:%M:%S.%fZ")
    return datetime_obj.strftime("%Y-%m-%d")


def update_new_data(df, es, last_update_date, index_name):
    if isinstance(last_update_date, str):
        last_update_date = datetime.strptime(last_update_date, "%Y-%m-%d")

    last_update_date = pd.Timestamp(last_update_date).normalize()

    if not df.empty and "close_approach_date" in df.columns:
        df["close_approach_date"] = pd.to_datetime(df["close_approach_date"])

    today = pd.Timestamp(datetime.now().date()).normalize()

    if df is not None and not df.empty:
        update_range = df.loc[
            (df["close_approach_date"] > last_update_date)
            & (df["close_approach_date"] < today)
        ]
        print(update_range)
        if not update_range.empty:
            helpers.bulk(es, doc_generator(update_range, index_name))
        else:
            print("No new data to update.")
    else:
        print("The DataFrame is empty or None.")


# Triggered from a message on a Cloud Pub/Sub topic.
@functions_framework.cloud_event
def hello_pubsub(cloud_event):
    index_name = "asteroid_data_set"
    es = connect_to_elastic()
    last_update_date = updated_last(es, index_name)
    print(last_update_date)
    response = connect_to_nasa(last_update_date)
    df = create_df(response)
    try:
        if df is None:
            raise ValueError("DataFrame is None. There may be a problem.")
        update_new_data(df, es, last_update_date, index_name)
        print(updated_last(es, index_name))
    except Exception as e:
        print(f"An error occurred: {e}")

添加 requirements.txt 文件

你还需要定义一个requirements.txt 文件,其中包含运行代码所需的所有指定包。

functions-framework==3.*
requests==2.31.0
elasticsearch==8.12.0
pandas==2.1.4

调度你的云函数

在 Cloud Scheduler 中,你可以将函数设置为使用 unix cron 格式定期运行。 我将代码设置为每天早上 8 点在我的时区运行。

你还需要配置执行以连接到你之前创建的 Pub/Sub 主题。 我目前将消息正文设置为 “hello”。

现在你已经设置了 Pub/Sub 主题和 Cloud Function 并将该 Cloud Function 设置为按计划运行,只要出现新数据,你的索引就会自动更新。

结论

使用 Python、Google Cloud Platform Functions 和 Google Cloud Scheduler,你应该能够确保定期更新索引。 你可以在此处找到完整的代码以及用于本地测试的搜索实验室笔记本。 我们还与 Google Cloud 一起举办了一场点播网络研讨会,如果你想构建搜索应用程序,这可能是一个不错的下一步。 如果你基于此博客构建了任何内容,或者如果你对我们的讨论论坛和社区 Slack 频道有疑问,请告诉我们。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1538562.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot项目配置文件不生效问题解决

目录 一、问题描述 二、问题解决过程 三、总结 四、拓展 一、问题描述 近期在写一个RabbitMQ基于springboot的使用damon的时候&#xff0c;在启动消费者服务的时候一直发现连接不上RabbitMQ&#xff0c;很是纳闷&#xff0c;配置文件大概如下&#xff1a; server:port:…

Linux相关命令(1)

1、找出文件夹下包含 “aaa” 同时不包含 “bbb”的文件&#xff0c;然后把他们重新生成一下。要求只能用一行命令。 find ./ -type f -name "*aaa*" ! -name "*bbb*" -exec touch {} \;文件系统操作命令 df&#xff1a;列出文件系统的整体磁盘使用情况 …

欧科云链OKLink:坎昆升级后,Layer2项目是否更具竞争力?

在坎昆升级激活之际&#xff0c;OKLink 上线以太坊坎昆升级 Dencun 专题页 &#x1f449; 从专业链上数据分析角度&#xff0c;带来一场充实且即时的 Layer2 数据盛宴。 在近日由 137Labs 发起&#xff0c;Cointime 主持的 Layer2 生态专场讨论中&#xff0c;OKLink 产品…

【办公类-16-07-08】“2023下学期 大班户外游戏2(做成打印用的的贴墙版样式--A4横版撑满)”(python 排班表系列)

背景需求&#xff1a; 运用代码做出了中班每个班级用的户外游戏&#xff08;新版&#xff09;的表格&#xff08;包含有场地的贴墙版和无场地的贴周计划版&#xff09; 【办公类-16-07-07】“2023下学期 大班户外游戏2&#xff08;有场地和无场地版&#xff0c;每天不同场地&…

python爬虫之xpath+多进程爬取百度贴吧实战

文章目录 抓取百度贴吧的某一个帖子的评论内容前言先查看贴吧的robots.txt页面结构分析评论者头像&#xff0c;用户抓取评论内容的抓取评论下回复内容的抓取 源码实现贴吧抓取过程源码实现多进程的实现 抓取百度贴吧的某一个帖子的评论内容 前言 本项目实战是用来学习用&#…

uniapp自定义导航栏左中右内容和图标,以及点击事件

uniapp自定义导航栏左中右内容和图标&#xff0c;以及点击事件 效果&#xff1a; 页面&#xff1a; <view class"navigation-bar"><view class"navigation-bar-left" click"navigateBack"><u-icon name"arrow-left"…

曲面斑马纹分析

曲面斑马纹分析是一种在曲面设计和质量检测中广泛使用的技术&#xff0c;其基本原理是利用明暗相间的光线照射到物体表面经反射产生的纹路来评估曲面的连续性和光顺性。这些斑马纹不仅美观&#xff0c;更重要的是它们能直观地展示曲面上的几何特性&#xff0c;帮助设计师和工程…

C++入门笔记开源【研究生3年+7W字】

博主研究生3年时间积累了一个C的基础知识文档&#xff0c;共计7W字。几乎把常用的各种语法和接口都包含进去了。一个文档&#xff0c;markdown格式的&#xff0c;可以当做工具书来使用。由于本文档内容较多&#xff0c;直接复制到csdn会各种卡&#xff0c;而且图片链接不对&…

【循环神经网络rnn】一篇文章讲透

目录 引言 二、RNN的基本原理 代码事例 三、RNN的优化方法 1 长短期记忆网络&#xff08;LSTM&#xff09; 2 门控循环单元&#xff08;GRU&#xff09; 四、更多优化方法 1 选择合适的RNN结构 2 使用并行化技术 3 优化超参数 4 使用梯度裁剪 5 使用混合精度训练 …

C++剑指offer与高频面试题源码解答与分析

这是博主在当初秋招刷题时候记录的剑指offer第二版以及一些高频题的C源码和解法分析&#xff0c;可以说把这上面的题练好了面试不虚&#xff0c;最后也顺利帮助我拿下baidu ali meituan等多家大厂offer。整篇文章写了大概5W个字&#xff0c;也是积累了很长一段时间的作品&#…

数据分析案例-国际象棋顶级棋手数据可视化分析(文末送书)

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

C++的内存模型

大家好&#xff1a; 衷心希望各位点赞。 您的问题请留在评论区&#xff0c;我会及时回答。 内存分区模型 C程序在执行时&#xff0c;将内存大方向划分为4个区域。 代码区&#xff1a;存放函数体的二进制代码&#xff0c;由操作系统进行管理。 全局区&#xff1a;存放全局变…

是时候来唠一唠synchronized关键字了,Java多线程的必问考点!

写在开头 在之前的博文中&#xff0c;我们介绍了volatile关键字&#xff0c;Java中的锁以及锁的分类&#xff0c;今天我们花5分钟时间&#xff0c;一起学习一下另一个关键字&#xff1a;synchronized。 synchronized是什么&#xff1f; 首先synchronized是Java中的一个关键字…

机器学习-06-无监督算法-01-划分聚类Kmeans算法

总结 本系列是机器学习课程的系列课程&#xff0c;主要介绍机器学习中无监督算法&#xff0c;包括划分聚类等。 参考 数据分析实战 | K-means算法——蛋白质消费特征分析 欧洲48国英文名称的来龙去脉及其国旗动画 Kmeans在线动态演示 本门课程的目标 完成一个特定行业的…

基于modbus TCP实现EPICS与西门子S7 1200系列1215C PLC的通信

PLC介绍 西门子系列PLC在国内的市场占比第一&#xff0c;1200系列中小型PLC&#xff0c;因其众多的产品序列、强大的通讯功能和丰富扩展模块&#xff0c;被使用在工业生产、自动化生产线、智能制造、机器人等各行各业。根据CPU的供电电源的型号和数字量输出的类型&#xff0c;…

科技革新背后:码垛机器人在不同领域的实践应用

随着科技的进步&#xff0c;机器人技术已经渗透到各个行业之中&#xff0c;成为提高生产效率、减少人工成本的重要工具。码垛机器人作为自动化技术的杰出代表&#xff0c;其在各个行业中的应用场景日益广泛&#xff0c;从食品饮料到化工产品&#xff0c;再到物流仓储&#xff0…

矩阵计算-线性系统和 LU 分解

一、三角系统 …… 二、高斯消元法 …… 三、LU分解--直接三角分解法 求解线性方程Axb&#xff1a; 参考视频&#xff1a;【数值分析】矩阵LU三角分解| 速成讲解 考试宝典_哔哩哔哩_bilibili 令ALU&#xff0c;其中L是单位下三角矩阵&#xff08;对角线上元素都是1&#xff…

探秘国内IP代理购买:全面解析与实用建议

随着网络空间的不断发展和扩大&#xff0c;越来越多的用户需要在互联网上获取访问其他国家或地区网站的需求。而为了实现这一目的&#xff0c;使用IP代理服务成为了一种常见的方式。在国内&#xff0c;选择合适的IP代理服务商并购买适合自己需求的IP代理已成为许多人关心的问题…

JavaScript 权威指南第七版(GPT 重译)(二)

第四章&#xff1a;表达式和运算符 本章记录了 JavaScript 表达式以及构建许多这些表达式的运算符。表达式 是 JavaScript 的短语&#xff0c;可以 评估 以产生一个值。在程序中直接嵌入的常量是一种非常简单的表达式。变量名也是一个简单表达式&#xff0c;它评估为分配给该变…

mongodb文档数据建模

基础建模 内嵌方法和数组方完成关系表述 内嵌一对一关系建模 数组内嵌一对N 关系建模 数组内嵌对象多对多关系建模 文档模型设计之二&#xff1a;工况细化 join 查询 不支持外键 设计模式集锦 版本迭代加schema_version 字段 频繁写入改为时间区间写入 聚合变预聚合方式 采用…