python_每天定时向数据库插入数据

news2024/11/25 7:16:14

每天的零点十分,定时向mysql数据库插入,昨天新增的文件和昨天下载文件的记录。第一次运行的时候,会全量同步昨天之前的数据。

import os
import threading
from datetime import datetime, timedelta
import time
import schedule
from pymysql import Connection

localhost = 'localhost'
user = 'root'
password = 'root'
db = 'book'


# 筛选所有非文件夹的文件
def dir_file(folder_path):
    filess = []
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            filess.append(os.path.join(root, file))

    return filess


# ******上传文件的处理******

# 根据文件的创建日期,提前文件名、创建时间、路径等信息
def yesterday_files_count(files):
    # 获取每个文件的详细信息
    file_details = []
    for file in files:
        # 剔除掉tmt后缀的文件
        if 'tmt' not in file:

            # 文件夹路径,去掉前面路径,F:\24年路测数据\04-PBOX一型4G传输数据采集\
            parent_path = os.path.dirname(file).replace('\\', '/')[10:]

            # 文件名
            file_name = os.path.basename(file)

            # 设备名称
            device_name = file_name[:4]

            # 文件修改时间
            # m_time = os.path.getmtime(file_path),
            # 文件创建时间
            c_time = os.path.getctime(file)
            # 转换时间格式
            c_datetime = datetime.fromtimestamp(c_time)
            c_date = c_datetime.strftime('%Y-%m-%d')
            date_c = datetime.strptime(c_date, "%Y-%m-%d")

            # 获取昨天的日期
            yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
            date_y = datetime.strptime(yesterday, "%Y-%m-%d")

            if date_c == date_y:
                # 追加到列表
                file_details.append({
                    'device_name': device_name,
                    'file_name': file_name,
                    'parent_path': parent_path,
                    'create_time': c_date,
                })

    # 打印文件名称和创建时间
    # for file_dict in file_details:
    #     print(
    #         f"('{file_dict['device_name']}','{file_dict['file_name']}','{file_dict['parent_path']}','{file_dict['create_time']}'),")

    current_time = datetime.now()
    print(f"{current_time}:完成创建时间是昨天的文件筛选")
    return file_details


def before_yesterday_files_count(files):
    # 获取每个文件的详细信息
    file_details = []
    for file in files:
        # 剔除掉tmt后缀的文件
        if 'tmt' not in file:

            # 文件夹路径,去掉前面路径,F:\24年路测数据\04-PBOX一型4G传输数据采集\
            parent_path = os.path.dirname(file).replace('\\', '/')[10:]

            # 文件名
            file_name = os.path.basename(file)

            # 设备名称
            device_name = file_name[:4]

            # 文件修改时间
            # m_time = os.path.getmtime(file_path),
            # 文件创建时间
            c_time = os.path.getctime(file)
            # 转换时间格式
            c_datetime = datetime.fromtimestamp(c_time)
            c_date = c_datetime.strftime('%Y-%m-%d')
            date_c = datetime.strptime(c_date, "%Y-%m-%d")

            # 获取昨天的日期
            yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
            date_y = datetime.strptime(yesterday, "%Y-%m-%d")

            if date_c < date_y:
                # 追加到列表
                file_details.append({
                    'device_name': device_name,
                    'file_name': file_name,
                    'parent_path': parent_path,
                    'create_time': c_date,
                })

    # 打印文件名称和创建时间
    # for file_dict in file_details:
    #     print(
    #         f"('{file_dict['device_name']}','{file_dict['file_name']}','{file_dict['parent_path']}','{file_dict['create_time']}'),")

    current_time = datetime.now()
    print(f"{current_time}:完成创建时间在昨天之前的文件筛选")
    return file_details


# ******日志文件的下载记录处理******

# 拼接昨天的文件
def yesterday():
    # 获取昨天的日期
    yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

    # 设置日志文件的模式,假设是按日期格式命名的,例如: log_2023-03-10.txt
    log_pattern = f"fzs-{yesterday}.log"

    return log_pattern


# 获取昨天之前的文件
def before_yesterday(file):
    # 截取日志文件名的日期部分
    part = file[-14:-4]
    date_p = datetime.strptime(part, "%Y-%m-%d")
    # 获取昨天的日期
    yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    date_y = datetime.strptime(yesterday, "%Y-%m-%d")

    if date_p < date_y:
        return True
    else:
        return False


# 搜索日志中的下载人员,时间,文件名
def log_download(log_file):
    # 设置关键字,匹配相邻的行,打开通道->建立连接->成功传输
    keyword_d = "150 Opening data channel for file download from server of"
    keyword_t = "TLS connection for data connection established"
    keyword_s = "226 Successfully transferred"

    download = []
    if os.path.exists(log_file):

        with open(log_file, 'r', encoding="utf-8", errors="ignore") as file:
            lines = file.readlines()
            for index, line in enumerate(lines):
                # 匹配相近的三行关键词
                if index <= len(lines) - 2 and keyword_d in line and keyword_t in lines[index + 1] and keyword_s in \
                        lines[
                            index + 2]:
                    # 空格分隔,去掉末尾的换行
                    dr = line.split(' ')[1].strip('\n').replace('/', '-')
                    de = line.split(' ')[4].strip('\n')
                    fn = line.split(' ')[-1].strip('\n')

                    # 对文件路径进行裁剪,"/04-PBOX一型4G传输数据采集/RefData.24/Month.Aug/T_AQ_Incoming_2350.org"
                    fnn = fn.strip('"').split('/')[-1]

                    if 'tmt' not in fnn:
                        download.append({
                            'download_date': dr,
                            'downloader': de,
                            'file_name': fnn,
                        })

    # for dl in download:
    #     print(
    #         f"('{dl['file_name']}','{dl['downloader']}','{dl['download_date']}'),")

    return download


# 处理昨天的日志文件
def yesterday_job(log_path):
    log_pattern = yesterday()
    log_file = os.path.join(log_path, log_pattern)

    ld = []

    if os.path.exists(log_file):
        ld = log_download(log_file)
        sql_log(ld)

    current_time = datetime.now()
    print(f"{current_time}:完成昨天的日志文件筛选")
    # return ld


# 处理昨天之前的日志文件
def before_yesterday_job(log_path):
    # 列出文件夹下所有文件和文件夹
    # files_and_dirs = os.listdir(log_path)
    # 过滤出文件,排除文件夹
    # files = [f for f in files_and_dirs if os.path.isfile(os.path.join(log_path, f))]

    ld = []

    files = dir_file(log_path)

    for file in files:
        if before_yesterday(file):
            ld = log_download(file)
            sql_log(ld)

    current_time = datetime.now()
    print(f"{current_time}:完成昨天之前的日志文件筛选")
    # return ld


# ******数据库插入操作******


# 新增文件的插入sql
def sql_file(sql_sta):
    con = None

    try:
        # 创建数据库连接
        con = Connection(
            host=localhost,  # 主机名
            port=3306,  # 端口
            user=user,  # 账户
            password=password,  # 密码
            database=db,  # 指定操作的数据库
            autocommit=True  # 设置自动提交
        )
        # 获取游标对象
        cursor = con.cursor()

        # 使用游标对象,执行sql语句
        for sta in sql_sta:
            sql_ins = ("insert into record_ftp (device_name, file_name, file_path, create_time) values (" + "'" +
                       sta['device_name'] + "'" + ", " + "'" + sta['file_name'] + "'" + "," + "'" + sta[
                           'parent_path'] + "'" + ", " + "'" + sta['create_time'] + "'" + ")")
            # print(sql_ins)
            cursor.execute(sql_ins)

            # 获取主键
            # print("主键id=", con.insert_id())
            # 确认提交
            # con.commit()
    except Exception as e:
        print("异常:", e)
    finally:
        if con:
            # 关闭连接
            con.close()


# 日志信息的插入sql
def sql_log(sql_sta):
    con = None

    try:
        # 创建数据库连接
        con = Connection(
            host=localhost,  # 主机名
            port=3306,  # 端口
            user=user,  # 账户
            password=password,  # 密码
            database=db,  # 指定操作的数据库
            autocommit=True  # 设置自动提交
        )
        # 获取游标对象
        cursor = con.cursor()

        # 使用游标对象,执行sql语句
        for sta in sql_sta:
            sql_ins = ("insert into record_download_details (file_name, downloader, download_time) values (" + "'" +
                       sta['file_name'] + "'" + ", " + "'" + sta['downloader'] + "'" + "," + "'" + sta[
                           'download_date'] + "'" + ")")
            # print(sql_ins)
            cursor.execute(sql_ins)

            # 获取主键
            # print("主键id=", con.insert_id())
            # 确认提交
            # con.commit()
    except Exception as e:
        print("异常:", e)
    finally:
        if con:
            # 关闭连接
            con.close()


# 处理昨天文件的job
def job_yesterday_file(folder_path):
    if os.path.exists(folder_path):
        # 对文件进行处理
        files = dir_file(folder_path)
        yfc = yesterday_files_count(files)

        sql_file(yfc)
    else:
        # 文件不存在
        current_time = datetime.now()
        print(f"{current_time}:该文件夹不存在", log_path)


# 处理昨天之前文件的job
def job_before_yesterday_file(folder_path):
    if os.path.exists(folder_path):
        # 对文件进行处理
        files = dir_file(folder_path)
        byfc = before_yesterday_files_count(files)

        sql_file(byfc)
    else:
        # 文件不存在
        current_time = datetime.now()
        print(f"{current_time}:该文件夹不存在", log_path)


# 处理昨天的日志文件的job
def job_yesterday_log(log_path):
    if os.path.exists(log_path):
        # 对文件进行处理
        yesterday_job(log_path)
        # 调用before_yesterday_job方法,仅返回最后一个日志文件的下载记录
        # sql_log(yj)
    else:
        # 文件不存在
        current_time = datetime.now()
        print(f"{current_time}:该文件夹不存在", log_path)


# 处理昨天之前的日志文件的job
def job_before_yesterday_log(log_path):
    if os.path.exists(log_path):
        # 对文件进行处理
        before_yesterday_job(log_path)
        # 调用before_yesterday_job方法,仅返回最后一个日志文件的下载记录
        # sql_log(byj)
    else:
        # 文件不存在
        current_time = datetime.now()
        print(f"{current_time}:该文件夹不存在", log_path)


# 多线程运行
def run_threading(job_func, path):
    # 多线程并行运行
    job_thread = threading.Thread(target=job_func, args=(path,))
    job_thread.start()


if __name__ == '__main__':
    # 文件处理
    folder_path = os.getcwd()
    # folder_path = r'E:\DPI\WDWG'

    # 日志处理
    # log_path = os.getcwd()
    # log_path = r'E:\data\ftp的日志\Logs'
    log_path = r'F:\FileZilla\FileZilla Serve 中文版\Logs'

    run_time = "00:10"

    current_time = datetime.now()
    print(f"{current_time}:每天" + run_time + "执行任务,统计昨天新增的文件和昨天下载的日志信息")

    # 创建调度器
    schedule_daily_file = schedule.Scheduler()
    schedule_daily_log = schedule.Scheduler()

    # 每天定时调度任务
    schedule_daily_file.every().day.at(run_time).do(run_threading, job_yesterday_file, folder_path)
    current_time = datetime.now()
    print(f"{current_time}:完成昨日新增文件的统计")
    schedule_daily_log.every().day.at(run_time).do(run_threading, job_yesterday_log, log_path)
    current_time = datetime.now()
    print(f"{current_time}:完成昨日日志下载的统计")

    # 初始化时,调用一次
    job_before_yesterday_file(folder_path)
    job_before_yesterday_log(log_path)

    # 立即执行所有任务
    schedule_daily_file.run_all()
    schedule_daily_log.run_all()

    while True:
        schedule_daily_file.run_pending()
        schedule_daily_log.run_pending()
        time.sleep(1)

运行的日志:

数据库:昨日新增文件的表

下载记录表:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2083208.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

仓颉编程语言亮相全国大学生计算机系统能力大赛

2024年8月18日-22日&#xff0c;由全国高等学校计算机教育研究会、系统能力培养研究专家组、系统能力培养研究项目发起高校主办&#xff0c;杭州电子科技大学承办的2024全国大学生计算机系统能力大赛编译系统设计赛&#xff08;华为毕昇杯&#xff09;及操作系统设计赛在杭电下…

企业防泄密首选!哪款公司防泄密软件更强?看这里,一文解惑!

早在2011年&#xff0c;前苹果员工Paul Devine泄露苹果公司的机密信息&#xff0c;涉及新产品的预测、计划蓝图、价格和产品特征&#xff0c;还为苹果公司的合作伙伴、供应商和代工厂商提供的关于苹果公司的数据&#xff0c;这使得这些供应商和代工厂商拥有了与苹果谈判的筹码&…

Wireless Communications - 模拟调制

AM/DSB/VSB/SSB的调制与解调 AM DSB SSB 滤波法 相移法 VSB 相干解调 线性调制的抗噪声分析 DSB SSB FM/PM 的调制与解调 NBFM WBFM 调频信号的产生和解调 模拟调制对比

SpringBootFFmpeg实现M3U8切片转码播放(本地)

文章目录 参考概述代码pom.xmlffmpegFFmpegUtilsMediaInfoTranscodeConfig application.ymlApplicationUploadControllerindex.html 测试 参考 springboot-ffmpeg-demo gitee代码 SpringBoot FFmpeg实现一个简单的M3U8切片转码系统 FFmpeg音视频核心技术精讲 - 百度网盘 概…

【STM32】红外遥控

红外遥控&#xff0c;掌握了就能装逼了&#xff0c;哈哈哈哈哈哈。 大部分图片来源&#xff1a;正点原子HAL库课程 专栏目录&#xff1a;记录自己的嵌入式学习之路-CSDN博客 1 器件特性 这里载波发射周期的发射与不发射时间实际上是因为载波是38kHz、占空为三分之一的方波&a…

符号译码_网络同步赛

哎……又是 If平推字符 #include <bits/stdc++.h> using namespace std; signed main(){char x;while(cin>>x){if(x==1){cin>>x;if(x==1)cout<<">";else if(x == 0){cin>>x;if(x==1)cout<<"]";else if(x==0)cout&…

Three.js湖边小屋,包含gltf渲染、天空和水纹、光照阴影、运动的点光源、相机位置和文字切屏、粒子效果等

前期准备 使用vue3vitethree.jsgsap 开发 npm install three gsap 代码 <script setup> // 导入three.js import * as THREE from three; // 导入轨道控制器 import { OrbitControls } from three/examples/jsm/controls/OrbitControls.js; // 加载模型 import { GLT…

SQLserver中的游标的分类和游标的生命周期

SQLserver中的游标的分类 在 SQL Server 中&#xff0c;游标&#xff08;Cursor&#xff09;是一种数据库对象&#xff0c;用于逐行处理结果集中的数据。游标可以用于复杂的数据处理任务&#xff0c;尤其是那些不能通过简单的 SELECT 语句和 JOIN 操作完成的任务。SQL Server …

网络通信---三次握手

文章目录 概述第一次握手第二次握手第三次握手整体看下 小结 概述 “三次握手”&#xff08;Three-way Handshake&#xff09;是TCP/IP协议中建立一个可靠的连接时使用的一种机制。这个过程确保了两个网络实体&#xff08;通常是两台计算机&#xff09;在开始数据传输之前能够…

std::futrue异步操作结果的三种搭配使用

目录 一、std::future 应用场景 二、使用 std::async关联异步任务 三、使用std::packaged_task和std::future配合 四、std::promise和std::future配合 一、std::future std::future是C11标准库中的⼀个模板类&#xff0c;它表⽰⼀个异步操作的结果。当我们在多线程编程中使…

VBA技术资料MF194:屏蔽右键菜单

我给VBA的定义&#xff1a;VBA是个人小型自动化处理的有效工具。利用好了&#xff0c;可以大大提高自己的工作效率&#xff0c;而且可以提高数据的准确度。“VBA语言専攻”提供的教程一共九套&#xff0c;分为初级、中级、高级三大部分&#xff0c;教程是对VBA的系统讲解&#…

云计算概述

云计算的产生以及发展 分布式计算&#xff1a;包含了云计算和网格计算 云计算&#xff1a;以数据为中心进行的计算 网格计算&#xff1a;以计算为中心进行的计算 诞生-1999 初期的发展-2007-2008 加速发展-2009-2014 日渐成熟阶段-2015-目前 云计算的种类 公有云-第三方提供…

第74集《大佛顶首楞严经》

请大家打开讲义第一百六十三页。我们讲到丑一&#xff0c;圆破色阴超劫浊。 在整个五阴的对治当中&#xff0c;第一个所要对治的&#xff0c;最粗重的就是色阴&#xff1b;色阴所引生的根结&#xff0c;就是动静两种的结相。当我们开始在闻的功能当中&#xff0c;不再攀缘外在…

@ohos.systemParameterEnhance系统参数接口调用:控制设备硬件(执行shell命令方式)

本文介绍如何使用应用ohos.systemParameterEnhance (系统参数)(系统接口)来控制设备硬件&#xff0c;可以通过它在系统中执行一段shell命令&#xff0c;从而实现控制设备的效果。接下来以一个实际的样例来演示如何通过它来控制设备以太网接口 开源地址&#xff1a;https://git…

售后更新出现问题分析-幂等和防重

2024-08-27 早上测试提交BUG,说售后单状态流转不对&#xff0c;吓得我一激灵&#xff0c;赶紧打开IDEA 查看代码&#xff0c;发现售后这块代码没有动过呀&#xff0c;咋回事&#xff1f; 流程是这样的&#xff1a; 测试模拟用户下单&#xff0c;提交订单后付款&#xff0c;然后…

[MOCO v2] Improved Baselines with Momentum Contrastive Learning

1、目的 结合SimCLR和MoCo&#xff0c;实现SoTA 2、方法 ​​​​​​​ ​​​​​​​ 将SimCLR的两点设计融入MoCo中&#xff1a; 1&#xff09;MLP projection head 2-layer, hidden layer 2048-d, with ReLU 2&#xff09;more data augmentation blur a…

【专项刷题】— 链表

1、2两数相加 - 力扣&#xff08;LeetCode&#xff09; 思路&#xff1a; 只要有任意一个链表还没有为空的时候就继续加&#xff0c;当链表为空的时候但是t不尾0&#xff0c;还是进入循环进行操作 代码&#xff1a; public ListNode addTwoNumbers(ListNode l1, ListNode l2) {…

【HuggingFace Transformers】LlamaModel源码解析

LlamaModel源码解析 1. LlamaModel 介绍2. LlamaModel类 源码解析3. 4维因果注意力掩码生成 1. LlamaModel 介绍 LlamaModel 是一个基于 Transformer 架构的解码器模型&#xff0c;用于自然语言处理任务。它是 Meta 的 LLaMA (Large Language Model Meta AI) 系列的一部分&…

Spatial Structure Constraints for Weakly SupervisedSemantic Segmentation

摘要 由于易于获得&#xff0c;图像级标签在弱监督语义分割任务中很受欢迎。 由于图像级标签只能指示特定类别对象的存在或不存在&#xff0c;因此基于可视化的技术已被广泛采用来提供对象位置线索。由于类激活图(class activation map, CAMs)只能定位目标中最具辨识性的部分…

API测试基础知识(基本概念、测试方法、测试工具)

在进行API测试之前&#xff0c;我们先了解一下 什么是API&#xff1f; API&#xff08;全称Application Programming Interface&#xff09;是两个单独的软件系统之间的通信和数据交换。实现API的软件系统包含可以由另一个软件系统执行的功能/子例程。 什么是API测试 API测…