python -【es】基本使用

news2025/1/4 1:29:07

一. 前言

在Python中使用Elasticsearch(ES)通常涉及安装Elasticsearch的Python客户端库,然后通过该库与Elasticsearch集群进行交互。

二. 基本使用

1. 安装Elasticsearch Python客户端库

首先,你需要安装elasticsearch库。你可以使用pip来安装它:

pip install elasticsearch

2. 连接到Elasticsearch集群

在Python中,你可以通过创建一个Elasticsearch对象来连接到Elasticsearch集群。

from elasticsearch import Elasticsearch

# 创建Elasticsearch客户端实例
es = Elasticsearch(['http://localhost:9200'])

# 检查连接是否成功
if es.ping():
    print("Successfully connected to Elasticsearch!")
else:
    print("Could not connect to Elasticsearch")

3. 执行索引操作

创建索引
在Elasticsearch中,索引类似于关系型数据库中的表。可以使用客户端实例的indices.create()方法来创建一个新的索引。

# 创建索引的请求体(这里是一个简单的例子,实际使用中可能更复杂)
index_body = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1
    },
    "mappings": {
        "properties": {
            "field1": {"type": "text"},
            "field2": {"type": "integer"}
        }
    }
}

# 创建索引
es.indices.create(index='my_index', body=index_body)

添加文档
可以使用index()方法来向索引中添加文档。

# 添加文档的请求体
doc_body = {
    "field1": "value1",
    "field2": 123
}

# 添加文档(指定索引名和文档ID)
es.index(index='my_index', id=1, body=doc_body)

4. 执行搜索操作

使用search()方法来执行搜索查询。

# 查询DSL
query_body = {
    "query": {
        "match": {
            "field1": "value1"
        }
    }
}

# 执行搜索
response = es.search(index='my_index', body=query_body)

# 处理响应
for hit in response['hits']['hits']:
    print(hit['_source'])

三. 整合封装成一个类来使用

import json
import uuid
from datetime import datetime, timedelta

from elasticsearch import Elasticsearch

from base.common.time_format import get_isoformat_time
from configure.configure import config
from configure.server_config import logger
import time
import traceback

es_conf = config['elasticsearch']


class ElasticSearchService(Elasticsearch):
    es_service = None

    mappings = {
        "properties": {
            # "id": {"type": "keyword"},
            "content": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "time": {"type": "date", "format": "yyyy-MM-dd'T'HH:mm:ss"},
            "qst_id": {"type": "keyword"},
            "reply_type": {"type": "integer"}
        }
    }

    def __init__(self, index_name, addrs, *args, **kwargs):
        self.max_result_window = es_conf['max_result_window']
        self.username = es_conf['username']
        self.password = es_conf['password']
        self.index_name = index_name
        self.addrs = addrs
        super().__init__(self.addrs, basic_auth=(self.username, self.password), request_timeout=3600)
        
        # 1.查询index是否存在
        if not self.indices.exists(index=self.index_name):
            self.create_index(self.index_name)
        if not self.ping():
            logger.error(f"ElasticSearchHandler Connection failed")
        logger.info(
            f"Connect to ElasticService successfully!!! addrs:{addrs}, index_name:{self.index_name}")

    def create_index(self, index_name):
        # 创建索引
        if not self.indices.exists(index=index_name):
            response = self.indices.create(index=index_name, body={})
            logger.info(f"Index [{index_name}] created successfully!")
            # 检查索引是否创建成功
            if not response.get('acknowledged'):
                logger.info(f"Failed to create index '{index_name}'. Response: {response}")
                return False

            self.create_mapping_session_history()
            self.create_index_setting()

            if response.get('shards_acknowledged'):
                logger.info(f"Index '{index_name}' All shards are acknowledged.")
            else:
                logger.info(f"Index '{index_name}' Not all shards are acknowledged.")

    def create_mapping_session_history(self):
        mapping = ElasticSearchService.mappings
        # 将mapping添加到索引
        response = self.indices.put_mapping(index=self.index_name, body=mapping)

        # 检查索引是否创建成功
        if response.get('acknowledged'):
            logger.info(f"Index '{self.index_name}' created successfully with mapping.")
        else:
            logger.info(f"Failed to create index '{self.index_name}'. Response: {response}")

    def create_index_setting(self):
        setting = {"number_of_replicas": "0"}
        # 将setting添加到索引
        response = self.indices.put_settings(index=self.index_name, body=setting)

        # 检查索引是否创建成功
        if response.get('acknowledged'):
            logger.info(f"Index '{self.index_name}' created successfully with setting.")
        else:
            logger.info(f"Failed to create index setting'{self.index_name}'. Response: {response}")

    def delete_index(self, index_name):
        res = self.indices.delete(index=index_name)
        logger.info(f"Index [{index_name}] deleted successfully!, res: {res}")
        return res

    def insert_doc(self, hist_hash_id: str, doc_body: dict):
        """
        新增数据
        :param hist_hash_id:
        :param doc:
        :return:
        """
        try:
            self.index(index=self.index_name, id=hist_hash_id, body=doc_body)
            # 刷新索引以确保文档立即可见
            res = self.indices.refresh(index=self.index_name)
            logger.info(f"Document hist_hash_id:[{hist_hash_id}] indexed successfully!")
            return res
        except Exception as e:
            logger.error(f"Failed to index document hist_hash_id:[{hist_hash_id}]: {e}")

    def bulk_insert_docs(self, session_histories: list):
        """
        批量新增数据
        :param chitchat_list:
        :return:
        """
        try:
            # 准备批量数据
            bulk_actions = []
            failed_list = []
            batch_count = 1000
            for i in range(0, len(session_histories), batch_count):
                for item in session_histories[i:i + batch_count]:
                    doc = {
                        # "id": item.get('id', 0),
                        "you_feild": item.get('you_feild', ''),
                        ...
                        }
                    action = {
                        "index": {  # Use "index" as the action
                            "_index": self.index_name,
                            # 如果需要指定文档ID,可以取消下面的注释
                            "_id": item.get('hist_hash_id', '')
                        }
                    }
                    # 将 action 和 doc 作为元组的两个元素添加到 bulk_actions 列表中
                    bulk_actions.append(action)
                    bulk_actions.append(doc)
                    print(f"insert data -> {item}")
                response = self.bulk(index=self.index_name, body=bulk_actions)

                # 检查响应中的成功和失败项
                for item in response['items']:
                    if item['index']['status'] != 201:
                        failed_list.append(item)
                logger.info(f"Elasticsearch 批量新增完成,failed_list:{failed_list}")
                # 刷新索引以确保文档立即可见
                self.indices.refresh(index=self.index_name)
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch bulk insert doc error:{e}")

    def delete_doc_by_id(self, doc_ids):
        """ 删除文档 """
        try:
            failed_list = []
            for doc_id in doc_ids:
                response = self.delete(index=self.index_name, id=doc_id)
                # 检查响应状态
                if response.meta.status != 200:
                    failed_list.append(doc_id)
                    logger.info(f"Document with ID {doc_id} in index {self.index_name} was deleted failed!")
                logger.info(f"Document with ID {doc_id} in index {self.index_name} was deleted successfully.")
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch delete_doc error:{e}")

    def delete_docs_by_query_body(self, query_body):
        # 使用_delete_by_query API 删除所有文档
        # 注意:这里我们使用了一个匹配所有文档的查询:{"query": {"match_all": {}}}
        try:
            response = self.delete_by_query(index=self.index_name, body=query_body)
            logger.info("Deleted documents:", response['_deleted'])  # 这将显示被删除的文档数量
        except Exception as e:
            # 捕获并处理异常
            logger.error(f"Deleted docs error: {e}")

    def update_doc(self, datas: list):
        """ 更新文档 """
        try:
            failed_list = []
            for data in datas:
                # 更新文档(实际上是重新索引)
                response = self.index(index=self.index_name, id=data['doc_id'], body=data['doc'])
                logger.info("Update Response:", response)
                if response.meta.status != 200:
                    failed_list.append(data)
            # 刷新索引以立即应用更改(可选)
            self.indices.refresh(index=self.index_name)
            logger.info(f"Elasticsearch update_doc finished! failed_list -> {failed_list}")
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch update_doc error: {e}")

    def get_doc(self, doc_id):
        """ 获取文档数据 """
        try:
            doc = self.get(index=self.index_name, id=doc_id)['_source']
            return doc
        except Exception as e:
            logger.error(f"Error retrieving document {doc_id}: {e}")
            return None

    def search_index(self, query_body):
        """
        检索文档
        query_body:查询体(Query DSL)
        """
        try:
            logger.info(f'elasticsearch search index_name={self.index_name},query_body={query_body}')
            response = self.search(index=self.index_name, body=query_body)
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            if response.get('_shards', {}).get('successful') == response.get('_shards', {}).get('total'):
                logger.info(
                    f"Search index successful! total count={response['hits']['total']['value']}, match count={len(response['hits']['hits'])}")
                # logger.info(f"search response -> {response}")
                if response['hits']['total']['value'] > 0:
                    return response
                return None
            return None
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService search_index error:{e}")

    def search_index_by_scroll_api(self, query_body):
        """
        分页查询
        query_body:查询体(Query DSL)
        """
        try:
            logger.info(f'elasticsearch search index_name={self.index_name},query_body={query_body}')
            response = self.search(index=self.index_name, body=query_body, scroll='1m')
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            if response.get('_shards', {}).get('successful') == response.get('_shards', {}).get('total'):
                logger.info(
                    f"Search index successful! total count={response['hits']['total']['value']}, match count={len(response['hits']['hits'])}")
                # logger.info(f"search response -> {response}")
                if response['hits']['total']['value'] > 0:
                    return response
                return None
            return None
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService search_index error:{e}")

    def search_by_sql(self, sql_body):
        try:
            logger.info(f'elasticsearch search index_name={self.index_name},sql_body={sql_body}')
            response = self.sql.query(body=sql_body)
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            if response.meta.status == 200:
                columns = response.get('columns')
                rows = response.get('rows')
                # 提取列名
                column_names = [col['name'] for col in columns]
                # 组织成字典格式
                result_dicts = []
                for row in rows:
                    result_dict = {column_names[i]: row[i] for i in range(len(column_names))}
                    result_dicts.append(result_dict)
                logger.info(f"Search index successful! match count={len(result_dicts)}")
                return result_dicts
            return []
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService search_by_sql error:{e}")
            return []


def get_elastic_instance(index_name, addrs):
    _es_service = None
    _wait_times = 0
    _try_count = 5
    _interval_seconds = 10
    for i in range(_try_count):  # 初始化后,尝试启动5次,第次间隔10秒
        try:
            _es_service = ElasticSearchService(index_name, addrs)
            if _es_service:
                logger.info(f"ElasticService initial successfully!")
                print(f"ElasticService initial successfully!")
                return _es_service
            logger.warning(f"Connect to elasticServer failed, try reconnect to elasticServer [{i}]!")
        except Exception as e:
            traceback.print_exc()
            logger.warning(
                f"初始化ElasticService失败,结果:{_es_service}, 异常原因:{str(e)}, 应用将在{_interval_seconds}秒后重新尝试.")
            time.sleep(_interval_seconds)


es_service = None
port = es_conf['port']
host = es_conf['host']
addrs = [f"http://{host}:{port}", ]

if config['elasticsearch']['enabled']:
    index_name = config['elasticsearch']['session_history_index']
    es_service = get_elastic_instance(index_name, addrs)
else:
    logger.info(f"[elasticsearch] 未启用! enabled -> {config['elasticsearch']['enabled']}")

if __name__ == '__main__':
    index_name = config['elasticsearch']['session_history_index']
    es_service = get_elastic_instance(index_name, addrs)
    # 添加文档
    docs = [{
        # "id": i + 1,
        "you_feild": "",
        ...
    } for i in range(5)]
    # 插入数据
    # es_service.insert_doc('2', doc)
    print(es_service.bulk_insert_docs(docs))

    # 删除index
    # print(es_service.delete_index(index_name))

    # 获取文档
    # print(es_service.get_doc('c2b27b31-80f8-4cf6-b3f2-36683b60d7da'))
    # logger.info(es_service.get_doc('2'))

    # 删除文档
    # logger.info(es_service.delete_doc_by_id(['f554d0e5-e4cc-4556-952b-b12cdc640fe56']))
    # query_body = {"query": {"match_all": {}}}
    # logger.info(es_service.delete_docs_by_query_body(query_body))

    # 更新数据
    # datas = [{'doc_id': 'c2b27b31-80f8-4cf6-b3f2-36683b60d7da', 'doc': {'qst_content': 'qqq'}}]
    # print(es_service.update_doc(datas))

    # 查询数据
    keyword = "缴清"
    query_body = {
        "query": {
            "multi_match": {
                "query": keyword,
                "fields": ["reply_content", "qst_content", "standard_qst"]
            }
        },
        "from": 0,
        "size": 10,
        "sort": [{
            "chat_qst_time": "desc"
        }]
    }
    # print(es_service.search_index(query_body))

四. 总结

以上是使用Python与Elasticsearch进行交互的基本步骤。可以根据实际需求扩展这些操作,例如处理更复杂的查询、使用聚合、批量操作等。Elasticsearch的Python客户端库提供了丰富的API,可以满足大多数与Elasticsearch交互的需求。
希望对你有所帮助!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2268878.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一个最简单的ios程序(object_c)的编写

前言 如何在苹果系统MacOS创建一个简单的ios(iphone)程序,貌似非常的简单。但是,作为习惯了Windows开发的程序员来说,有时候还觉得有点麻烦,至少开始有点很不习惯。 本博文试着把这个过程展现一下&#xff…

Learning Multi-Scale Photo Exposure Correction

Abstract 用错误的曝光捕捉照片仍然是相机成像的主要错误来源。曝光问题可分为以下两类:(i)曝光过度,即相机曝光时间过长,导致图像区域明亮和褪色;(ii)曝光不足,即曝光时间过短,导致图像区域变暗。曝光不足和曝光过度都会大大降低…

宝塔-firefox(Docker应用)-构建自己的Web浏览器

安装基础软件 宝塔中安装firefox(Docker应用) 。宝塔中需要先安装docker及docker-composefirefox配置安装 点击firefox应用,选择【安装配置】点击右边绿色按钮,进行安装,这一步等待docker-compose根据你的配置初始化docker应用 等待安装 …

【深度学习】时间序列表示方法

自然界除了2D的图片数据之外,还有语音、文字,这些数据都有时间的先后顺序的。对于2D的图像的数据,可以用RGB值来表示像素的色彩度。语音可以用信号幅度值来表示,而Pytorch没有自带String支持,在表示文字之前需要进行Em…

使用 Navicat 官方免费版来实现从 DAT 文件填充 MySQL 8 表

在异构存储库之间迁移数据(即源数据库和目标数据库来自不同供应商的不同数据库管理系统)会遇到一些挑战。在某些情况下,可以同时连接两个数据库。但有时根本无法实现。面对这样的困境,数据库从业者别无选择,只能从转储…

【CSS in Depth 2 精译_093】16.2:CSS 变换在动效中的应用(上)—— 图标的放大和过渡效果的设置

当前内容所在位置(可进入专栏查看其他译好的章节内容) 第五部分 添加动效 ✔️【第 16 章 变换】 ✔️ 16.1 旋转、平移、缩放与倾斜 16.1.1 变换原点的更改16.1.2 多重变换的设置16.1.3 单个变换属性的设置 16.2 变换在动效中的应用 ✔️ 16.2.1 放大图…

Linux 信号集与信号掩码

目录 一、引言 二、信号集是什么 三、信号集关键函数 1.信号集的创建与初始化 2.信号的添加与删除 3.信号集的阻塞与解除阻塞 四、信号集实际应用场景 五、信号掩码的作用 六、信号掩码相关函数 1.sigprocmask 函数 2.sigemptyset 和 sigfillset 函数 七、信号掩码注…

CPT203 Software Engineering 软件工程 Pt.5 软件测试(中英双语)

文章目录 8. 软件测试8.1 Testing(测试)8.1.1 A note of testing under the V & A framework8.1.2 The Basics8.1.3 The Goals8.1.4 The Stages 8.2 Developing testing(开发测试)8.2.1 Unit testing(单元测试&…

微信小程序中遇到过的问题

记录微信小程序中遇到的问题(持续更新ing) 问题描述:1. WXML中无法直接调用JavaScript方法。2. css中无法直接引用背景图片。3. 关于右上角胶囊按钮。4. 数据绑定问题。5. 事件处理问题。6. 关于movable-view组件的问题7. 关于设置宽度后设置…

【C++】B2084 质因数分解

博客主页: [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 💯前言💯 题目描述:质因数分解输入格式输出格式输入输出样例: 💯 问题解析1. 质数的定义2. 题目特点3. 约束范围4. 问题分解 💯 解题…

Unity中列表List使用出类似字典Dictionary的感觉

首先为什么会有这个标题? 因为字典很好用,只需要键就能拿到值,这种感觉是真的爽,新手最喜欢用了,遇事不决就字典,但是也有不好的地方,字典的内存开销比列表List要大,遍历也是List占…

分布式项目___某污水处理项目

一.分布式项目___污水处理项目 项目地址:https://gitee.com/yanyigege/collaborative-water-springboot.git ​ 1.项目背景 总公司在全国各地有处理污水的项目部,各项目部处理自己的污水,总部需要监控各地分项目部每天处理污水的原料用量,掌握各分部的污水处理情况 ​ 2.功…

WebRTC:实现浏览器与移动应用的实时通信

1.技术简介 (Web Real-Time)是一种开放式实时通信技术,旨在使浏览器和移动应用程序通过简单的API即可实现实时音频、视频和数据传输,而无需安装插件或额外软件。它支持网络应用中的点对点通信,例如视频聊天、语音通话…

小程序基础 —— 07 创建小程序项目

创建小程序项目 打开微信开发者工具,左侧选择小程序,点击 号即可新建项目: 在弹出的新页面,填写项目信息(后端服务选择不使用云服务,开发模式为小程序,模板选择为不使用模板)&…

数据结构之线性表之链表(附加一个考研题)

链表的定义 链表的结构: 单链表-初始化 代码实现: 单链表-头插法 代码实现: 这里我给大家分析一下 我们每创建一个新的节点都要插在头节点的后面,我们一定要注意顺序 一定要先让新节点指向头节点指向的下一个节点,…

RocketMQ(二)RocketMQ实战

文章目录 一、RocketMQ实战1.1 批量消息发送1.2 消息发送队列自选择1.3 事务消息1.4 SpringCloud集成RocketMQ 二、最佳实践2.1 生产者2.1.1 发送消息注意事项2.1.2 消息发送失败处理方式 2.2 消费者2.2.1 消费过程幂等2.2.2 消费打印日志 2.3 Broker 三、相关问题3.1 为什么要…

Vue router router-link router-view keep-alive

Vue router router-link router-view keep-alive keep-alive 1. /:id 2 ?id

掌握 PostgreSQL 的 psql 命令行工具

title: 掌握 PostgreSQL 的 psql 命令行工具 date: 2024/12/30 updated: 2024/12/30 author: cmdragon excerpt: psql 是 PostgreSQL 关系数据库管理系统的交互式命令行工具,是数据库管理员和开发人员进行数据库管理和操作的主要接口。熟练使用 psql 工具,不仅能够提高对 …

【C++】深入理解 break 和 continue 语句

博客主页: [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 💯前言💯break 和 continue 介绍**break** 的作用**continue** 的作用注意事项 💯break 示例代码示例**执行结果****解析过程** 💯continue 示例代码示例&am…

【C++】B2064 斐波那契数列

博客主页: [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 💯前言💯题目描述输入格式输出格式输入输出样例输入输出 💯思路分析**题目本质** 💯代码实现与对比**我的代码实现**代码展示思路解析优点不足 **老师的代码…