说明
任务可以分为两种:
- 1 静态(存量)任务
- 2 动态(增量)任务
对于静态任务来说,一般可以事先分好大小均匀的若干block任务,这时候比较强调使用分布式系统进行快速处理;对于动态任务来说,主要按时间+区块大小划分。对于小的应用场景而言
内容
写的时候已经实验成功了,其实还挺好用的。刚开始比较担心job是阻塞性质的,后来发现其实并不会这样。
如果,我们把需要处理的内容包成了worker.py的形式,那么aps去调用的时候其实并不会阻塞,而且可以通过限制最大活动的实例数来控制并发数。
通过这种方式,绕开了python单线程-单核的限制,也就是实现了并发
每个程序是一个线程,这个是合理的。
静态调度
例如这样,会隔10启动一个worker,最多不超过5个。
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler
def exe_sh(cmd = None):
os.system(cmd)
# 后台启动命令 nohup python3 aps.py >/dev/null 2>&1 &
if __name__ == '__main__':
sche1 = BlockingScheduler()
# sche1.add_job(exe_sh,'interval', seconds=1, kwargs ={'cmd':'python3 ./main_handler/main.py'})
sche1.add_job(exe_sh,'interval', seconds=10,
kwargs ={'cmd':'python3 ./debugs/result_void/random_bug_blank_left_right_worker.py'},
max_instances=5,coalesce=True)
print('[S] starting inteverl')
sche1.start()
动态调度
这个可以放到之后默认的系统资源监控搭建起来再做。
简单的来说,可以对exe_sh这个任务进行改造:
- 1 执行前读取调度配置
- 2 读取系统资源
- 3 根据资源(数据)和配置(规则)来决定是否在执行周期内发起一个新的worker命令
这样就会由一个静态的方法转为了动态方法:在系统资源富余时,可以让任务跑的更快一点。在资源受限时,会进行降速,避免资源被挤爆了。
智能调度
这个事其实可以等价为约束优化问题。在资源受限的情况下,如何合理的安排任务,让资源最大化的利用。
每次调度,都会有评估(Evaluate)来决定下一步怎么做。
更具体一点,每类任务都有其权重,且有最低执行速度要求。这个是任务本身的约束。另外就是系统运行的安全边界:例如CPU的利用率应当不高于90%,内存不高于80%。