代码说明:
- 连接 Elasticsearch:使用
basic_auth
参数进行认证。 - 测试连接:获取集群的健康状态,并格式化输出结果。
- 索引文档:将一个文档索引到指定的索引中,并格式化输出结果。
- 搜索文档:在指定的索引中搜索文档,并格式化输出结果。
http_auth
参数已经被弃用,应该使用 basic_auth
参数代替
from elasticsearch import Elasticsearch
# 配置 Elasticsearch 连接
# 替换为你的 Elasticsearch 地址、端口、用户名和密码
es = Elasticsearch(
['http://xxxx:8816'],
basic_auth=('admin', 'xxx')
)
# 测试连接
try:
# 尝试获取集群的健康状态
health = es.cluster.health()
print("集群健康状态:")
print(f" 集群名称: {health.get('cluster_name')}")
print(f" 状态: {health.get('status')}")
print(f" 节点数量: {health.get('number_of_nodes')}")
print(f" 数据节点数量: {health.get('number_of_data_nodes')}")
print(f" 活跃主分片: {health.get('active_primary_shards')}")
print(f" 活跃分片: {health.get('active_shards')}")
print(f" 初始化中分片: {health.get('initializing_shards')}")
print(f" 未分配分片: {health.get('unassigned_shards')}")
print(f" 挂起的未分配分片: {health.get('delayed_unassigned_shards')}")
print(f" 待处理任务数: {health.get('number_of_pending_tasks')}")
print(f" 活跃分片百分比: {health.get('active_shards_percent_as_number')}%")
except Exception as e:
print(f"连接或查询失败: {e}")
# 索引一个文档
try:
response = es.index(index="your_index", id=1, document={"title": "Test Document"})
print("\n索引文档结果:")
print(f" 索引: {response.get('_index')}")
print(f" 文档ID: {response.get('_id')}")
print(f" 结果: {response.get('result')}")
print(f" 版本: {response.get('_version')}")
except Exception as e:
print(f"索引文档失败: {e}")
# 搜索文档
try:
response = es.search(index="your_index", body={"query": {"match": {"title": "Test"}}})
print("\n搜索文档结果:")
print(f" 耗时: {response.get('took')} 毫秒")
print(f" 超时: {response.get('timed_out')}")
print(f" 总文档数: {response.get('hits').get('total')['value']}")
hits = response.get('hits').get('hits', [])
if hits:
for hit in hits:
print(f" 标题: {hit.get('_source').get('title')}")
else:
print(" 没有找到匹配的文档")
except Exception as e:
print(f"搜索失败: {e}")