面向对象编程过程的实例:
1)数据准备
2)数据处理、加工方式
3)数据校验
# -*- coding: utf-8 -*-
import pyodbc
import xlrd
import numpy as np
import openpyxl
import win32com.client
import time
import datetime
import pythoncom
import os
import re
import imaplib
import xlwings as xw
import pandas as pd
import zipfile
import psutil
import subprocess
from con_data import EmailInfo
import logging
import functools
from pathlib import Path
class ReadData():
    """Monthly export pipeline: query, compress, e-mail, then verify files.

    The methods chain into each other: ``send_email`` calls ``csv_zip``,
    which calls ``read_odbc``, which calls ``get_workingday``.
    ``check_data`` is an independent verification step that lists the most
    recently modified files of a folder.
    """

    def __init__(self):
        # Configure the root logger once.  The original followed this with
        # two more basicConfig() calls; those were no-ops because
        # basicConfig() does nothing once the root logger has a handler.
        logging.basicConfig(filename=r'C:\*****\info.log',
                            filemode='w',
                            format='%(asctime)s | %(levelname)s | %(message)s',
                            datefmt='%Y-%m-%d %H:%M:%S',
                            level=logging.INFO)
        # Base output directory.  Written with escaped backslashes because a
        # raw string literal cannot end in a single backslash (the original
        # r"...\" form is a syntax error).
        self.path = "C:\\*****\\"
        self.current_date = datetime.datetime.now()
        # Subtracting "day of month" days lands on the last day of the
        # previous month.
        self.last_month = self.current_date - datetime.timedelta(days=self.current_date.day)
        self.last_month_month = self.last_month.strftime("%Y%m")
        # Output files are named after the previous month, e.g. 202401.csv.
        self.file_name = f'{self.last_month_month}.csv'
        self.zip_name = f'{self.last_month_month}.zip'
        self.file_path = os.path.join(self.path, self.file_name)
        self.zip_path = os.path.join(self.path, self.zip_name)

    def logged(func):
        """Decorate *func* so each call, its duration and result are logged.

        Used only as a decorator inside this class body, so it takes the
        function being wrapped rather than ``self``.  Exceptions raised by
        *func* are logged (with traceback) and re-raised unchanged.
        """
        @functools.wraps(func)
        def with_logging(*args, **kwargs):
            # Log the call *before* running it so the entry is also present
            # when the function raises.
            logging.warning(f"Calling {func.__name__!r} with {' and '.join(map(str, args))}")
            start_time = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                logging.exception(f"Error in function {func.__name__}: {e}")
                raise
            execution_time = time.perf_counter() - start_time
            logging.info(f"Function {func.__name__} executed in {execution_time} seconds")
            logging.info(f"{func.__name__} returned {result}")
            logging.warning(f"{func.__name__} returned: {result}")
            return result
        return with_logging

    @logged
    def get_workingday(self, today=None):
        """Return the last weekday of the previous month as an int YYYYMMDD.

        Only Saturdays and Sundays are skipped; public holidays are not
        taken into account.

        Args:
            today: optional reference date (anything ``pd.Timestamp``
                accepts).  Defaults to the current time.

        Returns:
            int: the date formatted as ``%Y%m%d``, also stored in
            ``self.last_date``.
        """
        self.now = pd.Timestamp.now() if today is None else pd.Timestamp(today)
        # First day of the current month minus one day == last day of the
        # previous month.  Unlike ``month - 1`` this also works in January.
        first_of_month = self.now.normalize().replace(day=1)
        self.last_day = first_of_month - pd.Timedelta(days=1)
        # weekday() >= 5 means Saturday or Sunday: step back to Friday.
        while self.last_day.weekday() >= 5:
            self.last_day -= pd.Timedelta(days=1)
        self.last_date = int(self.last_day.strftime("%Y%m%d"))
        print(self.last_date)
        return self.last_date

    @logged
    def read_odbc(self):
        """Run the configured SQL over ODBC and return the result.

        Returns:
            pandas.DataFrame: the query result, also stored in ``self.df``.
        """
        self.date = self.get_workingday()
        self.connection_string = (
            "*****"
            "Host=*****"
            "Port=******;"
        )
        self.sql = f"********;"
        self.conn = pyodbc.connect(self.connection_string, autocommit=True)
        try:
            self.df = pd.read_sql(self.sql, self.conn)
        finally:
            # The DataFrame is fully loaded at this point; release the
            # connection even if the query failed.
            self.conn.close()
        return self.df

    @logged
    def kill_excel_by_pids(self):
        """Force-terminate every running EXCEL.EXE process on this machine."""
        self.pids = psutil.pids()
        cmd = 'taskkill/F/IM EXCEL.EXE'
        for pid in self.pids:
            try:
                if psutil.Process(pid).name() == 'EXCEL.EXE':
                    # taskkill /IM terminates all processes with that image
                    # name, so a single invocation is enough.
                    os.system(cmd)
                    break
            except psutil.Error as e:
                # Processes may exit or be inaccessible while we scan them.
                print(e)

    @logged
    def refresh_pivot(self):
        """Refresh pivot table 'PivotTable1' on 'Sheet1' and save the workbook."""
        app = xw.App(visible=False, add_book=False)
        try:
            wb = app.books.open(r"C:******.xlsx")
            try:
                sheet = wb.sheets('Sheet1')
                sheet.api.PivotTables('PivotTable1').PivotCache().Refresh()
                wb.save()
            finally:
                wb.close()
        finally:
            # Always shut down the hidden Excel instance, otherwise a failed
            # refresh leaves an invisible EXCEL.EXE running.
            app.quit()

    @logged
    def csv_zip(self):
        """Export the query result to CSV and pack it into a password-protected zip.

        Returns:
            str: ``self.zip_path``.  This is returned even when the CSV was
            not found and no archive was created.

        Raises:
            subprocess.CalledProcessError: if 7-Zip exits with a non-zero
                status.
        """
        # NOTE(review): the password is hard-coded and passed on the command
        # line, where other local processes can see it.
        self.pwd = '********'
        self.addpassword = f'-p{self.pwd}'
        self.data = self.read_odbc()
        self.data.to_csv(self.file_path, index=False)
        if os.path.exists(self.file_path):
            self.cmd = ['7z', 'a', self.zip_path, self.file_path, self.addpassword, '-tzip']
            # Wait for 7-Zip to finish (the original Popen returned at once,
            # so the archive could be incomplete when it was e-mailed).
            subprocess.run(self.cmd, executable=r'C:\Program Files\7-Zip\7z.exe', check=True)
            print("Compress Complete 100%")
        else:
            print("No Files")
        return self.zip_path

    @logged
    def send_email(self):
        """Build the zip via ``csv_zip`` and send it through Outlook."""
        # Keep the attachment path in a local: assigning it to self.path
        # (as the original did) overwrote the base output directory.
        attachment = self.csv_zip()
        self.subject = '******'
        self.body = (
            "\n"
            "**********\n\n"
            "Best regards,\n\n"
            "Anna"
        )
        self.recipient = '********'
        outlook = win32com.client.Dispatch('Outlook.Application')
        # 0 presumably selects a mail item (olMailItem) -- confirm against
        # the Outlook object model.
        mail = outlook.CreateItem(0)
        mail.Subject = self.subject
        mail.Body = self.body
        mail.To = self.recipient
        mail.Attachments.Add(Source=attachment)
        mail.Send()

    @logged
    def check_data(self, folder_path=None, n=4):
        """Describe the *n* most recently modified files in a folder.

        Args:
            folder_path: directory to scan.  Defaults to the configured
                placeholder path; replace it with your own folder.
            n: how many of the newest files to report (default 4).

        Returns:
            list[str]: one line per file with name, size in KB and last
            modification time, newest first.  Also stored in
            ``self.check_infos``.
        """
        self.latest_files = []
        self.check_infos = []
        self.folder_path = r'*********' if folder_path is None else folder_path
        self.n = n
        for filename in os.listdir(self.folder_path):
            file_path = os.path.join(self.folder_path, filename)
            # Test the directory entry itself (the original tested
            # self.file_path, i.e. the monthly CSV, for every entry).
            if os.path.isfile(file_path):
                file_stats = os.stat(file_path)
                self.latest_files.append((filename, file_stats.st_size, file_stats.st_mtime))
        # Newest modification time first.
        self.latest_files.sort(key=lambda entry: entry[2], reverse=True)
        for filename, filesize, mtime in self.latest_files[:self.n]:
            filesize = filesize // 1024  # bytes -> whole KB
            date_string = datetime.datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M:%S')
            check_info = f"文件名: {filename}, 大小: {filesize } KB, 最后修改时间: {date_string}"
            self.check_infos.append(check_info)
            print(check_info)
        return self.check_infos
if __name__ == '__main__':
    # Script entry point: export, zip and e-mail the data, then list the
    # most recently modified files as a verification step.
    pipeline = ReadData()
    pipeline.send_email()
    pipeline.check_data()