python多方式操作elasticsearch介绍

news2025/1/16 6:58:46

python多方式操作elasticsearch介绍

1. requests模块操作ES

requests 是一个 Python HTTP 库,它简化了发送 HTTP 请求和处理响应的过程。通过 requests 模块,开发人员可以轻松地与 Web 服务进行通信,包括获取网页内容、执行 API 请求等。requests 提供了简洁而直观的 API,使得发送 GET、POST、PUT、DELETE 等类型的请求变得容易。它支持各种认证方式、持久连接、会话管理、文件上传等功能,同时提供了丰富的响应处理方法,包括 JSON 解析、内容解码、状态码检查等。由于其简单易用的特点,requests 成为了 Python 社区中最受欢迎的 HTTP 库之一,被广泛应用于网络爬虫、Web 开发和数据采集等场景。

  • 使用requests库操作ElasticSearch的基本方法
    1.创建/更新文档:使用PUT方法
    2.获取文档:使用GET方法
    3.删除文档:使用DELETE方法
    4.搜索文档:使用GET方法,并在URL中添加搜索参数
    pip install requests
import requests

es_url = 'http://wangting_host:9200'

#### 通过接口简单获取API信息
res = requests.get(f'{es_url}/_cat/nodes')
print(res.text)

"""
Output:
192.170.0.181 28 92 0 0.00 0.01 0.05 cdfhilmrstw - ops03
192.170.0.150 43 98 0 0.01 0.04 0.05 cdfhilmrstw - ops01
192.170.0.13  24 87 0 0.01 0.02 0.05 cdfhilmrstw * ops02
"""



#### 创建/更新文档方法 (存在更新,不存在创建)
def create_or_update_doc(index_name, doc_id, document):
    url = f'{es_url}/{index_name}/_doc/{doc_id}'
    response = requests.put(url, json=document)
    print(response.json())
    
# 调用方法实现功能(调用通常使用main函数,这里直接使用)
# 调用创建
index_name = 'test0329'
doc_id = '1'
document = {
    '标题': 'Python ElasticSearch',
    '正文': 'ElasticSearch is a great tool for full-text search.'
}

create_or_update_doc(index_name, doc_id, document)
"""
Output:
{'_index': 'test0329', '_id': '1', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}

命令行执行效果:
GET /_cat/indices?v

health status index              uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   test0329           HPGaPO2xQiGNOVpozZ5wSg   1   1          1            0     11.3kb          5.6kb
"""

  
    
#### 获取文档方法
def get_doc(index_name, doc_id):
    url = f'{es_url}/{index_name}/_doc/{doc_id}'
    response = requests.get(url)
    print(response.json())

    
get_doc(index_name, doc_id)
"""
Output:
{'_index': 'test0329', '_id': '1', '_version': 1, '_seq_no': 0, '_primary_term': 1, 'found': True, '_source': {'标题': 'Python ElasticSearch', '正文': 'ElasticSearch is a great tool for full-text search.'}}
"""
    
    
    
#### 搜索文档方法
def search_doc(index_name, query):
    url = f'{es_url}/{index_name}/_search'
    response = requests.get(url, params=query)
    print(response.json())


query = {'q': 'Python'}
search_doc(index_name, query)

"""
Output:
{'took': 9, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 1, 'relation': 'eq'}, 'max_score': 0.2876821, 'hits': [{'_index': 'test0329', '_id': '1', '_score': 0.2876821, '_source': {'标题': 'Python ElasticSearch', '正文': 'ElasticSearch is a great tool for full-text search.'}}]}}
"""    
    
    
    
#### 删除文档方法
def delete_doc(index_name, doc_id):
    url = f'{es_url}/{index_name}/_doc/{doc_id}'
    response = requests.delete(url)
    print(response.json())
   

# 调用删除
delete_doc(index_name, doc_id)

"""
再次使用get_doc获取方法验证
get_doc(index_name, doc_id)
Output:
{'_index': 'test0329', '_id': '1', 'found': False}
"""

2. elasticsearch模块操作ES

elasticsearch 是 Python 中用于与 Elasticsearch 进行交互的官方库,它提供了一种方便的方式来执行各种操作,包括索引、搜索、删除文档等。这个库封装了与 Elasticsearch REST API 的交互细节,使得开发人员能够更轻松地与 Elasticsearch 集群进行通信。

使用 elasticsearch 库,您可以轻松地连接到 Elasticsearch 集群,并执行各种操作。

  1. 连接到 Elasticsearch 集群:使用 Elasticsearch 类连接到 Elasticsearch 集群,可以指定多个节点,并支持 HTTP Basic 认证和其他连接参数。

  2. 索引文档:使用 index 方法可以将文档索引到 Elasticsearch 中,您可以指定索引名称、文档类型、文档 ID 和文档内容。

  3. 获取文档:使用 get 方法可以根据索引名称、文档类型和文档 ID 获取特定文档的内容。

  4. 搜索文档:使用 search 方法可以执行搜索操作,您可以指定查询条件、排序规则、分页参数等。

  5. 删除文档:使用 delete 方法可以根据索引名称、文档类型和文档 ID 删除特定文档。

  6. 批量操作elasticsearch 库支持批量索引、更新和删除文档,可以显著提高性能。

  7. 异常处理elasticsearch 库提供了对 Elasticsearch 返回的各种错误和异常的处理机制,使得开发人员能够更好地处理异常情况。

  8. 灵活性elasticsearch 库允许您以不同的方式指定查询条件、索引文档和执行其他操作,以满足各种需求。

pip install elasticsearch

2-1. elasticsearch模块基本用法

from elasticsearch import Elasticsearch


def create_index(client, index_name):
    if not client.indices.exists(index=index_name):
        result = client.indices.create(index=index_name)
        print(f'< def create_index >: Index[{index_name}] created successfully! {result}')
    else:
        print(f'< def create_index >: Index[{index_name}] already exists!')


def delete_index(client, index_name):
    if client.indices.exists(index=index_name):
        result = client.indices.delete(index=index_name)
        print(f'< def delete_index >: Index[{index_name}] {result}')


def insert_data(client, index_name, document_id, data):
    if client.indices.exists(index=index_name):
        result = client.index(index=index_name, id=document_id, body=data)
        print(f'< def insert_data >: Index[{index_name}] {result}')
    else:
        print(f'< def insert_data >: Index[{index_name}] is not exists')


def query_data(client, index_name, query):
    if client.indices.exists(index=index_name):
        result = client.search(index=index_name, body=query)
        clean_data = dict(result)['hits']['hits'][0]['_source']
        for key, value in clean_data.items():
            print(f"{key}:{value}")
    else:
        print("index is not exists")


def delete_data(client, index_name, document_id):
    result = client.delete(index=index_name, id=document_id)
    print(f'< def delete_data >: {result}')


def main():
    es_url = "http://wangting_host:9200"
    client = Elasticsearch(es_url)

    index_name = "user_login"
    document_id = "1"
    data = {
        "username": "wangting_666",
        "password": "12345678",
        "phone": "13813812345"
    }

    query = {'query': {'match_all': {}}}

    create_index(client, index_name)
    insert_data(client, index_name, document_id, data)
    query_data(client, index_name, query)
    delete_data(client, index_name, document_id)
    delete_index(client, index_name)


if __name__ == "__main__":
    main()
    
    
    
##### 控制台输出 #####
### create_index
< def create_index >: Index[user_login] already exists!

### insert_data
< def insert_data >: Index[user_login] {'_index': 'user_login', '_id': '1', '_version': 2, 'result': 'updated', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}

### query_data
username:wangting_666
password:12345678
phone:13813812345

### delete_data
< def delete_data >: {'_index': 'user_login', '_id': '1', '_version': 3, 'result': 'deleted', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 2, '_primary_term': 1}

### delete_index
< def delete_index >: Index[user_login] {'acknowledged': True}

2-2. elasticsearch模块业务使用方式

  • 生产环境使用demo

README.md :

项目描述:
本项目demo是一个使用 Python 编写的示例代码,演示了如何使用 ElasticSearch 的官方 Python 客户端库 elasticsearch 进行索引管理和文档操作。项目主要包括创建索引、定义映射、插入文档、更新文档、查询文档和删除文档等功能,并且利用 Python 内置的 logging 模块实现了日志记录功能,将运行时的信息输出到指定的日志文件中。

功能概述:

1. 初始化 ES 连接:根据配置文件中的信息初始化 ElasticSearch 连接。
2. 创建索引:如果指定的索引不存在,则创建新的索引;如果索引已存在,则忽略。
3. 删除索引:如果指定的索引存在,则删除该索引。
4. 定义映射:为指定索引定义文档的映射。
5. 插入文档:向指定索引中插入新的文档。
6. 更新文档:更新指定索引中的文档。
7. 查询文档:在指定索引中搜索文档。
8. 删除文档:从指定索引中删除文档。

日志记录:
项目使用 logging 模块记录运行时的信息,包括索引的创建、映射的定义、文档的插入、更新、查询和删除等操作,将日志信息输出到项目根目录下的 log 目录中的 elasticsearch_dsl.log 文件中,方便开发者查看和调试。

使用说明:

1. 在配置文件 config.ini 中配置 ElasticSearch 的主机地址。
2. 运行 ElasticSearch_prd.py 文件,即可执行项目中的示例代码,演示 ElasticSearch 的索引管理和文档操作功能。

该项目提供了一个简单而完整的示例,可供开发者学习和参考,帮助理解如何使用 elasticsearch-dsl 库进行 ElasticSearch 的操作。

目录结构
ElasticSearch/
│
├── conf/                      # 配置文件目录
│   └── config.ini             # ElasticSearch 配置文件
│
├── log/                       # 日志文件目录
│   └── elasticsearch_dsl.log  # 日志文件
│
├── ElasticSearch_prd.py     # 项目主文件
│
└── README.md                  # 项目说明文件


配置文件:
config.ini

[elasticsearch]
host = http://wangting_host:9200

ElasticSearch_prd.py 完整代码:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# @Project  :ElasticSearch
# @File     :ElasticSearch-dsl.py
# @Time     :2024/3/30 20:24
# @Author   :wangting_666


import os
import logging
import configparser
from elasticsearch import Elasticsearch

# 设置日志格式和级别
log_dir = "./log"
if not os.path.exists(log_dir):
    os.makedirs(log_dir)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(os.path.join(log_dir, "elasticsearch_dsl.log")),
        logging.StreamHandler()
    ]
)

# 读取ES配置
config = configparser.ConfigParser()
config.read('./conf/config.ini')
es_host = config.get('elasticsearch', 'host')


# 初始化ES连接
def init_es_client(host):
    return Elasticsearch(hosts=[host])


# 创建索引方法
def create_index(es, index_name):
    """创建索引,如果索引已存在则忽略"""
    if not es.indices.exists(index=index_name):
        es.indices.create(index=index_name)
        logging.info(f"Index '{index_name}' created successfully")
    else:
        logging.info(f"Index '{index_name}' already exists")


# 删除索引方法
def delete_index(es, index_name):
    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)
        logging.info(f"Index '{index_name}' deleted successfully")


# 定义映射方法
def define_mapping(es, index_name, mapping):
    """为索引定义映射"""
    es.indices.create(index=index_name, body=mapping, ignore=400)
    logging.info(f"Mapping for index '{index_name}' defined successfully")


# 插入文档
def insert_document(es, index_name, doc_id=None, document=None):
    """插入文档到指定索引"""
    es.index(index=index_name, id=doc_id, body=document)
    logging.info(f"Document inserted into index '{index_name}' with ID '{doc_id}'")


# 更新文档
def update_document(es, index_name, doc_id=None, updated_doc=None):
    """更新指定ID的文档"""
    es.update(index=index_name, id=doc_id, body={"doc": updated_doc})
    logging.info(f"Document with ID '{doc_id}' in index '{index_name}' updated successfully")


# 查询文档
def search_documents(es, index_name, query):
    """在指定索引中搜索文档"""
    result = es.search(index=index_name, body=query)
    logging.info(f"Search result for index '{index_name}': {result}")
    return result


# 删除文档
def delete_document(es, index_name, doc_id=None):
    """删除指定ID的文档"""
    es.delete(index=index_name, id=doc_id)
    logging.info(f"Document with ID '{doc_id}' deleted from index '{index_name}'")


def main():
    es = init_es_client(es_host)
    index_name = "wangting_666"
    create_index(es, index_name)
    mapping = {
        "mappings": {
            "properties": {
                "name": {"type": "text"},
                "age": {"type": "integer"},
                "email": {"type": "keyword"}
            }
        }
    }
    
    define_mapping(es, index_name, mapping)

    doc = {
        "name": "小米su7",
        "age": 18,
        "email": "wang@xiaomi.com"
    }
    
    insert_document(es, index_name, doc_id="1", document=doc)

    update_document(es, index_name, doc_id="1", updated_doc={"age": 66})

    query = {'query': {'match_all': {}}}
    search_documents(es, index_name, query)

    delete_document(es, index_name, doc_id="1")
    delete_index(es, index_name)


if __name__ == "__main__":
    main()

代码运行日志内容示例:

elasticsearch_dsl.log

2024-03-30 20:08:41,718 [INFO] HEAD http://wangting_host:9200/wangting_666 [status:404 duration:0.098s]
2024-03-30 20:08:41,857 [INFO] PUT http://wangting_host:9200/wangting_666 [status:200 duration:0.140s]
2024-03-30 20:08:41,857 [INFO] Index 'wangting_666' created successfully
2024-03-30 20:08:41,947 [INFO] PUT http://wangting_host:9200/wangting_666 [status:400 duration:0.063s]
2024-03-30 20:08:41,947 [INFO] Mapping for index 'wangting_666' defined successfully
2024-03-30 20:08:42,014 [INFO] PUT http://wangting_host:9200/wangting_666/_doc/1 [status:201 duration:0.066s]
2024-03-30 20:08:42,014 [INFO] Document inserted into index 'wangting_666' with ID '1'
2024-03-30 20:08:42,059 [INFO] POST http://wangting_host:9200/wangting_666/_update/1 [status:200 duration:0.044s]
2024-03-30 20:08:42,059 [INFO] Document with ID '1' in index 'wangting_666' updated successfully
2024-03-30 20:08:42,097 [INFO] POST http://wangting_host:9200/wangting_666/_search [status:200 duration:0.038s]
2024-03-30 20:08:42,097 [INFO] Search result for index 'wangting_666': {'took': 0, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 0, 'relation': 'eq'}, 'max_score': None, 'hits': []}}
2024-03-30 20:08:42,162 [INFO] DELETE http://wangting_host:9200/wangting_666/_doc/1 [status:200 duration:0.064s]
2024-03-30 20:08:42,162 [INFO] Document with ID '1' deleted from index 'wangting_666'
2024-03-30 20:08:42,215 [INFO] HEAD http://wangting_host:9200/wangting_666 [status:200 duration:0.053s]
2024-03-30 20:08:42,283 [INFO] DELETE http://wangting_host:9200/wangting_666 [status:200 duration:0.068s]
2024-03-30 20:08:42,284 [INFO] Index 'wangting_666' deleted successfully

验证执行过程是否有问题,可以先不执行delete相关方法

执行代码后,在kibana上查询数据验证

3. elasticsearch-dsl模块使用

3-1. 什么是 elasticsearch-dsl?

​ Elasticsearch DSL(Domain Specific Language 领域特定语言)是 Elasticsearch 官方提供的一个 Python 客户端库,它允许开发者以一种更加 Pythonic 和直观的方式与 Elasticsearch 进行交互和查询。DSL 不是一种编程语言,而是一种专门针对某一领域(如 Elasticsearch 查询语言)设计的语言。在 Elasticsearch 中,DSL 用于构建复杂的搜索查询、聚合操作和过滤条件。

​ Elasticsearch DSL 提供了一个面向对象的接口,使得开发者可以使用 Python 中的类和方法来构建 Elasticsearch 查询,而不必直接编写 JSON 查询体。这种方式使得代码更加清晰易懂,并且可以利用 Python 的强大功能来构建动态查询。通过 Elasticsearch DSL,开发者可以以更加高效和灵活的方式构建 Elasticsearch 查询,同时还能够利用 Python 生态系统中丰富的工具和库来处理查询结果。

​ Elasticsearch DSL 使得 Python 开发者简化了与 Elasticsearch 的交互过程,并提供了更加直观和易于理解的接口。

3-2. elasticsearch-dsl特点

  • 简化了与 Elasticsearch 的交互过程,使得代码更加易于理解和维护。
  • 提供了一种更加直观的方式来构建查询和聚合操作,无需直接操作 JSON。
  • 支持类型检查和自动完成,减少了错误的可能性。
  • 可以更加灵活地构建动态查询,根据不同的条件生成不同的查询语句。

3-3. elasticsearch-dsl 的基本构件

在 elasticsearch-dsl 中,主要的构件包括:

  • 查询(Queries)
  • 过滤器(Filters)
  • 聚合(Aggregations)
  • 排序(Sorting)
  • 分页(Pagination)

3-4. elasticsearch-dsl使用

elasticsearch-dsl模块功能非常多,常见功能如表

方法功能代码示例
Search(index)创建一个搜索请求对象s = Search(index='my_index')
query(...)设置查询条件s = s.query('match', title='python')
filter(...)设置过滤条件s = s.filter('term', category='programming')
sort(...)设置排序条件s = s.sort('title')
source(...)设置返回字段s = s.source(['title', 'category'])
highlight(...)设置高亮显示字段s = s.highlight('title')
aggs.bucket(...)添加一个聚合桶s.aggs.bucket('by_category', 'terms', field='category')
aggs.metric(...)添加一个聚合指标s.aggs.metric('avg_age', 'avg', field='age')
execute()执行搜索请求response = s.execute()
response.hits.total.value获取搜索结果的总数total_hits = response.hits.total.value
response.hits.hits获取搜索结果的文档列表hits = response.hits.hits
response.aggregations获取聚合结果aggs_result = response.aggregations
Q('query_string', query=...)创建一个查询字符串查询q = Q('query_string', query='python')
A('terms', field=...)创建一个术语聚合a = A('terms', field='category')
Index(name)创建一个索引对象index = Index('my_index')
index.create()创建索引index.create()
index.delete()删除索引index.delete()
3-4-1. 匹配查询(Match Query)

匹配查询用于查找包含指定文本的文档。

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Q

# 创建 Elasticsearch 客户端
client = Elasticsearch(['http://wangting_host:9200'])

# 创建一个 Search 对象
s = Search(using=client, index='storyline_base')

# 匹配查询
s1 = s.query(Q('match', raw_text='碧桂园'))

# 执行查询
response = s.execute()

# 处理查询结果
for hit in response:
    print(hit.raw_text)
    
# Output:
"""
标题:碧桂园简介(碧桂园简介)
标题:碧桂园(2007.HK)跌超4%,报1.56港元,碧桂园6月销售额创新低|碧桂园
标题:碧桂园被冻结3.8亿存款|碧桂园
标题:碧桂园杨惠妍:碧桂园不是家族企业
标题:碧桂园投资版图盘点 碧桂园
标题:碧桂园天誉(碧桂园简介)
标题:碧桂园近期转让多家公司股权|碧桂园
标题:中国平安(601318.SH):有关碧桂园的报道完全与事实不符 公司未持有碧桂园的股份|碧桂园
标题:森鹰窗业:公司正在履行合同中无碧桂园相关项目,也不存在碧桂园相关项目应收账款|碧桂园
标题:碧桂园新开楼盘排名(碧桂园山河城)
"""
3-4-2. 多字段匹配查询(Multi-match Query)

多字段匹配查询用于在多个字段中查找包含指定文本的文档。

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Q

# 创建 Elasticsearch 客户端
client = Elasticsearch(['http://wangting_host:9200'])

# 创建一个 Search 对象
s = Search(using=client, index='storyline_base')

# 多字段匹配查询
s = s.query(Q('match', raw_text='碧桂园') & Q('match', news_id='66432104581263876650'))

# 执行查询
response = s.execute()

# 处理查询结果
for hit in response:
    print(hit.doc_id, hit.news_id, hit.pubtime, hit.raw_text)
    
# Output:
"""
f46b1be478ff3bb94f9b1b8f4d6283ce 66432104581263876650 2023-07-28 09:54:56 标题:港股房地产股多数走强,碧桂园涨近4%
1d06a2c58f536028da3de03db66d540b 66432104581263876650 2023-07-28 09:54:56 摘要:香港股市中的大多数房地产股票表现强劲
"""
3-4-3. 范围查询(Range Query)

范围查询用于查找字段值在指定范围内的文档。

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Q

# 创建 Elasticsearch 客户端
client = Elasticsearch(['http://wangting_host:9200'])

# 创建一个 Search 对象
s = Search(using=client, index='storyline_base')

# 添加范围查询
s = s.query(
    Q('range', news_id={'gte': 2073620092894144610, 'lte': 2073620092894144630}))

# 执行查询
response = s.execute()

# 处理查询结果
for hit in response:
    print(hit.news_id, hit.raw_text)
    
# Output:
"""
2073620092894144620 标题:华为杨超斌:持续创新引领数字时代
2073620092894144623 摘要:华为公司高级副总裁杨超斌在2023年巴塞罗那世界移动通信大会上发表主题演讲,强调5G技术的发展推动社会进入智能世界。
"""
3-4-4. 通配符查询(Wildcard Query)

通配符查询用于查找符合指定模式的文档。

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Q

# 创建 Elasticsearch 客户端
client = Elasticsearch(['http://wangting_host:9200'])

# 创建一个 Search 对象
s = Search(using=client, index='storyline_base')

# 通配符查询
s = s.query(Q('wildcard', doc_id='3dd7e3*'))

# 执行查询
response = s.execute()

# 处理查询结果
for hit in response:
    print(hit.doc_id, hit.raw_text)
    
# Output:
"""
3dd7e3e90822340543f8a265ae564f9d 标题:华为杨超斌:持续创新引领数字时代
"""

4. Elasticsearch服务python日常巡检监控

es_check.py脚本内容

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import requests
from elasticsearch import Elasticsearch
import datetime
import os
import logging

# 设置日志格式和级别
log_dir = "./log"
if not os.path.exists(log_dir):
    os.makedirs(log_dir)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(os.path.join(log_dir, "es_check.log")),
        logging.StreamHandler()
    ]
)

# Elasticsearch配置信息
es_host = 'http://wangting_host:9200'
es = Elasticsearch(es_host)


# 函数:检查 Elasticsearch 集群是否可访问
def check_elasticsearch_availability():
    try:
        response = requests.get(es_host)
        if response.status_code == 200:
            logging.info(f'es {es_host} 健康,可以访问')
        else:
            logging.error(f'es {es_host} 不可访问! Status code:{response.status_code}')
    except requests.ConnectionError:
        logging.error("连接错误。es无法正常连接")


# 获取集群健康状态
def check_cluster_health():
    health = es.cluster.health()
    logging.info(f"集群整体健康状态: {health['status']}")


# 函数:检查节点信息
def check_node_info():
    try:
        response = requests.get(f'{es_host}/_nodes')
        if response.status_code == 200:
            nodes_info = response.json()
            nodes_count = len(nodes_info['nodes'])
            logging.info("es集群节点 node 个数: %d", nodes_count)
        else:
            logging.error("获取集群节点信息数据失败: %d", response.status_code)
    except requests.ConnectionError:
        logging.error("连接错误。es无法正常连接")


# 获取所有索引的状态
def check_index_status():
    indices = es.indices.stats()['indices']
    for index_name, index_stats in indices.items():
        status = index_stats['status']
        logging.info(f"索引 {index_name} 状态: {status}")


# 监控搜索性能
def monitor_search_performance():
    query = {'query': {'match_all': {}}}
    start_time = datetime.datetime.now()
    result = es.search(index='wangting_666', body=query)
    end_time = datetime.datetime.now()
    response_time = (end_time - start_time).total_seconds()
    logging.info(f"搜索查询性能 - 响应时间: {response_time} 秒, 搜索请求数量: {result['hits']['total']['value']}")


# 监控索引性能
def monitor_indexing_performance():
    doc = {
        "name": "小米su7",
        "age": 18,
        "email": "wang@xiaomi.com"
    }
    start_time = datetime.datetime.now()
    es.index(index='wangting_666', id="1", body=doc)
    end_time = datetime.datetime.now()
    response_time = (end_time - start_time).total_seconds()
    logging.info(f"索引性能 - 响应时间: {response_time} 秒")


# 执行巡检任务
def run_daily_check():
    logging.info(f"开始每日巡检: {datetime.datetime.now()}...\n")
    check_elasticsearch_availability()
    logging.info("\n")
    check_node_info()
    logging.info("\n")
    check_cluster_health()
    logging.info("\n")
    check_index_status()
    logging.info("\n")
    monitor_search_performance()
    logging.info("\n")
    monitor_indexing_performance()
    logging.info("\n每日巡检完成.")


# 主函数
if __name__ == "__main__":
    run_daily_check()

日志输出:

2024-03-30 15:11:26,799 [INFO] 开始每日巡检: 2024-03-30 15:11:26.799645...

2024-03-30 15:11:26,934 [INFO] es http://wangting_host:9200 健康,可以访问
2024-03-30 15:11:26,934 [INFO] 

2024-03-30 15:11:27,197 [INFO] es集群节点 node 个数: 3
2024-03-30 15:11:27,197 [INFO] 

2024-03-30 15:11:27,450 [INFO] GET http://wangting_host:9200/_cluster/health [status:200 duration:0.254s]
2024-03-30 15:11:27,451 [INFO] 集群整体健康状态: green
2024-03-30 15:11:27,451 [INFO] 

2024-03-30 15:11:27,696 [INFO] GET http://wangting_host:9200/_stats [status:200 duration:0.245s]
2024-03-30 15:11:27,696 [INFO] 索引 event_keystore 状态: open
2024-03-30 15:11:27,697 [INFO] 索引 knowledge_keystore 状态: open
2024-03-30 15:11:27,697 [INFO] 索引 test0330 状态: open
2024-03-30 15:11:27,697 [INFO] 索引 product_keystore 状态: open
2024-03-30 15:11:27,697 [INFO] 索引 storyline_base 状态: open
2024-03-30 15:11:27,697 [INFO] 索引 storyline_demo 状态: open
2024-03-30 15:11:27,697 [INFO] 索引 wangting_666 状态: open
2024-03-30 15:11:27,697 [INFO] 

2024-03-30 15:11:27,876 [INFO] POST http://wangting_host:9200/wangting_666/_search [status:200 duration:0.179s]
2024-03-30 15:11:27,876 [INFO] 搜索查询性能 - 响应时间: 0.178881 秒, 搜索请求数量: 1
2024-03-30 15:11:27,876 [INFO] 

2024-03-30 15:11:28,002 [INFO] PUT http://wangting_host:9200/wangting_666/_doc/1 [status:200 duration:0.126s]
2024-03-30 15:11:28,003 [INFO] 索引性能 - 响应时间: 0.127108 秒
2024-03-30 15:11:28,003 [INFO] 
每日巡检完成.

5. Python同步MySQL数据至ElasticSearch

MySQL样例数据准备:

create database wow;

DROP TABLE IF EXISTS `wow_info`;
CREATE TABLE `wow_info`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '角色id',
  `role` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '角色简称',
  `role_cn` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '角色类型',
  `role_pinyin` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '角色拼音',
  `zhuangbei` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '装备类型',
  `tianfu` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '天赋类型',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 14 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

INSERT INTO `wow_info` VALUES (1, 'fs', '法师', 'fashi', '布甲', '冰法|火法|奥法');
INSERT INTO `wow_info` VALUES (2, 'ms', '牧师', 'mushi', '布甲', '神牧|戒律|暗牧');
INSERT INTO `wow_info` VALUES (3, 'ss', '术士', 'shushi', '布甲', '毁灭|痛苦|恶魔');
INSERT INTO `wow_info` VALUES (4, 'dz', '盗贼', 'daozei', '皮甲', '狂徒|刺杀|敏锐');
INSERT INTO `wow_info` VALUES (5, 'ws', '武僧', 'wuseng', '皮甲', '酒仙|踏风|织雾');
INSERT INTO `wow_info` VALUES (6, 'xd', '德鲁伊', 'xiaode', '皮甲', '恢复|平衡|野性|守护');
INSERT INTO `wow_info` VALUES (7, 'dh', '恶魔猎手', 'emolieshou', '皮甲', '复仇|浩劫');
INSERT INTO `wow_info` VALUES (8, 'lr', '猎人', 'lieren', '锁甲', '兽王|生存|射击');
INSERT INTO `wow_info` VALUES (9, 'sm', '萨满', 'saman', '锁甲', '恢复|增强|元素');
INSERT INTO `wow_info` VALUES (10, 'long', '龙人', 'longren', '锁甲', '湮灭|恩护|增辉');
INSERT INTO `wow_info` VALUES (11, 'dk', '死亡骑士', 'siwangqishi', '板甲', '鲜血|冰霜|邪恶');
INSERT INTO `wow_info` VALUES (12, 'zs', '战士', 'zhanshi', '板甲', '武器|狂暴|防护');
INSERT INTO `wow_info` VALUES (13, 'sq', '圣骑士', 'shengqi', '板甲', '神圣|防护|惩戒');

python同步脚本MySQL_to_ElasticSearch.py

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import pymysql
from elasticsearch import Elasticsearch

# 连接到 MySQL 数据库
conn = pymysql.connect(host='192.168.1.1', user='root', password='123456', database='wow')
cursor = conn.cursor()

# 连接到 Elasticsearch
es = Elasticsearch(['http://wangting_host:9200'])

# 创建 Elasticsearch 索引
es.indices.create(index='es_wow_info', ignore=400)

# 查询 MySQL 数据表
cursor.execute('SELECT id, role,role_cn,role_pinyin,zhuangbei,tianfu FROM wow_info')
wow_info_data = cursor.fetchall()

# 将数据同步到 Elasticsearch
for data in wow_info_data:
    doc = {
        'role': data[1],
        'role_cn': data[2],
        'role_pinyin': data[3],
        'zhuangbei': data[4],
        'tianfu': data[5]
    }
    es.index(index='es_wow_info', id=data[0], body=doc)

# 关闭连接
conn.close()

验证结果:

6. Python同步Hive数据至ElasticSearch

Hive样例数据准备:

CREATE TABLE products (
    id INT,
    name STRING,
    price FLOAT,
    description STRING
);

INSERT INTO products VALUES
(1, 'Product 1', 19.99, 'Description for Product 1'),
(2, 'Product 2', 29.99, 'Description for Product 2'),
(3, 'Product 3', 39.99, 'Description for Product 3');

python同步脚本Hive_to_ElasticSearch.py

from pyhive import hive
from elasticsearch import Elasticsearch
import pandas as pd
from datetime import datetime

# 连接到 Hive 数据库
conn = hive.Connection(host='192.168.3.1', port=10000)
cursor = conn.cursor()

# 连接到 Elasticsearch
es = Elasticsearch(['http://wangting_host:9200'])

# 查询 Hive 数据表
cursor.execute('SELECT * FROM products')
data = cursor.fetchall()

# 将查询结果转换为 Pandas DataFrame
columns = [desc[0] for desc in cursor.description]
df = pd.DataFrame(data, columns=columns)

# 将数据转换为 Elasticsearch 所需的格式
docs = []
for index, row in df.iterrows():
    doc = {
        'id': row['id'],
        'name': row['name'],
        'price': float(row['price']),
        'description': row['description'],
        '@timestamp': datetime.now().isoformat()
    }
    docs.append(doc)

# 将数据同步到 Elasticsearch
for doc in docs:
    es.index(index='es_products_index', body=doc)

# 关闭连接
cursor.close()
conn.close()

7. ElasticSearch大量数据写入实现

Bigdata2ES.py脚本内容:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch(['http://wangting_host:9200'])


def timer(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        print('共耗时约 {:.2f} 秒'.format(time.time() - start))
        return res

    return wrapper


def create_data():
    """ 写入数据 """
    for line in range(10):
        es.index(index='bigdata_insert', body={'title': line})


@timer
def gen():
    action = ({
        "_index": "bigdata_insert",
        "_source": {
            "title": i
        }
    } for i in range(1000000))
    helpers.bulk(es, action)


if __name__ == '__main__':
    # create_data()
    gen()


"""
Output:
共耗时约 215.88 秒
"""

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1557941.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JavaScript(一)---【js的两种导入方式、全局作用域、函数作用域、块作用域】

一.JavaScript介绍 1.1什么是JavaScript JavaScript简称“js”&#xff0c;js与java没有任何关系。 js是一种“轻量级、解释型、面向对象的脚本语言”。 二.JavaScript的两种导入方式 2.1内联式 在HTML文档中使用<script>标签直接引用。 <script>console.log…

跨越时空,启迪智慧:奇趣相机重塑儿童摄影与教育体验

【科技观察】近期&#xff0c;奇趣未来公司以其创新之作——“奇趣相机”微信小程序&#xff0c;强势进军儿童AI摄影市场。这款专为亚洲儿童量身定制的应用&#xff0c;凭借精准贴合亚洲儿童面部特征的AIGC大模型&#xff0c;以及丰富的摄影模板与场景设定&#xff0c;正在重新…

【每日跟读】常用英语500句(400~500)

【每日跟读】常用英语500句 Where can I buy a ticket? 在哪里能买到票&#xff1f; When is the next train? 下趟火车什么时候到&#xff1f; Thank you so much for helping me move yesterday. 非常感谢你昨天帮我搬家 I’m feeling a little under the weather toda…

与webpack类似的工具还有哪些?区别?

文章目录 一、模块化工具二、详细对比RollupParcelSnowpackVitewebpack 参考文献 一、模块化工具 模块化是一种处理复杂系统分解为更好的可管理模块的方式 可以用来分割&#xff0c;组织和打包应用。每个模块完成一个特定的子功能&#xff0c;所有的模块按某种方法组装起来&a…

吴恩达2022机器学习专项课程(一) 4.1 梯度下降

问题预览 梯度下降算法的作用是&#xff1f;梯度下降的过程&#xff1f;梯度下降和最小化成本函数的联系&#xff1f;所有的成本函数都是一个形状吗&#xff1f;在非凸形状中&#xff0c;梯度下降的更新过程是&#xff1f;在非凸形状中&#xff0c;不同的初值对最小化成本函数…

量化交易入门(三十二)什么是BIAS指标以及它的优缺点

BIAS&#xff0c;中文名称为“乖离率”&#xff0c;是量化交易中常用的一种技术指标&#xff0c;主要用于衡量价格偏离移动平均线的程度。下面我将从原理、优缺点和应用三个方面对BIAS指标进行详细讲解。 一、BIAS指标的原理 BIAS指标的计算公式为&#xff1a;BIAS(当前收盘价…

车载电子电器架构 —— 电气架构释放检查

车载电子电器架构 —— 电气架构释放检查 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 屏蔽力是信息过载时代一个人的特殊竞争力,任何消耗你的人和事,多看一眼都是你的不对。非必要不费力证明…

Android vehicle车辆属性新增demo

目录 前言一、Vehicle模块1.1 简介1.2 Vehicle框架1.3 主要功能和特点1.4 重要服务CarService1.4.1 简介1.4.2 组成1.4.3 启动时序1.4.4 作用 二、车辆属性新增demo2.1 CarPropertyService2.1.1 简介2.1.2 架构2.1.3 车辆属性 API2.1.4 CarPropertyService 初始化流程 2.2 App …

Sentinel原理及实践

Sentinel 是什么 Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件&#xff0c;主要以流量为切入点&#xff0c;从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。 为什么使用sentinel&…

RIP环境下的MGRE 综合实验

实验题目及要求&#xff1a; 1.R5为ISP&#xff0c;只能进行IP地址配置&#xff0c;其所有地址均配为公有IP地址 2.R1和R5间使用PPP的PAP认证&#xff0c;R5为主认证方; R2于R5之间使用PPP的chap认证&#xff0c;R5为主认证方&#xff1b; R3于R5之间使用HDLC封装。 3.R1/…

数据链路层之信道:数字通信的桥梁与守护者

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

LeetCode 双指针专题

11.盛最多水的容器 给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。 返回容器可以储存的最大水量。 说明&#xff1a;你不…

Kubeflow文档1:介绍与架构

Kubeflow 2024/3/19版本的文档 此专栏用来展示相关的内容翻译&#xff0c;重点关注本地部署&#xff0c;关于运营商的方案&#xff0c;请自行查阅 文档地址https://www.kubeflow.org/docs/ 开始编辑时间&#xff1a;2024/3/27&#xff1b;最后编辑时间2024/3/27 Kubeflow文…

【Hello,PyQt】QTextEdit和QSplider

PyQt5 是一个强大的Python库&#xff0c;用于创建图形用户界面&#xff08;GUI&#xff09;。其中&#xff0c;QTextEdit 控件作为一个灵活多用的组件&#xff0c;常用于显示和编辑多行文本内容&#xff0c;支持丰富的格式设置和文本操作功能。另外&#xff0c;QSlider 控件是一…

java数据结构与算法刷题-----LeetCode1091. 二进制矩阵中的最短路径

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 文章目录 广度优先双分裂蛇 广度优先双分裂蛇 双分裂蛇&#xff1a;是求二…

上门家政按摩H5小程序源码

《服务器环境配置》 1、服务器环境&#xff1a;CentOS7 宝塔 Nginx php 2、环境&#xff1a;PHP7.2 MySQL5.6 3、安装扩展&#xff1a;fileinfo、redis 《程序安装配置》 1、新建站点及数据库&#xff0c;然后申请创建SSL证书&#xff0c;配置到站点&#xff0c;开启强…

LeetCode Python - 84. 柱状图中最大的矩形

目录 题目描述解法方法一方法二 运行结果方法一方法二 题目描述 给定 n 个非负整数&#xff0c;用来表示柱状图中各个柱子的高度。每个柱子彼此相邻&#xff0c;且宽度为 1 。 求在该柱状图中&#xff0c;能够勾勒出来的矩形的最大面积。 示例 1: 输入&#xff1a;heights …

“崖山数据库杯”深圳大学程序设计竞赛(正式赛)M题 一图秒

“崖山数据库杯”深圳大学程序设计竞赛&#xff08;正式赛&#xff09;_ACM/NOI/CSP/CCPC/ICPC算法编程高难度练习赛_牛客竞赛OJ (nowcoder.com) —————— 可以去牛客看题解&#xff1a; 题解 | #暂时没想法#_牛客博客 (nowcoder.net) —————— 上面的就是题解了。…

CKS之安全沙箱运行容器:gVisor

目录 一、gVisor介绍 二、gVisor架构 三、gVisor使用前置条件 四、Docker中使用gVisor 五、containerd中使用gVisor 六、Kubernetes结合gVisor使用 一、gVisor介绍 gVisor是Google开源的一种容器沙箱技术&#xff0c;其设计初衷是在提供较高安全性的同时&#xff0c;尽量…

文本文件操作

大家好&#xff1a; 衷心希望各位点赞。 您的问题请留在评论区&#xff0c;我会及时回答。 文件操作 程序运行时&#xff0c;产生的数据都是临时数据&#xff0c;程序一旦运行结束都会被释放。通过文件可以将数据持久化。 C中对文件进行操作需要包含头文件<fstream> 文件…