00.创作背景,在每天的巡检报告中要 要检查oa相关服务器的备份作业是否备份成功
那个备份软件有个功能,就是完成备份作业后,可以发送信息到我的邮箱。
01.通过检查我邮箱的信息,就可以了解那个备份作业的情况。
通过解释邮件的名称可以了解备份作业的名称。
通过解释邮件的内容可以得知作业是否完成,因为备份作业前会发一封邮件告诉我备份作业开始的。当备份作业完成后也会发一封邮件显示作业完成。
这样可以间接对备份作业进行一个跟踪。
因为邮件的内容是一种接近于html的格式,所以要编写一个方法对内容进行解释,获取自己想要的内容。
03.备份作业分为每天,每周,每月。
首先讲一下每天,执行备份作业时间,从早上到晚上都有的。而我检查邮箱的时间是早上9点左右,所以对于一些晚上10点才执行的备份作业,我只检查昨天的未读邮件。有些作业超过24小时的,我直接检查两天。
之后是每周,检查时间最长跨度是7天,但有时周五夜易的备份作业,周五的早上检查不出来,那我就把检查时间的跨度改为8天。
每月同理,但要计算每月每一个周六,周日的时间,每月第二个周六,周日的时间。
例如:今个月没有到第一个周六,就显示上个月第一个周六。
'''000导入包'''
#01邮件的api
'''收邮件的包'''
import imaplib
'''邮件的工具'''
import email.utils
from html import unescape as un
#02.解释html的包
#
# import date_util as du
# import read_html as rh
# import bk_job_info as bji
'''读取html的包,用来解释html'''
import util.read_html as rh
'''处理日期的包'''
import util.date_util as du
'''保存账号信息的包'''
import entity.ssh_info as si
#03.导入修改时间格式的类
from datetime import datetime, timedelta
#04.导入保存作业信息的包
'''100.这个是变量'''
#获取imap的信息 例如:'imap.163.com'
imap=si.ssh_access().imap
#获取我邮箱的密码
email_pwd=si.ssh_access().email_pwd
#获取我邮箱的账号
email_my_account=si.ssh_access().emal_my_account
#这是发件人的账号,这个是固定的
email_bk_account=si.ssh_access().email_bk_account
# 101.设置时间范围
'''因为有时时间跨度在1天多一点,所以时间定在2天前'''
since = (datetime.now() - timedelta(days=2)).strftime('%d-%b-%Y') # 2天前
'''同上,但这个时间跨度为7天'''
week = (datetime.now() - timedelta(days=7)).strftime('%d-%b-%Y') # 7天前
before = datetime.now().strftime('%d-%b-%Y') # 现在
now=datetime.now().strftime("%Y-%m-%d") #例如 获取系统当前时间,并以2024-04-18格式显示
yesterday=(datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
keyword_every_day="每天"
today=0
#判断当天的增量备份是否成功
def check_today_backup(
imap,
email_my_account,
email_pwd,
job_name
):
'''200.业务逻辑'''
# 201.连接到邮箱服务器
mail = imaplib.IMAP4_SSL(imap)
pwd = email_pwd
# 202.登录邮箱(这个要隐藏)
mail.login(email_my_account, pwd)
# 203选择邮箱文件夹
mail.select('INBOX')
# 204.搜索符合条件的邮件
"""status, data = mail.search(None, 'ALL') """# 搜索所有邮件
"""status, data = mail.search(None, '(UNSEEN)')""" # 搜索未读邮件
# 205.近一天未读邮件
status, data = mail.search(None, '(UNSEEN)'.format(since,before)) # 搜索未读邮件
if status == 'OK':
# 获取邮件id列表
email_ids = data[0].split()
split_list=""
return_list=[]
# 遍历邮件id列表
for email_id in email_ids:
# 根据邮件id获取邮件内容
status, data = mail.fetch(email_id, '(RFC822)')
raw_email = data[0][1]
# 解析原始邮件内容
msg = email.message_from_bytes(raw_email)
body = None
# 判断邮件内容类型
if msg.is_multipart():
for part in msg.get_payload():
if part.get_content_type() == 'text/plain':
body = part.get_payload(decode=True).decode('utf-8')
else:
body = msg.get_payload(decode=True).decode('utf-8')
plain_text = un(body)
# print('正文内容:', plain_text)
content=rh.explain_html(plain_text)
# print("content的值:")
# print(content)
for i in content:
split_list = i.split(":")
if split_list[1]=="作业完成。":
if now in content[8]:
if content[0].split(":")[1]==job_name:
return_list.append(True)
return_list.append(content[0].split(":")[1])
return_list.append(content[8][5:])
return return_list
#作业结束时间
# 关闭连接
mail.close()
mail.logout()
#检查昨天的备份
def check_yesterday_backup(
imap,
email_my_account,
email_pwd,
job_name
):
'''200.业务逻辑'''
# 201.连接到邮箱服务器
mail = imaplib.IMAP4_SSL(imap)
pwd = email_pwd
# 202.登录邮箱(这个要隐藏)
mail.login(email_my_account, pwd)
# 203选择邮箱文件夹
mail.select('INBOX')
# 204.搜索符合条件的邮件
"""status, data = mail.search(None, 'ALL') """# 搜索所有邮件
"""status, data = mail.search(None, '(UNSEEN)')""" # 搜索未读邮件
# 205.近一天未读邮件
status, data = mail.search(None, '(UNSEEN)'.format(since,before)) # 搜索未读邮件
# 获取邮件id列表
email_ids = data[0].split()
split_list=""
return_list=[]
# 遍历邮件id列表
for email_id in email_ids:
# 根据邮件id获取邮件内容
status, data = mail.fetch(email_id, '(RFC822)')
raw_email = data[0][1]
# 解析原始邮件内容
msg = email.message_from_bytes(raw_email)
body = None
# 判断邮件内容类型
if msg.is_multipart():
for part in msg.get_payload():
if part.get_content_type() == 'text/plain':
body = part.get_payload(decode=True).decode('utf-8')
else:
body = msg.get_payload(decode=True).decode('utf-8')
plain_text = un(body)
# print('正文内容:', plain_text)
content=rh.explain_html(plain_text)
# print("content的值:")
# print(content)
for i in content:
split_list = i.split(":")
if split_list[1]=="作业完成。":
# print("yesterday的值:")
# print(yesterday)
# print("content[8]的值 是开始时间")
# print(content[8])
if yesterday in content[8]:
if content[0].split(":")[1]==job_name:
# print("这里能执行?")
return_list.append(True)
return_list.append(content[0].split(":")[1])
return_list.append(content[8][5:])
# print(return_list)
return return_list
#作业结束时间
# 关闭连接
mail.close()
mail.logout()
#检查每周的全量备份
def check_week_full_backup(
imap,
email_my_account,
email_pwd,
job_name
):
'''200.业务逻辑'''
# 201.连接到邮箱服务器
mail = imaplib.IMAP4_SSL(imap)
pwd = email_pwd
# 202.登录邮箱(这个要隐藏)
mail.login(email_my_account, pwd)
# 203选择邮箱文件夹
mail.select('INBOX')
# 204.搜索符合条件的邮件
"""status, data = mail.search(None, 'ALL') """# 搜索所有邮件
"""status, data = mail.search(None, '(UNSEEN)')""" # 搜索未读邮件
# 205.近一周未读邮件,搜索条件不起效
status, data = mail.search(None, '(UNSEEN SINCE{})'.format(week)) # 搜索未读邮件
# 获取邮件id列表
email_ids = data[0].split()
split_list=""
return_list=[]
# 遍历邮件id列表
for email_id in email_ids:
# 根据邮件id获取邮件内容
status, data = mail.fetch(email_id, '(RFC822)')
raw_email = data[0][1]
# 解析原始邮件内容
msg = email.message_from_bytes(raw_email)
body = None
# 判断邮件内容类型
if msg.is_multipart():
for part in msg.get_payload():
if part.get_content_type() == 'text/plain':
body = part.get_payload(decode=True).decode('utf-8')
else:
body = msg.get_payload(decode=True).decode('utf-8')
plain_text = un(body)
# print('正文内容:', plain_text)
content=rh.explain_html(plain_text)
# print("content的值:")
# print(content)
for i in content:
split_list = i.split(":")
# print("content的值")
# print(content)
if split_list[1]=="作业完成。":
# print("这是周的")
# print("content的值")
# print(content)
# print("yesterday的值:")
# print(yesterday)
# print("content[8]的值")
# print(content[8].split("T")[0].split(":")[1])
recent_time=content[8].split("T")[0].split(":")[1]
# 判断今天是不是周五
isFri=du.isFriday()
isMon=du.isMonday()
# if isFri==True or isMon==True:
if isFri==True:
# print("today的值")
#如果今天是周五,参数是8
today=8
# print(today)
else:
# print("today的值")
today=7
#如果今天不是周五,参数是7
# print(today)
if du.days_in_recently(recent_time,today):
# print("content的值")
# print(content)
if content[0].split(":")[1]==job_name:
# print("这里能执行?")
# print("content的值")
# print(content)
# print("job_name")
# print(content[0].split(":")[1])
# print("job_name")
# print(job_name)
return_list.append(True)
return_list.append(content[0].split(":")[1])
return_list.append(content[8][5:])
# print("return_list的值")
# print(return_list)
return return_list
#作业结束时间
if content[0].split(":")[1]=='OA_apache3_每周一全备' and content[8][5:].split("T")[0]==now:
# print("test 2024-5-27")
# print("content的值")
# print(content)
# print("job_name")
# print(content[0].split(":")[1])
# print("job_name")
# print(job_name)
return_list.append(True)
return_list.append(content[0].split(":")[1])
return_list.append(content[8][5:])
# print("return_list的值")
# print(return_list)
return return_list
# 关闭连接
mail.close()
mail.logout()
#day_gaps= 每个月第一个周六 每个月第一个周日 每个月第二个周六 每个月第二个周日
#控制查询未读邮件的范围
#检查每月的全量备份
def check_month_full_backup(
imap,
email_my_account,
email_pwd,
job_name,
day_gaps
):
# 200.业务逻辑
# 201.连接到邮箱服务器
mail = imaplib.IMAP4_SSL(imap)
pwd = email_pwd
# 202.登录邮箱(这个要隐藏)
mail.login(email_my_account, pwd)
# 203选择邮箱文件夹
mail.select('INBOX')
# 204.搜索符合条件的邮件
"""status, data = mail.search(None, 'ALL') """# 搜索所有邮件
"""status, data = mail.search(None, '(UNSEEN)')""" # 搜索未读邮件
# 205.近一个月未读邮件
status, data = mail.search(None, '(UNSEEN)'.format(day_gaps,before)) # 搜索未读邮件
# 获取邮件id列表
if status == 'OK':
email_ids = data[0].split()
split_list=""
return_list=[]
# 遍历邮件id列表
for email_id in email_ids:
# 根据邮件id获取邮件内容
status, data = mail.fetch(email_id, '(RFC822)')
raw_email = data[0][1]
# 解析原始邮件内容
msg = email.message_from_bytes(raw_email)
body = None
# 判断邮件内容类型
if msg.is_multipart():
for part in msg.get_payload():
if part.get_content_type() == 'text/plain':
body = part.get_payload(decode=True).decode('utf-8')
else:
body = msg.get_payload(decode=True).decode('utf-8')
plain_text = un(body)
# print('正文内容:', plain_text)
content=rh.explain_html(plain_text)
# print("content的值:")
# print(content)
for i in content:
split_list = i.split(":")
# print("content的值")
# print(content)
# print("split_list[1]的类型")
# print(type(split_list[1]))
# print("now的类型")
# print(type(now))
#如果含有作业开始,标记为已读
if split_list[1]=="作业开始。":
mail.store(email_id,'+FLAGS', '\\Seen')
#如果含有每天且不含有今天,昨天日期的,标记为已读
if keyword_every_day in split_list[1] and now not in content[8] and yesterday not in content[8]:
mail.store(email_id,'+FLAGS', '\\Seen')
if split_list[1]=="作业完成。":
# print("content的值")
# print(content)
# print("yesterday的值:")
# print(yesterday)
# print("content[8]的值 是开始时间")
# print(content[8])
if content[0].split(":")[1]==job_name:
# print("这里能执行?")
return_list.append(True)
return_list.append(content[0].split(":")[1])
return_list.append(content[8][5:])
# print("return_list的值")
# print(return_list)
return return_list
#作业结束时间
# 关闭连接
mail.close()
mail.logout()
def check_backup(
imap,
email_my_account,
email_pwd,
job_name_list #工作名的列表
):
'''200.业务逻辑'''
# 201.连接到邮箱服务器
mail = imaplib.IMAP4_SSL(imap)
pwd = email_pwd
# 202.登录邮箱(这个要隐藏)
mail.login(email_my_account, pwd)
# 203选择邮箱文件夹
mail.select('INBOX')
# 204.搜索符合条件的邮件
"""status, data = mail.search(None, 'ALL') """# 搜索所有邮件
"""status, data = mail.search(None, '(UNSEEN)')""" # 搜索未读邮件
# 205.近一天未读邮件
status, data = mail.search(None, '(UNSEEN)'.format(since,before)) # 搜索未读邮件
if status == 'OK':
# 获取邮件id列表
email_ids = data[0].split()
split_list=""
return_list=[]
return_list_list=[]
# 遍历邮件id列表
for email_id in email_ids:
# 根据邮件id获取邮件内容
status, data = mail.fetch(email_id, '(RFC822)')
raw_email = data[0][1]
# 解析原始邮件内容
msg = email.message_from_bytes(raw_email)
body = None
# 判断邮件内容类型
if msg.is_multipart():
for part in msg.get_payload():
if part.get_content_type() == 'text/plain':
body = part.get_payload(decode=True).decode('utf-8')
else:
body = msg.get_payload(decode=True).decode('utf-8')
plain_text = un(body)
# print('正文内容:', plain_text)
content=rh.explain_html(plain_text)
# print("content的值:")
# print(content)
for i in content:
split_list = i.split(":")
#限制了是作业完成
if split_list[1]=="作业完成。":
#限制了是今天
if now in content[8]:
for job_name in job_name_list:
if "每天" in job_name:
if content[0].split(":")[1]==job_name:
return_list.append(True)
return_list.append(content[0].split(":")[1])
return_list.append(content[8][5:])
return_list_list.append(return_list)
return return_list_list
#作业结束时间
# 关闭连接
mail.close()
mail.logout()
#20240604
def month_ago_already_read(
imap,
email_my_account,
email_pwd
# job_name_list #工作名的列表
):
mail = imaplib.IMAP4_SSL(imap)
mail.login(email_my_account,email_pwd)
mail.select('INBOX')
one_month_ago = datetime.now()-timedelta(days=40)
one_month_ago_str = one_month_ago.strftime('%d-%b-%Y')
status, messages = mail.search(None, '(BEFORE {date})'.format(date=one_month_ago_str))
if messages:
# 将搜索到的邮件标记为已读
for msg_id in messages[0].split():
mail.store(msg_id, '+FLAGS', '\\Seen')
# 关闭连接
mail.close()
mail.logout()
def change_mail_seen(
imap,
email_my_account,
email_pwd):
mail = imaplib.IMAP4_SSL(imap)
mail.login(email_my_account,email_pwd)
# 选择收件箱
mail.select('inbox')
# 搜索邮件,使用(UNSEEN)只获取未读邮件
status, messages = mail.search(None, 'UNSEEN')
# 解析邮件信息,获取邮件的UID
if messages:
# 获取邮件的UID列表
mail_ids = messages[0].split()
# 遍历邮件UID列表,获取每封邮件的标题
for mail_id in mail_ids:
# 获取邮件的标题
status, data = mail.fetch(mail_id, '(RFC822.HEADER)')
if status == 'OK':
# 解析邮件头部信息,提取标题
header = data[0][1].decode('utf-8')
# 打印邮件标题
# print(header)
# print(header.split('\n')[0])
#首先过滤发件人是jiankong@lw.gov.cn
if email_bk_account in header:
if "每日" in header:
for row in header.split('\n'):
if "Subject" in row:
print(row)
if "Date" in row:
date_str=row.replace("Date: ","")
datetime_obj = datetime.strptime(date_str, "%a, %d %b %Y %H:%M:%S +0800 (CST) ")
# 按照年月日的格式输出
date_formatted = datetime_obj.strftime("%Y-%m-%d")
# print(date_formatted)
# print("4天的值:")
# print(du.acount_date(4))
# print("2天的值")
# print(du.acount_date(2))
# print("列表")
# print(du.generate_date_range(du.acount_date(4),du.acount_date(2)))
if date_formatted in du.generate_date_range(du.acount_date(4),du.acount_date(2)):
mail.store(mail_id, '+FLAGS', '\\SEEN')
# print("ok")
# print(date_formatted)
if "每天" in header:
for row in header.split('\n'):
if "Subject" in row:
print(row)
if "Date" in row:
date_str=row.replace("Date: ","")
datetime_obj = datetime.strptime(date_str, "%a, %d %b %Y %H:%M:%S +0800 (CST) ")
# 按照年月日的格式输出
date_formatted = datetime_obj.strftime("%Y-%m-%d")
# print(date_formatted)
# print("4天的值:")
# print(du.acount_date(4))
# print("2天的值")
# print(du.acount_date(2))
# print("列表")
# print(du.generate_date_range(du.acount_date(4),du.acount_date(2)))
if date_formatted in du.generate_date_range(du.acount_date(4),du.acount_date(2)):
mail.store(mail_id, '+FLAGS', '\\SEEN')
# print("ok")
# print(date_formatted)
if "每周" in header:
for row in header.split('\n'):
if "Subject" in row:
print(row)
if "Date" in row:
date_str=row.replace("Date: ","")
datetime_obj = datetime.strptime(date_str, "%a, %d %b %Y %H:%M:%S +0800 (CST) ")
date_formatted = datetime_obj.strftime("%Y-%m-%d")
# print(date_formatted)
# print("14天的值:")
# print(du.acount_date(14))
# print("7天的值")
# print(du.acount_date(7))
# print("列表")
# print(du.generate_date_range(du.acount_date(14),du.acount_date(7)))
if date_formatted in du.generate_date_range(du.acount_date(14),du.acount_date(7)):
mail.store(mail_id, '+FLAGS', '\\SEEN')
# print("ok")
# print(date_formatted)
if "每月" in header:
for row in header.split('\n'):
if "Subject" in row:
print(row)
if "Date" in row:
date_str=row.replace("Date: ","")
datetime_obj = datetime.strptime(date_str, "%a, %d %b %Y %H:%M:%S +0800 (CST) ")
date_formatted = datetime_obj.strftime("%Y-%m-%d")
# print(date_formatted)
# print("14天的值:")
# print(du.acount_date(58))
# print("7天的值")
# print(du.acount_date(28))
# print("列表")
# print(du.generate_date_range(du.acount_date(58),du.acount_date(28)))
if date_formatted in du.generate_date_range(du.acount_date(58),du.acount_date(28)):
mail.store(mail_id, '+FLAGS', '\\SEEN')
# print("ok")
# print(date_formatted)
# 关闭连接
mail.close()
mail.logout()
if __name__ == '__main__':
# # 创建固定范围的列表
# start_date = datetime.strptime('2020-01-01', '%Y-%m-%d')
# end_date = datetime.strptime('2020-12-31', '%Y-%m-%d')
# date_range = generate_date_range(start_date, end_date)
# print(date_range)
imap=si.ssh_access().imap
email_pwd=si.ssh_access().email_pwd
email_my_account=si.ssh_access().emal_my_account
email_bk_account=si.ssh_access().email_bk_account
# get_mail_name(
# imap,
# email_my_account,
# email_pwd)
change_mail_seen(imap,email_my_account,email_pwd)
# #
# day_backs=check_backup(
# imap,
# email_my_account,
# email_pwd,
# bji.job_info().job_name_list
# )
# print(day_backs)
'''月全备'''
# cmfb=check_month_full_backup(
# imap,
# email_my_account,
# email_pwd,
# bji.job_info().check_month_full_status["oadoc2_full_status"],
# day_gaps
# )
# print(cmfb)
'''增备'''
# oracle_append_status_is_ok=check_today_backup(
# imap,
# email_my_account,
# email_pwd,
# bji.job_info().check_today_append_status['oracle_append_status'])
# print("获取是否成功")
# print(oracle_append_status_is_ok)
# oa_middleware_full_is_ok=check_week_full_backup(
# imap,
# email_my_account,
# email_pwd,
# # bji.job_info().check_week_full_status['oa_middleware_full_status'])
# bji.job_info().check_week_full_status['oa_apche3_full_status'])
# print("获取是否成功")
# print(oa_middleware_full_is_ok)
'''
这个是例如
content的值:
[
'作业:Oracle每日增量备',
'级别:INFO',
'主机:荔湾区通用OA系统-数据库',
'地址:10.108.183.19',
'模块:rac-backup.oracle.linux-x86',
'资源:lwgov1',
'时间:2024-04-19T00:17:15+08:00',
'开始时间:2024-04-19T00:00:00+08:00',
'结束时间:2024-04-19T00:19:38+08:00',
'错误码:0x3000103060100002',
'消息:作业完成。']
'''