将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3.纯python的实惠版)

news2025/1/21 23:08:16

前情:

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1.标准版)-CSDN博客

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2.换掉付费的Event Hubs)-CSDN博客

python脚本实现

厉害的小伙伴最终使用python脚本免费实现了这个功能

文件结构

以下是cursor对代码的评审意见:

这是一个用于下载 Azure Storage 访问日志并将其导入到 Elasticsearch 的项目。让我逐个文件分析:

1. 整体架构

项目采用了良好的模块化设计,主要包含以下组件:

  • main.py: 主程序入口,协调整体流程
  • azure_log_downloader.py: Azure blob 存储日志下载器
  • elasticsearch_sender.py: Elasticsearch 数据发送器
  • models.py: 数据模型定义
  • log_config.py: 日志配置
  • .env: 环境配置文件

2. 具体评审

main.py

优点:

  • 支持日期范围处理
  • 良好的错误处理和日志记录
  • 模块化的设计,职责划分清晰

建议改进:

  • 可以考虑添加命令行参数支持,使日期范围可配置
  • 可以添加并发处理支持,提高处理速度
azure_log_downloader.py

优点:

  • 清晰的类设计
  • 良好的错误处理

建议改进:

  • 可以添加重试机制
  • 可以考虑支持异步操作
elasticsearch_sender.py

优点:

  • 实现了批量处理
  • 包含重试机制
  • 良好的连接测试功能

建议改进:

  • 可以考虑添加索引模板管理
  • 可以添加更多的 ES 配置选项
models.py

优点:

  • 使用 dataclass 简化了类定义
  • 详细的字段解析逻辑
  • 良好的类型注解

建议改进:

  • 可以添加字段验证
  • 可以考虑使用 Pydantic 进行数据验证
log_config.py

优点:

  • 完善的日志配置
  • 支持文件和控制台输出

建议改进:

  • 可以考虑添加日志轮转
  • 可以添加日志级别配置选项

3. 安全性建议

  • 建议使用更安全的方式存储敏感信息(如 API keys)
  • 建议添加 SSL 证书验证
  • 建议添加请求速率限制

4. 性能建议

  • 考虑添加并发处理
  • 可以实现增量更新机制
  • 考虑添加缓存机制

5. 可维护性建议

  • 添加单元测试
  • 添加更详细的文档
  • 考虑添加监控指标
  • 添加 CI/CD 配置

总体来说,这是一个设计良好的项目,代码质量较高,有良好的错误处理和日志记录。主要可以在性能优化、安全性和可维护性方面进行进一步改进。

具体脚本

1. azure_log_downloader.py:

from azure.storage.blob import BlobServiceClient
import json
from models import StorageOperationLog

class AzureLogDownloader:
    def __init__(self, connection_string, container_name, blob_name, logger=None):
        self.logger = logger
        self.connection_string = connection_string
        self.container_name = container_name
        self.blob_name = blob_name
        
    def _get_blob_client(self):
        blob_service_client = BlobServiceClient.from_connection_string(self.connection_string)
        blob_client = blob_service_client.get_blob_client(
            container=self.container_name, 
            blob=self.blob_name
        )
        return blob_client

    def download_and_transform(self):
        """Download and transform log data from Azure storage"""
        try:
            blob_client = self._get_blob_client()

            if not blob_client.exists():
                self.logger.info(f"Blob does not exist, skipping: {self.blob_name}")
                return []
            
            blob_data = blob_client.download_blob().readall().decode('utf-8')
            
            transformed_entries = []
            for line in blob_data.splitlines():
                if line.strip():
                    try:
                        log_entry = json.loads(line)
                        log_obj = StorageOperationLog.from_log_entry(log_entry, self.logger)
                        if log_obj:
                            transformed_entries.append(log_obj)
                    except json.JSONDecodeError as e:
                        self.logger.error(f"Error parsing line: {str(e)}")
                        continue
            
            self.logger.info(f"Downloaded and transformed {len(transformed_entries)} logs")
            
            return transformed_entries
        except Exception as e:
            self.logger.error(f"Error downloading blob: {str(e)}")
            self.logger.error(f"Blob: {self.blob_name}, Container: {self.container_name}")
            self.logger.error(f"Error type: {type(e).__name__}")
            return []

2. elasticsearch_sender.py:

from elasticsearch import Elasticsearch, helpers
import time
import uuid

class ElasticsearchSender:
    def __init__(self, host, api_key=None, index_name="logs", logger=None):
        self.logger = logger

        self.config = {
            'hosts': host,
            'timeout': 30,
            'retry_on_timeout': True,
            'max_retries': 3,
            'verify_certs': False,
            'ssl_show_warn': False,
            'use_ssl': True
        }
        if api_key:
            self.config['api_key'] = api_key
            
        self.index_name = index_name
        self.es = Elasticsearch(**self.config)

    def test_connection(self):
        """Test Elasticsearch connection"""
        try:
            info = self.es.info()
            self.logger.info("\nElasticsearch Server Info:")
            self.logger.info(f"Version: {info['version']['number']}")
            self.logger.info(f"Cluster Name: {info['cluster_name']}")
            return True
        except Exception as e:
            self.logger.error(f"\nElasticsearch connection failed: {str(e)}")
            return False

    def send_logs(self, log_entries, batch_size=500, max_retries=3):
        """Send logs to Elasticsearch"""
        def generate_actions():
            for entry in log_entries:
                doc_data = entry.__dict__.copy()
                if 'time' in doc_data:
                    doc_data['@timestamp'] = doc_data.pop('time')

                action = {
                    '_index': self.index_name,
                    '_id': str(uuid.uuid4()),
                    '_source': doc_data
                }
                yield action

        success_count = 0
        failure_count = 0
        retry_count = 0

        while retry_count < max_retries:
            try:
                success, failed = helpers.bulk(
                    self.es,
                    generate_actions(),
                    chunk_size=batch_size,
                    raise_on_error=False,
                    raise_on_exception=False
                )
                
                success_count += success
                failure_count += len(failed) if failed else 0
                
                self.logger.info(f"\nBatch processing results:")
                self.logger.info(f"- Successfully indexed: {success_count} documents")
                self.logger.info(f"- Failed: {failure_count} documents")
                
                if not failed:
                    break
                    
                retry_count += 1
                if retry_count < max_retries:
                    self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")
                    time.sleep(2 ** retry_count)
                
            except Exception as e:
                self.logger.error(f"\nBulk indexing error: {str(e)}")
                retry_count += 1
                if retry_count < max_retries:
                    self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")
                    time.sleep(2 ** retry_count)
                else:
                    self.logger.info("Maximum retry attempts reached")
                    break

        return success_count, failure_count

3. log_config.py:

import logging
import os
from datetime import UTC, datetime

def setup_logger(target_date: datetime = None, log_prefix: str = "app"):
    base_dir = os.path.dirname(os.path.abspath(__file__))
    log_dir = os.path.join(base_dir, 'logs')

    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    
    current_time = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
    target_date_str = target_date.strftime("%Y%m%d") if target_date else "None"
    log_file = os.path.join(log_dir, f'{log_prefix}_target_date_{target_date_str}_export_at_{current_time}.log')
    
    logger = logging.getLogger('AccessLog')
    logger.setLevel(logging.INFO)
    
    file_handler = logging.FileHandler(log_file, encoding='utf-8')
    file_handler.setLevel(logging.INFO)
    
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)
    
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    
    return logger

4. models.py:

from dataclasses import dataclass
from datetime import datetime
import re
from typing import Optional

@dataclass
class StorageOperationLog:
    time: datetime
    category: Optional[str]
    operationName: Optional[str]
    callerIpAddress: Optional[str]
    location: Optional[str]
    uri: Optional[str]
    durationMs: Optional[int]
    referrerHeader: Optional[str]
    userAgentHeader: Optional[str]
    requestBodySize: Optional[int]
    responseBodySize: Optional[int]
    serverLatencyMs: Optional[int]
    objectKey: Optional[str]
    functionName: Optional[str]
    file_extension: Optional[str]

    @staticmethod
    def parse_object_key(object_key: str, logger=None) -> tuple[Optional[str], Optional[str]]:
        """Parse objectKey to get institution_id and functionName"""
        try:
            container_match = re.search(r'container-(\d+)', object_key)

            parts = object_key.split('/')
            function_name = None
            if container_match:
                container_index = next((i for i, part in enumerate(parts) 
                                    if 'container-' in part), None)
                if container_index is not None and container_index + 1 < len(parts):
                    function_name = parts[container_index + 1]

            file_extension = None
            if parts and '.' in parts[-1]:
                file_extension = parts[-1].split('.')[-1].lower()

            return function_name, file_extension
        except Exception as e:
            if logger:
                logger.error(f"Error parsing object_key {object_key}: {str(e)}")
            return None, None

    @classmethod
    def from_log_entry(cls, entry: dict[str, any], logger=None) -> Optional['StorageOperationLog']:
        """Create StorageOperationLog instance from raw log entry"""
        try:
            properties = entry.get('properties', {})
            object_key = properties.get('objectKey', '')
            function_name, file_extension = cls.parse_object_key(object_key)
            
            return cls(
                time=entry.get('time'),
                category=entry.get('category'),
                operationName=entry.get('operationName'),
                callerIpAddress=entry.get('callerIpAddress'),
                location=entry.get('location'),
                uri=entry.get('uri'),
                durationMs=int(entry.get('durationMs')) if entry.get('durationMs') is not None else None,
                referrerHeader=properties.get('referrerHeader'),
                userAgentHeader=properties.get('userAgentHeader'),
                requestBodySize=int(properties.get('requestBodySize')) if properties.get('requestBodySize') is not None else None,
                responseBodySize=int(properties.get('responseBodySize')) if properties.get('responseBodySize') is not None else None,
                serverLatencyMs=int(properties.get('serverLatencyMs')) if properties.get('serverLatencyMs') is not None else None,
                objectKey=object_key,
                functionName=function_name,
                file_extension=file_extension
            )
        except Exception as e:
            if logger:
                logger.error(f"Error creating StorageOperationLog: {str(e)}")
            return None

    def __post_init__(self):
        if isinstance(self.time, str):
            if 'Z' in self.time:
                time_parts = self.time.split('.')
                if len(time_parts) > 1:
                    microseconds = time_parts[1].replace('Z', '')[:6]
                    time_str = f"{time_parts[0]}.{microseconds}Z"
                    self.time = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")
                else:
                    self.time = datetime.strptime(self.time, "%Y-%m-%dT%H:%M:%SZ")

5. main.py:

from log_config import setup_logger
from azure_log_downloader import AzureLogDownloader
from elasticsearch_sender import ElasticsearchSender
from datetime import datetime, timedelta, UTC
from dotenv import load_dotenv
import os

load_dotenv()

def _get_index_name(target_date: datetime):
    """Get full index name for the specified date"""
    return os.getenv('ELASTICSEARCH_INDEX_TEMPLATE', 'logs-{year}-{month}').format(
        year=target_date.year,
        month=target_date.month
    )

def _get_blob_name_list(target_date: datetime):
    """Get blob paths for all hours of the specified date"""
    blobs = []
    for hour in range(24):
        blob_time = target_date.replace(hour=hour, minute=0, second=0, microsecond=0)
        blob_name = os.getenv('AZURE_STORAGE_BLOB_TEMPLATE', 'logs/y={year}/m={month}/d={day}/h={hour}').format(
            year=blob_time.year,
            month=blob_time.month,
            day=blob_time.day,
            hour=blob_time.hour
        )
        blobs.append(blob_name)
    return blobs

def main():
    start_date = datetime(2024, 1, 1, tzinfo=UTC)
    end_date = datetime(2024, 1, 2, tzinfo=UTC)

    current_date = start_date
    while current_date <= end_date:
        target_date = current_date
        logger = setup_logger(target_date, os.getenv('LOG_PREFIX', 'app'))

        try:
            logger.info(f"\nProcessing data for {current_date.date()}")
            elasticsearch_index = _get_index_name(target_date)

            sender = ElasticsearchSender(
                os.getenv('ELASTICSEARCH_HOST', 'http://localhost:9200'),
                os.getenv('ELASTICSEARCH_API_KEY'),
                elasticsearch_index,
                logger
            )

            if not sender.test_connection():
                logger.error("Elasticsearch connection failed")
                current_date += timedelta(days=1)
                continue

            total_logs = total_success = total_failed = 0
            blobs = _get_blob_name_list(target_date)

            for container in os.getenv('AZURE_STORAGE_CONTAINERS', 'logs').split(','):
                logger.info(f"\nProcessing container: {container}")
                
                for blob_name in blobs:
                    logger.info(f"\nProcessing blob: {blob_name}")
                    
                    downloader = AzureLogDownloader(
                        os.getenv('AZURE_STORAGE_URI'),
                        container,
                        blob_name,
                        logger
                    )
                    
                    try:
                        log_entries = downloader.download_and_transform()
                        success, failed = sender.send_logs(log_entries)
                        
                        total_logs += len(log_entries)
                        total_success += success
                        total_failed += failed
                        
                    except Exception as e:
                        logger.error(f"Error processing {blob_name}: {str(e)}")
                        continue

            logger.info(f"\n{current_date.date()} Processing completed:")
            logger.info(f"Total documents processed: {total_logs}")
            logger.info(f"Successfully indexed: {total_success}")
            logger.info(f"Failed: {total_failed}")

        finally:
            for handler in logger.handlers[:]:
                handler.close()
                logger.removeHandler(handler)

        current_date += timedelta(days=1)

if __name__ == "__main__":
    main()

6. .env :

ELASTICSEARCH_HOST=http://localhost:9200
ELASTICSEARCH_API_KEY=your_api_key
ELASTICSEARCH_INDEX_TEMPLATE=logs-{year}-{month}
AZURE_STORAGE_URI=your_storage_connection_string
AZURE_STORAGE_CONTAINERS=logs
AZURE_STORAGE_BLOB_TEMPLATE=logs/y={year}/m={month}/d={day}/h={hour}
LOG_PREFIX=app


前情后续:

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1.标准版)-CSDN博客

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2.换掉付费的Event Hubs)-CSDN博客

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3.纯python的实惠版)-CSDN博客




本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2280050.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

什么是三高架构?

大家好&#xff0c;我是锋哥。今天分享关于【什么是三高架构?】面试题。希望对大家有帮助&#xff1b; 什么是三高架构? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 “三高架构”通常是指高可用性&#xff08;High Availability&#xff09;、高性能&#xff…

[微服务]注册中心优化

环境隔离 企业实际开发中&#xff0c;往往会搭建多个运行环境&#xff0c;例如&#xff1a; 开发环境测试环境预发布环境生产环境 这些不同环境之间的服务和数据之间需要隔离。 还有的企业中&#xff0c;会开发多个项目&#xff0c;共享nacos集群。此时&#xff0c;这些项目…

uniapp——App 监听下载文件状态,打开文件(三)

5 实现下载文件并打开 这里演示&#xff0c;导出Excel 表格 文章目录 5 实现下载文件并打开DEMO监听下载进度效果图为什么 totalSize 一直为0&#xff1f; 相关Api&#xff1a; downloader DEMO 提示&#xff1a; 请求方式支持&#xff1a;GET、POST&#xff1b;POST 方式需要…

地图:nuxt3高德地图简单使用 / nuxt2 + amap

一、官方网站 JS API 安全密钥使用-基础-进阶教程-地图 JS API 2.0 | 高德地图API 二、使用 2.1、创建应用 2.2、添加key&#xff0c;得到key值 2.3、nuxt3项目 引入amap 2.4、pages/map.vue <template><div class"container"><div id"map-co…

Java面试专题——面向对象

面向过程和面向对象的区别 面向过程&#xff1a;当事件比较简单的时候&#xff0c;利用面向过程&#xff0c;注重的是事件的具体的步骤/过程&#xff0c;注重的是过程中的具体的行为&#xff0c;以函数为最小单位&#xff0c;考虑怎么做。 面向对象&#xff1a;注重找“参与者…

Flowable 管理各业务流程:流程设计器 (获取流程模型 XML)、流程部署、启动流程、流程审批、流程挂起和激活、任务分配

文章目录 引言I 表结构主要表前缀及其用途核心表II 流程设计器(Flowable BPMN模型编辑器插件)Flowable-UIvue插件III 流程部署部署步骤例子:根据流程模型ID部署IV 启动流程启动步骤ACT_RE_PROCDEF:流程定义相关信息例子:根据流程 ID 启动流程V 流程审批审批步骤Flowable 审…

VIVADO ILA IP进阶使用之任意设置ILA的采样频率

VIVADO ILA IP进阶使用之任意设置ILA的采样频率 VIVADO ILA IP和VIO IP结合使用任意设置ILA的采样频率 目录 前言 一、VIO IP的配置 二、ILA IP的配置 三、测试代码 四、测试结果 总结 前言 VIVADO中编写完程序上板测试时经常会用到viavdo自带的ILA逻辑分析仪IP核&#x…

【机器学习实战入门】使用LSTM机器学习预测股票价格

机器学习在股票价格预测中有重要的应用。在这个机器学习项目中&#xff0c;我们将讨论如何预测股票的收益。这是一个非常复杂的任务&#xff0c;充满了不确定性。我们将会把这个项目分成两部分进行开发&#xff1a; 首先&#xff0c;我们将学习如何使用 LSTM 神经网络预测股票…

DilateFormer: Multi-Scale Dilated Transformer for Visual Recognition 中的空洞自注意力机制

空洞自注意力机制 文章目录 摘要1. 模型解释1.1. 滑动窗口扩张注意力1.2. 多尺度扩张注意力 2. 代码3. 流程图3.1. MultiDilatelocalAttention3.2. DilateAttention3.3. MLP 摘要 本文针对DilateFormer中的空洞自注意力机制原理和代码进行详细介绍&#xff0c;最后通过流程图梳…

大模型GUI系列论文阅读 DAY2续:《一个具备规划、长上下文理解和程序合成能力的真实世界Web代理》

摘要 预训练的大语言模型&#xff08;LLMs&#xff09;近年来在自主网页自动化方面实现了更好的泛化能力和样本效率。然而&#xff0c;在真实世界的网站上&#xff0c;其性能仍然受到以下问题的影响&#xff1a;(1) 开放领域的复杂性&#xff0c;(2) 有限的上下文长度&#xff…

Qt按钮美化教程

前言 Qt按钮美化主要有三种方式&#xff1a;QSS、属性和自绘 QSS 字体大小 font-size: 18px;文字颜色 color: white;背景颜色 background-color: rgb(10,88,163); 按钮边框 border: 2px solid rgb(114,188,51);文字对齐 text-align: left;左侧内边距 padding-left: 10…

云IDE:开启软件开发的未来篇章

敖行客一直致力于将整个研发协作流程线上化&#xff0c;从而打破物理环境依赖&#xff0c;让研发组织模式更加灵活、自由且高效&#xff0c;今天就来聊聊AT Work&#xff08;一站式研发协作平台&#xff09;的重要组成部分-云IDE。 在科技领域&#xff0c;历史常常是未来的风向…

AI agent 在 6G 网络应用,无人机群控场景

AI agent 在 6G 网络应用,无人机群控场景 随着 6G 时代的临近,融合人工智能成为关键趋势。借鉴 IT 行业 AI Agent 应用范式,提出 6G AI Agent 技术框架,包含多模型融合、定制化 Agent 和插件式环境交互理念,构建了涵盖四层结构的框架。通过各层协同实现自主环境感知等能力…

【Linux 重装】Ubuntu 启动盘 U盘无法被识别,如何处理?

背景 U盘烧录了 Ubuntu 系统作为启动盘&#xff0c;再次插入电脑后无法被识别 解决方案&#xff08;Mac 适用&#xff09; &#xff08;1&#xff09;查找 USB&#xff0c;&#xff08;2&#xff09;格式化&#xff08;1&#xff09;在 terminal 中通过 diskutil list 查看是…

【优选算法篇】2----复写零

---------------------------------------begin--------------------------------------- 这道算法题相对于移动零&#xff0c;就上了一点点强度咯&#xff0c;不过还是很容易理解的啦~ 题目解析&#xff1a; 这道题如果没理解好题目&#xff0c;是很难的&#xff0c;但理解题…

高效建站指南:通过Portainer快速搭建自己的在线网站

文章目录 前言1. 安装Portainer1.1 访问Portainer Web界面 2. 使用Portainer创建Nginx容器3. 将Web静态站点实现公网访问4. 配置Web站点公网访问地址4.1公网访问Web站点 5. 固定Web静态站点公网地址6. 固定公网地址访问Web静态站点 前言 Portainer是一个开源的Docker轻量级可视…

redis性能优化参考——筑梦之路

基准性能测试 redis响应延迟耗时多长判定为慢&#xff1f; 比如机器硬件配置比较差&#xff0c;响应延迟10毫秒&#xff0c;就认为是慢&#xff0c;机器硬件配置比较高&#xff0c;响应延迟0.5毫秒&#xff0c;就认为是慢。这个没有固定的标准&#xff0c;只有了解了你的 Red…

Python 入门教程(2)搭建环境 | 2.3、VSCode配置Python开发环境

文章目录 一、VSCode配置Python开发环境1、软件安装2、安装Python插件3、配置Python环境4、包管理5、调试程序 前言 Visual Studio Code&#xff08;简称VSCode&#xff09;以其强大的功能和灵活的扩展性&#xff0c;成为了许多开发者的首选。本文将详细介绍如何在VSCode中配置…

Trimble三维激光扫描-地下公共设施维护的新途径【沪敖3D】

三维激光扫描技术生成了复杂隧道网络的高度详细的三维模型 项目背景 纽约州北部的地下通道网络已有100年历史&#xff0c;其中包含供暖系统、电线和其他公用设施&#xff0c;现在已经开始显露出老化迹象。由于安全原因&#xff0c;第三方的进入受到限制&#xff0c;在没有现成纸…

【强化学习】策略梯度(Policy Gradient,PG)算法

&#x1f4e2;本篇文章是博主强化学习&#xff08;RL&#xff09;领域学习时&#xff0c;用于个人学习、研究或者欣赏使用&#xff0c;并基于博主对相关等领域的一些理解而记录的学习摘录和笔记&#xff0c;若有不当和侵权之处&#xff0c;指出后将会立即改正&#xff0c;还望谅…