目录
- ES Python客户端介绍
- 封装代码
- 测试代码
- 参考
ES Python客户端介绍
官方提供了两个客户端elasticsearch、elasticsearch-dsl
pip install elasticsearch
pip install elasticsearch-dsl
第二个是对第一个的封装,类似ORM操作数据库,可以.filter、.groupby,个人感觉很鸡肋,star数也不多。平时使用的时候一般会在kibana上测试,然后直接把query拷贝过来获取更多数据,所以这里做下第一个的封装。
封装代码
- 封装后依然暴露了底层的es对象(self.es),方便在特殊情况下直接使用
- index一般很少改动,就直接放到对象中了,可以使用set_index修改
- 常用的应该是get_doc和get_doc_scroll来获取少量和全量数据
代码在elasticsearch 7.17.12版本下测试通过;更高版本(尤其是8.x)的客户端接口有改动,直接使用可能会出现异常,建议按下面的命令固定版本安装
pip install elasticsearch==7.17.12
es.py
import random
import string
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from typing import List,Dict
class ESClient:
    """Thin convenience wrapper around the official ``Elasticsearch`` client.

    The wrapped client stays reachable as ``self.es`` for calls this class
    does not cover. Document-level methods operate on ``self.index``, which
    can be changed with :meth:`set_index`.
    """

    def __init__(self, host="127.0.0.1", index="", http_auth=None):
        """Build the underlying client.

        Args:
            host: Value forwarded to ``Elasticsearch(hosts=...)``.
            index: Default index used by the document-level methods.
            http_auth: Optional credentials forwarded as ``http_auth``; the
                argument is omitted entirely when it is ``None``.
        """
        self.index = index
        if http_auth is None:
            self.es = Elasticsearch(hosts=host)
        else:
            self.es = Elasticsearch(hosts=host, http_auth=http_auth)
        # str() keeps the message working when ``host`` is not a plain string
        # (for example a list of nodes); for a string the output is unchanged.
        print("success to connect " + str(host))

    @staticmethod
    def _random_id():
        """Return a fresh 20-character random document id."""
        pool = string.ascii_letters + string.ascii_uppercase + string.digits
        return ''.join(random.sample(pool, 20))

    @staticmethod
    def _sources(response):
        """Return the ``_source`` of every hit in a search/scroll response.

        Iterating the hit list directly (instead of first checking
        ``hits.total``) gives the same result when the total is present and
        also works when the response carries no total at all.
        """
        hits = (response.get("hits") or {}).get("hits") or []
        return [hit.get('_source') for hit in hits]

    def close(self):
        """Close the underlying client."""
        self.es.close()

    # Set the default index
    def set_index(self, index: str):
        """Change the index used by the document-level methods."""
        self.index = index

    # Create an index
    def create_index(self, index_name: str, mappings=None):
        """Create ``index_name`` with optional ``mappings``; return the response."""
        return self.es.indices.create(index=index_name, mappings=mappings)

    # Delete an index
    def delete_index(self, index_name: str):
        """Delete ``index_name`` and return the response."""
        return self.es.indices.delete(index=index_name)

    # Get an index
    def get_index(self, index_name: str):
        """Return the index information for ``index_name``."""
        return self.es.indices.get(index=index_name)

    # Create a document (single)
    def create_doc(self, body, _id=None):
        """Create one document in ``self.index``.

        Args:
            body: The document to store.
            _id: Document id. When omitted, a new random id is generated for
                this call. (A default computed in the signature would be
                evaluated only once and reused by every call, making the
                second id-less call collide with the first.)

        Returns:
            The response of ``Elasticsearch.create``.
        """
        if _id is None:
            _id = self._random_id()
        return self.es.create(index=self.index, body=body, id=_id)

    # Create documents (bulk)
    def create_doc_bulk(self, docs: List[Dict]):
        """Create many documents in ``self.index`` with the bulk helper.

        Each document gets a freshly generated random id. The document's own
        keys are merged in after the metadata keys, so a document that carries
        e.g. its own ``_id`` overrides the generated one.

        Returns:
            Whatever ``elasticsearch.helpers.bulk`` returns.
        """
        actions = [
            {
                "_index": self.index,
                "_op_type": "create",
                "_id": self._random_id(),
                **doc,
            }
            for doc in docs
        ]
        return bulk(client=self.es, actions=actions)

    # Delete a document
    def delete_doc(self, doc_id):
        """Delete the document ``doc_id`` from ``self.index``."""
        return self.es.delete(index=self.index, id=doc_id)

    # Update a document
    def update_doc(self, doc_id, doc: Dict):
        """Partially update document ``doc_id`` with the fields in ``doc``."""
        return self.es.update(index=self.index, id=doc_id, body={"doc": doc})

    # Start a scroll to page through result sets larger than one page (10000 per page)
    def get_doc_scroll(self, query: Dict):
        """Run ``query`` as a scroll search and return the first page.

        The scroll is opened with a page size of 10000 and a keep-alive of
        ``5m``.

        Returns:
            ``(scroll_id, sources)`` where ``sources`` is the list of
            ``_source`` values of the first page. Feed ``scroll_id`` to
            :meth:`get_doc_by_scroll_id` for the following pages and to
            :meth:`clear_scroll` when done.
        """
        res = self.es.search(
            index=self.index,
            size=10000,
            body=query,
            search_type="query_then_fetch",
            scroll="5m",
        )
        return res.get('_scroll_id'), self._sources(res)

    # Fetch the following page of a scroll by scroll_id
    def get_doc_by_scroll_id(self, scroll_id):
        """Fetch the next page of an open scroll.

        Returns:
            ``(scroll_id, sources)`` in the same shape as
            :meth:`get_doc_scroll`: ``sources`` holds the ``_source`` of each
            hit, so pages from both methods can be concatenated. ``sources``
            is empty once the scroll is exhausted.
        """
        page = self.es.scroll(scroll_id=scroll_id, scroll="5m")
        return page.get('_scroll_id'), self._sources(page)

    # Clear a scroll_id so the server does not run out of scroll contexts
    def clear_scroll(self, scroll_id):
        """Release the scroll context identified by ``scroll_id``."""
        # Passed by keyword on purpose: handing the id over positionally does
        # not reliably bind it to the ``scroll_id`` parameter of the client.
        self.es.clear_scroll(scroll_id=scroll_id)

    # Get the "hits" section of the index (typically for document ids / total count)
    def get_doc_all(self):
        """Search ``self.index`` without a query and return ``res['hits']``."""
        res = self.es.search(index=self.index)
        return res['hits']

    # Get a single document
    def get_doc_by_id(self, id_):
        """Return the ``_source`` of document ``id_``."""
        res = self.es.get(index=self.index, id=id_)
        return res["_source"]

    # Get the _source of all matching documents (fewer than 100000)
    def get_doc(self, query: Dict, size: int = 100000):
        """Run ``query`` and return the ``_source`` of up to ``size`` hits.

        The caller's ``query`` dict is left untouched; ``size`` is set on a
        shallow copy.

        NOTE(review): Elasticsearch limits ``from + size`` per index via
        ``index.max_result_window``; the default ``size`` here presumably
        requires that setting to have been raised - confirm for the target
        cluster, or use :meth:`get_doc_scroll` for large result sets.
        """
        body = dict(query)
        body['size'] = size
        res = self.es.search(index=self.index, body=body)
        return self._sources(res)

    # Aggregation query (the aggregation must be named "group_by"; returns its buckets)
    def get_doc_agg(self, query):
        """Run ``query`` and return the buckets of its ``group_by`` aggregation."""
        res = self.es.search(index=self.index, body=query)
        return res['aggregations']['group_by'].get('buckets')

    # Stats query (the aggregation must be named "stats_by"; returns min/max/avg etc.)
    def get_doc_stats(self, query):
        """Run ``query`` and return the result of its ``stats_by`` aggregation."""
        res = self.es.search(index=self.index, body=query)
        return res['aggregations']["stats_by"]
测试代码
import unittest
from es import ESClient
# Connection settings of the Elasticsearch node the functions below talk to.
ES_HOST = "http://10.28.144.3:9200"
ES_AUTH = ["elastic", "changeme"]

# Shared client used by every test function in this script.
cli = ESClient(host=ES_HOST, http_auth=ES_AUTH)
def test_create_index():
    """Create the ``test`` index and print the response."""
    print(cli.create_index(index_name="test"))
def test_delete_index():
    """Delete the ``test`` index and print the response."""
    print(cli.delete_index(index_name="test"))
def test_get_index():
    """Fetch the ``test`` index and print what comes back."""
    print(cli.get_index(index_name="test"))
def test_set_index():
    """Point the shared client at the ``test`` index."""
    target = "test"
    cli.set_index(index=target)
def test_create_doc():
    """Create one sample document and print the response."""
    sample = dict(name="lady_killer9", age=19)
    print(cli.create_doc(body=sample))
def test_create_doc_bulk():
    """Bulk-create 100001 sample documents with ``age`` running from 0 to 100000."""
    users = [{"name": "lady_killer9", "age": age} for age in range(100001)]
    print(cli.create_doc_bulk(docs=users))
def test_get_doc_all():
    """Print the ``hits`` section returned for the current index."""
    print(cli.get_doc_all())
def test_get_doc_by_id():
    """Fetch one document by its id and print it."""
    doc_id = "jHALXDQaENQZPM4C9EUt"
    print(cli.get_doc_by_id(doc_id))
def test_get_doc():
    """Run a match-all query limited to 20 documents and print the result."""
    match_all = {"query": {"match_all": {}}}
    print(cli.get_doc(query=match_all, size=20))
def test_update_doc():
    """Rename one document and print the response."""
    changes = dict(name="lady_killer_after_update")
    print(cli.update_doc(doc_id="jHALXDQaENQZPM4C9EUt", doc=changes))
def test_delete_doc():
    """Delete one document by its id and print the response."""
    doc_id = "jHALXDQaENQZPM4C9EUt"
    print(cli.delete_doc(doc_id=doc_id))
def test_get_doc_agg():
    """Group documents by ``age`` with a terms aggregation and print the buckets."""
    terms_on_age = {"terms": {"field": "age"}}
    # The aggregation has to be called "group_by": get_doc_agg reads that key.
    query = {"aggs": {"group_by": terms_on_age}}
    print(cli.get_doc_agg(query=query))
def test_get_doc_stats():
    """Compute stats over ``age`` and print them."""
    stats_on_age = {"stats": {"field": "age"}}
    # The aggregation has to be called "stats_by": get_doc_stats reads that key.
    query = {"aggs": {"stats_by": stats_on_age}}
    print(cli.get_doc_stats(query=query))
def test_get_doc_scroll():
    """Page through every document with the scroll API and print how many came back."""
    query = {
        "query": {
            "match_all": {}
        }
    }
    scroll_id, data_list = cli.get_doc_scroll(query=query)
    res = []
    try:
        # An empty page means the scroll is exhausted.
        while data_list:
            res.extend(data_list)
            scroll_id, data_list = cli.get_doc_by_scroll_id(scroll_id=scroll_id)
    finally:
        # Release the server-side scroll context explicitly, even when paging
        # fails, instead of leaving it open until it expires on its own.
        if scroll_id:
            cli.clear_scroll(scroll_id)
    print(len(res))
if __name__ == '__main__':
    # Uncomment the calls you want to run. The client is closed in ``finally``
    # so the connection is released even when one of the calls raises.
    try:
        # test_delete_index()
        test_create_index()
        test_get_index()
        # test_set_index()
        # test_create_doc()
        # test_create_doc_bulk()
        # test_get_doc_all()
        # test_update_doc()
        # test_get_doc_by_id()
        # test_get_doc()
        # test_delete_doc()
        # test_get_doc_agg()
        # test_get_doc_stats()
        # test_get_doc_scroll()
    finally:
        cli.close()
测试截图
更多python相关内容:【python总结】python学习框架梳理
本人b站账号:一路狂飚的蜗牛
有问题请下方评论,转载请注明出处,并附有原文链接,谢谢!如有侵权,请及时联系。如果您感觉有所收获,自愿打赏,可选择支付宝18833895206(小于),您的支持是我不断更新的动力。
参考
github-elasticsearch
github-elasticsearch-dsl