场景:
需要对mysql进行定时备份,受限于硬盘空间的大小,需要对备份的数据需要定时清理
python代码实现:
# -*- coding:UTF-8 -*-
"""
@ProjectName : HotelGo2DelonixPmx
@FileName : fix_missing_rates
@Description :
使用mysqldump命令行工具进行备份:
在命令行中执行以下命令:mysqldump -u 用户名 -p 密码 数据库名 > 备份文件.sql。
这将生成一个包含所有SQL语句的备份文件,其中包括创建表、插入数据等操作。
要还原备份,可以使用以下命令:mysql -u 用户名 -p 密码 新数据库名 < 备份文件.sql。
@Time : 2023/7/4 15:48
@Author : Qredsun
"""
import datetime
import functools
import os
import subprocess
import sys
import time
import schedule
from loguru import logger
# 数据库登录账户、密码
USERNAME = ''
PASSWORD = ''
# 指定备份存放路径
BACKUP_DIR = '/path/to/backup'
# 备份周期
RETENTION_PERIOD = 7
logger.remove() # 删去import logger之后自动产生的handler,不删除的话会出现重复输出的现象
handler_id = logger.add(sys.stderr, level="INFO") # 添加一个可以修改控制的handler
info_log_path = os.path.join('log', datetime.datetime.now().strftime("database_backup_%y_%m_%d_debug.log"))
logger.add(info_log_path,
rotation="23:00",
retention="7 days",
level="DEBUG")
info_log_path = os.path.join('log', datetime.datetime.now().strftime("database_backup_%y_%m_%d_error.log"))
logger.add(info_log_path,
rotation="23:00",
retention="7 days",
level="ERROR")
# 备份数据库
def backup_datebase():
current_time = datetime.datetime.now()
backup_file = current_time.strftime('backup_%y_%m_%d_%H_%M_%S.sql')
backup_path = os.path.join(BACKUP_DIR, backup_file)
# 使用 subprocess 模块执行数据备份命令 备份数据库所有数据
back_command = [
'mysqldump',
'-u',
USERNAME,
'-p',
PASSWORD,
'--all-database'
]
with open(backup_path, 'w') as fp:
subprocess.run(back_command, stdout=fp)
logger.info(f'数据库备份已经完成,备份文件保存路径:{backup_path}')
# 清理过期的备份文件
def cleanup_backup():
current_time = datetime.datetime.now()
expiration_time = current_time - datetime.timedelta(days=RETENTION_PERIOD)
for file_name in os.listdir(BACKUP_DIR):
file_path = os.path.join(BACKUP_DIR, file_name)
if not os.path.isfile(file_path):
continue
file_time = datetime.datetime.fromtimestamp(os.path.getatime(file_path))
if file_time < expiration_time:
os.remove(file_path)
logger.info(f'删除过期文件:{file_path}')
# 数据备份恢复
def restore_database(backup_file):
backup_path = os.path.join(BACKUP_DIR, backup_file)
# 使用 subprocess 模块执行数据恢复命令
restore_command = [
'mysqldump',
'-u',
USERNAME,
'-p',
PASSWORD,
'--all-database'
]
with open(backup_path, 'r') as fp:
subprocess.run(restore_command, stdin=fp)
logger.info(f'使用 {backup_path} 完成数据库数据恢复')
def catch_exceptions(cancel_on_failure = False):
def catch_exceptions_decorator(job_func):
@functools.wraps(job_func)
def wrapper(*args, **kwargs):
try:
return job_func(*args, **kwargs)
except:
import traceback
traceback.format_exc()
if cancel_on_failure:
return schedule.CancelJob
return wrapper
return catch_exceptions_decorator
# 异常捕获方法的使用
@catch_exceptions(cancel_on_failure=True)
def task_job():
# 定时任务
backup_datebase()
cleanup_backup()
schedule.every().day.at("23:30").do(task_job)
while True:
schedule.run_pending() # 运行所有可以运行的任务
time.sleep(60*30)