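Previously:
Sending Azure Blob logs to Elasticsearch via Azure Event Hubs (1. Standard edition) (CSDN blog)
Sending Azure Blob logs to Elasticsearch via Azure Event Hubs (2. Replacing the paid Event Hubs) (CSDN blog)
Python script implementation
A capable colleague ended up implementing this feature at no cost with a Python script.
File structure
Below is Cursor's review of the code:
This project downloads Azure Storage access logs and imports them into Elasticsearch. Let me go through it file by file: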
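1. Overall architecture
The project has a clean modular design and consists of the following components:
- main.py: program entry point; coordinates the overall flow
- azure_log_downloader.py: downloader for logs in Azure blob storage
- elasticsearch_sender.py: sends data to Elasticsearch
- models.py: data model definitions
- log_config.py: logging configuration
- .env: environment configuration file
2. Detailed review
main.py
Strengths:
- Supports processing a date range
- Good error handling and logging
- Modular design with clearly separated responsibilities
Suggested improvements:
- Consider adding command-line arguments so the date range is configurable
- Consider adding concurrency to speed up processing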
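The command-line suggestion for main.py could look roughly like the sketch below. It is only an illustration: the flag names `--start` and `--end` and the helpers `parse_date`, `build_parser` and `iter_days` are hypothetical and do not exist in the original script, which hard-codes its dates.

```python
import argparse
from datetime import datetime, timedelta, timezone


def parse_date(value: str) -> datetime:
    """Parse YYYY-MM-DD into a timezone-aware UTC datetime."""
    return datetime.strptime(value, "%Y-%m-%d").replace(tzinfo=timezone.utc)


def build_parser() -> argparse.ArgumentParser:
    # Hypothetical flags; the original main.py takes no arguments.
    parser = argparse.ArgumentParser(
        description="Export Azure Storage logs to Elasticsearch")
    parser.add_argument("--start", type=parse_date, required=True,
                        help="first day to export, YYYY-MM-DD")
    parser.add_argument("--end", type=parse_date, required=True,
                        help="last day to export (inclusive), YYYY-MM-DD")
    return parser


def iter_days(start: datetime, end: datetime):
    """Yield every day from start to end, inclusive."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)
```

In main(), `args = build_parser().parse_args()` followed by `for target_date in iter_days(args.start, args.end)` would then replace the hard-coded `start_date` and `end_date`.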
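azure_log_downloader.py
Strengths:
- Clear class design
- Good error handling
Suggested improvements:
- Consider adding a retry mechanism
- Consider supporting asynchronous operation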
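A retry mechanism for the download step could be as small as the helper sketched below. This is an assumption about what the reviewer had in mind, not code from the project; the name `retry` and its `sleep` parameter (injectable so that tests do not actually wait) are hypothetical.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry(func: Callable[[], T], attempts: int = 3, base_delay: float = 1.0,
          sleep: Callable[[float], None] = time.sleep) -> T:
    """Call func, retrying on any exception with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Wait base_delay, then 2x, 4x, ... before the next attempt
            sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError("attempts must be >= 1")
```

It could wrap the blob read, for example `retry(lambda: blob_client.download_blob().readall())`. Note that download_and_transform currently swallows exceptions and returns an empty list, so the retry would need to sit inside that method rather than around it.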
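elasticsearch_sender.py
Strengths:
- Implements batch processing
- Includes a retry mechanism
- Useful connection test
Suggested improvements:
- Consider adding index template management
- Consider exposing more ES configuration options
models.py
Strengths:
- Uses dataclass to simplify the class definition
- Detailed field-parsing logic
- Good type annotations
Suggested improvements:
- Consider adding field validation
- Consider using Pydantic for data validation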
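Field validation does not necessarily require Pydantic; a dataclass can check its own values in `__post_init__`. The sketch below uses a hypothetical, trimmed-down class `ValidatedLog` in place of the real StorageOperationLog, and the rule it enforces (numeric metrics must be non-negative integers) is an assumption chosen for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ValidatedLog:  # hypothetical stand-in for StorageOperationLog
    operationName: Optional[str]
    durationMs: Optional[int]
    responseBodySize: Optional[int]

    def __post_init__(self):
        # Numeric metrics, when present, must be non-negative integers.
        for name in ("durationMs", "responseBodySize"):
            value = getattr(self, name)
            if value is None:
                continue
            # bool is a subclass of int, so reject it explicitly
            if not isinstance(value, int) or isinstance(value, bool):
                raise TypeError(f"{name} must be an int, got {type(value).__name__}")
            if value < 0:
                raise ValueError(f"{name} must be >= 0, got {value}")
```

Because from_log_entry wraps construction in a try/except, a record that fails validation this way would be logged and skipped rather than crash the run.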
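log_config.py
Strengths:
- Thorough logging configuration
- Supports both file and console output
Suggested improvements:
- Consider adding log rotation
- Consider making the log level configurable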
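Log rotation is available in the standard library through `logging.handlers.RotatingFileHandler`. The following is a minimal sketch; the helper name `add_rotating_handler`, the file name `app.log` and the size limits are assumptions, not values taken from the project.

```python
import logging
import os
from logging.handlers import RotatingFileHandler


def add_rotating_handler(logger: logging.Logger, log_dir: str,
                         max_bytes: int = 10 * 1024 * 1024,
                         backup_count: int = 5) -> RotatingFileHandler:
    """Attach a size-based rotating file handler to logger and return it."""
    os.makedirs(log_dir, exist_ok=True)
    handler = RotatingFileHandler(
        os.path.join(log_dir, "app.log"),  # hypothetical file name
        maxBytes=max_bytes,        # roll over once the file exceeds this size
        backupCount=backup_count,  # keep app.log.1 ... app.log.N
        encoding="utf-8",
    )
    handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logger.addHandler(handler)
    return handler
```

Since setup_logger already creates one timestamped file per run, rotation mainly pays off if the script is changed to append to a single long-lived log file.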
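3. Security suggestions
- Store sensitive information (such as API keys) in a more secure way
- Enable SSL certificate verification
- Add request rate limiting
4. Performance suggestions
- Consider adding concurrency
- Consider implementing incremental updates
- Consider adding a caching mechanism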
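Since the work is dominated by network I/O (one download and one bulk request per hourly blob), a thread pool is one plausible way to add concurrency. The sketch below is an assumption about how that might be wired up; `process_blobs_concurrently` and the `process_one` callback are hypothetical names, and `-1` is an arbitrary marker for a failed blob.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterable


def process_blobs_concurrently(blob_names: Iterable[str],
                               process_one: Callable[[str], int],
                               max_workers: int = 4) -> dict[str, int]:
    """Run process_one for each blob in a thread pool; return a count per blob."""
    results: dict[str, int] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to the blob name it is processing
        futures = {pool.submit(process_one, name): name for name in blob_names}
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = future.result()
            except Exception:
                results[name] = -1  # mark failure; real code would log the error
    return results
```

Here `process_one` would download a single blob, send its entries, and return the number of documents indexed. Whether the shared logger and Elasticsearch client behave well under this load is something to verify before relying on it.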
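5. Maintainability suggestions
- Add unit tests
- Add more detailed documentation
- Consider adding monitoring metrics
- Add CI/CD configuration
Overall, this is a well-designed project with good code quality, error handling and logging. The main room for improvement is in performance, security and maintainability.
The scripts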
1. azure_log_downloader.py:
from azure.storage.blob import BlobServiceClient
import json
import logging
from models import StorageOperationLog


class AzureLogDownloader:
    def __init__(self, connection_string, container_name, blob_name, logger=None):
        # Fall back to a module-level logger so logging calls never hit None
        self.logger = logger or logging.getLogger(__name__)
        self.connection_string = connection_string
        self.container_name = container_name
        self.blob_name = blob_name

    def _get_blob_client(self):
        blob_service_client = BlobServiceClient.from_connection_string(self.connection_string)
        blob_client = blob_service_client.get_blob_client(
            container=self.container_name,
            blob=self.blob_name
        )
        return blob_client

    def download_and_transform(self):
        """Download and transform log data from Azure storage"""
        try:
            blob_client = self._get_blob_client()
            if not blob_client.exists():
                self.logger.info(f"Blob does not exist, skipping: {self.blob_name}")
                return []
            blob_data = blob_client.download_blob().readall().decode('utf-8')
            transformed_entries = []
            # The blob holds one JSON document per line
            for line in blob_data.splitlines():
                if line.strip():
                    try:
                        log_entry = json.loads(line)
                        log_obj = StorageOperationLog.from_log_entry(log_entry, self.logger)
                        if log_obj:
                            transformed_entries.append(log_obj)
                    except json.JSONDecodeError as e:
                        self.logger.error(f"Error parsing line: {str(e)}")
                        continue
            self.logger.info(f"Downloaded and transformed {len(transformed_entries)} logs")
            return transformed_entries
        except Exception as e:
            self.logger.error(f"Error downloading blob: {str(e)}")
            self.logger.error(f"Blob: {self.blob_name}, Container: {self.container_name}")
            self.logger.error(f"Error type: {type(e).__name__}")
            return []
2. elasticsearch_sender.py:
from elasticsearch import Elasticsearch, helpers
import logging
import time
import uuid


class ElasticsearchSender:
    def __init__(self, host, api_key=None, index_name="logs", logger=None):
        self.logger = logger or logging.getLogger(__name__)
        # Note: option names such as use_ssl and timeout follow the older client;
        # newer clients may expect the scheme in the host URL and request_timeout.
        self.config = {
            'hosts': host,
            'timeout': 30,
            'retry_on_timeout': True,
            'max_retries': 3,
            'verify_certs': False,  # disables TLS verification; enable it in production
            'ssl_show_warn': False,
            'use_ssl': True
        }
        if api_key:
            self.config['api_key'] = api_key
        self.index_name = index_name
        self.es = Elasticsearch(**self.config)

    def test_connection(self):
        """Test Elasticsearch connection"""
        try:
            info = self.es.info()
            self.logger.info("\nElasticsearch Server Info:")
            self.logger.info(f"Version: {info['version']['number']}")
            self.logger.info(f"Cluster Name: {info['cluster_name']}")
            return True
        except Exception as e:
            self.logger.error(f"\nElasticsearch connection failed: {str(e)}")
            return False

    def send_logs(self, log_entries, batch_size=500, max_retries=3):
        """Send logs to Elasticsearch"""
        def generate_actions():
            for entry in log_entries:
                doc_data = entry.__dict__.copy()
                if 'time' in doc_data:
                    doc_data['@timestamp'] = doc_data.pop('time')
                action = {
                    '_index': self.index_name,
                    '_id': str(uuid.uuid4()),
                    '_source': doc_data
                }
                yield action

        # Build the actions once so that a retry reuses the same _id values and
        # overwrites documents instead of indexing duplicates under new IDs.
        actions = list(generate_actions())
        success_count = 0
        failure_count = 0
        retry_count = 0
        while retry_count < max_retries:
            try:
                success, failed = helpers.bulk(
                    self.es,
                    actions,
                    chunk_size=batch_size,
                    raise_on_error=False,
                    raise_on_exception=False
                )
                # Every attempt resends all actions, so keep only the latest counts
                success_count = success
                failure_count = len(failed) if failed else 0
                self.logger.info("\nBatch processing results:")
                self.logger.info(f"- Successfully indexed: {success_count} documents")
                self.logger.info(f"- Failed: {failure_count} documents")
                if not failed:
                    break
                retry_count += 1
                if retry_count < max_retries:
                    self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")
                    time.sleep(2 ** retry_count)
            except Exception as e:
                self.logger.error(f"\nBulk indexing error: {str(e)}")
                retry_count += 1
                if retry_count < max_retries:
                    self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")
                    time.sleep(2 ** retry_count)
                else:
                    self.logger.info("Maximum retry attempts reached")
                    break
        return success_count, failure_count
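Because the sender assigns each document a random `uuid.uuid4()` as its `_id`, running the script twice for the same day indexes every log line twice. One possible remedy, sketched here as an assumption rather than as part of the original project, is to derive the ID from the log content. The function `stable_doc_id` and its choice of key fields are hypothetical, and two genuinely distinct requests that share all of these fields would collapse into one document.

```python
import hashlib
import json


def stable_doc_id(doc: dict,
                  key_fields=("time", "operationName", "uri", "callerIpAddress")) -> str:
    """Derive a repeatable _id from fields that together identify one log line."""
    # str() keeps datetimes and None values serialisable
    key = {field: str(doc.get(field)) for field in key_fields}
    payload = json.dumps(key, sort_keys=True, ensure_ascii=False)
    return hashlib.sha1(payload.encode("utf-8")).hexdigest()
```

Inside generate_actions, `'_id': stable_doc_id(doc_data)` (computed before `time` is renamed to `@timestamp`) would then make re-runs overwrite existing documents instead of adding new ones.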
3. log_config.py:
import logging
import os
from datetime import UTC, datetime
from typing import Optional


def setup_logger(target_date: Optional[datetime] = None, log_prefix: str = "app"):
    # Write log files to a "logs" directory next to this module
    base_dir = os.path.dirname(os.path.abspath(__file__))
    log_dir = os.path.join(base_dir, 'logs')
    os.makedirs(log_dir, exist_ok=True)

    # One file per run, named after the target date and the export time
    current_time = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
    target_date_str = target_date.strftime("%Y%m%d") if target_date else "None"
    log_file = os.path.join(log_dir, f'{log_prefix}_target_date_{target_date_str}_export_at_{current_time}.log')

    logger = logging.getLogger('AccessLog')
    logger.setLevel(logging.INFO)

    file_handler = logging.FileHandler(log_file, encoding='utf-8')
    file_handler.setLevel(logging.INFO)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    # The caller is expected to remove these handlers when done (see main.py),
    # otherwise repeated calls would attach duplicates to the same logger.
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger
4. models.py:
from dataclasses import dataclass
from datetime import datetime
import re
from typing import Any, Optional


@dataclass
class StorageOperationLog:
    time: datetime
    category: Optional[str]
    operationName: Optional[str]
    callerIpAddress: Optional[str]
    location: Optional[str]
    uri: Optional[str]
    durationMs: Optional[int]
    referrerHeader: Optional[str]
    userAgentHeader: Optional[str]
    requestBodySize: Optional[int]
    responseBodySize: Optional[int]
    serverLatencyMs: Optional[int]
    objectKey: Optional[str]
    functionName: Optional[str]
    file_extension: Optional[str]

    @staticmethod
    def parse_object_key(object_key: str, logger=None) -> tuple[Optional[str], Optional[str]]:
        """Parse objectKey to get functionName and file_extension"""
        try:
            container_match = re.search(r'container-(\d+)', object_key)
            parts = object_key.split('/')
            # functionName is the path segment right after the "container-<n>" segment
            function_name = None
            if container_match:
                container_index = next((i for i, part in enumerate(parts)
                                        if 'container-' in part), None)
                if container_index is not None and container_index + 1 < len(parts):
                    function_name = parts[container_index + 1]
            # file_extension is taken from the last path segment, lower-cased
            file_extension = None
            if parts and '.' in parts[-1]:
                file_extension = parts[-1].split('.')[-1].lower()
            return function_name, file_extension
        except Exception as e:
            if logger:
                logger.error(f"Error parsing object_key {object_key}: {str(e)}")
            return None, None

    @classmethod
    def from_log_entry(cls, entry: dict[str, Any], logger=None) -> Optional['StorageOperationLog']:
        """Create StorageOperationLog instance from raw log entry"""
        try:
            properties = entry.get('properties', {})
            object_key = properties.get('objectKey', '')
            function_name, file_extension = cls.parse_object_key(object_key, logger)
            return cls(
                time=entry.get('time'),
                category=entry.get('category'),
                operationName=entry.get('operationName'),
                callerIpAddress=entry.get('callerIpAddress'),
                location=entry.get('location'),
                uri=entry.get('uri'),
                durationMs=int(entry.get('durationMs')) if entry.get('durationMs') is not None else None,
                referrerHeader=properties.get('referrerHeader'),
                userAgentHeader=properties.get('userAgentHeader'),
                requestBodySize=int(properties.get('requestBodySize')) if properties.get('requestBodySize') is not None else None,
                responseBodySize=int(properties.get('responseBodySize')) if properties.get('responseBodySize') is not None else None,
                serverLatencyMs=int(properties.get('serverLatencyMs')) if properties.get('serverLatencyMs') is not None else None,
                objectKey=object_key,
                functionName=function_name,
                file_extension=file_extension
            )
        except Exception as e:
            if logger:
                logger.error(f"Error creating StorageOperationLog: {str(e)}")
            return None

    def __post_init__(self):
        # Convert an ISO-8601 "Z" timestamp string into a datetime
        if isinstance(self.time, str):
            if 'Z' in self.time:
                time_parts = self.time.split('.')
                if len(time_parts) > 1:
                    # %f accepts at most 6 fractional digits, so truncate longer fractions
                    microseconds = time_parts[1].replace('Z', '')[:6]
                    time_str = f"{time_parts[0]}.{microseconds}Z"
                    self.time = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")
                else:
                    self.time = datetime.strptime(self.time, "%Y-%m-%dT%H:%M:%SZ")
5. main.py:
from log_config import setup_logger
from azure_log_downloader import AzureLogDownloader
from elasticsearch_sender import ElasticsearchSender
from datetime import datetime, timedelta, UTC
from dotenv import load_dotenv
import os

load_dotenv()


def _get_index_name(target_date: datetime):
    """Get full index name for the specified date"""
    return os.getenv('ELASTICSEARCH_INDEX_TEMPLATE', 'logs-{year}-{month}').format(
        year=target_date.year,
        month=target_date.month
    )


def _get_blob_name_list(target_date: datetime):
    """Get blob paths for all hours of the specified date"""
    blobs = []
    for hour in range(24):
        blob_time = target_date.replace(hour=hour, minute=0, second=0, microsecond=0)
        blob_name = os.getenv('AZURE_STORAGE_BLOB_TEMPLATE', 'logs/y={year}/m={month}/d={day}/h={hour}').format(
            year=blob_time.year,
            month=blob_time.month,
            day=blob_time.day,
            hour=blob_time.hour
        )
        blobs.append(blob_name)
    return blobs


def main():
    # Date range to export (both ends inclusive); edit as needed
    start_date = datetime(2024, 1, 1, tzinfo=UTC)
    end_date = datetime(2024, 1, 2, tzinfo=UTC)
    current_date = start_date
    while current_date <= end_date:
        target_date = current_date
        # A fresh logger (and log file) per day; handlers are released in "finally"
        logger = setup_logger(target_date, os.getenv('LOG_PREFIX', 'app'))
        try:
            logger.info(f"\nProcessing data for {current_date.date()}")
            elasticsearch_index = _get_index_name(target_date)
            sender = ElasticsearchSender(
                os.getenv('ELASTICSEARCH_HOST', 'http://localhost:9200'),
                os.getenv('ELASTICSEARCH_API_KEY'),
                elasticsearch_index,
                logger
            )
            if not sender.test_connection():
                logger.error("Elasticsearch connection failed")
                current_date += timedelta(days=1)
                continue
            total_logs = total_success = total_failed = 0
            blobs = _get_blob_name_list(target_date)
            for container in os.getenv('AZURE_STORAGE_CONTAINERS', 'logs').split(','):
                logger.info(f"\nProcessing container: {container}")
                for blob_name in blobs:
                    logger.info(f"\nProcessing blob: {blob_name}")
                    downloader = AzureLogDownloader(
                        os.getenv('AZURE_STORAGE_URI'),
                        container,
                        blob_name,
                        logger
                    )
                    try:
                        log_entries = downloader.download_and_transform()
                        success, failed = sender.send_logs(log_entries)
                        total_logs += len(log_entries)
                        total_success += success
                        total_failed += failed
                    except Exception as e:
                        logger.error(f"Error processing {blob_name}: {str(e)}")
                        continue
            logger.info(f"\n{current_date.date()} Processing completed:")
            logger.info(f"Total documents processed: {total_logs}")
            logger.info(f"Successfully indexed: {total_success}")
            logger.info(f"Failed: {total_failed}")
        finally:
            # Close and detach handlers so the next day's logger starts clean
            for handler in logger.handlers[:]:
                handler.close()
                logger.removeHandler(handler)
        current_date += timedelta(days=1)


if __name__ == "__main__":
    main()
6. .env:
ELASTICSEARCH_HOST=http://localhost:9200
ELASTICSEARCH_API_KEY=your_api_key
ELASTICSEARCH_INDEX_TEMPLATE=logs-{year}-{month}
AZURE_STORAGE_URI=your_storage_connection_string
AZURE_STORAGE_CONTAINERS=logs
AZURE_STORAGE_BLOB_TEMPLATE=logs/y={year}/m={month}/d={day}/h={hour}
LOG_PREFIX=app
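The two template values are expanded with Python's `str.format`, so a format spec can be written directly into the .env value. The sketch below shows the difference between the default template and a zero-padded one. Whether your blob paths use padded segments such as `m=01` is an assumption to check against the actual container; `blob_name_for` and `PADDED_TEMPLATE` are hypothetical names.

```python
from datetime import datetime, timezone

# Hypothetical zero-padded variant of AZURE_STORAGE_BLOB_TEMPLATE
PADDED_TEMPLATE = "logs/y={year}/m={month:02d}/d={day:02d}/h={hour:02d}"


def blob_name_for(moment: datetime, template: str) -> str:
    """Expand a blob path template the same way main.py does."""
    return template.format(year=moment.year, month=moment.month,
                           day=moment.day, hour=moment.hour)


moment = datetime(2024, 1, 2, 3, tzinfo=timezone.utc)
default_name = blob_name_for(moment, "logs/y={year}/m={month}/d={day}/h={hour}")
padded_name = blob_name_for(moment, PADDED_TEMPLATE)
```

With the default template the path contains `m=1/d=2/h=3`, while the padded template yields `m=01/d=02/h=03`. The same trick applies to ELASTICSEARCH_INDEX_TEMPLATE, for example `logs-{year}-{month:02d}`.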
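Previous and follow-up posts:
Sending Azure Blob logs to Elasticsearch via Azure Event Hubs (1. Standard edition) (CSDN blog)
Sending Azure Blob logs to Elasticsearch via Azure Event Hubs (2. Replacing the paid Event Hubs) (CSDN blog)
Sending Azure Blob logs to Elasticsearch via Azure Event Hubs (3. The budget pure-Python edition) (CSDN blog)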