一. 前言
在Python中使用Elasticsearch(ES)通常涉及安装Elasticsearch的Python客户端库,然后通过该库与Elasticsearch集群进行交互。
二. 基本使用
1. 安装Elasticsearch Python客户端库
首先,你需要安装elasticsearch库。你可以使用pip来安装它:
pip install elasticsearch
2. 连接到Elasticsearch集群
在Python中,你可以通过创建一个Elasticsearch对象来连接到Elasticsearch集群。
from elasticsearch import Elasticsearch
# Build a client that talks to the local single-node cluster.
es = Elasticsearch(['http://localhost:9200'])

# ping() returns a boolean, so report the failure case first.
if not es.ping():
    print("Could not connect to Elasticsearch")
else:
    print("Successfully connected to Elasticsearch!")
3. 执行索引操作
创建索引
在Elasticsearch中,索引类似于关系型数据库中的表。可以使用客户端实例的 `indices.create()` 方法来创建一个新的索引。
# Request body for index creation (a minimal example; real bodies are
# usually more elaborate): one shard, one replica, and two typed fields.
index_body = dict(
    settings=dict(number_of_shards=1, number_of_replicas=1),
    mappings=dict(
        properties=dict(
            field1=dict(type="text"),
            field2=dict(type="integer"),
        ),
    ),
)

# Create the index from the body above.
es.indices.create(body=index_body, index='my_index')
添加文档
可以使用客户端实例的 `index()` 方法来向索引中添加文档。
# Document to store; its keys match the fields declared in the mapping.
doc_body = dict(field1="value1", field2=123)

# Index the document under an explicit id in 'my_index'.
es.index(body=doc_body, id=1, index='my_index')
4. 执行搜索操作
使用客户端实例的 `search()` 方法来执行搜索查询。
# Query DSL: full-text match of "value1" against field1.
query_body = {"query": {"match": {"field1": "value1"}}}

# Run the search against 'my_index'.
response = es.search(body=query_body, index='my_index')

# Print the stored source of every returned hit.
for hit in response['hits']['hits']:
    source = hit['_source']
    print(source)
三. 整合封装成一个类来使用
import json
import uuid
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
from base.common.time_format import get_isoformat_time
from configure.configure import config
from configure.server_config import logger
import time
import traceback
# Elasticsearch section of the project configuration. This module reads its
# 'max_result_window', 'username', 'password', 'host', 'port', 'enabled' and
# 'session_history_index' entries.
es_conf = config['elasticsearch']
class ElasticSearchService(Elasticsearch):
    """Elasticsearch client bound to one index, with CRUD and search helpers.

    The instance authenticates with the username/password found in
    ``es_conf`` and creates ``index_name`` (with ``mappings`` and zero
    replicas) when the index does not exist yet.

    Most helpers are best-effort: they log any exception and return ``None``
    (or an empty list for ``search_by_sql``) instead of raising.
    """

    es_service = None

    # Field mapping applied to a newly created index.
    # NOTE(review): the "ik_max_word" / "ik_smart" analyzers presumably
    # require the IK analysis plugin on the cluster - confirm.
    mappings = {
        "properties": {
            # "id": {"type": "keyword"},
            "content": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "time": {"type": "date", "format": "yyyy-MM-dd'T'HH:mm:ss"},
            "qst_id": {"type": "keyword"},
            "reply_type": {"type": "integer"}
        }
    }

    def __init__(self, index_name, addrs, *args, **kwargs):
        """Connect to the cluster and make sure ``index_name`` exists.

        :param index_name: index every helper of this instance operates on
        :param addrs: list of node URLs passed to the Elasticsearch client
        :param args: accepted for signature compatibility; currently ignored
        :param kwargs: accepted for signature compatibility; currently ignored
        :raises ConnectionError: when the cluster does not answer ``ping()``
        """
        self.max_result_window = es_conf['max_result_window']
        self.username = es_conf['username']
        self.password = es_conf['password']
        self.index_name = index_name
        self.addrs = addrs
        super().__init__(self.addrs, basic_auth=(self.username, self.password), request_timeout=3600)
        # Verify connectivity before touching any index, and never report
        # success after a failed ping.
        if not self.ping():
            logger.error("ElasticSearchHandler Connection failed")
            raise ConnectionError(f"Cannot reach Elasticsearch at {addrs}")
        # Create the index on first use.
        if not self.indices.exists(index=self.index_name):
            self.create_index(self.index_name)
        logger.info(
            f"Connect to ElasticService successfully!!! addrs:{addrs}, index_name:{self.index_name}")

    def create_index(self, index_name):
        """Create ``index_name`` with the class mapping and settings.

        :param index_name: name of the index to create
        :return: ``True`` when the index exists afterwards, ``False`` when the
            cluster did not acknowledge the creation
        """
        if self.indices.exists(index=index_name):
            return True
        response = self.indices.create(index=index_name, body={})
        # Only report success once the cluster acknowledged the request.
        if not response.get('acknowledged'):
            logger.error(f"Failed to create index '{index_name}'. Response: {response}")
            return False
        logger.info(f"Index [{index_name}] created successfully!")
        # Apply mapping and settings to the index that was just created,
        # which may differ from self.index_name.
        self.create_mapping_session_history(index_name)
        self.create_index_setting(index_name)
        if response.get('shards_acknowledged'):
            logger.info(f"Index '{index_name}' All shards are acknowledged.")
        else:
            logger.info(f"Index '{index_name}' Not all shards are acknowledged.")
        return True

    def create_mapping_session_history(self, index_name=None):
        """Put the class-level ``mappings`` on an index.

        :param index_name: target index; defaults to ``self.index_name``
        """
        target = index_name or self.index_name
        response = self.indices.put_mapping(index=target, body=ElasticSearchService.mappings)
        if response.get('acknowledged'):
            logger.info(f"Index '{target}' created successfully with mapping.")
        else:
            logger.error(f"Failed to create index '{target}'. Response: {response}")

    def create_index_setting(self, index_name=None):
        """Set ``number_of_replicas`` to 0 on an index.

        :param index_name: target index; defaults to ``self.index_name``
        """
        target = index_name or self.index_name
        setting = {"number_of_replicas": "0"}
        response = self.indices.put_settings(index=target, body=setting)
        if response.get('acknowledged'):
            logger.info(f"Index '{target}' created successfully with setting.")
        else:
            logger.error(f"Failed to create index setting'{target}'. Response: {response}")

    def delete_index(self, index_name):
        """Delete ``index_name`` and return the raw client response."""
        res = self.indices.delete(index=index_name)
        logger.info(f"Index [{index_name}] deleted successfully!, res: {res}")
        return res

    def insert_doc(self, hist_hash_id: str, doc_body: dict):
        """Index a single document and refresh the index.

        :param hist_hash_id: document id
        :param doc_body: document source
        :return: the refresh response, or ``None`` when indexing failed
        """
        try:
            self.index(index=self.index_name, id=hist_hash_id, body=doc_body)
            # Refresh so the document is immediately searchable.
            res = self.indices.refresh(index=self.index_name)
            logger.info(f"Document hist_hash_id:[{hist_hash_id}] indexed successfully!")
            return res
        except Exception as e:
            logger.error(f"Failed to index document hist_hash_id:[{hist_hash_id}]: {e}")
            return None

    def bulk_insert_docs(self, session_histories: list):
        """Index many documents with the bulk API, 1000 documents per request.

        :param session_histories: list of dicts; ``hist_hash_id`` (optional)
            is used as the document id
        :return: list of bulk response items that failed (empty when all
            succeeded), or ``None`` when a request raised
        """
        failed_list = []
        if not session_histories:
            # An empty bulk body is rejected by the server; nothing to do.
            return failed_list
        batch_count = 1000
        try:
            for start in range(0, len(session_histories), batch_count):
                # Start every batch from an empty list so earlier batches are
                # not sent again.
                bulk_actions = []
                for item in session_histories[start:start + batch_count]:
                    doc = {
                        # "id": item.get('id', 0),
                        "you_feild": item.get('you_feild', ''),
                        # TODO: add the remaining document fields here.
                    }
                    meta = {"_index": self.index_name}
                    # Send an id only when one is provided; otherwise let
                    # Elasticsearch generate it.
                    doc_id = item.get('hist_hash_id', '')
                    if doc_id:
                        meta["_id"] = doc_id
                    # The bulk body alternates action line and document line.
                    bulk_actions.append({"index": meta})
                    bulk_actions.append(doc)
                    print(f"insert data -> {item}")
                response = self.bulk(index=self.index_name, body=bulk_actions)
                # 201 = created, 200 = existing document overwritten; both
                # are successful outcomes of an "index" action.
                for result in response['items']:
                    outcome = result.get('index', {})
                    if 'error' in outcome or outcome.get('status') not in (200, 201):
                        failed_list.append(result)
            logger.info(f"Elasticsearch 批量新增完成,failed_list:{failed_list}")
            # Refresh so the documents are immediately searchable.
            self.indices.refresh(index=self.index_name)
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch bulk insert doc error:{e}")
            return None

    def delete_doc_by_id(self, doc_ids):
        """Delete documents one by one.

        :param doc_ids: iterable of document ids
        :return: list of ids whose delete did not return status 200, or
            ``None`` when a request raised
        """
        try:
            failed_list = []
            for doc_id in doc_ids:
                response = self.delete(index=self.index_name, id=doc_id)
                if response.meta.status != 200:
                    failed_list.append(doc_id)
                    logger.info(f"Document with ID {doc_id} in index {self.index_name} was deleted failed!")
                else:
                    logger.info(f"Document with ID {doc_id} in index {self.index_name} was deleted successfully.")
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch delete_doc error:{e}")
            return None

    def delete_docs_by_query_body(self, query_body):
        """Delete every document matching ``query_body`` (delete-by-query).

        Passing ``{"query": {"match_all": {}}}`` removes all documents.

        :param query_body: Query DSL body selecting the documents
        :return: number of deleted documents as reported by the cluster, or
            ``None`` when the request raised
        """
        try:
            response = self.delete_by_query(index=self.index_name, body=query_body)
            deleted = response.get('deleted')
            logger.info(f"Deleted documents: {deleted}")
            return deleted
        except Exception as e:
            logger.error(f"Deleted docs error: {e}")
            return None

    def update_doc(self, datas: list):
        """Overwrite documents by re-indexing them under their ids.

        :param datas: list of ``{'doc_id': ..., 'doc': {...}}`` dicts
        :return: list of entries whose response status was not 200, or
            ``None`` when a request raised
        """
        try:
            failed_list = []
            for data in datas:
                # An "update" here is a full re-index of the document.
                response = self.index(index=self.index_name, id=data['doc_id'], body=data['doc'])
                logger.info(f"Update Response: {response}")
                # 201 would mean the document did not exist before, so only
                # 200 counts as a successful update.
                if response.meta.status != 200:
                    failed_list.append(data)
            # Refresh so the changes are immediately visible.
            self.indices.refresh(index=self.index_name)
            logger.info(f"Elasticsearch update_doc finished! failed_list -> {failed_list}")
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch update_doc error: {e}")
            return None

    def get_doc(self, doc_id):
        """Return the ``_source`` of document ``doc_id``, or ``None`` on error."""
        try:
            return self.get(index=self.index_name, id=doc_id)['_source']
        except Exception as e:
            logger.error(f"Error retrieving document {doc_id}: {e}")
            return None

    def _execute_search(self, query_body, caller, **search_kwargs):
        """Run a search and return the response only when it has hits.

        :param query_body: Query DSL body
        :param caller: public method name, used in the error log
        :param search_kwargs: extra keyword arguments for ``search()``
        :return: the response when all shards succeeded and the total hit
            count is positive, otherwise ``None``
        """
        try:
            logger.info(f'elasticsearch search index_name={self.index_name},query_body={query_body}')
            response = self.search(index=self.index_name, body=query_body, **search_kwargs)
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            shards = response.get('_shards', {})
            if shards.get('successful') != shards.get('total'):
                logger.warning(f"Search on [{self.index_name}] did not succeed on all shards: {shards}")
                return None
            total = response['hits']['total']['value']
            logger.info(
                f"Search index successful! total count={total}, match count={len(response['hits']['hits'])}")
            return response if total > 0 else None
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService {caller} error:{e}")
            return None

    def search_index(self, query_body):
        """Search the index.

        :param query_body: Query DSL body
        :return: the search response, or ``None`` when there are no hits, a
            shard failed, or the request raised
        """
        return self._execute_search(query_body, 'search_index')

    def search_index_by_scroll_api(self, query_body):
        """Start a scroll search with a one-minute scroll window.

        :param query_body: Query DSL body
        :return: the first page of the search response, or ``None`` when
            there are no hits, a shard failed, or the request raised
        """
        return self._execute_search(query_body, 'search_index_by_scroll_api', scroll='1m')

    def search_by_sql(self, sql_body):
        """Run an SQL query and return the rows as dictionaries.

        :param sql_body: request body for the SQL query API
        :return: list of ``{column_name: value}`` dicts; empty on error
        """
        try:
            logger.info(f'elasticsearch search index_name={self.index_name},sql_body={sql_body}')
            response = self.sql.query(body=sql_body)
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            if response.meta.status != 200:
                return []
            column_names = [col['name'] for col in response.get('columns') or []]
            # Pair every row with the column names to build one dict per row.
            result_dicts = [dict(zip(column_names, row)) for row in response.get('rows') or []]
            logger.info(f"Search index successful! match count={len(result_dicts)}")
            return result_dicts
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService search_by_sql error:{e}")
            return []
def get_elastic_instance(index_name, addrs, try_count=5, interval_seconds=10):
    """Create an ElasticSearchService, retrying when construction fails.

    :param index_name: index the service is bound to
    :param addrs: list of Elasticsearch node URLs
    :param try_count: maximum number of connection attempts (default 5)
    :param interval_seconds: pause between two attempts (default 10 seconds)
    :return: the connected service, or ``None`` when every attempt failed
    """
    for attempt in range(1, try_count + 1):
        try:
            service = ElasticSearchService(index_name, addrs)
        except Exception as e:
            traceback.print_exc()
            if attempt == try_count:
                # No further attempt follows, so do not sleep again.
                logger.error(f"ElasticService initialization failed after {try_count} attempts: {e}")
                break
            logger.warning(
                f"初始化ElasticService失败,结果:None, 异常原因:{str(e)}, 应用将在{interval_seconds}秒后重新尝试.")
            time.sleep(interval_seconds)
        else:
            logger.info("ElasticService initial successfully!")
            print("ElasticService initial successfully!")
            return service
    return None
# Module-level service handle; stays None unless Elasticsearch is enabled in
# the configuration and a service instance could be created.
es_service = None
port = es_conf['port']
host = es_conf['host']
addrs = [f"http://{host}:{port}"]
if not config['elasticsearch']['enabled']:
    logger.info(f"[elasticsearch] 未启用! enabled -> {config['elasticsearch']['enabled']}")
else:
    index_name = config['elasticsearch']['session_history_index']
    es_service = get_elastic_instance(index_name, addrs)
if __name__ == '__main__':
    # Manual smoke test: connect, bulk-insert a few documents, and keep the
    # other operations below as commented usage examples.
    index_name = config['elasticsearch']['session_history_index']
    es_service = get_elastic_instance(index_name, addrs)
    if es_service is None:
        # get_elastic_instance gives up after its retries; stop here instead
        # of failing later with an AttributeError on None.
        raise SystemExit("Could not connect to Elasticsearch; aborting demo.")

    # Documents to add.
    docs = [{
        # "id": i + 1,
        "you_feild": "",
        # TODO: add the remaining document fields here.
    } for _ in range(5)]

    # Insert data.
    # es_service.insert_doc('2', doc)
    print(es_service.bulk_insert_docs(docs))

    # Delete the index.
    # print(es_service.delete_index(index_name))

    # Fetch a document.
    # print(es_service.get_doc('c2b27b31-80f8-4cf6-b3f2-36683b60d7da'))
    # logger.info(es_service.get_doc('2'))

    # Delete documents.
    # logger.info(es_service.delete_doc_by_id(['f554d0e5-e4cc-4556-952b-b12cdc640fe56']))
    # query_body = {"query": {"match_all": {}}}
    # logger.info(es_service.delete_docs_by_query_body(query_body))

    # Update documents.
    # datas = [{'doc_id': 'c2b27b31-80f8-4cf6-b3f2-36683b60d7da', 'doc': {'qst_content': 'qqq'}}]
    # print(es_service.update_doc(datas))

    # Search: keyword match over several fields, newest first, first 10 hits.
    keyword = "缴清"
    query_body = {
        "query": {
            "multi_match": {
                "query": keyword,
                "fields": ["reply_content", "qst_content", "standard_qst"]
            }
        },
        "from": 0,
        "size": 10,
        "sort": [{
            "chat_qst_time": "desc"
        }]
    }
    # print(es_service.search_index(query_body))
四. 总结
以上是使用Python与Elasticsearch进行交互的基本步骤。可以根据实际需求扩展这些操作,例如处理更复杂的查询、使用聚合、批量操作等。Elasticsearch的Python客户端库提供了丰富的API,可以满足大多数与Elasticsearch交互的需求。
希望对你有所帮助!