Python-ElasticSearch客户端的封装(聚合查询、统计查询、全量数据)

news2025/1/10 16:35:06

目录

  • ES Python客户端介绍
  • 封装代码
  • 测试代码
  • 参考


ES Python客户端介绍

官方提供了两个客户端elasticsearch、elasticsearch-dsl

pip install elasticsearch
pip install elasticsearch-dsl

第二个是对第一个的封装,类似ORM操作数据库,可以.filter、.groupby,个人感觉很鸡肋,star数也不多。平时使用的时候一般会在kibana上测试,然后直接把query拷贝过来获取更多数据,所以这里做下第一个的封装。

封装代码

  1. 封装后依然暴露了es,方便有特殊情况下使用
  2. index一般很少改动,就直接放到对象中了,可以使用set_index修改
  3. 常用的应该是get_doc和get_doc_scroll来获取少量和全量数据

代码测试时使用的是7.17.12版本,大于此版本可能由于官方改动出异常

pip install elasticsearch==7.17.12

es.py

import random
import string
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from typing import List,Dict


class ESClient:

    def __init__(self, host="127.0.0.1",index="", http_auth = None):
        self.index = index
        if http_auth is None:
            self.es = Elasticsearch(hosts=host)
        else:
            self.es = Elasticsearch(hosts=host, http_auth=http_auth)
        print("success to connect " + host)

    def close(self):
        self.es.close()

    # 设置索引
    def set_index(self,index:str):
        self.index = index

    # 创建索引
    def create_index(self, index_name: str, mappings=None):
        res = self.es.indices.create(index=index_name, mappings=mappings)
        return res

    # 删除索引
    def delete_index(self, index_name: str):
        res = self.es.indices.delete(index=index_name)
        return res

    # 获取索引
    def get_index(self, index_name: str):
        res = self.es.indices.get(index=index_name)
        return res

    # 创建文档(单个)
    def create_doc(self,body, _id=''.join(random.sample(string.ascii_letters+string.ascii_uppercase+string.digits,20))):
        res = self.es.create(index=self.index, body=body, id=_id)
        return res

    # 创建文档(批量)
    def create_doc_bulk(self, docs: List[Dict]):
        actions = []
        for doc in docs:
            action = {
                "_index": self.index,
                "_op_type": "create",
                "_id": ''.join(random.sample(string.ascii_letters+string.ascii_uppercase+string.digits,20))
            }
            for k,v in doc.items():
                action[k] = v
            actions.append(action)
        res = bulk(client=self.es, actions=actions)
        return res

    # 删除文档
    def delete_doc(self, doc_id):
        res = self.es.delete(index=self.index, id=doc_id)
        return res

    # 更新文档
    def update_doc(self, doc_id, doc:Dict):
        body = {
            "doc" : doc
        }
        res = self.es.update(index=self.index, id=doc_id, body=body)
        return res

    # 分页获取超过100000的文档
    def get_doc_scroll(self,query:Dict):
        res = self.es.search(index=self.index,size=10000,body=query,search_type="query_then_fetch",scroll="5m")
        data_list = []
        hits = res.get("hits")
        scroll_id = res.get('_scroll_id')
        total_value = 0
        # total 可能为Dict或int
        if isinstance(hits.get('total'),Dict):
            total_value= hits.get('total').get('value')
        else:
            total_value = hits.get('total')

        if total_value>0:
            for data in hits.get('hits'):
                data_list.append(data.get('_source'))
        return scroll_id,data_list

    # 通过scroll_id分页获取后序文档
    def get_doc_by_scroll_id(self,scroll_id):
        page = self.es.scroll(scroll_id=scroll_id,scroll="5m")
        data_list = []
        scroll_id = page.get('_scroll_id')
        for data in page.get('hits').get('hits'):
            data_list.append(data)
        return scroll_id,data_list

    # 清空scroll_id,防止服务端不够用
    def clear_scroll(self,scroll_id):
        self.es.clear_scroll(scroll_id)

    # 获取索引的hits内容(一般用于获取文档id、总数)
    def get_doc_all(self):
        res = self.es.search(index=self.index)
        return res['hits']

    # 获取一个文档
    def get_doc_by_id(self, id_):
        res = self.es.get(index=self.index, id=id_)
        return res["_source"]

    # 获取所有文档的_source内容(小于100000)
    def get_doc(self,query:Dict,size:int=100000):
        query['size'] = size
        res = self.es.search(index=self.index,body=query)
        data_list = []
        hits = res.get("hits")
        total_value = 0
        # total 可能为Dict或int
        if isinstance(hits.get('total'), Dict):
            total_value = hits.get('total').get('value')
        else:
            total_value = hits.get('total')

        if total_value > 0:
            for data in hits.get('hits'):
                data_list.append(data.get('_source'))
        return data_list

    # 聚合查询(分组条件名为group_by,返回buckets)
    def get_doc_agg(self, query):
        res = self.es.search(index=self.index, body=query)
        return res['aggregations']['group_by'].get('buckets')

    # 统计查询(统计条件为stats_by,返回最值、平均值等)
    def get_doc_stats(self,query):
        res = self.es.search(index=self.index,body=query)
        return res['aggregations']["stats_by"]

测试代码

import unittest
from es import ESClient

cli = ESClient(host="http://10.28.144.3:9200",http_auth=["elastic","changeme"])
def test_create_index():
    res = cli.create_index(index_name="test")
    print(res)

def test_delete_index():
    res = cli.delete_index(index_name="test")
    print(res)

def test_get_index():
    res = cli.get_index(index_name="test")
    print(res)

def test_set_index():
    cli.set_index(index="test")

def test_create_doc():
    body = {
        "name": "lady_killer9",
        "age": 19
    }
    res = cli.create_doc(body=body)
    print(res)

def test_create_doc_bulk():
    from copy import deepcopy
    body = {
        "name": "lady_killer9"
    }
    users = []
    for i in range(100001):
        tmp = deepcopy(body)
        tmp["age"] = i
        users.append(tmp)
    res = cli.create_doc_bulk(docs=users)
    print(res)


def test_get_doc_all():
    res = cli.get_doc_all()
    print(res)


def test_get_doc_by_id():
    res = cli.get_doc_by_id("jHALXDQaENQZPM4C9EUt")
    print(res)

def test_get_doc():
    query = {
        "query": {
            "match_all": {

            }
        }
    }
    res = cli.get_doc(query=query,size=20)
    print(res)

def test_update_doc():
    body={
        "name": "lady_killer_after_update"
    }
    res = cli.update_doc(doc_id="jHALXDQaENQZPM4C9EUt",doc=body)
    print(res)


def test_delete_doc():
    res = cli.delete_doc(doc_id="jHALXDQaENQZPM4C9EUt")
    print(res)

def test_get_doc_agg():
    query = {
            "aggs": {
                "group_by": {
                    "terms": {
                        "field": "age"
                    }
                }
            }
    }
    res = cli.get_doc_agg(query=query)
    print(res)

def test_get_doc_stats():
    query = {
            "aggs": {
                "stats_by": {
                    "stats": {
                        "field": "age"
                    }
                }
            }
    }
    res = cli.get_doc_stats(query=query)
    print(res)

def test_get_doc_scroll():
    query = {
        "query": {
            "match_all": {}
        }
    }
    scroll_id,data_list = cli.get_doc_scroll(query=query)
    res = []
    while data_list:
        res.extend(data_list)
        scroll_id,data_list = cli.get_doc_by_scroll_id(scroll_id=scroll_id)
    print(len(res))


if __name__ == '__main__':
    # test_delete_index()
    test_create_index()
    test_get_index()
    # test_set_index()
    # test_create_doc()
    # test_create_doc_bulk()
    # test_get_doc_all()
    # test_update_doc()
    # test_get_doc_by_id()
    # test_get_doc()
    # test_delete_doc()
    # test_get_doc_agg()
    # test_get_doc_stats()
    # test_get_doc_scroll()
    cli.close()

测试截图
在这里插入图片描述
更多python相关内容:【python总结】python学习框架梳理

本人b站账号:一路狂飚的蜗牛

有问题请下方评论,转载请注明出处,并附有原文链接,谢谢!如有侵权,请及时联系。如果您感觉有所收获,自愿打赏,可选择支付宝18833895206(小于),您的支持是我不断更新的动力。

参考

github-elasticsearch
github-elasticsearch-dsl

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/811940.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

EverEdit的一些特殊使用教学(持续更新)

项目场景: EverEdit的使用经常一问三不知 搜也搜不到 解决方案: 先去EverEdit在线帮助文档看一下(附链接) EverEdit在线帮助文档 1.快速排序 使用快捷键时:若是小键盘,请同时按住fn键(在最左下角的ctrl旁)

220. 存在重复元素 III

220. 存在重复元素 III 原题链接:完成情况:解题思路:TreeSetsequenceSet.ceilingjava中的红黑树 参考代码: 原题链接: 220. 存在重复元素 III https://leetcode.cn/problems/contains-duplicate-iii/description/ 完…

ER系列路由器多网段划分设置指南

ER系列路由器多网段划分设置指南 - TP-LINK 服务支持 TP-LINK ER系列路由器支持划分多网段,可以针对不同的LAN接口划分网段,即每一个或多个LAN接口对应一个网段;也可以通过一个LAN接口与支持划分802.1Q VLAN的交换机进行对接,实现…

幅度调制与角度调制

文章目录 前言一、调制简介1、调制定义2、调制目的3、调制的分类 二、幅度调制(线性调制)1、幅度调制的一般模型2、常规双边带调幅 AM①、AM 信号的产生②、AM 调制器的模型③、AM 波形和频谱④、AM 信号的特点⑤、AM 包络检波⑥、调幅系数 3、抑制载波双…

Kotlin~Facade

概念 又称门面模式,为复杂系统提供简单交互接口。 角色介绍 Facade:外观类,供客户端调用,将请求委派给响应的子系统。SubSystem:子系统,独立的子设备或子类 UML 代码实现 class Light(val name: Strin…

Oracle 19c 报ORA-704 ORA-01555故障处理---惜分飞

异常断电导致数据库无法启动,尝试对数据文件进行recover操作,报ORA-00283 ORA-00742 ORA-00312错误,由于redo写丢失无法正常应用 D:\check_db>sqlplus / as sysdba SQL*Plus: Release 19.0.0.0.0 - Production on 星期日 7月 30 07:49:19 2023 Version 19.3.0.0.0 Copyrig…

msvcp120.dll丢失的解决方法?哪种解决方法比较推荐?

msvcp120.dll是Microsoft Visual C Redistributable软件包的一部分。它是用于支持运行使用Microsoft Visual C编写的应用程序的动态链接库文件。msvcp120.dll提供了许多C标准库函数和组件,包括输入/输出、字符串处理、数学运算、内存管理等功能。 当您运行某个依赖于…

C++初阶——缺省参数以及函数重载

1. 缺省参数 缺省参数的分类&#xff1a;全缺省&#xff0c;半缺省 缺省参数是声明或定义函数时为函数的参数指定一个缺省值 在调用该函数时&#xff0c;若没有指定实参则采用该形参的缺省值否则使用指定的实参 void Func(int a 0) {cout<<a<<endl; }int main(…

nest的核心概念

请求进来 --- 中间件 --- 守卫 --- 拦截器 --- 通道 --- 处理&#xff08;controller层&#xff09; --- 拦截器 --- 返回 Pipe &#xff1a; 就是实现 PipeTransform 接口的 transform 方法&#xff0c;它的返回值就是传给 handler 的值。 ---------------------------------…

【Python数据分析】Python常用内置函数(二)

&#x1f389;欢迎来到Python专栏~Python常用内置函数&#xff08;二&#xff09; ☆* o(≧▽≦)o *☆嗨~我是小夏与酒&#x1f379; ✨博客主页&#xff1a;小夏与酒的博客 &#x1f388;该系列文章专栏&#xff1a;Python学习专栏 文章作者技术和水平有限&#xff0c;如果文…

Go语言进阶 + 依赖管理

依赖配置 - version开始&#xff0c;就开始很难听懂了&#xff0c;需要结合很多课后配套资料查阅很多文档和网站....然而好像没有那么多时间&#xff0c;一天给3小时学Go真的顶天了.....还有算法和Linux的Mysql... 这几天学Go已经把算法给挤掉了.....下步要权衡一下&#xff0c…

C#实现数据库数据变化监测(sqlservermysql)

监测数据库表数据变化&#xff0c;可实现数据库同步&#xff08;一主一从&#xff08;双机备份&#xff09;&#xff0c;一主多从&#xff08;总部数据库&#xff0c;工厂1&#xff0c;工厂2&#xff0c;工厂数据合并到总部数据&#xff09;&#xff09; sqlserver 启用数据库…

【Linux】网络基础

&#x1f34e;作者&#xff1a;阿润菜菜 &#x1f4d6;专栏&#xff1a;Linux系统网络编程 文章目录 一、协议初识和网络协议分层&#xff08;TCP/IP四层模型&#xff09;认识协议TCP/IP五层&#xff08;或四层&#xff09;模型 二、认识MAC地址和IP地址认识MAC地址认识IP地址认…

什么是ssm?如何使用ssm进行后端开发

目录 一、ssm概述1.1 定义1.2 持久层框架 (mybatis&&mybatisPlus)1.3 Web 层框架 springMVC1.4 spring框架 二、开发结构2.1 config介绍2.2 controller介绍2.3 dao介绍2.4 domain介绍2.5 exception介绍2.6 interceptor介绍2.7 service介绍 三、注解开发介绍3.1 常见的注…

从零开始学Flask: 3分钟用Python快速构建Web应用

文章目录 一、背景二、安装&基础使用1. 安装 Flask2. 创建 Flask 应用3. 路由解析4. 模板渲染5. 请求和响应处理 三、Demo项目实战 一、背景 什么是Flask&#xff1f;Flask 是一个轻量级的 Python Web 应用框架&#xff0c;因其简单易用、灵活性高等特点&#xff0c;可以帮…

Ribbon源码

学了feign源码之后感觉&#xff0c;这部分还是按运行流程分块学合适。核心组件什么的&#xff0c;当专业术语学妥了。序章&#xff1a;认识真正のRibbon 但只用认识一点点 之前我们学习Ribbon的简单使用时&#xff0c;都是集成了Eureka-client或者Feign等组件&#xff0c;甚至在…

better scoll右 联左

这是先拿一个数组装进我们所有 获取到的dom节点的 高度 因为算的 都是 到最上面的 高度&#xff0c;所以我们 要减去他的 高度 就得到自身的高度 然后给右边加一个滚动事件&#xff0c;得到每一次滑动的高度&#xff0c;在循环上面的数组&#xff0c;就是我们右边的 y就在算出…

如何排查 IDEA 自身报错?| 以 IntelliJ IDEA 2023.1.4 无法刷新项目 Maven 模块的问题为例

这个问题是 2023 年 7 月 26 日遇到的&#xff0c;当时还是 IDEA 2023.1.4&#xff0c;结果文章还没写完&#xff0c;7 月 27 日自动给更新了 IDEA 2023.2。问题估计解决了。 所以&#xff0c;本文就简单提一下 IDEA 自身报错的排查方法。 规避/解决方式 先说问题怎么处理&am…

重生之我要学C++第五天

这篇文章主要内容是构造函数的初始化列表以及运算符重载在顺序表中的简单应用&#xff0c;运算符重载实现自定义类型的流插入流提取。希望对大家有所帮助&#xff0c;点赞收藏评论&#xff0c;支持一下吧&#xff01; 目录 构造函数进阶理解 1.内置类型成员在参数列表中的定义 …

糟了,数据库主从延迟了!

前言 在实际的生产环境中&#xff0c;由单台MySQL作为独立的数据库是完全不能满足实际需求的&#xff0c;无论是在安全性&#xff0c;高可用性以及高并发等各个方面 因此&#xff0c;一般来说都是通过集群主从复制&#xff08;Master-Slave&#xff09;的方式来同步数据&…