每天的零点十分,定时向mysql数据库插入,昨天新增的文件和昨天下载文件的记录。第一次运行的时候,会全量同步昨天之前的数据。
import os
import threading
from datetime import datetime, timedelta
import time
import schedule
from pymysql import Connection
localhost = 'localhost'
user = 'root'
password = 'root'
db = 'book'
# 筛选所有非文件夹的文件
def dir_file(folder_path):
filess = []
for root, dirs, files in os.walk(folder_path):
for file in files:
filess.append(os.path.join(root, file))
return filess
# ******上传文件的处理******
# 根据文件的创建日期,提前文件名、创建时间、路径等信息
def yesterday_files_count(files):
# 获取每个文件的详细信息
file_details = []
for file in files:
# 剔除掉tmt后缀的文件
if 'tmt' not in file:
# 文件夹路径,去掉前面路径,F:\24年路测数据\04-PBOX一型4G传输数据采集\
parent_path = os.path.dirname(file).replace('\\', '/')[10:]
# 文件名
file_name = os.path.basename(file)
# 设备名称
device_name = file_name[:4]
# 文件修改时间
# m_time = os.path.getmtime(file_path),
# 文件创建时间
c_time = os.path.getctime(file)
# 转换时间格式
c_datetime = datetime.fromtimestamp(c_time)
c_date = c_datetime.strftime('%Y-%m-%d')
date_c = datetime.strptime(c_date, "%Y-%m-%d")
# 获取昨天的日期
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
date_y = datetime.strptime(yesterday, "%Y-%m-%d")
if date_c == date_y:
# 追加到列表
file_details.append({
'device_name': device_name,
'file_name': file_name,
'parent_path': parent_path,
'create_time': c_date,
})
# 打印文件名称和创建时间
# for file_dict in file_details:
# print(
# f"('{file_dict['device_name']}','{file_dict['file_name']}','{file_dict['parent_path']}','{file_dict['create_time']}'),")
current_time = datetime.now()
print(f"{current_time}:完成创建时间是昨天的文件筛选")
return file_details
def before_yesterday_files_count(files):
# 获取每个文件的详细信息
file_details = []
for file in files:
# 剔除掉tmt后缀的文件
if 'tmt' not in file:
# 文件夹路径,去掉前面路径,F:\24年路测数据\04-PBOX一型4G传输数据采集\
parent_path = os.path.dirname(file).replace('\\', '/')[10:]
# 文件名
file_name = os.path.basename(file)
# 设备名称
device_name = file_name[:4]
# 文件修改时间
# m_time = os.path.getmtime(file_path),
# 文件创建时间
c_time = os.path.getctime(file)
# 转换时间格式
c_datetime = datetime.fromtimestamp(c_time)
c_date = c_datetime.strftime('%Y-%m-%d')
date_c = datetime.strptime(c_date, "%Y-%m-%d")
# 获取昨天的日期
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
date_y = datetime.strptime(yesterday, "%Y-%m-%d")
if date_c < date_y:
# 追加到列表
file_details.append({
'device_name': device_name,
'file_name': file_name,
'parent_path': parent_path,
'create_time': c_date,
})
# 打印文件名称和创建时间
# for file_dict in file_details:
# print(
# f"('{file_dict['device_name']}','{file_dict['file_name']}','{file_dict['parent_path']}','{file_dict['create_time']}'),")
current_time = datetime.now()
print(f"{current_time}:完成创建时间在昨天之前的文件筛选")
return file_details
# ******日志文件的下载记录处理******
# 拼接昨天的文件
def yesterday():
# 获取昨天的日期
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
# 设置日志文件的模式,假设是按日期格式命名的,例如: log_2023-03-10.txt
log_pattern = f"fzs-{yesterday}.log"
return log_pattern
# 获取昨天之前的文件
def before_yesterday(file):
# 截取日志文件名的日期部分
part = file[-14:-4]
date_p = datetime.strptime(part, "%Y-%m-%d")
# 获取昨天的日期
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
date_y = datetime.strptime(yesterday, "%Y-%m-%d")
if date_p < date_y:
return True
else:
return False
# 搜索日志中的下载人员,时间,文件名
def log_download(log_file):
# 设置关键字,匹配相邻的行,打开通道->建立连接->成功传输
keyword_d = "150 Opening data channel for file download from server of"
keyword_t = "TLS connection for data connection established"
keyword_s = "226 Successfully transferred"
download = []
if os.path.exists(log_file):
with open(log_file, 'r', encoding="utf-8", errors="ignore") as file:
lines = file.readlines()
for index, line in enumerate(lines):
# 匹配相近的三行关键词
if index <= len(lines) - 2 and keyword_d in line and keyword_t in lines[index + 1] and keyword_s in \
lines[
index + 2]:
# 空格分隔,去掉末尾的换行
dr = line.split(' ')[1].strip('\n').replace('/', '-')
de = line.split(' ')[4].strip('\n')
fn = line.split(' ')[-1].strip('\n')
# 对文件路径进行裁剪,"/04-PBOX一型4G传输数据采集/RefData.24/Month.Aug/T_AQ_Incoming_2350.org"
fnn = fn.strip('"').split('/')[-1]
if 'tmt' not in fnn:
download.append({
'download_date': dr,
'downloader': de,
'file_name': fnn,
})
# for dl in download:
# print(
# f"('{dl['file_name']}','{dl['downloader']}','{dl['download_date']}'),")
return download
# 处理昨天的日志文件
def yesterday_job(log_path):
log_pattern = yesterday()
log_file = os.path.join(log_path, log_pattern)
ld = []
if os.path.exists(log_file):
ld = log_download(log_file)
sql_log(ld)
current_time = datetime.now()
print(f"{current_time}:完成昨天的日志文件筛选")
# return ld
# 处理昨天之前的日志文件
def before_yesterday_job(log_path):
# 列出文件夹下所有文件和文件夹
# files_and_dirs = os.listdir(log_path)
# 过滤出文件,排除文件夹
# files = [f for f in files_and_dirs if os.path.isfile(os.path.join(log_path, f))]
ld = []
files = dir_file(log_path)
for file in files:
if before_yesterday(file):
ld = log_download(file)
sql_log(ld)
current_time = datetime.now()
print(f"{current_time}:完成昨天之前的日志文件筛选")
# return ld
# ******数据库插入操作******
# 新增文件的插入sql
def sql_file(sql_sta):
con = None
try:
# 创建数据库连接
con = Connection(
host=localhost, # 主机名
port=3306, # 端口
user=user, # 账户
password=password, # 密码
database=db, # 指定操作的数据库
autocommit=True # 设置自动提交
)
# 获取游标对象
cursor = con.cursor()
# 使用游标对象,执行sql语句
for sta in sql_sta:
sql_ins = ("insert into record_ftp (device_name, file_name, file_path, create_time) values (" + "'" +
sta['device_name'] + "'" + ", " + "'" + sta['file_name'] + "'" + "," + "'" + sta[
'parent_path'] + "'" + ", " + "'" + sta['create_time'] + "'" + ")")
# print(sql_ins)
cursor.execute(sql_ins)
# 获取主键
# print("主键id=", con.insert_id())
# 确认提交
# con.commit()
except Exception as e:
print("异常:", e)
finally:
if con:
# 关闭连接
con.close()
# 日志信息的插入sql
def sql_log(sql_sta):
con = None
try:
# 创建数据库连接
con = Connection(
host=localhost, # 主机名
port=3306, # 端口
user=user, # 账户
password=password, # 密码
database=db, # 指定操作的数据库
autocommit=True # 设置自动提交
)
# 获取游标对象
cursor = con.cursor()
# 使用游标对象,执行sql语句
for sta in sql_sta:
sql_ins = ("insert into record_download_details (file_name, downloader, download_time) values (" + "'" +
sta['file_name'] + "'" + ", " + "'" + sta['downloader'] + "'" + "," + "'" + sta[
'download_date'] + "'" + ")")
# print(sql_ins)
cursor.execute(sql_ins)
# 获取主键
# print("主键id=", con.insert_id())
# 确认提交
# con.commit()
except Exception as e:
print("异常:", e)
finally:
if con:
# 关闭连接
con.close()
# 处理昨天文件的job
def job_yesterday_file(folder_path):
if os.path.exists(folder_path):
# 对文件进行处理
files = dir_file(folder_path)
yfc = yesterday_files_count(files)
sql_file(yfc)
else:
# 文件不存在
current_time = datetime.now()
print(f"{current_time}:该文件夹不存在", log_path)
# 处理昨天之前文件的job
def job_before_yesterday_file(folder_path):
if os.path.exists(folder_path):
# 对文件进行处理
files = dir_file(folder_path)
byfc = before_yesterday_files_count(files)
sql_file(byfc)
else:
# 文件不存在
current_time = datetime.now()
print(f"{current_time}:该文件夹不存在", log_path)
# 处理昨天的日志文件的job
def job_yesterday_log(log_path):
if os.path.exists(log_path):
# 对文件进行处理
yesterday_job(log_path)
# 调用before_yesterday_job方法,仅返回最后一个日志文件的下载记录
# sql_log(yj)
else:
# 文件不存在
current_time = datetime.now()
print(f"{current_time}:该文件夹不存在", log_path)
# 处理昨天之前的日志文件的job
def job_before_yesterday_log(log_path):
if os.path.exists(log_path):
# 对文件进行处理
before_yesterday_job(log_path)
# 调用before_yesterday_job方法,仅返回最后一个日志文件的下载记录
# sql_log(byj)
else:
# 文件不存在
current_time = datetime.now()
print(f"{current_time}:该文件夹不存在", log_path)
# 多线程运行
def run_threading(job_func, path):
# 多线程并行运行
job_thread = threading.Thread(target=job_func, args=(path,))
job_thread.start()
if __name__ == '__main__':
# 文件处理
folder_path = os.getcwd()
# folder_path = r'E:\DPI\WDWG'
# 日志处理
# log_path = os.getcwd()
# log_path = r'E:\data\ftp的日志\Logs'
log_path = r'F:\FileZilla\FileZilla Serve 中文版\Logs'
run_time = "00:10"
current_time = datetime.now()
print(f"{current_time}:每天" + run_time + "执行任务,统计昨天新增的文件和昨天下载的日志信息")
# 创建调度器
schedule_daily_file = schedule.Scheduler()
schedule_daily_log = schedule.Scheduler()
# 每天定时调度任务
schedule_daily_file.every().day.at(run_time).do(run_threading, job_yesterday_file, folder_path)
current_time = datetime.now()
print(f"{current_time}:完成昨日新增文件的统计")
schedule_daily_log.every().day.at(run_time).do(run_threading, job_yesterday_log, log_path)
current_time = datetime.now()
print(f"{current_time}:完成昨日日志下载的统计")
# 初始化时,调用一次
job_before_yesterday_file(folder_path)
job_before_yesterday_log(log_path)
# 立即执行所有任务
schedule_daily_file.run_all()
schedule_daily_log.run_all()
while True:
schedule_daily_file.run_pending()
schedule_daily_log.run_pending()
time.sleep(1)
运行的日志:
数据库:昨日新增文件的表
下载记录表: