基于django management/commands目录下的脚本
from django.core.management import BaseCommand
import logging
import uuid
from pia.utils.cache import reset_redis_expire
from pia.utils.reids_key import TASK_KEY
logging = logging.getLogger('task')
"""
定时任务: 定时任务 看门狗续时
"""
class Command(BaseCommand):
    """Watchdog management command that renews the expiry of crontab task keys.

    For every name in ``WATCHED_TASKS`` the command calls
    ``reset_redis_expire(TASK_KEY + name, EXPIRE)``.
    """

    help = "Renew the expiry of the crontab task keys (watchdog)."

    # Key suffixes appended to TASK_KEY; one entry per watched crontab task.
    WATCHED_TASKS = (
        "privacy_detect",
        "privacy_analysis",
        "privacy_notify",
        "tx_organization",
        "send_message",
        "todo_notify",
        "thirdparty_ex_notify",
        "cookies_scan",
        "xuanwu_app_scan",
    )
    # Expiry value handed to reset_redis_expire (presumably seconds -- confirm
    # against pia.utils.cache).
    EXPIRE = 80

    def add_arguments(self, parser):
        """Register the ``trace_id`` positional argument.

        The argument is optional so that the generated-uuid fallback in
        ``handle`` is reachable; passing a trace id keeps working as before.
        """
        parser.add_argument('trace_id', type=str, nargs='?', default='')

    def handle(self, *args, **options):
        """Renew every watched key, logging a start and an end marker.

        A missing or empty ``trace_id`` is replaced by a fresh uuid4 string.
        """
        self.trace_id = options.get("trace_id") or str(uuid.uuid4())
        self.writeLog('watchdog continuation start')
        for task in self.WATCHED_TASKS:
            reset_redis_expire(TASK_KEY + task, self.EXPIRE)
        self.writeLog('watchdog continuation end')

    def writeLog(self, msg: str):
        """Log ``msg`` at INFO level, prefixed with the current trace id."""
        # `logging` is the module-level 'task' logger here, not the stdlib module.
        logging.info(f'[{self.trace_id}] {msg}')
import sys
import os
from pathlib import Path
from tasks import CrontabTask
# Put the Django project root (third ancestor directory of this file) on the Python path.
_this_file = Path(__file__).resolve()
django_project_path = _this_file.parent.parent.parent
sys.path.append(os.fspath(django_project_path))
# Set DJANGO_SETTINGS_MODULE unless the environment already defines it.
if 'DJANGO_SETTINGS_MODULE' not in os.environ:
    os.environ['DJANGO_SETTINGS_MODULE'] = 'tencheck.settings'
if __name__ == '__main__':
    # Main entry point: read the launch parameters and start the crontab task.
    #
    # Positional arguments:
    #   1. redis_key    -- redis key of the crontab task
    #   2. task_name    -- name of the crontab task
    #   3. env          -- environment YAML file selector
    #   4. env_name     -- exported as ENV_NAME
    #   5. config_name  -- exported as CONFIG_NAME
    #   6. rainbow_key  -- exported as RAINBOW_KEY
    if len(sys.argv) < 7:
        # Fail with a readable usage line instead of a bare IndexError.
        sys.exit(
            f"usage: {sys.argv[0]} "
            "redis_key task_name env env_name config_name rainbow_key"
        )
    redis_key, task_name, env, env_name, config_name, rainbow_key = sys.argv[1:7]

    # setdefault: values already present in the environment take precedence.
    os.environ.setdefault('ENV_NAME', env_name)
    os.environ.setdefault('CONFIG_NAME', config_name)
    os.environ.setdefault('RAINBOW_KEY', rainbow_key)

    task = CrontabTask(env=env, redis_key=redis_key, task_name=task_name)
    task.start()
from pathlib import Path
import os
import sys
import logging
import time
import traceback
import uuid
import yaml
from django.core.cache import cache
TASK_KEY = "pia:crontab:"
class BaseTask:
    """Base class for crontab tasks guarded by a cache-based lock.

    ``start`` acquires a lock key through the Django cache, runs ``do_work``
    and releases the lock afterwards. Subclasses override ``do_work``.
    """

    # Timeout handed to cache.add for the lock key.
    LOCK_TIMEOUT = 80

    def __init__(self, *args, **kwargs):
        """Store the task parameters and load the environment configuration.

        Keyword Args:
            env: selects the ``config-<env>.yaml`` file to load.
            task_name: name of the task, used in log messages.
            redis_key: suffix appended to ``TASK_KEY`` to form the lock key.
        """
        self.env = kwargs.get('env')
        self.task_name = kwargs.get('task_name')
        self.redis_key = kwargs.get('redis_key')
        # Third ancestor directory of this file.
        self.workspace = Path(__file__).resolve().parent.parent.parent
        self.config = self.load_config()
        self.pid = os.getpid()
        self.trace_id = str(uuid.uuid4())

    def load_config(self):
        """Read ``task/conf/config-<env>.yaml`` below the workspace.

        Returns:
            Whatever ``yaml.safe_load`` produces for the file.
        """
        config_file = os.path.join(self.workspace, f"task/conf/config-{self.env}.yaml")
        # Explicit encoding so parsing does not depend on the process locale.
        with open(config_file, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    def writelog(self, text: str) -> None:
        """Append one INFO record to ``<log_path>/crontab_task.log``.

        The record message is ``[<trace_id>] [PID:<pid>] <text>``. A file
        handler is attached to the root logger for this single record and
        detached again afterwards.
        """
        log_path = self.config['log_path']
        file_handler = logging.FileHandler(os.path.join(log_path, "crontab_task.log"))
        file_handler.setFormatter(
            logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s")
        )
        logger = logging.getLogger()
        logger.addHandler(file_handler)
        try:
            logger.setLevel(logging.DEBUG)
            logger.info(f'[{self.trace_id}] [PID:{self.pid}] {text}')
        finally:
            # Always detach and close, otherwise handlers pile up on the root logger.
            logger.removeHandler(file_handler)
            file_handler.close()

    @staticmethod
    def script_err_report(push_user: list, info: str) -> None:
        """Placeholder for script alerting; currently does nothing.

        Declared static because the signature has no ``self``; this makes
        calls through an instance work as well as calls through the class.
        """
        return None

    def start_crontab(self) -> int:
        """Try to acquire the task lock through the cache.

        Returns:
            0 when the lock key was added, 1 when the key already exists,
            2 when the cache call raised.
        """
        lock_key = TASK_KEY + self.redis_key
        try:
            # cache.add only sets the key when it is absent and reports whether it did.
            if cache.add(lock_key, True, self.LOCK_TIMEOUT):
                self.writelog(f"task set redis key {lock_key}")
                return 0
            self.writelog(f"task redis key {lock_key} exists")
            return 1
        except Exception:
            self.writelog(f"redis connect fail {traceback.format_exc()}")
            # Explicit non-zero result; callers treat anything but 0 as "not acquired".
            return 2

    def end_crontab(self) -> None:
        """Delete the task lock key from the cache; failures are only logged."""
        try:
            lock_key = TASK_KEY + self.redis_key
            cache.delete(lock_key)
            self.writelog(f"clean redis key {lock_key}")
        except Exception:
            self.writelog(f"redis connect fail {traceback.format_exc()}")

    def start(self):
        """Run the task once: acquire the lock, call ``do_work``, release the lock.

        When the lock cannot be acquired the process exits with status 0.
        Exceptions raised by ``do_work`` are logged, not propagated.
        """
        start_time = time.time()
        execute = False
        try:
            self.writelog(f"task {self.task_name} start")
            if self.start_crontab() != 0:
                error_info = f"task {self.task_name} not finish!"
                self.writelog(error_info)
                # SystemExit is not an Exception subclass, so it skips the
                # handler below; the finally clause still runs.
                sys.exit(0)
            execute = True
            self.do_work()
        except Exception:
            result_err_info = traceback.format_exc()
            self.writelog(f"{self.task_name} ERROR: {result_err_info}")
        finally:
            end_time = time.time()
            self.writelog(f"{self.task_name} cost time {end_time - start_time}")
            # Only release the lock if this run acquired it.
            if execute:
                self.end_crontab()

    def do_work(self):
        """Core work of the task; a no-op here, meant to be overridden."""
import subprocess
from base_task import BaseTask
class CrontabTask(BaseTask):
    """Crontab task that runs a Django management command in a subprocess."""

    def do_work(self):
        """Run ``manage.py <task_name> <trace_id>`` from the workspace directory.

        The interpreter comes from ``config["python_path"]``. The start of the
        run and the subprocess return code are written to the task log.
        """
        # Function-scope import keeps this block self-contained.
        import shlex

        task_name = self.task_name
        self.writelog(f"{task_name} task start")
        # Quote the interpolated values so spaces or shell metacharacters in
        # them cannot split or alter the command. python_path is left verbatim
        # because it comes from the configuration file.
        command = (
            f'cd {shlex.quote(str(self.workspace))} && '
            f'{self.config["python_path"]} manage.py '
            f'{shlex.quote(str(task_name))} {shlex.quote(str(self.trace_id))}'
        )
        retcode = subprocess.call(command, shell=True)
        self.writelog(f"{task_name} task end retcode:{retcode}")
将任务添加到Linux服务器的 crontab里面
# argv[1]------redis_key ; argv[2]------task_name即 commands目录下 crontab_前缀的文件 ; argv[3]------配置文件 ; argv[4]------环境变量 ; argv[5]------组 ; argv[6]------通道
*/1 * * * * cd /usr/local/oit/tencheck && ../env/bin/python3.11 task/script/start_task.py watchdog crontab_watchdog tx xx xx xx >/dev/null 2>&1 &