Elasticsearch:将数据从 Elasticsearch 和 Kibana 导出到 Pandas Dataframe

news2025/1/13 3:00:40

在这篇文章中,我们将看到如何从 Elasticsearch 索引和 Kibana 的 CSV 报告中导出数据 - post-url 到 pandas 数据帧。 数据的可视化可以在 Kibana 中完成,但如果你想对数据进行更精细的分析并创建更动态的可视化,将数据导出到 pandas dataframe 将是一个不错的选择。

在如下的演示中,我将使用 Elastic Stack 8.5.3 来进行展示。

安装

为了说明问题的方便,我们可以选择只有基本安全的 Elastic Stack 安装。我们可以参考之前的文章 “Elastic Stack 8.0 安装 - 保护你的 Elastic Stack 现在比以往任何时候都简单” 中的 “如何配置 Elasticsearch 只带有基本安全” 章节。针对我们的安装,我们配置 Elasticsearch 的超级用户 elastic 的密码为 password。你也可以参考另外一篇文章 “Elasticsearch:如何在 Docker 上运行 Elasticsearch 8.x 进行本地开发” 进行安装。

准备数据

我们选用 Kibana 中自带的数据来进行展示。我们打开 Kibana:

这样就有一个叫做 kibana_sample_data_logs 的索引在 Elasticsearch 中被创造。我们在 Discover 中打开:

如上所示,我们可以看到数据的时序直方图。我们选择合适的时间区域,然后添加一个 filter:

如上所示,我们仅做了一个很简单的表格。第一行是 timestamp,而第二行是 geo.dest。我们在搜索中创建了一个 geo.src 为 US 的过滤器。我们保存当前的搜索:

如上所示,我们有两种方法可以得到一个 CSV 格式的输出。一中是使用 Generate CSV 按钮。点击它后,我们可以在如下的地址下载相应的 CSV 文件。 

我们可以看到如上所示 CSV 输出。另外一种方式是使用 POST URL 来通过软件的方式来获得这个数据。我们在上面的图中选择 Copy POST URL。我们可以得到如下所示的一个链接:

http://localhost:5601/api/reporting/generate/csv_searchsource?jobParams=%28browserTimezone%3AAsia%2FShanghai%2Ccolumns%3A%21%28timestamp%2Cgeo.dest%29%2CobjectType%3Asearch%2CsearchSource%3A%28fields%3A%21%28%28field%3Atimestamp%2Cinclude_unmapped%3Atrue%29%2C%28field%3Ageo.dest%2Cinclude_unmapped%3Atrue%29%29%2Cfilter%3A%21%28%28meta%3A%28field%3Atimestamp%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cparams%3A%28%29%29%2Cquery%3A%28range%3A%28timestamp%3A%28format%3Astrict_date_optional_time%2Cgte%3Anow-7d%2Fd%2Clte%3Anow%29%29%29%29%29%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cparent%3A%28filter%3A%21%28%28%27%24state%27%3A%28store%3AappState%29%2Cmeta%3A%28alias%3A%21n%2Cdisabled%3A%21f%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Ckey%3Ageo.src%2Cnegate%3A%21f%2Cparams%3A%28query%3AUS%29%2Ctype%3Aphrase%29%2Cquery%3A%28match_phrase%3A%28geo.src%3AUS%29%29%29%29%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cquery%3A%28language%3Akuery%2Cquery%3A%27%27%29%29%2Csort%3A%21%28%28timestamp%3Adesc%29%29%2CtrackTotalHits%3A%21t%29%2Ctitle%3Asrc-US%2Cversion%3A%278.5.3%27%29

方法一:从 Kibana 中获取数据

我们把上面的链接复制并粘贴到如下的代码中:

kibana-to-pandas.py

import pandas as pd
import requests
from requests.auth import HTTPBasicAuth
from io import StringIO
import json
import time

kibana_ip = "0.0.0.0"

headers = {"kbn-xsrf": "reporting"}

#  post_url = 'http://' + kibana_ip + \
    # '/api/reporting/generate/csv?jobParams=(conflictedTypesFields:!(),fields:!(xxxxx))'
    
post_url = "http://localhost:5601/api/reporting/generate/csv_searchsource?jobParams=%28browserTimezone%3AAsia%2FShanghai%2Ccolumns%3A%21%28timestamp%2Cgeo.dest%29%2CobjectType%3Asearch%2CsearchSource%3A%28fields%3A%21%28%28field%3Atimestamp%2Cinclude_unmapped%3Atrue%29%2C%28field%3Ageo.dest%2Cinclude_unmapped%3Atrue%29%29%2Cfilter%3A%21%28%28meta%3A%28field%3Atimestamp%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cparams%3A%28%29%29%2Cquery%3A%28range%3A%28timestamp%3A%28format%3Astrict_date_optional_time%2Cgte%3Anow-7d%2Fd%2Clte%3Anow%29%29%29%29%29%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cparent%3A%28filter%3A%21%28%28%27%24state%27%3A%28store%3AappState%29%2Cmeta%3A%28alias%3A%21n%2Cdisabled%3A%21f%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Ckey%3Ageo.src%2Cnegate%3A%21f%2Cparams%3A%28query%3AUS%29%2Ctype%3Aphrase%29%2Cquery%3A%28match_phrase%3A%28geo.src%3AUS%29%29%29%29%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cquery%3A%28language%3Akuery%2Cquery%3A%27%27%29%29%2Csort%3A%21%28%28timestamp%3Adesc%29%29%2CtrackTotalHits%3A%21t%29%2Ctitle%3Asrc-US%2Cversion%3A%278.5.3%27%29"
    
# print(post_url)

post_url_data = requests.post(post_url, \
    auth = HTTPBasicAuth('elastic', 'password'), \
        headers = headers)

get_api_json = json.loads(post_url_data.text)

print(get_api_json)

time.sleep(10)

print(get_api_json['path'])

api_url = "http://" + "localhost:5601" + get_api_json['path']
print(api_url)

csv_url = requests.get(api_url, \
    auth = HTTPBasicAuth('elastic', 'password'), \
        headers = headers)

print(csv_url)

traffic_data = pd.read_csv(StringIO(csv_url.text))

print(traffic_data.head())
print(traffic_data)

在上面的代码中,特别需要注意的是:

time.sleep(10)

我们在发送完请求后,需要等待一定的时间让 Kibana 做相应的处理,并得到相应的数据。在上面,我使用了超级用户 elastic 的账号信息。运行上面的代码:

$ pwd
/Users/liuxg/python/pandas
$ ls
kibana-to-pandas.py
$ python kibana-to-pandas.py 
{'path': '/api/reporting/jobs/download/ldfcx8kq1k9b9c2bfe5reij0', 'job': {'id': 'ldfcx8kq1k9b9c2bfe5reij0', 'index': '.reporting-2023-01-22', 'jobtype': 'csv_searchsource', 'created_at': '2023-01-28T02:51:55.178Z', 'created_by': 'elastic', 'meta': {'objectType': 'search'}, 'status': 'pending', 'attempts': 0, 'migration_version': '7.14.0', 'payload': {'browserTimezone': 'Asia/Shanghai', 'columns': ['timestamp', 'geo.dest'], 'objectType': 'search', 'searchSource': {'fields': [{'field': 'timestamp', 'include_unmapped': 'true'}, {'field': 'geo.dest', 'include_unmapped': 'true'}], 'filter': [{'meta': {'field': 'timestamp', 'index': '90943e30-9a47-11e8-b64d-95841ca0b247', 'params': {}}, 'query': {'range': {'timestamp': {'format': 'strict_date_optional_time', 'gte': 'now-7d/d', 'lte': 'now'}}}}], 'index': '90943e30-9a47-11e8-b64d-95841ca0b247', 'parent': {'filter': [{'$state': {'store': 'appState'}, 'meta': {'alias': None, 'disabled': False, 'index': '90943e30-9a47-11e8-b64d-95841ca0b247', 'key': 'geo.src', 'negate': False, 'params': {'query': 'US'}, 'type': 'phrase'}, 'query': {'match_phrase': {'geo.src': 'US'}}}], 'index': '90943e30-9a47-11e8-b64d-95841ca0b247', 'query': {'language': 'kuery', 'query': ''}}, 'sort': [{'timestamp': 'desc'}], 'trackTotalHits': True}, 'title': 'src-US', 'version': '8.5.3'}, 'output': {}}}
/api/reporting/jobs/download/ldfcx8kq1k9b9c2bfe5reij0
http://localhost:5601/api/reporting/jobs/download/ldfcx8kq1k9b9c2bfe5reij0
<Response [200]>
                     timestamp geo.dest
0  Jan 28, 2023 @ 09:15:02.127       CN
1  Jan 28, 2023 @ 09:00:52.596       CN
2  Jan 28, 2023 @ 08:17:33.769       IN
3  Jan 28, 2023 @ 05:15:19.548       RU
4  Jan 28, 2023 @ 04:18:45.660       KE
                        timestamp geo.dest
0     Jan 28, 2023 @ 09:15:02.127       CN
1     Jan 28, 2023 @ 09:00:52.596       CN
2     Jan 28, 2023 @ 08:17:33.769       IN
3     Jan 28, 2023 @ 05:15:19.548       RU
4     Jan 28, 2023 @ 04:18:45.660       KE
...                           ...      ...
1608  Jan 21, 2023 @ 11:30:30.110       TJ
1609  Jan 21, 2023 @ 11:14:28.231       IN
1610  Jan 21, 2023 @ 11:05:31.057       FR
1611  Jan 21, 2023 @ 10:40:26.055       TR
1612  Jan 21, 2023 @ 10:24:53.405       IN

[1613 rows x 2 columns]

从上面的输出中,我们可以看到 pandas 的 dataframe 输出。

请注意,它只能根据您在 kibana.yml 文件中指定的字节大小检索部分数据。 请增加 xpack.reporting.csv.maxSizeBytes 的值以获得完整数据。

方法二:从 Elasticsearch 中获取数据

下面的代码片段将有助于直接从 Elasticsearch 索引中检索数据,但它不足以检索大量数据,因此你可能需要根据你的情况决定使用 kibana 的 csv 报告还是 Elasticsearch 索引要求。

关于如何连接到 Elasticsearch,请参阅我之前的文章 “Elasticsearch:关于在 Python 中使用 Elasticsearch 你需要知道的一切 - 8.x”。我们创建如下的一个 python 文件:

elasticsearch-to-pandas.py

from elasticsearch import Elasticsearch
import pandas as pd
import numpy as np
import json

# create a client instance of the library

es = Elasticsearch("http://localhost:9200", basic_auth=("elastic", "password"))
resp = es.info()
# print(resp)

total_docs = 50

search_query = {
    "match_all": {}
}

response = es.search(
    _source="false",
    index='kibana_sample_data_logs',
    query=search_query,
    size=total_docs,
    fields=[
        "@timestamp", 
        "clientip",
        "host"
    ]
)

# print(response)

elastic_docs = response["hits"]["hits"]
# print(elastic_docs)

fields = {}
for num, doc in enumerate(elastic_docs):
    fields_data = doc["fields"]
    for key, val in fields_data.items():
        try:
            fields[key] = np.append(fields[key], val)
        except KeyError:
            fields[key] = np.array([val])

# print(fields)
traffic_data = pd.DataFrame(fields)
print(traffic_data.info())

在上面我们做了如下的一个搜索:

GET kibana_sample_data_logs/_search
{
  "_source": false, 
  "query": {
    "match_all": {}
  },
  "fields": [
    "@timestamp", 
    "clientip",
    "host"
  ]
}

上面的查询返回如下的结果:

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 10000,
      "relation": "gte"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "kibana_sample_data_logs",
        "_id": "Gf8y9oUBLUWnAhRe7p8u",
        "_score": 1,
        "fields": {
          "@timestamp": [
            "2023-01-15T00:39:02.912Z"
          ],
          "clientip": [
            "223.87.60.27"
          ],
          "host": [
            "artifacts.elastic.co"
          ]
        }
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "Gv8y9oUBLUWnAhRe7p8u",
        "_score": 1,
        "fields": {
          "@timestamp": [
            "2023-01-15T03:26:21.326Z"
          ],
          "clientip": [
            "130.246.123.197"
          ],
          "host": [
            "www.elastic.co"
          ]
        }
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "G_8y9oUBLUWnAhRe7p8u",
        "_score": 1,
        "fields": {
          "@timestamp": [
            "2023-01-15T03:30:25.131Z"
          ],
          "clientip": [
            "120.49.143.213"
          ],
          "host": [
            "cdn.elastic-elastic-elastic.org"
          ]
        }
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "HP8y9oUBLUWnAhRe7p8u",
        "_score": 1,
        "fields": {
          "@timestamp": [
            "2023-01-15T03:34:43.399Z"
          ],
          "clientip": [
            "99.74.118.237"
          ],
          "host": [
            "artifacts.elastic.co"
          ]
        }
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "Hf8y9oUBLUWnAhRe7p8u",
        "_score": 1,
        "fields": {
          "@timestamp": [
            "2023-01-15T03:37:04.863Z"
          ],
          "clientip": [
            "177.111.217.54"
          ],
          "host": [
            "www.elastic.co"
          ]
        }
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "Hv8y9oUBLUWnAhRe7p8u",
        "_score": 1,
        "fields": {
          "@timestamp": [
            "2023-01-15T03:49:40.669Z"
          ],
          "clientip": [
            "106.225.58.146"
          ],
          "host": [
            "www.elastic.co"
          ]
        }
      }
   ...

运行上面的代码,我们可以得到如下的数据:

$ pwd
/Users/liuxg/python/pandas
$ ls
elasticsearch-to-pandas.py kibana-to-pandas.py
$ python elasticsearch-to-pandas.py 
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50 entries, 0 to 49
Data columns (total 3 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   @timestamp  50 non-null     object
 1   clientip    50 non-null     object
 2   host        50 non-null     object
dtypes: object(3)
memory usage: 1.3+ KB
None

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/181910.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LeetCode 55. 跳跃游戏 45. 跳跃游戏 II 22. 括号生成 53. 最大子数组和

&#x1f308;&#x1f308;&#x1f604;&#x1f604; 55. 跳跃游戏 一、力扣示例 二、解决办法 三、代码实现 45. 跳跃游戏 II 一、力扣示例 二、解决办法 三、代码实现 22. 括号生成 一、力扣示例 二、解决办法 三、代码实现 53. 最大子数组和 一、力扣示例 …

WebAssembly编译之(3)-WASM编译实战之C/C++导出asm.js及wasm库

引言 上一节我们介绍了Ubuntu下的WASM的编译环境快速搭建。这一节我们继续WASM编译相关的介绍——如何导出C/C编写的函数库 WASM 相关文档&#xff1a; WebAssembly编译之(1)-asm.js及WebAssembly原理介绍 WebAssembly编译之(2)-Ubuntu搭建WASM编译环境 单个C文件(*.cpp)的导出…

每日学术速递1.28

CV - 计算机视觉 | ML - 机器学习 | RL - 强化学习 | NLP 自然语言处理 今天带来的arXiv上最新发表的3篇AI论文。 Subjects: cs.AI、cs.Cv 1.Revisiting Temporal Modeling for CLIP-based Image-to-Video Knowledge Transferring 标题&#xff1a;重新审视基于CLIP的图像-视…

计讯物联5G工业级路由器在智慧消防的功能解析

据悉&#xff0c;国务院安全生产委员会印发《“十四五”国家消防工作规划》&#xff08;以下简称《规划》&#xff09;&#xff0c;对“十四五”时期消防改革发展作出全面部署。《规划》提出&#xff0c;坚持防消一体、防救并重&#xff1b;加强改革创新&#xff0c;加快消防“…

11 Day : 编写操作系统中断程序,加快时钟

前言&#xff1a;昨天学习了中断&#xff0c;今天就废话不多说&#xff0c;直接编写程序吧 内容更新&#xff1a;之前有朋友说看不太懂我的代码写的是啥&#xff0c;能不能详细讲讲&#xff0c;所以本期开始我会详细讲解代码&#xff0c;也会同步更新之前的博客&#xff0c;大多…

java基础巩固-宇宙第一AiYWM:为了维持生计,做项目经验之~高速项目大数据及机器学习算法方面的思路总结~整起

原始项目可能主要的功能是接收下位机传送来的很多参数&#xff0c;然后将参数以不同形式表达出来&#xff0c;在此过程中会涉及到文件上传下载、excel表格导出…等&#xff0c;但是呢&#xff0c;这么多数据不玩一下岂不是太浪费。于是&#xff0c;额们决定这样来: 项目中有一个…

Metasploit工具使用(上)

Metasploit工具使用1.Metasploit简介1.1.Metasploit下载1.2.Metasploit框架结构1.2.1.框架路径1.2.2.框架内容介绍1.2.2.1.data目录文件1.2.2.2.modules目录文件1.2.2.3.scripts目录文件1.2.2.4.tools目录文件1.2.2.5.plugins目录文件1.3.Metasploit更新2.MSF中数据库设置2.1.数…

动态与静态函数库的的使用 和 区别 及 优缺点

这里写目录标题初识静态库与动态库静态函数库动态函数库初识静态库与动态库 静态函数库与动态函数库的使用中&#xff0c;有人也把他称为程序的静态链接及动态链接。 静态链接&#xff1a;指程序链接时使用静态库的链接方式&#xff0c;把所有需要的库函数加入&#xff08;拷贝…

Vue3商店后台管理系统设计文稿篇(七)

记录使用vscode构建Vue3商店后台管理系统&#xff0c;这是第七篇&#xff0c;主要记录系统登录页面的创建过程&#xff0c;包含完整vue登录页面代码&#xff1b;Vuex的相关知识以及具体的使用&#xff0c;对state中值得获取&#xff0c;修改&#xff0c;异步修改&#xff0c;分…

Gradle学习笔记之Hook生命周期

简介 Gradle生命周期中的hook&#xff08;钩子&#xff09;函数是由gradle自动回调的&#xff0c;可以用来帮助我们实现一些功能&#xff1a; Gradle在生命周期各个阶段都提供了用于回调的钩子函数: Gradle初始化阶段: 在settings.gradle执行完后&#xff0c;会回调Gradle对…

2022爱分析・智能客服厂商全景报告 | 爱分析报告

报告编委 张扬 爱分析联合创始人&首席分析师 文鸿伟 爱分析高级分析师 王鹏 爱分析分析师 目录 研究范围定义厂商全景地图市场分析与厂商评估入选厂商列表研究范围定义 研究范围 在数字化快速发展的大背景下&#xff0c;随着消费人群及其消费意识的转变&#xff0c;客户对…

亚马逊云科技凭借多年云业务经验,协同合作伙伴快速展开生态化创新

在过去的两周里&#xff0c;ChatGPT的热度居高不下&#xff0c;引发全网讨论。虽然AlphaGo这类AI产品也曾引起热议&#xff0c;但是在应用层面终究还是离用户太远了。而ChatGPT更像是「民用级」的产品&#xff0c;真正意义上让AI技术跨入广泛破圈应用时代。在当下&#xff0c;机…

大数据-Hive

第1章 Hive入门 1.1 什么是Hive 1&#xff09;Hive简介 Hive是由Facebook开源&#xff0c;基于Hadoop的一个数据仓库工具&#xff0c;可以将结构化的数据文件映射为一张表&#xff0c;并提供类SQL查询功能。 2&#xff09;Hive本质 Hive是一个Hadoop客户端&#xff0c;用于…

springboot项目解决@ResponseBody注解返回xml格式数据而不是json格式的问题

目录 1.说明 2.解决 1.说明 一般情况下&#xff0c;RestController中的接口默认响应数据格式都是 json 格式的数据&#xff0c;但有时候使用某些依赖包&#xff0c;会影响ResponseBody的响应数据类型为xml格式&#xff0c; 例&#xff1a; 2.解决 但我们希望响应数据格式是…

使用腾讯云服务器+Nonebot2+go-cqhttp搭建QQ聊天机器人

文章目录一、查看conda版本二、查看系统版本三、配置go-cqhttp1.请切换至同一网络下扫码2.打包Docker镜像四、创建NoneBot环境安装脚手架一、查看conda版本 二、查看系统版本 uname -a arch getconf LONG_BIT三、配置go-cqhttp 下载go-cqhttp 这里有不同版本的cqhttp,并且对…

【数据结构】——如何设计一个链表?(设计链表)

本文主题&#xff1a;通过一道题目&#xff0c;学习链表的基本操作 更多算法&#xff1a;动态规划 ✔️ 边界控制 我的主页&#xff1a;蓝色学者的主页 文章目录一、前言二、题目信息三、解决方案3.0什么是链表&#xff1f;3.1节点的概念虚拟头节点3.2链表创建3.3头插/尾插3…

JUC面试(十三)——锁膨胀

锁膨胀 monitor概念 Monitor是 Java中用以实现线程之间的互斥与协作的主要手段&#xff0c;它可以看成是对象或者 Class的锁。每一个对象都有&#xff0c;也仅有一个 monitor。上面这个图&#xff0c;描述了线程和 Monitor之间关系&#xff0c;以及线程的状态转换图。 进入区…

windows11 永久关闭windows defender的方法

1、按键盘上的windows按键&#xff0c;再点【设置】选项。 2、点击左侧菜单的【隐私和安全性】&#xff0c;再点击列表的【Windows安全中心】选项。 3、点击界面的【病毒和威胁保护】设置项。 4、病毒保护的全部关闭 5、别人的图&#xff08;正常是都开着的&#xff09; 6、终极…

为什么看上去很简单的智慧功能点要价上千万?

人工智能&#xff08;Artificial Intelligence&#xff0c;AI&#xff09;已经不是什么新概念&#xff0c;第三次浪潮于2016年AlphaGo战胜李世石为标志正式开启&#xff0c;至今也已经走过6个年头。 发展至今&#xff0c;AI已经进入老百姓的日常生活&#xff0c;比如随处可见的…

【C语言】从0到1带你学会文件版动态通讯录

&#x1f307;个人主页&#xff1a;平凡的小苏 &#x1f4da;学习格言&#xff1a;别人可以拷贝我的模式&#xff0c;但不能拷贝我不断往前的激情 &#x1f6f8;C语言专栏&#xff1a;https://blog.csdn.net/vhhhbb/category_12174730.html 小苏希望大家能从这篇文章中收获到许…