提高错误日志处理效率!使用Python和钉钉机器人实现自动告警聚合

news2025/1/12 1:57:56

1、背景

日志是非常重要的信息资源。它们记录了应用程序的运行状态、错误和异常情况,帮助我们了解系统的健康状况以及发现潜在的问题。为了高效地管理和分析日志数据,许多组织采用了Elasticsearch、Logstash和Kibana(ELK)堆栈作为日志收集和分析的解决方案。

开发一个实时监控和告警脚本,专门用于监控ELK平台中的错误日志,并及时发送告警通知给相关人员。该系统将通过扫描Elasticsearch中的日志数据,筛选出等级为ERROR的错误日志,并根据预设的告警规则进行处理。

2、目的

使用Python从Elasticsearch中查询特定级别为ERROR的错误日志,并通过钉钉机器人实现告警聚合和发送,以提高错误日志的处理效率和及时响应能力。

为什么开发这个脚本?
因为目前我们这边没有监控日志的信息,出现问题不能及时发现 和预知
优势
1、消息进行聚合,每个项目的多条告警信息,汇总一条发送。突破钉钉机器人每分钟只能发送20条的限制
2、告警信息you太多的重复,进行去重处理,添加告警次数发送。防止被钉钉限流
在这里插入图片描述

3、原理

  1. 使用Python的Elasticsearch库连接到Elasticsearch集群。
  2. 构建Elasticsearch查询DSL(领域专用语言),过滤出级别为ERROR的日志记录。
  3. 执行查询并获取结果。
  4. 对查询结果进行聚合,统计每个项目的错误次数。
  5. 根据聚合结果,生成告警消息的Markdown格式内容。
  6. 使用钉钉机器人发送告警消息到指定的钉钉群。

4、流程

  1. 导入必要的Python库,包括elasticsearchrequests
  2. 创建Elasticsearch连接,指定Elasticsearch集群的主机和端口。
  3. 构建Elasticsearch查询DSL,设置查询条件为日志级别为ERROR。
  4. 执行查询,获取查询结果。
  5. 对查询结果进行处理,聚合每个项目的错误次数。
  6. 根据聚合结果生成告警消息的Markdown内容。
  7. 使用钉钉机器人API发送告警消息到指定的钉钉群。

5、实现代码

在这里插入图片描述


# -*- coding: utf-8 -*-
# @Time    : 2023/6/17 18:11
# @Author  : 南宫乘风
# @Email   : 1794748404@qq.com
# @File    : all_es.py
# @Software: PyCharm
from collections import Counter
from datetime import datetime, timedelta

import requests
from elasticsearch import Elasticsearch

from monitor.es_ding import send_pretty_message

# Elasticsearch客户端实例
es = Elasticsearch(hosts=['http://172.18.xxx.xxxx:9200'], http_auth=('elastic', 'xxxxx'),
                   sniff_on_start=True,  # 连接前测试
                   sniff_on_connection_fail=True,  # 节点无响应时刷新节点
                   sniff_timeout=300,  # 设置超时时间
                   headers={'Content-Type': 'application/json'})


def format_timestamp(timestamp):
    """格式化时间为Elasticsearch接受的字符串格式"""
    return timestamp.strftime("%Y-%m-%d %H:%M:%S")


def search_errors():
    """执行查询,获取错误日志数据"""
    current_time = datetime.now()
    one_minute_ago = current_time - timedelta(minutes=10)
    current_time_str = format_timestamp(current_time)
    one_minute_ago_str = format_timestamp(one_minute_ago)

    index = 'app-prod-*'  # 替换为实际的索引名称

    query = {
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            "@timestamp": {
                                "gte": one_minute_ago_str,
                                "lt": current_time_str,
                                "format": "yyyy-MM-dd HH:mm:ss",
                                "time_zone": "+08:00"
                            }
                        }
                    },
                    {
                        "match": {
                            "loglevel": "ERROR" #匹配项目错误等级
                        }
                    },
                    {
                        "bool": {
                            "must_not": [
                                {
                                    "match": {
                                        "projectname": "fox-data-spiderman" # 需要屏蔽的项目
                                    }
                                }
                            ]
                        }
                    }
                ]
            }
        },
        "_source": [  ## 输出的字段
            "date",
            "projectname",
            "threadname",
            "msg"
        ],
        "from": 0,
        "size": 10000, # 返回查询的条数
    }

    result = es.search(index=index, body=query)
    total_documents = result["hits"]["total"]["value"]
    print(f"总共匹配到 {total_documents} 条文档")

    result = result['hits']['hits']
    all_result = []

    for i in result:
        all_result.append(i['_source'])

    msg_counter = Counter(d['msg'] for d in all_result if 'msg' in d)
    results = []

    for d in all_result:
        if 'msg' in d and d['msg'] in msg_counter:
            count = msg_counter[d['msg']]
            del msg_counter[d['msg']]
            d['count'] = count
            d['msg'] = d['msg'][:100] + ('...' if len(d['msg']) > 100 else '')
            results.append(d)

    return results


def aggregate_errors(results):
    """按项目名称聚合错误日志"""
    aggregated_data = {}
    for d in results:
        projectname = d.get('projectname')
        if projectname:
            if projectname not in aggregated_data:
                aggregated_data[projectname] = []
            aggregated_data[projectname].append({'date': d.get('date'), 'msg': d.get('msg'), 'count': d.get('count')})
    return aggregated_data


def generate_summary(projectname, messages):
    """生成Markdown格式的消息摘要"""
    markdown_text = f'### {projectname} \n\n'
    for message in messages:
        markdown_text += f"**时间:** {message['date']}\n\n"
        markdown_text += f"**告警次数:** <font color='red'><b>{message['count']}</b></font>\n\n"
        markdown_text += f"{message['msg']}\n\n---\n\n"
    return markdown_text


def send_message_summary(projectname, messages):
    """发送摘要消息给钉钉机器人"""
    summary = generate_summary(projectname, messages)
    data = {
        'msgtype': 'markdown',
        'markdown': {
            'title': f'{projectname}消息告警',
            'text': summary
        }
    }
    webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxxxxxxxxxxx'  # 替换为实际的Webhook URL
    response = requests.post(webhook_url, json=data)
    if response.status_code == 200:
        print('消息发送成功')
    else:
        print('消息发送失败')


if __name__ == '__main__':
    errors = search_errors()
    aggregated_errors = aggregate_errors(errors)

    for projectname, messages in aggregated_errors.items():
        print(f"{projectname}:")
        print(messages)

在这里插入图片描述

6、Crontab添加定时任务

也可以用采用:Jenkins与GitLab的定时任务工作流程
https://blog.csdn.net/heian_99/article/details/131164591?spm=1001.2014.3001.5501

#日志
*/2 * * * * cd /python_app/elasticsearch; /opt/anaconda3/envs/py38/bin/python -u  es_monitor.py >> es_error_info.log 2>&1

该定时任务的含义是每隔2分钟执行一次指定目录下的 es_monitor.py 脚本,并将输出信息追加到 es_error_info.log 文件中。这样可以定期监控 Elasticsearch 的错误日志,并记录相关信息以便后续查看和分析。

7、总结

本博客,为我们构建了一个完整的应用日志监控和告警系统,通过ELK技术栈和钉钉机器人的结合,使得我们能够及时发现和处理应用中的错误,提高了团队的工作效率和系统的稳定性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/656940.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Eclipse Krazo(Jakarta MVC)的使用

文章目录 背景Jakarta MVC规范Eclipse Krazo使用前的思考全局配置Controller示例返回View的三种写法View中用到的Model如何设值&#xff1f;View中如何获取Model中的值&#xff1f; 参数校验防止CSRFKrazo是如何实现的呢&#xff1f;如何生成csrf的token&#xff1f;如何校验cs…

开源赋能,决胜未来 — 参加原子全球开源峰会有感

目录 文章目录 目录前言开源决胜未来&#xff1a;闭源摧毁 UNIX&#xff0c;开源成就 Linux开源创新&#xff1a;软硬件协同&#xff0c;共建开源生态 前言 开源原子基金会作为国内首家开源基金会组织&#xff0c;由其主办的首届 “开放原子全球开源峰会” 也是第一次被冠以 “…

干货文:Mac 中 .bash_profile 和 .zshrc 的区别

如果你想在 Mac OS 中配置 MySQL 的环境变量&#xff0c;在 .zshrc 文件中添加如下内容&#xff1a; # 设置 mysql 的路径 export MYSQL_HOME/usr/local/mysql/bin# 将 MYSQL_HOME 添加到 PATH 中 export PATH$HOME/bin:/usr/local/bin:$MYSQL_HOME:$PATH# 解决需要 source 才…

硬件【9】详解二极管钳位电路

文章目录 1 概述1.1 正向钳位电路1.2 偏置正向钳位电路 1 概述 在之前的 二极管限幅电路 一文中&#xff0c;我们学习了二极管限幅电路&#xff0c;该电路可以削掉一部分信号&#xff0c;但不会影响剩余信号。今天&#xff0c;我们将学习另一种基于二极管的电路&#xff0c;该…

搭建环境【2】windows主机和ubuntu互传文件的4种方法

我的ubuntu系统是安装在 VMware 虚拟机中的&#xff0c;两者之间经常要互传文件&#xff0c;下面介绍4种常用的互传文件方法。 1. 共享文件夹方式互传 在虚拟机中需要开启共享文件夹的功能。首先虚拟机中的ubuntu要求是已经开机了的状态&#xff0c;然后进行设置&#xff1a;…

Vue2封装一个全局通知组件并发布到NPM

✍&#x1f3fc;作者&#xff1a;周棋洛&#xff0c;计算机学生 ♉星座&#xff1a;金牛座 &#x1f3e0;主页&#xff1a;点击查看更多 &#x1f310;关键&#xff1a;vue2 组件封装 npm发包 文章目录 1. 前言 &#x1f343;2. 我为什么要封装通知插件 ❓3. 初始化vue空项目 &…

B047-cms02-高级查询 删除 添加 修改

目录 高级查询页面准备下拉框显示文章类型ArticleController用jstl和el表达式取值展示 高级查询参数ArticleQuery 高级查询页面发送请求导入jquery.jdirk.js在jquery下引用绑定按钮发送请求高级查询sql 绑定删除事件绑定事件拿到标签id值准备模态框来自xmind弹出删除模态框绑定…

matlab不显示子图刻度并调整子图间距

matlab中在使用subplot函数画图时&#xff0c;尤其是做emd分解查看IMF时&#xff0c; 正常画图的代码及结果如下&#xff1a; figure for i 1:size(imf_norm,1)subplot(7,1,i)plot(imf_norm(i,:))ylabel(IMFstring(i)) end其中imf_norm为分解得到的imfs 效果图&#xff1a; …

python里apply用法_Python apply函数的用法

Python编程语言Python 是一种面向对象、解释型计算机程序设计语言&#xff0c;由Guido van Rossum于1989年底发明&#xff0c;第一个公开发行版发行于1991年。Python语法简洁而清晰&#xff0c;具有丰富和强大的类库。它常被昵称为胶水语言&#xff0c;它能够把用其他语言制作的…

HDL抽象等级 仿真模型 网表 delay speicfy与sdf

1.HDL 硬件描述语言 抽象分级 HDL这里主要说verilog 在描述硬件电路时分为三个抽象级别 行为级模型&#xff1a;主要用于test bench&#xff0c;着重系统行为和算法&#xff0c;不在于电路实现&#xff0c;不可综合&#xff08;常用描述有initial&#xff0c;fork/join&#…

【MYSQL】MYSQL应用环境,系统特征,储存引擎,应用框架和索引功能的详细讲解

作者简介&#xff1a; 辭七七&#xff0c;目前大一&#xff0c;正在学习C/C&#xff0c;Java&#xff0c;Python等 作者主页&#xff1a; 七七的个人主页 文章收录专栏&#xff1a; 七七的闲谈 欢迎大家点赞 &#x1f44d; 收藏 ⭐ 加关注哦&#xff01;&#x1f496;&#x1f…

DINO-DETR匈牙利匹配与加噪过程学习记录

今天再来回顾一下DINO中匈牙利匹配与损失函数部分&#xff0c;该部分大致与DETR相似&#xff0c;却又略有不同。 为了查看数据方便&#xff0c;博主将num_query改为20&#xff0c;max_select值也为20。 匈牙利匹配过程 首先是数据送入匈牙利匹配中进行标签匹配过程了。 获取…

qt.qpa.plugin: Could not load the Qt platform plugin “xcb“ in

兄弟们看看是不是这个错&#xff1a; QObject::moveToThread: Current thread (0xe5205f0) is not the objects thread (0xa14d0f0). Cannot move to target thread (0xe5205f0)qt.qpa.plugin: Could not load the Qt platform plugin "xcb" in "/xxx/python3.…

Esp32+Blynk实现云端控制LED开灭

目录 环境配置依赖库安装blynk 基础设置 GPIO 点灯实验 环境配置 依赖库安装 参考 blynk 官方快速上手文档 如果要使用 blynk 提供的环境&#xff0c;我们就必须安装对应的库 选择基于 blynk 且适用于 ESP32 的库并安装到 arduino 上&#xff1a; blynk 基础设置 进入官网并且…

Question1:harbor登录成功,推送镜像失败

denied: requested access to the resource is denied 解决方案 查看用户的权限 Harbor 用户角色权限速查 系统级角色&#xff1a; Harbor 系统管理员&#xff1a;“Harbor 系统管理员”拥有最多的权限。除了上述权限外&#xff0c;“Harbor 系统管理员”还可以列出所有项目、…

一种令人拍案叫绝的 ChatGPT 攻击手段!

公众号关注 “GitHubDaily” 设为 “星标”&#xff0c;每天带你逛 GitHub&#xff01; 最近看到一个非常巧妙的 ChatGPT 攻击手段&#xff0c;跟大家分享一下&#xff0c;也算是做个提醒。 不论你是否懂技术&#xff0c;我都建议你了解一下这种攻击手段&#xff0c;有备无患。…

Golang的trace性能分析

文章目录 一、trace概述二、trace的使用方式代码中trace采集通过pprof采集 三、trace分析细节trace的web界面trace中需要关注的关注GC的频率关注goroutine调度情况关注goroutine的数量理想情况 四、GC分析当前服务GC情况设置GOGC设置GOMEMLIMITGC阈值的讨论GC的特点 五、gorout…

【每日挠头算法题(8)】最后一个单词的长度|重新排列字符串

文章目录 一、最后一个单词的长度思路1&#xff1a;从后往前遍历具体代码如下&#xff1a; 思路2&#xff1a;具体代码如下&#xff1a; 二、重新排列字符串思路具体代码如下&#xff1a; 一、最后一个单词的长度 点我直达~ 思路1&#xff1a;从后往前遍历 从后往前遍历&…

Stable DiffusionAI绘画一键启动整合包

点击"仙网攻城狮”关注我们哦~ 不当想研发的渗透人不是好运维 让我们每天进步一点点 简介 搞了个Stable DiffusionAI绘画整合包&#xff0c;里面有二次元风格、3D风格、真人模型&#xff0c;需要的后台回复“AI绘画”即可获取下载链接,放几个用SD生成的图。 实战 1.下载好…

调用万维易源API实现图像性别转换

目录 1、作者介绍2、调用万维易源API2.1 API介绍2.2 API调用过程 3、代码实现3.1 实现步骤3.2 完整代码 4、问题与分析 1、作者介绍 梁随欣&#xff0c;男&#xff0c;西安工程大学电子信息学院&#xff0c;2022级研究生 研究方向&#xff1a;模式识别与人工智能 电子邮箱&…