Python本地下载-实例的SQL审计日志

news2025/1/13 7:32:43

简介:使用阿里云的RDS数据库,开启DAS的数据库治理服务。会产生大量的审计日志。

我们有2T的审计日志数据,保留180天,每小时收费空间:0.008元/GB/小时

计算下来:2x1024x 24x 30 x 0.008 =11796 元

 解决:打算数据量存储30天,以前的审计日志,可以使用阿里云的API 调用,下载,归档报错。如果出现问题,可以及时定位。保留周期更长。费用更少。

阿里云API接口:查询实例的SQL审计日志 (aliyun.com)

 目前使用这个API接口拉取,基本相关信息已经存在。

高级一点可以采用 一下API拉取日志

按照访问来源统计全量请求数据的API接口_数据库自治服务-阿里云帮助中心

GetFullRequestStatResultByInstanceId - 按照SQL ID异步统计全量请求数据 (aliyun.com)

环境准备

1、Python 3.8 的环境

2、安装阿里云的sdk

        SDK 包名称alibabacloud_rds20140815

        SDK 版本2.1.2

        SDK 包管理平台pypi

SDK 安装命令

pip install alibabacloud_rds20140815==2.1.2

提示仓库同步可能会有延迟,如果遇到版本不存在的情况,请稍后再试或使用上一个版本

阿里云 OpenAPI 开发者门户 (aliyun.com)

基础代码

此代码是阿里云自动生成的

一个执行,一个异步执行。选择其中一个即可。 

# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
import sys

from typing import List

from alibabacloud_rds20140815.client import Client as Rds20140815Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_rds20140815 import models as rds_20140815_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client(
        access_key_id: str,
        access_key_secret: str,
    ) -> Rds20140815Client:
        """
        使用AK&SK初始化账号Client
        @param access_key_id:
        @param access_key_secret:
        @return: Client
        @throws Exception
        """
        config = open_api_models.Config(
            # 必填,您的 AccessKey ID,
            access_key_id=access_key_id,
            # 必填,您的 AccessKey Secret,
            access_key_secret=access_key_secret
        )
        # 访问的域名
        config.endpoint = f'rds.aliyuncs.com'
        return Rds20140815Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        # 工程代码泄露可能会导致AccessKey泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考,建议使用更安全的 STS 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378659.html
        client = Sample.create_client('accessKeyId', 'accessKeySecret')
        describe_sqllog_records_request = rds_20140815_models.DescribeSQLLogRecordsRequest(
            dbinstance_id='xxxxxx',
            start_time='2022-11-18T00:00:00Z',
            end_time='2022-11-19T00:00:00Z',
            page_size=100,
            page_number=1
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            client.describe_sqllog_records_with_options(describe_sqllog_records_request, runtime)
        except Exception as error:
            # 如有需要,请打印 error
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        # 工程代码泄露可能会导致AccessKey泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考,建议使用更安全的 STS 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378659.html
        client = Sample.create_client('accessKeyId', 'accessKeySecret')
        describe_sqllog_records_request = rds_20140815_models.DescribeSQLLogRecordsRequest(
            dbinstance_id='xxxxxx',
            start_time='2022-11-18T00:00:00Z',
            end_time='2022-11-19T00:00:00Z',
            page_size=100,
            page_number=1
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            await client.describe_sqllog_records_with_options_async(describe_sqllog_records_request, runtime)
        except Exception as error:
            # 如有需要,请打印 error
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

 

2、根据小时拉去代码

因为数据量很大,我们准备根据  小时  拉去日志。

 代码介绍:

根据每小时拉去文件,进行按时间保存,拉去一天的量完成后 会钉钉通知

import datetime
import json
import os
import sys
from time import sleep

import requests
from alibabacloud_rds20140815.client import Client as Rds20140815Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_rds20140815 import models as rds_20140815_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient

import copy


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client(
            access_key_id: str,
            access_key_secret: str,
    ) -> Rds20140815Client:
        """
        使用AK&SK初始化账号Client
        @param access_key_id:
        @param access_key_secret:
        @return: Client
        @throws Exception
        """
        config = open_api_models.Config(
            # 必填,您的 AccessKey ID,
            access_key_id=access_key_id,
            # 必填,您的 AccessKey Secret,
            access_key_secret=access_key_secret
        )
        # 访问的域名
        config.endpoint = f'rds.aliyuncs.com'
        return Rds20140815Client(config)

    @staticmethod
    def main(page_number=1, startTime=None, endTime=None):
        # 工程代码泄露可能会导致AccessKey泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考,建议使用更安全的 STS 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378659.html
        client = Sample.create_client('xxxxxxxxxxxxxx', 'xxxxxxxxxxxxxxxxx')
        describe_sqllog_records_request = rds_20140815_models.DescribeSQLLogRecordsRequest(
            #数据库实例ID
            dbinstance_id='rm-xxxxxxxxxxxxxxxxx',
            end_time=endTime,
            start_time=startTime,
            page_size=100,
            page_number=page_number
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            data = client.describe_sqllog_records_with_options(describe_sqllog_records_request, runtime)
            return data
        except Exception as error:
            # 如有需要,请打印 error
            UtilClient.assert_as_string(error.message)


def msg(text):
    json_text = {
        "msgtype": "text",
        "at": {
            "atMobiles": [
                "11111"
            ],
            "isAtAll": False
        },
        "text": {
            "content": text
        }
    }
    print(requests.post(api_url, json.dumps(json_text), headers=headers).content)


if __name__ == '__main__':
    startTime = '2022-11-19T00:00:00Z'
    for i in range(24):
        endTime = (datetime.datetime.strptime(startTime, "%Y-%m-%dT%H:%M:%SZ") + datetime.timedelta(
            hours=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
        rds_time = datetime.datetime.strptime(startTime, "%Y-%m-%dT%H:%M:%SZ")
        rds_time_file_log = rds_time.strftime("%Y-%m-%d_%H")  # print(rds_time_file)
        rds_file = rds_time.strftime("%Y-%m-%d")  # print(rds_time_file)
        folder = os.path.join(os.path.abspath(os.path.dirname(__file__)), rds_file)
        print(folder)
        log_path = os.path.exists(folder)
        if not log_path:
            os.makedirs(folder)

        print(startTime, endTime)
        rds = Sample.main(startTime=startTime, endTime=endTime)
        # 总条数
        rds_num = rds.body.total_record_count

        # 页数
        rds_page = rds.body.page_number

        # page的num数量
        pag_num_max = 100
        # 每页返回的数值
        page_record_count = rds.body.page_record_count

        # 总页数
        page_num_sum = int(rds_num / pag_num_max) + 1

        rds_log = rds.body.items.to_map()
        for i in range(page_num_sum + 1):
            print(i + 1)
            num = i + 1
            rds_one = Sample.main(page_number=num, startTime=startTime, endTime=endTime)
            page_record_count_one = rds_one.body.page_record_count
            if page_record_count_one != 0:
                rds_log_one = rds_one.body.items.to_map()
                # 获取rds 的真实日志数据
                for v in rds_log_one['SQLRecord']:
                    with open(folder + '/' + 'rds_' + rds_time_file_log + '.log', 'a', encoding="utf-8") as f:
                        f.write(f"{str(v)} \n")
            if (i + 1) % 2000 == 0:
                sleep(10)
        f.close()

        startTime = endTime  # 参数days=1(天+1) 可以换成 minutes=1(分钟+1)、seconds=1(秒+1)

    token = "xxxxxxxxxxxxxxxxxxxxxxxx"
    text = "python拉去数据"

    headers = {'Content-Type': 'application/json;charset=utf-8'}
    api_url = "https://oapi.dingtalk.com/robot/send?access_token=%s" % token
    msg('RDS数据拉去-完成告警')

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/101372.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分类预测 | MATLAB实现CNN-LSTM卷积长短期记忆神经网络分类预测(语音分类)

分类预测 | MATLAB实现CNN-LSTM卷积长短期记忆神经网络分类预测(语音分类) 目录 分类预测 | MATLAB实现CNN-LSTM卷积长短期记忆神经网络分类预测(语音分类)基本描述模型结构设计过程参考资料基本描述 传统的语音识别技术,主要在隐马尔可夫模型和高斯混合模型两大”神器“的加持…

[element plus] 分页组件使用 - vue

学习关键语句: 饿了么加组件分页组件 element分页组件 vue3 饿了么分页组件 写在前面 必须要说明的是 , 这篇文章使用的分页组件样式包含了饿了么官方给出警告以后会弃用的部分 但是问题是什么呢? 问题就是我还没找到这个用 vmodel 绑定的使用方法 , 再加上现在也算是有点…

C++代码简化之道

1. 善用emplace C11开始STL容器出现了emplace(置入)的语义。比如 vector、map、unordered_map,甚至 stack和 queue都有。 emplace方便之处在于,可以用函数参数自动构造对象,而不是向vector的push_back,ma…

论文投稿指南——中文核心期刊推荐(电工技术2)

【前言】 🚀 想发论文怎么办?手把手教你论文如何投稿!那么,首先要搞懂投稿目标——论文期刊 🎄 在期刊论文的分布中,存在一种普遍现象:即对于某一特定的学科或专业来说,少数期刊所含…

机器人开发--霍尔元件

机器人开发--霍尔元件1 霍尔效应2 霍尔元件特点3 霍尔传感器的典型应用电流传感器位移测量测转速或转数参考1 霍尔效应 霍尔效应是电磁效应的一种,这一现象是美国物理学家霍尔于1879年在研究金属的导电机制时发现的。当电流垂直于外磁场通过半导体时,载…

ORB-SLAM2 --- Tracking::SearchLocalPoints函数解析

1.函数作用 用局部地图点进行投影匹配,得到更多的匹配关系。 局部地图点中已经是当前帧地图点的不需要再投影,只需要将此外的并且在视野范围内的点和当前帧进行投影匹配。 2.函数流程 Step 1:遍历当前帧的地图点,标记这些地图点不…

Mycat(6):mycat简单配置

1 找到conf/schema.xml并备份 2 配置虚拟表table[在schema里面] 其中 sharding-by-intfile 为rule.xml中的规则 规则文件为conf文件夹中的partition-hash-int.txt 3 配置数据节点dataNode 现在数据库新建3个数据库,skywalking,skywalking1,s…

[附源码]计算机毕业设计Python的低碳生活记录网站(程序+源码+LW文档)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程 项目运行 环境配置: Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术: django python Vue 等等组成,B/S模式 pychram管理等…

MapReduce 排序

文章目录WritableComparable 排序一、排序概述1、排序分类2、自定义排序(1)、原理分析二、WritableComparable 排序案例(全排序)1、需求WritableComparable 排序 一、排序概述 排序是MapReduce框架中最重要的操作之一 MapTask和ReduceTask均会对数据按照key进行排序&#xff…

使用c++部署tensorrt加速yolov7

先放上一张我运行成功的截图,只要跟着我的教程一步一步按操作,下载好匹配的软件是一定可以成功的! 我相信想要在C++平台使用tensorrt加速的朋友们也是有很强的计算机基础的,那么简单的部分我们就跳过,重点是和大家介绍模型转换的部分以及环境的搭建。 一. 环境 我的cudn…

docker-compose安装部署

一、前言 docker compose 给容器做单机编排的。Docker-Compose项目是Docker官方的开源项目,负责实现对Docker容器集群的快速编排。 docker compose是docker的独立产品,因此安装docker compose之前需要安装docker,Centos部署Docker_crazyK.的博…

高斯信号的贝叶斯步长最小均方算法(Matlab代码实现)

👨‍🎓个人主页:研学社的博客 💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜…

云原生爱好者周刊 | 使用 WASM 来写博客是什么感觉?

开源项目推荐 zzhack zzhack 是一个静态博客框架,是一个纯正的 WASM 应用,它由 Rust & Yew 来作为技术栈进行搭建,UI 设计比较美观,大家也可以直接使用该项目的设计模板零成本构建 WASM 应用。 Tracee Tracee 是一个运行时…

数据结构与算法之双向链表的设计与实现

文章目录前言一、双向链表1.1 概念1.2 双向链表的应用1.3 双向链表的node方法1.4 双向链表的add方法1.5 双向链表的remove方法1.6 整体代码1.7 接口测试二、对比学习2.1 单向链表 vs 双向链表2.2 双向链表 vs 动态数组2.3 ArrayList和LinkedList的区别前言 文章链接之前所介绍…

基于python的C环境安装(NLP文本纠错项目使用)

1.下载c环境:(window系统) 链接:Visual Studio: 面向软件开发人员和 Teams 的 IDE 和代码编辑器 (microsoft.com) 2.安装 1.打开下载的安装包 2.进入如下页面,按照下图进行勾选,注意,其它不要动…

全渠道营销与多渠道营销:定义、比较、示例

关键词:全渠道营销、多渠道营销 全渠道还是多渠道?您正在踏上跨境电子商务之旅,为您的品牌寻找合适的营销策略,但这一切似乎都过于理论化和复杂。 我们将使事情变得更容易,因为本文全面解释了多渠道营销和全渠道营销之…

【文本检测】1、DBNet | 实时场景文本检测器

文章目录一、背景二、方法2.1 二值化2.2 Adaptive threshold2.3 可变形卷积2.4 生成标签2.5 优化过程三、效果3.1 实验数据3.2 实验细节3.3 消融实验3.4 和其他方法的对比论文:Real-time Scene Text Detection with Differentiable Binarization 代码:h…

不懂应该怎么选合适的医疗器械进销存?

在医院运行过程中,需要管理医疗设备的采购、养护、报废等各个环节。医疗器械进销存软件是集医院设备、物资、耗材的申请、采购、出入库、维修、维护、折旧、固定资产管理、效益分析等全流程管理功能于一体,实现医院医疗设备的信息化,数据库规…

数据结构之【时间复杂度和空间复杂度】

如何去评价一个代码它的效率高不高呢? 我们通常从两个方面去看! 时间复杂度:主要衡量一个算法的运行速度空间复杂度:主要衡量一个算法所需要的额外空间 1. 时间复杂度 1.1 时间复杂度的定义 在计算机科学中,算法的…

算法题中常用的位运算

文章目录为什么使用位运算?十进制和二进制之间的转化短除法(十进制转二进制)幂次和(二进制转十进制)位运算符异或运算(xor)指定位置的位运算位运算实战要点为什么使用位运算? 机器采…