Elasticsearch:ES|QL 入门 - Python Notebook

news2025/1/12 4:54:48

数据丰富在本笔记本中,你将学习 Elasticsearch 查询语言 (ES|QL) 的基础知识。 你将使用官方 Elasticsearch Python 客户端。

你将学习如何:

  • 运行 ES|QL 查询
  • 使用处理命令
  • 对表格进行排序
  • 查询数据
  • 链式处理命令
  • 计算值
  • 计算统计数据
  • 访问列
  • 创建直方图
  • 丰富数据
  • 处理数据

⚠️ 不要在生产环境中使用 ES|QL。 此功能处于技术预览阶段,可能会在未来版本中更改或删除。 Elastic 将努力解决任何问题,但技术预览版中的功能不受官方 GA 功能的支持 SLA 的约束。ES|QL 将在 8.13 正式发布(以官方发布为准)。

在一下的展示中,我将使用 Elastic Stack 8.12 来进行展示。

安装

安装 Elasticsarch 及 Kibana

如果你还没有安装好自己的 Elasticsearch 及 Kibana,请参考如下的链接来进行安装:

  • 如何在 Linux,MacOS 及 Windows 上进行安装 Elasticsearch
  • Kibana:如何在 Linux,MacOS 及 Windows上安装 Elastic 栈中的 Kibana

在安装的时候,我们选择 Elastic Stack 8.x 来进行安装。特别值得指出的是:ES|QL 只在 Elastic Stack 8.11 及以后得版本中才有。你需要下载 Elastic Stack 8.11 及以后得版本来进行安装。

在首次启动 Elasticsearch 的时候,我们可以看到如下的输出:

我们需要记下 Elasticsearch 超级用户 elastic 的密码。

我们还需要安装 Python 相关的包:

pip3 install elasticsearch
$ pip3 list | grep elasticsearch
elasticsearch               8.12.1

创建环境变量

我们在项目的根目录下创建如下的 .env 文件:

.env

ES_USER="elastic"
ES_PASSWORD="q2rqAIphl-fx9ndQ36CO"
ES_ENDPOINT="localhost"

 你需要根据自己的 Elasticsearch 的配置来修改上面的值。

创建应用

在项目的根目录下打入如下的命令:

jupyter notebook

拷贝 Elasticsearch 证书

我们把 Elasticsearch 的证书拷贝到当前的项目根目录下:

cp ~/elastic/elasticsearch-8.12.0/config/certs/http_ca.crt .

你需要根据自己的安装目录进行相应的修改。

导入包并连接到 Elasticsearch

from dotenv import load_dotenv
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import os

load_dotenv()

ES_USER = os.getenv("ES_USER")
ES_PASSWORD = os.getenv("ES_PASSWORD")
ES_ENDPOINT = os.getenv("ES_ENDPOINT")

url = f"https://{ES_USER}:{ES_PASSWORD}@{ES_ENDPOINT}:9200"

es = Elasticsearch(
    hosts=[url], 
    ca_certs = "./http_ca.crt", 
    verify_certs = True
)

print(es.info())

添加 sample data 到 Elasticsearch 中

在为示例数据集建立索引之前,让我们使用正确的映射创建一个名为 sample_data 的索引。

index_name = "sample_data"

mappings = {
    "mappings": {
        "properties": {"client_ip": {"type": "ip"}, "message": {"type": "keyword"}}
    }
}

# Create the index
if not es.indices.exists(index=index_name):
    es.indices.create(index=index_name, body=mappings)

接下来,我们使用 Elasticsearch Python 客户端的 bulk helpers 来索引数据:

# Documents to be indexed
documents = [
    {
        "@timestamp": "2023-10-23T12:15:03.360Z",
        "client_ip": "172.21.2.162",
        "message": "Connected to 10.1.0.3",
        "event_duration": 3450233,
    },
    {
        "@timestamp": "2023-10-23T12:27:28.948Z",
        "client_ip": "172.21.2.113",
        "message": "Connected to 10.1.0.2",
        "event_duration": 2764889,
    },
    {
        "@timestamp": "2023-10-23T13:33:34.937Z",
        "client_ip": "172.21.0.5",
        "message": "Disconnected",
        "event_duration": 1232382,
    },
    {
        "@timestamp": "2023-10-23T13:51:54.732Z",
        "client_ip": "172.21.3.15",
        "message": "Connection error",
        "event_duration": 725448,
    },
    {
        "@timestamp": "2023-10-23T13:52:55.015Z",
        "client_ip": "172.21.3.15",
        "message": "Connection error",
        "event_duration": 8268153,
    },
    {
        "@timestamp": "2023-10-23T13:53:55.832Z",
        "client_ip": "172.21.3.15",
        "message": "Connection error",
        "event_duration": 5033755,
    },
    {
        "@timestamp": "2023-10-23T13:55:01.543Z",
        "client_ip": "172.21.3.15",
        "message": "Connected to 10.1.0.1",
        "event_duration": 1756467,
    },
]

# Prepare the actions for the bulk API using list comprehension
actions = [{"_index": index_name, "_source": doc} for doc in documents]

# Perform the bulk index operation and capture the response
success, failed = bulk(es, actions)

if failed:
    print(f"Some documents failed to index: {failed}")
else:
    print(f"Successfully indexed {success} documents.")

我们可以在 Kibana 中进行查看:

取消默认的 500 limit 警告

# Suppress specific Elasticsearch warnings about default limit of [500] that pollute responses

import warnings
from elasticsearch import ElasticsearchWarning

warnings.filterwarnings("ignore", category=ElasticsearchWarning)

格式化响应为可以阅读的格式

# Format response to return human-readable tables


def format_response(response_data):
    column_names = [col["name"] for col in response_data["columns"]]
    column_widths = [
        max(
            len(name),
            max(
                (
                    len(str(row[i]) if row[i] is not None else "None")
                    for row in response_data["values"]
                ),
                default=0,
            ),
        )
        for i, name in enumerate(column_names)
    ]
    row_format = " | ".join(["{:<" + str(width) + "}" for width in column_widths])
    print(row_format.format(*column_names))
    print("-" * sum(column_widths) + "-" * (len(column_widths) - 1) * 3)
    for row in response_data["values"]:
        # Convert None values in the row to "None" before formatting
        formatted_row = [(str(cell) if cell is not None else "None") for cell in row]
        print(row_format.format(*formatted_row))

你的第一个 ES|QL 查询

每个 ES|QL 查询都以源命令开头。 源命令会生成一个表,通常包含来自 Elasticsearch 的数据。

FROM source 命令返回一个表,其中包含来自数据流、索引或别名的文档。 结果表中的每一行代表一个文档。 此查询从 sample_data 索引中返回最多 500 个文档:

esql_query = "FROM sample_data"

response = es.esql.query(query=esql_query)
format_response(response)

每列对应一个字段,并且可以通过该字段的名称进行访问。

ℹ️ ES|QL 关键字不区分大小写。 FROM sample_data 与 from sample_data 相同。

处理命令

源命令后面可以跟一个或多个处理命令,用竖线字符分隔:|。 处理命令通过添加、删除或更改行和列来更改输入表。 处理命令可以执行过滤、投影、聚合等。

例如,你可以使用 LIMIT 命令来限制返回的行数,最多为 10,000 行:

esql_query = """
FROM sample_data
| LIMIT 3
"""

response = es.esql.query(query=esql_query)
format_response(response)

对表格进行排序

另一个处理命令是 SORT 命令。 默认情况下,FROM 返回的行没有定义的排序顺序。 使用 SORT 命令对一列或多列上的行进行排序:

esql_query = """
FROM sample_data
| SORT @timestamp DESC
"""

response = es.esql.query(query=esql_query)
format_response(response)

查询数据

使用 WHERE 命令来查询数据。 例如,要查找持续时间超过 5 毫秒的所有事件:

esql_query = """
FROM sample_data
| WHERE event_duration > 5000000
"""

response = es.esql.query(query=esql_query)
format_response(response)

WHERE 支持多个运算符。

例如,你可以使用 LIKE 对消息列运行通配符查询:

esql_query = """
FROM sample_data
| WHERE message LIKE "Connected*"
"""

response = es.esql.query(query=esql_query)
format_response(response)

更多处理命令

还有许多其他处理命令,例如用于保留或删除列的 KEEP 和 DROP、用于使用 Elasticsearch 中索引的数据丰富表的 ENRICH 以及用于处理数据的 DISSECT 和 GROK。 有关概述,请参阅处理命令。

链式处理命令

你可以链接处理命令,并用竖线字符分隔:|。 每个处理命令都作用于前一个命令的输出表。 查询的结果是最终处理命令生成的表。

以下示例首先根据 @timestamp 对表进行排序,然后将结果集限制为 3 行:

esql_query = """
FROM sample_data
| SORT @timestamp DESC
| LIMIT 3
"""

response = es.esql.query(query=esql_query)
format_response(response)

ℹ️ 处理命令的顺序很重要。 首先将结果集限制为 3 行,然后再对这 3 行进行排序,很可能会返回与此示例不同的结果,其中排序在限制之前。

计算值

使用 EVAL 命令将包含计算值的列追加到表中。 例如,以下查询附加一个 duration_ms 列。 该列中的值是通过将 event_duration 除以 1,000,000 计算得出的。 换句话说: event_duration 从纳秒转换为毫秒。

esql_query = """
FROM sample_data
| EVAL duration_ms = event_duration/1000000.0
"""

response = es.esql.query(query=esql_query)
format_response(response)

EVAL 支持多种函数。 例如,要将数字四舍五入为最接近指定位数的数字,请使用 ROUND 函数:

esql_query = """
FROM sample_data
| EVAL duration_ms = ROUND(event_duration/1000000.0, 1)
"""

response = es.esql.query(query=esql_query)
format_response(response)

计算统计数据

你还可以使用 ES|QL 来聚合数据。 使用 STATS ... BY 命令计算统计数据。

例如,计算中位持续时间:

esql_query = """
FROM sample_data
| STATS median_duration = MEDIAN(event_duration)
"""

response = es.esql.query(query=esql_query)
format_response(response)

你可以使用一个命令计算多个统计数据:

esql_query = """
FROM sample_data
| STATS median_duration = MEDIAN(event_duration), max_duration = MAX(event_duration)
"""

response = es.esql.query(query=esql_query)
format_response(response)

使用 BY 按一列或多列对计算的统计数据进行分组。 例如,要计算每个客户端 IP 的中位持续时间:

esql_query = """
FROM sample_data
| STATS median_duration = MEDIAN(event_duration) BY client_ip
"""

response = es.esql.query(query=esql_query)
format_response(response)

访问列

你可以通过名称访问列。 如果名称包含特殊字符,则需要用反引号(`)引起来。

为 EVAL 或 STATS 创建的列分配显式名称是可选的。 如果不提供名称,则新列名称等于函数表达式。 例如:

esql_query = """
FROM sample_data
| EVAL event_duration/1000000.0
"""

response = es.esql.query(query=esql_query)
format_response(response)

在此查询中,EVAL 添加一个名为 event_duration/1000000.0 的新列。 由于其名称包含特殊字符,因此要访问此列,请用反引号引用它:

esql_query = """
FROM sample_data
| EVAL event_duration/1000000.0
| STATS MEDIAN(`event_duration/1000000.0`)
"""
response = es.esql.query(query=esql_query)
format_response(response)

创建直方图

为了跟踪一段时间内的统计数据,ES|QL 允许你使用 AUTO_BUCKET 函数创建直方图。 AUTO_BUCKET 创建人性化的存储桶大小,并为每行返回一个与该行所属的结果存储桶相对应的值。

例如,要为 10 月 23 日的数据创建每小时存储桶:

esql_query = """
FROM sample_data
| KEEP @timestamp
| EVAL bucket = AUTO_BUCKET (@timestamp, 24, "2023-10-23T00:00:00Z", "2023-10-23T23:59:59Z")
"""
response = es.esql.query(query=esql_query)
format_response(response)

将 AUTO_BUCKET 与 STATS ... BY 结合起来创建直方图。 例如,要计算每小时的事件数:

esql_query = """
FROM sample_data
| KEEP @timestamp, event_duration
| EVAL bucket = AUTO_BUCKET (@timestamp, 24, "2023-10-23T00:00:00Z", "2023-10-23T23:59:59Z")
| STATS COUNT(*) BY bucket
"""
response = es.esql.query(query=esql_query)
format_response(response)

或每小时的中位持续时间:

esql_query = """
FROM sample_data
| KEEP @timestamp, event_duration
| EVAL bucket = AUTO_BUCKET (@timestamp, 24, "2023-10-23T00:00:00Z", "2023-10-23T23:59:59Z")
| STATS median_duration = MEDIAN(event_duration) BY bucket
"""
response = es.esql.query(query=esql_query)
format_response(response)

丰富数据

ES|QL 使你能够使用 ENRICH 命令使用 Elasticsearch 中索引的数据来丰富表。

ℹ️ 在使用 ENRICH 之前,你首先需要创建并执行丰富策略。 我们在 ela.st/ql 提供了一个演示环境,其中已经创建并执行了名为 clientip_policy 的丰富策略,如果你只是想看看它是如何工作的。

以下请求创建并执行名为 clientip_policy 的策略。 该策略将 IP 地址链接到环境(“Development”、“QA” 或 “Production”)。

# Define the mapping
mapping = {
    "mappings": {
        "properties": {"client_ip": {"type": "keyword"}, "env": {"type": "keyword"}}
    }
}

# Create the index with the mapping
es.indices.create(index="clientips", body=mapping)

# Prepare bulk data
bulk_data = [
    {"index": {}},
    {"client_ip": "172.21.0.5", "env": "Development"},
    {"index": {}},
    {"client_ip": "172.21.2.113", "env": "QA"},
    {"index": {}},
    {"client_ip": "172.21.2.162", "env": "QA"},
    {"index": {}},
    {"client_ip": "172.21.3.15", "env": "Production"},
    {"index": {}},
    {"client_ip": "172.21.3.16", "env": "Production"},
]

# Bulk index the data
es.bulk(index="clientips", body=bulk_data)

# Define the enrich policy
policy = {
    "match": {
        "indices": "clientips",
        "match_field": "client_ip",
        "enrich_fields": ["env"],
    }
}

# Put the enrich policy
es.enrich.put_policy(name="clientip_policy", body=policy)

# Execute the enrich policy without waiting for completion
es.enrich.execute_policy(name="clientip_policy", wait_for_completion=True)

创建并执行策略后,你可以将其与 ENRICH 命令一起使用:

esql_query = """
FROM sample_data
| KEEP @timestamp, client_ip, event_duration
| EVAL client_ip = TO_STRING(client_ip)
| ENRICH clientip_policy ON client_ip WITH env
"""
response = es.esql.query(query=esql_query)
format_response(response)

你可以在后续命令中使用 ENRICH 命令添加的新 env 列。 例如,要计算每个环境的中位持续时间:

esql_query = """
FROM sample_data
| KEEP @timestamp, client_ip, event_duration
| EVAL client_ip = TO_STRING(client_ip)
| ENRICH clientip_policy ON client_ip WITH env
| STATS median_duration = MEDIAN(event_duration) BY env
"""
response = es.esql.query(query=esql_query)
format_response(response)

有关使用 ES|QL 进行数据丰富的更多信息,请参阅数据丰富。

处理数据

你的数据可能包含非结构化字符串,你希望对其进行结构化以便更轻松地分析数据。 例如,示例数据包含如下日志消息:

Connected to 10.1.0.3

通过从这些消息中提取 IP 地址,你可以确定哪个 IP 接受了最多的客户端连接。

要在查询时构建非结构化字符串,你可以使用 ES|QL DISSECT 和 GROK 命令。 DISSECT 的工作原理是使用基于分隔符的模式分解字符串。 GROK 的工作原理类似,但使用正则表达式。 这使得 GROK 更强大,但通常也更慢。

在这种情况下,不需要正则表达式,因为 message 很简单:“Connected to ”,后跟服务器 IP。 要匹配此字符串,你可以使用以下 DISSECT 命令:

esql_query = """
FROM sample_data
| DISSECT message "Connected to %{server_ip}"
"""
response = es.esql.query(query=esql_query)
format_response(response)

这会将 server_ip 列添加到具有与此模式匹配的 message 的那些行。 对于其他行,server_ip 的值为空。

你可以在后续命令中使用 DISSECT 命令添加的新 server_ip 列。 例如,要确定每个服务器已接受多少个连接:


esql_query = """
FROM sample_data
| WHERE STARTS_WITH(message, "Connected to")
| DISSECT message "Connected to %{server_ip}"
| STATS COUNT(*) BY server_ip
"""
response = es.esql.query(query=esql_query)
format_response(response)

ℹ️ 要了解有关使用 ES|QL 进行数据处理的更多信息,请参阅使用 DISSECT 和 GROK 进行数据处理。

了解更多,请阅读 “Elasticsearch:ES|QL 查询展示”。

最终的 Notebook 可以在地址 https://github.com/liu-xiao-guo/esql/blob/main/esql-getting-started.ipynb 下载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1536297.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

360企业安全浏览器兼容模式显示异常某个内容不显示 偶发现象 本地无法复现情况js

360企业安全浏览器兼容模式显示异常 &#xff0c;现象测试环境频发 &#xff0c;本地连测试无法复现&#xff0c;线上反馈问题。 出现问题的电脑为windows且使用360企业安全浏览器打开兼容模式可复现 复现过程&#xff1a; 不直接点击超链接跳转页面 &#xff0c;登录后直接通…

ctf_show笔记篇(web入门---反序列化)

目录 反序列化 254&#xff1a;无用&#xff0c;是让熟悉序列化这个东西的 255&#xff1a;直接使$isViptrue 256&#xff1a;还是使用变量覆盖 257&#xff1a;开始使用魔法函数 258&#xff1a;将序列化最前面的过滤了&#xff0c;使用绕过 259: 这一题需要看writeup才…

uni-app攻略:如何对接驰腾打印机

一.引言 在当前的移动开发生态中&#xff0c;跨平台框架如uni-app因其高效、灵活的特点受到了开发者们的青睐。同时&#xff0c;随着物联网技术的飞速发展&#xff0c;智能打印设备已成为许多业务场景中不可或缺的一环。今天&#xff0c;我们就来探讨如何使用uni-app轻松对接驰…

全局过滤器实现Jwt校验

从Session到Jwt 之前我写过一篇 什么是 httpsession &#xff1a; 理解HttpSession 在经典的那个登录场景中&#xff1a; 客户端第一次访问的时候 需要登录 登录成功之后 后面再次访问的时候 为了让服务器认识 这是已经登录成功的我 在session中存储的用户的信息。 现在我…

按摩师C语言

题干出现“接或不接”,“最优”&#xff0c;仔细一想&#xff0c;该用动态规划了。 #include<stdio.h> int max(int a,int b) {if(a>b)return a;elsereturn b; } int massage(int* nums,int numSize) {if(numSize 0)return 0;else if(numSize 1)return nums[0];els…

面试笔记——MySQL(主从同步原理、分库分表)

主从同步原理 主从同步结构&#xff1a;主库负责写数据&#xff0c;从库负责读数据&#xff0c;如图—— MySQL主从复制的核心就是二进制日志&#xff08;BINLOG&#xff09;&#xff0c;它记录了所有的 DDL&#xff08;数据定义语言&#xff09;语句和 DML&#xff08;数据操…

php表单生成器系统下载 全新万能自定义表单系统源码 开源可二开

在数字化时代&#xff0c;表单系统是许多网站和应用不可或缺的一部分。为了满足不同场景下的需求&#xff0c;分享一个全新万能自定义表单系统源码&#xff0c;基于PHP开发&#xff0c;具有高度的灵活性和可扩展性&#xff0c;支持设置收费表单在线提交&#xff0c;比如说&…

Unity类银河恶魔城学习记录11-3 p105 Inventory UI源代码

Alex教程每一P的教程原代码加上我自己的理解初步理解写的注释&#xff0c;可供学习Alex教程的人参考 此代码仅为较上一P有所改变的代码 【Unity教程】从0编程制作类银河恶魔城游戏_哔哩哔哩_bilibili UI_itemSlot.cs using System.Collections; using System.Collections.Gen…

JAVA八股--集合面试题

AVA八股--集合面试题--上 java只有值传递&#xff0c;没有引用传递代理模式Java之HashMap和Hashtable选用 ArrayDeque 来实现队列要比 LinkedList 更好为什么HashMap的长度一定是2的次幂&#xff1f;HashMap常见遍历方式 java只有值传递&#xff0c;没有引用传递 文章讲解 文…

全面放开的主流电商API接口,跨境电商与您“面对面”

通过 API&#xff0c;一个软件可以向另一个软件请求数据、执行操作或者提供服务。比如&#xff0c;当你使用手机上的天气应用程序时&#xff0c;它可能通过调用天气预报 API 来获取实时天气数据。又或者&#xff0c;当你在社交媒体上分享照片时&#xff0c;这个应用程序可能使用…

transformer的学习:Attention is all you need

目录 整体概述&#xff1a;​编辑​编辑 encoder&#xff1a; embedding&#xff1a; ​编辑 self-attention&#xff1a; 向量的相似度计算&#xff1a; qkv怎么来的​编辑 softmax&#xff1a; code multi-head-attention 位置编码&#xff1a; 残差&&FFN&…

基于react native的自定义轮播图

基于react native的自定义轮播图 效果示例图示例代码 效果示例图 示例代码 import React, {useEffect, useRef, useState} from react; import {Animated,PanResponder,StyleSheet,Text,View,Dimensions, } from react-native; import {pxToPd} from ../../common/js/device;c…

8个 C++ 开源项目,帮初学者快速进阶

参与或阅读开源项目的源代码&#xff0c;可以获得丰富的实践机会。下面&#xff0c;让我们一起看看以下八个优秀的 C 开源项目。 通过参与或阅读开源项目的源代码&#xff0c;你可以获得丰富的实践机会。实际的项目代码比简单的教程更具挑战性&#xff0c;可以帮助你深入理解 …

19.作业

1.作业样例图 2.学习视频 19.作业讲解

LeetCode每日一题【19. 删除链表的倒数第 N 个结点】

思路&#xff1a;快慢指针 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : val(x), next(nullptr) {}* ListNode(int x, ListNode *next) : val(x)…

vuex - 21年的笔记 - 后续更新

vuex是什么 Vuex是实现组件全局状态&#xff08;数据&#xff09;管理的一种机制&#xff0c;方便的实现组件之间的数据的共享 使用vuex统一管理状态的好处 能够在vuex中集中管理共享的数据&#xff0c;易于开发和后期维护能够高效地实现组件之间的数据共享&#xff0c;提高…

程序设计基础--C语言【三】

课堂笔记两次合集 3.运算符 目录 3.运算符 3.1.算术运算符 3.2.赋值运算符 3.3.增1、减1运算符 3.4.关系运算符 3.5.逻辑运算符 3.6.条件运算符 3.7.类型转换 3.8.基本输入、输出函数 3.8.1.字符输出函数putchar() 3.8.2.字符输入函数getchar() 3.8.3.格式化输入…

算法-图的强连通分量,图的最小生成树

1.图的强连通分量 (1). 定义 图的强连通分量是图论中的一个重要概念&#xff0c;主要在有向图中进行讨论。具体来说&#xff0c;如果在一个有向图G中&#xff0c;任意两个顶点vi和vj&#xff08;其中vi大于vj&#xff09;之间都存在一条从vi到vj的有向路径&#xff0c;同时也存…

Android App开发的自动化测试框架UI Automator使用教程

Android的自动化测试有很多框架&#xff0c;其中ui automator是google官方提供的黑盒UI相关的自动化测试工具&#xff0c;&#xff08;GitHub主页&#xff1a;case使用java写&#xff0c;今天实践了一下官方文档中样例程序&#xff0c;其中还是有一些小问题需要总结一下的。 环…

为什么签名apk,需要公钥证书和私钥证书,不是私钥就能签名吗?对应的公钥通常包含在APK文件中,这样用户和系统可以验证签名的有效性

在Android开发中&#xff0c;对APK进行签名确实需要使用到公钥证书和私钥证书&#xff0c;而不仅仅是私钥。以下是详细解释&#xff1a; 身份验证&#xff1a;公钥证书作为应用程序的身份证明&#xff0c;可以帮助用户或系统验证安装的APK的真实性。当用户下载并安装APK时&…