Python脚本之操作Elasticsearch【一】

news2025/1/15 6:47:42

本文为博主原创,未经授权,严禁转载及使用。
本文链接:https://blog.csdn.net/zyooooxie/article/details/109588072

前面刚写了 requests发请求 操作Elasticsearch - Search https://blog.csdn.net/zyooooxie/article/details/123730279,再来分享下 使用elasticsearch库 ;

【实际这篇博客推迟发布N个月】

个人博客:https://blog.csdn.net/zyooooxie

【以下所有内容仅为个人项目经历,如有不同,纯属正常】

Python Client

在这里插入图片描述

https://www.elastic.co/guide/en/elasticsearch/client/index.html

我使用的 是 7.17.0;

https://pypi.org/project/elasticsearch/7.17.0/

https://www.elastic.co/guide/en/elasticsearch/client/python-api/7.17/overview.html

https://elasticsearch-py.readthedocs.io/en/v7.17.0/index.html

"""
@blog: https://blog.csdn.net/zyooooxie
@qq: 153132336
@email: zyooooxie@gmail.com
"""


import time
import traceback
import sys
import json
import string
import math
import random
from typing import Optional, Union, List, Any
from user_log import Log

from elasticsearch import Elasticsearch
from elasticsearch.helpers import BulkIndexError

gl_es_host_new = 'http://1.1.1.1:1111'
gl_es_host_new_2 = ['http://1.1.1.1:1111', 'http://2.2.2.2:2222']

# ``port`` needs to be an int.
gl_es_host_new_3 = [{'host': '2.2.2.2', 'port': 2222}]
gl_es_host_new_4 = [{'host': '2.2.2.2', 'port': 2222}, {'host': '1.1.1.1', 'port': 1111}]

gl_es_auth = ('es_username', 'es_password')

gl_type = '_doc'

gl_search_dict = {'size': 100, 'from': 0, "sort": {"xxxXXX": {"order": "desc"}}}


# pip install elasticsearch==7.17.0
# https://pypi.org/project/elasticsearch/7.17.0/
# https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs.html

# https://elasticsearch-py.readthedocs.io/en/v7.17.0/api.html

# doc_type 不建议使用 【Specifying types in requests is deprecated】
# https://www.elastic.co/guide/en/elasticsearch/reference/7.17/removal-of-types.html

# Note that in 7.0, _doc is a permanent part of the path, and represents the endpoint name rather than the document type.
# In Elasticsearch 7.0, each API will support typeless requests, and specifying a type will produce a deprecation warning.


搜索

"""
@blog: https://blog.csdn.net/zyooooxie
@qq: 153132336
@email: zyooooxie@gmail.com
"""


def connect_es_client(hosts: Union[str, list], auth: tuple):
    """

    :param hosts:
    :param auth:
    :return:
    """
    client = Elasticsearch(hosts,
                           sniff_on_start=True,  # sniff before doing anything
                           sniff_on_node_failure=True,  # refresh nodes after a node fails to respond
                           request_timeout=60,
                           http_auth=auth)  # HTTP authentication uses the http_auth parameter by passing in a username and password within a tuple

    Log.error('连接-{}'.format(client))

    return client


def close_es_client(client: Elasticsearch):
    """

    :param client:
    :return:
    """
    client.close()

    Log.error('断开连接')


def _es_search(index_str: str, client: Elasticsearch,
               size_: int = 10000, from_: int = 0,
               sort_: Union[str, dict] = {"seq": {"order": "desc"}},
               get_more_10000: bool = False,
               **kwargs):
    """

    :param index_str:
    :param client:
    :param size_:
    :param from_:
    :param sort_: query 传值是 {"seq": {"order": "desc"}} ; body 是 'seq:desc';
    :param get_more_10000:是否查询超过10000条的数据
    :param kwargs: 不建议使用 body传参;查全部时,啥都不传;
    :return:
    """

    # 索引不存在时,返回值是 None
    if not client.indices.exists(index=index_str):
        return None

    # from + size must be less than or equal to: [10000]
    assert size_ + from_ <= 10000

    # # ✅ New usage:
    # es.search(query={...})
    #
    # # ❌ Deprecated usage:
    # es.search(body={"query": {...}})

    Log.debug(locals())

    # search() 的 from: Defaults to 0.   size: Defaults to 10.
    # 但有时候为了查出来所有数据,size 默认给 最大值10000,from 默认给0;
    res = client.search(index=index_str, size=size_, from_=from_, sort=sort_, **kwargs)

    total = res.get('hits').get('total').get('value')
    Log.info(f'total:{total}')

    hits_len = len(res.get('hits').get('hits'))
    Log.info(f'hits有:{hits_len}条')

    result = _search_10000(hits_len=hits_len, first_search_result=res, locals_=locals(),
                           client=client, first_search_size=size_, get_more_10000=get_more_10000)
    Log.info(result[-10:])
    Log.info(f'search返回的结果有:{len(result)}条')

    return result


def _search_10000(client: Elasticsearch, hits_len: int, first_search_result: dict, locals_: dict,
                  first_search_size: int,
                  get_more_10000: bool = False):
    """

    :param client:
    :param hits_len:
    :param first_search_result:
    :param locals_:
    :param first_search_size:
    :param get_more_10000:
    :return:
    """
    if hits_len < first_search_size or not get_more_10000:

        if hits_len:
            return first_search_result.get('hits').get('hits')
        else:
            return []

    else:
        return __search_10000_get_result(client=client, locals_=locals_)


def __search_10000_get_result(client: Elasticsearch, locals_: dict):
    """

    :param client:
    :param locals_:
    :return:
    """
    from xxx_use.common_functions import compare_dict_key

    one_choice = random.getrandbits(2)
    Log.info(one_choice)

    if not one_choice:

        Log.info('scroll + scan')

        scroll_list = __scroll(client=client, locals_=locals_)
        scan_list = __scan(client=client, locals_=locals_)

        # 很多时候 因为sort值不同
        scroll_list = __change_before_compare(scroll_list)
        scan_list = __change_before_compare(scan_list)

        compare_dict_key(scroll_list, scan_list, assert_0=True)
        compare_dict_key(scan_list, scroll_list, assert_0=True)

        return scroll_list

    elif one_choice == 1:

        Log.info('scroll')
        return __scroll(client=client, locals_=locals_)

    elif one_choice == 2:

        Log.info('scan')
        return __scan(client=client, locals_=locals_)

    else:

        # return __limit(client=client, locals_=locals_)

        # 不推荐
        Log.info('指定seq范围 【自己造的假数据 确保 每条都有seq】')
        limit_list = __limit(client=client, locals_=locals_)
        scan_list = __scan(client=client, locals_=locals_)

        limit_list = __change_before_compare(limit_list)
        scan_list = __change_before_compare(scan_list)

        compare_dict_key(limit_list, scan_list, assert_0=True)
        compare_dict_key(scan_list, limit_list, assert_0=True)

        return limit_list


def __change_before_compare(result_list: list):
    """
    scroll + scan 结果比较前 对数据做个统一
    :param result_list:
    :return:
    """
    for rl in result_list:
        # 每个结果还有一个 _score ,它衡量了文档与查询的匹配程度。默认情况下,首先返回最相关的文档结果,就是说,返回的文档是按照 _score 降序排列的。
        rl.pop('sort', '不存在key')

        rl.pop('_score', '不存在key')

    return result_list


def __scan(client: Elasticsearch, locals_: dict):
    """

    :param client:
    :param locals_:
    :return:
    """
    # https://elasticsearch-py.readthedocs.io/en/v7.17.0/helpers.html#scan

    from elasticsearch.helpers import scan

    # query 要传的是 body for the search() api
    # query={"query": {"match": {"blog": "zyooooxie"}}}
    result = scan(client=client, index=locals_.get('index_str'), query=locals_.get('kwargs'),

                  size=5000,
                  scroll="3m")  # Any additional keyword arguments will be passed to the initial search() call

    Log.info(f'{result}, {type(result)}')

    res = [gr for gr in result]
    Log.info(len(res))

    return res


def __scroll(client: Elasticsearch, locals_: dict):
    """

    :param client:
    :param locals_:
    :return:
    """
    # https://elasticsearch-py.readthedocs.io/en/v7.17.0/api.html#elasticsearch.Elasticsearch.scroll

    scroll_time = '3m'
    search_res = client.search(index=locals_.get('index_str'), scroll=scroll_time,
                               query=locals_.get('kwargs').get('query'),
                               size=5000,
                               sort=['_doc'])
    scroll_id = search_res.get('_scroll_id')
    Log.info(scroll_id)

    total = search_res.get('hits').get('total').get('value')
    Log.info(f'总共有{total}条')

    res = search_res.get('hits').get('hits')

    while True:

        scroll_res = client.scroll(scroll_id=scroll_id, scroll=scroll_time)

        scroll_id = scroll_res.get('_scroll_id')

        data = scroll_res.get('hits').get('hits')
        res.extend(data)

        if not data:
            break

    assert total == len(res)

    # Search context are automatically removed when the scroll timeout has been exceeded.
    # 手动清理,using the clear-scroll API
    clear_res = client.clear_scroll(scroll_id=scroll_id)
    Log.info(clear_res)

    return res


def __limit(client: Elasticsearch, locals_: dict):
    """

    :param client:
    :param locals_:
    :return:
    """
    seq_max: int = get_seq_max(client=client, index_str=locals_.get('index_str'))

    query = locals_.get('kwargs').get('query')

    search_size = 10000  # search的传参 取最大
    limit_size = 5000  # 查询时 以seq排序,每次取的长度
    assert limit_size <= search_size

    res = list()

    for i in range(math.ceil(seq_max / limit_size)):
        query_new = {'bool': {'must': [
            query,
            {'range': {'seq': {'gt': limit_size * i, 'lte': limit_size * (i + 1)}}}  # gt、lte
        ]
        }}
        # Log.info(query_new)

        search_res = client.search(index=locals_.get('index_str'),
                                   query=query_new,
                                   size=search_size)
        data = search_res.get('hits').get('hits')

        res.extend(data)

    else:
        Log.info(len(res))

        return res

本文链接:https://blog.csdn.net/zyooooxie/article/details/109588072

个人博客 https://blog.csdn.net/zyooooxie

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1435313.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从零开始手写mmo游戏从框架到爆炸(七)— 消息封装

上一篇&#xff0c;我们初步把消息handler 注册到了服务中&#xff0c;在进行后续工作之前我们需要再做一些准备工作。 第一&#xff1a;把之前自己管理的bean放到spring中去管理&#xff0c;后面大部分的bean都通过spring来管理。 第二&#xff1a;为了方便路由消费&#xff0…

Golang-Map有序输出——使用orderedmap库实现

前言 工作中遇到一个问题&#xff1a;需要导出一个MySQL表格&#xff0c;表格内容由sql查询得来。但现在发现&#xff0c;所导出的表格中&#xff0c;各列的顺序不确定。多次导出&#xff0c; 每一次的序列顺序也是不定的。 因此确定是后端&#xff0c;Map使用相关导致的问题。…

别具一格,质感拉满 | PITAKA苹果Apple Watch彩碳表带开箱

别具一格&#xff0c;质感拉满 | PITAKA苹果Apple Watch彩碳表带开箱 我是在前年的时候购买的目前手头这款Apple Watch Series7&#xff0c;因为是购买的Nike版&#xff0c;所以可以看到它的表带标配为透气孔的运动型表带。 &#x1f53a;耐克版的透气孔表带虽说在一定程度上解…

时序预测 | MATLAB实现基于CNN-LSTM-AdaBoost卷积长短期记忆网络结合AdaBoost时间序列预测

时序预测 | MATLAB实现基于CNN-LSTM-AdaBoost卷积长短期记忆网络结合AdaBoost时间序列预测 目录 时序预测 | MATLAB实现基于CNN-LSTM-AdaBoost卷积长短期记忆网络结合AdaBoost时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 1.MATLAB实现基于CNN-LST…

电机控制系列模块解析(第六篇)—— 观测器

最近有上传一些入门的免积分的资料&#xff0c;方便大家上手进行仿真分析。注意查收。还在继续更新中。 继续回到咱们的电机控制系列模块解析&#xff08;第六篇&#xff09;—— 观测器 1、无位置传感器控制背景 这方面的文献比较多&#xff0c;直接引用一些文献里的背景知…

SSRF:服务端请求伪造攻击

目录 什么是SSRF&#xff1f; 攻击内网应用 端口扫描 攻击非web应用 pikachu中的SSRF curl&#xff08;端口扫描&#xff09; file_get_content&#xff08;读取文件&#xff09; 防御SSRF 什么是SSRF&#xff1f; 服务端请求伪造&#xff08;Server Side Request For…

Python __file__属性:查看模块的源文件路径

除可以查看模块的帮助信息之外&#xff0c;还可以直接阅读模块的源代码来掌握模块功能&#xff0c;提升 Python 编程能力。 不管学习哪种编程语言&#xff0c;认真阅读那些优秀的框架、库的源代码都是非常好的学习方法。 通过模块的 __file__ 属性即可查看到指定模块的源文件…

实践:微服务版本升级步骤以及maven仓库相关概念

进行微服务开发的时候&#xff0c;上层服务依赖于下层的服务的api&#xff0c;比如适配属于上层服务&#xff0c;用户属于下层服务。 例子: 上层服务 <!--订单管理微服务api依赖--> <dependency><groupId>com.jn.server</groupId><artifactId>…

docker部署自己的网站wordpress

目录 安装 1.创建目录 2.创建并启动mysql 3.创建并启动wordpress 使用 1.设置语言 2.设置基础信息 3.首页 安装 1.创建目录 mkdir -p /opt/wordpress/{db,data} 2.创建并启动mysql docker run -d --name my_mysql --restart always -e MYSQL_ROOT_PASSWORD123456 -e …

【网络安全】URL解析器混淆攻击实现ChatGPT账户接管、Glassdoor服务器XSS

文章目录 通配符URL解析器混淆攻击实现ChatGPT账户接管通配符URL解析器混淆攻击实现Glassdoor服务器缓存XSS 本文不承担任何由于传播、利用本文所发布内容而造成的任何后果及法律责任。 本文将基于ChatGPT及Glassdoor两个实例阐发URL解析器混淆攻击。 开始本文前&#xff0c;…

【问题篇】activiti工作流转办并处理备注问题

当处理activiti转办问题时&#xff0c;需要做的就是处理审批人和备注问题。 处理的思路是&#xff0c;先将当前环节标志成转办标签&#xff0c;再通过BUSINESS_KEY_找到流程实例的历史记录&#xff0c;找到最新的一条复制一份出来&#xff0c;表示需要转办到的人的历史记录并设…

【技能树学习】Git入门——练习题解析

前言 本篇文章给出了Git入门技能树中部分的练习题解析&#xff0c;包括分支管理&#xff0c;Git标签&#xff0c;在Mac和Windows上使用GitVSCode的步骤。强调了git cherry-pick不直接支持从标签中选择提交&#xff0c;git tag -d只能删除本地标签&#xff0c;Mac系统的终端可以…

【达梦数据库】使用DBeaver管理达梦数据库

使用DBeaver管理达梦数据库 Step1 安装相关程序 达梦8数据库DBeaver社区版 Step2 新建驱动 类型参数驱动名称DM8驱动类型Generic类名dm.jdbc.driver.DmDriverURL模板jdbc:dm://{host}:{port}默认端口5236默认数据库默认用户SYSDBA Step3 连接服务

ideal打包,如何访问项目根目录的libs中的jar包

参考&#xff1a;idea maven 导入lib中jar 并打包_maven引入lib中的jar包-CSDN博客 解决办法&#xff0c;只需要在pom文件中加入 <includeSystemScope>true</includeSystemScope> <build><!-- <includeSystemScope>true</includeSystemScope&g…

【DC渗透系列】DC-2靶场

arp先扫 ┌──(root㉿kali)-[~] └─# arp-scan -l Interface: eth0, type: EN10MB, MAC: 00:0c:29:6b:ed:27, IPv4: 192.168.100.251 Starting arp-scan 1.10.0 with 256 hosts (https://github.com/royhills/arp-scan) 192.168.100.1 00:50:56:c0:00:08 VMware, In…

EasyExcel下载带下拉框和批注模板

EasyExcel下载带下拉框和批注模板 一、 代码实现 controller下载入口 /***下载excel模板* author youlu* date 2023/8/14 17:31* param response* param request* return void*/PostMapping("/downloadTemplate")public void downloadExcel(HttpServletResponse r…

【计算机学院寒假社会实践】——服务走进社区,共绘幸福蓝图

为深入贯彻落实志愿者服务精神&#xff0c;扎实推进志愿者服务质量&#xff0c;2024年1月28日&#xff0c;曲阜师范大学计算机学院“青年扎根基层&#xff0c;服务走进社区”社会实践队队员周兴睿在孙宇老师的指导下&#xff0c;来到山东省滨州市陈集街道社区开展了为期一天的“…

SaperaCamExpert(相机专家)中文使用指南

参考&#xff1a;SaperaCamExpert中文使用指南.PDF 文章目录 软件介绍安装首次打开资源占用率功能主界面布局菜单栏FileViewPre-Processing&#xff1a;预处理 Tools&#xff1a; 快捷键&#xff1a;新建&#xff1b;打开&#xff1b;保存&#xff1b;帮助Device窗体属性树图像…

GPTs保姆级教程之实践

GPTs什么 使用GPTs的前提&#xff1a;ChatGPT Plus帐号 GTPs的作用&#xff1a;把我们和GPT对话的prompt&#xff0c;封装起来成为一个“黑匣子”。 主要有两个作用&#xff1a; 1、避免反复输入prompt&#xff0c;“黑匣子”打开&#xff0c;输入问题即可使用 2、在别人可以…

docker安装etherpad文档系统

效果 安装 1.创建并进入目录 mkdir -p /opt/etherpad cd /opt/etherpad 2.修改目录权限 chmod -R 777 /opt/etherpad 3.创建并启动容器 docker run -d --name etherpad --restart always -p 10054:9001 -v /opt/etherpad/data:/opt/etherpad-lite/var etherpad/etherpad:la…